How Fintech Companies Monitor Real-Time Compliance with a Streaming Database
Compliance teams at fintechs have historically relied on batch ETL pipelines feeding OLAP dashboards that are hours or days stale. A streaming database replaces that entire stack — ingestion, transformation, and serving — with a single SQL layer that keeps compliance metrics current in seconds, not overnight.
The Batch Compliance Dashboard Problem
Most compliance monitoring looks like this: transactions land in a data warehouse overnight via ETL, a dbt model transforms them into compliance metrics by morning, and a BI tool like Tableau or Looker renders a dashboard that compliance officers review at 9am.
The data is from yesterday. The compliance violation may have occurred at 11:59pm. By the time it surfaces, it is already more than nine hours old.
For many compliance requirements, that latency is not just inconvenient — it creates regulatory exposure. Velocity limits, position limits, concentration limits, and AML thresholds have intraday significance. Reviewing them on a day-old dashboard is like monitoring speed on a highway using yesterday's radar data.
The stack that produces this latency is also expensive. A typical batch compliance pipeline involves a transactional database, a Kafka or CDC pipeline, a data warehouse (Snowflake, BigQuery, Redshift), dbt or Spark transformations, and a BI tool. Each layer adds cost, latency, and failure modes.
What Real-Time Compliance Monitoring Requires
Compliance monitoring is not the same as fraud detection: the urgency, the stakeholders, and the failure modes all differ. It needs:
- Continuous aggregation — running totals, counts, and rates updated as events arrive
- Threshold alerting — trigger when a metric crosses a regulatory limit
- Audit trails — immutable records of what was monitored, when
- Dashboard serving — low-latency reads for compliance officer dashboards
- Regulatory reporting — periodic reports that can be generated on demand from live data
A streaming database addresses all five. The monitoring logic lives in SQL, the state is maintained continuously, and queries against materialized views serve dashboards directly.
Replacing the Batch Stack with a Streaming Database
The architecture shift is significant:
Before:
Transactions → Kafka → S3 → Snowflake (ETL nightly) → dbt models → Tableau
After:
Transactions → Kafka → RisingWave (live materialized views) → Compliance Dashboard / Alerts
The streaming database is both the transformation layer and the serving layer. There is no separate OLAP warehouse for compliance metrics. The materialized views are queryable directly via PostgreSQL protocol — any BI tool that connects to Postgres connects to RisingWave.
Building Real-Time Compliance Monitoring with RisingWave
RisingWave is a PostgreSQL-compatible streaming database, open source under Apache 2.0, built in Rust, with S3-backed storage. It ingests Kafka streams, maintains materialized views incrementally, and serves queries via the PostgreSQL wire protocol.
Ingesting Core Financial Events
-- Payment transactions
CREATE SOURCE payments (
payment_id VARCHAR,
customer_id VARCHAR,
account_id VARCHAR,
payment_type VARCHAR, -- 'p2p', 'merchant', 'bill_pay', 'international'
amount NUMERIC,
currency VARCHAR,
destination_country VARCHAR,
processing_status VARCHAR, -- 'initiated', 'cleared', 'failed', 'reversed'
event_time TIMESTAMPTZ
)
WITH (
connector = 'kafka',
topic = 'payments',
properties.bootstrap.server = 'kafka:9092',
scan.startup.mode = 'latest'
)
FORMAT PLAIN ENCODE JSON;
-- Account onboarding and KYC events
CREATE SOURCE kyc_events (
customer_id VARCHAR,
event_type VARCHAR, -- 'onboarding_started', 'kyc_approved',
-- 'kyc_failed', 'kyc_expired', 'review_triggered'
kyc_tier VARCHAR, -- 'basic', 'enhanced', 'full'
reviewer_id VARCHAR,
event_time TIMESTAMPTZ
)
WITH (
connector = 'kafka',
topic = 'kyc-events',
properties.bootstrap.server = 'kafka:9092',
scan.startup.mode = 'latest'
)
FORMAT PLAIN ENCODE JSON;
-- Customer limit configurations (from product/compliance system)
CREATE SOURCE customer_limits (
customer_id VARCHAR,
daily_payment_limit NUMERIC,
monthly_payment_limit NUMERIC,
international_limit NUMERIC,
kyc_tier VARCHAR,
limit_updated_at TIMESTAMPTZ
)
WITH (
connector = 'kafka',
topic = 'customer-limits',
properties.bootstrap.server = 'kafka:9092',
scan.startup.mode = 'latest'
)
FORMAT PLAIN ENCODE JSON;
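Each source above expects one JSON object per Kafka message. A small Python sketch of the event shape a producer might emit for the payments topic (field names match the payments source schema; the specific values are illustrative, and a real producer library such as confluent-kafka is assumed but not shown):

```python
import json
from datetime import datetime, timezone

# One payment event matching the `payments` source schema.
# A Kafka producer would send this dict, serialized, as the message value.
payment_event = {
    "payment_id": "pay_001",
    "customer_id": "cust_42",
    "account_id": "acct_7",
    "payment_type": "international",
    "amount": 2500.0,
    "currency": "USD",
    "destination_country": "DE",
    "processing_status": "cleared",
    "event_time": datetime.now(timezone.utc).isoformat(),
}

# Serialize the way FORMAT PLAIN ENCODE JSON expects: a flat JSON object.
message_value = json.dumps(payment_event).encode("utf-8")
```

Any field missing from a message simply arrives as NULL in the source, so keeping producers and the source schema in sync is worth a contract test.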
Metric 1: Real-Time Payment Limit Utilization
Track each customer's usage against their daily and monthly limits continuously:
CREATE MATERIALIZED VIEW daily_payment_utilization AS
SELECT
p.customer_id,
SUM(p.amount) AS amount_used_today,
COUNT(*) AS payments_today,
l.daily_payment_limit,
SUM(p.amount) / NULLIF(l.daily_payment_limit, 0) * 100
AS pct_daily_limit_used,
MAX(p.event_time) AS last_payment_time
FROM payments p
LEFT JOIN customer_limits l ON p.customer_id = l.customer_id
WHERE p.event_time > DATE_TRUNC('day', NOW())
AND p.processing_status NOT IN ('failed', 'reversed')
GROUP BY p.customer_id, l.daily_payment_limit;
CREATE MATERIALIZED VIEW monthly_payment_utilization AS
SELECT
p.customer_id,
SUM(p.amount) AS amount_used_this_month,
COUNT(*) AS payments_this_month,
l.monthly_payment_limit,
SUM(p.amount) / NULLIF(l.monthly_payment_limit, 0) * 100
AS pct_monthly_limit_used
FROM payments p
LEFT JOIN customer_limits l ON p.customer_id = l.customer_id
WHERE p.event_time > DATE_TRUNC('month', NOW())
AND p.processing_status NOT IN ('failed', 'reversed')
GROUP BY p.customer_id, l.monthly_payment_limit;
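The arithmetic in these views is simple enough to sanity-check by hand. A Python sketch of the same daily-utilization calculation over a few illustrative events (names and amounts are made up, not taken from the views):

```python
def daily_utilization(payments, daily_limit):
    """Sum a customer's payments and express usage as a percent of the daily limit.

    Mirrors the SQL view: failed/reversed payments are excluded, and a zero
    limit yields None instead of a division error (like NULLIF in the view).
    """
    used = sum(p["amount"] for p in payments
               if p["processing_status"] not in ("failed", "reversed"))
    if not daily_limit:
        return used, None
    return used, used / daily_limit * 100

# Illustrative events for one customer on one calendar day
events = [
    {"amount": 400.0, "processing_status": "cleared"},
    {"amount": 250.0, "processing_status": "cleared"},
    {"amount": 100.0, "processing_status": "reversed"},  # excluded from the total
]
used, pct = daily_utilization(events, daily_limit=1000.0)
# used == 650.0, pct == 65.0
```

The streaming view does exactly this, except incrementally: each arriving event adjusts the running sum rather than recomputing it.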
Metric 2: Customers Approaching or Exceeding Limits
CREATE MATERIALIZED VIEW limit_breach_monitoring AS
SELECT
d.customer_id,
d.amount_used_today,
d.daily_payment_limit,
d.pct_daily_limit_used,
m.amount_used_this_month,
m.monthly_payment_limit,
m.pct_monthly_limit_used,
CASE
WHEN d.pct_daily_limit_used >= 100 THEN 'DAILY_LIMIT_EXCEEDED'
WHEN m.pct_monthly_limit_used >= 100 THEN 'MONTHLY_LIMIT_EXCEEDED'
WHEN d.pct_daily_limit_used >= 80
OR m.pct_monthly_limit_used >= 80 THEN 'APPROACHING_LIMIT'
ELSE 'WITHIN_LIMITS'
END AS limit_status
FROM daily_payment_utilization d
JOIN monthly_payment_utilization m ON d.customer_id = m.customer_id;
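The CASE expression is the heart of the breach logic. A Python equivalent is useful for unit-testing threshold choices before encoding them in the view (the 80% and 100% thresholds come from the SQL above; the function itself is an illustration, not part of RisingWave):

```python
def limit_status(pct_daily, pct_monthly):
    """Classify a customer's limit posture, mirroring the CASE in the view.

    Order matters: a daily breach takes precedence over a monthly breach,
    which takes precedence over the 80% early-warning band.
    """
    if pct_daily >= 100:
        return "DAILY_LIMIT_EXCEEDED"
    if pct_monthly >= 100:
        return "MONTHLY_LIMIT_EXCEEDED"
    if pct_daily >= 80 or pct_monthly >= 80:
        return "APPROACHING_LIMIT"
    return "WITHIN_LIMITS"

# e.g. 85% of the daily limit used, 40% of the monthly:
status = limit_status(85, 40)
# status == "APPROACHING_LIMIT"
```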
Metric 3: KYC Compliance Rate — Live Dashboard Metric
The percentage of active customers with current KYC is a fundamental compliance metric:
-- Latest KYC status per customer (window function over the event stream)
CREATE MATERIALIZED VIEW customer_kyc_status AS
SELECT customer_id, current_status, kyc_tier, status_as_of
FROM (
  SELECT
    customer_id,
    event_type AS current_status,
    kyc_tier,
    event_time AS status_as_of,
    ROW_NUMBER() OVER (
      PARTITION BY customer_id ORDER BY event_time DESC
    ) AS row_rank
  FROM kyc_events
) ranked
WHERE row_rank = 1;
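This view keeps only the most recent event per customer. The same deduplication in Python, for intuition (the sample events are illustrative):

```python
def latest_status(events):
    """Keep the most recent event per customer_id: the latest-per-key pattern."""
    latest = {}
    for e in events:
        current = latest.get(e["customer_id"])
        if current is None or e["event_time"] > current["event_time"]:
            latest[e["customer_id"]] = e
    return latest

# ISO-8601 UTC timestamps compare correctly as strings
events = [
    {"customer_id": "c1", "event_type": "onboarding_started", "event_time": "2024-01-01T09:00:00Z"},
    {"customer_id": "c1", "event_type": "kyc_approved",       "event_time": "2024-01-02T10:00:00Z"},
    {"customer_id": "c2", "event_type": "kyc_failed",         "event_time": "2024-01-03T11:00:00Z"},
]
statuses = latest_status(events)
# statuses["c1"]["event_type"] == "kyc_approved"
```

The streaming database maintains this "latest per key" state continuously, so a late-arriving KYC approval immediately flips the customer's status in every downstream view.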
-- Active customers in the last 30 days
CREATE MATERIALIZED VIEW active_customers_30d AS
SELECT DISTINCT customer_id
FROM payments
WHERE event_time > NOW() - INTERVAL '30 days';
-- KYC compliance rate across active customers
CREATE MATERIALIZED VIEW kyc_compliance_rate AS
SELECT
COUNT(*) AS active_customers,
SUM(CASE WHEN k.current_status = 'kyc_approved'
THEN 1 ELSE 0 END) AS kyc_compliant,
SUM(CASE WHEN k.current_status IN ('kyc_failed', 'kyc_expired')
THEN 1 ELSE 0 END) AS kyc_non_compliant,
SUM(CASE WHEN k.current_status IS NULL
THEN 1 ELSE 0 END) AS kyc_unknown,
ROUND(
SUM(CASE WHEN k.current_status = 'kyc_approved' THEN 1 ELSE 0 END)
* 100.0 / COUNT(*), 2
) AS compliance_rate_pct
FROM active_customers_30d a
LEFT JOIN customer_kyc_status k ON a.customer_id = k.customer_id;
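The rate computation reduces to a left join plus conditional counts. A Python sketch of the same arithmetic over illustrative data (customer IDs and statuses are made up):

```python
def kyc_compliance_rate(active_customers, kyc_status):
    """Compute the KYC compliance rate across active customers.

    kyc_status maps customer_id -> latest KYC event_type. Customers with no
    KYC record count as 'unknown', mirroring the LEFT JOIN NULL case.
    """
    compliant = sum(1 for c in active_customers
                    if kyc_status.get(c) == "kyc_approved")
    non_compliant = sum(1 for c in active_customers
                        if kyc_status.get(c) in ("kyc_failed", "kyc_expired"))
    unknown = sum(1 for c in active_customers if c not in kyc_status)
    rate = round(compliant * 100.0 / len(active_customers), 2)
    return {"active": len(active_customers), "compliant": compliant,
            "non_compliant": non_compliant, "unknown": unknown,
            "rate_pct": rate}

active = ["c1", "c2", "c3", "c4"]
status = {"c1": "kyc_approved", "c2": "kyc_approved", "c3": "kyc_expired"}
summary = kyc_compliance_rate(active, status)
# rate_pct == 50.0 (2 of 4 approved); c4 counts as unknown
```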
This single query, run against the live materialized view, gives compliance officers the current KYC compliance rate — updated within seconds of any KYC event or new payment.
Metric 4: International Payment Compliance
Many regulatory frameworks require enhanced due diligence or reporting for cross-border payments above certain thresholds:
CREATE MATERIALIZED VIEW international_payment_monitoring AS
SELECT
p.customer_id,
p.destination_country,
COUNT(*) AS intl_payment_count_30d,
SUM(p.amount) AS intl_payment_volume_30d,
MAX(p.amount) AS largest_single_intl_payment,
l.international_limit,
SUM(p.amount) / NULLIF(l.international_limit, 0) * 100
AS pct_intl_limit_used
FROM payments p
LEFT JOIN customer_limits l ON p.customer_id = l.customer_id
WHERE p.event_time > NOW() - INTERVAL '30 days'
AND p.payment_type = 'international'
AND p.processing_status NOT IN ('failed', 'reversed')
GROUP BY p.customer_id, p.destination_country, l.international_limit;
Metric 5: Operational Compliance Dashboard Aggregates
Roll-up metrics for the compliance team's overview dashboard:
CREATE MATERIALIZED VIEW compliance_dashboard_summary AS
SELECT
DATE_TRUNC('hour', NOW()) AS snapshot_time,
-- Today's payment volume
(SELECT SUM(amount) FROM payments
WHERE event_time > DATE_TRUNC('day', NOW())
AND processing_status = 'cleared') AS total_cleared_volume_today,
-- Count of limit breaches today
(SELECT COUNT(*) FROM limit_breach_monitoring
WHERE limit_status = 'DAILY_LIMIT_EXCEEDED') AS daily_limit_breaches,
-- Count of customers approaching limits
(SELECT COUNT(*) FROM limit_breach_monitoring
WHERE limit_status = 'APPROACHING_LIMIT') AS approaching_limit_count,
-- KYC compliance rate (from live view)
(SELECT compliance_rate_pct FROM kyc_compliance_rate) AS kyc_compliance_pct,
-- Failed payments rate (last 1 hour)
(SELECT ROUND(
SUM(CASE WHEN processing_status = 'failed' THEN 1 ELSE 0 END)
* 100.0 / NULLIF(COUNT(*), 0), 2)
FROM payments
WHERE event_time > NOW() - INTERVAL '1 hour') AS payment_failure_rate_1h;
Connecting Compliance Dashboards
Because RisingWave speaks the PostgreSQL wire protocol, any tool that connects to Postgres connects to RisingWave directly:
Grafana:
Data source type: PostgreSQL
Host: risingwave-host:4566
Database: dev
SSL mode: disable
Add a dashboard panel querying compliance_dashboard_summary. Grafana auto-refreshes every N seconds. The data served is always the latest materialized view state.
Metabase, Superset, Redash: Same PostgreSQL connection configuration. No special driver needed.
Custom compliance UI (direct query):
import psycopg2
conn = psycopg2.connect(
host="risingwave-host",
port=4566,
dbname="dev",
user="root"
)
cur = conn.cursor()
cur.execute("""
SELECT snapshot_time, total_cleared_volume_today,
daily_limit_breaches, kyc_compliance_pct
FROM compliance_dashboard_summary
""")
rows = cur.fetchall()
cur.close()
conn.close()
No ORM, no special client library. Standard PostgreSQL driver.
Alert Routing for Compliance Violations
Critical compliance metrics should push alerts, not just update dashboards:
-- Sink limit breaches to your alerting system
CREATE SINK compliance_limit_breach_alerts AS
SELECT * FROM limit_breach_monitoring
WHERE limit_status IN ('DAILY_LIMIT_EXCEEDED', 'MONTHLY_LIMIT_EXCEEDED')
WITH (
connector = 'kafka',
topic = 'compliance-alerts',
properties.bootstrap.server = 'kafka:9092'
)
FORMAT PLAIN ENCODE JSON;
-- Alert when KYC compliance rate drops below threshold
CREATE SINK kyc_rate_alert_sink AS
SELECT * FROM kyc_compliance_rate
WHERE compliance_rate_pct < 95.0
WITH (
connector = 'kafka',
topic = 'compliance-alerts',
properties.bootstrap.server = 'kafka:9092'
)
FORMAT PLAIN ENCODE JSON;
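Downstream of the compliance-alerts topic, a consumer decides where each alert record goes. A hypothetical routing function, sketched under the assumption that alert records carry the same fields as the views they came from (channel names and the severity policy are invented for illustration):

```python
def route_alert(alert):
    """Map an alert record from the compliance-alerts topic to severity and channel.

    Hypothetical policy: hard limit breaches page on-call compliance;
    KYC-rate dips go to a review queue; anything else lands in a digest.
    Adjust to your own escalation matrix.
    """
    status = alert.get("limit_status")
    if status in ("DAILY_LIMIT_EXCEEDED", "MONTHLY_LIMIT_EXCEEDED"):
        return {"severity": "critical", "channel": "compliance-oncall"}
    if "compliance_rate_pct" in alert:
        return {"severity": "warning", "channel": "kyc-review-queue"}
    return {"severity": "info", "channel": "compliance-digest"}

# A daily-limit breach pages on-call; a KYC-rate alert goes to review
route_alert({"limit_status": "DAILY_LIMIT_EXCEEDED", "customer_id": "c1"})
route_alert({"compliance_rate_pct": 93.1})
```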
Architecture Comparison: Batch vs. Streaming Compliance Stack
| Dimension | Batch ETL + Warehouse | Streaming Database (RisingWave) |
| --- | --- | --- |
| Dashboard freshness | Hours (nightly ETL) | Seconds |
| Stack components | Kafka + ETL + Warehouse + dbt + BI | Kafka + RisingWave + BI |
| Compliance alert latency | Hours to next batch | Sub-second |
| Infrastructure cost | High (warehouse + ETL compute) | Lower (S3-backed, no ETL compute) |
| Rule/metric change cycle | Days (dbt model + deployment) | Minutes (redefine the view in SQL) |
| Historical query capability | Full historical warehouse | Rolling windows + S3 archive |
| BI tool compatibility | Native warehouse connectors | PostgreSQL protocol (universal) |
| Regulatory audit trail | Warehouse snapshots | Append-only changelog + S3 |
The most important practical difference is metric freshness. For regulatory compliance, the difference between "yesterday's data" and "current data" is not just an engineering convenience — it is a material compliance posture difference.
Handling Regulatory Reporting Periods
Many compliance reports require data for specific calendar periods: daily position reports, monthly suspicious activity summaries, quarterly concentration reports. With a streaming database, these are point-in-time queries against the materialized state:
-- Monthly payment volume by type (current month)
SELECT
payment_type,
COUNT(*) AS payment_count,
SUM(amount) AS total_volume,
currency
FROM payments
WHERE event_time >= DATE_TRUNC('month', NOW())
AND event_time < DATE_TRUNC('month', NOW()) + INTERVAL '1 month'
AND processing_status = 'cleared'
GROUP BY payment_type, currency
ORDER BY total_volume DESC;
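Prior-period variants of this report only change the time bounds. A small Python helper computing the half-open [start, end) boundaries for a calendar month, matching the DATE_TRUNC arithmetic in the query above (an illustrative utility, not a RisingWave API):

```python
from datetime import datetime, timezone

def month_bounds(year, month):
    """Return [start, end) UTC timestamps for a calendar month, the Python
    equivalent of DATE_TRUNC('month', ts) and DATE_TRUNC(...) + INTERVAL '1 month'."""
    start = datetime(year, month, 1, tzinfo=timezone.utc)
    if month == 12:
        end = datetime(year + 1, 1, 1, tzinfo=timezone.utc)
    else:
        end = datetime(year, month + 1, 1, tzinfo=timezone.utc)
    return start, end

start, end = month_bounds(2024, 12)
# start == 2024-12-01T00:00:00+00:00, end == 2025-01-01T00:00:00+00:00
```

Half-open bounds avoid the classic off-by-one where a payment at exactly midnight on the first of the next month is double-counted.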
For prior-period reporting (last month, last quarter), RisingWave's time-travel queries and S3-backed storage allow you to reconstruct historical state. For immutable regulatory records, sink materialized view snapshots to S3 at end-of-period — a one-line CREATE SINK to an S3 connector.
FAQ
Does this approach meet audit and regulatory recordkeeping requirements? The streaming database itself is the live computation layer. For immutable audit records, configure sinks to write all compliance events and alerts to S3 in Parquet or JSON format; with S3 Object Lock enabled, this becomes an append-only, write-once (WORM) record that satisfies most regulatory recordkeeping requirements. Consult your compliance counsel for jurisdiction-specific requirements.
Can we backfill historical data into RisingWave for initial dashboard population?
Yes. RisingWave supports scan.startup.mode = 'earliest' on Kafka sources, which replays the full Kafka topic history. If data predates your Kafka retention window, you can seed reference tables via SQL INSERT statements or batch ingestion from your data warehouse.
How does this integrate with our existing data warehouse? RisingWave can sink live compliance metrics into your existing warehouse via JDBC or Kafka, keeping the warehouse as the system of record for historical analysis. You gain real-time compliance monitoring without migrating away from your existing warehouse investment.
What happens to the streaming state if RisingWave has an outage? RisingWave checkpoints its materialized view state to S3 continuously. On restart, processing resumes from the last checkpoint. The recovery window is typically under a minute, and the materialized views are populated from the checkpoint state without requiring a full replay.
Can compliance rules be version-controlled and audited? Because all compliance logic is SQL, it lives in your version control system like any other code. View definitions can be reviewed, diffed, and approved through standard pull request workflows. A change to a compliance threshold is a one-line SQL diff — auditable and reversible.

