Building a Real-Time Yield Farming Analytics Platform

Yield farming analytics without real-time data is a leaky bucket: APY figures go stale within hours as reward token prices shift, harvest events compound returns, and impermanent loss silently erodes LP positions. Streaming SQL keeps every metric current—from vault strategy performance to epoch reward distributions.

Why Real-Time Yield Farming Analytics Matters

Yield farming in DeFi is dynamic. A vault's advertised APY is a point-in-time estimate derived from current reward token prices, emission rates, and underlying pool fees. Each of these inputs changes continuously:

  • Reward token price: if the governance token used for incentives drops 20%, the APY denominated in USD falls proportionally—immediately.
  • Harvest events: auto-compounding vaults (like Yearn, Beefy, or Convex) trigger harvest transactions periodically. The post-harvest price-per-share is the ground truth for actual realized APY, not the advertised rate.
  • Impermanent loss (IL): LP positions in AMM pools accrue IL relative to simply holding the underlying tokens. For volatile pairs, IL can exceed fee revenue in hours.
  • Epoch timing: some protocols emit rewards in discrete epochs. The effective APY changes at each epoch boundary as emission rates are adjusted.
  • Pool TVL growth: as TVL increases from capital inflows, reward emissions are diluted across more capital, reducing per-dollar APY.

Farmers who act on 24-hour-old APY figures are making allocation decisions on materially wrong information. A streaming analytics platform that reflects the current state—post-harvest APY, current IL, live reward rates—gives users an informational edge.

How Streaming SQL Solves This

RisingWave ingests harvest events, price updates, and LP position snapshots from Kafka and maintains materialized views of vault APY, IL, and reward accrual that update with each new event. The platform serves these metrics over a standard PostgreSQL connection—no custom API layer required.
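
Because the materialized views are exposed over the Postgres wire protocol, any Postgres client, driver, or BI tool can read them with plain SQL. For example, once the vault_performance view from Step 2 exists, a dashboard could poll it directly:

SELECT vault_address, asset_symbol, realized_apy_30d_pct
FROM vault_performance
WHERE protocol_id = 'yearn'
ORDER BY realized_apy_30d_pct DESC;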

Building It Step by Step

Step 1: Connect the Data Source

CREATE SOURCE harvest_events (
    tx_hash             VARCHAR,
    vault_address       VARCHAR,
    strategy_address    VARCHAR,
    protocol_id         VARCHAR,   -- 'yearn', 'beefy', 'convex', 'aura'
    asset_symbol        VARCHAR,
    shares_before       NUMERIC,
    price_per_share_before NUMERIC,
    price_per_share_after  NUMERIC,
    profit_usd          NUMERIC,
    fee_usd             NUMERIC,
    reward_tokens       VARCHAR[],
    block_ts            TIMESTAMPTZ
) WITH (
    connector     = 'kafka',
    topic         = 'defi.vault.harvests',
    properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;
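
For reference, a hypothetical message on the defi.vault.harvests topic matching this schema might look like the following (all values are illustrative):

{
  "tx_hash": "0xabc123...",
  "vault_address": "0xvault...",
  "strategy_address": "0xstrat...",
  "protocol_id": "yearn",
  "asset_symbol": "USDC",
  "shares_before": 1250000,
  "price_per_share_before": 1.0412,
  "price_per_share_after": 1.0431,
  "profit_usd": 2375.50,
  "fee_usd": 237.55,
  "reward_tokens": ["CRV", "CVX"],
  "block_ts": "2024-05-01T12:00:00Z"
}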

CREATE SOURCE lp_positions (
    position_id         VARCHAR,
    user_address        VARCHAR,
    pool_address        VARCHAR,
    protocol_id         VARCHAR,
    token0              VARCHAR,
    token1              VARCHAR,
    amount0_usd         NUMERIC,
    amount1_usd         NUMERIC,
    entry_price_ratio   NUMERIC,   -- token0/token1 at entry
    current_price_ratio NUMERIC,
    fees_earned_usd     NUMERIC,
    il_usd              NUMERIC,   -- negative = loss vs hold
    net_pnl_usd         NUMERIC,
    block_ts            TIMESTAMPTZ
) WITH (
    connector     = 'kafka',
    topic         = 'defi.lp.positions',
    properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;

CREATE SOURCE reward_emissions (
    protocol_id         VARCHAR,
    pool_address        VARCHAR,
    reward_token        VARCHAR,
    emission_rate_per_sec NUMERIC,
    reward_token_price_usd NUMERIC,
    total_staked_usd    NUMERIC,
    epoch_start_ts      TIMESTAMPTZ,
    epoch_end_ts        TIMESTAMPTZ,
    block_ts            TIMESTAMPTZ
) WITH (
    connector     = 'kafka',
    topic         = 'defi.reward.emissions',
    properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;

Step 2: Build the Core Materialized View

Compute vault performance from harvest events and live reward APY:

CREATE MATERIALIZED VIEW vault_performance AS
SELECT
    vault_address,
    protocol_id,
    asset_symbol,
    COUNT(*)                            AS harvest_count_30d,
    SUM(profit_usd)                     AS total_profit_usd_30d,
    AVG(price_per_share_after / NULLIF(price_per_share_before, 0) - 1) * 100
                                        AS avg_harvest_return_pct,
    -- Annualized APY from realized harvest returns
    (POWER(1 + SUM(profit_usd) / NULLIF(
        AVG(shares_before * price_per_share_before), 0), 365.0 / 30) - 1)
    * 100                               AS realized_apy_30d_pct,
    MAX(block_ts)                       AS last_harvest_ts
FROM harvest_events
WHERE block_ts > NOW() - INTERVAL '30 days'
GROUP BY vault_address, protocol_id, asset_symbol;
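
To make the annualization step concrete, here is a worked example with hypothetical numbers: $50,000 of harvest profit on $2M of average vault TVL over 30 days compounds to roughly 35% APY.

-- Worked example: 2.5% realized over 30 days, compounded to a year
SELECT (POWER(1 + 50000.0 / 2000000, 365.0 / 30) - 1) * 100 AS realized_apy_pct;
-- ≈ 35.0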

CREATE MATERIALIZED VIEW live_reward_apy AS
SELECT
    pool_address,
    protocol_id,
    reward_token,
    emission_rate_per_sec,
    reward_token_price_usd,
    total_staked_usd,
    -- Annualized reward APY: (emissions per year * price) / TVL
    emission_rate_per_sec * 31536000 * reward_token_price_usd
    / NULLIF(total_staked_usd, 0) * 100  AS reward_apy_pct,
    epoch_end_ts,
    block_ts AS last_updated
FROM reward_emissions
WHERE block_ts > NOW() - INTERVAL '5 minutes';
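
The formula is easy to sanity-check with hypothetical numbers: 0.5 reward tokens emitted per second at $2.00 each, spread across $10M staked, works out to 315.36% APY.

-- Worked example: emissions per year * price / TVL, as in the view above
SELECT 0.5 * 31536000 * 2.00 / 10000000 * 100 AS reward_apy_pct;
-- = 315.36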

CREATE MATERIALIZED VIEW il_summary AS
SELECT
    pool_address,
    protocol_id,
    token0,
    token1,
    COUNT(*)                            AS position_count,
    SUM(fees_earned_usd)                AS total_fees_usd,
    SUM(il_usd)                         AS total_il_usd,
    SUM(net_pnl_usd)                    AS total_net_pnl_usd,
    AVG(il_usd / NULLIF(amount0_usd + amount1_usd, 0)) * 100
                                        AS avg_il_pct
FROM lp_positions
WHERE block_ts > NOW() - INTERVAL '1 hour'
GROUP BY pool_address, protocol_id, token0, token1;
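
A useful derived query on top of this view, sketched here, surfaces pools where IL currently outweighs fee revenue (recall that il_usd is negative for a loss):

SELECT
    pool_address,
    token0 || '/' || token1        AS pair,
    total_fees_usd,
    total_il_usd,
    total_net_pnl_usd
FROM il_summary
WHERE total_fees_usd + total_il_usd < 0   -- IL exceeds fees earned
ORDER BY total_net_pnl_usd ASC;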

Step 3: Detection Logic and Alerts

Alert on APY degradation and high IL events:

CREATE MATERIALIZED VIEW yield_alerts AS
SELECT
    vp.vault_address,
    vp.protocol_id,
    vp.asset_symbol,
    vp.realized_apy_30d_pct,
    ra.reward_apy_pct,
    vp.realized_apy_30d_pct + COALESCE(ra.reward_apy_pct, 0) AS total_apy_pct,
    CASE
        WHEN ra.reward_apy_pct < 0.5 * vp.realized_apy_30d_pct
            THEN 'APY_DEGRADED_50PCT'
        WHEN ra.epoch_end_ts < NOW() + INTERVAL '24 hours'
            THEN 'EPOCH_ENDING_SOON'
        ELSE 'NORMAL'
    END AS alert_type,
    ra.epoch_end_ts
FROM vault_performance vp
JOIN live_reward_apy ra ON vp.vault_address = ra.pool_address  -- alerts require live reward data
WHERE ra.reward_apy_pct < 0.5 * vp.realized_apy_30d_pct
   OR ra.epoch_end_ts < NOW() + INTERVAL '24 hours';

CREATE SINK yield_alert_sink AS
SELECT * FROM yield_alerts
WHERE alert_type != 'NORMAL'
WITH (
    connector                   = 'kafka',
    topic                       = 'alerts.defi.yield',
    properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;
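
The sink delivers alert rows to Kafka for downstream notifiers, but the same rows remain queryable ad hoc over the Postgres connection:

SELECT vault_address, protocol_id, alert_type, total_apy_pct, epoch_end_ts
FROM yield_alerts
ORDER BY epoch_end_ts;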

Step 4: Querying Live Results

View a live yield farming leaderboard with IL-adjusted returns:

SELECT
    il.protocol_id,
    il.token0 || '/' || il.token1   AS pair,
    ROUND(il.total_fees_usd, 2)     AS fees_earned_usd,
    ROUND(il.total_il_usd, 2)       AS il_usd,
    ROUND(il.total_net_pnl_usd, 2)  AS net_pnl_usd,
    ROUND(il.avg_il_pct, 4)         AS avg_il_pct,
    ROUND(ra.reward_apy_pct, 2)     AS reward_apy_pct,
    il.position_count
FROM il_summary il
LEFT JOIN live_reward_apy ra ON il.pool_address = ra.pool_address
ORDER BY il.total_net_pnl_usd DESC
LIMIT 10;

Comparison Table

Metric                  | Advertised APY       | Batch Analytics     | Streaming SQL (RisingWave)
Reward APY freshness    | Seconds (but naive)  | Hours               | Per emission update
Realized APY source     | None                 | Post-harvest batch  | Per harvest event
IL tracking             | None                 | Daily snapshot      | Per position update
Epoch-end warnings      | None                 | Manual              | Automatic (24h look-ahead)
Cross-vault comparison  | Manual               | Slow query          | Live JOIN

FAQ

How is impermanent loss calculated in this pipeline? The IL formula for a 50/50 pool is IL = 2 * sqrt(price_ratio) / (1 + price_ratio) - 1, where price_ratio = current_price / entry_price. Rather than recomputing this in RisingWave, the upstream position tracker computes IL per position snapshot and publishes the result to the lp_positions Kafka topic. RisingWave then aggregates IL across positions. This separation keeps complex math in the application layer and aggregation logic in SQL.
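
If you did want the math in the database instead, the formula translates directly to SQL. A minimal sketch against the lp_positions schema above (the production pipeline ships il_usd from upstream, so this is illustrative only):

SELECT
    position_id,
    -- IL = 2 * sqrt(r) / (1 + r) - 1, with r = current / entry price ratio
    (2 * sqrt(current_price_ratio / NULLIF(entry_price_ratio, 0))
       / (1 + current_price_ratio / NULLIF(entry_price_ratio, 0)) - 1) * 100 AS il_pct
FROM lp_positions;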

Can this platform track Convex/Aura boosted positions? Yes. Convex and Aura wrap Curve/Balancer LP tokens and emit CVX/AURA rewards on top of underlying pool fees. Include these protocol-specific events in the harvest_events and reward_emissions sources with protocol_id = 'convex' or 'aura'. The total_apy_pct calculation naturally sums base pool APY and additional reward token APY from all sources.
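
Once those events flow in, comparing boosted wrappers against other protocols is a simple filter on the views already defined, for example:

SELECT protocol_id, asset_symbol, realized_apy_30d_pct
FROM vault_performance
WHERE protocol_id IN ('convex', 'aura')
ORDER BY realized_apy_30d_pct DESC;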

How do you handle the dilution of reward APY as TVL grows? The live_reward_apy view uses total_staked_usd in the denominator. When new capital enters the pool and total_staked_usd increases, the reward APY updates automatically—because reward_emissions is a Kafka-backed source and the materialized view recalculates when a new snapshot arrives. The key is ensuring your upstream emission tracker publishes snapshots at the right cadence (ideally per-block for high-TVL pools where APY dilution matters most).
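
The dilution effect itself is pure arithmetic: holding emissions and token price fixed while TVL doubles halves the APY (hypothetical numbers, same formula as the view):

SELECT
    0.5 * 31536000 * 2.00 / 10000000 * 100 AS apy_at_10m_tvl,  -- 315.36
    0.5 * 31536000 * 2.00 / 20000000 * 100 AS apy_at_20m_tvl;  -- 157.68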

Key Takeaways

  • Yield farming APY is highly dynamic; advertised rates become misleading within hours as reward token prices, TVL, and harvest timing shift.
  • RisingWave materializes vault APY from actual harvest events (realized returns) and live emission rates (forward-looking), giving a complete picture without stale estimates.
  • IL tracking requires per-position snapshot data from the upstream indexer; RisingWave's role is to aggregate across positions and flag pools with structurally high IL relative to fee revenue.
  • Epoch-end warnings and APY degradation alerts run as standing streaming SQL queries, notifying farmers before capital needs to be reallocated rather than after.
