A Virtual Power Plant (VPP) aggregates thousands of distributed energy resources—rooftop solar, battery storage, smart inverters, and flexible loads—into a single dispatchable entity that can respond to grid signals in seconds. RisingWave, a PostgreSQL-compatible streaming database, is the coordination layer that continuously aggregates telemetry, tracks available capacity, and triggers dispatch commands in real time.
Why Virtual Power Plants Need Streaming SQL
Traditional SCADA and DER management systems were designed for large centralized assets. A VPP operator managing 10,000 residential batteries and solar-plus-storage systems faces a fundamentally different data problem:
- High-cardinality telemetry: each DER sends state-of-charge (SoC), available kW, and grid connection status every few seconds.
- Dynamic availability: a battery that was available for discharge a minute ago may now be in a mandatory charging cycle due to customer preference settings.
- Tight dispatch windows: frequency response and demand response programs require dispatch execution within sub-second to 10-second windows.
- Settlement accuracy: every dispatched kWh must be metered and reconciled against the grid operator's dispatch signal.
Streaming SQL in RisingWave continuously aggregates available capacity across all DERs, detects grid signals that require dispatch, and publishes dispatch commands—all in a declarative SQL model with no custom stream processing code.
The Streaming SQL Approach
Three sources feed RisingWave: two Kafka topics carrying DER telemetry and grid dispatch signals (frequency response, demand response, and economic dispatch), plus customer preference updates captured from Postgres via CDC. Materialized views maintain a live view of fleet capacity and issue dispatch commands via a Kafka sink.
Step-by-Step Tutorial
Step 1: Data Source Setup
-- DER telemetry: battery SoC, available power, grid status
CREATE SOURCE der_telemetry (
der_id VARCHAR,
customer_id VARCHAR,
der_type VARCHAR, -- BATTERY | SOLAR_INVERTER | EV_CHARGER | SMART_LOAD
available_kw DOUBLE PRECISION, -- positive = export, negative = import
soc_pct DOUBLE PRECISION, -- battery state of charge 0-100
grid_connected BOOLEAN,
inverter_status VARCHAR, -- ONLINE | OFFLINE | FAULT | STANDBY
telemetry_ts TIMESTAMPTZ
) WITH (
connector = 'kafka',
topic = 'vpp.der.telemetry',
properties.bootstrap.server = 'kafka:9092',
scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;
-- Grid frequency and dispatch signals from ISO/grid operator
CREATE SOURCE grid_signals (
signal_id VARCHAR,
signal_type VARCHAR, -- FREQUENCY_RESPONSE | DEMAND_RESPONSE | ECONOMIC_DISPATCH
program_id VARCHAR,
required_mw DOUBLE PRECISION,
direction VARCHAR, -- INCREASE | DECREASE
response_deadline_ts TIMESTAMPTZ,
signal_ts TIMESTAMPTZ
) WITH (
connector = 'kafka',
topic = 'grid.dispatch.signals',
properties.bootstrap.server = 'kafka:9092',
scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;
-- Customer preference constraints (opt-out periods, min SoC)
-- Postgres CDC is ingested with CREATE TABLE and a primary key, and takes no FORMAT ... ENCODE clause.
-- Assumption: der_id is the primary key of the upstream Postgres table.
CREATE TABLE customer_preferences (
customer_id VARCHAR,
der_id VARCHAR,
min_soc_pct DOUBLE PRECISION,
max_dispatch_kw DOUBLE PRECISION,
opt_out_start TIMESTAMPTZ,
opt_out_end TIMESTAMPTZ,
updated_at TIMESTAMPTZ,
PRIMARY KEY (der_id)
) WITH (
connector = 'postgres-cdc',
hostname = 'postgres',
port = '5432',
username = 'risingwave',
password = 'password',
database.name = 'vpp_platform',
schema.name = 'public',
table.name = 'customer_preferences'
);
Step 2: Core Materialized View
-- Latest telemetry per DER (deduplicated)
CREATE MATERIALIZED VIEW der_current_state AS
SELECT DISTINCT ON (der_id)
der_id, customer_id, der_type,
available_kw, soc_pct, grid_connected, inverter_status, telemetry_ts
FROM der_telemetry
ORDER BY der_id, telemetry_ts DESC;
-- Dispatchable capacity: only grid-connected, online DERs above minimum SoC
CREATE MATERIALIZED VIEW dispatchable_capacity AS
SELECT
d.der_id,
d.customer_id,
d.der_type,
d.available_kw,
d.soc_pct,
LEAST(d.available_kw, p.max_dispatch_kw) AS curtailed_available_kw
FROM der_current_state d
JOIN customer_preferences p ON d.der_id = p.der_id
WHERE d.grid_connected = TRUE
AND d.inverter_status = 'ONLINE'
AND d.soc_pct > p.min_soc_pct
AND (p.opt_out_end IS NULL OR NOW() NOT BETWEEN p.opt_out_start AND p.opt_out_end);
-- Fleet-level capacity summary by DER type
CREATE MATERIALIZED VIEW fleet_capacity_summary AS
SELECT
der_type,
COUNT(*) AS online_ders,
SUM(curtailed_available_kw) / 1000.0 AS available_mw,
AVG(soc_pct) AS avg_soc_pct,
MIN(soc_pct) AS min_soc_pct
FROM dispatchable_capacity
GROUP BY der_type;
-- 30-second tumbling window for capacity tracking
CREATE MATERIALIZED VIEW capacity_snapshots AS
SELECT
window_start,
window_end,
SUM(curtailed_available_kw) / 1000.0 AS total_available_mw,
COUNT(*) AS eligible_der_count,
AVG(soc_pct) AS fleet_avg_soc
FROM TUMBLE(
(SELECT dc.*, dt.telemetry_ts FROM dispatchable_capacity dc
JOIN der_current_state dt ON dc.der_id = dt.der_id),
telemetry_ts,
INTERVAL '30 SECONDS'
)
GROUP BY window_start, window_end;
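Because RisingWave is PostgreSQL-compatible, the materialized views above can be read with ordinary queries from psql or any Postgres client. The following is a minimal sketch of how an operator dashboard might read fleet capacity, assuming the Step 2 views were created exactly as written:

```sql
-- Current dispatchable capacity per DER type, largest contributor first
SELECT der_type, online_ders, available_mw, avg_soc_pct
FROM fleet_capacity_summary
ORDER BY available_mw DESC;

-- Fleet-wide total across all DER types
SELECT SUM(available_mw) AS fleet_available_mw
FROM fleet_capacity_summary;
```

Each query returns whatever state the view holds at the moment it runs, so no aggregation work happens at read time.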
Step 3: Dispatch and Alerting Logic
-- Dispatch instruction: match grid signal to top dispatchable DERs
-- (in production, rank by economic bid or SoC preference)
CREATE MATERIALIZED VIEW pending_dispatch_commands AS
SELECT
gs.signal_id,
gs.program_id,
gs.signal_type,
gs.required_mw,
gs.direction,
gs.response_deadline_ts,
dc.der_id,
dc.customer_id,
dc.curtailed_available_kw,
NOW() AS command_created_at
FROM grid_signals gs
CROSS JOIN LATERAL (
SELECT der_id, customer_id, curtailed_available_kw
FROM dispatchable_capacity
WHERE available_kw > 0
ORDER BY soc_pct DESC
LIMIT 100
) dc
WHERE gs.response_deadline_ts > NOW()
AND gs.direction = 'DECREASE'; -- export / discharge dispatch
-- Low fleet capacity alert
CREATE MATERIALIZED VIEW low_capacity_alerts AS
SELECT
window_start,
total_available_mw,
eligible_der_count,
NOW() AS alert_ts
FROM capacity_snapshots
WHERE total_available_mw < 5.0; -- threshold in MW
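-- Sink dispatch commands to Kafka for DER management system
-- The upstream view is not append-only, so FORMAT PLAIN needs force_append_only:
-- updates are emitted as new messages and deletes are dropped.
CREATE SINK dispatch_command_sink AS
SELECT * FROM pending_dispatch_commands
WITH (
connector = 'kafka',
topic = 'vpp.dispatch.commands',
properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON (
force_append_only = 'true'
);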
Comparison Table
| Capability | Legacy DER Management | Custom Event-Driven Code | RisingWave VPP SQL |
| --- | --- | --- | --- |
| Fleet capacity aggregation | Polling (minutes) | Seconds (custom code) | Seconds (SQL) |
| Preference constraint joins | Batch nightly | Requires cache layer | Streaming join on CDC table |
| Dispatch command latency | Minutes | Sub-second | Sub-second |
| Developer skill needed | Proprietary APIs | Java/Scala + ops | SQL |
| Settlement audit trail | Manual export | Custom logging | Iceberg sink |
FAQ
Q: Can RisingWave handle 50,000 DERs sending telemetry every 5 seconds?
A: That workload is about 10,000 telemetry messages per second (50,000 DERs ÷ 5 seconds). RisingWave is designed for high-throughput streaming workloads, and the DISTINCT ON pattern for der_current_state keeps only the latest row of state per device. For very high cardinality, partition the Kafka topics by DER region and scale RisingWave horizontally.
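The same latest-row-per-device result can also be written with ROW_NUMBER(), the window-function form commonly used for per-group Top-N queries. This is a sketch of an alternative formulation, not a measured optimization; the view name der_current_state_topn is hypothetical and chosen only to avoid clashing with the view from Step 2.

```sql
-- Alternative to DISTINCT ON: keep the newest telemetry row per DER
-- (der_current_state_topn is a hypothetical name for illustration)
CREATE MATERIALIZED VIEW der_current_state_topn AS
SELECT
    der_id, customer_id, der_type,
    available_kw, soc_pct, grid_connected, inverter_status, telemetry_ts
FROM (
    SELECT
        *,
        ROW_NUMBER() OVER (
            PARTITION BY der_id
            ORDER BY telemetry_ts DESC
        ) AS rn
    FROM der_telemetry
) AS ranked
WHERE rn = 1;
```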
Q: How do I handle DERs that go offline mid-dispatch?
A: The dispatchable_capacity view filters on grid_connected = TRUE AND inverter_status = 'ONLINE'. When a DER goes offline, its telemetry update propagates through the view within seconds, and the dispatch command generator automatically excludes it from subsequent instructions.
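One case the status filter alone does not cover is a DER that loses connectivity entirely: it sends no further telemetry, so its last ONLINE row would remain in der_current_state. A possible guard is a freshness filter on the telemetry timestamp. The sketch below assumes a 30-second cutoff and a hypothetical view name, der_fresh_state; both would need tuning to the fleet's actual reporting interval.

```sql
-- Keep only DERs that have reported recently.
-- Assumptions: 30-second staleness cutoff; der_fresh_state is a hypothetical name.
CREATE MATERIALIZED VIEW der_fresh_state AS
SELECT
    der_id, customer_id, der_type,
    available_kw, soc_pct, grid_connected, inverter_status, telemetry_ts
FROM der_current_state
WHERE telemetry_ts > NOW() - INTERVAL '30 seconds';
```

Pointing dispatchable_capacity at der_fresh_state instead of der_current_state would then drop silent devices once their last report ages past the cutoff.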
Q: How do I reconcile dispatched kWh for settlement purposes?
A: Create a sink that writes pending_dispatch_commands and the subsequent telemetry confirmations to Iceberg or to a database over JDBC. Join the dispatch signal to the metered response in a post-settlement materialized view to compute deviation penalties and performance payments.
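A rough sketch of what such a settlement view could look like follows. Everything in it beyond signal_id and der_id is an assumption made for illustration: the tables dispatch_command_log and der_meter_readings, their columns, and the event window boundaries are hypothetical and are not defined elsewhere in this article.

```sql
-- Hypothetical log of issued commands, one row per signal and DER
CREATE TABLE dispatch_command_log (
    signal_id       VARCHAR,
    der_id          VARCHAR,
    committed_kw    DOUBLE PRECISION,
    event_start_ts  TIMESTAMPTZ,
    event_end_ts    TIMESTAMPTZ,
    PRIMARY KEY (signal_id, der_id)
);

-- Hypothetical revenue-grade meter feed, one row per DER and metering interval
CREATE TABLE der_meter_readings (
    der_id           VARCHAR,
    delivered_kwh    DOUBLE PRECISION,
    interval_end_ts  TIMESTAMPTZ,
    PRIMARY KEY (der_id, interval_end_ts)
);

-- Energy actually delivered by each DER during each dispatch event
CREATE MATERIALIZED VIEW settlement_delivery AS
SELECT
    c.signal_id,
    c.der_id,
    c.committed_kw,
    SUM(m.delivered_kwh) AS delivered_kwh
FROM dispatch_command_log c
JOIN der_meter_readings m
  ON m.der_id = c.der_id
 AND m.interval_end_ts >  c.event_start_ts
 AND m.interval_end_ts <= c.event_end_ts
GROUP BY c.signal_id, c.der_id, c.committed_kw;
```

Turning committed_kw into an expected kWh figure, and from there into a deviation, depends on the event duration and on the program's settlement rules, so that step is left out of the sketch.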
Key Takeaways
- RisingWave can aggregate real-time telemetry from thousands of DERs and keep fleet-wide available capacity current within seconds of each telemetry update.
- Streaming joins with CDC-backed customer preference tables enforce opt-out constraints without polling external APIs.
- Dispatch commands flow from materialized views directly to Kafka, enabling seamless integration with existing DER management systems.
- The declarative SQL model makes the VPP coordination logic auditable, maintainable, and extendable by any SQL-proficient engineer.

