Streaming SQL for AI: From Raw Events to Agent-Ready Context

Streaming SQL for AI: From Raw Events to Agent-Ready Context

AI agents are changing how software interacts with users, infrastructure, and business processes. But there is a fundamental bottleneck that limits most agent deployments: the agent's ability to access relevant, up-to-date context at the moment it needs to make a decision. An agent that takes 5 seconds to gather context before responding has already lost the user's attention. An agent working with data that is 15 minutes stale might as well be guessing.

Context engineering has emerged as the discipline of providing AI agents with the right information at the right time. The concept goes beyond prompt engineering. It asks: what data does the agent need, how fresh must it be, and how do you compute it efficiently enough that the agent can retrieve it in milliseconds?

This article shows how streaming SQL solves the context computation problem. You will learn how to transform raw event streams into pre-computed, enriched, and freshness-guaranteed context that any AI agent can query instantly. We use RisingWave as the context computation layer, with verified SQL examples you can run yourself.

Why AI Agents Need Pre-Computed Context

Most agent architectures follow a pattern: the agent receives a request, gathers context by querying databases or APIs, reasons about the context, and produces a response. The gathering step is where things break down.

Consider a support agent that needs to understand a customer's current situation. It might need to know:

  • How many errors the user hit in the last hour
  • Which pages they visited in their current session
  • Their account tier and company name
  • Whether the product they are asking about is trending or experiencing issues

Fetching this from raw event logs means running multiple aggregation queries across potentially millions of rows. Each query adds latency. The agent's response time balloons from sub-second to multiple seconds.

The alternative is pre-computed context: continuously maintained summaries, aggregations, and enriched records that are always ready for the agent to read. Instead of computing "errors in the last hour" on every request, you maintain a materialized view that updates incrementally as each new error event arrives. The agent reads the pre-computed result in milliseconds.

This is exactly what streaming SQL and materialized views are designed for. A streaming database like RisingWave ingests events continuously (from Kafka, Kinesis, database CDC, or direct inserts), and materialized views express the transformations you want as standard SQL. RisingWave handles the incremental computation automatically.

The Three Pillars of Agent-Ready Context

Transforming raw events into context that agents can actually use requires three capabilities: aggregation, enrichment, and freshness guarantees. Each maps directly to a streaming SQL pattern.

Aggregation: Summarizing Raw Events into Actionable Signals

Raw events are too granular for an agent to process directly. An agent does not need to see 500 individual page_view records. It needs to know: "this user visited 12 unique pages across 3 sessions in the last 24 hours, with 4 errors."

In RisingWave, you express this as a materialized view with standard SQL aggregations. The key difference from a regular database view is that RisingWave maintains the result incrementally. When a new event arrives, only the affected rows in the result are updated, not the entire aggregation.

CREATE TABLE user_actions (
    user_id VARCHAR,
    action_type VARCHAR,
    page_url VARCHAR,
    session_id VARCHAR,
    metadata VARCHAR,
    action_ts TIMESTAMPTZ
);

CREATE MATERIALIZED VIEW agent_user_context AS
SELECT
    user_id,
    COUNT(*) AS total_actions_24h,
    COUNT(DISTINCT session_id) AS sessions_24h,
    COUNT(*) FILTER (WHERE action_type = 'error') AS errors_24h,
    COUNT(*) FILTER (WHERE action_type = 'page_view') AS page_views_24h,
    MAX(action_ts) AS last_seen_at,
    ARRAY_AGG(DISTINCT action_type ORDER BY action_type) AS action_types
FROM user_actions
WHERE action_ts > NOW() - INTERVAL '24 hours'
GROUP BY user_id;

An agent querying this view gets a complete user profile in a single read:

 user_id | total_actions_24h | sessions_24h | errors_24h | page_views_24h |         last_seen_at          |      action_types
---------+-------------------+--------------+------------+----------------+-------------------------------+-------------------------
 u_101   |                 4 |            1 |          1 |              2 | 2026-04-02 03:50:14.349+00:00 | {click,error,page_view}
 u_102   |                 3 |            1 |          0 |              2 | 2026-04-02 04:00:14.349+00:00 | {click,page_view}
 u_103   |                 3 |            1 |          2 |              1 | 2026-04-02 04:09:14.349+00:00 | {error,page_view}

