In RisingWave, we have developed our own cloud-native LSM storage engine called Hummock in Rust. We use it to store the state of stateful operators in stream processing.
Similar to general LSM storage engines, data newly written to Hummock is stored in a mutable mem-table. Under specific conditions, the mutable mem-table is frozen into an immutable mem-table. Ultimately, the immutable mem-table is written to an SST (Sorted String Table) file, which is stored persistently. Simultaneously, SST is added to the overlapping L0 in the LSM's metadata. After compaction, data from the overlapping L0 is moved to the non-overlapping LSM lower levels.
Stateful operators perform get
(point lookup) or iter
(range query) operations on the state storage.
When handling get
requests, after filtering through min-max and bloom filter, Hummock searches from the top-level mutable mem-table to the bottom-level non-overlapping levels. When it finds the corresponding key, it stops searching and returns the corresponding value.
When handling iter
requests, unlike get
requests, data within the given range may exist in any layer. Therefore, we need to merge the data from each layer. Each layer of data consists of several sorted data run. Both mutable mem-table and immutable mem-table are single sorted data runs in memory, while in overlapping L0, each SST itself is a single sorted data run. Finally, the SSTs in the non-overlapping levels do not overlap with each other, making each layer a single sorted data run. This allows us to perform multi-way merges on data from various layers to handle range queries.
In RisingWave, each sorted data run is abstracted as a HummockIterator. HummockIterator is a Rust trait, and each sorted structure implements this trait. The simplified definition of HummockIterator after simplification is as follows:
#[async_trait]
pub trait HummockIterator: Send + 'static {
async fn next(&mut self) -> HummockResult<()>;
fn key(&self) -> &[u8];
fn value(&self) -> Option<&[u8]>;
}
#[async_trait]
impl HummockIterator for MemtableIterator {...}
#[async_trait]
impl HummockIterator for SstIterator {...}
We use a MergeIterator to perform multi-way merging of multiple HummockIterators using a heap. Since HummockIterator is a trait, and there are multiple types of sorted structures that implement this trait (such as mem-table iterators, SST iterators), and Rust is a statically-typed language, we cannot directly place multiple types in the heap. Therefore, we use Box<dyn HummockIterator>
to unify multiple types of HummockIterator and obtain the following implementation of MergeIterator:
pub struct MergeIterator {
heap: BinaryHeap<Box<dyn HummockIterator>>,
}
#[async_trait]
impl HummockIterator for MergeIterator {
async fn next(&mut self) -> HummockResult<()> {
if let Some(top) = self.heap.peek_mut() {
top.next().await?
}
Ok(())
}
...
}
Dynamic Dispatch in the Code
In the code above, dynamic dispatch is used in two places, i.e., Box<dyn ...>
. One is used to unify multiple implementations of the HummockIterator
trait by using Box<dyn HummockIterator>
, and the other is related to the use of the #[async_trait]
macro.
Since the next
method may involve IO operations, such as fetching the next block from an SST, in the definition of HummockIterator
, next
is designed as an asynchronous method, allowing it to be suspended by the user-level scheduler while performing IO operations at the bottom of the call stack. Asynchronous methods in Rust do not immediately return their return values but instead return an anonymous type implementing the Future
trait based on the specific implementation code of the method. Therefore, for two asynchronous methods with the same return type, their intermediate returned Future
types differ due to the differences in the specific implementation code of the methods. However, trait objects with uncertain return types are not object-safe and cannot be used with Box<dyn ...>
. The async_trait
macro transforms the return values of asynchronous methods' implementations into BoxFuture
using dynamic dispatch, resulting in a unified return type.
While dynamic dispatch brings convenience to the code, it can incur significant overhead in CPU-intensive scenarios like multi-way merging. Therefore, an attempt is made to replace dynamic dispatch in the code with static dispatch to reduce runtime overhead.
Optimizing Dynamic Dispatch
Initially, we attempted to remove the async_trait
macro. After removing the macro, in the implementations of HummockIterator
, each implementation no longer returns a unified BoxFuture
but returns a type implementing the Future
trait corresponding to the code of its implementation. We can view that in different implementations of HummockIterator
, each has a type implementing the Future
trait as the associated type within this implementation of the HummockIterator trait. Thus, we modified the trait as follows, where NextFuture
is the associated type generated when implementing the next
method.
pub trait HummockIterator: Send + 'static {
type NextFuture:
Future<Output = HummockResult<()>> + Send;
fn next(&mut self) -> Self::NextFuture;
fn key(&self) -> &[u8];
fn value(&self) -> Option<&[u8]>;
}
In the implementations of HummockIterator
, we can use TAIT (trait alias impl trait) to specify the type implementing the Future
trait generated when implementing the next
method as the associated type NextFuture
of HummockIterator.
impl HummockIterator for MergeIterator {
type NextFuture =
impl Future<Output = HummockResult<()>>;
fn next(&mut self) -> Self::NextFuture {
async move {
if let Some(top) = self.heap.peek_mut() {
top.next().await?
}
Ok(())
}
}
...
}
However, this code will encounter an error in compile time:
fn next(&mut self) -> Self::NextFuture {
|--------- hidden type `[async block@src/lib.rs:87:9: 92:10]` captures the anonymous lifetime defined here
The reason for this issue is that the self
variable is used in the next
implementation's Future, and therefore, it captures the lifetime of self
. The error occurs because the lifetime capture is not specified in the return type. To solve this problem, we need to include lifetimes in NextFuture
. At this point, we can use Rust's Generic Associated Types (GAT) to add lifetimes to the associated type.
pub trait HummockIterator: Send + 'static {
type NextFuture<'a>:
Future<Output = HummockResult<()>> + Send + 'a
where Self: 'a;
fn next(&mut self) -> Self::NextFuture<'_>;
fn key(&self) -> &[u8];
fn value(&self) -> Option<&[u8]>;
}
With the above modification, we can define and implement HummockIterator with asynchronous methods without using the async_trait
. In our MergeIterator for multi-way merging, we can use the generic type of HummockIterator to replace the previous Box<dyn HummockIterator>
.
pub struct MergeIterator<I: HummockIterator> {
heap: BinaryHeap<I>,
}
Now, MergeIterator can only accept a single type that implements HummockIterator, but in practical applications, MergeIterator needs to accept multiple types of HummockIterator. In this case, we can manually forward different types of HummockIterator using an enum and combine them into one type as the generic parameter for MergeIterator.
pub enum HummockIteratorUnion<
I1: HummockIterator,
I2: HummockIterator,
I3: HummockIterator,
> {
First(I1),
Second(I2),
Third(I3),
}
impl<
I1: HummockIterator<Direction = D>,
I2: HummockIterator<Direction = D>,
I3: HummockIterator<Direction = D>,
> HummockIterator for HummockIteratorUnion<I1, I2, I3>
{
type NextFuture<'a> = impl Future<Output = HummockResult<()>> + 'a;
fn next(&mut self) -> Self::NextFuture<'_> {
async move {
match self {
First(iter) => iter.next().await,
Second(iter) => iter.next().await,
Third(iter) => iter.next().await,
}
}
}
...
}
Finally, a static type of MergeIterator has a specific type:
type HummockMergeIterator = MergeIterator<
HummockIteratorUnion<
// For mem-table
MemtableIterator,
// For overlapping level SST
SstIterator,
// For non-overlapping level sorted runs
ConcatIterator<SstIterator>,
>
>;
With this, we have completed the optimization of dynamic dispatch in the code.
Time Taken | Reduction in Time Taken | |
box dyn | 309.58 ms | 0% |
Single-type MergeIterator | 198.94 ms | 35.7% |
Multi-type MergeIterator | 237.88 ms | 23.2% |
The optimized code has achieved a significant performance improvement.
Code Simplification
In the code above, both the definition and implementation of HummockIterator require careful handling of associated types, resulting in complex code. In the latest Rust nightly version, Rust provides the impl_trait_in_assoc_type
feature, which allows us to define the Future
directly in the trait definition without using associated types. Additionally, if we use the async_fn_in_trait
feature, we can implement asynchronous methods in a trait without enclosing the code in an async block, treating it like a regular async method. Ultimately, we can simplify the code as follows:
pub trait HummockIterator: Send + 'static {
fn next(&mut self) ->
impl Future<Output = HummockResult<()>> + Send + '_;
fn key(&self) -> &[u8];
fn value(&self) -> Option<&[u8]>;
}
impl HummockIterator for MergeIterator {
async fn next(&mut self) -> HummockResult<()> {
if let Some(top) = self.heap.peek_mut() {
top.next().await?
}
Ok(())
}
...
}
Note: If it weren't for Tokio's requirement for Future to be Send in the trait definition above, you could directly define next as async fn next(&mut self) -> HummockResult<()>;
In conclusion, we have successfully optimized the dynamic dispatch in our Rust code for the LSM-tree iterator in RisingWave's Hummock storage engine. By transitioning from dynamic dispatch using
Box
to static dispatch with generic types, we achieved a significant reduction in execution time. Overall, our efforts in optimizing dynamic dispatch and simplifying the code have led to substantial performance gains, making Hummock even more efficient for handling stateful operators in stream processing.