Real-Time Electricity Market Price Monitoring

Real-Time Electricity Market Price Monitoring

Electricity prices can move from $30/MWh to $3,000/MWh within minutes during grid stress events. Traders, load-serving entities, and large industrials need live price visibility across settlement nodes, products, and time horizons. RisingWave, a PostgreSQL-compatible streaming database, continuously aggregates price ticks, computes spreads, and fires alerts the moment a threshold is crossed.

Why Real-Time Price Monitoring Matters

Electricity markets operate on 5-minute real-time intervals and hourly day-ahead auctions. The gap between the day-ahead price and the real-time price—the DA/RT spread—drives billions in energy trading and congestion revenue. Missing a spike or collapse by even a few minutes can mean the difference between profit and penalty.

Traditional approaches rely on polling ISO/RTO APIs on fixed schedules or subscribing to expensive vendor data feeds with rigid alerting rules. Streaming SQL enables a flexible, query-driven approach:

  • Continuously track Locational Marginal Prices (LMPs) across hundreds of settlement nodes.
  • Compute spread, basis, and rolling volatility in real time without manual scripts.
  • Alert on configurable thresholds and automatically log price events for post-trade analysis.

The Streaming SQL Approach

ISO/RTOs publish LMP data via HTTPS APIs. A lightweight connector publishes these updates to Kafka. RisingWave ingests the Kafka stream, maintains continuously updated materialized views for current prices, spreads, and statistics, and routes alerts via Kafka sinks to trading desks and automated hedging systems.

Step-by-Step Tutorial

Step 1: Data Source Setup

