Electricity prices can move from $30/MWh to $3,000/MWh within minutes during grid stress events. Traders, load-serving entities, and large industrials need live price visibility across settlement nodes, products, and time horizons. RisingWave, a PostgreSQL-compatible streaming database, continuously aggregates price ticks, computes spreads, and fires alerts the moment a threshold is crossed.
Why Real-Time Price Monitoring Matters
Electricity markets operate on 5-minute real-time intervals and hourly day-ahead auctions. The gap between the day-ahead price and the real-time price—the DA/RT spread—drives billions in energy trading and congestion revenue. Missing a spike or collapse by even a few minutes can mean the difference between profit and penalty.
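To make the stakes concrete, here is a small back-of-the-envelope calculation (all numbers hypothetical):

```python
# Hypothetical numbers: a real-time spike against a day-ahead position.
da_lmp = 40.00   # day-ahead hourly clearing price, $/MWh
rt_lmp = 190.00  # real-time 5-minute LMP, $/MWh

dart_spread = rt_lmp - da_lmp  # +150: real-time well above day-ahead

# P&L impact on an illustrative 50 MW virtual position over one interval
position_mw = 50
interval_pnl = dart_spread * position_mw / 12  # twelve 5-minute intervals/hour

print(dart_spread)   # 150.0
print(interval_pnl)  # 625.0
```

A single missed 5-minute interval at this spread is worth hundreds of dollars per 50 MW, which is why second-level visibility matters.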
Traditional approaches rely on polling ISO/RTO APIs on fixed schedules or subscribing to expensive vendor data feeds with rigid alerting rules. Streaming SQL enables a flexible, query-driven approach:
- Continuously track Locational Marginal Prices (LMPs) across hundreds of settlement nodes.
- Compute spread, basis, and rolling volatility in real time without manual scripts.
- Alert on configurable thresholds and automatically log price events for post-trade analysis.
The Streaming SQL Approach
ISO/RTOs publish LMP data via HTTPS APIs. A lightweight connector publishes these updates to Kafka. RisingWave ingests the Kafka stream, maintains continuously updated materialized views for current prices, spreads, and statistics, and routes alerts via Kafka sinks to trading desks and automated hedging systems.
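The connector itself can be minimal. Below is a sketch of the mapping step, assuming hypothetical ISO API field names (`pnode_name`, `total_lmp`, and so on; real field names vary by market and feed):

```python
import json

def to_lmp_message(api_row: dict, iso: str) -> bytes:
    """Map one (hypothetical) ISO API row onto the lmp_realtime schema.

    The api_row keys are illustrative; adjust the mapping for your feed.
    """
    payload = {
        "node_id": api_row["pnode_name"],
        "zone": api_row["zone"],
        "iso": iso,
        "lmp_usd_mwh": float(api_row["total_lmp"]),
        "energy_component": float(api_row["energy"]),
        "congestion_component": float(api_row["congestion"]),
        "loss_component": float(api_row["loss"]),
        "interval_start": api_row["interval_start_utc"],
        "published_ts": api_row["published_utc"],
    }
    return json.dumps(payload).encode("utf-8")

# The bytes are then published with any Kafka client, e.g.:
# producer.send("iso.lmp.realtime", to_lmp_message(row, "PJM"))
```

Keeping the connector to fetch-map-publish leaves all analytics in SQL, where thresholds and spreads can change without redeploying code.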
Step-by-Step Tutorial
Step 1: Data Source Setup
-- Real-time LMP ticks (5-minute intervals from ISO/RTO)
CREATE SOURCE lmp_realtime (
node_id VARCHAR, -- settlement point, e.g. AEP_DAYTON
zone VARCHAR,
iso VARCHAR, -- PJM | MISO | ERCOT | CAISO | NYISO
lmp_usd_mwh DOUBLE PRECISION,
energy_component DOUBLE PRECISION,
congestion_component DOUBLE PRECISION,
loss_component DOUBLE PRECISION,
interval_start TIMESTAMPTZ,
published_ts TIMESTAMPTZ
) WITH (
connector = 'kafka',
topic = 'iso.lmp.realtime',
properties.bootstrap.server = 'kafka:9092',
scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;
-- Day-ahead hourly LMP (published daily, ~12h ahead of delivery)
CREATE SOURCE lmp_day_ahead (
node_id VARCHAR,
zone VARCHAR,
iso VARCHAR,
da_lmp_usd_mwh DOUBLE PRECISION,
delivery_hour TIMESTAMPTZ,
published_ts TIMESTAMPTZ
) WITH (
connector = 'kafka',
topic = 'iso.lmp.day_ahead',
properties.bootstrap.server = 'kafka:9092',
scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;
-- Fuel price index (natural gas, oil — used for spark/dark spread)
CREATE SOURCE fuel_prices (
fuel_type VARCHAR, -- NATURAL_GAS | OIL | COAL
hub VARCHAR, -- HH | WTI | API2
price DOUBLE PRECISION,
unit VARCHAR, -- USD_PER_MMBTU | USD_PER_BBL
price_ts TIMESTAMPTZ
) WITH (
connector = 'kafka',
topic = 'commodity.fuel.prices',
properties.bootstrap.server = 'kafka:9092',
scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;
Step 2: Core Materialized View
-- Latest real-time LMP per node
CREATE MATERIALIZED VIEW current_lmp AS
SELECT DISTINCT ON (node_id, iso)
node_id, zone, iso,
lmp_usd_mwh, energy_component, congestion_component, loss_component,
interval_start, published_ts
FROM lmp_realtime
ORDER BY node_id, iso, published_ts DESC;
-- Latest day-ahead LMP per node and delivery hour
CREATE MATERIALIZED VIEW current_da_lmp AS
SELECT DISTINCT ON (node_id, iso, delivery_hour)
node_id, zone, iso, da_lmp_usd_mwh, delivery_hour
FROM lmp_day_ahead
ORDER BY node_id, iso, delivery_hour, published_ts DESC;
-- DA/RT spread: positive = RT above DA (intra-day bull signal)
CREATE MATERIALIZED VIEW dart_spread AS
SELECT
rt.node_id,
rt.zone,
rt.iso,
rt.lmp_usd_mwh AS rt_lmp,
da.da_lmp_usd_mwh AS da_lmp,
rt.lmp_usd_mwh - da.da_lmp_usd_mwh AS dart_spread,
rt.interval_start
FROM current_lmp rt
JOIN current_da_lmp da
ON rt.node_id = da.node_id
AND rt.iso = da.iso
AND DATE_TRUNC('hour', rt.interval_start) = da.delivery_hour;
-- Rolling 1-hour price statistics per zone (HOP window)
CREATE MATERIALIZED VIEW hourly_price_stats AS
SELECT
zone,
iso,
window_start,
window_end,
AVG(lmp_usd_mwh) AS avg_lmp,
MAX(lmp_usd_mwh) AS max_lmp,
MIN(lmp_usd_mwh) AS min_lmp,
STDDEV(lmp_usd_mwh) AS stddev_lmp,
COUNT(*) AS interval_count
FROM HOP(lmp_realtime, published_ts, INTERVAL '5 MINUTES', INTERVAL '1 HOUR')
GROUP BY zone, iso, window_start, window_end;
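HOP assigns each incoming row to every overlapping window it falls into; with a 5-minute slide and a 1-hour size, each tick lands in twelve windows. A small Python sketch of that assignment logic, for intuition only:

```python
from datetime import datetime, timedelta

def hop_windows(event_ts: datetime, slide: timedelta, size: timedelta):
    """Return the (window_start, window_end) pairs a row falls into,
    mirroring how HOP(..., slide, size) assigns rows to overlapping windows."""
    epoch = datetime(1970, 1, 1)
    offset = (event_ts - epoch) % slide
    latest_start = event_ts - offset  # most recent window start <= event_ts
    windows = []
    start = latest_start
    while start > event_ts - size:    # window [start, start + size) contains event_ts
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)

# A 12:07 tick falls into twelve 1-hour windows sliding every 5 minutes.
ts = datetime(2024, 1, 1, 12, 7)
wins = hop_windows(ts, timedelta(minutes=5), timedelta(hours=1))
print(len(wins))  # 12
```

This is why the view emits one row per (zone, iso, window): every new tick incrementally updates all twelve windows it belongs to.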
-- Congestion severity classification per node, based on the congestion component
CREATE MATERIALIZED VIEW congestion_alerts_base AS
SELECT
node_id,
zone,
iso,
lmp_usd_mwh,
congestion_component,
interval_start,
CASE
WHEN congestion_component > 50 THEN 'SEVERE'
WHEN congestion_component > 20 THEN 'MODERATE'
WHEN congestion_component < -20 THEN 'NEGATIVE_CONGESTION'
ELSE 'NORMAL'
END AS congestion_level
FROM current_lmp;
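The same banding logic is easy to mirror outside the database, for example to unit-test downstream alert handling. A direct Python translation of the CASE expression above:

```python
def congestion_level(congestion_component: float) -> str:
    """Mirror of the CASE bands in congestion_alerts_base ($/MWh thresholds)."""
    if congestion_component > 50:
        return "SEVERE"
    if congestion_component > 20:
        return "MODERATE"
    if congestion_component < -20:
        return "NEGATIVE_CONGESTION"
    return "NORMAL"

print(congestion_level(63.0))   # SEVERE
print(congestion_level(-35.0))  # NEGATIVE_CONGESTION
```

Keeping one canonical copy of the thresholds (here, in the SQL view) and treating any mirror like this as test scaffolding avoids the two drifting apart.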
Step 3: Alerting Logic
-- Price spike alert: LMP exceeds $500/MWh
CREATE MATERIALIZED VIEW price_spike_alerts AS
SELECT
node_id,
zone,
iso,
lmp_usd_mwh,
congestion_component,
interval_start,
published_ts AS alert_ts, -- event time; now() cannot be projected in a streaming materialized view
'PRICE_SPIKE' AS alert_type
FROM current_lmp
WHERE lmp_usd_mwh > 500;
-- Large DART spread alert: RT more than $100/MWh above DA
CREATE MATERIALIZED VIEW dart_spread_alerts AS
SELECT
node_id,
zone,
iso,
rt_lmp,
da_lmp,
dart_spread,
interval_start,
interval_start AS alert_ts, -- event time; now() cannot be projected in a streaming materialized view
'DART_SPREAD' AS alert_type
FROM dart_spread
WHERE ABS(dart_spread) > 100;
-- Sink all price alerts to Kafka
CREATE SINK price_alert_sink AS
SELECT node_id, zone, iso, lmp_usd_mwh AS value, alert_ts, alert_type
FROM price_spike_alerts
UNION ALL
SELECT node_id, zone, iso, dart_spread AS value, alert_ts, alert_type
FROM dart_spread_alerts
WITH (
connector = 'kafka',
topic = 'trading.alerts.price',
properties.bootstrap.server = 'kafka:9092',
force_append_only = 'true' -- upstream views can retract rows; emit inserts only
) FORMAT PLAIN ENCODE JSON;
-- Sink hourly stats to Postgres for dashboard
CREATE SINK hourly_stats_sink AS
SELECT * FROM hourly_price_stats
WITH (
connector = 'jdbc',
jdbc.url = 'jdbc:postgresql://analytics-db:5432/trading',
table.name = 'hourly_price_stats',
type = 'upsert', -- aggregates update in place; target table needs a matching primary key
primary_key = 'zone, iso, window_start'
);
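On the consuming side, a trading system only needs to poll trading.alerts.price and dispatch on alert_type. A sketch of the routing step (the action names and the $1,000 escalation threshold are illustrative, not part of the pipeline above):

```python
import json

def route_alert(raw: bytes) -> str:
    """Decide what to do with one alert message from trading.alerts.price.

    Alert fields match the sink schema above; the actions are placeholders.
    """
    alert = json.loads(raw)
    if alert["alert_type"] == "PRICE_SPIKE" and alert["value"] > 1000:
        return "page_trader"     # e.g. PagerDuty / desk chat escalation
    if alert["alert_type"] == "DART_SPREAD":
        return "hedging_engine"  # feed automated DA/RT hedging logic
    return "log_only"

# With any Kafka client (e.g. confluent-kafka), poll the topic and dispatch:
# for msg in consumer:
#     action = route_alert(msg.value)
```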
Comparison Table
| Approach | Latency | Multi-node | Spread Analytics | Custom Thresholds |
| --- | --- | --- | --- | --- |
| ISO API polling (5-min) | 5+ minutes | Manual | Post-processing | Limited |
| Vendor market data terminal | Near real-time | Yes | Display only | Limited |
| Custom Kafka consumer code | Seconds | Yes | Custom logic | Yes (hard-coded) |
| RisingWave streaming SQL | Seconds | Yes | SQL GROUP BY | SQL WHERE clause |
FAQ
Q: Can RisingWave connect directly to ISO/RTO data APIs without a Kafka intermediary?
A: RisingWave natively supports Kafka, Pulsar, and Kinesis as streaming sources. ISO REST APIs require polling, so a thin Python or Go connector fetches responses and publishes them to Kafka. Newer RisingWave versions also offer webhook sources that accept HTTP payloads pushed directly into a table, but these are push-based, so a small connector is still needed for pull-only ISO APIs.
Q: How do I track price nodes across multiple ISO/RTOs (PJM, MISO, ERCOT) simultaneously?
A: The iso column in the source schema partitions prices by market. All ISO feeds publish to the same Kafka topic, and the current_lmp materialized view uses DISTINCT ON (node_id, iso) to maintain separate current prices per ISO, enabling cross-market spread queries.
Q: How do I build a spark spread (power vs. gas) alert using this setup?
A: Join current_lmp to the fuel_prices source filtered to NATURAL_GAS. Apply a heat rate assumption (e.g., 7 MMBtu/MWh) to convert gas price to power equivalent, then subtract from LMP to compute the spark spread in a materialized view.
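Sketched in Python, with the 7 MMBtu/MWh heat rate from the answer above as an explicit, adjustable assumption:

```python
def spark_spread(lmp_usd_mwh: float, gas_usd_mmbtu: float,
                 heat_rate_mmbtu_per_mwh: float = 7.0) -> float:
    """Spark spread = power price minus the fuel cost of generating that power.

    The 7 MMBtu/MWh heat rate is an illustrative assumption;
    use your unit's actual heat rate.
    """
    return lmp_usd_mwh - gas_usd_mmbtu * heat_rate_mmbtu_per_mwh

# $45/MWh power vs. $3.50/MMBtu gas at a 7 heat rate:
print(spark_spread(45.0, 3.50))  # 20.5
```

In the streaming version, the same expression becomes the select list of a materialized view joining current_lmp with the latest NATURAL_GAS row from fuel_prices, with the alert threshold in a WHERE clause.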
Key Takeaways
- RisingWave continuously tracks LMPs across hundreds of settlement nodes with second-level freshness, enabling live spread and congestion analysis.
- DART spread, congestion component, and rolling volatility calculations are expressed in pure SQL—no custom aggregation code required.
- Alerts route to Kafka for trading system integration and to JDBC sinks for dashboard persistence.
- The PostgreSQL-compatible wire protocol means your existing BI tools can query live price data like a database table.

