Driver fatigue is the single largest contributing factor in heavy truck accidents. Detecting it in real time — from patterns in telematics data rather than waiting for a near-miss report — is possible with streaming SQL in RisingWave, using event sequence matching, HOS compliance checks, and driving behavior anomaly detection.
Why Real-Time Driver Fatigue Detection Matters
Hours of Service (HOS) regulations exist precisely because fatigued driving kills. FMCSA rules limit commercial drivers to 11 hours of driving within a 14-hour on-duty window, followed by a mandatory 10-hour off-duty period. But HOS compliance is a floor, not a ceiling. A driver who is on hour 10 of driving, has been on the road since 4 AM, and has been executing hard braking events for the last 30 minutes is showing fatigue signatures — even if they are technically within legal limits.
Telematics systems capture the behavioral signals that correlate with driver fatigue: erratic lane changes (harsh cornering events), late braking, micro-corrections in steering (evidenced by rapid heading changes), and deviation from normal speed patterns. When these signals cluster in a short window and coincide with extended driving time, the risk profile changes dramatically.
Detecting this combination in a batch system is possible but too slow to be preventive. By the time a report runs and a safety manager reviews it, the driver has been on the road for another hour. Streaming SQL in RisingWave evaluates the pattern continuously and can trigger an intervention — a dispatched check-in call, a route modification, or a mandatory stop instruction — while the driver is still safely within an exit.
The Streaming SQL Solution
RisingWave ingests telematics events and HOS updates from Kafka. Materialized views compute rolling safety event counts, heading change rates, and a composite fatigue risk score per driver. A Kafka sink delivers alerts the moment a driver crosses the risk threshold.
Tutorial: Building It Step by Step
Step 1: Set Up the Data Source
-- Truck telematics: motion and safety events from ECU
CREATE SOURCE truck_telemetry (
vin VARCHAR,
driver_id VARCHAR,
event_ts TIMESTAMPTZ,
speed_mph DOUBLE PRECISION,
heading_deg INT,
engine_rpm INT,
harsh_brake BOOLEAN,
harsh_accel BOOLEAN,
harsh_corner BOOLEAN,
lane_departure BOOLEAN, -- from ADAS system if available
latitude DOUBLE PRECISION,
longitude DOUBLE PRECISION
)
WITH (
connector = 'kafka',
topic = 'fleet.telemetry',
properties.bootstrap.server = 'kafka:9092',
scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;
-- Driver HOS (Hours of Service) ELD updates
CREATE SOURCE driver_hos (
driver_id VARCHAR,
vin VARCHAR,
hos_status VARCHAR, -- DRIVING | ON_DUTY | OFF_DUTY | SLEEPER_BERTH
driving_hours_today DOUBLE PRECISION,
duty_hours_today DOUBLE PRECISION,
hours_since_break DOUBLE PRECISION,
last_break_duration_h DOUBLE PRECISION,
hos_ts TIMESTAMPTZ
)
WITH (
connector = 'kafka',
topic = 'fleet.hos',
properties.bootstrap.server = 'kafka:9092',
scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;
Step 2: Build Real-Time Aggregations
-- 30-minute rolling safety event window per driver
CREATE MATERIALIZED VIEW driver_safety_30m AS
SELECT
driver_id,
vin,
window_start,
window_end,
COUNT(*) FILTER (WHERE harsh_brake) AS harsh_brakes,
COUNT(*) FILTER (WHERE harsh_accel) AS harsh_accels,
COUNT(*) FILTER (WHERE harsh_corner) AS harsh_corners,
COUNT(*) FILTER (WHERE lane_departure) AS lane_departures,
COUNT(*) AS total_events,
-- Heading variance: proxy for erratic steering
STDDEV(heading_deg) AS heading_stddev,
AVG(speed_mph) AS avg_speed_mph,
MAX(speed_mph) AS max_speed_mph,
-- Micro-correction rate: rapid small heading changes
COUNT(*) FILTER (WHERE harsh_corner AND speed_mph < 45) AS low_speed_harsh_corners
FROM HOP(truck_telemetry, event_ts, INTERVAL '5 MINUTES', INTERVAL '30 MINUTES')
GROUP BY driver_id, vin, window_start, window_end;
-- Join safety events with HOS status for fatigue risk scoring
CREATE MATERIALIZED VIEW driver_fatigue_risk AS
SELECT
s.driver_id,
s.vin,
s.window_start,
s.window_end,
s.harsh_brakes,
s.harsh_corners,
s.lane_departures,
s.heading_stddev,
h.driving_hours_today,
h.hours_since_break,
h.last_break_duration_h,
h.hos_status,
-- Fatigue risk score: higher is more concerning
(
s.harsh_brakes * 3 +
s.harsh_corners * 2 +
s.lane_departures * 5 +
COALESCE(s.heading_stddev, 0) * 0.1 +
CASE WHEN h.driving_hours_today > 8 THEN 10 ELSE 0 END +
CASE WHEN h.driving_hours_today > 10 THEN 10 ELSE 0 END +
CASE WHEN h.hours_since_break > 4 THEN 5 ELSE 0 END +
CASE WHEN h.hours_since_break > 6 THEN 10 ELSE 0 END +
CASE WHEN h.last_break_duration_h < 0.5 THEN 5 ELSE 0 END
) AS fatigue_risk_score,
-- Risk tier
CASE
WHEN (
s.harsh_brakes * 3 + s.harsh_corners * 2 + s.lane_departures * 5 +
COALESCE(s.heading_stddev, 0) * 0.1 +
CASE WHEN h.driving_hours_today > 8 THEN 10 ELSE 0 END +
CASE WHEN h.driving_hours_today > 10 THEN 10 ELSE 0 END +
CASE WHEN h.hours_since_break > 4 THEN 5 ELSE 0 END +
CASE WHEN h.hours_since_break > 6 THEN 10 ELSE 0 END +
CASE WHEN h.last_break_duration_h < 0.5 THEN 5 ELSE 0 END
) >= 30 THEN 'CRITICAL'
WHEN (
s.harsh_brakes * 3 + s.harsh_corners * 2 + s.lane_departures * 5 +
COALESCE(s.heading_stddev, 0) * 0.1 +
CASE WHEN h.driving_hours_today > 8 THEN 10 ELSE 0 END +
CASE WHEN h.hours_since_break > 4 THEN 5 ELSE 0 END
) >= 15 THEN 'WARNING'
ELSE 'NORMAL'
END AS risk_tier
FROM driver_safety_30m s
JOIN (
SELECT DISTINCT ON (driver_id) *
FROM driver_hos
ORDER BY driver_id, hos_ts DESC
) h ON s.driver_id = h.driver_id;
Step 3: Detect Anomalies or Generate Alerts
-- Real-time fatigue alert: CRITICAL or WARNING tier drivers
CREATE MATERIALIZED VIEW active_fatigue_alerts AS
SELECT
driver_id,
vin,
window_end,
fatigue_risk_score,
risk_tier,
driving_hours_today,
hours_since_break,
harsh_brakes,
harsh_corners,
lane_departures,
heading_stddev,
hos_status
FROM driver_fatigue_risk
WHERE risk_tier IN ('CRITICAL', 'WARNING')
AND window_end = (
SELECT MAX(window_end)
FROM driver_fatigue_risk dfr2
WHERE dfr2.driver_id = driver_fatigue_risk.driver_id
);
-- HOS violation: driving over 11 hours
CREATE MATERIALIZED VIEW hos_violation_alerts AS
SELECT DISTINCT ON (driver_id)
driver_id,
vin,
driving_hours_today,
duty_hours_today,
hours_since_break,
hos_status,
hos_ts,
'HOS_VIOLATION' AS alert_type
FROM driver_hos
WHERE driving_hours_today > 11.0
ORDER BY driver_id, hos_ts DESC;
-- Sink: push fatigue alerts to safety management system
CREATE SINK fatigue_alert_sink
FROM (
SELECT
driver_id,
vin,
CAST(fatigue_risk_score AS VARCHAR) AS metric,
window_end AS alert_ts,
risk_tier AS alert_type
FROM active_fatigue_alerts
UNION ALL
SELECT
driver_id,
vin,
CAST(driving_hours_today AS VARCHAR) AS metric,
hos_ts AS alert_ts,
alert_type
FROM hos_violation_alerts
)
WITH (
connector = 'kafka',
topic = 'alerts.driver.safety',
properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;
Comparison Table
| ELD Compliance Only | RisingWave Streaming Safety | |
| Detection method | HOS hour count | Behavioral pattern + HOS |
| Alerting latency | Daily review cycle | Real-time (< 30 seconds) |
| Behavioral signals | Not captured | Harsh events + heading variance |
| Intervention window | After incident | Before incident |
| False positive control | Binary threshold | Scored + tiered alerts |
| Driver context | Hours only | Hours + break quality + behavior |
FAQ
How accurate is the fatigue risk score? The score is a heuristic based on behavioral proxies. It is not a clinical fatigue assessment. The weights can be tuned based on your fleet's historical incident data. For higher-accuracy detection, integrate data from in-cab fatigue cameras (drowsiness detection systems) as additional Kafka events.
What is a realistic intervention workflow for a CRITICAL alert? The Kafka sink delivers the alert to a fleet safety application. A dispatcher receives a notification, calls the driver, and if the driver confirms fatigue, issues a route modification directing them to the nearest truck stop. The whole loop — detection to instruction — should complete in under two minutes.
Can I backtest the risk score against historical incident data?
Yes. Replay historical telematics data from a Kafka topic with scan.startup.mode = 'earliest'. The materialized views will process the historical data and you can compare the computed fatigue_risk_score against incident records stored in your data warehouse.
Does RisingWave support event sequence pattern detection (CEP)?
RisingWave supports OVER window functions and aggregations, which can express many event sequence patterns. For complex CEP patterns like "three harsh braking events within 5 minutes followed by a lane departure", you can use sub-queries with TUMBLE windows and join them.
Key Takeaways
- Fatigue detection based purely on HOS compliance is insufficient; behavioral telematics signals (harsh events, heading variance, lane departures) add critical predictive value.
- A HOP window of 30 minutes with 5-minute slide captures evolving fatigue patterns while filtering out isolated aberrant events.
- Joining behavioral signals with ELD HOS data in a single materialized view produces a composite risk score that accounts for both time-in-seat and driving behavior.
- Real-time Kafka sink delivery enables intervention within minutes of a fatigue signature emerging — before an incident, not after.

