- The pattern of accessing states in streaming systems is usually "one-read-one-write." Stream processing systems deal with unbounded data, which requires persisting intermediate states in the state storage and recovering from checkpoints in the case of system failures. These intermediate states are usually only read and written by a single operator with a single degree of parallelism and are not shared.
- An index is not a stream of data flowing in the dataflow graph but something that is queried by an operator. Generally speaking, the streaming data flows through the dataflow, and operators can always obtain data produced by upstream operators in the dataflow. The stream operators only perform operations based on the data in the received stream and their local intermediate states. On the other hand, the index is something that needs to be actively queried (rather than flowing on the graph). The operator needs to actively obtain the index to get the final result.
Differential Dataflow (DD) implements an indexing scheme, referred to as an arrangement, in a streaming system. An arrangement can be interpreted as an MVCC index that records the mapping (index key, time) → value. An arrangement built for streaming data is essentially an index through which downstream operators can efficiently query all values for a given index key at a given point in time.
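To make the (index key, time) → value mapping concrete, here is a minimal Python sketch of a multi-versioned index. It is not DD's actual data structure: the class name `ToyArrangement`, its methods, and the modeling of updates as (time, value, diff) triples are all assumptions made for illustration.

```python
from collections import Counter, defaultdict


class ToyArrangement:
    """Toy multi-versioned index (hypothetical; not DD's real API)."""

    def __init__(self):
        # index key -> list of (time, value, diff); diff is +1 (insert) or -1 (delete)
        self._updates = defaultdict(list)

    def update(self, key, value, time, diff):
        self._updates[key].append((time, value, diff))

    def values_at(self, key, time):
        """Return all values visible for `key` as of `time`, sorted."""
        counts = Counter()
        for t, value, diff in self._updates.get(key, []):
            if t <= time:
                counts[value] += diff
        return sorted(v for v, n in counts.items() for _ in range(n))


arr = ToyArrangement()
arr.update("k", "a", time=1, diff=+1)
arr.update("k", "b", time=2, diff=+1)
arr.update("k", "a", time=3, diff=-1)  # retract "a" at time 3

print(arr.values_at("k", 1))  # ['a']
print(arr.values_at("k", 2))  # ['a', 'b']
print(arr.values_at("k", 3))  # ['b']
```

Because every update carries a time, several readers can query the same structure at different times without interfering with one another, which is what makes it shareable.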
On top of arrangements, DD introduces Delta Join, a special kind of join operator in its streaming system. As long as all streams that need to be joined are indexed on the join key, Delta Join can use the indexes to generate the final join result directly, without any intermediate state. That is to say, if two joins cover the same indexed stream, the delta join operators of both joins can access the same arrangement on that stream directly, saving storage and maintenance costs. In contrast, a regular join operator needs to persist the records of its left and right streams, respectively. Furthermore, in a multi-way join, the records produced by each binary join also need to be persisted.
It might seem that arrangements and delta joins are all we need to share state in a streaming system and save the cost of duplicated states. But this approach does not directly apply to RisingWave, for two main reasons:
- Arrangements are implemented as an in-memory MVCC structure in which each record is timestamped, and index persistence is not considered. In RisingWave, however, streams are not timestamped.
- Delta join in DD only works in a single instance, and RisingWave has been a distributed system from day one.
In RisingWave, we support state sharing by revising the delta join implementation. The core idea is to exploit the epoch-based MVCC capability of the underlying cloud-native state storage. In this blog post, I first recap RisingWave's data persistence process on its cloud-native storage, then introduce the implementation of shared state, and finally detail the implementation of delta join in RisingWave.
Checkpointing in RisingWave
Checkpointing is the process in which stream processing systems persist their intermediate states. In RisingWave, checkpointing is completely asynchronous. The checkpointing procedure does not affect the flow of data in the streaming system: operators can continuously process streaming data regardless of the progress of the checkpoint. You can check this document for more details. Here, we briefly recap the parts of the checkpointing process that are relevant to implementing shared states.
The data of RisingWave's state storage engine "Hummock" mainly exists in two places: the shared buffer on each compute node and object storage services (which can be MinIO, S3, etc.). Shared buffer organizes data by checkpoint epoch; object storage service uses an LSM tree to organize data. Hummock stores snapshots at a certain point in time. The intermediate state calculated by the operator in an epoch is stored in the flush buffer inside the operator.
Let's take a simple query as an example:
CREATE MATERIALIZED VIEW mv_1 AS
SELECT * FROM t1, t2
WHERE t1.v1 = t2.v2
This query yields two TableScan operators, a Join operator, and a Materialize operator in the streaming engine. The TableScan operator injects changes from internal tables (or external data sources) into downstream operators in real time; the Join operator computes the matches between the two streams that satisfy the join condition t1.v1 = t2.v2; and the Materialize operator materializes the final result into storage, where it can be queried by arbitrary user-issued queries. Suppose tables t1 and t2 each have only one column, v1 and v2, respectively. At this point, the user inserts two rows, 1 and 2, into t1.
After the two rows arrive at the Join operator, the operator checks its cache to see whether these two rows have a match in t2. On a cache miss, the operator searches the state store instead. The state store scans the shared buffer as well as the data on the shared storage and merges in the flush buffers to get the corresponding two rows in t2.
After finding the match, the two new rows in t1 are written to the flush buffer of the operator. At the same time, the matched data is sent to downstream operators in the dataflow graph.
Then the downstream Materialize operator writes the two new rows of matched results to its own flush buffer.
At this point, the system decides to start a checkpointing process. RisingWave's meta service injects barriers into all data sources to start it. In this example, there are two sources, Table 1 and Table 2. When the system injects a barrier with epoch = 5, each operator that receives the barrier writes its data from epoch = 4 to the state store and then moves on to process the data of epoch = 5.
The Join operator aligns the two barriers from its two upstreams. After receiving the barrier, the operator writes the data in its flush buffer to the shared buffer in the form of key-value pairs, and it can then start processing the next batch of data.
After the barrier flows through the Materialize operator, the materialized result is also written into the shared buffer.
Once the barrier has flowed through the entire graph, each compute node encodes the data in its shared buffer into SST files in the background and uploads them to the shared storage layer. When all nodes have finished uploading, the checkpoint is declared complete. At this point, the shared buffer clears the data that has been checkpointed, and the data on the shared storage becomes visible to the user's batch queries.
At this juncture, when users execute a query, e.g., SELECT * FROM mv_1, they can see the latest data.
Therefore, for an operator, checkpointing is simply encoding the data and writing it to the shared buffer without waiting for any I/O operations. In the background, the computing node will upload the data written to the shared buffer to the shared storage and complete the checkpoint process asynchronously. The regular computation of the stream operator will not be affected at all throughout the whole checkpoint process.
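The write path described above can be sketched in a few lines of Python. This is a simplified model under stated assumptions, not Hummock's implementation: `ToyStateStore`, `ToyOperator`, and their methods are hypothetical names, plain dicts stand in for SST files and the LSM tree, and the "background" upload is just an ordinary method call.

```python
class ToyStateStore:
    """Hypothetical stand-in for the state store on one compute node."""

    def __init__(self):
        self.shared_buffer = {}   # epoch -> {key: value}, in memory
        self.shared_storage = {}  # checkpointed key-value pairs (stands in for SSTs)

    def write_batch(self, epoch, kv_pairs):
        # Called by operators on a barrier: a pure in-memory write, no I/O.
        self.shared_buffer.setdefault(epoch, {}).update(kv_pairs)

    def upload_in_background(self, epoch):
        # Would run asynchronously in a real system: persist, then clear.
        for e in sorted(e for e in self.shared_buffer if e <= epoch):
            self.shared_storage.update(self.shared_buffer.pop(e))


class ToyOperator:
    """Hypothetical stateful operator with a private flush buffer."""

    def __init__(self, store):
        self.store = store
        self.flush_buffer = {}

    def on_row(self, key, value):
        self.flush_buffer[key] = value

    def on_barrier(self, finished_epoch):
        # Hand the finished epoch's state to the shared buffer and move on.
        self.store.write_batch(finished_epoch, self.flush_buffer)
        self.flush_buffer = {}


store = ToyStateStore()
join = ToyOperator(store)
join.on_row("t1/1", 1)
join.on_row("t1/2", 2)
join.on_barrier(finished_epoch=4)  # barrier with epoch = 5 closes epoch 4
print(store.shared_buffer)         # {4: {'t1/1': 1, 't1/2': 2}}
store.upload_in_background(4)
print(store.shared_storage)        # {'t1/1': 1, 't1/2': 2}
```

The point of the split is that `on_barrier` never blocks on the upload, so the operator can start the next epoch immediately.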
Shared state and index
After introducing RisingWave's barrier-based checkpointing process, we will now discuss how to implement a shared state based on such a design.
Index
In RisingWave, the only state that can be shared is the "index," which is a special kind of materialized view.
CREATE TABLE t(v1 int, v2 int, v3 int);
CREATE INDEX idx ON t(v1, v2);
Here CREATE INDEX is equivalent to:
CREATE MATERIALIZED VIEW idx AS
SELECT * FROM t ORDER BY v1, v2;
In RisingWave, the index is a specialized materialized view sorted by the index key specified by the user. A materialized view is essentially a set of key-value pairs ordered by certain rules. If an operator wants to query all records satisfying the condition v1 = 1, v2 = 2, it only needs to sequentially scan all records starting with [1, 2] in the indexed view. Indexed materialized views are constructed using the Arrange operator.
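The prefix scan over a sorted view can be illustrated with a small Python sketch. The row layout and the helper `scan_prefix` are assumptions for illustration; a sorted list of tuples stands in for the ordered key-value pairs of the indexed view.

```python
import bisect

# Rows of (v1, v2, v3), kept sorted so the index key (v1, v2) is the leading prefix.
indexed_view = sorted([(2, 1, 40), (1, 2, 30), (1, 1, 10), (1, 2, 20)])


def scan_prefix(sorted_rows, prefix):
    """Return all rows whose leading columns equal `prefix` (hypothetical helper)."""
    # A tuple compares less than any longer tuple it is a prefix of,
    # so bisect_left lands on the first matching row.
    start = bisect.bisect_left(sorted_rows, prefix)
    matches = []
    for row in sorted_rows[start:]:
        if row[:len(prefix)] != prefix:
            break  # rows are sorted, so no later row can match
        matches.append(row)
    return matches


print(scan_prefix(indexed_view, (1, 2)))  # [(1, 2, 20), (1, 2, 30)]
```

The lookup costs one binary search plus a sequential scan over exactly the matching rows, which is why sorting by the index key is enough to serve point and prefix queries.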
Next, we discuss how to implement shared indexes, that is, how to allow multiple downstream operators to access the same index (shared state) at the same time. In RisingWave, most of the state is "one-read-one-write." For example, the join operator will only read and write the join key in its local computation. Data is placed on shared storage mainly to facilitate the adjustment of parallelism (i.e., scale-in, scale-out). The following points should be considered to achieve a "one-write-multiple-read" state:
- How do we ensure that an operator reads a consistent state? If an intermediate state is still being written, a concurrent reader may observe an inconsistent view of it, so some special handling is required.
- How do we minimize the impact of reading shared state on stream operators? If every access to the shared state requires accessing the remote storage or querying the upstream operator through RPC, it will greatly affect the performance of streaming computation.
Single node shared state
We start by discussing the case of sharing states on a single node. Let's first assume that there is an operator called Read that needs to read the shared state. The Read operator happens to be on the same compute node as the Arrange operator.
At this point, the Arrange operator receives the barrier.
The Arrange operator writes the data with epoch = 1 into the shared buffer and sends the barrier to its downstream operators.
When the Read operator receives the barrier, we can safely say that the upstream operator has written its state into the shared buffer. Therefore, in the single-node scenario, the downstream operator can directly read the epoch = 1 data that the Arrange operator wrote into the shared buffer. After receiving the read request for epoch = 1, the storage merges the data of epoch < 1 on the shared storage with the data of epoch = 1 in the shared buffer to get a complete state snapshot (with epoch = 1) of the Arrange operator in the state storage.
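The merge that produces an epoch snapshot can be sketched as follows. This is a simplified model: `read_snapshot` is a hypothetical function, plain dicts stand in for the shared storage and the shared buffer, and deletions are not modeled.

```python
def read_snapshot(shared_storage, shared_buffer, epoch):
    """Merge checkpointed data with buffered epochs <= `epoch` (hypothetical helper).

    shared_storage: {key: value}, data from already checkpointed epochs
    shared_buffer:  {epoch: {key: value}}, data not yet uploaded
    """
    snapshot = dict(shared_storage)
    for e in sorted(shared_buffer):
        if e <= epoch:
            snapshot.update(shared_buffer[e])  # newer epochs overwrite older ones
    return snapshot


shared_storage = {"k1": "v1@0"}                         # epoch < 1
shared_buffer = {
    1: {"k1": "v1@1", "k2": "v2@1"},                    # written by Arrange in epoch 1
    2: {"k3": "v3@2"},                                  # a later epoch, not visible yet
}

print(read_snapshot(shared_storage, shared_buffer, epoch=1))
# {'k1': 'v1@1', 'k2': 'v2@1'}
```

Reading at a fixed epoch is what gives Read a consistent view: writes belonging to epoch 2 are simply ignored until the reader asks for that epoch.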
Shared state across nodes
Things are trickier when it comes to multiple nodes. If the Arrange and Read operators are not located on the same node, the Read operator needs to obtain the states written by the upstream Arrange operator remotely in some way. RisingWave achieves this by message replication: it replicates the updates from the Arrange operator to the downstream node. The shared state is written to the shared storage only by the upstream operator; the downstream operator only writes it to its local shared buffer after receiving the message. After the checkpoint is completed, the downstream node discards its local data and obtains a complete snapshot directly from the shared storage. Here is an example to describe this process in detail:
The Arrange and Read operators are located on two different nodes, Worker 1 and Worker 2, respectively. On the downstream node Worker 2, we create a "shadow operator" Replicate for the Arrange operator. After the Replicate operator receives the data sent by the upstream Arrange, it replicates the data and writes it to the shared buffer on Worker 2.
The Read operator receives a barrier, indicating that the data for its local computation can be accessed on the node where it is located. As shown in the figure below, the data read by the Read operator is actually the combination of the replicated data in the shared buffer of Worker 2 and the data on the shared storage. At this point, the data written by the Arrange operator exists in the shared buffers of both Worker 1 and Worker 2.
During the checkpointing process, only Worker 1, i.e., the node where Arrange is located, uploads the states in its local shared buffer to the shared storage. The downstream Worker 2 directly discards the "shadow data" replicated by the Replicate operator in its local memory. At this point, if the Read operator initiates a request to read epoch = 1, it can read all the data from the shared storage.
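The replication scheme can be sketched in Python as below. This is a toy model under stated assumptions: `ToyWorker` and its methods are hypothetical names, one dict shared by both workers stands in for the shared storage, and that toy storage keeps only the latest committed version of each key.

```python
class ToyWorker:
    """Hypothetical compute node with a local shared buffer."""

    def __init__(self, shared_storage):
        self.shared_storage = shared_storage  # the same dict object on every worker
        self.shared_buffer = {}               # epoch -> {key: value}, local only

    def write(self, epoch, kv_pairs):
        self.shared_buffer.setdefault(epoch, {}).update(kv_pairs)

    def read(self, epoch):
        snapshot = dict(self.shared_storage)
        for e in sorted(self.shared_buffer):
            if e <= epoch:
                snapshot.update(self.shared_buffer[e])
        return snapshot

    def finish_checkpoint(self, epoch, upload):
        for e in sorted(e for e in self.shared_buffer if e <= epoch):
            data = self.shared_buffer.pop(e)
            if upload:  # only the worker that owns Arrange uploads
                self.shared_storage.update(data)


storage = {}
worker1 = ToyWorker(storage)  # hosts Arrange
worker2 = ToyWorker(storage)  # hosts Replicate and Read

update = {"idx/1": "row-1"}
worker1.write(1, update)      # Arrange writes its state locally
worker2.write(1, update)      # Replicate writes the shadow copy

before = worker2.read(1)      # served from Worker 2's shared buffer
worker1.finish_checkpoint(1, upload=True)
worker2.finish_checkpoint(1, upload=False)  # shadow data is simply dropped
after = worker2.read(1)       # now served from shared storage

print(before == after)        # True
```

Read sees the same snapshot before and after the checkpoint, while the data is uploaded only once.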
Delta Join
Delta Join is an implementation of joins that does not maintain intermediate states. Materialize, another streaming database, implements delta join on top of Differential Dataflow, in which each record has its own timestamp. In RisingWave, there is no native support for timestamping each record; the data on the shared storage is organized by barrier (epoch)-based checkpointing. To support delta join in RisingWave, we therefore need to revise its implementation.
Mathematical principles
After supporting the shared state, we can reuse the state of indexes in delta join.
For stream processing systems, given the query A⋈B (assuming a natural join, i.e., an equi-join), the streaming engine actually needs to compute Δ(A⋈B) in each epoch. According to the equivalence rules of relational algebra, we have:
Δ(A⋈B)=[ΔA⋈(B∪ΔB)]∪(ΔB⋈A)
Here ΔA is joined with the new version of B, while ΔB is joined with the old version of A, so each new match is produced exactly once.
The rule also generalizes to multi-way joins. For example, the delta Δ(A⋈B⋈C) of a three-way join query can be expanded into:
Δ(A⋈B⋈C)=[ΔA⋈(B∪ΔB)⋈(C∪ΔC)]
∪[ΔB⋈A⋈(C∪ΔC)]
∪[ΔC⋈A⋈B]
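As a sanity check, the two expansions can be verified on small sets in Python. The sketch assumes set semantics with insert-only deltas (each ΔT is disjoint from T); the helper `join` and the sample relations are made up for illustration, and the code keeps the columns in A, B, C order, which is harmless because the join is commutative.

```python
from itertools import product


def join(*relations):
    """Equi-join relations of (key, payload) pairs on `key` (hypothetical helper)."""
    keys = set.intersection(*({k for k, _ in r} for r in relations))
    out = set()
    for key in keys:
        payloads = [[p for k, p in r if k == key] for r in relations]
        for combo in product(*payloads):
            out.add((key, *combo))
    return out


A, dA = {(1, "a1"), (2, "a2")}, {(1, "a3")}
B, dB = {(1, "b1")}, {(2, "b2"), (1, "b3")}
C, dC = {(1, "c1"), (2, "c1")}, {(2, "c2")}

# Two-way join: new result minus old result vs. the expansion.
delta_ab = join(A | dA, B | dB) - join(A, B)
expanded_ab = join(dA, B | dB) | join(A, dB)

# Three-way join: the same check with three terms.
delta_abc = join(A | dA, B | dB, C | dC) - join(A, B, C)
expanded_abc = (
    join(dA, B | dB, C | dC)
    | join(A, dB, C | dC)
    | join(A, B, dC)
)

print(delta_ab == expanded_ab)    # True
print(delta_abc == expanded_abc)  # True
```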
It is easy to see that the update of an N-way join can always be expressed in a ‘normal form’: the union of N terms, in which:
- Each term is a join of N items.
- The first item is ΔT.
- Each table appears only once.
- Except for the first item, each table appears as either T or T∪ΔT.
In RisingWave, we can build a dataflow graph to represent such a delta join computation in the streaming engine. Here T∪ΔT can be read from the index of the current epoch, and T can be read from the index of the previous epoch. For example, for a three-way join, using the above formula, we can construct the following dataflow graph:
We implement a special Union operator, which merges the delta streams from the three upstream Lookup executors in order: first ΔC, then ΔB, and finally ΔA. Otherwise, two causally related records could be emitted in reversed order, producing erroneous results.
Lookup operator implementation
What the Lookup operator does is essentially a stream-table join. One of its inputs is a stream, and the other is an index (it can be a Replicate, or it can be directly connected to an Arrange).
If the Lookup operator accesses the index of the current epoch, it waits until the barrier on the Arrange side arrives before starting the computation. As shown in the figure below, before receiving the barrier that marks the index as ready, the Lookup operator buffers the data on the stream; after receiving the barrier, it immediately starts processing the buffered data and finds the corresponding matches in the index.
If the Lookup operator accesses the index of the previous epoch, no waiting is needed. Because the Lookup operator always aligns barriers, the barrier of the previous epoch must already have been received by the time it processes data of the current epoch, so the index of the previous epoch must be ready. In this mode, when Lookup receives a message, it matches the message against the index directly.
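The two Lookup modes can be sketched in Python as follows. This is an illustrative model rather than RisingWave's executor: `ToyEpochIndex`, `ToyLookup`, and the `use_current_epoch` flag are hypothetical names, and the index is modeled as an append-only list of (epoch, key, value) rows.

```python
class ToyEpochIndex:
    """Hypothetical index that can be read as of a given epoch."""

    def __init__(self):
        self.rows = []  # (epoch, key, value)

    def insert(self, epoch, key, value):
        self.rows.append((epoch, key, value))

    def lookup(self, key, epoch):
        return [v for e, k, v in self.rows if k == key and e <= epoch]


class ToyLookup:
    """Hypothetical stream-table join against a ToyEpochIndex."""

    def __init__(self, index, use_current_epoch):
        self.index = index
        self.use_current_epoch = use_current_epoch
        self.buffered = []  # stream rows waiting for the Arrange-side barrier
        self.output = []

    def on_stream_row(self, epoch, key, value):
        if self.use_current_epoch:
            # T ∪ ΔT: the index for this epoch is not complete yet, so buffer.
            self.buffered.append((epoch, key, value))
        else:
            # T: the previous epoch is already complete, so match right away.
            self._emit(key, value, read_epoch=epoch - 1)

    def on_arrange_barrier(self):
        # The index is now ready for the buffered epoch: drain the buffer.
        for epoch, key, value in self.buffered:
            self._emit(key, value, read_epoch=epoch)
        self.buffered.clear()

    def _emit(self, key, value, read_epoch):
        for matched in self.index.lookup(key, read_epoch):
            self.output.append((key, value, matched))


index = ToyEpochIndex()
index.insert(1, "k", "old")
index.insert(2, "k", "new")  # arrives during epoch 2

current = ToyLookup(index, use_current_epoch=True)
current.on_stream_row(2, "k", "s")
print(current.output)        # [] (still buffered)
current.on_arrange_barrier()
print(current.output)        # [('k', 's', 'old'), ('k', 's', 'new')]

previous = ToyLookup(index, use_current_epoch=False)
previous.on_stream_row(2, "k", "s")
print(previous.output)       # [('k', 's', 'old')]
```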
Representation of Delta Join in the Optimizer
The dataflow graph of delta join is derived from LogicalMultiWayJoin. If there is an index matching the join operation, the optimizer converts the logical multi-way join plan into a delta join dataflow graph. In the optimizer, the representation of the Lookup operator is similar to that of HashJoin, and it can take part in optimizations such as predicate pushdown just like other operators. In particular, when generating N-way delta join plans, we implement a delta join solver to search for the best Lookup order. The solver uses a greedy algorithm to select the order of the tables in each term of the join, aiming for a plan with the minimum number of shuffles.
State sharing and join optimization in RisingWave are taking shape. We implement the shared state and indexes of the stream processing system based on our cloud-native MVCC state storage. On top of that, we implement index-based join optimization in stream processing. There are still many optimizations that can be done in the future, such as supporting secondary indexes, shared caches, more join types, and much more. We will discuss these topics in the future. RisingWave is an SQL-based open-source cloud-native streaming database implemented in Rust. It is backed by RisingWave Labs.