Migrating from Flink Iceberg Sink to RisingWave


If your team runs Apache Flink to sink streaming data into Apache Iceberg, this guide shows you exactly how to replace that pipeline with RisingWave. You will inventory your existing jobs, translate the SQL, run both systems in parallel to validate correctness, then cut over with a clear rollback plan if anything goes wrong.


Flink works, but it carries operational weight that compounds over time.

Two-phase commit complexity. Flink's Iceberg sink commits in two phases tied to checkpoint cycles: writers flush data files at each checkpoint, and a single committer adds them to the table once the checkpoint completes. A failure between those phases can leave orphaned data files in the table's storage location, written but never committed, which require manual cleanup. Every engineer on your team needs to understand checkpoint intervals, failure modes, and how to inspect the JobManager logs to diagnose stuck commits.

Separate compaction and GC jobs. Flink's Iceberg sink does not compact small files as it writes. After any high-throughput sink, you end up running a separate Spark or Flink compaction job on a schedule, and the same goes for expired snapshot cleanup. That is two more jobs to monitor, two more failure modes, and two more things to page you at 2am.

SQL limitations on CDC sources. Doing incremental aggregation over a CDC source in Flink requires careful watermark management and often custom UDFs. Getting exactly-once semantics across a Kafka source, a stateful transformation, and an Iceberg sink is possible but non-trivial.

RisingWave addresses each of these directly. It is a PostgreSQL-compatible streaming database built in Rust and released under Apache 2.0. It runs streaming SQL natively, ingests from Kafka and CDC sources without a separate connector layer, handles checkpointing internally, and compacts Iceberg tables automatically. The operational surface area is smaller, and the SQL dialect is standard PostgreSQL, so your team does not need to learn a new language.


Step 1: Pre-Migration Assessment

Before touching any infrastructure, document every Flink Iceberg sink job you are running.

For each job, record:

  • The source (Kafka topic, MySQL CDC, PostgreSQL CDC, or other)
  • The transformation logic (projections, aggregations, joins, UDFs)
  • The target Iceberg table, catalog, and warehouse location
  • Parallelism setting and checkpoint interval
  • Whether the job uses event-time or processing-time semantics

This inventory becomes your migration checklist. A job with no UDFs, a single Kafka source, and a simple projection is a one-day migration. A job with five custom Java UDFs, event-time windowing, and a complex join is a week.
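If you track the inventory in a script rather than a spreadsheet, the sketch below encodes the fields listed above along with the rough sizing rule. It is a minimal sketch. The `FlinkSinkJob` class, its field names, and the thresholds in `migration_size` are hypothetical and simply mirror the two examples in the previous paragraph.

```python
from dataclasses import dataclass, field


@dataclass
class FlinkSinkJob:
    """One inventory row; hypothetical field names mirroring the checklist."""
    name: str
    source: str                 # "kafka", "mysql-cdc", "postgres-cdc", ...
    target_table: str           # Iceberg table, e.g. "analytics.orders"
    catalog_type: str           # "rest", "hive", "glue", "jdbc", ...
    parallelism: int = 1
    checkpoint_interval_s: int = 60
    event_time: bool = False    # relies on event-time semantics or windows
    joins: int = 0
    udfs: list[str] = field(default_factory=list)


def migration_size(job: FlinkSinkJob) -> str:
    """Rough effort class; the thresholds are assumptions to tune."""
    simple = (not job.udfs and job.joins == 0
              and not job.event_time and job.source == "kafka")
    if simple:
        return "small: about a day"
    if len(job.udfs) >= 3 or (job.joins > 0 and job.event_time):
        return "large: about a week"
    return "medium"


jobs = [
    FlinkSinkJob("orders_raw", "kafka", "analytics.orders", "rest"),
    FlinkSinkJob("orders_enriched", "kafka", "analytics.orders_enriched", "glue",
                 event_time=True, joins=1, udfs=["mask_email", "parse_sku"]),
]
for job in jobs:
    print(job.name, "->", migration_size(job))
```

Sorting jobs by this class gives you a natural migration order, starting with the small ones.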

Identify custom UDFs. RisingWave does not execute Flink Java or Scala UDFs. If your Flink job calls a UDF, you need either a SQL equivalent or a port to RisingWave's own UDF support, which covers languages such as Python, Java, and Rust. Most string manipulation and type casting UDFs have direct PostgreSQL-compatible equivalents. Window aggregations may need to be restructured as tumble or hop window queries using RisingWave's built-in TUMBLE() and HOP() time window functions.

Map your Flink catalogs. RisingWave supports the same Iceberg catalog backends as Flink: REST, Hive Metastore, AWS Glue, and JDBC. Make a note of which catalog backend each Flink job uses and what credentials it needs. RisingWave will connect to the same catalog, so no data migration is required. Only the writer changes.
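One way to gather these details mechanically is to translate the catalog properties from each Flink `CREATE CATALOG` statement into RisingWave connector options. The sketch below assumes the common Flink Iceberg keys (`catalog-type`, `catalog-impl`, `uri`, `warehouse`) and maps a Hadoop catalog to RisingWave's `storage` catalog type. Treat the mapping tables as assumptions and verify them against the Iceberg and RisingWave documentation for your versions. Credentials and backend-specific options, such as JDBC user and password, still have to be carried over by hand.

```python
# Assumed correspondence between Flink Iceberg catalog properties and
# RisingWave Iceberg connector options; verify for your versions.
FLINK_CATALOG_TYPES = {
    "rest": "rest",
    "hive": "hive",
    "glue": "glue",
    "jdbc": "jdbc",
    "hadoop": "storage",
}
FLINK_CATALOG_IMPLS = {
    "org.apache.iceberg.aws.glue.GlueCatalog": "glue",
    "org.apache.iceberg.jdbc.JdbcCatalog": "jdbc",
    "org.apache.iceberg.rest.RESTCatalog": "rest",
}


def to_risingwave_catalog_options(flink_props: dict[str, str]) -> dict[str, str]:
    """Translate the catalog-level part of a Flink Iceberg catalog definition."""
    if "catalog-impl" in flink_props:
        rw_type = FLINK_CATALOG_IMPLS.get(flink_props["catalog-impl"])
    else:
        rw_type = FLINK_CATALOG_TYPES.get(flink_props.get("catalog-type", ""))
    if rw_type is None:
        raise ValueError(f"no known RisingWave catalog type for {flink_props!r}")

    options = {"catalog.type": rw_type}
    if "uri" in flink_props:
        options["catalog.uri"] = flink_props["uri"]
    if "warehouse" in flink_props:
        options["warehouse.path"] = flink_props["warehouse"]
    return options


flink_catalog = {
    "type": "iceberg",
    "catalog-type": "rest",
    "uri": "http://your-iceberg-rest-catalog:8181",
    "warehouse": "s3://your-bucket/warehouse",
}
print(to_risingwave_catalog_options(flink_catalog))
```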


Step 2: Environment Setup

Deploy RisingWave alongside your existing Flink cluster. Do not decommission Flink yet. The goal of this step is connectivity validation, not cutover.

Install RisingWave using Docker or Kubernetes:

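# Docker single-node for testing. single_node mode keeps state on disk inside
# the container; playground mode keeps it in memory and loses it on restart.
docker run -d --name risingwave \
  -p 4566:4566 -p 5691:5691 \
  risingwavelabs/risingwave:latest \
  single_node

# Connect via psql (the default database is dev)
psql -h localhost -p 4566 -d dev -U root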

For production, use the RisingWave Kubernetes Operator.

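Once connected, check that RisingWave can reach your Iceberg catalog and storage. The following example defines an Iceberg source over an existing table (analytics.orders) in a REST catalog:

CREATE SOURCE iceberg_catalog_check
WITH (
  connector = 'iceberg',
  catalog.type = 'rest',
  catalog.uri = 'http://your-iceberg-rest-catalog:8181',
  warehouse.path = 's3://your-bucket/warehouse',
  database.name = 'analytics',
  table.name = 'orders',
  s3.region = 'us-east-1',
  s3.access.key = 'YOUR_ACCESS_KEY',
  s3.secret.key = 'YOUR_SECRET_KEY'
);

Validate connectivity by reading from it:

SELECT COUNT(*) FROM iceberg_catalog_check;

If the query returns a count, RisingWave can reach the catalog your Flink jobs use and read the table's data files. The sinks you create later also need write access to the warehouse path, so confirm those permissions as well. If the query fails, check the catalog URI, credentials, and network access before proceeding. Afterwards, remove the check with DROP SOURCE iceberg_catalog_check;.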


Step 3: SQL Translation

This is where most of the migration work happens. Flink SQL and RisingWave SQL look similar but have meaningful differences. RisingWave uses PostgreSQL-compatible SQL, uses materialized views as an intermediate compute layer, and separates sources from sinks.

Kafka Source

Flink:

CREATE TABLE kafka_orders (
  order_id BIGINT,
  user_id BIGINT,
  amount DECIMAL(10, 2),
  event_time TIMESTAMP(3),
  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = 'kafka:9092',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

RisingWave:

CREATE SOURCE kafka_orders (
  order_id BIGINT,
  user_id BIGINT,
  amount NUMERIC(10, 2),
  event_time TIMESTAMPTZ
)
WITH (
  connector = 'kafka',
  topic = 'orders',
  properties.bootstrap.server = 'kafka:9092',
  scan.startup.mode = 'earliest'
)
FORMAT PLAIN ENCODE JSON;

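Note: RisingWave also accepts a watermark definition, written inside the column list as WATERMARK FOR event_time AS event_time - INTERVAL '5 SECOND'. It is optional. Without it, windowed results update as rows arrive. With it, you can use EMIT ON WINDOW CLOSE, and RisingWave can clean up old window state. Exactly-once semantics do not depend on watermarks: barrier-based checkpoints handle them internally.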
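To reason about what a 5-second watermark means for late rows, here is a simplified model of `WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND`. The watermark trails the largest event time seen so far by five seconds, and any row whose event time falls below the current watermark counts as late. This is a sketch, not either engine's implementation. What happens to a late row (dropped, or kept and used to update results) is engine- and configuration-specific, so check the documented behavior of the version you run. `classify_late_rows` is a hypothetical helper.

```python
from datetime import datetime, timedelta


def classify_late_rows(event_times, delay=timedelta(seconds=5)):
    """Split event times into (on_time, late) under a max-minus-delay watermark."""
    watermark = None
    on_time, late = [], []
    for ts in event_times:
        if watermark is not None and ts < watermark:
            late.append(ts)              # behind the watermark
            continue
        on_time.append(ts)
        # The watermark only moves forward.
        candidate = ts - delay
        if watermark is None or candidate > watermark:
            watermark = candidate
    return on_time, late


t0 = datetime(2024, 1, 15, 12, 0, 0)
arrivals = [t0, t0 + timedelta(seconds=10), t0 + timedelta(seconds=3),
            t0 + timedelta(seconds=6)]
on_time, late = classify_late_rows(arrivals)
print("late:", late)
```

Here the row at t0+3s arrives after the row at t0+10s has pushed the watermark to t0+5s, so it counts as late. The row at t0+6s does not.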

CDC Source

Flink:

CREATE TABLE mysql_customers (
  customer_id BIGINT PRIMARY KEY NOT ENFORCED,
  name STRING,
  email STRING
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'mysql',
  'port' = '3306',
  'username' = 'root',
  'password' = 'secret',
  'database-name' = 'shop',
  'table-name' = 'customers'
);

RisingWave:

CREATE TABLE mysql_customers (
  customer_id BIGINT PRIMARY KEY,
  name VARCHAR,
  email VARCHAR
)
WITH (
  connector = 'mysql-cdc',
  hostname = 'mysql',
  port = '3306',
  username = 'root',
  password = 'secret',
  database.name = 'shop',
  table.name = 'customers'
);

RisingWave CDC tables are first-class objects. They automatically snapshot the upstream table and then tail the binlog.

Transformation and Iceberg Sink

Flink:

INSERT INTO iceberg_orders
SELECT order_id, user_id, amount, event_time
FROM kafka_orders
WHERE amount > 0;

RisingWave:

-- First, create a materialized view for the transformation
CREATE MATERIALIZED VIEW mv_orders AS
SELECT order_id, user_id, amount, event_time
FROM kafka_orders
WHERE amount > 0;

-- Then, sink the materialized view to Iceberg
CREATE SINK iceberg_orders_sink
FROM mv_orders
WITH (
  connector = 'iceberg',
  type = 'append-only',
  catalog.type = 'rest',
  catalog.uri = 'http://your-iceberg-rest-catalog:8181',
  warehouse.path = 's3://your-bucket/warehouse',
  database.name = 'analytics',
  table.name = 'orders',
  s3.region = 'us-east-1',
  s3.access.key = 'YOUR_ACCESS_KEY',
  s3.secret.key = 'YOUR_SECRET_KEY'
);

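The materialized view is RisingWave's incremental compute layer. It holds the current state of the transformation and feeds the sink continuously. This is the most important mental model shift from Flink: in RisingWave, computation lives in the materialized view, not in the sink definition. Do not create this sink against the production table while Flink is still writing to it. Step 4 points the sink at a shadow table first, so the two systems never write to the same table at once.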

Aggregations

Flink (tumbling window):

INSERT INTO iceberg_hourly_totals
SELECT
  TUMBLE_START(event_time, INTERVAL '1' HOUR) AS window_start,
  user_id,
  SUM(amount) AS total_amount
FROM kafka_orders
GROUP BY TUMBLE(event_time, INTERVAL '1' HOUR), user_id;

RisingWave:

CREATE MATERIALIZED VIEW mv_hourly_totals AS
SELECT
  window_start,
  user_id,
  SUM(amount) AS total_amount
FROM TUMBLE(kafka_orders, event_time, INTERVAL '1 HOUR')
GROUP BY window_start, user_id;

CREATE SINK iceberg_hourly_totals_sink
FROM mv_hourly_totals
WITH (
  connector = 'iceberg',
  type = 'upsert',
  primary_key = 'window_start,user_id',
  catalog.type = 'rest',
  catalog.uri = 'http://your-iceberg-rest-catalog:8181',
  warehouse.path = 's3://your-bucket/warehouse',
  database.name = 'analytics',
  table.name = 'hourly_totals',
  s3.region = 'us-east-1'
);
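The upsert sink type follows from how the materialized view behaves. By default, a RisingWave materialized view updates each window's total as new orders arrive, so the same (window_start, user_id) key changes many times. The sketch below models that update-as-you-go behavior in plain Python. `hour_window_start` and `hourly_totals` are illustrative names, not RisingWave APIs.

```python
from collections import defaultdict
from datetime import datetime
from decimal import Decimal


def hour_window_start(ts: datetime) -> datetime:
    """Start of the 1-hour tumbling window that contains ts."""
    return ts.replace(minute=0, second=0, microsecond=0)


def hourly_totals(orders):
    """Yield ((window_start, user_id), running_total) once per input order.

    Each yielded pair replaces any earlier value for the same key, which is
    what an upsert sink keyed on (window_start, user_id) does.
    """
    totals = defaultdict(Decimal)
    for user_id, amount, event_time in orders:
        key = (hour_window_start(event_time), user_id)
        totals[key] += amount
        yield key, totals[key]


orders = [
    (1, Decimal("10.00"), datetime(2024, 1, 15, 9, 5)),
    (1, Decimal("2.50"), datetime(2024, 1, 15, 9, 40)),
    (2, Decimal("1.00"), datetime(2024, 1, 15, 10, 1)),
]
changes = list(hourly_totals(orders))
upserted = dict(changes)   # last value per key wins, as in the Iceberg table
print(upserted)
```

An append-only sink would store all three changes, two of them for the same user and hour. The upsert sink keeps one row per key.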

Step 4: Parallel Validation

Do not cut over until you have run both systems in parallel and confirmed that they produce identical output.

Create a shadow Iceberg table with the same schema as your production table:

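-- In your Iceberg catalog (e.g., using Spark SQL): create an empty table with
-- the production schema. Add a PARTITIONED BY clause matching analytics.orders.
CREATE TABLE analytics.orders_rw_test
USING iceberg
AS SELECT * FROM analytics.orders WHERE 1 = 0;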

Point your RisingWave sink at the shadow table:

CREATE SINK iceberg_orders_rw_test_sink
FROM mv_orders
WITH (
  connector = 'iceberg',
  type = 'append-only',
  catalog.type = 'rest',
  catalog.uri = 'http://your-iceberg-rest-catalog:8181',
  warehouse.path = 's3://your-bucket/warehouse',
  database.name = 'analytics',
  table.name = 'orders_rw_test',
  s3.region = 'us-east-1'
);

Run both the Flink job and the RisingWave sink for 24 hours from the same Kafka offset. Then compare one full day of data in both tables:

-- Row count and amount total per writer (run in your query engine, e.g., Trino or Spark)
SELECT 'flink' AS writer, COUNT(*) AS row_count, SUM(amount) AS total_amount
FROM analytics.orders
WHERE event_time >= TIMESTAMP '2024-01-15 00:00:00'
  AND event_time <  TIMESTAMP '2024-01-16 00:00:00'
UNION ALL
SELECT 'risingwave', COUNT(*), SUM(amount)
FROM analytics.orders_rw_test
WHERE event_time >= TIMESTAMP '2024-01-15 00:00:00'
  AND event_time <  TIMESTAMP '2024-01-16 00:00:00';

Both rows should show the same row_count and total_amount.
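If the daily numbers disagree, breaking both tables down by hour narrows the search. The sketch below compares per-bucket (row count, amount total) pairs, which you would export from the same kind of query grouped by hour. The input shape and the `diff_buckets` name are assumptions. Amounts are compared as `Decimal` rather than float, so rounding noise cannot hide or invent a mismatch.

```python
from decimal import Decimal


def diff_buckets(flink, risingwave):
    """Return {bucket: (flink_value, risingwave_value)} for buckets that differ.

    Each input maps a bucket label (e.g. an hour) to (row_count, amount_total).
    A bucket missing on one side counts as (0, Decimal(0)).
    """
    empty = (0, Decimal(0))
    mismatches = {}
    for bucket in sorted(set(flink) | set(risingwave)):
        a = flink.get(bucket, empty)
        b = risingwave.get(bucket, empty)
        if a != b:
            mismatches[bucket] = (a, b)
    return mismatches


flink_hours = {"09": (10, Decimal("100.00")), "10": (5, Decimal("5.50"))}
rw_hours = {"09": (10, Decimal("100.0")), "10": (4, Decimal("4.50")),
            "11": (1, Decimal("1.00"))}
print(diff_buckets(flink_hours, rw_hours))
```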

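Validate exactly-once semantics. While RisingWave is running, kill the process without a graceful shutdown and restart it:

# Kill RisingWave (docker kill sends SIGKILL, simulating a crash;
# docker stop would shut it down gracefully instead)
docker kill risingwave

# Restart it
docker start risingwave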

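After recovery, rerun the comparison for a time range that spans the restart. Row counts should still match the Flink table, and no order_id should appear more often than it does in Flink's output. RisingWave recovers from its internal checkpoint and resumes from the last committed position.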
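A plain uniqueness check raises false alarms if the orders topic itself carries repeated order_id values. The sketch below avoids this. It assumes you pull the order_id values for the time range around the restart from both tables, and it flags only the keys that appear more often in the RisingWave table than in Flink's. `unexpected_duplicates` is a hypothetical helper.

```python
from collections import Counter


def unexpected_duplicates(flink_ids, rw_ids):
    """Return {order_id: (flink_count, rw_count)} for ids that appear more
    often in the RisingWave table than in the Flink table. Ids that Flink
    never wrote at all are flagged too."""
    flink_counts = Counter(flink_ids)
    rw_counts = Counter(rw_ids)
    return {
        order_id: (flink_counts[order_id], n)
        for order_id, n in rw_counts.items()
        if n > flink_counts[order_id]   # Counter returns 0 for missing keys
    }


# order_id 2 legitimately appears twice upstream; 3 was written twice by a replay.
print(unexpected_duplicates([1, 2, 2, 3], [1, 2, 2, 3, 3]))
```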


Step 5: Cutover

Once parallel validation passes, execute the cutover during a low-traffic window.

1. Stop the Flink job with a final savepoint:

flink stop --savepointPath s3://your-bucket/savepoints/ <job-id>

2. Record the current Kafka consumer group offset for the Flink job's consumer group:

kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
  --describe --group flink-orders-consumer

Note the offset for each partition of the orders topic.
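With many partitions, copying offsets by hand is error-prone. The sketch below parses the text printed by `kafka-consumer-groups.sh --describe`. It assumes the usual column layout (GROUP, TOPIC, PARTITION, CURRENT-OFFSET, ...), which can vary between Kafka versions, so check it against your own output. The committed offset is the next offset the group would read. Flink commits offsets to Kafka only as a progress indicator, and its savepoint remains the authoritative position. `committed_offsets` is a hypothetical helper.

```python
def committed_offsets(describe_output: str, topic: str) -> dict[int, int]:
    """Parse `kafka-consumer-groups.sh --describe` text into {partition: offset}."""
    offsets = {}
    header = None
    for line in describe_output.splitlines():
        cols = line.split()
        if not cols:
            continue
        if cols[0] == "GROUP":
            header = cols            # column names for the rows that follow
            continue
        if header is None or len(cols) != len(header):
            continue                 # preamble, e.g. "... has no active members."
        row = dict(zip(header, cols))
        if row["TOPIC"] == topic and row["CURRENT-OFFSET"] != "-":
            offsets[int(row["PARTITION"])] = int(row["CURRENT-OFFSET"])
    return offsets


sample = """
Consumer group 'flink-orders-consumer' has no active members.

GROUP                 TOPIC   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
flink-orders-consumer orders  0          1500            1520            20   -            -     -
flink-orders-consumer orders  1          980             980             0    -            -     -
"""
print(committed_offsets(sample, "orders"))
```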

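3. Rebuild the RisingWave pipeline from the cutover point, then create the production sink.

mv_orders already contains every row consumed during the parallel run, and a new sink starts by writing the existing contents of the view it reads from. A production sink created on that view would therefore re-send rows that Flink has already committed to analytics.orders. RisingWave also tracks Kafka offsets in its own checkpointed state instead of resuming from a consumer group, so it does not pick up the Flink offsets you recorded. Recreate the source so that it starts at the cutover point. The Kafka source starts from a timestamp rather than from per-partition offsets, so use the timestamp of the first message Flink did not commit:

-- Drop the shadow sink
DROP SINK iceberg_orders_rw_test_sink;

-- Drop the source and everything still built on it (mv_orders)
DROP SOURCE kafka_orders CASCADE;

-- Recreate the source, starting at the cutover timestamp
CREATE SOURCE kafka_orders (
  order_id BIGINT,
  user_id BIGINT,
  amount NUMERIC(10, 2),
  event_time TIMESTAMPTZ
)
WITH (
  connector = 'kafka',
  topic = 'orders',
  properties.bootstrap.server = 'kafka:9092',
  scan.startup.timestamp.millis = '1705276800000'  -- replace with actual cutover timestamp
)
FORMAT PLAIN ENCODE JSON;

-- Recreate the transformation
CREATE MATERIALIZED VIEW mv_orders AS
SELECT order_id, user_id, amount, event_time
FROM kafka_orders
WHERE amount > 0;

-- Create the production sink pointing to the real table
CREATE SINK iceberg_orders_sink
FROM mv_orders
WITH (
  connector = 'iceberg',
  type = 'append-only',
  catalog.type = 'rest',
  catalog.uri = 'http://your-iceberg-rest-catalog:8181',
  warehouse.path = 's3://your-bucket/warehouse',
  database.name = 'analytics',
  table.name = 'orders',
  s3.region = 'us-east-1',
  s3.access.key = 'YOUR_ACCESS_KEY',
  s3.secret.key = 'YOUR_SECRET_KEY'
);

A single start timestamp does not line up exactly with per-partition offsets, so a few rows near the boundary may be written twice or skipped. Check the production table around the cutover time.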
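RisingWave's Kafka source takes one start timestamp for all partitions, while Flink's committed positions are per-partition offsets. One way to bridge the two is to read the first uncommitted message on each partition, for example with `kafka-console-consumer.sh --partition N --offset OFFSET --max-messages 1 --property print.timestamp=true`, and take the earliest of those timestamps. The sketch below does that. The function names are hypothetical, and it assumes message timestamps increase with offsets within each partition. Taking the minimum leaves no gaps, at the cost of re-reading a few already-committed messages on partitions whose first uncommitted message is later, so expect a small overlap to check for.

```python
def parse_console_timestamp(line: str) -> int:
    """Return epoch millis from a kafka-console-consumer line printed with
    print.timestamp=true: 'CreateTime:<millis>' followed by a tab and the value."""
    stamp, _, _ = line.partition("\t")
    kind, _, millis = stamp.partition(":")
    if kind not in ("CreateTime", "LogAppendTime"):
        raise ValueError(f"unexpected timestamp field: {stamp!r}")
    return int(millis)


def cutover_timestamp_millis(first_unread: dict[int, int]) -> int:
    """Choose one start timestamp from {partition: timestamp of the first
    message Flink did not commit}; the minimum leaves no gap on any partition."""
    if not first_unread:
        raise ValueError("no partitions given")
    return min(first_unread.values())


first_unread = {
    0: parse_console_timestamp('CreateTime:1705276800123\t{"order_id": 41}'),
    1: parse_console_timestamp('CreateTime:1705276799000\t{"order_id": 42}'),
}
# Value to use for scan.startup.timestamp.millis
print(cutover_timestamp_millis(first_unread))
```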

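4. Monitor for 15 minutes. Confirm that the pipeline exists and that the view is advancing:

-- Confirm the source, materialized view, and sink exist
SHOW SOURCES;
SHOW MATERIALIZED VIEWS;
SHOW SINKS;

-- Run this a few times: the count and the latest event time should keep growing
SELECT COUNT(*) AS row_count, MAX(event_time) AS latest_event FROM mv_orders;

For throughput and backpressure, use the Prometheus metrics your RisingWave deployment exports (for example, through Grafana). Verify that rows are flowing into the production Iceberg table before declaring success.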


Rollback Plan

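Keep the Flink savepoint and job definition intact for 48 hours post-cutover.

If RisingWave encounters an issue during the first 48 hours:

  1. Drop the RisingWave production sink to stop writes.
  2. Deal with the rows RisingWave committed after cutover. Flink will re-read everything published since its savepoint, so an append-only table would receive those rows twice. Remove them first, for example by rolling the table back to the last snapshot Flink committed before cutover. Upsert tables keyed on the primary key absorb the replay.
  3. Restart the Flink job from its last savepoint:
flink run -s s3://your-bucket/savepoints/<savepoint-dir> \
  -d your-flink-job.jar
  4. Flink resumes from the savepoint position, provided the topic's retention still covers it, and replays the messages RisingWave processed.
  5. Investigate the RisingWave issue before attempting cutover again.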
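Flink resumes from the cutover savepoint and replays everything published since then, so for an append-only table you may want to roll the production table back before restarting Flink. That requires the last snapshot committed before cutover. The sketch below picks it from (snapshot_id, committed_at) pairs, such as the rows of the table's `snapshots` metadata table. You would then pass the id to a rollback procedure, for example Spark's `CALL <catalog>.system.rollback_to_snapshot('analytics.orders', <snapshot_id>)`. It assumes that no other writer, such as a compaction job, committed to the table after cutover. `last_snapshot_before` is a hypothetical helper.

```python
from datetime import datetime, timezone


def last_snapshot_before(snapshots, cutover):
    """Return the id of the newest snapshot committed at or before `cutover`.

    `snapshots` holds (snapshot_id, committed_at) pairs with timezone-aware
    datetimes; returns None if every snapshot is newer than the cutover.
    """
    eligible = [(committed_at, snapshot_id)
                for snapshot_id, committed_at in snapshots
                if committed_at <= cutover]
    return max(eligible)[1] if eligible else None


utc = timezone.utc
cutover = datetime(2024, 1, 15, 12, 0, tzinfo=utc)
snapshots = [
    (101, datetime(2024, 1, 15, 11, 0, tzinfo=utc)),   # Flink commit
    (102, datetime(2024, 1, 15, 11, 59, tzinfo=utc)),  # last Flink commit
    (103, datetime(2024, 1, 15, 12, 5, tzinfo=utc)),   # RisingWave commit
]
print(last_snapshot_before(snapshots, cutover))
```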

Post-Migration Cleanup

Once you have operated RisingWave in production for 48 hours without issues:

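  • Remove the external compaction job (Flink or Spark). RisingWave compacts Iceberg small files automatically. Confirm that compaction is enabled for your RisingWave Iceberg sinks, then remove the old job, because running both can cause commit conflicts.
  • Remove the snapshot expiry job once you have confirmed that RisingWave handles expired snapshot cleanup for these tables.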
  • Cancel and archive the Flink job. Keep the JAR and configuration in version control for reference, but cancel the job in the Flink cluster.
  • Update your runbooks to reflect the new architecture: source in RisingWave, materialized view for transformation, Iceberg sink.
  • Delete the Kafka consumer groups that belonged to the Flink job once nothing else relies on them.

Common Migration Pitfalls

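Forgetting to handle watermark semantics. Flink watermarks are application-level constructs that define event-time progress. RisingWave also supports watermarks on sources and tables, but it uses them differently. By default, RisingWave materialized views emit updated results as rows arrive instead of waiting for a window to close. Watermarks are needed for EMIT ON WINDOW CLOSE and for cleaning up old window state, and rows that arrive behind the watermark are discarded. Exactly-once delivery is a separate concern, handled by barrier-based checkpoints. If your Flink SQL relied on watermarks and allowed lateness to handle late-arriving data, evaluate whether RisingWave's behavior meets your requirements. For most Iceberg sink use cases this is not an issue, because you are appending records rather than emitting windowed aggregations with allowed lateness.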

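Not aligning the initial snapshot during cutover. If your Flink job was reading from a CDC source rather than Kafka, the cutover process is different. You cannot simply "pick an offset" from a CDC stream the way you can from Kafka. RisingWave's CDC connector snapshots the upstream table and then tails the change log from the position recorded at snapshot time (the binlog position for MySQL, the LSN for PostgreSQL). Coordinate the Flink pause and the RisingWave CDC snapshot start to avoid a gap or overlap in the data written to Iceberg. If the Iceberg sink is an upsert sink keyed on the source table's primary key, an overlap rewrites rows with the same values instead of duplicating them, so a small overlap is safer than a gap.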

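Assuming Flink SQL and RisingWave SQL are identical. They overlap on ANSI SQL but differ in many places. Flink's group-window auxiliary functions, such as TUMBLE_START() and TUMBLE_ROWTIME(), have no RisingWave equivalent. Use the window_start and window_end columns produced by TUMBLE() and HOP() instead. RisingWave does have a proctime() function, but it is typically used as a generated column (for example, in temporal joins), so review each PROCTIME() use. MATCH_RECOGNIZE, which Flink supports, is not available in RisingWave, and neither is anything like Flink's DataStream API. All logic must be expressible in PostgreSQL-compatible SQL or RisingWave UDFs.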

Running compaction jobs after migration. If you keep the old Flink or Spark compaction job running after RisingWave takes over the sink, both systems will try to rewrite Iceberg files at the same time. This can cause commit conflicts and failed rewrites. Disable external compaction jobs before or immediately after cutover.

Not testing CDC initial snapshot load. RisingWave's CDC connector performs a full table snapshot before tailing the binlog. For large tables, this snapshot phase can take hours and will generate significant load on the upstream database. Test this in a staging environment with production-sized tables before the real cutover.
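The CDC pitfalls above come down to one property. Replaying an overlapping stretch of change events in its original order is harmless when the sink applies events as upserts keyed on the primary key, and harmful when it appends them. The sketch below models this with a hypothetical `apply_upserts` helper and an in-memory dict standing in for the Iceberg table.

```python
def apply_upserts(table, changes):
    """Apply ("upsert", key, row) / ("delete", key, None) events by primary key."""
    for op, key, row in changes:
        if op == "upsert":
            table[key] = row
        elif op == "delete":
            table.pop(key, None)
        else:
            raise ValueError(f"unknown op {op!r}")
    return table


changes = [
    ("upsert", 1, {"name": "Ada"}),
    ("upsert", 2, {"name": "Bo"}),
    ("delete", 2, None),
    ("upsert", 1, {"name": "Ada L."}),
]
# Snapshot/binlog overlap: changes at indexes 1 and 2 are delivered twice, in order.
with_overlap = changes[:3] + changes[1:]

exact = apply_upserts({}, changes)
replayed = apply_upserts({}, with_overlap)
appended_rows = len(with_overlap)   # an append-only sink would keep every event
print(exact == replayed, appended_rows)
```

The upsert table ends in the same state either way. An append-only table would hold six rows instead of four.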


Migration Checklist

| Step | Task | Done |
|------|------|------|
| 1 | Inventory all Flink Iceberg sink jobs | |
| 1 | Identify custom UDFs requiring SQL translation | |
| 1 | Map Flink catalogs to RisingWave catalog connectors | |
| 2 | Deploy RisingWave alongside Flink cluster | |
| 2 | Validate catalog connectivity with a test query | |
| 3 | Translate Kafka source DDL | |
| 3 | Translate CDC source DDL | |
| 3 | Create materialized views for transformation logic | |
| 3 | Create RisingWave Iceberg sink DDL | |
| 4 | Create shadow Iceberg table | |
| 4 | Run parallel validation for 24 hours | |
| 4 | Verify row counts and amount totals match | |
| 4 | Test crash recovery and exactly-once semantics | |
| 5 | Pause Flink job with savepoint | |
| 5 | Record Kafka consumer group offsets | |
| 5 | Create production sink in RisingWave | |
| 5 | Monitor for 15 minutes post-cutover | |
| - | Keep Flink job paused for 48 hours (rollback window) | |
| - | Remove Flink compaction and GC jobs after 48 hours | |
| - | Update runbooks and architecture diagrams | |

FAQ

Can RisingWave write to an Iceberg table that Flink has been writing to?

Yes. RisingWave and Flink both use the standard Iceberg table format and commit through the catalog. As long as you stop Flink before starting RisingWave writes to the same table, there is no conflict. Running both writers simultaneously on the same Iceberg table is not recommended because it can cause commit conflicts.

Does RisingWave support Iceberg v2 tables with merge-on-read?

RisingWave supports writing to Iceberg v2 tables. For upsert workloads, use type = 'upsert' in the sink definition and specify primary_key. RisingWave then writes Iceberg v2 delete files, which readers apply at query time (merge-on-read). Check the RisingWave Iceberg sink documentation for the current list of supported Iceberg features.

What happens to in-flight Flink checkpoints during cutover?

When you run flink stop --savepointPath, Flink takes a final savepoint before stopping, and the Iceberg sink commits the data files covered by that savepoint. This leaves the Iceberg table in a consistent state, so there are no uncommitted transactions to clean up before RisingWave takes over. To confirm, check that the table's latest snapshot was committed at the time you stopped the job.

How do I handle Flink jobs that use both Kafka and CDC sources in a join?

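RisingWave supports joining a CDC table and a Kafka source in a materialized view. Define the CDC table and the Kafka source as shown in Step 3, then write the join logic in the materialized view definition. The materialized view maintains the join state incrementally, much like a regular join in Flink SQL. Because CDC updates and deletes change join results that were already emitted, sink such a view with type = 'upsert' and a primary_key.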

Can I migrate incrementally, job by job, or do I need to cut over all at once?

You can migrate job by job. RisingWave and Flink can coexist indefinitely on different Iceberg tables. Start with your simplest, lowest-risk job to build confidence in the process. Move high-value or complex jobs last. Each job follows the same five-step playbook described above.
