Accurate EV charging demand forecasts let station operators pre-position capacity, adjust dynamic pricing, and coordinate with grid operators before peak loads materialize. Streaming SQL in RisingWave makes it possible to feed live session data continuously into rolling demand estimates — without waiting for the next batch job.
## Why EV Demand Forecasting Matters
Charging demand is bursty and location-specific. A highway corridor station may see near-zero utilization for two hours and then face a surge of ten vehicles all needing 150 kW DC fast charge sessions simultaneously. Traditional demand forecasting systems operate on daily or hourly aggregations exported to a data warehouse. By the time a forecast runs and a human reviews it, the surge has already started.
There are two layers where forecasting matters in practice:
- Operational horizon (next 15–60 minutes): How many connectors will be needed? Should the station operator activate demand-response limits with the utility? Should V2G (vehicle-to-grid) discharge begin now?
- Planning horizon (next 7–30 days): Is this station cluster approaching capacity saturation? Should a new charger be installed?
Streaming SQL addresses the operational horizon directly. By maintaining continuously updated window aggregations over recent session events, RisingWave provides the real-time inputs that a forecasting model or rule engine needs — already aggregated, already joined to external context.
## The Streaming SQL Solution
The pipeline ingests OCPP session events from Kafka, computes rolling demand metrics using HOP windows, and maintains a demand-signal view that a downstream forecasting service or simple rule engine can query. RisingWave updates the materialized views incrementally as each event arrives, so the signals are typically fresh within seconds rather than hours.
For stations that expose real-time occupancy feeds, those can be ingested as an additional Kafka source and joined with session data to build connector-level demand signals.
## Tutorial: Building It Step by Step
### Step 1: Set Up the Data Source
```sql
-- Charging session events from the OCPP gateway
CREATE SOURCE ocpp_sessions (
    session_id VARCHAR,
    station_id VARCHAR,
    connector_id INT,
    connector_type VARCHAR,   -- CCS | CHAdeMO | Type2
    event_type VARCHAR,       -- SessionStart | SessionEnd | MeterValues
    energy_kwh DOUBLE PRECISION,
    power_kw DOUBLE PRECISION,
    soc_percent INT,
    event_ts TIMESTAMPTZ,
    location_lat DOUBLE PRECISION,
    location_lon DOUBLE PRECISION,
    tariff_id VARCHAR
)
WITH (
    connector = 'kafka',
    topic = 'ocpp.sessions',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;
```
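For reference, an event on the `ocpp.sessions` topic is expected to look like the JSON below. The field names must match the source columns exactly; the concrete values here are purely illustrative:

```python
import json

# Illustrative MeterValues event matching the ocpp_sessions source schema.
# Field names mirror the CREATE SOURCE columns; the values are made up.
event = {
    "session_id": "sess-0042",
    "station_id": "station-17",
    "connector_id": 2,
    "connector_type": "CCS",
    "event_type": "MeterValues",
    "energy_kwh": 18.4,
    "power_kw": 142.0,
    "soc_percent": 61,
    "event_ts": "2024-05-01T14:23:05Z",
    "location_lat": 48.1371,
    "location_lon": 11.5754,
    "tariff_id": "tariff-peak",
}
payload = json.dumps(event)  # wire format is plain JSON (ENCODE JSON)
```

With `ENCODE JSON`, any field missing from a message simply becomes NULL in the corresponding column.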
```sql
-- External context: grid pricing signals (from utility API via Kafka)
CREATE SOURCE grid_pricing (
    zone_id VARCHAR,
    station_id VARCHAR,
    interval_start TIMESTAMPTZ,
    interval_end TIMESTAMPTZ,
    price_per_kwh DOUBLE PRECISION,
    grid_load_pct DOUBLE PRECISION   -- grid utilization 0-100
)
WITH (
    connector = 'kafka',
    topic = 'grid.pricing',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;
```
### Step 2: Build Real-Time Aggregations
```sql
-- 15-minute rolling demand signal per station, sliding every minute
CREATE MATERIALIZED VIEW station_demand_15m AS
SELECT
    station_id,
    window_start,
    window_end,
    COUNT(DISTINCT session_id) AS active_sessions,
    SUM(power_kw) AS total_power_kw,
    AVG(power_kw) AS avg_power_kw,
    MAX(power_kw) AS peak_power_kw,
    COUNT(DISTINCT connector_type) AS connector_types_active,
    COUNT(*) FILTER (WHERE connector_type = 'CCS') AS ccs_sessions,
    COUNT(*) FILTER (WHERE connector_type = 'CHAdeMO') AS chademo_sessions
FROM HOP(ocpp_sessions, event_ts, INTERVAL '1 MINUTE', INTERVAL '15 MINUTES')
WHERE event_type IN ('SessionStart', 'MeterValues')
GROUP BY station_id, window_start, window_end;
```
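To build intuition for the HOP windows above: with a 1-minute slide and a 15-minute size, every event falls into 15 overlapping windows. A small sketch of that assignment (a hypothetical helper for reasoning about window membership, not RisingWave code):

```python
from datetime import datetime, timedelta

def hop_windows(event_ts, slide_min=1, size_min=15):
    """Return the (window_start, window_end) pairs that contain event_ts,
    mirroring HOP(..., INTERVAL '1 MINUTE', INTERVAL '15 MINUTES')."""
    slide = timedelta(minutes=slide_min)
    size = timedelta(minutes=size_min)
    # Latest window start on the slide grid at or before event_ts.
    epoch = datetime(1970, 1, 1)
    latest_start = epoch + ((event_ts - epoch) // slide) * slide
    windows = []
    start = latest_start
    while start + size > event_ts:          # window still covers the event
        windows.append((start, start + size))
        start -= slide
    return windows

wins = hop_windows(datetime(2024, 5, 1, 14, 23, 5))
# An event belongs to size/slide = 15 windows.
```

This is why `station_demand_15m` always has a window ending within the last minute for each active station: each event immediately contributes to the freshest window.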
```sql
-- Hourly session completion rate (a measure of throughput)
CREATE MATERIALIZED VIEW station_throughput_1h AS
SELECT
    station_id,
    connector_type,
    window_start,
    window_end,
    COUNT(DISTINCT session_id) AS sessions_completed,
    AVG(energy_kwh) AS avg_energy_kwh,
    AVG(power_kw) AS avg_final_power_kw,
    -- approx_percentile works in streaming queries; PERCENTILE_CONT is batch-only
    approx_percentile(0.95, 0.01) WITHIN GROUP (ORDER BY energy_kwh) AS p95_energy_kwh
FROM TUMBLE(ocpp_sessions, event_ts, INTERVAL '1 HOUR')
WHERE event_type = 'SessionEnd'
GROUP BY station_id, connector_type, window_start, window_end;
```
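For offline cross-checking of the p95 energy figures, the exact linear-interpolation percentile (what SQL's PERCENTILE_CONT computes, and what the streaming approximation targets) is only a few lines:

```python
def percentile_cont(values, p):
    """Exact linear-interpolation percentile, as in SQL's PERCENTILE_CONT."""
    s = sorted(values)
    if not s:
        return None
    k = p * (len(s) - 1)           # fractional rank into the sorted list
    lo = int(k)
    hi = min(lo + 1, len(s) - 1)
    return s[lo] + (k - lo) * (s[hi] - s[lo])

# e.g. completed-session energies (kWh) for one station-hour:
p95 = percentile_cont([12.0, 18.5, 22.0, 35.0, 41.0], 0.95)
```

Comparing this exact value against the view's approximate one is a quick way to validate the chosen relative-error parameter.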
```sql
-- Demand forecast input: join live demand with the current grid price
CREATE MATERIALIZED VIEW demand_with_grid_context AS
SELECT
    d.station_id,
    d.window_start,
    d.window_end,
    d.active_sessions,
    d.total_power_kw,
    d.avg_power_kw,
    g.price_per_kwh,
    g.grid_load_pct,
    -- Simple demand pressure score (used by the downstream rule engine)
    CASE
        WHEN d.total_power_kw > 500 AND g.grid_load_pct > 80 THEN 'HIGH'
        WHEN d.total_power_kw > 250 OR g.grid_load_pct > 60 THEN 'MEDIUM'
        ELSE 'LOW'
    END AS demand_pressure
FROM station_demand_15m d
LEFT JOIN grid_pricing g
    ON d.station_id = g.station_id
    AND d.window_end BETWEEN g.interval_start AND g.interval_end;
```
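The CASE expression above encodes a simple rule. If the same scoring logic is needed outside SQL (for example, in unit tests for the downstream rule engine), it can be mirrored directly; the thresholds are the same illustrative ones used in the view:

```python
def demand_pressure(total_power_kw, grid_load_pct):
    """Mirror of the CASE expression in demand_with_grid_context.
    grid_load_pct may be None because the LEFT JOIN can yield NULL;
    in SQL a NULL comparison is not true, which treating it as 0 reproduces."""
    load = grid_load_pct if grid_load_pct is not None else 0.0
    if total_power_kw > 500 and load > 80:
        return "HIGH"
    if total_power_kw > 250 or load > 60:
        return "MEDIUM"
    return "LOW"
```

Keeping the thresholds in one shared config for both the view and the engine avoids the two copies drifting apart.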
### Step 3: Detect Anomalies or Generate Alerts
```sql
-- Detect demand surge: current 15-min power exceeds 2x recent baseline
CREATE MATERIALIZED VIEW demand_surge_alerts AS
WITH recent_baseline AS (
    SELECT
        station_id,
        AVG(total_power_kw) AS baseline_power_kw
    FROM station_demand_15m
    WHERE window_end > NOW() - INTERVAL '2 HOURS'   -- temporal filter
    GROUP BY station_id
)
SELECT
    d.station_id,
    d.window_start,
    d.window_end,
    d.total_power_kw AS current_power_kw,
    b.baseline_power_kw,
    -- cast to NUMERIC: two-argument ROUND does not accept DOUBLE PRECISION
    ROUND((d.total_power_kw / NULLIF(b.baseline_power_kw, 0))::NUMERIC, 2) AS surge_ratio,
    d.demand_pressure,
    -- NOW() is only valid as a temporal filter in streaming queries,
    -- so use the window boundary as the alert timestamp
    d.window_end AS alert_ts
FROM demand_with_grid_context d
JOIN recent_baseline b ON d.station_id = b.station_id
WHERE d.total_power_kw > b.baseline_power_kw * 2.0
  AND d.window_end = (
      SELECT MAX(window_end) FROM station_demand_15m WHERE station_id = d.station_id
  );
```
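The surge condition reduces to a ratio test. A minimal offline version of the same check (hypothetical helpers, useful for unit-testing the alert thresholds before changing the view):

```python
def surge_ratio(current_power_kw, baseline_power_kw):
    """current / baseline, rounded to 2 decimals; None when the baseline is 0
    (mirrors ROUND(... / NULLIF(baseline, 0), 2) in the alert view)."""
    if baseline_power_kw == 0:
        return None
    return round(current_power_kw / baseline_power_kw, 2)

def is_surge(current_power_kw, baseline_power_kw, factor=2.0):
    """True when current power exceeds factor x the recent baseline."""
    return current_power_kw > baseline_power_kw * factor
```

The factor of 2.0 is a starting point; stations with very low off-peak baselines may need an absolute-power floor as well to avoid noisy alerts.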
```sql
-- Sink: demand forecast signals to the downstream service.
-- The view is updatable (it contains aggregations), so the Kafka sink
-- needs UPSERT format with a primary key rather than plain append-only.
CREATE SINK demand_forecast_signals
FROM demand_with_grid_context
WITH (
    connector = 'kafka',
    topic = 'forecasting.demand.signals',
    properties.bootstrap.server = 'kafka:9092',
    primary_key = 'station_id, window_start'
) FORMAT UPSERT ENCODE JSON;
```
## Comparison Table
| Aspect | Batch Forecasting (hourly export) | RisingWave Streaming SQL |
| --- | --- | --- |
| Input freshness | Stale by up to 1 hour | Updated every 1–60 seconds |
| Surge detection lag | Discovered after the fact | Sub-minute detection |
| Grid signal integration | Manual join in ETL | Native stream join |
| Connector-type granularity | Available after aggregation | Real-time per connector type |
| Complexity | Spark job + scheduler + warehouse | Single SQL layer |
| V2G trigger latency | Minutes to hours | Seconds |
## FAQ
Does RisingWave include a time-series forecasting model?
No. RisingWave computes the real-time aggregations that serve as inputs to a forecasting model. The model itself (a simple linear regression, ARIMA, or ML model) runs outside RisingWave — it reads the materialized views via the PostgreSQL interface and writes predictions back to a Kafka topic or PostgreSQL table.
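As a deliberately simple example of such an external model, here is a least-squares linear trend over the last few `active_sessions` readings pulled from the view (a toy stand-in for the regression/ARIMA model mentioned above; the series values are illustrative):

```python
def linear_forecast(y, steps_ahead=1):
    """Ordinary least-squares line through (0, y[0]), (1, y[1]), ...,
    extrapolated steps_ahead points past the end. Assumes len(y) >= 2
    unless the series is constant."""
    n = len(y)
    x_mean = (n - 1) / 2
    y_mean = sum(y) / n
    denom = sum((x - x_mean) ** 2 for x in range(n))
    if denom == 0:                       # single point: no trend to fit
        return y_mean
    slope = sum((x - x_mean) * (v - y_mean) for x, v in enumerate(y)) / denom
    intercept = y_mean - slope * x_mean
    return intercept + slope * (n - 1 + steps_ahead)

# e.g. active_sessions over the last four 15-minute windows:
next_window = linear_forecast([4, 6, 8, 10], steps_ahead=1)
```

A real deployment would replace this with whatever model fits the traffic pattern; the point is that the model only needs to `SELECT` from the views.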
Can I use the same pipeline for V2G scheduling decisions?
Yes. The demand_with_grid_context view exposes both real-time load and grid pricing signals. A downstream rule engine can query this view and issue V2G discharge commands via OCPP SetChargingProfile messages when demand_pressure = 'HIGH' and price_per_kwh is above a threshold.
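A sketch of that rule as it might appear in the engine (the function name and the price threshold are illustrative, not part of any OCPP library):

```python
def should_discharge_v2g(demand_pressure, price_per_kwh, price_threshold=0.45):
    """Decide whether to issue a V2G discharge (via an OCPP
    SetChargingProfile message) for a station. The 0.45/kWh threshold
    is a made-up example value."""
    return (demand_pressure == "HIGH"
            and price_per_kwh is not None      # LEFT JOIN may yield NULL
            and price_per_kwh > price_threshold)
```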
How do I backfill historical data for baseline calculation?
Set scan.startup.mode = 'earliest' on the source so RisingWave reads the topic from its earliest retained offset; the backlog is processed first, populating the baseline windows, and ingestion then continues with the live stream. If historical records have already aged out of the live topic, publish them to a separate topic, define a second source over it, and combine the two with UNION ALL in the baseline view.
Can the pipeline handle multiple stations across different time zones?
Yes. Use TIMESTAMPTZ columns (not TIMESTAMP) for all time fields. RisingWave stores and compares timestamps in UTC internally; display-layer queries can convert to local time using AT TIME ZONE.
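For example, converting a UTC window boundary to a station's local time at the display layer (using Python's standard zoneinfo module; the SQL equivalent is AT TIME ZONE):

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

# A window_end as RisingWave stores it: UTC
window_end_utc = datetime(2024, 5, 1, 14, 30, tzinfo=timezone.utc)

# Display-layer conversion for a station in Berlin (CEST, UTC+2, in May)
local = window_end_utc.astimezone(ZoneInfo("Europe/Berlin"))
```

All comparisons and joins in the pipeline stay in UTC; only the presentation layer localizes.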
## Key Takeaways
- Rolling HOP window aggregations in RisingWave provide continuously updated demand signals that are far fresher than hourly batch exports.
- Joining live session data with grid pricing signals in a single materialized view enables demand-pressure scoring without external application code.
- Surge detection based on a ratio of current power to recent baseline catches demand spikes within minutes of their onset, since the 15-minute HOP window slides every minute.
- The PostgreSQL-compatible interface lets existing forecasting services read RisingWave views as if they were standard database tables.

