CREATE TABLE orders_rw (
order_id INTEGER PRIMARY KEY,
customer_id INTEGER,
order_status VARCHAR,
total_amount DECIMAL,
last_updated TIMESTAMP)
INCLUDE file as file_name
INCLUDE offset -- default column name is `_rw_gcs_offset`
WITH (
connector = 'gcs',
gcs.bucket_name = 'bucket',
gcs.credential = 'gcs_credential'
) FORMAT PLAIN ENCODE JSON (
without_header = 'true',
delimiter = ',' -- set delimiter = E' ' for tab-separated files
);
CREATE SINK mysql_sink AS
SELECT
order_status,
COUNT(*) as order_count,
SUM(total_amount) as total_revenue,
AVG(total_amount) as avg_order_value,
MIN(last_updated) as first_order_time,
MAX(last_updated) as last_order_time
FROM orders_rw
WITH (
connector='jdbc',
jdbc.url='jdbc:mysql://<aws_rds_endpoint>:<port>/test_db?user=<username>&password=<password>',
table.name='personnel',
type = 'upsert',
primary_key = 'order_id'
);