The amount of streaming data has grown explosively over the past few years. Many businesses realize that they need to move to stream processing, but they have a hard time choosing a route. Most stream processing frameworks make applications complex to design and implement.

RisingWave is a cloud-native streaming database that uses SQL as the interface. It is designed to reduce the complexity and cost of building real-time applications. RisingWave consumes streaming data, performs continuous queries, and updates results dynamically.

Redpanda is an Apache Kafka®-compatible streaming data platform. It was built from the ground up with performance and simplicity in mind. It requires no Zookeeper®, no JVM, and no code changes.

RisingWave works with Redpanda to provide a real-time data streaming and processing solution that makes real-time applications much easier to build and maintain.


Overview


In this tutorial, you will learn how to use RisingWave to consume Redpanda data streams and perform data analysis. We will use ad impression and click events as sample data and count the clicks on each ad that occur within one minute after the ad was shown.

Below is the schema of the ad impression and click events:

{
  "user_id": 2926375,
  "click_timestamp": "2022-05-11 16:04:06.416369",
  "impression_timestamp": "2022-05-11 16:04:06.273401",
  "ad_id": 8184596
}

For readers who are not familiar with digital advertising, an impression is counted whenever an ad is displayed within an app or on a website. impression_timestamp is the date and time when the ad was shown to a user, and click_timestamp is the date and time when the user clicked the ad. impression_timestamp should be earlier than click_timestamp so that only clicks that follow an impression are counted.
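To make the ordering rule concrete, here is a minimal Python sketch that parses the sample event above and checks that the impression precedes the click. The helper name click_delay_seconds and the timestamp format string are assumptions made for illustration; they are not part of RisingWave or the demo.

```python
import json
from datetime import datetime

# Sample event copied from the schema shown above.
raw_event = """
{
  "user_id": 2926375,
  "click_timestamp": "2022-05-11 16:04:06.416369",
  "impression_timestamp": "2022-05-11 16:04:06.273401",
  "ad_id": 8184596
}
"""

# Assumed format, inferred from the sample values.
TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S.%f"


def click_delay_seconds(event: dict) -> float:
    """Return the seconds between impression and click; raise if the order is wrong."""
    impression = datetime.strptime(event["impression_timestamp"], TIMESTAMP_FORMAT)
    click = datetime.strptime(event["click_timestamp"], TIMESTAMP_FORMAT)
    if impression >= click:
        raise ValueError("impression_timestamp must be earlier than click_timestamp")
    return (click - impression).total_seconds()


event = json.loads(raw_event)
delay = click_delay_seconds(event)
print(f"ad {event['ad_id']} was clicked {delay:.6f} s after the impression")
```

For the sample event, the click follows the impression by roughly 0.14 seconds, so it falls well inside the one-minute window used later in this tutorial.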

We have set up a demo cluster specifically for the Redpanda and RisingWave stack so that you do not need to install them separately.


Prerequisites

  • Ensure you have Docker and Docker Compose installed in your environment. Note that Docker Compose is included in Docker Desktop for Windows and macOS. If you use Docker Desktop, ensure that it is running before launching the demo cluster.
  • Ensure that the PostgreSQL interactive terminal, psql, is installed in your environment.
    • To install psql on macOS, run this command: brew install postgres
    • To install psql on Ubuntu, run this command: sudo apt-get install postgresql-client


Step 1: Launch the demo cluster


First, let us clone the risingwave-demo repository to your environment.

git clone https://github.com/singularity-data/risingwave-demo.git

Now let us navigate to the ad-click directory inside the cloned repository and start the demo cluster from the Docker Compose file.

cd risingwave-demo/ad-click
docker-compose up -d

A Redpanda instance and the necessary RisingWave components, including the frontend node, compute node, metadata node, and MinIO, will be started.

A workload generator is also packaged in the Docker Compose file. It generates random data and feeds it into Redpanda.


Step 2: Connect RisingWave to the Redpanda stream


Now let us connect to RisingWave so that we can manage data streams and perform data analysis.

psql -h localhost -p 4566 -d dev -U root

Note that RisingWave accepts psql connections on port 4566 by default, while Redpanda listens on port 29092. If you intend to ingest data directly from Redpanda, use port 29092 instead.

We set up the connection to a Redpanda topic with this SQL statement:

create source ad_source (
  user_id bigint,
  ad_id bigint,
  click_timestamp timestamp,
  impression_timestamp timestamp
) with (
  connector = 'kafka',
  kafka.topic = 'ad_clicks',
  kafka.brokers = 'message_queue:29092',
  kafka.scan.startup.mode = 'earliest'
) row format json;

Let us dive a little deeper into the parameters in the WITH clause:

  • connector = 'kafka': As Redpanda is Kafka-compatible, it can be connected in the same way as Kafka.
  • kafka.topic = 'ad_clicks': The Redpanda topic.
  • kafka.brokers = 'message_queue:29092': The address of the Redpanda broker.
  • kafka.scan.startup.mode = 'earliest': RisingWave starts consuming data from the earliest entry in the stream. Alternatively, you can set this parameter to 'latest', which means RisingWave starts consuming data from the latest entry.
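
The difference between the two startup modes can be pictured with a toy model in Python. This is only a sketch under simplifying assumptions: the topic is a plain list, and starting_offset is a hypothetical helper, not a RisingWave or Redpanda API.

```python
def starting_offset(log_length: int, startup_mode: str) -> int:
    """Toy model: index of the first record a newly created source would read."""
    if startup_mode == "earliest":
        return 0            # begin with the oldest retained record
    if startup_mode == "latest":
        return log_length   # skip existing records and wait for new ones
    raise ValueError(f"unsupported startup mode: {startup_mode!r}")


# Hypothetical topic contents at the moment the source is created.
existing_records = ["event-0", "event-1", "event-2"]

for mode in ("earliest", "latest"):
    start = starting_offset(len(existing_records), mode)
    print(mode, "->", existing_records[start:])
```

In this model, 'earliest' replays all three existing records, while 'latest' returns nothing until new records arrive.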


Step 3: Analyze the data


We’ll define a materialized view to count the clicks on each ad within one minute after the ad was shown.

With materialized views, only incremental calculations are performed each time a new event comes in, and the results are persisted right after calculations for a new event are completed.

create materialized view m_click_statistic as
select
  ad_id,
  count(user_id) as clicks_count
from
  ad_source
where
  click_timestamp is not null
  and impression_timestamp < click_timestamp
  and impression_timestamp + interval '1' minute >= click_timestamp
group by
  ad_id;

We want to make sure that only ads that have been clicked are counted, so we limit the scope with the click_timestamp is not null condition. Any click that happens more than one minute after the impression is considered irrelevant and is excluded. That is why we include the impression_timestamp + interval '1' minute >= click_timestamp condition.
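
The filter and the incremental update can be sketched in plain Python. This is an illustrative model only, not how RisingWave is implemented. The names ClickStatistic and is_relevant_click are hypothetical, and the sketch assumes user_id is always present, so counting events is equivalent to count(user_id).

```python
from collections import Counter
from datetime import datetime, timedelta
from typing import Optional

WINDOW = timedelta(minutes=1)


def is_relevant_click(impression: datetime, click: Optional[datetime]) -> bool:
    """Mirror the WHERE clause: a click exists and lies in (impression, impression + 1 min]."""
    return click is not None and impression < click <= impression + WINDOW


class ClickStatistic:
    """Keeps per-ad click counts up to date one event at a time."""

    def __init__(self) -> None:
        self.clicks_count: Counter = Counter()

    def on_event(self, ad_id: int, impression: datetime, click: Optional[datetime]) -> None:
        # Only the affected ad's counter changes; nothing is recomputed from scratch.
        if is_relevant_click(impression, click):
            self.clicks_count[ad_id] += 1


shown = datetime(2022, 5, 11, 16, 4, 6)
stats = ClickStatistic()
stats.on_event(1, shown, shown + timedelta(seconds=5))   # counted
stats.on_event(1, shown, shown + timedelta(seconds=60))  # counted: exactly one minute
stats.on_event(1, shown, shown + timedelta(seconds=61))  # excluded: too late
stats.on_event(2, shown, None)                           # excluded: never clicked
stats.on_event(2, shown, shown - timedelta(seconds=1))   # excluded: click before impression
print(dict(stats.clicks_count))
```

Only two of the five events pass the filter, both for ad 1. Ad 2 has no qualifying clicks, so it does not appear in the result, just as it would produce no row in the grouped query.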


Step 4: Query the results


RisingWave is designed to achieve both second-level freshness and low query latency via pre-aggregations on streams. Downstream applications can query results at extremely short intervals if needed.

We query the results with the following statement:

select * from m_click_statistic;

The results may look like this:

ad_id | clicks_count
------+--------------
    1 | 356
    2 | 340
    3 | 319
    4 | 356
    5 | 333
    6 | 368
    7 | 355
    8 | 349
    9 | 359
(9 rows)

If you query multiple times, you will see the results change as new events come in. For example, if you run the query again 10 seconds later, you may get results like the following.

ad_id | clicks_count
------+--------------
    1 | 362
    2 | 345
    3 | 325
    4 | 359
    5 | 335
    6 | 369
    7 | 360
    8 | 353
    9 | 360
(9 rows)
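
To see how much each counter moved between two runs, the two result tables can be compared. The Python sketch below parses the psql output shown above and prints the per-ad increase. parse_psql_table is a hypothetical helper written for this two-column layout only.

```python
def parse_psql_table(text: str) -> dict:
    """Parse 'ad_id | clicks_count' rows from psql output into {ad_id: clicks_count}."""
    counts = {}
    for line in text.strip().splitlines():
        cells = [cell.strip() for cell in line.split("|")]
        # Skip the header, the separator line, and the "(9 rows)" footer.
        if len(cells) == 2 and cells[0].isdigit() and cells[1].isdigit():
            counts[int(cells[0])] = int(cells[1])
    return counts


first_run = """
ad_id | clicks_count
------+--------------
    1 | 356
    2 | 340
    3 | 319
    4 | 356
    5 | 333
    6 | 368
    7 | 355
    8 | 349
    9 | 359
(9 rows)
"""

second_run = """
ad_id | clicks_count
------+--------------
    1 | 362
    2 | 345
    3 | 325
    4 | 359
    5 | 335
    6 | 369
    7 | 360
    8 | 353
    9 | 360
(9 rows)
"""

before = parse_psql_table(first_run)
after = parse_psql_table(second_run)

# New clicks per ad between the two queries.
delta = {ad_id: after[ad_id] - before[ad_id] for ad_id in sorted(before)}
for ad_id, new_clicks in delta.items():
    print(f"ad {ad_id}: +{new_clicks}")
print("total new clicks:", sum(delta.values()))
```

For the two sample outputs, every ad gained between 1 and 6 clicks, for 33 new clicks in total.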

When you finish, run the following command to remove the Docker containers.

docker-compose down

Summary

In this tutorial, we connected RisingWave to a Redpanda stream and performed a basic ad performance analysis. The use case is deliberately simple and is intended to be inspirational rather than complete. If you want to share your ideas about what RisingWave can do or are interested in a particular use scenario, please let us know in the RisingWave Community workspace on Slack. Please use this invitation link to join the workspace.

Tao Wu

Product Manager

Heng Ma

Content Lead

Emily Le

Developer Advocate
