Real-Time Customer 360 for Banking with Streaming SQL

A real-time customer 360 for banking with streaming SQL means maintaining a continuously updated unified customer profile—combining transactions, product holdings, service interactions, and risk signals—as a set of materialized views that any downstream system can query via a standard PostgreSQL interface, without batch jobs or stale caches.

What Is Customer 360 in Banking and Why Does It Need to Be Real-Time?

Customer 360 is the banking industry's term for a unified, comprehensive view of a customer across all products, channels, and interactions. A full Customer 360 answers questions like: What products does this customer hold? What is their credit utilization? Have they called the contact center recently? Are they at risk of churning?

Traditional Customer 360 implementations are data warehouse projects—ETL pipelines that consolidate data nightly into a customer master table. The problem is that real-time use cases—next-best-offer at the ATM, fraud check during a card transaction, relationship manager dashboard during a live call—require profile freshness measured in seconds, not hours.

Streaming SQL changes the architecture: instead of moving data to a warehouse, you maintain a set of incrementally updated materialized views over streaming events. The profile stays current because RisingWave, the streaming database used in the examples here, updates those views as each event arrives.

Ingesting Multi-Source Customer Data

-- Transaction stream from core banking
CREATE SOURCE banking_transactions (
    transaction_id      VARCHAR,
    customer_id         VARCHAR,
    product_id          VARCHAR,
    product_type        VARCHAR,    -- 'CHECKING', 'SAVINGS', 'MORTGAGE', 'CARD', 'LOAN'
    transaction_type    VARCHAR,
    amount              DECIMAL(18,2),
    currency            VARCHAR(3),
    merchant_category   VARCHAR,
    channel             VARCHAR,    -- 'BRANCH', 'ATM', 'MOBILE', 'WEB', 'POS'
    tx_time             TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'banking-transactions',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;

-- Customer service interactions via CDC.
-- A postgres-cdc connector is declared with CREATE TABLE and a primary key so that
-- upstream updates and deletes can be applied; it takes no FORMAT ... ENCODE clause.
-- Assumption: interaction_id is the primary key of the upstream table.
CREATE TABLE service_interactions (
    interaction_id      VARCHAR,
    customer_id         VARCHAR,
    channel             VARCHAR,    -- 'CALL_CENTER', 'CHAT', 'BRANCH', 'EMAIL'
    reason_code         VARCHAR,
    sentiment_score     DECIMAL(3,2),
    resolved            BOOLEAN,
    interaction_time    TIMESTAMPTZ,
    PRIMARY KEY (interaction_id)
) WITH (
    connector = 'postgres-cdc',
    hostname = 'crm-db.internal',
    port = '5432',
    username = 'rw_reader',
    password = 'secret',
    database.name = 'crm',
    schema.name = 'public',
    table.name = 'interactions'
);

Building Core Customer 360 Views

-- Product holdings reference table
CREATE TABLE product_holdings (
    customer_id         VARCHAR,
    product_id          VARCHAR,
    product_type        VARCHAR,
    product_status      VARCHAR,
    opened_date         DATE,
    credit_limit        DECIMAL(18,2),
    outstanding_balance DECIMAL(18,2),
    PRIMARY KEY (customer_id, product_id)
);

-- 90-day transaction behavior profile per customer
CREATE MATERIALIZED VIEW customer_transaction_profile AS
SELECT
    customer_id,
    COUNT(*)                                                        AS tx_count_90d,
    SUM(ABS(amount))                                                AS total_activity_volume,
    AVG(amount) FILTER (WHERE amount < 0)                          AS avg_debit_amount,
    COUNT(DISTINCT product_type)                                    AS active_product_types,
    COUNT(DISTINCT channel)                                         AS channels_used,
    COUNT(*) FILTER (WHERE channel = 'MOBILE')                     AS mobile_tx_count,
    COUNT(*) FILTER (WHERE channel = 'BRANCH')                     AS branch_tx_count,
    MAX(tx_time)                                                    AS last_activity_time,
    COUNT(DISTINCT merchant_category)                               AS spending_diversity
FROM banking_transactions
WHERE tx_time >= NOW() - INTERVAL '90 days'
GROUP BY customer_id;

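-- Recent service interaction summary. In a streaming query NOW() is accepted only as a
-- temporal filter (WHERE/HAVING/ON), so each window is a filtered CTE instead of an
-- aggregate FILTER clause. As a result, last_contact_time covers the last 90 days only.
CREATE MATERIALIZED VIEW customer_service_profile AS
WITH w90 AS (
    SELECT customer_id,
           COUNT(*)                                          AS contacts_90d,
           COUNT(*) FILTER (WHERE reason_code = 'COMPLAINT') AS complaints_90d,
           MAX(interaction_time)                             AS last_contact_time
    FROM service_interactions
    WHERE interaction_time >= NOW() - INTERVAL '90 days'
    GROUP BY customer_id
), w30 AS (
    SELECT customer_id, COUNT(*) AS contacts_30d, AVG(sentiment_score) AS avg_recent_sentiment
    FROM service_interactions
    WHERE interaction_time >= NOW() - INTERVAL '30 days'
    GROUP BY customer_id
), w7 AS (
    SELECT customer_id, COUNT(*) FILTER (WHERE NOT resolved) AS open_issues_7d
    FROM service_interactions
    WHERE interaction_time >= NOW() - INTERVAL '7 days'
    GROUP BY customer_id
)
SELECT w90.customer_id,
       COALESCE(w30.contacts_30d, 0) AS contacts_30d, w90.contacts_90d,
       w30.avg_recent_sentiment, COALESCE(w7.open_issues_7d, 0) AS open_issues_7d,
       w90.last_contact_time, w90.complaints_90d
FROM w90
LEFT JOIN w30 ON w90.customer_id = w30.customer_id
LEFT JOIN w7  ON w90.customer_id = w7.customer_id;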

-- Unified customer 360 view
CREATE MATERIALIZED VIEW customer_360 AS
SELECT
    ph.customer_id,
    COUNT(DISTINCT ph.product_id)                                   AS total_products,
    COUNT(DISTINCT ph.product_type)                                 AS product_diversity,
    SUM(ph.credit_limit)                                            AS total_credit_limit,
    SUM(ph.outstanding_balance)                                     AS total_outstanding,
    CASE WHEN SUM(ph.credit_limit) > 0
         THEN SUM(ph.outstanding_balance) / SUM(ph.credit_limit)
         ELSE 0 END                                                 AS credit_utilization,
    ctp.tx_count_90d,
    ctp.total_activity_volume,
    ctp.channels_used,
    ctp.mobile_tx_count,
    ctp.last_activity_time,
    csp.contacts_30d,
    csp.avg_recent_sentiment,
    csp.open_issues_7d,
    csp.complaints_90d,
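    -- Engagement score: higher is better. COALESCE keeps the score non-NULL for
    -- customers whose LEFT JOIN finds no transactions in the last 90 days.
    (COALESCE(ctp.mobile_tx_count, 0) * 2 + COALESCE(ctp.tx_count_90d, 0) +
     COUNT(DISTINCT ph.product_type) * 5)::DECIMAL                 AS engagement_score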
FROM product_holdings ph
LEFT JOIN customer_transaction_profile ctp ON ph.customer_id = ctp.customer_id
LEFT JOIN customer_service_profile csp ON ph.customer_id = csp.customer_id
GROUP BY ph.customer_id, ctp.tx_count_90d, ctp.total_activity_volume,
         ctp.channels_used, ctp.mobile_tx_count, ctp.last_activity_time,
         csp.contacts_30d, csp.avg_recent_sentiment, csp.open_issues_7d, csp.complaints_90d;
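
Once customer_360 exists, a downstream application can read one profile with an ordinary point query over the PostgreSQL interface. The sketch below is illustrative only; the customer ID 'C-1001' is a made-up placeholder, not a value from any real dataset.

```sql
-- Look up one customer's live profile.
-- 'C-1001' is a hypothetical customer_id used only for illustration.
SELECT
    customer_id,
    total_products,
    credit_utilization,
    tx_count_90d,
    contacts_30d,
    open_issues_7d,
    engagement_score
FROM customer_360
WHERE customer_id = 'C-1001';
```

Because the view is maintained as events arrive, the query reads precomputed values rather than scanning transaction history.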

Next-Best-Offer Signal

-- Identify customers eligible for product cross-sell
CREATE MATERIALIZED VIEW next_best_offer_signals AS
SELECT
    c.customer_id,
    c.total_products,
    c.credit_utilization,
    c.engagement_score,
    c.mobile_tx_count,
    CASE
        WHEN c.total_products = 1
         AND c.engagement_score > 50 THEN 'SAVINGS_ACCOUNT'
        WHEN c.credit_utilization > 0.7
         AND c.total_products < 3 THEN 'BALANCE_TRANSFER'
        WHEN c.mobile_tx_count > 30
         AND c.total_products < 4 THEN 'PREMIUM_CARD'
        ELSE NULL
    END AS recommended_product
FROM customer_360 c
WHERE c.customer_id IS NOT NULL;
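
A campaign tool or channel application could then read the signals with a plain query. This is a minimal sketch; the ordering by engagement_score and the limit of 20 are arbitrary choices for illustration, not part of the original design.

```sql
-- List customers that currently have a recommendation, most engaged first.
-- The LIMIT value is an arbitrary example.
SELECT
    customer_id,
    recommended_product,
    engagement_score
FROM next_best_offer_signals
WHERE recommended_product IS NOT NULL
ORDER BY engagement_score DESC
LIMIT 20;
```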

Comparison: Batch vs. Streaming Customer 360

Dimension                | Batch (Nightly ETL)                  | Streaming with RisingWave
-------------------------|--------------------------------------|-------------------------------------
Profile freshness        | 12–24 hours stale                    | Seconds
Next-best-offer accuracy | Based on yesterday's behavior        | Based on today's behavior
Fraud check support      | Pre-computed risk tier only          | Live signals at transaction time
Contact center use       | Old profile on screen                | Live profile during call
Infrastructure           | ETL + data warehouse + serving layer | Streaming SQL + PostgreSQL interface
Churn detection lag      | Days to weeks                        | Hours to minutes

FAQ

Q: How do we serve the customer 360 view to multiple downstream applications simultaneously? A: RisingWave exposes a PostgreSQL-compatible wire protocol, so any Postgres client (JDBC, psycopg2, node-postgres) can query customer_360 directly. For high concurrency, put a connection pooler such as PgBouncer in front of RisingWave. For real-time push to frontends, create a Kafka sink from the materialized view and have a WebSocket service consume the topic and forward updates to clients.
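
The Kafka sink mentioned in this answer might look like the following sketch. The sink name and the topic 'customer-360-updates' are hypothetical; the broker address is reused from the source definition earlier in the article.

```sql
-- Push every change to customer_360 to Kafka as upserts keyed by customer_id.
-- Sink name and topic are hypothetical names chosen for this example.
CREATE SINK customer_360_sink FROM customer_360
WITH (
    connector = 'kafka',
    properties.bootstrap.server = 'kafka:9092',
    topic = 'customer-360-updates',
    primary_key = 'customer_id'
) FORMAT UPSERT ENCODE JSON;
```

An upsert format suits this case because each message replaces the previous profile for the same customer_id, so consumers only need the latest message per key.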

Q: How do we handle customers with thousands of transactions? Won't aggregations be slow? A: RisingWave maintains materialized views incrementally; it does not re-aggregate all transactions on every query. When a new transaction arrives, only the affected customer's aggregate row is updated. A query then reads that precomputed row, so read latency does not grow with the length of the customer's transaction history.

Q: Can we include credit bureau data in the customer 360 view? A: Yes. Load credit bureau data into a regular table (created with CREATE TABLE and updated via periodic batch loads or CDC), then join it with customer_360 in a further materialized view. The credit bureau columns refresh at bureau-pull frequency; the behavioral columns update in real time.
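
As a rough illustration of that join, the sketch below assumes a bureau table whose name and columns (credit_bureau_scores, bureau_score, score_band, pulled_at) are invented for this example; a real bureau feed will have its own schema.

```sql
-- Hypothetical bureau table; all column names are illustrative assumptions.
CREATE TABLE credit_bureau_scores (
    customer_id     VARCHAR PRIMARY KEY,
    bureau_score    INT,
    score_band      VARCHAR,
    pulled_at       TIMESTAMPTZ
);

-- Composite view: real-time behavioral columns plus slower-moving bureau columns.
CREATE MATERIALIZED VIEW customer_360_with_bureau AS
SELECT
    c.customer_id,
    c.credit_utilization,
    c.engagement_score,
    b.bureau_score,
    b.score_band,
    b.pulled_at AS bureau_pulled_at
FROM customer_360 c
LEFT JOIN credit_bureau_scores b ON c.customer_id = b.customer_id;
```

The LEFT JOIN keeps customers who have no bureau record yet; their bureau columns are NULL until a pull is loaded.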

Q: How does the customer 360 handle a customer who has accounts at multiple brands (e.g., after a merger)? A: Use a customer identity resolution table that maps multiple brand-level IDs to a single canonical customer_id. Create this mapping as a reference table with CREATE TABLE and join it in your materialized views to unify cross-brand activity.
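
One way to sketch the identity mapping, assuming a table and view whose names and columns (customer_identity_map, source_customer_id, canonical_customer_id, brand, unified_transactions) are hypothetical:

```sql
-- Hypothetical mapping from brand-level IDs to one canonical customer ID.
CREATE TABLE customer_identity_map (
    source_customer_id      VARCHAR PRIMARY KEY,  -- ID as it appears in events
    canonical_customer_id   VARCHAR,
    brand                   VARCHAR
);

-- Transactions re-keyed to the canonical ID. Unmapped IDs pass through unchanged.
CREATE MATERIALIZED VIEW unified_transactions AS
SELECT
    COALESCE(m.canonical_customer_id, t.customer_id) AS customer_id,
    t.transaction_id,
    t.product_id,
    t.product_type,
    t.transaction_type,
    t.amount,
    t.currency,
    t.merchant_category,
    t.channel,
    t.tx_time
FROM banking_transactions t
LEFT JOIN customer_identity_map m ON t.customer_id = m.source_customer_id;
```

Under this design, profile views would aggregate from unified_transactions instead of banking_transactions, so a merged customer's activity lands in a single row.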

Q: What is the best way to expose real-time churn risk scores from the customer 360? A: Compute churn signals in a materialized view (declining transaction volume, increasing complaints, no recent mobile activity) and sink high-risk customers to a Kafka topic consumed by your CRM or retention campaign system. Alternatively, query the view directly from your retention scoring service on a short polling interval.
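
A simplified sketch of the sink-based option follows. The thresholds (fewer than 5 transactions, 2 or more complaints), the view and sink names, and the topic 'churn-risk-alerts' are all assumptions made for illustration. Low 90-day volume stands in here for "declining" volume, which would need a comparison between two time windows.

```sql
-- Count simple churn signals per customer from customer_360.
-- All thresholds are hypothetical and should be tuned on real data.
CREATE MATERIALIZED VIEW churn_risk_signals AS
SELECT
    customer_id,
    COALESCE(tx_count_90d, 0)    AS tx_count_90d,
    COALESCE(mobile_tx_count, 0) AS mobile_tx_count,
    COALESCE(complaints_90d, 0)  AS complaints_90d,
    (CASE WHEN COALESCE(tx_count_90d, 0) < 5     THEN 1 ELSE 0 END
     + CASE WHEN COALESCE(mobile_tx_count, 0) = 0 THEN 1 ELSE 0 END
     + CASE WHEN COALESCE(complaints_90d, 0) >= 2 THEN 1 ELSE 0 END) AS churn_signal_count
FROM customer_360;

-- Send only customers with at least two signals to the retention system.
CREATE SINK churn_alerts_sink AS
SELECT customer_id, churn_signal_count, complaints_90d, mobile_tx_count
FROM churn_risk_signals
WHERE churn_signal_count >= 2
WITH (
    connector = 'kafka',
    properties.bootstrap.server = 'kafka:9092',
    topic = 'churn-risk-alerts',
    primary_key = 'customer_id'
) FORMAT UPSERT ENCODE JSON;
```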

