EV Demand Forecasting with Streaming SQL

Accurate EV charging demand forecasts let station operators pre-position capacity, adjust dynamic pricing, and coordinate with grid operators before peak loads materialize. Streaming SQL in RisingWave makes it possible to feed live session data continuously into rolling demand estimates — without waiting for the next batch job.

Why EV Demand Forecasting Matters

Charging demand is bursty and location-specific. A highway corridor station may see near-zero utilization for two hours and then face a surge of ten vehicles all needing 150 kW DC fast charge sessions simultaneously. Traditional demand forecasting systems operate on daily or hourly aggregations exported to a data warehouse. By the time a forecast runs and a human reviews it, the surge has already started.

There are two layers where forecasting matters in practice:

  1. Operational horizon (next 15–60 minutes): How many connectors will be needed? Should the station operator activate demand-response limits with the utility? Should V2G (vehicle-to-grid) discharge begin now?
  2. Planning horizon (next 7–30 days): Is this station cluster approaching capacity saturation? Should a new charger be installed?

Streaming SQL addresses the operational horizon directly. By maintaining continuously updated window aggregations over recent session events, RisingWave provides the real-time inputs that a forecasting model or rule engine needs — already aggregated, already joined to external context.

The Streaming SQL Solution

The pipeline ingests OCPP session events from Kafka, computes rolling demand metrics using HOP windows, and maintains a demand-signal table that a downstream forecasting service or simple rule engine can query. The RisingWave materialized views update within milliseconds of each new event.

For stations that expose real-time occupancy feeds, those can be ingested as an additional Kafka source and joined with session data to build connector-level demand signals.
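
As a sketch, such an occupancy source could mirror the session source defined in Step 1 below; the topic name and fields here are illustrative assumptions, not part of any standard OCPP payload:

-- Hypothetical occupancy feed (topic and fields are assumptions)
CREATE SOURCE station_occupancy (
    station_id    VARCHAR,
    connector_id  INT,
    is_occupied   BOOLEAN,
    observed_ts   TIMESTAMPTZ
)
WITH (
    connector = 'kafka',
    topic = 'station.occupancy',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;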

Tutorial: Building It Step by Step

Step 1: Set Up the Data Source

-- Charging session events from OCPP gateway
CREATE SOURCE ocpp_sessions (
    session_id      VARCHAR,
    station_id      VARCHAR,
    connector_id    INT,
    connector_type  VARCHAR,   -- CCS | CHAdeMO | Type2
    event_type      VARCHAR,   -- SessionStart | SessionEnd | MeterValues
    energy_kwh      DOUBLE PRECISION,
    power_kw        DOUBLE PRECISION,
    soc_percent     INT,
    event_ts        TIMESTAMPTZ,
    location_lat    DOUBLE PRECISION,
    location_lon    DOUBLE PRECISION,
    tariff_id       VARCHAR
)
WITH (
    connector = 'kafka',
    topic = 'ocpp.sessions',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;

-- External context: grid pricing signals (from utility API via Kafka)
CREATE SOURCE grid_pricing (
    zone_id         VARCHAR,
    station_id      VARCHAR,
    interval_start  TIMESTAMPTZ,
    interval_end    TIMESTAMPTZ,
    price_per_kwh   DOUBLE PRECISION,
    grid_load_pct   DOUBLE PRECISION   -- grid utilization 0-100
)
WITH (
    connector = 'kafka',
    topic = 'grid.pricing',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;
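
Before building aggregations, it can help to confirm that events are flowing. One pattern (a sketch, not required by the pipeline) is a materialized view with a now()-based temporal filter, which RisingWave maintains incrementally and uses to expire old rows:

-- Sanity-check view: retains only the last hour of raw session events;
-- the temporal filter lets RisingWave discard rows as they age out
CREATE MATERIALIZED VIEW recent_session_events AS
SELECT session_id, station_id, event_type, power_kw, event_ts
FROM ocpp_sessions
WHERE event_ts > NOW() - INTERVAL '1 HOUR';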

Step 2: Build Real-Time Aggregations

-- 15-minute rolling demand signal per station
CREATE MATERIALIZED VIEW station_demand_15m AS
SELECT
    station_id,
    window_start,
    window_end,
    COUNT(DISTINCT session_id)          AS active_sessions,
    SUM(power_kw)                       AS total_power_kw,  -- sums every reading in the window: a demand-volume proxy, not instantaneous load
    AVG(power_kw)                       AS avg_power_kw,
    MAX(power_kw)                       AS peak_power_kw,
    COUNT(DISTINCT connector_type)      AS connector_types_active,
    COUNT(*) FILTER (WHERE connector_type = 'CCS')     AS ccs_sessions,
    COUNT(*) FILTER (WHERE connector_type = 'CHAdeMO') AS chademo_sessions
FROM HOP(ocpp_sessions, event_ts, INTERVAL '1 MINUTE', INTERVAL '15 MINUTES')
WHERE event_type IN ('SessionStart', 'MeterValues')
GROUP BY station_id, window_start, window_end;
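
Because each HOP window materializes as a row, a quick ad-hoc batch query over the PostgreSQL interface is enough to eyeball the freshest signal per station:

-- Ad-hoc check: the most recently closed windows across all stations
SELECT station_id, window_end, active_sessions, total_power_kw
FROM station_demand_15m
ORDER BY window_end DESC
LIMIT 20;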

-- Hourly session completion rate (measure of throughput)
CREATE MATERIALIZED VIEW station_throughput_1h AS
SELECT
    station_id,
    connector_type,
    window_start,
    window_end,
    COUNT(DISTINCT session_id)          AS sessions_completed,
    AVG(energy_kwh)                     AS avg_energy_kwh,
    AVG(power_kw)                       AS avg_final_power_kw,  -- power reported on the SessionEnd event
    -- percentile_cont is batch-only in RisingWave; use the streaming-capable approximate aggregate
    approx_percentile(0.95, 0.01) WITHIN GROUP (ORDER BY energy_kwh) AS p95_energy_kwh
FROM TUMBLE(ocpp_sessions, event_ts, INTERVAL '1 HOUR')
WHERE event_type = 'SessionEnd'
GROUP BY station_id, connector_type, window_start, window_end;
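
To compare connector types over a longer horizon, the hourly view can be rolled up ad hoc. Note that averaging per-window averages is unweighted, so treat it as a rough gauge:

-- Roll the last 24 hours of hourly windows up by connector type
SELECT
    connector_type,
    SUM(sessions_completed)  AS sessions_24h,
    AVG(avg_energy_kwh)      AS rough_mean_energy_kwh  -- unweighted across windows
FROM station_throughput_1h
WHERE window_end > NOW() - INTERVAL '24 HOURS'
GROUP BY connector_type
ORDER BY sessions_24h DESC;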

-- Demand forecast input: join live demand with current grid price
CREATE MATERIALIZED VIEW demand_with_grid_context AS
SELECT
    d.station_id,
    d.window_start,
    d.window_end,
    d.active_sessions,
    d.total_power_kw,
    d.avg_power_kw,
    g.price_per_kwh,
    g.grid_load_pct,
    -- Simple demand pressure score (used by downstream rule engine)
    CASE
        WHEN d.total_power_kw > 500 AND g.grid_load_pct > 80 THEN 'HIGH'
        WHEN d.total_power_kw > 250 OR g.grid_load_pct > 60  THEN 'MEDIUM'
        ELSE 'LOW'
    END AS demand_pressure
FROM station_demand_15m d
LEFT JOIN grid_pricing g
    ON d.station_id = g.station_id
    AND d.window_end BETWEEN g.interval_start AND g.interval_end;
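
A downstream service might poll the latest scored window per station like this (the station ID is a placeholder):

-- Latest demand-pressure reading for one station ('station-042' is illustrative)
SELECT window_start, active_sessions, total_power_kw,
       price_per_kwh, grid_load_pct, demand_pressure
FROM demand_with_grid_context
WHERE station_id = 'station-042'
ORDER BY window_end DESC
LIMIT 1;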

Step 3: Detect Anomalies or Generate Alerts

-- Detect demand surge: current 15-min power exceeds 2x recent baseline
CREATE MATERIALIZED VIEW demand_surge_alerts AS
WITH recent_baseline AS (
    SELECT
        station_id,
        AVG(total_power_kw) AS baseline_power_kw
    FROM station_demand_15m
    WHERE window_end > NOW() - INTERVAL '2 HOURS'
    GROUP BY station_id
),
latest_window AS (
    -- most recent completed window per station; a plain aggregate CTE
    -- replaces a correlated subquery, which streaming mode may not support
    SELECT station_id, MAX(window_end) AS max_window_end
    FROM station_demand_15m
    GROUP BY station_id
)
SELECT
    d.station_id,
    d.window_start,
    d.window_end,
    d.total_power_kw       AS current_power_kw,
    b.baseline_power_kw,
    ROUND((d.total_power_kw / NULLIF(b.baseline_power_kw, 0))::NUMERIC, 2) AS surge_ratio,  -- cast: two-argument round() takes NUMERIC
    d.demand_pressure,
    d.window_end           AS alert_ts  -- now() is not allowed in a streaming SELECT list; the window close time stands in
FROM demand_with_grid_context d
JOIN recent_baseline b ON d.station_id = b.station_id
JOIN latest_window lw
    ON d.station_id = lw.station_id
    AND d.window_end = lw.max_window_end  -- only alert on the latest window
WHERE d.total_power_kw > b.baseline_power_kw * 2.0;
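
Alerts can also be pushed rather than polled. A sketch of a dedicated alert sink, assuming a topic named alerts.demand.surge:

-- Optional: push surge alerts to their own Kafka topic (topic name is an assumption)
CREATE SINK demand_surge_alerts_out
FROM demand_surge_alerts
WITH (
    connector = 'kafka',
    topic = 'alerts.demand.surge',
    properties.bootstrap.server = 'kafka:9092',
    force_append_only = 'true'   -- alert rows are retracted as windows advance; emit inserts only
) FORMAT PLAIN ENCODE JSON;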

-- Sink: demand forecast signals to downstream service
CREATE SINK demand_forecast_signals
FROM demand_with_grid_context
WITH (
    connector = 'kafka',
    topic = 'forecasting.demand.signals',
    properties.bootstrap.server = 'kafka:9092',
    force_append_only = 'true'   -- the upstream view emits updates; deliver each change as an insert
) FORMAT PLAIN ENCODE JSON;

Comparison Table

                            Batch Forecasting (hourly export)   RisingWave Streaming SQL
Input freshness             Stale by up to 1 hour               Updated every 1–60 seconds
Surge detection lag         Discovered after the fact           Sub-minute detection
Grid signal integration     Manual join in ETL                  Native stream join
Connector-type granularity  Available after aggregation         Real-time per connector type
Complexity                  Spark job + scheduler + warehouse   Single SQL layer
V2G trigger latency         Minutes to hours                    Seconds

FAQ

Does RisingWave include a time-series forecasting model? No. RisingWave computes the real-time aggregations that serve as inputs to a forecasting model. The model itself (a simple linear regression, ARIMA, or ML model) runs outside RisingWave — it reads the materialized views via the PostgreSQL interface and writes predictions back to a Kafka topic or PostgreSQL table.
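
For example, a model process could pull its feature history with a plain SQL query over any PostgreSQL client (the station ID is a placeholder):

-- Feature history for model training; runs over psql, psycopg, JDBC, etc.
SELECT window_start, active_sessions, total_power_kw, avg_power_kw,
       price_per_kwh, grid_load_pct
FROM demand_with_grid_context
WHERE station_id = 'station-042'
ORDER BY window_start;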

Can I use the same pipeline for V2G scheduling decisions? Yes. The demand_with_grid_context view exposes both real-time load and grid pricing signals. A downstream rule engine can query this view and issue V2G discharge commands via OCPP SetChargingProfile messages when demand_pressure = 'HIGH' and price_per_kwh is above a threshold.
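
The rule engine's polling query might look like this; the 0.40 price threshold is purely illustrative:

-- Stations currently eligible for V2G discharge under an illustrative threshold
SELECT station_id, window_end, price_per_kwh, grid_load_pct
FROM demand_with_grid_context
WHERE demand_pressure = 'HIGH'
  AND price_per_kwh > 0.40
ORDER BY price_per_kwh DESC;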

How do I backfill historical data for baseline calculation? Set scan.startup.mode = 'earliest' on the source so RisingWave reads the Kafka topic from its first offset (replaying historical session records into the topic beforehand if they are not already there). RisingWave processes the historical records first, populating the baseline windows, then catches up to the live stream.
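
A backfill variant of the session source might differ only in the startup mode (columns abbreviated here for brevity):

-- Backfill variant: same topic, read from the earliest offset (schema abbreviated)
CREATE SOURCE ocpp_sessions_backfill (
    session_id  VARCHAR,
    station_id  VARCHAR,
    power_kw    DOUBLE PRECISION,
    event_ts    TIMESTAMPTZ
)
WITH (
    connector = 'kafka',
    topic = 'ocpp.sessions',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;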

Can the pipeline handle multiple stations across different time zones? Yes. Use TIMESTAMPTZ columns (not TIMESTAMP) for all time fields. RisingWave stores and compares timestamps in UTC internally; display-layer queries can convert to local time using AT TIME ZONE.
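
For display, a query can convert on the fly; the zone here is just an example:

-- Convert stored UTC timestamps to a local zone at query time
SELECT
    station_id,
    window_start AT TIME ZONE 'America/Los_Angeles' AS window_start_local,
    total_power_kw
FROM station_demand_15m
ORDER BY window_start DESC
LIMIT 10;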

Key Takeaways

  • Rolling HOP window aggregations in RisingWave provide continuously updated demand signals that are far fresher than hourly batch exports.
  • Joining live session data with grid pricing signals in a single materialized view enables demand-pressure scoring without external application code.
  • Surge detection based on a ratio of current power to recent baseline catches demand spikes within minutes of onset, since the HOP window slides every minute.
  • The PostgreSQL-compatible interface lets existing forecasting services read RisingWave views as if they were standard database tables.
