Recap: Pain Points from Last Time
Remember the coffee shop story from last time? The owner's needs evolved from simple order counts to detailed order breakdowns (orders_detail), which we solved in the Serving Layer of the Lambda architecture with a complex SQL JOIN.
But the story doesn't end there. As business boomed, a new pain point emerged:
Note: All code in this article is pseudo-code for educational and conceptual illustration. Focus on understanding the architecture and logic rather than on running the exact code. To keep today's focus clear, we'll simplify some implementation details from previous days and concentrate on the lookup join mechanism itself.
The Coffee Shop's New Challenge
The shop processes 1000 orders per hour
Every time the owner checks the dashboard, this query runs:
SELECT
    o.order_id,
    o.total_amount,
    c.customer_name,
    c.customer_type,
    od.product_name,
    od.quantity
FROM orders o
JOIN customers c ON o.customer_id = c.customer_id      -- JOIN customer table
JOIN orders_detail od ON o.order_id = od.order_id      -- JOIN order details
WHERE o.order_time >= NOW() - INTERVAL 1 HOUR;
Problem Analysis
Peak time: 50 concurrent dashboard queries, i.e. 50 of these three-table JOINs hitting the database at once
Database CPU: 35% → 90%
Query latency: 200ms → 5000ms
The tech lead frowned and said, "Every query does this heavy JOIN—the database can't keep up. Can we do the JOIN work ahead of time to simplify the queries?"
Engineer's Reaction
The engineer's first thought: "Just JOIN against the database right inside the streaming layer." Why this instinct?
This comes from our familiar web development mindset:
# Familiar web API development flow
@app.route('/api/orders/<order_id>')
def get_order_detail(order_id):
    # Direct database JOIN - our most comfortable approach!
    result = db.query("""
        SELECT
            o.order_id,
            o.total_amount,
            c.customer_name,
            c.customer_type,
            p.product_name,
            p.price
        FROM orders o
        JOIN customers c ON o.customer_id = c.customer_id
        JOIN products p ON o.product_id = p.product_id
        WHERE o.order_id = %s
    """, order_id)
    return result
Mindshift: From HTTP Request to Stream Message
But in Stream Processing, we can't simply hand a SQL JOIN to the stream engine. So the engineer naturally thinks:
"If I can't write a JOIN in the stream, I'll just query the database from within the stream processing and manually join the data."
Mindshift Process
Web Development (Database JOIN):
HTTP Request → One SQL Query (JOIN multiple tables) → Return complete data
Stream Development (Manual Lookup):
Stream Message → Multiple Database Queries → Application-layer combination → Output enriched data
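Before formalizing this into SimpleDataFrame, here is a rough sketch of that manual-lookup mindset. The helpers db.query_one and emit are placeholders for whatever database client and output sink the pipeline actually uses; they are not part of any real API.

# Hand-rolled enrichment inside a stream handler (illustrative pseudo-code)
def handle_order_message(message, db):
    # message looks like {"order_id": 1001, "customer_id": 123}
    customer = db.query_one(
        "SELECT name, type FROM customers WHERE customer_id = ?",
        (message["customer_id"],),
    )  # one extra database round trip for every message
    enriched = {**message, **customer} if customer else message
    emit(enriched)  # send the enriched record downstream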
Core Flow: Understanding Lookup Join Mechanics
Let's break down SimpleDataFrame's lookup join implementation step by step.
Message Flow with Database Lookup
┌─────────────────────────────────────────────────────────────────┐
│ Original Message │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ { "order_id": 1001, "customer_id": 123 } │ │
│ └───────────────────────────┬─────────────────────────────┘ │
└──────────────────────────────┼──────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ Database Lookup Process │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Step 1: Extract join_key = "customer_id" = 123 │ │
│ │ Step 2: Query DB "SELECT name, type FROM customers │ │
│ │ WHERE customer_id = 123" │ │
│ │ Step 3: Get result {"name": "Alice", "type": "VIP"} │ │
│ └─────────────────────────────────────────────────────────┘ │
└─────────────────────────┬───────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ Enriched Message │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ { │ │
│ │ "order_id": 1001, │ │
│ │ "customer_id": 123, │ │
│ │ "name": "Alice", ← Added from DB │ │
│ │ "type": "VIP" ← Added from DB │ │
│ │ } │ │
│ └─────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
Architecture: Core Structure of the SimpleDataFrame Class
This SimpleDataFrame implementation focuses on showcasing the core concept of lookup join. It omits complex details needed for production, like connection pool management and error handling. It also simplifies core streaming features implemented previously (backpressure control, checkpointing, sink management) to focus on the database lookup logic.
class SimpleDataFrame:
    def __init__(self, name: str = "dataframe"):
        self.name = name
        # Core: database connection - the key to lookup join
        self._db_connection = None

    def process_message(self, message):
        """Default processing: pass the message through (filters/sinks from previous days are omitted here)"""
        return message

    def setup_database(self, connection_string: str):
        """Set up the database connection"""
        # Establish connection (create_connection is a pseudo-code placeholder)
        self._db_connection = create_connection(connection_string)

    def lookup_join(self, table_name: str, join_key: str, select_fields: list):
        """
        Core method: add a database lookup join
        Example: df.lookup_join("customers", "customer_id", ["name", "type"])
        """
        # Construct the lookup SQL
        fields_sql = ", ".join(select_fields)
        query_sql = f"SELECT {fields_sql} FROM {table_name} WHERE {join_key} = ?"

        # Key: the function wrapping technique
        original_process = self.process_message

        def enhanced_process(message):
            # 1. Execute the original processing logic first
            processed_message = original_process(message)
            # 2. Then perform the database lookup to enrich the data
            return self._perform_lookup(processed_message, query_sql, join_key)

        # 3. Replace the processing function - the core magic!
        self.process_message = enhanced_process
        return self

    def _perform_lookup(self, message, query_sql, join_key):
        """Execute the database query and merge the result"""
        join_value = message.get(join_key)
        if join_value is None:
            return message

        # Query the database (execute_query is a pseudo-code placeholder)
        result = execute_query(self._db_connection, query_sql, join_value)

        # Merge the result into the original message
        if result:
            return {**message, **result}  # dictionary merge
        return message
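A quick usage sketch of the class above, assuming the create_connection/execute_query placeholders are wired to a real database; the connection string and sample message are made up for illustration:

# Illustrative usage of the SimpleDataFrame sketch
df = SimpleDataFrame("order-enrichment")
df.setup_database("postgresql://localhost/orders_db")   # placeholder connection
df.lookup_join("customers", "customer_id", ["name", "type"])

# Push one message through the (now wrapped) processing function
message = {"order_id": 1001, "customer_id": 123}
enriched = df.process_message(message)
# Expected shape: {"order_id": 1001, "customer_id": 123, "name": "Alice", "type": "VIP"}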
Deep Dive: The Magic of Function Wrapping
The essence of the lookup_join method is function wrapping. Let's illustrate:
Function Wrapping Process
┌─────────────────────────────────────────────────────────────────┐
│ Original DataFrame │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ process_message(msg) { │ │
│ │ // Original processing logic │ │
│ │ 1. Apply filters │ │
│ │ 2. Send to sinks │ │
│ │ return success │ │
│ │ } │ │
│ └─────────────────────────────────────────────────────────┘ │
└─────────────────────────┬───────────────────────────────────────┘
│ Function Wrapping
▼
┌─────────────────────────────────────────────────────────────────┐
│ Enhanced DataFrame │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ enhanced_process_message(msg) { │ │
│ │ // Enhanced processing logic │ │
│ │ 1. Call original_process(msg) │ │
│ │ 2. Perform database lookup ← NEW! │ │
│ │ 3. Merge results into message ← NEW! │ │
│ │ return enriched_message │ │
│ │ } │ │
│ └─────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
Key code snippet explained:
# 1. Save the original processing function
original_process = self.process_message

# 2. Define the enhanced processing function
def enhanced_process(message):
    # First, run the original processing flow (filtering, transformation, etc.)
    processed_message = original_process(message)
    # Then, perform the lookup to enrich the processed message
    return self._perform_lookup(processed_message, query_sql, join_key)

# 3. Replace the processing function - this is the core trick
self.process_message = enhanced_process
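The same trick in isolation, stripped of the DataFrame context. This toy example is fully self-contained and shows why saving a reference to the original function before reassignment matters:

# Toy demonstration of function wrapping
def base_process(message):
    return {**message, "validated": True}

original = base_process                  # 1. keep a reference to the original

def wrapped_process(message):
    result = original(message)           # 2. run the original logic first
    return {**result, "enriched": True}  #    ...then bolt on the new behavior

process = wrapped_process                # 3. callers now get the enhanced version

print(process({"order_id": 1001}))
# {'order_id': 1001, 'validated': True, 'enriched': True}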
Complete Flow: Data Processing Trace
Complete Message Processing Flow
┌─────────────────────────────────────────────────────────────────┐
│ Step 1: Message Arrives │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ {"order_id": 1001, "customer_id": 123, "amount": 85} │ │
│ └─────────────────────────────────────────────────────────┘ │
└─────────────────────────┬───────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ Step 2: Enhanced Process Message Called │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ enhanced_process_message(message) │ │
│ │ 1. First call original_process(message) │ │
│ │ 2. Apply existing filters, transformations │ │
│ │ 3. Then perform database lookup │ │
│ │ 4. Extract customer_id = 123 │ │
│ └─────────────────────────────────────────────────────────┘ │
└─────────────────────────┬───────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ Step 3: Database Lookup Process │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ SQL: SELECT name, type FROM customers WHERE id=123 │ │
│ │ Result: {"name": "Alice", "type": "VIP"} │ │
│ │ Merge with processed message │ │
│ └─────────────────────────────────────────────────────────┘ │
└─────────────────────────┬───────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ Step 4: Final Enriched Message │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ { │ │
│ │ "order_id": 1001, │ │
│ │ "customer_id": 123, │ │
│ │ "amount": 85, │ │
│ │ "name": "Alice", ← Added by lookup │ │
│ │ "type": "VIP" ← Added by lookup │ │
│ │ } │ │
│ └─────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
Practical Use Case
Imagine a simple order processing system with the following scenario:
Data Sources:
Kafka topic orders: basic order info, e.g. {"order_id": 1001, "customer_id": 123}
Kafka topic customers: customer data stream, e.g. {"customer_id": 123, "name": "Alice", "type": "VIP"}
PostgreSQL customers table: populated by another streaming pipeline from the customers topic

Goal: Enrich the order stream with customer info in real time, producing complete order data.
System Architecture:
Kafka customers topic → Streaming Pipeline A → PostgreSQL customers table
↓ lookup
Kafka orders topic → Streaming Pipeline B → enriched orders output
Implementation Steps
Pipeline A: Customer Data Sync (runs in background)
# Sync customer data from Kafka to PostgreSQL
sync_engine = SimpleStreamingEngine("customer-sync")
customer_df = sync_engine.dataframe(source=customers_kafka_source)
customer_df.sink(postgres_customers_table) # Continuously syncs customer data
Pipeline B: Order Enrichment (main logic)
# Order enrichment streaming pipeline
enrich_engine = SimpleStreamingEngine("order-enrichment")

# Create the DataFrame and set up the database connection
df = enrich_engine.dataframe(source=orders_kafka_source)
df.setup_database("postgresql://localhost/orders_db")

# Perform the lookup join
enriched_df = df.lookup_join("customers", "customer_id", ["name", "type", "email"])

# Output to the destination
enriched_df.sink(output_sink)
Execution Flow
When the system runs, each new order message goes through these steps:
1. Original message enters:
{"order_id": 1001, "customer_id": 123}
2. Perform lookup:
SELECT name, type, email FROM customers WHERE customer_id = 123
3. Merge data:
{
  "order_id": 1001,
  "customer_id": 123,
  "name": "Alice Chen",       ← from database
  "type": "VIP",              ← from database
  "email": "alice@email.com"  ← from database
}
4. Output the enriched result: the complete order data is sent to downstream systems.
Core Advantages
Developer Experience: As simple as writing a SQL JOIN, but executed in Streaming.
Immediate Usability: No need to change existing database structure.
Flexible Scaling: Easily add multiple lookup joins.
# Advanced example: multiple lookup joins chained together
(df.lookup_join("customers", "customer_id", ["name", "type"])
   .lookup_join("products", "product_id", ["product_name", "price"])
   .filter(lambda msg: msg.get("type") == "VIP")
   .sink(output_sink))
This concise API design lets developers use familiar chaining syntax to quickly build complex data enrichment pipelines.
Real-World Challenges
1. Increased Latency
Latency Analysis:
Pure stream processing: ~1ms
Single database lookup: ~50ms
Many streaming pipelines can tolerate latencies on the order of seconds, but in high-throughput scenarios a synchronous database lookup per message quickly becomes the bottleneck.
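A back-of-envelope check using the figures above (both latency numbers are illustrative estimates from this article, not measurements):

# Rough throughput limit with one synchronous lookup per message
lookup_latency_s = 0.050                     # ~50 ms per database lookup
max_msgs_per_worker = 1 / lookup_latency_s   # ≈ 20 messages/sec per worker

orders_per_sec = 1000 / 3600                 # the shop's 1000 orders/hour ≈ 0.28/sec
print(max_msgs_per_worker, orders_per_sec)
# One worker copes easily today, but a ~70x traffic spike would saturate it.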
2. Data Consistency Window
This is the trickiest problem with streaming lookup join: When reference data in the database updates, there is a time window where newly arriving messages will still fetch the old data.
Let's illustrate this problem with a concrete scenario:
Data Consistency Gap Timeline
┌─────────────────────────────────────────────────────────────────┐
│ T0: Database State │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ products table: {"product_id": 123, "price": 85} │ │
│ └─────────────────────────────────────────────────────────┘ │
└─────────────────────────┬───────────────────────────────────────┘
│ Price Update Event
▼
┌─────────────────────────────────────────────────────────────────┐
│ T1: Price Update (takes 500ms to propagate) │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ products table: {"product_id": 123, "price": 90} │ │
│ │ Status: Update in progress... │ │
│ └─────────────────────────────────────────────────────────┘ │
└─────────────────────────┬───────────────────────────────────────┘
│ Order arrives during update window
▼
┌─────────────────────────────────────────────────────────────────┐
│ T1.1: Order Processing (within consistency gap) │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Incoming: {"order_id": 1001, "product_id": 123} │ │
│ │ Lookup result: price = 85 ← Wrong! Should be 90 │ │
│ │ Output: Wrong pricing in enriched order │ │
│ └─────────────────────────────────────────────────────────┘ │
└─────────────────────────┬───────────────────────────────────────┘
│ Update completes
▼
┌─────────────────────────────────────────────────────────────────┐
│ T2: Consistency Restored │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ All new orders will get correct price = 90 │ │
│  │  But orders processed in the T1→T2 window carry stale data │    │
│ └─────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
Common Solutions
1. Accept Eventual Consistency
Suitable for: Analytical scenarios with low real-time requirements
Approach: Periodically correct historical data
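A sketch of what "periodically correct historical data" could look like as a small batch job. The enriched_orders table, its columns, and the db.execute helper are hypothetical names chosen purely for illustration:

# Hypothetical periodic correction job: overwrite stale denormalized fields
# with the current reference data (pseudo-code, PostgreSQL-style UPDATE ... FROM)
def correct_recent_orders(db):
    db.execute("""
        UPDATE enriched_orders eo
        SET    customer_name = c.name,
               customer_type = c.type
        FROM   customers c
        WHERE  eo.customer_id = c.customer_id
          AND  eo.order_time >= NOW() - INTERVAL '24 HOUR'
    """)
# Scheduled nightly (or hourly), this bounds how long stale enrichments survive.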
2. Bi-Directional Trigger Mechanism (Delta Join)
# Not only does the order stream trigger a lookup; product price updates also trigger recalculation
price_update_stream.process(lambda msg:
    reprocess_recent_orders(msg.product_id, msg.new_price))
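One possible shape for reprocess_recent_orders, purely for illustration; recent_orders and emit stand in for whatever state store and output sink the pipeline really has, and a production delta join would normally live inside the stream engine's state rather than in hand-written code like this:

# Illustrative sketch: re-emit recently enriched orders that used the old price
def reprocess_recent_orders(product_id, new_price, window_minutes=10):
    for order in recent_orders(product_id, window_minutes):  # placeholder lookup
        corrected = {**order, "price": new_price}
        emit(corrected)  # downstream consumers replace the stale record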
This challenge reminds us that while a streaming lookup join is convenient to build, scenarios with strict data consistency requirements need carefully designed correction mechanisms.
Summary
This time, we started from the requirement of "the owner wants more details" and worked our way to the practical problem of "the JOIN is too slow."
We adopted the Database Lookup Join solution, performing real-time database queries within the Streaming Pipeline to enrich data, achieving the goal of "flattening" the data before query time.
Advantages of Database Lookup Join
Excellent Experience: Intuitive syntax, as simple as writing SQL JOIN.
Immediately Usable: No need to modify existing database structures—write today, deploy today.
Highly Flexible: Easily add multiple lookups, supporting complex data enrichment.
Simple Maintenance: Logic is concentrated in the streaming pipeline, easy to debug and monitor.
Challenges of Database Lookup Join
Increased Latency: Lookups are convenient, but each piece of data requires a database query. For high-throughput systems, latency can easily accumulate.
Data Consistency Window: Lookups depend on the current database state. Delays in data updates are directly reflected in the results, and can even lead to incorrect information.
These challenges make us start thinking: "Since Database Lookup has latency and consistency issues, can we move the JOIN entirely to the Stream layer? Let two data streams JOIN directly in memory, completely avoiding the database bottleneck?"
Day11 Preview: Kappa Architecture, All on Streaming!
From this experience, a bold idea flashed in the tech lead's mind:
"What if all data transformations, aggregations, and JOINs were completed within the Streaming layer? Wouldn't our queries then be blazing fast?"
And so, a quiet evolution from Lambda architecture to Kappa architecture began.

