Digital Wallet Transaction Monitoring with RisingWave

Digital wallet transaction monitoring with RisingWave means analyzing every top-up, transfer, payment, and withdrawal as it occurs — detecting fraud, enforcing balance limits, and maintaining compliance — across millions of active wallets with millisecond latency and standard SQL.

Digital Wallets: Scale Meets Compliance

Digital wallet platforms — whether consumer payment apps, embedded finance products, or crypto-fiat bridges — face a unique operational challenge: the scale of a high-frequency transaction system combined with the compliance obligations of a regulated financial institution.

A mid-size digital wallet platform may process 5–20 million transactions per day across millions of active users. Each transaction requires:

  • Real-time balance verification (is there enough to cover this payment?)
  • Fraud detection (does this look like account takeover or payment fraud?)
  • Velocity limit enforcement (has this user hit their daily/weekly transfer cap?)
  • AML screening (does this transaction pattern look like money laundering?)
  • Operational monitoring (is payment success rate normal right now?)

Doing all of this with batch processing means accepting multi-minute windows where fraud and limit violations go unchecked. Streaming SQL shrinks those windows to seconds by updating each check incrementally as events arrive.

Wallet Event Architecture

Mobile/Web App → API Gateway → Event Bus (Kafka)
                                    ↓
                              RisingWave
                         ┌───────────────────┐
                         │ Materialized Views│
                         │ - Balance state   │
                         │ - Velocity counts │
                         │ - Fraud signals   │
                         │ - AML patterns    │
                         └───────────────────┘
                                    ↓
              Dashboards | Alerts | Compliance | ML

Ingesting Wallet Events

-- Core wallet transaction stream
CREATE SOURCE wallet_transactions (
    event_id         VARCHAR,
    wallet_id        VARCHAR,
    user_id          VARCHAR,
    txn_type         VARCHAR,  -- 'topup', 'payment', 'transfer_out', 'transfer_in', 'withdrawal'
    amount           NUMERIC,
    currency         VARCHAR,
    counterparty_id  VARCHAR,
    counterparty_type VARCHAR, -- 'merchant', 'wallet', 'bank', 'crypto'
    device_id        VARCHAR,
    ip_address       VARCHAR,
    geo_country      VARCHAR,
    txn_status       VARCHAR,  -- 'completed', 'pending', 'failed', 'reversed'
    txn_time         TIMESTAMPTZ
)
WITH (
    connector = 'kafka',
    topic = 'wallet.transactions',
    properties.bootstrap.server = 'broker:9092',
    scan.startup.mode = 'latest'
)
FORMAT PLAIN ENCODE JSON;

-- User profile and tier limits reference
CREATE TABLE wallet_user_limits (
    user_id            VARCHAR PRIMARY KEY,
    tier               VARCHAR,  -- 'basic', 'verified', 'premium'
    daily_send_limit   NUMERIC,
    weekly_send_limit  NUMERIC,
    max_balance        NUMERIC,
    kyc_level          INTEGER,
    account_status     VARCHAR
);
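
The architecture lists balance state among the materialized views, but no such view is defined here. A minimal sketch follows, under assumptions that are not part of the original pipeline: the view name wallet_balance_state is hypothetical, top-ups and inbound transfers are treated as credits, the other three transaction types as debits, and only completed transactions are counted.

```sql
-- Sketch only: net completed movement per wallet and currency.
-- The sign rules below are assumptions about how txn_type maps to credits and debits.
CREATE MATERIALIZED VIEW wallet_balance_state AS
SELECT
    wallet_id,
    currency,
    SUM(
        CASE
            WHEN txn_type IN ('topup', 'transfer_in')                  THEN amount
            WHEN txn_type IN ('payment', 'transfer_out', 'withdrawal') THEN -amount
            ELSE 0
        END
    )                AS net_balance_change,
    MAX(txn_time)    AS last_txn_time
FROM wallet_transactions
WHERE txn_status = 'completed'
GROUP BY
    wallet_id,
    currency;
```

Because the source starts reading at 'latest', this value is the net movement since the source was created, not an authoritative balance. A real deployment would likely need an opening balance from the ledger and a rule for how reversed transactions are represented.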

Real-Time Velocity Limit Tracking

Continuously track each user's send volume against their tier limits:

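CREATE MATERIALIZED VIEW wallet_velocity_tracking AS
SELECT
    t.user_id,
    -- Completed outbound volume in the final 24 hours of each 7-day window
    SUM(t.amount) FILTER (
        WHERE t.txn_type IN ('payment', 'transfer_out', 'withdrawal')
        AND t.txn_status = 'completed'
        AND t.txn_time >= t.window_end - INTERVAL '24 HOURS'
    )                                                  AS sent_last_24h,
    -- Completed outbound volume across the full 7-day window
    SUM(t.amount) FILTER (
        WHERE t.txn_type IN ('payment', 'transfer_out', 'withdrawal')
        AND t.txn_status = 'completed'
    )                                                  AS sent_last_7d,
    COUNT(*) FILTER (
        WHERE t.txn_type IN ('payment', 'transfer_out', 'withdrawal')
        AND t.txn_time >= t.window_end - INTERVAL '24 HOURS'
    )                                                  AS send_txn_count_24h,
    -- Sub-windows are anchored to window_end, because NOW() cannot be used
    -- inside an aggregate FILTER in a streaming query
    COUNT(DISTINCT t.counterparty_id) FILTER (
        WHERE t.txn_time >= t.window_end - INTERVAL '1 HOUR'
    )                                                  AS distinct_recipients_1h,
    COUNT(DISTINCT t.geo_country) FILTER (
        WHERE t.txn_time >= t.window_end - INTERVAL '6 HOURS'
    )                                                  AS distinct_countries_6h,
    l.daily_send_limit,
    l.weekly_send_limit,
    l.tier,
    t.window_start,
    t.window_end
FROM HOP(
    wallet_transactions,
    txn_time,
    INTERVAL '1 HOUR',   -- slide
    INTERVAL '7 DAYS'    -- window size, wide enough for the weekly metric
) t
-- Process-time temporal join: each event picks up the limits in effect when it is processed
JOIN wallet_user_limits FOR SYSTEM_TIME AS OF PROCTIME() AS l
    ON t.user_id = l.user_id
GROUP BY
    t.user_id,
    l.daily_send_limit,
    l.weekly_send_limit,
    l.tier,
    t.window_start,
    t.window_end;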

Fraud Signal Detection

Detect account takeover and payment fraud patterns using tumbling windows:

CREATE MATERIALIZED VIEW wallet_fraud_signals AS
SELECT
    user_id,
    COUNT(*)                                           AS total_txns,
    COUNT(DISTINCT device_id)                          AS distinct_devices,
    COUNT(DISTINCT ip_address)                         AS distinct_ips,
    COUNT(DISTINCT geo_country)                        AS distinct_countries,
    COUNT(*) FILTER (WHERE txn_status = 'failed')      AS failed_txns,
    SUM(amount) FILTER (
        WHERE txn_type = 'transfer_out'
        AND txn_status = 'completed'
    )                                                  AS outbound_volume,
    COUNT(*) FILTER (
        WHERE txn_type = 'transfer_out'
        AND counterparty_type = 'crypto'
    )                                                  AS crypto_transfers,
    CASE
        WHEN COUNT(DISTINCT device_id) > 3
             AND COUNT(DISTINCT geo_country) > 1       THEN 'ACCOUNT_TAKEOVER'
        WHEN COUNT(*) FILTER (WHERE txn_status = 'failed') > 5
                                                       THEN 'CREDENTIAL_STUFFING'
        WHEN COUNT(*) FILTER (
            WHERE txn_type = 'transfer_out'
            AND counterparty_type = 'crypto'
        ) > 2                                          THEN 'FRAUD_CASHOUT'
        ELSE NULL
    END                                                AS fraud_signal,
    window_start,
    window_end
FROM TUMBLE(
    wallet_transactions,
    txn_time,
    INTERVAL '1 HOUR'
)
GROUP BY
    user_id,
    window_start,
    window_end
HAVING
    -- Same conditions as the CASE above, so every emitted row carries a signal
    (COUNT(DISTINCT device_id) > 3 AND COUNT(DISTINCT geo_country) > 1)
    OR COUNT(*) FILTER (WHERE txn_status = 'failed') > 5
    OR COUNT(*) FILTER (
        WHERE txn_type = 'transfer_out'
        AND counterparty_type = 'crypto'
    ) > 2;

Sending Alerts to Compliance and Operations

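CREATE SINK wallet_compliance_alerts_sink AS
SELECT
    user_id,
    'VELOCITY_LIMIT' AS alert_type,
    sent_last_24h    AS metric_value,
    daily_send_limit AS threshold,
    window_end       AS alert_time
FROM wallet_velocity_tracking
WHERE sent_last_24h > daily_send_limit * 0.9   -- alert at 90% of the daily cap
UNION ALL
SELECT
    user_id,
    fraud_signal     AS alert_type,
    outbound_volume  AS metric_value,
    0::NUMERIC       AS threshold,             -- fraud signals have no numeric limit
    window_end       AS alert_time
FROM wallet_fraud_signals
WHERE fraud_signal IS NOT NULL
WITH (
    connector = 'kafka',
    topic = 'wallet.compliance.alerts',
    properties.bootstrap.server = 'broker:9092'
)
-- Both views emit updates as their windows fill. force_append_only writes each
-- change as a new message, so consumers should deduplicate on user_id, alert_type, alert_time.
FORMAT PLAIN ENCODE JSON (force_append_only = 'true');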
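
On the operations side, the payment success rate raised earlier as a monitoring question can be tracked with the same windowing pattern. The following is a sketch rather than part of the original pipeline: the view name wallet_payment_success_rate and the one-minute window are assumptions, and pending or reversed payments are left out of the denominator.

```sql
-- Sketch only: per-minute success rate for payments that reached a final state.
CREATE MATERIALIZED VIEW wallet_payment_success_rate AS
SELECT
    window_start,
    window_end,
    COUNT(*)                                          AS finished_payments,
    COUNT(*) FILTER (WHERE txn_status = 'completed')  AS completed_payments,
    COUNT(*) FILTER (WHERE txn_status = 'failed')     AS failed_payments,
    -- Every group holds at least one row, so the division cannot hit zero
    ROUND(
        COUNT(*) FILTER (WHERE txn_status = 'completed')::NUMERIC / COUNT(*),
        4
    )                                                 AS success_rate
FROM TUMBLE(
    wallet_transactions,
    txn_time,
    INTERVAL '1 MINUTE'
)
WHERE txn_type = 'payment'
  AND txn_status IN ('completed', 'failed')
GROUP BY
    window_start,
    window_end;
```

A dashboard or alerting job could read the most recent rows of this view and compare success_rate against whatever baseline the platform considers normal.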

Batch vs. Streaming Wallet Monitoring

Dimension                      | Batch Monitoring     | Streaming (RisingWave)
Fraud detection latency        | Minutes to hours     | Seconds
Velocity limit enforcement     | After-the-fact       | Real-time pre-clearance
Account takeover detection     | Delayed              | Immediate
AML pattern detection          | Next-day             | Within transaction window
Operational visibility         | Lagging dashboards   | Live metrics
False positive rate            | High (stale context) | Lower (current context)
Regulatory reporting readiness | Difficult intraday   | Always ready

FAQ

Q: How does RisingWave integrate with real-time pre-authorization checks? A: The velocity tracking and fraud signal materialized views serve as a real-time context store. Before authorizing a transaction, the payment service queries these views by user_id and uses the results to enrich the authorization decision.
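
As an illustration of that lookup, the payment service might run a point query like the one below before authorizing a send. This is a sketch: the literal 'user_123' is a placeholder, and the query assumes the hopping-window layout of wallet_velocity_tracking, where several windows overlap the current moment and the one that ends soonest best approximates the trailing 24 hours.

```sql
-- Sketch only: fetch the velocity window for one user that ends soonest after now.
SELECT
    user_id,
    sent_last_24h,
    daily_send_limit,
    send_txn_count_24h,
    window_end
FROM wallet_velocity_tracking
WHERE user_id = 'user_123'      -- placeholder user id
  AND window_end > NOW()
ORDER BY window_end
LIMIT 1;
```

Since windows are aligned to the hour, the result trails the true rolling total by up to one slide interval. The service would add the pending transaction's amount to sent_last_24h and compare the sum with daily_send_limit.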

Q: Can we use RisingWave for crypto wallet monitoring as well as fiat? A: Yes. RisingWave is agnostic to the asset type. As long as crypto wallet events are published to Kafka in a structured format, the same materialized views and SQL patterns apply.

Q: How do we handle micro-transactions at very high frequency (e.g., gaming wallets)? A: RisingWave handles high-frequency ingestion efficiently through incremental materialized view maintenance. Window sizes can be tuned to match the transaction cadence — for gaming wallets, shorter windows (minutes rather than hours) provide more responsive fraud detection.
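
A shorter-window variant could look like the sketch below. The view name wallet_fraud_signals_5m and the five-minute window are illustrative assumptions, and suitable thresholds would depend on the platform's own traffic.

```sql
-- Sketch only: per-user activity counts over 5-minute tumbling windows.
CREATE MATERIALIZED VIEW wallet_fraud_signals_5m AS
SELECT
    user_id,
    COUNT(*)                                       AS total_txns,
    COUNT(DISTINCT device_id)                      AS distinct_devices,
    COUNT(*) FILTER (WHERE txn_status = 'failed')  AS failed_txns,
    window_start,
    window_end
FROM TUMBLE(
    wallet_transactions,
    txn_time,
    INTERVAL '5 MINUTES'
)
GROUP BY
    user_id,
    window_start,
    window_end;
```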

Q: What eMoney and payment institution compliance requirements can RisingWave help with? A: RisingWave can support PSD2/EMD2 compliance in the EU and equivalent eMoney regulations by maintaining real-time transaction monitoring views, generating automated SAR candidates, and providing audit trails via Kafka or Iceberg sinks.

Q: Can we monitor cross-wallet transfer networks for money mule patterns? A: Aggregations over counterparty_id across multiple wallets can identify accounts sending to the same set of recipients, which is a key money mule signal. More complex graph traversal requires a dedicated graph analytics system, but RisingWave can feed that system with continuously updated transfer data.
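
One way to express that aggregation is sketched below. The view name wallet_shared_recipients, the 24-hour window, and the threshold of five distinct senders are all assumptions chosen for illustration, not recommended values.

```sql
-- Sketch only: recipient wallets that receive completed transfers
-- from many distinct senders within a 24-hour tumbling window.
CREATE MATERIALIZED VIEW wallet_shared_recipients AS
SELECT
    counterparty_id,
    COUNT(DISTINCT user_id)  AS distinct_senders,
    COUNT(*)                 AS inbound_transfers,
    window_start,
    window_end
FROM TUMBLE(
    wallet_transactions,
    txn_time,
    INTERVAL '24 HOURS'
)
WHERE txn_type = 'transfer_out'
  AND txn_status = 'completed'
  AND counterparty_type = 'wallet'
GROUP BY
    counterparty_id,
    window_start,
    window_end
HAVING COUNT(DISTINCT user_id) >= 5;   -- illustrative threshold
```

Rows from this view could be exported to a graph system as candidate nodes for deeper traversal.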

Get Started

Build real-time monitoring for your digital wallet platform:
