Streaming Join
A Streaming Join is an operation that combines records from two or more continuously flowing data streams (or a stream and a table) based on common attributes (join keys) and, typically, time-based conditions (join windows). Unlike traditional batch joins that operate on static, bounded datasets, streaming joins must handle dynamic, unbounded inputs where data may arrive out of order and the set of records to join against is constantly evolving.
Streaming joins are fundamental for correlating information from different sources in real-time, enabling complex event processing, enrichment, and contextual analysis.
Key Characteristics and Challenges
- Unbounded Inputs: Both streams in a join are potentially infinite. The system cannot wait for all data to arrive before producing results.
- State Management: To find matching records, the join operator must maintain state for records received from each input stream that are eligible to be joined. The size of this state can become very large, especially for joins with wide time windows or many unmatched records.
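- Time Semantics (Event Time): For accurate and deterministic results, streaming joins often rely on Event Time (the time the event occurred at the source) and Watermarks. Event time determines which records fall within each other's join window, so records are matched based on when they actually happened, even if they arrive out of order. Watermarks tell the operator when no earlier records are expected, so it can finalize results and discard state that can no longer produce a match.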
- Windowing: Joins are frequently performed within a Time Window (e.g., "join orders with shipments if they occur within 5 minutes of each other"). This limits the amount of state that needs to be maintained.
  - Stream-Stream Joins: Often involve symmetrical windows applied to both streams (e.g., join events from stream A and stream B if their event times are within X minutes of each other).
  - Stream-Table Joins (Temporal Joins): Often involve joining a stream event with the version of a table record that was valid at the event's timestamp.
- Output Semantics: The join operator produces a stream of joined records. Depending on the join type (inner, outer) and the continuous nature of inputs, a single event from one stream might join with multiple events from another over time, or previous join results might need to be retracted if an input event is updated or deleted.
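The state and output behavior described in this list can be illustrated with a symmetric hash join, a common textbook technique for an unwindowed streaming inner equi-join. The sketch below is hypothetical Python (the class, method, and field names are invented for illustration) and is not how RisingWave implements joins. Each arriving row is stored in its own side's state and then probed against the other side, so one order can match several shipments as they arrive. Because nothing is ever evicted, the state grows without bound.

```python
from collections import defaultdict
from typing import Any, Dict, List, Tuple

Row = Dict[str, Any]


class SymmetricHashJoin:
    """Hypothetical unwindowed inner equi-join over two unbounded inputs."""

    def __init__(self, key: str) -> None:
        self.key = key
        # Per-side join state: join key -> all rows seen so far on that side.
        # Nothing is ever evicted, so this grows with the input.
        self.state: Dict[str, Dict[Any, List[Row]]] = {
            "left": defaultdict(list),
            "right": defaultdict(list),
        }

    def on_record(self, side: str, row: Row) -> List[Tuple[Row, Row]]:
        """Store the row on its own side, then probe the other side for matches."""
        other = "right" if side == "left" else "left"
        k = row[self.key]
        self.state[side][k].append(row)
        matches = self.state[other].get(k, [])
        # Emit results as (left_row, right_row) pairs regardless of arrival side.
        if side == "left":
            return [(row, m) for m in matches]
        return [(m, row) for m in matches]

    def state_size(self) -> int:
        """Total number of buffered rows across both sides."""
        return sum(len(rows) for side in self.state.values() for rows in side.values())


# Usage: one order joins with two shipment events that arrive at different times.
join = SymmetricHashJoin(key="order_id")
print(join.on_record("left", {"order_id": 1, "product_id": "p1"}))   # [] (no shipments yet)
print(join.on_record("right", {"order_id": 1, "status": "packed"}))   # one joined pair
print(join.on_record("right", {"order_id": 1, "status": "shipped"}))  # a second pair for the same order
print(join.state_size())  # 3
```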
Common Types of Streaming Joins
- Inner Join: Produces an output record only when a matching record is found in both input streams (satisfying the join condition and window criteria).
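- Outer Join (Left, Right, Full): Produces an output record even if no match is found in one of the streams. In a streaming setting, the operator must either wait until the join window closes (as signaled by the watermark) before emitting a null-padded record, or emit it immediately and retract it if a match arrives later.
  - Left Outer Join: All records from the left stream are included. If a match is found in the right stream, the joined record is produced; otherwise, fields from the right stream are null.
  - Right Outer Join: All records from the right stream are included; unmatched records have null values for the left stream's fields.
  - Full Outer Join: All records from both streams are included, with nulls filling the fields of whichever side has no match.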
- Interval Join: A common type of stream-stream join in which records from two streams are joined when their event timestamps fall within a specified interval of each other, for example: event_A.time BETWEEN event_B.time - INTERVAL '5' MINUTE AND event_B.time + INTERVAL '5' MINUTE.
- Temporal Join (Stream-Table Join): Joins a stream of events against a table that may itself be updated over time (for example, by another stream, or a materialized view holding historical or dimensional data). The join typically uses the event's timestamp to look up the version of the table record that was valid at that moment. RisingWave supports this type of join, particularly when the table side is a materialized view.
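The version lookup described for temporal joins can be sketched as follows. This is hypothetical Python, not RisingWave's implementation: VersionedTable keeps each key's versions sorted by an assumed valid_from timestamp, and temporal_join uses binary search (the standard bisect module) to pick, for each event, the latest version whose valid_from is at or before the event's "ts" field. Engines differ in which timestamp drives the lookup (some use processing time rather than event time), so treat this as a model of the concept only.

```python
import bisect
from typing import Any, Dict, List, Optional

Row = Dict[str, Any]


class VersionedTable:
    """Hypothetical versioned table: for each key, versions sorted by valid_from."""

    def __init__(self) -> None:
        self._times: Dict[Any, List[int]] = {}
        self._values: Dict[Any, List[Row]] = {}

    def upsert(self, key: Any, valid_from: int, value: Row) -> None:
        """Record a new version of `key` that takes effect at `valid_from`."""
        times = self._times.setdefault(key, [])
        values = self._values.setdefault(key, [])
        i = bisect.bisect_right(times, valid_from)  # keep versions ordered by time
        times.insert(i, valid_from)
        values.insert(i, value)

    def as_of(self, key: Any, ts: int) -> Optional[Row]:
        """Return the latest version with valid_from <= ts, or None if none exists."""
        times = self._times.get(key)
        if not times:
            return None
        i = bisect.bisect_right(times, ts)
        return self._values[key][i - 1] if i > 0 else None


def temporal_join(events: List[Row], table: VersionedTable, key: str) -> List[Row]:
    """Inner temporal join: each event is enriched with the version valid at its "ts"."""
    out = []
    for event in events:
        version = table.as_of(event[key], event["ts"])
        if version is not None:  # inner-join semantics: skip events with no valid version
            out.append({**event, **version})
    return out


# Usage: customer c1 moves from EU to US at time 100.
table = VersionedTable()
table.upsert("c1", valid_from=0, value={"region": "EU"})
table.upsert("c1", valid_from=100, value={"region": "US"})
events = [{"customer_id": "c1", "ts": 50}, {"customer_id": "c1", "ts": 150}]
print(temporal_join(events, table, key="customer_id"))
# [{'customer_id': 'c1', 'ts': 50, 'region': 'EU'}, {'customer_id': 'c1', 'ts': 150, 'region': 'US'}]
```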
Streaming Joins in RisingWave
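RisingWave supports these join types in SQL. For example, an interval join between two streams, maintained incrementally as a materialized view (the view name is illustrative):
-- order_shipments is an illustrative view name
CREATE MATERIALIZED VIEW order_shipments AS
SELECT
  s1.order_id,
  s1.product_id,
  s2.shipment_status
FROM stream1 AS s1
JOIN stream2 AS s2
  ON s1.order_id = s2.order_id
  AND s1.event_time BETWEEN s2.event_time - INTERVAL '10' MINUTE AND s2.event_time + INTERVAL '10' MINUTE;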
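The semantics of the interval join above, together with watermark-based state cleanup, can be modeled in a short Python sketch. Everything here is hypothetical: the IntervalJoin class, the "t" timestamp field (in seconds), and the bound parameter (600 for the ±10-minute window). For simplicity it tracks a single shared watermark, whereas real engines track one per input, and it drops late rows, which is only one of the possible late-data policies. A buffered row is evicted once the watermark passes its event time plus the bound, because no future row from the other side can still fall within the interval.

```python
from collections import defaultdict
from typing import Any, Dict, List, Tuple

Row = Dict[str, Any]


class IntervalJoin:
    """Hypothetical inner interval join: equal keys and |t_left - t_right| <= bound."""

    def __init__(self, key: str, bound: int) -> None:
        self.key = key
        self.bound = bound
        # Simplification: one shared watermark; no row with t < watermark is expected.
        self.watermark = float("-inf")
        self.state: Dict[str, Dict[Any, List[Row]]] = {
            "left": defaultdict(list),
            "right": defaultdict(list),
        }
        self.dropped_late = 0

    def on_record(self, side: str, row: Row) -> List[Tuple[Row, Row]]:
        if row["t"] < self.watermark:
            self.dropped_late += 1  # one possible policy: drop late rows
            return []
        other = "right" if side == "left" else "left"
        self.state[side][row[self.key]].append(row)
        matches = [m for m in self.state[other].get(row[self.key], [])
                   if abs(m["t"] - row["t"]) <= self.bound]
        return [(row, m) if side == "left" else (m, row) for m in matches]

    def on_watermark(self, wm: int) -> None:
        """Advance the watermark and evict rows that no future row can match."""
        self.watermark = max(self.watermark, wm)
        for side_state in self.state.values():
            for k in list(side_state):
                # Future rows have t >= watermark, so a row matches only if t + bound >= watermark.
                kept = [r for r in side_state[k] if r["t"] + self.bound >= self.watermark]
                if kept:
                    side_state[k] = kept
                else:
                    del side_state[k]

    def state_size(self) -> int:
        return sum(len(rows) for s in self.state.values() for rows in s.values())


# Usage: ±10 minutes expressed in seconds.
j = IntervalJoin(key="order_id", bound=600)
j.on_record("left", {"order_id": 1, "t": 1000})
print(j.on_record("right", {"order_id": 1, "t": 1500}))  # matches: 500 s apart
j.on_watermark(1650)   # the left row at t=1000 can no longer match and is evicted
print(j.state_size())  # 1
```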
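A stream-table join that enriches each order with customer attributes. This is a regular join, so RisingWave keeps enriched_orders up to date when either orders_stream or customers_table changes:
CREATE MATERIALIZED VIEW enriched_orders AS
SELECT
  o.order_id,
  o.order_time,
  c.customer_name,
  c.customer_region
FROM orders_stream AS o
JOIN customers_table AS c
  ON o.customer_id = c.customer_id;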
RisingWave's incremental computation engine efficiently maintains the state for these joins and updates the results as new data arrives or existing data is retracted.
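How retractions flow through an incrementally maintained join can be modeled in a few lines. This is a hypothetical Python sketch, not RisingWave's engine: both inputs are treated as changelogs of ("+", row) inserts and ("-", row) retractions, each side keeps its current rows as join state, and every change is joined against the other side's current state and forwarded with the same sign. An update to a customer row, modeled as a retraction followed by an insert, therefore retracts the stale enriched row and emits a corrected one.

```python
from collections import defaultdict
from typing import Any, Dict, List, Tuple

Row = Dict[str, Any]
Change = Tuple[str, Row]  # ("+", row) = insert, ("-", row) = retraction


class IncrementalJoin:
    """Hypothetical incrementally maintained inner join over two changelog inputs."""

    def __init__(self, key: str) -> None:
        self.key = key
        # Current contents of each input, indexed by join key.
        self.state: Dict[str, Dict[Any, List[Row]]] = {
            "left": defaultdict(list),
            "right": defaultdict(list),
        }

    def apply(self, side: str, op: str, row: Row) -> List[Change]:
        """Apply one input change and return the resulting output changes."""
        other = "right" if side == "left" else "left"
        k = row[self.key]
        if op == "+":
            self.state[side][k].append(row)
        elif op == "-":
            # Assumes the retracted row was previously inserted (raises ValueError otherwise).
            self.state[side][k].remove(row)
        else:
            raise ValueError(f"unknown op {op!r}")
        out: List[Change] = []
        for match in self.state[other].get(k, []):
            # Left-side fields first, then right-side fields.
            joined = {**row, **match} if side == "left" else {**match, **row}
            out.append((op, joined))  # the output change carries the input's sign
        return out


# Usage: an order for customer 7, then customer 7 changes region.
j = IncrementalJoin(key="customer_id")
j.apply("left", "+", {"order_id": 1, "customer_id": 7})
print(j.apply("right", "+", {"customer_id": 7, "region": "EU"}))  # + joined row with region EU
# The region update is modeled as a retraction of the old row plus an insert of the new one.
print(j.apply("right", "-", {"customer_id": 7, "region": "EU"}))  # - retracts the EU row
print(j.apply("right", "+", {"customer_id": 7, "region": "US"}))  # + emits the corrected row
```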
Considerations
- State Size: Joins, especially over long windows or with high-cardinality keys, can lead to large state. Efficient state management (such as RisingWave's Hummock storage engine) is critical.
- Watermarks and Late Data: Correct watermark propagation is crucial for timely and accurate join results. Late data can be handled in several ways, such as dropping it or accepting late updates, which may require retracting previously emitted join results.
- Performance: Join performance depends on data distribution, join key cardinality, window size, and the efficiency of the state backend.
- Join Predicates: Complex join predicates beyond simple equality and time bounds can increase computational cost.
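A rough way to reason about the State Size consideration for an interval join: if each side keeps a row until the watermark passes its event time plus the join bound, each side retains about (bound + watermark delay) seconds of input, so state is roughly (left rate + right rate) × (bound + delay) rows. The Python helper below encodes that back-of-envelope estimate. The function name, its parameters, and the 1,000 rows/s and 30 s watermark delay figures are assumptions for illustration; the estimate ignores key skew, row size, and storage overhead.

```python
def interval_join_state_rows(left_rate: float, right_rate: float,
                             bound_s: float, watermark_delay_s: float = 0.0) -> float:
    """Hypothetical back-of-envelope steady-state row count for a symmetric interval join.

    Assumes each buffered row is kept until the watermark passes its event time
    plus the bound, so each side retains about (bound + watermark delay) seconds
    of input. Ignores key skew, per-row size, and storage overhead.
    """
    retention_s = bound_s + watermark_delay_s
    return (left_rate + right_rate) * retention_s


# 1,000 rows/s per side, a ±10-minute bound (600 s), and a 30 s watermark delay.
print(interval_join_state_rows(1000, 1000, 600, 30))  # 1260000.0
```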
Related Glossary Terms