Open banking data aggregation with RisingWave means ingesting transaction streams, balance updates, and consent events from multiple bank connectors into a single streaming database, then using materialized views to produce unified customer financial profiles in real time. No polling APIs on a schedule, no nightly batch jobs.
The Open Banking Data Problem
Open banking regulations (PSD2 in Europe, CDR in Australia, Section 1033 in the US) require banks to expose customer data through standardized APIs. Aggregators—personal finance apps, lending platforms, accounting tools—consume these APIs to build a unified view of a user's finances.
The challenge: bank APIs are pull-based and rate-limited. Aggregators traditionally poll on schedules (hourly, daily), cache responses, and run batch reconciliation jobs. This creates stale data, complex cache invalidation logic, and a pipeline that breaks whenever a bank changes its API format.
A better model uses event-driven feeds where available (many open banking platforms now support webhooks or Kafka-compatible streams), feeds everything into RisingWave, and lets materialized views do the aggregation work continuously.
Ingesting Multi-Bank Transaction Feeds
-- Unified transaction stream from multiple bank connectors
CREATE SOURCE bank_transactions (
    event_id VARCHAR,
    bank_id VARCHAR,
    account_id VARCHAR,
    user_id VARCHAR,
    transaction_id VARCHAR,
    amount DECIMAL(18, 2),
    currency VARCHAR(3),
    transaction_type VARCHAR,    -- 'DEBIT' or 'CREDIT'
    merchant_category VARCHAR,
    description VARCHAR,
    booked_at TIMESTAMPTZ,
    ingested_at TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'open-banking-transactions',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;
-- Account balance snapshots pushed by bank webhooks
CREATE SOURCE account_balances (
    account_id VARCHAR,
    user_id VARCHAR,
    bank_id VARCHAR,
    available_balance DECIMAL(18, 2),
    current_balance DECIMAL(18, 2),
    currency VARCHAR(3),
    snapshot_time TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'account-balance-snapshots',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;
Building the Aggregated Customer View
With transactions flowing in, a materialized view can maintain a rolling 30-day spending summary per user per category—updated with every new transaction:
-- Spending summary by user and merchant category (rolling 30 days)
CREATE MATERIALIZED VIEW user_spending_summary AS
SELECT
    user_id,
    merchant_category,
    COUNT(*) AS transaction_count,
    SUM(amount) FILTER (WHERE transaction_type = 'DEBIT') AS total_spent,
    SUM(amount) FILTER (WHERE transaction_type = 'CREDIT') AS total_received,
    AVG(amount) FILTER (WHERE transaction_type = 'DEBIT') AS avg_transaction,
    MAX(booked_at) AS last_transaction_time
FROM bank_transactions
-- Temporal filter: rows older than 30 days age out of the view automatically
WHERE booked_at >= NOW() - INTERVAL '30 days'
GROUP BY user_id, merchant_category;
-- Latest balance per account: rank snapshots per account and keep the newest.
-- This Top-N-per-group pattern (ROW_NUMBER = 1) is the idiomatic way to
-- express "latest row per key" in a streaming materialized view.
CREATE MATERIALIZED VIEW latest_account_balances AS
SELECT
    user_id,
    bank_id,
    account_id,
    available_balance,
    current_balance,
    currency,
    snapshot_time
FROM (
    SELECT
        *,
        ROW_NUMBER() OVER (
            PARTITION BY account_id
            ORDER BY snapshot_time DESC
        ) AS rn
    FROM account_balances
) ranked
WHERE rn = 1;
-- Unified customer financial profile: one row per user and spending category.
-- Balance totals are aggregated per user before the join, so the per-category
-- fan-out cannot distort them.
CREATE MATERIALIZED VIEW customer_financial_profile AS
SELECT
    bal.user_id,
    bal.total_available,
    bal.total_current,
    bal.linked_accounts,
    bal.linked_banks,
    uss.merchant_category,
    uss.total_spent,
    uss.transaction_count
FROM (
    SELECT
        user_id,
        SUM(available_balance) AS total_available,
        SUM(current_balance) AS total_current,
        COUNT(DISTINCT account_id) AS linked_accounts,
        COUNT(DISTINCT bank_id) AS linked_banks
    FROM latest_account_balances
    GROUP BY user_id
) bal
LEFT JOIN user_spending_summary uss
    ON bal.user_id = uss.user_id;
Consent and Compliance Tracking
Open banking requires active consent management. Aggregators must stop processing data when a user revokes access. RisingWave can handle consent events as a source and filter transaction processing accordingly:
-- Reference table: active consents (updated via CDC from consent service)
CREATE TABLE active_consents (
    consent_id VARCHAR PRIMARY KEY,
    user_id VARCHAR,
    bank_id VARCHAR,
    scope VARCHAR,
    granted_at TIMESTAMPTZ,
    expires_at TIMESTAMPTZ
);
-- Only aggregate transactions for users with active consent
CREATE MATERIALIZED VIEW consented_spending_summary AS
SELECT
    bt.user_id,
    bt.bank_id,
    bt.merchant_category,
    SUM(bt.amount) FILTER (WHERE bt.transaction_type = 'DEBIT') AS total_spent,
    COUNT(*) AS tx_count
FROM bank_transactions bt
JOIN active_consents ac
    ON bt.user_id = ac.user_id
    AND bt.bank_id = ac.bank_id
-- Temporal filters in the WHERE clause: rows drop out when consent expires
-- or transactions age past 90 days
WHERE ac.expires_at > NOW()
  AND bt.booked_at >= NOW() - INTERVAL '90 days'
GROUP BY bt.user_id, bt.bank_id, bt.merchant_category;
Windowed Anomaly Detection
Personal finance apps benefit from detecting unusual spending within short windows:
-- Hourly transaction volume per user (for anomaly alerting)
CREATE MATERIALIZED VIEW hourly_tx_volume AS
SELECT
    window_start,
    window_end,
    user_id,
    COUNT(*) AS tx_count,
    SUM(amount) AS total_amount,
    MAX(amount) AS max_single_tx
FROM TUMBLE(bank_transactions, booked_at, INTERVAL '1 HOUR')
GROUP BY window_start, window_end, user_id;
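The hourly rollup becomes useful once it is compared against each user's own baseline. One possible sketch layered on `hourly_tx_volume`: flag any hour whose total spend exceeds three times the user's average hourly spend (the 3x threshold and the view name `spending_anomalies` are illustrative choices, not prescriptions):

```sql
-- Flag hours where a user's spend far exceeds their typical hourly total
CREATE MATERIALIZED VIEW spending_anomalies AS
SELECT
    cur.user_id,
    cur.window_start,
    cur.total_amount,
    hist.avg_hourly_amount
FROM hourly_tx_volume cur
JOIN (
    -- Per-user baseline over all observed hours
    SELECT user_id, AVG(total_amount) AS avg_hourly_amount
    FROM hourly_tx_volume
    GROUP BY user_id
) hist
    ON cur.user_id = hist.user_id
WHERE cur.total_amount > 3 * hist.avg_hourly_amount;
```

An alerting service can then poll or subscribe to this view rather than re-scanning raw transactions.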
Comparison: Polling vs. Streaming Aggregation
| Dimension | Polling-Based Aggregation | Streaming with RisingWave |
|---|---|---|
| Data freshness | Hours (batch) or minutes (frequent polling) | Seconds to milliseconds |
| API rate limit risk | High (constant polling) | Low (event-driven ingestion) |
| Consent enforcement | Manual filter at query time | Built into materialized view joins |
| Infrastructure | Scheduler + cache + ETL + warehouse | Single streaming database |
| Spending analytics latency | Next batch run | Immediate on new transaction |
| Scalability | Scales with polling frequency limits | Scales with Kafka partition count |
FAQ
Q: Most bank APIs are REST, not Kafka. How do we get events into RisingWave?
A: Use an integration layer (tools like Airbyte, Meroxa, or a custom webhook receiver) to publish bank API responses to Kafka topics. RisingWave then ingests from Kafka. Alternatively, open banking platforms like TrueLayer, Plaid, and Nordigen offer webhook push notifications that can feed a Kafka topic via a lightweight adapter.
Q: How does RisingWave handle currency conversion for multi-currency accounts?
A: Maintain an fx_rates reference table with currency pairs and rates, updated via CDC or a scheduled upsert. Join bank_transactions with fx_rates in your materialized views to normalize all amounts to a base currency.
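A minimal sketch of that pattern, assuming rates are stored per quote currency as a multiplier into the base currency (the table shape, column names, and view name are illustrative):

```sql
-- Hypothetical FX reference table, upserted by an external rates job
CREATE TABLE fx_rates (
    currency VARCHAR(3) PRIMARY KEY,   -- transaction currency
    rate_to_base DECIMAL(18, 8),       -- multiply to convert into the base currency
    updated_at TIMESTAMPTZ
);

-- Normalize all transaction amounts to the base currency
CREATE MATERIALIZED VIEW transactions_base_ccy AS
SELECT
    bt.user_id,
    bt.transaction_id,
    bt.amount * fx.rate_to_base AS amount_base,
    bt.currency AS original_currency,
    bt.booked_at
FROM bank_transactions bt
JOIN fx_rates fx
    ON bt.currency = fx.currency;
```

When a rate row is upserted, the streaming join revises the affected converted amounts; downstream aggregates over `amount_base` stay consistent with the latest rates.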
Q: Can we use RisingWave as the backend for a real-time personal finance dashboard?
A: Yes. Materialized views are queryable via the PostgreSQL wire protocol. A Next.js or React frontend can query customer_financial_profile directly through a Postgres client. For live updates, a polling interval of 1-2 seconds on a pre-computed view is extremely cheap.
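A dashboard backend query against the pre-computed view is then an ordinary Postgres-protocol read; for example (with 'user-123' as a hypothetical user ID):

```sql
-- Served from the materialized view's state, not recomputed per request
SELECT merchant_category, total_spent, total_available
FROM customer_financial_profile
WHERE user_id = 'user-123';
```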
Q: What if a bank sends duplicate transactions (common with webhook retries)?
A: Add a deduplication step. Use a materialized view that partitions by transaction_id and keeps only the earliest-ingested row per transaction (ROW_NUMBER() ordered by ingested_at), then build aggregates on the deduplicated view. Alternatively, ingest into a RisingWave table with transaction_id as the primary key so webhook retries upsert instead of duplicating.
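A sketch of the materialized-view approach (the view name `deduped_transactions` is illustrative):

```sql
-- Keep the first-ingested copy of each transaction; webhook retries are dropped
CREATE MATERIALIZED VIEW deduped_transactions AS
SELECT
    event_id, bank_id, account_id, user_id, transaction_id,
    amount, currency, transaction_type, merchant_category,
    description, booked_at, ingested_at
FROM (
    SELECT
        *,
        ROW_NUMBER() OVER (
            PARTITION BY transaction_id
            ORDER BY ingested_at ASC
        ) AS rn
    FROM bank_transactions
) t
WHERE rn = 1;
```

Downstream views such as spending summaries should then read from `deduped_transactions` instead of the raw source so retries never inflate totals.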
Q: How do we handle GDPR right-to-erasure requests?
A: Mark deleted users in a reference table and filter them out of all materialized views. For physical deletion, drop and recreate views after purging the underlying source data from Kafka (via retention policies) and any CDC source tables.
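The logical-filtering half of that answer can be sketched as an anti-join against a marker table (the names `erased_users` and `gdpr_filtered_transactions` are hypothetical):

```sql
-- Marker table: users who have requested erasure
CREATE TABLE erased_users (
    user_id VARCHAR PRIMARY KEY,
    requested_at TIMESTAMPTZ
);

-- Anti-join: exclude marked users from downstream processing
CREATE MATERIALIZED VIEW gdpr_filtered_transactions AS
SELECT bt.*
FROM bank_transactions bt
LEFT JOIN erased_users eu
    ON bt.user_id = eu.user_id
WHERE eu.user_id IS NULL;
```

Inserting a row into `erased_users` removes that user's transactions from the filtered view immediately; physical purging of Kafka and CDC sources still has to happen separately.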
Get Started
- Explore the RisingWave documentation to set up your first open banking aggregation pipeline.
- Join the RisingWave Slack community to connect with engineers building similar systems.