Real-Time Data Sync with Debezium and Streaming SQL

Real-Time Data Sync with Debezium and Streaming SQL

Real-time data sync with Debezium and streaming SQL eliminates batch ETL latency by capturing database changes as they happen and continuously applying them to target systems. Instead of scheduled jobs running every hour, data arrives at destinations within milliseconds of the source commit.

Why Batch Sync Falls Short

Traditional batch synchronization jobs have a fundamental flaw: they run on a schedule, not on a signal. This means:

  • Stale data: Dashboards show information that's minutes or hours behind reality
  • Thundering herd: Large batch jobs create periodic spikes on both source and target systems
  • No delete propagation: Soft-delete workarounds are fragile and error-prone
  • No before-state: You lose the ability to reason about what changed, not just the new value

Log-based CDC via Debezium solves all four problems. By reading the database's own transaction log, Debezium captures inserts, updates, and deletes with their before and after state, at the moment they commit.

Architecture: Debezium + RisingWave for Sync

The sync architecture uses three layers:

  1. Debezium reads the source database's change log and publishes events to Kafka
  2. RisingWave consumes those events, applies streaming SQL transformations, and maintains materialized state
  3. RisingWave sinks write the transformed results to target systems continuously

This allows you to sync a subset of columns, apply business logic transformations, and join data from multiple sources — all in streaming SQL, without writing application code.

Step-by-Step Tutorial

Step 1: Configure the Source Database

For this example, we'll sync a PostgreSQL e-commerce database. Enable logical replication:

-- Grant replication privileges
CREATE ROLE sync_user REPLICATION LOGIN PASSWORD 'sync_pass';
GRANT SELECT ON TABLE public.users, public.orders, public.products TO sync_user;

-- Create a publication covering all sync tables
CREATE PUBLICATION sync_pub FOR TABLE 
    public.users, 
    public.orders, 
    public.products;

Register the Debezium connector:

{
  "name": "sync-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "source-postgres",
    "database.port": "5432",
    "database.user": "sync_user",
    "database.password": "sync_pass",
    "database.dbname": "ecommerce",
    "database.server.name": "src",
    "slot.name": "sync_slot",
    "plugin.name": "pgoutput",
    "publication.name": "sync_pub",
    "table.include.list": "public.users,public.orders,public.products",
    "topic.prefix": "src"
  }
}

Step 2: Create Sources in RisingWave

RisingWave is a PostgreSQL-compatible streaming database. Connect it to each Debezium Kafka topic:

CREATE SOURCE users_cdc
WITH (
    connector = 'kafka',
    topic = 'src.public.users',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'earliest'
) FORMAT DEBEZIUM ENCODE JSON;

CREATE SOURCE orders_cdc
WITH (
    connector = 'kafka',
    topic = 'src.public.orders',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'earliest'
) FORMAT DEBEZIUM ENCODE JSON;

CREATE SOURCE products_cdc
WITH (
    connector = 'kafka',
    topic = 'src.public.products',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'earliest'
) FORMAT DEBEZIUM ENCODE JSON;

Step 3: Build Transformation Materialized Views

Apply business logic as you sync. For example, enrich orders with user and product details:

CREATE MATERIALIZED VIEW enriched_orders AS
SELECT
    o.id AS order_id,
    o.status,
    o.created_at,
    u.email AS customer_email,
    u.country AS customer_country,
    p.name AS product_name,
    p.category AS product_category,
    o.quantity * o.unit_price AS order_value
FROM orders_cdc o
JOIN users_cdc u ON o.customer_id = u.id
JOIN products_cdc p ON o.product_id = p.id;

This materialized view updates automatically whenever any of the three source tables change.

Step 4: Sync to Target Systems

Write the transformed data to target destinations. RisingWave supports sinks to PostgreSQL, Kafka, and more:

-- Sync to a target PostgreSQL database (e.g., analytics DB)
CREATE SINK enriched_orders_pg_sink
FROM enriched_orders
WITH (
    connector = 'jdbc',
    jdbc.url = 'jdbc:postgresql://analytics-db:5432/warehouse',
    table.name = 'enriched_orders',
    type = 'upsert',
    primary_key = 'order_id'
);

-- Also publish to Kafka for other consumers
CREATE SINK enriched_orders_kafka_sink
FROM enriched_orders
WITH (
    connector = 'kafka',
    topic = 'enriched-orders',
    properties.bootstrap.server = 'kafka:9092'
) FORMAT UPSERT ENCODE JSON
KEY ENCODE JSON (order_id);

Handling Deletes in Sync Pipelines

A key advantage of log-based CDC is that deletes are captured. In the Debezium event, op=d with a populated before field signals a deletion. RisingWave's FORMAT DEBEZIUM ENCODE JSON source handles deletes automatically — when a row is deleted in the source, it is removed from the materialized state.

For UPSERT sinks (like the JDBC sink), RisingWave emits a null value for the deleted key, which the target interprets as a delete operation. This means soft-delete workarounds (is_deleted flags) are no longer needed.

Comparison Table

Sync MethodLatencyCaptures DeletesSource ImpactComplexity
Batch ETLMinutes–hoursNo (soft deletes only)High (periodic bulk reads)Low
Trigger-based CDCLowYesMedium (trigger overhead)Medium
Log-based CDC (Debezium)MillisecondsYesMinimalMedium
Query-based CDC (timestamp)MinutesNoMediumLow

FAQ

How do I sync only specific columns? Use column.exclude.list in the Debezium connector config to drop columns before they reach Kafka. In RisingWave, you can further filter with your materialized view's SELECT list — only project the columns you need in the sink.

What if the target schema differs from the source? RisingWave's materialized view SQL is the transformation layer. You can rename columns, cast types, compute derived fields, and join data from multiple sources before writing to the target. The source schema and target schema are fully decoupled.

How do I sync the initial snapshot plus ongoing changes? Debezium's snapshot.mode = 'initial' (the default) performs a consistent snapshot of the current table state as op=r events, then seamlessly transitions to streaming mode. RisingWave ingests these events in order, so by the time streaming begins, the materialized view already reflects the full table state.

Key Takeaways

  • Log-based CDC with Debezium enables millisecond-latency sync, capturing inserts, updates, and deletes
  • RisingWave materializes Debezium events using FORMAT DEBEZIUM ENCODE JSON and keeps views continuously updated
  • Streaming SQL in materialized views replaces custom ETL code with declarative transformations
  • Deletes propagate automatically through RisingWave sinks — no soft-delete flags needed
  • The pipeline handles initial snapshots and ongoing changes in a single, unified flow

Best-in-Class Event Streaming
for Agents, Apps, and Analytics
GitHubXLinkedInSlackYouTube
Sign up for our to stay updated.