Migrating from Apache Flink to RisingWave: A Step-by-Step Guide

You have a Flink pipeline in production. It processes events from Kafka, aggregates them in windowed computations, and sinks results to a downstream database. It works, but the operational overhead is real: JVM tuning, RocksDB state backend configuration, checkpoint failures at 2 AM, and a three-week lead time every time the business needs a new streaming metric.

RisingWave is a streaming database that replaces Flink's Java-based processing framework with PostgreSQL-compatible SQL. Instead of DataStream operators compiled into JARs, you write CREATE MATERIALIZED VIEW statements. Instead of managing RocksDB state backends, your state lives on S3 automatically. Instead of deploying a JobManager, TaskManagers, and ZooKeeper, you connect with psql and run SQL.

This guide walks you through migrating a real Flink pipeline to RisingWave. You will learn how Flink concepts map to RisingWave equivalents, what SQL syntax differences to expect, and how to execute the migration step by step. By the end, you will have a concrete playbook for moving your streaming workloads from Flink to RisingWave.

Concept Mapping: Flink to RisingWave

Before writing any code, you need a mental model of how Flink's components translate into RisingWave's world. The table below maps every major Flink concept to its RisingWave equivalent.

| Flink Concept | RisingWave Equivalent | Notes |
|---|---|---|
| DataStream API (Java/Scala) | SQL materialized views | All processing logic expressed as SQL |
| Flink SQL | PostgreSQL-compatible SQL | Similar but different dialect |
| ProcessFunction | SQL with UDFs (Python/Java) | Custom logic via user-defined functions |
| KeyedState / RocksDB StateBackend | Hummock (S3-based storage) | Automatic, no tuning required |
| Checkpointing to S3/HDFS | Built-in barrier-based checkpointing | Configurable interval, default 1 second |
| CREATE TABLE (source) | CREATE SOURCE / CREATE TABLE | Dedicated DDL for external connections |
| CREATE TABLE (sink) | CREATE SINK | Explicit sink declarations |
| Flink SQL materialized table | CREATE MATERIALIZED VIEW | Core primitive in RisingWave |
| JobManager + TaskManagers | Single binary or Kubernetes pods | Decoupled compute/storage/meta |
| Savepoints | Snapshots on S3 | Automatic, no manual trigger needed |
| TUMBLE / HOP TVFs | TUMBLE() / HOP() table functions | Similar syntax, minor differences |
| MATCH_RECOGNIZE (CEP) | Not supported | Stay on Flink for CEP workloads |
| Watermarks | Watermark clauses on sources | Defined in CREATE SOURCE/TABLE DDL |
| Catalogs (Hive, JDBC) | PostgreSQL-native schemas | Built-in catalog with pg_catalog |

DataStream API to Materialized Views

The biggest shift is conceptual. In Flink, you build a pipeline by chaining operators in Java:

// Flink DataStream: 5-minute revenue aggregation
DataStream<Tuple2<String, Double>> revenue = orders
    .keyBy(order -> order.getCategory())
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .aggregate(new RevenueAggregator());

In RisingWave, the same logic is a materialized view:

-- RisingWave: 5-minute revenue aggregation
CREATE MATERIALIZED VIEW revenue_per_category AS
SELECT
    category,
    window_start,
    window_end,
    SUM(amount) AS total_revenue,
    COUNT(*) AS order_count
FROM TUMBLE(orders, order_time, INTERVAL '5 MINUTES')
GROUP BY category, window_start, window_end;

The materialized view is incrementally maintained. As new events arrive in the orders source, RisingWave updates the view automatically. You query it with a standard SELECT statement, and you always get fresh results.
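Because the view is served from RisingWave's own storage, reading it is an ordinary PostgreSQL-style query. A sketch, using the column and view names from the example above:

```sql
-- Serve the freshest windows directly from the materialized view
SELECT category, window_start, total_revenue, order_count
FROM revenue_per_category
WHERE window_start >= NOW() - INTERVAL '1 hour'
ORDER BY window_start DESC, total_revenue DESC;
```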

ProcessFunction to SQL + UDFs

Flink's ProcessFunction gives you access to timers, state, and side outputs. In RisingWave, most of this logic can be expressed as SQL. For the rare cases where SQL is not sufficient, RisingWave supports user-defined functions in Python, Java, JavaScript, and Rust.

For example, a Flink ProcessFunction that flags orders above a dynamic threshold:

// Flink ProcessFunction
public class FraudDetector extends KeyedProcessFunction<String, Order, Alert> {
    private ValueState<Double> threshold;

    @Override
    public void processElement(Order order, Context ctx, Collector<Alert> out) {
        if (order.getAmount() > threshold.value()) {
            out.collect(new Alert(order.getId(), "HIGH_VALUE"));
        }
    }
}

The RisingWave equivalent uses a materialized view with a join against a threshold table:

-- RisingWave: threshold-based alerting with SQL
CREATE MATERIALIZED VIEW high_value_alerts AS
SELECT
    o.order_id,
    o.category,
    o.amount,
    'HIGH_VALUE' AS alert_type,
    o.order_time
FROM orders o
JOIN thresholds t ON o.category = t.category
WHERE o.amount > t.max_amount;

No Java. No state management API. The join against thresholds is automatically maintained as either table changes.
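When SQL alone cannot express the logic, a user-defined function fills the gap. A hedged sketch of an embedded Python UDF: the CREATE FUNCTION form shown matches recent RisingWave releases, but exact DDL and language support vary by version, and the function name and scoring rule are hypothetical:

```sql
-- Hypothetical scoring UDF written in embedded Python
CREATE FUNCTION fraud_score(amount DECIMAL) RETURNS DOUBLE PRECISION
LANGUAGE python AS $$
def fraud_score(amount):
    # Hypothetical heuristic: score grows sub-linearly with order size
    return float(amount) ** 0.5
$$;

-- Use the UDF inside a materialized view like any built-in function
CREATE MATERIALIZED VIEW scored_orders AS
SELECT order_id, category, amount, fraud_score(amount) AS score
FROM orders;
```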

RocksDB StateBackend to S3 Storage

In Flink, you configure a state backend, tune RocksDB block cache sizes, manage compaction, and hope checkpoints complete before the timeout. In RisingWave, there is nothing to configure. The Hummock storage engine persists all state to S3-compatible object storage automatically. State scales with your data, not with your local disk capacity.

| Flink State Management | RisingWave State Management |
|---|---|
| Choose state backend (RocksDB, HashMap, ForSt) | Automatic (Hummock on S3) |
| Configure block cache, write buffers, bloom filters | No tuning needed |
| Set checkpoint interval (commonly 30 s to several minutes) | Built-in checkpointing (default ~1 s) |
| Manage local disk capacity per TaskManager | Object storage scales automatically |
| Manual savepoints for upgrades | Automatic snapshots |
| State TTL configuration per operator | Temporal filters in SQL |
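The last row deserves an example. Where Flink configures state TTL per operator, RisingWave expresses retention declaratively with a temporal filter: a NOW()-based predicate that continuously expires old rows. A sketch (the view name is illustrative):

```sql
-- Keep only the trailing 24 hours of orders; older rows age out of state
CREATE MATERIALIZED VIEW recent_orders AS
SELECT *
FROM orders
WHERE order_time > NOW() - INTERVAL '24 HOURS';
```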

Flink SQL to RisingWave SQL

If you are already using Flink SQL (rather than the DataStream API), migration is largely a syntax-translation exercise. Both systems use SQL, but the dialects differ in important ways.

Creating a Kafka Source

Flink SQL:

CREATE TABLE orders (
    order_id BIGINT,
    customer_id BIGINT,
    category VARCHAR,
    amount DECIMAL(10, 2),
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'kafka:9092',
    'properties.group.id' = 'flink-orders',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
);

RisingWave:

CREATE SOURCE orders (
    order_id BIGINT,
    customer_id BIGINT,
    category VARCHAR,
    amount DECIMAL,
    order_time TIMESTAMPTZ,
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
    connector = 'kafka',
    topic = 'orders',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;

Key differences to note:

  • DDL keyword: Flink uses CREATE TABLE for both sources and sinks. RisingWave uses CREATE SOURCE for read-only external streams and CREATE TABLE for mutable tables with CDC or direct inserts.
  • Connector syntax: Flink uses quoted keys ('connector' = 'kafka'). RisingWave uses unquoted keys (connector = 'kafka').
  • Format declaration: Flink uses 'format' = 'json' inside WITH. RisingWave uses FORMAT PLAIN ENCODE JSON as a separate clause.
  • Timestamp types: Flink uses TIMESTAMP(3) or TIMESTAMP_LTZ. RisingWave uses TIMESTAMPTZ (PostgreSQL-standard).
  • Bootstrap server: Flink uses properties.bootstrap.servers (plural). RisingWave uses properties.bootstrap.server (singular).

Windowed Aggregations

Flink SQL:

SELECT
    category,
    TUMBLE_START(order_time, INTERVAL '5' MINUTE) AS window_start,
    TUMBLE_END(order_time, INTERVAL '5' MINUTE) AS window_end,
    SUM(amount) AS total_revenue,
    COUNT(*) AS order_count
FROM orders
GROUP BY category, TUMBLE(order_time, INTERVAL '5' MINUTE);

RisingWave:

CREATE MATERIALIZED VIEW revenue_5min AS
SELECT
    category,
    window_start,
    window_end,
    SUM(amount) AS total_revenue,
    COUNT(*) AS order_count
FROM TUMBLE(orders, order_time, INTERVAL '5 MINUTES')
GROUP BY category, window_start, window_end;

Key differences:

  • Tumble syntax: Flink puts TUMBLE() in GROUP BY. RisingWave uses TUMBLE() as a table function in the FROM clause.
  • Window boundaries: Flink uses TUMBLE_START() and TUMBLE_END() functions. RisingWave exposes window_start and window_end as implicit columns from the TUMBLE table function.
  • Materialized result: In Flink, this query runs as a continuous job. In RisingWave, you create it as a materialized view and query the results with SELECT * FROM revenue_5min.
  • Interval format: Flink writes INTERVAL '5' MINUTE (unit outside the quotes). RisingWave accepts PostgreSQL-style INTERVAL '5 MINUTES' (unit inside the quotes) as well as the Flink form.
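Sliding windows translate the same way. A hedged sketch using RisingWave's HOP table function, which takes the slide interval before the window size (verify the argument order against your version's documentation):

```sql
-- 10-minute windows advancing every 2 minutes
CREATE MATERIALIZED VIEW revenue_sliding AS
SELECT
    category,
    window_start,
    window_end,
    SUM(amount) AS total_revenue
FROM HOP(orders, order_time, INTERVAL '2 MINUTES', INTERVAL '10 MINUTES')
GROUP BY category, window_start, window_end;
```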

Creating a Sink

Flink SQL:

CREATE TABLE revenue_sink (
    category VARCHAR,
    window_start TIMESTAMP(3),
    total_revenue DECIMAL(10, 2),
    PRIMARY KEY (category, window_start) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:postgresql://postgres:5432/analytics',
    'table-name' = 'revenue_5min',
    'username' = 'analytics_user',
    'password' = 'secret'
);

INSERT INTO revenue_sink SELECT * FROM revenue_5min_view;

RisingWave:

CREATE SINK revenue_sink FROM revenue_5min
WITH (
    connector = 'jdbc',
    jdbc.url = 'jdbc:postgresql://postgres:5432/analytics',
    table.name = 'revenue_5min',
    type = 'upsert',
    primary_key = 'category,window_start'
);

Key differences:

  • No INSERT INTO: RisingWave sinks are declarative. You specify the source materialized view (or table) with FROM, and RisingWave continuously streams updates.
  • Sink is a first-class object: In Flink, a sink is just a table with a connector. In RisingWave, CREATE SINK is a dedicated DDL statement.
  • Primary key handling: Flink uses PRIMARY KEY ... NOT ENFORCED in the table definition. RisingWave uses primary_key in the WITH clause.

Step-by-Step Migration of a Real Pipeline

Let's migrate a concrete Flink pipeline: an e-commerce order analytics system that reads from Kafka, computes revenue metrics, detects high-value orders, and writes results to PostgreSQL.

Step 1: Inventory Your Existing Pipeline

Before migrating, document every component of your existing pipeline:

  • Sources: What Kafka topics, CDC streams, or file sources does the pipeline read?
  • Processing logic: What transformations, aggregations, joins, and window operations run?
  • State: How much state does the pipeline maintain? What TTL policies exist?
  • Sinks: Where do results go (databases, Kafka topics, data lakes)?
  • Custom operators: Are there any DataStream API operators or custom ProcessFunctions?

For our example pipeline, the inventory looks like this:

| Component | Flink Implementation |
|---|---|
| Source | Kafka topic orders (JSON) |
| Source | Kafka topic products (JSON) |
| Aggregation | 5-minute tumbling window revenue by category |
| Join | Orders enriched with product details |
| Alert | ProcessFunction for high-value order detection |
| Sink | PostgreSQL analytics.revenue_5min table |
| Sink | Kafka topic high-value-alerts |

Step 2: Set Up RisingWave

Install RisingWave and connect:

# Option 1: Local single-node (for testing)
risingwave single-node --store-directory /tmp/risingwave-data &

# Wait for startup
sleep 15

# Connect with psql
psql -h localhost -p 4566 -U root -d dev

Or use RisingWave Cloud for a managed deployment.

Step 3: Create Sources

Translate each Flink CREATE TABLE (source) to a RisingWave CREATE SOURCE:

-- Orders source from Kafka
CREATE SOURCE orders (
    order_id BIGINT,
    customer_id BIGINT,
    product_id BIGINT,
    category VARCHAR,
    amount DECIMAL,
    order_time TIMESTAMPTZ,
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
    connector = 'kafka',
    topic = 'orders',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;

-- Products source from Kafka (compacted topic for lookups)
CREATE TABLE products (
    product_id BIGINT PRIMARY KEY,
    product_name VARCHAR,
    category VARCHAR,
    base_price DECIMAL
) WITH (
    connector = 'kafka',
    topic = 'products',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'earliest'
) FORMAT UPSERT ENCODE JSON;

Notice that products uses CREATE TABLE (not CREATE SOURCE) because it is a compacted Kafka topic where updates overwrite previous values. In Flink, both would be CREATE TABLE - RisingWave makes the distinction explicit so the system can optimize storage and query serving.

Step 4: Create Materialized Views (Processing Logic)

This is where the bulk of the migration happens. Each Flink operator chain becomes one or more materialized views.

Revenue aggregation (replaces the windowed DataStream operator):

CREATE MATERIALIZED VIEW revenue_by_category AS
SELECT
    category,
    window_start,
    window_end,
    SUM(amount) AS total_revenue,
    COUNT(*) AS order_count,
    AVG(amount) AS avg_order_value
FROM TUMBLE(orders, order_time, INTERVAL '5 MINUTES')
GROUP BY category, window_start, window_end;

Order enrichment (replaces the join operator):

CREATE MATERIALIZED VIEW enriched_orders AS
SELECT
    o.order_id,
    o.customer_id,
    p.product_name,
    o.category,
    o.amount,
    p.base_price,
    o.amount - p.base_price AS price_delta,
    o.order_time
FROM orders o
JOIN products p ON o.product_id = p.product_id;

High-value order detection (replaces the ProcessFunction):

CREATE MATERIALIZED VIEW high_value_alerts AS
SELECT
    order_id,
    customer_id,
    category,
    amount,
    order_time,
    'HIGH_VALUE' AS alert_type
FROM orders
WHERE amount > 500.00;

If your threshold logic is more complex (for example, dynamic per-category thresholds stored in a table), you can use a join:

-- Dynamic thresholds table
CREATE TABLE alert_thresholds (
    category VARCHAR PRIMARY KEY,
    max_amount DECIMAL
);

INSERT INTO alert_thresholds VALUES
    ('electronics', 1000.00),
    ('clothing', 200.00),
    ('groceries', 150.00);

-- Dynamic threshold alerting
CREATE MATERIALIZED VIEW dynamic_alerts AS
SELECT
    o.order_id,
    o.customer_id,
    o.category,
    o.amount,
    t.max_amount AS threshold,
    o.order_time
FROM orders o
JOIN alert_thresholds t ON o.category = t.category
WHERE o.amount > t.max_amount;

Cascading materialized views: RisingWave supports building materialized views on top of other materialized views. This lets you decompose complex Flink operator chains into modular, testable layers:

-- Layer 1: Clean and enrich
CREATE MATERIALIZED VIEW cleaned_orders AS
SELECT * FROM enriched_orders WHERE amount > 0;

-- Layer 2: Aggregate on top of cleaned data
CREATE MATERIALIZED VIEW daily_category_summary AS
SELECT
    category,
    DATE_TRUNC('day', order_time) AS order_date,
    SUM(amount) AS daily_revenue,
    COUNT(*) AS daily_orders
FROM cleaned_orders
GROUP BY category, DATE_TRUNC('day', order_time);

This pattern replaces Flink's operator chaining with a queryable, layered architecture.

Step 5: Create Sinks

Connect the output to your downstream systems:

-- Sink revenue data to PostgreSQL
CREATE SINK revenue_to_postgres FROM revenue_by_category
WITH (
    connector = 'jdbc',
    jdbc.url = 'jdbc:postgresql://postgres:5432/analytics',
    table.name = 'revenue_5min',
    type = 'upsert',
    primary_key = 'category,window_start'
);

-- Sink alerts to Kafka topic
CREATE SINK alerts_to_kafka FROM high_value_alerts
WITH (
    connector = 'kafka',
    topic = 'high-value-alerts',
    properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;

Step 6: Validate Results

Query your materialized views directly to confirm correctness:

-- Check revenue aggregation
SELECT * FROM revenue_by_category
ORDER BY window_start DESC
LIMIT 10;

-- Check alerts
SELECT * FROM high_value_alerts
ORDER BY order_time DESC
LIMIT 10;

-- Compare row counts with Flink output
SELECT COUNT(*) FROM enriched_orders;

RisingWave serves these queries from its built-in storage. In Flink, you would need to query the downstream PostgreSQL or Kafka topic to see results. With RisingWave, you can query intermediate results at any layer of the pipeline, which makes debugging and validation dramatically easier.

Step 7: Cut Over and Decommission Flink

Once you have validated that RisingWave produces the same results:

  1. Run both systems in parallel for at least one week, comparing outputs
  2. Switch consumers from the Flink output topic/table to the RisingWave output
  3. Stop the Flink job and archive the savepoint
  4. Decommission the Flink cluster (JobManager, TaskManagers, ZooKeeper)

What to Expect After Migration

Migration is not just about translating code. Here is what changes in practice.

Operational Simplification

The most immediate impact is fewer moving parts. A typical Flink deployment involves a JobManager, multiple TaskManagers, ZooKeeper (or Kubernetes for HA), a RocksDB state backend, checkpoint storage on S3, a metrics exporter, and monitoring dashboards for each component. RisingWave collapses this into a single system with built-in monitoring accessible through system catalogs and SQL queries.
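For example, the streaming jobs that would be separate dashboard entries in Flink are plain catalog objects in RisingWave, inspectable from psql. These SHOW commands are standard RisingWave, though output columns vary by version:

```sql
-- List the running pieces of the pipeline
SHOW SOURCES;
SHOW MATERIALIZED VIEWS;
SHOW SINKS;
```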

Recovery Time

Flink recovery after a failure depends on state size. With RocksDB, restoring a multi-gigabyte state can take minutes. Flink 2.0's ForSt backend improves this to under 10 seconds. RisingWave recovers in seconds regardless of state size because state is already on S3 - there is no local state to rebuild.

Query Serving

With Flink, reading pipeline results requires querying the downstream sink (PostgreSQL, Elasticsearch, or another database). RisingWave serves queries directly from materialized views. This eliminates the "Flink + serving database" pattern, reducing infrastructure complexity and data staleness.

What You Lose

Be honest about the tradeoffs:

  • MATCH_RECOGNIZE: If you use Flink's Complex Event Processing (CEP) capabilities, RisingWave does not have an equivalent. You will need to express pattern matching as a combination of window functions and joins, or keep Flink for that specific workload.
  • DataStream API flexibility: Custom Java operators with timers, side outputs, and async I/O have no direct equivalent. Most can be replaced with SQL + UDFs, but some niche logic may require rethinking.
  • Ecosystem breadth: Flink has connectors for nearly everything. RisingWave covers the most common sources and sinks (Kafka, Pulsar, Kinesis, MySQL CDC, PostgreSQL CDC, S3, Iceberg, JDBC, and more), but check the connector list for your specific needs.
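As an illustration of the first point, a simple two-event pattern (a small order immediately followed by a large one from the same customer) can be approximated with an OVER window instead of MATCH_RECOGNIZE. A hedged sketch; streaming OVER-window support in RisingWave has restrictions, and the thresholds are hypothetical:

```sql
-- Approximate a two-event CEP pattern with LAG over a keyed stream
CREATE MATERIALIZED VIEW small_then_large AS
SELECT order_id, customer_id, amount, prev_amount
FROM (
    SELECT
        order_id,
        customer_id,
        amount,
        LAG(amount) OVER (PARTITION BY customer_id ORDER BY order_time) AS prev_amount
    FROM orders
) t
WHERE prev_amount < 10.00 AND amount > 500.00;
```

More elaborate patterns (negation, timeouts, arbitrary repetition) do not decompose this cleanly, which is why CEP-heavy workloads are better left on Flink.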

How Does State Management Differ Between Flink and RisingWave?

Flink persists state locally on each TaskManager using RocksDB (or in-memory for small states). You must configure block cache sizes, write buffer counts, and compaction settings per job. Periodic checkpoints serialize this local state to remote storage (S3 or HDFS), and the checkpoint interval (commonly 30 seconds to several minutes) directly affects recovery time and exactly-once guarantees.

RisingWave takes a fundamentally different approach. Its storage engine, Hummock, is a cloud-native LSM-tree built on top of S3-compatible object storage. All state is persisted remotely by default. There is no local state to manage, no RocksDB to tune, and no separate checkpoint configuration. The barrier-based checkpointing mechanism runs at approximately 1-second intervals, providing both low-latency recovery and strong consistency guarantees.

For pipelines with large state (multi-way joins, long windows, or high-cardinality aggregations), this architectural difference is significant. Flink's local state can exhaust TaskManager disk space, requiring manual intervention. RisingWave's S3-backed state scales with your data automatically.

Can I Migrate My Flink SQL Queries Directly?

Yes, for most SQL-based workloads. If your Flink pipeline uses Flink SQL for sources, transformations, and sinks, migration is primarily a syntax translation. The core SQL operations (SELECT, JOIN, GROUP BY, window functions, aggregations) are supported in both systems.

The main areas requiring changes are:

  1. DDL syntax: CREATE TABLE with connectors becomes CREATE SOURCE (for read streams) or CREATE TABLE (for CDC/upsert). Connector parameters use unquoted keys.
  2. Window functions: Flink's TUMBLE() in GROUP BY becomes RisingWave's TUMBLE() as a FROM-clause table function. TUMBLE_START/TUMBLE_END become window_start/window_end columns.
  3. Interval syntax: Flink uses INTERVAL '5' MINUTE. RisingWave uses INTERVAL '5 MINUTES' (or INTERVAL '5' MINUTE - both work).
  4. Type mapping: TIMESTAMP_LTZ becomes TIMESTAMPTZ. TINYINT is not supported (use SMALLINT). CHAR(n) fixed-length types are not supported (use VARCHAR).
  5. Sink declarations: Flink's INSERT INTO sink_table pattern becomes CREATE SINK ... FROM materialized_view.

If your pipeline uses the DataStream API or custom ProcessFunctions, those components need to be rewritten in SQL or replaced with UDFs.

When Should I Stay on Flink?

Stay on Flink if your workloads depend on capabilities that RisingWave does not support:

  • Complex Event Processing (CEP): Flink's MATCH_RECOGNIZE clause enables sophisticated pattern detection over event streams. RisingWave does not support this syntax.
  • Custom Java/Scala operators: If your pipeline relies heavily on the DataStream API with custom operators, timers, async I/O, or side outputs, the migration cost may exceed the operational savings.
  • Batch-streaming unification: Flink's unified batch and stream processing on the same API is mature. RisingWave focuses on streaming-first with batch query serving, but it is not designed as a batch processing engine.
  • Existing investment: If you have a dedicated Flink platform team, well-tuned clusters, and established CI/CD pipelines for Flink jobs, the migration cost must be weighed against the operational benefits.

For teams where 80% or more of the pipeline is SQL-expressible (aggregations, joins, windows, CDC), migration to RisingWave typically pays for itself within weeks through reduced operational complexity and faster development cycles.

How Do I Handle the Transition Period During Migration?

Run both systems in parallel. Keep your Flink pipeline running while building the RisingWave equivalent alongside it. Both systems read from the same Kafka topics, so there is no data duplication at the source layer.

During the parallel phase:

  1. Compare outputs by writing both Flink and RisingWave results to separate tables in your downstream database, then run diff queries
  2. Monitor latency - RisingWave's sub-second checkpointing typically delivers fresher results than Flink's default checkpoint interval
  3. Validate edge cases - pay special attention to late data handling, null values, and window boundary behavior
  4. Migrate consumers gradually - switch one downstream consumer at a time from the Flink output to the RisingWave output
  5. Keep the Flink savepoint - archive the final savepoint before decommissioning, in case you need to roll back

A typical parallel run lasts one to two weeks. This gives enough time to observe behavior across different traffic patterns and catch edge cases.
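For the output comparison in step 1, a full outer join in the downstream database surfaces every disagreement. A hedged sketch, assuming both pipelines sink into tables named flink_revenue and rw_revenue (hypothetical names) in the same PostgreSQL database:

```sql
-- Rows where the two systems disagree, or where one side is missing
SELECT
    COALESCE(f.category, r.category) AS category,
    COALESCE(f.window_start, r.window_start) AS window_start,
    f.total_revenue AS flink_revenue,
    r.total_revenue AS risingwave_revenue
FROM flink_revenue f
FULL OUTER JOIN rw_revenue r
    ON f.category = r.category AND f.window_start = r.window_start
WHERE f.total_revenue IS DISTINCT FROM r.total_revenue;
```

IS DISTINCT FROM (rather than <>) also catches rows where one side is NULL, which is exactly the missing-output case you want the diff to flag.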

Conclusion

Migrating from Apache Flink to RisingWave is a shift from a Java-centric processing framework to a SQL-native streaming database. The key takeaways:

  • Concept mapping is straightforward: DataStream operators become materialized views, ProcessFunctions become SQL with UDFs, and RocksDB state backends become automatic S3 storage.
  • SQL translation is mechanical: For Flink SQL users, migration is primarily syntax changes - different DDL keywords, window function placement, and connector configuration.
  • Operational complexity drops significantly: No JVM tuning, no RocksDB configuration, no TaskManager capacity planning. One system replaces the Flink cluster plus your serving database.
  • Run both systems in parallel: The safest migration path is a one-to-two-week parallel run with output comparison before switching consumers.
  • Know the limits: MATCH_RECOGNIZE, custom DataStream operators, and batch processing are areas where Flink remains stronger. Verify that your workloads are SQL-expressible before committing to migration.

Ready to try this yourself? Get started with RisingWave in five minutes with the Quickstart.

Join our Slack community to ask questions and connect with other stream processing developers.
