Building a Real-Time Recommendation Engine with Streaming SQL

Most recommendation engines run on batch pipelines. A nightly job crunches user activity logs, updates a model, and pushes new recommendations to a cache. By the time a user sees "Recommended for you," the data behind it is already stale.

This matters more than it sounds. A user who just purchased running shoes does not need running shoe recommendations for the next 24 hours. A product that is trending right now should surface immediately, not after tomorrow's batch run. The gap between user intent and recommendation freshness is where conversions are lost.

A real-time recommendation engine closes that gap. Instead of batch jobs, it processes user activity as a continuous stream and updates recommendations incrementally. Streaming SQL, specifically using materialized views in RisingWave, makes this possible with standard SQL rather than custom application code.

In this guide, you will build a real-time recommendation engine that combines item-based collaborative filtering, trending product detection, and category affinity scoring, all with streaming SQL in RisingWave. Every SQL example has been tested against RisingWave 2.8.0.

Why Streaming SQL for Recommendations

Traditional recommendation systems rely on a pipeline that looks like this: collect events in a data lake, run a batch job (Spark, Airflow) on a schedule, write results to a serving layer (Redis, DynamoDB), and serve from cache until the next batch run.

This architecture has three problems:

  • Latency: Recommendations reflect hours-old or day-old behavior. A user's current session context is invisible.
  • Operational complexity: You maintain separate systems for ingestion, processing, storage, and serving. Each needs monitoring, scaling, and failure handling.
  • Cold-start delay: New products or new users wait for the next batch cycle before they appear in any recommendation.

Streaming SQL replaces the batch processing step with continuously updating materialized views. A materialized view in RisingWave is a precomputed query result that updates incrementally as new data arrives. When a user clicks a product, the recommendation scores typically adjust in under a second rather than hours later.

The key advantage: you write standard SQL. No need to learn a new framework, manage JVM clusters, or write custom serialization logic. If you can write a GROUP BY query, you can build a streaming recommendation engine.

Modeling the Data: Users, Products, and Activity Streams

Every recommendation engine starts with three data entities: who is browsing (users), what they are browsing (products), and what they are doing (activities). In a production system, user activities would stream in from Apache Kafka or another message broker. For this tutorial, we use RisingWave tables to simulate the same behavior.

Creating the Base Tables

-- Product catalog
CREATE TABLE products (
    product_id INT PRIMARY KEY,
    product_name VARCHAR,
    category VARCHAR,
    price DECIMAL,
    created_at TIMESTAMPTZ DEFAULT NOW()
);

-- Registered users
CREATE TABLE users (
    user_id INT PRIMARY KEY,
    username VARCHAR,
    signup_date TIMESTAMPTZ DEFAULT NOW()
);

-- User activity stream
-- In production, this would be a source connected to Kafka
CREATE TABLE user_activities (
    activity_id INT PRIMARY KEY,
    user_id INT,
    product_id INT,
    activity_type VARCHAR, -- 'view', 'click', 'add_to_cart', 'purchase'
    activity_time TIMESTAMPTZ DEFAULT NOW(),
    session_id VARCHAR
);

The user_activities table is the core stream. Every page view, click, add-to-cart, and purchase event flows into this table. In a production deployment, you would replace this with a Kafka source that ingests events directly from your application's event bus.

Loading Sample Data

To demonstrate the recommendation logic, load a small dataset with five users and ten products:

INSERT INTO products (product_id, product_name, category, price) VALUES
    (1, 'Wireless Headphones', 'Electronics', 79.99),
    (2, 'Running Shoes', 'Sports', 129.99),
    (3, 'Bluetooth Speaker', 'Electronics', 49.99),
    (4, 'Yoga Mat', 'Sports', 29.99),
    (5, 'Laptop Stand', 'Electronics', 45.00),
    (6, 'Water Bottle', 'Sports', 19.99),
    (7, 'USB-C Hub', 'Electronics', 39.99),
    (8, 'Resistance Bands', 'Sports', 24.99),
    (9, 'Mechanical Keyboard', 'Electronics', 149.99),
    (10, 'Fitness Tracker', 'Electronics', 89.99);

INSERT INTO users (user_id, username) VALUES
    (101, 'alice'), (102, 'bob'), (103, 'carol'),
    (104, 'dave'), (105, 'eve');

INSERT INTO user_activities (activity_id, user_id, product_id, activity_type, activity_time, session_id) VALUES
    -- Alice: browses and buys electronics
    (1, 101, 1, 'view', '2026-04-01 10:00:00+00', 'sess_a1'),
    (2, 101, 1, 'click', '2026-04-01 10:01:00+00', 'sess_a1'),
    (3, 101, 3, 'view', '2026-04-01 10:05:00+00', 'sess_a1'),
    (4, 101, 3, 'add_to_cart', '2026-04-01 10:06:00+00', 'sess_a1'),
    (5, 101, 5, 'view', '2026-04-01 10:10:00+00', 'sess_a1'),
    (6, 101, 5, 'purchase', '2026-04-01 10:15:00+00', 'sess_a1'),
    (7, 101, 9, 'view', '2026-04-01 11:00:00+00', 'sess_a2'),
    (8, 101, 9, 'click', '2026-04-01 11:01:00+00', 'sess_a2'),
    -- Bob: heavy electronics buyer, overlaps with Alice
    (9, 102, 1, 'view', '2026-04-01 09:00:00+00', 'sess_b1'),
    (10, 102, 1, 'purchase', '2026-04-01 09:05:00+00', 'sess_b1'),
    (11, 102, 3, 'view', '2026-04-01 09:10:00+00', 'sess_b1'),
    (12, 102, 3, 'purchase', '2026-04-01 09:15:00+00', 'sess_b1'),
    (13, 102, 7, 'view', '2026-04-01 09:20:00+00', 'sess_b1'),
    (14, 102, 7, 'purchase', '2026-04-01 09:25:00+00', 'sess_b1'),
    (15, 102, 9, 'view', '2026-04-01 09:30:00+00', 'sess_b1'),
    (16, 102, 9, 'purchase', '2026-04-01 09:35:00+00', 'sess_b1'),
    -- Carol: sports enthusiast
    (17, 103, 2, 'view', '2026-04-01 08:00:00+00', 'sess_c1'),
    (18, 103, 2, 'purchase', '2026-04-01 08:05:00+00', 'sess_c1'),
    (19, 103, 4, 'view', '2026-04-01 08:10:00+00', 'sess_c1'),
    (20, 103, 4, 'purchase', '2026-04-01 08:15:00+00', 'sess_c1'),
    (21, 103, 6, 'view', '2026-04-01 08:20:00+00', 'sess_c1'),
    (22, 103, 6, 'add_to_cart', '2026-04-01 08:25:00+00', 'sess_c1'),
    (23, 103, 8, 'view', '2026-04-01 08:30:00+00', 'sess_c1'),
    (24, 103, 8, 'purchase', '2026-04-01 08:35:00+00', 'sess_c1'),
    -- Dave: mixed interests, overlaps with Alice on electronics
    (25, 104, 1, 'view', '2026-04-01 12:00:00+00', 'sess_d1'),
    (26, 104, 1, 'click', '2026-04-01 12:01:00+00', 'sess_d1'),
    (27, 104, 5, 'view', '2026-04-01 12:05:00+00', 'sess_d1'),
    (28, 104, 5, 'purchase', '2026-04-01 12:10:00+00', 'sess_d1'),
    (29, 104, 2, 'view', '2026-04-01 12:15:00+00', 'sess_d1'),
    (30, 104, 2, 'click', '2026-04-01 12:16:00+00', 'sess_d1'),
    -- Eve: new user with minimal activity (cold-start case)
    (31, 105, 10, 'view', '2026-04-01 13:00:00+00', 'sess_e1');

The dataset reflects realistic patterns: Alice and Bob both like electronics (overlap for collaborative filtering), Carol prefers sports products, Dave has mixed interests, and Eve is a new user with only one interaction (a cold-start scenario).

Building the Recommendation Engine with Materialized Views

The recommendation engine consists of four materialized views, each handling a different aspect of the recommendation logic. Because these are streaming materialized views, they update automatically as new activity arrives.

Step 1: Weighted Interaction Scores

Not all user actions carry equal signal. A purchase indicates stronger intent than a page view. This materialized view assigns weighted scores to each user-product pair:

CREATE MATERIALIZED VIEW mv_user_product_interaction AS
SELECT
    user_id,
    product_id,
    SUM(
        CASE activity_type
            WHEN 'view' THEN 1
            WHEN 'click' THEN 2
            WHEN 'add_to_cart' THEN 3
            WHEN 'purchase' THEN 5
            ELSE 0  -- unrecognized activity types add no score
        END
    ) AS interaction_score,
    COUNT(*) AS interaction_count,
    MAX(activity_time) AS last_interaction
FROM user_activities
GROUP BY user_id, product_id;

Query the result to see how each user's engagement is scored:

SELECT * FROM mv_user_product_interaction
ORDER BY user_id, interaction_score DESC;

 user_id | product_id | interaction_score | interaction_count |     last_interaction
---------+------------+-------------------+-------------------+---------------------------
     101 |          5 |                 6 |                 2 | 2026-04-01 10:15:00+00:00
     101 |          3 |                 4 |                 2 | 2026-04-01 10:06:00+00:00
     101 |          9 |                 3 |                 2 | 2026-04-01 11:01:00+00:00
     101 |          1 |                 3 |                 2 | 2026-04-01 10:01:00+00:00
     102 |          1 |                 6 |                 2 | 2026-04-01 09:05:00+00:00
     102 |          7 |                 6 |                 2 | 2026-04-01 09:25:00+00:00
     102 |          3 |                 6 |                 2 | 2026-04-01 09:15:00+00:00
     102 |          9 |                 6 |                 2 | 2026-04-01 09:35:00+00:00
     ...

Alice's highest-scored product is the Laptop Stand (score 6: view + purchase). Bob has four products at score 6, all purchases. These scores form the foundation for collaborative filtering.

Step 2: Item-Based Collaborative Filtering

Collaborative filtering is a recommendation technique that finds patterns across users. The item-based variant answers the question: "Users who interacted with product A also interacted with product B." In SQL, this is a self-join on the interaction view:

CREATE MATERIALIZED VIEW mv_collaborative_filtering AS
SELECT
    a.product_id AS source_product_id,
    b.product_id AS recommended_product_id,
    COUNT(DISTINCT a.user_id) AS co_interaction_count,
    AVG(b.interaction_score) AS avg_score
FROM mv_user_product_interaction a
JOIN mv_user_product_interaction b
    ON a.user_id = b.user_id
    AND a.product_id <> b.product_id
GROUP BY a.product_id, b.product_id;

This view builds a product-to-product affinity graph. Every time a user interacts with a new product, the co-interaction counts update automatically. Query the results to see which products are strongly related:

SELECT
    cf.source_product_id,
    p1.product_name AS source_product,
    cf.recommended_product_id,
    p2.product_name AS recommended_product,
    cf.co_interaction_count,
    ROUND(cf.avg_score, 1) AS avg_score
FROM mv_collaborative_filtering cf
JOIN products p1 ON cf.source_product_id = p1.product_id
JOIN products p2 ON cf.recommended_product_id = p2.product_id
WHERE cf.co_interaction_count >= 2
ORDER BY cf.source_product_id, cf.co_interaction_count DESC, cf.avg_score DESC
LIMIT 15;

 source_product_id |   source_product    | recommended_product_id | recommended_product | co_interaction_count | avg_score
-------------------+---------------------+------------------------+---------------------+----------------------+-----------
                 1 | Wireless Headphones |                      5 | Laptop Stand        |                    2 |       6.0
                 1 | Wireless Headphones |                      3 | Bluetooth Speaker   |                    2 |       5.0
                 1 | Wireless Headphones |                      9 | Mechanical Keyboard |                    2 |       4.5
                 3 | Bluetooth Speaker   |                      9 | Mechanical Keyboard |                    2 |       4.5
                 3 | Bluetooth Speaker   |                      1 | Wireless Headphones |                    2 |       4.5
                 5 | Laptop Stand        |                      1 | Wireless Headphones |                    2 |       3.0
                 9 | Mechanical Keyboard |                      3 | Bluetooth Speaker   |                    2 |       5.0
                 9 | Mechanical Keyboard |                      1 | Wireless Headphones |                    2 |       4.5

The results make sense: Wireless Headphones strongly correlate with Laptop Stand and Bluetooth Speaker, since Alice, Bob, and Dave all interact with overlapping electronics products.
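
To check how a single row of this view is derived, you can trace it back to the per-user scores. The query below is an illustrative sketch, not part of the original pipeline, that lists the users behind the Wireless Headphones to Bluetooth Speaker pair:

```sql
-- Trace the (source 1 -> recommended 3) row of mv_collaborative_filtering.
-- Each returned row is one user who interacted with both products;
-- interaction_score is that user's score on the recommended product (3).
SELECT
    a.user_id,
    b.interaction_score
FROM mv_user_product_interaction a
JOIN mv_user_product_interaction b
    ON a.user_id = b.user_id
WHERE a.product_id = 1
  AND b.product_id = 3
ORDER BY a.user_id;
```

With the sample data as loaded, this returns Alice (101) with a score of 4 and Bob (102) with a score of 6, which gives co_interaction_count = 2 and avg_score = (4 + 6) / 2 = 5.0, matching the table above.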

Step 3: Trending Product Detection

Trending products serve two purposes: they provide a fallback for users with sparse history, and they surface time-sensitive popularity signals. This materialized view aggregates activity across all users:

CREATE MATERIALIZED VIEW mv_trending_products AS
SELECT
    ua.product_id,
    p.product_name,
    p.category,
    COUNT(*) AS total_interactions,
    COUNT(DISTINCT ua.user_id) AS unique_users,
    SUM(CASE WHEN ua.activity_type = 'purchase' THEN 1 ELSE 0 END) AS purchase_count,
    SUM(CASE WHEN ua.activity_type = 'add_to_cart' THEN 1 ELSE 0 END) AS cart_count,
    MAX(ua.activity_time) AS last_activity
FROM user_activities ua
JOIN products p ON ua.product_id = p.product_id
GROUP BY ua.product_id, p.product_name, p.category;

Query the view to rank products by overall activity:

SELECT product_name, category, total_interactions, unique_users, purchase_count
FROM mv_trending_products
ORDER BY total_interactions DESC, unique_users DESC;

    product_name     |  category   | total_interactions | unique_users | purchase_count
---------------------+-------------+--------------------+--------------+----------------
 Wireless Headphones | Electronics |                  6 |            3 |              1
 Bluetooth Speaker   | Electronics |                  4 |            2 |              1
 Laptop Stand        | Electronics |                  4 |            2 |              2
 Mechanical Keyboard | Electronics |                  4 |            2 |              1
 Running Shoes       | Sports      |                  4 |            2 |              1
 Yoga Mat            | Sports      |                  2 |            1 |              1
 USB-C Hub           | Electronics |                  2 |            1 |              1
 Water Bottle        | Sports      |                  2 |            1 |              0
 Resistance Bands    | Sports      |                  2 |            1 |              1
 Fitness Tracker     | Electronics |                  1 |            1 |              0

Wireless Headphones leads with 6 interactions from 3 unique users. In a production system with time-windowed aggregation, you could scope this to the last hour or last 15 minutes for truly real-time trending signals.
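
One way to scope trending to a recent window is a temporal filter, which compares a timestamp column against NOW() inside the view definition. The following is a minimal sketch, not part of the tested pipeline above; the view name mv_trending_last_hour and the one-hour window are assumptions chosen for illustration:

```sql
-- Trending counts restricted to the last hour of activity.
-- Rows drop out of the aggregate once they are older than the window,
-- so the view tracks a sliding one-hour horizon.
CREATE MATERIALIZED VIEW mv_trending_last_hour AS
SELECT
    product_id,
    COUNT(*) AS total_interactions,
    COUNT(DISTINCT user_id) AS unique_users,
    SUM(CASE WHEN activity_type = 'purchase' THEN 1 ELSE 0 END) AS purchase_count
FROM user_activities
WHERE activity_time > NOW() - INTERVAL '1 hour'
GROUP BY product_id;
```

The sample rows carry fixed timestamps from 2026-04-01, so this view will most likely be empty until you insert events with current timestamps.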

Step 4: Category Affinity Scoring

Category affinity captures broader user preferences. This is especially useful when a user has interacted with only a few products but shows a clear category preference:

CREATE MATERIALIZED VIEW mv_user_category_affinity AS
SELECT
    ua.user_id,
    p.category,
    COUNT(*) AS category_interactions,
    SUM(
        CASE ua.activity_type
            WHEN 'view' THEN 1
            WHEN 'click' THEN 2
            WHEN 'add_to_cart' THEN 3
            WHEN 'purchase' THEN 5
            ELSE 0  -- unrecognized activity types add no score
        END
    ) AS category_score
FROM user_activities ua
JOIN products p ON ua.product_id = p.product_id
GROUP BY ua.user_id, p.category;

Query the view to see each user's category profile:

SELECT * FROM mv_user_category_affinity ORDER BY user_id, category_score DESC;

 user_id |  category   | category_interactions | category_score
---------+-------------+-----------------------+----------------
     101 | Electronics |                     8 |             16
     102 | Electronics |                     8 |             24
     103 | Sports      |                     8 |             22
     104 | Electronics |                     4 |              9
     104 | Sports      |                     2 |              3
     105 | Electronics |                     1 |              1

The scores clearly separate user preferences: Bob is the most engaged electronics buyer (score 24), Carol dominates sports (score 22), and Dave shows a mixed profile leaning toward electronics.

Assembling Personalized Recommendations

With the materialized views in place, a single query can generate personalized recommendations for any user. The query below, shown for user 101 (Alice), merges collaborative filtering signals with category affinity and filters out products the user has already interacted with:

WITH user_interacted AS (
    SELECT product_id
    FROM mv_user_product_interaction
    WHERE user_id = 101
),
collaborative_recs AS (
    SELECT
        cf.recommended_product_id AS product_id,
        SUM(cf.co_interaction_count * cf.avg_score) AS cf_score
    FROM mv_collaborative_filtering cf
    WHERE cf.source_product_id IN (SELECT product_id FROM user_interacted)
      AND cf.recommended_product_id NOT IN (SELECT product_id FROM user_interacted)
    GROUP BY cf.recommended_product_id
),
category_recs AS (
    SELECT
        p.product_id,
        uca.category_score AS cat_score
    FROM mv_user_category_affinity uca
    JOIN products p ON p.category = uca.category
    WHERE uca.user_id = 101
      AND p.product_id NOT IN (SELECT product_id FROM user_interacted)
)
SELECT
    p.product_id,
    p.product_name,
    p.category,
    p.price,
    COALESCE(cr.cf_score, 0) AS collaborative_score,
    COALESCE(cat.cat_score, 0) AS category_score,
    COALESCE(cr.cf_score, 0) + COALESCE(cat.cat_score, 0) AS total_score
FROM products p
LEFT JOIN collaborative_recs cr ON p.product_id = cr.product_id
LEFT JOIN category_recs cat ON p.product_id = cat.product_id
WHERE cr.cf_score IS NOT NULL OR cat.cat_score IS NOT NULL
ORDER BY total_score DESC
LIMIT 5;

 product_id |  product_name   |  category   | price  | collaborative_score | category_score | total_score
------------+-----------------+-------------+--------+---------------------+----------------+-------------
          7 | USB-C Hub       | Electronics |  39.99 |                  18 |             16 |          34
         10 | Fitness Tracker | Electronics |  89.99 |                   0 |             16 |          16
          2 | Running Shoes   | Sports      | 129.99 |                   6 |              0 |           6

For Alice (user 101), the engine recommends:

  1. USB-C Hub (total score 34): Strong collaborative filtering signal (Bob, who shares Alice's taste, purchased it) plus electronics category affinity.
  2. Fitness Tracker (total score 16): No collaborative signal, but Alice's strong electronics affinity makes it relevant.
  3. Running Shoes (total score 6): Weak collaborative signal from Dave's overlapping behavior.

Notice what is absent: sports products such as Yoga Mat and Resistance Bands do not appear, because Alice has no sports affinity and the only user who interacted with them (Carol) shares no products with her. Running Shoes makes the list only through Dave.
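
To watch the views react to new activity, you can append an event and query again. This is a sketch under a few assumptions: activity_id 32, the timestamp, and the session id are made up for the demo, and FLUSH is used to wait until the write is reflected in downstream views before reading.

```sql
-- Alice (101) purchases the USB-C Hub (product 7), her top recommendation.
INSERT INTO user_activities
    (activity_id, user_id, product_id, activity_type, activity_time, session_id)
VALUES
    (32, 101, 7, 'purchase', '2026-04-01 14:00:00+00', 'sess_a3');

-- Make sure the change has propagated to the materialized views.
FLUSH;

-- The new user-product pair appears with the purchase weight of 5.
SELECT user_id, product_id, interaction_score, interaction_count
FROM mv_user_product_interaction
WHERE user_id = 101 AND product_id = 7;

-- Optional cleanup so later queries match the sample outputs in this guide.
DELETE FROM user_activities WHERE activity_id = 32;
FLUSH;
```

Re-running the recommendation query between the INSERT and the DELETE should no longer list the USB-C Hub for Alice, because product 7 now falls into user_interacted.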

Connecting to Production Data Streams

In a real deployment, you would not insert data manually. Instead, user activities stream from Kafka, and the product catalog syncs via change data capture (CDC). Here is how that looks with RisingWave sources:

-- Ingest user activities from a Kafka topic
CREATE SOURCE user_activity_source (
    activity_id INT,
    user_id INT,
    product_id INT,
    activity_type VARCHAR,
    activity_time TIMESTAMPTZ,
    session_id VARCHAR
) WITH (
    connector = 'kafka',
    topic = 'user-activities',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;

-- Ingest product catalog updates via CDC from PostgreSQL
CREATE TABLE products_cdc (
    product_id INT PRIMARY KEY,
    product_name VARCHAR,
    category VARCHAR,
    price DECIMAL,
    updated_at TIMESTAMPTZ
) WITH (
    connector = 'postgres-cdc',
    hostname = 'postgres',
    port = '5432',
    username = 'replicator',
    password = 'secret',
    database.name = 'ecommerce',
    table.name = 'products'
);

Once the materialized views read from these sources instead of the tutorial tables, all four update continuously as events arrive from Kafka and product changes sync from PostgreSQL. The latency from event to updated recommendation is typically sub-second.
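
For a view to pick up Kafka events, it has to select from the source rather than from the user_activities table. A minimal sketch, assuming the user_activity_source defined above; the view name mv_user_product_interaction_kafka is hypothetical:

```sql
-- Same weighted-score logic as mv_user_product_interaction,
-- but reading directly from the Kafka-backed source.
CREATE MATERIALIZED VIEW mv_user_product_interaction_kafka AS
SELECT
    user_id,
    product_id,
    SUM(
        CASE activity_type
            WHEN 'view' THEN 1
            WHEN 'click' THEN 2
            WHEN 'add_to_cart' THEN 3
            WHEN 'purchase' THEN 5
            ELSE 0  -- ignore unrecognized event types
        END
    ) AS interaction_score,
    COUNT(*) AS interaction_count,
    MAX(activity_time) AS last_interaction
FROM user_activity_source
GROUP BY user_id, product_id;
```

The downstream views would then select from this view, and from products_cdc, in place of the tutorial tables.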

Performance Considerations for Production

Scaling the Self-Join

The collaborative filtering materialized view uses a self-join, which can be expensive at scale. For a catalog with millions of products and billions of interactions, consider these strategies:

  • Filter by time window: Add a WHERE activity_time > NOW() - INTERVAL '7 days' clause to limit the interaction scope using temporal filters.
  • Score threshold: Filter out low-signal interactions (e.g., WHERE interaction_score >= 3) to reduce the join cardinality.
  • Parallelism: RisingWave automatically parallelizes materialized view computation across available CPU cores.
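
The score-threshold strategy can be sketched by adding an intermediate view that keeps only strong interactions and running the self-join over it. The view names mv_strong_interaction and mv_collaborative_filtering_strong are hypothetical, the threshold of 3 is the example value from the list above, and this variant has not been benchmarked:

```sql
-- Keep only user-product pairs with a meaningful signal.
CREATE MATERIALIZED VIEW mv_strong_interaction AS
SELECT user_id, product_id, interaction_score
FROM mv_user_product_interaction
WHERE interaction_score >= 3;

-- Same self-join as mv_collaborative_filtering, over the smaller input.
CREATE MATERIALIZED VIEW mv_collaborative_filtering_strong AS
SELECT
    a.product_id AS source_product_id,
    b.product_id AS recommended_product_id,
    COUNT(DISTINCT a.user_id) AS co_interaction_count,
    AVG(b.interaction_score) AS avg_score
FROM mv_strong_interaction a
JOIN mv_strong_interaction b
    ON a.user_id = b.user_id
    AND a.product_id <> b.product_id
GROUP BY a.product_id, b.product_id;
```

Because the filter sits upstream of the join, every pair it removes also drops out of both sides of the self-join.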

Serving Recommendations

For high-traffic applications, query the materialized views directly from your API server using a PostgreSQL client. RisingWave serves read queries against materialized views with millisecond latency since results are precomputed. For extremely high read volumes (100k+ QPS), you can add a sink to Redis to serve recommendations from an in-memory cache.
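
A typical serving query is a filtered lookup against one of the views. The sketch below fetches the top related products for a product page; the product id 1 and the limit of 3 are placeholder values, and the index name idx_cf_source is hypothetical:

```sql
-- Optional: index the lookup column so point queries avoid a full scan.
CREATE INDEX idx_cf_source ON mv_collaborative_filtering (source_product_id);

-- "Users who interacted with this product also interacted with":
-- top 3 related products for product 1.
SELECT
    cf.recommended_product_id,
    p.product_name,
    cf.co_interaction_count,
    cf.avg_score
FROM mv_collaborative_filtering cf
JOIN products p ON cf.recommended_product_id = p.product_id
WHERE cf.source_product_id = 1
ORDER BY cf.co_interaction_count DESC, cf.avg_score DESC
LIMIT 3;
```

An API server would issue the SELECT with the product id bound as a query parameter over a regular PostgreSQL connection.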

Handling the Cold-Start Problem

New users with little or no activity present a challenge for collaborative filtering. The category affinity and trending product views provide fallback strategies:

  • For a brand-new user, serve trending products from mv_trending_products.
  • After a few interactions, category affinity kicks in and narrows recommendations.
  • Once the user has enough history, collaborative filtering dominates.

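The hybrid query above implements the last two layers: it sums collaborative and category scores, so a user with no collaborative signal is still ranked by category affinity. Serving trending products to a brand-new user takes a separate fallback query against mv_trending_products.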
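
The trending fallback for a brand-new user can be sketched as a standalone query. The ranking by unique_users and then total_interactions is an assumption, not something the pipeline above prescribes; user 105 (Eve) is used because she has a single interaction:

```sql
-- Cold-start fallback: most popular products the user has not touched yet.
SELECT
    tp.product_id,
    tp.product_name,
    tp.category,
    tp.unique_users,
    tp.total_interactions
FROM mv_trending_products tp
WHERE tp.product_id NOT IN (
    SELECT product_id
    FROM mv_user_product_interaction
    WHERE user_id = 105
)
ORDER BY tp.unique_users DESC, tp.total_interactions DESC
LIMIT 5;
```

An application could run the hybrid query first and fall back to this one when it returns too few rows.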

What Is a Real-Time Recommendation Engine?

A real-time recommendation engine is a system that generates personalized product or content suggestions based on user behavior as it happens, rather than relying on precomputed batch results. It processes user activity streams (clicks, views, purchases) continuously and updates its recommendation scores within milliseconds of each new event.

Unlike batch-based systems that refresh recommendations on a fixed schedule, a real-time engine uses stream processing to maintain always-current recommendation state. This means a user who just added headphones to their cart immediately sees related accessories, not yesterday's generic suggestions.

How Does Collaborative Filtering Work in Streaming SQL?

Collaborative filtering in streaming SQL works by maintaining a continuously updated materialized view that tracks product co-occurrence across users. When user A and user B both interact with the same products, the system infers that other products in user B's history are relevant to user A.

In RisingWave, this is implemented as a self-join on a user-product interaction materialized view. The streaming engine incrementally updates co-occurrence counts as each new event arrives, so recommendation scores reflect the latest user behavior without re-scanning historical data.

When Should You Use Streaming SQL Instead of a Machine Learning Model?

Streaming SQL is the right choice when your recommendation logic can be expressed as aggregations, joins, and scoring rules rather than neural network inference. It excels at item-based collaborative filtering, trending detection, and rule-based scoring. You also benefit from the operational simplicity of maintaining SQL queries instead of ML training pipelines and model serving infrastructure.

If your use case requires deep personalization with embeddings, content-based features, or reinforcement learning, a dedicated ML system is more appropriate. That said, many production recommendation engines start with SQL-based approaches and only add ML components when the SQL baseline is no longer sufficient.

How Does RisingWave Compare to Apache Flink for Recommendation Engines?

RisingWave and Apache Flink can both power real-time recommendation engines, but they differ in developer experience and operational complexity. RisingWave exposes a PostgreSQL-compatible SQL interface and manages streaming state in its own storage layer, so you do not select a state backend such as RocksDB or tune checkpointing yourself. Flink runs on the JVM and leaves state backend and checkpoint configuration to you, which adds operational overhead for deployment and scaling.

For recommendation engines built primarily with SQL logic (aggregations, joins, window functions), RisingWave provides a simpler path from development to production. Flink offers more flexibility for custom operators and Java/Python UDFs if your recommendation logic requires complex procedural code.

Conclusion

Building a real-time recommendation engine does not require a complex ML pipeline or a fleet of batch processing jobs. With streaming SQL in RisingWave, you can implement the core recommendation patterns using four materialized views:

  • Weighted interaction scores that quantify user intent from raw activity events.
  • Item-based collaborative filtering via a self-join that finds product affinities across users.
  • Trending product detection that surfaces globally popular items in real time.
  • Category affinity scoring that captures broader user preferences for cold-start handling.

These views update incrementally, typically within a second of new events arriving. The hybrid recommendation query combines the collaborative and category signals to produce personalized results that exclude products the user has already engaged with.

The SQL you write is standard, tested, and readable. No custom frameworks, no JVM tuning, no model retraining schedules. As your data volume grows, RisingWave scales the computation automatically across available resources.


Ready to try this yourself? Get started with RisingWave in 5 minutes using the Quickstart guide.

Join our Slack community to ask questions and connect with other stream processing developers.
