Start Your Stream Processing Journey With Just 4 Lines of Code
Take the first step into stream processing with RisingWave! Discover how, with just four lines of code, you can embark on an exciting journey.
Take the first step into stream processing with RisingWave! Discover how, with just four lines of code, you can embark on an exciting journey.
I stumbled upon an intriguing big data project called Stratosphere a decade ago. What immediately captured my interest was a particular section in its introduction: the ability to initiate a cluster on a single machine and execute MapReduce-based WordCount computations with just three lines of code.
During a time dominated by Hadoop, installing and running a WordCount program would typically require several hours or even days of effort. Therefore, encountering a project that achieved the same functionality in merely three lines of code left an indelible impression on me. Motivated by this concise yet powerful approach, I delved extensively into the project and eventually became a contributor.
In the present day, the project previously known as Stratosphere has transformed and is now recognized as Apache Flink, reigning as the most popular stream processing engine in the realm of big data. However, in contrast to the early days of Stratosphere, Flink has grown into a colossal project with considerable intricacy.
Nevertheless, as someone who contributed to the initial design and development of Flink's stream processing engine, I still yearn for a user experience that embraces simplicity. My aspiration is for users to swiftly embark on their stream processing endeavors and encounter the streamlined efficiency it offers at remarkable speeds.
To uphold this belief, my team and I have crafted RisingWave, a cloud-based streaming database that furnishes users with a high-performance distributed stream processing with a PostgreSQL-like experience. In this article, I showcase how you can initiate your journey into stream processing using RisingWave with a mere four lines of code.
Stream processing and batch processing are the two fundamental modes of data processing. In the last two decades, stream processing systems and batch processing systems have experienced quick iterations, evolving from single-machine setups to distributed systems and adapting big data to cloud computing. Substantial architectural enhancements have been implemented in batch and stream processing systems.
The two primary distinctions between stream processing and batch processing are as follows:
Regardless of whether it's stream processing or batch processing, both approaches are progressively shifting toward real-time capabilities. Batch systems are widely used in interactive analysis scenarios, while stream processing systems are extensively applied in monitoring, alerting, automation, and various other scenarios.
RisingWave is an open-source distributed SQL streaming database licensed under the Apache 2.0 license. It utilizes a PostgreSQL-compatible interface, allowing users to perform distributed stream processing like operating a PostgreSQL database.
RisingWave is primarily designed for two typical use cases: streaming ETL and streaming analytics.
Streaming ETL refers to the real-time ingestion of various data sources (such as OLTP databases, message queues, and file systems) into destination systems (such as OLAP databases, data warehouses, data lakes, or simply back to OLTP databases, message queues, file systems) after undergoing processing operations like joins, aggregations, groupings, windowing, and more. In this scenario, RisingWave can fully replace Apache Flink.
Streaming analytics, on the other hand, refers to the capability of ingesting data from multiple data sources (such as OLTP databases, message queues, and file systems) and performing complex analytics (with operations like joins, aggregations, groupings, windowing, and more) before displaying results in the BI dashboards.
Users may also directly access data inside RisingWave using client libraries in different programming languages. In this scenario, RisingWave can replace a combination of Apache Flink and SQL/NoSQL databases (such as MySQL, PostgreSQL, and Cassandra, Redis).
Streaming analytical use cases.
To install and run RisingWave on a Mac, follow these three commands in the command line window (refer to our documentation if you are a Linux user.)
$ brew tap risingwavelabs/risingwave
$ brew install risingwave
$ risingwave playground
Next, open a new command line window and execute the following command to establish a connection with RisingWave:
$ psql -h localhost -p 4566 -d dev -U root
For ease of understanding, let's first try to create a table and use INSERT to add some test data. In real-world scenarios, we typically need to fetch data from the message queues or OLTP databases, which will be introduced later.
Let's create a table for web browsing records:
CREATE TABLE website_visits (
timestamp TIMESTAMP,
user_id VARCHAR,
page_id VARCHAR,
action VARCHAR
);
Next, we create a materialized view to count the number of visits, visitors, and last access time for each page. It is worth mentioning that materialized views based on streaming data are a core feature of RisingWave.
CREATE MATERIALIZED VIEW page_visits_mv AS
SELECT page_id,
COUNT(*) AS total_visits,
COUNT(DISTINCT user_id) AS unique_visitors,
MAX(timestamp) AS last_visit_time
FROM website_visits
GROUP BY page_id;
We use INSERT to add some data:
INSERT INTO website_visits (timestamp, user_id, page_id, action) VALUES
('2023-06-13T10:00:00Z', 'user1', 'page1', 'view'),
('2023-06-13T10:01:00Z', 'user2', 'page2', 'view'),
('2023-06-13T10:02:00Z', 'user3', 'page3', 'view'),
('2023-06-13T10:03:00Z', 'user4', 'page1', 'view'),
('2023-06-13T10:04:00Z', 'user5', 'page2', 'view');
Take a look at the current results:
SELECT * from page_visits_mv;
-----Results
page_id | total_visits | unique_visitors | last_visit_time
---------+--------------+-----------------+---------------------
page2 | 2 | 2 | 2023-06-13 10:04:00
page3 | 1 | 1 | 2023-06-13 10:02:00
page1 | 2 | 2 | 2023-06-13 10:03:00
(3 rows)
Let's insert five more rows of data:
INSERT INTO website_visits (timestamp, user_id, page_id, action) VALUES
('2023-06-13T10:05:00Z', 'user1', 'page1', 'click'),
('2023-06-13T10:06:00Z', 'user2', 'page2', 'scroll'),
('2023-06-13T10:07:00Z', 'user3', 'page1', 'view'),
('2023-06-13T10:08:00Z', 'user4', 'page2', 'view'),
('2023-06-13T10:09:00Z', 'user5', 'page3', 'view');
Inserting data twice is done to simulate the continuous influx of data. Let's take another look at the current results:
SELECT * FROM page_visits_mv;
-----Results
page_id | total_visits | unique_visitors | last_visit_time
---------+--------------+-----------------+---------------------
page1 | 4 | 3 | 2023-06-13 10:07:00
page2 | 4 | 3 | 2023-06-13 10:08:00
page3 | 2 | 2 | 2023-06-13 10:09:00
(3 rows)
We can see that the results have been updated. This result automatically stays up-to-date when we are processing real-time streaming data.
Given that message queues are commonly used in stream data processing, let's look at how to retrieve and process data from Kafka in real-time.
If you haven't installed Kafka yet, first download the appropriate compressed package from the official website (using 3.4.0 as an example here), and then unzip it:
$ tar -xzf kafka_2.13-3.4.0.tgz
$ cd kafka_2.13-3.4.0
Now let's start Kafka.
$ KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
2. Format log directory:
$ bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties
3. Start Kafka server:
$ bin/kafka-server-start.sh config/kraft/server.properties
After starting the Kafka server, we can create a topic:
$ bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092
Once Kafka is successfully launched, we can directly input messages from the command line.
First, run the following command to start the producer:
$ bin/kafka-console-producer.sh --topic test --bootstrap-server localhost:9092
Once the '>' symbol appears, we can enter the message. To facilitate data consumption in RisingWave, we input data in JSON format:
{"timestamp": "2023-06-13T10:05:00Z", "user_id": "user1", "page_id": "page1", "action": "click"}
{"timestamp": "2023-06-13T10:06:00Z", "user_id": "user2", "page_id": "page2", "action": "scroll"}
{"timestamp": "2023-06-13T10:07:00Z", "user_id": "user3", "page_id": "page1", "action": "view"}
{"timestamp": "2023-06-13T10:08:00Z", "user_id": "user4", "page_id": "page2", "action": "view"}
{"timestamp": "2023-06-13T10:09:00Z", "user_id": "user5", "page_id": "page3", "action": "view"}
We can start a consumer to view the messages we have inputted:
$ bin/kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092
Now let's take a look at how RisingWave retrieves data from this message queue. In this scenario, RisingWave plays the role of a message consumer. Let's switch back to the psql window and create a data source to establish a connection with the previously created topic. It's important to note that we are only establishing the connection at this stage and have yet to start consuming data.
When creating a data source, we can directly define a schema to map relevant fields from the JSON data in the streaming data. To avoid conflicts with the tables mentioned earlier, we will name the data source website_visits_stream.
CREATE source IF NOT EXISTS website_visits_stream (
timestamp TIMESTAMP,
user_id VARCHAR,
page_id VARCHAR,
action VARCHAR
)
WITH (
connector='kafka',
topic='test',
properties.bootstrap.server='localhost:9092',
scan.startup.mode='earliest'
)
ROW FORMAT JSON;
We must create a materialized view for RisingWave to start ingesting data and performing computations. We have created a materialized view similar to the above example for easy understanding.
CREATE MATERIALIZED VIEW visits_stream_mv AS
SELECT page_id,
COUNT(*) AS total_visits,
COUNT(DISTINCT user_id) AS unique_visitors,
MAX(timestamp) AS last_visit_time
FROM website_visits_stream
GROUP BY page_id;
Now we can take a look at the results:
SELECT * FROM visits_stream_mv;
-----Results
page_id | total_visits | unique_visitors | last_visit_time
---------+--------------+-----------------+---------------------
page1 | 1 | 1 | 2023-06-13 10:07:00
page2 | 3 | 2 | 2023-06-13 10:08:00
page3 | 1 | 1 | 2023-06-13 10:09:00
(3 rows)
At this point, we have successfully retrieved data from Kafka and performed processing operations on it.
Real-time monitoring plays a crucial role in streaming applications. By processing data in real-time, you can visualize and monitor the results as they happen. RisingWave can act as a data source, seamlessly connecting to visualization tools like Superset, Grafana, and more, allowing you to display processed metric data in real-time.
We encourage you to take on the challenge of building your own stream processing and visualization system. For specific steps, you can refer to our use case document. In this document, we showcase how RisingWave can be used to monitor and process system performance metrics, subsequently presenting them in real time through Grafana.
Although our demonstration is relatively simple, we firmly believe that with real data, and in your familiar business scenarios, you can achieve much richer and more impactful display effects.
Conclusion
RisingWave distinguishes itself with its remarkable simplicity, enabling users to engage in distributed stream processing effortlessly using SQL. In terms of performance, RisingWave outperforms stream processing platforms of the big data era, such as Apache Flink.
The extent of this performance improvement is quite noteworthy. Brace yourself for the details: stateless computing demonstrates an enhancement of around 10%-30%, while stateful computing showcases an astonishing 10X+ boost!
Keep an eye out for the upcoming performance report to delve deeper into these findings. An efficient stream processing platform should always prioritize simplicity, and RisingWave delivers precisely that and more.
This article was originally published on Medium.
Yingjun Wu
Founder and CEO at RisingWave Labs
In this article, we'll show you how to set up a continuous data pipeline that seamlessly captures changes from your Postgres database using Change Data Capture (CDC) and streams them to Apache Iceberg.
By combining platforms like EMQX for industrial data streaming and RisingWave for real-time analytics, manufacturers can tap into machine-generated data as it happens, enabling predictive maintenance, reduced downtime, and improved efficiency. This integrated approach allows industries to respond swiftly to equipment failures, optimize production, and make data-driven decisions that boost overall equipment effectiveness (OEE) and operational agility.
In this article, we’ve demonstrated how to build a core fraud detection system using RisingWave. With minimal setup, you can easily integrate these components into your existing technical stack and have a functional fraud detection solution up and running.