Commercial EV fleet operators—logistics companies, transit agencies, ride-hail platforms—face a complex optimization problem every shift: charge the right vehicles at the right time, at the lowest cost, while ensuring every vehicle is ready for its next dispatch. RisingWave, a PostgreSQL-compatible streaming database, continuously evaluates vehicle state, energy prices, and dispatch schedules to produce real-time charging recommendations.
Why Fleet Charging Optimization Matters
A fleet of 200 electric delivery vans charging simultaneously during the afternoon peak can add hundreds of thousands of dollars per year in demand charges and peak energy costs. Conversely, under-charging vehicles leads to range anxiety, mid-route failures, and expensive emergency tows.
The optimization problem has several interconnected dimensions:
- Vehicle readiness — each vehicle has a next-dispatch time and a minimum required SoC to complete the route.
- Energy cost — TOU tariffs and real-time spot prices reward shifting load to low-cost periods.
- Site capacity — the depot's electrical service capacity limits total simultaneous charging power.
- Battery health — avoiding sustained high-rate charging at high SoC extends battery life.
Streaming SQL in RisingWave evaluates all these constraints continuously as vehicle telemetry, price signals, and dispatch schedules change.
The Streaming SQL Approach
Three Kafka streams feed RisingWave: vehicle telematics (SoC, plug status, location), OCPP charging session data, and fleet dispatch schedules from the transportation management system (TMS). Electricity prices arrive via a fourth stream. Materialized views compute time-to-charge, cost-optimal charging windows, and per-vehicle priority scores. Recommendations publish to Kafka for the fleet management system.
Step-by-Step Tutorial
Step 1: Data Source Setup
-- Vehicle telematics: SoC, plug status, location
CREATE SOURCE vehicle_telematics (
vehicle_id VARCHAR,
vin VARCHAR,
fleet_id VARCHAR,
depot_id VARCHAR,
soc_pct DOUBLE PRECISION,
plug_status VARCHAR, -- PLUGGED | UNPLUGGED | CHARGING | FAULT
battery_capacity_kwh DOUBLE PRECISION,
max_charge_rate_kw DOUBLE PRECISION,
odometer_km DOUBLE PRECISION,
telemetry_ts TIMESTAMPTZ
) WITH (
connector = 'kafka',
topic = 'fleet.vehicle.telematics',
properties.bootstrap.server = 'kafka:9092',
scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;
-- Fleet dispatch schedule from TMS
CREATE SOURCE dispatch_schedule (
vehicle_id VARCHAR,
fleet_id VARCHAR,
route_id VARCHAR,
dispatch_ts TIMESTAMPTZ, -- when vehicle must depart
estimated_kwh_required DOUBLE PRECISION, -- energy needed for route
required_min_soc_pct DOUBLE PRECISION, -- minimum SoC at departure
updated_at TIMESTAMPTZ
) WITH (
connector = 'kafka',
topic = 'fleet.dispatch.schedule',
properties.bootstrap.server = 'kafka:9092',
scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;
-- Active charging sessions (from OCPP CPMS)
CREATE SOURCE fleet_charging_sessions (
session_id VARCHAR,
vehicle_id VARCHAR,
charge_point_id VARCHAR,
depot_id VARCHAR,
current_power_kw DOUBLE PRECISION,
energy_delivered_kwh DOUBLE PRECISION,
session_start_ts TIMESTAMPTZ,
sampled_ts TIMESTAMPTZ
) WITH (
connector = 'kafka',
topic = 'fleet.charging.sessions',
properties.bootstrap.server = 'kafka:9092',
scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;
-- TOU electricity price for depot
CREATE SOURCE depot_electricity_price (
depot_id VARCHAR,
price_usd_kwh DOUBLE PRECISION,
price_tier VARCHAR,
valid_from TIMESTAMPTZ,
valid_to TIMESTAMPTZ
) WITH (
connector = 'kafka',
topic = 'tariff.depot.price',
properties.bootstrap.server = 'kafka:9092',
scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;
Step 2: Core Materialized View
-- Latest vehicle state
CREATE MATERIALIZED VIEW vehicle_current_state AS
SELECT DISTINCT ON (vehicle_id)
vehicle_id, vin, fleet_id, depot_id,
soc_pct, plug_status,
battery_capacity_kwh, max_charge_rate_kw,
telemetry_ts
FROM vehicle_telematics
ORDER BY vehicle_id, telemetry_ts DESC;
-- Next dispatch per vehicle
CREATE MATERIALIZED VIEW next_dispatch AS
SELECT DISTINCT ON (vehicle_id)
vehicle_id, fleet_id, route_id,
dispatch_ts, estimated_kwh_required, required_min_soc_pct
FROM dispatch_schedule
WHERE dispatch_ts > NOW()
ORDER BY vehicle_id, dispatch_ts ASC;
-- Current depot price
CREATE MATERIALIZED VIEW current_depot_price AS
SELECT DISTINCT ON (depot_id)
depot_id, price_usd_kwh, price_tier
FROM depot_electricity_price
WHERE NOW() BETWEEN valid_from AND valid_to
ORDER BY depot_id, valid_from DESC;
-- Vehicle charging readiness and priority score
CREATE MATERIALIZED VIEW vehicle_charging_priority AS
SELECT
v.vehicle_id,
v.depot_id,
v.soc_pct,
v.plug_status,
v.battery_capacity_kwh,
v.max_charge_rate_kw,
nd.dispatch_ts,
nd.required_min_soc_pct,
nd.estimated_kwh_required,
EXTRACT(EPOCH FROM (nd.dispatch_ts - NOW())) / 3600.0 AS hours_until_dispatch,
-- kWh needed to reach required SoC
GREATEST(0, (nd.required_min_soc_pct - v.soc_pct) / 100.0
* v.battery_capacity_kwh) AS kwh_needed,
-- Estimated hours to charge at max rate
CASE WHEN v.max_charge_rate_kw > 0
THEN GREATEST(0, (nd.required_min_soc_pct - v.soc_pct) / 100.0
* v.battery_capacity_kwh) / v.max_charge_rate_kw
ELSE NULL
END AS hours_to_charge,
-- Priority: higher = more urgent (low SoC + imminent dispatch)
((nd.required_min_soc_pct - v.soc_pct) / 100.0)
/ NULLIF(EXTRACT(EPOCH FROM (nd.dispatch_ts - NOW())) / 3600.0, 0)
AS urgency_score,
p.price_tier,
p.price_usd_kwh
FROM vehicle_current_state v
LEFT JOIN next_dispatch nd ON v.vehicle_id = nd.vehicle_id
LEFT JOIN current_depot_price p ON v.depot_id = p.depot_id
WHERE v.plug_status IN ('PLUGGED', 'CHARGING');
Step 3: Alerting Logic
-- Critical alert: vehicle cannot reach required SoC before dispatch
CREATE MATERIALIZED VIEW charging_risk_alerts AS
SELECT
vehicle_id,
depot_id,
soc_pct,
required_min_soc_pct,
kwh_needed,
hours_to_charge,
hours_until_dispatch,
dispatch_ts,
NOW() AS alert_ts,
'INSUFFICIENT_CHARGE_TIME' AS alert_type
FROM vehicle_charging_priority
WHERE hours_to_charge > hours_until_dispatch
AND hours_until_dispatch < 4; -- alert when departure is within 4 hours
-- Charging recommendation: prioritized charge start/stop guidance
CREATE MATERIALIZED VIEW charging_recommendations AS
SELECT
vehicle_id,
depot_id,
soc_pct,
kwh_needed,
urgency_score,
price_tier,
price_usd_kwh,
CASE
WHEN urgency_score > 0.5 THEN 'START_CHARGING_IMMEDIATELY'
WHEN price_tier = 'OFF_PEAK' THEN 'START_CHARGING_NOW'
WHEN price_tier = 'ON_PEAK' AND urgency_score < 0.1 THEN 'DEFER_TO_OFF_PEAK'
WHEN price_tier = 'CRITICAL_PEAK' AND urgency_score < 0.3 THEN 'PAUSE_IF_POSSIBLE'
ELSE 'CONTINUE_CHARGING'
END AS recommendation,
NOW() AS computed_at
FROM vehicle_charging_priority;
-- Sink recommendations to Kafka for fleet management system
CREATE SINK charging_recommendation_sink AS
SELECT * FROM charging_recommendations
WITH (
connector = 'kafka',
topic = 'fleet.charging.recommendations',
properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;
Comparison Table
| Approach | SoC Visibility | Cost Optimization | Dispatch-Aware | Real-Time |
| Manual depot rules | Limited | Static TOU | No | No |
| CPMS built-in scheduler | Per station | Basic TOU | No | Partial |
| Batch optimizer (hourly) | Good | Full TOU | Yes | Stale |
| RisingWave streaming SQL | Live | Real-time TOU | Yes | Yes |
FAQ
Q: How do I account for vehicles that return from a route with varying SoC due to traffic or climate conditions?
A: The vehicle_telematics source captures soc_pct at every telemetry interval. When a vehicle returns and the SoC is lower than expected, the vehicle_charging_priority view immediately recalculates kwh_needed and hours_to_charge, escalating the urgency score and potentially triggering a CHARGING_RISK_ALERT.
Q: Can I incorporate utility demand charge optimization (15-minute peak demand)?
A: Yes. Add a site_demand source tracking total depot load and compute a 15-minute HOP window to project demand peaks. Join this to the charging recommendation view and suppress high-power charging when the projected 15-minute peak approaches the demand threshold.
Q: How do I handle vehicles from different manufacturers with different SoC reporting formats?
A: Normalize telemetry to the canonical vehicle_telematics schema in a preprocessing layer before publishing to Kafka. Telematics normalization adapters (often built with Kafka Streams or a lightweight service) handle manufacturer-specific quirks before data reaches RisingWave.
Key Takeaways
- RisingWave continuously evaluates vehicle SoC, dispatch schedules, and electricity prices to produce live charging priority scores and recommendations.
- Urgency scoring in SQL replaces a complex optimization solver for most practical fleet charging scenarios.
- Risk alerts fire the moment a vehicle's charging trajectory makes it impossible to reach the required departure SoC.
- The system integrates with existing fleet management and CPMS platforms via Kafka sinks, requiring no changes to operational tooling.

