Your LLM application just gave a customer the wrong answer. Not because the model was wrong. Because the context it received was stale. The user had hit three API errors in the last ten minutes, but the prompt said "no recent issues." The model confidently suggested everything was fine.
Context engineering is the practice of making sure LLMs receive the right information at the right moment. It goes beyond prompt engineering, which focuses on how you phrase instructions. Context engineering asks: what data does the model actually need, how fresh does it need to be, and how do you retrieve it fast enough that the model can respond in under a second?
This post shows how streaming SQL and materialized views solve the context engineering problem for ML engineers and data engineers building LLM-powered applications. You will see exactly how to transform raw event streams into pre-aggregated entity summaries that any LLM pipeline can query in milliseconds. Every SQL example in this post is verified against a live RisingWave 2.8.0 instance.
Why LLM Context Is a Data Engineering Problem
Most LLM application teams treat context as a prompt engineering concern. They focus on what to include in the system prompt, how to chunk documents, and which retrieval strategy to use. But the bottleneck is rarely the prompt itself. It is the pipeline that produces the context.
Consider a support copilot that needs to help a customer. Before the LLM can answer, it needs to know:
- How many errors has this user hit in the last hour?
- Which products are they actively using?
- What is their account tier and company?
- Are they currently in an active session, and what pages did they visit?
If you compute this at request time by running aggregation queries over millions of raw events, each request takes multiple seconds. If you batch-compute it every 15 minutes, the context is stale exactly when it matters most: when something just went wrong.
The solution is pre-computed context: continuously maintained summaries that are always ready to read. This is exactly what a streaming database like RisingWave is designed for. You define the transformations you want in standard SQL, and RisingWave maintains the results incrementally as new events arrive.
The Core Pattern: Stream Events to Materialized Views to LLM
The architecture has three layers:
- Event ingestion: Raw behavioral events stream into RisingWave from Kafka, direct INSERT, CDC from your operational database, or any supported source.
- Materialized view computation: SQL-defined views continuously aggregate, enrich, and denormalize those events into entity summaries. RisingWave updates only the affected rows as each event arrives.
- Context injection: Your LLM pipeline queries the materialized views through a standard PostgreSQL connection, retrieves pre-computed context, and injects it into the prompt.
The LLM never touches raw event data. It reads clean, structured, always-current summaries. The latency of context retrieval drops from seconds to milliseconds.
Let's build this pattern end to end with real, verified SQL.
Building the Data Foundation
Start with three tables: events, users, and products. In production, these would be backed by Kafka sources or CDC connectors. Here we use direct tables to keep the examples runnable.
CREATE TABLE ctx_events (
event_id VARCHAR,
user_id VARCHAR,
session_id VARCHAR,
entity_id VARCHAR,
entity_type VARCHAR,
event_type VARCHAR,
page_url VARCHAR,
properties VARCHAR,
occurred_at TIMESTAMPTZ
);
CREATE TABLE ctx_users (
user_id VARCHAR PRIMARY KEY,
account_tier VARCHAR,
company_name VARCHAR,
industry VARCHAR,
signup_date DATE
);
CREATE TABLE ctx_products (
product_id VARCHAR PRIMARY KEY,
product_name VARCHAR,
category VARCHAR,
description VARCHAR
);
Insert some representative data:
INSERT INTO ctx_users VALUES
('u_001', 'enterprise', 'Acme Financial', 'fintech', '2024-01-15'),
('u_002', 'pro', 'StreamCo', 'saas', '2024-03-22'),
('u_003', 'free', 'DataStart', 'startup', '2025-01-10');
INSERT INTO ctx_products VALUES
('p_001', 'Fraud Shield API', 'risk', 'Real-time transaction fraud detection API'),
('p_002', 'Stream Analytics Pro', 'analytics', 'Streaming SQL analytics platform'),
('p_003', 'Context Engine SDK', 'ai-infra', 'SDK for building LLM context pipelines');
INSERT INTO ctx_events VALUES
('e_001', 'u_001', 's_101', 'p_001', 'product', 'api_call', '/api/fraud/check', '{"status":"success"}', NOW() - INTERVAL '10 minutes'),
('e_002', 'u_001', 's_101', 'p_001', 'product', 'api_call', '/api/fraud/check', '{"status":"error"}', NOW() - INTERVAL '8 minutes'),
('e_003', 'u_001', 's_101', 'p_001', 'product', 'api_call', '/api/fraud/check', '{"status":"error"}', NOW() - INTERVAL '6 minutes'),
('e_004', 'u_001', 's_101', NULL, NULL, 'page_view', '/dashboard', NULL, NOW() - INTERVAL '5 minutes'),
('e_005', 'u_001', 's_101', NULL, NULL, 'page_view', '/docs/rate-limits', NULL, NOW() - INTERVAL '3 minutes'),
('e_006', 'u_002', 's_102', 'p_002', 'product', 'page_view', '/products/analytics', NULL, NOW() - INTERVAL '20 minutes'),
('e_007', 'u_002', 's_102', 'p_002', 'product', 'feature_used', '/api/query', '{"query":"SELECT"}', NOW() - INTERVAL '15 minutes'),
('e_008', 'u_002', 's_102', NULL, NULL, 'page_view', '/dashboard', NULL, NOW() - INTERVAL '12 minutes'),
('e_009', 'u_003', 's_103', NULL, NULL, 'page_view', '/pricing', NULL, NOW() - INTERVAL '30 minutes'),
('e_010', 'u_003', 's_103', NULL, NULL, 'page_view', '/pricing/enterprise', NULL, NOW() - INTERVAL '28 minutes'),
('e_011', 'u_003', 's_103', 'p_003', 'product', 'api_call', '/api/context/build', '{"status":"success"}', NOW() - INTERVAL '25 minutes');
Pre-Aggregating Entity Summaries for LLM Context
This is the central pattern. Instead of running aggregation queries at request time, you define a materialized view that maintains the aggregation continuously. The LLM pipeline reads a pre-computed row, not a computed result.
CREATE MATERIALIZED VIEW ctx_entity_summary AS
SELECT
e.user_id,
u.account_tier,
u.company_name,
u.industry,
COUNT(*) AS total_events_1h,
COUNT(DISTINCT e.session_id) AS sessions_1h,
COUNT(*) FILTER (WHERE e.event_type = 'api_call') AS api_calls_1h,
COUNT(*) FILTER (WHERE e.properties LIKE '%error%') AS errors_1h,
COUNT(DISTINCT e.entity_id)
FILTER (WHERE e.entity_id IS NOT NULL) AS products_touched,
MAX(e.occurred_at) AS last_active_at
FROM ctx_events e
LEFT JOIN ctx_users u ON e.user_id = u.user_id
WHERE e.occurred_at > NOW() - INTERVAL '1 hour'
GROUP BY e.user_id, u.account_tier, u.company_name, u.industry;
Query the view immediately:
SELECT * FROM ctx_entity_summary ORDER BY user_id;
user_id | account_tier | company_name | industry | total_events_1h | sessions_1h | api_calls_1h | errors_1h | products_touched | last_active_at
---------+--------------+----------------+----------+-----------------+-------------+--------------+-----------+------------------+-------------------------------
u_001 | enterprise | Acme Financial | fintech | 5 | 1 | 3 | 2 | 1 | 2026-04-02 07:42:47.640+00:00
u_002 | pro | StreamCo | saas | 3 | 1 | 0 | 0 | 1 | 2026-04-02 07:33:47.640+00:00
u_003 | free | DataStart | startup | 3 | 1 | 1 | 0 | 1 | 2026-04-02 07:20:47.640+00:00
(3 rows)
Three things make this different from a regular database view:
Incremental maintenance: RisingWave does not rerun the full aggregation when a new event arrives. It updates only the affected user's row. This is the principle of incremental view maintenance, and it is why reads stay fast no matter how many events are flowing in.
Temporal filter with automatic expiry: The WHERE e.occurred_at > NOW() - INTERVAL '1 hour' clause is a temporal filter that RisingWave enforces at the storage level. Events older than one hour are automatically evicted from the view's state. You get a rolling window without any cleanup jobs.
Join with dimension data: The LEFT JOIN ctx_users enriches every event with account tier, company name, and industry at ingestion time. When a user's profile is updated, RisingWave propagates the change to all affected rows in the materialized view. Your LLM always receives denormalized, context-rich data, not IDs it has to look up.
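The incremental-maintenance idea is easy to illustrate outside the database. This sketch is plain Python, not RisingWave internals: it keeps one aggregate row per user and folds each incoming event into only the affected row, which is why read latency stays flat as event volume grows.

```python
from collections import defaultdict

# Simplified stand-in for one row of ctx_entity_summary per user.
def new_row():
    return {"total_events": 0, "api_calls": 0, "errors": 0}

summary = defaultdict(new_row)

def apply_event(event):
    """Incrementally fold one event into the affected user's row only."""
    row = summary[event["user_id"]]  # touch exactly one row
    row["total_events"] += 1
    if event["event_type"] == "api_call":
        row["api_calls"] += 1
    if "error" in (event.get("properties") or ""):
        row["errors"] += 1

apply_event({"user_id": "u_001", "event_type": "api_call",
             "properties": '{"status":"error"}'})
apply_event({"user_id": "u_001", "event_type": "page_view", "properties": None})
apply_event({"user_id": "u_002", "event_type": "api_call",
             "properties": '{"status":"success"}'})

print(summary["u_001"])  # {'total_events': 2, 'api_calls': 1, 'errors': 1}
```

A full aggregation query would rescan every event on each read; the incremental fold does constant work per event and makes reads a dictionary lookup.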
Session-Level Context for Conversational Agents
An entity-level summary gives a broad view of user behavior. For conversational agents, you also need session-level context: what is this user doing right now, in this specific interaction?
CREATE MATERIALIZED VIEW ctx_user_session_ctx AS
SELECT
e.session_id,
e.user_id,
u.account_tier,
u.company_name,
MIN(e.occurred_at) AS session_start,
MAX(e.occurred_at) AS session_latest,
COUNT(*) AS event_count,
COUNT(*) FILTER (WHERE e.properties LIKE '%error%') AS error_count,
COUNT(DISTINCT e.page_url) AS pages_visited,
ARRAY_AGG(DISTINCT e.page_url ORDER BY e.page_url) AS visited_urls,
ARRAY_AGG(DISTINCT e.event_type ORDER BY e.event_type) AS event_types
FROM ctx_events e
LEFT JOIN ctx_users u ON e.user_id = u.user_id
WHERE e.occurred_at > NOW() - INTERVAL '2 hours'
GROUP BY e.session_id, e.user_id, u.account_tier, u.company_name;
Query it:
SELECT * FROM ctx_user_session_ctx ORDER BY session_id;
session_id | user_id | account_tier | company_name | session_start | session_latest | event_count | error_count | pages_visited | visited_urls | event_types
------------+---------+--------------+----------------+-------------------------------+-------------------------------+-------------+-------------+---------------+---------------------------------------------------+--------------------------
s_101 | u_001 | enterprise | Acme Financial | 2026-04-02 07:35:47.640+00:00 | 2026-04-02 07:42:47.640+00:00 | 5 | 2 | 3 | {/api/fraud/check,/dashboard,/docs/rate-limits} | {api_call,page_view}
s_102 | u_002 | pro | StreamCo | 2026-04-02 07:25:47.640+00:00 | 2026-04-02 07:33:47.640+00:00 | 3 | 0 | 3 | {/api/query,/dashboard,/products/analytics} | {feature_used,page_view}
s_103 | u_003 | free | DataStart | 2026-04-02 07:15:47.640+00:00 | 2026-04-02 07:20:47.640+00:00 | 3 | 0 | 3 | {/api/context/build,/pricing,/pricing/enterprise} | {api_call,page_view}
(3 rows)
When a user opens a chat widget, your agent retrieves their session row with a single lookup: SELECT * FROM ctx_user_session_ctx WHERE session_id = $1. The result tells the model everything it needs: the user is from Acme Financial (enterprise tier), they have hit 2 errors this session while calling /api/fraud/check, and they then visited the rate-limits docs. That is a clear signal. The agent can address the issue directly instead of opening with "what seems to be the problem?"
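A sketch of how an agent might act on that session row. The function name, threshold, and message wording here are illustrative choices, not part of RisingWave or any agent framework:

```python
def opening_strategy(session_ctx: dict) -> str:
    """Decide how the agent opens, based on pre-computed session context."""
    urls = session_ctx.get("visited_urls", [])
    errors = session_ctx.get("error_count", 0)
    if errors > 0:
        # Name the failing endpoint instead of asking what's wrong.
        api_pages = [u for u in urls if u.startswith("/api/")]
        target = api_pages[0] if api_pages else "your recent API calls"
        return (f"I can see you've hit {errors} errors on {target} "
                f"this session. Let's dig into that.")
    return "Hi! How can I help you today?"

s_101 = {
    "error_count": 2,
    "visited_urls": ["/api/fraud/check", "/dashboard", "/docs/rate-limits"],
}
print(opening_strategy(s_101))
```

The point is that the branching happens on pre-computed fields, so the decision costs nothing at request time.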
Product Health Signals for Context-Aware Routing
LLM applications often need context about the entities a user is interacting with, not just the user themselves. A support agent should know if the product the user is asking about is experiencing elevated errors across all users, not just this one.
CREATE MATERIALIZED VIEW ctx_product_ctx AS
SELECT
e.entity_id AS product_id,
p.product_name,
p.category,
p.description,
COUNT(*) AS total_events_1h,
COUNT(DISTINCT e.user_id) AS unique_users_1h,
COUNT(*) FILTER (WHERE e.event_type = 'api_call') AS api_calls_1h,
COUNT(*) FILTER (WHERE e.properties LIKE '%error%') AS errors_1h,
ROUND(
100.0 * COUNT(*) FILTER (WHERE e.properties LIKE '%error%')
/ NULLIF(COUNT(*) FILTER (WHERE e.event_type = 'api_call'), 0),
1
) AS error_rate_pct,
MAX(e.occurred_at) AS last_event_at
FROM ctx_events e
JOIN ctx_products p ON e.entity_id = p.product_id
WHERE e.occurred_at > NOW() - INTERVAL '1 hour'
AND e.entity_id IS NOT NULL
GROUP BY e.entity_id, p.product_name, p.category, p.description;
Query it:
SELECT * FROM ctx_product_ctx ORDER BY product_id;
product_id | product_name | category | description | total_events_1h | unique_users_1h | api_calls_1h | errors_1h | error_rate_pct | last_event_at
------------+----------------------+-----------+-------------------------------------------+-----------------+-----------------+--------------+-----------+----------------+-------------------------------
p_001 | Fraud Shield API | risk | Real-time transaction fraud detection API | 3 | 1 | 3 | 2 | 66.7 | 2026-04-02 07:39:47.640+00:00
p_002 | Stream Analytics Pro | analytics | Streaming SQL analytics platform | 2 | 1 | 0 | 0 | | 2026-04-02 07:30:47.640+00:00
p_003 | Context Engine SDK | ai-infra | SDK for building LLM context pipelines | 1 | 1 | 1 | 0 | 0 | 2026-04-02 07:20:47.640+00:00
(3 rows)
The Fraud Shield API has a 66.7% error rate on API calls in the last hour. An agent that receives this context can immediately tell the user: "We are seeing elevated errors on the Fraud Shield API right now. Our team is investigating. Here are the known workarounds while we resolve this." That response is only possible if the agent has product-level health context, not just user-level history.
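The NULLIF guard in the view is worth mirroring wherever you recompute the rate in application code; without it, a product with zero API calls produces a divide-by-zero instead of a null. A small Python equivalent:

```python
from typing import Optional

def error_rate_pct(errors: int, api_calls: int) -> Optional[float]:
    """Mirror of the SQL: 100.0 * errors / NULLIF(api_calls, 0), rounded to 1 place."""
    if api_calls == 0:
        return None  # matches SQL NULL: no API calls, no meaningful rate
    return round(100.0 * errors / api_calls, 1)

print(error_rate_pct(2, 3))  # 66.7, like p_001 in the output above
print(error_rate_pct(0, 0))  # None, like p_002's blank error_rate_pct
```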
Windowed Signals for Trend Detection
Some LLM applications need to understand how a situation is evolving, not just its current state. Is error volume trending up or down? Is usage accelerating? Tumbling time windows answer these questions by partitioning events into fixed time buckets.
CREATE MATERIALIZED VIEW ctx_event_window_signals AS
SELECT
window_start,
window_end,
entity_id AS product_id,
COUNT(*) AS events,
COUNT(DISTINCT user_id) AS unique_users,
COUNT(*) FILTER (WHERE properties LIKE '%error%') AS errors,
COUNT(*) FILTER (WHERE event_type = 'api_call') AS api_calls
FROM TUMBLE(ctx_events, occurred_at, INTERVAL '15 minutes')
WHERE entity_id IS NOT NULL
GROUP BY window_start, window_end, entity_id;
Query it:
SELECT * FROM ctx_event_window_signals ORDER BY window_start, product_id;
window_start | window_end | product_id | events | unique_users | errors | api_calls
---------------------------+---------------------------+------------+--------+--------------+--------+-----------
2026-04-02 07:15:00+00:00 | 2026-04-02 07:30:00+00:00 | p_002 | 1 | 1 | 0 | 0
2026-04-02 07:15:00+00:00 | 2026-04-02 07:30:00+00:00 | p_003 | 1 | 1 | 0 | 1
2026-04-02 07:30:00+00:00 | 2026-04-02 07:45:00+00:00 | p_001 | 3 | 1 | 2 | 3
2026-04-02 07:30:00+00:00 | 2026-04-02 07:45:00+00:00 | p_002 | 1 | 1 | 0 | 0
(4 rows)
An LLM agent can read the last two windows for a product and compare: "Fraud Shield had 0 errors in the 07:15 window and 2 errors in the 07:30 window." That trend is meaningful context that a rolling summary would flatten away. RisingWave's time window functions maintain these windows incrementally, with no manual partitioning or cleanup.
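Turning two adjacent windows into a compact trend signal for the prompt might look like this sketch; the labels are illustrative:

```python
def trend_label(prev_errors: int, curr_errors: int) -> str:
    """Compare error counts from two consecutive tumbling windows."""
    if curr_errors > prev_errors:
        return "rising"
    if curr_errors < prev_errors:
        return "falling"
    return "flat"

# p_001 had 0 errors in the 07:15 window and 2 in the 07:30 window.
print(trend_label(0, 2))  # rising
```

A single word like "rising" is often more useful to the model than two raw counts, because it encodes the comparison the model would otherwise have to perform itself.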
Injecting Context into LLM Prompts
Once the materialized views are running, connecting them to your LLM pipeline is straightforward. RisingWave is wire-compatible with PostgreSQL, so any PostgreSQL client works.
You can also build a context string directly in SQL to minimize application-layer processing:
SELECT
user_id,
account_tier,
company_name,
CONCAT(
'User ', user_id, ' (', company_name, ', ', account_tier, ' tier) | ',
'Events last 1h: ', total_events_1h, ' | ',
'API calls: ', api_calls_1h, ' | ',
'Errors: ', errors_1h, ' | ',
'Products touched: ', products_touched, ' | ',
'Last active: ', TO_CHAR(last_active_at, 'HH24:MI UTC')
) AS llm_context_line
FROM ctx_entity_summary
ORDER BY errors_1h DESC, total_events_1h DESC;
user_id | account_tier | company_name | llm_context_line
---------+--------------+----------------+------------------------------------------------------------------------------------------------------------------
u_001 | enterprise | Acme Financial | User u_001 (Acme Financial, enterprise tier) | Events last 1h: 5 | API calls: 3 | Errors: 2 | Products touched: 1 | Last active: 07:42 UTC
u_002 | pro | StreamCo | User u_002 (StreamCo, pro tier) | Events last 1h: 3 | API calls: 0 | Errors: 0 | Products touched: 1 | Last active: 07:33 UTC
u_003 | free | DataStart | User u_003 (DataStart, free tier) | Events last 1h: 3 | API calls: 1 | Errors: 0 | Products touched: 1 | Last active: 07:20 UTC
(3 rows)
In Python, a context-fetching function looks like this:
import psycopg2

def get_user_context(user_id: str, session_id: str) -> dict:
    conn = psycopg2.connect(
        host="localhost", port=4566,
        user="root", dbname="dev"
    )
    cur = conn.cursor()

    # Entity-level summary (1-hour rolling window)
    cur.execute(
        "SELECT * FROM ctx_entity_summary WHERE user_id = %s",
        (user_id,)
    )
    row = cur.fetchone()
    entity_ctx = {
        "account_tier": row[1], "company": row[2],
        "events_1h": row[4], "errors_1h": row[7],
    } if row else {}

    # Session-level context (current session)
    cur.execute(
        "SELECT * FROM ctx_user_session_ctx WHERE session_id = %s",
        (session_id,)
    )
    row = cur.fetchone()
    session_ctx = {
        "pages_visited": row[8], "visited_urls": row[9],
    } if row else {}

    cur.close()
    conn.close()
    return {**entity_ctx, **session_ctx}
context = get_user_context("u_001", "s_101")
system_prompt = f"""You are a support agent.
User context: {context}
Respond based on the user's recent activity and any errors they are experiencing."""
Both queries hit pre-computed rows and return in single-digit milliseconds. The LLM receives rich, current context without any aggregation overhead at request time.
How Streaming SQL Differs from Static Vector Databases
Retrieval-Augmented Generation (RAG) with vector databases has become the default context engineering pattern for LLM applications. But vector databases have a specific strength: semantic similarity retrieval over unstructured documents. They are not designed for real-time aggregation over structured event data.
The distinction matters:
- Vector DB answers: "Which documentation sections are most relevant to this user's question?"
- Streaming SQL answers: "What has this user done in the last hour, and what errors are they currently hitting?"
A well-engineered LLM application uses both. RAG retrieves relevant knowledge (documentation, policies, product descriptions). Streaming SQL provides real-time behavioral context (current errors, session activity, account state). These are complementary layers, not alternatives.
The key architectural difference is freshness. A real-time data pipeline for RAG can keep your vector store updated with streaming CDC and sinks - but vector search is still retrieval-at-query-time over a document corpus. Streaming SQL materialized views are pre-computed and updated continuously, so they can serve per-user, per-entity aggregations that would be prohibitively expensive to recompute at query time.
| Dimension | Static Vector DB | Streaming Materialized Views |
| --- | --- | --- |
| Best for | Semantic document retrieval | Real-time event aggregation |
| Freshness | Depends on reindex schedule | Sub-second (incremental) |
| Query cost | Embedding + ANN search | Single primary key lookup |
| Data type | Unstructured text | Structured events and facts |
| Updates | Batch or streaming upsert | Automatic incremental compute |
| Staleness risk | High if not actively refreshed | None - temporal filters auto-expire |
For LLM applications that need both, a common pattern is to use RisingWave as the streaming context layer and sink updated document embeddings to a vector store like Pinecone or pgvector. The streaming layer handles behavioral context; the vector layer handles knowledge retrieval.
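A sketch of the hybrid pattern: structured behavioral context from the streaming layer plus retrieved documents from the vector layer, assembled into one prompt. The assembly function and its inputs are illustrative; in practice the dict would come from a RisingWave query and the documents from your vector store:

```python
def build_hybrid_prompt(question: str, behavioral_ctx: dict, docs: list[str]) -> str:
    """Combine real-time behavioral context with retrieved knowledge."""
    ctx_lines = " | ".join(f"{k}: {v}" for k, v in behavioral_ctx.items())
    doc_block = "\n---\n".join(docs)
    return (
        "You are a support agent.\n"
        f"User context (live, from streaming views): {ctx_lines}\n"
        f"Relevant documentation (from vector search):\n{doc_block}\n"
        f"Question: {question}"
    )

prompt = build_hybrid_prompt(
    "Why are my fraud checks failing?",
    {"tier": "enterprise", "errors_1h": 2},
    ["Rate limits: enterprise tier allows higher throughput on /api/fraud/check."],
)
print(prompt)
```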
What Is Context Engineering for LLM Applications?
Context engineering is the discipline of designing data pipelines that provide large language models with the right information at the right time. Unlike prompt engineering, which focuses on instruction phrasing, context engineering addresses the infrastructure question: how do you compute, store, and retrieve the context the model needs?
For real-time applications, context engineering requires a streaming infrastructure. The context must be pre-computed so retrieval is fast, continuously updated so it reflects the current state of the world, and automatically maintained so engineers do not need to run manual refresh jobs.
Streaming databases like RisingWave are a natural fit because materialized views express context transformations as declarative SQL while handling all incremental computation and state management automatically.
When Should You Pre-Aggregate Context Instead of Querying Raw Events?
Pre-aggregate context using streaming materialized views when any of the following apply:
- Your LLM pipeline runs at user request rate (10s to 1000s of requests per second). Running aggregation queries over raw event logs at that rate will overwhelm your database.
- Your context requires joins between multiple data sources (events, user profiles, product data). These joins are expensive at query time and cheap at ingestion time when maintained incrementally.
- You need sub-second context retrieval. Materialized view reads are primary key lookups. Raw event aggregations are full-table scans.
- Your data is time-windowed (last 1 hour, last 24 hours). Streaming SQL temporal filters maintain rolling windows automatically. Manual window queries require careful index design and still run at query time.
Query raw events directly only when the query runs rarely (offline analysis, debugging), the event volume is small, or you need ad hoc exploration that does not map to a pre-defined view structure.
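The criteria above can be condensed into a rough decision helper. The thresholds here are illustrative, not prescriptive; tune them to your own workload:

```python
def should_preaggregate(requests_per_sec: float, needs_joins: bool,
                        latency_budget_ms: float, time_windowed: bool) -> bool:
    """Return True if any criterion favors a streaming materialized view."""
    return (
        requests_per_sec >= 10       # request-rate pressure on raw events
        or needs_joins               # multi-source enrichment
        or latency_budget_ms < 1000  # sub-second retrieval requirement
        or time_windowed             # rolling-window semantics
    )

print(should_preaggregate(50, True, 200, True))       # True: pre-aggregate
print(should_preaggregate(0.01, False, 5000, False))  # False: query raw events
```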
How Fresh Does LLM Context Need to Be?
Freshness requirements vary by use case:
- Real-time support agents: Seconds. An agent handling an active issue needs to see errors that happened in the last few minutes, not the last hour.
- Fraud and risk agents: Sub-second. The agent must see the latest transaction before approving the next one.
- Personalization agents: Minutes. Recommendation and upsell logic can tolerate a small lag as long as the data is from the current session.
- Analytics and planning agents: Hours to days. Summary statistics and trend data can tolerate batch-level freshness.
RisingWave's temporal filters let you set different freshness windows per view. A session context view uses INTERVAL '2 hours'; a trend summary view uses INTERVAL '7 days'. Each view manages its own time horizon automatically with no additional infrastructure.
Does Streaming SQL Replace RAG?
No. Streaming SQL and RAG solve different context problems. RAG retrieves relevant unstructured knowledge (documentation, articles, policies) using semantic similarity search. Streaming SQL pre-aggregates structured behavioral data (event counts, error rates, session activity) so it is always ready to read.
The two work together. In a production LLM application, you would use streaming SQL to maintain user and entity summaries that are injected into every prompt as structured context, and RAG to retrieve the most relevant knowledge base documents for the specific question being asked. Combining both layers produces agents that are grounded in both current reality and deep knowledge.
Conclusion
Context engineering is a data engineering discipline. The quality of your LLM application's responses depends directly on the quality, freshness, and structure of the context it receives. Getting that right requires infrastructure designed for real-time computation, not batch pipelines running on a schedule.
The pattern is straightforward:
- Stream events into RisingWave from any source: Kafka, CDC, direct inserts.
- Define entity summaries as materialized views. Aggregations, joins, and denormalization happen incrementally at ingestion time, not at request time.
- Use temporal filters to maintain rolling time windows automatically. No cleanup jobs, no TTL configuration.
- Inject context into LLM prompts by querying materialized views through a standard PostgreSQL connection. Single-digit millisecond latency.
- Combine with RAG for applications that need both semantic document retrieval and real-time behavioral context.
The result is an architecture where your LLM pipeline reads clean, current, denormalized context - and the streaming database handles all the complexity of keeping that context accurate as the world changes.
Try RisingWave Cloud free - no credit card required. Sign up and deploy your first streaming materialized view in minutes.
Join our Slack community to ask questions and connect with other ML and data engineers building with streaming SQL.

