SCADA Data Streaming: From OPC-UA to Real-Time SQL

SCADA Data Streaming: From OPC-UA to Real-Time SQL

SCADA data streaming from OPC-UA to real-time SQL means connecting industrial control system telemetry—valve positions, flow meters, pressure transmitters—to a streaming database where engineers query live process data with standard SQL. RisingWave, a PostgreSQL-compatible streaming database, serves as that analytics layer, receiving events from an OPC-UA to Kafka bridge.

Why SCADA Systems Need Real-Time SQL Analytics

SCADA systems have monitored industrial processes for decades. They excel at control—reading sensors, driving actuators, maintaining set points. But their analytics capabilities have always been limited: historian databases with proprietary query interfaces, pre-built trend screens, and batch exports to Excel for deeper analysis.

Modern industrial operations demand more. Process engineers want to correlate valve behavior with downstream pressure changes. Reliability teams want to track slow degradation trends across dozens of instruments. Operations managers want shift performance reports generated automatically, not manually assembled from historian exports.

Real-time SQL analytics on top of SCADA data delivers all of this. Engineers write familiar SQL queries against live process data. Automated reports run as continuously maintained views. Anomalies trigger alerts in real time rather than waiting for the morning walkdown.

How Streaming SQL Solves This

The bridge between OPC-UA and RisingWave is a Kafka producer that subscribes to OPC-UA subscriptions and publishes data changes to Kafka topics. Open-source tools like node-opcua, Eclipse Milo, and Confluent's industrial connectors all support this pattern.

Once process data flows through Kafka, RisingWave takes over: sources ingest the topics, materialized views compute process KPIs, and sinks forward results to dashboards or notification systems.

Step-by-Step Tutorial

Step 1: Connect Your Data Source

OPC-UA data changes arrive in Kafka as JSON with the node ID, value, quality, and server timestamp:

