Deep Dive Into the RisingWave Stream Processing Engine - Part 2: Computational Model
This article presents the computational model of the RisingWave stream processing engine and describes how declarative SQL queries are transformed into a series of stream operators. Starting from the relational algebra behind SQL, it extends to a stream computational model based on TVR and then introduces RisingWave's change stream model.
In the previous article, "Deep Dive into the RisingWave Stream Processing Engine - Part 1: Overview," we described the design principles and core features of the RisingWave stream processing engine. In this article, we will focus on the computational model.
SQL and relational algebra
In the previous article, we mentioned that RisingWave is a streaming database that uses SQL to define stream processing tasks declaratively. Let's first look at the computational model of a classic SQL query engine and then consider how to design a computational model for SQL-based stream processing.
SQL has endured as a database query language largely because of the relational algebra underlying it. Any SQL statement can be equivalently transformed into an operator tree composed of relational algebra operators. For example, the following SQL query can be represented as the operator tree in the figure below.
CREATE TABLE StoriesVC (story_id int, vote_count int);
CREATE TABLE Stories (id int, title text);

/* Get stories information with more than 1 vote */
SELECT id, title, vote_count
FROM Stories
JOIN StoriesVC ON StoriesVC.story_id = Stories.id
WHERE vote_count > 1;
Relational algebra, as an algebraic system, satisfies the closure property: the input and output of every relational algebra operator are relations with specific structures (schemas). This allows relational operators to be nested flexibly without leaving the scope of relational algebra, so query engines can exploit this property and process all operators within a unified framework. In SQL, this flexibility shows up as subqueries, which can appear almost anywhere: users can nest queries they have defined inside other queries. For example, the previous query can evolve into the following one.
/* Get stories information with more than 1 vote */
SELECT id, title, vote_count
FROM Stories
JOIN (
    SELECT story_id, count(*) AS vote_count
    FROM Votes
    GROUP BY story_id
) StoriesVC ON StoriesVC.story_id = Stories.id
WHERE vote_count > 1;
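To make the closure property concrete, here is a minimal Python sketch (a toy model, not RisingWave's executor) in which a relation is a list of dict rows and each relational operator is a function from relations to a relation. The helper names (`scan`, `filter_rows`, `project`, `join`, `count_group_by`) and the sample rows are hypothetical. Because the grouped subquery returns an ordinary relation, it plugs into the join just like a base table, mirroring the nested query above.

```python
from collections import Counter

# Toy model: a relation is a list of dict rows (bag semantics). Every operator
# takes relations and returns a relation, so operators nest freely (closure).

def scan(table):
    return list(table)

def filter_rows(rel, pred):
    return [row for row in rel if pred(row)]

def project(rel, cols):
    return [{c: row[c] for c in cols} for row in rel]

def join(left, right, left_col, right_col):
    # Nested-loop equi-join; output rows merge the columns of both sides.
    return [{**lrow, **rrow}
            for lrow in left for rrow in right
            if lrow[left_col] == rrow[right_col]]

def count_group_by(rel, key, out_col):
    counts = Counter(row[key] for row in rel)
    return [{key: k, out_col: n} for k, n in counts.items()]

# Hypothetical sample data.
stories = [{"id": 1, "title": "a"}, {"id": 2, "title": "b"}]
votes = [
    {"user_id": 1, "story_id": 1},
    {"user_id": 2, "story_id": 1},
    {"user_id": 3, "story_id": 2},
]

# The subquery's output is itself a relation, so it feeds the join
# exactly like a base table would.
stories_vc = count_group_by(scan(votes), "story_id", "vote_count")
joined = join(scan(stories), stories_vc, "id", "story_id")
result = project(filter_rows(joined, lambda r: r["vote_count"] > 1),
                 ["id", "title", "vote_count"])
print(result)  # [{'id': 1, 'title': 'a', 'vote_count': 2}]
```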
From this analysis of SQL and relational algebra, we can conclude that a SQL-based computational model for stream processing must meet the following two requirements:
- Operators need to be extended from traditional relational algebra operators, so that a SQL query can be translated into classic relational operators and then into the corresponding stream operators.
- The closure property must be satisfied, forming a new algebraic system in which the inputs and outputs of operators are the same kind of object.
In summary, the key issue in the computational model is the definition of the computational objects.
Computational model based on TVR
Obviously, to meet the first requirement, the computational objects must be extended beyond relations. One attempt to solve this problem is streaming SQL based on Time-Varying Relations (TVR). This model is explained in detail in Chapter 8 of "Streaming Systems" and in "One SQL to Rule Them All – an Efficient and Syntactically Idiomatic Approach to Management of Streams and Tables." In simple terms, a TVR is a relation that changes over time, represented as a sequence of snapshot versions. For any classic SQL operator, applying it to each snapshot version of the input TVR yields the corresponding snapshot of the output TVR. This satisfies both requirements above.
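The per-snapshot definition of TVR operators can be sketched in a few lines of Python. This is a toy model that assumes a TVR is simply a list of snapshots; `lift`, `keep_story_1`, and `project_user` are hypothetical names. Lifting an operator means applying it to every snapshot, and since lifted operators take and return TVRs, closure still holds.

```python
# Toy model (assumption): a TVR is a list of snapshots, where snapshot i is
# the relation's contents at version i, stored as a list of row tuples.

def lift(op):
    """Turn a relation -> relation operator into a TVR -> TVR operator
    by applying it to every snapshot independently."""
    def lifted(tvr):
        return [op(snapshot) for snapshot in tvr]
    return lifted

def keep_story_1(rel):
    # WHERE story_id = 1 on (user_id, story_id) rows
    return [row for row in rel if row[1] == 1]

def project_user(rel):
    # SELECT user_id
    return [(row[0],) for row in rel]

votes_tvr = [
    [(1, 1), (2, 1), (3, 2)],  # version 0
    [(2, 1), (3, 2)],          # version 1: (1, 1) deleted
]

# Lifted operators map TVRs to TVRs, so they compose just like the originals.
out = lift(project_user)(lift(keep_story_1)(votes_tvr))
print(out)  # [[(1,), (2,)], [(2,)]]
```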
TVR is a theory-oriented model. A query engine that follows its original definition literally must generate a new snapshot version every time an upstream TVR changes and then recompute the output from scratch on that snapshot. The result is essentially an algorithm that fully recomputes and refreshes materialized views. For example, suppose we build a materialized view called StoriesVC and then modify the base table Votes; the calculation process is shown in the figure below. Each modification generates a new snapshot, and updating the materialized view StoriesVC requires re-executing the full computation on that snapshot.
CREATE TABLE Votes (user_id int, story_id int);

CREATE MATERIALIZED VIEW StoriesVC AS
SELECT story_id, COUNT(*) AS vcount
FROM Votes
GROUP BY story_id
HAVING COUNT(*) >= 2;

INSERT INTO Votes VALUES (1, 1), (2, 1), (3, 2);
DELETE FROM Votes WHERE user_id = 1 AND story_id = 1;
INSERT INTO Votes VALUES (4, 1), (5, 2);
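A minimal Python sketch of this full-recomputation strategy, replaying the statements from the example: `stories_vc` recomputes the view from scratch on every snapshot, and `rows_scanned` counts how many input rows the refreshes touch in total. The function and variable names are hypothetical, and a set stands in for the Votes table because the example contains no duplicate rows.

```python
from collections import Counter

def stories_vc(votes):
    """Full computation of StoriesVC on one snapshot of Votes:
    count votes per story_id and keep groups with at least 2 votes."""
    counts = Counter(story_id for _user_id, story_id in votes)
    return {story_id: n for story_id, n in counts.items() if n >= 2}

# Replay the example statements; each one yields a new snapshot of Votes.
votes = set()
snapshots = []
votes |= {(1, 1), (2, 1), (3, 2)}   # INSERT INTO Votes VALUES (1,1), (2,1), (3,2)
snapshots.append(frozenset(votes))
votes -= {(1, 1)}                    # DELETE ... WHERE user_id = 1 AND story_id = 1
snapshots.append(frozenset(votes))
votes |= {(4, 1), (5, 2)}            # INSERT INTO Votes VALUES (4,1), (5,2)
snapshots.append(frozenset(votes))

# Every refresh re-reads the whole snapshot, even though only a few rows changed.
results = [stories_vc(snap) for snap in snapshots]
rows_scanned = sum(len(snap) for snap in snapshots)
# results == [{1: 2}, {}, {1: 2, 2: 2}], rows_scanned == 9
```

Six row-level changes cost nine row reads here, and the gap widens as the table grows, since each refresh scales with the table size rather than with the size of the change.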
Computational model based on change streams
Performing full computation on each snapshot is expensive and involves a significant amount of redundant work. RisingWave, like other stream processing systems, adopts an incremental computational model: the inputs and outputs of operators are changes to relations rather than full relations. RisingWave's stream computation engine represents these changes as a series of INSERT and DELETE operations. For the earlier example, the computation under this model is shown in the figure, where "+" represents INSERT and "-" represents DELETE. The changes to the base table Votes flow through the individual operators, which compute the resulting changes to the target materialized view StoriesVC.
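To show how an operator consumes and produces changes, here is a minimal Python sketch of an incremental version of the StoriesVC computation. It assumes a simplified model in which the operator keeps one count per story_id as state and, when a group's count changes, emits a DELETE of the old result row followed by an INSERT of the new one, applying the vcount >= 2 filter to both. The function names are hypothetical, and this is not RisingWave's actual operator code.

```python
from collections import defaultdict

THRESHOLD = 2  # the HAVING filter: keep stories with at least 2 votes

def incremental_stories_vc(changes):
    """Consume ("+" | "-", (user_id, story_id)) changes to Votes and emit the
    resulting ("+" | "-", (story_id, vcount)) changes to StoriesVC."""
    counts = defaultdict(int)  # operator state: current count per story_id
    emitted = []
    for op, (_user_id, story_id) in changes:
        old = counts[story_id]
        new = old + 1 if op == "+" else old - 1
        counts[story_id] = new
        # Retract the old output row and insert the new one, but only for
        # rows that pass the HAVING filter.
        if old >= THRESHOLD:
            emitted.append(("-", (story_id, old)))
        if new >= THRESHOLD:
            emitted.append(("+", (story_id, new)))
    return emitted

def apply_changes(mv, changes):
    """Apply output changes to a dict standing in for the materialized view."""
    for op, (story_id, vcount) in changes:
        if op == "+":
            mv[story_id] = vcount
        else:
            del mv[story_id]
    return mv

# The example's statements expressed as changes to Votes.
vote_changes = [("+", (1, 1)), ("+", (2, 1)), ("+", (3, 2)),
                ("-", (1, 1)), ("+", (4, 1)), ("+", (5, 2))]
out = incremental_stories_vc(vote_changes)
mv = apply_changes({}, out)
# out == [("+", (1, 2)), ("-", (1, 2)), ("+", (1, 2)), ("+", (2, 2))]
# mv == {1: 2, 2: 2}
```

The final view matches what full recomputation gives on the last snapshot, while each input change touched only a single counter.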
It is worth noting that changes to relations can be represented in various ways, but any representation must still satisfy the closure property: the changes consumed and produced by operators must use the same representation. RisingWave therefore adopts the simplest representation, INSERT and DELETE operations, and builds additional desirable properties on top of it.
A change stream consisting only of INSERT and DELETE operations, without any further guarantees, is not friendly to storage engines: processing a DELETE would mean searching for the matching value in an unordered list. Fast insertion and deletion usually require a unique key to index the data. Therefore, to ensure that every materialized view always has a unique key that can serve as its primary key, RisingWave's optimizer performs a bottom-up deduction over the operator tree so that each operator's output has a unique key. This key is known as the stream key, and each operation in the change stream applies to the row identified by it.
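The storage argument can be illustrated with a small Python sketch. `KeyedTable` is a hypothetical stand-in for materialized-view storage indexed by the stream key: applying an INSERT or DELETE is a dictionary operation on the key. `delete_without_key` shows the alternative, where a DELETE has to scan an unordered list for a matching row.

```python
class KeyedTable:
    """Hypothetical materialized-view storage indexed by a unique stream key."""

    def __init__(self, key_cols):
        self.key_cols = key_cols
        self.rows = {}  # stream key -> full row

    def key_of(self, row):
        return tuple(row[c] for c in self.key_cols)

    def apply(self, op, row):
        key = self.key_of(row)
        if op == "+":
            self.rows[key] = row   # average O(1) hash insert
        elif op == "-":
            del self.rows[key]     # the key locates the row directly
        else:
            raise ValueError(f"unknown op {op!r}")

def delete_without_key(rows, row):
    """Without a key, a DELETE must search an unordered list: O(n)."""
    rows.remove(row)  # list.remove scans until it finds an equal element

store = KeyedTable(["uuid"])
store.apply("+", {"uuid": "a", "story_id": 1})
store.apply("+", {"uuid": "b", "story_id": 1})
store.apply("-", {"uuid": "a", "story_id": 1})
print(store.rows)  # {('b',): {'uuid': 'b', 'story_id': 1}}
```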
For example, consider the following materialized view defined by a join. Its output does not have a unique key. The optimizer therefore rewrites the plan, adding two columns, taken from the uuid primary keys of the two tables, as the stream key of the Scan and Join operators. These two columns appear in the result set, and the materialized view can use them as hidden columns that form a composite primary key.
CREATE TABLE VisitEvent (
    story_id int,
    user_id int,
    time timestamp,
    uuid varchar PRIMARY KEY
);
CREATE TABLE Comments (
    story_id int,
    user_id int,
    created_at timestamp,
    content varchar,
    uuid varchar PRIMARY KEY
);
CREATE MATERIALIZED VIEW mv AS
SELECT
    Comments.story_id AS story_id,
    Comments.user_id AS user_id,
    Comments.content AS content,
    Comments.created_at AS comment_time,
    VisitEvent.time AS visit_time
FROM Comments
JOIN VisitEvent USING (story_id, user_id);
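The bottom-up deduction can be sketched in Python for the plan of this materialized view. The plan classes and the `derive` function are hypothetical, and the rules are a simplification assumed for illustration: a Scan is keyed on its table's primary key, an inner join on the concatenation of both inputs' stream keys, and a projection that drops key columns appends them back as hidden columns. RisingWave's optimizer handles many more operators and cases.

```python
from dataclasses import dataclass

@dataclass
class Scan:
    table: str
    columns: list
    pk: list

@dataclass
class Join:
    left: object
    right: object

@dataclass
class Project:
    child: object
    columns: list

def derive(node):
    """Return (output_columns, stream_key) for a plan node, computed bottom-up.
    Columns are qualified as 'Table.col'. Simplified rules (assumption)."""
    if isinstance(node, Scan):
        cols = [f"{node.table}.{c}" for c in node.columns]
        return cols, [f"{node.table}.{c}" for c in node.pk]
    if isinstance(node, Join):
        lcols, lkey = derive(node.left)
        rcols, rkey = derive(node.right)
        # A row of an inner join is identified by the pair of input rows.
        return lcols + rcols, lkey + rkey
    if isinstance(node, Project):
        _in_cols, key = derive(node.child)
        # Key columns the user did not select are kept as hidden columns.
        hidden = [k for k in key if k not in node.columns]
        return node.columns + hidden, key
    raise TypeError(f"unsupported node {node!r}")

plan = Project(
    Join(
        Scan("Comments", ["story_id", "user_id", "created_at", "content", "uuid"], ["uuid"]),
        Scan("VisitEvent", ["story_id", "user_id", "time", "uuid"], ["uuid"]),
    ),
    ["Comments.story_id", "Comments.user_id", "Comments.content",
     "Comments.created_at", "VisitEvent.time"],
)
cols, key = derive(plan)
# key == ["Comments.uuid", "VisitEvent.uuid"], appended to cols as hidden columns
```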
Because the stream key is derived from the properties of relational algebra itself, an operator's output change stream naturally satisfies the stream key constraint whenever its input change streams satisfy theirs. Here, the constraint means that INSERT and DELETE operations on the same stream key must alternate: an existing stream key cannot be inserted again, and a non-existent stream key cannot be deleted.
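The stream key constraint is easy to check mechanically. The following Python sketch (a hypothetical helper, not part of RisingWave) tracks which keys are currently present and rejects a duplicate INSERT or a DELETE of an absent key.

```python
def check_stream_key_constraint(changes, initial_keys=()):
    """Verify that, per stream key, '+' and '-' alternate: a present key cannot
    be inserted again and an absent key cannot be deleted.
    `changes` is an iterable of (op, key). Returns the keys present at the end;
    raises ValueError on the first violation."""
    present = set(initial_keys)
    for i, (op, key) in enumerate(changes):
        if op == "+":
            if key in present:
                raise ValueError(f"change {i}: duplicate insert of key {key!r}")
            present.add(key)
        elif op == "-":
            if key not in present:
                raise ValueError(f"change {i}: delete of absent key {key!r}")
            present.remove(key)
        else:
            raise ValueError(f"change {i}: unknown op {op!r}")
    return present

print(check_stream_key_constraint([("+", 1), ("-", 1), ("+", 1)]))  # {1}
```

A stream such as `[("+", 1), ("+", 1)]` raises `ValueError`, because the second INSERT targets a key that already exists.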
Other types of change streams
Readers familiar with stream processing are likely aware that different scenarios may use different representations of change streams, some of which differ slightly from RisingWave's. In this section, we will briefly introduce append-only streams and upsert streams and describe how RisingWave converts them into its internal representation when these change streams are input from external systems.
An append-only stream is a change stream on a table with a specific restriction: it can only represent INSERT changes, that is, records appended to the original table. Records that have already been inserted into the table will never be deleted, retracted, or updated. For such a stream, RisingWave uses a Snowflake-like primary key generation algorithm to generate a new column that serves as the stream key. Afterward, the stream becomes an internal RisingWave change stream that contains only INSERT operations.
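For readers unfamiliar with Snowflake-style IDs, here is a minimal Python sketch. It uses the classic Twitter Snowflake layout (41-bit millisecond timestamp, 10-bit worker ID, 12-bit per-millisecond sequence); RisingWave's actual bit layout and implementation may differ, and the class name, the `clock` parameter, and the `to_change_stream` helper are hypothetical. IDs from one generator increase monotonically, and distinct worker IDs keep parallel generators from colliding, so each appended record can be tagged with a unique stream key.

```python
import threading
import time

class SnowflakeLikeIdGenerator:
    """Illustrative Snowflake-style ID generator using the classic layout:
    41-bit millisecond timestamp | 10-bit worker id | 12-bit sequence.
    RisingWave's real layout may differ."""

    WORKER_BITS = 10
    SEQ_BITS = 12

    def __init__(self, worker_id, epoch_ms=0, clock=None):
        if not 0 <= worker_id < (1 << self.WORKER_BITS):
            raise ValueError("worker_id out of range")
        self.worker_id = worker_id
        self.epoch_ms = epoch_ms
        # Injectable millisecond clock so the sketch is easy to test.
        self.clock = clock if clock is not None else (lambda: time.time_ns() // 1_000_000)
        self.last_ms = -1
        self.seq = 0
        self.lock = threading.Lock()

    def next_id(self):
        with self.lock:
            now = max(self.clock(), self.last_ms)  # never move backwards
            if now == self.last_ms:
                self.seq = (self.seq + 1) & ((1 << self.SEQ_BITS) - 1)
                if self.seq == 0:
                    # Sequence exhausted for this millisecond: wait for the next one.
                    while now <= self.last_ms:
                        now = self.clock()
            else:
                self.seq = 0
            self.last_ms = now
            return (((now - self.epoch_ms) << (self.WORKER_BITS + self.SEQ_BITS))
                    | (self.worker_id << self.SEQ_BITS)
                    | self.seq)

def to_change_stream(records, gen):
    """Tag each appended record with a generated stream key; every op is an INSERT."""
    for record in records:
        yield ("+", gen.next_id(), record)

gen = SnowflakeLikeIdGenerator(worker_id=1)
changes = list(to_change_stream(["click", "view"], gen))
```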
Similarly, an upsert stream is a change stream defined on a table with keys, and it consists of UPSERT and DELETE operations. The behavior of an UPSERT operation depends on the current state of its key in the table: if the key does not exist, the record is inserted; otherwise, the existing record with that key is overwritten. Compared with RisingWave's change stream, an UPSERT operation does not carry the record's state before the change. As a result, not all stream operators can process UPSERT operations efficiently. For example, an aggregation such as SUM needs the old value to compute the incremental change to its result, and the upsert stream does not provide it. Therefore, RisingWave converts incoming upsert streams into internal change streams. Specifically, RisingWave materializes the corresponding table, and when it encounters an UPSERT operation, it looks up the key's current value in the table and fills in the corresponding DELETE operation as needed.
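The conversion can be sketched in Python as follows. A dict stands in for the materialized table, and the event format, the function names, and the choice to treat a DELETE of an absent key as a no-op are assumptions for illustration. Once each UPSERT is expanded into a DELETE of the old row (if any) and an INSERT of the new one, a SUM can be maintained from the changes alone.

```python
def upsert_to_changes(events):
    """Convert an upsert stream into a "+"/"-" change stream.

    `events` yields ("upsert", key, value) or ("delete", key). A dict stands in
    for the materialized table; it supplies the old value an UPSERT lacks."""
    table = {}
    for kind, key, *rest in events:
        if kind == "upsert":
            (value,) = rest
            if key in table:
                yield ("-", key, table[key])  # retract the overwritten row
            table[key] = value
            yield ("+", key, value)
        elif kind == "delete":
            if key in table:  # assumption: deleting an absent key is a no-op
                yield ("-", key, table.pop(key))
        else:
            raise ValueError(f"unknown event kind {kind!r}")

def running_sum(changes):
    """Maintain SUM(value) incrementally from "+"/"-" changes alone."""
    total = 0
    for op, _key, value in changes:
        total += value if op == "+" else -value
        yield total

events = [("upsert", "a", 10), ("upsert", "b", 5),
          ("upsert", "a", 7), ("delete", "b")]
changes = list(upsert_to_changes(events))
sums = list(running_sum(changes))
# changes == [("+", "a", 10), ("+", "b", 5), ("-", "a", 10), ("+", "a", 7), ("-", "b", 5)]
# sums == [10, 15, 5, 12, 7]
```

Without the filled-in `("-", "a", 10)`, the SUM operator would have no way to subtract the overwritten value when key `a` is upserted to 7.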
This article presented the computational model of the RisingWave stream processing engine and described how declarative SQL queries are transformed into a series of stream operators. Starting from the relational algebra behind SQL, we extended it to a stream computational model based on TVR and then introduced RisingWave's change stream model. Finally, we described the types and characteristics of change streams in RisingWave and compared them with the change streams commonly used in other systems.