Real-Time KYC Monitoring with a Streaming Database

Real-time KYC monitoring with a streaming database means continuously evaluating customer risk indicators—transaction patterns, sanction list matches, beneficial ownership changes—as events occur, rather than running periodic batch reviews. RisingWave maintains incrementally updated risk profiles via materialized views, triggering re-verification workflows within seconds of a risk-relevant event.

The Problem with Periodic KYC Reviews

Know Your Customer compliance has traditionally been structured around periodic reviews: onboard a customer, do a full KYC check, schedule the next review in 12 months (or 3 years for low-risk customers). This model was designed for a paper-based world.

Modern regulators increasingly expect event-driven KYC: if a customer's transaction behavior changes materially, if they appear on a sanctions list update, or if their beneficial ownership structure changes, the bank should know—and respond—quickly. Fines for KYC failures now routinely exceed $100M, and regulators cite "inadequate ongoing monitoring" as the specific deficiency in most enforcement actions.

A streaming database solves ongoing monitoring by maintaining live customer risk profiles that update automatically as new data arrives, rather than requiring scheduled jobs to recompute them.

Ingesting Customer Activity and Reference Data

-- Customer transactions stream
CREATE SOURCE customer_transactions (
    transaction_id      VARCHAR,
    customer_id         VARCHAR,
    account_id          VARCHAR,
    transaction_type    VARCHAR,
    amount              DECIMAL(18,2),
    currency            VARCHAR(3),
    counterparty_id     VARCHAR,
    counterparty_bank   VARCHAR,
    country_code        VARCHAR(2),
    channel             VARCHAR,
    transaction_time    TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'customer-transactions',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;

-- Customer master data via CDC from core banking.
-- The postgres-cdc connector ingests into a table with a primary key,
-- so this uses CREATE TABLE and no FORMAT ... ENCODE clause.
CREATE TABLE customer_profiles (
    customer_id         VARCHAR,
    risk_tier           VARCHAR,    -- 'LOW', 'MEDIUM', 'HIGH', 'PEP'
    onboarding_date     DATE,
    last_kyc_review     DATE,
    nationality         VARCHAR(2),
    residence_country   VARCHAR(2),
    occupation_code     VARCHAR,
    is_pep              BOOLEAN,
    is_sanctioned       BOOLEAN,
    PRIMARY KEY (customer_id)
) WITH (
    connector = 'postgres-cdc',
    hostname = 'core-banking.internal',
    port = '5432',
    username = 'rw_reader',
    password = 'secret',
    database.name = 'customers',
    schema.name = 'public',
    table.name = 'customer_profiles'
);

Building Ongoing Monitoring Profiles

-- High-risk countries reference table
CREATE TABLE high_risk_countries (
    country_code    VARCHAR(2) PRIMARY KEY,
    risk_level      VARCHAR,
    fatf_status     VARCHAR,
    last_updated    DATE
);

-- Rolling 90-day behavioral profile per customer
CREATE MATERIALIZED VIEW customer_behavioral_profile AS
SELECT
    customer_id,
    COUNT(*)                                                    AS tx_count_90d,
    SUM(amount)                                                 AS total_volume_90d,
    AVG(amount)                                                 AS avg_tx_amount,
    MAX(amount)                                                 AS max_single_tx,
    COUNT(DISTINCT counterparty_bank)                           AS distinct_banks,
    COUNT(DISTINCT country_code)                                AS distinct_countries,
    COUNT(*) FILTER (WHERE transaction_type = 'WIRE_TRANSFER')  AS wire_count,
    COUNT(*) FILTER (WHERE transaction_type = 'CASH_DEPOSIT')   AS cash_deposit_count,
    SUM(amount) FILTER (WHERE transaction_type = 'CASH_DEPOSIT') AS cash_deposit_volume,
    MAX(transaction_time)                                       AS last_activity
FROM customer_transactions
WHERE transaction_time >= NOW() - INTERVAL '90 days'
GROUP BY customer_id;

-- Real-time KYC risk signal view
CREATE MATERIALIZED VIEW kyc_risk_signals AS
SELECT
    bp.customer_id,
    cp.risk_tier,
    cp.last_kyc_review,
    cp.is_pep,
    -- Signal 1: Cash-intensive activity (90-day cash deposits above 10,000,
    -- used here as a coarse proxy for structuring)
    CASE WHEN bp.cash_deposit_volume > 10000 THEN 1 ELSE 0 END      AS structuring_flag,
    -- Signal 2: High cross-border wire activity
    CASE WHEN bp.wire_count > 20
          AND bp.distinct_countries > 5 THEN 1 ELSE 0 END            AS cross_border_flag,
    -- Signal 3: Volume spike vs. expected profile
    CASE WHEN bp.total_volume_90d > 500000
          AND cp.risk_tier = 'LOW' THEN 1 ELSE 0 END                  AS volume_anomaly_flag,
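    -- Signal 4: KYC review overdue
    -- (CURRENT_DATE inside a materialized view may be restricted in some
    -- RisingWave versions; verify that this expression is accepted)
    CASE WHEN cp.last_kyc_review < CURRENT_DATE - INTERVAL '365 days'
          AND cp.risk_tier = 'HIGH' THEN 1 ELSE 0 END                 AS overdue_review_flag,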
    bp.tx_count_90d,
    bp.total_volume_90d,
    bp.distinct_countries,
    bp.last_activity
FROM customer_behavioral_profile bp
JOIN customer_profiles cp ON bp.customer_id = cp.customer_id;
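
Because kyc_risk_signals is a materialized view, it can also be queried ad hoc like a table. The query below is a minimal sketch of such a lookup; the two-flag threshold and the row limit are illustrative assumptions, not a recommended policy.

```sql
-- Ad hoc lookup: customers with at least two active flags, highest count first.
SELECT
    customer_id,
    risk_tier,
    structuring_flag + cross_border_flag
        + volume_anomaly_flag + overdue_review_flag AS total_flags,
    total_volume_90d,
    last_activity
FROM kyc_risk_signals
WHERE structuring_flag + cross_border_flag
        + volume_anomaly_flag + overdue_review_flag >= 2
ORDER BY total_flags DESC, total_volume_90d DESC
LIMIT 50;
```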

High-Risk Country Transaction Monitoring

-- Flag transactions involving high-risk countries
CREATE MATERIALIZED VIEW high_risk_country_transactions AS
SELECT
    ct.transaction_id,
    ct.customer_id,
    ct.amount,
    ct.currency,
    ct.country_code,
    ct.transaction_time,
    hrc.risk_level,
    hrc.fatf_status,
    cp.risk_tier AS customer_risk_tier
FROM customer_transactions ct
JOIN high_risk_countries hrc
    ON ct.country_code = hrc.country_code
JOIN customer_profiles cp
    ON ct.customer_id = cp.customer_id;

-- Hourly summary of high-risk country exposure
CREATE MATERIALIZED VIEW high_risk_exposure_summary AS
SELECT
    window_start,
    window_end,
    country_code,
    COUNT(DISTINCT customer_id) AS unique_customers,
    COUNT(*)                    AS transaction_count,
    SUM(amount)                 AS total_exposure
FROM TUMBLE(high_risk_country_transactions, transaction_time, INTERVAL '1 HOUR')
GROUP BY window_start, window_end, country_code;
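
The hourly summary can be read back the same way. This sketch lists the largest exposures among windows that started in the last 24 hours; the lookback period and the limit are arbitrary choices for illustration.

```sql
-- Top high-risk country exposures over the most recent 24 hourly windows.
SELECT
    window_start,
    country_code,
    unique_customers,
    transaction_count,
    total_exposure
FROM high_risk_exposure_summary
WHERE window_start >= NOW() - INTERVAL '24 hours'
ORDER BY total_exposure DESC
LIMIT 10;
```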

Triggering Re-Verification Workflows

-- Sink customers requiring immediate KYC review to the workflow system.
-- kyc_risk_signals is an updating view, so FORMAT PLAIN needs
-- force_append_only to emit its changes as plain messages.
CREATE SINK kyc_review_triggers AS
SELECT
    customer_id,
    risk_tier,
    (structuring_flag + cross_border_flag + volume_anomaly_flag + overdue_review_flag) AS total_flags,
    last_activity,
    NOW() AS trigger_time
FROM kyc_risk_signals
WHERE (structuring_flag + cross_border_flag + volume_anomaly_flag + overdue_review_flag) >= 2
   OR overdue_review_flag = 1
WITH (
    connector = 'kafka',
    properties.bootstrap.server = 'kafka:9092',
    topic = 'kyc-review-queue'
) FORMAT PLAIN ENCODE JSON (
    force_append_only = 'true'
);

Comparison: Periodic vs. Real-Time KYC Monitoring

Dimension | Periodic Review (Annual/Quarterly) | Streaming with RisingWave
Risk detection speed | Months | Seconds
Coverage | Scheduled subset | All customers, all events
Sanctions response time | Next batch run | Real-time (seconds of list update)
Behavioral drift detection | Annual snapshot comparison | Continuous rolling window
Regulatory compliance posture | Reactive | Proactive
Operational cost | High (manual review cycles) | Lower (automated risk scoring)

FAQ

Q: How does RisingWave handle sanctions list updates that need to be applied to all customers immediately?
A: Store the sanctions list in a CREATE TABLE (updated via CDC or batch upsert). A materialized view joins this table against customer_profiles. When a new sanctions entry is added, the join result updates immediately, surfacing all affected customers within seconds.
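
A minimal sketch of that pattern follows. The sanctions_list table, its columns, and the view name are hypothetical, and the example assumes an upstream screening step has already resolved each list entry to a customer_id. Real lists are matched on names, dates of birth, and identifiers, which this sketch does not attempt.

```sql
-- Hypothetical table of list entries already matched to customers upstream.
CREATE TABLE sanctions_list (
    list_entry_id   VARCHAR PRIMARY KEY,
    customer_id     VARCHAR,
    list_name       VARCHAR,        -- issuing list, free text
    listed_at       TIMESTAMPTZ
);

-- Each insert or update on sanctions_list refreshes this view incrementally.
CREATE MATERIALIZED VIEW sanctioned_customers AS
SELECT
    cp.customer_id,
    cp.risk_tier,
    cp.residence_country,
    sl.list_entry_id,
    sl.list_name,
    sl.listed_at
FROM sanctions_list sl
JOIN customer_profiles cp
    ON sl.customer_id = cp.customer_id;
```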

Q: Can we use RisingWave to build a Suspicious Activity Report (SAR) filing queue?
A: Yes. Create a materialized view that computes SAR-eligible patterns (structuring, round-dollar transactions, rapid fund movement). Sink high-scoring records to a Kafka topic consumed by your compliance case management system. The SAR narrative generation can be handled downstream.
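
As one illustration, the view below flags a single pattern: customers who make several cash deposits just under a reporting threshold within one day. The 9,000 to 9,999.99 band, the minimum of three deposits, and the view name are assumptions chosen for the example, not regulatory guidance.

```sql
-- Hypothetical structuring candidates: repeated near-threshold cash deposits per day.
CREATE MATERIALIZED VIEW sar_structuring_candidates AS
SELECT
    customer_id,
    window_start,
    window_end,
    COUNT(*)    AS near_threshold_deposits,
    SUM(amount) AS near_threshold_volume
FROM TUMBLE(customer_transactions, transaction_time, INTERVAL '1 DAY')
WHERE transaction_type = 'CASH_DEPOSIT'
  AND amount BETWEEN 9000 AND 9999.99
GROUP BY customer_id, window_start, window_end
HAVING COUNT(*) >= 3;
```

A sink defined on this view can then feed the case management topic, in the same way kyc_review_triggers feeds the review queue.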

Q: How do we maintain an audit trail of risk score changes for regulators?
A: Use a CREATE SINK to write every change to kyc_risk_signals to an append-only Kafka topic or an Iceberg table. This creates an immutable, time-stamped audit trail of every risk signal change, queryable for regulatory examination.
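
One way to sketch this is a Kafka sink in Debezium format, which emits a change record for each insert, update, and delete on the view. The sink name and topic are hypothetical, and the exact options should be checked against the RisingWave version in use.

```sql
-- Hypothetical audit sink: one change record per modification of kyc_risk_signals.
CREATE SINK kyc_risk_signals_audit FROM kyc_risk_signals
WITH (
    connector = 'kafka',
    properties.bootstrap.server = 'kafka:9092',
    topic = 'kyc-risk-signals-audit',
    primary_key = 'customer_id'
) FORMAT DEBEZIUM ENCODE JSON;
```

For the trail to serve as a long-term record, the topic's retention settings must keep messages for as long as the audit period requires.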

Q: Does RisingWave support Personally Identifiable Information (PII) masking?
A: RisingWave supports column-level access control, and views can be defined to project only non-PII columns for less privileged users. For field-level encryption, encrypt at the application layer before ingesting into the Kafka topics that RisingWave consumes.
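
A minimal sketch of the view-based approach, assuming for illustration that nationality, residence country, and occupation are the sensitive fields. Which columns count as PII depends on your own data classification, and the view name is hypothetical.

```sql
-- Hypothetical restricted view: exposes risk attributes, omits sensitive columns.
CREATE VIEW customer_profiles_restricted AS
SELECT
    customer_id,
    risk_tier,
    last_kyc_review,
    is_pep
FROM customer_profiles;
```

Less privileged users would then be granted access to this view rather than to customer_profiles itself.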

Q: What happens to the KYC monitoring views if RisingWave restarts?
A: RisingWave checkpoints its state to object storage (S3-compatible). On restart, it recovers from the last checkpoint and resumes processing from the corresponding Kafka offsets. Materialized view state is rebuilt from the checkpoint rather than from scratch.

