On-Chain Transaction Monitoring with Streaming SQL

On-chain transaction monitoring gives blockchain analysts, security teams, and compliance officers continuous visibility into wallet activity, gas dynamics, and contract interactions — updating in real time as new blocks are confirmed. With RisingWave, a PostgreSQL-compatible streaming database, you can define materialized views in SQL over a stream of blockchain transaction data; the views always reflect the current state of the chain.

Why On-Chain Monitoring Matters

The blockchain is a public ledger, but raw transaction data is noisy and voluminous. Ethereum mainnet produces hundreds of transactions per block, each carrying a transaction hash, sender and recipient addresses, ETH value, gas limit, gas price in gwei, and input data identifying the contract function called.

Meaningful monitoring requires correlating these events in real time:

  • Security teams watch for large, unexpected transfers from known wallets or exchange hot wallets
  • Compliance teams track interactions with flagged addresses and new contract deployments
  • Gas analysts identify periods of network congestion before they affect time-sensitive operations
  • Protocol dashboards show user activity, TVL (total value locked) changes, and function call distributions as they happen

Polling an archive node or waiting for block explorers to index data introduces latency measured in minutes. A streaming approach processes transactions as they land on-chain, maintaining up-to-date aggregations continuously.

How Streaming SQL Solves This

An on-chain indexer (such as a custom node listener or a service like Alchemy's stream API) publishes transactions and receipts to Kafka. RisingWave consumes this stream and maintains materialized views: per-address activity summaries, gas market statistics, large transfer trackers, and contract interaction counts — all queryable over a PostgreSQL-compatible interface without custom application code.

Building It Step by Step

Step 1: Connect the Data Source

CREATE SOURCE blockchain_transactions (
    tx_hash             VARCHAR,
    block_number        BIGINT,
    block_timestamp     TIMESTAMPTZ,
    from_address        VARCHAR,
    to_address          VARCHAR,
    value_eth           NUMERIC,
    gas_limit           BIGINT,
    gas_used            BIGINT,
    gas_price_gwei      NUMERIC,
    tx_fee_eth          NUMERIC,
    input_function_sig  VARCHAR,    -- first 4 bytes of input data
    is_contract_call    BOOLEAN,
    is_contract_create  BOOLEAN,
    status              SMALLINT,   -- 1 = success, 0 = reverted
    chain_id            INTEGER
) WITH (
    connector = 'kafka',
    topic = 'blockchain.ethereum.transactions',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;
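
For reference, a Kafka message matching this schema might look like the following (all values are made up for illustration):

{
  "tx_hash": "0x3f8a...",
  "block_number": 19348210,
  "block_timestamp": "2024-03-01T12:00:11Z",
  "from_address": "0xabc1...",
  "to_address": "0xdef2...",
  "value_eth": 0,
  "gas_limit": 60000,
  "gas_used": 51892,
  "gas_price_gwei": 32.5,
  "tx_fee_eth": 0.00168649,
  "input_function_sig": "0xa9059cbb",
  "is_contract_call": true,
  "is_contract_create": false,
  "status": 1,
  "chain_id": 1
}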

Step 2: Build the Core Materialized Views

-- Per-address activity summary (24-hour windows, hopping hourly)
CREATE MATERIALIZED VIEW address_activity_24h AS
SELECT
    from_address,
    window_start,
    window_end,
    COUNT(*)                                    AS tx_count,
    COUNT(*) FILTER (WHERE is_contract_call)    AS contract_calls,
    COUNT(*) FILTER (WHERE is_contract_create)  AS contracts_deployed,
    SUM(value_eth)                              AS total_eth_sent,
    SUM(tx_fee_eth)                             AS total_fees_paid,
    AVG(gas_price_gwei)                         AS avg_gas_price_gwei,
    COUNT(*) FILTER (WHERE status = 0)          AS failed_tx_count,
    MAX(block_timestamp)                        AS last_active
FROM HOP(blockchain_transactions, block_timestamp,
         INTERVAL '1 HOUR', INTERVAL '24 HOURS')
-- Group by the window bounds as well as the address; without them, each
-- transaction would be counted once per overlapping hop window
GROUP BY from_address, window_start, window_end;
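
The HOP clause emits one row per address per hopping window, so queries should pin a window (see Step 4). If all-time totals are enough, a plain aggregation avoids windows entirely. A minimal sketch; the view name is illustrative:

CREATE MATERIALIZED VIEW address_activity_all_time AS
SELECT
    from_address,
    COUNT(*)             AS tx_count,
    SUM(value_eth)       AS total_eth_sent,
    MAX(block_timestamp) AS last_active
FROM blockchain_transactions
GROUP BY from_address;
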
-- Block-level gas market statistics (per block summary)
CREATE MATERIALIZED VIEW block_gas_stats AS
SELECT
    block_number,
    MIN(block_timestamp)            AS block_time,
    COUNT(*)                        AS tx_count,
    AVG(gas_price_gwei)             AS avg_gas_price_gwei,
    MIN(gas_price_gwei)             AS min_gas_price_gwei,
    MAX(gas_price_gwei)             AS max_gas_price_gwei,
    -- percentile_cont is batch-only in RisingWave; approx_percentile
    -- works in streaming views (the second argument is the relative
    -- error bound)
    approx_percentile(0.5, 0.01)
        WITHIN GROUP (ORDER BY gas_price_gwei)
                                    AS median_gas_price_gwei,
    SUM(gas_used)                   AS total_gas_used,
    SUM(value_eth)                  AS total_eth_transferred,
    COUNT(*) FILTER (WHERE status = 0) AS failed_tx_count
FROM blockchain_transactions
GROUP BY block_number;
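
Per-block stats can be rolled up into coarser time buckets with a tumbling window. A minimal sketch of an hourly gas trend view; the name hourly_gas_trend is illustrative:

CREATE MATERIALIZED VIEW hourly_gas_trend AS
SELECT
    window_start          AS hour_start,
    COUNT(*)              AS tx_count,
    AVG(gas_price_gwei)   AS avg_gas_price_gwei,
    SUM(gas_used)         AS total_gas_used,
    SUM(value_eth)        AS total_eth_transferred
FROM TUMBLE(blockchain_transactions, block_timestamp, INTERVAL '1 HOUR')
GROUP BY window_start;
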
-- Large transfer tracker (successful transfers of 100 ETH or more)
CREATE MATERIALIZED VIEW large_transfers AS
SELECT
    tx_hash,
    block_number,
    block_timestamp,
    from_address,
    to_address,
    value_eth,
    tx_fee_eth,
    gas_price_gwei
FROM blockchain_transactions
WHERE value_eth >= 100.0
  AND status = 1;
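
The 100 ETH cutoff is simple but arbitrary. For a relative notion of "large", RisingWave's group Top-N pattern (ROW_NUMBER in a subquery, filtered in the outer query) can keep, say, the ten largest transfers per hour. A minimal sketch; the view name is illustrative:

CREATE MATERIALIZED VIEW top_transfers_hourly AS
SELECT tx_hash, window_start, from_address, to_address, value_eth
FROM (
    SELECT
        tx_hash, window_start, from_address, to_address, value_eth,
        ROW_NUMBER() OVER (
            PARTITION BY window_start
            ORDER BY value_eth DESC
        ) AS rn
    FROM TUMBLE(blockchain_transactions, block_timestamp, INTERVAL '1 HOUR')
    WHERE status = 1
) ranked
WHERE rn <= 10;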

Step 3: Add Alerts and Detection Logic

-- Alert when a watched address sends or receives any transaction
CREATE MATERIALIZED VIEW watched_address_alerts AS
WITH watchlist(address, label) AS (
    VALUES
        ('0xabc123...watchlist1', 'Exchange Hot Wallet A'),
        ('0xdef456...watchlist2', 'Known Mixer Contract'),
        ('0x789abc...watchlist3', 'Flagged Deployer Address')
)
-- Streaming joins in RisingWave require equality conditions, so the
-- sender and recipient sides are matched separately and combined
SELECT t.tx_hash, t.block_number, t.block_timestamp, t.from_address,
       t.to_address, t.value_eth, t.input_function_sig, w.label
FROM blockchain_transactions t
JOIN watchlist w ON t.from_address = w.address
UNION ALL
SELECT t.tx_hash, t.block_number, t.block_timestamp, t.from_address,
       t.to_address, t.value_eth, t.input_function_sig, w.label
FROM blockchain_transactions t
JOIN watchlist w ON t.to_address = w.address;

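Hard-coding the watchlist means redefining the view whenever it changes. A more maintainable sketch (names are illustrative) keeps the watchlist in a regular RisingWave table, which can be updated with plain INSERT and DELETE statements while the join keeps running:

CREATE TABLE watchlist (
    address VARCHAR PRIMARY KEY,
    label   VARCHAR
);

-- Same two-way equi-join as above, now against the mutable table
CREATE MATERIALIZED VIEW watched_address_alerts_v2 AS
SELECT t.tx_hash, t.block_timestamp, t.from_address, t.to_address,
       t.value_eth, w.label
FROM blockchain_transactions t
JOIN watchlist w ON t.from_address = w.address
UNION ALL
SELECT t.tx_hash, t.block_timestamp, t.from_address, t.to_address,
       t.value_eth, w.label
FROM blockchain_transactions t
JOIN watchlist w ON t.to_address = w.address;

-- Adding an address takes effect immediately, with no view redeploy
INSERT INTO watchlist VALUES ('0x1111...example', 'Newly Flagged Address');
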
-- Alert on gas price spikes (> 3x the 10-minute moving average)
CREATE MATERIALIZED VIEW gas_spike_alerts AS
SELECT
    current_block.block_number,
    current_block.avg_gas_price_gwei    AS current_avg_gwei,
    baseline.avg_gas_price_gwei         AS baseline_avg_gwei,
    current_block.avg_gas_price_gwei
        / NULLIF(baseline.avg_gas_price_gwei, 0) AS spike_ratio
FROM block_gas_stats current_block
JOIN (
    SELECT AVG(avg_gas_price_gwei) AS avg_gas_price_gwei
    FROM block_gas_stats
    WHERE block_time >= NOW() - INTERVAL '10 MINUTES'
) baseline ON TRUE
-- Restrict the comparison to recent blocks; otherwise every historical
-- block would be re-evaluated against the current baseline
WHERE current_block.block_time >= NOW() - INTERVAL '10 MINUTES'
  AND current_block.avg_gas_price_gwei
    > baseline.avg_gas_price_gwei * 3;

-- Sink large transfers and watched address hits to Kafka
CREATE SINK transaction_alerts_sink AS
SELECT
    tx_hash, block_number, block_timestamp,
    from_address, to_address, value_eth, 'LARGE_TRANSFER' AS alert_type
FROM large_transfers
UNION ALL
SELECT
    tx_hash, block_number, block_timestamp,
    from_address, to_address, value_eth, 'WATCHED_ADDRESS' AS alert_type
FROM watched_address_alerts
WITH (
    connector = 'kafka',
    topic = 'blockchain.alerts.transactions',
    properties.bootstrap.server = 'kafka:9092',
    -- PLAIN-encoded Kafka sinks must be append-only; force it since the
    -- upstream views are not declared append-only
    force_append_only = 'true'
) FORMAT PLAIN ENCODE JSON;

Step 4: Query for Real-Time Insights

-- Most active senders in the past 24 hours (latest hop window)
SELECT
    from_address,
    tx_count,
    contract_calls,
    ROUND(total_eth_sent, 4)    AS total_eth_sent,
    ROUND(total_fees_paid, 6)   AS total_fees_paid_eth,
    last_active
FROM address_activity_24h
WHERE window_end = (SELECT MAX(window_end) FROM address_activity_24h)
ORDER BY tx_count DESC
LIMIT 10;

-- Recent gas market trend (last 20 blocks)
SELECT
    block_number,
    block_time,
    tx_count,
    ROUND(avg_gas_price_gwei, 2)    AS avg_gwei,
    ROUND(median_gas_price_gwei, 2) AS median_gwei,
    total_gas_used
FROM block_gas_stats
ORDER BY block_number DESC
LIMIT 20;

Comparison: Batch vs Streaming

Aspect                  Batch ETL                    Streaming SQL (RisingWave)
----------------------  ---------------------------  --------------------------
Latency                 Minutes                      Sub-second
Address Monitoring      Periodic scans               Per-transaction
Gas Analytics           Historical reports           Per-block updates
Large Transfer Alerts   Delayed detection            Real-time
Watchlist Matching      Batch cross-reference        Continuous join
Infrastructure          Archive node + ETL pipeline  Kafka + RisingWave

FAQ

Q: How do you handle chain reorganizations (reorgs)?

Reorgs mean a previously confirmed block gets replaced by a competing chain. The indexer should emit tombstone events for transactions in the orphaned blocks (a retraction event with the same tx_hash). RisingWave can process these as negative events if your source schema includes a retraction flag, allowing materialized views to reverse affected aggregations. For monitoring use cases that tolerate rare inconsistencies, a simpler approach is to only process transactions from blocks with at least 6 confirmations, accepting the latency tradeoff for greater certainty.
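
A sketch of that second approach using RisingWave's dynamic filter, which lets a streaming query compare a column against a scalar subquery (the view name is illustrative):

CREATE MATERIALIZED VIEW confirmed_transactions AS
SELECT *
FROM blockchain_transactions
-- Dynamic filter: only pass transactions buried at least 6 blocks
-- below the highest block seen so far
WHERE block_number <= (
    SELECT MAX(block_number) - 6 FROM blockchain_transactions
);

Downstream views can then read from confirmed_transactions instead of the raw source.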

Q: Can this work across multiple chains (Ethereum, Polygon, Arbitrum, etc.)?

Yes. Include a chain_id field in the source schema (which the example above already does) and route all chains through the same Kafka topic. Materialized views then group by chain_id alongside other dimensions, giving you cross-chain aggregations from a single source definition. Each chain's indexer writes to the same topic with its respective chain_id value.
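
A cross-chain rollup is then just another GROUP BY. A minimal sketch; note that value_eth holds each chain's native asset, so the sum is per-chain native value rather than ETH everywhere:

CREATE MATERIALIZED VIEW per_chain_activity AS
SELECT
    chain_id,
    COUNT(*)                           AS tx_count,
    SUM(value_eth)                     AS total_native_value,
    COUNT(*) FILTER (WHERE status = 0) AS failed_tx_count
FROM blockchain_transactions
GROUP BY chain_id;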

Q: How do you decode input data to identify which contract function was called?

The input_function_sig field contains the first four bytes of the input data — the function selector. Maintain a reference table of known selectors mapped to function signatures (e.g., 0xa9059cbb maps to transfer(address,uint256) for ERC-20). Join this table to the transaction stream in a materialized view to produce human-readable function labels for analytics. This reference table can be stored in RisingWave as a regular table and joined using a temporal join pattern.
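
A minimal sketch of that pattern (table name, view name, and sample rows are illustrative):

CREATE TABLE function_signatures (
    selector  VARCHAR PRIMARY KEY,
    signature VARCHAR
);

INSERT INTO function_signatures VALUES
    ('0xa9059cbb', 'transfer(address,uint256)'),
    ('0x095ea7b3', 'approve(address,uint256)');

-- Process-time temporal join: each transaction is labeled with the
-- signature known at the moment it is processed
CREATE MATERIALIZED VIEW labeled_contract_calls AS
SELECT t.tx_hash, t.block_timestamp, t.to_address, f.signature
FROM blockchain_transactions t
JOIN function_signatures FOR SYSTEM_TIME AS OF PROCTIME() AS f
    ON t.input_function_sig = f.selector
WHERE t.is_contract_call;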

Key Takeaways

  • Blockchain transactions stream from an on-chain indexer into Kafka, feeding a single RisingWave source
  • Address activity, gas market, and large transfer materialized views update with every new block
  • Watchlist matching uses a stream-to-static join — no polling, no scheduled queries
  • Gas spike detection compares per-block averages to a rolling baseline in real time
  • Alert sinks push flagged events to Kafka for integration with security platforms and compliance tools

Ready to try this? Get started with RisingWave. Join our Slack community.
