Twitch Streaming Graph Analysis - Part 3
Introduction
This blog post is divided into three parts, one for each part of the application we are building:
- Part 1: data source and backend implementation
- Part 2: frontend implementation
- Part 3: streaming data from Kafka cluster
To get started, read Part 1 and Part 2. If you want to skip them and jump right into the streaming part, you can find the backend and frontend implementations from the first two parts in the repository.
Implementation
For streaming, there is a twitch-stream folder in the project root directory. You will be streaming data about the chatters of BadBoyHalo, one of the streamers already in the database. This data is in the chatters.csv file in the twitch-stream folder. You will use the dummy.py script to connect to Memgraph, run Kafka, and create a producer that periodically sends data to Kafka, from which Memgraph consumes it. Let's go through it step by step.
First, you have to parse the arguments, which you'll define later in the docker-compose.yml file: the name of the .csv file you're sending to Memgraph and the interval between messages.
from argparse import ArgumentParser


def parse_args():
    """
    Parse input command line arguments.
    """
    parser = ArgumentParser(
        description="A Twitch stream machine powered by Memgraph.")
    parser.add_argument("--file", help="File with chatter data.")
    parser.add_argument(
        "--interval",
        type=int,
        help="Interval for sending data in seconds.")
    return parser.parse_args()
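The producer part of dummy.py is not listed in this post. As a rough idea of what it has to do with the two parsed arguments, here is a minimal sketch. It assumes chatters.csv has user_id and chatter_login columns (the keys the twitch.chatters transformation reads later), and the helper names row_to_message and stream_chatters are hypothetical, not taken from the repository. The Kafka send function is passed in, so the logic can be tried without a running broker.

```python
import csv
import json
import time
from typing import Callable, Dict


def row_to_message(row: Dict[str, str]) -> bytes:
    """Encode one CSV row as a JSON payload with the keys the
    twitch.chatters transformation reads (assumed column names)."""
    return json.dumps(
        {"user_id": row["user_id"], "chatter_login": row["chatter_login"]}
    ).encode("utf8")


def stream_chatters(path: str, interval: float,
                    send: Callable[[bytes], None]) -> int:
    """Send every row of the CSV file at `path` through `send`,
    pausing `interval` seconds after each message.
    Returns the number of messages sent."""
    sent = 0
    with open(path, newline="", encoding="utf8") as f:
        for row in csv.DictReader(f):
            send(row_to_message(row))
            sent += 1
            time.sleep(interval)
    return sent


# Hypothetical wiring with kafka-python (host and port are assumptions):
# from kafka import KafkaProducer
# producer = KafkaProducer(bootstrap_servers="kafka:9092")
# stream_chatters(args.file, args.interval,
#                 lambda msg: producer.send("chatters", msg))
# producer.flush()
```

Injecting send keeps the CSV-to-message logic independent of kafka-python; inside the container it would simply wrap KafkaProducer.send for the chatters topic, as in the commented lines.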
Next, you have to connect to Memgraph:
memgraph = setup.connect_to_memgraph(MEMGRAPH_IP, MEMGRAPH_PORT)
Let's check what the connect_to_memgraph() method actually does. In setup.py, you need to create a connect_to_memgraph() method that looks like this:
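import logging
from time import sleep

from gqlalchemy import Memgraph

log = logging.getLogger(__name__)


def connect_to_memgraph(memgraph_ip, memgraph_port):
    memgraph = Memgraph(host=memgraph_ip, port=int(memgraph_port))
    # Retry once per second until Memgraph accepts connections
    while True:
        try:
            if memgraph._get_cached_connection().is_active():
                return memgraph
        except Exception:
            log.info("Memgraph probably isn't running.")
        sleep(1)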
The connection to Memgraph is established using GQLalchemy, a fully open-source Python library that aims to be the go-to Object Graph Mapper (OGM), a link between graph database objects and Python objects.
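connect_to_memgraph() keeps retrying forever, while get_admin_client(), shown in the next step, gives up after 30 attempts. If you want the same bounded behaviour for both services, a small shared helper could look like the sketch below. wait_for is a hypothetical name, not part of GQLalchemy or the repository, and treating any exception as a failed attempt mirrors the catch-all in connect_to_memgraph().

```python
import logging
import time
from typing import Callable

log = logging.getLogger(__name__)


def wait_for(check: Callable[[], bool], retries: int = 30,
             delay: float = 1.0) -> None:
    """Call `check` until it returns True, sleeping `delay` seconds
    between attempts. An exception counts as a failed attempt.
    Raises TimeoutError after `retries` failed attempts."""
    for attempt in range(1, retries + 1):
        try:
            if check():
                return
        except Exception as exc:
            log.info("Attempt %d/%d failed: %s", attempt, retries, exc)
        if attempt < retries:
            time.sleep(delay)
    raise TimeoutError(f"Service not ready after {retries} attempts")


# Hypothetical usage with the Memgraph object from connect_to_memgraph():
# wait_for(lambda: memgraph._get_cached_connection().is_active())
```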
After connecting to Memgraph, you have to run Kafka:
setup.run(memgraph, KAFKA_IP, KAFKA_PORT)
This function connects to Kafka, creates a new topic, and sets up the stream on Memgraph:
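import logging
from time import sleep

from gqlalchemy import MemgraphKafkaStream
from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import NoBrokersAvailable, TopicAlreadyExistsError

log = logging.getLogger(__name__)


def get_admin_client(kafka_ip, kafka_port):
    # Kafka may still be starting up, so retry up to 30 times
    retries = 30
    while True:
        try:
            admin_client = KafkaAdminClient(
                bootstrap_servers=f"{kafka_ip}:{kafka_port}",
                client_id="twitch-stream")
            return admin_client
        except NoBrokersAvailable:
            retries -= 1
            if not retries:
                raise
            log.info("Failed to connect to Kafka")
            sleep(1)


def run(memgraph, kafka_ip, kafka_port):
    admin_client = get_admin_client(kafka_ip, kafka_port)
    log.info("Connected to Kafka")

    # Create the "chatters" topic; ignore the error if it already exists
    topic_list = [
        NewTopic(name="chatters", num_partitions=1, replication_factor=1),
    ]
    try:
        admin_client.create_topics(new_topics=topic_list, validate_only=False)
    except TopicAlreadyExistsError:
        pass
    log.info("Created topics")

    # Create and start a Memgraph stream that consumes "chatters"
    # and transforms each message with twitch.chatters
    log.info("Creating stream connections on Memgraph")
    stream = MemgraphKafkaStream(
        name="chatter_stream", topics=["chatters"], transform="twitch.chatters")
    memgraph.create_stream(stream)
    memgraph.start_stream(stream)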
The run() function first creates a topic called chatters, and after that a stream connection on Memgraph called chatter_stream, which consumes the chatters topic using the twitch.chatters transformation module. The transformation module can be found in twitch.py:
import json

import mgp


@mgp.transformation
def chatters(messages: mgp.Messages
             ) -> mgp.Record(query=str, parameters=mgp.Nullable[mgp.Map]):
    result_queries = []

    for i in range(messages.total_messages()):
        message = messages.message_at(i)
        # Each payload is a JSON object with user_id and chatter_login
        comment_info = json.loads(message.payload().decode('utf8'))
        result_queries.append(
            mgp.Record(
                query=("MERGE (u:User:Stream {id: $user_id}) "
                       "MERGE (c:User {name: $chatter_login}) "
                       "CREATE (c)-[:CHATTER]->(u)"),
                parameters={
                    "user_id": comment_info["user_id"],
                    "chatter_login": comment_info["chatter_login"]}))

    return result_queries
The Kafka cluster feeds messages to Memgraph, and the transformation above converts them. It acts as a translator from Kafka messages to Cypher: every message Memgraph receives is processed here, and a matching Cypher query is generated for it. Since chatter data is being streamed, each query merges the chatter node and the correct streamer node and creates a CHATTER relationship between them. After that, chatter_stream can be started.
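The mgp module only exists inside Memgraph, so the transformation can't easily be run on its own. To check what a single message turns into, here is a plain-Python mirror of the per-message logic. chatter_to_query is a hypothetical helper, and the example payload is made up for illustration.

```python
import json
from typing import Any, Dict, Tuple

# Same query text as the twitch.chatters transformation
CHATTER_QUERY = (
    "MERGE (u:User:Stream {id: $user_id}) "
    "MERGE (c:User {name: $chatter_login}) "
    "CREATE (c)-[:CHATTER]->(u)"
)


def chatter_to_query(payload: bytes) -> Tuple[str, Dict[str, Any]]:
    """Turn one raw Kafka message payload into the (query, parameters)
    pair the transformation would emit for it."""
    comment_info = json.loads(payload.decode("utf8"))
    return CHATTER_QUERY, {
        "user_id": comment_info["user_id"],
        "chatter_login": comment_info["chatter_login"],
    }


query, params = chatter_to_query(b'{"user_id": "42", "chatter_login": "alice"}')
print(params)  # {'user_id': '42', 'chatter_login': 'alice'}
```

Because both nodes are matched with MERGE, replaying a message doesn't duplicate the users, but CREATE adds a new CHATTER relationship every time the same message is processed.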
Dockerizing the stream
The stream service is named twitch-stream, and it is defined in the docker-compose.yml file. For it to work, you have to create a Dockerfile for the twitch-stream service:
FROM python:3.8

# Install CMake for gqlalchemy
RUN apt-get update && \
    apt-get --yes install cmake && \
    rm -rf /var/lib/apt/lists/*

# Install packages
COPY requirements.txt ./
RUN pip3 install -r requirements.txt

COPY dummy.py /app/dummy.py
COPY setup.py /app/setup.py
COPY chatters.csv /app/chatters.csv
WORKDIR /app
The requirements.txt file copied into the Docker image contains:
gqlalchemy==1.1.2
kafka-python==2.0.2
Starting the whole application
To start the app, run:
docker-compose up core
docker-compose up twitch-app
docker-compose up react-app
Starting the core service runs everything the application needs, including the streaming infrastructure (the memgraph-mage, kafka, and zookeeper services). Starting the twitch-app service then runs the backend. The react-app service is the frontend, and it runs on localhost:3000, so go check it out! To start streaming data, run:
docker-compose up twitch-stream
Notice how the node and edge counters go up. Also, when you refresh the PageRank results, you'll see them change, since BadBoyHalo's popularity is growing! In this photo, you can see the old PageRank results:
After some data has arrived, BadBoyHalo takes first place, as you can see in the photo below.
Conclusion
You've seen that it's pretty easy and useful to stream your data. With just a few alterations and lines of code, you can add a whole new dimension to your application. This way, you can stream any kind of data you want, whatever you think would give you cool insights.
You can also play with the datasets and make this application your own. If you are keener on backend development, experiment with various queries. On the other hand, if you prefer working on the frontend, you can build different React components that suit you best. Finally, if you want to give feedback or talk about this app or Memgraph, make sure to join our Discord Community Server!