EV charging fraud costs network operators through stolen RFID credentials, manipulated metering, and account sharing across multiple vehicles. With RisingWave, a PostgreSQL-compatible streaming database, you can detect suspicious patterns—impossible travel, anomalous energy consumption, credential reuse—within seconds of the triggering event, not days later in a billing audit.
Why Real-Time Fraud Detection Matters for EV Charging
EV charging fraud takes several forms:
- Credential theft — stolen RFID tags or app tokens used to initiate paid charging sessions on someone else's account.
- Impossible travel — the same credential used at two geographically distant stations within a time window that makes travel impossible.
- Meter manipulation — hardware tampering that causes a charge point to report inflated kWh to an operator running usage-based billing.
- Account sharing — a single subscription account simultaneously active at multiple charge points, violating terms of service.
- Energy theft — sessions that deliver significant energy but are never charged due to payment processing failures or spoofed stop reasons.
Traditional fraud detection runs in nightly batch jobs that review completed billing records. By the time fraud is identified, dozens of sessions may have been compromised. Streaming SQL enables detection during or immediately after each session.
The Streaming SQL Approach
RisingWave ingests OCPP transaction events, meter values, and charge point location data from Kafka. It also reads a user account table via Postgres CDC. Materialized views implement fraud detection rules: impossible travel detection using geospatial distance calculations, concurrent session detection, anomalous energy per session, and payment failure correlation.
Step-by-Step Tutorial
Step 1: Data Source Setup
-- OCPP transaction events with geolocation
CREATE SOURCE transaction_events_geo (
transaction_id VARCHAR,
id_tag VARCHAR, -- RFID UID or app token
user_account_id VARCHAR,
charge_point_id VARCHAR,
location_id VARCHAR,
latitude DOUBLE PRECISION,
longitude DOUBLE PRECISION,
event_type VARCHAR, -- START | STOP
meter_wh BIGINT,
stop_reason VARCHAR,
event_ts TIMESTAMPTZ
) WITH (
connector = 'kafka',
topic = 'ocpp.transaction.events.geo',
properties.bootstrap.server = 'kafka:9092',
scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;
-- OCPP MeterValues for energy anomaly detection
CREATE SOURCE session_meter_values (
transaction_id VARCHAR,
charge_point_id VARCHAR,
connector_id INTEGER,
energy_kwh DOUBLE PRECISION,
power_kw DOUBLE PRECISION,
sampled_ts TIMESTAMPTZ
) WITH (
connector = 'kafka',
topic = 'ocpp.meter.values',
properties.bootstrap.server = 'kafka:9092',
scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;
-- Payment processing results (from billing platform)
CREATE SOURCE payment_results (
transaction_id VARCHAR,
user_account_id VARCHAR,
energy_kwh DOUBLE PRECISION,
amount_usd DOUBLE PRECISION,
payment_status VARCHAR, -- SUCCESS | FAILED | CHARGEBACK | DISPUTED
processor_code VARCHAR,
processed_ts TIMESTAMPTZ
) WITH (
connector = 'kafka',
topic = 'billing.payment.results',
properties.bootstrap.server = 'kafka:9092',
scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;
-- User account risk profile (Postgres CDC; CDC tables require a primary key)
CREATE TABLE user_account_risk (
user_account_id VARCHAR PRIMARY KEY,
id_tag VARCHAR,
risk_score INTEGER, -- 0-100, updated by risk engine
flagged BOOLEAN,
account_created DATE,
updated_at TIMESTAMPTZ
) WITH (
connector = 'postgres-cdc',
hostname = 'postgres',
port = '5432',
username = 'risingwave',
password = 'password',
database.name = 'charging_platform',
schema.name = 'public',
table.name = 'user_account_risk'
);
Step 2: Core Materialized Views
-- Recent session starts (last 2 hours, for impossible travel analysis)
CREATE MATERIALIZED VIEW recent_session_starts AS
SELECT DISTINCT ON (transaction_id) -- dedupe retransmitted START events, keep the latest
transaction_id, id_tag, user_account_id,
charge_point_id, location_id,
latitude, longitude, event_ts
FROM transaction_events_geo
WHERE event_type = 'START'
AND event_ts > NOW() - INTERVAL '2 HOURS'
ORDER BY transaction_id, event_ts DESC;
-- Detect concurrent sessions (same id_tag, multiple active transactions)
CREATE MATERIALIZED VIEW concurrent_session_flags AS
SELECT
id_tag,
user_account_id,
COUNT(DISTINCT transaction_id) AS active_session_count,
ARRAY_AGG(charge_point_id) AS charge_points,
MIN(event_ts) AS earliest_session_ts
FROM recent_session_starts
GROUP BY id_tag, user_account_id
HAVING COUNT(DISTINCT transaction_id) >= 2;
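The grouping logic above is easy to unit-test offline. A minimal Python re-implementation (a sketch of the rule, not of RisingWave's execution) counts distinct active transactions per id_tag and flags anything with two or more:

```python
from collections import defaultdict

def concurrent_session_flags(session_starts):
    """Mirror of the SQL view: group recent START events by id_tag and
    flag tags with >= 2 distinct active transactions."""
    tx_by_tag = defaultdict(set)
    cp_by_tag = defaultdict(list)
    for s in session_starts:
        tx_by_tag[s["id_tag"]].add(s["transaction_id"])
        cp_by_tag[s["id_tag"]].append(s["charge_point_id"])
    return {
        tag: {"active_session_count": len(txs),
              "charge_points": cp_by_tag[tag]}
        for tag, txs in tx_by_tag.items() if len(txs) >= 2
    }

starts = [
    {"id_tag": "RFID-A", "transaction_id": "tx-1", "charge_point_id": "CP-1"},
    {"id_tag": "RFID-A", "transaction_id": "tx-2", "charge_point_id": "CP-9"},
    {"id_tag": "RFID-B", "transaction_id": "tx-3", "charge_point_id": "CP-2"},
]
flags = concurrent_session_flags(starts)
# RFID-A is flagged (two concurrent transactions); RFID-B is not
assert "RFID-A" in flags and "RFID-B" not in flags
```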
-- Impossible travel: same id_tag, two sessions too close in time to allow travel
-- Approximation: 1 degree lat/lon ≈ 111 km; threshold: if distance > 50 km and gap < 30 min
CREATE MATERIALIZED VIEW impossible_travel_flags AS
SELECT
a.id_tag,
a.user_account_id,
a.transaction_id AS tx_a,
b.transaction_id AS tx_b,
a.charge_point_id AS cp_a,
b.charge_point_id AS cp_b,
a.event_ts AS ts_a,
b.event_ts AS ts_b,
EXTRACT(EPOCH FROM (b.event_ts - a.event_ts)) / 60.0 AS gap_minutes,
-- Equirectangular approximation of great-circle distance in km (adequate at these scales)
111.0 * SQRT(
POWER(b.latitude - a.latitude, 2) +
POWER((b.longitude - a.longitude) * COS(RADIANS((a.latitude + b.latitude) / 2.0)), 2)
) AS approx_distance_km
FROM recent_session_starts a
JOIN recent_session_starts b
ON a.id_tag = b.id_tag
AND b.event_ts > a.event_ts
AND b.event_ts < a.event_ts + INTERVAL '30 MINUTES'
AND a.charge_point_id <> b.charge_point_id
WHERE 111.0 * SQRT(
POWER(b.latitude - a.latitude, 2) +
POWER((b.longitude - a.longitude) * COS(RADIANS((a.latitude + b.latitude) / 2.0)), 2)
) > 50;
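The equirectangular shortcut in the view trades a little accuracy for plain-SQL arithmetic. A quick Python check against the full Haversine formula shows how close the two stay at fraud-relevant distances (the city coordinates below are arbitrary test points):

```python
import math

def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance on a sphere of radius 6371 km."""
    r = 6371.0
    p1, p2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlmb = math.radians(lon2 - lon1)
    a = (math.sin(dphi / 2) ** 2
         + math.cos(p1) * math.cos(p2) * math.sin(dlmb / 2) ** 2)
    return 2 * r * math.asin(math.sqrt(a))

def equirect_km(lat1, lon1, lat2, lon2):
    """The same approximation the materialized view computes in SQL."""
    mid = math.radians((lat1 + lat2) / 2.0)
    return 111.0 * math.sqrt((lat2 - lat1) ** 2 +
                             ((lon2 - lon1) * math.cos(mid)) ** 2)

# Berlin Hbf -> Leipzig Hbf, roughly 150 km apart
exact = haversine_km(52.5251, 13.3694, 51.3455, 12.3821)
approx = equirect_km(52.5251, 13.3694, 51.3455, 12.3821)
assert abs(approx - exact) / exact < 0.01  # well under 1% at this scale
```

Both distances clear the 50 km threshold by a wide margin, so the approximation error never changes the verdict for city-to-city pairs like this.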
-- Per-session energy summary from meter values
-- (input for anomaly checks, e.g. sessions exceeding 150% of an EVSE's plausible per-session energy)
CREATE MATERIALIZED VIEW session_energy_summary AS
SELECT
transaction_id,
charge_point_id,
connector_id,
MAX(energy_kwh) AS peak_energy_kwh,
MAX(power_kw) AS peak_power_kw,
MAX(sampled_ts) AS last_sampled_ts
FROM session_meter_values
GROUP BY transaction_id, charge_point_id, connector_id;
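The per-transaction aggregation can likewise be mirrored offline to tune anomaly thresholds before encoding them in SQL. In this sketch the rated-capacity figure is an assumed operator-supplied parameter, not something derived from the streams above:

```python
def session_peaks(meter_values):
    """Reduce raw MeterValues samples to per-transaction peaks,
    mirroring the per-session energy summary."""
    peaks = {}
    for mv in meter_values:
        tx = mv["transaction_id"]
        cur = peaks.setdefault(tx, {"peak_energy_kwh": 0.0, "peak_power_kw": 0.0})
        cur["peak_energy_kwh"] = max(cur["peak_energy_kwh"], mv["energy_kwh"])
        cur["peak_power_kw"] = max(cur["peak_power_kw"], mv["power_kw"])
    return peaks

def is_anomalous(peak_energy_kwh, rated_capacity_kwh, factor=1.5):
    """Flag sessions delivering more than `factor` x the EVSE's
    plausible per-session energy (assumed operator-supplied figure)."""
    return peak_energy_kwh > factor * rated_capacity_kwh

samples = [
    {"transaction_id": "tx-1", "energy_kwh": 12.0, "power_kw": 50.0},
    {"transaction_id": "tx-1", "energy_kwh": 31.5, "power_kw": 48.0},
    {"transaction_id": "tx-2", "energy_kwh": 140.0, "power_kw": 350.0},
]
peaks = session_peaks(samples)
# tx-2 far exceeds 1.5 x an assumed 60 kWh plausible session; tx-1 does not
assert is_anomalous(peaks["tx-2"]["peak_energy_kwh"], rated_capacity_kwh=60.0)
assert not is_anomalous(peaks["tx-1"]["peak_energy_kwh"], rated_capacity_kwh=60.0)
```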
-- Failed payment correlation: sessions with energy delivered but payment failed
CREATE MATERIALIZED VIEW unpaid_sessions AS
SELECT
p.transaction_id,
p.user_account_id,
p.energy_kwh,
p.amount_usd,
p.payment_status,
p.processor_code,
p.processed_ts
FROM payment_results p
WHERE p.payment_status IN ('FAILED', 'CHARGEBACK')
AND p.energy_kwh > 5.0; -- meaningful energy delivered
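The payment-failure rule is a plain filter, which makes it easy to validate against sample billing records before deployment. A Python mirror of the same predicate:

```python
def unpaid_sessions(payments, min_energy_kwh=5.0):
    """Sessions with meaningful energy delivered but a failed or
    charged-back payment, mirroring the unpaid_sessions view."""
    return [p for p in payments
            if p["payment_status"] in ("FAILED", "CHARGEBACK")
            and p["energy_kwh"] > min_energy_kwh]

payments = [
    {"transaction_id": "tx-1", "payment_status": "SUCCESS", "energy_kwh": 22.0},
    {"transaction_id": "tx-2", "payment_status": "FAILED", "energy_kwh": 18.5},
    {"transaction_id": "tx-3", "payment_status": "FAILED", "energy_kwh": 1.2},
]
flagged = unpaid_sessions(payments)
# only tx-2 qualifies: failed payment AND meaningful energy delivered
assert [p["transaction_id"] for p in flagged] == ["tx-2"]
```

The 5 kWh floor keeps trivially short sessions (aborted handshakes, test plugs) out of the alert stream.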
Step 3: Alerting Logic
-- Unified fraud alert view combining all signals
CREATE MATERIALIZED VIEW fraud_alerts AS
-- Concurrent sessions
SELECT
id_tag,
user_account_id,
'CONCURRENT_SESSIONS' AS fraud_type,
active_session_count::VARCHAR AS detail,
earliest_session_ts AS alert_ts, -- NOW() is not allowed in MV projections; use event time
'HIGH' AS severity
FROM concurrent_session_flags
UNION ALL
-- Impossible travel
SELECT
id_tag,
user_account_id,
'IMPOSSIBLE_TRAVEL' AS fraud_type,
ROUND(approx_distance_km::NUMERIC, 1)::VARCHAR || ' km in ' ||
ROUND(gap_minutes::NUMERIC, 0)::VARCHAR || ' min' AS detail,
ts_b AS alert_ts,
'HIGH' AS severity
FROM impossible_travel_flags
UNION ALL
-- Unpaid sessions
SELECT
'unknown' AS id_tag,
user_account_id,
'PAYMENT_FAILURE' AS fraud_type,
energy_kwh::VARCHAR || ' kWh, $' || amount_usd::VARCHAR AS detail,
processed_ts AS alert_ts,
'MEDIUM' AS severity
FROM unpaid_sessions;
-- Sink fraud alerts to Kafka for risk and operations teams
CREATE SINK fraud_alert_sink AS
SELECT * FROM fraud_alerts
WITH (
connector = 'kafka',
topic = 'fraud.alerts.ev.charging',
properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON (
force_append_only = 'true' -- fraud_alerts can retract rows; emit inserts only
);
-- Sink to JDBC for case management
CREATE SINK fraud_alert_case_sink AS
SELECT * FROM fraud_alerts
WITH (
connector = 'jdbc',
jdbc.url = 'jdbc:postgresql://platform-db:5432/fraud_cases',
table.name = 'ev_fraud_alerts',
type = 'append-only',
force_append_only = 'true'
);
Comparison Table
| Fraud Type | Nightly Batch | Rule-Based CPMS | RisingWave Streaming SQL |
| --- | --- | --- | --- |
| Concurrent sessions | Next day | Partial | Seconds |
| Impossible travel | Next day | None | Seconds |
| Metering anomaly | Next day | None | Per-session |
| Unpaid sessions | Next day | Payment callback | Seconds |
| False positive tuning | Manual rerun | Config change | Update SQL view |
FAQ
Q: How accurate is the Haversine approximation for impossible travel detection?
A: The simplified equirectangular approximation in the SQL above stays within roughly 1% of the true great-circle distance at the scales relevant for fraud thresholds (tens to a few hundred kilometers), with error growing at high latitudes and on long east-west legs. For deployments requiring precise geodetic distance, precompute distances with a geodesy library in the preprocessing layer before Kafka; RisingWave does not ship PostGIS functions.
Q: Can I tune fraud thresholds without redeploying the pipeline?
A: Update the threshold values (e.g., > 50 km, < 30 MINUTES) by dropping and recreating the affected materialized view; RisingWave does not support changing a view's defining query with ALTER. The recreated view backfills from its upstream sources, so how much history it recovers depends on topic retention and the source's scan.startup.mode.
Q: How do I handle RFID cloning, where a cloned card is used at the same station as the original?
A: RFID cloning at the same station appears as normal usage to the session layer. Detect it through usage pattern analysis: unusually high session frequency per id_tag in a HOP window, or session initiation at times inconsistent with a user's historical usage profile. Add a session_frequency_anomaly view using a HOP window COUNT over the id_tag.
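The suggested session_frequency_anomaly check can be prototyped in Python before writing the HOP-window SQL. The sketch below slides a 1-hour window in 15-minute hops over start timestamps and flags any id_tag exceeding a per-window count; the window sizes and threshold are illustrative, not tuned values:

```python
from datetime import datetime, timedelta

def frequency_anomalies(starts, window=timedelta(hours=1),
                        hop=timedelta(minutes=15), threshold=4):
    """Count session starts per id_tag in hopping windows; return
    (id_tag, window_start) pairs exceeding the threshold."""
    if not starts:
        return []
    times = sorted(s["event_ts"] for s in starts)
    anomalies = []
    w_start = times[0]
    while w_start <= times[-1]:
        w_end = w_start + window
        counts = {}
        for s in starts:
            if w_start <= s["event_ts"] < w_end:
                counts[s["id_tag"]] = counts.get(s["id_tag"], 0) + 1
        for tag, n in counts.items():
            if n > threshold:
                anomalies.append((tag, w_start))
        w_start += hop
    return anomalies

base = datetime(2024, 6, 1, 8, 0)
# five starts from the same tag within 40 minutes -> suspicious
starts = [{"id_tag": "RFID-X", "event_ts": base + timedelta(minutes=8 * i)}
          for i in range(5)]
starts.append({"id_tag": "RFID-Y", "event_ts": base})
hits = frequency_anomalies(starts)
assert any(tag == "RFID-X" for tag, _ in hits)
assert not any(tag == "RFID-Y" for tag, _ in hits)
```

In RisingWave the same shape becomes a HOP(..., INTERVAL '15 MINUTES', INTERVAL '1 HOUR') grouped by id_tag with a HAVING COUNT(*) threshold.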
Key Takeaways
- RisingWave detects EV charging fraud patterns—concurrent sessions, impossible travel, unpaid sessions—within seconds using SQL joins and window aggregations.
- Multi-stream joins across transaction events, meter values, and payment results replace brittle custom detection scripts.
- Fraud alerts route to Kafka and JDBC sinks, integrating naturally with case management systems and risk scoring pipelines.
- Fraud detection rules are expressed in auditable, version-controlled SQL—not hard-coded logic buried in application code.

