Consistency Guarantees in Streaming Systems: At-Least-Once vs Exactly-Once

Consistency guarantees in streaming systems determine how many times each event affects the output: at-most-once loses data on failure, at-least-once may produce duplicates, and exactly-once ensures each event's effect appears in the output precisely one time. Exactly-once is the gold standard for correctness-critical pipelines, but it requires coordinated checkpointing, replayable sources, and transactional sinks to hold across distributed failures. RisingWave achieves end-to-end exactly-once through epoch-based barrier checkpointing, asynchronous state persistence to object storage, and idempotent sink commits.

Why Delivery Semantics Matter in Production

Imagine your fraud detection system fires twice for the same transaction, blocking a legitimate customer's card. Or your billing pipeline loses a purchase event during a node restart and undercharges a customer. Both failures come from the same root cause: the processing system did not handle the event exactly once.

Delivery semantics are not an academic detail. They directly determine whether your real-time dashboards show accurate numbers, whether your alerting system triggers once or floods your ops team with duplicates, and whether your downstream databases or data warehouses receive clean data. In a distributed streaming system, where network partitions, node failures, and process crashes are routine, reasoning about delivery semantics becomes the central engineering challenge.

This article defines all three delivery guarantees precisely, explains why exactly-once is genuinely hard in distributed environments, and shows how RisingWave achieves it with working SQL examples you can run locally.

The Three Delivery Guarantees Defined

At-Most-Once: Fastest, Least Safe

At-most-once is the simplest guarantee. The system delivers each record at most one time. If a failure occurs between reading an event and writing its result, the event is lost and never retried. There are no acknowledgments, no state persistence, no coordination overhead.

The failure mode is clean but costly: records disappear silently. For use cases where occasional loss is tolerable, such as coarse-grained telemetry sampling or non-critical logging, at-most-once can be appropriate. For anything involving business-critical data, it is not.

Source --[event A]--> Processor --[crash before write]-->
(event A is gone forever; no duplicate, no error, just missing data)

At-Least-Once: Safer, Produces Duplicates

At-least-once adds retry logic. If the processor does not receive confirmation that an event was handled, it re-delivers the event. This guarantees no data loss, but introduces duplicates.

Consider a processor that successfully writes event A's result to a sink, then crashes before sending the acknowledgment back to the source. The source, having received no confirmation, re-sends event A. The processor handles it again. The sink now contains two copies of event A's output.

Source --[event A]--> Processor --[writes result]--> Sink
Processor --[crash before ack]-->
Source --[event A]--> Processor --[writes result]--> Sink (duplicate!)

At-least-once is common in systems that prioritize completeness over precision. Many early streaming architectures relied on downstream deduplication (by unique event ID or upsert semantics) to compensate. This works, but it shifts the burden to the consumer and requires careful schema design at every output.
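The downstream deduplication pattern can be sketched in a few lines of Python (a toy consumer for illustration, not RisingWave code): events carry a unique event ID, and the consumer drops any ID it has already seen.

```python
# Sketch: consumer-side deduplication that compensates for at-least-once
# delivery by filtering on a unique event ID.

def deduplicate(events):
    """Yield each event at most once, keyed by its unique event_id."""
    seen = set()
    for event in events:
        if event["event_id"] in seen:
            continue  # duplicate from a retry; drop it
        seen.add(event["event_id"])
        yield event

# An at-least-once source may redeliver event 1001 after a lost ack:
delivered = [
    {"event_id": 1001, "amount": 149.99},
    {"event_id": 1002, "amount": 29.50},
    {"event_id": 1001, "amount": 149.99},  # redelivery after lost ack
]
unique = list(deduplicate(delivered))
assert [e["event_id"] for e in unique] == [1001, 1002]
```

The cost is exactly what the paragraph describes: every consumer must carry this logic, and every output schema must expose a reliable unique key.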

Exactly-Once: Correct Results Across Failures

Exactly-once guarantees that the observable effect of each event appears in the output exactly one time, regardless of failures, retries, or replays. It does not mean the system physically processes each event only once at the hardware level. During recovery, records may be reprocessed. What matters is that the net effect on the output is indistinguishable from a world where no failures occurred.

Achieving exactly-once requires three things working together:

  • Replayable sources: The source can rewind to a specific offset after a failure (Kafka, Kinesis, and Pulsar all support this natively).
  • Consistent state snapshots: The full pipeline state can be captured at a precise point in the stream and restored after failure.
  • Atomic output commits: State changes and output writes become visible together, or not at all.

When these three properties hold, the system can roll back to a saved checkpoint on failure, replay events from the matching source offset, and produce output that is byte-for-byte identical to what would have been produced without the failure.
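A toy model (illustrative only, not RisingWave internals) shows how the three properties combine: a source re-readable by offset, running operator state, and a checkpoint that commits state and offset together. A crash mid-epoch rolls back to the checkpoint, replays from the matching offset, and produces the same final output.

```python
# Toy model of exactly-once recovery: replayable source + state snapshot
# + atomic commit of (state, offset). Names and structure are illustrative.

events = [10, 20, 30, 40]          # replayable source: re-readable by offset

def run(crash_after=None):
    state, offset = 0, 0           # running sum + source position
    checkpoint = (state, offset)   # last durable (state, offset) pair
    processed = 0
    while offset < len(events):
        state += events[offset]
        offset += 1
        processed += 1
        if crash_after is not None and processed == crash_after:
            # crash: discard in-flight state, restore the checkpoint,
            # and rewind the source to the checkpointed offset
            state, offset = checkpoint
            crash_after = None
        elif offset % 2 == 0:
            checkpoint = (state, offset)   # commit state + offset together
    return state

# The output is byte-for-byte identical with or without a failure:
assert run() == run(crash_after=3) == 100
```

The atomicity of the `(state, offset)` pair is the crux: checkpointing state without the offset (or vice versa) would lose or double-count the replayed events.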

Why Exactly-Once Is Hard in Distributed Systems

The Fundamental Problem: Distributed Consensus

A streaming pipeline is a distributed system. Events flow through source operators, transformation operators (joins, aggregations, filters), and sink operators running on different machines, often in different data centers. Each operator maintains local state. Network links between them can drop, delay, or reorder messages.

Exactly-once requires that all of these operators agree on which events have been processed and which have not, at every moment. This is a distributed consensus problem, and it cannot be solved cheaply. The standard result from distributed systems theory (the FLP impossibility result) shows that in an asynchronous system where even a single process can fail, no deterministic consensus protocol is guaranteed to terminate. Real systems work around this by making pragmatic assumptions: timeouts, partial synchrony, and bounded failure rates.

Network Partitions and Partial Failures

Consider a join operator that receives events from two input streams. A network partition separates one input stream from the operator. The operator has processed 10,000 events from stream A and 8,000 from stream B. The partition clears. Should the operator request a replay of stream B from event 8,001? Or did some of those events actually arrive and get processed but the acknowledgment was lost in the partition?

Without a consistent snapshot of the operator's state at a precisely known point in the stream, there is no safe answer. The operator either risks missing events (violating at-least-once) or re-processing events (generating duplicates that violate exactly-once).

State Explosion at Scale

Exactly-once state must persist durably. If an operator that has processed 100 million events crashes, the system needs to restore its state (aggregation accumulators, join buffers, window contents) to the exact point of failure. Storing and restoring this state across a pipeline with dozens of operators and many parallelism slots is a significant engineering problem. Naive approaches (synchronous state writes before every output) impose prohibitive latency. Asynchronous approaches must carefully coordinate state writes with output writes to avoid inconsistency windows.

The Two-Generals Problem at the Sink

Even if the processing engine achieves exactly-once internally, the guarantee can break at the sink. Writing to an external system (a PostgreSQL database, a Kafka topic, an S3 bucket) introduces a two-generals-style problem: you cannot atomically know that both "the processing state was saved" and "the sink received the write" are true simultaneously. One will always be observable before the other. If the system crashes between these two events, the recovery path must handle both "state saved but write not received" and "write received but state not saved" cases correctly, without duplicating or losing data.

How Epoch-Based Checkpointing Solves the Problem

The practical solution to all of these challenges is epoch-based barrier checkpointing, derived from the Chandy-Lamport distributed snapshot algorithm.

Barriers and Epochs

A checkpoint coordinator periodically injects lightweight markers called barriers into the data stream. These barriers flow through the pipeline alongside regular events, dividing the stream into sequential logical units called epochs. Every event belongs to exactly one epoch.

When a barrier from epoch N reaches an operator, the operator:

  1. Finishes processing all events that arrived before the barrier (epoch N-1's work).
  2. Takes a consistent snapshot of its current state.
  3. Forwards the barrier downstream to the next operator.

When an operator has multiple upstream inputs (such as a join operator), it waits for the barrier to arrive on all inputs before snapshotting. This barrier alignment ensures the snapshot reflects a globally consistent cut across the pipeline: all events before the barrier on every channel are included, and no events after the barrier on any channel are included.

Stream:  [evt1][evt2][evt3] | Barrier(N) | [evt4][evt5]
                              ^
                       Epoch N-1 | Epoch N

When the barrier reaches all terminal operators (materialized views, sinks), the epoch's checkpoint is complete. The coordinator records it as durable. If any failure occurs before the checkpoint completes, the system discards the partial snapshot and retries.
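Barrier alignment can be illustrated with a small sketch (a deliberate simplification: real operators consume channels concurrently rather than one at a time). Events before the barrier on each channel enter the snapshot; events after it are held back for the next epoch.

```python
# Toy barrier alignment for a two-input operator (illustrative, not
# RisingWave's implementation): snapshot only after the barrier has been
# seen on every input channel, so the snapshot is a consistent cut.

BARRIER = "B"

def aligned_snapshot(channels):
    """channels: dict of channel name -> list of events with one BARRIER."""
    in_snapshot = []    # events belonging to the epoch being snapshotted
    held_back = []      # post-barrier events deferred to the next epoch
    for name, stream in channels.items():
        past_barrier = False
        for item in stream:
            if item == BARRIER:
                past_barrier = True
            elif past_barrier:
                held_back.append(item)    # arrived after this channel's barrier
            else:
                in_snapshot.append(item)  # arrived before the barrier
    return sorted(in_snapshot), sorted(held_back)

snap, held = aligned_snapshot({
    "a": [1, 2, BARRIER, 5],
    "b": [3, BARRIER, 4, 6],
})
assert snap == [1, 2, 3]   # consistent cut: every pre-barrier event, on both channels
assert held == [4, 5, 6]   # processed as part of the next epoch
```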

Recovery from a Checkpoint

On failure, the recovery process is deterministic:

  1. Identify the last fully completed checkpoint.
  2. Restore each operator's state from that checkpoint.
  3. Rewind all sources to the offsets recorded in the checkpoint.
  4. Resume processing.

Because the state is rolled back to before the failed epoch's events were processed, and the source replays those events in the same order, the pipeline produces identical results. The net effect is exactly-once: each event contributes to the output exactly once, even though some events may be physically re-processed during recovery.

How RisingWave Implements Exactly-Once

RisingWave is a PostgreSQL-compatible streaming database built in Rust. Its exactly-once implementation extends barrier-based checkpointing with several architectural decisions specific to its disaggregated storage design.

The Meta Node as Checkpoint Coordinator

In RisingWave, the Meta Node acts as the checkpoint coordinator. It injects barriers into all source operators at a configurable interval (the default is roughly one second). Each barrier carries an epoch number. The Meta Node tracks barrier progress across all streaming operators and declares a checkpoint complete only after receiving acknowledgments from all terminal operators.

sequenceDiagram
    participant MN as Meta Node
    participant Src as Source Operator
    participant Agg as Aggregation Operator
    participant MV as Materialized View
    participant S3 as Object Storage

    MN->>Src: Inject Barrier (Epoch N)
    Src->>Agg: Forward events + Barrier N
    Agg->>Agg: Process epoch N events, snapshot state
    Agg-->>S3: Async flush state (SST files)
    Agg->>MV: Forward Barrier N
    MV->>MV: Commit epoch N output
    MV-->>S3: Async flush
    MV->>MN: Epoch N checkpoint complete
    MN->>MN: Mark Epoch N as durable

Asynchronous State Persistence

A key design choice in RisingWave is that state persistence is fully asynchronous. When an operator takes a snapshot at a barrier, it does not wait for the state to reach object storage (S3, GCS, or Azure Blob) before processing the next epoch. Instead, state changes are gathered in an in-memory buffer and written to sorted string table (SST) files in the background.

This means checkpointing has almost no impact on foreground processing latency. Unlike systems where every checkpoint causes a visible latency spike, RisingWave maintains stable sub-second processing latency even during active checkpointing. The tradeoff is that recovery involves restoring from object storage, which takes longer than restoring from local disk, but this is acceptable given that failures are rare and recovery is a one-time event.

Epoch Batching for Consistency and Throughput

The epoch concept enables another optimization: batching writes within a consistency boundary. Multiple state changes within a single epoch are grouped into a single logical transaction before being committed. This is equivalent to increasing the effective write batch size, which reduces the per-record overhead of maintaining consistency and improves throughput significantly for high-volume pipelines.
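A minimal sketch of epoch batching (an assumed simplification of the design described above): per-record writes accumulate in a pending buffer, overwriting earlier values for the same key, and become visible in one atomic commit at the barrier.

```python
# Sketch of epoch batching: many writes within an epoch collapse into a
# single logical commit at the barrier. Class and method names are
# illustrative, not RisingWave APIs.

class EpochBatcher:
    def __init__(self):
        self.pending = {}      # key -> latest value within the open epoch
        self.committed = {}    # durable state visible to queries
        self.commits = 0

    def write(self, key, value):
        self.pending[key] = value            # overwrite within the epoch

    def on_barrier(self):
        self.committed.update(self.pending)  # one atomic commit per epoch
        self.pending.clear()
        self.commits += 1

b = EpochBatcher()
for key, value in [("u42", 120.0), ("u43", 229.49), ("u42", 420.0)]:
    b.write(key, value)
b.on_barrier()
assert b.commits == 1                        # three writes, one commit
assert b.committed == {"u42": 420.0, "u43": 229.49}
```

Because queries read only `committed`, no reader ever observes a partial epoch, which is the snapshot-isolation property described above.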

Epoch batching also guarantees snapshot isolation for queries against materialized views. Because upstream table changes and their corresponding downstream materialized view updates are committed atomically within the same epoch, any query always sees a consistent, point-in-time view of the data. You can join two materialized views and always get results that are consistent with each other, with no partial-epoch states visible.

Idempotent Sinks and End-to-End Exactly-Once

Processing-side exactly-once is necessary but not sufficient for end-to-end guarantees. The sink must also participate in the consistency protocol.

RisingWave handles this in two ways depending on the sink type:

Internal sinks (materialized views): Updates to materialized views are committed atomically as part of the epoch checkpoint. The materialized view state is part of the checkpoint itself, so recovery automatically restores both the processing state and the output state to a consistent point.

External sinks (PostgreSQL, MySQL, Kafka, Iceberg, etc.): RisingWave coordinates writes to external sinks with the checkpoint cycle. For sinks that support transactions (PostgreSQL, MySQL), RisingWave uses a two-phase approach: buffer writes within an epoch, then commit the transaction only after the epoch's checkpoint is confirmed durable. If the process crashes before the commit, the transaction is rolled back, and the next recovery replays the epoch and re-attempts the commit. For append-only sinks like Kafka, RisingWave uses idempotent producer semantics to deduplicate retried writes at the broker level.
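The two-phase approach for transactional sinks can be sketched as follows (names like `flush_epoch` are illustrative, not RisingWave APIs): writes are buffered inside a sink transaction and committed only once the epoch's checkpoint is confirmed durable; otherwise the transaction rolls back and recovery replays the epoch.

```python
# Sketch of the two-phase sink protocol: buffer an epoch's writes in a
# transaction, commit only after the checkpoint is durable.

class TransactionalSink:
    def __init__(self):
        self.visible = []      # rows visible to readers of the sink
        self.txn = None

    def begin(self):
        self.txn = []

    def write(self, row):
        self.txn.append(row)   # buffered, not yet visible

    def commit(self):
        self.visible.extend(self.txn)
        self.txn = None

    def rollback(self):
        self.txn = None        # the epoch's buffered writes are discarded

def flush_epoch(sink, rows, checkpoint_durable):
    sink.begin()
    for row in rows:
        sink.write(row)
    if checkpoint_durable:
        sink.commit()          # phase 2: checkpoint confirmed, make visible
    else:
        sink.rollback()        # crash path: recovery will replay this epoch

sink = TransactionalSink()
flush_epoch(sink, ["r1", "r2"], checkpoint_durable=False)  # simulated crash
flush_epoch(sink, ["r1", "r2"], checkpoint_durable=True)   # replay succeeds
assert sink.visible == ["r1", "r2"]   # no duplicates, no loss
```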

Working SQL Examples

The following examples have been verified against RisingWave 2.8.0. They demonstrate how exactly-once guarantees are expressed at the SQL level, even though the underlying checkpoint machinery is invisible.

Example 1: Setting Up an Event Stream

Create a table to represent incoming events. In production this would be a source connected to Kafka, but a regular table works for local testing.

CREATE TABLE cons_events (
    event_id    BIGINT,
    user_id     INT,
    event_type  VARCHAR,
    amount      DOUBLE PRECISION,
    event_ts    TIMESTAMP
);

Insert a batch of purchase and refund events:

INSERT INTO cons_events VALUES
    (1001, 42, 'purchase', 149.99, '2026-04-01 09:00:01'),
    (1002, 43, 'purchase', 29.50,  '2026-04-01 09:00:02'),
    (1003, 42, 'refund',   -29.99, '2026-04-01 09:00:03'),
    (1004, 44, 'purchase', 89.00,  '2026-04-01 09:00:04'),
    (1005, 43, 'purchase', 199.99, '2026-04-01 09:00:05');

Example 2: Exactly-Once Aggregation via Materialized View

Create a materialized view that tracks net spend per user:

CREATE MATERIALIZED VIEW cons_user_spend AS
SELECT
    user_id,
    COUNT(*) FILTER (WHERE amount > 0) AS purchase_count,
    COUNT(*) FILTER (WHERE amount < 0) AS refund_count,
    SUM(amount)                         AS net_spend
FROM cons_events
GROUP BY user_id;

Query it immediately:

SELECT * FROM cons_user_spend ORDER BY user_id;

Output:

 user_id | purchase_count | refund_count | net_spend
---------+----------------+--------------+-----------
      42 |              1 |            1 |    120.00
      43 |              2 |            0 |    229.49
      44 |              1 |            0 |     89.00

Now simulate events arriving after a checkpoint (or after a recovery):

INSERT INTO cons_events VALUES
    (1006, 44, 'purchase', 55.00,  '2026-04-01 09:01:00'),
    (1007, 42, 'purchase', 300.00, '2026-04-01 09:01:01');

FLUSH;

SELECT * FROM cons_user_spend ORDER BY user_id;

Output:

 user_id | purchase_count | refund_count | net_spend
---------+----------------+--------------+-----------
      42 |              2 |            1 |    420.00
      43 |              2 |            0 |    229.49
      44 |              2 |            0 |    144.00

Each event contributed to the aggregation exactly once. If the system had crashed between the two inserts and replayed the second batch from the checkpoint offset, the result would be identical because the state was checkpointed after the first batch was fully committed.

Example 3: Category Revenue with Order Data

Create an orders table and a materialized view for revenue by product category:

CREATE TABLE cons_orders (
    order_id     BIGINT,
    user_id      INT,
    product_id   VARCHAR,
    category     VARCHAR,
    price        DOUBLE PRECISION,
    quantity     INT,
    order_ts     TIMESTAMP
);

INSERT INTO cons_orders VALUES
    (5001, 10, 'PROD-A', 'Electronics', 299.99, 1, '2026-04-01 10:00:00'),
    (5002, 10, 'PROD-B', 'Clothing',    49.99,  2, '2026-04-01 10:01:00'),
    (5003, 11, 'PROD-A', 'Electronics', 299.99, 1, '2026-04-01 10:02:00'),
    (5004, 12, 'PROD-C', 'Books',       14.99,  3, '2026-04-01 10:03:00'),
    (5005, 11, 'PROD-D', 'Electronics', 799.00, 1, '2026-04-01 10:04:00');

CREATE MATERIALIZED VIEW cons_category_revenue AS
SELECT
    category,
    COUNT(*)              AS order_count,
    SUM(price * quantity) AS total_revenue
FROM cons_orders
GROUP BY category;

FLUSH;

SELECT * FROM cons_category_revenue ORDER BY total_revenue DESC;

Output:

  category   | order_count | total_revenue
-------------+-------------+---------------
 Electronics |           3 |       1398.98
 Clothing    |           1 |         99.98
 Books       |           1 |         44.97

The Electronics total (1398.98) is exactly 299.99 + 299.99 + 799.00. No order is double-counted, no order is missing, even though RisingWave processed all five orders as part of a continuously updated streaming computation.

Example 4: Windowed Event Counts

Exactly-once guarantees apply to windowed aggregations too. Here is a one-minute tumbling window over the event stream:

CREATE MATERIALIZED VIEW cons_event_window_counts AS
SELECT
    window_start,
    window_end,
    event_type,
    COUNT(*)    AS event_count,
    SUM(amount) AS total_amount
FROM TUMBLE(cons_events, event_ts, INTERVAL '1' MINUTE)
GROUP BY window_start, window_end, event_type;

FLUSH;

SELECT * FROM cons_event_window_counts ORDER BY window_start, event_type;

Output:

    window_start     |     window_end      | event_type | event_count | total_amount
---------------------+---------------------+------------+-------------+--------------
 2026-04-01 09:00:00 | 2026-04-01 09:01:00 | purchase   |           4 |       468.48
 2026-04-01 09:00:00 | 2026-04-01 09:01:00 | refund     |           1 |       -29.99
 2026-04-01 09:01:00 | 2026-04-01 09:02:00 | purchase   |           2 |       355.00

Each window's totals are correct and stable. A failure mid-window that triggers recovery replays the events in that window from the checkpoint offset, producing the same window result without duplicates.

Example 5: Configuring an Idempotent External Sink

For pipelines that write results to an external database, RisingWave's sink connector handles exactly-once by coordinating writes with the checkpoint cycle. Here is how to configure a PostgreSQL sink that writes materialized view results transactionally:

CREATE SINK cons_revenue_sink
FROM cons_category_revenue
WITH (
    connector  = 'jdbc',
    jdbc.url   = 'jdbc:postgresql://your-pg-host:5432/analytics',
    table.name = 'category_revenue',
    user       = 'rw_writer',
    password   = 'your-password',
    type       = 'upsert',
    primary_key = 'category'
);

With type = 'upsert', the sink uses the primary key to detect and overwrite any duplicate rows that might result from a recovery replay. This makes the sink idempotent: re-running the same epoch produces the same final state in the target table, not additional rows.

The primary_key field is required for upsert mode and tells RisingWave which column to use as the deduplication key when committing writes after each checkpoint.
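Upsert idempotency is easy to see in a toy model (illustrative Python, not connector code): applying the same epoch's rows twice, keyed by `category`, leaves the target table unchanged.

```python
# Sketch: why an upsert sink keyed on the primary key is idempotent under
# replay. Re-applying an epoch's rows produces the same final table state.

def apply_upserts(table, rows, key="category"):
    for row in rows:
        table[row[key]] = row      # insert-or-overwrite by primary key
    return table

epoch_rows = [
    {"category": "Electronics", "total_revenue": 1398.98},
    {"category": "Books", "total_revenue": 44.97},
]
table = apply_upserts({}, epoch_rows)
replayed = apply_upserts(dict(table), epoch_rows)  # recovery replays the epoch
assert replayed == table                           # same final state
assert len(replayed) == 2                          # no extra rows from the replay
```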

Comparing Consistency Guarantees Across Streaming Systems

 Guarantee     | Data Loss | Duplicates | Implementation Complexity | Typical Use Case
---------------+-----------+------------+---------------------------+------------------------------------------------
 At-most-once  | Yes       | No         | Low                       | Non-critical telemetry, coarse sampling
 At-least-once | No        | Yes        | Medium                    | Pipelines with downstream deduplication
 Exactly-once  | No        | No         | High                      | Billing, fraud detection, financial aggregates
How RisingWave's implementation compares with Apache Flink:

 Feature                     | RisingWave                         | Apache Flink
-----------------------------+------------------------------------+----------------------------------------------
 Checkpointing mechanism     | Epoch-based barrier snapshots      | Asynchronous Barrier Snapshots (ABS)
 State storage               | Shared object storage (S3/GCS)     | Local RocksDB per task + remote backup
 Default checkpoint interval | ~1 second                          | 10 seconds
 State persistence           | Fully async, zero blocking         | Async local, sync upload to checkpoint store
 Query interface             | SQL queries on checkpointed state  | No built-in query layer for state
 External sink protocol      | Epoch-coordinated commits + upsert | Two-phase commit for transactional sinks

Frequently Asked Questions

What is the difference between at-least-once and exactly-once in stream processing?

At-least-once guarantees every event is processed, but some events may be processed more than once during failure recovery, producing duplicate results in the output. Exactly-once guarantees the observable effect of each event appears in the output precisely one time, even if the system internally re-processes events during recovery. The key mechanism for exactly-once is combining replayable sources with consistent state snapshots: on failure, the system rolls back operator state to the last checkpoint and replays events from the matching source offset. Because state is rolled back before the replayed events are processed, the replay produces the same output without duplication.

Why is exactly-once processing difficult to achieve in distributed streaming systems?

Exactly-once is hard because it requires coordinated agreement across multiple processes that can fail independently and communicate over unreliable networks. Three problems compound each other. First, taking a consistent snapshot of a distributed pipeline without pausing it requires a protocol (such as Chandy-Lamport barrier snapshotting) that coordinates across all operators simultaneously. Second, persisting state durably adds latency; doing it asynchronously without creating consistency windows requires careful sequencing of state writes and output commits. Third, writing to external sinks introduces a two-generals-style problem where you cannot atomically guarantee both "state was saved" and "the sink received the write," so sinks must either support transactions or be idempotent.

How does RisingWave's epoch-based checkpointing work?

RisingWave's Meta Node periodically injects barrier markers into the source operators of the streaming pipeline. Barriers divide the event stream into epochs. When an operator receives a barrier on all of its inputs, it takes a snapshot of its current state and forwards the barrier downstream. State is asynchronously flushed to object storage (S3, GCS, or Azure Blob) as sorted string table files without blocking the processing of the next epoch's events. Once all terminal operators (materialized views and sinks) report that they have committed the barrier, the Meta Node declares the epoch's checkpoint durable. On failure, the system restores operator state from the last complete checkpoint, rewinds sources to the corresponding offsets, and resumes processing. Full details are covered in the RisingWave architecture documentation.

Does exactly-once processing increase query latency in RisingWave?

The latency impact is minimal because RisingWave's checkpointing is fully asynchronous. State is written to object storage in the background while the next epoch's events are already being processed. Barrier alignment (waiting for a barrier to arrive on all input channels of a multi-input operator) can add a small, bounded delay when processing is skewed across channels, but this is typically under one second at the default barrier interval. Unlike systems where each checkpoint causes a visible latency spike, RisingWave maintains stable processing latency during checkpointing. The one-second default barrier interval provides a good balance between checkpoint frequency, recovery granularity, and processing overhead.

Key Takeaways

Delivery semantics are a design choice with concrete business consequences. The right choice depends on your workload:

  • At-most-once is appropriate only when data loss is acceptable, such as coarse sampled telemetry or debug logging.
  • At-least-once works when downstream systems can handle deduplication, typically through upsert keys or unique-ID filtering.
  • Exactly-once is required for billing, financial aggregation, fraud detection, and any pipeline where duplicate or missing results create downstream harm.

RisingWave achieves exactly-once through three coordinated mechanisms: epoch-based barrier checkpointing that creates consistent distributed snapshots, asynchronous state persistence to shared object storage that keeps foreground latency stable, and idempotent or transactional sink commits that extend the guarantee all the way to external systems.

If you want to explore the theoretical foundations, the Chandy-Lamport algorithm deep dive on the RisingWave blog is an excellent starting point. For hands-on implementation, the exactly-once processing guide walks through barrier checkpointing in detail. And for setting up your first sink with consistency guarantees, the RisingWave sink connector documentation covers all supported connectors and their consistency modes.


Ready to build exactly-once streaming pipelines with SQL? Try RisingWave Cloud free, no credit card required. Sign up here.

Join the RisingWave Slack community to ask questions and connect with stream processing engineers.
