Background


Social media platforms like Twitter process a high volume of messages every second. User behaviors, such as tweets, upvotes, downvotes, and comments, are recorded to serve various data requirements. Companies want to find meaning in data gathered from social media platforms to support their business decisions. To do so, they need to adopt stream processing in their tech stack. However, this can be a struggle as many stream processing frameworks are complicated to design and implement.

RisingWave is a cloud-native streaming database that uses SQL as the interface language. Its design reduces the complexity and cost of building real-time applications. RisingWave consumes streaming data, performs event-driven processing, and updates results dynamically.

Apache Pulsar is an open-source, distributed, cloud-native pub-sub messaging and streaming platform originally developed at Yahoo. It was built from the ground up to support a flexible messaging model, multi-tenancy, geo-replication, and strong durability guarantees.

RisingWave works seamlessly with Pulsar to provide a real-time data streaming and processing solution, making building and maintaining real-time applications much easier.


Overview and data model


This tutorial will use RisingWave to consume Pulsar data streams and perform data analysis. We will use sample Twitter data to query Twitter users and their posted tweets.

Below are the schemas for tweets and Twitter users. In the tweet schema, text contains the content of a tweet, and created_at contains the date and time when a tweet was posted. Hashtags will be extracted from text.

{
    "data": {
        "created_at": "2020-02-12T17:09:56.000Z",
        "id": "1227640996038684673",
        "text": "Doctors: Googling stuff online does not make you a doctor\n\nDevelopers: https://t.co/mrju5ypPkb",
        "lang": "English"
    },
    "author": {
        "created_at": "2013-12-14T04:35:55.000Z",
        "id": "2244994945",
        "name": "Singularity Data",
        "username": "singularitty",
        "followers": 55582
    }
}


Prerequisites

  • Ensure Docker Desktop is installed in your environment and it is running (i.e., have the Docker Desktop app open) before launching the demo cluster.
  • Ensure that the PostgreSQL interactive terminal, psql, is installed in your environment.
    • To install psql on macOS, run this command: brew install postgres
    • To install psql on Ubuntu, run this command: sudo apt-get install postgresql-client


Step 1: Launch the demo cluster


First, clone the risingwave-demo repository to your environment.

git clone https://github.com/singularity-data/risingwave-demo.git

Now, navigate to the twitter-pulsar directory and start the demo cluster from the docker-compose file.

cd twitter-pulsar
docker-compose up -d

A Pulsar instance and necessary RisingWave components, including frontend node, compute node, metadata node, and MinIO, will be started.

A workload generator is also packaged in the docker-compose file. It will generate random data and feed it into Pulsar.


Step 2: Connect RisingWave to the Pulsar stream


Now, connect to RisingWave so you can manage data streams and perform data analysis.

psql -h localhost -p 4566 -d dev -U root

Connect to the data stream with the following SQL statement.

CREATE SOURCE twitter (
    data STRUCT < created_at TIMESTAMP,
    id VARCHAR,
    text VARCHAR,
    lang VARCHAR >,
    author STRUCT < created_at TIMESTAMP,
    id VARCHAR,
    name VARCHAR,
    username VARCHAR,
    followers INT >
) WITH (
    connector = 'pulsar',
    pulsar.topic = 'twitter',
    pulsar.admin.url = 'http://message_queue:8080',
    pulsar.service.url = 'pulsar://message_queue:6650'
) ROW FORMAT JSON;

Note that the SQL statement uses the struct data type. It is not a standard Postgres data type but an extension we made to Postgres’s SQL dialect to process more complex schemas. The struct data type can create nested tables. Elements in a nested table need to be enclosed with angle brackets (< and >). For details about the struct data type, please see Data types.
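To see how a nested JSON event maps onto these struct columns, here is an illustrative Python sketch (not part of the demo; the column names follow the CREATE SOURCE statement above, and the follower count is a made-up sample value):

```python
import json

# A decoded event shaped like the source's schema: a "data" struct
# for the tweet and an "author" struct for the user.
event = json.loads("""
{
  "data": {"created_at": "2020-02-12T17:09:56.000Z",
           "id": "1227640996038684673",
           "text": "Doctors: Googling stuff online does not make you a doctor",
           "lang": "English"},
  "author": {"created_at": "2013-12-14T04:35:55.000Z",
             "id": "2244994945",
             "name": "Singularity Data",
             "username": "singularitty",
             "followers": 55582}
}
""")

# (data).text and (author).id in RisingWave correspond to plain
# nested-field access on the decoded object.
print(event["data"]["text"])  # the tweet content
print(event["author"]["id"])  # the author's id, "2244994945"
```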


Step 3: Define materialized views


Next, create a materialized view that stores the tweets posted by influencers with more than 1,000 followers. This is a basic materialized view; further pipelines can join with it to maintain a more in-depth view of the influencers.

If you get a metadata error when running the SQL statement, we suggest that you wait one minute for the stream metadata to be fully parsed, and try again.

CREATE MATERIALIZED VIEW influencer_tweets AS
SELECT
    (author).id as author_id, (data).text as tweet
FROM
    twitter
WHERE
    (author).followers > 1000
    AND (data).lang = 'English';

In this demo, we rely on the syntax of Postgres’s composite types: (author).id indicates the field “id” in the column “author”, which is a struct type. The parentheses around “author” are required to avoid ambiguity when there is a table with the same name as the column.
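As a mental model only (RisingWave evaluates the SQL above incrementally, not in Python), the view’s filter and projection behave like this sketch over decoded events; the field layout follows the CREATE SOURCE statement above:

```python
def influencer_tweets(events):
    """Keep tweets whose author has more than 1000 followers and
    whose language is English, projecting author_id and tweet."""
    return [
        {"author_id": e["author"]["id"], "tweet": e["data"]["text"]}
        for e in events
        if e["author"]["followers"] > 1000 and e["data"]["lang"] == "English"
    ]

sample = [
    {"data": {"text": "streaming SQL is fun", "lang": "English"},
     "author": {"id": "101", "followers": 8000}},
    {"data": {"text": "bonjour", "lang": "French"},
     "author": {"id": "102", "followers": 9000}},   # wrong language
    {"data": {"text": "hello", "lang": "English"},
     "author": {"id": "103", "followers": 42}},     # too few followers
]
print(influencer_tweets(sample))
# [{'author_id': '101', 'tweet': 'streaming SQL is fun'}]
```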


Step 4: Query the results


Wait for one or two minutes for the workload generator to generate some mock data, and then you can query some tweets from the materialized view:

SELECT * FROM influencer_tweets LIMIT 5;

The results may look like this:

 author_id  | tweet
------------+---------------------------------------------------------------------
 2929346146 | #5th generation #Graphic Interface Now lots bread too part usually last did host you just kuban education hail are recently yet already a.
 5369531922 | Graceful my joyously accordingly its yourself graceful hand onto should heat themselves which child off Bangladeshi why.
 4868249670 | Across tonight ginger horror regularly these range their yourself since early why stand what who cloud yourselves listen.
 4179993078 | Stupidly power her it from never which positively photographer weekly woman did cat first while rich.
 7293315540 | #interactive #User-centric Mine were tomorrow does not day these factory cent monthly peacock first to about wisdom regularly cook above frequently you.

When you finish, run the following command to disconnect from RisingWave.

quit

Run the following command to stop and remove the Docker containers.

docker-compose down

Summary

This tutorial shows how RisingWave can connect to a Pulsar stream to query sample tweets written by influencers. This use case is intended to be inspirational rather than complete. If you want to share your ideas about what RisingWave can do or are interested in a particular use case scenario, don’t hesitate to let us know in the RisingWave Community workspace on Slack. Please use this invitation link to join the workspace.

Background


The amount of streaming data has grown explosively over the past few years. Many businesses realize that they need to move to stream processing, but they have a hard time figuring out which route to take. Most of the stream processing frameworks out there are too complex to design and implement.

RisingWave is a cloud-native streaming database that uses SQL as the interface. It is designed to reduce the complexity and cost of building real-time applications. RisingWave consumes streaming data, performs continuous queries, and updates results dynamically.

Redpanda is an Apache Kafka®-compatible streaming data platform. It was built from the ground up with performance and simplicity in mind. It requires no Zookeeper®, no JVM, and no code changes.

RisingWave works seamlessly with Redpanda to provide a real-time data streaming and processing solution that makes it so much easier to build and maintain real-time applications.


Overview


In this tutorial, you will learn how to use RisingWave to consume Redpanda data streams and perform data analysis. We will use ad impression and click events as sample data and try to count clicks of an ad within one minute after the ad was shown.

Below is the schema of the ad impression and click events:

{
  "user_id": 2926375,
  "click_timestamp": "2022-05-11 16:04:06.416369",
  "impression_timestamp": "2022-05-11 16:04:06.273401",
  "ad_id": 8184596
}

For users who are not familiar with digital advertising, an impression is counted whenever an ad is displayed within an app or on a website. impression_timestamp is the date and time when the ad was shown to a user. In the schema, impression_timestamp should be smaller (earlier) than click_timestamp to ensure that only clicks subsequent to impressions are counted.
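This ordering rule is easy to express outside SQL as well; the following Python sketch (illustrative only, using the timestamp format from the sample event) checks it for a single event:

```python
from datetime import datetime

FMT = "%Y-%m-%d %H:%M:%S.%f"

def click_after_impression(event):
    """Return True only if the click happened after the impression."""
    impression = datetime.strptime(event["impression_timestamp"], FMT)
    click = datetime.strptime(event["click_timestamp"], FMT)
    return impression < click

event = {
    "user_id": 2926375,
    "click_timestamp": "2022-05-11 16:04:06.416369",
    "impression_timestamp": "2022-05-11 16:04:06.273401",
    "ad_id": 8184596,
}
print(click_after_impression(event))  # True
```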

We have set up a demo cluster specifically for the Redpanda and RisingWave stack so that you do not need to install them separately.


Prerequisites

  • Ensure you have Docker and Docker Compose installed in your environment. Note that Docker Compose is included in Docker Desktop for Windows and macOS. If you use Docker Desktop, ensure that it is running before launching the demo cluster.
  • Ensure that the PostgreSQL interactive terminal, psql, is installed in your environment.
    • To install psql on macOS, run this command: brew install postgres
    • To install psql on Ubuntu, run this command: sudo apt-get install postgresql-client


Step 1: Launch the demo cluster


First, let us clone the risingwave-demo repository to your environment.

git clone https://github.com/singularity-data/risingwave-demo.git

Now let us navigate to the ad-click directory and start the demo cluster from the docker compose file.

cd ad-click
docker-compose up -d

A Redpanda instance and necessary RisingWave components, including frontend node, compute node, metadata node, and MinIO, will be started.

A workload generator is also packaged in the docker-compose file. It will generate random data and feed it into Redpanda.
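The generator’s actual implementation lives in the demo repository; as a rough idea of what it might produce, here is a hypothetical Python sketch (field names follow the schema above; the value ranges and fixed base time are made up for illustration):

```python
import json
import random
from datetime import datetime, timedelta

FMT = "%Y-%m-%d %H:%M:%S.%f"

def make_ad_event():
    """Build one mock impression/click event matching the schema."""
    impression = datetime(2022, 5, 11, 16, 4, 6)
    # Pretend the click arrives up to 90 seconds after the impression,
    # so some events fall outside the one-minute window analyzed later.
    click = impression + timedelta(seconds=random.uniform(0.1, 90))
    return {
        "user_id": random.randint(1_000_000, 9_999_999),
        "click_timestamp": click.strftime(FMT),
        "impression_timestamp": impression.strftime(FMT),
        "ad_id": random.randint(1, 9),
    }

# The real generator would publish this JSON to the Redpanda topic;
# here we just print one event.
print(json.dumps(make_ad_event()))
```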


Step 2: Connect RisingWave to the Redpanda stream


Now let us connect to RisingWave so that we can manage data streams and perform data analysis.

psql -h localhost -p 4566 -d dev -U root

Note that RisingWave accepts psql connections on port 4566 by default, while Redpanda listens on port 29092. If you intend to consume data directly from Redpanda, use port 29092 instead.

We set up a connection to the Redpanda topic with this SQL statement:

create source ad_source (
  user_id bigint,
  ad_id bigint,
  click_timestamp timestamp,
  impression_timestamp timestamp
) with (
  connector = 'kafka',
  kafka.topic = 'ad_clicks',
  kafka.brokers = 'message_queue:29092',
  kafka.scan.startup.mode = 'earliest'
) row format json;

Let us dive a little deeper into the parameters in the WITH clause:

  • connector = 'kafka': Because Redpanda is Kafka-compatible, it can be connected to in the same way as Kafka.
  • kafka.topic = 'ad_clicks': The Redpanda topic to consume from.
  • kafka.brokers = 'message_queue:29092': The address of the Redpanda broker.
  • kafka.scan.startup.mode = 'earliest': RisingWave will start consuming from the earliest entry in the stream. Alternatively, you can set this parameter to 'latest', which means RisingWave will start consuming from the latest entry.


Step 3: Analyze the data


We’ll define a materialized view to count the clicks on each ad within one minute after the ad was shown.

With materialized views, only incremental calculations are performed each time a new event comes in, and the results are persisted right after calculations for a new event are completed.

create materialized view m_click_statistic as
select
  ad_id,
  count(user_id) as clicks_count
from
  ad_source
where
  click_timestamp is not null
  and impression_timestamp < click_timestamp
  and impression_timestamp + interval '1' minute >= click_timestamp
group by
  ad_id;

We want to make sure that only ads that have been clicked are counted, so we limit the scope with the click_timestamp is not null condition. Any clicks that occur more than one minute after the impression are considered irrelevant and are excluded. That is why we include the impression_timestamp + interval '1' minute >= click_timestamp condition.
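Replayed as a one-shot batch computation in Python (illustrative only; RisingWave maintains the same result incrementally as events arrive instead of rescanning), the view’s logic looks like this:

```python
from collections import Counter
from datetime import datetime, timedelta

FMT = "%Y-%m-%d %H:%M:%S.%f"

def click_counts(events):
    """Count clicks per ad that occur within one minute of the impression."""
    counts = Counter()
    for e in events:
        if e["click_timestamp"] is None:
            continue  # impression that was never clicked
        imp = datetime.strptime(e["impression_timestamp"], FMT)
        clk = datetime.strptime(e["click_timestamp"], FMT)
        if imp < clk <= imp + timedelta(minutes=1):
            counts[e["ad_id"]] += 1
    return dict(counts)

events = [
    {"ad_id": 1, "impression_timestamp": "2022-05-11 16:04:06.000000",
     "click_timestamp": "2022-05-11 16:04:36.000000"},  # within one minute
    {"ad_id": 1, "impression_timestamp": "2022-05-11 16:04:06.000000",
     "click_timestamp": "2022-05-11 16:06:00.000000"},  # too late, excluded
    {"ad_id": 2, "impression_timestamp": "2022-05-11 16:04:06.000000",
     "click_timestamp": None},                          # never clicked
]
print(click_counts(events))  # {1: 1}
```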


Step 4: Query the results


RisingWave is designed to achieve both second-level freshness and low query latency via pre-aggregations on streams. Downstream applications can query the results at extremely short intervals if needed.

We query the results with the following statement:

select * from m_click_statistic;

The results may look like this:

ad_id | clicks_count
------+--------------
    1 | 356
    2 | 340
    3 | 319
    4 | 356
    5 | 333
    6 | 368
    7 | 355
    8 | 349
    9 | 359
(9 rows)

If you query multiple times, you will see the results change as new events come in. For example, if you run the query again after 10 seconds, you may get results like the following.

ad_id | clicks_count
------+--------------
    1 | 362
    2 | 345
    3 | 325
    4 | 359
    5 | 335
    6 | 369
    7 | 360
    8 | 353
    9 | 360
(9 rows)

When you finish, run the following command to remove the Docker containers.

docker-compose down

Summary

In this tutorial, we connected RisingWave to a Redpanda stream and performed basic ad performance analysis. The use case is intentionally simple and meant to be inspirational rather than complete. If you want to share your ideas about what RisingWave can do or are interested in a particular use case scenario, please let us know in the RisingWave Community workspace on Slack. Please use this invitation link to join the workspace.
