Real-time risk management with RisingWave means computing credit exposure, market risk metrics, and counterparty concentrations continuously as trades execute and markets move. Materialized views that update within milliseconds replace risk batch jobs that run at fixed intervals and leave institutions blind to intraday risk buildup.
The Intraday Risk Gap
Financial institutions have long accepted a dangerous assumption: that end-of-day risk calculations are sufficient for managing intraday exposure. This assumption has been shattered repeatedly—by rogue trader incidents, by flash crashes, by counterparty defaults that moved from rumor to fact in hours.
Regulators now expect intraday risk monitoring as a baseline. Basel III's Fundamental Review of the Trading Book (FRTB), CCAR stress testing, and DFAST all push toward more granular, more frequent risk computation. But the infrastructure challenge is real: computing Value at Risk, credit exposure, and concentration limits requires joining positions with live market data, counterparty data, and limit hierarchies—a multi-dimensional join problem at scale.
RisingWave solves this with streaming SQL: define the risk computation as a materialized view, and it stays current automatically.
Ingesting Position and Market Risk Data
-- Live position updates from trade booking system
CREATE SOURCE trading_positions (
    position_id VARCHAR,
    instrument_id VARCHAR,
    counterparty_id VARCHAR,
    desk_id VARCHAR,
    business_unit VARCHAR,
    long_quantity DECIMAL(18,6),
    short_quantity DECIMAL(18,6),
    avg_cost DECIMAL(18,6),
    currency VARCHAR(3),
    asset_class VARCHAR, -- 'EQUITY', 'FX', 'RATES', 'CREDIT', 'COMMODITY'
    position_time TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'trading-positions',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;

-- Risk factor updates: market prices, volatilities, credit spreads
CREATE SOURCE risk_factor_updates (
    factor_id VARCHAR,
    factor_type VARCHAR, -- 'PRICE', 'VOLATILITY', 'CREDIT_SPREAD', 'FX_RATE'
    instrument_id VARCHAR,
    value DECIMAL(18,8),
    currency VARCHAR(3),
    update_time TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'risk-factors',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;
Computing Live Risk Metrics
-- Risk limits reference table
CREATE TABLE risk_limits (
    entity_id VARCHAR,
    entity_type VARCHAR, -- 'DESK', 'BUSINESS_UNIT', 'COUNTERPARTY'
    limit_type VARCHAR, -- 'NOTIONAL', 'PNL_STOP_LOSS', 'CONCENTRATION'
    limit_value DECIMAL(18,2),
    currency VARCHAR(3),
    is_active BOOLEAN,
    PRIMARY KEY (entity_id, limit_type)
);

-- Net notional exposure per desk, asset class, and currency
CREATE MATERIALIZED VIEW desk_notional_exposure AS
SELECT
    desk_id,
    asset_class,
    currency,
    SUM((long_quantity - short_quantity) * avg_cost) AS net_notional,
    SUM(long_quantity * avg_cost) AS gross_long,
    SUM(short_quantity * avg_cost) AS gross_short,
    COUNT(DISTINCT instrument_id) AS instrument_count,
    COUNT(DISTINCT counterparty_id) AS counterparty_count,
    MAX(position_time) AS as_of_time
FROM trading_positions
GROUP BY desk_id, asset_class, currency;

-- Counterparty credit exposure (gross notional by counterparty)
CREATE MATERIALIZED VIEW counterparty_exposure AS
SELECT
    counterparty_id,
    asset_class,
    SUM(CASE WHEN long_quantity > short_quantity
             THEN (long_quantity - short_quantity) * avg_cost
             ELSE 0 END) AS long_exposure,
    SUM(CASE WHEN short_quantity > long_quantity
             THEN (short_quantity - long_quantity) * avg_cost
             ELSE 0 END) AS short_exposure,
    SUM(ABS(long_quantity - short_quantity) * avg_cost) AS gross_exposure,
    COUNT(DISTINCT desk_id) AS desk_count,
    MAX(position_time) AS as_of_time
FROM trading_positions
GROUP BY counterparty_id, asset_class;
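-- Live limit breach monitoring
-- Note: each (asset_class, currency) row is compared against the desk-level limit
CREATE MATERIALIZED VIEW limit_breach_monitor AS
SELECT
    dne.desk_id,
    dne.asset_class,
    dne.net_notional,
    rl.limit_value AS notional_limit,
    -- Utilization is a ratio (1.0 = 100% of the limit) on absolute exposure,
    -- so a net short desk is not reported with negative utilization
    ABS(dne.net_notional) / NULLIF(rl.limit_value, 0) AS utilization_pct,
    CASE WHEN ABS(dne.net_notional) > rl.limit_value THEN true
         ELSE false END AS is_breached,
    dne.as_of_time
FROM desk_notional_exposure dne
JOIN risk_limits rl
    ON dne.desk_id = rl.entity_id
    AND rl.entity_type = 'DESK'
    AND rl.limit_type = 'NOTIONAL'
    AND rl.is_active = true;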
Concentration Risk Detection
-- Concentration risk: counterparties whose gross exposure exceeds 10% of the asset-class total
CREATE MATERIALIZED VIEW concentration_risk AS
WITH total_exposure AS (
    SELECT
        asset_class,
        SUM(gross_exposure) AS total_gross
    FROM counterparty_exposure
    GROUP BY asset_class
)
SELECT
    ce.counterparty_id,
    ce.asset_class,
    ce.gross_exposure,
    te.total_gross,
    ce.gross_exposure / NULLIF(te.total_gross, 0) AS concentration_pct,
    CASE WHEN ce.gross_exposure / NULLIF(te.total_gross, 0) > 0.10
         THEN true ELSE false END AS exceeds_10pct_threshold
FROM counterparty_exposure ce
JOIN total_exposure te ON ce.asset_class = te.asset_class;
-- Sink limit breaches to risk operations
-- limit_breach_monitor emits updates and retractions, so this assumes
-- force_append_only is needed for a PLAIN-format sink to accept the stream
CREATE SINK risk_alerts_feed AS
SELECT
    desk_id AS entity_id,
    'DESK' AS entity_type,
    asset_class,
    net_notional AS current_exposure,
    notional_limit,
    utilization_pct,
    'LIMIT_BREACH' AS alert_type,
    as_of_time
FROM limit_breach_monitor
WHERE is_breached = true
WITH (
    connector = 'kafka',
    properties.bootstrap.server = 'kafka:9092',
    topic = 'risk-alerts'
) FORMAT PLAIN ENCODE JSON (
    force_append_only = 'true'
);
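The sink above forwards only limit breaches. A second sink along the same lines could forward concentration alerts from concentration_risk. This is a sketch under assumptions: the sink name concentration_alerts_feed and the reuse of the risk-alerts topic are illustrative, and force_append_only is set on the assumption that consumers want an append-only alert stream (depending on the RisingWave version, this option may belong in the WITH clause instead).

```sql
-- Hypothetical second sink: forward concentration alerts to the alert topic
CREATE SINK concentration_alerts_feed AS
SELECT
    counterparty_id AS entity_id,
    'COUNTERPARTY'  AS entity_type,
    asset_class,
    gross_exposure  AS current_exposure,
    concentration_pct,
    'CONCENTRATION' AS alert_type
FROM concentration_risk
WHERE exceeds_10pct_threshold = true
WITH (
    connector = 'kafka',
    properties.bootstrap.server = 'kafka:9092',
    topic = 'risk-alerts'
) FORMAT PLAIN ENCODE JSON (
    force_append_only = 'true'   -- assumption: drop retractions, emit inserts only
);
```

Consumers can distinguish the two feeds by the alert_type field.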
Intraday P&L Stop-Loss Monitoring
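-- Intraday unrealized P&L vs stop-loss limits
CREATE MATERIALIZED VIEW intraday_pnl_monitor AS
WITH latest_price AS (
    -- Keep only the most recent PRICE per instrument; joining the raw stream
    -- would match every historical tick and inflate the sums
    SELECT instrument_id, value
    FROM (
        SELECT
            instrument_id,
            value,
            ROW_NUMBER() OVER (
                PARTITION BY instrument_id
                ORDER BY update_time DESC
            ) AS rn
        FROM risk_factor_updates
        WHERE factor_type = 'PRICE'
    ) ranked
    WHERE rn = 1
)
SELECT
    tp.desk_id,
    SUM((lp.value - tp.avg_cost) *
        (tp.long_quantity - tp.short_quantity)) AS unrealized_pnl,
    rl.limit_value AS stop_loss_limit,
    CASE WHEN SUM((lp.value - tp.avg_cost) *
                  (tp.long_quantity - tp.short_quantity))
              < -ABS(rl.limit_value) THEN true
         ELSE false END AS stop_loss_breached
FROM trading_positions tp
JOIN latest_price lp
    ON tp.instrument_id = lp.instrument_id
LEFT JOIN risk_limits rl
    ON tp.desk_id = rl.entity_id
    AND rl.limit_type = 'PNL_STOP_LOSS'
    AND rl.is_active = true
GROUP BY tp.desk_id, rl.limit_value;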
Comparison: Batch vs. Real-Time Risk Management
| Dimension | Batch Risk (EOD/Hourly) | Streaming with RisingWave |
| --- | --- | --- |
| Risk visibility lag | 1 hour to end-of-day | Milliseconds |
| Limit breach response | After the fact | Real-time alert + automated control |
| Intraday position tracking | Snapshot at batch time | Continuous |
| Concentration risk detection | Daily | Per trade |
| Regulatory reporting readiness | Next-day | Intraday |
| Infrastructure complexity | High (batch schedulers, jobs) | Lower (streaming SQL) |
FAQ
Q: Can RisingWave compute Value at Risk (VaR) in real time?
A: Historical simulation VaR requires a look-back window of P&L scenarios (typically around 250 trading days, roughly one year). You can store historical risk factor changes in a table created with CREATE TABLE, join them with current positions in a materialized view to produce one P&L figure per scenario, and compute a percentile-based VaR over those scenario P&Ls. This is computationally intensive but feasible for desk-level VaR at moderate position counts.
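A minimal sketch of that approach, under stated assumptions: historical_factor_returns and desk_scenario_pnl are hypothetical names, avg_cost stands in for current market value (as in the exposure views above), and trading_positions is assumed to carry one current row per position. The percentile is taken in an ad hoc query, on the assumption that percentile_cont is available for batch queries in your RisingWave version.

```sql
-- Hypothetical table of historical one-day returns per instrument
CREATE TABLE historical_factor_returns (
    instrument_id VARCHAR,
    scenario_date DATE,
    price_return  DECIMAL,   -- e.g. -0.0123 for a 1.23% one-day fall
    PRIMARY KEY (instrument_id, scenario_date)
);

-- Scenario P&L per desk: apply each historical return to the current net position
CREATE MATERIALIZED VIEW desk_scenario_pnl AS
SELECT
    tp.desk_id,
    hr.scenario_date,
    SUM((tp.long_quantity - tp.short_quantity) * tp.avg_cost * hr.price_return)
        AS scenario_pnl
FROM trading_positions tp
JOIN historical_factor_returns hr
    ON tp.instrument_id = hr.instrument_id
GROUP BY tp.desk_id, hr.scenario_date;

-- Ad hoc query: 99% one-day VaR is the loss at the 1st percentile of scenario P&L
SELECT
    desk_id,
    -1 * (percentile_cont(0.01) WITHIN GROUP (
              ORDER BY scenario_pnl::DOUBLE PRECISION)) AS var_99
FROM desk_scenario_pnl
GROUP BY desk_id;
```

Keeping the percentile in a query rather than in the view means the expensive join is maintained incrementally while the ranking step runs only when a VaR figure is requested.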
Q: How do we integrate with existing risk systems like Murex or Calypso?
A: Most risk systems can emit position and trade events to Kafka. RisingWave ingests these events as sources. Computed risk metrics can be sunk back to Kafka for consumption by Murex/Calypso reporting modules, or queried directly via the PostgreSQL interface by custom risk dashboards.
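For the direct-query path, a dashboard could poll a materialized view with an ordinary SELECT from any PostgreSQL-compatible client. The query below is only an illustration against the limit_breach_monitor view defined earlier; the ordering and the row limit are assumptions.

```sql
-- Ad hoc query a dashboard might issue: current breaches, worst first
SELECT
    desk_id,
    asset_class,
    net_notional,
    notional_limit,
    utilization_pct,
    as_of_time
FROM limit_breach_monitor
WHERE is_breached = true
ORDER BY utilization_pct DESC
LIMIT 20;
```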
Q: What happens if a risk factor feed goes down during trading hours?
A: RisingWave's materialized views continue serving the last-computed values. Implement a staleness check by monitoring MAX(update_time) in risk factor views—alert when feeds are stale beyond a threshold. For critical production systems, run redundant risk factor feeds from multiple providers.
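One way such a staleness check might look is sketched below. The view name risk_factor_freshness and the 30-second threshold are assumptions for illustration.

```sql
-- Hypothetical view: most recent update seen per factor type
CREATE MATERIALIZED VIEW risk_factor_freshness AS
SELECT
    factor_type,
    MAX(update_time) AS last_update_time,
    COUNT(*)         AS updates_seen
FROM risk_factor_updates
GROUP BY factor_type;

-- Polled by a monitoring job: factor types with no update in the last 30 seconds
SELECT factor_type, last_update_time
FROM risk_factor_freshness
WHERE last_update_time < NOW() - INTERVAL '30 seconds';
```

The comparison against NOW() sits in the polling query because the view only changes when new events arrive, and a stalled feed produces no events to trigger an update.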
Q: How do we handle cross-currency exposure netting?
A: Maintain an FX rates table updated via CDC or periodic upsert. Add currency conversion logic in your desk_notional_exposure view to normalize all exposures to a base currency (e.g., USD) before aggregating.
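A sketch of that normalization follows, built on top of desk_notional_exposure rather than modifying it. The table fx_rates, its columns, and the view name desk_notional_exposure_usd are hypothetical, and USD is assumed as the base currency.

```sql
-- Hypothetical FX reference table: one row per currency, rate into USD
CREATE TABLE fx_rates (
    currency    VARCHAR PRIMARY KEY,
    rate_to_usd DECIMAL,       -- units of USD per 1 unit of currency
    updated_at  TIMESTAMPTZ
);

-- Exposure per desk and asset class, converted to USD before aggregating
CREATE MATERIALIZED VIEW desk_notional_exposure_usd AS
SELECT
    dne.desk_id,
    dne.asset_class,
    SUM(dne.net_notional * fx.rate_to_usd) AS net_notional_usd,
    SUM(dne.gross_long   * fx.rate_to_usd) AS gross_long_usd,
    SUM(dne.gross_short  * fx.rate_to_usd) AS gross_short_usd,
    MAX(dne.as_of_time)                    AS as_of_time
FROM desk_notional_exposure dne
JOIN fx_rates fx
    ON dne.currency = fx.currency
GROUP BY dne.desk_id, dne.asset_class;
```

Because this is an inner join, exposure in a currency with no row in fx_rates drops out of the totals, so the rates table needs a row for every traded currency, including the base currency at a rate of 1.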
Q: Is RisingWave suitable for real-time stress testing?
A: Yes. Stress scenarios are essentially factor shocks applied to current positions. Store shock scenarios in a table created with CREATE TABLE, join them with current positions in a materialized view on the shocked dimension (for example, asset class or instrument), and compute stressed P&L. The view updates as positions change; to run different scenarios, update the scenario table.
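As a rough sketch, the scenario table and stressed P&L view could look like this. The names stress_scenarios and desk_stressed_pnl are hypothetical, shocks are modeled as a single relative price move per asset class, and exposures in different currencies are summed without conversion for brevity.

```sql
-- Hypothetical scenario table: one relative shock per scenario and asset class
CREATE TABLE stress_scenarios (
    scenario_id VARCHAR,
    asset_class VARCHAR,
    price_shock DECIMAL,       -- relative move, e.g. -0.20 for a 20% fall
    PRIMARY KEY (scenario_id, asset_class)
);

-- Stressed P&L per scenario and desk: net notional times the shock
CREATE MATERIALIZED VIEW desk_stressed_pnl AS
SELECT
    ss.scenario_id,
    dne.desk_id,
    SUM(dne.net_notional * ss.price_shock) AS stressed_pnl
FROM desk_notional_exposure dne
JOIN stress_scenarios ss
    ON dne.asset_class = ss.asset_class
GROUP BY ss.scenario_id, dne.desk_id;
```

Matching on asset_class applies every scenario to each desk's exposure in the shocked asset class; inserting or updating rows in stress_scenarios changes the results without redefining the view.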
Get Started
- Deploy your first real-time risk management pipeline with the RisingWave documentation.
- Join the RisingWave Slack to discuss risk system architectures with the community.