Notice the WHERE action_ts > NOW() - INTERVAL '24 hours' clause. In RisingWave, this is a temporal filter that automatically expires old data. Records older than 24 hours are removed from the materialized view and their storage is reclaimed. The agent always sees a rolling 24-hour window without any cleanup jobs.

Enrichment: Joining Streams with Reference Data

Raw events typically contain IDs and timestamps but lack the business context an agent needs. A page_view event has a user_id, but the agent needs the account tier, company name, and signup date to personalize its response.

Enrichment means joining event streams with reference tables (also called dimension tables) to produce context-rich records. In RisingWave, this is a standard SQL JOIN inside a materialized view:

CREATE TABLE user_profiles (
    user_id VARCHAR PRIMARY KEY,
    plan_tier VARCHAR,
    signup_date DATE,
    company_name VARCHAR
);

CREATE MATERIALIZED VIEW enriched_actions AS
SELECT
    ua.user_id,
    ua.action_type,
    ua.page_url,
    ua.action_ts,
    up.plan_tier,
    up.company_name
FROM user_actions ua
LEFT JOIN user_profiles up ON ua.user_id = up.user_id;

Every new event that arrives in user_actions is automatically joined with the corresponding profile from user_profiles. If a profile is updated (for example, the customer upgrades from "free" to "pro"), all enriched records for that user are updated in the materialized view.

 user_id | action_type |       page_url        |           action_ts           | plan_tier  | company_name
---------+-------------+-----------------------+-------------------------------+------------+--------------
 u_103   | error       | /api/stream           | 2026-04-02 04:09:14.349+00:00 | pro        | DataFlow Inc
 u_103   | error       | /api/stream           | 2026-04-02 04:07:14.349+00:00 | pro        | DataFlow Inc
 u_103   | page_view   | /dashboard            | 2026-04-02 04:05:14.349+00:00 | pro        | DataFlow Inc
 u_102   | click       | /signup               | 2026-04-02 04:00:14.349+00:00 | free       | StartupXYZ
 u_102   | page_view   | /pricing/enterprise   | 2026-04-02 03:58:14.349+00:00 | free       | StartupXYZ

An agent can now see at a glance that user u_103 from DataFlow Inc (a "pro" tier customer) is hitting repeated errors on the /api/stream endpoint. That is enough context to generate a targeted support response without additional queries.

Freshness Guarantees: Controlling the Time Horizon

Agents need context at different time scales. A fraud detection agent needs the last 5 minutes of transactions. A support agent needs the current session. A weekly summary agent needs 7 days of data. Each context window has a different freshness requirement.

RisingWave provides two mechanisms for controlling freshness:

Temporal filters use the NOW() function to define sliding time windows. Data outside the window is automatically evicted:

CREATE MATERIALIZED VIEW user_context_fresh AS
SELECT
    ua.user_id,
    up.plan_tier,
    up.company_name,
    COUNT(*) AS recent_actions,
    COUNT(*) FILTER (WHERE ua.action_type = 'error') AS recent_errors,
    MAX(ua.action_ts) AS last_activity
FROM user_actions ua
LEFT JOIN user_profiles up ON ua.user_id = up.user_id
WHERE ua.action_ts > NOW() - INTERVAL '1 hour'
GROUP BY ua.user_id, up.plan_tier, up.company_name;
 user_id | plan_tier  | company_name | recent_actions | recent_errors |         last_activity
---------+------------+--------------+----------------+---------------+-------------------------------
 u_101   | enterprise | Acme Corp    |              4 |             1 | 2026-04-02 03:50:14.349+00:00
 u_102   | free       | StartupXYZ   |              3 |             0 | 2026-04-02 04:00:14.349+00:00
 u_103   | pro        | DataFlow Inc |              3 |             2 | 2026-04-02 04:09:14.349+00:00

Time windows (TUMBLE and HOP) partition events into fixed or sliding time buckets. This is useful when an agent needs periodic snapshots rather than a running total:

