IoT anomaly detection with streaming SQL window functions means identifying abnormal sensor readings—outliers, sudden spikes, drift—as they happen, using statistical windows evaluated continuously over live data streams. RisingWave, a PostgreSQL-compatible streaming database, provides the window functions you need with no separate ML platform.
Why Real-Time Anomaly Detection Matters
A vibration sensor reading that exceeds normal bounds by 3 standard deviations often precedes equipment failure by hours or days. A temperature reading 20°C above baseline may indicate a cooling system fault. Detecting these anomalies from batch data means discovering the problem after it escalates—often after a shutdown or a safety event.
Real-time anomaly detection turns reactive maintenance into predictive maintenance. Operators receive alerts while there is still time to intervene. IoT fleets across manufacturing, energy, transportation, and utilities all share the same requirement: find the anomaly before it becomes an incident.
The traditional approach—streaming the data to a data lake, running a scheduled ML job, and returning predictions—introduces latency measured in minutes or hours. Streaming SQL with window functions collapses detection latency to sub-second.
How Streaming SQL Solves This
Window functions in RisingWave operate over continuous streams. TUMBLE divides time into fixed, non-overlapping buckets. HOP creates overlapping windows for smoother signal tracking. Standard SQL aggregates (AVG, STDDEV) and window functions (LAG, LEAD) compute baselines and deltas over rolling partitions.
The key insight: you don't need a machine learning model to detect most IoT anomalies. Statistical methods—z-score, interquartile range, rate-of-change thresholds—catch the majority of real-world equipment anomalies and are trivially expressible in SQL.
Step-by-Step Tutorial
Step 1: Connect Your Data Source
CREATE SOURCE machine_sensors (
machine_id VARCHAR,
sensor_id VARCHAR,
metric VARCHAR,
value DOUBLE PRECISION,
quality INT,
event_time TIMESTAMPTZ
) WITH (
connector = 'kafka',
topic = 'machine-telemetry',
properties.bootstrap.server = 'kafka:9092',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;
Step 2: Build the Real-Time View
Compute a rolling baseline—mean and standard deviation—over a 10-minute tumble window. This becomes the reference for z-score anomaly detection.
CREATE MATERIALIZED VIEW sensor_baseline_10min AS
SELECT
machine_id,
sensor_id,
metric,
window_start,
window_end,
AVG(value) AS mean_value,
STDDEV(value) AS stddev_value,
COUNT(*) AS sample_count
FROM TUMBLE(machine_sensors, event_time, INTERVAL '10 MINUTES')
WHERE quality = 1
GROUP BY machine_id, sensor_id, metric, window_start, window_end;
Step 3: Window-Based Aggregations
Use a HOP window with a 1-minute slide to compute a short-term average for comparison against the baseline:
CREATE MATERIALIZED VIEW sensor_short_avg AS
SELECT
machine_id,
sensor_id,
metric,
window_start,
window_end,
AVG(value) AS short_avg,
MAX(value) AS max_val,
MIN(value) AS min_val
FROM HOP(machine_sensors, event_time, INTERVAL '1 MINUTE', INTERVAL '5 MINUTES')
WHERE quality = 1
GROUP BY machine_id, sensor_id, metric, window_start, window_end;
For rate-of-change anomalies, use the LAG window function to detect sudden jumps:
CREATE MATERIALIZED VIEW sensor_rate_of_change AS
SELECT
machine_id,
sensor_id,
metric,
value,
event_time,
value - LAG(value, 1) OVER (
PARTITION BY machine_id, sensor_id, metric
ORDER BY event_time
) AS delta
FROM machine_sensors
WHERE quality = 1;
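Turning the delta into an alert stream is one more filter away. A minimal sketch, assuming a hypothetical flat threshold of 5.0 units per reading (in practice you would tune this per metric):

```sql
-- Flag readings whose jump from the previous value exceeds a
-- hypothetical threshold of 5.0; built on sensor_rate_of_change above.
CREATE MATERIALIZED VIEW rate_of_change_alerts AS
SELECT
machine_id,
sensor_id,
metric,
value,
delta,
event_time
FROM sensor_rate_of_change
WHERE ABS(delta) > 5.0;
```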
Step 4: Alerts and Sinks
Join the short-term average against the baseline to compute a z-score and fire alerts when it exceeds a threshold:
CREATE MATERIALIZED VIEW anomaly_alerts AS
SELECT
s.machine_id,
s.sensor_id,
s.metric,
s.short_avg AS observed_value,
b.mean_value AS baseline_mean,
b.stddev_value AS baseline_stddev,
ABS(s.short_avg - b.mean_value) / NULLIF(b.stddev_value, 0) AS z_score,
s.window_end AS detected_at
FROM sensor_short_avg s
JOIN sensor_baseline_10min b
ON s.machine_id = b.machine_id
AND s.sensor_id = b.sensor_id
AND s.metric = b.metric
AND s.window_start >= b.window_start
AND s.window_start < b.window_end
WHERE ABS(s.short_avg - b.mean_value) / NULLIF(b.stddev_value, 0) > 3.0;
CREATE SINK anomaly_sink
FROM anomaly_alerts
WITH (
connector = 'kafka',
topic = 'anomaly-events',
properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;
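Because RisingWave is PostgreSQL-compatible, you can also inspect the alert view interactively from any PostgreSQL client such as psql, without going through Kafka. A quick ad-hoc check might look like:

```sql
-- Most recent alerts, straight from the materialized view.
SELECT machine_id, metric, z_score, detected_at
FROM anomaly_alerts
ORDER BY detected_at DESC
LIMIT 20;
```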
Comparison Table
| Approach | Detection Latency | Infrastructure | Tuning |
| --- | --- | --- | --- |
| Scheduled ML batch job | Minutes to hours | Spark + model server + scheduler | Model retraining pipeline |
| Streaming ML (Flink + Python) | Seconds | Flink cluster + model service | Complex deployment |
| Streaming SQL (RisingWave) | Sub-second | Single streaming database | SQL threshold adjustment |
FAQ
How do I tune the z-score threshold to reduce false positives?
Start with z > 3.0 (roughly 0.3% false positive rate under normal distribution). Adjust the threshold in the WHERE clause of anomaly_alerts. You can also add a minimum sample count filter (b.sample_count > 20) to avoid triggering on sparse data.
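As a sketch, a stricter variant of the alerts view with both adjustments applied (the 3.5 threshold is chosen purely for illustration):

```sql
-- Identical to anomaly_alerts except for the raised z threshold
-- and a minimum-sample-count guard against sparse baselines.
CREATE MATERIALIZED VIEW anomaly_alerts_strict AS
SELECT
s.machine_id,
s.sensor_id,
s.metric,
s.short_avg AS observed_value,
ABS(s.short_avg - b.mean_value) / NULLIF(b.stddev_value, 0) AS z_score,
s.window_end AS detected_at
FROM sensor_short_avg s
JOIN sensor_baseline_10min b
ON s.machine_id = b.machine_id
AND s.sensor_id = b.sensor_id
AND s.metric = b.metric
AND s.window_start >= b.window_start
AND s.window_start < b.window_end
WHERE ABS(s.short_avg - b.mean_value) / NULLIF(b.stddev_value, 0) > 3.5
AND b.sample_count > 20;
```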
Can I run multiple anomaly detection strategies simultaneously?
Yes. Create separate materialized views for each strategy—z-score, IQR, rate-of-change, absolute threshold—and union the results in a final alerts view. Each view updates independently.
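A sketch of the combined view, assuming hypothetical per-strategy views (zscore_alerts, iqr_alerts, rate_alerts) that each expose the same columns:

```sql
-- Union per-strategy alert views into one stream; the literal
-- strategy tag records which detector fired.
CREATE MATERIALIZED VIEW all_alerts AS
SELECT machine_id, sensor_id, metric, 'z_score' AS strategy, detected_at
FROM zscore_alerts
UNION ALL
SELECT machine_id, sensor_id, metric, 'iqr' AS strategy, detected_at
FROM iqr_alerts
UNION ALL
SELECT machine_id, sensor_id, metric, 'rate_of_change' AS strategy, detected_at
FROM rate_alerts;
```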
What happens if a sensor goes silent?
Use a session window or a CASE expression that checks MAX(event_time) against the current timestamp to detect devices that stop reporting. Combine this with an external heartbeat table for device liveness tracking.
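One way to sketch the MAX(event_time) approach, using a hypothetical 5-minute cutoff; this relies on RisingWave's temporal-filter support for now() in a streaming query, so check that your version allows now() in this position:

```sql
-- Last reading seen per device.
CREATE MATERIALIZED VIEW sensor_last_seen AS
SELECT machine_id, sensor_id, MAX(event_time) AS last_seen
FROM machine_sensors
GROUP BY machine_id, sensor_id;

-- Temporal filter: devices silent for more than 5 minutes.
CREATE MATERIALIZED VIEW silent_sensors AS
SELECT machine_id, sensor_id, last_seen
FROM sensor_last_seen
WHERE last_seen < now() - INTERVAL '5 MINUTES';
```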
Key Takeaways
- Statistical anomaly detection using z-scores and rate-of-change deltas is expressible in pure SQL window functions, eliminating the need for a dedicated ML platform for most IoT use cases.
- TUMBLE and HOP windows compute baselines and short-term averages that can be joined together to identify deviations in real time.
- Alerts materialize automatically and propagate to Kafka sinks with sub-second latency.
- RisingWave's incremental view maintenance means baseline statistics are always fresh without full recomputation.

