Price impact—the relationship between trade size and slippage—tells you whether a market can absorb institutional flows without moving against you. Measuring it in real time, rather than from historical snapshots, lets traders, protocols, and risk systems react to live market depth changes before executing large orders.
Why Token Price Impact Analysis Matters
In traditional finance, market impact models are core to execution algorithms: VWAP, TWAP, and Implementation Shortfall strategies all rely on estimates of how much a given order size will move the price. In DeFi, price impact is deterministic on AMMs (governed by the constant product formula or concentrated liquidity curves), but it changes continuously as liquidity is added, removed, and consumed.
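To make the determinism concrete, here is a minimal Python sketch of price impact on a constant product (x*y = k) pool; the function name and reserve figures are illustrative, and swap fees are ignored.

```python
def constant_product_impact(reserve_in: float, reserve_out: float,
                            amount_in: float) -> float:
    """Price impact (%) of a swap against an x*y = k pool, ignoring fees."""
    # Output amount that keeps the product of reserves constant
    amount_out = reserve_out - (reserve_in * reserve_out) / (reserve_in + amount_in)
    spot_price = reserve_out / reserve_in   # price before the trade
    exec_price = amount_out / amount_in     # effective price actually paid
    return (spot_price - exec_price) / spot_price * 100

# The same $100k swap costs ~1% impact in a deep pool, ~9% in a thin one.
deep = constant_product_impact(10_000_000, 10_000_000, 100_000)
thin = constant_product_impact(1_000_000, 1_000_000, 100_000)
```

"Deterministic" here means the impact is fully determined by the reserves at execution time; it is the reserves themselves that change continuously.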
The key reasons to track price impact in real time:
- Large order detection: a sequence of swaps that each fall below a whale-detection threshold but collectively represent a large directional order reveals itself through a rising impact coefficient over time.
- Market depth changes: liquidity providers removing capital from a Uniswap v3 pool narrows the available depth around the current price, increasing price impact for subsequent trades even if no trade has occurred yet.
- Cross-venue comparison: the same token may have significantly different price impact profiles across Uniswap v3, Curve, and Balancer. Real-time comparison informs routing decisions.
- Slippage outliers: a single swap with unexpectedly high slippage may indicate a thin market, a maximal extractable value (MEV) sandwich attack, or an error in the router's quote.
The core metric is the impact coefficient: impact_coefficient = price_impact_pct / sqrt(trade_size_usd). A stable coefficient means the market is absorbing trades proportionally; a rising coefficient means depth is degrading.
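The square-root normalization can be sketched in a few lines of Python (the trade sizes and impact figures below are illustrative):

```python
import math

def impact_coefficient(price_impact_pct: float, trade_size_usd: float) -> float:
    """Impact per unit of sqrt(trade size), comparable across trade sizes."""
    return price_impact_pct / math.sqrt(trade_size_usd)

# Under square-root impact, a 4x larger trade has ~2x the impact,
# so both trades collapse to the same coefficient.
small = impact_coefficient(0.10, 10_000)   # 0.10% impact on a $10k trade
large = impact_coefficient(0.20, 40_000)   # 0.20% impact on a $40k trade
```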
How Streaming SQL Solves This
RisingWave ingests swap events from Kafka and maintains materialized views of rolling impact statistics per pool and token pair. The impact coefficient, slippage distribution, and large-order alerts update with every swap event—without requiring a separate analytics pipeline or warehouse query.
Building It Step by Step
Step 1: Connect the Data Source
CREATE SOURCE dex_swaps (
tx_hash VARCHAR,
pool_address VARCHAR,
protocol VARCHAR, -- 'uniswap-v3', 'curve', 'balancer'
token_in VARCHAR,
token_out VARCHAR,
symbol_in VARCHAR,
symbol_out VARCHAR,
amount_in_usd NUMERIC,
amount_out_usd NUMERIC,
price_before NUMERIC,
price_after NUMERIC,
price_impact_pct NUMERIC, -- (price_before - price_after) / price_before * 100
block_number BIGINT,
block_ts TIMESTAMPTZ,
sender VARCHAR
) WITH (
connector = 'kafka',
topic = 'dex.swap.events.decoded',
properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;
CREATE SOURCE liquidity_events (
pool_address VARCHAR,
protocol VARCHAR,
event_type VARCHAR, -- 'Mint', 'Burn', 'Sync'
token0 VARCHAR,
token1 VARCHAR,
liquidity_delta NUMERIC,
tvl_usd_before NUMERIC,
tvl_usd_after NUMERIC,
block_ts TIMESTAMPTZ
) WITH (
connector = 'kafka',
topic = 'dex.liquidity.events',
properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;
Step 2: Build the Core Materialized View
Compute rolling impact statistics per pool over a 1-hour window:
CREATE MATERIALIZED VIEW pool_impact_stats AS
SELECT
pool_address,
protocol,
symbol_in,
symbol_out,
COUNT(*) AS swap_count_1h,
AVG(amount_in_usd) AS avg_swap_size_usd,
PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY amount_in_usd)
AS median_swap_size_usd,
PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY amount_in_usd)
AS p95_swap_size_usd,
AVG(price_impact_pct) AS avg_price_impact_pct,
MAX(price_impact_pct) AS max_price_impact_pct,
-- Impact coefficient: impact per unit of sqrt(trade_size)
AVG(price_impact_pct / NULLIF(SQRT(amount_in_usd), 0))
AS avg_impact_coefficient,
SUM(amount_in_usd) AS total_volume_usd_1h
FROM dex_swaps
WHERE block_ts > NOW() - INTERVAL '1 hour'
AND price_impact_pct IS NOT NULL
AND amount_in_usd > 100
GROUP BY pool_address, protocol, symbol_in, symbol_out;
CREATE MATERIALIZED VIEW pool_depth AS
SELECT
pool_address,
protocol,
token0,
token1,
tvl_usd_after AS current_tvl_usd,
block_ts AS last_updated
FROM (
SELECT
*,
-- Keep only the most recent liquidity event per pool (Top-N by group)
ROW_NUMBER() OVER (
PARTITION BY pool_address ORDER BY block_ts DESC
) AS event_rank
FROM liquidity_events
) ranked
WHERE event_rank = 1;
Step 3: Detection Logic and Alerts
Detect large orders and anomalous slippage events:
CREATE MATERIALIZED VIEW price_impact_alerts AS
SELECT
s.tx_hash,
s.pool_address,
s.protocol,
s.symbol_in,
s.symbol_out,
s.amount_in_usd,
s.price_impact_pct,
s.price_impact_pct / NULLIF(SQRT(s.amount_in_usd), 0) AS impact_coefficient,
stats.avg_impact_coefficient AS pool_avg_coefficient,
CASE
WHEN s.amount_in_usd > 1000000 THEN 'WHALE_TRADE'
WHEN s.price_impact_pct > 2.0 THEN 'HIGH_IMPACT'
WHEN s.price_impact_pct > 5 * stats.avg_price_impact_pct
THEN 'IMPACT_ANOMALY'
ELSE 'NOTABLE'
END AS alert_type,
s.block_ts
FROM dex_swaps s
JOIN pool_impact_stats stats
ON s.pool_address = stats.pool_address
AND s.symbol_in = stats.symbol_in
AND s.symbol_out = stats.symbol_out
WHERE s.block_ts > NOW() - INTERVAL '5 minutes'
AND (
s.amount_in_usd > 500000
OR s.price_impact_pct > 2.0
OR s.price_impact_pct > 5 * stats.avg_price_impact_pct
);
CREATE SINK impact_alert_sink AS
SELECT * FROM price_impact_alerts
WITH (
connector = 'kafka',
topic = 'alerts.dex.price-impact',
properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;
Step 4: Querying Live Results
Compare price impact profiles across pools for a specific token pair:
SELECT
protocol,
pool_address,
swap_count_1h,
ROUND(avg_swap_size_usd, 0) AS avg_swap_usd,
ROUND(p95_swap_size_usd, 0) AS p95_swap_usd,
ROUND(avg_price_impact_pct, 4) AS avg_impact_pct,
ROUND(max_price_impact_pct, 4) AS max_impact_pct,
ROUND(avg_impact_coefficient, 6) AS impact_coeff,
ROUND(total_volume_usd_1h, 0) AS volume_1h_usd
FROM pool_impact_stats
WHERE symbol_in = 'WETH' OR symbol_out = 'WETH'
ORDER BY total_volume_usd_1h DESC
LIMIT 10;
Comparison Table
| Capability | Static Depth Model | Real-Time Streaming SQL |
| --- | --- | --- |
| Depth freshness | Snapshot (minutes to hours) | Per-swap (~seconds) |
| Large order detection | Manual review | Automatic via materialized view |
| Cross-pool comparison | Batch query | Live JOIN across pools |
| Impact coefficient trending | Historical only | Rolling 1h window, always current |
| Liquidity removal detection | Delayed | Per-event update |
FAQ
How does price impact differ between Uniswap v3 and Curve pools?
Uniswap v3 uses a concentrated liquidity model where impact depends on which price ticks are active. Curve's stableswap invariant provides much lower impact for stablecoin pairs near peg but degrades rapidly when pools become imbalanced. The protocol column in the materialized views lets you compare impact coefficients side-by-side, which is valuable for routing large stable swaps.
Can the impact coefficient detect MEV sandwich attacks?
A sandwich attack places two attacker swaps in the same block around a victim transaction: a front-run pushes the price up, the victim then trades at the worse price, and a back-run sells back to capture the difference. The victim's swap shows an anomalously high price impact relative to its size when compared to the pool's rolling average impact coefficient, which makes it detectable in the price_impact_alerts view with an appropriate multiplier threshold.
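The detection logic reduces to comparing each swap's coefficient against the pool's rolling average; a minimal Python sketch, with an assumed 5x multiplier and illustrative numbers:

```python
import math

def is_impact_anomaly(price_impact_pct: float, trade_size_usd: float,
                      pool_avg_coefficient: float,
                      multiplier: float = 5.0) -> bool:
    """Flag swaps whose impact coefficient far exceeds the pool's rolling average."""
    coeff = price_impact_pct / math.sqrt(trade_size_usd)
    return coeff > multiplier * pool_avg_coefficient

# Pool's rolling average coefficient assumed to be 0.001
victim = is_impact_anomaly(1.5, 10_000, 0.001)   # coeff 0.015 -> flagged
normal = is_impact_anomaly(0.08, 10_000, 0.001)  # coeff 0.0008 -> not flagged
```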
What trade size should trigger a large-order alert?
The threshold depends on the pool's TVL. A $500,000 swap in a $100M Curve pool has minimal impact; the same swap in a $2M pool would be catastrophic. Consider using a TVL-relative threshold: flag trades where amount_in_usd / pool_tvl_usd > 0.01 (i.e., the trade is more than 1% of pool TVL). Join pool_depth against dex_swaps to compute this ratio per swap.
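A TVL-relative check of this kind is easy to sketch in Python; the 1% default below follows the rule of thumb in the answer above, and the dollar figures are illustrative:

```python
def is_large_order(amount_in_usd: float, pool_tvl_usd: float,
                   tvl_fraction: float = 0.01) -> bool:
    """Flag trades exceeding a fraction of pool TVL (default 1%)."""
    return pool_tvl_usd > 0 and amount_in_usd / pool_tvl_usd > tvl_fraction

# The same $500k swap: negligible in a $100M pool, enormous in a $2M pool.
flag_deep = is_large_order(500_000, 100_000_000)  # 0.5% of TVL -> False
flag_thin = is_large_order(500_000, 2_000_000)    # 25% of TVL -> True
```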
Key Takeaways
- Price impact is not static—it changes with every swap and liquidity event, making real-time measurement essential for routing and risk.
- The impact coefficient (impact_pct / sqrt(trade_usd)) normalizes impact across trade sizes and provides a stable baseline for anomaly detection.
- RisingWave maintains rolling impact statistics per pool as materialized views that update with every swap event.
- Cross-pool routing decisions benefit from live impact comparison across protocols—a capability that requires real-time data, not cached snapshots.