Database Lookup Join for Stream Processing: Architecture, Implementation and Limitations

Recap: Pain Points from Last Time

Remember the coffee shop story from last time? The owner evolved from just wanting order counts to wanting detailed order breakdowns (orders_detail). We solved it in the Serving Layer of the Lambda architecture with a complex SQL JOIN.

But the story doesn't end there. As business boomed, a new pain point emerged:

Note: All code in this article is pseudo-code for educational and conceptual illustration. Focus on understanding the architecture and logic rather than on running the exact code. To keep today's focus clear, we'll simplify some implementation details from previous days and concentrate on the lookup join mechanism itself.

The Coffee Shop's New Challenge

  • The shop processes 1000 orders per hour

  • Every time the owner checks the dashboard, this query runs:

SELECT
    o.order_id,
    o.total_amount,
    c.customer_name,
    c.customer_type,
    od.product_name,
    od.quantity
FROM orders o
JOIN customers c ON o.customer_id = c.customer_id -- JOIN customer table
JOIN orders_detail od ON o.order_id = od.order_id -- JOIN order details
WHERE o.order_time >= NOW() - INTERVAL 1 HOUR;

Problem Analysis

  • Peak time: 50 concurrent queries = 50 three-table JOINs per second

  • Database CPU: 35% → 90%

  • Query latency: 200ms → 5000ms

The tech lead frowned and said, "Every query does this heavy JOIN—the database can't keep up. Can we do the JOIN work ahead of time to simplify the queries?"

Engineer's Reaction

The engineer's first thought: "Just JOIN the database right inside the streaming layer." Why this instinct?

This comes from our familiar web development mindset:

# Familiar Web API development flow
@app.route('/api/orders/<order_id>')
def get_order_detail(order_id):
    # Direct database JOIN - our most comfortable approach!
    result = db.query("""
        SELECT
            o.order_id,
            o.total_amount,
            c.customer_name,
            c.customer_type,
            p.product_name,
            p.price
        FROM orders o
        JOIN customers c ON o.customer_id = c.customer_id
        JOIN products p ON o.product_id = p.product_id
        WHERE o.order_id = %s
    """, order_id)
    return result

Mindshift: From HTTP Request to Stream Message

But in Stream Processing, we can't just write a SQL JOIN inside the stream engine. So, an engineer's brain naturally thinks:

"If I can't write a JOIN in the stream, I'll just query the database from within the stream processing and manually join the data."

Mindshift Process

Web Development (Database JOIN):

HTTP Request → One SQL Query (JOIN multiple tables) → Return complete data

Stream Development (Manual Lookup):

Stream Message → Multiple Database Queries → Application-layer combination → Output enriched data
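
To make the stream-side pattern concrete, here is a minimal sketch of that manual lookup loop. The db.query_one, consumer, and producer names are illustrative stand-ins, not a real API:

# Minimal sketch of a manual lookup join inside a stream processor
def enrich_order(message, db):
    """Enrich one stream message with customer data via a point query."""
    customer = db.query_one(
        "SELECT customer_name, customer_type FROM customers WHERE customer_id = %s",
        message["customer_id"],
    )
    # Application-layer combination: merge the lookup result into the message
    return {**message, **customer} if customer else message

def process_stream(consumer, db, producer):
    for message in consumer:                  # one stream message at a time
        enriched = enrich_order(message, db)  # one (or more) point queries per message
        producer.send(enriched)               # output enriched data downstream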

Core Flow: Understanding Lookup Join Mechanics

Let's break down SimpleDataFrame's lookup join implementation step by step.

Message Flow with Database Lookup
┌─────────────────────────────────────────────────────────────────┐
│                    Original Message                             │
│  ┌─────────────────────────────────────────────────────────┐    │
│  │ { "order_id": 1001, "customer_id": 123 }                │    │
│  └───────────────────────────┬─────────────────────────────┘    │
└──────────────────────────────┼──────────────────────────────────┘
                               │
                               ▼
┌─────────────────────────────────────────────────────────────────┐
│                Database Lookup Process                          │
│  ┌─────────────────────────────────────────────────────────┐    │
│  │ Step 1: Extract join_key = "customer_id" = 123          │    │
│  │ Step 2: Query DB "SELECT name, type FROM customers      │    │
│  │         WHERE customer_id = 123"                        │    │
│  │ Step 3: Get result {"name": "Alice", "type": "VIP"}     │    │
│  └─────────────────────────────────────────────────────────┘    │
└─────────────────────────┬───────────────────────────────────────┘
                          │
                          ▼
┌─────────────────────────────────────────────────────────────────┐
│                   Enriched Message                              │
│  ┌─────────────────────────────────────────────────────────┐    │
│  │ {                                                       │    │
│  │   "order_id": 1001,                                     │    │
│  │   "customer_id": 123,                                   │    │
│  │   "name": "Alice",        ← Added from DB               │    │
│  │   "type": "VIP"           ← Added from DB               │    │
│  │ }                                                       │    │
│  └─────────────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────────────┘

Architecture: Core Structure of the SimpleDataFrame Class

This SimpleDataFrame implementation focuses on showcasing the core concept of lookup join. It omits complex details needed for production, like connection pool management and error handling. It also simplifies core streaming features implemented previously (backpressure control, checkpointing, sink management) to focus on the database lookup logic.

class SimpleDataFrame:
    def __init__(self, name: str = "dataframe"):
        self.name = name
        # Core: Database connection - key for lookup join
        self._db_connection = None

    def process_message(self, message):
        """Baseline processing (filters, transforms, sinks) - simplified to a pass-through here"""
        return message

    def setup_database(self, connection_string: str):
        """Set up database connection"""
        # Establish connection (create_connection stands in for your DB driver)
        self._db_connection = create_connection(connection_string)

    def lookup_join(self, table_name: str, join_key: str, select_fields: list):
        """
        Core method: Add database lookup join

        Example: df.lookup_join("customers", "customer_id", ["name", "type"])
        """
        # Construct the lookup SQL once, up front
        fields_sql = ", ".join(select_fields)
        query_sql = f"SELECT {fields_sql} FROM {table_name} WHERE {join_key} = ?"

        # Key: Function Wrapping technique
        original_process = self.process_message

        def enhanced_process(message):
            # 1. Execute original processing logic first
            processed_message = original_process(message)
            # 2. Then perform database lookup to enrich data
            return self._perform_lookup(processed_message, query_sql, join_key)

        # 3. Replace the processing function - the core magic!
        self.process_message = enhanced_process
        return self

    def _perform_lookup(self, message, query_sql, join_key):
        """Execute database query and merge results"""
        join_value = message.get(join_key)
        if join_value is None:
            return message

        # Query database (execute_query stands in for your DB driver)
        result = execute_query(self._db_connection, query_sql, join_value)

        # Merge result into original message
        if result:
            return {**message, **result}  # Dictionary merge
        return message

Deep Dive: The Magic of Function Wrapping

The essence of the lookup_join method is function wrapping. Let's illustrate:

Function Wrapping Process
┌─────────────────────────────────────────────────────────────────┐
│                   Original DataFrame                            │
│  ┌─────────────────────────────────────────────────────────┐    │
│  │ process_message(msg) {                                  │    │
│  │   // Original processing logic                          │    │
│  │   1. Apply filters                                      │    │
│  │   2. Send to sinks                                      │    │
│  │   return success                                        │    │
│  │ }                                                       │    │
│  └─────────────────────────────────────────────────────────┘    │
└─────────────────────────┬───────────────────────────────────────┘
                          │ Function Wrapping
                          ▼
┌─────────────────────────────────────────────────────────────────┐
│                Enhanced DataFrame                               │
│  ┌─────────────────────────────────────────────────────────┐    │
│  │ enhanced_process_message(msg) {                         │    │
│  │   // Enhanced processing logic                          │    │
│  │   1. Call original_process(msg)                         │    │
│  │   2. Perform database lookup  ← NEW!                    │    │
│  │   3. Merge results into message ← NEW!                  │    │
│  │   return enriched_message                               │    │
│  │ }                                                       │    │
│  └─────────────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────────────┘

Key code snippet explained:

# 1. Save the original processing function
original_process = self.process_message

# 2. Define enhanced processing function
def enhanced_process(message):
    # First, execute the original processing flow (filtering, transformation, etc.)
    processed_message = original_process(message)
    # Then, perform lookup to enrich the processed message
    return self._perform_lookup(processed_message, query_sql, join_key)

# 3. Replace the processing function - This is the core.
self.process_message = enhanced_process
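
Because this is plain Python with no framework magic, the same trick works on a toy class you can run directly. Everything below is illustrative and not part of SimpleDataFrame:

class Toy:
    def process_message(self, msg):
        return {**msg, "filtered": True}            # stand-in for the original logic

    def lookup_join(self, extra):
        original_process = self.process_message     # 1. save the original function

        def enhanced_process(msg):
            processed = original_process(msg)        # 2. run the original first...
            return {**processed, **extra}            #    ...then enrich the result
        self.process_message = enhanced_process      # 3. swap the new function in
        return self

toy = Toy().lookup_join({"name": "Alice"}).lookup_join({"type": "VIP"})
print(toy.process_message({"order_id": 1001}))
# {'order_id': 1001, 'filtered': True, 'name': 'Alice', 'type': 'VIP'}

Each call to lookup_join wraps whatever process_message currently is, which is exactly why the chained calls in the later examples compose naturally.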

Complete Flow: Data Processing Trace

Complete Message Processing Flow
┌─────────────────────────────────────────────────────────────────┐
│ Step 1: Message Arrives                                         │
│ ┌─────────────────────────────────────────────────────────┐     │
│ │ {"order_id": 1001, "customer_id": 123, "amount": 85}    │     │
│ └─────────────────────────────────────────────────────────┘     │
└─────────────────────────┬───────────────────────────────────────┘
                          │
                          ▼
┌─────────────────────────────────────────────────────────────────┐
│ Step 2: Enhanced Process Message Called                         │
│ ┌─────────────────────────────────────────────────────────┐     │
│ │ enhanced_process_message(message)                       │     │
│ │ 1. First call original_process(message)                 │     │
│ │ 2. Apply existing filters, transformations              │     │
│ │ 3. Then perform database lookup                         │     │
│ │ 4. Extract customer_id = 123                            │     │
│ └─────────────────────────────────────────────────────────┘     │
└─────────────────────────┬───────────────────────────────────────┘
                          │
                          ▼
┌─────────────────────────────────────────────────────────────────┐
│ Step 3: Database Lookup Process                                 │
│ ┌─────────────────────────────────────────────────────────┐     │
│ │ SQL: SELECT name, type FROM customers WHERE id=123      │     │
│ │ Result: {"name": "Alice", "type": "VIP"}                │     │
│ │ Merge with processed message                            │     │
│ └─────────────────────────────────────────────────────────┘     │
└─────────────────────────┬───────────────────────────────────────┘
                          │
                          ▼
┌─────────────────────────────────────────────────────────────────┐
│ Step 4: Final Enriched Message                                  │
│ ┌─────────────────────────────────────────────────────────┐     │
│ │ {                                                       │     │
│ │   "order_id": 1001,                                     │     │
│ │   "customer_id": 123,                                   │     │
│ │   "amount": 85,                                         │     │
│ │   "name": "Alice",     ← Added by lookup                │     │
│ │   "type": "VIP"        ← Added by lookup                │     │
│ │ }                                                       │     │
│ └─────────────────────────────────────────────────────────┘     │
└─────────────────────────────────────────────────────────────────┘

Practical Use Case

Imagine a simple order processing system with the following scenario:

Data Sources:

  • Kafka topic orders: Basic order info {"order_id": 1001, "customer_id": 123}

  • Kafka topic customers: Customer data stream {"customer_id": 123, "name": "Alice", "type": "VIP"}

  • PostgreSQL customers table: Populated by another streaming pipeline from the customers topic

  • Goal: Enrich order stream with customer info in real-time, producing complete order data.

System Architecture:

Kafka customers topic ──▶ Streaming Pipeline A ──▶ PostgreSQL customers table
                                                               ▲
                                                               │ lookup
Kafka orders topic ──▶ Streaming Pipeline B ───────────────────┘
                                │
                                ▼
                      enriched orders output

Implementation Steps

Pipeline A: Customer Data Sync (runs in background)

# Sync customer data from Kafka to PostgreSQL
sync_engine = SimpleStreamingEngine("customer-sync")
customer_df = sync_engine.dataframe(source=customers_kafka_source)
customer_df.sink(postgres_customers_table)  # Continuously syncs customer data

Pipeline B: Order Enrichment (main logic)

# Order enrichment streaming pipeline
enrichment_engine = SimpleStreamingEngine("order-enrichment")

# Create DataFrame and set up database
df = enrichment_engine.dataframe(source=orders_kafka_source)
df.setup_database("postgresql://localhost/orders_db")

# Perform lookup join
enriched_df = df.lookup_join("customers", "customer_id", ["name", "type", "email"])

# Output to destination
enriched_df.sink(output_sink)

Execution Flow

When the system runs, for each new order message:

  1. Original message enters: {"order_id": 1001, "customer_id": 123}

  2. Perform Lookup:

     SELECT name, type, email FROM customers WHERE customer_id = 123
    
  3. Merge Data:

     {
       "order_id": 1001,
       "customer_id": 123,
       "name": "Alice Chen",      ← From database
       "type": "VIP",             ← From database
       "email": "alice@email.com" ← From database
     }
    
  4. Output enriched result: Complete order data is sent to downstream systems.

Core Advantages

Developer Experience: As simple as writing a SQL JOIN, but executed in Streaming.

Immediate Usability: No need to change existing database structure.

Flexible Scaling: Easily add multiple lookup joins.

# Advanced example: Multiple lookup joins
(df.lookup_join("customers", "customer_id", ["name", "type"])
   .lookup_join("products", "product_id", ["product_name", "price"])
   .filter(lambda msg: msg.get("type") == "VIP")
   .sink(output_sink))

This concise API design lets developers use familiar chaining syntax to quickly build complex data enrichment pipelines.

Real-World Challenges

1. Increased Latency

Latency Analysis:

  • Pure stream processing: ~1ms

  • Single database lookup: ~50ms

In high-throughput scenarios, this added latency might be unacceptable, though many streaming pipelines can tolerate latencies on the order of seconds.
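
A back-of-envelope calculation makes the gap concrete. Assuming the rough 1ms / 50ms figures above and strictly sequential processing in a single worker (no caching, batching, or async I/O), the per-worker throughput ceiling drops sharply:

# Rough per-worker throughput estimate (illustrative numbers from above)
pure_stream_ms = 1        # ~1ms of in-memory processing per message
db_lookup_ms = 50         # ~50ms per database point query

without_lookup = 1000 / pure_stream_ms                 # ~1000 messages/second
with_lookup = 1000 / (pure_stream_ms + db_lookup_ms)   # ~19.6 messages/second

print(f"{without_lookup:.0f} msg/s vs {with_lookup:.1f} msg/s")
# 1000 msg/s vs 19.6 msg/s  ->  a single lookup cuts per-worker throughput ~50x

In practice, teams often reach for caching or batched lookups to claw this back, but both widen the consistency window discussed next.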

2. Data Consistency Window

This is the trickiest problem with streaming lookup join: When reference data in the database updates, there is a time window where newly arriving messages will still fetch the old data.

Let's illustrate this problem with a concrete scenario:

Data Consistency Gap Timeline
┌─────────────────────────────────────────────────────────────────┐
│ T0: Database State                                              │
│ ┌─────────────────────────────────────────────────────────┐     │
│ │ products table: {"product_id": 123, "price": 85}        │     │
│ └─────────────────────────────────────────────────────────┘     │
└─────────────────────────┬───────────────────────────────────────┘
                          │ Price Update Event
                          ▼
┌─────────────────────────────────────────────────────────────────┐
│ T1: Price Update (takes 500ms to propagate)                     │
│ ┌─────────────────────────────────────────────────────────┐     │
│ │ products table: {"product_id": 123, "price": 90}        │     │
│ │ Status: Update in progress...                           │     │
│ └─────────────────────────────────────────────────────────┘     │
└─────────────────────────┬───────────────────────────────────────┘
                          │ Order arrives during update window
                          ▼
┌─────────────────────────────────────────────────────────────────┐
│ T1.1: Order Processing (within consistency gap)                 │
│ ┌─────────────────────────────────────────────────────────┐     │
│ │ Incoming: {"order_id": 1001, "product_id": 123}         │     │
│ │ Lookup result: price = 85  ← Wrong! Should be 90        │     │
│ │ Output: Wrong pricing in enriched order                 │     │
│ └─────────────────────────────────────────────────────────┘     │
└─────────────────────────┬───────────────────────────────────────┘
                          │ Update completes
                          ▼
┌─────────────────────────────────────────────────────────────────┐
│ T2: Consistency Restored                                        │
│ ┌─────────────────────────────────────────────────────────┐     │
│ │ All new orders will get correct price = 90              │     │
│ │ But orders processed between T1 and T2 have wrong data  │     │
│ └─────────────────────────────────────────────────────────┘     │
└─────────────────────────────────────────────────────────────────┘

Common Solutions

1. Accept Eventual Consistency

  • Suitable for: Analytical scenarios with low real-time requirements

  • Approach: Periodically correct historical data (a rough sketch follows below)
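
One possible shape for that periodic correction, assuming the enriched results land in an enriched_orders table that can be re-joined against the current customers state (the db wrapper, table, and column names are all illustrative):

# Sketch: periodic correction job that re-joins recent orders against
# the current reference data and overwrites any stale enrichment fields
def correct_recent_orders(db):
    db.execute("""
        UPDATE enriched_orders eo
        SET customer_name = c.customer_name,
            customer_type = c.customer_type
        FROM customers c
        WHERE eo.customer_id = c.customer_id
          AND eo.processed_at >= NOW() - INTERVAL '1 hour'
    """)

# Run it on a schedule (cron, Airflow, etc.) so stale windows are eventually repaired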

2. Bi-Directional Trigger Mechanism (Delta Join)

# Not only does the order stream trigger lookups; product price updates
# also trigger recalculation
price_update_stream.process(lambda msg:
    reprocess_recent_orders(msg.product_id, msg.new_price))
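
The reprocess_recent_orders hook above is left undefined; here is a rough sketch of what it could do, assuming enriched orders are kept in a store that can be queried by product_id (enriched_store and output_sink are illustrative assumptions):

# Sketch: correction hook triggered by a price-update event
def reprocess_recent_orders(product_id, new_price, window_minutes=10):
    # Only revisit a short recent window; older records are left to batch correction
    affected = enriched_store.find_recent(product_id=product_id,
                                          minutes=window_minutes)
    for order in affected:
        corrected = {**order, "price": new_price}   # re-apply the join with the fresh value
        output_sink.send(corrected)                 # emit the corrected record downstream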

This challenge reminds us: while streaming lookup join is convenient to build, scenarios with strict data consistency requirements need careful design around this consistency window.

Summary

This time, we started from the requirement of "the owner wants more details" and worked our way to the practical problem of "the JOIN is too slow."

We adopted the Database Lookup Join solution, performing real-time database queries within the Streaming Pipeline to enrich data, achieving the goal of "flattening" the data before query time.

Advantages of Database Lookup Join

Excellent Experience: Intuitive syntax, as simple as writing SQL JOIN.

Immediately Usable: No need to modify existing database structures—write today, deploy today.

Highly Flexible: Easily add multiple lookups, supporting complex data enrichment.

Simple Maintenance: Logic is concentrated in the streaming pipeline, easy to debug and monitor.

Challenges of Database Lookup Join

Increased Latency: Lookups are convenient, but each piece of data requires a database query. For high-throughput systems, latency can easily accumulate.

Data Consistency Window: Lookups depend on the current database state. Delays in data updates are directly reflected in the results, and can even lead to incorrect information.

These challenges make us start thinking: "Since Database Lookup has latency and consistency issues, can we move the JOIN entirely to the Stream layer? Let two data streams JOIN directly in memory, completely avoiding the database bottleneck?"

Day11 Preview: Kappa Architecture, All on Streaming!

From this experience, a bold idea flashed in the tech lead's mind:

"What if all data transformations, aggregations, and JOINs were completed within the Streaming layer? Wouldn't our queries then be blazing fast?"

And so, a quiet evolution from Lambda architecture to Kappa architecture began.
