-- orders_rw: order records ingested through the S3 connector.
-- Objects in bucket 's3-source' whose names match `match_pattern` are parsed
-- as plain JSON (FORMAT PLAIN ENCODE JSON) into the columns declared below.
-- NOTE(review): the credential values are inline placeholders; confirm real
-- keys are injected from a secret store rather than committed here.
CREATE TABLE orders_rw (
    order_id INTEGER PRIMARY KEY,
    customer_id INTEGER,
    order_status VARCHAR,
    total_amount DECIMAL,
    last_updated TIMESTAMP
)
-- Extra connector metadata columns: the source file (exposed as `file_name`)
-- and the offset within it.
INCLUDE file AS file_name
INCLUDE offset -- default column name is `_rw_s3_offset`
WITH (
    connector = 's3',
    match_pattern = '%Ring%*.ndjson',
    s3.region_name = 'ap-southeast-2',
    s3.bucket_name = 's3-source',
    s3.credentials.access = 'credentials_access',
    s3.credentials.secret = 'credentials_secret',
    -- The endpoint has to address the same region as s3.region_name; it
    -- previously pointed at us-east-1 while the region was ap-southeast-2.
    -- NOTE(review): aligned to the declared region -- confirm the bucket
    -- really lives in ap-southeast-2.
    s3.endpoint_url = 'https://s3.ap-southeast-2.amazonaws.com'
) FORMAT PLAIN ENCODE JSON;
-- dl_sink: writes per-order_status aggregates of orders_rw (row count,
-- revenue sum/average, earliest and latest last_updated) to a Delta Lake
-- table at the configured `location`.
-- NOTE(review): the access/secret key values are inline placeholders; confirm
-- real keys are injected from a secret store rather than committed here.
CREATE SINK dl_sink AS
SELECT
    order_status,
    COUNT(*) AS order_count,
    SUM(total_amount) AS total_revenue,
    AVG(total_amount) AS avg_order_value,
    MIN(last_updated) AS first_order_time,
    MAX(last_updated) AS last_order_time
FROM orders_rw
-- order_status is selected alongside aggregates, so it must be the grouping
-- key; without GROUP BY the query is invalid.
GROUP BY order_status
WITH (
    connector = 'deltalake',
    type = 'append-only',
    -- A grouped aggregate emits updates whenever a group's values change, so
    -- it is not an append-only stream. force_append_only lets it feed an
    -- append-only sink: each change is written as a new row and
    -- deletes/retractions are dropped. Readers of the Delta table should
    -- expect multiple rows per order_status and pick the latest one.
    force_append_only = 'true',
    location = 's3a://my-delta-lake-bucket/path/to/table',
    s3.endpoint = 'https://s3.ap-southeast-1.amazonaws.com',
    s3.access.key = 'access_key',
    s3.secret.key = 'secret_key'
);