Dynamic pricing models are only as good as their input features. You can build the most sophisticated pricing algorithm in the world, but if it runs on features computed from yesterday's data, it will miss the demand spike happening right now, the competitor who just dropped their price by 15%, and the warehouse that shipped its last pallet an hour ago.
The bottleneck in most pricing systems is not the model -- it is the feature pipeline. Batch ETL jobs compute pricing features on a schedule and write them to a feature store. The pricing model reads stale features and produces stale prices. By the time those prices reach the storefront, the market has already moved.
This post shows how to compute pricing-relevant features in real time using streaming SQL in RisingWave. You will build materialized views that continuously track demand velocity, inventory pressure, competitor price movements, time-of-day patterns, and surge conditions. These features update within seconds of each source event, giving your pricing model the freshness it needs to react to market conditions as they happen.
Why Batch Feature Pipelines Fail Dynamic Pricing
Batch feature engineering creates three specific problems for pricing systems.
Missed demand windows. A product goes viral at 2 PM. The batch job runs at 6 PM. For four hours, the pricing model does not know demand spiked. By the time features update, inventory is gone and you sold everything at the base price.
Stale competitive positioning. Your competitor drops their price at 10 AM. Your batch job picks it up at midnight. For 14 hours, your prices are misaligned with the market. Customers comparison-shop in seconds, and stale competitive features mean your model cannot keep up.
Invisible inventory pressure. A warehouse reports low stock through a CDC event. The batch pipeline will not aggregate this until the next run. The pricing model keeps recommending prices as if supply is plentiful.
Streaming SQL solves these problems by treating feature computation as a continuous process. You define each feature as a materialized view that RisingWave incrementally maintains as new events arrive. No scheduling, no orchestration, no stale windows.
Modeling the Source Data
A pricing feature pipeline consumes several event streams. In production, these would come from Kafka topics via RisingWave's Kafka connector. For this tutorial, we use tables so you can follow along locally.
Order events
Every purchase generates an order event. This is the primary demand signal:
CREATE TABLE order_events (
order_id VARCHAR,
product_id VARCHAR,
category VARCHAR,
quantity INT,
unit_price NUMERIC,
customer_region VARCHAR,
order_ts TIMESTAMPTZ
);
Page view events
Browsing activity is an early indicator of demand -- it spikes before orders do:
CREATE TABLE page_view_events (
view_id VARCHAR,
product_id VARCHAR,
session_id VARCHAR,
referrer VARCHAR,
view_ts TIMESTAMPTZ
);
Competitor price updates
Price intelligence feeds, whether from scrapers or APIs, flow in as events:
CREATE TABLE competitor_price_events (
competitor_id VARCHAR,
product_id VARCHAR,
competitor_name VARCHAR,
price NUMERIC,
observed_ts TIMESTAMPTZ
);
Inventory updates
Warehouse management systems emit stock-change events:
CREATE TABLE inventory_updates (
product_id VARCHAR,
warehouse_id VARCHAR,
quantity_on_hand INT,
updated_ts TIMESTAMPTZ
);
Product catalog
A reference table with base prices and pricing constraints:
CREATE TABLE product_catalog (
product_id VARCHAR PRIMARY KEY,
product_name VARCHAR,
category VARCHAR,
base_price NUMERIC,
cost NUMERIC,
min_price NUMERIC,
max_price NUMERIC
);
Feature 1: Demand Velocity
Demand velocity measures how fast interest in a product is growing. A pricing model needs to know not just "how many orders happened" but "how fast is order volume accelerating." This materialized view computes demand metrics across multiple time windows:
CREATE MATERIALIZED VIEW demand_velocity_features AS
SELECT
o.product_id,
-- Order counts at different granularities
COUNT(*) FILTER (WHERE o.order_ts > NOW() - INTERVAL '5 minutes')
AS orders_5min,
COUNT(*) FILTER (WHERE o.order_ts > NOW() - INTERVAL '30 minutes')
AS orders_30min,
COUNT(*) FILTER (WHERE o.order_ts > NOW() - INTERVAL '2 hours')
AS orders_2h,
-- Units sold (not just order count)
COALESCE(SUM(o.quantity) FILTER (
WHERE o.order_ts > NOW() - INTERVAL '30 minutes'
), 0) AS units_sold_30min,
-- Revenue velocity
COALESCE(SUM(o.quantity * o.unit_price) FILTER (
WHERE o.order_ts > NOW() - INTERVAL '1 hour'
), 0) AS revenue_1h,
-- Average order size (quantity signal)
AVG(o.quantity) FILTER (WHERE o.order_ts > NOW() - INTERVAL '1 hour')
AS avg_order_size_1h
FROM order_events o
GROUP BY o.product_id;
The ratio between short-window and long-window counts is the acceleration signal. If orders_5min is 20 but orders_30min is 25, that product is in a demand spike right now -- most of the 30-minute volume happened in the last 5 minutes.
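The acceleration ratio can be sketched as a small client-side helper (hypothetical, not part of RisingWave; it mirrors the arithmetic the surge view performs later):

```python
def demand_acceleration(orders_5min: int, orders_30min: int) -> float:
    """Ratio of the 5-minute run rate to the 30-minute average rate.

    Multiplying orders_5min by 6 normalizes both counts to a
    30-minute basis; values above 1.0 mean demand is accelerating.
    """
    if orders_30min == 0:
        return 0.0
    return round((orders_5min * 6.0) / orders_30min, 2)

# 20 of the last 25 orders arrived in the past 5 minutes: a sharp spike.
print(demand_acceleration(20, 25))  # 4.8
```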
To add page view velocity as a leading indicator:
CREATE MATERIALIZED VIEW browsing_demand_features AS
SELECT
product_id,
COUNT(DISTINCT session_id) FILTER (
WHERE view_ts > NOW() - INTERVAL '10 minutes'
) AS unique_viewers_10min,
COUNT(DISTINCT session_id) FILTER (
WHERE view_ts > NOW() - INTERVAL '1 hour'
) AS unique_viewers_1h,
COUNT(*) FILTER (WHERE view_ts > NOW() - INTERVAL '10 minutes')
AS views_10min
FROM page_view_events
GROUP BY product_id;
When unique_viewers_10min spikes before orders_5min does, your pricing model gets advance notice that demand is about to increase.
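One way to operationalize that advance notice is a flag that fires when browsing accelerates while orders have not yet caught up. The thresholds below are illustrative, not tuned values:

```python
def early_demand_signal(viewers_10min: int, viewers_1h: int,
                        orders_5min: int) -> bool:
    """Flag products where browsing is surging ahead of orders.

    Multiplying viewers_10min by 6 normalizes it to an hourly rate so
    it is comparable with viewers_1h; a ratio of 2.0 means browsing is
    running at double its hourly pace.
    """
    if viewers_1h == 0:
        return False
    browse_accel = (viewers_10min * 6.0) / viewers_1h
    return browse_accel >= 2.0 and orders_5min < 5
```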
Feature 2: Inventory Pressure
Inventory pressure captures the relationship between stock levels and consumption rate. A product with 1,000 units is not scarce -- unless it is selling 500 per hour.
CREATE MATERIALIZED VIEW inventory_pressure_features AS
WITH current_stock AS (
SELECT
product_id,
SUM(quantity_on_hand) AS total_stock
FROM inventory_updates
GROUP BY product_id
),
sell_rate AS (
SELECT
product_id,
COALESCE(SUM(quantity) FILTER (
WHERE order_ts > NOW() - INTERVAL '1 hour'
), 0) AS units_sold_1h,
COALESCE(SUM(quantity) FILTER (
WHERE order_ts > NOW() - INTERVAL '24 hours'
), 0) AS units_sold_24h
FROM order_events
GROUP BY product_id
)
SELECT
cs.product_id,
cs.total_stock,
sr.units_sold_1h,
sr.units_sold_24h,
-- Hours until stockout at current sell rate
CASE
WHEN sr.units_sold_1h > 0
THEN ROUND(cs.total_stock::NUMERIC / sr.units_sold_1h, 1)
ELSE 999.0
END AS hours_to_stockout,
-- Stock-to-sales ratio (lower = more pressure)
CASE
WHEN sr.units_sold_24h > 0
THEN ROUND(cs.total_stock::NUMERIC / (sr.units_sold_24h::NUMERIC / 24), 1)
ELSE 999.0
END AS days_of_supply,
-- Categorical pressure signal for the model
CASE
WHEN cs.total_stock <= 0 THEN 'OUT_OF_STOCK'
WHEN sr.units_sold_1h > 0 AND cs.total_stock::NUMERIC / sr.units_sold_1h < 4
THEN 'CRITICAL'
WHEN sr.units_sold_1h > 0 AND cs.total_stock::NUMERIC / sr.units_sold_1h < 12
THEN 'HIGH'
WHEN sr.units_sold_24h > 0 AND cs.total_stock::NUMERIC / (sr.units_sold_24h::NUMERIC / 24) < 3
THEN 'MODERATE'
ELSE 'LOW'
END AS pressure_level
FROM current_stock cs
LEFT JOIN sell_rate sr ON cs.product_id = sr.product_id;
The hours_to_stockout feature translates raw stock numbers into a time horizon the pricing model can act on. A product with 4 hours of supply left should be priced differently than one with 40 hours.
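For reference, the view's CASE logic translates to the following Python (a client-side mirror of the SQL above, using the same 999.0 "effectively unlimited" sentinel):

```python
def inventory_pressure(total_stock: int, units_sold_1h: int,
                       units_sold_24h: int) -> tuple[float, str]:
    """Compute hours-to-stockout and the categorical pressure level,
    mirroring inventory_pressure_features."""
    hours = (round(total_stock / units_sold_1h, 1)
             if units_sold_1h > 0 else 999.0)
    daily_rate = units_sold_24h / 24 if units_sold_24h > 0 else 0
    if total_stock <= 0:
        level = "OUT_OF_STOCK"
    elif units_sold_1h > 0 and total_stock / units_sold_1h < 4:
        level = "CRITICAL"
    elif units_sold_1h > 0 and total_stock / units_sold_1h < 12:
        level = "HIGH"
    elif daily_rate > 0 and total_stock / daily_rate < 3:
        level = "MODERATE"
    else:
        level = "LOW"
    return hours, level
```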
Feature 3: Competitor Price Signals
Raw competitor prices are useful, but a pricing model needs derived signals: where you stand relative to the market and which direction competitors are moving.
CREATE MATERIALIZED VIEW competitor_price_features AS
SELECT
cp.product_id,
-- Market positioning
MIN(cp.price) AS lowest_competitor_price,
MAX(cp.price) AS highest_competitor_price,
ROUND(AVG(cp.price), 2) AS avg_competitor_price,
COUNT(DISTINCT cp.competitor_id) AS competitor_count,
-- Your position relative to the market
ROUND(pc.base_price - AVG(cp.price), 2) AS price_gap_vs_avg,
ROUND(pc.base_price - MIN(cp.price), 2) AS price_gap_vs_lowest,
-- Price rank: percentage of competitors you are cheaper than
ROUND(
COUNT(*) FILTER (WHERE cp.price > pc.base_price)::NUMERIC
/ NULLIF(COUNT(*), 0) * 100
, 1) AS pct_competitors_above_us,
-- Margin headroom
ROUND(AVG(cp.price) - pc.cost, 2) AS market_margin
FROM competitor_price_events cp
JOIN product_catalog pc ON cp.product_id = pc.product_id
GROUP BY cp.product_id, pc.base_price, pc.cost;
The price_gap_vs_avg tells the model whether there is room to increase (negative gap = below market) or pressure to decrease (positive gap = above market). The pct_competitors_above_us gives a percentile ranking that is easier for models to learn from than raw dollar gaps.
Feature 4: Time-of-Day and Day-of-Week Patterns
Pricing sensitivity varies by time. Lunch-hour food delivery orders have different elasticity than midnight orders. These temporal features help the model learn periodic patterns:
CREATE MATERIALIZED VIEW temporal_demand_features AS
SELECT
product_id,
EXTRACT(HOUR FROM order_ts) AS hour_of_day,
EXTRACT(DOW FROM order_ts) AS day_of_week,
COUNT(*) AS order_count,
AVG(quantity) AS avg_quantity,
AVG(unit_price) AS avg_transaction_price,
-- Whether this hour/day combo tends to have high or low volume
COUNT(*) FILTER (WHERE quantity >= 3) AS bulk_order_count
FROM order_events
WHERE order_ts > NOW() - INTERVAL '30 days'
GROUP BY product_id, EXTRACT(HOUR FROM order_ts), EXTRACT(DOW FROM order_ts);
At serving time, the pricing model joins this view on the current hour and day to get the historical baseline. If orders_5min from the demand velocity view is far above the historical mean for this time slot, the model knows demand is abnormally high.
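A sketch of that comparison (hypothetical; the temporal view stores a 30-day total per hour/day slot, so we divide by the number of times the slot occurred to recover a per-occurrence baseline):

```python
def demand_anomaly(orders_5min: int, slot_order_count_30d: int,
                   slot_occurrences: int = 4) -> float:
    """Compare the live 5-minute count to this time slot's baseline.

    slot_order_count_30d is the 30-day total from the temporal view;
    dividing by occurrences and then by 12 yields an expected
    5-minute count (an hour contains twelve 5-minute windows).
    Values well above 1.0 mean demand is abnormally high right now.
    """
    expected_5min = slot_order_count_30d / slot_occurrences / 12
    if expected_5min == 0:
        return 0.0
    return round(orders_5min / expected_5min, 2)
```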
Feature 5: Surge Detection
Surge detection synthesizes multiple signals into a composite indicator. This view combines demand velocity, inventory pressure, and browsing intensity:
CREATE MATERIALIZED VIEW surge_detection AS
SELECT
dv.product_id,
pc.product_name,
dv.orders_5min,
dv.orders_30min,
bd.unique_viewers_10min,
ip.hours_to_stockout,
ip.pressure_level,
-- Acceleration ratio: short-window vs long-window
CASE
WHEN dv.orders_30min > 0
THEN ROUND(
(dv.orders_5min * 6.0) / dv.orders_30min, 2
)
ELSE 0
END AS demand_acceleration,
-- Composite surge score (0-100)
LEAST(100, (
-- Demand component (0-40 points)
LEAST(40, dv.orders_5min * 4)
-- Browsing component (0-25 points)
+ LEAST(25, COALESCE(bd.unique_viewers_10min, 0) * 2.5)
-- Inventory scarcity component (0-35 points)
+ CASE
WHEN ip.hours_to_stockout < 4 THEN 35
WHEN ip.hours_to_stockout < 12 THEN 20
WHEN ip.hours_to_stockout < 24 THEN 10
ELSE 0
END
)) AS surge_score,
-- Classification for operational dashboards
CASE
WHEN dv.orders_5min >= 10 AND ip.hours_to_stockout < 12 THEN 'ACTIVE_SURGE'
WHEN dv.orders_5min >= 5 OR (bd.unique_viewers_10min >= 20
AND ip.pressure_level IN ('HIGH', 'CRITICAL')) THEN 'ELEVATED'
ELSE 'NORMAL'
END AS surge_status
FROM demand_velocity_features dv
JOIN product_catalog pc ON dv.product_id = pc.product_id
LEFT JOIN browsing_demand_features bd ON dv.product_id = bd.product_id
LEFT JOIN inventory_pressure_features ip ON dv.product_id = ip.product_id;
The surge_score is a weighted composite your pricing model can use directly, or that your operations team can monitor on a dashboard. A demand_acceleration above 2.0 means the 5-minute run rate is more than double the 30-minute average -- a rapid demand ramp.
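The composite score can be recomputed client-side for spot checks; this Python mirror follows the SQL's weights exactly:

```python
def surge_score(orders_5min: int, viewers_10min: int,
                hours_to_stockout: float) -> int:
    """Composite surge score (0-100), matching the surge_detection view."""
    demand = min(40, orders_5min * 4)           # demand component (0-40)
    browsing = min(25, viewers_10min * 2.5)     # browsing component (0-25)
    if hours_to_stockout < 4:                   # scarcity component (0-35)
        scarcity = 35
    elif hours_to_stockout < 12:
        scarcity = 20
    elif hours_to_stockout < 24:
        scarcity = 10
    else:
        scarcity = 0
    return int(min(100, demand + browsing + scarcity))
```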
Assembling the Feature Vector
The pricing model needs a single row per product with all features joined together. This final materialized view assembles the complete feature vector:
CREATE MATERIALIZED VIEW pricing_feature_store AS
SELECT
dv.product_id,
pc.product_name,
pc.base_price,
pc.cost,
pc.min_price,
pc.max_price,
-- Demand features
dv.orders_5min,
dv.orders_30min,
dv.orders_2h,
dv.units_sold_30min,
dv.revenue_1h,
dv.avg_order_size_1h,
-- Browsing features
bd.unique_viewers_10min,
bd.unique_viewers_1h,
bd.views_10min,
-- Inventory features
ip.total_stock,
ip.hours_to_stockout,
ip.days_of_supply,
ip.pressure_level,
-- Competitor features
cp.avg_competitor_price,
cp.price_gap_vs_avg,
cp.price_gap_vs_lowest,
cp.pct_competitors_above_us,
-- Surge features
sd.demand_acceleration,
sd.surge_score,
sd.surge_status
FROM demand_velocity_features dv
JOIN product_catalog pc ON dv.product_id = pc.product_id
LEFT JOIN browsing_demand_features bd ON dv.product_id = bd.product_id
LEFT JOIN inventory_pressure_features ip ON dv.product_id = ip.product_id
LEFT JOIN competitor_price_features cp ON dv.product_id = cp.product_id
LEFT JOIN surge_detection sd ON dv.product_id = sd.product_id;
Your pricing service queries this view over a standard PostgreSQL connection. The query returns in single-digit milliseconds because it reads pre-computed results. Every feature reflects the latest data, updated continuously by RisingWave's incremental view maintenance engine.
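A pricing service would typically run a parameterized query through psycopg2 or any PostgreSQL driver and map the row into a feature dict. The mapping helper below is a sketch; because several views are LEFT JOINed, quiet products can return NULLs that need defaults before reaching the model:

```python
# Hypothetical query the pricing service would execute against RisingWave.
FEATURE_QUERY = "SELECT * FROM pricing_feature_store WHERE product_id = %s"

def row_to_features(columns: list[str], row: tuple) -> dict:
    """Map a fetched row to a feature dict, filling NULLs from the
    LEFT JOINs: 0 for numeric features, 'UNKNOWN' for labels."""
    labels = {"pressure_level", "surge_status", "product_name"}
    out = {}
    for col, val in zip(columns, row):
        if val is None:
            out[col] = "UNKNOWN" if col in labels else 0
        else:
            out[col] = val
    return out
```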
Architecture for the Real-Time Pricing Pipeline
In production, the end-to-end flow is:
Event sources → Kafka → RisingWave → Pricing model → Product catalog
- Order events, page views, competitor prices, and inventory updates flow from their respective services into Kafka topics.
- RisingWave consumes all four streams, maintains the chain of materialized views, and serves the assembled feature vector.
- The pricing model (microservice or UDF) reads features from pricing_feature_store and computes a recommended price.
- The recommended price is written to the product catalog or API cache for storefront reads.
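To make the last two steps concrete, here is a deliberately simple rule-based pricer. The adjustment factors are hypothetical placeholders for what a trained model would learn; the clamp uses the catalog's min_price/max_price band:

```python
def recommend_price(base_price: float, min_price: float, max_price: float,
                    surge_score: int, price_gap_vs_avg: float) -> float:
    """Nudge the base price up under surge, down when above market,
    then clamp to the catalog's allowed band."""
    price = base_price * (1 + 0.15 * surge_score / 100)  # up to +15% on surge
    if price_gap_vs_avg > 0:           # we are above the market average
        price -= price_gap_vs_avg / 2  # close half the gap
    return round(max(min_price, min(max_price, price)), 2)
```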
RisingWave handles the hardest part: stateful, incremental computation across multiple joined streams. For teams already running Kafka, the integration is a matter of defining sources -- no Flink cluster, no custom Java consumers, no DAGs to manage.
You can also sink features to external systems. Push to PostgreSQL for your model serving layer, to Apache Iceberg for historical training data, or to Kafka for downstream consumers:
CREATE SINK pricing_features_to_pg FROM pricing_feature_store
WITH (
connector = 'jdbc',
jdbc.url = 'jdbc:postgresql://feature-db:5432/pricing',
table.name = 'pricing_features',
type = 'upsert',
primary_key = 'product_id'
);
This pattern eliminates training-serving skew. The same SQL that produces live serving features also sinks to Iceberg for training, so the model sees consistent feature distributions in both environments.
FAQ
What is real-time feature engineering for dynamic pricing?
It is the process of continuously computing pricing-relevant signals -- demand velocity, inventory levels, competitor prices, temporal patterns, and surge indicators -- from live event streams. Unlike batch feature pipelines that refresh on a schedule, real-time feature engineering updates values within seconds of each new event, enabling pricing models to respond to current market conditions.
How many features does a dynamic pricing model typically need?
The feature vector in this post contains around 20 features across five categories. In practice, mature pricing systems use 30 to 100 features, including product attributes, customer segment signals, and promotional calendars. The streaming SQL approach scales well because each new feature group is just another materialized view joined into the feature store.
Can I use RisingWave with an existing ML model serving framework?
Yes. RisingWave exposes a PostgreSQL-compatible interface, so any serving framework that can read from PostgreSQL (MLflow, FastAPI services using psycopg2) can query features directly. You can also sink features to Redis or a dedicated feature store for sub-millisecond lookups.
How does this compare to computing features in Apache Flink?
Both RisingWave and Flink can compute streaming features. The difference is operational. Flink requires managing a JVM cluster, writing transformations in Java or Python, and configuring state backends. RisingWave lets you define features as SQL materialized views with built-in state management. For pricing feature engineering, where most features are aggregations and joins, SQL is a natural fit that smaller teams can maintain.
What latency should I expect for feature updates?
RisingWave typically delivers feature updates within hundreds of milliseconds to low single-digit seconds of each source event, depending on query complexity and cluster size. For pricing use cases, this is orders of magnitude faster than hourly or daily batch pipelines.
Conclusion
The features you compute matter more than the model you choose. A simple rule-based pricing engine with fresh features will outperform a sophisticated ML model running on stale data. Here is what we covered:
- Demand velocity features track order rates and browsing intensity across multiple time windows, giving the model acceleration signals rather than snapshots.
- Inventory pressure features translate raw stock counts into actionable metrics like hours-to-stockout, capturing scarcity relative to consumption rate.
- Competitor price features compute market positioning, percentile rankings, and margin headroom -- not just raw prices.
- Temporal features encode hour-of-day and day-of-week patterns for detecting abnormal demand relative to historical baselines.
- Surge detection synthesizes multiple signals into a composite score the model can consume directly.
All features are SQL materialized views that RisingWave maintains incrementally. No batch jobs, no Airflow DAGs, no stale windows. The pricing model always sees the latest market state.
Ready to build your own pricing feature pipeline? Get started with RisingWave in 5 minutes. Quickstart →
Join our Slack community to ask questions and connect with other stream processing developers.