My last job was developing a distributed OLTP database, and when it came to buzzwords like high availability and fault tolerance, I couldn’t help but associate them with multiple replicas, Paxos, WAL (Write-ahead Log), and so on. Maybe you feel the same? Yeah, we are both qualified system designers.
However, what I really want to say today is that stream computing is very different from databases. The whole point of stream computing is computation. If the data flow stops, it loses its meaning. Under the topic of “high availability,” databases want to ensure that every byte written is persisted and not lost, while the goal of stream computing is to quickly recover tasks and get the data pipeline flowing again.
Unfortunately, we cannot use Paxos to guarantee the high availability of stream computing, and it is also not worth allocating redundant nodes for computing tasks. So, what should we do? Before revealing the answer, please allow me to talk about the story of the Apollo Guidance Computer.
The Apollo Guidance Computer (AGC)
You may have seen plenty of AGC memes on the Internet, such as: “Your smartphone is millions of times more powerful than the Apollo 11 guidance computers.” It’s true. The picture below is the AGC. This big iron box weighs 70 pounds (32 kg) and can execute about 43,000 instructions per second. In today’s terms, that is roughly 0.043 MHz if we count one instruction per cycle, and it comes with a handmade 2K-word RAM and 36K-word ROM (word length is 15 bits). Today’s iPhone 15 Pro has a 3.78 GHz 6-core CPU, which is about 530,000 times more powerful by that crude measure (3.78 GHz × 6 cores ÷ 43,000 ≈ 530,000), and we just use it to play Genshin Impact.
Given the length of this article, we will focus on its software, or OS, design. It may be one of the world’s earliest and most famous real-time operating systems, featuring:
- Digital display user interface (the black and white box in the picture)
- Peripheral IO devices, including control buttons, various radars and sensors
- Scheduler designed for real-time systems, based on a priority queue with a maximum length of 8
- Checkpoint mechanism: restart on failure and recover from the last checkpoint (this is our main focus today)
The design that saved the moon landing from failure
The AGC’s designers believed that most software faults are transient. The AGC is not alone here: your home router and all kinds of embedded devices face the same problem. Faults such as network jitter, bad disk sectors, and memory bit flips happen every day in data centers, and they are even more likely for a computer working in outer space.
Instead of expecting every exception to be handled correctly, why not restart the entire program? Yes, this has been proven time and again to be the most effective way. But rebooting loses the current state, which is why checkpoints are needed. Programs record “good states” while they are running normally. Once something bad happens, they restart and recover to the last checkpoint.
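The idea can be sketched in a few lines of Python. This is only an illustration of checkpoint-restart in general, not AGC code: `TransientFault`, `run_with_checkpoints`, and the `checkpoint_every` parameter are names made up for this example.

```python
import copy


class TransientFault(Exception):
    """Stands in for any fault that a restart is expected to clear."""


def run_with_checkpoints(steps, state, checkpoint_every=2, max_restarts=10):
    """Run `steps` in order; on a fault, restart from the last checkpoint.

    Each step is a function that takes the state dict and mutates it.
    Returns (final_state, number_of_restarts).
    """
    # A checkpoint is (index of the next step to run, copy of a good state).
    checkpoint = (0, copy.deepcopy(state))
    restarts = 0
    i = 0
    while i < len(steps):
        try:
            steps[i](state)
            i += 1
            if i % checkpoint_every == 0:
                # Record a "good state" while everything is running normally.
                checkpoint = (i, copy.deepcopy(state))
        except TransientFault:
            restarts += 1
            if restarts > max_restarts:
                raise  # the fault does not look transient after all
            # "Restart": discard the current state and resume from the checkpoint.
            i, saved = checkpoint
            state = copy.deepcopy(saved)
    return state, restarts
```

If a step fails once after doing partial work, that partial update is discarded along with everything since the last checkpoint, and the run ends in the same state as a fault-free run.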
During the braking phase (Phase 63) of Apollo 11’s moon landing, the AGC reported 1201 and 1202 errors several times and temporarily stopped working. Fortunately, the spacecraft successfully landed on the moon in the end. What happened during this period?
The braking phase is one of the most computation-intensive stages. The AGC continuously corrects the spacecraft’s attitude through the navigation algorithm so that it can land smoothly on the moon. This program consumes 80% of the CPU, which is expected. (figure a)
However, something unexpected happened. The rendezvous radar, which was designed for use in the return phase, had a hardware bug that sent 1,000 interrupts per second to the AGC without conveying any information. This bug ate up 15% of the AGC’s poor CPU, leaving only 5% of headroom. Fortunately, the AGC could still function normally, and nobody was aware of the radar problem. (figure b)
Buzz Aldrin, one of the moon-landing astronauts, started the V16N68 program from the display. It monitors the landing, showing the current distance to the moon, velocity, and so on, updated in real time. This program consumes about 10% of the CPU. Buzz had repeated this procedure many times in past simulation training. (figure c)
However, the remaining 5% of the AGC’s CPU was not enough to support this program. The task queue gradually became congested until the enqueue request for new tasks timed out, triggering a restart. The thing Aldrin least wanted to see happened - the PROG (program error) indicator light flashed, followed by the RESTART light illuminating. The AGC restarted. (figure d & e)
Luckily, the system recovered to Phase 63 from the checkpoint and continued the guidance program. Buzz’s manually started V16N68 was not restored; after all, it was just an unimportant, stateless monitoring UI. The CPU load returned to 95%, and the AGC could continue to function normally. (figure a)
At first, Aldrin didn’t know what the problem was, so he repeated the above process several times, until he noticed that the abnormal behavior seemed to be related to him starting V16N68. Thankfully, the AGC recovered correctly each time. All thanks to the checkpoint-restart mechanism!
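The CPU budget in this story is simple arithmetic, and a tiny Python model makes it explicit. The percentages are the ones quoted above; the `Task` type and the `tick` function are hypothetical, and the real AGC detected overload through its task queue rather than by summing percentages.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Task:
    name: str
    cpu_percent: int
    # True if the load is still there after a restart: either it is restored
    # from the checkpoint, or (for the radar) the hardware fault persists.
    survives_restart: bool


def total_load(tasks):
    """Sum of the CPU shares of all running tasks, in percent."""
    return sum(t.cpu_percent for t in tasks)


def tick(tasks, capacity=100):
    """Return (tasks_after, restarted): restart only if the load exceeds capacity."""
    if total_load(tasks) <= capacity:
        return list(tasks), False
    return [t for t in tasks if t.survives_restart], True


# The situation in figure c: guidance + radar interrupts + V16N68.
FIGURE_C = [
    Task("guidance (braking phase)", 80, True),
    Task("rendezvous radar interrupts", 15, True),
    Task("V16N68 monitoring display", 10, False),
]
```

With these numbers, `total_load(FIGURE_C)` is 105, so `tick(FIGURE_C)` reports a restart and returns only the first two tasks, whose combined load is 95.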
Back to 2023
Today, compared to many shiny new concepts, the checkpoint-restart mechanism is relatively obscure; it doesn’t even have a canonical name. But its spirit lives on in many places, such as the “let-it-crash” philosophy advocated by Erlang and actor systems. They just don’t emphasize preserving state, and instead expect developers to keep it in a database.
If you Google the term, the results involve HPC, MPI, simulation, and so on. They have one thing in common: they are all long-running, computation-intensive tasks, just like stream processing.
To the best of my knowledge, Apache Flink is the first streaming system to introduce checkpoints. Flink innovatively applied the Chandy-Lamport algorithm to take checkpoints of streaming jobs, which I think is one of the critical reasons for its success. The Chandy-Lamport algorithm captures a globally consistent snapshot of a running distributed system, allowing us to recover a job without losing or duplicating any events, i.e., exactly-once delivery.
Regarding fault tolerance, both Flink and RisingWave essentially follow the same principles as the AGC:
- Record a recoverable checkpoint every once in a while and save it to persistent storage
- As soon as the system encounters an insurmountable problem, recover from the last checkpoint and attempt to continue
Moreover, RisingWave’s implementation is much lighter.
We hope to restart within a second like the AGC, but the state of streaming jobs is too large! Depending on the workload, it can range from a few GB to several TB, and even with today’s hardware, it takes some time to restore all of that state.
From the very beginning, we have regarded compute nodes (CNs) as almost stateless nodes. Specifically, we treat the data in a CN’s memory and on its disks as a cache, which brings better performance but is not necessary for the job to run. In other words, a CN lazily loads state from the checkpoint during job execution. This design allows RisingWave to restart tasks at the millisecond level without fully loading the state.
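A minimal sketch of the “local state is only a cache” idea, in Python. This is not RisingWave’s implementation: `LazyStateCache` is a hypothetical class, and a plain dict stands in for the checkpoint kept in remote storage.

```python
class LazyStateCache:
    """Local state treated as a cache over the last checkpoint in remote storage."""

    def __init__(self, remote):
        self.remote = remote      # dict standing in for the remote checkpoint
        self.cache = {}           # local memory/disk; safe to lose at any time
        self.remote_reads = 0     # how many keys were fetched from remote storage

    def get(self, key, default=None):
        # Load a key from the checkpoint only when the job actually touches it.
        if key not in self.cache:
            self.remote_reads += 1
            self.cache[key] = self.remote.get(key, default)
        return self.cache[key]

    def restart(self):
        # Restarting costs nothing up front: drop the cache and keep going.
        self.cache.clear()
```

After `restart()`, nothing is loaded eagerly. The cost of recovery is spread over later `get` calls and is proportional to the keys the job touches, not to the total state size.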
Another easily overlooked issue is the freshness of the checkpoint, that is, how frequently checkpoints are taken. If the AGC recovers to an earlier phase after a restart, more steps are needed to get back to the state before the restart. The same is true for streaming. For example, if the last checkpoint was taken 30 minutes ago, the most recent 30 minutes of input must be replayed before the job reaches the current state again.
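To put rough numbers on this, here is a back-of-the-envelope model in Python. The function names and the example rates are assumptions chosen for illustration, not measurements: input arrives at a constant rate, and after recovery the job processes events at a constant, higher throughput.

```python
def replay_backlog(input_rate, checkpoint_age_s):
    """Events that arrived since the last checkpoint and must be replayed."""
    return input_rate * checkpoint_age_s


def catch_up_seconds(input_rate, throughput, checkpoint_age_s):
    """Time until the job is back at the head of the stream.

    New input keeps arriving during replay, so the backlog only shrinks
    at (throughput - input_rate) events per second.
    """
    if throughput <= input_rate:
        raise ValueError("throughput must exceed input rate to ever catch up")
    return replay_backlog(input_rate, checkpoint_age_s) / (throughput - input_rate)
```

Assuming 10,000 events/s of input and 15,000 events/s of processing throughput, a 30-minute-old checkpoint means 18,000,000 events to replay and an hour to catch up, while a 1-second-old checkpoint means 10,000 events and 2 seconds.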
Our solution is to checkpoint every second. That sounds quite frequent, doesn’t it? In fact, one put_object every second is easy for object storage like S3. RisingWave’s storage was designed from scratch on top of the LSM-tree structure, so each checkpoint uploads only the new changes, which allows it to perform high-frequency checkpoints efficiently. In benchmark testing, performance was about the same at 1s, 10s, and 1min checkpoint intervals.
In some terrible cases, we have observed a cluster attempting to recover about ten times per second. Of course, this is not good news for those users. I am just trying to show the speed and reliability of the recovery mechanism itself. For reference, databases based on Paxos often take longer (e.g., 10s or more) to recover from failure.
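Why uploading only new changes keeps frequent checkpoints cheap can be shown with a toy model in Python. It is a sketch under simplifying assumptions, not RisingWave’s storage engine: `IncrementalCheckpointer` is a hypothetical class, a list of dicts stands in for objects in S3, and deletes (which would need tombstones) and compaction are left out.

```python
class IncrementalCheckpointer:
    """Toy state store whose checkpoints upload only the changes since the last one."""

    def __init__(self):
        self.state = {}      # full in-memory state
        self.dirty = {}      # keys written since the last checkpoint
        self.uploaded = []   # stand-in for object storage: one delta per checkpoint

    def put(self, key, value):
        self.state[key] = value
        self.dirty[key] = value

    def checkpoint(self):
        """Upload the delta as a single object; return how many keys it holds."""
        delta = dict(self.dirty)
        self.uploaded.append(delta)   # conceptually one upload per checkpoint
        self.dirty.clear()
        return len(delta)

    @staticmethod
    def recover(uploaded):
        """Rebuild the state by applying deltas oldest first; newer values win."""
        state = {}
        for delta in uploaded:
            state.update(delta)
        return state
```

The size of each checkpoint depends on how much changed since the previous one, not on the total state size, so checkpointing more often mostly means uploading smaller deltas.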
The control plane, usually Kubernetes, also plays a vital role here. It has an independent liveness detection mechanism. When a machine has a problem, it promptly reschedules the load; when a CN exits unexpectedly, it immediately brings the CN back up. Most of the time, the control plane ensures that a specified number of nodes are alive in the cluster.
When a node is lost, RisingWave tends to wait for new nodes to join instead of immediately scaling tasks in to fewer nodes. Since the failure is likely related to high load, scaling in at that point does not sound like a good idea.
So, what about metadata?
Metadata is an exception. It lives in a true database, albeit one that only stores metadata, and our requirements for it are no different from those for a normal database: high availability, high performance, and preferably ACID transactions.
RisingWave’s metadata is stored in etcd, a multi-replica KV database (we are currently refactoring this part, so changes may occur in the future). The Meta Service is almost stateless, which makes it easy to restart or migrate.
As we learned in school, network partitions can happen. The control plane cannot always guarantee the uniqueness of the Meta Service, and we should never assume that it does. Therefore, the Meta Service implements a leader election mechanism to ensure that there is at most one primary Meta Service at any time, providing metadata and scheduling services to the cluster.
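One common way to get “at most one primary” is a lease with a time-to-live in a consistent store. The Python sketch below shows only the core rule and is not RisingWave’s or etcd’s actual API: `LeaseStore` is a hypothetical in-memory stand-in, and time is passed in explicitly to keep the example deterministic.

```python
class LeaseStore:
    """In-memory stand-in for a consistent KV store holding a leader lease."""

    def __init__(self):
        self.leader = None
        self.expires_at = 0.0

    def try_acquire(self, candidate, now, ttl):
        """Become (or remain) leader if the lease is free, expired, or already ours.

        In a real store this check-and-set must be one atomic operation.
        """
        free = self.leader is None or now >= self.expires_at
        if free or self.leader == candidate:
            self.leader = candidate
            self.expires_at = now + ttl
            return True
        return False

    def current_leader(self, now):
        """The lease holder, or None if the lease has expired."""
        return self.leader if now < self.expires_at else None
```

A primary keeps calling `try_acquire` to renew its lease. If it is partitioned away and stops renewing, the lease expires and a standby can take over. A real deployment also has to make the old primary stop serving before its lease runs out, which this sketch does not model.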
Tolerating external system failures
The counterpart to internal faults is the failure of external systems. A streaming system often has two or more systems upstream and downstream, for example consuming changes from Apache Kafka and eventually writing to PostgreSQL.
The processing of source-side exceptions is straightforward: treat it as “silent” - no new data coming in.
The sink side is more difficult: RisingWave’s SQL interface is so flexible that complex DAG tasks can easily be created. For example, one task may output to a data lake while also outputting filtered and grouped data to Kafka. If one of the downstream systems suddenly becomes slow or unavailable, all connected tasks will be impacted. RisingWave plans to add a buffer queue before the sink to solve this problem. In version 1.3, the Kafka append-only Sink is supported.
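The general idea of a buffer in front of a sink can be sketched as follows. This is an illustration of the technique, not RisingWave’s design: `BufferedSink` is a hypothetical class, `deliver` is any callable that writes one record to the external system, and a failure is assumed to surface as a `ConnectionError`.

```python
from collections import deque


class BufferedSink:
    """Bounded queue that decouples the streaming job from a slow or failing sink."""

    def __init__(self, deliver, capacity):
        self.deliver = deliver    # writes one record downstream; may raise
        self.capacity = capacity
        self.buffer = deque()

    def write(self, record):
        """Called by the upstream job. Returns False only when the buffer is full."""
        if len(self.buffer) >= self.capacity:
            return False          # backpressure: the caller must retry later
        self.buffer.append(record)
        return True

    def flush(self):
        """Drain in order; stop at the first failure. Returns records delivered."""
        delivered = 0
        while self.buffer:
            try:
                self.deliver(self.buffer[0])
            except ConnectionError:
                break             # keep the record and retry on the next flush
            self.buffer.popleft()
            delivered += 1
        return delivered
```

While the downstream system is unavailable, `write` keeps succeeding until the buffer fills up, so other tasks in the same DAG are affected only after a prolonged outage. Records are delivered in order once the sink comes back.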
So there you have it: RisingWave’s fault tolerance and high availability are built on a checkpoint-recovery mechanism. With efficient checkpointing and recovery, streaming jobs keep retrying until they recover from transient failures.
The persistence of data is outsourced to S3 (or any similar service), a multi-replica, highly available object store, and metadata is kept in etcd, a multi-replica, highly available database.
Finally, if you are interested in Apollo 11 and the AGC, I recommend watching this video; the AGC figures in this article also come from it.