Building an IIoT Data Pipeline with RisingWave

Industrial IoT data pipelines often become unmaintainable tangles of custom connectors, transformation scripts, and message queues. RisingWave — a PostgreSQL-compatible streaming database — simplifies IIoT pipelines by providing a single SQL layer that ingests, transforms, and aggregates sensor data, then delivers it to downstream systems through Kafka, JDBC, and Iceberg sinks.

Why IIoT Data Pipelines Are Complex

Industrial IoT deployments span a wide range of hardware generations, protocols, and data formats. OPC-UA servers, MQTT brokers, Modbus gateways, and proprietary PLC export formats all coexist in a typical plant network. Getting data from these sources into a form that enterprise analytics tools can consume requires a chain of transformations: protocol normalization, unit conversion, quality filtering, contextual enrichment (mapping sensor IDs to asset names and locations), and temporal aggregation.
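
To make one of those steps concrete: unit conversion is typically a linear scale-and-offset calculation. The figures below are illustrative only: a 4–20 mA transmitter mapped to a 0–100 °C range gives scale_factor = (100 - 0) / (20 - 4) = 6.25 and scale_offset = -25, so a 12 mA reading converts to 50 °C.

-- Illustrative scale-and-offset conversion for a 4-20 mA transmitter
-- mapped to 0-100 degC: eng_value = raw_value * scale_factor + scale_offset
SELECT 12.0 * 6.25 + (-25.0) AS eng_value_celsius;  -- returns 50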

Most organizations build this pipeline incrementally — a Python script here, a Kafka Streams job there, a cron-scheduled dbt model for the warehouse export. Over time, the pipeline becomes a distributed collection of fragile scripts with no unified monitoring, inconsistent error handling, and tribal knowledge spread across three or four engineers. Adding a new sensor type or new downstream consumer requires understanding the entire chain.

The operational cost of maintaining such a pipeline often exceeds the value it delivers. Engineers spend more time debugging broken ingestion jobs than analyzing equipment performance data. When a new factory line is added, replicating the pipeline for the new equipment takes weeks of engineering work.

How RisingWave Simplifies IIoT Pipelines

RisingWave provides a single SQL interface for the entire transformation chain. A CREATE SOURCE statement connects to the Kafka topic where normalized IIoT events land (after a lightweight upstream protocol adapter converts OPC-UA/MQTT/Modbus to JSON). Materialized views perform contextual enrichment through streaming joins against asset master data replicated from PostgreSQL via CDC. Additional views compute aggregations. CREATE SINK statements deliver results to downstream consumers.

The entire pipeline is expressed in SQL DDL statements that are easy to read, version-control, and modify. Adding a new sensor type means adding a few rows to the asset master and potentially a new column in a view — not modifying a Python codebase.
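
For example, registering a new transmitter is a single INSERT into the upstream PostgreSQL asset table; the CDC connection replicates it into RisingWave automatically. All values below are illustrative:

-- Run against the upstream plant_info database (values are illustrative);
-- the CDC table in RisingWave picks up the new row with no pipeline change.
INSERT INTO asset_context
    (device_id, tag_id, canonical_name, engineering_unit,
     scale_factor, scale_offset, equipment_id, area_id, plant_id,
     high_alarm, high_warning, low_warning, low_alarm)
VALUES
    ('plc-17', 'TT-4711', 'Reactor 2 jacket temperature', 'degC',
     6.25, -25.0, 'REACTOR-2', 'AREA-B', 'PLANT-1',
     95.0, 85.0, 15.0, 5.0);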

Building It Step by Step

Step 1: Connect the Data Sources

-- Normalized IIoT events from protocol gateway (OPC-UA/MQTT → Kafka)
CREATE SOURCE iiot_raw_events (
    device_id       VARCHAR,
    tag_id          VARCHAR,
    raw_value       DOUBLE PRECISION,
    raw_unit        VARCHAR,
    quality         VARCHAR,   -- GOOD, UNCERTAIN, BAD, NOT_CONNECTED
    source_protocol VARCHAR,   -- OPCUA, MQTT, MODBUS
    gateway_id      VARCHAR,
    ingestion_ts    TIMESTAMPTZ,
    device_ts       TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'iiot.raw_events',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;

-- Asset context from plant information system (PostgreSQL CDC)
-- RisingWave materializes Postgres CDC data into a table, which must
-- declare the upstream primary key so updates and deletes apply cleanly.
CREATE TABLE asset_context (
    device_id           VARCHAR,
    tag_id              VARCHAR,
    canonical_name      VARCHAR,
    engineering_unit    VARCHAR,
    scale_factor        DOUBLE PRECISION,
    scale_offset        DOUBLE PRECISION,
    equipment_id        VARCHAR,
    area_id             VARCHAR,
    plant_id            VARCHAR,
    high_alarm          DOUBLE PRECISION,
    high_warning        DOUBLE PRECISION,
    low_warning         DOUBLE PRECISION,
    low_alarm           DOUBLE PRECISION,
    PRIMARY KEY (device_id, tag_id)
) WITH (
    connector = 'postgres-cdc',
    hostname = 'pis-db',
    port = '5432',
    username = 'replicator',
    password = 'secret',
    database.name = 'plant_info',
    schema.name = 'public',
    table.name = 'asset_context'
);
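
Because the CDC data lands in a regular table, you can sanity-check the replicated context with an ad-hoc query before building views on top of it:

-- Quick sanity check: did the asset context replicate?
SELECT COUNT(*) AS tag_count, COUNT(DISTINCT equipment_id) AS equipment_count
FROM asset_context;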

Step 2: Build the Core Materialized View

-- Enriched and scaled engineering values
CREATE MATERIALIZED VIEW iiot_engineering_values AS
SELECT
    r.device_id,
    r.tag_id,
    a.canonical_name,
    -- Apply scale and offset: engineering_value = raw_value * scale_factor + scale_offset
    r.raw_value * COALESCE(a.scale_factor, 1.0) + COALESCE(a.scale_offset, 0.0) AS eng_value,
    a.engineering_unit,
    r.quality,
    r.source_protocol,
    a.equipment_id,
    a.area_id,
    a.plant_id,
    a.high_alarm,
    a.high_warning,
    a.low_warning,
    a.low_alarm,
    r.device_ts                   AS measurement_ts,
    r.ingestion_ts,
    -- Latency between device measurement and platform ingestion
    EXTRACT(EPOCH FROM (r.ingestion_ts - r.device_ts)) AS ingestion_lag_sec
FROM iiot_raw_events r
LEFT JOIN asset_context a
    ON  a.device_id = r.device_id
    AND a.tag_id    = r.tag_id
WHERE r.quality = 'GOOD';
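
Materialized views are queryable as soon as they are created, so you can eyeball the enriched stream directly from psql:

-- Ad-hoc check: the ten most recent enriched readings
SELECT canonical_name, eng_value, engineering_unit, measurement_ts
FROM iiot_engineering_values
ORDER BY measurement_ts DESC
LIMIT 10;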

-- Latest value per tag (for current-value dashboard)
CREATE MATERIALIZED VIEW iiot_current_values AS
SELECT
    tag_id,
    canonical_name,
    eng_value,
    engineering_unit,
    equipment_id,
    area_id,
    plant_id,
    high_alarm,
    high_warning,
    low_warning,
    low_alarm,
    measurement_ts,
    CASE
        WHEN eng_value >= high_alarm   THEN 'HIGH_ALARM'
        WHEN eng_value >= high_warning THEN 'HIGH_WARNING'
        WHEN eng_value <= low_alarm    THEN 'LOW_ALARM'
        WHEN eng_value <= low_warning  THEN 'LOW_WARNING'
        ELSE 'NORMAL'
    END AS status
FROM (
    -- Top-1 per group: keep only the most recent reading for each tag
    SELECT *,
           ROW_NUMBER() OVER (PARTITION BY tag_id ORDER BY measurement_ts DESC) AS rn
    FROM iiot_engineering_values
) t
WHERE rn = 1;
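
A dashboard backend can now read current values with a plain SELECT. For instance, everything outside normal range in one area (the area ID is illustrative):

-- Out-of-range tags in one area (area ID is illustrative)
SELECT canonical_name, eng_value, engineering_unit, status
FROM iiot_current_values
WHERE area_id = 'AREA-B' AND status <> 'NORMAL';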

Step 3: Add Alerts and Aggregations

-- Active alarms and warnings: tags currently outside their normal range
CREATE MATERIALIZED VIEW active_alarms AS
SELECT
    tag_id,
    canonical_name,
    equipment_id,
    area_id,
    plant_id,
    eng_value,
    engineering_unit,
    status,
    measurement_ts AS alarm_start_ts
FROM iiot_current_values
WHERE status IN ('HIGH_ALARM', 'LOW_ALARM', 'HIGH_WARNING', 'LOW_WARNING');
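
An operations console can poll this view directly; a quick roll-up shows where alarm load is concentrated:

-- Alarm and warning counts per area, most affected first
SELECT area_id, COUNT(*) AS active_count
FROM active_alarms
GROUP BY area_id
ORDER BY active_count DESC;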

-- 15-minute aggregated metrics per tag (for historian / data lake).
-- Aggregates the full enriched stream rather than the latest-value view,
-- so every sample in the window is counted; threshold breaches are
-- recomputed here because the enriched view carries no status column.
CREATE MATERIALIZED VIEW iiot_15min_metrics AS
SELECT
    window_start,
    window_end,
    equipment_id,
    area_id,
    plant_id,
    canonical_name,
    engineering_unit,
    AVG(eng_value)         AS avg_value,
    MIN(eng_value)         AS min_value,
    MAX(eng_value)         AS max_value,
    STDDEV_SAMP(eng_value) AS std_value,
    COUNT(*)               AS sample_count,
    COUNT(*) FILTER (WHERE eng_value >= high_alarm) AS high_alarm_count,
    COUNT(*) FILTER (WHERE eng_value <= low_alarm)  AS low_alarm_count
FROM TUMBLE(iiot_engineering_values, measurement_ts, INTERVAL '15 MINUTES')
GROUP BY window_start, window_end, equipment_id, area_id, plant_id,
         canonical_name, engineering_unit;
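
The aggregate view also doubles as a short-horizon query surface while the Iceberg sink (Step 4) handles long-term storage. For a hypothetical pump:

-- Last hour of 15-minute aggregates for one asset (equipment ID is illustrative)
SELECT window_start, avg_value, min_value, max_value, sample_count
FROM iiot_15min_metrics
WHERE equipment_id = 'PUMP-101'
ORDER BY window_start DESC
LIMIT 4;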

-- Data quality metrics: ingestion lag and bad-quality rate per gateway.
-- Built on the raw stream (the enriched view drops non-GOOD readings),
-- so the lag is computed inline rather than reused from the enriched view.
CREATE MATERIALIZED VIEW gateway_quality_metrics AS
SELECT
    window_start,
    window_end,
    gateway_id,
    source_protocol,
    COUNT(*)                                          AS total_events,
    COUNT(*) FILTER (WHERE quality = 'BAD')           AS bad_events,
    COUNT(*) FILTER (WHERE quality = 'NOT_CONNECTED') AS disconnected_events,
    AVG(EXTRACT(EPOCH FROM (ingestion_ts - device_ts))) AS avg_ingestion_lag_sec,
    MAX(EXTRACT(EPOCH FROM (ingestion_ts - device_ts))) AS max_ingestion_lag_sec
FROM TUMBLE(iiot_raw_events, ingestion_ts, INTERVAL '5 MINUTES')
GROUP BY window_start, window_end, gateway_id, source_protocol;
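
An on-call engineer can query this view to spot struggling gateways without touching any per-script logs:

-- Which gateways are falling behind or emitting bad readings?
SELECT gateway_id, source_protocol, avg_ingestion_lag_sec, bad_events
FROM gateway_quality_metrics
WHERE window_start >= NOW() - INTERVAL '1 hour'
ORDER BY avg_ingestion_lag_sec DESC;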

Step 4: Sink Results Downstream

-- Stream active alarms to plant operations notification system.
-- The alarm view updates in place, so the sink uses UPSERT semantics
-- keyed on tag_id; a cleared alarm becomes a tombstone on the topic.
CREATE SINK active_alarm_sink
FROM active_alarms
WITH (
    connector = 'kafka',
    topic = 'iiot.alarms.active',
    properties.bootstrap.server = 'kafka:9092',
    primary_key = 'tag_id'
) FORMAT UPSERT ENCODE JSON;

-- Write 15-minute metrics to Iceberg for long-term historian.
-- force_append_only drops retractions as in-progress windows update;
-- for strict one-row-per-window output, define a watermark on the
-- source and declare the view with EMIT ON WINDOW CLOSE instead.
CREATE SINK historian_sink
FROM iiot_15min_metrics
WITH (
    connector = 'iceberg',
    type = 'append-only',
    force_append_only = 'true',
    catalog.type = 'rest',
    catalog.uri = 'http://iceberg-catalog:8181',
    database.name = 'iiot_historian',
    table.name = 'equipment_metrics_15min'
);

-- Write current values to JDBC for SCADA / HMI integration.
-- Upsert keyed on tag_id keeps exactly one row per tag downstream.
CREATE SINK current_values_sink
FROM iiot_current_values
WITH (
    connector = 'jdbc',
    jdbc.url = 'jdbc:postgresql://scada-db:5432/iiot',
    table.name = 'tag_current_values',
    type = 'upsert',
    primary_key = 'tag_id'
);
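
Sinks run continuously once created. A quick way to confirm all three are registered:

-- List registered sinks (should show active_alarm_sink,
-- historian_sink, and current_values_sink)
SHOW SINKS;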

How This Compares to Traditional Approaches

Aspect                   | Custom Script Pipeline     | Streaming SQL (RisingWave)
-------------------------|----------------------------|-------------------------------------
Maintainability          | Fragile, tribal knowledge  | Declarative SQL DDL
Contextual enrichment    | Custom join logic          | Streaming join with CDC table
Unit conversion          | Hardcoded per script       | Scale factor in asset context table
Adding new sensor types  | Modify multiple scripts    | Update asset context table
Monitoring               | Per-script logging         | SQL quality metrics view
Downstream delivery      | Custom adapters            | Kafka + JDBC + Iceberg sinks

FAQ

What is an IIoT data pipeline?

An IIoT (Industrial Internet of Things) data pipeline is the infrastructure that moves sensor data from shop-floor devices to analytics, monitoring, and control systems. It typically includes protocol adaptation, data normalization, quality filtering, contextual enrichment, aggregation, and delivery to multiple downstream consumers.

How does RisingWave handle protocol diversity in IIoT environments?

RisingWave ingests from Kafka, which acts as the universal message bus. An upstream protocol gateway (lightweight adapters for OPC-UA, MQTT, Modbus) converts proprietary protocols to JSON events on Kafka topics. RisingWave then handles all transformation and routing using SQL.

Can I integrate RisingWave with my existing stack?

Yes. RisingWave connects to Kafka (source and sink), PostgreSQL via CDC, and MySQL via CDC. It writes to JDBC databases, Kafka topics, and Iceberg tables, making it compatible with SCADA systems, CMMS platforms, data lakes, and BI tools.

How do I handle the time offset between device clock and server clock?

The ingestion_lag_sec column in iiot_engineering_values tracks this offset. You can filter out readings with excessive lag (indicating clock drift or network delay) and build a gateway quality view to monitor lag trends per gateway.
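
A minimal sketch of such a filter, assuming a 60-second tolerance (the view name and threshold are illustrative):

-- Keep only readings whose device timestamp is within 60 s of ingestion time;
-- negative lag means the device clock runs ahead of the platform clock.
CREATE MATERIALIZED VIEW iiot_trusted_values AS
SELECT *
FROM iiot_engineering_values
WHERE ingestion_lag_sec BETWEEN -60 AND 60;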

Key Takeaways

  • Custom IIoT pipeline scripts accumulate into unmaintainable systems; RisingWave replaces them with declarative SQL DDL that is readable, versionable, and modifiable.
  • Streaming joins against the asset context CDC table handle contextual enrichment (unit conversion, alarm thresholds, equipment mapping) without a separate ETL step.
  • The same pipeline delivers real-time alarms to operations systems, 15-minute metrics to a long-term historian, and current values to SCADA — from a single set of SQL materialized views.
  • Data quality monitoring (gateway lag, bad-quality rates) is built into the pipeline as SQL views, eliminating the need for a separate monitoring tool.

Ready to try this? Get started with RisingWave. Join our Slack community.
