Building a Real-Time PnL Engine with RisingWave and Streaming SQL

In financial trading, calculating Profit and Loss (PnL) in real time is a critical requirement for risk management, performance monitoring, and algorithmic decision-making. Traditional batch-based systems struggle to keep up with the high-velocity data streams of modern markets. This technical guide provides a comprehensive playbook for building a robust, real-time PnL calculation engine using a streaming database and a single set of SQL queries.

We'll cover everything from ingesting raw trade and price data to calculating realized and unrealized PnL on the fly, and address common production challenges like late-arriving data and multi-currency conversions.

What You'll Learn (TL;DR)

  • Real-Time PnL Maintenance: Use a single set of streaming SQL queries to continuously maintain positions, average cost, and both realized and unrealized PnL.

  • Handle Real-World Data: Tolerate out-of-order events with watermarks, decide explicitly how late events are handled, convert transaction fees, and normalize multi-currency trades to a common currency like USD.

  • Incremental & Efficient: Use a Materialized View-on-Materialized View (MV-on-MV) architecture for incremental maintenance, enabling sub-second updates and making it easy to replay and reconstruct state.


Data Model and Inputs

Our PnL calculation pipeline relies on two primary data streams: a stream of individual trade events and a stream of real-time market prices. The prices are used both to mark open positions to market and to convert other currencies to USD.

Trade Events Stream

This stream contains raw, per-transaction trade data.

Fields (Example):

  • account_id, symbol, ts (event time)

  • side (BUY/SELL), qty, price (denominated in a quote currency or USD)

  • fee, fee_ccy (the fee amount and its currency)

CREATE SOURCE trades_src (
  account_id BIGINT,
  symbol     VARCHAR,
  ts         TIMESTAMP,
  side       VARCHAR,          -- 'BUY' / 'SELL'
  qty        DOUBLE PRECISION,
  price      DOUBLE PRECISION, -- Assumed to be a USD price; see the currency-conversion note below otherwise.
  fee        DOUBLE PRECISION,
  fee_ccy    VARCHAR,
  -- Use `ts` as the event time, allowing for a 5-second disorder window.
  -- The WATERMARK clause goes inside the column list, not after FORMAT ... ENCODE.
  WATERMARK FOR ts AS ts - INTERVAL '5 seconds'
) WITH (
  connector = 'kafka',
  topic = 'trades',
  properties.bootstrap.server = 'kafka:9092',
  -- Read the topic from the beginning so positions reflect the full trade history.
  scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;
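The 5-second watermark is easiest to reason about with a small model. The Python sketch below is an illustration of the concept, not RisingWave's internal implementation, and `split_late_events` is a hypothetical helper. It tracks the largest event time seen so far and treats any event more than `delay` behind it as late, so with a 5-second delay a trade arriving 7 seconds behind the newest one is classified as late.

```python
from datetime import datetime, timedelta


def split_late_events(events, delay=timedelta(seconds=5)):
    """Model a bounded-out-of-orderness watermark over events in arrival order.

    Each event is an (event_time, payload) tuple. The watermark trails the
    largest event time seen so far by `delay`; an event whose time is already
    below the watermark when it arrives is classified as late.
    """
    on_time, late = [], []
    max_ts = None
    for ts, payload in events:
        if max_ts is not None and ts < max_ts - delay:
            late.append(payload)
        else:
            on_time.append(payload)
            max_ts = ts if max_ts is None else max(max_ts, ts)
    return on_time, late


t0 = datetime(2024, 1, 1, 12, 0, 0)
arrivals = [
    (t0, "a"),
    (t0 + timedelta(seconds=10), "b"),
    (t0 + timedelta(seconds=7), "c"),  # 3s behind the newest event: still on time
    (t0 + timedelta(seconds=3), "d"),  # 7s behind the newest event: late
]
on_time, late = split_late_events(arrivals)
```

Here `on_time` holds `a`, `b`, `c` and `late` holds `d`. Widening `delay` admits more stragglers at the cost of waiting longer before time-based state can be finalized.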

Price and FX Stream

This stream provides real-time prices for all relevant assets, used to normalize different currencies to USD.

CREATE SOURCE prices_src (
  symbol  VARCHAR,
  ts      TIMESTAMP,
  px_usd  DOUBLE PRECISION,
  -- Watermark allows for a 2-second disorder window.
  WATERMARK FOR ts AS ts - INTERVAL '2 seconds'
) WITH (
  connector = 'kafka',
  topic = 'prices',
  properties.bootstrap.server = 'kafka:9092',
  scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;

-- Create a materialized view to always have the latest USD price for each symbol.
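CREATE MATERIALIZED VIEW latest_price AS
SELECT symbol, px_usd, ts
FROM (
  SELECT symbol, px_usd, ts,
         -- Top-1 per symbol by event time (group Top-N pattern).
         ROW_NUMBER() OVER (PARTITION BY symbol ORDER BY ts DESC) AS rn
  FROM prices_src
) ranked
WHERE rn = 1;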
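The latest_price view keeps, for each symbol, the price with the greatest event time. As a result, an out-of-order tick cannot overwrite a newer one. A small Python model of that rule follows; `apply_tick` is an illustrative name only. On equal timestamps, this sketch lets the later arrival win, whereas the SQL tie-break is not guaranteed.

```python
from datetime import datetime


def apply_tick(latest, symbol, ts, px_usd):
    """Keep, per symbol, the (ts, px_usd) tick with the greatest event time."""
    current = latest.get(symbol)
    # An older tick that arrives late never replaces a newer one.
    if current is None or ts >= current[0]:
        latest[symbol] = (ts, px_usd)
    return latest


latest = {}
apply_tick(latest, "BTC", datetime(2024, 1, 1, 12, 0, 5), 43000.0)
apply_tick(latest, "BTC", datetime(2024, 1, 1, 12, 0, 2), 42900.0)  # arrives late
# latest["BTC"] is still the 12:00:05 tick at 43000.0
```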
💡
Note on Currency Conversion: If trades_src.price is denominated in a quote currency (e.g., USDT) rather than USD, convert it by joining each trade with that currency's USD rate from prices_src near the trade's event time (an interval join), exactly as the next section does for fees.

Trade Normalization and Fee Conversion

The first step is to create a unified view of trades by normalizing the BUY/SELL sides into a numeric sign (+1/-1) and converting all fees into USD. This simplifies downstream calculations.

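-- Use an interval join (±3s around the trade's event time) to find the FX rate
-- for the fee currency and convert the fee to USD.
CREATE MATERIALIZED VIEW trades_enriched AS
SELECT
  t.account_id,
  t.symbol,
  t.ts,
  UPPER(t.side) AS side,
  CASE WHEN UPPER(t.side)='BUY'  THEN  1
       WHEN UPPER(t.side)='SELL' THEN -1 END      AS sign,
  t.qty,
  t.price,                         -- Assumes this is already the USD price.
  (t.qty * t.price)                AS notional_usd,
  t.fee,
  t.fee_ccy,
  -- Convert fee to USD: pass USD fees through unchanged; otherwise multiply by the
  -- FX rate matched in the window. A missing rate yields NULL instead of the fee
  -- being silently treated as if it were already in USD.
  CASE WHEN t.fee_ccy = 'USD' THEN t.fee
       ELSE t.fee * p.px_usd END   AS fee_usd
FROM trades_src t
LEFT JOIN prices_src p
  ON p.symbol = t.fee_ccy
 AND p.ts BETWEEN t.ts - INTERVAL '3 seconds' AND t.ts + INTERVAL '3 seconds';
-- Caution: an interval join emits one row per matching price tick, so a trade with
-- several FX ticks inside the window is duplicated. In production, keep only the
-- closest tick per trade (e.g., a ROW_NUMBER() top-1 keyed on a unique trade ID).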
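The fee conversion can be modeled in Python to make two details explicit. First, an interval join can match several FX ticks inside the ±3-second window, so this sketch picks the tick closest in time. Second, a non-USD fee with no matching tick yields `None` rather than being treated as USD. `fee_to_usd` and the tuple layout of `fx_ticks` are hypothetical, chosen for illustration.

```python
from datetime import datetime, timedelta


def fee_to_usd(trade_ts, fee, fee_ccy, fx_ticks, window=timedelta(seconds=3)):
    """Convert a fee to USD using the FX tick closest to the trade time.

    fx_ticks: list of (symbol, ts, px_usd) tuples, e.g. read from the prices stream.
    Returns None when a non-USD fee has no tick within +/- window.
    """
    if fee_ccy == "USD":
        return fee
    candidates = [
        (abs(ts - trade_ts), px)
        for sym, ts, px in fx_ticks
        if sym == fee_ccy and abs(ts - trade_ts) <= window
    ]
    if not candidates:
        return None
    _, px_usd = min(candidates)  # smallest time distance wins
    return fee * px_usd


t = datetime(2024, 1, 1, 12, 0, 0)
ticks = [
    ("BNB", t - timedelta(seconds=2), 300.0),
    ("BNB", t + timedelta(seconds=1), 310.0),   # closest to the trade
    ("BNB", t + timedelta(seconds=10), 400.0),  # outside the window
]
bnb_fee = fee_to_usd(t, 0.5, "BNB", ticks)  # 0.5 * 310.0
usd_fee = fee_to_usd(t, 1.5, "USD", ticks)  # passed through unchanged
eth_fee = fee_to_usd(t, 1.0, "ETH", ticks)  # no rate available -> None
```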

Real-Time PnL Aggregation with Moving Average Cost

To calculate PnL, we need a method to determine the cost basis of the assets being sold. Lot-based methods like FIFO/LIFO match each sale against specific earlier purchases. They are precise, but they are complex to implement in a real-time streaming context. The moving average cost method needs only a few numbers of state per position, so it is far more suitable for incremental, stateful stream processing.

Core Logic (Moving Average Cost):

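  • On a BUY:

    • new_qty = old_qty + qty

    • avg_cost = (old_qty * avg_cost + qty * trade_price) / new_qty

    • realized_pnl -= fee_usd (one common convention expenses buy-side fees immediately; capitalizing them into avg_cost is the alternative)

  • On a SELL:

    • realized_pnl += (trade_price - avg_cost) * qty - fee_usd

    • new_qty = old_qty - qty (The avg_cost remains unchanged)

  • On every trade: fee_cum += fee_usd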
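A short worked example shows how these rules evolve the state. It uses illustrative numbers and zero fees on the buys:

```latex
\begin{aligned}
\text{BUY } 10 \text{ at } 100:&\quad \text{qty} = 10,\quad \text{avg\_cost} = 100\\
\text{BUY } 10 \text{ at } 110:&\quad \text{qty} = 20,\quad \text{avg\_cost} = \frac{10 \cdot 100 + 10 \cdot 110}{20} = 105\\
\text{SELL } 5 \text{ at } 120,\ \text{fee\_usd} = 1:&\quad \Delta\,\text{realized\_pnl} = (120 - 105) \cdot 5 - 1 = 74,\quad \text{qty} = 15,\quad \text{avg\_cost} = 105
\end{aligned}
```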

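This stateful, path-dependent logic can be encapsulated within a User-Defined Aggregate Function (UDAF). The UDAF maintains the state for each key (i.e., each account-symbol pair) as trades stream in. Path dependence has three practical consequences:

  • Ordering: the result depends on the order in which trades are applied, so trades for a key should reach the UDAF in event-time order.

  • Short positions: the rules above describe a long position. A SELL larger than the open quantity flips the position short and needs separate handling.

  • Retractions: because trades_enriched is produced by a LEFT JOIN, the aggregate may receive retractions (for example, a NULL-padded row replaced once a matching FX tick arrives). A moving-average update cannot always be undone exactly.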

-- Pseudo-code: Register a stateful aggregate UDAF.
-- The implementation would be provided by an engineering team in Python/Java/Rust.
-- Input: price, qty, side, fee_usd
-- State: pos_qty, avg_cost, realized_pnl, fee_cum
-- The function body (or the link to an external UDF server) is omitted here;
-- check your RisingWave version's UDAF documentation for the exact syntax.
CREATE AGGREGATE pnl_avg_cost(
  price   DOUBLE PRECISION,
  qty     DOUBLE PRECISION,
  side    VARCHAR,
  fee_usd DOUBLE PRECISION
) RETURNS JSONB;  -- RisingWave's JSON type is JSONB; a STRUCT also works if the UDAF runtime can return one.
💡
If your streaming engine supports returning a STRUCT or other composite type from a UDAF, that is generally more efficient. Otherwise, returning a JSON object is a flexible alternative.
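For the engineers implementing pnl_avg_cost, the state transitions can be sketched in standalone Python. The create_state / accumulate / finish split loosely mirrors the hook-style interface of RisingWave's embedded Python UDAFs. Treat that mapping as an assumption, and check your version's documentation for the exact contract, including the optional retract hook, which this sketch omits. The sketch also assumes the following:

  • Positions are long-only.

  • Buy-side fees are expensed immediately.

  • A position counts as closed when its quantity falls below a small `EPSILON`.

```python
import json

EPSILON = 1e-9  # positions smaller than this are treated as closed (flat)


def create_state():
    """Initial per-(account_id, symbol) state: (pos_qty, avg_cost, realized_pnl, fee_cum)."""
    return (0.0, 0.0, 0.0, 0.0)


def accumulate(state, price, qty, side, fee_usd):
    """Apply one trade to the state using the moving average cost rules."""
    pos_qty, avg_cost, realized, fee_cum = state
    if qty is None or qty <= 0:
        return state  # ignore malformed rows in this sketch
    fee = fee_usd or 0.0  # a NULL fee (e.g. missing FX rate) counts as 0 here; flag it in production
    side = side.upper()
    if side == "BUY":
        new_qty = pos_qty + qty
        avg_cost = (pos_qty * avg_cost + qty * price) / new_qty
        realized -= fee  # assumption: buy-side fees are expensed immediately
    elif side == "SELL":
        if qty > pos_qty + EPSILON:
            raise ValueError("sell exceeds open position; shorts are out of scope")
        realized += (price - avg_cost) * qty - fee
        new_qty = pos_qty - qty  # avg_cost is unchanged on a sell
        if abs(new_qty) < EPSILON:
            new_qty, avg_cost = 0.0, 0.0  # position closed: reset the cost basis
    else:
        raise ValueError(f"unknown side: {side!r}")
    return (new_qty, avg_cost, realized, fee_cum + fee)


def finish(state):
    """Serialize the state as the JSON object the downstream views unpack."""
    pos_qty, avg_cost, realized, fee_cum = state
    return json.dumps({"pos_qty": pos_qty, "avg_cost": avg_cost,
                       "realized_pnl": realized, "fee_cum": fee_cum})


state = create_state()
for price, qty, side, fee in [(100.0, 10.0, "BUY", 0.0),
                              (110.0, 10.0, "BUY", 0.0),
                              (120.0, 5.0, "SELL", 1.0)]:
    state = accumulate(state, price, qty, side, fee)
result = json.loads(finish(state))
```

After the three trades, `result` reports 15 units at an average cost of 105 with 74 of realized PnL. The keys match those that account_symbol_pnl extracts with `->>`.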

Calculating Per-Account, Per-Symbol PnL State

With the UDAF defined, we can now create a materialized view that computes the PnL state for every account and symbol.

-- Aggregate trades from the enriched view for each account and symbol.
CREATE MATERIALIZED VIEW account_symbol_pnl_state AS
SELECT
  account_id,
  symbol,
  -- The UDAF incrementally maintains the state (pos_qty, avg_cost, realized_pnl, fee_cum).
  pnl_avg_cost(price, qty, side, fee_usd) AS state_json
FROM trades_enriched
GROUP BY account_id, symbol;

Parsing the UDAF Output

If the UDAF returns a JSON object, we create another view to parse it into separate columns for easier querying.

-- Unpack the JSON object into columns.
CREATE MATERIALIZED VIEW account_symbol_pnl AS
SELECT
  account_id,
  symbol,
  CAST(state_json->>'pos_qty'       AS DOUBLE PRECISION) AS pos_qty,
  CAST(state_json->>'avg_cost'      AS DOUBLE PRECISION) AS avg_cost,
  CAST(state_json->>'realized_pnl'  AS DOUBLE PRECISION) AS realized_pnl_usd,
  CAST(state_json->>'fee_cum'       AS DOUBLE PRECISION) AS fee_usd
FROM account_symbol_pnl_state;

Unrealized and Total PnL (Joined with Latest Prices)

With the realized PnL and current position state calculated, we can now compute the unrealized PnL by joining with our latest_price view. The total PnL is the sum of realized and unrealized PnL.

CREATE MATERIALIZED VIEW account_symbol_pnl_live AS
SELECT
  s.account_id,
  s.symbol,
  s.pos_qty,
  s.avg_cost,
  s.realized_pnl_usd,
  s.fee_usd,
  lp.px_usd AS px_now,
  -- Unrealized PnL = Position Quantity * (Current Price - Average Cost)
  s.pos_qty * (lp.px_usd - s.avg_cost) AS unrealized_pnl_usd,
  -- Total PnL = Realized PnL + Unrealized PnL
  s.realized_pnl_usd + s.pos_qty * (lp.px_usd - s.avg_cost) AS total_pnl_usd
FROM account_symbol_pnl s
LEFT JOIN latest_price lp
  ON lp.symbol = s.symbol;
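The unrealized and total formulas are simple, but the LEFT JOIN has a consequence worth seeing explicitly. When latest_price has no row for a symbol, both derived columns become NULL, even though realized PnL is known. A small Python mirror of the view's arithmetic follows; the function name `live_pnl` is hypothetical.

```python
def live_pnl(pos_qty, avg_cost, realized_pnl_usd, px_now):
    """Mirror account_symbol_pnl_live for one row and return (unrealized, total).

    px_now is None when latest_price has no row for the symbol. Like SQL NULL
    arithmetic after the LEFT JOIN, both outputs are then None.
    """
    if px_now is None:
        return None, None
    unrealized = pos_qty * (px_now - avg_cost)
    return unrealized, realized_pnl_usd + unrealized


marked = live_pnl(15.0, 105.0, 74.0, 112.0)    # unrealized 15 * 7 = 105, total 179
unpriced = live_pnl(15.0, 105.0, 74.0, None)   # no price yet: both None
```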

Time-Windowed Views (24h / 7d / 30d)

A common requirement is to view PnL changes over specific time windows, such as the last 24 hours. Computing this exactly is complex. A practical approximation is to estimate the change in unrealized PnL from a historical reference price and combine it with the realized PnL booked in that window.

Saving Historical Price Snapshots

First, we need to create periodic snapshots of prices to use as reference points.

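CREATE MATERIALIZED VIEW price_snap_1h AS
SELECT symbol, snap_ts, px_usd
FROM (
  SELECT
    symbol,
    DATE_TRUNC('hour', ts) AS snap_ts,
    px_usd,
    -- Keep the last tick within each hour bucket.
    ROW_NUMBER() OVER (PARTITION BY symbol, DATE_TRUNC('hour', ts) ORDER BY ts DESC) AS rn
  FROM prices_src
) ranked
WHERE rn = 1;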
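The snapshot view buckets ticks by hour and keeps the last tick in each bucket. The Python sketch below models that grouping. `hourly_snapshots` is an illustrative name, and naive (timezone-less) datetimes are assumed, matching the TIMESTAMP column.

```python
from datetime import datetime


def hourly_snapshots(ticks):
    """Group (symbol, ts, px_usd) ticks into hourly buckets, keeping the last tick.

    The key is (symbol, ts truncated to the hour), like DATE_TRUNC('hour', ts),
    and the value is the price of the tick with the greatest ts in that hour.
    """
    latest = {}  # (symbol, hour_start) -> (ts, px_usd)
    for symbol, ts, px in ticks:
        hour = ts.replace(minute=0, second=0, microsecond=0)
        key = (symbol, hour)
        if key not in latest or ts >= latest[key][0]:
            latest[key] = (ts, px)
    return {key: px for key, (_, px) in latest.items()}


snaps = hourly_snapshots([
    ("BTC", datetime(2024, 1, 1, 10, 5), 100.0),
    ("BTC", datetime(2024, 1, 1, 10, 55), 101.0),
    ("BTC", datetime(2024, 1, 1, 10, 30), 99.0),  # late arrival, older than 10:55
    ("BTC", datetime(2024, 1, 1, 11, 1), 102.0),
])
```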

Estimating 24-Hour PnL Change

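Now we can join the current PnL state with a price snapshot from 24 hours ago to approximate the PnL change. The approximation assumes the position did not change during the window, so trades inside the window make it drift from the true value.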

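-- Get the most recent hourly price snapshot at or before 24 hours ago as the reference price.
-- Note: streaming materialized views restrict NOW() (typically to temporal filters) and may
-- not accept a correlated LATERAL ... LIMIT subquery; if yours rejects this, compute the
-- reference price in a separate view or as an ad hoc query.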
CREATE MATERIALIZED VIEW account_symbol_pnl_24h AS
SELECT
  s.account_id,
  s.symbol,
  s.pos_qty,
  s.avg_cost,
  s.realized_pnl_usd,
  s.fee_usd,
  lp.px_usd     AS px_now,
  ref.px_usd    AS px_24h_ago,
  -- Unrealized PnL change in the last 24h ≈ pos_qty * (px_now - px_24h_ago)
  s.pos_qty * (lp.px_usd - ref.px_usd) AS unrealized_delta_24h,
  -- To get the total 24h PnL, the UDAF would need to be extended to also output
  -- realized PnL for the specific window.
  NULL::DOUBLE PRECISION AS realized_24h,
  NULL::DOUBLE PRECISION AS total_24h
FROM account_symbol_pnl s
LEFT JOIN latest_price lp
  ON lp.symbol = s.symbol
LEFT JOIN LATERAL (
  SELECT px_usd
  FROM price_snap_1h
  WHERE symbol = s.symbol
    AND snap_ts <= NOW() - INTERVAL '24 hours'
  ORDER BY snap_ts DESC
  LIMIT 1
) ref ON TRUE;
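The reference-price lookup and the delta approximation can be modeled in Python. This helps check expected values before wiring up the SQL. The helper names are hypothetical, and `snapshots` is assumed to be a dict keyed like the hourly buckets of price_snap_1h.

```python
from datetime import datetime, timedelta


def reference_price(snapshots, symbol, now, lookback=timedelta(hours=24)):
    """Return the price of the latest snapshot at or before now - lookback, or None.

    snapshots: {(symbol, snap_ts): px_usd}
    """
    cutoff = now - lookback
    eligible = [(ts, px) for (sym, ts), px in snapshots.items()
                if sym == symbol and ts <= cutoff]
    return max(eligible)[1] if eligible else None


def unrealized_delta(pos_qty, px_now, px_ref):
    """Approximate change in unrealized PnL; assumes pos_qty was constant over the window."""
    if px_now is None or px_ref is None:
        return None
    return pos_qty * (px_now - px_ref)


now = datetime(2024, 1, 2, 12, 30)
snaps = {
    ("BTC", datetime(2024, 1, 1, 11, 0)): 100.0,
    ("BTC", datetime(2024, 1, 1, 12, 0)): 104.0,  # latest snapshot before the cutoff
    ("BTC", datetime(2024, 1, 1, 13, 0)): 108.0,  # inside the last 24h, not eligible
}
px_ref = reference_price(snaps, "BTC", now)
delta = unrealized_delta(15.0, 110.0, px_ref)
```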
💡
For exact windowed realized PnL, the UDAF would need to be extended. One option is to emit each trade's realized PnL contribution with its event time, so trailing windows (24h, 7d) can be summed. Another is to maintain separate realized PnL values per tumbling or hopping window.
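One way to get exact trailing-window realized PnL is to record each trade's realized PnL contribution together with its event time, then sum the contributions that fall inside the window. This is an assumption, not the article's design. In RisingWave that sum could be maintained by a view with a temporal filter such as `ts > NOW() - INTERVAL '24 hours'`. The Python below only models the arithmetic, and `realized_in_window` is a hypothetical helper.

```python
from datetime import datetime, timedelta


def realized_in_window(realized_events, now, window=timedelta(hours=24)):
    """Sum per-trade realized PnL contributions with event time in (now - window, now].

    realized_events: list of (ts, realized_delta_usd) pairs, one per trade.
    """
    start = now - window
    return sum(delta for ts, delta in realized_events if start < ts <= now)


now = datetime(2024, 1, 2, 12, 0)
events = [
    (datetime(2024, 1, 1, 11, 0), 50.0),   # older than 24h: excluded
    (datetime(2024, 1, 1, 18, 0), 74.0),
    (datetime(2024, 1, 2, 9, 0), -10.0),
]
realized_24h = realized_in_window(events, now)
```

The same events answer the 7d or 30d question by changing `window`, which is the main advantage over keeping one fixed bucket per window inside the UDAF.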

Serving the Results (Sinks)

Finally, the real-time PnL results can be pushed to downstream systems like a dashboard, API, or data warehouse using a SINK.

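CREATE SINK pnl_live_sink
FROM account_symbol_pnl_live
WITH (
  connector = 'kafka',
  topic = 'pnl_live',
  properties.bootstrap.server = 'kafka:9092',
  -- The view is updated in place (not append-only), so emit upserts keyed by account and symbol.
  primary_key = 'account_id,symbol'
) FORMAT UPSERT ENCODE JSON;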
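Because account_symbol_pnl_live changes in place rather than only growing, a downstream consumer of the pnl_live topic should keep the latest record per (account_id, symbol) instead of appending every message. The following is a minimal consumer-side sketch. Both the message shape (a key tuple plus a JSON value, with `None` as a delete tombstone) and the function name are assumptions about how you decode the topic, and no Kafka client is involved.

```python
import json


def apply_pnl_updates(view, messages):
    """Fold (key, json_value) messages into {key: record}; last write wins.

    A value of None is treated as a tombstone and removes the key.
    """
    for key, value in messages:
        if value is None:
            view.pop(key, None)
        else:
            view[key] = json.loads(value)
    return view


view = apply_pnl_updates({}, [
    ((1, "BTC"), '{"total_pnl_usd": 10.0}'),
    ((1, "BTC"), '{"total_pnl_usd": 12.5}'),   # supersedes the previous record
    ((2, "ETH"), '{"total_pnl_usd": -3.0}'),
    ((2, "ETH"), None),                        # tombstone: row deleted upstream
])
```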

Correctness and Engineering Considerations

  • Watermarks: Set watermarks to cover the 95th or 99th percentile of your event time delay, plus a small safety margin. This provides a good balance between correctness and end-to-end latency.

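  • Late Events: RisingWave drops source rows whose event time is already behind the watermark, so extremely late trades never reach the PnL views. Because every trade must count toward positions and realized PnL, detect such events upstream and route them to a "dead-letter queue" for offline auditing, or feed them back as corrections if the UDAF and downstream systems support retractions.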

  • Fees & Funding: Additional costs like funding rates or commissions can be modeled as separate input streams, converted to USD, and factored into the PnL calculation within the UDAF.

  • Closing Positions: When a position's quantity returns to zero, reset avg_cost to zero inside the UDAF so floating-point error does not carry over into the next position. In practice, test against a small tolerance, since DOUBLE PRECISION arithmetic rarely lands exactly on 0.

  • FIFO/LIFO: If strict FIFO/LIFO accounting is a business requirement, the UDAF logic must be designed to maintain an ordered queue of acquisition prices and quantities, which is significantly more complex than the moving average cost method.

  • Replayability: Ensure your upstream data sources (Kafka, CDC logs, etc.) retain enough history to rebuild the PnL state from scratch after schema changes or logic updates. For Kafka sources, recreate them with scan.startup.mode = 'earliest' when rebuilding. This replay capability is also what makes the results fully auditable.
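To show why strict FIFO is heavier than the moving average cost method, here is a minimal FIFO lot-matching sketch in Python. It assumes a long-only position, trades supplied in event-time order, and no fees. `fifo_realized` is a hypothetical helper, not part of any library.

```python
from collections import deque

EPS = 1e-12  # tolerance for treating a float quantity as fully consumed


def fifo_realized(trades):
    """Realized PnL under FIFO lot matching.

    trades: iterable of (side, qty, price) in event-time order. Each BUY opens a
    lot; each SELL consumes the oldest open lots first.
    Returns (realized_pnl, remaining_lots).
    """
    lots = deque()  # open lots, oldest first; each is [qty, price]
    realized = 0.0
    for side, qty, price in trades:
        if side == "BUY":
            lots.append([qty, price])
            continue
        if side != "SELL":
            raise ValueError(f"unknown side: {side!r}")
        remaining = qty
        while remaining > EPS:
            if not lots:
                raise ValueError("sell exceeds open position")
            lot = lots[0]
            take = min(remaining, lot[0])
            realized += (price - lot[1]) * take
            lot[0] -= take
            remaining -= take
            if lot[0] <= EPS:
                lots.popleft()  # oldest lot fully consumed
    return realized, [tuple(lot) for lot in lots]


realized, open_lots = fifo_realized([("BUY", 10.0, 100.0),
                                     ("BUY", 10.0, 110.0),
                                     ("SELL", 15.0, 120.0)])
```

FIFO realizes 250 here, with 5 units at 110 still open. Under the moving average cost rules, the same sale realizes (120 - 105) * 15 = 225. The state is the whole lot queue rather than a fixed handful of numbers, and that queue is what makes FIFO costlier in a streaming aggregate.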


End-to-End MVP (Summary SQL)

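Copy the SQL in this section and adapt the field names to match your topic's schema to get a minimum viable product up and running quickly. The source column lists below are abbreviated with "..."; fill them in from the full definitions above. (This example assumes trades_src.price is already in USD.)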

-- 1) Sources
CREATE SOURCE trades_src (..., WATERMARK FOR ts AS ts - INTERVAL '5 seconds');
CREATE SOURCE prices_src (..., WATERMARK FOR ts AS ts - INTERVAL '2 seconds');

-- 2) Latest price
CREATE MATERIALIZED VIEW latest_price AS
SELECT symbol, px_usd, ts
FROM (SELECT symbol, px_usd, ts,
             ROW_NUMBER() OVER (PARTITION BY symbol ORDER BY ts DESC) AS rn
      FROM prices_src) ranked
WHERE rn = 1;

-- 3) Enrich trades (convert fee to USD)
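CREATE MATERIALIZED VIEW trades_enriched AS
SELECT t.*,
       CASE WHEN t.fee_ccy = 'USD' THEN t.fee ELSE t.fee * p.px_usd END AS fee_usd
FROM trades_src t
LEFT JOIN prices_src p
  ON p.symbol = t.fee_ccy
 AND p.ts BETWEEN t.ts - INTERVAL '3 seconds' AND t.ts + INTERVAL '3 seconds';
-- Note: several FX ticks inside the window duplicate a trade; dedupe per trade in production.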

-- 4) UDAF: pnl_avg_cost(...) RETURNS JSONB (to be implemented by engineering)
CREATE MATERIALIZED VIEW account_symbol_pnl_state AS
SELECT account_id, symbol,
       pnl_avg_cost(price, qty, side, fee_usd) AS state_json
FROM trades_enriched
GROUP BY account_id, symbol;

CREATE MATERIALIZED VIEW account_symbol_pnl AS
SELECT account_id, symbol,
  CAST(state_json->>'pos_qty'       AS DOUBLE PRECISION) AS pos_qty,
  CAST(state_json->>'avg_cost'      AS DOUBLE PRECISION) AS avg_cost,
  CAST(state_json->>'realized_pnl'  AS DOUBLE PRECISION) AS realized_pnl_usd,
  CAST(state_json->>'fee_cum'       AS DOUBLE PRECISION) AS fee_usd
FROM account_symbol_pnl_state;

-- 5) Live PnL
CREATE MATERIALIZED VIEW account_symbol_pnl_live AS
SELECT s.account_id, s.symbol, s.pos_qty, s.avg_cost, s.realized_pnl_usd, s.fee_usd,
       lp.px_usd AS px_now,
       s.pos_qty * (lp.px_usd - s.avg_cost) AS unrealized_pnl_usd,
       s.realized_pnl_usd
       + s.pos_qty * (lp.px_usd - s.avg_cost) AS total_pnl_usd
FROM account_symbol_pnl s
LEFT JOIN latest_price lp ON lp.symbol = s.symbol;

-- 6) Sink (upsert, because the view is updated in place)
CREATE SINK pnl_live_sink
FROM account_symbol_pnl_live
WITH (
    connector='kafka',
    topic='pnl_live',
    properties.bootstrap.server='kafka:9092',
    primary_key='account_id,symbol'
) FORMAT UPSERT ENCODE JSON;

Conclusion & Next Steps

This guide has demonstrated how to build a real-time PnL tracking engine from the moving average cost method, a stateful User-Defined Aggregate Function, and a chain of materialized views. Because every layer is maintained incrementally as trades and prices arrive, the pipeline avoids batch recomputation and suits high-throughput financial applications.

From here, you can extend the pipeline even further:

  • Enhance the UDAF to support precise windowed calculations for realized PnL (24h, 7d, 30d).

  • Incorporate more financial data, such as funding rates, interest payments, or margin calls, to build a comprehensive risk management system.

  • Serve the results through a low-latency WebSocket API for front-end applications or sink them to a data warehouse like ClickHouse for historical analysis and backtesting.

A natural next step is to implement the pnl_avg_cost UDAF itself, including its state transition logic and its handling of retractions.


Try RisingWave Today

  • Download the open-source version of RisingWave to deploy on your own infrastructure.

  • Get started quickly with RisingWave Cloud for a fully managed experience.

  • Talk to Our Experts: Have a complex use case or want to see a personalized demo? Contact us to discuss how RisingWave can address your specific challenges.

  • Join Our Community: Connect with fellow developers, ask questions, and share your experiences in our vibrant Slack community.
