Real-Time Feature Engineering for ML Models: Replace Spark Jobs with SQL

Training/serving skew is the most common silent failure in ML systems. Spark jobs compute features one way at training time; a separate serving system computes them another way in production. The features are nominally the same but numerically different, and your model performs worse than expected with no obvious reason why. Replacing Spark batch jobs with streaming SQL in RisingWave eliminates the skew: the same SQL definition runs incrementally in production and generates training data from history.


The Training/Serving Skew Problem

Every ML team learns about training/serving skew eventually, usually through a painful production incident.

You train a model on features computed with a Spark job. The Spark job processes a 90-day historical window, joins several tables, applies a rolling average, handles nulls in a specific way. You get good offline metrics. You deploy.

In production, the serving system has to compute the same features at sub-second latency. Spark is too slow for that, so the serving layer uses a different implementation — maybe a Python microservice, maybe a Redis-backed precomputation, maybe a feature store with its own compute engine. The implementation tries to replicate what the Spark job does, but there are subtle differences: floating point handling, null coalescing, window boundary definitions, timezone offsets.

The model sees different numbers in production than it was trained on. The gap is small — maybe 1-3% numerical difference in feature values — but it causes the model to underperform against offline benchmarks. This is training/serving skew.
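The divergence is easy to reproduce. A minimal sketch (hypothetical feature and function names) of how two "equivalent" implementations of one feature can disagree purely on null handling:

```python
# Illustrative only: two implementations of the same feature (average order
# value) that are nominally identical but diverge on null handling.

def avg_order_value_training(orders):
    """Batch-style: drop nulls before averaging."""
    vals = [o for o in orders if o is not None]
    return sum(vals) / len(vals) if vals else 0.0

def avg_order_value_serving(orders):
    """Serving-style: coalesce nulls to 0, then average."""
    vals = [o if o is not None else 0.0 for o in orders]
    return sum(vals) / len(vals) if vals else 0.0

orders = [20.0, None, 40.0]
print(avg_order_value_training(orders))  # 30.0
print(avg_order_value_serving(orders))   # 20.0
```

Both implementations are individually defensible; the model only sees that production numbers don't match training numbers.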

Why Spark Makes This Worse

Spark is a batch compute engine. It processes data in large bounded jobs, is designed for throughput rather than latency, and is fundamentally not suited for sub-second online inference.

Teams handle this by running two implementations of their feature logic: a Spark job for training and a separate serving system for inference. These implementations drift over time. Bug fixes applied to one are forgotten on the other. New features are added to the Spark job but the serving system is not updated. Over months, the two systems diverge.

The root cause is using two different tools for what is conceptually one operation: computing a feature. The fix is to use one tool for both.

Streaming SQL as a Single Feature Definition

RisingWave is a PostgreSQL-compatible streaming database — open source (Apache 2.0), written in Rust, with S3-backed storage. Materialized views in RisingWave are defined in standard SQL and run continuously. The same SQL that defines serving features can generate training data by running over historical event logs.

Write the feature once. Use it in both contexts.

Setting Up Sources

-- User behavior events from Kafka
CREATE SOURCE user_events (
    event_id    BIGINT,
    user_id     BIGINT,
    session_id  VARCHAR,
    event_type  VARCHAR,      -- 'click', 'view', 'add_to_cart', 'purchase'
    item_id     BIGINT,
    category    VARCHAR,
    revenue     NUMERIC(10,2),
    event_time  TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'user-behavior',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;

-- User profile data via CDC from application database
CREATE SOURCE user_profiles WITH (
    connector = 'postgres-cdc',
    hostname = 'app-db.internal',
    port = '5432',
    username = 'cdc_user',
    password = '...',
    database.name = 'app',
    schema.name = 'public',
    table.name = 'users'
);
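To exercise these sources locally, it helps to generate events that match the `user_events` schema exactly. A hedged sketch (the `build_event` helper is hypothetical; swap the `print` for the Kafka producer of your choice):

```python
import json
from datetime import datetime, timezone

# Hypothetical helper that serializes a test event matching the user_events
# schema declared above.
def build_event(event_id, user_id, session_id, event_type,
                item_id, category, revenue=None):
    return json.dumps({
        "event_id": event_id,
        "user_id": user_id,
        "session_id": session_id,
        "event_type": event_type,  # 'click' | 'view' | 'add_to_cart' | 'purchase'
        "item_id": item_id,
        "category": category,
        "revenue": revenue,
        "event_time": datetime.now(timezone.utc).isoformat(),
    })

print(build_event(1, 42, "s-1", "purchase", 1001, "books", 19.99))
```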

Defining Features with Materialized Views

-- Feature group 1: session-level engagement features (short-window)
CREATE MATERIALIZED VIEW feature_session_engagement AS
SELECT
    session_id,
    user_id,
    COUNT(*)                                            AS event_count,
    COUNT(DISTINCT item_id)                             AS unique_items_viewed,
    COUNT(*) FILTER (WHERE event_type = 'click')        AS click_count,
    COUNT(*) FILTER (WHERE event_type = 'add_to_cart')  AS add_to_cart_count,
    SUM(revenue) FILTER (WHERE event_type = 'purchase') AS session_revenue,
    MAX(event_time) - MIN(event_time)                   AS session_duration,
    MIN(event_time)                                     AS session_start
FROM user_events
GROUP BY session_id, user_id;
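Because the view is plain SQL over plain events, it is straightforward to unit-test against a reference implementation. A pure-Python mirror of `feature_session_engagement` (a sketch for small fixtures, not part of the pipeline):

```python
# Pure-Python mirror of feature_session_engagement, useful for checking the
# MV definition against expectations on small fixtures.
def session_engagement(events):
    """events: dicts with event_type, item_id, revenue, event_time (epoch s)."""
    times = [e["event_time"] for e in events]
    return {
        "event_count": len(events),
        "unique_items_viewed": len({e["item_id"] for e in events}),
        "click_count": sum(1 for e in events if e["event_type"] == "click"),
        "add_to_cart_count": sum(1 for e in events
                                 if e["event_type"] == "add_to_cart"),
        "session_revenue": sum(e["revenue"] or 0 for e in events
                               if e["event_type"] == "purchase"),
        "session_duration": max(times) - min(times),
    }

events = [
    {"event_type": "view", "item_id": 1, "revenue": None, "event_time": 0},
    {"event_type": "click", "item_id": 1, "revenue": None, "event_time": 30},
    {"event_type": "purchase", "item_id": 1, "revenue": 19.99, "event_time": 90},
]
print(session_engagement(events))
```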

-- Feature group 2: user-level behavioral features (medium-window, 7d)
CREATE MATERIALIZED VIEW feature_user_7d AS
SELECT
    user_id,
    COUNT(DISTINCT session_id)                              AS sessions_7d,
    COUNT(*) FILTER (WHERE event_type = 'purchase')        AS purchases_7d,
    SUM(revenue) FILTER (WHERE event_type = 'purchase')    AS revenue_7d,
    COUNT(DISTINCT category)                                AS categories_visited_7d,
    AVG(revenue) FILTER (WHERE event_type = 'purchase')    AS avg_order_value_7d,
    MAX(event_time)                                         AS last_active_at
FROM user_events
WHERE event_time > NOW() - INTERVAL '7 days'
GROUP BY user_id;

-- Feature group 3: item-level popularity features (1h window)
CREATE MATERIALIZED VIEW feature_item_popularity_1h AS
SELECT
    item_id,
    category,
    COUNT(*)                                            AS views_1h,
    COUNT(*) FILTER (WHERE event_type = 'purchase')    AS purchases_1h,
    COUNT(DISTINCT user_id)                            AS unique_users_1h,
    SUM(revenue) FILTER (WHERE event_type = 'purchase') AS revenue_1h
FROM user_events
WHERE event_time > NOW() - INTERVAL '1 hour'
GROUP BY item_id, category;

-- Combined feature vector for purchase propensity model
CREATE MATERIALIZED VIEW feature_vector_purchase_propensity AS
SELECT
    s.session_id,
    s.user_id,
    s.event_count,
    s.unique_items_viewed,
    s.click_count,
    s.add_to_cart_count,
    s.session_duration,
    COALESCE(u.sessions_7d, 0)          AS sessions_7d,
    COALESCE(u.purchases_7d, 0)         AS purchases_7d,
    COALESCE(u.revenue_7d, 0)           AS revenue_7d,
    COALESCE(u.avg_order_value_7d, 0)   AS avg_order_value_7d,
    p.account_age_days,
    p.country_code,
    p.device_type,
    s.session_start
FROM feature_session_engagement s
LEFT JOIN feature_user_7d u ON u.user_id = s.user_id
LEFT JOIN user_profiles p ON p.user_id = s.user_id;

Serving Features at Inference Time

The serving query is a single parameterized lookup:

-- Inference service query (PostgreSQL wire protocol)
SELECT
    event_count,
    unique_items_viewed,
    click_count,
    add_to_cart_count,
    EXTRACT(EPOCH FROM session_duration)    AS session_duration_seconds,
    sessions_7d,
    purchases_7d,
    revenue_7d,
    avg_order_value_7d,
    account_age_days,
    country_code,
    device_type
FROM feature_vector_purchase_propensity
WHERE session_id = $1;

This runs against a precomputed materialized view. The result is available in milliseconds.
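On the application side, the remaining work is mapping that row to the model's input vector. A sketch, assuming the serving query's result has been fetched into a dict (for example via a psycopg dict row factory); the `FEATURE_ORDER` list and `row_to_vector` helper are hypothetical:

```python
# Numeric features in the order the model expects. Categorical columns
# (country_code, device_type) would be encoded separately, e.g. one-hot.
FEATURE_ORDER = [
    "event_count", "unique_items_viewed", "click_count", "add_to_cart_count",
    "session_duration_seconds", "sessions_7d", "purchases_7d", "revenue_7d",
    "avg_order_value_7d", "account_age_days",
]

def row_to_vector(row):
    """Map a feature row (dict) to the model's numeric vector, defaulting
    missing values to 0.0 to match the COALESCE logic in the view."""
    return [float(row.get(name) or 0.0) for name in FEATURE_ORDER]

row = {"event_count": 7, "click_count": 3, "revenue_7d": None}
print(row_to_vector(row))
```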

Generating Training Data from the Same SQL

The critical insight: you can run the same SQL logic over a historical event log to generate training data. There is no separate Spark job.

-- Historical training data generation
-- Same feature logic, applied to past sessions with known outcomes
WITH session_features AS (
    SELECT
        session_id,
        user_id,
        COUNT(*)                                            AS event_count,
        COUNT(DISTINCT item_id)                             AS unique_items_viewed,
        COUNT(*) FILTER (WHERE event_type = 'click')        AS click_count,
        COUNT(*) FILTER (WHERE event_type = 'add_to_cart')  AS add_to_cart_count,
        SUM(revenue) FILTER (WHERE event_type = 'purchase') AS session_revenue,
        EXTRACT(EPOCH FROM (MAX(event_time) - MIN(event_time))) AS session_duration_seconds,
        MIN(event_time)                                     AS session_start
    FROM user_events_history   -- historical table, same schema as streaming source
    WHERE event_time BETWEEN '2025-01-01' AND '2025-12-31'
    GROUP BY session_id, user_id
),
user_features_at_session_time AS (
    -- Point-in-time user features: only events strictly BEFORE the session
    -- started, computed per session so users with multiple sessions in the
    -- range each get the correct 7-day window (no label leakage)
    SELECT
        s.session_id,
        s.user_id,
        COUNT(DISTINCT e.session_id)                            AS sessions_7d,
        COUNT(*) FILTER (WHERE e.event_type = 'purchase')       AS purchases_7d,
        SUM(e.revenue) FILTER (WHERE e.event_type = 'purchase') AS revenue_7d
    FROM session_features s
    JOIN user_events_history e
        ON e.user_id = s.user_id
        AND e.event_time >= s.session_start - INTERVAL '7 days'
        AND e.event_time < s.session_start
    GROUP BY s.session_id, s.user_id
)
SELECT
    sf.session_id,
    sf.event_count,
    sf.unique_items_viewed,
    sf.click_count,
    sf.add_to_cart_count,
    sf.session_duration_seconds,
    COALESCE(uf.sessions_7d, 0)     AS sessions_7d,
    COALESCE(uf.purchases_7d, 0)    AS purchases_7d,
    COALESCE(uf.revenue_7d, 0)      AS revenue_7d,
    CASE WHEN sf.session_revenue > 0 THEN 1 ELSE 0 END  AS label
FROM session_features sf
LEFT JOIN user_features_at_session_time uf ON uf.session_id = sf.session_id;

The feature logic is the same SQL. The training job runs it over user_events_history. The serving layer runs it incrementally over the live stream. No separate Spark job. No diverging implementations.
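The point-in-time rule is worth internalizing: when building a training example, only events strictly before the session start may contribute to that example's features. A minimal illustration (hypothetical helper, epoch-second timestamps):

```python
SEVEN_DAYS = 7 * 24 * 3600

# Point-in-time 7-day purchase count for one training example: events at or
# after session_start are excluded so no label information leaks in.
def purchases_7d_at(events, session_start):
    """events: (event_time_epoch, event_type) tuples for one user."""
    return sum(
        1 for t, etype in events
        if etype == "purchase"
        and session_start - SEVEN_DAYS <= t < session_start
    )

events = [(100, "purchase"), (500, "purchase"), (500, "click")]
print(purchases_7d_at(events, session_start=500))  # 1: the t=500 purchase is excluded
```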

Handling Common Feature Engineering Patterns

Rolling Window Features

-- 1h, 6h, 24h click rates for anomaly detection
CREATE MATERIALIZED VIEW feature_click_rates AS
SELECT
    user_id,
    COUNT(*) FILTER (WHERE event_time > NOW() - INTERVAL '1 hour')   AS clicks_1h,
    COUNT(*) FILTER (WHERE event_time > NOW() - INTERVAL '6 hours')  AS clicks_6h,
    COUNT(*) FILTER (WHERE event_time > NOW() - INTERVAL '24 hours') AS clicks_24h
FROM user_events
WHERE event_type = 'click'
GROUP BY user_id;
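The same multi-window pattern is easy to validate against a reference implementation on a handful of events. A pure-Python sketch of the 1h/6h/24h click counts (hypothetical helper, epoch-second timestamps):

```python
# Pure-Python equivalent of the rolling click-count view, for sanity checks
# against the MV on small fixtures.
def click_rates(click_times, now):
    """click_times: epoch seconds of click events for one user."""
    def within(hours):
        return sum(1 for t in click_times if t > now - hours * 3600)
    return {"clicks_1h": within(1), "clicks_6h": within(6), "clicks_24h": within(24)}

now = 100_000
clicks = [now - 600, now - 7_200, now - 30_000, now - 90_000]
print(click_rates(clicks, now))
```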

Ratio and Derived Features

-- Cart abandonment rate (add_to_cart without subsequent purchase)
CREATE MATERIALIZED VIEW feature_cart_behavior AS
SELECT
    user_id,
    COUNT(*) FILTER (WHERE event_type = 'add_to_cart')  AS carts,
    COUNT(*) FILTER (WHERE event_type = 'purchase')     AS purchases,
    CASE
        WHEN COUNT(*) FILTER (WHERE event_type = 'add_to_cart') = 0 THEN NULL
        ELSE COUNT(*) FILTER (WHERE event_type = 'purchase')::FLOAT
             / COUNT(*) FILTER (WHERE event_type = 'add_to_cart')
    END AS purchase_to_cart_ratio
FROM user_events
WHERE event_time > NOW() - INTERVAL '30 days'
GROUP BY user_id;
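The CASE guard matters: without it, users with zero cart events would divide by zero. The same null-safe behavior, sketched in Python (hypothetical helper):

```python
# Same semantics as the CASE expression above: return None (SQL NULL) rather
# than dividing by zero when the user has no add_to_cart events.
def purchase_to_cart_ratio(carts, purchases):
    if carts == 0:
        return None
    return purchases / carts

print(purchase_to_cart_ratio(0, 0))  # None
print(purchase_to_cart_ratio(4, 1))  # 0.25
```

Downstream, the None/NULL can be imputed consistently in both training and serving (for example with COALESCE), rather than silently becoming 0 in one path and NaN in the other.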

Lag Features (User's Last N Events)

-- Last 3 purchase categories (sequence feature)
CREATE MATERIALIZED VIEW feature_purchase_sequence AS
SELECT
    user_id,
    (array_agg(category ORDER BY event_time DESC))[1:3] AS last_3_categories,
    MAX(event_time)                                    AS last_purchase_at
FROM user_events
WHERE event_type = 'purchase'
GROUP BY user_id;
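Equivalently in plain Python, for checking the sequence feature on a fixture (hypothetical helper):

```python
# Mirror of the last-3-categories sequence feature: order purchases by time
# descending and keep the first three categories.
def last_n_categories(purchases, n=3):
    """purchases: (event_time_epoch, category) tuples."""
    ordered = sorted(purchases, key=lambda p: p[0], reverse=True)
    return [cat for _, cat in ordered[:n]]

purchases = [(10, "books"), (40, "games"), (20, "music"), (30, "books")]
print(last_n_categories(purchases))  # ['games', 'books', 'music']
```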

Comparison: Feature Engineering Approaches

| Approach | Training/Serving Skew | Freshness | Iteration Speed | Scalability |
|---|---|---|---|---|
| Spark batch + serving microservice | High risk (2 implementations) | Minutes to hours | Slow (Spark cycle time) | High throughput |
| Feature store (Feast/Tecton) | Medium (abstracted) | Minutes | Medium | High |
| dbt + Redis | High risk (2 implementations) | Hours | Medium | Medium |
| Streaming SQL (RisingWave) | None (single definition) | Seconds | Fast (SQL iteration) | High |

The streaming SQL approach eliminates the skew problem structurally. There is one SQL definition. It runs incrementally for serving and over history for training. The logic cannot diverge because there is only one copy of it.


FAQ

Can RisingWave handle the scale of feature computation Spark handles? RisingWave scales horizontally across multiple compute nodes. For streaming feature computation, it handles millions of events per second. For very large-scale training data generation (billions of rows), a combination of RisingWave for serving and a warehouse (Snowflake, BigQuery) for batch training generation is practical — but using compatible SQL so logic stays aligned.

How does iterating on feature definitions work without a Spark cluster? Feature iteration in RisingWave is fast because you write SQL and create a new materialized view. You can test against a small Kafka topic, validate the output, and promote to production in minutes rather than running a full Spark job. The SQL is also much easier to read and review than a PySpark job.

What about feature monitoring and drift detection? RisingWave can compute feature statistics as materialized views — mean, standard deviation, quantiles — which update continuously. These statistics can be compared against training distribution statistics to detect drift. Alerting can trigger when a feature distribution diverges beyond a threshold.
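A minimal sketch of such a drift check (the `drifted` helper, threshold, and statistics are illustrative assumptions, not a RisingWave API):

```python
# Compare a feature's live mean against its training mean/std and flag when
# the z-score exceeds a threshold. Threshold and inputs are illustrative.
def drifted(live_mean, train_mean, train_std, z_threshold=3.0):
    if train_std == 0:
        return live_mean != train_mean
    return abs(live_mean - train_mean) / train_std > z_threshold

print(drifted(live_mean=5.2, train_mean=4.9, train_std=0.5))  # False
print(drifted(live_mean=9.0, train_mean=4.9, train_std=0.5))  # True
```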

How do you handle missing/null features? SQL COALESCE and CASE expressions handle null imputation the same way in serving and training SQL, ensuring consistent null handling without an extra preprocessing layer.

Can RisingWave integrate with ML training frameworks like PyTorch or scikit-learn? Indirectly: training data is typically exported from RisingWave as a batch query (via PostgreSQL COPY or a parquet sink to S3) and then loaded by the training framework. The output is a standard dataset — RisingWave handles data preparation, the ML framework handles training.
