Stream-Stream Join
A Stream-Stream Join is a type of join operation in stream processing that combines events from two or more independent, continuously flowing data streams based on common attributes (join keys) and typically within a specified time window. This is one of the most powerful and complex operations in stream processing, as it requires correlating data from multiple dynamic sources that are constantly evolving.
Core Idea
Unlike traditional batch joins on static datasets, stream-stream joins operate on unbounded inputs. To make this feasible and meaningful:
- Join Keys: Events from the different streams are matched based on the equality of one or more common fields (e.g., user_id, order_id, product_id).
- Windowing: Because streams are unbounded, joins are almost always constrained by a time window. An event from one stream is joined with an event from another stream only if their event timestamps fall within a defined time proximity of each other (e.g., join an ad impression with a click on the same campaign if the click happens within 5 minutes of the impression). Without windowing, the state required to hold unmatched events could grow indefinitely.
- State Management: The stream processing system must maintain state for events from one stream that may still match events from the other stream(s) within the join window. This includes events that have already matched, since one event can match several others. Once an event's window has closed (usually determined by watermarks), it can no longer produce a match and is removed from the state.
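The three ingredients above (join keys, a time bound, and buffered state) can be sketched as a symmetric hash join in Python. This is a minimal, single-process illustration under simplifying assumptions, not how RisingWave or any particular engine implements it: timestamps are integer seconds, the window is symmetric (|left.ts - right.ts| <= window), and the caller supplies the watermark used for eviction. The names `Event`, `IntervalHashJoin`, and `evict` are hypothetical.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass(frozen=True)
class Event:
    key: str      # join key, e.g. a user_id
    ts: int       # event time, in seconds
    payload: str


class IntervalHashJoin:
    """Hypothetical symmetric hash join of two streams.

    Emits (left, right) pairs whose keys are equal and whose event times are
    at most `window` seconds apart. Each side buffers its events per key.
    """

    def __init__(self, window: int):
        self.window = window
        self.state = {"left": defaultdict(list), "right": defaultdict(list)}

    def process(self, side: str, event: Event) -> list[tuple[Event, Event]]:
        other = "right" if side == "left" else "left"
        # Buffer the event so later arrivals on the other side can match it.
        self.state[side][event.key].append(event)
        # Probe the other side's buffer for the same key, inside the window.
        results = []
        for candidate in self.state[other][event.key]:
            if abs(candidate.ts - event.ts) <= self.window:
                if side == "left":
                    results.append((event, candidate))
                else:
                    results.append((candidate, event))
        return results

    def evict(self, watermark: int) -> None:
        # Assuming no future event has ts < watermark, an event older than
        # watermark - window can never match again, so it is dropped.
        horizon = watermark - self.window
        for buffers in self.state.values():
            for key in list(buffers):
                kept = [e for e in buffers[key] if e.ts >= horizon]
                if kept:
                    buffers[key] = kept
                else:
                    del buffers[key]

    def state_size(self) -> int:
        return sum(len(events) for b in self.state.values() for events in b.values())


if __name__ == "__main__":
    join = IntervalHashJoin(window=300)
    join.process("left", Event("u1", 100, "impression"))
    matches = join.process("right", Event("u1", 250, "click"))
    print(len(matches), join.state_size())  # 1 2
    join.evict(watermark=700)
    print(join.state_size())  # 0
```

Note that the matched impression stays buffered after producing a result; only the watermark-driven `evict` step removes it.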
Types of Windows for Stream-Stream Joins
Common window types used for stream-stream joins include:
- Tumbling Windows: Joins events that fall into the same fixed-size, non-overlapping time window.
- Hopping Windows: Joins events within fixed-size, overlapping windows. An event can belong to multiple windows and thus participate in multiple join results.
- Session Windows: Joins events from different streams that fall within the same session of activity for a given key.
- Interval Joins: A more general form where an event from one stream is joined with an event from another if their timestamps are within a specified interval of each other (e.g., streamA.timestamp BETWEEN streamB.timestamp - INTERVAL '5 minutes' AND streamB.timestamp + INTERVAL '5 minutes').
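To see how the choice of window affects which pairs join, the Python sketch below assigns timestamps to tumbling and hopping windows and joins events that share a key and a window start. It is an illustrative model under stated assumptions (integer-second timestamps, windows aligned to time 0, hypothetical function names), not any engine's API; session windows and interval joins are not modeled here.

```python
def tumbling_window(ts: int, size: int) -> int:
    """Start of the one tumbling window [start, start + size) containing ts."""
    return ts - ts % size


def hopping_windows(ts: int, size: int, hop: int) -> list[int]:
    """Starts of all hopping windows [start, start + size) containing ts.
    Windows start at multiples of `hop`, aligned to time 0."""
    start = ts - ts % hop          # latest window start not after ts
    starts = []
    while start > ts - size:       # this window still covers ts
        starts.append(start)
        start -= hop
    return sorted(starts)


def window_join(left, right, assign):
    """Inner-join (key, ts) events that share a key and a window start.
    `assign(ts)` returns the window starts that an event belongs to."""
    index = {}
    for key, ts in right:
        for w in assign(ts):
            index.setdefault((key, w), []).append(ts)
    results = []
    for key, ts in left:
        for w in assign(ts):
            for right_ts in index.get((key, w), []):
                results.append((key, w, ts, right_ts))
    return results


if __name__ == "__main__":
    def tumble(ts):
        return [tumbling_window(ts, 300)]

    def hop(ts):
        return hopping_windows(ts, 600, 300)

    # 20 seconds apart, but on opposite sides of the tumbling boundary at 1200.
    print(window_join([("c1", 1190)], [("c1", 1210)], tumble))  # []
    print(window_join([("c1", 1190)], [("c1", 1210)], hop))     # [('c1', 900, 1190, 1210)]
    # Both events fall into two hopping windows, so the pair is emitted twice.
    print(len(window_join([("c1", 1250)], [("c1", 1260)], hop)))  # 2
```

The first two calls show why tumbling windows can miss nearby events that straddle a boundary, and the last call shows the duplicate results that overlapping hopping windows produce.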
Challenges
- State Size: The amount of state to be maintained can be large, especially with wide windows or high-volume streams.
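- Out-of-Order Events: Events can arrive after events with later event times, so the system cannot know from arrival order alone when a window is complete. This complicates window semantics and join correctness. Watermarks are crucial here: a watermark declares that no more events with timestamps earlier than a given point are expected, which lets the system close windows, clean up state, and typically discard events that arrive behind it.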
- Latency vs. Completeness: There's often a trade-off. Tolerating more lateness (for example, with a larger watermark delay) increases the chance that late-arriving matches are still joined (completeness), but it also delays the point at which windows close and results become final (latency).
- Join Semantics (Inner, Outer): Like batch SQL joins, stream-stream joins can be INNER JOIN (only outputting results when matches are found in all streams) or OUTER JOIN (e.g., LEFT OUTER JOIN, RIGHT OUTER JOIN, FULL OUTER JOIN), which can output results even if a match is not found in one of the streams (typically with NULL values for the unmatched side).
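Several of these challenges interact in a left outer interval join. The following Python sketch is a hypothetical, deliberately simplified model: the watermark is the largest event time seen minus a fixed `delay`, events behind the watermark are dropped as late, state the watermark has made unreachable is cleaned up, and an unmatched left event is emitted with `None` (standing in for NULL) only once its window has closed. Delaying that emission is one design choice; some engines instead emit the NULL-padded row immediately and retract it if a match arrives later.

```python
from collections import defaultdict


class LeftOuterIntervalJoin:
    """Hypothetical left outer interval join driven by a simple watermark.

    A left event at time l matches right events with the same key and a time
    r in [l, l + bound]. The watermark is the largest event time seen minus
    `delay`; events behind it are counted as late and dropped.
    """

    def __init__(self, bound: int, delay: int):
        self.bound = bound
        self.delay = delay
        self.max_ts = float("-inf")
        self.watermark = float("-inf")
        self.left = defaultdict(list)   # key -> list of [ts, matched]
        self.right = defaultdict(list)  # key -> list of ts
        self.dropped = 0

    def on_left(self, key, ts):
        if ts < self.watermark:
            self.dropped += 1  # late event: its window may already be closed
            return []
        entry = [ts, False]
        self.left[key].append(entry)
        out = []
        for r in self.right[key]:
            if ts <= r <= ts + self.bound:
                entry[1] = True
                out.append((key, ts, r))
        return out + self._advance(ts)

    def on_right(self, key, ts):
        if ts < self.watermark:
            self.dropped += 1
            return []
        self.right[key].append(ts)
        out = []
        for entry in self.left[key]:
            if entry[0] <= ts <= entry[0] + self.bound:
                entry[1] = True
                out.append((key, entry[0], ts))
        return out + self._advance(ts)

    def _advance(self, ts):
        """Move the watermark forward and expire state it has made unreachable."""
        self.max_ts = max(self.max_ts, ts)
        self.watermark = self.max_ts - self.delay
        out = []
        for key in list(self.left):
            alive = []
            for entry in self.left[key]:
                if entry[0] + self.bound < self.watermark:
                    # No future right event (ts >= watermark) can match it now.
                    if not entry[1]:
                        out.append((key, entry[0], None))  # NULL-padded row
                else:
                    alive.append(entry)
            if alive:
                self.left[key] = alive
            else:
                del self.left[key]
        for key in list(self.right):
            # A future left event has ts >= watermark, so older rights cannot match.
            alive = [r for r in self.right[key] if r >= self.watermark]
            if alive:
                self.right[key] = alive
            else:
                del self.right[key]
        return out


if __name__ == "__main__":
    join = LeftOuterIntervalJoin(bound=600, delay=60)
    join.on_left("u1", 1000)
    join.on_left("u2", 1010)
    print(join.on_right("u1", 1200))  # [('u1', 1000, 1200)]
    print(join.on_right("u3", 900))   # [] (late: the watermark is already 1140)
    print(join.on_right("u9", 2000))  # [('u2', 1010, None)]
```

Raising `delay` lets more late events be joined but postpones the NULL-padded rows, which is the latency vs. completeness trade-off in miniature.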
Stream-Stream Joins in RisingWave
RisingWave supports stream-stream joins through its SQL interface, leveraging its robust state management and incremental computation capabilities.
- SQL Syntax: Users define stream-stream joins using familiar SQL JOIN syntax, including INNER JOIN, LEFT OUTER JOIN, RIGHT OUTER JOIN, and FULL OUTER JOIN.
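- Windowing Clause: The time constraint is usually expressed either as an interval condition on the timestamp columns in the join's ON clause (an interval join) or by applying time window functions such as TUMBLE or HOP to both inputs and joining on the window start together with the key (a window join).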
- Event Time Processing: RisingWave emphasizes event time processing, using watermarks to handle out-of-order data correctly for windowed joins.
- Incremental Updates: The results of stream-stream joins, often materialized into views, are updated incrementally as new events arrive and existing events expire from join windows.
Example (Conceptual SQL for an Interval Join):
```sql
CREATE MATERIALIZED VIEW ad_conversions AS
SELECT
    impressions.ad_id,
    impressions.user_id,
    impressions.timestamp AS impression_time,
    clicks.timestamp AS click_time
FROM ad_impressions_stream AS impressions
INNER JOIN ad_clicks_stream AS clicks
    ON impressions.ad_id = clicks.ad_id
    AND impressions.user_id = clicks.user_id
    -- Time bound on the event timestamps: this is what makes it an interval join.
    AND clicks.timestamp BETWEEN impressions.timestamp
                             AND impressions.timestamp + INTERVAL '10 minutes';
```
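This example joins each ad impression with the clicks on the same ad by the same user that occur within 10 minutes after the impression. RisingWave must keep state for both inputs: impressions waiting for clicks, and clicks that a not-yet-arrived (out-of-order) impression could still match. With watermarks defined on the timestamp columns of both sources, an impression can be discarded once the watermark passes impression_time + 10 minutes; without watermarks, this join state may grow without bound.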
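Assuming no input rows are discarded as late, an incrementally maintained view such as ad_conversions should contain the same rows that the query would return if evaluated once over all of its input. The Python sketch below computes that batch result on made-up sample data, which is a convenient way to check the interval predicate by hand. Timestamps are plain seconds (a simplification of SQL timestamps), and the function name and the dict fields `ad_id`, `user_id`, and `ts` are hypothetical stand-ins for the stream columns.

```python
def ad_conversions(impressions, clicks, within_seconds=600):
    """Hypothetical batch reference for the ad_conversions view.

    Returns (ad_id, user_id, impression_time, click_time) for every impression
    and click with the same ad_id and user_id where the click time lies in
    [impression_time, impression_time + within_seconds]. SQL BETWEEN is
    inclusive at both ends, and so is this check.
    """
    rows = [
        (imp["ad_id"], imp["user_id"], imp["ts"], clk["ts"])
        for imp in impressions
        for clk in clicks
        if imp["ad_id"] == clk["ad_id"]
        and imp["user_id"] == clk["user_id"]
        and imp["ts"] <= clk["ts"] <= imp["ts"] + within_seconds
    ]
    return sorted(rows)


if __name__ == "__main__":
    impressions = [
        {"ad_id": 1, "user_id": "a", "ts": 0},
        {"ad_id": 1, "user_id": "b", "ts": 30},
        {"ad_id": 2, "user_id": "a", "ts": 60},
    ]
    clicks = [
        {"ad_id": 1, "user_id": "a", "ts": 120},  # 2 minutes later: matches
        {"ad_id": 1, "user_id": "b", "ts": 700},  # 670 s later: outside 10 minutes
        {"ad_id": 2, "user_id": "a", "ts": 30},   # before the impression: no match
    ]
    print(ad_conversions(impressions, clicks))  # [(1, 'a', 0, 120)]
```

Because the lower bound is the impression time itself, a click recorded before its impression never matches, even when it is close in time.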