Disclaimer: This blog focuses on the design in the released version v0.1.10. RisingWave is actively evolving, and we expect any technical details to be outdated at any time.

In database systems, we can speed up join computations by creating indexes. For example, Postgres supports Index Scans to speed up joins by scanning the index instead of the original data. However, "index" is difficult to implement in stream processing systems, mainly due to two reasons:

  • The pattern of accessing states in streaming systems is usually "one-read-one-write." Stream processing systems deal with unbounded data, which requires persisting intermediate states in the state storage and recovering from checkpoints in the case of system failures. These intermediate states are usually only read and written by a single operator with a single degree of parallelism and are not shared.
  • An index is not a stream of data flowing in the dataflow graph but something that is queried by an operator. Generally speaking, the streaming data flows through the dataflow, and operators can always obtain data produced by upstream operators in the dataflow. The stream operators only perform operations based on the data in the received stream and their local intermediate states. On the other hand, the index is something that needs to be actively queried (rather than flowing on the graph). The operator needs to actively obtain the index to get the final result.


Differential Dataflow (DD) implements an indexing scheme, referred to as arrangement, in a streaming system. An arrangement can be interpreted as an MVCC index that records the mapping of (index key, time) → value. An arrangement built for streaming data is essentially an index, where downstream operators can efficiently query all values corresponding to a certain time point through the index key.

On top of arrangements, DD introduces Delta Join, a special kind of join operator in its streaming system. As long as all streams that need to be joined are indexed on the join key, Delta Join can utilize the index to generate the final join result directly without any intermediate state. That is saying if two joins cover the same stream, which is indexed, the delta join operators for two joins can access the same arrangement on that stream directly, thus saving the storage and maintenance cost. In contrast, a regular join operator needs to persist the records of the left and right streams, respectively. Furthermore, if it is a multi-way join, the record of each binary join also needs to be persisted.

It might seem satisfying that with arrangement and delta joins, we can share the state in a streaming system and save the cost of duplicated states. But this approach does not directly apply to RisingWave for mainly two reasons:

  • Arrangement join is implemented as in-memory MVCC, in which each record is timestamped, and index persistence is not considered. However, in RisingWave, streams are not timestamped.
  • Delta join in DD only works in a single instance, and RisingWave has been a distributed system from day one.

In RisingWave, we support state sharing by revising the delta join implementation. The core idea is to explore the epoch-based MVCC capability of the underlying cloud-native state storage. In this blog, I first recap RisingWave's data persistence process on its cloud-native storage, then introduce the implementation of shared state, and finally, I detail the implementation of delta join in RisingWave.


Checkpointing in RisingWave


Checkpointing is the process in which stream processing systems persist their intermediate states. In RisingWave, checkpointing is completely asynchronous. The checkpointing procedure does not affect the flow of data in the streaming system. The operator can continuously process the streaming data without being affected by the progress of the checkpointing. You can check this document for more details. Here, we briefly recapped the parts of the checkpoint process that may be relevant to implementing shared states.

The data of RisingWave's state storage engine "Hummock" mainly exists in two places: the shared buffer on each compute node and object storage services (which can be MinIO, S3, etc.). Shared buffer organizes data by checkpoint epoch; object storage service uses an LSM tree to organize data. Hummock stores snapshots at a certain point in time. The intermediate state calculated by the operator in an epoch is stored in the flush buffer inside the operator.

Let's take a simple query as an example:

CREATE MATERIALIZED VIEW mv_1 AS 

  SELECT * FROM t1, t2 

    WHERE t1.v1 = t2.v2 


This query yields two TableScan operators — Join operator and Materialize operator — in the streaming engine. The TableScan operator injects the changes from internal tables (or external data sources) to downstream operators in real time; the Join operator is used to compute the match from two streams satisfying the join condition t1.v1 = t2.v2; and the Materialize operator materializes the final result into storage, which can be queried by user-issued arbitrary queries. Suppose both tables t1 and t2 have only one column v1 and v2, respectively. At this point, the user inserts two rows in 1,2 in t1.

Close
Featured The user inserts two rows in 1,2 in t1.

After the two rows arrive at the Join operator, the operator will query the cache on whether these two rows have a match in t2. If there is a cache miss, then the operator will instead search the state store. The state store will scan the shared buffer as well as the data on the shared storage and merge the flush buffers to get the corresponding two lines in t2.

Close
Featured

After finding the match, the two new lines in t1 will be written to the flush buffer of the operator. At the same time, the matched data will be sent to downstream operators in the dataflow graph.

Close
Featured

Then the downstream Materialize operator will write the two new lines of matched results to its own flush buffer.

Close
Featured

