Multi-Modal Transport Monitoring with Streaming SQL

Multi-modal freight crosses multiple transport networks — ocean containers, rail wagons, road trucks, air cargo — each emitting events in different formats and at different cadences. RisingWave, a PostgreSQL-compatible streaming database, provides a unified SQL layer that normalizes and correlates all those event streams in real time.

Why Multi-Modal Transport Monitoring Is Hard

A typical intercontinental shipment touches four or five distinct transport modes. The ocean leg is tracked through AIS vessel position data. Rail connections are monitored via waybill scan events. Road legs generate GPS telemetry from telematics units. Air cargo produces airline cargo management system (CMS) messages. Each system speaks a different protocol, uses different identifiers for the same physical cargo, and operates on a different event cadence — AIS every few minutes, road GPS every few seconds, ocean dwell events only at port milestones.

Traditional approaches to multi-modal visibility involve building a master tracking database that ingests from all these sources via overnight batch imports. The result is a visibility gap: the customer sees a static "last updated" timestamp and a status that may be hours or days old. When a rail connection is missed or a vessel is diverted, the operations team finds out from a phone call rather than from a system alert.

The complexity multiplies when you need to compute intermodal dwell time — how long cargo sits between transport modes — because that requires correlating arrival events from one system (say, port unload) with departure events from another (rail pickup). In a batch architecture, that join runs overnight. In a streaming architecture, it runs continuously and alerts you the moment dwell time exceeds a threshold.

How Streaming SQL Solves Multi-Modal Monitoring

RisingWave ingests normalized shipment events from a unified Kafka topic (produced by a lightweight event normalization layer upstream) and maintains materialized views that track each shipment's current mode, location, and dwell state. Because RisingWave supports temporal joins, you can correlate events from different transport legs using the shipment's booking reference number as a join key, even when those events arrive in different streams with different timestamps.

SESSION window functions are particularly powerful here: they naturally detect mode transitions by identifying gaps in the event stream for a given shipment. When a truck delivers cargo to a port and no further road telemetry appears, the SESSION window closes, marking the end of the road leg and the beginning of the port dwell period.
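As a sketch of that idea — assuming a SESSION window function with TUMBLE-like syntax, and a 30-minute gap threshold chosen here for illustration; check your RisingWave version for availability and exact syntax — road legs could be sessionized like this:

```sql
-- Sketch: group road telemetry into sessions that close after a
-- 30-minute gap in events for a shipment. When the session closes,
-- the road leg is over and the dwell period begins.
CREATE MATERIALIZED VIEW road_leg_sessions AS
SELECT
    shipment_id,
    window_start AS leg_start,
    window_end   AS leg_end
FROM SESSION(transport_events, event_ts, INTERVAL '30 MINUTES')
WHERE transport_mode = 'ROAD'
GROUP BY shipment_id, window_start, window_end;
```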

Building It Step by Step

Step 1: Create the Data Source

-- Unified transport event stream (normalized upstream from mode-specific feeds)
CREATE SOURCE transport_events (
    shipment_id     VARCHAR,
    booking_ref     VARCHAR,
    transport_mode  VARCHAR,   -- OCEAN, RAIL, ROAD, AIR, PORT_DWELL, WAREHOUSE_DWELL
    leg_sequence    INTEGER,
    event_type      VARCHAR,   -- DEPARTED, IN_TRANSIT, ARRIVED, CUSTOMS_HOLD, EXCEPTION
    carrier_code    VARCHAR,
    vessel_voyage   VARCHAR,   -- populated for ocean legs
    container_id    VARCHAR,
    location_name   VARCHAR,
    latitude        DOUBLE PRECISION,
    longitude       DOUBLE PRECISION,
    planned_arrival TIMESTAMPTZ,
    event_ts        TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'logistics.transport_events',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;
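Downstream of the normalizer, each event arrives on the topic as a flat JSON document matching this schema. A hypothetical road-leg event (all identifiers and values invented for illustration) might look like:

```json
{
  "shipment_id": "SHP-0001",
  "booking_ref": "BK-98765",
  "transport_mode": "ROAD",
  "leg_sequence": 3,
  "event_type": "IN_TRANSIT",
  "carrier_code": "XPO",
  "vessel_voyage": null,
  "container_id": "MSKU1234567",
  "location_name": "I-80 near Reno, NV",
  "latitude": 39.5296,
  "longitude": -119.8138,
  "planned_arrival": "2024-05-01T18:00:00Z",
  "event_ts": "2024-05-01T09:42:17Z"
}
```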

Step 2: Build the Core Materialized View

-- Current leg status per shipment
CREATE MATERIALIZED VIEW shipment_current_leg AS
SELECT DISTINCT ON (shipment_id)
    shipment_id,
    booking_ref,
    transport_mode          AS current_mode,
    leg_sequence            AS current_leg,
    event_type              AS current_event,
    carrier_code,
    vessel_voyage,
    container_id,
    location_name,
    latitude,
    longitude,
    planned_arrival,
    event_ts                AS last_event_ts
    -- NOW() generally can't be projected in a streaming materialized
    -- view; compute event age (NOW() - last_event_ts) at query time
FROM transport_events
ORDER BY shipment_id, event_ts DESC;
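Once this view exists, any client can run ad-hoc batch queries against it. For example, a staleness check — the 6-hour threshold here is an arbitrary illustration, not a recommendation:

```sql
-- Batch query (run from a dashboard or psql): shipments with no event
-- in the last 6 hours, a possible sign of a tracking gap.
SELECT shipment_id, current_mode, location_name, last_event_ts
FROM shipment_current_leg
WHERE NOW() - last_event_ts > INTERVAL '6 hours';
```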

-- Full leg history per shipment (for transit time analysis)
CREATE MATERIALIZED VIEW shipment_leg_transitions AS
SELECT
    a.shipment_id,
    a.booking_ref,
    a.transport_mode                                      AS from_mode,
    b.transport_mode                                      AS to_mode,
    a.location_name                                       AS handoff_location,
    a.event_ts                                            AS leg_end_ts,
    b.event_ts                                            AS leg_start_ts,
    EXTRACT(EPOCH FROM (b.event_ts - a.event_ts)) / 3600.0 AS intermodal_dwell_hours
FROM transport_events a
-- LEFT JOIN keeps arrivals with no matching departure scan yet,
-- so leg_start_ts (and dwell) are NULL until the next leg departs
LEFT JOIN transport_events b
    ON  b.shipment_id  = a.shipment_id
    AND b.leg_sequence = a.leg_sequence + 1
    AND b.event_type   = 'DEPARTED'
WHERE a.event_type = 'ARRIVED';

Step 3: Add Alerts and Aggregations

-- Intermodal dwell time alerts: cargo sitting at transfer point > 12 hours
CREATE MATERIALIZED VIEW intermodal_dwell_alerts AS
SELECT
    shipment_id,
    booking_ref,
    location_name                                          AS stuck_at,
    from_mode,
    to_mode,
    leg_end_ts                                             AS arrived_at_transfer,
    intermodal_dwell_hours
FROM shipment_leg_transitions
WHERE intermodal_dwell_hours > 12
   OR leg_start_ts IS NULL;  -- no departure scanned yet

-- Hourly shipments-in-transit count by mode
CREATE MATERIALIZED VIEW hourly_in_transit_by_mode AS
SELECT
    window_start,
    window_end,
    transport_mode,
    COUNT(DISTINCT shipment_id) AS shipment_count
FROM TUMBLE(transport_events, event_ts, INTERVAL '1 HOUR')
WHERE event_type = 'IN_TRANSIT'
GROUP BY window_start, window_end, transport_mode;

-- Schedule adherence: actual vs planned arrival delta per leg
CREATE MATERIALIZED VIEW schedule_adherence AS
SELECT
    shipment_id,
    transport_mode,
    carrier_code,
    planned_arrival,
    event_ts                                               AS actual_arrival,
    EXTRACT(EPOCH FROM (event_ts - planned_arrival)) / 3600.0 AS delay_hours,
    CASE
        WHEN event_ts <= planned_arrival          THEN 'ON_TIME'
        WHEN event_ts <= planned_arrival + INTERVAL '4 HOURS' THEN 'MINOR_DELAY'
        ELSE 'SIGNIFICANT_DELAY'
    END AS adherence_status
FROM transport_events
WHERE event_type = 'ARRIVED'
  AND planned_arrival IS NOT NULL;
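The adherence view also supports ad-hoc analyst queries. A minimal sketch of a per-carrier scorecard (column names follow the view above; the query itself is illustrative):

```sql
-- On-time arrival rate per carrier, computed over the live view.
SELECT
    carrier_code,
    COUNT(*) AS arrivals,
    AVG(CASE WHEN adherence_status = 'ON_TIME' THEN 1.0 ELSE 0.0 END)
        AS on_time_rate
FROM schedule_adherence
GROUP BY carrier_code
ORDER BY on_time_rate;
```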

Step 4: Sink Results Downstream

-- Stream dwell alerts to operations center notification system
CREATE SINK dwell_alert_sink
FROM intermodal_dwell_alerts
WITH (
    connector = 'kafka',
    topic = 'ops.alerts.dwell_exceeded',
    properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;

-- Write schedule adherence metrics to Iceberg for long-term carrier analysis
CREATE SINK schedule_adherence_sink
FROM schedule_adherence
WITH (
    connector = 'iceberg',
    type = 'append-only',
    catalog.type = 'rest',
    catalog.uri = 'http://iceberg-catalog:8181',
    database.name = 'logistics',
    table.name = 'schedule_adherence'
);

How This Compares to Traditional Approaches

| Aspect | Batch Visibility Platform | Streaming SQL (RisingWave) |
|---|---|---|
| Event freshness | Hours to days | Sub-second |
| Dwell detection | Overnight job | Alerts within seconds of threshold breach |
| Mode-crossing joins | Complex ETL pipelines | Declarative temporal join SQL |
| Multi-system ingestion | Custom adapters | Kafka source + upstream normalizer |
| Scalability | Fixed ETL cluster capacity | Horizontal scale-out |
| Analyst access | Specialized BI tools | Standard PostgreSQL SQL |

FAQ

What is multi-modal transport monitoring?

Multi-modal transport monitoring tracks cargo as it transitions between different transport modes — ocean vessel, rail, road, and air — providing end-to-end visibility across the full shipping journey. It includes tracking current location, mode transitions, dwell times at transfer points, and schedule adherence for each leg.

How does RisingWave handle the different event cadences of each transport mode?

RisingWave is event-driven: it processes events as they arrive regardless of cadence. Ocean AIS events arriving every few minutes and road GPS events arriving every few seconds are both handled by the same infrastructure. Materialized views update incrementally on each new event, so slower modes simply produce fewer updates rather than blocking faster modes.

Can I integrate RisingWave with my existing stack?

Yes. RisingWave connects to Kafka (source and sink), PostgreSQL via CDC, MySQL via CDC, and writes to JDBC databases and Iceberg tables. The PostgreSQL-compatible query interface means visibility portals, BI platforms, and operations dashboards can query live shipment state directly.
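For instance, a visibility portal backed by any PostgreSQL driver could look up a shipment's live state directly from the view built earlier (the booking reference here is a hypothetical example value):

```sql
-- Portal lookup: current state of one shipment, served from the
-- incrementally maintained view rather than a batch snapshot.
SELECT shipment_id, current_mode, current_event, location_name, last_event_ts
FROM shipment_current_leg
WHERE booking_ref = 'BK-98765';
```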

How do I handle duplicate events from multiple tracking providers?

Deduplication can be handled upstream in Kafka (using a deduplication transformation) or within RisingWave by keying materialized views on the combination of shipment ID and event timestamp. The DISTINCT ON pattern shown above naturally surfaces the latest event per shipment.
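A minimal sketch of that in-RisingWave deduplication, using the streaming Top-N pattern (ROW_NUMBER per key, keep one row); the view name is an assumption for illustration:

```sql
-- Keep exactly one row per (shipment_id, event_ts) pair, collapsing
-- duplicate reports of the same event from multiple providers.
CREATE MATERIALIZED VIEW deduped_events AS
SELECT shipment_id, booking_ref, transport_mode, leg_sequence,
       event_type, location_name, event_ts
FROM (
    SELECT *,
           ROW_NUMBER() OVER (
               PARTITION BY shipment_id, event_ts
               ORDER BY event_ts
           ) AS rn
    FROM transport_events
) ranked
WHERE rn = 1;
```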

Key Takeaways

  • Multi-modal visibility platforms built on batch ETL leave operations teams discovering dwell overruns and missed connections hours after they occur.
  • RisingWave unifies transport event streams from all modes under a single SQL schema, with temporal joins handling mode-transition correlations in real time.
  • SESSION and TUMBLE window functions enable dwell time detection, schedule adherence scoring, and in-transit counts without custom application code.
  • Results flow to operations alerting systems and long-term analytics stores through Kafka and Iceberg sinks.

Ready to try this? Get started with RisingWave. Join our Slack community.
