TL;DR
Traditional Iceberg ingestion pipelines require a streaming engine (Flink), a compaction job, and a scheduler - three moving parts for one table. RisingWave replaces all of them: one CREATE SINK statement continuously writes to Apache Iceberg with exactly-once semantics, second-level freshness, and no compaction overhead baked in by default.
The Batch Window Problem
Apache Iceberg is an excellent table format. It delivers ACID transactions, schema evolution, partition pruning, and time travel on top of object storage. But Iceberg itself does not stream data - it stores data. The ingestion layer is your problem.
The classic approach is to run an Apache Flink job that reads from Kafka, applies transformations, and writes to Iceberg via the Flink Iceberg sink. This works, but it introduces a structural tension that every team eventually hits.
The small files trap
Flink writes to Iceberg at checkpoint boundaries. A 1-minute checkpoint interval means 1,440 commits per day. Each commit produces a new set of data files. After a week of continuous operation, a single table can accumulate tens of thousands of files - each a few megabytes at best.
This is the small files problem. As documented in the Apache Iceberg GitHub issue #7568, users running Flink DataStream with CDC data from Kafka reported files "in KB units" - far too small to be read efficiently. Small files harm performance at every stage: the query engine must read more file footers, issue more GET requests against object storage, and do more work at planning time just to figure out what to scan.
The solution the community converged on is compaction: run a periodic RewriteDataFiles action to merge small files into larger ones. Flink's Iceberg sink v1.7 added built-in small-file compaction support. This helps - but it means you now run a background compaction service alongside your ingestion pipeline. Two systems to operate, two things to fail.
The latency-freshness trade-off
Flink's checkpoint interval is the knob that controls latency. Shorter checkpoints mean fresher data but more small files. Longer checkpoints mean fewer small files but a larger batch window - the gap between when an event happens and when it appears in the Iceberg table.
For dashboards that refresh every few minutes, a 5-minute batch window is acceptable. For fraud detection, compliance feeds, or downstream pipelines that trigger on new data, it is not. The fundamental trade-off does not go away by tuning a number.
Operational complexity
A production Flink-to-Iceberg pipeline typically requires:
- A Flink cluster (JobManager, TaskManagers, state backend)
- A Kafka source and Iceberg connector JAR
- A compaction job running on a schedule (Spark, Trino, or Flink itself)
- A snapshot expiration job to control metadata growth
- Monitoring for each of the above
That is a substantial infrastructure footprint for what is conceptually a write operation.
How RisingWave Writes to Iceberg
RisingWave is a streaming SQL database built for exactly this class of problem. It reads from event sources (Kafka, database CDC, object storage), maintains continuous queries as materialized views, and pushes results to downstream systems - including Apache Iceberg - as results change.
The Iceberg sink in RisingWave writes data at checkpoint boundaries, similar to Flink. What is different is what happens at each checkpoint:
Buffering: RisingWave accumulates output rows between checkpoints in memory, not in individual files. This allows it to write fewer, larger files per commit even at short checkpoint intervals.
Atomic snapshot commit: At each checkpoint, RisingWave writes Parquet files to object storage and commits an Iceberg snapshot atomically. Downstream readers see a consistent snapshot, never a partial write.
Exactly-once semantics: RisingWave uses its distributed checkpoint mechanism to guarantee that each row is written to Iceberg exactly once, even after a node failure and recovery.
No external compaction required: Because RisingWave buffers before writing and respects configurable thresholds (
commit_checkpoint_size_threshold_mb), it avoids the file proliferation that makes Flink→Iceberg pipelines require compaction.Upsert support: For CDC workloads, RisingWave supports upsert writes using Iceberg v2 equality deletes or merge-on-read semantics, controlled by the
write_modeparameter.
Data flow
flowchart LR
A[Kafka / PostgreSQL CDC / S3] --> B[RisingWave Source]
B --> C[Streaming SQL Transformations\nMaterialized Views]
C --> D[RisingWave Iceberg Sink]
D --> E[Apache Iceberg Table\nObject Storage]
E --> F[Trino / Spark / Snowflake\nQuery Engines]
RisingWave sits as a single stateful layer between your event sources and the Iceberg table. There is no separate transformation cluster, no compaction scheduler, and no batch job to coordinate.
Step-by-Step: Setting Up Streaming Iceberg Ingestion
The following examples show a complete end-to-end pipeline. All CREATE TABLE and CREATE MATERIALIZED VIEW statements were verified against RisingWave 2.8.0 running on localhost:4566. The CREATE SINK DDL was verified for syntax correctness (the engine returns a catalog connection error, not a syntax error, confirming the parameter names are accepted).
Step 1: Create the source
Connect RisingWave to a Kafka topic carrying order events:
CREATE SOURCE orders_source (
order_id VARCHAR,
user_id INT,
product_id VARCHAR,
amount DOUBLE PRECISION,
region VARCHAR,
created_at TIMESTAMP
) WITH (
connector = 'kafka',
topic = 'orders',
properties.bootstrap.server = 'broker:9092',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;
For PostgreSQL CDC, replace the connector block with connector = 'postgres-cdc' and the appropriate hostname, port, and slot parameters.
Step 2: Define a transformation (optional)
Create a materialized view that aggregates orders by region per minute. RisingWave maintains this view incrementally as new events arrive:
CREATE MATERIALIZED VIEW regional_sales AS
SELECT
region,
DATE_TRUNC('minute', created_at) AS window_start,
COUNT(*) AS order_count,
SUM(amount) AS total_revenue
FROM orders_source
GROUP BY region, DATE_TRUNC('minute', created_at);
This was verified on a live instance with actual data. The view updates within seconds of new rows being inserted.
Step 3: Create the Iceberg sink
Append-only sink with REST catalog
Use this pattern when writing new records that never change - audit logs, event streams, append-only fact tables:
CREATE SINK regional_sales_iceberg_sink
FROM regional_sales
WITH (
connector = 'iceberg',
type = 'append-only',
force_append_only = 'true',
catalog.type = 'rest',
catalog.name = 'demo',
catalog.uri = 'http://iceberg-catalog:8181',
warehouse.path = 's3://my-lakehouse/warehouse',
s3.region = 'us-east-1',
database.name = 'analytics',
table.name = 'regional_sales',
commit_checkpoint_interval = '60'
);
Upsert sink for CDC workloads
Use this when the source can produce updates and deletes - database CDC streams, enriched dimension tables:
CREATE SINK orders_iceberg_sink AS
SELECT * FROM orders_source
WITH (
connector = 'iceberg',
type = 'upsert',
primary_key = 'order_id',
catalog.type = 'rest',
catalog.uri = 'http://iceberg-catalog:8181',
warehouse.path = 's3://my-lakehouse/warehouse',
s3.region = 'us-east-1',
database.name = 'analytics',
table.name = 'orders'
);
The primary_key parameter tells RisingWave which column to use for equality delete operations in Iceberg v2. RisingWave defaults to merge-on-read for upsert writes, which is efficient for write-heavy workloads. To switch to copy-on-write (better for read-heavy tables with infrequent updates), add write_mode = 'copy-on-write'.
Hive Metastore catalog variant
For environments using Hive Metastore:
CREATE SINK orders_hive_sink
FROM orders_source
WITH (
connector = 'iceberg',
type = 'append-only',
force_append_only = 'true',
catalog.type = 'hive',
catalog.name = 'demo',
catalog.uri = 'thrift://metastore:9083',
warehouse.path = 's3://my-lakehouse/warehouse',
s3.endpoint = 'http://minio:9000',
s3.access.key = 'minioadmin',
s3.secret.key = 'minioadmin',
s3.region = 'us-east-1',
database.name = 'analytics',
table.name = 'orders'
);
Key sink parameters reference
| Parameter | Required | Default | Notes |
|---|---|---|---|
connector | Yes | - | Must be 'iceberg' |
type | Yes | - | 'append-only' or 'upsert' |
catalog.type | Yes | - | 'rest', 'hive', 'storage', or 'glue' |
catalog.uri | Yes (rest/hive) | - | Catalog endpoint |
warehouse.path | Yes | - | S3/GCS/HDFS path |
database.name | Yes | - | Target database |
table.name | Yes | - | Target table |
primary_key | Yes (upsert) | - | Column(s) for equality deletes |
write_mode | No | merge-on-read | 'copy-on-write' for read-heavy tables |
commit_checkpoint_interval | No | 60 | Number of checkpoints between commits (default RisingWave checkpoint interval is ~1 s, so 60 ≈ 60 s) |
commit_checkpoint_size_threshold_mb | No | 128 | Trigger commit at this buffer size |
create_table_if_not_exists | No | false | Auto-create Iceberg table |
auto.schema.change | No | false | Propagate ADD COLUMN to Iceberg |
For the full parameter reference, see the RisingWave Iceberg sink documentation.
Comparing Approaches: RisingWave vs Flink Iceberg Sink
This is an honest comparison. Flink is a mature, battle-tested system with a large ecosystem. The question is not whether Flink works - it does. The question is what the trade-offs look like in practice.
Setup complexity
Flink: Deploy a Flink cluster, configure checkpointing, add the Flink Iceberg connector JAR, configure the catalog, write a DataStream or Table API job, deploy it, and configure a compaction schedule separately.
RisingWave: Run CREATE SOURCE and CREATE SINK. The catalog configuration is in the WITH clause. There is no separate deployment step for the sink logic.
For teams without dedicated Flink expertise, the RisingWave path is significantly shorter. For teams already running Flink for other workloads, adding an Iceberg sink to an existing job is straightforward.
Data freshness and latency
Both Flink and RisingWave commit at checkpoint boundaries. The key difference is what drives the checkpoint interval choice.
With Flink, reducing the checkpoint interval directly increases small file production. A 30-second checkpoint interval writing to a busy table can produce hundreds of files per hour, all needing compaction.
With RisingWave, commit_checkpoint_interval controls how many checkpoints pass before a commit (default: 60 checkpoints, which at RisingWave's ~1 s checkpoint cadence is roughly 60 seconds). The commit_checkpoint_size_threshold_mb parameter can trigger an early commit when buffered data exceeds the threshold, so high-traffic tables commit at a right-sized file boundary rather than waiting for the interval. On low-traffic tables, this avoids tiny commits entirely.
Both systems can deliver second-to-minute freshness in typical deployments.
Small files
Flink's community has worked to address this with the v1.7 built-in compaction feature. It helps, but it adds complexity: compaction runs as part of the Flink job, consuming resources that would otherwise go to data processing.
RisingWave avoids the root cause: by buffering in memory before writing and committing at size thresholds, it does not produce the proliferation of tiny files that triggers the need for compaction in the first place. There is no compaction service to operate.
Operational overhead
| Concern | Flink + Iceberg | RisingWave |
|---|---|---|
| Cluster to manage | Flink (JobManager + TaskManagers) | RisingWave |
| Transformation language | Java/Scala DataStream or Flink SQL | Standard SQL |
| Compaction job | Required (separate or built-in) | Not needed |
| Schema evolution | Manual or limited auto-detect | auto.schema.change = 'true' |
| Upsert/CDC support | Via equality deletes (complex config) | Built-in with type = 'upsert' |
| Monitoring | Flink Web UI + external metrics | RisingWave system tables |
Where Flink has the advantage
Flink has a larger connector ecosystem and a longer production track record in large-scale streaming deployments. If your team has already built pipelines around Flink's connector ecosystem and needs to write to multiple sinks in a single job, the migration cost may outweigh the benefits.
RisingWave is the better choice when: your team prefers SQL over Java/Scala, you want to avoid operating a compaction service, you are already using RisingWave for other streaming tasks, or you want a single system that handles ingestion, transformation, and Iceberg writes.
Key Takeaways
- The batch window problem in Iceberg comes from checkpoint-based ingestion - every commit creates files, and frequent commits create small files that degrade query performance.
- RisingWave uses size-threshold buffering to avoid small file proliferation, removing the need for a separate compaction service.
- A complete streaming Iceberg pipeline requires two SQL statements:
CREATE SOURCEandCREATE SINK. Transformations are expressed asMATERIALIZED VIEW. - RisingWave supports both append-only and upsert writes to Iceberg, with upsert suitable for CDC workloads from PostgreSQL, MySQL, and other transactional databases.
- Schema evolution in Iceberg is handled automatically with
auto.schema.change = 'true'- new columns added upstream propagate to the Iceberg table without manual intervention.
FAQ
Q: Does RisingWave support writing to an existing Iceberg table, or does it need to create the table?
A: Both. By default, the sink connects to an existing Iceberg table. Set create_table_if_not_exists = 'true' to have RisingWave create the table on first run using the schema inferred from the source. This is useful for bootstrapping new pipelines without pre-creating the Iceberg table manually.
Q: What happens if the RisingWave sink fails mid-commit? Will the Iceberg table have partial data?
A: No. RisingWave uses Iceberg's atomic snapshot mechanism - a commit either completes fully or does not appear in the table's snapshot history at all. Combined with RisingWave's exactly-once checkpoint semantics, the Iceberg table will never contain partial writes or duplicate rows from a recovery event.
Q: Can RisingWave write to Iceberg on GCS or HDFS, or only S3?
A: RisingWave supports S3 (and S3-compatible stores like MinIO), GCS (via gcs.credential), and HDFS. The storage backend is determined by the warehouse.path prefix and the corresponding credential parameters. The REST catalog spec is supported by many catalog backends including Hive, JDBC, and Nessie - check your catalog provider's docs for the REST endpoint URL. The same RisingWave SQL pattern applies regardless of which backend manages the Iceberg tables.
Ready to try it? RisingWave Cloud offers a free tier that includes Iceberg sink support. You can connect to an existing catalog in minutes. If you run into questions or want to share what you are building, the RisingWave Slack community is active and responsive.
For more on RisingWave and Apache Iceberg, see:

