A real-time CRM streaming database is no longer a luxury. It is a requirement. The moment your sales team or AI agent enriches a contact record, every downstream view, dashboard, and automation should reflect that change within milliseconds, not minutes or hours. Traditional CRMs were built for a world where humans typed notes into forms a few times per day. That world is gone.
Today, AI agents generate dozens of writes per contact: enrichment data from LinkedIn, summarized call transcripts, inferred intent signals, email sentiment scores. Each write conflicts with every read. Batch ETL pipelines that reconcile these signals overnight produce stale context that costs deals. This post shows you how to solve the read/write conflict with a streaming SQL architecture built on change data capture (CDC), the CQRS pattern, and incrementally maintained materialized views in RisingWave.
By the end, you will have a working architecture that separates CRM writes from reads, maintains a live customer 360 view, and delivers fresh context to both human reps and AI agents through standard SQL.
Why do traditional CRMs fail when AI agents enter the picture?
Traditional CRMs assume a simple interaction model: a human opens a record, edits a few fields, clicks save. The database handles a handful of writes per user per hour. Reports run on nightly batch jobs. This model breaks under three pressures that AI agents introduce.
Write volume explodes. A single AI enrichment agent can generate 10-50 attribute updates per contact per hour. It pulls firmographic data from external APIs, extracts entities from emails, scores intent from call recordings, and reconciles duplicates. Each of these is a write. Multiply that across thousands of contacts, and you have a write throughput that legacy CRMs were never designed to handle.
Read and write paths conflict. Every enrichment, override, and inference modifies the same contact record that a sales rep needs to read. When a rep opens a contact before a call, they need the reconciled view: the best-known email, the most recent company, the latest deal stage. But the write path is still ingesting raw observations. In a traditional CRM, these two paths share the same tables and the same database, creating lock contention and stale reads.
Batch ETL cannot keep pace. Most CRM analytics layers rely on periodic ETL jobs that extract data, transform it, and load it into a warehouse or reporting database. These jobs run every 15 minutes at best, every 24 hours at worst. An AI agent that needs sub-second context freshness cannot wait for a batch job. By the time the batch completes, the agent has already made decisions on stale data.
Consider a concrete scenario. A sales rep finishes a call with a prospect. The call recording agent transcribes the conversation and extracts three key facts: the prospect mentioned a competitor, their contract renews in Q3, and they want a demo next week. These facts need to appear in the contact's unified profile immediately, because the rep's next action (sending a follow-up email) depends on them. In a batch-oriented CRM, those facts sit in a queue until the next ETL run.
How does the CQRS pattern solve the CRM read/write conflict?
Command Query Responsibility Segregation (CQRS) is an architectural pattern that separates the write model from the read model. Instead of reading and writing to the same tables, you capture every raw observation into a write-optimized store and maintain separate, purpose-built read views. This separation eliminates the core conflict.
Here is how the pattern maps to a CRM architecture:
- Write path (PostgreSQL): Every observation, whether from a human rep, an AI agent, or an external API, is captured as an immutable fact in an Entity-Attribute-Value (EAV) schema. Each row records who observed what, when, and with what confidence. Nothing is overwritten.
- Read path (RisingWave): A streaming database consumes the change stream from PostgreSQL via CDC, and maintains live materialized views that flatten, reconcile, and permission-filter the raw facts into the canonical customer record.
- Application layer: Your sales app, AI agents, and dashboards query RisingWave for the reconciled view. They never touch the write database directly for reads.
The data flow looks like this:
PostgreSQL (writes) --> CDC connector --> RisingWave (materialized views) --> Application / AI agents / Search index
Let's build it. First, create the write-side schema in PostgreSQL. The EAV model captures every observation as a separate row:
-- PostgreSQL: write-side schema
CREATE TABLE contact_observations (
id BIGSERIAL PRIMARY KEY,
contact_id UUID NOT NULL,
attribute VARCHAR(100) NOT NULL, -- e.g., 'email', 'company', 'title'
value TEXT NOT NULL,
source VARCHAR(50) NOT NULL, -- 'human', 'ai_enrichment', 'email_sync'
confidence FLOAT DEFAULT 1.0,
observed_at TIMESTAMP DEFAULT NOW(),
observed_by UUID -- user or agent ID
);
Every write is an INSERT, never an UPDATE. When a rep corrects an email address, that correction is a new row with source = 'human' and a fresh timestamp. When an AI agent infers a job title from LinkedIn, that is another row with source = 'ai_enrichment' and a confidence score.
Next, connect RisingWave to PostgreSQL using a CDC source:
-- RisingWave: create CDC source from PostgreSQL
CREATE SOURCE contact_observations_cdc FROM pg_cdc_table
WITH (
hostname = 'your-pg-host',
port = '5432',
username = 'cdc_reader',
password = 'your-password',
database.name = 'crm_db',
schema.name = 'public',
table.name = 'contact_observations',
slot.name = 'risingwave_crm_slot'
);
Now create a materialized view that reconciles the raw observations into a canonical contact record. The reconciliation logic follows two rules: human observations override AI observations, and within the same source, newer observations override older ones.
-- RisingWave: reconcile observations into canonical contact record
CREATE MATERIALIZED VIEW canonical_contacts AS
WITH ranked_observations AS (
SELECT
contact_id,
attribute,
value,
source,
confidence,
observed_at,
ROW_NUMBER() OVER (
PARTITION BY contact_id, attribute
ORDER BY
CASE WHEN source = 'human' THEN 0 ELSE 1 END,
confidence DESC,
observed_at DESC
) AS rn
FROM contact_observations_cdc
)
SELECT
contact_id,
MAX(CASE WHEN attribute = 'first_name' AND rn = 1 THEN value END) AS first_name,
MAX(CASE WHEN attribute = 'last_name' AND rn = 1 THEN value END) AS last_name,
MAX(CASE WHEN attribute = 'email' AND rn = 1 THEN value END) AS email,
MAX(CASE WHEN attribute = 'company' AND rn = 1 THEN value END) AS company,
MAX(CASE WHEN attribute = 'title' AND rn = 1 THEN value END) AS title,
MAX(CASE WHEN attribute = 'phone' AND rn = 1 THEN value END) AS phone,
MAX(CASE WHEN attribute = 'linkedin_url' AND rn = 1 THEN value END) AS linkedin_url
FROM ranked_observations
WHERE rn = 1
GROUP BY contact_id;
This view updates incrementally. When a new observation lands in PostgreSQL, the CDC connector delivers it to RisingWave, and the materialized view recomputes only the affected contact. The result is sub-second freshness with no batch jobs.
How do you build a customer 360 view with streaming materialized views?
A canonical contact record is just the starting point. A true customer 360 view joins data from multiple sources: CRM contact facts, organization data, deal pipeline status, and interaction history. RisingWave lets you express these joins as materialized views that update continuously.
Assume you have additional CDC sources for organizations, deals, and interactions:
-- RisingWave: additional CDC sources
CREATE SOURCE organizations_cdc FROM pg_cdc_table
WITH (
hostname = 'your-pg-host',
port = '5432',
username = 'cdc_reader',
password = 'your-password',
database.name = 'crm_db',
schema.name = 'public',
table.name = 'organizations',
slot.name = 'risingwave_org_slot'
);
CREATE SOURCE deals_cdc FROM pg_cdc_table
WITH (
hostname = 'your-pg-host',
port = '5432',
username = 'cdc_reader',
password = 'your-password',
database.name = 'crm_db',
schema.name = 'public',
table.name = 'deals',
slot.name = 'risingwave_deals_slot'
);
CREATE SOURCE interactions_cdc FROM pg_cdc_table
WITH (
hostname = 'your-pg-host',
port = '5432',
username = 'cdc_reader',
password = 'your-password',
database.name = 'crm_db',
schema.name = 'public',
table.name = 'interactions',
slot.name = 'risingwave_interactions_slot'
);
Now build the customer 360 materialized view that joins across all sources:
-- RisingWave: customer 360 materialized view
CREATE MATERIALIZED VIEW customer_360 AS
SELECT
c.contact_id,
c.first_name,
c.last_name,
c.email,
c.company,
c.title,
o.org_id,
o.org_name,
o.industry,
o.employee_count,
d.deal_id,
d.deal_name,
d.stage,
d.amount,
d.expected_close_date,
i.last_interaction_at,
i.interaction_count,
i.last_interaction_type
FROM canonical_contacts c
LEFT JOIN organizations_cdc o ON c.company = o.org_name
LEFT JOIN (
SELECT
contact_id,
deal_id,
deal_name,
stage,
amount,
expected_close_date,
ROW_NUMBER() OVER (
PARTITION BY contact_id
ORDER BY expected_close_date ASC
) AS rn
FROM deals_cdc
) d ON c.contact_id = d.contact_id AND d.rn = 1
LEFT JOIN (
SELECT
contact_id,
MAX(interaction_at) AS last_interaction_at,
COUNT(*) AS interaction_count,
(ARRAY_AGG(interaction_type ORDER BY interaction_at DESC))[1] AS last_interaction_type
FROM interactions_cdc
GROUP BY contact_id
) i ON c.contact_id = i.contact_id;
This single view gives your application a complete picture of every contact: their reconciled profile, their organization, their most imminent deal, and their interaction history. Because RisingWave maintains this view incrementally, any change to any source table (a new interaction logged, a deal stage advanced, an organization detail updated) propagates to the 360 view within milliseconds.
Permission-aware views are a natural extension. If your CRM requires that sales reps only see contacts assigned to them, add a filter:
-- RisingWave: permission-filtered view for a specific team
CREATE MATERIALIZED VIEW team_customer_360 AS
SELECT c360.*
FROM customer_360 c360
JOIN contact_assignments ca ON c360.contact_id = ca.contact_id
WHERE ca.team_id = 'sales-west';
Your application queries the filtered view. No additional access-control logic in application code. No stale permission caches.
What CRM use cases benefit most from streaming SQL?
The CQRS + streaming SQL architecture unlocks several use cases that are difficult or impossible with batch-oriented CRMs.
Real-time lead scoring
Instead of scoring leads in a nightly batch, aggregate signals as they arrive. Each interaction, email open, page visit, and enrichment update feeds into a continuously updated score:
-- RisingWave: real-time lead scoring
CREATE MATERIALIZED VIEW lead_scores AS
SELECT
c.contact_id,
c.first_name,
c.last_name,
c.email,
c.company,
-- Scoring factors
COALESCE(i.interaction_count, 0) * 5 AS interaction_score,
CASE WHEN d.stage = 'negotiation' THEN 30
WHEN d.stage = 'proposal' THEN 20
WHEN d.stage = 'qualification' THEN 10
ELSE 0
END AS deal_stage_score,
CASE WHEN c.title ILIKE '%VP%' OR c.title ILIKE '%Director%' OR c.title ILIKE '%CTO%'
THEN 15 ELSE 0
END AS seniority_score,
CASE WHEN o.employee_count > 500 THEN 20
WHEN o.employee_count > 100 THEN 10
ELSE 5
END AS company_size_score,
-- Total score
(COALESCE(i.interaction_count, 0) * 5 +
CASE WHEN d.stage = 'negotiation' THEN 30
WHEN d.stage = 'proposal' THEN 20
WHEN d.stage = 'qualification' THEN 10
ELSE 0
END +
CASE WHEN c.title ILIKE '%VP%' OR c.title ILIKE '%Director%' OR c.title ILIKE '%CTO%'
THEN 15 ELSE 0
END +
CASE WHEN o.employee_count > 500 THEN 20
WHEN o.employee_count > 100 THEN 10
ELSE 5
END
) AS total_score
FROM canonical_contacts c
LEFT JOIN organizations_cdc o ON c.company = o.org_name
LEFT JOIN (
SELECT contact_id, COUNT(*) AS interaction_count
FROM interactions_cdc
GROUP BY contact_id
) i ON c.contact_id = i.contact_id
LEFT JOIN (
SELECT contact_id, stage,
ROW_NUMBER() OVER (PARTITION BY contact_id ORDER BY expected_close_date ASC) AS rn
FROM deals_cdc
) d ON c.contact_id = d.contact_id AND d.rn = 1;
The moment a new interaction is logged or a deal advances, the lead score updates. Your sales team sees the hottest leads first, always.
Live pipeline dashboards
Deal pipeline dashboards powered by batch queries show yesterday's reality. With streaming SQL, dashboard metrics update the instant a rep moves a deal to a new stage:
-- RisingWave: live pipeline metrics
CREATE MATERIALIZED VIEW pipeline_summary AS
SELECT
stage,
COUNT(*) AS deal_count,
SUM(amount) AS total_value,
AVG(amount) AS avg_deal_size,
MIN(expected_close_date) AS nearest_close
FROM deals_cdc
WHERE stage NOT IN ('closed_won', 'closed_lost')
GROUP BY stage;
Automated deal risk alerts
Identify stalled deals automatically. A materialized view can detect deals that have not had any interaction in 14 days:
-- RisingWave: stalled deal detection
CREATE MATERIALIZED VIEW stalled_deals AS
SELECT
d.deal_id,
d.deal_name,
d.contact_id,
d.stage,
d.amount,
d.expected_close_date,
i.last_interaction_at,
NOW() - i.last_interaction_at AS days_since_contact
FROM deals_cdc d
LEFT JOIN (
SELECT contact_id, MAX(interaction_at) AS last_interaction_at
FROM interactions_cdc
GROUP BY contact_id
) i ON d.contact_id = i.contact_id
WHERE d.stage NOT IN ('closed_won', 'closed_lost')
AND (i.last_interaction_at IS NULL OR i.last_interaction_at < NOW() - INTERVAL '14 days');
Connect this view to an alerting system, and your sales manager gets notified the moment a deal goes cold.
AI agent context delivery
This is the use case that ties everything together. When an AI agent needs to prepare context for a sales call, it queries the customer 360 view through standard SQL. No custom APIs, no cache invalidation logic, no stale snapshots. The agent runs SELECT * FROM customer_360 WHERE contact_id = '...' and gets the freshest possible data, reconciled from every source, with human overrides applied.
This approach works particularly well with context engineering patterns where AI agents need structured, up-to-date context delivered through a database interface rather than ad-hoc API calls.
FAQ
Can RisingWave replace my CRM's analytics layer?
Yes, for read-heavy analytics workloads. RisingWave acts as a streaming read replica that maintains continuously updated materialized views. Your CRM (Salesforce, HubSpot, or a custom PostgreSQL-based system) remains the system of record for writes. RisingWave handles the analytics, dashboards, and AI agent queries with sub-second freshness, eliminating the need for batch ETL into a separate warehouse.
How does streaming SQL handle CRM data reconciliation from multiple sources?
RisingWave materialized views express reconciliation logic as standard SQL. You define ranking rules (human overrides AI, high-confidence overrides low-confidence, newer overrides older) using window functions like ROW_NUMBER(). The streaming engine applies these rules incrementally as new data arrives, so the reconciled view is always current without running a full recomputation.
What latency can I expect for CRM materialized views in RisingWave?
For typical CRM workloads (thousands to millions of contacts with moderate update rates), expect end-to-end latency from PostgreSQL write to materialized view update in the range of 200ms to 2 seconds. This includes CDC capture, streaming transport, and incremental view maintenance. The exact latency depends on your barrier interval configuration and the complexity of your view definitions.
Does this approach work with existing CRMs like Salesforce or HubSpot?
Yes. You do not need to replace your CRM. Use a CDC connector or data integration tool (such as Airbyte or Fivetran) to replicate your CRM data into PostgreSQL or directly into RisingWave. RisingWave supports PostgreSQL CDC natively, and you can also ingest data via Kafka topics if your integration tool exports to Kafka. The materialized views then serve as a real-time analytics and context layer on top of your existing CRM.
Conclusion
Building a real-time CRM with streaming SQL comes down to three architectural decisions:
- Separate writes from reads with CQRS. Capture every observation as an immutable fact in PostgreSQL. Let a streaming database handle the reconciliation and read path.
- Use CDC to bridge the gap. Change data capture eliminates batch ETL. Every write to PostgreSQL flows to RisingWave within milliseconds.
- Express your read models as materialized views. Customer 360 views, lead scores, pipeline dashboards, and deal risk alerts are all SQL queries that RisingWave maintains incrementally.
- Deliver context through SQL, not custom APIs. AI agents and applications query materialized views with standard SQL. No cache layers, no stale snapshots, no reconciliation logic in application code.
This architecture scales with the write volume that AI agents introduce, keeps every view fresh, and requires no proprietary query languages or complex stream processing frameworks. It is just SQL.
Ready to try this yourself? Try RisingWave Cloud free, no credit card required. Sign up here.
Join our Slack community to ask questions and connect with other stream processing developers.

