Building a Virtual Power Plant with Streaming SQL

A Virtual Power Plant (VPP) aggregates thousands of distributed energy resources (rooftop solar, battery storage, smart inverters, and flexible loads) into a single dispatchable entity that can respond to grid signals in seconds. In this tutorial, RisingWave, a PostgreSQL-compatible streaming database, serves as the coordination layer: it continuously aggregates telemetry, tracks available capacity, and triggers dispatch commands in real time.

Why Virtual Power Plants Need Streaming SQL

Traditional SCADA and DER management systems were designed for large centralized assets. A VPP operator managing 10,000 residential batteries and solar-plus-storage systems faces a fundamentally different data problem:

  • High-cardinality telemetry: each DER sends state-of-charge (SoC), available kW, and grid connection status every few seconds.
  • Dynamic availability: a battery that was available for discharge a minute ago may now be in a mandatory charging cycle due to customer preference settings.
  • Tight dispatch windows: frequency response and demand response programs require dispatch execution within sub-second to 10-second deadlines.
  • Settlement accuracy: every dispatched kWh must be metered and reconciled against the grid operator's dispatch signal.

Streaming SQL in RisingWave continuously aggregates available capacity across all DERs, detects grid signals that require dispatch, and publishes dispatch commands—all in a declarative SQL model with no custom stream processing code.

The Streaming SQL Approach

Three sources feed RisingWave: two Kafka topics (DER telemetry and grid dispatch signals, which carry frequency response, demand response, and economic dispatch instructions) and customer preference updates captured from PostgreSQL via CDC. Materialized views maintain a live view of fleet capacity, and a Kafka sink publishes the resulting dispatch commands.

Step-by-Step Tutorial

Step 1: Data Source Setup

-- DER telemetry: battery SoC, available power, grid status
CREATE SOURCE der_telemetry (
    der_id            VARCHAR,
    customer_id       VARCHAR,
    der_type          VARCHAR,   -- BATTERY | SOLAR_INVERTER | EV_CHARGER | SMART_LOAD
    available_kw      DOUBLE PRECISION,   -- positive = export, negative = import
    soc_pct           DOUBLE PRECISION,   -- battery state of charge 0-100
    grid_connected    BOOLEAN,
    inverter_status   VARCHAR,            -- ONLINE | OFFLINE | FAULT | STANDBY
    telemetry_ts      TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'vpp.der.telemetry',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;

-- Grid frequency and dispatch signals from ISO/grid operator
CREATE SOURCE grid_signals (
    signal_id         VARCHAR,
    signal_type       VARCHAR,   -- FREQUENCY_RESPONSE | DEMAND_RESPONSE | ECONOMIC_DISPATCH
    program_id        VARCHAR,
    required_mw       DOUBLE PRECISION,
    direction         VARCHAR,   -- INCREASE | DECREASE
    response_deadline_ts TIMESTAMPTZ,
    signal_ts         TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'grid.dispatch.signals',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;

-- Customer preference constraints (opt-out periods, min SoC)
-- CDC ingestion uses a table with a primary key rather than a plain source,
-- and the connector defines the row format, so no FORMAT ... ENCODE clause applies.
-- Assumption: one preference row per DER, so der_id is the primary key.
CREATE TABLE customer_preferences (
    customer_id       VARCHAR,
    der_id            VARCHAR,
    min_soc_pct       DOUBLE PRECISION,
    max_dispatch_kw   DOUBLE PRECISION,
    opt_out_start     TIMESTAMPTZ,
    opt_out_end       TIMESTAMPTZ,
    updated_at        TIMESTAMPTZ,
    PRIMARY KEY (der_id)
) WITH (
    connector = 'postgres-cdc',
    hostname = 'postgres',
    port = '5432',
    username = 'risingwave',
    password = 'password',
    database.name = 'vpp_platform',
    schema.name = 'public',
    table.name = 'customer_preferences'
);
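
The CDC connector above reads from an existing upstream PostgreSQL table. As a rough sketch of what that side might look like, the statements below create a matching table and prepare the database for logical replication. The column defaults, the sample row, and the choice of der_id as primary key are illustrative assumptions, not part of the original setup; the role name and database simply mirror the connector settings.

```sql
-- Run on the upstream PostgreSQL instance (database vpp_platform), not in RisingWave.

-- Assumed upstream schema: one preference row per DER.
CREATE TABLE IF NOT EXISTS public.customer_preferences (
    der_id           VARCHAR PRIMARY KEY,
    customer_id      VARCHAR NOT NULL,
    min_soc_pct      DOUBLE PRECISION NOT NULL DEFAULT 20,   -- illustrative default
    max_dispatch_kw  DOUBLE PRECISION NOT NULL,
    opt_out_start    TIMESTAMPTZ,                            -- NULL = no opt-out window
    opt_out_end      TIMESTAMPTZ,
    updated_at       TIMESTAMPTZ NOT NULL DEFAULT now()
);

-- Logical decoding must be enabled for CDC; this setting takes effect after a restart.
ALTER SYSTEM SET wal_level = 'logical';

-- The CDC user needs replication rights and read access to the table.
ALTER ROLE risingwave WITH REPLICATION LOGIN;
GRANT SELECT ON public.customer_preferences TO risingwave;

-- Hypothetical sample row: reserve 20% SoC, cap dispatch at 5 kW, no opt-out window.
INSERT INTO public.customer_preferences
    (der_id, customer_id, min_soc_pct, max_dispatch_kw, opt_out_start, opt_out_end)
VALUES
    ('der-0001', 'cust-0001', 20, 5.0, NULL, NULL);
```

Managed PostgreSQL services usually expose the wal_level setting through their own parameter configuration rather than ALTER SYSTEM, so treat that line as a placeholder for whatever mechanism the hosting environment provides.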

Step 2: Core Materialized View

-- Latest telemetry per DER (deduplicated)
CREATE MATERIALIZED VIEW der_current_state AS
SELECT DISTINCT ON (der_id)
    der_id, customer_id, der_type,
    available_kw, soc_pct, grid_connected, inverter_status, telemetry_ts
FROM der_telemetry
ORDER BY der_id, telemetry_ts DESC;

-- Dispatchable capacity: only grid-connected, online DERs above minimum SoC
CREATE MATERIALIZED VIEW dispatchable_capacity AS
SELECT
    d.der_id,
    d.customer_id,
    d.der_type,
    d.available_kw,
    d.soc_pct,
    LEAST(d.available_kw, p.max_dispatch_kw)  AS curtailed_available_kw
FROM der_current_state d
JOIN customer_preferences p ON d.der_id = p.der_id
WHERE d.grid_connected = TRUE
  AND d.inverter_status = 'ONLINE'
  AND d.soc_pct > p.min_soc_pct
  AND (p.opt_out_end IS NULL OR NOW() NOT BETWEEN p.opt_out_start AND p.opt_out_end);

-- Fleet-level capacity summary by DER type
CREATE MATERIALIZED VIEW fleet_capacity_summary AS
SELECT
    der_type,
    COUNT(*)                              AS online_ders,
    SUM(curtailed_available_kw) / 1000.0  AS available_mw,
    AVG(soc_pct)                          AS avg_soc_pct,
    MIN(soc_pct)                          AS min_soc_pct
FROM dispatchable_capacity
GROUP BY der_type;

-- 30-second tumbling window for capacity tracking
CREATE MATERIALIZED VIEW capacity_snapshots AS
SELECT
    window_start,
    window_end,
    SUM(curtailed_available_kw) / 1000.0  AS total_available_mw,
    COUNT(*)                               AS eligible_der_count,
    AVG(soc_pct)                           AS fleet_avg_soc
FROM TUMBLE(
    (SELECT dc.*, dt.telemetry_ts FROM dispatchable_capacity dc
     JOIN der_current_state dt ON dc.der_id = dt.der_id),
    telemetry_ts,
    INTERVAL '30 SECONDS'
)
GROUP BY window_start, window_end;
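
Once these views exist, they can be queried like ordinary tables. The ad hoc query below is a minimal sketch of a pre-dispatch check: it asks whether the fleet could currently cover a request of a given size. The 2.5 MW figure is a hypothetical request, not a value from any program.

```sql
-- Ad hoc query against the fleet summary: can the fleet cover a hypothetical 2.5 MW request?
SELECT
    SUM(available_mw)          AS fleet_available_mw,
    SUM(online_ders)           AS eligible_ders,
    SUM(available_mw) >= 2.5   AS can_cover_request,
    SUM(available_mw) - 2.5    AS headroom_mw      -- negative means a shortfall
FROM fleet_capacity_summary;
```

Because fleet_capacity_summary is maintained incrementally, a query like this reads precomputed results instead of rescanning telemetry.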

Step 3: Dispatch and Alerting Logic

-- Dispatch instruction: match grid signal to top dispatchable DERs
-- (in production, rank by economic bid or SoC preference)
CREATE MATERIALIZED VIEW pending_dispatch_commands AS
SELECT
    gs.signal_id,
    gs.program_id,
    gs.signal_type,
    gs.required_mw,
    gs.direction,
    gs.response_deadline_ts,
    dc.der_id,
    dc.customer_id,
    dc.curtailed_available_kw,
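    gs.signal_ts AS signal_received_at  -- NOW() is restricted to filter clauses in streaming queries, so carry the signal timestamp instead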
FROM grid_signals gs
CROSS JOIN LATERAL (
    SELECT der_id, customer_id, curtailed_available_kw
    FROM dispatchable_capacity
    WHERE available_kw > 0
    ORDER BY soc_pct DESC
    LIMIT 100
) dc
WHERE gs.response_deadline_ts > NOW()
  AND gs.direction = 'DECREASE';  -- export / discharge dispatch

-- Low fleet capacity alert
CREATE MATERIALIZED VIEW low_capacity_alerts AS
SELECT
    window_start,
    total_available_mw,
    eligible_der_count,
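    window_end AS alert_ts  -- NOW() is restricted to filter clauses in streaming queries, so use the window boundary instead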
FROM capacity_snapshots
WHERE total_available_mw < 5.0;  -- threshold in MW

-- Sink dispatch commands to Kafka for DER management system
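-- pending_dispatch_commands is not append-only (rows expire once the deadline passes),
-- so the plain JSON sink is forced to emit inserts only and drop retractions.
CREATE SINK dispatch_command_sink AS
SELECT * FROM pending_dispatch_commands
WITH (
    connector = 'kafka',
    topic = 'vpp.dispatch.commands',
    properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON (
    force_append_only = 'true'
);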
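
The pending_dispatch_commands view above selects up to 100 DERs per signal regardless of how much power the signal asks for. One possible refinement, shown here only as an ad hoc sketch, is a greedy allocation that walks the fleet in descending SoC order and stops once the cumulative capacity covers the request. The 2.5 MW target is a hypothetical value standing in for required_mw, and whether a query of this shape can be maintained as a materialized view depends on the RisingWave version in use.

```sql
-- Greedy allocation sketch: pick the highest-SoC DERs until a hypothetical
-- 2.5 MW (2500 kW) request is covered.
WITH ranked AS (
    SELECT
        der_id,
        customer_id,
        soc_pct,
        curtailed_available_kw,
        -- Running total of capacity in dispatch priority order (der_id breaks ties).
        SUM(curtailed_available_kw) OVER (
            ORDER BY soc_pct DESC, der_id
            ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
        ) AS running_kw
    FROM dispatchable_capacity
    WHERE curtailed_available_kw > 0
)
SELECT der_id, customer_id, soc_pct, curtailed_available_kw, running_kw
FROM ranked
-- Keep a DER if the capacity accumulated before it was still short of the target,
-- which includes the DER that crosses the threshold.
WHERE running_kw - curtailed_available_kw < 2.5 * 1000.0
ORDER BY running_kw;
```

If the fleet cannot cover the request, the query simply returns every eligible DER, and the final running_kw shows how far short the fleet falls.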

Comparison Table

Capability | Legacy DER Management | Custom Event-Driven Code | RisingWave VPP SQL
Fleet capacity aggregation | Polling (minutes) | Seconds (custom code) | Seconds (SQL)
Preference constraint joins | Batch nightly | Requires cache layer | CDC temporal join
Dispatch command latency | Minutes | Sub-second | Sub-second
Developer skill needed | Proprietary APIs | Java/Scala + ops | SQL
Settlement audit trail | Manual export | Custom logging | Iceberg sink

FAQ

Q: Can RisingWave handle 50,000 DERs sending telemetry every 5 seconds?
A: RisingWave is designed for high-throughput streaming workloads. The DISTINCT ON pattern for der_current_state efficiently deduplicates per-device state. For very high cardinality, partition the Kafka topics by DER region and scale RisingWave horizontally.
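
For comparison, the same latest-row-per-device result can be written as a Top-N-per-group query with ROW_NUMBER(). This is a sketch of an alternative formulation, not a replacement the tutorial requires; the view name der_current_state_topn is hypothetical.

```sql
-- Alternative sketch: latest telemetry row per DER using a Top-1-per-group pattern.
CREATE MATERIALIZED VIEW der_current_state_topn AS
SELECT
    der_id, customer_id, der_type,
    available_kw, soc_pct, grid_connected, inverter_status, telemetry_ts
FROM (
    SELECT
        der_id, customer_id, der_type,
        available_kw, soc_pct, grid_connected, inverter_status, telemetry_ts,
        -- Rank each DER's rows from newest to oldest.
        ROW_NUMBER() OVER (PARTITION BY der_id ORDER BY telemetry_ts DESC) AS rn
    FROM der_telemetry
) ranked
WHERE rn = 1;
```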

Q: How do I handle DERs that go offline mid-dispatch?
A: The dispatchable_capacity view filters on grid_connected = TRUE AND inverter_status = 'ONLINE'. When a DER goes offline, its telemetry update propagates through the view within seconds, and the dispatch command generator automatically excludes it from subsequent instructions.
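
One case the status filter alone does not cover is a DER that loses connectivity and stops reporting altogether: its last row still says ONLINE. A freshness check can guard against that. The sketch below assumes a 30-second staleness threshold and a hypothetical view name; both should be tuned to the actual reporting interval.

```sql
-- Sketch: keep only DERs whose latest telemetry is recent.
-- The 30-second threshold is an assumption, not a value from the tutorial.
CREATE MATERIALIZED VIEW fresh_dispatchable_capacity AS
SELECT
    dc.der_id,
    dc.customer_id,
    dc.der_type,
    dc.available_kw,
    dc.soc_pct,
    dc.curtailed_available_kw,
    s.telemetry_ts AS last_seen_ts
FROM dispatchable_capacity dc
JOIN der_current_state s ON dc.der_id = s.der_id
-- Temporal filter: rows drop out of the view as they age past the threshold.
WHERE s.telemetry_ts > NOW() - INTERVAL '30 seconds';
```

Pointing the dispatch logic at a view like this instead of dispatchable_capacity would exclude silent devices as well as those that explicitly report OFFLINE or FAULT.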

Q: How do I reconcile dispatched kWh for settlement purposes?
A: Sink pending_dispatch_commands and the subsequent telemetry confirmations to Iceberg, or to a relational database through a JDBC sink. Then join each dispatch signal to the metered response in a settlement materialized view to compute deviation penalties and performance payments.
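
The join described above might look roughly like the following. Both tables are hypothetical: dispatch_log(signal_id, der_id, commanded_kw, dispatch_start_ts, dispatch_end_ts) stands for the archived dispatch commands, and meter_readings(der_id, delivered_kwh, reading_ts) for interval meter data. Penalty and payment rules are program-specific, so the sketch stops at the energy deviation.

```sql
-- Settlement sketch over two hypothetical tables: dispatch_log and meter_readings.
WITH expected AS (
    SELECT
        signal_id,
        der_id,
        dispatch_start_ts,
        dispatch_end_ts,
        -- Expected energy = commanded power (kW) x dispatch duration (hours).
        commanded_kw
            * EXTRACT(EPOCH FROM (dispatch_end_ts - dispatch_start_ts)) / 3600.0
            AS expected_kwh
    FROM dispatch_log
),
metered AS (
    SELECT
        e.signal_id,
        e.der_id,
        -- DERs with no readings in the window count as zero delivered energy.
        COALESCE(SUM(m.delivered_kwh), 0) AS metered_kwh
    FROM expected e
    LEFT JOIN meter_readings m
           ON m.der_id = e.der_id
          AND m.reading_ts >= e.dispatch_start_ts
          AND m.reading_ts <  e.dispatch_end_ts
    GROUP BY e.signal_id, e.der_id
)
SELECT
    e.signal_id,
    e.der_id,
    e.expected_kwh,
    m.metered_kwh,
    m.metered_kwh - e.expected_kwh AS deviation_kwh   -- negative = under-delivery
FROM expected e
JOIN metered m
  ON m.signal_id = e.signal_id
 AND m.der_id = e.der_id;
```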

Key Takeaways

  • RisingWave can aggregate real-time telemetry from thousands of DERs and compute fleet-wide available capacity with sub-second latency.
  • Temporal joins with CDC-backed customer preference tables enforce opt-out constraints without polling external APIs.
  • Dispatch commands flow from materialized views directly to Kafka, enabling seamless integration with existing DER management systems.
  • The declarative SQL model makes the VPP coordination logic auditable, maintainable, and extendable by any SQL-proficient engineer.
