In the previous article, we built the basic Speed Layer architecture with three core components — Source, Sink, and a Simple Streaming Engine.
At first, everything worked perfectly: the Consumer processed order data smoothly, and the system appeared to be running just fine.
However, when the system faced a real challenge, such as the traffic of a Double 11 shopping festival, problems began to surface:
The Consumer reached its processing limit.
Messages started piling up in the queue, waiting to be consumed.
That’s when we realized performance bottlenecks are a core challenge every stream processing system must confront.
Common Consumer Performance Bottlenecks
Based on our hands-on experience, we’ve identified the four most common performance bottlenecks in consumers. Of course, there are other less frequent issues, such as GC pressure, memory leaks, or network misconfiguration, but the following four are the most significant and commonly encountered.
1. Insufficient database write throughput
Problem: Each message triggers a separate insert followed by a commit, overwhelming the database with a large number of small transactions.
Impact: The database becomes busy handling tiny commits and cannot take advantage of batch-processing efficiencies.
Solution: Implement batch inserts to accumulate hundreds of records and commit them together. This dramatically reduces the number of transactions and improves throughput.
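As a minimal sketch of the idea, assuming a psycopg2 connection and a hypothetical orders table with three columns, the consumer can buffer records and commit once per batch instead of once per message (we return to batch writing in depth in the next article):

# Minimal sketch: buffer records and commit once per batch
# Assumes psycopg2 and a hypothetical "orders" table; not the article's final implementation.
import psycopg2

conn = psycopg2.connect("dbname=orders_db")  # hypothetical DSN
BATCH_SIZE = 500
buffer = []

def handle_message(record):
    buffer.append((record["order_id"], record["status"], record["amount"]))
    if len(buffer) >= BATCH_SIZE:
        with conn.cursor() as cur:
            cur.executemany(
                "INSERT INTO orders (order_id, status, amount) VALUES (%s, %s, %s)",
                buffer,
            )
        conn.commit()  # one commit per batch instead of one per message
        buffer.clear()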
2. JSON parsing and serialization overhead
Problem: Each message requires JSON deserialization.
Impact: Heavy JSON parsing consumes a significant portion of CPU resources, lowering overall efficiency.
Solution:
Use a more efficient JSON library (e.g., orjson).
Consider switching to binary formats (Avro, Protobuf) to reduce parsing overhead.
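As a sketch of the first option, assuming messages arrive from Kafka as raw bytes, orjson can serve as a near drop-in replacement for the standard json module:

# Sketch: parse message payloads with orjson instead of the standard json module
import orjson  # third-party library: pip install orjson

def decode(raw: bytes) -> dict:
    # orjson.loads accepts bytes directly, avoiding an intermediate str decode
    return orjson.loads(raw)

record = decode(b'{"order_id": 123, "status": "paid"}')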
3. Insufficient Kafka partitions
Problem: The topic’s partition count is too low, limiting the consumer’s parallelism.
Impact: Adding more consumers beyond the partition count does not raise throughput, because within a consumer group each partition is consumed by at most one consumer.
Solution: Increase the number of partitions to unlock horizontal scalability.
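For illustration, partitions can be added with the Kafka CLI or programmatically; below is a hedged sketch using the confluent-kafka AdminClient, assuming a broker at localhost:9092 and an orders topic. Note that Kafka only allows increasing the partition count, and adding partitions changes how keys map to partitions.

# Sketch: raise the partition count of the "orders" topic
# Assumes the confluent-kafka package and a local broker.
from confluent_kafka.admin import AdminClient, NewPartitions

admin = AdminClient({"bootstrap.servers": "localhost:9092"})
# NewPartitions takes the new total partition count, not a delta
futures = admin.create_partitions([NewPartitions("orders", 12)])
futures["orders"].result()  # block until the broker confirms (raises on failure)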
4. Network latency
Problem: Network distance between the broker, consumer, and database introduces additional latency.
Impact: Each network round trip adds latency to message processing.
Solution: Deploy related components as close to each other as possible, ideally within the same network zone.
Tips
In practice, issues #2 and #3 are relatively easy to fix, and #4 is rarely the major bottleneck in most enterprise environments.
The real troublemaker is often #1 — database write performance, which we’ll explore in depth next.
Optimization Strategy: Filter Before Writing
Before addressing database bottlenecks directly, it helps to start from another angle — reducing the volume of data to be processed. Filter out unnecessary data before it hits the database. Only process messages that provide real business value.
For example, if an order has a status of 'removed' and carries no meaningful information, we can filter it out at the Consumer stage to save downstream resources.
Note: All code in this article is pseudo-code for educational and conceptual illustration. Focus on understanding the architecture and logic, not on running the exact code.
Designing the Data Frame Abstraction Layer
To enable flexible data filtering, we introduce a Data Frame abstraction between the Source and Sink.
This layer handles data transformation and filtering.
Data Frame Architecture
Source → DataFrame → Sink
(filtering, transformation)
Concepts:
Filtering: Supports custom filter functions
Chaining: Allows multiple filters in sequence
Routing: Sends filtered data to specific Sinks
Step-by-Step: Simple Data Frame Implementation
1. Initialization
class SimpleDataFrame:
    def __init__(self, name: str = "dataframe"):
        self.name = name
        self._filters = []  # list of filter functions
        self._sinks = []    # list of sinks
Concepts:
The Simple Data Frame manages two key lists: filters and Sinks.
It supports method chaining, allowing multiple filter conditions to be linked together seamlessly.
2. The filter Method
def filter(self, func) -> 'SimpleDataFrame':
    # Create a new DataFrame instance to support method chaining
    new_df = SimpleDataFrame(f"{self.name}_filtered")
    new_df._filters = self._filters.copy()  # Copy existing filters
    new_df._filters.append(func)            # Add the new filter
    new_df._sinks = self._sinks.copy()      # Copy existing sinks
    return new_df
Concepts:
Each filter call creates a new Data Frame instance.
Existing filters are preserved, and the new filter condition is added.
Supports method chaining: df.filter(...).filter(...).
3. The Sink Method
def sink(self, sink: BaseSink) -> 'SimpleDataFrame':
    self._sinks.append(sink)
    return self  # Return self to support method chaining
Concept: Sends the processed data to the specified Sink.
4. The process_message Method
def process_message(self, message) -> bool:
    # Check all filters in sequence
    for filter_func in self._filters:
        if not filter_func(message):
            return False  # Filtered out
    # If the message passes all filters, send it to all sinks
    for sink in self._sinks:
        sink.write(message)
    return True
Concepts:
Each message passes through all filters in order.
If any filter returns False, the message is dropped.
Only messages that pass every filter are sent to all registered Sinks.
Complete Simple Data Frame Code
# A simple DataFrame class supporting filter operations
import logging
from typing import Any, Dict, Callable, List

from .sink import BaseSink

logger = logging.getLogger(__name__)


class SimpleDataFrame:
    """
    A simple DataFrame class that supports basic stream processing operations.
    """

    def __init__(self, name: str = "dataframe"):
        self.name = name
        self._filters: List[Callable[[Dict[str, Any]], bool]] = []  # List of filter functions
        self._sinks: List[BaseSink] = []  # List of sinks

    def filter(self, func: Callable[[Dict[str, Any]], bool]) -> 'SimpleDataFrame':
        """
        Add a filter condition.

        :param func: A filter function that takes a message dictionary and returns True if it passes the filter.
        :return: A new SimpleDataFrame instance to support method chaining.
        """
        # Create a new DataFrame instance to support method chaining
        new_df = SimpleDataFrame(f"{self.name}_filtered")
        new_df._filters = self._filters.copy()  # Copy existing filters
        new_df._filters.append(func)            # Add the new filter
        new_df._sinks = self._sinks.copy()      # Copy existing sinks
        logger.debug(f"Added filter to {self.name}")
        return new_df

    def __getitem__(self, condition: Callable[[Dict[str, Any]], bool]) -> 'SimpleDataFrame':
        """
        Add a filter condition using the [] syntax, equivalent to the filter() method.

        :param condition: A filter function that takes a message dictionary and returns True if it passes the filter.
        :return: A new SimpleDataFrame instance to support method chaining.
        """
        return self.filter(condition)

    def sink(self, sink: BaseSink) -> 'SimpleDataFrame':
        """
        Add a Sink.

        :param sink: A Sink instance.
        :return: The same SimpleDataFrame instance to support method chaining.
        """
        self._sinks.append(sink)
        logger.debug(f"Added sink {sink.name} to {self.name}")
        return self

    def process_message(self, message: Dict[str, Any]) -> bool:
        """
        Process a single message: apply all filters, and if it passes, send it to all sinks.

        :param message: The message dictionary.
        :return: True if the message passed all filters.
        """
        # Apply all filters in sequence
        for filter_func in self._filters:
            try:
                if not filter_func(message):
                    logger.debug("Message filtered out")
                    return False
            except Exception as e:
                logger.error(f"Error in filter: {e}")
                return False

        # If the message passes all filters, send it to all sinks
        for sink in self._sinks:
            try:
                sink.write(message)
            except Exception as e:
                logger.error(f"Error writing to sink {sink.name}: {e}")
        return True
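As a quick sanity check of the class on its own, here is a hypothetical stub sink (duck-typed, not a real BaseSink subclass) fed two sample messages; the 'removed' order should be dropped and the 'paid' order written:

# Hypothetical stub sink for a quick standalone check of SimpleDataFrame
class ListSink:
    name = "list-sink"

    def __init__(self):
        self.records = []

    def write(self, message):
        self.records.append(message)

sink = ListSink()
df = SimpleDataFrame("orders").filter(
    lambda msg: msg.get("value", {}).get("status") != "removed"
)
df.sink(sink)

print(df.process_message({"value": {"order_id": 1, "status": "removed"}}))  # False (dropped)
print(df.process_message({"value": {"order_id": 2, "status": "paid"}}))     # True (written)
print(len(sink.records))  # 1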
Upgrading the Simple Streaming Engine
Next, we’ll modify the Simple Streaming Engine to support the new Data Frame abstraction.
Now messages from a Source are first processed by the corresponding Data Frame — filtered and transformed — before being passed on to Sinks.
Old Architecture
┌─────────────────────┐
│SimpleStreamingEngine│ ◄── Central Manager
│ │
│ +add_source() │
│ +add_sink() │
│ + run() │
└─────────────────────┘
│
│ manages
▼
┌──────────────┐ ┌──────────────┐
│ Source │───▶│ Sink │
│ │ │ │
│ KafkaSource │ │PostgreSQLSink│
└──────────────┘ └──────────────┘
Upgraded Architecture
┌─────────────────────┐
│SimpleStreamingEngine│ ◄── Central Manager
│ │
│ +add_source() │
│ +dataframe() │
│ + run() │
└─────────────────────┘
│
│ manages
▼
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Source │───▶│ DataFrame │───▶│ Sink │
│ │ │ │ │ │
│ KafkaSource │ │ +filter() │ │PostgreSQLSink│
└──────────────┘ └──────────────┘ └──────────────┘
Step-by-Step Explanation of the Simple Streaming Engine Upgrade
1. Add Data Frame Support
The upgraded Simple Streaming Engine now manages Data Frames instead of directly managing Sinks:
class SimpleStreamingEngine:
    def __init__(self, name: str = "simple-streaming-engine"):
        self.name = name
        self._sources = []
        self._dataframes = []            # NEW: Manage a list of DataFrames
        self._source_dataframe_map = {}  # NEW: Mapping between Source and DataFrame
Concepts:
Removed direct Sink management.
Added DataFrame list and Source → DataFrame mapping.
2. The dataframe Method — Linking Source and DataFrame
def dataframe(self, *, source: BaseSource) -> SimpleDataFrame:
    df = SimpleDataFrame(f"df-{source.name}")
    self._dataframes.append(df)
    self._source_dataframe_map[source] = df  # Create the mapping
    self.add_source(source)
    return df
Concepts:
Creates a dedicated DataFrame for each Source.
Establishes a Source → DataFrame mapping.
Returns the DataFrame for method chaining.
3. Upgrade the run Method
def run(self):
    # Set up all sinks from DataFrames
    all_sinks = []
    for df in self._dataframes:
        all_sinks.extend(df._sinks)
    for sink in all_sinks:
        sink.setup()

    # Assign message handlers to all sources
    for source in self._sources:
        source.message_handler = self._create_message_handler(source)
        source.run()
Concepts:
Collects all Sinks from DataFrames and initializes them.
Assigns a corresponding message handler to each Source.
4. The New Message Handler
def _create_message_handler(self, source: BaseSource):
    def message_handler(message):
        if source in self._source_dataframe_map:
            df = self._source_dataframe_map[source]
            df.process_message(message)  # Delegate to DataFrame
    return message_handler
Concepts:
No longer sends messages directly to Sinks.
Finds the corresponding DataFrame via mapping.
Delegates filtering and forwarding to the DataFrame.
Complete Upgraded Simple Streaming Engine
import logging
from typing import List

from .source import BaseSource
from .dataframe import SimpleDataFrame

logger = logging.getLogger(__name__)


class SimpleStreamingEngine:
    """
    A simple streaming engine that supports DataFrames.
    """

    def __init__(self, name: str = "simple-streaming-engine"):
        self.name = name
        self._sources: List[BaseSource] = []
        self._dataframes: List[SimpleDataFrame] = []
        self._source_dataframe_map = {}

    def add_source(self, source: BaseSource):
        """Add a source to the streaming engine."""
        self._sources.append(source)

    def dataframe(self, *, source: BaseSource) -> SimpleDataFrame:
        """Create a DataFrame and link it to a source."""
        df = SimpleDataFrame(f"df-{source.name}")
        self._dataframes.append(df)
        self._source_dataframe_map[source] = df
        self.add_source(source)
        return df

    def run(self):
        """Start the streaming engine and begin processing streams."""
        # Initialize all sinks in DataFrames
        all_sinks = []
        for df in self._dataframes:
            all_sinks.extend(df._sinks)
        for sink in all_sinks:
            sink.setup()

        # Assign message handlers for each source
        for source in self._sources:
            source.message_handler = self._create_message_handler(source)
            source.run()

    def _create_message_handler(self, source: BaseSource):
        """Create a message handler to route messages to the corresponding DataFrame."""
        def message_handler(message):
            if source in self._source_dataframe_map:
                df = self._source_dataframe_map[source]
                df.process_message(message)
        return message_handler
Example: Filtering “Removed” Orders
Let’s see how the upgraded architecture can be used to filter invalid data.
Step-by-Step Usage
1. Create a Simple Streaming Engine
engine = SimpleStreamingEngine(name="orders-processing-engine")
2. Create a Source and Build a DataFrame
orders_source = SimpleKafkaSource(name="orders-source", topic="orders")
sdf = engine.dataframe(source=orders_source)
3. Add Filter Conditions
# Filter out orders with status 'removed'
sdf = sdf.filter(lambda msg: msg.get('value', {}).get('status') != 'removed')
# Alternatively, use [] syntax (equivalent)
sdf = sdf[lambda msg: msg.get('value', {}).get('status') != 'removed']
4. Add Sink and Run the Engine
pg_sink = SimplePostgreSQLSink(...)
sdf.sink(pg_sink)
engine.run() # Start processing: Kafka → Filter → PostgreSQL
Full Example
# 1. Create SimpleStreamingEngine
engine = SimpleStreamingEngine(name="orders-processing-engine")
# 2. Create Kafka Source
orders_source = SimpleKafkaSource(name="orders-source", topic="orders")
# 3. Create DataFrame and add a filter
sdf = engine.dataframe(source=orders_source)
sdf = sdf.filter(lambda msg: msg.get('value', {}).get('status') != 'removed')
# 4. Add PostgreSQL Sink
pg_sink = SimplePostgreSQLSink(
    name="orders-sink",
    table_name="valid_orders",
    # ... other parameters
)
# 5. Assemble and start
sdf.sink(pg_sink)
engine.run() # Start processing: Kafka → Filter → PostgreSQL
Data Flow:
KafkaSource → DataFrame.filter() → PostgreSQLSink
This design makes the filtering logic clear, modular, and easy to extend, allowing you to effortlessly add more filters or modify the processing workflow.
Summary
In this article, we explored performance bottlenecks in the Speed Layer and introduced the “filter before writing” optimization strategy.
Key Takeaways:
Bottleneck Awareness: Understand the four major Consumer bottlenecks and how to fix them.
Data Frame Abstraction: Use a flexible Data Frame layer for data filtering and transformation.
Architecture Upgrade: Extend the Simple Streaming Engine to support the new Data Frame flow.
Practical Example: Filter out invalid “removed” orders efficiently.
By adopting the Source → DataFrame → Sink architecture, we gain:
Greater flexibility in data processing.
Extensible filtering mechanisms.
Improved overall system performance.
Coming Up: Day 6 — Batch Writing Optimization
While filtering reduces unnecessary data, fast-growing businesses still face the challenge of handling large volumes of valid data.
In the next article, we’ll dive into batch writing — learning how to boost database performance through batch commits and keep your Speed Layer running smoothly even during peak loads.

