Running a reliable EV charging network means knowing the state of every connector—available, charging, faulted, or offline—at all times. With RisingWave, a PostgreSQL-compatible streaming database, you can ingest OCPP status notifications and metering values in real time and maintain a live operational dashboard without polling or batch jobs.
Why Real-Time Charging Station Monitoring Matters
EV drivers expect charging stations to be available when they arrive. A faulted connector that goes undetected for 30 minutes leads to abandoned charging attempts, one-star reviews, and lost revenue. For network operators managing hundreds or thousands of stations across multiple locations, the operational challenge compounds:
- Connector availability must be tracked per EVSE and per connector type (CCS, CHAdeMO, Type 2).
- Session health requires monitoring energy delivery (kWh) against expected charge rates to catch soft faults—stations that appear online but deliver no energy.
- Fault rate trends must be tracked per station model and firmware version to prioritize maintenance.
Streaming SQL in RisingWave continuously processes OCPP messages, aggregates metrics per station and region, and fires alerts when stations deviate from expected behavior—all with sub-second latency.
The Streaming SQL Approach
OCPP messages from charge point management systems (CPMS) are forwarded to Kafka. RisingWave ingests StatusNotification, MeterValues, and StartTransaction/StopTransaction messages via Kafka sources and maintains materialized views that always reflect the current state of the fleet.
Step-by-Step Tutorial
Step 1: Data Source Setup
-- OCPP StatusNotification messages (connector state changes)
CREATE SOURCE ocpp_status_notifications (
charge_point_id VARCHAR,
connector_id INTEGER,
location_id VARCHAR,
network_region VARCHAR,
status VARCHAR, -- Available | Preparing | Charging | SuspendedEV | SuspendedEVSE | Finishing | Reserved | Unavailable | Faulted
error_code VARCHAR, -- NoError | ConnectorLockFailure | EVCommunicationError | GroundFailure | HighTemperature | InternalError | ...
vendor_error_code VARCHAR,
info VARCHAR,
event_ts TIMESTAMPTZ
) WITH (
connector = 'kafka',
topic = 'ocpp.status.notifications',
properties.bootstrap.server = 'kafka:9092',
scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;
-- OCPP MeterValues (energy and power delivered during sessions)
CREATE SOURCE ocpp_meter_values (
charge_point_id VARCHAR,
connector_id INTEGER,
transaction_id VARCHAR,
energy_kwh DOUBLE PRECISION, -- cumulative kWh since session start
power_kw DOUBLE PRECISION, -- instantaneous power
voltage_v DOUBLE PRECISION,
current_a DOUBLE PRECISION,
soc_pct DOUBLE PRECISION, -- vehicle SoC if reported
sampled_ts TIMESTAMPTZ
) WITH (
connector = 'kafka',
topic = 'ocpp.meter.values',
properties.bootstrap.server = 'kafka:9092',
scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;
-- OCPP transaction events (session start/stop)
CREATE SOURCE ocpp_transactions (
transaction_id VARCHAR,
charge_point_id VARCHAR,
connector_id INTEGER,
id_tag VARCHAR, -- driver RFID / app token
start_ts TIMESTAMPTZ,
stop_ts TIMESTAMPTZ,
start_kwh DOUBLE PRECISION,
stop_kwh DOUBLE PRECISION,
stop_reason VARCHAR -- Local | Remote | EVDisconnected | HardReset | PowerLoss | ...
) WITH (
connector = 'kafka',
topic = 'ocpp.transactions',
properties.bootstrap.server = 'kafka:9092',
scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;
Step 2: Core Materialized View
-- Current connector status (latest StatusNotification per connector)
CREATE MATERIALIZED VIEW connector_current_status AS
SELECT DISTINCT ON (charge_point_id, connector_id)
charge_point_id,
connector_id,
location_id,
network_region,
status,
error_code,
vendor_error_code,
event_ts AS last_status_ts
FROM ocpp_status_notifications
ORDER BY charge_point_id, connector_id, event_ts DESC;
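-- Network availability summary per region (5-minute tumbling window)
-- Note: each row aggregates the StatusNotification events received in the window,
-- so the counts below are events per status, not distinct connectors.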
CREATE MATERIALIZED VIEW network_availability AS
SELECT
network_region,
window_start,
window_end,
COUNT(*) AS total_connectors,
COUNT(*) FILTER (WHERE status = 'Available') AS available_count,
COUNT(*) FILTER (WHERE status = 'Charging') AS charging_count,
COUNT(*) FILTER (WHERE status = 'Faulted') AS faulted_count,
COUNT(*) FILTER (WHERE status = 'Unavailable') AS unavailable_count,
ROUND(100.0 * COUNT(*) FILTER (WHERE status IN ('Available','Charging','Finishing','Preparing'))
/ NULLIF(COUNT(*), 0), 2) AS availability_pct
FROM TUMBLE(ocpp_status_notifications, event_ts, INTERVAL '5 MINUTES')
GROUP BY network_region, window_start, window_end;
-- Active session energy delivery tracking (latest MeterValues sample per transaction)
CREATE MATERIALIZED VIEW active_session_metrics AS
SELECT DISTINCT ON (transaction_id)
mv.transaction_id,
mv.charge_point_id,
mv.connector_id,
mv.energy_kwh,
mv.power_kw,
mv.soc_pct,
mv.sampled_ts
FROM ocpp_meter_values mv
WHERE mv.transaction_id IS NOT NULL
ORDER BY transaction_id, sampled_ts DESC;
-- Session summary (completed sessions)
CREATE MATERIALIZED VIEW session_summary AS
SELECT
t.transaction_id,
t.charge_point_id,
t.connector_id,
t.id_tag,
t.start_ts,
t.stop_ts,
EXTRACT(EPOCH FROM (t.stop_ts - t.start_ts)) / 60.0 AS duration_min,
(t.stop_kwh - t.start_kwh) AS energy_delivered_kwh,
t.stop_reason
FROM ocpp_transactions t
WHERE t.stop_ts IS NOT NULL;
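Because network_availability aggregates StatusNotification events per window, a snapshot of how many connectors are in each state right now can be useful alongside it. The following is a minimal sketch, assuming the connector_current_status view defined above. The view name network_current_availability is hypothetical and not part of the original tutorial.

```sql
-- Sketch: current connector states per region, one row per region.
-- Built on connector_current_status, so each connector is counted once.
-- The view name network_current_availability is an illustrative assumption.
CREATE MATERIALIZED VIEW network_current_availability AS
SELECT
    network_region,
    COUNT(*) AS total_connectors,
    COUNT(*) FILTER (WHERE status = 'Available') AS available_count,
    COUNT(*) FILTER (WHERE status = 'Charging') AS charging_count,
    COUNT(*) FILTER (WHERE status = 'Faulted') AS faulted_count,
    COUNT(*) FILTER (WHERE status = 'Unavailable') AS unavailable_count,
    -- Same availability definition as network_availability
    ROUND(100.0 * COUNT(*) FILTER (WHERE status IN ('Available', 'Charging', 'Finishing', 'Preparing'))
        / NULLIF(COUNT(*), 0), 2) AS availability_pct
FROM connector_current_status
GROUP BY network_region;
```

Connectors that have not sent a StatusNotification since the source started reading (scan.startup.mode = 'latest') will not appear in this snapshot until they report.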
Step 3: Alerting Logic
-- Fault alert: connector enters Faulted state
CREATE MATERIALIZED VIEW fault_alerts AS
SELECT
charge_point_id,
connector_id,
location_id,
network_region,
error_code,
vendor_error_code,
last_status_ts,
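last_status_ts AS alert_ts, -- event time of the fault; RisingWave limits NOW() in streaming queries to WHERE, HAVING, and ON clauses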
'CONNECTOR_FAULT' AS alert_type
FROM connector_current_status
WHERE status = 'Faulted';
-- Soft fault: session active but power delivery near zero
CREATE MATERIALIZED VIEW low_power_session_alerts AS
SELECT
asm.transaction_id,
asm.charge_point_id,
asm.connector_id,
asm.power_kw,
asm.energy_kwh,
asm.sampled_ts,
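asm.sampled_ts AS alert_ts, -- time of the low-power sample; RisingWave limits NOW() in streaming queries to WHERE, HAVING, and ON clauses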
'LOW_POWER_DELIVERY' AS alert_type
FROM active_session_metrics asm
JOIN connector_current_status cs
ON asm.charge_point_id = cs.charge_point_id
AND asm.connector_id = cs.connector_id
WHERE cs.status = 'Charging'
AND asm.power_kw < 1.0; -- below 1 kW while in Charging state
-- Sink all alerts to Kafka for NOC and mobile notification systems
CREATE SINK charging_alert_sink AS
SELECT charge_point_id, connector_id, location_id, error_code AS detail,
alert_ts, alert_type
FROM fault_alerts
UNION ALL
SELECT charge_point_id, connector_id, NULL, CAST(power_kw AS VARCHAR),
alert_ts, alert_type
FROM low_power_session_alerts
WITH (
connector = 'kafka',
topic = 'charging.network.alerts',
properties.bootstrap.server = 'kafka:9092'
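-- The alert views emit updates and deletes, so the sink is forced to append-only output
) FORMAT PLAIN ENCODE JSON (force_append_only = 'true');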
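The alerts above react to individual connectors. For the fault rate trends mentioned earlier, a windowed count can sit next to them. This is a sketch only: the view name fault_events_hourly is hypothetical, and because the sources in this tutorial carry no station model or firmware fields, it groups by charge point and error code instead.

```sql
-- Sketch: number of Faulted notifications per charge point and error code, per hour.
-- The view name fault_events_hourly is an illustrative assumption.
CREATE MATERIALIZED VIEW fault_events_hourly AS
SELECT
    charge_point_id,
    error_code,
    window_start,
    window_end,
    COUNT(*) AS fault_events
FROM TUMBLE(ocpp_status_notifications, event_ts, INTERVAL '1 HOUR')
WHERE status = 'Faulted'
GROUP BY charge_point_id, error_code, window_start, window_end;
```

Joining this view to an inventory table keyed by charge_point_id would allow grouping by station model or firmware version. No such table is defined in this tutorial.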
Comparison Table
| Approach | Status Latency | Soft Fault Detection | Fleet Scalability | SQL Queries |
| --- | --- | --- | --- | --- |
| CPMS built-in dashboard | Minutes (polling) | None | Vendor-limited | No |
| Batch log analysis | Hours | Post-session | High | Yes |
| Custom stream consumer | Seconds | Custom logic | High (ops overhead) | No |
| RisingWave streaming SQL | Seconds | SQL join | High | Yes |
FAQ
Q: Which OCPP version does this architecture support—OCPP 1.6 or OCPP 2.0.1?
A: The Kafka topic schema is independent of the OCPP version. Your CPMS translates OCPP messages into the canonical JSON schema defined in the CREATE SOURCE statements. Both OCPP 1.6 (SOAP or JSON over WebSocket) and OCPP 2.0.1 (JSON over WebSocket) work, as long as the CPMS normalizes messages before publishing them to Kafka.
Q: How do I track connector availability over time for SLA reporting?
A: The network_availability materialized view computes 5-minute availability percentages per region. Sink this view to a JDBC database or Iceberg table. Query it with a standard SQL AVG over the desired reporting period to compute monthly or quarterly SLA metrics.
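As a rough illustration of that reporting query, the sketch below assumes the view has been sunk to a downstream PostgreSQL-compatible table that keeps the name network_availability and the same columns. The table name and the date range are hypothetical.

```sql
-- Sketch: monthly average availability per region for one quarter.
-- Assumes a downstream table named network_availability with the view's columns.
SELECT
    network_region,
    date_trunc('month', window_start) AS report_month,
    ROUND(AVG(availability_pct), 2) AS avg_availability_pct,
    COUNT(*) AS windows_observed
FROM network_availability
WHERE window_start >= TIMESTAMPTZ '2025-01-01 00:00:00+00'
  AND window_start <  TIMESTAMPTZ '2025-04-01 00:00:00+00'
GROUP BY network_region, date_trunc('month', window_start)
ORDER BY network_region, report_month;
```

A plain AVG weights every 5-minute window equally, and windows in which a region sent no status notifications produce no row at all. The windows_observed column makes such gaps visible so they can be accounted for in the SLA calculation.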
Q: Can I query the current status of a specific charge point from my application?
A: Yes. RisingWave exposes a PostgreSQL-compatible wire protocol. Connect your application with any Postgres driver and run SELECT * FROM connector_current_status WHERE charge_point_id = 'CP_001' to get the current status in milliseconds.
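A couple of ad hoc queries an application might issue over that connection, shown as a sketch. The charge point ID and the region value 'eu-west' are placeholder values, not identifiers from a real deployment.

```sql
-- Current status of every connector on one charge point
SELECT connector_id, status, error_code, last_status_ts
FROM connector_current_status
WHERE charge_point_id = 'CP_001'
ORDER BY connector_id;

-- Number of connectors in each status within one region
SELECT status, COUNT(*) AS connectors
FROM connector_current_status
WHERE network_region = 'eu-west'
GROUP BY status
ORDER BY connectors DESC;
```

In application code, pass the charge point ID and region as bound parameters through the Postgres driver rather than concatenating them into the SQL string.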
Key Takeaways
- RisingWave processes OCPP StatusNotification, MeterValues, and transaction events in real time to maintain a live view of every connector in the fleet.
- Hard faults (Faulted status) are detected with a SQL filter and soft faults (low power during an active session) with a SQL join, so no custom detection code is required.
- Availability metrics, session summaries, and fault alerts are available via the PostgreSQL wire protocol for dashboards and via Kafka sinks for notification systems.
- The architecture scales with fleet size by partitioning Kafka topics and adding RisingWave compute nodes.

