About this series


In the "Deep Dive into the RisingWave Stream Processing Engine" series, we walk through the RisingWave stream processing engine from top to bottom, one article at a time. We'll discuss the design philosophy behind building this engine from scratch, then work through the technical principles and implementation details of each module.

Stream processing is still a relatively young field, and many best practices in design and engineering have yet to be standardized. We are eager to engage in deeper discussions with enthusiasts, researchers, and users interested in stream processing. We welcome discussions in our Slack channel and GitHub repo to collectively explore the future of stream processing technology.

So let's get started!


What does RisingWave's stream processing engine do?


In simple terms, RisingWave's stream processing engine runs long-running distributed computing tasks. Given a SQL query over a set of source tables, it continuously calculates the changes in the query's result table from the changes in the source tables.

This might sound a bit abstract, so let's illustrate it with an example.

In the following SQL, we define a table called "Stories" that stores detailed information about articles, and an append-only table called "Votes" that records each like a user gives to an article.

CREATE TABLE Stories (id int, author int, title text, url text, PRIMARY KEY (id));
CREATE TABLE Votes (user_id int, story_id int);

Based on these two tables, we can use SQL statements to define a materialized view (MView). For example, the MView StoriesWithVC maintains the number of likes for each article.

CREATE MATERIALIZED VIEW StoriesWithVC AS
SELECT id, author, title, url, vcount
FROM stories
JOIN (
    SELECT story_id, COUNT(DISTINCT user_id) AS vcount
    FROM votes GROUP BY story_id
) AS VoteCount
ON VoteCount.story_id = stories.id;

Furthermore, we can define new MViews on StoriesWithVC.

CREATE MATERIALIZED VIEW Top10VotedStories AS
SELECT * FROM StoriesWithVC
ORDER BY vcount DESC
LIMIT 10;

In this example, RisingWave needs to efficiently maintain the contents of each MView. Because it supports cascading MViews (an MView defined on another MView), it also needs to calculate the changes of each MView itself so that they can be passed downstream. Once those changes are available, keeping the materialized result up to date is straightforward, so the core job of RisingWave's stream processing engine is to calculate the changes of a downstream table from the changes of its upstream tables. As shown in the diagram, when the source tables Stories and Votes receive changes, the stream processing engine calculates the corresponding changes in the downstream MViews StoriesWithVC and Top10VotedStories.

Figure: Example of how RisingWave handles stream processing.
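To make "calculating changes from changes" concrete, here is a minimal Python sketch of the VoteCount aggregation maintained incrementally. It is only an illustration under stated assumptions: the class names, the ("+" / "-", story_id, vcount) change format, and the in-memory sets are hypothetical and are not RisingWave's actual operators or data structures.

```python
from collections import defaultdict


class DistinctVoteCount:
    """Toy operator: maintains COUNT(DISTINCT user_id) per story_id incrementally."""

    def __init__(self):
        # story_id -> set of user_ids seen so far (hypothetical in-memory state)
        self.voters = defaultdict(set)

    def on_insert(self, user_id, story_id):
        """Consume one inserted Votes row and return the output changes it causes."""
        seen = self.voters[story_id]
        if user_id in seen:
            return []  # repeated vote: the distinct count does not change
        old = len(seen)
        seen.add(user_id)
        changes = []
        if old > 0:
            changes.append(("-", story_id, old))  # retract the previous count
        changes.append(("+", story_id, old + 1))  # emit the new count
        return changes


class MaterializedCounts:
    """Toy downstream: applies changes to a materialized result table."""

    def __init__(self):
        self.rows = {}  # story_id -> vcount

    def apply(self, changes):
        for op, story_id, vcount in changes:
            if op == "+":
                self.rows[story_id] = vcount
            elif self.rows.get(story_id) == vcount:
                del self.rows[story_id]


agg = DistinctVoteCount()
view = MaterializedCounts()
emitted = []
# Each tuple is an insert into Votes: (user_id, story_id).
for user_id, story_id in [(1, 10), (2, 10), (1, 10), (3, 20)]:
    changes = agg.on_insert(user_id, story_id)
    emitted.extend(changes)
    view.apply(changes)
```

The downstream never rescans Votes: it only applies the retract/emit pairs it receives, and a further operator (for example a top-N) could consume the same change stream in turn.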

Design philosophy


SQL-native computational model


Both the input and the output of RisingWave's stream processing engine are data changes on relational tables with well-defined schemas. In fact, the internal operators also take relational changes as input and produce them as output. This follows from RisingWave's positioning as a streaming database that lets users create stream processing jobs through a standard SQL interface. From day one, RisingWave's stream processing was designed to handle SQL queries and relational data workloads. This differs from some other stream processing systems that emerged in the big data era, which often started with a generic stream processing framework and later built a SQL engine on top of it to provide a SQL interface.


From general to specialized design philosophy


The SQL-native computational model gives RisingWave's stream processing a well-defined problem boundary, which lets us build the entire engine with a top-down approach. When designing a SQL feature, we usually start with the most general form of the SQL operator. We then specialize and optimize it for different scenarios to achieve the best performance for specific workloads. This design philosophy keeps RisingWave's stream processing model general while still delivering excellent performance in specific scenarios.

It is worth mentioning that during development, we found that this specialization can be distributed at different levels:

  1. Runtime adaptation of executors: We implemented an LRU-based state cache in the operators, so that when the state is small and memory is abundant, the entire state can stay in memory.
  2. Optimizer specialization in generating plans: For example, we specialized operators for append-only streams or streams with watermarks, significantly optimizing performance.
  3. Configuration parameters: For instance, the trade-off between data freshness and execution cost is exposed through a small number of configuration parameters such as barrier_interval, which users can tune.
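As an illustration of the first point, the sketch below shows the general shape of an LRU state cache in front of a persistent state store. It is a hypothetical sketch, not RisingWave's implementation: the LruStateCache class, the write-through policy, and the plain dict standing in for the state store are all assumptions made for the example.

```python
from collections import OrderedDict


class LruStateCache:
    """Toy LRU cache in front of a slower state store (both hypothetical)."""

    def __init__(self, capacity, store):
        self.capacity = capacity
        self.store = store          # dict standing in for persistent state storage
        self.cache = OrderedDict()  # least recently used entry comes first
        self.misses = 0

    def get(self, key):
        if key in self.cache:
            self.cache.move_to_end(key)  # mark as most recently used
            return self.cache[key]
        self.misses += 1
        value = self.store.get(key)      # fall back to the state store
        self._cache_put(key, value)
        return value

    def put(self, key, value):
        self.store[key] = value          # write through, so eviction never loses data
        self._cache_put(key, value)

    def _cache_put(self, key, value):
        self.cache[key] = value
        self.cache.move_to_end(key)
        while len(self.cache) > self.capacity:
            self.cache.popitem(last=False)  # evict the least recently used entry


store = {}
cache = LruStateCache(capacity=2, store=store)
cache.put("a", 1)
cache.put("b", 2)
cache.get("a")     # "a" becomes the most recently used entry
cache.put("c", 3)  # capacity exceeded: "b" is evicted from memory
```

When the capacity is at least as large as the working set, every read is served from memory. When it is smaller, only the coldest keys pay the cost of going back to the store.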


Core features


Beyond this design philosophy, a few core features run through the whole design. We treat them as first-class citizens of the RisingWave stream processing engine and preserve them in every design and implementation decision.


Distributed parallel execution engine


RisingWave is a distributed system capable of processing data on a large scale in parallel. The RisingWave stream processing engine can make full use of multi-node and multi-core computing resources. The generated execution plan avoids single-point calculations as much as possible, distributing calculations to multiple nodes to reduce potential single-point bottlenecks in the system.


Flexible and elastic cloud-native scheduling


RisingWave, as a cloud-native streaming database, supports dynamic scaling of computing resources on top of parallel processing. Because stream processing tasks are long-running, dynamic cluster scaling is only effective if the stream processing engine supports online scheduling and migration of tasks. This poses significant challenges for the design abstractions of state storage, data partitioning, and more. The RisingWave stream processing engine exposes scheduling and migration interfaces so that external control components can easily migrate and schedule computing tasks.
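One common way to make data partitioning friendly to online migration is to hash keys into a fixed number of virtual partitions and to reassign those partitions, rather than individual keys, when the cluster changes size. The Python sketch below illustrates that general technique only. The article does not describe RisingWave's partitioning scheme, so VNODE_COUNT, vnode_of, assign_vnodes, and the round-robin assignment are all hypothetical.

```python
import zlib

VNODE_COUNT = 16  # hypothetical fixed number of virtual partitions


def vnode_of(key):
    """Map a key to a virtual partition; this mapping never changes on scaling."""
    return zlib.crc32(str(key).encode("utf-8")) % VNODE_COUNT


def assign_vnodes(workers):
    """Assign virtual partitions to workers round-robin (a deliberately naive policy)."""
    return {v: workers[v % len(workers)] for v in range(VNODE_COUNT)}


def moved_vnodes(before, after):
    """Virtual partitions whose owner differs between two assignments."""
    return sorted(v for v in before if before[v] != after[v])


before = assign_vnodes(["w0", "w1"])
after = assign_vnodes(["w0", "w1", "w2"])  # scale out by one worker
moved = moved_vnodes(before, after)

# A key keeps its virtual partition; only that partition's owner may change.
story_id = 42
owner_before = before[vnode_of(story_id)]
owner_after = after[vnode_of(story_id)]
```

Because state can be stored and moved per virtual partition, a migration only has to hand over the partitions listed in `moved`. A real scheduler would also try to keep that list small, which this round-robin policy does not attempt.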


Fast and efficient fault tolerance and recovery


Fault tolerance is a fundamental capability that modern stream processing systems must have. Stream processing systems typically use a checkpoint mechanism to persist computing states, achieving exactly-once semantics within the system. At the same time, the checkpoint process competes for resources with foreground stream processing tasks, and this contention must be kept to a minimum. RisingWave has its own state storage engine and makes the entire checkpoint process asynchronous, so checkpointing is almost invisible to foreground tasks.


Snapshot isolation of stream computing results


As a streaming database, RisingWave must keep the computing results of its internal objects, such as MViews and tables, consistent with one another. More specifically, when a user queries different MViews at the same moment, the results should be based on the same version of the source tables. This requires that changes in an upstream table and the corresponding changes in its downstream tables be committed atomically in the same write transaction.

This means that stream operators cannot freely delay or batch data: each operator should dispatch changes promptly when it receives upstream data. Readers familiar with stream processing will know, however, that batching inside operators can improve performance. We have made two optimizations to reconcile the two. First, we introduced the concept of an "epoch," which groups multiple changes on the source tables into one epoch. This effectively enlarges the write transaction described above and allows operators to batch within it. Second, for common stream computing use cases, we designed the "EMIT ON WINDOW CLOSE" query semantics, which batch output based on watermarks (see RFC: The Semantics of EMIT ON WINDOW CLOSE, by fuyufjh, Pull Request #30 in risingwavelabs/rfcs on GitHub). This allows users to declare the consistency semantics they want between MViews and their upstream.
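The epoch idea can be sketched as an operator that absorbs changes freely inside an epoch and emits one consolidated set of changes when the epoch ends. The Python sketch below is a simplified illustration, not RisingWave's executor interface: the EpochBatchingCounter class, the on_change and on_barrier method names, and the ("+" / "-", key, count) change format are hypothetical.

```python
class EpochBatchingCounter:
    """Toy per-key counter that only emits output changes at epoch boundaries."""

    def __init__(self):
        self.counts = {}  # key -> current count
        self.dirty = {}   # key -> count before this epoch's first change (None if new)

    def on_change(self, key):
        """Absorb one upstream insert without emitting anything downstream."""
        if key not in self.dirty:
            self.dirty[key] = self.counts.get(key)
        self.counts[key] = self.counts.get(key, 0) + 1

    def on_barrier(self, epoch):
        """End of epoch: emit one retract/emit pair per key touched in the epoch."""
        changes = []
        for key, old in self.dirty.items():
            if old is not None:
                changes.append(("-", key, old))
            changes.append(("+", key, self.counts[key]))
        self.dirty.clear()
        return epoch, changes


op = EpochBatchingCounter()
for story_id in [10, 10, 10, 20]:  # four upstream inserts within epoch 1
    op.on_change(story_id)
first = op.on_barrier(epoch=1)

op.on_change(10)                   # one upstream insert within epoch 2
second = op.on_barrier(epoch=2)
```

Three updates to the same key in epoch 1 collapse into a single output change, while downstream readers still see a result that matches exactly one epoch of the source.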


Overall architecture


The overall architecture is shown in the diagram below. The leftmost Frontend node connects with users via pgwire and is responsible for converting DDL SQL statements into stream processing execution plans. It optimizes the plans in the optimizer and sends them to the central Meta node for registration. The Meta node persists the execution plans in a globally consistent and highly available MetaStore and sends computation tasks to computing nodes based on those plans. Throughout the entire lifecycle of a streaming job, the Meta node can send control instructions to computing nodes through RPC to carry out task migration, scheduling, checkpointing, and failover. External systems or users can also directly control and schedule computing workloads on computing nodes through the interfaces exposed by the Meta node.

Figure: The overall architecture.

Conclusion

Starting from a practical scenario, this article introduced what the RisingWave stream processing engine is used for, described the design philosophy and core features that shape its architecture, and gave an overview of the overall architecture of the engine.


Tianshuo Shi

Software Engineer
