Real-time KYC monitoring with a streaming database means continuously evaluating customer risk indicators—transaction patterns, sanction list matches, beneficial ownership changes—as events occur, rather than running periodic batch reviews. RisingWave maintains incrementally updated risk profiles via materialized views, triggering re-verification workflows within seconds of a risk-relevant event.
The Problem with Periodic KYC Reviews
Know Your Customer compliance has traditionally been structured around periodic reviews: onboard a customer, do a full KYC check, schedule the next review in 12 months (or 3 years for low-risk customers). This model was designed for a paper-based world.
Modern regulators increasingly expect event-driven KYC: if a customer's transaction behavior changes materially, if they appear on a sanctions list update, or if their beneficial ownership structure changes, the bank should know, and respond, quickly. Fines for KYC failures can exceed $100M, and regulators frequently cite "inadequate ongoing monitoring" as a specific deficiency in enforcement actions.
A streaming database solves ongoing monitoring by maintaining live customer risk profiles that update automatically as new data arrives, rather than requiring scheduled jobs to recompute them.
Ingesting Customer Activity and Reference Data
-- Customer transactions stream
CREATE SOURCE customer_transactions (
    transaction_id    VARCHAR,
    customer_id       VARCHAR,
    account_id        VARCHAR,
    transaction_type  VARCHAR,
    amount            DECIMAL(18,2),
    currency          VARCHAR(3),
    counterparty_id   VARCHAR,
    counterparty_bank VARCHAR,
    country_code      VARCHAR(2),
    channel           VARCHAR,
    transaction_time  TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'customer-transactions',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;
-- Customer master data via CDC from core banking.
-- The postgres-cdc connector is declared as a table with a primary key
-- and takes no FORMAT ... ENCODE clause; customer_id is assumed to be
-- the primary key of the upstream table.
CREATE TABLE customer_profiles (
    customer_id       VARCHAR PRIMARY KEY,
    risk_tier         VARCHAR,  -- 'LOW', 'MEDIUM', 'HIGH', 'PEP'
    onboarding_date   DATE,
    last_kyc_review   DATE,
    nationality       VARCHAR(2),
    residence_country VARCHAR(2),
    occupation_code   VARCHAR,
    is_pep            BOOLEAN,
    is_sanctioned     BOOLEAN
) WITH (
    connector = 'postgres-cdc',
    hostname = 'core-banking.internal',
    port = '5432',
    username = 'rw_reader',
    password = 'secret',
    database.name = 'customers',
    schema.name = 'public',
    table.name = 'customer_profiles'
);
Building Ongoing Monitoring Profiles
-- High-risk countries reference table
CREATE TABLE high_risk_countries (
    country_code VARCHAR(2) PRIMARY KEY,
    risk_level   VARCHAR,
    fatf_status  VARCHAR,
    last_updated DATE
);

-- Rolling 90-day behavioral profile per customer
CREATE MATERIALIZED VIEW customer_behavioral_profile AS
SELECT
    customer_id,
    COUNT(*) AS tx_count_90d,
    SUM(amount) AS total_volume_90d,
    AVG(amount) AS avg_tx_amount,
    MAX(amount) AS max_single_tx,
    COUNT(DISTINCT counterparty_bank) AS distinct_banks,
    COUNT(DISTINCT country_code) AS distinct_countries,
    COUNT(*) FILTER (WHERE transaction_type = 'WIRE_TRANSFER') AS wire_count,
    COUNT(*) FILTER (WHERE transaction_type = 'CASH_DEPOSIT') AS cash_deposit_count,
    SUM(amount) FILTER (WHERE transaction_type = 'CASH_DEPOSIT') AS cash_deposit_volume,
    MAX(transaction_time) AS last_activity
FROM customer_transactions
WHERE transaction_time >= NOW() - INTERVAL '90 days'
GROUP BY customer_id;

-- Real-time KYC risk signal view
CREATE MATERIALIZED VIEW kyc_risk_signals AS
SELECT
    bp.customer_id,
    cp.risk_tier,
    cp.last_kyc_review,
    cp.is_pep,
    -- Signal 1: Cash-intensive activity
    CASE WHEN bp.cash_deposit_volume > 10000 THEN 1 ELSE 0 END AS structuring_flag,
    -- Signal 2: High cross-border wire activity
    CASE WHEN bp.wire_count > 20
          AND bp.distinct_countries > 5 THEN 1 ELSE 0 END AS cross_border_flag,
    -- Signal 3: Volume spike vs. expected profile
    CASE WHEN bp.total_volume_90d > 500000
          AND cp.risk_tier = 'LOW' THEN 1 ELSE 0 END AS volume_anomaly_flag,
    -- Signal 4: KYC review overdue
    CASE WHEN cp.last_kyc_review < CURRENT_DATE - INTERVAL '365 days'
          AND cp.risk_tier = 'HIGH' THEN 1 ELSE 0 END AS overdue_review_flag,
    bp.tx_count_90d,
    bp.total_volume_90d,
    bp.distinct_countries,
    bp.last_activity
FROM customer_behavioral_profile bp
JOIN customer_profiles cp ON bp.customer_id = cp.customer_id;
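Once the views exist, they can be queried like ordinary tables. As a minimal sketch, the ad hoc query below lists customers with at least one active flag, most-flagged first. The `total_flags` alias and the limit of 50 rows are illustrative choices, not part of the pipeline above.

```sql
-- Ad hoc inspection of the live risk signals (a one-off query, not a streaming job).
SELECT
    customer_id,
    risk_tier,
    structuring_flag + cross_border_flag
        + volume_anomaly_flag + overdue_review_flag AS total_flags,
    total_volume_90d,
    last_activity
FROM kyc_risk_signals
WHERE structuring_flag + cross_border_flag
        + volume_anomaly_flag + overdue_review_flag >= 1
ORDER BY total_flags DESC, last_activity DESC
LIMIT 50;
```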
High-Risk Country Transaction Monitoring
-- Flag transactions involving high-risk countries
CREATE MATERIALIZED VIEW high_risk_country_transactions AS
SELECT
    ct.transaction_id,
    ct.customer_id,
    ct.amount,
    ct.currency,
    ct.country_code,
    ct.transaction_time,
    hrc.risk_level,
    hrc.fatf_status,
    cp.risk_tier AS customer_risk_tier
FROM customer_transactions ct
JOIN high_risk_countries hrc
    ON ct.country_code = hrc.country_code
JOIN customer_profiles cp
    ON ct.customer_id = cp.customer_id;

-- Hourly summary of high-risk country exposure
CREATE MATERIALIZED VIEW high_risk_exposure_summary AS
SELECT
    window_start,
    window_end,
    country_code,
    COUNT(DISTINCT customer_id) AS unique_customers,
    COUNT(*) AS transaction_count,
    SUM(amount) AS total_exposure
FROM TUMBLE(high_risk_country_transactions, transaction_time, INTERVAL '1 HOUR')
GROUP BY window_start, window_end, country_code;
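The join above only produces rows for countries present in high_risk_countries, which starts out empty. The sketch below shows one way to seed it and then check the hourly summary. The rows, the `'CALL_FOR_ACTION'` status label, and the dates are placeholders for illustration; the real list should come from your institution's country risk policy and the current FATF publications.

```sql
-- Placeholder rows for illustration only (status label and dates are hypothetical).
INSERT INTO high_risk_countries (country_code, risk_level, fatf_status, last_updated)
VALUES
    ('KP', 'HIGH', 'CALL_FOR_ACTION', '2024-01-01'),
    ('IR', 'HIGH', 'CALL_FOR_ACTION', '2024-01-01');

-- Make the inserted rows visible to subsequent queries.
FLUSH;

-- Ad hoc check of the hourly exposure summary.
SELECT
    window_start,
    country_code,
    unique_customers,
    transaction_count,
    total_exposure
FROM high_risk_exposure_summary
ORDER BY window_start DESC, total_exposure DESC
LIMIT 20;
```

Because high_risk_countries is a regular table, later inserts and updates flow through the join incrementally rather than requiring the views to be recreated.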
Triggering Re-Verification Workflows
-- Sink customers requiring immediate KYC review to workflow system.
-- kyc_risk_signals is an updating view, so the append-only PLAIN format
-- is paired with force_append_only: inserts and updates are emitted as
-- new messages on the topic.
CREATE SINK kyc_review_triggers AS
SELECT
    customer_id,
    risk_tier,
    (structuring_flag + cross_border_flag + volume_anomaly_flag + overdue_review_flag) AS total_flags,
    last_activity,
    NOW() AS trigger_time
FROM kyc_risk_signals
WHERE (structuring_flag + cross_border_flag + volume_anomaly_flag + overdue_review_flag) >= 2
    OR overdue_review_flag = 1
WITH (
    connector = 'kafka',
    properties.bootstrap.server = 'kafka:9092',
    topic = 'kyc-review-queue'
) FORMAT PLAIN ENCODE JSON (force_append_only = 'true');
Comparison: Periodic vs. Real-Time KYC Monitoring
| Dimension | Periodic Review (Annual/Quarterly) | Streaming with RisingWave |
| --- | --- | --- |
| Risk detection speed | Months | Seconds |
| Coverage | Scheduled subset | All customers, all events |
| Sanctions response time | Next batch run | Real-time (seconds of list update) |
| Behavioral drift detection | Annual snapshot comparison | Continuous rolling window |
| Regulatory compliance posture | Reactive | Proactive |
| Operational cost | High (manual review cycles) | Lower (automated risk scoring) |
FAQ
Q: How does RisingWave handle sanctions list updates that need to be applied to all customers immediately?
A: Store the sanctions list in a CREATE TABLE (updated via CDC or batch upsert). A materialized view joins this table against customer_profiles—when a new sanctions entry is added, the join result updates immediately, surfacing all affected customers within seconds.
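As a rough sketch of that pattern: customer_profiles as defined above carries no name fields, so this example assumes a hypothetical `sanctions_matches` table that an upstream name-screening service populates with one row per matched customer. The table, its columns, and the view name are illustrative assumptions.

```sql
-- Hypothetical table: one row per customer matched against a sanctions
-- list entry by an upstream screening service.
CREATE TABLE sanctions_matches (
    customer_id   VARCHAR,
    list_name     VARCHAR,      -- name of the list the entry came from
    list_entry_id VARCHAR,
    listed_at     TIMESTAMPTZ,
    PRIMARY KEY (customer_id, list_entry_id)
);

-- Incrementally maintained join: a new row in sanctions_matches surfaces
-- the affected customer here without a batch re-screen.
CREATE MATERIALIZED VIEW sanctioned_customers AS
SELECT
    cp.customer_id,
    cp.risk_tier,
    cp.residence_country,
    sm.list_name,
    sm.list_entry_id,
    sm.listed_at
FROM customer_profiles cp
JOIN sanctions_matches sm
    ON cp.customer_id = sm.customer_id;
```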
Q: Can we use RisingWave to build a Suspicious Activity Report (SAR) filing queue?
A: Yes. Create a materialized view that computes SAR-eligible patterns (structuring, round-dollar transactions, rapid fund movement). Sink high-scoring records to a Kafka topic consumed by your compliance case management system. The SAR narrative generation can be handled downstream.
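A minimal sketch of such a view, covering two of the patterns mentioned. The 30-day window, the 9,000 to 10,000 "near-threshold" band, and the multiple-of-1,000 test for round amounts are hypothetical parameters chosen for illustration, not regulatory definitions.

```sql
-- Hypothetical SAR candidate patterns per customer over a rolling 30 days.
CREATE MATERIALIZED VIEW sar_candidate_patterns AS
SELECT
    customer_id,
    -- Possible structuring: cash deposits just under a 10,000 reporting threshold.
    COUNT(*) FILTER (
        WHERE transaction_type = 'CASH_DEPOSIT'
          AND amount >= 9000
          AND amount < 10000
    ) AS near_threshold_cash_deposits,
    -- Round-amount transactions: exact multiples of 1,000.
    COUNT(*) FILTER (
        WHERE amount >= 1000
          AND amount % 1000 = 0
    ) AS round_amount_tx_count,
    MAX(transaction_time) AS last_activity
FROM customer_transactions
WHERE transaction_time >= NOW() - INTERVAL '30 days'
GROUP BY customer_id;
```

A sink over this view, filtered on whichever counts your compliance team treats as high-scoring, would then feed the case management topic.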
Q: How do we maintain an audit trail of risk score changes for regulators?
A: Use a CREATE SINK to write every change to kyc_risk_signals to an append-only Kafka topic or an Iceberg table. This creates an immutable, time-stamped audit trail of every risk signal change—queryable for regulatory examination.
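One possible shape for the Kafka variant is sketched below. The sink and topic names are hypothetical. With `force_append_only`, inserts and updates to the view should each be written as a new message, while deletes are not emitted, so a customer dropping out of the view leaves no record on the topic.

```sql
-- Hypothetical audit sink: each insert or update to kyc_risk_signals
-- becomes a new message on an append-only topic.
CREATE SINK kyc_risk_signal_audit FROM kyc_risk_signals
WITH (
    connector = 'kafka',
    properties.bootstrap.server = 'kafka:9092',
    topic = 'kyc-risk-signal-audit'   -- hypothetical topic name
) FORMAT PLAIN ENCODE JSON (force_append_only = 'true');
```

Topic retention would need to be configured to match your record-keeping obligations, since the trail is only as durable as the topic behind it.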
Q: Does RisingWave support Personally Identifiable Information (PII) masking?
A: RisingWave supports column-level access control and views can be defined to project only non-PII columns for less privileged users. For field-level encryption, encrypt at the application layer before ingesting into Kafka topics that RisingWave consumes.
Q: What happens to the KYC monitoring views if RisingWave restarts?
A: RisingWave checkpoints its state to object storage (S3-compatible). On restart, it recovers from the last checkpoint and resumes processing from the corresponding Kafka offsets. Materialized view state is rebuilt from the checkpoint rather than from scratch.
Get Started
- Build your first KYC monitoring pipeline with the RisingWave documentation.
- Connect with compliance and fintech engineers in the RisingWave Slack community.

