CREATE TABLE orders_rw (
order_id INTEGER PRIMARY KEY,
customer_id INTEGER,
order_status VARCHAR,
total_amount DECIMAL,
last_updated TIMESTAMP)
WITH (
connector = 'google_pubsub',
pubsub.subscription = 'test-subscription-1',
) FORMAT PLAIN ENCODE JSON;
CREATE SINK kinesis_sink AS
SELECT
order_status,
COUNT(*) as order_count,
SUM(total_amount) as total_revenue,
AVG(total_amount) as avg_order_value,
MIN(last_updated) as first_order_time,
MAX(last_updated) as last_order_time
FROM orders_rw
WITH (
connector = 'kinesis',
stream = 'kinesis-sink-demo',
aws.region = 'us-east-1',
aws.credentials.access_key_id = 'your_access_key',
aws.credentials.secret_access_key = 'your_secret_key'
)
FORMAT DEBEZIUM ENCODE JSON;