How to Build Real-Time Feature Pipelines with Streaming SQL


Most machine learning models in production run on stale features. The training pipeline computes features from a snapshot of the data warehouse, but the serving pipeline reads from a feature store that refreshes every hour, or every six hours, or once a day. The model sees a customer's purchase history from this morning, not from five seconds ago. It sees inventory levels from the last batch run, not the current count.

This freshness gap is where ML systems break. A fraud model trained on real-time patterns but served hour-old features will miss the burst of transactions that just started. A recommendation model that cannot see the items a user added to their cart three minutes ago will suggest those same items. A pricing model that relies on yesterday's demand signal will underprice surge periods and overprice quiet ones.

The solution is a real-time feature pipeline: a system that continuously computes feature values from live event streams and serves them with sub-second freshness. This post shows you how to build one using streaming SQL and materialized views in RisingWave, a PostgreSQL-compatible streaming database, without writing a single line of Java, Python, or Scala.

What Is a Real-Time Feature Pipeline?

A feature pipeline is the system that transforms raw data into the numerical inputs (features) that a machine learning model consumes at inference time. In a batch architecture, this pipeline runs on a schedule: a Spark job reads from the data warehouse every hour, computes aggregates, and writes the results to a feature store.

A real-time feature pipeline replaces that scheduled job with a continuous process. Instead of reading from a warehouse, it reads from an event stream (Kafka, Pulsar, Kinesis). Instead of running every hour, it runs constantly, updating feature values within milliseconds of each new event.

Why Batch Feature Pipelines Fall Short

Batch feature pipelines introduce three problems that compound in production:

  • Freshness decay: The model sees features that were accurate at computation time but may be stale by serving time. A feature computed at 2:00 AM is 18 hours old by 8:00 PM. For use cases like fraud detection, pricing, and personalization, this delay directly reduces model accuracy.
  • Training-serving skew: Training pipelines often compute features from raw event data, but serving pipelines read pre-aggregated values from a feature store. Subtle differences in join logic, window boundaries, or null handling between these two code paths cause the model to see different feature distributions in production than it saw during training.
  • Operational complexity: Batch pipelines require orchestration (Airflow, Dagster), monitoring for late arrivals, backfill logic for failed runs, and careful scheduling to avoid resource contention. Each feature table adds another DAG node and another failure mode.

How Streaming SQL Solves These Problems

A streaming SQL engine like RisingWave lets you define features as SQL queries over live streams. The engine continuously and incrementally maintains the query results as materialized views. When a new event arrives, only the affected feature rows are recomputed, not the entire table. This gives you:

  • Sub-second freshness: Features update within milliseconds of each source event.
  • Single definition: The same SQL query defines both the training feature (via a historical backfill) and the serving feature (via the live materialized view). No skew.
  • No orchestration: No DAGs, no schedulers, no cron jobs. The materialized view is always up to date.

How to Design Features as Materialized Views

The key insight is that most ML features are aggregations over time windows. "Number of transactions in the last 30 minutes," "average order value in the last 7 days," "distinct IP addresses seen in the last hour" - these are all expressible as SQL window aggregations.

In RisingWave, you model these as materialized views that read from a streaming source. The engine handles the windowing, the incremental computation, and the state management.
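To make the windowing idea concrete, here is a minimal Python sketch (not RisingWave code) of the kind of state a streaming engine maintains for a sliding-window aggregate such as "transactions in the last 30 minutes." All names and the integer timestamps are illustrative.

```python
from collections import defaultdict, deque

WINDOW_SECONDS = 30 * 60  # "last 30 minutes"

class SlidingCount:
    """Per-user transaction count over a sliding time window.

    Illustrative only: a streaming engine keeps equivalent state
    internally when it evaluates a windowed GROUP BY.
    """

    def __init__(self, window=WINDOW_SECONDS):
        self.window = window
        self.events = defaultdict(deque)  # user_id -> event timestamps

    def add(self, user_id, ts):
        q = self.events[user_id]
        q.append(ts)
        # Evict timestamps that have fallen out of the window.
        while q and q[0] <= ts - self.window:
            q.popleft()

    def count(self, user_id):
        return len(self.events[user_id])

counts = SlidingCount()
counts.add("user_12345", 0)
counts.add("user_12345", 600)
counts.add("user_12345", 2000)  # the ts=0 event is now > 30 min old
print(counts.count("user_12345"))  # 2
```

The point of the sketch is what you do *not* write in the streaming SQL approach: the eviction loop, the per-key state, and the bookkeeping are all handled by the engine.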

Step 1: Create Your Event Source

First, connect to your event stream. This example uses a Kafka topic containing user transaction events:

CREATE SOURCE transactions_stream (
    transaction_id VARCHAR,
    user_id VARCHAR,
    amount DECIMAL,
    merchant_category VARCHAR,
    ip_address VARCHAR,
    device_id VARCHAR,
    event_time TIMESTAMP
) WITH (
    connector = 'kafka',
    topic = 'transactions',
    properties.bootstrap.server = 'broker:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;

For user profile updates, create a second source:

CREATE SOURCE user_events_stream (
    user_id VARCHAR,
    event_type VARCHAR,
    old_value VARCHAR,
    new_value VARCHAR,
    event_time TIMESTAMP
) WITH (
    connector = 'kafka',
    topic = 'user_events',
    properties.bootstrap.server = 'broker:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;

Step 2: Define Feature Materialized Views

Now define your features. Each materialized view continuously computes one logical group of features:

Transaction velocity features (how fast a user is transacting):

CREATE MATERIALIZED VIEW user_transaction_features AS
SELECT
    user_id,
    COUNT(*) FILTER (WHERE event_time > NOW() - INTERVAL '5 minutes')
        AS txn_count_5min,
    COUNT(*) FILTER (WHERE event_time > NOW() - INTERVAL '30 minutes')
        AS txn_count_30min,
    COUNT(*) FILTER (WHERE event_time > NOW() - INTERVAL '24 hours')
        AS txn_count_24h,
    SUM(amount) FILTER (WHERE event_time > NOW() - INTERVAL '30 minutes')
        AS total_amount_30min,
    AVG(amount) FILTER (WHERE event_time > NOW() - INTERVAL '24 hours')
        AS avg_amount_24h,
    MAX(amount) FILTER (WHERE event_time > NOW() - INTERVAL '24 hours')
        AS max_amount_24h,
    COUNT(DISTINCT merchant_category)
        FILTER (WHERE event_time > NOW() - INTERVAL '1 hour')
        AS distinct_merchants_1h,
    COUNT(DISTINCT ip_address)
        FILTER (WHERE event_time > NOW() - INTERVAL '1 hour')
        AS distinct_ips_1h
FROM transactions_stream
GROUP BY user_id;

Spending pattern features (what the user typically buys):

CREATE MATERIALIZED VIEW user_spending_patterns AS
SELECT
    user_id,
    merchant_category,
    COUNT(*) AS category_txn_count,
    SUM(amount) AS category_total_spend,
    AVG(amount) AS category_avg_spend
FROM transactions_stream
WHERE event_time > NOW() - INTERVAL '30 days'
GROUP BY user_id, merchant_category;
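The grouped aggregates above can be pictured as a fold over the event stream. This hedged Python sketch (names illustrative, and it omits the 30-day retention window for brevity) shows the composite-key state the view maintains:

```python
from collections import defaultdict

# Running totals keyed by (user_id, merchant_category) -- the same
# grouping user_spending_patterns uses. Illustrative sketch only.
state = defaultdict(lambda: {"count": 0, "total": 0.0, "avg": 0.0})

def on_transaction(user_id, merchant_category, amount):
    """Fold one event into the grouped aggregates for its key."""
    row = state[(user_id, merchant_category)]
    row["count"] += 1
    row["total"] += amount
    row["avg"] = row["total"] / row["count"]
    return row

on_transaction("user_12345", "grocery", 40.0)
row = on_transaction("user_12345", "grocery", 60.0)
print(row)  # {'count': 2, 'total': 100.0, 'avg': 50.0}
```

Each event touches exactly one key's row, which is why the engine can keep thousands of such aggregates fresh without rescanning history.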

Device and location features (behavioral fingerprinting):

CREATE MATERIALIZED VIEW user_device_features AS
SELECT
    user_id,
    COUNT(DISTINCT device_id)
        FILTER (WHERE event_time > NOW() - INTERVAL '24 hours')
        AS distinct_devices_24h,
    COUNT(DISTINCT ip_address)
        FILTER (WHERE event_time > NOW() - INTERVAL '24 hours')
        AS distinct_ips_24h
FROM transactions_stream
GROUP BY user_id;

Step 3: Query Features at Serving Time

Because RisingWave is PostgreSQL-compatible, your model serving layer can read features with a simple SQL query over a standard PostgreSQL connection:

SELECT
    t.txn_count_5min,
    t.txn_count_30min,
    t.txn_count_24h,
    t.total_amount_30min,
    t.avg_amount_24h,
    t.max_amount_24h,
    t.distinct_merchants_1h,
    t.distinct_ips_1h,
    d.distinct_devices_24h,
    d.distinct_ips_24h
FROM user_transaction_features t
JOIN user_device_features d ON t.user_id = d.user_id
WHERE t.user_id = 'user_12345';

Expected output:

txn_count_5min        | 3
txn_count_30min       | 7
txn_count_24h         | 42
total_amount_30min    | 1250.00
avg_amount_24h        | 89.50
max_amount_24h        | 450.00
distinct_merchants_1h | 4
distinct_ips_1h       | 2
distinct_devices_24h  | 1
distinct_ips_24h      | 2

This query returns in single-digit milliseconds because it reads pre-computed results, not raw events. Your inference service calls this once per prediction request, gets fresh features, and passes them to the model.
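A serving-side fetch might look like the following Python sketch. It assumes psycopg2 is available and that RisingWave is reachable at the DSN shown (host, port, and credentials are illustrative); the vector-assembly helper is a hypothetical convenience, not a RisingWave API.

```python
# Sketch of a serving-side feature fetch over a PostgreSQL connection.
FEATURE_COLUMNS = [
    "txn_count_5min", "txn_count_30min", "txn_count_24h",
    "total_amount_30min", "avg_amount_24h", "max_amount_24h",
    "distinct_merchants_1h", "distinct_ips_1h",
]

def build_feature_vector(row, columns=FEATURE_COLUMNS):
    """Turn a fetched row (as a dict) into the ordered numeric list a
    model expects, defaulting missing features to 0."""
    return [float(row.get(c) or 0) for c in columns]

def fetch_features(user_id, dsn="postgresql://root@risingwave:4566/dev"):
    import psycopg2  # assumed dependency; imported lazily in this sketch
    with psycopg2.connect(dsn) as conn, conn.cursor() as cur:
        cur.execute(
            f"SELECT {', '.join(FEATURE_COLUMNS)} "
            "FROM user_transaction_features WHERE user_id = %s",
            (user_id,),
        )
        row = cur.fetchone()
        return build_feature_vector(dict(zip(FEATURE_COLUMNS, row or [])))

# The pure helper is usable without a database connection:
vec = build_feature_vector({"txn_count_5min": 3, "avg_amount_24h": 89.5})
print(vec[0], vec[4])  # 3.0 89.5
```

The inference service calls `fetch_features` once per prediction request; because the materialized view is already maintained, the query is a point lookup rather than an aggregation.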

How Does This Compare to Traditional Feature Stores?

Traditional feature stores like Feast, Tecton, and Hopsworks provide an abstraction for managing features. They solve important problems: feature discovery, feature reuse, and point-in-time correctness for training. But they are not feature computation engines. They still rely on an external pipeline (Spark, Flink, Airflow) to compute feature values and push them into the store.

A streaming SQL approach with RisingWave replaces the computation layer, not the feature store itself. You can use both together: let RisingWave compute features in real time, then sink the results to your feature store for discovery and governance.

Capability                   | Batch Pipeline + Feature Store | Streaming SQL (RisingWave)
Feature freshness            | Minutes to hours               | Milliseconds to seconds
Compute engine               | Spark, Flink (separate system) | Built-in (materialized views)
Feature definition           | Python/Scala transformations   | SQL queries
Training-serving consistency | Requires careful engineering   | Same SQL for both
Orchestration needed         | Yes (Airflow, Dagster)         | No
Incremental computation      | Depends on implementation      | Automatic
Serving latency              | Low (from cache)               | Low (from materialized view)

The advantage of the streaming SQL approach is simplicity. You write SQL, and the system handles incremental computation, state management, fault tolerance, and serving. You do not need to maintain separate batch and streaming code paths.

How to Sink Features to Downstream Systems

RisingWave materialized views can be queried directly, but you can also push feature values to external systems. This is useful when your inference service already reads from Redis, PostgreSQL, or an Apache Iceberg table.

Sink to PostgreSQL

Push features to a PostgreSQL table that your model serving layer already queries:

CREATE SINK user_features_to_pg FROM user_transaction_features
WITH (
    connector = 'jdbc',
    jdbc.url = 'jdbc:postgresql://feature-db:5432/features',
    table.name = 'user_transaction_features',
    type = 'upsert',
    primary_key = 'user_id'
);

Sink to Apache Iceberg

Write features to an Iceberg table for both real-time serving and historical training:

CREATE SINK user_features_to_iceberg FROM user_transaction_features
WITH (
    connector = 'iceberg',
    type = 'upsert',
    primary_key = 'user_id',
    warehouse.path = 's3://my-warehouse/features',
    database.name = 'ml_features',
    table.name = 'user_transaction_features',
    catalog.type = 'rest',
    catalog.uri = 'http://iceberg-catalog:8181'
);

This pattern solves training-serving skew at the architecture level. The same materialized view definition produces both the live serving features (queried directly or via the PostgreSQL sink) and the historical training features (via the Iceberg sink). Your training pipeline reads from the Iceberg table, and your serving pipeline reads from the materialized view. Both use identical computation logic because they share the same SQL definition.

What Are the Performance Characteristics?

Real-time feature pipelines need to handle high throughput (millions of events per second) while maintaining low latency (features update within seconds of each event). RisingWave achieves this through several architectural choices:

  • Incremental computation: When a new transaction event arrives, RisingWave does not recompute all features for all users. It updates only the affected rows in the materialized view. For a GROUP BY user_id query, a single event triggers an update to exactly one user's feature row.
  • Shared state: Multiple materialized views can read from the same source without duplicating ingestion. Your transaction velocity features, spending pattern features, and device features all share one Kafka consumer.
  • Automatic checkpointing: RisingWave persists state to S3-compatible storage with periodic checkpoints. If a node fails, it recovers from the last checkpoint without data loss and without reprocessing the entire stream.
  • Horizontal scaling: You can scale compute nodes independently of storage. During peak traffic (Black Friday, market open), add compute nodes to handle the increased event rate. During quiet periods, scale down to reduce cost.
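The first point, incremental computation, is the one that makes the throughput numbers possible. This illustrative Python contrast (not RisingWave internals) shows why: a batch job rescans every event, while an incremental update touches exactly one feature row per event.

```python
# Contrast full recomputation with incremental maintenance for a
# GROUP BY user_id count. Names and events are illustrative.

def full_recompute(events):
    """Batch style: scan every event to rebuild all feature rows."""
    counts = {}
    for user_id, _amount in events:
        counts[user_id] = counts.get(user_id, 0) + 1
    return counts

def incremental_update(counts, event):
    """Streaming style: one event touches exactly one user's row."""
    user_id, _amount = event
    counts[user_id] = counts.get(user_id, 0) + 1
    return {user_id}  # the only key whose feature row changed

events = [("u1", 10.0), ("u2", 25.0), ("u1", 5.0)]
counts = full_recompute(events)
changed = incremental_update(counts, ("u1", 7.5))
print(counts["u1"], changed)  # 3 {'u1'}
```

The batch path costs O(total events) on every run; the incremental path costs O(1) per event, which is what keeps feature freshness in the millisecond range at high event rates.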

For a fraud detection use case ingesting 50,000 transactions per second, a typical RisingWave deployment maintains feature freshness under 200 milliseconds with three compute nodes.

FAQ

What is a real-time feature pipeline?

A real-time feature pipeline is a system that continuously computes machine learning feature values from live event streams, delivering sub-second feature freshness to model serving endpoints. Unlike batch feature pipelines that run on hourly or daily schedules, real-time pipelines process each event as it arrives and update feature values incrementally.

How do streaming materialized views prevent training-serving skew?

Training-serving skew occurs when the code that computes features for training differs from the code that computes features for serving. With streaming materialized views, you write a single SQL query that defines the feature. The same query produces both the live serving values (from the materialized view) and the historical training data (by sinking to an Iceberg table). Since both paths use identical SQL logic, skew is eliminated by design.

Can RisingWave replace a feature store?

RisingWave replaces the feature computation layer, not the entire feature store. It handles real-time feature computation and low-latency serving via its PostgreSQL-compatible interface. For feature discovery, metadata management, and access control, you can pair RisingWave with a feature store like Feast or Tecton by sinking computed features downstream.

How does streaming SQL compare to Spark Structured Streaming or Flink for feature engineering?

Streaming SQL in RisingWave offers lower operational complexity than Spark Structured Streaming or Apache Flink for feature engineering. You define features as SQL queries instead of writing Java or Python transformation code. RisingWave handles incremental computation, state management, and fault tolerance automatically. The trade-off is that complex feature logic requiring custom algorithms may still benefit from a general-purpose framework, but most aggregate, window, and join-based features are well-suited to SQL.

Conclusion

Real-time feature pipelines eliminate the freshness gap that degrades ML model accuracy in production. Here are the key takeaways:

  • Batch feature pipelines introduce staleness: Hourly or daily refreshes mean your model decisions are based on old data, which directly hurts accuracy for time-sensitive use cases like fraud detection, pricing, and personalization.
  • Materialized views are a natural fit for features: Most ML features are aggregations over time windows, which map directly to SQL GROUP BY queries maintained incrementally as materialized views.
  • Streaming SQL eliminates training-serving skew: A single SQL definition produces both serving features and training data, removing the most common source of model degradation in production.
  • No orchestration needed: Unlike batch pipelines that require Airflow DAGs and backfill logic, streaming materialized views are always up to date with zero operational overhead.
  • Standard PostgreSQL compatibility: Your inference service reads features over a standard PostgreSQL connection, with no custom SDKs or APIs required.

Ready to try this yourself? Get started with RisingWave in 5 minutes. Quickstart →

Join our Slack community to ask questions and connect with other stream processing developers.
