In the previous article, we introduced the design principles and implementation of the Lambda Architecture.
We talked about its three layers:
Batch Layer – slow but precise; processes full historical data to ensure eventual consistency
Speed Layer – handles incremental data in real time
Serving Layer – merges results from both layers and provides a unified query interface
Starting today, we’ll dive into the implementation of the Speed Layer.
We’ll begin by writing a Kafka Consumer and gradually explore the core mechanisms of stream processing.
In today’s article, we’ll write a Kafka Consumer — a key component of the Speed Layer — responsible for instantly receiving order data and writing it into the Serving DB (while the Batch Layer focuses on processing full historical data each day).
⚠️ Important
All the code in this article is for educational and conceptual demonstration only (pseudo code).
It’s designed to help you understand and solve real-world problems, but it’s not production-ready.
Focus on grasping the overall architecture and design logic — you can skim over low-level code details.
Step 1: Speed Layer Kafka Consumer: Ingesting Orders into the Serving DB
Let’s start with the simplest version:
from kafka import KafkaConsumer
import json

# Subscribe to the orders topic
consumer = KafkaConsumer('orders')
print("[Speed Layer] Waiting for fresh orders...")

for message in consumer:
    order = json.loads(message.value.decode('utf-8'))
    insert_db(order)  # write the order into the Serving DB and commit (sketched below)
    print(f"[Speed Layer] Inserted order {order['id']}")
This logic is straightforward:
Kafka provides a real-time stream of order data
The Consumer consumes and processes the stream
The Serving DB stores the processed results
Core task: write incoming data into storage as fast as possible
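Since insert_db is only pseudocode here, a minimal sketch of what it might look like, assuming psycopg2 and an orders_realtime table in the Serving DB (the connection details and column names are illustrative, not from a real deployment):

import psycopg2

# Illustrative connection details -- replace with your own Serving DB settings
conn = psycopg2.connect(host="localhost", dbname="serving_db",
                        user="postgres", password="postgres")

def insert_db(order):
    # Write one order into the real-time table and commit immediately
    with conn.cursor() as cur:
        cur.execute(
            "INSERT INTO orders_realtime (id, status, amount) VALUES (%s, %s, %s)",
            (order['id'], order['status'], order.get('amount')),
        )
    conn.commit()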
Step 2: Designing the Serving DB
Here we have two key tables:
orders_batch_summary – pre-aggregated historical data computed daily by the Batch Layer
orders_realtime – detailed real-time orders written by the Speed Layer
When querying from the dashboard, the system merges both tables while filtering out invalid orders with status = 'removed':
SELECT status, SUM(count) AS total
FROM (
    SELECT status, count
    FROM orders_batch_summary
    WHERE status != 'removed'
    UNION ALL
    SELECT status, COUNT(*) AS count
    FROM orders_realtime
    WHERE status != 'removed'
    GROUP BY status
) t
GROUP BY status;
However, as the company grows, more Consumers are written independently by different developers, each with their own logic. Over time, the codebase becomes tangled and unmaintainable.
We need to refactor — to establish a unified Stream Processing architecture so everyone can build upon a consistent framework.
Designing a Source Abstraction Layer
In a team project, different developers may implement Kafka Consumers in different ways, which makes integration harder.
The solution is to define a unified Source Interface.
Source Architecture Design
┌─────────────┐
│ BaseSource │ ◄── Abstract Interface
│ │
│ + run() │
└─────────────┘
△
│ implements
┌─────────────┐
│KafkaSource │ ◄── Concrete Implementation
│ │
│ + run() │
└─────────────┘
Step-by-Step Explanation of the Source Core
Step 1: Define the BaseSource Abstract Interface
from abc import ABC, abstractmethod

class BaseSource(ABC):
    def __init__(self, name: str):
        self.name = name

    @abstractmethod
    def run(self):
        pass
Key Points:
Each Source must have a unique name
run() is abstract — forcing subclasses to implement it
Step 2: Initialize SimpleKafkaSource
class SimpleKafkaSource(BaseSource):
    def __init__(self, name: str, topic: str, broker_address: str = "localhost:9092"):
        super().__init__(name)
        self.topic = topic
        self.broker_address = broker_address
        self.consumer = None
        self.message_handler = self._default_handler
Key Points:
Inherits from BaseSource to conform to a unified interface
message_handler is replaceable, offering flexibility in message processing logic
Step 3: Set Up the Kafka Consumer
def _setup_consumer(self):
    self.consumer = KafkaConsumer(
        self.topic,
        bootstrap_servers=self.broker_address,
        group_id=f"simple-source-{self.name}",
        auto_offset_reset='latest',
        value_deserializer=lambda m: json.loads(m.decode('utf-8')) if m else None
    )
Key Points:
group_id is automatically generated to avoid conflicts
auto_offset_reset='latest' ensures consumption starts from the latest messages
Automatic JSON deserialization
Step 4: Core Runtime Logic
def run(self):
    self._setup_consumer()            # initialize Consumer
    for message in self.consumer:     # continuously listen for messages
        self.message_handler({
            'key': message.key,
            'value': message.value,
            'topic': message.topic,
            'offset': message.offset
        })
Execution Flow:
Initialize Kafka Consumer
Continuously read messages from the Topic
Wrap messages in a standard format
Call message_handler for processing
Key Points:
The Source only handles data ingestion; message processing logic is injected externally through message_handler, enabling high flexibility.
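For example, a caller can inject its own handler before starting the source. A minimal sketch, assuming an orders topic on a local broker (the handler and topic name are illustrative):

def print_order(message):
    # message is the dict built inside SimpleKafkaSource.run()
    print(f"Received order {message['value']} (offset {message['offset']})")

source = SimpleKafkaSource(name="orders-source", topic="orders")
source.message_handler = print_order  # inject custom processing logic
source.run()                          # blocks, forwarding every Kafka message to print_order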
Complete Source Implementation
import logging
import json
from abc import ABC, abstractmethod
from typing import Optional, Callable, Any
from kafka import KafkaConsumer

logger = logging.getLogger(__name__)


class BaseSource(ABC):
    """Base abstract class for all Sources"""

    def __init__(self, name: str):
        self.name = name
        self._running = False

    @abstractmethod
    def run(self):
        """Main execution method"""
        pass

    def stop(self):
        """Stop the Source"""
        self._running = False
        logger.info(f"Source {self.name} stopped")


class SimpleKafkaSource(BaseSource):
    """Simple Kafka Source implementation"""

    def __init__(
        self,
        name: str,
        topic: str,
        broker_address: str = "localhost:9092",
        consumer_group: Optional[str] = None,
        message_handler: Optional[Callable[[Any], None]] = None
    ):
        super().__init__(name)
        self.topic = topic
        self.broker_address = broker_address
        self.consumer_group = consumer_group or f"simple-source-{name}"
        self.message_handler = message_handler or self._default_handler
        self.consumer: Optional[KafkaConsumer] = None

    def _default_handler(self, message):
        print(f"[{self.name}] Received message: {message}")

    def _setup_consumer(self):
        try:
            self.consumer = KafkaConsumer(
                self.topic,
                bootstrap_servers=self.broker_address,
                group_id=self.consumer_group,
                auto_offset_reset='latest',
                value_deserializer=lambda m: json.loads(m.decode('utf-8')) if m else None,
                key_deserializer=lambda m: m.decode('utf-8') if m else None
            )
            logger.info(f"Consumer setup for topic: {self.topic}, group: {self.consumer_group}")
        except Exception as e:
            logger.error(f"Failed to setup consumer: {e}")
            raise

    def run(self):
        logger.info(f"Starting Source {self.name} for topic {self.topic}")
        self._setup_consumer()
        self._running = True
        try:
            while self._running:
                message_batch = self.consumer.poll(timeout_ms=1000)
                for topic_partition, messages in message_batch.items():
                    for message in messages:
                        if not self._running:
                            break
                        try:
                            self.message_handler({
                                'key': message.key,
                                'value': message.value,
                                'topic': message.topic,
                                'partition': message.partition,
                                'offset': message.offset,
                                'timestamp': message.timestamp
                            })
                        except Exception as e:
                            logger.error(f"Error processing message: {e}")
        except KeyboardInterrupt:
            logger.info("Received interrupt signal")
        except Exception as e:
            logger.error(f"Error in run loop: {e}")
        finally:
            if self.consumer:
                self.consumer.close()
            logger.info(f"Source {self.name} finished")

    def stop(self):
        super().stop()
        if self.consumer:
            self.consumer.close()
Designing the Sink Abstraction Layer
In the Speed Layer architecture, Sources handle data input, while Sinks handle data output.
To avoid inconsistent implementations, we define a unified Sink interface.
Sink Architecture Design
┌─────────────┐
│ BaseSink │ ◄── Abstract Interface
│ │
│ + write() │
└─────────────┘
△
│ implements
┌──────────────────┐
│SimplePostgreSQL │ ◄── Concrete Implementation
│Sink │
│ + write() │
└──────────────────┘
Step-by-Step Explanation of the Sink Core
Step 1: Define the Base Sink Interface
from abc import ABC, abstractmethod

class BaseSink(ABC):
    def __init__(self, name: str):
        self.name = name

    @abstractmethod
    def write(self, message):
        pass

    def setup(self):
        pass  # default no-op
Key Points:
Each Sink must have a unique name
write() handles actual data writes
setup() is optional and can be overridden
Step 2: Initialize the Simple PostgreSQL Sink
class SimplePostgreSQLSink(BaseSink):
    def __init__(self, name: str, host: str, dbname: str, table_name: str):
        super().__init__(name)
        self.host = host
        self.dbname = dbname
        self.table_name = table_name
        self.connection = None
Key Points:
Inherits from BaseSink for interface consistency
Stores database connection information
Lazy connection initialization (connection = None)
Step 3: Core Logic of write()
def write(self, message):
    # Automatically detect columns and insert into DB
    data = message.get('value', {})
    # ... dynamically generate SQL and insert
Key Points:
Automatically detects field structures in message['value'], dynamically generates INSERT SQL, and writes to the database.
This allows the Sink to adapt to various data schemas automatically.
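As a rough sketch of the elided part, the dynamic INSERT could be built with psycopg2's sql module, assuming every key in message['value'] maps to a column of the target table:

from psycopg2 import sql

def write(self, message):
    data = message.get('value', {})
    if not data:
        return
    columns = list(data.keys())
    # Build: INSERT INTO <table> (col1, col2, ...) VALUES (%s, %s, ...)
    query = sql.SQL("INSERT INTO {table} ({cols}) VALUES ({vals})").format(
        table=sql.Identifier(self.table_name),
        cols=sql.SQL(", ").join(sql.Identifier(c) for c in columns),
        vals=sql.SQL(", ").join(sql.Placeholder() for _ in columns),
    )
    with self.connection.cursor() as cur:
        cur.execute(query, [data[c] for c in columns])
    self.connection.commit()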
Complete Sink Implementation
import logging
from abc import ABC, abstractmethod
from typing import Any, Dict

try:
    import psycopg2
    from psycopg2.extras import Json
    from psycopg2 import sql
except ImportError:
    psycopg2 = None
    print("Warning: psycopg2 not installed. Run: pip install psycopg2-binary")

logger = logging.getLogger(__name__)


class BaseSink(ABC):
    """Base abstract class for all Sinks"""

    def __init__(self, name: str):
        self.name = name

    @abstractmethod
    def write(self, message: Dict[str, Any]):
        """Write a message"""
        pass

    def setup(self):
        """Setup connection"""
        pass

    def close(self):
        """Close connection"""
        pass


class SimplePostgreSQLSink(BaseSink):
    """PostgreSQL Sink with automatic schema detection"""

    def __init__(
        self,
        name: str,
        host: str,
        port: int,
        dbname: str,
        user: str,
        password: str,
        table_name: str
    ):
        super().__init__(name)
        self.host = host
        self.port = port
        self.dbname = dbname
        self.user = user
        self.password = password
        self.table_name = table_name
        self.connection = None

    def setup(self):
        if psycopg2 is None:
            raise ImportError("psycopg2 is required")
        self.connection = psycopg2.connect(
            host=self.host,
            port=self.port,
            dbname=self.dbname,
            user=self.user,
            password=self.password
        )
        logger.info(f"Connected to PostgreSQL: {self.host}:{self.port}/{self.dbname}")

    def write(self, message: Dict[str, Any]):
        """Automatically detect fields and write to PostgreSQL"""
        data = message.get('value', {})
        # ... dynamic field detection and SQL execution logic

    def close(self):
        if self.connection:
            self.connection.close()
            logger.info("PostgreSQL connection closed")
Simple Streaming Engine: Unified Management Layer
Between the Source (data input) and the Sink (data output), we need a unified management layer to handle orchestration, monitoring, and lifecycle management.
This component is the SimpleStreamingEngine.
Simple Streaming Engine Architecture Design
┌─────────────────────┐
│SimpleStreamingEngine│ ◄── Central Manager
│ │
│ +add_source() │
│ +add_sink() │
│ + run() │
└─────────────────────┘
│
│ manages
▼
┌──────────────┐ ┌──────────────┐
│ Source │───▶│ Sink │
│ │ │ │
│ KafkaSource │ │PostgreSQLSink│
└──────────────┘ └──────────────┘
Step-by-Step Breakdown of the Simple Streaming Engine Core Code
Step 1: Initializing the Simple Streaming Engine
class SimpleStreamingEngine:
    def __init__(self, name: str = "simple-streaming-app"):
        self.name = name
        self._sources = []  # List of Sources
        self._sinks = []    # List of Sinks
Key Points:
The SimpleStreamingEngine manages two lists: Sources and Sinks.
It provides unified registration interfaces for both.
Step 2: Registering Sources and Sinks
def add_source(self, source: BaseSource):
    self._sources.append(source)

def add_sink(self, sink: BaseSink):
    self._sinks.append(sink)
Key Points:
Simple list-based management supporting multiple sources and sinks.
Follows a unified interface — any class implementing BaseSource or BaseSink can be registered.
Step 3: Core Execution Logic
def run(self):
    # Initialize all sinks
    for sink in self._sinks:
        sink.setup()
    # Set message handler for each source
    for source in self._sources:
        source.message_handler = self._create_message_handler()
        source.run()  # Start consuming data
Execution Flow:
Initialize connections for all sinks.
Assign message handlers to sources.
Start all sources to begin consuming data.
Step 4: Message Handler Core Logic
def _create_message_handler(self):
    def handler(message):
        # Forward message to all sinks
        for sink in self._sinks:
            sink.write(message)
    return handler
Detailed Data Flow Explanation:
When SimpleStreamingEngine starts:
# Inside SimpleStreamingEngine.run()
for source in self._sources:
    source.message_handler = self._create_message_handler()  # Inject handler
    source.run()  # Start source

When Source receives data:

# Inside SimpleKafkaSource.run()
for message in self.consumer:  # Fetch messages from Kafka
    formatted_message = {
        'key': message.key,
        'value': message.value
    }
    self.message_handler(formatted_message)  # Call injected handler

When handler forwards messages:

# Handler returned by _create_message_handler()
def handler(message):
    # message = formatted data from source
    for sink in self._sinks:
        sink.write(message)
Overall Data Flow:
Kafka → Source.run() → message_handler() → Sink.write()
Key Point:
The SimpleStreamingEngine uses function injection so that sources are unaware of sinks, achieving complete decoupling between components.
Why Do We Need the Simple Streaming Engine?
Decoupled Design: Sources and sinks are fully independent and interchangeable.
Scalability: Supports multiple sinks simultaneously (e.g., PostgreSQL + Elasticsearch).
Unified Management: Offers a consistent API for registration and execution.
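To illustrate the scalability point, any class that implements BaseSink can be registered as an extra destination. The ConsoleSink below is a made-up debugging sink, not part of the framework above:

class ConsoleSink(BaseSink):
    """Hypothetical second sink that just prints every message (handy for debugging)."""
    def write(self, message):
        print(f"[{self.name}] {message['value']}")

# Registered next to the PostgreSQL sink, both receive every message:
#   engine.add_sink(pg_sink)
#   engine.add_sink(ConsoleSink("debug"))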
Complete Simple Streaming Engine Code
import logging
from typing import List
from .source import BaseSource
from .sink import BaseSink

logger = logging.getLogger(__name__)


class SimpleStreamingEngine:
    """
    A minimal streaming processing engine
    """

    def __init__(self, name: str = "simple-streaming-engine"):
        self.name = name
        self._sources: List[BaseSource] = []  # List of sources
        self._sinks: List[BaseSink] = []      # List of sinks

    def add_source(self, source: BaseSource):
        """
        Register a source with the streaming engine
        """
        self._sources.append(source)

    def add_sink(self, sink: BaseSink):
        """
        Register a sink with the streaming engine
        """
        self._sinks.append(sink)

    def run(self):
        """
        Start the streaming engine and process data streams
        """
        # Initialize all sinks
        for sink in self._sinks:
            sink.setup()
        # Assign message handler to each source
        for source in self._sources:
            source.message_handler = self._create_message_handler()
            source.run()  # Start consuming data

    def _create_message_handler(self):
        """
        Create a message handler that dispatches data to all sinks
        """
        def handler(message):
            for sink in self._sinks:
                sink.write(message)
        return handler
Putting It All Together: Automatic Data Flow in Action
At this point, we have:
Source: Ingest data from Kafka
Sink: Deliver data into PostgreSQL
SimpleStreamingEngine: Connects, manages, and orchestrates both
Once assembled, we can kick off the automatic end-to-end data flow.
# 1. Create the SimpleStreamingEngine
engine = SimpleStreamingEngine(...)
# 2. Create a Kafka Source
orders_source = SimpleKafkaSource(...)
# 3. Create a PostgreSQL Sink
pg_sink = SimplePostgreSQLSink(...)
# 4. Assemble and start
engine.add_source(orders_source)
engine.add_sink(pg_sink)
engine.run() # Start processing: Kafka → PostgreSQL
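For reference, here is the same wiring with the placeholders filled in; the broker address, database credentials, and table name are illustrative values, not from a real deployment:

engine = SimpleStreamingEngine(name="speed-layer")

orders_source = SimpleKafkaSource(
    name="orders-source",
    topic="orders",
    broker_address="localhost:9092",
)

pg_sink = SimplePostgreSQLSink(
    name="orders-sink",
    host="localhost",
    port=5432,
    dbname="serving_db",
    user="postgres",
    password="postgres",
    table_name="orders_realtime",
)

engine.add_source(orders_source)
engine.add_sink(pg_sink)
engine.run()  # Start processing: Kafka → PostgreSQL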
Summary
In this section, we explored the core implementation of the Speed Layer:
The Batch Layer provides reliable historical data processing.
The Speed Layer delivers real-time responsiveness and streaming data handling.
Without the Speed Layer, true real-time capability in the Lambda Architecture is impossible.
Through the Source–Sink–SimpleStreamingEngine architecture, we built:
A unified data processing interface
A scalable stream processing framework
A fully functional Speed Layer implementation
Day 5 Preview: Tackling Performance Bottlenecks
At first, the system runs smoothly — the consumer processes order data without issue.
But under heavy traffic:
The console starts showing latency warnings.
The consumer’s processing power hits its limit.
Orders begin to queue up for processing.
In the next part, we’ll explore Speed Layer performance challenges and how to optimize for high-throughput workloads.

