Real-Time Embeddings Pipeline: Streaming Database Meets Vector Search

A real-time embeddings pipeline that uses a streaming database sits between your raw event stream and your vector database. Instead of embedding every raw event, RisingWave pre-aggregates events into compact entity documents (one row per product, user, or article) and signals your embedding service only when that document materially changes. This cuts embedding API calls by 10x to 100x, keeps your vector index semantically current, and unlocks hybrid search that combines vector similarity with structured filters computed in SQL.

Why Embedding Raw Events Is a Bad Idea

Most teams building semantic search over live data make the same mistake: they embed events as they arrive.

A product receives 500 page-view events per day. A user generates 80 interactions per session. An article gets edited 12 times before publication. If you embed each event as it arrives, you write 500 nearly-identical vectors for the product, 80 vectors for the user's session, and 12 vectors for the same article. Your vector index fills up with redundant signal. Each similarity search runs over a bloated index. Deduplication logic becomes a separate engineering problem.

The root cause is a missing aggregation layer. Raw events are the right unit for audit logs and analytics. They are the wrong unit for embedding.

A streaming database solves this by sitting between your event stream and your embedding model. It maintains an always-current, aggregated view of each entity: one product document that includes its description, average rating, and review text; one user preference document that reflects everything they have purchased and rated. When the document changes, and only then, the embedding service fetches it, computes a new vector, and upserts into the vector database.

Architecture: Raw Events to Searchable Vectors

The pipeline has four stages:

graph LR
    A[Raw Events<br/>Kafka / CDC / API] --> B[RisingWave<br/>Streaming Database]
    B --> C{Changed<br/>documents?}
    C -->|YES - one row per entity| D[Embedding Model<br/>OpenAI / Cohere / local]
    C -->|NO - skip| B
    D --> E[Vector Database<br/>pgvector / Qdrant / Pinecone]
    E --> F[Application<br/>Hybrid Search]
    B --> F

Stage 1 - Event ingestion. Raw events arrive from Kafka topics, CDC connectors, or direct inserts. RisingWave ingests these at streaming speed.

Stage 2 - Aggregation in RisingWave. Materialized views maintain one row per entity (product, user, document). Each row contains a text document ready for embedding, plus structured metadata for filtering. The view updates incrementally when any source row changes.

Stage 3 - Embedding service. A lightweight process polls RisingWave for entities where last_event_at changed since the last sync. It fetches the pre-aggregated document, calls the embedding API once per entity, and upserts the vector into the vector database.

Stage 4 - Hybrid search. The application sends a query to the vector database for semantic similarity, then filters or re-ranks results using structured metadata from RisingWave: price, rating, category, recency. This combination of semantic relevance and structured precision produces better search results than either approach alone.

Building the Source Tables

We will use a product catalog with user interaction events. The same pattern applies to any domain: articles and reading events, jobs and application events, documents and edit events.

-- Product catalog (could be populated via CDC from your e-commerce DB)
CREATE TABLE emb_products (
    product_id   INT PRIMARY KEY,
    name         VARCHAR,
    brand        VARCHAR,
    category     VARCHAR,
    price        NUMERIC,
    description  VARCHAR
);

-- Raw user interaction events (could be a Kafka source in production)
CREATE TABLE emb_user_events (
    event_id     BIGINT PRIMARY KEY,
    user_id      INT,
    product_id   INT,
    event_type   VARCHAR,   -- 'view', 'add_to_cart', 'purchase', 'review'
    rating       INT,       -- for review events (1-5), NULL otherwise
    review_text  VARCHAR,
    event_at     TIMESTAMP
);

In production you would replace these tables with source definitions pointing to Kafka topics or CDC connectors. The materialized view logic is identical either way. See the RisingWave CDC documentation for how to connect directly to PostgreSQL or MySQL without a Kafka intermediary.

Layer 1: Per-Product Feature Aggregation

The first materialized view aggregates raw events into per-product metrics. This is the layer that compresses thousands of events into one row per product.

CREATE MATERIALIZED VIEW emb_product_features AS
SELECT
    p.product_id,
    p.name                                                              AS product_name,
    p.brand,
    p.category,
    p.price,
    p.description,
    COUNT(e.event_id)                                                   AS total_events,
    COUNT(CASE WHEN e.event_type = 'view'        THEN 1 END)           AS view_count,
    COUNT(CASE WHEN e.event_type = 'add_to_cart' THEN 1 END)           AS cart_count,
    COUNT(CASE WHEN e.event_type = 'purchase'    THEN 1 END)           AS purchase_count,
    COUNT(CASE WHEN e.event_type = 'review'      THEN 1 END)           AS review_count,
    AVG(CASE WHEN e.event_type = 'review' THEN e.rating END)           AS avg_rating,
    STRING_AGG(
        CASE WHEN e.event_type = 'review' THEN e.review_text END,
        ' | '
    )                                                                   AS review_corpus,
    MAX(e.event_at)                                                     AS last_event_at
FROM emb_products p
LEFT JOIN emb_user_events e ON p.product_id = e.product_id
GROUP BY p.product_id, p.name, p.brand, p.category, p.price, p.description;

Query the view to see what it produces:

SELECT product_id, product_name, category, view_count, purchase_count, avg_rating
FROM emb_product_features
ORDER BY product_id;
 product_id |             product_name             |  category   | view_count | purchase_count | avg_rating
------------+--------------------------------------+-------------+------------+----------------+------------
          1 | Wireless Noise-Cancelling Headphones | electronics |          3 |              1 |          5
          2 | Mechanical Gaming Keyboard           | electronics |          2 |              1 |          4
          3 | Ergonomic Office Chair               | furniture   |          1 |              1 |          5
          4 | Running Shoes                        | apparel     |          1 |              0 |
          5 | Smart Water Bottle                   | fitness     |          1 |              0 |
          6 | USB-C Hub 7-in-1                     | electronics |          1 |              1 |
          7 | Yoga Mat Premium                     | fitness     |          1 |              1 |          3
          8 | Standing Desk Converter              | furniture   |          1 |              1 |

Eight products. Eight rows. The events that produced these metrics number in the dozens today and will number in the millions next year. The materialized view always stays at one row per product.

Layer 2: Constructing the Embedding Document

The second materialized view builds the text document that will be sent to your embedding model. This is a critical design decision: the document should capture the entity's identity and crowd-sourced signal in a compact, semantically dense form.

CREATE MATERIALIZED VIEW emb_product_embedding_input AS
SELECT
    product_id,
    product_name,
    last_event_at,
    -- Compact document for the embedding model
    'Product: ' || product_name ||
    '. Brand: ' || brand ||
    '. Category: ' || category ||
    '. Price: $' || price::TEXT ||
    '. Description: ' || description ||
    CASE
        WHEN review_count > 0 THEN
            '. Average rating: ' || ROUND(avg_rating, 1)::TEXT || '/5' ||
            ' from ' || review_count::TEXT || ' reviews.' ||
            ' Sample reviews: ' || COALESCE(review_corpus, '')
        ELSE ''
    END ||
    '. Views: ' || view_count::TEXT ||
    '. Purchases: ' || purchase_count::TEXT
    AS embedding_document,
    -- Structured metadata for hybrid search filters (NOT embedded)
    category,
    price,
    avg_rating,
    purchase_count,
    review_count
FROM emb_product_features;

Check what the document looks like for a specific product:

SELECT product_id, embedding_document
FROM emb_product_embedding_input
WHERE product_id = 1;
 product_id | embedding_document
------------+---------------------------------------------------------------------------
          1 | Product: Wireless Noise-Cancelling Headphones. Brand: SoundPro.
            | Category: electronics. Price: $249.99.
            | Description: Over-ear headphones with active noise cancellation and
            | 30-hour battery life. Average rating: 5/5 from 1 reviews.
            | Sample reviews: Amazing sound quality, worth every penny.
            | Views: 3. Purchases: 1

This is the string you send to openai.embeddings.create() or your local model. It is semantically rich but compact. You are not embedding 500 raw click events. You are embedding one well-structured summary.

The Incremental Update in Action

This is where the architecture pays off. When new events arrive, RisingWave recomputes only the affected rows.

A new user visits the headphones product, purchases it, and leaves a review:

INSERT INTO emb_user_events VALUES
(24, 105, 1, 'view',     NULL, NULL,                                    '2026-03-31 10:00:00'),
(25, 105, 1, 'purchase', NULL, NULL,                                    '2026-03-31 10:20:00'),
(26, 105, 1, 'review',   4,    'Battery life is incredible, 28h tested', '2026-03-31 12:00:00');

Within seconds, emb_product_features updates for product 1 only. The review count goes from 1 to 2, average rating drops from 5.0 to 4.5, and the review corpus gains the new text. Query the result:

SELECT product_id, product_name, view_count, purchase_count, review_count, avg_rating
FROM emb_product_features
WHERE product_id = 1;
 product_id |             product_name             | view_count | purchase_count | review_count | avg_rating
------------+--------------------------------------+------------+----------------+--------------+------------
          1 | Wireless Noise-Cancelling Headphones |          4 |              2 |            2 |       4.50

The embedding document reflects the change immediately:

SELECT product_id, embedding_document
FROM emb_product_embedding_input
WHERE product_id = 1;
 product_id | embedding_document
------------+---------------------------------------------------------------------------
          1 | Product: Wireless Noise-Cancelling Headphones. Brand: SoundPro.
            | Category: electronics. Price: $249.99.
            | Description: Over-ear headphones with active noise cancellation and
            | 30-hour battery life. Average rating: 4.5/5 from 2 reviews.
            | Sample reviews: Amazing sound quality, worth every penny |
            | Battery life is incredible, 28h tested. Views: 4. Purchases: 2

Your embedding sync agent queries for changed products using last_event_at as a watermark:

SELECT
    product_id,
    product_name,
    last_event_at,
    embedding_document
FROM emb_product_embedding_input
WHERE last_event_at > '2026-03-30 00:00:00'
ORDER BY last_event_at DESC;
 product_id |             product_name             |    last_event_at    | embedding_document
------------+--------------------------------------+---------------------+---------...
          1 | Wireless Noise-Cancelling Headphones | 2026-03-31 12:00:00 | Product: ...
          7 | Yoga Mat Premium                     | 2026-03-30 18:00:00 | Product: ...
          8 | Standing Desk Converter              | 2026-03-30 11:20:00 | Product: ...
          2 | Mechanical Gaming Keyboard           | 2026-03-30 10:30:00 | Product: ...
          6 | USB-C Hub 7-in-1                     | 2026-03-30 10:15:00 | Product: ...
          3 | Ergonomic Office Chair               | 2026-03-30 08:00:00 | Product: ...

You persist the maximum last_event_at from each batch as the next watermark, process the changed documents, call your embedding API once per row, and upsert into your vector database. Only entities that actually changed are re-embedded.

Volume Reduction: The Core Benefit

The math is straightforward. With a raw-event-embedding approach:

  • 8 products, 26 events so far
  • Every event triggers an embedding call
  • 26 API calls today, scaling linearly with event volume

With the RisingWave aggregation approach:

SELECT
    'Raw user events'           AS data_type,
    COUNT(*)                    AS row_count
FROM emb_user_events
UNION ALL
SELECT
    'Embedding input documents',
    COUNT(*)
FROM emb_product_embedding_input;
         data_type          | row_count
----------------------------+-----------
 Raw user events            |        26
 Embedding input documents  |         8

At catalog scale (millions of events, hundreds of thousands of products) the difference is 2 to 3 orders of magnitude. You are also making fewer, more meaningful updates to your vector index, which means fewer stale vectors and less index bloat.
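The claim can be sanity-checked with a back-of-the-envelope calculation. The event volume, catalog size, and changed-entity share below are illustrative assumptions, not figures from this tutorial:

```python
# Embedding API calls under each approach (illustrative assumed volumes).
raw_events = 5_000_000        # events per day at catalog scale
products = 100_000            # distinct entities, one document each
changed_share = 0.20          # fraction of products touched per sync window

calls_embed_every_event = raw_events                       # raw-event approach
calls_aggregate_first = int(products * changed_share)      # aggregate-then-embed

reduction = calls_embed_every_event / calls_aggregate_first
print(f"{calls_embed_every_event:,} calls -> "
      f"{calls_aggregate_first:,} calls ({reduction:.0f}x fewer)")
```

Under these assumptions the aggregation layer turns five million embedding calls into twenty thousand, a 250x reduction, and the ratio only grows as event volume outpaces catalog size.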

This pattern is explored in more depth in How to Reduce LLM Token Costs with Pre-Computed Materialized Views, which applies the same pre-aggregation idea to LLM prompt context. The underlying principle is identical: aggregate before you pay per-token or per-embedding costs.

The same pattern applies to users. Instead of embedding a user's raw clickstream, you maintain a preference summary:

CREATE MATERIALIZED VIEW emb_user_preference_summary AS
SELECT
    e.user_id,
    STRING_AGG(DISTINCT p.category, ', ')                              AS interested_categories,
    COUNT(CASE WHEN e.event_type = 'purchase' THEN 1 END)             AS total_purchases,
    COUNT(CASE WHEN e.event_type = 'review'   THEN 1 END)             AS total_reviews,
    AVG(CASE WHEN e.event_type = 'review' THEN e.rating END)          AS avg_review_given,
    MIN(CASE WHEN e.event_type = 'purchase' THEN p.price END)         AS min_purchase_price,
    MAX(CASE WHEN e.event_type = 'purchase' THEN p.price END)         AS max_purchase_price,
    -- Text document for query-side embedding (personalized search)
    'User interested in: ' ||
        STRING_AGG(DISTINCT p.category, ', ') ||
    '. Purchased products: ' ||
        STRING_AGG(
            CASE WHEN e.event_type = 'purchase' THEN p.name END,
            ', '
        ) ||
    CASE
        WHEN AVG(CASE WHEN e.event_type = 'review' THEN e.rating END) IS NOT NULL THEN
            '. Average rating given: ' ||
            ROUND(AVG(CASE WHEN e.event_type = 'review' THEN e.rating END), 1)::TEXT || '/5'
        ELSE ''
    END                                                                AS user_preference_document,
    MAX(e.event_at)                                                    AS last_active_at
FROM emb_user_events e
JOIN emb_products p ON e.product_id = p.product_id
GROUP BY e.user_id;

Check what the preference summary looks like:

SELECT user_id, interested_categories, total_purchases, avg_review_given,
       min_purchase_price, max_purchase_price
FROM emb_user_preference_summary
ORDER BY user_id;
 user_id | interested_categories  | total_purchases | avg_review_given | min_purchase_price | max_purchase_price
---------+------------------------+-----------------+------------------+--------------------+--------------------
     101 | electronics            |               2 |             4.50 |             149.99 |             249.99
     102 | electronics, furniture |               1 |                5 |             399.99 |             399.99
     103 | apparel, fitness       |               1 |                3 |              79.99 |              79.99
     104 | electronics, furniture |               2 |                  |              69.99 |             299.99
     105 | electronics            |               1 |                4 |             249.99 |             249.99

For personalized search, at query time you fetch the user's preference document from RisingWave, embed it, and use it as the query vector. The result is a semantic search shaped by the user's actual behavior rather than their profile attributes.

SELECT user_id, user_preference_document
FROM emb_user_preference_summary
WHERE user_id = 101;
 user_id | user_preference_document
---------+---------------------------------------------------------------------------------------------
     101 | User interested in: electronics. Purchased products: Wireless Noise-Cancelling Headphones,
         | Mechanical Gaming Keyboard. Average rating given: 4.5/5

This document encodes what the user has actually purchased and how they rate products. It is a far better query vector for "what should I recommend next?" than a raw embedding of their last 100 click events. For a deeper look at how streaming databases and vector databases complement each other for AI workloads, see Serving Fresh Data to AI Agents: Streaming Database vs Vector DB.

Hybrid Search: Structured Filters + Semantic Similarity

The most valuable capability in this architecture is hybrid search: combining vector similarity (semantic relevance) with structured filters (price, category, rating, recency) computed in RisingWave.

Pure vector search has a well-known failure mode. A user searches for "noise cancelling headphones under $100" and the vector model returns a premium $350 pair as the top result because it is semantically similar. The price constraint was not encoded in the embedding.

The solution is to keep structured metadata in RisingWave alongside the embedding documents, then apply filters in SQL before or after the vector similarity step.

Create a view that exposes structured signals for post-retrieval filtering:

CREATE MATERIALIZED VIEW emb_hybrid_search_candidates AS
SELECT
    f.product_id,
    f.product_name,
    f.category,
    f.price,
    f.avg_rating,
    f.purchase_count,
    f.review_count,
    ROUND(
        COALESCE(f.avg_rating, 3.0) * 0.4
        + LEAST(f.purchase_count, 10) * 0.3
        + LEAST(f.view_count, 20) * 0.1
        + CASE WHEN f.review_count > 0 THEN 1 ELSE 0 END * 0.2,
        2
    )                                                            AS structured_score
FROM emb_product_features f;

In production, your application retrieves a broad candidate set from the vector database using semantic search, then queries RisingWave to apply structured filters and re-rank:

-- Filter and re-rank vector search results using structured signals
-- In production: replace (1, 2, 6) with the product_ids from vector DB top-k results
SELECT
    product_id,
    product_name,
    category,
    price,
    avg_rating,
    purchase_count,
    structured_score
FROM emb_hybrid_search_candidates
WHERE category = 'electronics'
  AND price < 200
  AND (avg_rating IS NULL OR avg_rating >= 3.5)
ORDER BY structured_score DESC, purchase_count DESC;
 product_id |        product_name        |  category   | price  | avg_rating | purchase_count | structured_score
------------+----------------------------+-------------+--------+------------+----------------+------------------
          2 | Mechanical Gaming Keyboard | electronics | 149.99 |          4 |              1 |              2.3
          6 | USB-C Hub 7-in-1           | electronics |  69.99 |            |              1 |             1.60

This is the hybrid search pattern: semantic retrieval narrows the candidate set to semantically relevant items; structured filtering applies exact business constraints; and a combined score (blending vector similarity with purchase count, rating, and recency from RisingWave) produces the final ranked list.
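The final blending step can be sketched in a few lines of Python. The weights and the normalization here are assumptions you would tune; vector_hits stands in for your vector database's top-k result, and structured_scores for rows fetched from emb_hybrid_search_candidates:

```python
def blend_scores(vector_hits, structured_scores,
                 w_semantic=0.6, w_structured=0.4):
    """Combine vector-DB similarity with the RisingWave structured_score.

    vector_hits: list of (product_id, similarity), similarity in [0, 1].
    structured_scores: dict product_id -> structured_score from the
    emb_hybrid_search_candidates view.
    """
    max_structured = max(structured_scores.values(), default=1.0) or 1.0
    ranked = []
    for product_id, similarity in vector_hits:
        if product_id not in structured_scores:
            continue  # dropped by the SQL filters (price, rating, category)
        norm = structured_scores[product_id] / max_structured
        ranked.append((product_id, w_semantic * similarity + w_structured * norm))
    return sorted(ranked, key=lambda t: t[1], reverse=True)

# Product 1 was excluded by the price filter, so only 2 and 6 are ranked.
hits = [(1, 0.92), (2, 0.88), (6, 0.85)]
scores = {2: 2.3, 6: 1.6}
print(blend_scores(hits, scores))
```

Keeping the blend on the application side, rather than in either database, lets you tune the weights per query type without touching the index or the views.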

The structured metadata in RisingWave updates incrementally as new events arrive. A product that crosses 4.0 average rating (after collecting more reviews) automatically improves its position in hybrid search results within seconds, with no batch reindex required.

Connecting the Embedding Sync Agent

The sync agent is a simple loop that runs on a schedule or consumes changes from a RisingWave Kafka sink. Here is the minimal Python pattern:

import psycopg2
import openai

RW_CONN = psycopg2.connect(
    host="localhost", port=4566, user="root", dbname="dev"
)
openai_client = openai.OpenAI()

def sync_changed_products(since_ts: str):
    """Re-embed every entity whose document changed after the watermark since_ts."""
    with RW_CONN.cursor() as cur:
        cur.execute("""
            SELECT product_id, embedding_document
            FROM emb_product_embedding_input
            WHERE last_event_at > %s
            ORDER BY last_event_at DESC
        """, (since_ts,))
        rows = cur.fetchall()

    for product_id, document in rows:
        response = openai_client.embeddings.create(
            model="text-embedding-3-small",
            input=document
        )
        vector = response.data[0].embedding
        # Upsert vector into your vector DB (pgvector, Qdrant, Pinecone, etc.)
        upsert_to_vector_db(product_id, vector, metadata={...})

    return len(rows)

The key is the WHERE last_event_at > %s clause. RisingWave maintains last_event_at incrementally. Your sync agent only fetches and re-embeds entities that have received new events since the last sync. When zero products change, zero embedding API calls are made.
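One detail the loop leaves implicit is how the watermark advances between runs. A minimal sketch of that bookkeeping, kept as a pure function so it is easy to test (the helper name is an illustration, not part of the tutorial's schema):

```python
from datetime import datetime

def advance_watermark(rows, current_watermark):
    """Return the next sync watermark given this batch's rows.

    rows: list of (product_id, last_event_at) tuples from the sync query.
    An empty batch leaves the watermark in place, so the next poll
    re-checks from the same point and no events are ever skipped.
    """
    timestamps = [ts for _, ts in rows]
    return max(timestamps, default=current_watermark)

# Two changed products move the watermark to the newest event seen.
watermark = datetime(2026, 3, 30)
batch = [(1, datetime(2026, 3, 31, 12)), (7, datetime(2026, 3, 30, 18))]
watermark = advance_watermark(batch, watermark)
print(watermark)  # 2026-03-31 12:00:00
```

Because the sync query uses a strict `>`, an event stamped exactly at the watermark could be missed across runs; in practice you would either switch to `>=` with idempotent upserts or persist the watermark in the same transaction as the batch.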

For a complete example of polling RisingWave for fresh entity state and feeding it to downstream AI systems, see Building AI Agent Memory with a Streaming Database.

Why This Matters for Incremental Materialized Views

RisingWave's incremental computation engine is the reason this architecture works at scale. When a new review arrives for product 1, RisingWave does not recompute emb_product_features for all 8 products. It locates the group product_id = 1, applies the delta (one new row), and propagates the updated aggregate forward through the view graph.

This means the latency from event arrival to updated embedding document is bounded by barrier latency (typically sub-second in a healthy cluster), not by the size of your product catalog. A catalog of 10 million products behaves identically to one with 100.

The same incremental property applies to emb_product_embedding_input and emb_user_preference_summary. Each materialized view in the graph maintains its own internal state. Changes propagate forward only to the rows that are actually affected.

Production Architecture Notes

Embedding model changes. When you switch embedding models, every vector must be recomputed. The last_event_at watermark approach makes this straightforward: reset the watermark to the epoch (beginning of time), and your sync agent will process every entity in order without special logic.

Multiple embedding models. You can create separate embedding_document columns for different model families (for example, a short document for a lightweight model and a longer one for a more capable model). The materialized view can carry both columns cheaply, since each is maintained incrementally from the same aggregates.

Sink-based triggering. Rather than polling on a schedule, you can create a Kafka sink from emb_product_embedding_input that emits only changed rows. Your embedding service consumes from that Kafka topic and processes changes as they arrive, eliminating the polling interval entirely.

-- Emit changed product documents to Kafka for embedding service consumption
CREATE SINK emb_product_changes_sink AS
SELECT product_id, product_name, embedding_document, last_event_at
FROM emb_product_embedding_input
WITH (
    connector = 'kafka',
    properties.bootstrap.server = 'kafka:9092',
    topic = 'product-embedding-updates',
    type = 'upsert',
    primary_key = 'product_id'
);
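On the consuming side, the embedding service reads product-embedding-updates and re-embeds each changed document. The sketch below assumes a JSON-encoded upsert payload keyed by product_id (the exact wire format depends on how you configure the sink's encoder) and keeps message handling separate from the Kafka client so it can be tested in isolation:

```python
import json

def handle_change(value):
    """Decode one upsert message from the product-embedding-updates topic.

    Returns (product_id, embedding_document) for an insert/update, or None
    for a tombstone (a null value means the row was deleted upstream).
    """
    if value is None:
        return None
    row = json.loads(value)
    return row["product_id"], row["embedding_document"]

# With confluent_kafka (an assumed client choice), the consuming loop is:
#
#   from confluent_kafka import Consumer
#   consumer = Consumer({"bootstrap.servers": "kafka:9092",
#                        "group.id": "embedding-service"})
#   consumer.subscribe(["product-embedding-updates"])
#   while True:
#       msg = consumer.poll(1.0)
#       if msg is None or msg.error():
#           continue
#       change = handle_change(msg.value())
#       if change is not None:
#           product_id, document = change
#           ...  # embed `document`, upsert into the vector DB

print(handle_change(b'{"product_id": 1, "embedding_document": "Product: ..."}'))
```

Handling tombstones explicitly matters with an upsert sink: a deleted product should trigger a delete in the vector index, not a re-embed.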

Vector database choice. This pattern works with any vector database. If you are already running PostgreSQL (or RisingWave itself), pgvector lets you store vectors in the same database. For larger-scale retrieval, Qdrant, Pinecone, and Weaviate all support metadata filtering that maps directly to the structured fields you maintain in RisingWave.

Frequently Asked Questions

How often should I re-embed an entity?

Re-embed when the entity's document changes meaningfully. With the watermark approach, you re-embed on every change. If your embedding API costs are a concern, add a minimum change threshold: only re-embed product 1 if its avg_rating changed by more than 0.2, or if it received a new review. You can encode this logic as an additional filter in the sync query.
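That threshold can be expressed as a small predicate on the previous and current row. The field names follow the views above; the 0.2 rating delta is an arbitrary assumption you would tune:

```python
def should_reembed(old, new, rating_delta=0.2):
    """Decide whether an entity's document changed enough to re-embed.

    old / new: dicts with avg_rating and review_count for the entity,
    e.g. rows from emb_product_embedding_input before and after the sync.
    """
    if old is None:
        return True  # never embedded before
    if new["review_count"] != old["review_count"]:
        return True  # a new review always changes the document text
    old_rating = old["avg_rating"] or 0.0
    new_rating = new["avg_rating"] or 0.0
    return abs(new_rating - old_rating) > rating_delta

# The headphones example: a second review arrives, rating moves 5.0 -> 4.5.
print(should_reembed({"avg_rating": 5.0, "review_count": 1},
                     {"avg_rating": 4.5, "review_count": 2}))  # True
```

The agent would apply this check between fetching changed rows and calling the embedding API, trading a little staleness tolerance for fewer paid calls.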

Does RisingWave store the vectors?

No. RisingWave stores the structured features and the text documents used to generate vectors. The vectors themselves live in your vector database (pgvector, Qdrant, Pinecone, etc.). This separation is intentional: RisingWave excels at incremental aggregation over structured and semi-structured data, while vector databases excel at approximate nearest-neighbor search over high-dimensional vectors.

Can this work without Kafka?

Yes. The source tables in this tutorial are plain RisingWave tables. In production, you can connect to Kafka topics, PostgreSQL CDC, MySQL CDC, or Kinesis as source connectors. The materialized view logic is identical regardless of the source type. The embedding sync agent always reads from RisingWave's PostgreSQL-compatible interface.

What happens when a product has no reviews yet?

The CASE WHEN review_count > 0 block in emb_product_embedding_input omits the rating and review text for products with no reviews. The embedding document falls back to description, price, and behavioral signals (view count, purchase count). This is better than embedding a placeholder like "No reviews yet," which would cluster unreviewed products together in the vector space regardless of their actual category or price.

Conclusion

A real-time embeddings pipeline with a streaming database and a vector database gives you three things that no other architecture delivers together:

  • Volume reduction: RisingWave aggregates millions of raw events into one document per entity, cutting embedding API calls by 10x to 100x
  • Semantic freshness: embedding documents update within seconds of new events, keeping your vector index current without batch reindexing
  • Hybrid search: structured signals (price, rating, category, recency) maintained incrementally in RisingWave augment vector similarity to produce results that are both semantically relevant and precisely filtered

The architecture scales from a local RisingWave instance to a production cluster handling millions of events per second without changing a line of SQL. The materialized views stay at one row per entity regardless of event volume.

All SQL in this article was tested on RisingWave 2.8.0.


Ready to build your real-time embeddings pipeline? Get started with RisingWave in minutes at docs.risingwave.com/get-started.

Join the RisingWave Slack community to connect with ML engineers and AI platform engineers building similar pipelines.
