Build a Streaming Logistics Lakehouse: RisingWave + Lakekeeper + Iceberg

A one-line deployment of RisingWave + Lakekeeper + MinIO that lets you join 7 Kafka streams and write the results directly to Apache Iceberg, as simply as writing to a Postgres database. The result is fully open and queryable from Spark, Trino, or DuckDB, with no lock-in.

A simpler path to real-time logistics on RisingWave

Teams building real-time logistics analytics need fresh, joined materialized views across many event streams (trucks, drivers, routes, fuel, maintenance, shipments, warehouses). They also need to persist results to Apache Iceberg for long-term storage and interoperability, so the same data can be queried by other engines, used to train AI models, and power real-time dashboards and broader analytics.

However, traditional Iceberg stacks typically require a separate writer, a catalog service, and an external compaction tool. This adds operational overhead, slows onboarding, and forces teams to juggle multiple services, making the stack harder to run end-to-end.

This blog shows a simpler path: build the multi-way streaming join in SQL, persist it natively to Iceberg, and query from any engine, all with RisingWave (stream processing + Iceberg Table Engine) and a self-hosted Lakekeeper (open REST catalog).

How It Works

  • RisingWave ingests seven Kafka topics, performs a multi-way streaming join in a materialized view, and writes the results directly to Iceberg (no external writer/sink needed).

  • Lakekeeper provides a REST catalog so Spark/Trino/DuckDB can query the same Iceberg tables.

  • MinIO acts as S3-compatible object storage.

Result: a fully open streaming lakehouse you can run locally or in production.

What You’ll Build (End-to-End)

You will:

  1. Spin up RisingWave + Lakekeeper + MinIO

  2. Connect RisingWave’s Iceberg Table Engine to the Lakekeeper REST catalog

  3. Join seven Kafka streams and materialize the result

  4. Persist that result to an Iceberg table

  5. Query the same table from Spark

Environment Setup

Option A — Local (Docker)

Use Docker for a quick local setup of the stack, as shown in the RisingWave Awesome Stream Processing repository.
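A minimal sketch of the local bring-up, assuming you run the demo from a subdirectory of the awesome-stream-processing repository (the exact directory name is a placeholder here):

# Clone the demo repository and start the stack with Docker Compose.
git clone https://github.com/risingwavelabs/awesome-stream-processing.git
cd awesome-stream-processing/<demo-directory>   # placeholder: the directory containing this demo's compose file
docker compose up -d                            # brings up RisingWave, Lakekeeper, MinIO, and Kafka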

Option B — Production (Kubernetes/Helm)

Use this for a durable, production-grade cluster setup.

Deploy RisingWave + Lakekeeper + MinIO with Helm and production parameters (storage classes, resources, persistence, auth). The SQL and workflow below stay the same.
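A rough sketch of what the Helm-based install might look like; the chart repository and name below are assumptions to verify against the official charts, and the values file is yours to define:

# Add the RisingWave Helm repository and install with production-oriented values.
helm repo add risingwavelabs https://risingwavelabs.github.io/helm-charts
helm repo update
helm install risingwave risingwavelabs/risingwave \
  --namespace streaming --create-namespace \
  -f risingwave-values.yaml   # storage classes, resources, persistence, auth
# Lakekeeper and MinIO are installed the same way from their respective charts.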

Step 1: Start RisingWave and connect to Lakekeeper catalog

Open a SQL shell to RisingWave:

psql -h localhost -p 4566 -d dev -U root

Create a connection pointing to Lakekeeper:

CREATE CONNECTION lakekeeper_catalog_conn
WITH (
    type = 'iceberg',
    catalog.type = 'rest',
    catalog.uri = 'http://lakekeeper:8181/catalog/',
    warehouse.path = 'risingwave-warehouse',
    s3.access.key = 'hummockadmin',
    s3.secret.key = 'hummockadmin',
    s3.path.style.access = 'true',
    s3.endpoint = 'http://minio-0:9301',
    s3.region = 'us-east-1'
);

Set it as the default Iceberg connection:

SET iceberg_engine_connection = 'public.lakekeeper_catalog_conn';

What this does

  • Points RisingWave to a REST-based Lakekeeper catalog.

  • Specifies the Iceberg warehouse and S3-compatible settings.

  • Makes the connection active so subsequent Iceberg DDL and DML use it automatically (a quick check follows below).
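To confirm the connection is registered before creating any Iceberg tables, you can list connections from the same psql session (the exact output format may vary by RisingWave version):

-- Lists registered connections; lakekeeper_catalog_conn should appear here.
SHOW CONNECTIONS;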

Step 2: Build the Streaming Join Pipeline

Create seven Kafka sources to ingest data from different Kafka topics into RisingWave using the RisingWave Kafka source connector.

trucks — Fleet inventory and live location

Defines per-truck metadata (model, capacity, year) and current_location from Kafka topic trucks.

CREATE SOURCE trucks (
  truck_id integer,
  truck_model varchar,
  capacity_tons integer,
  manufacture_year integer,
  current_location varchar
) WITH (
  connector = 'kafka',
  topic = 'trucks',
  properties.bootstrap.server = 'message_queue:29092',
  scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;

driver — Driver roster and assignments

Captures driver identity, license, and assigned_truck_id from topic drivers.

CREATE SOURCE driver (
  driver_id integer,
  driver_name varchar,
  license_number varchar,
  assigned_truck_id integer
) WITH (
  connector = 'kafka',
  topic = 'drivers',
  properties.bootstrap.server = 'message_queue:29092',
  scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;

shipments — Shipment workload and routing targets

Tracks each shipment’s origin, destination, weight, and truck binding from topic shipments.

CREATE SOURCE shipments (
  shipment_id varchar,
  origin varchar,
  destination varchar,
  shipment_weight integer,
  truck_id integer
) WITH (
  connector = 'kafka',
  topic = 'shipments',
  properties.bootstrap.server = 'message_queue:29092',
  scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;

warehouses — Destination sites and capacity

Provides warehouse location and capacity for joins with shipments from topic warehouses.

CREATE SOURCE warehouses (
  warehouse_id varchar,
  location varchar,
  capacity_tons integer
) WITH (
  connector = 'kafka',
  topic = 'warehouses',
  properties.bootstrap.server = 'message_queue:29092',
  scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;

route — Planned movements and ETAs

Contains per-route truck/driver linkage, ETD/ETA, and distance from topic route.

CREATE SOURCE route (
  route_id varchar,
  truck_id integer,
  driver_id integer,
  estimated_departure_time timestamptz,
  estimated_arrival_time timestamptz,
  distance_km integer
) WITH (
  connector = 'kafka',
  topic = 'route',
  properties.bootstrap.server = 'message_queue:29092',
  scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;

fuel — Refueling logs

Records fuel events per truck (time, liters, station) from topic fuel.

CREATE SOURCE fuel (
  fuel_log_id varchar,
  truck_id integer,
  fuel_date timestamptz,
  liters_filled integer,
  fuel_station varchar
) WITH (
  connector = 'kafka',
  topic = 'fuel',
  properties.bootstrap.server = 'message_queue:29092',
  scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;

maint — Maintenance history and costs

Captures maintenance events and costs per truck from topic maint.

CREATE SOURCE maint (
  maintenance_id varchar,
  truck_id integer,
  maintenance_date timestamptz,
  cost_usd integer
) WITH (
  connector = 'kafka',
  topic = 'maint',
  properties.bootstrap.server = 'message_queue:29092',
  scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;

This logistics_joined_mv is a continuously updated materialized view that joins the seven streams into one unified logistics record per truck/driver/route. It left-joins driver → trucks and inner-joins shipments, route, warehouses, fuel, and maint to enrich each record with shipment details, ETD/ETA and distance, warehouse capacity, refueling events, and maintenance costs. (Because the subsequent joins are inner joins, drivers without a matching truck, shipment, route, warehouse, fuel, or maintenance record will not appear in the output; switch them to LEFT JOINs if you need those rows preserved.) Use it as a ready-to-query operational table for real-time dashboards (e.g., Grafana) and as the source for persisting the joined stream to Iceberg.

CREATE MATERIALIZED VIEW logistics_joined_mv AS
SELECT
    t.truck_id,
    d.driver_id,
    d.driver_name,
    d.license_number,
    t.truck_model,
    t.capacity_tons,
    t.current_location,
    s.shipment_id,
    s.origin,
    w.location AS warehouse_location,
    w.capacity_tons AS warehouse_capacity_tons,
    r.route_id,
    r.estimated_departure_time,
    r.distance_km,
    f.fuel_log_id,
    f.fuel_date,
    f.liters_filled,
    m.maintenance_id,
    m.maintenance_date,
    m.cost_usd
FROM driver d
LEFT JOIN trucks t ON d.assigned_truck_id = t.truck_id
JOIN shipments s ON t.truck_id = s.truck_id
JOIN route r ON r.truck_id = t.truck_id
JOIN warehouses w ON s.destination = w.location
JOIN fuel f ON f.truck_id = t.truck_id
JOIN maint m ON m.truck_id = t.truck_id;

Returns a quick preview of the first 5 rows from logistics_joined_mv to verify the joined schema and data flow.

SELECT * FROM logistics_joined_mv LIMIT 5;

This creates a materialized view that provides a continuously updated fleet snapshot per truck, combining performance, cost, route, and driver information. It aggregates shipment weight and compares it to truck capacity to compute capacity utilization (%), sums fuel cost (liters × 1.2) and maintenance cost to derive total operational cost, and attaches the current route (ID, ETD, ETA, distance) plus driver (name, license). Output columns include truck metadata alongside these metrics for real-time monitoring and reporting.

CREATE MATERIALIZED VIEW truck_fleet_overview AS
WITH TruckPerformance AS (
    SELECT
        t.truck_id,
        SUM(s.shipment_weight) AS total_shipment_weight,
        t.capacity_tons * 1000 AS max_capacity_weight 
    FROM
        trucks t
    LEFT JOIN
        shipments s ON t.truck_id = s.truck_id
    GROUP BY
        t.truck_id, t.capacity_tons
),
TruckCosts AS (
    -- Aggregate fuel and maintenance separately before joining, so that a truck
    -- with several fuel and maintenance rows is not double counted by a fan-out join.
    SELECT
        t.truck_id,
        COALESCE(fc.total_fuel_cost, 0) AS total_fuel_cost,
        COALESCE(mc.total_maintenance_cost, 0) AS total_maintenance_cost
    FROM
        trucks t
    LEFT JOIN (
        SELECT truck_id, SUM(liters_filled * 1.2) AS total_fuel_cost
        FROM fuel
        GROUP BY truck_id
    ) fc ON t.truck_id = fc.truck_id
    LEFT JOIN (
        SELECT truck_id, SUM(cost_usd) AS total_maintenance_cost
        FROM maint
        GROUP BY truck_id
    ) mc ON t.truck_id = mc.truck_id
),
RouteDetails AS (
    SELECT
        r.truck_id,
        r.route_id,
        r.driver_id,
        r.estimated_departure_time,
        r.estimated_arrival_time,
        r.distance_km
    FROM
        route r
),
DriverDetails AS (
    SELECT
        d.driver_id,
        d.driver_name,
        d.license_number
    FROM
        driver d
)
SELECT
    t.truck_id,
    t.truck_model,
    tp.total_shipment_weight,
    tp.max_capacity_weight,
    ROUND((tp.total_shipment_weight * 100.0 / tp.max_capacity_weight), 2) AS capacity_utilization_percentage,
    tc.total_fuel_cost,
    tc.total_maintenance_cost,
    (tc.total_fuel_cost + tc.total_maintenance_cost) AS total_operational_cost,
    rd.route_id,
    rd.estimated_departure_time,
    rd.estimated_arrival_time,
    rd.distance_km,
    dd.driver_name,
    dd.license_number
FROM
    TruckPerformance tp
JOIN
    TruckCosts tc ON tp.truck_id = tc.truck_id
JOIN
    RouteDetails rd ON tp.truck_id = rd.truck_id
JOIN
    DriverDetails dd ON rd.driver_id = dd.driver_id
JOIN
    trucks t ON tp.truck_id = t.truck_id;

Query the truck_fleet_overview MV to return five rows.

SELECT * FROM truck_fleet_overview LIMIT 5;
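As an example of the kind of dashboard query this view supports (all columns come from truck_fleet_overview as defined above), you could rank trucks by capacity utilization:

-- Trucks running closest to capacity, with their total operational cost.
SELECT truck_id, truck_model, capacity_utilization_percentage, total_operational_cost
FROM truck_fleet_overview
ORDER BY capacity_utilization_percentage DESC
LIMIT 5;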

Step 3: Create a Native Iceberg Table

Persist the streaming join to an Iceberg table using the REST-based Lakekeeper catalog and MinIO object storage, all managed from RisingWave.

CREATE TABLE logistics_joined_iceberg (
  truck_id                  INT,
  driver_id                 INT,
  driver_name               VARCHAR,
  license_number            VARCHAR,
  truck_model               VARCHAR,
  capacity_tons             INT,
  current_location          VARCHAR,

  shipment_id               VARCHAR,
  origin                    VARCHAR,

  warehouse_location        VARCHAR,
  warehouse_capacity_tons   INT,

  route_id                  VARCHAR,
  estimated_departure_time  TIMESTAMPTZ,
  distance_km               INT,

  fuel_log_id               VARCHAR,
  fuel_date                 TIMESTAMPTZ,
  liters_filled             INT,

  maintenance_id            VARCHAR,
  maintenance_date          TIMESTAMPTZ,
  cost_usd                  INT
)
WITH (commit_checkpoint_interval = 1)
ENGINE = iceberg;

Note: commit_checkpoint_interval = 1 targets low-latency commits; increase it for larger files and fewer commits.
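For example, a less latency-sensitive table could trade commit frequency for larger data files; a sketch with an illustrative table name and schema:

-- Hypothetical table that commits roughly every 10 checkpoints instead of every one.
CREATE TABLE fuel_events_iceberg (
  fuel_log_id   VARCHAR,
  truck_id      INT,
  liters_filled INT
)
WITH (commit_checkpoint_interval = 10)
ENGINE = iceberg;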

Step 4: Insert & Query Data

Copy the MV's current results into the Iceberg table:

INSERT INTO logistics_joined_iceberg
SELECT * FROM logistics_joined_mv;
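Note that INSERT ... SELECT captures the MV's contents at that point in time. If you want the Iceberg table to stay continuously in sync with the streaming join, RisingWave's sink-into-table capability is one option; a sketch (the sink name is arbitrary):

-- Continuously deliver new rows from the MV into the Iceberg-backed table.
CREATE SINK logistics_joined_sink INTO logistics_joined_iceberg
FROM logistics_joined_mv;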

Query the Iceberg table from RisingWave.

SELECT * FROM logistics_joined_iceberg LIMIT 5;

Step 5: Query from Spark

Install Java (8/11/17), then Spark 3.5.x. Map minio-0 to localhost so Spark can reach MinIO:

echo "127.0.0.1 minio-0" | sudo tee -a /etc/hosts

Start Spark SQL with Iceberg + REST + S3 settings:

spark-sql \
  --packages "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.9.2,org.apache.iceberg:iceberg-aws-bundle:1.9.2" \
  --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
  --conf spark.sql.defaultCatalog=lakekeeper \
  --conf spark.sql.catalog.lakekeeper=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.lakekeeper.catalog-impl=org.apache.iceberg.rest.RESTCatalog \
  --conf spark.sql.catalog.lakekeeper.uri=http://127.0.0.1:8181/catalog/ \
  --conf spark.sql.catalog.lakekeeper.warehouse=risingwave-warehouse \
  --conf spark.sql.catalog.lakekeeper.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
  --conf spark.sql.catalog.lakekeeper.s3.endpoint=http://minio-0:9301 \
  --conf spark.sql.catalog.lakekeeper.s3.region=us-east-1 \
  --conf spark.sql.catalog.lakekeeper.s3.path-style-access=true \
  --conf spark.sql.catalog.lakekeeper.s3.access-key-id=hummockadmin \
  --conf spark.sql.catalog.lakekeeper.s3.secret-access-key=hummockadmin

Then:

SELECT * FROM public.logistics_joined_iceberg;
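Beyond reading the data itself, the same Spark session can inspect Iceberg's metadata tables to confirm that RisingWave's streaming commits are landing; a sketch using the standard snapshots metadata table:

-- Lists the snapshots produced by RisingWave's periodic commits.
SELECT committed_at, snapshot_id, operation
FROM public.logistics_joined_iceberg.snapshots
ORDER BY committed_at DESC;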

Spark reads the same Iceberg table registered in Lakekeeper, written by RisingWave, and stored in MinIO. That’s interoperability with no lock-in and full flexibility.

Conclusion

In this blog, we showed how RisingWave simplifies a logistics use case: ingest data from Kafka topics, join the streams in a materialized view, and persist the results to a native Apache Iceberg table using the REST-based Lakekeeper catalog on MinIO. If you want Iceberg to feel like a database without sacrificing openness, this pattern delivers: join multiple streams in RisingWave, persist natively to Iceberg, and query from any engine (DuckDB, Trino, Spark, Dremio, and more) via the Lakekeeper REST catalog. The result is an open, high-performance streaming lakehouse with minimal setup, well suited to logistics analytics and beyond. With fewer moving parts, you get faster onboarding and a production-ready setup.