At this point, the system decides to start a checkpointing process. RisingWave's meta-service will inject barriers into all data sources to start the checkpointing process. In this example, there are two sources, Table 1 and Table 2. When the system injects a barrier with epoch = 5, the data from epoch = 4 will be written to the state store by the operator, and the operator that receives the barrier will next process the data with epoch = 5.

Close
Featured

The Join operator aligns two barriers from its two upstreams. After receiving the barrier, the operator will write the data in the flush buffer to the shared buffer in the form of key-value pairs, and the operator can start processing the next batch of data.

Close
Featured

After the barrier flows through the Materialize operator, the materialized result will also be written into the shared buffer.

Close
Featured

And once the barrier flows through the entire graph, each computing node encodes the data in its shared buffer into SST files in the background and uploads it to the shared storage layer. When all nodes are uploaded, the checkpoint is declared complete. At this point, the shared buffer will clear the data that has been checkpointed, and the data on the shared storage will also be visible to the user's batch query.

Close
Featured

At this juncture, when users execute a query, e.g., SELECT * FROM mv_1, they can see the latest data.

Therefore, for an operator, checkpointing is simply encoding the data and writing it to the shared buffer without waiting for any I/O operations. In the background, the computing node will upload the data written to the shared buffer to the shared storage and complete the checkpoint process asynchronously. The regular computation of the stream operator will not be affected at all throughout the whole checkpoint process.


Shared state and index


After introducing RisingWave's barrier-based checkpointing process, we will now discuss how to implement a shared state based on such a design.


Index


In RisingWave, the only state that can be shared is the "index," which is a special kind of materialized view.

CREATE TABLE table(v1 int, v2 int, v3 int);
CREATE INDEX idx on table(v1, v2); 


Here CREATE INDEX is equivalent to:

CREATE MATERIALIZED VIEW idx AS
  SELECT * FROM table ORDER BY v1, v2; 


In RisingWave, the index is a specialized materialized view sorted according to the index key specified by the user. A materialized view is essentially key-value pairs ordered by certain rules. If an operator wants to query all records satisfying the condition v1 = 1, v2 = 2, it only needs to sequentially scan all records starting with [1, 2] in the indexed view. Indexed materialized views are constructed using the Arrange operator.

Next, we discuss how to implement shared indexes - allowing multiple downstream operators to access the same index (shared state) at the same time. In RisingWave, most of the state is "one-read-one-write." For example, the join operator will only read and write the join key in its local computation. Data is placed on shared storage, mainly facilitating the adjustment of parallelism (i.e., scale-in, scale-out). The following points should be considered to achieve a "one-write-multiple-read" state:

  • How to ensure that the operator can read a consistent state? If an intermediate state is in the process of writing, there is a problem with the access consistency of this state, and some special handling is required.
  • How does reading shared state reduce the impact on stream operators? If every access to the shared state requires accessing the remote storage or querying the upstream operator through RPC, it will greatly affect the performance of streaming computation.


Single node shared state


We start with discussing the case of sharing states on a single node. Let's first assume that there is an operator called Read that needs to read the shared state. The Read operator happens to be on the same compute node as the Arrange operator.

At this point, the Arrange operator receives the barrier.

Close
Featured

The Arrange operator writes the data with epoch = 1 into the shared buffer and sends the barrier to its downstream operators.

Close
Featured

When the Read operator receives the barrier, we can safely say that the upstream operator has written its state into the shared buffer. Therefore, in the single-node scenario, the downstream operator can directly read the epoch = 1 data that the Arrange operator writes into the shared buffer. After receiving the read request of epoch = 1, the storage will merge the data of epoch < 1 on the shared storage and the data of epoch = 1 in the shared buffer to get a complete state snapshot (with epoch = 1) of the Arrange operator in the state storage.

Close
Featured

Shared state across nodes


Things are more tricky when it comes to multiple nodes. If the Arrange and Read operators are not located on the same node, the Read operator needs to obtain the states written by the upstream Arrange operator remotely in some way. RisingWave achieves this by message replication: RisingWave replicates the update from the Arrange operator to the downstream node. The shared state is written to the shared storage only by the upstream operator, and the downstream operator only writes it to its local shared buffer after receiving the message. After the checkpoint is completed, the downstream node discards its local data and obtains a complete snapshot directly from the shared storage. Here is an example to describe this process in detail:

The Arrange and Read operators are located on two different nodes, Worker 1 and Worker 2, respectively. In the downstream node Worker 2, we create a "shadow operator" Replicate for the Arrange operator. After the Replicate operator receives the data sent by the upstream Arrange, it replicates the data and writes it to the shared buffer on Worker 2.

Close
Featured

