Your factory floor has 10,000 sensors, each emitting a temperature, humidity, or pressure reading every second. That is 600,000 data points per minute flowing into your system. A traditional batch pipeline processes this data on an hourly schedule, meaning an overheating motor or a failing compressor could go undetected for up to 60 minutes. In industrial settings, that delay costs real money and can create safety risks.
Real-time IoT data processing eliminates this blind spot. Instead of collecting sensor data into a warehouse and running queries later, a streaming approach processes each reading as it arrives, computing rolling averages, detecting anomalies, and updating dashboards continuously. The challenge has always been that building such a system required stitching together multiple complex frameworks: a message broker for ingestion, a stream processor for transformations, and a serving layer for queries.
This article shows you how to build a complete IoT data processing pipeline using RisingWave, a streaming database that handles ingestion, transformation, and serving in one system. You will learn how to ingest sensor data from MQTT and Kafka, compute time-series aggregations with materialized views, detect anomalies in real time, and power a live dashboard, all using standard SQL.
IoT Data Ingestion: MQTT and Kafka Sources
IoT architectures typically rely on two messaging protocols for getting sensor data into the processing layer. MQTT, the lightweight publish-subscribe protocol, is the standard for constrained devices and low-bandwidth networks. Apache Kafka handles high-throughput scenarios where you need durable, replayable event logs.
RisingWave supports both as native source connectors, meaning you define your data pipeline entirely in SQL without writing custom consumer code.
Ingesting from Kafka
Most production IoT deployments route sensor data through Kafka for its durability and scalability guarantees. Here is how to create a RisingWave table that continuously ingests from a Kafka topic:
```sql
CREATE TABLE iot_sensor_readings (
    device_id VARCHAR,
    sensor_type VARCHAR,
    reading_value DOUBLE PRECISION,
    unit VARCHAR,
    location VARCHAR,
    reading_ts TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'iot-sensor-data',
    properties.bootstrap.server = 'broker:9092',
    scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;
```
This single statement does what would otherwise require a Kafka consumer application, a deserialization layer, and a storage system. RisingWave continuously pulls JSON messages from the iot-sensor-data topic and makes them available for SQL queries and materialized views. The scan.startup.mode = 'earliest' parameter tells RisingWave to read from the beginning of the topic, ensuring no historical data is missed during initial setup.
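For reference, each message on the topic is expected to be a flat JSON object whose keys match the table's column names. A plausible payload (the values here are illustrative) looks like this:

```json
{
  "device_id": "sensor-001",
  "sensor_type": "temperature",
  "reading_value": 22.5,
  "unit": "celsius",
  "location": "warehouse-A",
  "reading_ts": "2026-04-01T10:00:00Z"
}
```

Fields missing from a message arrive as NULL, so producers should emit all six keys consistently.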
Ingesting from MQTT
For edge deployments where sensors publish directly to an MQTT broker (such as HiveMQ or EMQX), RisingWave connects natively via its MQTT source connector:
```sql
CREATE TABLE mqtt_sensor_readings (
    device_id VARCHAR,
    sensor_type VARCHAR,
    reading_value DOUBLE PRECISION,
    unit VARCHAR,
    location VARCHAR,
    reading_ts TIMESTAMPTZ
) WITH (
    connector = 'mqtt',
    url = 'tcp://mqtt-broker:1883',
    topic = 'sensors/+/readings',
    qos = 'at_least_once'
) FORMAT PLAIN ENCODE JSON;
```
The MQTT wildcard topic sensors/+/readings subscribes to readings from all devices, following the common MQTT topic hierarchy pattern. RisingWave handles the MQTT protocol details, including reconnection logic and QoS guarantees, so you do not need a separate MQTT client library.
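On the device side, publishing a matching reading is a few lines of code. The sketch below assumes the paho-mqtt client library (1.x-style constructor); the broker host and topic layout mirror the MQTT source definition above, and `make_reading` builds a payload whose keys match the table's columns:

```python
import json
from datetime import datetime, timezone


def make_reading(device_id, sensor_type, value, unit, location):
    """Build a JSON payload matching the mqtt_sensor_readings schema."""
    return json.dumps({
        "device_id": device_id,
        "sensor_type": sensor_type,
        "reading_value": value,
        "unit": unit,
        "location": location,
        "reading_ts": datetime.now(timezone.utc).isoformat(),
    })


def publish_reading(broker_host, device_id, payload):
    """Publish one reading to the sensors/<device_id>/readings topic."""
    import paho.mqtt.client as mqtt  # assumed dependency, 1.x-style API
    client = mqtt.Client()
    client.connect(broker_host, 1883)
    # QoS 1 matches the at_least_once setting on the RisingWave source
    client.publish(f"sensors/{device_id}/readings", payload, qos=1)
    client.disconnect()
```

A gateway would call `publish_reading("mqtt-broker", "sensor-001", make_reading(...))` on each sample.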
Hybrid Ingestion Architecture
In practice, many IoT deployments use both protocols. Edge gateways collect data via MQTT from nearby sensors and forward it to a central Kafka cluster. RisingWave can ingest from both simultaneously, and a plain UNION view can unify the two streams downstream. The overall architecture looks like this:
```mermaid
graph LR
    A[IoT Sensors] -->|MQTT| B[MQTT Broker]
    A -->|MQTT| C[Edge Gateway]
    C -->|Kafka| D[Kafka Cluster]
    B --> E[RisingWave]
    D --> E
    E -->|Materialized Views| F[Dashboard / API]
    E -->|Sink| G[PostgreSQL / Data Lake]
```
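The unifying view itself is a one-statement UNION, assuming both tables share the schema defined earlier (the view name `all_sensor_readings` is illustrative):

```sql
CREATE VIEW all_sensor_readings AS
SELECT device_id, sensor_type, reading_value, unit, location, reading_ts
FROM iot_sensor_readings
UNION ALL
SELECT device_id, sensor_type, reading_value, unit, location, reading_ts
FROM mqtt_sensor_readings;
```

Downstream materialized views can then be defined over this view instead of a single source table.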
Time-Series Aggregation with Materialized Views
Raw sensor data is high-volume and low-level. What operators and dashboards need are aggregated metrics: average temperature over the last 5 minutes, peak pressure readings per hour, or the total count of readings per location. RisingWave's materialized views compute these aggregations incrementally, updating results as each new reading arrives rather than re-scanning the entire dataset.
5-Minute Rolling Averages
The TUMBLE time window function groups readings into fixed, non-overlapping intervals. Here is a materialized view that computes 5-minute statistics per sensor:
```sql
CREATE MATERIALIZED VIEW sensor_5min_avg AS
SELECT
    device_id,
    sensor_type,
    location,
    window_start,
    window_end,
    AVG(reading_value) AS avg_reading,
    MIN(reading_value) AS min_reading,
    MAX(reading_value) AS max_reading,
    COUNT(*) AS sample_count
FROM TUMBLE(
    iot_sensor_readings,
    reading_ts,
    INTERVAL '5 minutes'
)
GROUP BY device_id, sensor_type, location, window_start, window_end;
```
Once created, this materialized view is always up to date. When a new reading arrives from sensor-001, RisingWave incrementally updates only the affected 5-minute window rather than recomputing every window from scratch. You query it like a regular table:
```sql
SELECT device_id, sensor_type, location,
       window_start, window_end,
       ROUND(avg_reading::numeric, 2) AS avg_reading,
       min_reading, max_reading, sample_count
FROM sensor_5min_avg
ORDER BY device_id, window_start
LIMIT 10;
```
Verified on RisingWave 2.8.0. Sample output:
```
 device_id  | sensor_type |  location   |       window_start        |        window_end         | avg_reading | min_reading | max_reading | sample_count
------------+-------------+-------------+---------------------------+---------------------------+-------------+-------------+-------------+--------------
 sensor-001 | temperature | warehouse-A | 2026-04-01 10:00:00+00:00 | 2026-04-01 10:05:00+00:00 |       22.50 |        22.5 |        22.5 |            1
 sensor-001 | temperature | warehouse-A | 2026-04-01 10:05:00+00:00 | 2026-04-01 10:10:00+00:00 |       23.10 |        23.1 |        23.1 |            1
 sensor-001 | temperature | warehouse-A | 2026-04-01 10:10:00+00:00 | 2026-04-01 10:15:00+00:00 |       85.30 |        85.3 |        85.3 |            1
 sensor-002 | humidity    | warehouse-A | 2026-04-01 10:00:00+00:00 | 2026-04-01 10:05:00+00:00 |       45.20 |        45.2 |        45.2 |            1
 sensor-003 | temperature | warehouse-B | 2026-04-01 10:00:00+00:00 | 2026-04-01 10:05:00+00:00 |       21.90 |        21.9 |        21.9 |            1
```
Hourly Statistics with Standard Deviation
For longer-term trend analysis and anomaly detection baselines, a 1-hour window captures enough data to compute meaningful statistics:
```sql
CREATE MATERIALIZED VIEW sensor_hourly_stats AS
SELECT
    device_id,
    sensor_type,
    location,
    window_start,
    window_end,
    AVG(reading_value) AS avg_reading,
    STDDEV_POP(reading_value) AS stddev_reading,
    MIN(reading_value) AS min_reading,
    MAX(reading_value) AS max_reading,
    COUNT(*) AS sample_count
FROM TUMBLE(
    iot_sensor_readings,
    reading_ts,
    INTERVAL '1 hour'
)
GROUP BY device_id, sensor_type, location, window_start, window_end;
```
Sample output:
```
 device_id  | sensor_type |   location   |       window_start        | avg_reading | stddev_reading | min_reading | max_reading | sample_count
------------+-------------+--------------+---------------------------+-------------+----------------+-------------+-------------+--------------
 sensor-001 | temperature | warehouse-A  | 2026-04-01 10:00:00+00:00 |       35.78 |          24.77 |        22.5 |        85.3 |            5
 sensor-002 | humidity    | warehouse-A  | 2026-04-01 10:00:00+00:00 |       46.37 |           0.83 |        45.2 |        47.1 |            3
 sensor-003 | temperature | warehouse-B  | 2026-04-01 10:00:00+00:00 |       22.30 |           0.33 |        21.9 |        22.7 |            3
 sensor-004 | pressure    | warehouse-B  | 2026-04-01 10:00:00+00:00 |     1013.50 |           0.24 |      1013.2 |      1013.8 |            3
 sensor-005 | temperature | cold-storage | 2026-04-01 10:00:00+00:00 |       -4.90 |           0.10 |        -5.0 |        -4.8 |            2
```
Notice sensor-001 in warehouse-A: the high standard deviation (24.77) and the gap between min (22.5) and max (85.3) immediately signal that something unusual happened during that hour. This is exactly the kind of pattern that feeds into automated anomaly detection.
Real-Time Anomaly Detection
Detecting anomalies in sensor data is one of the highest-value applications of real-time IoT processing. Rather than discovering a temperature spike hours later in a batch report, you want the system to flag it the moment it happens.
Threshold-Based Alerts
The most straightforward approach uses domain-specific thresholds. If you know that a warehouse temperature above 50 degrees Celsius indicates a problem, you can define a materialized view that continuously filters for those conditions:
```sql
CREATE MATERIALIZED VIEW sensor_anomalies AS
SELECT
    device_id,
    sensor_type,
    location,
    reading_value,
    reading_ts,
    CASE
        WHEN sensor_type = 'temperature' AND reading_value > 50.0
            THEN 'CRITICAL_HIGH_TEMP'
        WHEN sensor_type = 'temperature' AND reading_value < -20.0
            THEN 'CRITICAL_LOW_TEMP'
        WHEN sensor_type = 'humidity' AND reading_value > 80.0
            THEN 'HIGH_HUMIDITY'
        WHEN sensor_type = 'pressure' AND reading_value < 980.0
            THEN 'LOW_PRESSURE'
        ELSE 'UNKNOWN'
    END AS alert_type
FROM iot_sensor_readings
WHERE
    (sensor_type = 'temperature'
        AND (reading_value > 50.0 OR reading_value < -20.0))
    OR (sensor_type = 'humidity' AND reading_value > 80.0)
    OR (sensor_type = 'pressure' AND reading_value < 980.0);
```
Verified on RisingWave 2.8.0. With the spike in sensor-001 (85.3 C), the view returns:
```
 device_id  | sensor_type |  location   | reading_value |        reading_ts         |     alert_type
------------+-------------+-------------+---------------+---------------------------+--------------------
 sensor-001 | temperature | warehouse-A |          85.3 | 2026-04-01 10:10:00+00:00 | CRITICAL_HIGH_TEMP
```
The key advantage: this view updates within milliseconds of a new anomalous reading arriving. There is no polling interval or batch delay.
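Fixed thresholds can be complemented with a statistical rule built on the hourly statistics view from earlier. The sketch below is illustrative (the view name and the three-standard-deviation cutoff are assumptions, and it compares each reading against the previous hour's baseline so a reading is not judged against statistics it contributed to):

```sql
CREATE MATERIALIZED VIEW sensor_zscore_anomalies AS
SELECT
    r.device_id,
    r.sensor_type,
    r.location,
    r.reading_value,
    r.reading_ts
FROM iot_sensor_readings AS r
JOIN sensor_hourly_stats AS s
    ON  r.device_id   = s.device_id
    AND r.sensor_type = s.sensor_type
    -- baseline: the hour immediately preceding the reading
    AND r.reading_ts >= s.window_end
    AND r.reading_ts <  s.window_end + INTERVAL '1 hour'
WHERE s.stddev_reading > 0
  AND ABS(r.reading_value - s.avg_reading) > 3 * s.stddev_reading;
```

This catches deviations that are abnormal for a particular sensor even when they never cross a global threshold.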
Forwarding Alerts to External Systems
Detecting anomalies is only half the story. You need to route alerts to the systems that can act on them. RisingWave supports sinking data to external systems, including Kafka, PostgreSQL, and MQTT. For example, to push anomaly alerts back to an MQTT topic that triggers notifications:
```sql
CREATE SINK anomaly_alerts FROM sensor_anomalies
WITH (
    connector = 'mqtt',
    url = 'tcp://mqtt-broker:1883',
    topic = 'alerts/sensor-anomalies',
    qos = 'at_least_once',
    type = 'append-only'
) FORMAT PLAIN ENCODE JSON;
```
This creates a closed loop: sensors publish readings via MQTT, RisingWave processes them and detects anomalies, and alerts flow back out via MQTT to trigger PagerDuty, send Slack notifications, or activate physical alarms.
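A downstream notifier only needs to subscribe to the alert topic and forward each message. The sketch below assumes the paho-mqtt client library (1.x-style constructor); `format_alert` expects the JSON shape that the sink above emits, and the print call stands in for a real PagerDuty or Slack integration:

```python
import json


def format_alert(payload: str) -> str:
    """Turn a JSON alert from the sink into a human-readable message."""
    alert = json.loads(payload)
    return (f"[{alert['alert_type']}] {alert['device_id']} at "
            f"{alert['location']}: {alert['reading_value']} "
            f"({alert['reading_ts']})")


def on_message(client, userdata, msg):
    # Stand-in for a real notification hook (PagerDuty, Slack, etc.)
    print(format_alert(msg.payload.decode("utf-8")))


def run_subscriber(broker_host="mqtt-broker"):
    """Block forever, printing each alert as it arrives."""
    import paho.mqtt.client as mqtt  # assumed dependency, 1.x-style API
    client = mqtt.Client()
    client.on_message = on_message
    client.connect(broker_host, 1883)
    client.subscribe("alerts/sensor-anomalies", qos=1)
    client.loop_forever()
```

Calling `run_subscriber()` from a small service completes the loop described above.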
Building the Live Dashboard
With the aggregation and anomaly detection views in place, you need a dashboard that reflects the current state of your IoT fleet. RisingWave's materialized views serve as the query layer for any dashboard tool that supports PostgreSQL, since RisingWave is wire-compatible with the PostgreSQL protocol.
Location Summary View
A dashboard typically starts with a high-level overview per facility or zone:
```sql
CREATE MATERIALIZED VIEW device_health_dashboard AS
SELECT
    location,
    COUNT(DISTINCT device_id) AS active_devices,
    COUNT(*) AS total_readings,
    ROUND(AVG(
        CASE WHEN sensor_type = 'temperature'
             THEN reading_value END
    )::numeric, 1) AS avg_temperature,
    ROUND(AVG(
        CASE WHEN sensor_type = 'humidity'
             THEN reading_value END
    )::numeric, 1) AS avg_humidity,
    MAX(reading_ts) AS last_reading_at
FROM iot_sensor_readings
GROUP BY location;
```
Sample output:
```
   location   | active_devices | total_readings | avg_temperature | avg_humidity |      last_reading_at
--------------+----------------+----------------+-----------------+--------------+---------------------------
 cold-storage |              1 |              2 |            -4.9 |              | 2026-04-01 10:05:00+00:00
 warehouse-A  |              2 |              8 |            35.8 |         46.4 | 2026-04-01 10:20:00+00:00
 warehouse-B  |              2 |              6 |            22.3 |              | 2026-04-01 10:10:00+00:00
```
Connecting Dashboard Tools
Because RisingWave speaks the PostgreSQL wire protocol, connecting a dashboard is straightforward. Tools like Grafana, Metabase, and Superset connect using a standard PostgreSQL connection string:
```
Host:     risingwave-host
Port:     4566
Database: dev
User:     root
```
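The same connection details work from the command line with any PostgreSQL client. For example, with psql (hostname and credentials as above):

```shell
# Query a materialized view over the PostgreSQL wire protocol.
psql -h risingwave-host -p 4566 -d dev -U root \
     -c "SELECT * FROM device_health_dashboard ORDER BY location;"
```

This is a quick way to sanity-check the views before wiring up a dashboard tool.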
In Grafana, point a PostgreSQL data source at RisingWave and query the materialized views directly. The queries return in milliseconds because the results are already precomputed, giving your dashboard the responsiveness of a cache with the freshness of a stream processor.
For a step-by-step Grafana integration guide, see the RisingWave documentation on connecting to Grafana.
Comparing IoT Stream Processing Approaches
Before choosing a real-time IoT processing architecture, it helps to understand the tradeoffs between common approaches:
| Approach | Latency | SQL Support | Stateful Processing | Operational Complexity |
|----------|---------|-------------|---------------------|------------------------|
| Batch ETL (Spark, dbt) | Minutes to hours | Yes | N/A (recomputes) | Medium |
| Kafka Streams | Milliseconds | No (Java/Scala API) | Yes | High (JVM tuning) |
| Apache Flink SQL | Seconds | Yes | Yes | High (cluster management) |
| RisingWave | Milliseconds | Yes (PostgreSQL-compatible) | Yes (materialized views) | Low (single binary) |
RisingWave's differentiator for IoT workloads is the combination of low latency and low operational overhead. You do not need to manage a separate cluster for stream processing, a separate database for serving, or learn a new API. Everything runs in one system, accessed through standard SQL.
What Is Real-Time IoT Data Processing?
Real-time IoT data processing is the continuous ingestion, transformation, and analysis of data from Internet of Things sensors and devices as it is generated, rather than in scheduled batches. A streaming database like RisingWave processes each sensor reading within milliseconds of arrival, computing aggregations, detecting anomalies, and updating dashboards without manual pipeline orchestration.
How Does Streaming SQL Work for IoT Sensor Data?
Streaming SQL applies standard SQL queries to continuously flowing data rather than static tables. In RisingWave, you write CREATE MATERIALIZED VIEW statements with aggregation functions and time windows (like TUMBLE) that automatically update as new sensor readings arrive. This means you use the same SQL syntax you already know from PostgreSQL, but the results stay current in real time without re-running the query.
When Should I Use MQTT vs Kafka for IoT Ingestion?
Use MQTT when your sensors are resource-constrained (limited CPU, memory, or bandwidth) or communicate over unreliable networks. MQTT is designed for lightweight publish-subscribe messaging and excels at edge-to-broker communication. Use Kafka when you need high-throughput, durable, and replayable event logs, typically at the data center or cloud tier. Many production IoT architectures use both: MQTT at the edge feeding into Kafka centrally, with RisingWave ingesting from either or both.
Can RisingWave Replace My Entire IoT Data Pipeline?
RisingWave replaces the stream processing and serving layers of a traditional IoT pipeline. It ingests directly from Kafka and MQTT sources, performs transformations and aggregations via materialized views, and serves query results through its PostgreSQL-compatible interface. You still need a message broker (Kafka or MQTT) for data transport, but you no longer need separate systems for stream processing (like Flink) and result serving (like Redis or PostgreSQL). This reduces the number of components to deploy, monitor, and maintain.
Conclusion
Processing IoT sensor data in real time does not require assembling a fragile chain of specialized tools. With RisingWave, the entire pipeline from ingestion to dashboard reduces to a set of SQL statements:
- Ingest from Kafka and MQTT using `CREATE TABLE ... WITH (connector = ...)`, with no custom consumer code needed
- Aggregate time-series data with `TUMBLE` windows in materialized views that update incrementally as new readings arrive
- Detect anomalies using threshold-based or statistical rules, expressed as materialized views that flag issues within milliseconds
- Serve dashboards directly from materialized views via PostgreSQL-compatible connections to tools like Grafana
- Close the loop by sinking alerts back to MQTT or Kafka for downstream notification systems
The SQL examples in this article were verified against RisingWave 2.8.0 and should run without modification on comparable RisingWave deployments.
Ready to build your IoT data pipeline? The RisingWave Quickstart gets you up and running in five minutes.
Join our Slack community to ask questions and connect with other stream processing developers.

