Real-Time DEX Volume Analytics with Streaming SQL

DEX volume analytics built on daily snapshots miss intraday liquidity shifts, fee tier migrations, and pool imbalances that liquidity providers and traders need to act on. Streaming SQL materializes swap volume, LP fees, and pool TVL continuously—updated with every on-chain swap event.

Why Real-Time DEX Volume Analytics Matters

Decentralized exchanges—Uniswap v3, Curve, Balancer, and their L2 counterparts—collectively process billions of dollars in daily volume. For the participants who depend on this data, staleness has real cost:

  • Liquidity providers choosing between Uniswap v3 fee tiers (0.01%, 0.05%, 0.3%, 1%) need to know where volume is actually flowing right now, not yesterday.
  • Traders routing large swaps need to know which pool has the deepest liquidity and lowest current fee-adjusted slippage.
  • Protocol treasuries monitoring fee revenue need real-time accrual data for operational decisions.
  • Researchers studying MEV need intraday swap sequence data, not daily aggregates.

The key DEX metrics that require real-time treatment:

  • Swap volume: rolling 1h and 24h volume per pool and token pair.
  • LP fees collected: cumulative fee revenue per pool, updated per swap.
  • Pool TVL: the USD value of token reserves in each pool, which changes with every swap and liquidity event.
  • Fee tier distribution: which Uniswap v3 fee tier is capturing the majority of volume for a given pair.
  • Pool imbalance: for Curve pools, the ratio of each token in the pool relative to target weights.

How Streaming SQL Solves This

RisingWave ingests swap events and liquidity events from Kafka and maintains materialized views that are always current. Adding a new pool or protocol is as simple as including it in the source data—no pipeline code changes required. Dashboard queries are standard SQL SELECT statements that always return live data.

Building It Step by Step

Step 1: Connect the Data Source

CREATE SOURCE amm_swaps (
    tx_hash         VARCHAR,
    log_index       INT,
    pool_address    VARCHAR,
    protocol        VARCHAR,   -- 'uniswap-v3', 'curve', 'balancer-v2'
    fee_tier        NUMERIC,   -- basis points: 1, 5, 30, 100 for Uniswap v3
    token_in        VARCHAR,
    token_out       VARCHAR,
    symbol_in       VARCHAR,
    symbol_out      VARCHAR,
    amount_in       NUMERIC,
    amount_out      NUMERIC,
    amount_in_usd   NUMERIC,
    amount_out_usd  NUMERIC,
    fee_usd         NUMERIC,
    lp_fee_usd      NUMERIC,
    protocol_fee_usd NUMERIC,
    sender          VARCHAR,
    block_number    BIGINT,
    block_ts        TIMESTAMPTZ
) WITH (
    connector     = 'kafka',
    topic         = 'dex.amm.swaps',
    properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;

CREATE SOURCE pool_snapshots (
    pool_address    VARCHAR,
    protocol        VARCHAR,
    token0          VARCHAR,
    token1          VARCHAR,
    symbol0         VARCHAR,
    symbol1         VARCHAR,
    reserve0_usd    NUMERIC,
    reserve1_usd    NUMERIC,
    tvl_usd         NUMERIC,
    fee_tier        NUMERIC,
    tick_spacing    INT,       -- Uniswap v3
    block_ts        TIMESTAMPTZ
) WITH (
    connector     = 'kafka',
    topic         = 'dex.pool.snapshots',
    properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;

Step 2: Build the Core Materialized View

Rolling swap volume and fee accrual per pool:

CREATE MATERIALIZED VIEW pool_volume_1h AS
SELECT
    pool_address,
    protocol,
    symbol_in,
    symbol_out,
    fee_tier,
    COUNT(*)                        AS swap_count,
    SUM(amount_in_usd)              AS volume_usd,
    SUM(lp_fee_usd)                 AS lp_fees_usd,
    SUM(protocol_fee_usd)           AS protocol_fees_usd,
    AVG(amount_in_usd)              AS avg_swap_usd,
    MAX(amount_in_usd)              AS max_swap_usd,
    COUNT(DISTINCT sender)          AS unique_traders
FROM amm_swaps
WHERE block_ts > NOW() - INTERVAL '1 hour'
GROUP BY pool_address, protocol, symbol_in, symbol_out, fee_tier;

CREATE MATERIALIZED VIEW pool_volume_24h AS
SELECT
    pool_address,
    protocol,
    symbol_in,
    symbol_out,
    SUM(amount_in_usd)              AS volume_usd_24h,
    SUM(lp_fee_usd)                 AS lp_fees_usd_24h,
    COUNT(*)                        AS swap_count_24h
FROM amm_swaps
WHERE block_ts > NOW() - INTERVAL '24 hours'
GROUP BY pool_address, protocol, symbol_in, symbol_out;

-- Fee tier market share for a token pair
CREATE MATERIALIZED VIEW fee_tier_market_share AS
SELECT
    protocol,
    symbol_in,
    symbol_out,
    fee_tier,
    SUM(amount_in_usd)              AS volume_usd_1h,
    SUM(amount_in_usd) / NULLIF(SUM(SUM(amount_in_usd)) OVER (
        PARTITION BY protocol, symbol_in, symbol_out
    ), 0) * 100                     AS market_share_pct
FROM amm_swaps
WHERE block_ts > NOW() - INTERVAL '1 hour'
GROUP BY protocol, symbol_in, symbol_out, fee_tier;
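Querying this view answers the LP's fee-tier question directly. The WETH/USDC pair below is illustrative; substitute any pair present in your swap feed:

```sql
-- Which Uniswap v3 fee tier is capturing WETH/USDC volume right now?
SELECT
    fee_tier / 100 || '%'           AS tier,
    ROUND(volume_usd_1h, 0)         AS volume_usd,
    ROUND(market_share_pct, 1)      AS share_pct
FROM fee_tier_market_share
WHERE protocol = 'uniswap-v3'
  AND symbol_in = 'WETH'
  AND symbol_out = 'USDC'
ORDER BY market_share_pct DESC;
```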

Step 3: Detection Logic and Alerts

Alert on unusual volume spikes and pool imbalances:

CREATE MATERIALIZED VIEW volume_spike_alerts AS
SELECT
    v1h.pool_address,
    v1h.protocol,
    v1h.symbol_in,
    v1h.symbol_out,
    v1h.volume_usd                                          AS volume_1h,
    v24h.volume_usd_24h / 24                                AS avg_hourly_volume,
    v1h.volume_usd / NULLIF(v24h.volume_usd_24h / 24, 0)   AS volume_ratio,
    CASE
        WHEN v1h.volume_usd > 3 * v24h.volume_usd_24h / 24 THEN 'VOLUME_SPIKE_3X'
        WHEN v1h.volume_usd > 2 * v24h.volume_usd_24h / 24 THEN 'VOLUME_SPIKE_2X'
        ELSE 'ELEVATED_VOLUME'
    END AS alert_type
FROM pool_volume_1h v1h
JOIN pool_volume_24h v24h
  ON  v1h.pool_address = v24h.pool_address
  AND v1h.symbol_in    = v24h.symbol_in
  AND v1h.symbol_out   = v24h.symbol_out
WHERE v1h.volume_usd > 2 * v24h.volume_usd_24h / 24
  AND v24h.volume_usd_24h > 0;

CREATE SINK dex_alerts_sink AS
SELECT * FROM volume_spike_alerts
WHERE alert_type = 'VOLUME_SPIKE_3X'
WITH (
    connector                   = 'kafka',
    topic                       = 'alerts.dex.volume',
    properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;

Step 4: Querying Live Results

Top pools by 1-hour volume with fee breakdown:

SELECT
    protocol,
    symbol_in || '/' || symbol_out          AS pair,
    fee_tier / 100 || '%'                   AS fee_tier_pct,
    swap_count,
    ROUND(volume_usd, 0)                    AS volume_1h_usd,
    ROUND(lp_fees_usd, 0)                   AS lp_fees_1h_usd,
    ROUND(protocol_fees_usd, 0)             AS protocol_fees_1h_usd,
    unique_traders
FROM pool_volume_1h
ORDER BY volume_usd DESC
LIMIT 10;

Comparison Table

| Analytics Type       | Dune Analytics      | Subgraph      | RisingWave Streaming SQL                   |
|----------------------|---------------------|---------------|--------------------------------------------|
| Data freshness       | Hours               | 5–15 minutes  | < 30 seconds                               |
| Query interface      | SQL (Spark dialect) | GraphQL       | PostgreSQL SQL                             |
| Custom aggregations  | Yes (batch)         | Limited       | Yes (streaming)                            |
| Rolling time windows | Manual              | Manual        | Native (WHERE block_ts > NOW() - INTERVAL) |
| Real-time alerts     | No                  | No            | Yes (via Kafka sink)                       |
| Cross-protocol joins | Batch               | Per-subgraph  | Single query                               |

FAQ

How do you track Curve pool imbalance in real time? Curve's StableSwap pools have target weights (usually equal for all tokens in a 3pool). The imbalance is (reserve_i / total_reserve) - target_weight_i. Using pool_snapshots, you can compute the per-token reserve ratio after each swap and flag pools where any single token exceeds, say, 40% of the pool (for a 3-token pool targeting 33.3%). This is a leading indicator of rising slippage and potential depegging for stablecoin pools.
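The check described above can be sketched as a materialized view over pool_snapshots. This sketch covers two-token pools with an assumed 50/50 target weight; the 60% alert threshold is illustrative, and deduplicating to the latest snapshot per pool (e.g., via a ROW_NUMBER over block_ts) is omitted for brevity:

```sql
-- Per-token reserve ratios for two-token Curve pools.
-- Assumes equal target weights; 60% threshold is illustrative.
CREATE MATERIALIZED VIEW curve_pool_imbalance AS
SELECT
    pool_address,
    symbol0,
    symbol1,
    reserve0_usd / NULLIF(tvl_usd, 0) * 100                     AS token0_pct,
    reserve1_usd / NULLIF(tvl_usd, 0) * 100                     AS token1_pct,
    GREATEST(reserve0_usd, reserve1_usd) / NULLIF(tvl_usd, 0) * 100 AS max_token_pct
FROM pool_snapshots
WHERE protocol = 'curve';

-- Flag pools drifting toward one side:
-- SELECT * FROM curve_pool_imbalance WHERE max_token_pct > 60;
```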

Does RisingWave support Uniswap v3 tick-level analysis? RisingWave is a SQL layer—it doesn't implement Uniswap v3 tick math natively. However, if your upstream indexer decodes Uniswap v3 Swap events to include the pre-trade and post-trade sqrt price, you can compute effective price, ticks crossed, and fee amounts in SQL. For tick liquidity distribution analysis, you would need a separate data source that tracks Mint and Burn events at the tick level and ingest it into RisingWave as another Kafka source.

How should protocol fee revenue be attributed for Balancer multi-token pools? Balancer pools can contain 2–8 tokens. Protocol fees are denominated in the token received by the protocol fee collector, not split by input/output token. The cleanest approach is to ingest the ProtocolFeeCollected event separately from the swap event and join on pool_address in your fee revenue materialized view. This gives accurate, event-driven fee attribution rather than estimated values derived from swap events.
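A minimal sketch of that event-driven approach, assuming a hypothetical Kafka topic carrying decoded ProtocolFeeCollected events (the topic name and column names are illustrative, not part of the pipeline above):

```sql
-- Hypothetical source decoded from Balancer ProtocolFeeCollected events.
CREATE SOURCE balancer_protocol_fees (
    pool_address    VARCHAR,
    fee_token       VARCHAR,
    fee_amount_usd  NUMERIC,
    block_ts        TIMESTAMPTZ
) WITH (
    connector     = 'kafka',
    topic         = 'dex.balancer.protocol_fees',
    properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;

-- Event-driven fee revenue per pool, rather than estimates from swaps.
CREATE MATERIALIZED VIEW balancer_fee_revenue_24h AS
SELECT
    pool_address,
    SUM(fee_amount_usd) AS protocol_fees_usd_24h
FROM balancer_protocol_fees
WHERE block_ts > NOW() - INTERVAL '24 hours'
GROUP BY pool_address;
```

Joining this view to pool_volume_24h on pool_address then pairs accurate fee revenue with swap volume in one query.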

Key Takeaways

  • DEX volume analytics require sub-minute freshness for operational decisions by LPs, traders, and protocol treasuries—daily or hourly snapshots are insufficient.
  • RisingWave maintains rolling 1h and 24h volume, LP fee accrual, and fee tier market share as incrementally updated materialized views.
  • Cross-protocol comparison (Uniswap v3 vs. Curve vs. Balancer) in a single SQL query requires a unified streaming pipeline—RisingWave provides this without custom join logic.
  • Volume spike detection runs as a continuous SQL view, alerting on unusual activity as it happens rather than after the fact.
