Energy traders and risk managers face volatile spot prices, renewable intermittency, and intraday position shifts that make end-of-day batch risk reports dangerously stale. RisingWave, a PostgreSQL-compatible streaming database, continuously updates mark-to-market P&L, counterparty exposure, and limit utilization as trades and prices arrive—so your risk desk always has a live picture.
Why Energy Portfolio Risk Monitoring Matters
A single hour-ahead price spike in a liquid power market can swing an unhedged position by millions. Traditional risk systems run overnight batch jobs or hourly snapshots. By the time the report lands on the desk, the market has already moved.
Real-time risk monitoring addresses three critical needs:
- Mark-to-market (MtM) P&L — continuously revalue open positions against the latest spot and forward prices.
- Counterparty credit exposure — aggregate net exposure per counterparty against approved limits and alert on breaches.
- Generation vs. load imbalance — track scheduled versus actual MW output across generation assets in real time to anticipate imbalance penalties.
Streaming SQL makes this achievable without a dedicated quant engineering team. RisingWave's materialized views incrementally recompute as new price ticks and trade confirmations arrive.
The Streaming SQL Approach
The architecture ingests three Kafka streams: live market price ticks, trade confirmations, and real-time generation telemetry. Materialized views join and aggregate these streams continuously. Alerts sink back to Kafka, or over JDBC to a database that feeds the risk dashboard.
Key RisingWave features used:
- Temporal joins — join a trade stream to a point-in-time price table using FOR SYSTEM_TIME AS OF PROCTIME(). The tutorial below approximates this with a latest-price view; see the sketch after this list for the temporal-join variant.
- HOP windows — rolling 1-hour risk windows recalculated every 5 minutes.
- CREATE SINK — push breach alerts to Kafka and position snapshots to a Postgres-compatible data warehouse.
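For reference, the temporal-join variant of the Step 2 mark-to-market join could look like the sketch below. This is a minimal sketch rather than part of the pipeline: prices_current and trade_price_at_arrival are hypothetical names, and the query assumes the trade_confirmations source defined in Step 1. A process-time temporal join needs an append-only stream on the left and a primary-keyed table on the right, and it pins the price in force when each trade row arrives instead of re-marking on later ticks, which is exactly why Step 2 uses a continuously updating latest-price view for MtM.
-- Hypothetical sketch: prices_current and trade_price_at_arrival are
-- illustrative names, not objects created elsewhere in this tutorial.
CREATE TABLE prices_current (
    price_node VARCHAR,
    product VARCHAR,
    delivery_hour TIMESTAMPTZ,
    price_usd_mwh DOUBLE PRECISION,
    PRIMARY KEY (price_node, product, delivery_hour)
);

-- Pin the price in force at the moment each trade confirmation arrives.
CREATE MATERIALIZED VIEW trade_price_at_arrival AS
SELECT
    t.trade_id,
    prices_current.price_usd_mwh AS price_at_arrival
FROM trade_confirmations t
JOIN prices_current FOR SYSTEM_TIME AS OF PROCTIME()
    ON t.price_node = prices_current.price_node
    AND t.product = prices_current.product
    AND t.delivery_hour = prices_current.delivery_hour;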
Step-by-Step Tutorial
Step 1: Data Source Setup
-- Day-ahead and real-time spot price ticks ($/MWh)
CREATE SOURCE market_prices (
price_node VARCHAR, -- settlement point e.g. HB_NORTH
product VARCHAR, -- DA_HOURLY | RT_5MIN
delivery_hour TIMESTAMPTZ,
price_usd_mwh DOUBLE PRECISION,
tick_ts TIMESTAMPTZ
) WITH (
connector = 'kafka',
topic = 'market.prices.ticks',
properties.bootstrap.server = 'kafka:9092',
scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;
-- Trade confirmations from ETRM system
CREATE SOURCE trade_confirmations (
trade_id VARCHAR,
counterparty_id VARCHAR,
price_node VARCHAR,
product VARCHAR,
delivery_hour TIMESTAMPTZ,
volume_mwh DOUBLE PRECISION,
contracted_price DOUBLE PRECISION, -- $/MWh at execution
direction VARCHAR, -- BUY | SELL
confirmed_at TIMESTAMPTZ
) WITH (
connector = 'kafka',
topic = 'etrm.trades.confirmed',
properties.bootstrap.server = 'kafka:9092',
scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;
-- Generation asset telemetry from EMS
CREATE SOURCE generation_telemetry (
asset_id VARCHAR,
price_node VARCHAR,
scheduled_mw DOUBLE PRECISION,
actual_mw DOUBLE PRECISION,
fuel_type VARCHAR, -- WIND | SOLAR | GAS | HYDRO
telemetry_ts TIMESTAMPTZ
) WITH (
connector = 'kafka',
topic = 'ems.generation.telemetry',
properties.bootstrap.server = 'kafka:9092',
scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;
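Before building risk views, it helps to confirm that ticks are actually flowing. Note that CREATE SOURCE alone does not ingest or store anything; RisingWave starts consuming a source only once a materialized view or sink references it. A throwaway view (price_tick_peek is an illustrative name) makes the raw stream queryable:
-- Hypothetical sanity check: materialize the raw ticks so you can inspect them.
CREATE MATERIALIZED VIEW price_tick_peek AS
SELECT * FROM market_prices;

-- Then, from psql or any Postgres-compatible client:
SELECT * FROM price_tick_peek ORDER BY tick_ts DESC LIMIT 10;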
Step 2: Core Materialized Views
-- Latest price per node/product (point-in-time price table)
CREATE MATERIALIZED VIEW latest_prices AS
SELECT DISTINCT ON (price_node, product, delivery_hour)
price_node,
product,
delivery_hour,
price_usd_mwh,
tick_ts
FROM market_prices
ORDER BY price_node, product, delivery_hour, tick_ts DESC;
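If DISTINCT ON is not supported for streaming queries in your RisingWave version, the same latest-record semantics can be written with the Top-N pattern instead; a sketch, with latest_prices_rn as an illustrative name:
-- Hypothetical alternative: keep only the newest tick per node/product/hour.
CREATE MATERIALIZED VIEW latest_prices_rn AS
SELECT price_node, product, delivery_hour, price_usd_mwh, tick_ts
FROM (
    SELECT *,
        ROW_NUMBER() OVER (
            PARTITION BY price_node, product, delivery_hour
            ORDER BY tick_ts DESC
        ) AS rn
    FROM market_prices
) ranked
WHERE rn <= 1;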
-- Mark-to-market P&L per trade
CREATE MATERIALIZED VIEW trade_mtm AS
SELECT
t.trade_id,
t.counterparty_id,
t.price_node,
t.delivery_hour,
t.volume_mwh,
t.direction,
t.contracted_price,
p.price_usd_mwh AS current_price,
p.tick_ts AS price_tick_ts, -- carried through for event-time alerting in Step 3
CASE t.direction
WHEN 'BUY' THEN (p.price_usd_mwh - t.contracted_price) * t.volume_mwh
WHEN 'SELL' THEN (t.contracted_price - p.price_usd_mwh) * t.volume_mwh
END AS mtm_usd
FROM trade_confirmations t
JOIN latest_prices p
ON t.price_node = p.price_node
AND t.product = p.product
AND t.delivery_hour = p.delivery_hour;
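Because trade_mtm is queryable like any Postgres view, the desk can spot-check it ad hoc. A hypothetical query for the ten largest open marks, not part of the pipeline itself:
SELECT trade_id, counterparty_id, price_node, mtm_usd
FROM trade_mtm
ORDER BY ABS(mtm_usd) DESC
LIMIT 10;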
-- Counterparty net exposure aggregation
CREATE MATERIALIZED VIEW counterparty_exposure AS
SELECT
counterparty_id,
SUM(mtm_usd) AS net_exposure_usd,
MAX(price_tick_ts) AS last_price_tick, -- freshest mark backing this exposure
SUM(volume_mwh) AS net_volume_mwh,
COUNT(DISTINCT trade_id) AS open_trades,
MAX(delivery_hour) AS latest_delivery
FROM trade_mtm
GROUP BY counterparty_id;
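A quick headroom check against the flat $5M limit used in Step 3 might look like this (again a hypothetical ad-hoc query):
SELECT
    counterparty_id,
    net_exposure_usd,
    5000000 - ABS(net_exposure_usd) AS headroom_usd
FROM counterparty_exposure
ORDER BY headroom_usd;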
-- Rolling generation imbalance (HOP window: 1h window, 5m slide)
CREATE MATERIALIZED VIEW generation_imbalance AS
SELECT
asset_id,
price_node,
fuel_type,
window_start,
window_end,
AVG(scheduled_mw) AS avg_scheduled_mw,
AVG(actual_mw) AS avg_actual_mw,
AVG(actual_mw - scheduled_mw) AS avg_imbalance_mw,
SUM(ABS(actual_mw - scheduled_mw)) AS total_abs_imbalance_mw
FROM HOP(generation_telemetry, telemetry_ts, INTERVAL '5 MINUTES', INTERVAL '1 HOUR')
GROUP BY asset_id, price_node, fuel_type, window_start, window_end;
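Materialized views can be stacked, so a portfolio-level rollup per settlement point can reuse the asset-level windows above. A sketch, with node_imbalance as an illustrative name:
-- Hypothetical rollup: net imbalance per settlement point, per window.
CREATE MATERIALIZED VIEW node_imbalance AS
SELECT
    price_node,
    window_start,
    window_end,
    SUM(avg_imbalance_mw) AS net_imbalance_mw
FROM generation_imbalance
GROUP BY price_node, window_start, window_end;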
Step 3: Alerting Logic
-- Alert when counterparty exposure exceeds $5M (for production, drive thresholds from a per-counterparty limits table; see the sketch below)
CREATE MATERIALIZED VIEW exposure_breach_alerts AS
SELECT
counterparty_id,
net_exposure_usd,
net_volume_mwh,
open_trades,
last_price_tick AS alert_ts, -- event time of the freshest mark; now() is not allowed in a streaming SELECT list
'COUNTERPARTY_LIMIT_BREACH' AS alert_type
FROM counterparty_exposure
WHERE ABS(net_exposure_usd) > 5000000;
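For the per-counterparty parameterization mentioned in the comment above, one option is to keep limits in a RisingWave table and join it into the alert view. A sketch; the counterparty_limits table, the IDs, and the limit values are all illustrative:
-- Hypothetical per-counterparty limits (names and values are placeholders).
CREATE TABLE counterparty_limits (
    counterparty_id VARCHAR PRIMARY KEY,
    limit_usd DOUBLE PRECISION
);

INSERT INTO counterparty_limits VALUES
    ('CP_ALPHA', 5000000),
    ('CP_BETA', 2000000);

CREATE MATERIALIZED VIEW exposure_breach_alerts_per_cp AS
SELECT
    e.counterparty_id,
    e.net_exposure_usd,
    l.limit_usd,
    e.last_price_tick AS alert_ts,
    'COUNTERPARTY_LIMIT_BREACH' AS alert_type
FROM counterparty_exposure e
JOIN counterparty_limits l
    ON e.counterparty_id = l.counterparty_id
WHERE ABS(e.net_exposure_usd) > l.limit_usd;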
-- Alert on large generation imbalance (>50 MW sustained)
CREATE MATERIALIZED VIEW imbalance_alerts AS
SELECT
asset_id,
price_node,
fuel_type,
window_start,
avg_imbalance_mw,
window_end AS alert_ts, -- window close time; now() is not allowed in a streaming SELECT list
'GENERATION_IMBALANCE' AS alert_type
FROM generation_imbalance
WHERE ABS(avg_imbalance_mw) > 50;
-- Sink both alert types to Kafka
CREATE SINK risk_alert_sink AS
SELECT counterparty_id AS entity_id, net_exposure_usd AS value, alert_ts, alert_type
FROM exposure_breach_alerts
UNION ALL
SELECT asset_id, avg_imbalance_mw, alert_ts, alert_type
FROM imbalance_alerts
WITH (
connector = 'kafka',
topic = 'risk.alerts',
properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON (
-- the alert views can retract rows when a breach clears; force the
-- append-only Kafka sink to emit inserts only
force_append_only = 'true'
);
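The feature list earlier also mentioned pushing position snapshots to a Postgres-compatible warehouse. A sketch using RisingWave's JDBC sink; the host, credentials, and target table name are placeholders, and the target table must already exist in the warehouse:
-- Hypothetical JDBC sink: continuously upsert the live exposure per counterparty.
CREATE SINK position_snapshot_sink FROM counterparty_exposure
WITH (
    connector = 'jdbc',
    jdbc.url = 'jdbc:postgresql://warehouse:5432/risk?user=rw&password=secret',
    table.name = 'counterparty_exposure_live',
    type = 'upsert', -- keep one current row per counterparty downstream
    primary_key = 'counterparty_id'
);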
Comparison Table
| Capability | Batch Risk System | Custom Flink/Spark | RisingWave Streaming SQL |
| --- | --- | --- | --- |
| MtM refresh latency | Hours | Seconds (complex ops) | Seconds |
| Counterparty exposure | Daily snapshot | Requires custom UDFs | SQL GROUP BY |
| Imbalance tracking | Post-settlement | Near real-time | Real-time HOP windows |
| Engineer skill required | SQL + ETL | Scala/Java + ops | SQL only |
| PostgreSQL wire compatibility | Often no | No | Yes |
FAQ
Q: How does RisingWave handle late-arriving price ticks for already-computed MtM values?
A: RisingWave's incremental view maintenance automatically updates downstream materialized views when a new price tick arrives for a (price_node, product, delivery_hour) combination. The latest_prices view updates, which cascades to trade_mtm and then counterparty_exposure.
Q: Can I query historical position snapshots, not just the current state?
A: Yes. Sink the trade_mtm or counterparty_exposure views to an Iceberg table or a JDBC sink (PostgreSQL, Redshift) at regular intervals. You can then query the historical log from your data warehouse while RisingWave maintains the live state.
Q: How does RisingWave compare to using a time-series database like InfluxDB for this use case?
A: Time-series databases excel at single-stream metric storage but lack the SQL join and aggregation capabilities needed to correlate trade positions with market prices and generation telemetry. RisingWave handles all three streams in a unified SQL model with sub-second freshness.
Key Takeaways
- Streaming SQL in RisingWave enables continuous MtM revaluation, counterparty exposure tracking, and generation imbalance monitoring without custom code.
- Temporal joins and HOP windows handle the rolling, multi-stream nature of energy risk data natively in SQL.
- Alerts integrate with existing Kafka-based notification and dashboard infrastructure via CREATE SINK.
- The PostgreSQL-compatible interface means risk analysts can query live positions with familiar tools like psql, DBeaver, or Grafana.

