Blockchain Data Streaming: From Nodes to Real-Time SQL

Blockchain Data Streaming: From Nodes to Real-Time SQL

Getting blockchain data into a real-time analytics system requires bridging two worlds: the event-driven model of Ethereum RPC nodes and the relational model of SQL analytics. This guide walks through a production-grade pipeline from eth_subscribe to queryable materialized views in RisingWave.

Why Blockchain Data Streaming Is Hard

Ethereum produces roughly one block every 12 seconds. Each block contains hundreds of transactions, each of which may emit dozens of log events. At current mainnet volumes, this means tens of thousands of log events per minute that need to be decoded, normalized, and made queryable.

The challenges unique to blockchain data pipelines are:

  • ABI decoding: raw log data is hex-encoded and must be decoded against a contract ABI to be meaningful. This happens upstream, in the indexer layer.
  • Reorg handling: Ethereum reorganizations (reorgs) can invalidate recently confirmed blocks. A production pipeline must handle retractions—events that were emitted in a block that was later replaced.
  • Finality depth: transactions are considered final only after a sufficient number of confirmations (typically 12–64 for Ethereum, 1 for L2s with sequencer finality). Risk-sensitive applications should distinguish between "seen" and "finalized" events.
  • Block timestamps: Ethereum block timestamps are miner-set and can vary by several seconds. Accurate time-series analysis requires using block numbers as the primary ordering key, not wall-clock timestamps.
  • Log filter management: subscribing to every event on mainnet is impractical. Production pipelines maintain per-contract, per-event-signature log filters and fan decoded events into per-topic Kafka streams.

How Streaming SQL Solves This

