Open Banking Data Aggregation with RisingWave

Open banking data aggregation with RisingWave means ingesting transaction streams, balance updates, and consent events from multiple bank connectors into a single streaming database, then using materialized views to produce unified customer financial profiles in real time—without polling APIs on a schedule or running nightly batch jobs.

The Open Banking Data Problem

Open banking regulations (PSD2 in Europe, CDR in Australia, Section 1033 in the US) require banks to expose customer data through standardized APIs. Aggregators—personal finance apps, lending platforms, accounting tools—consume these APIs to build a unified view of a user's finances.

The challenge: bank APIs are pull-based and rate-limited. Aggregators traditionally poll on schedules (hourly, daily), cache responses, and run batch reconciliation jobs. This creates stale data, complex cache invalidation logic, and a pipeline that breaks whenever a bank changes its API format.

A better model uses event-driven feeds where available (many open banking platforms now support webhooks or Kafka-compatible streams), feeds everything into RisingWave, and lets materialized views do the aggregation work continuously.

Ingesting Multi-Bank Transaction Feeds

-- Unified transaction stream from multiple bank connectors
CREATE SOURCE bank_transactions (
    event_id            VARCHAR,
    bank_id             VARCHAR,
    account_id          VARCHAR,
    user_id             VARCHAR,
    transaction_id      VARCHAR,
    amount              DECIMAL(18,2),
    currency            VARCHAR(3),
    transaction_type    VARCHAR,    -- 'DEBIT', 'CREDIT'
    merchant_category   VARCHAR,
    description         VARCHAR,
    booked_at           TIMESTAMPTZ,
    ingested_at         TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'open-banking-transactions',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;

-- Account balance snapshots pushed by bank webhooks
CREATE SOURCE account_balances (
    account_id          VARCHAR,
    user_id             VARCHAR,
    bank_id             VARCHAR,
    available_balance   DECIMAL(18,2),
    current_balance     DECIMAL(18,2),
    currency            VARCHAR(3),
    snapshot_time       TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'account-balance-snapshots',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;

Building the Aggregated Customer View

With transactions flowing in, a materialized view can maintain a rolling 30-day spending summary per user per category—updated with every new transaction:

-- Spending summary by user and merchant category (rolling 30 days)
CREATE MATERIALIZED VIEW user_spending_summary AS
SELECT
    user_id,
    merchant_category,
    COUNT(*)                                        AS transaction_count,
    SUM(amount) FILTER (WHERE transaction_type = 'DEBIT')   AS total_spent,
    SUM(amount) FILTER (WHERE transaction_type = 'CREDIT')  AS total_received,
    AVG(amount) FILTER (WHERE transaction_type = 'DEBIT')   AS avg_transaction,
    MAX(booked_at)                                  AS last_transaction_time
FROM bank_transactions
WHERE booked_at >= NOW() - INTERVAL '30 days'
GROUP BY user_id, merchant_category;

-- Latest balance per account. Correlated subqueries are not supported in
-- streaming materialized views, so use the Top-N pattern instead:
-- rank snapshots per account and keep the newest one.
CREATE MATERIALIZED VIEW latest_account_balances AS
SELECT
    user_id,
    bank_id,
    account_id,
    available_balance,
    current_balance,
    currency,
    snapshot_time
FROM (
    SELECT
        *,
        ROW_NUMBER() OVER (
            PARTITION BY account_id
            ORDER BY snapshot_time DESC
        ) AS snapshot_rank
    FROM account_balances
) ranked
WHERE snapshot_rank = 1;

-- Unified customer financial profile
CREATE MATERIALIZED VIEW customer_financial_profile AS
SELECT
    lab.user_id,
    SUM(lab.available_balance)              AS total_available,
    SUM(lab.current_balance)                AS total_current,
    COUNT(DISTINCT lab.account_id)          AS linked_accounts,
    COUNT(DISTINCT lab.bank_id)             AS linked_banks,
    uss.merchant_category,
    uss.total_spent,
    uss.transaction_count
FROM latest_account_balances lab
LEFT JOIN user_spending_summary uss
    ON lab.user_id = uss.user_id
GROUP BY lab.user_id, uss.merchant_category, uss.total_spent, uss.transaction_count;

Consent-Aware Aggregation

Open banking requires active consent management. Aggregators must stop processing data when a user revokes access. RisingWave can handle consent events as a source and filter transaction processing accordingly:

-- Reference table: active consents (updated via CDC from consent service)
CREATE TABLE active_consents (
    consent_id      VARCHAR PRIMARY KEY,
    user_id         VARCHAR,
    bank_id         VARCHAR,
    scope           VARCHAR,
    granted_at      TIMESTAMPTZ,
    expires_at      TIMESTAMPTZ
);

-- Only aggregate transactions for users with active consent
CREATE MATERIALIZED VIEW consented_spending_summary AS
SELECT
    bt.user_id,
    bt.bank_id,
    bt.merchant_category,
    SUM(bt.amount) FILTER (WHERE bt.transaction_type = 'DEBIT') AS total_spent,
    COUNT(*)                                                     AS tx_count
FROM bank_transactions bt
JOIN active_consents ac
    ON bt.user_id = ac.user_id
    AND bt.bank_id = ac.bank_id
    AND ac.expires_at > NOW()
WHERE bt.booked_at >= NOW() - INTERVAL '90 days'
GROUP BY bt.user_id, bt.bank_id, bt.merchant_category;
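
The active_consents table above can be kept in sync with the consent service's own database via RisingWave's Postgres CDC connector, so revocations propagate automatically: when the upstream row is deleted, the CDC delete flows through and the user's rows drop out of consented_spending_summary. A minimal sketch, with illustrative hostname, credentials, and database names:

-- Sketch: mirror the consent service's table via Postgres CDC.
-- Connection parameters below are assumptions, not real endpoints.
CREATE TABLE active_consents (
    consent_id      VARCHAR PRIMARY KEY,
    user_id         VARCHAR,
    bank_id         VARCHAR,
    scope           VARCHAR,
    granted_at      TIMESTAMPTZ,
    expires_at      TIMESTAMPTZ
) WITH (
    connector = 'postgres-cdc',
    hostname = 'consent-db',
    port = '5432',
    username = 'rw_user',
    password = 'rw_password',
    database.name = 'consents',
    schema.name = 'public',
    table.name = 'active_consents'
);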

Windowed Anomaly Detection

Personal finance apps benefit from detecting unusual spending within short windows:

-- Hourly transaction volume per user (for anomaly alerting)
CREATE MATERIALIZED VIEW hourly_tx_volume AS
SELECT
    window_start,
    window_end,
    user_id,
    COUNT(*)                AS tx_count,
    SUM(amount)             AS total_amount,
    MAX(amount)             AS max_single_tx
FROM TUMBLE(bank_transactions, booked_at, INTERVAL '1 HOUR')
GROUP BY window_start, window_end, user_id;
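
An alerting layer can then compare each user's hourly windows against their own historical baseline. A sketch of one approach; the 3x multiplier is an illustrative threshold, not a recommendation:

-- Flag windows where a user's hourly spend far exceeds their average.
CREATE MATERIALIZED VIEW spending_anomalies AS
SELECT
    h.user_id,
    h.window_start,
    h.total_amount,
    b.avg_hourly_amount
FROM hourly_tx_volume h
JOIN (
    SELECT user_id, AVG(total_amount) AS avg_hourly_amount
    FROM hourly_tx_volume
    GROUP BY user_id
) b ON h.user_id = b.user_id
WHERE h.total_amount > 3 * b.avg_hourly_amount;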

Comparison: Polling vs. Streaming Aggregation

| Dimension | Polling-Based Aggregation | Streaming with RisingWave |
| --- | --- | --- |
| Data freshness | Hours (batch) or minutes (frequent polling) | Seconds to milliseconds |
| API rate limit risk | High (constant polling) | Low (event-driven ingestion) |
| Consent enforcement | Manual filter at query time | Built into materialized view joins |
| Infrastructure | Scheduler + cache + ETL + warehouse | Single streaming database |
| Spending analytics latency | Next batch run | Immediate on new transaction |
| Scalability | Constrained by polling frequency limits | Scales with Kafka partition count |

FAQ

Q: Most bank APIs are REST, not Kafka. How do we get events into RisingWave? A: Use an integration layer—tools like Airbyte, Meroxa, or a custom webhook receiver—to publish bank API responses to Kafka topics. RisingWave then ingests from Kafka. Alternatively, open banking platforms like TrueLayer, Plaid, and Nordigen offer webhook push notifications that can feed a Kafka topic via a lightweight adapter.

Q: How does RisingWave handle currency conversion for multi-currency accounts? A: Maintain an fx_rates table (via CREATE TABLE) with currency pairs and rates, updated via CDC or a scheduled upsert. Join bank_transactions with fx_rates in your materialized views to normalize all amounts to a base currency.
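
A sketch of that pattern, assuming a hypothetical fx_rates table with USD as the base currency and an external feed performing the upserts:

-- Hypothetical FX reference table, upserted by an external rate feed.
CREATE TABLE fx_rates (
    currency        VARCHAR(3) PRIMARY KEY,
    rate_to_usd     DECIMAL(18,8),  -- multiply amount by this to get USD
    updated_at      TIMESTAMPTZ
);

-- Normalize every transaction amount to USD before aggregating.
CREATE MATERIALIZED VIEW transactions_usd AS
SELECT
    bt.user_id,
    bt.transaction_id,
    bt.amount * fx.rate_to_usd AS amount_usd,
    bt.booked_at
FROM bank_transactions bt
JOIN fx_rates fx ON bt.currency = fx.currency;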

Q: Can we use RisingWave as the backend for a real-time personal finance dashboard? A: Yes. Materialized views are queryable via the PostgreSQL wire protocol. A Next.js or React frontend can query customer_financial_profile directly through a Postgres client. For live updates, a polling interval of 1-2 seconds on a pre-computed view is extremely cheap.
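
For example, a dashboard backend could read one user's profile with an ordinary Postgres query (the user_id value below is illustrative):

-- Served over the Postgres wire protocol; any pg client works.
SELECT merchant_category, total_spent, transaction_count,
       total_available, linked_accounts
FROM customer_financial_profile
WHERE user_id = 'user-123'
ORDER BY total_spent DESC;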

Q: What if a bank sends duplicate transactions (common with webhook retries)? A: Add a deduplication step. Use a materialized view that groups by transaction_id and takes MIN(ingested_at) to deduplicate before aggregating. Or use RisingWave's CREATE TABLE with a primary key to upsert transactions by transaction_id.
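
The view-based option can reuse the same Top-N pattern shown for balances: rank copies of each transaction_id by ingest time and keep the first. A sketch:

-- Keep only the first-ingested copy of each transaction_id.
CREATE MATERIALIZED VIEW transactions_dedup AS
SELECT
    transaction_id,
    user_id,
    amount,
    transaction_type,
    booked_at
FROM (
    SELECT
        *,
        ROW_NUMBER() OVER (
            PARTITION BY transaction_id
            ORDER BY ingested_at ASC
        ) AS copy_rank
    FROM bank_transactions
) ranked
WHERE copy_rank = 1;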

Q: How do we handle GDPR right-to-erasure requests? A: Mark deleted users in a reference table and filter them out of all materialized views. For physical deletion, drop and recreate views after purging the underlying source data from Kafka (via retention policies) and any CDC source tables.
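
A sketch of the filtering half, assuming a hypothetical erased_users reference table maintained by the privacy service:

-- Hypothetical reference table of users who requested erasure.
CREATE TABLE erased_users (
    user_id     VARCHAR PRIMARY KEY,
    erased_at   TIMESTAMPTZ
);

-- Anti-join: exclude erased users from downstream aggregation.
CREATE MATERIALIZED VIEW gdpr_filtered_transactions AS
SELECT bt.*
FROM bank_transactions bt
LEFT JOIN erased_users eu ON bt.user_id = eu.user_id
WHERE eu.user_id IS NULL;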

