IoT sensor networks can produce data volumes that overwhelm a traditional database within hours of deployment. RisingWave, a PostgreSQL-compatible streaming database, ingests high-throughput sensor event streams and maintains real-time aggregations using standard SQL, so your analytics scale with your sensor fleet.
Why High-Volume IoT Analytics Is Difficult
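A modern industrial facility might deploy thousands of temperature, pressure, vibration, flow, and current sensors, each emitting a reading every one to ten seconds. At that density, a facility with 5,000 sensors generates between 500 and 5,000 events per second (5,000 sensors divided by a 10-second or a 1-second reporting interval), or roughly 43 million to 432 million readings per day. A fleet of ten such facilities reporting every second would reach 50,000 events per second. That sustained load is hard for a traditional relational database to absorb as an INSERT workload while simultaneously serving analytical queries.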
The standard response to this problem is to build a specialized IoT data stack: a time-series database for raw readings, a stream processor for real-time alerting, and a data warehouse for historical analysis. Each layer requires its own deployment, configuration, and maintenance expertise. Data must be copied between layers, introducing latency and creating opportunities for inconsistency. The operational burden of running three systems instead of one can consume more engineering time than the IoT use case itself.
Beyond the infrastructure problem, there is a query semantics problem. IoT analytics almost always involve time-windowed aggregations — average temperature over the last five minutes, maximum vibration in the last hour, anomalies relative to a rolling baseline. Traditional stream processors require custom code to express these patterns. SQL-native streaming changes the equation: the same declarative language used for historical analysis also expresses real-time computations.
How Streaming SQL Handles IoT Scale
RisingWave processes sensor event streams through a shared-nothing architecture that partitions work across compute nodes. Incremental computation means that each new sensor reading updates only the affected aggregations — a single temperature reading from sensor S-042 does not trigger recomputation of the aggregations for sensor S-041 or any other sensor. This makes per-sensor metric maintenance efficient at scale.
TUMBLE windows compute non-overlapping aggregation periods (5-minute averages, hourly extremes), and HOP windows produce overlapping periods useful for anomaly detection. Session-style windows, which close after a gap in activity, can expose sensor silence: periods when a sensor stops reporting, which may indicate a hardware failure or connectivity issue. The walkthrough below detects silence with a staleness view instead.
Because RisingWave exposes a PostgreSQL interface, the same SQL queries used to analyze historical sensor data in a warehouse also work against live streaming views. Your data engineers write one query language, not three.
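As a rough illustration of that point, the ad hoc query below is ordinary PostgreSQL-style SQL run against the sensor_5min_avg view that Step 2 defines. The facility ID 'FAC-01' is a made-up value for illustration, and the one-hour lookback is an arbitrary choice.

```sql
-- Read the last hour of 5-minute temperature averages for one facility.
-- 'FAC-01' is a hypothetical facility_id; substitute a real one.
SELECT
    sensor_id,
    window_start,
    avg_value,
    max_value
FROM sensor_5min_avg
WHERE facility_id = 'FAC-01'
  AND sensor_type = 'TEMPERATURE'
  AND window_start >= NOW() - INTERVAL '1 HOUR'
ORDER BY window_start DESC, sensor_id
LIMIT 20;
```

Any client that speaks the PostgreSQL wire protocol, such as psql, should be able to issue this query once the view exists.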
Building It Step by Step
Step 1: Create the Data Source
CREATE SOURCE sensor_readings (
    sensor_id VARCHAR,
    device_group VARCHAR,
    location_id VARCHAR,
    facility_id VARCHAR,
    sensor_type VARCHAR,      -- TEMPERATURE, PRESSURE, VIBRATION, FLOW, CURRENT, HUMIDITY
    value DOUBLE PRECISION,
    unit VARCHAR,
    quality_flag SMALLINT,    -- 0=Good, 1=Uncertain, 2=Bad
    event_ts TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'iot.sensor_readings',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;
Step 2: Build the Core Materialized View
-- Latest reading per sensor.
-- Data age (NOW() - last_reported_ts) is left to query time: a streaming
-- materialized view accepts NOW() only in filter conditions, not in the
-- SELECT list.
CREATE MATERIALIZED VIEW sensor_latest AS
SELECT DISTINCT ON (sensor_id)
    sensor_id,
    device_group,
    location_id,
    facility_id,
    sensor_type,
    value AS latest_value,
    unit,
    quality_flag,
    event_ts AS last_reported_ts
FROM sensor_readings
WHERE quality_flag = 0  -- good quality readings only
ORDER BY sensor_id, event_ts DESC;
-- 5-minute tumbling window averages per sensor
CREATE MATERIALIZED VIEW sensor_5min_avg AS
SELECT
    window_start,
    window_end,
    sensor_id,
    sensor_type,
    facility_id,
    location_id,
    AVG(value) AS avg_value,
    MIN(value) AS min_value,
    MAX(value) AS max_value,
    COUNT(*) AS reading_count,
    STDDEV(value) AS stddev_value
FROM TUMBLE(sensor_readings, event_ts, INTERVAL '5 MINUTES')
WHERE quality_flag = 0
GROUP BY window_start, window_end, sensor_id, sensor_type, facility_id, location_id;
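The tumbling view above has a HOP counterpart for overlapping windows, which the earlier discussion mentions but does not show. The sketch below assumes a 15-minute window that advances every 5 minutes. The view name sensor_15min_hop and both interval choices are illustrative, not part of the original pipeline.

```sql
-- Overlapping 15-minute averages, recomputed every 5 minutes.
-- HOP takes the hop (slide) size first, then the window size.
-- Each reading lands in three overlapping windows (15 / 5 = 3).
CREATE MATERIALIZED VIEW sensor_15min_hop AS
SELECT
    window_start,
    window_end,
    sensor_id,
    AVG(value) AS avg_value,
    STDDEV(value) AS stddev_value,
    COUNT(*) AS reading_count
FROM HOP(sensor_readings, event_ts, INTERVAL '5 MINUTES', INTERVAL '15 MINUTES')
WHERE quality_flag = 0
GROUP BY window_start, window_end, sensor_id;
```

A longer overlapping window gives a smoother baseline than the 5-minute tumbling average. The cost is more state, because every reading contributes to several windows.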
Step 3: Add Alerts and Aggregations
-- Threshold breach alerts (configurable per sensor type)
CREATE MATERIALIZED VIEW sensor_threshold_alerts AS
SELECT
    s.sensor_id,
    s.device_group,
    s.location_id,
    s.facility_id,
    s.sensor_type,
    s.latest_value,
    s.unit,
    s.last_reported_ts,
    CASE
        WHEN s.sensor_type = 'TEMPERATURE' AND s.latest_value > 85 THEN 'HIGH_TEMP'
        WHEN s.sensor_type = 'PRESSURE' AND s.latest_value > 150 THEN 'HIGH_PRESSURE'
        WHEN s.sensor_type = 'VIBRATION' AND s.latest_value > 12 THEN 'HIGH_VIBRATION'
        WHEN s.sensor_type = 'CURRENT' AND s.latest_value > 95 THEN 'OVERCURRENT'
        ELSE NULL
    END AS alert_type
FROM sensor_latest s
WHERE (s.sensor_type = 'TEMPERATURE' AND s.latest_value > 85)
    OR (s.sensor_type = 'PRESSURE' AND s.latest_value > 150)
    OR (s.sensor_type = 'VIBRATION' AND s.latest_value > 12)
    OR (s.sensor_type = 'CURRENT' AND s.latest_value > 95);
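The thresholds in the view above are hard-coded, so changing one means recreating the view. One possible alternative, sketched here, moves the limits into a lookup table and joins against it. The table sensor_thresholds, its columns, and the view name sensor_threshold_alerts_by_table are hypothetical names introduced for this example; the limit values are copied from the view above.

```sql
-- Hypothetical lookup table holding one upper limit per sensor type.
CREATE TABLE sensor_thresholds (
    sensor_type VARCHAR PRIMARY KEY,
    max_value DOUBLE PRECISION,
    alert_type VARCHAR
);

INSERT INTO sensor_thresholds VALUES
    ('TEMPERATURE', 85, 'HIGH_TEMP'),
    ('PRESSURE', 150, 'HIGH_PRESSURE'),
    ('VIBRATION', 12, 'HIGH_VIBRATION'),
    ('CURRENT', 95, 'OVERCURRENT');

-- Same alert logic, driven by the table instead of a CASE expression.
CREATE MATERIALIZED VIEW sensor_threshold_alerts_by_table AS
SELECT
    s.sensor_id,
    s.device_group,
    s.location_id,
    s.facility_id,
    s.sensor_type,
    s.latest_value,
    s.unit,
    s.last_reported_ts,
    t.alert_type
FROM sensor_latest AS s
JOIN sensor_thresholds AS t
    ON t.sensor_type = s.sensor_type
WHERE s.latest_value > t.max_value;
```

With this layout, an UPDATE on sensor_thresholds should propagate to the view incrementally, so limits can be tuned without redeploying the pipeline.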
-- Anomaly detection: reading more than 3 standard deviations from the average
-- of the preceding 5-minute window. TUMBLE assigns each reading to its own
-- window, so the preceding window is the one whose window_end equals
-- r.window_start. This keeps the join a plain equi-join rather than a
-- correlated subquery inside the ON clause.
CREATE MATERIALIZED VIEW sensor_anomalies AS
SELECT
    r.sensor_id,
    r.sensor_type,
    r.facility_id,
    r.location_id,
    r.value AS current_value,
    a.avg_value,
    a.stddev_value,
    ABS(r.value - a.avg_value) / NULLIF(a.stddev_value, 0) AS z_score,
    r.event_ts
FROM TUMBLE(sensor_readings, event_ts, INTERVAL '5 MINUTES') AS r
JOIN sensor_5min_avg AS a
    ON a.sensor_id = r.sensor_id
    AND a.window_end = r.window_start
WHERE a.stddev_value > 0
    AND ABS(r.value - a.avg_value) / NULLIF(a.stddev_value, 0) > 3;
-- Sensor silence detection: sensors not reporting in last 2 minutes.
-- NOW() is used only as a temporal filter that compares the timestamp column
-- with NOW() directly. The elapsed silence is computed at query time rather
-- than stored in the view.
CREATE MATERIALIZED VIEW silent_sensors AS
SELECT
    sensor_id,
    device_group,
    facility_id,
    location_id,
    sensor_type,
    last_reported_ts
FROM sensor_latest
WHERE last_reported_ts < NOW() - INTERVAL '2 MINUTES';
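To show how the silence view might be consumed, the query below is an ad hoc read that computes the elapsed silence at query time. It is only a sketch: it uses the silent_sensors view and its last_reported_ts column from above, and the limit of 50 rows is an arbitrary choice.

```sql
-- List silent sensors, longest silence first, with elapsed minutes
-- computed when the query runs.
SELECT
    sensor_id,
    facility_id,
    location_id,
    sensor_type,
    last_reported_ts,
    EXTRACT(EPOCH FROM (NOW() - last_reported_ts)) / 60.0 AS silent_minutes
FROM silent_sensors
ORDER BY last_reported_ts ASC
LIMIT 50;
```

Because sensor_latest keeps only good-quality readings, a sensor that reports nothing but bad-quality values would also appear here. That may or may not be the behavior you want.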
Step 4: Sink Results Downstream
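-- Push threshold alerts to IoT platform alert engine.
-- sensor_threshold_alerts changes in place as readings arrive, so a PLAIN
-- (append-only) sink needs force_append_only: updates are emitted as new
-- messages and deletions (cleared alerts) are dropped.
CREATE SINK sensor_alert_sink
FROM sensor_threshold_alerts
WITH (
    connector = 'kafka',
    topic = 'iot.alerts.threshold',
    properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON (
    force_append_only = 'true'
);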
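-- Write 5-minute aggregations to time-series store for dashboards.
-- A JDBC sink takes a sink type rather than a FORMAT ... ENCODE clause. The
-- view updates as each window fills, so this is an upsert sink. The key below
-- assumes each sensor_id has one fixed type, facility, and location, and that
-- the target table declares the same primary key.
CREATE SINK sensor_aggregates_sink
FROM sensor_5min_avg
WITH (
    connector = 'jdbc',
    jdbc.url = 'jdbc:postgresql://tsdb:5432/iot_metrics',
    table.name = 'sensor_5min_avg',
    type = 'upsert',
    primary_key = 'window_start,sensor_id'
);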
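-- Long-term retention in Iceberg.
-- An Iceberg sink also takes no FORMAT ... ENCODE clause. A real deployment
-- additionally needs the warehouse path and object-store credentials for its
-- catalog, which are omitted here.
CREATE SINK sensor_raw_archive AS
SELECT * FROM sensor_readings
WITH (
    connector = 'iceberg',
    type = 'append-only',
    catalog.type = 'rest',
    catalog.uri = 'http://iceberg-catalog:8181',
    database.name = 'iot',
    table.name = 'sensor_readings_archive'
);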
How This Compares to Traditional Approaches
| Aspect | TSDB + Stream Processor | Streaming SQL (RisingWave) |
| --- | --- | --- |
| Systems to operate | 3+ (TSDB, processor, warehouse) | 1 |
| Query language | Multiple (InfluxQL, Java/Python, SQL) | Standard PostgreSQL SQL |
| Alerting latency | Milliseconds (custom code) | Sub-second (declarative SQL) |
| Anomaly detection | Custom application logic | SQL window + join |
| Scalability | Per-system scaling | Single horizontal scale-out |
| Silence detection | Heartbeat monitoring service | SESSION window or staleness view |
FAQ
What is IoT sensor event processing?
IoT sensor event processing is the ingestion, transformation, and analysis of high-frequency readings from connected physical devices. Use cases include threshold alerting, anomaly detection, predictive maintenance, energy optimization, and equipment health monitoring.
How does RisingWave scale to millions of events per second?
RisingWave uses shared-nothing horizontal scaling: each compute node processes a partition of the sensor stream independently. As event volume grows, additional nodes are added. Incremental computation means each new event updates only the affected materialized view rows, which keeps per-event processing cost roughly constant regardless of how much historical data has accumulated.
Can I integrate RisingWave with my existing stack?
Yes. RisingWave connects to Kafka (source and sink), PostgreSQL via CDC, MySQL via CDC, and writes to JDBC databases and Iceberg tables. Because the interface is PostgreSQL-compatible, Grafana and standard BI tools can connect through their existing PostgreSQL drivers; Prometheus remote-write requires an adapter.
How do I handle bad-quality sensor readings?
Filter on the quality_flag column as shown above. You can also build a data quality materialized view that tracks bad-reading rates per sensor — a rising bad-reading rate often precedes sensor failure.
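A data quality view of that kind could look like the sketch below. It assumes the quality_flag encoding from Step 1 (2 = Bad) and reuses the 5-minute tumbling window. The view name sensor_quality_5min and its output column names are illustrative.

```sql
-- Bad-reading rate per sensor per 5-minute window.
-- Every group contains at least one reading, so COUNT(*) is never zero.
CREATE MATERIALIZED VIEW sensor_quality_5min AS
SELECT
    window_start,
    window_end,
    sensor_id,
    COUNT(*) AS total_readings,
    SUM(CASE WHEN quality_flag = 2 THEN 1 ELSE 0 END) AS bad_readings,
    SUM(CASE WHEN quality_flag = 2 THEN 1 ELSE 0 END)::DOUBLE PRECISION
        / COUNT(*) AS bad_reading_rate
FROM TUMBLE(sensor_readings, event_ts, INTERVAL '5 MINUTES')
GROUP BY window_start, window_end, sensor_id;
```

Unlike the other views, this one does not filter on quality_flag, since the bad readings are exactly what it measures.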
Key Takeaways
- Multi-layer IoT stacks (TSDB + stream processor + warehouse) create operational complexity that streaming SQL eliminates by unifying ingestion, real-time alerting, and analytics in one system.
- RisingWave's TUMBLE and HOP windows compute time-series aggregations in SQL; session windows or a staleness view detect sensor silence without a separate heartbeat monitoring service.
- Anomaly detection using rolling standard deviation is expressible as a SQL JOIN between raw events and windowed averages — no custom code required.
- Results flow to IoT alerting platforms, dashboards, and long-term Iceberg storage through declarative sink definitions.

