In stream processing, joins are a fundamental operation for enriching data streams. However, they can introduce a significant performance problem known as high join amplification. This article explains the problem and its impact on a streaming pipeline, and introduces unaligned joins, a new RisingWave feature designed to mitigate it.
Understanding High Join Amplification and Backpressure
High join amplification occurs when a single input record in a join operation matches numerous records in the other stream or table, resulting in a large volume of output records.
Consider a common stream-to-stream join scenario where you enrich a stream of user clicks with user profile information.
CREATE MATERIALIZED VIEW enriched_clicks AS
SELECT
    c.click_id,
    c.item_id,
    u.user_name
FROM
    clicks AS c
JOIN
    users AS u ON c.user_id = u.user_id;
If a particular user is highly active (for example, a bot or a power user), a single update to that user's profile in the users stream could match thousands of existing records in the clicks stream. The join operator must then emit an output row for every one of those matches, producing a massive burst of output data.
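To make the fan-out concrete, here is a minimal Python sketch of a symmetric hash join, the kind of state a streaming join typically keeps: each side's rows are indexed by the join key, so a change on one side is matched against every stored row on the other. This is a hypothetical model, not RisingWave's implementation. The class and method names are illustrative, and the sketch assumes a profile update is emitted as a retraction of each old joined row plus an insertion of the new one, as is common in changelog-based systems.

```python
from collections import defaultdict


class SymmetricHashJoin:
    """Hypothetical in-memory inner join of clicks and users on user_id."""

    def __init__(self):
        self.clicks_by_user = defaultdict(list)  # user_id -> [(click_id, item_id)]
        self.users = {}                           # user_id -> user_name

    def on_click(self, click_id, user_id, item_id):
        """Store a click; emit one joined row if the user is already known."""
        self.clicks_by_user[user_id].append((click_id, item_id))
        if user_id in self.users:
            return [("+", click_id, item_id, self.users[user_id])]
        return []

    def on_user_update(self, user_id, new_name):
        """Update a profile; every stored click for this user is re-emitted.

        Assumption: the old joined row is retracted ("-") and the new one
        inserted ("+"), so each matching click yields two output rows.
        """
        old_name = self.users.get(user_id)
        self.users[user_id] = new_name
        out = []
        for click_id, item_id in self.clicks_by_user[user_id]:
            if old_name is not None:
                out.append(("-", click_id, item_id, old_name))
            out.append(("+", click_id, item_id, new_name))
        return out


join = SymmetricHashJoin()
join.on_user_update(42, "alice")        # new user; no clicks yet, so no output
for i in range(5000):                   # a very active user, e.g. a bot
    join.on_click(i, 42, item_id=i % 100)

burst = join.on_user_update(42, "alice_v2")
print(len(burst))  # 10000: one retraction plus one insertion per stored click
```

The burst grows linearly with the number of stored rows that share the updated key, which is exactly what makes a single hot key so expensive.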
This sudden data surge can overwhelm downstream operators (like aggregations or sinks), causing backpressure. Backpressure is a mechanism where the slow downstream operator signals upstream operators to reduce their data output rate. This slowdown propagates through the entire pipeline.
In a fault-tolerant stream processing system like RisingWave, which relies on checkpoint barriers to guarantee data consistency, backpressure has a critical side effect. A checkpoint completes only after its barrier has passed through every operator, and barriers travel in the same ordered channels as the data. A barrier queued behind a large burst of join output therefore cannot reach the next operator until all of those rows have been processed, which leads to high barrier latency. Because checkpoints are delayed across the whole system, both regular processing and recovery slow down.
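Why barrier latency grows with the size of the burst can be shown with a toy Python simulation. It is a simplified model with hypothetical names, not RisingWave code: a single FIFO channel carries the join's output rows followed by a barrier, and a slow downstream operator consumes a fixed number of messages per tick.

```python
from collections import deque

BARRIER = "BARRIER"  # hypothetical marker for a checkpoint barrier


def ticks_until_barrier(rows_ahead, rows_per_tick):
    """Return the tick on which the downstream operator processes the barrier.

    The barrier is enqueued behind `rows_ahead` output rows. Messages are
    consumed strictly in order, at most `rows_per_tick` per tick, so the
    barrier is only reached after every row ahead of it has been handled.
    """
    if rows_per_tick <= 0:
        raise ValueError("rows_per_tick must be positive")
    channel = deque(["row"] * rows_ahead)
    channel.append(BARRIER)
    tick = 0
    while True:
        tick += 1
        for _ in range(rows_per_tick):
            if channel.popleft() == BARRIER:
                return tick


# A normal epoch with 100 rows vs. one hit by a 10,000-row join burst,
# with the downstream operator handling 500 messages per tick.
print(ticks_until_barrier(100, 500))     # 1
print(ticks_until_barrier(10_000, 500))  # 21
```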
Introducing Unaligned Joins
To address the backpressure caused by high-amplification joins, RisingWave introduces the unaligned joins feature. This feature isolates the problematic join operator from its downstream operators, preventing its performance issues from affecting the entire pipeline.
You can enable this feature for a session with the following command:
SET streaming_enable_unaligned_join = true;
When the feature is enabled, RisingWave automatically inserts an intermediate buffer after the join operator, and the join writes its output to this buffer. Checkpoint barriers can then bypass the buffered data and pass through to the next operator immediately, rather than waiting for the slow downstream operator to consume the amplified output.
This mechanism effectively decouples the high-amplification join from the rest of the dataflow. Even if the downstream operator is slow to process the large volume of data from the buffer, the checkpoint barriers can flow through the system unimpeded. This ensures low barrier latency and maintains the overall stability of the pipeline.
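The difference between the two modes can be illustrated with a small, hypothetical Python simulation; it is a toy model, not RisingWave's implementation. In aligned mode, rows and the barrier share one FIFO channel. In unaligned mode, rows go into a buffer and the barrier is delivered separately. A real implementation would also need the buffered rows to be part of checkpointed state so they survive recovery; this sketch ignores that and keeps them in an in-memory deque.

```python
from collections import deque


def simulate_channel(rows_ahead, rows_per_tick, unaligned):
    """Return (tick the barrier is handled, tick the last row is handled).

    Aligned: the barrier waits behind every row in a shared FIFO channel and
    uses one unit of the per-tick budget. Unaligned: rows sit in a buffer and
    the barrier is handled at the start of the first tick, without using budget.
    """
    if rows_per_tick <= 0:
        raise ValueError("rows_per_tick must be positive")
    data = deque(range(rows_ahead))
    barrier_pending = True
    barrier_tick = last_row_tick = None
    tick = 0
    while barrier_pending or data:
        tick += 1
        budget = rows_per_tick
        if unaligned and barrier_pending:
            barrier_pending = False      # barrier skips over buffered rows
            barrier_tick = tick
        while budget > 0 and data:
            data.popleft()
            budget -= 1
            last_row_tick = tick
        if not unaligned and barrier_pending and not data and budget > 0:
            barrier_pending = False      # barrier reached only after all rows
            barrier_tick = tick
    return barrier_tick, last_row_tick


print(simulate_channel(10_000, 500, unaligned=False))  # (21, 20)
print(simulate_channel(10_000, 500, unaligned=True))   # (1, 20)
```

In unaligned mode the barrier is handled on the first tick, yet the last row still arrives on tick 20 in both modes: the buffer removes the barrier delay, but it does not make the downstream operator any faster.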
Performance Considerations
The use of unaligned joins introduces a trade-off: a slight increase in end-to-end latency for the data flowing through that specific part of the query. This is due to the overhead of writing to and reading from the intermediate buffer.
However, this localized latency increase is generally preferable to the alternative of having the entire pipeline slow down due to backpressure; it is a targeted way to contain the problem. Note that unaligned joins address barrier latency, not throughput: if the downstream operator consistently consumes data more slowly than the join produces it, the backlog in the buffer will keep growing. You may therefore still need to monitor the buffer for data lag and, if necessary, scale the cluster to handle the high data volume.
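The difference between a temporary burst and a sustained overload can be illustrated with a back-of-the-envelope Python sketch. The function name and the rates below are made up for illustration; this is not a RisingWave metric or API. If the backlog returns to zero, the buffer simply absorbed a burst; if it keeps growing, the downstream operator is under-provisioned and scaling may be needed.

```python
def buffer_backlog(produced_per_interval, consumed_capacity_per_interval):
    """Return the number of rows left in the buffer after each interval.

    `produced_per_interval` lists how many rows the join wrote to the buffer
    in each interval; `consumed_capacity_per_interval` is how many rows the
    downstream operator can read per interval (hypothetical, fixed rate).
    """
    backlog = 0
    series = []
    for produced in produced_per_interval:
        backlog += produced
        backlog -= min(backlog, consumed_capacity_per_interval)
        series.append(backlog)
    return series


# A single burst drains away, but a sustained overload keeps growing.
print(buffer_backlog([10_000, 0, 0, 0], 4_000))      # [6000, 2000, 0, 0]
print(buffer_backlog([5_000, 5_000, 5_000], 4_000))  # [1000, 2000, 3000]
```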
Example
To apply this feature to the click-enrichment example from before, enable unaligned joins before creating the materialized view.
-- Enable the feature for the current session
SET streaming_enable_unaligned_join = true;
-- The join's output will now be written to an intermediate buffer
CREATE MATERIALIZED VIEW enriched_clicks AS
SELECT c.click_id, c.item_id, u.user_name
FROM clicks AS c
JOIN users AS u ON c.user_id = u.user_id;
With this setting, RisingWave buffers the join's output, so a burst of amplified output no longer holds back checkpoint barriers, and checkpoints complete promptly even while downstream operators work through the backlog.
Conclusion
High join amplification is a common source of backpressure that can degrade the performance and stability of a stream processing pipeline. RisingWave's unaligned join feature provides a direct and effective solution by isolating the problematic join, ensuring that the rest of your dataflows remain responsive. For a deeper dive into this feature, see Isolating high-amplification joins.
Get Started with RisingWave
Try RisingWave Today:
Download the open-source version of RisingWave to deploy on your own infrastructure.
Get started quickly with RisingWave Cloud for a fully managed experience.
Talk to Our Experts: Have a complex use case or want to see a personalized demo? Contact us to discuss how RisingWave can address your specific challenges.
Join Our Community: Connect with fellow developers, ask questions, and share your experiences in our vibrant Slack community.