from risingwave import RisingWave, RisingWaveConnOptions, OutputFormat
import databento as db
import pandas as pd
import threading
# Setup database function
def setup_database(rw: RisingWave) -> None:
with rw.getconn() as conn:
conn.execute(
"""
CREATE TABLE IF NOT EXISTS es_futures_live (
timestamp TIMESTAMP,
symbol VARCHAR,
price DOUBLE PRECISION,
size BIGINT
)"""
)
conn.execute(
"""
CREATE MATERIALIZED VIEW IF NOT EXISTS vwap_analysis_live AS
SELECT
window_start,
symbol,
SUM(price * size) / SUM(size) as vwap,
AVG(price) as simple_avg_price,
SUM(size) as total_volume,
COUNT(*) as trade_count
FROM TUMBLE(es_futures_live, timestamp, INTERVAL '5 SECONDS')
GROUP BY window_start, symbol;"""
)
# Event handler for MV changes
def handle_vwap_changes(event_df: pd.DataFrame) -> None:
# Only include update operations
event_df = event_df[event_df["op"].isin(["Insert", "UpdateInsert"])]
if event_df.empty:
return
# Format the dataframe for printing
event_df = event_df.rename(
{
"window_start": "Timestamp",
"symbol": "Symbol",
"vwap": "VWAP",
"simple_avg_price": "Avg Price",
"total_volume": "Volume",
"trade_count": "Trades",
},
axis=1,
)
event_df = event_df.drop(["op", "rw_timestamp"], axis=1)
event_df = event_df.set_index(["Timestamp", "Symbol"])
print()
print("VWAP Analysis Update:")
print(event_df)
def main() -> None:
# Initialize RisingWave connection
rw = RisingWave(
RisingWaveConnOptions.from_connection_info(
host="localhost", port=4566, user="root", password="root", database="dev"
)
)
setup_database(rw)
# Subscribe to MV changes
threading.Thread(
target=lambda: rw.on_change(
subscribe_from="vwap_analysis_live",
handler=handle_vwap_changes,
output_format=OutputFormat.DATAFRAME,
persist_progress=False,
max_batch_size=10,
)
).start()
# Subscribe to CME data through Databento
db.enable_logging()
client = db.Live()
client.subscribe(
dataset="GLBX.MDP3",
schema="trades",
stype_in="parent",
symbols="ES.FUT",
)
# Send trades to RisingWave
with rw.getconn() as conn:
for record in client:
# Only handle Trade records
if not isinstance(record, db.TradeMsg):
continue
# Get the human-readable symbol name
symbol = client.symbology_map.get(record.instrument_id)
if symbol is None:
continue
params = {
"timestamp": record.pretty_ts_recv,
"symbol": symbol,
"price": record.pretty_price,
"size": record.size,
}
conn.execute(
"""
INSERT INTO es_futures_live
(timestamp, symbol, price, size)
VALUES (:timestamp, :symbol, :price, :size)""",
params,
)
if __name__ == "__main__":
main()