Object Store Optimizations in RisingWave
In this blog, we'll look at three techniques in RisingWave that leverage the power of S3-compatible object stores to achieve better performance.
RisingWave uses a cloud object store as its underlying storage system, which provides high availability and scalability. While its APIs seem straightforward enough, they must be used carefully to avoid extra overhead. In this blog, we explore three techniques in RisingWave that leverage the power of S3-compatible object stores to achieve better performance.
1. Streaming Write
During LSM compaction or operator state flushing, we need to write sorted KV pairs into multiple SST files. A naive approach would be to buffer the entire SST in memory and launch a background task to upload it once it reaches capacity.
This approach has two drawbacks:
- CPU is not fully overlapped with network IO. Uploading does not occur until the first SST is built. While this may seem insignificant as other SSTs will still be pipelined, the overhead is considerable when the compaction target level is deep, where the SST capacity can hit 512 MiB.
- The memory consumption is unnecessarily large. We need a buffer as large as the SST capacity.
Since SSTs are read-only, there is no point in keeping them entirely in memory. Instead, we can overlap the building and uploading process and discard uploaded data immediately. This is where the streaming write feature of RisingWave comes in handy. It can significantly accelerate the writing process and reduce memory usage from proportional to the SST capacity to, ideally, a fixed constant (the upload buffer size).
Although the Rust S3 SDK already accepts a
Stream object as the input of the ordinary
PutObject request, it requires the object size to be pre-determined. This predetermination is impossible in our case because even if we have a capacity configuration, we cannot assume the maximum size of the last KV pair. Hence, we turn to multipart upload for the solution. It allows a single object to be uploaded as a set of parts; each part is a contiguous portion of the object's data.
Now, we only need a buffer as large as the part size. When the buffer is full, we flush its contents to the object store as a part of the object. After all the parts are uploaded, we can call
CompleteMultipartUpload to assemble them into the final object.
Some details also need to be addressed:
- Determine the part size. A comprehensive test covering varying SST capacities and part sizes reveals that 16 MB is the best part size for all SST capacities.
- Clean up the stale parts if the multipart upload is aborted. If the upload is interrupted due to network issues or compaction task cancellation, we must clean up the dangling parts to minimize the storage costs. Therefore, RisingWave automatically configures the bucket's lifecycle rule (if not previously configured) to clean up incomplete multipart uploads after one day.
- Lazily start the multipart upload. If the data size is smaller than the part size, each multipart upload introduces two more network IOs for
CompleteMultipartUpload, compared with
PutObject. So, the multipart upload is only initiated when the buffer is full for the first time; otherwise, we fall back to
In this micro-benchmark, we use AWS S3 as the storage backend. We prepare 1 GiB of data and compare the performance of the building and uploading SSTs with different capacities. Not surprisingly, the greater the SST capacity, the faster the streaming write is because the other method is time-consuming, waiting for the first SST to be built.
2. Streaming Read
During compaction, we also need to read and iterate through SST files. Similar to streaming write, the purpose of streaming read is to reduce memory consumption while reading large SST files from the object store. Previously, we would buffer all SST files in memory prior to performing a merge sort on them, meaning the memory overhead would be O(N.C), where N is the number of SSTs with overlapping key ranges, and C is their capacity. Streaming read allows us to keep only a small proportion of data being merged in memory, reducing the overhead to O(N.B), where B is the buffer size, which can be orders of magnitude smaller than C.
Since objects requested over the object store are already provided via an HTTP/1 stream, we simply needed to take advantage of it by converting the
GetObject response into a stream-like object. More specifically, AWS SDK provides a function
into_async_read to create a
tokio::io::AsyncRead object over the resultant HTTP body. We can then call
read_exact on it to read data like a byte stream. We rely on the buffering mechanism of HTTP to help us overlap the data fetch over the network and computation over the SST blocks.
To improve usability, we then create a
BlockStream abstraction over the SST byte stream. With SST meta, this yields the next block of the file, whose KV pairs can be parsed and merge sorted.
We tested the memory consumption of the compactor while compacting the stateful parts of streaming queries from TPCH.
Here is the memory consumption observed without streaming read:
Here is the memory consumption observed with streaming read:
3. Multiple Prefixes for S3
In AWS S3, each object stored in the bucket is identified by a key, e.g.,
mybucket/myprefix/ part is the prefix of the example S3 URI. The prefix is logically a partition, and there is no limit to the number of prefixes in a bucket. But a single prefix has its own limitations. Amazon claims that a prefix can support at least 3,500
PUT/COPY/POST/DELETE or 5,500
GET/HEAD requests per second. Applications may hit request throttle limits when the request rates rise to a certain high level (e.g., AWS SDK will report
TooManyRequestsException). As a cloud service, S3 has infinite scalability, which means it can scale automatically when the QPS of the application frequently reaches the threshold. But the prerequisite is that the application has organized the objects into different prefixes.
We used to store all SST files with the same key prefix,
hummock_001/, on the bucket. For example, the path to an SST file with id 1 would be
s3://BUCKET_NAME/hummock_001**/**1.sst. Then all read and write requests will go to the single prefix, becoming a bottleneck if we suffer a burst of request rates.
We employ a simple technique to set up a maximum number of prefixes for the cluster and distribute the SST files to different prefixes based on the hash value of their IDs. For example, we can configure the maximum number of prefixes to 128, which can theoretically support up to 3,500 * 128 write or 5,500 * 128 read requests per second.
It is worth mentioning that there can be a 30 - 60-minute delay before S3 finishes scaling its internal partitions to accommodate new request rates. But, if your business cannot tolerate the temporary throttle errors from S3, you should contact us directly to partition the bucket for you in advance manually.
In this blog, we discussed three object store optimization techniques that aim to reduce memory consumption and increase system throughput. The streaming write and read techniques are designed for efficient sequential data access, which relies on SST’s immutability and sortedness. The multiple prefixes for S3 optimize and prevent the request rate from becoming a bottleneck. The object store may look simple at first glance, but we must dig deep into its specifications and consider the system design to tap its full potential.