Transit agencies can reduce bunching, improve on-time performance, and balance passenger loads across their fleet by treating GTFS-Realtime and AVL feeds as continuous streams in RisingWave. Streaming SQL materialized views compute headway variance, vehicle adherence, and route-level KPIs in real time—giving control centers the data they need to make holding, expressing, and dispatch decisions before problems become visible to riders.
Why Real-Time Transit Optimization Matters
Bus bunching—two vehicles on the same route arriving at a stop in rapid succession after a long gap—is the most visible and frustrating failure mode in surface transit. It happens because small delays compound: a slightly late vehicle picks up more passengers, making it later, causing the following vehicle to catch up. Without real-time intervention from a dispatch system, bunching is self-reinforcing.
The data to prevent bunching exists in every modern transit agency: GPS positions from automatic vehicle location (AVL) systems, updated every 10–30 seconds. But most agencies have not built the analytical layer that turns that data stream into continuously updated headway analytics dispatchers can act on.
The same data also enables passenger information services (next vehicle arrival predictions), performance reporting (schedule adherence, on-time percentage), and capacity management (crowding prediction when vehicles carry passenger counters). All of these use cases benefit from the same streaming SQL architecture.
The Streaming SQL Approach
GTFS-RT vehicle position and trip update feeds, supplemented by AVL and APC (automatic passenger counter) data, flow into Kafka topics. RisingWave ingests these feeds and maintains materialized views for:
- Vehicle adherence — how many minutes ahead or behind schedule each vehicle is
- Headway — actual gap between consecutive vehicles on the same route/direction
- Route KPIs — on-time percentage, average headway, bunching incidents
- Crowding — load factor per vehicle from APC data
Building It Step by Step
Step 1: Data Source
```sql
-- GTFS-RT vehicle positions (every 10-30 seconds per vehicle).
-- GTFS-RT is protobuf on the wire; this source assumes messages are decoded to JSON before reaching Kafka.
CREATE SOURCE vehicle_positions (
  vehicle_id VARCHAR,
  route_id VARCHAR,
  trip_id VARCHAR,
  direction_id INT,        -- 0 = outbound, 1 = inbound
  stop_sequence INT,
  current_stop_id VARCHAR,
  current_status VARCHAR,  -- 'IN_TRANSIT_TO', 'STOPPED_AT', 'INCOMING_AT'
  lat NUMERIC,
  lon NUMERIC,
  bearing_deg NUMERIC,
  speed_kmh NUMERIC,
  timestamp_unix BIGINT,
  event_ts TIMESTAMPTZ
) WITH (
  connector = 'kafka',
  topic = 'transit.gtfs-rt.positions',
  properties.bootstrap.server = 'kafka:9092',
  scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;

-- Stop arrival/departure events from AVL (triggered on stop visit)
CREATE SOURCE stop_events (
  event_id VARCHAR,
  vehicle_id VARCHAR,
  route_id VARCHAR,
  trip_id VARCHAR,
  direction_id INT,
  stop_id VARCHAR,
  stop_sequence INT,
  event_type VARCHAR,      -- 'arrive', 'depart'
  scheduled_ts TIMESTAMPTZ,
  actual_ts TIMESTAMPTZ,
  passenger_count INT,     -- from APC if available
  capacity INT
) WITH (
  connector = 'kafka',
  topic = 'transit.avl.stop-events',
  properties.bootstrap.server = 'kafka:9092',
  scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;
```
Step 2: Core Materialized Views
```sql
-- Schedule adherence per vehicle (running calculation)
CREATE MATERIALIZED VIEW vehicle_adherence AS
SELECT
  vehicle_id,
  route_id,
  trip_id,
  direction_id,
  stop_id,
  stop_sequence,
  scheduled_ts,
  actual_ts,
  -- Positive = early, negative = late (minutes)
  ROUND(
    EXTRACT(EPOCH FROM (scheduled_ts - actual_ts)) / 60.0,
    1
  ) AS adherence_minutes,
  -- Early: more than 1 minute ahead; late: more than 5 minutes behind
  CASE
    WHEN EXTRACT(EPOCH FROM (scheduled_ts - actual_ts)) / 60.0 > 1 THEN 'early'
    WHEN EXTRACT(EPOCH FROM (scheduled_ts - actual_ts)) / 60.0 < -5 THEN 'late'
    ELSE 'on_time'
  END AS adherence_status,
  passenger_count,
  capacity,
  ROUND(passenger_count * 100.0 / NULLIF(capacity, 0), 1) AS load_factor_pct,
  actual_ts AS event_ts
FROM stop_events
WHERE event_type = 'depart';
```
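To sanity-check the sign convention, the same expressions can be evaluated on one pair of literal timestamps. This is a minimal sketch with made-up times: a departure scheduled for 08:10 that actually leaves at 08:16:30 should come out at -6.5 minutes and land in the `late` bucket. It should run as an ad hoc query in RisingWave or any PostgreSQL client.

```sql
-- Illustrative check of vehicle_adherence's arithmetic on hypothetical timestamps
SELECT
  ROUND(EXTRACT(EPOCH FROM (scheduled_ts - actual_ts)) / 60.0, 1) AS adherence_minutes,
  CASE
    WHEN EXTRACT(EPOCH FROM (scheduled_ts - actual_ts)) / 60.0 > 1 THEN 'early'
    WHEN EXTRACT(EPOCH FROM (scheduled_ts - actual_ts)) / 60.0 < -5 THEN 'late'
    ELSE 'on_time'
  END AS adherence_status
FROM (
  VALUES (TIMESTAMPTZ '2024-05-01 08:10:00+00', TIMESTAMPTZ '2024-05-01 08:16:30+00')
) AS t(scheduled_ts, actual_ts);
-- Expected: adherence_minutes = -6.5, adherence_status = 'late'
```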
```sql
-- Headway calculation: gap between consecutive vehicles at each stop.
-- The LAG() values are computed once in the subquery and reused for the headway arithmetic.
CREATE MATERIALIZED VIEW stop_headways AS
SELECT
  route_id,
  stop_id,
  direction_id,
  current_vehicle,
  current_arrival,
  preceding_vehicle,
  preceding_arrival,
  ROUND(
    EXTRACT(EPOCH FROM (current_arrival - preceding_arrival)) / 60.0,
    1
  ) AS headway_minutes
FROM (
  SELECT
    route_id,
    stop_id,
    direction_id,
    vehicle_id AS current_vehicle,
    actual_ts AS current_arrival,
    LAG(vehicle_id) OVER (
      PARTITION BY route_id, stop_id, direction_id
      ORDER BY actual_ts
    ) AS preceding_vehicle,
    LAG(actual_ts) OVER (
      PARTITION BY route_id, stop_id, direction_id
      ORDER BY actual_ts
    ) AS preceding_arrival
  FROM stop_events
  WHERE event_type = 'arrive'
) AS arrivals;
```
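The LAG logic is easiest to see on a handful of rows. This sketch uses invented vehicle IDs and times at a single stop: bus-102 arrives after a 12-minute gap and bus-103 only 1.5 minutes behind it, the long-gap-then-short-gap pattern that defines bunching. The 1.5-minute headway is the kind of row the bunching alert below would flag.

```sql
-- Three hypothetical arrivals at one stop (same route and direction), so no PARTITION BY is needed
SELECT
  vehicle_id,
  actual_ts,
  ROUND(
    EXTRACT(EPOCH FROM (actual_ts - LAG(actual_ts) OVER (ORDER BY actual_ts))) / 60.0,
    1
  ) AS headway_minutes
FROM (
  VALUES
    ('bus-101', TIMESTAMPTZ '2024-05-01 08:00:00+00'),
    ('bus-102', TIMESTAMPTZ '2024-05-01 08:12:00+00'),
    ('bus-103', TIMESTAMPTZ '2024-05-01 08:13:30+00')
) AS arrivals(vehicle_id, actual_ts)
ORDER BY actual_ts;
-- Expected headway_minutes: NULL (first arrival), 12.0, 1.5
```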
```sql
-- Route-level performance KPIs (hourly window)
CREATE MATERIALIZED VIEW route_kpis_hourly AS
SELECT
  route_id,
  direction_id,
  window_start,
  window_end,
  COUNT(*) AS stop_visits,
  COUNT(*) FILTER (WHERE adherence_status = 'on_time') AS on_time_count,
  ROUND(
    COUNT(*) FILTER (WHERE adherence_status = 'on_time') * 100.0
      / NULLIF(COUNT(*), 0),
    1
  ) AS on_time_pct,
  AVG(ABS(adherence_minutes)) AS avg_abs_adherence_min,
  AVG(load_factor_pct) AS avg_load_factor_pct,
  MAX(load_factor_pct) AS max_load_factor_pct
FROM TUMBLE(vehicle_adherence, event_ts, INTERVAL '1 hour')
GROUP BY route_id, direction_id, window_start, window_end;
```
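Because RisingWave speaks the PostgreSQL wire protocol, a dispatcher dashboard can read these views with ordinary SQL. A sketch of one such query follows; the 3-hour lookback and 10-row limit are arbitrary choices for illustration.

```sql
-- Worst-performing route/direction hours over the last three hours
SELECT
  route_id,
  direction_id,
  window_start,
  on_time_pct,
  ROUND(avg_abs_adherence_min, 1) AS avg_abs_adherence_min,
  max_load_factor_pct
FROM route_kpis_hourly
WHERE window_start >= NOW() - INTERVAL '3 hours'
ORDER BY on_time_pct ASC NULLS LAST, window_start DESC
LIMIT 10;
```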
Step 3: Alerts and Aggregations
```sql
-- Bunching alert: headway < 2 minutes (vehicles too close together)
CREATE MATERIALIZED VIEW bunching_alerts AS
SELECT
  route_id,
  stop_id,
  direction_id,
  current_vehicle,
  preceding_vehicle,
  current_arrival,
  headway_minutes
FROM stop_headways
WHERE headway_minutes IS NOT NULL
  AND headway_minutes < 2.0;

-- stop_headways is built on a window function, so a late event can update earlier rows.
-- A PLAIN (append-only) sink therefore needs force_append_only; retractions are not sent to Kafka.
CREATE SINK bunching_alerts_sink
AS SELECT * FROM bunching_alerts
WITH (
  connector = 'kafka',
  topic = 'transit.alerts.bunching',
  properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON (
  force_append_only = 'true'
);
```
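Bunching incidents are listed among the route KPIs, but `route_kpis_hourly` is built from departures and does not count them. One way to add that count, sketched here with a hypothetical view name, is a second tumbling window over `bunching_alerts`:

```sql
-- Hypothetical companion to route_kpis_hourly: bunching incidents per route/direction per hour
CREATE MATERIALIZED VIEW route_bunching_hourly AS
SELECT
  route_id,
  direction_id,
  window_start,
  window_end,
  COUNT(*) AS bunching_incidents,        -- stop-level alerts, not distinct vehicle pairs
  COUNT(DISTINCT stop_id) AS stops_affected,
  MIN(headway_minutes) AS min_headway_min
FROM TUMBLE(bunching_alerts, current_arrival, INTERVAL '1 hour')
GROUP BY route_id, direction_id, window_start, window_end;
```

Note that one bunched pair raises an alert at every stop the two vehicles pass together, so `bunching_incidents` counts stop-level events rather than distinct bunches. The view can be joined to `route_kpis_hourly` on `(route_id, direction_id, window_start)` for a single dashboard row.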
```sql
-- Overcrowding alert: vehicle load factor > 85%
CREATE MATERIALIZED VIEW overcrowding_alerts AS
SELECT
  vehicle_id,
  route_id,
  trip_id,
  stop_id,
  passenger_count,
  capacity,
  load_factor_pct,
  event_ts
FROM vehicle_adherence
WHERE load_factor_pct > 85.0;

CREATE SINK overcrowding_sink
AS SELECT * FROM overcrowding_alerts
WITH (
  connector = 'kafka',
  topic = 'transit.alerts.crowding',
  properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;
```
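The overcrowding view can also be queried directly to find where crowding concentrates, for example to decide where an extra vehicle would help most. A sketch with an arbitrary one-hour lookback:

```sql
-- Stops where departures most often exceeded 85% load in the past hour
SELECT
  route_id,
  stop_id,
  COUNT(*) AS overcrowded_departures,
  MAX(load_factor_pct) AS peak_load_factor_pct
FROM overcrowding_alerts
WHERE event_ts >= NOW() - INTERVAL '1 hour'
GROUP BY route_id, stop_id
ORDER BY overcrowded_departures DESC, peak_load_factor_pct DESC
LIMIT 20;
```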
```sql
-- Headway regularity: routes with high headway variability (bunching risk).
-- STDDEV_SAMP is the sample standard deviation (PostgreSQL's STDDEV alias).
CREATE MATERIALIZED VIEW headway_variance_alerts AS
SELECT
  route_id,
  direction_id,
  window_start,
  window_end,
  AVG(headway_minutes) AS avg_headway_min,
  STDDEV_SAMP(headway_minutes) AS stddev_headway_min,
  MIN(headway_minutes) AS min_headway_min,
  MAX(headway_minutes) AS max_headway_min,
  COUNT(*) AS observations
FROM TUMBLE(stop_headways, current_arrival, INTERVAL '30 minutes')
WHERE headway_minutes IS NOT NULL
GROUP BY route_id, direction_id, window_start, window_end
HAVING STDDEV_SAMP(headway_minutes) > 3.0;
```
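A fixed 3-minute threshold treats a route with 8-minute service and a route with 30-minute service the same way. A common refinement is the coefficient of variation (standard deviation divided by mean headway), which scales with the route's own frequency. This sketch uses a hypothetical view name and an assumed 0.5 cut-off; tune it against your own service standards.

```sql
-- Hypothetical variant: flag irregular service relative to the route's own average headway
CREATE MATERIALIZED VIEW headway_cv_alerts AS
SELECT
  route_id,
  direction_id,
  window_start,
  window_end,
  AVG(headway_minutes) AS avg_headway_min,
  STDDEV_SAMP(headway_minutes) AS stddev_headway_min,
  -- Coefficient of variation; NULLIF guards against a zero average
  STDDEV_SAMP(headway_minutes) / NULLIF(AVG(headway_minutes), 0) AS headway_cv,
  COUNT(*) AS observations
FROM TUMBLE(stop_headways, current_arrival, INTERVAL '30 minutes')
WHERE headway_minutes IS NOT NULL
GROUP BY route_id, direction_id, window_start, window_end
-- 0.5 is an assumed threshold, not a standard value
HAVING STDDEV_SAMP(headway_minutes) / NULLIF(AVG(headway_minutes), 0) > 0.5;
```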
Comparison Table
| System | Headway Analytics | Bunching Detection | APC Integration | API for Dispatchers |
|---|---|---|---|---|
| CAD/AVL (basic) | Manual lookup | No | Separate system | Proprietary |
| GTFS-RT consumer + batch | 5–15 min lag | No | Optional | Limited |
| Commercial RTPI platform | Near real-time | Basic | Yes | REST |
| RisingWave streaming SQL | Sub-second | Yes (SQL) | Native join | PostgreSQL |
FAQ
How does RisingWave handle the window function for headway when vehicles appear out of order in Kafka?
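RisingWave evaluates LAG in the order given by ORDER BY actual_ts (event time), not in Kafka arrival order. When an arrival shows up late, RisingWave places it at its correct position within the partition and updates the affected headway rows, propagating those changes as retractions to downstream views such as bunching_alerts. If you prefer emit-once results, declare a watermark on the stop_events source and create the view with EMIT ON WINDOW CLOSE. RisingWave then holds each row until the watermark passes its timestamp, so events within the allowed lateness are ordered correctly, while events that arrive behind the watermark are discarded.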
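A sketch of the emit-once variant, assuming events are at most 2 minutes late. The names `stop_events_wm` and `stop_headways_eowc` and the 2-minute bound are illustrative assumptions; check that your RisingWave version supports EMIT ON WINDOW CLOSE for window functions before relying on it.

```sql
-- stop_events with a watermark that tolerates events up to 2 minutes late (assumed bound)
CREATE SOURCE stop_events_wm (
  event_id VARCHAR,
  vehicle_id VARCHAR,
  route_id VARCHAR,
  trip_id VARCHAR,
  direction_id INT,
  stop_id VARCHAR,
  stop_sequence INT,
  event_type VARCHAR,
  scheduled_ts TIMESTAMPTZ,
  actual_ts TIMESTAMPTZ,
  passenger_count INT,
  capacity INT,
  -- Rows whose actual_ts is older than the current watermark are dropped
  WATERMARK FOR actual_ts AS actual_ts - INTERVAL '2 minutes'
) WITH (
  connector = 'kafka',
  topic = 'transit.avl.stop-events',
  properties.bootstrap.server = 'kafka:9092',
  scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;

-- Each arrival's LAG result is emitted once, after the watermark passes its actual_ts
CREATE MATERIALIZED VIEW stop_headways_eowc AS
SELECT
  route_id,
  stop_id,
  direction_id,
  vehicle_id AS current_vehicle,
  actual_ts AS current_arrival,
  LAG(actual_ts) OVER (
    PARTITION BY route_id, stop_id, direction_id
    ORDER BY actual_ts
  ) AS preceding_arrival
FROM stop_events_wm
WHERE event_type = 'arrive'
EMIT ON WINDOW CLOSE;
```

The trade-off is latency: each headway appears at least the watermark delay (here about 2 minutes) after the arrival. That is usually fine for reporting but may be too slow for live bunching alerts, which is why the default, update-in-place behavior can be the better fit for dispatch.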
Can the bunching alert trigger an automated holding instruction to the affected vehicle?
Yes. The bunching_alerts_sink writes to a Kafka topic. A downstream service subscribed to that topic can look up the vehicle's current position (for example, from a materialized view over vehicle_positions) and send a holding instruction via the CAD system's command API. RisingWave itself does not send commands: it produces the analytical signal, and the action is taken by the downstream integration.
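Because `vehicle_positions` is a source rather than a stored table, point lookups are easier against a materialized view that keeps one row per vehicle. A sketch using RisingWave's top-N pattern; the view name and the vehicle ID in the lookup are hypothetical.

```sql
-- Hypothetical helper view: most recent position report per vehicle
CREATE MATERIALIZED VIEW latest_vehicle_position AS
SELECT
  vehicle_id, route_id, trip_id, direction_id,
  current_stop_id, current_status, lat, lon, event_ts
FROM (
  SELECT
    *,
    ROW_NUMBER() OVER (PARTITION BY vehicle_id ORDER BY event_ts DESC) AS rn
  FROM vehicle_positions
) AS ranked
WHERE rn = 1;

-- Lookup a holding service might run after reading an alert ('bus-1042' is a placeholder)
SELECT lat, lon, current_stop_id, current_status, event_ts
FROM latest_vehicle_position
WHERE vehicle_id = 'bus-1042';
```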
How do I calculate on-time performance for a full month from the streaming views?
You can create a tumbling window view with INTERVAL '30 days' over vehicle_adherence, but fixed 30-day windows will not line up with calendar months. Alternatively, configure a Kafka sink from the hourly view and load it into a data warehouse for longer-term aggregation. For operational reporting, the hourly view is typically sufficient; monthly performance reports are better served by a warehouse query over archived data.
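Since `route_kpis_hourly` stores per-hour counts, a calendar-month figure can also be rolled up from it directly while those rows are retained. Summing the counts before dividing avoids the bias of averaging hourly percentages, where a quiet hour with a few departures would weigh as much as a busy peak hour. A sketch:

```sql
-- Calendar-month on-time performance rolled up from the hourly counts
SELECT
  route_id,
  direction_id,
  date_trunc('month', window_start) AS month_start,
  SUM(stop_visits) AS stop_visits,
  SUM(on_time_count) AS on_time_count,
  ROUND(SUM(on_time_count) * 100.0 / NULLIF(SUM(stop_visits), 0), 1) AS on_time_pct
FROM route_kpis_hourly
GROUP BY route_id, direction_id, date_trunc('month', window_start)
ORDER BY route_id, direction_id, month_start;
```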
Key Takeaways
- Stream GTFS-RT and AVL stop events into RisingWave to compute schedule adherence and headways using SQL window functions (`LAG`) over event-time-ordered partitions.
- Bunching alerts fire within seconds of a vehicle arriving at a stop less than 2 minutes behind its predecessor, fast enough for dispatchers to act on.
- The `route_kpis_hourly` view provides on-time percentage, average adherence, and load factor in a single query, replacing multiple manual report pulls.
- APC passenger counts are processed together with stop-event data in RisingWave, enabling real-time load factor monitoring and overcrowding alerts without a separate integration.

