Multi-Table Apache Iceberg Sinks with RisingWave

RisingWave supports multiple independent Iceberg sinks running concurrently from a single deployment. Each sink reads from its own materialized view and writes to a separate Iceberg table with its own schema and partition spec. This fan-out pattern enables a single streaming pipeline to populate a full Iceberg data lakehouse—raw events, aggregated metrics, and dimensional tables—simultaneously from one source.

Why Multi-Table Sinks Matter

A single Kafka topic or CDC stream often needs to populate multiple downstream tables. A payments stream might need to feed:

  • A raw events table for audit and replay
  • An hourly aggregation for dashboards
  • A merchant summary for billing
  • A fraud signals table for risk systems

Without multi-table sink support, you would need separate pipeline deployments—one per table—each re-reading the same Kafka topic and applying redundant transformations. RisingWave solves this with independent sinks that share upstream computation.

Building a Fan-Out Pipeline

Step 1: Define the Source

CREATE SOURCE payment_events (
    payment_id    VARCHAR,
    merchant_id   VARCHAR,
    customer_id   VARCHAR,
    amount        DECIMAL(12,2),
    currency      VARCHAR,
    status        VARCHAR,
    payment_method VARCHAR,
    region        VARCHAR,
    created_at    TIMESTAMPTZ
)
WITH (
    connector = 'kafka',
    topic = 'payments.events',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
)
FORMAT PLAIN ENCODE JSON;

Step 2: Create Multiple Materialized Views

Each downstream table gets its own materialized view with the appropriate transformation logic.

-- View 1: Hourly merchant aggregations for dashboards
CREATE MATERIALIZED VIEW merchant_hourly_stats AS
SELECT
    merchant_id,
    region,
    DATE_TRUNC('hour', created_at)          AS hour_bucket,
    COUNT(*)                                 AS tx_count,
    SUM(amount)                              AS total_volume,
    AVG(amount)                              AS avg_tx,
    SUM(CASE WHEN status = 'failed'
             THEN 1 ELSE 0 END)             AS failed_count,
    COUNT(DISTINCT customer_id)              AS unique_customers
FROM payment_events
GROUP BY merchant_id, region, DATE_TRUNC('hour', created_at);

-- View 2: Tumble window for fraud signal detection
CREATE MATERIALIZED VIEW fraud_signals AS
SELECT
    customer_id,
    window_start,
    window_end,
    COUNT(*)                                 AS tx_count_5min,
    SUM(amount)                              AS volume_5min,
    COUNT(DISTINCT merchant_id)              AS distinct_merchants,
    COUNT(DISTINCT region)                   AS distinct_regions,
    (COUNT(*) > 10 OR SUM(amount) > 5000)    AS is_suspicious
FROM TUMBLE(payment_events, created_at, INTERVAL '5 MINUTES')
GROUP BY customer_id, window_start, window_end;

Step 3: Create Independent Iceberg Sinks

Each sink is completely independent: it can target a different table, carry a different schema, and use a different sink type (upsert vs. append-only).

-- Sink 1: Raw events archive (append-only)
CREATE SINK payments_raw_sink AS
SELECT * FROM payment_events
WITH (
    connector       = 'iceberg',
    type            = 'append-only',
    catalog.type    = 'rest',
    catalog.uri     = 'http://iceberg-catalog:8181',
    warehouse.path  = 's3://payments-lakehouse/warehouse',
    s3.region       = 'us-east-1',
    database.name   = 'payments',
    table.name      = 'raw_events'
);

-- Sink 2: Merchant hourly stats (upsert)
CREATE SINK merchant_stats_sink AS
SELECT * FROM merchant_hourly_stats
WITH (
    connector       = 'iceberg',
    type            = 'upsert',
    primary_key     = 'merchant_id,region,hour_bucket',
    catalog.type    = 'rest',
    catalog.uri     = 'http://iceberg-catalog:8181',
    warehouse.path  = 's3://payments-lakehouse/warehouse',
    s3.region       = 'us-east-1',
    database.name   = 'payments',
    table.name      = 'merchant_hourly_stats'
);

-- Sink 3: Fraud signals (append-only)
CREATE SINK fraud_signals_sink AS
SELECT * FROM fraud_signals
WITH (
    connector       = 'iceberg',
    type            = 'append-only',
    catalog.type    = 'rest',
    catalog.uri     = 'http://iceberg-catalog:8181',
    warehouse.path  = 's3://payments-lakehouse/warehouse',
    s3.region       = 'us-east-1',
    database.name   = 'payments',
    table.name      = 'fraud_signals'
);

Managing Multiple Sinks in Production

Monitoring Sink Health

Check the status of all running sinks:

SHOW SINKS;

For detailed backpressure and throughput metrics, use RisingWave's built-in Prometheus-compatible metrics endpoint or the Grafana dashboards bundled with the standard deployment, which break down throughput and barrier latency per sink.
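On clusters where SQL access is handier than Prometheus, sink metadata can also be inspected through the rw_catalog system schema. A minimal sketch (the exact column set varies by RisingWave version, so verify it against your release):

-- List sinks with their connector and sink type
SELECT name, connector, sink_type
FROM rw_catalog.rw_sinks;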

Independent Sink Lifecycle

Each sink can be paused, dropped, and recreated independently without affecting other sinks or the underlying materialized views.

-- Pause a specific sink during maintenance
ALTER SINK fraud_signals_sink PAUSE;

-- Resume after maintenance
ALTER SINK fraud_signals_sink RESUME;

-- Drop and recreate with updated configuration
DROP SINK fraud_signals_sink;
CREATE SINK fraud_signals_sink AS ...;

Multi-Sink Architecture Comparison

Pattern                    | Sinks | Shared Computation | Operational Complexity | Recommended For
One pipeline per table     | N     | None (redundant)   | High                   | Independent data domains
Fan-out from single source | N     | Full source read   | Low                    | Same-domain derived tables
Cascading MVs + sinks      | N     | Partial            | Medium                 | Multi-stage transformations
RisingWave multi-sink      | N     | Full DAG           | Low                    | Most streaming lakehouse use cases

Cross-Sink Consistency

RisingWave's checkpoint mechanism ensures that all sinks derived from the same source advance together. When a checkpoint commits, all sinks checkpoint at the same logical offset. This means:

  • All three Iceberg tables (raw, aggregated, fraud signals) are always consistent to the same input watermark
  • No partial writes: either all sinks commit a batch or none do (on failure, the entire checkpoint is retried)
  • Downstream consumers can JOIN across Iceberg tables without worrying about one table being behind another

This cross-sink consistency is a significant operational advantage over managing multiple independent pipeline deployments.
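Because every table commits at the same checkpoint, a downstream batch engine can join the sink tables without watermark skew. As an illustration, run in any Iceberg-capable engine (Spark SQL, Trino) against the payments database created above:

-- Pull the raw payments behind each suspicious window
SELECT r.payment_id, r.merchant_id, r.amount, f.window_start
FROM payments.fraud_signals f
JOIN payments.raw_events r
  ON r.customer_id = f.customer_id
 AND r.created_at >= f.window_start
 AND r.created_at <  f.window_end
WHERE f.is_suspicious;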

Sending Alerts via Kafka Alongside Iceberg Sinks

You can mix sink types. Use an Iceberg sink for durable storage and a Kafka sink for real-time alerting from the same materialized view:

-- Alert on suspicious transactions in real time via Kafka
CREATE SINK fraud_alerts_kafka AS
SELECT
    customer_id,
    window_start,
    tx_count_5min,
    volume_5min,
    distinct_regions
FROM fraud_signals
WHERE is_suspicious = TRUE
WITH (
    connector = 'kafka',
    topic = 'fraud.alerts',
    properties.bootstrap.server = 'kafka:9092'
)
FORMAT PLAIN ENCODE JSON;

Now fraud signals go to both Iceberg (for historical analysis) and Kafka (for real-time response), driven by the same materialized view.

FAQ

Q: Is there a limit to the number of Iceberg sinks in RisingWave? A: There is no hard-coded limit. In practice, each sink consumes memory proportional to its in-flight buffer size. Hundreds of concurrent sinks are feasible on adequately provisioned clusters.

Q: Do multiple sinks slow down each other? A: Sinks are independent execution units in RisingWave's dataflow engine. A slow or backpressured sink does not directly slow other sinks. However, if all sinks share an upstream materialized view, backpressure can propagate up to the shared computation.

Q: Can I write to different Iceberg catalogs from the same RisingWave deployment? A: Yes. Each CREATE SINK specifies its own catalog.uri, allowing you to write to catalogs in different cloud accounts or regions from a single RisingWave cluster.
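As a sketch, two sinks on the same view can target different catalogs; the URIs, buckets, and regions below are illustrative:

-- Same view, two catalogs (e.g., US and EU accounts)
CREATE SINK stats_to_us AS SELECT * FROM merchant_hourly_stats
WITH (
    connector      = 'iceberg',
    type           = 'upsert',
    primary_key    = 'merchant_id,region,hour_bucket',
    catalog.type   = 'rest',
    catalog.uri    = 'http://catalog-us:8181',
    warehouse.path = 's3://lakehouse-us/warehouse',
    s3.region      = 'us-east-1',
    database.name  = 'payments',
    table.name     = 'merchant_hourly_stats'
);

CREATE SINK stats_to_eu AS SELECT * FROM merchant_hourly_stats
WITH (
    connector      = 'iceberg',
    type           = 'upsert',
    primary_key    = 'merchant_id,region,hour_bucket',
    catalog.type   = 'rest',
    catalog.uri    = 'http://catalog-eu:8181',
    warehouse.path = 's3://lakehouse-eu/warehouse',
    s3.region      = 'eu-west-1',
    database.name  = 'payments',
    table.name     = 'merchant_hourly_stats'
);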

Q: How do I handle schema differences between the source and each Iceberg table? A: Define separate materialized views with the appropriate SELECT column list for each sink. Iceberg schema evolution handles adding new columns to existing tables without rewriting historical data.
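For example, a billing table that needs only a narrow projection gets its own view; the view, sink, and table names here are illustrative:

-- Narrowed view: only the columns the billing table needs
CREATE MATERIALIZED VIEW merchant_billing AS
SELECT merchant_id, currency, SUM(amount) AS billable_volume
FROM payment_events
WHERE status = 'succeeded'
GROUP BY merchant_id, currency;

-- Sink with the matching narrow schema
CREATE SINK merchant_billing_sink AS
SELECT * FROM merchant_billing
WITH (
    connector      = 'iceberg',
    type           = 'upsert',
    primary_key    = 'merchant_id,currency',
    catalog.type   = 'rest',
    catalog.uri    = 'http://iceberg-catalog:8181',
    warehouse.path = 's3://payments-lakehouse/warehouse',
    s3.region      = 'us-east-1',
    database.name  = 'payments',
    table.name     = 'merchant_billing'
);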

Q: What happens if one Iceberg sink fails while others are running? A: RisingWave retries failed sinks independently. Other sinks continue processing unaffected. The failed sink resumes from its last committed checkpoint when the underlying issue (e.g., catalog unavailability) is resolved.

Get Started

Try building a multi-table Iceberg pipeline with the RisingWave getting started guide. Share your fan-out pipeline designs in the RisingWave Slack community.
