To sink Kafka streams to Apache Iceberg with RisingWave: create a Kafka source, build a materialized view to transform or aggregate the data, then create an Iceberg sink pointing to your catalog and table. RisingWave handles continuous writes automatically — no custom code, no separate orchestration.
Why Use RisingWave to Sink Kafka to Iceberg?
Kafka is the dominant event streaming platform, and Apache Iceberg is rapidly becoming the standard storage format for analytical data lakes. Connecting them reliably at scale is a common data engineering challenge.
Traditional approaches — Kafka Connect S3 sink + Spark jobs, or Flink pipelines — work but require significant operational overhead: managing connectors, writing Java/Scala code, tuning checkpointing, and operating separate clusters.
RisingWave simplifies this dramatically. You write SQL. RisingWave ingests from Kafka, transforms the data through materialized views, and commits continuously to Iceberg. The entire pipeline is declared in three SQL statements.
Prerequisites
Before you begin, you need:
- A running RisingWave cluster (or local instance)
- A Kafka cluster with topics containing your events
- An Apache Iceberg catalog (REST, Glue, or Hive Metastore)
- An S3 bucket (or compatible object storage) for Iceberg data files
- The target Iceberg table already created (or auto-create enabled on your catalog)
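If the target table does not exist yet, one way to create it is from an engine that already talks to your catalog. The following is a minimal sketch in Spark SQL, assuming a Spark session with an Iceberg catalog registered under the hypothetical name my_catalog; the columns mirror the clickstream schema used in Step 1.

```sql
-- Spark SQL sketch (run in Spark, not in RisingWave).
-- Assumption: `my_catalog` is a hypothetical Spark catalog name
-- pointing at the same Iceberg catalog RisingWave will use.
CREATE NAMESPACE IF NOT EXISTS my_catalog.web_analytics;

CREATE TABLE my_catalog.web_analytics.clickstream_raw (
    event_id     STRING,
    session_id   STRING,
    user_id      BIGINT,
    page_url     STRING,
    referrer     STRING,
    device_type  STRING,
    country_code STRING,
    event_time   TIMESTAMP   -- Spark TIMESTAMP is stored as Iceberg timestamptz
)
USING iceberg
-- Daily partitions on event time, matching typical time-range filters
PARTITIONED BY (days(event_time));
```

Partitioning by day is only one reasonable choice; pick the transform that matches how the table will be queried.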
Step 1: Create a Kafka Source
Define the schema of your Kafka messages and connect RisingWave to your topic:
CREATE SOURCE clickstream_events (
event_id VARCHAR,
session_id VARCHAR,
user_id BIGINT,
page_url VARCHAR,
referrer VARCHAR,
device_type VARCHAR,
country_code VARCHAR,
event_time TIMESTAMPTZ
)
WITH (
connector = 'kafka',
topic = 'clickstream',
properties.bootstrap.server = 'kafka-broker:9092',
group.id.prefix = 'risingwave-iceberg-sink',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;
RisingWave reads from this topic continuously. Setting scan.startup.mode = 'earliest' starts from the oldest offset still retained in the topic, so all available history is processed; use 'latest' if you only want new events.
For Avro or Protobuf encoded messages, replace FORMAT PLAIN ENCODE JSON with FORMAT PLAIN ENCODE AVRO or FORMAT PLAIN ENCODE PROTOBUF and provide your schema registry URL.
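As an illustration of the Avro case, here is a sketch of the same source reading Avro with a schema registry. The source name clickstream_events_avro and the registry URL are placeholders; with Avro the column list is omitted because the columns are derived from the registered schema.

```sql
-- Sketch: Avro-encoded topic with a schema registry.
-- Assumptions: the source name and registry URL below are placeholders.
CREATE SOURCE clickstream_events_avro
WITH (
    connector = 'kafka',
    topic = 'clickstream',
    properties.bootstrap.server = 'kafka-broker:9092',
    scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE AVRO (
    -- Columns are inferred from the schema registered for this topic
    schema.registry = 'http://schema-registry:8081'
);
```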
Step 2: Transform with a Materialized View
You can sink directly from the source, but in most cases you'll want to transform, enrich, or aggregate the data first. Materialized views do this incrementally:
CREATE MATERIALIZED VIEW clickstream_hourly_summary AS
SELECT
country_code,
device_type,
window_start,
window_end,
COUNT(DISTINCT session_id) AS unique_sessions,
COUNT(DISTINCT user_id) AS unique_users,
COUNT(*) AS total_events,
COUNT(*) FILTER (WHERE referrer IS NOT NULL) AS events_with_referrer
FROM TUMBLE(clickstream_events, event_time, INTERVAL '1 HOUR')
GROUP BY country_code, device_type, window_start, window_end;
This materializes an hourly summary grouped by country and device type — exactly what a downstream analytics system would need for dashboards or reporting.
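Before wiring up a sink, it can help to confirm that the view is producing rows. A quick check, using only the names defined above:

```sql
-- Inspect the most recent hourly buckets in the materialized view
SELECT country_code, device_type, window_start, total_events
FROM clickstream_hourly_summary
ORDER BY window_start DESC, total_events DESC
LIMIT 10;
```

If this returns nothing, check the source definition and topic contents before looking at the sink.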
For raw event forwarding without aggregation, you can skip the materialized view and sink directly:
-- Direct raw sink (no transformation)
CREATE SINK raw_clickstream_to_iceberg AS
SELECT * FROM clickstream_events
WITH (
connector = 'iceberg',
type = 'append-only',
catalog.type = 'rest',
catalog.uri = 'http://iceberg-catalog:8181',
warehouse.path = 's3://my-datalake/warehouse',
s3.region = 'us-east-1',
database.name = 'web_analytics',
table.name = 'clickstream_raw'
);
Step 3: Create the Iceberg Sink
Sink your materialized view to Iceberg:
CREATE SINK clickstream_summary_to_iceberg AS
SELECT * FROM clickstream_hourly_summary
WITH (
connector = 'iceberg',
type = 'upsert',
-- Upsert sinks need a key; here it is the view's GROUP BY columns
primary_key = 'country_code,device_type,window_start,window_end',
catalog.type = 'rest',
catalog.uri = 'http://iceberg-catalog:8181',
warehouse.path = 's3://my-datalake/warehouse',
s3.region = 'us-east-1',
database.name = 'web_analytics',
table.name = 'clickstream_hourly'
);
Sink Types: append-only vs. upsert
| Sink Type | Use When | Behavior |
| append-only | Raw event logs, immutable data | Each micro-batch appends new files |
| upsert | Aggregated results, CDC data | Updates existing rows based on primary key |
For windowed aggregations like the one above, use upsert so that late-arriving events can update the correct window bucket. For raw event logs, append-only is more efficient.
Step 4: Verify the Sink
Check that your sink is running and monitor for any errors:
-- List all sinks
SHOW SINKS;
-- Check sink status
SELECT * FROM rw_sinks;
In RisingWave v2.8+, you can also query the Iceberg table directly to verify data is flowing:
CREATE SOURCE iceberg_clickstream_check
WITH (
connector = 'iceberg',
catalog.type = 'rest',
catalog.uri = 'http://iceberg-catalog:8181',
warehouse.path = 's3://my-datalake/warehouse',
database.name = 'web_analytics',
table.name = 'clickstream_hourly'
);
SELECT country_code, SUM(total_events) as total
FROM iceberg_clickstream_check
GROUP BY country_code
ORDER BY total DESC
LIMIT 10;
Handling Schema Changes
Iceberg's schema evolution capabilities mean you can add new fields to your Kafka messages without breaking existing pipelines. When you add a new column to your source schema and materialized view, RisingWave will propagate the change to the Iceberg table on the next commit.
However, dropping or renaming columns requires care. Always update the Iceberg table schema first (using Spark or your catalog API), then update the RisingWave sink.
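The ordering described above might look like the following for a column rename. This is a sketch under assumptions: my_catalog is a hypothetical Spark catalog name, event_count is a hypothetical new column name, and the sink is recreated rather than altered in place.

```sql
-- Step A (Spark SQL, run in Spark): rename the column in Iceberg first.
-- Assumption: `my_catalog` is a hypothetical Spark catalog name.
ALTER TABLE my_catalog.web_analytics.clickstream_hourly
    RENAME COLUMN total_events TO event_count;

-- Step B (RisingWave): recreate the sink so its output columns match.
DROP SINK clickstream_summary_to_iceberg;

CREATE SINK clickstream_summary_to_iceberg AS
SELECT
    country_code,
    device_type,
    window_start,
    window_end,
    unique_sessions,
    unique_users,
    total_events AS event_count,   -- map the old name to the new one
    events_with_referrer
FROM clickstream_hourly_summary
WITH (
    connector = 'iceberg',
    type = 'upsert',
    primary_key = 'country_code,device_type,window_start,window_end',
    catalog.type = 'rest',
    catalog.uri = 'http://iceberg-catalog:8181',
    warehouse.path = 's3://my-datalake/warehouse',
    s3.region = 'us-east-1',
    database.name = 'web_analytics',
    table.name = 'clickstream_hourly'
);
```

A recreated sink may re-emit rows that were already written. With an upsert sink these should overwrite by key, but that behavior is worth verifying on your version before relying on it.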
Performance Tuning
Commit interval — By default, RisingWave commits to Iceberg at its internal checkpoint boundaries. Committing less often (for example, once every several checkpoints) produces fewer, larger files and improves throughput. Committing more often, or shortening the checkpoint interval, lowers latency at the cost of more small files.
Parallelism — RisingWave distributes Kafka consumption and sink writes across multiple worker nodes. Ingestion parallelism is bounded by the number of partitions on the Kafka topic, so increase the partition count if a single topic is the bottleneck.
Partitioning — Ensure your Iceberg table is partitioned by a column that matches your most common query filters (e.g., partition by event_time truncated to day). This dramatically reduces query scan costs.
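To make the commit-interval trade-off concrete, the sketch below sets the sink option commit_checkpoint_interval so that the raw sink commits once every 30 checkpoints instead of on every one. The sink name and the value 30 are illustrative assumptions; check the default and accepted range for your RisingWave version.

```sql
-- Sketch: commit to Iceberg every 30 checkpoints (fewer, larger files).
-- Assumptions: the sink name and the value '30' are illustrative only.
CREATE SINK raw_clickstream_to_iceberg_batched AS
SELECT * FROM clickstream_events
WITH (
    connector = 'iceberg',
    type = 'append-only',
    commit_checkpoint_interval = '30',
    catalog.type = 'rest',
    catalog.uri = 'http://iceberg-catalog:8181',
    warehouse.path = 's3://my-datalake/warehouse',
    s3.region = 'us-east-1',
    database.name = 'web_analytics',
    table.name = 'clickstream_raw'
);
```

A larger interval means data becomes visible in Iceberg later, so choose it against the freshness your downstream queries need.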
Troubleshooting Common Issues
Sink falls behind — Check RisingWave's source lag metric. If Kafka lag is growing, consider scaling out RisingWave workers or reducing the complexity of your materialized view.
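Small files in Iceberg — This is expected with streaming writes. Schedule periodic compaction, for example with Spark's RewriteDataFiles action (exposed in Spark SQL as the rewrite_data_files procedure), or use the managed compaction feature of your catalog or platform if it offers one.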
Catalog connection errors — Verify that RisingWave workers can reach your catalog URI. For REST catalogs, ensure the port is accessible. For Glue, verify IAM permissions.
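For the small-files case, a compaction run from Spark could be sketched as follows. It assumes a Spark session with the Iceberg SQL extensions enabled and a catalog registered under the hypothetical name my_catalog; the target file size is an illustrative value.

```sql
-- Spark SQL sketch (run in Spark, not in RisingWave).
-- Assumption: `my_catalog` is a hypothetical Spark catalog name.
-- Rewrites small data files into larger ones (about 128 MB here).
CALL my_catalog.system.rewrite_data_files(
    table   => 'web_analytics.clickstream_raw',
    options => map('target-file-size-bytes', '134217728')
);
```

Running this on a schedule (hourly or daily, depending on write volume) keeps file counts manageable without touching the RisingWave pipeline.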
Comparison: Methods for Sinking Kafka to Iceberg
| Method | Complexity | Latency | Code Required | Transformations |
| RisingWave SQL | Low | Seconds | None (SQL only) | Full SQL |
| Kafka Connect + S3 Sink | Medium | Minutes | Config only | Limited |
| Apache Flink | High | Seconds | Java/Scala | Full |
| Spark Structured Streaming | High | Minutes | Scala/Python | Full |
| Custom Consumer | Very High | Seconds | Full code | Unlimited |
RisingWave uniquely combines low latency with low complexity — no JVM, no custom code, no separate orchestration layer.
FAQ
Q: Can I sink multiple Kafka topics to different Iceberg tables?
Yes. Create a separate source, materialized view, and sink for each topic. All pipelines run concurrently and independently.
Q: Does RisingWave guarantee exactly-once delivery to Iceberg?
RisingWave uses Iceberg's atomic commit semantics to ensure exactly-once writes. Each checkpoint produces a complete, consistent Iceberg snapshot. If a commit fails, it is retried without duplicating data.
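One way to observe these commits is to list the table's snapshots from an engine that exposes Iceberg metadata tables. A sketch in Spark SQL, assuming a catalog registered under the hypothetical name my_catalog:

```sql
-- Spark SQL sketch: each successful commit appears as one snapshot.
-- Assumption: `my_catalog` is a hypothetical Spark catalog name.
SELECT committed_at, snapshot_id, operation
FROM my_catalog.web_analytics.clickstream_hourly.snapshots
ORDER BY committed_at DESC
LIMIT 10;
```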
Q: Can I use AWS MSK instead of self-hosted Kafka?
Yes. Set properties.bootstrap.server to your MSK broker endpoints and configure the matching security settings (SASL/TLS). MSK exposes the standard Kafka protocol, so the same Kafka connector applies.
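A sketch of such a source using SASL/SCRAM over TLS is shown below. The source name, broker hostname, and credentials are placeholders, and the mechanism must match what your MSK cluster has enabled.

```sql
-- Sketch: Kafka source against MSK with SASL/SCRAM over TLS.
-- Assumptions: source name, broker endpoint, and credentials are placeholders.
CREATE SOURCE clickstream_events_msk (
    event_id VARCHAR,
    session_id VARCHAR,
    user_id BIGINT,
    page_url VARCHAR,
    referrer VARCHAR,
    device_type VARCHAR,
    country_code VARCHAR,
    event_time TIMESTAMPTZ
)
WITH (
    connector = 'kafka',
    topic = 'clickstream',
    properties.bootstrap.server = 'b-1.example.kafka.us-east-1.amazonaws.com:9096',
    scan.startup.mode = 'earliest',
    properties.security.protocol = 'SASL_SSL',
    properties.sasl.mechanism = 'SCRAM-SHA-512',
    properties.sasl.username = '<msk-username>',
    properties.sasl.password = '<msk-password>'
) FORMAT PLAIN ENCODE JSON;
```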
Q: What happens if my Iceberg catalog is temporarily unavailable?
RisingWave will pause Iceberg commits and buffer data internally. Once the catalog becomes available, it resumes committing without data loss.
Q: Can I sink Avro messages from a Schema Registry?
Yes. Use FORMAT PLAIN ENCODE AVRO with schema.registry = 'http://schema-registry:8081' in your source definition. RisingWave will fetch the schema automatically.
Next Steps
You now have a complete, production-ready pipeline from Kafka to Apache Iceberg — declared entirely in SQL, maintained automatically by RisingWave.
Explore the full RisingWave documentation to learn about more connectors, advanced SQL patterns, and deployment options. Questions? Join the RisingWave Slack community where engineers share pipelines and help each other build.

