Version: 2.0.1

Import JSON data

JSON (JavaScript Object Notation) is an open standard file format and data interchange format that uses human-readable text to store and transmit data objects consisting of attribute-value pairs and arrays (or other serializable values). It is widely used for data interchange, for example in communication between web applications and servers.

Example

Let's assume that messages with the following schemas arrive through the streams JsonStreamProfile, JsonStreamCompany, and JsonStreamWork, each reading from its own topic:

profile = {
    "name": str,
    "age": int,
    "mail": str,
    "address": str,
}

company = {
    "name": str,
    "address": str,
}

works_at = {
    "person": str,
    "company": str,
}
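To make the schemas concrete, the sketch below builds one sample message per schema and serializes it to bytes, roughly the form a producer would send to Kafka. All field values are invented for illustration, and `encode_message` and `decode_message` are hypothetical helpers, not part of Memgraph or any Kafka client.

```python
import json

# Sample messages matching the three schemas above (values are made up).
profile = {"name": "Ada", "age": 36, "mail": "ada@example.com", "address": "12 Main St"}
company = {"name": "Acme", "address": "1 Industrial Rd"}
works_at = {"person": "Ada", "company": "Acme"}


def encode_message(message: dict) -> bytes:
    """Serialize a message to UTF-8 encoded JSON (hypothetical helper)."""
    return json.dumps(message).encode("utf-8")


def decode_message(payload: bytes) -> dict:
    """Parse a payload back into a dict, as a transformation procedure would."""
    return json.loads(payload)


# Round trip: what goes into the topic is what the consumer decodes.
payload = encode_message(profile)
assert decode_message(payload) == profile
```

The round trip mirrors what happens end to end: the producer encodes a dictionary, and the transformation procedure decodes the same payload with `json.loads`.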

We can use these schemas to build a graph in which profile and company nodes are connected by works_at relationships.

Transformation modules

Before consuming data from the stream, we need to implement a transformation module that will consume JSON messages from Kafka and output Cypher queries. In order to create a transformation module, you need to:

  1. Create a Python module
  2. Save it into Memgraph's query modules directory (default: /usr/lib/memgraph/query_modules)
  3. Load it into Memgraph either on startup (automatically) or by running the CALL mg.load_all(); query

Each procedure in the transformation module is responsible for one type of data in the stream. The procedure profile_transformation can be found below:

import json

import mgp


@mgp.transformation
def profile_transformation(messages: mgp.Messages
                           ) -> mgp.Record(query=str, parameters=mgp.Nullable[mgp.Map]):
    result_queries = []

    for i in range(messages.total_messages()):
        message = messages.message_at(i)
        # Each payload is a JSON-encoded profile message.
        message_json = json.loads(message.payload())
        result_queries.append(mgp.Record(
            query=f'''CREATE (p: Profile {{ id: {message_json["id"]}, name: "{message_json["name"]}", age: ToInteger({message_json["age"]}),
                address: "{message_json["address"]}", mail: "{message_json["mail"]}" }})''',
            parameters=None
        ))

    return result_queries
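The company_transformation and works_at_transformation procedures used by the streams are not shown on this page. As a rough sketch of the query-building step only, the helpers below turn a raw payload into a Cypher query plus a parameter map, which avoids interpolating message values into the query string. The helper names, the Company label, the WORKS_AT relationship type, and the choice to match nodes by name are all assumptions made for illustration. In a real module, the returned values would be passed to mgp.Record(query=..., parameters=...).

```python
import json


def build_company_query(payload: bytes) -> dict:
    """Build a parameterized CREATE query for one company message.

    Hypothetical helper: the Company label is an assumption.
    """
    data = json.loads(payload)
    return {
        "query": "CREATE (c:Company {name: $name, address: $address})",
        "parameters": {"name": data["name"], "address": data["address"]},
    }


def build_works_at_query(payload: bytes) -> dict:
    """Build a parameterized query linking an existing profile to a company.

    Hypothetical helper: the WORKS_AT type and matching by name are assumptions.
    """
    data = json.loads(payload)
    return {
        "query": (
            "MATCH (p:Profile {name: $person}), (c:Company {name: $company}) "
            "CREATE (p)-[:WORKS_AT]->(c)"
        ),
        "parameters": {"person": data["person"], "company": data["company"]},
    }


# Example: a works_at payload as it might arrive from the stream.
result = build_works_at_query(b'{"person": "Ada", "company": "Acme"}')
assert result["parameters"] == {"person": "Ada", "company": "Acme"}
```

Keeping the values in the parameter map rather than in the query text means that quotes or other special characters in a name or address cannot break the generated query.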

Creating the streams

To import data into Memgraph, we need to create a stream for each topic and apply our transformation module on incoming data:

CREATE STREAM JsonStreamProfile
  TOPICS json-stream-profiles
  TRANSFORM transformation.profile_transformation;

CREATE STREAM JsonStreamCompany
  TOPICS json-stream-companies
  TRANSFORM transformation.company_transformation;

CREATE STREAM JsonStreamWork
  TOPICS json-stream-work
  TRANSFORM transformation.works_at_transformation;

To start the streams, execute the following query:

START ALL STREAMS;

Run the following query to check if all the streams were started correctly:

SHOW STREAMS;

You can also check the node counter in Memgraph Lab (Overview tab) to see if new nodes and relationships are arriving.

Next steps

Check out the example-streaming-app on GitHub to see how Memgraph can be connected to a Kafka stream.