Apache Iceberg exactly-once ingestion means every source record lands in your Iceberg table exactly one time: no duplicates from retried writes, no missing records from failed commits. Iceberg's snapshot model makes this achievable at the storage layer, but the responsibility for coordinating the commit protocol falls on your streaming engine.
Why Exactly-Once Matters for Iceberg Tables
Iceberg tables are not just files on object storage. They are versioned, transactional datasets with downstream consumers: BI dashboards, ML feature pipelines, and other Iceberg readers that expect consistent, non-duplicated data.
At-least-once semantics, the default for many streaming pipelines, can write duplicate records to an Iceberg table whenever a job restarts and replays input it had already committed. At-most-once semantics drop records on failure. Neither is acceptable when Iceberg tables serve as the authoritative source of truth for analytics or regulatory reporting.
Exactly-once matters because data engineers cannot tolerate manual deduplication at query time. Deduplication downstream shifts operational burden, increases query cost, and still leaves a window of inconsistency before deduplication runs.
How Iceberg's Snapshot Model Enables Exactly-Once
Iceberg stores table state as a sequence of immutable snapshots. Each snapshot has a globally unique snapshot ID and points to a manifest list that describes the set of data files belonging to that version.
A write operation does not modify existing files. It writes new Parquet data files to object storage, then atomically updates the metadata pointer in the catalog from the current snapshot to a new one. The catalog update is the commit: before it completes, the new files are invisible to readers; after it completes, the new snapshot is durable.
This atomic snapshot advance is the foundation for exactly-once. If a commit succeeds, its snapshot ID is permanently recorded. If a commit fails before the catalog update, the data files exist in object storage but are invisible to readers because no snapshot references them. The engine can safely retry by writing new data files and attempting the catalog commit again.
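The pointer swap described above can be illustrated with a small in-memory model. This is only a sketch: `ToyCatalog`, `Snapshot`, and `visible_files` are hypothetical names invented for the example, not Iceberg APIs, and a real catalog performs the compare-and-swap against a metadata file location rather than a Python object.

```python
import threading
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass(frozen=True)
class Snapshot:
    snapshot_id: int
    parent_id: Optional[int]
    data_files: Tuple[str, ...]  # every data file visible in this table version


class ToyCatalog:
    """Hypothetical in-memory catalog holding one pointer to the current snapshot."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._current: Optional[Snapshot] = None

    def current(self) -> Optional[Snapshot]:
        return self._current

    def commit(self, new: Snapshot) -> bool:
        """Advance the pointer only if new.parent_id is still the current snapshot."""
        with self._lock:
            current_id = self._current.snapshot_id if self._current else None
            if current_id != new.parent_id:
                return False  # someone else committed first; refresh and retry
            self._current = new
            return True


def visible_files(catalog: ToyCatalog) -> Tuple[str, ...]:
    """Readers only ever see files referenced by the current snapshot."""
    snapshot = catalog.current()
    return snapshot.data_files if snapshot else ()


catalog = ToyCatalog()
staged = ("s3://my-bucket/warehouse/db/events/data/file-a.parquet",)

print(visible_files(catalog))                     # staged file is not visible yet
print(catalog.commit(Snapshot(1, None, staged)))  # pointer swap succeeds
print(visible_files(catalog))                     # the file is now visible
print(catalog.commit(Snapshot(2, None, staged)))  # stale parent, commit rejected
```

The rejected second commit is the important part: a writer that lost the race gets a clear failure instead of silently overwriting another writer's snapshot.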
The Two-Phase Commit Protocol
Exactly-once in streaming systems requires a two-phase commit (2PC) protocol that coordinates between the streaming engine's checkpoint mechanism and the Iceberg catalog.
Phase 1: Pre-commit (prepare). When the streaming engine triggers a checkpoint, each sink operator flushes its buffered records to Iceberg data files. The data files are written to object storage but not yet committed to the catalog. The operator records the file metadata (paths, byte counts, row counts) in its checkpoint state. At this point the data is durable in object storage but invisible to readers.
Phase 2: Commit. Once all operators confirm their checkpoint is complete, the engine signals the commit phase. The sink operator submits the file metadata accumulated during phase 1 as a new Iceberg snapshot commit to the catalog. The catalog atomically advances the snapshot pointer. Readers can now see the new data.
The separation between these phases is critical. Phase 1 can be repeated safely because data files that no snapshot references stay invisible to readers, so a failed attempt leaves behind only orphan files. Phase 2 must take effect exactly once per checkpoint, which is enforced by the snapshot ID.
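To make the two phases concrete, here is a toy sink that stages files on pre-commit and publishes them on commit. It is a sketch under simplifying assumptions: `TwoPhaseSink` and its fields are hypothetical, the "object store" is a dictionary, and a single process plays both the operator and the catalog.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class TwoPhaseSink:
    """Toy sink: phase 1 stages files per checkpoint, phase 2 publishes them."""

    object_store: Dict[str, List[dict]] = field(default_factory=dict)  # path -> rows
    committed_files: List[str] = field(default_factory=list)  # visible to readers
    pending: Dict[int, List[str]] = field(default_factory=dict)  # checkpoint state
    buffer: List[dict] = field(default_factory=list)

    def write(self, record: dict) -> None:
        self.buffer.append(record)

    def pre_commit(self, checkpoint_id: int) -> None:
        """Phase 1: flush buffered rows to a new file and remember its path."""
        staged = self.pending.setdefault(checkpoint_id, [])
        if not self.buffer:
            return
        path = f"data/chk-{checkpoint_id}-{len(self.object_store)}.parquet"
        self.object_store[path] = self.buffer
        self.buffer = []
        staged.append(path)

    def commit(self, checkpoint_id: int) -> None:
        """Phase 2: publish the staged files of one checkpoint in a single step."""
        files = self.pending.pop(checkpoint_id, [])
        self.committed_files.extend(files)

    def visible_rows(self) -> List[dict]:
        return [row for path in self.committed_files for row in self.object_store[path]]


sink = TwoPhaseSink()
sink.write({"event_id": 1})
sink.write({"event_id": 2})
sink.pre_commit(checkpoint_id=7)
print(len(sink.visible_rows()))  # rows are staged but not yet visible
sink.commit(checkpoint_id=7)
print(len(sink.visible_rows()))  # rows are visible after the commit
```

In this sketch a repeated `commit(7)` is a no-op because the pending entry was removed on the first call. A real engine cannot rely on in-memory state surviving a crash, so it needs a durable token to recognize a commit that already happened.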
Snapshot ID as Idempotency Token
The snapshot ID is the mechanism that prevents duplicate commits on restart.
When a job restarts after a failure, it restores its state from the last successful checkpoint. That state includes the file metadata from phase 1. Before the engine retries the phase 2 commit, it checks the Iceberg catalog: does a snapshot with the expected ID already exist?
If the snapshot exists, the commit already succeeded before the failure. The engine skips the retry. If the snapshot does not exist, the commit is attempted fresh. This check-then-commit pattern makes the commit operation idempotent.
For this to work, the streaming engine must persist the intended snapshot ID as part of its checkpoint state during phase 1. This is a non-trivial requirement: it means the engine must generate the snapshot ID before the commit attempt, store it durably, and use it as the idempotency key on retry.
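A minimal sketch of the check-then-commit pattern follows. The names `ToySnapshotCatalog`, `prepare`, and `commit_idempotently` are hypothetical, and the ID is a random hex string chosen for illustration. Actual engines may key the check on a different durable token, for example a checkpoint ID recorded in snapshot metadata.

```python
import uuid
from typing import Dict, List


class ToySnapshotCatalog:
    """Hypothetical catalog mapping snapshot IDs to the files each one added."""

    def __init__(self) -> None:
        self.snapshots: Dict[str, List[str]] = {}

    def has_snapshot(self, snapshot_id: str) -> bool:
        return snapshot_id in self.snapshots

    def commit(self, snapshot_id: str, files: List[str]) -> None:
        self.snapshots[snapshot_id] = list(files)


def prepare(files: List[str]) -> dict:
    """Phase 1: choose the intended snapshot ID before any commit attempt.

    The returned dict stands in for the state saved in the checkpoint.
    """
    return {"snapshot_id": uuid.uuid4().hex, "files": list(files)}


def commit_idempotently(catalog: ToySnapshotCatalog, state: dict) -> str:
    """Phase 2: commit only if the catalog has not already seen this ID."""
    if catalog.has_snapshot(state["snapshot_id"]):
        return "skipped"
    catalog.commit(state["snapshot_id"], state["files"])
    return "committed"


catalog = ToySnapshotCatalog()
checkpoint_state = prepare(["data/chk-7-0.parquet"])

print(commit_idempotently(catalog, checkpoint_state))  # first attempt commits

# Simulated restart: the same state is restored from the checkpoint.
restored_state = dict(checkpoint_state)
print(commit_idempotently(catalog, restored_state))    # retry is skipped
print(len(catalog.snapshots))                          # still a single snapshot
```

The design choice that matters is the ordering: the ID is generated and persisted in phase 1, so the retry after a crash asks about the same ID rather than minting a new one.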
What Can Go Wrong
Partial writes without commit. If a job crashes after writing data files but before completing the phase 2 catalog commit, orphan files accumulate in object storage. Iceberg's garbage collection process eventually cleans these up, but they are not visible to readers and do not cause duplicates. This is the safe failure mode.
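Orphan detection is essentially a set difference between the files present in storage and the files that some snapshot references. The sketch below assumes both listings are already available in memory. `find_orphans` is a hypothetical helper, and the age cutoff is there so that files staged by an in-flight checkpoint are not mistaken for orphans.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict, Iterable, List


def find_orphans(
    listed: Dict[str, datetime],   # path -> last-modified time from the object store
    referenced: Iterable[str],     # paths reachable from any retained snapshot
    older_than: datetime,          # only files older than this are candidates
) -> List[str]:
    """Return unreferenced files that are old enough to be safely deleted."""
    live = set(referenced)
    return sorted(
        path
        for path, modified in listed.items()
        if path not in live and modified < older_than
    )


now = datetime(2024, 1, 10, tzinfo=timezone.utc)
listed = {
    "data/committed.parquet": now - timedelta(days=5),
    "data/crashed-attempt.parquet": now - timedelta(days=5),
    "data/in-flight.parquet": now - timedelta(hours=1),
}
referenced = ["data/committed.parquet"]

print(find_orphans(listed, referenced, older_than=now - timedelta(days=3)))
```

Only the old, unreferenced file is reported. The recent unreferenced file is left alone because it may belong to a checkpoint whose commit has not happened yet.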
Duplicate commits on restart. If the engine loses its checkpoint state (or does not implement 2PC correctly), it may reprocess source records from a prior offset and commit a second snapshot containing the same records. This is the dangerous failure mode. Each commit is a valid snapshot, so Iceberg itself cannot detect the duplicate rows, and fixing them requires expensive downstream deduplication.
Coordinator failures in distributed commits. When a sink is parallelized across many operator subtasks, each subtask writes its own data files. A coordinator process must aggregate the file metadata from all subtasks before submitting a single Iceberg snapshot commit. If the coordinator fails after some subtasks have confirmed but before the catalog commit, the aggregated metadata must be recovered from checkpoint state. Implementations that do not persist aggregated metadata durably will silently lose records or duplicate them.
Catalog connectivity issues. The Iceberg catalog (Hive Metastore, REST catalog, AWS Glue) is a distributed system with its own failure modes. A transient catalog write failure during the commit phase may leave the snapshot in an ambiguous state. The streaming engine must distinguish between "commit definitely failed" (safe to retry) and "commit may have succeeded" (must check before retrying).
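The distinction between the two failure classes can be sketched as two exception types and a retry loop that treats them differently. Everything here is hypothetical scaffolding: `FlakyCatalog` follows a scripted list of outcomes, and the exception names merely echo the kind of "commit failed" versus "commit state unknown" split that catalog clients expose.

```python
from typing import Dict, List


class CommitFailed(Exception):
    """The catalog definitely rejected the commit; nothing was applied."""


class CommitStateUnknown(Exception):
    """The request was sent but the response was lost; the outcome is unknown."""


class FlakyCatalog:
    """Hypothetical catalog whose commit calls follow a scripted list of outcomes."""

    def __init__(self, outcomes: List[str]) -> None:
        self.snapshots: Dict[str, List[str]] = {}
        self._outcomes = list(outcomes)

    def has_snapshot(self, snapshot_id: str) -> bool:
        return snapshot_id in self.snapshots

    def commit(self, snapshot_id: str, files: List[str]) -> None:
        outcome = self._outcomes.pop(0) if self._outcomes else "ok"
        if outcome == "fail":
            raise CommitFailed(snapshot_id)
        if outcome == "lost_before_apply":
            raise CommitStateUnknown(snapshot_id)
        self.snapshots[snapshot_id] = list(files)
        if outcome == "lost_after_apply":
            raise CommitStateUnknown(snapshot_id)


def commit_with_recovery(
    catalog: FlakyCatalog, snapshot_id: str, files: List[str], max_attempts: int = 5
) -> int:
    """Commit once; return the number of commit calls that were needed."""
    for attempt in range(1, max_attempts + 1):
        try:
            catalog.commit(snapshot_id, files)
            return attempt
        except CommitFailed:
            continue  # definitely not applied, so a blind retry is safe
        except CommitStateUnknown:
            if catalog.has_snapshot(snapshot_id):
                return attempt  # the commit landed; retrying would duplicate it
            # the commit did not land; fall through and retry
    raise RuntimeError(f"commit {snapshot_id} not confirmed after {max_attempts} attempts")


catalog = FlakyCatalog(["fail", "lost_after_apply"])
print(commit_with_recovery(catalog, "snap-1", ["data/a.parquet"]))
print(len(catalog.snapshots))
```

The sketch assumes the catalog is reachable again by the time `has_snapshot` is called. If it is not, the safe choice is to keep the pending commit in checkpoint state and check again later rather than retry blindly.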
How Flink Implements Exactly-Once Iceberg Ingestion
Apache Flink implements Iceberg exactly-once through its TwoPhaseCommittingSink interface. The IcebergSink in the iceberg-flink connector uses this interface to coordinate 2PC with Flink's checkpoint mechanism.
The Flink implementation works as follows:
- Each IcebergStreamWriter subtask buffers records and flushes them to Parquet files on checkpoint trigger (phase 1).
- The IcebergFilesCommitter operator, which runs as a single-parallelism coordinator, collects file metadata from all writer subtasks.
- On checkpoint completion signal, the committer submits the aggregated file list as a new Iceberg snapshot.
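The writer and committer split can be modeled in a few lines. This is a toy illustration, not the connector's code: `ToyCommitter` is a hypothetical class, writers are represented only by the file lists they report, and the "table" is a list of snapshots.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class ToyCommitter:
    """Single-parallelism coordinator: one table commit per completed checkpoint."""

    num_writers: int
    # checkpoint_id -> writer_id -> files staged by that writer
    staged: Dict[int, Dict[int, List[str]]] = field(default_factory=dict)
    table_snapshots: List[List[str]] = field(default_factory=list)

    def report(self, checkpoint_id: int, writer_id: int, files: List[str]) -> None:
        """Called once per writer subtask after it finishes phase 1."""
        self.staged.setdefault(checkpoint_id, {})[writer_id] = list(files)

    def on_checkpoint_complete(self, checkpoint_id: int) -> bool:
        """Commit only when every writer has reported for this checkpoint."""
        per_writer = self.staged.get(checkpoint_id, {})
        if len(per_writer) < self.num_writers:
            return False  # a slow writer delays the whole commit
        merged = [f for writer_id in sorted(per_writer) for f in per_writer[writer_id]]
        self.table_snapshots.append(merged)
        del self.staged[checkpoint_id]
        return True


committer = ToyCommitter(num_writers=3)
committer.report(checkpoint_id=1, writer_id=0, files=["w0-a.parquet"])
committer.report(checkpoint_id=1, writer_id=1, files=["w1-a.parquet", "w1-b.parquet"])
print(committer.on_checkpoint_complete(1))  # one writer has not reported yet

committer.report(checkpoint_id=1, writer_id=2, files=[])
print(committer.on_checkpoint_complete(1))  # all writers reported, one snapshot created
print(committer.table_snapshots)
```

Note that a writer with nothing to flush still reports an empty list. Without that, the committer could not tell an idle writer from a slow one.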
This approach is functionally correct when configured properly, but it introduces operational complexity that teams frequently underestimate.
Checkpointing alignment. Flink's exactly-once mode requires checkpoint barriers to propagate through the entire operator graph before a checkpoint is considered complete. In high-throughput pipelines with skewed partitions, barrier alignment can cause latency spikes or checkpoint timeouts. Repeated checkpoint failures do not downgrade the guarantee, but they stall it: nothing is committed to Iceberg until a checkpoint completes, and once the number of consecutive failures exceeds the limit set with setTolerableCheckpointFailureNumber, the job fails and restarts from the last completed checkpoint.
Subtask coordination overhead. The IcebergFilesCommitter runs at parallelism 1, which creates a coordination bottleneck as the number of writer subtasks increases. All writer subtasks must complete their pre-commit phase before the committer can proceed. A single slow subtask delays the entire commit.
ZooKeeper or filesystem dependency for checkpoint storage. Flink checkpoints must be persisted to a highly available backend (typically S3 or HDFS). Configuring and maintaining this storage, along with checkpoint retention policies, is additional operational work.
Configuration surface. A working Flink and Iceberg setup involves execution.checkpointing.mode, catalog-type (Hadoop catalog does not support concurrent commits safely and must not be used for exactly-once), and write properties such as write.target-file-size-bytes and write.distribution-mode. The first two determine the delivery guarantee, and getting either wrong can silently degrade the pipeline to at-least-once. The write properties shape file sizes and data layout, which affects how long each commit takes.
Here is a minimal Flink SQL configuration for Iceberg exactly-once:
-- Flink SQL: configure checkpointing and Iceberg sink for exactly-once
SET 'execution.checkpointing.mode' = 'EXACTLY_ONCE';
SET 'execution.checkpointing.interval' = '60s';
SET 'execution.checkpointing.min-pause' = '10s';

CREATE CATALOG iceberg_catalog WITH (
  'type' = 'iceberg',
  'catalog-type' = 'rest', -- REST catalog required; Hadoop catalog not safe
  'uri' = 'http://catalog:8181',
  'warehouse' = 's3://my-bucket/warehouse'
);

CREATE TABLE iceberg_catalog.db.events (
  event_id BIGINT,
  user_id BIGINT,
  event_type STRING,
  ts TIMESTAMP(3)
) WITH (
  'write.upsert.enabled' = 'false',
  'write.distribution-mode' = 'hash'
);

-- Insert from Kafka source
INSERT INTO iceberg_catalog.db.events
SELECT event_id, user_id, event_type, ts
FROM kafka_source;
The key constraint: catalog-type must be rest (or hive). The Hadoop catalog relies on atomic file rename to serialize commits, a guarantee that object stores such as S3 do not provide, so it is not suitable for production exactly-once workloads.
How RisingWave Handles Exactly-Once Natively
RisingWave is a PostgreSQL-compatible streaming database written in Rust with disaggregated storage on S3. Its Iceberg sink implements exactly-once through a fundamentally different architecture: commit correctness is a property of the database engine's internal state machine, not a configuration responsibility delegated to the user.
RisingWave's streaming operators run inside a transactional checkpoint system that persists operator state, input offsets, and output commit metadata together in a single atomic checkpoint. When the checkpoint completes, all three are durably consistent: the engine knows exactly what it has read, what it has computed, and what it has committed to Iceberg.
The Iceberg sink in RisingWave is a first-class database feature, not a connector plugin. Its 2PC logic is implemented in the engine's recovery path, which means:
- On restart, the engine restores its state from the last checkpoint, including the pending Iceberg commit metadata.
- The recovery process checks the Iceberg catalog for the snapshot ID that was pending at the time of failure.
- If the snapshot exists, recovery skips the commit. If not, it retries deterministically with the same file metadata.
There is no subtask coordination layer to configure. There is no separate committer operator. The parallelism of the sink is managed by the engine's scheduler, and commit aggregation happens inside the engine before any catalog interaction.
Capability Comparison: Flink vs RisingWave
| Dimension | Apache Flink + IcebergSink | RisingWave |
| --- | --- | --- |
| Exactly-once guarantee | Yes, with correct configuration | Yes, by default |
| Commit coordination | User-configured TwoPhaseCommittingSink | Built into engine recovery path |
| Catalog requirement | REST or Hive (Hadoop catalog unsafe) | REST catalog recommended |
| Checkpoint backend required | Yes (S3/HDFS for HA) | Yes (S3 for state backend) |
| Parallelism of committer | 1 (bottleneck at scale) | Distributed, managed by scheduler |
| Configuration surface | High (checkpointing + catalog + distribution mode) | Low (CREATE SINK statement) |
| Query interface | Flink SQL or DataStream API | PostgreSQL-compatible SQL |
| Open source license | Apache 2.0 | Apache 2.0 |
| Runtime language | JVM (Scala/Java) | Rust |
| Storage architecture | Stateful operators on JVM heap | Disaggregated (S3 + remote state store) |
Production Checklist
Before running an Iceberg exactly-once pipeline in production, verify the following:
Use a catalog that supports concurrent writes. REST catalog (Iceberg REST specification) or Hive Metastore. Hadoop catalog depends on atomic file rename, which object stores such as S3 do not guarantee, and is not safe for multi-writer or recovery scenarios.
Enable WAL or equivalent durability. For RisingWave, ensure the checkpoint storage (typically S3) is correctly configured and that the state_store backend is not in-memory. In-memory state does not survive restarts.
Set commit_checkpoint_interval appropriately. In RisingWave, this controls how often the engine commits accumulated data to Iceberg. Lower values reduce data latency but increase catalog write frequency. The value counts checkpoints rather than seconds, so with a one-second checkpoint interval a value of 60 commits roughly once a minute, which is a reasonable starting point for most pipelines.
Monitor snapshot commit latency. A slowdown in catalog writes (e.g., REST catalog under load) will back-pressure the commit phase and potentially cause checkpoint timeouts. Alert on catalog write P99 latency.
Configure orphan file cleanup. Partial writes that never received a catalog commit leave orphan data files. Both Flink and RisingWave can produce these on failure. Iceberg's expireSnapshots and deleteOrphanFiles procedures should run on a schedule (daily or weekly) to reclaim storage.
Validate idempotency under restart. Before production cutover, deliberately kill and restart your pipeline mid-checkpoint and verify that the row count in the Iceberg table matches the source exactly. Do not rely on theoretical guarantees without empirical validation.
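For the restart validation, a row count alone can hide a missing record that is offset by a duplicate, so it helps to compare keys as well. The helper below is a sketch that assumes each source record carries a unique key (such as the event_id column used in the examples in this article) and that both key sets fit in memory. `verify_exactly_once` and `is_exactly_once` are hypothetical names, and for large tables the same comparison would normally be done in SQL.

```python
from collections import Counter
from typing import Dict, Iterable, List


def verify_exactly_once(
    source_ids: Iterable[int], sink_ids: Iterable[int]
) -> Dict[str, List[int]]:
    """Compare unique source keys against the keys found in the sink table."""
    expected = set(source_ids)
    sink_counts = Counter(sink_ids)
    return {
        "missing": sorted(expected - set(sink_counts)),
        "duplicated": sorted(k for k, n in sink_counts.items() if n > 1),
        "unexpected": sorted(set(sink_counts) - expected),
    }


def is_exactly_once(report: Dict[str, List[int]]) -> bool:
    """True only when nothing is missing, duplicated, or unexpected."""
    return not any(report.values())


# Example: record 2 was dropped and record 1 was written twice,
# yet the row counts (3 and 3) still match.
report = verify_exactly_once(source_ids=[1, 2, 3], sink_ids=[1, 1, 3])
print(report)
print(is_exactly_once(report))
```

Running this after each forced kill-and-restart cycle gives a concrete pass or fail signal instead of a matching row count that may be coincidental.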
RisingWave Iceberg Sink: Minimal Setup
Creating an Iceberg sink in RisingWave requires a single DDL statement:
CREATE SINK iceberg_events_sink
FROM events_source
WITH (
  connector = 'iceberg',
  type = 'append-only',
  catalog.type = 'rest',
  catalog.uri = 'http://catalog:8181',
  warehouse.path = 's3://my-bucket/warehouse',
  database.name = 'db',
  table.name = 'events',
  commit_checkpoint_interval = 60
);
RisingWave handles the 2PC coordination, snapshot ID management, and recovery logic internally. There is no separate committer process to deploy or monitor.
FAQ
Does Iceberg's snapshot model alone guarantee exactly-once?
No. The snapshot model provides atomic commits and idempotency primitives, but the streaming engine must implement 2PC and persist commit state across restarts. If the engine does not save the pending snapshot ID in its checkpoint, restarts will re-read source data and produce duplicate snapshots.
Can I achieve exactly-once with the Hadoop catalog?
No. The Hadoop catalog serializes commits by atomically renaming a metadata file, which is only safe on file systems that guarantee atomic rename. Object stores such as S3 do not, so concurrent or retried commits can overwrite each other, and the catalog cannot provide the atomic commit semantics required for exactly-once in multi-subtask pipelines. Use a REST catalog or Hive Metastore.
What is the difference between idempotent writes and exactly-once?
Idempotent writes mean that retrying the same write operation produces the same result. Exactly-once means each logical record from the source appears in the sink precisely once. Idempotency at the file level (writing the same bytes twice) does not prevent duplicate records if the engine reprocesses source offsets after a restart without tracking what was already committed.
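The difference shows up clearly in a toy replay. In the sketch below, `write_file` is idempotent at the file level, while `ingest` duplicates rows unless the caller supplies the offset that was already committed. Both functions and the list-based "table" are hypothetical simplifications.

```python
from typing import Dict, List


def write_file(store: Dict[str, List[str]], path: str, rows: List[str]) -> None:
    """Idempotent: writing the same path and rows again leaves the store unchanged."""
    store[path] = list(rows)


def ingest(source: List[str], table: List[str], start_offset: int) -> int:
    """Append everything from start_offset onward; return the next offset to read."""
    table.extend(source[start_offset:])
    return len(source)


# File-level idempotency: the retry does not change anything.
store: Dict[str, List[str]] = {}
write_file(store, "data/f1.parquet", ["r1", "r2"])
write_file(store, "data/f1.parquet", ["r1", "r2"])
print(len(store))

source = ["r1", "r2", "r3"]

# Restart without a saved offset: the source is replayed and rows are duplicated.
untracked: List[str] = []
ingest(source, untracked, start_offset=0)
ingest(source, untracked, start_offset=0)
print(len(untracked))

# Restart with the committed offset restored from checkpoint state.
tracked: List[str] = []
committed_offset = ingest(source, tracked, start_offset=0)
source.append("r4")
ingest(source, tracked, start_offset=committed_offset)
print(tracked)
```

The file write was idempotent in both runs. Only the run that carried the committed offset across the restart delivered each record once.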
How does commit_checkpoint_interval affect exactly-once correctness?
It does not affect correctness, only latency. Setting a higher interval means data accumulates in the engine's internal buffer longer before being committed to Iceberg. The exactly-once guarantee holds regardless of this value because every commit, however infrequent, still happens atomically at a checkpoint boundary.
Does RisingWave support upsert (MERGE) mode for Iceberg exactly-once?
Yes. RisingWave supports upsert writes to Iceberg using primary key columns defined on the sink. The engine computes the change stream (inserts, updates, deletes) from the streaming computation and writes position-delete files plus new data files atomically. The 2PC guarantee applies to both append-only and upsert modes.

