Real-Time Risk Management for Financial Services with RisingWave

Real-time risk management with RisingWave means computing credit exposure, market risk metrics, and counterparty concentrations continuously as trades execute and markets move. Materialized views that update within milliseconds replace risk batch jobs that run at fixed intervals and leave institutions blind to intraday risk buildup.

The Intraday Risk Gap

Financial institutions have long accepted a dangerous assumption: that end-of-day risk calculations are sufficient for managing intraday exposure. This assumption has been shattered repeatedly—by rogue trader incidents, by flash crashes, by counterparty defaults that moved from rumor to fact in hours.

Regulators now expect intraday risk monitoring as a baseline. Basel III's Fundamental Review of the Trading Book (FRTB), CCAR stress testing, and DFAST all push toward more granular, more frequent risk computation. But the infrastructure challenge is real: computing Value at Risk, credit exposure, and concentration limits requires joining positions with live market data, counterparty data, and limit hierarchies—a multi-dimensional join problem at scale.

RisingWave solves this with streaming SQL: define the risk computation as a materialized view, and it stays current automatically.

Ingesting Position and Market Risk Data

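-- Live position updates from trade booking system.
-- A SOURCE is append-only, so the views below aggregate every event on this topic.
-- If the topic carries full position snapshots rather than deltas, consider declaring
-- this as a CREATE TABLE with PRIMARY KEY (position_id) so later events replace earlier ones.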
CREATE SOURCE trading_positions (
    position_id         VARCHAR,
    instrument_id       VARCHAR,
    counterparty_id     VARCHAR,
    desk_id             VARCHAR,
    business_unit       VARCHAR,
    long_quantity       DECIMAL(18,6),
    short_quantity      DECIMAL(18,6),
    avg_cost            DECIMAL(18,6),
    currency            VARCHAR(3),
    asset_class         VARCHAR,    -- 'EQUITY', 'FX', 'RATES', 'CREDIT', 'COMMODITY'
    position_time       TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'trading-positions',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;

-- Risk factor updates: market prices, volatilities, credit spreads
CREATE SOURCE risk_factor_updates (
    factor_id           VARCHAR,
    factor_type         VARCHAR,    -- 'PRICE', 'VOLATILITY', 'CREDIT_SPREAD', 'FX_RATE'
    instrument_id       VARCHAR,
    value               DECIMAL(18,8),
    currency            VARCHAR(3),
    update_time         TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'risk-factors',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;

Computing Live Risk Metrics

-- Risk limits reference table
CREATE TABLE risk_limits (
    entity_id           VARCHAR,
    entity_type         VARCHAR,    -- 'DESK', 'BUSINESS_UNIT', 'COUNTERPARTY'
    limit_type          VARCHAR,    -- 'NOTIONAL', 'PNL_STOP_LOSS', 'CONCENTRATION'
    limit_value         DECIMAL(18,2),
    currency            VARCHAR(3),
    is_active           BOOLEAN,
    PRIMARY KEY (entity_id, limit_type)
);

-- Net notional exposure per desk per asset class
CREATE MATERIALIZED VIEW desk_notional_exposure AS
SELECT
    desk_id,
    asset_class,
    currency,
    SUM((long_quantity - short_quantity) * avg_cost)    AS net_notional,
    SUM(long_quantity * avg_cost)                       AS gross_long,
    SUM(short_quantity * avg_cost)                      AS gross_short,
    COUNT(DISTINCT instrument_id)                       AS instrument_count,
    COUNT(DISTINCT counterparty_id)                     AS counterparty_count,
    MAX(position_time)                                  AS as_of_time
FROM trading_positions
GROUP BY desk_id, asset_class, currency;

-- Counterparty credit exposure (gross notional by counterparty)
CREATE MATERIALIZED VIEW counterparty_exposure AS
SELECT
    counterparty_id,
    asset_class,
    SUM(CASE WHEN long_quantity > short_quantity
             THEN (long_quantity - short_quantity) * avg_cost
             ELSE 0 END)                                AS long_exposure,
    SUM(CASE WHEN short_quantity > long_quantity
             THEN (short_quantity - long_quantity) * avg_cost
             ELSE 0 END)                                AS short_exposure,
    SUM(ABS(long_quantity - short_quantity) * avg_cost) AS gross_exposure,
    COUNT(DISTINCT desk_id)                             AS desk_count,
    MAX(position_time)                                  AS as_of_time
FROM trading_positions
GROUP BY counterparty_id, asset_class;

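-- Live limit breach monitoring.
-- Only active desk-level NOTIONAL limits are matched. limit_value is assumed to be
-- expressed in the same currency as the exposure row it is compared with.
CREATE MATERIALIZED VIEW limit_breach_monitor AS
SELECT
    dne.desk_id,
    dne.asset_class,
    dne.currency,
    dne.net_notional,
    rl.limit_value AS notional_limit,
    -- Ratio of absolute exposure to limit (1.0 means the limit is fully used)
    ABS(dne.net_notional) / NULLIF(rl.limit_value, 0) AS utilization_pct,
    CASE WHEN ABS(dne.net_notional) > rl.limit_value THEN true
         ELSE false END AS is_breached,
    dne.as_of_time
FROM desk_notional_exposure dne
JOIN risk_limits rl
    ON dne.desk_id = rl.entity_id
    AND rl.entity_type = 'DESK'
    AND rl.limit_type = 'NOTIONAL'
    AND rl.is_active = true;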

Concentration Risk Detection

-- Concentration risk: counterparties whose gross exposure exceeds 10% of the asset-class total
CREATE MATERIALIZED VIEW concentration_risk AS
WITH total_exposure AS (
    SELECT
        asset_class,
        SUM(gross_exposure) AS total_gross
    FROM counterparty_exposure
    GROUP BY asset_class
)
SELECT
    ce.counterparty_id,
    ce.asset_class,
    ce.gross_exposure,
    te.total_gross,
    ce.gross_exposure / NULLIF(te.total_gross, 0)   AS concentration_pct,
    CASE WHEN ce.gross_exposure / NULLIF(te.total_gross, 0) > 0.10
         THEN true ELSE false END                   AS exceeds_10pct_threshold
FROM counterparty_exposure ce
JOIN total_exposure te ON ce.asset_class = te.asset_class;

-- Sink limit breaches to risk operations
CREATE SINK risk_alerts_feed AS
SELECT
    desk_id     AS entity_id,
    'DESK'      AS entity_type,
    asset_class,
    net_notional AS current_exposure,
    notional_limit,
    utilization_pct,
    'LIMIT_BREACH' AS alert_type,
    as_of_time
FROM limit_breach_monitor
WHERE is_breached = true
WITH (
    connector = 'kafka',
    properties.bootstrap.server = 'kafka:9092',
    topic = 'risk-alerts'
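) FORMAT PLAIN ENCODE JSON (
    -- limit_breach_monitor emits updates and deletes, so an append-only
    -- PLAIN sink has to opt in explicitly
    force_append_only = 'true'
);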
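
The sink above publishes limit breaches only. Concentration alerts could be published with a second sink that follows the same pattern. The following is a sketch under assumptions: the sink name and the topic `risk-concentration-alerts` are hypothetical, the broker address is reused from the examples above, and `force_append_only` is set because `concentration_risk` is an updating view.

```sql
-- Sketch: publish counterparties above the 10% concentration threshold.
-- Sink name and topic are illustrative assumptions, not part of the original setup.
CREATE SINK concentration_alerts_feed AS
SELECT
    counterparty_id     AS entity_id,
    'COUNTERPARTY'      AS entity_type,
    asset_class,
    gross_exposure      AS current_exposure,
    total_gross,
    concentration_pct,
    'CONCENTRATION'     AS alert_type
FROM concentration_risk
WHERE exceeds_10pct_threshold = true
WITH (
    connector = 'kafka',
    properties.bootstrap.server = 'kafka:9092',
    topic = 'risk-concentration-alerts'
) FORMAT PLAIN ENCODE JSON (
    force_append_only = 'true'
);
```

With an append-only sink, consumers see a message when a counterparty crosses the threshold but none when it drops back below, so downstream alert state needs its own expiry logic.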

Intraday P&L Stop-Loss Monitoring

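-- Rolling intraday P&L vs stop-loss limits.
-- Positions are marked against the latest PRICE per instrument only; joining the
-- raw append-only factor stream would add every historical tick into the SUM.
CREATE MATERIALIZED VIEW intraday_pnl_monitor AS
WITH latest_price AS (
    SELECT instrument_id, value
    FROM (
        SELECT
            instrument_id,
            value,
            ROW_NUMBER() OVER (PARTITION BY instrument_id
                               ORDER BY update_time DESC) AS rn
        FROM risk_factor_updates
        WHERE factor_type = 'PRICE'
    ) ranked
    WHERE rn = 1
)
SELECT
    tp.desk_id,
    SUM((lp.value - tp.avg_cost) *
        (tp.long_quantity - tp.short_quantity))         AS unrealized_pnl,
    rl.limit_value                                      AS stop_loss_limit,
    CASE WHEN SUM((lp.value - tp.avg_cost) *
                  (tp.long_quantity - tp.short_quantity))
              < -ABS(rl.limit_value) THEN true
         ELSE false END                                 AS stop_loss_breached
FROM trading_positions tp
JOIN latest_price lp
    ON tp.instrument_id = lp.instrument_id
LEFT JOIN risk_limits rl
    ON tp.desk_id = rl.entity_id
    AND rl.entity_type = 'DESK'
    AND rl.limit_type = 'PNL_STOP_LOSS'
    AND rl.is_active = true
GROUP BY tp.desk_id, rl.limit_value;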

Comparison: Batch vs. Real-Time Risk Management

| Dimension | Batch Risk (EOD/Hourly) | Streaming with RisingWave |
| --- | --- | --- |
| Risk visibility lag | 1 hour to end-of-day | Milliseconds |
| Limit breach response | After the fact | Real-time alert + automated control |
| Intraday position tracking | Snapshot at batch time | Continuous |
| Concentration risk detection | Daily | Per trade |
| Regulatory reporting readiness | Next-day | Intraday |
| Infrastructure complexity | High (batch schedulers, jobs) | Lower (streaming SQL) |

FAQ

Q: Can RisingWave compute Value at Risk (VaR) in real time?
A: Historical simulation VaR requires a look-back window of P&L scenarios (typically 250 trading days). You can store historical risk factor changes in a table, join them with current positions in a materialized view, and compute percentile-based VaR using window functions. This is computationally intensive but feasible for desk-level VaR at moderate position counts.
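
As a rough illustration of that approach, the sketch below keeps the scenario P&L in a materialized view and takes the percentile in an ad hoc query. The table `historical_price_changes`, its columns, and the view name are hypothetical, and the example assumes one-day absolute price changes per instrument. Whether the percentile step can also live inside a materialized view depends on what your RisingWave version supports in streaming mode, so it is shown here as a plain query.

```sql
-- Hypothetical table: historical one-day price changes per instrument.
CREATE TABLE historical_price_changes (
    scenario_date   DATE,
    instrument_id   VARCHAR,
    price_change    DECIMAL(18,8),   -- absolute one-day change in price
    PRIMARY KEY (scenario_date, instrument_id)
);

-- One row per desk and historical scenario:
-- the P&L the current book would show if that day's move repeated.
CREATE MATERIALIZED VIEW desk_scenario_pnl AS
SELECT
    tp.desk_id,
    h.scenario_date,
    SUM((tp.long_quantity - tp.short_quantity) * h.price_change) AS scenario_pnl
FROM trading_positions tp
JOIN historical_price_changes h
    ON tp.instrument_id = h.instrument_id
GROUP BY tp.desk_id, h.scenario_date;

-- Ad hoc query: 99% one-day VaR is the loss at the 1st percentile of scenario P&L.
SELECT
    desk_id,
    -PERCENTILE_CONT(0.01) WITHIN GROUP (
        ORDER BY scenario_pnl::DOUBLE PRECISION
    ) AS var_99
FROM desk_scenario_pnl
GROUP BY desk_id;
```

The view grows with positions times scenarios, which is why the answer above limits the approach to desk-level VaR at moderate position counts.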

Q: How do we integrate with existing risk systems like Murex or Calypso?
A: Most risk systems can emit position and trade events to Kafka. RisingWave ingests these events as sources. Computed risk metrics can be sunk back to Kafka for consumption by Murex/Calypso reporting modules, or queried directly via the PostgreSQL interface by custom risk dashboards.
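
For the direct-query path, a dashboard can issue ordinary SELECT statements against the materialized views defined earlier. A minimal example, assuming a client connected over the PostgreSQL protocol and using only columns from `limit_breach_monitor`:

```sql
-- Ad hoc query from psql or any PostgreSQL-compatible client:
-- desks ranked by how much of their notional limit is currently used.
SELECT
    desk_id,
    asset_class,
    net_notional,
    notional_limit,
    utilization_pct,
    is_breached
FROM limit_breach_monitor
ORDER BY ABS(utilization_pct) DESC
LIMIT 20;
```

Because the view is maintained incrementally, this query reads precomputed rows rather than re-aggregating positions on each request.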

Q: What happens if a risk factor feed goes down during trading hours?
A: RisingWave's materialized views continue serving the last-computed values. Implement a staleness check by monitoring MAX(update_time) in risk factor views, and alert when a feed has been stale for longer than a threshold. For critical production systems, run redundant risk factor feeds from multiple providers.
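
One way to sketch that staleness check is a small freshness view plus a polling query. The view name and the 30-second threshold are illustrative assumptions; pick a threshold that matches each feed's expected update rate.

```sql
-- Tracks the most recent update seen for each factor type.
CREATE MATERIALIZED VIEW risk_factor_freshness AS
SELECT
    factor_type,
    MAX(update_time) AS last_update_time,
    COUNT(*)         AS updates_received
FROM risk_factor_updates
GROUP BY factor_type;

-- Ad hoc check, e.g. polled by a monitoring job:
-- factor types with no update in the last 30 seconds.
SELECT factor_type, last_update_time
FROM risk_factor_freshness
WHERE last_update_time < NOW() - INTERVAL '30 seconds';
```

A factor type that has never sent an update will not appear in the view at all, so the monitoring job should also compare the result against the list of feeds it expects.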

Q: How do we handle cross-currency exposure netting?
A: Maintain an FX rates table updated via CDC or periodic upsert. Add currency conversion logic in your desk_notional_exposure view to normalize all exposures to a base currency (e.g., USD) before aggregating.
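
A minimal sketch of that conversion, written as a separate view so the original `desk_notional_exposure` stays unchanged. The `fx_rates` table, its columns, and the view name are hypothetical, and rates are assumed to be quoted as units of USD per unit of the position currency.

```sql
-- Hypothetical FX table: one current rate per currency, upserted by key.
CREATE TABLE fx_rates (
    currency        VARCHAR(3) PRIMARY KEY,
    rate_to_usd     DECIMAL(18,8),   -- USD per 1 unit of currency
    updated_at      TIMESTAMPTZ
);

-- Net notional per desk and asset class, normalized to USD before aggregating.
CREATE MATERIALIZED VIEW desk_notional_exposure_usd AS
SELECT
    tp.desk_id,
    tp.asset_class,
    SUM((tp.long_quantity - tp.short_quantity) * tp.avg_cost * fx.rate_to_usd)
        AS net_notional_usd,
    MAX(tp.position_time) AS as_of_time
FROM trading_positions tp
JOIN fx_rates fx
    ON tp.currency = fx.currency
GROUP BY tp.desk_id, tp.asset_class;
```

The inner join silently drops positions whose currency has no rate, so `fx_rates` should include a row for the base currency itself (rate 1) and every traded currency.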

Q: Is RisingWave suitable for real-time stress testing?
A: Yes. Stress scenarios are essentially factor shocks applied to current positions. Store shock scenarios in a table, cross-join with current positions in a materialized view, and compute stressed P&L. The view updates as positions change; rerun stress scenarios by updating the scenario table.
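
The sketch below shows one simplified form of this. It assumes each scenario defines a percentage shock per asset class, so positions are matched to shocks with an equi-join on `asset_class` rather than a full cross join, and it uses `avg_cost` as a stand-in for current market value. The table, its columns, and the view name are hypothetical.

```sql
-- Hypothetical scenario table: one shock per scenario and asset class.
CREATE TABLE stress_scenarios (
    scenario_id     VARCHAR,
    asset_class     VARCHAR,
    shock_pct       DECIMAL(10,6),   -- e.g. -0.20 for a 20% price fall
    PRIMARY KEY (scenario_id, asset_class)
);

-- Stressed P&L per desk and scenario.
-- Recomputed incrementally when positions or scenario rows change.
CREATE MATERIALIZED VIEW desk_stressed_pnl AS
SELECT
    tp.desk_id,
    ss.scenario_id,
    SUM((tp.long_quantity - tp.short_quantity) * tp.avg_cost * ss.shock_pct)
        AS stressed_pnl
FROM trading_positions tp
JOIN stress_scenarios ss
    ON tp.asset_class = ss.asset_class
GROUP BY tp.desk_id, ss.scenario_id;
```

Inserting or updating rows in `stress_scenarios` is then enough to add or rerun a scenario; no job needs to be rescheduled.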

