RisingWave can stream PostgreSQL changes directly into Apache Iceberg tables in real time using its built-in postgres-cdc connector. By decoding PostgreSQL's write-ahead log (WAL) through logical replication, RisingWave captures every INSERT, UPDATE, and DELETE and writes them to Iceberg as upserts — with exactly-once semantics and no custom Debezium infrastructure required.
Why PostgreSQL CDC to Iceberg?
PostgreSQL is the transactional workhorse of thousands of applications. But OLTP databases are not designed for analytics — complex queries slow down production systems, storage costs climb, and joining across multiple services requires data movement anyway.
The standard solution is to replicate PostgreSQL data to an analytical store. Traditional ETL (nightly batch exports, Airbyte) introduces hours of latency. Apache Kafka + Debezium + Flink reduces latency but adds significant operational complexity: you need Kafka, Schema Registry, Debezium connectors, and a Flink cluster — all to do what RisingWave handles in a single CREATE SOURCE statement.
RisingWave's postgres-cdc connector connects directly to PostgreSQL's logical replication slot, which means:
- No Kafka required for the CDC path
- Sub-second replication lag
- Exactly-once semantics backed by Iceberg's atomic commits
Prerequisites
Before starting, ensure your PostgreSQL instance is configured for logical replication:
-- In postgresql.conf:
-- wal_level = logical
-- max_replication_slots = 4
-- max_wal_senders = 4
-- Create a replication user:
CREATE USER cdc_user REPLICATION LOGIN PASSWORD 'secure_password';
GRANT SELECT ON ALL TABLES IN SCHEMA public TO cdc_user;
Also ensure the pgoutput or wal2json plugin is available (pgoutput ships with PostgreSQL 10+).
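Before creating the RisingWave source, you can sanity-check these settings from psql. This is an illustrative verification snippet; an empty slot list is expected on a fresh instance:

```sql
-- Should return 'logical'; if not, set wal_level = logical and restart PostgreSQL
SHOW wal_level;

-- Confirm slot and sender capacity
SHOW max_replication_slots;
SHOW max_wal_senders;

-- List existing replication slots (empty result is fine before RisingWave connects)
SELECT slot_name, plugin, active FROM pg_replication_slots;
```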
Step 1: Define the CDC Source
Connect RisingWave to PostgreSQL. The connector creates a logical replication slot automatically:
CREATE SOURCE product_catalog_cdc (
product_id BIGINT PRIMARY KEY,
sku VARCHAR,
name VARCHAR,
category VARCHAR,
price NUMERIC(12, 2),
stock_quantity INT,
is_active BOOLEAN,
created_at TIMESTAMPTZ,
updated_at TIMESTAMPTZ
)
WITH (
connector = 'postgres-cdc',
hostname = 'postgres.prod.internal',
port = '5432',
username = 'cdc_user',
password = 'secure_password',
database.name = 'ecommerce',
schema.name = 'public',
table.name = 'products',
slot.name = 'risingwave_products_slot',
publication.name = 'risingwave_publication'
)
FORMAT DEBEZIUM ENCODE JSON;
RisingWave creates the risingwave_products_slot replication slot and the risingwave_publication publication on PostgreSQL automatically. From this moment, every change to the products table flows into RisingWave.
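You can confirm on the PostgreSQL side that the slot and publication were created. This is an optional sanity check, not required for operation:

```sql
-- The slot created by RisingWave; plugin should be pgoutput
SELECT slot_name, plugin, active
FROM pg_replication_slots
WHERE slot_name = 'risingwave_products_slot';

-- The publication and the tables it covers
SELECT pubname, schemaname, tablename
FROM pg_publication_tables
WHERE pubname = 'risingwave_publication';
```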
Step 2: Create an Enrichment Materialized View
Raw CDC streams often need enrichment before landing in the lakehouse. Here we join the product stream with a category metadata table:
-- Static reference data
CREATE TABLE category_metadata (
category VARCHAR PRIMARY KEY,
department VARCHAR,
margin_pct NUMERIC(5, 2),
is_seasonal BOOLEAN
);
-- Enriched product view
CREATE MATERIALIZED VIEW products_enriched AS
SELECT
p.product_id,
p.sku,
p.name,
p.category,
c.department,
c.margin_pct,
p.price,
p.price * (1 - c.margin_pct / 100) AS cost_basis,
p.stock_quantity,
p.is_active,
p.created_at,
p.updated_at
FROM product_catalog_cdc p
LEFT JOIN category_metadata c ON p.category = c.category;
This materialized view is continuously updated. When a product's price changes in PostgreSQL, the view updates immediately, and the downstream Iceberg sink writes the new row within the next checkpoint interval.
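As a quick smoke test, you can seed the reference table and query the view directly in RisingWave. The category values below are illustrative; use whatever categories exist in your products table:

```sql
-- Seed reference data (illustrative values)
INSERT INTO category_metadata VALUES
  ('electronics', 'consumer-tech',  35.00, false),
  ('outdoor',     'sporting-goods', 42.50, true);

-- The view reflects upstream products joined with the metadata
SELECT product_id, sku, department, price, cost_basis
FROM products_enriched
LIMIT 10;
```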
Step 3: Sink to Iceberg with Upsert
Use upsert mode so that updates and deletes in PostgreSQL are correctly reflected in Iceberg:
CREATE SINK products_iceberg_sink AS
SELECT * FROM products_enriched
WITH (
connector = 'iceberg',
type = 'upsert',
primary_key = 'product_id',
catalog.type = 'rest',
catalog.uri = 'http://iceberg-catalog:8181',
warehouse.path = 's3://ecommerce-lakehouse/warehouse',
s3.region = 'us-east-1',
database.name = 'ecommerce',
table.name = 'products'
);
RisingWave uses Iceberg's equality delete files to handle updates: when product_id = 42 changes price, RisingWave writes a delete record for the old row and a new data record for the updated row in the same Iceberg snapshot.
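To observe the upsert path end to end, change a row in PostgreSQL and read the Iceberg table from any engine attached to the same catalog (Trino and Spark are common choices; the table path below assumes the database and table names from the sink definition):

```sql
-- On PostgreSQL: update a product
UPDATE products
SET price = 19.99, updated_at = now()
WHERE product_id = 42;

-- In a query engine attached to the same Iceberg catalog:
-- the row shows the new price once the sink's next checkpoint commits
SELECT product_id, price, updated_at
FROM ecommerce.products
WHERE product_id = 42;
```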
Handling Multiple Tables
Real CDC pipelines replicate dozens of tables. RisingWave handles each as an independent source-view-sink chain:
-- Orders table
CREATE SOURCE orders_cdc (
order_id BIGINT PRIMARY KEY,
customer_id BIGINT,
product_id BIGINT,
quantity INT,
unit_price NUMERIC(12,2),
status VARCHAR,
ordered_at TIMESTAMPTZ,
shipped_at TIMESTAMPTZ
)
WITH (
connector = 'postgres-cdc',
hostname = 'postgres.prod.internal',
port = '5432',
username = 'cdc_user',
password = 'secure_password',
database.name = 'ecommerce',
table.name = 'orders',
slot.name = 'risingwave_orders_slot'
)
FORMAT DEBEZIUM ENCODE JSON;
CREATE MATERIALIZED VIEW orders_with_products AS
SELECT
o.order_id,
o.customer_id,
o.quantity,
o.unit_price,
o.status,
o.ordered_at,
p.sku,
p.category,
p.department
FROM orders_cdc o
JOIN products_enriched p ON o.product_id = p.product_id;
CREATE SINK orders_iceberg_sink AS
SELECT * FROM orders_with_products
WITH (
connector = 'iceberg',
type = 'upsert',
primary_key = 'order_id',
catalog.type = 'rest',
catalog.uri = 'http://iceberg-catalog:8181',
warehouse.path = 's3://ecommerce-lakehouse/warehouse',
s3.region = 'us-east-1',
database.name = 'ecommerce',
table.name = 'orders'
);
Monitoring Replication Lag
List the CDC sources registered in RisingWave with standard SQL:
-- Inspect CDC sources registered in RisingWave
SELECT name, connector, definition
FROM rw_catalog.rw_sources
WHERE lower(connector) = 'postgres-cdc';
On the PostgreSQL side, monitor the replication slot lag:
-- On PostgreSQL: check slot lag
SELECT slot_name, confirmed_flush_lsn, pg_current_wal_lsn(),
pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) AS lag_bytes
FROM pg_replication_slots
WHERE slot_name LIKE 'risingwave_%';
Typical steady-state lag is under 1 second for moderate workloads.
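For dashboards, pg_size_pretty makes the lag figure easier to read, and active = false flags a slot that RisingWave is no longer consuming:

```sql
-- Human-readable slot lag on PostgreSQL
SELECT slot_name,
       active,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)) AS lag
FROM pg_replication_slots
WHERE slot_name LIKE 'risingwave_%';
```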
CDC vs. Batch Comparison
| Dimension | Nightly Batch ETL | RisingWave CDC |
| --- | --- | --- |
| Latency | Hours | Seconds |
| Infrastructure | ETL server + scheduler | RisingWave only |
| Deletes handled | Soft-delete workarounds | Native (upsert mode) |
| Exactly-once | Depends on implementation | Built-in |
| Schema changes | Manual migration scripts | Semi-automatic |
| PostgreSQL load | High (full table scans) | Low (WAL tail only) |
FAQ
Q: Do I need Kafka between PostgreSQL and RisingWave for CDC?
A: No. RisingWave's postgres-cdc connector reads directly from PostgreSQL's logical replication slot. Kafka is optional and only needed if other consumers also need the CDC stream.
Q: What PostgreSQL versions are supported?
A: RisingWave's postgres-cdc connector supports PostgreSQL 10 and later, which introduced native logical replication with the pgoutput plugin.
Q: How does RisingWave handle a PostgreSQL restart or failover?
A: The replication slot persists on the primary PostgreSQL node. After a failover to a replica that has been promoted to primary, you may need to recreate the slot. RisingWave will resume from the latest confirmed LSN.
Q: Can I replicate tables without a primary key?
A: For upsert mode, a primary key is required. For append-only mode (no updates/deletes), tables without primary keys are supported.
Q: What happens to the replication slot if RisingWave is down for a long time?
A: PostgreSQL retains WAL segments until the slot is consumed. If RisingWave is down for an extended period, WAL accumulates on disk. Monitor slot lag with pg_wal_lsn_diff() against pg_replication_slots.confirmed_flush_lsn, and set max_slot_wal_keep_size in PostgreSQL to prevent disk exhaustion.
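A reasonable safeguard is to cap the WAL retained for lagging slots. max_slot_wal_keep_size is available in PostgreSQL 13+; the 10GB figure below is an illustrative starting point, not a recommendation:

```sql
-- Cap WAL retained for slots; a slot that falls further behind is invalidated
ALTER SYSTEM SET max_slot_wal_keep_size = '10GB';
SELECT pg_reload_conf();
```

Note the trade-off: if a slot is invalidated, the CDC pipeline must be re-snapshotted, so size the cap against your longest expected downtime.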
Get Started
Start streaming PostgreSQL data into your Iceberg lakehouse today.