Getting Started with RisingWave: Your First Streaming Pipeline
Process Apache Kafka messages using PostgreSQL-compatible SQL in RisingWave. No Java consumer code, no Kafka Streams, no Flink — just SQL.
Step-by-Step
1. Create a Kafka Source
CREATE SOURCE user_events (
    user_id INT,
    event_type VARCHAR,
    page VARCHAR,
    ts TIMESTAMP WITH TIME ZONE
) WITH (
    connector = 'kafka',
    topic = 'user-events',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'  -- start from the latest offset; use 'earliest' to read the topic from the beginning
) FORMAT PLAIN ENCODE JSON;
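To make the expected message shape concrete, here is a minimal Python sketch that builds one JSON payload whose keys match the columns declared in user_events. The helper names make_event and encode_event are hypothetical. The ISO 8601 timestamp format is an assumption about what the ts column accepts. Sending the bytes to Kafka is left to whichever producer library you use.

```python
import json
from datetime import datetime, timezone


def make_event(user_id: int, event_type: str, page: str, ts: datetime) -> dict:
    """Build one event whose keys match the columns of the user_events source."""
    if ts.tzinfo is None:
        raise ValueError("ts must be timezone-aware")
    return {
        "user_id": user_id,
        "event_type": event_type,
        "page": page,
        # ISO 8601 with an explicit UTC offset (assumed format for the ts column)
        "ts": ts.astimezone(timezone.utc).isoformat(),
    }


def encode_event(event: dict) -> bytes:
    """Serialize the event as UTF-8 JSON, the form a producer would send as the message value."""
    return json.dumps(event).encode("utf-8")


sample = make_event(42, "error", "/checkout", datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc))
payload = encode_event(sample)
```

Each message on the user-events topic would carry one such object as its value. A key that is missing from the JSON would presumably surface as NULL in the corresponding column.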
2. Process with SQL
-- Count events per type in the last 5 minutes
CREATE MATERIALIZED VIEW event_counts AS
SELECT event_type, COUNT(*) AS cnt
FROM user_events
WHERE ts > NOW() - INTERVAL '5 minutes'
GROUP BY event_type;
-- Filter: keep only error events
CREATE MATERIALIZED VIEW error_events AS
SELECT * FROM user_events WHERE event_type = 'error';
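As a way to reason about what the two materialized views return, here is a small pure-Python reference model of the same logic. It only illustrates the query semantics over an in-memory list and is not how RisingWave evaluates them internally. The sample events and function names are made up for the illustration.

```python
from collections import Counter
from datetime import datetime, timedelta, timezone


def event_counts(events, now, window=timedelta(minutes=5)):
    """Count events per event_type whose ts is strictly newer than now - window."""
    cutoff = now - window
    return Counter(e["event_type"] for e in events if e["ts"] > cutoff)


def error_events(events):
    """Keep only rows whose event_type is 'error'."""
    return [e for e in events if e["event_type"] == "error"]


now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
events = [
    {"user_id": 1, "event_type": "click", "page": "/home", "ts": now - timedelta(minutes=1)},
    {"user_id": 2, "event_type": "error", "page": "/checkout", "ts": now - timedelta(minutes=2)},
    # Older than 5 minutes, so it falls outside the window
    {"user_id": 3, "event_type": "click", "page": "/home", "ts": now - timedelta(minutes=10)},
]
counts = event_counts(events, now)
errors = error_events(events)
```

Calling event_counts again with a later now drops events that have aged out of the window. This mirrors how the result of event_counts changes over time even when no new messages arrive.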
3. Query Results
SELECT * FROM event_counts;
-- Or connect any PostgreSQL client (Grafana, Python, Node.js)
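For the Python case, a minimal client sketch might look like the following. It assumes the third-party psycopg2 driver is installed. The connection defaults (localhost, port 4566, database dev, user root) are assumptions for a local setup, so adjust them to your deployment. The function names are hypothetical.

```python
def build_dsn(host="localhost", port=4566, dbname="dev", user="root"):
    """Build a libpq-style connection string (defaults are assumptions for a local setup)."""
    return f"host={host} port={port} dbname={dbname} user={user}"


def fetch_event_counts(dsn):
    """Run the query from step 3 and return a {event_type: cnt} dict."""
    import psycopg2  # third-party driver, imported lazily; assumed to be installed

    conn = psycopg2.connect(dsn)
    try:
        conn.autocommit = True  # plain reads; no explicit transaction needed
        with conn.cursor() as cur:
            cur.execute("SELECT event_type, cnt FROM event_counts")
            return dict(cur.fetchall())
    finally:
        conn.close()
```

With a running instance, fetch_event_counts(build_dsn()) would return the current counts. Calling it again later returns the updated result, because the materialized view is maintained incrementally.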
4. Sink Back to Kafka (optional)
CREATE SINK errors_to_kafka AS SELECT * FROM error_events
WITH (
    connector = 'kafka',
    topic = 'error-alerts',
    properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;
Frequently Asked Questions
Can I process Avro/Protobuf Kafka messages?
Yes. RisingWave supports JSON, Avro, Protobuf, and CSV encodings for Kafka sources. Select the one you need in the FORMAT and ENCODE clauses of CREATE SOURCE, as with FORMAT PLAIN ENCODE JSON in step 1.
What Kafka features does RisingWave support?
RisingWave supports consumer groups, topic partitioning, offset management (starting from the earliest or latest offset via scan.startup.mode), and exactly-once semantics via checkpointing.

