Real-Time Dynamic Pricing Engine with Streaming SQL
Dynamic pricing means reacting to competitor prices, inventory levels, and demand signals — all at the same time, continuously. The traditional approach strings together a scheduler, a Python script, and several API calls into a fragile pipeline that runs every few hours. A streaming database replaces this pipeline with SQL materialized views that join live data sources and maintain current price recommendations without any orchestration code.
Why Dynamic Pricing Is Harder Than It Looks
The business case for dynamic pricing is simple: charge more when demand is high and inventory is low, stay competitive when you have stock to move. The implementation is harder because it requires watching multiple signals simultaneously:
- Competitor prices — scraped or sourced from a price intelligence feed
- Your own inventory levels — updated with every order and receipt
- Demand velocity — how fast a product is selling right now
- Price elasticity rules — business logic about floors, ceilings, and margin constraints
Each of these signals lives in a different system and changes at a different rate. Competitor prices might update every 15 minutes. Your inventory changes with every order. Demand velocity is a rolling calculation over recent purchases. Joining these signals in a batch job means your pricing recommendations are always a few cycles stale. A streaming database joins them continuously.
The Architecture: Four Live Data Sources
Dynamic pricing in streaming SQL requires four input streams:
- Competitor price feed (Kafka) — price intelligence vendor publishing competitor price changes as events
- Inventory events (Kafka) — orders, cancellations, and receipts updating stock levels
- Purchase events (Kafka) — sales velocity signal
- Pricing rules (PostgreSQL CDC or static table) — floor prices, ceiling prices, margin constraints
RisingWave is a PostgreSQL-compatible streaming database, open source under Apache 2.0, built in Rust, with S3-backed storage. It ingests all four sources and joins them into a single always-current pricing recommendation view.
Setting Up the Sources
-- Competitor prices from a price intelligence feed
CREATE SOURCE competitor_prices (
    feed_id VARCHAR,
    product_id VARCHAR,
    competitor_name VARCHAR,
    competitor_price NUMERIC,
    currency VARCHAR,
    observed_at TIMESTAMPTZ
)
WITH (
    connector = 'kafka',
    topic = 'competitor_price_feed',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
)
FORMAT PLAIN ENCODE JSON;

-- Purchase events for demand velocity
CREATE SOURCE purchase_events (
    order_id VARCHAR,
    product_id VARCHAR,
    quantity INT,
    unit_price NUMERIC,
    event_time TIMESTAMPTZ
)
WITH (
    connector = 'kafka',
    topic = 'purchase_events',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
)
FORMAT PLAIN ENCODE JSON;

-- Inventory events (orders reduce, receipts add)
CREATE SOURCE inventory_events (
    product_id VARCHAR,
    warehouse_id VARCHAR,
    delta INT, -- negative for orders/reservations, positive for receipts
    event_time TIMESTAMPTZ
)
WITH (
    connector = 'kafka',
    topic = 'inventory_events',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
)
FORMAT PLAIN ENCODE JSON;

-- Pricing rules — loaded from your pricing configuration table via CDC or as a static source
CREATE TABLE pricing_rules (
    product_id VARCHAR PRIMARY KEY,
    category_id VARCHAR,
    floor_price NUMERIC,
    ceiling_price NUMERIC,
    target_margin NUMERIC, -- as a decimal, e.g. 0.30 for 30%
    cost_price NUMERIC
);
Building the Signal Views
Current Competitor Price Position
Find the minimum competitor price per product to understand your price position:
CREATE MATERIALIZED VIEW competitor_min_price AS
SELECT
    product_id,
    MIN(competitor_price) AS min_competitor_price,
    MAX(competitor_price) AS max_competitor_price,
    AVG(competitor_price) AS avg_competitor_price,
    COUNT(DISTINCT competitor_name) AS competitor_count,
    MAX(observed_at) AS last_updated_at
FROM competitor_prices
GROUP BY product_id;
This view updates every time the price intelligence feed publishes a new event. When a competitor drops their price, min_competitor_price changes within seconds.
Demand Velocity (Sales Rate in the Last 2 Hours)
CREATE MATERIALIZED VIEW demand_velocity_2h AS
SELECT
    product_id,
    SUM(quantity) AS units_sold_2h,
    COUNT(DISTINCT order_id) AS orders_2h
FROM purchase_events
WHERE event_time > NOW() - INTERVAL '2 hours'
GROUP BY product_id;
The temporal filter keeps a rolling two-hour window: purchases enter the aggregate as they arrive and drop out automatically once they age past the boundary, so the view holds exactly one row per product. (A HOP window over the same stream would also work, but it emits one row per hop window per product, which would fan out the join in the recommendation view below.) Products with high units_sold_2h are in high demand and can support a price increase. Products with zero recent sales may benefit from a reduction.
Live Inventory Levels
CREATE MATERIALIZED VIEW live_inventory AS
SELECT
    product_id,
    SUM(delta) AS available_quantity,
    MAX(event_time) AS last_updated_at
FROM inventory_events
GROUP BY product_id;
The Pricing Recommendation View
Now join all signals with the pricing rules to produce a price recommendation for each product:
CREATE MATERIALIZED VIEW price_recommendations AS
WITH signals AS (
    SELECT
        pr.product_id,
        pr.floor_price,
        pr.ceiling_price,
        pr.cost_price,
        pr.target_margin,
        COALESCE(cmp.min_competitor_price, pr.ceiling_price) AS min_competitor_price,
        COALESCE(dv.units_sold_2h, 0) AS units_sold_2h,
        COALESCE(inv.available_quantity, 0) AS available_quantity,
        -- now() cannot appear in a materialized view's select list (it is only
        -- allowed in temporal filters), so track freshness with the newest
        -- input-signal timestamp instead
        GREATEST(cmp.last_updated_at, inv.last_updated_at) AS computed_at
    FROM pricing_rules pr
    LEFT JOIN competitor_min_price cmp ON pr.product_id = cmp.product_id
    LEFT JOIN demand_velocity_2h dv ON pr.product_id = dv.product_id
    LEFT JOIN live_inventory inv ON pr.product_id = inv.product_id
),
scored AS (
    SELECT
        product_id,
        floor_price,
        ceiling_price,
        cost_price,
        min_competitor_price,
        units_sold_2h,
        available_quantity,
        computed_at,
        -- Demand multiplier: high demand -> price up, low demand -> price down
        CASE
            WHEN units_sold_2h >= 50 THEN 1.10 -- very high demand
            WHEN units_sold_2h >= 20 THEN 1.05 -- high demand
            WHEN units_sold_2h >= 5 THEN 1.00 -- normal
            WHEN units_sold_2h >= 1 THEN 0.97 -- slow
            ELSE 0.95 -- stagnant
        END AS demand_multiplier,
        -- Inventory multiplier: low stock -> price up, excess stock -> price down
        CASE
            WHEN available_quantity <= 5 THEN 1.08 -- scarcity premium
            WHEN available_quantity <= 20 THEN 1.03
            WHEN available_quantity >= 200 THEN 0.95 -- excess, incentivize movement
            ELSE 1.00
        END AS inventory_multiplier
    FROM signals
)
SELECT
    product_id,
    floor_price,
    ceiling_price,
    min_competitor_price,
    units_sold_2h,
    available_quantity,
    -- Base recommendation: slightly below cheapest competitor, adjusted for signals
    GREATEST(
        floor_price,
        LEAST(
            ceiling_price,
            ROUND(
                min_competitor_price * 0.99 -- default: 1% below cheapest competitor
                * demand_multiplier
                * inventory_multiplier,
                2
            )
        )
    ) AS recommended_price,
    computed_at
FROM scored;
This single view continuously reflects the intersection of competitor prices, demand signals, and inventory levels. When a competitor drops their price at 2 AM, price_recommendations updates immediately. When your inventory hits 5 units during a flash sale, the scarcity premium kicks in automatically.
Enforcing Business Constraints
The GREATEST(floor_price, LEAST(ceiling_price, ...)) pattern in the recommendation view enforces floor and ceiling constraints in SQL. No pricing recommendation will ever fall below the configured floor price or exceed the configured ceiling. Note that the floor is a configured minimum, not a profitability guarantee; that is what the margin check adds.
For margin constraints, add a margin floor check:
-- Minimum acceptable price = cost * (1 + target_margin)
GREATEST(
    floor_price,
    cost_price * (1 + target_margin), -- margin floor
    LEAST(
        ceiling_price,
        ROUND(min_competitor_price * 0.99 * demand_multiplier * inventory_multiplier, 2)
    )
) AS recommended_price
The margin constraint ensures pricing never dips below profitability, even if competitor prices collapse or the demand multiplier pushes downward aggressively.
Applying Price Changes
Price recommendations sitting in a materialized view do not automatically update your storefront. You need to decide on an application strategy:
Option 1: Application-layer polling. Your pricing service queries price_recommendations every N seconds and applies changes through your e-commerce platform API. Simple and controllable; the added lag is at most the polling interval.
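A sketch of the query such a service might poll with, assuming a hypothetical applied_prices table (columns applied_price, applied_at) that records what the storefront currently shows. Only rows that drift meaningfully from the live price need an API call:

-- applied_prices is illustrative: it tracks the last price pushed to the storefront
SELECT r.product_id, r.recommended_price
FROM price_recommendations r
LEFT JOIN applied_prices a ON r.product_id = a.product_id
WHERE a.product_id IS NULL -- never priced yet
   OR ABS(r.recommended_price - a.applied_price) / a.applied_price > 0.005; -- >0.5% drift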
Option 2: Kafka sink for price change events. Push price change events from RisingWave to a Kafka topic, and have a lightweight consumer apply them to your platform:
CREATE SINK price_change_events AS
SELECT
    product_id,
    recommended_price,
    computed_at
FROM price_recommendations
WITH (
    connector = 'kafka',
    topic = 'price_updates',
    properties.bootstrap.server = 'kafka:9092',
    -- price_recommendations is an updating view; force_append_only turns each
    -- update into an insert event on the topic
    force_append_only = 'true'
)
FORMAT PLAIN ENCODE JSON;
Option 2 gives you a full audit trail of every price change in Kafka, which is valuable for reconciliation, A/B testing analysis, and regulatory compliance.
A/B Testing Pricing Strategies
Because all pricing logic is in SQL, swapping pricing strategies is a view definition change, not a code deployment. Factor the shared signal collection and multipliers into a base materialized view (call it price_recommendations_base: essentially the signals and scored CTEs from above, materialized on their own), then run two strategies against it in parallel:
-- Strategy A: pure competitor parity
CREATE MATERIALIZED VIEW price_strategy_a AS
SELECT
    product_id,
    GREATEST(floor_price, LEAST(ceiling_price, ROUND(min_competitor_price * 0.99, 2))) AS recommended_price,
    'strategy_a' AS strategy
FROM price_recommendations_base;

-- Strategy B: demand-adjusted pricing
CREATE MATERIALIZED VIEW price_strategy_b AS
SELECT
    product_id,
    GREATEST(floor_price, LEAST(ceiling_price, ROUND(min_competitor_price * 0.99 * demand_multiplier, 2))) AS recommended_price,
    'strategy_b' AS strategy
FROM price_recommendations_base;
Assign product IDs or traffic segments to each strategy. Compare conversion rates and revenue per session across strategies in your analytics layer.
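One way to sketch the assignment, using a hypothetical strategy_assignment table that pins each product to a strategy (the table and view names are illustrative):

CREATE TABLE strategy_assignment (
    product_id VARCHAR PRIMARY KEY,
    strategy VARCHAR -- 'strategy_a' or 'strategy_b'
);

CREATE MATERIALIZED VIEW live_prices AS
SELECT a.product_id, a.recommended_price, a.strategy
FROM price_strategy_a a
JOIN strategy_assignment s ON a.product_id = s.product_id AND s.strategy = 'strategy_a'
UNION ALL
SELECT b.product_id, b.recommended_price, b.strategy
FROM price_strategy_b b
JOIN strategy_assignment s ON b.product_id = s.product_id AND s.strategy = 'strategy_b';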
Comparison: Pricing Engine Approaches
| Dimension | Scheduled Python Script | Rule Engine Platform | Streaming SQL (RisingWave) |
| --- | --- | --- | --- |
| Update frequency | Every N minutes (cron) | Near-real-time (polling) | Continuously, event-driven |
| Signal freshness | Staleness = cron interval | Seconds to minutes | Seconds |
| Multi-signal joins | Custom code | Vendor-specific DSL | Standard SQL |
| Audit trail | Logs | Vendor dashboard | Kafka event log |
| Logic transparency | Python code | Vendor UI / DSL | Plain SQL |
| Floor/ceiling enforcement | Custom code | Platform feature | SQL constraints |
| Infrastructure | Python runtime + scheduler | SaaS platform + integration | Streaming database |
Frequently Asked Questions
Q: How do we prevent price changes from being too aggressive?
Add a rate-limit constraint: only allow price changes beyond a threshold (e.g., more than 2%) if the last change was more than N minutes ago. Track the last-applied price and timestamp in a table, and join it into the recommendation view to suppress changes that would happen too frequently.
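A sketch of that guard as a query the pricing service runs when it polls, assuming a hypothetical applied_prices table that records the last applied price and timestamp (the 2% and 30-minute bounds are illustrative):

SELECT r.product_id, r.recommended_price
FROM price_recommendations r
JOIN applied_prices a ON r.product_id = a.product_id
WHERE ABS(r.recommended_price - a.applied_price) / a.applied_price <= 0.02 -- small changes always pass
   OR a.applied_at < NOW() - INTERVAL '30 minutes'; -- larger changes only after a cooldown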
Q: What if the competitor price feed goes down?
The LEFT JOIN with COALESCE(cmp.min_competitor_price, pr.ceiling_price) falls back to the ceiling price if no competitor data is available. Alternatively, set the fallback to the last known competitor price by persisting it in a separate materialized view with a timestamp guard.
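The timestamp-guard variant can be sketched with a temporal filter: keep each product's competitor aggregate only while it is fresh (24 hours is an illustrative bound), and have the recommendation view COALESCE against this view instead. Once an observation expires, the row disappears and the ceiling fallback takes over:

CREATE MATERIALIZED VIEW fresh_competitor_price AS
SELECT product_id, min_competitor_price, last_updated_at
FROM competitor_min_price
WHERE last_updated_at > NOW() - INTERVAL '24 hours';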
Q: Can we add a human approval step for large price changes?
Yes. Add a separate view that flags recommendations where the price change exceeds a threshold (e.g., more than 10% in either direction). Route those to a review queue before applying. Small automated changes apply immediately; large changes wait for approval.
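A sketch of the flagging view, assuming a hypothetical applied_prices table with the current storefront price:

CREATE MATERIALIZED VIEW recommendations_needing_review AS
SELECT
    r.product_id,
    a.applied_price,
    r.recommended_price
FROM price_recommendations r
JOIN applied_prices a ON r.product_id = a.product_id
WHERE ABS(r.recommended_price - a.applied_price) / a.applied_price > 0.10; -- >10% change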
Q: How do we handle currency and multi-region pricing?
Add region and currency dimensions to all source schemas and group by them in the recommendation view. Currency conversion can be handled by joining against a live exchange rate table — itself a materialized view over an exchange rate event stream.
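A sketch of the conversion step, assuming a hypothetical exchange_rates materialized view with columns currency and rate_to_usd:

-- Normalize competitor prices to a single currency before aggregating
SELECT
    cp.product_id,
    cp.competitor_name,
    cp.competitor_price * fx.rate_to_usd AS competitor_price_usd,
    cp.observed_at
FROM competitor_prices cp
JOIN exchange_rates fx ON cp.currency = fx.currency;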
Q: Does this require real-time competitor price data?
The architecture works with whatever frequency your price intelligence vendor provides. If competitor prices update every 15 minutes, your recommendations will reflect 15-minute-old competitor data. The inventory and demand signals will still be real-time. The closer to real-time your competitor feed is, the more the system delivers on its promise.

