Real-Time Token Price Impact Analysis with SQL

Price impact—the relationship between trade size and slippage—tells you whether a market can absorb institutional flows without moving against you. Measuring it in real time, rather than from historical snapshots, lets traders, protocols, and risk systems react to live market depth changes before executing large orders.

Why Token Price Impact Analysis Matters

In traditional finance, market impact models are core to execution algorithms: VWAP, TWAP, and Implementation Shortfall strategies all rely on estimates of how much a given order size will move the price. In DeFi, price impact is deterministic on AMMs (governed by the constant product formula or concentrated liquidity curves), but it changes continuously as liquidity is added, removed, and consumed.
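
To make "deterministic" concrete, the impact of a swap on a constant-product pool can be written out directly. This is a sketch under simplifying assumptions: the reserve symbols x and y are introduced here for illustration, swap fees are ignored, and concentrated-liquidity ranges are not modeled. It uses the same before/after spot-price definition as the price_impact_pct field used later in the article.

```latex
\begin{aligned}
x \, y &= k, \qquad P_{\text{before}} = \frac{y}{x} \\
x' &= x + \Delta x, \qquad y' = \frac{k}{x + \Delta x} \\
P_{\text{after}} &= \frac{y'}{x'} = \frac{x \, y}{(x + \Delta x)^{2}} \\
\text{impact} &= \frac{P_{\text{before}} - P_{\text{after}}}{P_{\text{before}}}
  = 1 - \left( \frac{x}{x + \Delta x} \right)^{2}
\end{aligned}
```

With hypothetical reserves x = 1000 and a trade of Δx = 10, the impact is 1 - (1000/1010)^2 ≈ 1.97%. If liquidity doubles to x = 2000, the same trade moves the price by only about 0.99%, which is why impact has to be re-measured whenever liquidity changes.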

The key reasons to track price impact in real time:

  • Large order detection: a sequence of swaps that each fall below a whale-detection threshold but collectively represent a large directional order reveals itself through a rising impact coefficient over time.
  • Market depth changes: liquidity providers removing capital from a Uniswap v3 pool narrows the available depth around the current price, increasing price impact for subsequent trades even if no trade has occurred yet.
  • Cross-venue comparison: the same token may have significantly different price impact profiles across Uniswap v3, Curve, and Balancer. Real-time comparison informs routing decisions.
  • Slippage outliers: a single swap with unexpectedly high slippage may indicate a thin market, a maximal extractable value (MEV) sandwich attack, or an error in the router's quote.

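The impact coefficient is the core metric: impact_coefficient = price_impact_pct / sqrt(trade_size_usd). Dividing by the square root of trade size mirrors the square-root scaling commonly used in traditional market impact models, so trades of different sizes become comparable. A stable coefficient means the market is absorbing trades in line with that scaling. A rising coefficient means depth is degrading.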
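
A short worked example may help when reading the coefficient. The trade sizes and impact percentages below are hypothetical, chosen only so the arithmetic is exact.

```latex
\begin{aligned}
\text{Trade A:} \quad & \frac{0.30}{\sqrt{250{,}000}} = \frac{0.30}{500} = 0.0006 \\
\text{Trade B:} \quad & \frac{0.60}{\sqrt{1{,}000{,}000}} = \frac{0.60}{1000} = 0.0006 \\
\text{Trade C:} \quad & \frac{0.60}{\sqrt{250{,}000}} = \frac{0.60}{500} = 0.0012
\end{aligned}
```

Trades A and B differ fourfold in size but share the same coefficient, so the pool absorbed both consistently. Trade C is the same size as A with twice the impact, so its coefficient doubles. That is the signature of thinner depth.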

How Streaming SQL Solves This

RisingWave ingests swap events from Kafka and maintains materialized views of rolling impact statistics per pool and token pair. The impact coefficient, slippage distribution, and large-order alerts update with every swap event—without requiring a separate analytics pipeline or warehouse query.

Building It Step by Step

Step 1: Connect the Data Source

CREATE SOURCE dex_swaps (
    tx_hash         VARCHAR,
    pool_address    VARCHAR,
    protocol        VARCHAR,   -- 'uniswap-v3', 'curve', 'balancer'
    token_in        VARCHAR,
    token_out       VARCHAR,
    symbol_in       VARCHAR,
    symbol_out      VARCHAR,
    amount_in_usd   NUMERIC,
    amount_out_usd  NUMERIC,
    price_before    NUMERIC,
    price_after     NUMERIC,
    price_impact_pct NUMERIC,  -- (price_before - price_after) / price_before * 100
    block_number    BIGINT,
    block_ts        TIMESTAMPTZ,
    sender          VARCHAR
) WITH (
    connector     = 'kafka',
    topic         = 'dex.swap.events.decoded',
    properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;

CREATE SOURCE liquidity_events (
    pool_address    VARCHAR,
    protocol        VARCHAR,
    event_type      VARCHAR,   -- 'Mint', 'Burn', 'Sync'
    token0          VARCHAR,
    token1          VARCHAR,
    liquidity_delta NUMERIC,
    tvl_usd_before  NUMERIC,
    tvl_usd_after   NUMERIC,
    block_ts        TIMESTAMPTZ
) WITH (
    connector     = 'kafka',
    topic         = 'dex.liquidity.events',
    properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;

Step 2: Build the Core Materialized View

Compute rolling impact statistics per pool over a 1-hour window:

CREATE MATERIALIZED VIEW pool_impact_stats AS
SELECT
    pool_address,
    protocol,
    symbol_in,
    symbol_out,
    COUNT(*)                                AS swap_count_1h,
    AVG(amount_in_usd)                      AS avg_swap_size_usd,
    PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY amount_in_usd)
                                            AS median_swap_size_usd,
    PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY amount_in_usd)
                                            AS p95_swap_size_usd,
    AVG(price_impact_pct)                   AS avg_price_impact_pct,
    MAX(price_impact_pct)                   AS max_price_impact_pct,
    -- Impact coefficient: impact per unit of sqrt(trade_size)
    AVG(price_impact_pct / NULLIF(SQRT(amount_in_usd), 0))
                                            AS avg_impact_coefficient,
    SUM(amount_in_usd)                      AS total_volume_usd_1h
FROM dex_swaps
WHERE block_ts > NOW() - INTERVAL '1 hour'
  AND price_impact_pct IS NOT NULL
  AND amount_in_usd > 100
GROUP BY pool_address, protocol, symbol_in, symbol_out;

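-- Latest TVL per pool: keep only the most recent liquidity event for each pool.
-- (Grouping by tvl_usd_after would emit one row per distinct TVL value.)
CREATE MATERIALIZED VIEW pool_depth AS
SELECT
    pool_address,
    protocol,
    token0,
    token1,
    tvl_usd_after                           AS current_tvl_usd,
    block_ts                                AS last_updated
FROM (
    SELECT
        pool_address, protocol, token0, token1, tvl_usd_after, block_ts,
        ROW_NUMBER() OVER (PARTITION BY pool_address ORDER BY block_ts DESC) AS rn
    FROM liquidity_events
    WHERE block_ts > NOW() - INTERVAL '5 minutes'
) ranked
WHERE rn <= 1;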
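
The rolling one-hour view yields a single coefficient per pool, which on its own does not show whether the coefficient is rising. One possible way to expose a trend is to bucket swaps into fixed windows. This is a minimal sketch, assuming RisingWave's TUMBLE time-window function. The view name pool_impact_coefficient_5m and the five-minute bucket size are illustrative choices, not part of the original pipeline.

```sql
-- Hypothetical view: average impact coefficient per pool in 5-minute buckets.
-- Reuses the same filters as pool_impact_stats so the values are comparable.
CREATE MATERIALIZED VIEW pool_impact_coefficient_5m AS
SELECT
    pool_address,
    window_start,
    COUNT(*)                                                AS swap_count,
    AVG(price_impact_pct / NULLIF(SQRT(amount_in_usd), 0))  AS avg_impact_coefficient
FROM TUMBLE(dex_swaps, block_ts, INTERVAL '5 minutes')
WHERE price_impact_pct IS NOT NULL
  AND amount_in_usd > 100
GROUP BY pool_address, window_start;
```

Comparing consecutive window_start rows for one pool shows whether the coefficient is drifting upward. As written, the view keeps every bucket indefinitely, so a production version would probably also need a retention filter.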

Step 3: Detection Logic and Alerts

Detect large orders and anomalous slippage events:

CREATE MATERIALIZED VIEW price_impact_alerts AS
SELECT
    s.tx_hash,
    s.pool_address,
    s.protocol,
    s.symbol_in,
    s.symbol_out,
    s.amount_in_usd,
    s.price_impact_pct,
    s.price_impact_pct / NULLIF(SQRT(s.amount_in_usd), 0)  AS impact_coefficient,
    stats.avg_impact_coefficient                             AS pool_avg_coefficient,
    CASE
        WHEN s.amount_in_usd > 1000000            THEN 'WHALE_TRADE'
        WHEN s.price_impact_pct > 2.0             THEN 'HIGH_IMPACT'
        WHEN s.price_impact_pct > 5 * stats.avg_price_impact_pct
                                                  THEN 'IMPACT_ANOMALY'
        ELSE                                           'NOTABLE'
    END AS alert_type,
    s.block_ts
FROM dex_swaps s
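JOIN pool_impact_stats stats
  ON  s.pool_address = stats.pool_address
  AND s.symbol_in    = stats.symbol_in    -- stats are keyed per pool and direction,
  AND s.symbol_out   = stats.symbol_out   -- so match both to avoid duplicate alerts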
WHERE s.block_ts > NOW() - INTERVAL '5 minutes'
  AND (
    s.amount_in_usd > 500000
    OR s.price_impact_pct > 2.0
    OR s.price_impact_pct > 5 * stats.avg_price_impact_pct
  );

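-- price_impact_alerts drops rows as they age out of its 5-minute filter, so it is
-- not append-only. force_append_only makes the sink emit inserts only (assumes a
-- RisingWave version that accepts this option in the FORMAT clause).
CREATE SINK impact_alert_sink AS
SELECT * FROM price_impact_alerts
WITH (
    connector                   = 'kafka',
    topic                       = 'alerts.dex.price-impact',
    properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON (force_append_only = 'true');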

Step 4: Querying Live Results

Compare price impact profiles across pools that trade a specific token (WETH in this example):

SELECT
    protocol,
    pool_address,
    swap_count_1h,
    ROUND(avg_swap_size_usd, 0)       AS avg_swap_usd,
    ROUND(p95_swap_size_usd, 0)       AS p95_swap_usd,
    ROUND(avg_price_impact_pct, 4)    AS avg_impact_pct,
    ROUND(max_price_impact_pct, 4)    AS max_impact_pct,
    ROUND(avg_impact_coefficient, 6)  AS impact_coeff,
    ROUND(total_volume_usd_1h, 0)     AS volume_1h_usd
FROM pool_impact_stats
WHERE symbol_in = 'WETH' OR symbol_out = 'WETH'
ORDER BY total_volume_usd_1h DESC
LIMIT 10;

Comparison Table

Capability                  | Static Depth Model          | Real-Time Streaming SQL
----------------------------|-----------------------------|----------------------------------
Depth freshness             | Snapshot (minutes to hours) | Per-swap (~seconds)
Large order detection       | Manual review               | Automatic via materialized view
Cross-pool comparison       | Batch query                 | Live JOIN across pools
Impact coefficient trending | Historical only             | Rolling 1h window, always current
Liquidity removal detection | Delayed                     | Per-event update

FAQ

How does price impact differ between Uniswap v3 and Curve pools? Uniswap v3 uses a concentrated liquidity model where impact depends on which price ticks are active. Curve's stableswap invariant provides much lower impact for stablecoin pairs near peg but degrades rapidly when pools become imbalanced. The protocol column in the materialized views lets you compare impact coefficients side-by-side, which is valuable for routing large stable swaps.
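
A side-by-side comparison of this kind could look like the ad hoc query below. It is a sketch only: the symbols 'USDC' and 'USDT' are example values, and weighting each pool's average by its swap count is one reasonable choice among several.

```sql
-- Illustrative query: per-protocol impact coefficient for a stablecoin pair.
-- Each pool_impact_stats row is one pool and swap direction, so the
-- swap-count weighting recovers an overall per-protocol average.
SELECT
    protocol,
    COUNT(*)                                     AS pool_directions,
    SUM(swap_count_1h)                           AS swaps_1h,
    SUM(avg_impact_coefficient * swap_count_1h)
        / NULLIF(SUM(swap_count_1h), 0)          AS weighted_impact_coeff
FROM pool_impact_stats
WHERE symbol_in  IN ('USDC', 'USDT')
  AND symbol_out IN ('USDC', 'USDT')
GROUP BY protocol
ORDER BY weighted_impact_coeff;
```

The protocol with the lowest weighted_impact_coeff is absorbing stable swaps most cheaply over the last hour.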

Can the impact coefficient detect MEV sandwich attacks? A sandwich attack produces two swaps in the same block around a victim transaction: one pushing the price up, then the victim trade at a worse price, then a counter-swap. The victim's swap will show an anomalously high price impact relative to its size compared to the pool's rolling average impact coefficient. This is detectable in the price_impact_alerts view with an appropriate multiplier threshold.
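
The multiplier threshold can be complemented with a structural check for the sandwich pattern itself. This is a rough sketch built on several assumptions: the view name sandwich_candidates is hypothetical, and the attacker is assumed to use the same sender for both legs, which real bots can avoid. The dex_swaps schema also carries no transaction index within a block, so the query cannot tell which leg came first or which swap was the victim.

```sql
-- Hypothetical view: pairs of opposite-direction swaps by the same sender
-- in the same pool and block. These are candidates, not confirmed attacks.
CREATE MATERIALIZED VIEW sandwich_candidates AS
SELECT
    leg_a.block_number,
    leg_a.pool_address,
    leg_a.sender          AS suspected_sender,
    leg_a.tx_hash         AS leg_a_tx,
    leg_b.tx_hash         AS leg_b_tx,
    leg_a.amount_in_usd   AS leg_a_usd,
    leg_b.amount_in_usd   AS leg_b_usd
FROM dex_swaps leg_a
JOIN dex_swaps leg_b
  ON  leg_a.block_number = leg_b.block_number
  AND leg_a.pool_address = leg_b.pool_address
  AND leg_a.sender       = leg_b.sender
  AND leg_a.token_in     = leg_b.token_out   -- opposite directions
  AND leg_a.token_out    = leg_b.token_in
  AND leg_a.tx_hash      < leg_b.tx_hash     -- report each pair once
WHERE leg_a.block_ts > NOW() - INTERVAL '5 minutes';
```

Joining these candidates back to price_impact_alerts on pool_address and block_number would narrow the list to blocks where another swap also showed anomalous impact.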

What trade size should trigger a large-order alert? The threshold depends on the pool's TVL. A $500,000 swap in a $100M Curve pool has minimal impact; the same swap in a $2M pool would be catastrophic. Consider using a TVL-relative threshold: flag trades where amount_in_usd / pool_tvl_usd > 0.01 (i.e., the trade is more than 1% of pool TVL). Join pool_depth against dex_swaps to compute this ratio per swap.
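
The TVL-relative rule described above might be sketched as follows. The view name tvl_relative_large_orders is hypothetical, the 1% cutoff is the example figure from the text, and the query assumes pool_depth exposes exactly one row per pool with the latest TVL in current_tvl_usd (the column that plays the role of pool_tvl_usd).

```sql
-- Hypothetical view: swaps larger than 1% of the pool's current TVL.
-- Assumes pool_depth has one row per pool_address; otherwise rows duplicate.
CREATE MATERIALIZED VIEW tvl_relative_large_orders AS
SELECT
    s.tx_hash,
    s.pool_address,
    s.protocol,
    s.amount_in_usd,
    d.current_tvl_usd,
    s.amount_in_usd / NULLIF(d.current_tvl_usd, 0)  AS tvl_fraction,
    s.block_ts
FROM dex_swaps s
JOIN pool_depth d ON s.pool_address = d.pool_address
WHERE s.block_ts > NOW() - INTERVAL '5 minutes'
  AND s.amount_in_usd > 0.01 * d.current_tvl_usd;
```

Because pool_depth only retains pools with a liquidity event in the last five minutes, quiet pools drop out of this join. Widening that window is one way to cover them.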

Key Takeaways

  • Price impact is not static—it changes with every swap and liquidity event, making real-time measurement essential for routing and risk.
  • The impact coefficient (price_impact_pct / sqrt(trade_size_usd)) normalizes impact across trade sizes and provides a stable baseline for anomaly detection.
  • RisingWave maintains rolling per-pool impact statistics as materialized views that update with every swap event.
  • Cross-pool routing decisions benefit from live impact comparison across protocols—a capability that requires real-time data, not cached snapshots.
