How to Process OCPP Data in Real Time with RisingWave

OCPP—the Open Charge Point Protocol—is the lingua franca of EV charging infrastructure. Every status change, meter reading, and transaction event from a charge point flows through OCPP messages. With RisingWave, a PostgreSQL-compatible streaming database, you can ingest these messages from Kafka and query them in real time using standard SQL.

Why OCPP Data Processing Matters

A charge point management system (CPMS) handling thousands of charge points generates a continuous stream of OCPP messages: StatusNotification, MeterValues, StartTransaction, StopTransaction, Heartbeat, and more. Extracting operational value from this data stream requires:

  • Real-time session tracking — know which sessions are active, how much energy has been delivered, and whether power delivery matches expectations.
  • Connector state management — maintain a current-state model of every connector across the fleet.
  • Metering accuracy — detect metering gaps (missing MeterValues intervals), billing discrepancies, and calibration drift.
  • Heartbeat monitoring — identify charge points that have stopped communicating before drivers arrive.

Traditional CPMS databases store OCPP events in relational tables and rely on periodic batch queries for analytics. RisingWave processes the same event stream continuously, keeping all derived metrics always fresh.

The Streaming SQL Approach

OCPP messages arrive at the CPMS WebSocket handler. A lightweight forwarder publishes normalized message payloads to Kafka topics organized by message type. RisingWave ingests these topics via CREATE SOURCE and maintains continuously updated materialized views for session state, metering aggregates, and heartbeat monitoring; a connector-state view over StatusNotification follows the same pattern and is omitted here for brevity.
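
To make the forwarder's job concrete, here is a minimal sketch in Python. The field names simply mirror the source schema defined in Step 1; pulling firmware version and region from the WebSocket connection context is an assumption of this sketch, since the OCPP Heartbeat payload itself is empty.

```python
import json
from datetime import datetime, timezone

def normalize_heartbeat(charge_point_id: str, firmware_version: str,
                        network_region: str) -> str:
    """Build the flat JSON record published to the ocpp.heartbeats topic.

    The OCPP Heartbeat payload carries no data; the metadata here comes
    from the forwarder's connection context (an assumption of this sketch).
    """
    record = {
        "charge_point_id": charge_point_id,
        "firmware_version": firmware_version,
        "network_region": network_region,
        # ISO 8601 receive time, parsed as TIMESTAMPTZ by RisingWave
        "heartbeat_ts": datetime.now(timezone.utc).isoformat(),
    }
    return json.dumps(record)

print(normalize_heartbeat("CP-0042", "1.4.2", "eu-west"))
```

Flat, one-record-per-message JSON like this is what lets the CREATE SOURCE statements below use FORMAT PLAIN ENCODE JSON without any unnesting.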

Step-by-Step Tutorial

Step 1: Data Source Setup

-- OCPP Heartbeat messages (charge point keep-alive)
CREATE SOURCE ocpp_heartbeats (
    charge_point_id   VARCHAR,
    firmware_version  VARCHAR,
    network_region    VARCHAR,
    heartbeat_ts      TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'ocpp.heartbeats',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;

-- OCPP BootNotification (charge point connects / reboots)
CREATE SOURCE ocpp_boot_notifications (
    charge_point_id       VARCHAR,
    charge_point_model    VARCHAR,
    charge_point_vendor   VARCHAR,
    firmware_version      VARCHAR,
    iccid                 VARCHAR,
    imsi                  VARCHAR,
    meter_serial_number   VARCHAR,
    boot_ts               TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'ocpp.boot.notifications',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;

-- OCPP MeterValues with sampled value array (flattened to rows)
CREATE SOURCE ocpp_meter_readings (
    charge_point_id   VARCHAR,
    connector_id      INTEGER,
    transaction_id    VARCHAR,
    measurand         VARCHAR,   -- Energy.Active.Import.Register | Power.Active.Import | Current.Import | Voltage | SoC
    value             DOUBLE PRECISION,
    unit              VARCHAR,   -- kWh | W | A | V | Percent
    context           VARCHAR,   -- Sample.Periodic | Transaction.Begin | Transaction.End | Trigger
    phase             VARCHAR,   -- L1 | L2 | L3 | NULL (single-phase)
    sampled_ts        TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'ocpp.meter.readings',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;

-- OCPP StartTransaction / StopTransaction (combined schema)
CREATE SOURCE ocpp_transaction_events (
    event_type        VARCHAR,   -- START | STOP
    transaction_id    VARCHAR,
    charge_point_id   VARCHAR,
    connector_id      INTEGER,
    id_tag            VARCHAR,
    meter_start_wh    BIGINT,    -- Wh at session start
    meter_stop_wh     BIGINT,    -- Wh at session end (NULL for START events)
    stop_reason       VARCHAR,
    event_ts          TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'ocpp.transaction.events',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;

Step 2: Core Materialized Views

-- Last heartbeat per charge point (for offline detection).
-- NOW() is not allowed in a streaming SELECT list in RisingWave, so this
-- view keeps only the raw timestamp; staleness is evaluated with a temporal
-- filter (WHERE heartbeat_ts < NOW() - INTERVAL '...') at alert/query time.
CREATE MATERIALIZED VIEW last_heartbeat AS
SELECT DISTINCT ON (charge_point_id)
    charge_point_id,
    heartbeat_ts
FROM ocpp_heartbeats
ORDER BY charge_point_id, heartbeat_ts DESC;

-- Charge point registry (latest firmware version)
CREATE MATERIALIZED VIEW charge_point_registry AS
SELECT DISTINCT ON (charge_point_id)
    charge_point_id,
    charge_point_model,
    charge_point_vendor,
    firmware_version,
    boot_ts AS last_boot_ts
FROM ocpp_boot_notifications
ORDER BY charge_point_id, boot_ts DESC;

-- Active sessions with cumulative energy (Energy.Active.Import.Register)
CREATE MATERIALIZED VIEW active_session_energy AS
SELECT DISTINCT ON (transaction_id)
    charge_point_id,
    connector_id,
    transaction_id,
    value           AS cumulative_energy_kwh,
    sampled_ts
FROM ocpp_meter_readings
WHERE measurand = 'Energy.Active.Import.Register'
  AND context   = 'Sample.Periodic'
ORDER BY transaction_id, sampled_ts DESC;

-- Instantaneous power per session
CREATE MATERIALIZED VIEW active_session_power AS
SELECT DISTINCT ON (transaction_id)
    charge_point_id,
    connector_id,
    transaction_id,
    value / 1000.0  AS power_kw,   -- convert W to kW
    sampled_ts
FROM ocpp_meter_readings
WHERE measurand = 'Power.Active.Import'
  AND context   = 'Sample.Periodic'
ORDER BY transaction_id, sampled_ts DESC;

-- Session open/close (join START and STOP events)
CREATE MATERIALIZED VIEW session_lifecycle AS
SELECT
    s.transaction_id,
    s.charge_point_id,
    s.connector_id,
    s.id_tag,
    s.meter_start_wh,
    s.event_ts                       AS start_ts,
    e.meter_stop_wh,
    e.event_ts                       AS stop_ts,
    (e.meter_stop_wh - s.meter_start_wh) / 1000.0 AS energy_delivered_kwh,
    e.stop_reason
FROM (SELECT * FROM ocpp_transaction_events WHERE event_type = 'START') s
LEFT JOIN (SELECT * FROM ocpp_transaction_events WHERE event_type = 'STOP')  e
    ON s.transaction_id = e.transaction_id;

-- 15-minute metering aggregates per charge point (for billing reconciliation).
-- TUMBLE takes the source directly; the measurand filter moves to WHERE.
CREATE MATERIALIZED VIEW metering_aggregates_15min AS
SELECT
    charge_point_id,
    connector_id,
    window_start,
    window_end,
    MAX(value) - MIN(value)   AS interval_energy_kwh,
    AVG(value)                AS avg_energy_register,
    COUNT(*)                  AS reading_count
FROM TUMBLE(ocpp_meter_readings, sampled_ts, INTERVAL '15 MINUTES')
WHERE measurand = 'Energy.Active.Import.Register'
GROUP BY charge_point_id, connector_id, window_start, window_end;
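
To see why MAX(value) - MIN(value) recovers interval energy from a cumulative register, here is the same tumbling-window computation sketched in plain Python over made-up readings:

```python
from collections import defaultdict

# (charge_point_id, minute offset, cumulative energy register in kWh)
readings = [
    ("CP-1", 0, 100.0), ("CP-1", 3, 100.9), ("CP-1", 6, 101.7),
    ("CP-1", 9, 102.4), ("CP-1", 12, 103.0),
    ("CP-1", 15, 103.5), ("CP-1", 18, 104.2),
]

def tumble_15min(readings):
    """Group readings into 15-minute tumbling windows and compute the
    register delta, mirroring MAX(value) - MIN(value) in the SQL view."""
    windows = defaultdict(list)
    for cp, minute, value in readings:
        windows[(cp, minute // 15 * 15)].append(value)
    return {
        key: {
            "interval_energy_kwh": round(max(vals) - min(vals), 3),
            "reading_count": len(vals),
        }
        for key, vals in windows.items()
    }

print(tumble_15min(readings)[("CP-1", 0)])  # window [0, 15): delta 3.0 kWh over 5 readings
```

One caveat of the MAX-MIN approach: the energy delivered between the last reading of one window and the first reading of the next is attributed to neither window. Strict billing reconciliation would carry the previous window's closing register forward.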

Step 3: Alerting Logic

-- Offline alert: no heartbeat for more than 5 minutes.
-- NOW() appears only in the WHERE clause as a temporal filter, which is
-- the one place RisingWave permits it in a streaming query.
CREATE MATERIALIZED VIEW offline_charge_point_alerts AS
SELECT
    lh.charge_point_id,
    lh.heartbeat_ts,
    cp.charge_point_model,
    cp.firmware_version,
    'CHARGE_POINT_OFFLINE' AS alert_type
FROM last_heartbeat lh
JOIN charge_point_registry cp ON lh.charge_point_id = cp.charge_point_id
WHERE lh.heartbeat_ts < NOW() - INTERVAL '5 minutes';

-- Metering gap: fewer than expected readings in a 15-minute window
CREATE MATERIALIZED VIEW metering_gap_alerts AS
SELECT
    charge_point_id,
    connector_id,
    window_start,
    window_end,
    reading_count,
    'METERING_GAP'    AS alert_type
FROM metering_aggregates_15min
WHERE reading_count < 3;  -- expect ~5 readings per 15 min at a 3-minute sampling interval

-- Sink all OCPP alerts to Kafka (the triggering event's timestamp serves as alert_ts)
CREATE SINK ocpp_alert_sink AS
SELECT charge_point_id, NULL::INTEGER AS connector_id,
       heartbeat_ts::VARCHAR AS detail, heartbeat_ts AS alert_ts, alert_type
FROM offline_charge_point_alerts
UNION ALL
SELECT charge_point_id, connector_id,
       reading_count::VARCHAR, window_end, alert_type
FROM metering_gap_alerts
WITH (
    connector = 'kafka',
    topic = 'ocpp.processing.alerts',
    properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;

Comparison Table

| OCPP Processing Approach | Real-Time State | Billing Reconciliation | Heartbeat Monitoring | SQL Queries |
|---|---|---|---|---|
| CPMS relational DB (batch queries) | Stale (poll interval) | Yes (batch) | Cron job | Yes |
| TimescaleDB + hypertables | Near real-time | Partial | Scheduled | Yes |
| Custom Kafka consumer | Real-time | Custom code | Custom code | No |
| RisingWave streaming SQL | Real-time | Materialized view | SQL join | Yes |

FAQ

Q: What is the difference between processing OCPP 1.6 and OCPP 2.0.1 in RisingWave?
A: OCPP 2.0.1 introduces a richer event model (TransactionEvent replaces separate Start/StopTransaction) and additional measurands. The CPMS normalizes both versions to the canonical source schema before publishing to Kafka. The RisingWave DDL and views remain the same regardless of OCPP version.
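
A sketch of that normalization for transaction events (the 2.0.1 field names follow the TransactionEventRequest structure; the mapping rules and the forwarder-attached chargePointId are assumptions of this sketch, and meter-register extraction is elided):

```python
from typing import Optional

def normalize_transaction_event(msg: dict) -> Optional[dict]:
    """Map an OCPP 2.0.1 TransactionEventRequest onto the canonical
    ocpp.transaction.events schema. 'Updated' events carry periodic meter
    data routed to the MeterValues pipeline instead, so they are skipped."""
    event_type = {"Started": "START", "Ended": "STOP"}.get(msg["eventType"])
    if event_type is None:
        return None
    return {
        "event_type": event_type,
        "transaction_id": msg["transactionInfo"]["transactionId"],
        # chargePointId is attached by the forwarder from the WebSocket
        # context; it is not part of the OCPP 2.0.1 payload itself.
        "charge_point_id": msg["chargePointId"],
        "connector_id": msg.get("evse", {}).get("connectorId"),
        "id_tag": msg.get("idToken", {}).get("idToken"),
        # Register extraction from msg["meterValue"] is elided in this sketch.
        "meter_start_wh": None,
        "meter_stop_wh": None,
        "stop_reason": msg["transactionInfo"].get("stoppedReason"),
        "event_ts": msg["timestamp"],
    }

evt = normalize_transaction_event({
    "eventType": "Started",
    "timestamp": "2024-05-01T10:00:00Z",
    "transactionInfo": {"transactionId": "tx-123"},
    "evse": {"id": 1, "connectorId": 1},
    "idToken": {"idToken": "RFID-9"},
    "chargePointId": "CP-0042",
})
print(evt["event_type"], evt["transaction_id"])  # START tx-123
```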

Q: How do I handle MeterValues with multiple sampled values in a single message?
A: The CPMS expands each sampled value into a separate row before publishing to Kafka, using the measurand, value, unit, and phase columns. This row-per-measurand model is idiomatic for SQL aggregation and eliminates the need for JSON array unnesting in RisingWave.
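
A minimal sketch of that expansion for an OCPP 1.6 MeterValues.req, applying the spec's defaults (Energy.Active.Import.Register when measurand is omitted, Sample.Periodic when context is omitted):

```python
def flatten_meter_values(charge_point_id: str, msg: dict) -> list:
    """Expand one OCPP 1.6 MeterValues.req into one row per sampled value,
    matching the ocpp_meter_readings source schema."""
    rows = []
    for mv in msg.get("meterValue", []):
        for sv in mv.get("sampledValue", []):
            rows.append({
                "charge_point_id": charge_point_id,
                "connector_id": msg["connectorId"],
                "transaction_id": str(msg.get("transactionId", "")),
                # OCPP 1.6 defaults apply when the optional fields are omitted
                "measurand": sv.get("measurand", "Energy.Active.Import.Register"),
                "value": float(sv["value"]),
                "unit": sv.get("unit"),
                "context": sv.get("context", "Sample.Periodic"),
                "phase": sv.get("phase"),
                "sampled_ts": mv["timestamp"],
            })
    return rows

rows = flatten_meter_values("CP-0042", {
    "connectorId": 1,
    "transactionId": 777,
    "meterValue": [{
        "timestamp": "2024-05-01T10:03:00Z",
        "sampledValue": [
            {"value": "102.4", "measurand": "Energy.Active.Import.Register", "unit": "kWh"},
            {"value": "7200", "measurand": "Power.Active.Import", "unit": "W"},
        ],
    }],
})
print(len(rows))  # 2
```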

Q: Can RisingWave handle the volume from 10,000 charge points sending MeterValues every 60 seconds?
A: 10,000 charge points × 1 message/min = ~167 messages/second. RisingWave handles millions of messages per second. The architecture scales by adding Kafka partitions and scaling RisingWave compute nodes horizontally. The DISTINCT ON deduplication pattern is efficient for high-cardinality device streams.
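
The arithmetic, including the row fan-out from per-measurand flattening (the factor of five measurands per message is an assumption):

```python
charge_points = 10_000
messages_per_min = 1        # one MeterValues message per charge point per minute
measurands_per_msg = 5      # Energy, Power, Current, Voltage, SoC (assumed)

msg_rate = charge_points * messages_per_min / 60   # Kafka messages per second
row_rate = msg_rate * measurands_per_msg           # rows per second after flattening

print(f"{msg_rate:.0f} msg/s, {row_rate:.0f} rows/s")  # 167 msg/s, 833 rows/s
```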

Key Takeaways

  • RisingWave can ingest and process all major OCPP message types—Heartbeat, BootNotification, MeterValues, and StartTransaction/StopTransaction—via Kafka sources.
  • Materialized views maintain always-fresh session energy, charge point inventory, and heartbeat status without polling.
  • Billing reconciliation and metering gap detection are expressed as standard SQL window aggregations.
  • The PostgreSQL-compatible interface enables direct integration with CPMS dashboards, billing systems, and BI tools.
