Building a Smart Charging Network with Streaming SQL

A smart charging network goes beyond basic EVSE monitoring: it dynamically adjusts charging power based on grid signals, local demand constraints, and time-of-use pricing. RisingWave, a PostgreSQL-compatible streaming database, serves as the real-time coordination layer. It continuously evaluates grid state, active sessions, and pricing signals, and publishes smart charging setpoints that the CPMS delivers to charge points via OCPP.

Why Smart Charging Needs Streaming SQL

Static charging schedules and simple timer-based controls fail in a world of variable renewable generation, dynamic electricity pricing, and constrained distribution infrastructure. A true smart charging network requires:

  • Dynamic load management — adjust EVSE setpoints in real time to stay within site-level kW demand limits and avoid demand charge peaks.
  • Price-responsive charging — shift charging load toward low-price intervals and away from peak pricing periods.
  • Grid flexibility services — respond to utility demand response signals by temporarily curtailing or boosting load across the fleet.
  • Fairness and SLA enforcement — ensure every connected vehicle receives a minimum charge rate while the overall site stays within its contracted capacity.

Streaming SQL handles the continuous multi-stream evaluation that makes this possible without a specialized rules engine or custom stream processing code.

The Streaming SQL Approach

RisingWave ingests live OCPP meter values, site-level demand measurements, real-time electricity prices, and utility dispatch signals from Kafka. Materialized views compute per-site load budgets, per-session setpoints, and override conditions. Smart charging setpoints are published back to Kafka for the CPMS to execute via OCPP SetChargingProfile.

Step-by-Step Tutorial

Step 1: Data Source Setup

-- Active OCPP sessions with current power and SoC
CREATE SOURCE active_sessions (
    session_id        VARCHAR,
    charge_point_id   VARCHAR,
    connector_id      INTEGER,
    site_id           VARCHAR,
    id_tag            VARCHAR,
    rated_power_kw    DOUBLE PRECISION,  -- EVSE rated max (e.g. 22, 50, 150, 350)
    current_power_kw  DOUBLE PRECISION,
    soc_pct           DOUBLE PRECISION,
    energy_kwh        DOUBLE PRECISION,
    session_start_ts  TIMESTAMPTZ,
    sampled_ts        TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'ocpp.active.sessions',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;

-- Site-level demand measurement (from building energy management system)
CREATE SOURCE site_demand (
    site_id           VARCHAR,
    demand_limit_kw   DOUBLE PRECISION,   -- contracted demand cap
    current_demand_kw DOUBLE PRECISION,   -- current building + EV load
    ev_load_kw        DOUBLE PRECISION,
    measured_ts       TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'bems.site.demand',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;

-- Real-time electricity price signal (TOU tariff or spot price)
CREATE SOURCE electricity_prices (
    tariff_id         VARCHAR,
    site_id           VARCHAR,
    price_usd_kwh     DOUBLE PRECISION,
    price_tier        VARCHAR,    -- OFF_PEAK | MID_PEAK | ON_PEAK | CRITICAL_PEAK
    valid_from        TIMESTAMPTZ,
    valid_to          TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'tariff.price.signals',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;

-- Utility demand response dispatch signals
CREATE SOURCE dr_signals (
    dr_event_id       VARCHAR,
    site_id           VARCHAR,
    curtailment_pct   DOUBLE PRECISION,   -- requested load reduction %
    baseline_kw       DOUBLE PRECISION,
    event_start_ts    TIMESTAMPTZ,
    event_end_ts      TIMESTAMPTZ,
    signal_ts         TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'utility.dr.signals',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;
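
To exercise these sources during development, it helps to have test messages whose JSON keys match the column names declared above. The following is a minimal sketch that builds one sample record for the active_sessions source and one for the site_demand source. All identifiers and values (site-001, cp-0042, and so on) are invented for illustration, the helper names are hypothetical, and the ISO 8601 timestamp format is an assumption about what the JSON decoder accepts for TIMESTAMPTZ columns. The sketch only serializes JSON; publishing to Kafka is left to whichever client you already use.

```python
import json
from datetime import datetime, timezone


def iso_now() -> str:
    """Return the current UTC time as an ISO 8601 string with an explicit offset."""
    return datetime.now(timezone.utc).isoformat()


def sample_active_session() -> dict:
    """One record shaped like the active_sessions source (values are invented)."""
    now = iso_now()
    return {
        "session_id": "sess-1001",
        "charge_point_id": "cp-0042",
        "connector_id": 1,
        "site_id": "site-001",
        "id_tag": "tag-abc123",
        "rated_power_kw": 22.0,
        "current_power_kw": 10.5,
        "soc_pct": 46.0,
        "energy_kwh": 7.8,
        "session_start_ts": now,
        "sampled_ts": now,
    }


def sample_site_demand() -> dict:
    """One record shaped like the site_demand source (values are invented)."""
    return {
        "site_id": "site-001",
        "demand_limit_kw": 250.0,
        "current_demand_kw": 190.0,
        "ev_load_kw": 64.0,
        "measured_ts": iso_now(),
    }


if __name__ == "__main__":
    # Print each sample as a single-line JSON document.
    print(json.dumps(sample_active_session()))
    print(json.dumps(sample_site_demand()))
```

Each printed line is intended for the topic of the matching source: the first for ocpp.active.sessions, the second for bems.site.demand.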

Step 2: Core Materialized View

-- Current state per site: demand headroom for EV charging
CREATE MATERIALIZED VIEW site_ev_headroom AS
SELECT DISTINCT ON (site_id)
    site_id,
    demand_limit_kw,
    current_demand_kw,
    ev_load_kw,
    GREATEST(0, demand_limit_kw - current_demand_kw) AS available_headroom_kw,
    measured_ts
FROM site_demand
ORDER BY site_id, measured_ts DESC;

-- Active DR events (currently in effect)
CREATE MATERIALIZED VIEW active_dr_events AS
SELECT DISTINCT ON (site_id)
    dr_event_id, site_id, curtailment_pct, baseline_kw,
    event_start_ts, event_end_ts
FROM dr_signals
WHERE NOW() BETWEEN event_start_ts AND event_end_ts
ORDER BY site_id, signal_ts DESC;

-- Current price tier per site
CREATE MATERIALIZED VIEW current_price_tier AS
SELECT DISTINCT ON (site_id)
    site_id, tariff_id, price_usd_kwh, price_tier, valid_to
FROM electricity_prices
WHERE NOW() BETWEEN valid_from AND valid_to
ORDER BY site_id, valid_from DESC;

-- Per-session smart charging setpoint calculation
CREATE MATERIALIZED VIEW session_setpoints AS
WITH site_context AS (
    SELECT
        h.site_id,
        h.available_headroom_kw,
        COALESCE(dr.curtailment_pct, 0)                   AS curtailment_pct,
        COALESCE(pt.price_tier, 'OFF_PEAK')               AS price_tier,
        COALESCE(pt.price_usd_kwh, 0.10)                  AS price_usd_kwh
    FROM site_ev_headroom h
    LEFT JOIN active_dr_events dr ON h.site_id = dr.site_id
    LEFT JOIN current_price_tier pt ON h.site_id = pt.site_id
),
session_count AS (
    SELECT site_id, COUNT(*) AS active_sessions
    FROM (SELECT DISTINCT ON (session_id) site_id FROM active_sessions ORDER BY session_id, sampled_ts DESC) s
    GROUP BY site_id
)
SELECT
    a.session_id,
    a.charge_point_id,
    a.connector_id,
    a.site_id,
    a.rated_power_kw,
    a.current_power_kw,
    a.soc_pct,
    -- Compute setpoint: headroom divided equally, reduced by DR and price tier
    LEAST(
        a.rated_power_kw,
        GREATEST(
            3.0,   -- minimum guaranteed rate (kW)
            (sc.available_headroom_kw / NULLIF(cnt.active_sessions, 0))
            * (1.0 - sc.curtailment_pct / 100.0)
            * CASE sc.price_tier
                WHEN 'CRITICAL_PEAK' THEN 0.2
                WHEN 'ON_PEAK'       THEN 0.5
                WHEN 'MID_PEAK'      THEN 0.8
                ELSE                      1.0
              END
        )
    ) AS setpoint_kw,
    sc.price_tier,
    sc.curtailment_pct
FROM (SELECT DISTINCT ON (session_id) * FROM active_sessions ORDER BY session_id, sampled_ts DESC) a
JOIN site_context sc ON a.site_id = sc.site_id
JOIN session_count cnt ON a.site_id = cnt.site_id;
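
The setpoint expression is easier to check with concrete numbers. The sketch below mirrors the intended arithmetic of session_setpoints in plain Python: an equal split of site headroom, scaled by the DR curtailment and a price-tier factor, floored at 3 kW and capped at the EVSE rated power. The function name and the example values are hypothetical; the factors and the 3 kW floor are taken from the view.

```python
# Price-tier multipliers from the CASE expression; any other tier maps to 1.0.
PRICE_TIER_FACTOR = {"CRITICAL_PEAK": 0.2, "ON_PEAK": 0.5, "MID_PEAK": 0.8}
MIN_RATE_KW = 3.0  # minimum guaranteed rate used in the view


def setpoint_kw(rated_power_kw, headroom_kw, active_sessions,
                curtailment_pct=0.0, price_tier="OFF_PEAK"):
    """Per-session setpoint: equal share of headroom, reduced by DR and price tier."""
    if active_sessions <= 0:
        raise ValueError("active_sessions must be positive")
    share = headroom_kw / active_sessions           # equal split across sessions
    share *= 1.0 - curtailment_pct / 100.0          # DR curtailment
    share *= PRICE_TIER_FACTOR.get(price_tier, 1.0) # price-tier scaling
    # Floor at the guaranteed minimum, then cap at what the EVSE can deliver.
    return min(rated_power_kw, max(MIN_RATE_KW, share))


if __name__ == "__main__":
    # 60 kW headroom, 4 sessions, 20% curtailment, ON_PEAK, 22 kW EVSE.
    print(setpoint_kw(22.0, 60.0, 4, curtailment_pct=20.0, price_tier="ON_PEAK"))
    # 20 kW headroom, 10 sessions: the 3 kW floor applies to every session.
    print(setpoint_kw(22.0, 20.0, 10))
```

In the first case the share works out as 60 / 4 = 15 kW, times 0.8 for curtailment gives 12 kW, times 0.5 for ON_PEAK gives 6 kW. In the second case each session receives the 3 kW floor, so ten sessions request 30 kW against 20 kW of headroom. The minimum guarantee can therefore exceed the site budget when many vehicles are connected, which is worth accounting for when choosing the floor.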

Step 3: Alerting Logic

-- Sessions where setpoint differs materially from current delivery (>2 kW gap)
CREATE MATERIALIZED VIEW setpoint_deviation_alerts AS
SELECT
    session_id,
    charge_point_id,
    connector_id,
    site_id,
    current_power_kw,
    setpoint_kw,
    ABS(current_power_kw - setpoint_kw) AS deviation_kw,
    price_tier,
    NOW()                               AS alert_ts,
    'SETPOINT_DEVIATION'                AS alert_type
FROM session_setpoints
WHERE ABS(current_power_kw - setpoint_kw) > 2.0;

-- Site over-demand alert
CREATE MATERIALIZED VIEW site_overdemand_alerts AS
SELECT
    site_id,
    demand_limit_kw,
    current_demand_kw,
    available_headroom_kw,
    measured_ts,
    NOW()              AS alert_ts,
    'SITE_OVER_DEMAND' AS alert_type
FROM site_ev_headroom
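-- available_headroom_kw is clamped at 0 in site_ev_headroom, so compare the raw readings
WHERE current_demand_kw > demand_limit_kw;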

-- Sink setpoints to Kafka for CPMS execution
CREATE SINK setpoint_command_sink AS
SELECT session_id, charge_point_id, connector_id, setpoint_kw, price_tier, NOW() AS command_ts
FROM session_setpoints
WITH (
    connector = 'kafka',
    topic = 'ocpp.setpoint.commands',
    properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;
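
On the consuming side, the CPMS has to turn each message from ocpp.setpoint.commands into an OCPP SetChargingProfile request. The following is a minimal sketch of that mapping, assuming OCPP 1.6 JSON field names. The function name, the chargingProfileId and stackLevel defaults, and the choice of a relative TxProfile expressed in watts are assumptions for illustration, not something the pipeline above prescribes. Kafka consumption and the WebSocket call are omitted.

```python
import json


def to_set_charging_profile(command: dict, profile_id: int = 1, stack_level: int = 0) -> dict:
    """Map one setpoint command (sink message) to a SetChargingProfile payload."""
    # Convert kW to W; OCPP 1.6 limits carry at most one fractional digit.
    limit_w = round(command["setpoint_kw"] * 1000.0, 1)
    return {
        "connectorId": command["connector_id"],
        "csChargingProfiles": {
            "chargingProfileId": profile_id,        # assumed value
            "stackLevel": stack_level,              # assumed value
            "chargingProfilePurpose": "TxProfile",  # applies to the running transaction
            "chargingProfileKind": "Relative",
            "chargingSchedule": {
                "chargingRateUnit": "W",
                # A single period starting immediately, holding the new limit.
                "chargingSchedulePeriod": [{"startPeriod": 0, "limit": limit_w}],
            },
        },
    }


if __name__ == "__main__":
    # Example message with the columns selected by setpoint_command_sink (values invented).
    message = {
        "session_id": "sess-1001",
        "charge_point_id": "cp-0042",
        "connector_id": 1,
        "setpoint_kw": 6.0,
        "price_tier": "ON_PEAK",
    }
    print(json.dumps(to_set_charging_profile(message), indent=2))
```

The charge_point_id field is not part of the payload; the CPMS would use it to pick the WebSocket connection on which to send the request.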

Comparison Table

Capability              Static Schedule  Rule Engine       Custom Stream Code  RisingWave SQL
Dynamic setpoint calc   No               Partial           Yes                 Yes
DR signal integration   No               Manual rules      Custom handler      SQL join
Price-responsive logic  No               Configured rules  Custom              SQL CASE
Fairness (equal split)  No               No                Custom              SQL expression
Developer complexity    Low              Medium            High                Low

FAQ

Q: How quickly does a new setpoint reach the EVSE after a grid signal arrives?
A: End-to-end latency up to the setpoint topic is the sum of Kafka ingestion (~10 ms), the RisingWave incremental view update (~100 ms), and Kafka sink delivery (~10 ms), so roughly 120 ms and typically under 200 ms. The CPMS then sends the OCPP SetChargingProfile command over WebSocket to the charge point, which adds its own network and processing time.

Q: What happens if a vehicle's SoC approaches 100% and the setpoint is still high?
A: The EVSE and vehicle negotiate charging current via the CCS/CHAdeMO protocol independently of the setpoint. The setpoint is a ceiling; the vehicle's BMS will taper current as SoC rises. RisingWave tracks the actual current_power_kw from MeterValues and updates the site headroom accordingly.

Q: Can I implement a bidding mechanism where drivers pay more for higher setpoints?
A: Yes. Add a bid_price_usd_kwh column to the active_sessions source and rank sessions by bid price in the setpoint calculation CTE. Allocate site headroom proportionally to bid price rather than equally. This turns the setpoint view into a continuous auction.
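
The proportional allocation described in this answer can be sketched in a few lines of Python. This is an illustration under stated assumptions, not the view's actual logic: bid_price_usd_kwh is the hypothetical column mentioned above, the function name is made up, and power that a session cannot use because of its rated limit is simply left unallocated rather than redistributed.

```python
from typing import Dict, List


def allocate_by_bid(headroom_kw: float, sessions: List[dict]) -> Dict[str, float]:
    """Split site headroom in proportion to each session's bid, capped at rated power."""
    total_bid = sum(s["bid_price_usd_kwh"] for s in sessions)
    if total_bid <= 0:
        # No positive bids: nothing to weight the allocation by.
        return {s["session_id"]: 0.0 for s in sessions}
    return {
        s["session_id"]: min(
            s["rated_power_kw"],
            headroom_kw * s["bid_price_usd_kwh"] / total_bid,
        )
        for s in sessions
    }


if __name__ == "__main__":
    sessions = [
        {"session_id": "A", "rated_power_kw": 22.0, "bid_price_usd_kwh": 0.30},
        {"session_id": "B", "rated_power_kw": 22.0, "bid_price_usd_kwh": 0.10},
    ]
    # A's proportional share is about 30 kW but is capped at its 22 kW rating;
    # B's share is about 10 kW.
    print(allocate_by_bid(40.0, sessions))
```

A production version would likely redistribute the capped excess to the remaining sessions and keep a minimum rate for every vehicle, in line with the fairness requirement stated earlier.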

Key Takeaways

  • RisingWave continuously balances site demand limits, DR signals, and TOU pricing to compute per-session smart charging setpoints in real time.
  • Multi-stream SQL joins replace a complex rules engine—site demand, DR events, and price tiers are joined in a single materialized view.
  • Setpoints publish to Kafka for CPMS execution, so a change in grid signals typically reaches the CPMS as an updated setpoint in under 200 ms; the OCPP hop to the charge point comes on top of that.
  • The PostgreSQL interface lets operations teams query live setpoint state and session data directly without additional tooling.
