In the fast-paced world of cryptocurrency trading, having access to real-time, accurate data isn't just a competitive edge—it's a necessity. Traders, platforms, and analysts all rely on up-to-the-second information to make critical decisions. One of the most fundamental tools for this is the candlestick chart, which visualizes price movements through Open, High, Low, Close, and Volume (OHLCV) data.
But building a scalable, efficient, and correct system to generate these charts from a high-velocity stream of raw trades is a significant engineering challenge. You need to handle massive data volumes, process events with low latency, and account for real-world complexities like out-of-order data and inactive trading pairs.
This technical guide provides a complete playbook for building a robust candlestick charting pipeline using RisingWave, a streaming database designed for real-time analytics. We'll walk through the entire process, from ingesting raw trades to serving multi-granularity candlestick data, and we'll tackle common production challenges along the way.
What You'll Learn (TL;DR)
Generate multi-granularity OHLCV (1s, 1m, 5m, 1h, 1d, etc.) with a single set of streaming SQL queries.
Correctly handle out-of-order and late trade events using watermarks.
Optimize for millions of sparse trading pairs by computing only for active pairs, saving over 99% of resources.
Use Materialized View-on-Materialized View (MV-on-MV) for hierarchical rollups to avoid repeatedly scanning raw trade data.
Push the results to a charting service or API via a Kafka sink.
Data Model and Input Assumptions
We'll assume we have a stream of individual trades from a source like Kafka. Each trade event in the trades stream has the following structure:
pair_id: The internal ID of the trading pair (e.g., an integer key for ETH/USDC).
symbol: The display name of the pair (optional).
ts: The trade execution timestamp (event time).
price: The execution price.
qty: The trade quantity.
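For concreteness, a single JSON-encoded trade event on the Kafka topic might look like the following (the values are purely illustrative):
{"pair_id": 1027, "symbol": "ETH/USDC", "ts": "2025-01-15 09:30:00.123", "price": 2485.37, "qty": 0.8}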
Creating a Source (with a Watermark)
First, let's connect to the Kafka topic containing the trade data. We'll define a SOURCE in RisingWave and specify a watermark, which tells the stream processor how long to wait for late-arriving events before a window is considered complete.
CREATE SOURCE trades_src (
    pair_id BIGINT,
    symbol VARCHAR,
    ts TIMESTAMP,
    price DOUBLE PRECISION,
    qty DOUBLE PRECISION,
    -- Use `ts` as the event time, allowing for a 5-second disorder window.
    WATERMARK FOR ts AS ts - INTERVAL '5 seconds'
) WITH (
    connector = 'kafka',
    topic = 'trades',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;
Fine-Grained Candlesticks (1-Second OHLCV)
We'll start by aggregating trades into the finest granularity we need, such as 1-second candlesticks. All coarser granularities (like 1-minute or 1-hour) will be built on top of this base layer. This hierarchical approach, often called "MV-on-MV," is highly efficient as it avoids re-processing the raw trade stream for each granularity.
CREATE MATERIALIZED VIEW kline_1s AS
SELECT
    pair_id,
    window_start AS bucket_start,
    -- Open/close are the first/last trade prices in the window, ordered by event time.
    FIRST_VALUE(price ORDER BY ts) AS open,
    MAX(price) AS high,
    MIN(price) AS low,
    LAST_VALUE(price ORDER BY ts) AS close,
    SUM(qty) AS volume,
    SUM(price * qty) AS turnover
FROM TUMBLE(trades_src, ts, INTERVAL '1 second')
GROUP BY pair_id, window_start;
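Because kline_1s is a regular materialized view, you can also query it with ad-hoc SQL, which is handy for debugging. For example, to spot-check the latest candles for one pair (pair_id = 1 is just a placeholder):
-- Inspect the ten most recent 1-second candles for a single pair.
SELECT bucket_start, open, high, low, close, volume
FROM kline_1s
WHERE pair_id = 1
ORDER BY bucket_start DESC
LIMIT 10;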
Why turnover? Pre-calculating turnover (the total value of trades) makes it easy to compute the Volume-Weighted Average Price (VWAP) later on (turnover / volume).
Hierarchical Rollups (1-Minute, 5-Minute, 1-Hour…)
Now, we can efficiently create higher-level aggregations by querying the kline_1s materialized view we just created. This "MV-on-MV" (Materialized View on Materialized View) approach is a cornerstone of efficient stream processing, allowing you to build complex, multi-stage pipelines without the high cost of re-scanning the original source data.
From 1-Second to 1-Minute
CREATE MATERIALIZED VIEW kline_1m AS
SELECT
    pair_id,
    window_start AS bucket_start,
    -- Use MIN_BY/MAX_BY to get the open/close from the earliest/latest 1s bucket.
    MIN_BY(open, bucket_start) AS open,
    MAX(high) AS high,
    MIN(low) AS low,
    MAX_BY(close, bucket_start) AS close,
    SUM(volume) AS volume,
    SUM(turnover) AS turnover
FROM TUMBLE(kline_1s, bucket_start, INTERVAL '1 minute')
GROUP BY pair_id, window_start;
MIN_BY / MAX_BY: If your system doesn't support MIN_BY/MAX_BY, you can achieve the same result using FIRST_VALUE/LAST_VALUE with an ORDER BY clause.
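For example, an equivalent 1-minute rollup using ordered FIRST_VALUE/LAST_VALUE aggregates might look like this (a sketch; the view name kline_1m_alt is ours):
-- Same rollup as kline_1m, but with ordered aggregates instead of MIN_BY/MAX_BY.
CREATE MATERIALIZED VIEW kline_1m_alt AS
SELECT
    pair_id,
    window_start AS bucket_start,
    FIRST_VALUE(open ORDER BY bucket_start) AS open,
    MAX(high) AS high,
    MIN(low) AS low,
    LAST_VALUE(close ORDER BY bucket_start) AS close,
    SUM(volume) AS volume,
    SUM(turnover) AS turnover
FROM TUMBLE(kline_1s, bucket_start, INTERVAL '1 minute')
GROUP BY pair_id, window_start;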
From 1-Minute to 5-Minute, 1-Hour, etc.
We can continue this pattern to roll up to any desired granularity. Notice that kline_5m and kline_1h both build upon kline_1m for efficiency.
CREATE MATERIALIZED VIEW kline_5m AS
SELECT
    pair_id,
    window_start AS bucket_start,
    MIN_BY(open, bucket_start) AS open,
    MAX(high) AS high,
    MIN(low) AS low,
    MAX_BY(close, bucket_start) AS close,
    SUM(volume) AS volume,
    SUM(turnover) AS turnover
FROM TUMBLE(kline_1m, bucket_start, INTERVAL '5 minutes')
GROUP BY pair_id, window_start;
CREATE MATERIALIZED VIEW kline_1h AS
SELECT
    pair_id,
    window_start AS bucket_start,
    MIN_BY(open, bucket_start) AS open,
    MAX(high) AS high,
    MIN(low) AS low,
    MAX_BY(close, bucket_start) AS close,
    SUM(volume) AS volume,
    SUM(turnover) AS turnover
FROM TUMBLE(kline_1m, bucket_start, INTERVAL '1 hour')
GROUP BY pair_id, window_start;
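The same pattern extends to larger granularities such as daily candles. A sketch of a 1-day rollup built on kline_1h (the view name kline_1d is our choice):
CREATE MATERIALIZED VIEW kline_1d AS
SELECT
    pair_id,
    window_start AS bucket_start,
    MIN_BY(open, bucket_start) AS open,
    MAX(high) AS high,
    MIN(low) AS low,
    MAX_BY(close, bucket_start) AS close,
    SUM(volume) AS volume,
    SUM(turnover) AS turnover
FROM TUMBLE(kline_1h, bucket_start, INTERVAL '1 day')
GROUP BY pair_id, window_start;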
Enriching the Data (VWAP and Trade Count)
We can create another materialized view to enrich our candlestick data with additional metrics such as VWAP. (Per-bucket trade counts are covered in the note below.)
-- Create an enriched view for the 1-minute granularity.
CREATE MATERIALIZED VIEW kline_1m_enriched AS
SELECT
    *,
    -- VWAP: total traded value divided by total traded quantity.
    CASE WHEN volume > 0 THEN turnover / volume ELSE NULL END AS vwap
FROM kline_1m;
Trade count: To track the number of trades per candle, add a count (COUNT(*) AS trades) in the base kline_1s view and then use SUM(trades) in the subsequent rollups.
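Concretely, that variant of the base view might look like the following sketch (the name kline_1s_with_trades is ours; higher-level rollups would then carry the count forward with SUM(trades)):
-- 1-second candles with a per-bucket trade count.
CREATE MATERIALIZED VIEW kline_1s_with_trades AS
SELECT
    pair_id,
    window_start AS bucket_start,
    FIRST_VALUE(price ORDER BY ts) AS open,
    MAX(price) AS high,
    MIN(price) AS low,
    LAST_VALUE(price ORDER BY ts) AS close,
    SUM(qty) AS volume,
    SUM(price * qty) AS turnover,
    COUNT(*) AS trades
FROM TUMBLE(trades_src, ts, INTERVAL '1 second')
GROUP BY pair_id, window_start;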
Optimization for Sparse Trading Pairs
The Pain Point: In a real-world crypto exchange, you might have millions of trading pairs, but 99.9% of them are inactive for long periods. Calculating empty candlesticks for all of them is a huge waste of resources.
The Solution: We can maintain a dynamic list of "active" pairs and only perform calculations for them.
Maintaining an Active Pair List
This view tracks the last trade time for each pair.
CREATE MATERIALIZED VIEW active_pairs AS
SELECT
pair_id,
MAX(ts) AS last_trade_ts
FROM trades_src
GROUP BY pair_id;
Then, we can create a view that contains only the pairs active within a certain lookback window, for example, the last 24 hours.
CREATE MATERIALIZED VIEW active_pairs_24h AS
SELECT pair_id
FROM active_pairs
WHERE last_trade_ts >= NOW() - INTERVAL '24 hours';
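Because active_pairs_24h is itself a materialized view, a quick interactive query shows how many pairs are currently considered active (purely a sanity check):
-- Count the pairs that have traded in the last 24 hours.
SELECT COUNT(*) AS active_pair_count
FROM active_pairs_24h;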
Aggregating Only for Active Pairs
Now, we can JOIN our trade stream with the active_pairs_24h list. This ensures that the expensive aggregation logic only runs for pairs that have recently traded. This is a crucial optimization for production systems, preventing wasted computation on pairs that might not have traded for days or weeks.
CREATE MATERIALIZED VIEW kline_1s_active AS
SELECT
    t.pair_id,
    t.window_start AS bucket_start,
    FIRST_VALUE(t.price ORDER BY t.ts) AS open,
    MAX(t.price) AS high,
    MIN(t.price) AS low,
    LAST_VALUE(t.price ORDER BY t.ts) AS close,
    SUM(t.qty) AS volume,
    SUM(t.price * t.qty) AS turnover
FROM TUMBLE(trades_src, ts, INTERVAL '1 second') AS t
JOIN active_pairs_24h AS a
    ON t.pair_id = a.pair_id
GROUP BY t.pair_id, t.window_start;
With this join in place, candlestick state is maintained only for pairs on the active list, rather than for every pair that has ever appeared on the trades_src stream.
Gap Filling for Charting
The Challenge: Charting libraries typically expect a continuous series of candlesticks, one for each time interval. However, if there were no trades in a given minute, our previous queries won't produce a row for that interval, creating a gap.
The Solution: We can generate a complete time series and LEFT JOIN our candlestick data against it. This process, known as "gap filling," ensures that every interval is represented.
Example: Using a Time Dimension Table
First, we need a table that contains a complete, uninterrupted sequence of timestamps for our desired granularity.
-- This table should be pre-populated with a continuous series of minute-by-minute timestamps.
CREATE TABLE minutes_dim (
bucket_start TIMESTAMP PRIMARY KEY
);
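One way to pre-populate it is with generate_series. For example, to cover a fixed range of minutes (the date range below is arbitrary; extend it periodically or via a scheduled job):
-- Fill the dimension table with one row per minute for the chosen range.
INSERT INTO minutes_dim
SELECT *
FROM generate_series(
    '2025-01-01 00:00:00'::TIMESTAMP,
    '2025-12-31 23:59:00'::TIMESTAMP,
    INTERVAL '1 minute'
);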
Next, we can CROSS JOIN the list of active pairs with the time dimension table to create a complete grid of all pairs and all time buckets. Then, we LEFT JOIN our actual kline_1m data. We use LAG and COALESCE to fill the gaps, carrying forward the last known closing price for intervals with no trades.
CREATE MATERIALIZED VIEW kline_1m_filled AS
SELECT
p.pair_id,
d.bucket_start,
COALESCE(k.open, LAG(k.close) OVER w) AS open,
COALESCE(k.high, LAG(k.close) OVER w) AS high,
COALESCE(k.low, LAG(k.close) OVER w) AS low,
COALESCE(k.close, LAG(k.close) OVER w) AS close,
COALESCE(k.volume, 0) AS volume
FROM (SELECT DISTINCT pair_id FROM active_pairs_24h) p
CROSS JOIN minutes_dim d
LEFT JOIN kline_1m k
ON k.pair_id = p.pair_id AND k.bucket_start = d.bucket_start
WINDOW w AS (PARTITION BY p.pair_id ORDER BY d.bucket_start);
This is a forward-fill (ffill) strategy. Note that LAG(k.close) only looks one row back, so it bridges a single missing interval; longer gaps would need the last non-NULL close carried forward explicitly. Depending on your needs, you could also leave the gaps as NULL or use other interpolation methods. Some advanced stream processing engines provide built-in FILL_GAPS functions to simplify this.
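With the gap-filled view in place, the query a charting backend issues per pair is a simple range scan. For example (pair_id = 1 and the 6-hour lookback are placeholders):
-- Fetch a continuous, chart-ready 1-minute series for one pair.
SELECT bucket_start, open, high, low, close, volume
FROM kline_1m_filled
WHERE pair_id = 1
  AND bucket_start >= NOW() - INTERVAL '6 hours'
ORDER BY bucket_start;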
Handling Late Data and Corrections
Watermarks define a grace period for out-of-order events (e.g., 5-10 seconds). The engine will wait this long before finalizing a window's aggregation.
Very Late Events: If an event arrives after the window has been finalized (i.e., after the watermark has passed the window's end), there are two common strategies:
Emit Corrections: The engine can issue a retraction for the old result and emit a new, corrected result. This is the most accurate approach but requires downstream systems to be able to handle these corrections.
Send to a Dead-Letter Queue: The very late events can be sent to a separate table or topic for offline auditing and manual correction.
Serving the Results (Sinks)
Once the data is processed, you need to send it to downstream consumers like a web application, API, or data warehouse. RisingWave makes this easy by defining SINK connectors that can push data to a variety of external systems.
Pushing to Kafka for Real-Time Consumers
You can create a SINK to push the enriched 1-minute candlestick data into a new Kafka topic. Because candle values are updated as trades arrive within a window, we emit the results as upserts keyed by pair and bucket.
CREATE SINK kline_1m_sink
FROM kline_1m_enriched
WITH (
    connector = 'kafka',
    topic = 'kline_1m',
    properties.bootstrap.server = 'kafka:9092',
    -- Key each record by pair and bucket so consumers receive in-place updates.
    primary_key = 'pair_id,bucket_start'
) FORMAT UPSERT ENCODE JSON;
Archiving to a Data Warehouse
For long-term storage, historical analysis, and backtesting, you can also sink the data to a column-oriented data warehouse like ClickHouse, PostgreSQL (with TimescaleDB), or a data lake format like Apache Iceberg on S3. This creates a powerful architecture where you can serve real-time data from a low-latency source (like Kafka) and historical data from a cost-effective analytical store.
Best Practices for Scale and Cost
Use MV-on-MV: This is the most important optimization. Only the finest-grained (1s) view should read from the raw trades_src. All other views should build upon a lower-level MV.
Partitioning: Partition your data by pair_id to distribute the workload, separating hot (high-volume) pairs from cold (low-volume) ones.
Filter for Active Pairs: This is highly recommended and can drastically reduce CPU and storage costs.
Retention Policies: You don't need to store fine-grained data forever. Keep 1s/1m data for a few days, but store 1h/1d data for the long term.
Replayability: Ensure your upstream data source (Kafka, CDC, etc.) has sufficient retention to allow for rebuilding state if necessary.
End-to-End MVP (Summary SQL)
Copy the SQL in this section and adapt the field names to match your topic's schema to get a minimum viable product up and running quickly.
-- 1) Create a source with a watermark
CREATE SOURCE trades_src (
    pair_id BIGINT,
    symbol VARCHAR,
    ts TIMESTAMP,
    price DOUBLE PRECISION,
    qty DOUBLE PRECISION,
    WATERMARK FOR ts AS ts - INTERVAL '5 seconds'
) WITH (
    connector = 'kafka',
    topic = 'trades',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;
-- 2) Create fine-grained 1s k-lines
CREATE MATERIALIZED VIEW kline_1s AS
SELECT
    pair_id,
    window_start AS bucket_start,
    FIRST_VALUE(price ORDER BY ts) AS open,
    MAX(price) AS high,
    MIN(price) AS low,
    LAST_VALUE(price ORDER BY ts) AS close,
    SUM(qty) AS volume,
    SUM(price * qty) AS turnover
FROM TUMBLE(trades_src, ts, INTERVAL '1 second')
GROUP BY pair_id, window_start;
-- 3) Roll up to 1m and 5m
CREATE MATERIALIZED VIEW kline_1m AS
SELECT
    pair_id,
    window_start AS bucket_start,
    MIN_BY(open, bucket_start) AS open,
    MAX(high) AS high,
    MIN(low) AS low,
    MAX_BY(close, bucket_start) AS close,
    SUM(volume) AS volume,
    SUM(turnover) AS turnover
FROM TUMBLE(kline_1s, bucket_start, INTERVAL '1 minute')
GROUP BY pair_id, window_start;
CREATE MATERIALIZED VIEW kline_5m AS
SELECT
    pair_id,
    window_start AS bucket_start,
    MIN_BY(open, bucket_start) AS open,
    MAX(high) AS high,
    MIN(low) AS low,
    MAX_BY(close, bucket_start) AS close,
    SUM(volume) AS volume,
    SUM(turnover) AS turnover
FROM TUMBLE(kline_1m, bucket_start, INTERVAL '5 minutes')
GROUP BY pair_id, window_start;
-- 4) Enrich with VWAP
CREATE MATERIALIZED VIEW kline_1m_enriched AS
SELECT
*,
CASE WHEN volume > 0 THEN turnover / volume ELSE NULL END AS vwap
FROM kline_1m;
-- 5) Sink the 1-minute results to Kafka
CREATE SINK kline_1m_sink
FROM kline_1m_enriched
WITH (
    connector = 'kafka',
    topic = 'kline_1m',
    properties.bootstrap.server = 'kafka:9092',
    primary_key = 'pair_id,bucket_start'
) FORMAT UPSERT ENCODE JSON;
Common Pitfalls and Troubleshooting
Incorrect Open/Close Prices: A common mistake is using MIN(price) or MAX(price) for the open/close values. Always use a function that finds the first or last value within the window (e.g., FIRST_VALUE/LAST_VALUE with an ORDER BY).
Watermark Too Small/Large: If the watermark is too small, you'll get frequent corrections as late data arrives. If it's too large, your pipeline's latency will increase. Analyze your actual event delay distribution and set it to the 95th or 99th percentile.
Ignoring Sparse Pairs: Don't skip the "active pairs" optimization. For a system with millions of pairs, failing to do this will overwhelm your resources with empty window calculations.
Reproducibility: For reliable production systems, ensure your upstream source is replayable, your stream processing jobs use checkpoints, and your SQL/schema changes are managed through version control.
Next Steps & Conclusion
We've walked through a comprehensive, production-ready approach to generating real-time candlestick data from a high-volume trade stream. By leveraging the power of RisingWave's materialized views, hierarchical rollups, and optimizations for sparse data, you can build a system that is not only accurate and low-latency but also incredibly resource-efficient.
From here, you can extend this pipeline even further:
Add materialized views for even larger time granularities (e.g., 1d, 7d, 1mo).
Implement a gap-filling view to provide a chart-friendly, continuous data stream.
Extend the pipeline to calculate other real-time metrics like PnL (Profit and Loss) or to power a real-time risk management and alerting system, all from the same trades_src stream.
With a solid foundation for real-time aggregation in place, the possibilities for building sophisticated crypto analytics are endless.
Try RisingWave Today
Download the open-source version of RisingWave to deploy on your own infrastructure.
Get started quickly with RisingWave Cloud for a fully managed experience.
Talk to Our Experts: Have a complex use case or want to see a personalized demo? Contact us to discuss how RisingWave can address your specific challenges.
Join Our Community: Connect with fellow developers, ask questions, and share your experiences in our vibrant Slack community.