CREATE SOURCE IF NOT EXISTS orders_rw (
order_id INTEGER PRIMARY KEY,
customer_id INTEGER,
order_status VARCHAR,
total_amount DECIMAL,
last_updated TIMESTAMP)
WITH (
connector = 'kafka',
topic = 'topic_0',
properties.bootstrap.server = 'xyz-x00xx.us-east-1.aws.confluent.cloud:9092',
scan.startup.mode = 'earliest',
properties.security.protocol = 'SASL_SSL',
properties.sasl.mechanism = 'PLAIN',
properties.sasl.username = 'username',
properties.sasl.password = 'password'
) FORMAT PLAIN ENCODE JSON;
CREATE SINK iceberg_sink AS
SELECT
order_status,
COUNT(*) as order_count,
SUM(total_amount) as total_revenue,
AVG(total_amount) as avg_order_value,
MIN(last_updated) as first_order_time,
MAX(last_updated) as last_order_time
FROM orders_rw
WITH (
connector = 'iceberg',
type = 'append-only',
force_append_only = true,
s3.endpoint = 'http://minio-0:9301',
s3.access.key = 'access_key',
s3.secret.key = 'secret_key',
s3.region = 'ap-southeast-1',
catalog.type = 'storage',
catalog.name = 'demo',
warehouse.path = 's3://icebergdata/demo',
database.name = 's1',
table.name = 't1'
);