AI Agent State Management with PostgreSQL: A Production Architecture

The State Management Problem

Most agent frameworks ship with a convenient default for state: a Python dict, a SQLite file, or an in-process object on the host machine. This works fine for a single agent running on a single machine during a demo. It stops working the moment you deploy.

Consider what production looks like. You run three instances of your customer support agent behind a load balancer, and each request can land on a different instance. Session A starts on instance 1, but the next request from the same user lands on instance 2. That instance has no idea what happened in session A. The user is asked to re-explain their problem. The agent has amnesia.

The deeper problem is not just load balancing. It is what agent state actually contains. A production agent accumulates:

  • Session context: The conversation history, user intent, and preferences gathered during a session
  • Task progress: Where in a multi-step workflow the agent currently is
  • Decision history: What the agent decided before, so it does not loop or contradict itself
  • Accumulated context: Summaries, extracted facts, and intermediate results from earlier steps

This data must survive instance restarts, be readable by any replica in the pool, and be queryable for debugging and observability. A Python dict provides none of these guarantees. SQLite survives restarts but is a local file; instance 2 cannot read instance 1's SQLite database. Redis gives you shared access but no SQL, no joins, and no complex aggregations.

What you need is a real database. Specifically, you need one that speaks PostgreSQL so your existing tooling works, and one that can compute current agent state continuously as events flow in, not just store raw event records.

What Agent State Actually Is

Before choosing a storage technology, it helps to model agent state precisely. Agent state is not a single value. It is the result of applying a sequence of events to derive a current view of the world.

Think of it this way. An agent does not store "current status = waiting_for_user". Instead, it records an event: {event_type: "agent_paused", reason: "awaiting_user_input", timestamp: ...}. The current status is derived from the latest event of that type. This event-log model is intentional: it gives you a full audit trail, allows replaying history, and makes debugging tractable.

The raw event log is your source of truth. But querying raw events to reconstruct current state on every agent decision is expensive and slow. What you need is a continuously maintained projection of the current state, derived from the event log automatically.

This is precisely what a materialized view in a streaming database delivers.

The Two-Layer Architecture

A clean production architecture for agent state separates the write path from the read path.

Write layer: An append-only event log. Every action the agent takes, every user message received, every tool call made, every decision reached, gets recorded as a row in a structured events table. Writes are simple inserts. No updates, no deletes. This is the source of truth.

Read layer: Continuously maintained materialized views. Each view projects the event log into a specific representation of current state: the latest status per agent, the accumulated context for a session, the task progress summary. Reads are simple point lookups against pre-computed results.

This separation means writers never block readers, the event log is immutable and auditable, and the read views are always fresh, because a streaming database like RisingWave maintains them incrementally as new events arrive.

Building the Schema

Start with the two core tables in the write layer.

Agent Events Table

-- Tested against RisingWave v2.8.0
-- This is the append-only event log: the single source of truth

CREATE TABLE agent_events (
    event_id        VARCHAR         PRIMARY KEY,
    agent_id        VARCHAR         NOT NULL,
    session_id      VARCHAR         NOT NULL,
    event_type      VARCHAR         NOT NULL,
    -- Common event types: session_started, message_received, tool_called,
    --   tool_result, decision_made, task_completed, agent_paused, agent_resumed,
    --   error_occurred, session_ended
    payload         JSONB,
    -- Structured payload depends on event_type. Examples:
    --   message_received: {"role": "user", "content": "...", "token_count": 142}
    --   tool_called:      {"tool": "search_orders", "args": {"order_id": "ORD-991"}}
    --   decision_made:    {"action": "escalate", "reason": "...", "confidence": 0.91}
    created_at      TIMESTAMPTZ     NOT NULL DEFAULT NOW()
);

Agent Sessions Table

-- Session metadata: one row per agent session
-- Written once at session start, updated rarely (only for session-level attributes)

CREATE TABLE agent_sessions (
    session_id      VARCHAR         PRIMARY KEY,
    agent_id        VARCHAR         NOT NULL,
    user_id         VARCHAR,
    agent_type      VARCHAR         NOT NULL,
    -- e.g. 'customer_support', 'order_analyst', 'fraud_reviewer'
    started_at      TIMESTAMPTZ     NOT NULL DEFAULT NOW(),
    metadata        JSONB
    -- metadata holds immutable session context: channel, locale, initial_intent, etc.
);

Materialized Views for Current Agent State

With the write layer defined, the read layer is a set of materialized views. RisingWave maintains these incrementally: each new row inserted into agent_events triggers an update only to the affected rows in the views, not a full recomputation.

Current State Per Agent

This view answers the most common agent query: "What is my current status, and what was the last thing that happened?"

-- Latest event per agent, using the ROW_NUMBER Top-N pattern
-- (streaming-friendly equivalent of PostgreSQL's DISTINCT ON)
CREATE MATERIALIZED VIEW agent_current_state AS
SELECT
    agent_id,
    session_id,
    last_event_type,
    payload,
    last_event_at,
    agent_type,
    user_id,
    session_started_at
FROM (
    SELECT
        e.agent_id,
        e.session_id,
        e.event_type                            AS last_event_type,
        e.payload,
        e.created_at                            AS last_event_at,
        s.agent_type,
        s.user_id,
        s.started_at                            AS session_started_at,
        ROW_NUMBER() OVER (
            PARTITION BY e.agent_id
            ORDER BY e.created_at DESC
        ) AS rn
    FROM agent_events e
    JOIN agent_sessions s ON e.session_id = s.session_id
) ranked
WHERE rn = 1;

The agent queries this view at the start of each decision loop:

SELECT
    last_event_type,
    payload,
    last_event_at,
    agent_type,
    user_id
FROM agent_current_state
WHERE agent_id = 'agent-7f2c9d';

Session Summary for Context Retrieval

When an agent resumes a session (after a pause, a restart, or a handoff to another instance), it needs a summary of what happened so far. This view pre-computes that summary continuously.

CREATE MATERIALIZED VIEW session_summary AS
SELECT
    session_id,
    agent_id,
    COUNT(*)                                                        AS total_events,
    COUNT(*) FILTER (WHERE event_type = 'message_received')         AS user_messages,
    COUNT(*) FILTER (WHERE event_type = 'tool_called')             AS tool_calls,
    COUNT(*) FILTER (WHERE event_type = 'decision_made')           AS decisions,
    COUNT(*) FILTER (WHERE event_type = 'error_occurred')          AS errors,
    MIN(created_at)                                                 AS first_event_at,
    MAX(created_at)                                                 AS latest_event_at,
    MAX(created_at) - MIN(created_at)                              AS session_duration,
    SUM(
        COALESCE((payload->>'token_count')::INT, 0)
    )                                                               AS total_tokens_used
FROM agent_events
GROUP BY session_id, agent_id;

An agent resuming after a handoff retrieves its session summary and the last N events to reconstruct context:

-- Step 1: Get session summary
SELECT
    total_events,
    user_messages,
    tool_calls,
    decisions,
    errors,
    session_duration,
    total_tokens_used
FROM session_summary
WHERE session_id = 'sess-a3b8f1';

-- Step 2: Get recent event history for context window
SELECT
    event_type,
    payload,
    created_at
FROM agent_events
WHERE session_id = 'sess-a3b8f1'
ORDER BY created_at DESC
LIMIT 20;
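A resuming agent typically stitches these two results into a single context preamble before its first decision. A hedged sketch of that assembly step (the dictionary keys follow the views above; the prompt format itself is illustrative):

```python
def build_resume_context(summary: dict, recent_events: list[dict]) -> str:
    """Combine the session summary row with recent events into a
    context preamble for the agent's next prompt.
    recent_events is expected newest-first, as returned by the
    ORDER BY created_at DESC query above."""
    lines = [
        f"Resuming session: {summary['total_events']} events so far, "
        f"{summary['user_messages']} user messages, "
        f"{summary['errors']} errors.",
        "Most recent activity (newest first):",
    ]
    for event in recent_events:
        lines.append(f"- {event['event_type']}: {event.get('payload')}")
    return "\n".join(lines)

summary = {"total_events": 12, "user_messages": 5, "errors": 0}
recent = [{"event_type": "decision_made", "payload": {"action": "escalate"}}]
print(build_resume_context(summary, recent))
```

Because both inputs come from pre-computed views and a bounded event query, this assembly is cheap enough to run at the top of every decision loop.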

Task Progress View

For agents executing multi-step workflows, this view tracks progress across task phases.

CREATE MATERIALIZED VIEW agent_task_progress AS
SELECT
    session_id,
    agent_id,
    COUNT(*) FILTER (WHERE event_type = 'decision_made')           AS decisions_made,
    COUNT(*) FILTER (WHERE event_type = 'tool_called')             AS tools_invoked,
    COUNT(*) FILTER (WHERE event_type = 'task_completed')          AS tasks_completed,
    BOOL_OR(event_type = 'error_occurred')                         AS had_error,
    BOOL_OR(event_type = 'session_ended')                          AS is_closed,
    MAX(created_at) FILTER (WHERE event_type = 'decision_made')    AS last_decision_at
FROM agent_events
GROUP BY session_id, agent_id;
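An orchestrator can consume this view to decide what to do with a session before handing it to a fresh agent instance. A sketch of that routing logic, with column names matching the view and decision rules that are purely illustrative:

```python
def resume_action(progress: dict) -> str:
    """Route a session based on its agent_task_progress row.
    Keys match the view's column names; the rules are examples."""
    if progress["is_closed"]:
        return "archive"
    if progress["had_error"]:
        return "escalate_to_human"
    if progress["tasks_completed"] > 0 and progress["decisions_made"] == 0:
        return "close"   # work finished, nothing pending
    return "resume"

row = {"is_closed": False, "had_error": False,
       "tasks_completed": 0, "decisions_made": 3}
print(resume_action(row))  # resume
```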

Connecting from Python

Because RisingWave implements the PostgreSQL wire protocol, your agent framework connects with any standard PostgreSQL driver. No proprietary SDK. The same psycopg2 or SQLAlchemy code your team already knows works without modification.

import psycopg2
import json
from datetime import datetime, timezone
import uuid

# Standard PostgreSQL connection -- works with RisingWave unchanged
conn = psycopg2.connect(
    host="localhost",
    port=4566,       # RisingWave default port
    dbname="dev",
    user="root",
    password=""
)
conn.autocommit = True

def record_event(agent_id: str, session_id: str,
                 event_type: str, payload: dict) -> str:
    """Append an event to the agent event log."""
    event_id = str(uuid.uuid4())
    with conn.cursor() as cur:
        cur.execute(
            """
            INSERT INTO agent_events
                (event_id, agent_id, session_id, event_type, payload, created_at)
            VALUES (%s, %s, %s, %s, %s, %s)
            """,
            (event_id, agent_id, session_id, event_type,
             json.dumps(payload), datetime.now(timezone.utc))
        )
    return event_id

def get_current_state(agent_id: str) -> dict | None:
    """Retrieve the latest agent state from the materialized view."""
    with conn.cursor() as cur:
        cur.execute(
            "SELECT * FROM agent_current_state WHERE agent_id = %s",
            (agent_id,)
        )
        row = cur.fetchone()
        if row is None:
            return None
        cols = [desc[0] for desc in cur.description]
        return dict(zip(cols, row))

def get_session_summary(session_id: str) -> dict | None:
    """Retrieve the pre-computed session summary."""
    with conn.cursor() as cur:
        cur.execute(
            "SELECT * FROM session_summary WHERE session_id = %s",
            (session_id,)
        )
        row = cur.fetchone()
        if row is None:
            return None
        cols = [desc[0] for desc in cur.description]
        return dict(zip(cols, row))

# Usage: an agent records a decision event
record_event(
    agent_id="agent-7f2c9d",
    session_id="sess-a3b8f1",
    event_type="decision_made",
    payload={
        "action": "escalate_to_human",
        "reason": "Customer has unresolved billing dispute > 14 days",
        "confidence": 0.95
    }
)

# Shortly after (within the view's streaming update interval, typically
# sub-second), the materialized view reflects the new state
state = get_current_state("agent-7f2c9d")
print(state["last_event_type"])   # "decision_made"
print(state["last_event_at"])     # current timestamp

SQLAlchemy works identically. Point your engine at postgresql+psycopg2://root@localhost:4566/dev and use the ORM or Core expressions as you normally would. Any PostgreSQL-compatible ORM, migration tool, or introspection utility works with RisingWave without modification.

Observability with System Tables

RisingWave exposes streaming job health through system tables. You can monitor the agent state pipeline the same way you would monitor any streaming workload.

-- Check that materialized views are processing events without lag
SELECT
    name,
    definition,
    created_at
FROM rw_streaming_jobs
WHERE name IN ('agent_current_state', 'session_summary', 'agent_task_progress');

-- Monitor Kafka consumer lag if agent events flow through Kafka
SELECT
    source_name,
    partition,
    current_offset,
    latest_offset,
    (latest_offset - current_offset) AS lag
FROM rw_kafka_job_lag
WHERE source_name = 'agent_events_stream';
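The lag query above feeds naturally into an alerting check. A hedged sketch that evaluates rows of (source_name, partition, lag) tuples against a threshold; the row shape mirrors the query above, and the threshold is illustrative:

```python
def lag_alerts(rows: list[tuple[str, int, int]],
               max_lag: int = 10_000) -> list[str]:
    """Return one alert message per partition whose consumer lag
    exceeds max_lag. rows: (source_name, partition, lag) tuples,
    as returned by the lag query above."""
    return [
        f"{source}: partition {part} lagging by {lag} messages"
        for source, part, lag in rows
        if lag > max_lag
    ]

rows = [("agent_events_stream", 0, 120),
        ("agent_events_stream", 1, 25_000)]
print(lag_alerts(rows))
```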

An Important Architectural Note on Write Patterns

RisingWave is optimized for streaming analytics: continuous reads against materialized views that stay fresh as event streams flow in. It is not designed as a high-frequency OLTP write store. If your agents generate thousands of state mutations per second across all sessions and require strict ACID transaction guarantees on individual writes (row-level locking, serializable isolation), you should use PostgreSQL for the write side and RisingWave for the read side.

In that architecture, agent events are written to a PostgreSQL agent_events table. RisingWave ingests that table via CDC (Change Data Capture) using its native PostgreSQL CDC connector, with no code changes required on the write side. The materialized views in RisingWave compute current state continuously from the CDC stream.
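A sketch of what the RisingWave side of that CDC wiring can look like. All connection parameters below are placeholders; consult the PostgreSQL CDC connector documentation for the full option list.

```sql
-- Hypothetical CDC setup: hostname, credentials, and database are placeholders
CREATE SOURCE pg_source WITH (
    connector = 'postgres-cdc',
    hostname = 'postgres-host',
    port = '5432',
    username = 'cdc_user',
    password = '...',
    database.name = 'agents_db'
);

-- Materialize the upstream agent_events table from the CDC stream
CREATE TABLE agent_events (
    event_id    VARCHAR PRIMARY KEY,
    agent_id    VARCHAR,
    session_id  VARCHAR,
    event_type  VARCHAR,
    payload     JSONB,
    created_at  TIMESTAMPTZ
) FROM pg_source TABLE 'public.agent_events';
```

From here, the materialized views defined earlier work unchanged: they simply consume the CDC-fed table instead of a directly written one.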

This hybrid pattern gives you:

  • PostgreSQL for ACID-safe, high-frequency writes
  • RisingWave for sub-second aggregations, joins, and materialized views that scale independently of the write load
  • A single SQL interface for both, since both speak PostgreSQL wire protocol

For most agent workloads, event rates are modest enough that writing directly to RisingWave is straightforward. The hybrid pattern becomes relevant at very high write rates or when existing operational databases are already the system of record for agent events.

FAQ

<script type="application/ld+json">
{
  "@context": "https://schema.org",
  "@type": "FAQPage",
  "mainEntity": [
    {
      "@type": "Question",
      "name": "Why is SQLite not suitable for production AI agent state management?",
      "acceptedAnswer": {
        "@type": "Answer",
        "text": "SQLite stores data as a local file on the host machine. When you run multiple agent instances behind a load balancer, each instance has its own SQLite file with no shared access. A user session that starts on instance 1 becomes invisible to instance 2. SQLite also has no support for concurrent writes from multiple processes, no network protocol for remote access, and no streaming or aggregation layer. It is suitable for single-process development and testing but not for production multi-instance deployments."
      }
    },
    {
      "@type": "Question",
      "name": "What is the difference between a materialized view and a cached query for agent state?",
      "acceptedAnswer": {
        "@type": "Answer",
        "text": "A cached query stores a static snapshot that must be explicitly invalidated or refreshed on a timer. If an agent writes a new event and then immediately reads its current state from a cache, it may see stale results until the cache TTL expires. A streaming materialized view in RisingWave is incrementally maintained: every insert into agent_events triggers a precise update to the affected rows in the view, with no TTL and no manual refresh. The agent always reads the result of the latest event, typically within milliseconds."
      }
    },
    {
      "@type": "Question",
      "name": "Can I use SQLAlchemy or Django ORM with RisingWave?",
      "acceptedAnswer": {
        "@type": "Answer",
        "text": "Yes. RisingWave implements the PostgreSQL wire protocol, so any library that connects to PostgreSQL works with RisingWave without modification. SQLAlchemy, Django ORM, psycopg2, asyncpg, and JDBC all connect using the standard PostgreSQL connection string pointing at port 4566. Note that RisingWave does not support CREATE TRIGGER, stored procedures, or CREATE EXTENSION, so schema migration tools that rely on those features may need adjustment."
      }
    },
    {
      "@type": "Question",
      "name": "How does the event-log model support agent debugging?",
      "acceptedAnswer": {
        "@type": "Answer",
        "text": "Because agent_events is an append-only log, you have a complete, immutable history of every decision, tool call, and message for every session. When an agent makes a bad decision, you can replay the exact sequence of events it saw to reproduce the issue. You can also query the log with SQL to find patterns: how often agents escalate, which tool calls produce errors, which sessions take the most steps to resolve. This observability is impossible when state is stored as mutable in-memory objects or a single-row status record that gets overwritten on each update."
      }
    },
    {
      "@type": "Question",
      "name": "When should I use PostgreSQL for writes and RisingWave for reads?",
      "acceptedAnswer": {
        "@type": "Answer",
        "text": "Use the hybrid write-to-PostgreSQL, read-from-RisingWave pattern when your agents generate very high event rates (thousands of events per second across all sessions), when you need strict serializable transaction isolation on individual writes, or when an existing PostgreSQL database is already the system of record for agent events. RisingWave ingests the PostgreSQL change stream via its native CDC connector. For most agent workloads with moderate event rates, writing directly to RisingWave is simpler and sufficient."
      }
    }
  ]
}
</script>

What is a streaming database for agent state?

A streaming database is a database that continuously ingests event streams and maintains query results incrementally as new data arrives. For AI agent state management, it serves as a read layer that always reflects the latest state derived from the event log. RisingWave is PostgreSQL-compatible, so agents connect with psycopg2, SQLAlchemy, or any standard PostgreSQL client.

What is the difference between a materialized view and a cached query for agent state?

A cached query stores a static snapshot that must be explicitly invalidated or refreshed on a timer. A streaming materialized view in RisingWave is incrementally maintained: every insert into agent_events triggers a precise update to the affected rows in the view, with no TTL and no manual refresh.

Can I use SQLAlchemy or Django ORM with RisingWave?

Yes. RisingWave implements the PostgreSQL wire protocol. SQLAlchemy, Django ORM, psycopg2, asyncpg, and JDBC all connect using the standard PostgreSQL connection string. Note that RisingWave does not support CREATE TRIGGER, stored procedures, or CREATE EXTENSION.

How does the event-log model support debugging?

Because agent_events is an append-only log, you have a complete, immutable history of every decision, tool call, and message. When an agent makes a bad decision, you can replay the exact sequence of events it saw. You can also write SQL queries to find patterns across sessions: escalation rates, error-prone tool calls, high-latency decision paths.

When should I use PostgreSQL for writes and RisingWave for reads?

Use the hybrid pattern when your agents generate very high event rates (thousands of events per second), when you need strict serializable transaction isolation, or when an existing PostgreSQL database is already the system of record. RisingWave ingests the PostgreSQL change stream via its native CDC connector with no code changes on the write side.

Conclusion

The state management problem for AI agents is fundamentally a database problem. Agents that rely on SQLite or in-memory dicts cannot scale horizontally, cannot survive restarts gracefully, and cannot be debugged when things go wrong.

A two-layer architecture solves this cleanly. The write layer is an append-only event log in a PostgreSQL-compatible database. The read layer is a set of continuously maintained materialized views that project current state from that log. The agent always reads from the views and writes to the log. Multiple instances share the same database and see the same state.

RisingWave handles this architecture natively. Its PostgreSQL wire compatibility means zero changes to existing Python or Java code. Its streaming materialized views mean current agent state is always fresh, not scheduled. Its system tables make the pipeline observable without additional instrumentation.

If you are moving an agent from prototype to production, persistent state management is the first infrastructure problem you will face. Starting with the right database model makes everything downstream easier.


Try it on RisingWave Cloud. Free tier available, no credit card required. Sign up here.

Join our Slack community to discuss agent architectures and share what you are building.
