Real-time P&L calculation means continuously computing profit and loss as trades execute and market prices move, rather than running batch jobs at end-of-day. With RisingWave's streaming SQL, trading desks can maintain live materialized views that aggregate positions, apply current mark-to-market prices, and surface unrealized gains or losses within milliseconds of a price tick.
Why Batch P&L Is No Longer Acceptable
Traditional trading desk infrastructure runs P&L calculations in batch windows—sometimes hourly, sometimes only at end-of-day. By the time a risk manager sees a number, positions have shifted, prices have moved, and the calculation is already stale. In volatile markets, stale P&L is not just inconvenient—it is a risk management failure.
The core challenge is that P&L is a join problem: you must continuously join live position data with streaming market prices, apply cost-basis adjustments, and roll up results across hierarchies (trader → desk → book → portfolio). Doing that in a relational database with repeated queries is expensive and slow. Streaming SQL solves it differently—by keeping the join result incrementally maintained as a materialized view.
Architecture Overview
A streaming P&L system on RisingWave looks like this:
- Trade feed — ingested from Kafka (FIX messages, exchange adapters)
- Market data feed — ingested from Kafka (price ticks from Bloomberg, Refinitiv, exchange feeds)
- Reference data — instrument master, desk hierarchy loaded as tables
- Materialized views — incrementally maintained P&L by position, desk, and portfolio
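Reference data can live in ordinary RisingWave tables and be joined against the streaming views. A minimal sketch (the table and column names here are illustrative, not a prescribed schema):

```sql
-- Instrument master: static attributes, joined at query time
CREATE TABLE instrument_master (
    instrument_id VARCHAR PRIMARY KEY,
    symbol VARCHAR,
    asset_class VARCHAR,
    currency VARCHAR
);

-- Desk hierarchy: maps each desk up to its book and portfolio
CREATE TABLE desk_hierarchy (
    desk_id VARCHAR PRIMARY KEY,
    book_id VARCHAR,
    portfolio_id VARCHAR
);
```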
```sql
-- Ingest trade executions from Kafka
CREATE SOURCE trade_executions (
    trade_id VARCHAR,
    instrument_id VARCHAR,
    desk_id VARCHAR,
    trader_id VARCHAR,
    side VARCHAR, -- 'BUY' or 'SELL'
    quantity DECIMAL(18,6),
    exec_price DECIMAL(18,6),
    exec_time TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'trade-executions',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;
```
```sql
-- Ingest real-time market price ticks
CREATE SOURCE market_prices (
    instrument_id VARCHAR,
    bid DECIMAL(18,6),
    ask DECIMAL(18,6),
    mid_price DECIMAL(18,6),
    price_time TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'market-prices',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;
```
Computing Live Net Positions
Before calculating P&L, you need net positions per instrument per desk. A materialized view handles this incrementally:
```sql
-- Net position and cost basis per instrument per desk
CREATE MATERIALIZED VIEW net_positions AS
SELECT
    instrument_id,
    desk_id,
    SUM(CASE WHEN side = 'BUY'  THEN quantity
             WHEN side = 'SELL' THEN -quantity
             ELSE 0 END) AS net_quantity,
    SUM(CASE WHEN side = 'BUY'  THEN quantity * exec_price
             WHEN side = 'SELL' THEN -quantity * exec_price
             ELSE 0 END) AS net_cost_basis,
    COUNT(*) FILTER (WHERE side = 'BUY')  AS buy_trade_count,
    COUNT(*) FILTER (WHERE side = 'SELL') AS sell_trade_count,
    MAX(exec_time) AS last_trade_time
FROM trade_executions
GROUP BY instrument_id, desk_id;
```
Joining positions directly against the raw tick stream would produce one output row per historical tick. First reduce the price stream to the most recent tick per instrument using RisingWave's Top-N pattern, then join against that:

```sql
-- Keep only the latest tick per instrument (Top-N pattern)
CREATE MATERIALIZED VIEW latest_prices AS
SELECT instrument_id, mid_price, price_time
FROM (
    SELECT *,
           ROW_NUMBER() OVER (
               PARTITION BY instrument_id
               ORDER BY price_time DESC
           ) AS rn
    FROM market_prices
) t
WHERE rn = 1;

-- Mark-to-market P&L joining positions with latest prices
CREATE MATERIALIZED VIEW desk_pnl AS
SELECT
    np.desk_id,
    np.instrument_id,
    np.net_quantity,
    np.net_cost_basis,
    mp.mid_price AS current_price,
    np.net_quantity * mp.mid_price AS market_value,
    (np.net_quantity * mp.mid_price) - np.net_cost_basis AS unrealized_pnl,
    np.last_trade_time,
    mp.price_time AS price_as_of
FROM net_positions np
JOIN latest_prices mp
    ON np.instrument_id = mp.instrument_id;
```
Every time a new price tick arrives for an instrument, RisingWave recomputes only the affected rows in desk_pnl—not the entire table. This is the core efficiency gain over batch queries.
Rolling Up P&L Across the Desk Hierarchy
```sql
-- Aggregate unrealized P&L by desk for risk dashboards
CREATE MATERIALIZED VIEW desk_pnl_summary AS
SELECT
    desk_id,
    COUNT(DISTINCT instrument_id) AS open_positions,
    SUM(market_value) AS total_market_value,
    SUM(unrealized_pnl) AS total_unrealized_pnl,
    SUM(unrealized_pnl) FILTER (WHERE unrealized_pnl < 0) AS total_losing_pnl,
    SUM(unrealized_pnl) FILTER (WHERE unrealized_pnl > 0) AS total_winning_pnl,
    MAX(price_as_of) AS data_freshness
FROM desk_pnl
GROUP BY desk_id;
```
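The same pattern extends further up the hierarchy. Assuming a `desk_hierarchy` reference table that maps `desk_id` to `portfolio_id` (an illustrative schema, not part of the views above), a portfolio-level rollup is one more join and group-by:

```sql
-- Roll desk-level P&L up to portfolios via reference data
CREATE MATERIALIZED VIEW portfolio_pnl_summary AS
SELECT
    dh.portfolio_id,
    SUM(dp.market_value) AS total_market_value,
    SUM(dp.unrealized_pnl) AS total_unrealized_pnl
FROM desk_pnl dp
JOIN desk_hierarchy dh
    ON dp.desk_id = dh.desk_id
GROUP BY dh.portfolio_id;
```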
Results are served directly via RisingWave's PostgreSQL-compatible wire protocol—risk dashboards can query desk_pnl_summary using any Postgres client with sub-millisecond latency because the answer is pre-computed.
Comparison: Batch vs. Streaming P&L
| Dimension | Batch (Nightly/Hourly) | Streaming with RisingWave |
| --- | --- | --- |
| Data freshness | Minutes to hours | Milliseconds |
| Infrastructure | ETL jobs + data warehouse | Streaming SQL + materialized views |
| Query latency | Seconds to minutes | Sub-millisecond (pre-computed) |
| Risk during market hours | High (stale data) | Low (always current) |
| Operational complexity | High (job scheduling, failures) | Low (always-on views) |
| Scalability | Horizontal scale costly | Kafka partitions + RW workers |
Operational Considerations
Backfilling historical positions. When RisingWave starts, the Kafka source can replay from the earliest offset, rebuilding net positions from trade history before the view goes live.
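To rebuild positions from history, point the trade source at the earliest retained offset instead of the latest one. This is the same source definition as above; only the startup mode changes:

```sql
CREATE SOURCE trade_executions (
    -- columns as in the definition above
    trade_id VARCHAR,
    instrument_id VARCHAR,
    desk_id VARCHAR,
    trader_id VARCHAR,
    side VARCHAR,
    quantity DECIMAL(18,6),
    exec_price DECIMAL(18,6),
    exec_time TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'trade-executions',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'earliest'  -- replay full trade history on first start
) FORMAT PLAIN ENCODE JSON;
```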
Late-arriving data. Exchange confirmations sometimes arrive seconds or minutes after execution. Because the trade source is append-only, a late confirmation is simply processed when it arrives, and every downstream materialized view updates incrementally to reflect it.
Price staleness detection. The data_freshness column in desk_pnl_summary lets monitoring systems alert when price feeds go stale—a simple WHERE data_freshness < NOW() - INTERVAL '30 seconds' query surfaces lagging feeds.
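A monitoring query along these lines (the 30-second threshold is illustrative; tune it to your feed's tick frequency) flags desks whose prices have stopped updating:

```sql
-- Desks whose newest price is older than 30 seconds
SELECT desk_id, data_freshness
FROM desk_pnl_summary
WHERE data_freshness < NOW() - INTERVAL '30 seconds';
```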
FAQ
Q: Can RisingWave handle the throughput of high-frequency market data feeds?
A: Yes. RisingWave ingests from Kafka, which decouples ingestion throughput from processing. Market price topics with thousands of ticks per second are processed by multiple workers, and only affected materialized view rows are updated. For extremely high tick rates, you can pre-aggregate prices to best-bid/best-ask in a Kafka Streams layer before ingesting.
Q: How do we handle corporate actions (splits, dividends) that change cost basis?
A: Model corporate actions as adjustment records in the trade_executions source with a special side value (e.g., 'ADJUST'), and update the CASE expressions in net_positions to apply them. Reference data tables can store corporate action factors joined at query time.
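As a sketch, an `'ADJUST'` record could carry a signed quantity and cost delta and be folded into the existing CASE expressions. The exact sign convention for adjustment records is a modeling choice, not something RisingWave prescribes:

```sql
-- net_positions extended to absorb signed adjustment records
CREATE MATERIALIZED VIEW net_positions AS
SELECT
    instrument_id,
    desk_id,
    SUM(CASE WHEN side = 'BUY'    THEN quantity
             WHEN side = 'SELL'   THEN -quantity
             WHEN side = 'ADJUST' THEN quantity  -- signed quantity delta, e.g. from a split
             ELSE 0 END) AS net_quantity,
    SUM(CASE WHEN side = 'BUY'    THEN quantity * exec_price
             WHEN side = 'SELL'   THEN -quantity * exec_price
             WHEN side = 'ADJUST' THEN quantity * exec_price  -- signed cost-basis delta
             ELSE 0 END) AS net_cost_basis,
    MAX(exec_time) AS last_trade_time
FROM trade_executions
GROUP BY instrument_id, desk_id;
```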
Q: Is the PostgreSQL-compatible interface production-grade for trading applications?
A: RisingWave exposes a full PostgreSQL wire protocol. Any Postgres client library—psycopg2, JDBC, node-postgres—connects without modification. For production trading dashboards, read-replicas can be fronted with a connection pool like PgBouncer.
Q: What happens if a Kafka broker goes down during market hours?
A: RisingWave checkpoints its processing state. When the broker recovers, processing resumes exactly from the last committed offset. No trades are double-counted and no P&L updates are lost.
Q: Can we sink P&L results back to Kafka for downstream consumers?
A: Yes. Use CREATE SINK ... WITH (connector = 'kafka') on any materialized view. Risk systems, compliance monitors, and client-facing APIs can all consume the same P&L stream.
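A minimal sink sketch: because `desk_pnl_summary` updates in place rather than appending, an upsert format keyed on `desk_id` is the natural fit (topic name and broker address are placeholders):

```sql
-- Publish desk-level P&L updates to a Kafka topic
CREATE SINK desk_pnl_sink
FROM desk_pnl_summary
WITH (
    connector = 'kafka',
    topic = 'desk-pnl-summary',
    properties.bootstrap.server = 'kafka:9092',
    primary_key = 'desk_id'
) FORMAT UPSERT ENCODE JSON;
```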
Get Started
- Read the RisingWave documentation to deploy your first streaming P&L pipeline.
- Join the RisingWave community Slack to ask questions and share your use case.