-- Source `orders_rw`: exposes the Iceberg table `table` (database `dev`,
-- catalog `demo`) stored under the S3 warehouse path below.
-- `${ACCESS_KEY}` / `${SECRET_KEY}` are placeholders; they must be substituted
-- with real credentials before this statement is executed.
CREATE SOURCE orders_rw
WITH (
    connector = 'iceberg',
    -- Every option value is a single-quoted literal; a missing closing quote
    -- swallows the following lines into the string and breaks the statement.
    warehouse.path = 's3a://my-iceberg-bucket/path/to/warehouse',
    s3.endpoint = 'https://s3.ap-southeast-1.amazonaws.com',
    s3.access.key = '${ACCESS_KEY}',
    s3.secret.key = '${SECRET_KEY}',
    catalog.name = 'demo',
    database.name = 'dev',
    table.name = 'table'
);
-- Sink `elasticsearch_sink`: writes one summary row per `order_status`
-- (order count, revenue total/average, and the earliest/latest `last_updated`
-- value) from `orders_rw` into the Elasticsearch index configured below.
CREATE SINK elasticsearch_sink AS
SELECT
    order_status,
    COUNT(*) AS order_count,
    SUM(total_amount) AS total_revenue,
    AVG(total_amount) AS avg_order_value,
    -- NOTE(review): aliases say "order time" but the input column is
    -- `last_updated` -- confirm this is the intended timestamp.
    MIN(last_updated) AS first_order_time,
    MAX(last_updated) AS last_order_time
FROM orders_rw
-- `order_status` is selected next to aggregates, so it must be the grouping key.
GROUP BY order_status
WITH (
    connector = 'elasticsearch',
    -- The key must be a column the query actually emits; the grouping column
    -- uniquely identifies each output row.
    primary_key = 'order_status',
    -- NOTE(review): index name kept as-is; confirm 'user_index' is the intended
    -- target for per-status order statistics.
    index = 'user_index',
    url = 'http://localhost:9200',
    username = 'user_name',
    -- NOTE(review): placeholder credential in plain text; supply it from a
    -- secret store in real deployments.
    password = 'secure_password',
    delimiter = ','
);