A compressor on your factory floor runs hot. Its temperature sensor crosses 96 degrees Celsius at 08:04:00. Your batch pipeline processes that reading at the top of the next hour, flags it as an anomaly, and sends an alert at 09:00:02. The compressor has been running at unsafe temperatures for nearly an hour. The repair cost and the safety risk are both avoidable.
Real-time IoT anomaly detection eliminates that gap. Instead of writing sensor readings to a warehouse and querying them later, a streaming approach evaluates every reading the moment it arrives. When a value crosses a threshold or deviates statistically from its expected window average, an alert row appears in milliseconds inside a continuously maintained materialized view.
This article shows you how to build that system using RisingWave, an open-source streaming database that processes IoT data with standard SQL. You will create device metadata tables, ingest sensor readings, detect anomalies with both threshold rules and z-score analysis inside tumbling windows, and maintain a live device health dashboard. Every SQL statement in this guide was run against RisingWave 2.8.0.
The Architecture: Tables, Windows, and Materialized Views
IoT anomaly detection in RisingWave rests on three SQL primitives working together.
Tables are the ingestion layer. For development and testing you use CREATE TABLE with direct inserts; in production you swap in a CREATE SOURCE backed by Kafka, MQTT, or another connector. The downstream materialized views do not change.
Tumbling windows slice the continuous sensor stream into fixed, non-overlapping intervals. A 5-minute tumbling window over a temperature stream produces one aggregated row per device per 5-minute period. RisingWave's TUMBLE() function handles this natively; you do not need a separate streaming framework. For a deeper explanation of all window types available in RisingWave, see the windowing documentation.
Materialized views are the detection layer. A materialized view in RisingWave is a continuously maintained query result. Unlike a regular SQL view that reruns its query on every read, an incremental materialized view updates only the rows affected by each new input event. This keeps detection latency low even at high sensor throughput.
The pipeline looks like this:
Sensors --> iot_readings table (or Kafka source)
                   |
                   v
TUMBLE(iot_readings, event_time, INTERVAL '5 minutes')
                   |
          +--------+----------+
          |                   |
          v                   v
 iot_tumble_avg_5min iot_zscore_anomalies
          |                   |
          v                   v
iot_threshold_alerts  iot_device_status
Setting Up the Data Model
Start by creating a device metadata table that holds the expected operating ranges for each sensor. You will join this table against the live readings stream to determine what counts as normal.
CREATE TABLE iot_devices (
    device_id VARCHAR,
    device_name VARCHAR,
    location VARCHAR,
    sensor_type VARCHAR,
    unit VARCHAR,
    normal_min DOUBLE PRECISION,
    normal_max DOUBLE PRECISION,
    PRIMARY KEY (device_id)
);
Insert three representative devices covering temperature, pressure, and vibration sensors:
INSERT INTO iot_devices VALUES
('dev-001', 'Compressor-A1', 'Plant Floor 1', 'temperature', 'celsius', 60.0, 90.0),
('dev-002', 'Pump-B2', 'Plant Floor 2', 'pressure', 'psi', 80.0, 120.0),
('dev-003', 'Motor-C3', 'Warehouse', 'vibration', 'mm/s', 0.0, 5.0);
device_id | device_name | location | sensor_type | unit | normal_min | normal_max
-----------+---------------+---------------+-------------+---------+------------+------------
dev-001 | Compressor-A1 | Plant Floor 1 | temperature | celsius | 60 | 90
dev-002 | Pump-B2 | Plant Floor 2 | pressure | psi | 80 | 120
dev-003 | Motor-C3 | Warehouse | vibration | mm/s | 0 | 5
Next, create the sensor readings table. The event_time column is the timestamp that RisingWave uses for windowing. In production this maps to the sensor-generated timestamp embedded in the Kafka message, not the ingestion time.
CREATE TABLE iot_readings (
    reading_id VARCHAR,
    device_id VARCHAR,
    sensor_value DOUBLE PRECISION,
    event_time TIMESTAMPTZ,
    PRIMARY KEY (reading_id)
);
Insert 15 readings across the three devices. Three of them are intentionally anomalous: reading r005 takes the Compressor-A1 temperature above 90 degrees, r008 spikes the Pump-B2 pressure above 120 psi, and r013 pushes Motor-C3 vibration above 5 mm/s.
INSERT INTO iot_readings VALUES
('r001', 'dev-001', 75.2, '2026-04-01 08:00:00+00'),
('r002', 'dev-001', 76.5, '2026-04-01 08:01:00+00'),
('r003', 'dev-001', 78.1, '2026-04-01 08:02:00+00'),
('r004', 'dev-001', 77.8, '2026-04-01 08:03:00+00'),
('r005', 'dev-001', 96.4, '2026-04-01 08:04:00+00'), -- anomaly: above 90 C
('r006', 'dev-002', 95.0, '2026-04-01 08:00:00+00'),
('r007', 'dev-002', 97.3, '2026-04-01 08:01:00+00'),
('r008', 'dev-002', 130.1, '2026-04-01 08:02:00+00'), -- anomaly: above 120 psi
('r009', 'dev-002', 101.5, '2026-04-01 08:03:00+00'),
('r010', 'dev-002', 99.8, '2026-04-01 08:04:00+00'),
('r011', 'dev-003', 2.1, '2026-04-01 08:00:00+00'),
('r012', 'dev-003', 2.4, '2026-04-01 08:01:00+00'),
('r013', 'dev-003', 8.7, '2026-04-01 08:02:00+00'), -- anomaly: above 5 mm/s
('r014', 'dev-003', 3.0, '2026-04-01 08:03:00+00'),
('r015', 'dev-003', 2.9, '2026-04-01 08:04:00+00');
Aggregating Sensor Data with Tumbling Windows
Before detecting anomalies, establish a baseline. A 5-minute tumbling window gives you the count, average, minimum, and maximum reading for each device within each non-overlapping interval.
CREATE MATERIALIZED VIEW iot_tumble_avg_5min AS
SELECT
    device_id,
    window_start,
    window_end,
    COUNT(*) AS reading_count,
    ROUND(AVG(sensor_value)::numeric, 2) AS avg_value,
    ROUND(MIN(sensor_value)::numeric, 2) AS min_value,
    ROUND(MAX(sensor_value)::numeric, 2) AS max_value
FROM TUMBLE(iot_readings, event_time, INTERVAL '5 minutes')
GROUP BY device_id, window_start, window_end;
RisingWave's TUMBLE() function takes three arguments: the source table or view, the time column to partition by, and the window size. Each output row is tagged with window_start and window_end so you know exactly which time bucket the aggregate covers.
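To make the bucketing concrete, here is a minimal Python sketch of how a timestamp maps to a tumbling window. It assumes windows are aligned to the Unix epoch with no offset; the helper name tumble_window is hypothetical and is not part of RisingWave.

```python
from datetime import datetime, timedelta, timezone


def tumble_window(event_time: datetime, size: timedelta) -> tuple[datetime, datetime]:
    """Return the half-open [window_start, window_end) bucket containing event_time."""
    epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    # Whole windows elapsed since the epoch (timedelta // timedelta yields an int).
    buckets = (event_time - epoch) // size
    window_start = epoch + buckets * size
    return window_start, window_start + size


# Reading r005 was recorded at 08:04:00 UTC.
reading_time = datetime(2026, 4, 1, 8, 4, 0, tzinfo=timezone.utc)
start, end = tumble_window(reading_time, timedelta(minutes=5))
print(start.isoformat(), end.isoformat())
# 2026-04-01T08:00:00+00:00 2026-04-01T08:05:00+00:00
```

Because the interval is half-open, a reading stamped exactly 08:05:00 belongs to the next window, not this one.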
Query the view immediately after creation:
SELECT * FROM iot_tumble_avg_5min ORDER BY device_id, window_start;
device_id | window_start | window_end | reading_count | avg_value | min_value | max_value
-----------+---------------------------+---------------------------+---------------+-----------+-----------+-----------
dev-001 | 2026-04-01 08:00:00+00:00 | 2026-04-01 08:05:00+00:00 | 5 | 80.8 | 75.2 | 96.4
dev-002 | 2026-04-01 08:00:00+00:00 | 2026-04-01 08:05:00+00:00 | 5 | 104.74 | 95 | 130.1
dev-003 | 2026-04-01 08:00:00+00:00 | 2026-04-01 08:05:00+00:00 | 5 | 3.82 | 2.1 | 8.7
The max values already hint at the anomalies. Compressor-A1 hit 96.4 degrees inside a window where the average was 80.8. Pump-B2 peaked at 130.1 psi. Motor-C3 reached 8.7 mm/s. The materialized view surfaces these extremes immediately and keeps updating as new readings arrive, without any polling or scheduled job.
When you insert readings from a new 5-minute window, the view grows automatically:
INSERT INTO iot_readings VALUES
('r016', 'dev-001', 105.3, '2026-04-01 08:10:00+00'),
('r017', 'dev-001', 81.2, '2026-04-01 08:11:00+00'),
('r018', 'dev-002', 89.5, '2026-04-01 08:10:00+00');
SELECT * FROM iot_tumble_avg_5min ORDER BY device_id, window_start;
device_id | window_start | window_end | reading_count | avg_value | min_value | max_value
-----------+---------------------------+---------------------------+---------------+-----------+-----------+-----------
dev-001 | 2026-04-01 08:00:00+00:00 | 2026-04-01 08:05:00+00:00 | 5 | 80.8 | 75.2 | 96.4
dev-001 | 2026-04-01 08:10:00+00:00 | 2026-04-01 08:15:00+00:00 | 2 | 93.25 | 81.2 | 105.3
dev-002 | 2026-04-01 08:00:00+00:00 | 2026-04-01 08:05:00+00:00 | 5 | 104.74 | 95 | 130.1
dev-002 | 2026-04-01 08:10:00+00:00 | 2026-04-01 08:15:00+00:00 | 1 | 89.5 | 89.5 | 89.5
dev-003 | 2026-04-01 08:00:00+00:00 | 2026-04-01 08:05:00+00:00 | 5 | 3.82 | 2.1 | 8.7
The new window rows for dev-001 and dev-002 appear without any query rewrite or pipeline restart. RisingWave recomputes only the affected window, not the entire history.
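The idea behind incremental maintenance can be sketched in a few lines of Python. This is an illustrative model only, not RisingWave's implementation: it assumes insert-only input and keeps one small running aggregate per (device, window) key, so each new reading touches exactly one key. The names WindowAgg and on_reading are hypothetical.

```python
from collections import defaultdict


class WindowAgg:
    """Running aggregates for one (device_id, window_start) key."""

    def __init__(self) -> None:
        self.count = 0
        self.total = 0.0
        self.min_value = float("inf")
        self.max_value = float("-inf")

    def add(self, value: float) -> None:
        # Constant-time update: no earlier readings are re-read.
        self.count += 1
        self.total += value
        self.min_value = min(self.min_value, value)
        self.max_value = max(self.max_value, value)

    def avg(self) -> float:
        return round(self.total / self.count, 2)


state = defaultdict(WindowAgg)


def on_reading(device_id: str, window_start: str, value: float) -> tuple:
    """Apply one new reading and return the only key that changed."""
    key = (device_id, window_start)
    state[key].add(value)
    return key


# dev-001 readings from the 08:00 window, then the two later inserts.
for value in (75.2, 76.5, 78.1, 77.8, 96.4):
    on_reading("dev-001", "08:00", value)
touched = {on_reading("dev-001", "08:10", v) for v in (105.3, 81.2)}

print(touched)                             # {('dev-001', '08:10')}
print(state[("dev-001", "08:00")].avg())   # 80.8
print(state[("dev-001", "08:10")].avg())   # 93.25
```

Handling updates and deletes takes more state than this (a running minimum cannot be undone from a single number), which is one reason to leave the bookkeeping to the database.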
Threshold-Based Anomaly Alerts
The simplest anomaly detection strategy is a direct range check against the operating boundaries stored in iot_devices. If a sensor reading falls outside [normal_min, normal_max], it is an alert.
CREATE MATERIALIZED VIEW iot_threshold_alerts AS
SELECT
    r.reading_id,
    r.device_id,
    d.device_name,
    d.location,
    d.sensor_type,
    d.unit,
    r.sensor_value,
    d.normal_min,
    d.normal_max,
    r.event_time,
    CASE
        WHEN r.sensor_value > d.normal_max THEN 'HIGH'
        WHEN r.sensor_value < d.normal_min THEN 'LOW'
    END AS alert_level
FROM iot_readings r
JOIN iot_devices d ON r.device_id = d.device_id
WHERE r.sensor_value > d.normal_max
   OR r.sensor_value < d.normal_min;
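The JOIN with iot_devices keeps the detection logic in the metadata table. When you change the operating range for a device, you update one row in iot_devices and the materialized view picks up the new threshold automatically. Note that a regular streaming join stays consistent with the current contents of both tables, so the new range applies to readings already in iot_readings as well as to future ones: earlier alerts that no longer violate the range drop out of the view. If you need an immutable audit trail, persist the alert rows to an external system as they are produced.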
SELECT
    reading_id,
    device_id,
    device_name,
    sensor_type,
    sensor_value,
    normal_min,
    normal_max,
    alert_level,
    event_time
FROM iot_threshold_alerts
ORDER BY event_time;
reading_id | device_id | device_name | sensor_type | sensor_value | normal_min | normal_max | alert_level | event_time
------------+-----------+---------------+-------------+--------------+------------+------------+-------------+---------------------------
r008 | dev-002 | Pump-B2 | pressure | 130.1 | 80 | 120 | HIGH | 2026-04-01 08:02:00+00:00
r013 | dev-003 | Motor-C3 | vibration | 8.7 | 0 | 5 | HIGH | 2026-04-01 08:02:00+00:00
r005 | dev-001 | Compressor-A1 | temperature | 96.4 | 60 | 90 | HIGH | 2026-04-01 08:04:00+00:00
r016 | dev-001 | Compressor-A1 | temperature | 105.3 | 60 | 90 | HIGH | 2026-04-01 08:10:00+00:00
All four anomalous readings are captured. The fourth row (r016) was inserted after the view was created, demonstrating that incremental maintenance picks up new events automatically.
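The range check itself is small enough to verify by hand. The Python sketch below mirrors the CASE expression using the operating ranges from iot_devices and a handful of the readings; the function name alert_level is borrowed from the view's column for readability and is otherwise hypothetical.

```python
from typing import Optional

# Operating ranges copied from iot_devices: device_id -> (normal_min, normal_max).
RANGES = {
    "dev-001": (60.0, 90.0),
    "dev-002": (80.0, 120.0),
    "dev-003": (0.0, 5.0),
}


def alert_level(device_id: str, value: float) -> Optional[str]:
    """Return 'HIGH' or 'LOW' for out-of-range values, None otherwise."""
    normal_min, normal_max = RANGES[device_id]
    if value > normal_max:
        return "HIGH"
    if value < normal_min:
        return "LOW"
    return None  # Boundary values are inside the range, as in the SQL.


readings = [
    ("r004", "dev-001", 77.8),
    ("r005", "dev-001", 96.4),
    ("r008", "dev-002", 130.1),
    ("r013", "dev-003", 8.7),
    ("r016", "dev-001", 105.3),
]

alerts = [
    (reading_id, alert_level(device_id, value))
    for reading_id, device_id, value in readings
    if alert_level(device_id, value) is not None
]
print(alerts)
# [('r005', 'HIGH'), ('r008', 'HIGH'), ('r013', 'HIGH'), ('r016', 'HIGH')]
```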
Threshold alerts work well for known fault conditions where you can specify explicit upper and lower bounds. They break down for sensors where the normal operating range shifts with load or environmental conditions. For those cases, a statistical approach is more robust.
Z-Score Anomaly Detection Inside Tumbling Windows
A z-score measures how far a reading deviates from its window average in units of standard deviation. A reading with a z-score above 1.5 or 2.0 is statistically unusual within its time window, regardless of the absolute value of the sensor reading.
The query pattern uses a CTE to compute per-window statistics, then joins individual readings back against those statistics to calculate the z-score for each point:
CREATE MATERIALIZED VIEW iot_zscore_anomalies AS
WITH windowed_stats AS (
    SELECT
        device_id,
        window_start,
        window_end,
        AVG(sensor_value) AS window_avg,
        STDDEV_POP(sensor_value) AS window_stddev
    FROM TUMBLE(iot_readings, event_time, INTERVAL '5 minutes')
    GROUP BY device_id, window_start, window_end
)
SELECT
    r.reading_id,
    r.device_id,
    r.sensor_value,
    r.event_time,
    ws.window_start,
    ROUND(ws.window_avg::numeric, 2) AS window_avg,
    ROUND(ws.window_stddev::numeric, 2) AS window_stddev,
    ROUND(
        (ABS(r.sensor_value - ws.window_avg) / NULLIF(ws.window_stddev, 0))::numeric,
        2
    ) AS z_score
FROM iot_readings r
JOIN windowed_stats ws
    ON r.device_id = ws.device_id
    AND r.event_time >= ws.window_start
    AND r.event_time < ws.window_end
WHERE ws.window_stddev > 0
    AND (ABS(r.sensor_value - ws.window_avg) / NULLIF(ws.window_stddev, 0)) > 1.5;
A few implementation notes:
- RisingWave provides STDDEV_POP (population standard deviation) and STDDEV_SAMP (sample standard deviation). For IoT windows where all readings in the interval are available, STDDEV_POP is the correct choice.
- NULLIF(ws.window_stddev, 0) prevents division-by-zero errors when all readings in a window are identical, which can happen briefly after a device restart.
- The WHERE ws.window_stddev > 0 guard excludes windows that have only a single reading, since the population standard deviation of one value is zero and the z-score is undefined there.
- A threshold of 1.5 is appropriate for IoT anomaly detection where sensors produce correlated readings. Use 2.0 or higher for noisier environments where false positives are costly, but keep the window size in mind: with a population standard deviation, a window of n readings can never produce a z-score above sqrt(n - 1), so a five-reading window caps at 2.0 and needs a threshold below that to fire.
SELECT
    reading_id,
    device_id,
    sensor_value,
    window_avg,
    window_stddev,
    z_score,
    event_time
FROM iot_zscore_anomalies
ORDER BY z_score DESC;
reading_id | device_id | sensor_value | window_avg | window_stddev | z_score | event_time
------------+-----------+--------------+------------+---------------+---------+---------------------------
r005 | dev-001 | 96.4 | 80.8 | 7.87 | 1.98 | 2026-04-01 08:04:00+00:00
r013 | dev-003 | 8.7 | 3.82 | 2.46 | 1.98 | 2026-04-01 08:02:00+00:00
r008 | dev-002 | 130.1 | 104.74 | 12.87 | 1.97 | 2026-04-01 08:02:00+00:00
The three anomalous readings all score approximately 1.98, which is near 2.0 standard deviations above their respective window averages. The z-score approach catches these readings even without knowing in advance that 96.4 degrees or 130.1 psi are problematic for these specific devices. The detection adapts to whatever the sensor has been doing in the current window.
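These scores can be reproduced outside the database. The sketch below uses Python's statistics.pstdev, which computes a population standard deviation like STDDEV_POP, on the 08:00 window of each device. The helper name z_scores is hypothetical. The last line prints the largest z-score any single reading can reach in a five-reading window, which is why all three spikes land just under 2.0.

```python
from math import sqrt
from statistics import fmean, pstdev

# Readings from the 08:00-08:05 window, in insertion order.
windows = {
    "dev-001": [75.2, 76.5, 78.1, 77.8, 96.4],
    "dev-002": [95.0, 97.3, 130.1, 101.5, 99.8],
    "dev-003": [2.1, 2.4, 8.7, 3.0, 2.9],
}


def z_scores(values: list[float]) -> list[float]:
    """Absolute z-score of each value against its own window."""
    mean = fmean(values)
    stddev = pstdev(values)  # population standard deviation
    if stddev == 0:
        return []  # same effect as the window_stddev > 0 guard
    return [round(abs(v - mean) / stddev, 2) for v in values]


for device_id, values in windows.items():
    flagged = [(v, z) for v, z in zip(values, z_scores(values)) if z > 1.5]
    print(device_id, flagged)
# dev-001 [(96.4, 1.98)]
# dev-002 [(130.1, 1.97)]
# dev-003 [(8.7, 1.98)]

# Upper bound on a population z-score with n readings: sqrt(n - 1).
print(sqrt(len(windows["dev-001"]) - 1))  # 2.0
```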
Threshold vs. Z-Score: When to Use Each
Both approaches complement each other in production:
| Approach | Catches | Misses | Best For |
| --- | --- | --- | --- |
| Threshold alert | Values outside fixed bounds | Gradual drift within bounds | Safety limits, regulatory compliance |
| Z-score (windowed) | Statistical outliers within a window | Slow trends across multiple windows | Sudden spikes, sensor faults |
Running both materialized views simultaneously gives you defense in depth: threshold alerts for absolute limits you must never breach, and z-score alerts for unexpected deviations even within the accepted range.
Building a Device Health Dashboard
A device health materialized view consolidates reading counts and alert counts per device into a single queryable surface for dashboards and APIs.
CREATE MATERIALIZED VIEW iot_device_status AS
SELECT
    r.device_id,
    d.device_name,
    d.location,
    d.sensor_type,
    d.unit,
    COUNT(r.reading_id) AS total_readings,
    ROUND(AVG(r.sensor_value)::numeric, 2) AS overall_avg,
    ROUND(MAX(r.sensor_value)::numeric, 2) AS overall_max,
    COUNT(CASE WHEN r.sensor_value > d.normal_max
                 OR r.sensor_value < d.normal_min THEN 1 END) AS alert_count
FROM iot_readings r
JOIN iot_devices d ON r.device_id = d.device_id
GROUP BY r.device_id, d.device_name, d.location, d.sensor_type, d.unit;
SELECT * FROM iot_device_status ORDER BY device_id;
device_id | device_name | location | sensor_type | unit | total_readings | overall_avg | overall_max | alert_count
-----------+---------------+---------------+-------------+---------+----------------+-------------+-------------+-------------
dev-001 | Compressor-A1 | Plant Floor 1 | temperature | celsius | 7 | 84.36 | 105.3 | 2
dev-002 | Pump-B2 | Plant Floor 2 | pressure | psi | 6 | 102.2 | 130.1 | 1
dev-003 | Motor-C3 | Warehouse | vibration | mm/s | 5 | 3.82 | 8.7 | 1
Because this is an incremental materialized view, a monitoring dashboard can query iot_device_status directly without hitting the raw readings table. The view is always pre-computed, so a read scans one row per device instead of re-aggregating the readings behind it.
Connecting to Kafka in Production
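The tables in this guide accept direct INSERT statements, which is ideal for testing. Switching to a production Kafka source requires only changing the data source definition. The downstream materialized views keep the same SQL, provided they read from the new source (named iot_readings_source below) or the source reuses the name iot_readings.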
CREATE SOURCE iot_readings_source (
    reading_id VARCHAR,
    device_id VARCHAR,
    sensor_value DOUBLE PRECISION,
    event_time TIMESTAMPTZ
)
WITH (
    connector = 'kafka',
    topic = 'iot-sensor-readings',
    properties.bootstrap.server = 'broker:9092',
    scan.startup.mode = 'latest'
)
FORMAT PLAIN ENCODE JSON;
For a complete list of supported connectors including MQTT, Kinesis, and Pulsar, see the RisingWave sources documentation.
When using a Kafka source, set event_time to the sensor-embedded timestamp field in your JSON payload, not to RisingWave's ingestion time. This ensures that tumbling windows align with when the sensor actually recorded the reading, not when it arrived in the broker. For guidance on timestamp handling and watermarks in streaming SQL, see Watermarks and Event Time in RisingWave.
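The difference between event time and arrival time matters most near a window boundary. The Python sketch below parses a hypothetical JSON payload (reading r900 and its values are invented for illustration) that was recorded two seconds before 08:05 but reached the broker three seconds after it, and shows which 5-minute window each timestamp would select. It assumes epoch-aligned windows.

```python
import json
from datetime import datetime, timedelta, timezone


def window_start(ts: datetime, size: timedelta = timedelta(minutes=5)) -> datetime:
    """Start of the epoch-aligned tumbling window containing ts."""
    epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    return epoch + ((ts - epoch) // size) * size


# Hypothetical message: the sensor stamped it at 08:04:58 UTC.
payload = (
    '{"reading_id": "r900", "device_id": "dev-001", '
    '"sensor_value": 91.2, "event_time": "2026-04-01T08:04:58+00:00"}'
)
# Hypothetical broker arrival time, five seconds later.
arrival_time = datetime(2026, 4, 1, 8, 5, 3, tzinfo=timezone.utc)

event_time = datetime.fromisoformat(json.loads(payload)["event_time"])

print(window_start(event_time).time())    # 08:00:00
print(window_start(arrival_time).time())  # 08:05:00
```

Windowing on arrival time would attribute this reading to the 08:05 window and skew both windows' averages.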
FAQ
Does RisingWave support MQTT sources for IoT data?
Yes. RisingWave includes a native MQTT source connector that subscribes to MQTT topics directly. You can use wildcard topics like sensors/+/readings to collect from multiple devices. This eliminates the need for a separate MQTT-to-Kafka bridge in edge deployments where sensors publish using the MQTT protocol.
How does RisingWave handle late-arriving sensor readings?
RisingWave uses watermarks to track event-time progress. Readings that arrive after the watermark for their window has passed are considered late. The default behavior is to drop them from windowed aggregations. You can configure watermark lag tolerance in your source definition to accommodate sensor networks with high jitter. For details, see the watermark documentation.
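As a rough mental model, a watermark can be pictured as the highest event time seen so far minus an allowed lag; anything older than that is treated as late. The Python sketch below implements that simplified model only. It is not RisingWave's implementation, and the class name WatermarkFilter and the 10-second lag are assumptions chosen for illustration.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


class WatermarkFilter:
    """Accept readings unless they fall behind (max event time - lag)."""

    def __init__(self, lag: timedelta) -> None:
        self.lag = lag
        self.max_event_time: Optional[datetime] = None

    def accept(self, event_time: datetime) -> bool:
        if self.max_event_time is not None:
            watermark = self.max_event_time - self.lag
            if event_time < watermark:
                return False  # late: older than the watermark
        if self.max_event_time is None or event_time > self.max_event_time:
            self.max_event_time = event_time  # event-time progress advances
        return True


def at(minute: int, second: int) -> datetime:
    """Helper: a timestamp on 2026-04-01 during the 08:00 hour, UTC."""
    return datetime(2026, 4, 1, 8, minute, second, tzinfo=timezone.utc)


wm = WatermarkFilter(lag=timedelta(seconds=10))
arrivals = [at(4, 0), at(4, 30), at(4, 25), at(4, 5)]
print([wm.accept(ts) for ts in arrivals])
# [True, True, True, False]
```

The third reading is out of order but within the 10-second tolerance, so it is kept; the fourth is 25 seconds behind the newest reading and is dropped.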
Can I push anomaly alerts to external systems like PagerDuty or Slack?
Yes. RisingWave supports sinks that continuously push materialized view output to Kafka, PostgreSQL, and other targets. A common pattern is to sink iot_threshold_alerts to a Kafka topic and have a lightweight consumer forward alert rows to PagerDuty, Slack, or any webhook. See CREATE SINK in the RisingWave documentation.
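The forwarding step in such a consumer is mostly string formatting. The Python sketch below turns one alert row (shaped like a row of iot_threshold_alerts) into a JSON body with a single text field, which is the minimal shape Slack incoming webhooks accept. The function name to_webhook_payload is hypothetical, and reading from Kafka and posting over HTTP are deliberately left out.

```python
import json


def to_webhook_payload(alert: dict) -> str:
    """Render one alert row as a JSON body with a single "text" field."""
    text = (
        f"[{alert['alert_level']}] {alert['device_name']} ({alert['device_id']}): "
        f"{alert['sensor_type']} {alert['sensor_value']} {alert['unit']} "
        f"outside {alert['normal_min']}-{alert['normal_max']} "
        f"at {alert['event_time']}"
    )
    return json.dumps({"text": text})


# Row r005 from iot_threshold_alerts, as a consumer might decode it.
alert_row = {
    "reading_id": "r005",
    "device_id": "dev-001",
    "device_name": "Compressor-A1",
    "sensor_type": "temperature",
    "unit": "celsius",
    "sensor_value": 96.4,
    "normal_min": 60,
    "normal_max": 90,
    "alert_level": "HIGH",
    "event_time": "2026-04-01 08:04:00+00:00",
}

print(to_webhook_payload(alert_row))
```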
What happens to materialized views when new devices are added to iot_devices?
The views pick up new devices automatically because the JOIN between iot_readings and iot_devices is evaluated incrementally as new data arrives. Insert a new row into iot_devices for the new device, and readings from that device will appear in iot_threshold_alerts and iot_device_status as soon as they land in iot_readings. No view refresh or pipeline restart is needed.
How does z-score detection compare to machine learning approaches for IoT anomalies?
Z-score detection is statistical, not learned, so it requires no training data and no model deployment pipeline. It works immediately on live data and adapts to the current window's distribution. The tradeoff is that it treats the current window average as ground truth, which means a sustained anomaly that lasts several windows will gradually shift the average and reduce the z-score. For gradual drift detection, combine z-score alerts with a longer-window baseline view, or layer a machine learning model on top of the aggregated features produced by iot_tumble_avg_5min.
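The blind spot is easy to demonstrate. In the Python sketch below, the first window is the dev-001 data from this guide, with a single spike. The second is a hypothetical window in which the compressor runs above its 90-degree limit for the whole five minutes. The helper name max_z is an assumption.

```python
from statistics import fmean, pstdev


def max_z(values: list[float]) -> float:
    """Largest absolute z-score in a window (population standard deviation)."""
    mean, stddev = fmean(values), pstdev(values)
    if stddev == 0:
        return 0.0
    return max(abs(v - mean) / stddev for v in values)


spike = [75.2, 76.5, 78.1, 77.8, 96.4]      # one reading above 90
sustained = [96.0, 97.1, 95.8, 96.4, 96.9]  # hypothetical: every reading above 90

print(max_z(spike) > 1.5)      # True
print(max_z(sustained) > 1.5)  # False
```

Every reading in the sustained window would trip the threshold view, yet none stands out from its neighbors, so the z-score view stays silent. That is the case the fixed-range alerts exist to cover.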
What You Built
In this article you built a complete IoT anomaly detection pipeline using only SQL:
- iot_devices: a metadata table holding per-device operating ranges
- iot_readings: the sensor data ingestion table (swappable with a Kafka source in production)
- iot_tumble_avg_5min: a 5-minute tumbling window view computing count, average, min, and max per device
- iot_threshold_alerts: a materialized view that joins readings against device thresholds and flags out-of-range values in real time
- iot_zscore_anomalies: a materialized view that computes per-window z-scores and surfaces statistical outliers with a z-score above 1.5
- iot_device_status: a summary dashboard view showing total readings and alert count per device
Each view updates incrementally as new sensor data arrives. Query latency on the materialized views is independent of the volume of raw data in iot_readings because the computation happens at write time, not at read time.
To try this yourself, install RisingWave locally and run the SQL in this guide. For production deployments, RisingWave Cloud provides a fully managed option with no infrastructure to operate.

