# Building a Context Engineering Pipeline for LLM Applications
Context engineering is the discipline of giving LLMs the right data at the right time. Prompt engineering tunes how models reason; context engineering determines what they reason about. A streaming database maintains pre-computed, always-fresh context snapshots — user activity, entity state, relationship graphs — that LLM applications query at inference time, keeping context accurate without adding latency to the request path.
## From Prompt Engineering to Context Engineering
Prompt engineering has matured. Most teams know how to write good system prompts, structure few-shot examples, and handle instruction following. The frontier problem is now context engineering.
Context engineering is the practice of determining what information to pass to an LLM at inference time, in what form, and at what level of detail. It is the difference between an assistant that says "you have some recent orders" and one that says "your last three orders include an open return request from Tuesday that has not been resolved."
The second response requires current, structured knowledge about the user. That knowledge must be computed from raw operational data, kept fresh as events occur, and made available at low latency so it does not add seconds to every LLM call.
Most teams solve this badly. They query the production database at inference time (slow, expensive, fragile), or they use stale snapshots (inaccurate), or they build bespoke caching systems (complex, hard to maintain). None of these is the right architecture.
## The Right Architecture: Streaming Pre-Computed Context
The correct pattern is to move context computation out of the request path entirely. Instead of querying and aggregating data when the LLM call arrives, maintain continuously updated materialized views that are fast to read at inference time.
RisingWave is a PostgreSQL-compatible streaming database — open source (Apache 2.0), written in Rust, backed by S3 storage — that makes this pattern straightforward to implement. Materialized views update in seconds as source events arrive. LLM applications query them over a standard PostgreSQL connection with sub-10ms latency.
The architecture:

```
[User events] ───────┐
[Order system CDC] ──┼──▶ [RisingWave materialized views] ──▶ [LLM context fetch]
[Support tickets] ───┤                   │
[CRM updates] ───────┘                   ▼
                         Context snapshots, always current;
                         queried at inference time, fast
```
## What Context Engineering Actually Needs
Good LLM context for enterprise applications breaks down into four categories:
- Recent activity summary — what has the user done recently, in structured form
- Current entity state — what is the current state of things the user cares about (orders, cases, accounts)
- Preference and behavioral aggregates — what patterns characterize this user over time
- Relationship and graph context — what entities are related to this user, in what way
A streaming database can maintain all four as materialized views.
## Setting Up Sources

```sql
-- E-commerce order events
CREATE SOURCE order_events (
    order_id BIGINT,
    user_id BIGINT,
    status VARCHAR,
    total_amount NUMERIC(10,2),
    item_count INT,
    created_at TIMESTAMPTZ,
    updated_at TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'order-events',
    properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;

-- Support tickets via CDC
-- (table-level CDC requires an explicit schema and a primary key)
CREATE TABLE support_tickets (
    ticket_id BIGINT PRIMARY KEY,
    user_id BIGINT,
    subject VARCHAR,
    priority VARCHAR,
    status VARCHAR,
    created_at TIMESTAMPTZ
) WITH (
    connector = 'postgres-cdc',
    hostname = 'crm-db.internal',
    port = '5432',
    username = 'cdc_user',
    password = '...',
    database.name = 'crm',
    schema.name = 'public',
    table.name = 'support_tickets'
);

-- User behavior events
CREATE SOURCE user_events (
    user_id BIGINT,
    event_type VARCHAR,
    resource_id BIGINT,
    category VARCHAR,
    event_time TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'user-behavior',
    properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;
```
## Context View 1: Recent Activity Summary

```sql
-- Structured summary of the last 7 days of user activity
CREATE MATERIALIZED VIEW ctx_user_recent_activity AS
SELECT
    user_id,
    -- Order activity
    COUNT(*) FILTER (
        WHERE source = 'order' AND created_at > NOW() - INTERVAL '7 days'
    ) AS orders_7d,
    SUM(amount) FILTER (
        WHERE source = 'order' AND created_at > NOW() - INTERVAL '7 days'
    ) AS order_spend_7d,
    COUNT(*) FILTER (
        WHERE source = 'order' AND status = 'open'
    ) AS open_orders,
    -- Support activity
    COUNT(*) FILTER (
        WHERE source = 'support' AND status = 'open'
    ) AS open_support_tickets,
    MAX(CASE WHEN source = 'support' THEN created_at END) AS last_support_contact,
    -- Engagement
    COUNT(*) FILTER (
        WHERE source = 'event' AND event_time > NOW() - INTERVAL '24 hours'
    ) AS events_24h,
    MAX(created_at) AS last_activity_at
FROM (
    SELECT user_id, 'order' AS source, status,
           total_amount AS amount, created_at, created_at AS event_time
    FROM order_events
    UNION ALL
    SELECT user_id, 'support' AS source, status,
           NULL, created_at, created_at
    FROM support_tickets
    UNION ALL
    SELECT user_id, 'event' AS source, NULL,
           NULL, event_time, event_time
    FROM user_events
) combined
GROUP BY user_id;
```
## Context View 2: Current Entity State

```sql
-- Current open orders with status details.
-- order_events is an append-only change stream, so first reduce it to the
-- latest event per order; otherwise superseded status rows linger in the view.
CREATE MATERIALIZED VIEW ctx_user_open_orders AS
SELECT
    user_id,
    order_id,
    status,
    total_amount,
    item_count,
    created_at,
    -- Days since order was placed
    EXTRACT(EPOCH FROM NOW() - created_at) / 86400 AS age_days,
    -- Flag orders older than 5 days that are still pending
    (status = 'pending'
        AND created_at < NOW() - INTERVAL '5 days') AS is_delayed
FROM (
    SELECT *,
           ROW_NUMBER() OVER (
               PARTITION BY order_id ORDER BY updated_at DESC
           ) AS rn
    FROM order_events
) latest
WHERE rn = 1
  AND status NOT IN ('delivered', 'cancelled', 'refunded');

-- Open support tickets with context
-- (the CDC table already carries current-row semantics, so no dedup needed)
CREATE MATERIALIZED VIEW ctx_user_open_tickets AS
SELECT
    user_id,
    ticket_id,
    subject,
    priority,
    status,
    created_at,
    EXTRACT(EPOCH FROM NOW() - created_at) / 3600 AS age_hours,
    -- Escalation flag
    (priority = 'high'
        AND created_at < NOW() - INTERVAL '24 hours'
        AND status = 'open') AS needs_escalation
FROM support_tickets
WHERE status != 'closed';
```
## Context View 3: Preference and Behavioral Aggregates

```sql
-- Long-term user preferences (90-day behavioral profile)
CREATE MATERIALIZED VIEW ctx_user_preferences AS
SELECT
    user_id,
    -- Top categories by engagement
    MODE() WITHIN GROUP (ORDER BY category) AS top_category,
    COUNT(DISTINCT category) AS category_breadth,
    -- Purchase behavior (within the 90-day window)
    COUNT(*) FILTER (WHERE event_type = 'purchase') AS lifetime_purchases,
    AVG(CASE WHEN event_type = 'purchase' THEN 1 ELSE 0 END) AS purchase_rate,
    -- Engagement depth
    AVG(daily_events) AS avg_daily_events,
    COUNT(DISTINCT DATE(event_time)) AS active_days_90d,
    -- Recency
    MAX(event_time) AS last_seen_at,
    EXTRACT(EPOCH FROM NOW() - MAX(event_time)) / 86400 AS days_since_active
FROM (
    SELECT
        user_id,
        event_type,
        category,
        event_time,
        COUNT(*) OVER (PARTITION BY user_id, DATE(event_time)) AS daily_events
    FROM user_events
    WHERE event_time > NOW() - INTERVAL '90 days'
) windowed
GROUP BY user_id;
```
## Context View 4: Entity Relationship Graph

```sql
-- Products the user has interacted with (for cross-sell context)
CREATE MATERIALIZED VIEW ctx_user_product_graph AS
SELECT
    user_id,
    resource_id AS product_id,
    COUNT(*) FILTER (WHERE event_type = 'view') AS view_count,
    COUNT(*) FILTER (WHERE event_type = 'purchase') AS purchase_count,
    MAX(event_time) AS last_interaction,
    BOOL_OR(event_type = 'purchase') AS has_purchased
FROM user_events
WHERE event_time > NOW() - INTERVAL '30 days'
GROUP BY user_id, resource_id;
```
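Graph rows like these only help the model once they are serialized into compact prompt text. A minimal sketch of that step — the tuple shape mirrors the columns of `ctx_user_product_graph`, and the helper name is illustrative:

```python
def summarize_product_graph(rows):
    """Serialize (product_id, view_count, purchase_count, last_interaction,
    has_purchased) rows into one compact context line for the prompt."""
    if not rows:
        return ""
    parts = []
    for product_id, views, purchases, last_seen, purchased in rows:
        verb = "purchased" if purchased else "viewed"
        parts.append(f"product {product_id} ({verb}, {views} views)")
    return "Recently engaged products: " + "; ".join(parts)
```

Keeping this serialization in one place makes it easy to cap the line's length before it enters the context window.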
## Assembling Context for the LLM

At inference time, the LLM application fetches context from these views and assembles it into the prompt:

```python
import psycopg2

conn = psycopg2.connect(
    host="risingwave.internal", port=4566,
    database="dev", user="root"
)

def fetch_user_context(user_id: int) -> dict:
    """Fetch all context needed for LLM prompt assembly."""
    with conn.cursor() as cur:
        # Fetch activity summary
        cur.execute("""
            SELECT orders_7d, order_spend_7d, open_orders,
                   open_support_tickets, last_support_contact,
                   events_24h, last_activity_at
            FROM ctx_user_recent_activity
            WHERE user_id = %s
        """, (user_id,))
        activity = cur.fetchone()

        # Fetch open orders
        cur.execute("""
            SELECT order_id, status, total_amount, age_days, is_delayed
            FROM ctx_user_open_orders
            WHERE user_id = %s
            ORDER BY created_at DESC
            LIMIT 5
        """, (user_id,))
        open_orders = cur.fetchall()

        # Fetch open tickets
        cur.execute("""
            SELECT ticket_id, subject, priority, age_hours, needs_escalation
            FROM ctx_user_open_tickets
            WHERE user_id = %s
            ORDER BY created_at DESC
        """, (user_id,))
        open_tickets = cur.fetchall()

        # Fetch preferences
        cur.execute("""
            SELECT top_category, lifetime_purchases, avg_daily_events, active_days_90d
            FROM ctx_user_preferences
            WHERE user_id = %s
        """, (user_id,))
        prefs = cur.fetchone()

    return {
        "activity": activity,
        "open_orders": open_orders,
        "open_tickets": open_tickets,
        "preferences": prefs,
    }

def build_context_block(user_id: int) -> str:
    """Assemble a natural-language context block for LLM prompt injection."""
    ctx = fetch_user_context(user_id)
    activity = ctx["activity"]
    lines = []
    if activity:
        spend = activity[1] or 0  # order_spend_7d is NULL with no recent orders
        lines.append(f"Recent activity (last 7 days): {activity[0]} orders totaling ${spend:.2f}")
        if activity[2] > 0:
            lines.append(f"Open orders: {activity[2]}")
        if activity[3] > 0:
            lines.append(f"Open support tickets: {activity[3]}")
    delayed = [o for o in ctx["open_orders"] if o[4]]  # is_delayed
    if delayed:
        lines.append(f"Attention: {len(delayed)} order(s) appear delayed (pending > 5 days)")
    escalate = [t for t in ctx["open_tickets"] if t[4]]  # needs_escalation
    if escalate:
        lines.append(f"Attention: {len(escalate)} high-priority ticket(s) unresolved > 24 hours")
    return "\n".join(lines)
```
The LLM receives a context block that is accurate to within seconds of the user's actual state, assembled in milliseconds from precomputed views.
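For completeness, here is one way the context block might be spliced into a chat request. The message layout and the system-prompt wording are illustrative choices, not a prescribed format; only the context-block text comes from the views above:

```python
def build_messages(user_query: str, context_block: str) -> list:
    """Wrap a precomputed context block and the user's query into a
    standard chat-style message list."""
    system = (
        "You are a support assistant. Ground every answer in the user "
        "context below; if the context is empty, say you have no history.\n\n"
        "--- USER CONTEXT ---\n" + context_block
    )
    return [
        {"role": "system", "content": system},
        {"role": "user", "content": user_query},
    ]
```

Putting the context in the system message keeps it clearly separated from the user's turn, which makes multi-turn truncation strategies simpler.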
## Streaming Context for Long-Running LLM Sessions

For multi-turn applications (chatbots, copilots), context can go stale during a session. RisingWave's subscription feature exposes view changes to clients, allowing the application to refresh context mid-session when meaningful state changes occur.

```sql
-- Create a subscription that retains one day of changes to the view
CREATE SUBSCRIPTION sub_open_orders
FROM ctx_user_open_orders
WITH (retention = '1D');

-- In the session: open a cursor and poll for changes
DECLARE cur_open_orders SUBSCRIPTION CURSOR FOR sub_open_orders;
FETCH NEXT FROM cur_open_orders;
```

The subscription delivers every change to the view; the application filters for the session's user (for example, user_id = 12345) and injects updated context into the next LLM call when that user's order state changes.
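Client-side, each change event can be inspected to decide whether the session's cached context is now stale. A sketch of that decision step — the change-row shape here (a dict carrying an op code and the row's user_id) is an assumption about how the application surfaces subscription rows, not RisingWave's wire format:

```python
def should_refresh_context(change_row: dict, session_user_id: int) -> bool:
    """Return True when a subscription change invalidates this session's
    cached context: any insert/update/delete touching the session's user."""
    return (
        change_row.get("user_id") == session_user_id
        and change_row.get("op") in {"insert", "update", "delete"}
    )
```

The refresh itself then happens at the next turn boundary, so an in-flight generation is never interrupted.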
## Comparison: Context Delivery Approaches

| Approach | Freshness | Latency Added | Implementation Cost | Handles Complex Aggregations |
| --- | --- | --- | --- | --- |
| Query production DB at request time | Current | 50–500ms+ | Low | Expensive |
| Scheduled snapshot refresh | Minutes to hours | <5ms | Medium | Yes |
| Application-layer caching (Redis) | Minutes stale | <2ms | High (cache management) | Precomputed only |
| Streaming DB materialized views | Seconds | <10ms | Low (SQL only) | Yes |
The streaming database approach is the only one that achieves both freshness (seconds) and low latency (pre-computed), without requiring a separate cache management system.
## FAQ
How much context should be injected into an LLM prompt?
This is a context window trade-off. More context improves relevance but increases cost and may dilute the signal. A practical pattern is to fetch all available context (as shown above) and use a lightweight ranking step to select the most relevant fields based on the user's current query before injecting them into the prompt.
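That ranking step can be as simple as word overlap between the user's query and each candidate context line. A sketch under that assumption — the scoring heuristic is illustrative, not a library call:

```python
import re

def _words(text: str) -> set:
    """Lowercase word tokens with punctuation stripped."""
    return set(re.findall(r"[a-z0-9]+", text.lower()))

def rank_context_lines(query: str, lines: list, top_k: int = 3) -> list:
    """Keep the top_k context lines sharing the most words with the query."""
    q = _words(query)
    scored = [(len(q & _words(line)), i, line) for i, line in enumerate(lines)]
    # Highest overlap first; original position breaks ties
    scored.sort(key=lambda t: (-t[0], t[1]))
    return [line for _, _, line in scored[:top_k]]
```

In production this heuristic is often replaced with embedding similarity, but the shape of the step — score, sort, truncate — stays the same.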
Can RisingWave context views be personalized per tenant in a multi-tenant application?
Yes. Materialized views can be filtered by tenant_id or org_id, and PostgreSQL-level row security policies can enforce tenant isolation. Each tenant's context is isolated while sharing the same view definitions.
What if the user's context changes while the LLM is generating a response?
For typical single-turn queries, this is not a concern — the context is fetched once at the start of the request. For long-running agentic sessions, the SUBSCRIBE pattern allows the application to track changes and inject updated context at the next turn boundary.
How do you handle context for anonymous or new users with no history?
Views return empty results (or NULLs with LEFT JOIN) for users with no history. The application layer handles the cold-start case explicitly, falling back to behavioral defaults or category-level context (e.g., trending items in a given product area).
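The cold-start branch can live in the same assembly path — a sketch, where the fallback wording and default category are illustrative placeholders:

```python
def context_or_fallback(context_block: str, default_category: str = "general") -> str:
    """Fall back to category-level guidance when a user has no history."""
    if context_block.strip():
        return context_block
    return (
        f"No prior history for this user. "
        f"Use {default_category} defaults and ask clarifying questions."
    )
```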
Does adding a RisingWave query add latency to every LLM call?
Because views are precomputed, a lookup against ctx_user_recent_activity or similar views typically returns in 2-10ms — comparable to a Redis read and far faster than querying the OLTP database. The context fetch is not on the critical path to the LLM call; it can be parallelized with other request processing.
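The parallelization point can be made concrete with asyncio: the context fetch overlaps whatever other per-request work precedes the LLM call. The two coroutines below are stand-ins for the real view lookup and preprocessing (a real version would use an async PostgreSQL driver):

```python
import asyncio

async def fetch_context(user_id: int) -> dict:
    # Stand-in for the materialized-view lookups (~ milliseconds)
    await asyncio.sleep(0.005)
    return {"user_id": user_id, "open_orders": 2}

async def prepare_request(query: str) -> str:
    # Stand-in for e.g. moderation or query rewriting
    await asyncio.sleep(0.005)
    return query.strip()

async def handle_request(user_id: int, query: str):
    # Both run concurrently; total wait is the max of the two, not the sum
    ctx, clean_query = await asyncio.gather(
        fetch_context(user_id), prepare_request(query)
    )
    return ctx, clean_query
```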

