Real-time Social Media Sentiment Analysis Using RisingWave and StreamNative

Overview

Social media platforms process countless messages from millions of users every day. Keeping up with these posts is demanding for companies because popular topics change constantly. Analyzing these messages is nonetheless valuable: it helps businesses make more strategic decisions by understanding what their consumers and competitors care about. A streaming system helps here because it lets companies stay up to date with trending topics as they emerge.

To keep track of topics, social media platforms such as Twitter let users add hashtags that indicate what a post is about. The number of times a hashtag is used is a measure of audience engagement. If a hashtag is used often, it suggests that a particular topic is popular. And if we track how often a hashtag is used over time, we can determine whether audience engagement is increasing or decreasing.

In this blog post, we will generate Twitter-like events containing hashtags with a data generator, ingest them into a StreamNative Serverless topic, and read them in RisingWave to process the events in real time and extract key insights.

Real-time social media sentiment analysis using RisingWave and StreamNative.

Set up StreamNative

StreamNative Cloud is a resilient, scalable streaming data service based on Apache Pulsar and compatible with Apache Kafka, delivered as a fully managed service. It incorporates all the features of Apache Pulsar and Apache Kafka, plus StreamNative's tooling to remove the complexity of managing both platforms. StreamNative Cloud provides a turnkey solution to help organizations transition to a "streaming first" architecture.

StreamNative offers versatile managed cloud deployment options including StreamNative Hosted, BYOC Cloud, and the new Serverless Cluster. It leverages the Ursa engine, a Kafka-compatible data streaming engine evolved from Apache Pulsar. StreamNative can act as the backbone for your real-time data, core business applications, and microservice messaging platforms.

This guide goes through the steps to create a StreamNative cluster on StreamNative Cloud and connect it to RisingWave for data ingestion. For more information about data ingestion from StreamNative, please refer to the StreamNative documentation.

Sign up for a StreamNative Cloud account

Begin by signing up for a free StreamNative Cloud account with free credits, which provides access to StreamNative services.

StreamNative Cloud sign up.

Create an organization

After signing up and logging into your StreamNative Cloud account, go to the StreamNative Cloud Console. Click on your profile, then select Organizations, and create your organization.

Create an organization.

Create an instance

To create a new instance, navigate to the Dashboard by clicking it on the left navigation pane. On the Instances card, click New, and then click Serverless to initiate the instance creation process.

Create an instance.

On the Instance Configuration page, provide a name for your instance, choose an infrastructure pool and an Availability Zone (AZ), and then click Cluster Location.

Instance configuration.

On the Cluster Location page, enter a name for your cluster, select the cluster location, and proceed by clicking Finish.

Cluster location.

Create a service account

To create a service account, start by clicking Service Accounts on the left navigation pane. Then, click Create Service Account. Enter a name for the service account, and finalize the process by clicking Confirm.

Create a service account.

Grant service account permissions

To configure the service account, start by clicking Tenants/Namespaces in the Admin section of the left navigation pane. Select the Public tenant and then choose the Default namespace under that tenant. Next, click on the POLICY tab. In the Authorization area, click ADD ROLE and select the service account you created earlier.

Grant service account permissions.

You can now use this service account to connect to your cluster using the Kafka CLI tool or a Kafka client.

Click Kafka Clients, then use the drop-down menu below the service name you just added to select your client library. Next, choose the service account, create an API key to use with this client, install the client libraries, and select your target Tenant, Namespace, and Topic. Finally, copy the generated code to produce and consume messages from the StreamNative cluster.

Kafka client setup.

Sending Twitter-like events to StreamNative cluster with Python

# Install the required libraries with the following command:
# pip install kafka-python faker

from kafka import KafkaProducer
import random
import json
from datetime import datetime, timedelta
from faker import Faker

# Define the TweetData and TwitterUser structures
class TweetData:
    def __init__(self, created_at, tweet_id, text, lang):
        self.created_at = created_at
        self.id = tweet_id
        self.text = text
        self.lang = lang

class TwitterUser:
    def __init__(self, created_at, user_id, name, username, followers):
        self.created_at = created_at
        self.id = user_id
        self.name = name
        self.username = username
        self.followers = followers

# Define the TwitterEvent structure
class TwitterEvent:
    def __init__(self, data, author):
        self.data = data
        self.author = author

    def to_postgres_sql(self):
        return (
            f"INSERT INTO tweet (created_at, id, text, lang, author_id) values "
            f"('{self.data.created_at}', '{self.data.id}', '{self.data.text}', "
            f"'{self.data.lang}', '{self.author.id}'); "
            f"INSERT INTO user (created_at, id, name, username, followers) values "
            f"('{self.author.created_at}', '{self.author.id}', '{self.author.name}', "
            f"'{self.author.username}', {self.author.followers});"
        )

    def to_json(self):
        return json.dumps(self, default=lambda o: o.__dict__)

# Generate random Twitter users
def generate_twitter_users(num_users=1000):
    faker = Faker()
    users = []

    for _ in range(num_users):
        user_id = faker.random_number(digits=10, fix_len=True)

        # Generate a valid date range
        created_at = faker.date_time_between(start_date="-10y", end_date="now").strftime('%Y-%m-%d %H:%M:%S')
        name = f"{faker.first_name()} {faker.last_name()}"
        username = faker.user_name()
        followers = faker.random_int(min=1, max=100000)

        user = TwitterUser(created_at, user_id, name, username, followers)
        users.append(user)

    return users

# Generate Twitter events with random delays in the created_at field
def generate_twitter_events(users, num_events=100):
    faker = Faker()
    events = []
    current_time = datetime.now()

    # List of predefined sentences for meaningful text
    predefined_sentences = [
        "Just setting up my Twitter.",
        "Excited to start tweeting!",
        "Hello world! This is my first tweet.",
        "Feeling great today!",
        "Check out my new blog post.",
        "Had a wonderful day at the beach.",
        "Looking forward to the weekend!",
        "Just finished reading an amazing book.",
        "Loving the new updates to my favorite app.",
        "Can't wait for the upcoming event!"
    ]

    for _ in range(num_events):
        tweet_id = faker.random_number(digits=19, fix_len=True)
        author = random.choice(users)

        # Select a predefined sentence and optionally add hashtags
        sentence = random.choice(predefined_sentences)
        hashtags_count = random.randint(0, 2)
        hashtags = ' '.join([f"#{faker.word()}" for _ in range(hashtags_count)])

        # Add random delay to the event's creation time
        delay_seconds = random.randint(0, 300)
        event_time = current_time + timedelta(seconds=delay_seconds)
        current_time = event_time

        tweet_data = TweetData(
            created_at=event_time.strftime('%Y-%m-%d %H:%M:%S'),
            tweet_id=tweet_id,
            text=f"{hashtags} {sentence}",
            lang='English'
        )

        event = TwitterEvent(data=tweet_data, author=author)
        events.append(event)

    return events

# Send generated Twitter events to Kafka
def send_to_kafka(events, server_url, topic_name, namespace, password):
    producer = KafkaProducer(
        bootstrap_servers=server_url,
        security_protocol='SASL_SSL',
        sasl_mechanism='PLAIN',
        sasl_plain_username=namespace,
        sasl_plain_password=password,
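        # to_json() already returns a JSON string, so encode it directly
        # instead of wrapping it in a second json.dumps call.
        value_serializer=lambda v: v.to_json().encode('utf-8')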
    )

    for event in events:
        producer.send(topic_name, value=event)
        print(f"Message sent to the StreamNative topic: {event.to_json()}")
    producer.flush()

# Configuration and usage
server_url = "" # Your Server URL of the StreamNative Cluster
jwt_token = ""  # Your Generated API Key
topic_name = "public/default/twitter"
namespace = "public/default"
password = "token:" + jwt_token

# Example usage
users = generate_twitter_users(num_users=1000)
events = generate_twitter_events(users, num_events=1000)
send_to_kafka(events, server_url, topic_name, namespace, password)
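
One detail worth checking in a producer like this is that each message value reaches the topic as a JSON object rather than as a JSON-encoded string, because a JSON source expects an object whose fields map to columns. The following is a minimal sketch using only the standard library; the `event` dictionary is a hypothetical, simplified payload and not the generator's exact output.

```python
import json

# Hypothetical, simplified payload shaped like a generated Twitter event.
event = {
    "data": {"id": "1", "text": "#demo Hello world!"},
    "author": {"id": "2"},
}

# Serialized once: a consumer decodes the bytes back into an object.
encoded_once = json.dumps(event).encode("utf-8")

# Serialized twice: a consumer decodes the bytes into a plain string.
encoded_twice = json.dumps(json.dumps(event)).encode("utf-8")

decoded_once = json.loads(encoded_once)
decoded_twice = json.loads(encoded_twice)

print(type(decoded_once).__name__)
print(type(decoded_twice).__name__)
```

If the second form were sent, a downstream JSON parser would see a single string value instead of the `data` and `author` fields.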

The image below shows the events as they are sent to the StreamNative cluster.

Sending messages to StreamNative cluster.

Next, we will use RisingWave to consume the data stream and analyze it. Tweets serve as the sample data, so we can query the most popular hashtags on a given day to keep track of trending topics. The data generator writes these events to the twitter topic in StreamNative as it creates them.

Below is a sample event that shows the schemas for tweets and Twitter users. In the tweet schema, text contains the content of a tweet, and created_at contains the date and time when the tweet was posted. Hashtags will be extracted from text.

{
    "data": {
        "created_at": "2020-02-12T17:09:56.000Z",
        "id": "1227640996038684673",
        "text": "Doctors: Googling stuff online does not make you a doctor\n\nDevelopers: <https://t.co/mrju5ypPkb>",
        "lang": "English"
    },
    "author": {
        "created_at": "2013-12-14T04:35:55.000Z",
        "id": "2244994945",
        "name": "Singularity Data",
        "username": "singularitty"
    }
}
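
To make the structure concrete, here is a minimal sketch of reading the two fields the analysis relies on, `text` and `created_at`, from an event of this shape. The tweet text in this example is a simplified, hypothetical value, and the timestamp format string is an assumption based on the sample above.

```python
import json
from datetime import datetime, timezone

# Simplified, hypothetical event in the same shape as the sample above.
raw_event = """
{
    "data": {
        "created_at": "2020-02-12T17:09:56.000Z",
        "id": "1227640996038684673",
        "text": "Hello #RisingWave and #StreamNative",
        "lang": "English"
    },
    "author": {
        "created_at": "2013-12-14T04:35:55.000Z",
        "id": "2244994945",
        "name": "Singularity Data",
        "username": "singularitty"
    }
}
"""

event = json.loads(raw_event)

# The tweet body, from which hashtags are extracted later.
text = event["data"]["text"]

# Parse the ISO-style timestamp and mark it as UTC (the trailing "Z").
created_at = datetime.strptime(
    event["data"]["created_at"], "%Y-%m-%dT%H:%M:%S.%fZ"
).replace(tzinfo=timezone.utc)

print(text)
print(created_at.isoformat())
```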

The StreamNative dashboard shows topic partitions, producers, consumers, and other details about throughput and data size.

StreamNative cluster dashboard.

Ingest data from StreamNative into RisingWave

We’ll use RisingWave to ingest and analyze the events. RisingWave is a SQL platform for event-driven workloads.

There are two ways to use RisingWave: the open-source RisingWave and the managed service, RisingWave Cloud. In this blog, we will use RisingWave Cloud, which simplifies the operational aspects of running RisingWave for our Twitter hashtag analysis.

Sign up for RisingWave Cloud

To create a RisingWave cluster in RisingWave Cloud and explore its features, sign up for the free plan, which lets you test RisingWave at no cost. For step-by-step instructions on creating a cluster and getting started, refer to the official RisingWave documentation. If you need additional help with setting up this integration, join our active Slack community.

RisingWave Cloud: Account Registration and Sign-In Process.

Create a RisingWave cluster

RisingWave Cloud offers several cluster types. Select the Developer cluster type, which is available on the free tier, and fill in the following settings:

  • Cluster Name: Give your RisingWave cluster a unique name for identification.
  • Cloud Provider: Select either AWS or Google Cloud for the RisingWave cluster.
  • Region: Choose the region where your RisingWave cluster will be hosted.

Creating a RisingWave Cluster.

Ingest data streams into RisingWave

StreamNative enables users to interact with StreamNative clusters using Kafka clients. We will leverage the RisingWave Kafka source connector to read data from the StreamNative cluster. With the RisingWave cluster set up, navigate to the Workspace and connect to the data streams using the SQL statement below.

RisingWave Console.

CREATE SOURCE twitter (
    data STRUCT < created_at TIMESTAMP WITH TIME ZONE,
    id VARCHAR,
    text VARCHAR,
    lang VARCHAR >,
    author STRUCT < created_at TIMESTAMP WITH TIME ZONE,
    id VARCHAR,
    name VARCHAR,
    username VARCHAR,
    followers INT >
) WITH (
    connector = 'kafka',
    topic = 'twitter',
    properties.bootstrap.server = 'message_queue:29092',
    scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;

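Note that the SQL statement uses the STRUCT data type. For details about the STRUCT data type, please see Data types. The properties.bootstrap.server value shown here is a placeholder; replace it with the server URL of your StreamNative cluster. Because the Python producer connects to the cluster over SASL_SSL, the source will likely also need the corresponding authentication properties, which are not shown here.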

Set up a materialized view and analyze data

We will create a materialized view that tracks how often each hashtag is used daily.

To do so, start by extracting all the hashtags used within a tweet by using the regexp_matches function. For instance, if given the following tweet:

Struggling with the high cost of scaling? Fret not! #RisingWave cares about performance and cost-efficiency. We use a tiered architecture that fully utilizes the #cloud resources to give the users fine-grained control over cost and performance.

The regexp_matches function will find all the text in the tweet that matches the RegEx pattern #\w+. This extracts all the hashtags from the tweet and stores them in an array.

       hashtag        |        created_at
----------------------+--------------------------
[#RisingWave, #cloud] | 2022-05-18 17:00:00+00:00

Then the unnest function will separate each item in the array into separate rows.

   hashtag   |        created_at
-------------+---------------------------
 #RisingWave | 2022-05-18 17:00:00+00:00
 #cloud      | 2022-05-18 17:00:00+00:00
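
The same two steps can be sketched in plain Python to make the transformation easier to follow. This is only an illustration of the idea, not how RisingWave implements it: `re.findall` stands in for regexp_matches, and a list comprehension stands in for unnest.

```python
import re

tweet = (
    "Struggling with the high cost of scaling? Fret not! #RisingWave cares "
    "about performance and cost-efficiency. We use a tiered architecture "
    "that fully utilizes the #cloud resources to give the users "
    "fine-grained control over cost and performance."
)
created_at = "2022-05-18 17:00:00+00:00"

# Step 1: collect every hashtag in the tweet into one list.
hashtags = re.findall(r"#\w+", tweet)

# Step 2: "unnest" the list into one (hashtag, created_at) row per item.
rows = [(tag, created_at) for tag in hashtags]

for row in rows:
    print(row)
```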

Finally, we can group by hashtag and window_start to count how many times each hashtag was used daily.

CREATE MATERIALIZED VIEW hot_hashtags AS
WITH tags AS (
    SELECT
        unnest(regexp_matches((data).text, '#\w+', 'g')) AS hashtag,
        (data).created_at AS created_at
    FROM
        twitter
)
SELECT
    hashtag,
    COUNT(*) AS hashtag_occurrences,
    window_start
FROM
    TUMBLE(tags, created_at, INTERVAL '1 day')
GROUP BY
    hashtag,
    window_start;
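
For readers less familiar with tumbling windows, the grouping logic can be sketched in plain Python. This is a conceptual illustration under stated assumptions, not RisingWave's implementation: the three tweets are hypothetical, and the window is fixed to one day by truncating each timestamp to midnight.

```python
import re
from collections import Counter
from datetime import datetime

# Hypothetical (created_at, text) pairs standing in for the stream.
tweets = [
    ("2022-08-18 09:15:00", "#Multi #zero Feeling great today!"),
    ("2022-08-18 21:40:00", "#Multi Check out my new blog post."),
    ("2022-08-19 00:05:00", "#zero Looking forward to the weekend!"),
]

counts = Counter()
for created_at, text in tweets:
    ts = datetime.strptime(created_at, "%Y-%m-%d %H:%M:%S")
    # Assign the tweet to a one-day window that starts at midnight.
    window_start = ts.replace(hour=0, minute=0, second=0, microsecond=0)
    # Count each hashtag once per occurrence within its window.
    for tag in re.findall(r"#\w+", text):
        counts[(tag, window_start)] += 1

for (tag, window_start), occurrences in counts.most_common():
    print(tag, occurrences, window_start)
```

Each tweet falls into exactly one window, so a hashtag used on two different days produces two separate rows, one per window_start.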

Query the results

We can query the ten most frequently used hashtags.

SELECT * FROM hot_hashtags
ORDER BY hashtag_occurrences DESC
LIMIT 10;

The results may look like this:

  hashtag   | hashtag_occurrences |       window_start
------------+---------------------+---------------------------
 #Multi     |                 262 | 2022-08-18 00:00:00+00:00
 #zero      |                 198 | 2022-08-18 00:00:00+00:00
 #knowledge |                 150 | 2022-08-18 00:00:00+00:00
 #Open      |                 148 | 2022-08-18 00:00:00+00:00
 #User      |                 142 | 2022-08-18 00:00:00+00:00
 #Cross     |                 141 | 2022-08-18 00:00:00+00:00
 #local     |                 139 | 2022-08-18 00:00:00+00:00
 #client    |                 138 | 2022-08-18 00:00:00+00:00
 #system    |                 135 | 2022-08-18 00:00:00+00:00
 #Re        |                 132 | 2022-08-18 00:00:00+00:00

If the data generator produces enough events to span several days, the results will include the most used hashtags from different dates.

In this blog, we set up a StreamNative Serverless cluster, used a data generator to send Twitter-like hashtag events to a topic, and read that data in RisingWave through the Kafka source connector to track trending hashtags. StreamNative provides the streaming platform and RisingWave provides the stream processing, so together they let organizations build real-time pipelines and gain insights from their data streams.
