Version: 2.3.1

Importing data from Kafka streams

Memgraph can natively ingest streaming data from upstream sources using Apache Kafka and Confluent Platform. To import data using streams, you must:

  1. Start Memgraph and connect to it
  2. Define a transformation module
  3. Create the stream in Memgraph
  4. Start ingesting data from the stream

Prerequisites

In order to create a Kafka pipeline, you must have:

  • a running Kafka instance with at least one topic
  • access to a running Memgraph instance

Importing data

Before importing data, identify all the node and relationship types your stream contains. The best practice is to use a dedicated topic for each message type, which makes the data simpler and more efficient to parse. Each topic then needs its own procedure to handle the conversion, and all of these procedures can live in a single transformation module. Once you create a stream in Memgraph and start ingesting data, you are ready to analyze it.
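As a rough illustration of "one topic per message type, one procedure per topic", the sketch below converts two hypothetical message types into parameterized Cypher queries. The topic names `persons` and `friendships`, the JSON message shapes, the `Person` label and the `FRIENDS_WITH` relationship type are all assumptions made for the example, not part of Memgraph.

```python
import json

# Hypothetical message shapes, one topic per message type:
#   topic "persons":     {"id": 1, "name": "Alice"}
#   topic "friendships": {"from": 1, "to": 2}


def person_to_query(payload: bytes) -> tuple:
    """Turn a 'persons' message into a query that creates or updates a node."""
    data = json.loads(payload.decode("utf-8"))
    query = "MERGE (p:Person {id: $id}) SET p.name = $name"
    return query, {"id": data["id"], "name": data["name"]}


def friendship_to_query(payload: bytes) -> tuple:
    """Turn a 'friendships' message into a query that connects two nodes."""
    data = json.loads(payload.decode("utf-8"))
    # MERGE the endpoints too: each topic is consumed by its own stream, so a
    # relationship message may arrive before the matching person messages.
    query = (
        "MERGE (a:Person {id: $from_id}) "
        "MERGE (b:Person {id: $to_id}) "
        "MERGE (a)-[:FRIENDS_WITH]->(b)"
    )
    return query, {"from_id": data["from"], "to_id": data["to"]}
```

Keeping each converter small and tied to a single message shape is what makes the per-topic layout easy to parse and to debug.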

1. Start Memgraph

Start Memgraph and establish a connection to the database.

If you are starting Memgraph using a Docker image and would like to access configuration files or logs, be sure to run the image with the following volumes:

  • -v mg_log:/var/log/memgraph
  • -v mg_etc:/etc/memgraph

2. Define the transformation module

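A transformation module is a user-defined program that receives messages from Kafka and returns the processed data in the form of Cypher queries. Kafka delivers each message payload as raw bytes, so the module must decode it from whatever format the producer used (JSON is a common choice).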

Transformation modules can be written in either Python or C. Take a look at the Python API guide for an example of how to implement transformation modules in Python.
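The sketch below shows the general shape of a Python transformation procedure that handles a batch of messages. The `mgp` module can only be imported inside Memgraph, so the conversion logic is written in plain Python and a hypothetical `StubMessage` class stands in for a Kafka message. The procedure name `persons` and the JSON message shape are assumptions. Check the Python API guide for the exact decorator and type signatures before relying on the commented declaration.

```python
import json
from dataclasses import dataclass


@dataclass
class StubMessage:
    """Hypothetical stand-in for a Kafka message, used only for local testing.

    Inside Memgraph, messages come from an mgp.Messages batch
    (messages.message_at(i)), whose payload() returns the raw bytes.
    """
    raw: bytes

    def payload(self) -> bytes:
        return self.raw


# Inside a real module, this procedure would be declared roughly as
#   @mgp.transformation
#   def persons(messages: mgp.Messages
#               ) -> mgp.Record(query=str, parameters=mgp.Nullable[mgp.Map]):
# iterating with messages.total_messages() and messages.message_at(i), and
# returning mgp.Record(query=..., parameters=...) objects instead of dicts.
def persons(messages: list) -> list:
    records = []
    for message in messages:
        # Decode the raw Kafka payload (assumed to be UTF-8 encoded JSON).
        data = json.loads(message.payload().decode("utf-8"))
        # Pass values as query parameters instead of formatting them into the
        # query string, so quotes in the data cannot break the query.
        records.append({
            "query": "MERGE (p:Person {id: $id}) SET p.name = $name",
            "parameters": {"id": data["id"], "name": data["name"]},
        })
    return records
```

Returning one query per message keeps the procedure simple; a procedure can also return several queries for a single message if it creates both nodes and relationships.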

When started, Memgraph automatically attempts to load query modules from all *.so and *.py files it finds in the default /usr/lib/memgraph/query_modules directory. You can point it to a different directory by changing or extending the --query-modules-directory flag in the main configuration file (/etc/memgraph/memgraph.conf), or by passing the flag as a command-line argument when using Docker.

caution

Please remember that if you are using the Memgraph Platform image, you should pass configuration flags within the MEMGRAPH environment variable (e.g., docker run -p 7687:7687 -p 3000:3000 -p 4777:4777 -e MEMGRAPH="--log-level=TRACE" memgraph/memgraph-platform). If you are using any other image, pass them as arguments after the image name (e.g., ... memgraph/memgraph-mage --log-level=TRACE --query-modules-directory=path/path).

Transfer a transformation module into a Docker container
If you are using Docker to run Memgraph, you will need to copy the transformation module file from your local directory into the Docker container where Memgraph can access it.

1. Open a new terminal and find the CONTAINER ID of the Memgraph Docker container:

docker ps

2. Copy a file from your current directory to the container with the command:

docker cp ./trans_module.py <CONTAINER ID>:/usr/lib/memgraph/query_modules/trans_module.py

The file is now inside your Docker container.

If the transformation module has been added to the directory while the Memgraph instance was already running, you need to load it manually by using the following query:

CALL mg.load('<module_name>');

or

CALL mg.load_all();

To check whether your module has loaded properly in Memgraph, run:

CALL mg.transformations() YIELD *;

You should see an output similar to the following:

+-------------------------------------------------------+
| name |
+-------------------------------------------------------+
| "trans_module.my_transformation" |
+-------------------------------------------------------+
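If you load modules from a script, the same check can be automated. This is a sketch under the assumption that `session` is a Bolt session, for example from the neo4j Python driver, whose records support `record["name"]`. The function name `transformation_loaded` is hypothetical, and the `name` column is taken from the output shown above.

```python
def transformation_loaded(session, full_name: str) -> bool:
    """Return True if `full_name` (module.procedure) is a loaded transformation.

    `session` is assumed to expose run(query) and return records that can be
    indexed by column name, as neo4j driver sessions do.
    """
    records = session.run("CALL mg.transformations() YIELD *;")
    return any(record["name"] == full_name for record in records)
```

For instance, `transformation_loaded(session, "trans_module.persons")` would return `False` until the module has been loaded, which is a cue to run `mg.load` first.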

3. Create a stream in Memgraph

First, make sure Kafka and Memgraph are running and that a topic is available. Then, make sure the transformation module is loaded.

Create the stream in Memgraph with the following query:

CREATE KAFKA STREAM <stream_name>
TOPICS <topic1>[, <topic2>, ...]
TRANSFORM <transformation_module.transformation_procedure>
[BOOTSTRAP_SERVERS <bootstrap servers>];

Create one stream for each topic and its corresponding transformation procedure.

For more options and information about the CREATE ... STREAM query, check out the reference guide.
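When there are several topics, generating the CREATE KAFKA STREAM queries from a mapping can help keep the one-stream-per-topic rule consistent. The helper below is a hypothetical sketch that follows the syntax shown above without the optional BOOTSTRAP_SERVERS clause. The validation patterns are conservative assumptions rather than Memgraph's exact grammar, and the topic and procedure names are illustrative.

```python
import re

# Names are spliced into the query text (they cannot be query parameters),
# so reject anything unexpected up front. These patterns are assumptions.
_STREAM_NAME = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")
_TOPIC_NAME = re.compile(r"^[A-Za-z0-9._-]+$")
_TRANSFORM = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*\.[A-Za-z_][A-Za-z0-9_]*$")


def create_stream_query(stream_name: str, topics: list, transform: str) -> str:
    """Build a CREATE KAFKA STREAM query for the given topics and procedure."""
    if not _STREAM_NAME.match(stream_name):
        raise ValueError(f"invalid stream name: {stream_name!r}")
    if not topics or not all(_TOPIC_NAME.match(t) for t in topics):
        raise ValueError(f"invalid topics: {topics!r}")
    if not _TRANSFORM.match(transform):
        raise ValueError(f"invalid transformation: {transform!r}")
    return (
        f"CREATE KAFKA STREAM {stream_name} "
        f"TOPICS {', '.join(topics)} "
        f"TRANSFORM {transform};"
    )


# One stream per topic/procedure pair (hypothetical names).
pairs = {"persons": "trans_module.persons", "friendships": "trans_module.friendships"}
queries = [
    create_stream_query(f"{topic}_stream", [topic], procedure)
    for topic, procedure in pairs.items()
]
```

Each generated string can then be executed in Memgraph like any other query. Options from the reference guide, such as bootstrap servers, would need to be appended in the format the guide specifies.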

4. Start ingesting data from the stream

The previous query only creates the stream; it does not start consuming messages. To start ingesting data, execute the following query:

START STREAM <stream_name>;

or

START ALL STREAMS;

Data should now start arriving in your Memgraph instance. To check that everything is working, run the following query:

CHECK STREAM <stream_name>;

or

SHOW STREAMS;
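Starting a stream and inspecting the result can also be scripted. The sketch below assumes `session` is a Bolt session, for example one obtained with the neo4j Python driver via `GraphDatabase.driver("bolt://localhost:7687", auth=("", "")).session()`; the driver choice and the function name `start_and_show` are assumptions, and other Bolt clients should work similarly.

```python
import re

# Stream names are spliced into the query text, so they are checked first.
# This pattern is a conservative assumption, not Memgraph's exact grammar.
_STREAM_NAME = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def start_and_show(session, stream_name: str) -> list:
    """Start one stream, then return the rows reported by SHOW STREAMS."""
    if not _STREAM_NAME.match(stream_name):
        raise ValueError(f"invalid stream name: {stream_name!r}")
    # START STREAM begins consuming messages for a stream created earlier.
    session.run(f"START STREAM {stream_name};")
    # Materialize the result so callers get a plain list of records.
    return list(session.run("SHOW STREAMS;"))
```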

You can also check the node counter in Memgraph Lab (Overview tab) to see if new nodes and relationships are arriving.

For all other stream commands, check out the reference guide.

Logs

Errors and notifications related to streams are written to Memgraph's log files, located at /var/log/memgraph/memgraph_<date>.log. Look for the name of your stream in the log file to find the error. You can use the grep command to search for the stream in the log file:

grep '<stream_name>' /var/log/memgraph/memgraph_<date>.log
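Where grep is not available, a few lines of Python can do the same filtering. This is a minimal sketch with hypothetical function names; it performs a plain substring match, just like the grep command above, and makes no assumptions about the log line format.

```python
def stream_log_lines(lines, stream_name: str) -> list:
    """Return the lines that mention `stream_name`, like grep would."""
    return [line.rstrip("\n") for line in lines if stream_name in line]


def search_log_file(path: str, stream_name: str) -> list:
    """Apply stream_log_lines to a file such as /var/log/memgraph/memgraph_<date>.log."""
    # errors="replace" keeps the scan going if the file has invalid UTF-8.
    with open(path, encoding="utf-8", errors="replace") as log_file:
        return stream_log_lines(log_file, stream_name)
```

Separating the filter from the file access makes it easy to reuse the filter on logs copied out of a Docker container or read from the mg_log volume.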

What next?

Take a look at the tutorial we made to help you connect Memgraph and Kafka. Learn more about the query power of the Cypher language, or check out MAGE, an open-source repository of graph algorithms and modules that can help you tackle the most interesting and challenging graph analytics problems. If you are using Memgraph Lab, a visual user interface for running queries and visualizing graph data, you might be interested in the Graph Style Script language, which will help you bedazzle your graphs. Above all, enjoy your graph database!