Streaming and Trigger Support With GQLAlchemy
Working with a new technology stack is never an easy task, especially when you are part of the Python ecosystem. You get accustomed to many libraries that make your life easier on a daily basis. That's why we decided to add some missing functionalities to our Object Graph Mapper (OGM) GQLAlchemy.
From now on, Python developers won’t have to create and manage data streams and database triggers from Memgraph directly with the Cypher query language but can instead use the GQLAlchemy library to accomplish these tasks programmatically.
Let's see the new functionalities in action!
Prerequisites
1. If you want to start a dummy Kafka stream and actually connect to it with Memgraph, then clone the project data-streams and run the following command:
python start.py --platform kafka --dataset movielens
2. From the same data-streams directory, run the following command to start Memgraph:
docker-compose up memgraph-mage
3. Now, we can connect to Memgraph using GQLAlchemy in a Python script:
from gqlalchemy import Memgraph, Node, Field

memgraph = Memgraph()

class User(Node):
    name: str = Field(index=True, exists=True, unique=True, db=memgraph)

user = User(name='Ron Swanson').save(memgraph)
print(user)
What happened? Well, we just created a node with the label User in the database and got the saved object back into our program. Now it's time to dive into streams and triggers!
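By the way, if you run the script again, you don't have to recreate the node. Assuming your GQLAlchemy version exposes the load() method on Node instances, a minimal sketch for fetching the existing node by its unique property could look like this:

# Load the existing node by its unique name property instead of creating a
# duplicate (load() on Node instances is assumed here).
loaded_user = User(name='Ron Swanson').load(db=memgraph)
print(loaded_user)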
Connecting to a data stream from GQLAlchemy
The stream functionality enables Memgraph to connect to a Kafka, Pulsar or Redpanda cluster and run graph analytics on the data stream.
1. Create a stream in Memgraph
This step is pretty easy. You just have to call the create_stream() method with the parameters for the specific stream:
from gqlalchemy import Memgraph, MemgraphKafkaStream, match
memgraph = Memgraph()
stream = MemgraphKafkaStream(
    name="ratings_stream",
    topics=["ratings"],
    transform="movielens.rating",
    bootstrap_servers="'kafka:9092'",
)
memgraph.create_stream(stream)
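If your data lives in Pulsar rather than Kafka, the pattern should stay the same with the MemgraphPulsarStream class; here is a sketch, where the service_url value is just an assumption about your broker's address:

from gqlalchemy import MemgraphPulsarStream

# Pulsar counterpart of the Kafka stream above; adjust service_url to your setup.
pulsar_stream = MemgraphPulsarStream(
    name="ratings_stream_pulsar",
    topics=["ratings"],
    transform="movielens.rating",
    service_url="pulsar://pulsar:6650",
)
memgraph.create_stream(pulsar_stream)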
2. Start the stream
To start the stream, just call the start_stream() method:
memgraph.start_stream(stream)
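If you need to pause ingestion at some point, the connection object can run raw Cypher as well, and Memgraph's STOP STREAM and START STREAM queries do exactly what their names say:

# Pause the stream without dropping it...
memgraph.execute("STOP STREAM ratings_stream")
# ...and resume it whenever you're ready.
memgraph.execute("START STREAM ratings_stream")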
Now, let's check if data is being ingested by Memgraph:
movies = match().node(variable="m", labels="Movie").return_().limit(5).execute()
print(list(movies))
Hopefully, you just printed out a handful of Movie nodes. If not, it means that there is an error somewhere. You can always ask for help on our Discord server.
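If you prefer raw Cypher over the query builder, the same check can be done with the execute_and_fetch() method, which returns an iterator of dictionaries keyed by the names in the RETURN clause:

# Equivalent check with a plain Cypher query.
results = memgraph.execute_and_fetch("MATCH (m:Movie) RETURN m.title AS title LIMIT 5")
for result in results:
    print(result["title"])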
3. Check the status of the stream
To check the status of the stream in Memgraph, just run the following command:
streams = memgraph.get_streams()
print(streams)
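Under the hood, this boils down to Memgraph's SHOW STREAMS query, which you can also run directly if you want the individual columns (the exact column set may differ between Memgraph versions):

# Inspect the stream metadata row by row, e.g. to see whether a stream is running.
for stream_info in memgraph.execute_and_fetch("SHOW STREAMS"):
    print(stream_info)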
4. Delete the stream
You can use the drop_stream() method to delete a stream:
memgraph.drop_stream(stream)
Creating database triggers in GQLAlchemy
Because Memgraph supports database triggers on CREATE, UPDATE and DELETE operations, GQLAlchemy also implements a simple interface for maintaining these triggers.
Why do you need database triggers for graph analytics? Imagine a graph that is being updated continuously with new data. Maybe you need to inform another service of each change or run a graph algorithm after particular changes take effect. Triggers make it possible to create custom notifications, and if you write your own query module, you can execute whatever code you want once the trigger fires. You could be sending data to a Kafka cluster, calling a remote API, saving the information to another system, etc.
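For example, the trigger statement could hand every newly created node over to a custom query module. In the sketch below, notifications.send is a hypothetical procedure standing in for whatever Python code you would write yourself:

# Hypothetical: notifications.send (and its status field) stand in for your
# own query module procedure.
statement = "UNWIND createdVertices AS node CALL notifications.send(node) YIELD status RETURN status"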
1. Create the trigger
To set up the trigger, first create a MemgraphTrigger object with all the required arguments:
name: str ➡ The name of the trigger.
event_type: TriggerEventType ➡ The type of event that will trigger the execution. The options are: TriggerEventType.CREATE, TriggerEventType.UPDATE and TriggerEventType.DELETE.
event_object: TriggerEventObject ➡ The objects that are affected with the event_type. The options are: TriggerEventObject.ALL, TriggerEventObject.NODE and TriggerEventObject.RELATIONSHIP.
execution_phase: TriggerExecutionPhase ➡ The phase when the trigger should be executed in regard to the transaction commit. The options are: TriggerExecutionPhase.BEFORE and TriggerExecutionPhase.AFTER.
statement: str ➡ The Cypher query that should be executed when the trigger fires.
Now, let's create a trigger in GQLAlchemy:
from gqlalchemy import Memgraph, MemgraphTrigger
from gqlalchemy.models import (
    TriggerEventType,
    TriggerEventObject,
    TriggerExecutionPhase,
)
memgraph = Memgraph()
trigger = MemgraphTrigger(
    name="ratings_trigger",
    event_type=TriggerEventType.CREATE,
    event_object=TriggerEventObject.NODE,
    execution_phase=TriggerExecutionPhase.AFTER,
    statement="UNWIND createdVertices AS node SET node.created_at = LocalDateTime()",
)
memgraph.create_trigger(trigger)
The trigger named ratings_trigger will be executed every time a node is created in the database. After the transaction that created the node in question finishes, the Cypher query statement will execute, and in this case, it will set the property created_at of the newly created node to the current date and time.
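A quick way to see the trigger in action is to create a node and read the property back; a minimal sketch reusing the connection from above (keep in mind the trigger runs after the transaction commits, so the property is set in a follow-up transaction):

# Creating any node fires ratings_trigger, which stamps created_at on it.
memgraph.execute("CREATE (:Movie {title: 'Parks and Recreation'})")
result = next(memgraph.execute_and_fetch(
    "MATCH (m:Movie {title: 'Parks and Recreation'}) RETURN m.created_at AS created_at"
))
print(result["created_at"])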
2. Check the status of a trigger
You can return all of the triggers from the database with the get_triggers() method:
triggers = memgraph.get_triggers()
print(triggers)
3. Delete the trigger
You can use the drop_trigger() method to delete a trigger:
memgraph.drop_trigger(trigger)
Conclusion
As you can see, it's not particularly hard to work with streams and triggers in Memgraph, and GQLAlchemy only makes the job easier. While this article covered the basic options, there are many more available. Not only can you connect to Pulsar and Redpanda streams as well, but through the use of query modules, you can write custom procedures in Python to further analyze the incoming data or pass on the results. The same goes for triggers: instead of a fixed Cypher query, the trigger can call custom procedures that are only limited by your knowledge of Python.
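To give you a taste, a transformation procedure like the movielens.rating one used above is just a Python file dropped into Memgraph's query modules directory. Here is a rough sketch built on Memgraph's mgp API; the payload fields title and rating are assumptions about the incoming message format, not the actual movielens schema:

import json

import mgp

@mgp.transformation
def rating(messages: mgp.Messages
           ) -> mgp.Record(query=str, parameters=mgp.Nullable[mgp.Map]):
    result_queries = []
    for i in range(messages.total_messages()):
        # Turn every consumed message into a parameterized Cypher query.
        payload = json.loads(messages.message_at(i).payload().decode("utf8"))
        result_queries.append(
            mgp.Record(
                query="MERGE (m:Movie {title: $title}) SET m.rating = $rating",
                parameters={"title": payload["title"], "rating": payload["rating"]},
            )
        )
    return result_queries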
If you found this short tutorial interesting, don't forget to check out the GQLAlchemy project on GitHub and throw us a star.