Smart contract event streaming turns the raw event logs emitted by EVM contracts into queryable, continuously updated analytics — tracking token transfers, protocol state changes, and user interactions as they're confirmed on-chain. With RisingWave, a PostgreSQL-compatible streaming database, you can write SQL materialized views over decoded contract event streams that always reflect the current on-chain state without polling or reprocessing historical data.
Why Smart Contract Event Streaming Matters
Every interaction with an EVM smart contract — a token transfer, a liquidity pool swap, a governance vote, a lending position update — emits one or more event logs. These logs are the authoritative record of everything that happened on-chain, structured as ABI-encoded topics and data fields.
The challenge is making them useful at scale:
- A major DeFi protocol emits millions of events per day across thousands of users
- Raw event logs contain hex-encoded parameters that require ABI decoding before they're meaningful
- Point-in-time queries against an archive node are slow and expensive for aggregations
- Real-time dashboards require continuous processing, not batch reruns
Smart contract event streaming matters for:
- Protocol dashboards: TVL, active users, transaction volume — updated in real time
- Token analytics: Transfer volumes, holder distributions, whale movements
- Governance tracking: Vote tallies and proposal state as they change
- Security monitoring: Unexpected contract calls, ownership transfers, privilege escalations
- Compliance: Interaction tracking for flagged addresses across specific contracts
How Streaming SQL Solves This
A blockchain indexer (a custom event listener, a subgraph-style indexer such as The Graph, or a hosted service like Alchemy Webhooks) decodes ABI-encoded event logs and publishes structured records to Kafka. RisingWave consumes these decoded events and maintains materialized views such as token holder balances, protocol TVL, user activity summaries, and security alerts, all queryable without re-indexing.
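As a rough illustration of the decoding step, the sketch below turns one raw ERC-20 `Transfer` log into a structured record using only the Python standard library. It assumes the log arrives as a dict shaped like an `eth_getLogs` result (`topics`, `data`, `address`, and so on) and that the token's symbol and decimals are already known. The names `decode_transfer_log` and `to_kafka_value`, and the choice to emit amounts as strings, are illustrative assumptions rather than any indexer's API; a production indexer would normally use an ABI library instead of hand-decoding.

```python
import json
from decimal import Decimal, localcontext

# keccak256("Transfer(address,address,uint256)"): topic0 of a standard ERC-20 Transfer log
TRANSFER_TOPIC = "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"


def topic_to_address(topic: str) -> str:
    """An indexed address is left-padded to 32 bytes; keep the last 20 bytes."""
    return "0x" + topic[-40:].lower()


def decode_transfer_log(log: dict, symbol: str, decimals: int) -> dict:
    """Decode one raw Transfer log into a flat record (hypothetical helper)."""
    topics = log["topics"]
    if len(topics) != 3 or topics[0].lower() != TRANSFER_TOPIC:
        raise ValueError("not a standard ERC-20 Transfer log")
    raw_amount = int(log["data"], 16)  # the single non-indexed uint256 value
    with localcontext() as ctx:
        ctx.prec = 80  # a uint256 has at most 78 decimal digits, so this stays exact
        amount = Decimal(raw_amount) / (Decimal(10) ** decimals)
    return {
        "tx_hash": log["transactionHash"],
        "block_number": int(log["blockNumber"], 16),
        "log_index": int(log["logIndex"], 16),
        "contract_address": log["address"].lower(),
        "token_symbol": symbol,
        "token_decimals": decimals,
        "from_address": topic_to_address(topics[1]),
        "to_address": topic_to_address(topics[2]),
        # Strings avoid float precision loss in JSON (assumes the consumer parses numeric strings)
        "raw_amount": str(raw_amount),
        "amount": str(amount),
    }


def to_kafka_value(record: dict) -> bytes:
    """Serialize the record as the JSON payload a Kafka producer would send."""
    return json.dumps(record).encode("utf-8")
```

Fields such as `block_timestamp` and `amount_usd` are left out here because they require a block lookup and a price feed, which are outside the scope of this sketch.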
Building It Step by Step
Step 1: Connect the Data Source
-- Decoded ERC-20 Transfer events
CREATE SOURCE erc20_transfers (
tx_hash VARCHAR,
block_number BIGINT,
log_index INTEGER,
block_timestamp TIMESTAMPTZ,
contract_address VARCHAR, -- token contract
token_symbol VARCHAR,
token_decimals INTEGER,
from_address VARCHAR,
to_address VARCHAR,
raw_amount NUMERIC, -- raw on-chain amount (before decimals)
amount NUMERIC, -- human-readable amount (raw / 10^decimals)
amount_usd NUMERIC,
chain_id INTEGER
) WITH (
connector = 'kafka',
topic = 'blockchain.events.erc20-transfers',
properties.bootstrap.server = 'kafka:9092',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;
-- DeFi protocol events (swaps, deposits, withdrawals, borrows)
CREATE SOURCE protocol_events (
tx_hash VARCHAR,
block_number BIGINT,
log_index INTEGER,
block_timestamp TIMESTAMPTZ,
protocol_name VARCHAR,
contract_address VARCHAR,
event_name VARCHAR, -- 'Swap', 'Deposit', 'Withdraw', 'Borrow'
user_address VARCHAR,
token_in VARCHAR,
token_out VARCHAR,
amount_in_usd NUMERIC,
amount_out_usd NUMERIC,
fee_usd NUMERIC,
indexed_param_1 VARCHAR, -- protocol-specific indexed field
indexed_param_2 VARCHAR,
chain_id INTEGER
) WITH (
connector = 'kafka',
topic = 'blockchain.events.protocol',
properties.bootstrap.server = 'kafka:9092',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;
Step 2: Build the Core Materialized View
-- Token holder balances derived from transfer events
-- (mint = from address 0x000..., burn = to address 0x000...)
CREATE MATERIALIZED VIEW token_holder_balances AS
SELECT
contract_address,
token_symbol,
address,
SUM(amount) AS balance,
SUM(amount_usd) AS balance_usd, -- net USD value at transfer-time prices, not a live valuation
COUNT(*) AS transfer_count,
MAX(block_timestamp) AS last_active
FROM (
-- Inbound transfers (received)
SELECT contract_address, token_symbol, to_address AS address,
amount, amount_usd, block_timestamp
FROM erc20_transfers
UNION ALL
-- Outbound transfers (sent, negative)
SELECT contract_address, token_symbol, from_address AS address,
-amount AS amount, -amount_usd AS amount_usd, block_timestamp
FROM erc20_transfers
) transfers
WHERE address <> '0x0000000000000000000000000000000000000000'
GROUP BY contract_address, token_symbol, address
HAVING SUM(amount) > 0;
-- Protocol activity per fixed 24-hour tumbling window (TUMBLE buckets are non-overlapping, not a rolling 24 hours)
CREATE MATERIALIZED VIEW protocol_activity_24h AS
SELECT
protocol_name,
event_name,
window_start,
window_end,
COUNT(*) AS event_count,
COUNT(DISTINCT user_address) AS unique_users,
SUM(amount_in_usd) AS volume_in_usd,
SUM(fee_usd) AS fees_collected_usd,
AVG(fee_usd) AS avg_fee_usd
FROM TUMBLE(protocol_events, block_timestamp, INTERVAL '24 HOURS')
GROUP BY protocol_name, event_name, window_start, window_end;
-- Token transfer volume per contract per hour
CREATE MATERIALIZED VIEW token_transfer_volume AS
SELECT
contract_address,
token_symbol,
window_start,
window_end,
COUNT(*) AS transfer_count,
SUM(amount) AS total_amount,
SUM(amount_usd) AS total_usd,
COUNT(DISTINCT from_address) AS unique_senders,
COUNT(DISTINCT to_address) AS unique_recipients,
AVG(amount_usd) AS avg_transfer_usd
FROM TUMBLE(erc20_transfers, block_timestamp, INTERVAL '1 HOUR')
GROUP BY contract_address, token_symbol, window_start, window_end;
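To make the bucketing in the two windowed views concrete, here is a small Python sketch of how a tumbling window assigns each event to exactly one fixed, non-overlapping interval. It assumes windows are aligned to the Unix epoch in UTC, which is an assumption about alignment rather than something stated above; `tumble_window` is a hypothetical helper, not a RisingWave function.

```python
from datetime import datetime, timedelta, timezone
from typing import Tuple

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def tumble_window(ts: datetime, size: timedelta) -> Tuple[datetime, datetime]:
    """Return the (window_start, window_end) pair that contains ts.

    Assumes epoch-aligned windows; window_end is exclusive.
    """
    elapsed = ts - EPOCH
    start = EPOCH + (elapsed // size) * size  # floor to a whole multiple of size
    return start, start + size
```

Under this assumption, two events at 23:59 and 00:01 UTC fall into different 24-hour windows even though they are two minutes apart, so the 24-hour view reports totals per fixed bucket rather than over a trailing 24 hours.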
Step 3: Add Alerts and Detection Logic
-- Whale transfer alert: single transfer of $100,000 or more
CREATE MATERIALIZED VIEW whale_transfer_alerts AS
SELECT
tx_hash,
block_number,
block_timestamp,
contract_address,
token_symbol,
from_address,
to_address,
ROUND(amount, 4) AS amount,
ROUND(amount_usd, 2) AS amount_usd
FROM erc20_transfers
WHERE amount_usd >= 100000;
-- Unusual contract interaction alert: flagged contract addresses
CREATE MATERIALIZED VIEW flagged_contract_interactions AS
SELECT
pe.tx_hash,
pe.block_timestamp,
pe.protocol_name,
pe.contract_address,
pe.event_name,
pe.user_address,
pe.amount_in_usd,
fc.flag_reason
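-- Each event is expanded into one row per candidate address so the join is a
-- plain equality (streaming joins generally require an equi-join condition)
FROM (
SELECT *, contract_address AS matched_address FROM protocol_events
UNION ALL
SELECT *, user_address AS matched_address FROM protocol_events
) pe
JOIN (VALUES
('0xabc123...exploit1', 'Known exploit contract'),
('0xdef456...proxy1', 'Unverified proxy'),
('0x789abc...rug1', 'Rug pull pattern address')
) AS fc(address, flag_reason)
ON pe.matched_address = fc.address;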
-- Sink security alerts to Kafka
CREATE SINK contract_security_sink AS
SELECT
tx_hash, block_timestamp, contract_address,
from_address AS subject_address, amount_usd, 'WHALE_TRANSFER' AS alert_type
FROM whale_transfer_alerts
UNION ALL
SELECT
tx_hash, block_timestamp, contract_address,
user_address AS subject_address, amount_in_usd AS amount_usd,
'FLAGGED_CONTRACT' AS alert_type
FROM flagged_contract_interactions
WITH (
connector = 'kafka',
topic = 'blockchain.security.alerts',
properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;
Step 4: Query for Real-Time Insights
-- Top token holders by balance for USDC
SELECT
address,
ROUND(balance, 2) AS balance,
ROUND(balance_usd, 2) AS balance_usd,
transfer_count,
last_active
FROM token_holder_balances
WHERE token_symbol = 'USDC'
ORDER BY balance DESC
LIMIT 20;
-- Protocol activity summary: last 24-hour window
SELECT
protocol_name,
event_name,
event_count,
unique_users,
ROUND(volume_in_usd, 0) AS volume_usd,
ROUND(fees_collected_usd, 2) AS fees_usd
FROM protocol_activity_24h
WHERE window_end = (SELECT MAX(window_end) FROM protocol_activity_24h)
ORDER BY volume_in_usd DESC;
Comparison: Batch vs Streaming
| Aspect | Batch ETL | Streaming SQL (RisingWave) |
| --- | --- | --- |
| Latency | Minutes to hours | Sub-second |
| Balance Updates | End-of-batch | Per-transfer |
| Protocol TVL | Daily snapshots | Continuously maintained |
| Whale Alerts | Delayed detection | Real-time |
| Event Coverage | Re-indexing required | Append-only stream |
| ABI Decoding | Batch job | Indexer pre-processing |
FAQ
Q: How do you handle contract upgrades where the ABI changes?
Upgradeable contracts (e.g., those using the transparent proxy pattern) emit events from the proxy address, but the implementation address changes on upgrade. The recommended approach is to maintain a version mapping in a reference table with the columns (proxy_address, implementation_address, effective_from_block) and join event streams against this table using a temporal join in RisingWave. When a new implementation is deployed, the indexer begins decoding events with the new ABI while historical events retain their original schema.
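On the indexer side, the version mapping can be pictured as a sorted lookup by block number. The sketch below is a minimal Python illustration; `AbiVersionRegistry` and its methods are hypothetical, and a real deployment would load the mapping from the reference table rather than registering entries by hand.

```python
from bisect import bisect_right
from collections import defaultdict


class AbiVersionRegistry:
    """Maps (proxy_address, block_number) to the implementation active at that block."""

    def __init__(self):
        # proxy_address -> parallel lists, both kept sorted by effective_from_block
        self._blocks = defaultdict(list)
        self._impls = defaultdict(list)

    def register(self, proxy, implementation, effective_from_block):
        """Record an upgrade; entries may be registered in any order."""
        key = proxy.lower()
        idx = bisect_right(self._blocks[key], effective_from_block)
        self._blocks[key].insert(idx, effective_from_block)
        self._impls[key].insert(idx, implementation.lower())

    def implementation_at(self, proxy, block_number):
        """Return the implementation in effect at block_number, or None if none applies yet."""
        key = proxy.lower()
        blocks = self._blocks.get(key, [])
        idx = bisect_right(blocks, block_number)
        return self._impls[key][idx - 1] if idx else None
```

The indexer would call `implementation_at` with each log's block number to pick the ABI for decoding, so replayed historical logs still resolve to the implementation that was live when they were emitted.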
Q: Can this track contract state variables, not just events?
Contract storage slots don't emit events automatically — you only see what the contract explicitly emits. For tracking state variables (e.g., the total supply of a token, or the current owner of a contract), you have two options: call the contract's view functions periodically and publish results to Kafka as synthetic events, or use a storage diff indexer that captures state changes at the execution trace level and emits them as events. Both approaches feed into the same RisingWave source pattern.
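The polling option can be sketched as follows. This is a minimal Python illustration that assumes a caller-supplied `read_value` function wrapping the actual view-function call (for example, an `eth_call` made through whatever client library is in use). The function `poll_state_once`, the record fields, and the `StateChanged` event name are all hypothetical. Emitting a record only when the value changes keeps the synthetic stream small.

```python
from typing import Callable, Dict, Optional


def poll_state_once(
    contract_address: str,
    variable: str,
    block_number: int,
    read_value: Callable[[str, str, int], int],
    last_seen: Dict[tuple, int],
) -> Optional[dict]:
    """Read one state variable and return a synthetic event if it changed.

    read_value is an injected stand-in for the real view-function call.
    last_seen is mutated to remember the latest value per (contract, variable).
    """
    key = (contract_address.lower(), variable)
    value = read_value(contract_address, variable, block_number)
    if last_seen.get(key) == value:
        return None  # unchanged since the previous poll: emit nothing
    last_seen[key] = value
    return {
        "event_name": "StateChanged",  # synthetic marker, not an on-chain event
        "contract_address": key[0],
        "variable": variable,
        "value": str(value),  # uint256 values can exceed 64-bit integers
        "block_number": block_number,
    }
```

Each non-`None` result would be serialized and published to a Kafka topic, where it can be declared as a source in the same way as the decoded event streams.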
Q: How do you reconstruct accurate token holder balances when starting mid-stream?
Starting from a mid-stream Kafka offset means you don't have historical transfers, so balance accumulation will be incomplete. The standard approach is to bootstrap from a full historical snapshot: either import all past transfers into RisingWave from a full-history indexer, or seed balances from a checkpoint snapshot and then apply incremental deltas from the stream going forward. In RisingWave, the seeded balances can be loaded into a table with a primary key (upsert semantics), while the delta stream remains an append-only source.
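The snapshot-plus-deltas approach boils down to seeding balances as of a known block and then applying only the transfers that come after it. The sketch below illustrates that logic in Python under simplifying assumptions: the snapshot is a plain dict of balances taken at `snapshot_block`, each transfer carries `block_number`, `from_address`, `to_address`, and `amount`, and the function name `apply_deltas` is hypothetical.

```python
from decimal import Decimal
from typing import Dict, Iterable

ZERO_ADDRESS = "0x" + "0" * 40


def apply_deltas(
    snapshot: Dict[str, Decimal],
    snapshot_block: int,
    transfers: Iterable[dict],
) -> Dict[str, Decimal]:
    """Return balances after applying transfers newer than the snapshot.

    Transfers at or before snapshot_block are skipped because the snapshot
    already includes them; replaying them would double-count.
    """
    balances = dict(snapshot)  # copy so the caller's snapshot is untouched
    for t in transfers:
        if t["block_number"] <= snapshot_block:
            continue
        amount = Decimal(str(t["amount"]))
        sender, recipient = t["from_address"], t["to_address"]
        if sender != ZERO_ADDRESS:  # a mint has no real sender
            balances[sender] = balances.get(sender, Decimal(0)) - amount
        if recipient != ZERO_ADDRESS:  # a burn has no real recipient
            balances[recipient] = balances.get(recipient, Decimal(0)) + amount
    # Drop emptied accounts, mirroring the positive-balance filter in token_holder_balances
    return {addr: bal for addr, bal in balances.items() if bal > 0}
```

The key design point is the block cutoff: the snapshot block and the first replayed Kafka offset must line up, otherwise transfers are either missed or counted twice.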
Key Takeaways
- Blockchain indexers decode ABI-encoded event logs and publish structured records to Kafka for RisingWave to consume
- Token holder balances are maintained as a running sum of inbound and outbound transfer events — no periodic snapshots
- Protocol activity, transfer volume, and user engagement metrics update with every new block
- Whale transfer alerts and flagged address interactions sink to Kafka as soon as the indexer publishes the confirmed event, typically with sub-second processing latency
- The full analytics stack uses standard PostgreSQL-compatible SQL — portable, auditable, and maintainable without specialized streaming expertise
Ready to try this? Get started with RisingWave. Join our Slack community.

