Streaming for AI Agents: Why Real-Time Data Is the Missing Infrastructure Layer
AI agents that operate on stale, batch-processed data make confident decisions based on information that is hours old — leading to hallucinations, incorrect actions, and broken workflows. IBM validated this problem at $11 billion scale when it acquired Confluent in March 2026, calling real-time data streaming "the engine of enterprise AI and agents." The fix is streaming infrastructure that feeds agents continuously updated context. A streaming database like RisingWave provides this context layer: materialized views that update within milliseconds, queryable via standard PostgreSQL connections, with no Kafka or Flink expertise required.
This article explains why streaming is the critical missing layer for AI agents, how industry leaders are building it, and how to implement it with SQL.
The $11 Billion Validation
On March 17, 2026, IBM completed its acquisition of Confluent for $11 billion — the largest enterprise software deal focused specifically on real-time data for AI. The strategic rationale was explicit:
"With Confluent, we are giving clients the ability to move trusted data continuously across their entire operation so their AI models and agents can act on what is happening right now, not on data that is hours old. Transactions happen in milliseconds, and AI decisions need to happen just as fast." — Rob Thomas, Senior Vice President, IBM Software
"What does AI need? It needs data. It needs the right data, right now." — Jay Kreps, Confluent CEO
Confluent's Chief Product Officer, Shaun Clowes, put it more bluntly: AI agents are intelligent but "dumb as rocks" without real-time, reliable data streams.
This acquisition signals a fundamental industry shift: streaming isn't just an infrastructure layer — it's becoming the runtime for agentic AI.
The Data Latency Gap
The core problem IBM identified is the data latency gap: AI models and agents relying on batch-processed data that refreshes every few hours.
| Scenario | Batch-Refreshed Agent | Streaming-Powered Agent |
|---|---|---|
| Customer upgraded plan 5 minutes ago | References old plan, gives wrong answer | Knows current plan, responds correctly |
| Product sold out 30 seconds ago | Shows as available, takes order | Knows it's out of stock, suggests alternative |
| Suspicious login from new country | Won't detect until next batch run | Flags immediately, triggers verification |
| Market price moved 10 seconds ago | Uses stale price for decision | Acts on current market data |
The critical insight: LLMs treat whatever data they receive as ground truth. They don't hedge on potentially outdated information. They respond with full confidence — even when the data is hours stale. An average model with fresh data consistently outperforms a frontier model with stale data.
How AI Agents Consume Data Today
The Current Stack (and Its Limitations)
Tool/Function Calling: Agents call APIs to fetch data on demand. Each call adds latency (100-500ms), and complex context (joins across 5 tables, 30-day aggregations) requires multiple sequential calls. No pre-computation.
RAG (Retrieval-Augmented Generation): Agents retrieve context from vector stores. But embeddings are batch-refreshed — typically hours old. A document updated 5 minutes ago still returns stale embeddings until the next batch run.
Direct Database Queries: Agents query OLTP databases. But analytical queries (aggregations, joins, window functions) compete with application traffic and can be slow or harmful to production performance.
None of these provide real-time, pre-computed, always-current context. That's what streaming databases solve.
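The difference between on-demand tool calls and a precomputed context layer can be sketched in a few lines of Python. The latencies below are illustrative assumptions, not benchmarks: each sequential tool call pays a full round trip, while a materialized-view lookup pays a single point read.

```python
import time

# Assumed latencies for illustration only (not measurements):
TOOL_CALL_LATENCY_S = 0.15   # ~150 ms per on-demand API call
VIEW_QUERY_LATENCY_S = 0.01  # ~10 ms point lookup on a precomputed view

def fetch_via_tool_calls(customer_id: str) -> dict:
    """Naive agent loop: three sequential calls to assemble one context."""
    context = {}
    for source in ("profile_api", "orders_api", "tickets_api"):
        time.sleep(TOOL_CALL_LATENCY_S)  # simulated network round trip
        context[source] = {"customer_id": customer_id}
    return context

def fetch_via_materialized_view(customer_id: str) -> dict:
    """One lookup against a pre-joined, pre-aggregated view."""
    time.sleep(VIEW_QUERY_LATENCY_S)
    return {"customer_id": customer_id, "precomputed": True}

start = time.perf_counter()
fetch_via_tool_calls("cust_1")
tool_elapsed = time.perf_counter() - start

start = time.perf_counter()
fetch_via_materialized_view("cust_1")
view_elapsed = time.perf_counter() - start

print(f"tool calls: {tool_elapsed:.2f}s, view lookup: {view_elapsed:.2f}s")
```

The gap widens as context gets richer: each additional join or aggregation adds another sequential call in the tool-calling model, but costs nothing extra at query time when the view is precomputed.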
Model Context Protocol (MCP): The Universal Agent Connector
MCP — the open standard for connecting AI agents to external data sources — has reached 97 million monthly SDK downloads in 2026, with adoption from OpenAI, Google DeepMind, Anthropic, and Microsoft. The official MCP registry lists over 6,400 servers.
MCP defines how agents discover, request, and receive context from data sources. A streaming database with an MCP server provides the freshest possible context to any MCP-compatible agent.
RisingWave has an MCP server that allows Claude Desktop, VS Code Copilot, and other MCP-compatible assistants to query streaming materialized views directly.
How Industry Leaders Are Building Streaming for Agents
Confluent: Streaming Agents on Flink
Confluent introduced Streaming Agents — agentic AI embedded directly into Apache Flink on Confluent Cloud:
- Agents continuously perceive, reason, and act on live event streams
- Defined via SQL: `CREATE TOOL`, `CREATE AGENT`, `AI_RUN_AGENT`
- Native integration with OpenAI, Azure ML, AWS SageMaker
- Support for both function-based and MCP-based tools
- A2A (Agent-to-Agent) protocol integration for multi-agent collaboration
Architecture:
Kafka Topics → Flink (Streaming Agent) → LLM → Tool Calls → Actions
     ↑                                                        ↓
Event Context                                            Kafka Output
Trade-off: Powerful but complex. Requires Kafka + Flink + Confluent Cloud. Java/SQL hybrid. Enterprise pricing.
AWS: Bedrock AgentCore + Kinesis
AWS connects streaming to agents through:
- Bedrock AgentCore Memory with streaming notifications via Kinesis
- Managed Service for Apache Flink processing Kinesis streams for real-time AI
- Integration architecture: Kinesis Data Streams → Flink → Bedrock
RisingWave: The Simpler Path
RisingWave provides streaming context for agents without the Kafka + Flink complexity:
Sources (CDC, Kafka) → RisingWave → Materialized Views → Agent queries via PostgreSQL
One system replaces three (Kafka + Flink + serving database). Agents query pre-computed, always-current context via standard PostgreSQL connections.
Building a Streaming Context Layer for AI Agents
Architecture
┌─────────────────────────────────────────────────────────────┐
│ Data Sources │
│ PostgreSQL (CDC) │ Kafka Events │ MySQL (CDC) │ APIs │
└────────┬───────────┴───────┬────────┴───────┬───────┴───────┘
│ │ │
▼ ▼ ▼
┌─────────────────────────────────────────────────────────────┐
│ RisingWave │
│ (Streaming Database) │
│ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ Materialized Views (Context Layer) │ │
│ │ │ │
│ │ customer_context │ order_status │ risk_scores │ │
│ │ product_inventory │ user_activity │ fraud_signals │ │
│ └──────────────────────────────────────────────────────┘ │
│ │
│ PostgreSQL Protocol │ MCP Server │ Iceberg Sink │
└────────┬───────────────┴──────┬───────┴─────────────────────┘
│ │
▼ ▼
┌─────────────────┐ ┌──────────────────┐
│ AI Agents │ │ MCP-Compatible │
│ (LangChain, │ │ Assistants │
│ CrewAI, │ │ (Claude, Copilot│
│ AutoGen) │ │ ChatGPT) │
└─────────────────┘ └──────────────────┘
Implementation: Customer Support Agent Context
-- Step 1: Ingest from production databases via CDC
CREATE SOURCE pg_source WITH (
connector = 'postgres-cdc',
hostname = 'prod-db.internal',
port = '5432',
username = 'cdc_user',
password = 'secret',
database.name = 'production'
);
CREATE TABLE customers (...) FROM pg_source TABLE 'public.customers';
CREATE TABLE orders (...) FROM pg_source TABLE 'public.orders';
CREATE TABLE support_tickets (...) FROM pg_source TABLE 'public.support_tickets';
-- Step 2: Build always-current context views
CREATE MATERIALIZED VIEW customer_context AS
SELECT
    c.customer_id,
    c.name,
    c.email,
    c.current_plan,
    c.plan_updated_at,
    -- Open support issues
    COALESCE(t.open_tickets, 0) AS open_tickets,
    t.latest_ticket_time,
    -- Recent order activity
    COALESCE(o.orders_30d, 0) AS orders_30d,
    COALESCE(o.spend_30d, 0) AS spend_30d,
    -- Churn risk signal
    CASE
        WHEN COALESCE(t.open_tickets, 0) > 3 THEN 'high'
        WHEN c.plan_updated_at < NOW() - INTERVAL '90 days' THEN 'medium'
        ELSE 'low'
    END AS churn_risk
FROM customers c
-- Pre-aggregate each child table before joining: joining raw tickets and
-- raw orders to the same customer row would fan out and inflate the sums
LEFT JOIN (
    SELECT customer_id,
           COUNT(*) FILTER (WHERE status = 'open') AS open_tickets,
           MAX(created_at) FILTER (WHERE status = 'open') AS latest_ticket_time
    FROM support_tickets
    GROUP BY customer_id
) t ON c.customer_id = t.customer_id
LEFT JOIN (
    SELECT customer_id,
           COUNT(*) AS orders_30d,
           SUM(amount) AS spend_30d
    FROM orders
    WHERE created_at > NOW() - INTERVAL '30 days'  -- temporal filter
    GROUP BY customer_id
) o ON c.customer_id = o.customer_id;
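The churn signal in the view is a plain threshold rule. As an illustrative aid (not part of the pipeline), the same logic in Python makes the thresholds explicit and easy to unit test before committing them to the SQL view:

```python
from datetime import datetime, timedelta, timezone

def churn_risk(open_tickets: int, plan_updated_at: datetime) -> str:
    """Mirror of the CASE expression in customer_context (illustrative)."""
    if open_tickets > 3:
        return "high"  # many unresolved tickets: highest-priority signal
    if plan_updated_at < datetime.now(timezone.utc) - timedelta(days=90):
        return "medium"  # plan untouched for 90+ days
    return "low"
```

Keeping a plain-code mirror of scoring rules like this is a cheap way to document intent and catch threshold regressions when the view definition evolves.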
Agent Integration (Python)
import psycopg2
def get_customer_context(customer_id: str) -> dict:
"""Fetch real-time customer context for the support agent."""
conn = psycopg2.connect(
host="risingwave", port=4566, dbname="dev", user="root"
)
cursor = conn.cursor()
cursor.execute("""
SELECT name, current_plan, plan_updated_at, open_tickets,
spend_30d, churn_risk
FROM customer_context
WHERE customer_id = %s
""", (customer_id,))
row = cursor.fetchone()
conn.close()
if not row:
return {}
return {
"name": row[0],
"current_plan": row[1],
"plan_updated_at": str(row[2]),
"open_tickets": row[3],
"spend_30d": float(row[4]) if row[4] else 0,
"churn_risk": row[5]
}
# Pass to LLM as context
context = get_customer_context("cust_12345")
prompt = f"""You are a customer support agent. Here is the customer's current context:
{context}
The customer says: "I want to cancel my subscription."
Respond appropriately based on their current plan, spend history, and churn risk."""
The customer_context view updates within milliseconds of any change to the customer's plan, tickets, or orders. The agent always sees the current state — never stale data.
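To plug a context fetcher like the one above into a function-calling agent, you typically describe it with a JSON Schema tool definition and route the model's tool calls back to Python. The schema and dispatcher below are an illustrative sketch in the common function-calling format, not any specific framework's API:

```python
import json

# Hypothetical tool definition (illustrative, framework-agnostic):
# describes the get_customer_context function to a function-calling LLM.
CUSTOMER_CONTEXT_TOOL = {
    "name": "get_customer_context",
    "description": (
        "Fetch real-time customer context (plan, open tickets, "
        "30-day spend, churn risk) from the streaming context layer."
    ),
    "parameters": {
        "type": "object",
        "properties": {
            "customer_id": {
                "type": "string",
                "description": "Internal customer identifier",
            },
        },
        "required": ["customer_id"],
    },
}

def dispatch_tool_call(name: str, arguments: str, registry: dict) -> str:
    """Route a model-emitted tool call (name + JSON args) to Python."""
    args = json.loads(arguments)
    return json.dumps(registry[name](**args))

# Usage sketch, assuming get_customer_context from the section above:
# registry = {"get_customer_context": get_customer_context}
# dispatch_tool_call("get_customer_context",
#                    '{"customer_id": "cust_12345"}', registry)
```

Because the heavy lifting (joins, aggregations, freshness) happens inside the materialized view, the tool itself stays a trivial point lookup, which keeps agent latency low and the tool surface easy to audit.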
RisingWave vs Confluent for Agent Streaming
| Aspect | Confluent (Kafka + Flink) | RisingWave |
|---|---|---|
| Components | Kafka + Flink + Connectors + ksqlDB | Single system |
| Agent integration | Streaming Agents (Flink-native) | PostgreSQL protocol + MCP server |
| Language | Java + SQL + Python | SQL only |
| Context serving | Via Kafka topics or external DB | Built-in (PostgreSQL protocol) |
| CDC | Debezium + Kafka Connect | Native (no middleware) |
| Cost | Enterprise pricing (Confluent Cloud) | Open source (Apache 2.0) |
| Complexity | High (3+ systems) | Low (1 system) |
| Best for | Enterprises with existing Kafka | Teams wanting the simplest path to agent context |
Confluent's advantage: Streaming Agents with native LLM integration, A2A protocol, enterprise governance.
RisingWave's advantage: One system, SQL-only, PostgreSQL compatibility (works with any agent framework), self-hostable, open source.
Market Context
The convergence of streaming and AI agents is backed by massive market momentum:
| Metric | Value |
|---|---|
| IBM-Confluent acquisition | $11 billion |
| AI agents market (2026) | $10.9 billion |
| AI agents market (2034 projection) | $139 billion |
| AI agents market CAGR | 49.6% |
| Enterprise apps with AI agents by end 2026 | 40% (up from <5% in 2025) |
| IT leaders increasing streaming investment | 90% |
| MCP monthly SDK downloads | 97 million |
| Data streaming addressable market | $100 billion |
Gartner predicts 40% of enterprise applications will include task-specific AI agents by end of 2026, up from less than 5% in 2025. Every one of those agents needs real-time context.
Getting Started
- Deploy RisingWave: `docker run -d -p 4566:4566 risingwavelabs/risingwave:latest`
- Connect your databases: create CDC sources from PostgreSQL/MySQL
- Build context views: define materialized views for your agent's context needs
- Connect your agent: use any PostgreSQL driver (psycopg2, pgx, JDBC) or the MCP server
- Query context: `SELECT * FROM customer_context WHERE customer_id = 'X'` (always fresh)
Frequently Asked Questions
Why do AI agents need streaming data?
AI agents make decisions based on the data they receive. When that data is hours old (batch-refreshed), agents make confident but incorrect decisions — recommending products that are out of stock, referencing subscription plans that changed, or missing fraud patterns that emerged minutes ago. Streaming provides sub-second data freshness, ensuring agents act on current reality.
What did IBM's $11B Confluent acquisition mean for streaming + AI?
IBM's acquisition validated that real-time data streaming is the critical infrastructure layer for enterprise AI and agents. IBM's Rob Thomas stated: "Transactions happen in milliseconds, and AI decisions need to happen just as fast." The deal positions streaming as the foundation for AI agent architectures, not just a data pipeline tool.
How does RisingWave compare to Confluent for AI agent streaming?
Confluent (Kafka + Flink) provides a comprehensive but complex streaming platform with native Streaming Agents and enterprise governance. RisingWave provides a simpler path: a single PostgreSQL-compatible streaming database that serves as the agent context layer. RisingWave is best for teams wanting SQL-only development, built-in serving, and open-source flexibility. Confluent is best for enterprises with existing Kafka infrastructure and a need for Flink-native agent capabilities.
What is the Model Context Protocol (MCP) and how does it relate to streaming?
MCP is an open standard (97M+ monthly SDK downloads) for connecting AI agents to external data sources. It defines how agents discover and query data. A streaming database with an MCP server (like RisingWave) provides the freshest possible context to any MCP-compatible agent — Claude, ChatGPT, Copilot, or custom agents.
Can I use streaming for AI agents without Kafka?
Yes. RisingWave ingests directly from PostgreSQL and MySQL via native CDC — no Kafka, no Debezium, no middleware. For many agent context workloads, the data source is an operational database, not an event stream. RisingWave's native CDC provides the simplest possible streaming pipeline for agent context.

