AI agents are only as useful as the data they can access. Most agent frameworks connect to static databases or REST APIs, which means the agent works with stale snapshots rather than live state. If you are building an agent that monitors infrastructure, manages orders, or responds to customer behavior, it needs data that reflects what is happening right now.
The Model Context Protocol (MCP) is an open standard that solves the integration side of this problem: it gives AI agents a universal way to discover and call external tools, including database queries. A streaming database like RisingWave solves the data freshness side: it continuously processes incoming events and maintains always-up-to-date materialized views. Because RisingWave speaks the PostgreSQL wire protocol, connecting it to MCP requires no special adapters or custom middleware.
This article walks through the architecture, shows you how to expose RisingWave materialized views as MCP tools, and provides working SQL and Python examples you can run locally.
What Is MCP and Why Does It Matter for AI Agents?
The Model Context Protocol is an open-source standard created by Anthropic for connecting AI applications to external data sources and tools. Think of it as a USB-C port for AI: a single, standardized interface that any AI application can use to talk to any compatible server.
MCP defines three core primitives:
- Tools: Executable functions an AI agent can invoke (for example, querying a database or calling an API)
- Resources: Read-only data sources that provide context (file contents, database schemas, configuration)
- Prompts: Reusable templates that help structure how the agent interacts with the language model
The architecture follows a client-server pattern. An MCP host (the AI application, such as Claude Desktop or a custom agent) creates one MCP client per server connection. Each MCP server exposes tools, resources, or prompts that the host can discover at runtime and invoke as needed.
graph LR
A[AI Agent / MCP Host] --> B[MCP Client]
B --> C[MCP Server: RisingWave]
B --> D[MCP Server: File System]
B --> E[MCP Server: Slack API]
C --> F[(RisingWave DB)]
This matters because it eliminates the need to write custom integration code for every data source. Before MCP, connecting an agent to a new database meant building a bespoke plugin. With MCP, you build one server and it works with Claude Desktop, VS Code Copilot, ChatGPT, Cursor, and any other MCP-compatible host.
By March 2026, MCP SDKs had reached over 97 million monthly downloads, with adoption from OpenAI, Google, Microsoft, and AWS. It has become the de facto standard for agentic AI integration.
Why a Streaming Database Is the Right Data Source for Agents
A traditional database stores data at rest. You insert rows, and the data sits there until someone queries it. A streaming database like RisingWave does something fundamentally different: it ingests continuous data streams (from Kafka, Kinesis, or database CDC) and maintains materialized views that are incrementally updated as new events arrive.
This distinction matters for AI agents in three ways:
1. Always-fresh results. When an agent queries a materialized view in RisingWave, it gets the latest computed result, not data from the last batch job. If a customer placed an order 200 milliseconds ago, the agent sees it.
2. Pre-computed answers. Materialized views store the result of a query, not just raw data. An agent asking "what is the average order value by region?" does not trigger a full table scan. It reads a pre-computed result that updates continuously. This means sub-millisecond response times for queries that would otherwise take seconds.
3. SQL interface. RisingWave is wire-compatible with PostgreSQL. Any tool that connects to PostgreSQL, including every existing PostgreSQL MCP server, works with RisingWave out of the box. You do not need a specialized connector.
graph LR
K[Kafka / Kinesis / CDC] -->|Streaming ingestion| RW[(RisingWave)]
RW -->|Incremental updates| MV[Materialized Views]
MV -->|PostgreSQL protocol| MCP[MCP Server]
MCP -->|Tools & Resources| Agent[AI Agent]
This architecture gives agents something that static databases cannot: a continuously updated, queryable view of the world that responds in milliseconds.
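To make the idea of incremental maintenance concrete, here is a minimal conceptual sketch in plain Python. It is not how RisingWave is implemented internally; the class name `RegionRevenueView` and the sample events are hypothetical. It only illustrates the principle: each incoming event updates a small piece of running state, so a read never rescans past events.

```python
from collections import defaultdict
from decimal import Decimal


class RegionRevenueView:
    """Toy incrementally maintained aggregate, keyed by region (illustrative only)."""

    def __init__(self):
        self._count = defaultdict(int)
        self._total = defaultdict(Decimal)  # Decimal() starts at 0

    def apply(self, region: str, amount: Decimal) -> None:
        """Fold one completed order into the running state (constant work per event)."""
        self._count[region] += 1
        self._total[region] += amount

    def read(self) -> dict:
        """Return the current result from the running state, without rescanning events."""
        return {
            region: {
                "total_orders": self._count[region],
                "total_revenue": self._total[region],
                "avg_order_value": round(self._total[region] / self._count[region], 2),
            }
            for region in self._count
        }


# Hypothetical event stream: (region, order amount)
events = [("us-east", "100.00"), ("us-west", "50.00"), ("us-east", "25.50")]

view = RegionRevenueView()
for region, amount in events:
    view.apply(region, Decimal(amount))

snapshot = view.read()
print(snapshot["us-east"])
```

A batch approach would recompute the aggregate over every stored order each time it is refreshed. The incremental approach does a small, fixed amount of work per event, which is why reads stay cheap as the data grows.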
Setting Up RisingWave as an MCP Data Source
Step 1: Start RisingWave
If you do not have RisingWave running locally, install and start it:
# macOS (Homebrew)
brew install risingwavelabs/risingwave/risingwave
risingwave single-node --store-directory /tmp/risingwave-data &
# Or use Docker
docker run -d --name risingwave -p 4566:4566 risingwavelabs/risingwave:latest
Verify the connection using any PostgreSQL client:
psql -h localhost -p 4566 -U root -d dev -c "SELECT version();"
version
------------------------------------------------
PostgreSQL 13.14.0-RisingWave-2.8.0 (Homebrew)
(1 row)
RisingWave listens on port 4566 and accepts standard PostgreSQL connections. Any library, driver, or tool that works with PostgreSQL works here: psycopg2, JDBC, Go's pgx, Node's pg, or the psql CLI.
Step 2: Create Tables and Materialized Views
Let's set up a realistic scenario: an e-commerce system where an AI agent needs to answer questions about orders and customers.
-- Customers table
CREATE TABLE customers (
customer_id INT PRIMARY KEY,
name VARCHAR,
region VARCHAR,
signup_date TIMESTAMP
);
-- Orders table
CREATE TABLE orders (
order_id INT PRIMARY KEY,
customer_id INT,
product VARCHAR,
amount DECIMAL,
status VARCHAR,
order_time TIMESTAMP
);
Now create materialized views that pre-compute the answers an agent might need:
-- Revenue metrics by region, updated continuously
CREATE MATERIALIZED VIEW avg_order_value_by_region AS
SELECT
c.region,
COUNT(o.order_id) AS total_orders,
ROUND(AVG(o.amount)::NUMERIC, 2) AS avg_order_value,
ROUND(SUM(o.amount)::NUMERIC, 2) AS total_revenue
FROM orders o
JOIN customers c ON o.customer_id = c.customer_id
WHERE o.status = 'completed'
GROUP BY c.region;
-- High-value customers, always current
CREATE MATERIALIZED VIEW high_value_customers AS
SELECT
c.customer_id,
c.name,
c.region,
COUNT(o.order_id) AS order_count,
ROUND(SUM(o.amount)::NUMERIC, 2) AS total_spent
FROM customers c
JOIN orders o ON c.customer_id = o.customer_id
WHERE o.status = 'completed'
GROUP BY c.customer_id, c.name, c.region
HAVING SUM(o.amount) > 500;
-- Order pipeline status, real-time
CREATE MATERIALIZED VIEW order_status_summary AS
SELECT
status,
COUNT(*) AS order_count,
ROUND(SUM(amount)::NUMERIC, 2) AS total_amount
FROM orders
GROUP BY status;
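After loading some sample rows into the customers and orders tables, query the views to confirm they work. The output below reflects one sample data set: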
SELECT * FROM avg_order_value_by_region ORDER BY total_revenue DESC;
region | total_orders | avg_order_value | total_revenue
---------+--------------+-----------------+---------------
us-east | 4 | 412.49 | 1649.96
us-west | 2 | 524.99 | 1049.98
eu-west | 1 | 49.99 | 49.99
(3 rows)
SELECT * FROM high_value_customers ORDER BY total_spent DESC;
customer_id | name | region | order_count | total_spent
-------------+------------------+---------+-------------+-------------
5 | Stark Industries | us-east | 2 | 1299.98
3 | Initech | us-west | 2 | 1049.98
(2 rows)
SELECT * FROM order_status_summary ORDER BY order_count DESC;
status | order_count | total_amount
-----------+-------------+--------------
completed | 7 | 2749.93
pending | 2 | 599.98
cancelled | 1 | 299.99
(3 rows)
These materialized views update automatically whenever new rows are inserted into the orders or customers tables. There is no cron job, no refresh command, no lag. The agent always reads current data.
Step 3: Install the RisingWave MCP Server
RisingWave provides an official MCP server that exposes over 100 tools for querying, managing schemas, monitoring streaming jobs, and more.
git clone https://github.com/risingwavelabs/risingwave-mcp.git
cd risingwave-mcp
pip install -r requirements.txt
Configure the connection to your RisingWave instance using environment variables:
export RISINGWAVE_CONNECTION_STR="postgresql://root:root@localhost:4566/dev"
Exposing Materialized Views as MCP Tools
The RisingWave MCP server automatically discovers all tables, materialized views, sources, and sinks in your database. When an AI agent connects, it can list schemas, describe tables, and run queries through MCP tool calls.
But you can also build a focused MCP server that exposes specific materialized views as dedicated tools. This approach gives you finer control over what the agent can access and how it interprets the results.
Here is a Python example using the mcp SDK and psycopg2:
import json
import os
from contextlib import closing

import psycopg2
from mcp.server.fastmcp import FastMCP

# Initialize the MCP server
mcp = FastMCP("risingwave-analytics")

# Read the connection string that the host configuration passes in;
# fall back to the local defaults used earlier in this article
CONNECTION_STR = os.environ.get(
    "RISINGWAVE_CONNECTION_STR",
    "postgresql://root@localhost:4566/dev",
)


def get_connection():
    """Open a new connection to RisingWave over the PostgreSQL protocol."""
    return psycopg2.connect(CONNECTION_STR)


def run_query(sql: str) -> str:
    """Run a query and return the rows as a JSON array of objects."""
    # closing() releases the cursor and connection even if the query fails
    with closing(get_connection()) as conn, closing(conn.cursor()) as cur:
        cur.execute(sql)
        columns = [desc[0] for desc in cur.description]
        rows = cur.fetchall()
    # default=str serializes NUMERIC (Decimal) and timestamp values
    return json.dumps([dict(zip(columns, row)) for row in rows], default=str)


@mcp.tool()
def get_revenue_by_region() -> str:
    """Get current revenue metrics broken down by region.
    Returns total orders, average order value, and total revenue
    for each region, computed from completed orders only."""
    return run_query("""
        SELECT * FROM avg_order_value_by_region
        ORDER BY total_revenue DESC
    """)


@mcp.tool()
def get_high_value_customers() -> str:
    """Get customers who have spent more than $500 in completed orders.
    Returns customer name, region, order count, and total spent."""
    return run_query("""
        SELECT * FROM high_value_customers
        ORDER BY total_spent DESC
    """)


@mcp.tool()
def get_order_pipeline_status() -> str:
    """Get a real-time summary of the order pipeline.
    Returns the count and total amount for each order status
    (completed, pending, cancelled)."""
    return run_query("""
        SELECT * FROM order_status_summary
        ORDER BY order_count DESC
    """)


@mcp.tool()
def query_risingwave(sql: str) -> str:
    """Execute a read-only SQL query against RisingWave.
    Use this for ad-hoc questions not covered by other tools.
    Only SELECT statements are allowed."""
    # Coarse prefix check only; pair it with a read-only database user
    if not sql.strip().upper().startswith("SELECT"):
        return json.dumps({"error": "Only SELECT queries are allowed"})
    return run_query(sql)


if __name__ == "__main__":
    mcp.run(transport="stdio")
Each @mcp.tool() function becomes a tool that the AI agent can discover and call. The docstrings are critical: they tell the agent what each tool does and when to use it. When the agent receives a question like "what region has the highest revenue?", the language model reads the tool descriptions, picks get_revenue_by_region, and calls it.
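The role of docstrings can be illustrated with a toy registry. This is a simplified sketch, not FastMCP's actual implementation; the names `tool`, `TOOLS`, and `list_tools` are hypothetical. It shows the general mechanism: a decorator records each function's name, docstring, and parameters, and that metadata is what a host sees when it asks for the tool catalog.

```python
import inspect

TOOLS = {}  # hypothetical registry: tool name -> metadata


def tool(func):
    """Register a function and record its docstring and parameter types."""
    signature = inspect.signature(func)
    TOOLS[func.__name__] = {
        "description": inspect.getdoc(func) or "",
        "parameters": {
            name: getattr(param.annotation, "__name__", str(param.annotation))
            for name, param in signature.parameters.items()
        },
        "handler": func,
    }
    return func


@tool
def get_revenue_by_region() -> str:
    """Get current revenue metrics broken down by region."""
    return "[]"  # placeholder body; the real tool queries RisingWave


@tool
def query_risingwave(sql: str) -> str:
    """Execute a read-only SQL query against RisingWave."""
    return "[]"  # placeholder body


def list_tools():
    """Return the catalog a host would receive: names, descriptions, parameters."""
    return [
        {"name": name, "description": meta["description"], "parameters": meta["parameters"]}
        for name, meta in TOOLS.items()
    ]


catalog = list_tools()
for entry in catalog:
    print(entry["name"], "-", entry["description"])
```

Because the description is the only thing the language model sees about a tool, a vague docstring tends to produce wrong or missed tool calls. Stating what the tool returns and which filters are already applied is usually worth the extra line.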
Connecting to Claude Desktop
Add the server to your Claude Desktop configuration (claude_desktop_config.json):
{
"mcpServers": {
"risingwave-analytics": {
"command": "python",
"args": ["/path/to/risingwave_mcp_server.py"],
"env": {
"RISINGWAVE_CONNECTION_STR": "postgresql://root:root@localhost:4566/dev"
}
}
}
}
Connecting to VS Code Copilot
Add to .vscode/mcp.json in your project:
{
"servers": {
"risingwave-analytics": {
"type": "stdio",
"command": "python",
"args": ["/path/to/risingwave_mcp_server.py"],
"env": {
"RISINGWAVE_CONNECTION_STR": "postgresql://root:root@localhost:4566/dev"
}
}
}
}
After configuration, the AI agent can discover your tools automatically. Ask it "what is the order pipeline status?" and it will call get_order_pipeline_status behind the scenes, returning fresh results from RisingWave's continuously updated materialized view.
Architecture: From Streaming Data to Agent Response
Here is the complete data flow from raw events to an agent answering a user's question:
sequenceDiagram
participant User
participant Agent as AI Agent (MCP Host)
participant MCP as MCP Server
participant RW as RisingWave
participant Kafka as Kafka / Data Source
Kafka->>RW: Continuous event stream
RW->>RW: Update materialized views
User->>Agent: "Which region has the most revenue?"
Agent->>MCP: tools/list (discover available tools)
MCP-->>Agent: [get_revenue_by_region, get_high_value_customers, ...]
Agent->>MCP: tools/call get_revenue_by_region
MCP->>RW: SELECT * FROM avg_order_value_by_region
RW-->>MCP: Query result (sub-ms latency)
MCP-->>Agent: JSON response with revenue data
Agent-->>User: "us-east leads with $1,649.96 in revenue from 4 orders"
This architecture has several properties worth noting:
Separation of concerns. The streaming pipeline (Kafka to RisingWave to materialized views) runs independently of the agent. The MCP server is a thin query layer that reads from materialized views. The agent knows nothing about Kafka or stream processing.
Low latency. Materialized views in RisingWave store pre-computed results. The MCP server executes a simple SELECT that returns in under a millisecond. The agent does not wait for aggregations to run.
Dynamic tool discovery. The agent discovers available tools at connection time via tools/list. If you add a new materialized view and register it as an MCP tool, the agent picks it up on the next connection without any code changes on the agent side.
Security through scoping. By wrapping each materialized view in a dedicated tool function, you control exactly what data the agent can access. The query_risingwave tool adds an extra escape hatch for ad-hoc queries, but you can omit it in production if you want to restrict the agent to pre-defined views only.
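The tools/list and tools/call steps in the sequence above are JSON-RPC 2.0 messages. The sketch below builds the two request payloads with the standard library so you can see their rough shape. The helper `jsonrpc_request` and the ID counter are hypothetical, and a real MCP session also begins with an initialization handshake that is omitted here; in practice the MCP SDK constructs and sends these messages for you.

```python
import json
from itertools import count
from typing import Optional

_request_ids = count(1)  # JSON-RPC requests carry an ID so responses can be matched


def jsonrpc_request(method: str, params: Optional[dict] = None) -> dict:
    """Build a JSON-RPC 2.0 request object (illustrative helper)."""
    message = {"jsonrpc": "2.0", "id": next(_request_ids), "method": method}
    if params is not None:
        message["params"] = params
    return message


# Discover the tools the server exposes
list_request = jsonrpc_request("tools/list")

# Invoke one tool by name; this tool takes no arguments
call_request = jsonrpc_request(
    "tools/call",
    {"name": "get_revenue_by_region", "arguments": {}},
)

print(json.dumps(list_request))
print(json.dumps(call_request, indent=2))
```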
Production Considerations
Scaling the MCP Server
For production deployments, consider these patterns:
- Connection pooling: Use a connection pool (such as psycopg2.pool.ThreadedConnectionPool) instead of creating a new connection per tool call
- Read replicas: Point MCP servers at RisingWave read replicas to isolate agent query traffic from ingestion workloads
- Rate limiting: Add rate limits to the MCP server to prevent a runaway agent from overwhelming the database
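As one possible shape for the rate-limiting item, here is a minimal token-bucket sketch using only the standard library. The class name `TokenBucket` and the chosen rate and capacity are assumptions for illustration. The sketch is not thread-safe, and a production server may prefer an existing rate-limiting library or a limit enforced at a gateway.

```python
import time


class TokenBucket:
    """Allow bursts up to `capacity`, refilling at `rate` tokens per second."""

    def __init__(self, rate: float, capacity: int, clock=time.monotonic):
        self.rate = rate
        self.capacity = capacity
        self._clock = clock  # injectable so the limiter can be tested without sleeping
        self._tokens = float(capacity)
        self._last = clock()

    def allow(self) -> bool:
        """Consume one token if available; return False when the caller should back off."""
        now = self._clock()
        elapsed = now - self._last
        self._tokens = min(self.capacity, self._tokens + elapsed * self.rate)
        self._last = now
        if self._tokens >= 1:
            self._tokens -= 1
            return True
        return False


# Hypothetical limit: sustained 5 queries per second, bursts of up to 10
limiter = TokenBucket(rate=5, capacity=10)

if limiter.allow():
    print("run the query")
else:
    print("reject with a 'rate limit exceeded' error")
```

A tool function could call `limiter.allow()` before touching the database and return a JSON error when it yields `False`, which gives the agent a clear signal to slow down instead of a timeout.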
Securing Agent Access
The MCP specification supports OAuth 2.1 for authentication, and enterprise identity provider integration is on the roadmap for Q2 2026. In the meantime:
- Run MCP servers locally using STDIO transport (no network exposure)
- For remote deployments, use the Streamable HTTP transport with bearer token authentication
- Restrict the SQL tool to SELECT-only queries (as shown in the example above)
- Use RisingWave's role-based access control to create a read-only database user for the MCP server
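The prefix check in query_risingwave accepts any string that starts with SELECT, including input such as "SELECT 1; DROP TABLE orders". The sketch below is a somewhat stricter validator, offered as an assumption-laden example, not a complete defense: the function name `is_read_only` and the keyword list are hypothetical, and the check is deliberately conservative, so it also rejects harmless queries that contain a semicolon or a listed keyword inside a string literal. A read-only database role remains the control to rely on.

```python
import re

# Keywords that should never appear in an ad-hoc read query (illustrative list)
_FORBIDDEN = re.compile(
    r"\b(INSERT|UPDATE|DELETE|DROP|ALTER|CREATE|TRUNCATE|GRANT|REVOKE)\b",
    re.IGNORECASE,
)


def is_read_only(sql: str) -> bool:
    """Return True only for a single statement that looks like a plain read."""
    statement = sql.strip().rstrip(";").strip()
    if not statement or ";" in statement:
        return False  # empty input, or more than one statement
    if not re.match(r"(?i)(SELECT|WITH)\b", statement):
        return False  # must begin with SELECT or a WITH clause
    return _FORBIDDEN.search(statement) is None


for candidate in [
    "SELECT * FROM order_status_summary",
    "SELECT 1; DROP TABLE orders",
]:
    print(candidate, "->", is_read_only(candidate))
```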
Monitoring and Observability
The RisingWave MCP server includes tools for monitoring streaming infrastructure: fragment tracking, actor monitoring, backfill progress, and cluster health checks. You can expose these as MCP tools so that an operations agent can monitor the streaming pipeline itself, not just query business data.
What Is the Model Context Protocol (MCP)?
MCP (Model Context Protocol) is an open standard for connecting AI applications to external data sources and tools. It defines a client-server architecture where AI agents discover and invoke tools exposed by MCP servers using JSON-RPC 2.0 over STDIO or HTTP transport. MCP was created by Anthropic and is now supported by all major AI platforms including OpenAI, Google, and Microsoft.
How Does RisingWave's PostgreSQL Compatibility Help with MCP Integration?
RisingWave is wire-compatible with PostgreSQL, which means any PostgreSQL client, driver, or MCP server works with it without modification. You connect to RisingWave using the same psql command, the same psycopg2 library, and the same JDBC drivers you already use with PostgreSQL. This eliminates the need for a custom connector, and it means existing PostgreSQL MCP servers (such as the official PostgreSQL MCP server) can connect to RisingWave directly.
When Should I Use a Streaming Database Instead of a Regular Database for AI Agents?
Use a streaming database when your agent needs to answer questions about data that changes continuously, such as order pipelines, user activity, sensor readings, or financial transactions. A regular database requires you to run batch refresh jobs to keep materialized views current. RisingWave updates materialized views incrementally and synchronously as new data arrives, so the agent always queries the latest state without waiting for a refresh cycle.
Can I Use the Existing RisingWave MCP Server or Do I Need to Build My Own?
RisingWave provides an official MCP server with over 100 built-in tools for querying data, managing schemas, and monitoring streaming jobs. For most use cases, this server is sufficient. Build a custom MCP server only when you need to expose a curated set of tools with specific business logic, restrict agent access to particular views, or add custom response formatting.
Conclusion
Connecting a streaming database to AI agents through MCP creates a system where agents can access continuously updated data using a standardized protocol. The key takeaways:
- MCP provides a universal integration layer that works across Claude, ChatGPT, VS Code Copilot, and other AI platforms, so you build the server once
- RisingWave's PostgreSQL compatibility means zero custom integration work. Any PostgreSQL MCP server connects to RisingWave directly
- Materialized views are the bridge between streaming data and agent queries. They pre-compute results so agents get sub-millisecond responses
- The architecture separates concerns cleanly: streaming pipeline, query layer, and agent logic are independent components
- Production readiness requires attention to connection pooling, access control, and rate limiting, but the fundamentals are straightforward
The combination of MCP's standardized tool discovery with RisingWave's always-fresh materialized views gives AI agents something they have not had before: real-time awareness of the world, accessible through a simple SQL query.
Ready to try this yourself? Get started with RisingWave in 5 minutes with the Quickstart.
Join our Slack community to ask questions and connect with other stream processing developers.

