Do you remember the concept of stateful operations we discussed on Day 12? And the Database Lookup Join we implemented earlier?
Today, we take it a step further and dive into the true mindset of stream processing: Stream-to-Stream Join. Instead of relying on an external database for lookups, we let two streams match events directly. This is where the real power of streaming systems emerges.
Rethinking JOIN
In relational databases, a JOIN combines two tables based on a condition.
In stream processing, however, a “table” becomes an unbounded stream of events. This fundamentally changes the nature of JOIN—it becomes continuous, stateful, and time-aware.
Traditional RDB Join: The Build–Probe Model
Step 1: Build Phase

  Table A
  (smaller) ─────────────────► Hash Table
      │                        (key → value index)
      │                              │
      └─ Load all data into memory   │
                                     │
Step 2: Probe Phase                  │
                                     │
  Table B                            │
  (larger)                           │
      │                              │
      ├─ row1 ──────┐                │
      ├─ row2 ──────┼─ Lookup ──────►│
      ├─ row3 ──────┤  Hash Table    │
      └─ ...  ──────┘                │
                                     │
        Match Found ◄────────────────┘
             │
             ▼
        Join Result
Characteristics:
One side builds an index; the other probes for matches.
The smaller table is fully loaded into memory to build a hash table.
The larger table is scanned row by row and probed against the hash table.
It is a finite, one-time operation with a clear start and end.
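The build-probe model above can be sketched in a few lines of Python (`hash_join`, `users`, and `orders` are illustrative names, not part of any real engine):

```python
def hash_join(table_a, table_b, key):
    """One-time build-probe join of two lists of dicts on `key`."""
    # Build phase: load the smaller table into an in-memory hash index
    build, probe = (table_a, table_b) if len(table_a) <= len(table_b) else (table_b, table_a)
    index = {}
    for row in build:
        index.setdefault(row[key], []).append(row)
    # Probe phase: scan the larger table row by row and look up matches
    results = []
    for row in probe:
        for match in index.get(row[key], []):
            results.append({**match, **row})
    return results  # finite: once the probe scan ends, the join is done

users = [{"id": 1, "name": "Ann"}, {"id": 2, "name": "Bob"}]
orders = [{"id": 1, "total": 500}, {"id": 1, "total": 200}]
print(hash_join(users, orders, "id"))  # two matched rows for user 1
```

Note how the asymmetry is baked in: only one side is indexed, and the operation has a clear end, which is exactly what a streaming join cannot assume.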
Streaming Join: Bidirectional State Management
 Stream A Processing                   Stream B Processing
┌─────────────────┐                  ┌─────────────────┐
│     Event A     │                  │     Event B     │
│        │        │                  │        │        │
│        ▼        │                  │        ▼        │
│    Store in     │                  │    Store in     │
│    State A      │                  │    State B      │
│        │        │                  │        │        │
│        ▼        │                  │        ▼        │
│ Query State B ──┼──────────────────┼── Query State A │
│        │        │                  │        │        │
│        ▼        │                  │        ▼        │
│   Emit Result   │                  │   Emit Result   │
└────────┬────────┘                  └────────┬────────┘
         │         State Management           │
         │    ┌─────────────────────────┐     │
         └───►│ State Store A           │◄────┘
              │ Key → [timestamp,value] │
              │                         │
              │ State Store B           │
              │ Key → [timestamp,value] │
              └─────────────────────────┘
Key differences:
Bidirectional Build + Probe: Both streams must maintain state and probe the other side.
Continuous operation: There is no defined end; data keeps flowing indefinitely.
Time semantics: Event timestamps and arrival order matter.
State management: Historical data must be retained and cleaned up intelligently.
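These differences can be condensed into a toy sketch: two state dicts and one symmetric handler that both stores its own event and probes the other side. The `on_event` function and the state layout are hypothetical simplifications, not the article's engine:

```python
from collections import defaultdict

# Bidirectional build + probe: each side keeps state and queries the other.
state = {"A": defaultdict(list), "B": defaultdict(list)}

def on_event(side, key, event):
    """Handle one event: build (store locally), then probe (query partner)."""
    other = "B" if side == "A" else "A"
    state[side][key].append(event)                       # build: retain own state
    return [{**event, **m} for m in state[other][key]]   # probe: match partner state

print(on_event("A", "O001", {"total": 500}))  # partner has nothing yet -> []
print(on_event("B", "O001", {"qty": 2}))      # matches the retained order event
```

Because both sides retain history, the loop never "finishes" the way a build-probe join does; it simply keeps matching as events arrive.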
Implementing Streaming Join in Simple Streaming
Important Note
All code in this article is pseudo code for educational purposes. It is meant to illustrate the architectural ideas and core design concepts behind stream processing. The code is not production-ready.
For clarity, we simplify earlier implementation details and focus specifically on today’s core mechanism.
Step 1: Designing the State Store
from collections import defaultdict

class SimpleDataFrame:
    def __init__(self, name: str = "dataframe", sink_callback=None):
        self.name = name
        # Core of Streaming Join: bidirectional state storage
        self._join_state = defaultdict(list)  # key -> [events...]
        self._join_partner = None             # the paired DataFrame
        self._join_key = None                 # join field name
Internal Structure of _join_state
{
    "O001": [event1, event2, ...],  # one key may correspond to multiple events
    "O002": [event3, event4, ...],
}
Design Considerations
defaultdict(list) supports one-to-many relationships: a single order_id may have multiple associated detail events.
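A quick illustration of that one-to-many behavior (`join_state` here is a stand-in for the `_join_state` attribute):

```python
from collections import defaultdict

# One-to-many state: appending never overwrites earlier events for the same key.
join_state = defaultdict(list)
join_state["O001"].append({"product": "coffee", "qty": 2})
join_state["O001"].append({"product": "bagel", "qty": 1})

print(len(join_state["O001"]))  # 2 events retained under one order_id
print(join_state["MISSING"])    # unseen key -> [] (no KeyError)
```

One subtlety: indexing a defaultdict creates an empty entry as a side effect, which is why read-only lookups against the partner's state are better done with .get(key, []), as the join algorithm below does.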
Step 2: Establishing Bidirectional Association in join()
def join(self, other: 'SimpleDataFrame', on: str) -> 'SimpleDataFrame':
    # 1. Create a new DataFrame to collect join results
    joined_df = SimpleDataFrame(f"{self.name}_join_{other.name}")

    # 2. Establish bidirectional association
    self._join_partner = other
    other._join_partner = self
    self._join_key = on
    other._join_key = on

    # 3. Intercept original processing logic to inject join functionality
    self_original_process = self.process_message

    def enhanced_process(message):
        # First execute original logic (filter, transform, etc.)
        result = self_original_process(message)
        # Then apply join logic
        join_results = self._process_join_event(message)
        for join_result in join_results:
            joined_df.process_message(join_result)
        return result

    self.process_message = enhanced_process
    return joined_df
Core Design Principles
Bidirectional references: Each DataFrame knows its join partner.
Event interception: Join logic is injected without breaking existing behavior.
Backward compatibility: Existing DataFrame functionality remains intact.
Design Advantages
Backward compatible: filters and sinks continue to work.
Separation of concerns: join results are handled independently.
Chainable operations: joined results can be further processed.
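The interception trick itself can be shown in isolation. Below is a minimal sketch of instance-level method wrapping, with a hypothetical Processor class standing in for SimpleDataFrame:

```python
class Processor:
    def process_message(self, msg):
        return msg.upper()

p = Processor()
original = p.process_message        # keep a reference to the bound original

def enhanced(msg):
    result = original(msg)          # existing behavior still runs first
    print(f"hook saw: {msg}")       # extra logic injected afterwards
    return result

p.process_message = enhanced        # instance-level override, class untouched
print(p.process_message("hi"))      # -> HI (original result is preserved)
```

Overriding only the instance attribute means other Processor objects keep the plain behavior, which is the same non-intrusive property join() relies on.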
Step 3: The Core Join Algorithm
def _process_join_event(self, event) -> List[Dict]:
    if not self._join_partner:
        return []

    join_value = str(event.get(self._join_key))
    results = []

    # 1. Store: save current event into state
    self._join_state[join_value].append(event)

    # 2. Query: look up matching events in partner's state
    partner_events = self._join_partner._join_state.get(join_value, [])

    # 3. Merge: combine with each matched event
    for partner_event in partner_events:
        merged = {**event, **partner_event}
        results.append(merged)

    return results
Algorithm Steps
Store: Persist the current event in local state by join key.
Query: Look up matching events in the partner’s state store.
Merge: Combine matched events and emit results.
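As a self-contained sketch, the three steps look like this (`process_join_event` is a hypothetical free-function version of the method above):

```python
from collections import defaultdict

def process_join_event(own_state, partner_state, key_field, event):
    """Store -> query -> merge for a single incoming event."""
    key = str(event.get(key_field))
    own_state[key].append(event)                      # 1. store in local state
    partner_events = partner_state.get(key, [])       # 2. query partner state
    return [{**event, **p} for p in partner_events]   # 3. merge each match

orders, details = defaultdict(list), defaultdict(list)
# Detail arrives first: nothing to match yet
assert process_join_event(details, orders, "order_id",
                          {"order_id": "O001", "qty": 2}) == []
# Order arrives later: the retained detail is found and merged
print(process_join_event(orders, details, "order_id",
                         {"order_id": "O001", "total": 500}))
# -> [{'order_id': 'O001', 'total': 500, 'qty': 2}]
```

Note the merge order: with {**event, **partner_event}, the partner's fields win on key collisions, which is harmless here because both sides agree on order_id.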
Step 4: End-to-End Execution Flow
Let’s walk through a complete example:
# Define the join relationship
orders_df = app.dataframe(source=orders_source)
details_df = app.dataframe(source=details_source)
joined_df = orders_df.join(details_df, on="order_id")
# Event arrival order
1. detail = {"order_id": "O001", "product": "coffee", "qty": 2}
2. order  = {"order_id": "O001", "user_id": "U123", "total": 500}
Event 1: Detail Arrives First
# details_df receives the detail event:
join_value = "O001"
# 1. Store in local state
details_df._join_state["O001"] = [{"order_id": "O001", "product": "coffee", "qty": 2}]
# 2. Query partner (orders_df)
orders_df._join_state.get("O001", []) # returns [], no order yet
# 3. No match, no output
results = []
Event 2: Order Arrives Later
# orders_df receives the order event:
join_value = "O001"
# 1. Store in local state
orders_df._join_state["O001"] = [{"order_id": "O001", "user_id": "U123", "total": 500}]
# 2. Query partner (details_df)
details_df._join_state.get("O001", [])
# Found! [{"order_id": "O001", "product": "coffee", "qty": 2}]

# 3. Merge events
merged = {
    **{"order_id": "O001", "user_id": "U123", "total": 500},  # current event
    **{"order_id": "O001", "product": "coffee", "qty": 2},    # matched event
}

# Output:
# {"order_id": "O001", "user_id": "U123", "total": 500, "product": "coffee", "qty": 2}
results = [merged]
Regardless of arrival order, the join eventually succeeds because both sides retain state.
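That symmetry can be checked with a small experiment. The `run` helper below is a hypothetical harness, not part of the article's engine; it replays the same two events in both arrival orders:

```python
from collections import defaultdict

def run(events):
    """Replay (side, event) pairs through a fresh pair of state stores."""
    states = {"orders": defaultdict(list), "details": defaultdict(list)}
    partner = {"orders": "details", "details": "orders"}
    out = []
    for side, ev in events:
        key = ev["order_id"]
        states[side][key].append(ev)                              # store
        out += [{**ev, **p} for p in states[partner[side]].get(key, [])]  # probe
    return out

order = {"order_id": "O001", "total": 500}
detail = {"order_id": "O001", "qty": 2}
a = run([("details", detail), ("orders", order)])  # detail arrives first
b = run([("orders", order), ("details", detail)])  # order arrives first
print(a == b)  # both orders yield the same fully merged record
```

Either way, exactly one merged record is emitted once the second event lands: retained state makes arrival order irrelevant to the final result.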
Design Advantages Summary
Clean API: df1.join(df2, on="key") is intuitive and expressive.
Automatic state handling: State management and routing are transparent.
Non-intrusive design: Existing DataFrame operations remain unaffected.
Composable: Join results can be chained with further transformations.
Out-of-order tolerant: Matching works regardless of arrival order.
This implementation demonstrates how complex streaming join logic can be encapsulated behind a simple API, allowing developers to focus on business logic rather than low-level state mechanics.
Architecture Recap
Day 13: SimpleStreamingEngine with Streaming JOIN
┌─────────────────┐     ┌──────────────────┐     ┌──────────────────┐
│ KafkaSource     │────►│ DataFrame        │────►│ PostgreSQLSink   │
│                 │     │                  │     │                  │
│ • Topic consume │     │ • Filter         │     │ • Batch insert   │
│ • Consumer pause│     │ • Lookup Join    │     │ • Timer trigger  │
│ • Offset resume │     │ • Streaming Join │     │ • Overload detect│
└─────────────────┘     └────────┬─────────┘     └──────────────────┘
        ▲                        │
        │                        ▼
┌─────────────────────────────────────────────────────────────┐
│                   SimpleStreamingEngine                     │
│ • Backpressure                                              │
│ • Checkpoint                                                │
│ • State Storage                                             │
└─────────────────────────────────────────────────────────────┘
Newly Introduced Capabilities
Day 10
- Lookup Join: DataFrame can enrich data by querying a database.
Day 13
- Streaming Join: Direct join between two streams.
- State storage: Bidirectional state management for event matching and out-of-order handling.
Conclusion
Streaming Join represents a major step forward—from simple event forwarding to intelligent stream correlation.
By introducing bidirectional state management, two independent streams can become aware of each other, wait for matching counterparts, and ultimately meet within the streaming layer itself. This not only solves out-of-order arrival challenges but, more importantly, eliminates the dependency on external databases. Data processing can now be completed entirely within the streaming domain.
Up Next: Day 14 – Streaming GroupBy
If JOIN is the “encounter between streams,” then GROUP BY is “a stream organizing itself.”
In Day 14, we’ll explore how to implement Streaming GroupBy. Stay tuned.

