Building a Real-Time Customer 360 Profile with CDC and Streaming SQL
Customer data in a typical e-commerce company lives in five or more systems: orders in PostgreSQL, CRM records in MySQL, behavioral events in Kafka, support tickets in Zendesk, and email preferences in a marketing platform. A customer 360 profile unifies these into a single, always-current view. The practical path to get there is Change Data Capture (CDC) from your operational databases combined with streaming SQL joins — no nightly ETL jobs, no stale snapshots.
The Fragmented Customer Data Problem
When a customer calls support about an order, your support agent typically needs to open four tabs: the order management system, the CRM, the email history, and the returns portal. They are manually assembling a customer 360 profile in their head.
The same fragmentation affects automated systems. Your recommendation engine does not know about a customer's recent support complaint. Your email campaigns do not know whether a customer just placed a large order. Your fraud detection does not see the full purchase history across all channels.
The standard solution — nightly ETL into a data warehouse — produces a customer profile that is 12 to 24 hours stale. For operational use cases (customer service, real-time personalization, live fraud signals), that latency is too high.
Change Data Capture solves the staleness problem by turning every database write — INSERT, UPDATE, DELETE — into a stream of events. Streaming SQL processes those events and maintains a live unified profile.
What Is Change Data Capture (CDC)?
CDC works by reading the transaction log (WAL in PostgreSQL, binlog in MySQL) rather than querying the database directly. Every change to a row produces an event that includes:
- The operation type (insert, update, delete)
- The before-state and after-state of the row
- The timestamp of the change
This approach adds minimal load to the source database — reading the transaction log costs some I/O and requires retaining WAL until the consumer catches up, but it issues no queries against your tables. It does not require adding triggers or changing your application code. The database's existing transaction log is the source.
RisingWave is a PostgreSQL-compatible streaming database, open source under Apache 2.0, built in Rust, with S3-backed storage. It has native CDC connectors for PostgreSQL and MySQL, making it straightforward to ingest CDC streams directly.
Setting Up CDC Sources
PostgreSQL Orders Database
Enable logical replication on your PostgreSQL instance:
-- On the source PostgreSQL database
ALTER SYSTEM SET wal_level = logical;
-- Then restart PostgreSQL and create a replication slot:
SELECT pg_create_logical_replication_slot('risingwave_slot', 'pgoutput');
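Before pointing RisingWave at the database, it is worth creating a dedicated replication user and confirming the slot exists. A sketch, assuming the user and slot names used in this article (adjust to your environment):

```sql
-- Hypothetical setup on the source database; names match the examples in this article
CREATE USER replication_user WITH REPLICATION PASSWORD 'secret';
GRANT SELECT ON ALL TABLES IN SCHEMA public TO replication_user;

-- Verify the slot was created and is not yet consumed
SELECT slot_name, plugin, active FROM pg_replication_slots;
```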
In RisingWave, create the CDC source:
CREATE SOURCE orders_cdc
WITH (
connector = 'postgres-cdc',
hostname = 'orders-db.internal',
port = '5432',
username = 'replication_user',
password = 'secret',
database.name = 'orders_db',
schema.name = 'public',
table.name = 'orders',
slot.name = 'risingwave_slot'
);
-- Create the table that receives CDC events
CREATE TABLE orders (
order_id VARCHAR PRIMARY KEY,
customer_id VARCHAR,
order_status VARCHAR, -- 'placed', 'shipped', 'delivered', 'cancelled'
total_amount NUMERIC,
item_count INT,
channel VARCHAR, -- 'web', 'mobile', 'marketplace'
created_at TIMESTAMPTZ,
updated_at TIMESTAMPTZ
) FROM orders_cdc TABLE 'public.orders';
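Once the table is created, RisingWave snapshots the existing rows and then tails the WAL. A quick smoke test that ingestion is working — counts should converge to the source table after the initial snapshot:

```sql
-- Sanity check: distribution of order statuses as seen by RisingWave
SELECT order_status, COUNT(*) AS orders
FROM orders
GROUP BY order_status;
```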
MySQL CRM Database
CREATE SOURCE crm_cdc
WITH (
connector = 'mysql-cdc',
hostname = 'crm-db.internal',
port = '3306',
username = 'replication_user',
password = 'secret',
database.name = 'crm_db'
);
CREATE TABLE customers (
customer_id VARCHAR PRIMARY KEY,
email VARCHAR,
first_name VARCHAR,
last_name VARCHAR,
phone VARCHAR,
acquisition_channel VARCHAR,
signup_date TIMESTAMPTZ,
loyalty_tier VARCHAR, -- 'standard', 'silver', 'gold', 'platinum'
last_updated TIMESTAMPTZ
) FROM crm_cdc TABLE 'crm_db.customers';
CREATE TABLE support_tickets (
ticket_id VARCHAR PRIMARY KEY,
customer_id VARCHAR,
status VARCHAR, -- 'open', 'resolved', 'escalated'
category VARCHAR,
created_at TIMESTAMPTZ,
resolved_at TIMESTAMPTZ
) FROM crm_cdc TABLE 'crm_db.support_tickets';
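MySQL CDC reads the binlog, which must be enabled and row-based. A quick check on the source server before creating the RisingWave source:

```sql
-- On the source MySQL server: both must be set for CDC to work
SHOW VARIABLES LIKE 'log_bin';        -- expect ON
SHOW VARIABLES LIKE 'binlog_format';  -- expect ROW
```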
Behavioral Events from Kafka
CREATE SOURCE behavioral_events (
event_id VARCHAR,
customer_id VARCHAR,
session_id VARCHAR,
event_type VARCHAR,
product_id VARCHAR,
category_id VARCHAR,
event_time TIMESTAMPTZ
)
WITH (
connector = 'kafka',
topic = 'customer_events',
properties.bootstrap.server = 'kafka:9092',
scan.startup.mode = 'latest'
)
FORMAT PLAIN ENCODE JSON;
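Note that scan.startup.mode = 'latest' only reads events produced after the source is created; switch to 'earliest' to backfill from the topic's retained history. A quick ad-hoc query to confirm events are flowing:

```sql
-- Sanity check: event volume by type since the source was created
SELECT event_type, COUNT(*) AS events
FROM behavioral_events
GROUP BY event_type;
```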
Building the Customer 360 Profile View
With all three sources ingested, build the unified profile as a materialized view:
CREATE MATERIALIZED VIEW customer_360 AS
SELECT
c.customer_id,
c.email,
c.first_name,
c.last_name,
c.loyalty_tier,
c.acquisition_channel,
c.signup_date,
-- Order summary from PostgreSQL CDC
COALESCE(ord.total_orders, 0) AS total_orders,
COALESCE(ord.total_spend, 0) AS lifetime_value,
COALESCE(ord.avg_order_value, 0) AS avg_order_value,
ord.first_order_date,
ord.last_order_date,
ord.last_order_status,
ord.preferred_channel,
-- Support summary from MySQL CDC
COALESCE(sup.open_tickets, 0) AS open_support_tickets,
COALESCE(sup.total_tickets, 0) AS total_support_tickets,
sup.last_ticket_at,
-- Behavioral signals from Kafka
COALESCE(beh.events_30d, 0) AS events_last_30d,
COALESCE(beh.product_views_30d, 0) AS product_views_last_30d,
COALESCE(beh.sessions_30d, 0) AS sessions_last_30d,
beh.last_active_at,
-- Derived signals
CASE
WHEN ord.last_order_date > NOW() - INTERVAL '30 DAYS' THEN 'active'
WHEN ord.last_order_date > NOW() - INTERVAL '90 DAYS' THEN 'at_risk'
WHEN ord.last_order_date > NOW() - INTERVAL '365 DAYS' THEN 'lapsing'
WHEN ord.last_order_date IS NOT NULL THEN 'churned'
ELSE 'prospect'
END AS lifecycle_stage,
CASE
WHEN COALESCE(sup.open_tickets, 0) > 0 THEN TRUE
ELSE FALSE
END AS has_open_support_issue,
NOW() AS profile_updated_at
FROM customers c
LEFT JOIN (
SELECT
customer_id,
COUNT(*) AS total_orders,
SUM(total_amount) AS total_spend,
AVG(total_amount) AS avg_order_value,
MIN(created_at) AS first_order_date,
MAX(created_at) AS last_order_date,
(ARRAY_AGG(order_status ORDER BY created_at DESC))[1] AS last_order_status,
MODE() WITHIN GROUP (ORDER BY channel) AS preferred_channel
FROM orders
GROUP BY customer_id
) ord ON c.customer_id = ord.customer_id
LEFT JOIN (
SELECT
customer_id,
COUNT(*) FILTER (WHERE status = 'open') AS open_tickets,
COUNT(*) AS total_tickets,
MAX(created_at) AS last_ticket_at
FROM support_tickets
GROUP BY customer_id
) sup ON c.customer_id = sup.customer_id
LEFT JOIN (
SELECT
customer_id,
COUNT(*) FILTER (WHERE event_time > NOW() - INTERVAL '30 DAYS') AS events_30d,
COUNT(*) FILTER (WHERE event_type = 'product_view'
AND event_time > NOW() - INTERVAL '30 DAYS') AS product_views_30d,
COUNT(DISTINCT session_id)
FILTER (WHERE event_time > NOW() - INTERVAL '30 DAYS') AS sessions_30d,
MAX(event_time) AS last_active_at
FROM behavioral_events
GROUP BY customer_id
) beh ON c.customer_id = beh.customer_id;
This single view joins data from PostgreSQL, MySQL, and Kafka. Every change in any source system — a new order, a CRM update, a loyalty tier upgrade, a new support ticket — propagates to the customer_360 view within seconds.
Querying the Profile
Your application queries the profile like any other database table:
-- Full profile for a single customer (used in support tooling)
SELECT *
FROM customer_360
WHERE customer_id = 'cust_49821';
-- Identify high-value customers with open support issues (priority escalation)
SELECT customer_id, email, lifetime_value, open_support_tickets
FROM customer_360
WHERE lifetime_value > 1000
AND has_open_support_issue = TRUE
ORDER BY lifetime_value DESC;
-- At-risk customers who have been browsing recently (re-engagement candidates)
SELECT customer_id, email, last_order_date, product_views_last_30d, sessions_last_30d
FROM customer_360
WHERE lifecycle_stage = 'at_risk'
AND sessions_last_30d > 3
ORDER BY product_views_last_30d DESC
LIMIT 500;
Because RisingWave speaks the PostgreSQL wire protocol, your existing application code, ORM, and analytics tools query it without modification.
Segmentation and Real-Time Audience Building
The customer 360 profile makes audience segmentation a SQL query rather than a batch process. Create persistent segment views that update as customer behavior changes:
CREATE MATERIALIZED VIEW segment_vip_at_risk AS
SELECT customer_id, email, lifetime_value, last_order_date
FROM customer_360
WHERE loyalty_tier IN ('gold', 'platinum')
AND lifecycle_stage IN ('at_risk', 'lapsing');
CREATE MATERIALIZED VIEW segment_high_intent AS
SELECT customer_id, email, product_views_last_30d
FROM customer_360
WHERE lifecycle_stage IN ('active', 'at_risk')
AND product_views_last_30d > 10
AND total_orders < 2; -- heavy browsers, light buyers
Push these segments to your marketing platform with a Kafka sink:
CREATE SINK vip_at_risk_to_crm
FROM segment_vip_at_risk
WITH (
connector = 'kafka',
topic = 'crm_segment_updates',
properties.bootstrap.server = 'kafka:9092'
)
FORMAT PLAIN ENCODE JSON;
When a gold-tier customer places a new order and moves from 'at_risk' back to 'active', they are automatically removed from the VIP at-risk segment and the CRM is notified — no nightly batch sync required.
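A plain JSON sink emits the segment's current rows; if the downstream consumer also needs explicit removal events when a customer leaves the segment, a changelog-style sink makes inserts and deletes visible. A sketch using RisingWave's Debezium-format output (the topic name here is an assumption):

```sql
CREATE SINK vip_at_risk_changelog
FROM segment_vip_at_risk
WITH (
    connector = 'kafka',
    topic = 'crm_segment_changelog',  -- hypothetical topic
    properties.bootstrap.server = 'kafka:9092'
)
FORMAT DEBEZIUM ENCODE JSON;  -- messages carry the operation plus before/after images
```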
Handling Identity Resolution
E-commerce customers often have multiple identities: anonymous visitor before sign-up, email address, loyalty program ID, and mobile app user ID. A minimal identity graph can be maintained as a table in RisingWave, populated directly by your application or via CDC from a source system:
CREATE TABLE identity_graph (
identity_id VARCHAR PRIMARY KEY,
customer_id VARCHAR, -- canonical customer ID
identity_type VARCHAR, -- 'email', 'cookie_id', 'loyalty_id', 'device_id'
identity_value VARCHAR,
created_at TIMESTAMPTZ
);
When a previously anonymous session is identified (user logs in), insert a row linking the cookie_id to the customer_id. The customer_360 profile then covers behavior from before the login event by joining behavioral events through the identity graph.
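A minimal sketch of that join, assuming anonymous events carry the cookie ID in their customer_id field (the view name and column conventions are illustrative):

```sql
-- Resolve each event to a canonical customer ID where a mapping exists
CREATE MATERIALIZED VIEW resolved_events AS
SELECT
    COALESCE(ig.customer_id, e.customer_id) AS customer_id,
    e.event_id,
    e.session_id,
    e.event_type,
    e.product_id,
    e.event_time
FROM behavioral_events e
LEFT JOIN identity_graph ig
    ON ig.identity_type = 'cookie_id'
   AND ig.identity_value = e.customer_id;
```

The behavioral subquery in the customer_360 view can then read from resolved_events instead of behavioral_events, so pre-login activity is attributed to the right customer.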
Comparison: Customer 360 Approaches
| Dimension | Nightly ETL to DW | Real-Time CDP (SaaS) | CDC + Streaming SQL (RisingWave) |
| --- | --- | --- | --- |
| Profile freshness | 12–24 hours | Minutes | Seconds |
| Source systems supported | Any (with ETL connectors) | Vendor-dependent | PostgreSQL, MySQL, Kafka, and more |
| SQL-accessible | Yes (BI tools) | Vendor query interface | Yes (PostgreSQL protocol) |
| Custom segment logic | SQL in DW | Vendor DSL / UI | Plain SQL |
| Operational use (support, app) | Impractical (too stale) | Yes | Yes |
| Infrastructure | DW + ETL platform | SaaS subscription | Self-hosted streaming DB |
| Cost model | Warehouse compute + storage | Per-profile pricing | Storage on S3 + compute |
| Audit trail | ETL logs | Vendor audit | Kafka event log + CDC log |
Operational Considerations
Schema changes on source databases. CDC streams break when a source column is renamed or dropped. Implement a schema registry (Confluent Schema Registry or similar) and treat source schema changes as a deployment event that requires updating the RisingWave table definition.
Bootstrapping the profile. When you first set up CDC, RisingWave performs an initial snapshot of the source tables before it starts tailing the transaction log. For large customer tables (tens of millions of rows), this snapshot can take hours. Plan the cutover accordingly.
Privacy and data access. The customer 360 profile aggregates sensitive personal data. Ensure the RisingWave instance is deployed in a network segment accessible only to authorized services. Column-level access controls can restrict which teams can query PII fields (email, phone) vs. aggregate behavioral metrics.
Frequently Asked Questions
Q: What happens if the source PostgreSQL database has a schema migration? Schema changes that add nullable columns are generally backward-compatible with CDC. Breaking changes — dropped columns, type changes — require updating the RisingWave table definition and potentially rebuilding the materialized view. Treat source schema changes as coordinated deployments.
Q: How do we handle GDPR deletion requests? When a customer exercises their right to erasure, delete their row from the source CRM table. The CDC event will propagate a DELETE operation to RisingWave, which will remove the customer from all downstream materialized views. Additionally, Kafka topics may need to be compacted to purge the event history for that customer.
Q: Can we add data from Salesforce or Zendesk? Any system that can emit events to Kafka can be a source for the customer 360 profile. Salesforce and Zendesk both have event streaming capabilities or can be connected via third-party CDC tools (Debezium, Airbyte, Fivetran). The RisingWave side simply defines a new Kafka source and joins it into the profile view.
Q: What query latency can we expect on the customer_360 view? Because the view is pre-computed and indexed, point lookups by customer_id return in milliseconds. Segment queries that scan the full profile table depend on the number of customers and the selectivity of the filter. For millions of customers, ensure the common filter columns are indexed.
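In RisingWave, secondary indexes can be created directly on a materialized view; a sketch for the filters used in this article:

```sql
-- Point lookups in support tooling
CREATE INDEX idx_customer_360_id ON customer_360 (customer_id);
-- Segment scans by lifecycle stage
CREATE INDEX idx_customer_360_stage ON customer_360 (lifecycle_stage);
```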
Q: How does this differ from a traditional data warehouse customer 360? The key difference is latency and operational accessibility. A DW customer 360 is excellent for retrospective analytics, historical cohort analysis, and BI dashboards. A streaming SQL customer 360 is designed for operational workloads — real-time personalization, live customer service tooling, and triggered marketing automation — where a 12-hour stale profile is not useful.