-- Shared PostgreSQL CDC source: one connection / replication slot that
-- CDC tables can be created from via CREATE TABLE ... FROM pg_cdc_source.
CREATE SOURCE pg_cdc_source WITH (
    connector     = 'postgres-cdc',

    -- Upstream connection.
    -- NOTE(review): PostgreSQL's default port is 5432; confirm 8306 is the
    -- intended (e.g. container-mapped) port.
    hostname      = '127.0.0.1',
    port          = '8306',

    -- NOTE(review): credentials are inline in plain text; fine for a local
    -- demo, but move them out of the script for any shared environment.
    username      = 'root',
    password      = '123456',

    -- Database to capture from and the replication slot used for it.
    database.name = 'mydb',
    slot.name     = 'mydb_slot'
);
-- CDC table that mirrors the upstream orders table through pg_cdc_source.
-- order_id is the primary key, so upstream updates/deletes are applied by key.
CREATE TABLE orders_rw (
    order_id     INTEGER PRIMARY KEY,
    customer_id  INTEGER,
    order_status VARCHAR,
    total_amount DECIMAL,
    last_updated TIMESTAMP  -- no trailing comma after the last column
)
-- The clause is FROM <source_name> TABLE '<upstream_table>': the source name
-- follows FROM directly, and the upstream table is a string literal.
-- NOTE(review): assumes orders lives in the default "public" schema; adjust
-- the schema prefix if the upstream table is elsewhere.
FROM pg_cdc_source TABLE 'public.orders';
-- Sinks per-order_status aggregates computed over orders_rw into the
-- ClickHouse table default.demo_test.
CREATE SINK clickhouse_sink AS
SELECT
    order_status,
    COUNT(*)          AS order_count,
    SUM(total_amount) AS total_revenue,
    AVG(total_amount) AS avg_order_value,
    MIN(last_updated) AS first_order_time,
    MAX(last_updated) AS last_order_time
FROM orders_rw
-- order_status is selected next to aggregates, so it must be the grouping key;
-- without GROUP BY the query is rejected.
GROUP BY order_status
WITH (
    connector = 'clickhouse',
    type = 'append-only',
    -- A grouped aggregation emits updates as new orders arrive, so it is not
    -- an append-only stream. force_append_only writes each new aggregate value
    -- as an additional row (deletes are dropped); readers of demo_test must
    -- pick the latest row per order_status.
    -- NOTE(review): if demo_test uses a deduplicating engine, consider
    -- type = 'upsert' with primary_key = 'order_status' instead.
    force_append_only = 'true',
    -- Port 8123 is ClickHouse's HTTP interface, so the URL uses the http scheme.
    clickhouse.url = 'http://localhost:8123',
    clickhouse.user = 'clickhouse_demo',
    clickhouse.password = 'demoPass123!',
    clickhouse.database = 'default',
    clickhouse.table = 'demo_test'
);