CREATE SOURCE [IF NOT EXISTS] orders_rw (
order_id INTEGER PRIMARY KEY,
customer_id INTEGER,
order_status VARCHAR,
total_amount DECIMAL,
last_updated TIMESTAMP)
WITH (
connector='kinesis',
stream='kafka',
aws.region='user_test_topic',
endpoint='172.10.1.1:9090,172.10.1.2:9090',
aws.credentials.session_token='AQoEXAMPLEH4aoAH0gNCAPyJxz4BlCFFxWNE1OPTgk5TthT+FvwqnKwRcOIfrRh3c/L To6UDdyJwOOvEVPvLXCrrrUtdnniCEXAMPLE/IvU1dYUg2RVAJBanLiHb4IgRmpRV3z rkuWJOgQs8IZZaIv2BXIa2R4OlgkBN9bkUDNCJiBeb/AXlzBBko7b15fjrBs2+cTQtp Z3CYWFXG8C5zqx37wnOE49mRl/+OtkIKGO7fAE',
aws.credentials.role.arn='arn:aws-cn:iam::602389639824:role/demo_role',
aws.credentials.role.external_id='demo_external_id',
aws.credentials.access_key_id = 'your_access_key',
aws.credentials.secret_access_key = 'your_secret_key'
) FORMAT PLAIN ENCODE AVRO (
schema.location = 'https://demo_bucket_name.s3-us-west-2.amazonaws.com/demo.avsc'
);
CREATE SINK iceberg_sink AS
SELECT
order_status,
COUNT(*) as order_count,
SUM(total_amount) as total_revenue,
AVG(total_amount) as avg_order_value,
MIN(last_updated) as first_order_time,
MAX(last_updated) as last_order_time
FROM orders_rw
WITH (
connector = 'iceberg',
type = 'append-only',
force_append_only = true,
s3.endpoint = 'http://minio-0:9301',
s3.access.key = 'access_key',
s3.secret.key = 'secret_key',
s3.region = 'ap-southeast-1',
catalog.type = 'storage',
catalog.name = 'demo',
warehouse.path = 's3://icebergdata/demo',
database.name = 's1',
table.name = 't1'
);