Batch Writes in Stream Processing: Building a PostgreSQL Sink with Batch Support

TL;DR

Writing records to a database one by one will eventually choke any stream processing system. By introducing a buffer and using both count-based and time-based flush triggers, you can batch writes, dramatically increase throughput, reduce database pressure, and still keep data fresh. This article walks through how to implement such a design by building a PostgreSQL sink with batch write support.

Introduction

In stream processing, “real-time” is often misunderstood as writing every single record to the database immediately. That’s like a delivery driver taking each order individually, stopping to deliver one at a time – slow, inefficient, and prone to getting stuck along the way.

As we discussed on Day 5, a smarter approach is to filter first, then batch insert into the database. Today, we’ll explore best practices for batch writing.

Building a PostgreSQL Sink with Batch Support

Note: All code in this article is pseudo-code for educational and conceptual illustration.

Focus on understanding the architecture and logic, not on running the exact code.

As mentioned earlier, batch inserts significantly improve throughput and performance. Here’s a simple implementation of a SimplePostgreSQLSink, which:

  • Receives data from upstream (e.g., after Kafka Consumer processing)

  • Buffers it temporarily

  • Flushes the buffer to the database once the batch size is reached

Initialization and Buffer Design

In the init method, we initialize a _buffer to temporarily store incoming data:

from typing import Any, Dict, List

class SimplePostgreSQLSink(BaseSink):
    """
    PostgreSQL sink with batch write support
    """
    def __init__(self, batch_size: int = 100):
        self.batch_size = batch_size
        self.connection = None  # DB connection, established elsewhere

        # Batch-related state
        self._buffer: List[Dict[str, Any]] = []

Write Logic: write()

This is the main entry point. For each incoming message:

  1. Add it to the buffer.

  2. If the buffer size reaches batch_size, flush to the database.

    ┌─────────────────────┐
    │       Buffer        │
    │                     │
    │ [data1, data2, ...] │
    └─────────────────────┘
           │
           ▼
    ┌──────────────────────┐
    │   Count Trigger      │
    │                      │
    │   size ≥ 100?        │
    │   Yes → Flush to DB │
    └──────────────────────┘
def write(self, message: Dict[str, Any]):
    """
    Add the message to the buffer and flush to the database when batch_size is reached
    """
    # Extract the actual payload from the message
    data = message.get('value', {})
    if not data:
        return

    # Add the data to the buffer
    self._buffer.append(data)

    # Flush to the database when batch_size is reached
    if len(self._buffer) >= self.batch_size:
        # Buffer is full: perform the batch insert here
        # ...
        self._buffer.clear()  # Clear the buffer

Concept: Flush to the database once the buffer reaches the configured batch size.
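The insert step elided above can be sketched as a standalone helper. The `flush_batch` function below is hypothetical (not part of the original sink): it builds one multi-row INSERT and sends it through the DB-API `executemany`, with the parameter placeholder configurable so the same sketch works across drivers (`%s` for psycopg2, `?` for sqlite3). psycopg2 users would typically prefer `psycopg2.extras.execute_values` or `COPY` for large batches.

```python
from typing import Any, Dict, List

def flush_batch(conn, table: str, rows: List[Dict[str, Any]],
                placeholder: str = "%s") -> int:
    """Write all buffered rows in a single batch statement and commit.

    `conn` is any DB-API connection; `table` and the row keys are assumed
    to be trusted (never interpolate untrusted names into SQL).
    """
    if not rows:
        return 0
    cols = list(rows[0].keys())
    sql = (
        f"INSERT INTO {table} ({', '.join(cols)}) "
        f"VALUES ({', '.join([placeholder] * len(cols))})"
    )
    cur = conn.cursor()
    cur.executemany(sql, [tuple(row[c] for c in cols) for row in rows])
    conn.commit()  # one commit for the whole batch, not one per row
    return len(rows)
```

Inside the sink, the flush branch would then become `flush_batch(self.connection, 'events', self._buffer)` followed by `self._buffer.clear()`.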

Timer Mechanism: Handling Slow Fills

If data arrives slowly, the buffer may take a long time to fill, delaying writes.

Solution: add a time-based trigger in addition to the count trigger.

    ┌─────────────────────┐
    │       Buffer        │
    │                     │
    │ [data1, data2, ...] │
    └─────────────────────┘
           │               │
           ▼               ▼
    ┌────────────┐  ┌────────────┐
    │   Count    │  │   Timer    │
    │  Trigger   │  │  Trigger   │
    │ size ≥ 100 │  │ 100ms tick │
    └────────────┘  └────────────┘
           │               │
           └──────┬────────┘
                 │
                 ▼
    ┌─────────────────────┐
    │    Flush to DB      │
    │                     │
    │ INSERT batch data   │
    │ Clear buffer        │
    └─────────────────────┘
import threading

def __init__(self, batch_size: int = 100):
    self.batch_size = batch_size
    self._buffer = []
    self._start_timer()  # Start the timer

def write(self, message):
    data = message.get('value', {})
    self._buffer.append(data)

    # Count-based trigger: flush when the buffer is full
    if len(self._buffer) >= self.batch_size:
        self._flush_and_clear()

def _start_timer(self):
    # Check every 100 ms whether a flush is needed
    timer = threading.Timer(0.1, self._timer_flush)
    timer.daemon = True  # don't keep the process alive on shutdown
    timer.start()

def _timer_flush(self):
    # Note: this callback runs on the timer's own thread; a production
    # version must guard self._buffer with a lock (see below).
    if self._buffer:  # Flush if there is data
        self._flush_and_clear()
    self._start_timer()  # Restart the timer

Concept: Dual-trigger mechanism ensures data is processed in batches without excessive delay.
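Because the timer callback runs on its own thread while `write` runs on the consumer thread, the two can race on the shared buffer. A minimal thread-safe sketch of the dual-trigger idea (class and method names here are illustrative, not the article's exact sink) swaps the buffer out under a lock and runs the slow database write outside it:

```python
import threading
from typing import Any, Callable, Dict, List

class BatchBuffer:
    """Dual-trigger buffer sketch: flushes on batch size or on a timer tick."""

    def __init__(self, flush_fn: Callable[[List[Dict[str, Any]]], None],
                 batch_size: int = 100, interval: float = 0.1):
        self._flush_fn = flush_fn      # e.g. the batch INSERT
        self._batch_size = batch_size
        self._interval = interval
        self._buffer: List[Dict[str, Any]] = []
        self._lock = threading.Lock()  # timer thread and writer share the buffer

    def start_timer(self) -> None:
        timer = threading.Timer(self._interval, self._on_timer)
        timer.daemon = True            # don't block interpreter shutdown
        timer.start()

    def write(self, data: Dict[str, Any]) -> None:
        with self._lock:
            self._buffer.append(data)
            full = len(self._buffer) >= self._batch_size
        if full:
            self.flush()               # count trigger

    def flush(self) -> None:
        with self._lock:               # swap the buffer out under the lock...
            batch, self._buffer = self._buffer, []
        if batch:                      # ...but run the slow DB write outside it
            self._flush_fn(batch)

    def _on_timer(self) -> None:
        self.flush()                   # time trigger
        self.start_timer()             # re-arm for the next tick
```

Swapping the list out rather than clearing it in place means a flush never blocks incoming writes for longer than the swap itself.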

Real Examples: Batch Writes Everywhere

Batch write design is standard practice in the industry:

Popular Streaming Engines:

  • Apache Flink: its JDBC sink buffers rows and flushes by batch size or interval (sink.buffer-flush.max-rows / sink.buffer-flush.interval)

  • Apache Kafka Streams: commits offsets and flushes its record caches in batches on a configurable interval (commit.interval.ms)

Database Clients:

  • JDBC: executeBatch()

  • PostgreSQL: COPY, or psycopg2's execute_values()

  • MongoDB: insertMany()

Batching is a standard performance optimization in high-throughput systems.
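The effect is easy to demonstrate with any DB-API driver. The sketch below uses sqlite3 (standing in for PostgreSQL so it runs anywhere) to contrast a commit-per-row loop with a single `executemany` batch: the batched path issues one statement and one commit instead of a thousand of each.

```python
import sqlite3

def insert_one_by_one(conn, values):
    # Worst case: one statement and one transaction per row
    for v in values:
        conn.execute("INSERT INTO t (v) VALUES (?)", (v,))
        conn.commit()

def insert_batched(conn, values):
    # One statement, one transaction for the whole batch
    conn.executemany("INSERT INTO t (v) VALUES (?)", [(v,) for v in values])
    conn.commit()

if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE t (v INTEGER)")
    insert_batched(conn, range(1000))
    print(conn.execute("SELECT COUNT(*) FROM t").fetchone()[0])
```

On a networked database the gap widens further, since every per-row commit also pays a network round-trip.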

Summary

Batch writing is a core optimization in stream processing, solving the performance bottleneck caused by “one record at a time” writes.

Key Benefits of Batch Writes

  • Reduce DB pressure: 1,000 individual inserts become 10 batch inserts

  • Increase throughput: batch operations are typically 10–100× more efficient than single inserts

  • Lower network overhead: fewer round-trips to the database

Value of the Dual-Trigger Mechanism

  • Count trigger: ensures efficiency under high traffic

  • Time trigger: ensures timeliness under low traffic

  • Balanced approach: achieves optimal trade-off between throughput and latency
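Back-of-envelope bounds make the trade-off concrete. Assuming a steady input rate and instantaneous flushes (a simplification; the helper below is illustrative, not from the article), whichever trigger fires first caps both the worst-case flush latency and the peak buffer size:

```python
def flush_bounds(batch_size: int, interval_s: float, rate_per_s: float):
    """Approximate worst-case latency and peak buffer size for a dual-trigger
    sink, assuming a steady input rate and instantaneous flushes."""
    # High traffic: the count trigger fires first (batch_size / rate seconds).
    # Low traffic: the timer fires first (interval seconds).
    worst_latency_s = min(interval_s, batch_size / rate_per_s)
    # The buffer never grows past batch_size, and under light traffic it
    # never grows past what one timer interval can accumulate.
    peak_buffered = min(batch_size, rate_per_s * interval_s)
    return worst_latency_s, peak_buffered
```

With batch_size=100 and a 100 ms timer: at 10 records/s the timer dominates (latency at most 0.1 s, roughly one row buffered), while at 10,000 records/s the count trigger dominates (latency at most 0.01 s, at most 100 rows buffered).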

During peak events like Double 11 or Black Friday, this design keeps the system responsive rather than being bogged down by individual inserts. During off-peak hours, users still see timely updates.

What happens when consumers cannot keep up with producers? This leads to the importance of backpressure and flow control.

Day 7 Preview: Backpressure and Flow Control

Imagine your SimpleStreamEngine processes 1,000 records per second, but the database can only handle 500 records per second at maximum. The extra 500 records per second accumulate in memory, potentially causing out-of-memory (OOM) crashes.
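As a small preview (names here are illustrative), the simplest backpressure primitive is a bounded queue between the engine and the sink: when the writer falls behind, `put()` blocks the producer instead of letting records pile up without limit.

```python
import queue
import threading

# Bounded hand-off between producer (stream engine) and consumer (DB writer).
# maxsize caps memory: a full queue makes put() block, slowing the producer
# down to the consumer's pace instead of growing without bound.
records: queue.Queue = queue.Queue(maxsize=500)

def produce(n: int) -> None:
    for i in range(n):
        records.put(i)       # blocks while 500 records are already waiting

def consume(n: int) -> None:
    for _ in range(n):
        records.get()        # the (slower) database write would go here
        records.task_done()
```

How to propagate that blocking cleanly back through the pipeline is the subject of the next article.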

Next time we will explore how backpressure can protect your system under high load.
