Overview


Seven months ago, we open-sourced RisingWave, a cloud-native streaming database written in Rust. In this blog, I will introduce how RisingWave manages streaming state with Hummock, our cloud storage engine.

An overview of the RisingWave architecture.

Hummock Overview


Hummock is the storage core for all internal states and materialized views in RisingWave. As shown in the architecture diagram above, Hummock is not a standalone storage system but a storage engine that runs on every compute node and interacts with the shared storage. Currently, only object storage systems that speak the S3 protocol are supported.

Hummock supports the following interfaces:

  • get(key, epoch): get a value by key and epoch
  • iter(prefix, epoch): scan a range by prefix and epoch
  • batch_ingest(batches): ingest a batch of key-value pairs


As you can see, unlike a standard key-value store, Hummock does not provide a single-key put interface, only batch ingestion. In addition, every operation is parameterized with an epoch. Both choices stem from RisingWave's epoch-based state management mechanism.
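To make the interface concrete, below is a minimal, illustrative sketch in Rust of a store with these three operations, backed by an in-memory map instead of S3. All names and types here are simplifications for this post, not RisingWave's actual API.

use std::collections::BTreeMap;

type Key = Vec<u8>;
type Value = Vec<u8>;
type Epoch = u64;

#[derive(Default)]
struct MemHummock {
    // key -> (epoch -> value); the real engine persists SSTs on object storage.
    data: BTreeMap<Key, BTreeMap<Epoch, Value>>,
}

impl MemHummock {
    // get(key, epoch): return the newest value written at or before `epoch`.
    fn get(&self, key: &[u8], epoch: Epoch) -> Option<&Value> {
        self.data
            .get(key)?
            .range(..=epoch)
            .next_back()
            .map(|(_, v)| v)
    }

    // iter(prefix, epoch): scan all keys starting with `prefix` as of `epoch`.
    fn iter<'a>(
        &'a self,
        prefix: &'a [u8],
        epoch: Epoch,
    ) -> impl Iterator<Item = (&'a Key, &'a Value)> + 'a {
        self.data
            .iter()
            .filter(move |(k, _)| k.starts_with(prefix))
            .filter_map(move |(k, versions)| {
                versions.range(..=epoch).next_back().map(|(_, v)| (k, v))
            })
    }

    // batch_ingest(batches): there is no single-key put; state is always
    // ingested as a whole batch tagged with its epoch.
    fn ingest_batch(&mut self, batch: Vec<(Key, Value)>, epoch: Epoch) {
        for (k, v) in batch {
            self.data.entry(k).or_default().insert(epoch, v);
        }
    }
}

fn main() {
    let mut store = MemHummock::default();
    store.ingest_batch(vec![(b"a/1".to_vec(), b"v1".to_vec())], 100);
    store.ingest_batch(vec![(b"a/1".to_vec(), b"v2".to_vec())], 200);
    // Reads are versioned: epoch 100 still sees the old value.
    assert_eq!(store.get(b"a/1", 100), Some(&b"v1".to_vec()));
    assert_eq!(store.get(b"a/1", 200), Some(&b"v2".to_vec()));
    assert_eq!(store.iter(b"a/", 200).count(), 1);
}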


Epoch-Based Checkpoint


RisingWave is a partially synchronized system driven by a global epoch. The central meta node generates an epoch periodically and issues an InjectBarrier request to all source nodes of the DAG. After receiving the request, a source node injects the barrier into its source stream.

RisingWave's meta node generates epochs periodically and issues InjectBarrier requests to all the source nodes in the DAG.

message Barrier {
  Epoch epoch = 1;
  oneof mutation {
    NothingMutation nothing = 2;
    StopMutation stop = 3;
    UpdateMutation update = 4;
    AddMutation add = 5;
  }
}

For any operator in the DAG, when receiving a barrier, the following will happen:

  1. If the operator has multiple input streams (e.g., Join, Union), it must wait until the same barrier has been collected from all input streams.
  2. If the barrier carries a mutation (for scale-out, creating a materialized view, or dropping a materialized view), the operator must apply the corresponding configuration change.
  3. Dump its local state (async checkpoint).


This blog focuses on the third item. In short, RisingWave uses neither a purely local state backend nor a purely remote one, but a hybrid: only the state written after the latest barrier is local state maintained by the operator, while everything before it is remote state. The operator dumps its local state to the Hummock store only when it receives a barrier, which is why Hummock provides only the batch-ingest interface.
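The following sketch (illustrative, not RisingWave's actual code) shows the three steps in order for a stateful operator: confirm the same barrier has arrived from every input, apply the mutation it carries, and hand the finished epoch's local state over for an asynchronous checkpoint. The Barrier, Mutation, and shared-buffer channel types are stand-ins.

use std::collections::BTreeMap;
use std::sync::mpsc;

type Epoch = u64;
type Key = Vec<u8>;
type Value = Vec<u8>;

#[derive(Clone, Debug)]
enum Mutation {
    Nothing,
    Stop,
    Update,
    Add,
}

#[derive(Clone, Debug)]
struct Barrier {
    epoch: Epoch,
    mutation: Mutation,
}

struct StatefulOperator {
    // Local state written since the last barrier.
    local_state: BTreeMap<Key, Value>,
    // Channel to the shared buffer task on the same compute node.
    shared_buffer: mpsc::Sender<(Epoch, BTreeMap<Key, Value>)>,
}

impl StatefulOperator {
    // Called once the same barrier has been collected from every input stream
    // (step 1); for a single-input operator the vector has one element.
    fn on_barrier(&mut self, barriers_from_inputs: Vec<Barrier>) {
        let epoch = barriers_from_inputs[0].epoch;
        assert!(barriers_from_inputs.iter().all(|b| b.epoch == epoch));

        // Step 2: apply the configuration change carried by the barrier, if any.
        match &barriers_from_inputs[0].mutation {
            Mutation::Nothing => {}
            m => println!("applying mutation {m:?} at epoch {epoch}"),
        }

        // Step 3 (async checkpoint): take this epoch's local state out and reset
        // it to empty, so the next epoch's data can be processed without waiting
        // for the upload to shared storage.
        let state = std::mem::take(&mut self.local_state);
        self.shared_buffer.send((epoch, state)).unwrap();
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let mut op = StatefulOperator {
        local_state: BTreeMap::from([(b"k".to_vec(), b"v".to_vec())]),
        shared_buffer: tx,
    };
    op.on_barrier(vec![Barrier { epoch: 1, mutation: Mutation::Nothing }]);
    let (epoch, state) = rx.recv().unwrap();
    println!("shared buffer received epoch {epoch} with {} keys", state.len());
    assert!(op.local_state.is_empty());
}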


Async Checkpoint


As we mentioned, when an operator receives a barrier, it dumps its state into Hummock. However, the barrier flows with the data stream, so if every operator uploaded its state to the shared storage (an S3-compatible system) synchronously and waited for the upload to finish, data processing would block for a whole upload round trip. Suppose there are N stateful operators in the DAG; the barrier would then be delayed by N round trips along the entire path, which would greatly hurt the processing throughput of the whole system. Therefore, we make barrier processing asynchronous: after receiving a barrier, a stateful operator only takes out the current epoch's local state and resets it to an empty state, so the operator can move on to the next epoch's data. This, however, introduces a series of questions:

  1. Where does the local state of this epoch go?
  2. Since the local state is not uploaded to S3 synchronously, how should queries for this data be served in the meantime?
  3. What if the compute node crashes during the asynchronous upload, and how do we know whether the checkpoint has succeeded?

To answer the above questions, we introduce Shared Buffer.


Shared Buffer


The Shared Buffer is a background task shared by all operators on a compute node. When a stateful operator receives a barrier, its local state is handed over to the Shared Buffer.

The Shared Buffer is mainly responsible for the following things:

  1. (Optional) The state of some operators, such as SimpleAgg, may be small. Depending on the size of the local states, the states of different operators are sliced or merged at file granularity as appropriate.
  2. Upload the operators' local states to the shared storage.
  3. Register the persisted state with the meta service.
  4. Serve operators' queries for local states that have not yet been uploaded successfully (see the sketch after this list).
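Putting these responsibilities together, here is a rough sketch of a shared buffer that stages per-epoch states, serves reads for data that is not yet durable, and flushes one epoch at a time. The upload_to_s3 and register_with_meta functions are hypothetical stand-ins, not real APIs.

use std::collections::BTreeMap;

type Epoch = u64;
type Key = Vec<u8>;
type Value = Vec<u8>;

struct SharedBuffer {
    // Local states staged per epoch, still readable by operators until the
    // upload succeeds.
    staged: BTreeMap<Epoch, BTreeMap<Key, Value>>,
}

impl SharedBuffer {
    fn add(&mut self, epoch: Epoch, state: BTreeMap<Key, Value>) {
        // Merging here means small states from different operators end up in
        // the same file, reducing the number of objects written per epoch.
        self.staged.entry(epoch).or_default().extend(state);
    }

    // Serve reads for data that has not been uploaded yet.
    fn get(&self, key: &[u8], epoch: Epoch) -> Option<&Value> {
        self.staged
            .range(..=epoch)
            .rev()
            .find_map(|(_, state)| state.get(key))
    }

    // Flush one epoch: upload to shared storage, then tell meta it is durable.
    fn flush(&mut self, epoch: Epoch) {
        if let Some(state) = self.staged.remove(&epoch) {
            upload_to_s3(epoch, &state);
            register_with_meta(epoch);
        }
    }
}

fn upload_to_s3(epoch: Epoch, state: &BTreeMap<Key, Value>) {
    println!("uploaded {} keys for epoch {epoch}", state.len());
}

fn register_with_meta(epoch: Epoch) {
    println!("epoch {epoch} registered as durable with the meta service");
}

fn main() {
    let mut buf = SharedBuffer { staged: BTreeMap::new() };
    buf.add(1, BTreeMap::from([(b"k".to_vec(), b"v".to_vec())]));
    // Before the flush, operators can still read the staged state.
    assert!(buf.get(b"k", 1).is_some());
    buf.flush(1);
    assert!(buf.get(b"k", 1).is_none());
}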


From the user's point of view, a checkpoint is considered complete only when all local states of all operators in an epoch are uploaded and registered with the meta service. Both user queries and recovery are based on the latest complete checkpoint.

From an operator's point of view, reading its state must return the complete and latest view, so the operator actually reads the merged results from the remote state store, the shared buffer, and its own local state.
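A small sketch of this read path, with each layer simplified to a plain map for illustration: newer layers shadow older ones, so the operator always observes the complete and latest view.

use std::collections::BTreeMap;

type Key = Vec<u8>;
type Value = Vec<u8>;

fn read_merged<'a>(
    key: &[u8],
    local_state: &'a BTreeMap<Key, Value>,
    shared_buffer: &'a BTreeMap<Key, Value>,
    remote_store: &'a BTreeMap<Key, Value>,
) -> Option<&'a Value> {
    // Newer layers shadow older ones: local state is the newest, the remote
    // store (everything up to the latest complete checkpoint) is the oldest.
    local_state
        .get(key)
        .or_else(|| shared_buffer.get(key))
        .or_else(|| remote_store.get(key))
}

fn main() {
    let local = BTreeMap::from([(b"k1".to_vec(), b"new".to_vec())]);
    let buffer = BTreeMap::from([(b"k1".to_vec(), b"staged".to_vec())]);
    let remote = BTreeMap::from([(b"k2".to_vec(), b"durable".to_vec())]);
    // k1 is shadowed by the local state; k2 falls through to the remote store.
    assert_eq!(read_merged(b"k1", &local, &buffer, &remote), Some(&b"new".to_vec()));
    assert_eq!(read_merged(b"k2", &local, &buffer, &remote), Some(&b"durable".to_vec()));
}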


Local Cache


Since most of the state lives in the state store, scale-out is easy to implement in RisingWave, but the cost is obvious: compared to Flink's local state design, RisingWave requires many more remote lookups.

When the HashAgg operator receives a barrier, it dumps the aggregation results of the current epoch into the shared buffer and resets its local state to empty. However, when processing the next epoch's data, the recently processed group keys may still be hot, and we would have to fetch them again from the shared buffer or even the remote state. Therefore, instead of clearing the previous epoch's local state inside the operator, we mark it as evictable and clean up the evictable records only when memory runs short.

When memory is sufficient or the operator's state is tiny (e.g., the SimpleAgg operator keeps only one record), all state stays in memory and is operated on by the current thread, maximizing performance; the persistent state is used only for recovery and queries. When memory is insufficient or the operator has obvious hot and cold access patterns (e.g., the TopN operator), correct operation is still guaranteed (cold data falls back to remote lookups) while every bit of memory is fully exploited.
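Below is a sketch of this cache policy under the simplifying assumption that entries carry just an evictable flag and eviction is triggered by a fixed capacity rather than a real memory manager; the LocalCache type and its methods are illustrative, not RisingWave's actual implementation.

use std::collections::HashMap;

type Key = Vec<u8>;
type Value = Vec<u8>;

struct LocalCache {
    entries: HashMap<Key, (Value, bool /* evictable */)>,
    capacity: usize,
}

impl LocalCache {
    fn insert(&mut self, key: Key, value: Value) {
        self.entries.insert(key, (value, false));
        if self.entries.len() > self.capacity {
            self.evict();
        }
    }

    // A hit on a hot group key pulls it back into the current epoch's working set.
    fn get(&mut self, key: &[u8]) -> Option<&Value> {
        let (value, evictable) = self.entries.get_mut(key)?;
        *evictable = false;
        Some(&*value)
    }

    // Called on a barrier: keep the previous epoch's data, just mark it evictable.
    fn mark_all_evictable(&mut self) {
        for (_, evictable) in self.entries.values_mut() {
            *evictable = true;
        }
    }

    // Called only under memory pressure: drop evictable entries; a later miss on
    // them falls back to the shared buffer or the remote state store.
    fn evict(&mut self) {
        self.entries.retain(|_, (_, evictable)| !*evictable);
    }
}

fn main() {
    let mut cache = LocalCache { entries: HashMap::new(), capacity: 2 };
    cache.insert(b"group_a".to_vec(), b"1".to_vec());
    cache.insert(b"group_b".to_vec(), b"2".to_vec());
    cache.mark_all_evictable(); // barrier: the epoch ends, but the state is kept
    assert!(cache.get(b"group_a").is_some()); // hot key: still a cache hit, un-marked
    cache.insert(b"group_c".to_vec(), b"3".to_vec()); // over capacity: evict
    // group_b (still evictable) is gone; group_a and group_c remain.
    println!("entries after eviction: {}", cache.entries.len());
}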


Compactor


RisingWave introduces a background process named the compactor to maintain the remote state store.

The compactor has the following main tasks:

  1. Recycling garbage: Some operators generate DELETE records, which become tombstone records that must be removed during compaction; overwrites of the same key also need to be merged to reclaim space (see the sketch after this list).
  2. Organizing data: Uploads tend to merge the states of different operators in the same epoch to reduce write amplification, while compaction tends to merge the states of different epochs of the same operator to reduce read amplification for subsequent queries. In addition, RisingWave tries to keep the compute and storage distributions aligned as much as possible, so compaction is also needed to reorganize the data after a scale-out.
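The sketch below illustrates both tasks in a highly simplified form, assuming each file is just a sorted map of key to (epoch, operation): overlapping files are merged, only the newest version of each key is kept, and tombstones older than a watermark epoch are dropped. The compact function and its watermark parameter are assumptions for illustration.

use std::collections::btree_map::Entry;
use std::collections::BTreeMap;

type Key = Vec<u8>;
type Epoch = u64;

#[derive(Clone, Debug, PartialEq)]
enum Op {
    Put(Vec<u8>),
    Delete, // tombstone left behind by a DELETE record
}

// Merge several files into one, reclaiming space from overwrites and
// tombstones. `watermark` is the oldest epoch any reader may still need.
fn compact(
    files: Vec<BTreeMap<Key, (Epoch, Op)>>,
    watermark: Epoch,
) -> BTreeMap<Key, (Epoch, Op)> {
    let mut merged: BTreeMap<Key, (Epoch, Op)> = BTreeMap::new();
    for file in files {
        for (key, (epoch, op)) in file {
            // Overwrites of the same key are merged: keep only the newest version.
            match merged.entry(key) {
                Entry::Vacant(e) => {
                    e.insert((epoch, op));
                }
                Entry::Occupied(mut e) => {
                    if e.get().0 < epoch {
                        e.insert((epoch, op));
                    }
                }
            }
        }
    }
    // A tombstone older than the watermark can never be observed again, so the
    // record itself is removed to reclaim space.
    merged.retain(|_, v| !(v.1 == Op::Delete && v.0 <= watermark));
    merged
}

fn main() {
    let old = BTreeMap::from([
        (b"k1".to_vec(), (10, Op::Put(b"v1".to_vec()))),
        (b"k2".to_vec(), (10, Op::Put(b"v2".to_vec()))),
    ]);
    let new = BTreeMap::from([
        (b"k1".to_vec(), (20, Op::Put(b"v1-new".to_vec()))),
        (b"k2".to_vec(), (20, Op::Delete)),
    ]);
    let out = compact(vec![old, new], 30);
    // k1 keeps only its newest version; k2's tombstone is reclaimed entirely.
    assert_eq!(out.len(), 1);
    println!("{:?}", out.get(b"k1".as_slice()));
}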


The compactor can be flexibly deployed, either mounted on a compute node or started as a standalone process. In the cloud, we can also run multiple compactors on cheap spot instances so that they match the fluctuating workload without affecting the online streaming jobs. If the user needs both freshness and low query latency, it is reasonable to pay more for more frequent compaction; conversely, compaction can run less frequently to save money.


Tianyi Zhuang

Software Engineer