Once decoded events land in Kafka, RisingWave ingests them and maintains materialized views that are always up to date. The heavy lifting—ABI decoding, log filtering, reorg handling—happens in the indexer layer (tools like The Graph's Firehose, Goldsky, or a custom Ethers.js subscriber). RisingWave's job is to aggregate, join, and expose these events over a PostgreSQL interface.

This separation of concerns keeps the streaming SQL layer simple: you write queries against clean, decoded events, not raw hex blobs.

Building It Step by Step

Step 1: Connect the Data Source

-- Decoded ERC-20 Transfer events
CREATE SOURCE erc20_transfers (
    tx_hash         VARCHAR,
    log_index       INT,
    block_number    BIGINT,
    block_ts        TIMESTAMPTZ,
    contract_address VARCHAR,
    token_symbol    VARCHAR,
    from_address    VARCHAR,
    to_address      VARCHAR,
    amount          NUMERIC,
    amount_usd      NUMERIC,
    is_finalized    BOOLEAN,
    reorg_depth     INT       -- 0 = latest, negative = retracted
) WITH (
    connector     = 'kafka',
    topic         = 'blockchain.erc20.transfers',
    properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;

-- Block metadata
CREATE SOURCE block_metadata (
    block_number    BIGINT,
    block_hash      VARCHAR,
    parent_hash     VARCHAR,
    timestamp_unix  BIGINT,
    block_ts        TIMESTAMPTZ,
    tx_count        INT,
    gas_used        BIGINT,
    base_fee_gwei   NUMERIC,
    is_finalized    BOOLEAN
) WITH (
    connector     = 'kafka',
    topic         = 'blockchain.blocks',
    properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;

Step 2: Build the Core Materialized View

Track transfer volume and active addresses per token, using only finalized events:

CREATE MATERIALIZED VIEW token_transfer_stats AS
SELECT
    token_symbol,
    contract_address,
    COUNT(*)                            AS transfer_count_1h,
    SUM(amount_usd)                     AS volume_usd_1h,
    COUNT(DISTINCT from_address)        AS unique_senders_1h,
    COUNT(DISTINCT to_address)          AS unique_receivers_1h,
    MAX(block_number)                   AS latest_block,
    MAX(block_ts)                       AS latest_ts
FROM erc20_transfers
WHERE block_ts > NOW() - INTERVAL '1 hour'
  AND is_finalized = TRUE
  AND reorg_depth = 0
GROUP BY token_symbol, contract_address;

CREATE MATERIALIZED VIEW block_throughput AS
SELECT
    DATE_TRUNC('minute', block_ts)      AS minute,
    COUNT(*)                            AS block_count,
    SUM(tx_count)                       AS total_txs,
    AVG(gas_used)                       AS avg_gas_used,
    AVG(base_fee_gwei)                  AS avg_base_fee_gwei,
    MAX(block_number)                   AS latest_block
FROM block_metadata
WHERE block_ts > NOW() - INTERVAL '1 hour'
  AND is_finalized = TRUE
GROUP BY 1;

Step 3: Detection Logic and Alerts

Detect anomalous transfer activity and unusual gas spikes:

CREATE MATERIALIZED VIEW large_transfer_alerts AS
SELECT
    tx_hash,
    block_number,
    block_ts,
    token_symbol,
    from_address,
    to_address,
    amount_usd,
    CASE
        WHEN amount_usd > 10000000  THEN 'WHALE_TRANSFER'
        WHEN amount_usd > 1000000   THEN 'LARGE_TRANSFER'
        ELSE                             'NOTABLE_TRANSFER'
    END AS alert_type
FROM erc20_transfers
WHERE block_ts > NOW() - INTERVAL '5 minutes'
  AND is_finalized = TRUE
  AND amount_usd > 500000;

CREATE MATERIALIZED VIEW gas_spike_alerts AS
SELECT
    minute,
    avg_base_fee_gwei,
    total_txs,
    CASE
        WHEN avg_base_fee_gwei > 100 THEN 'EXTREME_GAS'
        WHEN avg_base_fee_gwei > 50  THEN 'HIGH_GAS'
        ELSE                              'ELEVATED_GAS'
    END AS severity
FROM block_throughput
WHERE avg_base_fee_gwei > 50;

CREATE SINK blockchain_alerts_sink AS
SELECT * FROM large_transfer_alerts
WITH (
    connector                   = 'kafka',
    topic                       = 'alerts.blockchain.transfers',
    properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;

Step 4: Querying Live Results

View the most active tokens by 1-hour transfer volume:

SELECT
    token_symbol,
    transfer_count_1h,
    ROUND(volume_usd_1h, 0)     AS volume_usd,
    unique_senders_1h,
    unique_receivers_1h,
    latest_block,
    latest_ts
FROM token_transfer_stats
ORDER BY volume_usd_1h DESC
LIMIT 10;

Comparison Table

ApproachReorg SafetyLatencyQuery InterfaceOperational Complexity
Direct RPC pollingPoor12s+Custom APIHigh
The Graph subgraphGood5–30sGraphQLMedium
Goldsky + RisingWaveGood< 2sStandard SQLLow
Custom Firehose + FlinkExcellent< 1sCustomVery High
RisingWave (Kafka source)Good (upstream)< 1sPostgreSQL SQLLow

FAQ

How does RisingWave handle reorgs if the upstream Kafka topic does not emit retraction events? If the upstream indexer emits only append-only events without retractions, RisingWave will treat them as-is. The best practice is to handle reorgs upstream: the indexer marks reorged events with a negative reorg_depth value or a is_retracted flag, and downstream RisingWave views filter on reorg_depth = 0 to include only canonical chain data. RisingWave also supports upsert sources where an updated row with is_retracted = true can correct the materialized view.

What finality depth should be used for different use cases? For display purposes (dashboards), using the latest un-finalized data with a visual indicator is acceptable. For risk or financial calculations, use only events marked is_finalized = TRUE. For Ethereum mainnet, this means waiting for approximately 64 block confirmations (~13 minutes). On L2s like Arbitrum or Optimism with a trusted sequencer, finality is effectively immediate for operational purposes.

How do you handle ABI decoding in this pipeline? ABI decoding happens in the indexer layer before data reaches Kafka—not in RisingWave. Tools like Goldsky, Alchemy Subgraphs, or custom Ethers.js subscribers decode raw log data against contract ABIs and publish structured JSON events to Kafka topics. RisingWave consumes the already-decoded JSON, keeping the SQL layer clean and focused on aggregation.

Key Takeaways

  • Blockchain data pipelines require specialized handling for reorgs, finality depth, and block timestamps before the data is fit for SQL analytics.
  • The right architecture separates ABI decoding and reorg handling (indexer layer) from aggregation and alerting (RisingWave streaming SQL layer).
  • RisingWave's PostgreSQL compatibility allows existing BI tools and dashboards to query blockchain data over standard SQL without custom API development.
  • Finality depth filtering in materialized views ensures risk-sensitive calculations use only canonical chain data, even as the latest unconfirmed events flow through the pipeline.

Best-in-Class Event Streaming
for Agents, Apps, and Analytics
GitHubXLinkedInSlackYouTube
Sign up for our to stay updated.