Real-Time Data Pipeline for RAG Applications

Your RAG application answered a customer question with yesterday's pricing. A support agent trusted the response and quoted the wrong number. The customer escalated. Sound familiar?

Retrieval-Augmented Generation (RAG) connects large language models to external knowledge bases so they can generate accurate, grounded responses. But most RAG implementations rely on batch pipelines that refresh context every few hours, or worse, once a day. In fast-moving domains like e-commerce, fintech, and customer support, stale context means wrong answers.

The fix is not a better embedding model or a fancier prompt template. The fix is a real-time data pipeline that keeps your vector store current as source data changes. In this article, you will learn how to build a streaming data pipeline that captures changes from source databases via CDC, transforms and enriches them with materialized views in RisingWave, and delivers fresh context documents to your vector database continuously.

Why Batch RAG Pipelines Fail

A typical batch RAG pipeline looks like this: a scheduled job runs every N hours, queries source databases, generates embeddings, and loads them into a vector store. Between runs, the world moves on. Products go out of stock, prices change, support articles get updated, policies shift.

The consequences are predictable:

  • Stale answers: An LLM retrieves context that no longer reflects reality. A product marked "in stock" was sold out two hours ago.
  • Inconsistent state: Different documents in the vector store reflect different points in time. A pricing document says $299, but the product description still references a $199 launch promotion.
  • Missed context: New records created after the last batch run simply do not exist in the vector store. A critical support resolution posted 30 minutes ago is invisible to the RAG system.
  • Wasted compute: Full re-indexing jobs process millions of unchanged documents to catch the few thousand that actually changed.

According to industry benchmarks, 68% of production RAG deployments exceed 2-second P95 latencies, and stale data compounds that latency problem with inaccuracy. When users lose trust in AI-generated responses, adoption drops.

The Streaming Alternative: CDC to Vector Store

A streaming architecture replaces the batch cycle with a continuous pipeline:

graph LR
    A[PostgreSQL / MySQL] -->|CDC| B[RisingWave]
    B -->|Materialized Views| C[Transform & Enrich]
    C -->|Sink| D[Kafka / Elasticsearch]
    D -->|Consumer| E[Vector DB - Pinecone / Weaviate]
    E -->|Retrieval| F[LLM Context]

Each component plays a specific role:

  1. CDC (Change Data Capture) reads the source database's transaction log and emits every insert, update, and delete as a structured event the moment it is committed.
  2. RisingWave ingests these CDC events as streaming tables, applying the same SQL semantics you already know.
  3. Materialized views transform, join, and enrich raw CDC records into context-ready documents, maintained incrementally as new events arrive.
  4. Sinks push the continuously updated results to downstream systems like Kafka or Elasticsearch.
  5. A vector database consumer reads from the sink, generates embeddings, and upserts them into Pinecone, Weaviate, or another vector store.

The result: when a product price changes in PostgreSQL, the updated context document reaches your vector store within seconds, not hours.
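The flow above can be sketched in miniature as a toy in-memory simulation. Every name here is hypothetical; in a real deployment these stand-ins are RisingWave, an embedding API, and a vector database:

```python
# Toy end-to-end flow: CDC event -> transform -> embed -> vector upsert
def transform(event):
    """Stand-in for a RisingWave materialized view: builds a context doc."""
    row = event["row"]
    return {
        "id": row["product_id"],
        "context": f"Product: {row['name']}. Price: ${row['price']}.",
    }

def embed(text):
    """Stand-in for a real embedding API call."""
    return [float(ord(c)) for c in text[:4]]

vector_store = {}  # stand-in for Pinecone/Weaviate

def handle_cdc_event(event):
    doc = transform(event)          # materialized-view step
    vec = embed(doc["context"])     # embedding-worker step
    vector_store[doc["id"]] = vec   # upsert into the vector store

# A price change committed in the source DB arrives as a CDC update event
handle_cdc_event({"op": "update",
                  "row": {"product_id": 1, "name": "Widget", "price": 299}})
print(sorted(vector_store))  # → [1]
```

The point of the sketch: nothing is polled on a schedule. Each committed change drives exactly one pass through transform, embed, and upsert.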

Building the Pipeline with RisingWave

Let's walk through a concrete implementation. Imagine you run an e-commerce platform with a customer-facing AI assistant. The assistant uses RAG to answer questions about products, pricing, availability, and known issues.

Step 1: Ingest CDC from Source Databases

RisingWave supports native CDC from PostgreSQL and MySQL without requiring middleware like Debezium or Kafka Connect. You create a table with a CDC connector, and RisingWave reads the transaction log directly.

-- Ingest products table from PostgreSQL via CDC
CREATE TABLE products (
    product_id INT PRIMARY KEY,
    name VARCHAR,
    description VARCHAR,
    category VARCHAR,
    price DECIMAL,
    stock_quantity INT,
    updated_at TIMESTAMPTZ
) WITH (
    connector = 'postgres-cdc',
    hostname = 'your-postgres-host',
    port = '5432',
    username = 'cdc_reader',
    password = 'your-password',
    database.name = 'ecommerce',
    schema.name = 'public',
    table.name = 'products',
    slot.name = 'products_slot'
);

This creates a streaming table in RisingWave that mirrors your PostgreSQL products table. Every insert, update, and delete in the source is automatically reflected. You can create similar CDC tables for related data:

-- Ingest product categories
CREATE TABLE product_categories (
    category VARCHAR PRIMARY KEY,
    parent_category VARCHAR,
    description VARCHAR
) WITH (
    connector = 'postgres-cdc',
    hostname = 'your-postgres-host',
    port = '5432',
    username = 'cdc_reader',
    password = 'your-password',
    database.name = 'ecommerce',
    schema.name = 'public',
    table.name = 'product_categories',
    slot.name = 'categories_slot'
);

-- Ingest customer support tickets
CREATE TABLE customer_support_tickets (
    ticket_id INT PRIMARY KEY,
    product_id INT,
    issue_summary VARCHAR,
    resolution VARCHAR,
    created_at TIMESTAMPTZ
) WITH (
    connector = 'postgres-cdc',
    hostname = 'your-postgres-host',
    port = '5432',
    username = 'cdc_reader',
    password = 'your-password',
    database.name = 'ecommerce',
    schema.name = 'public',
    table.name = 'customer_support_tickets',
    slot.name = 'tickets_slot'
);
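Note that for these CDC connectors to work, the source PostgreSQL instance must have logical replication enabled, and the cdc_reader role needs replication privileges plus read access to the tables. Roughly (consult the PostgreSQL and RisingWave documentation for the full checklist):

```sql
-- On the source PostgreSQL instance (requires a restart after changing wal_level)
ALTER SYSTEM SET wal_level = 'logical';

-- A dedicated role for the CDC connector
CREATE USER cdc_reader WITH REPLICATION PASSWORD 'your-password';
GRANT SELECT ON public.products, public.product_categories,
                public.customer_support_tickets TO cdc_reader;
```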

Step 2: Transform and Enrich with Materialized Views

Raw CDC records are not useful as RAG context on their own. A product row with category = 'ai-ml' means nothing to an LLM without the category's full description. A product listing without associated support tickets misses critical context.

Materialized views in RisingWave let you join, aggregate, and format data into context-ready documents. These views are incrementally maintained: when a source record changes, only the affected rows in the materialized view are recomputed.

First, create an enriched view that joins products with their categories:

CREATE MATERIALIZED VIEW enriched_products AS
SELECT
    p.product_id,
    p.name AS product_name,
    p.description AS product_description,
    p.category,
    pc.parent_category,
    pc.description AS category_description,
    p.price,
    p.stock_quantity,
    p.updated_at
FROM products p
JOIN product_categories pc ON p.category = pc.category;

Querying this view returns fully enriched records:

 product_id |     product_name     |                       product_description                        |    category     | parent_category |            category_description             | price  | stock_quantity
------------+----------------------+------------------------------------------------------------------+-----------------+-----------------+---------------------------------------------+--------+---------------
          1 | RisingWave Cloud Pro | Fully managed streaming database with auto-scaling and support   | cloud-services  | platform        | Managed cloud infrastructure and services   | 299.99 |          1000
          2 | StreamProcessor SDK  | Python SDK for building custom stream processing pipelines       | developer-tools | platform        | SDKs, CLIs, and developer utilities         |   0.00 |          9999
          3 | DataFlow Monitor     | Real-time monitoring dashboard for streaming data pipelines      | observability   | operations      | Monitoring, alerting, and debugging tools   |  49.99 |           500
          4 | Vector Search Addon  | Built-in vector similarity search for AI and ML workloads        | ai-ml           | platform        | AI and machine learning features and addons |  99.99 |           750

Next, build RAG-ready context documents by concatenating fields into natural language text that an LLM can use directly:

CREATE MATERIALIZED VIEW rag_product_context AS
SELECT
    p.product_id,
    p.name AS product_name,
    CONCAT(
        'Product: ', p.name, '. ',
        'Description: ', p.description, '. ',
        'Category: ', pc.description, ' (', pc.parent_category, '). ',
        'Price: $', CAST(p.price AS VARCHAR), '. ',
        'In stock: ', CAST(p.stock_quantity AS VARCHAR), ' units.'
    ) AS context_document,
    p.updated_at
FROM products p
JOIN product_categories pc ON p.category = pc.category;

The output is a clean, structured text document for each product:

 product_id |     product_name     | context_document
------------+----------------------+-------------------------------------------------------------------------------------
          1 | RisingWave Cloud Pro | Product: RisingWave Cloud Pro. Description: Fully managed streaming database with
            |                      | auto-scaling and dedicated support. Category: Managed cloud infrastructure and
            |                      | services (platform). Price: $299.99. In stock: 1000 units.
          2 | StreamProcessor SDK  | Product: StreamProcessor SDK. Description: Python SDK for building custom stream
            |                      | processing pipelines. Category: SDKs, CLIs, and developer utilities (platform).
            |                      | Price: $0.00. In stock: 9999 units.
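Because the context document is just string assembly, the same format is easy to reproduce, and unit-test, outside the database. A hypothetical helper mirroring the CONCAT above (field names assumed from the table schemas):

```python
def build_context_document(product, category):
    """Mirrors the CONCAT in rag_product_context (names hypothetical)."""
    return (
        f"Product: {product['name']}. "
        f"Description: {product['description']}. "
        f"Category: {category['description']} ({category['parent_category']}). "
        f"Price: ${product['price']:.2f}. "
        f"In stock: {product['stock_quantity']} units."
    )

doc = build_context_document(
    product={"name": "RisingWave Cloud Pro",
             "description": "Fully managed streaming database",
             "price": 299.99, "stock_quantity": 1000},
    category={"description": "Managed cloud infrastructure and services",
              "parent_category": "platform"},
)
print(doc)
```

Keeping a client-side twin of the formatting logic lets you assert in CI that the documents your LLM sees have the shape you expect.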

For richer context, add support ticket information using aggregation:

CREATE MATERIALIZED VIEW rag_product_context_with_support AS
SELECT
    p.product_id,
    p.name AS product_name,
    CONCAT(
        'Product: ', p.name, '. ',
        'Description: ', p.description, '. ',
        'Category: ', pc.description, '. ',
        'Price: $', CAST(p.price AS VARCHAR), '. ',
        'Stock: ', CAST(p.stock_quantity AS VARCHAR), ' units. ',
        'Known issues: ', COALESCE(
            STRING_AGG(t.issue_summary, '; ' ORDER BY t.created_at DESC),
            'None reported'
        ), '.'
    ) AS context_document,
    p.updated_at,
    COUNT(t.ticket_id) AS open_tickets
FROM products p
JOIN product_categories pc ON p.category = pc.category
LEFT JOIN customer_support_tickets t ON p.product_id = t.product_id
GROUP BY p.product_id, p.name, p.description, p.category,
         pc.description, p.price, p.stock_quantity, p.updated_at;

Now the context document includes live support data:

 product_id |     product_name     | context_document                                                  | open_tickets
------------+----------------------+-------------------------------------------------------------------+--------------
          1 | RisingWave Cloud Pro | Product: RisingWave Cloud Pro. ... Known issues: Connection        |            2
            |                      | timeout after 30 seconds; Auto-scaling not triggering during       |
            |                      | peak load.                                                        |
          3 | DataFlow Monitor     | Product: DataFlow Monitor. ... Known issues: Dashboard latency    |            1
            |                      | spikes above 5 seconds.                                           |
          2 | StreamProcessor SDK  | Product: StreamProcessor SDK. ... Known issues: None reported.    |            0

When a new support ticket is filed, the materialized view updates automatically. The next time your RAG system retrieves context for that product, it includes the latest known issues.

Step 3: Sink to Downstream Systems

RisingWave supports sinking data to Kafka, Elasticsearch, and many other systems. For a RAG pipeline, two common patterns work well:

Pattern A: Sink to Kafka, consume with a vector DB ingestion worker

CREATE SINK product_updates_to_kafka
FROM rag_product_context
WITH (
    connector = 'kafka',
    properties.bootstrap.server = 'kafka-broker:9092',
    topic = 'rag-product-updates',
    type = 'upsert',
    primary_key = 'product_id'
) FORMAT UPSERT ENCODE JSON;

A downstream consumer reads from the rag-product-updates Kafka topic, generates embeddings via an embedding API (such as OpenAI's text-embedding-3-small), and upserts them into Pinecone or Weaviate. This decouples the streaming pipeline from the embedding generation, letting you scale each independently.

Pattern B: Sink to Elasticsearch for hybrid search

CREATE SINK product_context_to_es
FROM rag_product_context
WITH (
    connector = 'elasticsearch',
    index = 'product_context',
    url = 'http://elasticsearch:9200',
    username = 'elastic',
    password = 'your-password'
);

Elasticsearch supports both keyword search and k-NN vector search, making it a viable option when you need hybrid retrieval (combining semantic similarity with keyword matching).
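A hybrid retrieval request against that index might combine a keyword clause with a k-NN clause. This is a sketch in the Elasticsearch 8.x style; it assumes the index also stores a dense_vector field named "embedding" populated by the embedding worker:

```python
# Hypothetical hybrid search body: BM25 keyword match plus approximate
# nearest-neighbor search over an embedding field.
query_vector = [0.1] * 1536  # stand-in for a real query embedding

hybrid_search = {
    "query": {                     # lexical half: BM25 over the context text
        "match": {"context_document": "auto-scaling support"}
    },
    "knn": {                       # semantic half: approximate nearest neighbors
        "field": "embedding",
        "query_vector": query_vector,
        "k": 10,
        "num_candidates": 100,
    },
    "size": 10,
}
print(sorted(hybrid_search))
```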

Step 4: Embedding and Vector Store Ingestion

On the consumer side, a lightweight Python worker reads from Kafka and writes to your vector database:

from kafka import KafkaConsumer
from pinecone import Pinecone
import json
import openai

consumer = KafkaConsumer(
    'rag-product-updates',
    bootstrap_servers=['kafka-broker:9092'],
    key_deserializer=lambda k: json.loads(k.decode('utf-8')) if k else None,
    value_deserializer=lambda v: json.loads(v.decode('utf-8')) if v else None
)

pc = Pinecone(api_key="your-api-key")
index = pc.Index("product-context")

for message in consumer:
    if message.value is None:
        # Tombstone on the upsert topic: the source row was deleted,
        # so remove its vector instead of upserting
        index.delete(ids=[str(message.key['product_id'])])
        continue

    record = message.value
    # Generate an embedding for the updated context document
    embedding = openai.embeddings.create(
        model="text-embedding-3-small",
        input=record['context_document']
    ).data[0].embedding

    # Upsert into Pinecone, keyed by product_id
    index.upsert(vectors=[{
        'id': str(record['product_id']),
        'values': embedding,
        'metadata': {
            'product_name': record['product_name'],
            'updated_at': record['updated_at']
        }
    }])

This worker runs continuously. Every time RisingWave pushes an updated context document to Kafka (because a source record changed), the worker re-embeds and upserts it. Your vector store stays current.
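One practical refinement: embedding APIs accept batched input, so buffering several messages per request cuts per-call overhead and cost. A hypothetical batching helper (the embed_batch function is a stand-in for a real batched embedding call):

```python
def chunked(items, size):
    """Yield fixed-size batches from a list (the last batch may be smaller)."""
    for i in range(0, len(items), size):
        yield items[i:i + size]

def embed_batch(texts):
    """Stand-in for a batched embedding call; e.g. OpenAI's embeddings
    endpoint accepts a list of inputs and returns one vector per input."""
    return [[float(len(t))] for t in texts]

docs = [f"context document {n}" for n in range(5)]
vectors = []
for batch in chunked(docs, 2):
    vectors.extend(embed_batch(batch))
print(len(vectors))  # one vector per document
```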

Batch RAG vs Streaming RAG: A Side-by-Side Comparison

 Dimension                 | Batch RAG                                           | Streaming RAG
---------------------------+-----------------------------------------------------+-----------------------------------------
 Data freshness            | Hours to days old                                   | Seconds to minutes old
 Update mechanism          | Scheduled full/incremental dump                     | Continuous CDC + incremental MV
 Compute efficiency        | Re-processes all documents each run                 | Only processes changed records
 Consistency               | Point-in-time snapshot (may be stale by query time) | Near-real-time consistency with source
 Infrastructure            | Cron job + batch ETL + embedding job                | Streaming DB + Kafka + embedding worker
 Failure recovery          | Re-run entire batch                                 | Resume from last checkpoint
 Cost at scale             | High (full recompute)                               | Low (incremental updates)
 Implementation complexity | Low (familiar batch patterns)                       | Medium (streaming concepts)

The streaming approach costs more to set up initially, but it pays off quickly in data-sensitive applications. If your AI assistant quotes a price that changed two hours ago, the cost of a wrong answer far exceeds the cost of a streaming pipeline.
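The compute difference is easy to estimate. Assuming, hypothetically, a 1M-document corpus where 0.5% of documents change per day, a daily full re-index performs 200x more embedding calls than an incremental pipeline:

```python
corpus_size = 1_000_000     # total documents (assumed)
daily_change_rate = 0.005   # 0.5% of documents change per day (assumed)

batch_embeddings_per_day = corpus_size  # full re-index embeds everything
streaming_embeddings_per_day = int(corpus_size * daily_change_rate)

reindex_ratio = batch_embeddings_per_day // streaming_embeddings_per_day
print(reindex_ratio)  # → 200
```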

RisingWave's Built-in Vector Support

Beyond serving as the streaming transformation layer, RisingWave also offers built-in vector similarity search. Starting with version 2.5, RisingWave includes an openai_embedding built-in function and supports vector indexing with HNSW (Hierarchical Navigable Small World) indexes.

This means you can build a complete RAG pipeline entirely within RisingWave for simpler use cases, without a separate vector database:

-- Create a UDF for generating embeddings
CREATE FUNCTION text_embedding(t VARCHAR) RETURNS REAL[]
LANGUAGE sql AS $$
    SELECT openai_embedding('your-api-key', 'text-embedding-3-small', t)
$$;

-- Create a materialized view with embeddings
CREATE MATERIALIZED VIEW product_embeddings AS
SELECT
    product_id,
    product_name,
    context_document,
    text_embedding(context_document) AS embedding
FROM rag_product_context;

For production workloads with millions of vectors, a dedicated vector database like Pinecone or Weaviate will provide better query performance and more features (filtered search, namespaces, hybrid ranking). But for prototyping or smaller datasets, RisingWave's built-in vector support eliminates an entire infrastructure component.
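Whichever side performs the search, retrieval ultimately reduces to a distance computation over vectors. For reference, cosine similarity, the metric most embedding models are tuned for:

```python
import math

def cosine_similarity(a, b):
    """Cosine similarity between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return dot / (norm_a * norm_b)

print(cosine_similarity([1.0, 0.0], [1.0, 0.0]))  # → 1.0 (same direction)
print(round(cosine_similarity([1.0, 0.0], [0.0, 1.0]), 6))  # → 0.0 (orthogonal)
```

An HNSW index avoids computing this against every stored vector by navigating a layered proximity graph to the approximate nearest neighbors.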

What Is a Real-Time Data Pipeline for RAG?

A real-time data pipeline for RAG is a streaming architecture that continuously captures changes from source databases, transforms them into context documents, and delivers them to a vector store so that a retrieval-augmented generation system always retrieves up-to-date information. Unlike batch pipelines that refresh on a schedule, a real-time pipeline uses Change Data Capture (CDC) to detect changes the moment they happen, processes them through incremental transformations, and pushes updated embeddings to the vector database within seconds.

How Does CDC Keep RAG Context Fresh?

CDC (Change Data Capture) keeps RAG context fresh by reading the database transaction log and emitting every insert, update, and delete as a structured event in real time. When a product price changes in PostgreSQL, CDC captures that change immediately. A streaming database like RisingWave ingests the event, recomputes any affected materialized views, and sinks the updated context document to a Kafka topic or directly to a search index. The downstream embedding worker then regenerates the vector and upserts it into the vector store. The entire cycle, from source change to updated vector, typically completes in under 10 seconds.

When Should I Use Streaming RAG Instead of Batch RAG?

Use streaming RAG when your application serves answers about data that changes frequently and where stale responses carry real consequences. E-commerce product catalogs, financial instrument pricing, customer support knowledge bases, and inventory management systems are strong candidates. If your source data changes less than once per day and tolerance for staleness is high (for example, a static documentation site), batch RAG is simpler and sufficient. The decision comes down to the cost of a wrong answer versus the cost of running a streaming pipeline.

Can RisingWave Replace a Separate Vector Database?

RisingWave can handle vector embeddings and similarity search natively, making it a viable option for simpler RAG use cases or prototyping. It supports the openai_embedding built-in function, cosine similarity UDFs, and vector indexes. For production systems with tens of millions of vectors requiring filtered search, multi-tenancy, and sub-10ms query latency, a dedicated vector database like Pinecone or Weaviate is the better choice. In that architecture, RisingWave serves as the streaming transformation layer that keeps the vector database continuously updated.

Conclusion

Keeping RAG context fresh is not an optional optimization. It is a correctness requirement for any AI application that serves answers about changing data. The core takeaways:

  • Batch RAG pipelines introduce staleness that leads to incorrect LLM responses, eroding user trust.
  • CDC captures every source change in real time, eliminating the gap between what your database knows and what your AI can retrieve.
  • Materialized views in RisingWave transform and enrich raw CDC events into structured context documents, maintained incrementally without full recomputation.
  • Sinking to Kafka or Elasticsearch bridges the gap between the streaming layer and your vector database, letting you scale embedding generation independently.
  • RisingWave's built-in vector support can simplify the architecture for smaller workloads, while dedicated vector databases handle production scale.

The architecture is straightforward: PostgreSQL (CDC) to RisingWave (transform) to Kafka (delivery) to your vector store (retrieval). Each component does what it does best, and the pipeline keeps your AI grounded in reality.


Ready to try this yourself? Get started with RisingWave in 5 minutes with the Quickstart.

Join our Slack community to ask questions and connect with other stream processing developers.
