Continuous E-Commerce Data Pipeline with PostgreSQL CDC and RisingWave Cloud

Change Data Capture (CDC) is a process that tracks data changes in a source system, such as a database or data warehouse, and replicates them in real time to a destination system. Its primary purpose is to keep the destination consistent with the source, which is crucial for organizations that rely on accurate data. Initially used for Extract, Transform, Load (ETL) jobs, CDC has evolved into a preferred method for cloud migration, because it can connect on-premises and cloud environments, keeping data flowing between them while preserving existing infrastructure investments. For a more comprehensive understanding of CDC and its benefits in various scenarios, you can refer to this RisingWave blog.

In this blog, we walk through an e-commerce scenario that uses PostgreSQL, CDC, and RisingWave to build a real-time data processing pipeline. PostgreSQL stores the transactional data, while CDC captures each change to the database so that downstream systems can be updated incrementally. RisingWave, a PostgreSQL-compatible streaming database, turns that change stream into real-time insights using SQL. This enables analytics such as sales and user-activity monitoring that keep pace with changing market conditions and support informed decision-making.

An e-commerce scenario that leverages PostgreSQL, CDC, and RisingWave to establish a real-time data processing pipeline.

Set up PostgreSQL for CDC

To create the required tables and enable Change Data Capture (CDC) in PostgreSQL, follow these steps:

Install PostgreSQL

Before proceeding with the CDC setup, ensure that you have PostgreSQL installed on your system. You can download and install PostgreSQL from the official website or use a package manager that is compatible with your operating system.

Create tables

To create the tables for the e-commerce use case, connect to your PostgreSQL database using a PostgreSQL client or the psql command-line interface.

For the sample e-commerce scenario, we’ll create these tables:

users table: user_id, username, email, registration_date, last_login, address
products table: product_id, product_name, brand, category, price, stock_quantity
sales table: sale_id, user_id (foreign key referencing the users table), product_id (foreign key referencing the products table), sale_timestamp, quantity, total_price
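The column types for these tables are not listed above, so the following is a minimal sketch of the PostgreSQL DDL. The types are assumptions, chosen to be compatible with the RisingWave tables created later in this post, and the foreign keys follow the description of the sales table. sale_timestamp is declared as a TIMESTAMP so that the 1-minute TUMBLE windows in the analysis section have a time column to work with. Adjust lengths, precision, and constraints to your own data.

```sql
-- Minimal sketch of the source tables in PostgreSQL (column types are assumptions).
CREATE TABLE users (
    user_id           INTEGER PRIMARY KEY,
    username          VARCHAR(100) NOT NULL,
    email             VARCHAR(255),
    registration_date DATE,
    last_login        TIMESTAMP,
    address           VARCHAR(255)
);

CREATE TABLE products (
    product_id     INTEGER PRIMARY KEY,
    product_name   VARCHAR(255) NOT NULL,
    brand          VARCHAR(100),
    category       VARCHAR(100),
    price          NUMERIC(10, 2),
    stock_quantity INTEGER
);

-- sales references both users and products.
CREATE TABLE sales (
    sale_id        INTEGER PRIMARY KEY,
    user_id        INTEGER REFERENCES users (user_id),
    product_id     INTEGER REFERENCES products (product_id),
    sale_timestamp TIMESTAMP NOT NULL,
    quantity       INTEGER,
    total_price    NUMERIC(10, 2)
);
```

Each table has a primary key, and the RisingWave tables defined later declare the same primary keys, so each CDC table is keyed the same way as its upstream table.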

Enable CDC in PostgreSQL

To enable Change Data Capture (CDC) in PostgreSQL for RisingWave, follow these steps:

Check the current Write-Ahead Log (WAL) level by running the following command:

SHOW wal_level;

By default, the value is replica. For CDC, you need to change it to logical, either by editing wal_level in the database configuration file (postgresql.conf) or by running the following command in psql:

ALTER SYSTEM SET wal_level = logical;

Keep in mind that a change to wal_level takes effect only after the PostgreSQL instance is restarted, and that the logical level writes additional information to the WAL, which may affect database performance. For more details, refer to the RisingWave PostgreSQL CDC documentation.

Next, modify the configuration settings in the postgresql.conf and pg_hba.conf files so that RisingWave can connect to your locally installed PostgreSQL and read its CDC data. In postgresql.conf, update the following line so that PostgreSQL listens on all network interfaces:

listen_addresses = '*'

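In the pg_hba.conf file, add the following lines to allow the necessary connections. The first line covers ordinary client connections and, according to the PostgreSQL documentation, also logical replication connections, because those connect to a specific database. The replication keyword in the second line matches only physical replication connections: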

host    all             all             0.0.0.0/0               md5

# Allow replication connections from any IP address using scram-sha-256 authentication method.
host    replication     all             0.0.0.0/0               scram-sha-256

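After making these changes, restart PostgreSQL to apply the new settings. This configuration allows RisingWave to read CDC data from PostgreSQL using logical replication. Because 0.0.0.0/0 matches every IPv4 address, consider narrowing it to the addresses your RisingWave cluster connects from.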
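Once PostgreSQL is back up, a quick way to confirm that the settings took effect is to query them from psql. The sketch below uses pg_hba_file_rules, a system view available in PostgreSQL 10 and later that summarizes the current contents of pg_hba.conf (reading it requires superuser privileges by default).

```sql
-- Run in psql after restarting PostgreSQL.
SHOW wal_level;          -- expected to return: logical
SHOW listen_addresses;   -- expected to return: *

-- Summarize pg_hba.conf (PostgreSQL 10+). A non-NULL error column
-- means that line could not be parsed or applied.
SELECT line_number, type, database, user_name, address, auth_method, error
FROM pg_hba_file_rules;
```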

Connect RisingWave Cloud to PostgreSQL

To get started with RisingWave, create a RisingWave cluster in RisingWave Cloud using the free tier. See the documentation of RisingWave Cloud for detailed instructions.

RisingWave Cloud: Account Registration and Sign-In Process.

After successfully creating the RisingWave cluster, you can proceed to create a source in RisingWave that will allow you to read CDC data from PostgreSQL.

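We’ll use the following query to create a source that connects to PostgreSQL. Fill in the connection and authentication parameters for your own instance. Because the cluster runs in RisingWave Cloud, hostname must be an address at which the cloud cluster can reach your PostgreSQL server, such as its public IP address; 127.0.0.1 works only when RisingWave runs on the same machine as PostgreSQL.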

CREATE SOURCE postgres_source WITH(
   connector='postgres-cdc',
   hostname='127.0.0.1',
   port='5432',
   username='postgres',
   password='qwerty1245',
   database.name='postgres',
   schema.name='public'
);
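After the source is created, you can check from the PostgreSQL side that RisingWave has attached to the database. PostgreSQL logical replication streams changes through a replication slot and, with the pgoutput plugin, a publication. The sketch below assumes RisingWave has created both for this source. Because the exact slot and publication names depend on your RisingWave version and source options, the queries list all of them rather than filtering by a specific name.

```sql
-- Run in PostgreSQL (not RisingWave).
-- Replication slots; "active" shows whether a client is currently using the slot.
SELECT slot_name, plugin, slot_type, database, active
FROM pg_replication_slots;

-- Publications define which tables are streamed through the slot.
SELECT pubname, puballtables FROM pg_publication;

-- Tables included in each publication.
SELECT pubname, schemaname, tablename FROM pg_publication_tables;
```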

Next, you can create a table named users that corresponds to the public.users table in PostgreSQL. This table will contain columns for user information such as user ID, username, email, registration date, last login timestamp, and address. The data for this table will be sourced from postgres_source in RisingWave:

CREATE TABLE users (
    user_id INTEGER PRIMARY KEY,
    username VARCHAR,
    email VARCHAR,
    registration_date DATE,
    last_login TIMESTAMP,
    address VARCHAR
    )
FROM postgres_source TABLE 'public.users';

Similarly, you can create a table named products with columns for product details such as product ID, name, brand, category, price, and stock quantity. The product ID is set as the primary key. The data for this table will also be sourced from postgres_source in RisingWave and the PostgreSQL table named public.products:

CREATE TABLE products (
    product_id INTEGER,
    product_name VARCHAR,
    brand VARCHAR,
    category VARCHAR,
    price NUMERIC,
    stock_quantity INTEGER,
    PRIMARY KEY (product_id)
)
FROM postgres_source TABLE 'public.products';

Finally, you can create a table named sales that includes columns for sale details such as sale ID, user ID, product ID, sale timestamp, quantity sold, and total price. The sale ID is set as the primary key, and sale_timestamp is the time column that the windowed queries below use. The data for this table will be sourced from postgres_source in RisingWave and the PostgreSQL table named public.sales:

CREATE TABLE sales (
    sale_id INTEGER PRIMARY KEY,
    user_id INTEGER,
    product_id INTEGER,
    sale_timestamp TIMESTAMP,
    quantity INTEGER,
    total_price NUMERIC
)
FROM postgres_source TABLE 'public.sales';
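To see the pipeline working end to end, you can insert a few rows into PostgreSQL and then query the corresponding tables in RisingWave. The rows below are hypothetical sample values for illustration only, and the sales insert assumes the PostgreSQL sales table has the sale_timestamp column used by the analysis queries.

```sql
-- 1) In PostgreSQL: insert hypothetical sample rows.
INSERT INTO users (user_id, username, email, registration_date, last_login, address)
VALUES
    (1, 'alice', 'alice@example.com', '2023-01-15', NOW(), '1 Main St'),
    (2, 'bob',   'bob@example.com',   '2023-02-20', NOW(), '2 Oak Ave');

INSERT INTO products (product_id, product_name, brand, category, price, stock_quantity)
VALUES
    (101, 'Wireless Mouse', 'Acme', 'Electronics', 25.00, 200),
    (102, 'Desk Lamp',      'Lumo', 'Home',        40.00, 50);

INSERT INTO sales (sale_id, user_id, product_id, sale_timestamp, quantity, total_price)
VALUES
    (1001, 1, 101, NOW(), 2, 50.00),
    (1002, 2, 102, NOW(), 1, 40.00);

-- 2) In RisingWave: the rows should appear shortly after the inserts are committed.
SELECT * FROM users;
SELECT * FROM products;
SELECT * FROM sales ORDER BY sale_id;
```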

Data analysis in RisingWave

After successfully creating the postgres_source source and the associated users, products, and sales tables, we can now analyze the Change Data Capture (CDC) data in RisingWave.

To calculate aggregated sales data for products within 1-minute intervals and select specific columns from the result for analysis or reporting, you can use the following query. It utilizes a Common Table Expression (CTE) named product_sales to perform the calculations and retrieve the desired columns:

WITH product_sales AS (
    SELECT
        s.window_start,
        s.window_end,
        p.product_id,
        SUM(s.total_price) AS total_sales,
        AVG(s.total_price) AS average_revenue_per_sale
    FROM
        TUMBLE (sales, sale_timestamp, INTERVAL '1 MINUTES') AS s
    JOIN
        products p ON s.product_id = p.product_id
    -- Group per product and window (not per sale), so SUM/AVG aggregate all sales of a product in that minute.
    GROUP BY
        p.product_id,
        s.window_start,
        s.window_end
)
SELECT
    ps.product_id,
    ps.total_sales,
    ps.average_revenue_per_sale,
    ps.window_start,
    ps.window_end
FROM
    product_sales ps;
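The query above runs once and returns a snapshot. In RisingWave, the same logic can instead be defined as a materialized view, which RisingWave keeps up to date incrementally as new CDC events arrive. The following sketch wraps the per-product aggregation in a materialized view; the view name product_sales_1min is hypothetical, and the join with products is dropped because only product_id, which sales already contains, is selected.

```sql
-- Incrementally maintained version of the per-product, per-minute aggregation.
CREATE MATERIALIZED VIEW product_sales_1min AS
SELECT
    s.window_start,
    s.window_end,
    s.product_id,
    COUNT(*)           AS sale_count,
    SUM(s.total_price) AS total_sales,
    AVG(s.total_price) AS average_revenue_per_sale
FROM
    TUMBLE (sales, sale_timestamp, INTERVAL '1 MINUTES') AS s
GROUP BY
    s.product_id,
    s.window_start,
    s.window_end;

-- Read the current results at any time.
SELECT * FROM product_sales_1min ORDER BY window_start DESC, total_sales DESC;
```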

To calculate the number of distinct active users within 1-minute intervals based on their last login times, you can use the following query. It uses the TUMBLE function to define the time windows and groups the results by window start and window end, so that COUNT(DISTINCT user_id) counts every user whose last login falls in each interval:

SELECT
    COUNT(DISTINCT user_id) AS total_active_users,
    window_start,
    window_end
FROM
    TUMBLE (users, last_login, INTERVAL '1 MINUTES')
GROUP BY
    window_start,
    window_end;
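To make the window assignment concrete: with its default offset, TUMBLE aligns 1-minute windows to minute boundaries, so a row's window_start is its timestamp truncated to the minute and window_end is one minute later. As a sketch under that assumption, the following query expresses the same assignment with date_trunc instead of TUMBLE and should return the same count for each window. It skips users who have never logged in, and the equivalence only holds for window sizes that match a date_trunc unit.

```sql
-- Same 1-minute window assignment as TUMBLE, written with date_trunc.
SELECT
    COUNT(DISTINCT user_id)                  AS total_active_users,
    window_start,
    window_start + INTERVAL '1 MINUTE'       AS window_end
FROM (
    SELECT
        user_id,
        date_trunc('minute', last_login) AS window_start
    FROM users
    WHERE last_login IS NOT NULL
) AS t
GROUP BY
    window_start;
```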

To retrieve the top 5 products by total quantity sold, you can use the following query. It uses the TUMBLE function to define 1-minute windows based on sale timestamps and joins the sales and products tables to gather product categories, names, and quantities sold. The results are grouped by category, product name, window start, and window end, sorted in descending order by total quantity sold, and limited to 5 rows. Note that the limit applies across all windows, so the result contains the five highest (product, window) combinations rather than the top 5 for each window:

SELECT
    p.category,
    p.product_name,
    SUM(s.quantity) AS total_quantity_sold,
    window_start,
    window_end
FROM
    TUMBLE (sales, sale_timestamp, INTERVAL '1 MINUTES') as s
JOIN
    products p
ON
    s.product_id = p.product_id
GROUP BY
    p.category, p.product_name,
    window_start,
    window_end
ORDER BY
    total_quantity_sold DESC
LIMIT
    5;
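Because the LIMIT in the query above applies to the whole result, it does not return the top 5 products for every window. If a per-window ranking is what you need, a common approach is to aggregate first, rank the rows within each window using ROW_NUMBER(), and keep the first five. The following is a sketch of that pattern; RisingWave supports ROW_NUMBER-based top-N queries, including in materialized views, but check the documentation for your version.

```sql
-- Top 5 products by quantity sold within each 1-minute window.
SELECT category, product_name, total_quantity_sold, window_start, window_end
FROM (
    SELECT
        category,
        product_name,
        total_quantity_sold,
        window_start,
        window_end,
        -- Rank products inside each window; ties are broken arbitrarily.
        ROW_NUMBER() OVER (
            PARTITION BY window_start
            ORDER BY total_quantity_sold DESC
        ) AS rank_in_window
    FROM (
        SELECT
            p.category,
            p.product_name,
            SUM(s.quantity) AS total_quantity_sold,
            s.window_start,
            s.window_end
        FROM
            TUMBLE (sales, sale_timestamp, INTERVAL '1 MINUTES') AS s
        JOIN
            products p ON s.product_id = p.product_id
        GROUP BY
            p.category, p.product_name, s.window_start, s.window_end
    ) AS agg
) AS ranked
WHERE rank_in_window <= 5;
```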

To calculate the top 10 users by total spending within 1-minute intervals, you can use the following query. It uses the TUMBLE function to define the time windows based on sale timestamps, so each sale is counted in the minute in which it occurred, and joins the sales and users tables to combine sales data with user information. The results are grouped by user ID, username, email, window start, and window end, sorted in descending order by total spending, and limited to the top 10 rows:

SELECT
    u.user_id,
    u.username,
    u.email,
    SUM(s.total_price) AS total_spent,
    s.window_start,
    s.window_end
FROM
    TUMBLE (sales, sale_timestamp, INTERVAL '1 MINUTES') AS s
JOIN
    users u
ON
    s.user_id = u.user_id
GROUP BY
    u.user_id,
    u.username,
    u.email,
    s.window_start,
    s.window_end
ORDER BY
    total_spent DESC
LIMIT
    10;

In this blog post, we explored the combination of PostgreSQL, Change Data Capture (CDC), and RisingWave to build a robust e-commerce data pipeline. PostgreSQL serves as the storage for transactional data, CDC captures and tracks changes efficiently, and RisingWave enables real-time analytics capabilities such as sales monitoring and inventory management. Additionally, RisingWave can be leveraged for data preprocessing to support downstream personalized recommendation systems.

By adopting this architecture, an e-commerce system can achieve responsiveness and adaptability to dynamic market conditions. It empowers businesses with timely insights and informed decision-making capabilities. With PostgreSQL, CDC, and RisingWave working together, organizations can build a data pipeline that supports their e-commerce operations effectively.
