How Fintech Companies Monitor Real-Time Compliance with a Streaming Database

Compliance teams at fintechs have historically relied on batch ETL pipelines feeding OLAP dashboards that are hours or days stale. A streaming database replaces that entire stack — ingestion, transformation, and serving — with a single SQL layer that keeps compliance metrics current in seconds, not overnight.


The Batch Compliance Dashboard Problem

Most compliance monitoring looks like this: transactions land in a data warehouse overnight via ETL, a dbt model transforms them into compliance metrics by morning, and a BI tool like Tableau or Looker renders a dashboard that compliance officers review at 9am.

The data is from yesterday. The compliance violation may have occurred at 11:59pm. By the time it surfaces, it is already 10+ hours old.

For many compliance requirements, that latency is not just inconvenient — it creates regulatory exposure. Velocity limits, position limits, concentration limits, and AML thresholds have intraday significance. Reviewing them on a day-old dashboard is like monitoring speed on a highway using yesterday's radar data.

The stack that produces this latency is also expensive. A typical batch compliance pipeline involves a transactional database, a Kafka or CDC pipeline, a data warehouse (Snowflake, BigQuery, Redshift), dbt or Spark transformations, and a BI tool. Each layer adds cost, latency, and failure modes.


What Real-Time Compliance Monitoring Requires

Compliance monitoring is not the same as fraud detection. The urgency is different, the stakeholders are different, and the failure modes are different. Compliance monitoring needs:

  1. Continuous aggregation — running totals, counts, and rates updated as events arrive
  2. Threshold alerting — trigger when a metric crosses a regulatory limit
  3. Audit trails — immutable records of what was monitored, when
  4. Dashboard serving — low-latency reads for compliance officer dashboards
  5. Regulatory reporting — periodic reports that can be generated on demand from live data

A streaming database addresses all five. The monitoring logic lives in SQL, the state is maintained continuously, and queries against materialized views serve dashboards directly.


Replacing the Batch Stack with a Streaming Database

The architecture shift is significant:

Before:

Transactions → Kafka → S3 → Snowflake (ETL nightly) → dbt models → Tableau

After:

Transactions → Kafka → RisingWave (live materialized views) → Compliance Dashboard / Alerts

The streaming database is both the transformation layer and the serving layer. There is no separate OLAP warehouse for compliance metrics. The materialized views are queryable directly via PostgreSQL protocol — any BI tool that connects to Postgres connects to RisingWave.


Building Real-Time Compliance Monitoring with RisingWave

RisingWave is a PostgreSQL-compatible streaming database, open source under Apache 2.0, built in Rust, with S3-backed storage. It ingests Kafka streams, maintains materialized views incrementally, and serves queries via the PostgreSQL wire protocol.

Ingesting Core Financial Events

-- Payment transactions
CREATE SOURCE payments (
    payment_id       VARCHAR,
    customer_id      VARCHAR,
    account_id       VARCHAR,
    payment_type     VARCHAR,    -- 'p2p', 'merchant', 'bill_pay', 'international'
    amount           NUMERIC,
    currency         VARCHAR,
    destination_country VARCHAR,
    processing_status   VARCHAR, -- 'initiated', 'cleared', 'failed', 'reversed'
    event_time       TIMESTAMPTZ
)
WITH (
    connector = 'kafka',
    topic = 'payments',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
)
FORMAT PLAIN ENCODE JSON;

-- Account onboarding and KYC events
CREATE SOURCE kyc_events (
    customer_id      VARCHAR,
    event_type       VARCHAR,    -- 'onboarding_started', 'kyc_approved',
                                 --  'kyc_failed', 'kyc_expired', 'review_triggered'
    kyc_tier         VARCHAR,    -- 'basic', 'enhanced', 'full'
    reviewer_id      VARCHAR,
    event_time       TIMESTAMPTZ
)
WITH (
    connector = 'kafka',
    topic = 'kyc-events',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
)
FORMAT PLAIN ENCODE JSON;

-- Customer limit configurations (from product/compliance system)
CREATE SOURCE customer_limits (
    customer_id           VARCHAR,
    daily_payment_limit   NUMERIC,
    monthly_payment_limit NUMERIC,
    international_limit   NUMERIC,
    kyc_tier              VARCHAR,
    limit_updated_at      TIMESTAMPTZ
)
WITH (
    connector = 'kafka',
    topic = 'customer-limits',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
)
FORMAT PLAIN ENCODE JSON;
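For local testing of these sources, events matching the payments schema can be generated with a few lines of Python. This is a sketch under assumptions — the field names mirror the CREATE SOURCE above, and the producer call in the final comment assumes a Kafka client such as kafka-python, which is not part of the pipeline itself:

```python
import json
from datetime import datetime, timezone

def make_payment_event(payment_id, customer_id, account_id, payment_type,
                       amount, currency, destination_country, processing_status):
    """Build a JSON payload matching the `payments` source schema above."""
    return json.dumps({
        "payment_id": payment_id,
        "customer_id": customer_id,
        "account_id": account_id,
        "payment_type": payment_type,
        "amount": str(amount),          # send NUMERIC as a string to avoid float rounding
        "currency": currency,
        "destination_country": destination_country,
        "processing_status": processing_status,
        "event_time": datetime.now(timezone.utc).isoformat(),
    })

event = make_payment_event("pay-001", "cust-42", "acct-7", "international",
                           "2500.00", "USD", "DE", "initiated")
# e.g. producer.send("payments", event.encode())  # with a Kafka producer instance
```
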

Metric 1: Real-Time Payment Limit Utilization

Track each customer's usage against their daily and monthly limits continuously:

CREATE MATERIALIZED VIEW daily_payment_utilization AS
SELECT
    p.customer_id,
    SUM(p.amount)    AS amount_used_today,
    COUNT(*)         AS payments_today,
    l.daily_payment_limit,
    SUM(p.amount) / NULLIF(l.daily_payment_limit, 0) * 100
                     AS pct_daily_limit_used,
    MAX(p.event_time) AS last_payment_time
FROM payments p
LEFT JOIN customer_limits l ON p.customer_id = l.customer_id
WHERE p.event_time > DATE_TRUNC('day', NOW())
  AND p.processing_status NOT IN ('failed', 'reversed')
GROUP BY p.customer_id, l.daily_payment_limit;

CREATE MATERIALIZED VIEW monthly_payment_utilization AS
SELECT
    p.customer_id,
    SUM(p.amount)     AS amount_used_this_month,
    COUNT(*)          AS payments_this_month,
    l.monthly_payment_limit,
    SUM(p.amount) / NULLIF(l.monthly_payment_limit, 0) * 100
                      AS pct_monthly_limit_used
FROM payments p
LEFT JOIN customer_limits l ON p.customer_id = l.customer_id
WHERE p.event_time > DATE_TRUNC('month', NOW())
  AND p.processing_status NOT IN ('failed', 'reversed')
GROUP BY p.customer_id, l.monthly_payment_limit;
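The NULLIF guard in both views matters: a customer with no configured limit (or a limit of zero) yields a NULL utilization percentage rather than a division error. The same semantics, sketched in Python for illustration:

```python
def pct_limit_used(amount_used, limit):
    """Mirror of SUM(amount) / NULLIF(limit, 0) * 100: returns None when the
    limit is missing or zero, matching SQL NULL propagation."""
    if limit is None or limit == 0:
        return None
    return amount_used / limit * 100

print(pct_limit_used(800, 1000))   # a customer at 80% of their daily limit
print(pct_limit_used(500, 0))      # misconfigured limit -> no percentage, not an error
```
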

Metric 2: Customers Approaching or Exceeding Limits

CREATE MATERIALIZED VIEW limit_breach_monitoring AS
SELECT
    d.customer_id,
    d.amount_used_today,
    d.daily_payment_limit,
    d.pct_daily_limit_used,
    m.amount_used_this_month,
    m.monthly_payment_limit,
    m.pct_monthly_limit_used,
    CASE
        WHEN d.pct_daily_limit_used >= 100 THEN 'DAILY_LIMIT_EXCEEDED'
        WHEN m.pct_monthly_limit_used >= 100 THEN 'MONTHLY_LIMIT_EXCEEDED'
        WHEN d.pct_daily_limit_used >= 80
          OR m.pct_monthly_limit_used >= 80   THEN 'APPROACHING_LIMIT'
        ELSE 'WITHIN_LIMITS'
    END AS limit_status
FROM daily_payment_utilization d
JOIN monthly_payment_utilization m ON d.customer_id = m.customer_id;
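The CASE expression evaluates top to bottom, so a breach always wins over an "approaching" warning. The same precedence logic, sketched in Python (the None handling reflects NULL percentages from customers without configured limits):

```python
def limit_status(pct_daily, pct_monthly):
    """Classify a customer's limit posture with the same precedence as the
    CASE expression: exceeded beats approaching beats within-limits."""
    if pct_daily is not None and pct_daily >= 100:
        return "DAILY_LIMIT_EXCEEDED"
    if pct_monthly is not None and pct_monthly >= 100:
        return "MONTHLY_LIMIT_EXCEEDED"
    if (pct_daily is not None and pct_daily >= 80) or \
       (pct_monthly is not None and pct_monthly >= 80):
        return "APPROACHING_LIMIT"
    return "WITHIN_LIMITS"
```
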

Metric 3: KYC Compliance Rate — Live Dashboard Metric

The percentage of active customers with current KYC is a fundamental compliance metric:

-- Latest KYC status per customer (Top-N pattern with a window function)
CREATE MATERIALIZED VIEW customer_kyc_status AS
SELECT customer_id, current_status, kyc_tier, status_as_of
FROM (
    SELECT
        customer_id,
        event_type  AS current_status,
        kyc_tier,
        event_time  AS status_as_of,
        ROW_NUMBER() OVER (
            PARTITION BY customer_id ORDER BY event_time DESC
        ) AS row_rank
    FROM kyc_events
) ranked
WHERE row_rank = 1;

-- Active customers in the last 30 days
CREATE MATERIALIZED VIEW active_customers_30d AS
SELECT DISTINCT customer_id
FROM payments
WHERE event_time > NOW() - INTERVAL '30 days';

-- KYC compliance rate across active customers
CREATE MATERIALIZED VIEW kyc_compliance_rate AS
SELECT
    COUNT(*)                                             AS active_customers,
    SUM(CASE WHEN k.current_status = 'kyc_approved'
             THEN 1 ELSE 0 END)                          AS kyc_compliant,
    SUM(CASE WHEN k.current_status IN ('kyc_failed', 'kyc_expired')
             THEN 1 ELSE 0 END)                          AS kyc_non_compliant,
    SUM(CASE WHEN k.current_status IS NULL
             THEN 1 ELSE 0 END)                          AS kyc_unknown,
    ROUND(
        SUM(CASE WHEN k.current_status = 'kyc_approved' THEN 1 ELSE 0 END)
        * 100.0 / COUNT(*), 2
    )                                                    AS compliance_rate_pct
FROM active_customers_30d a
LEFT JOIN customer_kyc_status k ON a.customer_id = k.customer_id;

This single query, run against the live materialized view, gives compliance officers the current KYC compliance rate — updated within seconds of any KYC event or new payment.
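The counter logic is worth spelling out: customers who have never emitted a KYC event fall out of the LEFT JOIN with a NULL status and land in the "unknown" bucket, so they count against the compliance rate. A Python sketch of the same CASE-based counters:

```python
def kyc_compliance_rate(statuses):
    """statuses: one latest KYC status per active customer; None means the
    customer has no KYC event at all. Mirrors the kyc_compliance_rate view."""
    total = len(statuses)
    compliant = sum(1 for s in statuses if s == "kyc_approved")
    non_compliant = sum(1 for s in statuses if s in ("kyc_failed", "kyc_expired"))
    unknown = sum(1 for s in statuses if s is None)
    rate = round(compliant * 100.0 / total, 2) if total else None
    return {"active_customers": total, "kyc_compliant": compliant,
            "kyc_non_compliant": non_compliant, "kyc_unknown": unknown,
            "compliance_rate_pct": rate}
```
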

Metric 4: International Payment Compliance

Many regulatory frameworks require enhanced due diligence or reporting for cross-border payments above certain thresholds:

CREATE MATERIALIZED VIEW international_payment_monitoring AS
SELECT
    p.customer_id,
    p.destination_country,
    COUNT(*)                         AS intl_payment_count_30d,
    SUM(p.amount)                    AS intl_payment_volume_30d,
    MAX(p.amount)                    AS largest_single_intl_payment,
    l.international_limit,
    SUM(p.amount) / NULLIF(l.international_limit, 0) * 100
                                     AS pct_intl_limit_used
FROM payments p
LEFT JOIN customer_limits l ON p.customer_id = l.customer_id
WHERE p.event_time > NOW() - INTERVAL '30 days'
  AND p.payment_type = 'international'
  AND p.processing_status NOT IN ('failed', 'reversed')
GROUP BY p.customer_id, p.destination_country, l.international_limit;

Metric 5: Operational Compliance Dashboard Aggregates

Roll-up metrics for the compliance team's overview dashboard:

CREATE MATERIALIZED VIEW compliance_dashboard_summary AS
SELECT
    DATE_TRUNC('hour', NOW())          AS snapshot_time,

    -- Today's payment volume
    (SELECT SUM(amount) FROM payments
     WHERE event_time > DATE_TRUNC('day', NOW())
       AND processing_status = 'cleared')                AS total_cleared_volume_today,

    -- Count of limit breaches today
    (SELECT COUNT(*) FROM limit_breach_monitoring
     WHERE limit_status = 'DAILY_LIMIT_EXCEEDED')        AS daily_limit_breaches,

    -- Count of customers approaching limits
    (SELECT COUNT(*) FROM limit_breach_monitoring
     WHERE limit_status = 'APPROACHING_LIMIT')           AS approaching_limit_count,

    -- KYC compliance rate (from live view)
    (SELECT compliance_rate_pct FROM kyc_compliance_rate) AS kyc_compliance_pct,

    -- Failed payments rate (last 1 hour)
    (SELECT ROUND(
        SUM(CASE WHEN processing_status = 'failed' THEN 1 ELSE 0 END)
        * 100.0 / NULLIF(COUNT(*), 0), 2)
     FROM payments
     WHERE event_time > NOW() - INTERVAL '1 hour')       AS payment_failure_rate_1h;

Connecting Compliance Dashboards

Because RisingWave speaks the PostgreSQL wire protocol, any tool that connects to Postgres connects to RisingWave directly:

Grafana:

Data source type: PostgreSQL
Host: risingwave-host:4566
Database: dev
SSL mode: disable

Add a dashboard panel querying compliance_dashboard_summary. Grafana auto-refreshes every N seconds. The data served is always the latest materialized view state.

Metabase, Superset, Redash: Same PostgreSQL connection configuration. No special driver needed.

Custom compliance UI (direct query):

import psycopg2

conn = psycopg2.connect(
    host="risingwave-host",
    port=4566,
    dbname="dev",
    user="root"
)

cur = conn.cursor()
cur.execute("""
    SELECT snapshot_time, total_cleared_volume_today,
           daily_limit_breaches, kyc_compliance_pct
    FROM compliance_dashboard_summary
""")
rows = cur.fetchall()

No ORM, no special client library. Standard PostgreSQL driver.


Alert Routing for Compliance Violations

Critical compliance metrics should push alerts, not just update dashboards:

-- Sink limit breaches to your alerting system
CREATE SINK compliance_limit_breach_alerts AS
SELECT * FROM limit_breach_monitoring
WHERE limit_status IN ('DAILY_LIMIT_EXCEEDED', 'MONTHLY_LIMIT_EXCEEDED')
WITH (
    connector = 'kafka',
    topic = 'compliance-alerts',
    properties.bootstrap.server = 'kafka:9092'
)
FORMAT PLAIN ENCODE JSON;

-- Alert when KYC compliance rate drops below threshold
CREATE SINK kyc_rate_alert_sink AS
SELECT * FROM kyc_compliance_rate
WHERE compliance_rate_pct < 95.0
WITH (
    connector = 'kafka',
    topic = 'compliance-alerts',
    properties.bootstrap.server = 'kafka:9092'
)
FORMAT PLAIN ENCODE JSON;
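On the consuming side, a service reads the compliance-alerts topic and routes each message by severity. A minimal routing sketch, assuming JSON-encoded sink records and a hypothetical severity policy (page on breach, ticket on warning) — a real consumer loop, e.g. with kafka-python, would call this per message:

```python
import json

# Hypothetical severity mapping -- adjust to your alerting policy.
SEVERITY = {
    "DAILY_LIMIT_EXCEEDED": "page",
    "MONTHLY_LIMIT_EXCEEDED": "page",
    "APPROACHING_LIMIT": "ticket",
}

def route_alert(raw):
    """Parse one JSON message from the compliance-alerts topic and decide
    how to route it; unrecognized statuses fall back to log-only."""
    alert = json.loads(raw)
    action = SEVERITY.get(alert.get("limit_status"), "log")
    return {"customer_id": alert.get("customer_id"), "action": action}
```
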

Architecture Comparison: Batch vs. Streaming Compliance Stack

Dimension                    | Batch ETL + Warehouse              | Streaming Database (RisingWave)
-----------------------------|------------------------------------|--------------------------------
Dashboard freshness          | Hours (nightly ETL)                | Seconds
Stack components             | Kafka + ETL + Warehouse + dbt + BI | Kafka + RisingWave + BI
Compliance alert latency     | Hours to next batch                | Sub-second
Infrastructure cost          | High (warehouse + ETL compute)     | Lower (S3-backed, no ETL compute)
Rule/metric change cycle     | Days (dbt model + deployment)      | Seconds (ALTER VIEW)
Historical query capability  | Full historical warehouse          | Rolling windows + S3 archive
BI tool compatibility        | Native warehouse connectors        | PostgreSQL protocol (universal)
Regulatory audit trail       | Warehouse snapshots                | Append-only changelog + S3

The most important practical difference is metric freshness. For regulatory compliance, the difference between "yesterday's data" and "current data" is not just an engineering convenience — it is a material compliance posture difference.


Handling Regulatory Reporting Periods

Many compliance reports require data for specific calendar periods: daily position reports, monthly suspicious activity summaries, quarterly concentration reports. With a streaming database, these are point-in-time queries against the materialized state:

-- Monthly payment volume by type (current month)
SELECT
    payment_type,
    COUNT(*)     AS payment_count,
    SUM(amount)  AS total_volume,
    currency
FROM payments
WHERE event_time >= DATE_TRUNC('month', NOW())
  AND event_time < DATE_TRUNC('month', NOW()) + INTERVAL '1 month'
  AND processing_status = 'cleared'
GROUP BY payment_type, currency
ORDER BY total_volume DESC;

For prior-period reporting (last month, last quarter), RisingWave's time-travel queries and S3-backed storage allow you to reconstruct historical state. For immutable regulatory records, sink materialized view snapshots to S3 at end-of-period — a one-line CREATE SINK to an S3 connector.
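A report generator driving this query needs the same half-open period bounds the SQL computes with DATE_TRUNC and INTERVAL arithmetic. A small Python sketch of those bounds (illustrative helper, not part of the pipeline):

```python
from datetime import date

def month_bounds(today):
    """Compute the half-open [start, end) range equivalent to
    DATE_TRUNC('month', NOW()) .. DATE_TRUNC('month', NOW()) + INTERVAL '1 month'."""
    start = today.replace(day=1)
    if start.month == 12:
        end = start.replace(year=start.year + 1, month=1)
    else:
        end = start.replace(month=start.month + 1)
    return start, end

start, end = month_bounds(date(2024, 12, 15))
# These bounds can then be bound as parameters in the reporting query.
```

The half-open range avoids the classic off-by-one at month end: a payment at exactly midnight on the first of the next month falls into the next period, never both.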


FAQ

Does this approach meet audit and regulatory recordkeeping requirements? The streaming database itself is the live computation layer. For immutable audit records, configure sinks to write all compliance events and alerts to S3 in Parquet or JSON format. This creates an append-only, tamper-evident record that satisfies most regulatory recordkeeping requirements. Consult your compliance counsel for jurisdiction-specific requirements.

Can we backfill historical data into RisingWave for initial dashboard population? Yes. RisingWave supports scan.startup.mode = 'earliest' on Kafka sources, which replays the full Kafka topic history. If data predates your Kafka retention window, you can seed reference tables via SQL INSERT statements or batch ingestion from your data warehouse.

How does this integrate with our existing data warehouse? RisingWave can sink live compliance metrics into your existing warehouse via JDBC or Kafka, keeping the warehouse as the system of record for historical analysis. You gain real-time compliance monitoring without migrating away from your existing warehouse investment.

What happens to the streaming state if RisingWave has an outage? RisingWave checkpoints its materialized view state to S3 continuously. On restart, processing resumes from the last checkpoint. The recovery window is typically under a minute, and the materialized views are populated from the checkpoint state without requiring a full replay.

Can compliance rules be version-controlled and audited? Because all compliance logic is SQL, it lives in your version control system like any other code. View definitions can be reviewed, diffed, and approved through standard pull request workflows. A change to a compliance threshold is a one-line SQL diff — auditable and reversible.
