Building a Real-Time Prediction Market with SQL
You can build the core data infrastructure for a prediction market — live odds, position tracking, P&L calculation, and instant settlement — using nothing but SQL in a streaming database. This tutorial walks through building a Polymarket-style prediction engine with RisingWave, where every component is a SQL materialized view that updates in real time as trades flow in.
No Java. No Flink. No Redis. Just SQL.
Architecture Overview
A prediction market has five core data components:
Trade Stream (Kafka) ──→ ┌─────────────────────────────┐
│ RisingWave │
Oracle Feed (Kafka) ──→ │ │
│ ┌─ Market Pricing MV │ ──→ API / Dashboard
│ ├─ User Positions MV │
│ ├─ Settlement MV │
│ ├─ User Balances MV │
│ └─ Risk Monitoring MV │
└─────────────────────────────┘
All five components are SQL materialized views that update automatically. Let's build each one.
Prerequisites
- RisingWave running (Docker, Kubernetes, or RisingWave Cloud)
- Kafka cluster with trade events
- psql or any PostgreSQL client
Connect to RisingWave:
psql -h localhost -p 4566 -d dev -U root
Step 1: Define the Trade Stream
Trades arrive via Kafka. Each trade contains the market, user, side (YES/NO), quantity, price, and timestamp.
CREATE SOURCE trades_source (
trade_id VARCHAR,
market_id VARCHAR,
user_id VARCHAR,
side VARCHAR, -- 'YES' or 'NO'
quantity DECIMAL, -- Number of contracts
price DECIMAL, -- Price per contract (0.01 - 0.99)
trade_time TIMESTAMP WITH TIME ZONE
) WITH (
connector = 'kafka',
topic = 'prediction-market-trades',
properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;
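For reference, a producer writing to the prediction-market-trades topic would emit JSON values matching this schema. A minimal sketch (plain Python, no Kafka client shown; the field names mirror the trades_source columns above, everything else is illustrative):

```python
import json
from datetime import datetime, timezone

def make_trade_event(trade_id, market_id, user_id, side, quantity, price):
    """Build a trade event shaped like the trades_source schema (PLAIN JSON)."""
    assert side in ("YES", "NO")
    assert 0.01 <= price <= 0.99  # contracts trade between 1 and 99 cents
    return {
        "trade_id": trade_id,
        "market_id": market_id,
        "user_id": user_id,
        "side": side,
        "quantity": quantity,
        "price": price,
        "trade_time": datetime.now(timezone.utc).isoformat(),
    }

# Serialized value that would be published to 'prediction-market-trades'
event = make_trade_event("t-001", "btc-100k-june", "trader_42", "YES", 100, 0.62)
payload = json.dumps(event)
```

The oracle-resolutions topic follows the same pattern with the oracle_feed columns.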
Step 2: Define the Oracle Feed
Oracles publish event resolutions — the final outcome of each market.
CREATE SOURCE oracle_feed (
market_id VARCHAR,
outcome VARCHAR, -- 'YES' or 'NO'
resolution_time TIMESTAMP WITH TIME ZONE,
oracle_source VARCHAR -- 'uma', 'kalshi-internal', etc.
) WITH (
connector = 'kafka',
topic = 'oracle-resolutions',
properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;
Step 3: Define Market Metadata
Static market information stored as a RisingWave table:
CREATE TABLE markets (
market_id VARCHAR PRIMARY KEY,
question VARCHAR, -- "Will BTC exceed $100K by June 2026?"
category VARCHAR, -- "crypto", "politics", "sports"
created_at TIMESTAMP WITH TIME ZONE,
closes_at TIMESTAMP WITH TIME ZONE,
status VARCHAR DEFAULT 'open' -- 'open', 'closed', 'resolved'
);
-- Insert some markets
INSERT INTO markets VALUES
('btc-100k-june', 'Will BTC exceed $100K by June 2026?', 'crypto',
'2026-01-01', '2026-06-30', 'open'),
('fed-rate-cut-q2', 'Will the Fed cut rates in Q2 2026?', 'economics',
'2026-01-15', '2026-06-30', 'open'),
('warriors-championship', 'Will the Warriors win the 2026 NBA Championship?', 'sports',
'2025-10-01', '2026-06-20', 'open');
Step 4: Real-Time Market Pricing
This is the core — a materialized view that shows live pricing for every market:
CREATE MATERIALIZED VIEW market_pricing AS
SELECT
t.market_id,
m.question,
m.category,
-- Current YES price (= implied probability)
last_value(t.price ORDER BY t.trade_time)
FILTER (WHERE t.side = 'YES') as yes_price,
-- Current NO price
last_value(t.price ORDER BY t.trade_time)
FILTER (WHERE t.side = 'NO') as no_price,
-- 24-hour metrics
SUM(t.quantity)
FILTER (WHERE t.trade_time > NOW() - INTERVAL '24 hours') as volume_24h,
SUM(t.quantity * t.price)
FILTER (WHERE t.trade_time > NOW() - INTERVAL '24 hours') as notional_24h,
COUNT(*)
FILTER (WHERE t.trade_time > NOW() - INTERVAL '24 hours') as trades_24h,
-- Price range
MIN(t.price) FILTER (WHERE t.trade_time > NOW() - INTERVAL '24 hours') as low_24h,
MAX(t.price) FILTER (WHERE t.trade_time > NOW() - INTERVAL '24 hours') as high_24h,
-- Freshness
MAX(t.trade_time) as last_trade_time
FROM trades_source t
JOIN markets m ON t.market_id = m.market_id
GROUP BY t.market_id, m.question, m.category;
Query it:
SELECT market_id, question, yes_price, volume_24h, trades_24h
FROM market_pricing
ORDER BY volume_24h DESC;
market_id             | question                                          | yes_price | volume_24h | trades_24h
----------------------+---------------------------------------------------+-----------+------------+-----------
btc-100k-june         | Will BTC exceed $100K by June 2026?               | 0.62      | 45230      | 3841
fed-rate-cut-q2       | Will the Fed cut rates in Q2 2026?                | 0.43      | 28100      | 2105
warriors-championship | Will the Warriors win the 2026 NBA Championship?  | 0.08      | 5670       | 412
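Because each contract pays $1 if it resolves in your favor, the YES price doubles as the market's implied probability, and its reciprocal is the decimal-odds-style payout multiple. A quick sanity check of that conversion (plain Python, illustrative only):

```python
def implied_probability(yes_price: float) -> float:
    """A YES contract priced at p pays $1 on YES, so p is the implied probability."""
    return yes_price

def payout_multiple(price: float) -> float:
    """Decimal-odds-style multiple: a winning stake returns stake * (1 / price)."""
    return 1.0 / price

# BTC market trading at 0.62: 62% implied probability, ~1.61x return on a win
assert implied_probability(0.62) == 0.62
assert round(payout_multiple(0.62), 2) == 1.61
```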
Step 5: User Position Tracking
Track every user's open positions with average entry price:
CREATE MATERIALIZED VIEW user_positions AS
SELECT
user_id,
market_id,
side,
SUM(quantity) as position_size,
-- Weighted average entry price
CASE
WHEN SUM(quantity) > 0
THEN SUM(quantity * price) / SUM(quantity)
ELSE 0
END as avg_entry_price,
SUM(quantity * price) as total_cost,
COUNT(*) as trade_count,
MIN(trade_time) as first_trade,
MAX(trade_time) as last_trade
FROM trades_source
GROUP BY user_id, market_id, side;
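The weighted-average arithmetic in that view is easy to verify offline. This sketch replicates the aggregation for a single (user, market, side) key (standalone Python, not RisingWave code):

```python
def position_summary(fills):
    """Replicate user_positions' aggregation for one (user, market, side) key.

    fills: list of (quantity, price) tuples from the trade stream.
    """
    size = sum(q for q, _ in fills)
    cost = sum(q * p for q, p in fills)
    # Weighted average entry price: total cost / total contracts
    avg_entry = cost / size if size > 0 else 0.0
    return {"position_size": size, "avg_entry_price": avg_entry, "total_cost": cost}

# Buy 100 @ 0.60, then 50 @ 0.66 -> 150 contracts at an average entry of 0.62
summary = position_summary([(100, 0.60), (50, 0.66)])
```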
Step 6: Real-Time P&L
Join positions with live market prices to compute unrealized P&L:
CREATE MATERIALIZED VIEW user_pnl AS
SELECT
p.user_id,
p.market_id,
m.question,
p.side,
p.position_size,
p.avg_entry_price,
-- Current market price for this side
CASE
WHEN p.side = 'YES' THEN mp.yes_price
ELSE mp.no_price
END as current_price,
-- Unrealized P&L
CASE
WHEN p.side = 'YES'
THEN p.position_size * (mp.yes_price - p.avg_entry_price)
ELSE p.position_size * (mp.no_price - p.avg_entry_price)
END as unrealized_pnl,
-- P&L percentage
CASE
WHEN p.avg_entry_price > 0
THEN ((CASE WHEN p.side = 'YES' THEN mp.yes_price ELSE mp.no_price END)
- p.avg_entry_price) / p.avg_entry_price * 100
ELSE 0
END as pnl_pct
FROM user_positions p
JOIN market_pricing mp ON p.market_id = mp.market_id
JOIN markets m ON p.market_id = m.market_id;
Query a user's portfolio:
SELECT market_id, side, position_size, avg_entry_price,
current_price, unrealized_pnl, pnl_pct
FROM user_pnl
WHERE user_id = 'trader_42'
ORDER BY ABS(unrealized_pnl) DESC;
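To sanity-check the view's arithmetic, the same mark-to-market formula can be reproduced in a few lines (a standalone sketch, numbers are illustrative):

```python
def unrealized_pnl(position_size, avg_entry_price, current_price):
    """Mark-to-market P&L, mirroring the user_pnl view's formula."""
    pnl = position_size * (current_price - avg_entry_price)
    pnl_pct = ((current_price - avg_entry_price) / avg_entry_price * 100
               if avg_entry_price > 0 else 0.0)
    return pnl, pnl_pct

# 150 YES contracts entered at an average of 0.62, market now trading at 0.70
pnl, pct = unrealized_pnl(150, 0.62, 0.70)
```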
Step 7: Settlement Engine
The settlement engine is a streaming join between trades and oracle resolutions. When an oracle publishes a result, payouts are computed instantly for every position:
CREATE MATERIALIZED VIEW settlements AS
SELECT
t.user_id,
t.market_id,
t.side,
t.quantity,
t.price as entry_price,
o.outcome,
o.resolution_time,
-- Payout calculation
CASE
WHEN t.side = o.outcome
THEN t.quantity * (1.0 - t.price) -- Winner: receives $1 - entry price per contract
ELSE -(t.quantity * t.price) -- Loser: loses entry price per contract
END as payout
FROM trades_source t
JOIN oracle_feed o ON t.market_id = o.market_id;
How it works: Before an oracle resolution arrives, this view is empty for that market. The moment the oracle publishes {market_id: "btc-100k-june", outcome: "YES"}, the streaming join instantly matches it against every trade in that market and computes payouts. This is the "fan-out" — one oracle event triggers settlement for potentially hundreds of thousands of positions.
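The fan-out and the payout arithmetic can be modeled in a few lines. This sketch applies one oracle outcome to every trade in a market, exactly as the streaming join does (plain Python, illustrative data):

```python
def settle(trades, outcome):
    """Fan-out settlement: one oracle outcome applied to every trade in a market.

    trades: list of dicts with user_id, side, quantity, price.
    Winners receive $1 - entry price per contract; losers forfeit the entry price.
    """
    payouts = []
    for t in trades:
        if t["side"] == outcome:
            payout = t["quantity"] * (1.0 - t["price"])
        else:
            payout = -(t["quantity"] * t["price"])
        payouts.append({"user_id": t["user_id"], "payout": payout})
    return payouts

# A matched book: 100 YES @ 0.62 against 100 NO @ 0.38, market resolves YES
trades = [
    {"user_id": "trader_42", "side": "YES", "quantity": 100, "price": 0.62},
    {"user_id": "trader_7",  "side": "NO",  "quantity": 100, "price": 0.38},
]
results = settle(trades, "YES")  # trader_42 gains 38.00, trader_7 loses 38.00
```

Note that for a matched book (YES at p against NO at 1 - p), winner gains and loser losses cancel, so the platform carries no settlement risk on matched volume.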
Step 8: User Balance Ledger
Aggregate all cash flows into a unified balance. (This assumes a deposits source or table with user_id and amount columns, defined alongside the sources above.)
CREATE MATERIALIZED VIEW user_balances AS
SELECT
user_id,
SUM(amount) as balance
FROM (
-- Cash deposits
SELECT user_id, amount FROM deposits
UNION ALL
-- Cost of trades (money spent buying contracts)
SELECT user_id, -(quantity * price) as amount FROM trades_source
UNION ALL
    -- Gross settlement proceeds: $1 per winning contract. (The settlements
    -- view's payout column is net P&L; adding it here after already deducting
    -- the trade cost above would double-count the entry price.)
    SELECT user_id,
           CASE WHEN side = outcome THEN quantity ELSE 0 END as amount
    FROM settlements
) all_flows
GROUP BY user_id;
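The ledger arithmetic is worth checking offline: deposits, minus cash spent buying contracts, plus $1 per winning contract at resolution. A standalone sketch with illustrative data:

```python
from collections import defaultdict

def compute_balances(deposits, trades, resolutions):
    """Ledger sketch: deposits - trade costs + gross settlement proceeds.

    deposits:    list of (user_id, amount)
    trades:      list of (user_id, market_id, side, quantity, price)
    resolutions: dict mapping market_id -> winning outcome ('YES' or 'NO')
    """
    balance = defaultdict(float)
    for user_id, amount in deposits:
        balance[user_id] += amount
    for user_id, market_id, side, quantity, price in trades:
        balance[user_id] -= quantity * price        # cash spent on contracts
        if market_id in resolutions and side == resolutions[market_id]:
            balance[user_id] += quantity            # $1 per winning contract
    return dict(balance)

# trader_42 deposits $100, buys 100 YES @ 0.62, market resolves YES -> $138
balances = compute_balances(
    [("trader_42", 100.0)],
    [("trader_42", "btc-100k-june", "YES", 100, 0.62)],
    {"btc-100k-june": "YES"},
)
```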
Step 9: Risk Monitoring
Real-time risk dashboard for platform operators:
CREATE MATERIALIZED VIEW platform_risk AS
SELECT
-- Per-market risk
market_id,
SUM(position_size) FILTER (WHERE side = 'YES') as total_yes_exposure,
SUM(position_size) FILTER (WHERE side = 'NO') as total_no_exposure,
-- Imbalance (risk to the platform)
ABS(
SUM(position_size) FILTER (WHERE side = 'YES') -
SUM(position_size) FILTER (WHERE side = 'NO')
) as position_imbalance,
COUNT(DISTINCT user_id) as unique_traders,
MAX(position_size) as largest_single_position
FROM user_positions
GROUP BY market_id;
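The imbalance metric is the platform's directional exposure: if far more YES than NO contracts are open, the house (or liquidity providers) loses on a YES resolution. A quick model of the per-market computation (plain Python, illustrative positions):

```python
def market_risk(positions):
    """Per-market risk metrics, mirroring platform_risk.

    positions: list of (user_id, side, size) tuples for one market.
    """
    yes = sum(size for _, side, size in positions if side == "YES")
    no = sum(size for _, side, size in positions if side == "NO")
    return {
        "total_yes_exposure": yes,
        "total_no_exposure": no,
        "position_imbalance": abs(yes - no),  # unmatched directional exposure
        "unique_traders": len({user for user, _, _ in positions}),
        "largest_single_position": max(size for _, _, size in positions),
    }

risk = market_risk([("a", "YES", 500), ("b", "YES", 200), ("c", "NO", 600)])
```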
Step 10: Connect Your Frontend
Since RisingWave speaks PostgreSQL, connect any frontend framework directly:
// Node.js API endpoint
const { Pool } = require('pg');
const pool = new Pool({
host: 'risingwave-host',
port: 4566,
database: 'dev',
user: 'root'
});
// GET /api/markets — live market data
app.get('/api/markets', async (req, res) => {
const result = await pool.query(`
SELECT market_id, question, category, yes_price,
volume_24h, trades_24h, last_trade_time
FROM market_pricing
ORDER BY volume_24h DESC
LIMIT 50
`);
res.json(result.rows);
});
// GET /api/portfolio/:userId — user's live P&L
app.get('/api/portfolio/:userId', async (req, res) => {
const result = await pool.query(`
SELECT market_id, side, position_size,
avg_entry_price, current_price, unrealized_pnl
FROM user_pnl
WHERE user_id = $1
`, [req.params.userId]);
res.json(result.rows);
});
What You've Built
With ~100 lines of SQL, you now have:
| Component | SQL View | Updates |
| Live odds/pricing | market_pricing | Every trade (milliseconds) |
| User positions | user_positions | Every trade |
| Real-time P&L | user_pnl | Every trade + price change |
| Instant settlement | settlements | On oracle resolution |
| User balances | user_balances | Every deposit/trade/settlement |
| Risk monitoring | platform_risk | Every position change |
All views update automatically. No cron jobs, no batch processing, no manual refresh.
Why a Streaming Database Instead of a Traditional Stack
| Aspect | Traditional Stack | RisingWave |
| Components | Kafka + Flink + Redis + PostgreSQL | RisingWave (single system) |
| Languages | Java (Flink) + Python (API) + SQL (DB) | SQL only |
| Odds update latency | Seconds (multi-hop) | Milliseconds |
| Settlement | Custom batch job | Streaming join (automatic) |
| Serving | Separate Redis cache + PostgreSQL | Built-in (PostgreSQL protocol) |
| Ops overhead | 4 systems to monitor | 1 system |
Frequently Asked Questions
Can I build a production prediction market with just SQL?
The data infrastructure layer — pricing, positions, P&L, settlement, risk — can be built entirely in SQL with RisingWave. You'll still need application-layer components: user authentication, order validation, regulatory compliance, and frontend. But the real-time data backbone is pure SQL.
How does settlement scale to millions of positions?
Settlement in RisingWave is a streaming join between trades and oracle resolutions. When an oracle publishes a result, the join evaluates against all matching trades in parallel across RisingWave's distributed compute nodes. State is stored on S3, so settlement isn't bounded by local memory, and it typically completes in seconds even for very large position counts.
What latency can I expect for odds updates?
RisingWave materialized views update within milliseconds of new data arriving. The market_pricing view reflects the latest trade typically under 100 ms after the trade lands in the Kafka topic, and point queries against the view return in 10-20 ms at p99.
Can I use this for sports betting odds?
Yes. The same architecture applies to sports betting: the trade stream represents bets, market pricing calculates live odds, and the oracle feed delivers game results. The main difference is data source — sports odds typically come from external feeds rather than an internal order book. RisingWave can ingest these feeds via Kafka or direct API integration.
Do I need Kafka for this architecture?
Kafka is the most common source for trade events, but RisingWave also supports direct ingestion from other sources. For a production prediction market, Kafka provides the durability and replay capability that's important for trade audit trails. For prototyping, you can insert trades directly into RisingWave tables.