SCADA data streaming from OPC-UA to real-time SQL means connecting industrial control system telemetry—valve positions, flow meters, pressure transmitters—to a streaming database where engineers query live process data with standard SQL. RisingWave, a PostgreSQL-compatible streaming database, serves as that analytics layer, receiving events from an OPC-UA to Kafka bridge.
Why SCADA Systems Need Real-Time SQL Analytics
SCADA systems have monitored industrial processes for decades. They excel at control—reading sensors, driving actuators, maintaining set points. But their analytics capabilities have always been limited: historian databases with proprietary query interfaces, pre-built trend screens, and batch exports to Excel for deeper analysis.
Modern industrial operations demand more. Process engineers want to correlate valve behavior with downstream pressure changes. Reliability teams want to track slow degradation trends across dozens of instruments. Operations managers want shift performance reports generated automatically, not manually assembled from historian exports.
Real-time SQL analytics on top of SCADA data delivers all of this. Engineers write familiar SQL queries against live process data. Automated reports run as continuously maintained views. Anomalies trigger alerts in real time rather than waiting for the morning walkdown.
How Streaming SQL Solves This
The bridge between OPC-UA and RisingWave is a Kafka producer that registers OPC-UA monitored-item subscriptions and publishes the resulting data-change notifications to Kafka topics. Open-source stacks like node-opcua and Eclipse Milo, as well as commercial industrial connectors from vendors such as Confluent and Prosys, support this pattern.
Once process data flows through Kafka, RisingWave takes over: sources ingest the topics, materialized views compute process KPIs, and sinks forward results to dashboards or notification systems.
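As an illustration, a minimal bridge can be sketched in Python, assuming the asyncua and kafka-python packages; the endpoint URL, node ID, and tag-name mapping below are placeholders, and a production bridge would add reconnection, certificate handling, and batching:

```python
import json
from datetime import datetime, timezone

def build_event(node_id, tag_name, value, source_time,
                quality=192, unit="", data_type="Double"):
    """Shape one data-change notification into the JSON schema the Kafka source ingests."""
    return {
        "node_id": node_id,
        "tag_name": tag_name,
        "value": float(value),
        "quality": quality,  # 192 = Good (OPC DA-style code)
        "server_time": datetime.now(timezone.utc).isoformat(),
        "source_time": source_time.isoformat(),
        "unit": unit,
        "data_type": data_type,
    }

def run_bridge():
    """Subscribe to one OPC-UA node and forward data changes to Kafka."""
    # Third-party dependencies are imported here so build_event stays usable without them.
    import asyncio
    from asyncua import Client          # pip install asyncua
    from kafka import KafkaProducer     # pip install kafka-python

    producer = KafkaProducer(
        bootstrap_servers="kafka:9092",
        value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    )

    class Handler:
        def datachange_notification(self, node, val, data):
            dv = data.monitored_item.Value  # DataValue carrying the source timestamp
            producer.send("opcua-data-changes", build_event(
                node_id=node.nodeid.to_string(),
                tag_name=node.nodeid.to_string(),  # substitute your tag-name mapping
                value=val,
                source_time=dv.SourceTimestamp or datetime.now(timezone.utc),
            ))

    async def main():
        async with Client("opc.tcp://plc-gateway:4840") as client:
            sub = await client.create_subscription(500, Handler())
            node = client.get_node("ns=2;s=Plant1.Area3.FT-101.PV")
            await sub.subscribe_data_change(node)
            while True:
                await asyncio.sleep(1)

    asyncio.run(main())
```

Calling run_bridge() starts the loop; Eclipse Milo offers the same monitored-item pattern on the JVM.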
Step-by-Step Tutorial
Step 1: Connect Your Data Source
OPC-UA data changes arrive in Kafka as JSON with the node ID, value, quality, and server timestamp:
CREATE SOURCE opcua_data_changes (
    node_id VARCHAR,          -- e.g. 'ns=2;s=Plant1.Area3.FT-101.PV'
    tag_name VARCHAR,
    value DOUBLE,
    quality INT,              -- 192 = Good (OPC DA-style quality code, 0xC0)
    server_time TIMESTAMPTZ,
    source_time TIMESTAMPTZ,
    unit VARCHAR,
    data_type VARCHAR
) WITH (
    connector = 'kafka',
    topic = 'opcua-data-changes',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;
Also connect to the tag configuration database for engineering units and ranges:
CREATE TABLE tag_config (
    tag_name VARCHAR PRIMARY KEY,
    description VARCHAR,
    area VARCHAR,
    process_unit VARCHAR,
    eu_low DOUBLE,
    eu_high DOUBLE,
    alarm_high DOUBLE,
    alarm_low DOUBLE
) WITH (
    connector = 'postgres-cdc',
    hostname = 'historian-db.plant',
    port = '5432',
    username = 'analytics',
    password = 'secret',
    database.name = 'scada_config',
    schema.name = 'public',
    table.name = 'tag_config'
);
Step 2: Build the Real-Time View
Create a live process snapshot showing the current value of every tag, enriched with engineering context:
CREATE MATERIALIZED VIEW process_snapshot AS
SELECT DISTINCT ON (d.tag_name)
    d.tag_name,
    t.description,
    t.area,
    t.process_unit,
    d.value,
    t.unit,
    d.quality,
    (d.value - t.eu_low) / NULLIF(t.eu_high - t.eu_low, 0) * 100 AS pct_of_range,
    d.source_time AS last_updated
FROM opcua_data_changes d
LEFT JOIN tag_config t USING (tag_name)
WHERE d.quality = 192  -- Good quality only
ORDER BY d.tag_name, d.source_time DESC;
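For intuition, the pct_of_range arithmetic with a hypothetical transmitter range works out as follows:

```python
def pct_of_range(value, eu_low, eu_high):
    """Scale a raw value to percent of its engineering-unit span."""
    span = eu_high - eu_low
    if span == 0:  # mirrors NULLIF(t.eu_high - t.eu_low, 0) returning NULL
        return None
    return (value - eu_low) / span * 100

# A flow transmitter ranged 0..400 m3/h reading 100 m3/h sits at 25% of range.
print(pct_of_range(100.0, 0.0, 400.0))  # 25.0
```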
Step 3: Window-Based Aggregations
Compute 1-minute statistics for each process tag to support trend analysis and control chart generation:
CREATE MATERIALIZED VIEW tag_stats_1min AS
SELECT
    tag_name,
    window_start,
    window_end,
    AVG(value) AS avg_value,
    MAX(value) AS max_value,
    MIN(value) AS min_value,
    STDDEV(value) AS stddev_value,
    COUNT(*) AS sample_count,
    COUNT(*) FILTER (WHERE quality = 192) AS good_samples
FROM TUMBLE(opcua_data_changes, source_time, INTERVAL '1 MINUTE')
GROUP BY tag_name, window_start, window_end;
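TUMBLE assigns each event to exactly one fixed-width, non-overlapping window keyed on source_time. The bucketing can be reproduced in plain Python (the sample timestamps and values below are made up):

```python
from datetime import datetime, timedelta
from collections import defaultdict

def window_start(ts, width=timedelta(minutes=1)):
    """Floor a timestamp to its tumbling-window start, as TUMBLE does."""
    epoch = datetime(1970, 1, 1)
    return epoch + ((ts - epoch) // width) * width

events = [
    ("FT-101.PV", datetime(2024, 1, 1, 8, 0, 12), 41.0),
    ("FT-101.PV", datetime(2024, 1, 1, 8, 0, 48), 43.0),
    ("FT-101.PV", datetime(2024, 1, 1, 8, 1, 5), 40.0),  # falls in the next window
]

# Group values by (tag, window) exactly as GROUP BY tag_name, window_start does.
buckets = defaultdict(list)
for tag, ts, value in events:
    buckets[(tag, window_start(ts))].append(value)

for (tag, start), values in sorted(buckets.items()):
    print(tag, start, "avg:", sum(values) / len(values), "n:", len(values))
```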
Compute area-level rollups for process unit dashboards:
CREATE MATERIALIZED VIEW area_summary_5min AS
SELECT
    t.area,
    t.process_unit,
    d.window_start,
    d.window_end,
    COUNT(DISTINCT d.tag_name) AS active_tags,
    AVG(d.value) AS fleet_avg,
    COUNT(*) FILTER (WHERE d.quality = 192) AS total_good_samples,
    COUNT(*) AS total_samples,
    ROUND(
        100.0 * (COUNT(*) FILTER (WHERE d.quality = 192)) / NULLIF(COUNT(*), 0), 2
    ) AS data_quality_pct
FROM TUMBLE(opcua_data_changes, source_time, INTERVAL '5 MINUTES') d
JOIN tag_config t USING (tag_name)
GROUP BY t.area, t.process_unit, d.window_start, d.window_end;
Step 4: Alerts and Sinks
Fire process alarms when tag values exceed their configured high/low limits:
CREATE MATERIALIZED VIEW process_alarms AS
SELECT
    d.tag_name,
    t.description,
    t.area,
    d.value,
    t.alarm_high,
    t.alarm_low,
    d.source_time AS alarm_time,
    CASE
        WHEN d.value > t.alarm_high THEN 'HIGH'
        WHEN d.value < t.alarm_low THEN 'LOW'
    END AS alarm_state
FROM opcua_data_changes d
JOIN tag_config t USING (tag_name)
WHERE d.quality = 192
AND (d.value > t.alarm_high OR d.value < t.alarm_low);
CREATE SINK process_alarms_sink
FROM process_alarms
WITH (
    connector = 'kafka',
    topic = 'process-alarms',
    properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;
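On the IT side, any Kafka client can consume the alarm topic. A sketch assuming kafka-python, with a notify callback standing in for whatever paging or chat integration the plant uses:

```python
import json

def format_alarm(event):
    """Render one process_alarms event into a human-readable notification line."""
    return (f"[{event['alarm_state']}] {event['tag_name']} = {event['value']} "
            f"(limits {event['alarm_low']}..{event['alarm_high']}) "
            f"in {event['area']} at {event['alarm_time']}")

def consume_alarms(notify):
    """Forward every alarm event on the sink topic to the notify callback."""
    # kafka-python is an assumed dependency; imported here so format_alarm
    # stays usable without it. Broker and topic names match the sink above.
    from kafka import KafkaConsumer  # pip install kafka-python
    consumer = KafkaConsumer(
        "process-alarms",
        bootstrap_servers="kafka:9092",
        value_deserializer=lambda m: json.loads(m.decode("utf-8")),
    )
    for msg in consumer:
        notify(format_alarm(msg.value))
```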
Comparison Table
| Capability | SCADA Historian | Time-Series DB + Grafana | Streaming SQL (RisingWave) |
|---|---|---|---|
| Query language | Proprietary SQL / REST | InfluxQL / PromQL | Standard PostgreSQL SQL |
| Real-time latency | Seconds to minutes | Seconds | Sub-second |
| Cross-tag correlation | Limited | Tag-by-tag | Full relational SQL joins |
| Alert definition | HMI screen config | Grafana alerting | SQL WHERE clause |
| Integration with IT | Complex adapters | REST APIs | JDBC/ODBC native |
FAQ
What is the recommended OPC-UA to Kafka bridge?
The most commonly used options are Eclipse Milo (Java, open source), node-opcua with a custom Kafka producer, and industrial-grade connectors from vendors such as Confluent or Prosys. The choice depends on which OPC-UA security profiles you need, whether you rely on subscriptions or polling, and the throughput you must sustain.
How do I handle OPC-UA bad quality data?
Filter on quality = 192 (the OPC DA-style Good code that most bridges emit) in your materialized view WHERE clauses. To detect quality degradation over time, track the ratio of good samples to total samples per window, as the data_quality_pct column in area_summary_5min does.
Can RisingWave replace the SCADA historian entirely?
RisingWave is optimized for streaming analytics and real-time query latency. For long-term time-series storage (years of raw samples), a purpose-built historian or time-series database is still appropriate. The recommended pattern is to use RisingWave for live analytics while also sinking to cold storage for historical retention.
Key Takeaways
- An OPC-UA to Kafka bridge is all that is needed to connect industrial SCADA data to RisingWave's streaming SQL engine.
- Materialized views provide a live process snapshot with sub-second freshness, enriched with tag configuration from a CDC-connected database.
- Standard SQL replaces proprietary historian query interfaces, enabling process engineers to use familiar tools.
- Process alarms defined as materialized views automatically sink to Kafka, bridging industrial OT and IT notification systems.

