-- Source `orders_rw`: reads order records from the Kafka topic `demo_topic`
-- and decodes them as plain Avro messages using the schema registry below.
-- Created only if a source with this name does not already exist.
--
-- NOTE(review): a PRIMARY KEY on a FORMAT PLAIN source may be rejected or
-- left unenforced by the engine -- confirm against the target version; a
-- CREATE TABLE with the same connector is the usual way to get an enforced key.
CREATE SOURCE IF NOT EXISTS orders_rw (
    order_id INTEGER PRIMARY KEY,
    customer_id INTEGER,
    order_status VARCHAR,
    total_amount DECIMAL,
    last_updated TIMESTAMP
)
WITH (
    connector = 'kafka',
    topic = 'demo_topic',
    -- Comma-separated list of Kafka brokers.
    properties.bootstrap.server = '172.10.1.1:9090,172.10.1.2:9090',
    -- Start consuming from this UNIX timestamp (milliseconds).
    -- scan.startup.mode = 'latest' is intentionally not set alongside it:
    -- the two options conflict, and the mode is ignored whenever a startup
    -- timestamp is given, so keeping both only misleads the reader.
    scan.startup.timestamp.millis = '140000000'
) FORMAT PLAIN ENCODE AVRO (
    message = 'message_name',
    schema.registry = 'http://127.0.0.1:8081'
);
-- Sink `big_query_sink_local`: writes per-status order statistics computed
-- from `orders_rw` into the BigQuery table my_project_id.my_dataset.my_table,
-- authenticating with the service-account JSON file at bigquery.local.path.
--
-- Output columns (one group per order_status):
--   order_count      - number of orders in the group
--   total_revenue    - sum of total_amount
--   avg_order_value  - average of total_amount
--   first_order_time - earliest last_updated in the group
--   last_order_time  - latest last_updated in the group
CREATE SINK big_query_sink_local AS
SELECT
    order_status,
    COUNT(*) AS order_count,
    SUM(total_amount) AS total_revenue,
    AVG(total_amount) AS avg_order_value,
    MIN(last_updated) AS first_order_time,
    MAX(last_updated) AS last_order_time
FROM orders_rw
-- Required: order_status is selected next to aggregate functions, so the
-- query is only valid when grouped by it.
GROUP BY order_status
WITH (
    connector = 'bigquery',
    type = 'append-only',
    bigquery.local.path = '/path/to/my/service-account.json',
    bigquery.project = 'my_project_id',
    bigquery.dataset = 'my_dataset',
    bigquery.table = 'my_table',
    -- NOTE(review): a grouped aggregate revises its rows as new orders arrive,
    -- so it is not append-only by nature; force_append_only presumably makes
    -- the sink emit those revisions as extra appended rows -- confirm that the
    -- BigQuery table tolerates several rows per order_status.
    force_append_only = 'true'
);