Your recommendation model needs to know that a user browsed four product pages in the last three minutes. Your churn prediction model needs a session engagement score computed from page depth, click velocity, and dwell time. Your personalization engine needs to know that this user converts at 12% from search but only 2% from email campaigns.
These are user behavior features, and they go stale fast. A user who was deeply engaged five minutes ago may have already left. A feature that captures "pages viewed in the current session" is only useful if the session is still happening. Batch pipelines that recompute these values hourly cannot keep up.
This tutorial walks through building a complete user behavior feature pipeline using streaming SQL in RisingWave. You will ingest raw clickstream events from Kafka, compute session-level features with grouped aggregations, build a composite engagement score, calculate funnel conversion rates per user, and serve all of it through a PostgreSQL-compatible interface that any ML serving layer can query.
Why Batch Feature Pipelines Fall Short for User Behavior
User behavior is temporal by nature. The value of a behavioral feature depends on how recently it was computed. Consider three common features:
- Pages viewed in the current session. An hourly batch pipeline first sees a session that started 10 minutes ago at its next run, which can be most of an hour away. By then the session is usually over and the feature is useless for real-time personalization.
- Click-through rate over the last 30 minutes. Computing this in a batch job means the CTR reflects a stale window. The user's intent may have shifted entirely since the last computation.
- Engagement score. A composite metric combining click depth, scroll behavior, and time on page. If this score updates once per hour, your models act on outdated signals during the most critical moments of user interaction.
Streaming SQL solves this by computing features incrementally as each event arrives. A materialized view in RisingWave recalculates only the affected rows when new data lands, so feature values stay fresh with sub-second latency and no orchestration overhead.
Setting Up the Clickstream Source
Most production clickstream pipelines publish events to Kafka. Your tracking SDK (Segment, RudderStack, or a custom collector) captures user interactions and writes them to a Kafka topic. RisingWave reads from that topic through a Kafka source connector.
Define the event source
Create a source that ingests raw clickstream events:
CREATE SOURCE user_events (
event_id VARCHAR,
user_id VARCHAR,
session_id VARCHAR,
event_type VARCHAR, -- 'page_view', 'click', 'scroll', 'add_to_cart', 'checkout', 'purchase'
page_url VARCHAR,
page_category VARCHAR, -- 'homepage', 'product', 'search', 'checkout', 'blog'
referrer_channel VARCHAR, -- 'organic', 'paid_search', 'email', 'social', 'direct'
scroll_depth_pct INT, -- 0-100, how far down the page the user scrolled
device_type VARCHAR,
event_ts TIMESTAMPTZ
) WITH (
connector = 'kafka',
topic = 'clickstream.events',
properties.bootstrap.server = 'kafka:9092',
scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;
This source streams events into RisingWave without storing them persistently. Downstream materialized views subscribe to this source and maintain their own computed state.
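For reference, here is a minimal sketch of one message that would fit the schema above, assuming the collector writes one JSON object per Kafka message and emits ISO 8601 timestamps. Only the field names come from the source definition; every value below is a made-up example.

```python
import json
from datetime import datetime, timezone

# Hypothetical example values; only the field names come from the source schema.
sample_event = {
    "event_id": "evt_0001",
    "user_id": "user_101",
    "session_id": "sess_a1",
    "event_type": "page_view",
    "page_url": "/products/123",
    "page_category": "product",
    "referrer_channel": "organic",
    "scroll_depth_pct": 40,
    "device_type": "mobile",
    # ISO 8601 string with an explicit UTC offset for the TIMESTAMPTZ column
    "event_ts": datetime(2025, 1, 15, 10, 30, 0, tzinfo=timezone.utc).isoformat(),
}

# The bytes a producer would write as the Kafka message value.
message_value = json.dumps(sample_event).encode("utf-8")
print(message_value.decode("utf-8"))
```

Keys that do not match a declared column name would not map to any column, so it is worth checking a sample message against the schema before wiring up downstream views.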
Create a user attributes table
For features that combine behavioral data with user metadata, keep a lookup table of user attributes:
CREATE TABLE user_attributes (
user_id VARCHAR PRIMARY KEY,
account_type VARCHAR, -- 'anonymous', 'free', 'premium'
signup_date DATE,
lifetime_orders INT,
last_purchase_date DATE
);
You can populate this table via CDC from your application database or through periodic batch loads.
Computing Session-Level Features
Session-level features summarize a user's behavior within a single visit. These are the building blocks for engagement scoring, churn prediction, and personalization models.
Session summary features
This materialized view computes core session metrics that update continuously as events flow in:
CREATE MATERIALIZED VIEW session_features AS
SELECT
session_id,
user_id,
MIN(event_ts) AS session_start,
MAX(event_ts) AS session_end,
EXTRACT(EPOCH FROM MAX(event_ts) - MIN(event_ts)) AS session_duration_sec,
COUNT(*) AS total_events,
COUNT(*) FILTER (WHERE event_type = 'page_view') AS page_views,
COUNT(*) FILTER (WHERE event_type = 'click') AS clicks,
COUNT(DISTINCT page_url) AS distinct_pages,
COUNT(DISTINCT page_category) AS distinct_categories,
MAX(scroll_depth_pct) AS max_scroll_depth,
AVG(scroll_depth_pct) FILTER (WHERE scroll_depth_pct > 0) AS avg_scroll_depth,
BOOL_OR(event_type = 'add_to_cart') AS added_to_cart,
BOOL_OR(event_type = 'purchase') AS converted
FROM user_events
GROUP BY session_id, user_id;
Every new click, scroll, or page view event triggers an incremental update to the affected session row. There is no batch window to wait for.
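To make "incremental update" concrete, the following Python sketch folds events one at a time into per-session state, covering a subset of the columns in session_features. It is a conceptual illustration only, not a description of how RisingWave is implemented internally; the function names and sample events are hypothetical.

```python
from datetime import datetime, timezone

def apply_event(state, event):
    """Fold one event into the running aggregates for its session."""
    key = (event["session_id"], event["user_id"])
    ts = event["event_ts"]
    s = state.setdefault(key, {
        "session_start": ts, "session_end": ts,
        "total_events": 0, "page_views": 0, "clicks": 0,
        "pages": set(), "added_to_cart": False, "converted": False,
    })
    # Only this one session's row is touched; other sessions are left alone.
    s["session_start"] = min(s["session_start"], ts)
    s["session_end"] = max(s["session_end"], ts)
    s["total_events"] += 1
    if event["event_type"] == "page_view":
        s["page_views"] += 1
    if event["event_type"] == "click":
        s["clicks"] += 1
    s["pages"].add(event["page_url"])
    s["added_to_cart"] = s["added_to_cart"] or event["event_type"] == "add_to_cart"
    s["converted"] = s["converted"] or event["event_type"] == "purchase"
    return s

def session_duration_sec(s):
    return (s["session_end"] - s["session_start"]).total_seconds()

def at(minute, second=0):
    return datetime(2025, 1, 15, 10, minute, second, tzinfo=timezone.utc)

def make_event(event_type, page_url, ts):
    return {"session_id": "sess_a1", "user_id": "user_101",
            "event_type": event_type, "page_url": page_url, "event_ts": ts}

events = [
    make_event("page_view", "/", at(0)),
    make_event("click", "/", at(0, 20)),
    make_event("page_view", "/products/123", at(1, 30)),
    make_event("add_to_cart", "/products/123", at(2)),
]

state = {}
for e in events:
    row = apply_event(state, e)

print(row["page_views"], row["clicks"], len(row["pages"]), session_duration_sec(row))
```

Each event changes a single session's row and costs the same regardless of how many events came before it, which is the property that keeps the features fresh without rescanning history.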
Click-through rate per session
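CTR measures how often page views lead to clicks, which is a strong signal of user intent. It is defined here as clicks per page view, so it can exceed 1 when a user clicks several times on a single page: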
CREATE MATERIALIZED VIEW session_ctr AS
SELECT
session_id,
user_id,
COUNT(*) FILTER (WHERE event_type = 'page_view') AS impressions,
COUNT(*) FILTER (WHERE event_type = 'click') AS clicks,
ROUND(
COUNT(*) FILTER (WHERE event_type = 'click')::NUMERIC
/ NULLIF(COUNT(*) FILTER (WHERE event_type = 'page_view'), 0),
4
) AS click_through_rate
FROM user_events
GROUP BY session_id, user_id;
Expected output:
session_id | user_id | impressions | clicks | click_through_rate
------------+----------+-------------+--------+--------------------
sess_a1 | user_101 | 12 | 5 | 0.4167
sess_b2 | user_202 | 3 | 0 | 0.0000
sess_c3 | user_303 | 8 | 6 | 0.7500
A CTR of 0.75 signals high intent. A CTR of 0.00 with three page views might indicate a confused user or poor page design. Both are useful signals for downstream models.
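The arithmetic behind these rows can be checked offline. The sketch below mirrors the SQL expression in Python, assuming half-up rounding to four decimal places and treating a session with no page views as having no CTR, the way NULLIF produces NULL. The function name is hypothetical.

```python
from decimal import Decimal, ROUND_HALF_UP

def click_through_rate(clicks, impressions):
    """Clicks per page view, rounded to 4 places; None when there are no impressions."""
    if impressions == 0:
        return None  # mirrors NULLIF(..., 0) turning the division into NULL
    rate = Decimal(clicks) / Decimal(impressions)
    return rate.quantize(Decimal("0.0001"), rounding=ROUND_HALF_UP)

# (impressions, clicks) pairs taken from the expected output above
for impressions, clicks in [(12, 5), (3, 0), (8, 6)]:
    print(impressions, clicks, click_through_rate(clicks, impressions))
```

A model consuming this feature needs an explicit rule for the None case, such as imputing zero or using a separate "no impressions" indicator.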
Rolling user-level features
Beyond single sessions, ML models often need features that span multiple sessions. This materialized view aggregates behavior per user across every session seen so far, so the values are cumulative rather than bounded to a fixed time window:
CREATE MATERIALIZED VIEW user_rolling_features AS
SELECT
user_id,
COUNT(DISTINCT session_id) AS total_sessions,
SUM(page_views) AS lifetime_page_views,
AVG(session_duration_sec) AS avg_session_duration,
AVG(page_views) AS avg_pages_per_session,
AVG(clicks) AS avg_clicks_per_session,
SUM(CASE WHEN converted THEN 1 ELSE 0 END)::NUMERIC
/ NULLIF(COUNT(*), 0) AS historical_conversion_rate,
MAX(session_start) AS last_seen_at
FROM session_features
GROUP BY user_id;
This view layers on top of session_features, which means it benefits from the same incremental computation. When a new event updates a session, the user-level rollup updates too, all within the same pipeline.
Building an Engagement Score
An engagement score collapses multiple behavioral signals into a single number that represents how invested a user is in the current session. This is useful for real-time personalization (show high-engagement users premium content), ad targeting (high engagement correlates with higher ad viewability), and churn prediction (a sudden drop in engagement score is an early warning).
Weighted engagement scoring
The score formula below weights five behavioral dimensions. Adjust the weights based on what matters most for your product:
CREATE MATERIALIZED VIEW engagement_scores AS
SELECT
session_id,
user_id,
session_duration_sec,
page_views,
clicks,
avg_scroll_depth,
distinct_pages,
engagement_score,
-- Tier thresholds are applied to the score computed once in the subquery
CASE
WHEN engagement_score >= 70 THEN 'high'
WHEN engagement_score >= 35 THEN 'medium'
ELSE 'low'
END AS engagement_tier
FROM (
SELECT
sf.session_id,
sf.user_id,
sf.session_duration_sec,
sf.page_views,
sf.clicks,
sf.avg_scroll_depth,
sf.distinct_pages,
-- Engagement score: 0-100 scale
LEAST(100, ROUND(
-- Page depth: more pages = higher engagement (max 30 points)
LEAST(30, sf.distinct_pages * 5.0)
-- Click activity: more clicks = higher engagement (max 25 points)
+ LEAST(25, sf.clicks * 3.0)
-- Scroll depth: deeper scrolling = higher engagement (max 20 points)
+ COALESCE(sf.avg_scroll_depth, 0) * 0.2
-- Session duration: longer sessions up to 10min (max 15 points)
+ LEAST(15, sf.session_duration_sec / 40.0)
-- Cart intent: adding to cart is a strong signal (10 points)
+ CASE WHEN sf.added_to_cart THEN 10 ELSE 0 END
)) AS engagement_score
FROM session_features sf
) scored;
Expected output:
session_id | user_id | engagement_score | engagement_tier
------------+----------+------------------+-----------------
sess_a1 | user_101 | 78 | high
sess_b2 | user_202 | 14 | low
sess_c3 | user_303 | 52 | medium
The engagement tier label makes it easy for downstream systems to act on the score without knowing the thresholds. Your personalization service can query WHERE engagement_tier = 'high' to find users who are primed for a conversion nudge.
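Before changing the weights, it can help to run the formula on a few hand-picked sessions. The sketch below reproduces the scoring arithmetic and tier thresholds in Python, assuming half-up rounding. The function names and input values are hypothetical and are not the inputs behind the sample rows above.

```python
from decimal import Decimal, ROUND_HALF_UP

def engagement_score(distinct_pages, clicks, avg_scroll_depth,
                     session_duration_sec, added_to_cart):
    """Mirror of the SQL score: five capped components summed to a 0-100 value."""
    page_pts = min(Decimal(30), Decimal(distinct_pages) * 5)            # max 30
    click_pts = min(Decimal(25), Decimal(clicks) * 3)                   # max 25
    scroll_pts = Decimal(str(avg_scroll_depth or 0)) * Decimal("0.2")   # max 20
    duration_pts = min(Decimal(15), Decimal(str(session_duration_sec)) / 40)  # max 15
    cart_pts = Decimal(10) if added_to_cart else Decimal(0)             # 0 or 10
    total = page_pts + click_pts + scroll_pts + duration_pts + cart_pts
    rounded = total.quantize(Decimal("1"), rounding=ROUND_HALF_UP)
    return int(min(Decimal(100), rounded))

def engagement_tier(score):
    if score >= 70:
        return "high"
    if score >= 35:
        return "medium"
    return "low"

# Hypothetical session: 6 pages, 5 clicks, 65% average scroll, 400 s, cart add.
# Components: 30 + 15 + 13 + 10 + 10 = 78
score = engagement_score(6, 5, 65, 400, True)
print(score, engagement_tier(score))
```

Because each component is capped, the five maxima sum to exactly 100 (30 + 25 + 20 + 15 + 10), so the outer LEAST(100, ...) only acts as a guard if the weights are later changed.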
Engagement trend per user
Tracking how engagement varies across sessions helps reveal trajectory. The view below summarizes each user's session scores; a user whose low-engagement sessions keep growing relative to high-engagement ones is a churn risk:
CREATE MATERIALIZED VIEW user_engagement_trend AS
SELECT
user_id,
AVG(engagement_score) AS avg_engagement,
MIN(engagement_score) AS min_engagement,
MAX(engagement_score) AS max_engagement,
COUNT(*) FILTER (WHERE engagement_tier = 'high') AS high_engagement_sessions,
COUNT(*) FILTER (WHERE engagement_tier = 'low') AS low_engagement_sessions,
COUNT(*) AS total_scored_sessions
FROM engagement_scores
GROUP BY user_id;
Funnel Conversion Rate Features
Funnel features capture how efficiently a user moves through a sequence of steps. Unlike aggregate funnel dashboards, these features are computed per user, making them directly consumable by ML models.
Per-user funnel progression
CREATE MATERIALIZED VIEW user_funnel_features AS
SELECT
user_id,
COUNT(DISTINCT session_id)
FILTER (WHERE event_type = 'page_view') AS sessions_with_views,
COUNT(DISTINCT session_id)
FILTER (WHERE event_type = 'click') AS sessions_with_clicks,
COUNT(DISTINCT session_id)
FILTER (WHERE event_type = 'add_to_cart') AS sessions_with_cart,
COUNT(DISTINCT session_id)
FILTER (WHERE event_type = 'checkout') AS sessions_with_checkout,
COUNT(DISTINCT session_id)
FILTER (WHERE event_type = 'purchase') AS sessions_with_purchase,
-- Conversion rates between funnel steps
ROUND(
COUNT(DISTINCT session_id) FILTER (WHERE event_type = 'add_to_cart')::NUMERIC
/ NULLIF(COUNT(DISTINCT session_id) FILTER (WHERE event_type = 'click'), 0),
4
) AS click_to_cart_rate,
ROUND(
COUNT(DISTINCT session_id) FILTER (WHERE event_type = 'purchase')::NUMERIC
/ NULLIF(COUNT(DISTINCT session_id) FILTER (WHERE event_type = 'add_to_cart'), 0),
4
) AS cart_to_purchase_rate,
ROUND(
COUNT(DISTINCT session_id) FILTER (WHERE event_type = 'purchase')::NUMERIC
/ NULLIF(COUNT(DISTINCT session_id) FILTER (WHERE event_type = 'page_view'), 0),
4
) AS overall_conversion_rate
FROM user_events
GROUP BY user_id;
Expected output (the sessions_with_clicks and sessions_with_checkout columns are omitted for width):
user_id | sessions_with_views | sessions_with_cart | sessions_with_purchase | click_to_cart_rate | cart_to_purchase_rate | overall_conversion_rate
----------+---------------------+--------------------+------------------------+--------------------+----------------------+------------------------
user_101 | 20 | 8 | 5 | 0.5000 | 0.6250 | 0.2500
user_202 | 45 | 2 | 0 | 0.0625 | 0.0000 | 0.0000
user_303 | 12 | 6 | 4 | 0.6667 | 0.6667 | 0.3333
User 202 has visited 45 times but never purchased. That is a high-value retargeting candidate. User 303 has a strong cart-to-purchase rate, indicating decisiveness once they find what they want. These per-user funnel rates are features that a propensity model can consume directly.
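The key detail in the funnel view is that every rate is a ratio of distinct session counts, not event counts. The following Python sketch reproduces that logic on a tiny hypothetical event list, returning None for a zero denominator in the same spirit as NULLIF. The function names and sample data are made up for illustration.

```python
from collections import defaultdict

FUNNEL_STEPS = ("page_view", "click", "add_to_cart", "checkout", "purchase")

def rate(numerator, denominator):
    """Rounded ratio, or None when the denominator is zero (like NULLIF in SQL)."""
    return round(numerator / denominator, 4) if denominator else None

def funnel_features(events):
    """events: iterable of (user_id, session_id, event_type) tuples."""
    sessions = defaultdict(lambda: defaultdict(set))
    for user_id, session_id, event_type in events:
        # A set per step means repeated events in one session count once.
        sessions[user_id][event_type].add(session_id)
    features = {}
    for user_id, by_type in sessions.items():
        n = {step: len(by_type.get(step, ())) for step in FUNNEL_STEPS}
        features[user_id] = {
            "sessions_with_views": n["page_view"],
            "click_to_cart_rate": rate(n["add_to_cart"], n["click"]),
            "cart_to_purchase_rate": rate(n["purchase"], n["add_to_cart"]),
            "overall_conversion_rate": rate(n["purchase"], n["page_view"]),
        }
    return features

# Hypothetical events for one user across four sessions
events = [
    ("u1", "s1", "page_view"), ("u1", "s1", "click"),
    ("u1", "s1", "add_to_cart"), ("u1", "s1", "purchase"),
    ("u1", "s2", "page_view"), ("u1", "s2", "click"), ("u1", "s2", "click"),
    ("u1", "s3", "page_view"),
    ("u1", "s4", "page_view"), ("u1", "s4", "click"), ("u1", "s4", "add_to_cart"),
]
result = funnel_features(events)
print(result["u1"])
```

Note that the two clicks in session s2 contribute a single session to the click step, which matches COUNT(DISTINCT session_id) in the view.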
Channel-attributed conversion features
Different acquisition channels produce different behavior patterns. Computing per-user, per-channel conversion rates creates powerful features for attribution models:
CREATE MATERIALIZED VIEW user_channel_features AS
SELECT
user_id,
referrer_channel,
COUNT(*) AS events_from_channel,
COUNT(DISTINCT session_id) AS sessions_from_channel,
-- Count converting sessions, not purchase events, so the rate stays within 0-1
ROUND(
COUNT(DISTINCT session_id) FILTER (WHERE event_type = 'purchase')::NUMERIC
/ NULLIF(COUNT(DISTINCT session_id), 0),
4
) AS channel_conversion_rate
FROM user_events
GROUP BY user_id, referrer_channel;
Serving Features to ML Models
RisingWave is PostgreSQL-compatible, which means your model serving infrastructure can read features using any PostgreSQL client library. No specialized feature store SDK is required.
Feature retrieval at inference time
When your prediction service receives a request for user_101, it queries RisingWave directly:
SELECT
sf.session_duration_sec,
sf.page_views,
sf.distinct_pages,
sc.click_through_rate,
es.engagement_score,
es.engagement_tier,
uf.click_to_cart_rate,
uf.cart_to_purchase_rate,
uf.overall_conversion_rate,
ur.avg_session_duration,
ur.historical_conversion_rate
FROM session_features sf
JOIN session_ctr sc ON sf.session_id = sc.session_id
JOIN engagement_scores es ON sf.session_id = es.session_id
JOIN user_funnel_features uf ON sf.user_id = uf.user_id
JOIN user_rolling_features ur ON sf.user_id = ur.user_id
WHERE sf.user_id = 'user_101'
AND sf.session_id = 'sess_a1';
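This query is cheap because it reads from pre-computed materialized views. The only work at query time is a handful of key lookups and joins; there is no aggregation. Actual latency depends on your deployment, but lookups like this typically return in milliseconds.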
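On the client side, a prediction service might wrap a lookup like this in a small helper. The sketch below assumes a DB-API 2.0 connection whose driver uses %s placeholders (as psycopg2 does); the function name, the trimmed-down query, and the returned dict layout are illustrative choices, not part of RisingWave.

```python
# Trimmed version of the feature query above, parameterized for a DB-API driver.
FEATURE_QUERY = """
SELECT
    sf.session_duration_sec,
    sf.page_views,
    sc.click_through_rate,
    es.engagement_score,
    es.engagement_tier,
    uf.overall_conversion_rate
FROM session_features sf
JOIN session_ctr sc ON sf.session_id = sc.session_id
JOIN engagement_scores es ON sf.session_id = es.session_id
JOIN user_funnel_features uf ON sf.user_id = uf.user_id
WHERE sf.user_id = %s
  AND sf.session_id = %s
"""

def fetch_feature_vector(conn, user_id, session_id):
    """Return the features as a {column: value} dict, or None if no row exists."""
    cur = conn.cursor()
    try:
        # Parameters are bound by the driver rather than formatted into the SQL.
        cur.execute(FEATURE_QUERY, (user_id, session_id))
        row = cur.fetchone()
        if row is None:
            return None  # unknown session: the caller decides on default features
        columns = [col[0] for col in cur.description]
        return dict(zip(columns, row))
    finally:
        cur.close()
```

With psycopg2 installed, the connection could be opened with something like psycopg2.connect(host="localhost", port=4566, user="root", dbname="dev"). Those values are assumed defaults for a local RisingWave instance, so adjust them for your deployment. Returning None for a missing session keeps the cold-start decision in the serving code instead of hiding it in the query.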
Sinking features to external stores
If your serving layer already reads from Redis or a PostgreSQL replica, you can push features downstream with a sink connector:
CREATE SINK engagement_features_sink AS
SELECT
es.user_id, -- qualified because both views expose user_id
session_id,
engagement_score,
engagement_tier,
session_duration_sec,
page_views,
click_through_rate
FROM engagement_scores es
JOIN session_ctr sc USING (session_id)
WITH (
connector = 'jdbc',
jdbc.url = 'jdbc:postgresql://feature-store-db:5432/features',
table.name = 'user_engagement_features',
type = 'upsert',
primary_key = 'session_id'
);
This keeps your feature store continuously synchronized. No Airflow DAG, no cron job, no batch orchestration.
FAQ
What are user behavior features?
User behavior features are numerical or categorical values computed from user interaction data (clicks, page views, scrolls, purchases) that serve as inputs to machine learning models. Examples include session duration, pages per session, click-through rate, engagement scores, and funnel conversion rates. These features help models predict outcomes like churn probability, purchase likelihood, and content relevance.
How does streaming SQL compute features differently from batch SQL?
Batch SQL reprocesses entire datasets on a schedule, which means features reflect a snapshot from the last batch run. Streaming SQL processes each event incrementally as it arrives. In RisingWave, materialized views maintain feature values continuously, so they are always current. This eliminates the freshness gap that degrades model accuracy in time-sensitive use cases.
Can I use these features for both training and serving?
Yes, with one caveat. Because the features are defined in standard SQL, the same view definitions can feed both training data extraction and real-time serving. Sharing one definition reduces training-serving skew, one of the most common causes of ML model degradation in production. The caveat is that a materialized view holds only the current value of each feature, so building training sets requires a record of past values, for example by sinking feature rows to a table or data lake that retains history.
How does RisingWave handle late-arriving events?
RisingWave processes events in the order they arrive from the source. For Kafka sources, this follows partition ordering. If your use case requires strict event-time ordering, you can configure watermarks on the source to handle out-of-order events within a defined tolerance. Late events that arrive within the watermark window are correctly incorporated into materialized view results.
Do I still need a feature store?
It depends on your requirements. RisingWave can serve as both the feature computation engine and the serving layer, since any PostgreSQL client can query materialized views directly. If you need feature versioning, lineage tracking, or a shared feature catalog across teams, you can use RisingWave alongside a feature store like Feast by sinking computed features to it.
Conclusion
User behavior features are only as good as their freshness. Here is what we covered:
- Session-level features like page views, session duration, distinct pages, and scroll depth are computed as continuously updated materialized views that reflect the current state of each active session.
- Click-through rates per session provide intent signals that update with every new event.
- Engagement scores combine multiple behavioral dimensions into a single metric, with configurable weights and tier labels that downstream systems can act on immediately.
- Funnel conversion rates computed per user create ML-ready features that capture individual purchase propensity across the full click-to-purchase journey.
- Feature serving requires no specialized infrastructure. RisingWave's PostgreSQL compatibility means your existing model serving code can read features with a standard database query.
The entire pipeline runs as SQL. No Java, no Python glue code, no orchestration framework. Events flow in from Kafka, materialized views compute features incrementally, and your models read fresh values on every inference request.
Ready to build your own feature pipeline? Get started with RisingWave in under five minutes, or join our Slack community to discuss streaming feature engineering.

