Introduction
Most teams extract data from PostgreSQL the same way: a nightly pg_dump or a scheduled ETL job that queries the production database, transforms the results, and loads them into a data warehouse or data lake. This approach works, but it has real costs. Your analytics data is always hours behind production. The batch export hammers your database with heavy read queries during off-peak hours. And if the export fails, you spend the next morning debugging instead of analyzing data.
Change Data Capture (CDC) offers a better path. Instead of periodically dumping the entire database, CDC reads PostgreSQL's Write-Ahead Log (WAL), the same log PostgreSQL uses for crash recovery and replication, to capture every insert, update, and delete as it happens. These changes stream into your data lake continuously, keeping it within seconds of production reality.
This tutorial walks you through building a complete PostgreSQL-to-Iceberg pipeline using RisingWave. You will ingest CDC data from PostgreSQL, transform it with SQL, and sink the results to Apache Iceberg tables, replacing nightly exports with a pipeline that runs continuously and keeps your data lake always fresh.
Why Should You Replace pg_dump with CDC for Your Data Lake?
Before building the pipeline, it helps to understand why CDC is worth the switch.
The problems with batch exports
The traditional approach to populating a data lake from PostgreSQL involves:
- Scheduling a cron job or Airflow DAG to run
pg_dumporCOPYqueries - Writing the output to CSV or Parquet files in object storage
- Running a Spark or dbt job to load and transform those files into your data lake tables
This pattern has several drawbacks:
- Stale data: Analytics always lag behind production by hours. A customer who upgraded their plan at 2 PM won't show up as upgraded until the next morning's export.
- Production load: Full table scans during export create I/O pressure on the production database, even if you read from a replica.
- All-or-nothing: If the export fails halfway through, you have to re-run the entire batch. There is no incremental progress.
- No deletes:
pg_dumpcaptures current state, not changes. If a row was inserted and then deleted between two exports, you never see either event.
How CDC solves these problems
PostgreSQL CDC works by reading the WAL (Write-Ahead Log), which records every data change at the row level. This gives you:
- Real-time freshness: Changes appear in your data lake within seconds, not hours
- Minimal production impact: WAL reading is lightweight compared to full table scans
- Incremental progress: If the pipeline pauses, it resumes from the last processed WAL position
- Full change history: You see every insert, update, and delete, not just the current state
RisingWave connects directly to PostgreSQL's logical replication protocol, no Kafka or Debezium required. It reads the WAL, applies your SQL transformations in real time, and writes the results to Iceberg.
What Do You Need Before Starting?
PostgreSQL prerequisites
Your PostgreSQL instance needs logical replication enabled. Here is the configuration:
-- Check current WAL level (must be 'logical')
SHOW wal_level;
If it is not set to logical, update postgresql.conf:
wal_level = logical
max_replication_slots = 4
max_wal_senders = 4
Restart PostgreSQL after changing these settings.
Create a database user with replication permissions:
-- Create a dedicated CDC user
CREATE USER cdc_reader WITH REPLICATION PASSWORD 'your_secure_password';
-- Grant access to the tables you want to replicate
GRANT USAGE ON SCHEMA public TO cdc_reader;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO cdc_reader;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO cdc_reader;
For AWS RDS or Aurora PostgreSQL, set the rds.logical_replication parameter to 1 in your parameter group and reboot the instance.
RisingWave setup
Install RisingWave locally for testing:
# Using Docker
docker run -it --pull=always -p 4566:4566 -p 5691:5691 risingwavelabs/risingwave:latest single_node
Or sign up for RisingWave Cloud for a managed instance.
Object storage for Iceberg
You need an S3 bucket (or compatible object storage) for the Iceberg warehouse. Create one with versioning enabled:
aws s3 mb s3://my-iceberg-lake
How Do You Build the PostgreSQL to Iceberg Pipeline?
Here is the complete pipeline, step by step.
Step 1: Create the PostgreSQL CDC source in RisingWave
First, establish the CDC connection to PostgreSQL. RisingWave uses a shared source that can serve multiple tables:
CREATE SOURCE pg_ecommerce WITH (
connector = 'postgres-cdc',
hostname = '192.168.1.100',
port = '5432',
username = 'cdc_reader',
password = 'your_secure_password',
database.name = 'ecommerce',
schema.name = 'public',
slot.name = 'rw_ecommerce_slot'
);
Key parameters:
connector = 'postgres-cdc': Uses RisingWave's native PostgreSQL CDC connector (no Debezium needed)slot.name: A unique replication slot name. RisingWave creates this automatically if it does not exist.
Step 2: Create tables that mirror upstream PostgreSQL tables
For each upstream table you want to replicate, create a corresponding table in RisingWave:
-- Orders table
CREATE TABLE orders (
order_id BIGINT PRIMARY KEY,
customer_id BIGINT,
product_id INT,
quantity INT,
unit_price DECIMAL(10, 2),
total_amount DECIMAL(12, 2),
order_status VARCHAR,
payment_method VARCHAR,
created_at TIMESTAMPTZ,
updated_at TIMESTAMPTZ
) FROM pg_ecommerce TABLE 'public.orders';
-- Customers table
CREATE TABLE customers (
customer_id BIGINT PRIMARY KEY,
first_name VARCHAR,
last_name VARCHAR,
email VARCHAR,
country VARCHAR,
signup_date DATE,
is_active BOOLEAN
) FROM pg_ecommerce TABLE 'public.customers';
-- Products table
CREATE TABLE products (
product_id INT PRIMARY KEY,
product_name VARCHAR,
category VARCHAR,
brand VARCHAR,
list_price DECIMAL(10, 2),
is_available BOOLEAN
) FROM pg_ecommerce TABLE 'public.products';
When you create these tables, RisingWave automatically:
- Takes an initial snapshot of the existing data in each PostgreSQL table
- Starts reading the WAL for new changes (inserts, updates, deletes)
- Applies changes in real time to keep the RisingWave tables synchronized
Step 3: Transform data with materialized views
Now you can write SQL transformations that run continuously. These materialized views update automatically as new CDC data arrives:
-- Daily order summary by customer and category
CREATE MATERIALIZED VIEW customer_order_analytics AS
SELECT
c.customer_id,
c.first_name || ' ' || c.last_name AS customer_name,
c.country,
p.category AS product_category,
COUNT(o.order_id) AS order_count,
SUM(o.total_amount) AS total_spent,
AVG(o.total_amount) AS avg_order_value,
MAX(o.created_at) AS last_order_at,
MIN(o.created_at) AS first_order_at
FROM orders o
JOIN customers c ON o.customer_id = c.customer_id
JOIN products p ON o.product_id = p.product_id
WHERE o.order_status != 'cancelled'
GROUP BY c.customer_id, c.first_name, c.last_name, c.country, p.category;
-- Product performance metrics
CREATE MATERIALIZED VIEW product_performance AS
SELECT
p.product_id,
p.product_name,
p.category,
p.brand,
p.list_price,
COUNT(o.order_id) AS total_orders,
SUM(o.quantity) AS total_units_sold,
SUM(o.total_amount) AS total_revenue,
AVG(o.total_amount) AS avg_order_value,
COUNT(DISTINCT o.customer_id) AS unique_customers
FROM products p
LEFT JOIN orders o ON p.product_id = o.product_id
AND o.order_status = 'completed'
GROUP BY p.product_id, p.product_name, p.category, p.brand, p.list_price;
-- Order fact table with denormalized dimensions
CREATE MATERIALIZED VIEW order_facts AS
SELECT
o.order_id,
o.customer_id,
c.first_name || ' ' || c.last_name AS customer_name,
c.country AS customer_country,
o.product_id,
p.product_name,
p.category AS product_category,
p.brand AS product_brand,
o.quantity,
o.unit_price,
o.total_amount,
o.order_status,
o.payment_method,
o.created_at,
o.updated_at
FROM orders o
JOIN customers c ON o.customer_id = c.customer_id
JOIN products p ON o.product_id = p.product_id;
Step 4: Sink transformed data to Iceberg
Create Iceberg sinks for each materialized view. RisingWave writes to Iceberg continuously, creating new snapshots at each commit interval:
-- Sink customer analytics to Iceberg
CREATE SINK customer_analytics_iceberg FROM customer_order_analytics
WITH (
connector = 'iceberg',
type = 'upsert',
primary_key = 'customer_id,product_category',
warehouse.path = 's3://my-iceberg-lake/warehouse',
database.name = 'analytics',
table.name = 'customer_order_analytics',
catalog.type = 'rest',
catalog.name = 'analytics_catalog',
create_table_if_not_exists = 'true',
commit_checkpoint_interval = '60',
s3.access.key = '${AWS_ACCESS_KEY}',
s3.secret.key = '${AWS_SECRET_KEY}',
s3.region = 'us-east-1'
);
-- Sink product performance to Iceberg
CREATE SINK product_perf_iceberg FROM product_performance
WITH (
connector = 'iceberg',
type = 'upsert',
primary_key = 'product_id',
warehouse.path = 's3://my-iceberg-lake/warehouse',
database.name = 'analytics',
table.name = 'product_performance',
catalog.type = 'rest',
catalog.name = 'analytics_catalog',
create_table_if_not_exists = 'true',
commit_checkpoint_interval = '60',
s3.access.key = '${AWS_ACCESS_KEY}',
s3.secret.key = '${AWS_SECRET_KEY}',
s3.region = 'us-east-1'
);
-- Sink order facts as append-only for full history
CREATE SINK order_facts_iceberg FROM order_facts
WITH (
connector = 'iceberg',
type = 'upsert',
primary_key = 'order_id',
warehouse.path = 's3://my-iceberg-lake/warehouse',
database.name = 'analytics',
table.name = 'order_facts',
catalog.type = 'rest',
catalog.name = 'analytics_catalog',
create_table_if_not_exists = 'true',
commit_checkpoint_interval = '60',
s3.access.key = '${AWS_ACCESS_KEY}',
s3.secret.key = '${AWS_SECRET_KEY}',
s3.region = 'us-east-1'
);
Step 5: Verify the pipeline
Check that everything is running:
-- View all active sinks
SHOW SINKS;
-- Check source status
SHOW SOURCES;
-- Query the materialized view to verify data
SELECT * FROM customer_order_analytics LIMIT 5;
Expected output:
customer_id | customer_name | country | product_category | order_count | total_spent | avg_order_value | last_order_at | first_order_at
-------------+-----------------+---------+------------------+-------------+-------------+-----------------+------------------------+------------------------
1 | Alice Johnson | US | Electronics | 12 | 3456.00 | 288.00 | 2026-03-29 10:23:00+00 | 2025-11-15 08:12:00+00
2 | Bob Chen | CA | Books | 5 | 127.50 | 25.50 | 2026-03-28 14:45:00+00 | 2026-01-20 09:30:00+00
3 | Carol Martinez | MX | Home & Garden | 8 | 892.00 | 111.50 | 2026-03-29 11:02:00+00 | 2025-12-03 16:22:00+00
How Do You Query the Iceberg Tables from Other Engines?
Once the data lands in Iceberg, any query engine that supports the Iceberg format can read it.
Querying with Trino
-- Connect to the Iceberg catalog and query customer analytics
SELECT
customer_name,
country,
product_category,
total_spent,
order_count
FROM analytics.customer_order_analytics
WHERE country = 'US'
ORDER BY total_spent DESC
LIMIT 10;
Querying with Spark SQL
-- Product performance report
SELECT
product_name,
category,
total_orders,
total_revenue,
unique_customers,
ROUND(total_revenue / NULLIF(total_orders, 0), 2) AS revenue_per_order
FROM analytics.product_performance
WHERE total_orders > 0
ORDER BY total_revenue DESC;
Querying with DuckDB
-- Lightweight analytics with DuckDB
INSTALL iceberg;
LOAD iceberg;
SELECT product_category, SUM(total_spent) AS category_revenue
FROM iceberg_scan('s3://my-iceberg-lake/warehouse/analytics/customer_order_analytics')
GROUP BY product_category
ORDER BY category_revenue DESC;
What Are the Key Differences from a Batch pg_dump Approach?
Here is a direct comparison:
| Aspect | Nightly pg_dump | PostgreSQL CDC with RisingWave |
| Data freshness | 12-24 hours | Seconds to minutes |
| Production database load | Heavy (full table scans) | Light (WAL reading) |
| Handles deletes | No (only captures current state) | Yes (full insert/update/delete) |
| Recovery from failure | Re-run entire export | Resume from last WAL position |
| Infrastructure | Cron + Airflow + Spark | RisingWave (single system) |
| Schema changes | Manual ETL updates | Automatic with auto.schema.change |
| Transformation language | Python/Spark | SQL |
The most significant operational improvement is eliminating the batch orchestration layer. With CDC, you do not need Airflow DAGs to schedule exports, Spark jobs to transform data, or monitoring to alert you when a nightly batch fails. The pipeline is a set of SQL statements that run continuously.
How Do You Monitor and Maintain the Pipeline?
Monitoring CDC lag
Check that RisingWave is keeping up with PostgreSQL changes:
-- Check the status of your sources
SHOW SOURCES;
If CDC lag grows, consider increasing RisingWave's compute resources or parallelism.
Handling replication slot growth
PostgreSQL retains WAL segments as long as a replication slot consumer has not read them. If RisingWave is down for an extended period, WAL segments accumulate and can fill your disk.
Monitor replication slot lag:
-- Run this on PostgreSQL
SELECT
slot_name,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS lag_size
FROM pg_replication_slots
WHERE slot_name = 'rw_ecommerce_slot';
Set a maximum WAL retention size in postgresql.conf:
max_slot_wal_keep_size = 10GB
Iceberg table maintenance
Schedule regular compaction and snapshot expiration for your Iceberg tables:
-- Expire old snapshots (run in Spark or Trino)
CALL analytics_catalog.system.expire_snapshots(
table => 'analytics.order_facts',
older_than => TIMESTAMP '2026-03-22 00:00:00',
retain_last => 100
);
-- Compact small files
CALL analytics_catalog.system.rewrite_data_files(
table => 'analytics.order_facts'
);
RisingWave Open Lake automates these maintenance tasks so you do not need to schedule them separately.
FAQ
Can I use this approach with AWS RDS PostgreSQL?
Yes. RisingWave's PostgreSQL CDC connector works with AWS RDS and Aurora PostgreSQL. You need to set the rds.logical_replication parameter to 1 in your RDS parameter group and reboot the instance. Then add postgres.is.aws.rds = 'true' to your CREATE SOURCE parameters.
Do I need Kafka or Debezium for PostgreSQL CDC with RisingWave?
No. RisingWave connects directly to PostgreSQL's logical replication protocol. It reads the WAL natively without requiring Kafka, Debezium, or any external message broker. This reduces infrastructure complexity and operational overhead compared to a Debezium-based setup.
How does RisingWave handle the initial data load from PostgreSQL?
When you create a CDC table in RisingWave, it automatically takes an initial snapshot of the existing data in the upstream PostgreSQL table. After the snapshot completes, it switches to streaming mode and processes only new WAL changes going forward. You can control snapshot behavior with the snapshot, snapshot.batch_size, and backfill.parallelism parameters.
What happens if RisingWave goes down during CDC ingestion?
RisingWave uses checkpointing to track its position in the PostgreSQL WAL. If RisingWave restarts, it resumes from the last checkpointed WAL position, ensuring no data is lost. PostgreSQL retains WAL segments for the replication slot until RisingWave catches up. The Iceberg sink provides exactly-once semantics, so you do not get duplicate records after recovery.
Conclusion
Replacing nightly pg_dump exports with a streaming CDC pipeline transforms your data lake from a stale snapshot into a living, continuously updated asset.
Key takeaways:
- PostgreSQL CDC reads the WAL to capture every change in real time, with minimal production database impact
- RisingWave connects directly to PostgreSQL without Kafka or Debezium, simplifying infrastructure
- SQL materialized views let you transform and enrich data before it reaches the Iceberg data lake
- The pipeline creates Iceberg snapshots every minute, supporting fine-grained time travel queries
- Any query engine (Trino, Spark, DuckDB) can read the resulting Iceberg tables
Ready to build a real-time data lake from PostgreSQL? Try RisingWave Cloud free, no credit card required. Sign up here.
Join our Slack community to ask questions and connect with other stream processing developers.

