In the present day, the project previously known as Stratosphere has transformed and is now recognized as Apache Flink, reigning as the most popular stream processing engine in the realm of big data. However, in contrast to the early days of Stratosphere, Flink has grown into a colossal project with considerable intricacy.Nevertheless, as someone who contributed to the initial design and development of Flink's stream processing engine, I still yearn for a user experience that embraces simplicity. My aspiration is for users to swiftly embark on their stream processing endeavors and encounter the streamlined efficiency it offers at remarkable speeds.
To uphold this belief, my team and I have crafted RisingWave, a cloud-based streaming database that furnishes users with a high-performance distributed stream processing with a PostgreSQL-like experience. In this article, I showcase how you can initiate your journey into stream processing using RisingWave with a mere four lines of code.
What is Stream Processing?
Stream processing and batch processing are the two fundamental modes of data processing. In the last two decades, stream processing systems and batch processing systems have experienced quick iterations, evolving from single-machine setups to distributed systems and adapting big data to cloud computing. Substantial architectural enhancements have been implemented in batch and stream processing systems.
The two primary distinctions between stream processing and batch processing are as follows:
- Stream processing systems operate on event-driven computations, whereas batch processing systems rely on user-initiated computations.
- Stream processing systems adopt an incremental computation model, while batch processing systems employ a full-computation model.
Regardless of whether it's stream processing or batch processing, both approaches are progressively shifting toward real-time capabilities. Batch systems are widely used in interactive analysis scenarios, while stream processing systems are extensively applied in monitoring, alerting, automation, and various other scenarios.
RisingWave: Stream Processing with PostgreSQL Experience
RisingWave is an open-source distributed SQL streaming database licensed under the Apache 2.0 license. It utilizes a PostgreSQL-compatible interface, allowing users to perform distributed stream processing like operating a PostgreSQL database.
RisingWave is primarily designed for two typical use cases: streaming ETL and streaming analytics.
Streaming ETL refers to the real-time ingestion of various data sources (such as OLTP databases, message queues, and file systems) into destination systems (such as OLAP databases, data warehouses, data lakes, or simply back to OLTP databases, message queues, file systems) after undergoing processing operations like joins, aggregations, groupings, windowing, and more. In this scenario, RisingWave can fully replace Apache Flink.
Streaming analytics, on the other hand, refers to the capability of ingesting data from multiple data sources (such as OLTP databases, message queues, and file systems) and performing complex analytics (with operations like joins, aggregations, groupings, windowing, and more) before displaying results in the BI dashboards. Users may also directly access data inside RisingWave using client libraries in different programming languages. In this scenario, RisingWave can replace a combination of Apache Flink and SQL/NoSQL databases (such as MySQL, PostgreSQL, and Cassandra, Redis).Streaming analytical use cases.
Deploying RisingWave with 4 Lines of Code
To install and run RisingWave on a Mac, follow these three commands in the command line window (refer to our documentation if you are a Linux user.)
$ brew tap risingwavelabs/risingwave
$ brew install risingwave
$ risingwave playground
Next, open a new command line window and execute the following command to establish a connection with RisingWave:
$ psql -h localhost -p 4566 -d dev -U root
For ease of understanding, let's first try to create a table and use INSERT to add some test data. In real-world scenarios, we typically need to fetch data from the message queues or OLTP databases, which will be introduced later.Let's create a table for web browsing records:
CREATE TABLE website_visits (
timestamp TIMESTAMP,
user_id VARCHAR,
page_id VARCHAR,
action VARCHAR
);
Next, we create a materialized view to count the number of visits, visitors, and last access time for each page. It is worth mentioning that materialized views based on streaming data are a core feature of RisingWave.
CREATE MATERIALIZED VIEW page_visits_mv AS
SELECT page_id,
COUNT(*) AS total_visits,
COUNT(DISTINCT user_id) AS unique_visitors,
MAX(timestamp) AS last_visit_time
FROM website_visits
GROUP BY page_id;
We use INSERT to add some data:
INSERT INTO website_visits (timestamp, user_id, page_id, action) VALUES
('2023-06-13T10:00:00Z', 'user1', 'page1', 'view'),
('2023-06-13T10:01:00Z', 'user2', 'page2', 'view'),
('2023-06-13T10:02:00Z', 'user3', 'page3', 'view'),
('2023-06-13T10:03:00Z', 'user4', 'page1', 'view'),
('2023-06-13T10:04:00Z', 'user5', 'page2', 'view');
Take a look at the current results:
SELECT * from page_visits_mv;
-----Results
page_id | total_visits | unique_visitors | last_visit_time
---------+--------------+-----------------+---------------------
page2 | 2 | 2 | 2023-06-13 10:04:00
page3 | 1 | 1 | 2023-06-13 10:02:00
page1 | 2 | 2 | 2023-06-13 10:03:00
(3 rows)
Let's insert five more rows of data:
INSERT INTO website_visits (timestamp, user_id, page_id, action) VALUES
('2023-06-13T10:05:00Z', 'user1', 'page1', 'click'),
('2023-06-13T10:06:00Z', 'user2', 'page2', 'scroll'),
('2023-06-13T10:07:00Z', 'user3', 'page1', 'view'),
('2023-06-13T10:08:00Z', 'user4', 'page2', 'view'),
('2023-06-13T10:09:00Z', 'user5', 'page3', 'view');
Inserting data twice is done to simulate the continuous influx of data. Let's take another look at the current results:
SELECT * FROM page_visits_mv;
-----Results
page_id | total_visits | unique_visitors | last_visit_time
---------+--------------+-----------------+---------------------
page1 | 4 | 3 | 2023-06-13 10:07:00
page2 | 4 | 3 | 2023-06-13 10:08:00
page3 | 2 | 2 | 2023-06-13 10:09:00
(3 rows)
We can see that the results have been updated. This result automatically stays up-to-date when we are processing real-time streaming data.
Interaction with Kafka
Given that message queues are commonly used in stream data processing, let's look at how to retrieve and process data from Kafka in real-time.
If you haven't installed Kafka yet, first download the appropriate compressed package from the official website (using 3.4.0 as an example here), and then unzip it:
$ tar -xzf kafka_2.13-3.4.0.tgz
$ cd kafka_2.13-3.4.0
Now let's start Kafka.
- Generate a cluster UUID:
$ KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
- Format log directory:
$ bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties
- Start Kafka server:
$ bin/kafka-server-start.sh config/kraft/server.properties
After starting the Kafka server, we can create a topic:
$ bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092
Once Kafka is successfully launched, we can directly input messages from the command line.First, run the following command to start the producer:
$ bin/kafka-console-producer.sh --topic test --bootstrap-server localhost:9092
Once the '>' symbol appears, we can enter the message. To facilitate data consumption in RisingWave, we input data in JSON format:
{"timestamp": "2023-06-13T10:05:00Z", "user_id": "user1", "page_id": "page1", "action": "click"}
{"timestamp": "2023-06-13T10:06:00Z", "user_id": "user2", "page_id": "page2", "action": "scroll"}
{"timestamp": "2023-06-13T10:07:00Z", "user_id": "user3", "page_id": "page1", "action": "view"}
{"timestamp": "2023-06-13T10:08:00Z", "user_id": "user4", "page_id": "page2", "action": "view"}
{"timestamp": "2023-06-13T10:09:00Z", "user_id": "user5", "page_id": "page3", "action": "view"}
We can start a consumer to view the messages we have inputted:
$ bin/kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092
Now let's take a look at how RisingWave retrieves data from this message queue. In this scenario, RisingWave plays the role of a message consumer. Let's switch back to the psql window and create a data source to establish a connection with the previously created topic. It's important to note that we are only establishing the connection at this stage and have yet to start consuming data.
When creating a data source, we can directly define a schema to map relevant fields from the JSON data in the streaming data. To avoid conflicts with the tables mentioned earlier, we will name the data source website_visits_stream.
CREATE source IF NOT EXISTS website_visits_stream (
timestamp TIMESTAMP,
user_id VARCHAR,
page_id VARCHAR,
action VARCHAR
)
WITH (
connector='kafka',
topic='test',
properties.bootstrap.server='localhost:9092',
scan.startup.mode='earliest'
)
ROW FORMAT JSON;
We must create a materialized view for RisingWave to start ingesting data and performing computations. We have created a materialized view similar to the above example for easy understanding.
CREATE MATERIALIZED VIEW visits_stream_mv AS
SELECT page_id,
COUNT(*) AS total_visits,
COUNT(DISTINCT user_id) AS unique_visitors,
MAX(timestamp) AS last_visit_time
FROM website_visits_stream
GROUP BY page_id;
Now we can take a look at the results:
SELECT * FROM visits_stream_mv;
-----Results
page_id | total_visits | unique_visitors | last_visit_time
---------+--------------+-----------------+---------------------
page1 | 1 | 1 | 2023-06-13 10:07:00
page2 | 3 | 2 | 2023-06-13 10:08:00
page3 | 1 | 1 | 2023-06-13 10:09:00
(3 rows)
At this point, we have successfully retrieved data from Kafka and performed processing operations on it.
Advanced: Build a Real-time Monitoring System with RisingWave
Real-time monitoring plays a crucial role in streaming applications. By processing data in real-time, you can visualize and monitor the results as they happen. RisingWave can act as a data source, seamlessly connecting to visualization tools like Superset, Grafana, and more, allowing you to display processed metric data in real-time. We encourage you to take on the challenge of building your own stream processing and visualization system. For specific steps, you can refer to our use case document. In this document, we showcase how RisingWave can be used to monitor and process system performance metrics, subsequently presenting them in real time through Grafana. Although our demonstration is relatively simple, we firmly believe that with real data, and in your familiar business scenarios, you can achieve much richer and more impactful display effects.
> RisingWave distinguishes itself with its remarkable simplicity, enabling users to engage in distributed stream processing effortlessly using SQL. In terms of performance, RisingWave outperforms stream processing platforms of the big data era, such as Apache Flink. > > The extent of this performance improvement is quite noteworthy. Brace yourself for the details: stateless computing demonstrates an enhancement of around 10%-30%, while stateful computing showcases an astonishing 10X+ boost! > > Keep an eye out for the upcoming performance report to delve deeper into these findings. An efficient stream processing platform should always prioritize simplicity, and RisingWave delivers precisely that and more.
This article was originally published on Medium.