CDC to Iceberg is a common data engineering challenge: you need the latest changes from your operational database replicated to your data lake in minutes, not hours. RisingWave solves this with a single SQL-native pipeline - connecting directly to Postgres or MySQL via built-in CDC, applying any transformations you need, and writing to Apache Iceberg tables on S3. No Debezium, no Kafka, no Flink.
TL;DR
RisingWave turns a four-component CDC stack (Debezium + Kafka + Flink + Iceberg writer) into a two-liner: one CREATE TABLE to attach to your database's WAL, one CREATE SINK to write to Iceberg. You get sub-minute latency, exactly-once delivery, and built-in compaction - without running a JVM cluster.
The Traditional CDC-to-Iceberg Stack (and Why It's Complex)
The "standard" architecture for syncing a database to a data lake looks like this:
- Debezium reads the database write-ahead log (WAL) and emits change events
- Apache Kafka buffers those events for durability and fan-out
- Apache Flink consumes Kafka topics, handles deduplication, and writes Parquet
- Iceberg writer manages table metadata, snapshots, and file commits
This works at large scale, but the operational surface is enormous. A few things that bite teams regularly:
Schema evolution is a cascade. Add a column to your Postgres table and you need to update the Debezium configuration, the Kafka schema registry entry, the Flink job's type mapping, and the Iceberg schema - in the right order. One mismatch causes silent data loss or job failure. Teams at Estuary have documented that Debezium's handling of schema changes is one of the most common production pain points.
The small-file problem is structural. Flink loves small, frequent commits to minimize latency. Iceberg hates thousands of tiny Parquet files because metadata bloat slows query planning. You end up running a separate compaction service to merge files - another component to operate.
Initial snapshot is slow and fragile. Traditional CDC tools do a sequential scan for the initial snapshot. On a 500M-row table, this can run for hours. If the replication slot fills during that time, Postgres will stop WAL cleanup and your disk fills up.
Backpressure crosses system boundaries. If your S3 bucket throttles writes and Flink backs up, that backpressure propagates back through Kafka into Debezium's replication slot. The slot stops advancing. Postgres starts accumulating WAL. Disk usage climbs. This failure mode is nasty because it starts silently.
Debugging latency across three hops is hard. Is the bottleneck Debezium reading the WAL, Kafka consumer lag, or Flink checkpoint intervals? You need three separate observability stacks to answer that question.
How RisingWave Simplifies This
RisingWave is a PostgreSQL-compatible streaming database with native CDC connectors built in. Instead of running four separate systems, you describe the pipeline in SQL and RisingWave handles the rest.
The architecture collapses to:
graph LR
A[(Postgres<br>or MySQL)] -->|WAL / binlog| B[RisingWave]
B -->|Parquet + metadata| C[(Apache Iceberg<br>on S3)]
D[BI Tools / Analytics] -->|SQL| C
What RisingWave handles internally:
- Parallel backfill using lock-free snapshot reads (no long sequential scans)
- Change stream ingestion from the database WAL or MySQL binlog
- In-memory transformations and enrichment via materialized views
- Transactional batch commits to Iceberg with exactly-once semantics
- Background Iceberg file compaction (no separate compaction service needed)
- Automatic schema change propagation when
auto.schema.change = 'true'
The result is one system to deploy, one set of metrics to watch, and one SQL dialect to learn.
Step-by-Step Tutorial
Prerequisites
- RisingWave running and accessible (see the quickstart guide)
- A Postgres 10+ database with logical replication enabled (
wal_level = logical) - An S3 bucket for Iceberg data
- A Postgres user with
REPLICATIONprivilege
To grant replication privilege:
-- Run in your Postgres instance (not RisingWave)
CREATE USER risingwave_cdc WITH REPLICATION LOGIN PASSWORD 'your_password';
GRANT SELECT ON TABLE your_table TO risingwave_cdc;
Step 1: Create the CDC Source Table
In RisingWave, you create a table that mirrors your upstream Postgres table. RisingWave manages the replication slot and publication automatically.
CREATE TABLE orders (
order_id BIGINT PRIMARY KEY,
customer_id BIGINT,
amount DECIMAL,
status VARCHAR,
updated_at TIMESTAMPTZ
) WITH (
connector = 'postgres-cdc',
hostname = 'your-postgres-host',
port = '5432',
username = 'risingwave_cdc',
password = 'your_password',
database.name = 'ecommerce',
schema.name = 'public',
slot.name = 'rw_slot_orders',
publication.name = 'rw_publication',
publication.create.enable = 'true'
);
Key parameters:
slot.name- RisingWave creates and maintains this replication slot in Postgrespublication.name- the logical replication publication; setpublication.create.enable = 'true'and RisingWave will create it if it doesn't existschema.name- defaults topublicif omitted
For MySQL, swap connector = 'mysql-cdc' and replace slot.name/publication.name with server.id (a unique numeric ID for the MySQL binlog reader).
Once this table is created, RisingWave immediately begins an initial snapshot backfill, then switches to streaming mode to consume ongoing changes.
Step 2: Transform with SQL (Optional)
Before writing to Iceberg, you can enrich, filter, or reshape the data using a materialized view. This view is maintained incrementally - RisingWave updates it as new CDC events arrive.
CREATE MATERIALIZED VIEW orders_enriched AS
SELECT
order_id,
customer_id,
amount,
status,
updated_at,
CASE
WHEN amount > 200 THEN 'high_value'
WHEN amount > 100 THEN 'medium_value'
ELSE 'standard'
END AS value_tier
FROM orders;
This query was verified against RisingWave 2.8.0:
order_id | customer_id | amount | status | updated_at | value_tier
----------+-------------+--------+-----------+-------------------------------+--------------
1 | 100 | 99.99 | completed | 2026-05-22 20:11:49.842+00:00 | standard
2 | 101 | 149.50 | pending | 2026-05-22 20:11:49.842+00:00 | medium_value
3 | 102 | 299.00 | completed | 2026-05-22 20:11:49.842+00:00 | high_value
(3 rows)
You can add joins to dimension tables, window aggregations, or any other SQL transformation at this stage. RisingWave propagates changes through the entire pipeline incrementally.
Step 3: Sink to Iceberg
Create a sink from your enriched view to an Iceberg table on S3. Use type = 'upsert' so that updates and deletes from Postgres are correctly reflected in Iceberg (not just appended as new rows).
CREATE SINK orders_to_iceberg
FROM orders_enriched
WITH (
connector = 'iceberg',
type = 'upsert',
primary_key = 'order_id',
catalog.type = 'rest',
catalog.uri = 'http://your-iceberg-catalog:8181',
warehouse.path = 's3://your-bucket/warehouse',
s3.region = 'us-east-1',
s3.access.key = 'YOUR_ACCESS_KEY',
s3.secret.key = 'YOUR_SECRET_KEY',
database.name = 'replication',
table.name = 'orders'
);
If you don't have a REST catalog, you can use catalog.type = 'glue' for AWS Glue, or catalog.type = 'storage' for a file-based catalog directly on S3 (no catalog service required):
-- Storage catalog variant (no REST catalog needed)
CREATE SINK orders_to_iceberg_storage
FROM orders_enriched
WITH (
connector = 'iceberg',
type = 'upsert',
primary_key = 'order_id',
catalog.type = 'storage',
warehouse.path = 's3://your-bucket/warehouse',
s3.region = 'us-east-1',
s3.access.key = 'YOUR_ACCESS_KEY',
s3.secret.key = 'YOUR_SECRET_KEY',
database.name = 'replication',
table.name = 'orders'
);
The optional create_table_if_not_exists = 'true' parameter lets RisingWave create the Iceberg table automatically if it doesn't exist yet.
By default, RisingWave commits to Iceberg every 60 seconds (commit_checkpoint_interval). You can tune this based on your latency requirements.
Step 4: Verify the Data
Once the sink is running, query status through RisingWave's system catalog:
-- Check CDC source lag
SELECT * FROM rw_table_fragments WHERE name = 'orders';
-- Check sink status
SELECT connector_name, sink_name, sink_type, status
FROM rw_sinks
WHERE sink_name = 'orders_to_iceberg';
On the Iceberg side, any query engine that supports Iceberg (Spark, Trino, DuckDB, Athena) can read the table directly from S3. RisingWave writes standard Iceberg v2 format.
Handling Schema Changes
When you run ALTER TABLE in Postgres (e.g., adding a column), RisingWave can propagate that change automatically to the downstream Iceberg table. Enable this when creating the CDC source:
CREATE TABLE orders (
order_id BIGINT PRIMARY KEY,
-- ... columns ...
) WITH (
connector = 'postgres-cdc',
-- ... connection params ...
auto.schema.change = 'true'
);
With auto.schema.change = 'true', RisingWave detects the DDL change from the WAL stream and applies the equivalent schema evolution to the downstream Iceberg table. For more detail on what DDL operations are supported, see the schema change documentation.
Without this parameter, RisingWave will pause the pipeline and raise an error when it encounters an unrecognized DDL change, giving you a chance to handle it manually.
Key Takeaways
- RisingWave replaces Debezium, Kafka, and Flink with a single SQL-native system for CDC-to-Iceberg pipelines.
- The
postgres-cdcandmysql-cdcconnectors handle initial backfill, ongoing change capture, and exactly-once delivery - no external tools required. - Use
type = 'upsert'on the Iceberg sink to correctly reflect updates and deletes from your source database. auto.schema.change = 'true'propagates DDL changes automatically, eliminating manual schema migration steps.- RisingWave handles Iceberg file compaction internally - no separate compaction service or scheduled job needed.
For a deeper look at production considerations (WAL retention, backpressure, and recovery), see Postgres CDC to Iceberg: Lessons from Real-World Data Pipelines.
FAQ
Can I use this with MySQL instead of Postgres?
Yes. Replace connector = 'postgres-cdc' with connector = 'mysql-cdc' and replace slot.name/publication.name with server.id (a unique integer that identifies the RisingWave instance to MySQL's binlog). The Iceberg sink configuration is identical.
What happens if RisingWave restarts mid-pipeline?
RisingWave uses distributed checkpointing. On restart, it recovers from the last checkpoint - re-reading from the correct WAL position or binlog offset - and resumes without data loss or duplicate writes to Iceberg. The Postgres replication slot holds the WAL position open during recovery.
Does this work with existing Iceberg tables (not created by RisingWave)?
Yes, as long as the schema matches. Point the sink at an existing database.name and table.name, and RisingWave will append to or upsert into the existing Iceberg table. Use create_table_if_not_exists = 'false' (the default) to avoid accidentally overwriting your table definition.
RisingWave is open source (Apache 2.0) and runs on your laptop or in Kubernetes. Try the cloud-hosted version or join the Slack community to ask questions.

