In the fast-paced world of cryptocurrency trading, having access to real-time, accurate data isn't just a competitive edge—it's a necessity. Traders, platforms, and analysts all rely on up-to-the-second information to make critical decisions. One of the most fundamental tools for this is the candlestick chart, which visualizes price movements through Open, High, Low, Close, and Volume (OHLCV) data.
But building a scalable, efficient, and correct system to generate these charts from a high-velocity stream of raw trades is a significant engineering challenge. You need to handle massive data volumes, process events with low latency, and account for real-world complexities like out-of-order data and inactive trading pairs.
This technical guide provides a complete playbook for building a robust candlestick charting pipeline using RisingWave, a streaming database designed for real-time analytics. We'll walk through the entire process, from ingesting raw trades to serving multi-granularity candlestick data, and we'll tackle common production challenges along the way.
What You'll Learn (TL;DR)
Generate multi-granularity OHLCV (1s, 1m, 5m, 1h, 1d, etc.) with a single set of streaming SQL queries.
Correctly handle out-of-order and late trade events using watermarks.
Optimize for millions of sparse trading pairs by computing only for active pairs, saving over 99% of resources.
Use Materialized View-on-Materialized View (MV-on-MV) for hierarchical rollups to avoid repeatedly scanning raw trade data.
Push the results to a charting service or API via a Kafka sink.
Data Model and Input Assumptions
We'll assume we have a stream of individual trades from a source like Kafka. Each trade event in the trades stream has the following fields:
pair_id: The internal ID of the trading pair (e.g., an integer key for ETH/USDC).
symbol: The display name of the pair (optional).
ts: The trade execution timestamp (event time).
price: The execution price.
qty: The trade quantity.
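For reference, a single trade event on the topic might look like the following. The field names match the source definition below; the values (and the timestamp format) are purely illustrative.
{
  "pair_id": 1027,
  "symbol": "ETH/USDC",
  "ts": "2024-05-01 12:00:00.123",
  "price": 3012.45,
  "qty": 0.25
}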
Creating a Source (with a Watermark)
First, let's connect to the Kafka topic containing the trade data. We'll define a SOURCE in RisingWave and specify a watermark on the event-time column. The watermark tells the stream processor how long to wait for late-arriving events before a window can be considered complete.
CREATE SOURCE trades_src (
pair_id BIGINT,
symbol VARCHAR,
ts TIMESTAMP,
price DOUBLE PRECISION,
qty DOUBLE PRECISION,
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
connector = 'kafka',
topic = 'trades',
properties.bootstrap.servers = 'kafka:9092',
scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;
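If you want to experiment without a Kafka cluster, one option (a sketch, not part of the production pipeline) is to define trades_src as a regular TABLE with the same schema instead of a SOURCE and INSERT a few test trades by hand; every downstream view in this guide works the same way. The watermark clause is omitted here for simplicity.
-- Local-testing alternative: a plain table you can INSERT into.
CREATE TABLE trades_src (
pair_id BIGINT,
symbol VARCHAR,
ts TIMESTAMP,
price DOUBLE PRECISION,
qty DOUBLE PRECISION
);
INSERT INTO trades_src VALUES
(1027, 'ETH/USDC', '2024-05-01 12:00:00.100', 3012.45, 0.25),
(1027, 'ETH/USDC', '2024-05-01 12:00:00.800', 3012.60, 0.10);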
Fine-Grained Candlesticks (1-Second OHLCV)
We'll start by aggregating trades into the finest granularity we need, such as 1-second candlesticks. All coarser granularities (like 1-minute or 1-hour) will be built on top of this base layer. This hierarchical approach, often called "MV-on-MV," is highly efficient as it avoids re-processing the raw trade stream for each granularity.
CREATE MATERIALIZED VIEW kline_1s AS
SELECT
pair_id,
window_start AS bucket_start,
first_value(price ORDER BY ts) AS open,
MAX(price) AS high,
MIN(price) AS low,
last_value(price ORDER BY ts) AS close,
SUM(qty) AS volume,
SUM(price * qty) AS turnover,
COUNT(*) AS trade_count
FROM TUMBLE(trades_src, ts, INTERVAL '1 second')
GROUP BY
pair_id,
window_start;
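To sanity-check the base layer, you can query it like any other table. The pair_id below is a hypothetical value:
SELECT *
FROM kline_1s
WHERE pair_id = 1027
ORDER BY bucket_start DESC
LIMIT 10;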
Hierarchical Rollups (1-Minute, 5-Minute, 1-Hour…)
Now we can efficiently create higher-level aggregations by querying the kline_1s materialized view we just created. This MV-on-MV approach is a cornerstone of efficient stream processing: each layer reads only the pre-aggregated output of the layer below it, so you can build complex, multi-stage pipelines without the high cost of re-scanning the original source data.
From 1-Second to 1-Minute
CREATE MATERIALIZED VIEW kline_1m AS
SELECT
pair_id,
window_start AS bucket_start,
first_value(open ORDER BY bucket_start) AS open,
MAX(high) AS high,
MIN(low) AS low,
last_value(close ORDER BY bucket_start) AS close,
SUM(volume) AS volume,
SUM(turnover) AS turnover,
SUM(trade_count) AS trades -- roll the per-second trade counts up into the 1-minute candle
FROM TUMBLE(kline_1s, bucket_start, INTERVAL '1 minute')
GROUP BY
pair_id,
window_start;
From 1-Minute to 5-Minute, 1-Hour, etc.
We can continue this pattern to roll up to any desired granularity.
CREATE MATERIALIZED VIEW kline_5m AS
SELECT
pair_id,
window_start AS bucket_start,
-- Get the 'open' from the first 1m candle in this 5-minute window
first_value(open ORDER BY bucket_start) AS open,
-- The 5-minute high is the max of all 1m highs in the window
MAX(high) AS high,
-- The 5-minute low is the min of all 1m lows in the window
MIN(low) AS low,
-- Get the 'close' from the last 1m candle in this 5-minute window
last_value(close ORDER BY bucket_start) AS close,
-- Sum the volumes and turnovers from the five 1m candles
SUM(volume) AS volume,
SUM(turnover) AS turnover,
SUM(trade_count) AS trades
FROM TUMBLE(kline_1m, bucket_start, INTERVAL '5 minutes')
GROUP BY
pair_id,
window_start;
CREATE MATERIALIZED VIEW kline_1h AS
SELECT
pair_id,
window_start AS bucket_start,
-- Get the 'open' from the first 5m candle in this 1-hour window
first_value(open ORDER BY bucket_start) AS open,
-- The 1-hour high is the max of all 5m highs in the window
MAX(high) AS high,
-- The 1-hour low is the min of all 5m lows in the window
MIN(low) AS low,
-- Get the 'close' from the last 5m candle in this 1-hour window
last_value(close ORDER BY bucket_start) AS close,
-- Sum the volumes and turnovers from the twelve 5m candles
SUM(volume) AS volume,
SUM(turnover) AS turnover,
SUM(trade_count) AS trades
FROM TUMBLE(kline_5m, bucket_start, INTERVAL '1 hour')
GROUP BY
pair_id,
window_start;
Enriching the Data (VWAP and Trade Count)
We can create another materialized view that enriches the 1-minute candlesticks with the volume-weighted average price (VWAP). The trade count is already carried through from kline_1m, so VWAP is the only new calculation.
CREATE MATERIALIZED VIEW kline_1m_enriched AS
SELECT
*,
-- VWAP is the only new calculation needed here
CASE
WHEN volume > 0 THEN turnover / volume
ELSE NULL
END AS vwap
FROM kline_1m;
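A quick check of the enriched view (again using a hypothetical pair_id) shows the VWAP alongside the OHLCV columns:
SELECT pair_id, bucket_start, open, close, volume, vwap
FROM kline_1m_enriched
WHERE pair_id = 1027
ORDER BY bucket_start DESC
LIMIT 5;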
Optimization for Sparse Trading Pairs
The Pain Point: In a real-world crypto exchange, you might have millions of trading pairs, but 99.9% of them are inactive for long periods. Calculating empty candlesticks for all of them is a huge waste of resources.
The Solution: We can maintain a dynamic list of "active" pairs and only perform calculations for them.
Maintaining an Active Pair List
This view tracks the last trade time for each pair.
CREATE MATERIALIZED VIEW active_pairs AS
SELECT
pair_id,
MAX(ts) AS last_trade_ts
FROM trades_src
GROUP BY pair_id;
Then, we can create a view that contains only the pairs active within a certain lookback window, for example, the last 24 hours.
CREATE MATERIALIZED VIEW active_pairs_24h AS
SELECT pair_id
FROM active_pairs
WHERE last_trade_ts >= NOW() - INTERVAL '24 hours';
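Before wiring this into the aggregation, it can be useful to confirm how much the filter narrows things down, for example:
-- How many pairs have traded in the last 24 hours?
SELECT COUNT(*) AS active_pair_count FROM active_pairs_24h;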
Aggregating Only for Active Pairs
Now, we can JOIN our trade stream with the active_pairs_24h list. This ensures that the expensive aggregation logic only runs for pairs that have recently traded. This is a crucial optimization for production systems, preventing wasted computation on pairs that might not have traded for days or weeks.
CREATE MATERIALIZED VIEW kline_1m_active AS
WITH active_trades AS (
SELECT
t.pair_id,
t.ts,
t.price,
t.qty
FROM trades_src t
JOIN active_pairs_24h a
ON t.pair_id = a.pair_id
)
SELECT
pair_id,
window_start AS bucket_start,
first_value(price ORDER BY ts) AS open,
MAX(price) AS high,
MIN(price) AS low,
last_value(price ORDER BY ts) AS close,
SUM(qty) AS volume,
SUM(price * qty) AS turnover,
COUNT(*) AS trade_count
FROM TUMBLE(active_trades, ts, INTERVAL '1 minute')
GROUP BY
pair_id,
window_start;
Gap Filling for Charting
The Challenge: Charting libraries and financial analysis tools expect a continuous series of candlesticks, one for each time interval. However, our materialized views are event-driven; if there were no trades in a given minute for a specific pair, our kline_1m_active view won't produce a row for that interval. This creates gaps in the data, which can break charts or lead to incorrect analysis.
The Solution: We can solve this by creating a complete "scaffold" or "grid" that contains a row for every active pair for every single minute in our desired time window. We then LEFT JOIN our actual candlestick data onto this complete grid. For any intervals where no real data exists (the gaps), we can intelligently fill them by carrying forward the last known closing price.
Due to the nature of high-performance stream processing, creating this grid from two dynamic sources (a changing list of active pairs and the constantly advancing clock) using a CROSS JOIN in a single materialized view is not supported. Therefore, we use a robust and common two-part pattern:
A batch process that periodically creates the complete grid and stores it in a standard table.
A streaming materialized view that performs an efficient LEFT JOIN between our streaming kline data and this stable, pre-built grid.
Step 1: Create a Standard Table for the Time-Pair Scaffold
First, we create a regular TABLE, not a materialized view. This table will act as the target for our batch process. We ensure the data types, especially for pair_id, match our source data to prevent type-mismatch errors later.
-- This table will hold the complete grid of every active pair and every minute.
CREATE TABLE time_pair_grid_scaffold (
bucket_start TIMESTAMP,
pair_id BIGINT
);
Step 2: Periodically Populate the Scaffold with a Batch Script
Next, we need an external scheduler (like a cron job, a Python script, or an orchestrator like Airflow) to run the following SQL script on a regular interval (e.g., once every minute).
This script is a batch job. It first clears the table using DELETE and then repopulates it with a fresh grid of all active pairs for every minute of the last 24 hours, generated on the fly.
-- This script must be run by an external, scheduled process.
-- Step 2a: Clear the existing data from the table.
DELETE FROM time_pair_grid_scaffold;
-- Step 2b: Insert the newly generated grid for the last 24 hours.
INSERT INTO time_pair_grid_scaffold
SELECT
series.bucket_start,
pairs.pair_id
FROM
-- Generate a continuous series of 1-minute timestamps.
generate_series(
NOW() - INTERVAL '24 hours',
NOW(),
INTERVAL '1 minute'
) AS series(bucket_start)
CROSS JOIN
-- Get the distinct list of currently active pairs.
(SELECT DISTINCT pair_id FROM active_pairs_24h) AS pairs;
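The exact scheduling mechanism is up to you. As one illustrative option (host, port, database, and file path are placeholders, and refresh_scaffold.sql is a hypothetical file containing the script above), a cron entry can run the script every minute through RisingWave's Postgres-compatible endpoint:
# Hypothetical cron entry: refresh the scaffold once per minute via psql.
* * * * * psql -h <risingwave-host> -p 4566 -d dev -U root -f /path/to/refresh_scaffold.sql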
Step 3: Create the Final Streaming View to Join and Fill Gaps
Finally, with our scaffold table being refreshed periodically, we can create our streaming materialized view. This view performs a highly efficient LEFT JOIN between the scaffold table and our real-time kline_1m_active stream.
The LEFT JOIN ensures that all rows from the scaffold are kept. When there is no matching kline data, the columns from kline_1m_active will be NULL. We then use COALESCE and the LAG window function to fill these NULL gaps.
-- This materialized view joins the scaffold with the stream and fills the gaps.
CREATE MATERIALIZED VIEW kline_1m_filled AS
SELECT
grid.pair_id,
grid.bucket_start,
-- If k.open is NULL (a gap), use the last known 'close' price for this pair.
COALESCE(
k.open,
LAG(k.close) OVER (PARTITION BY grid.pair_id ORDER BY grid.bucket_start)
) AS open,
-- For a gap, the high, low, and close are all the same carried-forward price.
COALESCE(
k.high,
LAG(k.close) OVER (PARTITION BY grid.pair_id ORDER BY grid.bucket_start)
) AS high,
COALESCE(
k.low,
LAG(k.close) OVER (PARTITION BY grid.pair_id ORDER BY grid.bucket_start)
) AS low,
COALESCE(
k.close,
LAG(k.close) OVER (PARTITION BY grid.pair_id ORDER BY grid.bucket_start)
) AS close,
-- If volume is NULL, there were no trades, so the value should be 0.
COALESCE(k.volume, 0) AS volume,
COALESCE(k.turnover, 0) AS turnover,
-- Use the correct source column 'trade_count' and alias the final column as 'trades'.
COALESCE(k.trade_count, 0) AS trades
FROM time_pair_grid_scaffold AS grid
LEFT JOIN kline_1m_active AS k
-- The join condition works because the data types are consistent.
ON grid.pair_id = k.pair_id AND grid.bucket_start = k.bucket_start;
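A charting backend can now read a continuous series per pair straight from this view. For example, the most recent 100 one-minute candles for a hypothetical pair_id:
SELECT bucket_start, open, high, low, close, volume
FROM kline_1m_filled
WHERE pair_id = 1027
ORDER BY bucket_start DESC
LIMIT 100;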
Handling Late Data and Corrections
Watermarks define a grace period for out-of-order events (e.g., 5-10 seconds). The engine will wait this long before finalizing a window's aggregation.
Very Late Events: If an event arrives after its window has been finalized (i.e., its event time is already behind the watermark), there are two common strategies:
Emit Corrections: The engine can issue a retraction for the old result and emit a new, corrected result. This is the most accurate approach but requires downstream systems to be able to handle these corrections.
Send to a Dead-Letter Queue: The very late events can be sent to a separate table or topic for offline auditing and manual correction.
Serving the Results (Sinks)
Once the data is processed, you need to send it to downstream consumers like a web application, API, or data warehouse. RisingWave makes this easy by defining SINK connectors that can push data to a variety of external systems.
Pushing to Kafka for Real-Time Consumers
You can create a SINK to push the enriched 1-minute candlestick data into a new Kafka topic.
CREATE SINK kline_1m_sink
FROM kline_1m_enriched
WITH (
connector = 'kafka',
topic = 'kline_1m',
properties.bootstrap.servers = 'kafka:9092',
-- Candles are updated as new trades arrive, so the sink emits upserts keyed by pair and bucket.
primary_key = 'pair_id,bucket_start'
) FORMAT UPSERT ENCODE JSON;
Archiving to a Data Warehouse
For long-term storage, historical analysis, and backtesting, you can also sink the data to a column-oriented data warehouse like ClickHouse, PostgreSQL (with TimescaleDB), or a data lake format like Apache Iceberg on S3. This creates a powerful architecture where you can serve real-time data from a low-latency source (like Kafka) and historical data from a cost-effective analytical store.
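As a sketch of that archival path (the connection values below are placeholders, the target ClickHouse table is assumed to already exist, and the exact option names should be checked against the ClickHouse sink documentation), an hourly-candle sink might look like this:
CREATE SINK kline_1h_to_clickhouse
FROM kline_1h
WITH (
connector = 'clickhouse',
type = 'append-only',
force_append_only = 'true',
clickhouse.url = 'http://clickhouse:8123',
clickhouse.user = 'default',
clickhouse.password = '',
clickhouse.database = 'default',
clickhouse.table = 'kline_1h'
);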
Best Practices for Scale and Cost
Use MV-on-MV: This is the most important optimization. Only the finest-grained (1s) view should read from the raw trades_src; all other views should build upon a lower-level MV.
Filter for Active Pairs: This is highly recommended and can drastically reduce CPU and storage costs.
Retention Policies: You don't need to store fine-grained data forever. Keep 1s/1m data for a few days, but store 1h/1d data for the long term; see the sketch after this list.
Replayability: Ensure your upstream data source (Kafka, CDC, etc.) has sufficient retention to allow for rebuilding state if necessary.
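One way to implement the retention point above, sketched with RisingWave's temporal-filter pattern (the 7-day window is an arbitrary choice), is to serve recent fine-grained candles from a view bounded to a trailing window:
-- Serve only the last 7 days of 1-second candles; older rows age out of the view automatically.
CREATE MATERIALIZED VIEW kline_1s_recent AS
SELECT *
FROM kline_1s
WHERE bucket_start > NOW() - INTERVAL '7 days';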
Common Pitfalls and Troubleshooting
Incorrect Open/Close Prices: A common mistake is using MIN(price) or MAX(price) for the open/close values. Always use a function that finds the first or last value within the window, such as first_value and last_value.
Watermark Too Small/Large: If the watermark is too small, you'll get frequent corrections as late data arrives. If it's too large, your pipeline's latency will increase. Analyze your actual event delay distribution and set the watermark to the 95th or 99th percentile of observed delay.
Ignoring Sparse Pairs: Don't skip the "active pairs" optimization. For a system with millions of pairs, failing to do this will overwhelm your resources with empty window calculations.
Reproducibility: For reliable production systems, ensure your upstream source is replayable, your stream processing jobs use checkpoints, and your SQL/schema changes are managed through version control.
Next Steps & Conclusion
We've walked through a comprehensive, production-ready approach to generating real-time candlestick data from a high-volume trade stream. By leveraging the power of RisingWave's materialized views, hierarchical rollups, and optimizations for sparse data, you can build a system that is not only accurate and low-latency but also incredibly resource-efficient.
From here, you can extend this pipeline even further:
Add materialized views for even larger time granularities (e.g., 1d, 7d, 1mo).
Extend the gap-filling pattern beyond 1-minute candles to the other granularities your charts serve.
Extend the pipeline to calculate other real-time metrics like PnL (Profit and Loss) or to power a real-time risk management and alerting system, all from the same trades_src stream.
With a solid foundation for real-time aggregation in place, the possibilities for building sophisticated crypto analytics are endless.
Try RisingWave Today
Download the open-source version of RisingWave to deploy on your own infrastructure.
Get started quickly with RisingWave Cloud for a fully managed experience.
Talk to Our Experts: Have a complex use case or want to see a personalized demo? Contact us to discuss how RisingWave can address your specific challenges.
Join Our Community: Connect with fellow developers, ask questions, and share your experiences in our vibrant Slack community.