RisingWave writes to Apache Iceberg using a two-phase commit protocol: it buffers in-memory row changes, flushes them as Parquet data files to object storage, and then atomically commits a new Iceberg snapshot via the catalog. This guarantees exactly-once delivery even in the presence of worker failures or catalog unavailability.
Why the Write Path Matters
Understanding how your streaming engine writes to a table format isn't academic — it directly affects data freshness, file count, query performance, and failure recovery. A poorly designed write path produces thousands of tiny files that cripple Trino queries. A correct design batches writes, maintains ACID semantics, and recovers cleanly from failures without duplicate data.
RisingWave's Iceberg sink is designed around three principles:
- Exactly-once semantics — no duplicates even after restarts
- Snapshot isolation — readers always see a consistent view
- Efficient file layout — writes are batched to minimize small-file problems
The Commit Protocol
RisingWave uses Apache Iceberg's atomic table commit mechanism. Here is the sequence for each checkpoint interval:
- Buffering: As the stream processor emits new rows (inserts, updates, deletes), RisingWave buffers them in memory.
- Parquet flush: At each checkpoint, buffered rows are encoded as Parquet and written to the
warehouse.pathon S3 (or compatible storage). Files are immutable once written. - Manifest update: RisingWave constructs a new manifest file listing the newly written data files.
- Snapshot commit: RisingWave calls the Iceberg catalog's atomic commit API. The catalog performs an optimistic concurrency check — if the current snapshot ID matches what RisingWave expected, the commit succeeds and a new snapshot is appended to the metadata chain.
- Checkpoint confirmation: Only after the catalog confirms the commit does RisingWave acknowledge the checkpoint. If the process dies before acknowledgment, the entire operation is retried from the last successful checkpoint — producing no duplicates because the Parquet files from the failed attempt are simply orphaned (and cleaned up by Iceberg's
expire_snapshotsmaintenance procedure).
Upsert vs. Append-Only Modes
RisingWave supports two write modes for Iceberg sinks:
| Mode | Iceberg Operation | Use Case |
append-only | Append new data files | Immutable event logs, time-series |
upsert | Position deletes + appends | CDC pipelines, mutable dimensions |
In upsert mode, RisingWave leverages Iceberg's equality delete files (v2 format) or position delete files to mark old row versions as deleted before writing updated versions. This keeps the write path efficient — no full file rewrites — while maintaining correct semantics for readers.
Setting Up the Sink
Here is a complete example of an upsert sink for a CDC-sourced orders table:
-- Step 1: CDC source from PostgreSQL
CREATE SOURCE orders_pg (
order_id BIGINT PRIMARY KEY,
customer_id BIGINT,
status VARCHAR,
total_amount NUMERIC(12, 2),
updated_at TIMESTAMPTZ
)
WITH (
connector = 'postgres-cdc',
hostname = 'postgres.internal',
port = '5432',
username = 'cdc_user',
password = 'cdc_pass',
database.name = 'orders_db',
table.name = 'orders'
)
FORMAT DEBEZIUM ENCODE JSON;
-- Step 2: Materialized view for enrichment
CREATE MATERIALIZED VIEW orders_enriched AS
SELECT
o.order_id,
o.customer_id,
o.status,
o.total_amount,
o.updated_at,
DATE_TRUNC('day', o.updated_at) AS partition_day
FROM orders_pg o;
-- Step 3: Upsert sink to Iceberg
CREATE SINK orders_iceberg_sink AS
SELECT * FROM orders_enriched
WITH (
connector = 'iceberg',
type = 'upsert',
primary_key = 'order_id',
catalog.type = 'rest',
catalog.uri = 'http://iceberg-catalog:8181',
warehouse.path = 's3://my-warehouse/data',
s3.region = 'us-east-1',
database.name = 'operations',
table.name = 'orders'
);
The primary_key parameter tells RisingWave which column to use for equality deletes. When an order's status changes in PostgreSQL, RisingWave emits a delete for the old row version and an insert for the new one — both landing in the same Iceberg snapshot.
File Format Details
RisingWave writes Apache Parquet files with the following characteristics:
- Compression: Snappy by default (configurable)
- Row group size: Tuned to produce row groups of ~128MB for optimal Parquet read performance
- Statistics: Min/max column statistics are written to enable Iceberg's partition pruning and column-level filtering
- Data file naming: UUID-based to prevent collisions across concurrent workers
For the delete path in upsert mode, RisingWave writes equality delete files — small Parquet files containing only the primary key columns of deleted rows. Iceberg readers merge these against the data files at query time.
Catalog Integration
RisingWave supports two catalog types for Iceberg sinks:
-- REST catalog (most flexible, works with Tabular, Nessie, Iceberg REST server)
CREATE SINK rest_catalog_sink AS SELECT * FROM my_mv
WITH (
connector = 'iceberg',
type = 'append-only',
catalog.type = 'rest',
catalog.uri = 'http://iceberg-rest:8181',
warehouse.path = 's3://bucket/warehouse',
s3.region = 'us-east-1',
database.name = 'db',
table.name = 'tbl'
);
-- Storage catalog (catalog metadata lives alongside data files, no external service)
CREATE SINK storage_catalog_sink AS SELECT * FROM my_mv
WITH (
connector = 'iceberg',
type = 'append-only',
catalog.type = 'storage',
warehouse.path = 's3://bucket/warehouse',
s3.region = 'us-east-1',
database.name = 'db',
table.name = 'tbl'
);
The REST catalog is preferred for production because it supports multi-writer concurrency control. The storage catalog is simpler but lacks the optimistic concurrency guarantees needed when multiple writers target the same table.
Exactly-Once Guarantees in Depth
The key to exactly-once is the checkpoint barrier mechanism. RisingWave injects periodic barrier messages into the stream. When a barrier reaches the Iceberg sink operator:
- All buffered data is flushed to Parquet files on S3.
- The sink records the intended Iceberg snapshot ID in a local state store (backed by RisingWave's own object storage).
- The catalog commit is attempted.
- If successful, the checkpoint completes. If the process dies here, the next startup reads the state store, detects the uncommitted snapshot, and retries the catalog commit (idempotent).
- If the commit fails (e.g., conflicting writer), RisingWave retries with exponential backoff.
This protocol means RisingWave never loses committed data and never writes duplicates — a significantly stronger guarantee than "at-least-once" systems that require deduplication at the reader.
FAQ
Q: How large are the Parquet files that RisingWave writes? A: This depends on your checkpoint interval and event throughput. At a 60-second interval with 10,000 events/second, you might produce files in the range of 50–200 MB. Use Iceberg's compaction to merge very small files.
Q: Does RisingWave support writing to Iceberg v2 tables with merge-on-read?
A: Yes. The upsert mode uses Iceberg v2 equality delete files, which implement merge-on-read semantics. Copy-on-write is not currently supported.
Q: Can multiple RisingWave instances write to the same Iceberg table? A: With a REST catalog that supports optimistic concurrency, yes. Each instance will attempt catalog commits independently; conflicts are resolved via retry. The storage catalog does not support multi-writer safely.
Q: How does RisingWave handle S3 eventual consistency?
A: RisingWave uses the s3:HeadObject call to verify that data files are visible before committing the catalog snapshot. This guards against the rare S3 read-after-write inconsistency window.
Q: What happens to orphaned Parquet files from failed commits?
A: They remain on S3 but are not referenced by any Iceberg snapshot. Run Iceberg's remove_orphan_files procedure periodically to reclaim storage.
Get Started
Explore RisingWave's full Iceberg sink documentation and connect with the community:

