Real-Time EV Charging Station Monitoring

Running a reliable EV charging network means knowing the state of every connector—available, charging, faulted, or offline—at all times. With RisingWave, a PostgreSQL-compatible streaming database, you can ingest OCPP status notifications and metering values in real time and maintain a live operational dashboard without polling or batch jobs.

Why Real-Time Charging Station Monitoring Matters

EV drivers expect charging stations to be available when they arrive. A faulted connector that goes undetected for 30 minutes leads to abandoned charging attempts, one-star reviews, and lost revenue. For network operators managing hundreds or thousands of stations across multiple locations, the operational challenge compounds:

  • Connector availability must be tracked per EVSE and per connector type (CCS, CHAdeMO, Type 2).
  • Session health requires monitoring energy delivery (kWh) against expected charge rates to catch soft faults—stations that appear online but deliver no energy.
  • Fault rate trends must be tracked per station model and firmware version to prioritize maintenance.

Streaming SQL in RisingWave continuously processes OCPP messages, aggregates metrics per station and region, and surfaces alerts when stations deviate from expected behavior, typically within seconds of an event reaching Kafka.

The Streaming SQL Approach

OCPP messages from charge point management systems (CPMS) are forwarded to Kafka. RisingWave ingests StatusNotification, MeterValues, and StartTransaction/StopTransaction messages via Kafka sources and maintains materialized views that always reflect the current state of the fleet.

Step-by-Step Tutorial

Step 1: Data Source Setup

-- OCPP StatusNotification messages (connector state changes)
CREATE SOURCE ocpp_status_notifications (
    charge_point_id   VARCHAR,
    connector_id      INTEGER,
    location_id       VARCHAR,
    network_region    VARCHAR,
    status            VARCHAR,   -- Available | Preparing | Charging | SuspendedEV | SuspendedEVSE | Finishing | Reserved | Unavailable | Faulted
    error_code        VARCHAR,   -- NoError | ConnectorLockFailure | EVCommunicationError | GroundFailure | HighTemperature | InternalError | ...
    vendor_error_code VARCHAR,
    info              VARCHAR,
    event_ts          TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'ocpp.status.notifications',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;

-- OCPP MeterValues (energy and power delivered during sessions)
CREATE SOURCE ocpp_meter_values (
    charge_point_id   VARCHAR,
    connector_id      INTEGER,
    transaction_id    VARCHAR,
    energy_kwh        DOUBLE PRECISION,   -- cumulative kWh since session start
    power_kw          DOUBLE PRECISION,   -- instantaneous power
    voltage_v         DOUBLE PRECISION,
    current_a         DOUBLE PRECISION,
    soc_pct           DOUBLE PRECISION,   -- vehicle SoC if reported
    sampled_ts        TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'ocpp.meter.values',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;

-- OCPP transaction events (session start/stop)
CREATE SOURCE ocpp_transactions (
    transaction_id    VARCHAR,
    charge_point_id   VARCHAR,
    connector_id      INTEGER,
    id_tag            VARCHAR,   -- driver RFID / app token
    start_ts          TIMESTAMPTZ,
    stop_ts           TIMESTAMPTZ,
    start_kwh         DOUBLE PRECISION,
    stop_kwh          DOUBLE PRECISION,
    stop_reason       VARCHAR    -- Local | Remote | EVDisconnected | HardReset | PowerLoss | ...
) WITH (
    connector = 'kafka',
    topic = 'ocpp.transactions',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;

Step 2: Core Materialized Views

-- Current connector status (latest StatusNotification per connector)
CREATE MATERIALIZED VIEW connector_current_status AS
SELECT DISTINCT ON (charge_point_id, connector_id)
    charge_point_id,
    connector_id,
    location_id,
    network_region,
    status,
    error_code,
    vendor_error_code,
    event_ts AS last_status_ts
FROM ocpp_status_notifications
ORDER BY charge_point_id, connector_id, event_ts DESC;
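
DISTINCT ON is compact, but the same latest-row-per-key result can also be written with ROW_NUMBER(), the form used in RisingWave's Top-N and deduplication patterns. The sketch below is an alternative to the view above, not a required change. The view name connector_current_status_topn and the column alias rn are made up for illustration.

```sql
-- Alternative sketch: latest StatusNotification per connector via ROW_NUMBER().
-- connector_current_status_topn and rn are illustrative names, not part of the tutorial.
CREATE MATERIALIZED VIEW connector_current_status_topn AS
SELECT
    charge_point_id,
    connector_id,
    location_id,
    network_region,
    status,
    error_code,
    vendor_error_code,
    event_ts AS last_status_ts
FROM (
    SELECT
        *,
        ROW_NUMBER() OVER (
            PARTITION BY charge_point_id, connector_id
            ORDER BY event_ts DESC          -- newest notification gets rn = 1
        ) AS rn
    FROM ocpp_status_notifications
) ranked
WHERE rn = 1;
```

Either form keeps one row per (charge_point_id, connector_id), so downstream views can read from whichever one you create.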

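-- Status activity per region (5-minute tumbling window).
-- Counts are StatusNotification events received in the window, not distinct connectors,
-- so availability_pct is the share of events that reported a usable state.
CREATE MATERIALIZED VIEW network_availability AS
SELECT
    network_region,
    window_start,
    window_end,
    COUNT(*) AS total_status_events,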
    COUNT(*) FILTER (WHERE status = 'Available')     AS available_count,
    COUNT(*) FILTER (WHERE status = 'Charging')      AS charging_count,
    COUNT(*) FILTER (WHERE status = 'Faulted')       AS faulted_count,
    COUNT(*) FILTER (WHERE status = 'Unavailable')   AS unavailable_count,
    ROUND(100.0 * COUNT(*) FILTER (WHERE status IN ('Available','Charging','Finishing','Preparing'))
          / NULLIF(COUNT(*), 0), 2)                  AS availability_pct
FROM TUMBLE(ocpp_status_notifications, event_ts, INTERVAL '5 MINUTES')
GROUP BY network_region, window_start, window_end;

-- Latest meter sample per transaction.
-- Note: finished transactions keep their last sample in this view too.
CREATE MATERIALIZED VIEW active_session_metrics AS
SELECT DISTINCT ON (mv.transaction_id)
    mv.transaction_id,
    mv.charge_point_id,
    mv.connector_id,
    mv.energy_kwh,
    mv.power_kw,
    mv.soc_pct,
    mv.sampled_ts
FROM ocpp_meter_values mv
WHERE mv.transaction_id IS NOT NULL
ORDER BY mv.transaction_id, mv.sampled_ts DESC;

-- Session summary (completed sessions)
CREATE MATERIALIZED VIEW session_summary AS
SELECT
    t.transaction_id,
    t.charge_point_id,
    t.connector_id,
    t.id_tag,
    t.start_ts,
    t.stop_ts,
    EXTRACT(EPOCH FROM (t.stop_ts - t.start_ts)) / 60.0 AS duration_min,
    (t.stop_kwh - t.start_kwh)                           AS energy_delivered_kwh,
    t.stop_reason
FROM ocpp_transactions t
WHERE t.stop_ts IS NOT NULL;
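
The views above do not cover fault trends per station model and firmware version, because the OCPP sources carry no hardware metadata. Here is a minimal sketch, assuming a reference table that maps each charge point to its model and firmware. The table charge_point_registry, its columns, and the view name fault_events_by_model are all hypothetical.

```sql
-- Hypothetical reference data: one row per charge point, maintained by the operator.
CREATE TABLE charge_point_registry (
    charge_point_id  VARCHAR PRIMARY KEY,
    model            VARCHAR,
    firmware_version VARCHAR
);

-- Daily count of Faulted notifications per model and firmware version.
CREATE MATERIALIZED VIEW fault_events_by_model AS
SELECT
    r.model,
    r.firmware_version,
    s.window_start,
    s.window_end,
    COUNT(*) AS fault_events
FROM TUMBLE(ocpp_status_notifications, event_ts, INTERVAL '1 DAY') AS s
JOIN charge_point_registry r
    ON s.charge_point_id = r.charge_point_id
WHERE s.status = 'Faulted'
GROUP BY r.model, r.firmware_version, s.window_start, s.window_end;
```

This counts fault events, not faulted connectors. Dividing by the number of installed units per model would turn it into a rate, which requires inventory data the tutorial does not define.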

Step 3: Alerting Logic

-- Fault alert: connector enters Faulted state
CREATE MATERIALIZED VIEW fault_alerts AS
SELECT
    charge_point_id,
    connector_id,
    location_id,
    network_region,
    error_code,
    vendor_error_code,
    last_status_ts,
    last_status_ts    AS alert_ts,   -- time the fault was reported; RisingWave rejects NOW() in a streaming SELECT list
    'CONNECTOR_FAULT' AS alert_type
FROM connector_current_status
WHERE status = 'Faulted';

-- Soft fault: session active but power delivery near zero
CREATE MATERIALIZED VIEW low_power_session_alerts AS
SELECT
    asm.transaction_id,
    asm.charge_point_id,
    asm.connector_id,
    asm.power_kw,
    asm.energy_kwh,
    asm.sampled_ts,
    asm.sampled_ts         AS alert_ts,   -- time of the low reading; RisingWave rejects NOW() in a streaming SELECT list
    'LOW_POWER_DELIVERY'   AS alert_type
FROM active_session_metrics asm
JOIN connector_current_status cs
    ON  asm.charge_point_id = cs.charge_point_id
    AND asm.connector_id    = cs.connector_id
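WHERE cs.status = 'Charging'
  AND asm.power_kw < 1.0                    -- below 1 kW while in Charging state
  AND asm.sampled_ts >= cs.last_status_ts;  -- skip final samples of earlier sessions on the same connector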

-- Sink all alerts to Kafka for NOC and mobile notification systems
CREATE SINK charging_alert_sink AS
SELECT charge_point_id, connector_id, location_id, error_code AS detail,
       alert_ts, alert_type
FROM fault_alerts
UNION ALL
SELECT charge_point_id, connector_id, NULL, CAST(power_kw AS VARCHAR),
       alert_ts, alert_type
FROM low_power_session_alerts
WITH (
    connector = 'kafka',
    topic = 'charging.network.alerts',
    properties.bootstrap.server = 'kafka:9092'
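) FORMAT PLAIN ENCODE JSON (
    force_append_only = 'true'   -- the alert views emit updates and deletes; PLAIN format accepts inserts only
);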
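
An append-only alert topic tells consumers when a fault appears, but not when it clears. One option, sketched here as an assumption rather than part of the original design, is a second sink in upsert format keyed by connector, so that a connector leaving the Faulted state produces a delete for its key. The sink name fault_state_sink and the topic charging.network.fault-state are hypothetical.

```sql
-- Sketch: publish the current fault state per connector as an upsert stream.
-- Sink and topic names are illustrative only.
CREATE SINK fault_state_sink FROM fault_alerts
WITH (
    connector = 'kafka',
    topic = 'charging.network.fault-state',
    properties.bootstrap.server = 'kafka:9092',
    primary_key = 'charge_point_id,connector_id'   -- Kafka message key for upserts and deletes
) FORMAT UPSERT ENCODE JSON;
```

A NOC dashboard that consumes this topic can keep a map keyed by connector and drop an entry when the delete arrives.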

Comparison Table

Approach                  Status Latency     Soft Fault Detection  Fleet Scalability    SQL Queries
CPMS built-in dashboard   Minutes (polling)  None                  Vendor-limited       No
Batch log analysis        Hours              Post-session          High                 Yes
Custom stream consumer    Seconds            Custom logic          High (ops overhead)  No
RisingWave streaming SQL  Seconds            SQL join              High                 Yes

FAQ

Q: Which OCPP version does this architecture support—OCPP 1.6 or OCPP 2.0.1?
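A: The Kafka topic schema is independent of the OCPP version. Your CPMS translates OCPP messages into the canonical JSON schema defined in the CREATE SOURCE DDL, so both OCPP 1.6 (SOAP or JSON over WebSocket) and OCPP 2.0.1 (JSON over WebSocket) work as long as the CPMS normalizes them before publishing to Kafka. Note that the status values and the StartTransaction/StopTransaction messages used here follow OCPP 1.6. OCPP 2.0.1 reports sessions through TransactionEvent and uses a smaller set of connector statuses, so the normalization layer has to map those onto this schema.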

Q: How do I track connector availability over time for SLA reporting?
A: The network_availability materialized view computes 5-minute availability percentages per region. Sink this view to a JDBC database or Iceberg table. Query it with a standard SQL AVG over the desired reporting period to compute monthly or quarterly SLA metrics.
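
As a rough illustration of that reporting query, the sketch below rolls the 5-minute rows up to one figure per region and month. It assumes the sink target exposes a table with the same name and columns as the network_availability view. The output column names are illustrative.

```sql
-- Sketch: monthly average of the 5-minute availability percentages per region.
-- Assumes a table named network_availability in the reporting database.
SELECT
    network_region,
    date_trunc('month', window_start)  AS report_month,
    ROUND(AVG(availability_pct), 2)    AS avg_availability_pct,
    COUNT(*)                           AS windows_observed
FROM network_availability
GROUP BY network_region, date_trunc('month', window_start)
ORDER BY report_month, network_region;
```

A plain AVG weights every 5-minute window equally, so quiet windows with few status events count as much as busy ones. windows_observed is included so that sparse months are easy to spot.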

Q: Can I query the current status of a specific charge point from my application?
A: Yes. RisingWave exposes a PostgreSQL-compatible wire protocol. Connect your application with any Postgres driver and run SELECT * FROM connector_current_status WHERE charge_point_id = 'CP_001' to get the current status in milliseconds.
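
Beyond a single charge point lookup, the same connection can serve small rollups for an app or dashboard. Here is a brief sketch, where the location identifier 'LOC_042' is a made-up example value:

```sql
-- Per-connector detail for one charge point
SELECT connector_id, status, error_code, last_status_ts
FROM connector_current_status
WHERE charge_point_id = 'CP_001'
ORDER BY connector_id;

-- Connector counts by status for one site ('LOC_042' is a hypothetical location_id)
SELECT status, COUNT(*) AS connectors
FROM connector_current_status
WHERE location_id = 'LOC_042'
GROUP BY status
ORDER BY connectors DESC;
```

Both statements read the already-maintained materialized view, so they do not reprocess the Kafka stream on each call.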

Key Takeaways

  • RisingWave processes OCPP StatusNotification, MeterValues, and transaction events in real time to maintain a live view of every connector in the fleet.
  • Both hard faults (Faulted status) and soft faults (low power during an active session) are detected in SQL, with a filter for the former and a join for the latter, so no custom stream-processing code is required.
  • Availability metrics, session summaries, and fault alerts are available via the PostgreSQL wire protocol for dashboards and via Kafka sinks for notification systems.
  • The architecture scales with fleet size by adding Kafka topic partitions and RisingWave compute nodes.
