Real-Time Data Pipeline for RAG Applications: Beyond Static Vector Stores

Your RAG application is only as fresh as its pipeline. Most RAG systems embed documents once, index them, and never update — meaning retrieved context can be days or weeks stale. A streaming database changes this: data changes in your source systems flow continuously into preprocessed, indexed documents, so every retrieval draws from the current state of your knowledge base.


The Freshness Problem in RAG

Retrieval-Augmented Generation works by embedding a user query, finding semantically similar documents in a vector store, and passing those documents as context to the LLM. The quality of the answer depends entirely on the quality and freshness of what gets retrieved.

Most RAG implementations treat the knowledge base as a static artifact. You run an ingestion job, embed your documents, push them to Pinecone or Weaviate, and the application serves from that snapshot. If underlying data changes — a product spec is updated, a support ticket is resolved, a policy document is revised — the vector store does not know.

The result is answers grounded in outdated context. The retriever returns the old version, and the LLM reasons from it with full confidence. From the user's perspective, the system simply gives wrong answers.

Why Batch Ingestion Pipelines Fail

The typical fix is to schedule a re-ingestion job. Run it nightly, re-embed everything, push the new index. This has three problems.

First, it is slow. Re-embedding an entire corpus is expensive and takes time proportional to corpus size. For large knowledge bases, nightly re-ingestion may not even complete before the next run.

Second, it is blunt. You cannot easily re-embed only changed documents without building a change detection layer — which requires comparing document content, tracking modification timestamps, and managing version state. That is its own engineering problem.

Third, it is bounded by batch cadence. Even if you run re-ingestion hourly, events that happened 59 minutes ago are not reflected. For operational data (tickets, orders, alerts, inventory), this latency matters.
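To make the second problem concrete, here is a minimal sketch of the change detection layer a batch pipeline would need. It assumes content hashing as the comparison method; the function names and the in-memory `state` dictionary are hypothetical stand-ins for whatever persistent version store a real job would use.

```python
import hashlib


def content_hash(text: str) -> str:
    """Return a stable fingerprint of a document's text."""
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


def detect_changes(previous: dict, current_docs: dict):
    """Compare hashes stored at the last run against the current corpus.

    previous:     document_id -> hash recorded at the last run
    current_docs: document_id -> current document text
    Returns (changed_ids, deleted_ids, new_state).
    """
    new_state = {doc_id: content_hash(text) for doc_id, text in current_docs.items()}
    # New documents and documents whose hash differs both count as changed
    changed = sorted(d for d, h in new_state.items() if previous.get(d) != h)
    # Anything seen last time but missing now was deleted upstream
    deleted = sorted(d for d in previous if d not in new_state)
    return changed, deleted, new_state


state = {}
# First run: every document is new
changed, deleted, state = detect_changes(state, {"a": "v1", "b": "v1"})
# Second run: "a" was edited, "b" was removed, "c" was added
changed, deleted, state = detect_changes(state, {"a": "v2", "c": "v1"})
print(changed, deleted)  # ['a', 'c'] ['b']
```

Even this small version still has to read the whole corpus on every run to compute the hashes, and it has to persist `state` between runs. Both costs grow with corpus size.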

A Streaming Pipeline for RAG

The correct model is event-driven: when source data changes, the pipeline immediately preprocesses and re-indexes the affected documents. A streaming database is the right tool for the preprocessing and enrichment layer.

The architecture has four stages:

  1. Source capture — CDC connectors capture row-level changes from production databases in real time
  2. Streaming SQL transformation — RisingWave materializes preprocessed, enriched documents as views
  3. Embedding trigger — changed documents are emitted to a Kafka topic, triggering an embedding job
  4. Vector store update — only the changed embeddings are upserted into the vector store

This is incremental by construction. Only changed data flows through the pipeline.

Stage 1: Capture Source Changes

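-- CDC table from a product knowledge base in PostgreSQL.
-- The column list is inferred from the views below; adjust names and types to the upstream table.
-- The products table joined in Stage 2 needs a similar definition.
CREATE TABLE product_docs (
    doc_id       BIGINT PRIMARY KEY,
    product_id   BIGINT,
    content      VARCHAR,
    is_published BOOLEAN,
    version      INT,
    updated_at   TIMESTAMPTZ
) WITH (
    connector = 'postgres-cdc',
    hostname = 'postgres.internal',
    port = '5432',
    username = 'cdc_user',
    password = '...',
    database.name = 'knowledge',
    schema.name = 'public',
    table.name = 'documents'
);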

-- Kafka source for support ticket updates
-- (includes every column the views below reference)
CREATE SOURCE support_tickets (
    ticket_id          BIGINT,
    product_id         BIGINT,
    status             VARCHAR,
    subject            VARCHAR,
    content            TEXT,
    resolution_summary VARCHAR,
    created_at         TIMESTAMPTZ,
    resolved_at        TIMESTAMPTZ,
    updated_at         TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'support-tickets-updates',
    properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;

Stage 2: Streaming SQL Preprocessing

This is where RisingWave adds value. Instead of embedding raw records, you preprocess documents in SQL before sending them downstream: cleaning text, joining metadata, and keeping only the relevant records, such as published docs and resolved tickets.

-- Enrich and clean product docs for embedding
CREATE MATERIALIZED VIEW rag_product_chunks AS
SELECT
    d.doc_id,
    d.product_id,
    p.product_name,
    p.category,
    -- Structured chunk for embedding: title + body with metadata context
    'Product: ' || p.product_name || E'\n'
        || 'Category: ' || p.category || E'\n'
        || 'Last updated: ' || TO_CHAR(d.updated_at, 'YYYY-MM-DD') || E'\n\n'
        || d.content                        AS chunk_text,
    d.updated_at,
    d.version
FROM product_docs d
JOIN products p ON p.product_id = d.product_id
WHERE d.is_published = TRUE;

-- Resolved support tickets as FAQ-style knowledge chunks
CREATE MATERIALIZED VIEW rag_resolved_tickets AS
SELECT
    ticket_id,
    'Support Case: ' || subject || E'\n'
        || 'Resolution: ' || resolution_summary AS chunk_text,
    resolved_at                                 AS updated_at,
    'support'                                   AS source_type
FROM support_tickets
WHERE status = 'resolved'
  AND resolution_summary IS NOT NULL
  AND resolved_at > NOW() - INTERVAL '90 days';

-- Unified RAG document feed
-- IDs are prefixed by source so a doc_id and a ticket_id with the same number cannot collide
CREATE MATERIALIZED VIEW rag_document_feed AS
SELECT 'product-' || doc_id::TEXT AS document_id, chunk_text, updated_at, 'product' AS source
FROM rag_product_chunks
UNION ALL
SELECT 'support-' || ticket_id::TEXT, chunk_text, updated_at, 'support'
FROM rag_resolved_tickets;

Stage 3: Emit Changed Documents for Embedding

RisingWave can write changes to a Kafka topic, triggering your embedding service only for changed documents.

-- Sink changed documents to Kafka for the embedding service
CREATE SINK rag_documents_to_embed
FROM rag_document_feed
WITH (
    connector = 'kafka',
    topic = 'rag-documents-pending-embed',
    properties.bootstrap.server = 'kafka:9092',
    primary_key = 'document_id'  -- upsert sinks need a key; it becomes the Kafka message key
) FORMAT UPSERT ENCODE JSON;

Your embedding service consumes from rag-documents-pending-embed, calls your embedding model (OpenAI, Cohere, or a local model), and upserts the resulting vector into your vector store keyed on document_id.

Stage 4: Vector Store Upsert (Python example)

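import json
import os

from kafka import KafkaConsumer
from openai import OpenAI
from pinecone import Pinecone

# Credentials come from the environment (OPENAI_API_KEY, PINECONE_API_KEY)
openai_client = OpenAI()
index = Pinecone(api_key=os.environ["PINECONE_API_KEY"]).Index("knowledge-base")

# Further consumer settings (security, offsets) depend on your cluster
consumer = KafkaConsumer(
    "rag-documents-pending-embed",
    bootstrap_servers="kafka:9092",
    group_id="rag-embedder",
)

for message in consumer:
    if message.value is None:
        # Tombstone: the document left the feed, so remove its vector.
        # Assumes the sink writes the key as JSON, e.g. {"document_id": "..."}
        index.delete(ids=[json.loads(message.key)["document_id"]])
        continue

    doc = json.loads(message.value)
    embedding = openai_client.embeddings.create(
        model="text-embedding-3-small",
        input=doc["chunk_text"],
    ).data[0].embedding

    # Upsert keyed on document_id, so a changed document overwrites its old vector
    index.upsert(vectors=[{
        "id": doc["document_id"],
        "values": embedding,
        "metadata": {
            "source": doc["source"],
            "updated_at": doc["updated_at"],
        },
    }])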

Only changed documents flow through this path. A document that has not changed since yesterday does not get re-embedded.

Metadata Filtering from Streaming Aggregates

Beyond document freshness, RisingWave can maintain aggregated metadata that improves retrieval precision.

-- Track per-product support ticket volume (for retrieval ranking)
CREATE MATERIALIZED VIEW product_issue_signals AS
SELECT
    product_id,
    COUNT(*)                                            AS open_ticket_count,
    COUNT(*) FILTER (WHERE status = 'critical')        AS critical_ticket_count,
    MAX(created_at)                                     AS latest_ticket_at
FROM support_tickets
WHERE status != 'resolved'
GROUP BY product_id;

This view can be queried at retrieval time to boost or filter documents. A product with 12 open critical tickets may warrant different retrieval behavior than one with zero.
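One way the boosting could look at retrieval time is sketched below. It assumes the rows of product_issue_signals have already been fetched into a dictionary, and that each vector-store match carries a `product_id` in its metadata (the Stage 4 example stores only `source` and `updated_at`, so that field would have to be added). The function name, the match shape, and the boost constants are all illustrative, not tuned values.

```python
def rerank(matches, signals, boost_per_critical=0.02, max_boost=0.2):
    """Re-order vector-store matches using per-product ticket signals.

    matches: list of {"id", "score", "product_id"} dicts (hypothetical shape)
    signals: product_id -> critical_ticket_count from product_issue_signals
    """
    rescored = []
    for match in matches:
        critical = signals.get(match["product_id"], 0)
        # Cap the boost so ticket volume cannot swamp semantic similarity
        boost = min(critical * boost_per_critical, max_boost)
        rescored.append({**match, "score": match["score"] + boost})
    return sorted(rescored, key=lambda m: m["score"], reverse=True)


matches = [
    {"id": "doc-1", "score": 0.80, "product_id": 10},
    {"id": "doc-2", "score": 0.75, "product_id": 20},
]
signals = {20: 12}  # product 20 has 12 open critical tickets
ranked = rerank(matches, signals)
print([m["id"] for m in ranked])  # ['doc-2', 'doc-1']
```

The cap matters: without it, a product with hundreds of open tickets would dominate every result list regardless of how relevant its documents are to the query.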

Comparison: RAG Pipeline Architectures

Approach                 | Freshness           | Re-indexing Cost    | Operational Complexity | Handles Deletes
-------------------------|---------------------|---------------------|------------------------|---------------------
One-time batch load      | Days/weeks stale    | Full re-embed       | Low initially, grows   | No
Scheduled re-ingestion   | Bounded by schedule | Full or incremental | Medium                 | Partial
Change detection scripts | Variable            | Incremental         | High (custom logic)    | Manual
Streaming DB + CDC       | Seconds             | Incremental only    | Low (declarative SQL)  | Yes (via tombstones)

The streaming database approach handles document deletions natively: when a CDC tombstone arrives, the materialized view drops the row, the sink emits a deletion event, and the embedding service removes the vector from the store.

RisingWave as the Preprocessing Layer

RisingWave's role here is not to replace the vector store — it complements it. RisingWave handles the structured, relational, and temporal processing that SQL is good at: joins, filtering, aggregation, text construction. The vector store handles semantic similarity search.

This separation of concerns is important. SQL is the right language for "join documents to their product metadata, filter to published records updated in the last 90 days, and format a structured chunk." It is not the right tool for cosine similarity search over dense vectors. Use each for what it is good at.


FAQ

Does RisingWave store vectors or support similarity search?

Not natively. RisingWave is a streaming SQL database, not a vector database. Its role in a RAG pipeline is the preprocessing and enrichment layer upstream of the vector store. The vector store (Pinecone, Weaviate, pgvector, etc.) handles embedding storage and ANN search.

How does the pipeline handle document deletes?

CDC connectors capture DELETE operations as tombstone events. When a document is deleted from the source, RisingWave removes it from the materialized view. The Kafka sink emits a deletion event (null value, keyed on document ID), and the consuming embedding service removes the corresponding vector from the vector store.

What is the typical end-to-end latency from source change to updated vector?

CDC capture to materialized view update is typically under 5 seconds. The embedding call (external API or local model) adds another 1-10 seconds depending on document length and model. Total pipeline latency of 10-30 seconds from source change to updated vector is realistic.

Can this pipeline handle multiple embedding models?

Yes. The Kafka topic rag-documents-pending-embed can be consumed by multiple embedding services, each using a different model. You can maintain parallel indexes (e.g., one with text-embedding-3-small for cost, one with a fine-tuned model for accuracy) from the same streaming source.
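The fan-out relies on Kafka consumer groups: services with different `group_id` values each receive every message on the topic. The sketch below builds kafka-python consumer settings per model. The group naming scheme and the second model name are placeholders, and the offset settings are one reasonable choice rather than a requirement.

```python
def consumer_kwargs(model_name: str, bootstrap_servers: str = "kafka:9092") -> dict:
    """Build kafka-python KafkaConsumer keyword arguments for one embedding service.

    Each model gets its own group_id, so Kafka delivers the full topic
    to every service independently.
    """
    return {
        "bootstrap_servers": bootstrap_servers,
        "group_id": f"rag-embedder-{model_name}",  # hypothetical naming scheme
        "auto_offset_reset": "earliest",  # a new service starts from the oldest retained message
        "enable_auto_commit": False,  # commit manually, after the vector upsert succeeds
    }


TOPIC = "rag-documents-pending-embed"
MODELS = ["text-embedding-3-small", "my-finetuned-model"]  # second name is a placeholder

configs = {model: consumer_kwargs(model) for model in MODELS}
for model, kwargs in configs.items():
    # In each service: consumer = KafkaConsumer(TOPIC, **kwargs)
    print(model, "->", kwargs["group_id"])
```

Running several instances of one service under the same `group_id` splits the topic's partitions between them, which is how a single model's embedding throughput would be scaled out.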

How do you handle chunking of long documents?

Chunking logic can be implemented in the embedding service consuming from Kafka, or you can pre-chunk in RisingWave using window functions and string operations if documents have predictable structure. For variable-length documents, chunking in application code (with overlap) is more practical.
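As a rough illustration of application-side chunking with overlap, the sketch below splits on whitespace and counts words rather than model tokens, which is a simplification. The function names and the `document_id#n` ID scheme are hypothetical.

```python
def chunk_with_overlap(text: str, chunk_size: int = 200, overlap: int = 40) -> list:
    """Split text into word-based chunks; consecutive chunks share `overlap` words."""
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must be non-negative and smaller than chunk_size")
    words = text.split()
    step = chunk_size - overlap
    chunks = []
    for start in range(0, len(words), step):
        chunks.append(" ".join(words[start:start + chunk_size]))
        if start + chunk_size >= len(words):
            break  # this window already reached the end of the text
    return chunks


def chunk_ids(document_id: str, chunks: list) -> list:
    """Derive one vector ID per chunk, e.g. '42#0', '42#1'."""
    return [f"{document_id}#{i}" for i in range(len(chunks))]


text = " ".join(f"w{i}" for i in range(10))
chunks = chunk_with_overlap(text, chunk_size=4, overlap=1)
print(chunks)  # ['w0 w1 w2 w3', 'w3 w4 w5 w6', 'w6 w7 w8 w9']
print(chunk_ids("42", chunks))  # ['42#0', '42#1', '42#2']
```

With per-chunk IDs, an update that shortens a document leaves higher-numbered chunks from the previous version behind in the vector store, so the embedding service would need to delete those explicitly.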
