Real-Time Data Ingestion into Apache Iceberg with SQL

You can ingest real-time data into Apache Iceberg tables using pure SQL with RisingWave — no Java code, no Flink cluster, no Spark jobs. Define a streaming source, transform data with SQL, and create an Iceberg sink. RisingWave handles continuous ingestion, Parquet file writing, catalog commits, and automatic compaction in a single system.

This guide shows step-by-step how to build a real-time Iceberg ingestion pipeline with SQL.

Why SQL-Based Iceberg Ingestion?

Traditional Iceberg ingestion requires:

  • Flink: Java code, Flink cluster deployment, checkpoint tuning, manual compaction
  • Spark: PySpark scripts, Spark cluster, micro-batch latency, scheduled jobs

SQL-based ingestion with RisingWave simplifies this to three SQL statements:

CREATE SOURCE ...;        -- Define where data comes from
CREATE MATERIALIZED VIEW ...; -- Transform the data (optional)
CREATE SINK ...;          -- Write to Iceberg

No JVM, no cluster management, no manual compaction. RisingWave handles everything through a single PostgreSQL-compatible SQL interface.

Step-by-Step: Kafka to Iceberg

1. Create a Kafka Source

CREATE SOURCE ecommerce_events (
  event_id VARCHAR,
  user_id INT,
  event_type VARCHAR,
  product_id INT,
  quantity INT,
  price DECIMAL,
  event_time TIMESTAMP WITH TIME ZONE
) WITH (
  connector = 'kafka',
  topic = 'ecommerce-events',
  properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;
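A connection or topic typo usually only surfaces once RisingWave tries to read data, so it can be worth confirming the source was registered before moving on. One way, using RisingWave's standard catalog commands, is:

SHOW SOURCES;
DESCRIBE ecommerce_events;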

2. Transform with SQL (Optional)

CREATE MATERIALIZED VIEW ecommerce_enriched AS
SELECT
  event_id,
  user_id,
  event_type,
  product_id,
  quantity,
  price,
  quantity * price as total_amount,
  event_time,
  DATE(event_time) as event_date,
  EXTRACT(HOUR FROM event_time) as event_hour
FROM ecommerce_events;
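Before wiring up the sink, you can sanity-check the transformation by querying the materialized view directly, for example:

SELECT * FROM ecommerce_enriched ORDER BY event_time DESC LIMIT 10;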

3. Sink to Iceberg

CREATE SINK ecommerce_to_iceberg AS
SELECT * FROM ecommerce_enriched
WITH (
  connector = 'iceberg',
  type = 'append-only',
  catalog.type = 'rest',
  catalog.uri = 'http://iceberg-rest:8181',
  warehouse.path = 's3://my-lakehouse/warehouse',
  s3.endpoint = 'https://s3.amazonaws.com',
  s3.region = 'us-east-1',
  s3.access.key = '${AWS_ACCESS_KEY}',
  s3.secret.key = '${AWS_SECRET_KEY}',
  database.name = 'ecommerce',
  table.name = 'events'
);

That's it. Data now flows continuously from Kafka through RisingWave into Iceberg.
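Once the sink starts committing snapshots, any Iceberg-compatible engine can read the table. As an illustration, assuming a Trino catalog named lakehouse that points at the same REST catalog, a query like the following would see the freshly written rows:

SELECT event_date, event_type, count(*) AS events, sum(total_amount) AS revenue
FROM lakehouse.ecommerce.events
GROUP BY event_date, event_type;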

Step-by-Step: CDC to Iceberg

1. Create a PostgreSQL CDC Source

CREATE SOURCE pg_orders WITH (
  connector = 'postgres-cdc',
  hostname = 'postgres-host',
  port = '5432',
  username = 'replication_user',
  password = 'password',
  database.name = 'production',
  slot.name = 'rw_orders_slot'
);

CREATE TABLE orders (
  order_id INT PRIMARY KEY,
  customer_id INT,
  status VARCHAR,
  amount DECIMAL,
  region VARCHAR,
  created_at TIMESTAMP,
  updated_at TIMESTAMP
) FROM pg_orders TABLE 'public.orders';
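The CDC-backed table is queryable inside RisingWave, so you can confirm the initial snapshot has loaded before creating the sink, for example:

SELECT count(*) AS snapshot_rows FROM orders;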

2. Sink CDC Data to Iceberg (Upsert Mode)

CREATE SINK orders_to_iceberg AS
SELECT * FROM orders
WITH (
  connector = 'iceberg',
  type = 'upsert',
  primary_key = 'order_id',
  catalog.type = 'rest',
  catalog.uri = 'http://iceberg-rest:8181',
  warehouse.path = 's3://my-lakehouse/warehouse',
  database.name = 'production_mirror',
  table.name = 'orders'
);

Every insert, update, and delete in the source PostgreSQL table is automatically reflected in the Iceberg table — no batch ETL, no Debezium, no Kafka.
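To see this in action, run ordinary DML against the source PostgreSQL database (not RisingWave); the changes flow through the replication slot and appear in the Iceberg table on the next commit. A hypothetical example, using made-up order IDs:

-- Run against the source PostgreSQL database
UPDATE public.orders SET status = 'shipped', updated_at = now() WHERE order_id = 1001;
DELETE FROM public.orders WHERE order_id = 1002;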

Iceberg Write Modes

RisingWave supports two write modes for Iceberg sinks:

Append-Only

Best for event streams (logs, clickstream, IoT) where data is never updated:

CREATE SINK ... WITH (
  connector = 'iceberg',
  type = 'append-only',
  ...
);

Upsert

Best for CDC data and mutable tables where rows are updated or deleted:

CREATE SINK ... WITH (
  connector = 'iceberg',
  type = 'upsert',
  primary_key = 'id',
  ...
);

Upsert mode supports both Merge-on-Read (default, optimized for write throughput) and Copy-on-Write (optimized for read performance).

Catalog Configuration Examples

REST Catalog (Recommended)

CREATE SINK ... WITH (
  connector = 'iceberg',
  catalog.type = 'rest',
  catalog.uri = 'http://polaris:8181',
  ...
);

Works with Polaris, Lakekeeper, Tabular, and any REST-compatible Iceberg catalog.

Hive Metastore

CREATE SINK ... WITH (
  connector = 'iceberg',
  catalog.type = 'hive',
  catalog.uri = 'thrift://hive-metastore:9083',
  ...
);

AWS S3 Tables

CREATE SINK ... WITH (
  connector = 'iceberg',
  catalog.type = 's3_tables',
  catalog.name = 'my-s3-table-bucket',
  s3.region = 'us-east-1',
  ...
);

JDBC Catalog

CREATE SINK ... WITH (
  connector = 'iceberg',
  catalog.type = 'jdbc',
  catalog.uri = 'jdbc:postgresql://catalog-db:5432/iceberg',
  catalog.jdbc.user = 'iceberg',
  catalog.jdbc.password = 'password',
  ...
);

Automatic Compaction

RisingWave automatically compacts small Parquet files in Iceberg tables. This solves the common small files problem where streaming writes create thousands of tiny files that degrade query performance.

Without built-in compaction (as with Flink or Spark), you typically need to:

  1. Schedule a separate compaction job
  2. Run Iceberg's rewriteDataFiles action
  3. Monitor file sizes and compaction metrics
  4. Handle compaction failures and retries

With RisingWave, compaction is built in — no extra jobs, no monitoring, no maintenance.
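For comparison, step 2 above is typically a scheduled Spark job calling Iceberg's rewrite procedure; a sketch, assuming a Spark catalog registered as lakehouse:

-- Spark SQL, run periodically on a separate Spark cluster
CALL lakehouse.system.rewrite_data_files(
  table => 'ecommerce.events',
  options => map('target-file-size-bytes', '536870912')
);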

Architecture Patterns

Pattern 1: Streaming ETL to Lakehouse

Kafka → RisingWave (transform) → Iceberg → Trino/DuckDB (analytics)

Pattern 2: CDC Replication to Lakehouse

PostgreSQL → RisingWave (CDC) → Iceberg → Snowflake/BigQuery (analytics)

Pattern 3: Dual-Serving (Real-Time + Historical)

                    ┌→ Materialized Views (real-time serving)
Kafka → RisingWave ─┤
                    └→ Iceberg Sink (historical analytics)
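A minimal sketch of the dual-serving pattern, reusing the Kafka example above: the same enriched view feeds both a real-time aggregate (served over the PostgreSQL protocol) and the Iceberg sink created earlier. The view name revenue_by_type is just an example.

-- Real-time path: incrementally maintained aggregate, queryable with sub-second freshness
CREATE MATERIALIZED VIEW revenue_by_type AS
SELECT event_type, sum(total_amount) AS revenue
FROM ecommerce_enriched
GROUP BY event_type;

-- Historical path: the ecommerce_to_iceberg sink defined earlier keeps writing the
-- same enriched rows to Iceberg for engines like Trino or DuckDB to query later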

Frequently Asked Questions

Can I ingest data into Iceberg without Java or Spark?

Yes. RisingWave lets you define complete Iceberg ingestion pipelines in pure SQL — from source definition through transformation to Iceberg sink. No JVM, no Spark cluster, no Flink. Just SQL statements in a PostgreSQL-compatible interface.

How does RisingWave handle the small files problem?

RisingWave automatically compacts small Parquet files in Iceberg tables, merging them into optimally sized files without manual intervention. This eliminates the need for separate compaction jobs that Flink and Spark require.

Can I do CDC to Iceberg with RisingWave?

Yes. RisingWave supports native CDC from PostgreSQL and MySQL directly into Iceberg using upsert mode. No Debezium or Kafka required. Every insert, update, and delete in the source database is captured and reflected in the Iceberg table in real time.

Which Iceberg catalog should I use with RisingWave?

The REST catalog is recommended as it's the most flexible and the Iceberg community standard. RisingWave also supports Hive, JDBC, Storage (S3), and AWS S3 Tables catalogs. Use REST if you're starting fresh; use Hive or JDBC if you have existing catalog infrastructure.

Can RisingWave both serve real-time queries and write to Iceberg?

Yes. RisingWave can simultaneously maintain materialized views for real-time serving (sub-second freshness via PostgreSQL protocol) and sink data to Iceberg for long-term storage and historical analytics. This dual-serving pattern gives you real-time dashboards and historical analysis from a single streaming pipeline.
