How to Serve Fresh Data to AI Agents in Real Time


AI agents querying stale databases make confident, wrong decisions. When an agent checks inventory, account state, or system status, it must see current reality — not a snapshot from an hour ago. A streaming database gives agents a PostgreSQL-compatible interface to always-current materialized state, with no polling, no cache warming, and no staleness.


The Problem: Agents Are Querying the Wrong Data Layer

Modern AI agents are remarkable at reasoning, planning, and tool use. They fall apart when the data they reason over is wrong.

The most common failure mode is not a reasoning error — it is a freshness error. An agent queries an inventory database and finds 50 units in stock. It places an order. The real inventory is zero; the database reflects a snapshot from before the last batch sync. The agent was not wrong about its reasoning; it was wrong about reality.

This problem compounds in agentic systems where agents hand off information to other agents, or where agents take actions based on intermediate data reads. A stale read early in a chain produces downstream decisions built on a false premise.
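One defensive pattern, independent of the data layer, is to make freshness explicit in every tool result and refuse reads older than a tolerance. A minimal sketch, with all names hypothetical:

```python
from datetime import datetime, timedelta, timezone

class StaleReadError(Exception):
    """Raised when a tool result is older than the agent's tolerance."""

def require_fresh(result: dict, max_age: timedelta) -> dict:
    """Pass a tool result through only if its timestamp is recent enough."""
    last_updated = result.get("last_updated")
    if last_updated is None:
        raise StaleReadError("result carries no timestamp")
    age = datetime.now(timezone.utc) - last_updated
    if age > max_age:
        raise StaleReadError(f"data is {age} old, tolerance is {max_age}")
    return result

# A read from one minute ago passes a five-minute tolerance:
recent = {"units_on_hand": 50,
          "last_updated": datetime.now(timezone.utc) - timedelta(minutes=1)}
require_fresh(recent, max_age=timedelta(minutes=5))

# A read from an hour ago does not:
stale = {"units_on_hand": 50,
         "last_updated": datetime.now(timezone.utc) - timedelta(hours=1)}
try:
    require_fresh(stale, max_age=timedelta(minutes=5))
except StaleReadError:
    pass  # the agent should re-fetch or abstain instead of acting
```

A guard like this only detects staleness; the rest of this article is about eliminating it at the source.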

Why Standard Databases Are Not Enough

Relational databases are designed for transactional workloads — reading and writing records as they are. They are excellent at reflecting the current state of rows they manage directly.

The problem is that most "current state" questions for AI agents involve aggregations and joins across multiple sources. "What is the current risk exposure for this customer?" is not a single row lookup. It requires summing open positions, checking credit limits, joining recent transaction history, and possibly incorporating real-time market prices. If you compute that in a query at request time, it is slow. If you precompute it in a batch job, it is stale.
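The cost difference is easy to see in miniature. Below, a toy "risk exposure" is computed two ways over the same synthetic data: scanning every transaction at request time, versus reading a total that was computed ahead of time. The schema and numbers are made up for illustration:

```python
# 100 customers x 200 transactions each, as (customer_id, amount) pairs.
transactions = [(cust, amount) for cust in range(100) for amount in range(200)]

def exposure_at_request_time(customer_id: int) -> int:
    # Scans every transaction on each call: O(total rows) per query.
    return sum(a for c, a in transactions if c == customer_id)

# Precompute once -- what a batch job (or, incrementally, a streaming
# database) maintains -- then each query is a single lookup.
precomputed = {}
for c, a in transactions:
    precomputed[c] = precomputed.get(c, 0) + a

assert exposure_at_request_time(42) == precomputed[42]
```

The precomputed side is fast to read but, under a batch job, goes stale the moment a new transaction arrives. Keeping it continuously correct is exactly the job a streaming database takes on.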

Read replicas and caches solve latency but not freshness. A Redis cache of yesterday's customer risk score is still yesterday's.

Streaming Databases as a Live Context Layer

RisingWave is a PostgreSQL-compatible streaming database — open source, Apache 2.0, written in Rust, with S3 as its storage backend. Its materialized views update incrementally as source data changes, and agents query them exactly like PostgreSQL tables.

The mental model is: RisingWave sits between your operational data sources and your AI agents, maintaining a continuously updated set of views that represent current system state at the granularity agents need.

[Kafka events]    ──┐
[PostgreSQL CDC]  ──┼──▶ [RisingWave] ──▶ [AI Agent (PostgreSQL client)]
[API streams]     ──┘          │
                        materialized views,
                        always up to date

Agents query materialized views. RisingWave handles the continuous recomputation. The agent always gets current state.
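"Continuous recomputation" here means incremental maintenance: each arriving event adjusts the existing aggregate by a delta instead of re-scanning history. A stripped-down sketch of the idea (not RisingWave's internals):

```python
from collections import defaultdict

class IncrementalInventoryView:
    """Maintains SUM(quantity_delta) GROUP BY product_id, one event at a time."""
    def __init__(self):
        self.units_on_hand = defaultdict(int)

    def apply(self, product_id: int, quantity_delta: int) -> None:
        # O(1) per event, regardless of how much history has accumulated.
        self.units_on_hand[product_id] += quantity_delta

view = IncrementalInventoryView()
for event in [(42, +100), (42, -30), (7, +5), (42, -70)]:
    view.apply(*event)

# Product 42: 100 - 30 - 70 = 0 units; product 7: 5 units.
```

Reads against the view are lookups of already-maintained state, which is why query latency stays low even as event volume grows.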

Architecture: What Agents Query

Ingesting Operational Events

-- Real-time order events from Kafka
CREATE SOURCE order_events (
    order_id    BIGINT,
    customer_id BIGINT,
    product_id  BIGINT,
    quantity    INT,
    order_value DECIMAL,
    status      VARCHAR,
    created_at  TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'order-events',
    properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;

-- Live inventory updates via CDC from the warehouse system.
-- CDC tables in RisingWave are declared with their columns and a primary key.
CREATE TABLE inventory_updates (
    ledger_id      BIGINT PRIMARY KEY,
    product_id     BIGINT,
    quantity_delta INT,
    event_time     TIMESTAMPTZ
) WITH (
    connector = 'postgres-cdc',
    hostname = 'warehouse-db.internal',
    port = '5432',
    username = 'cdc_user',
    password = '...',
    database.name = 'warehouse',
    schema.name = 'public',
    table.name = 'inventory_ledger'
);

Materialized Views for Agent Context

-- Current inventory level per SKU (agents check this before recommending orders)
CREATE MATERIALIZED VIEW current_inventory AS
SELECT
    product_id,
    SUM(quantity_delta)             AS units_on_hand,
    MAX(event_time)                 AS last_updated
FROM inventory_updates
GROUP BY product_id;

-- Customer order activity in last 24 hours (agents use this for personalization)
CREATE MATERIALIZED VIEW customer_activity_24h AS
SELECT
    customer_id,
    COUNT(*) FILTER (WHERE status = 'placed')       AS orders_placed,
    COUNT(*) FILTER (WHERE status = 'cancelled')    AS orders_cancelled,
    SUM(order_value) FILTER (WHERE status = 'placed') AS spend_24h,
    MAX(created_at)                                 AS last_order_at
FROM order_events
WHERE created_at > NOW() - INTERVAL '24 hours'
GROUP BY customer_id;

-- System health summary (agents use this for operational decisions);
-- assumes a 'service_logs' source ingesting structured log events
CREATE MATERIALIZED VIEW system_health_summary AS
SELECT
    service_name,
    COUNT(*) FILTER (WHERE level = 'ERROR')     AS error_count_5m,
    COUNT(*) FILTER (WHERE level = 'WARN')      AS warn_count_5m,
    MAX(event_time)                             AS last_event_at,
    AVG(response_time_ms)                       AS avg_response_ms_5m
FROM service_logs
WHERE event_time > NOW() - INTERVAL '5 minutes'
GROUP BY service_name;

Agent Queries (Tool Implementations)

Because RisingWave exposes a PostgreSQL interface, agent tool implementations are standard database queries. Any PostgreSQL driver works.

import psycopg2

# RisingWave speaks the PostgreSQL wire protocol, so any PostgreSQL
# driver works; 4566 is RisingWave's default frontend port.
conn = psycopg2.connect(
    host="risingwave.internal",
    port=4566,
    database="dev",
    user="root"
)

def check_inventory(product_id: int) -> dict:
    """Tool: check current inventory for a product."""
    with conn.cursor() as cur:
        cur.execute("""
            SELECT units_on_hand, last_updated
            FROM current_inventory
            WHERE product_id = %s
        """, (product_id,))
        row = cur.fetchone()
    if not row:
        return {"units_on_hand": 0, "last_updated": None}
    return {
        "units_on_hand": row[0],
        "last_updated": row[1].isoformat()
    }

def get_customer_context(customer_id: int) -> dict:
    """Tool: get current customer activity context."""
    with conn.cursor() as cur:
        cur.execute("""
            SELECT orders_placed, orders_cancelled, spend_24h, last_order_at
            FROM customer_activity_24h
            WHERE customer_id = %s
        """, (customer_id,))
        row = cur.fetchone()
    return {
        "orders_placed_24h": row[0] if row else 0,
        "orders_cancelled_24h": row[1] if row else 0,
        # spend_24h can be NULL if no orders were placed; guard before float()
        "spend_24h": float(row[2]) if row and row[2] is not None else 0.0,
        "last_order_at": row[3].isoformat() if row and row[3] else None
    }

def check_system_health(service_name: str) -> dict:
    """Tool: check real-time health of a service."""
    with conn.cursor() as cur:
        cur.execute("""
            SELECT error_count_5m, warn_count_5m, avg_response_ms_5m
            FROM system_health_summary
            WHERE service_name = %s
        """, (service_name,))
        row = cur.fetchone()
    return {
        "error_count_5m": row[0] if row else 0,
        "warn_count_5m": row[1] if row else 0,
        "avg_response_ms": float(row[2]) if row and row[2] is not None else None,
        "status": "degraded" if (row and row[0] > 10) else "healthy"
    }

These tools plug directly into any agent framework — LangChain, LlamaIndex, OpenAI function calling, or a custom orchestrator. The agent's tool calls return current state, every time.
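As a concrete example, check_inventory can be described to a function-calling model with a JSON schema like the following. The shape follows OpenAI-style tool definitions; the exact wire-up varies by framework:

```python
# OpenAI-style tool definition for the check_inventory function above.
# The agent runtime matches the model's tool call by name and invokes
# check_inventory(product_id=...) with the parsed arguments.
check_inventory_tool = {
    "type": "function",
    "function": {
        "name": "check_inventory",
        "description": (
            "Return the current units on hand for a product, "
            "read from a continuously updated materialized view."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "product_id": {
                    "type": "integer",
                    "description": "Product (SKU) identifier",
                },
            },
            "required": ["product_id"],
        },
    },
}
```

Because the tool body is plain SQL over a PostgreSQL interface, the same function can be registered unchanged in LangChain, LlamaIndex, or a custom loop.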

Multi-Agent Context Sharing

In multi-agent systems, a shared streaming database prevents context divergence. If Agent A reads inventory at t=0 and Agent B reads inventory at t=5, they see the same materialized view — the most current state at the time of their respective queries.

Without a shared live context layer, agents operating in parallel can reach contradictory conclusions about system state. Agent A decides to approve an order based on inventory of 50 units. Agent B decides to approve a different order based on the same 50 units. The warehouse has 50 units, but now two approved orders totaling 100.

-- Pessimistic inventory view including pending orders
-- Agents query this instead of raw inventory for safe ordering decisions
CREATE MATERIALIZED VIEW available_inventory AS
SELECT
    i.product_id,
    i.units_on_hand
        - COALESCE(p.units_pending, 0)  AS units_available,
    i.units_on_hand,
    COALESCE(p.units_pending, 0)        AS units_pending
FROM current_inventory i
LEFT JOIN (
    SELECT product_id, SUM(quantity) AS units_pending
    FROM order_events
    WHERE status = 'pending'
    GROUP BY product_id
) p ON p.product_id = i.product_id;

Agents checking available_inventory see inventory net of pending orders — a more conservative and correct view for ordering decisions.
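On the agent side, an ordering tool built on this view reduces to a simple guard. A sketch with the database read stubbed out (in production the row would come from a SELECT against available_inventory):

```python
from typing import Optional, Tuple

def can_approve_order(row: Optional[Tuple[int, int, int]], quantity: int) -> bool:
    """Decide on an order given an available_inventory row of
    (units_available, units_on_hand, units_pending), or None for an unknown SKU."""
    if row is None:
        return False  # unknown product: refuse rather than guess
    units_available = row[0]
    return quantity <= units_available

# 50 on hand, 30 already pending -> only 20 genuinely available.
row = (20, 50, 30)
can_approve_order(row, 20)   # fits within available stock
can_approve_order(row, 21)   # would double-book units pending for another order
```

The double-approval scenario above disappears because both agents subtract the same pending quantity before deciding.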

Temporal Queries: What Was True When?

Sometimes agents need to reason about state at a specific point in time — for dispute resolution, audit trails, or explaining past decisions. RisingWave's append-only tables and Kafka source offsets enable this.

-- Append-only log of inventory snapshots (queryable by time range)
CREATE MATERIALIZED VIEW inventory_history AS
SELECT
    product_id,
    SUM(quantity_delta) OVER (
        PARTITION BY product_id
        ORDER BY event_time
        ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    )                   AS cumulative_units,
    event_time
FROM inventory_updates;

An agent resolving a dispute can query: "What was the inventory of product 42 at 14:35 UTC on March 15?" against this historical view.
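Given rows from inventory_history for one product, the as-of lookup is a "last event at or before the timestamp" search. A sketch of the tool-side logic, with the rows stubbed in; in SQL this would be a WHERE event_time <= %s ORDER BY event_time DESC LIMIT 1 query:

```python
from bisect import bisect_right
from datetime import datetime

def inventory_as_of(history, when: datetime):
    """history: [(event_time, cumulative_units)] for one product, time-ascending.
    Returns the cumulative units as of `when`, or None if no event that early."""
    times = [t for t, _ in history]
    i = bisect_right(times, when)  # number of events at or before `when`
    return history[i - 1][1] if i else None

# Hypothetical history for product 42 on March 15:
history = [
    (datetime(2025, 3, 15, 14, 0), 120),
    (datetime(2025, 3, 15, 14, 30), 95),
    (datetime(2025, 3, 15, 15, 0), 140),
]
inventory_as_of(history, datetime(2025, 3, 15, 14, 35))  # state as of 14:35
```

The answer at 14:35 is whatever the cumulative count was after the 14:30 event, which is exactly what the dispute-resolution question needs.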

Comparison: Approaches to Agent Data Access

| Approach | Data Freshness | Query Latency | Agent Integration | Handles Aggregations |
|---|---|---|---|---|
| Direct OLTP queries | Current | Low (simple) / high (complex) | Standard SQL | Expensive |
| Read replica | Seconds behind | Low–medium | Standard SQL | Better, still expensive |
| Redis cache | Minutes to hours stale | Very low | Custom client | Precomputed only |
| Data warehouse | Hours stale | Medium (optimized) | Standard SQL | Yes |
| Streaming DB (RisingWave) | Seconds | Low (precomputed) | PostgreSQL | Yes, incrementally |

The streaming database wins on the combination: fresh data, low query latency (because results are precomputed), and standard SQL interface that requires no special agent SDK.


FAQ

Can agents subscribe to changes in RisingWave rather than polling? Yes. RisingWave supports SUBSCRIBE queries that emit a stream of changes as they occur. An agent or orchestrator can open a subscription and receive pushed updates when a materialized view changes, rather than polling on a timer.

How do you prevent an agent from reading partially-updated state? RisingWave processes stream data transactionally within each checkpoint interval (configurable, default 1 second). Agents always read consistent checkpoint state — they will never see a row that reflects half of a batch update.
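The guarantee can be pictured as an atomic snapshot swap: updates within a checkpoint are applied to a private copy, and readers only ever see a fully applied checkpoint. A toy illustration of the property, not RisingWave's actual mechanism:

```python
class CheckpointedView:
    """Readers see either the previous or the next checkpoint, never a mix."""
    def __init__(self, state: dict):
        self._published = dict(state)

    def read(self) -> dict:
        return self._published            # always a complete checkpoint

    def apply_checkpoint(self, updates: dict) -> None:
        staged = dict(self._published)    # work on a private copy
        staged.update(updates)            # apply the whole batch
        self._published = staged          # publish atomically (one reference swap)

view = CheckpointedView({"sku_1": 100, "sku_2": 40})
before = view.read()
view.apply_checkpoint({"sku_1": 70, "sku_2": 55})  # one batch touching two rows
after = view.read()
# `before` still shows both old values; `after` shows both new values.
# No reader ever observes sku_1 updated while sku_2 is not.
```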

What authentication and authorization does RisingWave support? RisingWave supports standard PostgreSQL authentication (username/password, SSL). Row-level security and schema-level grants allow scoping agent access to specific views, preventing agents from reading data outside their authorized scope.

Can agents write back to RisingWave? Agents can write to RisingWave tables using standard INSERT statements. These writes appear immediately in dependent materialized views. This is useful for recording agent decisions as events that feed back into the context layer.

What is the latency between a source event and a queryable result? End-to-end latency from a Kafka event or CDC change to a queryable materialized view update is typically 1-5 seconds under normal load, depending on view complexity and cluster size. Simple aggregations update in under a second.