CREATE SOURCE opcua_data_changes (
    node_id       VARCHAR,   -- e.g. 'ns=2;s=Plant1.Area3.FT-101.PV'
    tag_name      VARCHAR,
    value         DOUBLE,
    quality       INT,       -- 192 = Good (OPC-UA quality code)
    server_time   TIMESTAMPTZ,
    source_time   TIMESTAMPTZ,
    unit          VARCHAR,
    data_type     VARCHAR
) WITH (
    connector = 'kafka',
    topic = 'opcua-data-changes',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;

Also connect to the tag configuration database for engineering units and ranges:

CREATE SOURCE tag_config (
    tag_name      VARCHAR,
    description   VARCHAR,
    area          VARCHAR,
    process_unit  VARCHAR,
    eu_low        DOUBLE,
    eu_high       DOUBLE,
    alarm_high    DOUBLE,
    alarm_low     DOUBLE
) WITH (
    connector = 'postgres-cdc',
    hostname = 'historian-db.plant',
    port = '5432',
    username = 'analytics',
    password = 'secret',
    database.name = 'scada_config',
    schema.name = 'public',
    table.name = 'tag_config'
);

Step 2: Build the Real-Time View

Create a live process snapshot showing the current value of every tag, enriched with engineering context:

CREATE MATERIALIZED VIEW process_snapshot AS
SELECT DISTINCT ON (d.tag_name)
    d.tag_name,
    t.description,
    t.area,
    t.process_unit,
    d.value,
    t.unit,
    d.quality,
    (d.value - t.eu_low) / NULLIF(t.eu_high - t.eu_low, 0) * 100 AS pct_of_range,
    d.source_time AS last_updated
FROM opcua_data_changes d
LEFT JOIN tag_config t USING (tag_name)
WHERE d.quality = 192  -- Good quality only
ORDER BY d.tag_name, d.source_time DESC;

Step 3: Window-Based Aggregations

Compute 1-minute statistics for each process tag to support trend analysis and control chart generation:

CREATE MATERIALIZED VIEW tag_stats_1min AS
SELECT
    tag_name,
    window_start,
    window_end,
    AVG(value)    AS avg_value,
    MAX(value)    AS max_value,
    MIN(value)    AS min_value,
    STDDEV(value) AS stddev_value,
    COUNT(*)      AS sample_count,
    COUNT(*) FILTER (WHERE quality = 192) AS good_samples
FROM TUMBLE(opcua_data_changes, source_time, INTERVAL '1 MINUTE')
GROUP BY tag_name, window_start, window_end;

Compute area-level rollups for process unit dashboards:

CREATE MATERIALIZED VIEW area_summary_5min AS
SELECT
    t.area,
    t.process_unit,
    s.window_start,
    s.window_end,
    COUNT(DISTINCT s.tag_name)  AS active_tags,
    AVG(s.avg_value)            AS fleet_avg,
    SUM(s.good_samples)         AS total_good_samples,
    SUM(s.sample_count)         AS total_samples,
    ROUND(
        100.0 * SUM(s.good_samples) / NULLIF(SUM(s.sample_count), 0), 2
    )                           AS data_quality_pct
FROM tag_stats_1min s
JOIN tag_config t USING (tag_name)
GROUP BY t.area, t.process_unit, s.window_start, s.window_end;

Step 4: Alerts and Sinks

Fire process alarms when tag values exceed their configured high/low limits:

CREATE MATERIALIZED VIEW process_alarms AS
SELECT
    d.tag_name,
    t.description,
    t.area,
    d.value,
    t.alarm_high,
    t.alarm_low,
    d.source_time  AS alarm_time,
    CASE
        WHEN d.value > t.alarm_high THEN 'HIGH'
        WHEN d.value < t.alarm_low  THEN 'LOW'
    END AS alarm_state
FROM opcua_data_changes d
JOIN tag_config t USING (tag_name)
WHERE d.quality = 192
  AND (d.value > t.alarm_high OR d.value < t.alarm_low);

CREATE SINK process_alarms_sink
FROM process_alarms
WITH (
    connector = 'kafka',
    topic = 'process-alarms',
    properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;

Comparison Table

SCADA HistorianTime-Series DB + GrafanaStreaming SQL (RisingWave)
Query languageProprietary SQL / RESTInfluxQL / PromQLStandard PostgreSQL SQL
Real-time latencySeconds to minutesSecondsSub-second
Cross-tag correlationLimitedTag-by-tagFull relational SQL joins
Alert definitionHMI screen configGrafana alertingSQL WHERE clause
Integration with ITComplex adaptersREST APIsJDBC/ODBC native

FAQ

The most commonly used options are Eclipse Milo (Java, open source), node-opcua with a custom Kafka producer, and industrial-grade connectors from Confluent or Prosys. The choice depends on whether you need OPC-UA security profiles, subscriptions vs. polling, and required throughput.

How do I handle OPC-UA bad quality data?

Filter on quality = 192 (OPC-UA Good quality code) in your materialized view WHERE clauses. For historian-style gap filling, use a HOP window and track good_samples / total_samples to detect quality degradation, as shown in area_summary_5min.

Can RisingWave replace the SCADA historian entirely?

RisingWave is optimized for streaming analytics and real-time query latency. For long-term time-series storage (years of raw samples), a purpose-built historian or time-series database is still appropriate. The recommended pattern is to use RisingWave for live analytics while also sinking to cold storage for historical retention.

Key Takeaways

  • An OPC-UA to Kafka bridge is all that is needed to connect industrial SCADA data to RisingWave's streaming SQL engine.
  • Materialized views provide a live process snapshot with sub-second freshness, enriched with tag configuration from a CDC-connected database.
  • Standard SQL replaces proprietary historian query interfaces, enabling process engineers to use familiar tools.
  • Process alarms defined as materialized views automatically sink to Kafka, bridging industrial OT and IT notification systems.

Best-in-Class Event Streaming
for Agents, Apps, and Analytics
GitHubXLinkedInSlackYouTube
Sign up for our to stay updated.