Real-Time Product Recommendation Signals with Streaming SQL

Real-time product recommendation signals are the behavioral events — views, add-to-carts, purchases, and co-purchase patterns — that indicate what a customer is likely to buy next. Streaming SQL lets you compute these signals continuously from your event stream, feeding recommendation engines with data that's seconds old rather than hours old.

Why This Matters for E-Commerce

Recommendation engines are among the highest-ROI features in e-commerce. Industry estimates commonly attribute 10-30% of revenue on mature platforms to personalized recommendations. But most recommendation systems suffer from a silent flaw: their input signals are stale.

A typical pipeline works like this: clickstream events land in a data lake, a nightly or hourly batch job computes co-purchase matrices and view-sequence patterns, and those results feed into a serving layer. By the time a customer sees a recommendation, the signal data is hours old. The customer who just added a camera to their cart might be shown phone cases instead of camera bags.

The problem compounds during high-traffic events. On Black Friday, purchase patterns shift by the hour. A batch job computed at midnight is worse than useless by 3 PM — it's actively misleading.

Real-time recommendation signals close this gap. By processing behavioral events as they happen, you maintain fresh co-purchase graphs, trending product lists, and per-session affinity scores that reflect what's actually happening right now.

How Streaming SQL Solves This

Streaming SQL lets you define the signal computation logic once and have it maintained continuously. RisingWave, a PostgreSQL-compatible streaming database, ingests event streams from Kafka and maintains materialized views that are always current.

For recommendations, the key signals are:

  • Co-purchase signals: products frequently bought together in the same order
  • View sequences: products viewed in succession within a session
  • Add-to-cart events: strong purchase intent signals
  • Collaborative filtering signals: products bought by similar customers

Each of these is expressible as a SQL aggregation over a stream. RisingWave maintains the aggregates incrementally — when a new order event arrives, only the affected product pairs are updated.
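
To make "only the affected product pairs are updated" concrete, here is a minimal Python sketch of incremental pair counting. It illustrates the idea only and is not RisingWave's internal implementation; the class `CoPurchaseCounter` and its method names are hypothetical.

```python
from collections import defaultdict


class CoPurchaseCounter:
    """Toy incremental co-purchase counter (hypothetical, for illustration only)."""

    def __init__(self):
        # session_id -> product_ids purchased so far in that session
        self.purchases_by_session = defaultdict(list)
        # (product_a, product_b) with product_a < product_b -> count
        self.pair_counts = defaultdict(int)

    def on_purchase(self, session_id, product_id):
        """Apply one purchase event and return the pairs whose count changed."""
        touched = []
        for other in self.purchases_by_session[session_id]:
            if other == product_id:
                continue  # a product is never paired with itself
            pair = (min(other, product_id), max(other, product_id))
            self.pair_counts[pair] += 1
            touched.append(pair)
        self.purchases_by_session[session_id].append(product_id)
        return touched


counter = CoPurchaseCounter()
counter.on_purchase("s1", "camera")
counter.on_purchase("s1", "camera_bag")
changed = counter.on_purchase("s1", "sd_card")
print(changed)
```

Each new event touches only the pairs that involve the newly purchased product in that session; every other count stays as it was, which is the property that makes continuous maintenance cheap compared with recomputing the full matrix.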

Step-by-Step Tutorial

Step 1: Data Source

Ingest behavioral events from Kafka. A single topic with an event type field covers views, cart adds, and purchases.

CREATE SOURCE behavioral_events (
    event_id      VARCHAR,
    session_id    VARCHAR,
    customer_id   VARCHAR,
    product_id    VARCHAR,
    event_type    VARCHAR,   -- 'view', 'add_to_cart', 'purchase'
    category      VARCHAR,
    price         NUMERIC,
    event_ts      TIMESTAMPTZ
)
WITH (
    connector = 'kafka',
    topic = 'behavioral_events',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;
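
For reference, each message on the topic would need to be a JSON object whose keys match the column names above. The Python sketch below builds one such payload. The helper `build_event` and all sample values are made up for illustration, the ISO 8601 timestamp string is assumed to be a format your ingestion setup accepts, and the actual publishing code depends on the Kafka client you use.

```python
import json
import uuid
from datetime import datetime, timezone

VALID_EVENT_TYPES = {"view", "add_to_cart", "purchase"}


def build_event(session_id, customer_id, product_id, event_type,
                category, price, event_ts=None):
    """Build a JSON payload whose keys mirror the behavioral_events columns."""
    if event_type not in VALID_EVENT_TYPES:
        raise ValueError(f"unknown event_type: {event_type}")
    ts = event_ts or datetime.now(timezone.utc)
    event = {
        "event_id": str(uuid.uuid4()),
        "session_id": session_id,
        "customer_id": customer_id,
        "product_id": product_id,
        "event_type": event_type,
        "category": category,
        "price": price,
        "event_ts": ts.isoformat(),  # ISO 8601 string with UTC offset
    }
    return json.dumps(event)


payload = build_event(
    "sess-1", "cust-9", "sku-camera", "add_to_cart", "cameras", 499.0,
    event_ts=datetime(2024, 11, 29, 15, 0, tzinfo=timezone.utc),
)
print(payload)
```

Validating `event_type` at the producer keeps typos such as `addToCart` out of the stream, where they would otherwise be silently skipped by the `event_type = '...'` filters in the views that follow.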

Step 2: Core Materialized View — Co-Purchase Signals

Compute co-purchase frequency: pairs of products purchased in the same session. The schema has no order ID, so the query uses session_id as a proxy for an order. This is the foundation of "customers also bought" recommendations.

CREATE MATERIALIZED VIEW copurchase_signals AS
SELECT
    a.product_id    AS product_a,
    b.product_id    AS product_b,
    COUNT(*)        AS copurchase_count,
    MAX(a.event_ts) AS last_seen_ts
FROM behavioral_events a
JOIN behavioral_events b
    ON  a.session_id  = b.session_id
    AND a.event_type  = 'purchase'
    AND b.event_type  = 'purchase'
    AND a.product_id  < b.product_id   -- avoid duplicate pairs
GROUP BY a.product_id, b.product_id;
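
To check what this self-join produces, the sketch below runs the same SELECT against a handful of sample rows. It uses Python's built-in `sqlite3` module purely as a stand-in for experimenting with the query logic; the table contents are invented, and SQLite does not maintain the result incrementally the way a streaming materialized view does.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE behavioral_events "
    "(session_id TEXT, product_id TEXT, event_type TEXT, event_ts TEXT)"
)
conn.executemany(
    "INSERT INTO behavioral_events VALUES (?, ?, ?, ?)",
    [
        ("s1", "camera", "purchase", "2024-11-29T10:00:00Z"),
        ("s1", "camera_bag", "purchase", "2024-11-29T10:01:00Z"),
        ("s2", "camera", "purchase", "2024-11-29T11:00:00Z"),
        ("s2", "camera_bag", "purchase", "2024-11-29T11:02:00Z"),
        ("s2", "tripod", "view", "2024-11-29T11:03:00Z"),  # views are ignored
    ],
)

rows = conn.execute(
    """
    SELECT a.product_id    AS product_a,
           b.product_id    AS product_b,
           COUNT(*)        AS copurchase_count,
           MAX(a.event_ts) AS last_seen_ts
    FROM behavioral_events a
    JOIN behavioral_events b
      ON  a.session_id = b.session_id
      AND a.event_type = 'purchase'
      AND b.event_type = 'purchase'
      AND a.product_id < b.product_id
    GROUP BY a.product_id, b.product_id
    """
).fetchall()
print(rows)
```

Two details are worth noticing in the output. The `a.product_id < b.product_id` condition stores each pair once, in alphabetical order, and also excludes a product paired with itself. And `last_seen_ts` is taken from the `product_a` side of the join only, so it can trail the later of the two purchase timestamps by a short interval.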

Step 3: View Sequences and Add-to-Cart Affinity

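Track two more signals: products viewed together within the same session (for "frequently viewed together"), and per-product intent rates that measure how often a view leads to a cart add and how often a cart add leads to a purchase. A high cart-add rate combined with a low purchase conversion rate flags high-intent, incomplete journeys that are useful for retargeting.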

-- Products frequently viewed together within the same session
CREATE MATERIALIZED VIEW coview_signals AS
SELECT
    a.product_id    AS product_a,
    b.product_id    AS product_b,
    COUNT(*)        AS coview_count,
    MAX(a.event_ts) AS last_seen_ts
FROM behavioral_events a
JOIN behavioral_events b
    ON  a.session_id = b.session_id
    AND a.event_type = 'view'
    AND b.event_type = 'view'
    AND a.product_id < b.product_id
GROUP BY a.product_id, b.product_id;

-- Add-to-cart rate by product (intent signal for scoring)
CREATE MATERIALIZED VIEW product_intent_signals AS
SELECT
    product_id,
    COUNT(*) FILTER (WHERE event_type = 'view')         AS view_count,
    COUNT(*) FILTER (WHERE event_type = 'add_to_cart')  AS cart_add_count,
    COUNT(*) FILTER (WHERE event_type = 'purchase')     AS purchase_count,
    COUNT(*) FILTER (WHERE event_type = 'add_to_cart')::NUMERIC
        / NULLIF(COUNT(*) FILTER (WHERE event_type = 'view'), 0)
                                                         AS cart_add_rate,
    COUNT(*) FILTER (WHERE event_type = 'purchase')::NUMERIC
        / NULLIF(COUNT(*) FILTER (WHERE event_type = 'add_to_cart'), 0)
                                                         AS purchase_conversion_rate
FROM behavioral_events
GROUP BY product_id;
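
The two rate columns rely on `NULLIF(..., 0)` to avoid division by zero. The Python sketch below mirrors that arithmetic on a few invented events so the edge cases are easy to see; `safe_ratio` and `intent_signals` are hypothetical helper names, not part of any library.

```python
from collections import defaultdict


def safe_ratio(numerator, denominator):
    """Mirror `x / NULLIF(y, 0)`: return None instead of dividing by zero."""
    return None if denominator == 0 else numerator / denominator


def intent_signals(events):
    """Aggregate (product_id, event_type) tuples into per-product intent rates."""
    counts = defaultdict(lambda: {"view": 0, "add_to_cart": 0, "purchase": 0})
    for product_id, event_type in events:
        if event_type in counts[product_id]:
            counts[product_id][event_type] += 1
    return {
        pid: {
            "cart_add_rate": safe_ratio(c["add_to_cart"], c["view"]),
            "purchase_conversion_rate": safe_ratio(c["purchase"], c["add_to_cart"]),
        }
        for pid, c in counts.items()
    }


signals = intent_signals([
    ("camera", "view"), ("camera", "view"), ("camera", "view"), ("camera", "view"),
    ("camera", "add_to_cart"), ("camera", "add_to_cart"),
    ("camera", "purchase"),
    ("tripod", "purchase"),  # purchased without a recorded view or cart add
])
print(signals)
```

A product with purchases but no recorded cart adds gets a NULL (here `None`) conversion rate rather than an error, and a rate can exceed 1 when more purchases than cart adds are recorded, for example through a buy-now flow. Downstream ranking code should handle both cases.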

Step 4: Serving Layer — Sink to App DB

Push recommendation signals to a PostgreSQL serving database where your recommendation API can query them at low latency.

CREATE SINK copurchase_signals_sink
FROM copurchase_signals
WITH (
    connector   = 'jdbc',
    jdbc.url    = 'jdbc:postgresql://recdb:5432/recommendations?user=rw&password=secret',
    table.name  = 'copurchase_signals_live',
    type        = 'upsert',
    primary_key = 'product_a,product_b'
);

CREATE SINK product_intent_sink
FROM product_intent_signals
WITH (
    connector   = 'jdbc',
    jdbc.url    = 'jdbc:postgresql://recdb:5432/recommendations?user=rw&password=secret',
    table.name  = 'product_intent_live',
    type        = 'upsert',
    primary_key = 'product_id'
);
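
On the serving side, a recommendation API has to account for the fact that each pair is stored once, in sorted order, so a given product can appear in either column. The following Python sketch shows one way to look up "customers also bought" candidates. It is a sketch under stated assumptions: the function `top_copurchased` is hypothetical, the sample rows are invented, and an in-memory SQLite database stands in for the PostgreSQL serving database (with a PostgreSQL driver such as psycopg, the `?` placeholders would become `%s`).

```python
import sqlite3


def top_copurchased(conn, product_id, limit=5):
    """Return (other_product, copurchase_count) rows for one product, best first."""
    query = """
        SELECT CASE WHEN product_a = ? THEN product_b ELSE product_a END
                   AS other_product,
               copurchase_count
        FROM copurchase_signals_live
        WHERE product_a = ? OR product_b = ?
        ORDER BY copurchase_count DESC, other_product
        LIMIT ?
    """
    params = (product_id, product_id, product_id, limit)
    return conn.execute(query, params).fetchall()


# In-memory SQLite stands in for the PostgreSQL serving database in this sketch.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE copurchase_signals_live ("
    "product_a TEXT, product_b TEXT, copurchase_count INTEGER, "
    "last_seen_ts TEXT, PRIMARY KEY (product_a, product_b))"
)
conn.executemany(
    "INSERT INTO copurchase_signals_live VALUES (?, ?, ?, ?)",
    [
        ("camera", "camera_bag", 42, "2024-11-29T15:00:00Z"),
        ("camera", "sd_card", 57, "2024-11-29T15:01:00Z"),
        ("battery", "camera", 12, "2024-11-29T14:30:00Z"),
        ("phone", "phone_case", 90, "2024-11-29T15:02:00Z"),
    ],
)
recs = top_copurchased(conn, "camera", limit=3)
print(recs)
```

Because the lookup filters on both `product_a` and `product_b`, the serving table likely benefits from an index on `product_b` in addition to the composite primary key.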

Comparison Table

Signal Type       | Batch Freshness     | Streaming Freshness | Impact
------------------|---------------------|---------------------|----------------------------------
Co-purchase pairs | Hours (nightly job) | Seconds             | "Customers also bought" relevance
View sequences    | Hours               | Seconds             | "Frequently viewed together"
Add-to-cart rate  | Hours               | Seconds             | Intent scoring for ranking
Trending products | Hours or days       | Minutes             | Surfacing what's hot right now

FAQ

Q: Does this replace a machine learning recommendation model? No — it complements it. These signals are inputs to collaborative filtering and matrix factorization models. Streaming SQL keeps those inputs fresh. A model trained on week-old co-purchase data still benefits enormously from signals updated in seconds rather than hours.

Q: How do I handle session boundaries for view sequences? Define sessions by session_id (from your frontend event tracking) or use a tumbling window to group events by customer ID within a time window. RisingWave supports both tumble and hop window functions for time-based grouping.
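
As an illustration of the tumbling-window option, the Python sketch below assigns each event to a fixed, non-overlapping window and groups events by customer and window. The function names and the 30-minute window size are assumptions chosen for the example; in RisingWave the equivalent grouping would be expressed with its window functions in SQL.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def tumble_start(event_ts, size):
    """Return the start of the fixed-size (tumbling) window containing event_ts."""
    return EPOCH + ((event_ts - EPOCH) // size) * size


def group_by_window(events, size=timedelta(minutes=30)):
    """Group (customer_id, product_id, event_ts) tuples by customer and window."""
    groups = {}
    for customer_id, product_id, event_ts in events:
        key = (customer_id, tumble_start(event_ts, size))
        groups.setdefault(key, []).append(product_id)
    return groups


def at(hour, minute):
    """Build a sample UTC timestamp on an arbitrary example date."""
    return datetime(2024, 11, 29, hour, minute, tzinfo=timezone.utc)


groups = group_by_window([
    ("c1", "camera", at(15, 5)),
    ("c1", "camera_bag", at(15, 20)),
    ("c1", "tripod", at(15, 40)),  # falls into the next 30-minute window
])
print(groups)
```

Fixed windows can split one browsing visit in two when it crosses a window boundary, as the tripod view does here. A frontend-generated session_id avoids that, which is why it is the simpler choice when it is available.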

Q: Can I use this for real-time personalization in merchant dashboards, like SHOPLINE does? Absolutely. SHOPLINE uses RisingWave to power merchant analytics dashboards with real-time GMV and channel breakdowns — the same architectural pattern applies to per-merchant product recommendation signals. Row-level filtering by merchant or customer ID keeps tenant data isolated.

Key Takeaways

  • Co-purchase signals, view sequences, and add-to-cart rates are all computable from a single event stream using SQL joins and aggregations
  • Streaming SQL maintains these signals incrementally — no reprocessing, no batch jobs, no stale cache invalidation
  • RisingWave's PostgreSQL compatibility means your recommendation API can query signals using standard SQL
  • Fresh signals can improve recommendation relevance, particularly during high-traffic events when purchase patterns shift rapidly
  • The same streaming foundation used for recommendation signals also powers GMV dashboards, inventory alerts, and CLV calculations — one platform for all real-time analytics