Crypto AML failures cost exchanges their operating licenses. The FATF Travel Rule requires Virtual Asset Service Providers (VASPs) to transmit originator and beneficiary information for transfers above $1,000—but detecting structuring, mixer usage, and sanctioned-address flows requires continuous, sub-second transaction scoring that batch systems cannot provide.
Why Crypto AML Matters
The Financial Action Task Force (FATF) Travel Rule has now been adopted by over 50 jurisdictions. Under it, VASPs must collect and transmit counterparty information for transactions above the threshold—but compliance doesn't end at data collection. Regulators expect VASPs to detect:
- Structuring (smurfing): multiple transactions just below the $1,000/$10,000 reporting threshold from the same address within a short window.
- Mixer/tumbler usage: funds routed through known mixing contracts (e.g., Tornado Cash) or chain-hopping patterns across bridges.
- Sanctioned address flows: direct or indirect transactions involving OFAC SDN-listed addresses.
- Unusual velocity: an address that transacted once a week suddenly sending dozens of transfers in an hour.
- Round-trip patterns: funds leaving an address and returning within a short time window through different intermediaries.
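To make the structuring pattern concrete, here is a small, illustrative Python check over an in-memory transaction list. The streaming system described below does this continuously; `detect_structuring` and its default thresholds are names chosen for this sketch, not part of any library.

```python
from datetime import datetime, timedelta

def detect_structuring(txs, threshold=1000.0, min_count=5,
                       window=timedelta(hours=1)):
    """Flag addresses sending many just-below-threshold transfers
    inside a rolling window. txs: iterable of (address, amount_usd, ts)."""
    flagged = set()
    by_addr = {}
    for addr, amount, ts in sorted(txs, key=lambda t: t[2]):
        if amount >= threshold:
            continue  # only sub-threshold transfers count toward structuring
        bucket = by_addr.setdefault(addr, [])
        bucket.append(ts)
        # evict timestamps that have fallen out of the rolling window
        while bucket and ts - bucket[0] > window:
            bucket.pop(0)
        if len(bucket) >= min_count:
            flagged.add(addr)
    return flagged
```

Six $950 transfers from one address within thirty minutes trip the rule; a single sub-threshold transfer, or any number of above-threshold ones, do not.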
Traditional batch-based AML systems score transactions hourly or nightly. By then, a structuring campaign may have completed across dozens of wallets. Real-time scoring—where each new transaction is evaluated against rolling behavioral baselines within milliseconds—closes this window.
How Streaming SQL Solves This
RisingWave is a PostgreSQL-compatible streaming database that reads transaction events from Kafka, maintains rolling behavioral windows per address, and materializes risk scores that update with every new transaction. Downstream systems (case management platforms, alert queues, block/freeze workflows) query these materialized views directly over a standard Postgres wire protocol.
This eliminates the need for a custom stateful stream processing application. AML rules become SQL views that compliance officers can read and audit, lowering the barrier between engineering and compliance teams.
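For example, a case-management service can read the live risk queue with any Postgres client. A sketch, assuming RisingWave's default frontend port (4566), user `root`, and database `dev`—all deployment-specific:

```shell
# Query the live risk-score view with psql; any Postgres driver works the same way.
psql -h risingwave -p 4566 -d dev -U root \
  -c "SELECT from_address, risk_score FROM aml_risk_scores WHERE risk_score >= 70;"
```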
Building It Step by Step
Step 1: Connect the Data Source
```sql
CREATE SOURCE crypto_transactions (
    tx_hash VARCHAR,
    from_address VARCHAR,
    to_address VARCHAR,
    amount_usd NUMERIC,
    asset_symbol VARCHAR,
    chain_id INT,
    block_number BIGINT,
    block_ts TIMESTAMPTZ,
    is_contract BOOLEAN,
    via_bridge BOOLEAN
) WITH (
    connector = 'kafka',
    topic = 'chain.transactions.normalized',
    properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;

CREATE SOURCE sanctioned_addresses (
    address VARCHAR,
    list_name VARCHAR, -- 'OFAC_SDN', 'EU_SANCTIONS', 'UN_SANCTIONS'
    added_at TIMESTAMPTZ,
    reason VARCHAR
) WITH (
    connector = 'kafka',
    topic = 'compliance.sanctions.list',
    properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;
```
Step 2: Build the Core Materialized View
Compute rolling behavioral baselines per address over multiple time windows:
```sql
CREATE MATERIALIZED VIEW address_velocity AS
SELECT
    from_address,
    COUNT(*) AS tx_count_1h,
    SUM(amount_usd) AS volume_usd_1h,
    AVG(amount_usd) AS avg_tx_usd_1h,
    MAX(amount_usd) AS max_tx_usd_1h,
    COUNT(DISTINCT to_address) AS unique_recipients_1h,
    SUM(CASE WHEN amount_usd < 1000 THEN 1 ELSE 0 END) AS sub_threshold_count_1h
FROM crypto_transactions
WHERE block_ts > NOW() - INTERVAL '1 hour'
GROUP BY from_address;

CREATE MATERIALIZED VIEW address_velocity_24h AS
SELECT
    from_address,
    COUNT(*) AS tx_count_24h,
    SUM(amount_usd) AS volume_usd_24h,
    COUNT(DISTINCT to_address) AS unique_recipients_24h
FROM crypto_transactions
WHERE block_ts > NOW() - INTERVAL '24 hours'
GROUP BY from_address;
```
Step 3: Detection Logic and Alerts
Score each address across multiple AML typologies and emit high-risk alerts:
```sql
CREATE MATERIALIZED VIEW aml_risk_scores AS
SELECT
    v.from_address,
    v.tx_count_1h,
    v.volume_usd_1h,
    v.sub_threshold_count_1h,
    v.unique_recipients_1h,
    v24.tx_count_24h,
    v24.volume_usd_24h,
    -- Structuring score: many sub-threshold transactions
    CASE WHEN v.sub_threshold_count_1h >= 5 AND v.volume_usd_1h > 4000 THEN 40 ELSE 0 END
    -- High velocity score
    + CASE WHEN v.tx_count_1h > 20 THEN 30 ELSE 0 END
    -- Fan-out score: many unique recipients
    + CASE WHEN v.unique_recipients_1h > 10 THEN 20 ELSE 0 END
    -- Volume spike: 24h volume is extreme
    + CASE WHEN v24.volume_usd_24h > 100000 THEN 10 ELSE 0 END
    AS risk_score,
    NOW() AS scored_at
FROM address_velocity v
JOIN address_velocity_24h v24 ON v.from_address = v24.from_address;
```
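The round-trip typology from the detection list can be sketched as an interval self-join. This is illustrative only—`round_trip_candidates` is a name introduced here, the single-intermediary pattern is a simplification, and state size for a stream-stream self-join over a busy chain would need tuning:

```sql
-- Sketch: funds that leave an address and return within one hour
-- via a single intermediary (multi-hop paths need graph tooling).
CREATE MATERIALIZED VIEW round_trip_candidates AS
SELECT
    a.from_address,
    a.to_address AS intermediary,
    a.tx_hash    AS outbound_tx,
    b.tx_hash    AS return_tx,
    a.amount_usd AS outbound_usd,
    b.amount_usd AS return_usd
FROM crypto_transactions a
JOIN crypto_transactions b
    ON a.to_address = b.from_address
   AND b.to_address = a.from_address
   AND b.block_ts BETWEEN a.block_ts AND a.block_ts + INTERVAL '1 hour';
```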
```sql
CREATE MATERIALIZED VIEW sanctioned_flows AS
SELECT
    t.tx_hash,
    t.from_address,
    t.to_address,
    t.amount_usd,
    t.asset_symbol,
    t.block_ts,
    s.list_name,
    s.reason,
    'SANCTIONED_ADDRESS' AS alert_type,
    'CRITICAL' AS severity
FROM crypto_transactions t
JOIN sanctioned_addresses s
    ON t.from_address = s.address OR t.to_address = s.address
WHERE t.block_ts > NOW() - INTERVAL '5 minutes';
```
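The mixer-usage typology can follow the same stream-table join pattern. A sketch, assuming a `known_mixer_addresses` Kafka source (not defined above) maintained the same way as the sanctions list:

```sql
-- Sketch: flag transfers touching known mixing contracts.
-- Assumes a known_mixer_addresses source (address VARCHAR, label VARCHAR).
CREATE MATERIALIZED VIEW mixer_flows AS
SELECT
    t.tx_hash,
    t.from_address,
    t.to_address,
    t.amount_usd,
    t.block_ts,
    m.label,
    'MIXER_USAGE' AS alert_type,
    'HIGH' AS severity
FROM crypto_transactions t
JOIN known_mixer_addresses m
    ON t.to_address = m.address OR t.from_address = m.address;
```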
```sql
CREATE SINK aml_alerts_sink AS
SELECT
    from_address,
    risk_score,
    tx_count_1h,
    volume_usd_1h,
    sub_threshold_count_1h,
    scored_at,
    CASE
        WHEN risk_score >= 70 THEN 'HIGH'
        WHEN risk_score >= 40 THEN 'MEDIUM'
        ELSE 'LOW'
    END AS risk_level
FROM aml_risk_scores
WHERE risk_score >= 40
WITH (
    connector = 'kafka',
    topic = 'compliance.aml.alerts',
    properties.bootstrap.server = 'kafka:9092',
    -- the scores view is updatable, so a PLAIN-encoded Kafka sink must be
    -- forced to append-only (updates are emitted as new records)
    force_append_only = 'true'
) FORMAT PLAIN ENCODE JSON;
```
Step 4: Querying Live Results
Pull the current high-risk address queue for compliance review:
```sql
SELECT
    from_address,
    risk_score,
    tx_count_1h,
    ROUND(volume_usd_1h, 2) AS volume_usd_1h,
    sub_threshold_count_1h,
    unique_recipients_1h,
    ROUND(volume_usd_24h, 2) AS volume_usd_24h,
    scored_at
FROM aml_risk_scores
WHERE risk_score >= 40
ORDER BY risk_score DESC, volume_usd_1h DESC
LIMIT 10;
```
Comparison Table
| Capability | Batch AML (nightly) | Real-Time Streaming SQL |
| --- | --- | --- |
| Detection latency | 8–24 hours | < 2 seconds |
| Structuring detection | Post-facto | Within the structuring window |
| Sanctioned address match | Delayed | Immediate |
| Rolling behavioral baseline | Approximate | Exact (sliding window) |
| Auditability of rules | Complex code | Readable SQL |
| FATF Travel Rule latency | Delayed reporting | Inline, per-transaction |
FAQ
Does RisingWave support joining a live transaction stream against a sanctions list that updates in real time?
Yes. Both crypto_transactions and sanctioned_addresses are Kafka-backed sources. When a new address is added to the sanctions list topic, RisingWave's sanctioned_flows materialized view will immediately join subsequent transactions against it. The join is maintained incrementally—no full recompute required.
How do you handle chain reorganizations (reorgs) in Ethereum transaction data?
Reorgs are best handled upstream in the data pipeline: the chain indexer should publish a reorg event to Kafka when it detects a reorganized block, and consumers should process it as a retraction. RisingWave supports upsert sources that can update or delete records by primary key, allowing reorged transactions to be corrected in the materialized views.
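A minimal sketch of such an upsert-backed table, assuming the indexer republishes corrected (or tombstoned) records keyed by `tx_hash` on a hypothetical compacted topic:

```sql
-- Sketch: an upsert table keyed by tx_hash so the indexer can retract
-- or correct reorged transactions by republishing under the same key.
CREATE TABLE crypto_transactions_upsert (
    tx_hash VARCHAR,
    from_address VARCHAR,
    to_address VARCHAR,
    amount_usd NUMERIC,
    block_ts TIMESTAMPTZ,
    PRIMARY KEY (tx_hash)
) WITH (
    connector = 'kafka',
    topic = 'chain.transactions.normalized.upsert',
    properties.bootstrap.server = 'kafka:9092'
) FORMAT UPSERT ENCODE JSON;
```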
What is a good risk score threshold for automatic transaction blocking versus human review?
This depends on your jurisdiction and risk appetite. A common framework: score ≥ 70 triggers an automated hold pending compliance review; score 40–69 flags for manual review within 24 hours; score < 40 is logged but not actioned. Thresholds should be tuned against historical labeled data from your case management system.
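That triage policy is simple enough to state as a reference function. The thresholds mirror the example framework above—`triage` and the action labels are names chosen for this sketch and should be tuned to your own labeled case history:

```python
def triage(risk_score: int) -> str:
    """Map a composite AML risk score to a review action.
    Thresholds (70 / 40) follow the example framework and are
    meant to be calibrated against labeled historical cases."""
    if risk_score >= 70:
        return "AUTO_HOLD"      # automated hold pending compliance review
    if risk_score >= 40:
        return "MANUAL_REVIEW"  # queue for manual review within 24 hours
    return "LOG_ONLY"           # record, but take no action
```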
Key Takeaways
- Real-time AML monitoring closes the window that structuring campaigns exploit in batch-based systems.
- RisingWave maintains rolling behavioral windows (1h, 24h) per address as incrementally updated materialized views—no caching or custom aggregation code needed.
- Sanctioned address matching runs as a continuous stream-table join, catching flows the moment they touch a listed address.
- AML rules expressed in SQL are auditable by compliance officers, not just engineers—reducing friction in regulatory examination.

