Smart Contract Event Streaming with RisingWave

Smart contract event streaming turns the raw event logs emitted by EVM contracts into queryable, continuously updated analytics — tracking token transfers, protocol state changes, and user interactions as they're confirmed on-chain. With RisingWave, a PostgreSQL-compatible streaming database, you can write SQL materialized views over decoded contract event streams that always reflect the current on-chain state without polling or reprocessing historical data.

Why Smart Contract Event Streaming Matters

Every interaction with an EVM smart contract — a token transfer, a liquidity pool swap, a governance vote, a lending position update — emits one or more event logs. These logs are the authoritative record of everything that happened on-chain, structured as ABI-encoded topics and data fields.

The challenge is making them useful at scale:

  • A major DeFi protocol emits millions of events per day across thousands of users
  • Raw event logs contain hex-encoded parameters that require ABI decoding before they're meaningful
  • Point-in-time queries against an archive node are slow and expensive for aggregations
  • Real-time dashboards require continuous processing, not batch reruns
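The ABI-decoding step is mechanical once the event signature is known. Here is a minimal sketch for a standard Transfer(address,address,uint256) log, using only the hex layout of topics and data (field names are illustrative; a production indexer would use a full ABI decoder):

```python
# Decode a raw ERC-20 Transfer log into named fields.
# Transfer(address,address,uint256): topics[0] is the event signature hash,
# topics[1]/topics[2] are the indexed from/to addresses (left-padded to 32 bytes),
# and the unindexed amount lives in the data field as a uint256.
def decode_transfer_log(log: dict) -> dict:
    topics = log["topics"]
    return {
        "from_address": "0x" + topics[1][-40:],   # last 20 bytes of the padded topic
        "to_address":   "0x" + topics[2][-40:],
        "raw_amount":   int(log["data"], 16),     # before decimal scaling
    }

raw_log = {
    "topics": [
        # keccak256("Transfer(address,address,uint256)")
        "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef",
        "0x000000000000000000000000a0b86991c6218b36c1d19d4a2e9eb0ce3606eb48",
        "0x0000000000000000000000001111111111111111111111111111111111111111",
    ],
    "data": "0x00000000000000000000000000000000000000000000000000000000000f4240",
}
decoded = decode_transfer_log(raw_log)
```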

Smart contract event streaming matters for:

  • Protocol dashboards: TVL, active users, transaction volume — updated in real time
  • Token analytics: Transfer volumes, holder distributions, whale movements
  • Governance tracking: Vote tallies and proposal state as they change
  • Security monitoring: Unexpected contract calls, ownership transfers, privilege escalations
  • Compliance: Interaction tracking for flagged addresses across specific contracts

How Streaming SQL Solves This

A blockchain indexer (such as a custom event listener, The Graph, or services like Alchemy Webhooks) decodes ABI-encoded event logs and publishes structured records to Kafka. RisingWave consumes these decoded events and maintains materialized views: token holder balances, protocol TVL, user activity summaries, and security alerts — all queryable without re-indexing.
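The indexer's output must match the schema declared in the RisingWave sources below. A sketch of shaping a decoded Transfer into that JSON record — the producer call assumes the kafka-python package and is shown commented out:

```python
import json

# Shape a decoded Transfer event into the JSON record the erc20_transfers
# source expects. Human-readable amount = raw_amount / 10^decimals.
def to_transfer_record(decoded: dict, meta: dict, decimals: int, price_usd: float) -> dict:
    amount = decoded["raw_amount"] / 10 ** decimals
    return {
        "tx_hash": meta["tx_hash"],
        "block_number": meta["block_number"],
        "log_index": meta["log_index"],
        "block_timestamp": meta["block_timestamp"],
        "contract_address": meta["contract_address"],
        "token_symbol": meta["token_symbol"],
        "token_decimals": decimals,
        "from_address": decoded["from_address"],
        "to_address": decoded["to_address"],
        "raw_amount": decoded["raw_amount"],
        "amount": amount,
        "amount_usd": amount * price_usd,
        "chain_id": meta["chain_id"],
    }

record = to_transfer_record(
    {"from_address": "0xabc", "to_address": "0xdef", "raw_amount": 2_500_000},
    {"tx_hash": "0x01", "block_number": 19_000_000, "log_index": 0,
     "block_timestamp": "2024-05-01T00:00:00Z",
     "contract_address": "0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48",
     "token_symbol": "USDC", "chain_id": 1},
    decimals=6, price_usd=1.0,
)
payload = json.dumps(record)
# producer = KafkaProducer(bootstrap_servers="kafka:9092")  # kafka-python, if installed
# producer.send("blockchain.events.erc20-transfers", payload.encode())
```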

Building It Step by Step

Step 1: Connect the Data Source

-- Decoded ERC-20 Transfer events
CREATE SOURCE erc20_transfers (
    tx_hash             VARCHAR,
    block_number        BIGINT,
    log_index           INTEGER,
    block_timestamp     TIMESTAMPTZ,
    contract_address    VARCHAR,      -- token contract
    token_symbol        VARCHAR,
    token_decimals      INTEGER,
    from_address        VARCHAR,
    to_address          VARCHAR,
    raw_amount          NUMERIC,      -- raw on-chain amount (before decimals)
    amount              NUMERIC,      -- human-readable amount (raw / 10^decimals)
    amount_usd          NUMERIC,
    chain_id            INTEGER
) WITH (
    connector = 'kafka',
    topic = 'blockchain.events.erc20-transfers',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;

-- DeFi protocol events (swaps, deposits, withdrawals, borrows)
CREATE SOURCE protocol_events (
    tx_hash             VARCHAR,
    block_number        BIGINT,
    log_index           INTEGER,
    block_timestamp     TIMESTAMPTZ,
    protocol_name       VARCHAR,
    contract_address    VARCHAR,
    event_name          VARCHAR,      -- 'Swap', 'Deposit', 'Withdraw', 'Borrow'
    user_address        VARCHAR,
    token_in            VARCHAR,
    token_out           VARCHAR,
    amount_in_usd       NUMERIC,
    amount_out_usd      NUMERIC,
    fee_usd             NUMERIC,
    indexed_param_1     VARCHAR,      -- protocol-specific indexed field
    indexed_param_2     VARCHAR,
    chain_id            INTEGER
) WITH (
    connector = 'kafka',
    topic = 'blockchain.events.protocol',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;

Step 2: Build the Core Materialized View

-- Token holder balances derived from transfer events
-- (mint = from address 0x000..., burn = to address 0x000...)
CREATE MATERIALIZED VIEW token_holder_balances AS
SELECT
    contract_address,
    token_symbol,
    address,
    SUM(amount)         AS balance,
    SUM(amount_usd)     AS balance_usd,
    COUNT(*)            AS transfer_count,
    MAX(block_timestamp) AS last_active
FROM (
    -- Inbound transfers (received)
    SELECT contract_address, token_symbol, to_address AS address,
           amount, amount_usd, block_timestamp
    FROM erc20_transfers

    UNION ALL

    -- Outbound transfers (sent, negative)
    SELECT contract_address, token_symbol, from_address AS address,
           -amount AS amount, -amount_usd AS amount_usd, block_timestamp
    FROM erc20_transfers
) transfers
WHERE address <> '0x0000000000000000000000000000000000000000'
GROUP BY contract_address, token_symbol, address
HAVING SUM(amount) > 0;
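The view above treats each transfer as a +amount credit to the recipient and a -amount debit from the sender, with the zero address (mints and burns) excluded. The same fold, sketched in Python for intuition:

```python
from collections import defaultdict

ZERO = "0x0000000000000000000000000000000000000000"

# Mirror of token_holder_balances: credit the recipient, debit the sender,
# then drop the zero address so mints/burns don't accumulate a phantom balance.
def fold_balances(transfers):
    balances = defaultdict(float)
    for t in transfers:
        balances[t["to_address"]] += t["amount"]
        balances[t["from_address"]] -= t["amount"]
    return {addr: bal for addr, bal in balances.items()
            if addr != ZERO and bal > 0}

transfers = [
    {"from_address": ZERO,    "to_address": "0xaaa", "amount": 100.0},  # mint
    {"from_address": "0xaaa", "to_address": "0xbbb", "amount": 40.0},
    {"from_address": "0xbbb", "to_address": ZERO,    "amount": 10.0},   # burn
]
balances = fold_balances(transfers)
```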

-- Protocol activity per 24-hour tumbling window
CREATE MATERIALIZED VIEW protocol_activity_24h AS
SELECT
    protocol_name,
    event_name,
    window_start,
    window_end,
    COUNT(*)                        AS event_count,
    COUNT(DISTINCT user_address)    AS unique_users,
    SUM(amount_in_usd)              AS volume_in_usd,
    SUM(fee_usd)                    AS fees_collected_usd,
    AVG(fee_usd)                    AS avg_fee_usd
FROM TUMBLE(protocol_events, block_timestamp, INTERVAL '24 HOURS')
GROUP BY protocol_name, event_name, window_start, window_end;

-- Token transfer volume per contract per hour
CREATE MATERIALIZED VIEW token_transfer_volume AS
SELECT
    contract_address,
    token_symbol,
    window_start,
    window_end,
    COUNT(*)                        AS transfer_count,
    SUM(amount)                     AS total_amount,
    SUM(amount_usd)                 AS total_usd,
    COUNT(DISTINCT from_address)    AS unique_senders,
    COUNT(DISTINCT to_address)      AS unique_recipients,
    AVG(amount_usd)                 AS avg_transfer_usd
FROM TUMBLE(erc20_transfers, block_timestamp, INTERVAL '1 HOUR')
GROUP BY contract_address, token_symbol, window_start, window_end;
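TUMBLE assigns each event to exactly one fixed, non-overlapping bucket based on its block_timestamp. The bucketing rule is just floor division of the epoch timestamp, sketched here:

```python
# Assign an epoch-seconds timestamp to its tumbling window, mirroring
# TUMBLE(..., INTERVAL '1 HOUR'): window_start = floor(ts / size) * size.
def tumble(ts: int, size_s: int = 3600) -> tuple:
    start = ts - ts % size_s
    return start, start + size_s

# Two events 10 minutes apart land in the same hourly bucket...
a = tumble(1_700_000_100)
b = tumble(1_700_000_700)
# ...while an event an hour later lands in the next one.
c = tumble(1_700_003_800)
```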

Step 3: Add Alerts and Detection Logic

-- Whale transfer alert: single transfer > $100,000
CREATE MATERIALIZED VIEW whale_transfer_alerts AS
SELECT
    tx_hash,
    block_number,
    block_timestamp,
    contract_address,
    token_symbol,
    from_address,
    to_address,
    ROUND(amount, 4)        AS amount,
    ROUND(amount_usd, 2)    AS amount_usd
FROM erc20_transfers
WHERE amount_usd >= 100000;

-- Unusual contract interaction alert: flagged contract addresses
CREATE MATERIALIZED VIEW flagged_contract_interactions AS
SELECT
    pe.tx_hash,
    pe.block_timestamp,
    pe.protocol_name,
    pe.contract_address,
    pe.event_name,
    pe.user_address,
    pe.amount_in_usd,
    fc.flag_reason
FROM protocol_events pe
JOIN (VALUES
    ('0xabc123...exploit1', 'Known exploit contract'),
    ('0xdef456...proxy1',   'Unverified proxy'),
    ('0x789abc...rug1',     'Rug pull pattern address')
) AS fc(address, flag_reason)
    ON pe.contract_address = fc.address
    OR pe.user_address     = fc.address;

-- Sink security alerts to Kafka
CREATE SINK contract_security_sink AS
SELECT
    tx_hash, block_timestamp, contract_address,
    from_address AS subject_address, amount_usd, 'WHALE_TRANSFER' AS alert_type
FROM whale_transfer_alerts
UNION ALL
SELECT
    tx_hash, block_timestamp, contract_address,
    user_address AS subject_address, amount_in_usd AS amount_usd,
    'FLAGGED_CONTRACT' AS alert_type
FROM flagged_contract_interactions
WITH (
    connector = 'kafka',
    topic = 'blockchain.security.alerts',
    properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;
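Downstream services consume the alerts topic and route on alert_type. A sketch of routing logic with illustrative thresholds — the consumer wiring assumes the kafka-python package and is commented out:

```python
import json

# Route a security alert from blockchain.security.alerts to a severity
# level based on its type and size. Thresholds are illustrative.
def route_alert(alert: dict) -> str:
    if alert["alert_type"] == "FLAGGED_CONTRACT":
        return "page-oncall"  # flagged-address interactions always page
    if alert["alert_type"] == "WHALE_TRANSFER":
        return "page-oncall" if alert["amount_usd"] >= 1_000_000 else "notify-channel"
    return "log-only"

# consumer = KafkaConsumer("blockchain.security.alerts",
#                          bootstrap_servers="kafka:9092")  # kafka-python
# for msg in consumer:
#     print(route_alert(json.loads(msg.value)))

severity = route_alert({"alert_type": "WHALE_TRANSFER", "amount_usd": 250_000.0})
```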

Step 4: Query for Real-Time Insights

-- Top token holders by balance for USDC
SELECT
    address,
    ROUND(balance, 2)       AS balance,
    ROUND(balance_usd, 2)   AS balance_usd,
    transfer_count,
    last_active
FROM token_holder_balances
WHERE token_symbol = 'USDC'
ORDER BY balance DESC
LIMIT 20;

-- Protocol activity summary: last 24-hour window
SELECT
    protocol_name,
    event_name,
    event_count,
    unique_users,
    ROUND(volume_in_usd, 0)         AS volume_usd,
    ROUND(fees_collected_usd, 2)    AS fees_usd
FROM protocol_activity_24h
WHERE window_end = (SELECT MAX(window_end) FROM protocol_activity_24h)
ORDER BY volume_in_usd DESC;

Comparison: Batch vs Streaming

  Aspect             Batch ETL               Streaming SQL (RisingWave)
  Latency            Minutes to hours        Sub-second
  Balance updates    End-of-batch            Per-transfer
  Protocol TVL       Daily snapshots         Continuously maintained
  Whale alerts       Delayed detection       Real-time
  Event coverage     Re-indexing required    Append-only stream
  ABI decoding       Batch job               Indexer pre-processing

FAQ

Q: How do you handle contract upgrades where the ABI changes?

Upgradeable contracts (e.g., using the transparent proxy pattern) emit events from the proxy address but the implementation address changes on upgrade. The recommended approach is to maintain a version mapping in a reference table — (proxy_address, implementation_address, effective_from_block) — and join event streams against this table using a temporal join in RisingWave. When a new implementation is deployed, the indexer begins decoding events with the new ABI while historical events retain their original schema.
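The version-mapping lookup is a "latest effective_from_block at or below the event's block" search, sketched here with bisect; the table contents are illustrative:

```python
import bisect

# Version mapping for an upgradeable proxy: pick the implementation whose
# effective_from_block is the latest one <= the event's block number.
# Rows must be sorted by effective_from_block; contents are illustrative.
versions = [
    (10_000_000, "impl_v1"),
    (15_000_000, "impl_v2"),
    (18_500_000, "impl_v3"),
]

def implementation_at(block_number: int) -> str:
    blocks = [b for b, _ in versions]
    i = bisect.bisect_right(blocks, block_number) - 1
    if i < 0:
        raise ValueError("block predates first known implementation")
    return versions[i][1]

impl = implementation_at(16_000_000)
```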

Q: Can this track contract state variables, not just events?

Contract storage slots don't emit events automatically — you only see what the contract explicitly emits. For tracking state variables (e.g., the total supply of a token, or the current owner of a contract), you have two options: call the contract's view functions periodically and publish results to Kafka as synthetic events, or use a storage diff indexer that captures state changes at the execution trace level and emits them as events. Both approaches feed into the same RisingWave source pattern.

Q: How do you reconstruct accurate token holder balances when starting mid-stream?

Starting from a mid-stream Kafka offset means you don't have historical transfers, so balance accumulation will be incomplete. The standard approach is to bootstrap from a full historical snapshot: either import all past transfers into RisingWave from a full-history indexer, or seed balances from a checkpoint snapshot and then apply incremental deltas from the stream going forward. RisingWave supports upsert sources for the bootstrap case and append-only sources for the delta stream.
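The snapshot-plus-deltas bootstrap can be sketched as: seed balances from the checkpoint, then apply only transfers strictly after the checkpoint block so nothing is double-counted:

```python
# Bootstrap balances from a checkpoint snapshot, then apply stream deltas.
# Transfers at or before the snapshot block are skipped because the
# snapshot already reflects them.
def bootstrap(snapshot: dict, snapshot_block: int, deltas: list) -> dict:
    balances = dict(snapshot)
    for t in deltas:
        if t["block_number"] <= snapshot_block:
            continue  # already reflected in the snapshot
        balances[t["to_address"]] = balances.get(t["to_address"], 0.0) + t["amount"]
        balances[t["from_address"]] = balances.get(t["from_address"], 0.0) - t["amount"]
    return balances

snapshot = {"0xaaa": 50.0}
deltas = [
    {"block_number": 99,  "from_address": "0xaaa", "to_address": "0xbbb", "amount": 5.0},
    {"block_number": 101, "from_address": "0xaaa", "to_address": "0xbbb", "amount": 20.0},
]
balances = bootstrap(snapshot, snapshot_block=100, deltas=deltas)
```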

Key Takeaways

  • Blockchain indexers decode ABI-encoded event logs and publish structured records to Kafka for RisingWave to consume
  • Token holder balances are maintained as a running sum of inbound and outbound transfer events — no periodic snapshots
  • Protocol activity, transfer volume, and user engagement metrics update with every new block
  • Whale transfer alerts and flagged address interactions sink to Kafka within milliseconds of on-chain confirmation
  • The full analytics stack uses standard PostgreSQL-compatible SQL — portable, auditable, and maintainable without specialized streaming expertise

Ready to try this? Get started with RisingWave. Join our Slack community.
