One-line deployment of RisingWave + Lakekeeper + MinIO that joins 7 Kafka streams and writes the results directly to Apache Iceberg, using SQL that feels like a Postgres database. The tables are fully open and queryable from Spark, Trino, or DuckDB, with no lock-in.
A simpler path to real-time logistics on RisingWave
Teams building real-time logistics analytics need fresh, joined materialized views across many event streams (trucks, drivers, routes, fuel, maintenance, shipments, warehouses). They also need to persist results to Apache Iceberg for long-term storage and interoperability, so the same data can be queried by other engines, used to train AI models, and power real-time dashboards and broader analytics.
However, traditional Iceberg stacks typically require a separate writer, a catalog service, and an external compaction tool. This adds operational overhead, slows onboarding, and forces teams to juggle multiple services, making the stack harder to run end-to-end.
This blog shows a simpler path: build the multi-way streaming join in SQL, persist it natively to Iceberg, and query from any engine, all with RisingWave (stream processing + Iceberg Table Engine) and a self-hosted Lakekeeper (open REST catalog).
How It Works
RisingWave ingests seven Kafka topics, performs a multi-way streaming join in a materialized view, and writes the results directly to Iceberg (no external writer/sink needed).
Lakekeeper provides a REST catalog so Spark/Trino/DuckDB can query the same Iceberg tables.
MinIO acts as S3-compatible object storage.
Result: a fully open streaming lakehouse you can run locally or in production.

What You’ll Build (End-to-End)
You will:
Spin up RisingWave + Lakekeeper + MinIO
Connect RisingWave’s Iceberg Table Engine to the Lakekeeper REST catalog
Join seven Kafka streams and materialize the result
Persist that result to an Iceberg table
Query the same table from Spark

Environment Setup
Option A — Local (Docker)
Use Docker for a quick local setup of the stack, as shown in the RisingWave Awesome Stream Processing repository.
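A minimal sketch of the local bring-up, assuming you have cloned that repository (the compose file's exact location inside the repo may differ):
# Clone the demo repository and start the stack with Docker Compose.
git clone https://github.com/risingwavelabs/awesome-stream-processing.git
cd awesome-stream-processing
docker compose up -d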
Option B — Production (Kubernetes/Helm)
Use this for a durable, production-grade cluster setup.
Deploy RisingWave + Lakekeeper + MinIO with Helm and production parameters (storage classes, resources, persistence, auth). The SQL and workflow below stay the same.
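An illustrative sketch of the RisingWave part of that install (the chart repo URL, namespace, release name, and values file are assumptions; consult each project's Helm documentation):
# Add the RisingWave chart repository and install with production overrides.
helm repo add risingwavelabs https://risingwavelabs.github.io/helm-charts
helm repo update
helm install risingwave risingwavelabs/risingwave -n streaming --create-namespace -f risingwave-values.yaml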
Step 1: Start RisingWave and Connect to the Lakekeeper Catalog
Open a SQL shell to RisingWave:
psql -h localhost -p 4566 -d dev -U root
Create a connection pointing to Lakekeeper:
CREATE CONNECTION lakekeeper_catalog_conn
WITH (
type = 'iceberg',
catalog.type = 'rest',
catalog.uri = 'http://lakekeeper:8181/catalog/',
warehouse.path = 'risingwave-warehouse',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
s3.path.style.access = 'true',
s3.endpoint = 'http://minio-0:9301',
s3.region = 'us-east-1'
);
Set it as the default Iceberg connection:
SET iceberg_engine_connection = 'public.lakekeeper_catalog_conn';
What this does
Points RisingWave to a REST-based Lakekeeper catalog.
Specifies the Iceberg warehouse and S3-compatible settings.
Makes the connection active so subsequent Iceberg DDL and DML use it automatically.
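To confirm the connection is registered before moving on, you can list connections from the same psql session:
-- The new lakekeeper_catalog_conn connection should appear in the list.
SHOW CONNECTIONS;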
Step 2: Build the Streaming Join Pipeline
Create seven sources, one per Kafka topic, to ingest the event streams into RisingWave via the Kafka source connector.
trucks — Fleet inventory and live location
Defines per-truck metadata (model, capacity, year) and current_location from Kafka topic trucks.
CREATE SOURCE trucks (
truck_id integer,
truck_model varchar,
capacity_tons integer,
manufacture_year integer,
current_location varchar
) WITH (
connector = 'kafka',
topic = 'trucks',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;
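For reference, each message on a topic is a flat JSON object whose keys match the source's column names. An illustrative trucks event (values are made up):
{"truck_id": 1, "truck_model": "Volvo FH16", "capacity_tons": 20, "manufacture_year": 2021, "current_location": "Chicago"}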
driver — Driver roster and assignments
Captures driver identity, license, and assigned_truck_id from topic drivers.
CREATE SOURCE driver (
driver_id integer,
driver_name varchar,
license_number varchar,
assigned_truck_id integer
) WITH (
connector = 'kafka',
topic = 'drivers',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;
shipments — Shipment workload and routing targets
Tracks each shipment’s origin, destination, weight, and truck binding from topic shipments.
CREATE SOURCE shipments (
shipment_id varchar,
origin varchar,
destination varchar,
shipment_weight integer,
truck_id integer
) WITH (
connector = 'kafka',
topic = 'shipments',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;
warehouses — Destination sites and capacity
Provides warehouse location and capacity for joins with shipments from topic warehouses.
CREATE SOURCE warehouses (
warehouse_id varchar,
location varchar,
capacity_tons integer
) WITH (
connector = 'kafka',
topic = 'warehouses',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;
route — Planned movements and ETAs
Contains per-route truck/driver linkage, ETD/ETA, and distance from topic route.
CREATE SOURCE route (
route_id varchar,
truck_id integer,
driver_id integer,
estimated_departure_time timestamptz,
estimated_arrival_time timestamptz,
distance_km integer
) WITH (
connector = 'kafka',
topic = 'route',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;
fuel — Refueling logs
Records fuel events per truck (time, liters, station) from topic fuel.
CREATE SOURCE fuel (
fuel_log_id varchar,
truck_id integer,
fuel_date timestamptz,
liters_filled integer,
fuel_station varchar
) WITH (
connector = 'kafka',
topic = 'fuel',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;
maint — Maintenance history and costs
Captures maintenance events and costs per truck from topic maint.
CREATE SOURCE maint (
maintenance_id varchar,
truck_id integer,
maintenance_date timestamptz,
cost_usd integer
) WITH (
connector = 'kafka',
topic = 'maint',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;
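With all seven sources defined, you can verify they are registered before building the join:
-- All seven Kafka sources should be listed.
SHOW SOURCES;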
This logistics_joined_mv is a continuously updated materialized view that joins the seven streams into one unified logistics record per truck/driver/route. It left-joins driver to trucks and then inner-joins shipments, route, warehouses, fuel, and maint to enrich each record with shipment details, departure time and distance, warehouse capacity, refueling events, and maintenance costs. Note that because of the inner joins, a driver appears in the output only when their assigned truck has matching shipment, route, fuel, and maintenance records. Use it as a ready-to-query operational table for real-time dashboards (e.g., Grafana) and as the result set to persist to Iceberg.
CREATE MATERIALIZED VIEW logistics_joined_mv AS
SELECT
t.truck_id,
d.driver_id,
d.driver_name,
d.license_number,
t.truck_model,
t.capacity_tons,
t.current_location,
s.shipment_id,
s.origin,
w.location AS warehouse_location,
w.capacity_tons AS warehouse_capacity_tons,
r.route_id,
r.estimated_departure_time,
r.distance_km,
f.fuel_log_id,
f.fuel_date,
f.liters_filled,
m.maintenance_id,
m.maintenance_date,
m.cost_usd
FROM driver d
LEFT JOIN trucks t ON d.assigned_truck_id = t.truck_id
JOIN shipments s ON t.truck_id = s.truck_id
JOIN route r ON r.truck_id = t.truck_id
JOIN warehouses w ON s.destination = w.location
JOIN fuel f ON f.truck_id = t.truck_id
JOIN maint m ON m.truck_id = t.truck_id;
Returns a quick preview of the first 5 rows from logistics_joined_mv to verify the joined schema and data flow.
SELECT * FROM logistics_joined_mv LIMIT 5;
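You can also slice the joined stream by key; for example, an illustrative check on a single truck (truck_id = 1 is an arbitrary sample value):
SELECT driver_name, shipment_id, warehouse_location, estimated_departure_time
FROM logistics_joined_mv
WHERE truck_id = 1
LIMIT 5;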
This creates a materialized view that provides a continuously updated fleet snapshot per truck, combining performance, cost, route, and driver information. It aggregates shipment weight and compares it to truck capacity (capacity_tons × 1000, converting tons to the kilogram scale of shipment_weight) to compute capacity utilization (%), sums fuel cost (liters × 1.2, presumably $1.20 per liter) and maintenance cost to derive total operational cost, and attaches the current route (ID, ETD, ETA, distance) plus driver (name, license). Output columns include truck metadata alongside these metrics for real-time monitoring and reporting.
CREATE MATERIALIZED VIEW truck_fleet_overview AS
WITH TruckPerformance AS (
SELECT
t.truck_id,
SUM(s.shipment_weight) AS total_shipment_weight,
t.capacity_tons * 1000 AS max_capacity_weight
FROM
trucks t
LEFT JOIN
shipments s ON t.truck_id = s.truck_id
GROUP BY
t.truck_id, t.capacity_tons
),
TruckCosts AS (
-- Aggregate fuel and maintenance separately before joining;
-- joining both streams directly on truck_id would fan out rows and inflate both sums.
SELECT
t.truck_id,
fc.total_fuel_cost,
mc.total_maintenance_cost
FROM
trucks t
LEFT JOIN
(SELECT truck_id, SUM(liters_filled * 1.2) AS total_fuel_cost FROM fuel GROUP BY truck_id) fc ON t.truck_id = fc.truck_id
LEFT JOIN
(SELECT truck_id, SUM(cost_usd) AS total_maintenance_cost FROM maint GROUP BY truck_id) mc ON t.truck_id = mc.truck_id
),
RouteDetails AS (
SELECT
r.truck_id,
r.route_id,
r.driver_id,
r.estimated_departure_time,
r.estimated_arrival_time,
r.distance_km
FROM
route r
),
DriverDetails AS (
SELECT
d.driver_id,
d.driver_name,
d.license_number
FROM
driver d
)
SELECT
t.truck_id,
t.truck_model,
tp.total_shipment_weight,
tp.max_capacity_weight,
ROUND((tp.total_shipment_weight * 100.0 / tp.max_capacity_weight), 2) AS capacity_utilization_percentage,
tc.total_fuel_cost,
tc.total_maintenance_cost,
(tc.total_fuel_cost + tc.total_maintenance_cost) AS total_operational_cost,
rd.route_id,
rd.estimated_departure_time,
rd.estimated_arrival_time,
rd.distance_km,
dd.driver_name,
dd.license_number
FROM
TruckPerformance tp
JOIN
TruckCosts tc ON tp.truck_id = tc.truck_id
JOIN
RouteDetails rd ON tp.truck_id = rd.truck_id
JOIN
DriverDetails dd ON rd.driver_id = dd.driver_id
JOIN
trucks t ON tp.truck_id = t.truck_id;
Query the truck_fleet_overview MV to return five rows.
SELECT * FROM truck_fleet_overview LIMIT 5;
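For a dashboard-style view, you can rank trucks by utilization using the columns defined above:
-- Top five trucks by capacity utilization.
SELECT truck_id, truck_model, capacity_utilization_percentage, total_operational_cost
FROM truck_fleet_overview
ORDER BY capacity_utilization_percentage DESC
LIMIT 5;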
Step 3: Create a Native Iceberg Table
Persist the streaming join to an Iceberg table using the REST-based Lakekeeper catalog and MinIO object storage, all managed from RisingWave.
CREATE TABLE logistics_joined_iceberg (
truck_id INT,
driver_id INT,
driver_name VARCHAR,
license_number VARCHAR,
truck_model VARCHAR,
capacity_tons INT,
current_location VARCHAR,
shipment_id VARCHAR,
origin VARCHAR,
warehouse_location VARCHAR,
warehouse_capacity_tons INT,
route_id VARCHAR,
estimated_departure_time TIMESTAMPTZ,
distance_km INT,
fuel_log_id VARCHAR,
fuel_date TIMESTAMPTZ,
liters_filled INT,
maintenance_id VARCHAR,
maintenance_date TIMESTAMPTZ,
cost_usd INT
)
WITH (commit_checkpoint_interval = 1)
ENGINE = iceberg;
Note: commit_checkpoint_interval = 1 targets low-latency commits; increase it for larger files and fewer commits.
Step 4: Insert & Query Data
Insert the results of the materialized view into the Iceberg table:
INSERT INTO logistics_joined_iceberg
SELECT * FROM logistics_joined_mv;
Query the Iceberg table from RisingWave.
SELECT * FROM logistics_joined_iceberg LIMIT 5;
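As a quick sanity check, the row counts of the MV and the Iceberg table should match once the insert completes:
SELECT COUNT(*) FROM logistics_joined_mv;
SELECT COUNT(*) FROM logistics_joined_iceberg;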
Step 5: Query from Spark
Install Java (8/11/17), then Spark 3.5.x. Map minio-0 to localhost so Spark can reach MinIO:
echo "127.0.0.1 minio-0" | sudo tee -a /etc/hosts
Start Spark SQL with Iceberg + REST + S3 settings:
spark-sql \
--packages "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.9.2,org.apache.iceberg:iceberg-aws-bundle:1.9.2" \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
--conf spark.sql.defaultCatalog=lakekeeper \
--conf spark.sql.catalog.lakekeeper=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.lakekeeper.catalog-impl=org.apache.iceberg.rest.RESTCatalog \
--conf spark.sql.catalog.lakekeeper.uri=http://127.0.0.1:8181/catalog/ \
--conf spark.sql.catalog.lakekeeper.warehouse=risingwave-warehouse \
--conf spark.sql.catalog.lakekeeper.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
--conf spark.sql.catalog.lakekeeper.s3.endpoint=http://minio-0:9301 \
--conf spark.sql.catalog.lakekeeper.s3.region=us-east-1 \
--conf spark.sql.catalog.lakekeeper.s3.path-style-access=true \
--conf spark.sql.catalog.lakekeeper.s3.access-key-id=hummockadmin \
--conf spark.sql.catalog.lakekeeper.s3.secret-access-key=hummockadmin
Then:
SELECT * FROM public.logistics_joined_iceberg;
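From the same Spark session you can also inspect Iceberg metadata, for example the table's commit history via the standard snapshots metadata table:
-- Each RisingWave commit appears as an Iceberg snapshot.
SELECT snapshot_id, committed_at, operation
FROM public.logistics_joined_iceberg.snapshots;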
Spark reads the same Iceberg table registered in Lakekeeper, written by RisingWave, and stored in MinIO. That’s interoperability with no lock-in and full flexibility.
Conclusion
In this blog, we showed how RisingWave simplifies a logistics use case: ingest data from Kafka topics, join it in a materialized view, and persist the results to a native Apache Iceberg table using the REST-based Lakekeeper catalog on MinIO. If you want Iceberg to feel like a database without sacrificing openness, this pattern delivers: join multiple streams in RisingWave, persist natively to Iceberg, and query from any engine (DuckDB, Trino, Spark, Dremio, and more) via the Lakekeeper REST catalog. The result is an open, high-performance streaming lakehouse with minimal setup, well suited to logistics analytics and beyond. With fewer moving parts, you get faster onboarding and a production-ready setup.

