A Debezium + Kafka + RisingWave pipeline combines three purpose-built components: Debezium captures database changes from transaction logs, Kafka buffers and distributes them durably, and RisingWave — a PostgreSQL-compatible streaming database — processes them with incremental SQL to produce live analytics and data products.
Architecture Overview
The pipeline follows a straightforward flow:
Source DB → Debezium (Kafka Connect) → Kafka Topic → RisingWave Source → Materialized View → Sink
Each component plays a specific role:
- Debezium: Reads the database's native change log (WAL for PostgreSQL, binlog for MySQL) and emits structured change events to Kafka as JSON (a sample event is shown below)
- Kafka: Stores events durably with configurable retention, decoupling producers from consumers and enabling fan-out
- RisingWave: Consumes Kafka topics, applies Debezium CDC semantics, and maintains incrementally-updated materialized views
- Sink: Delivers processed results to downstream systems — dashboards, APIs, data warehouses, or other Kafka topics
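For orientation, here is roughly what a single change event for the orders table looks like on the Kafka topic. The example is abridged and illustrative; exact field encodings (decimals, timestamps) depend on your Kafka Connect converter and Debezium settings such as decimal.handling.mode:
{
  "before": null,
  "after": {
    "id": 1,
    "customer_id": 42,
    "product_id": 7,
    "quantity": 2,
    "status": "pending"
  },
  "source": { "connector": "postgresql", "db": "production", "schema": "public", "table": "orders" },
  "op": "c",
  "ts_ms": 1700000000000
}
The op field distinguishes creates (c), updates (u), deletes (d), and snapshot reads (r); the before and after row images are what RisingWave uses to maintain its state.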
Why This Stack?
Each component is best-of-breed for its role:
| Component | Role | Why |
| --- | --- | --- |
| Debezium | CDC extraction | Proven, battle-tested, with connectors for PostgreSQL, MySQL, SQL Server, Oracle, MongoDB, and more |
| Kafka | Transport & buffer | Durable, replayable, enables fan-out |
| RisingWave | Stream processing | PostgreSQL-compatible SQL, incremental materialized views, no Java or Flink required |
Step-by-Step Tutorial
Step 1: Set Up the Source Database
This example uses PostgreSQL. Enable logical replication:
-- postgresql.conf (changing these settings requires a server restart)
-- wal_level = logical
-- max_replication_slots = 4
-- Create the application schema
CREATE TABLE orders (
id BIGSERIAL PRIMARY KEY,
customer_id BIGINT NOT NULL,
product_id BIGINT NOT NULL,
quantity INTEGER NOT NULL,
unit_price DECIMAL(10,2) NOT NULL,
status VARCHAR(20) NOT NULL DEFAULT 'pending',
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- Replication user
CREATE ROLE debezium REPLICATION LOGIN PASSWORD 'dbz_secret';
GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium;
CREATE PUBLICATION dbz_pub FOR TABLE public.orders;
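As a quick sanity check, confirm the settings took effect and seed a couple of rows so the initial snapshot has data. The inserted values are illustrative sample data, not part of the required setup:
-- Should return 'logical'
SHOW wal_level;

-- The publication should list public.orders
SELECT * FROM pg_publication_tables WHERE pubname = 'dbz_pub';

-- Seed sample rows for the snapshot and downstream views
INSERT INTO orders (customer_id, product_id, quantity, unit_price, status)
VALUES (42, 7, 2, 19.99, 'pending'),
       (43, 7, 1, 19.99, 'completed');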
Step 2: Deploy Debezium Connector
Configure and register the connector with Kafka Connect:
{
"name": "orders-pg-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres.internal",
"database.port": "5432",
"database.user": "debezium",
"database.password": "dbz_secret",
"database.dbname": "production",
"database.server.name": "pg1",
"slot.name": "rw_pipeline_slot",
"plugin.name": "pgoutput",
"publication.name": "dbz_pub",
"table.include.list": "public.orders",
"topic.prefix": "pg1",
"topic.creation.default.replication.factor": "3",
"topic.creation.default.partitions": "6",
"heartbeat.interval.ms": "10000",
"snapshot.mode": "initial"
}
}
# Register the connector (save the JSON above as orders-pg-connector.json)
curl -X POST http://kafka-connect:8083/connectors \
  -H "Content-Type: application/json" \
  -d @orders-pg-connector.json
# Verify connector status
curl http://kafka-connect:8083/connectors/orders-pg-connector/status
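Optionally, peek at the raw CDC topic to confirm events are flowing. The console consumer script name and path vary by Kafka distribution:
kafka-console-consumer.sh \
  --bootstrap-server kafka:9092 \
  --topic pg1.public.orders \
  --from-beginning \
  --max-messages 1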
Step 3: Connect RisingWave to the Kafka Topic
RisingWave is a PostgreSQL-compatible streaming database. Debezium events carry inserts, updates, and deletes, so ingest them into a table with an explicit schema and a primary key; the primary key lets RisingWave apply updates and deletes correctly:
CREATE TABLE orders_cdc (
    id BIGINT PRIMARY KEY,
    customer_id BIGINT,
    product_id BIGINT,
    quantity INTEGER,
    unit_price DECIMAL,
    status VARCHAR,
    created_at TIMESTAMPTZ
)
WITH (
    connector = 'kafka',
    topic = 'pg1.public.orders',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'earliest'
) FORMAT DEBEZIUM ENCODE JSON;
Verify the table is receiving data:
SELECT * FROM orders_cdc LIMIT 5;
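To see the CDC loop end to end, change a row in PostgreSQL and query RisingWave again. This assumes an order with id = 1 exists (for example, one of the rows seeded in Step 1):
-- In PostgreSQL
UPDATE orders SET status = 'completed' WHERE id = 1;

-- In RisingWave, the change shows up within a few seconds
SELECT id, status FROM orders_cdc WHERE id = 1;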
Step 4: Build Materialized Views
Create incrementally-maintained aggregations. RisingWave updates these views in real time as new CDC events arrive:
-- Revenue by product, updated in real time
CREATE MATERIALIZED VIEW product_revenue_live AS
SELECT
product_id,
COUNT(*) FILTER (WHERE status = 'completed') AS completed_orders,
SUM(quantity * unit_price) FILTER (WHERE status = 'completed') AS total_revenue,
COUNT(*) FILTER (WHERE status = 'pending') AS pending_orders,
MAX(created_at) AS last_order_at
FROM orders_cdc
GROUP BY product_id;
-- Hourly order volume with 1-hour tumbling window
CREATE MATERIALIZED VIEW hourly_order_volume AS
SELECT
WINDOW_START AS hour,
COUNT(*) AS order_count,
SUM(quantity * unit_price) AS gross_revenue
FROM TUMBLE(orders_cdc, created_at, INTERVAL '1 HOUR')
GROUP BY WINDOW_START;
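Both views can be queried ad hoc like ordinary tables and always reflect the latest CDC events, for example:
SELECT product_id, completed_orders, total_revenue
FROM product_revenue_live
ORDER BY total_revenue DESC
LIMIT 10;

SELECT hour, order_count, gross_revenue
FROM hourly_order_volume
ORDER BY hour DESC
LIMIT 24;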
Step 5: Sink Results to Downstream Systems
Deliver the processed results downstream: here, to a dashboard-facing Kafka topic and to a PostgreSQL reporting database for BI tools:
-- Sink product revenue to Kafka for dashboard consumption
CREATE SINK product_revenue_sink
FROM product_revenue_live
WITH (
connector = 'kafka',
topic = 'product-revenue-realtime',
  properties.bootstrap.server = 'kafka:9092',
  primary_key = 'product_id'
) FORMAT UPSERT ENCODE JSON;
-- Sink to PostgreSQL for BI tools
CREATE SINK hourly_volume_pg_sink
FROM hourly_order_volume
WITH (
connector = 'jdbc',
jdbc.url = 'jdbc:postgresql://analytics-db:5432/reports',
table.name = 'hourly_order_volume',
type = 'upsert',
primary_key = 'hour'
);
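To confirm the Kafka sink is emitting upsert messages, consume a few records along with their keys (script name and path vary by distribution):
kafka-console-consumer.sh \
  --bootstrap-server kafka:9092 \
  --topic product-revenue-realtime \
  --property print.key=true \
  --from-beginning \
  --max-messages 5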
Monitoring the Pipeline
Check Debezium connector progress from the Kafka Connect API and consumer lag from Kafka:
# Connector source offsets (the offsets endpoint requires Kafka Connect 3.5+)
curl http://kafka-connect:8083/connectors/orders-pg-connector/offsets
# Consumer group lag for the RisingWave source (use --list first to find the group name in your deployment)
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
--group risingwave-consumer-group \
--describe
Monitor replication slot lag in PostgreSQL to ensure Debezium is keeping up:
SELECT slot_name,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)) AS lag
FROM pg_replication_slots
WHERE slot_name = 'rw_pipeline_slot';
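As a sketch of how this query can feed an alert, a small shell check might look like the following. Hostname, credentials, and the 500 MB threshold are assumptions to adapt to your environment:
#!/usr/bin/env bash
# Fail if the Debezium slot falls more than ~500 MB behind the current WAL position
LAG_BYTES=$(psql "host=postgres.internal dbname=production user=postgres" -Atc "
  SELECT pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)
  FROM pg_replication_slots
  WHERE slot_name = 'rw_pipeline_slot';")

if [ "${LAG_BYTES:-0}" -gt $((500 * 1024 * 1024)) ]; then
  echo "Replication slot lag is ${LAG_BYTES} bytes" >&2
  exit 1
fi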
FAQ
How do I handle a schema change (e.g., adding a column)? Debezium picks up DDL as it reads the log and includes the new column in subsequent change events (connectors such as MySQL also persist these changes in a dedicated schema history topic). Because the RisingWave table declares its columns explicitly, a new column does not appear there automatically; you need to add it on the RisingWave side as well. For complex schema migrations, plan a maintenance window and coordinate the DDL across the source database, Debezium, and RisingWave.
What topic naming convention does Debezium use?
By default: {topic.prefix}.{schema}.{table} for PostgreSQL (e.g., pg1.public.orders in this tutorial) and {topic.prefix}.{database}.{table} for MySQL. The prefix is set via topic.prefix (or database.server.name in older Debezium versions).
How many Kafka partitions should I use for the CDC topic? Start with a partition count that matches the source parallelism you want in RisingWave. More partitions allow higher throughput but add Kafka overhead; for most workloads, 6–12 partitions is a reasonable starting point. The command below shows what the connector actually created.
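To check the partition count on the CDC topic (script name and path vary by Kafka distribution):
kafka-topics.sh --bootstrap-server kafka:9092 \
  --describe --topic pg1.public.orders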
Key Takeaways
- The Debezium + Kafka + RisingWave pipeline provides end-to-end CDC from any supported database to streaming SQL
- Debezium handles log-based extraction; Kafka provides durable buffering and fan-out; RisingWave provides the SQL processing layer
- RisingWave connects to Kafka CDC topics using FORMAT DEBEZIUM ENCODE JSON
- Materialized views update incrementally: no batch jobs, no polling
- Monitor replication slot lag and Kafka consumer group lag to ensure the pipeline is keeping up

