Introduction
You spent three weeks building a RAG pipeline. The embeddings are clean. The retrieval scores look good. The demo works perfectly. Then it goes to production, and a week later the support team starts forwarding user complaints: the assistant is recommending a product that was discontinued, quoting a refund policy that changed last month, and explaining a feature the engineering team removed in the last release.
You dig in and find the problem immediately. The indexing job runs nightly, so the knowledge base can be up to 24 hours stale. Every document change between midnight runs is invisible to the retrieval layer.
The standard fix is to run the indexing job more often: hourly instead of nightly. But this only shrinks the problem; it does not solve it. You still have a gap. You still have a cron job to maintain. And you still re-embed every document in the corpus on each run, even the 99.7% that have not changed.
This tutorial shows a different approach. You will build a continuous RAG pipeline where embeddings update the moment source documents change. No scheduled jobs. No full re-indexing. The pipeline is defined entirely in SQL and runs inside RisingWave, a streaming database that maintains materialized views incrementally.
The Problem with Static RAG
A static RAG pipeline works like this: a batch job queries your document store, chunks each document, calls an embedding API, and pushes the results to a vector database. This happens once, or on a schedule. Between runs, the vector index is frozen.
The failure mode is straightforward. Your documents live in a PostgreSQL table. An editor updates a policy document at 10 AM. The next indexing job runs at midnight. For fourteen hours, every query that retrieves that document gets the old version. The LLM reads stale content and generates a confident but incorrect answer.
The deeper problem is that the batch model mismatches the nature of document updates. Most content management systems are event-driven. A document is updated, a flag changes, a status moves from draft to published. These are discrete events, not hourly intervals. Treating them as hourly is an approximation that introduces error.
There is also a cost problem. If your knowledge base has 50,000 documents and 500 of them change per day, a full daily reindex calls the embedding API 50,000 times. A pipeline that embeds only the 500 changed documents calls it 500 times. The streaming approach is not just fresher; it makes 100x fewer embedding API calls, which means roughly 100x lower embedding cost when documents are of similar size.
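The staleness and cost figures in this section can be checked with a few lines of arithmetic. This is a minimal sketch: the function names are made up for illustration, the dates are arbitrary, and it assumes one embedding call per document (no chunking).

```python
from datetime import datetime


def staleness_hours(changed_at: datetime, next_index_run: datetime) -> float:
    """Hours during which retrieval still serves the old version of a document."""
    return (next_index_run - changed_at).total_seconds() / 3600


def embedding_calls(corpus_size: int, changed_per_day: int) -> dict:
    """Daily embedding API calls: full reindex vs. changed documents only."""
    return {
        "full_reindex": corpus_size,
        "incremental": changed_per_day,
        "ratio": corpus_size / changed_per_day,
    }


# Document edited at 10 AM, nightly job runs at midnight.
print(staleness_hours(datetime(2025, 1, 6, 10, 0), datetime(2025, 1, 7, 0, 0)))  # 14.0
print(embedding_calls(50_000, 500))
# {'full_reindex': 50000, 'incremental': 500, 'ratio': 100.0}
```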
Architecture Overview
The continuous RAG pipeline described in this tutorial has four components:
- PostgreSQL (document store): The operational database where authors create and update documents. Every INSERT, UPDATE, and DELETE is captured in real time via CDC.
- RisingWave (streaming SQL engine): Reads the CDC stream and maintains a materialized view that holds the current state of each document, including its embedding. When a document changes, only that row is recomputed.
- Vector search: The materialized view's vector(n) column supports similarity queries using the <=> (cosine distance) operator directly inside RisingWave, which is PostgreSQL wire-compatible.
- LLM agent: A Python application that queries RisingWave for relevant context and passes it to the language model.
The data path:
PostgreSQL (documents) → CDC → RisingWave → document_embeddings (MV) → similarity search → LLM
No Kafka. No Debezium. No orchestration layer. RisingWave connects directly to the PostgreSQL WAL via its native CDC connector.
Step-by-Step Implementation
Step 1: Create the Documents Source Table
This table lives in PostgreSQL. Authors write to it through your CMS, content API, or directly via SQL. The updated_at column tracks when each document was last modified.
-- In PostgreSQL (your operational database)
CREATE TABLE documents (
    id BIGINT PRIMARY KEY,
    title TEXT,
    content TEXT,
    updated_at TIMESTAMPTZ DEFAULT NOW()
);
For RisingWave to read this table via CDC, PostgreSQL needs logical replication enabled. Set wal_level = logical in postgresql.conf (the change takes effect only after a PostgreSQL restart) and create a publication:
-- In PostgreSQL
CREATE PUBLICATION risingwave_pub FOR TABLE documents;
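Before moving on, it can help to confirm both prerequisites from a script. The sketch below is one possible check, not part of RisingWave: the function name check_cdc_prerequisites is hypothetical, and it expects any DB-API cursor connected to PostgreSQL (for example, one from psycopg2). It reads wal_level with SHOW and looks up the table in the pg_publication_tables catalog view.

```python
def check_cdc_prerequisites(cur, publication: str, table: str) -> list[str]:
    """Return a list of problems; an empty list means both checks passed."""
    problems = []

    # Logical decoding requires wal_level = logical.
    cur.execute("SHOW wal_level")
    wal_level = cur.fetchone()[0]
    if wal_level != "logical":
        problems.append(f"wal_level is {wal_level!r}, expected 'logical'")

    # The table must be a member of the publication the CDC source reads.
    cur.execute(
        "SELECT 1 FROM pg_publication_tables "
        "WHERE pubname = %s AND tablename = %s",
        (publication, table),
    )
    if cur.fetchone() is None:
        problems.append(f"table {table!r} is not in publication {publication!r}")

    return problems


# Usage (assumes a reachable PostgreSQL instance and psycopg2 installed):
# import psycopg2
# conn = psycopg2.connect(host="postgres.internal", dbname="myapp", user="cdc_reader")
# with conn.cursor() as cur:
#     print(check_cdc_prerequisites(cur, "risingwave_pub", "documents"))
```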
Step 2: Connect RisingWave to PostgreSQL via CDC
In RisingWave, create a CDC source that reads from the PostgreSQL WAL. This gives you a live stream of every change to the documents table.
-- In RisingWave
CREATE SOURCE docs_cdc WITH (
    connector = 'postgres-cdc',
    hostname = 'postgres.internal',
    port = '5432',
    username = 'cdc_reader',
    password = 'secure_password',
    database.name = 'myapp',
    slot.name = 'risingwave_docs_slot',
    publication.name = 'risingwave_pub'
);

CREATE TABLE documents (
    id BIGINT PRIMARY KEY,
    title TEXT,
    content TEXT,
    updated_at TIMESTAMPTZ
) FROM docs_cdc TABLE 'public.documents';
RisingWave now mirrors the documents table. Any INSERT, UPDATE, or DELETE in PostgreSQL appears in this table within seconds.
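One way to see the "within seconds" behavior for yourself is to write a row in PostgreSQL and poll RisingWave until it shows up. The helper below is a rough sketch with hypothetical names; it takes any zero-argument callable that returns a row or None, so it does not depend on a particular client library.

```python
import time
from typing import Callable, Optional


def wait_until_visible(
    fetch: Callable[[], Optional[tuple]],
    timeout_s: float = 30.0,
    poll_interval_s: float = 0.5,
) -> Optional[float]:
    """Poll `fetch` until it returns a row; return elapsed seconds, or None on timeout."""
    start = time.monotonic()
    while time.monotonic() - start < timeout_s:
        if fetch() is not None:
            return time.monotonic() - start
        time.sleep(poll_interval_s)
    return None


# Usage (assumes rw_cur is a psycopg2 cursor connected to RisingWave and
# a row with id 42 was just inserted in PostgreSQL):
# def fetch():
#     rw_cur.execute("SELECT id FROM documents WHERE id = %s", (42,))
#     return rw_cur.fetchone()
# print(wait_until_visible(fetch))
```

The measured value includes the polling interval, so treat it as an upper bound on replication lag rather than an exact figure.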
Step 3: Create a Helper Function for Embeddings
RisingWave has a built-in openai_embedding function that calls the OpenAI embedding API directly from SQL. Wrap it in a user-defined function to keep downstream SQL clean.
CREATE FUNCTION embed(t VARCHAR) RETURNS REAL[]
LANGUAGE sql AS $$
    SELECT openai_embedding('sk-your-openai-key', 'text-embedding-3-small', t)
$$;
This function takes a text string and returns a REAL[] array of 1536 floats (the dimension for text-embedding-3-small). You can call it in any SQL statement, including inside materialized view definitions.
Step 4: Create the Materialized View That Auto-Embeds Documents
This is the core of the pipeline. The materialized view calls embed() for every document and stores the result as a vector(1536) column. RisingWave maintains this view incrementally: when a document row changes in the source table, only that row is re-embedded.
CREATE MATERIALIZED VIEW document_embeddings AS
SELECT
    id,
    title,
    content,
    updated_at,  -- kept so searches can filter on recency
    embed(content)::vector(1536) AS embedding
FROM documents;
When you run this statement, RisingWave begins backfilling all existing documents. After backfill completes, it processes each new change as it arrives. A document updated in PostgreSQL will have a fresh embedding in document_embeddings within seconds.
Step 5: Run Similarity Search Queries
With embeddings in a vector(1536) column, you can run similarity searches directly in RisingWave using the <=> operator (cosine distance). Pass the user's query embedding as a parameter.
SELECT
    id,
    title,
    content,
    embedding <=> embed($1)::vector(1536) AS score
FROM document_embeddings
ORDER BY score
LIMIT 5;
The $1 placeholder holds the user's query string. The embed() function converts it to a vector on the fly. The <=> operator computes cosine distance between the query vector and each stored embedding. Lower values mean higher similarity. The query returns the five most relevant documents.
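To make the ranking concrete, here is a small pure-Python sketch of what the query computes, assuming the conventional definition of cosine distance as 1 minus cosine similarity. The function names and the two-dimensional toy vectors are illustrative only; real embeddings from text-embedding-3-small have 1536 dimensions.

```python
import math


def cosine_distance(a: list[float], b: list[float]) -> float:
    """1 - cosine similarity: 0 for the same direction, 1 for orthogonal vectors."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same dimension")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        raise ValueError("cosine distance is undefined for a zero vector")
    return 1.0 - dot / (norm_a * norm_b)


def top_k(query: list[float], docs: list[tuple], k: int = 5) -> list[tuple]:
    """Rank (doc_id, embedding) pairs by ascending distance, like ORDER BY score LIMIT k."""
    scored = [(doc_id, cosine_distance(query, emb)) for doc_id, emb in docs]
    return sorted(scored, key=lambda pair: pair[1])[:k]


# Toy corpus: "refunds" points the same way as the query, "shipping" is orthogonal.
docs = [
    ("refunds", [1.0, 0.0]),
    ("shipping", [0.0, 1.0]),
    ("returns", [0.9, 0.1]),
]
for doc_id, score in top_k([1.0, 0.0], docs, k=2):
    print(doc_id, round(score, 4))
```

The smallest distance comes first, which is why the SQL query sorts by score in ascending order.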
You can also add filters to scope the search. Filtering on updated_at requires that the document_embeddings view includes that column in its SELECT list:
-- Filter to documents updated in the last 30 days
SELECT
    id,
    title,
    content,
    embedding <=> embed($1)::vector(1536) AS score
FROM document_embeddings
WHERE updated_at > NOW() - INTERVAL '30 days'
ORDER BY score
LIMIT 5;
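For production workloads with large document sets, RisingWave supports HNSW indexing on vector columns. Index syntax and options can differ between RisingWave releases, so confirm the statement below against the CREATE INDEX documentation for the version you run: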
CREATE INDEX ON document_embeddings USING hnsw (embedding vector_cosine_ops);
What Happens When a Document Is Updated
The incremental recomputation behavior is worth explaining explicitly, because it is the key advantage over batch pipelines.
When an author updates a document in PostgreSQL:
1. PostgreSQL writes the change to its WAL (write-ahead log).
2. RisingWave's CDC connector reads the WAL entry within seconds.
3. RisingWave receives a diff: the old row is retracted, the new row is asserted.
4. RisingWave evaluates the materialized view query for that specific row. It calls embed(content) with the new content value.
5. The document_embeddings materialized view is updated with the new embedding.
6. Any query that reads document_embeddings from this point forward sees the fresh embedding.
Crucially, steps 3-5 affect only the row that changed. A knowledge base with 50,000 documents does not reprocess 49,999 untouched documents. The work is proportional to the change, not the size of the corpus.
This also handles deletions correctly. When a document is deleted from PostgreSQL, the CDC stream delivers a DELETE event. RisingWave retracts the corresponding row from document_embeddings. The deleted document stops appearing in search results immediately, with no manual cleanup required.
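The retract-and-assert behavior described above can be modeled in a few lines. This is a toy simulation, not how RisingWave is implemented: the class IncrementalEmbeddingView and the fake_embed function are hypothetical stand-ins, and the "embedding" is just two numbers derived from the text. It shows that the number of embedding calls tracks the number of changes, not the size of the corpus.

```python
from typing import Callable, Optional


class IncrementalEmbeddingView:
    """Toy view keyed by document id that re-embeds only the rows that change."""

    def __init__(self, embed: Callable[[str], list[float]]):
        self._embed = embed
        self.rows: dict[int, tuple[str, list[float]]] = {}
        self.embed_calls = 0

    def apply(self, doc_id: int, old: Optional[str], new: Optional[str]) -> None:
        """Apply one change: `old` is the retracted content, `new` the asserted content."""
        if old is not None:
            self.rows.pop(doc_id, None)  # retract the old row
        if new is not None:
            self.embed_calls += 1  # only the changed row is embedded
            self.rows[doc_id] = (new, self._embed(new))


def fake_embed(text: str) -> list[float]:
    """Hypothetical stand-in for an embedding API call."""
    return [float(len(text)), float(text.count(" "))]


view = IncrementalEmbeddingView(fake_embed)
for i in range(1000):  # backfill: every existing document is embedded once
    view.apply(i, None, f"document {i}")
calls_after_backfill = view.embed_calls

view.apply(7, "document 7", "document 7, revised")  # UPDATE
view.apply(8, "document 8", None)                   # DELETE

print(view.embed_calls - calls_after_backfill)  # 1: only the updated row was re-embedded
print(len(view.rows))                           # 999: the deleted row is gone
```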
Connecting to an LLM Agent
On the application side, the RAG retrieval step is a straightforward SQL query against RisingWave, which accepts PostgreSQL wire protocol connections. Use any PostgreSQL client library.
Here is a Python example using psycopg2:
import psycopg2
import openai


def rag_query(user_question: str, top_k: int = 5) -> str:
    # Connect to RisingWave via PostgreSQL wire protocol
    conn = psycopg2.connect(
        host="risingwave.internal",
        port=4566,
        dbname="dev",
        user="root",
        password=""
    )
    try:
        cur = conn.cursor()
        # Retrieve the top-k most relevant documents
        cur.execute("""
            SELECT id, title, content,
                   embedding <=> embed(%s)::vector(1536) AS score
            FROM document_embeddings
            ORDER BY score
            LIMIT %s
        """, (user_question, top_k))
        results = cur.fetchall()
        cur.close()
    finally:
        # Release the connection even if the query fails
        conn.close()

    # Build the context string from retrieved documents
    context_parts = []
    for row in results:
        doc_id, title, content, score = row
        context_parts.append(f"[{title}]\n{content}")
    context = "\n\n---\n\n".join(context_parts)

    # Send to the LLM with fresh retrieved context
    client = openai.OpenAI()
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {
                "role": "system",
                "content": (
                    "You are a helpful assistant. "
                    "Answer the user's question using only the context below. "
                    "If the answer is not in the context, say so.\n\n"
                    f"Context:\n{context}"
                )
            },
            {"role": "user", "content": user_question}
        ]
    )
    return response.choices[0].message.content
The application queries RisingWave the same way it would query PostgreSQL. There is no special SDK, no vector database client to initialize, no separate embedding service to call. The embedding happens inside the SQL query, and the result is a ranked list of documents.
A note on the embed() call in the query: it runs at query time, not at index time. The user's question gets embedded by the same function that embedded the stored documents, so the vector spaces are aligned. Because embed() calls the OpenAI API, you incur one API call per user query (for the question embedding) plus one API call per document change (for the updated document embedding). You do not pay per search result retrieved.
Production Considerations
A few things to account for before deploying this pipeline:
API key management: The embed() function from Step 3 hardcodes the OpenAI API key in SQL. For production, store the key as a RisingWave secret and reference it when you first define embed(), instead of the hardcoded version:
CREATE SECRET openai_key WITH (backend = 'meta') AS 'sk-your-key';
CREATE FUNCTION embed(t VARCHAR) RETURNS REAL[]
LANGUAGE sql AS $$
    SELECT openai_embedding(secret openai_key, 'text-embedding-3-small', t)
$$;
Rate limiting: If a bulk update changes thousands of documents simultaneously, the embedding function will issue many API calls in parallel. RisingWave respects OpenAI's rate limits by default, but consider setting appropriate parallelism limits on the materialized view for high-throughput scenarios.
Model pinning: The embedding model you use for stored documents must match the model you use for query-time embedding. If you switch from text-embedding-3-small to text-embedding-3-large, you need to rebuild the materialized view. RisingWave handles this with DROP MATERIALIZED VIEW followed by CREATE MATERIALIZED VIEW with the updated function call.
Content chunking: Long documents may exceed the token limit for the embedding model (8191 tokens for text-embedding-3-small). Implement chunking at the PostgreSQL layer or as a preprocessing step before documents enter the documents table.
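As one possible preprocessing step, the sketch below splits a document into overlapping windows before the chunks are written to the source table. It is a rough illustration with hypothetical names and defaults: it counts whitespace-separated words as a stand-in for tokens, which only approximates the model's real tokenization, so leave generous headroom below the 8191-token limit or swap in a real tokenizer.

```python
def chunk_words(text: str, max_words: int = 500, overlap: int = 50) -> list[str]:
    """Split text into overlapping windows of at most max_words words."""
    if not 0 <= overlap < max_words:
        raise ValueError("overlap must be non-negative and smaller than max_words")
    words = text.split()
    if not words:
        return []
    step = max_words - overlap  # how far each window advances
    chunks = []
    for start in range(0, len(words), step):
        chunks.append(" ".join(words[start:start + max_words]))
        if start + max_words >= len(words):
            break  # the last window already reached the end of the text
    return chunks


# A 1,200-word document becomes three chunks of 500, 500, and 300 words.
sample = " ".join(str(i) for i in range(1200))
print([len(chunk.split()) for chunk in chunk_words(sample)])  # [500, 500, 300]
```

If you store chunks as rows, each chunk needs its own primary key (for example, the document id plus a chunk index) so that CDC updates and deletes map to the right embeddings.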
FAQ
What is a continuous RAG pipeline?
A continuous RAG pipeline is a retrieval-augmented generation system where document embeddings update automatically whenever source documents change, rather than being rebuilt on a batch schedule. It uses change data capture (CDC) to detect document updates in real time and incremental view maintenance to recompute only the affected embeddings.
How does RisingWave auto-update embeddings when documents change?
RisingWave connects to PostgreSQL via CDC and reads every INSERT, UPDATE, and DELETE from the write-ahead log. When a document changes, RisingWave identifies the affected row in the document_embeddings materialized view and recomputes only that row by calling the embedding function with the new content. The rest of the embeddings remain untouched.
Can I use RisingWave with vector databases like Pinecone or Milvus?
RisingWave does not have native sinks to Pinecone or Milvus. For vector storage, you can use RisingWave's built-in vector(n) type and run similarity searches directly inside RisingWave using the <=> (cosine) and <-> (L2) operators. Alternatively, sink the structured output to a PostgreSQL database with pgvector, which RisingWave supports as a sink target.
What is the openai_embedding function in RisingWave?
openai_embedding is a built-in function in RisingWave that calls the OpenAI embedding API directly from SQL. Its signature is openai_embedding(api_key, model, input) and it returns a REAL[] array. It works inside materialized views, which means embeddings can be generated and maintained automatically as part of a streaming SQL pipeline.
Does a continuous RAG pipeline require Kafka?
No. RisingWave connects directly to PostgreSQL via native CDC without requiring Kafka, Debezium, or any message queue middleware. RisingWave reads from the PostgreSQL write-ahead log and processes changes in real time.
Conclusion
A continuous RAG pipeline removes the cron job from your architecture and replaces it with a simple guarantee: embeddings stay current because the pipeline that produces them never stops running.
Key takeaways from this tutorial:
- Static RAG pipelines fail on stale content: every batch interval is a window where updated documents produce incorrect LLM answers.
- RisingWave CDC connects directly to PostgreSQL: no Kafka, no Debezium, no middleware.
- openai_embedding works inside materialized views: one SQL statement defines the entire embedding pipeline.
- Incremental recomputation scales with change rate, not corpus size: a 50,000-document corpus where 500 documents change per day calls the embedding API 500 times per day, not 50,000.
- Vector search runs directly in RisingWave: the <=> operator on vector(n) columns returns similarity-ranked results from the same system that maintains the embeddings.