CREATE MATERIALIZED VIEW agent_product_signals AS
SELECT
    window_start,
    window_end,
    page_url,
    COUNT(*) AS view_count,
    COUNT(DISTINCT user_id) AS unique_visitors,
    COUNT(*) FILTER (WHERE action_type = 'error') AS error_count
FROM TUMBLE(user_actions, action_ts, INTERVAL '15 minutes')
GROUP BY window_start, window_end, page_url;
       window_start        |        window_end         |       page_url        | view_count | unique_visitors | error_count
---------------------------+---------------------------+-----------------------+------------+-----------------+-------------
 2026-04-02 04:00:00+00:00 | 2026-04-02 04:15:00+00:00 | /api/stream           |          2 |               1 |           2
 2026-04-02 04:00:00+00:00 | 2026-04-02 04:15:00+00:00 | /dashboard            |          1 |               1 |           0
 2026-04-02 04:00:00+00:00 | 2026-04-02 04:15:00+00:00 | /signup               |          1 |               1 |           0
 2026-04-02 03:45:00+00:00 | 2026-04-02 04:00:00+00:00 | /api/query            |          1 |               1 |           1
 2026-04-02 03:45:00+00:00 | 2026-04-02 04:00:00+00:00 | /docs/getting-started |          1 |               1 |           0

An agent receiving a question about product health can scan these 15-minute windows to spot trends: "/api/stream had 2 errors in the current window with only 1 visitor, that is a 100% error rate."

Session-Level Context for Conversational Agents

Many AI agents operate in the context of a user session. A chatbot needs to know what the user has done in this session, not just their historical profile. RisingWave lets you build session-scoped materialized views:

CREATE MATERIALIZED VIEW agent_session_context AS
SELECT
    session_id,
    user_id,
    MIN(action_ts) AS session_start,
    MAX(action_ts) AS session_latest,
    COUNT(*) AS action_count,
    COUNT(*) FILTER (WHERE action_type = 'error') AS error_count,
    COUNT(DISTINCT page_url) AS pages_visited,
    ARRAY_AGG(DISTINCT page_url ORDER BY page_url) AS visited_pages
FROM user_actions
WHERE action_ts > NOW() - INTERVAL '2 hours'
GROUP BY session_id, user_id;
 session_id | user_id |         session_start         |        session_latest         | action_count | error_count | pages_visited |                          visited_pages
------------+---------+-------------------------------+-------------------------------+--------------+-------------+---------------+------------------------------------------------------------------
 s_001      | u_101   | 2026-04-02 03:40:14.349+00:00 | 2026-04-02 03:50:14.349+00:00 |            4 |           1 |             4 | {/api/query,/dashboard,/dashboard/metrics,/docs/getting-started}
 s_002      | u_102   | 2026-04-02 03:55:14.349+00:00 | 2026-04-02 04:00:14.349+00:00 |            3 |           0 |             3 | {/pricing,/pricing/enterprise,/signup}
 s_003      | u_103   | 2026-04-02 04:05:14.349+00:00 | 2026-04-02 04:09:14.349+00:00 |            3 |           2 |             2 | {/api/stream,/dashboard}

When a user opens the chat widget, the agent can immediately retrieve their session context: "User u_102 from StartupXYZ has been browsing pricing pages and just clicked signup. They are on the free tier." That is enough to generate a highly relevant, personalized response.

Architecture: RisingWave as the Context Computation Layer

The full architecture positions RisingWave between your event sources and your AI agents:

graph LR
    subgraph Sources
        K[Kafka Topics]
        CDC[Database CDC]
        API[API Events]
    end
    subgraph RisingWave
        T[Source Tables]
        MV1[agent_user_context]
        MV2[agent_session_context]
        MV3[enriched_actions]
        MV4[agent_product_signals]
        T --> MV1
        T --> MV2
        T --> MV3
        T --> MV4
    end
    subgraph Agent Layer
        LLM[LLM / Agent Framework]
        MCP[MCP Server]
        SDK[PostgreSQL Client]
    end
    K --> T
    CDC --> T
    API --> T
    MV1 --> MCP
    MV2 --> SDK
    MV3 --> LLM
    MV4 --> SDK

