EV Fleet Charging Optimization: A Real-Time Approach

EV Fleet Charging Optimization: A Real-Time Approach

Commercial EV fleet operators—logistics companies, transit agencies, ride-hail platforms—face a complex optimization problem every shift: charge the right vehicles at the right time, at the lowest cost, while ensuring every vehicle is ready for its next dispatch. RisingWave, a PostgreSQL-compatible streaming database, continuously evaluates vehicle state, energy prices, and dispatch schedules to produce real-time charging recommendations.

Why Fleet Charging Optimization Matters

A fleet of 200 electric delivery vans charging simultaneously during the afternoon peak can add hundreds of thousands of dollars per year in demand charges and peak energy costs. Conversely, under-charging vehicles leads to range anxiety, mid-route failures, and expensive emergency tows.

The optimization problem has several interconnected dimensions:

  • Vehicle readiness — each vehicle has a next-dispatch time and a minimum required SoC to complete the route.
  • Energy cost — TOU tariffs and real-time spot prices reward shifting load to low-cost periods.
  • Site capacity — the depot's electrical service capacity limits total simultaneous charging power.
  • Battery health — avoiding sustained high-rate charging at high SoC extends battery life.

Streaming SQL in RisingWave evaluates all these constraints continuously as vehicle telemetry, price signals, and dispatch schedules change.

The Streaming SQL Approach

Three Kafka streams feed RisingWave: vehicle telematics (SoC, plug status, location), OCPP charging session data, and fleet dispatch schedules from the transportation management system (TMS). Electricity prices arrive via a fourth stream. Materialized views compute time-to-charge, cost-optimal charging windows, and per-vehicle priority scores. Recommendations publish to Kafka for the fleet management system.

Step-by-Step Tutorial

Step 1: Data Source Setup

