-- Publishes per-status order aggregates from orders_rw to the Pulsar topic
-- 'test-topic' as Debezium-formatted JSON change events.
CREATE SINK IF NOT EXISTS pulsar_sink AS
SELECT
    order_status,
    COUNT(*) AS order_count,
    SUM(total_amount) AS total_revenue,
    AVG(total_amount) AS avg_order_value,
    MIN(last_updated) AS first_order_time,
    MAX(last_updated) AS last_order_time
FROM orders_rw
-- order_status is the only non-aggregated column, so it must be the grouping
-- key; without this clause the aggregate query is rejected.
GROUP BY order_status
WITH (
    connector = 'pulsar',
    topic = 'test-topic',
    service.url = 'pulsar://broker:6650',
    -- NOTE(review): DEBEZIUM change events are keyed, so the grouping column
    -- is declared as the sink key here -- confirm against the connector docs.
    primary_key = 'order_status',
    oauth.issuer.url = 'https://issuer.com',
    oauth.credentials.url = 'https://provider.com',
    oauth.audience = 'test-aud',
    oauth.scope = 'consume',
    -- Placeholder values; supply real credentials through a secret mechanism
    -- rather than committing them in this file.
    aws.credentials.access_key_id = 'xxx',
    aws.credentials.secret_access_key = 'xxx'
)
FORMAT DEBEZIUM ENCODE JSON;