CREATE SOURCE IF NOT EXISTS orders_rw (
order_id INTEGER PRIMARY KEY,
customer_id INTEGER,
order_status VARCHAR,
total_amount DECIMAL,
last_updated TIMESTAMP)
WITH (
connector='pulsar',
topic='demo_topic',
service.url='pulsar://localhost:6650/',
oauth.issuer.url='https://auth.streamnative.cloud/',
oauth.credentials.url='s3://bucket_name/your_key_file.file',
oauth.audience='urn:sn:pulsar:o-d6fgh:instance-0',
aws.credentials.access_key_id='aws.credentials.access_key_id',
aws.credentials.secret_access_key='aws.credentials.secret_access_key',
scan.startup.mode='latest',
scan.startup.timestamp.millis='140000000'
) FORMAT PLAIN ENCODE AVRO (
message = 'message',
schema.location = 'https://demo_bucket_name.s3-us-west-2.amazonaws.com/demo.avsc'
);
CREATE SINK iceberg_sink AS
SELECT
order_status,
COUNT(*) as order_count,
SUM(total_amount) as total_revenue,
AVG(total_amount) as avg_order_value,
MIN(last_updated) as first_order_time,
MAX(last_updated) as last_order_time
FROM orders_rw
WITH (
connector = 'iceberg',
type = 'append-only',
force_append_only = true,
s3.endpoint = 'http://minio-0:9301',
s3.access.key = 'access_key',
s3.secret.key = 'secret_key',
s3.region = 'ap-southeast-1',
catalog.type = 'storage',
catalog.name = 'demo',
warehouse.path = 's3://icebergdata/demo',
database.name = 's1',
table.name = 't1'
);