Real-Time AML Transaction Monitoring with Streaming SQL
Anti-money laundering monitoring requires watching transaction patterns across multiple time windows simultaneously — the last hour, day, week, and month all matter for different typologies. Sliding window aggregations in SQL replace what would otherwise require complex stateful operators in Flink or custom consumer code, and they stay current automatically as new transactions arrive.
Why AML Is a Streaming Problem, Not a Batch Problem
Traditional AML monitoring runs nightly batch jobs. A transaction that arrives at 9am is reviewed, at best, once the overnight job completes. By then the funds may have moved through several more accounts, and the investigation that any SAR (Suspicious Activity Report) depends on has not yet started.
Regulators increasingly expect real-time or near-real-time monitoring. FinCEN, EU anti-money-laundering rules, and supervisors in other jurisdictions emphasize timely detection and reporting of suspicious activity, and nightly batch processing is becoming harder to defend for high-risk typologies.
The fundamental challenge is that AML patterns are time-bound. Structuring (the deliberate breaking of transactions below reporting thresholds) only becomes visible when you aggregate across multiple transactions over a defined window. Layering through multiple accounts only reveals itself when you can see cross-account flows within a time boundary. These patterns cannot be detected on a single transaction in isolation.
The Flink Complexity Problem
Flink can handle AML workloads. It is genuinely powerful for stateful stream processing over time windows. The problem is the implementation cost.
Written against Flink's DataStream API, a structuring detection operator requires custom state management: you define a state descriptor, a keyed stream, a process function that reads and updates state per key, and a timer that evicts state at window boundaries. If the logic changes (the window widens from 24 hours to 48 hours, or the threshold drops from $10,000 to $9,500), you update Java code, rebuild, redeploy, and deal with state migration.
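In SQL, the same structuring detection is a GROUP BY with a time-window filter in the WHERE clause. Changing the window means recreating the view with INTERVAL '48 hours' in place of INTERVAL '24 hours'. There is no Java build or application redeploy, although the new view can only backfill from the history its source still retains.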
AML rules change frequently as typologies evolve and regulators update guidance. SQL iteration speed is a material operational advantage.
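As a sketch of what a window change looks like in practice, the view below widens the cash aggregation to 48 hours. It assumes the `transactions` source defined later in this article, and the name `cash_aggregates_48h` is illustrative. Creating it alongside the existing 24-hour view lets you compare alert volumes before switching the alert logic over; the new view backfills only as far as the source can replay (Kafka retention and `scan.startup.mode`).

```sql
-- Illustrative 48-hour variant of the daily cash aggregation.
-- It runs alongside the 24-hour view; nothing is rebuilt or redeployed.
CREATE MATERIALIZED VIEW cash_aggregates_48h AS
SELECT
    customer_id,
    account_id,
    SUM(amount) AS total_cash_48h,
    COUNT(*) AS cash_txn_count_48h,
    MAX(amount) AS largest_single_cash_txn
FROM transactions
WHERE event_time > NOW() - INTERVAL '48 hours'   -- window widened from 24 hours
  AND transaction_type IN ('cash_deposit', 'cash_withdrawal')
GROUP BY customer_id, account_id;
```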
Building AML Monitoring with RisingWave
RisingWave is a PostgreSQL-compatible streaming database, open source under Apache 2.0 and written in Rust, that persists its state in object storage such as S3. It maintains materialized views incrementally: as new transactions arrive, only the affected aggregates are updated rather than the entire dataset being recomputed.
Ingest the Transaction Stream
CREATE SOURCE transactions (
transaction_id VARCHAR,
account_id VARCHAR,
customer_id VARCHAR,
transaction_type VARCHAR, -- 'cash_deposit', 'cash_withdrawal', 'wire_in',
-- 'wire_out', 'transfer', 'check'
amount NUMERIC,
currency VARCHAR,
counterparty_account VARCHAR,
counterparty_name VARCHAR,
counterparty_country VARCHAR,
branch_id VARCHAR,
teller_id VARCHAR,
event_time TIMESTAMPTZ
)
WITH (
connector = 'kafka',
topic = 'transactions',
properties.bootstrap.server = 'kafka:9092',
scan.startup.mode = 'earliest' -- replay retained history; with 'latest' the 24h/7d/30d windows start empty
)
FORMAT PLAIN ENCODE JSON;
-- Customer risk tiers (updated from the KYC/onboarding system).
-- A TABLE with a primary key keeps only the latest row per customer;
-- a plain SOURCE would append every update and duplicate joined alerts.
CREATE TABLE customer_risk_tiers (
customer_id VARCHAR PRIMARY KEY,
risk_tier VARCHAR, -- 'LOW', 'MEDIUM', 'HIGH', 'PROHIBITED'
jurisdiction VARCHAR,
pep_flag BOOLEAN, -- Politically Exposed Person
updated_at TIMESTAMPTZ
)
WITH (
connector = 'kafka',
topic = 'customer-risk',
properties.bootstrap.server = 'kafka:9092',
scan.startup.mode = 'earliest' -- load existing customers, not only new updates
)
FORMAT PLAIN ENCODE JSON;
Typology 1: Structuring (Smurfing)
Structuring is the deliberate breaking of cash activity into amounts below the Currency Transaction Report (CTR) threshold to avoid mandatory reporting. In the US, a CTR is required for cash transactions totaling more than $10,000 in a business day, so detection requires seeing the cumulative daily total per customer.
Daily Cash Aggregation (Rolling 24-Hour Window)
CREATE MATERIALIZED VIEW cash_aggregates_24h AS
SELECT
customer_id,
account_id,
SUM(amount) AS total_cash_24h, -- the WHERE clause already restricts rows to cash
SUM(CASE WHEN transaction_type = 'cash_deposit'
THEN amount ELSE 0 END) AS cash_deposits_24h,
SUM(CASE WHEN transaction_type = 'cash_withdrawal'
THEN amount ELSE 0 END) AS cash_withdrawals_24h,
COUNT(*) AS cash_txn_count_24h,
MAX(amount) AS largest_single_cash_txn,
MIN(amount) AS smallest_single_cash_txn,
MAX(event_time) AS last_cash_txn_time
FROM transactions
WHERE event_time > NOW() - INTERVAL '24 hours' -- temporal filter: rows expire after 24h
AND transaction_type IN ('cash_deposit', 'cash_withdrawal')
GROUP BY customer_id, account_id;
Structuring Alert View
CREATE MATERIALIZED VIEW aml_alert_structuring AS
SELECT
c.customer_id,
c.account_id,
c.total_cash_24h,
c.cash_txn_count_24h,
c.largest_single_cash_txn,
-- NOW() is only allowed in temporal filters in a streaming view,
-- so use the latest contributing transaction time instead
c.last_cash_txn_time AS detected_at,
'STRUCTURING' AS typology,
'HIGH' AS alert_severity,
FORMAT(
'Customer accumulated %s in cash transactions over 24h across %s transactions, with no single transaction exceeding the CTR threshold (%s max)',
c.total_cash_24h::TEXT,
c.cash_txn_count_24h::TEXT,
c.largest_single_cash_txn::TEXT
) AS narrative
FROM cash_aggregates_24h c
WHERE c.total_cash_24h >= 9000 -- approaching the threshold
AND c.largest_single_cash_txn < 10000 -- no single transaction triggers a CTR
AND c.cash_txn_count_24h >= 2; -- multiple transactions required
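Before wiring the alert into a live stream, it can help to sanity-check the predicate on a few hand-made rows. The batch query below applies the same three conditions (total of at least 9,000, no single transaction of 10,000 or more, at least two transactions) to inline sample data; the customer IDs and amounts are made up for illustration.

```sql
-- Plain batch query over inline VALUES; no source or view is involved
SELECT
    customer_id,
    SUM(amount) AS total_cash,
    COUNT(*) AS txn_count,
    MAX(amount) AS largest_txn,
    SUM(amount) >= 9000
        AND MAX(amount) < 10000
        AND COUNT(*) >= 2 AS would_alert
FROM (VALUES
    ('C1', 3200.00), ('C1', 2900.00), ('C1', 3400.00),  -- three deposits under the threshold
    ('C2', 9500.00),                                    -- one large deposit
    ('C3', 4000.00), ('C3', 3000.00)                    -- two deposits, low total
) AS sample(customer_id, amount)
GROUP BY customer_id
ORDER BY customer_id;
```

Only C1 is flagged: C2 reaches 9,500 in a single deposit, and C3 stays below the 9,000 floor.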
Typology 2: Velocity Anomaly — Weekly and Monthly Windows
Money laundering often involves unusual spikes in transaction activity relative to a customer's profile. Multiple time windows give different perspectives:
-- 7-day rolling aggregates
CREATE MATERIALIZED VIEW transaction_velocity_7d AS
SELECT
customer_id,
COUNT(*) AS txn_count_7d,
SUM(amount) AS total_volume_7d,
COUNT(DISTINCT counterparty_account) AS unique_counterparties_7d,
COUNT(DISTINCT counterparty_country) AS unique_countries_7d,
SUM(CASE WHEN transaction_type = 'wire_out' THEN amount ELSE 0 END)
AS wire_out_7d,
SUM(CASE WHEN transaction_type = 'wire_in' THEN amount ELSE 0 END)
AS wire_in_7d,
MAX(event_time) AS last_txn_time
FROM transactions
WHERE event_time > NOW() - INTERVAL '7 days'
GROUP BY customer_id;
-- 30-day rolling baseline
CREATE MATERIALIZED VIEW transaction_baseline_30d AS
SELECT
customer_id,
COUNT(*) AS txn_count_30d,
SUM(amount) AS total_volume_30d,
COUNT(DISTINCT counterparty_account) AS counterparties_30d,
COUNT(DISTINCT counterparty_country) AS countries_30d,
-- Normalize to a weekly rate: 30 days is 30/7 (about 4.29) weeks
COUNT(*) * 7.0 / 30 AS avg_weekly_txn_count,
SUM(amount) * 7.0 / 30 AS avg_weekly_volume
FROM transactions
WHERE event_time > NOW() - INTERVAL '30 days'
GROUP BY customer_id;
-- Alert: this week's volume is more than 3x the 30-day weekly average
CREATE MATERIALIZED VIEW aml_alert_velocity_spike AS
SELECT
v.customer_id,
v.txn_count_7d,
v.total_volume_7d,
b.avg_weekly_txn_count,
b.avg_weekly_volume,
v.txn_count_7d / NULLIF(b.avg_weekly_txn_count, 0) AS txn_count_ratio,
v.total_volume_7d / NULLIF(b.avg_weekly_volume, 0) AS volume_ratio,
v.last_txn_time AS detected_at, -- NOW() is only allowed in temporal filters
'VELOCITY_SPIKE' AS typology,
'MEDIUM' AS alert_severity,
FORMAT(
'Weekly transaction volume (%s) is %sx the 30-day weekly average (%s)',
v.total_volume_7d::TEXT,
ROUND(v.total_volume_7d / NULLIF(b.avg_weekly_volume, 0), 1)::TEXT, -- FORMAT supports %s, not %.1f
ROUND(b.avg_weekly_volume, 2)::TEXT
) AS narrative
FROM transaction_velocity_7d v
JOIN transaction_baseline_30d b ON v.customer_id = b.customer_id
WHERE v.total_volume_7d > b.avg_weekly_volume * 3
AND b.avg_weekly_volume > 0;
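Because the 30-day baseline also contains the current seven days, the spike ratio has a ceiling. Write $V_7$ and $V_{30}$ for the two window totals and $k$ for the number of weeks the 30-day total is divided by ($k = 30/7 \approx 4.29$ for an exact normalization, $k = 4$ for a rough one):

$$
r = \frac{V_7}{V_{30}/k} = k\,\frac{V_7}{V_{30}} \le k \quad \text{since } V_7 \le V_{30},
\qquad
r > 3 \iff \frac{V_7}{V_{30}} > \frac{3}{k}.
$$

With $k = 30/7$ the alert fires only when more than 70% of the month's volume fell in the last week (75% with $k = 4$). It also means a customer whose entire history sits inside the last seven days always has $r = k > 3$ and will be flagged, which is worth handling explicitly, for example by requiring a minimum account age.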
Typology 3: High-Risk Jurisdiction Wire Activity
Wires to or from high-risk or sanctioned jurisdictions require immediate attention:
-- Reference table: high-risk countries (maintained separately)
CREATE TABLE high_risk_jurisdictions (
country_code VARCHAR PRIMARY KEY,
risk_level VARCHAR, -- 'HIGH', 'SANCTIONED', 'MONITORED'
fatf_status VARCHAR, -- 'BLACKLIST', 'GREYLIST', 'COMPLIANT'
updated_at TIMESTAMPTZ
);
-- Rolling 7-day wire activity to/from high-risk jurisdictions
CREATE MATERIALIZED VIEW hri_wire_activity_7d AS
SELECT
t.customer_id,
t.counterparty_country,
j.risk_level,
j.fatf_status,
COUNT(*) AS wire_count_7d,
SUM(t.amount) AS wire_total_7d,
MAX(t.event_time) AS last_wire_time
FROM transactions t
JOIN high_risk_jurisdictions j ON t.counterparty_country = j.country_code
WHERE t.event_time > NOW() - INTERVAL '7 days'
AND t.transaction_type IN ('wire_out', 'wire_in')
GROUP BY t.customer_id, t.counterparty_country, j.risk_level, j.fatf_status;
-- Alert on wires involving sanctioned, FATF blacklisted, or greylisted jurisdictions
CREATE MATERIALIZED VIEW aml_alert_sanctions AS
SELECT
h.customer_id,
h.counterparty_country,
h.wire_total_7d,
h.wire_count_7d,
h.risk_level,
h.fatf_status,
h.last_wire_time AS detected_at, -- NOW() is only allowed in temporal filters
'SANCTIONS_EXPOSURE' AS typology,
CASE WHEN h.risk_level = 'SANCTIONED' OR h.fatf_status = 'BLACKLIST' THEN 'CRITICAL'
WHEN h.fatf_status = 'GREYLIST' THEN 'HIGH'
ELSE 'MEDIUM' END AS alert_severity,
FORMAT(
'%s wires totaling %s to/from %s (risk level %s, FATF status %s) in the last 7 days',
h.wire_count_7d::TEXT,
h.wire_total_7d::TEXT,
h.counterparty_country,
h.risk_level,
h.fatf_status
) AS narrative
FROM hri_wire_activity_7d h
WHERE h.risk_level = 'SANCTIONED'
OR h.fatf_status IN ('BLACKLIST', 'GREYLIST');
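Since `high_risk_jurisdictions` is a regular table, its reference data can be maintained with ordinary DML, and the join in `hri_wire_activity_7d` re-evaluates the affected rows when it changes. A sketch, using the ISO 3166 user-assigned codes `XA`, `XB`, and `XC` as placeholders rather than real countries; in practice these rows would come from your sanctions and FATF list feeds, and `counterparty_country` is assumed to use the same two-letter codes.

```sql
-- Placeholder codes only; load real values from your list feeds
INSERT INTO high_risk_jurisdictions VALUES
    ('XA', 'SANCTIONED', 'COMPLIANT', NOW()),
    ('XB', 'HIGH',       'BLACKLIST', NOW()),
    ('XC', 'MONITORED',  'GREYLIST',  NOW());

-- Reclassify a country: the dependent views update incrementally,
-- without being dropped or recreated
UPDATE high_risk_jurisdictions
SET fatf_status = 'BLACKLIST', updated_at = NOW()
WHERE country_code = 'XC';
```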
Typology 4: Round-Trip Funds (Layering Detection)
Layering involves moving money through multiple accounts to obscure its origin. A simple signature is money leaving an account and returning within a short window:
CREATE MATERIALIZED VIEW aml_alert_round_trip AS
SELECT
out_txn.customer_id,
out_txn.account_id,
out_txn.transaction_id AS outflow_txn_id,
in_txn.transaction_id AS inflow_txn_id,
out_txn.amount AS outflow_amount,
in_txn.amount AS inflow_amount,
out_txn.counterparty_account AS via_account,
out_txn.event_time AS outflow_time,
in_txn.event_time AS inflow_time,
'ROUND_TRIP' AS typology,
'HIGH' AS alert_severity,
FORMAT(
'Outflow of %s to account %s followed by near-equal inflow of %s within 24h',
out_txn.amount::TEXT,
out_txn.counterparty_account,
in_txn.amount::TEXT
) AS narrative
-- Temporal filters bound the self-join state; matched pairs age out of the view after 48h.
-- 'transfer' rows carry no direction in this schema, so they can match on either side.
FROM (
SELECT * FROM transactions
WHERE event_time > NOW() - INTERVAL '48 hours'
AND transaction_type IN ('wire_out', 'transfer')
AND amount > 5000
) out_txn
JOIN (
SELECT * FROM transactions
WHERE event_time > NOW() - INTERVAL '48 hours'
AND transaction_type IN ('wire_in', 'transfer')
) in_txn
ON out_txn.account_id = in_txn.account_id
AND out_txn.customer_id = in_txn.customer_id
AND in_txn.amount BETWEEN out_txn.amount * 0.85 AND out_txn.amount * 1.15
AND in_txn.event_time > out_txn.event_time
AND in_txn.event_time < out_txn.event_time + INTERVAL '24 hours';
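The ±15% amount band and the 24-hour window are the two tuning knobs in this view. The batch query below judges three candidate inflows against a single 20,000 outflow, for which the band is 17,000 to 23,000; the IDs, amounts, and timestamps are made up for illustration.

```sql
-- Plain batch query over inline VALUES, mirroring the join conditions above
SELECT
    i.txn_id AS inflow,
    i.amount AS inflow_amount,
    i.amount BETWEEN o.amount * 0.85 AND o.amount * 1.15 AS within_band,
    i.ts > o.ts AND i.ts < o.ts + INTERVAL '24 hours' AS within_window
FROM (VALUES
    ('O1', 20000.00, '2024-01-01 10:00:00+00'::TIMESTAMPTZ)
) AS o(txn_id, amount, ts)
CROSS JOIN (VALUES
    ('I1', 18500.00, '2024-01-01 15:00:00+00'::TIMESTAMPTZ),  -- 5h later
    ('I2', 16000.00, '2024-01-01 16:00:00+00'::TIMESTAMPTZ),  -- 6h later
    ('I3', 19800.00, '2024-01-02 12:00:00+00'::TIMESTAMPTZ)   -- 26h later
) AS i(txn_id, amount, ts)
ORDER BY i.txn_id;
```

Only I1 satisfies both conditions: I2 falls outside the amount band, and I3 arrives 26 hours after the outflow.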
Enriching Alerts with Customer Risk Tier
AML alerts should be prioritized by customer risk. A structuring alert on a PEP or high-risk-tier customer is more urgent than the same pattern on a standard retail customer:
CREATE MATERIALIZED VIEW aml_alerts_enriched AS
SELECT
a.customer_id,
a.typology,
a.alert_severity,
a.narrative,
a.detected_at,
r.risk_tier,
r.pep_flag,
r.jurisdiction,
-- Escalate severity if customer is high-risk or PEP
CASE
WHEN r.pep_flag = TRUE AND a.alert_severity != 'CRITICAL' THEN 'CRITICAL'
WHEN r.risk_tier = 'HIGH' AND a.alert_severity = 'MEDIUM' THEN 'HIGH'
ELSE a.alert_severity
END AS effective_severity
FROM (
SELECT customer_id, account_id, typology, alert_severity, narrative, detected_at
FROM aml_alert_structuring
UNION ALL
SELECT customer_id, NULL::VARCHAR, typology, alert_severity, narrative, detected_at
FROM aml_alert_velocity_spike
UNION ALL
-- NOW() is not allowed in a streaming SELECT list; use each view's event-time column
SELECT customer_id, NULL::VARCHAR, typology, alert_severity, narrative, detected_at
FROM aml_alert_sanctions
UNION ALL
SELECT customer_id, account_id, typology, alert_severity, narrative, inflow_time
FROM aml_alert_round_trip
) a
LEFT JOIN customer_risk_tiers r ON a.customer_id = r.customer_id;
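Because the enriched view is an ordinary queryable relation, an analyst or dashboard can also read it directly with a batch query. A sketch of a triage queue that puts CRITICAL ahead of HIGH and shows the newest alerts first (the 50-row limit is arbitrary):

```sql
-- Ad-hoc batch query over the continuously maintained view
SELECT
    customer_id,
    typology,
    effective_severity,
    risk_tier,
    pep_flag,
    detected_at,
    narrative
FROM aml_alerts_enriched
WHERE effective_severity IN ('CRITICAL', 'HIGH')
ORDER BY
    CASE effective_severity WHEN 'CRITICAL' THEN 0 ELSE 1 END,
    detected_at DESC
LIMIT 50;
```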
Routing Alerts to Case Management
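CREATE SINK aml_alerts_sink AS
SELECT * FROM aml_alerts_enriched
WHERE effective_severity IN ('HIGH', 'CRITICAL')
WITH (
connector = 'kafka',
topic = 'aml-alerts',
properties.bootstrap.server = 'kafka:9092'
)
-- The view updates and retracts rows as windows slide, so it is not append-only.
-- force_append_only emits inserts and updates as plain messages and drops deletes.
FORMAT PLAIN ENCODE JSON (
force_append_only = 'true'
);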
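Your compliance case management system (Actimize, Quantexa, or a custom tool) consumes this topic and creates investigation cases. Because the pipeline runs continuously, HIGH and CRITICAL alerts can reach case management within seconds of the triggering transaction rather than the next morning. Aggregate-based alerts are re-emitted whenever their totals change, so the consumer should deduplicate, for example on customer and typology.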
AML Monitoring Architecture Comparison
| Approach | Rule Update Latency | Alert Latency | Window Flexibility | Operational Cost |
|---|---|---|---|---|
| Nightly batch (SQL/Spark) | Days (deploy cycle) | Up to 24h | Limited by job schedule | Medium |
| Apache Flink (custom operators) | Hours (build + deploy) | Seconds | High (custom state) | High |
| CEP engine (Drools, Esper) | Hours | Seconds | Medium | High |
| Streaming database (RisingWave) | Minutes (recreate view, no build) | Seconds | High (SQL windows) | Low |
FAQ
Does this replace a dedicated AML system like Actimize or Quantexa?
No. Dedicated AML platforms provide case management workflows, regulatory reporting, model libraries, and audit trails that go well beyond what a streaming database provides. RisingWave can feed these systems with real-time alerts and enriched data, replacing their batch ingestion pipelines with live streams.
How do I tune the structuring threshold for different regulatory jurisdictions?
Store thresholds in a configuration table and join against it in the alert views. Reporting thresholds differ by jurisdiction and, in some regimes, by transaction type. Because the configuration lives in a RisingWave table, a single UPDATE changes the effective threshold, and the joined alert views re-evaluate without being recreated.
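A minimal sketch of that pattern, assuming a hypothetical `aml_thresholds` table keyed by the customer's `jurisdiction` from `customer_risk_tiers` (keying by branch or transaction jurisdiction, or adding a transaction-type column, are equally valid designs). The jurisdiction code `'US'` is assumed to match whatever codes the KYC feed uses.

```sql
-- Hypothetical per-jurisdiction threshold table
CREATE TABLE aml_thresholds (
    jurisdiction VARCHAR PRIMARY KEY,
    ctr_threshold NUMERIC,      -- single-transaction reporting threshold
    structuring_floor NUMERIC   -- 24h total that starts to look like structuring
);

INSERT INTO aml_thresholds VALUES ('US', 10000, 9000);

-- Same conditions as aml_alert_structuring, with thresholds looked up per customer
CREATE MATERIALIZED VIEW aml_alert_structuring_configurable AS
SELECT
    c.customer_id,
    c.account_id,
    c.total_cash_24h,
    c.cash_txn_count_24h,
    r.jurisdiction,
    t.ctr_threshold,
    t.structuring_floor
FROM cash_aggregates_24h c
JOIN customer_risk_tiers r ON c.customer_id = r.customer_id
JOIN aml_thresholds t ON r.jurisdiction = t.jurisdiction
WHERE c.total_cash_24h >= t.structuring_floor
  AND c.largest_single_cash_txn < t.ctr_threshold
  AND c.cash_txn_count_24h >= 2;

-- Lowering the floor re-evaluates current aggregates; no view is recreated
UPDATE aml_thresholds SET structuring_floor = 8500 WHERE jurisdiction = 'US';
```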
Can I detect multi-account structuring (one customer using multiple accounts)?
Yes. Aggregate by customer_id alone, dropping account_id from the GROUP BY in cash_aggregates_24h. If your data model links related accounts (household, business group), load those relationships into a reference table and join against it before aggregating.
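A sketch of the related-account variant, assuming a hypothetical `account_groups` table that maps each account to a household or business `group_id`. Accounts without a group fall back to their own customer, prefixed so the two kinds of key cannot collide.

```sql
-- Hypothetical mapping of accounts to a related-party group
CREATE TABLE account_groups (
    account_id VARCHAR PRIMARY KEY,
    group_id VARCHAR
);

-- 24h cash aggregated across every account in a group
CREATE MATERIALIZED VIEW cash_aggregates_24h_by_party AS
SELECT
    COALESCE(g.group_id, 'customer:' || t.customer_id) AS party_key,
    SUM(t.amount) AS total_cash_24h,
    COUNT(*) AS cash_txn_count_24h,
    COUNT(DISTINCT t.account_id) AS accounts_used,
    MAX(t.amount) AS largest_single_cash_txn
FROM (
    -- temporal filter applied to the stream before the join
    SELECT * FROM transactions
    WHERE event_time > NOW() - INTERVAL '24 hours'
      AND transaction_type IN ('cash_deposit', 'cash_withdrawal')
) t
LEFT JOIN account_groups g ON t.account_id = g.account_id
GROUP BY COALESCE(g.group_id, 'customer:' || t.customer_id);
```

The structuring predicate can then be applied to this view unchanged, with `accounts_used` added to the narrative to show how the activity was spread.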
How does the system handle corrections and reversals?
When a transaction is reversed, a reversal event arrives in the stream. Handle it in the aggregation logic either by treating reversals as negative amounts or by excluding transaction IDs that have a corresponding reversal event.
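A sketch of the exclusion approach, assuming reversals arrive on a separate, hypothetical `transaction-reversals` topic whose events carry the ID of the transaction they reverse; the source name and fields are assumptions, not part of the schema above. Aggregation views would then read from `cash_transactions_net_24h` instead of `transactions`. Note that the reversal side of the anti-join is not time-bounded here, so its state grows with the reversal history.

```sql
-- Hypothetical reversal feed; topic name and fields are assumptions
CREATE SOURCE transaction_reversals (
    reversal_id VARCHAR,
    original_transaction_id VARCHAR,
    event_time TIMESTAMPTZ
)
WITH (
    connector = 'kafka',
    topic = 'transaction-reversals',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'earliest'
)
FORMAT PLAIN ENCODE JSON;

-- Cash transactions from the last 24h that have not been reversed.
-- A late reversal retracts the matching row, and views built on this one update.
CREATE MATERIALIZED VIEW cash_transactions_net_24h AS
SELECT t.*
FROM (
    SELECT * FROM transactions
    WHERE event_time > NOW() - INTERVAL '24 hours'
      AND transaction_type IN ('cash_deposit', 'cash_withdrawal')
) t
WHERE NOT EXISTS (
    SELECT 1
    FROM transaction_reversals r
    WHERE r.original_transaction_id = t.transaction_id
);
```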
What audit trail does this create for regulatory examination?
Not a sufficient one on its own: a materialized view holds only the current result, so an alert that ages out of its window simply disappears from the view. For formal regulatory audit trails, sink all alerts (including resolved ones) to an immutable store (S3, a dedicated audit database, or an append-only Kafka topic). Regulators typically want to see what was detected, when, and what action was taken.

