Building a Real-Time Transaction Risk Scoring System with SQL

Risk scoring at transaction time requires two things that seem contradictory: millisecond response and awareness of long-running historical patterns. A streaming database resolves this by pre-computing risk signals as materialized views that update continuously, so the final score is assembled at query time from already-computed results — not recalculated from scratch per transaction.


Why Transaction Risk Scoring Is Hard

Every payment system needs a risk score attached to a transaction before it is approved or declined. The score has to be ready in under 100 milliseconds, often under 50. But the signals that feed the score — average spend per merchant, frequency of cross-border transactions, account age, recent dispute history — are computed from weeks or months of historical data.

The naive approach is to pre-compute everything in a nightly batch job and look it up at decision time. The problem is that fraud patterns evolve intraday. A card that was clean at midnight may have been compromised by 10am. A user who normally spends $50 per transaction may have had their account takeover detected in a batch job that won't run for another 14 hours.

Batch-based risk signals are stale by definition. The question is whether staleness is acceptable. For risk scoring, it usually isn't.


The Architecture Trap: Microservices for Everything

Many fintech risk platforms end up with a patchwork of services: a feature store updated by Spark jobs, a scoring service that calls an ML model, a Redis cache for low-latency lookups, and a Kafka consumer that ties it together. Each component adds latency, operational surface area, and a failure mode.

The seams between these systems create the real risk: if your feature store is 15 minutes behind because a Spark job failed, you're scoring transactions on stale signals without knowing it.

A streaming database collapses this stack. It ingests the raw event stream, maintains materialized aggregates continuously, and serves the assembled score at query time — all in one system.


Building the Risk Scoring Pipeline with RisingWave

RisingWave is a PostgreSQL-compatible streaming database, open source under Apache 2.0, built in Rust, with state stored durably in S3. You write SQL; it handles the incremental computation.

Ingest Transaction and Reference Data

-- Live transaction stream from Kafka
CREATE SOURCE transactions (
    transaction_id   VARCHAR,
    user_id          VARCHAR,
    card_id          VARCHAR,
    merchant_id      VARCHAR,
    merchant_country VARCHAR,
    amount           NUMERIC,
    currency         VARCHAR,
    channel          VARCHAR,   -- 'online', 'pos', 'atm'
    device_id        VARCHAR,
    ip_address       VARCHAR,
    event_time       TIMESTAMPTZ
)
WITH (
    connector = 'kafka',
    topic = 'transactions',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
)
FORMAT PLAIN ENCODE JSON;

-- Dispute events: charge-backs and fraud reports
CREATE SOURCE dispute_events (
    dispute_id       VARCHAR,
    transaction_id   VARCHAR,
    user_id          VARCHAR,
    dispute_type     VARCHAR,   -- 'chargeback', 'fraud_report', 'friendly_fraud'
    event_time       TIMESTAMPTZ
)
WITH (
    connector = 'kafka',
    topic = 'disputes',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
)
FORMAT PLAIN ENCODE JSON;

Signal 1: User Behavioral Baseline (30-Day Window)

The user's normal spending pattern is the most important baseline for anomaly detection:

CREATE MATERIALIZED VIEW user_risk_baseline_30d AS
SELECT
    user_id,
    COUNT(*)                              AS total_txn_30d,
    AVG(amount)                           AS avg_amount,
    STDDEV(amount)                        AS stddev_amount,
    -- exact PERCENTILE_CONT is batch-only; streaming views
    -- use the approximate variant instead
    approx_percentile(0.95) WITHIN GROUP
        (ORDER BY amount)                 AS p95_amount,
    COUNT(DISTINCT merchant_country)      AS unique_countries,
    COUNT(DISTINCT merchant_id)           AS unique_merchants,
    COUNT(DISTINCT device_id)             AS unique_devices,
    SUM(CASE WHEN channel = 'online'
             THEN 1 ELSE 0 END)           AS online_txn_count,
    SUM(CASE WHEN channel = 'atm'
             THEN 1 ELSE 0 END)           AS atm_txn_count
FROM transactions
WHERE event_time > NOW() - INTERVAL '30 days'
GROUP BY user_id;

Signal 2: Recent High-Frequency Activity (Last 1 Hour)

Short-window velocity captures account takeover and card testing patterns:

CREATE MATERIALIZED VIEW user_velocity_1h AS
SELECT
    user_id,
    COUNT(*)                          AS txn_count_1h,
    SUM(amount)                       AS total_amount_1h,
    COUNT(DISTINCT merchant_id)       AS merchants_1h,
    COUNT(DISTINCT merchant_country)  AS countries_1h,
    COUNT(DISTINCT device_id)         AS devices_1h,
    MAX(amount)                       AS max_single_txn_1h
FROM transactions
WHERE event_time > NOW() - INTERVAL '1 hour'
GROUP BY user_id;
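Because the view is queryable like any Postgres table, you can also inspect it ad hoc. The thresholds below are illustrative, chosen to mirror the velocity bands used later in the composite score:

```sql
-- Ad-hoc check: which users are tripping velocity heuristics right now?
SELECT user_id, txn_count_1h, countries_1h, devices_1h
FROM user_velocity_1h
WHERE txn_count_1h > 10
   OR countries_1h > 1
ORDER BY txn_count_1h DESC
LIMIT 20;
```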

Signal 3: User Dispute History (90-Day Window)

Prior disputes are a strong predictor of future disputes:

CREATE MATERIALIZED VIEW user_dispute_history_90d AS
SELECT
    user_id,
    COUNT(*)                                              AS total_disputes,
    SUM(CASE WHEN dispute_type = 'chargeback'
             THEN 1 ELSE 0 END)                           AS chargebacks,
    SUM(CASE WHEN dispute_type = 'fraud_report'
             THEN 1 ELSE 0 END)                           AS fraud_reports,
    MAX(event_time)                                       AS last_dispute_time
FROM dispute_events
WHERE event_time > NOW() - INTERVAL '90 days'
GROUP BY user_id;

Signal 4: Merchant Risk Profile

Aggregate the risk profile of each merchant based on transaction history:

CREATE MATERIALIZED VIEW merchant_risk_profile AS
SELECT
    merchant_id,
    COUNT(*)                            AS total_txn_count,
    AVG(amount)                         AS avg_txn_amount,
    COUNT(DISTINCT user_id)             AS unique_users,
    -- dispute rate requires joining with disputes, simplified here
    COUNT(DISTINCT merchant_country)    AS countries_active
FROM transactions
WHERE event_time > NOW() - INTERVAL '30 days'
GROUP BY merchant_id;
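The dispute rate that the comment alludes to can be computed with a streaming join back to the dispute stream. A sketch, assuming each dispute carries the transaction_id it refers to (as the dispute_events source defined above does); the view name is illustrative:

```sql
-- Sketch: 30-day dispute rate per merchant. COUNT(d.dispute_id) counts
-- only matched rows, so the LEFT JOIN preserves merchants with zero
-- disputes. Note that a join between two unbounded streams keeps join
-- state, so the temporal filter also bounds state growth.
CREATE MATERIALIZED VIEW merchant_dispute_rate_30d AS
SELECT
    t.merchant_id,
    COUNT(*)                                 AS txn_count,
    COUNT(d.dispute_id)                      AS dispute_count,
    COUNT(d.dispute_id)::NUMERIC / COUNT(*)  AS dispute_rate
FROM transactions t
LEFT JOIN dispute_events d
    ON t.transaction_id = d.transaction_id
WHERE t.event_time > NOW() - INTERVAL '30 days'
GROUP BY t.merchant_id;
```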

Assembling the Risk Score

Now compose all signals into a scored view. Each signal contributes points to a risk score — this is a rule-based scoring model, but the same pattern works if you're calling an external ML model via UDF. Computing the score once in a common table expression, then deriving the tier from that single value, keeps the score and the tier from drifting out of sync:

CREATE MATERIALIZED VIEW transaction_risk_scores AS
WITH scored AS (
    SELECT
        t.transaction_id,
        t.user_id,
        t.card_id,
        t.merchant_id,
        t.amount,
        t.channel,
        t.event_time,

        -- Raw signals
        COALESCE(v.txn_count_1h, 0)          AS velocity_1h,
        COALESCE(v.countries_1h, 0)          AS countries_1h,
        COALESCE(v.devices_1h, 1)            AS devices_1h,
        COALESCE(b.avg_amount, 0)            AS user_avg_amount,
        COALESCE(d.total_disputes, 0)        AS dispute_count_90d,

        -- Amount deviation (z-score capped at 10)
        LEAST(
            CASE WHEN COALESCE(b.stddev_amount, 0) > 0
                 THEN (t.amount - b.avg_amount) / b.stddev_amount
                 ELSE 0 END,
            10
        ) AS amount_z_score,

        -- Composite risk score (0-100)
        LEAST(100, GREATEST(0,
              -- High velocity: up to 30 points
              CASE WHEN COALESCE(v.txn_count_1h, 0) > 20 THEN 30
                   WHEN COALESCE(v.txn_count_1h, 0) > 10 THEN 20
                   WHEN COALESCE(v.txn_count_1h, 0) > 5  THEN 10
                   ELSE 0 END
              -- Multi-country in 1h: up to 25 points
            + CASE WHEN COALESCE(v.countries_1h, 0) > 3 THEN 25
                   WHEN COALESCE(v.countries_1h, 0) > 1 THEN 15
                   ELSE 0 END
              -- Amount anomaly: up to 20 points
            + CASE WHEN t.amount > COALESCE(b.avg_amount, 0) * 5 THEN 20
                   WHEN t.amount > COALESCE(b.avg_amount, 0) * 3 THEN 10
                   ELSE 0 END
              -- Prior disputes: up to 25 points
            + CASE WHEN COALESCE(d.total_disputes, 0) > 3 THEN 25
                   WHEN COALESCE(d.total_disputes, 0) > 1 THEN 15
                   WHEN COALESCE(d.total_disputes, 0) = 1 THEN 5
                   ELSE 0 END
              -- Multiple devices in the online channel: 10 points
            + CASE WHEN t.channel = 'online'
                   AND COALESCE(v.devices_1h, 1) > 1 THEN 10
                   ELSE 0 END
        )) AS risk_score

    FROM transactions t
    LEFT JOIN user_velocity_1h          v ON t.user_id = v.user_id
    LEFT JOIN user_risk_baseline_30d    b ON t.user_id = b.user_id
    LEFT JOIN user_dispute_history_90d  d ON t.user_id = d.user_id
)
SELECT
    *,
    -- Tier thresholds derived from the single risk_score expression above
    CASE
        WHEN risk_score >= 70 THEN 'HIGH'
        WHEN risk_score >= 40 THEN 'MEDIUM'
        ELSE 'LOW'
    END AS risk_tier
FROM scored;

Serving the Score to Decision Systems

Applications query the score synchronously at decision time:

-- Called by the payment authorization service
SELECT risk_score, risk_tier, velocity_1h, amount_z_score, dispute_count_90d
FROM transaction_risk_scores
WHERE transaction_id = $1;

Because the underlying signal views (user_velocity_1h, user_risk_baseline_30d, etc.) are already computed and maintained continuously, this query returns in single-digit milliseconds.
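To keep that lookup an index scan rather than a scan of the whole view, add an index on the lookup key. RisingWave supports CREATE INDEX on materialized views; the index name below is arbitrary:

```sql
-- Serve the authorization service's point lookup from an index
CREATE INDEX idx_risk_scores_txn_id
    ON transaction_risk_scores (transaction_id);
```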

High-risk transactions can also be routed downstream automatically through a sink:

CREATE SINK high_risk_review_queue AS
SELECT * FROM transaction_risk_scores
WHERE risk_tier = 'HIGH'
WITH (
    connector = 'kafka',
    topic = 'high-risk-review',
    properties.bootstrap.server = 'kafka:9092'
)
FORMAT PLAIN ENCODE JSON;

Why Pre-Computed Signals Beat Point-in-Time Calculation

| Approach | Latency at Decision Time | Signal Freshness | Operational Complexity |
|---|---|---|---|
| Batch feature store (Spark) | Low (cache hit) | Hours stale | High (Spark cluster + scheduler) |
| Real-time per-request calculation | High (DB queries under load) | Fresh | Medium |
| Streaming materialized views | Low (pre-computed) | Seconds or less | Low (single system) |
| ML feature store + streaming | Low | Near-real-time | Very high (multiple systems) |

The streaming materialized view approach delivers fresh signals with low query-time latency because the computation happens continuously in the background, not at decision time.


Keeping Score Weights Up to Date

The rule weights embedded in the SQL above are a starting point. In practice, you tune them based on model performance metrics — precision, recall, false positive rates on reviewed transactions.

Rather than hardcoding weights in SQL, you can externalize them to a configuration table:

CREATE TABLE risk_weight_config (
    signal_name  VARCHAR PRIMARY KEY,
    weight       NUMERIC,
    threshold_low  NUMERIC,
    threshold_mid  NUMERIC,
    threshold_high NUMERIC,
    updated_at   TIMESTAMPTZ DEFAULT NOW()
);

Your materialized view then joins against this table, and updating weights is an UPDATE statement — no view rebuild required.
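A sketch of what that join can look like for a single signal. The view name, seed values, and the mapping from thresholds to points are illustrative assumptions; only user_velocity_1h and risk_weight_config come from the document:

```sql
-- Seed one config row for the velocity signal (values are illustrative)
INSERT INTO risk_weight_config
    (signal_name, weight, threshold_low, threshold_mid, threshold_high)
VALUES
    ('velocity_1h', 30, 5, 10, 20);

-- Points for the velocity signal, driven by the config table instead of
-- hardcoded constants. Updating the row re-scores without a view rebuild.
CREATE MATERIALIZED VIEW velocity_points AS
SELECT
    v.user_id,
    CASE WHEN v.txn_count_1h > c.threshold_high THEN c.weight
         WHEN v.txn_count_1h > c.threshold_mid  THEN c.weight * 2 / 3
         WHEN v.txn_count_1h > c.threshold_low  THEN c.weight / 3
         ELSE 0 END AS points
FROM user_velocity_1h v
JOIN risk_weight_config c ON c.signal_name = 'velocity_1h';
```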


FAQ

How fresh are the risk signals in this system? Signals are updated with sub-second latency after each new transaction or dispute event arrives. For the 1-hour and 30-day windows, the rolling aggregates update incrementally — only the delta from the new event is computed, not a full re-scan of the window.

Can I plug an ML model into this pipeline? Yes. RisingWave supports Python UDFs, so you can call a model inference function from within a SQL expression. Alternatively, run ML inference in a separate service and write scores back to RisingWave as a reference table that the scoring view joins against.
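As a rough sketch of the UDF route: the exact registration syntax varies across RisingWave versions, and the endpoint, function name, and signature below are assumptions, following the external UDF-server form where a Python process built with RisingWave's UDF SDK serves the function over the network:

```sql
-- Hypothetical: register a model-inference function served by an
-- external Python UDF server at an assumed address.
CREATE FUNCTION ml_risk_score(amount NUMERIC, velocity_1h INT, dispute_count_90d INT)
RETURNS NUMERIC
AS ml_risk_score USING LINK 'http://localhost:8815';

-- Blend the model output with the rule-based score at query time
SELECT transaction_id,
       risk_score,
       ml_risk_score(amount, velocity_1h, dispute_count_90d) AS model_score
FROM transaction_risk_scores;
```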

What happens if RisingWave restarts mid-stream? RisingWave checkpoints its streaming state to S3 continuously. On restart, processing resumes from the last checkpoint with no data loss, comparable to Flink's checkpoint-based recovery.

How do I backtest a new scoring model against historical transactions? RisingWave supports querying historical data from its S3-backed storage. You can run the scoring SQL against a backfilled dataset to compute precision/recall before deploying to the live stream.

Can this scale to handle peak card transaction volumes (e.g., Black Friday)? RisingWave's compute and storage scale independently. During peak periods, you can scale out compute nodes while storage (S3) scales automatically. The architecture is designed for elasticity without manual repartitioning.
