Real-Time Traffic Flow Analysis with Streaming SQL

Cities can detect congestion, identify incidents, and adjust signal timing in real time by treating traffic sensor data as a continuous stream rather than a batch feed. RisingWave, a PostgreSQL-compatible streaming database, ingests events from loop detectors, cameras, and GPS probes and maintains continuously updated flow, density, and incident materialized views that traffic management centers can query and act on immediately.

Why Real-Time Traffic Flow Analysis Matters

Urban congestion costs the average city driver 50–100 hours per year in lost time and contributes disproportionately to vehicle emissions—stop-and-go traffic consumes 3–5× more fuel than steady flow. Traditional traffic management relies on fixed signal timing plans updated quarterly and congestion reports generated from SCADA historians with 5–15 minute lag.

The data to do better already exists. Modern arterials have inductive loop detectors every 500 meters, generating vehicle count, speed, and occupancy readings every 20–60 seconds. Intersections increasingly carry radar or camera sensors. Connected vehicles and navigation apps provide GPS probe data at 1–10 second intervals. What is missing is a layer that fuses all of these streams, computes flow metrics continuously, and surfaces actionable intelligence fast enough to change signal timing before a queue grows to blocking length.

Real-time traffic analysis also enables incident detection. A sudden drop in speed combined with an occupancy spike is the signature of a lane-blocking incident. Detecting this within 60 seconds—rather than waiting for a 911 call—reduces secondary crash risk and enables faster emergency response.

The Streaming SQL Approach

RisingWave ingests sensor events from Kafka, joins them by zone ID and sensor ID, and maintains materialized views for traffic flow metrics at the intersection, corridor, and city-zone levels. The views update continuously as sensor readings arrive; traffic management dashboards and adaptive signal controllers query them over the PostgreSQL interface with sub-second latency.

Key design decisions:

  • Tumbling windows for flow rate (vehicles per 5 minutes), density, and occupancy
  • Session or sliding windows for incident detection where the key signal is a sustained change
  • Zone-level aggregations that roll up intersection data for corridor and network views
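
The tumbling windows used below rely on RisingWave's TUMBLE table function (shown in Step 2). For the sliding-window case, RisingWave provides HOP; a sketch of a 5-minute window sliding every minute over the detector stream (the interval values are illustrative, and the view name is hypothetical):

-- Sliding (hopping) windows: each reading lands in several overlapping windows,
-- so a sustained speed drop shows up within a minute instead of five
CREATE MATERIALIZED VIEW detector_speed_sliding AS
SELECT
    detector_id,
    window_start,
    window_end,
    AVG(avg_speed_kmh) AS avg_speed_kmh
FROM HOP(loop_detector_readings, reading_ts,
         INTERVAL '1 minute', INTERVAL '5 minutes')
GROUP BY detector_id, window_start, window_end;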

Building It Step by Step

Step 1: Data Source

-- Loop detector readings (every 20-60 seconds per detector)
CREATE SOURCE loop_detector_readings (
    detector_id     VARCHAR,
    sensor_id       VARCHAR,
    zone_id         VARCHAR,
    intersection_id VARCHAR,
    lane_id         VARCHAR,
    direction       VARCHAR,  -- 'NB','SB','EB','WB'
    vehicle_count   INT,      -- vehicles in this interval
    avg_speed_kmh   NUMERIC,
    occupancy_pct   NUMERIC,  -- % of time loop is occupied
    reading_ts      TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'traffic.detectors.loop',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;

-- GPS probe data from connected vehicles
CREATE SOURCE gps_probes (
    probe_id        VARCHAR,
    zone_id         VARCHAR,
    segment_id      VARCHAR,
    speed_kmh       NUMERIC,
    heading_deg     NUMERIC,
    lat             NUMERIC,
    lon             NUMERIC,
    probe_ts        TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'traffic.probes.gps',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;

Step 2: Core Materialized View

-- 5-minute flow aggregation per detector
CREATE MATERIALIZED VIEW traffic_flow_5min AS
SELECT
    detector_id,
    zone_id,
    intersection_id,
    lane_id,
    direction,
    window_start,
    window_end,
    SUM(vehicle_count)        AS flow_vehicles,
    AVG(avg_speed_kmh)        AS avg_speed_kmh,
    AVG(occupancy_pct)        AS avg_occupancy_pct,
    MAX(occupancy_pct)        AS peak_occupancy_pct,
    -- Level of service classification (simplified HCM)
    CASE
        WHEN AVG(avg_speed_kmh) >= 90 THEN 'A'
        WHEN AVG(avg_speed_kmh) >= 70 THEN 'B'
        WHEN AVG(avg_speed_kmh) >= 50 THEN 'C'
        WHEN AVG(avg_speed_kmh) >= 40 THEN 'D'
        WHEN AVG(avg_speed_kmh) >= 25 THEN 'E'
        ELSE 'F'
    END                       AS level_of_service
FROM TUMBLE(loop_detector_readings, reading_ts, INTERVAL '5 minutes')
GROUP BY detector_id, zone_id, intersection_id, lane_id, direction,
         window_start, window_end;

-- Zone-level congestion score (aggregated from all detectors in zone)
CREATE MATERIALIZED VIEW zone_congestion_5min AS
SELECT
    zone_id,
    window_start,
    window_end,
    AVG(avg_speed_kmh)    AS zone_avg_speed_kmh,
    AVG(avg_occupancy_pct) AS zone_avg_occupancy_pct,
    SUM(flow_vehicles)    AS zone_total_flow,
    COUNT(DISTINCT detector_id) AS active_detectors,
    -- Composite congestion score 0-100
    ROUND(
        -- Clamp to 0 as well as 100: speeds above 100 km/h would
        -- otherwise push the score negative
        GREATEST(0, LEAST(100,
            (100 - AVG(avg_speed_kmh)) * 0.5 +
            AVG(avg_occupancy_pct) * 0.5
        )), 1
    )                     AS congestion_score
FROM traffic_flow_5min
GROUP BY zone_id, window_start, window_end;

-- Segment travel time from GPS probes
CREATE MATERIALIZED VIEW segment_travel_time_5min AS
SELECT
    segment_id,
    zone_id,
    window_start,
    window_end,
    AVG(speed_kmh)          AS avg_probe_speed_kmh,
    COUNT(DISTINCT probe_id) AS probe_count
FROM TUMBLE(gps_probes, probe_ts, INTERVAL '5 minutes')
GROUP BY segment_id, zone_id, window_start, window_end;
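
With detector and probe aggregates windowed on the same 5-minute grid, the two independent measurements can be joined for cross-validation; a sketch (the view name is hypothetical, and the join relies on both views using the same tumbling interval so window boundaries line up):

-- Compare detector speeds against GPS probe speeds per zone and window;
-- a large gap suggests a miscalibrated or failing detector
CREATE MATERIALIZED VIEW zone_speed_crosscheck AS
SELECT
    d.zone_id,
    d.window_start,
    AVG(d.avg_speed_kmh)       AS detector_speed_kmh,
    AVG(p.avg_probe_speed_kmh) AS probe_speed_kmh
FROM traffic_flow_5min AS d
JOIN segment_travel_time_5min AS p
  ON d.zone_id = p.zone_id
 AND d.window_start = p.window_start
GROUP BY d.zone_id, d.window_start;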

Step 3: Alerts and Aggregations

-- Incident detection: speed < 20 km/h AND occupancy > 70% simultaneously
CREATE MATERIALIZED VIEW incident_candidates AS
SELECT
    detector_id,
    zone_id,
    intersection_id,
    direction,
    avg_speed_kmh,
    avg_occupancy_pct,
    flow_vehicles,
    window_start,
    window_end,
    -- Incident signature score
    CASE
        WHEN avg_speed_kmh < 10 AND avg_occupancy_pct > 85 THEN 'HIGH'
        WHEN avg_speed_kmh < 20 AND avg_occupancy_pct > 70 THEN 'MEDIUM'
        ELSE 'LOW'
    END                AS incident_confidence
FROM traffic_flow_5min
WHERE avg_speed_kmh < 20 AND avg_occupancy_pct > 70;

CREATE SINK incident_alerts_sink
AS SELECT * FROM incident_candidates
WHERE incident_confidence IN ('HIGH','MEDIUM')
WITH (
    connector = 'kafka',
    topic = 'traffic.alerts.incidents',
    properties.bootstrap.server = 'kafka:9092',
    -- aggregate-derived views emit updates; force append-only for PLAIN format
    force_append_only = 'true'
) FORMAT PLAIN ENCODE JSON;

-- Congestion alert: zone congestion score above 75 in a window
CREATE MATERIALIZED VIEW congestion_alerts AS
SELECT
    zone_id,
    congestion_score,
    zone_avg_speed_kmh,
    zone_total_flow,
    window_start,
    window_end
FROM zone_congestion_5min
WHERE congestion_score > 75;
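
The filter above fires on any single window. To suppress one-off spikes and alert only when congestion persists for two consecutive windows, one option is a self-join of adjacent windows; a sketch (the view name is hypothetical; zone_congestion_5min is as defined above):

-- Fire only when the zone also exceeded the threshold in the previous window
CREATE MATERIALIZED VIEW congestion_alerts_persistent AS
SELECT cur.zone_id, cur.congestion_score, cur.window_start, cur.window_end
FROM zone_congestion_5min AS cur
JOIN zone_congestion_5min AS prev
  ON cur.zone_id = prev.zone_id
 AND cur.window_start = prev.window_end   -- adjacent tumbling windows
WHERE cur.congestion_score > 75
  AND prev.congestion_score > 75;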

CREATE SINK congestion_alerts_sink
AS SELECT * FROM congestion_alerts
WITH (
    connector = 'kafka',
    topic = 'traffic.alerts.congestion',
    properties.bootstrap.server = 'kafka:9092',
    -- aggregate-derived views emit updates; force append-only for PLAIN format
    force_append_only = 'true'
) FORMAT PLAIN ENCODE JSON;

Comparison Table

Approach                 | Latency    | Incident Detection | Zone Aggregation | SQL Interface
SCADA historian + batch  | 5–15 min   | No                 | Manual           | No
Apache Flink custom app  | 1–5 sec    | Possible           | Complex          | No
Commercial TMC platform  | 1–2 min    | Basic              | Yes              | Proprietary
RisingWave streaming SQL | Sub-second | Yes (SQL)          | Yes (SQL)        | PostgreSQL

FAQ

How does RisingWave handle detectors that go offline temporarily?

RisingWave continues computing aggregates from the detectors that are online. For offline detectors, the last-known value is not automatically carried forward—you can implement this pattern with a LAST_VALUE or coalesce approach by joining the live stream against a "last known state" table updated via a separate materialized view.
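
One way to maintain such a "last known state" table is RisingWave's Top-N pattern, which keeps the most recent reading per detector as readings arrive; a sketch (the view name is hypothetical):

-- Most recent reading per detector, maintained continuously
CREATE MATERIALIZED VIEW detector_last_known AS
SELECT detector_id, avg_speed_kmh, occupancy_pct, reading_ts
FROM (
    SELECT *, ROW_NUMBER() OVER (
        PARTITION BY detector_id ORDER BY reading_ts DESC
    ) AS rn
    FROM loop_detector_readings
) AS t
WHERE rn = 1;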

Can I feed the congestion score into an adaptive signal control system?

Yes. The congestion score in zone_congestion_5min is queryable over PostgreSQL and can also be published to a Kafka topic via a sink. Adaptive signal controllers that consume Kafka topics can subscribe to this feed and adjust timing plans in response to real-time zone congestion.

How many detectors can a single RisingWave cluster handle?

RisingWave is designed for horizontal scaling. A modest cluster (3–5 nodes) can comfortably ingest tens of thousands of detector readings per second, so city-scale deployments with hundreds of thousands of sensors are well within its operational range.

Key Takeaways

  • Stream loop detector readings and GPS probe data into RisingWave to compute flow, occupancy, and level-of-service metrics in 5-minute tumbling windows.
  • Zone-level congestion scores aggregate detector data continuously, enabling city-wide visibility from a single SQL query.
  • Incident candidates are identified in pure SQL by combining speed and occupancy thresholds—no custom application code needed.
  • Alerts publish to Kafka sinks in sub-second latency, feeding traffic management centers, variable message signs, and navigation app providers.
