Introduction
Search for "agentic data architecture" and you will find no shortage of material. Pyramid diagrams with three to five layers. Circles labeled "orchestration" pointing to other circles labeled "context." Frameworks with names like the Reasoning-Action Loop or the Adaptive Intelligence Stack. Most of it is accurate at a high level. None of it helps you ship.
This article takes a different approach. Every layer of the reference architecture is backed by SQL you can run. No pseudocode. No "implement your own integration here" placeholders. The architecture has three layers: ingestion, processing, and serving. By the end of this article, you will have the complete implementation for all three.
The system uses RisingWave as the streaming SQL engine, PostgreSQL as the operational write store, and the risingwave-mcp server for agent-facing access. The same architecture works whether you are building a customer support agent, a sales intelligence agent, or an autonomous operations agent.
Why This Article Exists
McKinsey frameworks do not help you ship. Neither do the blog posts that restate them. The gap in agentic data architecture coverage is not conceptual. It is implementation. Engineers know they need a retrieval layer. They know agents need fresh context. What they do not always know is what CREATE SOURCE statement to write, how to structure the materialized views so agents can reason about them, or where to draw the line between what lives in PostgreSQL and what lives in the streaming layer.
This guide is opinionated about those decisions. Follow it and you will have a working architecture in a day.
The Three Layers
An agentic data architecture needs to solve three problems in sequence:
- Ingestion: Get data from operational systems into the streaming layer continuously.
- Processing and context: Transform raw data into structured context that agents can query: user profiles, entity states, risk signals.
- Serving: Expose that context to agents through SQL or natural language, with sub-second freshness.
Each layer has a clear owner and a clear interface. Let's build them.
Layer 1: Ingestion
The ingestion layer brings data into RisingWave from wherever it lives operationally. Two sources cover most production architectures: PostgreSQL (the operational database) and Kafka (the event bus).
Ingesting from PostgreSQL via CDC
Change data capture from PostgreSQL gives you a live stream of every INSERT, UPDATE, and DELETE from your transactional database. RisingWave's built-in postgres-cdc connector reads those changes through a logical replication slot, so you do not need to run Kafka or a separate Debezium service.
First, configure PostgreSQL for logical replication:
-- In PostgreSQL: enable logical replication
-- (changing wal_level only takes effect after a server restart)
ALTER SYSTEM SET wal_level = logical;
-- Create a publication for the tables you want to capture
CREATE PUBLICATION risingwave_pub FOR TABLE users, orders, accounts;
Then create the CDC source in RisingWave:
-- In RisingWave: create the CDC source
CREATE SOURCE pg_cdc WITH (
connector = 'postgres-cdc',
hostname = 'postgres.internal',
port = '5432',
username = 'cdc_reader',
password = 'secure_password',
database.name = 'production',
slot.name = 'risingwave_main_slot',
publication.name = 'risingwave_pub'
);
Now create tables in RisingWave that mirror the PostgreSQL tables:
-- Mirror the users table
CREATE TABLE users (
user_id BIGINT PRIMARY KEY,
name VARCHAR,
email VARCHAR,
plan VARCHAR,
created_at TIMESTAMPTZ,
updated_at TIMESTAMPTZ
) FROM pg_cdc TABLE 'public.users';
-- Mirror the orders table
CREATE TABLE orders (
order_id BIGINT PRIMARY KEY,
user_id BIGINT,
amount NUMERIC,
status VARCHAR,
product_category VARCHAR,
created_at TIMESTAMPTZ,
updated_at TIMESTAMPTZ
) FROM pg_cdc TABLE 'public.orders';
-- Mirror the accounts table
CREATE TABLE accounts (
account_id BIGINT PRIMARY KEY,
user_id BIGINT,
balance NUMERIC,
last_activity TIMESTAMPTZ,
risk_score NUMERIC
) FROM pg_cdc TABLE 'public.accounts';
Every change in PostgreSQL flows into these tables within seconds.
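To make the mirroring semantics concrete, here is a minimal in-memory sketch of how a stream of change events keeps a keyed copy of a table in sync. The `apply_change` helper and the event shape are hypothetical and only illustrate the idea; this is not RisingWave's internal implementation.

```python
from typing import Any, Optional

Row = dict[str, Any]


def apply_change(table: dict[int, Row], op: str, key: int,
                 row: Optional[Row] = None) -> None:
    """Apply one change event to an in-memory mirror keyed by primary key."""
    if op in ("insert", "update"):
        if row is None:
            raise ValueError(f"{op} requires a row")
        table[key] = row        # upsert: the latest version of the row wins
    elif op == "delete":
        table.pop(key, None)    # deleting a missing key is a no-op
    else:
        raise ValueError(f"unknown op: {op}")


# Replay a short change stream against an empty mirror of `users`.
users: dict[int, Row] = {}
apply_change(users, "insert", 9421, {"name": "Ada", "plan": "free"})
apply_change(users, "update", 9421, {"name": "Ada", "plan": "pro"})
apply_change(users, "insert", 9422, {"name": "Lin", "plan": "free"})
apply_change(users, "delete", 9422)
print(users)
```

The primary key is what makes this work: because every mirrored table declares one, an UPDATE or DELETE can be matched to the row it affects.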
Ingesting from Kafka
For event streams that do not originate in a database (clickstream, API events, IoT telemetry), Kafka is the standard source.
-- Ingest user behavior events from Kafka
CREATE SOURCE user_events (
event_id VARCHAR,
user_id BIGINT,
event_type VARCHAR,
page VARCHAR,
metadata JSONB,
event_time TIMESTAMPTZ
) WITH (
connector = 'kafka',
topic = 'user-events',
properties.bootstrap.server = 'kafka.internal:9092',
scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;
-- Ingest transaction events from Kafka
CREATE SOURCE transaction_events (
transaction_id VARCHAR,
account_id BIGINT,
user_id BIGINT,
amount NUMERIC,
merchant VARCHAR,
country VARCHAR,
event_time TIMESTAMPTZ
) WITH (
connector = 'kafka',
topic = 'transactions',
properties.bootstrap.server = 'kafka.internal:9092',
scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;
Both sources feed into Layer 2. RisingWave joins, aggregates, and enriches the combined stream continuously.
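On the producer side, each Kafka message for the user-events topic should be a JSON object whose keys match the column names declared in the source. The sketch below builds such a payload; `build_user_event` is a hypothetical helper, and it assumes ISO 8601 strings are accepted for the TIMESTAMPTZ column. Sending the bytes to Kafka is left to whichever client library you already use.

```python
import json
from datetime import datetime, timezone

# Column names taken from the user_events source definition.
USER_EVENT_COLUMNS = {
    "event_id", "user_id", "event_type", "page", "metadata", "event_time",
}


def build_user_event(event_id: str, user_id: int, event_type: str, page: str,
                     metadata: dict, event_time: datetime) -> bytes:
    """Serialize one event as a JSON object keyed by the source's column names."""
    if event_time.tzinfo is None:
        raise ValueError("event_time must be timezone-aware (column is TIMESTAMPTZ)")
    payload = {
        "event_id": event_id,
        "user_id": user_id,
        "event_type": event_type,
        "page": page,
        "metadata": metadata,                  # nested object for the JSONB column
        "event_time": event_time.isoformat(),  # assumption: ISO 8601 is accepted
    }
    return json.dumps(payload).encode("utf-8")


message = build_user_event(
    "evt-001", 9421, "page_view", "/pricing", {"referrer": "search"},
    datetime(2026, 1, 15, 12, 30, tzinfo=timezone.utc),
)
decoded = json.loads(message)
```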
Layer 2: Processing and Context
The processing layer transforms raw ingested data into structured context that agents can query intelligently. Three categories of context views cover the large majority of agent use cases.
Context View 1: User Profile (Recent Behavior Aggregation)
The user profile view aggregates a user's recent behavior into a compact, queryable record. Agents use this to personalize responses, segment users, and understand intent.
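CREATE MATERIALIZED VIEW user_profile AS
WITH recent_orders AS (
    -- Order behavior: completed orders in the last 30 days
    SELECT
        user_id,
        COUNT(*) AS orders_last_30d,
        SUM(amount) AS spend_last_30d,
        AVG(amount) AS avg_order_value,
        MAX(created_at) AS last_order_at,
        -- Top category
        MODE() WITHIN GROUP (ORDER BY product_category) AS favorite_category
    FROM orders
    WHERE status = 'completed'
      AND created_at > NOW() - INTERVAL '30 days'
    GROUP BY user_id
),
recent_page_views AS (
    -- Recent events: page views in the last 7 days
    SELECT user_id, COUNT(*) AS page_views_last_7d
    FROM user_events
    WHERE event_type = 'page_view'
      AND event_time > NOW() - INTERVAL '7 days'
    GROUP BY user_id
)
-- Orders and events are aggregated separately before joining. Joining both
-- raw tables to users in one GROUP BY would pair every order with every
-- event and inflate the counts and sums.
SELECT
    u.user_id,
    u.name,
    u.email,
    u.plan,
    COALESCE(ro.orders_last_30d, 0) AS orders_last_30d,
    COALESCE(ro.spend_last_30d, 0) AS spend_last_30d,
    COALESCE(ro.avg_order_value, 0) AS avg_order_value,
    ro.last_order_at,
    ro.favorite_category,
    COALESCE(pv.page_views_last_7d, 0) AS page_views_last_7d
FROM users u
LEFT JOIN recent_orders ro ON u.user_id = ro.user_id
LEFT JOIN recent_page_views pv ON u.user_id = pv.user_id;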
An agent querying this view for user 9421 gets a single row with everything needed to personalize a response: spend history, favorite category, recent engagement level.
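To pin down what each profile field is meant to contain, the following pure-Python sketch computes the same fields for one user from in-memory rows. It is a reference for the intended semantics, not how RisingWave evaluates the view; `build_profile` and the sample rows are made up for illustration. Note that orders and page views are filtered and counted independently of each other.

```python
from collections import Counter
from datetime import datetime, timedelta, timezone
from decimal import Decimal


def build_profile(user_id, orders, events, now):
    """Compute the user_profile fields for one user from in-memory rows."""
    recent_orders = [
        o for o in orders
        if o["user_id"] == user_id
        and o["status"] == "completed"
        and o["created_at"] > now - timedelta(days=30)
    ]
    recent_views = [
        e for e in events
        if e["user_id"] == user_id
        and e["event_type"] == "page_view"
        and e["event_time"] > now - timedelta(days=7)
    ]
    spend = sum((o["amount"] for o in recent_orders), Decimal("0"))
    categories = Counter(o["product_category"] for o in recent_orders)
    return {
        "orders_last_30d": len(recent_orders),
        "spend_last_30d": spend,
        "avg_order_value": spend / len(recent_orders) if recent_orders else Decimal("0"),
        "last_order_at": max((o["created_at"] for o in recent_orders), default=None),
        # Ties between categories are broken arbitrarily, as with SQL MODE().
        "favorite_category": categories.most_common(1)[0][0] if categories else None,
        "page_views_last_7d": len(recent_views),
    }


now = datetime(2026, 1, 31, tzinfo=timezone.utc)
orders = [
    {"user_id": 9421, "status": "completed", "amount": Decimal("40"),
     "product_category": "books", "created_at": now - timedelta(days=2)},
    {"user_id": 9421, "status": "completed", "amount": Decimal("60"),
     "product_category": "books", "created_at": now - timedelta(days=10)},
    {"user_id": 9421, "status": "completed", "amount": Decimal("500"),
     "product_category": "games", "created_at": now - timedelta(days=45)},  # too old
    {"user_id": 9421, "status": "pending", "amount": Decimal("15"),
     "product_category": "games", "created_at": now - timedelta(days=1)},   # not completed
]
events = [
    {"user_id": 9421, "event_type": "page_view", "event_time": now - timedelta(days=d)}
    for d in (1, 2, 3)
] + [{"user_id": 9421, "event_type": "click", "event_time": now - timedelta(days=1)}]

profile = build_profile(9421, orders, events, now)
```

With two qualifying orders and three page views, the order count must be 2 and the page view count 3. A result of 6 for either would indicate that the two sets of rows were multiplied together in a join.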
Context View 2: Entity State (Current Status)
The entity state view holds the current, authoritative status of business entities. For order management agents, this is the single most queried view.
CREATE MATERIALIZED VIEW order_entity_state AS
SELECT
o.order_id,
o.user_id,
u.name AS customer_name,
u.email AS customer_email,
o.amount,
o.status,
o.product_category,
o.created_at,
o.updated_at,
-- Time since last update in minutes
EXTRACT(EPOCH FROM (NOW() - o.updated_at)) / 60 AS minutes_since_update,
-- Flag orders that have been in the same status too long
CASE
WHEN o.status = 'processing'
AND EXTRACT(EPOCH FROM (NOW() - o.updated_at)) / 3600 > 24
THEN true
ELSE false
END AS is_stalled
FROM orders o
JOIN users u ON o.user_id = u.user_id;
An agent checking on a specific order gets its current status, whether it is stalled, and the customer's contact details, all in one query.
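The two time-based columns are simple to state outside SQL as well. The sketch below mirrors the CASE expression in Python; the function names are hypothetical. It can double as a query-time fallback if your RisingWave version restricts where NOW() may appear in a materialized view definition, which is an assumption worth checking against the documentation for your release.

```python
from datetime import datetime, timedelta, timezone

STALL_THRESHOLD = timedelta(hours=24)


def minutes_since_update(updated_at: datetime, now: datetime) -> float:
    """Minutes elapsed since the order row was last updated."""
    return (now - updated_at).total_seconds() / 60


def is_stalled(status: str, updated_at: datetime, now: datetime) -> bool:
    """True only for 'processing' orders untouched for more than 24 hours."""
    return status == "processing" and (now - updated_at) > STALL_THRESHOLD


now = datetime(2026, 1, 15, 12, 0, tzinfo=timezone.utc)
old = now - timedelta(hours=25)
fresh = now - timedelta(hours=2)

print(is_stalled("processing", old, now))    # processing and older than 24h
print(is_stalled("processing", fresh, now))  # processing but recently updated
print(is_stalled("shipped", old, now))       # old, but not in 'processing'
```

Note that the flag keys off updated_at, so any update to the row resets the clock, not only a status change.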
Context View 3: Risk Signal (Anomaly Detection)
The risk signal view identifies anomalous patterns in real time. Fraud detection agents and compliance agents use this as a primary input.
-- Tumbling window: transaction velocity per user in 1-hour buckets
CREATE MATERIALIZED VIEW transaction_velocity AS
SELECT
user_id,
window_start,
window_end,
COUNT(*) AS tx_count,
SUM(amount) AS tx_total,
COUNT(DISTINCT country) AS country_count,
-- Flag high-velocity windows
CASE
WHEN COUNT(*) > 10 THEN 'high'
WHEN COUNT(*) > 5 THEN 'medium'
ELSE 'normal'
END AS velocity_level
FROM TUMBLE(transaction_events, event_time, INTERVAL '1' HOUR)
GROUP BY user_id, window_start, window_end;
-- Risk signal view: current risk state per user
CREATE MATERIALIZED VIEW user_risk_signal AS
SELECT
a.account_id,
a.user_id,
a.balance,
a.risk_score,
-- Most recent velocity window
v.tx_count AS recent_tx_count,
v.tx_total AS recent_tx_total,
v.country_count AS recent_country_count,
v.velocity_level,
-- Combined risk assessment
CASE
WHEN a.risk_score > 0.8 OR v.velocity_level = 'high' THEN 'critical'
WHEN a.risk_score > 0.5 OR v.velocity_level = 'medium' THEN 'elevated'
ELSE 'normal'
END AS risk_level
FROM accounts a
LEFT JOIN transaction_velocity v
ON a.user_id = v.user_id
AND v.window_end = (
SELECT MAX(window_end)
FROM transaction_velocity tv2
WHERE tv2.user_id = a.user_id
);
The TUMBLE function assigns each transaction event to a fixed, non-overlapping 1-hour window. TUMBLE(transaction_events, event_time, INTERVAL '1' HOUR) adds window_start and window_end columns to every row, and the GROUP BY on user_id, window_start, and window_end then yields one row per user per window. The result is a per-window transaction count that is maintained incrementally, without any application-side aggregation code.
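For readers who want to see the windowing arithmetic spelled out, here is a small Python sketch of tumbling-window assignment and the two CASE expressions from the views above. It assumes windows are aligned to the Unix epoch, which is the usual convention for tumbling windows; the function names are hypothetical and the code is an illustration of the logic, not of RisingWave's engine.

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone

WINDOW = timedelta(hours=1)
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def window_bounds(event_time):
    """Return (window_start, window_end) of the 1-hour window holding event_time."""
    index = (event_time - EPOCH) // WINDOW   # timedelta // timedelta gives an int
    start = EPOCH + index * WINDOW
    return start, start + WINDOW             # half-open interval [start, end)


def velocity_level(tx_count):
    if tx_count > 10:
        return "high"
    if tx_count > 5:
        return "medium"
    return "normal"


def risk_level(risk_score, velocity):
    """velocity may be None when the user has no transaction window yet."""
    if risk_score > 0.8 or velocity == "high":
        return "critical"
    if risk_score > 0.5 or velocity == "medium":
        return "elevated"
    return "normal"


def transaction_velocity(events):
    """Count events per (user_id, window_start, window_end)."""
    counts = defaultdict(int)
    for e in events:
        start, end = window_bounds(e["event_time"])
        counts[(e["user_id"], start, end)] += 1
    return {key: (n, velocity_level(n)) for key, n in counts.items()}


base = datetime(2026, 1, 15, 12, 0, tzinfo=timezone.utc)
events = [
    {"user_id": 7, "event_time": base + timedelta(minutes=m)}
    for m in (0, 5, 10, 20, 40, 59, 60)
]
velocity = transaction_velocity(events)
```

The event at minute 60 lands in the next window because windows are half-open: 13:00 is the start of the following hour, not the end of the previous one.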
Layer 3: Serving
The serving layer exposes the context views to agents. Two paths are available: direct SQL via a PostgreSQL connection, and natural language via the risingwave-mcp server.
Direct SQL from Agent Code
RisingWave implements the PostgreSQL wire protocol. Any PostgreSQL client library connects to it without modification. Here is a Python agent that uses the three context views to triage a customer support ticket:
import psycopg2
from openai import OpenAI


def triage_support_ticket(ticket_text: str, user_id: int) -> str:
    # Connect to RisingWave via the PostgreSQL wire protocol
    conn = psycopg2.connect(
        host="risingwave.internal",
        port=4566,
        dbname="dev",
        user="root",
        password="",
    )
    try:
        with conn.cursor() as cur:
            # Fetch user profile
            cur.execute("""
                SELECT name, plan, orders_last_30d, spend_last_30d,
                       favorite_category, last_order_at
                FROM user_profile
                WHERE user_id = %s
            """, (user_id,))
            profile = cur.fetchone()

            # Fetch any stalled orders
            cur.execute("""
                SELECT order_id, status, amount, minutes_since_update
                FROM order_entity_state
                WHERE user_id = %s AND is_stalled = true
                ORDER BY minutes_since_update DESC
                LIMIT 3
            """, (user_id,))
            stalled_orders = cur.fetchall()

            # Fetch risk signal
            cur.execute("""
                SELECT risk_level, velocity_level, recent_tx_count
                FROM user_risk_signal
                WHERE user_id = %s
            """, (user_id,))
            risk = cur.fetchone()
    finally:
        # Always release the connection, even if a query fails
        conn.close()

    # Without a profile row there is nothing to personalize on
    if profile is None:
        raise ValueError(f"No user_profile row for user_id={user_id}")

    # Build context for the agent
    stalled_lines = "\n".join(
        f"  - Order {r[0]}: {r[1]}, ${r[2]:.2f}, stalled {r[3]:.0f}m"
        for r in stalled_orders
    )
    context = (
        f"Customer: {profile[0]} | Plan: {profile[1]}\n"
        f"Orders (30d): {profile[2]} | Spend (30d): ${profile[3]:.2f}\n"
        f"Favorite category: {profile[4]} | Last order: {profile[5]}\n"
        f"Stalled orders: {len(stalled_orders)}\n"
        f"{stalled_lines}\n"
        f"Risk level: {risk[0] if risk else 'unknown'} | "
        f"Velocity: {risk[1] if risk else 'unknown'}\n"
    )

    client = OpenAI()
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {
                "role": "system",
                "content": (
                    "You are a customer support triage agent. "
                    "Based on the customer context below, assess the ticket "
                    "and recommend a response priority and action.\n\n"
                    f"Customer context:\n{context}"
                ),
            },
            {"role": "user", "content": f"Support ticket:\n{ticket_text}"},
        ],
    )
    return response.choices[0].message.content
The agent queries three materialized views, assembles a context record, and passes it to the LLM. Because the views update continuously, the context always reflects the current state of the customer's account.
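One design choice that tends to pay off here is keeping the context formatting in a pure function, so it can be unit tested without a database or an LLM call. The sketch below is one possible shape under that assumption; `format_context` is a hypothetical helper, and the tuple layouts follow the column order of the three queries in the agent.

```python
from decimal import Decimal
from typing import Optional, Sequence


def format_context(profile: Optional[tuple],
                   stalled_orders: Sequence[tuple],
                   risk: Optional[tuple]) -> str:
    """Render rows from the three context views as plain text for the prompt."""
    if profile is None:
        return "Customer: unknown (no row in user_profile)"
    name, plan, orders_30d, spend_30d, favorite, last_order_at = profile
    lines = [
        f"Customer: {name} | Plan: {plan}",
        f"Orders (30d): {orders_30d} | Spend (30d): ${spend_30d:.2f}",
        f"Favorite category: {favorite or 'n/a'} | Last order: {last_order_at or 'n/a'}",
        f"Stalled orders: {len(stalled_orders)}",
    ]
    for order_id, status, amount, minutes in stalled_orders:
        lines.append(f"  - Order {order_id}: {status}, ${amount:.2f}, stalled {minutes:.0f}m")
    # risk is None when the user has no account row in user_risk_signal
    risk_level, velocity = (risk[0], risk[1]) if risk else ("unknown", "unknown")
    lines.append(f"Risk level: {risk_level} | Velocity: {velocity or 'unknown'}")
    return "\n".join(lines)


text = format_context(
    ("Ada", "pro", 2, Decimal("100"), "books", None),
    [(55, "processing", Decimal("40"), 1500.0)],
    None,
)
print(text)
```

Handling the missing-row cases explicitly matters because a brand-new user may have a profile row but no orders, no account, and no transaction window yet.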
Via MCP: Natural Language Agent Queries
The risingwave-mcp server (released January 2026) wraps RisingWave in the Model Context Protocol. Agents using Claude, GitHub Copilot, or any MCP-compatible framework can query RisingWave in natural language: the model turns the request into SQL and runs it through the tools the MCP server exposes.
Configure it in claude_desktop_config.json:
{
  "mcpServers": {
    "risingwave": {
      "command": "python",
      "args": ["/path/to/risingwave-mcp/src/main.py"],
      "env": {
        "RISINGWAVE_CONNECTION_STR": "postgresql://root:root@localhost:4566/dev"
      }
    }
  }
}
With this in place, an agent can ask "Show me all high-risk users with stalled orders from the last hour" and the request is resolved to a join across user_risk_signal and order_entity_state. The agent does not need the schema hard-coded; it can discover tables and views through the MCP server. The materialized views, with their clear column names and comments, become self-documenting tools.
Add comments to make views more discoverable:
COMMENT ON MATERIALIZED VIEW user_risk_signal IS
'Current risk level per user based on account risk score and recent transaction velocity. Use to identify users who need manual review or fraud screening.';
COMMENT ON MATERIALIZED VIEW user_profile IS
'Aggregated user behavior for the last 30 days: order count, spend, favorite category. Use for personalization and customer segmentation.';
COMMENT ON MATERIALIZED VIEW order_entity_state IS
'Current state of all orders with staleness flags. Query by user_id or order_id. is_stalled=true means the order is in processing status and has not been updated for more than 24 hours.';
Complete Architecture Diagram
┌─────────────────────────────────────────────────────────────┐
│ LAYER 1: INGESTION │
│ │
│ PostgreSQL (writes) Kafka (event bus) │
│ ├── users ├── user-events │
│ ├── orders CDC ├── transactions │
│ └── accounts ─────────────►└── (other streams) │
│ │ │
│ ▼ │
│ RisingWave │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ LAYER 2: PROCESSING / CONTEXT │
│ │
│ user_profile order_entity_state user_risk_signal│
│ (behavior agg) (current status) (anomaly detect)│
│ │ │ │ │
│ └──────────────────────┴────────────────────┘ │
│ Materialized Views │
│ (incremental, always current) │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ LAYER 3: SERVING │
│ │
│ Direct SQL (psycopg2) MCP (risingwave-mcp) │
│ ├── Python agent ├── Claude Desktop │
│ ├── Node.js agent ├── VS Code Copilot │
│ └── Any PG client └── Any MCP client │
└─────────────────────────────────────────────────────────────┘
Data moves downward through the layers in real time. PostgreSQL handles all writes. RisingWave handles agent-facing reads and streaming aggregation. The serving layer never touches PostgreSQL directly.
Production Considerations: What Goes Where
One of the most common architecture mistakes in agentic systems is using the same database for writes and analytical reads. This creates two problems: OLTP queries and analytical queries compete for the same resources, and complex aggregations (like the user_profile view) impose unacceptable latency on the transactional database.
The rule is simple:
PostgreSQL owns writes. RisingWave owns reads.
| Operation | System | Rationale |
| --- | --- | --- |
| User registration | PostgreSQL | Transactional write, needs ACID |
| Order creation | PostgreSQL | Transactional write, needs ACID |
| Customer profile query | RisingWave | Aggregation, needs freshness |
| Risk signal lookup | RisingWave | Aggregation, needs sub-second latency |
| Order status check | RisingWave | Current state, CDC-mirrored |
| Historical reporting | Data warehouse | Full history, batch acceptable |
Agents should never query PostgreSQL directly. They query RisingWave. RisingWave stays in sync with PostgreSQL via CDC, so the agent always gets current data without hitting the operational database.
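This separation also gives you a single point of control for agent data access. You decide what context views to expose. Agents can only see what you have explicitly modeled in a materialized view, provided they connect with a database role that has SELECT on those views and nothing else (the examples in this article connect as root for brevity). That is a security and governance property, not just an architecture preference.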
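If agent-generated SQL passes through your own code before reaching RisingWave, an application-side allow-list can back up the database permissions. The following is a rough sketch only: `is_allowed_query` is a hypothetical helper built on a regular expression, not a SQL parser, so it errs on the side of rejecting queries and should not be treated as the security boundary. Database-level grants remain the real control.

```python
import re

# The three context views defined in Layer 2.
ALLOWED_VIEWS = {"user_profile", "order_entity_state", "user_risk_signal"}

# Captures the identifier that follows FROM or JOIN.
_RELATION = re.compile(r"\b(?:from|join)\s+([a-z_][a-z0-9_.]*)", re.IGNORECASE)


def is_allowed_query(sql: str) -> bool:
    """Accept a single SELECT that reads only from the allow-listed views."""
    statement = sql.strip().rstrip(";")
    if ";" in statement:                      # reject multi-statement input
        return False
    if not statement.lower().startswith("select"):
        return False
    relations = {name.lower() for name in _RELATION.findall(statement)}
    # Conservative: any unrecognized name after FROM/JOIN rejects the query.
    return bool(relations) and relations <= ALLOWED_VIEWS


print(is_allowed_query("SELECT risk_level FROM user_risk_signal WHERE user_id = 1"))
print(is_allowed_query("SELECT * FROM users"))
```

Because the check is lexical, it will also reject some harmless queries, for example one that uses EXTRACT(EPOCH FROM some_column). For a guard of this kind, false rejections are the safer failure mode.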
FAQ
What is an agentic data architecture?
An agentic data architecture is a system design that gives AI agents reliable access to structured, fresh, queryable data. It typically has three layers: ingestion (CDC and event streams), processing (materialized views that aggregate and contextualize data), and serving (SQL or MCP interfaces that agents use to retrieve context). The key requirement is freshness: agents make decisions based on current state, not batch snapshots.
Why use RisingWave instead of PostgreSQL directly for agent queries?
PostgreSQL is optimized for transactional writes. Complex aggregations like user profiles or risk signals require scanning and joining large tables, which is slow on a transactional database and competes with write traffic. RisingWave maintains these aggregations incrementally as materialized views, so agent queries read pre-computed results, and lookup latency stays low (typically sub-second) even as the underlying data volume grows.
What is the TUMBLE function in RisingWave?
TUMBLE is a streaming window function in RisingWave that assigns each row of a stream to a fixed, non-overlapping time window. The syntax is FROM TUMBLE(table, time_column, INTERVAL 'X' UNIT), which adds window_start and window_end columns to each row; to aggregate per window, the query groups by window_start and window_end. It is commonly used for computing metrics like transaction counts per hour or event rates per minute.
What is the risingwave-mcp server?
The risingwave-mcp server (github.com/risingwavelabs/risingwave-mcp) is an official MCP server for RisingWave, released in January 2026. It exposes RisingWave tables and materialized views as tools that MCP-compatible AI agents can discover and query using natural language. It supports Claude Desktop, VS Code Copilot, and any client that implements the Model Context Protocol.
Do agents need to know SQL to use this architecture?
Not necessarily. Through the MCP serving path, agents can query RisingWave using natural language: the model behind the agent generates the SQL and executes it through the risingwave-mcp server's tools. Through the direct SQL path, agents issue SQL queries via a standard PostgreSQL connection. Either way, the SQL complexity lives in the materialized view definitions, not in the agent code.
Conclusion
Agentic data architecture does not require a new category of database, a proprietary orchestration layer, or three months of infrastructure work. It requires a clear separation of concerns: PostgreSQL for writes, RisingWave for reads and streaming aggregation, and a well-defined context layer that agents can query without writing complex SQL themselves.
Key takeaways from this guide:
- Layer 1 (Ingestion): CDC from PostgreSQL and the Kafka connector bring data into RisingWave without additional middleware.
- Layer 2 (Context): Three materialized view patterns cover most agent use cases: user profile, entity state, and risk signal.
- Layer 3 (Serving): Direct SQL via psycopg2 for programmatic agents; risingwave-mcp for natural language queries.
- PostgreSQL for writes, RisingWave for reads: this split eliminates query contention and gives you governance over what agents can access.
- All of this is PostgreSQL-compatible SQL: no proprietary client APIs, no vendor-specific agent frameworks, no lock-in beyond what you are already committed to.

