Integrating EV charging into smart grid infrastructure requires responsiveness on the order of seconds. When the grid signals a demand-response event or a frequency deviation, the charging management system must quickly throttle, shed, or redistribute load across CCS and CHAdeMO connectors. RisingWave streaming SQL lets you express the computational core of that control loop in SQL.
Why Smart Grid and EV Integration Matters
EV adoption is growing faster than the grid infrastructure built to serve it. A single DC fast-charging plaza with twenty 150 kW CCS stations draws up to 3 MW (20 × 150 kW), comparable to a small industrial facility. When several such plazas in one utility district hit their afternoon peaks at the same time, the distribution transformer load can exceed safe operating limits.
Grid operators address this with demand-response programs: they broadcast signals asking large flexible loads to curtail. Charging station operators who respond receive financial incentives; those who do not risk grid penalties or supply interruptions.
The technical challenge is the control loop. A demand-response signal arrives as a timestamped event. Within seconds, the charging management system must:
- Determine the current total load across all active sessions.
- Compute how much capacity must be shed to meet the demand-response target.
- Issue SetChargingProfile commands via OCPP to individual stations, proportionally reducing their session power limits.
- Confirm that actual power draw has decreased by monitoring subsequent MeterValues events.
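The bookkeeping behind the first, second, and last of these steps can be sketched in a few lines of Python. This is an illustration only: the `MeterSample` type, the function names, and the 1 kW tolerance are assumptions made for the example, not part of OCPP or RisingWave.

```python
from dataclasses import dataclass


@dataclass
class MeterSample:
    """Hypothetical, simplified view of one MeterValues reading."""
    station_id: str
    power_kw: float


def total_load_kw(samples):
    """Total load across all active sessions."""
    return sum(s.power_kw for s in samples)


def required_shed_kw(current_kw, target_reduction_kw):
    """Capacity to shed; a site cannot shed more than it is drawing."""
    return max(0.0, min(target_reduction_kw, current_kw))


def shed_confirmed(baseline_kw, shed_kw, later_samples, tolerance_kw=1.0):
    """Compare later readings against the pre-event baseline."""
    achieved_kw = baseline_kw - total_load_kw(later_samples)
    return achieved_kw >= shed_kw - tolerance_kw


before = [MeterSample("ST-1", 120.0), MeterSample("ST-2", 80.0)]
after = [MeterSample("ST-1", 90.0), MeterSample("ST-2", 60.0)]

baseline = total_load_kw(before)              # 200.0 kW
shed = required_shed_kw(baseline, 50.0)       # 50.0 kW
print(shed_confirmed(baseline, shed, after))  # True: load fell by 50 kW
```

The tolerance exists because metered power fluctuates; a real deployment would tune it to the meter's accuracy and reporting interval.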
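This is a streaming pipeline: events arrive continuously, and every step depends on aggregates that must stay current. That is the kind of workload RisingWave is designed for.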
The Streaming SQL Solution
RisingWave ingests OCPP MeterValues events and grid demand-response signals from Kafka. Materialized views compute real-time load totals, apply demand-response targets, and calculate per-station power allocation. A sink emits load-shedding instructions back to Kafka, where the OCPP gateway picks them up and issues SetChargingProfile commands.
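On the gateway side, each load-shedding instruction has to become an OCPP request. The sketch below shows one plausible mapping to an OCPP 1.6 SetChargingProfile.req payload. It assumes the Kafka record carries `target_power_kw`, `signal_id`, `dr_end_ts`, and `command_ts` with timestamps as ISO 8601 strings; the function name, the `chargingProfileId` derivation, and the `stackLevel` value are hypothetical choices, and a real gateway may prefer a different profile purpose.

```python
import json
import zlib


def to_set_charging_profile(command):
    """Map one load-shedding record to an OCPP 1.6 SetChargingProfile.req payload."""
    # OCPP expresses limits in W or A, so convert from kW.
    limit_w = round(command["target_power_kw"] * 1000.0, 1)
    return {
        "connectorId": 0,  # 0 addresses the charge point as a whole
        "csChargingProfiles": {
            # Assumption: derive a stable integer id from the signal id.
            "chargingProfileId": zlib.crc32(command["signal_id"].encode()) % 1_000_000,
            "stackLevel": 0,  # assumption: no other profiles are stacked
            "chargingProfilePurpose": "ChargePointMaxProfile",
            "chargingProfileKind": "Absolute",
            "validTo": command["dr_end_ts"],  # profile lapses when the DR event ends
            "chargingSchedule": {
                "startSchedule": command["command_ts"],
                "chargingRateUnit": "W",
                "chargingSchedulePeriod": [{"startPeriod": 0, "limit": limit_w}],
            },
        },
    }


record = {
    "station_id": "ST-1",
    "target_power_kw": 87.5,
    "signal_id": "DR-42",
    "dr_end_ts": "2024-06-01T18:00:00Z",
    "command_ts": "2024-06-01T17:00:05Z",
}
payload = to_set_charging_profile(record)
print(json.dumps(payload, indent=2))
```

Setting `validTo` to the end of the demand-response event means the limit expires on the charge point even if no explicit restore command arrives.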
Tutorial: Building It Step by Step
Step 1: Set Up the Data Source
-- Real-time power consumption from charging stations
CREATE SOURCE station_meter_values (
    station_id VARCHAR,
    connector_id INT,
    session_id VARCHAR,
    connector_type VARCHAR, -- CCS | CHAdeMO | Type2
    power_kw DOUBLE PRECISION,
    current_a DOUBLE PRECISION,
    voltage_v DOUBLE PRECISION,
    energy_kwh DOUBLE PRECISION,
    soc_percent INT,
    event_ts TIMESTAMPTZ
)
WITH (
    connector = 'kafka',
    topic = 'ocpp.meter_values',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;

-- Grid demand-response signals from utility
CREATE SOURCE grid_dr_signals (
    signal_id VARCHAR,
    zone_id VARCHAR,
    signal_type VARCHAR, -- CURTAIL | RESTORE | SHED
    target_reduction_kw DOUBLE PRECISION,
    start_ts TIMESTAMPTZ,
    end_ts TIMESTAMPTZ,
    priority INT, -- 1=emergency, 2=economic, 3=optional
    received_ts TIMESTAMPTZ
)
WITH (
    connector = 'kafka',
    topic = 'grid.demand_response',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;

-- Station-to-grid-zone mapping (static reference, loaded from earliest)
CREATE SOURCE station_grid_zones (
    station_id VARCHAR,
    zone_id VARCHAR,
    max_station_kw DOUBLE PRECISION,
    contract_max_kw DOUBLE PRECISION
)
WITH (
    connector = 'kafka',
    topic = 'config.station_zones',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;
Step 2: Build Real-Time Aggregations
-- Current total load per station (latest 1-minute window)
-- Note: SUM(power_kw) adds every sample in the window, so it matches the station's
-- instantaneous load only if each connector reports once per minute. With more
-- frequent MeterValues, aggregate per connector first and then sum.
CREATE MATERIALIZED VIEW station_current_load AS
SELECT
    station_id,
    window_start,
    window_end,
    SUM(power_kw) AS total_power_kw,
    COUNT(DISTINCT session_id) AS active_sessions,
    COUNT(DISTINCT connector_id) AS active_connectors,
    AVG(power_kw) AS avg_power_per_connector_kw,
    MAX(power_kw) AS max_connector_power_kw
FROM TUMBLE(station_meter_values, event_ts, INTERVAL '1 MINUTE')
GROUP BY station_id, window_start, window_end;

-- Zone-level load aggregation
CREATE MATERIALIZED VIEW zone_load_summary AS
SELECT
    z.zone_id,
    window_start,
    window_end,
    SUM(l.total_power_kw) AS zone_total_power_kw,
    SUM(z.max_station_kw) AS zone_capacity_kw,
    ROUND(SUM(l.total_power_kw) / NULLIF(SUM(z.max_station_kw), 0) * 100, 2) AS utilization_pct,
    COUNT(DISTINCT l.station_id) AS stations_active
FROM station_current_load l
JOIN station_grid_zones z ON l.station_id = z.station_id
GROUP BY z.zone_id, window_start, window_end;

-- Demand-response load allocation: compute per-station curtailment
CREATE MATERIALIZED VIEW dr_load_allocation AS
SELECT
    s.station_id,
    s.zone_id,
    l.total_power_kw AS current_power_kw,
    d.target_reduction_kw AS zone_reduction_target_kw,
    -- Proportional curtailment: each station sheds in proportion to its load
    ROUND(
        l.total_power_kw /
        NULLIF((
            SELECT SUM(l2.total_power_kw)
            FROM station_current_load l2
            JOIN station_grid_zones s2 ON l2.station_id = s2.station_id
            WHERE s2.zone_id = s.zone_id
              AND l2.window_end = l.window_end
        ), 0) * d.target_reduction_kw,
        2
    ) AS station_shed_kw,
    GREATEST(
        0,
        l.total_power_kw - ROUND(
            l.total_power_kw /
            NULLIF((
                SELECT SUM(l2.total_power_kw)
                FROM station_current_load l2
                JOIN station_grid_zones s2 ON l2.station_id = s2.station_id
                WHERE s2.zone_id = s.zone_id
                  AND l2.window_end = l.window_end
            ), 0) * d.target_reduction_kw,
            2
        )
    ) AS target_power_kw,
    d.signal_id,
    d.priority,
    d.end_ts AS dr_end_ts
FROM station_current_load l
JOIN station_grid_zones s ON l.station_id = s.station_id
JOIN grid_dr_signals d
    ON s.zone_id = d.zone_id
    AND NOW() BETWEEN d.start_ts AND d.end_ts
WHERE l.window_end = (
    SELECT MAX(window_end) FROM station_current_load WHERE station_id = l.station_id
);
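To sanity-check the arithmetic in dr_load_allocation, the same proportional rule can be reproduced outside the database. The Python sketch below mirrors the view's expressions (share of zone load times the zone target, rounded to two decimals, remaining power floored at zero). The function name and sample values are made up for illustration, and returning an empty plan for an idle zone is a simplification of the SQL, where NULLIF turns the share into NULL.

```python
def allocate_curtailment(station_loads_kw, target_reduction_kw):
    """Split a zone-level reduction across stations in proportion to their load."""
    zone_total_kw = sum(station_loads_kw.values())
    if zone_total_kw == 0:
        # Idle zone: nothing to shed (the SQL view yields NULL here instead).
        return {}
    plan = {}
    for station_id, load_kw in station_loads_kw.items():
        shed_kw = round(load_kw / zone_total_kw * target_reduction_kw, 2)
        plan[station_id] = {
            "station_shed_kw": shed_kw,
            # Same role as GREATEST(0, ...) in the view.
            "target_power_kw": max(0.0, load_kw - shed_kw),
        }
    return plan


# A zone drawing 400 kW is asked to shed 100 kW.
plan = allocate_curtailment({"ST-A": 300.0, "ST-B": 100.0}, 100.0)
for station_id, row in plan.items():
    print(station_id, row)
```

Here ST-A carries 75% of the zone load, so it sheds 75 kW and is limited to 225 kW, while ST-B sheds 25 kW and is limited to 75 kW. If the requested reduction exceeds the zone's total load, every station's target is clamped to zero.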
Step 3: Detect Anomalies or Generate Alerts
-- Detect overload: station exceeds contracted maximum
CREATE MATERIALIZED VIEW overload_alerts AS
SELECT
    l.station_id,
    l.total_power_kw,
    z.contract_max_kw,
    l.total_power_kw - z.contract_max_kw AS overload_kw,
    l.window_end AS detected_at,
    'CONTRACT_OVERLOAD' AS alert_type
FROM station_current_load l
JOIN station_grid_zones z ON l.station_id = z.station_id
WHERE l.total_power_kw > z.contract_max_kw
  AND l.window_end = (
      SELECT MAX(window_end) FROM station_current_load WHERE station_id = l.station_id
  );
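
-- Sink: emit load-shedding commands to OCPP gateway
-- The sink is defined with AS SELECT, the documented form for query-based sinks.
-- dr_load_allocation is not append-only, so a PLAIN sink needs force_append_only:
-- updates are written as new messages and deletes are dropped.
-- If your RisingWave version rejects NOW() in a streaming SELECT list, drop
-- command_ts and rely on the Kafka record timestamp instead.
CREATE SINK load_shed_commands AS
SELECT
    station_id,
    target_power_kw,
    station_shed_kw,
    signal_id,
    priority,
    dr_end_ts,
    NOW() AS command_ts
FROM dr_load_allocation
WHERE station_shed_kw > 0
WITH (
    connector = 'kafka',
    topic = 'ocpp.commands.set_charging_profile',
    properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON (force_append_only = 'true');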

-- Sink: overload alerts to operations team
-- overload_alerts is not append-only, so force_append_only is set here as well.
CREATE SINK overload_alert_sink
FROM overload_alerts
WITH (
    connector = 'kafka',
    topic = 'alerts.grid.overload',
    properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON (force_append_only = 'true');
Comparison Table
| Capability | Manual / Rule-Based System | RisingWave Streaming SQL |
| --- | --- | --- |
| DR response time | Minutes (human-in-the-loop) | Seconds (automated) |
| Load visibility | Periodic polling | Continuous per-event update |
| Curtailment calculation | Hardcoded spreadsheet | Proportional SQL expression |
| Zone-level aggregation | Batch export + join | Real-time stream join |
| Alert delivery | Email or PagerDuty | Kafka sink (sub-second) |
| Multi-zone support | Separate scripts per zone | Single view grouped by zone_id |
FAQ
How fast does RisingWave react to a new demand-response signal?
RisingWave processes new events within milliseconds of their arrival in the Kafka topic. The dr_load_allocation view is a stream join; as soon as a new grid_dr_signals event is ingested, the view recomputes the affected rows. The resulting Kafka sink message to the OCPP gateway appears within seconds.
What happens when a demand-response event ends?
When the current time passes d.end_ts in grid_dr_signals, the join condition NOW() BETWEEN d.start_ts AND d.end_ts becomes false. The dr_load_allocation view stops producing rows for that signal, and the OCPP gateway can restore normal charging profiles. Each command also carries dr_end_ts, so the gateway knows in advance when the curtailment expires.
Can this architecture support OpenADR signals?
Yes. OpenADR 2.0 demand-response signals can be translated to the grid_dr_signals schema by a lightweight Kafka producer. The SQL pipeline does not care about the upstream protocol.
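As a rough idea of what such a translator does, the Python sketch below maps an already-parsed event to a JSON record shaped like grid_dr_signals. The input keys (`event_id`, `zone`, `action`, `reduction_kw`, `start`, `duration_s`, `priority`) are invented for this example and are not OpenADR field names; a real producer would first parse the OpenADR payload and then publish the resulting JSON to the grid.demand_response topic.

```python
import json
from datetime import datetime, timedelta, timezone


def to_grid_dr_signal(event, received_at):
    """Translate a pre-parsed DR event (hypothetical shape) to the grid_dr_signals schema."""
    start = datetime.fromisoformat(event["start"])
    end = start + timedelta(seconds=event["duration_s"])
    return {
        "signal_id": event["event_id"],
        "zone_id": event["zone"],
        "signal_type": event["action"],  # CURTAIL | RESTORE | SHED
        "target_reduction_kw": float(event["reduction_kw"]),
        "start_ts": start.isoformat(),
        "end_ts": end.isoformat(),
        "priority": int(event["priority"]),
        "received_ts": received_at.isoformat(),
    }


event = {
    "event_id": "DR-42",
    "zone": "ZONE-7",
    "action": "CURTAIL",
    "reduction_kw": 100,
    "start": "2024-06-01T17:00:00+00:00",
    "duration_s": 7200,
    "priority": 2,
}
received = datetime(2024, 6, 1, 16, 55, tzinfo=timezone.utc)
signal = to_grid_dr_signal(event, received)
print(json.dumps(signal))  # value to publish to the Kafka topic
```

Keeping the translation in a small producer means the SQL views only ever see one schema, whatever protocol the utility uses upstream.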
How do I handle CCS and CHAdeMO connectors with different power limits?
Add a max_connector_power_kw column to the station_grid_zones reference table, keyed by (station_id, connector_type). Adjust the curtailment calculation to respect per-connector-type limits when computing target_power_kw.
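One way to picture "respecting per-connector-type limits" is to split a station's target_power_kw evenly across its active connectors, pin any connector whose limit is lower than its share, and hand the leftover to the rest. The Python sketch below does that. The limits table, the function name, and the even-split policy are all assumptions for illustration (the 50 kW CHAdeMO figure in particular is a placeholder), and the same rule would still need to be expressed in SQL to run inside the pipeline.

```python
# Hypothetical per-connector-type limits in kW.
CONNECTOR_LIMITS_KW = {"CCS": 150.0, "CHAdeMO": 50.0}


def split_with_caps(target_kw, caps_kw):
    """Split target_kw evenly across connectors without exceeding any cap."""
    alloc = {cid: 0.0 for cid in caps_kw}
    open_ids = set(caps_kw)
    remaining = float(target_kw)
    while open_ids and remaining > 1e-9:
        share = remaining / len(open_ids)
        # Connectors whose cap is at or below the even share are pinned to the cap.
        capped = {cid for cid in open_ids if caps_kw[cid] <= share}
        if not capped:
            # Everyone left can absorb an even share; allocation is complete.
            for cid in open_ids:
                alloc[cid] = share
            break
        for cid in capped:
            alloc[cid] = caps_kw[cid]
            remaining -= caps_kw[cid]
        open_ids -= capped
    return alloc


# Connector 1 is CCS, connector 2 is CHAdeMO; the station may draw 180 kW.
connector_types = {1: "CCS", 2: "CHAdeMO"}
caps = {cid: CONNECTOR_LIMITS_KW[ctype] for cid, ctype in connector_types.items()}
print(split_with_caps(180.0, caps))
```

With a 180 kW station target, the CHAdeMO connector is held at its 50 kW limit and the CCS connector receives the remaining 130 kW. If the target exceeds the sum of all limits, every connector simply sits at its cap.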
Key Takeaways
- RisingWave enables closed-loop demand-response by joining live OCPP meter values with utility grid signals in a single materialized view.
- Proportional load curtailment calculations run entirely in SQL — no custom application code required.
- A Kafka sink delivers SetChargingProfile commands to the OCPP gateway within seconds of a demand-response signal arriving.
- The PostgreSQL-compatible interface allows network operations center dashboards to query real-time zone utilization and DR status without any additional API layer.

