Hummock: A Storage Engine Designed for Stream Processing
Hummock is a cloud-native LSM storage engine developed by us. In this article, we will introduce some optimizations that Hummock has made for streaming.
In our previous article, State Management for Cloud Native Streaming: Getting to the Core, we introduced the core storage engine of RisingWave, Hummock, and its storage architecture. This article will focus on some optimizations that Hummock has made for streaming.
Similar to RocksDB, Hummock provides consistent reads (snapshot reads), which are crucial for incremental data joins. For detailed information on how Hummock achieves this, please refer to Shared Indexes and Joins in Streaming Databases. Here, we will briefly explain how Hummock implements consistent reads.
Hummock uses an Epoch bound to the Barrier as the MVCC version for all written data. This allows us to specify the version to read in Hummock using the Barrier that the current operator has passed through. For a given query Epoch, if a version of the target key has an epoch greater than the query Epoch, Hummock ignores that version and returns the latest version whose epoch is less than or equal to the query Epoch.
Similarly, when users query a Materialized View or the intermediate state of data, the queried data may span multiple ComputeNodes, so we need a consistent snapshot to ensure the correctness of query results. To achieve this, the Frontend obtains the most recently committed Barrier from the MetaServer at the beginning of each SQL statement or transaction. This Barrier is used as the query Epoch. Subsequently, all queries sent from the Frontend to the ComputeNodes use this Epoch to read data.
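The version-selection rule can be sketched in a few lines of Python. This is a minimal illustration, not Hummock's implementation: the `snapshot_read` helper and the in-memory dict of `(epoch, value)` pairs are hypothetical stand-ins for the real storage layer.

```python
from typing import Dict, List, Optional, Tuple

# Hypothetical stand-in for versioned storage: key -> list of (epoch, value).
VersionedStore = Dict[str, List[Tuple[int, str]]]


def snapshot_read(store: VersionedStore, key: str, query_epoch: int) -> Optional[str]:
    """Return the value of the newest version whose epoch is <= query_epoch."""
    # Versions written after the query epoch are invisible to this reader.
    visible = [(epoch, value) for epoch, value in store.get(key, []) if epoch <= query_epoch]
    if not visible:
        return None
    # Among the visible versions, pick the one with the largest epoch.
    return max(visible, key=lambda pair: pair[0])[1]


store: VersionedStore = {"key1": [(5, "v5"), (4, "v4"), (3, "v3")]}
print(snapshot_read(store, "key1", 4))
```

A reader at epoch 4 never sees the epoch 5 write, even if that write has already landed in storage.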
Suppose a key has multiple versions:
key1: epoch=5, value=v5
key1: epoch=4, value=v4
key1: epoch=3, value=v3
If a user query with epoch=4 is still ongoing, then even though the epoch=4 version has been overwritten by epoch=5, we must retain it during compaction and can only remove the epoch=3 version. To determine which data can be reclaimed, each Frontend node tracks the epochs of its ongoing queries and periodically reports the minimum epoch of its unfinished queries to the MetaServer. The MetaServer collects all reported epochs together with the currently committed barrier, takes the minimum value (the safe epoch), and sends it to the Compactor node. The Compactor then follows the rule described earlier and reclaims only historical versions that are no longer visible at the safe epoch, that is, versions older than the newest version at or below the safe epoch.
For Streaming operators, since they always read at epochs greater than or equal to the committed barrier and the current system's safe epoch, no additional data structures need to be maintained.
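The safe-epoch computation and the resulting retention rule can be sketched as follows. This is a simplified illustration under assumptions: `safe_epoch` and `retain_versions` are hypothetical names, and the real Compactor works on sorted files rather than on a list of epochs per key.

```python
from typing import Iterable, List


def safe_epoch(frontend_min_epochs: Iterable[int], committed_epoch: int) -> int:
    """Minimum over the committed epoch and every Frontend's oldest running query."""
    return min([committed_epoch, *frontend_min_epochs])


def retain_versions(epochs: List[int], safe: int) -> List[int]:
    """Return the epochs of one key that compaction must keep.

    Every version newer than the safe epoch is kept, plus the single newest
    version at or below it, because a query at the safe epoch still reads it.
    """
    newer = [e for e in epochs if e > safe]
    visible = [e for e in epochs if e <= safe]
    kept = newer + ([max(visible)] if visible else [])
    return sorted(kept, reverse=True)


# One Frontend still runs a query at epoch 4; the committed epoch is 5.
safe = safe_epoch(frontend_min_epochs=[4, 6], committed_epoch=5)
print(safe, retain_versions([5, 4, 3], safe))
```

With the three versions from the example, epochs 5 and 4 survive and epoch 3 is reclaimed.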
Storage engines built on the LSM Tree architecture split data files into multiple layers based on write order or other rules. This means that even when reading a very small range of data, it is still necessary to query multiple files, leading to additional I/O and computation overhead. A popular solution is to create a Bloom Filter over all keys in the same file. When a query for a key x arrives, the Bloom Filter is used to skip files that cannot contain x, and only the remaining files are read.
Typically, LSM Tree engines create a Bloom Filter for the entire key. However, RisingWave optimizes this by creating a Bloom Filter for the most appropriate part of the key based on the operator's specific requirements. For example, for the SQL query below, RisingWave would create separate State Tables for A and P. When creating a Bloom Filter for State Table A, it would select only the seller field, so that when State Table P updates a data entry, the join can filter State Table A for the rows matching A.seller=P.id.
By creating Bloom Filters in this way, RisingWave can improve performance in more scenarios, avoiding unnecessary I/O and significantly boosting query performance.
CREATE MATERIALIZED VIEW nexmark_q3
AS
SELECT P.name,
P.city,
P.state,
A.id
FROM auction AS A
INNER JOIN person AS P ON A.seller = P.id
WHERE A.category = 10
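To make the prefix Bloom Filter idea concrete, here is a small Python sketch. It assumes that keys in State Table A look like `(seller, auction_id)` tuples and it hashes only the `seller` part; the `BloomFilter` class, its sizing, and the key layout are illustrative assumptions, not Hummock's actual on-disk format.

```python
import hashlib
from typing import Iterable, Iterator, Tuple


class BloomFilter:
    """A tiny Bloom filter; sizes are arbitrary and chosen for illustration."""

    def __init__(self, num_bits: int = 1024, num_hashes: int = 3) -> None:
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray((num_bits + 7) // 8)

    def _positions(self, item: bytes) -> Iterator[int]:
        # Derive several bit positions by salting one hash function.
        for i in range(self.num_hashes):
            digest = hashlib.sha256(bytes([i]) + item).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, item: bytes) -> None:
        for pos in self._positions(item):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def may_contain(self, item: bytes) -> bool:
        return all(self.bits[pos // 8] & (1 << (pos % 8)) for pos in self._positions(item))


def build_seller_filter(keys: Iterable[Tuple[int, int]]) -> BloomFilter:
    """Build the filter on the seller prefix only, ignoring the auction id."""
    bf = BloomFilter()
    for seller, _auction_id in keys:
        bf.add(str(seller).encode())
    return bf


# Keys stored in one file of the (hypothetical) auction state table.
file_keys = [(1001, 1), (1001, 7), (1002, 3)]
seller_filter = build_seller_filter(file_keys)

# When person 1001 changes, the join probes by seller alone.
print(seller_filter.may_contain(b"1001"))
```

A filter built on the full `(seller, auction_id)` key could not answer this probe, because the join only knows the seller value when it looks up matching auctions.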
To improve the compaction speed of L0 files, we took inspiration from the design of Pebble, the storage engine of CockroachDB.
As shown in the diagram, files committed by a checkpoint bound to the same barrier are placed in the same sub-level, known as an overlapping level. Subsequently, this overlapping level undergoes compaction to become a non-overlapping level, in which the key ranges of the files do not overlap.
This allows us to select only a portion of the L1 files when choosing compaction tasks from L0 to L1. It avoids creating a single massive and slow task, thereby increasing parallelism and throughput.
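One way to picture this selection: once the chosen L0 files cover only part of the key space, only the L1 files whose key ranges intersect them need to join the task. The sketch below assumes each file is described by an inclusive `(min_key, max_key)` pair; `pick_l1_inputs` is a hypothetical helper, not RisingWave's actual task picker.

```python
from typing import List, Tuple

KeyRange = Tuple[str, str]  # inclusive (min_key, max_key) of one file


def overlaps(a: KeyRange, b: KeyRange) -> bool:
    """Two inclusive ranges overlap when each starts before the other ends."""
    return a[0] <= b[1] and b[0] <= a[1]


def pick_l1_inputs(l0_files: List[KeyRange], l1_files: List[KeyRange]) -> List[KeyRange]:
    """Select only the L1 files that intersect at least one chosen L0 file."""
    return [f for f in l1_files if any(overlaps(f, s) for s in l0_files)]


l0_selected = [("a", "c")]
l1_files = [("a", "b"), ("c", "f"), ("g", "k")]
print(pick_l1_inputs(l0_selected, l1_files))
```

The file covering `g` to `k` stays out of the task, so another compaction over that range can run in parallel.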
As an efficient streaming database, RisingWave provides sub-second data freshness. This means that user input data can be reflected in query results within as little as one second. On top of this, RisingWave performs a checkpoint every 10 seconds. If a cluster node crashes for any reason and recovers later, RisingWave only needs to reprocess the last ten seconds of historical data to catch up with the latest user input. This significantly reduces the impact of failures on business operations.
To support such a high-frequency checkpoint, we have made various optimizations in the storage engine:
Different business scenarios have significant differences in write traffic and data distribution. When the base level is in the middle of compaction and there is hot data or other reasons causing slow compaction for a specific range of data, L0 data can accumulate. Since LSM Tree queries essentially involve multi-way merging, having too much L0 data can slow down query performance. RisingWave selects a portion of L0 files for merging based on certain strategies to accelerate queries, and we refer to such tasks as L0 Intra Compaction Tasks.
Since the amount of written data can be small, RisingWave estimates write amplification from the proportion of the largest file among the files participating in the merge. For example, if four files participate in the merge and the largest file accounts for 50% of the total input size, we record a write amplification of 2: 100% of the computation and I/O was spent to merge the other 50% of the data. If three files participate in the merge and the largest file accounts for 66.6% of the total input size, the write amplification is 3. To limit write amplification, we currently filter out Intra Level Compaction tasks whose write amplification exceeds 3.
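The two examples are consistent with computing write amplification as the total input size divided by the size of everything except the largest file. That formula is inferred from the examples rather than taken from the RisingWave source, and the function names below are hypothetical.

```python
from typing import List

MAX_WRITE_AMPLIFICATION = 3.0  # threshold quoted in the text


def write_amplification(file_sizes: List[int]) -> float:
    """Total input size divided by the size of all files except the largest."""
    total = sum(file_sizes)
    rest = total - max(file_sizes)
    if rest == 0:
        # A single input file: the merge would rewrite data without merging anything.
        return float("inf")
    return total / rest


def is_acceptable_task(file_sizes: List[int]) -> bool:
    """Keep a task only if its write amplification does not exceed the limit."""
    return write_amplification(file_sizes) <= MAX_WRITE_AMPLIFICATION


print(write_amplification([50, 20, 20, 10]))  # largest file is 50% of the input
print(write_amplification([4, 1, 1]))         # largest file is about 66.6% of the input
```

Under this reading, a task whose largest file makes up 80% of the input has a write amplification of 5 and is filtered out.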
CONCLUSION
Hummock was designed from the ground up to be a cloud-native storage engine for streaming computations. We are constantly evolving it to achieve faster computation and lower costs. In the future, we will gradually introduce features such as a local file cache (Disk Cache) to improve I/O efficiency and a Serverless Compaction service that scales automatically with load, further reducing the cost of cloud-based streaming computing.
Wei Liu
Software Engineer