How to Process AMI Data Streams at Scale

Processing AMI (Advanced Metering Infrastructure) data streams at scale means using a PostgreSQL-compatible streaming database to continuously ingest interval reads from millions of smart meters, compute per-customer and per-grid-segment aggregations in real time, and deliver operational intelligence to billing, outage management, and grid operations systems without batch processing delays.

Why This Matters for Energy Operators

AMI deployments have transformed how utilities understand consumption, but most utilities have not transformed how they process AMI data. The typical architecture looks like this: meters transmit reads to the AMI head-end system every 15 minutes, the head-end writes to a staging database, a nightly ETL job loads validated reads into the meter data management system (MDMS), and reports are available the next morning.

This architecture made sense when AMI was primarily for billing. But modern AMI networks are also a distribution grid sensing layer. The same meters that record consumption also report:

  • Voltage at the customer point—a proxy for grid health
  • Power quality events (sags, swells, outages)
  • Last-gasp signals that pinpoint outages before any customer calls
  • Tamper and reverse energy flags for loss control

None of these signals are useful if they arrive 12–24 hours late. A streaming architecture processes them as they arrive, making AMI data operationally useful in real time while the nightly MDMS load continues for billing.

How Streaming SQL Works for Energy Data

RisingWave ingests the AMI head-end's Kafka output stream and maintains continuously updated materialized views that serve several consuming systems at once. The outage management system gets fault signals, the distribution engineering team gets voltage heat maps, and the customer service team gets consumption summaries. The billing system still gets the validated reads it always received through the nightly load, with a near-real-time preview of the same reads available alongside it.
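Because RisingWave speaks the PostgreSQL wire protocol, a consuming system can read these views with an ordinary query. As a minimal sketch, assuming the transformer_summary view built in Step 2 and a hypothetical feeder ID 'FDR-0042', a voltage dashboard might poll recent windows like this:

```sql
-- Windows that started within the last hour for every transformer on one feeder.
-- 'FDR-0042' is a hypothetical feeder ID used only for illustration.
SELECT
    transformer_id,
    window_start,
    window_end,
    total_consumption_kwh,
    min_voltage_v,
    max_voltage_v
FROM transformer_summary
WHERE feeder_id = 'FDR-0042'
  AND window_start >= NOW() - INTERVAL '1 hour'
ORDER BY window_start DESC, transformer_id;
```

The query only reads results the view has already computed, so it does not re-aggregate the underlying reads each time it runs.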

Building the System: Step by Step

Step 1: Connect the Data Source

Ingest AMI reads from the head-end Kafka output:

CREATE SOURCE ami_interval_reads (
    meter_id              VARCHAR,
    service_point_id      VARCHAR,
    customer_id           VARCHAR,
    transformer_id        VARCHAR,
    feeder_id             VARCHAR,
    read_type             VARCHAR,   -- 'INTERVAL', 'DAILY', 'EVENT'
    interval_start        TIMESTAMPTZ,
    interval_end          TIMESTAMPTZ,
    consumption_kwh       DOUBLE PRECISION,
    delivered_kwh         DOUBLE PRECISION,
    received_kwh          DOUBLE PRECISION,
    voltage_v             DOUBLE PRECISION,
    power_quality_code    INTEGER,
    estimated_flag        BOOLEAN,
    read_ts               TIMESTAMPTZ
) WITH (
    connector     = 'kafka',
    topic         = 'ami.headend.reads',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;

-- AMI event stream (separate high-priority topic)
CREATE SOURCE ami_events (
    meter_id      VARCHAR,
    event_code    INTEGER,
    event_desc    VARCHAR,
    event_ts      TIMESTAMPTZ
) WITH (
    connector     = 'kafka',
    topic         = 'ami.headend.events',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;

Step 2: Build Real-Time Aggregations

Compute transformer-level consumption, voltage statistics, and read receipt rates:

-- Per-transformer consumption and voltage in 15-minute windows
CREATE MATERIALIZED VIEW transformer_summary AS
SELECT
    transformer_id,
    feeder_id,
    window_start,
    window_end,
    SUM(consumption_kwh)            AS total_consumption_kwh,
    COUNT(DISTINCT meter_id)        AS meters_reported,
    AVG(voltage_v)                  AS avg_voltage_v,
    MIN(voltage_v)                  AS min_voltage_v,
    MAX(voltage_v)                  AS max_voltage_v,
    SUM(CASE WHEN estimated_flag THEN 1 ELSE 0 END) AS estimated_count
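-- The read_type filter sits in WHERE rather than in a subquery inside TUMBLE;
-- the result is the same. Note that windowing on interval_end places a read
-- ending exactly on a boundary (e.g. 10:15:00) in the window that starts there;
-- window on interval_start if each window should match the metering interval.
FROM TUMBLE(ami_interval_reads, interval_end, INTERVAL '15' MINUTE)
WHERE read_type = 'INTERVAL'
GROUP BY transformer_id, feeder_id, window_start, window_end;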

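-- Read receipt rate per transformer (to detect communication issues)
-- transformer_meter_count is a reference table with one row per transformer
-- (transformer_id, total_meters); create it before creating this view.
CREATE MATERIALIZED VIEW read_receipt_rate AS
SELECT
    t.transformer_id,
    t.window_end,
    t.meters_reported,
    c.total_meters,
    -- NULLIF avoids a division-by-zero error when a transformer has no meters assigned
    ROUND(t.meters_reported::DECIMAL / NULLIF(c.total_meters, 0) * 100, 1) AS receipt_rate_pct
FROM transformer_summary t
JOIN transformer_meter_count c ON t.transformer_id = c.transformer_id;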
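
The read_receipt_rate view joins against transformer_meter_count, which is not defined in the steps above. A minimal sketch of such a reference table follows; the table layout, transformer IDs, and counts are assumptions for illustration, and the table would need to exist before the view is created:

```sql
-- Hypothetical reference table: one row per transformer with the number of
-- meters expected to report. Column names match the join in read_receipt_rate.
CREATE TABLE transformer_meter_count (
    transformer_id  VARCHAR PRIMARY KEY,
    total_meters    INTEGER
);

-- Illustrative rows only; real counts might come from an asset registry
-- or a similar system of record.
INSERT INTO transformer_meter_count (transformer_id, total_meters) VALUES
    ('TX-1001', 12),
    ('TX-1002', 8);
```

With these sample rows, a window in which 9 of the 12 meters on TX-1001 report yields a receipt rate of 75.0, which falls under the 80 percent threshold used for the LOW_RECEIPT_RATE alert in Step 3.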

Step 3: Detect Anomalies and Generate Alerts

Detect communication failures, voltage issues, and high-value event codes:

-- Operational alerts from AMI data
CREATE MATERIALIZED VIEW ami_operational_alerts AS
SELECT
    alert_source,
    entity_id,
    alert_time,
    alert_type,
    detail_value,
    severity
FROM (
    -- Low read receipt rate: possible communications issue
    SELECT
        'COMM' AS alert_source,
        transformer_id AS entity_id,
        window_end AS alert_time,
        'LOW_RECEIPT_RATE' AS alert_type,
        receipt_rate_pct::DOUBLE PRECISION AS detail_value,  -- same type as the other UNION ALL branches
        'WARNING' AS severity
    FROM read_receipt_rate
    WHERE receipt_rate_pct < 80

    UNION ALL

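    -- Voltage violations from AMI readings
    -- (207 V and 253 V equal 230 V -10% / +10%; adjust for other service voltages)
    SELECT
        'VOLTAGE' AS alert_source,
        transformer_id AS entity_id,
        window_end AS alert_time,
        CASE
            WHEN min_voltage_v < 207 THEN 'LOW_VOLTAGE'
            WHEN max_voltage_v > 253 THEN 'HIGH_VOLTAGE'
        END AS alert_type,
        -- Report the reading that triggered the alert, mirroring the CASE above
        CASE
            WHEN min_voltage_v < 207 THEN min_voltage_v
            WHEN max_voltage_v > 253 THEN max_voltage_v
        END AS detail_value,
        'WARNING' AS severity
    FROM transformer_summary
    WHERE min_voltage_v < 207 OR max_voltage_v > 253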

    UNION ALL

    -- High-priority AMI events (outage, tamper, reverse energy)
    SELECT
        'EVENT' AS alert_source,
        meter_id AS entity_id,
        event_ts AS alert_time,
        event_desc AS alert_type,
        event_code::DOUBLE PRECISION AS detail_value,
        CASE WHEN event_code IN (3840, 3841, 3842) THEN 'CRITICAL' ELSE 'WARNING' END AS severity
    FROM ami_events
    WHERE event_code IN (3840, 3841, 3842, 2048, 2049, 1024)
) alerts;

Step 4: Integrate with SCADA/EMS Downstream

Route alerts to OMS, distribution engineering tools, and billing validation:

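-- Grid alerts (voltage and communications) to OMS
-- The alert view is built on windowed aggregates and can emit updates; PLAIN is an
-- append-only format, so force_append_only writes each update as a new message.
CREATE SINK ami_grid_alert_sink AS
SELECT * FROM ami_operational_alerts WHERE alert_source IN ('VOLTAGE', 'COMM')
WITH (
    connector = 'kafka',
    topic     = 'ami.alerts.grid',
    properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON (
    force_append_only = 'true'
);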

-- Meter events to loss control and customer systems
-- force_append_only is set because the alert view as a whole is not append-only.
CREATE SINK ami_event_sink AS
SELECT * FROM ami_operational_alerts WHERE alert_source = 'EVENT'
WITH (
    connector = 'kafka',
    topic     = 'ami.alerts.events',
    properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON (
    force_append_only = 'true'
);

Comparison: Batch vs Streaming

| Capability | Batch MDMS Architecture | Streaming SQL |
|---|---|---|
| Consumption visibility | Next-day report | 15-minute windows, real time |
| Outage detection | Customer calls | Last-gasp event within minutes |
| Voltage monitoring | Monthly distribution analysis | Continuous 15-min windows |
| Communication failure detection | Nightly missing-reads report | Sub-hour receipt rate view |
| Billing input | Same (nightly validated reads) | Same, plus real-time preview |
| Tamper detection | Weekly exception reports | Intraday event stream |
| Scale to 5M+ meters | Large ETL cluster | Horizontally scaled streaming DB |

FAQ

Will this replace our MDMS?

No. The MDMS handles billing validation, tariff application, revenue protection reporting, and regulatory compliance functions that require the full validated read history and complex business rules. The streaming SQL layer complements the MDMS by providing operational intelligence in the window between meter read and MDMS load.

How do we handle the validation and estimation rules that MDMS applies to reads?

For operational purposes (grid monitoring, outage detection), raw or lightly validated reads are sufficient. The streaming layer applies only critical quality filters (e.g., implausible voltage values, impossible consumption spikes). Full validation and estimation remain in the MDMS for billing.
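
A critical quality filter of this kind can be sketched as a view over the raw source from Step 1. The view name ami_reads_plausible and every threshold below are placeholder assumptions for illustration, not utility standards:

```sql
-- Sketch of a plausibility filter applied before aggregation.
-- All thresholds are assumed values; tune them to the service territory.
CREATE MATERIALIZED VIEW ami_reads_plausible AS
SELECT *
FROM ami_interval_reads
WHERE read_type = 'INTERVAL'
  AND voltage_v BETWEEN 150 AND 300   -- drop implausible voltage values
  AND consumption_kwh >= 0            -- negative consumption is treated as bad data here
  AND consumption_kwh <= 100;         -- assumed cap for a single 15-minute interval
```

Rows with a NULL voltage_v or consumption_kwh are also excluded, because the comparisons do not evaluate to true for them. Downstream views such as transformer_summary could read from this view instead of the raw source if filtered inputs are preferred.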

Our AMI head-end doesn't publish to Kafka. How do we connect?

Most modern AMI head-end systems (Itron, Landis+Gyr, Honeywell) offer RESTful APIs or direct database access. A lightweight adapter process can poll the head-end API and publish to Kafka. Alternatively, if the head-end writes to a relational database, RisingWave's CDC connectors can consume the change stream directly.
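
As a rough sketch of the CDC route, assuming the head-end staging database is PostgreSQL and using RisingWave's postgres-cdc connector: the host, credentials, database, table, and column names below are all hypothetical, and the exact connector parameters should be checked against the documentation for the RisingWave version in use.

```sql
-- Sketch only: host, credentials, database, and table names are hypothetical.
CREATE SOURCE headend_pg WITH (
    connector     = 'postgres-cdc',
    hostname      = 'headend-db',
    port          = '5432',
    username      = 'replicator',
    password      = '********',
    database.name = 'ami_headend'
);

-- Mirror one upstream staging table; a CDC table needs a primary key
-- that matches the upstream table's key.
CREATE TABLE headend_interval_reads (
    read_id          BIGINT PRIMARY KEY,
    meter_id         VARCHAR,
    interval_start   TIMESTAMPTZ,
    interval_end     TIMESTAMPTZ,
    consumption_kwh  DOUBLE PRECISION,
    voltage_v        DOUBLE PRECISION
) FROM headend_pg TABLE 'public.interval_reads';
```

The upstream database also has to permit logical replication for the connecting user, which is a configuration step on the head-end side rather than in RisingWave.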

Key Takeaways

  • AMI data processed through streaming SQL becomes a real-time grid sensing layer, not just a billing data source.
  • Read receipt rate monitoring detects communication failures before they become billing data gaps.
  • Voltage and event data from smart meters provide distribution-level grid intelligence without additional sensors.
  • The architecture is additive: the MDMS nightly load continues unchanged while the streaming layer adds real-time operational value.
  • RisingWave's horizontal scaling handles millions of meters through Kafka partition distribution and incremental view maintenance.
