If you prefer a GUI, you can connect to data streams with the wizard in the Streams section of Memgraph Lab. If you prefer writing commands, you can manage streams with queries.
If you need a Kafka stream to play around with, we've provided some at Awesome Data Stream!
How to add a stream?
To add a stream in Memgraph Lab:
- Switch to Streams and click Add New Stream.
- Choose stream type, enter a stream name, server address, and topics you want to subscribe to.
- Go to the Next Step.
- Click on Edit (pencil icon) to modify the Consumer Group, Batch Interval or Batch Size.
If you are trying to connect to the MovieLens Kafka data stream from Awesome Data Stream, the stream configuration should look like this:
Once the basic configuration is finished, you need to define a transformation module and attach it to the stream.
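If you would rather script this setup than click through the wizard, the same fields can be expressed as a single query. The following is a minimal Python sketch that only assembles the query text. It assumes the clause order of Memgraph's `CREATE KAFKA STREAM` query (TOPICS, TRANSFORM, CONSUMER_GROUP, BATCH_INTERVAL, BATCH_SIZE, BOOTSTRAP_SERVERS). The helper name, stream name, topic, procedure name, and server address are hypothetical placeholders, not the values used by Awesome Data Stream.

```python
from typing import Optional, Sequence


def build_create_stream_query(
    name: str,
    topics: Sequence[str],
    transform: str,
    bootstrap_servers: str,
    consumer_group: Optional[str] = None,
    batch_interval_ms: Optional[int] = None,
    batch_size: Optional[int] = None,
) -> str:
    """Assemble a CREATE KAFKA STREAM query from the wizard's fields."""
    if not topics:
        raise ValueError("at least one topic is required")
    parts = [
        f"CREATE KAFKA STREAM {name}",
        f"TOPICS {', '.join(topics)}",
        f"TRANSFORM {transform}",
    ]
    # Optional settings are emitted only when a value was provided.
    if consumer_group is not None:
        parts.append(f"CONSUMER_GROUP {consumer_group}")
    if batch_interval_ms is not None:
        parts.append(f"BATCH_INTERVAL {batch_interval_ms}")
    if batch_size is not None:
        parts.append(f"BATCH_SIZE {batch_size}")
    parts.append(f"BOOTSTRAP_SERVERS '{bootstrap_servers}'")
    return " ".join(parts) + ";"


# All values below are placeholders for illustration.
query = build_create_stream_query(
    name="movielens_ratings",
    topics=["ratings"],
    transform="movielens.rating",
    bootstrap_servers="localhost:9092",
    batch_size=500,
)
print(query)
```

The resulting text can be pasted into Query Execution. Because the query names a transformation procedure, that procedure has to exist before the query is run.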
How to add a transformation module?
A transformation module is a set of user-defined transformation procedures written in C or Python that act on data received from a streaming source. Transformation procedures instruct Memgraph on how to transform the incoming messages to consume them correctly.
At the moment, you can only develop Python transformation modules directly from Memgraph Lab.
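To make the idea concrete, here is a rough sketch of the core logic a Python transformation procedure might contain: it turns one message payload into a Cypher query plus parameters. The message format (a JSON object with `userId`, `movieId`, and `rating` fields), the labels, and the function name `rating_to_query` are assumptions made for illustration and may not match the real MovieLens stream. The function is kept free of Memgraph imports so it can be tried outside Memgraph.

```python
import json
from typing import Any, Dict, Tuple


def rating_to_query(payload: bytes) -> Tuple[str, Dict[str, Any]]:
    """Turn one JSON-encoded message into a Cypher query and its parameters."""
    data = json.loads(payload.decode("utf-8"))
    query = (
        "MERGE (u:User {id: $user_id}) "
        "MERGE (m:Movie {id: $movie_id}) "
        "MERGE (u)-[r:RATED]->(m) "
        "SET r.rating = $rating"
    )
    parameters = {
        "user_id": data["userId"],
        "movie_id": data["movieId"],
        "rating": float(data["rating"]),
    }
    return query, parameters


# Inside Memgraph, this logic would typically be wrapped in a procedure
# decorated with @mgp.transformation, roughly like this (the mgp module is
# only available inside Memgraph, so the wrapper is left as a comment):
#
#   @mgp.transformation
#   def rating(messages: mgp.Messages) -> mgp.Record(query=str, parameters=mgp.Map):
#       records = []
#       for i in range(messages.total_messages()):
#           query, parameters = rating_to_query(messages.message_at(i).payload())
#           records.append(mgp.Record(query=query, parameters=parameters))
#       return records

example = b'{"userId": "7", "movieId": "42", "rating": "4.5"}'
query, parameters = rating_to_query(example)
print(query)
print(parameters)
```

Keeping the parsing in a plain function makes it possible to check the logic with sample payloads before attaching the module to a stream.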
To add a Python transformation module to a stream:
- Click on Add Transformation Module.
- Click on Choose Transformation Module.
- Select an existing transformation module or click + Create new transformation.
- Review an existing module or clear the screen and write a new transformation procedure.
- Save the transformation module.
- Check if the necessary transformation procedure is visible under Detected transformation functions on the right.
- Select a transformation procedure and click Attach to Stream.
You can also develop transformation modules in Python beforehand, in the Query Modules section. Click New Module, and Memgraph Lab will automatically recognize transformation procedures once you define them.
If you developed a procedure in C, you have to load it into Memgraph first, and then you will be able to see it in the Query Modules section and attach it to a stream.
Check the transformation module for MovieLens on Awesome Data Stream.
How to set Kafka configuration parameters?
If necessary, add the Kafka configuration parameters to customize the stream further:
- In Kafka Configuration Parameters, click + Add parameter field.
- Insert the parameter name and value.
- To add another parameter, click + Add parameter field again.
- Save Configuration once you have set all parameters.
To connect to the Awesome Data Stream you need to set the following Kafka configuration parameters:
- sasl.username | public
- sasl.password | public
- security.protocol | SASL_PLAINTEXT
- sasl.mechanism | PLAIN
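When a stream is created with a query instead of the wizard, parameters like these are usually passed as map literals. The sketch below renders the four parameters listed above into `CONFIGS` and `CREDENTIALS` clauses. It assumes that Memgraph accepts quoted string keys in those maps and that secrets belong in `CREDENTIALS`. The helper names and the rule "any key containing `password` is a credential" are conventions of this example only.

```python
import json
from typing import Dict

# Parameters taken from the list above.
AWESOME_DATA_STREAM_PARAMS = {
    "sasl.username": "public",
    "sasl.password": "public",
    "security.protocol": "SASL_PLAINTEXT",
    "sasl.mechanism": "PLAIN",
}


def render_map(params: Dict[str, str]) -> str:
    """Render a dict as a map literal with double-quoted keys and values."""
    pairs = [f"{json.dumps(k)}: {json.dumps(v)}" for k, v in params.items()]
    return "{" + ", ".join(pairs) + "}"


def render_config_clauses(params: Dict[str, str]) -> str:
    """Split params into CONFIGS and CREDENTIALS clauses (example convention)."""
    configs = {k: v for k, v in params.items() if "password" not in k}
    credentials = {k: v for k, v in params.items() if "password" in k}
    clauses = []
    if configs:
        clauses.append(f"CONFIGS {render_map(configs)}")
    if credentials:
        clauses.append(f"CREDENTIALS {render_map(credentials)}")
    return " ".join(clauses)


clauses = render_config_clauses(AWESOME_DATA_STREAM_PARAMS)
print(clauses)
```

The printed clauses would be appended to a stream creation query before its closing semicolon.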
How to connect Memgraph to the stream and start ingesting the data?
Once the stream is configured, click Connect to Stream.
Memgraph runs a series of checks to ensure that the defined topics and transformation procedures are correctly configured. If all checks pass, you can Start the stream. Once you start the stream, you can no longer change its configuration settings, only the transformation module.
The stream status changes to Running, and data is ingested into Memgraph. You can see the number of nodes and relationships rising as the data keeps coming in. If the node and relationship counts stay at zero, check the transformation module, as there might be a flaw in its logic that needs to be resolved.
Switch to Query Execution and run a query to visualize the data coming in:
MATCH p=(n)-[r]-(m)
RETURN p LIMIT 100;
How to manage a stream?
To manage a stream in Memgraph Lab, go to Streams and click on the stream you want to manage.
How to start, stop or delete a stream?
To start a draft stream, click on Connect to Stream.
To stop or start a stream, click on Stop Stream/Start Stream.
To delete a stream, click on Delete Stream.
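The same actions can also be issued as queries. The sketch below maps each action to the query Memgraph is assumed to use for it (`START STREAM`, `STOP STREAM`, `DROP STREAM`). The helper name, the action keywords, and the stream name are hypothetical, and the name check is only a conservative guard for this example, not a rule taken from Memgraph.

```python
import re

# Action keyword (example convention) -> query prefix.
_ACTIONS = {
    "start": "START STREAM",
    "stop": "STOP STREAM",
    "delete": "DROP STREAM",
}

# Conservative guard: letters, digits, and underscores only.
_VALID_NAME = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def stream_command(action: str, stream_name: str) -> str:
    """Return the query text for starting, stopping, or deleting a stream."""
    if action not in _ACTIONS:
        raise ValueError(f"unknown action: {action!r}")
    if not _VALID_NAME.match(stream_name):
        raise ValueError(f"unexpected stream name: {stream_name!r}")
    return f"{_ACTIONS[action]} {stream_name};"


for action in ("start", "stop", "delete"):
    print(stream_command(action, "movielens_ratings"))
```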
How to edit a stream?
You cannot edit the configuration of a stream once it has been started. To apply other changes, create a new stream with the settings you want.
On a started stream, you can only change the transformation module and the stream offset.
How to change Kafka stream offset?
The Kafka stream offset can only be changed with a query:
CALL mg.kafka_set_stream_offset(streamName, offset)
An offset of -1 denotes the beginning offset available for the given topic/partition.
An offset of -2 denotes the end of the stream, so only the next produced message will be consumed.
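As a small illustration of the two special values, the following sketch builds the call for a given stream. It assumes the stream name is passed to `mg.kafka_set_stream_offset` as a string literal. The constant names, the helper name, and the stream name are hypothetical, and rejecting values below -2 is a guard chosen for this example because no other negative values are described here.

```python
OFFSET_BEGINNING = -1  # earliest offset available
OFFSET_END = -2        # only messages produced from now on


def set_offset_query(stream_name: str, offset: int) -> str:
    """Build the CALL query that sets a Kafka stream's offset."""
    if offset < OFFSET_END:
        raise ValueError("offset must be -2, -1, or a non-negative number")
    return f'CALL mg.kafka_set_stream_offset("{stream_name}", {offset});'


# Re-read the stream from the beginning.
print(set_offset_query("movielens_ratings", OFFSET_BEGINNING))
# Skip everything already produced.
print(set_offset_query("movielens_ratings", OFFSET_END))
```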