Processing AMI (Advanced Metering Infrastructure) data streams at scale means using a PostgreSQL-compatible streaming database to continuously ingest interval reads from millions of smart meters, compute per-customer and per-grid-segment aggregations in real time, and deliver operational intelligence to billing, outage management, and grid operations systems without batch processing delays.
Why This Matters for Energy Operators
AMI deployments have transformed how utilities understand consumption, but most utilities have not transformed how they process AMI data. The typical architecture looks like this: meters transmit reads to the AMI head-end system every 15 minutes, the head-end writes to a staging database, a nightly ETL job loads validated reads into the meter data management system (MDMS), and reports are available the next morning.
This architecture made sense when AMI was primarily for billing. But modern AMI networks are also a distribution grid sensing layer. The same meters that record consumption also report:
- Voltage at the customer point—a proxy for grid health
- Power quality events (sags, swells, outages)
- Last-gasp signals that pinpoint outages before any customer calls
- Tamper and reverse energy flags for loss control
None of these signals are useful if they arrive 12–24 hours late. A streaming architecture processes them as they arrive, making AMI data operationally useful in real time while the nightly MDMS load continues for billing.
How Streaming SQL Works for Energy Data
RisingWave ingests the AMI head-end's Kafka output stream and maintains continuously updated materialized views that serve multiple consuming systems simultaneously: the outage management system gets fault signals, the distribution engineering team gets voltage heat maps, the customer service team gets consumption summaries, and the billing system gets the same validated reads it always received—now also available in near real time.
Building the System: Step by Step
Step 1: Connect the Data Source
Ingest AMI reads from the head-end Kafka output:
CREATE SOURCE ami_interval_reads (
meter_id VARCHAR,
service_point_id VARCHAR,
customer_id VARCHAR,
transformer_id VARCHAR,
feeder_id VARCHAR,
read_type VARCHAR, -- 'INTERVAL', 'DAILY', 'EVENT'
interval_start TIMESTAMPTZ,
interval_end TIMESTAMPTZ,
consumption_kwh DOUBLE PRECISION,
delivered_kwh DOUBLE PRECISION,
received_kwh DOUBLE PRECISION,
voltage_v DOUBLE PRECISION,
power_quality_code INTEGER,
estimated_flag BOOLEAN,
read_ts TIMESTAMPTZ
) WITH (
connector = 'kafka',
topic = 'ami.headend.reads',
properties.bootstrap.server = 'kafka:9092',
scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;
-- AMI event stream (separate high-priority topic)
CREATE SOURCE ami_events (
meter_id VARCHAR,
event_code INTEGER,
event_desc VARCHAR,
event_ts TIMESTAMPTZ
) WITH (
connector = 'kafka',
topic = 'ami.headend.events',
properties.bootstrap.server = 'kafka:9092',
scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;
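A Kafka source in RisingWave is not directly queryable; rows only flow once a materialized view (or table) is built on top of it. A disposable check view is a quick way to confirm reads are arriving before building the real pipeline; this is a sketch, and the view name is ours:
-- Sanity check: reads and distinct meters seen per minute of meter timestamp
CREATE MATERIALIZED VIEW ingest_check AS
SELECT
date_trunc('minute', read_ts) AS minute_bucket,
COUNT(*) AS reads_received,
COUNT(DISTINCT meter_id) AS meters_seen
FROM ami_interval_reads
GROUP BY date_trunc('minute', read_ts);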
Step 2: Build Real-Time Aggregations
Compute transformer-level consumption, voltage statistics, and read receipt rates:
-- Per-transformer consumption and voltage in 15-minute windows
CREATE MATERIALIZED VIEW transformer_summary AS
SELECT
transformer_id,
feeder_id,
window_start,
window_end,
SUM(consumption_kwh) AS total_consumption_kwh,
COUNT(DISTINCT meter_id) AS meters_reported,
AVG(voltage_v) AS avg_voltage_v,
MIN(voltage_v) AS min_voltage_v,
MAX(voltage_v) AS max_voltage_v,
SUM(CASE WHEN estimated_flag THEN 1 ELSE 0 END) AS estimated_count
FROM TUMBLE(ami_interval_reads, interval_end, INTERVAL '15 MINUTES')
WHERE read_type = 'INTERVAL'
GROUP BY transformer_id, feeder_id, window_start, window_end;
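The receipt-rate view below joins against transformer_meter_count, a reference table mapping each transformer to the number of meters it serves. That table is not produced by the stream; one minimal way to provide it, assuming the meter-to-transformer mapping is exported from the asset system, is a plain RisingWave table (names illustrative):
-- Reference data: total meters per transformer, maintained from the asset system
CREATE TABLE transformer_meter_count (
transformer_id VARCHAR PRIMARY KEY,
total_meters INTEGER
);
-- Keep it current with ordinary INSERT/UPDATE statements, or replace it with a
-- CDC-backed table sourced from the asset database.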
-- Read receipt rate per transformer (to detect communication issues)
CREATE MATERIALIZED VIEW read_receipt_rate AS
SELECT
t.transformer_id,
t.window_end,
t.meters_reported,
c.total_meters,
ROUND(t.meters_reported::DECIMAL / c.total_meters * 100, 1) AS receipt_rate_pct
FROM transformer_summary t
JOIN transformer_meter_count c ON t.transformer_id = c.transformer_id;
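Because RisingWave speaks the PostgreSQL wire protocol, dashboards and ad-hoc tools can query these views directly. A hypothetical operator query for transformers that are currently reporting poorly:
-- From any Postgres client: transformers with weak read receipt right now
SELECT transformer_id, window_end, receipt_rate_pct
FROM read_receipt_rate
WHERE receipt_rate_pct < 80
ORDER BY window_end DESC, receipt_rate_pct ASC
LIMIT 50;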
Step 3: Detect Anomalies and Generate Alerts
Detect communication failures, voltage issues, and high-priority event codes:
-- Operational alerts from AMI data
CREATE MATERIALIZED VIEW ami_operational_alerts AS
SELECT
alert_source,
entity_id,
alert_time,
alert_type,
detail_value,
severity
FROM (
-- Low read receipt rate: possible communications issue
SELECT
'COMM' AS alert_source,
transformer_id AS entity_id,
window_end AS alert_time,
'LOW_RECEIPT_RATE' AS alert_type,
receipt_rate_pct AS detail_value,
'WARNING' AS severity
FROM read_receipt_rate
WHERE receipt_rate_pct < 80
UNION ALL
-- Voltage violations from AMI readings
SELECT
'VOLTAGE' AS alert_source,
transformer_id AS entity_id,
window_end AS alert_time,
CASE
WHEN min_voltage_v < 207 THEN 'LOW_VOLTAGE'
WHEN max_voltage_v > 253 THEN 'HIGH_VOLTAGE'
END AS alert_type,
CASE WHEN min_voltage_v < 207 THEN min_voltage_v ELSE max_voltage_v END AS detail_value, -- report the voltage that breached the band
'WARNING' AS severity
FROM transformer_summary
WHERE min_voltage_v < 207 OR max_voltage_v > 253
UNION ALL
-- High-priority AMI events (outage, tamper, reverse energy)
SELECT
'EVENT' AS alert_source,
meter_id AS entity_id,
event_ts AS alert_time,
event_desc AS alert_type,
event_code::DOUBLE PRECISION AS detail_value,
CASE WHEN event_code IN (3840, 3841, 3842) THEN 'CRITICAL' ELSE 'WARNING' END AS severity
FROM ami_events
WHERE event_code IN (3840, 3841, 3842, 2048, 2049, 1024)
) alerts;
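The alert view can be inspected the same way before any sinks are wired up; a hypothetical triage query:
-- Most recent critical alerts across all three alert sources
SELECT alert_source, entity_id, alert_type, detail_value, alert_time
FROM ami_operational_alerts
WHERE severity = 'CRITICAL'
ORDER BY alert_time DESC
LIMIT 20;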
Step 4: Deliver Results to Downstream Systems
Route grid alerts to the OMS and distribution engineering tools, and meter events to loss control and customer systems:
-- Outage and grid alerts to OMS
CREATE SINK ami_grid_alert_sink AS
SELECT * FROM ami_operational_alerts WHERE alert_source IN ('VOLTAGE', 'COMM')
WITH (
connector = 'kafka',
topic = 'ami.alerts.grid',
properties.bootstrap.server = 'kafka:9092',
force_append_only = 'true' -- alert view is not append-only; emit inserts only
) FORMAT PLAIN ENCODE JSON;
-- Meter events to loss control and customer systems
CREATE SINK ami_event_sink AS
SELECT * FROM ami_operational_alerts WHERE alert_source = 'EVENT'
WITH (
connector = 'kafka',
topic = 'ami.alerts.events',
properties.bootstrap.server = 'kafka:9092',
force_append_only = 'true'
) FORMAT PLAIN ENCODE JSON;
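Both sinks above carry alert events, so an insert-only stream is enough. Aggregated views such as transformer_summary update in place as late reads arrive, so publishing them to Kafka is usually done as a keyed upsert stream instead; a sketch, with the topic name assumed:
-- Publish updating transformer summaries as a keyed upsert stream (topic name assumed)
CREATE SINK transformer_summary_sink FROM transformer_summary
WITH (
connector = 'kafka',
topic = 'ami.summary.transformer',
properties.bootstrap.server = 'kafka:9092',
primary_key = 'transformer_id,window_start'
) FORMAT UPSERT ENCODE JSON;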
Comparison: Batch vs Streaming
| Capability | Batch MDMS Architecture | Streaming SQL |
| --- | --- | --- |
| Consumption visibility | Next-day report | 15-minute windows, real time |
| Outage detection | Customer calls | Last-gasp event within minutes |
| Voltage monitoring | Monthly distribution analysis | Continuous 15-min windows |
| Communication failure detection | Nightly missing-reads report | Sub-hour receipt rate view |
| Billing input | Same (nightly validated reads) | Same, plus real-time preview |
| Tamper detection | Weekly exception reports | Intraday event stream |
| Scale to 5M+ meters | Large ETL cluster | Horizontally scaled streaming DB |
FAQ
Will this replace our MDMS? No. The MDMS handles billing validation, tariff application, revenue protection reporting, and regulatory compliance functions that require the full validated read history and complex business rules. The streaming SQL layer complements the MDMS by providing operational intelligence in the window between meter read and MDMS load.
How do we handle the validation and estimation rules that MDMS applies to reads? For operational purposes (grid monitoring, outage detection), raw or lightly validated reads are sufficient. The streaming layer applies only critical quality filters (e.g., implausible voltage values, impossible consumption spikes). Full validation and estimation remain in the MDMS for billing.
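As one illustration of what such a filter can look like, a view can drop physically implausible readings before they reach the operational views; the thresholds below are placeholders, not MDMS validation rules:
-- Hypothetical sanity filter: drop physically implausible interval reads
CREATE MATERIALIZED VIEW plausible_reads AS
SELECT *
FROM ami_interval_reads
WHERE read_type = 'INTERVAL'
AND consumption_kwh >= 0
AND consumption_kwh < 500 -- implausible for a single 15-minute interval
AND (voltage_v IS NULL OR voltage_v BETWEEN 100 AND 300);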
Our AMI head-end doesn't publish to Kafka. How do we connect? Most modern AMI head-end systems (Itron, Landis+Gyr, Honeywell) offer RESTful APIs or direct database access. A lightweight adapter process can poll the head-end API and publish to Kafka. Alternatively, if the head-end writes to a relational database, RisingWave's CDC connectors can consume the change stream directly.
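For the database route, the connection would look roughly like the sketch below, assuming the head-end stages reads in PostgreSQL; every connection detail is a placeholder:
-- Hypothetical: ingest the head-end's staging table via CDC instead of Kafka
CREATE TABLE headend_reads_cdc (
meter_id VARCHAR,
interval_end TIMESTAMPTZ,
consumption_kwh DOUBLE PRECISION,
voltage_v DOUBLE PRECISION,
PRIMARY KEY (meter_id, interval_end)
) WITH (
connector = 'postgres-cdc',
hostname = 'headend-db',
port = '5432',
username = 'rw_cdc',
password = '********',
database.name = 'headend',
schema.name = 'public',
table.name = 'interval_reads'
);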
Key Takeaways
- AMI data processed through streaming SQL becomes a real-time grid sensing layer, not just a billing data source.
- Read receipt rate monitoring detects communication failures before they become billing data gaps.
- Voltage and event data from smart meters provide distribution-level grid intelligence without additional sensors.
- The architecture is additive: the MDMS nightly load continues unchanged while the streaming layer adds real-time operational value.
- RisingWave's horizontal scaling handles millions of meters through Kafka partition distribution and incremental view maintenance.