Building an Energy Management System with Streaming SQL

Building an energy management system (EMS) with streaming SQL means using a PostgreSQL-compatible streaming database as the real-time data layer—continuously ingesting grid telemetry, maintaining live state estimates, and surfacing actionable operator intelligence—without the complex custom software stacks that traditional EMS platforms require.

Why This Matters for Energy Operators

Traditional Energy Management Systems are monolithic, vendor-locked platforms built on proprietary databases and closed APIs. They served the grid conditions of the 1990s and 2000s well, but they are increasingly strained by:

  • Higher data rates: Phasor measurement units (PMUs) and distributed sensors generate orders of magnitude more data than conventional RTU polling delivers
  • New resource types: Distributed generation, battery storage, and flexible loads that change state far more frequently than conventional generators
  • Integration demands: ISO/RTO interfaces, market systems, weather feeds, and forecasting platforms that need to exchange data with the EMS continuously
  • Analytics expectations: Operations teams expect near-real-time KPIs, not daily reports

A streaming SQL layer does not replace the EMS—it modernizes the data infrastructure beneath it, enabling real-time visibility and analytics that the EMS core cannot provide on its own.

How Streaming SQL Works for Energy Data

RisingWave, a PostgreSQL-compatible streaming database, sits between the data acquisition layer (SCADA, PMU, AMI) and the applications layer (operator displays, optimization engines, market systems). It continuously ingests telemetry, maintains live state aggregations as materialized views, and serves data to consuming applications over standard Postgres connections. The result: an always-fresh, queryable representation of grid state.
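Because the streaming layer speaks the Postgres wire protocol, any standard driver (psycopg2, asyncpg, JDBC) can read the live views. A minimal polling-client sketch — the cursor is injected so the logic works with any DB-API driver, and the view and column names follow the definitions built in Step 2 below:

```python
# Sketch: read the freshest row of a live materialized view over a standard
# Postgres connection. The cursor is any DB-API cursor; no RisingWave-specific
# client is needed.

BALANCE_QUERY = (
    "SELECT zone_id, window_end, net_mw, avg_frequency_hz "
    "FROM area_power_balance "
    "WHERE zone_id = %s ORDER BY window_end DESC LIMIT 1"
)

def latest_balance(cursor, zone_id):
    """Return the most recent power-balance row for one zone as a dict."""
    cursor.execute(BALANCE_QUERY, (zone_id,))
    row = cursor.fetchone()
    if row is None:
        return None
    cols = ("zone_id", "window_end", "net_mw", "avg_frequency_hz")
    return dict(zip(cols, row))
```

Injecting the cursor keeps the query logic testable without a running server; in production the cursor comes from an ordinary `psycopg2.connect(...)` call.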

Building the System: Step by Step

Step 1: Connect the Data Source

Ingest SCADA measurements and PMU synchrophasor data:

-- SCADA analog measurements
CREATE SOURCE scada_analogs (
    rtu_id          VARCHAR,
    measurement_id  VARCHAR,
    bus_id          VARCHAR,
    zone_id         VARCHAR,
    meas_type       VARCHAR,   -- 'MW', 'MVAR', 'KV', 'AMP', 'HZ'
    value           DOUBLE PRECISION,
    quality_flag    INTEGER,
    ts              TIMESTAMPTZ
) WITH (
    connector     = 'kafka',
    topic         = 'scada.analogs',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;

-- Unit dispatch signals and responses
CREATE SOURCE unit_dispatch (
    unit_id         VARCHAR,
    zone_id         VARCHAR,
    dispatch_mw     DOUBLE PRECISION,
    actual_mw       DOUBLE PRECISION,
    agc_mode        VARCHAR,
    dispatch_ts     TIMESTAMPTZ
) WITH (
    connector     = 'kafka',
    topic         = 'ems.dispatch',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;

Step 2: Build Real-Time Aggregations

Compute area control error (ACE), generation-load balance, and reserve margins:

-- Area-level generation-load balance
-- (sign convention: generation injections positive, load draws negative)
CREATE MATERIALIZED VIEW area_power_balance AS
SELECT
    zone_id,
    window_end,
    SUM(CASE WHEN meas_type = 'MW' AND value > 0 THEN value ELSE 0 END) AS generation_mw,
    SUM(CASE WHEN meas_type = 'MW' AND value < 0 THEN ABS(value) ELSE 0 END) AS load_mw,
    SUM(CASE WHEN meas_type = 'MW' THEN value ELSE 0 END) AS net_mw,
    AVG(CASE WHEN meas_type = 'HZ' THEN value END) AS avg_frequency_hz
FROM TUMBLE(scada_analogs, ts, INTERVAL '10' SECOND)
WHERE quality_flag = 0   -- keep only good-quality telemetry
GROUP BY zone_id, window_end;

-- Unit dispatch tracking: scheduled vs. actual
CREATE MATERIALIZED VIEW dispatch_tracking AS
SELECT
    unit_id,
    zone_id,
    window_end,
    AVG(dispatch_mw)                          AS scheduled_mw,
    AVG(actual_mw)                            AS actual_mw,
    AVG(actual_mw) - AVG(dispatch_mw)         AS deviation_mw,
    ABS(AVG(actual_mw) - AVG(dispatch_mw))    AS abs_deviation_mw
FROM TUMBLE(unit_dispatch, dispatch_ts, INTERVAL '1' MINUTE)
GROUP BY unit_id, zone_id, window_end;
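The views above use `net_mw` as a fast proxy for area imbalance. The standard ACE definition adds a frequency-bias term to the interchange error; if tie-line schedules and the bias constant are available as inputs, the full calculation is a one-liner. A sketch in the common NERC form — the bias value of -25 MW/0.1 Hz is purely illustrative:

```python
def area_control_error(ni_actual_mw, ni_sched_mw, freq_hz,
                       bias_mw_per_0p1hz=-25.0, freq_sched_hz=60.0):
    """ACE = (NIa - NIs) - 10*B*(Fa - Fs).

    B is the frequency bias in MW per 0.1 Hz (negative by convention);
    the default here is an illustrative assumption, not a real area's value.
    """
    interchange_error = ni_actual_mw - ni_sched_mw
    frequency_term = 10.0 * bias_mw_per_0p1hz * (freq_hz - freq_sched_hz)
    return interchange_error - frequency_term
```

The same arithmetic could be folded into the materialized view as an extra expression once schedule and bias data are ingested as sources.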

Step 3: Detect Anomalies and Generate Alerts

Generate ACE alerts, frequency deviation warnings, and unit non-compliance signals:

CREATE MATERIALIZED VIEW ems_alerts AS
SELECT
    b.zone_id,
    b.window_end       AS alert_time,
    b.net_mw,
    b.avg_frequency_hz,
    d.unit_id,
    d.deviation_mw,
    CASE
        WHEN ABS(b.net_mw) > 50             THEN 'HIGH_ACE'
        WHEN b.avg_frequency_hz < 59.95
          OR b.avg_frequency_hz > 60.05     THEN 'FREQUENCY_DEVIATION'
        WHEN d.abs_deviation_mw > 20        THEN 'DISPATCH_NON_COMPLIANCE'
        ELSE NULL
    END AS alert_type,
    CASE
        WHEN ABS(b.net_mw) > 100            THEN 'CRITICAL'
        WHEN ABS(b.net_mw) > 50             THEN 'WARNING'
        WHEN b.avg_frequency_hz < 59.95
          OR b.avg_frequency_hz > 60.05     THEN 'WARNING'
        WHEN d.abs_deviation_mw > 20        THEN 'WARNING'
        ELSE 'NORMAL'
    END AS severity
-- Note: the 10-second balance windows align with the 1-minute dispatch
-- windows only at minute boundaries; the LEFT JOIN keeps balance alerts
-- even when no dispatch row matches.
FROM area_power_balance b
LEFT JOIN dispatch_tracking d
    ON b.zone_id = d.zone_id
    AND b.window_end = d.window_end
WHERE
    ABS(b.net_mw) > 50
    OR b.avg_frequency_hz < 59.95
    OR b.avg_frequency_hz > 60.05
    OR d.abs_deviation_mw > 20;
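Before changing thresholds in production, the CASE logic can be unit-tested by mirroring it in application code. A sketch — the thresholds are copied from the view, which remains the single source of truth:

```python
def classify_alert(net_mw, freq_hz, abs_deviation_mw=None):
    """Mirror of the ems_alerts CASE expressions: (alert_type, severity)."""
    freq_out_of_band = freq_hz is not None and (freq_hz < 59.95 or freq_hz > 60.05)
    dispatch_violation = abs_deviation_mw is not None and abs_deviation_mw > 20

    if abs(net_mw) > 50:
        alert_type = "HIGH_ACE"
    elif freq_out_of_band:
        alert_type = "FREQUENCY_DEVIATION"
    elif dispatch_violation:
        alert_type = "DISPATCH_NON_COMPLIANCE"
    else:
        alert_type = None

    if abs(net_mw) > 100:
        severity = "CRITICAL"
    elif abs(net_mw) > 50 or freq_out_of_band or dispatch_violation:
        severity = "WARNING"
    else:
        severity = "NORMAL"
    return alert_type, severity
```

Keeping a plain-code mirror like this lets threshold changes be reviewed and regression-tested before the SQL view is redeployed.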

Step 4: Integrate with SCADA/EMS Downstream

Send EMS alerts to the AGC system and operator notification platform:

CREATE SINK ems_alert_sink
FROM ems_alerts
WITH (
    connector = 'kafka',
    topic     = 'ems.alerts',
    properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;

-- Expose current area state to optimization engine via materialized view
-- (queried directly over Postgres connection by the dispatch optimizer)
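Downstream of the sink, a consumer on `ems.alerts` can route messages by severity — e.g. CRITICAL to an on-call paging path, WARNING to operator dashboards. A sketch of just the routing logic (channel names are placeholders; the Kafka consumer loop is omitted):

```python
import json

# Placeholder channel names -- substitute the real notification targets.
ROUTES = {"CRITICAL": "oncall-page", "WARNING": "operator-dashboard"}

def route_alert(raw_message):
    """Parse one ems.alerts JSON message and pick a notification channel."""
    alert = json.loads(raw_message)
    channel = ROUTES.get(alert.get("severity"), "log-only")
    return channel, alert.get("alert_type"), alert.get("zone_id")
```

Because the sink emits plain JSON, the same routing function works whether the consumer is a Kafka client, a webhook bridge, or a test harness.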

Comparison: Legacy EMS vs. Streaming SQL Layer

Capability                         | Legacy EMS Architecture      | Streaming SQL Layer
-----------------------------------|------------------------------|--------------------------------
State estimate refresh             | Every 2–5 minutes            | Continuous (seconds)
ACE and frequency visibility       | EMS console only             | Any Postgres-compatible client
Dispatch deviation tracking        | Manual log review            | Live materialized view
New KPI development                | Vendor customization request | New SQL view
Integration with external systems  | Proprietary protocols        | Standard Kafka + Postgres
Data retention and replay          | Historian (separate system)  | Built-in with Kafka replay
Multi-zone aggregation             | Custom EMS configuration     | SQL GROUP BY

FAQ

Does adding a streaming SQL layer require changes to the existing EMS software? No. The streaming SQL layer sits alongside the EMS, consuming the same data feeds via Kafka. It does not intercept or modify EMS control actions. Operator displays and analytics tools query the streaming layer for enhanced visibility; the EMS continues to handle control and state estimation independently.

How does streaming SQL complement the EMS's own state estimation (SE) function? The EMS state estimator runs a full network model to compute system state from redundant measurements. Streaming SQL complements this by providing fast, approximate aggregations useful for monitoring and alerting—not a full SE solution, but an early-warning layer that surfaces anomalies before the SE cycle completes.

Can streaming SQL replace the historian for data archival? Not directly—historians are optimized for time-series storage and retrieval of millions of tags. However, RisingWave can serve as an intermediary: computing derived time series (aggregated KPIs, deviation metrics) and storing them for medium-term querying, while raw tag data continues to flow to the historian for long-term archival.

Key Takeaways

  • A streaming SQL layer modernizes EMS data infrastructure without replacing the existing EMS platform or requiring vendor customization.
  • Continuously maintained materialized views for ACE, frequency, and dispatch deviation give operators faster awareness than EMS polling cycles alone.
  • PostgreSQL compatibility opens EMS data to any BI tool, dashboard, or analytics application using standard database connections.
  • Kafka sources and sinks integrate bidirectionally with SCADA, AGC, market systems, and external APIs.
  • New KPIs, alert thresholds, and data integrations are SQL view changes, not vendor change requests.
