Digital wallet transaction monitoring with RisingWave means analyzing every top-up, transfer, payment, and withdrawal as it occurs — detecting fraud, enforcing balance limits, and maintaining compliance — across millions of active wallets with millisecond latency and standard SQL.
Digital Wallets: Scale Meets Compliance
Digital wallet platforms — whether consumer payment apps, embedded finance products, or crypto-fiat bridges — face a unique operational challenge: the scale of a high-frequency transaction system combined with the compliance obligations of a regulated financial institution.
A mid-size digital wallet platform may process 5–20 million transactions per day across millions of active users. Each transaction requires:
- Real-time balance verification (is there enough to cover this payment?)
- Fraud detection (does this look like account takeover or payment fraud?)
- Velocity limit enforcement (has this user hit their daily/weekly transfer cap?)
- AML screening (does this transaction pattern look like money laundering?)
- Operational monitoring (is payment success rate normal right now?)
Doing all of this with batch processing means accepting multi-minute windows where fraud and limit violations go unchecked. Streaming SQL closes those windows permanently.
Wallet Event Architecture
Mobile/Web App → API Gateway → Event Bus (Kafka)
↓
RisingWave
┌──────────────────┐
│ Materialized Views│
│ - Balance state │
│ - Velocity counts │
│ - Fraud signals │
│ - AML patterns │
└──────────────────┘
↓
Dashboards | Alerts | Compliance | ML
Ingesting Wallet Events
-- Core wallet transaction stream
CREATE SOURCE wallet_transactions (
event_id VARCHAR,
wallet_id VARCHAR,
user_id VARCHAR,
txn_type VARCHAR, -- 'topup', 'payment', 'transfer_out', 'transfer_in', 'withdrawal'
amount NUMERIC,
currency VARCHAR,
counterparty_id VARCHAR,
counterparty_type VARCHAR, -- 'merchant', 'wallet', 'bank', 'crypto'
device_id VARCHAR,
ip_address VARCHAR,
geo_country VARCHAR,
txn_status VARCHAR, -- 'completed', 'pending', 'failed', 'reversed'
txn_time TIMESTAMPTZ
)
WITH (
connector = 'kafka',
topic = 'wallet.transactions',
properties.bootstrap.server = 'broker:9092',
scan.startup.mode = 'latest'
)
FORMAT PLAIN ENCODE JSON;
-- User profile and tier limits reference
CREATE TABLE wallet_user_limits (
user_id VARCHAR PRIMARY KEY,
tier VARCHAR, -- 'basic', 'verified', 'premium'
daily_send_limit NUMERIC,
weekly_send_limit NUMERIC,
max_balance NUMERIC,
kyc_level INTEGER,
account_status VARCHAR
);
Real-Time Velocity Limit Tracking
Continuously track each user's send volume against their tier limits:
CREATE MATERIALIZED VIEW wallet_velocity_tracking AS
SELECT
t.user_id,
SUM(t.amount) FILTER (
WHERE t.txn_type IN ('payment', 'transfer_out', 'withdrawal')
AND t.txn_status = 'completed'
) AS sent_last_24h,
SUM(t.amount) FILTER (
WHERE t.txn_type IN ('payment', 'transfer_out', 'withdrawal')
AND t.txn_status = 'completed'
) AS sent_last_7d,
COUNT(*) FILTER (
WHERE t.txn_type IN ('payment', 'transfer_out', 'withdrawal')
) AS send_txn_count_24h,
COUNT(DISTINCT t.counterparty_id) FILTER (
WHERE t.txn_time >= NOW() - INTERVAL '1 HOUR'
) AS distinct_recipients_1h,
COUNT(DISTINCT t.geo_country) FILTER (
WHERE t.txn_time >= NOW() - INTERVAL '6 HOURS'
) AS distinct_countries_6h,
l.daily_send_limit,
l.weekly_send_limit,
l.tier,
window_start,
window_end
FROM HOP(
wallet_transactions,
txn_time,
INTERVAL '1 HOUR',
INTERVAL '24 HOURS'
) t
JOIN wallet_user_limits FOR SYSTEM_TIME AS OF t.txn_time AS l
ON t.user_id = l.user_id
GROUP BY
t.user_id,
l.daily_send_limit,
l.weekly_send_limit,
l.tier,
window_start,
window_end;
Fraud Signal Detection
Detect account takeover and payment fraud patterns using tumbling windows:
CREATE MATERIALIZED VIEW wallet_fraud_signals AS
SELECT
user_id,
COUNT(*) AS total_txns,
COUNT(DISTINCT device_id) AS distinct_devices,
COUNT(DISTINCT ip_address) AS distinct_ips,
COUNT(DISTINCT geo_country) AS distinct_countries,
COUNT(*) FILTER (WHERE txn_status = 'failed') AS failed_txns,
SUM(amount) FILTER (
WHERE txn_type = 'transfer_out'
AND txn_status = 'completed'
) AS outbound_volume,
COUNT(*) FILTER (
WHERE txn_type = 'transfer_out'
AND counterparty_type = 'crypto'
) AS crypto_transfers,
CASE
WHEN COUNT(DISTINCT device_id) > 3
AND COUNT(DISTINCT geo_country) > 1 THEN 'ACCOUNT_TAKEOVER'
WHEN COUNT(*) FILTER (WHERE txn_status = 'failed') > 5
THEN 'CREDENTIAL_STUFFING'
WHEN COUNT(*) FILTER (
WHERE txn_type = 'transfer_out'
AND counterparty_type = 'crypto'
) > 2 THEN 'FRAUD_CASHOUT'
ELSE NULL
END AS fraud_signal,
window_start,
window_end
FROM TUMBLE(
wallet_transactions,
txn_time,
INTERVAL '1 HOUR'
)
GROUP BY
user_id,
window_start,
window_end
HAVING
COUNT(DISTINCT device_id) > 3
OR COUNT(DISTINCT geo_country) > 1
OR COUNT(*) FILTER (WHERE txn_status = 'failed') > 5
OR COUNT(*) FILTER (
WHERE txn_type = 'transfer_out'
AND counterparty_type = 'crypto'
) > 2;
Sending Alerts to Compliance and Operations
CREATE SINK wallet_compliance_alerts_sink
FROM (
SELECT
user_id,
'VELOCITY_LIMIT' AS alert_type,
sent_last_24h AS metric_value,
daily_send_limit AS threshold,
window_end AS alert_time
FROM wallet_velocity_tracking
WHERE sent_last_24h > daily_send_limit * 0.9
UNION ALL
SELECT
user_id,
fraud_signal,
outbound_volume,
0,
window_end
FROM wallet_fraud_signals
WHERE fraud_signal IS NOT NULL
)
WITH (
connector = 'kafka',
topic = 'wallet.compliance.alerts',
properties.bootstrap.server = 'broker:9092'
)
FORMAT PLAIN ENCODE JSON;
Batch vs. Streaming Wallet Monitoring
| Dimension | Batch Monitoring | Streaming (RisingWave) |
| Fraud detection latency | Minutes to hours | Seconds |
| Velocity limit enforcement | After-the-fact | Real-time pre-clearance |
| Account takeover detection | Delayed | Immediate |
| AML pattern detection | Next-day | Within transaction window |
| Operational visibility | Lagging dashboards | Live metrics |
| False positive rate | High (stale context) | Lower (current context) |
| Regulatory reporting readiness | Difficult intraday | Always ready |
FAQ
Q: How does RisingWave integrate with real-time pre-authorization checks?
A: The velocity tracking and fraud signal materialized views serve as a real-time context store. Before authorizing a transaction, the payment service queries these views by user_id and uses the results to enrich the authorization decision.
Q: Can we use RisingWave for crypto wallet monitoring as well as fiat? A: Yes. RisingWave is agnostic to the asset type. As long as crypto wallet events are published to Kafka in a structured format, the same materialized views and SQL patterns apply.
Q: How do we handle micro-transactions at very high frequency (e.g., gaming wallets)? A: RisingWave handles high-frequency ingestion efficiently through incremental materialized view maintenance. Window sizes can be tuned to match the transaction cadence — for gaming wallets, shorter windows (minutes rather than hours) provide more responsive fraud detection.
Q: What eMoney and payment institution compliance requirements can RisingWave help with? A: RisingWave can support PSD2/EMD2 compliance in the EU and equivalent eMoney regulations by maintaining real-time transaction monitoring views, generating automated SAR candidates, and providing audit trails via Kafka or Iceberg sinks.
Q: Can we monitor cross-wallet transfer networks for money mule patterns?
A: Aggregations over counterparty_id across multiple wallets can identify accounts sending to the same set of recipients — a key money mule signal. More complex graph traversal requires a dedicated graph analytics system, but RisingWave can serve as the feeder data system.
Get Started
Build real-time monitoring for your digital wallet platform:
- Start the quickstart: docs.risingwave.com/get-started
- Join the fintech builders community: risingwave.com/slack

