How to Migrate from Debezium to RisingWave CDC: Step-by-Step Guide
Migrating from Debezium → Kafka → analytics consumer to RisingWave native CDC eliminates Kafka, Kafka Connect, and a separate stream processor. This guide walks through every step: auditing your pipeline, validating the migration, cutting over, and decommissioning old components. Do not start this migration if you have multiple Kafka consumers — Debezium's fan-out is irreplaceable in that case.
Before You Start: The Migration Decision
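This migration makes sense when CDC data has exactly one consumer: your analytics layer. If other systems read from the same Kafka topics, stop here. RisingWave does not automatically republish raw change events to Kafka topics the way Debezium does; anything a downstream system needs must be defined explicitly as a sink (see Step 2). The general fan-out use case requires keeping Debezium.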
Run this audit before proceeding:
# List all consumer groups for your CDC topics
kafka-consumer-groups.sh \
--bootstrap-server kafka.internal:9092 \
--describe \
--all-groups \
| grep "ecommerce\."
If more than one active consumer group appears, that means multiple systems are reading the same CDC topic. Evaluate each one before decommissioning.
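The audit output can also be summarized programmatically. The following is a minimal sketch, assuming the whitespace-separated column layout that `kafka-consumer-groups.sh --describe` prints (GROUP first, TOPIC second) and group names without spaces. The function names and the sample group names are hypothetical.

```python
from collections import defaultdict

def groups_by_topic(describe_output: str, topic_prefix: str = "ecommerce.") -> dict[str, set[str]]:
    """Map each CDC topic to the set of consumer groups that read it."""
    readers: dict[str, set[str]] = defaultdict(set)
    for line in describe_output.splitlines():
        fields = line.split()
        # Skip blank lines, the header row, and lines too short to hold GROUP and TOPIC.
        if len(fields) < 2 or fields[0] == "GROUP":
            continue
        group, topic = fields[0], fields[1]
        if topic.startswith(topic_prefix):
            readers[topic].add(group)
    return dict(readers)

def shared_topics(readers: dict[str, set[str]]) -> dict[str, set[str]]:
    """Keep only the topics read by more than one consumer group."""
    return {topic: groups for topic, groups in readers.items() if len(groups) > 1}

# Hypothetical sample of the CLI output, for illustration only.
SAMPLE = """\
GROUP            TOPIC                       PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
flink-analytics  ecommerce.public.orders     0          120             125             5
search-indexer   ecommerce.public.orders     0          125             125             0
flink-analytics  ecommerce.public.customers  0          40              40              0
"""

if __name__ == "__main__":
    for topic, groups in shared_topics(groups_by_topic(SAMPLE)).items():
        print(f"{topic} has multiple readers: {sorted(groups)}")
```

Any topic reported by `shared_topics` has a second reader that must be accounted for before Kafka is removed.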
Step 1: Audit What You're Doing with CDC Events
Document every transformation your stream processor applies to CDC events. Be exhaustive.
For each transformation, note:
- Source table(s)
- Transformation type: filter, aggregation, join, enrichment, window function
- Output: where does the result go?
Example audit output:
| Transformation | Source Tables | Type | Output |
| --- | --- | --- | --- |
| Revenue by day | orders | Aggregation | Dashboard |
| Active orders | orders | Filter | Dashboard |
| Customer lifetime value | orders + customers | Join + aggregation | Dashboard |
| Order line totals | orders + order_items | Join | API endpoint |
| Fraud score | orders | UDF-based | Alert system |
Map each transformation to a RisingWave materialized view before writing a line of SQL. The fraud score UDF might require additional work if it calls an external service — flag those early.
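One way to keep the audit actionable is to hold it as data rather than as a document. The sketch below encodes the example audit rows and flags entries that need extra work. The `Transformation` class, the `view_name` naming convention, and the rule that any UDF-based row needs review are assumptions made for illustration.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Transformation:
    name: str
    source_tables: tuple[str, ...]
    kind: str
    output: str

# Rows copied from the example audit table.
AUDIT = [
    Transformation("Revenue by day", ("orders",), "Aggregation", "Dashboard"),
    Transformation("Active orders", ("orders",), "Filter", "Dashboard"),
    Transformation("Customer lifetime value", ("orders", "customers"), "Join + aggregation", "Dashboard"),
    Transformation("Order line totals", ("orders", "order_items"), "Join", "API endpoint"),
    Transformation("Fraud score", ("orders",), "UDF-based", "Alert system"),
]

def view_name(t: Transformation) -> str:
    """Derive a candidate materialized view name (hypothetical convention)."""
    return t.name.lower().replace(" ", "_")

def needs_review(t: Transformation) -> bool:
    """Flag UDF-based transformations, which may call external services."""
    return "udf" in t.kind.lower()

if __name__ == "__main__":
    for t in AUDIT:
        marker = "REVIEW" if needs_review(t) else "ok"
        print(f"{view_name(t):28} {marker}")
```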
Step 2: Check for Fan-Out Requirements
With your audit complete, determine whether any transformation output goes to a system other than your analytics layer.
Safe to migrate: aggregations, joins, and filters that feed dashboards, BI tools, or internal SQL APIs.
Requires additional planning: transformations that produce events consumed by notification systems, Elasticsearch indexers, or other Kafka topics. These may need to be handled via RisingWave sinks instead:
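-- Example: write RisingWave results to a Kafka topic for downstream consumers.
-- high_risk_orders stands in for a materialized view you define yourself.
-- FORMAT PLAIN expects an append-only stream; a view that emits updates or deletes
-- needs force_append_only = 'true' in the WITH clause, or an upsert format instead.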
CREATE SINK fraud_alerts_sink
FROM high_risk_orders
WITH (
connector = 'kafka',
topic = 'fraud-alerts',
properties.bootstrap.server = 'kafka.internal:9092'
)
FORMAT PLAIN ENCODE JSON;
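The split between "safe to migrate" and "requires additional planning" can be applied mechanically to the Output column of the audit. This is a rough sketch; the set of destinations treated as part of the analytics layer is an assumption and should be adjusted to your environment.

```python
# Destinations treated as part of the analytics layer (assumption for illustration).
ANALYTICS_OUTPUTS = {"dashboard", "bi tool", "api endpoint"}

def split_by_destination(outputs: dict[str, str]) -> tuple[list[str], list[str]]:
    """Return (safe_to_migrate, needs_sink) given a transformation -> output mapping."""
    safe, needs_sink = [], []
    for name, destination in outputs.items():
        if destination.lower() in ANALYTICS_OUTPUTS:
            safe.append(name)
        else:
            needs_sink.append(name)
    return safe, needs_sink

if __name__ == "__main__":
    safe, needs_sink = split_by_destination({
        "Revenue by day": "Dashboard",
        "Order line totals": "API endpoint",
        "Fraud score": "Alert system",
    })
    print("safe:", safe)
    print("needs a sink or other plan:", needs_sink)
```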
Step 3: Set Up RisingWave CDC Source
Install RisingWave (Docker, Kubernetes, or RisingWave Cloud) alongside the existing stack. Do not decommission anything yet.
Prepare the PostgreSQL source:
-- On PostgreSQL: verify logical replication is enabled
SHOW wal_level; -- must return 'logical'
-- Create a dedicated replication user
CREATE USER risingwave_replicator WITH REPLICATION LOGIN PASSWORD 'secure_password';
GRANT SELECT ON ALL TABLES IN SCHEMA public TO risingwave_replicator;
-- Verify the user can create replication slots
-- (run the next two statements while connected as risingwave_replicator)
SELECT pg_create_logical_replication_slot('test_slot', 'pgoutput');
SELECT pg_drop_replication_slot('test_slot');
Create the CDC source in RisingWave:
CREATE SOURCE pg_ecommerce WITH (
connector = 'postgres-cdc',
hostname = 'postgres.internal',
port = '5432',
username = 'risingwave_replicator',
password = 'secure_password',
database.name = 'ecommerce',
slot.name = 'risingwave_main_slot',
publication.name = 'risingwave_publication'
);
Declare tables from the CDC source:
CREATE TABLE orders (
id BIGINT PRIMARY KEY,
customer_id BIGINT,
status VARCHAR,
total NUMERIC,
created_at TIMESTAMPTZ,
updated_at TIMESTAMPTZ
) FROM pg_ecommerce TABLE 'public.orders';
CREATE TABLE customers (
id BIGINT PRIMARY KEY,
name VARCHAR,
email VARCHAR,
tier VARCHAR,
created_at TIMESTAMPTZ
) FROM pg_ecommerce TABLE 'public.customers';
CREATE TABLE order_items (
id BIGINT PRIMARY KEY,
order_id BIGINT,
product_id BIGINT,
quantity INT,
unit_price NUMERIC
) FROM pg_ecommerce TABLE 'public.order_items';
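RisingWave will begin the initial snapshot immediately. Monitor progress (catalog and column names for backfill progress differ between RisingWave versions, so confirm the query below against the system catalog reference for your version):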
SELECT table_name, snapshot_state, snapshot_row_count
FROM rw_source_backfill_info;
Step 4: Recreate Transformations as Materialized Views
Translate each item in your audit table to a RisingWave materialized view.
Revenue by day:
CREATE MATERIALIZED VIEW revenue_by_day AS
SELECT
DATE_TRUNC('day', created_at) AS day,
COUNT(*) AS order_count,
SUM(total) AS revenue
FROM orders
WHERE status = 'completed'
GROUP BY DATE_TRUNC('day', created_at);
Active orders (filter only):
CREATE MATERIALIZED VIEW active_orders AS
SELECT id, customer_id, status, total, created_at
FROM orders
WHERE status IN ('pending', 'processing', 'shipped');
Customer lifetime value (multi-table join):
CREATE MATERIALIZED VIEW customer_lifetime_value AS
SELECT
c.id AS customer_id,
c.name,
c.email,
c.tier,
COUNT(o.id) AS order_count,
COALESCE(SUM(o.total), 0) AS lifetime_value,
MAX(o.created_at) AS last_order_at
FROM customers c
LEFT JOIN orders o ON c.id = o.customer_id AND o.status = 'completed'
GROUP BY c.id, c.name, c.email, c.tier;
Order line totals (join with calculation):
CREATE MATERIALIZED VIEW order_totals AS
SELECT
o.id AS order_id,
o.customer_id,
o.status,
SUM(oi.quantity * oi.unit_price) AS calculated_total,
COUNT(oi.id) AS line_item_count
FROM orders o
JOIN order_items oi ON o.id = oi.order_id
GROUP BY o.id, o.customer_id, o.status;
Step 5: Validate in Parallel
Run both systems simultaneously for at least 48 hours. Compare outputs regularly.
Validation query pattern:
-- In RisingWave: get today's revenue
SELECT revenue FROM revenue_by_day
WHERE day = DATE_TRUNC('day', NOW());
-- In your existing system: same query
-- Compare the results. Acceptable delta depends on latency difference.
Automate this with a validation script that queries both systems every 5 minutes and alerts on divergence greater than a defined threshold.
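A validation script along these lines might look like the following sketch. The two `fetch_*` callables are placeholders for whatever client code runs the revenue query against each system, and `alert` is a placeholder for your paging or chat integration. The 1% default threshold is an assumption, not a recommendation.

```python
import time
from typing import Callable, Optional

def relative_divergence(expected: float, actual: float) -> float:
    """Relative difference between two values, using the larger magnitude as the base."""
    if expected == actual:
        return 0.0
    return abs(expected - actual) / max(abs(expected), abs(actual))

def check_once(
    fetch_legacy: Callable[[], float],
    fetch_risingwave: Callable[[], float],
    threshold: float = 0.01,
) -> Optional[str]:
    """Compare one metric across both systems; return an alert message on divergence."""
    legacy = fetch_legacy()
    candidate = fetch_risingwave()
    drift = relative_divergence(legacy, candidate)
    if drift > threshold:
        return (f"divergence {drift:.2%} exceeds {threshold:.2%}: "
                f"legacy={legacy}, risingwave={candidate}")
    return None

def run_validation(
    fetch_legacy: Callable[[], float],
    fetch_risingwave: Callable[[], float],
    alert: Callable[[str], None],
    threshold: float = 0.01,
    interval_seconds: float = 300,       # every 5 minutes
    max_checks: Optional[int] = None,    # None means run until interrupted
) -> None:
    """Poll both systems on a fixed interval and alert whenever they diverge."""
    checks = 0
    while max_checks is None or checks < max_checks:
        message = check_once(fetch_legacy, fetch_risingwave, threshold)
        if message is not None:
            alert(message)
        checks += 1
        if max_checks is None or checks < max_checks:
            time.sleep(interval_seconds)
```

Keeping the comparison (`check_once`) separate from the polling loop makes the threshold logic easy to unit test without touching either database.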
Common causes of validation divergence:
- Clock skew between source database and stream processor
- Late-arriving events that one system has already applied and the other has not yet
- Different handling of NULL values in aggregations
- Timezone handling differences between Flink/Spark and RisingWave
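The timezone cause in particular is easy to reproduce. The sketch below shows the same order timestamp landing in different daily buckets depending on which zone the day boundary is evaluated in. The fixed UTC-8 offset and the sample timestamp are illustrative assumptions.

```python
from datetime import datetime, timedelta, timezone

UTC = timezone.utc
UTC_MINUS_8 = timezone(timedelta(hours=-8))  # fixed offset, chosen for illustration

def day_bucket(instant: datetime, tz: timezone) -> str:
    """Return the calendar day an instant falls on when viewed in the given zone."""
    return instant.astimezone(tz).date().isoformat()

if __name__ == "__main__":
    order_created_at = datetime(2024, 3, 2, 3, 30, tzinfo=UTC)
    # The same order counts toward different days in the two systems.
    print(day_bucket(order_created_at, UTC))          # 2024-03-02
    print(day_bucket(order_created_at, UTC_MINUS_8))  # 2024-03-01
```

If daily revenue differs between the two systems by roughly the volume of orders near midnight, comparing the session or job timezone on each side is a reasonable first check.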
Step 6: Cutover and Decommission
Once validation passes for 48 hours with no unexplained divergence:
- Update BI tool and dashboard connection strings to RisingWave (host: risingwave.internal, port: 4566, database: dev, user: your RisingWave user).
- Update application connection strings for any services querying the analytics layer.
- Run both systems in parallel for one more business day after the cutover to catch any edge cases.
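Because RisingWave speaks the PostgreSQL wire protocol, most tools accept a standard PostgreSQL connection URI. A small helper like the one below can generate it consistently across services. The function name is hypothetical, and the defaults simply mirror the host, port, and database values listed in this step.

```python
from typing import Optional
from urllib.parse import quote

def risingwave_dsn(
    user: str,
    password: Optional[str] = None,
    host: str = "risingwave.internal",
    port: int = 4566,
    database: str = "dev",
) -> str:
    """Build a PostgreSQL-style connection URI, percent-encoding the credentials."""
    auth = quote(user, safe="")
    if password:
        auth += ":" + quote(password, safe="")
    return f"postgresql://{auth}@{host}:{port}/{database}"

if __name__ == "__main__":
    print(risingwave_dsn("analytics_reader"))
```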
Decommission components in order:
- Stop the Flink or Spark streaming job
- Delete the Debezium connector config
- Stop Kafka Connect workers
- Decommission Kafka brokers (if no other consumers)
- Remove the Debezium replication slot from PostgreSQL:
SELECT pg_drop_replication_slot('debezium_slot');
Gotchas and Common Issues
Replication slot accumulation. PostgreSQL accumulates WAL until all replication slots have consumed it. During migration, you have two slots active (Debezium's and RisingWave's). Monitor disk usage:
SELECT slot_name, pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS lag
FROM pg_replication_slots;
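If you collect slot positions from a script rather than from SQL, the same lag can be computed client-side. A PostgreSQL LSN is printed as two hexadecimal numbers separated by a slash, representing the high and low 32 bits of a 64-bit byte position. The helper names and the idea of a per-slot byte budget below are illustrative assumptions.

```python
def lsn_to_int(lsn: str) -> int:
    """Convert a textual LSN such as '16/B374D848' into a 64-bit byte position."""
    high, low = lsn.split("/")
    return (int(high, 16) << 32) | int(low, 16)

def wal_lag_bytes(current_lsn: str, restart_lsn: str) -> int:
    """Bytes of WAL between a slot's restart position and the current write position."""
    return lsn_to_int(current_lsn) - lsn_to_int(restart_lsn)

def slots_over_budget(slots: dict[str, str], current_lsn: str, max_lag_bytes: int) -> list[str]:
    """Return the names of slots retaining more WAL than the given budget."""
    return sorted(
        name for name, restart_lsn in slots.items()
        if wal_lag_bytes(current_lsn, restart_lsn) > max_lag_bytes
    )

if __name__ == "__main__":
    slots = {"debezium_slot": "0/10", "risingwave_main_slot": "1/0"}
    # With a 1 GiB budget, only the slot that is far behind is reported.
    print(slots_over_budget(slots, current_lsn="1/10", max_lag_bytes=1 << 30))
```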
Initial snapshot duration. For large tables (hundreds of millions of rows), the initial snapshot can take hours. RisingWave buffers change events during the snapshot and applies them after — no data is lost — but queries against the table will return partial results until the snapshot completes.
Schema changes during migration. Avoid DDL changes (ALTER TABLE) on source tables during the validation window. Both systems need to agree on schema before you can reliably compare outputs.
Monitoring after cutover:
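-- Confirm the CDC source is still registered. rw_sources is catalog metadata,
-- not a health check; replication lag is visible on the PostgreSQL side
-- through pg_replication_slots (see the slot query above).
SELECT name, connector
FROM rw_sources;
-- Confirm every expected materialized view exists
SELECT name, definition
FROM rw_materialized_views;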
FAQ
Q: How long does the migration take? The technical steps — setting up RisingWave, creating tables, and writing materialized views — typically take one to two days for a well-documented pipeline. The validation window (running both systems in parallel) adds another two to five days. Budget two weeks end-to-end including buffer for unexpected issues.
Q: Do I need to pause writes to the source database during cutover? No. RisingWave maintains a replication slot that tracks exactly where it is in the WAL. You can cut over BI tools and application queries to RisingWave while writes continue to the source database without any pause.
Q: What if my Flink jobs use stateful operations like session windows? RisingWave supports tumbling and hopping windows natively. Session windows require a different approach — typically a time-bounded self-join or a state table pattern. Review your Flink job's window types before assuming a direct translation.
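For reference when reviewing window types, the semantics a session window has to reproduce are simple to state: consecutive events belong to the same session while the gap between them stays within a limit. The sketch below expresses that rule in plain Python so it can serve as a test oracle for whatever SQL pattern you end up with. Treating an event exactly at the gap boundary as part of the same session is a choice made here, so check your existing job's behavior at that boundary.

```python
def sessionize(timestamps: list[float], gap: float) -> list[list[float]]:
    """Group event timestamps into sessions separated by more than `gap` of inactivity."""
    sessions: list[list[float]] = []
    for ts in sorted(timestamps):
        # Extend the current session if this event is close enough to the previous one.
        if sessions and ts - sessions[-1][-1] <= gap:
            sessions[-1].append(ts)
        else:
            sessions.append([ts])
    return sessions

if __name__ == "__main__":
    # Two bursts of activity separated by a long pause.
    print(sessionize([0, 10, 25, 100, 110], gap=30))
```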
Q: Can I roll back if something goes wrong after cutover? Yes, as long as you have not yet decommissioned the old stack. Keep the Debezium connector, Kafka topics, and stream processor running until you are confident the migration is complete. If RisingWave has issues post-cutover, point connection strings back to the old analytics layer. If you paused the Debezium connector in the meantime, its replication slot will have accumulated WAL, so expect a catch-up period after resuming, but no data is lost as long as the slot still exists.
Q: Does RisingWave support all PostgreSQL data types from Debezium? Not all of them natively. RisingWave handles the common scalar types, arrays, and JSONB, but some PostgreSQL types have no direct equivalent and are ingested as a different type (UUID columns, for example, are typically declared as VARCHAR). Geometric types, custom domains, and composite types may require mapping to supported types. Check the RisingWave CDC documentation for the current type compatibility matrix before migrating tables with unusual column types.

