OCPP—the Open Charge Point Protocol—is the lingua franca of EV charging infrastructure. Every status change, meter reading, and transaction event from a charge point flows through OCPP messages. With RisingWave, a PostgreSQL-compatible streaming database, you can ingest these messages from Kafka and query them in real time using standard SQL.
Why OCPP Data Processing Matters
A charge point management system (CPMS) handling thousands of charge points generates a continuous stream of OCPP messages: StatusNotification, MeterValues, StartTransaction, StopTransaction, Heartbeat, and more. Extracting operational value from this data stream requires:
- Real-time session tracking — know which sessions are active, how much energy has been delivered, and whether power delivery matches expectations.
- Connector state management — maintain a current-state model of every connector across the fleet.
- Metering accuracy — detect metering gaps (missing MeterValues intervals), billing discrepancies, and calibration drift.
- Heartbeat monitoring — identify charge points that have stopped communicating before drivers arrive.
Traditional CPMS databases store OCPP events in relational tables and rely on periodic batch queries for analytics. RisingWave processes the same event stream continuously, keeping all derived metrics always fresh.
The Streaming SQL Approach
OCPP messages arrive at the CPMS WebSocket handler. A lightweight forwarder publishes normalized message payloads to Kafka topics organized by message type. RisingWave ingests these topics via CREATE SOURCE and maintains continuously updated materialized views for session state, connector state, metering aggregates, and heartbeat monitoring.
Step-by-Step Tutorial
Step 1: Data Source Setup
-- OCPP Heartbeat messages (charge point keep-alive)
CREATE SOURCE ocpp_heartbeats (
charge_point_id VARCHAR,
firmware_version VARCHAR,
network_region VARCHAR,
heartbeat_ts TIMESTAMPTZ
) WITH (
connector = 'kafka',
topic = 'ocpp.heartbeats',
properties.bootstrap.server = 'kafka:9092',
scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;
-- OCPP BootNotification (charge point connects / reboots)
CREATE SOURCE ocpp_boot_notifications (
charge_point_id VARCHAR,
charge_point_model VARCHAR,
charge_point_vendor VARCHAR,
firmware_version VARCHAR,
iccid VARCHAR,
imsi VARCHAR,
meter_serial_number VARCHAR,
boot_ts TIMESTAMPTZ
) WITH (
connector = 'kafka',
topic = 'ocpp.boot.notifications',
properties.bootstrap.server = 'kafka:9092',
scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;
-- OCPP MeterValues with sampled value array (flattened to rows)
CREATE SOURCE ocpp_meter_readings (
charge_point_id VARCHAR,
connector_id INTEGER,
transaction_id VARCHAR,
measurand VARCHAR, -- Energy.Active.Import.Register | Power.Active.Import | Current.Import | Voltage | SoC
value DOUBLE PRECISION,
unit VARCHAR, -- kWh | W | A | V | Percent
context VARCHAR, -- Sample.Periodic | Transaction.Begin | Transaction.End | Trigger
phase VARCHAR, -- L1 | L2 | L3 | NULL (single-phase)
sampled_ts TIMESTAMPTZ
) WITH (
connector = 'kafka',
topic = 'ocpp.meter.readings',
properties.bootstrap.server = 'kafka:9092',
scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;
-- OCPP StartTransaction / StopTransaction (combined schema)
CREATE SOURCE ocpp_transaction_events (
event_type VARCHAR, -- START | STOP
transaction_id VARCHAR,
charge_point_id VARCHAR,
connector_id INTEGER,
id_tag VARCHAR,
meter_start_wh BIGINT, -- Wh at session start
meter_stop_wh BIGINT, -- Wh at session end (NULL for START events)
stop_reason VARCHAR,
event_ts TIMESTAMPTZ
) WITH (
connector = 'kafka',
topic = 'ocpp.transaction.events',
properties.bootstrap.server = 'kafka:9092',
scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;
Step 2: Core Materialized View
-- Last heartbeat per charge point (for offline detection)
CREATE MATERIALIZED VIEW last_heartbeat AS
SELECT DISTINCT ON (charge_point_id)
charge_point_id,
heartbeat_ts,
EXTRACT(EPOCH FROM (NOW() - heartbeat_ts)) / 60.0 AS minutes_since_heartbeat
FROM ocpp_heartbeats
ORDER BY charge_point_id, heartbeat_ts DESC;
-- Charge point registry (latest firmware version)
CREATE MATERIALIZED VIEW charge_point_registry AS
SELECT DISTINCT ON (charge_point_id)
charge_point_id,
charge_point_model,
charge_point_vendor,
firmware_version,
boot_ts AS last_boot_ts
FROM ocpp_boot_notifications
ORDER BY charge_point_id, boot_ts DESC;
-- Active sessions with cumulative energy (Energy.Active.Import.Register)
CREATE MATERIALIZED VIEW active_session_energy AS
SELECT DISTINCT ON (transaction_id)
charge_point_id,
connector_id,
transaction_id,
value AS cumulative_energy_kwh,
sampled_ts
FROM ocpp_meter_readings
WHERE measurand = 'Energy.Active.Import.Register'
AND context = 'Sample.Periodic'
ORDER BY transaction_id, sampled_ts DESC;
-- Instantaneous power per session
CREATE MATERIALIZED VIEW active_session_power AS
SELECT DISTINCT ON (transaction_id)
charge_point_id,
connector_id,
transaction_id,
value / 1000.0 AS power_kw, -- convert W to kW
sampled_ts
FROM ocpp_meter_readings
WHERE measurand = 'Power.Active.Import'
AND context = 'Sample.Periodic'
ORDER BY transaction_id, sampled_ts DESC;
-- Session open/close (join START and STOP events)
CREATE MATERIALIZED VIEW session_lifecycle AS
SELECT
s.transaction_id,
s.charge_point_id,
s.connector_id,
s.id_tag,
s.meter_start_wh,
s.event_ts AS start_ts,
e.meter_stop_wh,
e.event_ts AS stop_ts,
(e.meter_stop_wh - s.meter_start_wh) / 1000.0 AS energy_delivered_kwh,
e.stop_reason
FROM (SELECT * FROM ocpp_transaction_events WHERE event_type = 'START') s
LEFT JOIN (SELECT * FROM ocpp_transaction_events WHERE event_type = 'STOP') e
ON s.transaction_id = e.transaction_id;
-- 15-minute metering aggregates per charge point (for billing reconciliation)
CREATE MATERIALIZED VIEW metering_aggregates_15min AS
SELECT
charge_point_id,
connector_id,
window_start,
window_end,
MAX(value) - MIN(value) AS interval_energy_kwh,
AVG(value) AS avg_energy_register,
COUNT(*) AS reading_count
FROM TUMBLE(
(SELECT * FROM ocpp_meter_readings
WHERE measurand = 'Energy.Active.Import.Register'),
sampled_ts,
INTERVAL '15 MINUTES'
)
GROUP BY charge_point_id, connector_id, window_start, window_end;
Step 3: Alerting Logic
-- Offline alert: no heartbeat for more than 5 minutes
CREATE MATERIALIZED VIEW offline_charge_point_alerts AS
SELECT
lh.charge_point_id,
lh.heartbeat_ts,
lh.minutes_since_heartbeat,
cp.charge_point_model,
cp.firmware_version,
NOW() AS alert_ts,
'CHARGE_POINT_OFFLINE' AS alert_type
FROM last_heartbeat lh
JOIN charge_point_registry cp ON lh.charge_point_id = cp.charge_point_id
WHERE lh.minutes_since_heartbeat > 5;
-- Metering gap: fewer than expected readings in a 15-minute window
CREATE MATERIALIZED VIEW metering_gap_alerts AS
SELECT
charge_point_id,
connector_id,
window_start,
window_end,
reading_count,
NOW() AS alert_ts,
'METERING_GAP' AS alert_type
FROM metering_aggregates_15min
WHERE reading_count < 3; -- expect ~5 readings per 15 min at 3-min interval
-- Sink all OCPP alerts to Kafka
CREATE SINK ocpp_alert_sink AS
SELECT charge_point_id, NULL::INTEGER AS connector_id,
minutes_since_heartbeat::VARCHAR AS detail, alert_ts, alert_type
FROM offline_charge_point_alerts
UNION ALL
SELECT charge_point_id, connector_id,
reading_count::VARCHAR, alert_ts, alert_type
FROM metering_gap_alerts
WITH (
connector = 'kafka',
topic = 'ocpp.processing.alerts',
properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;
Comparison Table
| OCPP Processing Approach | Real-Time State | Billing Reconciliation | Heartbeat Monitoring | SQL Queries |
| CPMS relational DB (batch queries) | Stale (poll interval) | Yes (batch) | Cron job | Yes |
| TimescaleDB + hypertables | Near real-time | Partial | Scheduled | Yes |
| Custom Kafka consumer | Real-time | Custom code | Custom code | No |
| RisingWave streaming SQL | Real-time | Materialized view | SQL join | Yes |
FAQ
Q: What is the difference between processing OCPP 1.6 and OCPP 2.0.1 in RisingWave?
A: OCPP 2.0.1 introduces a richer event model (TransactionEvent replaces separate Start/StopTransaction) and additional measurands. The CPMS normalizes both versions to the canonical source schema before publishing to Kafka. The RisingWave DDL and views remain the same regardless of OCPP version.
Q: How do I handle MeterValues with multiple sampled values in a single message?
A: The CPMS expands each sampled value into a separate row before publishing to Kafka, using the measurand, value, unit, and phase columns. This row-per-measurand model is idiomatic for SQL aggregation and eliminates the need for JSON array unnesting in RisingWave.
Q: Can RisingWave handle the volume from 10,000 charge points sending MeterValues every 60 seconds?
A: 10,000 charge points × 1 message/min = ~167 messages/second. RisingWave handles millions of messages per second. The architecture scales by adding Kafka partitions and scaling RisingWave compute nodes horizontally. The DISTINCT ON deduplication pattern is efficient for high-cardinality device streams.
Key Takeaways
- RisingWave can ingest and process all major OCPP message types—Heartbeat, BootNotification, MeterValues, and TransactionEvents—via Kafka sources.
- Materialized views maintain always-fresh session energy, connector state, and heartbeat status without polling.
- Billing reconciliation and metering gap detection are expressed as standard SQL window aggregations.
- The PostgreSQL-compatible interface enables direct integration with CPMS dashboards, billing systems, and BI tools.

