
Twitch Streaming Graph Analysis - Part 3

by Katarina Supe

Introduction

This blog is divided into three parts, one for each part of the application we are building:

  • Part 1: data source and backend implementation
  • Part 2: frontend implementation
  • Part 3: streaming data from Kafka cluster

To get started, read Part 1 and Part 2. If you want to skip those and jump straight into the streaming part, you can find the backend and frontend implementations from the first two parts here.

Implementation

For streaming, we are going to create a twitch-stream folder within our project root directory. We will stream data consisting of the chatters of one of the streamers currently in the database - BadBoyHalo. This data is in the chatters.csv file in the twitch-stream folder. We will use the dummy.py script to connect to Memgraph, run Kafka, and create a producer that periodically sends data to Memgraph. Let’s explain what we are doing step by step.
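
Each chatter record carries a user_id and a chatter_login field - you can see these names in the transformation module later on. So chatters.csv might look roughly like this (the column names come from the code, but the values below are just placeholders):

user_id,chatter_login
123456789,chatter_1
123456789,chatter_2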

First, we parse the arguments that will later be provided in the docker-compose.yml file: the name of the .csv file we are sending to Memgraph and the interval between messages.

from argparse import ArgumentParser


def parse_args():
    """
    Parse input command line arguments.
    """
    parser = ArgumentParser(
        description="A Twitch stream machine powered by Memgraph.")
    parser.add_argument("--file", help="File with chatter data.")
    parser.add_argument(
        "--interval",
        type=int,
        help="Interval for sending data in seconds.")
    return parser.parse_args()
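
If you were running the script by hand instead of through Docker, the invocation would look something like this (the interval value here is just an example):

python3 dummy.py --file chatters.csv --interval 2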

Next, we are connecting to Memgraph:

memgraph = setup.connect_to_memgraph(MEMGRAPH_IP, MEMGRAPH_PORT)

Let’s check what the connect_to_memgraph function actually does. In setup.py we have:

import logging
from time import sleep

from gqlalchemy import Memgraph

log = logging.getLogger(__name__)


def connect_to_memgraph(memgraph_ip, memgraph_port):
    memgraph = Memgraph(host=memgraph_ip, port=int(memgraph_port))
    while True:
        try:
            if memgraph._get_cached_connection().is_active():
                return memgraph
        except Exception:
            log.info("Memgraph probably isn't running.")
            sleep(1)

Using gqlalchemy, we are trying to connect to Memgraph, just like we did before in our backend.

After connecting to Memgraph, we set up Kafka:

setup.run(memgraph, KAFKA_IP, KAFKA_PORT)

That means we are connecting to Kafka and creating a new topic:

from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import NoBrokersAvailable, TopicAlreadyExistsError


def get_admin_client(kafka_ip, kafka_port):
    retries = 30
    while True:
        try:
            admin_client = KafkaAdminClient(
                bootstrap_servers=kafka_ip + ':' + kafka_port,
                client_id="twitch-stream")
            return admin_client
        except NoBrokersAvailable:
            retries -= 1
            if not retries:
                raise
            log.info("Failed to connect to Kafka")
            sleep(1)


def run(memgraph, kafka_ip, kafka_port):
    admin_client = get_admin_client(kafka_ip, kafka_port)
    log.info("Connected to Kafka")

    topic_list = [
        NewTopic(
            name="chatters",
            num_partitions=1,
            replication_factor=1), ]

    try:
        admin_client.create_topics(new_topics=topic_list, validate_only=False)
    except TopicAlreadyExistsError:
        pass
    log.info("Created topics")

    log.info("Creating stream connections on Memgraph")
    memgraph.execute(
        "CREATE STREAM chatter_stream "
        "TOPICS chatters "
        "TRANSFORM twitch.chatters")
    memgraph.execute("START STREAM chatter_stream")

Notice that we have created a topic called chatters and, after that, a stream connection in Memgraph called chatter_stream. The stream consumes the chatters topic, whose messages have to be transformed in a certain way so that Memgraph knows what to do with them. That’s why we have created a new query module called twitch.py in the memgraph/query_modules folder.

import json

import mgp


@mgp.transformation
def chatters(messages: mgp.Messages
             ) -> mgp.Record(query=str, parameters=mgp.Nullable[mgp.Map]):
    result_queries = []

    for i in range(messages.total_messages()):
        message = messages.message_at(i)
        comment_info = json.loads(message.payload().decode('utf8'))
        result_queries.append(
            mgp.Record(
                query=("MERGE (u:User:Stream {id: $user_id}) "
                       "MERGE (c:User {name: $chatter_login}) "
                       "CREATE (c)-[:CHATTER]->(u)"),
                parameters={
                    "user_id": comment_info["user_id"],
                    "chatter_login": comment_info["chatter_login"]}))

    return result_queries

The Kafka cluster feeds messages to Memgraph, and the script above transforms those messages. It acts like a translator from Kafka’s language into Memgraph’s. Each message Memgraph receives is processed here, and an appropriate Cypher query is created from it. Since we are streaming chatter data, we simply merge each chatter node with the correct streamer node and create the connection between them. After that, chatter_stream can be started.
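
One piece of dummy.py we haven’t shown is the producer loop itself. Conceptually, it reads chatters.csv row by row and publishes each row as a JSON message to the chatters topic, sleeping between messages. A minimal sketch of that loop could look like this - produce_chatters is a hypothetical helper written for illustration, not the exact code from the repository:

import csv
import json
from time import sleep

from kafka import KafkaProducer


def produce_chatters(file_path, interval, kafka_ip, kafka_port):
    # Sketch: a hypothetical helper, assuming the chatters.csv columns
    # used by the twitch.chatters transformation.
    # Serialize each message as UTF-8 encoded JSON, which is what the
    # transformation decodes on the Memgraph side.
    producer = KafkaProducer(
        bootstrap_servers=kafka_ip + ':' + kafka_port,
        value_serializer=lambda v: json.dumps(v).encode('utf8'))
    with open(file_path) as f:
        for row in csv.DictReader(f):
            # The payload keys match the parameters used in the
            # transformation's Cypher query.
            producer.send("chatters", {
                "user_id": row["user_id"],
                "chatter_login": row["chatter_login"]})
            sleep(interval)
    producer.flush()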

Dockerizing the stream

All that is left to do is add the missing services to the docker-compose.yml file and update a few details. For this to work, we had to add a few more files. First, we need to create a Dockerfile for the twitch-stream service.

FROM python:3.8

# Install CMake for gqlalchemy
RUN apt-get update && \
  apt-get --yes install cmake && \
  rm -rf /var/lib/apt/lists/*

# Install packages
COPY requirements.txt ./
RUN pip3 install -r requirements.txt

COPY dummy.py /app/dummy.py
COPY setup.py /app/setup.py
COPY chatters.csv /app/chatters.csv

WORKDIR /app

The requirements.txt file will contain:

gqlalchemy==1.0.5
kafka-python==2.0.2
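
With the image defined, the twitch-stream service entry in docker-compose.yml could look roughly like this - a sketch, since the exact service names, paths, and options depend on the rest of your compose file:

# Sketch: service names and options here are assumptions
twitch-stream:
  build: ./twitch-stream
  command: python3 dummy.py --file chatters.csv --interval 2
  depends_on:
    - kafka
    - memgraph-mage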

We added a new query module, so we are changing the memgraph-mage service a bit. We are building from the memgraph folder and copying our query module into Memgraph’s query_modules directory.

FROM memgraph/memgraph-mage

USER root

# Copy the local query modules and data import files
COPY query_modules/twitch.py /usr/lib/memgraph/query_modules/twitch.py

USER memgraph

Starting the whole application

To start the app, run:

docker-compose up core
docker-compose up twitch-app
docker-compose up react-app

By starting the core service, you are running everything necessary for the application, including streaming (the memgraph-mage, kafka, and zookeeper services). After that, by starting the twitch-app service, you are running the backend. The react-app service is our frontend service, which runs on localhost:3000 - go check it out! To start streaming data, run:

docker-compose up twitch-stream

Notice how the node and edge counters are going up. Also, when you refresh the PageRank results, you’ll see the change, since BadBoyHalo’s popularity is rising! In the image below, you can see the old PageRank results:

[Image: old PageRank results (memgraph-tutorial-twitch-page-rank)]

After some data has arrived, BadBoyHalo has taken first place, as you can see in the image below.

[Image: PageRank results after streaming (memgraph-tutorial-twitch-page-rank-stream)]

Conclusion

We’ve seen that streaming your data is both easy and useful. With just a few alterations and a few lines of code, we have added a whole new dimension to our application. This way, you can stream any kind of data you want, whatever you think would give you cool insights.

Also, you can play with the datasets and make this application your own. If you are keener on backend development, experiment with various queries. On the other hand, if you like working on the frontend, you can build the React components that suit you best. In the end, if you want to give feedback or talk about this app or Memgraph, make sure to join our Discord Community Server!
