Change Data Capture (CDC) is a process that tracks and replicates real-time data changes from a source system, such as a database or data warehouse, to a destination system. Its primary purpose is to keep downstream systems consistent with the source, which is crucial for organizations that rely on accurate data. Initially used for Extract, Transform, Load (ETL) jobs, CDC has evolved into a preferred method for cloud migration: it can seamlessly connect on-premises and cloud environments, facilitating smooth data flow while preserving existing infrastructure investments. For a more comprehensive look at CDC and its benefits in various scenarios, you can refer to this RisingWave blog.
In this blog, we delve into an e-commerce scenario that leverages PostgreSQL, CDC, and RisingWave to establish a real-time data processing pipeline. PostgreSQL serves as the repository for transactional data, while CDC captures and tracks database changes for efficient updates. RisingWave, a streaming database compatible with PostgreSQL, empowers real-time insights through the use of SQL. This enables analytics such as sales and user monitoring, which can adapt to dynamic market conditions, facilitating informed decision-making.
Set up PostgreSQL for CDC
To enable Change Data Capture (CDC) in PostgreSQL, create the required tables, and configure the necessary settings, follow these steps:
Install PostgreSQL
Before proceeding with the CDC setup, ensure that you have PostgreSQL installed on your system. You can download and install PostgreSQL from the official website or use a package manager that is compatible with your operating system.
Create tables
To create the necessary tables for the e-commerce use case, connect to your PostgreSQL instance using a PostgreSQL client or the psql command-line interface.
For the sample e-commerce scenario, we’ll create these tables:
users table: user_id, username, email, registration_date, last_login, address
products table: product_id, product_name, brand, category, price, stock_quantity
sales table: sale_id, user_id (foreign key referencing the users table), product_id (foreign key referencing the products table), sale_date, quantity, total_price
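The exact DDL is up to you; a minimal sketch, assuming simple column types, might look like the following. Note that sale_date is declared as TIMESTAMP here so that the 1-minute windowed queries later in this post have a timestamp column to window on:
CREATE TABLE users (
user_id INTEGER PRIMARY KEY,
username VARCHAR,
email VARCHAR,
registration_date DATE,
last_login TIMESTAMP,
address VARCHAR
);

CREATE TABLE products (
product_id INTEGER PRIMARY KEY,
product_name VARCHAR,
brand VARCHAR,
category VARCHAR,
price NUMERIC,
stock_quantity INTEGER
);

CREATE TABLE sales (
sale_id INTEGER PRIMARY KEY,
user_id INTEGER REFERENCES users (user_id),
product_id INTEGER REFERENCES products (product_id),
sale_date TIMESTAMP,
quantity INTEGER,
total_price NUMERIC
);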
Enable CDC in PostgreSQL
To enable Change Data Capture (CDC) in PostgreSQL for RisingWave, follow these steps:
Check the current Write-Ahead Log (WAL) level by running the following command:
SHOW wal_level;
By default, the value is set to replica. However, for CDC you need to change it to logical. To do this, modify the database configuration file (postgresql.conf) or use the psql command. Run the following command to set wal_level to logical:
ALTER SYSTEM SET wal_level = logical;
Keep in mind that changing the wal_level requires restarting the PostgreSQL instance and may impact database performance. For more details, refer to the RisingWave PostgreSQL CDC documentation.
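After the restart, you can verify that the new setting is active and check how many replication slots the server allows, since each CDC connection consumes one:
SHOW wal_level;             -- should now return: logical
SHOW max_replication_slots; -- each CDC source needs an available replication slot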
Modify the configuration settings in the postgresql.conf and pg_hba.conf files to allow RisingWave to read CDC data from your locally installed PostgreSQL. In the postgresql.conf file, update the following line:
listen_addresses = '*'
In the pg_hba.conf file, add the following lines to allow the necessary connections for logical replication:
# Allow regular connections from any IP address using md5 authentication.
host all all 0.0.0.0/0 md5
# Allow replication connections from any IP address using scram-sha-256 authentication.
host replication all 0.0.0.0/0 scram-sha-256
After making these changes, restart PostgreSQL to apply the new settings. This configuration will enable RisingWave to read CDC data from PostgreSQL using logical replication.
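Optionally, instead of connecting as the postgres superuser, you can create a dedicated user for CDC and use it in the CREATE SOURCE statement later on. A minimal sketch (the user name rw_cdc and the password are placeholders) might look like this:
-- Create a user that can log in and use logical replication.
CREATE USER rw_cdc WITH REPLICATION LOGIN PASSWORD 'secret';
-- Allow it to read the tables that will be captured.
GRANT SELECT ON ALL TABLES IN SCHEMA public TO rw_cdc;
Depending on your setup, additional privileges (for example, the ability to create publications) may also be required; see the RisingWave PostgreSQL CDC documentation for details.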
Connect RisingWave Cloud to PostgreSQL
To get started with RisingWave, create a RisingWave cluster in RisingWave Cloud using the free tier. See the documentation of RisingWave Cloud for detailed instructions.
After successfully creating the RisingWave cluster, you can proceed to create a source in RisingWave that will allow you to read CDC data from PostgreSQL.
We’ll use the following query to create a source that connects to the locally running PostgreSQL instance. Make sure to fill in the authentication parameters accordingly.
CREATE SOURCE postgres_source WITH(
connector='postgres-cdc',
hostname='127.0.0.1',
port='5432',
username='postgres',
password='qwerty1245',
database.name='postgres',
schema.name='public'
);
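To confirm that the source has been registered in your RisingWave cluster, you can list the existing sources:
SHOW SOURCES;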
Next, you can create a table named users that corresponds to the public.users table in PostgreSQL. This table contains columns for user information such as user ID, username, email, registration date, last login timestamp, and address. Its data is sourced from postgres_source in RisingWave:
CREATE TABLE users (
user_id INTEGER PRIMARY KEY,
username VARCHAR,
email VARCHAR,
registration_date DATE,
last_login TIMESTAMP,
address VARCHAR
)
FROM postgres_source TABLE 'public.users';
Similarly, you can create a table named products with columns for product details such as product ID, name, brand, category, price, and stock quantity. The product ID is set as the primary key. Its data is sourced from postgres_source in RisingWave and the PostgreSQL table public.products:
CREATE TABLE products (
product_id INTEGER,
product_name VARCHAR,
brand VARCHAR,
category VARCHAR,
price NUMERIC,
stock_quantity INTEGER,
PRIMARY KEY (product_id)
)
FROM postgres_source TABLE 'public.products';
Finally, you can create a table named sales that includes columns for sale details such as sale ID, user ID, product ID, sale date, quantity sold, and total price. The sale ID is set as the primary key. Its data is sourced from postgres_source in RisingWave and the PostgreSQL table public.sales:
CREATE TABLE sales (
sale_id INTEGER PRIMARY KEY,
user_id INTEGER,
product_id INTEGER,
sale_date TIMESTAMP,
quantity INTEGER,
total_price NUMERIC
)
FROM postgres_source TABLE 'public.sales';
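Once the three tables are in place and the initial snapshot has been ingested, a quick sanity check is to compare row counts against the upstream PostgreSQL tables, for example:
SELECT COUNT(*) FROM users;
SELECT COUNT(*) FROM products;
SELECT COUNT(*) FROM sales;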
Data analysis in RisingWave
After successfully creating the postgres_source source and the associated users, products, and sales tables, we can now analyze the Change Data Capture (CDC) data in RisingWave.
To calculate aggregated sales data for products within 1-minute intervals and select specific columns from the result for analysis or reporting, you can use the following query. It utilizes a Common Table Expression (CTE) named product_sales to perform the calculations and retrieve the desired columns:
WITH product_sales AS (
SELECT
window_start,
window_end,
p.product_id,
SUM(s.total_price) AS total_sales,
AVG(s.total_price) AS average_revenue_per_sale
FROM
TUMBLE (sales, sale_date, INTERVAL '1 MINUTES') as s
JOIN
products p ON s.product_id = p.product_id
GROUP BY
p.product_id,
window_start,
window_end
)
SELECT
ps.product_id,
ps.total_sales,
ps.average_revenue_per_sale,
ps.window_start,
ps.window_end
FROM
product_sales ps;
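Because this is a regular query, it computes its result once when executed. If you want the aggregation maintained continuously as new CDC events arrive, essentially the same logic can be wrapped in a materialized view; a sketch (the view name product_sales_1min is illustrative) might look like this:
CREATE MATERIALIZED VIEW product_sales_1min AS
SELECT
window_start,
window_end,
p.product_id,
SUM(s.total_price) AS total_sales,
AVG(s.total_price) AS average_revenue_per_sale
FROM
TUMBLE (sales, sale_date, INTERVAL '1 MINUTES') as s
JOIN
products p ON s.product_id = p.product_id
GROUP BY
p.product_id,
window_start,
window_end;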
To calculate the count of distinct active users within 1-minute intervals based on their last login times, you can use the following query. It utilizes the TUMBLE function to define the time windows and groups the results by window start and window end to compute the total number of active users in each interval:
SELECT
COUNT(DISTINCT user_id) AS total_active_users,
window_start,
window_end
FROM
TUMBLE (users, last_login, INTERVAL '1 MINUTES')
GROUP BY
window_start,
window_end;
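If overlapping windows are a better fit, for example a five-minute window that advances every minute, RisingWave's HOP function gives a sliding-window variant of the same query (the window sizes here are illustrative):
SELECT
COUNT(DISTINCT user_id) AS total_active_users,
window_start,
window_end
FROM
HOP (users, last_login, INTERVAL '1 MINUTES', INTERVAL '5 MINUTES')
GROUP BY
window_start,
window_end;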
To retrieve the top 5 products by total quantity sold within 1-minute intervals, you can use the following query. It utilizes the TUMBLE function to define the time windows based on sale timestamps and joins the sales and products tables to gather data about product categories, names, and quantities sold. The results are grouped by category, product name, window start, and window end, sorted in descending order by total quantity sold, and limited to the top 5 products:
SELECT
p.category,
p.product_name,
SUM(s.quantity) AS total_quantity_sold,
window_start,
window_end
FROM
TUMBLE (sales, sale_date, INTERVAL '1 MINUTES') as s
JOIN
products p
ON
s.product_id = p.product_id
GROUP BY
p.category, p.product_name,
window_start,
window_end
ORDER BY
total_quantity_sold DESC
LIMIT
5;
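As with the earlier aggregation, this ranking can be maintained continuously. A common way to express a per-window Top-N in a materialized view is the ROW_NUMBER() pattern; a sketch (the view name top_products_1min is illustrative) might look like this:
CREATE MATERIALIZED VIEW top_products_1min AS
SELECT category, product_name, total_quantity_sold, window_start, window_end
FROM (
SELECT
*,
ROW_NUMBER() OVER (
PARTITION BY window_start, window_end
ORDER BY total_quantity_sold DESC
) AS ranking
FROM (
SELECT
p.category,
p.product_name,
SUM(s.quantity) AS total_quantity_sold,
window_start,
window_end
FROM
TUMBLE (sales, sale_date, INTERVAL '1 MINUTES') as s
JOIN
products p ON s.product_id = p.product_id
GROUP BY
p.category, p.product_name, window_start, window_end
) agg
) ranked
WHERE ranking <= 5;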
To calculate the top 10 users by their total spending within 1-minute intervals, you can use the following query. It utilizes the TUMBLE function to define the time windows based on user login times and joins the users and sales tables to gather user information and sales data. The results are grouped by user ID, username, email, window start, and window end, sorted in descending order by total spending, and limited to the top 10 users:
SELECT
u.user_id,
u.username,
u.email,
SUM(s.total_price) AS total_spent,
window_start,
window_end
FROM
TUMBLE (users, last_login, INTERVAL '1 MINUTES') as u
JOIN
sales s
ON
u.user_id = s.user_id
GROUP BY
u.user_id,
u.username,
u.email,
window_start,
window_end
ORDER BY
total_spent DESC
LIMIT
10;
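Note that, because the windows above come from users.last_login, spending is bucketed by when each user last logged in. If you would rather bucket spending by when the sales actually occurred, the same query can window on the sales table instead; a sketch:
SELECT
u.user_id,
u.username,
u.email,
SUM(s.total_price) AS total_spent,
window_start,
window_end
FROM
TUMBLE (sales, sale_date, INTERVAL '1 MINUTES') as s
JOIN
users u
ON
u.user_id = s.user_id
GROUP BY
u.user_id,
u.username,
u.email,
window_start,
window_end
ORDER BY
total_spent DESC
LIMIT
10;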
Conclusion
In this blog post, we explored the combination of PostgreSQL, Change Data Capture (CDC), and RisingWave to build a robust e-commerce data pipeline. PostgreSQL serves as the storage for transactional data, CDC captures and tracks changes efficiently, and RisingWave enables real-time analytics capabilities such as sales monitoring and inventory management. Additionally, RisingWave can be leveraged for data preprocessing to support downstream personalized recommendation systems.
By adopting this architecture, an e-commerce system can achieve responsiveness and adaptability to dynamic market conditions. It empowers businesses with timely insights and informed decision-making capabilities. With PostgreSQL, CDC, and RisingWave working together, organizations can build a data pipeline that supports their e-commerce operations effectively.