Real-Time P&L Calculation for Trading Desks with Streaming SQL

Real-Time P&L Calculation for Trading Desks with Streaming SQL

Real-time P&L calculation means continuously computing profit and loss as trades execute and market prices move, rather than running batch jobs at end-of-day. With RisingWave's streaming SQL, trading desks can maintain live materialized views that aggregate positions, apply current mark-to-market prices, and surface unrealized gains or losses within milliseconds of a price tick.

Why Batch P&L Is No Longer Acceptable

Traditional trading desk infrastructure runs P&L calculations in batch windows—sometimes hourly, sometimes only at end-of-day. By the time a risk manager sees a number, positions have shifted, prices have moved, and the calculation is already stale. In volatile markets, stale P&L is not just inconvenient—it is a risk management failure.

The core challenge is that P&L is a join problem: you must continuously join live position data with streaming market prices, apply cost-basis adjustments, and roll up results across hierarchies (trader → desk → book → portfolio). Doing that in a relational database with repeated queries is expensive and slow. Streaming SQL solves it differently—by keeping the join result incrementally maintained as a materialized view.

Architecture Overview

A streaming P&L system on RisingWave looks like this:

  1. Trade feed — ingested from Kafka (FIX messages, exchange adapters)
  2. Market data feed — ingested from Kafka (price ticks from Bloomberg, Refinitiv, exchange feeds)
  3. Reference data — instrument master, desk hierarchy loaded as tables
  4. Materialized views — incrementally maintained P&L by position, desk, and portfolio
-- Ingest trade executions from Kafka
CREATE SOURCE trade_executions (
    trade_id        VARCHAR,
    instrument_id   VARCHAR,
    desk_id         VARCHAR,
    trader_id       VARCHAR,
    side            VARCHAR,       -- 'BUY' or 'SELL'
    quantity        DECIMAL(18,6),
    exec_price      DECIMAL(18,6),
    exec_time       TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'trade-executions',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;

-- Ingest real-time market price ticks
CREATE SOURCE market_prices (
    instrument_id   VARCHAR,
    bid             DECIMAL(18,6),
    ask             DECIMAL(18,6),
    mid_price       DECIMAL(18,6),
    price_time      TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'market-prices',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;

Computing Live Net Positions

Before calculating P&L, you need net positions per instrument per desk. A materialized view handles this incrementally:

-- Net position and average cost per instrument per desk
CREATE MATERIALIZED VIEW net_positions AS
SELECT
    instrument_id,
    desk_id,
    SUM(CASE WHEN side = 'BUY'  THEN  quantity
             WHEN side = 'SELL' THEN -quantity
             ELSE 0 END)                         AS net_quantity,
    SUM(CASE WHEN side = 'BUY'  THEN  quantity * exec_price
             WHEN side = 'SELL' THEN -quantity * exec_price
             ELSE 0 END)                         AS net_cost_basis,
    COUNT(*) FILTER (WHERE side = 'BUY')         AS buy_trade_count,
    COUNT(*) FILTER (WHERE side = 'SELL')        AS sell_trade_count,
    MAX(exec_time)                               AS last_trade_time
FROM trade_executions
GROUP BY instrument_id, desk_id;

-- Mark-to-market P&L joining positions with latest prices
CREATE MATERIALIZED VIEW desk_pnl AS
SELECT
    np.desk_id,
    np.instrument_id,
    np.net_quantity,
    np.net_cost_basis,
    mp.mid_price                                                AS current_price,
    np.net_quantity * mp.mid_price                             AS market_value,
    (np.net_quantity * mp.mid_price) - np.net_cost_basis       AS unrealized_pnl,
    np.last_trade_time,
    mp.price_time                                              AS price_as_of
FROM net_positions np
JOIN market_prices mp
    ON np.instrument_id = mp.instrument_id;

Every time a new price tick arrives for an instrument, RisingWave recomputes only the affected rows in desk_pnl—not the entire table. This is the core efficiency gain over batch queries.

Rolling Up P&L Across the Desk Hierarchy

-- Aggregate unrealized P&L by desk for risk dashboards
CREATE MATERIALIZED VIEW desk_pnl_summary AS
SELECT
    desk_id,
    COUNT(DISTINCT instrument_id)          AS open_positions,
    SUM(market_value)                      AS total_market_value,
    SUM(unrealized_pnl)                    AS total_unrealized_pnl,
    SUM(unrealized_pnl) FILTER
        (WHERE unrealized_pnl < 0)         AS total_losing_pnl,
    SUM(unrealized_pnl) FILTER
        (WHERE unrealized_pnl > 0)         AS total_winning_pnl,
    MAX(price_as_of)                       AS data_freshness
FROM desk_pnl
GROUP BY desk_id;

Results are served directly via RisingWave's PostgreSQL-compatible wire protocol—risk dashboards can query desk_pnl_summary using any Postgres client with sub-millisecond latency because the answer is pre-computed.

Comparison: Batch vs. Streaming P&L

DimensionBatch (Nightly/Hourly)Streaming with RisingWave
Data freshnessMinutes to hoursMilliseconds
InfrastructureETL jobs + data warehouseStreaming SQL + materialized views
Query latencySeconds to minutesSub-millisecond (pre-computed)
Risk during market hoursHigh (stale data)Low (always current)
Operational complexityHigh (job scheduling, failures)Low (always-on views)
ScalabilityHorizontal scale costlyKafka partitions + RW workers

Operational Considerations

Backfilling historical positions. When RisingWave starts, the Kafka source can replay from the earliest offset, rebuilding net positions from trade history before the view goes live.

Late-arriving data. Exchange confirmations sometimes arrive seconds or minutes after execution. RisingWave's append-only sources handle late events by reprocessing them into materialized views as they arrive.

Price staleness detection. The data_freshness column in desk_pnl_summary lets monitoring systems alert when price feeds go stale—a simple WHERE data_freshness < NOW() - INTERVAL '30 seconds' query surfaces lagging feeds.

FAQ

Q: Can RisingWave handle the throughput of high-frequency market data feeds? A: Yes. RisingWave ingests from Kafka, which decouples ingestion throughput from processing. Market price topics with thousands of ticks per second are processed by multiple workers, and only affected materialized view rows are updated. For extremely high tick rates, you can pre-aggregate prices to best-bid/best-ask in a Kafka Streams layer before ingesting.

Q: How do we handle corporate actions (splits, dividends) that change cost basis? A: Model corporate actions as adjustment records in the trade_executions source with a special side value (e.g., 'ADJUST'), and update the CASE expressions in net_positions to apply them. Reference data tables can store corporate action factors joined at query time.

Q: Is the PostgreSQL-compatible interface production-grade for trading applications? A: RisingWave exposes a full PostgreSQL wire protocol. Any Postgres client library—psycopg2, JDBC, node-postgres—connects without modification. For production trading dashboards, read-replicas can be fronted with a connection pool like PgBouncer.

Q: What happens if a Kafka broker goes down during market hours? A: RisingWave checkpoints its processing state. When the broker recovers, processing resumes exactly from the last committed offset. No trades are double-counted and no P&L updates are lost.

Q: Can we sink P&L results back to Kafka for downstream consumers? A: Yes. Use CREATE SINK ... WITH (connector = 'kafka') on any materialized view. Risk systems, compliance monitors, and client-facing APIs can all consume the same P&L stream.


Get Started

Best-in-Class Event Streaming
for Agents, Apps, and Analytics
GitHubXLinkedInSlackYouTube
Sign up for our to stay updated.