Building Real-Time Crypto Candlestick Charts with RisingWave

In the fast-paced world of cryptocurrency trading, having access to real-time, accurate data isn't just a competitive edge—it's a necessity. Traders, platforms, and analysts all rely on up-to-the-second information to make critical decisions. One of the most fundamental tools for this is the candlestick chart, which visualizes price movements through Open, High, Low, Close, and Volume (OHLCV) data.

But building a scalable, efficient, and correct system to generate these charts from a high-velocity stream of raw trades is a significant engineering challenge. You need to handle massive data volumes, process events with low latency, and account for real-world complexities like out-of-order data and inactive trading pairs.

This technical guide provides a complete playbook for building a robust candlestick charting pipeline using RisingWave, a streaming database designed for real-time analytics. We'll walk through the entire process, from ingesting raw trades to serving multi-granularity candlestick data, and we'll tackle common production challenges along the way.

What You'll Learn (TL;DR)

  • Generate multi-granularity OHLCV (1s, 1m, 5m, 1h, 1d, etc.) with a single set of streaming SQL queries.

  • Correctly handle out-of-order and late trade events using watermarks.

  • Optimize for millions of sparse trading pairs by computing only for active pairs, saving over 99% of resources.

  • Use Materialized View-on-Materialized View (MV-on-MV) for hierarchical rollups to avoid repeatedly scanning raw trade data.

  • Push the results to a charting service or API via a Kafka sink.


Data Model and Input Assumptions

We'll assume we have a stream of individual trades from a source like Kafka. Each trade event in the trades stream has the following structure:

  • pair_id: The internal ID of the trading pair (e.g., an integer key for ETH/USDC).

  • symbol: The display name of the pair (optional).

  • ts: The trade execution timestamp (event time).

  • price: The execution price.

  • qty: The trade quantity.

Creating a Source (with a Watermark)

First, let's connect to the Kafka topic containing the trade data. We'll define a SOURCE in RisingWave and specify a watermark. The watermark tells the stream processor how to handle late-arriving events.

CREATE SOURCE trades_src (
  pair_id     BIGINT,
  symbol      VARCHAR,
  ts          TIMESTAMP,
  price       DOUBLE PRECISION,
  qty         DOUBLE PRECISION,
  -- Treat events up to 5 seconds out of order as on time; anything later is considered very late.
  WATERMARK FOR ts AS ts - INTERVAL '5 seconds'
) WITH (
  connector = 'kafka',
  topic = 'trades',
  properties.bootstrap.servers = 'kafka:9092',
  scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;
💡
What is a Watermark? A watermark informs the engine about the maximum expected delay for out-of-order events. This is crucial for correctly closing time windows, performing aggregations, and issuing corrections (retractions/merges) when late data arrives.
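For example, with the 5-second watermark defined on trades_src above, a trade stamped 12:00:10 advances the watermark to 12:00:05, so 1-second windows ending at or before 12:00:05 can be treated as complete; a trade stamped 12:00:03 that arrives after that point counts as very late (see "Handling Late Data and Corrections" below).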

Fine-Grained Candlesticks (1-Second OHLCV)

We'll start by aggregating trades into the finest granularity we need, such as 1-second candlesticks. All coarser granularities (like 1-minute or 1-hour) will be built on top of this base layer. This hierarchical approach, often called "MV-on-MV," is highly efficient as it avoids re-processing the raw trade stream for each granularity.

CREATE MATERIALIZED VIEW kline_1s AS
SELECT
    pair_id,
    window_start AS bucket_start,
    first_value(price ORDER BY ts) AS open,
    MAX(price) AS high,
    MIN(price) AS low,
    last_value(price ORDER BY ts) AS close,
    SUM(qty) AS volume,
    SUM(price * qty) AS turnover,
    COUNT(*) AS trade_count
FROM TUMBLE(trades_src, ts, INTERVAL '1 second')
GROUP BY
    pair_id,
    window_start;

Hierarchical Rollups (1-Minute, 5-Minute, 1-Hour…)

Now, we can efficiently create higher-level aggregations by querying the kline_1s materialized view we just created. This "MV-on-MV" (Materialized View on Materialized View) approach is a cornerstone of efficient stream processing, allowing you to build complex, multi-stage pipelines without the high cost of re-scanning the original source data.

From 1-Second to 1-Minute

CREATE MATERIALIZED VIEW kline_1m AS
SELECT
    pair_id,
    window_start AS bucket_start,
    first_value(open ORDER BY bucket_start) AS open,
    MAX(high) AS high,
    MIN(low) AS low,
    last_value(close ORDER BY bucket_start) AS close,
    SUM(volume) AS volume,
    SUM(turnover) AS turnover,
    SUM(trade_count) AS trade_count -- roll up the per-second trade counts
FROM TUMBLE(kline_1s, bucket_start, INTERVAL '1 minute')
GROUP BY
    pair_id,
    window_start;

From 1-Minute to 5-Minute, 1-Hour, etc.

We can continue this pattern to roll up to any desired granularity.

CREATE MATERIALIZED VIEW kline_5m AS
SELECT
    pair_id,
    window_start AS bucket_start,
    -- Get the 'open' from the first 1m candle in this 5-minute window
    first_value(open ORDER BY bucket_start) AS open,
    -- The 5-minute high is the max of all 1m highs in the window
    MAX(high) AS high,
    -- The 5-minute low is the min of all 1m lows in the window
    MIN(low) AS low,
    -- Get the 'close' from the last 1m candle in this 5-minute window
    last_value(close ORDER BY bucket_start) AS close,
    -- Sum the volumes, turnovers, and trade counts from the five 1m candles
    SUM(volume) AS volume,
    SUM(turnover) AS turnover,
    SUM(trade_count) AS trade_count
FROM TUMBLE(kline_1m, bucket_start, INTERVAL '5 minutes')
GROUP BY
    pair_id,
    window_start;

CREATE MATERIALIZED VIEW kline_1h AS
SELECT
    pair_id,
    window_start AS bucket_start,
    -- Get the 'open' from the first 5m candle in this 1-hour window
    first_value(open ORDER BY bucket_start) AS open,
    -- The 1-hour high is the max of all 5m highs in the window
    MAX(high) AS high,
    -- The 1-hour low is the min of all 5m lows in the window
    MIN(low) AS low,
    -- Get the 'close' from the last 5m candle in this 1-hour window
    last_value(close ORDER BY bucket_start) AS close,
    -- Sum the volumes, turnovers, and trade counts from the twelve 5m candles
    SUM(volume) AS volume,
    SUM(turnover) AS turnover,
    SUM(trade_count) AS trade_count
FROM TUMBLE(kline_5m, bucket_start, INTERVAL '1 hour')
GROUP BY
    pair_id,
    window_start;
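
The same pattern continues upward. As a sketch of the 1-day granularity mentioned later in this guide, a kline_1d view can roll the hourly candles up into daily buckets:

CREATE MATERIALIZED VIEW kline_1d AS
SELECT
    pair_id,
    window_start AS bucket_start,
    first_value(open ORDER BY bucket_start) AS open,
    MAX(high) AS high,
    MIN(low) AS low,
    last_value(close ORDER BY bucket_start) AS close,
    SUM(volume) AS volume,
    SUM(turnover) AS turnover,
    SUM(trade_count) AS trade_count
FROM TUMBLE(kline_1h, bucket_start, INTERVAL '1 day')
GROUP BY
    pair_id,
    window_start;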

Enriching the Data (VWAP and Trade Count)

We can create another materialized view that adds the volume-weighted average price (VWAP) to each bucket; the per-bucket trade count is already carried through from kline_1m via SELECT *.

CREATE MATERIALIZED VIEW kline_1m_enriched AS
SELECT
  *,
  -- VWAP is the only new calculation needed here
  CASE
    WHEN volume > 0 THEN turnover / volume
    ELSE NULL
  END AS vwap
FROM kline_1m;
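
Because materialized views can be queried with regular SQL, a charting API can read candles straight from RisingWave. As an illustrative query (the pair ID 123 and the 200-candle limit are arbitrary placeholders):

SELECT bucket_start, open, high, low, close, volume, vwap
FROM kline_1m_enriched
WHERE pair_id = 123            -- example pair ID
ORDER BY bucket_start DESC
LIMIT 200;                     -- the most recent 200 one-minute candles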

Optimization for Sparse Trading Pairs

The Pain Point: In a real-world crypto exchange, you might have millions of trading pairs, but 99.9% of them are inactive for long periods. Calculating empty candlesticks for all of them is a huge waste of resources.

The Solution: We can maintain a dynamic list of "active" pairs and only perform calculations for them.

Maintaining an Active Pair List

This view tracks the last trade time for each pair.

CREATE MATERIALIZED VIEW active_pairs AS
SELECT
  pair_id,
  MAX(ts) AS last_trade_ts
FROM trades_src
GROUP BY pair_id;

Then, we can create a view that contains only the pairs active within a certain lookback window, for example, the last 24 hours. The comparison against NOW() is a temporal filter, which RisingWave evaluates continuously in a streaming query, so a pair drops out of this view automatically once it has been quiet for more than 24 hours.

CREATE MATERIALIZED VIEW active_pairs_24h AS
SELECT pair_id
FROM active_pairs
WHERE last_trade_ts >= NOW() - INTERVAL '24 hours';

Aggregating Only for Active Pairs

Now, we can JOIN our trade stream with the active_pairs_24h list. This ensures that the expensive aggregation logic only runs for pairs that have recently traded. This is a crucial optimization for production systems, preventing wasted computation on pairs that might not have traded for days or weeks.

CREATE MATERIALIZED VIEW kline_1m_active AS
WITH active_trades AS (
  SELECT
    t.pair_id,
    t.ts,
    t.price,
    t.qty
  FROM trades_src t
  JOIN active_pairs_24h a
    ON t.pair_id = a.pair_id
)
SELECT
    pair_id,
    window_start AS bucket_start,
    first_value(price ORDER BY ts) AS open,
    MAX(price) AS high,
    MIN(price) AS low,
    last_value(price ORDER BY ts) AS close,
    SUM(qty) AS volume,
    SUM(price * qty) AS turnover,
    COUNT(*) AS trade_count
FROM TUMBLE(active_trades, ts, INTERVAL '1 minute')
GROUP BY
    pair_id,
    window_start;

Gap Filling for Charting

The Challenge: Charting libraries and financial analysis tools expect a continuous series of candlesticks, one for each time interval. However, our materialized views are event-driven; if there were no trades in a given minute for a specific pair, our kline_1m_active view won't produce a row for that interval. This creates gaps in the data, which can break charts or lead to incorrect analysis.

The Solution: We can solve this by creating a complete "scaffold" or "grid" that contains a row for every active pair for every single minute in our desired time window. We then LEFT JOIN our actual candlestick data onto this complete grid. For any intervals where no real data exists (the gaps), we can intelligently fill them by carrying forward the last known closing price.

Because both inputs are dynamic (a changing list of active pairs and the constantly advancing clock), generating this grid with a CROSS JOIN inside a single streaming materialized view is not supported. Therefore, we use a robust and common two-part pattern:

  1. A batch process that periodically creates the complete grid and stores it in a standard table.

  2. A streaming materialized view that performs an efficient LEFT JOIN between our streaming kline data and this stable, pre-built grid.

Step 1: Create a Standard Table for the Time-Pair Scaffold

First, we create a regular TABLE, not a materialized view. This table will act as the target for our batch process. We ensure the data types, especially for pair_id, match our source data to prevent type-mismatch errors later.

-- This table will hold the complete grid of every active pair and every minute.
CREATE TABLE time_pair_grid_scaffold (
   bucket_start TIMESTAMP,
   pair_id BIGINT
);

Step 2: Periodically Populate the Scaffold with a Batch Script

Next, we need an external scheduler (like a cron job, a Python script, or an orchestrator like Airflow) to run the following SQL script on a regular interval (e.g., once every minute).

This script is a batch job. It first clears the table using DELETE and then repopulates it with a fresh grid of all active pairs for every minute of the last 24 hours, generated on the fly.

-- This script must be run by an external, scheduled process.

-- Step 2a: Clear the existing data from the table.
DELETE FROM time_pair_grid_scaffold;

-- Step 2b: Insert the newly generated grid for the last 24 hours.
INSERT INTO time_pair_grid_scaffold
SELECT
  series.bucket_start,
  pairs.pair_id
FROM
  -- Generate a continuous series of 1-minute timestamps.
  generate_series(
    NOW() - INTERVAL '24 hours',
    NOW(),
    INTERVAL '1 minute'
  ) AS series(bucket_start)
CROSS JOIN
  -- Get the distinct list of currently active pairs.
  (SELECT DISTINCT pair_id FROM active_pairs_24h) AS pairs;

Step 3: Create the Final Streaming View to Join and Fill Gaps

Finally, with our scaffold table being refreshed periodically, we can create our streaming materialized view. This view performs a highly efficient LEFT JOIN between the scaffold table and our real-time kline_1m_active stream.

The LEFT JOIN ensures that all rows from the scaffold are kept. When there is no matching kline data, the columns from kline_1m_active will be NULL, and we use COALESCE with the LAG window function to fill those NULLs. Note that LAG(k.close) only looks back one bucket, so after two or more consecutive empty minutes the later gaps remain NULL; if you need a full forward fill, carry the last close forward in your serving layer or with a more elaborate query.

-- This materialized view joins the scaffold with the stream and fills the gaps.
CREATE MATERIALIZED VIEW kline_1m_filled AS
SELECT
  grid.pair_id,
  grid.bucket_start,
  -- If k.open is NULL (a gap), fall back to the previous bucket's close for this pair.
  COALESCE(
    k.open,
    LAG(k.close) OVER (PARTITION BY grid.pair_id ORDER BY grid.bucket_start)
  ) AS open,
  -- For a gap, the high, low, and close are all the same carried-forward price.
  COALESCE(
    k.high,
    LAG(k.close) OVER (PARTITION BY grid.pair_id ORDER BY grid.bucket_start)
  ) AS high,
  COALESCE(
    k.low,
    LAG(k.close) OVER (PARTITION BY grid.pair_id ORDER BY grid.bucket_start)
  ) AS low,
  COALESCE(
    k.close,
    LAG(k.close) OVER (PARTITION BY grid.pair_id ORDER BY grid.bucket_start)
  ) AS close,
  -- If volume is NULL, there were no trades, so the value should be 0.
  COALESCE(k.volume, 0) AS volume,
  COALESCE(k.turnover, 0) AS turnover,
  -- No trades occurred during a gap, so the count defaults to 0; expose it under the 'trades' alias.
  COALESCE(k.trade_count, 0) AS trades
FROM time_pair_grid_scaffold AS grid
LEFT JOIN kline_1m_active AS k
  -- The join condition works because the data types are consistent.
  ON grid.pair_id = k.pair_id AND grid.bucket_start = k.bucket_start;

Handling Late Data and Corrections

  • Watermarks define a grace period for out-of-order events (e.g., 5-10 seconds). The engine will wait this long before finalizing a window's aggregation.

  • Very Late Events: If an event arrives after the window has been finalized (i.e., it's later than the watermark), there are two common strategies:

    1. Emit Corrections: The engine can issue a retraction for the old result and emit a new, corrected result. This is the most accurate approach, but it requires downstream systems to be able to handle these corrections (see the sink sketch after this list).

    2. Send to a Dead-Letter Queue: The very late events can be sent to a separate table or topic for offline auditing and manual correction.
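
As a minimal sketch of the first strategy, the change stream of a candlestick view can be exported in a change-log encoding so downstream consumers see inserts, updates, and retractions explicitly. The topic name below is hypothetical, and the exact sink options should be confirmed against the RisingWave documentation for your version:

CREATE SINK kline_1m_changelog_sink
FROM kline_1m
WITH (
  connector = 'kafka',
  topic = 'kline_1m_changelog',                  -- hypothetical topic name
  properties.bootstrap.servers = 'kafka:9092',
  primary_key = 'pair_id,bucket_start'           -- each candle is identified by pair and bucket
) FORMAT DEBEZIUM ENCODE JSON;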

💡
Engineering Tip: Set your watermark duration to accommodate the 99th percentile of event delay in your system, plus a small buffer. In the UI, it's a good practice to visually indicate that the most recent, still-open time bucket is preliminary and subject to change.

Serving the Results (Sinks)

Once the data is processed, you need to send it to downstream consumers like a web application, API, or data warehouse. RisingWave makes this easy by defining SINK connectors that can push data to a variety of external systems.

Pushing to Kafka for Real-Time Consumers

You can create a SINK to push the enriched 1-minute candlestick data into a new Kafka topic. Because a candle keeps updating while its bucket is open, the sink below uses upsert semantics keyed by (pair_id, bucket_start), so consumers always retain the latest version of each candle.

CREATE SINK kline_1m_sink
FROM kline_1m_enriched
WITH (
  connector = 'kafka',
  topic = 'kline_1m',
  properties.bootstrap.servers = 'kafka:9092',
  primary_key = 'pair_id,bucket_start'
) FORMAT UPSERT ENCODE JSON;

Archiving to a Data Warehouse

For long-term storage, historical analysis, and backtesting, you can also sink the data to a column-oriented data warehouse like ClickHouse, PostgreSQL (with TimescaleDB), or a data lake format like Apache Iceberg on S3. This creates a powerful architecture where you can serve real-time data from a low-latency source (like Kafka) and historical data from a cost-effective analytical store.
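
As an illustrative sketch only (the connection parameters and target table are placeholders, and the exact ClickHouse sink options should be checked against the RisingWave documentation for your version), the hourly candles could be archived like this:

CREATE SINK kline_1h_clickhouse_sink
FROM kline_1h
WITH (
  connector = 'clickhouse',
  type = 'append-only',
  force_append_only = 'true',                 -- emit inserts only; updates and deletes are dropped
  clickhouse.url = 'http://clickhouse:8123',  -- placeholder endpoint
  clickhouse.user = 'default',
  clickhouse.password = '',
  clickhouse.database = 'market_data',        -- assumed database
  clickhouse.table = 'kline_1h'               -- assumed pre-created table
);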


Best Practices for Scale and Cost

  • Use MV-on-MV: This is the most important optimization. Only the finest-grained (1s) view should read from the raw trades_src. All other views should build upon a lower-level MV.

  • Filter for Active Pairs: This is highly recommended and can drastically reduce CPU and storage costs.

  • Retention Policies: You don't need to store fine-grained data forever. Keep 1s/1m data for a few days, but store 1h/1d data for the long term (see the retention sketch after this list).

  • Replayability: Ensure your upstream data source (Kafka, CDC, etc.) has sufficient retention to allow for rebuilding state if necessary.
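
One way to express a retention policy inside RisingWave is a temporal filter, which continuously drops rows from a view once they age out. A minimal sketch that exposes only the last 7 days of 1-second candles (the upstream views and your long-term archive are unaffected):

CREATE MATERIALIZED VIEW kline_1s_recent AS
SELECT *
FROM kline_1s
WHERE bucket_start > NOW() - INTERVAL '7 days';  -- rows fall out as they cross the 7-day boundary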


Common Pitfalls and Troubleshooting

  • Incorrect Open/Close Prices: A common mistake is using MIN(price) or MAX(price) for the open/close values. Always use a function that picks the first or last value within the window, such as first_value(price ORDER BY ts) and last_value(price ORDER BY ts).

  • Watermark Too Small/Large: If the watermark is too small, you'll get frequent corrections as late data arrives. If it's too large, your pipeline's latency will increase. Analyze your actual event delay distribution and set it to the 95th or 99th percentile.

  • Ignoring Sparse Pairs: Don't skip the "active pairs" optimization. For a system with millions of pairs, failing to do this will overwhelm your resources with empty window calculations.

  • Reproducibility: For reliable production systems, ensure your upstream source is replayable, your stream processing jobs use checkpoints, and your SQL/schema changes are managed through version control.


Next Steps & Conclusion

We've walked through a comprehensive, production-ready approach to generating real-time candlestick data from a high-volume trade stream. By leveraging the power of RisingWave's materialized views, hierarchical rollups, and optimizations for sparse data, you can build a system that is not only accurate and low-latency but also incredibly resource-efficient.

From here, you can extend this pipeline even further:

  • Add materialized views for even larger time granularities (e.g., 1d, 7d, 1mo).

  • Extend the gap-filling view shown above to additional granularities, or move the forward-fill logic into your serving layer.

  • Extend the pipeline to calculate other real-time metrics like PnL (Profit and Loss) or to power a real-time risk management and alerting system, all from the same trades_src stream.

With a solid foundation for real-time aggregation in place, the possibilities for building sophisticated crypto analytics are endless.

Try RisingWave Today

  • Download the open-source version of RisingWave to deploy on your own infrastructure.

  • Get started quickly with RisingWave Cloud for a fully managed experience.

  • Talk to Our Experts: Have a complex use case or want to see a personalized demo? Contact us to discuss how RisingWave can address your specific challenges.

  • Join Our Community: Connect with fellow developers, ask questions, and share your experiences in our vibrant Slack community.