The key properties of this architecture:

Separation of concerns. Data engineers define the context transformations in SQL. Agent developers query the results through standard PostgreSQL clients. Neither team needs to understand the other's implementation details.

Incremental computation. RisingWave processes each event as it arrives and updates only the affected rows in each materialized view. This is fundamentally different from batch pipelines that recompute everything on a schedule. For context engineering, incremental computation means the agent always reads the latest state, not the state from the last batch run.

PostgreSQL compatibility. RisingWave is wire-compatible with PostgreSQL. Any tool, framework, or protocol that speaks PostgreSQL can query RisingWave materialized views. This includes the Model Context Protocol (MCP), LangChain's SQL toolkit, direct psycopg2 connections, and any ORM that supports PostgreSQL.

Automatic state management. Temporal filters and time windows handle data retention automatically. You do not need separate cleanup jobs, TTL configurations, or manual garbage collection. Define the freshness requirement in your SQL, and RisingWave enforces it.

Connecting Agents to Pre-Computed Context

Once your materialized views are running, connecting an AI agent is straightforward. Because RisingWave speaks PostgreSQL, you can use any PostgreSQL client library.

Here is a Python example using psycopg2 that retrieves user context for an agent:

import psycopg2
import json

def get_agent_context(user_id: str) -> dict:
    conn = psycopg2.connect(
        host="localhost", port=4566,
        user="root", dbname="dev"
    )
    cur = conn.cursor()

    # Fetch pre-computed user context
    cur.execute(
        "SELECT * FROM agent_user_context WHERE user_id = %s",
        (user_id,)
    )
    row = cur.fetchone()
    if row:
        context = {
            "user_id": row[0],
            "total_actions_24h": row[1],
            "sessions_24h": row[2],
            "errors_24h": row[3],
            "page_views_24h": row[4],
            "last_seen_at": str(row[5]),
            "action_types": row[6],
        }
    else:
        context = {"user_id": user_id, "status": "no recent activity"}

    cur.close()
    conn.close()
    return context

# The agent receives structured, pre-computed context
context = get_agent_context("u_101")
# Pass to LLM as part of the prompt
system_prompt = f"""You are a support agent. Here is the user's context:
{json.dumps(context, indent=2)}
Respond based on their recent activity and any issues they may be experiencing."""

The query SELECT * FROM agent_user_context WHERE user_id = %s executes in single-digit milliseconds because it reads a pre-computed result, not raw event data. The agent framework (LangChain, CrewAI, or a custom agent) receives structured context without any data processing overhead.

For MCP-based integrations, you can expose each materialized view as an MCP tool, allowing the agent to discover and query context views dynamically. See How to Connect a Streaming Database to AI Agents via MCP for a detailed walkthrough.

Streaming SQL vs. Batch Pipelines for Agent Context

A common question is: why not just run a batch pipeline (Airflow, dbt, Spark) to pre-compute agent context on a schedule?

Batch pipelines work when minutes or hours of staleness are acceptable. For many agent use cases, they are not.

DimensionBatch PipelineStreaming SQL (RisingWave)
FreshnessMinutes to hours (depends on schedule)Sub-second (incremental updates)
Latency to queryLow (pre-computed)Low (pre-computed)
Context accuracyStale between runsAlways current
InfrastructureScheduler + compute cluster + storageSingle streaming database
Change propagationNext batch runImmediate
TTL / data expiryManual cleanup jobsAutomatic via temporal filters

The key advantage of streaming SQL is not just speed. It eliminates an entire class of bugs caused by stale context. When an agent makes a decision based on data that was accurate 10 minutes ago but is wrong now, the result is a bad user experience that is difficult to debug because the agent's reasoning was technically correct given its (outdated) inputs.

What Is Context Engineering for AI Agents?

Context engineering is the practice of designing systems that provide AI agents with the right information at the right time. It goes beyond prompt engineering, which focuses on how you phrase instructions to the model. Context engineering addresses the data pipeline: what context does the model receive, how fresh is it, and how is it structured?

