Wondering how to build a real-time market analysis pipeline? In this tutorial, we'll combine Databento's market data streams with RisingWave's stream processing capabilities to create a live market analysis system.


Why This Matters


Sourcing high-quality market data can be challenging, especially when dealing with multiple financial exchanges. Databento offers a powerful but lightweight solution with comprehensive coverage for live and historical market data, designed to be:

  • Easy-to-use: A small API surface and straightforward protocols enable users to get started in minutes.
  • Fast: 6.1-microsecond normalization latency, and close to zero data gaps with FPGA-based capture.
  • Versatile: Supports multiple asset classes and venues with a unified message format. Listen to every order book message or hundreds of thousands of symbols at once.


Meet Our Tools


Databento: Your Market Data Source


Databento provides clean, normalized market data through a modern API. Think of it as your window into the markets: you get both historical and real-time data in consistent formats, regardless of which exchange you're interested in.


RisingWave: Your Real-Time Processing and Management Platform


RisingWave helps you analyze streaming data in real time using familiar SQL. If you've worked with databases before, you'll feel right at home. The Python SDK makes it even easier to integrate RisingWave into your Python applications.


Procedure


First Steps


Before we dive in, you'll need:

  1. A Databento account and license for real-time data. Licensing is a fairly straightforward process and can be instant for non-professional users — check out Databento's licensing guide for full details!
  2. A running RisingWave instance. The easiest way is to create a free RisingWave Cloud account. It’ll take a couple of minutes. You can also install the open-source version and start it locally. For detailed instructions, see the quick start guide.
  3. Python 3.7+.

Install the required Python packages into your Python environment (pandas is used later to handle change events).

pip3 install databento
pip3 install risingwave-py
pip3 install pandas


Getting Market Data from Databento


Let's start with E-mini S&P 500 futures data: it's one of the most liquid futures contracts and perfect for learning. Here's how to get the live data:

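import databento as db

# db.Live() reads your API key from the DATABENTO_API_KEY environment variable;
# you can also pass it explicitly through the key argument
client = db.Live()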
client.subscribe(
    dataset="GLBX.MDP3",
    schema="trades",
    stype_in="parent",
    symbols="ES.FUT",
)

# Print incoming data for testing
for record in client:
    print(record)
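With stype_in="parent", ES.FUT resolves to every instrument in the ES product group, which includes calendar spreads such as ESZ4-ESH5 (they appear in the sample output at the end of this tutorial, priced around 69 rather than around 6,000). If you only want outright contracts, one option is to filter on the resolved symbol. The sketch below is a hypothetical helper, not part of the Databento API, and it assumes, based on the symbols seen in this tutorial, that spread symbols always contain a hyphen.

```python
def is_outright(symbol: str) -> bool:
    """Return True for single-contract symbols such as "ESH5".

    Assumption: calendar spreads are named "<leg1>-<leg2>" (e.g. "ESZ4-ESH5"),
    so any symbol containing a hyphen is treated as a spread.
    """
    return "-" not in symbol


# Example symbols of the kind returned by client.symbology_map
symbols = ["ESZ4", "ESZ4-ESH5", "ESH5"]
outrights = [s for s in symbols if is_outright(s)]
print(outrights)
```

In the ingestion loop you would call it right after the symbology_map lookup and skip the record when it returns False.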


Set up RisingWave


Now let's connect to RisingWave. The parameters shown below are for connecting to a local instance of RisingWave. If you use RisingWave Cloud, you can get the connection string by navigating to your cluster and clicking Connect on your cluster card.

from risingwave import RisingWave, RisingWaveConnOptions

rw = RisingWave(
    RisingWaveConnOptions.from_connection_info(
        host="localhost", port=4566, user="root", password="root", database="dev"
    )
)
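A RisingWave Cloud connection string is a PostgreSQL-style URL. As a minimal sketch using only the standard library, the hypothetical helper below splits such a URL into the keyword arguments that from_connection_info takes in the snippet above. The example URL is made up, and query parameters such as sslmode are ignored by this sketch.

```python
from urllib.parse import unquote, urlparse


def parse_conn_string(url: str) -> dict:
    """Split a postgresql:// URL into host, port, user, password and database."""
    parts = urlparse(url)
    return {
        "host": parts.hostname,
        # Fall back to the local defaults used in this tutorial (port 4566, database "dev")
        "port": parts.port or 4566,
        "user": unquote(parts.username or ""),
        "password": unquote(parts.password or ""),
        # The URL path is "/<database>"; strip the leading slash
        "database": parts.path.lstrip("/") or "dev",
    }


info = parse_conn_string("postgresql://alice:s3cret@example.risingwave.cloud:4566/dev")
print(info)
```

You could then pass the result as RisingWaveConnOptions.from_connection_info(**info).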

To store the data from Databento, we need to create a table in RisingWave with the correct schema.

with rw.getconn() as conn:
    # Column types match the complete script at the end of this tutorial
    conn.execute("""
        CREATE TABLE IF NOT EXISTS es_futures_live (
            timestamp TIMESTAMP,
            symbol VARCHAR,
            price DOUBLE PRECISION,
            size BIGINT
        );
    """)

We can define a materialized view for each metric we want real-time results for. RisingWave maintains materialized views incrementally, updating their results continuously as new records arrive.

with rw.getconn() as conn:
    # Group by symbol too, so each contract gets its own VWAP per window
    conn.execute("""
        CREATE MATERIALIZED VIEW IF NOT EXISTS vwap_analysis_live AS
        SELECT
            window_start,
            symbol,
            SUM(price * size) / SUM(size) as vwap,
            AVG(price) as simple_avg_price,
            SUM(size) as total_volume,
            COUNT(*) as trade_count
        FROM TUMBLE(es_futures_live, timestamp, INTERVAL '5 SECONDS')
        GROUP BY window_start, symbol;
    """)

This materialized view computes a VWAP for every 5-second window, a key benchmark for measuring trade execution quality. Institutional traders often aim to match or beat VWAP to demonstrate that they're getting good prices for their trades.
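VWAP for a window is the sum of price × size divided by the sum of size over the trades in that window, which is what SUM(price * size) / SUM(size) computes. To show why a materialized view can keep this current as each trade arrives, here is a plain-Python sketch (an illustration of the idea, not how RisingWave is implemented internally) that keeps running sums keyed by window and symbol, as in the complete script. Windows are assigned the way a 5-second TUMBLE does, by flooring each timestamp to a multiple of 5 seconds. The trades are invented for illustration.

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone

WINDOW = timedelta(seconds=5)  # matches INTERVAL '5 SECONDS' in the view
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def window_start(ts: datetime) -> datetime:
    """Floor a UTC timestamp to the start of its 5-second tumbling window."""
    return EPOCH + ((ts - EPOCH) // WINDOW) * WINDOW


class WindowAggregate:
    """Running sums for one (window_start, symbol) group."""

    def __init__(self) -> None:
        self.notional = 0.0   # SUM(price * size)
        self.volume = 0       # SUM(size)
        self.price_sum = 0.0  # numerator of AVG(price)
        self.trades = 0       # COUNT(*)

    def add(self, price: float, size: int) -> None:
        # Each trade only touches its own group, so an update is O(1)
        self.notional += price * size
        self.volume += size
        self.price_sum += price
        self.trades += 1

    @property
    def vwap(self) -> float:
        return self.notional / self.volume

    @property
    def simple_avg_price(self) -> float:
        return self.price_sum / self.trades


state = defaultdict(WindowAggregate)

# Invented trades: (timestamp, symbol, price, size)
base = datetime(2024, 1, 2, 15, 0, 0, tzinfo=timezone.utc)
trades = [
    (base + timedelta(seconds=1), "ESH5", 6127.50, 2),
    (base + timedelta(seconds=3), "ESH5", 6127.75, 6),
    (base + timedelta(seconds=6), "ESH5", 6128.00, 1),
]

for ts, symbol, price, size in trades:
    state[(window_start(ts), symbol)].add(price, size)

for (start, symbol), agg in sorted(state.items(), key=lambda kv: kv[0]):
    print(start.isoformat(), symbol, agg.vwap, agg.simple_avg_price, agg.volume, agg.trades)
```

In the first window the VWAP (6127.6875) sits above the simple average (6127.625) because the larger trade printed at the higher price; this size weighting is why the VWAP and average price columns can differ.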


Handling data records


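We can define a function that formats each trade record as soon as it arrives and inserts it into RisingWave.

from datetime import datetime, timezone

import databento as db

def handle_trade(client: db.Live, record: db.TradeMsg) -> None:
    # TradeMsg has no symbol field; resolve its instrument ID to a
    # human-readable symbol (e.g. "ESH5") via the client's symbology map
    symbol = client.symbology_map.get(record.instrument_id)
    if symbol is None:
        return

    params = {
        # ts_event is an integer count of nanoseconds since the UNIX epoch (UTC)
        "timestamp": datetime.fromtimestamp(record.ts_event / 1e9, tz=timezone.utc),
        "symbol": symbol,
        # price is a fixed-point integer (1 unit = 1e-9); pretty_price converts it
        "price": record.pretty_price,
        "size": record.size,
    }
    with rw.getconn() as conn:
        conn.execute("""
            INSERT INTO es_futures_live
            (timestamp, symbol, price, size)
            VALUES (:timestamp, :symbol, :price, :size)
        """, params)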
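Databento sends prices as fixed-point integers in units of 1e-9 (so 6127.50 arrives as 6127500000000) and timestamps as integer nanoseconds since the UNIX epoch in UTC. The client's pretty_price property, used in the complete script later in this tutorial, handles the price conversion for you. The sketch below spells out the same arithmetic in plain Python with hypothetical helper names, which can help when a value looks off by a factor of a billion.

```python
from datetime import datetime, timedelta, timezone
from decimal import Decimal

FIXED_PRICE_SCALE = 1_000_000_000  # 1 fixed-point unit = 1e-9
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def fixed_to_decimal(raw_price: int) -> Decimal:
    """Convert a fixed-point integer price to an exact Decimal."""
    return Decimal(raw_price) / FIXED_PRICE_SCALE


def ns_to_datetime(ts_ns: int) -> datetime:
    """Convert nanoseconds since the UNIX epoch to a UTC datetime.

    Integer arithmetic avoids float rounding; digits below one microsecond
    are dropped because datetime has microsecond resolution.
    """
    return EPOCH + timedelta(microseconds=ts_ns // 1_000)


print(fixed_to_decimal(6_127_500_000_000))
print(ns_to_datetime(86_400 * 10**9).isoformat())
```

Decimal keeps the price exact, which matters if you store prices in a NUMERIC column; a float is adequate for the DOUBLE PRECISION column used here.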


Subscribing to data changes


As our materialized view contains time-windowed values, we may want to subscribe to its changes to make our application event-driven. For that purpose, we’ll need to define a change event handler first, and then use that handler to subscribe to changes in the materialized view.

import threading

import pandas as pd
from risingwave import OutputFormat

# Event handler for MV changes
def handle_vwap_changes(event_df: pd.DataFrame) -> None:
    # Keep only new rows and the new version of updated rows
    event_df = event_df[event_df["op"].isin(["Insert", "UpdateInsert"])]
    if event_df.empty:
        return

    # Format the dataframe for printing
    event_df = event_df.rename(
        {
            "window_start": "Timestamp",
            "symbol": "Symbol",
            "vwap": "VWAP",
            "simple_avg_price": "Avg Price",
            "total_volume": "Volume",
            "trade_count": "Trades",
        },
        axis=1,
    )
    event_df = event_df.drop(["op", "rw_timestamp"], axis=1)
    event_df = event_df.set_index(["Timestamp", "Symbol"])

    print()
    print("VWAP Analysis Update:")
    print(event_df)

# Subscribe to MV changes
threading.Thread(
    target=lambda: rw.on_change(
        subscribe_from="vwap_analysis_live",
        handler=handle_vwap_changes,
        output_format=OutputFormat.DATAFRAME,
        persist_progress=False,
        max_batch_size=10
    )
).start()
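The handler keeps only Insert and UpdateInsert rows because, when a window's aggregates change, the subscription is expected to emit the old row as an UpdateDelete and the new one as an UpdateInsert. The stdlib-only sketch below illustrates that semantics by folding a list of change events into the current contents of the view, keyed by window and symbol. The event dictionaries are hypothetical stand-ins for the DataFrame rows, and the Delete and UpdateDelete op names are assumed counterparts of the two the handler filters on.

```python
def apply_changes(view: dict, events: list) -> dict:
    """Fold change events into a dict mapping (window_start, symbol) -> row."""
    for event in events:
        key = (event["window_start"], event["symbol"])
        if event["op"] in ("Insert", "UpdateInsert"):
            # A new row, or the new version of an updated row
            view[key] = {k: v for k, v in event.items() if k != "op"}
        elif event["op"] in ("Delete", "UpdateDelete"):
            # A removed row, or the old version of an updated row
            view.pop(key, None)
    return view


# Hypothetical events: a window first seen with 10 contracts, then updated to 22
events = [
    {"op": "Insert", "window_start": "12:00:00", "symbol": "ESH5", "total_volume": 10},
    {"op": "UpdateDelete", "window_start": "12:00:00", "symbol": "ESH5", "total_volume": 10},
    {"op": "UpdateInsert", "window_start": "12:00:00", "symbol": "ESH5", "total_volume": 22},
]
view = apply_changes({}, events)
print(view)
```

Only the latest version of each window survives, which is why printing just the Insert and UpdateInsert rows gives an up-to-date picture.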


Putting it all together


Now let's put everything together and see what the results might look like.

from risingwave import RisingWave, RisingWaveConnOptions, OutputFormat
import databento as db
import pandas as pd
import threading

# Create the table and materialized view if they don't already exist
def setup_database(rw: RisingWave) -> None:
    with rw.getconn() as conn:
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS es_futures_live (
                timestamp TIMESTAMP,
                symbol VARCHAR,
                price DOUBLE PRECISION,
                size BIGINT
            )"""
        )

        conn.execute(
            """
            CREATE MATERIALIZED VIEW IF NOT EXISTS vwap_analysis_live AS
            SELECT 
                window_start,
                symbol,
                SUM(price * size) / SUM(size) as vwap,
                AVG(price) as simple_avg_price,
                SUM(size) as total_volume,
                COUNT(*) as trade_count
            FROM TUMBLE(es_futures_live, timestamp, INTERVAL '5 SECONDS')
            GROUP BY window_start, symbol;"""
        )

# Event handler for MV changes
def handle_vwap_changes(event_df: pd.DataFrame) -> None:
    # Keep only new rows and the new version of updated rows
    event_df = event_df[event_df["op"].isin(["Insert", "UpdateInsert"])]
    if event_df.empty:
        return

    # Format the dataframe for printing
    event_df = event_df.rename(
        {
            "window_start": "Timestamp",
            "symbol": "Symbol",
            "vwap": "VWAP",
            "simple_avg_price": "Avg Price",
            "total_volume": "Volume",
            "trade_count": "Trades",
        },
        axis=1,
    )
    event_df = event_df.drop(["op", "rw_timestamp"], axis=1)
    event_df = event_df.set_index(["Timestamp", "Symbol"])

    print()
    print("VWAP Analysis Update:")
    print(event_df)

def main() -> None:
    # Initialize RisingWave connection
    rw = RisingWave(
        RisingWaveConnOptions.from_connection_info(
            host="localhost", port=4566, user="root", password="root", database="dev"
        )
    )
    setup_database(rw)

    # Subscribe to MV changes
    threading.Thread(
        target=lambda: rw.on_change(
            subscribe_from="vwap_analysis_live",
            handler=handle_vwap_changes,
            output_format=OutputFormat.DATAFRAME,
            persist_progress=False,
            max_batch_size=10,
        )
    ).start()

    # Subscribe to CME data through Databento
    db.enable_logging()
    # Reads the API key from the DATABENTO_API_KEY environment variable
    client = db.Live()
    client.subscribe(
        dataset="GLBX.MDP3",
        schema="trades",
        stype_in="parent",
        symbols="ES.FUT",
    )

    # Send trades to RisingWave
    with rw.getconn() as conn:
        for record in client:
            # Only handle Trade records
            if not isinstance(record, db.TradeMsg):
                continue

            # Get the human-readable symbol name
            symbol = client.symbology_map.get(record.instrument_id)
            if symbol is None:
                continue

            params = {
                "timestamp": record.pretty_ts_recv,
                "symbol": symbol,
                "price": record.pretty_price,
                "size": record.size,
            }
            conn.execute(
                """
                INSERT INTO es_futures_live 
                (timestamp, symbol, price, size)
                VALUES (:timestamp, :symbol, :price, :size)""",
                params,
            )

if __name__ == "__main__":
    main()

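Below is an example of the output from the live VWAP analysis, which updates continuously as new market data arrives. ESZ4-ESH5 is a calendar spread between the December 2024 and March 2025 contracts, so its price is the difference between the two legs rather than an index level.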

| Timestamp           | Symbol        | VWAP     | Avg Price | Volume | Trades |
|---------------------|---------------|----------|-----------|--------|--------|
| 2024-12-13 20:03:30 | ESZ4-ESH5     | 69.3259  | 69.3250   | 297    | 44     |
| 2024-12-13 20:03:35 | ESZ4          | 6057.875 | 6057.875  | 22     | 22     |
| 2024-12-13 20:03:35 | ESZ4-ESH5     | 69.3313  | 69.3200   | 1265   | 55     |
| 2024-12-13 20:03:40 | ESH5          | 6127.587 | 6127.588  | 1199   | 407    |
| 2024-12-13 20:03:40 | ESZ4          | 6058.076 | 6058.034  | 3839   | 1045   |

What's Next?

Now that you have real-time market data flowing through your system, you could:

  • Add more sophisticated analyses like order flow imbalance;
  • Create trading signals based on VWAP crossovers;
  • Build a dashboard to visualize your metrics!
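As a starting point for VWAP crossover signals, here is a minimal sketch that compares each new price with the current VWAP and reports when the price moves from one side to the other. crossovers is a hypothetical helper; its input is a plain list of (price, vwap) pairs in time order, and the numbers below are invented.

```python
def crossovers(points):
    """Yield (index, "up" or "down") whenever price crosses VWAP.

    points: iterable of (price, vwap) pairs in time order.
    A price exactly equal to VWAP does not change the current side.
    """
    prev_side = 0  # +1 above VWAP, -1 below, 0 not yet known
    for i, (price, vwap) in enumerate(points):
        if price > vwap:
            side = 1
        elif price < vwap:
            side = -1
        else:
            continue
        if prev_side and side != prev_side:
            yield i, "up" if side > 0 else "down"
        prev_side = side


points = [(100.0, 100.5), (100.2, 100.4), (100.6, 100.4), (100.6, 100.6), (100.3, 100.5)]
print(list(crossovers(points)))
```

Skipping ties avoids flip-flopping signals when the price sits exactly on VWAP.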

Heng Ma

Content Lead
