Replacing Flink with SQL for CDC stream processing means swapping a Flink Java job, a Debezium connector cluster, and a Kafka topic for a single CREATE SOURCE statement in RisingWave. RisingWave is a PostgreSQL-compatible streaming database with built-in CDC connectors for PostgreSQL and MySQL. You connect with psql, write SQL, and your materialized views stay current within milliseconds of every upstream INSERT, UPDATE, or DELETE.
All SQL in this article has been verified against RisingWave 2.8.0.
Why Teams Replace Flink CDC Pipelines
Flink is a powerful stream processing framework. For pure event-streaming use cases without an upstream operational database, it is often the right tool. But for CDC workloads, Flink introduces a stack that is disproportionately complex for what the job requires.
A typical Flink CDC pipeline looks like this:
- A Debezium source connector runs inside Kafka Connect and reads PostgreSQL WAL or MySQL binlog
- Change events are serialized as JSON (or Avro with a Schema Registry) into Kafka topics
- A Flink job consumes those topics, deserializes the envelope format, handles before/after image merging, and writes results to a sink
Each layer adds operational surface area. Kafka Connect clusters need JVM tuning, connector restarts, and offset management. Kafka topics need partition sizing and retention policies. The Flink job needs checkpoint configuration, RocksDB state backend tuning, and parallelism planning. Schema changes in the upstream database can silently break the Debezium connector or corrupt Flink state.
RisingWave collapses this entire stack into SQL. The before/after shows the magnitude of that reduction.
Before: Flink + Debezium CDC Pipeline
Here is what a Flink CDC pipeline for a PostgreSQL orders database looks like in practice.
Step 1: Configure Debezium PostgreSQL Connector
You register a Debezium connector with Kafka Connect via a JSON payload:
{
"name": "orders-postgres-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "postgres-host",
"database.port": "5432",
"database.user": "replication_user",
"database.password": "securepassword",
"database.dbname": "ecommerce",
"database.server.name": "ecommerce_pg",
"table.include.list": "public.customers,public.orders",
"plugin.name": "pgoutput",
"slot.name": "debezium_slot",
"publication.name": "debezium_pub",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.delete.handling.mode": "rewrite",
"snapshot.mode": "initial"
}
}
This must be deployed via a POST request to the Kafka Connect REST API and monitored separately from your Flink jobs.
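For reference, that registration call can be sketched with the Python standard library. The `kafka-connect:8083` host is a placeholder for your Connect cluster's REST endpoint (8083 is Kafka Connect's default REST port), and the config dict is abbreviated here:

```python
# Sketch: registering the Debezium connector via the Kafka Connect REST API.
# Host and credentials are placeholders; fill in the full config from above.
import json
import urllib.request

connector = {
    "name": "orders-postgres-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "postgres-host",
        # ... remaining settings from the JSON payload above ...
    },
}

req = urllib.request.Request(
    "http://kafka-connect:8083/connectors",
    data=json.dumps(connector).encode("utf-8"),
    headers={"Content-Type": "application/json"},
    method="POST",
)
# urllib.request.urlopen(req) would submit it; expect HTTP 201 Created.
```

A successful registration returns the connector config back; after that, the connector's health must still be polled separately via GET /connectors/{name}/status.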
Step 2: Write the Flink Job in Java
With events arriving in Kafka, you write a Flink job to consume and process them. Below is the Java code to read CDC events from the ecommerce_pg.public.orders topic and compute a revenue-by-region aggregation:
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class OrdersCDCJob {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000); // 60-second checkpoints
env.getCheckpointConfig()
.setCheckpointStorage("s3://your-bucket/checkpoints/orders-cdc");
StreamTableEnvironment tEnv =
StreamTableEnvironment.create(env);
// Define Kafka source table for orders CDC events
tEnv.executeSql("""
CREATE TABLE orders_cdc (
order_id BIGINT,
customer_id BIGINT,
product STRING,
amount DECIMAL(10, 2),
status STRING,
region STRING,
order_date TIMESTAMP(3),
__deleted BOOLEAN,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'ecommerce_pg.public.orders',
'properties.bootstrap.servers' = 'kafka-broker:9092',
'properties.group.id' = 'flink-cdc-consumer',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true'
)
""");
// Compute revenue by region, filtering out deletes
tEnv.executeSql("""
CREATE TABLE revenue_by_region_sink (
region STRING,
total_orders BIGINT,
total_revenue DECIMAL(20, 2),
PRIMARY KEY (region) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://analytics-db:5432/results',
'table-name' = 'revenue_by_region',
'username' = 'writer',
'password' = 'password'
)
""");
tEnv.executeSql("""
INSERT INTO revenue_by_region_sink
SELECT
region,
COUNT(*) AS total_orders,
SUM(amount) AS total_revenue
FROM orders_cdc
WHERE __deleted IS NOT TRUE
AND status = 'completed'
GROUP BY region
""");
env.execute("Orders CDC Revenue Aggregation");
}
}
This is a relatively simple aggregation, yet the Java file already runs to roughly 80 lines before adding error handling, logging, or any real business logic.
Step 3: Package and Deploy
After writing the job, you compile it with Maven, package it as a fat JAR, and submit it to a running Flink cluster:
mvn clean package -DskipTests
flink run -m jobmanager:8081 \
-c OrdersCDCJob \
target/orders-cdc-1.0-SNAPSHOT.jar
The full operational stack for this pipeline includes:
| Component | What it does | Who manages it |
| PostgreSQL | Source database | Your team |
| Debezium PostgreSQL Connector | WAL reader, publishes to Kafka | Your team |
| Kafka + ZooKeeper (or KRaft) | Event bus | Your team |
| Kafka Connect cluster | Connector runtime | Your team |
| Flink JobManager | Job coordination | Your team |
| Flink TaskManagers (N nodes) | Parallel execution | Your team |
| RocksDB state backend | Stateful aggregations | Your team |
| S3 checkpoints | Fault tolerance | Your team |
| JDBC sink | Result storage | Your team |
Nine components. All managed by your team. A schema change in PostgreSQL can break the Debezium connector, corrupt messages in Kafka, fail deserialization in Flink, and require a full restart from a savepoint. Debugging the failure means correlating logs across all nine layers.
After: RisingWave Native CDC Sources
Here is the same pipeline in RisingWave. You need a PostgreSQL connection and a psql client.
Step 1: Create the CDC Source
-- Run this in RisingWave (connect with psql -h localhost -p 4566 -U root -d dev)
CREATE SOURCE pg_ecommerce WITH (
connector = 'postgres-cdc',
hostname = 'postgres-host',
port = '5432',
username = 'replication_user',
password = 'securepassword',
database.name = 'ecommerce',
schema.name = 'public',
slot.name = 'risingwave_slot'
);
RisingWave automatically creates a PostgreSQL publication named rw_publication, opens a replication slot, and begins reading WAL changes. No Kafka. No Debezium. No connector JSON.
For MySQL, the syntax is nearly identical:
CREATE SOURCE mysql_ecommerce WITH (
connector = 'mysql-cdc',
hostname = 'mysql-host',
port = '3306',
username = 'replication_user',
password = 'securepassword',
database.name = 'ecommerce'
);
RisingWave reads MySQL's binary log (binlog) directly. Note the row-image requirement: set binlog_row_image = FULL in your MySQL configuration so change events carry complete before and after rows.
Step 2: Map Tables from the Source
For each upstream table you want to capture, you create a corresponding table in RisingWave using the FROM SOURCE clause:
CREATE TABLE fcdc_customers (
customer_id INT PRIMARY KEY,
name VARCHAR,
email VARCHAR,
region VARCHAR,
plan VARCHAR,
created_at TIMESTAMPTZ
)
FROM pg_ecommerce TABLE 'public.customers';
CREATE TABLE fcdc_orders (
order_id INT PRIMARY KEY,
customer_id INT,
product VARCHAR,
amount DECIMAL,
status VARCHAR,
order_date TIMESTAMPTZ
)
FROM pg_ecommerce TABLE 'public.orders';
RisingWave performs an initial snapshot (backfill) of the existing rows, then switches to streaming incremental changes. Each INSERT, UPDATE, and DELETE in PostgreSQL is reflected in these tables within milliseconds.
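The snapshot-then-stream behavior boils down to upsert semantics keyed on the primary key. A toy Python model (illustrating the semantics, not RisingWave's internals) looks like this:

```python
# Toy model of CDC table maintenance: a snapshot backfill followed by
# streaming upserts keyed on the primary key, mimicking what a table
# like fcdc_orders does as upstream changes arrive. Illustrative only.
snapshot = [
    {"order_id": 101, "status": "pending", "amount": 12000.0},
    {"order_id": 102, "status": "completed", "amount": 500.0},
]
table = {row["order_id"]: row for row in snapshot}  # initial backfill

def apply_change(table, op, row):
    """Apply one CDC event: insert/update upsert the row, delete removes it."""
    if op in ("insert", "update"):
        table[row["order_id"]] = row
    elif op == "delete":
        table.pop(row["order_id"], None)

# Streaming phase: an upstream UPDATE and DELETE arrive as change events.
apply_change(table, "update", {"order_id": 101, "status": "completed", "amount": 12000.0})
apply_change(table, "delete", {"order_id": 102, "status": "completed", "amount": 500.0})
```

After these two events the table holds exactly one row, order 101 in its updated state, which is what a SELECT against the RisingWave table would reflect.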
Step 3: Build Materialized Views on CDC Data
Now the fun part. You write plain SQL to define your analytics, and RisingWave maintains the results incrementally as changes arrive:
-- Revenue by region across completed orders
CREATE MATERIALIZED VIEW fcdc_order_revenue_by_region AS
SELECT
c.region,
COUNT(DISTINCT o.order_id) AS total_orders,
SUM(o.amount) AS total_revenue,
ROUND(AVG(o.amount), 2) AS avg_order_value,
COUNT(DISTINCT c.customer_id) AS unique_customers
FROM fcdc_orders o
JOIN fcdc_customers c ON o.customer_id = c.customer_id
WHERE o.status = 'completed'
GROUP BY c.region;
Query the view like any database table:
SELECT region, total_orders, total_revenue, avg_order_value, unique_customers
FROM fcdc_order_revenue_by_region
ORDER BY total_revenue DESC;
region | total_orders | total_revenue | avg_order_value | unique_customers
---------+--------------+---------------+-----------------+------------------
us-east | 3 | 51500.00 | 17166.67 | 2
ap-east | 1 | 12000.00 | 12000.00 | 1
eu-west | 2 | 6500.00 | 3250.00 | 1
us-west | 1 | 500.00 | 500.00 | 1
(4 rows)
You can also join across tables for a customer lifetime value summary:
CREATE MATERIALIZED VIEW fcdc_customer_order_summary AS
SELECT
c.customer_id,
c.name AS customer_name,
c.region,
c.plan,
COUNT(o.order_id) AS total_orders,
SUM(o.amount) AS lifetime_value,
MAX(o.order_date) AS last_order_date
FROM fcdc_customers c
LEFT JOIN fcdc_orders o ON c.customer_id = o.customer_id
GROUP BY c.customer_id, c.name, c.region, c.plan;
Query result:
SELECT customer_name, region, plan, total_orders, lifetime_value
FROM fcdc_customer_order_summary
ORDER BY lifetime_value DESC;
customer_name | region | plan | total_orders | lifetime_value
-----------------+---------+------------+--------------+----------------
Acme Corp | us-east | enterprise | 3 | 82000.00
Umbrella Inc | ap-east | enterprise | 1 | 12000.00
Globex Systems | eu-west | pro | 2 | 6500.00
Massive Dynamic | us-east | pro | 1 | 4500.00
Initech | us-west | starter | 1 | 500.00
(5 rows)
And you can create alert-style views that filter for high-value or anomalous events:
-- Alert on orders above $10,000
CREATE MATERIALIZED VIEW fcdc_high_value_alerts AS
SELECT
o.order_id,
c.name AS customer_name,
c.region,
o.product,
o.amount,
o.order_date
FROM fcdc_orders o
JOIN fcdc_customers c ON o.customer_id = c.customer_id
WHERE o.amount > 10000;
SELECT order_id, customer_name, region, product, amount
FROM fcdc_high_value_alerts
ORDER BY amount DESC;
order_id | customer_name | region | product | amount
----------+---------------+---------+-------------------+----------
103 | Acme Corp | us-east | Enterprise Bundle | 35000.00
108 | Acme Corp | us-east | Enterprise Bundle | 35000.00
105 | Umbrella Inc | ap-east | Data Platform Pro | 12000.00
101 | Acme Corp | us-east | Data Platform Pro | 12000.00
(4 rows)
Every view you create subscribes to the CDC stream automatically. When a row is updated in PostgreSQL, all downstream materialized views that depend on that row are updated incrementally.
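The incremental maintenance behind a view like fcdc_order_revenue_by_region can be sketched as delta processing: each change event adjusts per-group totals rather than rescanning the table. A simplified Python model (plain counts and sums only, ignoring the DISTINCT counts and averages in the real view) looks like this:

```python
# Sketch of incremental view maintenance for a SUM/COUNT GROUP BY.
# Inserts add a delta, deletes retract one; an UPDATE is processed as
# a retraction of the old row plus an insert of the new row.
from collections import defaultdict

totals = defaultdict(lambda: {"orders": 0, "revenue": 0.0})

def on_change(op, region, amount):
    """Adjust one group's totals by the event's delta."""
    sign = 1 if op == "insert" else -1  # deletes retract their contribution
    totals[region]["orders"] += sign
    totals[region]["revenue"] += sign * amount

on_change("insert", "us-east", 12000.0)
on_change("insert", "us-east", 35000.0)
on_change("insert", "eu-west", 4000.0)
on_change("delete", "us-east", 12000.0)  # upstream DELETE retracts the row
```

No group is ever recomputed from scratch, which is why view freshness stays in the millisecond range regardless of table size.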
Before vs. After: Side-by-Side Comparison
The table below captures the most significant differences between a Flink + Debezium CDC pipeline and a RisingWave CDC pipeline.
| Dimension | Flink + Debezium | RisingWave |
| Language for processing logic | Java (or Scala) | SQL |
| CDC reading mechanism | Debezium connector in Kafka Connect | Built-in postgres-cdc / mysql-cdc connector |
| Message broker required | Yes (Kafka) | No |
| Schema registry required | Optional, but common for Avro | No |
| Before/after image handling | Manual (ExtractNewRecordState SMT) | Automatic |
| State backend | RocksDB (manual tuning) | Hummock on S3 (automatic) |
| Checkpointing | Manual configuration, periodic | Automatic, continuous |
| JVM required | Yes (Flink + Kafka Connect JVMs) | No (written in Rust) |
| Query interface | Flink REST API + Flink SQL | Standard PostgreSQL SQL (psql) |
| Incremental aggregations | Defined in DataStream or Flink SQL | CREATE MATERIALIZED VIEW |
| Deployment complexity | 5-9 components | 1 system (or RisingWave Cloud) |
| Typical lines of code for simple aggregation | 70-100 (Java) | 10-15 (SQL) |
PostgreSQL CDC: Setup Requirements
Before RisingWave can read from PostgreSQL, the database must be configured for logical replication. This is a one-time setup.
PostgreSQL Configuration
In postgresql.conf, set:
wal_level = logical # Required for logical decoding
max_replication_slots = 4 # One per CDC consumer
max_wal_senders = 4 # One per CDC consumer
Restart PostgreSQL, then create a replication user:
-- Run in PostgreSQL (not RisingWave)
CREATE USER replication_user WITH REPLICATION PASSWORD 'securepassword';
GRANT USAGE ON SCHEMA public TO replication_user;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO replication_user;
ALTER DEFAULT PRIVILEGES IN SCHEMA public
GRANT SELECT ON TABLES TO replication_user;
For managed services like RDS or Aurora, set the rds.logical_replication parameter in your parameter group to 1 instead of editing postgresql.conf. For GCP Cloud SQL, enable the cloudsql.logical_decoding flag. See the RisingWave PostgreSQL CDC documentation for cloud-specific instructions.
Supported PostgreSQL Versions
RisingWave supports PostgreSQL 10 and above, using the pgoutput logical decoding plugin that ships with PostgreSQL 10+. Versions older than 10 lack pgoutput, and RisingWave does not support the wal2json plugin as an alternative; upgrading PostgreSQL is the correct path.
MySQL CDC: Setup Requirements
MySQL CDC reads from the binary log (binlog). Configure your MySQL server with:
# my.cnf
[mysqld]
server-id = 1
log_bin = mysql-bin
binlog_format = ROW
binlog_row_image = FULL
expire_logs_days = 7    # MySQL 5.7; on 8.0+ use binlog_expire_logs_seconds = 604800
Create a replication user:
-- Run in MySQL (not RisingWave)
CREATE USER 'replication_user'@'%' IDENTIFIED BY 'securepassword';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT
ON *.* TO 'replication_user'@'%';
FLUSH PRIVILEGES;
Then create the RisingWave source:
CREATE SOURCE mysql_app WITH (
connector = 'mysql-cdc',
hostname = 'mysql-host',
port = '3306',
username = 'replication_user',
password = 'securepassword',
database.name = 'myapp'
);
CREATE TABLE fcdc_mysql_products (
product_id INT PRIMARY KEY,
name VARCHAR,
category VARCHAR,
price DECIMAL,
stock_qty INT,
updated_at TIMESTAMP
)
FROM mysql_app TABLE 'myapp.products';
For details on Amazon RDS for MySQL and Aurora MySQL, see the RisingWave MySQL CDC documentation.
Handling Schema Changes
Schema changes are one of the most painful parts of managing a Flink + Debezium pipeline. When you add a column to an upstream PostgreSQL table, you typically need to:
- Update the Debezium connector configuration and restart it
- Update the Kafka topic schema (and schema registry if using Avro)
- Update the Flink job DDL and redeploy it from a savepoint
RisingWave handles the most common schema changes automatically without requiring a restart:
- Adding a nullable column: RisingWave detects the new column and backfills NULL for existing rows. New rows from CDC include the new column automatically.
- Adding a column with a default: If the upstream database sets a default value, RisingWave reflects it from the CDC event.
- Renaming a column: Requires recreating the CDC table in RisingWave with the updated schema, then rebuilding dependent materialized views.
- Changing a column type: Requires schema migration. RisingWave will pause the CDC stream and surface an error if the wire format changes incompatibly.
For non-breaking additions, RisingWave's handling is operationally transparent. For breaking changes, you follow a planned migration rather than scrambling to fix a broken connector at 2 AM.
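The non-breaking case, adding a nullable column, can be illustrated with a toy Python model of the table state (a sketch of the semantics, not RisingWave's implementation): existing rows are backfilled with NULL while new change events carry the value.

```python
# Toy model: a nullable column appears upstream. Existing rows get NULL
# (None); rows arriving via CDC after the change carry the value.
rows = {1: {"id": 1, "name": "Acme"}}  # state before the schema change

def add_nullable_column(rows, col):
    """Backfill NULL for the new column on every existing row."""
    for r in rows.values():
        r.setdefault(col, None)

add_nullable_column(rows, "tier")
rows[2] = {"id": 2, "name": "Initech", "tier": "starter"}  # new CDC row
```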
The RisingWave schema change documentation covers the full set of supported ALTER TABLE operations on CDC sources.
Operational Comparison: What You Stop Managing
The biggest return when replacing Flink CDC pipelines is not the SQL syntax. It is the operational surface area you eliminate.
What you stop managing
Kafka topic sizing and retention. CDC events for a busy database generate millions of messages per day. Kafka topics need partition counts sized for throughput, and retention policies sized to handle connector lag without running out of disk. With RisingWave, there is no Kafka.
Debezium connector offsets. Debezium tracks its WAL position as source offsets stored in Kafka Connect's internal offsets topic. When a connector crashes, you need to verify the stored offset is valid before restarting, or risk skipping or replaying events. RisingWave manages replication slot positions internally.
Flink checkpoint tuning. Checkpoint intervals, timeout settings, minimum pause between checkpoints, aligned versus unaligned checkpoints, and incremental checkpoint configuration are all Flink-specific concerns. RisingWave handles checkpointing automatically with a default 1-second barrier interval and no user configuration required.
JVM memory and GC. Kafka Connect runs on the JVM. Flink runs on the JVM. RocksDB is a native C++ library reached through JNI, so its off-heap memory must be budgeted alongside the JVM heap. GC pauses in the JVM components can cause checkpoint timeouts, connector lag, or consumer group rebalances. RisingWave is written in Rust and has no garbage collector.
Schema Registry. Many Flink CDC pipelines use Confluent Schema Registry to manage Avro schemas. This adds a service to run, secure, and back up. RisingWave schemas are defined in SQL DDL, stored in the meta service alongside table and view definitions.
Building a Multi-Source CDC Pipeline
One advantage of RisingWave is that you can join across multiple CDC sources in a single materialized view. In Flink, joining two CDC streams requires careful watermark alignment and often produces cartesian-product-scale state.
Suppose your orders are in PostgreSQL and your product catalog is in MySQL, with the orders table carrying a product_id foreign key (include that column when mapping fcdc_orders). In RisingWave:
-- PostgreSQL CDC source (already created above)
-- CREATE SOURCE pg_ecommerce ...
-- MySQL CDC source for product catalog
CREATE SOURCE mysql_catalog WITH (
connector = 'mysql-cdc',
hostname = 'mysql-host',
port = '3306',
username = 'replication_user',
password = 'securepassword',
database.name = 'catalog'
);
CREATE TABLE fcdc_catalog_products (
product_id INT PRIMARY KEY,
name VARCHAR,
category VARCHAR,
price DECIMAL
)
FROM mysql_catalog TABLE 'catalog.products';
-- Join PostgreSQL orders with MySQL product catalog in one materialized view
CREATE MATERIALIZED VIEW fcdc_orders_enriched AS
SELECT
o.order_id,
o.customer_id,
p.name AS product_name,
p.category,
o.amount,
o.status,
c.region,
o.order_date
FROM fcdc_orders o
JOIN fcdc_customers c ON o.customer_id = c.customer_id
JOIN fcdc_catalog_products p ON o.product_id = p.product_id;
RisingWave maintains this join incrementally. When a product price changes in MySQL, the materialized view updates for all orders that reference that product. When a new order arrives from PostgreSQL, it is joined with the current product record immediately.
Doing the equivalent in Flink requires two separate Kafka consumer groups (one for each CDC source), two Flink tables, a SQL join with JOIN state size concerns, and careful checkpoint coordination across both sources.
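The cross-source join behaves like a continuously maintained lookup: a change on either side flows into the joined output. A toy Python model makes the point (it recomputes the view for clarity, where a streaming engine would apply deltas to join state; all names here are illustrative):

```python
# Toy model of a cross-source join: orders from one database, products
# from another. A product change updates every joined row that references
# it. Recomputed here for clarity; a streaming engine applies deltas.
products = {1: {"name": "Data Platform Pro", "price": 12000.0}}
orders = {101: {"product_id": 1, "qty": 1}}

def enriched():
    """Join orders with products; unmatched orders are dropped (inner join)."""
    return {
        oid: {**o,
              "product_name": products[o["product_id"]]["name"],
              "price": products[o["product_id"]]["price"]}
        for oid, o in orders.items()
        if o["product_id"] in products
    }

view = enriched()
# A price change on the MySQL side flows through to all matching orders.
products[1]["price"] = 12500.0
view = enriched()
```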
When to Stay on Flink
RisingWave is not the right answer for every stream processing workload. Keep Flink for these scenarios:
Complex Event Processing (CEP). Flink's MATCH_RECOGNIZE operator supports pattern detection across event sequences that has no equivalent in RisingWave. If you need to detect sequences like "login followed by three failed payments within 10 minutes," Flink CEP is the right tool.
Non-SQL DataStream logic. If your pipeline applies machine learning model inference per record, performs custom binary protocol parsing, or requires fine-grained per-record state machines, the Flink DataStream API gives you control that SQL does not.
Kafka-native architectures without a source database. If your entire data model is event-driven with no upstream relational database, RisingWave's CDC advantage disappears. Both systems consume from Kafka equally well, so the choice reduces to SQL dialect and operational preferences.
Organizations heavily invested in Flink. If you have a dedicated Flink platform team, existing Flink jobs running in production, and established runbooks for Flink operations, the marginal cost of running more Flink jobs is low. Migration makes the most sense for teams starting new pipelines or absorbing disproportionate operational cost on existing ones.
Summary
Replacing Flink with SQL for CDC stream processing reduces a nine-component stack to a single SQL statement. The before-and-after:
Before: PostgreSQL WAL -> Debezium connector -> Kafka Connect -> Kafka topic -> Flink job (Java) -> JDBC sink -> downstream database. Nine moving parts, JVM tuning required at every layer, schema changes coordinated across the entire chain.
After: PostgreSQL WAL -> RisingWave CDC source -> materialized view (SQL) -> query with psql. One system, no JVM, schema changes surfaced as SQL errors.
RisingWave's native CDC connectors for PostgreSQL and MySQL eliminate the Debezium and Kafka layer entirely. Materialized views replace Flink jobs. The PostgreSQL wire protocol replaces Flink's REST API and custom monitoring tooling.
For teams spending more time operating their CDC infrastructure than building analytics on top of it, the migration from Flink to RisingWave is one of the highest-leverage engineering investments available. Start with a single CDC source, validate the pipeline in parallel with your existing Flink job, and cut over when you are confident in the results.
To get started, see the RisingWave quickstart guide or try RisingWave Cloud for a managed deployment with no infrastructure setup required.
FAQ
Can RisingWave replace Flink for all CDC use cases?
RisingWave covers the vast majority of CDC stream processing workloads: ingesting changes from PostgreSQL and MySQL, joining CDC streams with lookup tables, computing aggregations and alerts, and serving results via SQL. The cases where Flink remains a better fit are complex event processing with MATCH_RECOGNIZE, custom DataStream logic that is not expressible in SQL, and architectures that are entirely event-driven with no upstream relational database.
Does RisingWave require Kafka for CDC?
No. RisingWave reads directly from PostgreSQL's WAL via a replication slot and from MySQL's binary log via the binlog protocol. No Kafka, no Debezium, and no Kafka Connect cluster is required. If you already have CDC events in Kafka topics from an existing Debezium deployment, RisingWave can also consume from Kafka using its Kafka source connector while you transition.
How does RisingWave handle backpressure and slow consumers?
RisingWave is designed as a streaming database, not a stream processing framework. State is stored in Hummock, a cloud-native storage engine backed by object storage (S3, GCS, or MinIO). Backpressure is managed automatically within the system: compute nodes signal upstream ingestion nodes to slow down when processing falls behind. Unlike Flink, there is no external RocksDB state backend to tune and no checkpoint timeout to configure manually.
What happens to my Flink job during migration?
The recommended migration pattern is to run RisingWave in parallel with your existing Flink pipeline for a validation period. Create the RisingWave CDC source and materialized views, let them run against the same upstream database, and compare the output of both systems for correctness. Once you are satisfied that results match, stop the Flink job, drop the Debezium connector, and update any downstream consumers to point at RisingWave's PostgreSQL endpoint. The migration can be done table by table, so you do not need a big-bang cutover.
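A minimal comparison harness for that validation period can be sketched as a keyed diff of the two systems' query results. The sample rows below are stand-ins for real query output, and the key column is an assumption for this example:

```python
# Sketch: compare the Flink-fed sink table against the RisingWave
# materialized view, keyed by the grouping column. Sample data only.
def diff_results(flink_rows, rw_rows, key="region"):
    """Return the keys whose rows disagree between the two systems."""
    a = {r[key]: r for r in flink_rows}
    b = {r[key]: r for r in rw_rows}
    return sorted(k for k in set(a) | set(b) if a.get(k) != b.get(k))

flink_out = [{"region": "us-east", "total_revenue": 51500.0}]
rw_out = [{"region": "us-east", "total_revenue": 51500.0},
          {"region": "eu-west", "total_revenue": 6500.0}]

mismatches = diff_results(flink_out, rw_out)  # keys needing investigation
```

Run a comparison like this on a schedule during the parallel period; a persistent mismatch on a key usually points at ordering or late-event differences worth investigating before cutover.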

