Building a Real-Time Transaction Risk Scoring System with SQL
Risk scoring at transaction time requires two things that seem contradictory: millisecond response and awareness of long-running historical patterns. A streaming database resolves this by pre-computing risk signals as materialized views that update continuously, so the final score is assembled at query time from already-computed results — not recalculated from scratch per transaction.
Why Transaction Risk Scoring Is Hard
Every payment system needs a risk score attached to a transaction before it is approved or declined. The score has to be ready in under 100 milliseconds, often under 50. But the signals that feed the score — average spend per merchant, frequency of cross-border transactions, account age, recent dispute history — are computed from weeks or months of historical data.
The naive approach is to pre-compute everything in a nightly batch job and look it up at decision time. The problem is that fraud patterns evolve intraday. A card that was clean at midnight may have been compromised by 10am. A user who normally spends $50 per transaction may have had their account taken over, and the takeover won't be detected until the batch job runs again 14 hours later.
Batch-based risk signals are stale by definition. The question is whether staleness is acceptable. For risk scoring, it usually isn't.
The Architecture Trap: Microservices for Everything
Many fintech risk platforms end up with a patchwork of services: a feature store updated by Spark jobs, a scoring service that calls an ML model, a Redis cache for low-latency lookups, and a Kafka consumer that ties it together. Each component adds latency, operational surface area, and a failure mode.
The seams between these systems create the real risk: if your feature store is 15 minutes behind because a Spark job failed, you're scoring transactions on stale signals without knowing it.
A streaming database collapses this stack. It ingests the raw event stream, maintains materialized aggregates continuously, and serves the assembled score at query time — all in one system.
Building the Risk Scoring Pipeline with RisingWave
RisingWave is a PostgreSQL-compatible streaming database, open source under Apache 2.0, built in Rust, with state stored durably in S3. You write SQL; it handles the incremental computation.
Ingest Transaction and Reference Data
-- Live transaction stream from Kafka
CREATE SOURCE transactions (
    transaction_id VARCHAR,
    user_id VARCHAR,
    card_id VARCHAR,
    merchant_id VARCHAR,
    merchant_country VARCHAR,
    amount NUMERIC,
    currency VARCHAR,
    channel VARCHAR,          -- 'online', 'pos', 'atm'
    device_id VARCHAR,
    ip_address VARCHAR,
    event_time TIMESTAMPTZ
)
WITH (
    connector = 'kafka',
    topic = 'transactions',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
)
FORMAT PLAIN ENCODE JSON;
-- Dispute events: charge-backs and fraud reports
CREATE SOURCE dispute_events (
    dispute_id VARCHAR,
    transaction_id VARCHAR,
    user_id VARCHAR,
    dispute_type VARCHAR,     -- 'chargeback', 'fraud_report', 'friendly_fraud'
    event_time TIMESTAMPTZ
)
WITH (
    connector = 'kafka',
    topic = 'disputes',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
)
FORMAT PLAIN ENCODE JSON;
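Since both sources use `FORMAT PLAIN ENCODE JSON`, each Kafka message is a flat JSON object whose keys match the column names. A minimal Python sketch of what a producer writes to the `transactions` topic (the field values and the kafka-python call in the trailing comment are illustrative, not part of the article's pipeline):

```python
import json
from datetime import datetime, timezone

def build_transaction_event(txn):
    """Serialize a transaction dict into the JSON shape the
    'transactions' source expects; event_time defaults to now (UTC)."""
    required = {"transaction_id", "user_id", "card_id", "merchant_id",
                "merchant_country", "amount", "currency", "channel",
                "device_id", "ip_address"}
    missing = required - txn.keys()
    if missing:
        raise ValueError(f"missing fields: {sorted(missing)}")
    event = dict(txn)
    event.setdefault("event_time", datetime.now(timezone.utc).isoformat())
    return json.dumps(event)

payload = build_transaction_event({
    "transaction_id": "t-1001", "user_id": "u-42", "card_id": "c-7",
    "merchant_id": "m-9", "merchant_country": "US", "amount": 129.99,
    "currency": "USD", "channel": "online", "device_id": "d-3",
    "ip_address": "203.0.113.5",
})
# A real producer would then send it, e.g. with kafka-python:
#   KafkaProducer(bootstrap_servers="kafka:9092").send(
#       "transactions", payload.encode())
```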
Signal 1: User Behavioral Baseline (30-Day Window)
The user's normal spending pattern is the most important baseline for anomaly detection:
CREATE MATERIALIZED VIEW user_risk_baseline_30d AS
SELECT
    user_id,
    COUNT(*) AS total_txn_30d,
    AVG(amount) AS avg_amount,
    STDDEV(amount) AS stddev_amount,
    -- PERCENTILE_CONT is batch-only in RisingWave; materialized views
    -- use the streaming-capable approximate variant instead
    approx_percentile(0.95) WITHIN GROUP (ORDER BY amount) AS p95_amount,
    COUNT(DISTINCT merchant_country) AS unique_countries,
    COUNT(DISTINCT merchant_id) AS unique_merchants,
    COUNT(DISTINCT device_id) AS unique_devices,
    SUM(CASE WHEN channel = 'online' THEN 1 ELSE 0 END) AS online_txn_count,
    SUM(CASE WHEN channel = 'atm' THEN 1 ELSE 0 END) AS atm_txn_count
FROM transactions
WHERE event_time > NOW() - INTERVAL '30 days'
GROUP BY user_id;
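To make the aggregates concrete, here is the same per-user baseline computed over an in-memory list of transaction dicts; a sketch of what the view maintains, not how the engine computes it:

```python
from statistics import mean, stdev

def baseline_30d(txns):
    """Per-user 30-day baseline mirroring user_risk_baseline_30d,
    computed over a list of transaction dicts for one user."""
    amounts = [t["amount"] for t in txns]
    return {
        "total_txn_30d": len(txns),
        "avg_amount": mean(amounts),
        # sample standard deviation, matching SQL STDDEV; needs >= 2 rows
        "stddev_amount": stdev(amounts) if len(amounts) > 1 else 0.0,
        "unique_countries": len({t["merchant_country"] for t in txns}),
        "unique_merchants": len({t["merchant_id"] for t in txns}),
        "online_txn_count": sum(t["channel"] == "online" for t in txns),
    }
```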
Signal 2: Recent High-Frequency Activity (Last 1 Hour)
Short-window velocity captures account takeover and card testing patterns:
CREATE MATERIALIZED VIEW user_velocity_1h AS
SELECT
    user_id,
    COUNT(*) AS txn_count_1h,
    SUM(amount) AS total_amount_1h,
    COUNT(DISTINCT merchant_id) AS merchants_1h,
    COUNT(DISTINCT merchant_country) AS countries_1h,
    COUNT(DISTINCT device_id) AS devices_1h,
    MAX(amount) AS max_single_txn_1h
FROM transactions
WHERE event_time > NOW() - INTERVAL '1 hour'
GROUP BY user_id;
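Conceptually, a sliding 1-hour window means old events fall out of the aggregate as time advances. A toy in-memory model of that state for a single user (the engine's actual incremental state management differs; this only illustrates the semantics):

```python
from collections import deque

class VelocityWindow:
    """Rolling 1-hour velocity counters for one user, with
    timestamps as epoch seconds."""
    def __init__(self, window_seconds=3600):
        self.window = window_seconds
        self.events = deque()  # (timestamp, amount, merchant_id)

    def add(self, ts, amount, merchant_id):
        self.events.append((ts, amount, merchant_id))
        # evict everything older than the window
        while self.events and self.events[0][0] <= ts - self.window:
            self.events.popleft()

    def snapshot(self):
        return {
            "txn_count_1h": len(self.events),
            "total_amount_1h": sum(e[1] for e in self.events),
            "merchants_1h": len({e[2] for e in self.events}),
        }
```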
Signal 3: User Dispute History (90-Day Window)
Prior disputes are a strong predictor of future disputes:
CREATE MATERIALIZED VIEW user_dispute_history_90d AS
SELECT
    user_id,
    COUNT(*) AS total_disputes,
    SUM(CASE WHEN dispute_type = 'chargeback' THEN 1 ELSE 0 END) AS chargebacks,
    SUM(CASE WHEN dispute_type = 'fraud_report' THEN 1 ELSE 0 END) AS fraud_reports,
    MAX(event_time) AS last_dispute_time
FROM dispute_events
WHERE event_time > NOW() - INTERVAL '90 days'
GROUP BY user_id;
Signal 4: Merchant Risk Profile
Aggregate the risk profile of each merchant based on transaction history:
CREATE MATERIALIZED VIEW merchant_risk_profile AS
SELECT
    merchant_id,
    COUNT(*) AS total_txn_count,
    AVG(amount) AS avg_txn_amount,
    COUNT(DISTINCT user_id) AS unique_users,
    -- dispute rate requires joining with disputes, simplified here
    COUNT(DISTINCT merchant_country) AS countries_active
FROM transactions
WHERE event_time > NOW() - INTERVAL '30 days'
GROUP BY merchant_id;
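The comment in the view flags that the merchant dispute rate needs a join between disputes and transactions. What that join computes, sketched in Python over in-memory lists (field names follow the source schemas above):

```python
def merchant_dispute_rate(transactions, disputes):
    """Per-merchant dispute rate: join disputes back to transactions
    on transaction_id, then divide disputed count by total count."""
    disputed_txn_ids = {d["transaction_id"] for d in disputes}
    totals, disputed = {}, {}
    for t in transactions:
        m = t["merchant_id"]
        totals[m] = totals.get(m, 0) + 1
        if t["transaction_id"] in disputed_txn_ids:
            disputed[m] = disputed.get(m, 0) + 1
    return {m: disputed.get(m, 0) / n for m, n in totals.items()}
```

In SQL this would be a LEFT JOIN from transactions to disputes inside the merchant view, grouped by merchant_id.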
Assembling the Risk Score
Now compose all signals into a scored view. Each signal contributes points to a risk score — this is a rule-based scoring model, but the same pattern works if you're calling an external ML model via UDF:
CREATE MATERIALIZED VIEW transaction_risk_scores AS
SELECT
    *,
    -- Risk tier derived from the composite score
    CASE
        WHEN risk_score >= 70 THEN 'HIGH'
        WHEN risk_score >= 40 THEN 'MEDIUM'
        ELSE 'LOW'
    END AS risk_tier
FROM (
    SELECT
        t.transaction_id,
        t.user_id,
        t.card_id,
        t.merchant_id,
        t.amount,
        t.channel,
        t.event_time,
        -- Raw signals
        COALESCE(v.txn_count_1h, 0) AS velocity_1h,
        COALESCE(v.countries_1h, 0) AS countries_1h,
        COALESCE(v.devices_1h, 1) AS devices_1h,
        COALESCE(b.avg_amount, 0) AS user_avg_amount,
        COALESCE(d.total_disputes, 0) AS dispute_count_90d,
        -- Amount deviation (z-score capped at 10)
        LEAST(
            CASE WHEN COALESCE(b.stddev_amount, 0) > 0
                 THEN (t.amount - b.avg_amount) / b.stddev_amount
                 ELSE 0 END,
            10
        ) AS amount_z_score,
        -- Composite risk score (0-100)
        LEAST(100, GREATEST(0,
            -- High velocity: up to 30 points
            CASE WHEN COALESCE(v.txn_count_1h, 0) > 20 THEN 30
                 WHEN COALESCE(v.txn_count_1h, 0) > 10 THEN 20
                 WHEN COALESCE(v.txn_count_1h, 0) > 5 THEN 10
                 ELSE 0 END
            -- Multi-country in 1h: up to 25 points
            + CASE WHEN COALESCE(v.countries_1h, 0) > 3 THEN 25
                   WHEN COALESCE(v.countries_1h, 0) > 1 THEN 15
                   ELSE 0 END
            -- Amount anomaly: up to 20 points
            + CASE WHEN t.amount > COALESCE(b.avg_amount, 0) * 5 THEN 20
                   WHEN t.amount > COALESCE(b.avg_amount, 0) * 3 THEN 10
                   ELSE 0 END
            -- Prior disputes: up to 25 points
            + CASE WHEN COALESCE(d.total_disputes, 0) > 3 THEN 25
                   WHEN COALESCE(d.total_disputes, 0) > 1 THEN 15
                   WHEN COALESCE(d.total_disputes, 0) = 1 THEN 5
                   ELSE 0 END
            -- New device in online channel: 10 points
            + CASE WHEN t.channel = 'online'
                        AND COALESCE(v.devices_1h, 1) > 1 THEN 10
                   ELSE 0 END
        )) AS risk_score
    FROM transactions t
    LEFT JOIN user_velocity_1h v ON t.user_id = v.user_id
    LEFT JOIN user_risk_baseline_30d b ON t.user_id = b.user_id
    LEFT JOIN user_dispute_history_90d d ON t.user_id = d.user_id
) scored;
Deriving the tier in an outer SELECT lets it reference risk_score directly. Repeating the full scoring expression inside each tier branch would invite drift between the score definition and the tier thresholds.
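The same point thresholds can be mirrored outside SQL for unit testing and offline tuning. A sketch (this is a reimplementation of the rules above, not the production path; signal dicts default to the same COALESCE fallbacks as the SQL):

```python
def risk_score(txn, velocity=None, baseline=None, disputes=None):
    """Composite 0-100 score with the same buckets as the SQL view."""
    v, b, d = velocity or {}, baseline or {}, disputes or {}
    score = 0
    txn_1h = v.get("txn_count_1h", 0)
    score += 30 if txn_1h > 20 else 20 if txn_1h > 10 else 10 if txn_1h > 5 else 0
    countries = v.get("countries_1h", 0)
    score += 25 if countries > 3 else 15 if countries > 1 else 0
    # Note: with no baseline, avg defaults to 0, so any positive amount
    # exceeds avg * 5 -- the same behavior as COALESCE(b.avg_amount, 0)
    avg = b.get("avg_amount", 0)
    score += 20 if txn["amount"] > avg * 5 else 10 if txn["amount"] > avg * 3 else 0
    disputes_90d = d.get("total_disputes", 0)
    score += (25 if disputes_90d > 3 else 15 if disputes_90d > 1
              else 5 if disputes_90d == 1 else 0)
    if txn["channel"] == "online" and v.get("devices_1h", 1) > 1:
        score += 10
    score = max(0, min(100, score))
    tier = "HIGH" if score >= 70 else "MEDIUM" if score >= 40 else "LOW"
    return score, tier
```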
Serving the Score to Decision Systems
Applications query the score synchronously at decision time:
-- Called by the payment authorization service
SELECT risk_score, risk_tier, velocity_1h, amount_z_score, dispute_count_90d
FROM transaction_risk_scores
WHERE transaction_id = $1;
Because the underlying signal views (user_velocity_1h, user_risk_baseline_30d, etc.) are already computed and maintained continuously, this query returns in single-digit milliseconds.
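On the consuming side, the lookup is an ordinary PostgreSQL-wire query, and the service then maps the result to a decision. The psycopg snippet in the comment and the policy thresholds below are illustrative assumptions, not prescribed by the pipeline:

```python
# The authorization service fetches the score over the Postgres wire
# protocol, e.g. with psycopg:
#   cur.execute("SELECT risk_score, risk_tier FROM transaction_risk_scores "
#               "WHERE transaction_id = %s", (txn_id,))
# and applies a policy to the result. A sample policy:

def authorize(risk_score: int, risk_tier: str, amount: float) -> str:
    """Map the served score to a decision: decline HIGH, step-up
    MEDIUM above a size cutoff, approve the rest."""
    if risk_tier == "HIGH":
        return "DECLINE"
    if risk_tier == "MEDIUM":
        # step-up challenge (e.g. 3DS) for larger amounts
        return "CHALLENGE" if amount > 100 else "APPROVE"
    return "APPROVE"
```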
High-risk transactions can also be pushed to a downstream review queue automatically. Because the sink is filtered, it is defined with AS SELECT rather than the plain FROM form:
CREATE SINK high_risk_review_queue AS
SELECT * FROM transaction_risk_scores
WHERE risk_tier = 'HIGH'
WITH (
    connector = 'kafka',
    topic = 'high-risk-review',
    properties.bootstrap.server = 'kafka:9092'
)
FORMAT PLAIN ENCODE JSON;
Why Pre-Computed Signals Beat Point-in-Time Calculation
| Approach | Latency at Decision Time | Signal Freshness | Operational Complexity |
| --- | --- | --- | --- |
| Batch feature store (Spark) | Low (cache hit) | Hours stale | High (Spark cluster + scheduler) |
| Real-time per-request calculation | High (DB queries under load) | Fresh | Medium |
| Streaming materialized views | Low (pre-computed) | Seconds or less | Low (single system) |
| ML feature store + streaming | Low | Near real-time | Very high (multiple systems) |
The streaming materialized view approach delivers fresh signals with low query-time latency because the computation happens continuously in the background, not at decision time.
Keeping Score Weights Up to Date
The rule weights embedded in the SQL above are a starting point. In practice, you tune them based on model performance metrics — precision, recall, false positive rates on reviewed transactions.
Rather than hardcoding weights in SQL, you can externalize them to a configuration table:
CREATE TABLE risk_weight_config (
    signal_name VARCHAR PRIMARY KEY,
    weight NUMERIC,
    threshold_low NUMERIC,
    threshold_mid NUMERIC,
    threshold_high NUMERIC,
    updated_at TIMESTAMPTZ DEFAULT NOW()
);
Your materialized view then joins against this table, and updating weights is an UPDATE statement — no view rebuild required.
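For offline tuning, the same config rows can drive scoring outside the database too. A sketch in which the WEIGHTS dict stands in for the rows a `SELECT * FROM risk_weight_config` would return (the values and the scaling rule are illustrative assumptions):

```python
# Rows as they might come back from risk_weight_config; these values
# reproduce the article's 30/20/10 velocity buckets.
WEIGHTS = {
    "velocity_1h":  {"weight": 30, "threshold_low": 5,
                     "threshold_mid": 10, "threshold_high": 20},
    "countries_1h": {"weight": 25, "threshold_low": 1,
                     "threshold_mid": 3, "threshold_high": 3},
}

def tiered_points(signal_value, cfg):
    """Convert one raw signal into points: full weight above
    threshold_high, two-thirds above threshold_mid, one-third
    above threshold_low, else zero."""
    if signal_value > cfg["threshold_high"]:
        return cfg["weight"]
    if signal_value > cfg["threshold_mid"]:
        return round(cfg["weight"] * 2 / 3)
    if signal_value > cfg["threshold_low"]:
        return round(cfg["weight"] / 3)
    return 0
```

Retuning then means updating rows in risk_weight_config, and both the SQL join and any offline harness pick up the new weights.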
FAQ
How fresh are the risk signals in this system? Signals are updated with sub-second latency after each new transaction or dispute event arrives. For the 1-hour and 30-day windows, the rolling aggregates update incrementally — only the delta from the new event is computed, not a full re-scan of the window.
Can I plug an ML model into this pipeline? Yes. RisingWave supports Python UDFs, so you can call a model inference function from within a SQL expression. Alternatively, run ML inference in a separate service and write scores back to RisingWave as a reference table that the scoring view joins against.
What happens if RisingWave restarts mid-stream? RisingWave continuously checkpoints its streaming state to S3. On restart, processing resumes from the last checkpoint with no data loss, comparable to Flink's checkpoint-based recovery.
How do I backtest a new scoring model against historical transactions? RisingWave supports querying historical data from its S3-backed storage. You can run the scoring SQL against a backfilled dataset to compute precision/recall before deploying to the live stream.
Can this scale to handle peak card transaction volumes (e.g., Black Friday)? RisingWave's compute and storage scale independently. During peak periods, you can scale out compute nodes while storage (S3) scales automatically. The architecture is designed for elasticity without manual repartitioning.

