Deep Dive Into the RisingWave Stream Processing Engine (Part 3): Trigger Mechanism

In the previous two articles, we saw that the RisingWave stream processing engine performs incremental computation on the change streams derived from the base relational tables to obtain the change streams of the target relational tables. In this model, because the input change streams are unbounded, the engine is free to process the streams in portions of arbitrary size, and the output change streams may therefore contain intermediate results. For example, consider the following case.

CREATE TABLE Votes(user_id int, story_id int);

CREATE MATERIALIZED VIEW StoriesVC AS
SELECT story_id, COUNT(*) AS vcount
FROM Votes GROUP BY story_id;

/* Txn 1 */
INSERT INTO Votes VALUES (1,1), (2,1), (3,2);
/* Txn 2 */
DELETE FROM Votes WHERE user_id = 1 AND story_id = 1;
/* Txn 3 */
INSERT INTO Votes VALUES (4,1), (5,2);

Figure: computing over different portions of the input stream.

For the same input stream, the output streams in the three cases differ, but all of them are correct: the final materialized result is the same, namely (1, 2) and (2, 2).
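For instance, if the engine processes each transaction as a separate batch, one possible output change stream looks like this (a reconstruction for illustration; the figure may group the updates differently):

/* after Txn 1 */  Insert (1, 2), Insert (2, 1)
/* after Txn 2 */  Update (1, 2) -> (1, 1)
/* after Txn 3 */  Update (1, 1) -> (1, 2), Update (2, 1) -> (2, 2)

If the engine instead processes all three transactions in a single batch, the output stream is simply Insert (1, 2) and Insert (2, 2). Both streams materialize to the same final rows.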

Barrier

In the example above, the most obvious difference is the granularity at which the input stream is divided and then processed segment by segment. Like some other stream processing systems, RisingWave introduces barriers into the stream: the source nodes of the computation DAG insert barriers into the data stream, dividing the input into a sequence of segments.

After receiving a barrier, each operator performs the following steps:

  1. Alignment: An operator with multiple input streams (such as join or union) keeps consuming data from its other inputs until it has received the same barrier on every input, so that all inputs are aligned at that barrier.

  2. Flush: The operator processes all the input received so far and sends the resulting change stream downstream.

  3. Broadcast: The operator forwards the barrier to all of its downstream operators.

For the example above, barriers could be inserted as follows:

Figure: barriers inserted into the input stream at different granularities.

By the way, barriers also play a crucial role in fault tolerance and recovery in RisingWave stream processing. Viewed from the perspective of ACID transactions, barriers constrain the behavior of operators so that when a node in the graph receives a barrier, all the changes it has received so far are based on the same version of the base tables. This achieves snapshot isolation and keeps the states of the nodes in the graph consistent with one another. In upcoming articles on fault tolerance and recovery in RisingWave stream processing, we will further explore how RisingWave ensures atomicity and durability.

Barrier injection strategy

Barriers control the computation of each operator in the stream processing graph, keeping them in sync. The strategy for injecting barriers into the data stream at the source nodes is crucial.

One straightforward approach is to inject barriers at fixed time intervals (the barrier interval). For the compute engine, batching can eliminate unnecessary intermediate results and provide better performance. However, in practical applications, users have certain freshness requirements for the output results, so the compute engine cannot accumulate batches indefinitely. The barrier interval sets the upper limit for batching: when a barrier arrives, batching must stop and the current intermediate results are emitted downstream. This lets users trade off performance against freshness simply by configuring a parameter.
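In RisingWave, the barrier interval can be tuned as a system parameter. A minimal sketch, assuming the parameter is named barrier_interval_ms (check the documentation for the exact name and default in your version):

/* allow up to 2 seconds of batching between barriers */
ALTER SYSTEM SET barrier_interval_ms TO 2000;

A larger interval permits more batching and fewer intermediate results, at the cost of output that may lag the input by up to that interval.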

On the other hand, for change streams that carry transactional semantics, the source can preserve the upstream transactions by controlling where barriers are injected, so that a barrier never splits an upstream transaction.

Watermark and trigger

Regarding when to trigger the output of stream processing, another classic approach is the Dataflow Model proposed by Google in 2015. It partitions the data based on time windows defined on the event time column and uses watermarks to describe the completeness of events. This allows the compute engine to process the events within each window in a batch-like manner only when the events are complete and will not change. This triggering method is also known as completeness triggering.

A watermark(t) inserted into the data stream indicates that all events with an event time earlier than t have already arrived and will not appear again. In RisingWave, this means there will be no further changes to records with an event time before t. Under this model, the computation of operators is triggered by watermarks, and an operator outputs results once the watermark guarantees that those results will no longer change.

For example, the following graph shows counting over 5-second tumble windows. When the aggregation operator receives a watermark with tm > 12:00:11, it knows that no more data will arrive for the windows starting at 12:00:00 and 12:00:05, so it can output the results of these two windows.

Figure: watermark-triggered output of tumble window counts.
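In SQL, the windowed count in this figure could be written roughly as follows. This is a sketch: the events source, its event_time column, and the cnt alias are illustrative, and TUMBLE is RisingWave's tumbling-window table function.

SELECT window_start, COUNT(*) AS cnt
FROM TUMBLE(events, event_time, INTERVAL '5 seconds')
GROUP BY window_start;

With a watermark defined on event_time (discussed next), the aggregation operator can close each window once the watermark passes the window's end.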

Where does the watermark come from? In practice, few upstream systems can provide perfect watermark guarantees when producing streams. Instead, watermark injection is typically specified in the stream processing system through an allowed out-of-orderness delay (a timeout) or other strategies. For example, in RisingWave you can define a watermark with a 5-second delay using SQL; more details can be found in the documentation. In future articles, we will discuss other aspects of watermarks, such as handling late data.
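As a sketch of what such a definition might look like (column names are illustrative and connector settings are omitted; see the RisingWave documentation for the exact syntax):

/* connector settings (topic, brokers, etc.) are omitted for brevity */
CREATE SOURCE events (
    event_time TIMESTAMP,
    story_id INT,
    WATERMARK FOR event_time AS event_time - INTERVAL '5 seconds'
) WITH (connector = 'kafka') FORMAT PLAIN ENCODE JSON;

Here the watermark trails the maximum observed event_time by 5 seconds, declaring that records more than 5 seconds out of order are not expected.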

EMIT ON WINDOW CLOSE

When it comes to window computations, there are two output strategies. The first is to output partial results at each barrier even if the window has not yet closed, known as "emit on update." The second is to ignore barriers and output the complete result only when the window closes, known as "emit on window close" (EOWC). The first strategy keeps the results fresher and more consistent with the latest input, while the second guarantees that emitted data will not change afterwards and eliminates intermediate state. RisingWave defaults to the first strategy, but for users who need the second, we provide an experimental EOWC syntax (a sketch follows the list below). It is mainly used in the following scenarios:

  1. When there is a need for an append-only output stream. This is often required by downstream systems such as Kafka or S3.

  2. When better performance is desired at the cost of some delay. The EOWC semantics eliminate the intermediate state of certain operators and allow downstream operators to consume an append-only stream. This optimization can significantly improve performance, particularly in queries involving window aggregation and OVER window functions.

  3. In scenarios like monitoring and alerts, sometimes users do not want to see intermediate results but only expect to see the aggregated results after the window is closed.
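As a rough sketch of the experimental syntax, reusing the illustrative events source from the previous section (the view name and column aliases are made up; consult the documentation for the exact, current form):

CREATE MATERIALIZED VIEW WindowCountsEOWC AS
SELECT window_start, COUNT(*) AS cnt
FROM TUMBLE(events, event_time, INTERVAL '5 seconds')
GROUP BY window_start
EMIT ON WINDOW CLOSE;

With EMIT ON WINDOW CLOSE, each window's row is emitted only once, after the watermark on event_time passes the end of the window, so the view's output stream is append-only.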

Conclusion

This article showcases the two modes of triggering computation in the RisingWave stream processing engine. The default barrier-triggered mode keeps the states of the nodes in the computation graph consistent with one another, which in turn keeps materialized views consistent with each other, and it lets users balance cost against freshness by adjusting the barrier interval. The EOWC mode, inspired by the Dataflow Model, leverages additional watermark definitions to trigger computation only once a window's events are complete. It plays a unique role in scenarios such as append-only sinks and window aggregation.
