CDC to Apache Iceberg: Stream Database Changes to Your Data Lakehouse

Introduction

Most data lakehouses are built on batch ETL. A scheduled job extracts data from PostgreSQL or MySQL every hour, transforms it, and loads it into Apache Iceberg tables. Between runs, the lakehouse falls behind the operational database. Orders placed, accounts updated, and inventory changes are invisible to analysts until the next batch completes.

Change Data Capture (CDC) eliminates this delay by streaming every INSERT, UPDATE, and DELETE from your database transaction log as it happens. Pairing CDC with Apache Iceberg gives you a lakehouse that reflects operational reality in near real-time, with the ACID guarantees, schema evolution, and time travel capabilities that Iceberg provides.

RisingWave makes this pipeline simple to build. You connect to PostgreSQL or MySQL using a built-in CDC connector, define transformations with SQL, and sink the results directly to Iceberg tables. No Debezium to deploy, no Kafka cluster to manage (unless you want one), and no Spark jobs to schedule. This guide walks you through the complete setup with production-ready SQL examples tested on RisingWave v2.3.

Why Stream CDC to Apache Iceberg?

Traditional batch ETL from databases to data lakes has been the standard approach for decades. But as operational decisions become more time-sensitive, the batch model creates real problems.

The Batch ETL Problem

Consider an e-commerce platform with a PostgreSQL database. The typical batch pipeline works like this:

  1. A scheduled Airflow DAG runs every hour.
  2. It queries PostgreSQL for records changed since the last run (using updated_at timestamps).
  3. It transforms the data and writes Parquet files to S3.
  4. A separate job registers those files with the Iceberg catalog.
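
In SQL terms, the extraction in step 2 typically looks like the sketch below (the bookmark parameters :last_run_timestamp and :current_run_timestamp are illustrative placeholders supplied by the scheduler):

-- Timestamp-based incremental extraction: the fragile heart of batch ETL
SELECT *
FROM orders
WHERE updated_at > :last_run_timestamp
  AND updated_at <= :current_run_timestamp;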

This approach has several failure modes:

  • Missed updates: If a record is updated twice between batch runs, you lose the intermediate state. For audit trails and compliance, this is a problem.
  • Delete blindness: Soft deletes (rows flagged with a deleted_at column) still show up in queries, but hard DELETEs leave no row behind, making them invisible to timestamp-based extraction.
  • Clock skew: Server time differences between your ETL runner and the database can cause records to be skipped or duplicated.
  • Schema drift: When the source database schema changes between batch runs, the pipeline breaks.

How CDC Solves These Problems

CDC stream processing reads the database's write-ahead log (WAL) directly. Every change, whether INSERT, UPDATE, or DELETE, is captured exactly as it happened, in order, with no gaps. This eliminates the failure modes of timestamp-based extraction:

  • Every intermediate state is captured.
  • Deletes are explicit events in the CDC stream.
  • No clock skew issues because changes come from the database's own transaction log.
  • Schema changes can be detected and handled automatically.
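
You can watch this log-based change stream yourself in psql using PostgreSQL's built-in test_decoding plugin (a throwaway demo, assuming wal_level = 'logical' as configured in the prerequisites below; the slot name wal_demo_slot is illustrative):

-- Create a temporary logical replication slot with the test_decoding plugin
SELECT pg_create_logical_replication_slot('wal_demo_slot', 'test_decoding');

-- Make a change, then peek at the decoded events without consuming them
UPDATE orders SET order_status = 'shipped' WHERE order_id = 1001;
SELECT lsn, xid, data FROM pg_logical_slot_peek_changes('wal_demo_slot', NULL, NULL);

-- Drop the slot when done so it does not retain WAL indefinitely
SELECT pg_drop_replication_slot('wal_demo_slot');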

Why Iceberg as the Target?

Apache Iceberg is the ideal target for CDC streams for several reasons:

  • Row-level operations: Unlike Hive or basic Parquet on S3, Iceberg supports row-level UPDATE and DELETE operations, which map directly to CDC events.
  • ACID transactions: Each batch of CDC events can be committed atomically, ensuring readers never see partial updates.
  • Schema evolution: When the source database adds a column, Iceberg can accommodate the change without rewriting existing data.
  • Time travel: Iceberg's snapshot isolation lets you query the lakehouse as of any point in time, useful for debugging and auditing.
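
For example, once the pipeline below is running, a query engine such as Spark SQL can time-travel against the enriched_orders table we create later (the timestamp and snapshot ID are illustrative):

-- Query the table as of a past point in time
SELECT * FROM ecommerce_analytics.enriched_orders TIMESTAMP AS OF '2025-01-15 00:00:00';

-- Or pin a query to a specific snapshot ID
SELECT * FROM ecommerce_analytics.enriched_orders VERSION AS OF 4512837659823716;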

How to Build a CDC-to-Iceberg Pipeline with RisingWave

Let's build a complete pipeline that streams changes from a PostgreSQL database into Apache Iceberg tables. We will cover the full workflow: connecting to PostgreSQL, creating CDC tables, defining transformations, and sinking to Iceberg.

Prerequisites

Before starting, ensure your PostgreSQL database is configured for logical replication:

-- Run on your PostgreSQL database
ALTER SYSTEM SET wal_level = 'logical';
ALTER SYSTEM SET max_replication_slots = 4;
ALTER SYSTEM SET max_wal_senders = 4;
-- Restart PostgreSQL after these changes

Create a dedicated replication user:

CREATE USER replication_user WITH REPLICATION PASSWORD 'your_password';
GRANT SELECT ON ALL TABLES IN SCHEMA public TO replication_user;
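
RisingWave also needs a publication covering the captured tables. Depending on privileges it can create one automatically, but you can create it explicitly up front (the name matches the publication.name used in Step 1 below):

-- Create a publication limited to the tables you plan to capture
CREATE PUBLICATION risingwave_publication FOR TABLE public.orders, public.customers, public.products;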

For detailed PostgreSQL CDC prerequisites, see the RisingWave CDC guide.

Step 1: Create a PostgreSQL CDC Source

Connect RisingWave to your PostgreSQL database using the built-in CDC connector:

CREATE SOURCE pg_source WITH (
    connector = 'postgres-cdc',
    hostname = '10.0.1.50',
    port = '5432',
    username = 'replication_user',
    password = 'your_password',
    database.name = 'ecommerce',
    schema.name = 'public',
    slot.name = 'risingwave_slot',
    publication.name = 'risingwave_publication'
);

This creates a shared CDC source that connects to the PostgreSQL WAL. RisingWave manages the replication slot and tracks the log position automatically.
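
You can confirm this on the PostgreSQL side; the slot should appear and become active once streaming starts:

-- Run on PostgreSQL
SELECT slot_name, plugin, active, confirmed_flush_lsn FROM pg_replication_slots;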

Step 2: Create CDC Tables in RisingWave

Now create tables in RisingWave that mirror your PostgreSQL tables. Each table consumes changes from a specific upstream table:

CREATE TABLE orders (
    order_id BIGINT PRIMARY KEY,
    customer_id BIGINT,
    product_id VARCHAR,
    quantity INT,
    total_amount NUMERIC,
    order_status VARCHAR,
    created_at TIMESTAMPTZ,
    updated_at TIMESTAMPTZ
) FROM pg_source TABLE 'public.orders';

CREATE TABLE customers (
    customer_id BIGINT PRIMARY KEY,
    name VARCHAR,
    email VARCHAR,
    segment VARCHAR,
    region VARCHAR,
    created_at TIMESTAMPTZ,
    updated_at TIMESTAMPTZ
) FROM pg_source TABLE 'public.customers';

CREATE TABLE products (
    product_id VARCHAR PRIMARY KEY,
    product_name VARCHAR,
    category VARCHAR,
    brand VARCHAR,
    unit_cost NUMERIC,
    updated_at TIMESTAMPTZ
) FROM pg_source TABLE 'public.products';

When these tables are created, RisingWave performs an initial snapshot of the existing data in PostgreSQL and then switches to streaming mode to capture ongoing changes. Every INSERT, UPDATE, and DELETE in the source database is automatically reflected in these RisingWave tables.
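
To see the streaming mode in action, insert a row on the PostgreSQL side and query it from RisingWave moments later (values are illustrative):

-- On PostgreSQL
INSERT INTO orders (order_id, customer_id, product_id, quantity, total_amount, order_status, created_at, updated_at)
VALUES (9999001, 42, 'SKU-1001', 2, 59.98, 'pending', now(), now());

-- In RisingWave, the new row appears within seconds
SELECT * FROM orders WHERE order_id = 9999001;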

You can verify the initial snapshot loaded correctly:

SELECT COUNT(*) AS total_orders FROM orders;
SELECT COUNT(*) AS total_customers FROM customers;
SELECT COUNT(*) AS total_products FROM products;

Expected output:

 total_orders
--------------
       248531

 total_customers
-----------------
           15847

 total_products
----------------
            3421

Step 3: Define Transformations with Materialized Views

Now create materialized views that transform the raw CDC data. These views update automatically as the source tables change.

Create an enriched orders view that joins all three tables:

CREATE MATERIALIZED VIEW enriched_orders AS
SELECT
    o.order_id,
    o.customer_id,
    c.name AS customer_name,
    c.segment AS customer_segment,
    c.region AS customer_region,
    o.product_id,
    p.product_name,
    p.category,
    p.brand,
    o.quantity,
    o.total_amount,
    p.unit_cost * o.quantity AS total_cost,
    o.total_amount - (p.unit_cost * o.quantity) AS profit,
    o.order_status,
    o.created_at,
    o.updated_at
FROM orders o
JOIN customers c ON o.customer_id = c.customer_id
JOIN products p ON o.product_id = p.product_id;

Create a daily summary view for analytics:

CREATE MATERIALIZED VIEW daily_sales_summary AS
SELECT
    DATE_TRUNC('day', created_at) AS sale_date,
    category,
    customer_segment,
    customer_region,
    COUNT(*) AS order_count,
    SUM(total_amount) AS total_revenue,
    SUM(profit) AS total_profit,
    AVG(total_amount) AS avg_order_value,
    COUNT(DISTINCT customer_id) AS unique_customers
FROM enriched_orders
WHERE order_status = 'completed'
GROUP BY
    DATE_TRUNC('day', created_at),
    category,
    customer_segment,
    customer_region;

These materialized views use incremental view maintenance, meaning they process only the changed rows rather than recomputing the entire result. When a single order is updated in PostgreSQL, only that order's contribution to the aggregation is recalculated.
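
Both views are ordinary relations you can query at any time, for example:

-- Top-grossing categories for the most recent days (illustrative query)
SELECT sale_date, category, total_revenue, order_count
FROM daily_sales_summary
ORDER BY sale_date DESC, total_revenue DESC
LIMIT 10;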

Step 4: Sink to Apache Iceberg

Finally, deliver the transformed data to Iceberg tables. Create a sink for the enriched orders:

CREATE SINK enriched_orders_iceberg FROM enriched_orders
WITH (
    connector = 'iceberg',
    type = 'upsert',
    primary_key = 'order_id',
    warehouse.path = 's3://data-lakehouse/warehouse',
    database.name = 'ecommerce_analytics',
    table.name = 'enriched_orders',
    catalog.type = 'glue',
    catalog.name = 'analytics_catalog',
    s3.access.key = '${AWS_ACCESS_KEY}',
    s3.secret.key = '${AWS_SECRET_KEY}',
    s3.region = 'us-west-2',
    create_table_if_not_exists = 'true'
);

The type = 'upsert' setting is critical for CDC pipelines. When an order is updated in PostgreSQL, the corresponding row in the Iceberg table is updated (not duplicated). When an order is deleted in PostgreSQL, it is deleted from the Iceberg table.
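
You can observe this by changing rows on the PostgreSQL side (order IDs are illustrative):

-- On PostgreSQL: this UPDATE becomes an in-place update of the matching Iceberg row
UPDATE orders SET order_status = 'completed', updated_at = now() WHERE order_id = 1001;

-- On PostgreSQL: this DELETE removes the matching Iceberg row
DELETE FROM orders WHERE order_id = 1002;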

Create a sink for the daily summary:

CREATE SINK daily_summary_iceberg FROM daily_sales_summary
WITH (
    connector = 'iceberg',
    type = 'upsert',
    primary_key = 'sale_date,category,customer_segment,customer_region',
    warehouse.path = 's3://data-lakehouse/warehouse',
    database.name = 'ecommerce_analytics',
    table.name = 'daily_sales_summary',
    catalog.type = 'glue',
    catalog.name = 'analytics_catalog',
    s3.access.key = '${AWS_ACCESS_KEY}',
    s3.secret.key = '${AWS_SECRET_KEY}',
    s3.region = 'us-west-2',
    create_table_if_not_exists = 'true'
);

The complete pipeline is now running. Every change in your PostgreSQL database flows through RisingWave, gets enriched and aggregated, and lands in Iceberg tables within seconds. For the full Iceberg sink parameter reference, see the RisingWave Iceberg sink documentation.
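
As a sanity check, any Iceberg-aware engine can read the results. For example, in Spark SQL with a catalog configured against the same Glue catalog (the catalog name analytics_catalog is carried over from the sink definition):

-- Spark SQL: confirm rows are landing in the Iceberg table
SELECT order_status, COUNT(*) AS orders
FROM analytics_catalog.ecommerce_analytics.enriched_orders
GROUP BY order_status;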

How Does This Compare to Traditional CDC-to-Iceberg Approaches?

Several tools and architectures can stream CDC data to Iceberg. Here is how they compare:

 Capability           | RisingWave             | Debezium + Kafka + Flink    | Debezium + Kafka + Spark    | Streamkap   | Fivetran/Airbyte
----------------------+------------------------+-----------------------------+-----------------------------+-------------+--------------------------
 CDC capture          | Built-in connector     | Debezium                    | Debezium                    | Built-in    | Built-in
 Message queue        | Optional (direct CDC)  | Required (Kafka)            | Required (Kafka)            | Built-in    | Not applicable
 Transformations      | Full SQL               | Flink SQL / Java            | PySpark / Spark SQL         | Limited     | dbt (post-load)
 Iceberg delivery     | Built-in sink          | Iceberg connector           | Iceberg connector           | Built-in    | Limited Iceberg support
 Components to manage | 1 (RisingWave)         | 3+ (Debezium, Kafka, Flink) | 3+ (Debezium, Kafka, Spark) | 1 (managed) | 2 (ETL tool + warehouse)
 Latency              | Seconds                | Seconds to minutes          | Minutes (micro-batch)       | Seconds     | Minutes to hours
 Languages            | SQL only               | SQL / Java                  | Python / SQL                | UI / SQL    | UI / SQL
 Open source          | Yes                    | Yes                         | Yes                         | No          | Partially

The Key Advantage: Fewer Moving Parts

The traditional CDC-to-Iceberg architecture requires you to deploy and operate Debezium, Kafka, and either Flink or Spark. Each component needs monitoring, scaling, and failure recovery. When something breaks at 2 AM, you need to figure out which component failed and why.

RisingWave simplifies this by handling CDC capture, stream processing, and Iceberg delivery in a single system. You monitor one system, scale one system, and debug one system. The RisingWave CDC streaming architecture is designed specifically for this use case.

When You Still Want Kafka in the Pipeline

If your organization already runs Kafka and multiple consumers need the CDC stream (not just the Iceberg pipeline), it makes sense to keep Kafka as the durable transport layer. In that case, you can use Debezium to publish CDC events to Kafka and have RisingWave consume from Kafka instead of connecting directly to PostgreSQL:

CREATE TABLE orders_from_kafka (
    order_id BIGINT PRIMARY KEY,
    customer_id BIGINT,
    total_amount NUMERIC,
    order_status VARCHAR
)
WITH (
    connector = 'kafka',
    topic = 'dbserver1.public.orders',
    properties.bootstrap.server = 'broker1:9092',
    scan.startup.mode = 'earliest'
) FORMAT DEBEZIUM ENCODE JSON;

RisingWave natively supports the Debezium JSON format, so no additional parsing is required. Because Debezium events form a changelog, they must land in a table with a primary key so RisingWave can apply the encoded updates and deletes.

What About MySQL CDC to Iceberg?

RisingWave supports MySQL CDC with the same pattern. The only difference is the source connector configuration:

CREATE SOURCE mysql_source WITH (
    connector = 'mysql-cdc',
    hostname = '10.0.1.60',
    port = '3306',
    username = 'replication_user',
    password = 'your_password',
    database.name = 'ecommerce',
    server.id = '1001'
);

CREATE TABLE orders (
    order_id BIGINT PRIMARY KEY,
    customer_id BIGINT,
    total_amount NUMERIC,
    order_status VARCHAR,
    created_at TIMESTAMP
) FROM mysql_source TABLE 'ecommerce.orders';

The materialized views and Iceberg sinks remain identical. This means you can consolidate CDC streams from both PostgreSQL and MySQL databases into the same Iceberg lakehouse using one tool.

FAQ

Can RisingWave capture CDC from PostgreSQL without Kafka?

Yes. RisingWave includes a built-in PostgreSQL CDC connector that reads directly from the database's write-ahead log. You do not need Debezium or Kafka for the CDC pipeline. RisingWave manages the replication slot, tracks log positions, and handles failover automatically.

How does RisingWave handle CDC deletes when sinking to Iceberg?

When you configure the Iceberg sink with type = 'upsert', RisingWave propagates DELETE events from the CDC stream as row deletions in the Iceberg table. This ensures the Iceberg table accurately reflects the current state of the source database, including removed records.

What happens if the PostgreSQL source goes down temporarily?

RisingWave maintains its position in the PostgreSQL WAL using a replication slot. When the source database comes back online, RisingWave resumes reading from where it left off. No data is lost, and no manual intervention is required. The replication slot prevents PostgreSQL from purging WAL segments that RisingWave has not yet consumed.
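
One operational caveat: while RisingWave is disconnected, PostgreSQL retains WAL on behalf of the slot, so it is worth monitoring how much is held back during a long outage:

-- Run on PostgreSQL: WAL retained for each replication slot
SELECT slot_name, active,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal
FROM pg_replication_slots;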

Can I transform CDC data before writing to Iceberg?

Yes. This is one of RisingWave's primary advantages over tools like Debezium + Kafka Connect, which move data without transformation. In RisingWave, you define transformations as materialized views using standard SQL, including joins, aggregations, filters, and window functions, and sink the transformed results to Iceberg.

Conclusion

Key takeaways:

  • Batch ETL from databases to Iceberg introduces staleness, misses intermediate updates, and cannot capture deletes. CDC eliminates all three problems by streaming changes directly from the database transaction log.
  • RisingWave provides a complete CDC-to-Iceberg pipeline in a single system: built-in CDC connectors for PostgreSQL and MySQL, SQL-based transformations, and native Iceberg sinks.
  • The upsert sink mode ensures that updates and deletes in the source database are correctly reflected in Iceberg tables, maintaining an accurate mirror of operational data.
  • Compared to the traditional Debezium + Kafka + Flink/Spark stack, RisingWave reduces the number of components from three or more to one, cutting operational complexity significantly.
  • Both PostgreSQL and MySQL CDC pipelines use the same materialized view and sink SQL, allowing you to consolidate multiple database sources into a single Iceberg lakehouse.

Ready to stream your database changes to Iceberg? Try RisingWave Cloud for free (no credit card required). Sign up here.

Join our Slack community to ask questions and connect with other stream processing developers.
