Handling Debezium Schema Evolution in Real-Time Pipelines

Schema evolution — adding, removing, or modifying database columns — is one of the most disruptive events in a CDC pipeline. Debezium provides built-in mechanisms to handle schema changes gracefully, and RisingWave's streaming SQL gives you tools to adapt materialized views without pipeline downtime.

Why Schema Changes Break Pipelines

In a CDC pipeline, the database schema is the contract between producer (Debezium) and consumer (RisingWave or other downstream systems). When a column is added, a consumer that expects a fixed schema may fail to parse events. When a column is dropped, downstream SQL referencing that column may error. When a column is renamed, consumers see it as a drop + add, which can cause data loss.

The challenge is operational: schema changes in production databases are routine. Migrations happen. Fields get renamed. Tables get new columns. A robust CDC pipeline must handle these changes without manual intervention or data gaps.
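The fragility described above can be sketched in a few lines of Python (illustrative only, not Debezium or RisingWave code): a consumer that indexes events by a fixed field list fails on a rename, while one that treats missing fields as null degrades gracefully.

```python
# Illustrative sketch: why a fixed-schema consumer breaks on column changes
# while a tolerant one survives. Field names follow this article's example.

REQUIRED = ["id", "customer_id", "amount"]

def parse_strict(event: dict) -> dict:
    """Fails hard if the event does not match the expected schema."""
    return {field: event[field] for field in REQUIRED}  # KeyError on drop/rename

def parse_tolerant(event: dict) -> dict:
    """Treats missing fields as null instead of failing."""
    return {field: event.get(field) for field in REQUIRED}

old_event = {"id": 1, "customer_id": 7, "amount": 9.5}
# After renaming customer_id to buyer_id, consumers see a drop plus an add:
renamed_event = {"id": 2, "buyer_id": 7, "amount": 3.0}

parse_strict(old_event)        # works
parse_tolerant(renamed_event)  # {"id": 2, "customer_id": None, "amount": 3.0}
```

The tolerant parser is the consumer-side analogue of the COALESCE pattern used later in this tutorial.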

How Debezium Tracks Schema Changes

Debezium maintains an internal schema history — a log of every DDL statement it has observed, stored in a dedicated Kafka topic. For each change event, Debezium records the schema version that was active when that event was captured. Consumers can reconstruct the exact schema for any event by replaying the schema history.

For PostgreSQL, logical decoding does not emit DDL statements, so Debezium infers schema changes from the relation metadata that the pgoutput plugin includes in the replication stream, refreshing its in-memory schema when a table's structure changes. For MySQL, DDL statements appear directly in the binlog and are captured verbatim.
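A simplified model of this replay, using an assumed entry shape rather than Debezium's actual history-topic format, looks like:

```python
# Simplified model of schema-history replay (assumed entry shape, not the
# real Debezium history-topic format): each entry records one DDL effect,
# and replaying entries up to a position reconstructs the active schema.

def replay(history, upto):
    """Rebuild a table -> column-list map after the first `upto` entries."""
    schema = {}
    for entry in history[:upto]:
        cols = schema.setdefault(entry["table"], [])
        if entry["op"] == "ADD":
            cols.append(entry["column"])
        elif entry["op"] == "DROP":
            cols.remove(entry["column"])
    return schema

history = [
    {"table": "orders", "op": "ADD", "column": "id"},
    {"table": "orders", "op": "ADD", "column": "amount"},
    {"table": "orders", "op": "ADD", "column": "shipping_address"},
]

replay(history, 2)  # {'orders': ['id', 'amount']}  (schema before the migration)
replay(history, 3)  # {'orders': ['id', 'amount', 'shipping_address']}
```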

Schema History Configuration (MySQL)

{
  "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
  "schema.history.internal.kafka.topic": "schema-changes.dbserver1",
  "include.schema.changes": "true"
}

Schema Registry Integration

For Avro-encoded events, Debezium integrates with Confluent Schema Registry. The registry enforces compatibility rules (BACKWARD, FORWARD, FULL) when schemas evolve:

{
  "key.converter": "io.confluent.kafka.serializers.KafkaAvroSerializer",
  "value.converter": "io.confluent.kafka.serializers.KafkaAvroSerializer",
  "key.converter.schema.registry.url": "http://schema-registry:8081",
  "value.converter.schema.registry.url": "http://schema-registry:8081"
}

With BACKWARD compatibility, consumers using the new schema can still read events written with old schemas (e.g., a newly added optional column defaults to null when deserializing old events).
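As a toy illustration of the BACKWARD rule (real Avro resolution also checks types, aliases, and unions), a new reader schema is compatible if every field it adds relative to the old writer schema carries a default:

```python
# Toy model of BACKWARD compatibility, names-and-defaults only. Schemas are
# modeled as field name -> default value, with None meaning "no default".

def backward_compatible(old_fields: dict, new_fields: dict) -> bool:
    """New readers can decode old data if every added field has a default."""
    added = set(new_fields) - set(old_fields)
    return all(new_fields[f] is not None for f in added)

v1 = {"id": None, "amount": None}
v2 = {"id": None, "amount": None, "shipping_address": ""}    # default "" -> ok
v3 = {"id": None, "amount": None, "priority": None}          # no default -> rejected

backward_compatible(v1, v2)  # True
backward_compatible(v1, v3)  # False
```

This is the check a registry performs before letting a producer register v2 or v3 on a BACKWARD-compatible subject.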

Step-by-Step Tutorial

Step 1: Set Up Debezium with Schema History

Configure the connector to track schema changes:

{
  "name": "schema-aware-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres.internal",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "secret",
    "database.dbname": "production",
    "database.server.name": "pg1",
    "slot.name": "schema_evolution_slot",
    "plugin.name": "pgoutput",
    "table.include.list": "public.orders",
    "topic.prefix": "pg1",
    "heartbeat.interval.ms": "10000",
    "schema.name.adjustment.mode": "avro"
  }
}

Step 2: Connect RisingWave as the Downstream Consumer

RisingWave is a PostgreSQL-compatible streaming database. Because Debezium events encode inserts, updates, and deletes, RisingWave materializes them into a table with a primary key rather than an append-only source. Create the initial table:

CREATE TABLE orders_cdc (
    id INT PRIMARY KEY,
    customer_id INT,
    amount DECIMAL,
    status VARCHAR,
    created_at TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'pg1.public.orders',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'earliest'
) FORMAT DEBEZIUM ENCODE JSON;

Create a materialized view with the current schema:

CREATE MATERIALIZED VIEW order_summary AS
SELECT
    id,
    customer_id,
    amount,
    status,
    created_at
FROM orders_cdc;

Step 3: Perform a Schema Migration on the Source

Add a new column to the source database:

-- On the source PostgreSQL database
ALTER TABLE orders ADD COLUMN shipping_address TEXT;
ALTER TABLE orders ADD COLUMN priority_level INTEGER DEFAULT 0;

Debezium detects this DDL change, logs it to the schema history topic, and immediately starts including shipping_address and priority_level in subsequent CDC events. Older events already in Kafka do not have these fields — consumers must handle null values for events predating the migration.
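On the consumer side, handling those nulls can be sketched as a normalization step that fills migration-added fields with explicit defaults (the field names and defaults below follow this tutorial's example):

```python
# Sketch: give events captured before and after the ALTER TABLE the same
# shape by defaulting the migration-added fields. Defaults mirror the
# example migration (empty address, priority 0).

MIGRATION_DEFAULTS = {"shipping_address": "", "priority_level": 0}

def normalize(after: dict) -> dict:
    """Return the `after` payload with post-migration fields defaulted."""
    out = dict(after)
    for field, default in MIGRATION_DEFAULTS.items():
        if out.get(field) is None:  # field absent, or present as explicit null
            out[field] = default
    return out

pre_migration = {"id": 1, "amount": 10.0}  # captured before the ALTER TABLE
normalize(pre_migration)
# {'id': 1, 'amount': 10.0, 'shipping_address': '', 'priority_level': 0}
```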

Step 4: Update RisingWave Materialized Views

In RisingWave, update your materialized view to include the new columns (if orders_cdc was declared with an explicit column list, first run ALTER TABLE orders_cdc ADD COLUMN for each new column so the view can reference them):

-- Drop and recreate the materialized view with the new schema
-- (RisingWave will rebuild from the Kafka source)
DROP MATERIALIZED VIEW order_summary;

CREATE MATERIALIZED VIEW order_summary AS
SELECT
    id,
    customer_id,
    amount,
    status,
    created_at,
    shipping_address,
    priority_level
FROM orders_cdc;

For additive changes (new nullable columns), you can also use a view that handles both old (null) and new events:

CREATE MATERIALIZED VIEW order_summary_v2 AS
SELECT
    id,
    customer_id,
    amount,
    status,
    created_at,
    COALESCE(shipping_address, '') AS shipping_address,
    COALESCE(priority_level, 0) AS priority_level
FROM orders_cdc;

Step 5: Sink the Updated Results

CREATE SINK order_summary_sink
FROM order_summary_v2
WITH (
    connector = 'kafka',
    topic = 'order-summary-output',
    properties.bootstrap.server = 'kafka:9092',
    primary_key = 'id'
) FORMAT UPSERT ENCODE JSON;

Schema Change Impact by Operation Type

Change Type                        | Debezium Behavior                           | Downstream Impact                    | Recommended Handling
-----------------------------------|---------------------------------------------|--------------------------------------|--------------------------------------------
ADD COLUMN (nullable)              | New field appears in after                  | Low: old events lack the field       | Use COALESCE in the materialized view
ADD COLUMN (NOT NULL with default) | New field in after; absent in older events  | Low                                  | Use COALESCE with the default value
DROP COLUMN                        | Field disappears from after                 | Medium: downstream queries may break | Update the materialized view before the migration
RENAME COLUMN                      | Treated as drop + add                       | High: consumers see two changes      | Coordinate the rename with consumer updates
CHANGE DATA TYPE                   | Schema history logs the change              | High: type mismatch in consumers     | Use explicit CAST in the materialized view
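The table's classification can be sketched as a diff between two schema versions (a simplified model; real tooling would also consult the schema history). Note that at this level a rename is indistinguishable from a drop plus an add:

```python
# Sketch: classify schema changes by diffing two versions, modeled as
# column name -> type. Risk labels follow the table above.

def classify_changes(old: dict, new: dict):
    changes = []
    for col in sorted(set(old) - set(new)):
        changes.append(("DROP COLUMN", col, "medium risk"))
    for col in sorted(set(new) - set(old)):
        changes.append(("ADD COLUMN", col, "low risk"))
    for col in sorted(set(old) & set(new)):
        if old[col] != new[col]:
            changes.append(("CHANGE DATA TYPE", col, "high risk"))
    return changes

v1 = {"id": "int", "customer": "text", "amount": "numeric"}
v2 = {"id": "bigint", "buyer": "text", "amount": "numeric"}  # rename + retype

classify_changes(v1, v2)
# [('DROP COLUMN', 'customer', 'medium risk'),
#  ('ADD COLUMN', 'buyer', 'low risk'),
#  ('CHANGE DATA TYPE', 'id', 'high risk')]
```

The rename of customer to buyer surfaces as a drop plus an add, which is exactly why renames carry the highest coordination cost.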

FAQ

What is the schema.history.internal.kafka.topic used for? This topic stores a log of all DDL changes Debezium has observed. It is not a normal Kafka topic for consumer applications — it is an internal Debezium topic used to reconstruct the schema at any point in time. Do not delete this topic or change its retention settings.

How does Schema Registry help with schema evolution? When using Avro encoding, Schema Registry stores the schema for every version of a Kafka topic. Consumers automatically look up the schema ID embedded in each message and deserialize accordingly. Compatibility rules (BACKWARD, FORWARD, FULL) prevent incompatible schema changes from being registered.
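The lookup works because every Confluent-framed message starts with a fixed header: one magic byte (0), a 4-byte big-endian schema ID, then the Avro payload. A minimal parser:

```python
import struct

# Parse the Confluent Schema Registry wire format: magic byte 0x00, a 4-byte
# big-endian schema ID, then the serialized payload. Consumers use the ID to
# fetch the exact writer schema from the registry before deserializing.

def parse_wire_format(message: bytes):
    if len(message) < 5 or message[0] != 0:
        raise ValueError("not Confluent wire format")
    (schema_id,) = struct.unpack(">I", message[1:5])
    return schema_id, message[5:]

msg = b"\x00" + struct.pack(">I", 42) + b"avro-bytes"
parse_wire_format(msg)  # (42, b'avro-bytes')
```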

Can I rename a column without downtime? The safest approach is an expand-contract migration: (1) add the new column, (2) dual-write to both old and new columns, (3) update all consumers to use the new column, (4) drop the old column. This requires application-level coordination but ensures no events are lost.
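Phase 2 (dual-writing) can be sketched at the application layer; the column names customer_id and buyer_id below are illustrative, not from the tutorial's schema:

```python
# Sketch of expand-contract phase 2: the application writes the value to
# both the old column (customer_id) and the new one (buyer_id), so consumers
# reading either column keep working until the cutover in phase 3.

def dual_write(row: dict, value) -> dict:
    """Write the value to both the old and the new column."""
    row = dict(row)
    row["customer_id"] = value  # phase 4 deletes this line after cutover
    row["buyer_id"] = value
    return row

dual_write({"id": 1}, 77)
# {'id': 1, 'customer_id': 77, 'buyer_id': 77}
```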

Key Takeaways

  • Debezium stores schema history in a dedicated Kafka topic, allowing consumers to reconstruct the schema for any event
  • Additive changes (new nullable columns) are low risk — old events simply lack the new field
  • Destructive changes (drops, renames, type changes) require coordinating the migration with downstream consumer updates
  • RisingWave's materialized views can be dropped and recreated to pick up new schema fields
  • Using COALESCE in RisingWave views provides resilience for events predating an additive schema change
