Building a Streaming Personalization Engine with RisingWave

Building a Streaming Personalization Engine with RisingWave

A streaming personalization engine with RisingWave updates user preference profiles, affinity scores, and recommendation features in real time as users browse and interact. By maintaining continuously updated feature tables as materialized views, your personalization API serves relevant content within milliseconds—without waiting for batch model retraining or overnight feature computation.

Why Batch Feature Pipelines Break Personalization

Most personalization systems follow this pattern: collect events during the day, run a nightly batch job to compute features, update the recommendation model, and serve those features the next day. This 24-hour cycle means your personalization engine is always flying blind about what users are doing right now.

A user who just read three articles about machine learning should see ML-related content recommendations immediately—not tomorrow. A user who just upgraded to a paid plan should stop seeing upgrade prompts the moment the event fires, not after the nightly sync.

Streaming personalization closes this loop. Feature tables update continuously, and your personalization API always reads current features.

Architecture of a Streaming Personalization Engine

The pipeline has four layers:

  1. Event ingestion — behavioral signals (views, clicks, ratings, purchases) from Kafka
  2. Feature computation — materialized views computing affinity scores, recency, and frequency
  3. Feature serving — RisingWave's PostgreSQL interface for low-latency feature lookups
  4. Recommendation dispatch — your ML model or rule engine reads live features and returns personalized results

The key insight: RisingWave replaces the batch feature pipeline. Your recommendation model stays the same; you just feed it fresher features.

Setting Up Event Ingestion

CREATE SOURCE content_interactions (
    event_id        VARCHAR,
    user_id         VARCHAR,
    content_id      VARCHAR,
    content_type    VARCHAR,
    category        VARCHAR,
    subcategory     VARCHAR,
    interaction_type VARCHAR,
    dwell_seconds   INTEGER,
    event_time      TIMESTAMPTZ
)
WITH (
    connector = 'kafka',
    topic = 'content-interactions',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
)
FORMAT PLAIN ENCODE JSON;

Load content catalog from your CMS database via CDC:

CREATE SOURCE content_catalog
FROM postgres-cdc WITH (
    hostname = 'cms-db',
    port = '5432',
    username = 'replicator',
    password = 'secret',
    database.name = 'cms',
    schema.name = 'public',
    table.name = 'content_items',
    slot.name = 'rw_slot'
);

Computing User Affinity Scores

Build a continuously updated affinity profile per user per content category:

CREATE MATERIALIZED VIEW user_category_affinity AS
SELECT
    user_id,
    category,
    window_start,
    window_end,
    COUNT(*) FILTER (WHERE interaction_type = 'view')           AS views,
    COUNT(*) FILTER (WHERE interaction_type = 'click')          AS clicks,
    COUNT(*) FILTER (WHERE interaction_type = 'share')          AS shares,
    COUNT(*) FILTER (WHERE interaction_type = 'save')           AS saves,
    SUM(dwell_seconds)                                           AS total_dwell_seconds,
    AVG(dwell_seconds)                                           AS avg_dwell_seconds,
    -- Weighted affinity score: saves and shares are strongest signals
    (COUNT(*) FILTER (WHERE interaction_type = 'view') * 1
     + COUNT(*) FILTER (WHERE interaction_type = 'click') * 3
     + COUNT(*) FILTER (WHERE interaction_type = 'save') * 5
     + COUNT(*) FILTER (WHERE interaction_type = 'share') * 7)  AS affinity_score
FROM TUMBLE(content_interactions, event_time, INTERVAL '7 DAYS')
GROUP BY user_id, category, window_start, window_end;

Real-Time Content Popularity Features

Track trending content using hopping windows to weight recent engagement more heavily:

CREATE MATERIALIZED VIEW content_trending_scores AS
SELECT
    content_id,
    content_type,
    category,
    window_start,
    window_end,
    COUNT(*) FILTER (WHERE interaction_type = 'view')           AS recent_views,
    COUNT(*) FILTER (WHERE interaction_type = 'click')          AS recent_clicks,
    COUNT(DISTINCT user_id)                                      AS unique_users,
    AVG(dwell_seconds)                                           AS avg_dwell,
    -- Trending score: recency-weighted engagement
    (COUNT(*) FILTER (WHERE interaction_type = 'view') * 1
     + COUNT(*) FILTER (WHERE interaction_type = 'click') * 2
     + COUNT(DISTINCT user_id) * 3)                              AS trending_score
FROM HOP(content_interactions, event_time, INTERVAL '15 MINUTES', INTERVAL '2 HOURS')
GROUP BY content_id, content_type, category, window_start, window_end;

Serving Features to Your Recommendation API

Your recommendation API queries RisingWave's PostgreSQL interface directly for user features at serving time:

-- Query executed by recommendation API at serving time
SELECT
    a.category,
    a.affinity_score,
    a.avg_dwell_seconds
FROM user_category_affinity a
WHERE a.user_id = $1
ORDER BY a.affinity_score DESC
LIMIT 5;

For item-level recommendations, join user affinity with trending content:

-- Top content to recommend for a given user
SELECT
    t.content_id,
    t.category,
    t.trending_score,
    a.affinity_score,
    (t.trending_score * 0.4 + a.affinity_score * 0.6)   AS combined_score
FROM content_trending_scores t
JOIN user_category_affinity a
    ON t.category = a.category
    AND a.user_id = $1
ORDER BY combined_score DESC
LIMIT 20;

Comparison: Personalization Feature Pipeline Approaches

DimensionNightly Batch FeaturesHourly Micro-BatchStreaming (RisingWave)
Feature freshness24 hours1 hourSeconds
New user cold start24-hour delay1-hour delayMinutes
Event-driven personalizationNot possibleApproximateNative
Infrastructure complexityHigh (MLOps + ETL)HighModerate (Kafka + RisingWave)
Feature serving latencyFast (cached)Fast (cached)Fast (materialized views)
Recency signal qualityStalePartially freshReal time

Sinking Features to a Feature Store

Push computed features to your ML feature store via JDBC:

CREATE SINK user_affinity_feature_store
FROM user_category_affinity
WITH (
    connector = 'jdbc',
    jdbc.url = 'jdbc:postgresql://feature-store:5432/features',
    table.name = 'user_category_affinity',
    type = 'upsert',
    primary_key = 'user_id,category,window_start'
);

FAQ

Q: How does this work with collaborative filtering models that need historical data? RisingWave handles the real-time feature layer—recent affinity scores, trending signals, and current session context. Your collaborative filtering model still trains on historical data stored in your warehouse. At serving time, combine warehouse-derived user embeddings with RisingWave's real-time affinity features for a hybrid approach.

Q: Can RisingWave serve features at the millisecond latency required for real-time personalization? RisingWave materialized views are precomputed, so queries read from already-aggregated state rather than scanning raw events. For simple lookups (a user's top categories), query latency is typically 5–50ms, well within the 100–200ms budget for recommendation API calls.

Q: How do I handle new users with no history? The affinity materialized view returns no rows for users with no events. In your recommendation API, fall back to content_trending_scores for new users. As the user generates their first few interactions, the affinity view begins to populate, enabling personalized recommendations within minutes.

Q: Does RisingWave replace a dedicated feature store? For real-time feature computation and low-latency serving, RisingWave can serve as a lightweight feature store. For ML training pipelines that need historical feature snapshots and time-travel queries, a dedicated feature store (Feast, Tecton) may complement RisingWave's online serving layer.

Get Started

Build a personalization engine that knows what users care about right now.

Best-in-Class Event Streaming
for Agents, Apps, and Analytics
GitHubXLinkedInSlackYouTube
Sign up for our to stay updated.