Working with a new technology stack is never an easy task, especially when you are part of the Python ecosystem, where you get accustomed to many libraries that make your life easier on a daily basis. That’s why we decided to add some missing functionality to GQLAlchemy, our Object Graph Mapper (OGM).
From now on, Python developers won’t have to create and manage Memgraph data streams and database triggers directly with the Cypher query language. Instead, they can use the GQLAlchemy library to accomplish these tasks programmatically.
Let’s see the new functionalities in action!
1. If you want to start a dummy Kafka stream and actually connect to it with Memgraph, clone the data-streams project and run the following command:
python start.py --platform kafka --dataset movielens
2. From the same data-streams directory, run the following command to start Memgraph:
docker-compose up memgraph-mage
3. Now, we can connect to Memgraph using GQLAlchemy in a Python script:
from gqlalchemy import Memgraph, Node, Field

memgraph = Memgraph()

class User(Node):
    name: str = Field(index=True, exists=True, unique=True, db=memgraph)

user = User(name='Ron Swanson').save(memgraph)
print(user)
What happened? Well, we just created a node with the label User in the database and got it back in our program. Now it’s time to dive into streams and triggers!
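The index, exists and unique arguments of Field do more than describe the property: GQLAlchemy uses them to set up an index and two constraints on User.name in the database passed via db. As a rough sketch of what those flags correspond to in Memgraph’s Cypher, the hypothetical schema_statements() helper below builds the equivalent statements. It is an illustration, not GQLAlchemy’s own code, and the library may issue the statements differently.

```python
from typing import List


def schema_statements(label: str, prop: str, *, index: bool = False,
                      exists: bool = False, unique: bool = False) -> List[str]:
    """Hypothetical helper: Memgraph Cypher matching a Field's index/constraint flags."""
    statements = []
    if index:
        # Label-property index, speeds up lookups such as MATCH (u:User {name: ...})
        statements.append(f"CREATE INDEX ON :{label}({prop});")
    if exists:
        # Existence constraint: every node with this label must have the property
        statements.append(f"CREATE CONSTRAINT ON (n:{label}) ASSERT EXISTS (n.{prop});")
    if unique:
        # Uniqueness constraint: no two nodes with this label may share the value
        statements.append(f"CREATE CONSTRAINT ON (n:{label}) ASSERT n.{prop} IS UNIQUE;")
    return statements


for statement in schema_statements("User", "name", index=True, exists=True, unique=True):
    print(statement)
```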
Connecting to a data stream from GQLAlchemy
The stream functionality enables Memgraph to connect to a Kafka, Pulsar or Redpanda cluster and run graph analytics on the data stream.
1. Create a stream in Memgraph
This step is pretty easy. You just have to call the create_stream() method with the parameters for the specific stream:
from gqlalchemy import Memgraph, MemgraphKafkaStream, match

memgraph = Memgraph()

stream = MemgraphKafkaStream(
    name="ratings_stream",
    topics=["ratings"],
    transform="movielens.rating",
    bootstrap_servers="'kafka:9092'",
)
memgraph.create_stream(stream)
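Conceptually, create_stream() turns the stream object into a Memgraph CREATE KAFKA STREAM query and runs it. The hypothetical kafka_stream_query() helper below sketches that query for the same parameters. It also suggests why bootstrap_servers is written as "'kafka:9092'": we assume the value is inlined into the query as a Cypher string literal, so the inner single quotes have to be part of the Python string. This is an illustration under that assumption, not GQLAlchemy’s source.

```python
from typing import List, Optional


def kafka_stream_query(name: str, topics: List[str], transform: str,
                       bootstrap_servers: Optional[str] = None) -> str:
    """Hypothetical helper: build a Memgraph CREATE KAFKA STREAM query."""
    query = f"CREATE KAFKA STREAM {name} TOPICS {','.join(topics)} TRANSFORM {transform}"
    if bootstrap_servers is not None:
        # Inlined as-is, so the caller supplies the quotes, e.g. "'kafka:9092'"
        query += f" BOOTSTRAP_SERVERS {bootstrap_servers}"
    return query + ";"


print(kafka_stream_query("ratings_stream", ["ratings"], "movielens.rating", "'kafka:9092'"))
# CREATE KAFKA STREAM ratings_stream TOPICS ratings TRANSFORM movielens.rating BOOTSTRAP_SERVERS 'kafka:9092';
```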
2. Start the stream
To start the stream, just call the
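Starting a stream ultimately comes down to Memgraph’s START STREAM clause, and STOP STREAM pauses it again. As a hedged fallback that does not rely on a particular GQLAlchemy method name, the hypothetical set_stream_running() helper below sends that clause through any connection with an execute() method, such as a GQLAlchemy Memgraph instance. It assumes the ratings_stream created in the previous step.

```python
def set_stream_running(connection, stream_name: str, running: bool = True) -> str:
    """Hypothetical helper: start or stop a Memgraph stream with plain Cypher.

    `connection` is anything with an execute(query) method, e.g. gqlalchemy.Memgraph().
    Returns the query that was sent, which is handy for logging.
    """
    query = f"{'START' if running else 'STOP'} STREAM {stream_name};"
    connection.execute(query)
    return query


# Usage against a live database (requires Memgraph and the stream from step 1):
# from gqlalchemy import Memgraph
# set_stream_running(Memgraph(), "ratings_stream")
```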
Now, let’s check if data is being ingested by Memgraph:
movies = match().node(variable="m", labels="Movie").return_().limit(5).execute()
print(list(movies))
Hopefully, you just printed out a bunch of movie titles. If not, it means that there is an error somewhere. You can always ask for help on our Discord Server.
3. Check the status of the stream
To check the status of the stream in Memgraph, just run the following command:
streams = memgraph.get_streams()
print(streams)
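If you only care about one stream, it can be easier to index the rows of Memgraph’s SHOW STREAMS query by name. The hypothetical stream_status() helper below runs that query through a connection’s execute_and_fetch() method (available on GQLAlchemy’s Memgraph) and assumes each row is a dictionary with a name column; compare with the output printed above if your Memgraph version differs.

```python
from typing import Any, Dict


def stream_status(connection) -> Dict[str, Dict[str, Any]]:
    """Hypothetical helper: map each stream name to its SHOW STREAMS row.

    `connection` is anything with an execute_and_fetch(query) method,
    such as gqlalchemy.Memgraph(). The 'name' column is an assumption.
    """
    rows = connection.execute_and_fetch("SHOW STREAMS;")
    return {row["name"]: row for row in rows}


# Usage against a live database:
# from gqlalchemy import Memgraph
# print(stream_status(Memgraph()).get("ratings_stream"))
```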
4. Delete the stream
You can use the drop_stream() method to delete a stream.
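A cleanup script is easier to rerun if it drops the stream only when it still exists. The hypothetical teardown_stream() helper below does that, assuming drop_stream() accepts the stream object created in step 1 (as in memgraph.drop_stream(stream)) and that get_streams() returns dictionaries with a name key; both are assumptions of this sketch.

```python
def teardown_stream(memgraph, stream) -> bool:
    """Hypothetical helper: drop `stream` if Memgraph still knows it; True if dropped.

    Assumes memgraph.get_streams() yields dicts with a 'name' key and that
    memgraph.drop_stream() accepts the stream object itself.
    """
    existing = {row["name"] for row in memgraph.get_streams()}
    if stream.name not in existing:
        return False  # nothing to do, e.g. the stream was already dropped
    memgraph.drop_stream(stream)
    return True


# Usage with the objects from the earlier steps:
# teardown_stream(memgraph, stream)
```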
Creating database triggers in GQLAlchemy
Because Memgraph supports database triggers on CREATE, UPDATE, and DELETE operations, GQLAlchemy also implements a simple interface for maintaining these triggers.
Why do you need database triggers for graph analytics? Imagine a graph that is being updated continuously with new data. Maybe you need to inform another service of each change or run a graph algorithm after particular changes take effect. Triggers make it possible to create custom notifications, and if you write your own query module, you can execute whatever code you want once the trigger fires. You could be sending data to a Kafka cluster, calling a remote API, saving the information to another system, etc.
1. Create the trigger
To set up the trigger, first create a MemgraphTrigger object with all the required arguments:
name: str➡ The name of the trigger.
event_type: TriggerEventType➡ The type of event that will trigger the execution. The options are: TriggerEventType.CREATE, TriggerEventType.UPDATE, and TriggerEventType.DELETE.
event_object: TriggerEventObject➡ The objects that are affected by the event_type. The options are: TriggerEventObject.ALL, TriggerEventObject.NODE, and TriggerEventObject.RELATIONSHIP.
execution_phase: TriggerExecutionPhase➡ The phase in which the trigger is executed relative to the transaction commit. The options are: TriggerExecutionPhase.BEFORE and TriggerExecutionPhase.AFTER.
statement: str➡ The Cypher query that should be executed when the trigger fires.
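These five arguments map onto the clauses of Memgraph’s CREATE TRIGGER query. To show how each one is used, the hypothetical trigger_query() helper below assembles that query from plain strings, writing () for nodes, --> for relationships, and an empty string for all objects. It is a sketch of the Cypher syntax, not GQLAlchemy’s code.

```python
def trigger_query(name: str, event_type: str, event_object: str,
                  execution_phase: str, statement: str) -> str:
    """Hypothetical helper: assemble a Memgraph CREATE TRIGGER query.

    event_type: "CREATE", "UPDATE" or "DELETE"
    event_object: "()" for nodes, "-->" for relationships, "" for all objects
    execution_phase: "BEFORE" or "AFTER" the transaction commit
    """
    # With no object, the trigger reacts to the event on nodes and relationships alike
    on_clause = f"ON {event_object} {event_type}" if event_object else f"ON {event_type}"
    return (f"CREATE TRIGGER {name} {on_clause} "
            f"{execution_phase} COMMIT EXECUTE {statement};")


print(trigger_query(
    "ratings_trigger", "CREATE", "()", "AFTER",
    "UNWIND createdVertices AS node SET node.created_at = LocalDateTime()",
))
```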
Now, let’s create a trigger in GQLAlchemy:
from gqlalchemy import Memgraph, MemgraphTrigger
from gqlalchemy.models import (
    TriggerEventType,
    TriggerEventObject,
    TriggerExecutionPhase,
)

memgraph = Memgraph()

trigger = MemgraphTrigger(
    name="ratings_trigger",
    event_type=TriggerEventType.CREATE,
    event_object=TriggerEventObject.NODE,
    execution_phase=TriggerExecutionPhase.AFTER,
    statement="UNWIND createdVertices AS node SET node.created_at = LocalDateTime()",
)
memgraph.create_trigger(trigger)
The trigger named ratings_trigger will be executed every time a node is created in the database. After the transaction that created the node is committed, the Cypher query in statement executes and, in this case, sets the created_at property of the newly created node to the current local date and time.
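To confirm that the trigger is firing, you can count the nodes that already carry a created_at property; nodes created after the trigger was set up should be included. The hypothetical count_timestamped_nodes() helper below runs that query through a connection’s execute_and_fetch() method, such as the one on GQLAlchemy’s Memgraph. The actual count depends entirely on your data.

```python
def count_timestamped_nodes(connection) -> int:
    """Hypothetical helper: count nodes whose created_at property is set."""
    rows = connection.execute_and_fetch(
        "MATCH (n) WHERE n.created_at IS NOT NULL RETURN count(n) AS total;"
    )
    # The query returns a single row with one column, aliased as 'total'
    return next(iter(rows))["total"]


# Usage against a live database:
# print(count_timestamped_nodes(memgraph))
```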
2. Check the status of a trigger
You can return all of the triggers from the database with the get_triggers() method:
triggers = memgraph.get_triggers()
print(triggers)
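The list from get_triggers() also makes it possible to create a trigger only if it is missing, so a setup script can run more than once. Depending on the GQLAlchemy version, get_triggers() may return raw SHOW TRIGGERS rows or trigger objects, so the hypothetical helpers below accept both: dictionaries are read through an assumed "trigger name" column, and objects through their name attribute.

```python
from typing import Any, Iterable, Set


def trigger_names(triggers: Iterable[Any]) -> Set[str]:
    """Hypothetical helper: collect trigger names from rows or trigger objects."""
    names = set()
    for trigger in triggers:
        if isinstance(trigger, dict):
            names.add(trigger["trigger name"])  # raw SHOW TRIGGERS row (assumed column)
        else:
            names.add(trigger.name)  # object with a .name attribute, e.g. MemgraphTrigger
    return names


def ensure_trigger(memgraph, trigger) -> bool:
    """Create `trigger` only if no trigger with its name exists; True if created."""
    if trigger.name in trigger_names(memgraph.get_triggers()):
        return False
    memgraph.create_trigger(trigger)
    return True


# Usage with the trigger from step 1:
# ensure_trigger(memgraph, trigger)
```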
3. Delete the trigger
You can use the drop_trigger() method to delete a trigger.
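Assuming drop_trigger() takes the trigger object created earlier (as in memgraph.drop_trigger(trigger)), a script that set up several triggers can remove them in one pass. The hypothetical drop_triggers() helper below drops them one by one and collects failures, for example a trigger that was already dropped, instead of stopping at the first error.

```python
from typing import Any, Iterable, List, Tuple


def drop_triggers(memgraph, triggers: Iterable[Any]) -> List[Tuple[str, Exception]]:
    """Hypothetical helper: drop each trigger, returning (name, error) for failures."""
    failures = []
    for trigger in triggers:
        try:
            memgraph.drop_trigger(trigger)
        except Exception as error:  # e.g. the trigger no longer exists
            failures.append((trigger.name, error))
    return failures


# Usage with the trigger from step 1:
# print(drop_triggers(memgraph, [trigger]))
```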
As you can see, it’s not particularly hard to work with streams and triggers in Memgraph, and GQLAlchemy makes the job even easier. This article covered only some of the available options. Besides Kafka, you can also connect to Pulsar and Redpanda streams, and through query modules you can write custom procedures in Python to further analyze the incoming data or pass on the results. The same goes for triggers: you are not limited to executing a Cypher query, because you can also call custom procedures that are limited only by your knowledge of Python.
If you found this short tutorial interesting, don’t forget to check out the GQLAlchemy project on GitHub and throw us a star.