A real-time energy trading platform built on streaming SQL continuously ingests market prices, trade executions, and physical delivery data to maintain live position books, enforce pre-trade risk limits, and feed settlement calculations—replacing batch-heavy architectures with a single PostgreSQL-compatible streaming database.
Why This Matters for Energy Operators
Energy markets move fast. Day-ahead and intraday power prices can shift by 10–30% within minutes during demand spikes or renewable generation swings. Traders who rely on end-of-day position reports are flying blind during intraday sessions. Risk managers who receive batch P&L updates cannot intervene before margin breaches become losses.
Modern energy trading requires continuous visibility into:
- Open positions across all delivery periods and hubs
- Mark-to-market P&L updated with every new price tick
- Exposure limits enforced in near-real time, not checked at batch close
- Physical vs. financial imbalance reconciled as trades and metering flow in
Streaming SQL makes this possible without building a custom trading platform from scratch. The logic lives in SQL views; the state lives in the database.
How Streaming SQL Works for Energy Data
RisingWave, a PostgreSQL-compatible streaming database, ingests trade and price streams from Kafka and maintains incrementally updated materialized views for each layer of the trading stack—position, risk, and settlement. These views are always current and queryable over Postgres connections, so existing trading dashboards and risk systems integrate without new APIs.
The key advantage over event-sourcing frameworks or custom stream processors is that the entire trading logic—aggregations, joins, conditionals—is expressed in SQL, making it auditable, versionable, and modifiable without redeploying application code.
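Because the database speaks the Postgres wire protocol, "integrating" often just means pointing an existing client at it. The rollup below is an illustrative ad-hoc query against the mtm_pnl view built in Step 2; any psql session or BI dashboard could run it against live state:
-- Illustrative desk-level rollup over the live mtm_pnl view (defined in Step 2)
SELECT
  book_id,
  SUM(net_mwh) AS net_mwh,
  SUM(unrealized_pnl) AS unrealized_pnl
FROM mtm_pnl
GROUP BY book_id
ORDER BY unrealized_pnl;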
Building the System: Step by Step
Step 1: Connect the Data Source
Ingest trade executions and market price ticks from separate Kafka topics:
-- Trade execution stream
CREATE SOURCE trade_executions (
trade_id VARCHAR,
trader_id VARCHAR,
book_id VARCHAR,
delivery_period VARCHAR,
hub VARCHAR,
direction VARCHAR, -- 'BUY' or 'SELL'
volume_mwh DOUBLE PRECISION,
price_per_mwh DOUBLE PRECISION,
trade_ts TIMESTAMPTZ
) WITH (
connector = 'kafka',
topic = 'trading.executions',
properties.bootstrap.server = 'kafka:9092',
scan.startup.mode = 'earliest' -- replay the full trade history so the position book is complete
) FORMAT PLAIN ENCODE JSON;
-- Real-time price feed
CREATE SOURCE market_prices (
hub VARCHAR,
delivery_period VARCHAR,
bid_price DOUBLE PRECISION,
ask_price DOUBLE PRECISION,
mid_price DOUBLE PRECISION,
price_ts TIMESTAMPTZ
) WITH (
connector = 'kafka',
topic = 'market.prices',
properties.bootstrap.server = 'kafka:9092',
scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;
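The introduction also calls for physical delivery data to reconcile physical vs. financial imbalance. That feed follows the same pattern; the topic name and schema below are assumptions for illustration:
-- Physical delivery / metering stream (illustrative schema and topic name)
CREATE SOURCE meter_readings (
  delivery_point VARCHAR,
  hub VARCHAR,
  delivery_period VARCHAR,
  metered_mwh DOUBLE PRECISION,
  reading_ts TIMESTAMPTZ
) WITH (
  connector = 'kafka',
  topic = 'metering.readings',
  properties.bootstrap.server = 'kafka:9092',
  scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;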
Step 2: Build Real-Time Aggregations
Maintain a live position book and mark-to-market P&L:
-- Live position per book, delivery period, and hub
CREATE MATERIALIZED VIEW open_positions AS
SELECT
book_id,
delivery_period,
hub,
SUM(CASE WHEN direction = 'BUY' THEN volume_mwh ELSE 0 END) AS long_mwh,
SUM(CASE WHEN direction = 'SELL' THEN volume_mwh ELSE 0 END) AS short_mwh,
SUM(CASE WHEN direction = 'BUY' THEN volume_mwh ELSE -volume_mwh END) AS net_mwh,
SUM(CASE WHEN direction = 'BUY' THEN -volume_mwh * price_per_mwh
ELSE volume_mwh * price_per_mwh END) AS cash_flow
FROM trade_executions
GROUP BY book_id, delivery_period, hub;
-- Latest mid price per hub/period
CREATE MATERIALIZED VIEW latest_prices AS
SELECT hub, delivery_period, mid_price, price_ts
FROM (
  SELECT
    *,
    -- keep only the most recent tick per hub and delivery period (Top-N per group)
    ROW_NUMBER() OVER (PARTITION BY hub, delivery_period ORDER BY price_ts DESC) AS rn
  FROM market_prices
) ranked
WHERE rn = 1;
-- Mark-to-market P&L
CREATE MATERIALIZED VIEW mtm_pnl AS
SELECT
p.book_id,
p.delivery_period,
p.hub,
p.net_mwh,
p.cash_flow,
lp.mid_price AS current_price,
p.net_mwh * lp.mid_price + p.cash_flow AS unrealized_pnl
FROM open_positions p
JOIN latest_prices lp ON p.hub = lp.hub AND p.delivery_period = lp.delivery_period;
Step 3: Enforce Risk Limits and Generate Alerts
Enforce risk limits and alert on exposures. Limits live in a reference table keyed by book; one minimal way to provision it (a sketch; limits could equally be ingested from an upstream risk system) is a plain table maintained over any Postgres connection:
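-- Per-book limits reference table (illustrative; could also arrive via CDC or Kafka)
CREATE TABLE risk_limits (
  book_id VARCHAR PRIMARY KEY,
  max_net_position_mwh DOUBLE PRECISION,
  max_loss_threshold DOUBLE PRECISION
);
The breach view then joins live mark-to-market P&L against these limits and keeps only the rows that violate them: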
CREATE MATERIALIZED VIEW risk_breaches AS
SELECT
m.book_id,
m.delivery_period,
m.hub,
m.net_mwh,
m.unrealized_pnl,
r.max_net_position_mwh,
r.max_loss_threshold,
CASE
WHEN ABS(m.net_mwh) > r.max_net_position_mwh THEN 'POSITION_LIMIT'
WHEN m.unrealized_pnl < -r.max_loss_threshold THEN 'LOSS_LIMIT'
ELSE NULL
END AS breach_type
FROM mtm_pnl m
JOIN risk_limits r ON m.book_id = r.book_id
WHERE
ABS(m.net_mwh) > r.max_net_position_mwh
OR m.unrealized_pnl < -r.max_loss_threshold;
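The breach view covers post-trade monitoring. For pre-trade enforcement, the order-entry path can run a point query against the same live state before accepting an order; the sketch below is illustrative, with the literal values standing in for fields of the incoming order:
-- Remaining position headroom for a candidate order (illustrative pre-trade check)
SELECT r.max_net_position_mwh - ABS(p.net_mwh) AS remaining_mwh
FROM open_positions p
JOIN risk_limits r ON r.book_id = p.book_id
WHERE p.book_id = 'BOOK_A'              -- placeholder values from the incoming order
  AND p.delivery_period = '2024-07-15-H12'
  AND p.hub = 'TTF';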
Step 4: Deliver Alerts and Positions to Downstream Systems
Push risk breach alerts and settlement-ready positions to downstream systems:
-- Risk alerts to trading risk management system
CREATE SINK risk_alert_sink
FROM risk_breaches
WITH (
connector = 'kafka',
topic = 'trading.risk.alerts',
properties.bootstrap.server = 'kafka:9092',
-- risk_breaches is an updating view, so key the sink and emit upserts;
-- downstream consumers see the current breach state and a tombstone when it clears
primary_key = 'book_id,delivery_period,hub'
) FORMAT UPSERT ENCODE JSON;
-- Settled positions to settlement engine
CREATE SINK settlement_sink
FROM open_positions
WITH (
connector = 'kafka',
topic = 'trading.settlement.positions',
properties.bootstrap.server = 'kafka:9092',
-- open_positions is an aggregation that updates in place, so emit keyed upserts rather than plain appends
primary_key = 'book_id,delivery_period,hub'
) FORMAT UPSERT ENCODE JSON;
Comparison: Batch vs Streaming
| Capability | Batch Trading Stack | Streaming SQL |
|---|---|---|
| Position update frequency | End of day or hourly | Every trade execution |
| MTM P&L freshness | Lagged by batch interval | Updated per price tick |
| Risk limit enforcement | Post-trade batch check | Continuous near-real-time |
| Settlement reconciliation | Overnight batch | Continuous, replay-capable |
| Infrastructure | Multiple ETL jobs + DBs | Single streaming database |
| Auditability | Pipeline logs | SQL view version history |
| New market onboarding | New ETL pipeline | New Kafka source + view |
FAQ
How does the system handle trade cancellations or amendments?
Trade cancellations can be modeled as offsetting rows: publish a row with the opposite direction and the same volume and price, and the aggregation nets out the net position and cash flow automatically (gross long/short totals will still show both legs). Alternatively, use a CDC source on a trade database so that updates and deletes propagate as change events and the views adjust in place.
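A minimal sketch of the CDC option, assuming the trades live in an upstream Postgres database (connection details are placeholders and the schema mirrors the Kafka source above):
-- Ingest trades via Postgres CDC so updates and deletes flow through as change events
CREATE TABLE trade_executions_cdc (
  trade_id VARCHAR PRIMARY KEY,
  trader_id VARCHAR,
  book_id VARCHAR,
  delivery_period VARCHAR,
  hub VARCHAR,
  direction VARCHAR,
  volume_mwh DOUBLE PRECISION,
  price_per_mwh DOUBLE PRECISION,
  trade_ts TIMESTAMPTZ
) WITH (
  connector = 'postgres-cdc',
  hostname = 'trade-db.internal',   -- placeholder connection details
  port = '5432',
  username = 'rw_user',
  password = '...',
  database.name = 'trading',
  schema.name = 'public',
  table.name = 'trade_executions'
);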
Can the platform handle multiple markets (power, gas, certificates) in the same system?
Yes. Add a commodity or market column to the source schema and include it in the GROUP BY clause of the position view. Risk limits can be defined per book-market combination in the reference table.
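For example, assuming a commodity column has been added to trade_executions, the position view's grouping simply gains that column (sketch):
-- Position book keyed by commodity as well (assumes a commodity column on the source)
CREATE MATERIALIZED VIEW open_positions_by_commodity AS
SELECT
  book_id,
  commodity,
  delivery_period,
  hub,
  SUM(CASE WHEN direction = 'BUY' THEN volume_mwh ELSE -volume_mwh END) AS net_mwh
FROM trade_executions
GROUP BY book_id, commodity, delivery_period, hub;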
What is the latency between a trade execution arriving in Kafka and the risk breach alert being emitted?
Latency depends on Kafka consumer lag, view complexity, and cluster sizing. In practice, with a properly sized RisingWave deployment, end-to-end latency from Kafka message to updated materialized view is typically in the low hundreds of milliseconds.
Key Takeaways
- Streaming SQL consolidates position tracking, MTM P&L, and risk monitoring into a single database with no separate batch jobs.
- Materialized views over Kafka sources maintain live state automatically—traders and risk managers always see the current book.
- PostgreSQL compatibility means existing trading dashboards connect without new APIs or middleware.
- Risk limits enforced via continuously maintained views provide near-real-time pre-trade and post-trade guardrails.
- The design extends incrementally: new markets, new books, and new risk dimensions are added as SQL view changes rather than new pipelines, while the streaming database scales horizontally underneath.