-- Real-time LMP ticks (5-minute intervals from ISO/RTO)
CREATE SOURCE lmp_realtime (
    node_id           VARCHAR,   -- settlement point, e.g. AEP_DAYTON
    zone              VARCHAR,
    iso               VARCHAR,   -- PJM | MISO | ERCOT | CAISO | NYISO
    lmp_usd_mwh       DOUBLE PRECISION,
    energy_component  DOUBLE PRECISION,
    congestion_component DOUBLE PRECISION,
    loss_component    DOUBLE PRECISION,
    interval_start    TIMESTAMPTZ,
    published_ts      TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'iso.lmp.realtime',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;

-- Day-ahead hourly LMP (published daily, ~12h ahead of delivery)
CREATE SOURCE lmp_day_ahead (
    node_id           VARCHAR,
    zone              VARCHAR,
    iso               VARCHAR,
    da_lmp_usd_mwh    DOUBLE PRECISION,
    delivery_hour     TIMESTAMPTZ,
    published_ts      TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'iso.lmp.day_ahead',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;

-- Fuel price index (natural gas, oil — used for spark/dark spread)
CREATE SOURCE fuel_prices (
    fuel_type         VARCHAR,   -- NATURAL_GAS | OIL | COAL
    hub               VARCHAR,   -- HH | WTI | API2
    price             DOUBLE PRECISION,
    unit              VARCHAR,   -- USD_PER_MMBTU | USD_PER_BBL
    price_ts          TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'commodity.fuel.prices',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;

Step 2: Core Materialized View

-- Latest real-time LMP per node
CREATE MATERIALIZED VIEW current_lmp AS
SELECT DISTINCT ON (node_id, iso)
    node_id, zone, iso,
    lmp_usd_mwh, energy_component, congestion_component, loss_component,
    interval_start, published_ts
FROM lmp_realtime
ORDER BY node_id, iso, published_ts DESC;

-- Latest day-ahead LMP per node and delivery hour
CREATE MATERIALIZED VIEW current_da_lmp AS
SELECT DISTINCT ON (node_id, iso, delivery_hour)
    node_id, zone, iso, da_lmp_usd_mwh, delivery_hour
FROM lmp_day_ahead
ORDER BY node_id, iso, delivery_hour, published_ts DESC;

-- DA/RT spread: positive = RT above DA (intra-day bull signal)
CREATE MATERIALIZED VIEW dart_spread AS
SELECT
    rt.node_id,
    rt.zone,
    rt.iso,
    rt.lmp_usd_mwh                       AS rt_lmp,
    da.da_lmp_usd_mwh                    AS da_lmp,
    rt.lmp_usd_mwh - da.da_lmp_usd_mwh  AS dart_spread,
    rt.interval_start
FROM current_lmp rt
JOIN current_da_lmp da
    ON  rt.node_id = da.node_id
    AND rt.iso     = da.iso
    AND DATE_TRUNC('hour', rt.interval_start) = da.delivery_hour;

-- Rolling 1-hour price statistics per zone (HOP window)
CREATE MATERIALIZED VIEW hourly_price_stats AS
SELECT
    zone,
    iso,
    window_start,
    window_end,
    AVG(lmp_usd_mwh)                     AS avg_lmp,
    MAX(lmp_usd_mwh)                     AS max_lmp,
    MIN(lmp_usd_mwh)                     AS min_lmp,
    STDDEV(lmp_usd_mwh)                  AS stddev_lmp,
    COUNT(*)                             AS interval_count
FROM HOP(lmp_realtime, published_ts, INTERVAL '5 MINUTES', INTERVAL '1 HOUR')
GROUP BY zone, iso, window_start, window_end;

-- Congestion spread: high congestion nodes vs. hub
CREATE MATERIALIZED VIEW congestion_alerts_base AS
SELECT
    node_id,
    zone,
    iso,
    lmp_usd_mwh,
    congestion_component,
    interval_start,
    CASE
        WHEN congestion_component > 50  THEN 'SEVERE'
        WHEN congestion_component > 20  THEN 'MODERATE'
        WHEN congestion_component < -20 THEN 'NEGATIVE_CONGESTION'
        ELSE 'NORMAL'
    END AS congestion_level
FROM current_lmp;

Step 3: Alerting Logic

-- Price spike alert: LMP exceeds $500/MWh
CREATE MATERIALIZED VIEW price_spike_alerts AS
SELECT
    node_id,
    zone,
    iso,
    lmp_usd_mwh,
    congestion_component,
    interval_start,
    NOW()              AS alert_ts,
    'PRICE_SPIKE'      AS alert_type
FROM current_lmp
WHERE lmp_usd_mwh > 500;

-- Large DART spread alert: RT more than $100/MWh above DA
CREATE MATERIALIZED VIEW dart_spread_alerts AS
SELECT
    node_id,
    zone,
    iso,
    rt_lmp,
    da_lmp,
    dart_spread,
    interval_start,
    NOW()              AS alert_ts,
    'DART_SPREAD'      AS alert_type
FROM dart_spread
WHERE ABS(dart_spread) > 100;

-- Sink all price alerts to Kafka
CREATE SINK price_alert_sink AS
SELECT node_id, zone, iso, lmp_usd_mwh AS value, alert_ts, alert_type
FROM price_spike_alerts
UNION ALL
SELECT node_id, zone, iso, dart_spread AS value, alert_ts, alert_type
FROM dart_spread_alerts
WITH (
    connector = 'kafka',
    topic = 'trading.alerts.price',
    properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;

-- Sink hourly stats to Postgres for dashboard
CREATE SINK hourly_stats_sink AS
SELECT * FROM hourly_price_stats
WITH (
    connector = 'jdbc',
    jdbc.url = 'jdbc:postgresql://analytics-db:5432/trading',
    table.name = 'hourly_price_stats'
);

Comparison Table

ApproachLatencyMulti-nodeSpread AnalyticsCustom Thresholds
ISO API polling (5-min)5+ minutesManualPost-processingLimited
Vendor market data terminalNear real-timeYesDisplay onlyLimited
Custom Kafka consumer codeSecondsYesCustom logicYes (hard-coded)
RisingWave streaming SQLSecondsYesSQL GROUP BYSQL WHERE clause

FAQ

Q: Can RisingWave connect directly to ISO/RTO data APIs without a Kafka intermediary?
A: RisingWave natively supports Kafka, Pulsar, and Kinesis as streaming sources. For ISO REST APIs, a thin Python or Go connector publishes API responses to Kafka. Alternatively, you can use RisingWave's HTTP source (available in newer versions) for direct polling.

Q: How do I track price nodes across multiple ISO/RTOs (PJM, MISO, ERCOT) simultaneously?
A: The iso column in the source schema partitions prices by market. All ISO feeds publish to the same Kafka topic, and the current_lmp materialized view uses DISTINCT ON (node_id, iso) to maintain separate current prices per ISO, enabling cross-market spread queries.

Q: How do I build a spark spread (power vs. gas) alert using this setup?
A: Join current_lmp to the fuel_prices source filtered to NATURAL_GAS. Apply a heat rate assumption (e.g., 7 MMBtu/MWh) to convert gas price to power equivalent, then subtract from LMP to compute the spark spread in a materialized view.

Key Takeaways

  • RisingWave continuously tracks LMPs across hundreds of settlement nodes with sub-second freshness, enabling live spread and congestion analysis.
  • DART spread, congestion component, and rolling volatility calculations are expressed in pure SQL—no custom aggregation code required.
  • Alerts route to Kafka for trading system integration and to JDBC sinks for dashboard persistence.
  • The PostgreSQL-compatible wire protocol means your existing BI tools can query live price data like a database table.

Best-in-Class Event Streaming
for Agents, Apps, and Analytics
GitHubXLinkedInSlackYouTube
Sign up for our to stay updated.