Real-time product recommendation signals are the behavioral events — views, add-to-carts, purchases, and co-purchase patterns — that indicate what a customer is likely to buy next. Streaming SQL lets you compute these signals continuously from your event stream, feeding recommendation engines with data that's seconds old rather than hours old.
Why This Matters for E-Commerce
Recommendation engines are among the highest-ROI features in e-commerce. Personalized recommendations are often credited with 10-30% of revenue on mature platforms, though the exact share depends on how attribution is measured. But most recommendation systems suffer from a silent flaw: their input signals are stale.
A typical pipeline works like this: clickstream events land in a data lake, a nightly or hourly batch job computes co-purchase matrices and view-sequence patterns, and those results feed into a serving layer. By the time a customer sees a recommendation, the signal data is hours old. The customer who just added a camera to their cart might be shown phone cases instead of camera bags.
The problem compounds during high-traffic events. On Black Friday, purchase patterns shift by the hour. A batch job computed at midnight is worse than useless by 3 PM — it's actively misleading.
Real-time recommendation signals close this gap. By processing behavioral events as they happen, you maintain fresh co-purchase graphs, trending product lists, and per-session affinity scores that reflect what's actually happening right now.
How Streaming SQL Solves This
Streaming SQL lets you define the signal computation logic once and have it maintained continuously. RisingWave, a PostgreSQL-compatible streaming database, ingests event streams from Kafka and maintains materialized views that are always current.
For recommendations, the key signals are:
- Co-purchase signals: products frequently bought together in the same order
- View sequences: products viewed in succession within a session
- Add-to-cart events: strong purchase intent signals
- Collaborative filtering signals: products bought by similar customers
Each of these is expressible as a SQL aggregation over a stream. RisingWave maintains the aggregates incrementally — when a new order event arrives, only the affected product pairs are updated.
Step-by-Step Tutorial
Step 1: Data Source
Ingest behavioral events from Kafka. A single topic with an event type field covers views, cart adds, and purchases.
CREATE SOURCE behavioral_events (
    event_id    VARCHAR,
    session_id  VARCHAR,
    customer_id VARCHAR,
    product_id  VARCHAR,
    event_type  VARCHAR,      -- 'view', 'add_to_cart', 'purchase'
    category    VARCHAR,
    price       NUMERIC,
    event_ts    TIMESTAMPTZ
)
WITH (
    connector = 'kafka',
    topic = 'behavioral_events',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;
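Before building the recommendation views, it can help to confirm that events are arriving and that event_type carries the expected values. The following is a minimal sketch that assumes the source above; the view name event_type_counts is hypothetical.

```sql
-- Hypothetical sanity-check view: running count of events per type.
CREATE MATERIALIZED VIEW event_type_counts AS
SELECT
    event_type,
    COUNT(*)      AS event_count,
    MAX(event_ts) AS latest_event_ts
FROM behavioral_events
GROUP BY event_type;

-- Ad hoc check: counts should rise as new events are consumed.
SELECT * FROM event_type_counts ORDER BY event_count DESC;
```

If a type such as 'purchase' never shows up here, the co-purchase view in the next step will stay empty, so this is worth checking first.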
Step 2: Core Materialized View — Co-Purchase Signals
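Compute co-purchase frequency: pairs of products that are purchased together. The event schema has no order ID, so the query treats purchases that share a session_id as belonging to one order. This is the foundation of "customers also bought" recommendations.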
CREATE MATERIALIZED VIEW copurchase_signals AS
SELECT
    a.product_id    AS product_a,
    b.product_id    AS product_b,
    COUNT(*)        AS copurchase_count,  -- one count per pair of purchase events in a shared session
    MAX(a.event_ts) AS last_seen_ts
FROM behavioral_events a
JOIN behavioral_events b
    ON a.session_id = b.session_id        -- session_id stands in for an order ID
    AND a.event_type = 'purchase'
    AND b.event_type = 'purchase'
    AND a.product_id < b.product_id       -- keep each unordered pair once; also excludes self-pairs
GROUP BY a.product_id, b.product_id;
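Because each pair is stored once with product_a < product_b, finding the companions of a given product means checking both columns. One possible way to simplify lookups is a second view that emits every pair in both directions. This is only a sketch: the names copurchase_lookup and related_product_id, and the product ID 'camera-123', are hypothetical.

```sql
-- Hypothetical helper view: each pair appears twice, once per direction,
-- so a lookup only needs to filter on product_id.
CREATE MATERIALIZED VIEW copurchase_lookup AS
SELECT
    product_a AS product_id,
    product_b AS related_product_id,
    copurchase_count,
    last_seen_ts
FROM copurchase_signals
UNION ALL
SELECT
    product_b AS product_id,
    product_a AS related_product_id,
    copurchase_count,
    last_seen_ts
FROM copurchase_signals;

-- Top five "customers also bought" candidates for a placeholder product.
SELECT related_product_id, copurchase_count
FROM copurchase_lookup
WHERE product_id = 'camera-123'
ORDER BY copurchase_count DESC
LIMIT 5;
```

The trade-off is roughly double the stored rows in exchange for a single-column filter at query time.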
Step 3: View Sequences and Add-to-Cart Affinity
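Track products viewed within the same session, and compute per-product funnel rates: views to cart adds, and cart adds to purchases. Co-views feed "frequently viewed together" widgets, while the funnel rates act as intent signals for ranking and retargeting. Note that the co-view query counts unordered pairs, so it does not capture the order in which products were viewed.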
-- Products frequently viewed together within the same session
CREATE MATERIALIZED VIEW coview_signals AS
SELECT
    a.product_id    AS product_a,
    b.product_id    AS product_b,
    COUNT(*)        AS coview_count,
    MAX(a.event_ts) AS last_seen_ts
FROM behavioral_events a
JOIN behavioral_events b
    ON a.session_id = b.session_id
    AND a.event_type = 'view'
    AND b.event_type = 'view'
    AND a.product_id < b.product_id
GROUP BY a.product_id, b.product_id;

-- Add-to-cart rate by product (intent signal for scoring)
CREATE MATERIALIZED VIEW product_intent_signals AS
SELECT
    product_id,
    COUNT(*) FILTER (WHERE event_type = 'view')        AS view_count,
    COUNT(*) FILTER (WHERE event_type = 'add_to_cart') AS cart_add_count,
    COUNT(*) FILTER (WHERE event_type = 'purchase')    AS purchase_count,
    COUNT(*) FILTER (WHERE event_type = 'add_to_cart')::NUMERIC
        / NULLIF(COUNT(*) FILTER (WHERE event_type = 'view'), 0)
        AS cart_add_rate,
    COUNT(*) FILTER (WHERE event_type = 'purchase')::NUMERIC
        / NULLIF(COUNT(*) FILTER (WHERE event_type = 'add_to_cart'), 0)
        AS purchase_conversion_rate
FROM behavioral_events
GROUP BY product_id;
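The pairwise views in Steps 2 and 3 join against the full event history, so their state keeps growing for as long as the stream does. One way to bound it is to restrict the join input to recent events with a temporal filter. The sketch below assumes a 30-day horizon, which is an arbitrary choice, and the view names recent_views and coview_signals_30d are hypothetical.

```sql
-- Hypothetical intermediate view: only view events from the last 30 days.
-- Rows drop out automatically once they age past the horizon.
CREATE MATERIALIZED VIEW recent_views AS
SELECT session_id, product_id, event_ts
FROM behavioral_events
WHERE event_type = 'view'
  AND event_ts > NOW() - INTERVAL '30 days';

-- Same pairing logic as coview_signals, but over the bounded input.
CREATE MATERIALIZED VIEW coview_signals_30d AS
SELECT
    a.product_id    AS product_a,
    b.product_id    AS product_b,
    COUNT(*)        AS coview_count,
    MAX(a.event_ts) AS last_seen_ts
FROM recent_views a
JOIN recent_views b
    ON a.session_id = b.session_id
    AND a.product_id < b.product_id
GROUP BY a.product_id, b.product_id;
```

With this variant the counts can go down as well as up, since pairs lose weight when their underlying events expire. That is often desirable for recommendations, but it changes what the number means.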
Step 4: Serving Layer — Sink to App DB
Push recommendation signals to a PostgreSQL serving database where your recommendation API can query them at low latency.
CREATE SINK copurchase_signals_sink
FROM copurchase_signals
WITH (
    connector = 'jdbc',
    jdbc.url = 'jdbc:postgresql://recdb:5432/recommendations?user=rw&password=secret',
    table.name = 'copurchase_signals_live',
    type = 'upsert',
    primary_key = 'product_a,product_b'
);

CREATE SINK product_intent_sink
FROM product_intent_signals
WITH (
    connector = 'jdbc',
    jdbc.url = 'jdbc:postgresql://recdb:5432/recommendations?user=rw&password=secret',
    table.name = 'product_intent_live',
    type = 'upsert',
    primary_key = 'product_id'
);
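The sinks write into tables on the PostgreSQL side, so those tables generally need to exist before the sinks are created. The following sketch shows what they might look like; it runs against the serving database, not RisingWave. The column types are assumptions chosen to mirror the materialized views, the index name is hypothetical, and 'camera-123' is a placeholder product ID.

```sql
-- Run on the PostgreSQL serving database (recdb), not in RisingWave.
CREATE TABLE IF NOT EXISTS copurchase_signals_live (
    product_a        VARCHAR NOT NULL,
    product_b        VARCHAR NOT NULL,
    copurchase_count BIGINT,
    last_seen_ts     TIMESTAMPTZ,
    PRIMARY KEY (product_a, product_b)
);

-- The primary key covers lookups by product_a; this covers product_b.
CREATE INDEX IF NOT EXISTS copurchase_signals_live_b_idx
    ON copurchase_signals_live (product_b);

CREATE TABLE IF NOT EXISTS product_intent_live (
    product_id               VARCHAR PRIMARY KEY,
    view_count               BIGINT,
    cart_add_count           BIGINT,
    purchase_count           BIGINT,
    cart_add_rate            NUMERIC,
    purchase_conversion_rate NUMERIC
);

-- Example lookup a recommendation API might issue: top co-purchased
-- products for one item, enriched with each candidate's conversion rate.
SELECT
    CASE WHEN c.product_a = 'camera-123' THEN c.product_b
         ELSE c.product_a END AS related_product_id,
    c.copurchase_count,
    i.purchase_conversion_rate
FROM copurchase_signals_live c
LEFT JOIN product_intent_live i
    ON i.product_id = CASE WHEN c.product_a = 'camera-123' THEN c.product_b
                           ELSE c.product_a END
WHERE 'camera-123' IN (c.product_a, c.product_b)
ORDER BY c.copurchase_count DESC
LIMIT 5;
```

The primary keys match the primary_key settings in the sink definitions, which is what lets the upsert sink overwrite a row when its aggregate changes.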
Comparison Table
| Signal Type | Batch Freshness | Streaming Freshness | Impact |
| --- | --- | --- | --- |
| Co-purchase pairs | Hours (nightly job) | Seconds | "Customers also bought" relevance |
| View sequences | Hours | Seconds | "Frequently viewed together" |
| Add-to-cart rate | Hours | Seconds | Intent scoring for ranking |
| Trending products | Hours or days | Minutes | Surfacing what's hot right now |
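The tutorial steps do not define a view for the trending-products row. One possible sketch uses a hopping window so that each count covers 10 minutes and a new window starts every minute. The window sizes and the view name trending_products are assumptions.

```sql
-- Hypothetical trending view: per-product activity in overlapping
-- 10-minute windows that advance every minute.
CREATE MATERIALIZED VIEW trending_products AS
SELECT
    product_id,
    window_start,
    window_end,
    COUNT(*) FILTER (WHERE event_type = 'view')        AS views,
    COUNT(*) FILTER (WHERE event_type = 'add_to_cart') AS cart_adds,
    COUNT(*) FILTER (WHERE event_type = 'purchase')    AS purchases
FROM HOP(behavioral_events, event_ts, INTERVAL '1 minute', INTERVAL '10 minutes')
GROUP BY product_id, window_start, window_end;

-- Ad hoc query: rows from the most recently closed windows first,
-- ranked by purchases within each window.
SELECT product_id, window_end, purchases, cart_adds, views
FROM trending_products
WHERE window_end <= NOW()
ORDER BY window_end DESC, purchases DESC
LIMIT 10;
```

As written, old windows are never dropped, so a production version would likely need some form of retention on window_end.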
FAQ
Q: Does this replace a machine learning recommendation model?
No, it complements it. These signals are inputs to collaborative filtering and matrix factorization models, and streaming SQL keeps those inputs fresh. A model trained on week-old co-purchase data can still benefit from serving-time signals that are updated in seconds rather than hours.
Q: How do I handle session boundaries for view sequences?
Define sessions by session_id (from your frontend event tracking) or use a tumbling window to group events by customer ID within a time window. RisingWave supports both tumble and hop window functions for time-based grouping.
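If events lack a reliable session_id, the time-window alternative can be sketched as follows. It treats each customer's views within a fixed 30-minute tumbling window as one pseudo-session. The window length and the view names windowed_views and coview_signals_windowed are assumptions, and fixed windows can split a real browsing session that straddles a boundary.

```sql
-- Hypothetical view: assign each view event to a 30-minute tumbling window.
CREATE MATERIALIZED VIEW windowed_views AS
SELECT customer_id, product_id, window_start, window_end
FROM TUMBLE(behavioral_events, event_ts, INTERVAL '30 minutes')
WHERE event_type = 'view';

-- Pair products viewed by the same customer in the same window.
CREATE MATERIALIZED VIEW coview_signals_windowed AS
SELECT
    a.product_id AS product_a,
    b.product_id AS product_b,
    COUNT(*)     AS coview_count
FROM windowed_views a
JOIN windowed_views b
    ON a.customer_id = b.customer_id
    AND a.window_start = b.window_start
    AND a.product_id < b.product_id
GROUP BY a.product_id, b.product_id;
```

Using HOP instead of TUMBLE would reduce the boundary problem at the cost of counting some pairs in more than one window.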
Q: Can I use this for real-time personalization in merchant dashboards, like SHOPLINE does?
Yes. SHOPLINE uses RisingWave to power merchant analytics dashboards with real-time GMV and channel breakdowns, and the same architectural pattern applies to per-merchant product recommendation signals. Row-level filtering by merchant or customer ID keeps tenant data isolated.
Key Takeaways
- Co-purchase signals, view sequences, and add-to-cart rates are all computable from a single event stream using SQL joins and aggregations
- Streaming SQL maintains these signals incrementally, with no full reprocessing, no batch jobs, and no manual cache invalidation
- RisingWave's PostgreSQL compatibility means your recommendation API can query signals using standard SQL
- Fresh signals improve recommendation relevance, particularly during high-traffic events when purchase patterns shift rapidly
- The same streaming foundation used for recommendation signals also powers GMV dashboards, inventory alerts, and CLV calculations — one platform for all real-time analytics

