Processing Millions of IoT Sensor Events Per Second with SQL

IoT sensor networks produce data volumes that overwhelm traditional databases within hours of deployment. RisingWave — a PostgreSQL-compatible streaming database — ingests high-throughput sensor event streams and maintains real-time aggregations using standard SQL, so your analytics scale with your sensor fleet.

Why High-Volume IoT Analytics Is Difficult

A modern industrial facility might deploy thousands of temperature, pressure, vibration, flow, and current sensors, each emitting a reading every one to ten seconds. At that density, a single facility with 5,000 sensors produces up to 5,000 events per second, and a fleet of such facilities quickly reaches tens of thousands — far beyond what a traditional relational database can absorb as an INSERT workload while simultaneously serving analytical queries.
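The event-rate arithmetic is worth making explicit. A back-of-envelope sketch (plain Python; the fleet sizes and cadences are illustrative assumptions):

```python
# Back-of-envelope event-rate estimate for a sensor fleet.
# Assumes each sensor emits one reading every `interval_s` seconds.

def events_per_second(sensor_count: int, interval_s: float) -> float:
    """Steady-state event rate for a fleet of uniformly emitting sensors."""
    return sensor_count / interval_s

# One 5,000-sensor facility at a 1-second cadence:
print(events_per_second(5_000, 1))        # 5000.0 events/sec

# Ten such facilities at the same cadence reach tens of thousands:
print(events_per_second(10 * 5_000, 1))   # 50000.0 events/sec
```

Even at the slower 10-second cadence, a multi-facility fleet sustains thousands of inserts per second around the clock, which is the workload the rest of this post is built around.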

The standard response to this problem is to build a specialized IoT data stack: a time-series database for raw readings, a stream processor for real-time alerting, and a data warehouse for historical analysis. Each layer requires its own deployment, configuration, and maintenance expertise. Data must be copied between layers, introducing latency and creating opportunities for inconsistency. The operational burden of running three systems instead of one can consume more engineering time than the IoT use case itself.

Beyond the infrastructure problem, there is a query semantics problem. IoT analytics almost always involve time-windowed aggregations — average temperature over the last five minutes, maximum vibration in the last hour, anomalies relative to a rolling baseline. Traditional stream processors require custom code to express these patterns. SQL-native streaming changes the equation: the same declarative language used for historical analysis also expresses real-time computations.

How Streaming SQL Handles IoT Scale

RisingWave processes sensor event streams through a shared-nothing architecture that partitions work across compute nodes. Incremental computation means that each new sensor reading updates only the affected aggregations — a single temperature reading from sensor S-042 does not trigger recomputation of the aggregations for sensor S-041 or any other sensor. This makes per-sensor metric maintenance efficient at scale.
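The incremental model can be illustrated with a toy per-key running aggregate (plain Python, a sketch of the idea rather than RisingWave's internals): each arriving reading touches only its own sensor's state.

```python
from collections import defaultdict

class IncrementalAvg:
    """Toy model of incremental aggregation: each event updates only the
    state for its own key, never recomputing any other key's aggregate."""

    def __init__(self):
        self.state = defaultdict(lambda: {"sum": 0.0, "count": 0})

    def on_event(self, sensor_id: str, value: float) -> float:
        s = self.state[sensor_id]       # only this sensor's state is touched
        s["sum"] += value
        s["count"] += 1
        return s["sum"] / s["count"]    # updated running average for this sensor

agg = IncrementalAvg()
agg.on_event("S-041", 20.0)
print(agg.on_event("S-042", 70.0))   # 70.0 -- S-041's state is untouched
print(agg.on_event("S-042", 80.0))   # 75.0
```

Per-event cost stays proportional to one key's state, not to the total number of sensors or to the history already processed.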

TUMBLE windows compute non-overlapping aggregation periods (5-minute averages, hourly extremes), HOP windows produce overlapping periods useful for anomaly detection, and SESSION windows detect sensor silence — periods when a sensor stops reporting, which may indicate a hardware failure or connectivity issue.
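The difference between tumbling and hopping windows comes down to how a timestamp maps to windows. A minimal sketch (plain Python with epoch-second timestamps, illustrative only):

```python
def tumble_window(ts: int, size: int) -> tuple:
    """A timestamp belongs to exactly one non-overlapping window."""
    start = (ts // size) * size
    return (start, start + size)

def hop_windows(ts: int, size: int, slide: int) -> list:
    """A timestamp belongs to every overlapping window that covers it."""
    first = ((ts - size) // slide + 1) * slide
    return [(s, s + size)
            for s in (first + i * slide for i in range(size // slide))
            if s <= ts < s + size]

print(tumble_window(130, 60))     # (120, 180) -- one 1-minute bucket
print(hop_windows(130, 60, 30))   # [(90, 150), (120, 180)] -- overlapping windows
```

In TUMBLE, each reading contributes to one aggregate row; in HOP with a 60-second size and 30-second slide, the same reading contributes to two, which is what makes rolling-baseline comparisons possible.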

Because RisingWave exposes a PostgreSQL interface, the same SQL queries used to analyze historical sensor data in a warehouse also work against live streaming views. Your data engineers write one query language, not three.

Building It Step by Step

Step 1: Create the Data Source

CREATE SOURCE sensor_readings (
    sensor_id       VARCHAR,
    device_group    VARCHAR,
    location_id     VARCHAR,
    facility_id     VARCHAR,
    sensor_type     VARCHAR,   -- TEMPERATURE, PRESSURE, VIBRATION, FLOW, CURRENT, HUMIDITY
    value           DOUBLE PRECISION,
    unit            VARCHAR,
    quality_flag    SMALLINT,  -- 0=Good, 1=Uncertain, 2=Bad
    event_ts        TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'iot.sensor_readings',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;

Step 2: Build the Core Materialized View

-- Latest reading per sensor
CREATE MATERIALIZED VIEW sensor_latest AS
SELECT DISTINCT ON (sensor_id)
    sensor_id,
    device_group,
    location_id,
    facility_id,
    sensor_type,
    value           AS latest_value,
    unit,
    quality_flag,
    event_ts        AS last_reported_ts
    -- (staleness is computed downstream; NOW() cannot be projected in a streaming view)
FROM sensor_readings
WHERE quality_flag = 0  -- good quality readings only
ORDER BY sensor_id, event_ts DESC;

-- 5-minute tumbling window averages per sensor
CREATE MATERIALIZED VIEW sensor_5min_avg AS
SELECT
    window_start,
    window_end,
    sensor_id,
    sensor_type,
    facility_id,
    location_id,
    AVG(value)      AS avg_value,
    MIN(value)      AS min_value,
    MAX(value)      AS max_value,
    COUNT(*)        AS reading_count,
    STDDEV(value)   AS stddev_value
FROM TUMBLE(sensor_readings, event_ts, INTERVAL '5 MINUTES')
WHERE quality_flag = 0
GROUP BY window_start, window_end, sensor_id, sensor_type, facility_id, location_id;

Step 3: Add Alerts and Aggregations

-- Threshold breach alerts (configurable per sensor type)
CREATE MATERIALIZED VIEW sensor_threshold_alerts AS
SELECT
    s.sensor_id,
    s.device_group,
    s.location_id,
    s.facility_id,
    s.sensor_type,
    s.latest_value,
    s.unit,
    s.last_reported_ts,
    CASE
        WHEN s.sensor_type = 'TEMPERATURE' AND s.latest_value > 85  THEN 'HIGH_TEMP'
        WHEN s.sensor_type = 'PRESSURE'    AND s.latest_value > 150 THEN 'HIGH_PRESSURE'
        WHEN s.sensor_type = 'VIBRATION'   AND s.latest_value > 12  THEN 'HIGH_VIBRATION'
        WHEN s.sensor_type = 'CURRENT'     AND s.latest_value > 95  THEN 'OVERCURRENT'
        ELSE NULL
    END AS alert_type
FROM sensor_latest s
WHERE (s.sensor_type = 'TEMPERATURE' AND s.latest_value > 85)
   OR (s.sensor_type = 'PRESSURE'    AND s.latest_value > 150)
   OR (s.sensor_type = 'VIBRATION'   AND s.latest_value > 12)
   OR (s.sensor_type = 'CURRENT'     AND s.latest_value > 95);

-- Anomaly detection: current value more than 3 standard deviations from 5-min average
CREATE MATERIALIZED VIEW sensor_anomalies AS
SELECT
    r.sensor_id,
    r.sensor_type,
    r.facility_id,
    r.location_id,
    r.value                               AS current_value,
    a.avg_value,
    a.stddev_value,
    ABS(r.value - a.avg_value) / NULLIF(a.stddev_value, 0) AS z_score,
    r.event_ts
FROM sensor_readings r
JOIN sensor_5min_avg a
    ON  a.sensor_id   = r.sensor_id
    AND a.window_end  = (
        SELECT MAX(window_end)
        FROM sensor_5min_avg
        WHERE sensor_id = r.sensor_id
          AND window_end <= r.event_ts
    )
WHERE a.stddev_value > 0
  AND ABS(r.value - a.avg_value) / a.stddev_value > 3;
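The z-score test in the view above reduces to simple arithmetic. A standalone sketch (the 3-sigma threshold and the zero-stddev guard mirror the SQL):

```python
def is_anomaly(value: float, avg: float, stddev: float,
               threshold: float = 3.0) -> bool:
    """Flag a reading more than `threshold` standard deviations from the
    window average; a zero stddev (constant signal) never flags."""
    if stddev <= 0:
        return False
    return abs(value - avg) / stddev > threshold

print(is_anomaly(98.5, 72.0, 8.0))   # True  -- z-score ~3.3
print(is_anomaly(88.0, 72.0, 8.0))   # False -- z-score 2.0
```

The guard matters in practice: a sensor stuck at a constant value has zero variance, and without the check every subsequent reading would divide by zero or trivially flag.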

-- Sensor silence detection: sensors not reporting in the last 2 minutes.
-- NOW() appears only in the WHERE clause as a temporal filter, the form
-- RisingWave supports in streaming queries.
CREATE MATERIALIZED VIEW silent_sensors AS
SELECT
    sensor_id,
    device_group,
    facility_id,
    location_id,
    sensor_type,
    last_reported_ts
FROM sensor_latest
WHERE last_reported_ts < NOW() - INTERVAL '2 minutes';
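The staleness check is equivalent to this predicate (illustrative Python, with timestamps as epoch seconds and the 2-minute gap as a tunable assumption):

```python
def is_silent(last_reported_epoch: float, now_epoch: float,
              max_silence_s: float = 120.0) -> bool:
    """A sensor is 'silent' if its last reading is older than the allowed gap."""
    return (now_epoch - last_reported_epoch) > max_silence_s

now = 1_700_000_000
print(is_silent(now - 90, now))    # False -- reported 1.5 minutes ago
print(is_silent(now - 300, now))   # True  -- silent for 5 minutes
```

Tune the gap to your fleet: a 2-minute threshold suits sensors on a 1-to-10-second cadence, but would false-alarm constantly on devices that report every minute.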

Step 4: Sink Results Downstream

-- Push threshold alerts to IoT platform alert engine
CREATE SINK sensor_alert_sink
FROM sensor_threshold_alerts
WITH (
    connector = 'kafka',
    topic = 'iot.alerts.threshold',
    properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;

-- Write 5-minute aggregations to time-series store for dashboards
CREATE SINK sensor_aggregates_sink
FROM sensor_5min_avg
WITH (
    connector = 'jdbc',
    jdbc.url = 'jdbc:postgresql://tsdb:5432/iot_metrics',
    table.name = 'sensor_5min_avg'
) FORMAT PLAIN ENCODE JSON;

-- Long-term retention in Iceberg
CREATE SINK sensor_raw_archive
FROM sensor_readings
WITH (
    connector = 'iceberg',
    type = 'append-only',
    catalog.type = 'rest',
    catalog.uri = 'http://iceberg-catalog:8181',
    database.name = 'iot',
    table.name = 'sensor_readings_archive'
) FORMAT PLAIN ENCODE JSON;

How This Compares to Traditional Approaches

| Aspect             | TSDB + Stream Processor               | Streaming SQL (RisingWave)       |
|--------------------|---------------------------------------|----------------------------------|
| Systems to operate | 3+ (TSDB, processor, warehouse)       | 1                                |
| Query language     | Multiple (InfluxQL, Java/Python, SQL) | Standard PostgreSQL SQL          |
| Alerting latency   | Milliseconds (custom code)            | Sub-second (declarative SQL)     |
| Anomaly detection  | Custom application logic              | SQL window + join                |
| Scalability        | Per-system scaling                    | Single horizontal scale-out      |
| Silence detection  | Heartbeat monitoring service          | SESSION window or staleness view |

FAQ

What is IoT sensor event processing?

IoT sensor event processing is the ingestion, transformation, and analysis of high-frequency readings from connected physical devices. Use cases include threshold alerting, anomaly detection, predictive maintenance, energy optimization, and equipment health monitoring.

How does RisingWave scale to millions of events per second?

RisingWave uses shared-nothing horizontal scaling — each compute node processes a partition of the sensor stream independently. As event volume grows, additional nodes are added. Incremental computation ensures each new event updates only the affected materialized view rows, keeping per-event processing cost constant regardless of accumulated historical data.
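Key-based partitioning can be sketched as follows (a toy model of hash routing, not RisingWave's actual scheduler): hashing the sensor ID pins each sensor's state to one node, so nodes never coordinate on a per-event basis.

```python
import zlib

def assign_node(sensor_id: str, node_count: int) -> int:
    """Route each sensor's events to a fixed compute node via key hashing,
    so per-sensor aggregation state lives on exactly one node."""
    return zlib.crc32(sensor_id.encode()) % node_count

nodes = 4
print(assign_node("S-042", nodes))  # a stable node index in [0, nodes)
print(assign_node("S-042", nodes) == assign_node("S-042", nodes))  # True
```

Because routing is deterministic, adding nodes redistributes keys but never requires two nodes to share one sensor's running state.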

Can I integrate RisingWave with my existing stack?

Yes. RisingWave connects to Kafka (source and sink), PostgreSQL via CDC, MySQL via CDC, and writes to JDBC databases and Iceberg tables. The PostgreSQL-compatible interface means Grafana, Prometheus remote-write (via adapter), and standard BI tools work without modification.

How do I handle bad-quality sensor readings?

Filter on the quality_flag column as shown above. You can also build a data quality materialized view that tracks bad-reading rates per sensor — a rising bad-reading rate often precedes sensor failure.

Key Takeaways

  • Multi-layer IoT stacks (TSDB + stream processor + warehouse) create operational complexity that streaming SQL eliminates by unifying ingestion, real-time alerting, and analytics in one system.
  • RisingWave's TUMBLE and HOP windows compute time-series aggregations in SQL; SESSION windows detect sensor silence without a separate heartbeat monitoring service.
  • Anomaly detection using rolling standard deviation is expressible as a SQL JOIN between raw events and windowed averages — no custom code required.
  • Results flow to IoT alerting platforms, dashboards, and long-term Iceberg storage through declarative sink definitions.

Ready to try this? Get started with RisingWave. Join our Slack community.
