Real-Time Feature Engineering for Machine Learning Pipelines

Most machine learning models are only as good as the features they consume. You can have the most sophisticated model architecture in the world, but if your features are stale, incomplete, or slow to compute, your predictions suffer. This is the feature-freshness problem, and it hits hardest in production systems where decisions need to happen in milliseconds, not minutes.

Real-time feature engineering is the practice of computing ML features from live data streams as events happen. Instead of running batch ETL jobs that process yesterday's data, you continuously transform raw events into feature vectors that are always up to date. Think of it as the difference between checking the weather forecast from this morning versus looking out the window right now.

In this guide, you will build a complete real-time feature engineering pipeline using RisingWave, a streaming database that lets you define feature transformations as SQL materialized views. By the end, you will have user activity aggregations, session-level behavioral features, transaction spending patterns, and a unified feature store, all updating automatically as new events flow in.

Why Batch Feature Engineering Breaks Down

Traditional ML pipelines compute features in batch. A scheduled job runs every hour (or every day), reads from a data warehouse, computes aggregations, and writes results to a feature store. This approach has served data teams well for years, but it introduces several problems that become acute at scale.

Feature staleness is the most obvious issue. If your batch job runs hourly, your features can be up to 60 minutes old. For fraud detection, recommendation engines, and dynamic pricing, that delay means your model is making decisions based on outdated information. A user who added five items to their cart in the last 30 seconds still looks like an idle browser according to your feature store.

Training-serving skew is subtler but equally damaging. When you train a model offline using batch-computed features but serve it with features that have different freshness characteristics, the statistical properties shift. Your model learned patterns from features computed over complete hourly windows, but in production it receives features from partial windows. This mismatch degrades model accuracy in ways that are difficult to diagnose.

Infrastructure complexity adds operational overhead. Batch pipelines require orchestrators (Airflow, Dagster), intermediate storage, retry logic, and monitoring for each stage. When a pipeline fails at 3 AM, someone gets paged. When you need to backfill features after a schema change, you are looking at hours of reprocessing.

Streaming feature engineering addresses all three issues. Features update continuously as events arrive. The same computation logic runs in both training and serving contexts. And a single streaming query replaces an entire DAG of batch jobs.

Architecture: Streaming Feature Store with RisingWave

A streaming feature store sits between your event sources and your ML serving layer. Raw events flow in from message brokers like Apache Kafka, RisingWave transforms them into features using SQL, and downstream services query the results through standard PostgreSQL connections.

graph LR
    A[Event Sources] --> B[Kafka / Kinesis]
    B --> C[RisingWave]
    C --> D[Materialized Views<br/>Feature Store]
    D --> E[ML Serving Layer]
    D --> F[Model Training]
    D --> G[Analytics Dashboard]

RisingWave is a streaming database that speaks PostgreSQL wire protocol. You define feature transformations as materialized views, and RisingWave incrementally maintains them as new data arrives. Unlike batch materialized views in traditional databases, these update continuously, not on a schedule. Unlike stream processors like Apache Flink, you query the results directly with SQL instead of writing to an external store.

This architecture gives you three properties that matter for ML feature engineering:

  • Low latency: Features update within seconds of the source event occurring.
  • SQL interface: Data scientists define features in SQL, the language they already know. No Java, no Scala, no custom DSLs.
  • Point-in-time correctness: Materialized views maintain consistent state, avoiding the partial-update problems that plague hand-rolled streaming pipelines.

Computing User Activity Features

The most common ML features aggregate user behavior over time. How many times did a user click in the last hour? What is their purchase-to-click ratio? How many product categories have they browsed? These signals feed recommendation models, churn predictors, and fraud classifiers.

Start by creating a table that represents your user event stream. In production, this would typically be a source connected to Kafka, but for demonstration purposes a table works identically for the feature computation logic.

CREATE TABLE user_events (
    user_id INT,
    event_type VARCHAR,
    event_value DOUBLE PRECISION,
    product_category VARCHAR,
    device_type VARCHAR,
    event_timestamp TIMESTAMPTZ
);

Now define a materialized view that computes user-level activity features:

CREATE MATERIALIZED VIEW user_activity_features AS
SELECT
    user_id,
    COUNT(*) FILTER (WHERE event_type = 'click') AS total_clicks,
    COUNT(*) FILTER (WHERE event_type = 'add_to_cart') AS total_add_to_cart,
    COUNT(*) FILTER (WHERE event_type = 'purchase') AS total_purchases,
    COALESCE(SUM(event_value) FILTER (WHERE event_type = 'purchase'), 0)
        AS total_purchase_value,
    COUNT(DISTINCT product_category) AS unique_categories_browsed,
    COUNT(DISTINCT device_type) AS unique_devices_used,
    CASE
        WHEN COUNT(*) FILTER (WHERE event_type = 'click') > 0
        THEN CAST(COUNT(*) FILTER (WHERE event_type = 'purchase') AS DOUBLE PRECISION)
             / COUNT(*) FILTER (WHERE event_type = 'click')
        ELSE 0
    END AS click_to_purchase_ratio
FROM user_events
GROUP BY user_id;

This single materialized view produces seven features per user. The FILTER clause lets you compute conditional aggregations cleanly without nested CASE expressions. Let's insert some sample events and see the results:

INSERT INTO user_events VALUES
(1, 'click', 1.0, 'electronics', 'mobile', '2026-04-01 10:00:00+00'),
(1, 'add_to_cart', 299.99, 'electronics', 'mobile', '2026-04-01 10:05:00+00'),
(1, 'click', 1.0, 'electronics', 'mobile', '2026-04-01 10:10:00+00'),
(1, 'purchase', 299.99, 'electronics', 'mobile', '2026-04-01 10:15:00+00'),
(2, 'click', 1.0, 'clothing', 'desktop', '2026-04-01 10:01:00+00'),
(2, 'click', 1.0, 'clothing', 'desktop', '2026-04-01 10:03:00+00'),
(2, 'add_to_cart', 59.99, 'clothing', 'desktop', '2026-04-01 10:08:00+00'),
(3, 'click', 1.0, 'books', 'tablet', '2026-04-01 10:02:00+00'),
(3, 'click', 1.0, 'books', 'tablet', '2026-04-01 10:06:00+00'),
(3, 'click', 1.0, 'electronics', 'tablet', '2026-04-01 10:12:00+00'),
(3, 'add_to_cart', 49.99, 'books', 'tablet', '2026-04-01 10:14:00+00'),
(3, 'purchase', 49.99, 'books', 'tablet', '2026-04-01 10:16:00+00');

Querying the materialized view:

SELECT * FROM user_activity_features ORDER BY user_id;
 user_id | total_clicks | total_add_to_cart | total_purchases | total_purchase_value | unique_categories_browsed | unique_devices_used | click_to_purchase_ratio
---------+--------------+-------------------+-----------------+----------------------+---------------------------+---------------------+-------------------------
       1 |            2 |                 1 |               1 |               299.99 |                         1 |                   1 |                     0.5
       2 |            2 |                 1 |               0 |                    0 |                         1 |                   1 |                       0
       3 |            3 |                 1 |               1 |                49.99 |                         2 |                   1 |      0.3333333333333333

Each row is a ready-to-use feature vector. User 1 has a 50% click-to-purchase ratio and browses one category. User 3 explores two categories but converts at a lower rate. These signals are exactly what a recommendation or churn model needs.
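
The view above aggregates over each user's entire history. Many models care more about recent behavior. As a sketch, assuming your RisingWave version supports temporal filters with NOW() in materialized views (the view name here is illustrative), the same pattern can be restricted to the trailing hour:

```sql
-- Sketch: the same activity features, limited to the trailing hour.
-- Assumes temporal-filter support (NOW() in a materialized view's
-- WHERE clause); check your RisingWave version's documentation.
CREATE MATERIALIZED VIEW user_activity_features_1h AS
SELECT
    user_id,
    COUNT(*) FILTER (WHERE event_type = 'click') AS clicks_1h,
    COUNT(*) FILTER (WHERE event_type = 'purchase') AS purchases_1h,
    COALESCE(SUM(event_value) FILTER (WHERE event_type = 'purchase'), 0)
        AS purchase_value_1h
FROM user_events
WHERE event_timestamp > NOW() - INTERVAL '1 hour'
GROUP BY user_id;
```

Because the filter references NOW(), rows age out of the window automatically as time advances, with no scheduled job to expire them.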

Session-Level Behavioral Features

Page-view patterns within a session reveal intent signals that raw event counts miss. A user who spends 90 seconds on a product page and navigates from a category listing behaves differently from someone who bounces after 5 seconds from a direct link. Session features capture these behavioral nuances.

CREATE TABLE page_views (
    user_id INT,
    page_url VARCHAR,
    session_id VARCHAR,
    referrer VARCHAR,
    time_on_page_seconds INT,
    event_timestamp TIMESTAMPTZ
);

CREATE MATERIALIZED VIEW user_session_features AS
SELECT
    user_id,
    session_id,
    COUNT(*) AS pages_per_session,
    SUM(time_on_page_seconds) AS total_session_duration_seconds,
    AVG(time_on_page_seconds) AS avg_time_per_page,
    MIN(event_timestamp) AS session_start,
    MAX(event_timestamp) AS session_end
FROM page_views
GROUP BY user_id, session_id;

Insert session data and query:

INSERT INTO page_views VALUES
(1, '/products/laptop', 'sess_001', 'google.com', 45, '2026-04-01 10:00:00+00'),
(1, '/products/laptop/reviews', 'sess_001', '/products/laptop', 30, '2026-04-01 10:02:00+00'),
(1, '/checkout', 'sess_001', '/products/laptop', 20, '2026-04-01 10:05:00+00'),
(2, '/category/clothing', 'sess_002', 'facebook.com', 60, '2026-04-01 10:01:00+00'),
(2, '/products/jacket', 'sess_002', '/category/clothing', 90, '2026-04-01 10:04:00+00'),
(3, '/home', 'sess_003', 'direct', 15, '2026-04-01 10:02:00+00'),
(3, '/category/books', 'sess_003', '/home', 25, '2026-04-01 10:04:00+00'),
(3, '/products/novel', 'sess_003', '/category/books', 40, '2026-04-01 10:07:00+00');

SELECT * FROM user_session_features ORDER BY user_id;
 user_id | session_id | pages_per_session | total_session_duration_seconds | avg_time_per_page |       session_start       |        session_end
---------+------------+-------------------+--------------------------------+-------------------+---------------------------+---------------------------
       1 | sess_001   |                 3 |                             95 |             31.67 | 2026-04-01 10:00:00+00:00 | 2026-04-01 10:05:00+00:00
       2 | sess_002   |                 2 |                            150 |                75 | 2026-04-01 10:01:00+00:00 | 2026-04-01 10:04:00+00:00
       3 | sess_003   |                 3 |                             80 |             26.67 | 2026-04-01 10:02:00+00:00 | 2026-04-01 10:07:00+00:00

User 2 viewed only two pages but spent 150 seconds total, with 90 seconds on the product page alone. That high dwell time signals strong purchase intent. User 3 browsed three pages quickly, suggesting exploratory behavior. These session features complement the activity aggregations from the previous section, giving your model a richer view of each user.
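
Session rows are keyed by (user_id, session_id), while user-keyed views like a unified feature store join on user_id alone. A minimal sketch of rolling sessions up to one row per user, stacking a materialized view on top of another (the view name user_session_summary is illustrative):

```sql
-- Sketch: roll per-session features up to one row per user so they
-- can join cleanly into a user-keyed feature store. The view name
-- is illustrative, not part of the original pipeline.
CREATE MATERIALIZED VIEW user_session_summary AS
SELECT
    user_id,
    COUNT(*) AS session_count,
    AVG(pages_per_session) AS avg_pages_per_session,
    AVG(total_session_duration_seconds) AS avg_session_duration_seconds
FROM user_session_features
GROUP BY user_id;
```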

Transaction Spending Pattern Features

For models that predict lifetime value, detect fraud, or personalize pricing, transaction-level features are essential. You need spending velocity, average transaction size, category diversity, and amount ranges.

CREATE TABLE transactions (
    user_id INT,
    transaction_id VARCHAR,
    amount DOUBLE PRECISION,
    currency VARCHAR,
    merchant_category VARCHAR,
    event_timestamp TIMESTAMPTZ
);

CREATE MATERIALIZED VIEW sliding_window_features AS
SELECT
    user_id,
    COUNT(*) AS transaction_count,
    SUM(amount) AS total_spent,
    AVG(amount) AS avg_transaction_amount,
    MIN(amount) AS min_transaction_amount,
    MAX(amount) AS max_transaction_amount,
    COUNT(DISTINCT merchant_category) AS unique_merchant_categories
FROM transactions
GROUP BY user_id;

INSERT INTO transactions VALUES
(1, 'txn_001', 299.99, 'USD', 'electronics', '2026-04-01 10:15:00+00'),
(1, 'txn_002', 49.99, 'USD', 'clothing', '2026-04-01 10:25:00+00'),
(1, 'txn_005', 19.99, 'USD', 'books', '2026-04-01 09:30:00+00'),
(2, 'txn_003', 59.99, 'USD', 'clothing', '2026-04-01 10:30:00+00'),
(2, 'txn_006', 149.99, 'USD', 'electronics', '2026-04-01 09:00:00+00'),
(3, 'txn_004', 49.99, 'USD', 'books', '2026-04-01 10:16:00+00');

SELECT * FROM sliding_window_features ORDER BY user_id;
 user_id | transaction_count | total_spent | avg_transaction_amount | min_transaction_amount | max_transaction_amount | unique_merchant_categories
---------+-------------------+-------------+------------------------+------------------------+------------------------+----------------------------
       1 |                 3 |      369.97 |                 123.32 |                  19.99 |                 299.99 |                          3
       2 |                 2 |      209.98 |                 104.99 |                  59.99 |                 149.99 |                          2
       3 |                 1 |       49.99 |                  49.99 |                  49.99 |                  49.99 |                          1

User 1 has high spending diversity: three categories, amounts ranging from $19.99 to $299.99. This wide range and category diversity are strong signals for a customer lifetime value model. User 3 has a single small transaction, indicating an early-stage customer where the model might prioritize engagement features over spending patterns.
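
Note that despite its name, sliding_window_features aggregates over each user's full transaction history. For a true sliding window — say, spend over the trailing 24 hours, advancing every hour — one option is RisingWave's HOP table function. This is a sketch with illustrative window sizes:

```sql
-- Sketch: 24-hour windows that slide forward every hour. HOP takes
-- (source, time column, slide interval, window size); the intervals
-- here are illustrative.
CREATE MATERIALIZED VIEW txn_features_24h AS
SELECT
    user_id,
    window_start,
    window_end,
    COUNT(*) AS txn_count_24h,
    SUM(amount) AS total_spent_24h
FROM HOP(transactions, event_timestamp, INTERVAL '1 hour', INTERVAL '24 hours')
GROUP BY user_id, window_start, window_end;
```

Each user then has one row per window, and the serving layer reads the window covering the current time.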

Serving Features via a Unified Feature Store

The real power of this approach emerges when you combine all feature sources into a single materialized view that serves as your feature store. Downstream ML services query one view to get a complete feature vector for any user.

CREATE MATERIALIZED VIEW ml_feature_store AS
SELECT
    a.user_id,
    -- Activity features
    a.total_clicks,
    a.total_add_to_cart,
    a.total_purchases,
    a.total_purchase_value,
    a.unique_categories_browsed,
    a.click_to_purchase_ratio,
    -- Transaction features
    t.transaction_count,
    t.total_spent,
    t.avg_transaction_amount,
    t.min_transaction_amount,
    t.max_transaction_amount,
    t.unique_merchant_categories
FROM user_activity_features a
LEFT JOIN sliding_window_features t ON a.user_id = t.user_id;

Query the unified feature store:

SELECT * FROM ml_feature_store ORDER BY user_id;
 user_id | total_clicks | total_add_to_cart | total_purchases | total_purchase_value | unique_categories_browsed | click_to_purchase_ratio | transaction_count | total_spent | avg_transaction_amount | min_transaction_amount | max_transaction_amount | unique_merchant_categories
---------+--------------+-------------------+-----------------+----------------------+---------------------------+-------------------------+-------------------+-------------+------------------------+------------------------+------------------------+----------------------------
       1 |            2 |                 1 |               1 |               299.99 |                         1 |                     0.5 |                 3 |      369.97 |                 123.32 |                  19.99 |                 299.99 |                          3
       2 |            2 |                 1 |               0 |                    0 |                         1 |                       0 |                 2 |      209.98 |                 104.99 |                  59.99 |                 149.99 |                          2
        3 |            3 |                 1 |               1 |                49.99 |                         2 |      0.3333333333333333 |                 1 |       49.99 |                  49.99 |                  49.99 |                  49.99 |                          1

Twelve features per user, computed in real time, queryable via standard PostgreSQL. Your ML serving layer fetches features with a simple SELECT * FROM ml_feature_store WHERE user_id = ? query using any PostgreSQL client library.

Real-Time Updates in Action

The critical advantage over batch systems is that these features update automatically when new events arrive. Watch what happens when user 2 generates new activity:

-- New events arrive for user 2
INSERT INTO user_events VALUES
(2, 'click', 1.0, 'electronics', 'mobile', '2026-04-01 11:00:00+00'),
(2, 'add_to_cart', 799.99, 'electronics', 'mobile', '2026-04-01 11:05:00+00'),
(2, 'purchase', 799.99, 'electronics', 'mobile', '2026-04-01 11:10:00+00');

INSERT INTO transactions VALUES
(2, 'txn_007', 799.99, 'USD', 'electronics', '2026-04-01 11:10:00+00');

-- Features are automatically updated
SELECT * FROM ml_feature_store WHERE user_id = 2;
 user_id | total_clicks | total_add_to_cart | total_purchases | total_purchase_value | unique_categories_browsed | click_to_purchase_ratio | transaction_count | total_spent | avg_transaction_amount | min_transaction_amount | max_transaction_amount | unique_merchant_categories
---------+--------------+-------------------+-----------------+----------------------+---------------------------+-------------------------+-------------------+-------------+------------------------+------------------------+------------------------+----------------------------
        2 |            3 |                 2 |               1 |               799.99 |                         2 |      0.3333333333333333 |                 3 |     1009.97 |                 336.66 |                  59.99 |                 799.99 |                          2

User 2's features reflect the new activity immediately. Their click-to-purchase ratio jumped from 0 to 0.33, their total purchase value went from 0 to $799.99, total spent increased from $209.98 to $1009.97, and they now browse two categories instead of one. No batch job ran. No pipeline was triggered. The materialized views propagated the changes automatically through the dependency chain: user_events updated user_activity_features, transactions updated sliding_window_features, and both fed into ml_feature_store.

Connecting to Your ML Serving Layer

Since RisingWave is PostgreSQL-compatible, your ML serving layer connects to it the same way it would connect to any PostgreSQL database. Here is a Python example using psycopg2:

import psycopg2

conn = psycopg2.connect(
    host="localhost",
    port=4566,
    user="root",
    dbname="dev"
)

def get_user_features(user_id: int) -> dict:
    with conn.cursor() as cur:
        cur.execute(
            "SELECT * FROM ml_feature_store WHERE user_id = %s",
            (user_id,)
        )
        columns = [desc[0] for desc in cur.description]
        row = cur.fetchone()
        if row:
            return dict(zip(columns, row))
        return {}

# Fetch real-time features for model inference
features = get_user_features(user_id=2)
# Pass features to your model
# prediction = model.predict(features)

This pattern gives you sub-millisecond feature lookups because you are reading precomputed results from a materialized view, not running aggregations on the fly.

Streaming Feature Engineering vs. Batch: A Practical Comparison

 Dimension             | Batch Feature Engineering                | Streaming with RisingWave
-----------------------+------------------------------------------+-----------------------------------
 Feature freshness     | Minutes to hours                         | Seconds
 Training-serving skew | High risk (different compute paths)      | Low risk (same SQL logic)
 Infrastructure        | Orchestrator + warehouse + feature store | Single streaming database
 Query language        | SQL + Python/Spark                       | SQL only
 Scaling new features  | New DAG nodes, pipeline changes          | New materialized view
 Backfill complexity   | Full reprocessing required               | Replay from source topic
 Operational burden    | High (monitoring, retries, scheduling)   | Low (always-on, self-maintaining)

The streaming approach is not a replacement for all batch processing. Historical feature computation over months of data, complex feature engineering that requires iterative Python logic, and features derived from data at rest (like annual summaries) still benefit from batch pipelines. The sweet spot for streaming feature engineering is features that need freshness measured in seconds, are expressible as SQL aggregations, and feed online serving systems.

Production Considerations

Connecting to Kafka Sources

In production, you replace the CREATE TABLE statements with Kafka sources that consume events directly from your message broker:

CREATE SOURCE user_events_source (
    user_id INT,
    event_type VARCHAR,
    event_value DOUBLE PRECISION,
    product_category VARCHAR,
    device_type VARCHAR,
    event_timestamp TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'user-events',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;

The materialized views built on top of this source work identically to the table-based examples shown earlier. The only difference is that data arrives from Kafka instead of INSERT statements.
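
For example, the activity aggregation can read from the source directly. This sketch reuses the logic from user_activity_features with an illustrative view name:

```sql
-- Same aggregation logic as before, now reading from the Kafka-backed
-- source rather than a table populated by INSERT statements.
CREATE MATERIALIZED VIEW user_activity_features_live AS
SELECT
    user_id,
    COUNT(*) FILTER (WHERE event_type = 'click') AS total_clicks,
    COUNT(*) FILTER (WHERE event_type = 'purchase') AS total_purchases
FROM user_events_source
GROUP BY user_id;
```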

Monitoring Feature Quality

Feature drift and data quality issues can silently degrade model performance. Build monitoring views that track feature distributions:

CREATE MATERIALIZED VIEW feature_quality_monitor AS
SELECT
    COUNT(*) AS total_users,
    AVG(total_clicks) AS avg_clicks_across_users,
    AVG(click_to_purchase_ratio) AS avg_conversion_rate,
    AVG(total_spent) AS avg_total_spent
FROM ml_feature_store;

Query this view periodically or connect it to your alerting system to detect distributional shifts that might indicate upstream data quality problems or genuine behavioral changes that require model retraining.
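
As a sketch, a polling job could compare the monitored averages against a baseline; the 0.05 threshold below is illustrative, not a recommendation:

```sql
-- Illustrative threshold check: flag when the average conversion rate
-- drops below a fixed baseline. In practice the baseline would come
-- from historical data rather than a hard-coded constant.
SELECT
    avg_conversion_rate,
    avg_conversion_rate < 0.05 AS conversion_rate_alert
FROM feature_quality_monitor;
```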

Sink to External Feature Stores

If your organization already uses a dedicated feature store like Feast, you can use RisingWave sinks to push computed features downstream. RisingWave supports sinking to Kafka, PostgreSQL, Iceberg, and other destinations, letting you integrate with your existing ML infrastructure without replacing it.
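
A sketch of what such a sink might look like for Kafka — connector options vary across RisingWave versions, so treat the WITH clause as indicative rather than exact:

```sql
-- Sketch: push feature rows to a Kafka topic as upserts keyed by
-- user_id. Option names vary by RisingWave version; consult the
-- CREATE SINK documentation for your release.
CREATE SINK feature_store_sink FROM ml_feature_store
WITH (
    connector = 'kafka',
    topic = 'ml-features',
    properties.bootstrap.server = 'kafka:9092',
    primary_key = 'user_id'
) FORMAT UPSERT ENCODE JSON;
```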

Frequently Asked Questions

What is real-time feature engineering for machine learning?

Real-time feature engineering is the process of computing ML model input features from live data streams as events occur, rather than in scheduled batch jobs. It ensures that features used for model inference are always up to date, reducing the gap between when an event happens and when a model can act on it. This is particularly important for use cases like fraud detection, real-time recommendations, and dynamic pricing where stale features lead to poor predictions.

How does RisingWave compare to Apache Flink for feature engineering?

RisingWave and Apache Flink both process streaming data, but they differ significantly in developer experience for feature engineering. RisingWave lets data scientists define features as SQL materialized views and query results directly via PostgreSQL protocol, requiring no Java or Scala code. Flink requires writing applications in Java, Scala, or PyFlink, and typically writes results to an external database for serving. For SQL-heavy feature engineering workloads, RisingWave reduces both development time and infrastructure complexity.

Can I use streaming features for model training, not just serving?

Yes. Since RisingWave materialized views are queryable with standard SQL, you can extract training datasets directly from the same feature definitions used in serving. Run a SELECT query with a WHERE clause on the timestamp range you need, export to CSV or Parquet via tools like psql \copy, and feed that into your training pipeline. This ensures your training features match your serving features exactly, eliminating training-serving skew.
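
As a concrete sketch of that export step from psql (the output filename is illustrative):

```sql
-- Snapshot the current feature store to a CSV file for offline training.
-- Run inside psql; \copy executes client-side, so the file lands on
-- the machine running psql.
\copy (SELECT * FROM ml_feature_store) TO 'training_features.csv' CSV HEADER
```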

What types of ML features work best with streaming SQL?

Streaming SQL excels at aggregation-based features: counts, sums, averages, distinct counts, min/max values, and ratios computed over event streams. Windowed aggregations (tumbling, sliding, session windows) and joins across multiple event streams are also well-suited. Features that require complex iterative computation, matrix operations, or access to large historical datasets are better handled by batch Python pipelines.

Conclusion

Real-time feature engineering transforms ML pipelines from reactive systems that process yesterday's data into proactive systems that act on what is happening right now. Here are the key takeaways:

  • Materialized views as feature definitions: Each CREATE MATERIALIZED VIEW statement defines a set of features that RisingWave maintains incrementally. No scheduling, no orchestration, no batch jobs.
  • SQL is sufficient for most features: Aggregations, ratios, distinct counts, and joins cover the majority of production ML features. You do not need a complex framework.
  • Unified feature store: A single materialized view that joins multiple feature views gives your ML serving layer one place to query for complete feature vectors.
  • Automatic propagation: When upstream data changes, all downstream materialized views update automatically through the dependency chain.
  • PostgreSQL compatibility: Your existing ML infrastructure connects to RisingWave without code changes if it can talk to PostgreSQL.

Ready to build your own streaming feature store? Get started with RisingWave in 5 minutes.

Join our Slack community to ask questions and connect with other stream processing developers.
