To stream CDC data to Apache Iceberg with RisingWave: connect RisingWave's built-in Postgres CDC or MySQL CDC connector to your source database, create a materialized view to shape the data, and define an Iceberg sink with type = 'upsert'. RisingWave captures every INSERT, UPDATE, and DELETE and propagates them to Iceberg in real time — no Debezium, no Kafka, no custom code required.
What Is CDC and Why Sink It to Iceberg?
Change Data Capture (CDC) reads the database write-ahead log (WAL) to capture every row-level change — inserts, updates, and deletes — as they happen. This is the foundation for database replication, event-driven architectures, and keeping a data lake in sync with operational databases.
Traditionally, CDC data was landed in Kafka, then consumed by Flink or Spark to write into storage. This multi-hop approach adds latency and operational complexity.
RisingWave simplifies this by acting as both the CDC consumer and the stream processor. It connects directly to Postgres or MySQL replication slots, processes changes through SQL materialized views, and writes the results to Apache Iceberg — all in one system.
Why Iceberg for CDC Data?
Apache Iceberg is well-suited for CDC workloads for several reasons:
- Upsert support — Iceberg handles row-level updates and deletes natively through its delete file mechanism
- Schema evolution — When source database schemas change, Iceberg tables can evolve without full rewrites
- Time travel — Query the state of your Iceberg table at any point in time, invaluable for auditing and debugging
- Open access — Once in Iceberg, your CDC data is accessible to Athena, Trino, Spark, Snowflake, and more
Setting Up Postgres CDC to Iceberg
Step 1: Configure Postgres for Replication
Before connecting RisingWave, ensure your Postgres instance has logical replication enabled:
-- In postgresql.conf
-- wal_level = logical
-- max_replication_slots = 10
-- max_wal_senders = 10
-- Create a replication slot (optional; RisingWave can create it automatically)
SELECT pg_create_logical_replication_slot('risingwave_slot', 'pgoutput');
-- Grant replication privilege
CREATE USER risingwave_cdc WITH REPLICATION PASSWORD 'cdc_password';
GRANT SELECT ON ALL TABLES IN SCHEMA public TO risingwave_cdc;
Step 2: Create the CDC Source in RisingWave
CREATE SOURCE orders_cdc
WITH (
connector = 'postgres-cdc',
hostname = 'postgres-primary.internal',
port = '5432',
username = 'risingwave_cdc',
password = 'cdc_password',
database.name = 'ecommerce',
schema.name = 'public',
table.name = 'orders',
slot.name = 'risingwave_orders_slot'
);
RisingWave automatically creates the replication slot if it doesn't exist and begins reading from the WAL. It first performs a consistent snapshot of the existing table data, then streams all subsequent changes.
Step 3: Transform the CDC Stream
Materialized views can enrich and filter CDC data before it reaches Iceberg:
CREATE MATERIALIZED VIEW orders_enriched AS
SELECT
o.order_id,
o.customer_id,
o.status,
o.total_amount,
o.created_at,
o.updated_at,
-- Derive some computed columns
EXTRACT(YEAR FROM o.created_at) AS order_year,
EXTRACT(MONTH FROM o.created_at) AS order_month,
CASE
WHEN o.total_amount >= 1000 THEN 'high_value'
WHEN o.total_amount >= 100 THEN 'medium_value'
ELSE 'low_value'
END AS value_tier,
o.shipping_country
FROM orders_cdc o;
Step 4: Sink to Iceberg with Upsert
CREATE SINK orders_cdc_to_iceberg AS
SELECT * FROM orders_enriched
WITH (
connector = 'iceberg',
type = 'upsert',
catalog.type = 'rest',
catalog.uri = 'http://iceberg-catalog:8181',
warehouse.path = 's3://my-lakehouse/warehouse',
s3.region = 'us-east-1',
database.name = 'replication',
table.name = 'orders'
);
The type = 'upsert' setting tells RisingWave to merge incoming changes into the Iceberg table using the primary key. Updates and deletes in Postgres are reflected in Iceberg within seconds.
Setting Up MySQL CDC to Iceberg
The process is similar for MySQL. First, enable binary logging:
-- In my.cnf
-- [mysqld]
-- log_bin = mysql-bin
-- binlog_format = ROW
-- binlog_row_image = FULL
-- server_id = 1
-- Grant permissions
CREATE USER 'risingwave_cdc'@'%' IDENTIFIED BY 'cdc_password';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'risingwave_cdc'@'%';
FLUSH PRIVILEGES;
Then create the source in RisingWave:
CREATE SOURCE inventory_mysql_cdc
WITH (
connector = 'mysql-cdc',
hostname = 'mysql-primary.internal',
port = '3306',
username = 'risingwave_cdc',
password = 'cdc_password',
database.name = 'inventory_db',
table.name = 'products',
server.id = '5401'
);
CREATE MATERIALIZED VIEW products_current AS
SELECT
product_id,
sku,
product_name,
category,
unit_price,
stock_quantity,
is_active,
last_updated
FROM inventory_mysql_cdc;
CREATE SINK products_to_iceberg AS
SELECT * FROM products_current
WITH (
connector = 'iceberg',
type = 'upsert',
catalog.type = 'rest',
catalog.uri = 'http://iceberg-catalog:8181',
warehouse.path = 's3://my-lakehouse/warehouse',
s3.region = 'us-east-1',
database.name = 'replication',
table.name = 'products'
);
Handling Deletes
When a row is deleted in the source database, RisingWave propagates the delete to Iceberg. Iceberg handles this through positional delete files (for MOR) or by rewriting affected data files (for COW). From the query perspective, the row simply disappears.
If you need to preserve deleted rows for auditing, add a soft-delete pattern in your materialized view:
-- Instead of propagating hard deletes, track delete status
CREATE MATERIALIZED VIEW orders_with_deletes AS
SELECT
order_id,
customer_id,
status,
total_amount,
created_at,
updated_at,
-- _op is available in CDC sources: 'I' = insert, 'U' = update, 'D' = delete
CASE WHEN _op = 'D' THEN TRUE ELSE FALSE END AS is_deleted,
NOW() AS replicated_at
FROM orders_cdc;
CDC Architecture Comparison
| Approach | Complexity | Latency | Requires Kafka | Code Required |
| RisingWave direct CDC → Iceberg | Low | Seconds | No | None |
| Debezium + Kafka + Flink → Iceberg | High | Seconds | Yes | Java/SQL |
| Debezium + Kafka + Spark → Iceberg | High | Minutes | Yes | Scala/Python |
| AWS DMS → S3 → Glue → Iceberg | Medium | Minutes | No | Config only |
| Fivetran/Airbyte → Warehouse | Low | Minutes | No | None |
RisingWave direct CDC is the simplest path to Iceberg with low latency. It requires no Kafka cluster, no separate CDC platform, and no custom code.
Multi-Table CDC Pipelines
Real applications have dozens of tables. Here's how to manage a multi-table CDC pipeline efficiently:
-- Define multiple CDC sources
CREATE SOURCE customers_cdc WITH (connector = 'postgres-cdc', hostname = 'postgres', port = '5432', username = 'replicator', password = 'secret', database.name = 'ecommerce', schema.name = 'public', table.name = 'customers');
CREATE SOURCE products_cdc WITH (connector = 'postgres-cdc', hostname = 'postgres', port = '5432', username = 'replicator', password = 'secret', database.name = 'ecommerce', schema.name = 'public', table.name = 'products');
-- Join CDC streams for an enriched view
CREATE MATERIALIZED VIEW orders_fully_enriched AS
SELECT
o.order_id,
o.total_amount,
o.created_at,
c.customer_name,
c.email,
c.loyalty_tier
FROM orders_cdc o
JOIN customers_cdc FOR SYSTEM_TIME AS OF PROCTIME() c
ON o.customer_id = c.customer_id;
Schema Changes in Source Databases
One of the trickier aspects of CDC is handling schema evolution. When a DBA adds a column to a Postgres table:
- Postgres starts including the new column in the WAL
- RisingWave detects the schema change and updates its internal representation
- Your materialized view needs to be updated to include the new column
- The Iceberg table needs to be evolved to accept the new column
This typically requires a manual step: update your RisingWave materialized view definition and run an ALTER TABLE on the Iceberg table before RisingWave tries to write the new schema.
FAQ
Q: Does RisingWave support CDC from Amazon RDS?
Yes. Amazon RDS for PostgreSQL and Aurora PostgreSQL support logical replication. Set wal_level = logical in your RDS parameter group. The connection configuration is the same as for self-hosted Postgres.
Q: Can I CDC from multiple databases into a single Iceberg table? Yes. Create multiple CDC sources, join or union them in a materialized view, and sink to a single Iceberg table. This is useful for consolidating sharded databases.
Q: What happens if the source database restarts? RisingWave maintains the WAL position (LSN for Postgres, binlog position for MySQL). After a database restart, RisingWave reconnects and resumes from where it left off with no data loss.
Q: How does RisingWave handle large transactions? RisingWave buffers large transactions in memory and processes them atomically when the transaction commits. Very large transactions (millions of rows) may require tuning memory settings.
Q: Is the initial snapshot of existing data included? Yes. When you first create a CDC source, RisingWave performs a consistent snapshot of the existing table data. All existing rows are replicated to Iceberg first, then ongoing changes are streamed in real time.
Start Your CDC Pipeline
Streaming CDC data to Apache Iceberg with RisingWave is one of the most powerful patterns for keeping your data lakehouse synchronized with operational databases.
Explore the full setup in the RisingWave documentation and share your CDC pipeline designs with the community on Slack.

