Real-Time Public Transport Optimization with SQL

Transit agencies can reduce bunching, improve on-time performance, and balance passenger loads across their fleet by treating GTFS-Realtime and AVL feeds as continuous streams in RisingWave. Streaming SQL materialized views compute headway variance, vehicle adherence, and route-level KPIs in real time—giving control centers the data they need to make holding, expressing, and dispatch decisions before problems become visible to riders.

Why Real-Time Transit Optimization Matters

Bus bunching—two vehicles on the same route arriving at a stop in rapid succession after a long gap—is the most visible and frustrating failure mode in surface transit. It happens because small delays compound: a slightly late vehicle picks up more passengers, making it later, causing the following vehicle to catch up. Without real-time intervention from a dispatch system, bunching is self-reinforcing.

Most modern transit agencies already collect the data needed to prevent bunching: GPS positions from automatic vehicle location (AVL) systems, updated every 10–30 seconds. What most agencies have not built is the analytical layer that turns this stream into continuously updated headway analytics dispatchers can act on.

The same data also enables passenger information services (next vehicle arrival predictions), performance reporting (schedule adherence, on-time percentage), and capacity management (crowding prediction when vehicles carry passenger counters). All of these use cases benefit from the same streaming SQL architecture.

The Streaming SQL Approach

GTFS-RT vehicle position and trip update feeds, supplemented by AVL and APC (automatic passenger counter) data, flow into Kafka topics. RisingWave ingests these feeds and maintains materialized views for:

  • Vehicle adherence — how many minutes ahead or behind schedule each vehicle is
  • Headway — actual gap between consecutive vehicles on the same route/direction
  • Route KPIs — on-time percentage, average headway, bunching incidents
  • Crowding — load factor per vehicle from APC data

Building It Step by Step

Step 1: Data Source

-- GTFS-RT vehicle positions (every 10-30 seconds per vehicle)
CREATE SOURCE vehicle_positions (
    vehicle_id      VARCHAR,
    route_id        VARCHAR,
    trip_id         VARCHAR,
    direction_id    INT,      -- 0=outbound, 1=inbound
    stop_sequence   INT,
    current_stop_id VARCHAR,
    current_status  VARCHAR,  -- 'IN_TRANSIT_TO','STOPPED_AT','INCOMING_AT'
    lat             NUMERIC,
    lon             NUMERIC,
    bearing_deg     NUMERIC,
    speed_kmh       NUMERIC,
    timestamp_unix  BIGINT,
    event_ts        TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'transit.gtfs-rt.positions',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;

-- Stop arrival/departure events from AVL (triggered on stop visit)
CREATE SOURCE stop_events (
    event_id        VARCHAR,
    vehicle_id      VARCHAR,
    route_id        VARCHAR,
    trip_id         VARCHAR,
    direction_id    INT,
    stop_id         VARCHAR,
    stop_sequence   INT,
    event_type      VARCHAR,    -- 'arrive','depart'
    scheduled_ts    TIMESTAMPTZ,
    actual_ts       TIMESTAMPTZ,
    passenger_count INT,        -- from APC if available
    capacity        INT
) WITH (
    connector = 'kafka',
    topic = 'transit.avl.stop-events',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;

Step 2: Core Materialized Views

-- Schedule adherence per vehicle (running calculation)
CREATE MATERIALIZED VIEW vehicle_adherence AS
SELECT
    vehicle_id,
    route_id,
    trip_id,
    direction_id,
    stop_id,
    stop_sequence,
    scheduled_ts,
    actual_ts,
    -- Positive = early, negative = late (minutes)
    ROUND(
        EXTRACT(EPOCH FROM (scheduled_ts - actual_ts)) / 60.0,
        1
    )                  AS adherence_minutes,
    -- On-time band: at most 1 minute early and at most 5 minutes late
    CASE
        WHEN EXTRACT(EPOCH FROM (scheduled_ts - actual_ts)) / 60.0 > 1  THEN 'early'
        WHEN EXTRACT(EPOCH FROM (scheduled_ts - actual_ts)) / 60.0 < -5 THEN 'late'
        ELSE 'on_time'
    END                AS adherence_status,
    passenger_count,
    capacity,
    ROUND(passenger_count * 100.0 / NULLIF(capacity, 0), 1) AS load_factor_pct,
    actual_ts          AS event_ts
FROM stop_events
WHERE event_type = 'depart';

-- Headway calculation: gap between consecutive vehicles at each stop
CREATE MATERIALIZED VIEW stop_headways AS
SELECT
    current_v.route_id,
    current_v.stop_id,
    current_v.direction_id,
    current_v.vehicle_id                         AS current_vehicle,
    current_v.actual_ts                          AS current_arrival,
    LAG(current_v.vehicle_id)  OVER (
        PARTITION BY current_v.route_id, current_v.stop_id, current_v.direction_id
        ORDER BY current_v.actual_ts
    )                                            AS preceding_vehicle,
    LAG(current_v.actual_ts) OVER (
        PARTITION BY current_v.route_id, current_v.stop_id, current_v.direction_id
        ORDER BY current_v.actual_ts
    )                                            AS preceding_arrival,
    ROUND(
        EXTRACT(EPOCH FROM (
            current_v.actual_ts -
            LAG(current_v.actual_ts) OVER (
                PARTITION BY current_v.route_id, current_v.stop_id, current_v.direction_id
                ORDER BY current_v.actual_ts
            )
        )) / 60.0, 1
    )                                            AS headway_minutes
FROM stop_events current_v
WHERE event_type = 'arrive';
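
As a rough sanity check of the LAG-based headway logic, the query below applies the same expression to four made-up arrivals at a single stop. The vehicle IDs and timestamps are hypothetical sample data, not taken from any feed. It is written in standard PostgreSQL syntax (PostgreSQL 14 or later, where EXTRACT returns NUMERIC) and has not been verified against a specific RisingWave version.

```sql
-- Hypothetical sample arrivals at one stop (illustrative data only)
WITH arrivals (vehicle_id, actual_ts) AS (
    VALUES
        ('bus_101', '2024-05-01 08:00:00+00:00'::TIMESTAMPTZ),
        ('bus_102', '2024-05-01 08:10:00+00:00'::TIMESTAMPTZ),
        ('bus_103', '2024-05-01 08:11:30+00:00'::TIMESTAMPTZ),
        ('bus_104', '2024-05-01 08:25:00+00:00'::TIMESTAMPTZ)
)
SELECT
    vehicle_id,
    actual_ts,
    -- Same expression as stop_headways; no PARTITION BY because there is one stop
    ROUND(
        EXTRACT(EPOCH FROM (
            actual_ts - LAG(actual_ts) OVER (ORDER BY actual_ts)
        )) / 60.0, 1
    ) AS headway_minutes
FROM arrivals
ORDER BY actual_ts;
```

With these inputs the gaps are 10, 1.5, and 13.5 minutes, and the first arrival has a NULL headway because it has no predecessor. Only bus_103 falls under a 2-minute bunching threshold, and the long gap that follows it is the other half of the bunching pattern.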

-- Route-level performance KPIs (hourly window)
CREATE MATERIALIZED VIEW route_kpis_hourly AS
SELECT
    route_id,
    direction_id,
    window_start,
    window_end,
    COUNT(*)                                            AS stop_visits,
    COUNT(*) FILTER (WHERE adherence_status = 'on_time') AS on_time_count,
    ROUND(
        COUNT(*) FILTER (WHERE adherence_status = 'on_time') * 100.0
        / NULLIF(COUNT(*), 0), 1
    )                                                   AS on_time_pct,
    AVG(ABS(adherence_minutes))                         AS avg_abs_adherence_min,
    AVG(load_factor_pct)                                AS avg_load_factor_pct,
    MAX(load_factor_pct)                                AS max_load_factor_pct
FROM TUMBLE(vehicle_adherence, event_ts, INTERVAL '1 hour')
GROUP BY route_id, direction_id, window_start, window_end;
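
Because the view is exposed through RisingWave's PostgreSQL-compatible interface, a dashboard or dispatcher tool can read it with an ordinary query. The sketch below lists the route and direction pairs with the lowest on-time percentage in recent windows. The 3-hour lookback, the 20-visit minimum, and the LIMIT are arbitrary illustrative choices, not recommended values.

```sql
-- Ad hoc query against route_kpis_hourly (all thresholds are illustrative)
SELECT
    route_id,
    direction_id,
    window_start,
    stop_visits,
    on_time_pct,
    ROUND(avg_abs_adherence_min, 1) AS avg_abs_adherence_min,
    max_load_factor_pct
FROM route_kpis_hourly
WHERE window_start >= NOW() - INTERVAL '3 hours'
  AND stop_visits >= 20          -- skip windows with too few observations
ORDER BY on_time_pct ASC
LIMIT 10;
```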

Step 3: Alerts and Aggregations

-- Bunching alert: headway < 2 minutes (vehicles too close together)
CREATE MATERIALIZED VIEW bunching_alerts AS
SELECT
    route_id,
    stop_id,
    direction_id,
    current_vehicle,
    preceding_vehicle,
    current_arrival,
    headway_minutes
FROM stop_headways
WHERE headway_minutes IS NOT NULL
  AND headway_minutes < 2.0;

CREATE SINK bunching_alerts_sink
AS SELECT * FROM bunching_alerts
WITH (
    connector = 'kafka',
    topic = 'transit.alerts.bunching',
    properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;

-- Overcrowding alert: vehicle load factor > 85%
CREATE MATERIALIZED VIEW overcrowding_alerts AS
SELECT
    vehicle_id,
    route_id,
    trip_id,
    stop_id,
    passenger_count,
    capacity,
    load_factor_pct,
    event_ts
FROM vehicle_adherence
WHERE load_factor_pct > 85.0;

CREATE SINK overcrowding_sink
AS SELECT * FROM overcrowding_alerts
WITH (
    connector = 'kafka',
    topic = 'transit.alerts.crowding',
    properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;

-- Headway regularity: routes with high headway variance (bunching risk)
CREATE MATERIALIZED VIEW headway_variance_alerts AS
SELECT
    route_id,
    direction_id,
    window_start,
    window_end,
    AVG(headway_minutes)         AS avg_headway_min,
    -- Sample standard deviation of headways, in minutes
    STDDEV_SAMP(headway_minutes) AS stddev_headway_min,
    MIN(headway_minutes)         AS min_headway_min,
    MAX(headway_minutes)         AS max_headway_min,
    COUNT(*)                     AS observations
FROM TUMBLE(stop_headways, current_arrival, INTERVAL '30 minutes')
WHERE headway_minutes IS NOT NULL
GROUP BY route_id, direction_id, window_start, window_end
HAVING STDDEV_SAMP(headway_minutes) > 3.0;

Comparison Table

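| System                   | Headway Analytics | Bunching Detection | APC Integration | API for Dispatchers |
|--------------------------|-------------------|--------------------|-----------------|---------------------|
| CAD/AVL (basic)          | Manual lookup     | No                 | Separate system | Proprietary         |
| GTFS-RT consumer + batch | 5–15 min lag      | No                 | Optional        | Limited             |
| Commercial RTPI platform | Near real-time    | Basic              | Yes             | REST                |
| RisingWave streaming SQL | Sub-second        | Yes (SQL)          | Native join     | PostgreSQL          |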

FAQ

How does RisingWave handle the window function for headway when vehicles appear out of order in Kafka?

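RisingWave supports watermark-based event-time processing. Define a watermark on the event-time column of the stop_events source (here, actual_ts) to set how much lateness is tolerated; events that fall further behind the watermark are treated as late and dropped. Within that tolerance, the LAG window function orders rows by actual_ts rather than by Kafka arrival order, so an out-of-order event corrects the affected headway rows when it arrives. To emit each result only once it is final, the view can be defined with EMIT ON WINDOW CLOSE.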
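
A watermark on the stop-event stream could be declared as in the sketch below, which repeats the stop_events schema and adds a WATERMARK clause on actual_ts. The source name stop_events_wm and the 30-second tolerance are assumptions chosen for illustration; the right tolerance depends on how late the AVL feed actually delivers events.

```sql
-- Sketch: stop_events with a watermark
-- (source name and 30-second tolerance are assumptions)
CREATE SOURCE stop_events_wm (
    event_id        VARCHAR,
    vehicle_id      VARCHAR,
    route_id        VARCHAR,
    trip_id         VARCHAR,
    direction_id    INT,
    stop_id         VARCHAR,
    stop_sequence   INT,
    event_type      VARCHAR,    -- 'arrive','depart'
    scheduled_ts    TIMESTAMPTZ,
    actual_ts       TIMESTAMPTZ,
    passenger_count INT,
    capacity        INT,
    -- Events more than 30 seconds behind the newest actual_ts seen count as late
    WATERMARK FOR actual_ts AS actual_ts - INTERVAL '30 seconds'
) WITH (
    connector = 'kafka',
    topic = 'transit.avl.stop-events',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;
```

A larger tolerance accepts more out-of-order events at the cost of later final results, so it is worth measuring real feed latency before settling on a value.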

Can the bunching alert trigger an automated holding instruction to the affected vehicle?

Yes. The bunching_alerts_sink writes to a Kafka topic. A downstream service subscribed to that topic can look up the vehicle's current position from vehicle_positions and send a holding instruction via the CAD system's command API. RisingWave itself does not send commands—it produces the analytical signal; the action is taken by the downstream integration.
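
The position lookup described above is simpler if RisingWave keeps only the most recent position per vehicle. The sketch below uses a ROW_NUMBER()-based Top-N pattern for that. The view name latest_vehicle_position is hypothetical, and the second query shows just one way a downstream service might join it to bunching_alerts.

```sql
-- Hypothetical helper view: newest GTFS-RT position per vehicle
CREATE MATERIALIZED VIEW latest_vehicle_position AS
SELECT vehicle_id, route_id, trip_id, current_stop_id, current_status,
       lat, lon, speed_kmh, event_ts
FROM (
    SELECT *,
           ROW_NUMBER() OVER (
               PARTITION BY vehicle_id
               ORDER BY event_ts DESC
           ) AS rn
    FROM vehicle_positions
) ranked
WHERE rn <= 1;

-- Where is each bunched vehicle right now?
SELECT a.route_id, a.stop_id, a.current_vehicle, a.preceding_vehicle,
       a.headway_minutes, p.lat, p.lon, p.current_status
FROM bunching_alerts a
JOIN latest_vehicle_position p
  ON p.vehicle_id = a.current_vehicle
ORDER BY a.current_arrival DESC
LIMIT 20;
```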

How do I calculate on-time performance for a full month from the streaming views?

A fixed 30-day tumbling window (INTERVAL '30 days') over vehicle_adherence approximates a month but does not align with calendar months; grouping by a month-truncated timestamp does. Alternatively, configure a Kafka sink from the hourly view to write to a data warehouse for longer-term aggregation. For operational reporting, the hourly view is typically sufficient; monthly performance reports are better served by a warehouse query over archived data.
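
For a calendar-month figure computed directly in RisingWave, one option is to group on a month-truncated timestamp instead of a fixed-length window. This is a sketch under assumptions: the view name route_on_time_monthly is hypothetical, and 'UTC' stands in for the agency's service-day time zone.

```sql
-- Sketch: on-time performance per calendar month
-- (view name and time zone are assumptions)
CREATE MATERIALIZED VIEW route_on_time_monthly AS
SELECT
    route_id,
    direction_id,
    DATE_TRUNC('month', event_ts, 'UTC')                 AS month_start,
    COUNT(*)                                             AS stop_visits,
    COUNT(*) FILTER (WHERE adherence_status = 'on_time') AS on_time_count,
    ROUND(
        COUNT(*) FILTER (WHERE adherence_status = 'on_time') * 100.0
        / NULLIF(COUNT(*), 0), 1
    )                                                    AS on_time_pct
FROM vehicle_adherence
GROUP BY route_id, direction_id, DATE_TRUNC('month', event_ts, 'UTC');
```

The view keeps one row per route, direction, and month, and the current month's row keeps updating as new departures arrive.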

Key Takeaways

  • Stream GTFS-RT and AVL stop events into RisingWave to compute schedule adherence and headways using SQL window functions (LAG) over event-time-ordered partitions.
  • Bunching alerts fire within seconds of a vehicle arriving at a stop less than 2 minutes behind its predecessor, early enough for dispatchers to act before the problem becomes visible to riders.
  • The route_kpis_hourly view provides on-time percentage, average adherence, and load factor in a single query, replacing multiple manual report pulls.
  • APC passenger counts carried on the stop-event records feed the same views, enabling real-time load factor monitoring and overcrowding alerts without a separate integration.
