Apache Iceberg Time Travel Queries with Streaming Data

Apache Iceberg's time travel feature lets you query any historical snapshot of a table using a timestamp or snapshot ID. When combined with RisingWave's continuous streaming writes, you get a system where every micro-batch commit becomes a queryable checkpoint—enabling instant rollback, reproducible reports, and audit trails for streaming pipelines.

What Is Iceberg Time Travel?

Every write to an Iceberg table creates an immutable snapshot. The table's metadata file tracks the full lineage of snapshots with timestamps and snapshot IDs. You can query any past state by referencing its snapshot ID or an AS OF timestamp—without copying data or maintaining separate historical tables.

This is fundamentally different from traditional streaming sinks, where overwritten records are gone forever. Iceberg preserves every version until you explicitly expire old snapshots.
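As a concrete illustration, Spark SQL exposes this lineage through Iceberg's metadata tables. A minimal sketch, assuming the ecommerce.hourly_order_summary table built later in this walkthrough:

-- List every snapshot in the table's lineage (Spark SQL)
SELECT made_current_at, snapshot_id, parent_id, is_current_ancestor
FROM ecommerce.hourly_order_summary.history
ORDER BY made_current_at;

Each row is one commit; parent_id lets you trace the chain of snapshots back through time.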

Why Time Travel Matters for Streaming Data

Streaming pipelines produce continuous mutations. Late-arriving events can backfill earlier windows. Schema migrations happen online. Bugs in transformation logic corrupt downstream tables before anyone notices.

Iceberg time travel addresses all three scenarios. You can:

  • Reproduce the exact state of a table at a specific moment for audits
  • Compare results before and after a schema or logic change
  • Roll back a table to the last known-good snapshot when a bug is detected

Building a Streaming Pipeline with Audit Capability

Step 1: Define the CDC Source

CREATE SOURCE orders_cdc
WITH (
    connector = 'postgres-cdc',
    hostname  = 'pg-primary.internal',
    port      = '5432',
    username  = 'replicator',
    password  = 'secret',
    database.name = 'ecommerce',
    schema.name   = 'public',
    table.name    = 'orders',
    slot.name     = 'rw_orders_slot'
);

Step 2: Build a Stateful Aggregation View

CREATE MATERIALIZED VIEW hourly_order_summary AS
SELECT
    DATE_TRUNC('hour', order_time)  AS hour_bucket,
    region,
    product_category,
    COUNT(*)                        AS order_count,
    SUM(amount)                     AS total_revenue,
    AVG(amount)                     AS avg_order_value
FROM orders_cdc
WHERE status != 'cancelled'
GROUP BY
    DATE_TRUNC('hour', order_time),
    region,
    product_category;

Step 3: Sink to Iceberg with Upsert

CREATE SINK hourly_orders_iceberg AS
SELECT * FROM hourly_order_summary
WITH (
    connector       = 'iceberg',
    type            = 'upsert',
    -- Upsert sinks need a primary key so merges target the right rows
    primary_key     = 'hour_bucket,region,product_category',
    catalog.type    = 'rest',
    catalog.uri     = 'http://iceberg-catalog:8181',
    warehouse.path  = 's3://analytics-warehouse/iceberg',
    s3.region       = 'us-east-1',
    database.name   = 'ecommerce',
    table.name      = 'hourly_order_summary'
);

Each RisingWave checkpoint interval produces a new Iceberg snapshot. With the default 60-second checkpoint interval, you get approximately 1,440 queryable snapshots per day.
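If 1,440 snapshots per day is more granularity than you need, the sink's commit_checkpoint_interval option batches several checkpoints into one Iceberg commit. A hedged sketch (option name per RisingWave's Iceberg sink documentation; verify against your version):

CREATE SINK hourly_orders_iceberg_batched AS
SELECT * FROM hourly_order_summary
WITH (
    connector = 'iceberg',
    type      = 'upsert',
    -- Commit every 10th checkpoint: ~10 minutes at the default
    -- 60-second interval, i.e. roughly 144 snapshots per day
    commit_checkpoint_interval = 10,
    catalog.type    = 'rest',
    catalog.uri     = 'http://iceberg-catalog:8181',
    warehouse.path  = 's3://analytics-warehouse/iceberg',
    s3.region       = 'us-east-1',
    database.name   = 'ecommerce',
    table.name      = 'hourly_order_summary'
);

Fewer commits means less metadata to track and expire, at the cost of coarser time-travel granularity.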

Performing Time Travel Queries

Once data is in Iceberg, you can use any Iceberg-compatible engine for time travel queries.

Query a specific timestamp (Spark SQL):

SELECT * FROM ecommerce.hourly_order_summary
TIMESTAMP AS OF '2026-03-15 14:00:00';

Query by snapshot ID (Trino):

SELECT * FROM iceberg.ecommerce.hourly_order_summary
FOR VERSION AS OF 8742635912345;

In RisingWave v2.8, you can use temporal join syntax (FOR SYSTEM_TIME AS OF) to compare the current state against a historical one:

-- Compare current state vs. state from 24 hours ago
SELECT
    current.hour_bucket,
    current.region,
    current.total_revenue                            AS current_revenue,
    historical.total_revenue                         AS revenue_24h_ago,
    current.total_revenue - historical.total_revenue AS delta
FROM hourly_order_summary AS current
LEFT JOIN hourly_order_summary FOR SYSTEM_TIME AS OF
    (NOW() - INTERVAL '24 hours') AS historical
  ON current.hour_bucket = historical.hour_bucket
 AND current.region      = historical.region;

Snapshot Retention Strategy

Retention Policy | Snapshots Kept | Storage Overhead | Use Case
1 day (default)  | ~1,440         | Minimal          | Development/testing
7 days           | ~10,080        | Low              | Production streaming pipelines
30 days          | ~43,200        | Medium           | Compliance and audit requirements
90 days          | ~129,600      | High             | Financial data, regulated industries
Custom           | Configurable   | Variable         | Mixed SLA requirements

Configure retention with Iceberg's expireSnapshots procedure to avoid unbounded metadata growth while preserving the history you actually need.
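In Spark SQL, the retention policies above map to a call like the following. A sketch assuming a catalog named spark_catalog and the 7-day policy; adjust both for your deployment:

-- Expire snapshots older than 7 days, but never drop below
-- the 100 most recent as a safety floor
CALL spark_catalog.system.expire_snapshots(
    table       => 'ecommerce.hourly_order_summary',
    older_than  => TIMESTAMP '2026-03-08 00:00:00',
    retain_last => 100
);

Run this on a schedule (daily is typical) so metadata and orphaned data files are reclaimed continuously rather than in one expensive pass.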

Debugging a Streaming Pipeline with Time Travel

Imagine a transformation bug was introduced at 14:32 UTC that corrupted revenue figures. Here is how you recover:

  1. Identify the last clean snapshot — scan Iceberg metadata to find the snapshot just before 14:32
  2. Roll back your serving layer — point your BI tool to that snapshot while you fix the pipeline
  3. Fix and redeploy — correct the materialized view logic in RisingWave, drop and recreate the sink
  4. Backfill — RisingWave re-processes from the CDC source; Iceberg appends corrected data as new snapshots

The historical snapshots remain intact throughout this process. Stakeholders can still run queries against the pre-bug data at any time.
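Steps 1 and 2 can also be done entirely in SQL using Iceberg's Spark procedures, if you prefer rolling the table itself back rather than repointing the BI tool. A sketch (snapshot ID and catalog name are illustrative):

-- Find the last snapshot committed before the bug landed at 14:32 UTC
SELECT snapshot_id, committed_at
FROM ecommerce.hourly_order_summary.snapshots
WHERE committed_at < TIMESTAMP '2026-03-15 14:32:00'
ORDER BY committed_at DESC
LIMIT 1;

-- Point the table's current state back at that snapshot
CALL spark_catalog.system.rollback_to_snapshot(
    'ecommerce.hourly_order_summary', 8742635912345);

Note that rollback_to_snapshot changes what every reader sees, so coordinate it with the pipeline fix in step 3.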

Comparing Time-Travel Implementations

System         | Time Travel Mechanism | Granularity                 | Streaming Support
Apache Iceberg | Snapshot-based        | Per commit                  | Yes (via streaming sinks)
Delta Lake     | Transaction log       | Per transaction             | Yes
Apache Hudi    | Timeline              | Per instant                 | Yes
PostgreSQL     | MVCC                  | Per transaction             | No (WAL replication only)
BigQuery       | Time travel           | Any point in a 7-day window | No (batch load)

Iceberg's advantage is engine-agnostic time travel—any tool that supports the Iceberg spec can query historical snapshots.

FAQ

Q: How often does RisingWave create new Iceberg snapshots? A: By default, each RisingWave checkpoint (every 60 seconds) creates a new Iceberg snapshot. You can raise commit_checkpoint_interval to reduce snapshot frequency, trading time-travel granularity for lower metadata overhead.

Q: Can I use time travel to reprocess streaming data? A: Iceberg time travel queries are read-only. For reprocessing, re-ingest from your source system (Kafka or CDC) and write to new Iceberg snapshots. The old snapshots remain for comparison.

Q: Does Iceberg time travel work with upsert sinks? A: Yes. Each upsert creates a new snapshot that reflects the table state after the merge operation. You can query any snapshot to see the table as it appeared at that point, including which records had been upserted.

Q: How do I find the snapshot ID for a specific time? A: Query the Iceberg snapshots metadata table (Trino syntax; the $snapshots suffix must be quoted):

SELECT snapshot_id, committed_at
FROM iceberg.ecommerce."hourly_order_summary$snapshots"
WHERE committed_at <= TIMESTAMP '2026-03-15 14:00:00'
ORDER BY committed_at DESC
LIMIT 1;

Q: What happens to time travel when I evolve the schema? A: Iceberg schema evolution is additive. New columns appear as null in historical snapshots. Renamed or dropped columns are tracked in the schema history, so queries against old snapshots still work with the original column names.
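For instance, in Spark SQL (the discount_total column is hypothetical):

-- Add a column online; existing snapshots are untouched
ALTER TABLE ecommerce.hourly_order_summary
    ADD COLUMNS (discount_total DECIMAL(18, 2));

-- Historical reads simply return NULL for the new column
SELECT hour_bucket, region, discount_total
FROM ecommerce.hourly_order_summary
TIMESTAMP AS OF '2026-03-15 14:00:00';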

Get Started

Explore time travel and streaming lakehouses with the RisingWave quickstart guide. Have questions about your specific use case? Join the conversation on Slack.
