How to manage Pulsar streams

The stream functionality enables Memgraph to connect to a Kafka, Pulsar or Redpanda cluster and run graph analytics on the data stream.

Info

You can also use this feature with Neo4j:

db = Neo4j(host="localhost", port="7687", username="neo4j", password="test")

1. Create a Pulsar stream in Memgraph

To set up a stream, first create a MemgraphPulsarStream object. The name, topics and transform arguments are required; the remaining arguments are optional and default to None:

  • name: str ➡ The name of the stream.
  • topics: List[str] ➡ List of topic names.
  • transform: str ➡ The transformation procedure for mapping incoming messages to Cypher queries.
  • batch_interval: str = None ➡ Maximum wait time in milliseconds for consuming messages before calling the transform procedure.
  • batch_size: str = None ➡ Maximum number of messages to wait for before calling the transform procedure.
  • service_url: str = None ➡ URL to the running Pulsar cluster.
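The transform argument names a procedure that turns each incoming message into a Cypher query with parameters. A real transformation procedure is written against Memgraph's `mgp` Python API and loaded into Memgraph as a query module, so it cannot run standalone. The sketch below shows only the mapping logic in plain Python. The function name `rating_to_query`, the JSON field names and the graph shape are assumptions for illustration, not the actual `movielens.rating` procedure.

```python
import json
from typing import Any, Dict


def rating_to_query(payload: bytes) -> Dict[str, Any]:
    """Map one raw message payload to a Cypher query plus its parameters."""
    # Assumed message format: UTF-8 JSON with userId, movieId and rating fields.
    data = json.loads(payload.decode("utf-8"))
    # Assumed graph shape: (:User)-[:RATED]->(:Movie).
    query = (
        "MERGE (u:User {id: $user_id}) "
        "MERGE (m:Movie {id: $movie_id}) "
        "CREATE (u)-[:RATED {rating: $rating}]->(m)"
    )
    parameters = {
        "user_id": data["userId"],
        "movie_id": data["movieId"],
        "rating": float(data["rating"]),
    }
    return {"query": query, "parameters": parameters}


# Hypothetical sample message.
example = rating_to_query(b'{"userId": 1, "movieId": 42, "rating": 4.5}')
```

Passing values as parameters instead of formatting them into the query string keeps the query text identical for every message.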

Then pass the newly created MemgraphPulsarStream object to the create_stream() method:

from gqlalchemy import Memgraph, MemgraphPulsarStream

# Connect to a running Memgraph instance (default host and port assumed)
db = Memgraph()

stream = MemgraphPulsarStream(name="ratings_stream", topics=["ratings"], transform="movielens.rating", service_url="localhost:6650")
db.create_stream(stream)
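To see how the constructor arguments relate to Memgraph's stream syntax, the following sketch assembles a `CREATE PULSAR STREAM` statement from the same values. The helper `pulsar_stream_query` is hypothetical and is not part of GQLAlchemy; the exact string the library generates (quoting in particular) may differ.

```python
from typing import List, Optional


def pulsar_stream_query(
    name: str,
    topics: List[str],
    transform: str,
    batch_interval: Optional[str] = None,
    batch_size: Optional[str] = None,
    service_url: Optional[str] = None,
) -> str:
    """Build an approximate CREATE PULSAR STREAM statement (illustrative only)."""
    parts = [
        f"CREATE PULSAR STREAM {name}",
        f"TOPICS {', '.join(topics)}",
        f"TRANSFORM {transform}",
    ]
    # Optional clauses are emitted only when a value is given.
    if batch_interval is not None:
        parts.append(f"BATCH_INTERVAL {batch_interval}")
    if batch_size is not None:
        parts.append(f"BATCH_SIZE {batch_size}")
    if service_url is not None:
        parts.append(f"SERVICE_URL '{service_url}'")
    return " ".join(parts) + ";"


query = pulsar_stream_query(
    name="ratings_stream",
    topics=["ratings"],
    transform="movielens.rating",
    service_url="localhost:6650",
)
print(query)
```

Arguments left as None produce no clause, so Memgraph falls back to its own defaults for them.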

2. Start the stream

To start the stream, call the start_stream() method:

db.start_stream(stream)

3. Check the status of the stream

To check the status of the streams in Memgraph, call the get_streams() method:

check = db.get_streams()
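The result covers all streams, so a small helper can pick out the one of interest. This is a minimal sketch that assumes each entry behaves like a dict with a "name" key; the helper `find_stream` and the sample rows are hypothetical, and the actual fields returned by get_streams() may differ.

```python
from typing import Any, Dict, Iterable, Optional


def find_stream(streams: Iterable[Dict[str, Any]], name: str) -> Optional[Dict[str, Any]]:
    """Return the first entry whose "name" field matches, or None."""
    for entry in streams:
        # Assumption: each entry is a dict-like row with a "name" key.
        if entry.get("name") == name:
            return entry
    return None


# Hypothetical rows standing in for the value returned by db.get_streams().
sample_streams = [
    {"name": "ratings_stream", "type": "pulsar"},
    {"name": "other_stream", "type": "kafka"},
]

found = find_stream(sample_streams, "ratings_stream")
missing = find_stream(sample_streams, "no_such_stream")
```

With a live connection, the same call would be `find_stream(db.get_streams(), "ratings_stream")`, provided the assumption about the row format holds.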

4. Delete the stream

You can use the drop_stream() method to delete a stream:

db.drop_stream(stream)
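The create, start and drop steps can be tied together so that a stream is always cleaned up. The sketch below wraps them in a context manager. `running_stream` and `RecordingDb` are hypothetical names: the first relies only on the create_stream(), start_stream() and drop_stream() methods used in this guide, and the second is a stand-in that records calls so the example runs without a Memgraph instance.

```python
from contextlib import contextmanager


@contextmanager
def running_stream(db, stream):
    """Create and start a stream, then drop it when the block exits."""
    db.create_stream(stream)
    try:
        db.start_stream(stream)
        yield stream
    finally:
        # Runs even if the body raises, so the stream does not linger.
        db.drop_stream(stream)


class RecordingDb:
    """Hypothetical stand-in for a database client; it only records calls."""

    def __init__(self):
        self.calls = []

    def create_stream(self, stream):
        self.calls.append(("create", stream))

    def start_stream(self, stream):
        self.calls.append(("start", stream))

    def drop_stream(self, stream):
        self.calls.append(("drop", stream))


fake_db = RecordingDb()
with running_stream(fake_db, "ratings_stream"):
    pass  # inspect or query the ingested data here
```

In real use, `fake_db` would be replaced by the connected database object and the string by the MemgraphPulsarStream instance.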