-- Ingests MongoDB change events into RisingWave through the `mongodb-cdc` connector.
-- The connector expects exactly this column layout: a primary-key column named
-- `_id` (the document's `_id`) and a `payload` column holding the document as jsonb.
-- NOTE(review): 'dev.*' matches every collection in database `dev`, not only an
-- orders collection; narrow it to 'dev.<collection>' if that is the intent.
CREATE TABLE orders_rw (
    _id varchar PRIMARY KEY,
    payload jsonb
) WITH (
    connector = 'mongodb-cdc',
    mongodb.url = 'mongodb://localhost:27017/?replicaSet=rs0',
    collection.name = 'dev.*'
);
-- Streams per-status order statistics from orders_rw into a BigQuery table,
-- authenticating with a service-account key file on the local filesystem.
--
-- orders_rw exposes only its key column and the raw `payload` jsonb document, so
-- the order fields are extracted from `payload` in the inner query before aggregating.
-- NOTE(review): the field names 'order_status', 'total_amount' and 'last_updated'
-- are assumed to exist at the top level of each document -- confirm against the
-- MongoDB collection.
-- NOTE(review): `total_amount` is assumed to be a plain JSON number; a Decimal128
-- value may arrive in extended-JSON form and would need different extraction.
-- NOTE(review): `last_updated` is kept as text, so MIN/MAX compare strings; cast
-- it to a timestamp type once its encoding in `payload` is confirmed.
CREATE SINK big_query_sink_local AS
SELECT
    order_status,
    COUNT(*) AS order_count,
    SUM(total_amount) AS total_revenue,
    AVG(total_amount) AS avg_order_value,
    MIN(last_updated) AS first_order_time,
    MAX(last_updated) AS last_order_time
FROM (
    SELECT
        payload ->> 'order_status' AS order_status,
        CAST(payload ->> 'total_amount' AS numeric) AS total_amount,
        payload ->> 'last_updated' AS last_updated
    FROM orders_rw
) AS orders
-- order_status is not aggregated, so it must be the grouping key.
GROUP BY order_status
WITH (
    connector = 'bigquery',
    type = 'append-only',
    bigquery.local.path = '/path/to/my/service-account.json',
    bigquery.project = 'my_project_id',
    bigquery.dataset = 'my_dataset',
    bigquery.table = 'my_table',
    -- An aggregation emits updates, not only inserts; this option lets the sink
    -- be created as append-only anyway. Presumably each change to a group then
    -- lands as an additional row in BigQuery -- verify this is acceptable downstream.
    force_append_only = 'true'
);