Your database already contains the richest feature source in your entire data stack. Every order placed, every payment processed, every inventory adjustment – these row-level changes in PostgreSQL and MySQL encode the behavioral signals that ML models depend on. The problem is getting those signals out of the database and into a feature computation layer quickly enough to matter.
The standard approach strings together several systems: Debezium captures changes into Kafka, a Flink or Spark job reads from Kafka and computes aggregates, and the results land in a feature store. Each hop adds latency, operational burden, and another system to monitor. By the time a payment event becomes an updated customer lifetime value feature, minutes or hours have passed.
Change Data Capture (CDC) in RisingWave collapses this into a single system. RisingWave connects directly to your database's replication log, captures every insert, update, and delete, and lets you define feature computations as SQL materialized views on top of those changes. One system handles ingestion, transformation, and serving – with no Kafka cluster, no Debezium deployment, and no orchestration DAGs.
This post walks through the full setup: connecting to a PostgreSQL database via CDC, computing order frequency features, customer lifetime value, and inventory velocity from live change events, and serving those features to your ML models.
Why Your Database Is the Best Feature Source
Event streams are the default data source for real-time feature pipelines. But for many production ML use cases, the most valuable features come not from application-emitted events but from the database itself:
- Transactional data is authoritative. An order record in PostgreSQL reflects the actual state of the business, including corrections, refunds, and status changes that event streams may not capture.
- Database changes capture the full lifecycle. A row that goes from
status = 'pending'tostatus = 'shipped'tostatus = 'returned'tells a richer story than a single "order placed" event. - You already have the data. No instrumentation required. No event schema to design. The tables exist, the rows change, and CDC reads the changes directly from the transaction log.
The challenge has been that reading a database's WAL requires specialized tooling, and computing features from those changes requires a stream processing engine. RisingWave handles both, which means you can go from a PostgreSQL table to a live ML feature with nothing but SQL.
Setting Up the CDC Source
RisingWave supports direct CDC connections to PostgreSQL and MySQL without requiring an external message broker. The database's own replication protocol delivers changes directly to RisingWave.
Prerequisites
For PostgreSQL, enable logical replication:
-- Run in your source PostgreSQL database
ALTER SYSTEM SET wal_level = logical;
-- Restart PostgreSQL after this change
-- Create a publication for the tables you want to capture
CREATE PUBLICATION features_publication FOR TABLE
orders, payments, inventory;
For MySQL, enable the binlog:
-- In my.cnf
-- server-id = 1
-- log_bin = mysql-bin
-- binlog_format = ROW
-- binlog_row_image = FULL
Creating the CDC Source and Tables
In RisingWave, create a CDC source that points to your database, then map individual tables:
-- Connect to the upstream PostgreSQL database
CREATE SOURCE pg_source WITH (
connector = 'postgres-cdc',
hostname = 'db-host',
port = '5432',
username = 'replication_user',
password = 'secure_password',
database.name = 'production',
slot.name = 'risingwave_features',
publication.name = 'features_publication'
);
-- Map the orders table
CREATE TABLE orders (
order_id INT PRIMARY KEY,
customer_id INT,
order_total DECIMAL,
status VARCHAR,
created_at TIMESTAMP,
updated_at TIMESTAMP
) FROM pg_source TABLE 'public.orders';
-- Map the payments table
CREATE TABLE payments (
payment_id INT PRIMARY KEY,
customer_id INT,
amount DECIMAL,
payment_method VARCHAR,
status VARCHAR,
paid_at TIMESTAMP
) FROM pg_source TABLE 'public.payments';
-- Map the inventory table
CREATE TABLE inventory (
product_id INT PRIMARY KEY,
warehouse_id INT,
quantity INT,
updated_at TIMESTAMP
) FROM pg_source TABLE 'public.inventory';
At this point, RisingWave begins reading the PostgreSQL WAL. Every insert, update, and delete on the orders, payments, and inventory tables is captured and reflected in the corresponding RisingWave tables within seconds. For more details on configuring CDC connectors, see the PostgreSQL CDC documentation.
Computing Features from Change Events
With the CDC tables in place, you define features as materialized views. Each materialized view continuously recomputes its results as new changes arrive from the source database. No scheduling, no batch triggers – the computation is always running.
Use Case 1: Order Frequency Features
Order frequency is a strong predictor for churn models, recommendation engines, and customer segmentation. You want to know how often a customer orders, how recently, and whether the pace is accelerating or slowing.
CREATE MATERIALIZED VIEW customer_order_features AS
SELECT
customer_id,
COUNT(*) AS total_orders,
COUNT(*) FILTER (WHERE created_at > NOW() - INTERVAL '7 days')
AS orders_last_7d,
COUNT(*) FILTER (WHERE created_at > NOW() - INTERVAL '30 days')
AS orders_last_30d,
AVG(order_total) AS avg_order_value,
MAX(order_total) AS max_order_value,
MIN(created_at) AS first_order_at,
MAX(created_at) AS last_order_at,
COUNT(*) FILTER (WHERE status = 'returned')
AS return_count,
ROUND(
COUNT(*) FILTER (WHERE status = 'returned')::DECIMAL
/ NULLIF(COUNT(*), 0), 3
) AS return_rate
FROM orders
GROUP BY customer_id;
This materialized view updates every time an order is inserted, updated, or deleted in the source PostgreSQL database. When a customer places a new order, the orders_last_7d, total_orders, and avg_order_value columns reflect the change within seconds. When an order's status changes to 'returned', the return_count and return_rate update automatically.
Use Case 2: Customer Lifetime Value from Payment Events
Customer lifetime value (CLV) features combine payment amounts, frequency, and recency. These drive personalization models, acquisition cost optimization, and support prioritization.
CREATE MATERIALIZED VIEW customer_ltv_features AS
SELECT
customer_id,
SUM(amount) AS total_lifetime_spend,
SUM(amount) FILTER (WHERE paid_at > NOW() - INTERVAL '90 days')
AS spend_last_90d,
SUM(amount) FILTER (WHERE paid_at > NOW() - INTERVAL '30 days')
AS spend_last_30d,
COUNT(*) AS total_payments,
AVG(amount) AS avg_payment_amount,
MAX(paid_at) AS last_payment_at,
COUNT(DISTINCT payment_method) AS distinct_payment_methods,
COUNT(*) FILTER (WHERE status = 'refunded')
AS refund_count,
ROUND(
SUM(amount) FILTER (WHERE status = 'refunded')::DECIMAL
/ NULLIF(SUM(amount), 0), 3
) AS refund_rate_by_value
FROM payments
GROUP BY customer_id;
Every time the source database records a new payment or updates a payment status (such as a chargeback or refund), this view recalculates. A customer whose refund_rate_by_value suddenly spikes will be flagged by your model immediately, not in the next hourly batch.
Use Case 3: Inventory Velocity from Stock Changes
Inventory velocity – how quickly stock levels change – is a critical feature for demand forecasting, dynamic pricing, and supply chain optimization. With CDC, every stock adjustment in the database becomes a signal.
Since the inventory table tracks the current state (not a log of changes), you can combine it with an event-driven pattern. One approach is to capture the current state and track rate of change through a derived view:
CREATE MATERIALIZED VIEW inventory_features AS
SELECT
product_id,
warehouse_id,
quantity AS current_stock,
CASE
WHEN quantity = 0 THEN 'out_of_stock'
WHEN quantity < 10 THEN 'low_stock'
WHEN quantity < 50 THEN 'medium_stock'
ELSE 'high_stock'
END AS stock_status,
updated_at AS last_stock_change
FROM inventory;
For richer velocity features, add a stock change log table to your CDC source:
-- Assumes your source database has a stock_changes audit table
CREATE TABLE stock_changes (
change_id INT PRIMARY KEY,
product_id INT,
warehouse_id INT,
quantity_delta INT,
reason VARCHAR,
changed_at TIMESTAMP
) FROM pg_source TABLE 'public.stock_changes';
CREATE MATERIALIZED VIEW inventory_velocity_features AS
SELECT
product_id,
warehouse_id,
SUM(quantity_delta) FILTER (WHERE changed_at > NOW() - INTERVAL '1 hour')
AS net_change_1h,
SUM(quantity_delta) FILTER (WHERE changed_at > NOW() - INTERVAL '24 hours')
AS net_change_24h,
COUNT(*) FILTER (WHERE changed_at > NOW() - INTERVAL '1 hour')
AS change_count_1h,
SUM(ABS(quantity_delta))
FILTER (WHERE changed_at > NOW() - INTERVAL '24 hours')
AS absolute_movement_24h,
SUM(quantity_delta)
FILTER (WHERE quantity_delta < 0
AND changed_at > NOW() - INTERVAL '24 hours')
AS total_sold_24h
FROM stock_changes
GROUP BY product_id, warehouse_id;
A product whose net_change_1h is dropping sharply signals a demand spike. A product with high change_count_1h but low absolute_movement_24h may be experiencing inventory churn. These patterns are invisible in hourly batch snapshots.
Combining Features Across Tables
ML models rarely use features from a single table. Join across your CDC-backed materialized views to build a unified feature vector:
CREATE MATERIALIZED VIEW customer_unified_features AS
SELECT
o.customer_id,
o.total_orders,
o.orders_last_7d,
o.orders_last_30d,
o.avg_order_value,
o.return_rate,
l.total_lifetime_spend,
l.spend_last_90d,
l.avg_payment_amount,
l.distinct_payment_methods,
l.refund_rate_by_value
FROM customer_order_features o
JOIN customer_ltv_features l ON o.customer_id = l.customer_id;
This view stays current as both orders and payments receive changes from the source database. A single query against customer_unified_features returns the full feature vector for any customer, with every column reflecting the latest state.
Serving Features to ML Models
RisingWave exposes a PostgreSQL-compatible interface. Your inference service reads features with a standard SQL query over any PostgreSQL driver – psycopg2, JDBC, or a Go/Rust PostgreSQL client:
-- Low-latency point lookup at inference time
SELECT *
FROM customer_unified_features
WHERE customer_id = 42;
This query reads pre-computed results from the materialized view, not raw data. Response times are in the single-digit milliseconds, well within the latency budget of a real-time inference request.
Sinking to External Feature Stores
If your ML infrastructure already reads from a feature store or a dedicated serving database, sink the features from RisingWave:
-- Sink to a PostgreSQL feature-serving database
CREATE SINK customer_features_sink
FROM customer_unified_features
WITH (
connector = 'jdbc',
jdbc.url = 'jdbc:postgresql://feature-store-db:5432/features',
table.name = 'customer_features',
type = 'upsert',
primary_key = 'customer_id'
);
You can also sink to Apache Iceberg for historical training data, ensuring that your training pipeline and serving pipeline share the same feature definitions. This eliminates training-serving skew at the architecture level.
Why a Single-System Approach Matters
The conventional CDC-to-features pipeline looks like this:
PostgreSQL → Debezium → Kafka → Flink/Spark → Feature Store → Model
Each arrow is a system to deploy, monitor, and maintain. Debezium needs a Kafka Connect cluster. Kafka needs brokers and ZooKeeper (or KRaft). Flink needs a job manager and task managers. The feature store needs its own infrastructure.
With RisingWave's built-in CDC:
PostgreSQL → RisingWave → Model
One system reads the WAL, computes the features, and serves them. Fewer moving parts mean fewer failure modes, lower operational cost, and faster time to production. For teams that want to add real-time features without adopting an entire streaming platform, this is a significant advantage.
FAQ
What databases does RisingWave CDC support?
RisingWave supports direct CDC from PostgreSQL and MySQL. For other databases (SQL Server, MongoDB, Oracle), you can use Debezium to capture changes into Kafka and create a Kafka source in RisingWave with the Debezium format.
How fresh are the features?
Features update within seconds of the source database change. The exact latency depends on the CDC connector configuration and the complexity of the materialized view, but typical end-to-end latency from database write to updated feature value is under two seconds.
Can I use this for model training as well as serving?
Yes. Sink your materialized views to Apache Iceberg or a data warehouse for historical training data. The same SQL query that defines the serving feature also defines the training feature, which eliminates training-serving skew. Your training pipeline reads the historical table, and your serving pipeline reads the live materialized view – both computed by identical logic.
What happens when the source database schema changes?
Schema changes in the source database (adding a column, for example) require updating the corresponding RisingWave table definition. RisingWave does not automatically propagate DDL changes from the source. Plan schema migrations carefully, and update your CDC table definitions as part of the migration process.
How does this compare to using Debezium with Flink?
Both approaches accomplish the same goal, but with different operational profiles. Debezium plus Flink requires managing a Kafka Connect cluster, a Kafka cluster, and a Flink cluster – three separate distributed systems. RisingWave replaces all three with a single system that handles CDC ingestion, stream processing, and feature serving. The trade-off is that Flink supports more complex event processing patterns (CEP, custom operators), while RisingWave covers the aggregation-heavy workloads that dominate feature engineering.
Conclusion
Your database's transaction log is a continuous stream of behavioral signals. CDC turns that log into a real-time data source, and materialized views turn that source into ML features. Here are the key takeaways:
- The database is the feature source. Orders, payments, and inventory changes encode the signals your models need. CDC extracts them without any application-level instrumentation.
- Materialized views are continuous feature queries. Define your feature as a SQL aggregation, and RisingWave keeps it up to date as every row changes in the source database.
- One system replaces a multi-hop pipeline. Direct CDC eliminates the Debezium-to-Kafka-to-Flink chain, reducing operational complexity and end-to-end latency.
- Serving is built in. Query features over a standard PostgreSQL connection, or sink them to your existing feature store. No custom serving infrastructure required.
Ready to turn your database changes into ML features? Get started with RisingWave in 5 minutes. Quickstart →
Have questions about CDC or feature engineering? Join the RisingWave Slack community to connect with other stream processing practitioners.
Written by Yuhao Su

