Event-Driven Microservices with Streaming SQL Instead of Kafka Consumers

Event-driven microservices promise loose coupling and real-time responsiveness. In practice, they deliver something else: hundreds of lines of consumer boilerplate, hand-rolled state management, and retry logic that nobody wants to maintain. If you have ever spent a week debugging offset commits or consumer group rebalances, you already know the pain.

The core problem is not Kafka itself. Kafka is an excellent distributed log. The problem is the imperative consumer code that every downstream service must write and operate. Filtering, joining, aggregating - these are data transformations, and they are better expressed declaratively. A streaming SQL engine like RisingWave lets you define these transformations as materialized views that read directly from Kafka topics, maintain results incrementally, and serve queries with millisecond latency.

This article shows how to replace custom Kafka consumer code with declarative SQL. You will see side-by-side comparisons of Java and Python consumer logic versus equivalent SQL materialized views, walk through verified examples running on RisingWave 2.8.0, and learn when this approach makes sense for your architecture.

The Cost of Custom Kafka Consumer Code

Every team that builds event-driven services on Kafka eventually writes the same code. A consumer subscribes to one or more topics, polls for records in a loop, deserializes payloads, applies business logic (filtering, enrichment, aggregation), manages offsets, handles errors, and writes results somewhere downstream.

Here is what a typical Java consumer looks like when all you want to do is filter high-value orders:

// Java: ~60 lines just to filter orders above $1,000
Properties props = new Properties();
props.put("bootstrap.servers", "kafka:9092");
props.put("group.id", "high-value-orders-consumer");
props.put("key.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");
props.put("enable.auto.commit", "false");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("orders"));

try {
    while (true) {
        ConsumerRecords<String, String> records =
            consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            Order order = objectMapper.readValue(
                record.value(), Order.class);
            if (order.getAmount() > 1000.0) {
                // Write to downstream topic or database
                processHighValueOrder(order);
            }
        }
        consumer.commitSync();
    }
} catch (Exception e) {
    logger.error("Consumer failed", e);
} finally {
    consumer.close();
}

The Python version using confluent-kafka is only slightly shorter:

# Python: ~40 lines for the same filter
from confluent_kafka import Consumer
import json

conf = {
    'bootstrap.servers': 'kafka:9092',
    'group.id': 'high-value-orders-consumer',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,
}

consumer = Consumer(conf)
consumer.subscribe(['orders'])

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            handle_error(msg.error())
            continue
        order = json.loads(msg.value().decode('utf-8'))
        if order['amount'] > 1000:
            process_high_value_order(order)
        consumer.commit(asynchronous=False)
except KeyboardInterrupt:
    pass
finally:
    consumer.close()

Both examples are simplified. A production consumer also needs deserialization error handling, dead-letter queue logic, metrics emission, graceful shutdown hooks, and health check endpoints. Multiply this across ten or twenty microservices and you have a significant maintenance burden.

The business logic in both examples is one line: amount > 1000. Everything else is plumbing.

Streaming SQL: Declare What You Want, Not How to Get It

A streaming SQL engine inverts the model. Instead of writing imperative code that describes how to consume and process records, you write a declarative query that describes what result you want. The engine handles consumption, state management, offset tracking, and incremental updates.
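To make "incremental updates" concrete, here is a toy Python model of what a streaming engine does under the hood. This is an illustration only, not RisingWave's actual implementation: each arriving event folds into a small piece of per-group state, so the result is always current without rescanning history.

```python
# Toy model of incremental view maintenance (illustration only, not
# RisingWave's implementation). Each event updates per-group state in
# O(1); the "view" stays current without recomputing from scratch.
from collections import defaultdict

state = defaultdict(lambda: {"count": 0, "total": 0.0})

def on_event(order):
    """Fold one completed order into the per-region running aggregate."""
    if order["order_status"] != "completed":
        return
    s = state[order["region"]]
    s["count"] += 1
    s["total"] += order["amount"]

events = [
    {"region": "us-east", "amount": 2500.0, "order_status": "completed"},
    {"region": "us-east", "amount": 8900.0, "order_status": "completed"},
    {"region": "eu-west", "amount": 150.0, "order_status": "completed"},
    {"region": "eu-west", "amount": 2500.0, "order_status": "pending"},
]
for e in events:
    on_event(e)

print(dict(state["us-east"]))  # {'count': 2, 'total': 11400.0}
```

The real engine must also persist this state, shard it across nodes, and handle retractions and recovery; the point is only that results are maintained, not recomputed.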

RisingWave connects directly to Kafka topics using the CREATE SOURCE statement. Once a source is defined, you create materialized views that continuously process incoming events and store the results for low-latency queries.

Here is the Kafka source definition:

CREATE SOURCE orders_source (
    order_id VARCHAR,
    customer_id VARCHAR,
    product VARCHAR,
    amount DOUBLE PRECISION,
    region VARCHAR,
    order_status VARCHAR,
    created_at TIMESTAMPTZ
)
WITH (
    connector = 'kafka',
    topic = 'orders',
    properties.bootstrap.server = 'kafka:9092'
)
FORMAT PLAIN ENCODE JSON;

And here is the entire filter logic that replaces both the Java and Python consumers above:

CREATE MATERIALIZED VIEW high_value_orders AS
SELECT
    order_id,
    customer_id,
    product,
    amount,
    region,
    created_at
FROM orders_source
WHERE amount > 1000;

Two SQL statements replace 60+ lines of Java or 40+ lines of Python. No poll loops, no offset management, no deserialization code, no error handling boilerplate. RisingWave reads from the Kafka topic, applies the filter incrementally as new records arrive, and maintains the result set automatically.

Querying the materialized view returns current results instantly:

SELECT * FROM high_value_orders ORDER BY amount DESC;
 order_id | customer_id |   product   | amount |  region  |        created_at
----------+-------------+-------------+--------+----------+---------------------------
 ORD-1005 | C001        | Sensor Pack |   8900 | us-east  | 2026-04-01 08:00:00+00:00
 ORD-1007 | C003        | Sensor Pack |   4500 | ap-south | 2026-04-01 10:00:00+00:00
 ORD-1003 | C001        | Widget Pro  |   3200 | us-east  | 2026-03-31 09:00:00+00:00
 ORD-1001 | C001        | Widget Pro  |   2500 | us-east  | 2026-03-30 10:00:00+00:00
 ORD-1006 | C002        | Widget Pro  |   2500 | eu-west  | 2026-04-01 09:30:00+00:00
(5 rows)

Verified on RisingWave 2.8.0.

Side-by-Side: Three Common Consumer Patterns Replaced with SQL

Filtering is the simplest case. Real microservice architectures involve joining streams with reference data, aggregating metrics, and maintaining stateful views. Let us compare each pattern.

Pattern 1: Stream Enrichment (Join with Reference Data)

A common consumer pattern reads order events from Kafka, looks up customer details from a database, and writes the enriched record downstream.

Java consumer approach:

// Inside the consumer loop:
for (ConsumerRecord<String, String> record : records) {
    Order order = objectMapper.readValue(record.value(), Order.class);

    // Blocking DB call for every record
    Customer customer = customerDao.findById(order.getCustomerId());

    EnrichedOrder enriched = new EnrichedOrder(
        order.getOrderId(),
        order.getProduct(),
        order.getAmount(),
        order.getRegion(),
        order.getOrderStatus(),
        customer.getName(),
        customer.getTier(),
        order.getCreatedAt()
    );
    enrichedOrderProducer.send(enriched);
}

This code has a hidden performance trap: a synchronous database call per record. At high throughput, the consumer stalls waiting on the database. Teams work around this with local caches, batch lookups, or async database clients, adding more complexity.

Streaming SQL approach (this assumes a customers table or CDC-backed source has been defined in RisingWave alongside orders_source):

CREATE MATERIALIZED VIEW order_enriched AS
SELECT
    o.order_id,
    o.product,
    o.amount,
    o.region,
    o.order_status,
    c.customer_name,
    c.tier,
    o.created_at
FROM orders_source o
JOIN customers c ON o.customer_id = c.customer_id;

The join is maintained incrementally. When a new order arrives, RisingWave looks up the matching customer and updates the view. When customer data changes (say, a tier upgrade), all related orders in the view update automatically.

SELECT * FROM order_enriched ORDER BY created_at DESC LIMIT 5;
 order_id |   product   | amount |  region  | order_status | customer_name |    tier    |        created_at
----------+-------------+--------+----------+--------------+---------------+------------+---------------------------
 ORD-1007 | Sensor Pack |   4500 | ap-south | completed    | Initech       | enterprise | 2026-04-01 10:00:00+00:00
 ORD-1006 | Widget Pro  |   2500 | eu-west  | pending      | Globex Inc    | standard   | 2026-04-01 09:30:00+00:00
 ORD-1005 | Sensor Pack |   8900 | us-east  | completed    | Acme Corp     | enterprise | 2026-04-01 08:00:00+00:00
 ORD-1004 | Gadget Lite |     75 | ap-south | completed    | Initech       | enterprise | 2026-03-31 12:00:00+00:00
 ORD-1003 | Widget Pro  |   3200 | us-east  | pending      | Acme Corp     | enterprise | 2026-03-31 09:00:00+00:00
(5 rows)

Verified on RisingWave 2.8.0.

Pattern 2: Real-Time Aggregation (Revenue per Region)

Aggregation is where imperative consumers get truly painful. You need a state store, and you need to handle late-arriving data, rebalancing, and state recovery.

Python consumer with stateful aggregation:

# Stateful aggregation requires manual state management
from collections import defaultdict

revenue = defaultdict(lambda: {'count': 0, 'total': 0.0, 'avg': 0.0})

while True:
    msg = consumer.poll(timeout=1.0)
    if msg is None:
        continue
    order = json.loads(msg.value().decode('utf-8'))
    if order['order_status'] == 'completed':
        region = order['region']
        revenue[region]['count'] += 1
        revenue[region]['total'] += order['amount']
        revenue[region]['avg'] = (
            revenue[region]['total'] / revenue[region]['count']
        )
    # Where do you persist this state?
    # What happens on consumer restart?
    # How do you handle rebalancing?
    consumer.commit(asynchronous=False)

This in-memory state vanishes on restart. Making it durable requires an external store (Redis, RocksDB, a database), serialization logic, and recovery code. Kafka Streams solves some of this with built-in state stores, but you still write and maintain Java code for the processing topology.

Streaming SQL approach:

CREATE MATERIALIZED VIEW revenue_per_region AS
SELECT
    region,
    COUNT(*) AS total_orders,
    SUM(amount) AS total_revenue,
    AVG(amount) AS avg_order_value
FROM orders_source
WHERE order_status = 'completed'
GROUP BY region;

One statement. The state is durable, survives restarts, and updates incrementally as new completed orders arrive.

SELECT * FROM revenue_per_region ORDER BY total_revenue DESC;
  region  | total_orders | total_revenue | avg_order_value
----------+--------------+---------------+-----------------
 us-east  |            2 |         11400 |            5700
 ap-south |            2 |          4575 |          2287.5
 eu-west  |            1 |           150 |             150
(3 rows)

Verified on RisingWave 2.8.0.

Pattern 3: Multi-Source Join with Aggregation

The most complex consumer pattern combines multiple sources, joins, and aggregation. In imperative code, this typically requires multiple consumers, a shared state store, and careful coordination.

Streaming SQL approach:

CREATE MATERIALIZED VIEW order_stats_by_customer AS
SELECT
    c.customer_name,
    c.tier,
    COUNT(*) AS order_count,
    SUM(o.amount) AS lifetime_value,
    MAX(o.created_at) AS last_order_at
FROM orders_source o
JOIN customers c ON o.customer_id = c.customer_id
GROUP BY c.customer_name, c.tier;

This single materialized view replaces what would typically be a consumer reading from Kafka, looking up customer data, maintaining per-customer aggregates in a state store, and exposing results through a separate query service. RisingWave handles the join, the aggregation, the state, and the serving layer.

SELECT * FROM order_stats_by_customer ORDER BY lifetime_value DESC;
 customer_name |    tier    | order_count | lifetime_value |       last_order_at
---------------+------------+-------------+----------------+---------------------------
 Acme Corp     | enterprise |           3 |          14600 | 2026-04-01 08:00:00+00:00
 Initech       | enterprise |           2 |           4575 | 2026-04-01 10:00:00+00:00
 Globex Inc    | standard   |           2 |           2650 | 2026-04-01 09:30:00+00:00
(3 rows)

Verified on RisingWave 2.8.0.

Architecture: Where Streaming SQL Fits in an Event-Driven System

Replacing consumer code with streaming SQL does not mean removing Kafka. Kafka remains your event backbone, providing durable, ordered, replayable event storage. The change is in what sits downstream.

graph LR
    A[Producer Services] -->|Events| B[Apache Kafka]
    B -->|CREATE SOURCE| C[RisingWave]
    C -->|Materialized Views| D[Query API / Dashboard]
    C -->|Sink to Kafka| E[Downstream Topics]
    C -->|Sink to DB| F[PostgreSQL / Iceberg]

In the traditional model, each downstream service runs its own consumer, manages its own state, and exposes its own query interface. With streaming SQL, RisingWave acts as a shared compute layer:

  • Sources connect to Kafka topics (and other systems like Pulsar, Kinesis, or CDC streams from PostgreSQL).
  • Materialized views define the processing logic declaratively.
  • Sinks push results to downstream Kafka topics, databases, or data lakes when other services need the processed data.
  • Direct queries let applications read from materialized views using any PostgreSQL-compatible client, since RisingWave speaks the PostgreSQL wire protocol.

This architecture collapses the consumer-state-store-query-service pattern into a single layer. Instead of operating N consumer services with N state stores, you operate one streaming database with N materialized views.

When This Approach Works Best

Streaming SQL is the right choice when:

  • The processing logic is data transformation: filtering, joining, aggregating, windowing. These operations map naturally to SQL.
  • Multiple consumers duplicate similar logic: if three services all read from the same Kafka topic and apply slightly different filters, three materialized views are simpler.
  • You need queryable results: materialized views are directly queryable, eliminating the need for a separate serving layer.
  • Your team knows SQL better than Java/Python stream processing: SQL is a more widely known skill than Kafka Streams or Flink APIs.
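As one example of the windowing case, a tumbling-window aggregation that would require a state store and timer logic in a hand-written consumer is a single view in RisingWave. This is a sketch: TUMBLE is RisingWave's tumbling-window table function, and the one-hour window is illustrative.

```sql
-- Sketch: per-region revenue in fixed one-hour windows.
CREATE MATERIALIZED VIEW hourly_revenue AS
SELECT
    window_start,
    region,
    SUM(amount) AS revenue
FROM TUMBLE(orders_source, created_at, INTERVAL '1 hour')
GROUP BY window_start, region;
```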

When to Keep Imperative Consumers

Streaming SQL is not a universal replacement. Keep custom consumers when:

  • The processing involves complex external API calls: calling a third-party REST API for each event is not a SQL operation.
  • You need custom retry and circuit-breaker logic: imperative code gives you fine-grained control over failure handling.
  • The logic is heavily procedural: multi-step workflows with conditional branching are better expressed in code.
  • You need sub-millisecond latency per event: while RisingWave processes events in low-millisecond latency, a hand-optimized consumer can be faster for single-event processing.

Getting Started: From Kafka Consumer to Materialized View

Here is a practical migration path for teams currently running Kafka consumers.

Step 1: Identify candidate consumers

Look for consumers that primarily do one of these operations:

  • Filter events by field values
  • Enrich events by joining with reference data
  • Aggregate events (counts, sums, averages, windows)
  • Route events to different topics based on content

These are all operations that map directly to SQL.

Step 2: Define your Kafka source in RisingWave

Connect RisingWave to your existing Kafka topic. The schema must match your event format:

CREATE SOURCE orders_source (
    order_id VARCHAR,
    customer_id VARCHAR,
    product VARCHAR,
    amount DOUBLE PRECISION,
    region VARCHAR,
    order_status VARCHAR,
    created_at TIMESTAMPTZ
)
WITH (
    connector = 'kafka',
    topic = 'orders',
    properties.bootstrap.server = 'kafka:9092'
)
FORMAT PLAIN ENCODE JSON;

RisingWave supports JSON, Avro, Protobuf, and CSV encodings. For Avro with a schema registry, add the schema.registry property. See the full connector documentation for details.
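For instance, an Avro topic backed by a schema registry could be declared like this (a sketch; the registry URL is illustrative, and with Avro the column list is derived from the registered schema rather than spelled out):

```sql
-- Sketch: Avro-encoded topic with columns inferred from the registry.
CREATE SOURCE orders_avro
WITH (
    connector = 'kafka',
    topic = 'orders',
    properties.bootstrap.server = 'kafka:9092'
)
FORMAT PLAIN ENCODE AVRO (
    schema.registry = 'http://schema-registry:8081'
);
```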

Step 3: Create materialized views for your processing logic

Translate the consumer's business logic into SQL. For most filtering, joining, and aggregation patterns, this is straightforward:

-- Replace a filter consumer
CREATE MATERIALIZED VIEW high_value_orders AS
SELECT * FROM orders_source WHERE amount > 1000;

-- Replace a join-and-enrich consumer
CREATE MATERIALIZED VIEW enriched_orders AS
SELECT o.*, c.customer_name, c.tier
FROM orders_source o
JOIN customers c ON o.customer_id = c.customer_id;

-- Replace an aggregation consumer
CREATE MATERIALIZED VIEW revenue_by_region AS
SELECT region, SUM(amount) AS revenue, COUNT(*) AS orders
FROM orders_source
WHERE order_status = 'completed'
GROUP BY region;

Step 4: Point downstream services to the materialized views

Since RisingWave is PostgreSQL-compatible, any application that can connect to PostgreSQL can query materialized views directly. Use your existing PostgreSQL driver (psycopg2, JDBC, node-postgres) and point it to RisingWave.

If downstream services need events pushed to them (rather than pulling via queries), use a RisingWave sink to write results to another Kafka topic, a PostgreSQL table, or an Apache Iceberg table.
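A Kafka sink for the filtered view might look like this (a sketch; the downstream topic name is illustrative, and PLAIN encoding works here because a filtered source is append-only):

```sql
-- Sketch: push high-value orders to a downstream Kafka topic.
CREATE SINK high_value_orders_sink
FROM high_value_orders
WITH (
    connector = 'kafka',
    topic = 'high-value-orders',
    properties.bootstrap.server = 'kafka:9092'
)
FORMAT PLAIN ENCODE JSON;
```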

Step 5: Run both systems in parallel, then cut over

Run the materialized view alongside the existing consumer. Compare outputs to verify correctness. Once you are confident the results match, decommission the consumer.
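One way to mechanize the comparison, assuming the legacy consumer's output has been loaded into a table RisingWave can read (consumer_output here is hypothetical), is a symmetric set difference. An empty result means the two pipelines agree:

```sql
-- Rows present in one output but not the other; empty means they match.
-- consumer_output is a hypothetical table holding the legacy output.
(SELECT order_id, amount FROM consumer_output
 EXCEPT
 SELECT order_id, amount FROM high_value_orders)
UNION ALL
(SELECT order_id, amount FROM high_value_orders
 EXCEPT
 SELECT order_id, amount FROM consumer_output);
```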

What Is a Streaming SQL Engine and How Does It Differ from ksqlDB?

A streaming SQL engine is a database designed to process continuously arriving data using SQL queries. Unlike a traditional database that runs queries against data at rest, a streaming SQL engine evaluates queries incrementally as new data arrives and maintains up-to-date results in materialized views.

RisingWave and ksqlDB both offer SQL over streaming data, but they differ in architecture. ksqlDB is tightly coupled to Kafka and runs as part of the Kafka ecosystem. RisingWave is an independent streaming database that connects to Kafka (and many other sources) as an external system. This means RisingWave can ingest from Kafka, Pulsar, Kinesis, PostgreSQL CDC, and S3 simultaneously, and it stores its own state in a cloud-native storage layer rather than relying on Kafka's changelog topics for state management. RisingWave also supports standard PostgreSQL syntax and wire protocol, making it accessible to any tool that works with PostgreSQL.

How Do Materialized Views Handle Late-Arriving or Out-of-Order Events?

Materialized views in RisingWave process events based on their arrival order by default. For use cases that require event-time processing, such as windowed aggregations where late data must be handled correctly, RisingWave supports watermarks. A watermark tells the engine how long to wait for late events, relative to the latest event time it has seen, before considering a window complete. You define watermarks in the source definition, and the engine uses them to manage late-arriving data across all downstream materialized views. See the watermark documentation for configuration details.
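A source-level watermark might look like this (a sketch; the five-second delay is illustrative and should reflect how late your producers actually run):

```sql
-- Sketch: tolerate events arriving up to 5 seconds out of order.
CREATE SOURCE orders_with_watermark (
    order_id VARCHAR,
    amount DOUBLE PRECISION,
    created_at TIMESTAMPTZ,
    WATERMARK FOR created_at AS created_at - INTERVAL '5 seconds'
)
WITH (
    connector = 'kafka',
    topic = 'orders',
    properties.bootstrap.server = 'kafka:9092'
)
FORMAT PLAIN ENCODE JSON;
```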

Can Streaming SQL Scale to Handle High-Throughput Kafka Topics?

Yes. RisingWave is designed for horizontal scalability. It separates compute from storage, allowing you to scale processing independently of state storage. For high-throughput Kafka topics, RisingWave reads from multiple Kafka partitions in parallel, distributes processing across compute nodes, and checkpoints state to object storage (S3, GCS, or Azure Blob) for durability. In production deployments, RisingWave Cloud handles scaling automatically based on workload.

When Should I Keep Kafka Consumers Instead of Using Streaming SQL?

Keep Kafka consumers when your processing logic requires side effects that go beyond data transformation. Examples include calling external REST APIs per event, implementing complex retry policies with exponential backoff, orchestrating multi-step workflows with conditional branching, or processing that requires libraries not available in SQL (machine learning model inference, image processing). The best architectures often combine both: streaming SQL for data transformation and aggregation, imperative consumers for side-effect-heavy processing.

Conclusion

Event-driven microservices do not require event-driven boilerplate. The patterns that account for most Kafka consumer code - filtering, joining, aggregating, and routing - are data transformations that SQL expresses more concisely and maintains more reliably than imperative code.

Key takeaways:

  • Kafka stays: Streaming SQL replaces consumer code, not Kafka. Your event backbone remains unchanged.
  • SQL replaces boilerplate: A single CREATE MATERIALIZED VIEW statement replaces 40-60+ lines of consumer code for common patterns.
  • State is handled for you: No manual state stores, no offset management, no recovery logic.
  • Results are queryable: Materialized views serve queries directly via the PostgreSQL protocol, eliminating separate serving layers.
  • Migration is incremental: Run materialized views alongside existing consumers, verify correctness, then cut over one service at a time.

The shift from imperative consumers to declarative SQL is not theoretical. Teams running RisingWave in production have replaced dozens of consumer services with materialized views, reducing operational complexity while maintaining (or improving) latency and correctness.


Ready to try this yourself? Get started with RisingWave in 5 minutes. Quickstart →

Join our Slack community to ask questions and connect with other stream processing developers.
