Building a Charging Session Analytics Platform

EV charging networks generate thousands of OCPP (Open Charge Point Protocol) events every minute. A streaming SQL analytics platform built on RisingWave lets you turn raw connector status messages and meter values into live dashboards, revenue reports, and anomaly alerts — without any custom application code.

Why Charging Session Analytics Matters

Charging station operators face a hard problem: their revenue depends on uptime, but their visibility is limited. When a CCS or CHAdeMO connector goes offline in the middle of a session, operators often learn about it from frustrated drivers rather than from their own systems.

Beyond reactive support, there are deeper business questions. Which station clusters are underutilized during peak hours? How do session durations compare across connector types? How much energy does a typical session deliver, and how does that map onto your kWh billing increments?

Answering these questions requires processing a continuous stream of events — StatusNotification, MeterValues, StartTransaction, and StopTransaction messages — and correlating them across sessions that may span tens of minutes. Batch pipelines built on hourly warehouse exports simply cannot provide the sub-minute latency that front-line operators need.

RisingWave is a PostgreSQL-compatible streaming database that ingests from Apache Kafka and maintains incrementally updated materialized views. You write standard SQL; RisingWave handles the state, fault tolerance, and exactly-once semantics.

The Streaming SQL Solution

The architecture has three layers:

  1. Ingestion: OCPP events land in Kafka topics as JSON. RisingWave reads them as a source table.
  2. Transformation: Materialized views join, aggregate, and window the raw events into session summaries, revenue metrics, and connector health states.
  3. Delivery: A sink view pushes alerts to a downstream Kafka topic or PostgreSQL table that your dashboard and on-call tooling read.

Because RisingWave materializes the views incrementally, each new event triggers only the rows it affects — not a full recompute.
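For concreteness, here is what a single event on the ocpp.events topic might look like before RisingWave reads it. This is an illustrative sketch: the field names mirror the source schema defined in Step 1, the values are made up, and the choice of Kafka producer library is left open.

```python
import json
from datetime import datetime, timezone

# Illustrative OCPP-derived event, flattened to match the ocpp_events schema.
# All values here are sample data, not real station output.
event = {
    "event_id": "evt-0001",
    "station_id": "station-42",
    "connector_id": 1,
    "session_id": "sess-7f3a",
    "event_type": "MeterValues",
    "connector_type": "CCS",
    "energy_kwh": 12.4,
    "power_kw": 48.0,
    "soc_percent": 61,
    "status": "Charging",
    "occurred_at": datetime.now(timezone.utc).isoformat(),
}

# This JSON string is what a producer would publish to the ocpp.events topic.
payload = json.dumps(event)
print(payload)
```

Any producer that emits this shape — a charge point management system, an OCPP gateway, or a small bridge service — will satisfy the source definition below.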

Tutorial: Building It Step by Step

Step 1: Set Up the Data Source

-- Create a Kafka source for OCPP events
CREATE SOURCE ocpp_events (
    event_id        VARCHAR,
    station_id      VARCHAR,
    connector_id    INT,
    session_id      VARCHAR,
    event_type      VARCHAR,   -- StartTransaction | StopTransaction | MeterValues | StatusNotification
    connector_type  VARCHAR,   -- CCS | CHAdeMO | Type2
    energy_kwh      DOUBLE PRECISION,
    power_kw        DOUBLE PRECISION,
    soc_percent     INT,       -- State of Charge 0-100
    status          VARCHAR,   -- Available | Charging | Faulted | Unavailable
    occurred_at     TIMESTAMPTZ
)
WITH (
    connector = 'kafka',
    topic = 'ocpp.events',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;
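Before building anything more elaborate, it helps to confirm events are actually flowing. A small counting view works as a sanity check (a sketch; the view name ocpp_event_counts is illustrative):

```sql
-- Sanity check: count ingested events by type
CREATE MATERIALIZED VIEW ocpp_event_counts AS
SELECT event_type, COUNT(*) AS event_count
FROM ocpp_events
GROUP BY event_type;

-- Run this repeatedly; the counts should climb as events arrive
SELECT * FROM ocpp_event_counts;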

Step 2: Build Real-Time Aggregations

-- Active session view: one row per open charging session
CREATE MATERIALIZED VIEW active_sessions AS
SELECT
    session_id,
    station_id,
    connector_id,
    connector_type,
    MIN(occurred_at)                          AS session_start,
    MAX(energy_kwh)                           AS energy_delivered_kwh,
    MAX(power_kw)                             AS current_power_kw,
    MAX(soc_percent)                          AS latest_soc,
    EXTRACT(EPOCH FROM (NOW() - MIN(occurred_at))) / 60 AS duration_minutes
FROM ocpp_events
WHERE event_type IN ('StartTransaction', 'MeterValues')
  AND session_id NOT IN (
      SELECT session_id FROM ocpp_events WHERE event_type = 'StopTransaction'
  )
GROUP BY session_id, station_id, connector_id, connector_type;

-- Hourly station revenue aggregation (tumbling window)
CREATE MATERIALIZED VIEW station_revenue_hourly AS
SELECT
    station_id,
    connector_type,
    window_start,
    window_end,
    COUNT(DISTINCT session_id)                AS sessions_completed,
    SUM(energy_kwh)                           AS total_energy_kwh,
    AVG(energy_kwh)                           AS avg_energy_per_session_kwh,
    AVG(EXTRACT(EPOCH FROM (session_end - session_start)) / 60) AS avg_duration_minutes
FROM TUMBLE(
    (
        SELECT
            session_id,
            station_id,
            connector_type,
            MIN(occurred_at) AS session_start,
            MAX(energy_kwh)  AS energy_kwh,
            MAX(occurred_at) AS session_end
        FROM ocpp_events
        GROUP BY session_id, station_id, connector_type
    ),
    session_end,
    INTERVAL '1 HOUR'
)
GROUP BY station_id, connector_type, window_start, window_end;

-- Connector availability rate over 15-minute sliding window
CREATE MATERIALIZED VIEW connector_availability AS
SELECT
    station_id,
    connector_id,
    connector_type,
    window_start,
    window_end,
    COUNT(*) FILTER (WHERE status = 'Available' OR status = 'Charging') AS healthy_events,
    COUNT(*) AS total_events,
    ROUND(
        COUNT(*) FILTER (WHERE status = 'Available' OR status = 'Charging') * 100.0 / COUNT(*),
        2
    ) AS availability_pct
FROM HOP(ocpp_events, occurred_at, INTERVAL '1 MINUTE', INTERVAL '15 MINUTES')
WHERE event_type = 'StatusNotification'
GROUP BY station_id, connector_id, connector_type, window_start, window_end;

Step 3: Detect Anomalies and Generate Alerts

-- Detect sessions with no meter value updates for more than 10 minutes
CREATE MATERIALIZED VIEW stalled_sessions AS
SELECT
    session_id,
    station_id,
    connector_id,
    connector_type,
    MAX(occurred_at)                                   AS last_event_at,
    EXTRACT(EPOCH FROM (NOW() - MAX(occurred_at))) / 60 AS minutes_since_last_event,
    MAX(energy_kwh)                                    AS last_energy_kwh
FROM ocpp_events
WHERE event_type IN ('StartTransaction', 'MeterValues')
GROUP BY session_id, station_id, connector_id, connector_type
HAVING EXTRACT(EPOCH FROM (NOW() - MAX(occurred_at))) / 60 > 10
   AND session_id NOT IN (
       SELECT session_id FROM ocpp_events WHERE event_type = 'StopTransaction'
   );

-- Sink: push Faulted connector alerts to Kafka
CREATE SINK connector_fault_alerts
FROM (
    SELECT
        station_id,
        connector_id,
        connector_type,
        status,
        occurred_at,
        'CONNECTOR_FAULT' AS alert_type
    FROM ocpp_events
    WHERE event_type = 'StatusNotification'
      AND status = 'Faulted'
)
WITH (
    connector = 'kafka',
    topic = 'alerts.connector_faults',
    properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;

Comparison Table

  Aspect                    Traditional Batch Pipeline       RisingWave Streaming SQL
  Latency to dashboard      1–60 minutes (batch interval)    Sub-second
  Session state tracking    Re-scan entire table each run    Incremental state in memory
  Schema changes            Redeploy ETL job                 ALTER MATERIALIZED VIEW
  Fault recovery            Manual rerun or checkpointing    Built-in exactly-once
  Query language            Python + custom aggregation      Standard SQL
  Connector support         Kafka consumer code              Native CREATE SOURCE

FAQ

Does RisingWave support OCPP 2.0 and 2.0.1 message formats? RisingWave does not parse OCPP natively — your Kafka producer or a lightweight transformation layer serializes OCPP messages to JSON or Avro before they arrive in the topic. RisingWave then reads that JSON.

How do I handle sessions that span across hourly window boundaries? Use a HOP (sliding) window instead of TUMBLE, or store open sessions in a separate active_sessions view keyed by session_id. Complete the session row only when StopTransaction arrives.
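Pairing starts and stops can be expressed as a self-join on the source. The sketch below is one possible shape (the view name completed_sessions is illustrative); each session row is finalized only once its StopTransaction event arrives:

```sql
-- Pair each StopTransaction with its StartTransaction to finalize the session
CREATE MATERIALIZED VIEW completed_sessions AS
SELECT
    s.session_id,
    s.station_id,
    s.connector_type,
    s.occurred_at    AS session_start,
    e.occurred_at    AS session_end,
    e.energy_kwh     AS energy_delivered_kwh
FROM ocpp_events s
JOIN ocpp_events e
  ON s.session_id = e.session_id
WHERE s.event_type = 'StartTransaction'
  AND e.event_type = 'StopTransaction';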

Can I query the materialized views from Grafana? Yes. RisingWave exposes a PostgreSQL-compatible wire protocol on port 4566. Point any Grafana PostgreSQL data source at RisingWave and query the views directly.
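A Grafana panel then reduces to a plain SQL query against one of the views from Step 2, for example (column names follow the connector_availability view; the threshold is illustrative):

```sql
-- Worst-performing connectors in the most recent windows
SELECT station_id, connector_id, availability_pct
FROM connector_availability
WHERE window_end > NOW() - INTERVAL '15 MINUTES'
ORDER BY availability_pct ASC
LIMIT 20;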

What happens if RisingWave falls behind the Kafka topic? RisingWave tracks its consumer offsets through checkpoints. After a restart, it resumes from the last committed offset, works through the backlog, and updates the affected views incrementally.

Key Takeaways

  • OCPP events from CCS, CHAdeMO, and Type2 connectors can be ingested directly from Kafka using CREATE SOURCE in RisingWave.
  • Materialized views with TUMBLE and HOP window functions replace custom aggregation code and deliver sub-second dashboard latency.
  • A CREATE SINK statement routes fault alerts to downstream Kafka topics without any application glue code.
  • The PostgreSQL-compatible interface means existing BI tools connect to RisingWave without modification.
