Point-in-Time Joins in SQL: Building ML Training Datasets Without Data Leakage

You train a churn model. It shows 91% AUC on your held-out test set. You deploy it. It performs no better than a naive baseline. You've just been bitten by data leakage — and the root cause is almost always the same: your training features were computed using information that wasn't available at the time the label was assigned.

This article explains the problem precisely, shows you how point-in-time joins fix it, and walks through a complete example using RisingWave SQL — including exporting the training dataset to S3 as Parquet.

What Data Leakage Is (and Why It's So Easy to Get Wrong)

Data leakage in ML training happens when a feature used during training contains information from the future relative to the label event. The model learns from a signal that won't exist at inference time, so its apparent accuracy doesn't transfer to production.

The tricky part: leakage is often invisible in a standard feature pipeline. Here's a concrete example.

Say you're building a churn model. A user churns (stops using your product) on day T. You want to predict that churn 7 days in advance — at day T-7 — based on their behavior in the 30 days before that.

Your feature pipeline computes order_count_30d for each user. The problem is when you compute it. If you run the feature computation job on day T+7 (after you've already labeled the user as churned), the order_count_30d feature reflects order behavior up through day T+7. But at inference time, when you're predicting churn for an active user on some future date, that feature is computed in real time — and it only reflects orders through "now."

The result: during training, your model sees order_count_30d = 0 for churned users (because they made no orders after T), computed over a window that extends into the post-churn period. At inference time, order_count_30d for an active user reflects genuine recent orders. The feature distributions are fundamentally different between training and inference. The model has learned from a ghost.
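To make this concrete, here's a toy pandas sketch (hypothetical data and a hypothetical `order_count_30d` helper) showing that the same feature takes different values depending on when it's computed — at the prediction moment versus after the churn event:

```python
import pandas as pd

# Hypothetical toy data: a user whose churn date is T = 2025-01-10.
orders = pd.DataFrame({
    "user_id": ["u1"] * 3,
    "created_at": pd.to_datetime(["2024-12-20", "2024-12-28", "2025-01-05"]),
})

churn_time = pd.Timestamp("2025-01-10")
prediction_time = churn_time - pd.Timedelta(days=7)    # T-7: when we want to predict
pipeline_run_time = churn_time + pd.Timedelta(days=7)  # T+7: when the batch job ran

def order_count_30d(as_of):
    # Count orders in the 30-day window ending at `as_of`.
    window = orders[(orders.created_at > as_of - pd.Timedelta(days=30))
                    & (orders.created_at <= as_of)]
    return len(window)

print(order_count_30d(prediction_time))    # → 2: what was knowable at T-7
print(order_count_30d(pipeline_run_time))  # → 3: leaky value computed after churn
```

Same user, same feature definition, two different values — the only difference is the timestamp the computation was anchored to.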

This is the most common form of leakage in tabular ML. Others include:

  • Target leakage: a feature is itself derived from the label (e.g., using refund_count to predict a refund)
  • Group leakage: train/test split doesn't respect time or user boundaries, so test users appear in training aggregates
  • Preprocessing leakage: normalization statistics (mean, stddev) are computed over the entire dataset including the test set

Point-in-time joins address the first category: temporal leakage from feature values computed at the wrong time.

The Correct Approach: Point-in-Time Joins

A point-in-time join says: for each label event at timestamp T, fetch the feature value that was valid at exactly time T — not the feature value as of now, not the feature value as of when the training job ran.

More precisely, it answers: "What would this feature have been if I had queried it at the moment the label was recorded?"

This is exactly what a feature store's offline store provides. When you call get_historical_features() in Feast or Tecton, it performs a point-in-time correct lookup into a table of timestamped feature snapshots. For each (entity, timestamp) pair in your label dataset, it finds the most recent feature row where feature_timestamp <= label_timestamp.
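That "most recent row at or before the label timestamp" lookup can be sketched in pandas with `merge_asof` over a hypothetical snapshot table (toy data, not a feature-store API):

```python
import pandas as pd

# Hypothetical snapshot table of timestamped feature values.
features = pd.DataFrame({
    "user_id": ["u1", "u1", "u2"],
    "computed_at": pd.to_datetime(["2025-01-01", "2025-01-08", "2025-01-03"]),
    "order_count_30d": [5, 2, 7],
}).sort_values("computed_at")

labels = pd.DataFrame({
    "user_id": ["u1", "u2"],
    "label_time": pd.to_datetime(["2025-01-10", "2025-01-04"]),
    "label": [1, 0],
}).sort_values("label_time")

# For each label, take the most recent feature row with computed_at <= label_time.
training = pd.merge_asof(
    labels, features,
    left_on="label_time", right_on="computed_at",
    by="user_id", direction="backward",
)
print(training[["user_id", "label", "order_count_30d"]])
```

Here u1's label at 2025-01-10 picks up the 2025-01-08 snapshot (value 2), not the earlier one — exactly the point-in-time semantics the offline store implements.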

The SQL for this, written manually, looks like an ASOF join or a lateral subquery with a time filter:

-- Manual point-in-time join (verbose, error-prone)
SELECT
    l.user_id,
    l.label,
    l.label_time,
    f.order_count_30d,
    f.total_spend_30d
FROM churn_labels l
LEFT JOIN LATERAL (
    SELECT order_count_30d, total_spend_30d
    FROM user_order_features_history
    WHERE user_id = l.user_id
      AND computed_at <= l.label_time
    ORDER BY computed_at DESC
    LIMIT 1
) f ON true;

This works, but it requires you to maintain a historical snapshot table of every feature at every point in time — which is expensive to store and complex to maintain.

RisingWave takes a different approach.

How RisingWave Implements Temporal Joins

RisingWave is a streaming SQL database that maintains materialized views incrementally as data arrives. Every materialized view in RisingWave is essentially a continuously updated table. But RisingWave also supports reading a materialized view at a specific historical point in time using the FOR SYSTEM_TIME AS OF syntax.

SELECT * FROM my_materialized_view FOR SYSTEM_TIME AS OF '2025-01-15 12:00:00';

This is standard SQL temporal syntax (from SQL:2011), and RisingWave uses its internal versioned state to serve it. You don't need to separately maintain a snapshot table — RisingWave's storage layer handles the versioning.

When used inside a join, this becomes a point-in-time join:

SELECT
    l.user_id,
    l.label,
    l.label_time,
    f.order_count_30d,
    f.total_spend_30d
FROM churn_labels l
JOIN user_order_features FOR SYSTEM_TIME AS OF l.label_time f
    ON l.user_id = f.user_id;

For each row in churn_labels, this fetches the state of user_order_features as it existed at label_time. If the label was recorded at 2025-01-10 09:00:00, the join uses the feature values that were valid at 2025-01-10 09:00:00 — not the current values.

This is point-in-time correctness without a feature store.

Step-by-Step: Building a Leakage-Free Training Dataset

Step 1: Ingest Raw Events

First, connect your event stream. Here we assume orders arrive via Kafka:

CREATE SOURCE orders_source (
    order_id VARCHAR,
    user_id VARCHAR,
    amount NUMERIC,
    created_at TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'orders',
    properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;

Step 2: Create Feature Materialized Views

Define the features you want to use. RisingWave computes these incrementally as new events arrive.

CREATE MATERIALIZED VIEW user_order_features AS
SELECT
    user_id,
    COUNT(*) AS order_count_30d,
    SUM(amount) AS total_spend_30d,
    AVG(amount) AS avg_order_value,
    MAX(created_at) AS last_order_at
FROM orders_source
WHERE created_at >= NOW() - INTERVAL '30 days'
GROUP BY user_id;

This view is always fresh — it reflects the current 30-day window in real time. RisingWave maintains it incrementally without recomputing from scratch on each query.

You can define as many feature views as you need. For example, engagement features from a clickstream source:

CREATE MATERIALIZED VIEW user_engagement_features AS
SELECT
    user_id,
    COUNT(*) AS session_count_7d,
    SUM(page_views) AS total_page_views_7d,
    MAX(session_start) AS last_active_at
FROM user_sessions
WHERE session_start >= NOW() - INTERVAL '7 days'
GROUP BY user_id;

Step 3: Create the Labels Table

Label events come from your business logic — a user cancellation event, a payment failure, a support ticket. Store them in a table with an explicit timestamp:

CREATE TABLE churn_labels (
    user_id VARCHAR,
    label INTEGER,         -- 1 = churned, 0 = retained
    label_time TIMESTAMPTZ,
    PRIMARY KEY (user_id, label_time)
);

The label_time column is the anchor for the point-in-time join. It should represent the time at which the label became known — not the time the job ran, not the current time.

Step 4: Run the Point-in-Time Join

Now produce the training dataset:

SELECT
    l.user_id,
    l.label,
    l.label_time,
    f.order_count_30d,
    f.total_spend_30d,
    f.avg_order_value,
    f.last_order_at,
    e.session_count_7d,
    e.total_page_views_7d,
    e.last_active_at
FROM churn_labels l
JOIN user_order_features FOR SYSTEM_TIME AS OF l.label_time f
    ON l.user_id = f.user_id
LEFT JOIN user_engagement_features FOR SYSTEM_TIME AS OF l.label_time e
    ON l.user_id = e.user_id;

Notice the LEFT JOIN for engagement features — a user may have zero sessions in the 7-day window, so we want to keep the row and fill with NULLs rather than dropping it.

This query returns one row per label event, with features anchored to label_time. For a label at 2025-01-10 09:00:00, you get order_count_30d and session_count_7d as they existed at that exact moment — not retroactively updated values.

Step 5: Export Training Data to S3 as Parquet

Once you have the training query, export it for model training. RisingWave supports sinking directly to S3 in Parquet format:

CREATE SINK training_dataset_sink AS
SELECT
    l.user_id,
    l.label,
    l.label_time,
    f.order_count_30d,
    f.total_spend_30d,
    f.avg_order_value,
    f.last_order_at,
    e.session_count_7d,
    e.total_page_views_7d
FROM churn_labels l
JOIN user_order_features FOR SYSTEM_TIME AS OF l.label_time f
    ON l.user_id = f.user_id
LEFT JOIN user_engagement_features FOR SYSTEM_TIME AS OF l.label_time e
    ON l.user_id = e.user_id
WITH (
    connector = 's3',
    s3.region_name = 'us-east-1',
    s3.bucket_name = 'my-ml-training-data',
    s3.path = 'churn-model/training/',
    s3.file_type = 'parquet'
);

The Parquet files land in S3 and can be read directly by PyTorch, scikit-learn, or any training framework that supports the Parquet format:

import pandas as pd

df = pd.read_parquet("s3://my-ml-training-data/churn-model/training/")
X = df.drop(columns=["user_id", "label", "label_time"])
y = df["label"]

For periodic re-exports (e.g., daily training runs), you can filter the label table by date range before sinking, or use an external orchestrator like Airflow or Prefect to trigger the export job.
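The date-range filtering step for a daily run can be sketched like this (a toy DataFrame stands in for the exported Parquet files; column names match the sink above):

```python
import pandas as pd

# Hypothetical stand-in for the exported training rows.
df = pd.DataFrame({
    "user_id": ["u1", "u2", "u3"],
    "label": [1, 0, 1],
    "label_time": pd.to_datetime(
        ["2025-01-09 23:00", "2025-01-10 08:00", "2025-01-11 01:00"]),
})

# Keep only labels from the target day (here 2025-01-10).
day_start = pd.Timestamp("2025-01-10")
day_end = day_start + pd.Timedelta(days=1)
daily = df[(df.label_time >= day_start) & (df.label_time < day_end)]
print(daily)  # only u2's row falls inside the day
```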

Common Pitfalls

Clock Skew Between Sources

If your orders stream and your sessions stream have different clock sources or processing delays, a feature queried "as of time T" may reflect different actual event times depending on which source it came from. For example, if Kafka lag causes session events to be processed 2 minutes late, the state of user_engagement_features at time T only includes sessions up to roughly T-2 — the most recent two minutes of activity are missing from the snapshot.

Mitigation: use watermarks and event-time semantics consistently across all sources. RisingWave supports event-time processing with WATERMARK definitions on source columns.
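A toy pandas sketch (hypothetical timestamps) of how processing lag makes a processing-time view at T diverge from the event-time view of the same data:

```python
import pandas as pd

# Hypothetical session events that carry an event_time but arrive ~2 min late.
sessions = pd.DataFrame({
    "event_time":   pd.to_datetime(["2025-01-10 08:58", "2025-01-10 08:59"]),
    "processed_at": pd.to_datetime(["2025-01-10 09:01", "2025-01-10 09:02"]),
})

as_of = pd.Timestamp("2025-01-10 09:00")

# A processing-time snapshot at T misses both events (not yet ingested)...
by_proc = sessions[sessions.processed_at <= as_of]
# ...while event-time semantics include them once watermarks have advanced.
by_event = sessions[sessions.event_time <= as_of]
print(len(by_proc), len(by_event))  # → 0 2
```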

Feature Computation Lag

Materialized views in any streaming system have a small lag between when an event arrives and when the view is updated. For most ML use cases, lag of a few seconds to a few minutes is acceptable. But if your label events arrive faster than your features are updated, the point-in-time join may return stale feature values.

Mitigation: monitor materialized view freshness with SHOW MATERIALIZED VIEW STATUS. For training datasets, inject a small delay (e.g., subtract 1 minute from label_time) to ensure features have settled before the join.

Label Delay

The label itself may be delayed. For churn, you often don't know a user has churned until 30 days of inactivity have passed. If you anchor features to the event that triggered the label assignment (which may be a batch job run on day T+30), the features you fetch may reflect a period that's too far in the future.

Mitigation: define label_time as the time the churn began — the last active timestamp plus your churn threshold — not the time the label was generated. This keeps the feature window aligned with the actual prediction moment.
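In code, the distinction is a one-line difference (hypothetical values; a 30-day churn threshold is assumed):

```python
from datetime import datetime, timedelta

CHURN_THRESHOLD = timedelta(days=30)

# Hypothetical: a batch job detects the churn on 2025-02-15,
# but the user's last activity was 2025-01-10 09:00.
last_active_at = datetime(2025, 1, 10, 9, 0)
job_run_time = datetime(2025, 2, 15, 12, 0)

# Anchor to when the churn began, not when the batch job noticed it.
label_time = last_active_at + CHURN_THRESHOLD  # 2025-02-09 09:00, not job_run_time
```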

State Retention Window

FOR SYSTEM_TIME AS OF in RisingWave is bounded by the system's state retention window. If you're joining to a label that's 90 days old but the materialized view only retains 30 days of versioned state, the lookup will fail or return empty.

Mitigation: configure state_retention_seconds appropriately for your use case, or snapshot feature state to a historical table if you need very long lookback windows.

FAQ

Do I still need a feature store if I use RisingWave?

It depends on what you need from a feature store. If your primary requirements are online serving (low-latency feature retrieval at inference time) and training dataset generation, RisingWave covers both: materialized views serve features online, and temporal joins produce training datasets. If you need a feature registry, lineage tracking, or cross-team feature sharing, a dedicated feature store layer may still add value.

What's the difference between FOR SYSTEM_TIME AS OF and a time-travel query?

Time-travel queries in systems like Snowflake or Delta Lake let you query the state of a table at a past point in time — useful for auditing or recovery. FOR SYSTEM_TIME AS OF is the same mechanism but used inside a join, so each row in the join gets its own time anchor. The result is row-level temporal correctness rather than a single snapshot.

Can I use this for regression or multi-class problems?

Yes. The join mechanism is label-agnostic. churn_labels could store any numeric target or categorical label. The only requirement is that the label table has a timestamp column representing when the label event occurred.

How does this compare to ASOF joins?

ASOF joins (available in DuckDB and some other systems) find the nearest preceding row by timestamp. They're useful when you have a history table with explicit timestamps for each feature version. FOR SYSTEM_TIME AS OF in RisingWave is different: it queries the internal versioned state of a materialized view, so you don't need to maintain a separate history table at all.

What happens if a user has no feature row at label_time?

With JOIN, the label row is dropped. With LEFT JOIN, it's kept and feature columns are NULL. For users who had no activity in the feature window, NULL features are the correct representation. Handle them in your preprocessing pipeline (imputation, separate indicator features, etc.).
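One common pattern for that preprocessing step — a pandas sketch (toy data) that keeps the "no activity" signal as an indicator column before imputing zeros:

```python
import pandas as pd

# Hypothetical rows from the LEFT JOIN: NaN where a user had no sessions.
df = pd.DataFrame({"session_count_7d": [3.0, None, 1.0]})

# Indicator feature preserves the "no activity in window" signal...
df["had_sessions"] = df["session_count_7d"].notna().astype(int)
# ...then impute zero, which is the literal count for an inactive user.
df["session_count_7d"] = df["session_count_7d"].fillna(0)
```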

Can I backfill labels and still get correct point-in-time features?

Yes, as long as the versioned state covers the label timestamp. If you're backfilling labels from 6 months ago, you need 6 months of retained state in the materialized view. For very long historical backfills, it's often more practical to replay the raw event stream and compute features from scratch rather than relying on retained state.


Data leakage is a silent killer for ML models. It's invisible in evaluation metrics and only becomes apparent when models fail to generalize to production. Point-in-time joins are the correct technical solution, and they don't require a dedicated feature store infrastructure when you're running a streaming SQL database like RisingWave.

The key discipline is simple: never join a feature to a label unless you can guarantee that the feature value was knowable at the time the label was recorded. FOR SYSTEM_TIME AS OF enforces that guarantee at the database level, so you can't accidentally get it wrong.
