You can replace your Kafka consumer code with streaming SQL. Instead of writing and maintaining Java or Python consumer applications, you declare your data processing logic as SQL statements, connect RisingWave directly to your Kafka topics, and query continuously updated results from a PostgreSQL-compatible interface. This approach cuts hundreds of lines of application code down to a handful of SQL statements while eliminating the operational burden of running, deploying, and debugging consumer processes.
The Kafka Consumer Problem
Every team running Kafka eventually accumulates consumer code. A new business requirement arrives: calculate revenue per region in real time. Someone writes a Python consumer that reads from the orders topic, aggregates by region in a dict, and flushes to Redis every 30 seconds. It works. Then a new requirement: detect failed orders immediately. Another consumer. Then enrichment with product metadata. Another consumer. Then a windowed count of events per minute.
Within months, the team is operating a fleet of consumer processes, each with its own offset management, error handling, deployment pipeline, and on-call rotation. A change as simple as adding a new field to an aggregation requires a code change, a PR review, a test run, and a deployment.
This is the Kafka consumer accumulation problem. The processing logic itself is simple and SQL-expressible. The surrounding infrastructure is not.
RisingWave is a PostgreSQL-compatible streaming database that connects directly to Kafka topics and continuously evaluates SQL queries against incoming data. You write CREATE MATERIALIZED VIEW instead of a consumer loop. RisingWave handles offset management, fault tolerance, and incremental computation automatically.
Side-by-Side: Python Consumer vs. Streaming SQL
The clearest way to understand the trade-off is a direct code comparison. Consider a common task: consume an orders topic from Kafka, filter to completed orders, group by region, and keep a running total of revenue.
The Python Consumer Approach
from confluent_kafka import Consumer, KafkaError
import json
import redis
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# --- Configuration ---
KAFKA_BROKER = "broker:9092"
KAFKA_TOPIC = "orders"
KAFKA_GROUP_ID = "revenue-consumer-v1"
REDIS_HOST = "localhost"

# --- State ---
revenue_by_region = {}

def process_message(msg_value: dict):
    if msg_value.get("status") != "completed":
        return
    region = msg_value.get("region", "unknown")
    total = msg_value.get("quantity", 0) * msg_value.get("unit_price", 0.0)
    revenue_by_region[region] = revenue_by_region.get(region, 0.0) + total

def flush_to_redis(r: redis.Redis):
    for region, revenue in revenue_by_region.items():
        r.set(f"revenue:{region}", revenue)
    logger.info("Flushed %d regions to Redis", len(revenue_by_region))

def main():
    consumer = Consumer({
        "bootstrap.servers": KAFKA_BROKER,
        "group.id": KAFKA_GROUP_ID,
        "auto.offset.reset": "earliest",
        "enable.auto.commit": False,
    })
    consumer.subscribe([KAFKA_TOPIC])
    r = redis.Redis(host=REDIS_HOST, decode_responses=True)
    msg_count = 0
    try:
        while True:
            msg = consumer.poll(timeout=1.0)
            if msg is None:
                continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    continue
                logger.error("Consumer error: %s", msg.error())
                break
            try:
                value = json.loads(msg.value().decode("utf-8"))
                process_message(value)
                msg_count += 1
            except json.JSONDecodeError as e:
                logger.warning("Failed to parse message: %s", e)
                continue
            consumer.commit(asynchronous=False)
            if msg_count % 100 == 0:
                flush_to_redis(r)
    except KeyboardInterrupt:
        pass
    finally:
        consumer.close()

if __name__ == "__main__":
    main()
This is roughly 65 lines of application code. It does not include the requirements.txt, a Docker or systemd deployment configuration, log rotation setup, a dead-letter queue for malformed messages, or monitoring instrumentation. A production-hardened version easily reaches 150-200 lines.
What is this code doing functionally? Filtering by status, grouping by region, summing a computed value. That is three clauses in SQL.
The Streaming SQL Approach
-- Step 1: Tell RisingWave where your Kafka topic lives and what the schema looks like
CREATE TABLE kcons_orders (
order_id VARCHAR,
customer_id INT,
product_id VARCHAR,
quantity INT,
unit_price NUMERIC,
status VARCHAR,
region VARCHAR,
event_time TIMESTAMP
) WITH (
connector = 'kafka',
topic = 'orders',
properties.bootstrap.server = 'broker:9092',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;
-- Step 2: Define your processing logic as a materialized view
CREATE MATERIALIZED VIEW kcons_order_summary_mv AS
SELECT
region,
COUNT(*) AS order_count,
SUM(quantity * unit_price) AS total_revenue,
AVG(quantity * unit_price) AS avg_order_value
FROM kcons_orders
WHERE status = 'completed'
GROUP BY region;
Two SQL statements. No consumer loop. No offset commits. No Redis flush logic. No deployment pipeline beyond running the SQL.
RisingWave reads from the Kafka topic, parses JSON, applies the filter and aggregation continuously, and keeps kcons_order_summary_mv up to date. To query the current state, you run a standard SELECT:
SELECT * FROM kcons_order_summary_mv ORDER BY total_revenue DESC;
region | order_count | total_revenue | avg_order_value
----------+-------------+---------------+-----------------
us-east | 3 | 359.91 | 119.97
eu-west | 2 | 268.98 | 134.49
ap-south | 2 | 238.99 | 119.4950
Any PostgreSQL-compatible client can query this: psql, Grafana, Metabase, SQLAlchemy, JDBC. The result is always current, updated incrementally as each new Kafka message arrives.
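As a rough illustration of reading the view from application code, the sketch below wraps the query in a small helper that accepts any Python DB-API connection. The helper name fetch_region_summary is hypothetical, and the connection parameters in the trailing comment (localhost, port 4566, user root, database dev) are assumed to match a default local setup with a PostgreSQL driver such as psycopg2 installed.

```python
from decimal import Decimal
from typing import Any, List, Tuple

# Same view as above; only the columns the caller needs are selected.
SUMMARY_QUERY = (
    "SELECT region, order_count, total_revenue "
    "FROM kcons_order_summary_mv "
    "ORDER BY total_revenue DESC"
)


def fetch_region_summary(conn: Any) -> List[Tuple[str, int, Decimal]]:
    """Return (region, order_count, total_revenue) rows, highest revenue first.

    `conn` is any DB-API 2.0 connection (hypothetical helper, not part of
    any RisingWave client library).
    """
    cur = conn.cursor()
    try:
        cur.execute(SUMMARY_QUERY)
        # Normalize driver-specific numeric types to int and Decimal.
        return [(row[0], int(row[1]), Decimal(str(row[2]))) for row in cur.fetchall()]
    finally:
        cur.close()


# Assumed usage against a local RisingWave instance:
#   import psycopg2
#   conn = psycopg2.connect(host="localhost", port=4566, user="root", dbname="dev")
#   for region, count, revenue in fetch_region_summary(conn):
#       print(region, count, revenue)
```

Because the helper only depends on the DB-API interface, the same function can be pointed at a test database when verifying application logic without a running cluster.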
Line Count Comparison
| Task | Python consumer | RisingWave SQL |
|---|---|---|
| Kafka connection | ~10 lines | WITH (connector='kafka', ...) |
| Schema definition | Implicit / JSON parsing | Column list in CREATE TABLE |
| Filter logic | if statement | WHERE clause |
| Aggregation | Dict accumulation | GROUP BY |
| State persistence | Redis flush loop | Built-in (automatic) |
| Offset management | consumer.commit() | Automatic |
| Fault tolerance | Restart logic | Built-in checkpoint |
| Total | 65-200+ lines | ~20 lines |
How RisingWave Processes Kafka Data
RisingWave is built around incremental computation. When you create a materialized view on top of a Kafka-backed table, RisingWave builds a dataflow graph that processes each incoming message exactly once and propagates changes to all downstream views.
The data flow looks like this:
Kafka Topic
|
v
CREATE TABLE (Kafka connector reads, parses JSON)
|
+---> CREATE MATERIALIZED VIEW mv_1 (aggregation)
|
+---> CREATE MATERIALIZED VIEW mv_2 (join + enrichment)
|
+---> CREATE MATERIALIZED VIEW mv_3 (filter for errors)
When a new message arrives on the Kafka topic, all three materialized views update in a single incremental pass. You do not run three separate consumer processes. RisingWave handles fan-out internally.
Materialized views in RisingWave are not cached query results that get stale. They are continuously maintained projections of your streaming data. The moment a Kafka message lands, the view reflects it. When you query the view, you get low-latency reads from RisingWave's own storage layer, not another Kafka poll.
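To make "incremental" concrete, here is a toy Python model of an incrementally maintained aggregation. It is a conceptual sketch only, not how RisingWave is implemented: the class name RegionRevenueView and the sample order values are made up for illustration. Each change touches only the affected group instead of rescanning all rows.

```python
from collections import defaultdict
from decimal import Decimal
from typing import Dict, Tuple


class RegionRevenueView:
    """Toy view: count and revenue of completed orders per region."""

    def __init__(self) -> None:
        self._count: Dict[str, int] = defaultdict(int)
        self._revenue: Dict[str, Decimal] = defaultdict(Decimal)

    def apply(self, order: dict, sign: int = 1) -> None:
        """Apply one change: sign=+1 for an insert, sign=-1 for a delete."""
        if order["status"] != "completed":
            return  # mirrors WHERE status = 'completed'
        region = order["region"]
        amount = Decimal(str(order["quantity"])) * Decimal(str(order["unit_price"]))
        self._count[region] += sign
        self._revenue[region] += sign * amount
        if self._count[region] == 0:
            # A group with no remaining rows disappears from the result.
            del self._count[region]
            del self._revenue[region]

    def snapshot(self) -> Dict[str, Tuple[int, Decimal]]:
        """Current result: region -> (order_count, total_revenue)."""
        return {region: (self._count[region], self._revenue[region]) for region in self._count}


view = RegionRevenueView()
view.apply({"status": "completed", "region": "us-east", "quantity": 2, "unit_price": "29.99"})
view.apply({"status": "failed", "region": "us-east", "quantity": 5, "unit_price": "29.99"})
print(view.snapshot())
```

The work per message is constant regardless of how many orders have already been processed, which is the property that lets a view stay current without periodic recomputation.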
Hands-On: Four Consumer Patterns Replaced with SQL
The following examples were verified on a local RisingWave setup (version 2.8.0). For demonstration, the kcons_orders table is populated with direct inserts instead of a live Kafka topic, but the SQL is identical to what you would use with a Kafka connector.
Pattern 1: Aggregation Consumer
The old way (Python): maintain an in-memory dict, flush periodically to a store.
The SQL way: one materialized view.
CREATE MATERIALIZED VIEW kcons_order_summary_mv AS
SELECT
region,
COUNT(*) AS order_count,
SUM(quantity * unit_price) AS total_revenue,
AVG(quantity * unit_price) AS avg_order_value
FROM kcons_orders
WHERE status = 'completed'
GROUP BY region;
Query result:
SELECT * FROM kcons_order_summary_mv ORDER BY total_revenue DESC;
region | order_count | total_revenue | avg_order_value
----------+-------------+---------------+-----------------
us-east | 3 | 359.91 | 119.97
eu-west | 2 | 268.98 | 134.49
ap-south | 2 | 238.99 | 119.4950
No periodic flush. The view is always current.
Pattern 2: Enrichment Consumer
The old way (Java): use a KafkaConsumer in one thread, look up reference data from a database or cache in another, join them in application code.
The SQL way: a JOIN in a materialized view.
CREATE MATERIALIZED VIEW kcons_enriched_orders_mv AS
SELECT
o.order_id,
o.customer_id,
p.product_name,
p.category,
o.quantity,
o.unit_price,
o.quantity * o.unit_price AS order_total,
o.status,
o.region,
o.event_time
FROM kcons_orders o
JOIN kcons_products p ON o.product_id = p.product_id;
Query result:
SELECT order_id, product_name, category, order_total, status, region
FROM kcons_enriched_orders_mv
ORDER BY event_time;
order_id | product_name | category | order_total | status | region
----------+---------------------+-------------+-------------+-----------+----------
ORD-1001 | Wireless Headphones | electronics | 59.98 | completed | us-east
ORD-1002 | Office Chair | furniture | 149.00 | completed | eu-west
ORD-1003 | Running Shoes | apparel | 179.97 | completed | us-east
ORD-1004 | Coffee Maker | appliances | 89.99 | completed | ap-south
ORD-1005 | Wireless Headphones | electronics | 149.95 | failed | us-east
ORD-1006 | Running Shoes | apparel | 119.98 | completed | eu-west
ORD-1007 | Office Chair | furniture | 149.00 | completed | ap-south
ORD-1008 | Wireless Headphones | electronics | 119.96 | completed | us-east
The kcons_products table can be populated from a CDC source (PostgreSQL, MySQL), static inserts, or another Kafka topic. RisingWave maintains the join automatically as either side changes.
Pattern 3: Alerting / Filter Consumer
The old way: a dedicated consumer that reads every message, checks a condition, and sends an alert if the condition is met.
The SQL way: a filtered materialized view. Downstream systems poll it or receive updates via a sink.
CREATE MATERIALIZED VIEW kcons_order_errors_mv AS
SELECT
order_id,
customer_id,
product_id,
quantity * unit_price AS order_total,
status,
region,
event_time
FROM kcons_orders
WHERE status != 'completed';
Query result:
SELECT * FROM kcons_order_errors_mv;
order_id | customer_id | product_id | order_total | status | region | event_time
----------+-------------+------------+-------------+--------+---------+---------------------
ORD-1005 | 104 | SKU-A100 | 149.95 | failed | us-east | 2026-04-01 10:03:05
To push these alerts downstream, add a sink connector. The example below writes them to another Kafka topic, from which an alerting integration can forward them to Slack or PagerDuty:
CREATE SINK kcons_error_alerts_sink
FROM kcons_order_errors_mv
WITH (
connector = 'kafka',
topic = 'order-alerts',
properties.bootstrap.server = 'broker:9092'
) FORMAT PLAIN ENCODE JSON;
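One statement replaces the entire filter consumer process; only the final hand-off from the alerts topic to the notification service stays outside RisingWave. Note that FORMAT PLAIN expects append-only input, so if the upstream table accepts updates or deletes, RisingWave may require the force_append_only option on the sink; check the sink documentation for your version.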
Pattern 4: Windowed Aggregation Consumer
Windowed processing is where consumer code gets especially verbose. Tracking window boundaries, handling late arrivals, and expiring old state all require significant boilerplate in application code.
The SQL way: use RisingWave's TUMBLE function.
CREATE MATERIALIZED VIEW kcons_revenue_by_region_mv AS
SELECT
region,
window_start,
window_end,
SUM(quantity * unit_price) AS total_revenue,
COUNT(*) AS order_count
FROM TUMBLE(kcons_orders, event_time, INTERVAL '2' MINUTE)
WHERE status = 'completed'
GROUP BY region, window_start, window_end;
Query result:
SELECT * FROM kcons_revenue_by_region_mv ORDER BY window_start, total_revenue DESC;
region | window_start | window_end | total_revenue | order_count
----------+---------------------+---------------------+---------------+-------------
eu-west | 2026-04-01 10:00:00 | 2026-04-01 10:02:00 | 149.00 | 1
us-east | 2026-04-01 10:00:00 | 2026-04-01 10:02:00 | 59.98 | 1
us-east | 2026-04-01 10:02:00 | 2026-04-01 10:04:00 | 179.97 | 1
eu-west | 2026-04-01 10:02:00 | 2026-04-01 10:04:00 | 119.98 | 1
ap-south | 2026-04-01 10:02:00 | 2026-04-01 10:04:00 | 89.99 | 1
ap-south | 2026-04-01 10:04:00 | 2026-04-01 10:06:00 | 149.00 | 1
us-east | 2026-04-01 10:04:00 | 2026-04-01 10:06:00 | 119.96 | 1
RisingWave supports TUMBLE (fixed, non-overlapping windows), HOP (sliding windows), and SESSION windows. For a detailed guide on windowing semantics, see Windowing in Stream Processing.
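For readers used to computing window boundaries by hand in consumer code, the following Python sketch shows the arithmetic a tumbling window implies. It assumes windows are aligned to the Unix epoch with an inclusive start and exclusive end; the function name tumble_window is hypothetical and is not a RisingWave API.

```python
from datetime import datetime, timedelta
from typing import Tuple

EPOCH = datetime(1970, 1, 1)


def tumble_window(ts: datetime, size: timedelta) -> Tuple[datetime, datetime]:
    """Return (window_start, window_end) of the fixed window containing ts.

    Assumes epoch-aligned windows: start is inclusive, end is exclusive.
    """
    if size <= timedelta(0):
        raise ValueError("window size must be positive")
    # Distance from ts back to the most recent window boundary.
    offset = (ts - EPOCH) % size
    start = ts - offset
    return start, start + size


start, end = tumble_window(datetime(2026, 4, 1, 10, 3, 5), timedelta(minutes=2))
print(start, end)  # 2026-04-01 10:02:00 2026-04-01 10:04:00
```

An event at 10:03:05 therefore lands in the 10:02:00 to 10:04:00 window, matching the window_start and window_end columns in the query result above.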
What Happens to Offsets and Fault Tolerance?
One of the most common concerns when moving away from consumer code is: "who manages Kafka offsets?"
RisingWave manages offsets internally as part of its checkpoint mechanism. When you create a table with the Kafka connector, RisingWave creates a consumer group and records consumed offsets in sync with its periodic checkpoints (the checkpoint interval is configurable, so check the default for your version). If RisingWave restarts, it resumes from the last checkpointed offset. Because state and offsets are restored from the same checkpoint, this provides exactly-once semantics for ingestion.
You do not write consumer.commit(). You do not configure enable.auto.commit. You do not implement retry logic for failed offset commits. This is all handled by the system.
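The idea behind checkpoint-based recovery can be sketched in a few lines of Python. This is a simplified model under stated assumptions, not RisingWave's actual mechanism: the class CheckpointedCounter is hypothetical, the "log" is an in-memory list standing in for a Kafka partition, and the checkpoint is kept in memory rather than durable storage.

```python
from typing import Dict, List, Tuple


class CheckpointedCounter:
    """Toy model: running totals and the source offset are saved together."""

    def __init__(self) -> None:
        self.totals: Dict[str, int] = {}
        self.next_offset = 0
        # Last saved snapshot: (totals, next_offset), always mutually consistent.
        self._checkpoint: Tuple[Dict[str, int], int] = ({}, 0)

    def process(self, log: List[str], upto: int) -> None:
        """Consume entries from next_offset up to (not including) upto."""
        while self.next_offset < min(upto, len(log)):
            key = log[self.next_offset]
            self.totals[key] = self.totals.get(key, 0) + 1
            self.next_offset += 1

    def checkpoint(self) -> None:
        """Save state and offset as one unit."""
        self._checkpoint = (dict(self.totals), self.next_offset)

    def recover(self) -> None:
        """Simulate a restart: roll state and offset back to the last checkpoint."""
        totals, offset = self._checkpoint
        self.totals = dict(totals)
        self.next_offset = offset


log = ["a", "b", "a", "c", "a"]
counter = CheckpointedCounter()
counter.process(log, upto=2)
counter.checkpoint()
counter.process(log, upto=4)  # work done after the checkpoint...
counter.recover()             # ...is discarded by the crash and replayed
counter.process(log, upto=5)
print(counter.totals)  # {'a': 3, 'b': 1, 'c': 1}
```

Since the totals and the offset roll back together, replayed messages are never counted twice. The Python consumer shown earlier lacks this property: it commits offsets per message but flushes state only every 100 messages, so a crash can lose aggregated revenue that Kafka already considers consumed.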
To control where consumption starts:
-- Start from the beginning of the topic
WITH (scan.startup.mode = 'earliest')
-- Start from the most recent messages
WITH (scan.startup.mode = 'latest')
-- Start from a specific point in time
WITH (
scan.startup.mode = 'timestamp',
scan.startup.timestamp.millis = '1743480000000'
)
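The timestamp mode expects epoch milliseconds, which are easy to get wrong by a factor of 1000 or by a time zone offset. A small Python helper (the name to_epoch_millis is hypothetical) can compute the value from an explicit UTC datetime:

```python
from datetime import datetime, timedelta, timezone

UNIX_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_millis(moment: datetime) -> int:
    """Convert a timezone-aware datetime to integer milliseconds since the Unix epoch."""
    if moment.tzinfo is None:
        raise ValueError("pass a timezone-aware datetime to avoid local-time ambiguity")
    # Integer division of timedeltas avoids floating-point rounding.
    return (moment - UNIX_EPOCH) // timedelta(milliseconds=1)


print(to_epoch_millis(datetime(2025, 4, 1, 4, 0, tzinfo=timezone.utc)))  # 1743480000000
```

The value used in the example above, 1743480000000, corresponds to 2025-04-01 04:00:00 UTC.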
For authenticated Kafka clusters, RisingWave supports SASL/SCRAM, SSL/TLS, and AWS MSK IAM authentication via properties in the WITH clause. See the Kafka source documentation for the full list of connection options.
Operational Comparison
Replacing consumer code with streaming SQL changes more than the line count. It changes the operational model entirely.
| Concern | Consumer application | RisingWave SQL |
|---|---|---|
| Deployment | Package + deploy a process | Run a SQL statement |
| Scaling | Increase partition count, add consumer instances | RisingWave scales horizontally |
| Change logic | Code change + PR + redeploy | Recreate the materialized view |
| Monitor lag | External Kafka lag tooling | Query rw_kafka_lag system table |
| State persistence | External store (Redis, Postgres) | Built-in storage |
| Offset management | consumer.commit() in code | Automatic with checkpoints |
| Multiple consumers | N separate processes | One RisingWave, N materialized views |
| Schema registry | Custom deserializer code | FORMAT PLAIN ENCODE JSON or Avro/Protobuf |
The most significant operational win is the "Multiple consumers" row. Every materialized view you add is a new "consumer" that processes data without adding a new process to your infrastructure. RisingWave fans the data out internally.
When Consumer Code Still Makes Sense
Streaming SQL with RisingWave covers a large majority of Kafka consumer workloads. There are a few cases where custom consumer code remains the better tool:
Custom ML inference in the hot path: If each message needs to pass through a machine learning model before any SQL transformation, an application-level consumer that calls a model serving endpoint is often cleaner. RisingWave supports user-defined functions in Python, Java, and JavaScript, which can handle some of these cases, but a tightly integrated inference pipeline may be better served by application code.
Complex event pattern matching: Detecting a sequence of events across multiple messages (for example, "user viewed product, then added to cart, then abandoned for 30 minutes") is expressible in SQL with window functions, but if you have very complex CEP requirements with many interdependent patterns, purpose-built CEP frameworks have a natural advantage.
Proprietary binary protocols: If your Kafka messages use a custom binary format that requires business-specific decoding logic, you may need a consumer-side deserializer before the data can be expressed as typed SQL columns. Protobuf and Avro schemas are handled natively; ad-hoc binary formats are not.
For everything else (aggregations, enrichment, filtering, windowed counts, stream-table joins, multi-topic merging), SQL is the cleaner path.
Getting Started
To replace your first consumer with streaming SQL, you need RisingWave running and a Kafka topic to connect to.
Install RisingWave (macOS):
brew install risingwave
risingwave
Or with Docker:
docker run -it --pull=always -p 4566:4566 -p 5691:5691 \
risingwavelabs/risingwave:latest single_node
Connect with psql:
psql -h localhost -p 4566 -U root -d dev
Connect to your first Kafka topic:
CREATE TABLE my_events (
-- your fields here
event_id VARCHAR,
user_id INT,
event_type VARCHAR,
payload JSONB,
created_at TIMESTAMP
) WITH (
connector = 'kafka',
topic = 'your-topic-name',
properties.bootstrap.server = 'your-broker:9092',
scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;
Define your processing logic:
CREATE MATERIALIZED VIEW my_first_streaming_view AS
SELECT
event_type,
COUNT(*) AS event_count
FROM my_events
GROUP BY event_type;
From there, query my_first_streaming_view from any PostgreSQL client, connect Grafana with the PostgreSQL data source, or use a SQLAlchemy connection string in your application.
For a complete walkthrough, the RisingWave quickstart guide covers setup to first query in under five minutes.
FAQ
Can RisingWave connect to multiple Kafka topics at the same time?
Yes. Each CREATE TABLE with a Kafka connector reads from one topic, and you can create as many tables as your cluster supports. Materialized views can join, union, or otherwise combine data from multiple Kafka-backed tables in a single query. This replaces the pattern of having separate consumer processes for each topic.
What happens if my Kafka topic uses Avro or Protobuf encoding?
RisingWave supports both. Use FORMAT PLAIN ENCODE AVRO with a schema registry URL, or FORMAT PLAIN ENCODE PROTOBUF with either a schema registry or a local .proto file. The schema is resolved at table creation time, so RisingWave knows the field types without any custom deserializer code. See the Kafka source documentation for the full list of encoding options.
How does this compare to ksqlDB?
Both tools let you write SQL against Kafka topics. The key differences are architecture and SQL completeness. ksqlDB runs on the JVM and wraps Kafka Streams, so it inherits Kafka's storage model. RisingWave is a standalone streaming database with its own storage layer, supports full PostgreSQL-compatible SQL (including complex subqueries and arbitrary joins), and uses Apache 2.0 licensing with no usage restrictions. For a detailed comparison, see RisingWave vs. ksqlDB.
Will switching from consumer code break my existing downstream applications?
Downstream applications that read from a database or Redis can point to RisingWave materialized views instead, since RisingWave is PostgreSQL wire-compatible. Applications that consume from a downstream Kafka topic can continue unchanged if you add a RisingWave sink that writes the processed output to that topic. The upstream consumer process can then be retired. The migration can be done incrementally: run both the old consumer and the RisingWave view in parallel, verify the results match, then cut over.
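The "verify the results match" step of a parallel run can be automated with a small comparison script. The sketch below is one possible approach, assuming you have already loaded per-region revenue from the legacy store and from the materialized view into two dictionaries; the function name diff_revenue and the default tolerance are illustrative choices, not part of any tool.

```python
from decimal import Decimal
from typing import Dict, List


def diff_revenue(
    legacy: Dict[str, Decimal],
    streaming: Dict[str, Decimal],
    tolerance: Decimal = Decimal("0.01"),
) -> List[str]:
    """Return human-readable discrepancies between the two outputs (empty if they agree)."""
    problems: List[str] = []
    for region in sorted(set(legacy) | set(streaming)):
        if region not in legacy:
            problems.append(f"{region}: missing from legacy output")
        elif region not in streaming:
            problems.append(f"{region}: missing from streaming output")
        elif abs(legacy[region] - streaming[region]) > tolerance:
            problems.append(
                f"{region}: legacy={legacy[region]} streaming={streaming[region]}"
            )
    return problems


legacy = {"us-east": Decimal("359.91"), "eu-west": Decimal("268.98")}
streaming = {"us-east": Decimal("359.91"), "eu-west": Decimal("268.98")}
print(diff_revenue(legacy, streaming))  # []
```

A tolerance is useful here because the legacy consumer accumulates floats while the SQL version uses NUMERIC, so small rounding differences are expected. Keep in mind that the two systems may also be a few messages apart at any instant, so compare during a quiet period or after pausing producers.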
Conclusion
Kafka consumer code accumulates because each new processing requirement looks small in isolation. A filter here, an aggregation there, an enrichment join somewhere else. Over time, the team operates a fleet of consumer processes, each representing maintenance burden and operational risk.
Streaming SQL with RisingWave addresses the root cause. Processing logic that is fundamentally SQL-expressible should be written in SQL. RisingWave handles the Kafka connection, offset management, incremental state maintenance, and fault tolerance. Your team writes CREATE MATERIALIZED VIEW instead of consumer loops, and the result is queryable from any PostgreSQL client the moment it is created.
The side-by-side comparison is clear: 65-200 lines of Python or Java become roughly 20 lines of SQL, and the SQL version gets checkpoint-based fault tolerance, no per-consumer deployment pipeline, and no external state store dependency.
If you have a Kafka topic and a processing requirement that fits in a SQL query, streaming SQL is the fastest path to replacing your next consumer application.

