Every AI agent has a context problem. The agent needs to know what a user has done, what state an entity is in, or what a system is doing right now. But the raw data that answers those questions is buried in event logs with millions of rows that are too slow to query at inference time and too noisy to feed directly into a prompt.
The standard fix is a pipeline that pre-computes summaries on a schedule: a batch job every 15 minutes, or every hour. Between runs the context ages, so it can be up to 15 minutes stale on the fast schedule and up to an hour stale on the slow one, plus however long the job itself takes to run. For a support agent, that is the difference between "you just purchased StreamKit Pro" and "you may have purchased something recently." For a fraud detection agent, stale context is a liability.
This tutorial shows a better path: streaming SQL that turns raw events into agent-ready context. You will build a working pipeline in RisingWave that takes raw event inserts, joins them with dimension tables, and materializes compact, structured context that any AI agent can read in a single query. All SQL is verified against RisingWave 2.8.0, and the output shown is actual query output.
Why Raw Events Are the Wrong Input for AI Agents
Raw event tables look something like this: one row per action, with a user ID, a timestamp, an event type, and a handful of fields. Simple to write to. Impossible to query cheaply at scale.
Feeding raw events to an agent creates three problems:
Token overhead. An agent that reads 500 raw events to understand one user spends most of that context on repeated column names, IDs, and timestamps. The same information - "this user made 8 actions, had 1 error, and spent $299 in the last 2 hours" - fits in one JSON object under 300 tokens.
Latency. Aggregating raw events at query time means running a GROUP BY across potentially millions of rows on every agent request. That adds hundreds of milliseconds to latency, which compounds across the 10-50 context lookups a complex agent might make in a single reasoning step.
Staleness. Batch pipelines that pre-compute summaries solve the latency problem but introduce staleness. A materialized view in a streaming database like RisingWave solves both at once: the result is pre-computed (fast to read) and continuously updated (fresh).
The Architecture: Raw Events to Agent-Ready Context
The pipeline has four layers:
- Event ingestion - raw behavioral events land in source tables (from your application, Kafka, CDC, or direct inserts)
- Dimension tables - user profiles, product catalogs, and configuration data that enrich the events
- Materialized views - streaming SQL that continuously joins and aggregates events into compact context objects
- Agent query layer - the agent reads pre-computed context in milliseconds via standard PostgreSQL
Raw Events ──┐
             ├─► RisingWave Streaming SQL ──► Materialized Views ──► AI Agent
Dimensions ──┘        (incremental)             (always fresh)      (reads JSON)
RisingWave handles the two middle stages: the streaming SQL and the materialized views it maintains. Because it is wire-compatible with PostgreSQL, your agent connects using any PostgreSQL client - psycopg2, asyncpg, SQLAlchemy, or a framework-level connector.
Step 1: Define Source Tables
Start with three source tables: one for raw events and two for dimension data. Use the ai_ctx_ prefix to keep them organized.
CREATE TABLE ai_ctx_events (
event_id VARCHAR,
user_id VARCHAR,
session_id VARCHAR,
event_type VARCHAR, -- 'page_view', 'click', 'purchase', 'error', 'search'
page_path VARCHAR,
product_id VARCHAR,
amount NUMERIC,
error_code VARCHAR,
occurred_at TIMESTAMPTZ
);
CREATE TABLE ai_ctx_users (
user_id VARCHAR PRIMARY KEY,
display_name VARCHAR,
plan VARCHAR, -- 'free', 'pro', 'enterprise'
country VARCHAR,
joined_at DATE
);
CREATE TABLE ai_ctx_products (
product_id VARCHAR PRIMARY KEY,
product_name VARCHAR,
category VARCHAR,
price NUMERIC
);
In production, ai_ctx_events would be backed by a Kafka source connector so events flow in continuously from your application. ai_ctx_users and ai_ctx_products would be populated via Change Data Capture (CDC) from your primary database.
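As a rough illustration of the producer side, the sketch below shapes one application event as a JSON message whose keys mirror the ai_ctx_events columns. The build_event helper and the choice of JSON-encoded messages are assumptions made for illustration; the Kafka producer call and the RisingWave source definition are deliberately left out.

```python
import json
import uuid
from datetime import datetime, timezone
from typing import Optional

# Column names copied from the ai_ctx_events table definition above.
EVENT_COLUMNS = (
    "event_id", "user_id", "session_id", "event_type", "page_path",
    "product_id", "amount", "error_code", "occurred_at",
)

def build_event(user_id: str, session_id: str, event_type: str, page_path: str,
                product_id: Optional[str] = None, amount: Optional[float] = None,
                error_code: Optional[str] = None,
                occurred_at: Optional[datetime] = None) -> bytes:
    """Hypothetical helper: serialize one event as UTF-8 JSON for a message queue."""
    ts = occurred_at or datetime.now(timezone.utc)
    event = {
        "event_id": uuid.uuid4().hex,
        "user_id": user_id,
        "session_id": session_id,
        "event_type": event_type,
        "page_path": page_path,
        "product_id": product_id,
        "amount": amount,
        "error_code": error_code,
        "occurred_at": ts.isoformat(),  # ISO 8601 with a UTC offset
    }
    return json.dumps(event).encode("utf-8")

# One purchase event, ready to hand to whatever producer client you use.
message = build_event("u_001", "s_A", "purchase", "/checkout",
                      product_id="p_101", amount=299.00)
```

Keeping the message keys identical to the column names means the ingestion side needs no field mapping, which is the main design choice this sketch is meant to show.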
For this tutorial, insert dimension data directly:
INSERT INTO ai_ctx_users VALUES
('u_001', 'Alice Kim', 'enterprise', 'US', '2024-03-15'),
('u_002', 'Bob Torres', 'pro', 'GB', '2024-09-01'),
('u_003', 'Chen Wei', 'free', 'SG', '2025-01-20');
INSERT INTO ai_ctx_products VALUES
('p_101', 'StreamKit Pro', 'data-platform', 299.00),
('p_102', 'AlertEngine Plus', 'observability', 149.00),
('p_103', 'DataSync Lite', 'integration', 49.00);
Now simulate a realistic mix of raw events across three users and four sessions:
INSERT INTO ai_ctx_events VALUES
('e001','u_001','s_A','page_view','/pricing', NULL, NULL, NULL, NOW() - INTERVAL '55 minutes'),
('e002','u_001','s_A','page_view','/pricing/enterprise',NULL, NULL, NULL, NOW() - INTERVAL '50 minutes'),
('e003','u_001','s_A','search', '/search', NULL, NULL, NULL, NOW() - INTERVAL '48 minutes'),
('e004','u_001','s_A','purchase', '/checkout', 'p_101',299.00, NULL, NOW() - INTERVAL '45 minutes'),
('e005','u_001','s_A','page_view','/dashboard', NULL, NULL, NULL, NOW() - INTERVAL '40 minutes'),
('e006','u_001','s_A','error', '/api/export', NULL, NULL, 'ERR_5001', NOW() - INTERVAL '35 minutes'),
('e007','u_001','s_A','click', '/docs/quickstart', NULL, NULL, NULL, NOW() - INTERVAL '30 minutes'),
('e008','u_002','s_B','page_view','/blog', NULL, NULL, NULL, NOW() - INTERVAL '25 minutes'),
('e009','u_002','s_B','page_view','/blog/streaming-sql',NULL, NULL, NULL, NOW() - INTERVAL '22 minutes'),
('e010','u_002','s_B','click', '/docs', NULL, NULL, NULL, NOW() - INTERVAL '20 minutes'),
('e011','u_002','s_B','purchase', '/checkout', 'p_102',149.00, NULL, NOW() - INTERVAL '18 minutes'),
('e012','u_002','s_B','error', '/api/ingest', NULL, NULL, 'ERR_4022', NOW() - INTERVAL '15 minutes'),
('e013','u_002','s_B','error', '/api/ingest', NULL, NULL, 'ERR_4022', NOW() - INTERVAL '12 minutes'),
('e014','u_003','s_C','page_view','/pricing', NULL, NULL, NULL, NOW() - INTERVAL '10 minutes'),
('e015','u_003','s_C','click', '/pricing/pro', NULL, NULL, NULL, NOW() - INTERVAL '8 minutes'),
('e016','u_003','s_C','page_view','/signup', NULL, NULL, NULL, NOW() - INTERVAL '5 minutes'),
('e017','u_003','s_C','error', '/api/signup', NULL, NULL, 'ERR_4001', NOW() - INTERVAL '3 minutes'),
('e018','u_001','s_D','page_view','/dashboard', NULL, NULL, NULL, NOW() - INTERVAL '2 minutes');
18 raw events across 3 users. Each event is a thin row - just IDs, types, and timestamps. No agent should read these directly.
Step 2: Build the User Activity Profile View
The first materialized view compresses the raw event stream into a per-user activity profile. It joins with ai_ctx_users for enrichment and uses a temporal filter to maintain a rolling 2-hour window.
CREATE MATERIALIZED VIEW ai_ctx_user_profile AS
SELECT
e.user_id,
u.display_name,
u.plan,
u.country,
COUNT(*) AS total_events,
COUNT(DISTINCT e.session_id) AS sessions,
COUNT(*) FILTER (WHERE e.event_type = 'page_view') AS page_views,
COUNT(*) FILTER (WHERE e.event_type = 'error') AS errors,
COUNT(*) FILTER (WHERE e.event_type = 'purchase') AS purchases,
COALESCE(SUM(e.amount) FILTER (WHERE e.event_type = 'purchase'), 0) AS total_spent,
MAX(e.occurred_at) AS last_seen
FROM ai_ctx_events e
JOIN ai_ctx_users u ON e.user_id = u.user_id
WHERE e.occurred_at > NOW() - INTERVAL '2 hours'
GROUP BY e.user_id, u.display_name, u.plan, u.country;
Query the result:
SELECT * FROM ai_ctx_user_profile ORDER BY user_id;
user_id | display_name | plan | country | total_events | sessions | page_views | errors | purchases | total_spent | last_seen
---------+--------------+------------+---------+--------------+----------+------------+--------+-----------+-------------+-------------------------------
u_001 | Alice Kim | enterprise | US | 8 | 2 | 4 | 1 | 1 | 299.00 | 2026-04-02 07:27:54.940+00:00
u_002 | Bob Torres | pro | GB | 6 | 1 | 2 | 2 | 1 | 149.00 | 2026-04-02 07:17:54.940+00:00
u_003 | Chen Wei | free | SG | 4 | 1 | 2 | 1 | 0 | 0 | 2026-04-02 07:26:54.940+00:00
(3 rows)
18 raw events compressed into 3 rows. Each row contains everything a support agent or personalization system needs to understand that user's recent behavior. The WHERE e.occurred_at > NOW() - INTERVAL '2 hours' clause is a temporal filter - RisingWave automatically evicts data outside the window and reclaims storage without any cleanup job.
Notice what the join adds: the agent sees Alice Kim, enterprise, US alongside behavioral signals, without needing a separate lookup to the user table at query time. Enrichment happens once at write time, not on every agent read.
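To make the aggregation semantics concrete, here is a plain-Python restatement of what the view computes for Alice's (u_001) events. It is only an illustration of the logic, not of how RisingWave executes it: the view is maintained incrementally, while this sketch recomputes from scratch. The fixed NOW value and the tuple layout are assumptions chosen for the example.

```python
from datetime import datetime, timedelta, timezone

NOW = datetime(2026, 4, 2, 7, 29, 54, tzinfo=timezone.utc)  # arbitrary fixed "now"
WINDOW = timedelta(hours=2)

# (session_id, event_type, amount, minutes_ago) for u_001, copied from the inserts above.
U001_EVENTS = [
    ("s_A", "page_view", None, 55), ("s_A", "page_view", None, 50),
    ("s_A", "search", None, 48),    ("s_A", "purchase", 299.00, 45),
    ("s_A", "page_view", None, 40), ("s_A", "error", None, 35),
    ("s_A", "click", None, 30),     ("s_D", "page_view", None, 2),
]

def profile(events, now=NOW, window=WINDOW):
    """Recompute the per-user aggregates from scratch for one user's events."""
    # Temporal filter: keep only events newer than now - window.
    recent = [e for e in events if now - timedelta(minutes=e[3]) > now - window]
    return {
        "total_events": len(recent),
        "sessions": len({e[0] for e in recent}),
        "page_views": sum(e[1] == "page_view" for e in recent),
        "errors": sum(e[1] == "error" for e in recent),
        "purchases": sum(e[1] == "purchase" for e in recent),
        "total_spent": sum(e[2] or 0 for e in recent if e[1] == "purchase"),
    }

print(profile(U001_EVENTS))
```

The resulting counts match the u_001 row in the query output above. Appending an event from three hours ago leaves them unchanged, which is the effect of the temporal filter.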
Step 3: Build the Session-Level Summary View
Support agents and conversational interfaces need session context, not just per-user profiles. A user might have a great overall history but be in the middle of a bad session right now. This view captures the state of each recent session:
CREATE MATERIALIZED VIEW ai_ctx_session_summary AS
SELECT
e.session_id,
e.user_id,
MIN(e.occurred_at) AS session_start,
MAX(e.occurred_at) AS session_end,
COUNT(*) AS event_count,
COUNT(*) FILTER (WHERE e.event_type = 'error') AS errors,
COUNT(DISTINCT e.page_path) AS unique_pages,
COUNT(*) FILTER (WHERE e.event_type = 'purchase') AS purchases,
STRING_AGG(DISTINCT e.error_code, ', ')
FILTER (WHERE e.error_code IS NOT NULL) AS error_codes
FROM ai_ctx_events e
WHERE e.occurred_at > NOW() - INTERVAL '2 hours'
GROUP BY e.session_id, e.user_id;
SELECT * FROM ai_ctx_session_summary ORDER BY session_id;
session_id | user_id | session_start | session_end | event_count | errors | unique_pages | purchases | error_codes
------------+---------+-------------------------------+-------------------------------+-------------+--------+--------------+-----------+-------------
s_A | u_001 | 2026-04-02 06:34:54.940+00:00 | 2026-04-02 06:59:54.940+00:00 | 7 | 1 | 7 | 1 | ERR_5001
s_B | u_002 | 2026-04-02 07:04:54.940+00:00 | 2026-04-02 07:17:54.940+00:00 | 6 | 2 | 5 | 1 | ERR_4022
s_C | u_003 | 2026-04-02 07:19:54.940+00:00 | 2026-04-02 07:26:54.940+00:00 | 4 | 1 | 4 | 0 | ERR_4001
s_D | u_001 | 2026-04-02 07:27:54.940+00:00 | 2026-04-02 07:27:54.940+00:00 | 1 | 0 | 1 | 0 |
(4 rows)
Session s_B for Bob Torres (u_002) shows 2 errors with error code ERR_4022 on the /api/ingest endpoint. A support agent reading this in the context of an incoming chat from Bob can immediately address the ingestion issue without asking "can you describe what happened?" The context arrived pre-computed.
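One way to hand a session row to a support agent is to flatten it into a single line of text. The sketch below assumes the row has already been fetched into a dict keyed by the view's column names; describe_session is a hypothetical helper, not part of any framework.

```python
def describe_session(row: dict) -> str:
    """Hypothetical formatter: turn one ai_ctx_session_summary row into a short hint."""
    parts = [f"{row['event_count']} events on {row['unique_pages']} pages"]
    if row["errors"]:
        # error_codes is the comma-separated STRING_AGG column from the view.
        parts.append(f"{row['errors']} errors ({row['error_codes']})")
    if row["purchases"]:
        parts.append(f"{row['purchases']} purchase(s)")
    return f"Session {row['session_id']} for {row['user_id']}: " + ", ".join(parts)

# Values copied from the s_B row in the output above.
s_b = {"session_id": "s_B", "user_id": "u_002", "event_count": 6, "errors": 2,
       "unique_pages": 5, "purchases": 1, "error_codes": "ERR_4022"}
print(describe_session(s_b))
# prints: Session s_B for u_002: 6 events on 5 pages, 2 errors (ERR_4022), 1 purchase(s)
```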
Step 4: Build the Entity State View
Agents that answer product or business questions need entity-level signals, not just user-level ones. This materialized view tracks product engagement over the last hour:
CREATE MATERIALIZED VIEW ai_ctx_entity_state AS
SELECT
e.product_id,
p.product_name,
p.category,
p.price,
COUNT(*) FILTER (WHERE e.event_type = 'purchase') AS purchases_1h,
COALESCE(SUM(e.amount) FILTER (WHERE e.event_type = 'purchase'), 0) AS revenue_1h,
COUNT(DISTINCT e.user_id) FILTER (WHERE e.event_type = 'purchase') AS unique_buyers_1h
FROM ai_ctx_events e
JOIN ai_ctx_products p ON e.product_id = p.product_id
WHERE e.occurred_at > NOW() - INTERVAL '1 hour'
AND e.product_id IS NOT NULL
GROUP BY e.product_id, p.product_name, p.category, p.price;
SELECT * FROM ai_ctx_entity_state ORDER BY product_id;
product_id | product_name | category | price | purchases_1h | revenue_1h | unique_buyers_1h
------------+------------------+---------------+-------+--------------+------------+------------------
p_101 | StreamKit Pro | data-platform | 299 | 1 | 299.00 | 1
p_102 | AlertEngine Plus | observability | 149 | 1 | 149.00 | 1
(2 rows)
A sales or recommendation agent can use this view to see which products are moving right now. Answering "which products sold in the last hour?" means reading two rows instead of scanning thousands of purchase events. The dimension join gives the agent human-readable product names and categories rather than bare IDs.
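On the agent side, reading this view can be as small as the following sketch. It assumes a DB-API cursor such as the one psycopg2 provides; fetch_product_signals and render_for_prompt are hypothetical helper names introduced for this example.

```python
def fetch_product_signals(cur, limit: int = 5) -> list:
    """Read ai_ctx_entity_state through a DB-API cursor and return a list of dicts."""
    cur.execute(
        "SELECT product_name, category, purchases_1h, revenue_1h "
        "FROM ai_ctx_entity_state ORDER BY revenue_1h DESC LIMIT %s",
        (limit,),
    )
    # cursor.description holds one entry per result column; item 0 is the name.
    columns = [col[0] for col in cur.description]
    return [dict(zip(columns, row)) for row in cur.fetchall()]

def render_for_prompt(signals: list) -> str:
    """Format each product as one short line an agent can read."""
    return "\n".join(
        f"- {s['product_name']} ({s['category']}): "
        f"{s['purchases_1h']} purchases, ${s['revenue_1h']} revenue in the last hour"
        for s in signals
    )
```

Selecting named columns instead of using SELECT * keeps the prompt text stable if columns are later added to the view.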
Step 5: Produce Agent-Ready JSON Context
The final step bundles user profile data into a single JSON object. This is the format most LLM frameworks expect when you construct a system prompt: compact, labeled, no schema noise.
CREATE MATERIALIZED VIEW ai_ctx_json_context AS
SELECT
up.user_id,
up.display_name,
up.plan,
up.country,
JSONB_BUILD_OBJECT(
'user_id', up.user_id,
'name', up.display_name,
'plan', up.plan,
'country', up.country,
'events_2h', up.total_events,
'sessions_2h', up.sessions,
'page_views_2h', up.page_views,
'errors_2h', up.errors,
'purchases_2h', up.purchases,
'total_spent_2h', up.total_spent,
'last_seen', up.last_seen
) AS context_json
FROM ai_ctx_user_profile up;
Query context for a specific user:
SELECT context_json
FROM ai_ctx_json_context
WHERE user_id = 'u_001';
{"country": "US", "errors_2h": 1, "events_2h": 8, "last_seen": "2026-04-02T07:27:54.940+00:00",
"name": "Alice Kim", "page_views_2h": 4, "plan": "enterprise", "purchases_2h": 1,
"sessions_2h": 2, "total_spent_2h": 299.0, "user_id": "u_001"}
This is the output the agent gets. One query. One row. One JSON blob. The agent can inline this directly into a system prompt:
import json

import psycopg2

# Default local RisingWave connection settings.
conn = psycopg2.connect(host="localhost", port=4566, user="root", dbname="dev")
conn.autocommit = True  # each read runs on its own, with no transaction left open
cur = conn.cursor()

def get_user_context(user_id: str) -> dict:
    """Fetch the pre-computed context row for one user; empty dict if none exists."""
    cur.execute(
        "SELECT context_json FROM ai_ctx_json_context WHERE user_id = %s",
        (user_id,)
    )
    row = cur.fetchone()
    # psycopg2 decodes JSONB into a Python dict by default.
    return row[0] if row else {}

context = get_user_context("u_001")

system_prompt = f"""You are a support agent for a SaaS platform.

User context (last 2 hours):
{json.dumps(context, indent=2)}

Respond to the user's message based on their recent activity.
If they had errors, acknowledge and help resolve them.
If they recently purchased, confirm and offer onboarding assistance."""
The SELECT context_json FROM ai_ctx_json_context WHERE user_id = %s query typically runs in single-digit milliseconds because it reads one pre-computed row, not the underlying event table. There is no aggregation and no JOIN at query time.
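Not every PostgreSQL driver hands JSONB back in the same shape: psycopg2 decodes it into Python objects by default, while asyncpg returns the raw JSON text unless a codec is configured. A small normalizing step, sketched below with a hypothetical normalize_context helper, keeps the prompt-building code independent of the driver.

```python
import json
from typing import Any

def normalize_context(value: Any) -> dict:
    """Return the context as a dict whether the driver gives a dict, JSON text, or None."""
    if value is None:
        return {}  # no row for this user yet
    if isinstance(value, (str, bytes, bytearray)):
        value = json.loads(value)  # driver returned undecoded JSON
    if not isinstance(value, dict):
        raise TypeError(f"expected a JSON object, got {type(value).__name__}")
    return value

print(normalize_context('{"plan": "enterprise", "errors_2h": 1}'))
```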
The Token Reduction: 18 Raw Events vs. 1 JSON Object
Here is the compression in numbers:
SELECT 'raw events' AS data_type, COUNT(*) AS rows FROM ai_ctx_events
UNION ALL
SELECT 'user profiles', COUNT(*) FROM ai_ctx_user_profile
UNION ALL
SELECT 'session summaries',COUNT(*) FROM ai_ctx_session_summary
UNION ALL
SELECT 'entity states', COUNT(*) FROM ai_ctx_entity_state;
data_type | rows
-------------------+------
raw events | 18
user profiles | 3
session summaries | 4
entity states | 2
(4 rows)
18 raw events for 3 users. If an agent read all 18 rows with their full nine-column schema (event_id, user_id, session_id, event_type, page_path, product_id, amount, error_code, occurred_at), that is 18 × 9 = 162 field values, many of them NULLs, IDs, and timestamps. The compact JSON context for those same 3 users is 3 objects of 11 fields each, every one well under 300 tokens. For a production system at 10,000 events per user per day, the compression ratio reaches 10x-50x depending on how many distinct event types and columns the raw table has.
The token savings matter for two reasons: cost (fewer input tokens) and quality (the model can focus on the signal rather than the noise).
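The comparison can be roughed out in a few lines. The sketch below uses a crude four-characters-per-token rule of thumb rather than a real tokenizer, and synthetic placeholder rows rather than real traffic, so treat the numbers it prints as order-of-magnitude estimates only.

```python
import json

CHARS_PER_TOKEN = 4  # crude rule of thumb (assumption), not a real tokenizer

def estimate_tokens(payload) -> int:
    """Very rough token estimate: serialized length divided by CHARS_PER_TOKEN."""
    return len(json.dumps(payload)) // CHARS_PER_TOKEN

def synthetic_events(n: int) -> list:
    """Generate n placeholder rows with the nine ai_ctx_events columns."""
    return [
        {"event_id": f"e{i:05d}", "user_id": "u_001", "session_id": "s_A",
         "event_type": "page_view", "page_path": "/pricing", "product_id": None,
         "amount": None, "error_code": None,
         "occurred_at": "2026-04-02T07:27:54.940+00:00"}
        for i in range(n)
    ]

# The context object for u_001, copied from the query output above.
summary = {"user_id": "u_001", "name": "Alice Kim", "plan": "enterprise",
           "country": "US", "events_2h": 8, "sessions_2h": 2, "page_views_2h": 4,
           "errors_2h": 1, "purchases_2h": 1, "total_spent_2h": 299.0,
           "last_seen": "2026-04-02T07:27:54.940+00:00"}

compact = estimate_tokens(summary)
for n in (8, 500):
    raw = estimate_tokens(synthetic_events(n))
    print(f"{n} raw events: ~{raw} tokens vs ~{compact} for the summary ({raw / compact:.0f}x)")
```

The summary stays the same size however many events feed it, so the ratio grows with event volume.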
Time Windows for Periodic Signals
Some agent use cases need bucketed signals rather than rolling aggregates. A sales agent asking "what happened in the last 30 minutes?" wants a discrete snapshot, not a running total. Use TUMBLE windows:
SELECT
window_start,
window_end,
event_type,
COUNT(*) AS event_count,
COUNT(DISTINCT user_id) AS unique_users
FROM TUMBLE(ai_ctx_events, occurred_at, INTERVAL '30 minutes')
GROUP BY window_start, window_end, event_type
ORDER BY window_start, event_type;
window_start | window_end | event_type | event_count | unique_users
---------------------------+---------------------------+------------+-------------+--------------
2026-04-02 06:30:00+00:00 | 2026-04-02 07:00:00+00:00 | click | 1 | 1
2026-04-02 06:30:00+00:00 | 2026-04-02 07:00:00+00:00 | error | 1 | 1
2026-04-02 06:30:00+00:00 | 2026-04-02 07:00:00+00:00 | page_view | 3 | 1
2026-04-02 06:30:00+00:00 | 2026-04-02 07:00:00+00:00 | purchase | 1 | 1
2026-04-02 06:30:00+00:00 | 2026-04-02 07:00:00+00:00 | search | 1 | 1
2026-04-02 07:00:00+00:00 | 2026-04-02 07:30:00+00:00 | click | 2 | 2
2026-04-02 07:00:00+00:00 | 2026-04-02 07:30:00+00:00 | error | 3 | 2
2026-04-02 07:00:00+00:00 | 2026-04-02 07:30:00+00:00 | page_view | 5 | 3
2026-04-02 07:00:00+00:00 | 2026-04-02 07:30:00+00:00 | purchase | 1 | 1
(9 rows)
An agent can read this windowed view to spot trends: errors spiked in the current 30-minute window (3 errors across 2 users) relative to the previous window (1 error for 1 user). This kind of signal is useful for monitoring agents that decide whether to escalate or alert.
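For intuition, tumbling-window assignment amounts to flooring each timestamp to a fixed-size boundary. The sketch below aligns windows to the Unix epoch, which reproduces the 06:30 and 07:00 boundaries in the output above. The error_spike rule and its factor of 2 are hypothetical and only stand in for whatever escalation policy a monitoring agent would apply.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def tumble_start(ts: datetime, size: timedelta) -> datetime:
    """Floor a timezone-aware timestamp to the start of its fixed-size window."""
    return ts - (ts - EPOCH) % size

def error_spike(prev_errors: int, curr_errors: int, factor: float = 2.0) -> bool:
    """Hypothetical escalation rule: flag when errors grow by at least `factor`."""
    return curr_errors >= factor * max(prev_errors, 1)

size = timedelta(minutes=30)
ts = datetime(2026, 4, 2, 6, 59, 54, 940000, tzinfo=timezone.utc)
print(tumble_start(ts, size))  # prints: 2026-04-02 06:30:00+00:00
print(error_spike(1, 3))       # prints: True
```

With the counts from the two windows above (1 error, then 3), this rule would flag the current window.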
TUMBLE and HOP windows are built-in RisingWave SQL functions. No custom code required.
Streaming SQL vs. Batch Pipelines: Why Freshness Matters for Agents
The comparison table most teams ask for:
| Dimension | Batch (Airflow + dbt) | Streaming SQL (RisingWave) |
| --- | --- | --- |
| Context freshness | Minutes to hours (depends on schedule) | Sub-second (incremental updates) |
| Query latency | Low (pre-computed) | Low (pre-computed) |
| Infrastructure | Scheduler + compute cluster + object storage | Single streaming database |
| Code required | Python DAGs + dbt models + schedule config | SQL only |
| Data expiry | Manual cleanup jobs or partition pruning | Automatic via temporal filters |
| Stream-table joins | Complex (requires Spark or custom code) | Native SQL JOIN |
The key difference is not just speed. Staleness creates a class of agent bugs that are difficult to debug: the agent reasons correctly from its inputs, but its inputs are wrong. When Alice upgraded from "pro" to "enterprise" 20 minutes ago and her context still says "pro," the agent might send her the wrong onboarding flow. That bug is not in the agent's logic - it is in the pipeline.
Streaming SQL eliminates that class of bug by keeping the context always current. When Alice's profile row in ai_ctx_users updates, RisingWave propagates the change through the JOIN in ai_ctx_user_profile incrementally, within seconds.
For a deeper look at how this architecture connects to AI agent frameworks, see "How to Connect a Streaming Database to AI Agents via MCP" and "AI Agent Memory with a Streaming Database."
Connecting to an Agent Framework
Because RisingWave speaks the PostgreSQL wire protocol, you do not need any special adapter. LangChain's SQLDatabase toolkit, LlamaIndex's database connector, CrewAI tool definitions, and raw psycopg2 connections all work without modification.
A LangChain example that gives an agent SQL tools over the database, including ai_ctx_user_profile:
from langchain_community.utilities import SQLDatabase
from langchain_community.agent_toolkits import SQLDatabaseToolkit
from langchain_openai import ChatOpenAI
# RisingWave is PostgreSQL-compatible
db = SQLDatabase.from_uri("postgresql://root@localhost:4566/dev")
llm = ChatOpenAI(model="gpt-4o", temperature=0)
toolkit = SQLDatabaseToolkit(db=db, llm=llm)
tools = toolkit.get_tools()
# The agent can now query ai_ctx_user_profile, ai_ctx_session_summary, etc.
# as naturally as any PostgreSQL table
The agent discovers the materialized views as tables, reads their schemas, and formulates queries. The streaming database handles the computation. The agent handles the reasoning. Neither component needs to know how the other works.
For production deployments, you can also expose specific materialized views as MCP tools with typed schemas, giving the agent named, documented access points rather than open-ended SQL.
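The idea of a narrow, named access point can be sketched without committing to any particular framework. In the example below, ToolSpec is a hypothetical, framework-neutral description and not an MCP SDK type, and the user ID pattern is an assumption based on the u_001-style IDs used in this tutorial. The tool runs exactly one fixed, parameterized query.

```python
import re
from dataclasses import dataclass

USER_ID_PATTERN = re.compile(r"u_\d+")  # assumption: IDs look like the tutorial's u_001

@dataclass(frozen=True)
class ToolSpec:
    """Hypothetical, framework-neutral description of one narrow tool."""
    name: str
    description: str
    parameters: dict

GET_USER_CONTEXT = ToolSpec(
    name="get_user_context",
    description="Return the pre-computed 2-hour activity context for one user.",
    parameters={"user_id": {"type": "string", "pattern": USER_ID_PATTERN.pattern}},
)

def get_user_context_tool(cur, user_id: str) -> dict:
    """Run the single fixed query this tool is allowed to issue."""
    if not USER_ID_PATTERN.fullmatch(user_id):
        raise ValueError(f"invalid user_id: {user_id!r}")
    cur.execute(
        "SELECT context_json FROM ai_ctx_json_context WHERE user_id = %s",
        (user_id,),
    )
    row = cur.fetchone()
    return row[0] if row else {}
```

Compared with open-ended SQL access, the agent can only choose the argument, never the query, which makes the tool easier to document and audit.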
FAQ
What is streaming SQL for AI, and how does it differ from batch ETL?
Streaming SQL for AI refers to using a streaming database like RisingWave to continuously transform raw event data into pre-computed, agent-ready context using standard SQL. Unlike batch ETL (which runs on a schedule and produces stale results), streaming SQL processes each event as it arrives and updates materialized views incrementally. The result is context that is always seconds-fresh without requiring manual refresh jobs.
How much does this approach reduce LLM token usage?
The reduction depends on the ratio of raw events to summary rows. In the example above, 18 raw events across 3 users compress to 3 JSON objects. At production scale, where a single user might generate thousands of events per day, a compact context object containing 10-12 aggregated fields replaces thousands of raw rows. Real-world deployments commonly achieve 10x-50x token reduction compared to feeding raw event data into the prompt.
Does RisingWave work with my existing agent framework?
Yes. RisingWave is wire-compatible with PostgreSQL 13, so any library, ORM, or framework that connects to PostgreSQL works with RisingWave out of the box. This includes psycopg2, asyncpg, SQLAlchemy, LangChain's SQL toolkit, LlamaIndex's database connector, and direct JDBC connections. See the RisingWave quickstart for connection examples.
What happens when a dimension table (like user profiles) is updated?
When a row in ai_ctx_users changes, RisingWave detects the change and incrementally updates all materialized views that JOIN with it. The update propagates within seconds to ai_ctx_user_profile and ai_ctx_json_context. The next agent query reads the new values automatically. No cache invalidation, no pipeline re-run - the streaming database handles the propagation.
Conclusion
The path from raw events to agent-ready context is a data engineering problem, not a prompt engineering problem. The agent can only reason well with the information it receives. If that information is stale, noisy, or token-expensive, the agent's output degrades regardless of the model quality.
Streaming SQL resolves all three issues at once:
- Freshness comes from incremental computation: each new event updates only the affected rows in each materialized view, so the agent always reads current state.
- Compactness comes from aggregation: millions of raw events become dozens of summary rows, reducing token cost by 10x-50x compared to feeding raw logs into a prompt.
- Simplicity comes from SQL: the entire context pipeline is expressed as four CREATE MATERIALIZED VIEW statements, with no custom streaming code, no scheduler, and no cleanup jobs.
RisingWave sits between your event sources and your agent layer, acting as the context computation engine. Data engineers write SQL to define what context looks like. Agent developers query the results through any PostgreSQL client. Neither team needs to understand the other's implementation.
Start with the three tables and four views in this tutorial, connect your Kafka topic or CDC source, and your agents will have always-fresh, compact, structured context on every request.
Ready to build agent-ready context with streaming SQL? Get started with RisingWave in 5 minutes with the Quickstart.
Join our Slack community to ask questions and connect with other stream processing developers.

