Streaming for RAG: How to Keep Retrieval-Augmented Generation Fresh in Real Time

Traditional RAG systems retrieve context from batch-refreshed vector stores — embeddings computed hours ago from documents that may have changed since. This creates a fundamental accuracy problem: the LLM generates responses based on stale context, producing confident but incorrect answers. Streaming RAG solves this by continuously updating the retrieval layer as source data changes. A streaming database like RisingWave combines CDC ingestion, SQL transformations, and native vector search in a single system — keeping both structured context and vector embeddings fresh in real time.

This guide explains why batch RAG fails, how streaming RAG works, and how to implement it with architecture patterns from AWS, Confluent, and RisingWave.

The Staleness Problem in RAG

How Traditional RAG Works

Documents → Batch Embed (hourly/daily) → Vector Store → Retrieve → LLM → Response
  1. Documents are chunked and embedded during a batch job
  2. Embeddings are stored in a vector database (Pinecone, Weaviate, Chroma)
  3. When a user queries, the system retrieves the most similar embeddings
  4. Retrieved context is passed to the LLM for response generation

The problem: Between batch runs, the source data changes but the embeddings don't. The vector store serves stale context.
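To make the failure mode concrete, here is a toy sketch (the names and the character-count "embedding" are stand-ins, not a real model): the vector store snapshots document text at embed time, so a change in the source between batch runs leaves retrieval serving the old snapshot.

```python
# Toy illustration of the batch-RAG staleness window (all names hypothetical).
# The "vector store" snapshots document text at embed time; when the source
# changes between batch runs, retrieval still serves the old snapshot.

def embed(text: str) -> list:
    # Stand-in embedding: character-frequency vector (not a real model).
    vec = [0.0] * 26
    for ch in text.lower():
        if ch.isalpha():
            vec[ord(ch) - ord("a")] += 1.0
    return vec

source_db = {"policy": "Bereavement fares: apply within 90 days."}

# Batch job runs and snapshots each document into the vector store.
vector_store = {doc_id: (embed(text), text) for doc_id, text in source_db.items()}

# Later, the source document changes -- but the next batch run is hours away.
source_db["policy"] = "Bereavement fares discontinued."

# Retrieval serves the stale snapshot; the LLM answers confidently from it.
_, retrieved_text = vector_store["policy"]
print(retrieved_text)  # still the old policy text
```

The LLM never sees the updated source row, only whatever text was captured at the last embed time.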

Real-World Failures from Stale RAG

| Failure | Root Cause | Impact |
|---|---|---|
| Air Canada chatbot hallucinated a non-existent bereavement discount | Policy document changed but embeddings weren't updated | Legal consequences, public embarrassment |
| Medical RAG recommended outdated treatment | Study was retracted but vectors lingered in the index | Incorrect medical advice |
| Support agent referenced old pricing | Price changed 2 hours ago; next batch embedding run in 4 hours | Customer confusion, lost trust |
| Inventory assistant said item available | Sold out 30 minutes ago; batch runs nightly | Oversold products, fulfillment failures |

The core issue: LLMs don't know their context is stale. They synthesize authoritative answers from outdated information without any hedging or uncertainty.

Batch vs Streaming RAG: The Numbers

| Metric | Batch RAG | Streaming RAG |
|---|---|---|
| Context freshness | Hours (3-24h typical) | Sub-second to minutes |
| P95 latency | >2 seconds (68% of production RAG systems) | Sub-second achievable |
| QA accuracy improvement | Baseline | Up to 200% relative improvement |
| User drop-off from latency | 40% at >2-second P95 | Significantly lower |
| Embedding staleness window | Hours to days | Seconds to minutes |

What Is Streaming RAG?

Streaming RAG continuously updates the retrieval layer — both structured context and vector embeddings — as source data changes. Instead of batch re-embedding on a schedule, streaming RAG processes changes as they happen.

Three Approaches to Streaming RAG

1. Streaming Vector Updates (keep embeddings fresh)

Source DB → CDC → Streaming Processor → Embedding API → Vector Store → Retrieve

Every change triggers re-embedding of affected documents. The vector store always reflects the current state of source data.
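As a sketch of what "every change triggers re-embedding" means in practice (the event shape and the `embed()` stub are hypothetical; real events would come from Debezium/CDC and `embed()` would call an embedding API):

```python
# Minimal sketch of change-triggered re-embedding. The event dicts and the
# embed() stub are hypothetical stand-ins for CDC events and an embedding API.

def embed(text: str) -> list:
    # Stand-in for an embedding-model call (dimension 2 for brevity).
    return [float(len(text)), float(sum(map(ord, text)) % 997)]

vector_store: dict = {}

def on_change(event: dict) -> None:
    """Apply one CDC event to the vector store immediately."""
    if event["op"] == "delete":
        vector_store.pop(event["doc_id"], None)  # drop the stale vector
    else:  # insert or update: re-embed only the affected document
        vector_store[event["doc_id"]] = (embed(event["content"]), event["content"])

# Simulated CDC stream: insert, update, insert, delete.
for ev in [
    {"op": "insert", "doc_id": "d1", "content": "Refunds within 30 days."},
    {"op": "update", "doc_id": "d1", "content": "Refunds within 14 days."},
    {"op": "insert", "doc_id": "d2", "content": "Free shipping over $50."},
    {"op": "delete", "doc_id": "d2", "content": ""},
]:
    on_change(ev)

print(sorted(vector_store))  # ['d1'] -- d1 is current, d2 was removed
```

Note that deletes matter as much as updates: a retracted document must leave the index immediately, not at the next batch run.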

2. Streaming Structured Context (skip embeddings for known queries)

Source DB → CDC → Streaming Database → Materialized Views → Agent queries via SQL

For known context patterns (customer profile, order status, account data), pre-computed SQL views are faster and fresher than embedding-based retrieval. No embedding latency, no semantic mismatch.

3. Hybrid (best of both)

Source DB → CDC → RisingWave → Materialized Views (structured) + Vector Index (semantic)
                                      ↓                              ↓
                               SQL queries                 Similarity search
                                      ↓                              ↓
                                Agent combines both for optimal context

How Industry Leaders Build Streaming RAG

AWS: Bedrock Knowledge Bases + Kinesis

AWS connects streaming to RAG through Amazon Bedrock:

  • Custom connectors ingest streaming data (Kafka, Kinesis) directly into Bedrock Knowledge Bases
  • RetrieveAndGenerateStream API for streaming LLM responses
  • Programmatic document ingestion enables continuous updates via API calls
  • Architecture: Kinesis → Managed Flink → Bedrock Knowledge Bases

Limitations: AWS-specific, requires Kinesis + Flink + Bedrock. Multiple managed services to coordinate.

Confluent: Kafka + Flink SQL Embedding Pipelines

Confluent's approach uses Flink SQL to build real-time embedding pipelines:

Data Preparation Phase:

  1. Ingest data changes from operational databases via Kafka
  2. Flink SQL calls embedding services (OpenAI, etc.) in-flight
  3. Enriched data with vectors upserted to vector database via sink connectors

Inference Phase:

  1. User query consumed from Kafka topic
  2. Query enriched with real-time data from vector store
  3. Enriched context sent to LLM

Limitations: Requires Kafka + Flink + vector database + LLM orchestration. Complex, multi-system architecture.

RisingWave: Unified Streaming RAG

RisingWave provides streaming RAG in a single system:

Source DB → RisingWave (CDC + SQL + Vector Search) → Agent
  • CDC: Native ingestion from PostgreSQL/MySQL
  • SQL transformations: Materialized views for structured context
  • Vector search (v2.6+): Native vector(n) data type with similarity operators
  • Serving: PostgreSQL protocol for both SQL and vector queries

No separate vector database, no Kafka, no Flink.

Architecture Patterns

Pattern 1: CDC → Continuous Embedding → Vector Store

For workloads requiring semantic similarity search over frequently changing documents:

PostgreSQL → Debezium/CDC → Kafka → Embedding Service → Vector DB
                                         (OpenAI)        (Pinecone)

Every database change triggers re-embedding. The vector store reflects the current document state within seconds of the source change.

Implementation with Confluent Flink:

-- Flink SQL: embed documents in-flight
INSERT INTO document_vectors
SELECT doc_id, content,
  ML_PREDICT('openai-embedding', content) as embedding
FROM document_changes;

Pattern 2: Streaming Materialized Views as Structured Context

For workloads where the context is structured and queryable (customer data, order status, inventory):

-- RisingWave: always-current structured context
-- (pre-aggregate per customer so the multi-table joins don't fan out rows)
CREATE MATERIALIZED VIEW customer_context AS
SELECT
  c.id, c.name, c.plan, c.plan_updated_at,
  COALESCE(t.open_tickets, 0) AS open_tickets,
  COALESCE(o.spend_30d, 0) AS spend_30d,
  i.last_interaction
FROM customers c
LEFT JOIN (
  SELECT customer_id, COUNT(*) AS open_tickets
  FROM tickets WHERE status = 'open' GROUP BY customer_id) t ON c.id = t.customer_id
LEFT JOIN (
  SELECT customer_id, SUM(amount) AS spend_30d
  FROM orders WHERE created_at > NOW() - INTERVAL '30 days'
  GROUP BY customer_id) o ON c.id = o.customer_id
LEFT JOIN (
  SELECT customer_id, event AS last_interaction FROM (
    SELECT customer_id, event,
      ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY ts DESC) AS rn
    FROM interactions) ranked
  WHERE rn = 1) i ON c.id = i.customer_id;

Agent queries: SELECT * FROM customer_context WHERE id = 'cust_123' — sub-100ms, always fresh.

Key insight: For many RAG use cases, you don't need vector similarity search at all. Pre-computed SQL views provide faster, more precise context than embedding-based retrieval — especially for structured data.

Pattern 3: Hybrid RAG (Structured + Semantic)

Combine both approaches for maximum coverage:

-- Structured context (always-current via CDC)
CREATE MATERIALIZED VIEW product_context AS
SELECT product_id, name, description, price, stock_level, last_updated
FROM products;  -- CDC table from production DB

-- Vector index for semantic search
CREATE TABLE product_embeddings (
  product_id INT PRIMARY KEY,
  description_embedding VECTOR(1536)
);

-- Agent uses both:
-- 1. Exact lookup: SELECT * FROM product_context WHERE product_id = 123
-- 2. Semantic search: SELECT product_id FROM product_embeddings
--    ORDER BY description_embedding <-> $query_vector LIMIT 5

The agent combines precise structured data (current price, stock level) with semantic search (find similar products) for the richest possible context.

Streaming RAG vs Batch RAG: Architecture Comparison

| Aspect | Batch RAG | Streaming RAG (Confluent) | Streaming RAG (RisingWave) |
|---|---|---|---|
| Components | Batch ETL + vector DB + LLM | Kafka + Flink + vector DB + LLM | RisingWave (single system) |
| Context freshness | Hours | Seconds to minutes | Sub-second |
| Structured context | ❌ (vectors only) | Via Flink SQL | ✅ Materialized views |
| Vector search | ✅ Via vector DB | ✅ Via vector DB | ✅ Native (v2.6+) |
| CDC support | External (Fivetran, Airbyte) | Debezium + Kafka | Native (no middleware) |
| Query interface | Vector DB API | Kafka + vector DB API | PostgreSQL protocol (SQL) |
| Complexity | Low-medium | High | Low |
| Cost | Vector DB + compute | Kafka + Flink + vector DB | Single system |

When to Use Which Pattern

Use batch RAG when:

  • Source data changes infrequently (daily or less)
  • Freshness within hours is acceptable
  • You only need semantic similarity (no structured context)
  • Volume is low enough that full re-embedding is feasible

Use streaming structured context when:

  • Context is structured (customer data, order status, inventory)
  • Queries are known in advance
  • Sub-second freshness is required
  • You want the simplest possible architecture

Use streaming vector updates when:

  • Context is unstructured (documents, knowledge base articles)
  • Semantic similarity is the primary retrieval method
  • Documents change frequently (hourly or more)

Use hybrid streaming RAG when:

  • You need both structured and semantic retrieval
  • Different context types have different freshness requirements
  • Maximum retrieval accuracy matters

Implementation: Real-Time RAG with RisingWave

End-to-End Example: Support Knowledge Base

-- Step 1: Ingest knowledge base articles via CDC
CREATE SOURCE kb_source WITH (
  connector = 'postgres-cdc',
  hostname = 'kb-db.internal',
  port = '5432',
  username = 'rw_user',      -- placeholder credentials; the postgres-cdc
  password = '<password>',   -- connector requires them
  database.name = 'knowledge_base'
);

CREATE TABLE articles (
  article_id INT PRIMARY KEY,
  title VARCHAR,
  content TEXT,
  category VARCHAR,
  updated_at TIMESTAMP
) FROM kb_source TABLE 'public.articles';

-- Step 2: Structured context (always-current article metadata)
CREATE MATERIALIZED VIEW article_index AS
SELECT
  article_id,
  title,
  category,
  LENGTH(content) as content_length,
  updated_at,
  -- Extract first 200 chars as summary
  SUBSTRING(content, 1, 200) as summary
FROM articles;

-- Step 3: Full-text search for known queries
-- Agent queries by category and keyword
SELECT article_id, title, summary
FROM article_index
WHERE category = 'billing'
ORDER BY updated_at DESC
LIMIT 5;

-- Step 4: Vector search for semantic queries (v2.6+)
-- Store pre-computed embeddings
CREATE TABLE article_vectors (
  article_id INT PRIMARY KEY,
  embedding VECTOR(1536)
);

-- Semantic similarity search
SELECT a.article_id, a.title, a.summary
FROM article_vectors v
JOIN article_index a ON v.article_id = a.article_id
ORDER BY v.embedding <-> $query_embedding
LIMIT 5;
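The queries above read from article_vectors but leave its population implicit. A minimal sketch of that piece (the `embed_fn` and row shape are hypothetical): turn changed `(article_id, content)` rows into upsert parameters for the article_vectors table defined above.

```python
# Sketch of the pipeline that keeps article_vectors populated as articles
# change. embed_fn and the changed-row shape are hypothetical stand-ins.

def build_upserts(changed_rows, embed_fn):
    """Map changed rows to (sql, params) for cursor.executemany()."""
    # A plain INSERT, assuming RisingWave's default overwrite-on-conflict
    # behavior for tables with a primary key (verify for your version).
    sql = "INSERT INTO article_vectors (article_id, embedding) VALUES (%s, %s::vector)"
    params = [(aid, str(embed_fn(content))) for aid, content in changed_rows]
    return sql, params

def fake_embed(text: str) -> list:
    # Stand-in for a real embedding API call (dimension 2 for brevity).
    return [round(len(text) / 100, 3), 0.0]

sql, params = build_upserts(
    [(1, "How to update billing"), (2, "Reset password")], fake_embed
)
print(params[0])  # (1, '[0.21, 0.0]')
```

In production this loop would be driven by the CDC stream itself, so a changed article is re-embedded and visible to the semantic query within seconds.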

Agent Integration

import psycopg2
import openai

def get_rag_context(query: str, category: str = None) -> list:
    conn = psycopg2.connect(host="risingwave", port=4566, dbname="dev", user="root")
    cursor = conn.cursor()

    results = []

    # Structured retrieval (precise, fast)
    if category:
        cursor.execute("""
            SELECT title, summary FROM article_index
            WHERE category = %s ORDER BY updated_at DESC LIMIT 3
        """, (category,))
        results.extend([{"title": r[0], "text": r[1]} for r in cursor.fetchall()])

    # Semantic retrieval (broad, similarity-based)
    query_embedding = openai.embeddings.create(
        model="text-embedding-3-small", input=query
    ).data[0].embedding

    cursor.execute("""
        SELECT a.title, a.summary
        FROM article_vectors v
        JOIN article_index a ON v.article_id = a.article_id
        ORDER BY v.embedding <-> %s::vector
        LIMIT 3
    """, (str(query_embedding),))
    results.extend([{"title": r[0], "text": r[1]} for r in cursor.fetchall()])

    conn.close()
    return results

# Use in agent
context = get_rag_context("How do I change my billing address?", category="billing")

Both structured and semantic results are always current — updated within milliseconds of any article change in the source database.

Frequently Asked Questions

What is streaming RAG?

Streaming RAG continuously updates the retrieval layer — both structured context and vector embeddings — as source data changes, instead of batch-refreshing on a schedule. This ensures AI agents and LLMs always retrieve current information, eliminating the stale-context problem that causes hallucinations and incorrect responses.

How much does data freshness affect RAG accuracy?

Significantly. Research shows streaming RAG can improve QA accuracy by up to 200% relative to batch RAG, because the context retrieved is always current. The Air Canada chatbot incident — where a support bot hallucinated a non-existent policy due to stale RAG context — demonstrates the real-world risk of batch-refreshed retrieval.

Do I need a separate vector database for streaming RAG?

Not necessarily. RisingWave v2.6+ includes native vector search (vector(n) data type with similarity operators), enabling both structured SQL queries and vector similarity search in a single system. For large-scale vector workloads (billions of vectors with ANN indexing), a dedicated vector database (Pinecone, Weaviate) may still be beneficial.

How does RisingWave compare to Confluent for RAG?

Confluent uses Kafka + Flink + external vector database for streaming RAG — powerful but complex (3+ systems). RisingWave combines CDC, SQL transformations, and vector search in a single PostgreSQL-compatible system. RisingWave is simpler and cheaper; Confluent offers deeper enterprise integration and Flink-native embedding pipelines.

Can I use streaming RAG without vector embeddings?

Yes. For many RAG use cases — customer support, order inquiries, account management — structured SQL context (materialized views) is more precise and faster than vector similarity search. Use vector embeddings for semantic/fuzzy retrieval over unstructured documents. Use SQL views for precise, structured context. The hybrid approach combines both.

How does streaming RAG handle document chunking?

For streaming documents, use fixed-size chunking (deterministic, parallelizable) or sliding window chunking (maintains stable chunk IDs based on document_id + offset). Avoid semantic chunking for streaming — it requires full document context and is better suited for batch processing. When a document changes, only the affected chunks need re-embedding.
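The stable-chunk-ID idea above can be sketched in a few lines (the helper name and parameters are illustrative): because an ID is just document_id plus the chunk's starting offset, a re-run over an edited document maps onto the same IDs, and a diff identifies the chunks that actually need re-embedding.

```python
# Fixed-size chunking with stable chunk IDs (document_id + offset), as
# described above. Helper name and parameters are illustrative.

def chunk(document_id: str, text: str, size: int = 500, overlap: int = 50):
    """Yield (chunk_id, chunk_text) pairs with IDs stable across re-runs."""
    step = size - overlap
    for start in range(0, max(len(text), 1), step):
        piece = text[start : start + size]
        if piece:
            yield f"{document_id}:{start}", piece

old = dict(chunk("doc1", "a" * 1200))
new = dict(chunk("doc1", "a" * 1100 + "b" * 100))  # tail of the doc edited

# Diff by stable ID: only chunks whose text changed need re-embedding.
changed = [cid for cid in new if old.get(cid) != new[cid]]
print(changed)  # → ['doc1:900']
```

Here an edit near the end of the document dirties only the final chunk, so two of the three stored vectors survive untouched.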
