Cities can detect congestion, identify incidents, and adjust signal timing in real time by treating traffic sensor data as a continuous stream rather than a batch feed. RisingWave, a PostgreSQL-compatible streaming database, ingests events from loop detectors, cameras, and GPS probes and maintains continuously updated flow, density, and incident materialized views that traffic management centers can query and act on immediately.
Why Real-Time Traffic Flow Analysis Matters
Urban congestion costs the average city driver 50–100 hours per year in lost time and contributes disproportionately to vehicle emissions—stop-and-go traffic consumes 3–5× more fuel than steady flow. Traditional traffic management relies on fixed signal timing plans updated quarterly and congestion reports generated from SCADA historians with 5–15 minute lag.
The data to do better already exists. Modern arterials have inductive loop detectors every 500 meters, generating vehicle count, speed, and occupancy readings every 20–60 seconds. Intersections increasingly carry radar or camera sensors. Connected vehicles and navigation apps provide GPS probe data at 1–10 second intervals. What is missing is a layer that fuses all of these streams, computes flow metrics continuously, and surfaces actionable intelligence fast enough to change signal timing before a queue grows to blocking length.
Real-time traffic analysis also enables incident detection. A sudden drop in speed combined with an occupancy spike is the signature of a lane-blocking incident. Detecting this within 60 seconds—rather than waiting for a 911 call—reduces secondary crash risk and enables faster emergency response.
The Streaming SQL Approach
RisingWave ingests sensor events from Kafka, joins them by zone ID and sensor ID, and maintains materialized views for traffic flow metrics at the intersection, corridor, and city-zone levels. The views update continuously as sensor readings arrive; traffic management dashboards and adaptive signal controllers query them over the PostgreSQL interface with sub-second latency.
Key design decisions:
- Tumbling windows for flow rate (vehicles per 5 minutes), density, and occupancy
- Session or sliding windows for incident detection where the key signal is a sustained change
- Zone-level aggregations that roll up intersection data for corridor and network views
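For the sustained-change case, a sliding-window variant can be sketched with RisingWave's `HOP` function, which re-evaluates a 5-minute window every minute instead of once per window. The view name and thresholds below are illustrative, and the source schema matches the one defined in Step 1:

```sql
-- Sketch: 5-minute window sliding every 1 minute, so a sustained
-- slowdown is re-checked each minute rather than every 5 minutes
CREATE MATERIALIZED VIEW sustained_slowdown_sliding AS
SELECT
    detector_id,
    zone_id,
    window_start,
    window_end,
    AVG(avg_speed_kmh) AS avg_speed_kmh,
    AVG(occupancy_pct) AS avg_occupancy_pct
FROM HOP(loop_detector_readings, reading_ts,
         INTERVAL '1 minute', INTERVAL '5 minutes')
GROUP BY detector_id, zone_id, window_start, window_end
HAVING AVG(avg_speed_kmh) < 20 AND AVG(occupancy_pct) > 70;
```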
Building It Step by Step
Step 1: Data Source
```sql
-- Loop detector readings (every 20-60 seconds per detector)
CREATE SOURCE loop_detector_readings (
    detector_id VARCHAR,
    sensor_id VARCHAR,
    zone_id VARCHAR,
    intersection_id VARCHAR,
    lane_id VARCHAR,
    direction VARCHAR,       -- 'NB','SB','EB','WB'
    vehicle_count INT,       -- vehicles in this interval
    avg_speed_kmh NUMERIC,
    occupancy_pct NUMERIC,   -- % of time loop is occupied
    reading_ts TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'traffic.detectors.loop',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;

-- GPS probe data from connected vehicles
CREATE SOURCE gps_probes (
    probe_id VARCHAR,
    zone_id VARCHAR,
    segment_id VARCHAR,
    speed_kmh NUMERIC,
    heading_deg NUMERIC,
    lat NUMERIC,
    lon NUMERIC,
    probe_ts TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'traffic.probes.gps',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;
```
Step 2: Core Materialized Views
```sql
-- 5-minute flow aggregation per detector
CREATE MATERIALIZED VIEW traffic_flow_5min AS
SELECT
    detector_id,
    zone_id,
    intersection_id,
    lane_id,
    direction,
    window_start,
    window_end,
    SUM(vehicle_count) AS flow_vehicles,
    AVG(avg_speed_kmh) AS avg_speed_kmh,
    AVG(occupancy_pct) AS avg_occupancy_pct,
    MAX(occupancy_pct) AS peak_occupancy_pct,
    -- Level of service classification (simplified HCM)
    CASE
        WHEN AVG(avg_speed_kmh) >= 90 THEN 'A'
        WHEN AVG(avg_speed_kmh) >= 70 THEN 'B'
        WHEN AVG(avg_speed_kmh) >= 50 THEN 'C'
        WHEN AVG(avg_speed_kmh) >= 40 THEN 'D'
        WHEN AVG(avg_speed_kmh) >= 25 THEN 'E'
        ELSE 'F'
    END AS level_of_service
FROM TUMBLE(loop_detector_readings, reading_ts, INTERVAL '5 minutes')
GROUP BY detector_id, zone_id, intersection_id, lane_id, direction,
         window_start, window_end;
```
```sql
-- Zone-level congestion score (aggregated from all detectors in zone)
CREATE MATERIALIZED VIEW zone_congestion_5min AS
SELECT
    zone_id,
    window_start,
    window_end,
    AVG(avg_speed_kmh) AS zone_avg_speed_kmh,
    AVG(avg_occupancy_pct) AS zone_avg_occupancy_pct,
    SUM(flow_vehicles) AS zone_total_flow,
    COUNT(DISTINCT detector_id) AS active_detectors,
    -- Composite congestion score, clamped to 0-100
    -- (GREATEST prevents negative scores when speeds exceed 100 km/h)
    ROUND(
        GREATEST(0, LEAST(100,
            (100 - AVG(avg_speed_kmh)) * 0.5 +
            AVG(avg_occupancy_pct) * 0.5
        )), 1
    ) AS congestion_score
FROM traffic_flow_5min
GROUP BY zone_id, window_start, window_end;
```
```sql
-- Segment travel time from GPS probes
CREATE MATERIALIZED VIEW segment_travel_time_5min AS
SELECT
    segment_id,
    zone_id,
    window_start,
    window_end,
    AVG(speed_kmh) AS avg_probe_speed_kmh,
    COUNT(DISTINCT probe_id) AS probe_count
FROM TUMBLE(gps_probes, probe_ts, INTERVAL '5 minutes')
GROUP BY segment_id, zone_id, window_start, window_end;
```
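The intersection-level views above can be rolled up to the corridor level mentioned earlier. The sketch below assumes a hypothetical `corridor_segments` mapping table (maintained by the agency) that assigns each `segment_id` to a `corridor_id`; the table and view names are illustrative:

```sql
-- Hypothetical mapping of road segments to named corridors
CREATE TABLE corridor_segments (
    corridor_id VARCHAR,
    segment_id VARCHAR
);

-- Corridor-level rollup of probe-derived speeds
CREATE MATERIALIZED VIEW corridor_speed_5min AS
SELECT
    c.corridor_id,
    s.window_start,
    s.window_end,
    AVG(s.avg_probe_speed_kmh) AS corridor_avg_speed_kmh,
    SUM(s.probe_count) AS corridor_probe_count
FROM segment_travel_time_5min s
JOIN corridor_segments c ON s.segment_id = c.segment_id
GROUP BY c.corridor_id, s.window_start, s.window_end;
```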
Step 3: Alerts and Aggregations
```sql
-- Incident detection: low speed combined with high occupancy
-- (speed < 20 km/h AND occupancy > 70% in the same window)
CREATE MATERIALIZED VIEW incident_candidates AS
SELECT
    detector_id,
    zone_id,
    intersection_id,
    direction,
    avg_speed_kmh,
    avg_occupancy_pct,
    flow_vehicles,
    window_start,
    window_end,
    -- Incident signature score
    CASE
        WHEN avg_speed_kmh < 10 AND avg_occupancy_pct > 85 THEN 'HIGH'
        ELSE 'MEDIUM'
    END AS incident_confidence
FROM traffic_flow_5min
WHERE avg_speed_kmh < 20 AND avg_occupancy_pct > 70;
```
```sql
CREATE SINK incident_alerts_sink AS
SELECT * FROM incident_candidates
WHERE incident_confidence IN ('HIGH', 'MEDIUM')
WITH (
    connector = 'kafka',
    topic = 'traffic.alerts.incidents',
    properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;
```
```sql
-- Congestion alert: zone congestion score above 75
CREATE MATERIALIZED VIEW congestion_alerts AS
SELECT
    zone_id,
    congestion_score,
    zone_avg_speed_kmh,
    zone_total_flow,
    window_start,
    window_end
FROM zone_congestion_5min
WHERE congestion_score > 75;
```
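To reduce flapping, the alert can require the score to stay elevated for two consecutive windows. One way to express this in plain SQL is a self-join of each window against its predecessor (the view name is illustrative; this is a sketch, not the only way to encode persistence):

```sql
-- Sketch: alert only when the score exceeds 75 in two consecutive
-- 5-minute windows (prev.window_end lines up with cur.window_start)
CREATE MATERIALIZED VIEW congestion_alerts_persistent AS
SELECT
    cur.zone_id,
    cur.congestion_score,
    prev.congestion_score AS prev_congestion_score,
    cur.window_start,
    cur.window_end
FROM zone_congestion_5min cur
JOIN zone_congestion_5min prev
  ON cur.zone_id = prev.zone_id
 AND prev.window_end = cur.window_start
WHERE cur.congestion_score > 75
  AND prev.congestion_score > 75;
```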
```sql
CREATE SINK congestion_alerts_sink AS
SELECT * FROM congestion_alerts
WITH (
    connector = 'kafka',
    topic = 'traffic.alerts.congestion',
    properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;
```
Comparison Table
| Approach | Latency | Incident Detection | Zone Aggregation | SQL Interface |
|---|---|---|---|---|
| SCADA historian + batch | 5–15 min | No | Manual | No |
| Apache Flink custom app | 1–5 sec | Possible | Complex | No |
| Commercial TMC platform | 1–2 min | Basic | Yes | Proprietary |
| RisingWave streaming SQL | Sub-second | Yes (SQL) | Yes (SQL) | PostgreSQL |
FAQ
How does RisingWave handle detectors that go offline temporarily?
RisingWave continues computing aggregates from the detectors that are online. For offline detectors, the last-known value is not automatically carried forward; you can implement carry-forward yourself by maintaining a last-known-state materialized view and joining the live stream against it, coalescing to the stored value when fresh readings are absent.
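A minimal sketch of the last-known-state side of that pattern, using only the `loop_detector_readings` source from Step 1 (the view name and the 5-minute staleness threshold are illustrative):

```sql
-- Track each detector's most recent reading timestamp
CREATE MATERIALIZED VIEW detector_last_seen AS
SELECT
    detector_id,
    MAX(reading_ts) AS last_reading_ts
FROM loop_detector_readings
GROUP BY detector_id;

-- Ad hoc query: detectors silent for more than 5 minutes
SELECT detector_id, last_reading_ts
FROM detector_last_seen
WHERE last_reading_ts < NOW() - INTERVAL '5 minutes';
```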
Can I feed the congestion score into an adaptive signal control system?
Yes. The congestion score in zone_congestion_5min is queryable over PostgreSQL and can also be published to a Kafka topic via a sink. Adaptive signal controllers that consume Kafka topics can subscribe to this feed and adjust timing plans in response to real-time zone congestion.
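For the polling side, a controller could issue a query like the following over the PostgreSQL interface; this is a sketch that simply selects the most recent complete windows rather than a prescribed integration API:

```sql
-- Fetch zone congestion for windows ending in the last 5 minutes
SELECT zone_id, congestion_score, window_start, window_end
FROM zone_congestion_5min
WHERE window_end > NOW() - INTERVAL '5 minutes'
ORDER BY congestion_score DESC;
```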
How many detectors can a single RisingWave cluster handle?
RisingWave is designed for horizontal scaling. A modest cluster (3–5 nodes) can comfortably ingest tens of thousands of detector readings per second, so city-scale deployments with hundreds of thousands of sensors are well within its operational range.
Key Takeaways
- Stream loop detector readings and GPS probe data into RisingWave to compute flow, occupancy, and level-of-service metrics in 5-minute tumbling windows.
- Zone-level congestion scores aggregate detector data continuously, enabling city-wide visibility from a single SQL query.
- Incident candidates are identified in pure SQL by combining speed and occupancy thresholds—no custom application code needed.
- Alerts publish to Kafka sinks with sub-second latency, feeding traffic management centers, variable message signs, and navigation app providers.