For AI agents operating in real-time environments (support bots, fraud detection, monitoring agents), context engineering requires a streaming data infrastructure. The context must be pre-computed so the agent can retrieve it instantly, continuously updated so it reflects the current state of the world, and automatically maintained so data engineers do not need to run manual refresh jobs.

RisingWave's materialized views are a natural fit for this pattern because they express the context transformation as declarative SQL while handling the incremental computation and state management automatically.

How Does Streaming SQL Differ from RAG for Agent Context?

Retrieval-augmented generation (RAG) retrieves documents from a vector store based on semantic similarity to the user's query. It works well for unstructured knowledge (documentation, support articles, policy documents) but has limitations for structured, real-time operational data.

Streaming SQL complements RAG by handling the structured data side:

  • RAG answers: "What does our documentation say about rate limiting?"
  • Streaming SQL answers: "This user hit 47 rate limit errors in the last hour across 3 API endpoints."

The two approaches are not mutually exclusive. A well-designed agent uses RAG for knowledge retrieval and streaming SQL for real-time operational context. The combination produces agents that understand both what to do (from documentation) and what is happening right now (from streaming data).

When Should You Use RisingWave Instead of Building Custom Context Pipelines?

Use RisingWave when your agent context requires any combination of:

  • Real-time aggregation: You need counts, sums, averages, or distinct counts over sliding time windows that update as events arrive.
  • Stream-table joins: Your events need enrichment from reference data (user profiles, product catalogs, configuration tables).
  • Freshness guarantees: Your agent's accuracy depends on data being seconds-fresh, not minutes-fresh.
  • SQL familiarity: Your team knows SQL and does not want to maintain a custom stream processing application in Java or Scala.

Custom pipelines (Kafka Streams, Flink applications) make sense when you need complex event processing logic that goes beyond what SQL can express, or when you already have a mature Flink infrastructure. For most context engineering use cases, streaming SQL provides the same capability with significantly less operational overhead. RisingWave is open source and can be deployed as a single binary for development or as a distributed cluster for production.

How Fresh Does Agent Context Need to Be?

The required freshness depends on the use case:

  • Fraud detection agents: Sub-second. The agent must see the latest transaction before approving the next one.
  • Support agents: Seconds to minutes. The agent needs the current session context but a slight delay is tolerable.
  • Analytics agents: Minutes to hours. Summary statistics and trend data can tolerate batch-level freshness.
  • Planning agents: Hours to days. Strategic recommendations based on historical patterns do not need real-time data.

RisingWave's temporal filters let you define different freshness windows for different materialized views. A fraud context view can use INTERVAL '5 minutes', while a trend analysis view uses INTERVAL '7 days'. Each view maintains its own time horizon automatically.

Conclusion

AI agents are only as good as the context they operate on. Stale data leads to wrong decisions. Slow context retrieval leads to poor user experience. Building custom aggregation pipelines leads to operational complexity that scales poorly.

Streaming SQL provides a direct path from raw event streams to agent-ready context:

  • Aggregation turns millions of raw events into compact, queryable summaries using standard SQL GROUP BY, COUNT, and FILTER expressions inside materialized views.
  • Enrichment joins event streams with reference tables so agents receive business context (account tiers, product names, company details) alongside behavioral signals.
  • Freshness guarantees are built into the SQL itself through temporal filters and time windows, with automatic data expiry and no manual cleanup.
  • RisingWave handles the incremental computation, state management, and PostgreSQL-compatible serving layer, so data engineers write SQL and agent developers query results through any PostgreSQL client.

The result is an architecture where the data pipeline and the agent framework are cleanly separated, each doing what it does best. The streaming database computes context. The agent consumes context. Neither needs to understand the other's internals.


Ready to build agent-ready context with streaming SQL? Get started with RisingWave in 5 minutes. Quickstart

Join our Slack community to ask questions and connect with other stream processing developers.

Best-in-Class Event Streaming
for Agents, Apps, and Analytics
GitHubXLinkedInSlackYouTube
Sign up for our to stay updated.