Real-Time Customer 360 View with Streaming SQL and CDC

Every customer-facing team wants the same thing: a single, up-to-date view of each customer that combines purchase history, support interactions, and browsing behavior. This is the Customer 360 view, and it is one of the most requested (and most difficult) data products to build correctly.

The traditional approach stitches together nightly batch ETL jobs, data warehouse snapshots, and reverse ETL pipelines. The result is a profile that is always hours or days stale. When a customer calls support after placing a large order five minutes ago, the agent sees yesterday's data. By the time your churn model flags a risk, the customer may have already left.

A real-time Customer 360 view with streaming SQL and CDC (Change Data Capture) solves this problem. In this guide, you will build a unified customer profile that updates within seconds of any change in your source systems. You will use CDC to capture changes from an orders database, a support ticketing system, and a web activity store, then join them in RisingWave using streaming materialized views that stay fresh automatically.

Why Batch ETL Falls Short for Customer 360

A Customer 360 view is a unified representation of all interactions, transactions, and attributes associated with a single customer. Building one requires combining data from multiple operational systems, each with its own schema, update frequency, and access pattern.

Batch-based approaches to Customer 360 share several weaknesses:

  • Staleness by design - Nightly or hourly batch jobs mean your customer profile is always behind reality. A customer who placed an order, opened a support ticket, and browsed your site in the last hour still looks the same as they did this morning.
  • Complex orchestration - You need Airflow DAGs, dbt models, dependency graphs, and monitoring for each pipeline stage. Every new data source adds another DAG, another set of failure modes.
  • Resource spikes - Batch jobs compete for compute during their scheduled windows. A 2 AM nightly run that grows 20% month-over-month eventually collides with other batch workloads.
  • No incremental processing - Most batch pipelines recompute the entire customer profile even when only a handful of rows changed. This wastes compute and increases latency.

CDC combined with streaming SQL eliminates these problems. Instead of periodically scanning entire tables, CDC captures every INSERT, UPDATE, and DELETE as it happens. A streaming SQL engine consumes these change events, maintains materialized views incrementally, and keeps the Customer 360 view current in real time.

Architecture: CDC Sources to Unified Customer Profile

The architecture for a real-time Customer 360 view has three layers:

  1. CDC capture layer - Debezium connectors (or the built-in CDC connectors in RisingWave) capture row-level changes from your operational databases and publish them to Kafka topics or directly into RisingWave.
  2. Stream processing layer - RisingWave ingests the CDC streams, applies transformations, and maintains materialized views that join data across sources.
  3. Serving layer - Applications query the materialized views directly via PostgreSQL-compatible SQL. No additional serving infrastructure is required.

graph LR
    A[Orders DB<br/>PostgreSQL] -->|CDC| D[RisingWave]
    B[Support DB<br/>MySQL] -->|CDC| D
    C[Clickstream DB<br/>PostgreSQL] -->|CDC| D
    D -->|Materialized Views| E[Customer 360<br/>View]
    E --> F[Support Dashboard]
    E --> G[Marketing Automation]
    E --> H[Churn Prediction]

RisingWave functions as a streaming database that combines ingestion, processing, and serving in one system. You write standard SQL to define your transformations, and RisingWave handles incremental computation automatically. When a new order arrives via CDC, only the affected customer's aggregations are recomputed, not the entire table.

Setting Up CDC Source Tables

In a production deployment, you would create CDC source tables that connect directly to your upstream databases. RisingWave supports direct CDC from PostgreSQL and MySQL without requiring Kafka as an intermediary.

Here is how the production CDC source definitions look:

-- CDC source from your e-commerce PostgreSQL database
CREATE SOURCE orders_cdc WITH (
    connector = 'postgres-cdc',
    hostname = 'orders-db.internal',
    port = '5432',
    username = 'cdc_reader',
    password = 'secret',
    database.name = 'ecommerce',
    slot.name = 'orders_slot'
);

CREATE TABLE orders (
    order_id INT PRIMARY KEY,
    customer_id INT,
    order_total NUMERIC,
    order_status VARCHAR,
    created_at TIMESTAMP
) FROM orders_cdc TABLE 'public.orders';
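
The support database in the architecture above is MySQL, so its source would use the mysql-cdc connector instead. The following is a minimal sketch, not a tested configuration: the hostname, the helpdesk database name, the upstream table name, and the server.id value are all assumptions made for illustration.

```sql
-- CDC source from the helpdesk MySQL database (hypothetical host and database names)
CREATE SOURCE support_cdc WITH (
    connector = 'mysql-cdc',
    hostname = 'support-db.internal',
    port = '3306',
    username = 'cdc_reader',
    password = 'secret',
    database.name = 'helpdesk',
    server.id = '5701'  -- assumed value; must be unique among the MySQL server's replication clients
);

-- For MySQL the upstream table is referenced as 'database.table'
CREATE TABLE support_tickets (
    ticket_id INT PRIMARY KEY,
    customer_id INT,
    subject VARCHAR,
    priority VARCHAR,
    status VARCHAR,
    created_at TIMESTAMP,
    resolved_at TIMESTAMP
) FROM support_cdc TABLE 'helpdesk.support_tickets';
```

As with the PostgreSQL example, this is the production form of a table that the tutorial below creates as a plain table.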

For this tutorial, we will use standard tables with sample data to demonstrate the pattern. The SQL logic is identical whether data arrives via CDC or direct inserts. The materialized views work the same way in both cases.

Defining the Source Tables

We need four tables representing data from different operational systems:

-- Customers table (CDC from CRM database)
CREATE TABLE customers (
    customer_id INT PRIMARY KEY,
    email VARCHAR,
    full_name VARCHAR,
    signup_date TIMESTAMP,
    account_tier VARCHAR
);

-- Orders table (CDC from e-commerce database)
CREATE TABLE orders (
    order_id INT PRIMARY KEY,
    customer_id INT,
    order_total NUMERIC,
    order_status VARCHAR,
    created_at TIMESTAMP
);

-- Support tickets (CDC from helpdesk system)
CREATE TABLE support_tickets (
    ticket_id INT PRIMARY KEY,
    customer_id INT,
    subject VARCHAR,
    priority VARCHAR,
    status VARCHAR,
    created_at TIMESTAMP,
    resolved_at TIMESTAMP
);

-- Web activity (CDC from clickstream database)
CREATE TABLE web_activity (
    event_id INT PRIMARY KEY,
    customer_id INT,
    page_url VARCHAR,
    event_type VARCHAR,
    session_id VARCHAR,
    created_at TIMESTAMP
);

Each table has a PRIMARY KEY, which is required for RisingWave to handle CDC updates correctly. When an upstream UPDATE arrives, RisingWave uses the primary key to locate and modify the existing row, then propagates the change through all downstream materialized views.

Loading Sample Data

INSERT INTO customers VALUES
(1, 'alice@example.com', 'Alice Johnson', '2025-01-15 10:00:00', 'premium'),
(2, 'bob@example.com', 'Bob Smith', '2025-03-20 14:30:00', 'standard'),
(3, 'carol@example.com', 'Carol Davis', '2025-06-01 09:15:00', 'premium');

INSERT INTO orders VALUES
(101, 1, 299.99, 'completed', '2026-03-15 11:00:00'),
(102, 1, 149.50, 'completed', '2026-03-20 14:00:00'),
(103, 1, 89.99, 'shipped', '2026-03-28 09:00:00'),
(104, 2, 599.00, 'completed', '2026-03-10 16:00:00'),
(105, 2, 45.00, 'cancelled', '2026-03-25 10:00:00'),
(106, 3, 1250.00, 'completed', '2026-03-01 08:00:00'),
(107, 3, 320.00, 'completed', '2026-03-18 12:00:00'),
(108, 3, 75.50, 'shipped', '2026-03-30 15:00:00');

INSERT INTO support_tickets VALUES
(1001, 1, 'Order delivery delay', 'medium', 'resolved',
 '2026-03-16 10:00:00', '2026-03-16 14:00:00'),
(1002, 2, 'Refund request', 'high', 'open',
 '2026-03-26 09:00:00', NULL),
(1003, 2, 'Product defect', 'high', 'resolved',
 '2026-03-12 11:00:00', '2026-03-13 10:00:00'),
(1004, 3, 'Account billing question', 'low', 'resolved',
 '2026-03-05 14:00:00', '2026-03-05 15:30:00');

INSERT INTO web_activity VALUES
(5001, 1, '/products/electronics', 'page_view', 'sess_a1', '2026-03-30 10:00:00'),
(5002, 1, '/products/electronics/laptop', 'page_view', 'sess_a1', '2026-03-30 10:02:00'),
(5003, 1, '/cart', 'add_to_cart', 'sess_a1', '2026-03-30 10:05:00'),
(5004, 2, '/products/clothing', 'page_view', 'sess_b1', '2026-03-30 11:00:00'),
(5005, 2, '/support', 'page_view', 'sess_b1', '2026-03-30 11:10:00'),
(5006, 3, '/products/home', 'page_view', 'sess_c1', '2026-03-30 09:00:00'),
(5007, 3, '/products/home/furniture', 'page_view', 'sess_c1', '2026-03-30 09:03:00'),
(5008, 3, '/checkout', 'purchase', 'sess_c1', '2026-03-30 09:10:00'),
(5009, 1, '/account/settings', 'page_view', 'sess_a2', '2026-03-31 08:00:00'),
(5010, 3, '/products/electronics', 'page_view', 'sess_c2', '2026-03-31 14:00:00');

Building the Customer 360 with Materialized Views

The key to a maintainable Customer 360 is a layered approach. Rather than writing one massive query that joins all sources, you build intermediate materialized views for each data domain, then combine them in a final unified view. This makes each piece independently testable and easier to debug.

Layer 1: Order Summary per Customer

This materialized view aggregates order data into per-customer metrics:

CREATE MATERIALIZED VIEW customer_order_summary AS
SELECT
    customer_id,
    COUNT(*) AS total_orders,
    SUM(CASE WHEN order_status = 'completed' THEN 1 ELSE 0 END) AS completed_orders,
    SUM(CASE WHEN order_status = 'cancelled' THEN 1 ELSE 0 END) AS cancelled_orders,
    SUM(CASE WHEN order_status = 'completed' THEN order_total ELSE 0 END) AS total_spent,
    AVG(order_total) AS avg_order_value,
    MAX(created_at) AS last_order_at
FROM orders
GROUP BY customer_id;

Query result:

 customer_id | total_orders | completed_orders | cancelled_orders | total_spent |    avg_order_value     |    last_order_at
-------------+--------------+------------------+------------------+-------------+------------------------+---------------------
           1 |            3 |                2 |                0 |      449.49 |                 179.83 | 2026-03-28 09:00:00
           2 |            2 |                1 |                1 |      599.00 |                 322.00 | 2026-03-25 10:00:00
           3 |            3 |                2 |                0 |     1570.00 |                 548.50 | 2026-03-30 15:00:00

Layer 2: Support Ticket Summary per Customer

This view captures support interaction patterns:

CREATE MATERIALIZED VIEW customer_support_summary AS
SELECT
    customer_id,
    COUNT(*) AS total_tickets,
    SUM(CASE WHEN status = 'open' THEN 1 ELSE 0 END) AS open_tickets,
    SUM(CASE WHEN priority = 'high' THEN 1 ELSE 0 END) AS high_priority_tickets,
    MAX(created_at) AS last_ticket_at
FROM support_tickets
GROUP BY customer_id;

Query result:

 customer_id | total_tickets | open_tickets | high_priority_tickets |   last_ticket_at
-------------+---------------+--------------+-----------------------+---------------------
           1 |             1 |            0 |                     0 | 2026-03-16 10:00:00
           2 |             2 |            1 |                     2 | 2026-03-26 09:00:00
           3 |             1 |            0 |                     0 | 2026-03-05 14:00:00

Bob has two high-priority tickets and one still open. This is the kind of signal that should surface immediately in a customer profile, not after the next batch run.

Layer 3: Web Activity Summary per Customer

This view summarizes browsing and engagement patterns:

CREATE MATERIALIZED VIEW customer_web_activity AS
SELECT
    customer_id,
    COUNT(*) AS total_events,
    COUNT(DISTINCT session_id) AS total_sessions,
    SUM(CASE WHEN event_type = 'purchase' THEN 1 ELSE 0 END) AS purchase_events,
    SUM(CASE WHEN event_type = 'add_to_cart' THEN 1 ELSE 0 END) AS cart_events,
    MAX(created_at) AS last_active_at
FROM web_activity
GROUP BY customer_id;

Query result:

 customer_id | total_events | total_sessions | purchase_events | cart_events |   last_active_at
-------------+--------------+----------------+-----------------+-------------+---------------------
           1 |            4 |              2 |               0 |           1 | 2026-03-31 08:00:00
           2 |            2 |              1 |               0 |           0 | 2026-03-30 11:10:00
           3 |            4 |              2 |               1 |           0 | 2026-03-31 14:00:00

Layer 4: The Unified Customer 360 View

Now we join all three domain views with the customer base table. This is the final materialized view that applications query:

CREATE MATERIALIZED VIEW customer_360 AS
SELECT
    c.customer_id,
    c.full_name,
    c.email,
    c.account_tier,
    c.signup_date,
    -- Order metrics
    COALESCE(o.total_orders, 0) AS total_orders,
    COALESCE(o.completed_orders, 0) AS completed_orders,
    COALESCE(o.cancelled_orders, 0) AS cancelled_orders,
    COALESCE(o.total_spent, 0) AS lifetime_value,
    o.avg_order_value,
    o.last_order_at,
    -- Support metrics
    COALESCE(s.total_tickets, 0) AS total_tickets,
    COALESCE(s.open_tickets, 0) AS open_tickets,
    COALESCE(s.high_priority_tickets, 0) AS high_priority_tickets,
    s.last_ticket_at,
    -- Web activity metrics
    COALESCE(w.total_events, 0) AS total_web_events,
    COALESCE(w.total_sessions, 0) AS total_sessions,
    COALESCE(w.purchase_events, 0) AS purchase_events,
    COALESCE(w.cart_events, 0) AS cart_events,
    w.last_active_at,
    -- Computed engagement score
    COALESCE(o.total_orders, 0) * 10
    + COALESCE(w.total_sessions, 0) * 5
    + COALESCE(w.purchase_events, 0) * 20
    - COALESCE(s.high_priority_tickets, 0) * 15
    AS engagement_score
FROM customers c
LEFT JOIN customer_order_summary o ON c.customer_id = o.customer_id
LEFT JOIN customer_support_summary s ON c.customer_id = s.customer_id
LEFT JOIN customer_web_activity w ON c.customer_id = w.customer_id;

Query result (selected columns):

 customer_id |   full_name   | account_tier | total_orders | lifetime_value | avg_order_value | total_tickets | open_tickets | total_web_events | total_sessions | engagement_score
-------------+---------------+--------------+--------------+----------------+-----------------+---------------+--------------+------------------+----------------+------------------
           1 | Alice Johnson | premium      |            3 |         449.49 |          179.83 |             1 |            0 |                4 |              2 |               40
           2 | Bob Smith     | standard     |            2 |         599.00 |          322.00 |             2 |            1 |                2 |              1 |               -5
           3 | Carol Davis   | premium      |            3 |        1570.00 |          548.50 |             1 |            0 |                4 |              2 |               60

Notice Bob's engagement score is -5. He has two high-priority support tickets (costing 30 points) and relatively low activity. This is a customer at risk. The engagement score formula is simple here for illustration, but in production you can tune the weights or replace it with a more sophisticated model.
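
To make the scoring arithmetic concrete, the query below recomputes the three scores from literal values rather than from the views. It is only a worked example: the inline VALUES rows are copied by hand from the summary results above.

```sql
-- Worked example of the engagement score formula using the sample metrics
SELECT
    name,
    total_orders * 10
    + total_sessions * 5
    + purchase_events * 20
    - high_priority_tickets * 15 AS engagement_score
FROM (VALUES
    ('Alice', 3, 2, 0, 0),   -- 30 + 10 +  0 -  0 = 40
    ('Bob',   2, 1, 0, 2),   -- 20 +  5 +  0 - 30 = -5
    ('Carol', 3, 2, 1, 0)    -- 30 + 10 + 20 -  0 = 60
) AS t(name, total_orders, total_sessions, purchase_events, high_priority_tickets);
```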

The COALESCE calls are essential. LEFT JOINs produce NULLs when a customer has no data in a particular domain (for example, a new customer with no orders yet). Without COALESCE, arithmetic on those NULLs would propagate NULLs through the engagement score.
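
The effect is easy to see in isolation. This standalone query (it touches no tables) contrasts the same arithmetic with and without COALESCE on a NULL input:

```sql
-- NULL propagates through arithmetic unless it is replaced first
SELECT
    CAST(NULL AS INT) * 10 + 5              AS without_coalesce,  -- NULL
    COALESCE(CAST(NULL AS INT), 0) * 10 + 5 AS with_coalesce;     -- 5
```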

Real-Time Updates: How CDC Keeps the Profile Current

The power of this architecture is that the customer_360 view updates automatically when any source changes. Let us simulate a new order arriving for Bob via CDC:

-- A new completed order arrives via CDC
INSERT INTO orders VALUES
(109, 2, 250.00, 'completed', '2026-04-01 10:00:00');

Now query Bob's updated profile:

SELECT full_name, total_orders, lifetime_value, engagement_score
FROM customer_360
WHERE customer_id = 2;

 full_name | total_orders | lifetime_value | engagement_score
-----------+--------------+----------------+------------------
 Bob Smith |            3 |         849.00 |                5

Bob's profile updated within seconds. His total orders increased from 2 to 3, lifetime value jumped from $599 to $849, and his engagement score improved from -5 to 5. No batch job was triggered. No DAG was scheduled. RisingWave's incremental computation engine propagated the single row change through the materialized view graph automatically.

This is how streaming materialized views differ from traditional batch processing. Only the affected aggregations are recomputed. If you have 10 million customers and one order arrives, RisingWave updates exactly one customer's aggregation, not all 10 million.
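
CDC also delivers UPDATE and DELETE events, and these propagate through the same views. The sketch below simulates both with a temporary order for Alice and then removes it, so the sample data ends up unchanged. The order id 110 and its values are hypothetical, and FLUSH is used here only to make each change visible before the next query.

```sql
-- Temporary order for Alice (order_id 110 is a hypothetical, unused id)
INSERT INTO orders VALUES
(110, 1, 60.00, 'shipped', '2026-04-01 12:00:00');

-- Simulate an upstream UPDATE: the order completes
UPDATE orders SET order_status = 'completed' WHERE order_id = 110;
FLUSH;

SELECT total_orders, completed_orders, lifetime_value
FROM customer_360
WHERE customer_id = 1;
-- with the sample data: 4 | 3 | 509.49

-- Simulate an upstream DELETE: the order is removed
DELETE FROM orders WHERE order_id = 110;
FLUSH;

SELECT total_orders, completed_orders, lifetime_value
FROM customer_360
WHERE customer_id = 1;
-- back to the original values: 3 | 2 | 449.49
```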

Querying the Customer 360 for Business Logic

With the unified view in place, you can build business logic directly in SQL.

Churn Risk Classification

SELECT
    full_name,
    account_tier,
    lifetime_value,
    open_tickets,
    high_priority_tickets,
    engagement_score,
    CASE
        WHEN open_tickets > 0 AND engagement_score < 0 THEN 'high_risk'
        WHEN open_tickets > 0 OR engagement_score < 20 THEN 'medium_risk'
        ELSE 'healthy'
    END AS churn_risk
FROM customer_360
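ORDER BY engagement_score ASC;

Query result (with order 109 from the previous section applied):

   full_name   | account_tier | lifetime_value | open_tickets | high_priority_tickets | engagement_score | churn_risk
---------------+--------------+----------------+--------------+-----------------------+------------------+-------------
 Bob Smith     | standard     |         849.00 |            1 |                     2 |                5 | medium_risk
 Alice Johnson | premium      |         449.49 |            0 |                     0 |               40 | healthy
 Carol Davis   | premium      |        1570.00 |            0 |                     0 |               60 | healthy

Because order 109 has already been applied, Bob's engagement score is now 5, so he falls into medium_risk: he still has an open ticket, but his score is no longer negative. Against the original sample data (score -5, one open ticket), the same query would classify him as high_risk.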

You can wrap this query in another materialized view to create a continuously updated churn risk table. Downstream systems (marketing automation, support routing, account management dashboards) can query it directly.
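
A minimal sketch of that wrapper follows. The view name customer_churn_risk is an assumption, and the ORDER BY is dropped because ordering belongs in the query that reads the view, not in its definition.

```sql
-- Continuously maintained churn classification (hypothetical view name)
CREATE MATERIALIZED VIEW customer_churn_risk AS
SELECT
    customer_id,
    full_name,
    account_tier,
    lifetime_value,
    open_tickets,
    high_priority_tickets,
    engagement_score,
    CASE
        WHEN open_tickets > 0 AND engagement_score < 0 THEN 'high_risk'
        WHEN open_tickets > 0 OR engagement_score < 20 THEN 'medium_risk'
        ELSE 'healthy'
    END AS churn_risk
FROM customer_360;

-- Downstream systems read only the customers that need attention
SELECT customer_id, full_name, churn_risk
FROM customer_churn_risk
WHERE churn_risk <> 'healthy'
ORDER BY engagement_score ASC;
```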

High-Value Customer Alerts

You can also create a sink to push alerts when high-value customers show risk signals. For example, sink to Kafka for downstream consumption:

CREATE SINK churn_alerts AS
SELECT
    customer_id,
    full_name,
    email,
    lifetime_value,
    engagement_score
FROM customer_360
WHERE lifetime_value > 500
  AND engagement_score < 10
WITH (
    connector = 'kafka',
    properties.bootstrap.server = 'kafka:9092',
    topic = 'churn-alerts',
    type = 'append-only',
    force_append_only = 'true'
);

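This continuously evaluates the churn condition and pushes matching rows to Kafka, where your alerting system or CRM can pick them up in real time. Because customer_360 is not an append-only view, force_append_only is what allows the sink to emit plain inserts: each update to a matching customer is written as a new message and deletions are dropped, so consumers should treat messages as point-in-time alerts and deduplicate on customer_id.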
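
If consumers also need to learn when a customer stops matching the condition, an upsert sink keyed on customer_id is one alternative. This is a sketch under assumptions rather than a tested configuration: the sink name and topic are hypothetical, and it uses the FORMAT ... ENCODE clause instead of the type option shown above.

```sql
-- Upsert variant: one Kafka key per customer, with deletes when a customer no longer matches
CREATE SINK churn_alerts_upsert AS
SELECT
    customer_id,
    full_name,
    email,
    lifetime_value,
    engagement_score
FROM customer_360
WHERE lifetime_value > 500
  AND engagement_score < 10
WITH (
    connector = 'kafka',
    properties.bootstrap.server = 'kafka:9092',
    topic = 'churn-alerts-upsert',   -- hypothetical topic name
    primary_key = 'customer_id'      -- used as the Kafka message key
) FORMAT UPSERT ENCODE JSON;
```

The trade-off is that consumers must handle keyed updates and deletes instead of a simple stream of alert messages.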

Production Considerations

Scaling CDC Sources

When connecting CDC from large production databases, keep these points in mind:

  • Use dedicated replication slots - Each CDC source should have its own replication slot to avoid blocking other consumers. RisingWave manages these automatically with the built-in CDC connectors.
  • Filter at the source - If your orders table has 100 columns but the Customer 360 only needs 5, select only those columns in your CDC table definition to reduce network and processing overhead.
  • Handle schema evolution - When upstream schemas change, you need to update the corresponding RisingWave table definitions. Plan for this with a schema registry or versioned CDC topics.

Handling Late-Arriving Data

Web activity data may arrive out of order. A clickstream event from 10 minutes ago could arrive after a more recent event. Because RisingWave materialized views operate on the current state of the data (not on event time windows in this pattern), late-arriving data is handled naturally. The aggregation updates when the row arrives, regardless of its timestamp.

For time-windowed aggregations (such as "page views in the last hour"), you can use time window functions to handle late data with appropriate watermarks.
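
As an illustration, hourly page-view counts per customer could be maintained with a tumbling window over web_activity. The view name is an assumption. No watermark is declared in this sketch, since in RisingWave watermarks are typically defined on a source or an append-only table rather than on a table with a primary key like the ones in this tutorial.

```sql
-- Page views per customer in fixed one-hour windows (hypothetical view name)
CREATE MATERIALIZED VIEW customer_hourly_page_views AS
SELECT
    customer_id,
    window_start,
    window_end,
    COUNT(*) AS page_views
FROM TUMBLE(web_activity, created_at, INTERVAL '1 hour')
WHERE event_type = 'page_view'
GROUP BY customer_id, window_start, window_end;
```

Without a watermark, a late event simply updates the count of the window it belongs to, at the cost of RisingWave keeping state for old windows.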

Monitoring the Pipeline

RisingWave exposes system catalogs that describe every materialized view. The query below confirms that customer_360 exists and shows the definition it was created with:

SELECT id, name, definition
FROM rw_materialized_views
WHERE name = 'customer_360';

The catalog does not report processing lag. To verify that your Customer 360 view is keeping up with the incoming CDC event rate, watch barrier latency and source throughput in RisingWave's metrics dashboards.

What Is a Customer 360 View and Why Does It Need Real-Time Data?

A Customer 360 view is a unified data model that aggregates all known information about a customer - purchases, support interactions, web behavior, demographics, and preferences - into a single queryable profile. It needs real-time data because customer interactions happen continuously across multiple channels. A batch-updated profile that reflects yesterday's state causes support agents to miss context, marketing campaigns to target the wrong segments, and churn models to react too late.

How Does CDC Enable Real-Time Customer 360?

Change Data Capture (CDC) reads the transaction log (WAL in PostgreSQL, binlog in MySQL) of your operational databases and converts each committed change into a stream event. This means every INSERT, UPDATE, and DELETE in your orders database, CRM, or helpdesk system becomes available for processing within seconds, with little load on the source database because changes are read from the log rather than by querying tables. RisingWave can ingest these CDC streams directly or via Kafka, then maintain materialized views that join and aggregate the data in real time.
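
For the Kafka path, a table can consume Debezium-formatted change events from a topic. The sketch below assumes a Debezium connector is already publishing JSON events for the orders table. The table name, the topic name, and the broker address are hypothetical.

```sql
-- Orders ingested from a Debezium topic in Kafka instead of a direct CDC connection
CREATE TABLE orders_from_kafka (
    order_id INT PRIMARY KEY,
    customer_id INT,
    order_total NUMERIC,
    order_status VARCHAR,
    created_at TIMESTAMP
) WITH (
    connector = 'kafka',
    topic = 'ecommerce.public.orders',          -- hypothetical Debezium topic
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'earliest'              -- read the topic from the beginning
) FORMAT DEBEZIUM ENCODE JSON;
```

The primary key is what lets RisingWave apply the update and delete events in the stream to the right row.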

Can Streaming SQL Replace My Existing Batch ETL for Customer Data?

For use cases that require fresh data, yes. Streaming SQL with materialized views provides the same transformation and aggregation capabilities as batch SQL in tools like dbt, but with continuous incremental updates instead of scheduled full refreshes. You still need batch processing for historical backfills, large-scale reprocessing, and some types of machine learning training. Many teams run both: streaming for the real-time Customer 360 and batch for monthly cohort analysis or annual reporting.

How Does RisingWave Handle Joins Across Multiple CDC Sources?

RisingWave supports streaming joins across tables from different CDC sources using standard SQL JOIN syntax. When you create a materialized view that joins orders with support tickets and web activity, RisingWave maintains the join result incrementally. A change in any source table triggers an update only in the affected join output rows. This is possible because RisingWave maintains internal state for each join, tracking which rows from each side have matched. The materialized view documentation covers the supported join types and performance characteristics.

Conclusion

Building a real-time Customer 360 view does not require a complex event-driven architecture with dozens of microservices. With CDC and streaming SQL, you can:

  • Capture changes from multiple databases using CDC connectors that read transaction logs with minimal impact on source performance
  • Build layered materialized views that aggregate order, support, and web activity data per customer
  • Join across sources using standard SQL LEFT JOINs with COALESCE for null handling
  • Compute derived metrics like engagement scores and churn risk classifications that update in real time
  • Serve the unified profile directly via PostgreSQL-compatible queries, with no additional serving layer needed

The key architectural insight is the layered approach: build domain-specific summary views first, then compose them into the final unified view. This keeps each piece testable, debuggable, and independently evolvable.

All SQL in this article was tested on RisingWave 2.8.0.


Ready to build your own real-time Customer 360? Get started with RisingWave in 5 minutes. Quickstart ->

Join our Slack community to ask questions and connect with other stream processing developers.