The Read operator receives a barrier, indicating that the data for its local computation can be accessed on the node where it is located. As shown in the figure below, the data read by the Read operator is actually the combination of the replicated data in the shared buffer of Worker 2 and the data on the shared storage. At this point, the data written by the Arrange operator exists in the shared buffer of Worker 1 and Worker 2 at the same time.

Close
Featured

During the checkpointing process, only Worker 1, i.e., the node where Arrange is located, will upload the states in its local shared buffer to the shared storage. Worker 2 at the downstream will directly discard the "shadow data" replicated by the Replicate operator in its local memory. At this point, if the Read operator initiates a request to read epoch = 1, it can read all the data from the shared storage.


Delta Join


Delta Join is an implementation of joins without maintaining intermediate states. Materialize implements delta join based on differential dataflow, in which each record has its own timestamp. In RisingWave, there is no native support for timestamping each record. The data on the shared storage are organized based on the barrier(epoch)-based checkpointing. To support delta join in RisingWave, we need to revise its implementation.


Mathematical principles


After supporting the shared state, we can reuse the state of indexes in delta join.

For stream processing systems, given the query A⋈B (assuming natural join, i.e., equal joins), the streaming engine actually needs to compute Δ(A⋈B) in each epoch. According to the equivalence rules of relational algebra, we have:

Δ(A⋈B)=[ΔA⋈(B∪ΔB)]∪(ΔB∪A)

The rule also generalized to multi-way joins. For example, the delta Δ(A⋈B⋈C) of a three-way join query can be expanded into:

Δ(A⋈B⋈C)=[ΔA⋈(B∪ΔB)⋈(C∪ΔC)]
 ∪[ΔB⋈A⋈(C∪ΔC)]
 ∪[ΔC⋈A⋈B]

It is easy to find that the update of N-way joins can always be expressed as a ‘normal form’: the union of N terms, in which:

  • Each clause is a production of n items.
  • The first item is ΔT.
  • Each table appears only once.
  • Except for the first item, each table appears as either T or T+ΔT.


In RisingWave, we can build a dataflow graph to represent such delta join computation in the streaming engine. Here T+ΔT can be read from the index of the current epoch, and T can be read from the index of the previous epoch. For example, for a three-way join, using the above formula, we can construct the following dataflow graph:

Close
Featured Each line of operators corresponds to exactly one term in the normal form.

We implement a special Union operator, which merges the delta stream from three upstream Lookup executors in order. Firstly ΔC, then ΔB, finally ΔA. Otherwise, the order of the two causally related records will be reversed, resulting in erroneous results.


Lookup operator implementation


What the Lookup operator does is essentially stream-table join. One of its inputs is a stream, and the other is an index (it can be a Replicate, or it can be directly connected to Arrange).

If the Lookup operator accesses the index of the current epoch, it will wait until the barrier on the Arrange side arrives before starting the computation. As shown in the figure below, before receiving the ready barrier, the Lookup operator will cache the data on the stream; after receiving the barrier, it will immediately start processing the buffered data and find the corresponding matches from the index.

Close
Featured

If the Lookup operator accesses the index of the last epoch, since the Lookup operator always aligns the barrier, when processing the current epoch data, the barrier of the previous epoch must have been received. Therefore, the index of the last epoch must have been ready. In this mode, when Lookup receives a message, it matches the message from the index directly.

Close
Featured

Representation of Delta Join in the Optimizer


The dataflow graph of delta join is derived from LogicalMultiWayJoin. If there is an index matching the join operation, the optimizer will convert the logical multi-way join plan into a delta join dataflow graph. In the optimizer, the representation of the Lookup operator is similar to that of HashJoin, and it can perform optimizations such as predicate pushdown just like other operators. In particular, when generating N-way delta join plans, we implement a delta join solver to search for the best Lookup order. The solver uses a greedy algorithm to select the order of the tables in each line of join to find a plan with the minimum number of shuffles.

summary

State sharing and join optimization in RisingWave are taking shape. We implement the shared state and indexes of the stream processing system based on our cloud-native MVCC state storage. On top of that, we implement index-based join optimization in stream processing. There are still many optimizations that can be done in the future, such as supporting secondary indexes, shared caches, more join types, and much more. We will discuss these topics in the future.

RisingWave is an SQL-based open-source cloud-native streaming database implemented in Rust. It is backed by RisingWave Labs. Join our Slack community, follow us on Twitter and Linkedin, and subscribe to our Youtube channel to learn more about RisingWave.

Avatar

Chi Zhang

Community Contributor

The Modern Backbone for Your
Event-Driven Infrastructure
GitHubXLinkedInSlackYouTube
Sign up for our to stay updated.