AI agents forget everything between requests. Without persistent memory, an agent cannot recall what a user asked five minutes ago, learn that a particular customer prefers technical documentation over marketing content, or notice that the same question keeps coming up across sessions. Every interaction starts from zero.
The standard fix is to bolt on a vector database for semantic search and a key-value store for session state. This works for simple cases, but it creates a fragmented architecture: conversation logs live in one system, user preference aggregations run as batch jobs in another, and the agent stitches it all together at query time. The memory layer becomes a patchwork of services that drift out of sync.
A streaming database offers a different approach. RisingWave can serve as a unified memory backend for AI agents: it stores raw conversation history in tables, computes user preference summaries and session statistics through materialized views that update incrementally, and serves the results over the PostgreSQL wire protocol. Your agent reads fresh, pre-computed context with a simple SELECT query. No batch jobs, no glue code, no stale data.
This article shows you how to build this architecture step by step, with verified SQL examples running on RisingWave 2.8.0.
Why AI Agent Memory Is Hard
Memory for AI agents is not a solved problem. The challenge is not just storage; it is keeping stored information useful, current, and fast to retrieve.
The Three Memory Requirements
Every production AI agent needs three types of memory:
Conversation history - The raw log of messages exchanged between users and the agent. This serves as short-term memory within a session and as a retrieval source for long-term context across sessions.
Aggregated preferences - Summarized signals about what each user cares about. An agent helping a developer should know that this user has viewed Kafka documentation seven times and rated materialized view tutorials highly, without scanning every raw interaction.
Operational metadata - Statistics about the agent's own behavior: which tools it calls most frequently, average latency per tool, success rates. This enables self-monitoring and optimization.
Where Existing Approaches Fall Short
The typical architecture for agent memory looks like this:
- Redis or Memcached for session state (fast but volatile)
- PostgreSQL or MongoDB for conversation logs (durable but static)
- A vector database (Pinecone, Qdrant, Chroma) for semantic retrieval
- Batch jobs (Airflow, dbt) to compute aggregated summaries on a schedule
This stack has real problems. The preference summaries are only as fresh as the last batch run, which might be hourly or daily. The agent cannot see that a user just bookmarked three articles about Kafka connectors because the aggregation has not run yet. Serving context requires querying multiple systems and merging results in application code. And every additional storage system is another thing to operate, monitor, and pay for.
Streaming Database as a Unified Memory Layer
A streaming database collapses this stack into a single system. RisingWave stores the raw data in tables and continuously computes aggregations through materialized views. When new data arrives (a new message, a new page view, a new tool call), the relevant materialized views update within milliseconds, not hours. The agent queries one system over one protocol.
Here is the architecture:
graph TD
A[AI Agent] -->|INSERT messages| RW[(RisingWave)]
A -->|SELECT context| RW
APP[Application Events] -->|INSERT interactions| RW
RW -->|Incremental updates| MV1[MV: User Preferences]
RW -->|Incremental updates| MV2[MV: Session Stats]
RW -->|Incremental updates| MV3[MV: Tool Performance]
MV1 -->|PostgreSQL query| A
MV2 -->|PostgreSQL query| A
MV3 -->|PostgreSQL query| A
The key properties of this architecture:
- Single storage system. Conversations, interactions, and tool call logs all live in RisingWave tables. No data duplication across Redis, PostgreSQL, and a vector store.
- Always-fresh aggregations. Materialized views recompute incrementally as new rows arrive. The user preference summary reflects the bookmark from 200 milliseconds ago, not yesterday's batch run.
- PostgreSQL protocol. Any PostgreSQL client library (psycopg2, JDBC, pgx, node-postgres) connects to RisingWave. Your agent framework does not need a special SDK.
- SQL interface. Memory queries are standard SQL. No proprietary query languages, no framework-specific APIs.
Building the Memory Schema
Let's build a working memory layer. All SQL below has been tested on RisingWave 2.8.0.
Step 1: Define the Raw Storage Tables
Three tables capture the raw signals that feed into agent memory:
-- Conversation messages between users and AI agents
CREATE TABLE agent_conversations (
message_id INT PRIMARY KEY,
session_id VARCHAR,
user_id VARCHAR,
role VARCHAR,
content VARCHAR,
token_count INT,
created_at TIMESTAMPTZ
);
-- User interactions with products, docs, and features
CREATE TABLE user_interactions (
interaction_id INT PRIMARY KEY,
user_id VARCHAR,
interaction_type VARCHAR,
item_id VARCHAR,
item_category VARCHAR,
rating FLOAT,
created_at TIMESTAMPTZ
);
-- Agent tool invocations and performance data
CREATE TABLE agent_tool_calls (
call_id INT PRIMARY KEY,
session_id VARCHAR,
user_id VARCHAR,
tool_name VARCHAR,
success BOOLEAN,
latency_ms INT,
created_at TIMESTAMPTZ
);
In a production system, these tables would typically be populated through Kafka sources or CDC connectors rather than direct INSERTs. The agent application publishes events to Kafka, and RisingWave ingests them continuously.
Step 2: Create Materialized Views for Computed Memory
This is where the streaming database approach pays off. Instead of running batch jobs to aggregate user behavior, you define the aggregation once as a materialized view. RisingWave keeps the result current as new data flows in.
User preference summary - Aggregates interaction signals per user and content category:
CREATE MATERIALIZED VIEW mv_user_preference_summary AS
SELECT
ui.user_id,
ui.item_category,
COUNT(*) FILTER (WHERE ui.interaction_type = 'page_view') AS view_count,
COUNT(*) FILTER (WHERE ui.interaction_type = 'bookmark') AS bookmark_count,
ROUND(AVG(ui.rating)::NUMERIC, 1) AS avg_rating,
MAX(ui.created_at) AS last_interaction
FROM user_interactions ui
GROUP BY ui.user_id, ui.item_category;
user_id | item_category | view_count | bookmark_count | avg_rating | last_interaction
------------+---------------+------------+----------------+------------+---------------------------
user-alice | blog | 0 | 1 | | 2026-04-01 09:05:00+00:00
user-alice | documentation | 3 | 0 | 4.8 | 2026-04-01 14:05:00+00:00
user-bob | documentation | 3 | 0 | 3 | 2026-04-01 12:55:00+00:00
user-carol | marketing | 1 | 0 | | 2026-04-01 12:05:00+00:00
The agent can now see at a glance that user-alice heavily prefers documentation content (3 views, 4.8 avg rating) and has also bookmarked blog content. This context arrives in a single SELECT, with no application-side aggregation.
Conversation session metadata - Summarizes each session for fast retrieval:
CREATE MATERIALIZED VIEW mv_conversation_context AS
SELECT
ac.session_id,
ac.user_id,
COUNT(*) AS message_count,
SUM(ac.token_count) AS total_tokens,
MIN(ac.created_at) AS session_start,
MAX(ac.created_at) AS session_end
FROM agent_conversations ac
GROUP BY ac.session_id, ac.user_id;
session_id | user_id | message_count | total_tokens | session_start | session_end
------------+------------+---------------+--------------+---------------------------+---------------------------
sess-101 | user-alice | 4 | 62 | 2026-04-01 09:00:00+00:00 | 2026-04-01 09:01:03+00:00
sess-102 | user-bob | 4 | 48 | 2026-04-01 10:00:00+00:00 | 2026-04-01 10:02:03+00:00
sess-103 | user-alice | 2 | 28 | 2026-04-01 11:00:00+00:00 | 2026-04-01 11:00:03+00:00
sess-104 | user-carol | 2 | 27 | 2026-04-01 12:00:00+00:00 | 2026-04-01 12:00:02+00:00
sess-105 | user-bob | 2 | 22 | 2026-04-01 13:00:00+00:00 | 2026-04-01 13:00:02+00:00
sess-106 | user-alice | 2 | 26 | 2026-04-01 14:00:00+00:00 | 2026-04-01 14:00:03+00:00
Tool performance metrics - Tracks which tools the agent uses for each user and how they perform:
CREATE MATERIALIZED VIEW mv_agent_session_stats AS
SELECT
tc.user_id,
tc.tool_name,
COUNT(*) AS call_count,
ROUND(AVG(tc.latency_ms)::NUMERIC, 0) AS avg_latency_ms,
SUM(CASE WHEN tc.success THEN 1 ELSE 0 END) AS success_count
FROM agent_tool_calls tc
GROUP BY tc.user_id, tc.tool_name;
user_id | tool_name | call_count | avg_latency_ms | success_count
------------+--------------------+------------+----------------+---------------
user-alice | run_sql_example | 1 | 280 | 1
user-alice | search_docs | 3 | 107 | 3
user-bob | query_system_table | 1 | 200 | 1
user-bob | run_sql_example | 1 | 350 | 1
user-bob | search_docs | 1 | 110 | 1
user-carol | search_pricing | 1 | 90 | 1
All three materialized views update automatically when new rows land in the underlying tables. There is nothing to schedule and nothing to trigger.
Serving Context to the Agent
With the schema in place, serving memory to the agent is just SQL. Here are the query patterns your agent code would use.
Retrieve Recent Conversation History
When starting a new session, the agent needs context from previous interactions:
SELECT session_id, role, content, created_at
FROM agent_conversations
WHERE user_id = 'user-alice'
ORDER BY created_at DESC
LIMIT 5;
session_id | role | content | created_at
------------+-----------+----------------------------------------------------------------------------------------------------------------------+---------------------------
sess-106 | assistant | Yes, you can use watermarks and the EMIT ON WINDOW CLOSE strategy for handling late data. | 2026-04-01 14:00:03+00:00
sess-106 | user | Can RisingWave handle late-arriving events? | 2026-04-01 14:00:00+00:00
sess-103 | assistant | Here is an example that aggregates page views by user and computes session duration. | 2026-04-01 11:00:03+00:00
sess-103 | user | Show me how to create a materialized view for user analytics. | 2026-04-01 11:00:00+00:00
sess-101 | assistant | RisingWave uses SQL natively and stores state internally, while Flink requires external state backends like RocksDB. | 2026-04-01 09:01:03+00:00
The agent can inject these messages into the LLM prompt as prior context, giving it awareness of what the user has already asked about.
Build a User Profile for Personalization
Combine preference data with session history to build a compact user profile:
SELECT
p.item_category AS preferred_category,
p.avg_rating,
p.view_count + p.bookmark_count AS engagement_signals,
c.message_count AS total_messages,
c.total_tokens
FROM mv_user_preference_summary p
JOIN mv_conversation_context c ON p.user_id = c.user_id
WHERE p.user_id = 'user-alice'
ORDER BY p.last_interaction DESC;
preferred_category | avg_rating | engagement_signals | total_messages | total_tokens
--------------------+------------+--------------------+----------------+--------------
documentation | 4.8 | 3 | 2 | 28
documentation | 4.8 | 3 | 2 | 26
documentation | 4.8 | 3 | 4 | 62
blog | | 1 | 2 | 28
blog | | 1 | 2 | 26
blog | | 1 | 4 | 62
This tells the agent that user-alice engages heavily with documentation (4.8 avg rating, 3 engagement signals) and has had multiple sessions with significant token usage. The agent can use this to prioritize documentation links in its responses and tailor its explanations to the user's demonstrated technical level.
Integrate with Python Agent Code
Here is how an agent built with a framework like LangChain or a custom Python application would query RisingWave for context:
import psycopg2
import json
def get_user_context(user_id: str) -> dict:
"""Retrieve fresh memory context for an AI agent from RisingWave."""
conn = psycopg2.connect(
host="localhost",
port=4566,
user="root",
dbname="dev"
)
cur = conn.cursor()
# Get recent conversation history
cur.execute("""
SELECT role, content, created_at
FROM agent_conversations
WHERE user_id = %s
ORDER BY created_at DESC LIMIT 10
""", (user_id,))
conversations = cur.fetchall()
# Get user preferences from the materialized view
cur.execute("""
SELECT item_category, view_count, bookmark_count, avg_rating
FROM mv_user_preference_summary
WHERE user_id = %s
ORDER BY last_interaction DESC
""", (user_id,))
preferences = cur.fetchall()
# Get tool usage patterns
cur.execute("""
SELECT tool_name, call_count, avg_latency_ms
FROM mv_agent_session_stats
WHERE user_id = %s
ORDER BY call_count DESC
""", (user_id,))
tool_stats = cur.fetchall()
cur.close()
conn.close()
return {
"recent_messages": [
{"role": r, "content": c, "time": str(t)}
for r, c, t in conversations
],
"preferences": [
{"category": cat, "views": v, "bookmarks": b, "rating": str(r)}
for cat, v, b, r in preferences
],
"tool_patterns": [
{"tool": name, "calls": cnt, "avg_ms": str(lat)}
for name, cnt, lat in tool_stats
]
}
# Use the context in your agent prompt
context = get_user_context("user-alice")
system_prompt = f"""You are a helpful assistant. Here is what you know about this user:
- They prefer: {json.dumps(context['preferences'])}
- Recent conversation topics: {json.dumps(context['recent_messages'][:3])}
- Tools they trigger most: {json.dumps(context['tool_patterns'])}
Tailor your responses based on this context."""
Because RisingWave speaks PostgreSQL, this code uses the standard psycopg2 library. No special drivers, no proprietary SDKs. If your agent framework already connects to PostgreSQL (and most do), it connects to RisingWave with a port number change.
Comparison: Streaming Database vs. Traditional Memory Stacks
| Capability | Traditional Stack (Redis + PostgreSQL + Vector DB + Batch) | Streaming Database (RisingWave) |
| Conversation storage | PostgreSQL or MongoDB | RisingWave tables |
| Preference aggregation | Batch job (hourly/daily) | Materialized views (milliseconds) |
| Session state | Redis (volatile) | RisingWave tables (durable) |
| Semantic search | Vector database required | Combine with a vector DB for embeddings, or use pg_bigm for text search |
| Query protocol | Multiple protocols per system | PostgreSQL wire protocol |
| Freshness of aggregations | Minutes to hours | Milliseconds |
| Operational overhead | 3-4 systems to manage | 1 system |
| Agent integration | Custom glue code per system | Standard PostgreSQL client |
The one area where a streaming database does not replace specialized tooling is vector similarity search for semantic retrieval. If your agent needs to find "messages similar to X" using embeddings, you still want a vector index. However, for structured memory (who asked what, user preferences, session statistics, tool performance), a streaming database handles everything in one place.
Production Deployment Patterns
Scaling for Multi-Tenant Agents
When serving thousands of users, partition your memory queries by user_id. RisingWave's materialized views are partitioned internally, and point lookups by a GROUP BY key are fast:
-- This query hits a single partition, not a full scan
SELECT * FROM mv_user_preference_summary
WHERE user_id = 'user-alice';
For high-throughput ingestion, use Kafka sources instead of direct INSERTs. Your agent application publishes events to Kafka topics, and RisingWave consumes them continuously:
CREATE SOURCE conversation_events
WITH (
connector = 'kafka',
topic = 'agent.conversations',
properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;
Memory Retention and Cleanup
Not all memory needs to live forever. Use RisingWave's temporal filters in materialized views to maintain rolling windows:
-- Only aggregate interactions from the last 30 days
CREATE MATERIALIZED VIEW mv_recent_preferences AS
SELECT
user_id,
item_category,
COUNT(*) AS interaction_count,
ROUND(AVG(rating)::NUMERIC, 1) AS avg_rating
FROM user_interactions
WHERE created_at > NOW() - INTERVAL '30 days'
GROUP BY user_id, item_category;
Connecting to Agent Frameworks
RisingWave works with any framework that supports PostgreSQL:
- LangChain: Use
SQLDatabasewith a PostgreSQL connection string pointing to port 4566 - LlamaIndex: Use the
PGVectorStoreadapter or direct SQL queries viaSQLDatabase - Custom agents: Use
psycopg2(Python),pgx(Go),node-postgres(JavaScript), or any JDBC driver - MCP integration: Connect through the RisingWave MCP server for standardized tool access across Claude, ChatGPT, and VS Code Copilot (see our guide on connecting a streaming database to AI agents via MCP)
What Is AI Agent Memory and Why Does It Matter?
AI agent memory is a system that persists information across interactions so that an agent can recall previous conversations, learn user preferences, and maintain context over time. Without memory, every agent interaction starts from scratch, forcing users to repeat themselves and preventing the agent from personalizing its responses. A well-designed memory layer makes the difference between an agent that feels like a stateless chatbot and one that behaves like a knowledgeable assistant.
How Does a Streaming Database Differ from a Vector Database for Agent Memory?
A streaming database like RisingWave and a vector database serve different purposes in an agent memory stack. A vector database stores high-dimensional embeddings and performs similarity search, which is useful for finding semantically related past conversations. A streaming database stores structured data (conversation logs, interaction events, tool calls) and continuously computes aggregations through materialized views. For most agent memory needs, such as "what has this user asked about?" or "what are this user's top preferences?", a streaming database provides direct, fast answers through SQL without requiring embedding generation or approximate nearest-neighbor search.
Can RisingWave Replace My Existing Agent Memory Infrastructure?
RisingWave can replace the relational database, the key-value store, and the batch aggregation layer in a typical agent memory stack. It stores raw events durably in tables, maintains always-fresh aggregations in materialized views, and serves both through the PostgreSQL protocol. The one component it does not replace is a vector database for embedding-based semantic search. For agents that rely heavily on semantic retrieval, use RisingWave alongside a vector index. For agents that primarily need structured memory (conversation history, preference summaries, session statistics), RisingWave handles it all.
How Do Materialized Views Keep Agent Memory Fresh Without Batch Jobs?
Unlike traditional materialized views that require a manual REFRESH command, RisingWave's materialized views are incrementally maintained. When a new row is inserted into a source table, RisingWave computes only the delta to the affected aggregation and updates the materialized view within milliseconds. There is no scheduler, no cron job, and no full recomputation. The agent always reads the latest aggregated result, whether the last event arrived 100 milliseconds ago or 10 hours ago.
Conclusion
Building reliable memory for AI agents does not require stitching together four different storage systems. A streaming database consolidates the storage, computation, and serving layers into one system. Here are the key takeaways:
- Store everything in one place. Conversation history, user interactions, and tool call logs go into RisingWave tables. One system to operate, one protocol to connect.
- Compute preferences continuously. Materialized views replace batch aggregation jobs. User preference summaries, session statistics, and tool performance metrics update within milliseconds of new data arriving.
- Serve context over PostgreSQL. Your agent queries RisingWave using the same psycopg2, JDBC, or pgx driver it already uses for PostgreSQL. No new SDKs, no proprietary APIs.
- Keep it simple. The entire memory layer is defined in SQL: CREATE TABLE for raw storage, CREATE MATERIALIZED VIEW for computed memory, and SELECT for retrieval. Your agent framework does not need to know it is talking to a streaming database.
The result is an agent memory system that is always fresh, queryable in standard SQL, and operationally simple. Your agent remembers what matters, and it remembers it now, not after the next batch run.
Ready to try this yourself? Get started with RisingWave in 5 minutes. Quickstart
Join our Slack community to ask questions and connect with other stream processing developers.

