How to Detect EV Charging Fraud in Real Time

EV charging fraud costs network operators revenue through stolen RFID credentials, manipulated metering, and account sharing across multiple vehicles. With RisingWave, a PostgreSQL-compatible streaming database, you can detect suspicious patterns (impossible travel, anomalous energy consumption, credential reuse) within seconds of the triggering event rather than days later in a billing audit.

Why Real-Time Fraud Detection Matters for EV Charging

EV charging fraud takes several forms:

  • Credential theft — stolen RFID tags or app tokens used to initiate paid charging sessions on someone else's account.
  • Impossible travel — the same credential used at two geographically distant stations within a time window that makes travel impossible.
  • Meter manipulation — hardware tampering that causes a charge point to report inflated kWh to an operator running usage-based billing.
  • Account sharing — a single subscription account simultaneously active at multiple charge points, violating terms of service.
  • Energy theft — sessions that deliver significant energy but are never charged due to payment processing failures or spoofed stop reasons.

Traditional fraud detection runs in nightly batch jobs that review completed billing records. By the time fraud is identified, dozens of sessions may have been compromised. Streaming SQL enables detection during or immediately after each session.

The Streaming SQL Approach

RisingWave ingests OCPP transaction events, meter values, and charge point location data from Kafka. It also reads a user account table via Postgres CDC. Materialized views implement fraud detection rules: impossible travel detection using geospatial distance calculations, concurrent session detection, anomalous energy per session, and payment failure correlation.

Step-by-Step Tutorial

Step 1: Data Source Setup

-- OCPP transaction events with geolocation
CREATE SOURCE transaction_events_geo (
    transaction_id    VARCHAR,
    id_tag            VARCHAR,   -- RFID UID or app token
    user_account_id   VARCHAR,
    charge_point_id   VARCHAR,
    location_id       VARCHAR,
    latitude          DOUBLE PRECISION,
    longitude         DOUBLE PRECISION,
    event_type        VARCHAR,   -- START | STOP
    meter_wh          BIGINT,
    stop_reason       VARCHAR,
    event_ts          TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'ocpp.transaction.events.geo',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;

-- OCPP MeterValues for energy anomaly detection
CREATE SOURCE session_meter_values (
    transaction_id    VARCHAR,
    charge_point_id   VARCHAR,
    connector_id      INTEGER,
    energy_kwh        DOUBLE PRECISION,
    power_kw          DOUBLE PRECISION,
    sampled_ts        TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'ocpp.meter.values',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;

-- Payment processing results (from billing platform)
CREATE SOURCE payment_results (
    transaction_id    VARCHAR,
    user_account_id   VARCHAR,
    energy_kwh        DOUBLE PRECISION,
    amount_usd        DOUBLE PRECISION,
    payment_status    VARCHAR,   -- SUCCESS | FAILED | CHARGEBACK | DISPUTED
    processor_code    VARCHAR,
    processed_ts      TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'billing.payment.results',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;

-- User account risk profile (Postgres CDC)
-- CDC data is ingested as a table with a primary key (user_account_id is assumed here).
-- Syntax varies by RisingWave version: newer releases use a shared CDC source
-- plus CREATE TABLE ... FROM <source> TABLE '<schema.table>'.
CREATE TABLE user_account_risk (
    user_account_id   VARCHAR PRIMARY KEY,
    id_tag            VARCHAR,
    risk_score        INTEGER,   -- 0-100, updated by risk engine
    flagged           BOOLEAN,
    account_created   DATE,
    updated_at        TIMESTAMPTZ
) WITH (
    connector = 'postgres-cdc',
    hostname = 'postgres',
    port = '5432',
    username = 'risingwave',
    password = 'password',
    database.name = 'charging_platform',
    schema.name = 'public',
    table.name = 'user_account_risk'
);

Step 2: Core Materialized View

-- Recent session starts (last 2 hours, for impossible travel analysis)
CREATE MATERIALIZED VIEW recent_session_starts AS
SELECT DISTINCT ON (id_tag, event_ts)
    transaction_id, id_tag, user_account_id,
    charge_point_id, location_id,
    latitude, longitude, event_ts
FROM transaction_events_geo
WHERE event_type = 'START'
  AND event_ts > NOW() - INTERVAL '2 HOURS'
ORDER BY id_tag, event_ts DESC;

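-- Detect concurrent sessions (same id_tag, two or more sessions started within the last 2 hours)
-- Note: STOP events are not consulted here, so a session that has already ended still counts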
CREATE MATERIALIZED VIEW concurrent_session_flags AS
SELECT
    id_tag,
    user_account_id,
    COUNT(DISTINCT transaction_id)    AS active_session_count,
    ARRAY_AGG(charge_point_id)        AS charge_points,
    MIN(event_ts)                     AS earliest_session_ts
FROM recent_session_starts
GROUP BY id_tag, user_account_id
HAVING COUNT(DISTINCT transaction_id) >= 2;

-- Impossible travel: same id_tag starts sessions at two charge points too far apart for the time gap
-- Approximation: 1 degree of latitude ≈ 111 km, longitude scaled by cos(latitude); flag if distance > 50 km and gap < 30 min
CREATE MATERIALIZED VIEW impossible_travel_flags AS
SELECT
    a.id_tag,
    a.user_account_id,
    a.transaction_id            AS tx_a,
    b.transaction_id            AS tx_b,
    a.charge_point_id           AS cp_a,
    b.charge_point_id           AS cp_b,
    a.event_ts                  AS ts_a,
    b.event_ts                  AS ts_b,
    EXTRACT(EPOCH FROM (b.event_ts - a.event_ts)) / 60.0 AS gap_minutes,
    -- Approximate distance in km (equirectangular approximation, not full Haversine; adequate for short distances)
    111.0 * SQRT(
        POWER(b.latitude  - a.latitude,  2) +
        POWER((b.longitude - a.longitude) * COS(RADIANS((a.latitude + b.latitude) / 2.0)), 2)
    )                           AS approx_distance_km
FROM recent_session_starts a
JOIN recent_session_starts b
    ON  a.id_tag = b.id_tag
    AND b.event_ts > a.event_ts
    AND b.event_ts < a.event_ts + INTERVAL '30 MINUTES'
    AND a.charge_point_id <> b.charge_point_id
WHERE 111.0 * SQRT(
    POWER(b.latitude  - a.latitude,  2) +
    POWER((b.longitude - a.longitude) * COS(RADIANS((a.latitude + b.latitude) / 2.0)), 2)
) > 50;

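-- Per-session energy summary: peak cumulative kWh and peak power per transaction
-- Input for anomalous-energy rules (e.g., >150% of EVSE rated capacity, or a 30-day per-account baseline);
-- the comparison itself is not implemented in this view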
CREATE MATERIALIZED VIEW session_energy_summary AS
SELECT DISTINCT ON (transaction_id)
    transaction_id,
    charge_point_id,
    connector_id,
    MAX(energy_kwh) OVER (PARTITION BY transaction_id) AS peak_energy_kwh,
    MAX(power_kw)   OVER (PARTITION BY transaction_id) AS peak_power_kw,
    sampled_ts
FROM session_meter_values
ORDER BY transaction_id, sampled_ts DESC;

-- Failed payment correlation: sessions with energy delivered but payment failed
CREATE MATERIALIZED VIEW unpaid_sessions AS
SELECT
    p.transaction_id,
    p.user_account_id,
    p.energy_kwh,
    p.amount_usd,
    p.payment_status,
    p.processor_code,
    p.processed_ts
FROM payment_results p
WHERE p.payment_status IN ('FAILED', 'CHARGEBACK')
  AND p.energy_kwh > 5.0;  -- meaningful energy delivered
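
The session energy summary above provides the inputs for an anomaly rule but does not apply one. A minimal sketch of such a rule follows, using the 150% figure from the comment. It assumes a hypothetical charge_point_specs reference table holding each charge point's rated power; that table and the energy_anomaly_flags view are illustrations, not part of the original pipeline.

```sql
-- Hypothetical reference table (assumption): rated power per charge point.
CREATE TABLE charge_point_specs (
    charge_point_id   VARCHAR PRIMARY KEY,
    rated_power_kw    DOUBLE PRECISION
);

-- Flag sessions whose peak reported power exceeds 150% of the rated power.
CREATE MATERIALIZED VIEW energy_anomaly_flags AS
SELECT
    s.transaction_id,
    s.charge_point_id,
    s.peak_power_kw,
    c.rated_power_kw
FROM session_energy_summary s
JOIN charge_point_specs c
    ON s.charge_point_id = c.charge_point_id
WHERE s.peak_power_kw > 1.5 * c.rated_power_kw;
```

A 30-day per-account baseline would need a longer-lived aggregate over completed sessions and is left out of this sketch.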

Step 3: Alerting Logic

-- Unified fraud alert view combining all signals
-- alert_ts is taken from event timestamps: in streaming queries RisingWave restricts
-- NOW() to temporal filters (WHERE/HAVING/ON), so it cannot appear in the SELECT list
CREATE MATERIALIZED VIEW fraud_alerts AS
-- Concurrent sessions
SELECT
    id_tag,
    user_account_id,
    'CONCURRENT_SESSIONS'           AS fraud_type,
    active_session_count::VARCHAR   AS detail,
    earliest_session_ts             AS alert_ts,
    'HIGH'                          AS severity
FROM concurrent_session_flags

UNION ALL

-- Impossible travel
SELECT
    id_tag,
    user_account_id,
    'IMPOSSIBLE_TRAVEL'             AS fraud_type,
    ROUND(approx_distance_km::NUMERIC, 1)::VARCHAR || ' km in ' ||
        ROUND(gap_minutes::NUMERIC, 0)::VARCHAR || ' min' AS detail,
    ts_b                            AS alert_ts,
    'HIGH'                          AS severity
FROM impossible_travel_flags

UNION ALL

-- Unpaid sessions (payment_results carries no id_tag)
SELECT
    'unknown'                       AS id_tag,
    user_account_id,
    'PAYMENT_FAILURE'               AS fraud_type,
    energy_kwh::VARCHAR || ' kWh, $' || amount_usd::VARCHAR AS detail,
    processed_ts                    AS alert_ts,
    'MEDIUM'                        AS severity
FROM unpaid_sessions;

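-- Sink fraud alerts to Kafka for risk and operations teams
-- fraud_alerts is not append-only (rows are retracted as sessions age out of the
-- 2-hour window), so the PLAIN-format sink is forced into append-only mode
CREATE SINK fraud_alert_sink AS
SELECT * FROM fraud_alerts
WITH (
    connector = 'kafka',
    topic = 'fraud.alerts.ev.charging',
    properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON (force_append_only = 'true');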

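-- Sink to JDBC for case management
-- The JDBC sink needs an explicit type; append-only keeps every alert as its own row
CREATE SINK fraud_alert_case_sink AS
SELECT * FROM fraud_alerts
WITH (
    connector = 'jdbc',
    jdbc.url = 'jdbc:postgresql://platform-db:5432/fraud_cases',
    table.name = 'ev_fraud_alerts',
    type = 'append-only',
    force_append_only = 'true'
);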
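
The JDBC sink writes into an existing table, so ev_fraud_alerts has to be created in the fraud_cases database before the sink is defined. A minimal sketch of a matching PostgreSQL table follows, assuming the column names and types produced by the fraud_alerts view; the index is an optional illustration and reflects an assumed lookup pattern.

```sql
-- Run in the target PostgreSQL database (fraud_cases), not in RisingWave.
CREATE TABLE ev_fraud_alerts (
    id_tag            VARCHAR,
    user_account_id   VARCHAR,
    fraud_type        VARCHAR,
    detail            VARCHAR,
    alert_ts          TIMESTAMPTZ,
    severity          VARCHAR
);

-- Optional (assumption): speeds up case lookups by account and time.
CREATE INDEX ev_fraud_alerts_account_idx
    ON ev_fraud_alerts (user_account_id, alert_ts);
```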

Comparison Table

Fraud Type            | Nightly Batch | Rule-Based CPMS  | RisingWave Streaming SQL
Concurrent sessions   | Next day      | Partial          | Seconds
Impossible travel     | Next day      | None             | Seconds
Metering anomaly      | Next day      | None             | Per-session
Unpaid sessions       | Next day      | Payment callback | Seconds
False positive tuning | Manual rerun  | Config change    | Update SQL view

FAQ

Q: How accurate is the distance approximation used for impossible travel detection?
A: The SQL above uses an equirectangular (flat-plane) approximation rather than the full Haversine formula. It is accurate to within roughly 1% for distances under 500 km away from the poles, which is sufficient for a 50 km fraud detection threshold. For production deployments requiring precise geodetic distance, compute the full Haversine formula (for example in a user-defined function) or precompute distances in the preprocessing layer before Kafka.
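
To check the approximation against your own station coordinates, the two formulas can be compared side by side in an ad-hoc query. The sketch below uses illustrative coordinates (roughly San Francisco and Los Angeles) and a mean Earth radius of 6371 km. It assumes SIN and ASIN are available in your engine, as they are in PostgreSQL.

```sql
-- Ad-hoc check: approximation used in impossible_travel_flags vs. full Haversine.
WITH p AS (
    SELECT
        37.7749::DOUBLE PRECISION   AS lat_a,
        -122.4194::DOUBLE PRECISION AS lon_a,
        34.0522::DOUBLE PRECISION   AS lat_b,
        -118.2437::DOUBLE PRECISION AS lon_b
)
SELECT
    -- Same expression as in the materialized view
    111.0 * SQRT(
        POWER(lat_b - lat_a, 2) +
        POWER((lon_b - lon_a) * COS(RADIANS((lat_a + lat_b) / 2.0)), 2)
    ) AS approx_distance_km,
    -- Haversine: 2R * asin(sqrt(sin^2(dlat/2) + cos(lat_a) cos(lat_b) sin^2(dlon/2)))
    2 * 6371.0 * ASIN(SQRT(
        POWER(SIN(RADIANS(lat_b - lat_a) / 2.0), 2) +
        COS(RADIANS(lat_a)) * COS(RADIANS(lat_b)) *
        POWER(SIN(RADIANS(lon_b - lon_a) / 2.0), 2)
    )) AS haversine_km
FROM p;
```

Running it for the most distant pair of stations in a network gives a quick upper bound on the error at the distances that matter for the 50 km rule.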

Q: Can I tune fraud thresholds without redeploying the pipeline?
A: The thresholds (e.g., > 50 km, < 30 MINUTES) are part of the materialized view's query, so edit them in the view DDL, then drop and recreate the view along with the views and sinks that depend on it (here, fraud_alerts and its two sinks). The Kafka sources and upstream producers stay untouched, and RisingWave populates the recreated view from the sources again.

Q: How do I handle RFID cloning, where a cloned card is used at the same station as the original?
A: RFID cloning at the same station appears as normal usage to the session layer. Detect it through usage pattern analysis: unusually high session frequency per id_tag in a HOP window, or session initiation at times inconsistent with a user's historical usage profile. Add a session_frequency_anomaly view using a HOP window COUNT over the id_tag.
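
A minimal sketch of the session_frequency_anomaly view mentioned above, built on the transaction_events_geo source. The one-hour window, ten-minute hop, and threshold of six starts are placeholder assumptions to tune against real usage data.

```sql
-- Count session starts per id_tag in 1-hour windows that advance every 10 minutes.
-- HOP arguments: (source, time column, hop size, window size).
CREATE MATERIALIZED VIEW session_frequency_anomaly AS
SELECT
    id_tag,
    window_start,
    window_end,
    COUNT(*) AS session_starts
FROM HOP(transaction_events_geo, event_ts, INTERVAL '10 MINUTES', INTERVAL '1 HOUR')
WHERE event_type = 'START'
GROUP BY id_tag, window_start, window_end
HAVING COUNT(*) > 6;   -- placeholder threshold (assumption)
```

Because each event falls into several overlapping windows, a burst of activity shows up in more than one row. Downstream consumers may want to deduplicate by id_tag. Without a watermark or a temporal filter on event_ts, the view keeps state for every past window.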

Key Takeaways

  • RisingWave detects EV charging fraud patterns—concurrent sessions, impossible travel, unpaid sessions—within seconds using SQL joins and window aggregations.
  • Multi-stream joins across transaction events, meter values, and payment results replace brittle custom detection scripts.
  • Fraud alerts route to Kafka and JDBC sinks, integrating naturally with case management systems and risk scoring pipelines.
  • Fraud detection rules are expressed in auditable, version-controlled SQL—not hard-coded logic buried in application code.
