Replacing Flink with SQL for CDC Stream Processing

Replacing Flink with SQL for CDC stream processing means swapping a Flink Java job, a Debezium connector cluster, and a Kafka topic for a single CREATE SOURCE statement in RisingWave. RisingWave is a PostgreSQL-compatible streaming database with built-in CDC connectors for PostgreSQL and MySQL. You connect with psql, write SQL, and your materialized views stay current within milliseconds of every upstream INSERT, UPDATE, or DELETE.

All SQL in this article has been verified against RisingWave 2.8.0.

Flink is a powerful stream processing framework. For pure event-streaming use cases without an upstream operational database, it is often the right tool. But for CDC workloads, Flink introduces a stack that is disproportionately complex for what the job requires.

A typical Flink CDC pipeline looks like this:

  1. A Debezium source connector runs inside Kafka Connect and reads PostgreSQL WAL or MySQL binlog
  2. Change events are serialized as JSON (or Avro with a Schema Registry) into Kafka topics
  3. A Flink job consumes those topics, deserializes the envelope format, handles before/after image merging, and writes results to a sink
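Step 3 is where much of the hand-written code lives. To make "handles before/after image merging" concrete, here is a minimal sketch of a Debezium change-event envelope and the merge logic a consumer must implement. The field names (op, before, after, source) follow Debezium's documented envelope format; the row values are invented for illustration:

```python
import json

# Illustrative Debezium update event for public.orders. The envelope fields
# (op, before, after, source, ts_ms) follow Debezium's format; values are made up.
raw_event = json.dumps({
    "payload": {
        "op": "u",  # c=create, u=update, d=delete, r=snapshot read
        "before": {"order_id": 101, "status": "pending",   "amount": 12000.00},
        "after":  {"order_id": 101, "status": "completed", "amount": 12000.00},
        "source": {"table": "orders"},
        "ts_ms": 1700000000000,
    }
})

def current_row(event_json: str):
    """Return the post-image of the row, or None for a delete."""
    payload = json.loads(event_json)["payload"]
    if payload["op"] == "d":
        return None  # deletes carry only a before-image
    return payload["after"]

print(current_row(raw_event))
```

Every Flink CDC consumer carries some version of this unwrapping logic, either hand-written or delegated to the ExtractNewRecordState transform shown below.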

Each layer adds operational surface area. Kafka Connect clusters need JVM tuning, connector restarts, and offset management. Kafka topics need partition sizing and retention policies. The Flink job needs checkpoint configuration, RocksDB state backend tuning, and parallelism planning. Schema changes in the upstream database can silently break the Debezium connector or corrupt Flink state.

RisingWave collapses this entire stack into SQL. The before-and-after comparison below shows the magnitude of that reduction.

Before: The Flink + Debezium Pipeline

Here is what a Flink CDC pipeline for a PostgreSQL orders database looks like in practice.

Step 1: Configure Debezium PostgreSQL Connector

You register a Debezium connector with Kafka Connect via a JSON payload:

{
  "name": "orders-postgres-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "postgres-host",
    "database.port": "5432",
    "database.user": "replication_user",
    "database.password": "securepassword",
    "database.dbname": "ecommerce",
    "database.server.name": "ecommerce_pg",
    "table.include.list": "public.customers,public.orders",
    "plugin.name": "pgoutput",
    "slot.name": "debezium_slot",
    "publication.name": "debezium_pub",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "true",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.delete.handling.mode": "rewrite",
    "snapshot.mode": "initial"
  }
}

This must be deployed via a POST request to the Kafka Connect REST API and monitored separately from your Flink jobs.
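For illustration, that registration call can be scripted with nothing but the standard library. This is a sketch, assuming a Kafka Connect host of kafka-connect:8083; the payload below abbreviates the full config shown above:

```python
import json
import urllib.request

# Kafka Connect REST endpoint -- the hostname is an assumption for your environment.
CONNECT_URL = "http://kafka-connect:8083/connectors"

# Abbreviated payload; use the full connector config from the JSON above.
connector = {
    "name": "orders-postgres-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "postgres-host",
        "database.dbname": "ecommerce",
        "table.include.list": "public.customers,public.orders",
        "plugin.name": "pgoutput",
    },
}

request = urllib.request.Request(
    CONNECT_URL,
    data=json.dumps(connector).encode("utf-8"),
    headers={"Content-Type": "application/json"},
    method="POST",
)
# urllib.request.urlopen(request)  # uncomment to actually register the connector
```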

Step 2: Write the Flink Job

With events arriving in Kafka, you write a Flink job to consume and process them. Below is the Java code that reads CDC events from the ecommerce_pg.public.orders topic and computes a revenue-by-region aggregation:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class OrdersCDCJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // 60-second checkpoints
        env.getCheckpointConfig()
           .setCheckpointStorage("s3://your-bucket/checkpoints/orders-cdc");

        StreamTableEnvironment tEnv =
            StreamTableEnvironment.create(env);

        // Define an upsert Kafka table keyed on order_id so Debezium updates
        // and deletes are applied as changelog events instead of being
        // double-counted as appends. (The plain 'kafka' connector with 'json'
        // format does not allow a PRIMARY KEY declaration.)
        tEnv.executeSql("""
            CREATE TABLE orders_cdc (
                order_id     BIGINT,
                customer_id  BIGINT,
                product      STRING,
                amount       DECIMAL(10, 2),
                status       STRING,
                region       STRING,
                order_date   TIMESTAMP(3),
                __deleted    BOOLEAN,
                PRIMARY KEY (order_id) NOT ENFORCED
            ) WITH (
                'connector'                      = 'upsert-kafka',
                'topic'                          = 'ecommerce_pg.public.orders',
                'properties.bootstrap.servers'   = 'kafka-broker:9092',
                'properties.group.id'            = 'flink-cdc-consumer',
                'key.format'                     = 'json',
                'value.format'                   = 'json',
                'value.json.ignore-parse-errors' = 'true'
            )
            """);

        // Compute revenue by region, filtering out deletes
        tEnv.executeSql("""
            CREATE TABLE revenue_by_region_sink (
                region        STRING,
                total_orders  BIGINT,
                total_revenue DECIMAL(20, 2),
                PRIMARY KEY (region) NOT ENFORCED
            ) WITH (
                'connector' = 'jdbc',
                'url'       = 'jdbc:postgresql://analytics-db:5432/results',
                'table-name'= 'revenue_by_region',
                'username'  = 'writer',
                'password'  = 'password'
            )
            """);

        // Submitting this INSERT launches the streaming job; pure Table API
        // programs do not need a separate env.execute() call.
        tEnv.executeSql("""
            INSERT INTO revenue_by_region_sink
            SELECT
                region,
                COUNT(*)      AS total_orders,
                SUM(amount)   AS total_revenue
            FROM orders_cdc
            WHERE __deleted IS NOT TRUE
              AND status = 'completed'
            GROUP BY region
            """);
    }
}

This is a relatively simple aggregation, and the Java file already runs to roughly 70 lines before adding error handling, logging, or any real business logic.

Step 3: Package and Deploy

After writing the job, you compile it with Maven, package it as a fat JAR, and submit it to a running Flink cluster:

mvn clean package -DskipTests
flink run -m jobmanager:8081 \
  -c OrdersCDCJob \
  target/orders-cdc-1.0-SNAPSHOT.jar

The full operational stack for this pipeline includes:

| Component | What it does | Who manages it |
|---|---|---|
| PostgreSQL | Source database | Your team |
| Debezium PostgreSQL connector | WAL reader, publishes to Kafka | Your team |
| Kafka + ZooKeeper (or KRaft) | Event bus | Your team |
| Kafka Connect cluster | Connector runtime | Your team |
| Flink JobManager | Job coordination | Your team |
| Flink TaskManagers (N nodes) | Parallel execution | Your team |
| RocksDB state backend | Stateful aggregations | Your team |
| S3 checkpoints | Fault tolerance | Your team |
| JDBC sink | Result storage | Your team |

Nine components. All managed by your team. A schema change in PostgreSQL can break the Debezium connector, corrupt messages in Kafka, fail deserialization in Flink, and require a full restart from a savepoint. Debugging the failure means correlating logs across all nine layers.

After: RisingWave Native CDC Sources

Here is the same pipeline in RisingWave. You need a PostgreSQL connection and a psql client.

Step 1: Create the CDC Source

-- Run this in RisingWave (connect with psql -h localhost -p 4566 -U root -d dev)
CREATE SOURCE pg_ecommerce WITH (
    connector    = 'postgres-cdc',
    hostname     = 'postgres-host',
    port         = '5432',
    username     = 'replication_user',
    password     = 'securepassword',
    database.name = 'ecommerce',
    schema.name  = 'public',
    slot.name    = 'risingwave_slot'
);

RisingWave automatically creates a PostgreSQL publication named rw_publication, opens a replication slot, and begins reading WAL changes. No Kafka. No Debezium. No connector JSON.

For MySQL, the syntax is nearly identical:

CREATE SOURCE mysql_ecommerce WITH (
    connector     = 'mysql-cdc',
    hostname      = 'mysql-host',
    port          = '3306',
    username      = 'replication_user',
    password      = 'securepassword',
    database.name = 'ecommerce'
);

RisingWave reads MySQL's binary log (binlog) directly. MySQL must emit full row images so that updates and deletes carry complete before-images: set binlog_format = ROW and binlog_row_image = FULL in your MySQL configuration.

Step 2: Map Tables from the Source

For each upstream table you want to capture, you create a corresponding table in RisingWave with a CREATE TABLE ... FROM <source> TABLE '<schema.table>' clause:

CREATE TABLE fcdc_customers (
    customer_id INT PRIMARY KEY,
    name        VARCHAR,
    email       VARCHAR,
    region      VARCHAR,
    plan        VARCHAR,
    created_at  TIMESTAMPTZ
)
FROM pg_ecommerce TABLE 'public.customers';

CREATE TABLE fcdc_orders (
    order_id    INT PRIMARY KEY,
    customer_id INT,
    product_id  INT,
    product     VARCHAR,
    amount      DECIMAL,
    status      VARCHAR,
    order_date  TIMESTAMPTZ
)
FROM pg_ecommerce TABLE 'public.orders';

RisingWave performs an initial snapshot (backfill) of the existing rows, then switches to streaming incremental changes. Each INSERT, UPDATE, and DELETE in PostgreSQL is reflected in these tables within milliseconds.
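Conceptually, each CDC-backed table behaves like a keyed map: a one-time snapshot load, then a stream of upserts and deletes applied by primary key. A toy Python model of that behavior (illustrative only; RisingWave maintains this state durably, not in an in-memory dict):

```python
# Toy model of a CDC-backed table: snapshot backfill, then keyed changes.
snapshot = [
    {"order_id": 101, "status": "pending",   "amount": 12000.00},
    {"order_id": 102, "status": "completed", "amount": 500.00},
]
table = {row["order_id"]: row for row in snapshot}  # backfill phase

# Incremental phase: WAL changes captured after the snapshot point.
changes = [
    ("upsert", {"order_id": 101, "status": "completed", "amount": 12000.00}),
    ("upsert", {"order_id": 103, "status": "completed", "amount": 35000.00}),
    ("delete", {"order_id": 102}),
]
for op, row in changes:
    if op == "delete":
        table.pop(row["order_id"], None)
    else:
        table[row["order_id"]] = row

print(sorted(table))  # surviving primary keys
```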

Step 3: Build Materialized Views on CDC Data

Now the fun part. You write plain SQL to define your analytics, and RisingWave maintains the results incrementally, applying each change as it arrives instead of recomputing from scratch:

-- Revenue by region across completed orders
CREATE MATERIALIZED VIEW fcdc_order_revenue_by_region AS
SELECT
    c.region,
    COUNT(DISTINCT o.order_id)         AS total_orders,
    SUM(o.amount)                      AS total_revenue,
    ROUND(AVG(o.amount), 2)            AS avg_order_value,
    COUNT(DISTINCT c.customer_id)      AS unique_customers
FROM fcdc_orders o
JOIN fcdc_customers c ON o.customer_id = c.customer_id
WHERE o.status = 'completed'
GROUP BY c.region;

Query the view like any database table:

SELECT region, total_orders, total_revenue, avg_order_value, unique_customers
FROM fcdc_order_revenue_by_region
ORDER BY total_revenue DESC;
 region  | total_orders | total_revenue | avg_order_value | unique_customers
---------+--------------+---------------+-----------------+------------------
 us-east |            3 |      51500.00 |        17166.67 |                2
 ap-east |            1 |      12000.00 |        12000.00 |                1
 eu-west |            2 |       6500.00 |         3250.00 |                1
 us-west |            1 |         500.00 |          500.00 |                1
(4 rows)

You can also join across tables for a customer lifetime value summary:

CREATE MATERIALIZED VIEW fcdc_customer_order_summary AS
SELECT
    c.customer_id,
    c.name          AS customer_name,
    c.region,
    c.plan,
    COUNT(o.order_id)  AS total_orders,
    SUM(o.amount)      AS lifetime_value,
    MAX(o.order_date)  AS last_order_date
FROM fcdc_customers c
LEFT JOIN fcdc_orders o ON c.customer_id = o.customer_id
GROUP BY c.customer_id, c.name, c.region, c.plan;

Query result:

SELECT customer_name, region, plan, total_orders, lifetime_value
FROM fcdc_customer_order_summary
ORDER BY lifetime_value DESC;
  customer_name  | region  |    plan    | total_orders | lifetime_value
-----------------+---------+------------+--------------+----------------
 Acme Corp       | us-east | enterprise |            3 |       82000.00
 Umbrella Inc    | ap-east | enterprise |            1 |       12000.00
 Globex Systems  | eu-west | pro        |            2 |        6500.00
 Massive Dynamic | us-east | pro        |            1 |        4500.00
 Initech         | us-west | starter    |            1 |          500.00
(5 rows)

And you can create alert-style views that filter for high-value or anomalous events:

-- Alert on orders above $10,000
CREATE MATERIALIZED VIEW fcdc_high_value_alerts AS
SELECT
    o.order_id,
    c.name      AS customer_name,
    c.region,
    o.product,
    o.amount,
    o.order_date
FROM fcdc_orders o
JOIN fcdc_customers c ON o.customer_id = c.customer_id
WHERE o.amount > 10000;

SELECT order_id, customer_name, region, product, amount
FROM fcdc_high_value_alerts
ORDER BY amount DESC;
 order_id | customer_name | region  |      product      |  amount
----------+---------------+---------+-------------------+----------
      103 | Acme Corp     | us-east | Enterprise Bundle | 35000.00
      108 | Acme Corp     | us-east | Enterprise Bundle | 35000.00
      105 | Umbrella Inc  | ap-east | Data Platform Pro | 12000.00
      101 | Acme Corp     | us-east | Data Platform Pro | 12000.00
(4 rows)

Every view you create subscribes to the CDC stream automatically. When a row is updated in PostgreSQL, all downstream materialized views that depend on that row are updated incrementally.
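"Updated incrementally" means the engine applies per-row deltas to stored aggregate state rather than rescanning the base tables. A toy sketch of how a COUNT/SUM-by-region view absorbs changes, where an UPDATE arrives as a retraction of the old row plus an insertion of the new one (illustrative, not RisingWave internals):

```python
from collections import defaultdict

# Aggregate state per region: [order_count, revenue_sum].
state = defaultdict(lambda: [0, 0.0])

def apply_delta(region: str, amount: float, sign: int) -> None:
    """sign=+1 for an inserted row, -1 for a retracted (deleted) row."""
    state[region][0] += sign
    state[region][1] += sign * amount

# Three completed orders arrive...
apply_delta("us-east", 12000.00, +1)
apply_delta("us-east", 35000.00, +1)
apply_delta("eu-west", 3000.00, +1)
# ...then one us-east order is deleted upstream.
apply_delta("us-east", 12000.00, -1)

print(dict(state))
```

Each change touches only the affected group's state, which is why view maintenance cost scales with the change rate, not the table size.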

Before vs. After: Side-by-Side Comparison

The table below captures the most significant differences between a Flink + Debezium CDC pipeline and a RisingWave CDC pipeline.

| Dimension | Flink + Debezium | RisingWave |
|---|---|---|
| Language for processing logic | Java (or Scala) | SQL |
| CDC reading mechanism | Debezium connector in Kafka Connect | Built-in postgres-cdc / mysql-cdc connector |
| Message broker required | Yes (Kafka) | No |
| Schema registry required | Optional, but common for Avro | No |
| Before/after image handling | Manual (ExtractNewRecordState SMT) | Automatic |
| State backend | RocksDB (manual tuning) | Hummock on S3 (automatic) |
| Checkpointing | Manual configuration, periodic | Automatic, continuous |
| JVM required | Yes (Flink + Kafka Connect JVMs) | No (written in Rust) |
| Query interface | Flink REST API + Flink SQL | Standard PostgreSQL SQL (psql) |
| Incremental aggregations | Defined in DataStream or Flink SQL | CREATE MATERIALIZED VIEW |
| Deployment complexity | 5-9 components | 1 system (or RisingWave Cloud) |
| Typical lines of code for a simple aggregation | 70-100 (Java) | 10-15 (SQL) |

PostgreSQL CDC: Setup Requirements

Before RisingWave can read from PostgreSQL, the database must be configured for logical replication. This is a one-time setup.

PostgreSQL Configuration

In postgresql.conf, set:

wal_level = logical         # Required for logical decoding
max_replication_slots = 4   # One per CDC consumer
max_wal_senders = 4         # One per CDC consumer

Restart PostgreSQL, then create a replication user:

-- Run in PostgreSQL (not RisingWave)
CREATE USER replication_user WITH REPLICATION PASSWORD 'securepassword';
GRANT USAGE ON SCHEMA public TO replication_user;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO replication_user;
ALTER DEFAULT PRIVILEGES IN SCHEMA public
    GRANT SELECT ON TABLES TO replication_user;

For managed services like RDS or Aurora, set the rds.logical_replication parameter in your parameter group to 1 instead of editing postgresql.conf. For GCP Cloud SQL, enable the cloudsql.logical_decoding flag. See the RisingWave PostgreSQL CDC documentation for cloud-specific instructions.

Supported PostgreSQL Versions

RisingWave supports PostgreSQL 10 and above, using the pgoutput logical decoding plugin (built into PostgreSQL since version 10) by default. Versions older than 10 are not supported, even via alternative plugins such as wal2json; upgrading PostgreSQL is the correct path.

MySQL CDC: Setup Requirements

MySQL CDC reads from the binary log (binlog). Configure your MySQL server with:

# my.cnf
[mysqld]
server-id         = 1
log_bin           = mysql-bin
binlog_format     = ROW
binlog_row_image  = FULL
expire_logs_days  = 7    # on MySQL 8.0+, use binlog_expire_logs_seconds = 604800

Create a replication user:

-- Run in MySQL (not RisingWave)
CREATE USER 'replication_user'@'%' IDENTIFIED BY 'securepassword';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT
    ON *.* TO 'replication_user'@'%';
FLUSH PRIVILEGES;

Then create the RisingWave source:

CREATE SOURCE mysql_app WITH (
    connector     = 'mysql-cdc',
    hostname      = 'mysql-host',
    port          = '3306',
    username      = 'replication_user',
    password      = 'securepassword',
    database.name = 'myapp'
);

CREATE TABLE fcdc_mysql_products (
    product_id   INT PRIMARY KEY,
    name         VARCHAR,
    category     VARCHAR,
    price        DECIMAL,
    stock_qty    INT,
    updated_at   TIMESTAMP
)
FROM mysql_app TABLE 'myapp.products';

For details on Amazon RDS for MySQL and Aurora MySQL, see the RisingWave MySQL CDC documentation.

Handling Schema Changes

Schema changes are one of the most painful parts of managing a Flink + Debezium pipeline. When you add a column to an upstream PostgreSQL table, you typically need to:

  1. Update the Debezium connector configuration and restart it
  2. Update the Kafka topic schema (and schema registry if using Avro)
  3. Update the Flink job DDL and redeploy it from a savepoint

RisingWave handles the most common schema changes automatically without requiring a restart:

  • Adding a nullable column: RisingWave detects the new column and backfills NULL for existing rows. New rows from CDC include the new column automatically.
  • Adding a column with a default: If the upstream database sets a default value, RisingWave reflects it from the CDC event.
  • Renaming a column: Requires recreating the CDC table in RisingWave with the updated schema, then rebuilding dependent materialized views.
  • Changing a column type: Requires schema migration. RisingWave will pause the CDC stream and surface an error if the wire format changes incompatibly.

For non-breaking additions, RisingWave's handling is operationally transparent. For breaking changes, you follow a planned migration rather than scrambling to fix a broken connector at 2 AM.

The RisingWave schema change documentation covers the full set of supported ALTER TABLE operations on CDC sources.

Operational Comparison: What You Stop Managing

The biggest return when replacing Flink CDC pipelines is not the SQL syntax. It is the operational surface area you eliminate.

What you stop managing

Kafka topic sizing and retention. CDC events for a busy database generate millions of messages per day. Kafka topics need partition counts sized for throughput, and retention policies sized to handle connector lag without running out of disk. With RisingWave, there is no Kafka.
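To put numbers on that, here is the back-of-envelope arithmetic Kafka capacity planning forces on you. Every input below is an assumption; substitute your own measurements:

```python
# Back-of-envelope Kafka retention sizing for one CDC topic.
# All inputs are assumptions -- substitute measured values.
messages_per_day = 5_000_000   # CDC events from a busy orders table
avg_message_bytes = 1_500      # JSON envelope with before/after images
retention_days = 7             # must cover the longest plausible connector outage
replication_factor = 3

bytes_per_day = messages_per_day * avg_message_bytes
total_bytes = bytes_per_day * retention_days * replication_factor
print(f"{total_bytes / 1024**3:.1f} GiB of broker disk for one topic")
```

Multiply by the number of captured tables and this sizing exercise becomes a recurring operational chore; with RisingWave it simply does not exist.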

Debezium connector offsets. Debezium tracks its position in the WAL using Kafka consumer offsets stored in an internal topic. When a connector crashes, you need to verify the stored offset is valid before restarting, or risk skipping or replaying events. RisingWave manages replication slot positions internally.

Flink checkpoint tuning. Checkpoint intervals, timeout settings, minimum pause between checkpoints, aligned versus unaligned checkpoints, and incremental checkpoint configuration are all Flink-specific concerns. RisingWave handles checkpointing automatically with a default 1-second barrier interval and no user configuration required.

JVM memory and GC. Kafka Connect runs on the JVM. Flink runs on the JVM. RocksDB is a native library embedded in the Flink process, so its off-heap memory must be budgeted alongside the JVM heap. GC pauses in any of these components can cause checkpoint timeouts, connector lag, or consumer group rebalances. RisingWave is written in Rust and has no garbage collector.

Schema Registry. Many Flink CDC pipelines use Confluent Schema Registry to manage Avro schemas. This adds a service to run, secure, and back up. RisingWave schemas are defined in SQL DDL, stored in the meta service alongside table and view definitions.

Building a Multi-Source CDC Pipeline

One advantage of RisingWave is that you can join across multiple CDC sources in a single materialized view. In Flink, joining two CDC streams requires careful watermark alignment and can accumulate join state that grows with both inputs.

Suppose your orders are in PostgreSQL and your product catalog is in MySQL. In RisingWave:

-- PostgreSQL CDC source (already created above)
-- CREATE SOURCE pg_ecommerce ...

-- MySQL CDC source for product catalog
CREATE SOURCE mysql_catalog WITH (
    connector     = 'mysql-cdc',
    hostname      = 'mysql-host',
    port          = '3306',
    username      = 'replication_user',
    password      = 'securepassword',
    database.name = 'catalog'
);

CREATE TABLE fcdc_catalog_products (
    product_id   INT PRIMARY KEY,
    name         VARCHAR,
    category     VARCHAR,
    price        DECIMAL
)
FROM mysql_catalog TABLE 'catalog.products';

-- Join PostgreSQL orders with the MySQL product catalog in one materialized view
CREATE MATERIALIZED VIEW fcdc_orders_enriched AS
SELECT
    o.order_id,
    o.customer_id,
    p.name         AS product_name,
    p.category,
    o.amount,
    o.status,
    c.region,
    o.order_date
FROM fcdc_orders o
JOIN fcdc_customers c        ON o.customer_id = c.customer_id
JOIN fcdc_catalog_products p ON o.product_id = p.product_id;

RisingWave maintains this join incrementally. When a product price changes in MySQL, the materialized view updates for all orders that reference that product. When a new order arrives from PostgreSQL, it is joined with the current product record immediately.
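A toy model of why that update is cheap: the join keeps both sides keyed, so a change to one product only re-emits the joined rows that reference it (illustrative Python, not RisingWave internals):

```python
# Keyed state for both sides of a streaming join.
orders = {   # keyed by order_id
    1: {"order_id": 1, "product_id": 10, "amount": 500.00},
    2: {"order_id": 2, "product_id": 20, "amount": 900.00},
}
products = {  # keyed by product_id
    10: {"product_id": 10, "name": "Data Platform Pro", "price": 12000.00},
    20: {"product_id": 20, "name": "Starter Kit", "price": 500.00},
}

def affected_rows(product_id: int):
    """Joined-view rows that must be re-emitted when one product changes."""
    prod = products[product_id]
    return [
        {"order_id": o["order_id"], "product_name": prod["name"], "price": prod["price"]}
        for o in orders.values()
        if o["product_id"] == product_id
    ]

# A price change on product 10 touches only the orders that reference it.
products[10]["price"] = 13000.00
print(affected_rows(10))
```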

Doing the equivalent in Flink requires two Debezium connectors, two Kafka topics with separate consumer groups, two Flink table definitions, a streaming join whose state must be bounded with state TTL, and careful checkpoint coordination across both sources.

When to Keep Flink

RisingWave is not the right answer for every stream processing workload. Keep Flink for these scenarios:

Complex Event Processing (CEP). Flink's MATCH_RECOGNIZE operator supports pattern detection across event sequences that has no equivalent in RisingWave. If you need to detect sequences like "login followed by three failed payments within 10 minutes," Flink CEP is the right tool.

Non-SQL DataStream logic. If your pipeline applies machine learning model inference per record, performs custom binary protocol parsing, or requires fine-grained per-record state machines, the Flink DataStream API gives you control that SQL does not.

Kafka-native architectures without a source database. If your entire data model is event-driven with no upstream relational database, RisingWave's CDC advantage disappears. Both systems consume from Kafka equally well, so the choice reduces to SQL dialect and operational preferences.

Organizations heavily invested in a Flink platform. If you have a dedicated Flink platform team, existing Flink jobs running in production, and established runbooks for Flink operations, the marginal cost of running more Flink jobs is low. Migration makes the most sense for teams that are starting new pipelines or absorbing disproportionate operational cost.

Summary

Replacing Flink with SQL for CDC stream processing reduces a nine-component stack to a single SQL statement. The before-and-after:

  • Before: PostgreSQL WAL -> Debezium connector -> Kafka Connect -> Kafka topic -> Flink job (Java) -> JDBC sink -> downstream database. Nine moving parts, JVM tuning required at every layer, schema changes coordinated across the entire chain.

  • After: PostgreSQL WAL -> RisingWave CDC source -> materialized view (SQL) -> query with psql. One system, no JVM, schema changes surfaced as SQL errors.

RisingWave's native CDC connectors for PostgreSQL and MySQL eliminate the Debezium and Kafka layer entirely. Materialized views replace Flink jobs. The PostgreSQL wire protocol replaces Flink's REST API and custom monitoring tooling.

For teams spending more time operating their CDC infrastructure than building analytics on top of it, the migration from Flink to RisingWave is one of the highest-leverage engineering investments available. Start with a single CDC source, validate the pipeline in parallel with your existing Flink job, and cut over when you are confident in the results.

To get started, see the RisingWave quickstart guide or try RisingWave Cloud for a managed deployment with no infrastructure setup required.

FAQ

Can RisingWave replace Flink for all CDC use cases?

RisingWave covers the vast majority of CDC stream processing workloads: ingesting changes from PostgreSQL and MySQL, joining CDC streams with lookup tables, computing aggregations and alerts, and serving results via SQL. The cases where Flink remains a better fit are complex event processing with MATCH_RECOGNIZE, custom DataStream logic that is not expressible in SQL, and architectures that are entirely event-driven with no upstream relational database.

Does RisingWave require Kafka for CDC?

No. RisingWave reads directly from PostgreSQL's WAL via a replication slot and from MySQL's binary log via the binlog protocol. No Kafka, no Debezium, and no Kafka Connect cluster is required. If you already have CDC events in Kafka topics from an existing Debezium deployment, RisingWave can also consume from Kafka using its Kafka source connector while you transition.

How does RisingWave handle backpressure and slow consumers?

RisingWave is designed as a streaming database, not a stream processing framework. State is stored in Hummock, a cloud-native storage engine backed by object storage (S3, GCS, or MinIO). Backpressure is managed automatically within the system: compute nodes signal upstream ingestion to slow down when processing falls behind. Unlike Flink, there is no RocksDB state backend to tune and no checkpoint timeout to configure manually.

What happens to my Flink job during migration?

The recommended migration pattern is to run RisingWave in parallel with your existing Flink pipeline for a validation period. Create the RisingWave CDC source and materialized views, let them run against the same upstream database, and compare the output of both systems for correctness. Once you are satisfied that results match, stop the Flink job, drop the Debezium connector, and update any downstream consumers to point at RisingWave's PostgreSQL endpoint. The migration can be done table by table, so you do not need a big-bang cutover.
