Apache Iceberg for Financial Data: Building a Streaming Lakehouse

Financial institutions can build a streaming lakehouse with Apache Iceberg and RisingWave to achieve sub-second fraud detection, real-time risk aggregation, and immutable audit trails — all on cost-effective object storage. Iceberg's ACID transactions satisfy regulatory requirements for data immutability, while RisingWave's continuous SQL keeps risk models current without batch job delays.

The Financial Data Challenge

Financial data has uniquely demanding requirements that expose the weaknesses of traditional architectures:

  • Latency: Fraud signals must trigger within milliseconds of a transaction. Hourly batch jobs are useless.
  • Auditability: Regulators require complete, immutable records. Every transaction must be recoverable exactly as it occurred.
  • Volume: Large banks process millions of transactions per hour. Storage costs compound quickly.
  • Consistency: Risk calculations must reflect the same underlying data — no phantom reads, no dirty data.

Traditional approaches force a painful tradeoff: either run a fast but expensive warehouse (Snowflake, Redshift) or accept staleness with a cheap data lake. Apache Iceberg's snapshot isolation and RisingWave's streaming computation eliminate this tradeoff.

Architecture for Financial Streaming Lakehouse

| Component         | Technology            | Purpose                      |
|-------------------|-----------------------|------------------------------|
| Transaction feed  | Kafka (MSK/Confluent) | Raw transaction events       |
| Reference data    | PostgreSQL CDC        | Customer profiles, limits    |
| Stream processor  | RisingWave            | Continuous risk calculations |
| Lakehouse format  | Apache Iceberg        | ACID-compliant storage       |
| Object storage    | S3 / GCS              | Cheap, durable storage       |
| Query engine      | Trino / Athena        | Regulatory reporting         |
| Alerting          | Kafka sink            | Real-time fraud alerts       |

Ingesting Transaction Data

Start by connecting RisingWave to the transaction feed and customer reference data:

-- Raw transaction stream from Kafka
CREATE SOURCE transactions_raw (
    txn_id          VARCHAR PRIMARY KEY,
    account_id      BIGINT,
    merchant_id     BIGINT,
    amount          NUMERIC(18, 4),
    currency        CHAR(3),
    channel         VARCHAR,   -- 'online', 'pos', 'atm'
    country_code    CHAR(2),
    txn_time        TIMESTAMPTZ
)
WITH (
    connector        = 'kafka',
    topic            = 'financial.transactions',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'earliest'
)
FORMAT PLAIN ENCODE JSON;

-- Customer reference data via CDC (a table with a primary key,
-- so it can back temporal joins)
CREATE TABLE customer_profiles (
    account_id       BIGINT PRIMARY KEY,
    customer_tier    VARCHAR,
    home_country     CHAR(2),
    daily_limit      NUMERIC(18, 4),
    is_active        BOOLEAN,
    updated_at       TIMESTAMPTZ
)
WITH (
    connector      = 'postgres-cdc',
    hostname       = 'postgres.internal',
    port           = '5432',
    username       = 'cdc_user',
    password       = 'secret',          -- use a secret store in production
    database.name  = 'customer_db',
    schema.name    = 'public',
    table.name     = 'accounts'
);

Real-Time Fraud Signal Detection

Build a materialized view that computes per-account rolling transaction velocity — a key fraud signal:

CREATE MATERIALIZED VIEW account_tx_velocity AS
SELECT
    account_id,
    window_start,
    window_end,
    COUNT(*)                          AS txn_count_1hr,
    SUM(amount)                       AS total_amount_1hr,
    COUNT(DISTINCT merchant_id)       AS unique_merchants,
    COUNT(DISTINCT country_code)      AS unique_countries,
    MAX(amount)                       AS max_single_txn,
    ARRAY_AGG(channel ORDER BY txn_time) AS channel_sequence
FROM HOP(transactions_raw, txn_time, INTERVAL '5 MINUTES', INTERVAL '1 HOUR')
GROUP BY account_id, window_start, window_end;

HOP() is a sliding (hopping) window: with a 5-minute hop and a 1-hour window size, a new window opens every 5 minutes, and each event lands in 12 overlapping windows. The result is a continuously maintained "last 1 hour" view for every account, advancing in 5-minute steps.

Now join with customer profiles to enrich with limits and flag anomalies:

CREATE MATERIALIZED VIEW fraud_signals AS
SELECT
    v.account_id,
    v.window_end                                    AS signal_time,
    v.txn_count_1hr,
    v.total_amount_1hr,
    v.unique_countries,
    c.daily_limit,
    c.home_country,
    CASE
        WHEN v.unique_countries > 2                          THEN 'MULTI_COUNTRY'
        WHEN v.total_amount_1hr > c.daily_limit * 0.8       THEN 'NEAR_LIMIT'
        WHEN v.txn_count_1hr > 30                           THEN 'HIGH_VELOCITY'
        ELSE 'NORMAL'
    END AS risk_label
FROM account_tx_velocity v
JOIN customer_profiles FOR SYSTEM_TIME AS OF PROCTIME() c
    ON v.account_id = c.account_id;

The FOR SYSTEM_TIME AS OF PROCTIME() clause makes this a temporal join — each velocity row is matched against the customer profile as it exists when the row is processed, and the joined result is never retracted when the profile later changes. This stability matters for auditability: the fraud label recorded at signal time remains exactly what the model saw, even after limits or tiers are updated.
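For interactive triage, the materialized view can be queried like any table. A quick example (the LIMIT is illustrative):

```sql
-- Most recent non-normal signals, newest first
SELECT account_id, signal_time, risk_label, total_amount_1hr
FROM fraud_signals
WHERE risk_label <> 'NORMAL'
ORDER BY signal_time DESC
LIMIT 20;
```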

Sinking to Iceberg for Audit and Reporting

Write both the velocity metrics and fraud signals to Iceberg for long-term retention and regulatory reporting:

-- Immutable audit trail — append-only
CREATE SINK transactions_audit_sink AS
SELECT
    txn_id, account_id, merchant_id,
    amount, currency, channel,
    country_code, txn_time
FROM transactions_raw
WITH (
    connector      = 'iceberg',
    type           = 'append-only',
    catalog.type   = 'rest',
    catalog.uri    = 'http://iceberg-catalog:8181',
    warehouse.path = 's3://financial-lakehouse/warehouse',
    s3.region      = 'us-east-1',
    database.name  = 'audit',
    table.name     = 'transactions'
);

-- Fraud signals — upsert (signals can be revised)
CREATE SINK fraud_signals_sink AS
SELECT * FROM fraud_signals
WITH (
    connector      = 'iceberg',
    type           = 'upsert',
    primary_key    = 'account_id,signal_time',
    catalog.type   = 'rest',
    catalog.uri    = 'http://iceberg-catalog:8181',
    warehouse.path = 's3://financial-lakehouse/warehouse',
    s3.region      = 'us-east-1',
    database.name  = 'risk',
    table.name     = 'fraud_signals'
);

The transactions table uses append-only mode — financial records must be immutable. The fraud signals table uses upsert because risk labels can be revised as more context arrives.

Regulatory Compliance with Iceberg Time Travel

Iceberg's snapshot history is the compliance team's best friend. Every SELECT against an Iceberg table can be scoped to a specific snapshot ID or timestamp:

-- Query data as of end of Q4 2025 (in Trino)
SELECT COUNT(*)
FROM financial.audit.transactions
FOR TIMESTAMP AS OF TIMESTAMP '2025-12-31 23:59:59 UTC'
WHERE country_code <> 'US';  -- cross-border, assuming a US-domiciled book

This enables exact point-in-time reconstruction of any dataset — essential for MiFID II, SOX, and FINRA audits. The Iceberg snapshot log provides an immutable chain of custody from ingestion to query.
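Queries can also be pinned to an explicit snapshot ID rather than a timestamp. A sketch in Trino, using the Iceberg `$snapshots` metadata table (the snapshot ID value is illustrative):

```sql
-- List available snapshots for the audit table
SELECT snapshot_id, committed_at, operation
FROM financial.audit."transactions$snapshots"
ORDER BY committed_at DESC;

-- Pin a query to one specific snapshot
SELECT COUNT(*)
FROM financial.audit.transactions
FOR VERSION AS OF 8954621489712345678;
```

Pinning by snapshot ID is the stronger audit practice: a timestamp resolves to a snapshot, but the ID names it unambiguously.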

FAQ

Q: How do we ensure transactions are never lost or duplicated in the Iceberg audit table? A: RisingWave's exactly-once sink protocol (two-phase commit with Iceberg's atomic snapshot) guarantees no duplicates. The append-only mode combined with primary key deduplication at the source provides a complete, non-duplicated audit trail.

Q: Can we use Iceberg for both real-time queries and historical batch reporting? A: Yes. Iceberg's time travel lets Trino and Spark query historical snapshots. RisingWave's lakehouse queries (v2.8+) allow real-time validation against the same tables your batch reports use.

Q: How do we handle PII and GDPR right-to-erasure in an immutable Iceberg table? A: GDPR erasure in Iceberg is handled through rewrite operations that produce new data files with PII redacted, plus expiration of old snapshots containing the original data. This is a planned maintenance task, not a streaming concern.
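Such an erasure task might look like the following Spark SQL sketch, using Iceberg's standard maintenance procedures (the catalog name `lakehouse` and the account ID are illustrative; redaction via UPDATE works the same way as deletion):

```sql
-- Remove the data subject's rows (Spark SQL with Iceberg extensions)
DELETE FROM lakehouse.audit.transactions WHERE account_id = 42;

-- Rewrite data files so deleted rows are physically removed
CALL lakehouse.system.rewrite_data_files(table => 'audit.transactions');

-- Expire old snapshots that still reference the original files
CALL lakehouse.system.expire_snapshots(
  table => 'audit.transactions',
  older_than => TIMESTAMP '2026-01-01 00:00:00'
);
```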

Q: What is the typical end-to-end latency from transaction to Iceberg? A: With a 30-second checkpoint interval, end-to-end latency is typically 30–90 seconds. For sub-second fraud alerting, use a parallel Kafka sink from RisingWave alongside the Iceberg sink for the audit trail.
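That parallel alert path might be sketched like this, reusing the fraud_signals view defined earlier (topic name is illustrative):

```sql
-- Low-latency alert path, parallel to the Iceberg audit sink
CREATE SINK fraud_alerts_sink AS
SELECT account_id, signal_time, risk_label, total_amount_1hr
FROM fraud_signals
WHERE risk_label <> 'NORMAL'
WITH (
    connector = 'kafka',
    topic     = 'financial.fraud-alerts',
    properties.bootstrap.server = 'kafka:9092'
)
FORMAT PLAIN ENCODE JSON;
```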

Q: Can RisingWave handle multi-currency normalization before writing to Iceberg? A: Yes. Create a reference table with exchange rates and join it in your materialized view using FOR SYSTEM_TIME AS OF for point-in-time rate lookups.
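A minimal sketch of that pattern, assuming an upsert-maintained rates table keyed by currency (table and column names are hypothetical):

```sql
-- Exchange rates, kept current via CDC or direct upserts
CREATE TABLE fx_rates (
    currency   CHAR(3) PRIMARY KEY,
    usd_rate   NUMERIC(18, 8),
    updated_at TIMESTAMPTZ
);

-- Normalize every transaction to USD at the rate in effect when processed
CREATE MATERIALIZED VIEW transactions_usd AS
SELECT
    t.txn_id,
    t.account_id,
    t.amount * r.usd_rate AS amount_usd,
    t.txn_time
FROM transactions_raw t
JOIN fx_rates FOR SYSTEM_TIME AS OF PROCTIME() r
    ON t.currency = r.currency;
```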

Get Started

Build your financial streaming lakehouse with RisingWave and Iceberg.
