Real-time crypto price monitoring means tracking price movements across trading pairs like BTC/USDT and ETH/USDT the moment they happen — not minutes later. With RisingWave, a PostgreSQL-compatible streaming database, you can write standard SQL to build VWAP calculators, spread monitors, and price alert systems that update in milliseconds as market data arrives.
Why Price Monitoring Matters in Crypto
Crypto markets never sleep. Prices on Binance, Coinbase, and Kraken can diverge by meaningful percentages within seconds, and a slow monitoring system means missed opportunities or undetected risk.
Traditional batch pipelines aggregate prices on a schedule — every minute, every five minutes. By the time your dashboard updates, the market has already moved. A flash crash that bottoms out in 30 seconds looks like a blip in your hourly candles.
Real-time monitoring changes the picture entirely:
- Traders get alerts the moment a pair breaks a key level
- Risk systems detect abnormal spreads before positions become dangerous
- Analytics teams compute accurate VWAP and volume-weighted metrics without waiting for batch jobs
- Compliance teams catch unusual price behavior patterns as they form
The challenge is infrastructure. Streaming market data from multiple exchanges produces millions of events per hour. You need a system that can ingest that volume, maintain continuously updated aggregations, and let you query them with low latency — all without rewriting everything in Java.
How Streaming SQL Solves This
RisingWave treats incoming market data as an infinite stream of rows. You define a CREATE SOURCE that connects to your Kafka topic where exchange data lands, then build MATERIALIZED VIEW objects that maintain running aggregations. Every time a new price event arrives, RisingWave updates only the affected rows in your materialized views — not the entire dataset.
This incremental computation model means VWAP calculations, OHLCV candles, and spread monitors stay current without full recomputation. Query them like ordinary PostgreSQL tables.
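To see why incremental maintenance is cheap, here is a minimal Python sketch of the idea (not RisingWave internals): each new trade updates two running sums behind a VWAP instead of rescanning history.

```python
# Minimal sketch of incremental VWAP maintenance: each event updates
# two running sums rather than recomputing over all past trades.
class IncrementalVwap:
    def __init__(self):
        self.pv_sum = 0.0   # running sum of price * volume
        self.vol_sum = 0.0  # running sum of volume

    def on_trade(self, price: float, volume: float) -> float:
        self.pv_sum += price * volume
        self.vol_sum += volume
        return self.pv_sum / self.vol_sum  # current VWAP

vwap = IncrementalVwap()
vwap.on_trade(100.0, 2.0)          # VWAP = 100.0
print(vwap.on_trade(110.0, 2.0))   # (200 + 220) / 4 = 105.0
```

A materialized view applies the same principle per group and per window, which is why a new event touches only the affected rows.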
Building It Step by Step
Step 1: Connect the Data Source
CREATE SOURCE market_events (
exchange VARCHAR,
trading_pair VARCHAR,
price NUMERIC,
volume NUMERIC,
bid NUMERIC,
ask NUMERIC,
event_type VARCHAR,
event_time TIMESTAMPTZ,
-- Optional: bound out-of-order events so windowed views can close
WATERMARK FOR event_time AS event_time - INTERVAL '5 SECONDS'
) WITH (
connector = 'kafka',
topic = 'crypto.market.events',
properties.bootstrap.server = 'kafka:9092',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;
This source connects to a Kafka topic receiving normalized trade and quote events from exchange WebSocket feeds. The schema covers the fields you need for price, volume, and spread calculations across all pairs.
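For reference, a normalized trade event on that topic is a JSON object whose keys mirror the CREATE SOURCE columns. The values below are illustrative, not real market data:

```python
import json

# Illustrative trade event matching the market_events schema above.
trade_event = {
    "exchange": "binance",
    "trading_pair": "BTC/USDT",
    "price": 67250.5,
    "volume": 0.042,
    "bid": 67250.0,
    "ask": 67251.0,
    "event_type": "trade",
    "event_time": "2024-05-01T12:00:00.123Z",
}

payload = json.dumps(trade_event)  # what a producer would write to Kafka
print(payload)
```

Quote events use the same shape with event_type set to 'quote'; the views below filter on that field.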
Step 2: Build the Core Materialized View
-- 1-minute OHLCV candles with VWAP per exchange and trading pair
CREATE MATERIALIZED VIEW ohlcv_1min AS
SELECT
exchange,
trading_pair,
window_start,
window_end,
first_value(price ORDER BY event_time) AS open,
MAX(price) AS high,
MIN(price) AS low,
last_value(price ORDER BY event_time) AS close,
SUM(volume) AS total_volume,
SUM(price * volume) / NULLIF(SUM(volume), 0) AS vwap
FROM TUMBLE(market_events, event_time, INTERVAL '1 MINUTE')
WHERE event_type = 'trade'
GROUP BY exchange, trading_pair, window_start, window_end;
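The vwap column is SUM(price * volume) / SUM(volume) over each one-minute window. A quick sanity check of that formula with three sample trades:

```python
# Worked VWAP example for one 1-minute window with three trades.
trades = [(100.0, 1.0), (102.0, 3.0), (101.0, 2.0)]  # (price, volume)

pv = sum(p * v for p, v in trades)   # 100 + 306 + 202 = 608
vol = sum(v for _, v in trades)      # 6
vwap = pv / vol if vol else None     # mirrors NULLIF(SUM(volume), 0)
print(round(vwap, 4))                # 101.3333
```

Note that VWAP weights by traded volume, so the heavily traded 102.0 print pulls the result above the simple average of 101.0.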
-- Real-time bid/ask spread tracker (sliding window)
CREATE MATERIALIZED VIEW live_spread AS
SELECT
exchange,
trading_pair,
AVG(ask - bid) AS avg_spread,
AVG((ask - bid) / NULLIF(bid, 0)) * 100 AS avg_spread_pct,
MAX(ask - bid) AS max_spread,
MIN(price) AS min_price,
MAX(price) AS max_price,
COUNT(*) AS quote_count,
MAX(event_time) AS last_update
FROM HOP(market_events, event_time, INTERVAL '5 SECONDS', INTERVAL '1 MINUTE')
WHERE event_type = 'quote'
GROUP BY exchange, trading_pair;
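The spread metrics are per-quote arithmetic over the sliding window. The same calculation over a small batch of illustrative quotes:

```python
# Average bid/ask spread (absolute and percent) over a window of quotes,
# mirroring the live_spread view's AVG(ask - bid) and spread-percent logic.
quotes = [(67250.0, 67251.0), (67249.5, 67251.5), (67250.0, 67252.0)]  # (bid, ask)

spreads = [ask - bid for bid, ask in quotes]
avg_spread = sum(spreads) / len(spreads)  # (1 + 2 + 2) / 3
avg_spread_pct = sum((ask - bid) / bid for bid, ask in quotes if bid) / len(quotes) * 100

print(round(avg_spread, 4))  # 1.6667
print(avg_spread_pct)        # a small fraction of a percent at these prices
```

The HOP window re-evaluates this every 5 seconds over the trailing minute, so these averages slide forward continuously rather than resetting at minute boundaries.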
Step 3: Add Alerts and Detection Logic
-- Alert when spread exceeds 3x the 24-hour average
CREATE MATERIALIZED VIEW spread_alerts AS
SELECT
ls.exchange,
ls.trading_pair,
ls.avg_spread_pct AS current_spread_pct,
baseline.avg_spread_pct AS baseline_spread_pct,
ls.last_update AS alert_time
FROM live_spread ls
JOIN (
SELECT
exchange,
trading_pair,
window_start,
window_end,
AVG((ask - bid) / NULLIF(bid, 0)) * 100 AS avg_spread_pct
FROM TUMBLE(market_events, event_time, INTERVAL '24 HOURS')
WHERE event_type = 'quote'
GROUP BY exchange, trading_pair, window_start, window_end
) baseline
ON ls.exchange = baseline.exchange
AND ls.trading_pair = baseline.trading_pair
-- Match the 24-hour window the live reading falls in, so each
-- pair joins exactly one baseline row instead of every past window
AND ls.last_update >= baseline.window_start
AND ls.last_update < baseline.window_end
WHERE ls.avg_spread_pct > baseline.avg_spread_pct * 3;
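The alert condition itself is a simple multiplicative threshold. The same check in application code, with the 3x factor used above:

```python
# Same alert rule as spread_alerts: fire when the live spread percent
# exceeds 3x the 24-hour baseline for that exchange/pair.
def spread_alert(current_pct: float, baseline_pct: float, factor: float = 3.0) -> bool:
    return current_pct > baseline_pct * factor

print(spread_alert(0.09, 0.02))  # 0.09 > 0.06 -> True
print(spread_alert(0.05, 0.02))  # 0.05 <= 0.06 -> False
```

Keeping the rule inside the materialized view means the check runs incrementally on every quote, not on a polling schedule.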
-- Sink alerts to Kafka for downstream consumers
CREATE SINK spread_alert_sink AS
SELECT * FROM spread_alerts
WITH (
connector = 'kafka',
topic = 'crypto.alerts.spread',
properties.bootstrap.server = 'kafka:9092',
-- The view can update and retract rows; emit an append-only stream for PLAIN encoding
force_append_only = 'true'
) FORMAT PLAIN ENCODE JSON;
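Downstream consumers read the alert topic as plain JSON. A sketch of handling one such message; the field names follow spread_alerts, the sample values are made up, and the Kafka client itself is omitted:

```python
import json

# An alert message as the Kafka sink would encode it (illustrative values).
raw = ('{"exchange": "kraken", "trading_pair": "ETH/USDT", '
       '"current_spread_pct": 0.12, "baseline_spread_pct": 0.03, '
       '"alert_time": "2024-05-01T12:03:05Z"}')

alert = json.loads(raw)
severity = alert["current_spread_pct"] / alert["baseline_spread_pct"]
print(f"{alert['trading_pair']} on {alert['exchange']}: {severity:.1f}x baseline")
```

From here the consumer can page an on-call trader, post to a chat channel, or feed a risk engine, all without touching the SQL layer.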
Step 4: Query for Real-Time Insights
-- Top 10 most active pairs by volume in the last minute
SELECT
exchange,
trading_pair,
total_volume,
vwap,
ROUND(((close - open) / NULLIF(open, 0)) * 100, 4) AS pct_change
FROM ohlcv_1min
WHERE window_end = (SELECT MAX(window_end) FROM ohlcv_1min)
ORDER BY total_volume DESC
LIMIT 10;
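The pct_change expression rounds (close - open) / open to four decimal places, and the query is just a sort-and-limit over the latest window. The equivalent logic in Python over made-up sample rows:

```python
# Sample ohlcv_1min rows: (exchange, pair, total_volume, open, close)
rows = [
    ("binance",  "BTC/USDT", 412.5, 67000.0, 67150.0),
    ("coinbase", "ETH/USDT", 980.2,  3100.0,  3085.5),
    ("kraken",   "SOL/USDT", 150.0,   142.0,   142.0),
]

# Rank by volume descending, keep the top 10, compute percent change.
top = sorted(rows, key=lambda r: r[2], reverse=True)[:10]
for exch, pair, vol, o, c in top:
    pct_change = round((c - o) / o * 100, 4) if o else None  # mirrors NULLIF(open, 0)
    print(exch, pair, vol, pct_change)
```

Against the materialized view, this ordering happens server-side on already-maintained rows, so the query cost is independent of how many raw ticks arrived.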
-- Current spread comparison across exchanges for BTC/USDT
SELECT
exchange,
avg_spread_pct,
max_spread,
quote_count,
last_update
FROM live_spread
WHERE trading_pair = 'BTC/USDT'
ORDER BY avg_spread_pct ASC;
Comparison: Batch vs Streaming
| Aspect | Batch ETL | Streaming SQL (RisingWave) |
| --- | --- | --- |
| Latency | Minutes to hours | Sub-second |
| VWAP Accuracy | Delayed computation | Continuously updated |
| Spread Monitoring | Periodic snapshots | Real-time per-quote |
| Alert Speed | Next batch run | Milliseconds |
| Infrastructure | Spark/Flink + scheduler | Single SQL interface |
| Historical Replay | Supported | Supported via Kafka offset |
FAQ
Q: Can RisingWave handle data from multiple exchanges simultaneously?
Yes. You can create a single Kafka source topic that aggregates feeds from Binance, Coinbase, Kraken, and others — each event tagged with the exchange field. Your materialized views then group by exchange and trading_pair, giving you per-exchange and cross-exchange aggregations from a single source definition. Alternatively, create one source per exchange and union them in your views.
Q: How does VWAP stay accurate with out-of-order events?
RisingWave uses watermarks to handle out-of-order event time in windowed aggregations. You declare the allowed lateness on the source (for example, WATERMARK FOR event_time AS event_time - INTERVAL '5 SECONDS'), and late-arriving events within that bound are incorporated into the correct TUMBLE or HOP window; events older than the watermark are discarded.
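The effect of a watermark can be sketched as simple bookkeeping over event times: the watermark trails the maximum observed event time by the allowed lateness, and events older than the watermark are too late for their window. This is a deliberate simplification of what the database does:

```python
from datetime import datetime, timedelta

LATENESS = timedelta(seconds=5)  # mirrors event_time - INTERVAL '5 SECONDS'
max_event_time = datetime(1970, 1, 1)

def accept(event_time: datetime) -> bool:
    """True if the event is newer than the current watermark."""
    global max_event_time
    watermark = max_event_time - LATENESS
    max_event_time = max(max_event_time, event_time)
    return event_time >= watermark

t0 = datetime(2024, 5, 1, 12, 0, 0)
print(accept(t0))                             # on time -> True
print(accept(t0 - timedelta(seconds=3)))      # slightly late, within bound -> True
print(accept(t0 - timedelta(seconds=10)))     # older than watermark -> False
```

Choosing the lateness bound is a trade-off: a larger bound tolerates slower feeds but delays when windows can be considered final.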
Q: Can I query tick-level data or only aggregated windows?
Both. You can query market_events directly for the latest N ticks, and simultaneously query materialized views for aggregated OHLCV or VWAP. RisingWave stores the source data and incrementally maintains the derived views — there's no recomputation cost when you switch between granularities.
Key Takeaways
- RisingWave connects directly to Kafka market data feeds using CREATE SOURCE, with no additional ETL code
- TUMBLE windows produce fixed OHLCV candles; HOP windows power rolling spread and volume calculations
- Materialized views update incrementally: adding a new trade updates only the affected window row
- Alerts sink back to Kafka for integration with trading systems, notification services, or dashboards
- The full stack uses standard PostgreSQL-compatible SQL — no new language to learn
Ready to try this? Get started with RisingWave. Join our Slack community.

