The Real-Time Mandate in Retail
Retail has become a real-time game. Every second, shoppers generate signals across web and mobile, such as product views, add-to-cart events, purchases, and returns, while supply-side events like restocks and inventory updates continuously change what’s available. At the same time, competitor price moves can instantly shift demand.
Real-time data is essential to ensure:
Availability: Accurate “in-stock” status and fast inventory updates.
Revenue: Prices that respond to demand and competitor pressure while protecting margins.
Customer experience: Fewer “out of stock at checkout” surprises and better conversion.
Scalability: Pipelines that keep up with spikes (campaigns, flash sales, holidays).
Batch-based systems introduce lag, so inventory, product recommendations, and pricing decisions arrive late, leading to overselling, missed revenue, and a poor customer experience. Real-time stream processing, powered by platforms like RisingWave, lets retail systems react instantly with continuously updated analytics, fresh user insights, and pricing decisions.
Use Case: Real-Time Dynamic Pricing + Live Inventory
Dynamic pricing isn’t useful without live inventory, and live inventory isn’t reliable without real-time event processing.
In this case, we compute:
Live inventory from base stock (via CDC), adjusted by purchases and restocks
Demand pressure using web clicks as a proxy (rolling window)
Competitor pressure using the latest competitor prices (including a “floor” based on the minimum competitor price)
Dynamic pricing using simple, transparent SQL rules
Sales KPIs (orders + price timeline), persisted into Apache Iceberg
RisingWave makes this workflow straightforward using materialized views, time windows, and ASOF joins, so you can keep inventory, pricing, and KPIs continuously fresh with SQL and enable comprehensive analytics across the entire retail platform.
Overview
In this blog post, we’ll build a real-time retail pipeline:
Stream retail events into Kafka (web clicks, purchases, restocks, competitor prices).
Capture operational tables via Postgres CDC (product catalog + base inventory, orders).
Ingest everything into RisingWave and build materialized views for:
inventory deltas
live inventory state
demand and competitor snapshots
dynamic pricing
sales/revenue KPIs
Visualize in Superset or Grafana (live inventory + live price boards).
Persist KPIs to Apache Iceberg (Lakekeeper REST catalog + MinIO) for cross-engine analytics (Spark/Trino/etc.) and downstream ML.

Ingest and Process Data into RisingWave
This blog is based on a fully runnable demo in the RisingWave awesome stream processing repository. It requires:
Docker and Docker Compose
psql (the PostgreSQL interactive terminal) to connect to RisingWave
Then, launch the demo cluster.
Clone the demo repo and start the stack
git clone https://github.com/risingwavelabs/awesome-stream-processing.git
cd awesome-stream-processing/03-solution-demos/live-inventory-management-and-dynamic-pricing
# Launch demo stack
docker compose up -d
Connect to RisingWave (psql)
After launching the demo cluster, connect to RisingWave using psql. This will let you ingest data into RisingWave through the Kafka source connectors and PostgreSQL CDC connectors, as shown below:
psql -h localhost -p 4566 -d dev -U root
Now, create streaming sources (Kafka) and CDC sources (Postgres) as follows:
Kafka sources
CREATE SOURCE web_clicks (
event_time timestamptz,
product_id int,
event_type varchar -- 'view'|'add_to_cart'|'remove_from_cart'
) WITH (
connector = 'kafka',
topic = 'web_clicks',
properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;
CREATE SOURCE competitor_prices (
ts timestamptz,
product_id int,
competitor varchar,
competitor_price numeric
) WITH (
connector = 'kafka',
topic = 'competitor_prices',
properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;
CREATE SOURCE purchases (
purchase_time timestamptz,
product_id int,
quantity_purchased int,
customer_id varchar
) WITH (
connector = 'kafka',
topic = 'purchases',
properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;
CREATE SOURCE restocks (
restock_time timestamptz,
product_id int,
quantity_restocked int
) WITH (
connector = 'kafka',
topic = 'restocks',
properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;
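To confirm the four sources registered correctly, you can list them and inspect a schema from the same psql session (an optional sanity check, not part of the demo script):
-- List all sources in the current database
SHOW SOURCES;
-- Inspect the columns RisingWave associated with one source
DESCRIBE web_clicks;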
Postgres CDC source
CREATE SOURCE postgres_cdc
WITH (
connector = 'postgres-cdc',
hostname = 'postgres',
port = '5432',
username = 'myuser',
password = '123456',
database.name = 'mydb',
schema.name = 'public',
publication.name = 'rw_publication'
);
Mirror base tables from CDC into RisingWave tables
CREATE TABLE product_inventory (
"product_id" INT PRIMARY KEY,
"product_name" varchar,
"stock_level" int,
"reorder_threshold" int,
"supplier" varchar,
"base_price" decimal
)
FROM postgres_cdc TABLE 'public.product_inventory';
CREATE TABLE orders (
order_id INT PRIMARY KEY,
order_time TIMESTAMPTZ,
product_id INT,
quantity INT,
customer_id VARCHAR
)
FROM postgres_cdc TABLE 'public.orders';
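CDC-backed tables are regular, queryable tables in RisingWave, so a quick spot check (illustrative only) confirms the initial snapshot has landed:
-- Row count should match the Postgres side once the snapshot completes
SELECT COUNT(*) FROM product_inventory;
-- Most recent orders replicated from Postgres
SELECT * FROM orders ORDER BY order_time DESC LIMIT 5;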
Materialized views: Live inventory + dynamic pricing (Core Logic)
These six materialized views cover the full end-to-end story.
MV 1 — Per-minute inventory deltas (purchases & restocks)
CREATE MATERIALIZED VIEW minute_join AS
SELECT
COALESCE(p.product_id, r.product_id) AS product_id,
COALESCE(p.window_start, r.window_start) AS window_start,
COALESCE(p.total_purchased, 0) AS total_purchased,
COALESCE(r.total_restocked, 0) AS total_restocked
FROM (
SELECT product_id, window_start, SUM(quantity_purchased) AS total_purchased
FROM TUMBLE(purchases, purchase_time, INTERVAL '1 MINUTE')
GROUP BY product_id, window_start
) p
FULL JOIN (
SELECT product_id, window_start, SUM(quantity_restocked) AS total_restocked
FROM TUMBLE(restocks, restock_time, INTERVAL '1 MINUTE')
GROUP BY product_id, window_start
) r
ON p.product_id = r.product_id AND p.window_start = r.window_start;
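Materialized views are queryable like tables, so you can watch the deltas accumulate as events stream in. For example (an illustrative ad-hoc query):
-- Latest per-minute net flows per product
SELECT
  product_id,
  window_start,
  total_purchased,
  total_restocked,
  total_restocked - total_purchased AS net_change
FROM minute_join
ORDER BY window_start DESC
LIMIT 10;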
MV 2 — Live stock state (base stock + cumulative net flows)
CREATE MATERIALIZED VIEW product_stock_status AS
WITH inventory_updates AS (
SELECT
b.product_id,
b.product_name,
b.base_price,
b.reorder_threshold,
mj.window_start,
SUM(COALESCE(mj.total_restocked, 0))
OVER (PARTITION BY b.product_id ORDER BY mj.window_start)
- SUM(COALESCE(mj.total_purchased, 0))
OVER (PARTITION BY b.product_id ORDER BY mj.window_start)
+ b.stock_level AS current_inventory
FROM product_inventory b
LEFT JOIN minute_join mj
ON b.product_id = mj.product_id
)
SELECT
product_id,
product_name,
window_start AS inventory_window,
current_inventory,
base_price,
reorder_threshold,
CASE
WHEN current_inventory < 20 THEN 'Low'
WHEN current_inventory BETWEEN 20 AND 50 THEN 'Medium'
ELSE 'High'
END AS stock_level_category
FROM inventory_updates;
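A practical use of this view is a reorder alert. Following the same ROW_NUMBER deduplication pattern used in MV 4 below, a hypothetical check for products at or below their reorder threshold might look like:
-- Latest inventory reading per product, filtered to reorder candidates
SELECT product_id, product_name, current_inventory, reorder_threshold
FROM (
  SELECT *, ROW_NUMBER() OVER (
    PARTITION BY product_id ORDER BY inventory_window DESC
  ) AS rn
  FROM product_stock_status
) t
WHERE rn = 1 AND current_inventory <= reorder_threshold;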
MV 3 — Demand proxy (clicks per minute)
CREATE MATERIALIZED VIEW web_clicks_minutely AS
SELECT
product_id,
window_start,
COUNT(*) AS clicks
FROM TUMBLE(web_clicks, event_time, INTERVAL '1 MINUTE')
GROUP BY product_id, window_start;
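An ad-hoc query against this view surfaces the products drawing the most attention right now (illustrative only):
-- Hottest products in the most recent minute windows
SELECT product_id, window_start, clicks
FROM web_clicks_minutely
ORDER BY window_start DESC, clicks DESC
LIMIT 10;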
MV 4 — Latest competitor price per (product, competitor)
CREATE MATERIALIZED VIEW competitor_latest AS
SELECT product_id, competitor, competitor_price, ts
FROM (
SELECT
product_id,
competitor,
competitor_price,
ts,
ROW_NUMBER() OVER (
PARTITION BY product_id, competitor
ORDER BY ts DESC
) AS rn
FROM competitor_prices
) t
WHERE rn = 1;
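Since MV 5 derives its competitor floor from this view, you can preview that floor directly; this ad-hoc query mirrors the comp_min CTE used below:
-- Minimum competitor price per product (the pricing "floor" input)
SELECT product_id, MIN(competitor_price) AS min_competitor_price
FROM competitor_latest
GROUP BY product_id
ORDER BY product_id;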
MV 5 — Dynamic pricing (inventory rule + demand & competitor pressure)
CREATE MATERIALIZED VIEW dynamic_pricing_enriched AS
WITH demand AS (
SELECT
product_id,
window_start,
SUM(clicks) OVER (
PARTITION BY product_id
ORDER BY window_start
RANGE BETWEEN INTERVAL '15 MINUTE' PRECEDING AND CURRENT ROW
) AS clicks_15m
FROM web_clicks_minutely
),
comp_min AS (
SELECT product_id, MIN(competitor_price) AS min_competitor_price
FROM competitor_latest
GROUP BY product_id
)
SELECT
s.product_id,
s.product_name,
s.inventory_window,
s.current_inventory,
s.stock_level_category,
s.base_price,
d.clicks_15m,
c.min_competitor_price,
CASE
WHEN s.stock_level_category = 'Low' THEN ROUND(s.base_price * 1.20, 2)
WHEN s.stock_level_category = 'Medium' THEN s.base_price
WHEN s.stock_level_category = 'High' THEN ROUND(s.base_price * 0.90, 2)
END AS stock_rule_price,
GREATEST(
ROUND(
CASE
WHEN c.min_competitor_price IS NOT NULL
AND s.base_price <= c.min_competitor_price
AND COALESCE(d.clicks_15m, 0) > 30
THEN
(CASE
WHEN s.stock_level_category = 'Low' THEN s.base_price * 1.20
WHEN s.stock_level_category = 'Medium' THEN s.base_price
ELSE s.base_price * 0.90
END) * 1.02
WHEN c.min_competitor_price IS NOT NULL
AND
(CASE
WHEN s.stock_level_category = 'Low' THEN s.base_price * 1.20
WHEN s.stock_level_category = 'Medium' THEN s.base_price
ELSE s.base_price * 0.90
END) > c.min_competitor_price
THEN
(CASE
WHEN s.stock_level_category = 'Low' THEN s.base_price * 1.20
WHEN s.stock_level_category = 'Medium' THEN s.base_price
ELSE s.base_price * 0.90
END) * 0.99
ELSE
(CASE
WHEN s.stock_level_category = 'Low' THEN s.base_price * 1.20
WHEN s.stock_level_category = 'Medium' THEN s.base_price
ELSE s.base_price * 0.90
END)
END, 2),
ROUND(0.5 * s.base_price, 2)
) AS final_price
FROM product_stock_status s
LEFT JOIN demand d ON d.product_id = s.product_id AND d.window_start = s.inventory_window
LEFT JOIN comp_min c ON c.product_id = s.product_id;
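To see the decisions this view is making, a latest-row-per-product query (again using the ROW_NUMBER pattern from MV 4) doubles as a minimal price board:
-- Latest pricing decision per product
SELECT product_id, product_name, stock_level_category,
  base_price, min_competitor_price, clicks_15m, final_price
FROM (
  SELECT *, ROW_NUMBER() OVER (
    PARTITION BY product_id ORDER BY inventory_window DESC
  ) AS rn
  FROM dynamic_pricing_enriched
) t
WHERE rn = 1
ORDER BY product_id;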
MV 6 — Sales & revenue KPIs (orders CDC + price timeline via ASOF join)
CREATE MATERIALIZED VIEW sales_profit AS
SELECT
o.product_id,
MAX(pi.product_name) AS product_name,
SUM(o.quantity) AS total_orders,
CAST(SUM(dp.final_price * o.quantity) AS DECIMAL) AS revenue_estimate
FROM orders o
ASOF LEFT JOIN dynamic_pricing_enriched dp
ON o.product_id = dp.product_id
AND o.order_time >= dp.inventory_window
LEFT JOIN product_inventory pi
ON pi.product_id = o.product_id
GROUP BY o.product_id;
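Querying this view directly gives a live revenue leaderboard, for example:
-- Top products by estimated revenue
SELECT product_id, product_name, total_orders, revenue_estimate
FROM sales_profit
ORDER BY revenue_estimate DESC
LIMIT 10;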
Visualize real-time data with Superset or Grafana
Once the real-time materialized views are in place, visualization is straightforward:
Connect RisingWave as a data source in Apache Superset or Grafana
Add materialized views as datasets (for example):
product_stock_status -> live inventory dashboard
dynamic_pricing_enriched -> live price board and competitor comparison
sales_profit -> sales KPIs leaderboard
This gives you a real-time view for inventory, pricing, demand, and revenue, refreshing continuously without batch jobs.
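For instance, a time-series panel tracking a single product’s inventory could be backed by a query like the following (product_id = 1 is a placeholder; adapt the filter to your catalog):
-- Inventory over time for one product, suitable for a line chart
SELECT inventory_window AS event_time, current_inventory
FROM product_stock_status
WHERE product_id = 1
ORDER BY inventory_window;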
Persist sales KPIs to Iceberg (Lakekeeper REST Catalog + MinIO)
You can write data to Apache Iceberg for long-term storage and cross-engine analytics. For example, you can continuously stream KPIs into Apache Iceberg using RisingWave with a self-hosted Lakekeeper REST catalog.
CREATE CONNECTION lakekeeper_catalog_conn
WITH (
type = 'iceberg',
catalog.type = 'rest',
catalog.uri = 'http://lakekeeper:8181/catalog/',
warehouse.path = 'risingwave-warehouse',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
s3.path.style.access = 'true',
s3.endpoint = 'http://minio-0:9301',
s3.region = 'us-east-1'
);
SET iceberg_engine_connection = 'public.lakekeeper_catalog_conn';
CREATE TABLE sales_profit_iceberg (
product_id INT,
product_name VARCHAR,
total_orders BIGINT,
revenue_estimate DOUBLE PRECISION
)
WITH (commit_checkpoint_interval = 1)
ENGINE = iceberg;
INSERT INTO sales_profit_iceberg
SELECT
product_id,
product_name,
total_orders,
revenue_estimate::DOUBLE PRECISION
FROM sales_profit;
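Note that INSERT INTO ... SELECT writes a one-time snapshot of the materialized view. To keep the Iceberg table continuously in sync instead, RisingWave’s sink-into-table feature is one option (a sketch; verify that your RisingWave version supports sinking into Iceberg-engine tables):
-- Hypothetical continuous alternative to the one-off INSERT above
CREATE SINK sales_profit_to_iceberg INTO sales_profit_iceberg AS
SELECT
  product_id,
  product_name,
  total_orders,
  revenue_estimate::DOUBLE PRECISION AS revenue_estimate
FROM sales_profit;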
Now your real-time KPIs are queryable from Spark/Trino and ready for downstream analytics workloads.
Integrating real-time retail analytics and AI
Dynamic pricing and live inventory don’t have to stop at dashboards:
Analytics Platforms
Send outputs to systems like BigQuery, Snowflake, or StarRocks for broader BI, experimentation, and historical analysis.
AI/ML Pipelines
Use streaming features for models such as:
Personalized recommendations (demand + availability-aware ranking)
Promotion optimization (price elasticity signals)
Churn/return risk prediction
Dynamic replenishment planning
Strategic Benefit
Combining real-time signals with AI enables better decisions that adapt continuously to customer behavior and market conditions.
Conclusion
In this blog post, we built a real-time retail pipeline with Kafka + Postgres CDC + RisingWave to power live inventory and dynamic pricing using plain SQL:
Ingested event streams (clicks, purchases, restocks, competitor prices)
Mirrored operational tables via Postgres CDC
Computed live inventory state and pricing decisions via materialized views
Tracked real-time sales KPIs using an ASOF join against the pricing timeline
Persisted KPIs to Apache Iceberg using a Lakekeeper REST catalog and MinIO
This setup demonstrates how RisingWave enables low-latency, continuously updated pricing and inventory intelligence, helping retail systems stay accurate, responsive, and ready for downstream analytics and AI-driven optimization.