Move your data from Apache Kafka to Elasticsearch, continuously

Continuously ingest data from different sources, transform data on-the-fly, and then deliver data to any destinations using RisingWave’s connectors.
Apache Kafka
→
RisingWave
→
Elasticsearch
Apache Kafka
↓-
RisingWave
↓-
Elasticsearch
Apache Kafka
|
CREATE SOURCE IF NOT EXISTS orders_rw (
    order_id INTEGER PRIMARY KEY,
    customer_id INTEGER,
    order_status VARCHAR,
    total_amount DECIMAL,
    last_updated TIMESTAMP)
WITH (
   connector='kafka',
   topic='demo_topic',
   properties.bootstrap.server='172.10.1.1:9090,172.10.1.2:9090',
   scan.startup.mode='latest',
   scan.startup.timestamp.millis='140000000'
) FORMAT PLAIN ENCODE AVRO (
   message = 'message_name',
   schema.registry = 'http://127.0.0.1:8081'
);
For comprehensive configuration details, please refer to the Kafka connector documentation.
|
RisingWave
|
CREATE SINK elasticsearch_sink AS
SELECT
    order_status,
    COUNT(*) as order_count,
    SUM(total_amount) as total_revenue,
    AVG(total_amount) as avg_order_value,
    MIN(last_updated) as first_order_time,
    MAX(last_updated) as last_order_time
FROM orders_rw
WITH (
    connector = 'elasticsearch',
    primary_key = 'user_id',
    index = 'user_index',
    url = 'http://localhost:9200',
    username = 'user_name',
    password = 'secure_password',
    delimiter = ','
);
For comprehensive configuration details, please refer to the Elasticsearch connector documentation.
|
Elasticsearch
The Modern Backbone for Your
Event-Driven Infrastructure
GitHubXLinkedInSlackYouTube
Sign up for our to stay updated.