Build a Real-Time RAG Pipeline with Streaming SQL

Introduction

Your RAG application answers questions about products, but the prices it quotes are from last night's batch run. A customer asks about availability, and the system confidently recommends an item that sold out three hours ago. The LLM response is fluent, well-structured, and completely wrong.

This is the core failure mode of batch-oriented RAG pipelines. The retrieval layer serves stale context, and no amount of prompt engineering can fix data that was outdated before the query arrived. For any domain where facts change throughout the day (inventory, pricing, policy documents, support tickets), a real-time RAG pipeline is not optional. It is a correctness requirement.

In this guide, you will build a production real-time RAG pipeline using streaming SQL in RisingWave. You will capture changes from your operational database via CDC, transform and enrich the data with materialized views, and sink fresh structured attributes to a vector database. Every code example targets RisingWave v2.8 and uses verified syntax from the official documentation.

Why Do Static RAG Pipelines Produce Stale and Inaccurate Results?

A static RAG pipeline is one where documents are embedded once during an indexing job and stored in a vector database. The retrieval layer searches those embeddings at query time, but the underlying data may have changed hours or days ago. This gap between the source of truth and the retrieval index is the root cause of stale RAG responses.

The batch embedding problem

Most RAG tutorials follow the same pattern: load documents, chunk them, generate embeddings, and push to a vector store. This works for static knowledge bases, but production data is not static. Consider what happens when:

  • Prices change: Your product catalog updates pricing multiple times per day, but the embeddings still reference yesterday's prices.
  • Inventory depletes: Items sell out, but the vector store still contains chunks describing them as "in stock."
  • Policies update: Return policies, warranty terms, or compliance rules change, but the RAG system serves the old version.

The problem is not the embedding model. The problem is that the pipeline treats indexing as a one-time event rather than a continuous process.
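The one-shot pattern can be sketched in a few lines of Python. Note that `chunk` and `embed` here are toy stand-ins (sentence splitting and length-based vectors), not real implementations:

```python
# A runnable sketch of the batch indexing pattern described above.
def chunk(doc: str) -> list[str]:
    # Stand-in chunker: split on sentences
    return [s.strip() for s in doc.split(".") if s.strip()]

def embed(text: str) -> list[float]:
    # Stand-in for a real embedding API call
    return [float(len(text))]

def nightly_reindex(documents: dict[str, str]) -> dict:
    index = {}
    for doc_id, text in documents.items():      # every document, every run
        for i, piece in enumerate(chunk(text)):
            index[(doc_id, i)] = embed(piece)
    return index                                # frozen until the next run

index = nightly_reindex({"p1": "Great headphones. In stock."})
print(len(index))  # 2
```

Anything that changes after `nightly_reindex` finishes is invisible to retrieval until the next run.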

Vector attributes go stale faster than you think

A real-time RAG pipeline is a system where the retrieval context is continuously updated as source data changes, so that every query hits fresh, accurate information. In many RAG architectures, the vector embedding captures semantic meaning (what a product is), but the structured metadata attached to each vector (price, stock level, category, status) drives the actual usefulness of the response. These attributes are the first to go stale.

Consider an e-commerce support chatbot. The embedding for "wireless noise-cancelling headphones" stays semantically valid for months. But the metadata fields that matter to customers change constantly:

Attribute         | Update frequency      | Staleness impact
------------------|-----------------------|------------------------------------------
Price             | Multiple times daily  | Customer sees wrong price, erodes trust
Stock status      | Every transaction     | Bot recommends unavailable items
Shipping estimate | Hourly                | Inaccurate delivery promises
Review score      | With each new review  | Outdated social proof

When a customer asks "What are your best headphones under $200?", the LLM needs current pricing to filter correctly. Stale metadata means the answer is wrong even though the retrieval found the right product.

Example: a support RAG that recommends discontinued products

A real scenario illustrates the cost. An electronics retailer runs a customer support RAG agent. The product catalog lives in PostgreSQL. The RAG pipeline runs a nightly batch job: extract all products, generate embeddings, and upsert to Pinecone with metadata (price, availability, category).

On Tuesday at 2 PM, the team discontinues a popular laptop model and marks it as status = 'discontinued' in PostgreSQL. But the next batch job does not run until midnight. For ten hours, the support bot recommends a product customers cannot buy. Each failed recommendation generates a support ticket, and each ticket costs the company $15-25 in agent time.

The fix is not to run the batch job more frequently. Running full reprocessing every hour is expensive and still leaves gaps. The fix is to make the pipeline incremental: capture the status change the moment it happens and propagate it to the vector store within seconds.
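What "incremental" means in practice can be sketched as a small change handler. The names here (`apply_change`, the event shape) are illustrative, not a real API:

```python
# Hypothetical sketch: propagate a single CDC change event to the
# retrieval store instead of reprocessing the whole catalog.
def apply_change(event: dict, vector_store: dict) -> None:
    """Apply one CDC event (insert/update/delete) to a keyed store."""
    op, row = event["op"], event["row"]
    if op == "delete" or row.get("status") == "discontinued":
        # Drop the stale entry the moment the change arrives
        vector_store.pop(row["product_id"], None)
    else:
        # Upsert only the row that changed
        vector_store[row["product_id"]] = row

# Tuesday, 2 PM: the laptop is marked discontinued
store = {42: {"product_id": 42, "name": "Laptop X", "status": "active"}}
apply_change(
    {"op": "update",
     "row": {"product_id": 42, "name": "Laptop X", "status": "discontinued"}},
    store,
)
print(store)  # {} -- gone within seconds, not at midnight
```

The rest of this guide builds exactly this behavior, but declaratively in SQL, with RisingWave doing the change propagation.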

How Does Streaming SQL Keep RAG Context Fresh?

Streaming SQL keeps RAG context fresh by continuously processing database changes as they occur and maintaining up-to-date query results in materialized views. Instead of rebuilding the entire index on a schedule, you define SQL queries that RisingWave maintains incrementally. When a row changes in your source database, the downstream materialized views update within seconds.

Architecture overview

The architecture for a streaming SQL RAG pipeline has four stages:

  1. CDC from source databases: RisingWave connects directly to your PostgreSQL (or MySQL) transaction log and captures every INSERT, UPDATE, and DELETE as it happens.
  2. Streaming transformation: Materialized views in RisingWave join, filter, and enrich the raw CDC data into structured product context.
  3. Sink to vector database: A sink connector pushes the continuously updated results to your vector store (PostgreSQL with pgvector, or any supported downstream system).
  4. RAG query: Your application queries the vector database with embeddings and retrieves context that reflects the current state of your operational data.

PostgreSQL (Source DB) → CDC → RisingWave (Streaming SQL) → Materialized Views → Sink → Vector DB (pgvector / Pinecone) → RAG Application → LLM Response

This design separates concerns cleanly. Your operational database stays untouched. RisingWave handles the transformation logic. The vector database receives a continuous feed of fresh attributes. And your RAG application queries a store that is always current.

Materialized views compute structured context continuously

A materialized view in RisingWave is a precomputed query result that updates incrementally as the underlying data changes. Unlike traditional databases where materialized views require manual refresh, RisingWave maintains them automatically in real time.

This is the key mechanism for RAG freshness. You write a SQL query that defines the context you want to attach to each vector. RisingWave evaluates that query continuously. When a product price changes in the source database, the materialized view updates within seconds, and the sink pushes the new value to your vector store.

SQL example: live product context from PostgreSQL CDC

Here is a complete pipeline that captures product, inventory, and pricing data from PostgreSQL and produces a continuously updated context record for each product.

Step 1: Create the CDC source

-- Connect to the source PostgreSQL database via CDC
-- Requires PostgreSQL with wal_level = logical
CREATE SOURCE ecommerce_cdc WITH (
    connector = 'postgres-cdc',
    hostname = 'prod-postgres.internal',
    port = '5432',
    username = 'cdc_reader',
    password = 'secure_password',
    database.name = 'ecommerce',
    slot.name = 'risingwave_rag_slot',
    publication.name = 'rag_publication'
);

Step 2: Create CDC tables for each upstream table

-- Products table from PostgreSQL
CREATE TABLE products (
    product_id INT PRIMARY KEY,
    name VARCHAR,
    description TEXT,
    category VARCHAR,
    status VARCHAR,
    updated_at TIMESTAMPTZ
) FROM ecommerce_cdc TABLE 'public.products';

-- Inventory table from PostgreSQL
CREATE TABLE inventory (
    product_id INT PRIMARY KEY,
    warehouse VARCHAR,
    quantity INT,
    reserved INT,
    last_updated TIMESTAMPTZ
) FROM ecommerce_cdc TABLE 'public.inventory';

-- Pricing table from PostgreSQL
CREATE TABLE pricing (
    product_id INT PRIMARY KEY,
    base_price NUMERIC,
    discount_pct NUMERIC,
    effective_from TIMESTAMPTZ,
    effective_until TIMESTAMPTZ
) FROM ecommerce_cdc TABLE 'public.pricing';

Step 3: Create a materialized view that joins everything into RAG-ready context

-- This materialized view produces a fresh context record
-- for each active product, updated in real time
CREATE MATERIALIZED VIEW product_rag_context AS
SELECT
    p.product_id,
    p.name AS product_name,
    p.description,
    p.category,
    p.status,
    -- Live inventory: available = total minus reserved
    COALESCE(i.quantity, 0) - COALESCE(i.reserved, 0) AS available_stock,
    CASE
        WHEN COALESCE(i.quantity, 0) - COALESCE(i.reserved, 0) > 10 THEN 'in_stock'
        WHEN COALESCE(i.quantity, 0) - COALESCE(i.reserved, 0) > 0 THEN 'low_stock'
        ELSE 'out_of_stock'
    END AS stock_status,
    -- Live pricing with discount applied
    pr.base_price,
    pr.discount_pct,
    ROUND(pr.base_price * (1 - COALESCE(pr.discount_pct, 0) / 100), 2)
        AS current_price,
    -- Structured context string for RAG retrieval
    CONCAT(
        p.name, ' | ',
        p.category, ' | ',
        '$', ROUND(pr.base_price * (1 - COALESCE(pr.discount_pct, 0) / 100), 2)::VARCHAR, ' | ',
        CASE
            WHEN COALESCE(i.quantity, 0) - COALESCE(i.reserved, 0) > 10 THEN 'In Stock'
            WHEN COALESCE(i.quantity, 0) - COALESCE(i.reserved, 0) > 0 THEN 'Low Stock'
            ELSE 'Out of Stock'
        END
    ) AS context_string
FROM products p
LEFT JOIN inventory i ON p.product_id = i.product_id
LEFT JOIN pricing pr ON p.product_id = pr.product_id
WHERE p.status != 'discontinued';

The context_string column produces output like:

Sony WH-1000XM5 | Electronics | $278.00 | In Stock

This string (or the individual columns) becomes the metadata attached to your vector embedding. When the price drops or inventory changes, the materialized view updates automatically, and the sink pushes the new context downstream.

The cost advantage: incremental updates vs full reprocessing

Batch RAG pipelines reprocess every document on each run, regardless of how many actually changed. If your catalog has 500,000 products and 200 changed since the last run, you still pay for 500,000 embedding generations and 500,000 upserts.

Streaming SQL pipelines are incremental by design. RisingWave only recomputes the materialized view rows affected by the incoming change. If 200 products changed, only 200 rows flow through the pipeline. The cost scales with the rate of change, not the size of the dataset.

For a catalog of 500,000 products with a 0.5% hourly change rate:

Approach              | Records processed per hour | Embedding API calls per hour
----------------------|----------------------------|-----------------------------
Hourly batch reindex  | 500,000                    | 500,000
Streaming incremental | ~2,500                     | ~2,500

That is a 200x reduction in compute and API costs, while delivering sub-minute freshness instead of hourly lag.
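The arithmetic behind those numbers:

```python
catalog_size = 500_000
hourly_change_rate = 0.005  # 0.5% of products change per hour

batch_calls = catalog_size                                 # full reindex touches every row
streaming_calls = int(catalog_size * hourly_change_rate)   # only changed rows flow through

print(streaming_calls)                 # 2500
print(batch_calls // streaming_calls)  # 200 -> a 200x reduction in API calls
```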

What Does a Production Real-Time RAG Architecture Look Like?

A production real-time RAG architecture combines CDC ingestion, streaming SQL transformation, vector database sinking, and application-layer retrieval into a single continuous pipeline. Each component has a clear responsibility, and data flows from the operational database to the LLM response without batch boundaries.

Full architecture

The production system has four layers:

Real-Time RAG Pipeline Architecture: Operational Layer (PostgreSQL, MySQL) → CDC → Streaming Layer (RisingWave materialized views) → Sink → Vector Layer (pgvector, Pinecone) → Application Layer (RAG + LLM)

Let's walk through each step with complete SQL.

Step 1: Ingest from operational databases via CDC

You already saw the CDC source setup in the previous section. For a production system, you may also capture additional tables like customer reviews and support policies:

-- Reviews table: captures customer feedback in real time
CREATE TABLE reviews (
    review_id INT PRIMARY KEY,
    product_id INT,
    rating NUMERIC,
    review_text TEXT,
    created_at TIMESTAMPTZ
) FROM ecommerce_cdc TABLE 'public.reviews';

-- Support policies: captures policy document updates
CREATE TABLE support_policies (
    policy_id INT PRIMARY KEY,
    title VARCHAR,
    content TEXT,
    effective_date TIMESTAMPTZ,
    status VARCHAR
) FROM ecommerce_cdc TABLE 'public.support_policies';

RisingWave reads these changes directly from the PostgreSQL WAL, with no Kafka or Debezium middleware required. For details on configuring PostgreSQL for CDC, see the RisingWave PostgreSQL CDC documentation.

Step 2: Transform and enrich with streaming SQL

Beyond the product_rag_context view, you can build additional materialized views that aggregate review data and combine it with product context:

-- Aggregate review statistics per product
CREATE MATERIALIZED VIEW product_review_stats AS
SELECT
    product_id,
    COUNT(*) AS review_count,
    ROUND(AVG(rating), 1) AS avg_rating,
    COUNT(*) FILTER (WHERE rating >= 4) AS positive_reviews
FROM reviews
GROUP BY product_id;

-- Enriched product context with review data
CREATE MATERIALIZED VIEW enriched_product_context AS
SELECT
    pc.product_id,
    pc.product_name,
    pc.description,
    pc.category,
    pc.current_price,
    pc.stock_status,
    pc.available_stock,
    COALESCE(rs.avg_rating, 0) AS avg_rating,
    COALESCE(rs.review_count, 0) AS review_count,
    -- Build the full context string for RAG
    CONCAT(
        pc.product_name, '. ',
        pc.category, '. ',
        'Price: $', pc.current_price::VARCHAR, '. ',
        'Availability: ', pc.stock_status, '. ',
        'Rating: ', COALESCE(rs.avg_rating, 0)::VARCHAR,
        '/5 (', COALESCE(rs.review_count, 0)::VARCHAR, ' reviews).'
    ) AS rag_context
FROM product_rag_context pc
LEFT JOIN product_review_stats rs ON pc.product_id = rs.product_id;

Each materialized view builds on the previous one. RisingWave tracks dependencies and propagates changes through the entire chain. When a new review arrives, product_review_stats updates, which triggers enriched_product_context to update, which triggers the downstream sink. For more on how materialized views work in RisingWave, see the CREATE MATERIALIZED VIEW documentation.

Step 3: Sink fresh attributes to the vector database

The sink connector pushes the continuously updated context to your vector store. Here is an example sinking to a PostgreSQL database running pgvector:

-- Sink enriched context to PostgreSQL with pgvector
CREATE SINK product_context_to_pgvector FROM enriched_product_context WITH (
    connector = 'jdbc',
    jdbc.url = 'jdbc:postgresql://vector-db.internal:5432/rag_store',
    user = 'rag_writer',
    password = 'secure_password',
    table.name = 'product_vectors',
    type = 'upsert',
    primary_key = 'product_id'
);

On the pgvector side, your product_vectors table has an embedding column that your application populates (or that a trigger generates via an embedding API call). If you are new to pgvector, the pgvector GitHub repository covers installation and indexing strategies. The metadata columns (price, stock status, rating) come from RisingWave and stay current. For the full list of supported sink connectors, see the RisingWave sink documentation.
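For concreteness, here is one plausible shape for that product_vectors table on the pgvector side. This is a sketch, not part of the pipeline above: column names follow the enriched_product_context view, and the 1536 dimensions assume text-embedding-3-small; adjust both to your setup.

-- Hypothetical target table for the JDBC sink (pgvector side)
CREATE EXTENSION IF NOT EXISTS vector;

CREATE TABLE product_vectors (
    product_id    INT PRIMARY KEY,
    product_name  TEXT,
    description   TEXT,
    category      TEXT,
    current_price NUMERIC,
    stock_status  TEXT,
    available_stock INT,
    avg_rating    NUMERIC,
    review_count  INT,
    rag_context   TEXT,
    embedding     vector(1536)  -- populated by the application, not the sink
);

-- Approximate nearest-neighbor index for cosine distance
CREATE INDEX ON product_vectors USING hnsw (embedding vector_cosine_ops);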

You can also generate embeddings directly inside RisingWave using the built-in openai_embedding function (introduced in v2.5.0), keeping the entire pipeline in SQL:

-- Generate embeddings within RisingWave (requires OpenAI API key)
CREATE FUNCTION text_embedding(t VARCHAR) RETURNS REAL[]
LANGUAGE sql AS $$
    SELECT openai_embedding('your-openai-api-key', 'text-embedding-3-small', t)
$$;

-- Materialized view with auto-generated embeddings
CREATE MATERIALIZED VIEW product_embeddings AS
SELECT
    product_id,
    product_name,
    rag_context,
    current_price,
    stock_status,
    avg_rating,
    text_embedding(rag_context) AS embedding
FROM enriched_product_context;

This approach means every time a product's price, stock, or rating changes, a new embedding is generated automatically. For high-volume catalogs, you may prefer to generate embeddings on the application side and only sink the structured metadata from RisingWave.
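If you do generate embeddings on the application side, batching the API calls is the main lever: OpenAI's embeddings endpoint accepts a list of inputs per request. A minimal batching helper, with the embedding call injected so the sketch stays API-agnostic (`embed_fn` is a stand-in you replace with a real client call):

```python
from typing import Callable, List

def embed_in_batches(texts: List[str],
                     embed_fn: Callable[[List[str]], List[List[float]]],
                     batch_size: int = 100) -> List[List[float]]:
    """Embed texts in fixed-size batches; embed_fn wraps the real API call."""
    vectors: List[List[float]] = []
    for i in range(0, len(texts), batch_size):
        vectors.extend(embed_fn(texts[i:i + batch_size]))
    return vectors

# Usage with a stand-in embed_fn (replace with your embedding client):
fake_embed = lambda batch: [[float(len(t))] for t in batch]
vecs = embed_in_batches(["a", "bb", "ccc"], fake_embed, batch_size=2)
print(vecs)  # [[1.0], [2.0], [3.0]]
```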

Step 4: RAG application queries the vector database with fresh context

On the application side, your RAG query now retrieves context that reflects the current state of your data:

import psycopg2
import openai

def query_rag(user_question: str, top_k: int = 5):
    # Generate an embedding for the user's question
    response = openai.embeddings.create(
        model="text-embedding-3-small",
        input=user_question
    )
    query_embedding = response.data[0].embedding

    # Query pgvector with fresh metadata filters; psycopg2 adapts the
    # Python list to a SQL array, which pgvector casts to a vector
    conn = psycopg2.connect("postgresql://rag_reader:password@vector-db:5432/rag_store")
    try:
        with conn.cursor() as cur:
            cur.execute("""
                SELECT product_name, rag_context, current_price, stock_status
                FROM product_vectors
                WHERE stock_status != 'out_of_stock'
                ORDER BY embedding <=> %s::vector
                LIMIT %s
            """, (query_embedding, top_k))
            results = cur.fetchall()
    finally:
        conn.close()

    context = "\n".join(row[1] for row in results)

    # Send to the LLM with fresh context
    llm_response = openai.chat.completions.create(
        model="gpt-4",
        messages=[
            {"role": "system", "content": f"Answer based on this product data:\n{context}"},
            {"role": "user", "content": user_question}
        ]
    )
    return llm_response.choices[0].message.content

The critical detail: the WHERE stock_status != 'out_of_stock' filter works because RisingWave continuously pushes updated stock status values. A batch pipeline would still show discontinued products until the next reindex. The streaming pipeline filters them out within seconds of the status change.

FAQ

What is a real-time RAG pipeline?

A real-time RAG pipeline is a retrieval-augmented generation system where the retrieval context is continuously updated as source data changes, rather than being rebuilt on a batch schedule. It uses streaming ingestion (typically CDC) and incremental processing to ensure that every query retrieves current, accurate information from the vector database.

How is streaming SQL different from batch ETL for RAG?

Streaming SQL processes changes incrementally as they occur, while batch ETL reprocesses the entire dataset on a schedule. For RAG, this means streaming SQL updates only the vector records that changed (scaling with change rate), while batch ETL regenerates all records regardless of what changed (scaling with dataset size). Streaming SQL delivers sub-minute freshness; batch ETL typically delivers hourly or daily freshness.

Can RisingWave generate vector embeddings directly?

Yes. RisingWave v2.5.0 introduced the built-in openai_embedding function, which calls OpenAI's embedding API from within SQL. You can create a materialized view that generates embeddings for each row and updates them automatically when the source data changes. For high-throughput scenarios, you may prefer to generate embeddings on the application side and use RisingWave only for structured metadata.

Do I need Kafka to build a real-time RAG pipeline with RisingWave?

No. RisingWave supports native CDC from PostgreSQL and MySQL without requiring Kafka, Debezium, or any middleware. RisingWave connects directly to the database's transaction log and captures changes in real time. This simplifies the architecture from three or four systems down to two: your source database and RisingWave.

Conclusion

Stale retrieval context is the most common and least discussed failure mode in production RAG systems. Batch pipelines create an inherent gap between your source of truth and your retrieval layer. Streaming SQL closes that gap.

Key takeaways:

  • Batch RAG pipelines serve stale metadata: prices, stock levels, and policy changes are invisible until the next reindex.
  • CDC captures every change in real time: RisingWave reads directly from the PostgreSQL WAL without middleware.
  • Materialized views compute fresh context incrementally: only changed rows are reprocessed, reducing costs by orders of magnitude.
  • The full pipeline is standard SQL: source creation, transformation, and sinking all use familiar SQL syntax.
  • Sub-minute freshness replaces hourly lag: your RAG application always retrieves current information.

Ready to try this yourself? Try RisingWave Cloud free, no credit card required. Sign up here.

Join our Slack community to ask questions and connect with other stream processing developers.
