Twitch Streaming Graph Analysis - Part 3

By Katarina Supe
November 3, 2021 (7 min read)

Introduction

This blog is divided into three parts, depending on the part of the application we are building:

  • Part 1: data source and backend implementation
  • Part 2: frontend implementation
  • Part 3: streaming data from Kafka cluster

To get started, read Part 1 and Part 2. If you would rather skip them and jump straight to the streaming part, you can find the backend and frontend implementations from the first two parts in the repository.

Implementation

For streaming, there is a twitch-stream folder in the project root directory. You will stream the chatters of BadBoyHalo, one of the streamers currently in the database. This data is in the chatters.csv file in the twitch-stream folder. The dummy.py script connects to Memgraph, sets up Kafka, and creates a producer that periodically sends data to Memgraph through Kafka. Let's go through it step by step.

First, you have to parse the arguments, which you'll define in the docker-compose.yml file later: the name of the .csv file you're sending to Memgraph and the interval between messages.

def parse_args():
    """
    Parse input command line arguments.
    """
    parser = ArgumentParser(
        description="A Twitch stream machine powered by Memgraph.")
    parser.add_argument("--file", help="File with chatter data.")
    parser.add_argument(
        "--interval",
        type=int,
        help="Interval for sending data in seconds.")
    return parser.parse_args()
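
As a quick check, the parser can be exercised without a real command line by handing parse_args an explicit argument list. The sketch below repeats the parser so that it runs on its own. The argv parameter is an addition made for this illustration (the original function takes no arguments), and the values chatters.csv and 5 are only examples of what docker-compose.yml might pass.

```python
from argparse import ArgumentParser


def parse_args(argv=None):
    """Parse command line arguments; argv=None falls back to sys.argv."""
    parser = ArgumentParser(
        description="A Twitch stream machine powered by Memgraph.")
    parser.add_argument("--file", help="File with chatter data.")
    parser.add_argument(
        "--interval",
        type=int,
        help="Interval for sending data in seconds.")
    return parser.parse_args(argv)


# Illustrative values only; the real ones come from docker-compose.yml.
args = parse_args(["--file", "chatters.csv", "--interval", "5"])
print(args.file, args.interval)
```

Because --interval is declared with type=int, the value arrives as an integer and can be passed straight to sleep().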

Next, you have to connect to Memgraph:

memgraph = setup.connect_to_memgraph(MEMGRAPH_IP, MEMGRAPH_PORT)

Let's check what connect_to_memgraph actually does. In setup.py, you need to create a connect_to_memgraph() function that looks like this:

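def connect_to_memgraph(memgraph_ip, memgraph_port):
    memgraph = Memgraph(host=memgraph_ip, port=int(memgraph_port))
    # Keep polling until Memgraph accepts the connection.
    while True:
        try:
            if memgraph._get_cached_connection().is_active():
                return memgraph
        except Exception:
            log.info("Memgraph probably isn't running.")
        # Wait before the next attempt, even if the connection was merely inactive.
        sleep(1)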

The connection to Memgraph is established using GQLalchemy, a fully open-source Python library that aims to be the go-to Object Graph Mapper (OGM), a link between graph database objects and Python objects.
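
The connection loop above retries forever, whereas the Kafka helper shown a little further down gives up after 30 attempts. If you prefer the bounded behavior for both, the pattern can be sketched as a small generic helper. This is only an illustration: wait_for, its parameters, and fake_check are hypothetical names and are not part of setup.py or GQLalchemy.

```python
import logging
from time import sleep

log = logging.getLogger(__name__)


def wait_for(check, retries=30, delay=1.0, sleep_fn=sleep):
    """Call check() until it returns a truthy value or retries run out.

    check is any zero-argument callable (hypothetical helper), e.g.
    lambda: memgraph._get_cached_connection().is_active()
    Returns the number of the attempt that succeeded.
    """
    for attempt in range(1, retries + 1):
        try:
            if check():
                return attempt
        except Exception as error:  # the service may not be up yet
            log.info("Attempt %d failed: %s", attempt, error)
        sleep_fn(delay)
    raise TimeoutError(f"Service not ready after {retries} attempts")


# Simulated service that only becomes available on the third call.
calls = {"count": 0}


def fake_check():
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("not ready")
    return True


attempts_needed = wait_for(fake_check, retries=5, delay=0,
                           sleep_fn=lambda _: None)
print(attempts_needed)
```

Raising after a fixed number of attempts lets the container fail visibly instead of waiting silently when Memgraph never comes up.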

After connecting to Memgraph, you have to set up Kafka:

setup.run(memgraph, KAFKA_IP, KAFKA_PORT)

That means you are connecting to Kafka and creating a new topic:

def get_admin_client(kafka_ip, kafka_port):
    retries = 30
    while True:
        try:
            admin_client = KafkaAdminClient(
                bootstrap_servers=kafka_ip + ':' + kafka_port,
                client_id="twitch-stream")
            return admin_client
        except NoBrokersAvailable:
            retries -= 1
            if not retries:
                raise
            log.info("Failed to connect to Kafka")
            sleep(1)


def run(memgraph, kafka_ip, kafka_port):
    admin_client = get_admin_client(kafka_ip, kafka_port)
    log.info("Connected to Kafka")

    topic_list = [
        NewTopic(name="chatters", num_partitions=1, replication_factor=1),
    ]

    try:
        admin_client.create_topics(new_topics=topic_list, validate_only=False)
    except TopicAlreadyExistsError:
        pass
    log.info("Created topics")

    log.info("Creating stream connections on Memgraph")
    stream = MemgraphKafkaStream(name="chatter_stream", topics=["chatters"], transform="twitch.chatters")
    memgraph.create_stream(stream)
    memgraph.start_stream(stream)

First, you create a topic called chatters. After that, you create a stream connection in Memgraph called chatter_stream, which consumes the chatters topic and uses the transformation module twitch.chatters. The transformation module can be found in twitch.py:

@mgp.transformation
def chatters(messages: mgp.Messages
             ) -> mgp.Record(query=str, parameters=mgp.Nullable[mgp.Map]):
    result_queries = []

    for i in range(messages.total_messages()):
        message = messages.message_at(i)
        comment_info = json.loads(message.payload().decode('utf8'))
        result_queries.append(
            mgp.Record(
                query=("MERGE (u:User:Stream {id: $user_id}) "
                       "MERGE (c:User {name: $chatter_login}) "
                       "CREATE (c)-[:CHATTER]->(u)"),
                parameters={
                    "user_id": comment_info["user_id"],
                    "chatter_login": comment_info["chatter_login"]}))

    return result_queries

The Kafka cluster feeds messages to Memgraph, and the above script transforms those messages. It acts as a translator from Kafka messages to Memgraph's query language: every message that Memgraph receives is processed here and turned into a Cypher query. Since chatter data is being streamed, each query merges the streamer node and the chatter node and creates a CHATTER relationship from the chatter to the streamer. Once the transformation module is in place, chatter_stream can be started.
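
The producer side in dummy.py is not shown in this post, so here is a minimal sketch of what it could look like, under stated assumptions. It assumes chatters.csv has user_id and chatter_login columns (the two keys the transformation module reads) and that each message is a UTF-8 encoded JSON object. The function names build_messages and stream, as well as the sample rows, are made up for illustration.

```python
import csv
import io
import json
from time import sleep


def build_messages(csv_text):
    """Turn CSV rows into UTF-8 JSON payloads, one per chatter.

    Assumes the header contains user_id and chatter_login columns.
    """
    reader = csv.DictReader(io.StringIO(csv_text))
    return [
        json.dumps({"user_id": row["user_id"],
                    "chatter_login": row["chatter_login"]}).encode("utf8")
        for row in reader
    ]


def stream(messages, send, interval, sleep_fn=sleep):
    """Send each payload via send(topic, payload), pausing between messages."""
    for payload in messages:
        send("chatters", payload)
        sleep_fn(interval)


# Made-up sample rows; the real chatters.csv may look different.
sample = "user_id,chatter_login\n123,alice\n123,bob\n"
messages = build_messages(sample)

# A list stands in for Kafka here so the sketch runs without a broker.
sent = []
stream(messages, lambda topic, payload: sent.append((topic, payload)),
       interval=0, sleep_fn=lambda _: None)

# The transformation module decodes each payload the same way.
decoded = [json.loads(payload.decode("utf8")) for _, payload in sent]
print(decoded)
```

With kafka-python, the send callable could be the send method of a KafkaProducer created with the same bootstrap_servers value as the admin client, since it accepts a topic followed by the message bytes. Note that values read from a CSV file are strings, so user_id arrives as "123" rather than 123 unless you convert it.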

Dockerizing the stream

The stream service is named twitch-stream and is defined in the docker-compose.yml file. For it to work, you have to create a Dockerfile for the twitch-stream service:

FROM python:3.8

# Install CMake for gqlalchemy
RUN apt-get update && \
  apt-get --yes install cmake && \
  rm -rf /var/lib/apt/lists/*

# Install packages
COPY requirements.txt ./
RUN pip3 install -r requirements.txt

COPY dummy.py /app/dummy.py
COPY setup.py /app/setup.py
COPY chatters.csv /app/chatters.csv

WORKDIR /app

The requirements.txt file, which is copied into the Docker image, contains:

gqlalchemy==1.1.2
kafka-python==2.0.2

Starting the whole application

To start the app, run:

docker-compose up core
docker-compose up twitch-app
docker-compose up react-app

By starting the core service, you are running everything the application and the streaming need (the memgraph-mage, kafka and zookeeper services). After that, starting the twitch-app service runs the backend. The react-app service is the frontend, which runs on localhost:3000, so go check it out! To start streaming data, run:

docker-compose up twitch-stream

Notice how the node and edge counters go up. Also, when you refresh the PageRank results, you'll see them change, since the popularity of BadBoyHalo is rising. The image below shows the old PageRank results:

[Image: memgraph-tutorial-twitch-page-rank]

After some data has arrived, BadBoyHalo has taken first place, as you can see in the image below.

[Image: memgraph-tutorial-twitch-page-rank-stream]
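
To see why new CHATTER relationships push a streamer up the ranking, here is a small self-contained PageRank sketch using plain power iteration. It is not the implementation the application calls in Memgraph, only an illustration of the idea. The graph, the node names, and the damping and iteration values are made up for this example.

```python
def pagerank(edges, damping=0.85, iterations=50):
    """Plain power-iteration PageRank over a list of (source, target) edges."""
    nodes = sorted({node for edge in edges for node in edge})
    out_links = {node: [] for node in nodes}
    for source, target in edges:
        out_links[source].append(target)

    rank = {node: 1.0 / len(nodes) for node in nodes}
    for _ in range(iterations):
        # Rank held by nodes without outgoing edges is spread evenly.
        dangling = sum(rank[node] for node in nodes if not out_links[node])
        new_rank = {
            node: (1 - damping) / len(nodes) + damping * dangling / len(nodes)
            for node in nodes
        }
        for source in nodes:
            for target in out_links[source]:
                share = rank[source] / len(out_links[source])
                new_rank[target] += damping * share
        rank = new_rank
    return rank


# Toy graph: edges point from chatter to streamer, like CHATTER in the app.
before = [("a", "streamer_x"), ("b", "streamer_x"), ("c", "streamer_x"),
          ("d", "streamer_y"), ("e", "streamer_y")]
# Three new chatters arrive for streamer_y through the stream.
after = before + [("f", "streamer_y"), ("g", "streamer_y"),
                  ("h", "streamer_y")]

ranks_before = pagerank(before)
ranks_after = pagerank(after)
top_before = max(ranks_before, key=ranks_before.get)
top_after = max(ranks_after, key=ranks_after.get)
print(top_before, top_after)
```

In this toy graph the streamer with more incoming edges ranks first, so the lead moves from streamer_x to streamer_y once the new chatters arrive. The same effect is what moves BadBoyHalo up as the stream adds chatters.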

Conclusion

You've seen that streaming your data is both easy and useful. With just a few alterations and lines of code, you can add a whole new dimension to your application. You can stream any kind of data that you think would give you cool insights.

You can also play with the datasets and make this application your own. If you are keener on backend development, experiment with various queries. If you prefer working on the frontend, you can build different React components that suit you best. Finally, if you want to give feedback or talk about this app or Memgraph, make sure to join our Discord Community Server!
