TL;DR
Writing records to a database one by one will eventually choke any stream processing system. By introducing a buffer and using both count-based and time-based flush triggers, you can batch writes, dramatically increase throughput, reduce database pressure, and still keep data fresh. This article walks through how to implement such a design by building a PostgreSQL sink with batch write support.
Introduction
In stream processing, “real-time” is often misunderstood as writing every single record to the database immediately. That’s like a delivery driver taking each order individually, stopping to deliver one at a time – slow, inefficient, and prone to getting stuck along the way.
As we discussed on Day 5, a smarter approach is to filter first, then batch insert into the database. Today, we’ll explore best practices for batch writing.
Building a PostgreSQL Sink with Batch Support
Note: All code in this article is pseudo-code for educational and conceptual illustration.
Focus on understanding the architecture and logic, not on running the exact code.
As mentioned earlier, batch inserts significantly improve throughput and performance. Here’s a simple implementation of a SimplePostgreSQLSink, which:
Receives data from upstream (e.g., after Kafka Consumer processing)
Buffers it temporarily
Flushes the buffer to the database once the batch size is reached
Initialization and Buffer Design
In the init method, we initialize a _buffer to temporarily store incoming data:
from typing import Any, Dict, List

class SimplePostgreSQLSink(BaseSink):
    """
    PostgreSQL Sink with batch write support
    """

    def __init__(self, batch_size: int = 100):
        self.batch_size = batch_size
        self.connection = None  # DB connection
        # Batch-related state
        self._buffer: List[Dict[str, Any]] = []
Write Logic: write()
This is the main entry point. For each incoming message:
Add it to the buffer.
If the buffer size reaches batch_size, flush it to the database.
┌─────────────────────┐
│       Buffer        │
│                     │
│ [data1, data2, ...] │
└─────────────────────┘
           │
           ▼
┌─────────────────────┐
│    Count Trigger    │
│                     │
│     size ≥ 100?     │
│  Yes → Flush to DB  │
└─────────────────────┘
def write(self, message: Dict[str, Any]):
    """
    Add the message to the buffer and flush to the database when batch_size is reached
    """
    # Extract the actual payload from the message
    data = message.get('value', {})
    if not data:
        return

    # Add the data to the buffer
    self._buffer.append(data)

    # Flush to the database when batch_size is reached
    if len(self._buffer) >= self.batch_size:
        # Buffer is full, perform the batch insert
        # ...
        self._buffer.clear()  # Clear the buffer
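The batch insert elided above could look like the sketch below. It assumes the psycopg2 driver, whose execute_values() helper expands one VALUES %s placeholder into a single multi-row INSERT; the events table and its columns are made up for illustration.

```python
from typing import Any, Dict, List

def rows_from_buffer(buffer: List[Dict[str, Any]], columns: List[str]) -> List[tuple]:
    """Convert buffered dicts into ordered tuples for a batch INSERT."""
    return [tuple(item.get(col) for col in columns) for item in buffer]

def flush_and_clear(buffer, connection, columns=("user_id", "event", "ts")):
    """Batch-insert the whole buffer in one round-trip, then clear it."""
    if not buffer:
        return 0
    rows = rows_from_buffer(buffer, list(columns))
    # psycopg2 is assumed available; execute_values sends all rows
    # as one INSERT statement instead of len(buffer) separate ones.
    from psycopg2.extras import execute_values
    with connection.cursor() as cur:
        execute_values(
            cur,
            f"INSERT INTO events ({', '.join(columns)}) VALUES %s",
            rows,
        )
    connection.commit()
    count = len(buffer)
    buffer.clear()
    return count
```

One round-trip per batch is exactly where the throughput gain comes from.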
Concept: Flush to the database once the buffer reaches the configured batch size.
Timer Mechanism: Handling Slow Fills
If data arrives slowly, the buffer may take a long time to fill, delaying writes.
Solution: add a time-based trigger in addition to the count trigger.
     ┌─────────────────────┐
     │       Buffer        │
     │                     │
     │ [data1, data2, ...] │
     └─────────────────────┘
          │         │
          ▼         ▼
 ┌────────────┐  ┌────────────┐
 │   Count    │  │   Timer    │
 │  Trigger   │  │  Trigger   │
 │ size ≥ 100 │  │ 100ms tick │
 └────────────┘  └────────────┘
          │         │
          └────┬────┘
               │
               ▼
     ┌─────────────────────┐
     │     Flush to DB     │
     │                     │
     │  INSERT batch data  │
     │    Clear buffer     │
     └─────────────────────┘
def __init__(self, batch_size: int = 100):
    self.batch_size = batch_size
    self._buffer = []
    self._start_timer()  # Start the timer

def write(self, message):
    data = message.get('value', {})
    self._buffer.append(data)
    # Count-based trigger: flush when the buffer is full
    if len(self._buffer) >= self.batch_size:
        self._flush_and_clear()

def _start_timer(self):
    # Check every 100 ms whether a flush is needed
    timer = threading.Timer(0.1, self._timer_flush)
    timer.start()

def _timer_flush(self):
    if self._buffer:  # Flush only if there is data
        self._flush_and_clear()
    self._start_timer()  # Restart the timer
Concept: Dual-trigger mechanism ensures data is processed in batches without excessive delay.
Real Examples: Batch Writes Everywhere
Batch write design is standard practice in the industry:
Popular Streaming Engines:
Apache Flink: supports batch sinks with configurable batch.size and batch.interval
Apache Kafka Streams: built-in batch commit mechanism
Database Clients:
JDBC: executeBatch()
PostgreSQL: COPY or execute_values()
MongoDB: insertMany()
Batching like this is a standard, and often necessary, performance optimization in high-throughput systems.
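The pattern behind all of these APIs is the same and easy to try. The sketch below uses sqlite3 only because it ships with Python and needs no server; executemany() plays the role of JDBC's executeBatch() or psycopg2's execute_values(), and the events table is made up for the demo.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (id INTEGER, name TEXT)")

rows = [(i, f"event-{i}") for i in range(1000)]

# One at a time: 1,000 separate statements (and, against a real
# networked database, 1,000 round-trips).
for row in rows:
    conn.execute("INSERT INTO events VALUES (?, ?)", row)

# Batched: the driver sends the whole list in one call — the
# analogue of executeBatch() / execute_values() / insertMany().
conn.executemany("INSERT INTO events VALUES (?, ?)", rows)

count = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
print(count)  # 2000
```

Against an in-memory database the difference is small; over a network, the batched call collapses a thousand round-trips into one.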
Summary
Batch writing is a core optimization in stream processing, solving the performance bottleneck caused by “one record at a time” writes.
Key Benefits of Batch Writes
Reduce DB pressure: with a batch size of 100, 1,000 individual inserts become 10 batch inserts
Increase throughput: batch operations are 10–100× more efficient than single inserts
Lower network overhead: fewer round-trips to the database
Value of the Dual-Trigger Mechanism
Count trigger: ensures efficiency under high traffic
Time trigger: ensures timeliness under low traffic
Balanced approach: achieves optimal trade-off between throughput and latency
During peak events like Double 11 or Black Friday, this design keeps the system responsive rather than being bogged down by individual inserts. During off-peak hours, users still see timely updates.
What happens when consumers cannot keep up with producers? This leads to the importance of backpressure and flow control.
Day 7 Preview: Backpressure and Flow Control
Imagine your SimpleStreamEngine processes 1,000 records per second, but the database can only handle 500 records per second at maximum. The extra 500 records accumulate in memory, potentially causing OOM crashes.
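As a tiny preview of the idea: a bounded buffer pushes back on the producer instead of letting memory grow without limit. The sketch below uses Python's standard queue module; the maxsize of 500 is an arbitrary stand-in for the database's capacity.

```python
import queue

# A bounded buffer: once the consumer falls behind by 500 records,
# the producer is refused (or blocked) rather than the heap growing.
buf = queue.Queue(maxsize=500)

for i in range(500):
    buf.put_nowait(i)          # fills the buffer to its cap

try:
    buf.put_nowait(500)        # record 501 has nowhere to go
    overflowed = False
except queue.Full:
    overflowed = True

print(overflowed)  # True: backpressure hits the producer, not memory
```

A blocking put() would instead pause the producer until the consumer catches up — one of the flow-control strategies we'll look at next.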
Next time we will explore how backpressure can protect your system under high load.

