PostgreSQL CDC to Streaming SQL: A Complete Tutorial

Change Data Capture (CDC) solves a problem every data team hits eventually: your application database is the source of truth, but your analytics, dashboards, and downstream services need that data in real time. Polling the database every few minutes introduces latency, wastes resources, and misses short-lived state changes. CDC captures every insert, update, and delete as it happens by reading PostgreSQL's Write-Ahead Log (WAL), turning your database into a real-time event stream.

This tutorial walks you through the complete pipeline: configuring PostgreSQL for logical replication, connecting it to RisingWave as a CDC source, building materialized views that stay current as data changes, and querying results with sub-second freshness. By the end, you will have a working CDC pipeline that requires zero application code changes and no external middleware like Kafka or Debezium.

All SQL in this tutorial has been verified against RisingWave 2.8.0.

How PostgreSQL CDC Works

PostgreSQL's Write-Ahead Log records every change before it reaches the actual data files. This log exists for crash recovery, but it also serves as the foundation for logical replication and CDC. When you set wal_level to logical, PostgreSQL writes enough extra detail into the WAL for logical decoding to turn its entries into a structured stream of row-level changes that external consumers can read.

A CDC pipeline reads this stream through a replication slot, which is a server-side cursor that tracks which WAL entries have been consumed. The slot ensures that PostgreSQL retains WAL data until the consumer acknowledges it, preventing data loss even if the consumer goes offline temporarily.
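Because a slot makes PostgreSQL hold on to WAL until the consumer confirms it, it is worth checking how much WAL each slot is retaining. The query below is a minimal sketch that uses the standard pg_replication_slots catalog view; it assumes PostgreSQL 10 or later and is run on the source database, not in RisingWave.

```sql
-- Run in PostgreSQL.
-- One row per replication slot, with the amount of WAL the server must keep for it.
SELECT
    slot_name,
    plugin,
    active,                 -- false means no consumer is currently connected
    restart_lsn,            -- oldest WAL position the slot still needs
    confirmed_flush_lsn,    -- position the consumer has acknowledged
    pg_size_pretty(
        pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)
    ) AS retained_wal
FROM pg_replication_slots;
```

A slot that stays inactive while retained_wal keeps growing usually means the consumer is offline, and the retained WAL will keep consuming disk until the consumer reconnects or the slot is dropped.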

RisingWave's native PostgreSQL CDC connector reads directly from these replication slots without requiring Kafka, Debezium, or any intermediate message broker. This reduces operational complexity and removes the extra network hops and buffering that a broker adds to end-to-end latency.

flowchart LR
    A[PostgreSQL WAL] -->|Logical Replication| B[Replication Slot]
    B -->|CDC Stream| C[RisingWave CDC Source]
    C --> D[CDC Tables]
    D --> E[Materialized Views]
    E --> F[Real-Time Queries]

Step 1: Configure PostgreSQL for Logical Replication

Before RisingWave can read change events, PostgreSQL must be configured to produce them. This requires two changes: enabling logical decoding in the WAL and creating a user with replication privileges.

Enable Logical WAL Level

Open your postgresql.conf file and set the WAL level:

# Find your postgresql.conf location
psql -U postgres -c "SHOW config_file;"

Edit the file and set these parameters:

# postgresql.conf
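wal_level = logical          # Enable logical decoding (default is 'replica')
max_replication_slots = 4    # At least 1 per CDC consumer (PostgreSQL's default is 10; do not lower it below what you already use)
max_wal_senders = 4          # At least 1 per CDC consumer, plus any physical replicas (default is 10)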

Restart PostgreSQL for the changes to take effect:

# Linux (systemd)
sudo systemctl restart postgresql

# macOS (Homebrew)
brew services restart postgresql

Verify the setting:

SHOW wal_level;
-- Should return: logical
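To check all three parameters at once, and to see whether a restart is still outstanding, you can query the standard pg_settings view. This is a small sketch that assumes PostgreSQL 9.5 or later, where the pending_restart column exists.

```sql
-- Run in PostgreSQL.
-- pending_restart = true means the value in postgresql.conf has changed
-- but the server has not been restarted yet.
SELECT name, setting, pending_restart
FROM pg_settings
WHERE name IN ('wal_level', 'max_replication_slots', 'max_wal_senders')
ORDER BY name;
```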

If you are using a managed service like Amazon RDS, you cannot edit postgresql.conf directly. Instead, set rds.logical_replication to 1 in the instance's DB parameter group and reboot the instance, because the parameter is static. For Aurora PostgreSQL, the process is the same except that the parameter lives in the DB cluster parameter group. See the AWS documentation on RDS logical replication for details.

Create a Replication User

Create a dedicated database user for CDC. This user needs the REPLICATION privilege and SELECT access on the tables you want to capture:

-- Create a user with replication privilege
CREATE USER replication_user WITH REPLICATION PASSWORD 'securepassword';

-- Grant access to the target schema and tables
GRANT USAGE ON SCHEMA public TO replication_user;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO replication_user;

-- Ensure future tables are also accessible
ALTER DEFAULT PRIVILEGES IN SCHEMA public
    GRANT SELECT ON TABLES TO replication_user;
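A quick way to confirm that the grants took effect is to read them back from the catalog. The sketch below uses only standard PostgreSQL catalog views and functions, and assumes the user name replication_user from the statements above.

```sql
-- Run in PostgreSQL.
-- The role should be able to log in and should carry the REPLICATION attribute.
SELECT rolname, rolcanlogin, rolreplication
FROM pg_roles
WHERE rolname = 'replication_user';

-- The role should have USAGE on the schema that holds the captured tables.
SELECT has_schema_privilege('replication_user', 'public', 'USAGE') AS can_use_schema;
```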

Set Up the Source Database Tables

For this tutorial, assume your PostgreSQL application database has these e-commerce tables:

-- Run these in your PostgreSQL database (not RisingWave)
CREATE TABLE customers (
    customer_id SERIAL PRIMARY KEY,
    name VARCHAR(100) NOT NULL,
    email VARCHAR(255) UNIQUE,
    region VARCHAR(50),
    created_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE TABLE orders (
    order_id SERIAL PRIMARY KEY,
    customer_id INT REFERENCES customers(customer_id),
    product VARCHAR(200),
    quantity INT,
    price DECIMAL(10,2),
    status VARCHAR(20) DEFAULT 'pending',
    order_date TIMESTAMPTZ DEFAULT NOW()
);
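PostgreSQL only publishes UPDATE and DELETE changes for tables that have a replica identity, which by default is the primary key. Both tables above define one, so no extra step should be needed; the following check is a sketch that assumes the tables live in the public schema.

```sql
-- Run in PostgreSQL.
-- relreplident: 'd' = default (primary key), 'f' = full row,
--               'i' = a chosen index,        'n' = nothing.
SELECT c.relname, c.relreplident
FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE n.nspname = 'public'
  AND c.relname IN ('customers', 'orders');
```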

Step 2: Create a CDC Source in RisingWave

With PostgreSQL configured, connect to your RisingWave instance and create a CDC source. RisingWave uses a two-step approach: first you create a shared source that defines the connection, then you create individual tables that map to upstream PostgreSQL tables.

Create the Shared CDC Source

CREATE SOURCE pg_ecommerce WITH (
    connector = 'postgres-cdc',
    hostname = '127.0.0.1',
    port = '5432',
    username = 'replication_user',
    password = 'securepassword',
    database.name = 'ecommerce',
    schema.name = 'public',
    slot.name = 'risingwave_slot'
);

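RisingWave automatically creates a publication named rw_publication on the PostgreSQL side and begins tracking WAL changes. Note that PostgreSQL only lets a user create a publication if that user has the CREATE privilege on the database and owns the tables being published, so the replication_user from Step 1 may need additional grants, or you can create the publication yourself and turn off auto-creation. You can customize this behavior with additional parameters: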

 Parameter                 | Default        | Description
---------------------------+----------------+---------------------------------------------------------------------
 publication.name          | rw_publication | Name of the PostgreSQL publication
 publication.create.enable | true           | Auto-create the publication if it does not exist
 slot.name                 | Auto-generated | Name of the replication slot
 transactional             | true           | Process changes within transaction boundaries
 ssl.mode                  | disabled       | SSL/TLS mode: disabled, preferred, required, verify-ca, verify-full
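To see what the publication covers, or to create it by hand when the CDC user is not allowed to, you can work with it directly in PostgreSQL. This is a sketch that assumes the default publication name rw_publication and the two tutorial tables; the manual CREATE PUBLICATION is only needed if the publication does not already exist.

```sql
-- Run in PostgreSQL.
-- List the tables covered by the publication.
SELECT pubname, schemaname, tablename
FROM pg_publication_tables
WHERE pubname = 'rw_publication';

-- Optional: create the publication yourself as a privileged user
-- (skip this if the query above already returns rows).
CREATE PUBLICATION rw_publication
    FOR TABLE public.customers, public.orders;
```

If you create the publication manually, setting publication.create.enable to 'false' on the RisingWave source should keep RisingWave from trying to create it again.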

Create CDC Tables

For each upstream table you want to ingest, create a corresponding table in RisingWave using the FROM ... TABLE clause, which names the shared source and the upstream table:

-- Map the customers table
CREATE TABLE customers (
    customer_id INT PRIMARY KEY,
    name VARCHAR,
    email VARCHAR,
    region VARCHAR,
    created_at TIMESTAMPTZ
)
FROM pg_ecommerce TABLE 'public.customers';

-- Map the orders table
CREATE TABLE orders (
    order_id INT PRIMARY KEY,
    customer_id INT,
    product VARCHAR,
    quantity INT,
    price DECIMAL,
    status VARCHAR,
    order_date TIMESTAMPTZ
)
FROM pg_ecommerce TABLE 'public.orders';

Each CDC table must define a PRIMARY KEY that matches the upstream table's primary key. RisingWave uses this key to correctly apply updates and deletes.
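Once RisingWave has loaded the existing rows, a simple sanity check is to compare row counts on both sides. The statement below is plain SQL that should run unchanged in PostgreSQL and in RisingWave; small differences are expected while changes are still in flight.

```sql
-- Run once in PostgreSQL and once in RisingWave, then compare the results.
SELECT 'customers' AS table_name, COUNT(*) AS row_count FROM customers
UNION ALL
SELECT 'orders', COUNT(*) FROM orders;
```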

By default, RisingWave performs an initial snapshot (backfill) of the existing data in the upstream table, then switches to streaming incremental changes. You can disable the snapshot if you only care about new changes:

CREATE TABLE orders_incremental (
    order_id INT PRIMARY KEY,
    customer_id INT,
    product VARCHAR,
    quantity INT,
    price DECIMAL,
    status VARCHAR,
    order_date TIMESTAMPTZ
)
WITH (snapshot = 'false')
FROM pg_ecommerce TABLE 'public.orders';

As an alternative to listing the columns explicitly as shown above, RisingWave also supports a wildcard syntax that automatically maps all columns from the upstream table:

CREATE TABLE customers (*)
FROM pg_ecommerce TABLE 'public.customers';
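After creating the source and tables, you can inspect what RisingWave registered, which is especially useful with the wildcard form because the column list is derived for you. A minimal sketch, using the object names from this tutorial:

```sql
-- Run in RisingWave.
SHOW SOURCES;          -- should list pg_ecommerce
SHOW TABLES;           -- should list the CDC tables created from it
DESCRIBE customers;    -- shows the columns and types RisingWave mapped
```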

For more details on CDC source configuration, see the RisingWave PostgreSQL CDC documentation.

Step 3: Build Materialized Views on CDC Data

This is where streaming SQL shines. A materialized view in RisingWave is not a static snapshot that you refresh on a schedule. It is an incrementally maintained query result that updates automatically as new CDC events arrive. When a row is inserted, updated, or deleted in PostgreSQL, the change propagates through the materialized view, typically in under a second.

The following examples use tables and data verified against RisingWave 2.8.0.

Real-Time Order Summary by Customer

This materialized view joins customers with their orders and maintains running aggregates:

CREATE MATERIALIZED VIEW mv_order_summary AS
SELECT
    c.customer_id,
    c.name AS customer_name,
    c.region,
    COUNT(o.order_id) AS total_orders,
    SUM(o.quantity * o.price) AS total_spent,
    MAX(o.order_date) AS last_order_date
FROM customers c
JOIN orders o ON c.customer_id = o.customer_id
GROUP BY c.customer_id, c.name, c.region;

Query the result:

SELECT * FROM mv_order_summary ORDER BY total_spent DESC;
 customer_id | customer_name | region  | total_orders | total_spent |      last_order_date
-------------+---------------+---------+--------------+-------------+---------------------------
           1 | Alice Johnson | US-West |            3 |      348.94 | 2026-03-22 11:00:00+00:00
           3 | Carol Davis   | EU-West |            2 |      144.50 | 2026-03-22 14:30:00+00:00
           2 | Bob Smith     | US-East |            2 |      104.98 | 2026-03-21 13:00:00+00:00
           4 | David Lee     | US-West |            2 |      104.98 | 2026-03-22 16:00:00+00:00
           5 | Eva Martinez  | US-East |            1 |       39.99 | 2026-03-22 08:45:00+00:00

Daily Revenue by Product

Track revenue trends across products with a daily roll-up:

CREATE MATERIALIZED VIEW mv_daily_revenue AS
SELECT
    DATE_TRUNC('day', order_date) AS order_day,
    product,
    SUM(quantity * price) AS revenue,
    SUM(quantity) AS units_sold,
    COUNT(order_id) AS order_count
FROM orders
WHERE status = 'completed'
GROUP BY DATE_TRUNC('day', order_date), product;

Query the result:

SELECT * FROM mv_daily_revenue ORDER BY order_day, revenue DESC;
         order_day         |       product       | revenue | units_sold | order_count
---------------------------+---------------------+---------+------------+-------------
 2026-03-20 00:00:00+00:00 | Wireless Headphones |  159.98 |          2 |           1
 2026-03-20 00:00:00+00:00 | Mechanical Keyboard |  149.99 |          1 |           1
 2026-03-20 00:00:00+00:00 | USB-C Hub           |   45.00 |          1 |           1
 2026-03-21 00:00:00+00:00 | Webcam HD           |   65.00 |          1 |           1
 2026-03-21 00:00:00+00:00 | Laptop Sleeve       |   59.98 |          2 |           1
 2026-03-22 00:00:00+00:00 | Phone Stand         |   39.98 |          2 |           1
 2026-03-22 00:00:00+00:00 | USB-C Cable Pack    |   38.97 |          3 |           1

Customer Lifetime Value

Compute customer lifetime value (CLV) in real time, considering only completed orders:

CREATE MATERIALIZED VIEW mv_customer_lifetime_value AS
SELECT
    c.customer_id,
    c.name,
    c.email,
    c.region,
    COUNT(o.order_id) AS total_orders,
    SUM(o.quantity * o.price) AS lifetime_value,
    AVG(o.quantity * o.price) AS avg_order_value
FROM customers c
LEFT JOIN orders o ON c.customer_id = o.customer_id
    AND o.status = 'completed'
GROUP BY c.customer_id, c.name, c.email, c.region;

Query the result:

SELECT * FROM mv_customer_lifetime_value ORDER BY lifetime_value DESC NULLS LAST;
 customer_id |     name      |       email       | region  | total_orders | lifetime_value | avg_order_value
-------------+---------------+-------------------+---------+--------------+----------------+-----------------
           1 | Alice Johnson | alice@example.com | US-West |            3 |         348.94 |          116.31
           4 | David Lee     | david@example.com | US-West |            2 |         104.98 |           52.49
           2 | Bob Smith     | bob@example.com   | US-East |            2 |         104.98 |           52.49
           3 | Carol Davis   | carol@example.com | EU-West |            0 |                |
           5 | Eva Martinez  | eva@example.com   | US-East |            0 |                |

Notice that Carol Davis and Eva Martinez show zero completed orders because their orders have pending or shipped status. When those orders are updated to completed in PostgreSQL, the CDC event flows into RisingWave and the materialized view updates automatically.

Step 4: Query in Real Time

The real power of this architecture becomes visible when data changes. Every INSERT, UPDATE, or DELETE in PostgreSQL propagates through the CDC pipeline into RisingWave's materialized views, typically in under a second.

Observe Incremental Updates

When a new order arrives in PostgreSQL:

-- This happens in your PostgreSQL application database
INSERT INTO orders VALUES
    (111, 3, 'Ergonomic Chair', 1, 299.99, 'completed', '2026-03-23 09:00:00+00');

The mv_customer_lifetime_value view in RisingWave updates automatically:

SELECT * FROM mv_customer_lifetime_value WHERE customer_id = 3;
 customer_id |    name     |       email       | region  | total_orders | lifetime_value | avg_order_value
-------------+-------------+-------------------+---------+--------------+----------------+-----------------
           3 | Carol Davis | carol@example.com | EU-West |            1 |         299.99 |          299.99

Carol Davis's lifetime value jumped from null to 299.99, and her order count went from 0 to 1. No batch job, no manual refresh, no cron schedule. The materialized view processed the CDC event and updated the result incrementally.
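Updates and deletes travel through the same path as inserts. The sketch below assumes the sample data behind the earlier output, in which customer 5 (Eva Martinez) has a single order that is not yet completed, and it reuses order_id 111 from the insert above.

```sql
-- Run in PostgreSQL (not RisingWave).
-- Mark Eva Martinez's outstanding order as completed.
UPDATE orders
SET status = 'completed'
WHERE customer_id = 5
  AND status <> 'completed';

-- Remove the order that was inserted for Carol Davis.
DELETE FROM orders
WHERE order_id = 111;

-- Run in RisingWave.
-- Check how both customers' rows changed.
SELECT customer_id, name, total_orders, lifetime_value
FROM mv_customer_lifetime_value
WHERE customer_id IN (3, 5);
```

Under those assumptions, Eva's row should move from zero to one completed order, and Carol's row should return to zero orders with a null lifetime value, because the LEFT JOIN no longer finds a completed order for her.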

Point Queries for Dashboards

Because RisingWave maintains materialized views in its storage layer, querying them is a simple table read with consistent low latency. This makes them ideal for powering real-time dashboards:

-- Top customers by region (served from precomputed results, no aggregation at query time)
SELECT region, customer_name, total_spent
FROM mv_order_summary
WHERE region = 'US-West'
ORDER BY total_spent DESC;

-- Today's top products
SELECT product, revenue, units_sold
FROM mv_daily_revenue
WHERE order_day = DATE_TRUNC('day', NOW())
ORDER BY revenue DESC
LIMIT 5;

Connecting Downstream Systems

RisingWave speaks the PostgreSQL wire protocol, so any tool that connects to PostgreSQL can query these materialized views directly. This includes Grafana, Metabase, Superset, custom applications using JDBC/ODBC, and even other PostgreSQL instances via foreign data wrappers.

You can also use RisingWave sinks to push materialized view changes to downstream systems like Kafka, Apache Iceberg, Elasticsearch, or another PostgreSQL database.
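As an illustration, a Kafka sink on one of the materialized views might look like the sketch below. The sink name, broker address, and topic are placeholders, and the option names reflect the commonly documented form of the RisingWave Kafka sink, so treat them as assumptions and check them against the sink documentation for your version.

```sql
-- Run in RisingWave.
-- Assumptions: a Kafka broker at localhost:9092 and a topic named order_summary.
CREATE SINK sink_order_summary FROM mv_order_summary
WITH (
    connector = 'kafka',
    properties.bootstrap.server = 'localhost:9092',
    topic = 'order_summary',
    primary_key = 'customer_id'   -- key used to emit upserts per customer
)
FORMAT UPSERT ENCODE JSON;
```

The upsert format is chosen here because the view's rows change over time; each update to a customer's totals would replace the previous message for that key rather than append a new fact.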

What Happens When PostgreSQL Schema Changes

One concern with CDC pipelines is schema evolution. What happens when someone adds a column to the upstream PostgreSQL table?

RisingWave supports automatic schema change propagation when you enable the auto.schema.change parameter on the source:

CREATE SOURCE pg_ecommerce WITH (
    connector = 'postgres-cdc',
    hostname = '127.0.0.1',
    port = '5432',
    username = 'replication_user',
    password = 'securepassword',
    database.name = 'ecommerce',
    schema.name = 'public',
    auto.schema.change = 'true'
);

With this enabled, ALTER TABLE ... ADD COLUMN and ALTER TABLE ... DROP COLUMN operations in PostgreSQL are automatically replicated to RisingWave CDC tables. This is a premium feature, so check RisingWave's pricing for availability.

For the standard connector without auto schema change, you would need to drop and recreate the CDC table in RisingWave after modifying the upstream schema.
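The manual path can be sketched as follows. The loyalty_tier column is hypothetical and exists only for illustration; note that dropping the CDC table also affects any materialized views built on it, so those have to be recreated afterwards.

```sql
-- 1. Run in PostgreSQL: change the upstream schema.
--    loyalty_tier is a hypothetical column used only for this example.
ALTER TABLE customers ADD COLUMN loyalty_tier VARCHAR(20);

-- 2. Run in RisingWave: drop the CDC table.
--    CASCADE also drops dependent objects such as mv_order_summary
--    and mv_customer_lifetime_value.
DROP TABLE customers CASCADE;

-- 3. Run in RisingWave: recreate the table with the new column.
--    The initial snapshot runs again for this table.
CREATE TABLE customers (
    customer_id INT PRIMARY KEY,
    name VARCHAR,
    email VARCHAR,
    region VARCHAR,
    created_at TIMESTAMPTZ,
    loyalty_tier VARCHAR
)
FROM pg_ecommerce TABLE 'public.customers';

-- 4. Run in RisingWave: recreate the materialized views from Step 3.
```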

How Does CDC Handle Large Initial Snapshots

When you create a CDC table against a PostgreSQL table with millions of existing rows, RisingWave performs a snapshot backfill before switching to incremental streaming. For large tables, you can tune the backfill performance with parallelism settings:

CREATE TABLE orders (
    order_id INT PRIMARY KEY,
    customer_id INT,
    product VARCHAR,
    quantity INT,
    price DECIMAL,
    status VARCHAR,
    order_date TIMESTAMPTZ
)
WITH (
    backfill.parallelism = '4',
    backfill.num_rows_per_split = '50000',
    backfill.as_even_splits = 'true'
)
FROM pg_ecommerce TABLE 'public.orders';

The backfill.parallelism parameter controls how many parallel readers scan the upstream table during the initial snapshot. For a table with 10 million rows, 50,000 rows per split yields 200 splits; with parallelism set to 4, each reader processes roughly 50 splits of 50K rows, which can significantly reduce backfill time compared with a single reader.
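The split arithmetic is easy to check in SQL, and the same calculation can be applied to a real table using PostgreSQL's row estimate. This sketch assumes splits are distributed evenly across readers, which is a simplification of however RisingWave actually schedules them.

```sql
-- Worked example: 10 million rows, 50,000 rows per split, 4 parallel readers.
SELECT
    10000000 / 50000     AS total_splits,       -- 200
    10000000 / 50000 / 4 AS splits_per_reader;  -- 50

-- Run in PostgreSQL: the same estimate for the real orders table.
-- reltuples is the planner's row estimate; run ANALYZE first if it looks stale
-- (it can be -1 on a table that has never been analyzed).
SELECT
    reltuples::bigint                   AS estimated_rows,
    ceil(reltuples / 50000)::bigint     AS estimated_splits,
    ceil(reltuples / 50000 / 4)::bigint AS splits_per_reader
FROM pg_class
WHERE oid = 'public.orders'::regclass;
```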

During backfill, new CDC events are buffered and applied after the snapshot completes, ensuring no data is lost.

What Is the Latency of PostgreSQL CDC to RisingWave

End-to-end latency from a PostgreSQL commit to updated materialized view results in RisingWave depends on several factors:

  • WAL flush interval: PostgreSQL flushes WAL on every commit by default (synchronous_commit = on), so changes are available for CDC immediately after the transaction commits.
  • Network latency: The time for WAL events to travel from PostgreSQL to RisingWave. On the same network, this is typically under 1 millisecond.
  • RisingWave processing: Incremental materialized view updates typically complete within 10-100 milliseconds depending on query complexity.

In practice, most deployments see end-to-end latency between 100 milliseconds and 1 second for straightforward materialized views. Complex multi-way joins or windowed aggregations may add modest additional latency.
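To see where time is going in your own deployment, you can look at the sending side in PostgreSQL and at data freshness in RisingWave. Both queries are rough sketches: the first uses the standard pg_stat_replication view, and the second is only meaningful if orders are inserted continuously with the default order_date of NOW().

```sql
-- Run in PostgreSQL.
-- One row per connected replication consumer; a large byte gap means
-- the consumer is falling behind the WAL being generated.
SELECT
    application_name,
    state,
    sent_lsn,
    pg_wal_lsn_diff(pg_current_wal_lsn(), sent_lsn) AS bytes_not_yet_sent
FROM pg_stat_replication;

-- Run in RisingWave.
-- Rough freshness probe: time elapsed since the newest order RisingWave has seen.
SELECT NOW() - MAX(order_date) AS time_since_latest_order
FROM orders;
```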

When Should You Use CDC Instead of Batch ETL

CDC pipelines are the right choice when:

  • Freshness matters: Your dashboards, alerts, or downstream systems need data that is seconds old, not hours old.
  • Source database load is a concern: After the initial snapshot, CDC reads the WAL, which PostgreSQL produces regardless. Steady-state capture adds no periodic SELECT queries to the source database, unlike batch ETL, although logical decoding and retained WAL do use some CPU and disk.
  • You need to capture deletes and updates: Batch ETL with timestamp-based extraction cannot see deleted rows, and it misses updates if the timestamp column is not reliably maintained or not present on every table. It also sees only the latest state of a row, not the intermediate changes between runs.
  • Event-driven architectures: CDC naturally produces an event stream that can trigger downstream processing, notifications, or AI agent workflows.

Batch ETL is simpler and may be sufficient when data freshness of minutes or hours is acceptable, or when you only need periodic full snapshots for data warehousing.

Conclusion

This tutorial covered the complete path from PostgreSQL WAL configuration to real-time materialized views in RisingWave:

  • PostgreSQL setup: Set wal_level = logical and create a replication user. No application code changes are required.
  • CDC source creation: A single CREATE SOURCE statement establishes the connection. Individual CREATE TABLE ... FROM ... TABLE statements map upstream tables.
  • Materialized views: Standard SQL CREATE MATERIALIZED VIEW statements define the transformations. RisingWave maintains them incrementally as CDC events arrive.
  • Real-time queries: Query materialized views like regular tables with consistent low latency. Connect any PostgreSQL-compatible tool for dashboards and analytics.
  • No middleware: RisingWave's native CDC connector eliminates the need for Kafka, Debezium, or Flink in the pipeline.

The entire pipeline runs with standard SQL, making it accessible to anyone who knows PostgreSQL.

