AI agents are only as useful as the data they can reach. An agent that cannot query your database cannot answer questions about your product, your users, or your infrastructure. For years, every team that wanted AI agents to read from a database had to build its own connector: a bespoke function that called the database, formatted results, and returned them to the model. Multiply that by every database, every framework, and every model, and you get an explosion of custom glue code.
The Model Context Protocol (MCP) is Anthropic's open standard for solving this problem. It defines a universal interface that lets any AI agent discover and call external tools, including database queries, without custom per-integration code. This guide explains what MCP is, how it works at the protocol level, and how to expose a streaming database as MCP tools so AI agents always query fresh, real-time data.
By the end, you will have a working Python MCP server that connects to RisingWave and makes continuously updated materialized views available to any MCP-compatible AI agent.
What Is the Model Context Protocol?
The Model Context Protocol (MCP) is an open-source specification created by Anthropic in late 2024 for connecting AI applications to external data sources and tools. It defines a client-server architecture with a standardized message format built on JSON-RPC 2.0.
Before MCP, integrating an AI agent with a database looked like this: you wrote a custom function in your agent code, hardcoded the database connection, formatted the query result as a string, and added it to the model's context. Every database required its own implementation. Every model required its own integration. The ecosystem was fragmented.
MCP replaces that pattern with a single standard:
- MCP servers expose capabilities (tools, resources, prompts) over a defined protocol
- MCP clients are embedded in AI hosts (Claude Desktop, VS Code Copilot, custom agent frameworks)
- MCP hosts (applications like Claude or Cursor) create client connections and orchestrate calls
The analogy that circulates in the developer community is apt: MCP is the USB-C of AI integration. You build the server once, and it works with any host that speaks the protocol. By early 2026, the SDK had reached over 97 million monthly downloads, and all major AI platforms, including OpenAI, Google, and Microsoft, had adopted it.
The Three MCP Primitives
MCP defines three types of capabilities an MCP server can expose:
Tools are executable functions the AI agent can invoke. Think of them like POST endpoints: the agent calls them with arguments, they do something (query a database, call an API), and return a result. Tools are the primary mechanism for database access.
Resources are read-only data sources. They behave more like GET endpoints: the agent reads structured content such as file contents, database schemas, or configuration documents. Resources are useful for providing context the model needs before it can reason, not for interactive queries.
Prompts are reusable templates that structure how the agent interacts with the model. They are less relevant for database integration but useful for standardizing complex multi-step reasoning patterns.
For database integration, you will mostly work with tools.
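To make the primitives concrete, here is how each looks in the MCP Python SDK's FastMCP API, which this guide uses later. This is an illustrative sketch; the function names and bodies are placeholders:

from mcp.server.fastmcp import FastMCP

mcp = FastMCP("demo")

@mcp.tool()  # executable function the agent can invoke
def count_orders() -> int:
    """Return the current number of orders."""
    return 42  # placeholder result

@mcp.resource("demo://schema")  # read-only context the agent can load
def schema() -> str:
    """Describe the data model."""
    return "orders(order_id, amount, created_at)"

@mcp.prompt()  # reusable interaction template
def investigate(metric: str) -> str:
    """Template for a structured investigation request."""
    return f"Analyze recent changes in {metric} and list likely causes."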
Transport Mechanisms
MCP supports two transport mechanisms:
- STDIO: The MCP server runs as a local subprocess. The host communicates via standard input and output. This is the default for local development and tools like Claude Desktop.
- Streamable HTTP: The MCP server runs as a remote HTTP service with Server-Sent Events for streaming. This is the right transport for production deployments where the server needs to be shared across multiple agents or run in the cloud.
For this guide, we use STDIO, which requires no network configuration and works out of the box with Claude Desktop.
Why Databases Need Special Consideration in MCP
Exposing a database through MCP is conceptually straightforward, but a few practical decisions shape how well the integration works.
Stale Data Is a Silent Failure
Most databases return data as of the last insert or update. If your AI agent queries a table of order statuses and the last batch job ran two hours ago, the agent confidently answers based on stale state. The user sees a wrong answer and has no way to know the data was outdated.
This is where a streaming database changes the equation. RisingWave ingests continuous event streams from sources like Kafka or database CDC, and maintains materialized views that update incrementally as each new event arrives. When an MCP tool queries a materialized view, it reads the current computed result. Not a batch from two hours ago. The current result, updated milliseconds after the latest event.
The Curated Tool Approach vs. Pass-Through SQL
There are two philosophies for exposing a database through MCP:
Pass-through SQL: Expose a single tool that accepts any SQL query and executes it. Simple to build, but the model has to invent every query from scratch. The model can easily write a query that returns too much data, runs too slowly, or joins the wrong tables.
Curated tools: Expose one function per logical question. get_active_users_last_hour(), get_error_rate_by_tier(), get_top_events_by_source(). Each function has a descriptive docstring that tells the model what it answers and when to use it. The model picks the right tool for the question rather than constructing a query.
The curated approach produces more reliable agent behavior because the model does not need to know your schema. It reads the tool descriptions and calls the appropriate function. This guide builds curated tools while also including a fallback read-only SQL tool for ad-hoc questions.
Schema as a Resource
Before an agent can query a database intelligently, it helps to know what is in it. MCP resources let you expose the database schema as a structured document the agent reads at the start of a session. This is particularly useful when you have many tables and the agent needs to understand the data model before choosing which tool to call.
Setting Up RisingWave as a Streaming Data Source
RisingWave is wire-compatible with PostgreSQL, which means every PostgreSQL client library connects to it without modification. You use psycopg2, asyncpg, or any PostgreSQL driver. This is the right foundation for an MCP server because all existing PostgreSQL tooling works immediately.
Start RisingWave Locally
If you do not have RisingWave running, install and start it:
# macOS
brew install risingwavelabs/risingwave/risingwave
risingwave single-node --store-directory /tmp/risingwave-data &
Verify the connection:
psql -h localhost -p 4566 -U root -d dev -c "SELECT version();"
version
------------------------------------------------
PostgreSQL 13.14.0-RisingWave-2.8.0 (Homebrew)
(1 row)
Create Tables and Insert Sample Data
We will use an event tracking scenario: users generating events from different sources (API, web, CLI, SDK). The AI agent will answer questions like "which users have errors?" and "what is the error rate by plan tier?"
CREATE TABLE mcp_events (
event_id BIGINT,
user_id VARCHAR,
event_type VARCHAR,
source VARCHAR,
payload VARCHAR,
created_at TIMESTAMPTZ
);
CREATE TABLE mcp_users (
user_id VARCHAR PRIMARY KEY,
name VARCHAR,
tier VARCHAR,
region VARCHAR
);
Insert representative rows:
INSERT INTO mcp_users VALUES
('u_001', 'Alice Johnson', 'pro', 'us-east'),
('u_002', 'Bob Smith', 'free', 'us-west'),
('u_003', 'Carol Lee', 'enterprise', 'eu-west'),
('u_004', 'Dave Park', 'pro', 'ap-south');
INSERT INTO mcp_events VALUES
(1, 'u_001', 'query', 'api', 'SELECT count(*) FROM orders', NOW() - INTERVAL '10 minutes'),
(2, 'u_001', 'query', 'web', 'SELECT * FROM products WHERE price > 100', NOW() - INTERVAL '8 minutes'),
(3, 'u_001', 'error', 'api', 'connection timeout', NOW() - INTERVAL '5 minutes'),
(4, 'u_002', 'query', 'api', 'SELECT sum(amount) FROM payments', NOW() - INTERVAL '20 minutes'),
(5, 'u_002', 'query', 'cli', 'SHOW TABLES', NOW() - INTERVAL '15 minutes'),
(6, 'u_003', 'query', 'api', 'SELECT * FROM events LIMIT 100', NOW() - INTERVAL '3 minutes'),
(7, 'u_003', 'query', 'api', 'CREATE MATERIALIZED VIEW mv1 AS ...', NOW() - INTERVAL '2 minutes'),
(8, 'u_003', 'insert', 'sdk', 'INSERT INTO stream_data VALUES (...)', NOW() - INTERVAL '1 minute'),
(9, 'u_004', 'query', 'web', 'SELECT * FROM sensor_readings', NOW() - INTERVAL '30 minutes'),
(10, 'u_004', 'error', 'api', 'permission denied', NOW() - INTERVAL '25 minutes');
Create Materialized Views
These are the pre-computed results the MCP tools will read. RisingWave updates them incrementally as new events arrive.
User activity summary - rolling 1-hour window:
CREATE MATERIALIZED VIEW mcp_user_activity_summary AS
SELECT
e.user_id,
u.name,
u.tier,
u.region,
COUNT(*) AS total_events,
COUNT(*) FILTER (WHERE e.event_type = 'query') AS query_count,
COUNT(*) FILTER (WHERE e.event_type = 'error') AS error_count,
MAX(e.created_at) AS last_active_at
FROM mcp_events e
JOIN mcp_users u ON e.user_id = u.user_id
WHERE e.created_at > NOW() - INTERVAL '1 hour'
GROUP BY e.user_id, u.name, u.tier, u.region;
Query the view to verify the output:
SELECT * FROM mcp_user_activity_summary ORDER BY total_events DESC;
user_id | name | tier | region | total_events | query_count | error_count | last_active_at
---------+---------------+------------+----------+--------------+-------------+-------------+-------------------------------
u_003 | Carol Lee | enterprise | eu-west | 3 | 2 | 0 | 2026-04-01 07:26:14.809+00:00
u_001 | Alice Johnson | pro | us-east | 3 | 2 | 1 | 2026-04-01 07:22:14.809+00:00
u_004 | Dave Park | pro | ap-south | 2 | 1 | 1 | 2026-04-01 07:02:14.809+00:00
u_002 | Bob Smith | free | us-west | 2 | 2 | 0 | 2026-04-01 07:12:14.809+00:00
(4 rows)
Error rate by plan tier:
CREATE MATERIALIZED VIEW mcp_error_rate_by_tier AS
SELECT
u.tier,
COUNT(*) AS total_events,
COUNT(*) FILTER (WHERE e.event_type = 'error') AS error_count,
ROUND(
(COUNT(*) FILTER (WHERE e.event_type = 'error')
* 100.0 / NULLIF(COUNT(*), 0))::NUMERIC,
1
) AS error_rate_pct
FROM mcp_events e
JOIN mcp_users u ON e.user_id = u.user_id
WHERE e.created_at > NOW() - INTERVAL '1 hour'
GROUP BY u.tier;
SELECT * FROM mcp_error_rate_by_tier ORDER BY error_rate_pct DESC;
tier | total_events | error_count | error_rate_pct
------------+--------------+-------------+----------------
pro | 5 | 2 | 40.0
free | 2 | 0 | 0.0
enterprise | 3 | 0 | 0.0
(3 rows)
Event volume broken down by source:
CREATE MATERIALIZED VIEW mcp_source_breakdown AS
SELECT
source,
COUNT(*) AS event_count,
COUNT(DISTINCT user_id) AS unique_users,
COUNT(*) FILTER (WHERE event_type = 'error') AS error_count
FROM mcp_events
WHERE created_at > NOW() - INTERVAL '1 hour'
GROUP BY source;
SELECT * FROM mcp_source_breakdown ORDER BY event_count DESC;
source | event_count | unique_users | error_count
--------+-------------+--------------+-------------
api | 6 | 4 | 2
web | 2 | 2 | 0
sdk | 1 | 1 | 0
cli | 1 | 1 | 0
(4 rows)
All three materialized views update automatically as new rows arrive in mcp_events. The temporal filter WHERE created_at > NOW() - INTERVAL '1 hour' acts as a sliding window: events older than one hour fall out of the view automatically. No batch jobs, no manual cleanup.
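To watch the incremental maintenance happen, insert a new event and re-query the summary view. The event values here are illustrative; RisingWave folds the row into the view within roughly a second (its default checkpoint interval):

INSERT INTO mcp_events VALUES
    (11, 'u_002', 'error', 'api', 'rate limit exceeded', NOW());

SELECT user_id, total_events, error_count
FROM mcp_user_activity_summary
WHERE user_id = 'u_002';

Bob's error_count rises from 0 to 1 with no refresh step and no batch job.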
Building the MCP Server in Python
With the data model in place, the next step is a Python MCP server that exposes these views as tools. Install the dependencies:
pip install mcp psycopg2-binary
The MCP Python SDK's FastMCP class lets you register tools with a decorator. Each function becomes a tool the AI agent can discover and call.
#!/usr/bin/env python3
"""
MCP server exposing RisingWave streaming views as agent tools.
Run with: python risingwave_mcp_server.py
"""
import json
import psycopg2
import psycopg2.extras
from mcp.server.fastmcp import FastMCP
mcp = FastMCP("risingwave-analytics")
DB_CONFIG = {
"host": "localhost",
"port": 4566,
"user": "root",
"dbname": "dev",
}
def _run_query(sql: str, params=None) -> list[dict]:
    """Execute a read-only query and return rows as a list of dicts."""
    conn = psycopg2.connect(**DB_CONFIG)
    try:
        # RealDictCursor returns each row as a column-name -> value mapping.
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute(sql, params)
            return [dict(r) for r in cur.fetchall()]
    finally:
        # Close the connection even if the query raises, so errors from
        # agent-supplied SQL cannot leak connections.
        conn.close()
@mcp.tool()
def get_user_activity_summary() -> str:
"""Return a real-time summary of user activity over the past hour.
Each row contains the user's name, plan tier, region, total event count,
query count, error count, and timestamp of their most recent event.
Use this when asked about active users, who is using the product,
or which users have errors.
"""
rows = _run_query(
"SELECT * FROM mcp_user_activity_summary ORDER BY total_events DESC"
)
return json.dumps(rows, default=str)
@mcp.tool()
def get_error_rate_by_tier() -> str:
"""Return error rates grouped by plan tier over the past hour.
Each row contains the tier name, total event count, error count, and
error rate as a percentage. Use this when asked about reliability by plan,
which tier has the most errors, or support escalation risk.
"""
rows = _run_query(
"SELECT * FROM mcp_error_rate_by_tier ORDER BY error_rate_pct DESC"
)
return json.dumps(rows, default=str)
@mcp.tool()
def get_source_breakdown() -> str:
"""Return event volume and error count broken down by integration source.
Sources are: api, web, sdk, cli. Use this when asked about which
integration surface is most active, or where errors originate.
"""
rows = _run_query(
"SELECT * FROM mcp_source_breakdown ORDER BY event_count DESC"
)
return json.dumps(rows, default=str)
@mcp.tool()
def query_risingwave(sql: str) -> str:
"""Execute a read-only SQL SELECT query against RisingWave.
Use this for ad-hoc questions not answered by the other tools.
Only SELECT statements are permitted. Available tables and views:
mcp_events, mcp_users, mcp_user_activity_summary,
mcp_error_rate_by_tier, mcp_source_breakdown.
"""
    stripped = sql.strip().upper()
    # Naive allow-list guard: blocks obvious writes and multi-statement input.
    # Pair it with a read-only database user (see Production Considerations)
    # for real protection.
    if not stripped.startswith(("SELECT", "SHOW")) or ";" in stripped.rstrip(";"):
        return json.dumps({"error": "Only a single SELECT query is allowed."})
rows = _run_query(sql)
return json.dumps(rows, default=str)
@mcp.resource("risingwave://schema")
def get_schema() -> str:
"""Database schema for the RisingWave analytics instance.
Lists all tables and materialized views with their columns and types.
Read this resource to understand the data model before writing queries.
"""
rows = _run_query("""
SELECT table_name, column_name, data_type
FROM information_schema.columns
WHERE table_schema = 'public'
AND table_name LIKE 'mcp_%'
ORDER BY table_name, ordinal_position
""")
return json.dumps(rows, default=str)
if __name__ == "__main__":
mcp.run(transport="stdio")
A few things worth noting about this implementation:
Docstrings are tool descriptions. The text in each function's docstring becomes the description the AI model reads when deciding which tool to call. Write them as answers to the question "when would I use this?" not "what does this do?" The model reads tool descriptions before it sees any data, so a clear description is the single biggest factor in reliable tool selection.
The schema resource is a complement to tools. The @mcp.resource decorator exposes the database schema as a named resource. The agent can read it with a resources/read request for risingwave://schema to understand the data model before writing ad-hoc SQL. This is particularly useful when an agent is new to the database or when you expose many tables.
Connection-per-call is fine for low volume. For production workloads, replace the per-call connections with psycopg2.pool.ThreadedConnectionPool. For this guide, one connection per query call keeps the code simple and easy to follow.
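Before wiring the server into a host, you can exercise it by hand with the MCP Inspector, the debugging UI from the MCP project (assuming Node.js is installed):

npx @modelcontextprotocol/inspector python risingwave_mcp_server.py

The Inspector launches the server over STDIO, shows its tools and resources, and lets you invoke them interactively, which is much faster than restarting Claude Desktop after every change.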
Connecting to Claude Desktop
Save the server script to a path on your machine, then register it in Claude Desktop's configuration file.
On macOS, the configuration file is at ~/Library/Application Support/Claude/claude_desktop_config.json. Open it and add:
{
"mcpServers": {
"risingwave-analytics": {
"command": "python",
"args": ["/path/to/risingwave_mcp_server.py"]
}
}
}
Restart Claude Desktop. Open a new conversation and ask: "Which users have errors in the past hour?" Claude discovers the tools via tools/list, identifies get_user_activity_summary as the right tool from its description, calls it, and returns a human-readable answer based on the live data from RisingWave.
Connecting to VS Code Copilot
For VS Code, create a .vscode/mcp.json file in your project:
{
"servers": {
"risingwave-analytics": {
"type": "stdio",
"command": "python",
"args": ["/path/to/risingwave_mcp_server.py"]
}
}
}
After saving, VS Code Copilot in agent mode discovers the tools automatically. You can ask it questions about your data directly from the editor, and it will call the appropriate tool against your live RisingWave instance.
TypeScript Alternative: Using the Official MCP SDK
If your team prefers TypeScript, the MCP TypeScript SDK provides the same capabilities through its Server class. Here is the Python server above, ported to TypeScript:
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import {
CallToolRequestSchema,
ListToolsRequestSchema,
} from "@modelcontextprotocol/sdk/types.js";
import pg from "pg";
const pool = new pg.Pool({
host: "localhost",
port: 4566,
user: "root",
database: "dev",
});
async function runQuery(sql: string): Promise<Record<string, unknown>[]> {
const client = await pool.connect();
try {
const result = await client.query(sql);
return result.rows;
} finally {
client.release();
}
}
const server = new Server(
{ name: "risingwave-analytics", version: "1.0.0" },
{ capabilities: { tools: {} } }
);
server.setRequestHandler(ListToolsRequestSchema, async () => ({
tools: [
{
name: "get_user_activity_summary",
description:
"Return a real-time summary of user activity over the past hour. " +
"Use this when asked about active users, errors, or who is using the product.",
inputSchema: { type: "object", properties: {} },
},
{
name: "get_error_rate_by_tier",
description:
"Return error rates grouped by plan tier. " +
"Use this when asked about reliability by plan or support escalation risk.",
inputSchema: { type: "object", properties: {} },
},
{
name: "get_source_breakdown",
description:
"Return event volume broken down by integration source (api, web, sdk, cli).",
inputSchema: { type: "object", properties: {} },
},
],
}));
server.setRequestHandler(CallToolRequestSchema, async (request) => {
const queryMap: Record<string, string> = {
get_user_activity_summary:
"SELECT * FROM mcp_user_activity_summary ORDER BY total_events DESC",
get_error_rate_by_tier:
"SELECT * FROM mcp_error_rate_by_tier ORDER BY error_rate_pct DESC",
get_source_breakdown:
"SELECT * FROM mcp_source_breakdown ORDER BY event_count DESC",
};
const sql = queryMap[request.params.name];
if (!sql) {
throw new Error(`Unknown tool: ${request.params.name}`);
}
const rows = await runQuery(sql);
return {
content: [{ type: "text", text: JSON.stringify(rows, null, 2) }],
};
});
const transport = new StdioServerTransport();
await server.connect(transport);
Install dependencies with npm install @modelcontextprotocol/sdk pg, compile with TypeScript, and point your Claude Desktop or VS Code configuration at the compiled output. The behavior is identical to the Python version: the agent discovers the three tools, calls the appropriate one, and gets fresh data from RisingWave's materialized views.
How the Full Request Flow Works
When an AI agent receives a question, it goes through a specific sequence before returning an answer:
1. tools/list: The agent (MCP host) sends a tools/list request to the server at connection time. The server returns the list of available tools with their names, descriptions, and input schemas.
2. Model selects tool: The language model reads the tool list and picks the best match for the user's question based on the descriptions.
3. tools/call: The agent sends a tools/call request with the tool name and any arguments.
4. Server executes query: The MCP server runs the corresponding SELECT against RisingWave's materialized view.
5. Result returned: RisingWave returns the pre-computed result in sub-millisecond time. The MCP server formats it as JSON and sends it back.
6. Agent answers: The model uses the tool result as context and generates a natural language answer.
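Concretely, steps 1 and 3 travel as JSON-RPC 2.0 requests, and step 5 as the matching response. A tools/call exchange for the get_error_rate_by_tier tool built earlier looks roughly like this (the id and result payload are illustrative):

Request (step 3):

{
  "jsonrpc": "2.0",
  "id": 7,
  "method": "tools/call",
  "params": {
    "name": "get_error_rate_by_tier",
    "arguments": {}
  }
}

Response (step 5):

{
  "jsonrpc": "2.0",
  "id": 7,
  "result": {
    "content": [
      {
        "type": "text",
        "text": "[{\"tier\": \"pro\", \"total_events\": 5, \"error_count\": 2, \"error_rate_pct\": \"40.0\"}]"
      }
    ]
  }
}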
The key performance characteristic: step 4 is a point read against a pre-computed materialized view. RisingWave does not run the aggregation when the agent asks. It returns the result that has been maintained continuously since the view was created. This is what makes streaming materialized views the right foundation for agent tools: the aggregation cost is paid upfront, once, and the agent always reads a pre-computed answer.
Production Considerations
Connection Pooling
The single-connection-per-call pattern in the examples works for local development and low traffic. For production, use a connection pool:
from psycopg2 import pool
connection_pool = pool.ThreadedConnectionPool(
minconn=2,
maxconn=10,
host="localhost",
port=4566,
user="root",
dbname="dev",
)
def _run_query(sql: str, params=None) -> list[dict]:
conn = connection_pool.getconn()
try:
cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
cur.execute(sql, params)
return [dict(r) for r in cur.fetchall()]
finally:
connection_pool.putconn(conn)
This reuses connections across calls, which is important when many agents or sessions query the server simultaneously.
Access Control
Use a read-only database user for the MCP server. In RisingWave (PostgreSQL-compatible):
CREATE USER mcp_reader WITH PASSWORD 'strong_password';
GRANT SELECT ON mcp_user_activity_summary TO mcp_reader;
GRANT SELECT ON mcp_error_rate_by_tier TO mcp_reader;
GRANT SELECT ON mcp_source_breakdown TO mcp_reader;
This ensures the MCP server can read data but cannot modify tables, drop views, or access unrelated schemas.
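Then point the server's DB_CONFIG at the restricted user, reading the password from the environment rather than hardcoding it:

import os

DB_CONFIG = {
    "host": "localhost",
    "port": 4566,
    "user": "mcp_reader",
    "password": os.environ["MCP_READER_PASSWORD"],  # set before launching the server
    "dbname": "dev",
}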
Using Streamable HTTP Transport for Remote Deployment
STDIO works only when the MCP server runs on the same machine as the AI host. For remote deployments, switch to Streamable HTTP:
# Host and port are settings on the FastMCP instance, not arguments to run()
mcp = FastMCP("risingwave-analytics", host="0.0.0.0", port=8080)

if __name__ == "__main__":
    mcp.run(transport="streamable-http")
Then point your MCP client at http://your-server:8080/mcp. Add bearer token authentication at the HTTP layer using a reverse proxy or the SDK's built-in auth hooks before exposing this externally.
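To verify a remote deployment end to end, a minimal client check with the Python SDK's Streamable HTTP client looks like this (the URL matches the deployment above; pass your bearer token as a header once auth is in place):

import asyncio

from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client

async def main():
    # Open a Streamable HTTP connection and list the server's tools.
    async with streamablehttp_client("http://your-server:8080/mcp") as (read, write, _):
        async with ClientSession(read, write) as session:
            await session.initialize()
            tools = await session.list_tools()
            print([t.name for t in tools.tools])

asyncio.run(main())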
Monitoring the Streaming Pipeline
Because RisingWave is the data backend, you can also expose monitoring views as MCP tools. For example, a view that tracks how many events have arrived in the last minute gives the agent a way to detect if the streaming pipeline has stalled. See How to Monitor and Debug Streaming SQL Pipelines for a walkthrough of the key metrics to track.
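A minimal sketch of such a freshness check, with a hypothetical view name, pairs a one-minute temporal filter with a tool the agent can call:

CREATE MATERIALIZED VIEW mcp_pipeline_freshness AS
SELECT
    COUNT(*) AS events_last_minute,
    MAX(created_at) AS latest_event_at
FROM mcp_events
WHERE created_at > NOW() - INTERVAL '1 minute';

@mcp.tool()
def get_pipeline_freshness() -> str:
    """Return the event count for the last minute and the latest event time.
    Use this to check whether the streaming pipeline is still ingesting data.
    """
    rows = _run_query("SELECT * FROM mcp_pipeline_freshness")
    return json.dumps(rows, default=str)

If events_last_minute drops to zero while upstream traffic should be flowing, the agent can flag a stalled pipeline instead of silently answering from a frozen view.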
Frequently Asked Questions
What is the Model Context Protocol (MCP)?
The Model Context Protocol is an open standard created by Anthropic that defines how AI applications connect to external data sources and tools. It uses a client-server architecture where MCP servers expose tools, resources, and prompts, and MCP clients (embedded in AI applications like Claude Desktop or VS Code Copilot) discover and invoke them. The protocol uses JSON-RPC 2.0 over STDIO or HTTP transport.
How is MCP different from function calling or tool use in the OpenAI API?
Function calling is a per-model feature: you define tools in your API request, and the model returns a structured call to one of them. MCP is a standalone protocol: you build a server once, and any MCP-compatible host can connect to it, regardless of which underlying model or API it uses. MCP tools are also server-hosted and reusable across sessions, whereas function definitions in the OpenAI API are typically inlined per request.
Why use a streaming database instead of a regular PostgreSQL database as the MCP backend?
A regular PostgreSQL database returns data as of the last INSERT or UPDATE. If your agent queries a table that is refreshed by a batch job every 30 minutes, the agent works with data that is up to 30 minutes stale. A streaming database like RisingWave updates materialized views incrementally as each new event arrives, so the agent always reads the current state. This matters for any use case where the data changes faster than your batch job runs: activity monitoring, error detection, operational dashboards. For a deeper comparison, see Building AI Agent Memory with a Streaming Database.
Can I use the existing PostgreSQL MCP server with RisingWave?
Yes. Because RisingWave is wire-compatible with PostgreSQL, any existing PostgreSQL MCP server connects to it without modification. Point the PostgreSQL MCP server's connection string at postgresql://root@localhost:4566/dev and it works. The advantage of building a custom server (as shown in this guide) is that you can expose curated, named tools with descriptive docstrings rather than giving the agent raw access to the schema.
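For example, the reference PostgreSQL server from the modelcontextprotocol servers repository can be registered in Claude Desktop with nothing more than the connection string (assuming npx is available):

{
  "mcpServers": {
    "risingwave": {
      "command": "npx",
      "args": [
        "-y",
        "@modelcontextprotocol/server-postgres",
        "postgresql://root@localhost:4566/dev"
      ]
    }
  }
}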
Conclusion
The Model Context Protocol gives AI agents a standardized way to discover and call external tools. For database integration, it eliminates the custom connector problem: you build one MCP server, and it works with Claude Desktop, VS Code Copilot, and any other MCP-compatible host.
The key takeaways from this guide:
- MCP defines three primitives: tools (executable functions), resources (read-only context), and prompts (reusable templates). For database access, tools are the primary mechanism.
- Curated tools outperform pass-through SQL because the model selects tools based on descriptions, not by writing queries. Well-named tools with clear docstrings produce more reliable agent behavior.
- RisingWave's PostgreSQL compatibility means any PostgreSQL driver connects to it, and any PostgreSQL MCP server works with it out of the box. No custom adapter required.
- Materialized views are the right target for MCP tools because they pre-compute results. When the agent calls a tool, it reads a pre-computed answer in sub-millisecond time rather than waiting for a fresh aggregation.
- Production deployments need connection pooling, a read-only database user, and Streamable HTTP transport for remote access.
For a deeper look at how streaming databases support AI agents beyond MCP, including session context, user preference aggregation, and temporal filters, see Streaming SQL for AI: From Raw Events to Agent-Ready Context.
Ready to build your first MCP server with RisingWave? Get started in 5 minutes with the RisingWave Quickstart.
Join our Slack community to ask questions and connect with other stream processing developers.

