Yield farming analytics without real-time data is a leaky bucket: APY figures go stale within hours as reward token prices shift, harvest events compound returns, and impermanent loss silently erodes LP positions. Streaming SQL keeps every metric current—from vault strategy performance to epoch reward distributions.
Why Real-Time Yield Farming Analytics Matters
Yield farming in DeFi is dynamic. A vault's advertised APY is a point-in-time estimate derived from current reward token prices, emission rates, and underlying pool fees. Each of these inputs changes continuously:
- Reward token price: if the governance token used for incentives drops 20%, the APY denominated in USD falls proportionally—immediately.
- Harvest events: auto-compounding vaults (like Yearn, Beefy, or Convex) trigger harvest transactions periodically. The post-harvest price-per-share is the ground truth for actual realized APY, not the advertised rate.
- Impermanent loss (IL): LP positions in AMM pools accrue IL relative to simply holding the underlying tokens. For volatile pairs, IL can exceed fee revenue in hours.
- Epoch timing: some protocols emit rewards in discrete epochs. The effective APY changes at each epoch boundary as emission rates are adjusted.
- Pool TVL growth: as TVL increases from capital inflows, reward emissions are diluted across more capital, reducing per-dollar APY.
Farmers acting on 24-hour-old APY figures are basing allocation decisions on materially stale information. A streaming analytics platform that reflects the current state (post-harvest APY, current IL, live reward rates) gives users an informational edge.
How Streaming SQL Solves This
RisingWave ingests harvest events, price updates, and LP position snapshots from Kafka and maintains materialized views of vault APY, IL, and reward accrual that update with each new event. The platform serves these metrics over a standard PostgreSQL connection—no custom API layer required.
Building It Step by Step
Step 1: Connect the Data Source
CREATE SOURCE harvest_events (
tx_hash VARCHAR,
vault_address VARCHAR,
strategy_address VARCHAR,
protocol_id VARCHAR, -- 'yearn', 'beefy', 'convex', 'aura'
asset_symbol VARCHAR,
shares_before NUMERIC,
price_per_share_before NUMERIC,
price_per_share_after NUMERIC,
profit_usd NUMERIC,
fee_usd NUMERIC,
reward_tokens VARCHAR[],
block_ts TIMESTAMPTZ
) WITH (
connector = 'kafka',
topic = 'defi.vault.harvests',
properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;
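For reference, a message on the defi.vault.harvests topic would carry one JSON object per harvest whose keys match the source schema above. A minimal sketch (every value here is hypothetical, including the addresses and hash):

```python
import json

# Illustrative harvest event matching the harvest_events schema (all values made up)
harvest_event = {
    "tx_hash": "0x" + "ab" * 32,
    "vault_address": "0x" + "11" * 20,
    "strategy_address": "0x" + "22" * 20,
    "protocol_id": "yearn",
    "asset_symbol": "USDC",
    "shares_before": 1000000.0,
    "price_per_share_before": 1.0421,
    "price_per_share_after": 1.0437,
    "profit_usd": 1600.00,
    "fee_usd": 160.00,
    "reward_tokens": ["CRV", "CVX"],
    "block_ts": "2024-05-01T12:00:00Z",
}

# This is the payload your indexer would produce to Kafka; RisingWave's
# FORMAT PLAIN ENCODE JSON decodes each field into the declared column types.
payload = json.dumps(harvest_event)
print(payload)
```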
CREATE SOURCE lp_positions (
position_id VARCHAR,
user_address VARCHAR,
pool_address VARCHAR,
protocol_id VARCHAR,
token0 VARCHAR,
token1 VARCHAR,
amount0_usd NUMERIC,
amount1_usd NUMERIC,
entry_price_ratio NUMERIC, -- token0/token1 at entry
current_price_ratio NUMERIC,
fees_earned_usd NUMERIC,
il_usd NUMERIC, -- negative = loss vs hold
net_pnl_usd NUMERIC,
block_ts TIMESTAMPTZ
) WITH (
connector = 'kafka',
topic = 'defi.lp.positions',
properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;
CREATE SOURCE reward_emissions (
protocol_id VARCHAR,
pool_address VARCHAR,
reward_token VARCHAR,
emission_rate_per_sec NUMERIC,
reward_token_price_usd NUMERIC,
total_staked_usd NUMERIC,
epoch_start_ts TIMESTAMPTZ,
epoch_end_ts TIMESTAMPTZ,
block_ts TIMESTAMPTZ
) WITH (
connector = 'kafka',
topic = 'defi.reward.emissions',
properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;
Step 2: Build the Core Materialized View
Compute vault performance from harvest events and live reward APY:
CREATE MATERIALIZED VIEW vault_performance AS
SELECT
vault_address,
protocol_id,
asset_symbol,
COUNT(*) AS harvest_count_30d,
SUM(profit_usd) AS total_profit_usd_30d,
AVG(price_per_share_after / NULLIF(price_per_share_before, 0) - 1) * 100
AS avg_harvest_return_pct,
-- Annualized APY from realized harvest returns
(POWER(1 + SUM(profit_usd) / NULLIF(
AVG(shares_before * price_per_share_before), 0), 365.0 / 30) - 1)
* 100 AS realized_apy_30d_pct,
MAX(block_ts) AS last_harvest_ts
FROM harvest_events
WHERE block_ts > NOW() - INTERVAL '30 days'
GROUP BY vault_address, protocol_id, asset_symbol;
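The annualization behind realized_apy_30d_pct can be sanity-checked with a small calculation. The figures below are hypothetical; the formula is the same compounded annualization the view applies:

```python
# Hypothetical 30-day window: $15,000 of harvest profit on an average $1M vault balance
profit_usd_30d = 15_000.0
avg_vault_value_usd = 1_000_000.0

# 1.5% return over 30 days, compounded over 365/30 periods per year
period_return = profit_usd_30d / avg_vault_value_usd
realized_apy_pct = ((1 + period_return) ** (365.0 / 30) - 1) * 100
print(round(realized_apy_pct, 2))
```

Note the parenthesization: the `- 1` must apply to the compounded growth factor before scaling to percent, which is why the SQL wraps the POWER expression before multiplying by 100.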
CREATE MATERIALIZED VIEW live_reward_apy AS
SELECT
pool_address,
protocol_id,
reward_token,
emission_rate_per_sec,
reward_token_price_usd,
total_staked_usd,
-- Annualized reward APY: (emissions per year * price) / TVL
emission_rate_per_sec * 31536000 * reward_token_price_usd
/ NULLIF(total_staked_usd, 0) * 100 AS reward_apy_pct,
epoch_end_ts,
block_ts AS last_updated
FROM reward_emissions
WHERE block_ts > NOW() - INTERVAL '5 minutes';
CREATE MATERIALIZED VIEW il_summary AS
SELECT
pool_address,
protocol_id,
token0,
token1,
COUNT(*) AS position_count,
SUM(fees_earned_usd) AS total_fees_usd,
SUM(il_usd) AS total_il_usd,
SUM(net_pnl_usd) AS total_net_pnl_usd,
AVG(il_usd / NULLIF(amount0_usd + amount1_usd, 0)) * 100
AS avg_il_pct
FROM lp_positions
WHERE block_ts > NOW() - INTERVAL '1 hour'
GROUP BY pool_address, protocol_id, token0, token1;
Step 3: Detection Logic and Alerts
Alert on APY degradation and high IL events:
CREATE MATERIALIZED VIEW yield_alerts AS
SELECT
vp.vault_address,
vp.protocol_id,
vp.asset_symbol,
vp.realized_apy_30d_pct,
ra.reward_apy_pct,
vp.realized_apy_30d_pct + COALESCE(ra.reward_apy_pct, 0) AS total_apy_pct,
CASE
WHEN ra.reward_apy_pct < 0.5 * vp.realized_apy_30d_pct
THEN 'APY_DEGRADED_50PCT'
WHEN ra.epoch_end_ts < NOW() + INTERVAL '24 hours'
THEN 'EPOCH_ENDING_SOON'
ELSE 'NORMAL'
END AS alert_type,
ra.epoch_end_ts
FROM vault_performance vp
LEFT JOIN live_reward_apy ra ON vp.vault_address = ra.pool_address
WHERE ra.reward_apy_pct < 0.5 * vp.realized_apy_30d_pct
OR ra.epoch_end_ts < NOW() + INTERVAL '24 hours';
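The CASE logic above can be expressed as a plain function, which is a convenient way to unit-test the thresholds before encoding them in SQL. This is a sketch mirroring the view's branch order (degradation takes priority over epoch expiry); the inputs are hypothetical:

```python
from datetime import datetime, timedelta, timezone

def classify_alert(reward_apy_pct, realized_apy_30d_pct, epoch_end_ts, now):
    """Mirror of the yield_alerts CASE expression."""
    if reward_apy_pct is not None and reward_apy_pct < 0.5 * realized_apy_30d_pct:
        return "APY_DEGRADED_50PCT"
    if epoch_end_ts is not None and epoch_end_ts < now + timedelta(hours=24):
        return "EPOCH_ENDING_SOON"
    return "NORMAL"

now = datetime(2024, 5, 1, tzinfo=timezone.utc)
# Live reward APY of 4% against a 20% realized APY: below the 50% threshold
print(classify_alert(4.0, 20.0, now + timedelta(days=7), now))
# Healthy APY, but the epoch ends in 6 hours
print(classify_alert(15.0, 20.0, now + timedelta(hours=6), now))
```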
CREATE SINK yield_alert_sink AS
SELECT * FROM yield_alerts
WHERE alert_type != 'NORMAL'
WITH (
connector = 'kafka',
topic = 'alerts.defi.yield',
properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;
Step 4: Querying Live Results
View a live yield farming leaderboard with IL-adjusted returns:
SELECT
il.protocol_id,
il.token0 || '/' || il.token1 AS pair,
ROUND(il.total_fees_usd, 2) AS fees_earned_usd,
ROUND(il.total_il_usd, 2) AS il_usd,
ROUND(il.total_net_pnl_usd, 2) AS net_pnl_usd,
ROUND(il.avg_il_pct, 4) AS avg_il_pct,
ROUND(ra.reward_apy_pct, 2) AS reward_apy_pct,
il.position_count
FROM il_summary il
LEFT JOIN live_reward_apy ra ON il.pool_address = ra.pool_address
ORDER BY il.total_net_pnl_usd DESC
LIMIT 10;
Comparison Table
| Metric | Advertised APY | Batch Analytics | Streaming SQL (RisingWave) |
| --- | --- | --- | --- |
| Reward APY freshness | Seconds (but naive) | Hours | Per emission update |
| Realized APY source | None | Post-harvest batch | Per harvest event |
| IL tracking | None | Daily snapshot | Per position update |
| Epoch-end warnings | None | Manual | Automatic (24h look-ahead) |
| Cross-vault comparison | Manual | Slow query | Live JOIN |
FAQ
How is impermanent loss calculated in this pipeline?
The IL formula for a 50/50 pool is IL = 2 * sqrt(price_ratio) / (1 + price_ratio) - 1, where price_ratio = current_price / entry_price. Rather than recomputing this in RisingWave, the upstream position tracker computes IL per position snapshot and publishes the result to the lp_positions Kafka topic. RisingWave then aggregates IL across positions. This separation keeps complex math in the application layer and aggregation logic in SQL.
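The formula is easy to verify directly. A minimal implementation of the standard constant-product IL for a 50/50 pool:

```python
import math

def impermanent_loss(price_ratio: float) -> float:
    """IL vs. simply holding, for a 50/50 constant-product pool.
    price_ratio = current_price / entry_price of token0 in token1 terms.
    Returns a fraction; negative means the LP underperformed holding."""
    return 2 * math.sqrt(price_ratio) / (1 + price_ratio) - 1

print(impermanent_loss(1.0))               # no price move: IL is zero
print(round(impermanent_loss(4.0), 4))     # 4x price move: about 20% worse than holding
```

Because the function is symmetric in price_ratio and 1/price_ratio, a 4x move in either direction produces the same loss, which is why volatile pairs accrue IL regardless of direction.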
Can this platform track Convex/Aura boosted positions?
Yes. Convex and Aura wrap Curve/Balancer LP tokens and emit CVX/AURA rewards on top of underlying pool fees. Include these protocol-specific events in the harvest_events and reward_emissions sources with protocol_id = 'convex' or 'aura'. The total_apy_pct calculation naturally sums base pool APY and additional reward token APY from all sources.
How do you handle the dilution of reward APY as TVL grows?
The live_reward_apy view uses total_staked_usd in the denominator. When new capital enters the pool and total_staked_usd increases, the reward APY updates automatically—because reward_emissions is a Kafka-backed source and the materialized view recalculates when a new snapshot arrives. The key is ensuring your upstream emission tracker publishes snapshots at the right cadence (ideally per-block for high-TVL pools where APY dilution matters most).
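The dilution mechanic can be sketched with the same formula the view uses. The emission rate, token price, and TVL figures below are hypothetical:

```python
SECONDS_PER_YEAR = 31_536_000  # matches the constant in live_reward_apy

def reward_apy_pct(emission_per_sec, token_price_usd, total_staked_usd):
    # Same formula as the view: (emissions per year * price) / TVL, in percent
    return emission_per_sec * SECONDS_PER_YEAR * token_price_usd / total_staked_usd * 100

# Emission rate and token price held constant while TVL doubles: APY halves
before = reward_apy_pct(0.05, 2.00, 10_000_000)
after = reward_apy_pct(0.05, 2.00, 20_000_000)
print(round(before, 2), round(after, 2))
```

Each new reward_emissions snapshot carries a fresh total_staked_usd, so the materialized view re-evaluates this division on every arrival with no scheduled recomputation.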
Key Takeaways
- Yield farming APY is highly dynamic; advertised rates become misleading within hours as reward token prices, TVL, and harvest timing shift.
- RisingWave materializes vault APY from actual harvest events (realized returns) and live emission rates (forward-looking), giving a complete picture without stale estimates.
- IL tracking requires per-position snapshot data from the upstream indexer; RisingWave's role is to aggregate across positions and flag pools with structurally high IL relative to fee revenue.
- Epoch-end warnings and APY degradation alerts run as standing streaming SQL queries, notifying farmers before capital needs to be reallocated rather than after.

