Speed Layer Performance Bottlenecks and Optimizations

In the previous article, we built the basic Speed Layer architecture with three core components — Source, Sink, and the Simple Streaming Engine.

At first, everything worked perfectly: the Consumer processed order data smoothly, and the system appeared to be running just fine.

However, when the system faced a real challenge — for example, during the Double 11 shopping festival — problems began to surface:

  • The Consumer reached its processing limit.

  • Messages started piling up in the queue, waiting to be consumed.

That’s when we realized performance bottlenecks are a core challenge every stream processing system must confront.

Common Consumer Performance Bottlenecks

Based on our hands-on experience, we’ve identified the four most common performance bottlenecks in consumers. Of course, there are other less frequent issues—such as GC tuning, memory leaks, or network configuration—but the following four are the most significant and commonly encountered.

1. Insufficient database write throughput

Problem: Each message triggers a separate insert followed by a commit, overwhelming the database with a large number of small transactions.

Impact: The database becomes busy handling tiny commits and cannot take advantage of batch-processing efficiencies.

Solution: Implement batch inserts to accumulate hundreds of records and commit them together. This dramatically reduces the number of transactions and improves throughput.
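
To make this concrete, below is a minimal sketch of the buffer-and-flush idea. It assumes a DB-API style connection (for example psycopg2) and an illustrative orders table; the batch size and column names are placeholders, and the next article covers batch writing in depth.

BATCH_SIZE = 500

class BatchWriter:
    """Buffers records and writes them in one multi-row insert per commit."""

    def __init__(self, conn, batch_size: int = BATCH_SIZE):
        self.conn = conn
        self.batch_size = batch_size
        self._buffer = []

    def add(self, record: tuple):
        self._buffer.append(record)
        if len(self._buffer) >= self.batch_size:
            self.flush()

    def flush(self):
        if not self._buffer:
            return
        with self.conn.cursor() as cur:
            # One multi-row insert and one commit instead of hundreds of tiny transactions
            cur.executemany(
                "INSERT INTO orders (order_id, status, amount) VALUES (%s, %s, %s)",
                self._buffer,
            )
        self.conn.commit()
        self._buffer.clear()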

2. JSON parsing and serialization overhead

Problem: Each message requires JSON deserialization.

Impact: Heavy JSON parsing consumes a significant portion of CPU resources, lowering overall efficiency.

Solution:

  • Use a more efficient JSON library (e.g., orjson).

  • Consider switching to binary formats (Avro, Protobuf) to reduce parsing overhead.
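
As a small illustration, swapping the standard json module for orjson is usually a drop-in change in the deserialization path (a sketch; the deserialize function name is ours):

import orjson  # faster JSON library; returns ordinary Python dicts and lists

def deserialize(raw_value: bytes) -> dict:
    # orjson.loads accepts bytes directly, so Kafka message values can be
    # parsed without any intermediate string handling
    return orjson.loads(raw_value)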

3. Insufficient Kafka partitions

Problem: The topic’s partition count is too low, limiting the consumer’s parallelism.

Impact: Adding consumers beyond the partition count leaves the extra consumers idle, so overall throughput does not improve.

Solution: Increase the number of partitions to unlock horizontal scalability.
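
As a sketch, partitions can be added with the confluent-kafka AdminClient (the broker address and target count below are examples):

from confluent_kafka.admin import AdminClient, NewPartitions

admin = AdminClient({"bootstrap.servers": "localhost:9092"})

# Request a new total of 12 partitions for the topic (partition counts can only grow)
futures = admin.create_partitions([NewPartitions("orders", 12)])
for topic, future in futures.items():
    future.result()  # Raises if the broker rejected the change

Keep in mind that adding partitions changes how keyed messages map to partitions, so per-key ordering is only guaranteed for data produced after the change.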

4. Network latency

Problem: Network distance between the broker, consumer, and database introduces additional latency.

Impact: Each network round trip adds latency to message processing.

Solution: Deploy related components as close to each other as possible, ideally within the same network zone.

Tips

In practice, issues #2 and #3 are relatively easy to fix, and #4 is rarely the major bottleneck in most enterprise environments.

The real troublemaker is often #1 — database write performance, which we’ll explore in depth next.

Optimization Strategy: Filter Before Writing

Before addressing database bottlenecks directly, it helps to start from another angle — reducing the volume of data to be processed. Filter out unnecessary data before it hits the database. Only process messages that provide real business value.

For example, if an order has a status of 'removed' and carries no meaningful information, we can filter it out at the Consumer stage to save downstream resources.

Note: All code in this article is pseudo-code for educational and conceptual illustration. Focus on understanding the architecture and logic, not on running the exact code.

Designing the Data Frame Abstraction Layer

To enable flexible data filtering, we introduce a Data Frame abstraction between the Source and Sink.

This layer handles data transformation and filtering.

Data Frame Architecture

Source → DataFrame → Sink
   (filtering, transformation)

Concepts:

  • Filtering: Supports custom filter functions

  • Chaining: Allows multiple filters in sequence

  • Routing: Sends filtered data to specific Sinks

Step-by-Step: Simple Data Frame Implementation

1. Initialization

class SimpleDataFrame:
    def __init__(self, name: str = "dataframe"):
        self.name = name
        self._filters = []  # list of filter functions
        self._sinks = []    # list of sinks

Concepts:

  • The Simple Data Frame manages two key lists: filters and Sinks.

  • It supports method chaining, allowing multiple filter conditions to be linked together seamlessly.

2. The filter Method

def filter(self, func) -> 'SimpleDataFrame':
    # Create a new DataFrame instance to support method chaining
    new_df = SimpleDataFrame(f"{self.name}_filtered")
    new_df._filters = self._filters.copy()  # Copy existing filters
    new_df._filters.append(func)            # Add the new filter
    new_df._sinks = self._sinks.copy()      # Copy existing sinks
    return new_df

Concepts:

  • Each filter call creates a new Data Frame instance.

  • Existing filters are preserved, and the new filter condition is added.

  • Supports method chaining: df.filter(...).filter(...).
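
For example, chaining two filters might look like this (the amount check is an illustrative second condition, not part of the original example):

# Each call returns a new SimpleDataFrame, so the original df stays untouched
valid_orders = (
    df.filter(lambda msg: msg.get('value', {}).get('status') != 'removed')
      .filter(lambda msg: msg.get('value', {}).get('amount', 0) > 0)
)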

3. The Sink Method

def sink(self, sink: BaseSink) -> 'SimpleDataFrame':
    self._sinks.append(sink)
    return self # Return self to support method chaining

Concept: Sends the processed data to the specified Sink.

4. The process_message Method

def process_message(self, message) -> bool:
    # Check all filters in sequence
    for filter_func in self._filters:
        if not filter_func(message):
            return False  # Filtered out

    # If the message passes all filters, send it to all sinks
    for sink in self._sinks:
        sink.write(message)
    return True

Concepts:

  1. Each message passes through all filters in order.

  2. If any filter returns False, the message is dropped.

  3. Only messages that pass all filters are sent to the registered Sinks.

Complete Simple Data Frame Code

# A simple DataFrame class supporting filter operations
import logging
from typing import Any, Dict, Callable, List
from .sink import BaseSink

logger = logging.getLogger(__name__)

class SimpleDataFrame:
    """
    A simple DataFrame class that supports basic stream processing operations.
    """

    def __init__(self, name: str = "dataframe"):
        self.name = name
        self._filters: List[Callable[[Dict[str, Any]], bool]] = []  # List of filter functions
        self._sinks: List[BaseSink] = []  # List of sinks

    def filter(self, func: Callable[[Dict[str, Any]], bool]) -> 'SimpleDataFrame':
        """
        Add a filter condition.

        :param func: A filter function that takes a message dictionary and returns True if it passes the filter.
        :return: A new SimpleDataFrame instance to support method chaining.
        """
        # Create a new DataFrame instance to support method chaining
        new_df = SimpleDataFrame(f"{self.name}_filtered")
        new_df._filters = self._filters.copy()  # Copy existing filters
        new_df._filters.append(func)            # Add the new filter
        new_df._sinks = self._sinks.copy()      # Copy existing sinks

        logger.debug(f"Added filter to {self.name}")
        return new_df

    def __getitem__(self, condition: Callable[[Dict[str, Any]], bool]) -> 'SimpleDataFrame':
        """
        Add a filter condition using the [] syntax, equivalent to the filter() method.

        :param condition: A filter function that takes a message dictionary and returns True if it passes the filter.
        :return: A new SimpleDataFrame instance to support method chaining.
        """
        return self.filter(condition)

    def sink(self, sink: BaseSink) -> 'SimpleDataFrame':
        """
        Add a Sink.

        :param sink: A Sink instance.
        :return: The same SimpleDataFrame instance to support method chaining.
        """
        self._sinks.append(sink)
        logger.debug(f"Added sink {sink.name} to {self.name}")
        return self

    def process_message(self, message: Dict[str, Any]) -> bool:
        """
        Process a single message: apply all filters, and if it passes, send it to all sinks.

        :param message: The message dictionary.
        :return: True if the message passed all filters.
        """
        # Apply all filters in sequence
        for filter_func in self._filters:
            try:
                if not filter_func(message):
                    logger.debug("Message filtered out")
                    return False
            except Exception as e:
                logger.error(f"Error in filter: {e}")
                return False

        # If the message passes all filters, send it to all sinks
        for sink in self._sinks:
            try:
                sink.write(message)
            except Exception as e:
                logger.error(f"Error writing to sink {sink.name}: {e}")

        return True
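
Before wiring this into the engine, here is a quick, self-contained way to exercise the class. It is a sketch: InMemorySink is a hypothetical stub that duck-types the Sink interface used above (a name attribute plus setup() and write()).

class InMemorySink:
    """Collects written messages in a list so the filter behavior can be inspected."""

    def __init__(self, name: str = "memory-sink"):
        self.name = name
        self.records = []

    def setup(self):
        pass  # Normally called by the engine; nothing to do for the stub

    def write(self, message):
        self.records.append(message)

sink = InMemorySink()
sdf = SimpleDataFrame("orders")
sdf = sdf.filter(lambda msg: msg.get('value', {}).get('status') != 'removed')
sdf.sink(sink)

sdf.process_message({'value': {'status': 'paid', 'order_id': 1}})     # passes the filter, written
sdf.process_message({'value': {'status': 'removed', 'order_id': 2}})  # dropped by the filter

assert len(sink.records) == 1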

Upgrading the Simple Streaming Engine

Next, we’ll modify the Simple Streaming Engine to support the new Data Frame abstraction.

Now messages from a Source are first processed by the corresponding Data Frame — filtered and transformed — before being passed on to Sinks.

Old Architecture

    ┌─────────────────────┐
    │SimpleStreamingEngine│  ◄── Central Manager
    │                     │
    │    +add_source()    │
    │    +add_sink()      │
    │    + run()          │
    └─────────────────────┘
           │
           │ manages
           ▼
    ┌──────────────┐    ┌──────────────┐
    │    Source    │───▶│     Sink     │
    │              │    │              │
    │ KafkaSource  │    │PostgreSQLSink│
    └──────────────┘    └──────────────┘

Upgraded Architecture

    ┌─────────────────────┐
    │SimpleStreamingEngine│  ◄── Central Manager
    │                     │
    │    +add_source()    │
    │    +dataframe()     │
    │    + run()          │
    └─────────────────────┘
           │
           │ manages
           ▼
    ┌──────────────┐    ┌──────────────┐    ┌──────────────┐
    │    Source    │───▶│  DataFrame   │───▶│     Sink     │
    │              │    │              │    │              │
    │ KafkaSource  │    │ +filter()    │    │PostgreSQLSink│
    └──────────────┘    └──────────────┘    └──────────────┘

Step-by-Step Explanation of the Simple Streaming Engine Upgrade

1. Add Data Frame Support

The upgraded Simple Streaming Engine now manages Data Frames instead of directly managing Sinks:

class SimpleStreamingEngine:
    def __init__(self, name: str = "simple-streaming-engine"):
        self.name = name
        self._sources = []
        self._dataframes = []  # NEW: Manage a list of DataFrames
        self._source_dataframe_map = {}  # NEW: Mapping between Source and DataFrame

Concepts:

  • Removed direct Sink management.

  • Added DataFrame list and Source → DataFrame mapping.

2. The dataframe Method — Linking Source and DataFrame

def dataframe(self, *, source: BaseSource) -> SimpleDataFrame:
    df = SimpleDataFrame(f"df-{source.name}")
    self._dataframes.append(df)
    self._source_dataframe_map[source] = df  # Create the mapping
    self.add_source(source)
    return df

Concepts:

  • Creates a dedicated DataFrame for each Source.

  • Establishes a Source → DataFrame mapping.

  • Returns the DataFrame for method chaining.

3. Upgrade the run Method

def run(self):
    # Set up all sinks from DataFrames
    all_sinks = []
    for df in self._dataframes:
        all_sinks.extend(df._sinks)

    for sink in all_sinks:
        sink.setup()

    # Assign message handlers to all sources
    for source in self._sources:
        source.message_handler = self._create_message_handler(source)
        source.run()

Concepts:

  • Collects all Sinks from DataFrames and initializes them.

  • Assigns a corresponding message handler to each Source.

4. The New Message Handler

def _create_message_handler(self, source: BaseSource):
    def message_handler(message):
        if source in self._source_dataframe_map:
            df = self._source_dataframe_map[source]
            df.process_message(message)  # Delegate to DataFrame

    return message_handler

Concepts:

  • No longer sends messages directly to Sinks.

  • Finds the corresponding DataFrame via mapping.

  • Delegates filtering and forwarding to the DataFrame.

Complete Upgraded Simple Streaming Engine

import logging
from typing import List
from .source import BaseSource
from .dataframe import SimpleDataFrame

logger = logging.getLogger(__name__)

class SimpleStreamingEngine:
    """
    A simple streaming engine that supports DataFrames.
    """

    def __init__(self, name: str = "simple-streaming-engine"):
        self.name = name
        self._sources: List[BaseSource] = []
        self._dataframes: List[SimpleDataFrame] = []
        self._source_dataframe_map = {}

    def add_source(self, source: BaseSource):
        """Add a source to the streaming engine."""
        self._sources.append(source)

    def dataframe(self, *, source: BaseSource) -> SimpleDataFrame:
        """Create a DataFrame and link it to a source."""
        df = SimpleDataFrame(f"df-{source.name}")
        self._dataframes.append(df)
        self._source_dataframe_map[source] = df
        self.add_source(source)
        return df

    def run(self):
        """Start the streaming engine and begin processing streams."""
        # Initialize all sinks in DataFrames
        all_sinks = []
        for df in self._dataframes:
            all_sinks.extend(df._sinks)

        for sink in all_sinks:
            sink.setup()

        # Assign message handlers for each source
        for source in self._sources:
            source.message_handler = self._create_message_handler(source)
            source.run()

    def _create_message_handler(self, source: BaseSource):
        """Create a message handler to route messages to the corresponding DataFrame."""
        def message_handler(message):
            if source in self._source_dataframe_map:
                df = self._source_dataframe_map[source]
                df.process_message(message)
        return message_handler

Example: Filtering “Removed” Orders

Let’s see how the upgraded architecture can be used to filter invalid data.

Step-by-Step Usage

1. Create a Simple Streaming Engine

engine = SimpleStreamingEngine(name="orders-processing-engine")

2. Create a Source and Build a DataFrame

orders_source = SimpleKafkaSource(name="orders-source", topic="orders")
sdf = engine.dataframe(source=orders_source)

3. Add Filter Conditions

# Filter out orders with status 'removed'
sdf = sdf.filter(lambda msg: msg.get('value', {}).get('status') != 'removed')

# Alternatively, use [] syntax (equivalent)
sdf = sdf[lambda msg: msg.get('value', {}).get('status') != 'removed']

4. Add Sink and Run the Engine

pg_sink = SimplePostgreSQLSink(...)
sdf.sink(pg_sink)
engine.run()  # Start processing: Kafka → Filter → PostgreSQL

Full Example

# 1. Create SimpleStreamingEngine
engine = SimpleStreamingEngine(name="orders-processing-engine")

# 2. Create Kafka Source
orders_source = SimpleKafkaSource(name="orders-source", topic="orders")

# 3. Create DataFrame and add a filter
sdf = engine.dataframe(source=orders_source)
sdf = sdf.filter(lambda msg: msg.get('value', {}).get('status') != 'removed')

# 4. Add PostgreSQL Sink
pg_sink = SimplePostgreSQLSink(
    name="orders-sink",
    table_name="valid_orders",
    # ... other parameters
)

# 5. Assemble and start
sdf.sink(pg_sink)
engine.run()  # Start processing: Kafka → Filter → PostgreSQL

Data Flow:

KafkaSource → DataFrame.filter() → PostgreSQLSink

This design makes the filtering logic clear, modular, and easy to extend, allowing you to effortlessly add more filters or modify the processing workflow.
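
For instance, extending the pipeline from the full example with an extra condition and a second destination only takes a couple of additional chained calls (the amount threshold and audit_sink below are illustrative additions, not part of the original example):

# Add another filter and a second sink to the same pipeline
sdf = sdf.filter(lambda msg: msg.get('value', {}).get('amount', 0) > 0)

sdf.sink(pg_sink)      # valid orders go to PostgreSQL
sdf.sink(audit_sink)   # and, for example, to a second sink for auditing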

Summary

In this article, we explored performance bottlenecks in the Speed Layer and introduced the “filter before writing” optimization strategy.

Key Takeaways:

  1. Bottleneck Awareness: Understand the four major Consumer bottlenecks and how to fix them.

  2. Data Frame Abstraction: Use a flexible Data Frame layer for data filtering and transformation.

  3. Architecture Upgrade: Extend the Simple Streaming Engine to support the new Data Frame flow.

  4. Practical Example: Filter out invalid “removed” orders efficiently.

By adopting the Source → DataFrame → Sink architecture, we gain:

  • Greater flexibility in data processing.

  • Extensible filtering mechanisms.

  • Improved overall system performance.

Coming Up: Day 6 — Batch Writing Optimization

While filtering reduces unnecessary data, fast-growing businesses still face the challenge of handling large volumes of valid data.

In the next article, we’ll dive into batch writing — learning how to boost database performance through batch commits and keep your Speed Layer running smoothly even during peak loads.
