CREATE SOURCE mysql_source WITH (
connector = 'mysql-cdc',
hostname = '127.0.0.1',
port = '8306',
username = 'root',
password = '123456',
database.name = 'mydb',
server.id = 5888
);
CREATE TABLE orders_rw (
order_id INTEGER PRIMARY KEY,
customer_id INTEGER,
order_status VARCHAR,
total_amount DECIMAL,
last_updated TIMESTAMP,
)
FROM source my_source TABLE orders;
CREATE SINK target_count_postgres_sink AS
SELECT
order_status,
COUNT(*) as order_count,
SUM(total_amount) as total_revenue,
AVG(total_amount) as avg_order_value,
MIN(last_updated) as first_order_time,
MAX(last_updated) as last_order_time
FROM orders_rw WITH (
connector = 'jdbc',
jdbc.url = 'jdbc:postgresql://postgres:5432/mydb?user=myuser&password=123456',
table.name = 'target_count',
type = 'upsert',
primary_key = 'target_id'
);