Deep Dive Into the RisingWave Stream Processing Engine - Part 2: Computational Model

SQL and relational algebra

In the previous article, we mentioned that RisingWave is a streaming database that uses SQL to define stream processing tasks declaratively. Let's first look at the computational model of a classic SQL query engine and then think about how to design the stream processing computational model under SQL.

SQL owes much of its longevity as a database query language to the relational algebra underneath it. Any SQL query can be equivalently transformed into an operator tree composed of relational algebra operators. For example, the following SQL query can be represented as the operator tree in the figure below.

CREATE TABLE StoriesVC(story_id int, vote_count int);
CREATE TABLE Stories (id int, title text);

/* Get stories information with more than 1 vote*/
SELECT id, title, vote_count
FROM Stories JOIN StoriesVC
ON StoriesVC.story_id = Stories.id
WHERE vote_count > 1;

The operator tree of the aforementioned SQL query.
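
To make the operator tree concrete, here is a minimal Python sketch of the same query as nested operators. It is only an illustration, not RisingWave's implementation: the function names (`scan`, `join`, `filter_rows`, `project`) and the sample rows are assumptions introduced for this example. Each operator takes relations (lists of rows) and returns a relation, which is what allows them to be nested freely.

```python
# Hypothetical operators: every input and output is a relation (a list of dict rows).
def scan(table):
    return list(table)

def join(left, right, left_key, right_key):
    # Hash join: index the right side by its join key, then probe with the left side.
    index = {}
    for r in right:
        index.setdefault(r[right_key], []).append(r)
    return [{**l, **r} for l in left for r in index.get(l[left_key], [])]

def filter_rows(rows, predicate):
    return [row for row in rows if predicate(row)]

def project(rows, columns):
    return [{c: row[c] for c in columns} for row in rows]

# Made-up sample data for the two tables.
stories = [{"id": 1, "title": "A"}, {"id": 2, "title": "B"}]
stories_vc = [{"story_id": 1, "vote_count": 3}, {"story_id": 2, "vote_count": 1}]

# Project(Filter(Join(Scan(Stories), Scan(StoriesVC))))
result = project(
    filter_rows(
        join(scan(stories), scan(stories_vc), "id", "story_id"),
        lambda row: row["vote_count"] > 1,
    ),
    ["id", "title", "vote_count"],
)
print(result)  # [{'id': 1, 'title': 'A', 'vote_count': 3}]
```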

Relational algebra, as an algebraic system, satisfies the closure property, meaning that the input and output of any relational algebra operator are relations with specific structures (schemas). This allows relational operators to be flexibly nested without exceeding the scope of relational algebra. Query engines can leverage the characteristics of relational operations and use a unified framework for processing. This flexibility is reflected in SQL through the presence of subqueries everywhere. Users can chain their own defined SQL queries together. For example, the previous query can evolve into the following query.

/* Get stories information with more than 1 vote*/
SELECT id, title, vote_count
FROM Stories JOIN (
    SELECT count(*) AS vote_count, story_id FROM Votes GROUP BY story_id
) StoriesVC
ON StoriesVC.story_id = Stories.id
WHERE vote_count > 1;

Through the analysis of SQL and relational algebra, we can conclude that to support the computational model of stream processing using SQL, the following two requirements need to be met:

  1. Operators must extend the traditional relational algebra operators, so that a SQL query can first be transformed into classic relational operators and then into the corresponding stream operators.
  2. The closure property needs to be satisfied to form a new algebraic system where the input and output have the same properties.

In summary, the key issue in the computational model is the definition of the computational objects.

Computational model based on TVR

To meet the first requirement, the computational objects must be extended from relations. One attempt to solve this problem is streaming SQL on Time-varying Relations (TVR). This model is explained in detail in Chapter 8 of "Streaming Systems" and in "One SQL to Rule Them All – an Efficient and Syntactically Idiomatic Approach to Management of Streams and Tables." In simple terms, a TVR is a time-varying, multi-version relation. For any classic SQL operator, applying the operator to each snapshot version of the input TVR yields the corresponding snapshot version of the output TVR. This satisfies the two requirements mentioned above.

TVR is a theory-oriented model. A query engine that adhered to its original definition would have to generate a new snapshot version every time an upstream TVR changes and then run a full computation on that snapshot to produce the output. In the end, we would obtain an algorithm that refreshes materialized views by full recomputation. For example, if we build a materialized view called StoriesVC and modify the base table Votes, the calculation process would be as shown in the figure below. Each modification generates a new snapshot, and to update the materialized view StoriesVC, the full computation needs to be re-executed on that snapshot.

CREATE TABLE Votes(user_id int, story_id int);

CREATE MATERIALIZED VIEW StoriesVC AS
SELECT story_id, COUNT(*) AS vcount
FROM Votes GROUP BY story_id
HAVING COUNT(*) >= 2;

INSERT INTO Votes VALUES (1,1), (2,1), (3,2);
DELETE FROM Votes WHERE user_id = 1 AND story_id = 1;
INSERT INTO Votes VALUES (4,1), (5,2);

The calculation process of the example mentioned earlier under the TVR model.
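
The full-recomputation behavior can be sketched in a few lines of Python. This is a toy model under stated assumptions, not engine code: `stories_vc` and `modify_votes` are hypothetical helpers, and Votes is modeled as a plain list of `(user_id, story_id)` tuples. After every modification, the whole view is recomputed from the new snapshot.

```python
from collections import Counter

def stories_vc(votes):
    """Full computation of StoriesVC over one snapshot of Votes."""
    counts = Counter(story_id for _user_id, story_id in votes)
    return {story_id: n for story_id, n in counts.items() if n >= 2}

votes = []      # current snapshot of the base table Votes
snapshots = []  # one recomputed version of StoriesVC per modification

def modify_votes(inserts=(), deletes=()):
    # Apply the modification, then recompute the view from scratch.
    votes[:] = [v for v in votes if v not in deletes]
    votes.extend(inserts)
    snapshots.append(stories_vc(votes))

modify_votes(inserts=[(1, 1), (2, 1), (3, 2)])
modify_votes(deletes=[(1, 1)])
modify_votes(inserts=[(4, 1), (5, 2)])
print(snapshots)  # [{1: 2}, {}, {1: 2, 2: 2}]
```

Every call scans all of Votes again, even when only one row changed, which is the redundancy that an incremental model avoids.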

Computational model based on change streams

Performing full computation on each snapshot is expensive and involves a significant amount of redundant computation. RisingWave, like other stream processing systems, adopts an incremental computational model. Specifically, the input and output of operators are changes to relations. RisingWave's stream computation engine represents changes using a series of INSERT and DELETE operations. For the example mentioned earlier, the calculation process under this model is shown in the figure, where "+" represents INSERT and "-" represents DELETE. The changes to the base table Votes flow through the individual operators, which compute the changes to the target materialized view StoriesVC.

The calculation process of the example mentioned earlier under the incremental computational model.
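
As a rough illustration of this incremental model, the Python sketch below pushes the same changes to Votes through a counting aggregation and a filter. The operator names (`hash_agg`, `filter_changes`, `materialize`) and the tuple encoding of changes are assumptions made for this example; RisingWave's real operators are more involved.

```python
def hash_agg(changes):
    """Turn changes to Votes into changes to (story_id, vcount)."""
    counts = {}  # operator state: story_id -> current count
    out = []
    for op, (_user_id, story_id) in changes:
        old = counts.get(story_id, 0)
        new = old + 1 if op == "+" else old - 1
        counts[story_id] = new
        if old > 0:
            out.append(("-", (story_id, old)))  # retract the previous result row
        if new > 0:
            out.append(("+", (story_id, new)))  # emit the updated result row
    return out

def filter_changes(changes, predicate):
    # Stateless: a "-" passes exactly when its earlier "+" passed.
    return [(op, row) for op, row in changes if predicate(row)]

def materialize(changes):
    """Apply a change stream to an initially empty view keyed by story_id."""
    view = {}
    for op, (story_id, vcount) in changes:
        if op == "+":
            view[story_id] = vcount
        else:
            del view[story_id]
    return view

votes_changes = [
    ("+", (1, 1)), ("+", (2, 1)), ("+", (3, 2)),  # first INSERT
    ("-", (1, 1)),                                # DELETE
    ("+", (4, 1)), ("+", (5, 2)),                 # second INSERT
]
mv_changes = filter_changes(hash_agg(votes_changes), lambda row: row[1] >= 2)
print(mv_changes)  # [('+', (1, 2)), ('-', (1, 2)), ('+', (1, 2)), ('+', (2, 2))]
print(materialize(mv_changes))  # {1: 2, 2: 2}
```

Both the input and the output of each operator are lists of "+" and "-" changes, so the operators can be chained in any order the plan requires.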

It is worth noting that there can be various ways to represent changes to relations, but they still need to satisfy the closure property, meaning that the representation of changes in the input and output of operators must be the same. Therefore, RisingWave adopts the simplest INSERT and DELETE operations for representing changes, and on top of that, it provides additional desirable properties.

Stream key

A change stream consisting only of INSERT and DELETE operations, without any other guarantees, is not friendly to storage engines. Just imagine finding a value in an unordered list. Fast insertion and deletion usually require a unique key for indexing the data. Therefore, to ensure that there is always a unique key that can serve as the primary key of each materialized view, RisingWave's optimizer performs a bottom-up deduction on the operator tree so that each operator's output has a unique key. This key is called the stream key: every operation in the change stream is interpreted with respect to it.

For example, consider the following join materialized view statement. Its output does not have a unique key. The optimizer rewrites it by adding two columns as stream keys in the Scan operator and Join operator, taken from the original primary key UUID of the two tables. These two columns will appear in the result set, and the materialized view can use them as hidden columns for a composite primary key.

CREATE TABLE VisitEvent(
  story_id int,
  user_id int,
  time timestamp,
  uuid varchar primary key);
CREATE TABLE Comments(
  story_id int,
  user_id int,
  created_at timestamp,
  content varchar,
  uuid varchar primary key);

CREATE MATERIALIZED VIEW mv AS SELECT
  Comments.story_id AS story_id,
  Comments.user_id AS user_id,
  Comments.content AS content,
  Comments.created_at AS comment_time,
  VisitEvent.time AS visit_time
FROM Comments JOIN VisitEvent USING(story_id, user_id);

The stream keys derivation of the example.
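
The bottom-up derivation for this example can be sketched as follows. This is a simplified model covering only the two rules described above (a scan exposes the table's primary key, and a join combines the keys of both inputs); the `Scan` and `Join` classes and the `stream_key` function are hypothetical names, not RisingWave's optimizer API.

```python
from dataclasses import dataclass

@dataclass
class Scan:
    table: str
    primary_key: tuple  # primary key columns of the scanned table

@dataclass
class Join:
    left: object
    right: object

def stream_key(node):
    """Derive the stream key of a plan node from its children, bottom-up."""
    if isinstance(node, Scan):
        # A scan inherits the table's primary key.
        return tuple(f"{node.table}.{col}" for col in node.primary_key)
    if isinstance(node, Join):
        # A join output row is identified by the keys of both input rows.
        return stream_key(node.left) + stream_key(node.right)
    raise TypeError(f"no derivation rule for {type(node).__name__}")

plan = Join(Scan("Comments", ("uuid",)), Scan("VisitEvent", ("uuid",)))
print(stream_key(plan))  # ('Comments.uuid', 'VisitEvent.uuid')
```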

Since the stream key is derived from the properties of relational algebra itself, the output change stream naturally satisfies the stream key constraint whenever the input change stream satisfies its own stream key constraint. Here, the constraint means that INSERT and DELETE operations with the same stream key must alternate: a stream key that already exists cannot be inserted again, and a stream key that does not exist cannot be deleted.
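
The constraint can be expressed as a small validator. The sketch below is illustrative only: `check_stream_key_constraint` is a hypothetical helper, changes are encoded as `(op, row)` tuples, and the stream is assumed to start from an empty relation.

```python
def check_stream_key_constraint(changes, key_of):
    """Raise ValueError if INSERT/DELETE do not alternate per stream key.

    Returns the set of keys present after all changes are applied.
    """
    present = set()
    for op, row in changes:
        key = key_of(row)
        if op == "+":
            if key in present:
                raise ValueError(f"INSERT of existing stream key {key!r}")
            present.add(key)
        elif op == "-":
            if key not in present:
                raise ValueError(f"DELETE of non-existing stream key {key!r}")
            present.remove(key)
        else:
            raise ValueError(f"unknown operation {op!r}")
    return present

# Rows are (story_id, vcount); the stream key is story_id.
ok = [("+", (1, 2)), ("-", (1, 2)), ("+", (1, 3))]
print(check_stream_key_constraint(ok, key_of=lambda row: row[0]))  # {1}
```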

Other types of change streams

Readers familiar with stream processing are likely aware that different scenarios may have different representations of change streams, which differ slightly from RisingWave's change streams. In this section, we will briefly introduce append-only streams and upsert streams and describe how RisingWave converts them into its internal representation when these change streams are input from external systems.

An append-only stream is a change stream on a table with a specific restriction: it can only represent changes in which new records are appended to the table. Records that have already been inserted will not be deleted, revoked, or updated in the future. For such a stream, RisingWave uses a Snowflake-like primary key generation algorithm to generate a new column as the stream key. Afterward, this stream becomes an internal RisingWave change stream that contains only INSERT operations.
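
For readers unfamiliar with Snowflake-style identifiers, here is a minimal sketch of how such a key could be generated and attached to append-only records. The bit layout (10 worker bits, 12 sequence bits, as in the classic Snowflake scheme), the `RowIdGenerator` class, and the `append_only_to_changes` helper are all assumptions for illustration; they are not RisingWave's actual layout or API.

```python
import time

class RowIdGenerator:
    """Snowflake-style ids: timestamp | worker id | sequence (assumed bit widths)."""
    WORKER_BITS = 10
    SEQUENCE_BITS = 12

    def __init__(self, worker_id, clock=lambda: int(time.time() * 1000)):
        if not 0 <= worker_id < (1 << self.WORKER_BITS):
            raise ValueError("worker_id out of range")
        self.worker_id = worker_id
        self.clock = clock  # returns the current time in milliseconds
        self.last_ms = -1
        self.sequence = 0

    def next_id(self):
        now = max(self.clock(), self.last_ms)  # never move backwards in time
        if now == self.last_ms:
            self.sequence += 1
            if self.sequence >= (1 << self.SEQUENCE_BITS):
                now += 1           # sequence exhausted: borrow the next millisecond
                self.sequence = 0
        else:
            self.sequence = 0
        self.last_ms = now
        shift = self.WORKER_BITS + self.SEQUENCE_BITS
        return (now << shift) | (self.worker_id << self.SEQUENCE_BITS) | self.sequence

def append_only_to_changes(records, generator):
    """Prefix each record with a generated key and emit it as an INSERT."""
    return [("+", (generator.next_id(), *record)) for record in records]

gen = RowIdGenerator(worker_id=1)
changes = append_only_to_changes([("click", 42), ("click", 42)], gen)
# Identical records still receive distinct stream keys.
print(changes[0][1][0] != changes[1][1][0])  # True
```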

Similarly, an upsert stream is a change stream defined on a table with keys, and it consists of UPSERT and DELETE operations. The behavior of an UPSERT operation depends on the current state of the key in the table: if the key does not exist, the record is inserted; otherwise, the existing record with that key is overwritten. Compared to RisingWave's change stream, an UPSERT operation does not carry the state of the record before the change. Not all stream operators can support UPSERT operations efficiently. For example, an aggregation like SUM needs the old value from before the change event to calculate the incremental change in its result, but the upsert stream does not provide it. Therefore, RisingWave converts incoming upsert streams into internal change streams. Specifically, RisingWave materializes the corresponding table, and when it encounters an UPSERT operation, it looks up the current value of the key in the table and fills in the corresponding DELETE operation as needed.
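
The conversion described above can be sketched in Python as follows. This is a simplified model: `upsert_to_changes` is a hypothetical function, the materialized table is a plain dict, and silently ignoring a DELETE for a key that is not in the table is an assumption of this sketch rather than documented behavior.

```python
def upsert_to_changes(upsert_stream):
    """Convert (op, key, value) upsert events into INSERT/DELETE changes."""
    table = {}  # materialized table: key -> current value
    out = []
    for op, key, value in upsert_stream:
        if op == "UPSERT":
            if key in table:
                # Fill in the DELETE of the old row that the upsert stream omitted.
                out.append(("-", (key, table[key])))
            table[key] = value
            out.append(("+", (key, value)))
        elif op == "DELETE":
            if key in table:
                out.append(("-", (key, table.pop(key))))
            # Assumption: a DELETE for an unknown key produces no change.
        else:
            raise ValueError(f"unknown operation {op!r}")
    return out

events = [
    ("UPSERT", "a", 1),
    ("UPSERT", "a", 5),     # overwrites the existing record for key "a"
    ("DELETE", "a", None),  # upsert-style DELETE carries only the key
    ("DELETE", "b", None),  # key never existed
]
print(upsert_to_changes(events))
# [('+', ('a', 1)), ('-', ('a', 1)), ('+', ('a', 5)), ('-', ('a', 5))]
```

With the old value restored in each DELETE, a downstream SUM can subtract 1 and add 5 instead of recomputing its result.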

Convert the upsert stream to RisingWave's change stream.

This article presents the computational model of the RisingWave stream processing engine and describes how to transform declarative SQL queries into a series of stream operators. Starting from the relational algebra behind SQL, it extends to a stream computational model based on TVR and further introduces the change relation stream model of RisingWave. It provides a detailed description of the types and characteristics of change streams in RisingWave and compares them with common change streams in other systems.
