Building a Real-Time Customer 360 Profile with CDC and Streaming SQL

Customer data in a typical e-commerce company lives in five or more systems: orders in PostgreSQL, CRM records in MySQL, behavioral events in Kafka, support tickets in Zendesk, and email preferences in a marketing platform. A customer 360 profile unifies these into a single, always-current view. The practical path to get there is Change Data Capture (CDC) from your operational databases combined with streaming SQL joins — no nightly ETL jobs, no stale snapshots.


The Fragmented Customer Data Problem

When a customer calls support about an order, your support agent typically needs to open four tabs: the order management system, the CRM, the email history, and the returns portal. They are manually assembling a customer 360 profile in their head.

The same fragmentation affects automated systems. Your recommendation engine does not know about a customer's recent support complaint. Your email campaigns do not know whether a customer just placed a large order. Your fraud detection does not see the full purchase history across all channels.

The standard solution — nightly ETL into a data warehouse — produces a customer profile that is 12 to 24 hours stale. For operational use cases (customer service, real-time personalization, live fraud signals), that latency is too high.

Change Data Capture solves the staleness problem by turning every database write — INSERT, UPDATE, DELETE — into a stream of events. Streaming SQL processes those events and maintains a live unified profile.


What Is Change Data Capture (CDC)?

CDC works by reading the transaction log (WAL in PostgreSQL, binlog in MySQL) rather than querying the database directly. Every change to a row produces an event that includes:

  • The operation type (insert, update, delete)
  • The before-state and after-state of the row
  • The timestamp of the change

This approach has negligible impact on the source database's query performance: no triggers, no polling queries, and no application-code changes. The database's existing transaction log is the source.
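To make the event anatomy concrete, here is an illustrative sketch of an UPDATE event for the orders table. The field names follow the common Debezium-style convention ("op", "before", "after", "ts_ms"); the exact envelope a given connector emits may differ, so treat this shape as an assumption.

```python
# Illustrative CDC event for an UPDATE on the orders table.
# Field names follow the Debezium-style convention; the exact
# envelope your connector emits may differ.
cdc_event = {
    "op": "u",  # "c" = insert, "u" = update, "d" = delete
    "before": {"order_id": "ord_1001", "order_status": "placed", "total_amount": 89.50},
    "after":  {"order_id": "ord_1001", "order_status": "shipped", "total_amount": 89.50},
    "ts_ms": 1718030400000,  # commit timestamp of the change, in epoch millis
}

def changed_columns(event):
    """Return the set of columns whose value changed in an update event."""
    if event["op"] != "u":
        return set()
    before, after = event["before"], event["after"]
    return {col for col in after if before.get(col) != after[col]}

print(changed_columns(cdc_event))  # {'order_status'}
```

Having both the before-state and after-state in every event is what lets downstream consumers compute deltas (such as "status changed from placed to shipped") without querying the source.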

RisingWave is a PostgreSQL-compatible streaming database, open source under Apache 2.0, built in Rust, with S3-backed storage. It has native CDC connectors for PostgreSQL and MySQL, making it straightforward to ingest CDC streams directly.


Setting Up CDC Sources

PostgreSQL Orders Database

Enable logical replication on your PostgreSQL instance:

-- On the source PostgreSQL database
ALTER SYSTEM SET wal_level = logical;
-- Then restart PostgreSQL and create a replication slot:
SELECT pg_create_logical_replication_slot('risingwave_slot', 'pgoutput');

In RisingWave, create the CDC source:

CREATE SOURCE orders_cdc
WITH (
    connector = 'postgres-cdc',
    hostname = 'orders-db.internal',
    port = '5432',
    username = 'replication_user',
    password = 'secret',
    database.name = 'orders_db',
    schema.name = 'public',
    table.name = 'orders',
    slot.name = 'risingwave_slot'
);

-- Create the table that receives CDC events
CREATE TABLE orders (
    order_id        VARCHAR PRIMARY KEY,
    customer_id     VARCHAR,
    order_status    VARCHAR,        -- 'placed', 'shipped', 'delivered', 'cancelled'
    total_amount    NUMERIC,
    item_count      INT,
    channel         VARCHAR,        -- 'web', 'mobile', 'marketplace'
    created_at      TIMESTAMPTZ,
    updated_at      TIMESTAMPTZ
) FROM orders_cdc TABLE 'public.orders';

MySQL CRM Database

CREATE SOURCE crm_cdc
WITH (
    connector = 'mysql-cdc',
    hostname = 'crm-db.internal',
    port = '3306',
    username = 'replication_user',
    password = 'secret',
    database.name = 'crm_db'
);

CREATE TABLE customers (
    customer_id     VARCHAR PRIMARY KEY,
    email           VARCHAR,
    first_name      VARCHAR,
    last_name       VARCHAR,
    phone           VARCHAR,
    acquisition_channel VARCHAR,
    signup_date     TIMESTAMPTZ,
    loyalty_tier    VARCHAR,        -- 'standard', 'silver', 'gold', 'platinum'
    last_updated    TIMESTAMPTZ
) FROM crm_cdc TABLE 'crm_db.customers';

CREATE TABLE support_tickets (
    ticket_id       VARCHAR PRIMARY KEY,
    customer_id     VARCHAR,
    status          VARCHAR,        -- 'open', 'resolved', 'escalated'
    category        VARCHAR,
    created_at      TIMESTAMPTZ,
    resolved_at     TIMESTAMPTZ
) FROM crm_cdc TABLE 'crm_db.support_tickets';

Behavioral Events from Kafka

CREATE SOURCE behavioral_events (
    event_id        VARCHAR,
    customer_id     VARCHAR,
    session_id      VARCHAR,
    event_type      VARCHAR,
    product_id      VARCHAR,
    category_id     VARCHAR,
    event_time      TIMESTAMPTZ
)
WITH (
    connector = 'kafka',
    topic = 'customer_events',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
)
FORMAT PLAIN ENCODE JSON;

Building the Customer 360 Profile View

With all three sources ingested, build the unified profile as a materialized view:

CREATE MATERIALIZED VIEW customer_360 AS
SELECT
    c.customer_id,
    c.email,
    c.first_name,
    c.last_name,
    c.loyalty_tier,
    c.acquisition_channel,
    c.signup_date,

    -- Order summary from PostgreSQL CDC
    COALESCE(ord.total_orders, 0)             AS total_orders,
    COALESCE(ord.total_spend, 0)              AS lifetime_value,
    COALESCE(ord.avg_order_value, 0)          AS avg_order_value,
    ord.first_order_date,
    ord.last_order_date,
    ord.last_order_status,
    ord.preferred_channel,

    -- Support summary from MySQL CDC
    COALESCE(sup.open_tickets, 0)             AS open_support_tickets,
    COALESCE(sup.total_tickets, 0)            AS total_support_tickets,
    sup.last_ticket_at,

    -- Behavioral signals from Kafka
    COALESCE(beh.events_30d, 0)              AS events_last_30d,
    COALESCE(beh.product_views_30d, 0)       AS product_views_last_30d,
    COALESCE(beh.sessions_30d, 0)            AS sessions_last_30d,
    beh.last_active_at,

    -- Derived signals
    CASE
        WHEN ord.last_order_date > NOW() - INTERVAL '30 DAYS' THEN 'active'
        WHEN ord.last_order_date > NOW() - INTERVAL '90 DAYS' THEN 'at_risk'
        WHEN ord.last_order_date > NOW() - INTERVAL '365 DAYS' THEN 'lapsing'
        WHEN ord.last_order_date IS NOT NULL THEN 'churned'
        ELSE 'prospect'
    END AS lifecycle_stage,

    CASE
        WHEN COALESCE(sup.open_tickets, 0) > 0 THEN TRUE
        ELSE FALSE
    END AS has_open_support_issue,

    NOW() AS profile_updated_at

FROM customers c

LEFT JOIN (
    SELECT
        customer_id,
        COUNT(*)                                       AS total_orders,
        SUM(total_amount)                              AS total_spend,
        AVG(total_amount)                              AS avg_order_value,
        MIN(created_at)                                AS first_order_date,
        MAX(created_at)                                AS last_order_date,
        (ARRAY_AGG(order_status ORDER BY created_at DESC))[1] AS last_order_status,
        MODE() WITHIN GROUP (ORDER BY channel)        AS preferred_channel
    FROM orders
    GROUP BY customer_id
) ord ON c.customer_id = ord.customer_id

LEFT JOIN (
    SELECT
        customer_id,
        COUNT(*) FILTER (WHERE status = 'open')        AS open_tickets,
        COUNT(*)                                        AS total_tickets,
        MAX(created_at)                                 AS last_ticket_at
    FROM support_tickets
    GROUP BY customer_id
) sup ON c.customer_id = sup.customer_id

LEFT JOIN (
    SELECT
        customer_id,
        COUNT(*) FILTER (WHERE event_time > NOW() - INTERVAL '30 DAYS') AS events_30d,
        COUNT(*) FILTER (WHERE event_type = 'product_view'
                         AND event_time > NOW() - INTERVAL '30 DAYS')   AS product_views_30d,
        COUNT(DISTINCT session_id)
            FILTER (WHERE event_time > NOW() - INTERVAL '30 DAYS')      AS sessions_30d,
        MAX(event_time)                                                   AS last_active_at
    FROM behavioral_events
    GROUP BY customer_id
) beh ON c.customer_id = beh.customer_id;

This single view joins data from PostgreSQL, MySQL, and Kafka. Every change in any source system — a new order, a CRM update, a loyalty tier upgrade, a new support ticket — propagates to the customer_360 view within seconds.
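The derived lifecycle_stage logic is worth unit-testing outside the database as well. Below is a hypothetical Python helper that mirrors the CASE expression from the view (the function name and thresholds are taken directly from the SQL above; the helper itself is not part of the pipeline):

```python
from datetime import datetime, timedelta, timezone

def lifecycle_stage(last_order_date, now=None):
    """Mirror of the lifecycle_stage CASE expression in the customer_360
    view. Illustrative helper only; the view computes this in SQL."""
    now = now or datetime.now(timezone.utc)
    if last_order_date is None:
        return "prospect"
    age = now - last_order_date
    if age < timedelta(days=30):
        return "active"
    if age < timedelta(days=90):
        return "at_risk"
    if age < timedelta(days=365):
        return "lapsing"
    return "churned"

now = datetime(2024, 6, 1, tzinfo=timezone.utc)
print(lifecycle_stage(now - timedelta(days=10), now))   # active
print(lifecycle_stage(now - timedelta(days=60), now))   # at_risk
print(lifecycle_stage(now - timedelta(days=200), now))  # lapsing
print(lifecycle_stage(None, now))                       # prospect
```

Keeping a mirror like this in the application test suite makes it easy to verify segment boundaries (the 30/90/365-day thresholds) before changing them in the materialized view.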


Querying the Profile

Your application queries the profile like any other database table:

-- Full profile for a single customer (used in support tooling)
SELECT *
FROM customer_360
WHERE customer_id = 'cust_49821';

-- Identify high-value customers with open support issues (priority escalation)
SELECT customer_id, email, lifetime_value, open_support_tickets
FROM customer_360
WHERE lifetime_value > 1000
  AND has_open_support_issue = TRUE
ORDER BY lifetime_value DESC;

-- At-risk customers who have been browsing recently (re-engagement candidates)
SELECT customer_id, email, last_order_date, product_views_last_30d, sessions_last_30d
FROM customer_360
WHERE lifecycle_stage = 'at_risk'
  AND sessions_last_30d > 3
ORDER BY product_views_last_30d DESC
LIMIT 500;

Because RisingWave speaks the PostgreSQL wire protocol, your existing application code, ORM, and analytics tools query it without modification.
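The query logic itself can be prototyped locally before pointing at the live profile. The sketch below uses Python's built-in sqlite3 as a stand-in for the customer_360 table purely to exercise the priority-escalation query; against RisingWave you would instead connect with any PostgreSQL client (psycopg, JDBC, an ORM) since it speaks the PostgreSQL wire protocol. The sample rows are invented for illustration.

```python
import sqlite3

# Mock customer_360 table in sqlite3 (local stand-in; in production this
# query runs against RisingWave over the PostgreSQL protocol).
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE customer_360 (
        customer_id TEXT, email TEXT,
        lifetime_value REAL, open_support_tickets INTEGER
    )
""")
conn.executemany(
    "INSERT INTO customer_360 VALUES (?, ?, ?, ?)",
    [
        ("cust_1", "a@example.com", 2500.0, 1),  # high value, open ticket
        ("cust_2", "b@example.com", 3200.0, 0),  # high value, no ticket
        ("cust_3", "c@example.com", 150.0, 2),   # low value, open tickets
    ],
)

# Priority-escalation query: high-value customers with open support issues.
rows = conn.execute("""
    SELECT customer_id, lifetime_value
    FROM customer_360
    WHERE lifetime_value > 1000 AND open_support_tickets > 0
    ORDER BY lifetime_value DESC
""").fetchall()
print(rows)  # [('cust_1', 2500.0)]
```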


Segmentation and Real-Time Audience Building

The customer 360 profile makes audience segmentation a SQL query rather than a batch process. Create persistent segment views that update as customer behavior changes:

CREATE MATERIALIZED VIEW segment_vip_at_risk AS
SELECT customer_id, email, lifetime_value, last_order_date
FROM customer_360
WHERE loyalty_tier IN ('gold', 'platinum')
  AND lifecycle_stage IN ('at_risk', 'lapsing');

CREATE MATERIALIZED VIEW segment_high_intent AS
SELECT customer_id, email, product_views_last_30d
FROM customer_360
WHERE lifecycle_stage IN ('active', 'at_risk')
  AND product_views_last_30d > 10
  AND total_orders < 2;  -- heavy browsers, light buyers

Push these segments to your marketing platform with a Kafka sink:

CREATE SINK vip_at_risk_to_crm
FROM segment_vip_at_risk
WITH (
    connector = 'kafka',
    topic = 'crm_segment_updates',
    properties.bootstrap.server = 'kafka:9092'
)
FORMAT PLAIN ENCODE JSON;

When a gold-tier customer places a new order and moves from 'at_risk' back to 'active', they are automatically removed from the VIP at-risk segment and the CRM is notified — no nightly batch sync required.
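The membership transition is easy to see by mirroring the segment predicate as a small function (an illustrative helper, not part of the pipeline): the same customer evaluates to true before the order and false after the lifecycle stage recomputes.

```python
def in_vip_at_risk(loyalty_tier, lifecycle_stage):
    """Mirror of the segment_vip_at_risk WHERE clause (illustrative)."""
    return (loyalty_tier in ("gold", "platinum")
            and lifecycle_stage in ("at_risk", "lapsing"))

# Before the new order: gold-tier customer, 60 days since last purchase.
print(in_vip_at_risk("gold", "at_risk"))  # True
# After the order: lifecycle stage recomputes to 'active'.
print(in_vip_at_risk("gold", "active"))   # False
```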


Handling Identity Resolution

E-commerce customers often have multiple identities: anonymous visitor before sign-up, email address, loyalty program ID, and mobile app user ID. A minimal identity graph can be maintained as a CDC-backed table:

CREATE TABLE identity_graph (
    identity_id     VARCHAR PRIMARY KEY,
    customer_id     VARCHAR,         -- canonical customer ID
    identity_type   VARCHAR,         -- 'email', 'cookie_id', 'loyalty_id', 'device_id'
    identity_value  VARCHAR,
    created_at      TIMESTAMPTZ
);

When a previously anonymous session is identified (user logs in), insert a row linking the cookie_id to the customer_id. The customer_360 profile then covers behavior from before the login event by joining behavioral events through the identity graph.
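The stitching join can be sketched as follows. This uses sqlite3 as a local stand-in for the streaming join, and it assumes anonymous events carry a cookie_id column (the behavioral_events schema earlier in this article keys events by customer_id and session_id, so the cookie_id field here is a hypothetical addition for the anonymous case):

```python
import sqlite3

# Local stand-in for the streaming join through the identity graph.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE identity_graph (
        identity_type TEXT, identity_value TEXT, customer_id TEXT
    );
    CREATE TABLE behavioral_events (
        event_id TEXT, cookie_id TEXT, event_type TEXT
    );
""")
# At login, a row links the anonymous cookie to the canonical customer ID.
conn.execute("INSERT INTO identity_graph VALUES ('cookie_id', 'ck_abc', 'cust_42')")
conn.executemany("INSERT INTO behavioral_events VALUES (?, ?, ?)", [
    ("ev_1", "ck_abc", "product_view"),  # browsed before logging in
    ("ev_2", "ck_abc", "add_to_cart"),
    ("ev_3", "ck_zzz", "product_view"),  # a different, still-anonymous visitor
])

# Pre-login events now attribute to cust_42 through the identity graph.
rows = conn.execute("""
    SELECT ig.customer_id, COUNT(*) AS events
    FROM behavioral_events be
    JOIN identity_graph ig
      ON ig.identity_type = 'cookie_id' AND ig.identity_value = be.cookie_id
    GROUP BY ig.customer_id
""").fetchall()
print(rows)  # [('cust_42', 2)]
```

Events for identities that never resolve (ck_zzz above) simply fall out of the inner join until a linking row arrives.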


Comparison: Customer 360 Approaches

| Dimension                      | Nightly ETL to DW            | Real-Time CDP (SaaS)   | CDC + Streaming SQL (RisingWave)    |
|--------------------------------|------------------------------|------------------------|-------------------------------------|
| Profile freshness              | 12–24 hours                  | Minutes                | Seconds                             |
| Source systems supported       | Any (with ETL connectors)    | Vendor-dependent       | PostgreSQL, MySQL, Kafka, and more  |
| SQL-accessible                 | Yes (BI tools)               | Vendor query interface | Yes (PostgreSQL protocol)           |
| Custom segment logic           | SQL in DW                    | Vendor DSL / UI        | Plain SQL                           |
| Operational use (support, app) | Impractical (too stale)      | Yes                    | Yes                                 |
| Infrastructure                 | DW + ETL platform            | SaaS subscription      | Self-hosted streaming DB            |
| Cost model                     | Warehouse compute + storage  | Per-profile pricing    | Storage on S3 + compute             |
| Audit trail                    | ETL logs                     | Vendor audit           | Kafka event log + CDC log           |

Operational Considerations

Schema changes on source databases. CDC streams break when a source column is renamed or dropped. Implement a schema registry (Confluent Schema Registry or similar) and treat source schema changes as a deployment event that requires updating the RisingWave table definition.

Bootstrapping the profile. When you first set up CDC, RisingWave performs an initial snapshot of the source tables before it starts tailing the transaction log. For large customer tables (tens of millions of rows), this snapshot can take hours. Plan the cutover accordingly.

Privacy and data access. The customer 360 profile aggregates sensitive personal data. Ensure the RisingWave instance is deployed in a network segment accessible only to authorized services. Column-level access controls can restrict which teams can query PII fields (email, phone) vs. aggregate behavioral metrics.


Frequently Asked Questions

Q: What happens if the source PostgreSQL database has a schema migration? Schema changes that add nullable columns are generally backward-compatible with CDC. Breaking changes — dropped columns, type changes — require updating the RisingWave table definition and potentially rebuilding the materialized view. Treat source schema changes as coordinated deployments.

Q: How do we handle GDPR deletion requests? When a customer exercises their right to erasure, delete their row from the source CRM table. The CDC event will propagate a DELETE operation to RisingWave, which will remove the customer from all downstream materialized views. Additionally, Kafka topics may need to be compacted to purge the event history for that customer.

Q: Can we add data from Salesforce or Zendesk? Any system that can emit events to Kafka can be a source for the customer 360 profile. Salesforce and Zendesk both have event streaming capabilities or can be connected via third-party CDC tools (Debezium, Airbyte, Fivetran). The RisingWave side simply defines a new Kafka source and joins it into the profile view.

Q: What query latency can we expect on the customer_360 view? Because the view is pre-computed and indexed, point lookups by customer_id return in milliseconds. Segment queries that scan the full profile table depend on the number of customers and the selectivity of the filter. For millions of customers, ensure the common filter columns are indexed.

Q: How does this differ from a traditional data warehouse customer 360? The key difference is latency and operational accessibility. A DW customer 360 is excellent for retrospective analytics, historical cohort analysis, and BI dashboards. A streaming SQL customer 360 is designed for operational workloads — real-time personalization, live customer service tooling, and triggered marketing automation — where a 12-hour stale profile is not useful.
