Getting blockchain data into a real-time analytics system requires bridging two worlds: the event-driven model of Ethereum RPC nodes and the relational model of SQL analytics. This guide walks through a production-grade pipeline from eth_subscribe to queryable materialized views in RisingWave.
Why Blockchain Data Streaming Is Hard
Ethereum produces roughly one block every 12 seconds. Each block contains hundreds of transactions, each of which may emit dozens of log events. At current mainnet volumes, this means tens of thousands of log events per minute that need to be decoded, normalized, and made queryable.
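That estimate is simple arithmetic; a quick sketch with assumed per-block averages (the 200-transaction and 20-log figures are illustrative, not measured from chain data):

```python
# Back-of-envelope estimate of mainnet log volume.
# Per-block averages are assumptions for illustration only.
SECONDS_PER_BLOCK = 12
TXS_PER_BLOCK = 200   # "hundreds of transactions"
LOGS_PER_TX = 20      # "dozens of log events"

blocks_per_minute = 60 / SECONDS_PER_BLOCK                    # 5 blocks/min
logs_per_minute = blocks_per_minute * TXS_PER_BLOCK * LOGS_PER_TX

print(f"{logs_per_minute:,.0f} log events per minute")        # 20,000
```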
The challenges unique to blockchain data pipelines are:
- ABI decoding: raw log data is hex-encoded and must be decoded against a contract ABI to be meaningful. This happens upstream, in the indexer layer.
- Reorg handling: Ethereum reorganizations (reorgs) can invalidate recently confirmed blocks. A production pipeline must handle retractions—events that were emitted in a block that was later replaced.
- Finality depth: transactions are considered final only after a sufficient number of confirmations (typically 12–64 for Ethereum, 1 for L2s with sequencer finality). Risk-sensitive applications should distinguish between "seen" and "finalized" events.
- Block timestamps: Ethereum block timestamps are set by the block proposer (formerly the miner) and can drift by several seconds. Accurate time-series analysis uses block numbers as the primary ordering key, not wall-clock timestamps.
- Log filter management: subscribing to every event on mainnet is impractical. Production pipelines maintain per-contract, per-event-signature log filters and fan decoded events into per-topic Kafka streams.
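Of these, reorg handling is the easiest to get subtly wrong. A toy sketch of the bookkeeping, assuming the indexer keeps recent block hashes by height (all hashes and heights here are made up; a real indexer rewinds by walking parent hashes against the node):

```python
# Toy reorg handler: track recent block hashes by number. When a new block
# arrives at an already-seen height, every block at or above that height
# is retracted (it was replaced by the new canonical branch).

def apply_block(chain: dict[int, str], number: int,
                block_hash: str, parent_hash: str) -> list[int]:
    """Record a new block; return the block numbers that were retracted."""
    retracted = [n for n in sorted(chain) if n >= number]
    for n in retracted:
        del chain[n]
    # If the stored parent hash doesn't match, the reorg is deeper than
    # this one block and the indexer must rewind further upstream.
    if number - 1 in chain and chain[number - 1] != parent_hash:
        raise RuntimeError("deep reorg: rewind required")
    chain[number] = block_hash
    return retracted

chain = {100: "0xaaa", 101: "0xbbb", 102: "0xccc"}
print(apply_block(chain, 103, "0xddd", "0xccc"))   # [] -- clean extension
print(apply_block(chain, 103, "0xeee", "0xccc"))   # [103] -- block replaced
```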
How Streaming SQL Solves This
Once decoded events land in Kafka, RisingWave ingests them and maintains materialized views that are always up to date. The heavy lifting—ABI decoding, log filtering, reorg handling—happens in the indexer layer (tools like The Graph's Firehose, Goldsky, or a custom Ethers.js subscriber). RisingWave's job is to aggregate, join, and expose these events over a PostgreSQL interface.
This separation of concerns keeps the streaming SQL layer simple: you write queries against clean, decoded events, not raw hex blobs.
Building It Step by Step
Step 1: Connect the Data Source
```sql
-- Decoded ERC-20 Transfer events
CREATE SOURCE erc20_transfers (
    tx_hash          VARCHAR,
    log_index        INT,
    block_number     BIGINT,
    block_ts         TIMESTAMPTZ,
    contract_address VARCHAR,
    token_symbol     VARCHAR,
    from_address     VARCHAR,
    to_address       VARCHAR,
    amount           NUMERIC,
    amount_usd       NUMERIC,
    is_finalized     BOOLEAN,
    reorg_depth      INT  -- 0 = canonical, negative = retracted
) WITH (
    connector = 'kafka',
    topic = 'blockchain.erc20.transfers',
    properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;

-- Block metadata
CREATE SOURCE block_metadata (
    block_number   BIGINT,
    block_hash     VARCHAR,
    parent_hash    VARCHAR,
    timestamp_unix BIGINT,
    block_ts       TIMESTAMPTZ,
    tx_count       INT,
    gas_used       BIGINT,
    base_fee_gwei  NUMERIC,
    is_finalized   BOOLEAN
) WITH (
    connector = 'kafka',
    topic = 'blockchain.blocks',
    properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;
```
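For reference, a message on blockchain.erc20.transfers conforming to the erc20_transfers schema might look like the following (all field values are illustrative):

```json
{
  "tx_hash": "0xabc0000000000000000000000000000000000000000000000000000000000001",
  "log_index": 42,
  "block_number": 19000123,
  "block_ts": "2024-03-01T12:00:11Z",
  "contract_address": "0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48",
  "token_symbol": "USDC",
  "from_address": "0x1111111111111111111111111111111111111111",
  "to_address": "0x2222222222222222222222222222222222222222",
  "amount": 2500.0,
  "amount_usd": 2500.0,
  "is_finalized": true,
  "reorg_depth": 0
}
```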
Step 2: Build the Core Materialized View
Track transfer volume and active addresses per token, using only finalized events:
```sql
CREATE MATERIALIZED VIEW token_transfer_stats AS
SELECT
    token_symbol,
    contract_address,
    COUNT(*) AS transfer_count_1h,
    SUM(amount_usd) AS volume_usd_1h,
    COUNT(DISTINCT from_address) AS unique_senders_1h,
    COUNT(DISTINCT to_address) AS unique_receivers_1h,
    MAX(block_number) AS latest_block,
    MAX(block_ts) AS latest_ts
FROM erc20_transfers
WHERE block_ts > NOW() - INTERVAL '1 hour'
  AND is_finalized = TRUE
  AND reorg_depth = 0
GROUP BY token_symbol, contract_address;
```
A second view tracks per-minute block throughput and gas usage:

```sql
CREATE MATERIALIZED VIEW block_throughput AS
SELECT
    DATE_TRUNC('minute', block_ts) AS minute,
    COUNT(*) AS block_count,
    SUM(tx_count) AS total_txs,
    AVG(gas_used) AS avg_gas_used,
    AVG(base_fee_gwei) AS avg_base_fee_gwei,
    MAX(block_number) AS latest_block
FROM block_metadata
WHERE block_ts > NOW() - INTERVAL '1 hour'
  AND is_finalized = TRUE
GROUP BY 1;
```
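Semantically, token_transfer_stats is just this aggregation maintained continuously. A batch Python equivalent over hand-made sample rows shows what the view computes, including the finality and reorg filters (the sample data is made up):

```python
from collections import defaultdict

# Batch equivalent of the token_transfer_stats view, over in-memory rows.
# In the real pipeline these rows arrive via Kafka and the view updates
# incrementally; here we just fold over a fixed list.
transfers = [
    {"token_symbol": "USDC", "from_address": "0xa1", "to_address": "0xb1",
     "amount_usd": 1_000, "block_number": 500, "is_finalized": True, "reorg_depth": 0},
    {"token_symbol": "USDC", "from_address": "0xa2", "to_address": "0xb1",
     "amount_usd": 2_500, "block_number": 501, "is_finalized": True, "reorg_depth": 0},
    {"token_symbol": "WETH", "from_address": "0xa1", "to_address": "0xb2",
     "amount_usd": 9_000, "block_number": 501, "is_finalized": False, "reorg_depth": 0},
]

stats = defaultdict(lambda: {"transfer_count": 0, "volume_usd": 0,
                             "senders": set(), "receivers": set(),
                             "latest_block": 0})
for t in transfers:
    if not (t["is_finalized"] and t["reorg_depth"] == 0):
        continue  # same predicate as the view's WHERE clause
    s = stats[t["token_symbol"]]
    s["transfer_count"] += 1
    s["volume_usd"] += t["amount_usd"]
    s["senders"].add(t["from_address"])
    s["receivers"].add(t["to_address"])
    s["latest_block"] = max(s["latest_block"], t["block_number"])

print(stats["USDC"]["transfer_count"], stats["USDC"]["volume_usd"])  # 2 3500
```

The unfinalized WETH row never reaches the aggregate, just as it never reaches the materialized view.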
Step 3: Detection Logic and Alerts
Detect anomalous transfer activity and unusual gas spikes:
```sql
CREATE MATERIALIZED VIEW large_transfer_alerts AS
SELECT
    tx_hash,
    block_number,
    block_ts,
    token_symbol,
    from_address,
    to_address,
    amount_usd,
    CASE
        WHEN amount_usd > 10000000 THEN 'WHALE_TRANSFER'
        WHEN amount_usd > 1000000 THEN 'LARGE_TRANSFER'
        ELSE 'NOTABLE_TRANSFER'
    END AS alert_type
FROM erc20_transfers
WHERE block_ts > NOW() - INTERVAL '5 minutes'
  AND is_finalized = TRUE
  AND amount_usd > 500000;
```
```sql
CREATE MATERIALIZED VIEW gas_spike_alerts AS
SELECT
    minute,
    avg_base_fee_gwei,
    total_txs,
    CASE
        WHEN avg_base_fee_gwei > 100 THEN 'EXTREME_GAS'
        WHEN avg_base_fee_gwei > 50 THEN 'HIGH_GAS'
        ELSE 'ELEVATED_GAS'
    END AS severity
FROM block_throughput
WHERE avg_base_fee_gwei > 30;  -- below 30 gwei, no alert row is emitted
```
```sql
CREATE SINK blockchain_alerts_sink AS
SELECT * FROM large_transfer_alerts
WITH (
    connector = 'kafka',
    topic = 'alerts.blockchain.transfers',
    properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON (
    -- the view's sliding time filter retracts old rows; emit appends only
    force_append_only = 'true'
);
```
Step 4: Querying Live Results
View the most active tokens by 1-hour transfer volume:
```sql
SELECT
    token_symbol,
    transfer_count_1h,
    ROUND(volume_usd_1h, 0) AS volume_usd,
    unique_senders_1h,
    unique_receivers_1h,
    latest_block,
    latest_ts
FROM token_transfer_stats
ORDER BY volume_usd_1h DESC
LIMIT 10;
```
Comparison Table
| Approach | Reorg Safety | Latency | Query Interface | Operational Complexity |
|----------|--------------|---------|-----------------|------------------------|
| Direct RPC polling | Poor | 12s+ | Custom API | High |
| The Graph subgraph | Good | 5–30s | GraphQL | Medium |
| Goldsky + RisingWave | Good | < 2s | Standard SQL | Low |
| Custom Firehose + Flink | Excellent | < 1s | Custom | Very High |
| RisingWave (Kafka source) | Good (upstream) | < 1s | PostgreSQL SQL | Low |
FAQ
How does RisingWave handle reorgs if the upstream Kafka topic does not emit retraction events?
If the upstream indexer emits only append-only events without retractions, RisingWave ingests them as-is. The best practice is to handle reorgs upstream: the indexer marks reorged events with a negative reorg_depth value or an is_retracted flag, and downstream RisingWave views filter on reorg_depth = 0 to include only canonical chain data. RisingWave also supports upsert sources, where an updated row with is_retracted = true can correct the materialized view.
What finality depth should be used for different use cases?
For display purposes (dashboards), showing the latest unfinalized data with a visual indicator is acceptable. For risk or financial calculations, use only events marked is_finalized = TRUE. On Ethereum mainnet, that means waiting roughly two epochs, about 64 block confirmations (~13 minutes). On L2s like Arbitrum or Optimism with a trusted sequencer, finality is effectively immediate for operational purposes.
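A minimal finality check by confirmation depth, using the depths suggested above (these are operational choices for this pipeline, not protocol constants, and head - block_number is counted as the confirmation depth):

```python
# Per-chain confirmation depths -- this guide's suggested values,
# not protocol constants.
FINALITY_DEPTH = {"ethereum": 64, "arbitrum": 1, "optimism": 1}

def is_finalized(chain: str, block_number: int, head_number: int) -> bool:
    """A block is treated as final once enough blocks have been built on it."""
    return head_number - block_number >= FINALITY_DEPTH[chain]

print(is_finalized("ethereum", 19_000_000, 19_000_063))  # False: 63 deep
print(is_finalized("ethereum", 19_000_000, 19_000_064))  # True: 64 deep
```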
How do you handle ABI decoding in this pipeline?
ABI decoding happens in the indexer layer before data reaches Kafka—not in RisingWave. Tools like Goldsky, Alchemy Subgraphs, or custom Ethers.js subscribers decode raw log data against contract ABIs and publish structured JSON events to Kafka topics. RisingWave consumes the already-decoded JSON, keeping the SQL layer clean and focused on aggregation.
Key Takeaways
- Blockchain data pipelines require specialized handling for reorgs, finality depth, and block timestamps before the data is fit for SQL analytics.
- The right architecture separates ABI decoding and reorg handling (indexer layer) from aggregation and alerting (RisingWave streaming SQL layer).
- RisingWave's PostgreSQL compatibility allows existing BI tools and dashboards to query blockchain data over standard SQL without custom API development.
- Finality depth filtering in materialized views ensures risk-sensitive calculations use only canonical chain data, even as the latest unconfirmed events flow through the pipeline.

