Process-Time Temporal Joins in RisingWave: Zero State on the Stream Side

Every e-commerce platform has some version of this pipeline: orders land in a Kafka topic, you enrich each one with product details from a catalog table, and the result feeds a dashboard, a feature store, or a downstream sink. The join looks trivial on paper. In a streaming system, it creates a state management problem that gets worse with every order you process.

A regular streaming join holds state for both sides. Every order you have ever received stays in the join state (in memory or spilled to disk) so that, if a matching product row is later inserted or updated, the join can re-emit results for all of those past orders. For an append-only orders stream joined against a product catalog that changes once a week, you are paying for state that does almost no work.

Process-time temporal joins solve this directly. The syntax is FOR SYSTEM_TIME AS OF PROCTIME(), and it tells RisingWave to look up the current state of the dimension table at the moment each event arrives rather than maintaining a copy of the stream for future matching. The stream side accumulates zero join state. Only the dimension table itself is retained.

This post covers how temporal joins work in RisingWave, when to use them, and the two gotchas you need to know before deploying them in production.

What Is a Process-Time Temporal Join?

A temporal join looks up the right-side table at processing time. Each incoming row on the left (the stream) triggers a point-in-time read of the right-side table using whatever value is current at that moment.

The key distinction from a regular hash join is what each side stores:

  • In a regular hash join, RisingWave builds and maintains hash tables for both sides. New events on either side can match against accumulated state from the other.
  • In a temporal join, the stream side maintains no state at all. The dimension table is the state. When a new order arrives, RisingWave looks up the product catalog directly and moves on.
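A toy Python model makes the difference concrete. It is a conceptual sketch, not RisingWave's executor: the class names HashJoinState and TemporalJoinState are hypothetical, the catalog is a plain dict keyed by product_id, and "state" simply means rows held in memory.

```python
from dataclasses import dataclass, field


@dataclass
class HashJoinState:
    """Toy regular hash join: both inputs are kept so either side can match later."""
    left_rows: list = field(default_factory=list)   # every order ever seen
    right_rows: dict = field(default_factory=dict)  # product_id -> product row

    def on_order(self, order):
        self.left_rows.append(order)  # retained in case the product side changes
        product = self.right_rows.get(order["product_id"])
        return {**order, **(product or {})}

    def state_rows(self):
        return len(self.left_rows) + len(self.right_rows)


@dataclass
class TemporalJoinState:
    """Toy process-time temporal join: only the dimension table is kept."""
    right_rows: dict = field(default_factory=dict)  # product_id -> product row

    def on_order(self, order):
        # Point lookup at processing time; the order itself is not stored.
        product = self.right_rows.get(order["product_id"])
        return {**order, **(product or {})}

    def state_rows(self):
        return len(self.right_rows)


catalog = {pid: {"category": "Apparel"} for pid in range(100)}
hash_join = HashJoinState(right_rows=dict(catalog))
temporal_join = TemporalJoinState(right_rows=dict(catalog))

for order_id in range(10_000):
    order = {"order_id": order_id, "product_id": order_id % 100}
    hash_join.on_order(order)
    temporal_join.on_order(order)

print(hash_join.state_rows())      # 10100: grows with every order
print(temporal_join.state_rows())  # 100: fixed at the catalog size
```

Doubling the number of orders doubles the hash join's count and leaves the temporal join's unchanged.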

This is not a theoretical difference. In practice, it means your stream-side state does not grow with event volume. A pipeline processing one million orders per day has the same join-state footprint as one processing ten thousand.

Here is a comparison of the two approaches:

Join type | Stream-side state | Dimension-side state | When to use
--- | --- | --- | ---
Regular hash join | Full stream history | Full dimension table | Both sides change frequently
Temporal join (PROCTIME) | Zero | The dimension table is the state | Append-only stream + slowly changing dimension

The other consequence is update visibility. When a product is recategorized in the catalog, the next order that arrives picks up the new category immediately. Orders processed earlier keep the category that was current when they arrived, because there is no buffered stream-side state to re-join against the updated catalog.

Architecture of a Temporal Join in RisingWave

The data flow looks like this:

flowchart LR
    subgraph Sources
        K[Kafka: orders_append_only]
        DB[Database: product_catalog_source]
    end

    subgraph RisingWave
        S[Stream operator\nno join state]
        D[Dimension state\nproduct catalog]
        MV[Materialized view:\norders_with_product]
    end

    K -->|append-only events| S
    DB -->|CDC / upsert| D
    S -->|lookup at PROCTIME| D
    S --> MV
    D -.->|read current value| S

Each order event triggers a lookup into the dimension state. If the product exists, the row is enriched and written to the materialized view. If it does not exist and you are using a LEFT JOIN, a NULL-filled row is emitted. No order event is ever stored waiting for a future product record.
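One consequence is easy to miss: an order that arrives before its product keeps its NULL columns, because nothing re-joins it when the product shows up later. The toy Python model below uses a single interleaved event log of hypothetical ("product", row) and ("order", row) records; it mimics the semantics described above, not RisingWave's internals.

```python
def temporal_left_join(events):
    """Toy process-time LEFT JOIN over an interleaved event log.

    ("product", row) upserts the dimension; ("order", row) is enriched with
    whatever product row is current at that moment and is then forgotten.
    """
    dimension = {}  # product_id -> product row (the only state kept)
    output = []
    for kind, row in events:
        if kind == "product":
            dimension[row["product_id"]] = row
        else:  # an order: look up, emit, do not retain
            product = dimension.get(row["product_id"])
            output.append({
                "order_id": row["order_id"],
                "category": product["category"] if product else None,
            })
    return output


events = [
    ("order", {"order_id": 1, "product_id": 42}),            # product not loaded yet
    ("product", {"product_id": 42, "category": "Apparel"}),  # catalog catches up
    ("order", {"order_id": 2, "product_id": 42}),
]
for row in temporal_left_join(events):
    print(row)
# {'order_id': 1, 'category': None}
# {'order_id': 2, 'category': 'Apparel'}
```

A regular LEFT JOIN would instead retract order 1's NULL row and re-emit it with the category once product 42 arrived, which is exactly the work (and the state) the temporal join avoids.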

Setting Up the Example: Orders and Product Catalog

The running example is an e-commerce platform. Orders arrive as an append-only stream from Kafka. The product catalog is a mutable table that changes occasionally as products are recategorized, rebranded, or transferred to different suppliers.

-- Append-only order events from Kafka
CREATE SOURCE orders_append_only (
    order_id BIGINT,
    user_id BIGINT,
    product_id BIGINT,
    amount DECIMAL,
    created_at TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'orders',
    properties.bootstrap.server = 'localhost:9092'
) FORMAT PLAIN ENCODE JSON;

-- Product catalog: a mutable table updated via CDC or direct upserts
CREATE TABLE product_catalog_source (
    product_id BIGINT PRIMARY KEY,
    product_name VARCHAR,
    category VARCHAR,
    brand VARCHAR,
    supplier_id BIGINT
);

The product catalog needs a primary key. Temporal joins require it because the lookup is a key-based point read, not a full scan.

The Temporal Join Query

With both sides defined, the enrichment materialized view applies FOR SYSTEM_TIME AS OF PROCTIME() to the right-side table:

CREATE MATERIALIZED VIEW orders_with_product AS
SELECT
  o.order_id,
  o.user_id,
  o.amount,
  o.created_at,
  p.category,
  p.brand,
  p.supplier_id
FROM orders_append_only AS o
LEFT JOIN product_catalog_source
  FOR SYSTEM_TIME AS OF PROCTIME() AS p
  ON o.product_id = p.product_id;

A few things to note about the syntax:

  • FOR SYSTEM_TIME AS OF PROCTIME() goes between the table name and the alias. The alias p comes after it.
  • LEFT JOIN is appropriate here because orders may arrive before the product catalog is fully populated, and you want to emit the order row rather than drop it.
  • The join predicate uses product_id, which is the primary key of the dimension table.

For a full reference on the temporal join syntax and its requirements, see the RisingWave temporal join documentation.

Gotcha 1: Indexes When the Right Side Is a Materialized View

If your dimension table is a base table (as in the example above), RisingWave can look up rows by primary key directly, and no additional configuration is needed.

If the right side of the temporal join is a materialized view rather than a base table, RisingWave may need an explicit index whose key covers the lookup predicate. Without it, the query will either fail at plan time or fall back to a less efficient execution path.

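For example, if the dimension were a materialized view named product_catalog_mv (a hypothetical name) and the join were on product_id, the index would be:

CREATE INDEX idx_product_catalog_mv_product_id ON product_catalog_mv (product_id);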

The general rule: the index key should include the column(s) you use in the ON clause of the temporal join. If you join on product_id alone, an index on product_id is sufficient. If you join on a composite key, include all columns.
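Why the key order matters: an index is sorted by its key columns, so a point lookup can seek directly only when it fixes a leading prefix of that key. The Python sketch below is a generic model of a sorted index built with the standard bisect module (SortedIndex is a hypothetical helper, not RisingWave's storage engine). An index on product_id answers a product_id lookup with a binary search; an index led by category can only find the same row by scanning every entry.

```python
import bisect


class SortedIndex:
    """Toy secondary index: rows kept sorted by a tuple of key columns."""

    def __init__(self, rows, key_cols):
        self.rows = sorted(rows, key=lambda r: tuple(r[c] for c in key_cols))
        self.keys = [tuple(r[c] for c in key_cols) for r in self.rows]

    def seek(self, prefix):
        """Binary-search the rows whose key starts with `prefix`."""
        i = bisect.bisect_left(self.keys, prefix)
        matches = []
        while i < len(self.keys) and self.keys[i][: len(prefix)] == prefix:
            matches.append(self.rows[i])
            i += 1
        return matches


catalog = [
    {"product_id": 7, "category": "Apparel"},
    {"product_id": 3, "category": "Accessories"},
    {"product_id": 5, "category": "Apparel"},
]
by_product = SortedIndex(catalog, ["product_id"])
by_category_then_product = SortedIndex(catalog, ["category", "product_id"])

# Index keyed on the join column: a direct seek finds the row.
print(by_product.seek((5,)))  # [{'product_id': 5, 'category': 'Apparel'}]

# Index led by category: seeking works only when category is known...
print([r["product_id"] for r in by_category_then_product.seek(("Apparel",))])  # [5, 7]

# ...so a lookup by product_id alone degrades to a full scan.
print([r for r in by_category_then_product.rows if r["product_id"] == 5])
```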

Consult the RisingWave indexing guide for details on how indexes interact with materialized view lookups.

Gotcha 2: One Temporal Scan Per Query Block

RisingWave enforces a limit of one temporal scan per materialized view query. If you try to join a single stream against two dimension tables using FOR SYSTEM_TIME AS OF PROCTIME() in the same MV definition, the query will be rejected at planning time.

This is a deliberate constraint tied to how the executor handles process-time semantics. A single stream event triggers a single point-in-time lookup. Two lookups against different tables at the same PROCTIME would require two separate executor passes.

There are two clean ways to work around this:

Option 1: Chain materialized views. Each MV does one temporal join, and the output of the first becomes the input of the second.

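-- First enrichment: join orders to product catalog
-- (same name as the MV defined earlier; drop that one first, or simply reuse
-- it, since it already exposes every column the second stage reads)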
CREATE MATERIALIZED VIEW orders_with_product AS
SELECT
  o.order_id,
  o.user_id,
  o.product_id,
  o.amount,
  o.created_at,
  p.category,
  p.brand
FROM orders_append_only AS o
LEFT JOIN product_catalog_source
  FOR SYSTEM_TIME AS OF PROCTIME() AS p
  ON o.product_id = p.product_id;

-- Second enrichment: join the first MV to the user table
-- (user_profiles_source is assumed to be a table with PRIMARY KEY user_id)
CREATE MATERIALIZED VIEW orders_enriched AS
SELECT
  op.order_id,
  op.user_id,
  op.amount,
  op.created_at,
  op.category,
  op.brand,
  u.region,
  u.tier
FROM orders_with_product AS op
LEFT JOIN user_profiles_source
  FOR SYSTEM_TIME AS OF PROCTIME() AS u
  ON op.user_id = u.user_id;

Option 2: Pre-join dimensions into a unified lookup view, then do one temporal join against it. This avoids the chain and produces a single enriched row in one step.

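-- Unify two dimension tables into one lookup view
-- (pricing_source is assumed to have PRIMARY KEY product_id; the inner JOIN
-- drops products that do not have a pricing row yet)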
CREATE MATERIALIZED VIEW product_pricing_lookup AS
SELECT
  p.product_id,
  p.category,
  p.brand,
  pr.price_usd,
  pr.currency
FROM product_catalog_source p
JOIN pricing_source pr ON p.product_id = pr.product_id;

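-- As described in Gotcha 1, the lookup MV may need an index on the join key
CREATE INDEX idx_product_pricing_lookup_product_id
  ON product_pricing_lookup (product_id);

-- One temporal join in the enrichment MV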
CREATE MATERIALIZED VIEW orders_enriched AS
SELECT
  o.order_id,
  o.user_id,
  o.amount,
  o.created_at,
  pl.category,
  pl.price_usd
FROM orders_append_only AS o
LEFT JOIN product_pricing_lookup
  FOR SYSTEM_TIME AS OF PROCTIME() AS pl
  ON o.product_id = pl.product_id;

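Option 2 is generally better for end-to-end latency because you avoid an extra streaming layer, but it only works when the dimensions share a join key (both tables here are keyed on product_id). Dimensions keyed on different columns, such as the product catalog and user profiles in Option 1, have to be chained. Option 1 is also preferable when the two dimensions are large and you want to avoid materializing a wide join that only a subset of queries needs.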
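Both options can be compared with a toy Python model. The sample rows and price_usd values are invented for illustration, tables are plain dicts keyed by product_id, and none of this is RisingWave code. When both lookups hit, the two options agree. The model also surfaces a detail of the Option 2 SQL: because product_pricing_lookup is built with an inner JOIN, a product with no pricing row vanishes from the lookup view, so its orders lose the category as well.

```python
# Toy dimension tables keyed by product_id (sample data, not from the article).
products = {1: {"category": "Apparel"}, 2: {"category": "Toys"}}
prices = {1: {"price_usd": 20}}  # product 2 has no pricing row yet


def enrich_with_product(order):
    """Stage 1 of Option 1: one lookup against the product catalog."""
    product = products.get(order["product_id"], {})
    return {**order, "category": product.get("category")}


def enrich_with_price(row):
    """Stage 2 of Option 1: one lookup against pricing, fed by stage 1."""
    price = prices.get(row["product_id"], {})
    return {**row, "price_usd": price.get("price_usd")}


def option1(order):
    return enrich_with_price(enrich_with_product(order))


# Option 2: materialize an inner join of the two dimensions, then look up once.
lookup = {
    pid: {**products[pid], **prices[pid]}
    for pid in products.keys() & prices.keys()
}


def option2(order):
    row = lookup.get(order["product_id"], {})
    return {**order, "category": row.get("category"), "price_usd": row.get("price_usd")}


print(option1({"order_id": 10, "product_id": 1}) == option2({"order_id": 10, "product_id": 1}))  # True
print(option1({"order_id": 11, "product_id": 2}))
# {'order_id': 11, 'product_id': 2, 'category': 'Toys', 'price_usd': None}
print(option2({"order_id": 11, "product_id": 2}))
# {'order_id': 11, 'product_id': 2, 'category': None, 'price_usd': None}
```

If the category should survive a missing price in Option 2, a LEFT JOIN from product_catalog_source to pricing_source inside the lookup view would be the natural change.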

The Trade-Off: What Happens When a Dimension Changes

Process-time temporal joins capture dimension state at the moment the event is processed. If a product moves from "Apparel" to "Accessories" on Tuesday, orders processed before that change show "Apparel" and orders after show "Accessories." Both reflect the catalog value at processing time.
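The toy Python model below contrasts the two semantics on the Apparel-to-Accessories example. It is a simplification under stated assumptions: a regular join is modeled as re-enriching every stored order when the catalog changes (the net effect of its retractions and re-inserts), and the temporal join as a lookup-then-forget loop. Neither function is RisingWave code.

```python
def run(events, mode):
    """Replay events and return the enriched rows a downstream view would hold.

    mode="temporal": each order is enriched once, at processing time.
    mode="regular":  orders are kept, so a catalog change re-enriches them
                     (the net effect of the join's retractions and re-inserts).
    """
    catalog, orders, view = {}, [], {}
    for kind, row in events:
        if kind == "product":
            catalog[row["product_id"]] = row["category"]
            if mode == "regular":
                for o in orders:  # re-join every stored order for this product
                    if o["product_id"] == row["product_id"]:
                        view[o["order_id"]] = row["category"]
        else:
            view[row["order_id"]] = catalog.get(row["product_id"])
            if mode == "regular":
                orders.append(row)  # the stream side is retained
    return view


events = [
    ("product", {"product_id": 7, "category": "Apparel"}),
    ("order", {"order_id": 1, "product_id": 7}),                # Monday
    ("product", {"product_id": 7, "category": "Accessories"}),  # Tuesday change
    ("order", {"order_id": 2, "product_id": 7}),                # Wednesday
]
print(run(events, "temporal"))  # {1: 'Apparel', 2: 'Accessories'}
print(run(events, "regular"))   # {1: 'Accessories', 2: 'Accessories'}
```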

This is the right behavior for most operational use cases: dimension changes are infrequent, and downstream pipelines (dashboards, feature stores, notifications) care about "what category applied when the order arrived" rather than today's value. For retrospective accuracy, meaning re-enriching historical orders with a current catalog snapshot, a batch pipeline remains the right tool. The two approaches are complementary.

See the RisingWave sources and formats documentation for guidance on ingesting CDC updates to keep dimension tables in sync with your source database.

State Cost Comparison in Practice

To make the state savings concrete: if orders arrive at 500 per second and the product catalog has 200,000 products, a regular hash join accumulates about 43.2 million stream-side rows after 24 hours (500 × 86,400 seconds), plus the 200,000 dimension rows. A temporal join holds 200,000 rows total, regardless of how many orders have been processed. That difference in state size directly affects memory footprint, state backend I/O, and recovery time after a restart.
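The arithmetic is easy to check, and extending it past one day shows how quickly the gap widens. The rate and catalog size are the ones from the paragraph above; the function names are illustrative.

```python
ORDERS_PER_SECOND = 500
SECONDS_PER_DAY = 24 * 60 * 60  # 86,400
CATALOG_ROWS = 200_000


def hash_join_rows(days):
    """Stream-side rows retained by a regular join, plus the dimension table."""
    return ORDERS_PER_SECOND * SECONDS_PER_DAY * days + CATALOG_ROWS


def temporal_join_rows(_days):
    """A temporal join keeps only the dimension table, however long it runs."""
    return CATALOG_ROWS


for days in (1, 7, 30):
    print(days, f"{hash_join_rows(days):,}", f"{temporal_join_rows(days):,}")
# 1 43,400,000 200,000
# 7 302,600,000 200,000
# 30 1,296,200,000 200,000
```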

For high-throughput pipelines, temporal joins are not just cheaper. They are often the only practical option if you want to avoid provisioning state storage that scales with unbounded event history.

FAQ

What is the difference between a RisingWave temporal join and a regular streaming join?

A regular streaming join maintains hash tables for both sides, allowing any row to match against accumulated state from the other side. A temporal join maintains no state on the stream side. Each event triggers a point-in-time lookup and is not retained after processing. The requirement is that the left side is append-only and you are looking up current dimension values rather than joining across historical state.

Does the left side of a temporal join have to be append-only?

Yes. Because no stream-side state is kept, there is no mechanism to handle retractions or updates to already-processed rows. If your stream source produces updates or deletes, use a regular streaming join instead.
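Why retractions break without stream-side state: to delete a previously emitted row, the join must reproduce exactly what it emitted. A stateless join could only re-run the lookup, and if the dimension changed in the meantime, the "retraction" no longer matches the original output. The toy model below (plain Python with hypothetical names, not RisingWave internals) shows the mismatch.

```python
catalog = {7: "Apparel"}
emitted = []  # the downstream view: (order_id, category) rows


def enrich(order_id, product_id):
    """Stateless lookup: enrich with whatever the catalog holds right now."""
    return (order_id, catalog.get(product_id))


# Insert: order 1 is enriched with the category current at processing time.
emitted.append(enrich(1, 7))  # (1, 'Apparel')

# The catalog changes before the stream retracts order 1.
catalog[7] = "Accessories"

# Delete: with no stored copy of the original output, the best a stateless
# join can do is look up again, which yields a row that was never emitted.
retraction = enrich(1, 7)      # (1, 'Accessories')
print(retraction in emitted)   # False: the delete cannot be applied cleanly
```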

What happens if a product does not exist in the catalog when an order arrives?

With a LEFT JOIN, the order row is emitted with NULL values for all dimension columns. With an INNER JOIN, the order row is dropped entirely. For most operational pipelines, LEFT JOIN is the safer default because it preserves the event and lets you handle missing dimension data downstream.
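The two behaviors side by side, as a toy Python function (the name temporal_join and its how parameter are hypothetical, chosen to mirror the SQL keywords; None stands in for SQL NULL):

```python
def temporal_join(orders, catalog, how="left"):
    """Toy lookup join: how="left" pads misses with None, how="inner" drops them."""
    if how not in ("left", "inner"):
        raise ValueError(f"unsupported join type: {how}")
    out = []
    for order in orders:
        product = catalog.get(order["product_id"])
        if product is None and how == "inner":
            continue  # INNER JOIN: no match, no output row
        out.append({**order, "category": product["category"] if product else None})
    return out


catalog = {1: {"category": "Apparel"}}
orders = [{"order_id": 10, "product_id": 1}, {"order_id": 11, "product_id": 99}]

print(temporal_join(orders, catalog, "left"))
# [{'order_id': 10, 'product_id': 1, 'category': 'Apparel'},
#  {'order_id': 11, 'product_id': 99, 'category': None}]
print(temporal_join(orders, catalog, "inner"))
# [{'order_id': 10, 'product_id': 1, 'category': 'Apparel'}]
```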

Can I use a temporal join against a table fed by CDC?

Yes. If your product catalog table is updated via Change Data Capture from PostgreSQL or MySQL, RisingWave applies the CDC updates to the table, and temporal join lookups will see the latest state. This is the standard pattern: ingest dimension table changes via CDC, then join an event stream against it using FOR SYSTEM_TIME AS OF PROCTIME().
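A minimal sketch of what "lookups see the latest state" means: change events are applied to the dimension in order, keyed by primary key, and each lookup reads whatever is there at that moment. The op names ("insert", "update", "delete") and the apply_change helper are hypothetical labels for this toy model, not a RisingWave or Debezium message format.

```python
def apply_change(dimension, op, row):
    """Apply one CDC change to the dimension state, keyed by primary key."""
    key = row["product_id"]
    if op in ("insert", "update"):
        dimension[key] = row  # upsert: the newest version replaces the old one
    elif op == "delete":
        dimension.pop(key, None)
    else:
        raise ValueError(f"unknown op: {op}")


dimension = {}
changes = [
    ("insert", {"product_id": 7, "category": "Apparel"}),
    ("update", {"product_id": 7, "category": "Accessories"}),
    ("insert", {"product_id": 8, "category": "Toys"}),
    ("delete", {"product_id": 8}),
]
for op, row in changes:
    apply_change(dimension, op, row)

# A temporal join lookup at this point sees only the latest version of each key.
print(dimension.get(7))  # {'product_id': 7, 'category': 'Accessories'}
print(dimension.get(8))  # None: deleted rows simply stop matching
```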

Is there a performance difference between joining against a base table vs. a materialized view?

For base tables with a primary key, lookups are direct point reads on that key. For materialized views, RisingWave relies on an explicit index to achieve equivalent performance, and the index key must match the lookup predicate. Always create the index before deploying a temporal join against a materialized view.

Conclusion

Process-time temporal joins are the right tool when you are joining an append-only event stream against a slowly-changing dimension table and you want zero stream-side join state. The FOR SYSTEM_TIME AS OF PROCTIME() syntax is straightforward, but there are two operational constraints to plan for: the right-side table needs an index when it is a materialized view, and each MV query can contain only one temporal scan.

The state savings are substantial. In a high-throughput pipeline, switching from a regular hash join to a temporal join can reduce your join-state footprint from hundreds of millions of rows (about 300 million after one week at 500 orders per second) down to the size of the dimension table alone.

For most e-commerce enrichment patterns (orders plus product catalog, orders plus user profiles, clickstream events plus content metadata), process-time temporal joins are the default choice. Reserve regular hash joins for cases where both sides are mutable and you need full historical join semantics.


Ready to try temporal joins yourself? Get started with RisingWave in five minutes using the quickstart guide.

Join the RisingWave Slack community to ask questions, share what you are building, and get help from the engineering team and other practitioners.
