




-- CDC-backed table that ingests MongoDB change events.
-- For the `mongodb-cdc` connector RisingWave expects exactly two columns:
-- the document key, which must be named `_id`, and the whole document as
-- `payload jsonb`.
CREATE TABLE orders_rw (
    _id varchar PRIMARY KEY,
    payload jsonb
) WITH (
    connector = 'mongodb-cdc',
    mongodb.url = 'mongodb://localhost:27017/?replicaSet=rs0',
    -- Pattern: capture every collection in the `dev` database.
    collection.name = 'dev.*'
);

-- Per-status order statistics, streamed to Kafka.
-- `orders_rw` only exposes `_id` and `payload`, so the order attributes are
-- extracted from the JSON document.
-- TODO(review): confirm the payload field names (`order_status`,
-- `total_amount`, `last_updated`) and that their values are castable to the
-- types used below; they are inferred from the original column names.
CREATE SINK kafka_sink AS
SELECT
    payload ->> 'order_status' AS order_status,
    COUNT(*) AS order_count,
    SUM(CAST(payload ->> 'total_amount' AS numeric)) AS total_revenue,
    AVG(CAST(payload ->> 'total_amount' AS numeric)) AS avg_order_value,
    MIN(CAST(payload ->> 'last_updated' AS timestamptz)) AS first_order_time,
    MAX(CAST(payload ->> 'last_updated' AS timestamptz)) AS last_order_time
FROM orders_rw
GROUP BY payload ->> 'order_status'
WITH (
    connector = 'kafka',
    properties.bootstrap.server = 'localhost:9092',
    topic = 'test',
    properties.message.max.bytes = 2000,
    -- A grouped aggregate emits updates, not just inserts, so the sink is
    -- keyed by the grouping column and written in upsert format.
    primary_key = 'order_status'
)
FORMAT UPSERT ENCODE JSON;