Apache Iceberg Schema Evolution with RisingWave Streaming Pipelines

Apache Iceberg supports schema evolution — adding, renaming, reordering, and dropping columns — without rewriting data files. RisingWave streaming pipelines can safely co-exist with these changes because Iceberg tracks field IDs independently of column names, and RisingWave's materialized views adapt automatically when the upstream schema changes.

What Is Schema Evolution and Why Does It Matter?

In any production data pipeline, schemas change. Business requirements shift, new fields get added from upstream services, and legacy columns are retired. Traditional data formats — CSV, plain Parquet, even Hive tables — handle this poorly. Adding a column to a Hive table can silently corrupt older partitions; renaming a column breaks every downstream query.

Apache Iceberg solves this with field IDs. Each column carries a unique integer ID that persists across renames and reorders. When Iceberg reads a file written before a new column was added, it simply returns NULL for that column. When a column is renamed, existing files are unaffected — the ID mapping does the translation. This makes schema evolution safe by design.
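Concretely, an Iceberg table's metadata records an integer id for every field. A fragment of the schema JSON (structure per the Iceberg table spec; names and values here are illustrative) looks like:

```json
{
  "type": "struct",
  "schema-id": 1,
  "fields": [
    { "id": 1, "name": "event_id",   "required": false, "type": "long" },
    { "id": 2, "name": "event_kind", "required": false, "type": "string" }
  ]
}
```

If event_kind was renamed from an earlier event_type, it keeps id 2, so data files written under the old name still resolve to the right column.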

When RisingWave writes to an Iceberg sink, it emits column data matched by field ID, not by name. This means you can rename a column in your Iceberg table without rewriting any Parquet files and without restarting your RisingWave pipeline.

Supported Schema Evolution Operations

Operation                 | Iceberg Support | Impact on Existing Files
--------------------------|-----------------|----------------------------------
Add column                | Full            | Returns NULL for old files
Rename column             | Full            | No file changes needed
Reorder columns           | Full            | No file changes needed
Widen type (INT → LONG)   | Full            | Compatible type promotion
Drop column               | Full            | Old files retain the data
Narrow type (LONG → INT)  | Not supported   | Would lose precision
Change nullability        | Partial         | NULL → NOT NULL requires backfill
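In practice, these operations are issued as DDL through any Iceberg-aware engine. In Spark SQL, for example, they look roughly like this (a sketch; the table and column names are illustrative):

```sql
-- Add a column: old files will read as NULL for it
ALTER TABLE db.events ADD COLUMN referrer STRING;

-- Rename: metadata-only, the field ID is unchanged
ALTER TABLE db.events RENAME COLUMN page_url TO url;

-- Widen a type: INT → BIGINT is a safe promotion
ALTER TABLE db.events ALTER COLUMN view_count TYPE BIGINT;

-- Drop: metadata-only, existing files keep the bytes
ALTER TABLE db.events DROP COLUMN referrer;
```

None of these rewrites a single Parquet file; they produce a new metadata snapshot.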

Setting Up a Streaming Pipeline

Let's build a pipeline that demonstrates schema evolution in practice. We start with a user events topic in Kafka:

CREATE SOURCE user_events (
    event_id    BIGINT,
    user_id     BIGINT,
    event_type  VARCHAR,
    page_url    VARCHAR,
    occurred_at TIMESTAMPTZ
)
WITH (
    connector        = 'kafka',
    topic            = 'user_events',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'earliest'
)
FORMAT PLAIN ENCODE JSON;

CREATE MATERIALIZED VIEW user_event_summary AS
SELECT
    window_start,
    window_end,
    event_type,
    COUNT(DISTINCT user_id) AS unique_users,
    COUNT(*)                AS event_count
FROM TUMBLE(user_events, occurred_at, INTERVAL '5 MINUTES')
GROUP BY window_start, window_end, event_type;

CREATE SINK user_events_sink AS
SELECT * FROM user_event_summary
WITH (
    connector      = 'iceberg',
    type           = 'upsert',
    -- upsert sinks need a primary key to identify rows
    primary_key    = 'window_start,window_end,event_type',
    catalog.type   = 'rest',
    catalog.uri    = 'http://iceberg-catalog:8181',
    warehouse.path = 's3://analytics-bucket/warehouse',
    s3.region      = 'us-east-1',
    database.name  = 'web_analytics',
    table.name     = 'user_event_summary'
);
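Once the sink is created, a quick sanity check from a RisingWave session confirms rows are flowing into the materialized view that feeds it:

```sql
-- Inspect the most recent windows feeding the Iceberg sink
SELECT window_start, event_type, unique_users, event_count
FROM user_event_summary
ORDER BY window_start DESC
LIMIT 10;
```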

The sink is live. Now imagine the business asks for a new metric: average session depth (number of pages viewed per user per window).

Evolving the Schema

Step 1: Add the new aggregation to the materialized view. In RisingWave, you drop and recreate the materialized view (and its dependent sink):

-- Drop the sink first (it depends on the MV)
DROP SINK user_events_sink;
DROP MATERIALIZED VIEW user_event_summary;

-- Recreate with new column
CREATE MATERIALIZED VIEW user_event_summary AS
SELECT
    window_start,
    window_end,
    event_type,
    COUNT(DISTINCT user_id)   AS unique_users,
    COUNT(*)                  AS event_count,
    -- cast to DECIMAL to avoid integer division truncating the average
    COUNT(*)::DECIMAL / NULLIF(COUNT(DISTINCT user_id), 0) AS avg_events_per_user
FROM TUMBLE(user_events, occurred_at, INTERVAL '5 MINUTES')
GROUP BY window_start, window_end, event_type;

Step 2: Recreate the sink. RisingWave's iceberg connector detects that the sink query now produces a column the target Iceberg table lacks and performs an ALTER TABLE ADD COLUMN on the Iceberg table automatically before resuming writes:

CREATE SINK user_events_sink AS
SELECT * FROM user_event_summary
WITH (
    connector      = 'iceberg',
    type           = 'upsert',
    -- upsert sinks need a primary key to identify rows
    primary_key    = 'window_start,window_end,event_type',
    catalog.type   = 'rest',
    catalog.uri    = 'http://iceberg-catalog:8181',
    warehouse.path = 's3://analytics-bucket/warehouse',
    s3.region      = 'us-east-1',
    database.name  = 'web_analytics',
    table.name     = 'user_event_summary'
);

Existing Parquet files on S3 are not touched. Historical rows simply return NULL for avg_events_per_user when queried.
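You can see this directly when querying the table from any Iceberg-aware engine: pre-evolution windows carry NULL, which you can paper over at read time (an illustrative query):

```sql
SELECT
    window_start,
    event_type,
    -- rows written before the schema change have no value here
    COALESCE(avg_events_per_user, 0) AS avg_events_per_user
FROM web_analytics.user_event_summary
ORDER BY window_start;
```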

Schema Evolution with CDC Sources

PostgreSQL CDC pipelines require special consideration. When you add a column to the upstream PostgreSQL table, the change stream starts emitting the new field automatically, but RisingWave only ingests the columns declared in its CDC table definition, so you update that definition to pick up the new column:

-- Source reflects the current upstream schema
-- Table definition reflects the current upstream schema
CREATE TABLE orders_cdc (
    order_id    BIGINT PRIMARY KEY,
    customer_id BIGINT,
    amount      NUMERIC(12,2),
    currency    VARCHAR,
    -- New column added to upstream Postgres:
    discount_pct NUMERIC(5,2)
)
WITH (
    connector     = 'postgres-cdc',
    hostname      = 'pg.internal',
    port          = '5432',
    username      = 'replication_user',
    password      = 'secret',
    database.name = 'ecommerce',
    schema.name   = 'public',
    table.name    = 'orders'
);

When the Postgres table gains discount_pct, you update the table definition above to include it, and the new column propagates through to the downstream Iceberg sink, again without rewriting existing data.
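For reference, the upstream change that kicks all of this off is an ordinary Postgres migration:

```sql
-- Run against the upstream ecommerce database
ALTER TABLE orders ADD COLUMN discount_pct NUMERIC(5,2);
```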

Best Practices for Schema Evolution

Use explicit column lists in sinks: Wildcard SELECT * in your sink query is convenient but can cause unexpected column additions. List columns explicitly for production pipelines to maintain control over what lands in Iceberg.
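As a sketch of that practice, the sink from earlier can pin its column list so a new MV column never reaches Iceberg until you opt in:

```sql
CREATE SINK user_events_sink AS
SELECT
    window_start,
    window_end,
    event_type,
    unique_users,
    event_count        -- new MV columns stay out of Iceberg until listed here
FROM user_event_summary
WITH (
    connector      = 'iceberg',
    type           = 'upsert',
    primary_key    = 'window_start,window_end,event_type',
    catalog.type   = 'rest',
    catalog.uri    = 'http://iceberg-catalog:8181',
    warehouse.path = 's3://analytics-bucket/warehouse',
    s3.region      = 'us-east-1',
    database.name  = 'web_analytics',
    table.name     = 'user_event_summary'
);
```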

Version your schemas: Keep your CREATE MATERIALIZED VIEW and CREATE SINK statements in version control. Treat schema changes as migrations with rollback plans.

Test in staging first: Because Iceberg schema evolution is metadata-only, it's fast. Still, test in a staging environment to catch type promotion issues (e.g., DECIMAL precision changes) before they hit production.

Monitor field ID drift: Use the Iceberg catalog API or REST endpoints to audit field IDs periodically. This prevents accidental column reuse after drops.

FAQ

Q: Can I rename a column in the Iceberg table without changing my RisingWave sink? A: Yes. Iceberg uses field IDs internally, so renaming a column in the table metadata does not break existing writers. RisingWave matches on field ID, not name. However, you must ensure your sink's SELECT list still refers to the correct column in the materialized view.

Q: What happens if I add a NOT NULL column to an existing Iceberg table? A: Iceberg allows adding a NOT NULL column with a default value. Existing data files will return the default value for that column. RisingWave will write the actual value for new records going forward.

Q: Does RisingWave support Iceberg's type widening (e.g., INT to BIGINT)? A: RisingWave respects the Iceberg table's existing schema. If you widen a column in the Iceberg table metadata, RisingWave will cast values appropriately when writing, as long as the cast is safe.

Q: Can I drop a column from the Iceberg table that RisingWave is still writing? A: You should update your sink first to stop writing the column before dropping it from Iceberg. Writing a column that no longer exists in the table schema will cause write errors.

Q: How do I handle breaking schema changes (type narrowing, column reuse)? A: These require a full migration: create a new Iceberg table, repoint the sink, backfill historical data with a one-time query, and then decommission the old table.
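A minimal version of that migration might look like the following (a sketch; the _v2 table name and the backfill step run in an external Iceberg-aware engine are illustrative):

```sql
-- 1. Stop writing to the old table
DROP SINK user_events_sink;

-- 2. Point a new sink at the replacement table
--    (pre-create the table with the corrected types via your query engine)
CREATE SINK user_events_sink_v2 AS
SELECT * FROM user_event_summary
WITH (
    connector      = 'iceberg',
    type           = 'upsert',
    primary_key    = 'window_start,window_end,event_type',
    catalog.type   = 'rest',
    catalog.uri    = 'http://iceberg-catalog:8181',
    warehouse.path = 's3://analytics-bucket/warehouse',
    s3.region      = 'us-east-1',
    database.name  = 'web_analytics',
    table.name     = 'user_event_summary_v2'
);

-- 3. One-time backfill of historical rows, in an Iceberg-aware engine:
-- INSERT INTO web_analytics.user_event_summary_v2
-- SELECT ... FROM web_analytics.user_event_summary;
```

Once the backfill completes and consumers have repointed, drop the old table.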

Get Started

Ready to build schema-resilient streaming pipelines with RisingWave and Iceberg?
