Real-Time Apache Iceberg Ingestion from PostgreSQL CDC with RisingWave

RisingWave can stream PostgreSQL changes directly into Apache Iceberg tables in real time using its built-in postgres-cdc connector. By reading PostgreSQL's write-ahead log (WAL) through logical replication, RisingWave captures every INSERT, UPDATE, and DELETE and writes them to Iceberg as upserts, with exactly-once semantics and no separate Debezium deployment required.

Why PostgreSQL CDC to Iceberg?

PostgreSQL is the transactional workhorse of thousands of applications. But OLTP databases are not designed for analytics — complex queries slow down production systems, storage costs climb, and joining across multiple services requires data movement anyway.

The standard solution is to replicate PostgreSQL data to an analytical store. Traditional ETL (nightly batch exports, Airbyte) introduces hours of latency. Apache Kafka + Debezium + Flink reduces latency but adds significant operational complexity: you need Kafka, Schema Registry, Debezium connectors, and a Flink cluster, all to do what RisingWave handles in a few SQL statements.

RisingWave's postgres-cdc connector connects directly to PostgreSQL's logical replication slot, which means:

  • No Kafka required for the CDC path
  • Sub-second replication lag
  • Exactly-once semantics backed by Iceberg's atomic commits

Prerequisites

Before starting, ensure your PostgreSQL instance is configured for logical replication:

-- In postgresql.conf:
-- wal_level = logical
-- max_replication_slots = 4
-- max_wal_senders = 4

-- Create a replication user:
CREATE USER cdc_user REPLICATION LOGIN PASSWORD 'secure_password';
GRANT SELECT ON ALL TABLES IN SCHEMA public TO cdc_user;

Also ensure the pgoutput or wal2json plugin is available (pgoutput ships with PostgreSQL 10+).
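
As a quick sanity check before moving on, the settings above can be verified from any SQL session on PostgreSQL. This is a minimal sketch that assumes the cdc_user role created in the previous snippet and a session allowed to read pg_settings and pg_roles. Keep in mind that a change to wal_level only takes effect after a server restart.

```sql
-- Run on PostgreSQL (not RisingWave).
-- wal_level must report 'logical' for logical replication to work.
SHOW wal_level;

-- Show all three replication settings side by side.
SELECT name, setting
FROM pg_settings
WHERE name IN ('wal_level', 'max_replication_slots', 'max_wal_senders');

-- Confirm the CDC role can log in and carries the REPLICATION attribute.
SELECT rolname, rolcanlogin, rolreplication
FROM pg_roles
WHERE rolname = 'cdc_user';
```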

Step 1: Define the CDC Source

Connect RisingWave to PostgreSQL. The connector creates a logical replication slot automatically:

-- Shared CDC source: holds the connection, replication slot, and publication
CREATE SOURCE ecommerce_pg_source WITH (
    connector            = 'postgres-cdc',
    hostname             = 'postgres.prod.internal',
    port                 = '5432',
    username             = 'cdc_user',
    password             = 'secure_password',
    database.name        = 'ecommerce',
    slot.name            = 'risingwave_products_slot',
    publication.name     = 'risingwave_publication'
);

-- CDC table: maps public.products onto RisingWave columns
CREATE TABLE product_catalog_cdc (
    product_id       BIGINT PRIMARY KEY,
    sku              VARCHAR,
    name             VARCHAR,
    category         VARCHAR,
    price            NUMERIC(12, 2),
    stock_quantity   INT,
    is_active        BOOLEAN,
    created_at       TIMESTAMPTZ,
    updated_at       TIMESTAMPTZ
)
FROM ecommerce_pg_source TABLE 'public.products';

RisingWave creates the risingwave_products_slot replication slot and the risingwave_publication publication on PostgreSQL automatically. From this moment, every change to the products table flows into RisingWave.
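
To confirm that the slot and publication actually exist, the PostgreSQL catalogs can be queried directly. The sketch below assumes the slot and publication names used in this step.

```sql
-- Run on PostgreSQL.
-- The slot should be listed as a logical slot; 'active' is true while
-- RisingWave is connected and streaming.
SELECT slot_name, plugin, slot_type, active
FROM pg_replication_slots
WHERE slot_name = 'risingwave_products_slot';

-- The publication should include public.products.
SELECT pubname, schemaname, tablename
FROM pg_publication_tables
WHERE pubname = 'risingwave_publication';
```

If cdc_user lacks the privileges needed to create a publication, one option is for a privileged role to create it ahead of time, for example with CREATE PUBLICATION risingwave_publication FOR TABLE public.products;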

Step 2: Create an Enrichment Materialized View

Raw CDC streams often need enrichment before landing in the lakehouse. Here we join the product stream with a category metadata table:

-- Static reference data
CREATE TABLE category_metadata (
    category     VARCHAR PRIMARY KEY,
    department   VARCHAR,
    margin_pct   NUMERIC(5, 2),
    is_seasonal  BOOLEAN
);

-- Enriched product view
CREATE MATERIALIZED VIEW products_enriched AS
SELECT
    p.product_id,
    p.sku,
    p.name,
    p.category,
    c.department,
    c.margin_pct,
    p.price,
    p.price * (1 - c.margin_pct / 100) AS cost_basis,
    p.stock_quantity,
    p.is_active,
    p.created_at,
    p.updated_at
FROM product_catalog_cdc p
LEFT JOIN category_metadata c ON p.category = c.category;

This materialized view is continuously updated. When a product's price changes in PostgreSQL, the view updates immediately, and the downstream Iceberg sink writes the new row within the next checkpoint interval.
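
To make the enrichment concrete, the sketch below loads one reference row and reads the view back. The category value 'electronics', the department name, and the 25.00 margin are illustrative assumptions, not values from a real catalog.

```sql
-- Run on RisingWave.
INSERT INTO category_metadata (category, department, margin_pct, is_seasonal)
VALUES ('electronics', 'Consumer Tech', 25.00, false);

-- Make the insert visible to subsequent reads.
FLUSH;

-- For a product priced at 200.00 in this category:
--   cost_basis = 200.00 * (1 - 25.00 / 100) = 150.00
SELECT product_id, price, margin_pct, cost_basis
FROM products_enriched
WHERE category = 'electronics';
```

Because the view uses a LEFT JOIN, products whose category has no row in category_metadata still appear in the result, with NULL in department, margin_pct, and cost_basis.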

Step 3: Sink to Iceberg with Upsert

Use upsert mode so that updates and deletes in PostgreSQL are correctly reflected in Iceberg:

CREATE SINK products_iceberg_sink AS
SELECT * FROM products_enriched
WITH (
    connector      = 'iceberg',
    type           = 'upsert',
    primary_key    = 'product_id',
    catalog.type   = 'rest',
    catalog.uri    = 'http://iceberg-catalog:8181',
    warehouse.path = 's3://ecommerce-lakehouse/warehouse',
    s3.region      = 'us-east-1',
    database.name  = 'ecommerce',
    table.name     = 'products'
);

RisingWave uses Iceberg's equality delete files to handle updates: when product_id = 42 changes price, RisingWave writes a delete record for the old row and a new data record for the updated row in the same Iceberg snapshot.
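
One way to observe this from the Iceberg side is through Iceberg's metadata tables. The sketch below uses Spark SQL syntax and assumes a Spark session with the Iceberg runtime and the REST catalog registered under the hypothetical name lakehouse. Other query engines expose the same metadata with different syntax.

```sql
-- Spark SQL with the Iceberg runtime; 'lakehouse' is an assumed catalog name.
-- Each committed write appears as one snapshot.
SELECT committed_at, snapshot_id, operation
FROM lakehouse.ecommerce.products.snapshots
ORDER BY committed_at DESC
LIMIT 5;

-- content: 0 = data file, 1 = position deletes, 2 = equality deletes
SELECT content, file_path, record_count
FROM lakehouse.ecommerce.products.files
WHERE content = 2;
```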

Handling Multiple Tables

Real CDC pipelines replicate dozens of tables. RisingWave handles each as an independent source-view-sink chain:

-- Orders table
-- Each chain gets its own source and replication slot
CREATE SOURCE orders_pg_source WITH (
    connector      = 'postgres-cdc',
    hostname       = 'postgres.prod.internal',
    port           = '5432',
    username       = 'cdc_user',
    password       = 'secure_password',
    database.name  = 'ecommerce',
    slot.name      = 'risingwave_orders_slot'
);

CREATE TABLE orders_cdc (
    order_id    BIGINT PRIMARY KEY,
    customer_id BIGINT,
    product_id  BIGINT,
    quantity    INT,
    unit_price  NUMERIC(12,2),
    status      VARCHAR,
    ordered_at  TIMESTAMPTZ,
    shipped_at  TIMESTAMPTZ
)
FROM orders_pg_source TABLE 'public.orders';

CREATE MATERIALIZED VIEW orders_with_products AS
SELECT
    o.order_id,
    o.customer_id,
    o.quantity,
    o.unit_price,
    o.status,
    o.ordered_at,
    p.sku,
    p.category,
    p.department
FROM orders_cdc o
JOIN products_enriched p ON o.product_id = p.product_id;

CREATE SINK orders_iceberg_sink AS
SELECT * FROM orders_with_products
WITH (
    connector      = 'iceberg',
    type           = 'upsert',
    primary_key    = 'order_id',
    catalog.type   = 'rest',
    catalog.uri    = 'http://iceberg-catalog:8181',
    warehouse.path = 's3://ecommerce-lakehouse/warehouse',
    s3.region      = 'us-east-1',
    database.name  = 'ecommerce',
    table.name     = 'orders'
);

Monitoring Replication Lag

Start by confirming with standard SQL that the CDC sources are registered in RisingWave:

-- List the CDC sources known to RisingWave
SELECT name, connector
FROM rw_catalog.rw_sources
WHERE lower(connector) = 'postgres-cdc';

On the PostgreSQL side, monitor the replication slot lag:

-- On PostgreSQL: check slot lag
SELECT slot_name, confirmed_flush_lsn, pg_current_wal_lsn(),
       pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) AS lag_bytes
FROM pg_replication_slots
WHERE slot_name LIKE 'risingwave_%';

Typical steady-state lag is under 1 second for moderate workloads.

CDC vs. Batch Comparison

Dimension         Nightly Batch ETL           RisingWave CDC
Latency           Hours                       Seconds
Infrastructure    ETL server + scheduler      RisingWave only
Deletes handled   Soft-delete workarounds     Native (upsert mode)
Exactly-once      Depends on implementation   Built-in
Schema changes    Manual migration scripts    Semi-automatic
PostgreSQL load   High (full table scans)     Low (WAL tail only)

FAQ

Q: Do I need Kafka between PostgreSQL and RisingWave for CDC?
A: No. RisingWave's postgres-cdc connector reads directly from PostgreSQL's logical replication slot. Kafka is optional and only needed if other consumers also need the CDC stream.

Q: What PostgreSQL versions are supported?
A: RisingWave's postgres-cdc connector supports PostgreSQL 10 and later. Version 10 is the release that introduced native logical replication with the pgoutput plugin.

Q: How does RisingWave handle a PostgreSQL restart or failover?
A: The replication slot persists on the primary PostgreSQL node, so after a restart RisingWave resumes from the latest confirmed LSN. A failover is different: logical replication slots are generally not carried over to a replica that has been promoted to primary, so you may need to recreate the slot on the new primary before replication can continue.

Q: Can I replicate tables without a primary key?
A: For upsert mode, a primary key is required. For append-only mode (no updates/deletes), tables without primary keys are supported.

Q: What happens to the replication slot if RisingWave is down for a long time?
A: PostgreSQL retains WAL segments until the slot's consumer has confirmed them. If RisingWave is down for an extended period, WAL accumulates on disk. Monitor slot lag by applying pg_wal_lsn_diff to the LSNs in pg_replication_slots, and set max_slot_wal_keep_size (available in PostgreSQL 13 and later) to cap retained WAL and prevent disk exhaustion. A slot that falls behind by more than the cap is invalidated, and the affected tables then need a fresh snapshot.
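
A minimal sketch of that safeguard, run on PostgreSQL 13 or later as a superuser. The 20GB cap is an arbitrary illustration, so size it to the disk headroom actually available.

```sql
-- Cap the amount of WAL that replication slots may retain (PostgreSQL 13+).
ALTER SYSTEM SET max_slot_wal_keep_size = '20GB';
SELECT pg_reload_conf();  -- picked up on reload, no restart needed

-- wal_status: 'reserved' and 'extended' are healthy, 'unreserved' means the
-- slot is at risk, and 'lost' means the slot can no longer be used.
SELECT slot_name,
       active,
       wal_status,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal
FROM pg_replication_slots
WHERE slot_name LIKE 'risingwave_%';
```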
