Building an energy management system (EMS) with streaming SQL means using a PostgreSQL-compatible streaming database as the real-time data layer—continuously ingesting grid telemetry, maintaining live state estimates, and surfacing actionable operator intelligence—without the complex custom software stacks that traditional EMS platforms require.
Why This Matters for Energy Operators
Traditional Energy Management Systems are monolithic, vendor-locked platforms built on proprietary databases and closed APIs. They served the grid conditions of the 1990s and 2000s well but are increasingly strained by:
- Higher data rates: Phasor Measurement Units and distributed sensors generate orders of magnitude more data than RTU polling
- New resource types: Distributed generation, battery storage, and flexible loads that change state far more frequently than conventional generators
- Integration demands: ISO/RTO interfaces, market systems, weather feeds, and forecasting platforms that need to exchange data with the EMS continuously
- Analytics expectations: Operations teams who expect near-real-time KPIs, not daily reports
A streaming SQL layer does not replace the EMS—it modernizes the data infrastructure beneath it, enabling real-time visibility and analytics that the EMS core cannot provide on its own.
How Streaming SQL Works for Energy Data
RisingWave, a PostgreSQL-compatible streaming database, sits between the data acquisition layer (SCADA, PMU, AMI) and the applications layer (operator displays, optimization engines, market systems). It continuously ingests telemetry, maintains live state aggregations as materialized views, and serves data to consuming applications over standard Postgres connections. The result: an always-fresh, queryable representation of grid state.
Building the System: Step by Step
Step 1: Connect the Data Source
Ingest SCADA measurements and PMU synchrophasor data:
-- SCADA analog measurements
CREATE SOURCE scada_analogs (
rtu_id VARCHAR,
measurement_id VARCHAR,
bus_id VARCHAR,
zone_id VARCHAR,
meas_type VARCHAR, -- 'MW', 'MVAR', 'KV', 'AMP', 'HZ'
value DOUBLE PRECISION,
quality_flag INTEGER,
ts TIMESTAMPTZ
) WITH (
connector = 'kafka',
topic = 'scada.analogs',
properties.bootstrap.server = 'kafka:9092',
scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;
-- Unit dispatch signals and responses
CREATE SOURCE unit_dispatch (
unit_id VARCHAR,
zone_id VARCHAR,
dispatch_mw DOUBLE PRECISION,
actual_mw DOUBLE PRECISION,
agc_mode VARCHAR,
dispatch_ts TIMESTAMPTZ
) WITH (
connector = 'kafka',
topic = 'ems.dispatch',
properties.bootstrap.server = 'kafka:9092',
scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;
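A producer feeding the `scada.analogs` topic emits JSON records shaped like the `scada_analogs` source schema above. A minimal sketch of constructing one such record (the IDs and values are illustrative, not taken from a real RTU):

```python
import json
from datetime import datetime, timezone

# One SCADA analog measurement, shaped to match the scada_analogs source
# schema. All field values below are illustrative.
reading = {
    "rtu_id": "RTU-114",
    "measurement_id": "M-0042",
    "bus_id": "BUS-7",
    "zone_id": "ZONE-A",
    "meas_type": "MW",
    "value": 412.5,
    "quality_flag": 0,  # 0 = good quality, matching the filter used downstream
    "ts": datetime.now(timezone.utc).isoformat(),
}

# Serialize as the FORMAT PLAIN ENCODE JSON source expects one record per message.
payload = json.dumps(reading).encode("utf-8")
```

Any Kafka client can publish `payload` to `scada.analogs`; the source definition handles the rest.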
Step 2: Build Real-Time Aggregations
Compute real-time generation-load balance (a fast proxy for area control error) and track scheduled-versus-actual unit dispatch:
-- Area-level generation-load balance
CREATE MATERIALIZED VIEW area_power_balance AS
SELECT
zone_id,
window_end,
SUM(CASE WHEN meas_type = 'MW' AND value > 0 THEN value ELSE 0 END) AS generation_mw, -- generation reported as positive MW
SUM(CASE WHEN meas_type = 'MW' AND value < 0 THEN ABS(value) ELSE 0 END) AS load_mw, -- load reported as negative MW
SUM(CASE WHEN meas_type = 'MW' THEN value ELSE 0 END) AS net_mw,
AVG(CASE WHEN meas_type = 'HZ' THEN value END) AS avg_frequency_hz
FROM TUMBLE(scada_analogs, ts, INTERVAL '10' SECOND)
WHERE quality_flag = 0 -- keep only good-quality measurements
GROUP BY zone_id, window_end;
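The tumbling-window aggregation that this view maintains incrementally can be mimicked in ordinary code. The following sketch buckets timestamped MW readings into 10-second windows and sums them per zone, which is exactly what the `net_mw` column computes:

```python
from collections import defaultdict

def tumble_net_mw(readings, window_s=10):
    """Bucket (zone_id, epoch_ts, mw) readings into tumbling windows and
    sum net MW per (zone, window_end) -- a batch analogue of the
    area_power_balance view's net_mw column."""
    sums = defaultdict(float)
    for zone, ts, mw in readings:
        # window_end is the exclusive upper edge of the window containing ts
        window_end = (int(ts) // window_s + 1) * window_s
        sums[(zone, window_end)] += mw
    return dict(sums)

readings = [
    ("ZONE-A", 0, 400.0),   # generation (positive MW)
    ("ZONE-A", 4, -380.0),  # load (negative MW), same window
    ("ZONE-A", 12, 405.0),  # falls in the next 10-second window
]
balance = tumble_net_mw(readings)
# balance[("ZONE-A", 10)] -> 20.0; balance[("ZONE-A", 20)] -> 405.0
```

The difference in the streaming database is that this result is never recomputed from scratch: each arriving reading updates the affected window in place.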
-- Unit dispatch tracking: scheduled vs. actual
CREATE MATERIALIZED VIEW dispatch_tracking AS
SELECT
unit_id,
zone_id,
window_end,
AVG(dispatch_mw) AS scheduled_mw,
AVG(actual_mw) AS actual_mw,
AVG(actual_mw) - AVG(dispatch_mw) AS deviation_mw,
ABS(AVG(actual_mw) - AVG(dispatch_mw)) AS abs_deviation_mw
FROM TUMBLE(unit_dispatch, dispatch_ts, INTERVAL '1' MINUTE)
GROUP BY unit_id, zone_id, window_end;
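Note that `net_mw` above is only a rough balance proxy. A true area control error follows the standard tie-line bias formula, ACE = (NIA − NIS) − 10B(FA − FS). A minimal sketch of that calculation (the bias value is illustrative, not a real control-area setting):

```python
def area_control_error(ni_actual_mw, ni_sched_mw, f_actual_hz,
                       f_sched_hz=60.0, bias_mw_per_tenth_hz=-25.0):
    """Standard tie-line bias ACE:
        ACE = (NIA - NIS) - 10 * B * (FA - FS)
    B is the frequency bias in MW per 0.1 Hz and is negative by convention,
    so under-frequency increases the area's correction obligation.
    The default bias here is illustrative only."""
    return (ni_actual_mw - ni_sched_mw) - 10.0 * bias_mw_per_tenth_hz * (f_actual_hz - f_sched_hz)

# Over-exporting 30 MW while frequency sags to 59.98 Hz:
ace = area_control_error(ni_actual_mw=130.0, ni_sched_mw=100.0, f_actual_hz=59.98)
```

With scheduled net interchange available as another source, the same formula is expressible as one more materialized view joining interchange and frequency windows.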
Step 3: Detect Anomalies and Generate Alerts
Generate ACE alerts, frequency deviation warnings, and unit non-compliance signals:
CREATE MATERIALIZED VIEW ems_alerts AS
SELECT
b.zone_id,
b.window_end AS alert_time,
b.net_mw,
b.avg_frequency_hz,
d.unit_id,
d.deviation_mw,
CASE
WHEN ABS(b.net_mw) > 50 THEN 'HIGH_ACE'
WHEN b.avg_frequency_hz < 59.95
OR b.avg_frequency_hz > 60.05 THEN 'FREQUENCY_DEVIATION'
WHEN d.abs_deviation_mw > 20 THEN 'DISPATCH_NON_COMPLIANCE'
ELSE NULL
END AS alert_type,
CASE
WHEN ABS(b.net_mw) > 100 THEN 'CRITICAL'
WHEN ABS(b.net_mw) > 50 THEN 'WARNING'
WHEN d.abs_deviation_mw > 20 THEN 'WARNING'
ELSE 'NORMAL'
END AS severity
FROM area_power_balance b
LEFT JOIN dispatch_tracking d
ON b.zone_id = d.zone_id
AND b.window_end = d.window_end
WHERE
ABS(b.net_mw) > 50
OR b.avg_frequency_hz < 59.95
OR b.avg_frequency_hz > 60.05
OR d.abs_deviation_mw > 20;
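The CASE logic above maps directly to an ordinary classification function, which is convenient for unit-testing thresholds before encoding them in SQL. A sketch mirroring the view's thresholds (the function itself is illustrative, not part of the deployed view):

```python
def classify_alert(net_mw, freq_hz, abs_deviation_mw=None):
    """Mirror of the ems_alerts CASE expressions: returns
    (alert_type, severity), or None when no threshold is crossed."""
    # alert_type: same precedence order as the SQL CASE
    if abs(net_mw) > 50:
        alert = "HIGH_ACE"
    elif freq_hz < 59.95 or freq_hz > 60.05:
        alert = "FREQUENCY_DEVIATION"
    elif abs_deviation_mw is not None and abs_deviation_mw > 20:
        alert = "DISPATCH_NON_COMPLIANCE"
    else:
        return None
    # severity: same precedence order as the SQL CASE
    if abs(net_mw) > 100:
        severity = "CRITICAL"
    elif abs(net_mw) > 50 or (abs_deviation_mw is not None and abs_deviation_mw > 20):
        severity = "WARNING"
    else:
        severity = "NORMAL"
    return alert, severity
```

Running a table of historical edge cases through such a function is a cheap way to validate threshold changes before updating the materialized view.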
Step 4: Integrate with SCADA/EMS Downstream
Send EMS alerts to the AGC system and operator notification platform:
CREATE SINK ems_alert_sink
FROM ems_alerts
WITH (
connector = 'kafka',
topic = 'ems.alerts',
properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;
-- Expose current area state to optimization engine via materialized view
-- (queried directly over Postgres connection by the dispatch optimizer)
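A downstream consumer of the `ems.alerts` topic receives the same JSON the sink emits. A minimal sketch of handling one record (the Kafka client itself is omitted; the record shape matches the `ems_alerts` view, while the routing targets are an assumption for illustration):

```python
import json

def handle_alert(raw: bytes):
    """Decode one ems.alerts record and decide where to route it.
    The routing targets are illustrative; an AGC interface or operator
    paging system would replace the returned strings."""
    alert = json.loads(raw)
    if alert["severity"] == "CRITICAL":
        return ("page_operator", alert["zone_id"], alert["alert_type"])
    return ("dashboard_only", alert["zone_id"], alert["alert_type"])

# Example record, shaped like a row of the ems_alerts view:
raw = json.dumps({
    "zone_id": "ZONE-A",
    "alert_time": "2024-06-01T12:00:10Z",
    "net_mw": 130.0,
    "avg_frequency_hz": 59.97,
    "unit_id": None,
    "deviation_mw": None,
    "alert_type": "HIGH_ACE",
    "severity": "CRITICAL",
}).encode("utf-8")
```

The optimization engine, by contrast, needs no Kafka consumer at all: it reads `area_power_balance` directly over a Postgres connection.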
Comparison: Legacy EMS vs Streaming SQL
| Capability | Legacy EMS Architecture | Streaming SQL Layer |
| --- | --- | --- |
| State estimate refresh | Every 2–5 minutes | Continuous (seconds) |
| ACE and frequency visibility | EMS console only | Any Postgres-compatible client |
| Dispatch deviation tracking | Manual log review | Live materialized view |
| New KPI development | Vendor customization request | New SQL view |
| Integration with external systems | Proprietary protocols | Standard Kafka + Postgres |
| Data retention and replay | Historian (separate system) | Built-in with Kafka replay |
| Multi-zone aggregation | Custom EMS configuration | SQL GROUP BY |
FAQ
Does adding a streaming SQL layer require changes to the existing EMS software? No. The streaming SQL layer sits alongside the EMS, consuming the same data feeds via Kafka. It does not intercept or modify EMS control actions. Operator displays and analytics tools query the streaming layer for enhanced visibility; the EMS continues to handle control and state estimation independently.
How does streaming SQL complement the EMS's own state estimation (SE) function? The EMS state estimator runs a full network model to compute system state from redundant measurements. Streaming SQL complements this by providing fast, approximate aggregations useful for monitoring and alerting—not a full SE solution, but an early-warning layer that surfaces anomalies before the SE cycle completes.
Can streaming SQL replace the historian for data archival? Not directly—historians are optimized for time-series storage and retrieval of millions of tags. However, RisingWave can serve as an intermediary: computing derived time series (aggregated KPIs, deviation metrics) and storing them for medium-term querying, while raw tag data continues to flow to the historian for long-term archival.
Key Takeaways
- A streaming SQL layer modernizes EMS data infrastructure without replacing the existing EMS platform or requiring vendor customization.
- Continuously maintained materialized views for ACE, frequency, and dispatch deviation give operators faster awareness than EMS polling cycles alone.
- PostgreSQL compatibility opens EMS data to any BI tool, dashboard, or analytics application using standard database connections.
- Kafka sources and sinks integrate bidirectionally with SCADA, AGC, market systems, and external APIs.
- New KPIs, alert thresholds, and data integrations are SQL view changes, not vendor change requests.