-- Vehicle telematics: SoC, plug status, location
CREATE SOURCE vehicle_telematics (
    vehicle_id        VARCHAR,
    vin               VARCHAR,
    fleet_id          VARCHAR,
    depot_id          VARCHAR,
    soc_pct           DOUBLE PRECISION,
    plug_status       VARCHAR,   -- PLUGGED | UNPLUGGED | CHARGING | FAULT
    battery_capacity_kwh DOUBLE PRECISION,
    max_charge_rate_kw   DOUBLE PRECISION,
    odometer_km       DOUBLE PRECISION,
    telemetry_ts      TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'fleet.vehicle.telematics',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;

-- Fleet dispatch schedule from TMS
CREATE SOURCE dispatch_schedule (
    vehicle_id        VARCHAR,
    fleet_id          VARCHAR,
    route_id          VARCHAR,
    dispatch_ts       TIMESTAMPTZ,         -- when vehicle must depart
    estimated_kwh_required DOUBLE PRECISION,  -- energy needed for route
    required_min_soc_pct   DOUBLE PRECISION,  -- minimum SoC at departure
    updated_at        TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'fleet.dispatch.schedule',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;

-- Active charging sessions (from OCPP CPMS)
CREATE SOURCE fleet_charging_sessions (
    session_id        VARCHAR,
    vehicle_id        VARCHAR,
    charge_point_id   VARCHAR,
    depot_id          VARCHAR,
    current_power_kw  DOUBLE PRECISION,
    energy_delivered_kwh DOUBLE PRECISION,
    session_start_ts  TIMESTAMPTZ,
    sampled_ts        TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'fleet.charging.sessions',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;

-- TOU electricity price for depot
CREATE SOURCE depot_electricity_price (
    depot_id          VARCHAR,
    price_usd_kwh     DOUBLE PRECISION,
    price_tier        VARCHAR,
    valid_from        TIMESTAMPTZ,
    valid_to          TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'tariff.depot.price',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;

Step 2: Core Materialized View

-- Latest vehicle state
CREATE MATERIALIZED VIEW vehicle_current_state AS
SELECT DISTINCT ON (vehicle_id)
    vehicle_id, vin, fleet_id, depot_id,
    soc_pct, plug_status,
    battery_capacity_kwh, max_charge_rate_kw,
    telemetry_ts
FROM vehicle_telematics
ORDER BY vehicle_id, telemetry_ts DESC;

-- Next dispatch per vehicle
CREATE MATERIALIZED VIEW next_dispatch AS
SELECT DISTINCT ON (vehicle_id)
    vehicle_id, fleet_id, route_id,
    dispatch_ts, estimated_kwh_required, required_min_soc_pct
FROM dispatch_schedule
WHERE dispatch_ts > NOW()
ORDER BY vehicle_id, dispatch_ts ASC;

-- Current depot price
CREATE MATERIALIZED VIEW current_depot_price AS
SELECT DISTINCT ON (depot_id)
    depot_id, price_usd_kwh, price_tier
FROM depot_electricity_price
WHERE NOW() BETWEEN valid_from AND valid_to
ORDER BY depot_id, valid_from DESC;

-- Vehicle charging readiness and priority score
CREATE MATERIALIZED VIEW vehicle_charging_priority AS
SELECT
    v.vehicle_id,
    v.depot_id,
    v.soc_pct,
    v.plug_status,
    v.battery_capacity_kwh,
    v.max_charge_rate_kw,
    nd.dispatch_ts,
    nd.required_min_soc_pct,
    nd.estimated_kwh_required,
    EXTRACT(EPOCH FROM (nd.dispatch_ts - NOW())) / 3600.0  AS hours_until_dispatch,
    -- kWh needed to reach required SoC
    GREATEST(0, (nd.required_min_soc_pct - v.soc_pct) / 100.0
             * v.battery_capacity_kwh)                     AS kwh_needed,
    -- Estimated hours to charge at max rate
    CASE WHEN v.max_charge_rate_kw > 0
         THEN GREATEST(0, (nd.required_min_soc_pct - v.soc_pct) / 100.0
                       * v.battery_capacity_kwh) / v.max_charge_rate_kw
         ELSE NULL
    END                                                    AS hours_to_charge,
    -- Priority: higher = more urgent (low SoC + imminent dispatch)
    ((nd.required_min_soc_pct - v.soc_pct) / 100.0)
    / NULLIF(EXTRACT(EPOCH FROM (nd.dispatch_ts - NOW())) / 3600.0, 0)
                                                           AS urgency_score,
    p.price_tier,
    p.price_usd_kwh
FROM vehicle_current_state v
LEFT JOIN next_dispatch nd ON v.vehicle_id = nd.vehicle_id
LEFT JOIN current_depot_price p ON v.depot_id = p.depot_id
WHERE v.plug_status IN ('PLUGGED', 'CHARGING');

Step 3: Alerting Logic

-- Critical alert: vehicle cannot reach required SoC before dispatch
CREATE MATERIALIZED VIEW charging_risk_alerts AS
SELECT
    vehicle_id,
    depot_id,
    soc_pct,
    required_min_soc_pct,
    kwh_needed,
    hours_to_charge,
    hours_until_dispatch,
    dispatch_ts,
    NOW()                    AS alert_ts,
    'INSUFFICIENT_CHARGE_TIME' AS alert_type
FROM vehicle_charging_priority
WHERE hours_to_charge > hours_until_dispatch
  AND hours_until_dispatch < 4;   -- alert when departure is within 4 hours

-- Charging recommendation: prioritized charge start/stop guidance
CREATE MATERIALIZED VIEW charging_recommendations AS
SELECT
    vehicle_id,
    depot_id,
    soc_pct,
    kwh_needed,
    urgency_score,
    price_tier,
    price_usd_kwh,
    CASE
        WHEN urgency_score > 0.5 THEN 'START_CHARGING_IMMEDIATELY'
        WHEN price_tier = 'OFF_PEAK' THEN 'START_CHARGING_NOW'
        WHEN price_tier = 'ON_PEAK' AND urgency_score < 0.1 THEN 'DEFER_TO_OFF_PEAK'
        WHEN price_tier = 'CRITICAL_PEAK' AND urgency_score < 0.3 THEN 'PAUSE_IF_POSSIBLE'
        ELSE 'CONTINUE_CHARGING'
    END AS recommendation,
    NOW() AS computed_at
FROM vehicle_charging_priority;

-- Sink recommendations to Kafka for fleet management system
CREATE SINK charging_recommendation_sink AS
SELECT * FROM charging_recommendations
WITH (
    connector = 'kafka',
    topic = 'fleet.charging.recommendations',
    properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;

Comparison Table

ApproachSoC VisibilityCost OptimizationDispatch-AwareReal-Time
Manual depot rulesLimitedStatic TOUNoNo
CPMS built-in schedulerPer stationBasic TOUNoPartial
Batch optimizer (hourly)GoodFull TOUYesStale
RisingWave streaming SQLLiveReal-time TOUYesYes

FAQ

Q: How do I account for vehicles that return from a route with varying SoC due to traffic or climate conditions?
A: The vehicle_telematics source captures soc_pct at every telemetry interval. When a vehicle returns and the SoC is lower than expected, the vehicle_charging_priority view immediately recalculates kwh_needed and hours_to_charge, escalating the urgency score and potentially triggering a CHARGING_RISK_ALERT.

Q: Can I incorporate utility demand charge optimization (15-minute peak demand)?
A: Yes. Add a site_demand source tracking total depot load and compute a 15-minute HOP window to project demand peaks. Join this to the charging recommendation view and suppress high-power charging when the projected 15-minute peak approaches the demand threshold.

Q: How do I handle vehicles from different manufacturers with different SoC reporting formats?
A: Normalize telemetry to the canonical vehicle_telematics schema in a preprocessing layer before publishing to Kafka. Telematics normalization adapters (often built with Kafka Streams or a lightweight service) handle manufacturer-specific quirks before data reaches RisingWave.

Key Takeaways

  • RisingWave continuously evaluates vehicle SoC, dispatch schedules, and electricity prices to produce live charging priority scores and recommendations.
  • Urgency scoring in SQL replaces a complex optimization solver for most practical fleet charging scenarios.
  • Risk alerts fire the moment a vehicle's charging trajectory makes it impossible to reach the required departure SoC.
  • The system integrates with existing fleet management and CPMS platforms via Kafka sinks, requiring no changes to operational tooling.

Best-in-Class Event Streaming
for Agents, Apps, and Analytics
GitHubXLinkedInSlackYouTube
Sign up for our to stay updated.