Background


Social media platforms like Twitter process a high volume of messages every second. User behaviors, such as tweets, upvotes, downvotes, and comments, are recorded to serve various data requirements. Companies want to find meaning in data gathered from social media platforms to support their business decisions. To do so, they need to adopt stream processing in their tech stack. However, this can be a struggle as many stream processing frameworks are complicated to design and implement.

RisingWave is a cloud-native streaming database that uses SQL as the interface language. Its design reduces the complexity and cost of building real-time applications. RisingWave consumes streaming data, performs event-driven processing, and updates results dynamically.

Apache Pulsar is an open-source, distributed, cloud-native pub-sub messaging and streaming platform originally developed at Yahoo. It was built from the ground up to support a flexible messaging model, multi-tenancy, geo-replication, and strong durability guarantees.

RisingWave works seamlessly with Pulsar to provide a real-time data streaming and processing solution, making building and maintaining real-time applications much easier.


Overview and data model


This tutorial will use RisingWave to consume Pulsar data streams and perform data analysis. We will use sample Twitter data to query Twitter users and their posted tweets.

Below is a sample event that shows the schemas for tweets and Twitter users. In the data object, which describes the tweet, text contains the content of the tweet, and created_at contains the date and time when the tweet was posted. Hashtags will be extracted from text.

{
    "data": {
        "created_at": "2020-02-12T17:09:56.000Z",
        "id": "1227640996038684673",
        "text": "Doctors: Googling stuff online does not make you a doctor\n\nDevelopers: https://t.co/mrju5ypPkb",
        "lang": "English"
    },
    "author": {
        "created_at": "2013-12-14T04:35:55.000Z",
        "id": "2244994945",
        "name": "Singularity Data",
        "username": "singularitty"
    }
}
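
The schema description says hashtags are extracted from text. Here is a minimal Python sketch of one way to do that. It assumes a hashtag is a "#" followed by letters, digits, or underscores; the tutorial does not define the rule, and extract_hashtags is a hypothetical helper, not part of RisingWave.

```python
import re
from typing import List

# Assumption: a hashtag is "#" followed by one or more word characters.
# The tutorial does not specify the exact rule used by the demo.
HASHTAG_PATTERN = re.compile(r"#\w+")


def extract_hashtags(text: str) -> List[str]:
    """Return every hashtag found in a tweet's text, in order of appearance."""
    return HASHTAG_PATTERN.findall(text)


# Sample text shortened from one of the generated tweets in this tutorial.
sample_text = "#interactive #User-centric Mine were tomorrow does not day"
print(extract_hashtags(sample_text))
```

With this pattern, a tag such as #User-centric is cut at the hyphen, so a real pipeline may need a more permissive rule.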


Prerequisites

  • Ensure Docker Desktop is installed in your environment and it is running (i.e., have the Docker Desktop app open) before launching the demo cluster.
  • Ensure that the PostgreSQL interactive terminal, psql, is installed in your environment.
    • To install psql on macOS, run this command: brew install postgres
    • To install psql on Ubuntu, run this command: sudo apt-get install postgresql-client


Step 1: Launch the demo cluster


First, clone the risingwave-demo repository to your environment.

git clone https://github.com/singularity-data/risingwave-demo.git

Now, navigate to the twitter-pulsar directory in the cloned repository and start the demo cluster from the docker-compose file.

cd risingwave-demo/twitter-pulsar
docker-compose up -d

This starts a Pulsar instance and the necessary RisingWave components, including the frontend node, compute node, metadata node, and MinIO.

A workload generator is also packaged in the docker-compose file. It generates random data and feeds it into Pulsar.
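
The containers can take a little while to come up. As a rough sketch, the Python helper below polls a TCP port until it accepts connections. The function name wait_for_port and its timeout values are assumptions for illustration; port 4566 is the one used by the psql command in Step 2.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout_s: float = 120.0,
                  interval_s: float = 2.0) -> bool:
    """Poll host:port until a TCP connection succeeds or timeout_s elapses."""
    deadline = time.monotonic() + timeout_s
    while True:
        try:
            # A successful connect means something is listening on the port.
            with socket.create_connection((host, port), timeout=interval_s):
                return True
        except OSError:
            # Refused or timed out: retry until the deadline passes.
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval_s)


# Example call (not executed here): wait_for_port("localhost", 4566)
```

An open port only shows that the frontend is listening. It does not guarantee that the workload generator has already produced data.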


Step 2: Connect RisingWave to the Pulsar stream


Now, connect to RisingWave so you can manage data streams and perform data analysis.

psql -h localhost -p 4566 -d dev -U root

Connect to the data stream with the following SQL statement.

CREATE SOURCE twitter (
    data STRUCT <
        created_at TIMESTAMP,
        id VARCHAR,
        text VARCHAR,
        lang VARCHAR
    >,
    author STRUCT <
        created_at TIMESTAMP,
        id VARCHAR,
        name VARCHAR,
        username VARCHAR,
        followers INT
    >
) WITH (
    connector = 'pulsar',
    pulsar.topic = 'twitter',
    pulsar.admin.url = 'http://message_queue:8080',
    pulsar.service.url = 'pulsar://message_queue:6650'
) ROW FORMAT JSON;

Note that the SQL statement uses the struct data type. It is not a standard Postgres data type but an extension we made to Postgres’s SQL dialect to process more complex schemas. The struct data type can create nested tables. Elements in a nested table need to be enclosed with angle brackets (< and >). For details about the struct data type, please see Data types.
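
To make the nesting concrete, here is a rough Python sketch of how one JSON message could map onto the two struct columns declared in the statement above. The class names, the parse_row helper, and the sample values (including the followers count) are illustrative assumptions, not part of RisingWave.

```python
import json
from dataclasses import dataclass
from datetime import datetime

# Timestamp layout used by the sample event in this tutorial.
TS_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"


@dataclass
class Tweet:  # mirrors the `data` struct column
    created_at: datetime
    id: str
    text: str
    lang: str


@dataclass
class Author:  # mirrors the `author` struct column
    created_at: datetime
    id: str
    name: str
    username: str
    followers: int


@dataclass
class TwitterRow:  # one row of the `twitter` source
    data: Tweet
    author: Author


def parse_row(message: str) -> TwitterRow:
    """Turn one JSON message into a row with two nested struct values."""
    raw = json.loads(message)
    d, a = raw["data"], raw["author"]
    return TwitterRow(
        data=Tweet(datetime.strptime(d["created_at"], TS_FORMAT),
                   d["id"], d["text"], d["lang"]),
        author=Author(datetime.strptime(a["created_at"], TS_FORMAT),
                      a["id"], a["name"], a["username"], int(a["followers"])),
    )


# Hypothetical message; the text and followers values are made up.
message = json.dumps({
    "data": {"created_at": "2020-02-12T17:09:56.000Z",
             "id": "1227640996038684673", "text": "hello", "lang": "English"},
    "author": {"created_at": "2013-12-14T04:35:55.000Z", "id": "2244994945",
               "name": "Singularity Data", "username": "singularitty",
               "followers": 1200},
})
row = parse_row(message)
print(row.author.followers, row.data.lang)
```

Each top-level column holds a whole nested record, which is why later queries reach into it with expressions such as (author).followers.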


Step 3: Define materialized views


Next, create a materialized view that stores the English-language tweets posted by influencers with more than 1000 followers. This is a basic materialized view. Further pipelines can join with it to maintain a more in-depth view of the influencers.

If you get a metadata error when running the SQL statement, we suggest that you wait one minute for the stream metadata to be fully parsed, and try again.

CREATE MATERIALIZED VIEW influencer_tweets AS
SELECT
    (author).id as author_id, (data).text as tweet
FROM
    twitter
WHERE
    (author).followers > 1000
    AND (data).lang = 'English';

In this demo, we rely on the syntax of Postgres’s composite types. (author).id refers to the field “id” in the column “author”, which is of the struct type. The parentheses around “author” are required to avoid ambiguity when there is a table with the same name as the column.
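
The filter and projection in this view can be mimicked in plain Python to see what each output row contains. This is only a sketch over in-memory dictionaries with made-up events; RisingWave itself keeps the view up to date as new messages arrive. The threshold of 1000 mirrors the WHERE clause of the statement above, and the function name is hypothetical.

```python
from typing import Dict, Iterable, Iterator

FOLLOWER_THRESHOLD = 1000  # taken from the WHERE clause of the view


def influencer_tweets(events: Iterable[Dict]) -> Iterator[Dict]:
    """Yield author_id and tweet for events that pass both conditions."""
    for event in events:
        author, data = event["author"], event["data"]
        # Same two conditions as the view: follower count and language.
        if author["followers"] > FOLLOWER_THRESHOLD and data["lang"] == "English":
            # (author).id becomes author_id, (data).text becomes tweet.
            yield {"author_id": author["id"], "tweet": data["text"]}


# Made-up events for illustration only.
events = [
    {"author": {"id": "1", "followers": 5200},
     "data": {"text": "kept", "lang": "English"}},
    {"author": {"id": "2", "followers": 300},
     "data": {"text": "too few followers", "lang": "English"}},
    {"author": {"id": "3", "followers": 9000},
     "data": {"text": "other language", "lang": "French"}},
]
for row in influencer_tweets(events):
    print(row)
```

Only the first event passes both conditions, so a single row is produced.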


Step 4: Query the results


Wait for one or two minutes for the workload generator to generate some mock data, and then you can query some tweets from the materialized view:

SELECT * FROM influencer_tweets LIMIT 5;

The results may look like this:

 author_id  | tweet
------------+------------------------------------------------------------
 2929346146 | #5th generation #Graphic Interface Now lots bread too part usually last did host you just kuban education hail are recently yet already a.
 5369531922 | Graceful my joyously accordingly its yourself graceful hand onto should heat themselves which child off Bangladeshi why.
 4868249670 | Across tonight ginger horror regularly these range their yourself since early why stand what who cloud yourselves listen.
 4179993078 | Stupidly power her it from never which positively photographer weekly woman did cat first while rich.
 7293315540 | #interactive #User-centric Mine were tomorrow does not day these factory cent monthly peacock first to about wisdom regularly cook above frequently you.

When you finish, run the following command to disconnect from RisingWave.

quit

Run the following command to stop the demo cluster and remove its containers.

docker-compose down

Summary

This tutorial shows how RisingWave can connect to a Pulsar stream to query sample tweets written by influencers. This use case is intended to be inspirational rather than complete. If you want to share your ideas about what RisingWave can do or are interested in a particular use case scenario, don’t hesitate to let us know in the RisingWave Community workspace on Slack. Please use this invitation link to join the workspace.

Tao Wu, Product Manager
Heng Ma, Content Lead
Emily Le, Developer Advocate
