Building a Real-Time Data Lake with Debezium and Apache Iceberg

Combining Debezium CDC with Apache Iceberg gives you a data lake that reflects the current state of your operational database in near real time—with full ACID guarantees, time-travel queries, and partition pruning for analytical workloads. You get the freshness of a streaming pipeline with the queryability of a lakehouse format.

Why Iceberg for CDC Sink?

Traditional data lakes store append-only Parquet files. Capturing CDC means you need to process updates and deletes—operations that are fundamentally incompatible with immutable files. Apache Iceberg Table Format V2 introduces row-level deletes (position deletes and equality deletes), which allow updates and deletes to be represented without full file rewrites on every change.

This makes Iceberg the natural target for CDC pipelines:

  • Merge-on-read: Delete files are written alongside data files; merging happens at query time. Low write amplification, slightly higher read cost.
  • Copy-on-write: Affected data files are rewritten at write time instead of deferring the merge. Higher write cost, lower read cost. Better for high-query, low-change-rate tables. Both modes are per-table settings; see the snippet after this list.
  • Time travel: Every CDC operation creates a new Iceberg snapshot. You can query the table as it existed at any point in the past.
  • Partition evolution: Add or change partition specs without rewriting existing data.
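
Which mode a table uses is controlled through Iceberg table properties, so the trade-off can be tuned per table. A minimal sketch in Spark SQL, where the catalog and table names (lake.inventory_db.products) are illustrative:

-- Write row-level delete files instead of rewriting data files on every update/delete
ALTER TABLE lake.inventory_db.products SET TBLPROPERTIES (
    'write.delete.mode' = 'merge-on-read',
    'write.update.mode' = 'merge-on-read',
    'write.merge.mode'  = 'merge-on-read'
);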

Architecture Options

There are two main patterns for Debezium → Iceberg:

Option A: Debezium → Kafka → Flink → Iceberg

Apache Flink has a mature Iceberg sink (the iceberg-flink-runtime connector). Flink reads from Kafka, interprets the Debezium envelope, and writes to Iceberg using upsert semantics.
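
A rough sketch of Option A in Flink SQL, assuming an Iceberg catalog named lake is already registered and that the target table inventory_db.products defines id as its identifier field; all names are illustrative and exact options vary by Flink/Iceberg version:

-- Kafka source that decodes the Debezium envelope into a changelog stream
CREATE TABLE products_kafka (
    id          BIGINT,
    name        STRING,
    category    STRING,
    price       DECIMAL(10, 2),
    stock       INT,
    updated_at  TIMESTAMP(3)
) WITH (
    'connector' = 'kafka',
    'topic' = 'invserver1.public.products',
    'properties.bootstrap.servers' = 'kafka:9092',
    'scan.startup.mode' = 'earliest-offset',
    -- 'debezium-json' turns Debezium events into inserts, updates, and deletes
    'format' = 'debezium-json'
);

-- Continuously apply the changelog to the Iceberg table with upsert semantics
INSERT INTO lake.inventory_db.products /*+ OPTIONS('upsert-enabled' = 'true') */
SELECT * FROM products_kafka;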

Option B: Debezium → Kafka → RisingWave → Iceberg

RisingWave is a PostgreSQL-compatible streaming database with a native Iceberg sink. You define sources and materialized views in SQL, then sink the results to Iceberg; no Flink cluster is required.

Step-by-Step Tutorial

Step 1: Deploy Debezium and Start Streaming CDC

{
  "name": "postgres-inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "secret",
    "database.dbname": "inventory",
    "database.server.name": "invserver1",
    "table.include.list": "public.products,public.inventory",
    "plugin.name": "pgoutput",
    "snapshot.mode": "initial"
  }
}

This connector streams changes from the products and inventory tables to the Kafka topics invserver1.public.products and invserver1.public.inventory.
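
Before registering the connector, it's worth confirming that the source database is set up for logical decoding. A quick check from psql, assuming sufficient privileges:

-- pgoutput requires logical decoding
SHOW wal_level;   -- must return 'logical'

-- Once the connector starts, it creates a logical replication slot
-- (named 'debezium' by default); confirm it exists and is active:
SELECT slot_name, plugin, active FROM pg_replication_slots;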

Step 2: Connect RisingWave to Kafka

Because the Debezium format carries updates and deletes, RisingWave ingests it into a table with a primary key rather than an append-only source, so each change is applied in place:

-- For the Debezium → Kafka → RisingWave pipeline:
CREATE TABLE products_cdc (
    id          BIGINT,
    name        VARCHAR,
    category    VARCHAR,
    price       NUMERIC,
    stock       INTEGER,
    updated_at  TIMESTAMPTZ,
    PRIMARY KEY (id)
) WITH (
    connector = 'kafka',
    topic = 'invserver1.public.products',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'earliest'
) FORMAT DEBEZIUM ENCODE JSON;
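
With the table in place, the connector's initial snapshot should already be queryable inside RisingWave, which is a quick way to confirm ingestion before wiring up the sink:

-- Spot-check the most recently changed products
SELECT id, name, stock, updated_at
FROM products_cdc
ORDER BY updated_at DESC
LIMIT 10;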

Step 3: Create a Materialized View for Iceberg Sink

-- Current product inventory state (updates and deletes from the CDC stream
-- are already applied to products_cdc, so no op filtering is needed)
CREATE MATERIALIZED VIEW current_inventory AS
SELECT
    p.id,
    p.name,
    p.category,
    p.price,
    p.stock,
    p.updated_at,
    DATE_TRUNC('day', p.updated_at) AS partition_date
FROM products_cdc p;

-- Category-level aggregation for analytics
CREATE MATERIALIZED VIEW inventory_by_category AS
SELECT
    category,
    COUNT(*)        AS product_count,
    SUM(stock)      AS total_units,
    AVG(price)      AS avg_price,
    MIN(price)      AS min_price,
    MAX(price)      AS max_price
FROM current_inventory
GROUP BY category;
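
Both views are incrementally maintained as CDC events arrive, so they can be queried at any time for fresh results, for example:

-- Categories with the most units in stock right now
SELECT category, product_count, total_units, avg_price
FROM inventory_by_category
ORDER BY total_units DESC;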

Step 4: Sink to Apache Iceberg

RisingWave's native Iceberg sink writes materialized view results to Iceberg tables:

-- Create an Iceberg sink from the materialized view
CREATE SINK inventory_iceberg_sink
FROM current_inventory
WITH (
    connector = 'iceberg',
    type = 'upsert',
    primary_key = 'id',
    catalog.type = 'rest',
    catalog.uri = 'http://iceberg-rest-catalog:8181',
    warehouse.path = 's3://my-data-lake/inventory/',
    s3.endpoint = 'https://s3.amazonaws.com',
    s3.region = 'us-east-1',
    database.name = 'inventory_db',
    table.name = 'products'
);

RisingWave writes Iceberg Table Format V2 files with row-level deletes, correctly representing updates and deletes from the CDC stream.
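
One way to see what the sink produces is to inspect Iceberg's metadata tables from any Iceberg-aware engine; for example, from Spark SQL (the catalog name lake is illustrative):

-- One snapshot per sink commit
SELECT snapshot_id, committed_at, operation
FROM lake.inventory_db.products.snapshots;

-- Row-level delete files written for CDC updates and deletes
SELECT content, file_path, record_count
FROM lake.inventory_db.products.delete_files;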

Aspect                     Flink + Iceberg                        RisingWave + Iceberg
Language                   Java/Scala DataStream or Table API     Standard SQL
Operational complexity     Flink cluster + JobManager HA          Single streaming DB cluster
Upsert support             Yes (via equality deletes)             Yes (native upsert sink)
Iceberg Table Format V2    Yes                                    Yes
Partition spec support     Yes                                    Yes
Time travel queries        Via Iceberg readers (Spark, Trino)     Via Iceberg readers
Intermediate analytics     Flink SQL (limited materialization)    Full materialized views

FAQ

Q: What Iceberg table format version does this produce? RisingWave's Iceberg sink writes Iceberg Table Format V2, which supports row-level deletes (both position deletes and equality deletes). This is required for CDC upsert semantics. Format V1 has no row-level delete files, so representing a delete would require rewriting the affected data files.

Q: How do time-travel queries work with CDC-sourced Iceberg tables? Each time RisingWave writes a batch of changes to Iceberg, it creates a new table snapshot with a timestamp. You can query historical snapshots using Spark or Trino: SELECT * FROM catalog.db.products VERSION AS OF <snapshot_id> or SELECT * FROM catalog.db.products TIMESTAMP AS OF '2026-01-01 00:00:00'. This lets you reconstruct the exact state of the product catalog at any point in time.

Q: How often should I configure the Iceberg sink to commit? More frequent commits (every 30-60 seconds) give lower latency but create many small files. Less frequent commits (every 5-10 minutes) produce larger, better-performing files at the cost of higher end-to-end latency. Use Iceberg's compaction action (via Spark or a scheduled maintenance job) to merge small files into optimally sized data files regularly.
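
Iceberg ships maintenance procedures that can be scheduled from Spark SQL; a minimal sketch (the catalog name lake is illustrative):

-- Merge small files written by frequent CDC commits into larger data files
CALL lake.system.rewrite_data_files(table => 'inventory_db.products');

-- Optionally expire old snapshots to keep metadata and storage bounded
CALL lake.system.expire_snapshots(table => 'inventory_db.products', older_than => TIMESTAMP '2026-01-01 00:00:00');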

Key Takeaways

  • Apache Iceberg Table Format V2's row-level delete support makes it the right format for CDC sink targets.
  • RisingWave's native Iceberg sink eliminates the need for a separate Flink cluster, reducing operational complexity.
  • The combination of Debezium (capture) + Kafka (transport) + RisingWave (streaming SQL) + Iceberg (lakehouse storage) gives you a complete real-time data lake pipeline.
  • Time-travel queries on CDC-sourced Iceberg tables let you audit historical states without maintaining separate audit tables.
  • Schedule Iceberg file compaction regularly to prevent small-file proliferation caused by high-frequency CDC writes.
