Twitch Streaming Graph Analysis - Part 3

by
Katarina Supe

Introduction

This blog series is divided into three parts, one for each part of the application we are building:

  • Part 1: data source and backend implementation
  • Part 2: frontend implementation
  • Part 3: streaming data from Kafka cluster

To get started, read Part 1 and Part 2. If you want to skip them and jump straight to the streaming part, you can find the backend and frontend implementations from the first two parts in the repository.

Implementation

For streaming, there is a twitch-stream folder within the project root directory. You will stream the chatters of BadBoyHalo, one of the streamers already in the database. This data is in the chatters.csv file in the twitch-stream folder. You will use the dummy.py script to connect to Memgraph, set up Kafka, and create a producer that periodically sends data to Memgraph. Let’s go through it step by step.

First, you have to parse the arguments, which you’ll define in the docker-compose.yml file later. They are the name of the .csv file you’re sending to Memgraph and the interval between messages.

from argparse import ArgumentParser


def parse_args():
    """
    Parse input command line arguments.
    """
    parser = ArgumentParser(
        description="A Twitch stream machine powered by Memgraph.")
    parser.add_argument("--file", help="File with chatter data.")
    parser.add_argument(
        "--interval",
        type=int,
        help="Interval for sending data in seconds.")
    return parser.parse_args()
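
To see what the parsed arguments look like, here is a minimal sketch that feeds the parser an explicit argument list instead of reading the command line. The argv parameter and the values chatters.csv and 5 are assumptions made for illustration; they are not taken from the project's docker-compose.yml.

```python
from argparse import ArgumentParser


def parse_args(argv=None):
    """Parse the stream arguments; argv=None falls back to sys.argv."""
    parser = ArgumentParser(
        description="A Twitch stream machine powered by Memgraph.")
    parser.add_argument("--file", help="File with chatter data.")
    parser.add_argument(
        "--interval",
        type=int,
        help="Interval for sending data in seconds.")
    return parser.parse_args(argv)


# Hypothetical values, similar to what the service command might pass.
args = parse_args(["--file", "chatters.csv", "--interval", "5"])
print(args.file, args.interval)
```

Because of type=int, args.interval is already an integer and can be handed to sleep() without conversion.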

Next, you have to connect to Memgraph:

memgraph = setup.connect_to_memgraph(MEMGRAPH_IP, MEMGRAPH_PORT)

Let’s check what connect_to_memgraph actually does. In setup.py, you need to create a connect_to_memgraph() function that looks like this:

def connect_to_memgraph(memgraph_ip, memgraph_port):
    memgraph = Memgraph(host=memgraph_ip, port=int(memgraph_port))
    # Keep retrying until Memgraph accepts the connection.
    while True:
        try:
            if memgraph._get_cached_connection().is_active():
                return memgraph
        except Exception:
            log.info("Memgraph probably isn't running.")
        # Wait before the next attempt, whether the check failed or raised.
        sleep(1)

The connection to Memgraph is established using GQLAlchemy, a fully open-source Python library that aims to be the go-to Object Graph Mapper (OGM) - a link between graph database objects and Python objects.

After connecting to Memgraph, you have to set up Kafka:

setup.run(memgraph, KAFKA_IP, KAFKA_PORT)

That means you are connecting to Kafka and creating a new topic:

def get_admin_client(kafka_ip, kafka_port):
    retries = 30
    while True:
        try:
            admin_client = KafkaAdminClient(
                bootstrap_servers=kafka_ip + ':' + kafka_port,
                client_id="twitch-stream")
            return admin_client
        except NoBrokersAvailable:
            retries -= 1
            if not retries:
                raise
            log.info("Failed to connect to Kafka")
            sleep(1)


def run(memgraph, kafka_ip, kafka_port):
    admin_client = get_admin_client(kafka_ip, kafka_port)
    log.info("Connected to Kafka")

    topic_list = [
        NewTopic(name="chatters", num_partitions=1, replication_factor=1),
    ]

    try:
        admin_client.create_topics(new_topics=topic_list, validate_only=False)
    except TopicAlreadyExistsError:
        pass
    log.info("Created topics")

    log.info("Creating stream connections on Memgraph")
    stream = MemgraphKafkaStream(name="chatter_stream", topics=["chatters"], transform="twitch.chatters")
    memgraph.create_stream(stream)
    memgraph.start_stream(stream)

First, you have to create a topic called chatters. After that, you create a stream connection on Memgraph called chatter_stream, which consumes the chatters topic and uses the transformation module twitch.chatters. The transformation module can be found in twitch.py:

import json

import mgp


@mgp.transformation
def chatters(messages: mgp.Messages
             ) -> mgp.Record(query=str, parameters=mgp.Nullable[mgp.Map]):
    result_queries = []

    # Turn every Kafka message in the batch into one Cypher query.
    for i in range(messages.total_messages()):
        message = messages.message_at(i)
        comment_info = json.loads(message.payload().decode('utf8'))
        result_queries.append(
            mgp.Record(
                query=("MERGE (u:User:Stream {id: $user_id}) "
                       "MERGE (c:User {name: $chatter_login}) "
                       "CREATE (c)-[:CHATTER]->(u)"),
                parameters={
                    "user_id": comment_info["user_id"],
                    "chatter_login": comment_info["chatter_login"]}))

    return result_queries

The Kafka cluster feeds messages to Memgraph, and the above script transforms those messages. It acts like a translator from Kafka's language to Memgraph's. Each message that Memgraph receives is processed here and turned into an appropriate Cypher query. Because the streamed data describes chatters, each query merges the streamer node and the chatter node and then creates a CHATTER relationship from the chatter to the streamer. After that, chatter_stream can be started.
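
The transformation module expects every message to be UTF-8 encoded JSON with the keys user_id and chatter_login. The producer side is not shown in this post, so the following is only a minimal sketch of how such messages could be built from CSV rows and sent at a fixed interval. The function names, the CSV column names, the sample rows, and the RecordingProducer stand-in are all assumptions; the stand-in only mimics the send(topic, value) call of kafka-python's KafkaProducer so that the sketch runs without a broker.

```python
import csv
import io
import json
from time import sleep


def rows_to_payloads(csv_text):
    """Yield one UTF-8 encoded JSON payload per CSV row."""
    for row in csv.DictReader(io.StringIO(csv_text)):
        # Keep only the two keys the transformation module reads.
        message = {"user_id": row["user_id"],
                   "chatter_login": row["chatter_login"]}
        yield json.dumps(message).encode("utf8")


def produce(producer, payloads, interval, topic="chatters"):
    """Send each payload to the topic, pausing between messages."""
    sent = 0
    for payload in payloads:
        producer.send(topic, payload)
        sent += 1
        sleep(interval)
    return sent


class RecordingProducer:
    """Hypothetical stand-in for a Kafka producer; it only records calls."""

    def __init__(self):
        self.sent = []

    def send(self, topic, value):
        self.sent.append((topic, value))


# Hypothetical rows; the real chatters.csv may use other columns.
sample = "user_id,chatter_login\n1,alice\n1,bob\n"
producer = RecordingProducer()
count = produce(producer, rows_to_payloads(sample), interval=0)
print(count, producer.sent[0])
```

With a real broker, the stand-in would be replaced by a KafkaProducer created with the broker address as bootstrap_servers, and interval would come from the --interval argument.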

Dockerizing the stream

The stream service is named twitch-stream, and it is defined in the docker-compose.yml file. For this to work, you have to create a Dockerfile for the twitch-stream service.

FROM python:3.8

# Install CMake for gqlalchemy
RUN apt-get update && \
  apt-get --yes install cmake && \
  rm -rf /var/lib/apt/lists/*

# Install packages
COPY requirements.txt ./
RUN pip3 install -r requirements.txt

COPY dummy.py /app/dummy.py
COPY setup.py /app/setup.py
COPY chatters.csv /app/chatters.csv

WORKDIR /app

The requirements.txt file, which is copied into the Docker image, contains:

gqlalchemy==1.1.2
kafka-python==2.0.2

Starting the whole application

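To start the app, run each of the following commands in its own terminal, since docker-compose up stays in the foreground: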

docker-compose up core
docker-compose up twitch-app
docker-compose up react-app

Starting the core service runs everything that the application and the stream depend on: the memgraph-mage, kafka, and zookeeper services. After that, starting the twitch-app service runs the backend. The react-app service is the frontend, which runs on localhost:3000 - go check it out! To start streaming data, run:

docker-compose up twitch-stream

Notice how the node and edge counters go up. Also, when you refresh the PageRank results, you’ll see them change, since the popularity of BadBoyHalo is going up! In this image, you can see the old PageRank results:

[Image: memgraph-tutorial-twitch-page-rank]

After some data has arrived, BadBoyHalo has taken first place, as you can see in the image below.

[Image: memgraph-tutorial-twitch-page-rank-stream]
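
To get a feel for why new chatters push a streamer up the ranking, here is a minimal power-iteration PageRank sketch in plain Python. It is not the implementation the application uses, and the node names, damping factor, and iteration count are assumptions chosen for illustration. Edges point from chatter to streamer, matching the CHATTER relationship created by the transformation module.

```python
def pagerank(edges, damping=0.85, iterations=50):
    """Plain power-iteration PageRank over (source, target) edges."""
    nodes = sorted({node for edge in edges for node in edge})
    out_links = {node: [] for node in nodes}
    for source, target in edges:
        out_links[source].append(target)

    rank = {node: 1.0 / len(nodes) for node in nodes}
    for _ in range(iterations):
        # Rank held by nodes without outgoing edges is spread evenly.
        dangling = sum(rank[n] for n in nodes if not out_links[n])
        base = (1 - damping + damping * dangling) / len(nodes)
        new_rank = {node: base for node in nodes}
        for source in nodes:
            for target in out_links[source]:
                share = rank[source] / len(out_links[source])
                new_rank[target] += damping * share
        rank = new_rank
    return rank


# Hypothetical graph: streamer_x starts with more chatters than streamer_y.
before_edges = [("a", "streamer_x"), ("b", "streamer_x"), ("c", "streamer_y")]
# New chatters of streamer_y arrive through the stream.
after_edges = before_edges + [("d", "streamer_y"), ("e", "streamer_y"),
                              ("f", "streamer_y")]

before = pagerank(before_edges)
after = pagerank(after_edges)
print(before["streamer_x"] > before["streamer_y"])
print(after["streamer_y"] > after["streamer_x"])
```

Every new incoming CHATTER edge passes a share of the chatter's rank to the streamer, which is why a streamer whose chatters keep arriving climbs the results.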

Conclusion

You’ve seen that streaming your data is both easy and useful. With just a few alterations and lines of code, you can add a whole new dimension to your application. The same approach works for any kind of data that you think would give you cool insights.

You can also play with the datasets and make this application your own. If you are keener on backend development, experiment with various queries. If you prefer working on the frontend, you can build different React components that suit you best. Finally, if you want to give feedback or talk about this app or Memgraph, make sure to join our Discord Community Server!
