Redpanda is an Apache Kafka®-compatible streaming data platform. It was built from the ground up with performance and simplicity in mind. It requires no ZooKeeper®, no JVM, and no code changes.
RisingWave works seamlessly with Redpanda to provide a real-time data streaming and processing solution that makes real-time applications much easier to build and maintain.
Overview
In this tutorial, you will learn how to use RisingWave to consume Redpanda data streams and perform data analysis. We will use ad impression and click events as sample data and count the clicks each ad receives within one minute after it is shown.
Below is a sample ad impression and click event, which shows the schema of the events:
{
  "user_id": 2926375,
  "click_timestamp": "2022-05-11 16:04:06.416369",
  "impression_timestamp": "2022-05-11 16:04:06.273401",
  "ad_id": 8184596
}
For readers who are not familiar with digital advertising, an impression is counted whenever an ad is displayed within an app or on a website. impression_timestamp is the date and time when the ad was shown to a user, and click_timestamp is the date and time when the user clicked it. For a click to be counted, impression_timestamp must be smaller (earlier) than click_timestamp; this ensures that only clicks subsequent to impressions are counted.
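The ordering rule can be checked directly against the sample event above. The following Python sketch parses both timestamps (assuming they always use the YYYY-MM-DD HH:MM:SS.ffffff format shown) and tests whether the click came after the impression. The helper name click_follows_impression is hypothetical.

```python
from datetime import datetime

# Timestamp format used in the sample event (assumed to be fixed).
TS_FORMAT = "%Y-%m-%d %H:%M:%S.%f"


def click_follows_impression(event: dict) -> bool:
    """Return True if the event has a click that happened after the impression."""
    click = event.get("click_timestamp")
    if click is None:
        return False  # no click recorded for this impression
    impression_ts = datetime.strptime(event["impression_timestamp"], TS_FORMAT)
    click_ts = datetime.strptime(click, TS_FORMAT)
    return impression_ts < click_ts


sample = {
    "user_id": 2926375,
    "click_timestamp": "2022-05-11 16:04:06.416369",
    "impression_timestamp": "2022-05-11 16:04:06.273401",
    "ad_id": 8184596,
}

print(click_follows_impression(sample))  # True

# Time between impression and click for the sample event.
gap = (datetime.strptime(sample["click_timestamp"], TS_FORMAT)
       - datetime.strptime(sample["impression_timestamp"], TS_FORMAT))
print(gap.total_seconds())  # 0.142968
```

In the sample event, the click arrives about 0.14 seconds after the impression, so it would be counted.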
We have set up a demo cluster specifically for the Redpanda and RisingWave stack so that you do not need to install them separately.
Prerequisites
Ensure you have Docker and Docker Compose installed in your environment. Note that Docker Compose is included in Docker Desktop for Windows and macOS. If you use Docker Desktop, ensure that it is running before launching the demo cluster.
Ensure that the PostgreSQL interactive terminal, psql, is installed in your environment.
- To install psql on macOS, run this command: brew install postgres
- To install psql on Ubuntu, run this command: sudo apt-get install postgresql-client
- To install
Step 1: Launch the demo cluster
First, let us clone the risingwave-demo repository to your environment.
git clone https://github.com/singularity-data/risingwave-demo.git
Now let us navigate to the ad-click directory of the cloned repository and start the demo cluster from the Docker Compose file.
cd risingwave-demo/ad-click
docker-compose up -d
This starts a Redpanda instance and the necessary RisingWave components, including the frontend node, compute node, metadata node, and MinIO.
A workload generator is also packaged in the Docker Compose file. It generates random data and feeds it into Redpanda.
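The workload generator's own code is not shown in this tutorial. As a rough idea of the kind of data it produces, the following hypothetical Python sketch builds random events with the same fields as the sample event and serializes them as JSON strings. The ad ID range (1 to 9), the 50% click rate, the 0 to 90 second click delay, and the use of null for unclicked impressions are all assumptions made for illustration, not the demo's actual behavior.

```python
import json
import random
from datetime import datetime, timedelta
from typing import List

TS_FORMAT = "%Y-%m-%d %H:%M:%S.%f"


def generate_event(rng: random.Random, impression_ts: datetime) -> dict:
    """Build one random ad event shaped like the sample event.

    Hypothetical assumptions: ad IDs 1-9, about half of the impressions
    are clicked, and clicks land 0-90 seconds after the impression.
    """
    event = {
        "user_id": rng.randint(1, 10_000_000),
        "ad_id": rng.randint(1, 9),
        "impression_timestamp": impression_ts.strftime(TS_FORMAT),
        "click_timestamp": None,  # stays null if the ad is not clicked
    }
    if rng.random() < 0.5:
        delay = timedelta(seconds=rng.uniform(0, 90))
        event["click_timestamp"] = (impression_ts + delay).strftime(TS_FORMAT)
    return event


def generate_messages(count: int, seed: int = 42) -> List[str]:
    """Return JSON strings, i.e. the message values a producer would send."""
    rng = random.Random(seed)  # fixed seed so the output is reproducible
    start = datetime(2022, 5, 11, 16, 4, 6)
    return [
        json.dumps(generate_event(rng, start + timedelta(milliseconds=100 * i)))
        for i in range(count)
    ]


for message in generate_messages(3):
    print(message)
```

To publish these strings, you would pass each one to a Kafka-compatible producer client for the ad_clicks topic; that step is left out so the sketch stays self-contained.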
Step 2: Connect RisingWave to the Redpanda stream
Now let us connect to RisingWave so that we can manage data streams and perform data analysis.
psql -h localhost -p 4566 -d dev -U root
Note that RisingWave accepts psql connections on port 4566 by default, while Redpanda listens on port 29092. If you intend to ingest data directly from Redpanda, use port 29092 instead, as the kafka.brokers setting in the following statement does.
We connect RisingWave to a Redpanda topic with this SQL statement:
create source ad_source (
    user_id bigint,
    ad_id bigint,
    click_timestamp timestamp,
    impression_timestamp timestamp
) with (
    connector = 'kafka',
    kafka.topic = 'ad_clicks',
    kafka.brokers = 'message_queue:29092',
    kafka.scan.startup.mode = 'earliest'
) row format json;
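With row format json, each message is read as a JSON object whose fields fill the source's columns. Assuming fields are matched to columns by name, the Python sketch below approximates that mapping for one message: integers for the bigint columns, datetime values for the timestamp columns, and None (SQL NULL) for missing or null fields. It is an illustration only, not RisingWave's parser, and json_to_row is a hypothetical name.

```python
import json
from datetime import datetime
from typing import Optional

TS_FORMAT = "%Y-%m-%d %H:%M:%S.%f"

# Columns of ad_source, grouped by their SQL type in the create source statement.
BIGINT_COLUMNS = ("user_id", "ad_id")
TIMESTAMP_COLUMNS = ("click_timestamp", "impression_timestamp")


def _parse_ts(value: Optional[str]) -> Optional[datetime]:
    return None if value is None else datetime.strptime(value, TS_FORMAT)


def json_to_row(message: str) -> dict:
    """Map one JSON message to a row by matching keys to column names.

    Assumption for illustration: missing or null keys become None (NULL)
    and keys that are not columns are ignored.
    """
    obj = json.loads(message)
    row = {}
    for col in BIGINT_COLUMNS:
        row[col] = None if obj.get(col) is None else int(obj[col])
    for col in TIMESTAMP_COLUMNS:
        row[col] = _parse_ts(obj.get(col))
    return row


message = ('{"user_id": 2926375, "click_timestamp": "2022-05-11 16:04:06.416369", '
           '"impression_timestamp": "2022-05-11 16:04:06.273401", "ad_id": 8184596}')
row = json_to_row(message)
print(row["ad_id"], row["click_timestamp"])  # 8184596 2022-05-11 16:04:06.416369
```

Because the mapping is by name in this sketch, the order of keys in the JSON message does not need to match the column order in the create source statement.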
Let us dive a little deeper into the parameters in the WITH clause:
- connector = 'kafka': As Redpanda is Kafka-compatible, RisingWave connects to it in the same way as to Kafka.
- kafka.topic = 'ad_clicks': The Redpanda topic to consume from.
- kafka.brokers = 'message_queue:29092': The address of the Redpanda broker.
- kafka.scan.startup.mode = 'earliest': RisingWave starts consuming data from the earliest entry in the stream. Alternatively, you can set this parameter to 'latest', in which case RisingWave starts consuming from the latest entry and skips data already in the topic.
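The difference between the two startup modes can be modeled with a single partition stored as a Python list. In this simplified sketch (hypothetical helper names, and assuming no messages have been removed by retention, so the earliest offset is 0), earliest replays everything already in the topic, while latest starts at the end of the log and sees only messages that arrive afterwards.

```python
from typing import List


def initial_offset(mode: str, log_end_offset: int) -> int:
    """Offset a new consumer starts from, for a partition that holds
    messages at offsets 0 .. log_end_offset - 1 (simplified model)."""
    if mode == "earliest":
        return 0  # replay everything (assumes retention removed nothing)
    if mode == "latest":
        return log_end_offset  # skip existing messages, read only new ones
    raise ValueError(f"unsupported startup mode: {mode!r}")


def consume(existing: List[str], mode: str, arriving: List[str]) -> List[str]:
    """Messages a consumer sees if it starts now and `arriving` is appended later."""
    start = initial_offset(mode, len(existing))
    log = existing + arriving
    return log[start:]


existing = ["e0", "e1", "e2"]  # already in the topic when the source is created
arriving = ["e3"]              # produced afterwards
print(consume(existing, "earliest", arriving))  # ['e0', 'e1', 'e2', 'e3']
print(consume(existing, "latest", arriving))    # ['e3']
```

In this demo, earliest means the view also counts events the workload generator wrote before the source was created.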
Step 3: Analyze the data
We’ll define a materialized view to count the clicks on each ad within one minute after the ad was shown.
With materialized views, RisingWave performs only incremental calculations each time a new event comes in, and persists the results as soon as the calculations for that event are complete.
create materialized view m_click_statistic as
select
    ad_id,
    count(user_id) as clicks_count
from
    ad_source
where
    click_timestamp is not null
    and impression_timestamp < click_timestamp
    and impression_timestamp + interval '1' minute >= click_timestamp
group by
    ad_id;
We want to count only ads that have been clicked, so we limit the scope with the click_timestamp is not null condition. The impression_timestamp < click_timestamp condition keeps only clicks that happen after the impression. Clicks that happen more than one minute after the impression are considered irrelevant, which is why we include the impression_timestamp + interval '1' minute >= click_timestamp condition to exclude them.
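To make the incremental idea concrete, the Python sketch below applies the same three WHERE conditions as m_click_statistic to one event at a time and increments only the counter for that event's ad_id, which is what count(user_id) with group by ad_id amounts to for an append-only stream. It is a simplified model under that append-only assumption, with hypothetical class and function names, not a description of how RisingWave executes the view internally.

```python
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, Optional

WINDOW = timedelta(minutes=1)  # interval '1' minute in the view


def counts_toward_view(click_ts: Optional[datetime], impression_ts: datetime) -> bool:
    """Python version of the WHERE clause in m_click_statistic."""
    return (
        click_ts is not None
        and impression_ts < click_ts
        and impression_ts + WINDOW >= click_ts
    )


class ClickStatistic:
    """Keeps clicks_count per ad_id up to date, one event at a time."""

    def __init__(self) -> None:
        self.clicks_count: Dict[int, int] = defaultdict(int)

    def on_event(self, ad_id: int, user_id: Optional[int],
                 click_ts: Optional[datetime], impression_ts: datetime) -> None:
        # count(user_id) skips NULL user IDs, so we do the same.
        if user_id is not None and counts_toward_view(click_ts, impression_ts):
            # Only this ad's group changes; nothing else is recomputed.
            self.clicks_count[ad_id] += 1


t0 = datetime(2022, 5, 11, 16, 4, 6)
stats = ClickStatistic()
stats.on_event(1, 10, t0 + timedelta(seconds=5), t0)   # counted
stats.on_event(1, 11, None, t0)                        # not clicked
stats.on_event(1, 12, t0 + timedelta(seconds=61), t0)  # more than one minute later
stats.on_event(2, 13, t0 + timedelta(seconds=60), t0)  # exactly one minute: counted
stats.on_event(2, 14, t0 - timedelta(seconds=1), t0)   # click before impression
print(dict(stats.clicks_count))  # {1: 1, 2: 1}
```

The five sample events show each filter at work: a missing click, a click 61 seconds after the impression, and a click before the impression are all excluded, while a click exactly one minute later still counts because the condition uses >=.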
Step 4: Query the results
RisingWave is designed to achieve both second-level freshness and low query latency through pre-aggregation on streams, so downstream applications can query results at very short intervals if needed.
We query the results with the following statement:
select * from m_click_statistic;
The results may look like this:
ad_id | clicks_count
------+--------------
1 | 356
2 | 340
3 | 319
4 | 356
5 | 333
6 | 368
7 | 355
8 | 349
9 | 359
(9 rows)
If you query multiple times, you will see the results change as new events come in. For example, if you run the query again 10 seconds later, you may get results like these:
ad_id | clicks_count
------+--------------
1 | 362
2 | 345
3 | 325
4 | 359
5 | 335
6 | 369
7 | 360
8 | 353
9 | 360
(9 rows)
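The two result sets above can be compared directly. The short Python sketch below (the changes helper is hypothetical) subtracts the first snapshot from the second for each ad_id, using the numbers shown in this tutorial.

```python
def changes(before: dict, after: dict) -> dict:
    """Per-ad increase in clicks_count between two query results."""
    return {ad_id: after[ad_id] - before.get(ad_id, 0) for ad_id in after}


# clicks_count per ad_id from the two sample query results above.
first = {1: 356, 2: 340, 3: 319, 4: 356, 5: 333, 6: 368, 7: 355, 8: 349, 9: 359}
second = {1: 362, 2: 345, 3: 325, 4: 359, 5: 335, 6: 369, 7: 360, 8: 353, 9: 360}

delta = changes(first, second)
print(delta)                # {1: 6, 2: 5, 3: 6, 4: 3, 5: 2, 6: 1, 7: 5, 8: 4, 9: 1}
print(sum(delta.values()))  # 33
```

For these two sample snapshots, every ad gained between 1 and 6 clicks, 33 in total, over roughly 10 seconds.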
When you finish, run the following command to remove the Docker containers.
docker-compose down
In this tutorial, we connected RisingWave to a Redpanda stream and performed basic ad performance analysis. The use case is simple and is intended to be inspirational rather than complete. If you want to share your ideas about what RisingWave can do or are interested in a particular use scenario, please let us know in the RisingWave Community workspace on Slack. Please use this invitation link to join the workspace.