Building a Real-Time Prediction Market with SQL

You can build the core data infrastructure for a prediction market — live odds, position tracking, P&L calculation, and instant settlement — using nothing but SQL in a streaming database. This tutorial walks through building a Polymarket-style prediction engine with RisingWave, where every component is a SQL materialized view that updates in real time as trades flow in.

No Java. No Flink. No Redis. Just SQL.

Architecture Overview

A prediction market has five core data components:

Trade Stream (Kafka) ──→ ┌─────────────────────────────┐
                         │         RisingWave          │
Oracle Feed (Kafka)  ──→ │                             │
                         │  ┌─ Market Pricing MV       │ ──→ API / Dashboard
                         │  ├─ User Positions MV       │
                         │  ├─ Settlement MV           │
                         │  ├─ User Balances MV        │
                         │  └─ Risk Monitoring MV      │
                         └─────────────────────────────┘

All five components are SQL materialized views that update automatically. Let's build each one.

Prerequisites

  • RisingWave running (Docker, Kubernetes, or RisingWave Cloud)
  • Kafka cluster with trade events
  • psql or any PostgreSQL client

Connect to RisingWave:

psql -h localhost -p 4566 -d dev -U root

Step 1: Define the Trade Stream

Trades arrive via Kafka. Each trade contains the market, user, side (YES/NO), quantity, price, and timestamp.

CREATE SOURCE trades_source (
  trade_id VARCHAR,
  market_id VARCHAR,
  user_id VARCHAR,
  side VARCHAR,        -- 'YES' or 'NO'
  quantity DECIMAL,    -- Number of contracts
  price DECIMAL,       -- Price per contract (0.01 - 0.99)
  trade_time TIMESTAMP WITH TIME ZONE
) WITH (
  connector = 'kafka',
  topic = 'prediction-market-trades',
  properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;
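
Each message on the topic is a flat JSON object matching this schema. For example (all values illustrative):

{
  "trade_id": "t-000001",
  "market_id": "btc-100k-june",
  "user_id": "trader_42",
  "side": "YES",
  "quantity": 100,
  "price": 0.62,
  "trade_time": "2026-03-01T14:05:00Z"
}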

Step 2: Define the Oracle Feed

Oracles publish event resolutions — the final outcome of each market.

CREATE SOURCE oracle_feed (
  market_id VARCHAR,
  outcome VARCHAR,         -- 'YES' or 'NO'
  resolution_time TIMESTAMP WITH TIME ZONE,
  oracle_source VARCHAR    -- 'uma', 'kalshi-internal', etc.
) WITH (
  connector = 'kafka',
  topic = 'oracle-resolutions',
  properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;

Step 3: Define Market Metadata

Static market information stored as a RisingWave table:

CREATE TABLE markets (
  market_id VARCHAR PRIMARY KEY,
  question VARCHAR,          -- "Will BTC exceed $100K by June 2026?"
  category VARCHAR,          -- "crypto", "politics", "sports"
  created_at TIMESTAMP WITH TIME ZONE,
  closes_at TIMESTAMP WITH TIME ZONE,
  status VARCHAR DEFAULT 'open'  -- 'open', 'closed', 'resolved'
);

-- Insert some markets
INSERT INTO markets VALUES
  ('btc-100k-june', 'Will BTC exceed $100K by June 2026?', 'crypto',
   '2026-01-01', '2026-06-30', 'open'),
  ('fed-rate-cut-q2', 'Will the Fed cut rates in Q2 2026?', 'economics',
   '2026-01-15', '2026-06-30', 'open'),
  ('warriors-championship', 'Will the Warriors win the 2026 NBA Championship?', 'sports',
   '2025-10-01', '2026-06-20', 'open');

Step 4: Real-Time Market Pricing

This is the core — a materialized view that shows live pricing for every market:

CREATE MATERIALIZED VIEW market_pricing AS
SELECT
  t.market_id,
  m.question,
  m.category,
  -- Current YES price (= implied probability)
  last_value(t.price ORDER BY t.trade_time)
    FILTER (WHERE t.side = 'YES') as yes_price,
  -- Current NO price
  last_value(t.price ORDER BY t.trade_time)
    FILTER (WHERE t.side = 'NO') as no_price,
  -- 24-hour metrics
  SUM(t.quantity)
    FILTER (WHERE t.trade_time > NOW() - INTERVAL '24 hours') as volume_24h,
  SUM(t.quantity * t.price)
    FILTER (WHERE t.trade_time > NOW() - INTERVAL '24 hours') as notional_24h,
  COUNT(*)
    FILTER (WHERE t.trade_time > NOW() - INTERVAL '24 hours') as trades_24h,
  -- Price range
  MIN(t.price) FILTER (WHERE t.trade_time > NOW() - INTERVAL '24 hours') as low_24h,
  MAX(t.price) FILTER (WHERE t.trade_time > NOW() - INTERVAL '24 hours') as high_24h,
  -- Freshness
  MAX(t.trade_time) as last_trade_time
FROM trades_source t
JOIN markets m ON t.market_id = m.market_id
GROUP BY t.market_id, m.question, m.category;

Query it:

SELECT market_id, question, yes_price, volume_24h, trades_24h
FROM market_pricing
ORDER BY volume_24h DESC;

       market_id       |                     question                     | yes_price | volume_24h | trades_24h
-----------------------+--------------------------------------------------+-----------+------------+------------
 btc-100k-june         | Will BTC exceed $100K by June 2026?              |      0.62 |      45230 |       3841
 fed-rate-cut-q2       | Will the Fed cut rates in Q2 2026?               |      0.43 |      28100 |       2105
 warriors-championship | Will the Warriors win the 2026 NBA Championship? |      0.08 |       5670 |        412

Step 5: User Position Tracking

Track every user's open positions with average entry price:

CREATE MATERIALIZED VIEW user_positions AS
SELECT
  user_id,
  market_id,
  side,
  SUM(quantity) as position_size,
  -- Weighted average entry price
  CASE
    WHEN SUM(quantity) > 0
    THEN SUM(quantity * price) / SUM(quantity)
    ELSE 0
  END as avg_entry_price,
  SUM(quantity * price) as total_cost,
  COUNT(*) as trade_count,
  MIN(trade_time) as first_trade,
  MAX(trade_time) as last_trade
FROM trades_source
GROUP BY user_id, market_id, side;
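
Query a trader's open positions (trader_42 is the example user this tutorial uses throughout):

SELECT market_id, side, position_size, avg_entry_price, total_cost
FROM user_positions
WHERE user_id = 'trader_42'
ORDER BY total_cost DESC;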

Step 6: Real-Time P&L

Join positions with live market prices to compute unrealized P&L:

CREATE MATERIALIZED VIEW user_pnl AS
SELECT
  p.user_id,
  p.market_id,
  m.question,
  p.side,
  p.position_size,
  p.avg_entry_price,
  -- Current market price for this side
  CASE
    WHEN p.side = 'YES' THEN mp.yes_price
    ELSE mp.no_price
  END as current_price,
  -- Unrealized P&L
  CASE
    WHEN p.side = 'YES'
    THEN p.position_size * (mp.yes_price - p.avg_entry_price)
    ELSE p.position_size * (mp.no_price - p.avg_entry_price)
  END as unrealized_pnl,
  -- P&L percentage
  CASE
    WHEN p.avg_entry_price > 0
    THEN ((CASE WHEN p.side = 'YES' THEN mp.yes_price ELSE mp.no_price END)
          - p.avg_entry_price) / p.avg_entry_price * 100
    ELSE 0
  END as pnl_pct
FROM user_positions p
JOIN market_pricing mp ON p.market_id = mp.market_id
JOIN markets m ON p.market_id = m.market_id;

Query a user's portfolio:

SELECT market_id, side, position_size, avg_entry_price,
       current_price, unrealized_pnl, pnl_pct
FROM user_pnl
WHERE user_id = 'trader_42'
ORDER BY ABS(unrealized_pnl) DESC;

Step 7: Settlement Engine

The settlement engine is a streaming join between trades and oracle resolutions. When an oracle publishes a result, payouts are computed instantly for every position:

CREATE MATERIALIZED VIEW settlements AS
SELECT
  t.user_id,
  t.market_id,
  t.side,
  t.quantity,
  t.price as entry_price,
  o.outcome,
  o.resolution_time,
  -- Payout calculation
  CASE
    WHEN t.side = o.outcome
    THEN t.quantity * (1.0 - t.price)  -- Winner: receives $1 - entry price per contract
    ELSE -(t.quantity * t.price)        -- Loser: loses entry price per contract
  END as payout
FROM trades_source t
JOIN oracle_feed o ON t.market_id = o.market_id;

How it works: Before an oracle resolution arrives, this view is empty for that market. The moment the oracle publishes {market_id: "btc-100k-june", outcome: "YES"}, the streaming join instantly matches it against every trade in that market and computes payouts. This is the "fan-out" — one oracle event triggers settlement for potentially hundreds of thousands of positions.
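
Once a market resolves, payouts are queryable immediately. For example, per-user totals for the market above:

SELECT user_id, SUM(payout) AS total_payout
FROM settlements
WHERE market_id = 'btc-100k-june'
GROUP BY user_id
ORDER BY total_payout DESC;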

Step 8: User Balance Ledger

Aggregate all cash flows (deposits, trade costs, and settlement payouts) into a unified balance. The view below also reads from a deposits table that earlier steps never defined. A minimal sketch, assuming a simple schema:
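
CREATE TABLE deposits (
  deposit_id VARCHAR PRIMARY KEY,
  user_id VARCHAR,
  amount DECIMAL,          -- Assumed: one row per cash deposit
  deposited_at TIMESTAMP WITH TIME ZONE
);

With deposits in place, define the balance ledger: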

CREATE MATERIALIZED VIEW user_balances AS
SELECT
  user_id,
  SUM(amount) as balance
FROM (
  -- Cash deposits
  SELECT user_id, amount FROM deposits

  UNION ALL

  -- Cost of trades (money spent buying contracts)
  SELECT user_id, -(quantity * price) as amount FROM trades_source

  UNION ALL

  -- Settlement payouts
  SELECT user_id, payout as amount FROM settlements
) all_flows
GROUP BY user_id;
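
Look up any trader's live balance:

SELECT balance FROM user_balances WHERE user_id = 'trader_42';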

Step 9: Risk Monitoring

Real-time risk dashboard for platform operators:

CREATE MATERIALIZED VIEW platform_risk AS
SELECT
  -- Per-market risk
  market_id,
  SUM(position_size) FILTER (WHERE side = 'YES') as total_yes_exposure,
  SUM(position_size) FILTER (WHERE side = 'NO') as total_no_exposure,
  -- Imbalance (risk to the platform)
  ABS(
    SUM(position_size) FILTER (WHERE side = 'YES') -
    SUM(position_size) FILTER (WHERE side = 'NO')
  ) as position_imbalance,
  COUNT(DISTINCT user_id) as unique_traders,
  MAX(position_size) as largest_single_position
FROM user_positions
GROUP BY market_id;
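
Operators can watch the most imbalanced markets directly:

SELECT market_id, total_yes_exposure, total_no_exposure, position_imbalance
FROM platform_risk
ORDER BY position_imbalance DESC
LIMIT 10;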

Step 10: Connect Your Frontend

Since RisingWave speaks PostgreSQL, connect any frontend framework directly:

// Node.js API endpoint
const { Pool } = require('pg');
const pool = new Pool({
  host: 'risingwave-host',
  port: 4566,
  database: 'dev',
  user: 'root'
});

// GET /api/markets — live market data
app.get('/api/markets', async (req, res) => {
  const result = await pool.query(`
    SELECT market_id, question, category, yes_price,
           volume_24h, trades_24h, last_trade_time
    FROM market_pricing
    ORDER BY volume_24h DESC
    LIMIT 50
  `);
  res.json(result.rows);
});

// GET /api/portfolio/:userId — user's live P&L
app.get('/api/portfolio/:userId', async (req, res) => {
  const result = await pool.query(`
    SELECT market_id, side, position_size,
           avg_entry_price, current_price, unrealized_pnl
    FROM user_pnl
    WHERE user_id = $1
  `, [req.params.userId]);
  res.json(result.rows);
});

What You've Built

With ~100 lines of SQL, you now have:

Component          | SQL View       | Updates
-------------------+----------------+--------------------------------
Live odds/pricing  | market_pricing | Every trade (milliseconds)
User positions     | user_positions | Every trade
Real-time P&L      | user_pnl       | Every trade + price change
Instant settlement | settlements    | On oracle resolution
User balances      | user_balances  | Every deposit/trade/settlement
Risk monitoring    | platform_risk  | Every position change

All views update automatically. No cron jobs, no batch processing, no manual refresh.

Why a Streaming Database Instead of a Traditional Stack

Aspect              | Traditional Stack                       | RisingWave
--------------------+-----------------------------------------+--------------------------------
Components          | Kafka + Flink + Redis + PostgreSQL      | RisingWave (single system)
Languages           | Java (Flink) + Python (API) + SQL (DB)  | SQL only
Odds update latency | Seconds (multi-hop)                     | Milliseconds
Settlement          | Custom batch job                        | Streaming join (automatic)
Serving             | Separate Redis cache + PostgreSQL       | Built-in (PostgreSQL protocol)
Ops overhead        | 4 systems to monitor                    | 1 system

Frequently Asked Questions

Can I build a production prediction market with just SQL?

The data infrastructure layer — pricing, positions, P&L, settlement, risk — can be built entirely in SQL with RisingWave. You'll still need application-layer components: user authentication, order validation, regulatory compliance, and a frontend. But the real-time data backbone is pure SQL.

How does settlement scale to millions of positions?

Settlement in RisingWave is a streaming join between trades and oracle resolutions. When an oracle publishes a result, the join evaluates against all matching trades in parallel across RisingWave's distributed compute nodes. State is stored on S3, so there's no local memory limit. The settlement completes in seconds regardless of position count.

What latency can I expect for odds updates?

RisingWave materialized views update within milliseconds of new data arriving. The market_pricing view reflects the latest trade price typically within 100 ms of the trade entering the Kafka topic, and point queries against the view return in 10-20 ms at p99.

Can I use this for sports betting odds?

Yes. The same architecture applies to sports betting: the trade stream represents bets, market pricing calculates live odds, and the oracle feed delivers game results. The main difference is the data source — sports odds typically come from external feeds rather than an internal order book. RisingWave can ingest these feeds via Kafka or direct API integration.

Do I need Kafka for this architecture?

Kafka is the most common source for trade events, but RisingWave also supports direct ingestion from other sources. For a production prediction market, Kafka provides the durability and replay capability that's important for trade audit trails. For prototyping, you can insert trades directly into RisingWave tables.
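
A minimal prototyping setup, assuming you create trades_source as a plain table (same shape as the Kafka source in Step 1) so you can insert rows by hand:

-- Table instead of a Kafka-backed source; every downstream view works unchanged
CREATE TABLE trades_source (
  trade_id VARCHAR,
  market_id VARCHAR,
  user_id VARCHAR,
  side VARCHAR,
  quantity DECIMAL,
  price DECIMAL,
  trade_time TIMESTAMP WITH TIME ZONE
);

-- Insert a test trade; market_pricing, user_positions, etc. update automatically
INSERT INTO trades_source VALUES
  ('t-1', 'btc-100k-june', 'trader_42', 'YES', 100, 0.62, '2026-03-01 14:05:00+00');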
