Real-Time Truck Telematics Processing with SQL

Truck telematics systems generate thousands of data points per vehicle per minute — GPS coordinates, RPM, engine load, DTC codes, and odometer readings. Processing all of that in real time with standard SQL, rather than custom stream processing code, is exactly what RisingWave is built for.

Why Real-Time Truck Telematics Matters

A fleet operator running fifty long-haul trucks across interstate routes is flying blind if their data is thirty minutes stale. A driver who has been speeding for two hours, an engine that threw a DTC fault code forty minutes ago, or a truck that deviated from its assigned route ten minutes ago — none of these problems show up in a batch-processed report until long after the moment to intervene has passed.

Telematics data comes fast and in high volume. A standard J1939 CAN bus integration logs engine parameters every few seconds. GPS position may update every 30 seconds on highway, every few seconds in urban stop-and-go. A fleet of fifty trucks generates around 100,000 to 500,000 events per hour.

Traditional approaches funnel this data into a time-series database and query it with dashboards. But correlation — joining engine faults with GPS location, joining HOS (Hours of Service) status with current speed — requires either pre-computation or query-time joins that are expensive at stream scale. RisingWave maintains the joined, aggregated results incrementally, so every query hits a pre-computed view.

The Streaming SQL Solution

RisingWave ingests telematics events from a Kafka topic, maintains materialized views for vehicle positions, engine health, and HOS compliance, and pushes alerts to a downstream Kafka topic. The entire pipeline is SQL.

Tutorial: Building It Step by Step

Step 1: Set Up the Data Source

-- Truck telematics events from onboard ECU / telematics gateway
CREATE SOURCE truck_telemetry (
    vin             VARCHAR,         -- Vehicle Identification Number
    driver_id       VARCHAR,
    event_ts        TIMESTAMPTZ,
    latitude        DOUBLE PRECISION,
    longitude       DOUBLE PRECISION,
    speed_mph       DOUBLE PRECISION,
    heading_deg     INT,
    altitude_m      DOUBLE PRECISION,
    engine_rpm      INT,
    engine_load_pct INT,
    coolant_temp_c  DOUBLE PRECISION,
    fuel_level_pct  DOUBLE PRECISION,
    odometer_km     DOUBLE PRECISION,
    dtc_codes       VARCHAR,         -- comma-separated SAE J1939 DTC codes, null if none
    ignition_on     BOOLEAN,
    harsh_brake     BOOLEAN,
    harsh_accel     BOOLEAN
)
WITH (
    connector = 'kafka',
    topic = 'fleet.telemetry',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;
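
A single event on the fleet.telemetry topic would then look like this (illustrative values; the field names must match the source schema above):

```json
{
  "vin": "1FUJGLDR0CLBP1234",
  "driver_id": "D-4471",
  "event_ts": "2024-05-14T16:32:05Z",
  "latitude": 39.7392,
  "longitude": -104.9903,
  "speed_mph": 63.5,
  "heading_deg": 87,
  "altitude_m": 1609.0,
  "engine_rpm": 1450,
  "engine_load_pct": 62,
  "coolant_temp_c": 88.5,
  "fuel_level_pct": 54.2,
  "odometer_km": 412887.4,
  "dtc_codes": null,
  "ignition_on": true,
  "harsh_brake": false,
  "harsh_accel": false
}
```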

-- Driver HOS (Hours of Service) status stream
CREATE SOURCE driver_hos (
    driver_id       VARCHAR,
    vin             VARCHAR,
    hos_status      VARCHAR,   -- ON_DUTY | OFF_DUTY | DRIVING | SLEEPER_BERTH
    driving_hours   DOUBLE PRECISION,   -- hours driven in current 14-hour window
    duty_hours      DOUBLE PRECISION,
    hos_ts          TIMESTAMPTZ
)
WITH (
    connector = 'kafka',
    topic = 'fleet.hos',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;

Step 2: Build Real-Time Aggregations

-- Current vehicle position and status (latest per VIN)
CREATE MATERIALIZED VIEW vehicle_current_position AS
SELECT DISTINCT ON (vin)
    vin,
    driver_id,
    latitude,
    longitude,
    speed_mph,
    heading_deg,
    engine_rpm,
    engine_load_pct,
    coolant_temp_c,
    fuel_level_pct,
    odometer_km,
    dtc_codes,
    ignition_on,
    event_ts AS last_seen
FROM truck_telemetry
ORDER BY vin, event_ts DESC;
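
Dashboards and ops tooling can now hit this view with ordinary SELECTs. As a sketch (the 15% fuel and 10-minute staleness thresholds are arbitrary):

```sql
-- Trucks running low on fuel, or silent for 10+ minutes
SELECT vin, driver_id, fuel_level_pct, last_seen
FROM vehicle_current_position
WHERE fuel_level_pct < 15
   OR last_seen < NOW() - INTERVAL '10 minutes';
```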

-- 15-minute speed and engine performance summary per truck
CREATE MATERIALIZED VIEW vehicle_performance_15m AS
SELECT
    vin,
    driver_id,
    window_start,
    window_end,
    AVG(speed_mph)                      AS avg_speed_mph,
    MAX(speed_mph)                      AS max_speed_mph,
    AVG(engine_rpm)                     AS avg_rpm,
    MAX(engine_rpm)                     AS max_rpm,
    AVG(engine_load_pct)                AS avg_engine_load_pct,
    AVG(fuel_level_pct)                 AS avg_fuel_pct,
    MIN(fuel_level_pct)                 AS min_fuel_pct,
    COUNT(*) FILTER (WHERE harsh_brake) AS harsh_brakes,
    COUNT(*) FILTER (WHERE harsh_accel) AS harsh_accels
FROM TUMBLE(truck_telemetry, event_ts, INTERVAL '15 MINUTES')
WHERE ignition_on = true
GROUP BY vin, driver_id, window_start, window_end;
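
The 15-minute rollup doubles as a driver-behavior feed. For instance, a safety dashboard might surface the roughest recent windows (illustrative query):

```sql
-- Windows with the most harsh-driving events in the last hour
SELECT vin, driver_id, window_start,
       harsh_brakes + harsh_accels AS harsh_events
FROM vehicle_performance_15m
WHERE window_start > NOW() - INTERVAL '1 hour'
ORDER BY harsh_events DESC
LIMIT 10;
```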

-- DTC fault event extraction (non-null dtc_codes)
CREATE MATERIALIZED VIEW dtc_fault_events AS
SELECT
    vin,
    driver_id,
    dtc_codes,
    latitude,
    longitude,
    speed_mph,
    event_ts,
    -- Category heuristic based on OBD-II-style prefixes:
    -- P0xxx = generic powertrain, P1xxx = manufacturer-specific, Uxxxx = network.
    -- Adjust if your gateway emits raw J1939 SPN/FMI pairs instead.
    CASE
        WHEN dtc_codes LIKE '%P0%' THEN 'POWERTRAIN'
        WHEN dtc_codes LIKE '%P1%' THEN 'MANUFACTURER'
        WHEN dtc_codes LIKE '%U%'  THEN 'NETWORK'
        ELSE 'OTHER'
    END AS fault_category
FROM truck_telemetry
WHERE dtc_codes IS NOT NULL
  AND dtc_codes != '';
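
The HOS compliance view promised earlier joins the two sources. One way to sketch it — assuming the hos stream is a changelog where the latest row per driver wins, and reusing the DISTINCT ON pattern from vehicle_current_position — is to take each driver's most recent HOS status and join it to current speed:

```sql
-- HOS risk: drivers near the 11-hour driving limit while still moving
CREATE MATERIALIZED VIEW hos_risk AS
SELECT
    h.driver_id,
    h.vin,
    h.hos_status,
    h.driving_hours,
    p.speed_mph,
    p.last_seen
FROM (
    SELECT DISTINCT ON (driver_id)
        driver_id, vin, hos_status, driving_hours, hos_ts
    FROM driver_hos
    ORDER BY driver_id, hos_ts DESC
) h
JOIN vehicle_current_position p ON h.vin = p.vin
WHERE h.driving_hours > 10.0   -- within an hour of the 11-hour limit
  AND p.speed_mph > 0;
```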

Step 3: Detect Anomalies or Generate Alerts

-- Speed violation: 5-minute window average above 70 mph
CREATE MATERIALIZED VIEW speed_violations AS
SELECT
    vin,
    driver_id,
    window_start,
    window_end,
    AVG(speed_mph)    AS avg_speed_mph,
    MAX(speed_mph)    AS max_speed_mph,
    COUNT(*)          AS sample_count
FROM TUMBLE(truck_telemetry, event_ts, INTERVAL '5 MINUTES')
WHERE ignition_on = true
GROUP BY vin, driver_id, window_start, window_end
HAVING AVG(speed_mph) > 70.0;
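
Downstream, a coaching report can count violations per driver straight off this view (illustrative query; the 24-hour lookback is arbitrary):

```sql
-- Violations per driver over the trailing 24 hours
SELECT driver_id,
       COUNT(*)           AS violation_windows,
       MAX(max_speed_mph) AS worst_speed_mph
FROM speed_violations
WHERE window_start > NOW() - INTERVAL '24 hours'
GROUP BY driver_id
ORDER BY violation_windows DESC;
```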

-- Engine overheating: coolant temp above safe threshold
CREATE MATERIALIZED VIEW engine_overheat_alerts AS
SELECT
    vin,
    driver_id,
    coolant_temp_c,
    latitude,
    longitude,
    speed_mph,
    event_ts,
    'ENGINE_OVERHEAT' AS alert_type
FROM truck_telemetry
WHERE coolant_temp_c > 105.0
  AND ignition_on = true;

-- Sink: push all critical alerts to fleet ops Kafka topic
CREATE SINK fleet_critical_alerts AS
SELECT vin, driver_id, dtc_codes AS details, event_ts, fault_category AS alert_type
FROM dtc_fault_events
UNION ALL
SELECT vin, driver_id, CAST(coolant_temp_c AS VARCHAR) AS details, event_ts, alert_type
FROM engine_overheat_alerts
WITH (
    connector = 'kafka',
    topic = 'alerts.fleet.critical',
    properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;

Comparison Table

| Capability                | Legacy Telematics Platform  | RisingWave Streaming SQL   |
|---------------------------|-----------------------------|----------------------------|
| GPS refresh on dashboard  | 30-second polling intervals | Per-event (sub-second)     |
| DTC fault detection       | Next batch run (minutes)    | Immediate on event arrival |
| Speed violation alert     | End-of-day report           | 5-minute window detection  |
| HOS + position join       | Manual cross-system query   | Single materialized view   |
| Query language            | Vendor-specific API         | Standard PostgreSQL SQL    |
| Scaling to 1000+ trucks   | Requires licensed shards    | Scale RisingWave workers   |

FAQ

What telematics hardware integrates with this pipeline? Any telematics device that can publish to Kafka — either directly or via a Kafka Connect adapter. Common J1939 gateways from vendors like Geotab, Samsara, or Omnitracs can emit events to Kafka. The pipeline does not require any specific hardware.

How do I parse DTC codes stored as comma-separated strings? Use RisingWave's string_to_array and unnest functions to split and expand DTC codes into individual rows, then join with a DTC reference table if you need human-readable fault descriptions.
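
For example (a sketch; a lookup join against a reference table of fault descriptions would follow the same pattern):

```sql
-- Explode comma-separated DTC codes into one row per code
SELECT
    f.vin,
    f.event_ts,
    trim(code) AS dtc_code
FROM dtc_fault_events AS f,
     unnest(string_to_array(f.dtc_codes, ',')) AS code;
```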

Can RisingWave handle a fleet of 10,000 trucks? Yes. RisingWave scales horizontally by adding compute nodes. The materialized views shard across nodes, and Kafka partitioning distributes the ingestion load. However, the optimal configuration depends on event rate, window sizes, and query complexity — benchmark your specific workload.

Does RisingWave support geospatial queries for route deviation? RisingWave does not bundle PostGIS, but route-deviation checks can be implemented with its built-in math functions — for example, computing the haversine distance between each GPS fix and the nearest planned-route waypoint inside a materialized view.
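
Whatever geometry support your deployment has, route deviation can also be computed with plain math. A sketch using the haversine formula (route_waypoints is a hypothetical table of planned-route points you would load yourself; 6371 km is the Earth's mean radius):

```sql
-- Approximate distance (km) from each GPS fix to the nearest route waypoint
CREATE MATERIALIZED VIEW route_deviation AS
SELECT
    t.vin,
    t.event_ts,
    MIN(
        2 * 6371 * asin(sqrt(
            pow(sin((w.lat - t.latitude)  * pi() / 360), 2) +
            cos(t.latitude * pi() / 180) * cos(w.lat * pi() / 180) *
            pow(sin((w.lon - t.longitude) * pi() / 360), 2)
        ))
    ) AS deviation_km
FROM truck_telemetry AS t
JOIN route_waypoints AS w ON w.route_id = 'I-70-WEST'  -- hypothetical route id
GROUP BY t.vin, t.event_ts;
```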

Key Takeaways

  • RisingWave ingests J1939 telematics events from Kafka and maintains continuously updated position, performance, and fault materialized views.
  • DISTINCT ON with ORDER BY event_ts DESC gives each VIN a single current-state row, replacing periodic polling queries.
  • TUMBLE window aggregations over 5- and 15-minute intervals detect speed violations and performance trends without application code.
  • A unified Kafka sink consolidates DTC faults and engine overheat alerts into a single ops notification topic.
