Real-Time AML Transaction Monitoring with Streaming SQL

Anti-money laundering monitoring requires watching transaction patterns across multiple time windows simultaneously: the last hour, day, week, and month all matter for different typologies. Rolling-window aggregations written in SQL replace what would otherwise require hand-written stateful operators in Flink's DataStream API or custom consumer code, and they stay current automatically as new transactions arrive.


Why AML Is a Streaming Problem, Not a Batch Problem

Traditional AML monitoring runs nightly batch jobs. A transaction that arrives at 9am is reviewed, at best, after the overnight job completes. That delay comes straight out of the time investigators have to review the activity and, where warranted, file a SAR (Suspicious Activity Report).

Regulators increasingly expect real-time or near-real-time monitoring. FinCEN guidance, the EU's AMLD6, and domestic central bank requirements in multiple jurisdictions reference prompt detection as a standard. Nightly batch processing is increasingly hard to defend for high-risk typologies.

The fundamental challenge is that AML patterns are time-bound. Structuring (the deliberate breaking of transactions below reporting thresholds) only becomes visible when you aggregate across multiple transactions over a defined window. Layering through multiple accounts only reveals itself when you can see cross-account flows within a time boundary. These patterns cannot be detected on a single transaction in isolation.


Flink can handle AML workloads. It is genuinely powerful for stateful stream processing over time windows. The problem is the implementation cost.

A structuring detection operator written against Flink's DataStream API requires custom state management: you define a state descriptor, a keyed stream, a process function that reads and updates state per key, and a timer to evict state at window boundaries. If the logic changes (the window widens from 24 hours to 48 hours, or the threshold drops from $10,000 to $9,500), you update Java code, rebuild, redeploy, and deal with state migration.
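The per-key bookkeeping described above can be sketched in plain Python. This is not Flink API code: `RollingCashState` is a hypothetical class, it assumes events arrive in event-time order, and a deque plus an eviction loop stands in for keyed state and an event-time timer.

```python
from collections import defaultdict, deque
from datetime import datetime, timedelta
from decimal import Decimal


class RollingCashState:
    """Rolling per-key sum with explicit eviction (illustrative, not Flink API)."""

    def __init__(self, window: timedelta):
        self.window = window
        self.events = defaultdict(deque)    # key -> deque of (event_time, amount), oldest first
        self.totals = defaultdict(Decimal)  # key -> sum of amounts still inside the window

    def _evict(self, key, now: datetime) -> None:
        # The "timer" work: drop events at or before now - window, matching
        # the SQL filter event_time > NOW() - window.
        q = self.events[key]
        while q and q[0][0] <= now - self.window:
            _, amount = q.popleft()
            self.totals[key] -= amount

    def add(self, key, event_time: datetime, amount: Decimal) -> Decimal:
        """Record one event and return the key's rolling total (in-order events assumed)."""
        self.events[key].append((event_time, amount))
        self.totals[key] += amount
        self._evict(key, event_time)
        return self.totals[key]
```

Changing the window here is just a constructor argument, but in a running job the deques were filled under the old window: events older than 24 hours are already gone, so a widened 48-hour window starts out incomplete. That is the state-migration problem in practice.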

In SQL, the same structuring detection is a GROUP BY with a time-window filter in the WHERE clause. Changing the window means changing INTERVAL '24 hours' to INTERVAL '48 hours' and recreating the view, with no Java rebuild or redeployment.

AML rules change frequently as typologies evolve and regulators update guidance. SQL iteration speed is a material operational advantage.


Building AML Monitoring with RisingWave

RisingWave is a PostgreSQL-compatible streaming database, open source under Apache 2.0, built in Rust, with state persisted in S3-compatible object storage. It maintains materialized views incrementally: as new transactions arrive, only the affected aggregates are updated rather than recomputed over the entire dataset.
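The difference between incremental maintenance and recomputation fits in a few lines of Python. This is a conceptual model, not RisingWave's internal implementation: each inserted or retracted row adjusts only its own group's SUM and COUNT, so the cost per change does not depend on how much history exists, yet the result always matches a full rescan. Amounts are integer cents to keep the arithmetic exact; the class and function names are hypothetical.

```python
from collections import defaultdict


class IncrementalSumByKey:
    """SUM(amount) and COUNT(*) per key, maintained from row-level deltas."""

    def __init__(self):
        self.sums = defaultdict(int)
        self.counts = defaultdict(int)

    def apply(self, key, amount, sign=1):
        """sign=1 for an inserted row, -1 for a retracted row. O(1) per change."""
        self.sums[key] += sign * amount
        self.counts[key] += sign
        if self.counts[key] == 0:
            # An empty group disappears, as it would from a GROUP BY result.
            del self.sums[key], self.counts[key]


def full_recompute(rows):
    """Batch equivalent: rescan every row to rebuild the same sums."""
    sums = defaultdict(int)
    for key, amount in rows:
        sums[key] += amount
    return dict(sums)
```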

Ingest the Transaction Stream

CREATE SOURCE transactions (
    transaction_id   VARCHAR,
    account_id       VARCHAR,
    customer_id      VARCHAR,
    transaction_type VARCHAR,     -- 'cash_deposit', 'cash_withdrawal', 'wire_in',
                                  --  'wire_out', 'transfer', 'check'
    amount           NUMERIC,
    currency         VARCHAR,
    counterparty_account VARCHAR,
    counterparty_name    VARCHAR,
    counterparty_country VARCHAR,
    branch_id        VARCHAR,
    teller_id        VARCHAR,
    event_time       TIMESTAMPTZ
)
WITH (
    connector = 'kafka',
    topic = 'transactions',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
)
FORMAT PLAIN ENCODE JSON;

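-- Customer risk tiers (updated from the KYC/onboarding system).
-- Declared as a TABLE with a primary key so each customer keeps only its
-- latest tier; an append-only SOURCE would keep every update as a separate
-- row and duplicate alerts in the enrichment join.
CREATE TABLE customer_risk_tiers (
    customer_id    VARCHAR PRIMARY KEY,
    risk_tier      VARCHAR,    -- 'LOW', 'MEDIUM', 'HIGH', 'PROHIBITED'
    jurisdiction   VARCHAR,
    pep_flag       BOOLEAN,    -- Politically Exposed Person
    updated_at     TIMESTAMPTZ
)
WITH (
    connector = 'kafka',
    topic = 'customer-risk',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'earliest'   -- load existing tiers, not only new updates
)
FORMAT PLAIN ENCODE JSON;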
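JSON sources map message keys to the declared columns by name, so a misspelled or missing key typically surfaces as a NULL column rather than a loud failure. A producer-side contract check can catch that early. This is a minimal sketch: `contract_violations` is a hypothetical helper, it assumes one JSON object per Kafka message, and the field and type lists are copied from the `transactions` source definition.

```python
import json

# Column names declared in the CREATE SOURCE transactions statement.
TRANSACTION_FIELDS = frozenset({
    "transaction_id", "account_id", "customer_id", "transaction_type",
    "amount", "currency", "counterparty_account", "counterparty_name",
    "counterparty_country", "branch_id", "teller_id", "event_time",
})

# Values listed in the transaction_type comment of the source definition.
TRANSACTION_TYPES = frozenset({
    "cash_deposit", "cash_withdrawal", "wire_in", "wire_out", "transfer", "check",
})


def contract_violations(raw: str) -> list[str]:
    """Return human-readable problems with one message; empty list if it conforms."""
    try:
        msg = json.loads(raw)
    except json.JSONDecodeError as exc:
        return [f"invalid JSON: {exc.msg}"]
    if not isinstance(msg, dict):
        return ["message is not a JSON object"]
    problems = [f"missing field: {f}" for f in sorted(TRANSACTION_FIELDS - set(msg))]
    ttype = msg.get("transaction_type")
    if ttype is not None and ttype not in TRANSACTION_TYPES:
        problems.append(f"unknown transaction_type: {ttype}")
    amount = msg.get("amount")
    # bool is a subclass of int in Python, so exclude it explicitly.
    if amount is not None and (isinstance(amount, bool) or not isinstance(amount, (int, float))):
        problems.append("amount is not a JSON number")
    return problems
```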

Typology 1: Structuring (Smurfing)

Structuring is the deliberate breaking of transactions into amounts below the Currency Transaction Report (CTR) threshold — $10,000 in the US — to avoid mandatory reporting. Detection requires seeing the cumulative daily total per customer.

Daily Cash Aggregation (Rolling 24-Hour Window)

CREATE MATERIALIZED VIEW cash_aggregates_24h AS
SELECT
    customer_id,
    account_id,
    SUM(CASE WHEN transaction_type IN ('cash_deposit', 'cash_withdrawal')
             THEN amount ELSE 0 END)                  AS total_cash_24h,
    SUM(CASE WHEN transaction_type = 'cash_deposit'
             THEN amount ELSE 0 END)                  AS cash_deposits_24h,
    SUM(CASE WHEN transaction_type = 'cash_withdrawal'
             THEN amount ELSE 0 END)                  AS cash_withdrawals_24h,
    COUNT(CASE WHEN transaction_type IN ('cash_deposit', 'cash_withdrawal')
               THEN 1 END)                            AS cash_txn_count_24h,
    MAX(amount)                                       AS largest_single_cash_txn,
    MIN(amount)                                       AS smallest_single_cash_txn,
    -- Latest cash event in the window; used as the alert timestamp downstream
    MAX(event_time)                                   AS last_cash_txn_time
FROM transactions
-- Temporal filter: rows leave the window (and the aggregates) after 24 hours
WHERE event_time > NOW() - INTERVAL '24 hours'
  AND transaction_type IN ('cash_deposit', 'cash_withdrawal')
GROUP BY customer_id, account_id;

Structuring Alert View

-- RisingWave accepts NOW() in streaming queries only as a temporal filter
-- (for example in WHERE), not in the SELECT list, so the alert timestamp is
-- taken from the data instead.
CREATE MATERIALIZED VIEW aml_alert_structuring AS
SELECT
    c.customer_id,
    c.account_id,
    c.total_cash_24h,
    c.cash_txn_count_24h,
    c.largest_single_cash_txn,
    c.last_cash_txn_time      AS detected_at,
    'STRUCTURING'             AS typology,
    'HIGH'                    AS alert_severity,
    FORMAT(
        'Customer accumulated %s in cash transactions over 24h across %s transactions, with no single transaction exceeding the CTR threshold (%s max)',
        c.total_cash_24h::TEXT,
        c.cash_txn_count_24h::TEXT,
        c.largest_single_cash_txn::TEXT
    )                         AS narrative
FROM cash_aggregates_24h c
WHERE c.total_cash_24h >= 9000          -- approaching threshold
  AND c.largest_single_cash_txn < 10000 -- no single transaction triggers a CTR
  AND c.cash_txn_count_24h >= 2;        -- multiple transactions required
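The rolling aggregation and the alert thresholds can be checked offline with a small Python reference model, which is handy for unit-testing a rule change before touching the views. This is a sketch under stated assumptions: `Txn`, `cash_aggregates_24h`, and `is_structuring_alert` are hypothetical names, the `as_of` argument plays the role of NOW(), and only the columns the alert uses are computed.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from decimal import Decimal

CASH_TYPES = {"cash_deposit", "cash_withdrawal"}


@dataclass(frozen=True)
class Txn:
    customer_id: str
    account_id: str
    transaction_type: str
    amount: Decimal
    event_time: datetime


def cash_aggregates_24h(txns, as_of: datetime):
    """Mirror of the cash_aggregates_24h view evaluated at time as_of."""
    window_start = as_of - timedelta(hours=24)
    groups = {}
    for t in txns:
        # Same filter as the view: strictly inside the window, cash types only.
        if t.event_time <= window_start or t.transaction_type not in CASH_TYPES:
            continue
        groups.setdefault((t.customer_id, t.account_id), []).append(t.amount)
    return {
        key: {
            "total_cash_24h": sum(amounts),
            "cash_txn_count_24h": len(amounts),
            "largest_single_cash_txn": max(amounts),
        }
        for key, amounts in groups.items()
    }


def is_structuring_alert(agg) -> bool:
    """Same thresholds as the aml_alert_structuring WHERE clause."""
    return (agg["total_cash_24h"] >= 9000
            and agg["largest_single_cash_txn"] < 10000
            and agg["cash_txn_count_24h"] >= 2)
```

For example, two deposits of 4,800 and 4,700 within a few hours produce a 9,500 total with no single transaction at the threshold, so the alert fires; a single 12,000 deposit does not, because it would already trigger a CTR.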

Typology 2: Velocity Anomaly — Weekly and Monthly Windows

Money laundering often involves unusual spikes in transaction activity relative to a customer's profile. Multiple time windows give different perspectives:

-- 7-day rolling aggregates
CREATE MATERIALIZED VIEW transaction_velocity_7d AS
SELECT
    customer_id,
    COUNT(*)                          AS txn_count_7d,
    SUM(amount)                       AS total_volume_7d,
    COUNT(DISTINCT counterparty_account) AS unique_counterparties_7d,
    COUNT(DISTINCT counterparty_country) AS unique_countries_7d,
    SUM(CASE WHEN transaction_type = 'wire_out' THEN amount ELSE 0 END)
                                      AS wire_out_7d,
    SUM(CASE WHEN transaction_type = 'wire_in'  THEN amount ELSE 0 END)
                                      AS wire_in_7d,
    MAX(event_time)                   AS last_txn_time_7d  -- used as the alert timestamp
FROM transactions
WHERE event_time > NOW() - INTERVAL '7 days'
GROUP BY customer_id;

-- 30-day rolling baseline (note: it includes the current 7 days, which
-- dampens the spike ratio somewhat)
CREATE MATERIALIZED VIEW transaction_baseline_30d AS
SELECT
    customer_id,
    COUNT(*)                          AS txn_count_30d,
    SUM(amount)                       AS total_volume_30d,
    COUNT(DISTINCT counterparty_account) AS counterparties_30d,
    COUNT(DISTINCT counterparty_country) AS countries_30d,
    -- Normalize to a weekly rate: 30 days = 30/7 (about 4.29) weeks
    COUNT(*) * 7.0 / 30               AS avg_weekly_txn_count,
    SUM(amount) * 7.0 / 30            AS avg_weekly_volume
FROM transactions
WHERE event_time > NOW() - INTERVAL '30 days'
GROUP BY customer_id;

-- Alert: this week's volume is more than 3x the 30-day weekly average
CREATE MATERIALIZED VIEW aml_alert_velocity_spike AS
SELECT
    v.customer_id,
    v.txn_count_7d,
    v.total_volume_7d,
    b.avg_weekly_txn_count,
    b.avg_weekly_volume,
    v.total_volume_7d / NULLIF(b.avg_weekly_volume, 0) AS volume_ratio,
    v.last_txn_time_7d        AS detected_at,
    'VELOCITY_SPIKE'          AS typology,
    'MEDIUM'                  AS alert_severity,
    -- FORMAT() supports %s but not printf-style %.1f, so round before casting
    FORMAT(
        'Weekly transaction volume (%s) is %sx the 30-day weekly average (%s)',
        v.total_volume_7d::TEXT,
        ROUND(v.total_volume_7d / NULLIF(b.avg_weekly_volume, 0), 1)::TEXT,
        ROUND(b.avg_weekly_volume, 2)::TEXT
    )                         AS narrative
FROM transaction_velocity_7d v
JOIN transaction_baseline_30d b ON v.customer_id = b.customer_id
WHERE v.total_volume_7d > b.avg_weekly_volume * 3
  AND b.avg_weekly_volume > 0;
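A quick way to sanity-check the weekly normalization and the 3x rule is to mirror them in Python with `Decimal`, matching the NUMERIC columns. A 30-day window spans 30/7 (about 4.29) weeks, so dividing the monthly total by 4.0 treats the month as 28 days and overstates the weekly average by 30/28, about 7%. The helper names below are hypothetical.

```python
from decimal import Decimal, ROUND_HALF_UP


def weekly_average(total_30d: Decimal) -> Decimal:
    """Convert a 30-day total to a weekly rate: 30 days = 30/7 weeks."""
    return total_30d * 7 / Decimal(30)


def velocity_spike_ratio(volume_7d: Decimal, volume_30d: Decimal, multiple: int = 3):
    """Return the 7-day / weekly-average ratio rounded to 0.1 if it exceeds
    `multiple`; return None otherwise, including when there is no baseline."""
    avg = weekly_average(volume_30d)
    if avg <= 0 or volume_7d <= avg * multiple:
        return None
    return (volume_7d / avg).quantize(Decimal("0.1"), rounding=ROUND_HALF_UP)
```

With 30,000 over 30 days the weekly average is 7,000, so a 25,000 week is a 3.6x spike, while 20,000 stays under the 21,000 trigger.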

Typology 3: High-Risk Jurisdiction Wire Activity

Wires to or from high-risk or sanctioned jurisdictions require immediate attention:

-- Reference table: high-risk countries (maintained separately)
CREATE TABLE high_risk_jurisdictions (
    country_code     VARCHAR PRIMARY KEY,
    risk_level       VARCHAR,     -- 'HIGH', 'SANCTIONED', 'MONITORED'
    fatf_status      VARCHAR,     -- 'BLACKLIST', 'GREYLIST', 'COMPLIANT'
    updated_at       TIMESTAMPTZ
);

-- Rolling 7-day wire activity to/from high-risk jurisdictions
CREATE MATERIALIZED VIEW hri_wire_activity_7d AS
SELECT
    t.customer_id,
    t.counterparty_country,
    j.risk_level,
    j.fatf_status,
    COUNT(*)          AS wire_count_7d,
    SUM(t.amount)     AS wire_total_7d,
    MAX(t.event_time) AS last_wire_time
FROM transactions t
JOIN high_risk_jurisdictions j ON t.counterparty_country = j.country_code
WHERE t.event_time > NOW() - INTERVAL '7 days'
  AND t.transaction_type IN ('wire_out', 'wire_in')
GROUP BY t.customer_id, t.counterparty_country, j.risk_level, j.fatf_status;

-- Alert on wires involving sanctioned or FATF-listed jurisdictions
CREATE MATERIALIZED VIEW aml_alert_sanctions AS
SELECT
    h.customer_id,
    h.counterparty_country,
    h.wire_total_7d,
    h.wire_count_7d,
    h.fatf_status,
    h.risk_level,
    h.last_wire_time     AS detected_at,  -- NOW() is not allowed in the SELECT list
    'SANCTIONS_EXPOSURE' AS typology,
    CASE WHEN h.risk_level = 'SANCTIONED' OR h.fatf_status = 'BLACKLIST' THEN 'CRITICAL'
         WHEN h.fatf_status = 'GREYLIST'  THEN 'HIGH'
         ELSE 'MEDIUM' END AS alert_severity,
    FORMAT(
        '%s wires totaling %s to/from %s (risk level %s, FATF %s) in the last 7 days',
        h.wire_count_7d::TEXT,
        h.wire_total_7d::TEXT,
        h.counterparty_country,
        h.risk_level,
        h.fatf_status
    )                  AS narrative
FROM hri_wire_activity_7d h
WHERE h.fatf_status IN ('BLACKLIST', 'GREYLIST')
   OR h.risk_level = 'SANCTIONED';
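Mirroring the alert filter and severity CASE in a small function makes the escalation policy easy to unit-test outside the database. This sketch treats `risk_level = 'SANCTIONED'` as CRITICAL regardless of FATF status, an assumption to confirm against your own policy; the function name is hypothetical.

```python
from typing import Optional


def sanctions_alert_severity(fatf_status: Optional[str],
                             risk_level: Optional[str]) -> Optional[str]:
    """Severity for one jurisdiction row, or None if the row raises no alert."""
    if risk_level == "SANCTIONED" or fatf_status == "BLACKLIST":
        return "CRITICAL"
    if fatf_status == "GREYLIST":
        return "HIGH"
    return None  # row does not pass the alert filter
```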

Typology 4: Round-Trip Funds (Layering Detection)

Layering involves moving money through multiple accounts to obscure its origin. A simple signature is money leaving an account and returning within a short window:

CREATE MATERIALIZED VIEW aml_alert_round_trip AS
SELECT
    out_txn.customer_id,
    out_txn.account_id,
    out_txn.transaction_id          AS outflow_txn_id,
    in_txn.transaction_id           AS inflow_txn_id,
    out_txn.amount                  AS outflow_amount,
    in_txn.amount                   AS inflow_amount,
    out_txn.counterparty_account    AS via_account,
    out_txn.event_time              AS outflow_time,
    in_txn.event_time               AS inflow_time,
    'ROUND_TRIP'                    AS typology,
    'HIGH'                          AS alert_severity,
    FORMAT(
        'Outflow of %s to account %s followed by near-equal inflow of %s within 24h',
        out_txn.amount::TEXT,
        out_txn.counterparty_account,
        in_txn.amount::TEXT
    )                               AS narrative
-- Temporal filters on both inputs bound the join state; without them the
-- self-join would keep every transaction ever seen. Matches age out of this
-- view once the inflow is older than 24 hours, so keep history in the sink.
FROM (
    SELECT * FROM transactions
    WHERE transaction_type IN ('wire_out', 'transfer')
      AND amount > 5000
      AND event_time > NOW() - INTERVAL '48 hours'
) out_txn
JOIN (
    SELECT * FROM transactions
    WHERE transaction_type IN ('wire_in', 'transfer')
      AND event_time > NOW() - INTERVAL '24 hours'
) in_txn
  ON  out_txn.account_id = in_txn.account_id
  AND out_txn.customer_id = in_txn.customer_id
  AND in_txn.amount BETWEEN out_txn.amount * 0.85 AND out_txn.amount * 1.15
  AND in_txn.event_time > out_txn.event_time
  AND in_txn.event_time < out_txn.event_time + INTERVAL '24 hours';
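The join conditions can be expressed as a brute-force Python matcher for test fixtures. This is a minimal sketch, not production code: `Movement` and `round_trip_pairs` are hypothetical names, it matches within a single account (the view also requires the same customer_id), and it is quadratic in the number of transactions, which is fine for tests but not for a live stream.

```python
from datetime import datetime, timedelta
from decimal import Decimal
from typing import NamedTuple


class Movement(NamedTuple):
    transaction_id: str
    account_id: str
    transaction_type: str
    amount: Decimal
    event_time: datetime


OUT_TYPES = {"wire_out", "transfer"}
IN_TYPES = {"wire_in", "transfer"}


def round_trip_pairs(txns, min_amount=Decimal(5000), tolerance=Decimal("0.15"),
                     window=timedelta(hours=24)):
    """Return (outflow_id, inflow_id) pairs satisfying the view's join conditions."""
    pairs = []
    for out in txns:
        if out.transaction_type not in OUT_TYPES or out.amount <= min_amount:
            continue
        low, high = out.amount * (1 - tolerance), out.amount * (1 + tolerance)
        for inc in txns:
            # Strict time ordering also stops a 'transfer' from matching itself.
            if (inc.account_id == out.account_id
                    and inc.transaction_type in IN_TYPES
                    and low <= inc.amount <= high
                    and out.event_time < inc.event_time < out.event_time + window):
                pairs.append((out.transaction_id, inc.transaction_id))
    return pairs
```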

Enriching Alerts with Customer Risk Tier

AML alerts should be prioritized by customer risk. A structuring alert on a PEP or high-risk-tier customer is more urgent than the same pattern on a standard retail customer:

CREATE MATERIALIZED VIEW aml_alerts_enriched AS
SELECT
    a.customer_id,
    a.account_id,
    a.typology,
    a.alert_severity,
    a.narrative,
    a.detected_at,
    r.risk_tier,
    r.pep_flag,
    r.jurisdiction,
    -- Escalate severity if the customer is a PEP or in the HIGH risk tier
    CASE
        WHEN r.pep_flag = TRUE AND a.alert_severity != 'CRITICAL' THEN 'CRITICAL'
        WHEN r.risk_tier = 'HIGH' AND a.alert_severity = 'MEDIUM'  THEN 'HIGH'
        ELSE a.alert_severity
    END AS effective_severity
FROM (
    SELECT customer_id, account_id, typology, alert_severity, narrative, detected_at
    FROM aml_alert_structuring
    UNION ALL
    SELECT customer_id, NULL::VARCHAR, typology, alert_severity, narrative, detected_at
    FROM aml_alert_velocity_spike
    UNION ALL
    SELECT customer_id, NULL::VARCHAR, typology, alert_severity, narrative, detected_at
    FROM aml_alert_sanctions
    UNION ALL
    -- The round-trip view has no detected_at column; use the inflow time
    -- (NOW() is not allowed in the SELECT list of a streaming view)
    SELECT customer_id, account_id, typology, alert_severity, narrative, inflow_time
    FROM aml_alert_round_trip
) a
LEFT JOIN customer_risk_tiers r ON a.customer_id = r.customer_id;
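The escalation CASE is small enough to mirror directly, which helps when reviewing policy changes with compliance staff. In this sketch (hypothetical function name), `None` models the NULLs a LEFT JOIN produces for a customer with no risk-tier row: in SQL, comparisons against NULL are not true, so such alerts keep their base severity, and the Python follows the same rule.

```python
from typing import Optional


def effective_severity(alert_severity: str, risk_tier: Optional[str],
                       pep_flag: Optional[bool]) -> str:
    """Mirror of the effective_severity CASE; None models a missing risk row."""
    if pep_flag is True and alert_severity != "CRITICAL":
        return "CRITICAL"
    if risk_tier == "HIGH" and alert_severity == "MEDIUM":
        return "HIGH"
    return alert_severity
```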

Routing Alerts to Case Management

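-- The alert views are updated and retracted as windows move, so the sink is
-- forced to append-only: new and changed alert rows are emitted as messages,
-- and retractions are dropped.
CREATE SINK aml_alerts_sink AS
SELECT * FROM aml_alerts_enriched
WHERE effective_severity IN ('HIGH', 'CRITICAL')
WITH (
    connector = 'kafka',
    topic = 'aml-alerts',
    properties.bootstrap.server = 'kafka:9092'
)
FORMAT PLAIN ENCODE JSON (force_append_only = 'true');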

Your compliance case management system (Actimize, Quantexa, or a custom tool) consumes this topic and creates investigation cases. HIGH and CRITICAL alerts reach it within seconds of the underlying transaction, not the next morning.
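Because the alert views are continuously maintained, the same customer and typology can appear on the topic more than once as totals change. A consumer therefore usually folds messages into one open case per key instead of opening a case per message. This is a minimal sketch, assuming the JSON field names match the columns of `aml_alerts_enriched`; `upsert_case`, the in-memory `cases` dict, and the `(customer_id, typology)` key are hypothetical choices, and a real integration would go through your case system's own API.

```python
import json

SEVERITY_RANK = {"MEDIUM": 1, "HIGH": 2, "CRITICAL": 3}


def upsert_case(cases: dict, raw_message: str) -> str:
    """Fold one alert message into `cases`, keyed by (customer_id, typology).
    Returns 'opened', 'escalated', or 'updated'."""
    alert = json.loads(raw_message)
    key = (alert["customer_id"], alert["typology"])
    existing = cases.get(key)
    if existing is None:
        cases[key] = alert
        return "opened"
    new_rank = SEVERITY_RANK.get(alert["effective_severity"], 0)
    old_rank = SEVERITY_RANK.get(existing["effective_severity"], 0)
    if new_rank < old_rank:
        # Never downgrade an open case; keep the higher severity.
        alert = {**alert, "effective_severity": existing["effective_severity"]}
    cases[key] = alert  # otherwise keep the latest narrative and totals
    return "escalated" if new_rank > old_rank else "updated"
```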


AML Monitoring Architecture Comparison

| Approach | Rule Update Latency | Alert Latency | Window Flexibility | Operational Cost |
|---|---|---|---|---|
| Nightly batch (SQL/Spark) | Days (deploy cycle) | Up to 24h | Limited by job schedule | Medium |
| Apache Flink (custom operators) | Hours (build + deploy) | Seconds | High (custom state) | High |
| CEP engine (Drools, Esper) | Hours | Seconds | Medium | High |
| Streaming database (RisingWave) | Fast (edit SQL, recreate view) | Seconds | High (SQL windows) | Low |

FAQ

Does this replace a dedicated AML system like Actimize or Quantexa? No — dedicated AML platforms provide case management workflows, regulatory reporting, model libraries, and audit trails that go well beyond what a streaming database provides. RisingWave can feed these systems with real-time alerts and enriched data, replacing their batch ingestion pipelines with live streams.

How do I tune the structuring threshold for different regulatory jurisdictions? Store thresholds in a configuration table and join against it in the alert views. The CTR threshold differs by jurisdiction and transaction type. A single UPDATE to the config table updates the effective threshold immediately.

Can I detect multi-account structuring (one customer using multiple accounts)? Yes. Group by customer_id rather than account_id in the aggregation views. If your data model links related accounts (household, business group), add those relationships as a reference table and join before aggregating.

How does the system handle corrections and reversals? When a transaction is reversed, a reversal event arrives in the stream. Include reversal handling in your aggregation logic by treating reversals as negative amounts, or by filtering out transaction IDs that have corresponding reversal events.
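The second approach (excluding reversed transaction IDs) can be prototyped in Python before it is written into the views. The schema above has no reversal fields, so the `reversal` transaction type and the `reversed_transaction_id` field in this sketch are hypothetical; `net_cash_total` is likewise an illustrative name.

```python
from decimal import Decimal


def net_cash_total(events) -> Decimal:
    """Sum cash amounts, skipping any transaction cancelled by a reversal event."""
    # Hypothetical: reversal events carry the ID of the transaction they cancel.
    reversed_ids = {e["reversed_transaction_id"] for e in events
                    if e["transaction_type"] == "reversal"}
    return sum(
        (e["amount"] for e in events
         if e["transaction_type"] in ("cash_deposit", "cash_withdrawal")
         and e["transaction_id"] not in reversed_ids),
        Decimal(0),
    )
```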

What audit trail does this create for regulatory examination? A materialized view holds only the current result, not a history of past alerts, and rows leave the windowed views as time moves on. For a formal regulatory audit trail, sink all alerts (including resolved ones) to an immutable store (S3, a dedicated audit database, or an append-only Kafka topic). Regulators typically want to see what was detected, when, and what action was taken.
