On-chain transaction monitoring gives blockchain analysts, security teams, and compliance officers continuous visibility into wallet activity, gas dynamics, and contract interactions, updated in real time as new blocks are confirmed. With RisingWave, a PostgreSQL-compatible streaming database, you can define SQL materialized views over a stream of blockchain transaction data and have them stay current as each block arrives.
Why On-Chain Monitoring Matters
The blockchain is a public ledger, but raw transaction data is noisy and voluminous. Ethereum mainnet produces hundreds of transactions per block, each carrying a transaction hash, sender and recipient addresses, ETH value, gas limit, gas price in gwei, and input data identifying the contract function called.
Meaningful monitoring requires correlating these events in real time:
- Security teams watch for large unexpected transfers from known wallets or exchange hot wallets
- Compliance teams track interactions with flagged addresses and contract deployments
- Gas analytics identify periods of network congestion before they affect time-sensitive operations
- Protocol dashboards show user activity, TVL changes, and function call distributions as they happen
Polling an archive node or waiting for block explorers to index data introduces latency measured in minutes. A streaming approach processes transactions as they land on-chain, maintaining up-to-date aggregations continuously.
How Streaming SQL Solves This
An on-chain indexer (such as a custom node listener or a service like Alchemy's stream API) publishes transactions and receipts to Kafka. RisingWave consumes this stream and maintains materialized views: per-address activity summaries, gas market statistics, large transfer trackers, and contract interaction counts — all queryable over a PostgreSQL-compatible interface without custom application code.
Building It Step by Step
Step 1: Connect the Data Source
CREATE SOURCE blockchain_transactions (
tx_hash VARCHAR,
block_number BIGINT,
block_timestamp TIMESTAMPTZ,
from_address VARCHAR,
to_address VARCHAR,
value_eth NUMERIC,
gas_limit BIGINT,
gas_used BIGINT,
gas_price_gwei NUMERIC,
tx_fee_eth NUMERIC,
input_function_sig VARCHAR, -- first 4 bytes of input data
is_contract_call BOOLEAN,
is_contract_create BOOLEAN,
status SMALLINT, -- 1 = success, 0 = reverted
chain_id INTEGER
) WITH (
connector = 'kafka',
topic = 'blockchain.ethereum.transactions',
properties.bootstrap.server = 'kafka:9092',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;
Step 2: Build the Core Materialized View
-- Per-address activity summary (rolling 24 hours)
-- The temporal filter on NOW() keeps only transactions from the last 24 hours;
-- rows that age out of the window are removed from the aggregates.
CREATE MATERIALIZED VIEW address_activity_24h AS
SELECT
from_address,
COUNT(*) AS tx_count,
COUNT(*) FILTER (WHERE is_contract_call) AS contract_calls,
COUNT(*) FILTER (WHERE is_contract_create) AS contracts_deployed,
SUM(value_eth) AS total_eth_sent,
SUM(tx_fee_eth) AS total_fees_paid,
AVG(gas_price_gwei) AS avg_gas_price_gwei,
COUNT(*) FILTER (WHERE status = 0) AS failed_tx_count,
MAX(block_timestamp) AS last_active
FROM blockchain_transactions
WHERE block_timestamp > NOW() - INTERVAL '24 HOURS'
GROUP BY from_address;
-- Block-level gas market statistics (per block summary)
CREATE MATERIALIZED VIEW block_gas_stats AS
SELECT
block_number,
MIN(block_timestamp) AS block_time,
COUNT(*) AS tx_count,
AVG(gas_price_gwei) AS avg_gas_price_gwei,
MIN(gas_price_gwei) AS min_gas_price_gwei,
MAX(gas_price_gwei) AS max_gas_price_gwei,
PERCENTILE_CONT(0.5)
WITHIN GROUP (ORDER BY gas_price_gwei)
AS median_gas_price_gwei,
SUM(gas_used) AS total_gas_used,
SUM(value_eth) AS total_eth_transferred,
COUNT(*) FILTER (WHERE status = 0) AS failed_tx_count
FROM blockchain_transactions
GROUP BY block_number;
-- Large transfer tracker (successful transfers of 100 ETH or more)
CREATE MATERIALIZED VIEW large_transfers AS
SELECT
tx_hash,
block_number,
block_timestamp,
from_address,
to_address,
value_eth,
tx_fee_eth,
gas_price_gwei
FROM blockchain_transactions
WHERE value_eth >= 100.0
AND status = 1;
Step 3: Add Alerts and Detection Logic
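-- Alert when a watched address sends or receives a transaction
-- (each branch is an equi-join against the static watchlist)
CREATE MATERIALIZED VIEW watched_address_alerts AS
WITH watchlist AS (
SELECT address, label
FROM (VALUES
('0xabc123...watchlist1', 'Exchange Hot Wallet A'),
('0xdef456...watchlist2', 'Known Mixer Contract'),
('0x789abc...watchlist3', 'Flagged Deployer Address')
) AS v(address, label)
)
-- Transactions sent by a watched address
SELECT
t.tx_hash, t.block_number, t.block_timestamp, t.from_address,
t.to_address, t.value_eth, t.input_function_sig, w.label
FROM blockchain_transactions t
JOIN watchlist w ON t.from_address = w.address
UNION ALL
-- Transactions received by a watched address (self-transfers are covered above)
SELECT
t.tx_hash, t.block_number, t.block_timestamp, t.from_address,
t.to_address, t.value_eth, t.input_function_sig, w.label
FROM blockchain_transactions t
JOIN watchlist w ON t.to_address = w.address
WHERE t.from_address <> t.to_address;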
-- Alert on gas price spikes (> 3x the 10-minute moving average)
CREATE MATERIALIZED VIEW gas_spike_alerts AS
SELECT
current_block.block_number,
current_block.avg_gas_price_gwei AS current_avg_gwei,
baseline.avg_gas_price_gwei AS baseline_avg_gwei,
current_block.avg_gas_price_gwei
/ NULLIF(baseline.avg_gas_price_gwei, 0) AS spike_ratio
FROM block_gas_stats current_block
JOIN (
SELECT AVG(avg_gas_price_gwei) AS avg_gas_price_gwei
FROM block_gas_stats
WHERE block_time >= NOW() - INTERVAL '10 MINUTES'
) baseline ON TRUE
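WHERE current_block.avg_gas_price_gwei
> baseline.avg_gas_price_gwei * 3
-- Only evaluate recent blocks, so older blocks are not flagged when the baseline drops
AND current_block.block_time >= NOW() - INTERVAL '10 MINUTES';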
-- Sink large transfers and watched address hits to Kafka
CREATE SINK transaction_alerts_sink AS
SELECT
tx_hash, block_number, block_timestamp,
from_address, to_address, value_eth, 'LARGE_TRANSFER' AS alert_type
FROM large_transfers
UNION ALL
SELECT
tx_hash, block_number, block_timestamp,
from_address, to_address, value_eth, 'WATCHED_ADDRESS' AS alert_type
FROM watched_address_alerts
WITH (
connector = 'kafka',
topic = 'blockchain.alerts.transactions',
properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;
Step 4: Query for Real-Time Insights
-- Most active senders in the past 24 hours
SELECT
from_address,
tx_count,
contract_calls,
ROUND(total_eth_sent, 4) AS total_eth_sent,
ROUND(total_fees_paid, 6) AS total_fees_paid_eth,
last_active
FROM address_activity_24h
ORDER BY tx_count DESC
LIMIT 10;
-- Recent gas market trend (last 20 blocks)
SELECT
block_number,
block_time,
tx_count,
ROUND(avg_gas_price_gwei, 2) AS avg_gwei,
ROUND(median_gas_price_gwei, 2) AS median_gwei,
total_gas_used
FROM block_gas_stats
ORDER BY block_number DESC
LIMIT 20;
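The alerting views from Steps 2 and 3 can be read the same way. The two queries below are a sketch that assumes the large_transfers and gas_spike_alerts views exist exactly as defined above; the row limits and rounding are arbitrary choices.

```sql
-- Latest large transfers (100 ETH or more), newest first
SELECT
    block_timestamp,
    tx_hash,
    from_address,
    to_address,
    ROUND(value_eth, 4) AS value_eth
FROM large_transfers
ORDER BY block_timestamp DESC
LIMIT 20;

-- Blocks currently flagged as gas price spikes, largest ratio first
SELECT
    block_number,
    ROUND(current_avg_gwei, 2)  AS current_avg_gwei,
    ROUND(baseline_avg_gwei, 2) AS baseline_avg_gwei,
    ROUND(spike_ratio, 2)       AS spike_ratio
FROM gas_spike_alerts
ORDER BY spike_ratio DESC
LIMIT 10;
```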
Comparison: Batch vs Streaming
| Aspect | Batch ETL | Streaming SQL (RisingWave) |
| --- | --- | --- |
| Latency | Minutes | Sub-second |
| Address Monitoring | Periodic scans | Per-transaction |
| Gas Analytics | Historical reports | Per-block updates |
| Large Transfer Alerts | Delayed detection | Real-time |
| Watchlist Matching | Batch cross-reference | Continuous join |
| Infrastructure | Archive node + ETL pipeline | Kafka + RisingWave |
FAQ
Q: How do you handle chain reorganizations (reorgs)?
A reorg replaces a previously confirmed block with one from a competing chain. To handle it, the indexer should emit a retraction event for every transaction in the orphaned blocks, keyed by the same tx_hash. RisingWave does not infer retractions from an append-only source on its own, so the stream has to be ingested in a form that supports deletes, for example a table with a primary key on tx_hash fed by an upsert or CDC-style format. Materialized views can then reverse the affected aggregations. For monitoring use cases that tolerate rare inconsistencies, a simpler approach is to process only transactions from blocks with at least 6 confirmations, accepting the added latency in exchange for greater certainty.
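The confirmation-depth approach can be sketched in SQL. The view below is an illustration, not a tested recipe: the names confirmed_transactions and chain_head are hypothetical, it assumes the topic carries a single chain, and it treats the highest block_number seen so far as the chain head. It relies on comparing a column against a single-row subquery, which RisingWave documents as a dynamic filter; check that the pattern is supported in your version.

```sql
-- Expose only transactions that are at least 6 blocks behind the newest block seen.
CREATE MATERIALIZED VIEW confirmed_transactions AS
WITH chain_head AS (
    -- Single-row result: the newest block number minus the confirmation depth
    SELECT MAX(block_number) - 6 AS confirmed_height
    FROM blockchain_transactions
)
SELECT t.*
FROM blockchain_transactions t, chain_head
WHERE t.block_number <= chain_head.confirmed_height;
```

Downstream views would then read from confirmed_transactions instead of blockchain_transactions. Transactions become visible only after six further blocks have arrived, which is the latency cost described above.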
Q: Can this work across multiple chains (Ethereum, Polygon, Arbitrum, etc.)?
Yes. The source schema above already includes a chain_id field, so each chain's indexer can write to the same Kafka topic with its own chain_id value. Materialized views must then include chain_id in their grouping keys. Block numbers are only unique within a chain, so the views in Step 2 would need chain_id added before they are used on a multi-chain topic. The result is cross-chain aggregation from a single source definition.
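As a sketch of what grouping by chain_id looks like, the view below is a per-chain variant of block_gas_stats. The name chain_block_gas_stats is hypothetical, and the column subset is trimmed for brevity.

```sql
-- Per-chain, per-block gas statistics; chain_id keeps block numbers
-- from different chains in separate groups.
CREATE MATERIALIZED VIEW chain_block_gas_stats AS
SELECT
    chain_id,
    block_number,
    MIN(block_timestamp)                AS block_time,
    COUNT(*)                            AS tx_count,
    AVG(gas_price_gwei)                 AS avg_gas_price_gwei,
    SUM(gas_used)                       AS total_gas_used,
    COUNT(*) FILTER (WHERE status = 0)  AS failed_tx_count
FROM blockchain_transactions
GROUP BY chain_id, block_number;

-- Latest blocks for one chain (chain ID 1 is Ethereum mainnet)
SELECT block_number, block_time, tx_count, ROUND(avg_gas_price_gwei, 2) AS avg_gwei
FROM chain_block_gas_stats
WHERE chain_id = 1
ORDER BY block_number DESC
LIMIT 20;
```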
Q: How do you decode input data to identify which contract function was called?
The input_function_sig field contains the first four bytes of the input data — the function selector. Maintain a reference table of known selectors mapped to function signatures (e.g., 0xa9059cbb maps to transfer(address,uint256) for ERC-20). Join this table to the transaction stream in a materialized view to produce human-readable function labels for analytics. This reference table can be stored in RisingWave as a regular table and joined using a temporal join pattern.
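A minimal sketch of that pattern follows. The table function_selectors and the view labeled_contract_calls are hypothetical names, and the example assumes the indexer writes input_function_sig as a lowercase hex string with a 0x prefix. The three selectors shown are the standard ERC-20 ones.

```sql
-- Reference table of known function selectors (illustrative schema)
CREATE TABLE function_selectors (
    selector  VARCHAR PRIMARY KEY,  -- e.g. '0xa9059cbb'
    signature VARCHAR
);

INSERT INTO function_selectors VALUES
    ('0xa9059cbb', 'transfer(address,uint256)'),
    ('0x095ea7b3', 'approve(address,uint256)'),
    ('0x23b872dd', 'transferFrom(address,address,uint256)');

-- Temporal join: each transaction is matched against the reference table
-- as it is at processing time. Unknown selectors keep a NULL signature.
CREATE MATERIALIZED VIEW labeled_contract_calls AS
SELECT
    blockchain_transactions.tx_hash,
    blockchain_transactions.block_number,
    blockchain_transactions.to_address,
    blockchain_transactions.input_function_sig,
    function_selectors.signature AS function_signature
FROM blockchain_transactions
LEFT JOIN function_selectors FOR SYSTEM_TIME AS OF PROCTIME()
    ON blockchain_transactions.input_function_sig = function_selectors.selector
WHERE blockchain_transactions.is_contract_call;
```

Because the lookup happens when each transaction is processed, selectors added to the table later label new transactions only; rows already emitted keep the label they had at the time.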
Key Takeaways
- Blockchain transactions stream from an on-chain indexer into Kafka, feeding a single RisingWave source
- Address activity, gas market, and large transfer materialized views update with every new block
- Watchlist matching uses a stream-to-static join — no polling, no scheduled queries
- Gas spike detection compares per-block averages to a rolling baseline in real time
- Alert sinks push flagged events to Kafka for integration with security platforms and compliance tools

