Industrial IoT data pipelines often become unmaintainable tangles of custom connectors, transformation scripts, and message queues. RisingWave — a PostgreSQL-compatible streaming database — simplifies IIoT pipelines by providing a single SQL layer that ingests, transforms, aggregates, and delivers sensor data to any downstream system.
Why IIoT Data Pipelines Are Complex
Industrial IoT deployments span a wide range of hardware generations, protocols, and data formats. OPC-UA servers, MQTT brokers, Modbus gateways, and proprietary PLC export formats all coexist in a typical plant network. Getting data from these sources into a form that enterprise analytics tools can consume requires a chain of transformations: protocol normalization, unit conversion, quality filtering, contextual enrichment (mapping sensor IDs to asset names and locations), and temporal aggregation.
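Condensed to its per-reading core, that chain is quality filtering, scale/offset conversion, and enrichment with asset metadata. A minimal sketch (field names like `scale_factor` and `canonical_name` mirror the asset schema used later in this article; values are illustrative):

```python
# Sketch of the per-reading transform chain: quality filter, unit
# conversion (raw * scale + offset), and contextual enrichment.
def to_engineering_value(reading, asset):
    """Return an enriched engineering-unit reading, or None if quality is bad."""
    if reading["quality"] != "GOOD":
        return None  # drop UNCERTAIN / BAD / NOT_CONNECTED samples
    eng = (reading["raw_value"] * asset.get("scale_factor", 1.0)
           + asset.get("scale_offset", 0.0))
    return {
        "tag": asset["canonical_name"],      # contextual enrichment
        "value": eng,
        "unit": asset["engineering_unit"],
    }

raw = {"raw_value": 812.0, "quality": "GOOD"}
asset = {"canonical_name": "Boiler-1/SteamTemp", "engineering_unit": "degC",
         "scale_factor": 0.1, "scale_offset": -20.0}
print(to_engineering_value(raw, asset))  # 812.0 * 0.1 - 20.0 = 61.2 degC
```

Every custom pipeline reimplements some version of this function per protocol and per script; the rest of this article moves it into declarative SQL instead.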
Most organizations build this pipeline incrementally — a Python script here, a Kafka Streams job there, a cron-scheduled dbt model for the warehouse export. Over time, the pipeline becomes a distributed collection of fragile scripts with no unified monitoring, inconsistent error handling, and tribal knowledge spread across three or four engineers. Adding a new sensor type or new downstream consumer requires understanding the entire chain.
The operational cost of maintaining such a pipeline often exceeds the value it delivers. Engineers spend more time debugging broken ingestion jobs than analyzing equipment performance data. When a new factory line is added, replicating the pipeline for the new equipment takes weeks of engineering work.
How RisingWave Simplifies IIoT Pipelines
RisingWave provides a single SQL interface for the entire transformation chain. A CREATE SOURCE statement connects to the Kafka topic where normalized IIoT events land (after a lightweight upstream protocol adapter converts OPC-UA/MQTT/Modbus to JSON). Materialized views perform contextual enrichment by joining the event stream against asset master data replicated from PostgreSQL via CDC. Additional views compute aggregations, and CREATE SINK statements deliver the results to downstream consumers.
The entire pipeline is expressed in SQL DDL statements that are easy to read, version-control, and modify. Adding a new sensor type means adding a few rows to the asset master and potentially a new column in a view — not modifying a Python codebase.
Building It Step by Step
Step 1: Create the Data Source
```sql
-- Normalized IIoT events from protocol gateway (OPC-UA/MQTT → Kafka)
CREATE SOURCE iiot_raw_events (
    device_id VARCHAR,
    tag_id VARCHAR,
    raw_value DOUBLE PRECISION,
    raw_unit VARCHAR,
    quality VARCHAR,          -- GOOD, UNCERTAIN, BAD, NOT_CONNECTED
    source_protocol VARCHAR,  -- OPCUA, MQTT, MODBUS
    gateway_id VARCHAR,
    ingestion_ts TIMESTAMPTZ,
    device_ts TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'iiot.raw_events',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;
```
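For reference, a single event on the `iiot.raw_events` topic would look like this (field values are illustrative):

```json
{
  "device_id": "plc-17",
  "tag_id": "AI_0043",
  "raw_value": 812.0,
  "raw_unit": "counts",
  "quality": "GOOD",
  "source_protocol": "OPCUA",
  "gateway_id": "gw-east-02",
  "ingestion_ts": "2024-05-01T08:30:02.150Z",
  "device_ts": "2024-05-01T08:30:01.900Z"
}
```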
```sql
-- Asset context from plant information system (PostgreSQL CDC).
-- CDC data lands in a table with a primary key so upstream updates
-- and deletes can be applied.
CREATE TABLE asset_context (
    device_id VARCHAR,
    tag_id VARCHAR,
    canonical_name VARCHAR,
    engineering_unit VARCHAR,
    scale_factor DOUBLE PRECISION,
    scale_offset DOUBLE PRECISION,
    equipment_id VARCHAR,
    area_id VARCHAR,
    plant_id VARCHAR,
    high_alarm DOUBLE PRECISION,
    high_warning DOUBLE PRECISION,
    low_warning DOUBLE PRECISION,
    low_alarm DOUBLE PRECISION,
    PRIMARY KEY (device_id, tag_id)
) WITH (
    connector = 'postgres-cdc',
    hostname = 'pis-db',
    port = '5432',
    username = 'replicator',
    password = 'secret',
    database.name = 'plant_info',
    schema.name = 'public',
    table.name = 'asset_context'
);
```
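For context, a row in the upstream `plant_info.public.asset_context` table might look like this (all values illustrative):

```sql
INSERT INTO public.asset_context
    (device_id, tag_id, canonical_name, engineering_unit,
     scale_factor, scale_offset, equipment_id, area_id, plant_id,
     high_alarm, high_warning, low_warning, low_alarm)
VALUES
    ('plc-17', 'AI_0043', 'Boiler-1/SteamTemp', 'degC',
     0.1, -20.0, 'boiler-1', 'area-a', 'plant-hh',
     95.0, 90.0, 40.0, 35.0);
```

Maintaining scale factors and alarm limits here, rather than in pipeline code, is what makes adding a sensor a data change instead of a code change.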
Step 2: Build the Core Materialized View
```sql
-- Enriched and scaled engineering values
CREATE MATERIALIZED VIEW iiot_engineering_values AS
SELECT
    r.device_id,
    r.tag_id,
    a.canonical_name,
    -- Apply scale and offset: engineering_value = raw_value * scale_factor + scale_offset
    r.raw_value * COALESCE(a.scale_factor, 1.0) + COALESCE(a.scale_offset, 0.0) AS eng_value,
    a.engineering_unit,
    r.quality,
    r.source_protocol,
    a.equipment_id,
    a.area_id,
    a.plant_id,
    a.high_alarm,
    a.high_warning,
    a.low_warning,
    a.low_alarm,
    r.device_ts AS measurement_ts,
    r.ingestion_ts,
    -- Latency between device measurement and platform ingestion
    EXTRACT(EPOCH FROM (r.ingestion_ts - r.device_ts)) AS ingestion_lag_sec
FROM iiot_raw_events r
LEFT JOIN asset_context a
    ON a.device_id = r.device_id
    AND a.tag_id = r.tag_id
WHERE r.quality = 'GOOD';
```
```sql
-- Latest value per tag (for current-value dashboard).
-- "Latest row per group" is expressed with the ROW_NUMBER() Top-N pattern.
CREATE MATERIALIZED VIEW iiot_current_values AS
SELECT
    tag_id,
    canonical_name,
    eng_value,
    engineering_unit,
    equipment_id,
    area_id,
    plant_id,
    high_alarm,
    high_warning,
    low_warning,
    low_alarm,
    measurement_ts,
    CASE
        WHEN eng_value >= high_alarm THEN 'HIGH_ALARM'
        WHEN eng_value >= high_warning THEN 'HIGH_WARNING'
        WHEN eng_value <= low_alarm THEN 'LOW_ALARM'
        WHEN eng_value <= low_warning THEN 'LOW_WARNING'
        ELSE 'NORMAL'
    END AS status
FROM (
    SELECT
        *,
        ROW_NUMBER() OVER (PARTITION BY tag_id ORDER BY measurement_ts DESC) AS rn
    FROM iiot_engineering_values
) t
WHERE rn = 1;
```
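Because RisingWave speaks the PostgreSQL wire protocol, any Postgres client can read this view directly; for example, a dashboard might poll (the equipment ID is illustrative):

```sql
SELECT canonical_name, eng_value, engineering_unit, status
FROM iiot_current_values
WHERE equipment_id = 'boiler-1'
ORDER BY canonical_name;
```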
Step 3: Add Alerts and Aggregations
```sql
-- Active alarms: tags currently in an alarm or warning state
CREATE MATERIALIZED VIEW active_alarms AS
SELECT
    tag_id,
    canonical_name,
    equipment_id,
    area_id,
    plant_id,
    eng_value,
    engineering_unit,
    status,
    measurement_ts AS alarm_start_ts  -- timestamp of the latest reading in this state
FROM iiot_current_values
WHERE status IN ('HIGH_ALARM', 'LOW_ALARM', 'HIGH_WARNING', 'LOW_WARNING');
```
```sql
-- 15-minute aggregated metrics per equipment (for historian / data lake).
-- Aggregate over the full engineering-value stream, not the latest-value
-- view, so every sample in the window is counted.
CREATE MATERIALIZED VIEW iiot_15min_metrics AS
SELECT
    window_start,
    window_end,
    equipment_id,
    area_id,
    plant_id,
    canonical_name,
    engineering_unit,
    AVG(eng_value) AS avg_value,
    MIN(eng_value) AS min_value,
    MAX(eng_value) AS max_value,
    STDDEV_SAMP(eng_value) AS std_value,
    COUNT(*) AS sample_count,
    COUNT(*) FILTER (WHERE eng_value >= high_alarm) AS high_alarm_count,
    COUNT(*) FILTER (WHERE eng_value <= low_alarm) AS low_alarm_count
FROM TUMBLE(iiot_engineering_values, measurement_ts, INTERVAL '15 MINUTES')
GROUP BY window_start, window_end, equipment_id, area_id, plant_id,
         canonical_name, engineering_unit;
```
```sql
-- Data quality metrics: ingestion lag and bad-quality rate per gateway.
-- Computed from the raw stream so BAD / NOT_CONNECTED events are included,
-- with the lag derived inline from the two timestamps.
CREATE MATERIALIZED VIEW gateway_quality_metrics AS
SELECT
    window_start,
    window_end,
    gateway_id,
    source_protocol,
    COUNT(*) AS total_events,
    COUNT(*) FILTER (WHERE quality = 'BAD') AS bad_events,
    COUNT(*) FILTER (WHERE quality = 'NOT_CONNECTED') AS disconnected_events,
    AVG(EXTRACT(EPOCH FROM (ingestion_ts - device_ts))) AS avg_ingestion_lag_sec,
    MAX(EXTRACT(EPOCH FROM (ingestion_ts - device_ts))) AS max_ingestion_lag_sec
FROM TUMBLE(iiot_raw_events, ingestion_ts, INTERVAL '5 MINUTES')
GROUP BY window_start, window_end, gateway_id, source_protocol;
```
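A monitoring query on top of this view can then flag unhealthy gateways, e.g. those where more than 5% of events arrive with BAD quality (the threshold is illustrative):

```sql
SELECT
    gateway_id,
    source_protocol,
    bad_events::DOUBLE PRECISION / total_events AS bad_rate,
    avg_ingestion_lag_sec
FROM gateway_quality_metrics
WHERE bad_events::DOUBLE PRECISION / total_events > 0.05
ORDER BY bad_rate DESC;
```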
Step 4: Sink Results Downstream
```sql
-- Stream active alarms to plant operations notification system.
-- active_alarms is an updating view, so force append-only output events.
CREATE SINK active_alarm_sink
FROM active_alarms
WITH (
    connector = 'kafka',
    topic = 'iiot.alarms.active',
    properties.bootstrap.server = 'kafka:9092',
    force_append_only = 'true'
) FORMAT PLAIN ENCODE JSON;
```
```sql
-- Write 15-minute metrics to Iceberg for long-term historian.
-- force_append_only is needed because the aggregation updates in place
-- until each window closes.
CREATE SINK historian_sink
FROM iiot_15min_metrics
WITH (
    connector = 'iceberg',
    type = 'append-only',
    force_append_only = 'true',
    catalog.type = 'rest',
    catalog.uri = 'http://iceberg-catalog:8181',
    database.name = 'iiot_historian',
    table.name = 'equipment_metrics_15min'
);
```
```sql
-- Write current values to JDBC for SCADA / HMI integration.
-- Upsert on tag_id so the target table always holds one row per tag.
CREATE SINK current_values_sink
FROM iiot_current_values
WITH (
    connector = 'jdbc',
    jdbc.url = 'jdbc:postgresql://scada-db:5432/iiot',
    table.name = 'tag_current_values',
    type = 'upsert',
    primary_key = 'tag_id'
);
```
How This Compares to Traditional Approaches
| Aspect | Custom Script Pipeline | Streaming SQL (RisingWave) |
| --- | --- | --- |
| Maintainability | Fragile, tribal knowledge | Declarative SQL DDL |
| Contextual enrichment | Custom join logic | Streaming JOIN against CDC table |
| Unit conversion | Hardcoded per-script | Scale factor in asset context table |
| Adding new sensor types | Modify multiple scripts | Update asset context table |
| Monitoring | Per-script logging | SQL quality metrics view |
| Downstream delivery | Custom adapters | Kafka + JDBC + Iceberg sinks |
FAQ
What is an IIoT data pipeline?
An IIoT (Industrial Internet of Things) data pipeline is the infrastructure that moves sensor data from shop-floor devices to analytics, monitoring, and control systems. It typically includes protocol adaptation, data normalization, quality filtering, contextual enrichment, aggregation, and delivery to multiple downstream consumers.
How does RisingWave handle protocol diversity in IIoT environments?
RisingWave ingests from Kafka, which acts as the universal message bus. An upstream protocol gateway (lightweight adapters for OPC-UA, MQTT, Modbus) converts proprietary protocols to JSON events on Kafka topics. RisingWave then handles all transformation and routing using SQL.
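As a sketch of what such a gateway adapter can look like, here is a minimal MQTT-to-Kafka forwarder. It assumes the paho-mqtt and kafka-python client libraries, and the topic layout and payload field names (`v`, `u`, `q`, `ts`) are illustrative, not a standard:

```python
import json

# Pure mapping from an MQTT message to the normalized event schema.
# Keeping this a pure function makes the adapter easy to unit-test.
def mqtt_to_event(topic, payload, gateway_id):
    data = json.loads(payload)
    # Assumed topic layout: "plant/<line>/<device_id>/<tag_id>"
    device_id, tag_id = topic.split("/")[-2:]
    return {
        "device_id": device_id,
        "tag_id": tag_id,
        "raw_value": float(data["v"]),
        "raw_unit": data.get("u", ""),
        "quality": data.get("q", "GOOD"),
        "source_protocol": "MQTT",
        "gateway_id": gateway_id,
        "ingestion_ts": data["ts"],  # a real gateway would stamp its own clock
        "device_ts": data["ts"],
    }

def run_gateway():
    """Wire the mapper to a broker and Kafka; call this to start forwarding."""
    import paho.mqtt.client as mqtt
    from kafka import KafkaProducer

    producer = KafkaProducer(
        bootstrap_servers="kafka:9092",
        value_serializer=lambda e: json.dumps(e).encode(),
    )

    def on_message(client, userdata, msg):
        event = mqtt_to_event(msg.topic, msg.payload, gateway_id="gw-east-02")
        producer.send("iiot.raw_events", event)

    client = mqtt.Client()
    client.on_message = on_message
    client.connect("mqtt-broker", 1883)
    client.subscribe("plant/#")
    client.loop_forever()
```

The adapter stays deliberately dumb: no unit conversion, no filtering, no enrichment. All of that lives downstream in SQL, so the gateway never needs redeploying when the plant model changes.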
Can I integrate RisingWave with my existing stack?
Yes. RisingWave connects to Kafka (source and sink), PostgreSQL via CDC, and MySQL via CDC. It writes to JDBC databases, Kafka topics, and Iceberg tables, making it compatible with SCADA systems, CMMS platforms, data lakes, and BI tools.
How do I handle the time offset between device clock and server clock?
The ingestion_lag_sec column in iiot_engineering_values tracks this offset. You can filter out readings with excessive lag (indicating clock drift or network delay) and build a gateway quality view to monitor lag trends per gateway.
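For example, a view for control-room displays could exclude stale or clock-skewed readings (the 60-second threshold and view name are illustrative):

```sql
CREATE MATERIALIZED VIEW iiot_fresh_values AS
SELECT *
FROM iiot_engineering_values
WHERE ingestion_lag_sec BETWEEN 0 AND 60;  -- negative lag implies clock drift
```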
Key Takeaways
- Custom IIoT pipeline scripts accumulate into unmaintainable systems; RisingWave replaces them with declarative SQL DDL that is readable, versionable, and modifiable.
- Streaming joins against the CDC-replicated asset context table handle contextual enrichment (unit conversion, alarm thresholds, equipment mapping) without a separate ETL step.
- The same pipeline delivers real-time alarms to operations systems, 15-minute metrics to a long-term historian, and current values to SCADA — from a single set of SQL materialized views.
- Data quality monitoring (gateway lag, bad-quality rates) is built into the pipeline as SQL views, eliminating the need for a separate monitoring tool.
Ready to try this? Get started with RisingWave. Join our Slack community.

