Building a Source Connector: A RisingWave Code Camp Experience
Immerse yourself in the RisingWave Code Camp experience of building a source connector. This blog offers valuable insights and practical guidance on creating robust and efficient source connectors, empowering you to enhance your data integration capabilities.
In a nutshell, RisingWave is a distributed, cloud-native streaming database. It is built from the ground up to ingest and process data from streaming message queues in a horizontally scalable way. I discovered the RisingWave Code Camp, a mentorship program for junior developers to advance their skills in software and database systems, through a friend who'd met the team at KubeCon EU. Unfamiliar with Rust and admittedly daunted at the prospect of working on a database, I was hesitant to apply, but hey, I needed an excuse to learn Rust, and I was intrigued by the idea of working with the amazing people building RisingWave!
While the code camp offered a total of five projects to apply to, I chose the connector given my unfamiliarity with Rust and the relative expertise required for the other projects. Subsequently, I worked on a qualification task that involved building a publish-subscribe demo for Apache Kafka, complete with benchmarks, which was good enough for my mentors to choose me!
Source connectors are RisingWave components that enable data ingestion from streaming data sources. At the time of writing, RisingWave has connectors for Apache Kafka, Apache Pulsar, and Amazon Kinesis. Now that PR #5476 has been merged, RisingWave also has a source connector for Google's Cloud Pub/Sub (henceforth "pubsub"), a fully managed cloud streaming platform. In this blog, I take a look back at my time spent at the code camp implementing the connector.
At the start of the camp, I met my mentors first on Slack and then on Zoom. Expectations were set, a rough timeline was drawn, and we decided to meet weekly alongside regular asynchronous communication on Slack.
RisingWave is written in Rust, a modern low-level language with a focus on safety owing to its unique data ownership model. Due to its design, it's unlike other languages I had worked with, so as my first task, my mentors directed me to the Rustlings exercises to learn the language. With additional help from the Rust Book and Rust by Example, I started getting familiar with the language and reached a point of elementary proficiency in around 2 weeks. Of course, I was not very confident about my skills with the language at this point, nor did I understand many of the complex parts well. My idea was to learn on the go as I worked on the connector, an approach I'd successfully used before while working on other projects, and I'd become familiar enough with the language to pull that off!
Although not entirely necessary to build a connector, it was important for me to understand how RisingWave is architected. Fortunately, we have an excellent set of docs that includes one on the architecture design. I also had an architecture review session with two of my mentors, Bohan Zhang and Yanghao Wang, where Yanghao walked me through parts of the database relevant to the connector. I'll be honest: I was overwhelmed. Still, armed with the context from my review of the architecture documentation, the walkthrough made me more confident than ever about my understanding of the system and where the connector fits into it all.
Here's the gist: within RisingWave, a source is divided into splits. A split is a unit that generally represents a logical shard or partition in the data source, and each split is handed off to a SplitReader. A reader is responsible for pulling messages from its source splits and converting them into a standard, source-agnostic data type.
Choosing a Pub/Sub Crate
Rust has a rich ecosystem of packages, called crates. For the Google Pub/Sub connector, we could either write a crate from scratch or find one that suited our purposes and fill in the gaps. I spent some time on crates.io, Rust's crate registry, comparing candidates on feature support, development activity in their repositories, and usage indicators like issues and discussions. I found an excellent crate in google-cloud-pubsub, written in asynchronous Rust and supporting most of the functionality the connector would presumably need. Once the crate was chosen, I wrote a proof-of-concept demo to exercise it the way the connector would likely use it.
Implementing the Connector
RisingWave extensively uses Rust macros to improve and streamline the developer experience. For the source connectors, it uses macros to generate concrete connector implementations at compile time. In practice, this means building a connector involves implementing some traits (Rust interfaces), which are then plugged into the codegen macros to avoid repeated boilerplate.
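To make the pattern concrete, here is a minimal sketch of the idea, not RisingWave's actual macros. The `Connector` trait, the `impl_connectors!` macro, the connector structs, and the name strings are all hypothetical, invented for illustration. The macro takes a list of types that implement the trait and generates an enum that forwards calls to whichever implementation it wraps.

```rust
/// Hypothetical trait each connector implements (illustrative only).
trait Connector {
    fn name(&self) -> &'static str;
}

struct KafkaConnector;
struct PubsubConnector;

impl Connector for KafkaConnector {
    fn name(&self) -> &'static str {
        "kafka"
    }
}

impl Connector for PubsubConnector {
    fn name(&self) -> &'static str {
        "google_pubsub"
    }
}

/// Generates an enum with one variant per connector and forwards trait
/// calls to the wrapped implementation, so the dispatch boilerplate is
/// written once instead of once per connector.
macro_rules! impl_connectors {
    ($enum_name:ident { $($variant:ident($ty:ty)),+ $(,)? }) => {
        enum $enum_name {
            $($variant($ty)),+
        }

        impl Connector for $enum_name {
            fn name(&self) -> &'static str {
                match self {
                    $(Self::$variant(inner) => inner.name()),+
                }
            }
        }
    };
}

// Adding a connector means adding one line here.
impl_connectors!(ConnectorImpl {
    Kafka(KafkaConnector),
    Pubsub(PubsubConnector),
});
```

With this in place, `ConnectorImpl::Pubsub(PubsubConnector).name()` resolves to the pubsub implementation at compile time, with no hand-written `match` per method.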
The first trait I implemented was the SplitEnumerator. This trait is responsible for enumerating splits (usually each representing a logical partition in the source). This presented us with our first challenge owing to pubsub's unusual design. Google Pub/Sub is a high-level system that hides from the client any logical sharding GCP might use under the hood. For a while I was unsure which way we should proceed. However, following a discussion with Bohan and Yanghao about implementation specifics and questions they had for me, we decided to, in Bohan's words, adjust the split abstraction in a top-down way. By exposing the number of splits as a configuration parameter on pub/sub sources, it became possible to distribute processing while reading from a single pub/sub topic!
This way, we could set up n splits to read from the pub/sub stream, which distributes the message load between active consumers. The goal is to distribute the stream processing between a number of splits that can handle the workload. The user is expected to create the source with the workload in mind.
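A simplified sketch of this top-down enumeration might look like the following. It is not the connector's actual code: the type names, the field names, and the assumption that every split points at the same subscription are mine, made for illustration.

```rust
/// Hypothetical split descriptor: `index` identifies one of the `n`
/// readers sharing a subscription.
#[derive(Debug, Clone, PartialEq)]
struct PubsubSplit {
    index: u32,
    subscription: String,
}

/// Hypothetical enumerator. `split_count` stands in for the user-supplied
/// configuration parameter described above.
struct PubsubSplitEnumerator {
    subscription: String,
    split_count: u32,
}

impl PubsubSplitEnumerator {
    /// The source exposes no partitions to enumerate, so splits are
    /// manufactured top-down: one per requested reader.
    /// Assumption: all splits read from the same subscription and rely on
    /// the service to spread messages across them.
    fn list_splits(&self) -> Vec<PubsubSplit> {
        (0..self.split_count)
            .map(|index| PubsubSplit {
                index,
                subscription: self.subscription.clone(),
            })
            .collect()
    }
}
```

Calling `list_splits()` on an enumerator configured with `split_count: 3` yields three splits with indices 0, 1, and 2, each of which can be assigned to its own reader.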
After the SplitEnumerator, it was time to implement the SplitReader trait. The reader is the part of the connector responsible for actually pulling in messages from the source, given a stored source split, and then yielding them to the compute node for processing and storage. This happens in two steps: pulling messages in the source-native format, then converting them to the RisingWave-native SourceMessage. Implementing this trait was fairly straightforward from a technical standpoint, barring some performance concerns that required upstream changes, which I discuss in the next section.
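The conversion step can be sketched roughly as follows. Both structs are simplified stand-ins with hypothetical fields: the real SourceMessage in RisingWave and the message type in the pub/sub crate may look different, and using the publish timestamp as the offset is an assumption made only for this example.

```rust
/// Hypothetical stand-in for a message as delivered by the pub/sub client.
struct PubsubMessage {
    data: Vec<u8>,
    publish_time_secs: i64,
}

/// Hypothetical, simplified source-agnostic message.
#[derive(Debug, PartialEq)]
struct SourceMessage {
    payload: Vec<u8>,
    offset: String,
    split_id: String,
}

/// Converts one native message, tagging it with the split it was read from.
/// Assumption: the publish timestamp is used as the offset.
fn to_source_message(msg: PubsubMessage, split_id: &str) -> SourceMessage {
    SourceMessage {
        payload: msg.data,
        offset: msg.publish_time_secs.to_string(),
        split_id: split_id.to_string(),
    }
}

/// Converts a pulled batch, preserving delivery order.
fn convert_batch(batch: Vec<PubsubMessage>, split_id: &str) -> Vec<SourceMessage> {
    batch
        .into_iter()
        .map(|m| to_source_message(m, split_id))
        .collect()
}
```

Everything downstream of `convert_batch` only ever sees `SourceMessage`, which is what lets the rest of the system stay agnostic of the source.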
With the split enumerator and reader done, the connector needed to support seeking back to offsets of consumed messages, an operation RisingWave uses to roll back the consumer state when things go wrong. We quickly ran into another issue due to pubsub's unusual design. In most streaming systems, all messages are retained for a set duration regardless of their acknowledgment status. In pub/sub, however, acknowledged messages are discarded by default unless configured otherwise, and retaining messages incurs extra storage charges. After some discussion, we decided to enforce the retain-on-acknowledge policy, since rollbacks are a vital and frequently needed operation. At the same time, we decided to only accept subscriptions premade by the user, to ensure transparency into what the connector needs. To me, this felt like the optimal balance between getting what we need out of pub/sub and giving control to the user.
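To illustrate why retention matters for rollbacks, here is a small in-memory model. It is a toy, not the real Pub/Sub API: `MockSubscription` and its methods are hypothetical, and the seek semantics are simplified to "messages published before the timestamp count as acknowledged, the rest become deliverable again".

```rust
/// In-memory stand-in for a subscription (hypothetical, for illustration).
struct MockSubscription {
    retain_acked: bool,
    /// (publish_time_secs, payload, acknowledged)
    messages: Vec<(i64, String, bool)>,
}

impl MockSubscription {
    fn new(retain_acked: bool, published: &[(i64, &str)]) -> Self {
        let messages = published
            .iter()
            .map(|(t, p)| (*t, p.to_string(), false))
            .collect();
        Self { retain_acked, messages }
    }

    /// Acknowledges the message published at the given time. Without
    /// retention the message is discarded for good.
    fn ack(&mut self, publish_time_secs: i64) {
        if self.retain_acked {
            for m in self.messages.iter_mut().filter(|m| m.0 == publish_time_secs) {
                m.2 = true;
            }
        } else {
            self.messages.retain(|m| m.0 != publish_time_secs);
        }
    }

    /// Seeks to a timestamp: anything still stored and published at or
    /// after it becomes deliverable again.
    fn seek(&mut self, timestamp_secs: i64) {
        for m in self.messages.iter_mut() {
            m.2 = m.0 < timestamp_secs;
        }
    }

    /// Payloads that would be delivered on the next pull.
    fn pending(&self) -> Vec<&str> {
        self.messages
            .iter()
            .filter(|m| !m.2)
            .map(|m| m.1.as_str())
            .collect()
    }
}
```

With `retain_acked` set, acknowledging three messages and then seeking back to the second one makes the last two pending again. Without it, the same seek finds nothing to replay, which is exactly the situation the retain-on-acknowledge requirement avoids.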
Contributing to Upstream
As a distributed, fault-tolerant, and performance-critical system, RisingWave imposes some requirements that our pub/sub crate of choice did not originally meet. The first gap was the lack of a batch-acknowledgment API. Pub/sub requires consumers to acknowledge messages so that it knows they've been consumed and are safe to discard. Without an acknowledgment from a consumer, pub/sub will deliver the message again in a future pull. Messages may be acknowledged individually or in batches through a single API call, but the network overhead of individual acknowledgments is not acceptable for RisingWave's high-volume use cases. This became the first upstream issue I created. The author revealed he already had plans to support the feature and was happy to provide a quick patch release!
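The difference in overhead is easy to see with a mock that counts requests instead of making network calls. This is only an illustration: `MockAckClient` and both helper functions are hypothetical and do not mirror the crate's API.

```rust
/// Hypothetical client that counts round trips instead of doing network I/O.
#[derive(Default)]
struct MockAckClient {
    requests_sent: usize,
    acked: Vec<String>,
}

impl MockAckClient {
    /// One request that can carry any number of ack IDs.
    fn ack(&mut self, ack_ids: &[String]) {
        if ack_ids.is_empty() {
            return;
        }
        self.requests_sent += 1;
        self.acked.extend_from_slice(ack_ids);
    }
}

/// One request per message.
fn ack_individually(client: &mut MockAckClient, ack_ids: &[String]) {
    for id in ack_ids {
        client.ack(std::slice::from_ref(id));
    }
}

/// One request per chunk of at most `batch_size` messages.
fn ack_in_batches(client: &mut MockAckClient, ack_ids: &[String], batch_size: usize) {
    // `chunks` panics on a size of zero, so clamp to at least one.
    for chunk in ack_ids.chunks(batch_size.max(1)) {
        client.ack(chunk);
    }
}
```

Acknowledging 1,000 messages costs 1,000 requests with `ack_individually`, but only 4 with `ack_in_batches` and a batch size of 250.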
Later, as we were discussing the critical seek-back functionality, I created yet another upstream issue, followed by a pull request to support timestamp- and snapshot-based seek operations in google-cloud-rust. Working on this pull request was challenging, as I dealt with limitations of the Google pub/sub emulator used for testing and more, all made better by the prompt help of the crate maintainer. This was my first Rust-based code contribution, a small yet proud accomplishment in my code camp experience.
Google Cloud SDKs usually tie authentication to JSON files on the host filesystem. For a dynamic distributed system like RisingWave, this constraint means leaking credentials out of the system boundaries to node filesystems, which is less than ideal and not used elsewhere in the system. Cue my final upstream contribution: supporting filesystem-independent authentication, where I patched google-cloud-rust with support for an additional environment variable which signals it to read credentials from the environment.
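The general shape of such a fallback can be sketched as below. This is not the code that went upstream: the variable names `EXAMPLE_CREDENTIALS_JSON` and `EXAMPLE_CREDENTIALS_FILE` are placeholders I made up, and the lookup is passed in as a closure so the logic can be exercised without touching the real process environment.

```rust
/// Where the credentials JSON came from.
#[derive(Debug, PartialEq)]
enum CredentialSource {
    Environment,
    File,
}

// Placeholder variable names for illustration only; the names used by the
// actual crate may differ.
const JSON_VAR: &str = "EXAMPLE_CREDENTIALS_JSON";
const PATH_VAR: &str = "EXAMPLE_CREDENTIALS_FILE";

/// Prefers credentials passed inline through the environment and falls
/// back to a file path only when the inline variable is absent.
fn load_credentials(
    lookup: impl Fn(&str) -> Option<String>,
) -> Result<(CredentialSource, String), String> {
    if let Some(json) = lookup(JSON_VAR) {
        return Ok((CredentialSource::Environment, json));
    }
    let path = lookup(PATH_VAR)
        .ok_or_else(|| format!("neither {} nor {} is set", JSON_VAR, PATH_VAR))?;
    let json = std::fs::read_to_string(&path)
        .map_err(|e| format!("cannot read {}: {}", path, e))?;
    Ok((CredentialSource::File, json))
}
```

In a real process the lookup would be `|key| std::env::var(key).ok()`. When the inline variable is set, nothing is ever read from a node's filesystem.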
As a whole, the upstream contributions were the most personally rewarding smaller accomplishments made during the code camp, and I'm glad to have made them!
With the important parts done, we needed to make sure everything works, and keeps working, through automated tests. Bohan and I discussed the tests in one of our final weekly calls. We started by discussing unit tests, for which we figured there weren't many testable parts within the connector. Next up were end-to-end tests, which are absolutely essential to verify that the connector as a whole behaves as expected. I was first directed to the existing Sqllogictest-instrumented e2e tests around Apache Kafka, followed by the question of how to use pub/sub in the test infrastructure. Because Pub/Sub is a hosted cloud service that requires a GCP account, using it in tests would have been tricky were it not for the pub/sub emulator, which can be hosted locally in the test environment. Bohan still had some concerns about the availability of the emulator's dependencies, but I figured that out eventually!
With the hows out of the way, I got down to integrating the test setup and emulator into the existing CI infrastructure managed by the RiseDev tool. I hadn't anticipated how complex, and how well structured, the tool would be. I dove into it independently and was proud of myself for integrating the pub/sub emulator, much like the other parts of the development environment managed by the tool! Besides integration with RiseDev, I also wrote scripts to install the emulator, with some configuration targets to enable it in developer environments. Next, I wrote a little Rust script to set up the test data in the emulator, followed by a Sqllogictest script to verify that things work. At this point everything worked perfectly for me locally, but the main goal was to have it working in the Continuous Integration process, which required some more fiddling in true developer fashion.
The code camp was one of the most rewarding experiences I've had recently. I've been lucky to meet and work with some of the very brightest people I've been around. During the course of the code camp, I went from not knowing anything about Rust or streaming systems to being an enthusiastic novice in my new favorite language, writing code that has gone, or will go, into production in the most awesome streaming database out there.
Meetings with my mentors were an awkwardly fun part of my week that I always looked forward to. We had a lot of experiences over my nearly three-month term at the camp, and some really stood out. In particular, one of my fondest memories is when Bohan complimented my code editor. I was flattered when he said it looked like Vim, but alas, it was the good old Visual Studio Code, only looking a lot sharper! In that same call, we also discussed the horrifically slow compile times I'd been working with, initially attributed solely to my lack of an Apple Silicon-powered compilation machine (only later did I discover I also had a problem with my LLD linker setup!).
Perhaps most importantly, my mentors always handled difficult situations with patience, charm, and support. I had to take most of my calls while at the university, a crowded, often noisy place where it was hard for me to maintain a good connection. Once, at a critical meeting where we were to discuss my progress, I had to drop off the call with no warning due to persistent connection failures. Bohan only had reassuring words and encouraged better async communication once I managed to get through!
There is not much more I could say to express how fantastic my experience was, except that it made me realize just how much more I can learn about databases and streaming systems. For all that I learned and experienced, I'm certain I'm a better developer than when I came in!
Sqllogictest is a Rust crate that provides a testing framework to verify the correctness of an SQL database. This crate implements a Sqllogictest parser and runner in Rust. Check out the source code.
For some context, RiseDev is the tool for RisingWave developers, built to enhance the developer experience by automating and encapsulating common processes and setup.