A real-time price comparison engine continuously ingests competitor price feeds, computes the gap between competitor prices and your own prices, and triggers alerts the moment a competitor undercuts you beyond an acceptable threshold. Built with streaming SQL, the entire system runs as a set of materialized views that update automatically whenever new price data arrives -- no batch jobs, no polling loops, no custom stream processing code.
This guide walks you through building one from scratch using RisingWave, a PostgreSQL-compatible streaming database. You will connect a Kafka topic carrying competitor price events, build a chain of materialized views that track the latest prices and compute differentials, and wire up an alert layer with severity tiers and suggested response prices. All SQL in this article is verified against RisingWave 2.8.0.
Why Price Monitoring Needs Streaming, Not Batch
E-commerce pricing is a continuous auction. Amazon reprices items millions of times per day. If your monitoring pipeline runs on a 4-hour batch job, you may be losing sales for hours before your team notices a competitor dropped below your price.
A traditional batch pricing pipeline typically looks like this:
- Scrape competitor prices on a schedule (every few hours)
- Load into a data warehouse
- Run a dbt model or SQL query
- Refresh a BI dashboard
- Wait for a human to notice and respond
By the time your analyst reviews the dashboard, your competitor's lower price has already been indexed by Google Shopping and pushed to price-sensitive shoppers.
A streaming approach compresses this loop to seconds:
| Dimension | Batch Pipeline | Streaming SQL (RisingWave) |
| --- | --- | --- |
| Price update latency | Hours | Seconds |
| Alert detection | Manual dashboard review | Automatic, threshold-based |
| Compute model | Full dataset reprocessed on schedule | Incremental updates per event |
| Rule changes | Redeploy pipeline | Change a SQL view definition |
| Infrastructure | Spark + Airflow + Warehouse | Single streaming database |
The key insight is that price gaps are not interesting in aggregate -- they are interesting the moment they happen. A competitor dropping 20% below your price at 11 PM on a Friday is a pricing emergency, not a weekend analytics footnote.
Architecture Overview
The system has four layers:
Competitor Price Feeds (Kafka)
            |
            v
[price_events table / Kafka source]
            |
            v
[price_competitor_latest MV]   <-- deduplicate to latest price per competitor
            |
            v
[price_differentials MV]       <-- join with our product catalog, compute gaps
       /          \
      /            \
     v              v
[price_gap_alerts MV]     [price_summary_by_category MV]
(threshold alerts)        (category-level rollups)
     |
     v
Kafka Sink --> Notification Service (Slack, PagerDuty, repricing API)
Every layer between ingestion and the sink is a materialized view in RisingWave. When a new price event arrives from Kafka, RisingWave incrementally updates every downstream view in the chain, typically within seconds.
Step 1: Define the Data Model
Competitor Price Events
In production, competitor prices arrive as a stream of events from a web scraping service, a data vendor API, or a market intelligence platform. These events flow into a Kafka topic and RisingWave consumes them directly.
For production, create a Kafka source:
CREATE SOURCE price_events_source (
event_id VARCHAR,
product_id VARCHAR,
product_name VARCHAR,
category VARCHAR,
competitor VARCHAR,
competitor_price DOUBLE PRECISION,
currency VARCHAR,
captured_at TIMESTAMPTZ
)
WITH (
connector = 'kafka',
topic = 'competitor-prices',
properties.bootstrap.server = 'kafka:9092',
scan.startup.mode = 'latest'
)
FORMAT PLAIN ENCODE JSON;
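On the producer side, each Kafka message needs JSON keys that match the source's columns. The sketch below assumes the kafka-python package and a scraper that already has the fields in hand; build_price_event and publish are hypothetical helper names, and we assume RisingWave accepts RFC 3339 timestamps with an explicit UTC offset for TIMESTAMPTZ columns (check the JSON parsing rules for your version).

```python
import json
from datetime import datetime, timezone


def build_price_event(event_id, product_id, product_name, category,
                      competitor, competitor_price, captured_at,
                      currency="USD"):
    """Serialize one price observation with the same keys as price_events_source."""
    if captured_at.tzinfo is None:
        # TIMESTAMPTZ needs an offset; refuse naive datetimes instead of guessing
        raise ValueError("captured_at must be timezone-aware")
    payload = {
        "event_id": event_id,
        "product_id": product_id,
        "product_name": product_name,
        "category": category,
        "competitor": competitor,
        "competitor_price": competitor_price,
        "currency": currency,
        # e.g. "2026-04-01T09:30:00+00:00"
        "captured_at": captured_at.astimezone(timezone.utc).isoformat(),
    }
    return json.dumps(payload).encode("utf-8")


def publish(messages, bootstrap_servers="kafka:9092", topic="competitor-prices"):
    """Send already-serialized messages to Kafka (requires kafka-python)."""
    # Imported here so build_price_event can be used and tested without Kafka
    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
    for message in messages:
        producer.send(topic, value=message)
    producer.flush()
```

Keeping serialization separate from sending means the schema contract can be unit-tested without a broker.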
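For development and testing without Kafka, create a regular table. The materialized views and queries in this guide read from price_events, so if you ingest through the Kafka source above, reference price_events_source in Steps 2 and 6 instead. The view logic works identically either way: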
CREATE TABLE price_events (
event_id VARCHAR,
product_id VARCHAR NOT NULL,
product_name VARCHAR,
category VARCHAR,
competitor VARCHAR NOT NULL,
competitor_price DOUBLE PRECISION NOT NULL,
currency VARCHAR DEFAULT 'USD',
captured_at TIMESTAMPTZ NOT NULL
);
Our Product Catalog
Your own product prices and costs live in a separate table. This is the baseline against which competitor prices are measured:
CREATE TABLE price_our_products (
product_id VARCHAR PRIMARY KEY,
product_name VARCHAR NOT NULL,
category VARCHAR NOT NULL,
our_price DOUBLE PRECISION NOT NULL,
cost DOUBLE PRECISION NOT NULL,
updated_at TIMESTAMPTZ NOT NULL
);
Insert the catalog:
INSERT INTO price_our_products VALUES
('prod_001', 'Sony WH-1000XM5 Headphones', 'Electronics', 349.99, 210.00, '2026-04-01 08:00:00+00'),
('prod_002', 'Nike Air Max 270', 'Footwear', 120.00, 62.00, '2026-04-01 08:00:00+00'),
('prod_003', 'Instant Pot Duo 7-in-1', 'Kitchen', 89.99, 42.00, '2026-04-01 08:00:00+00'),
('prod_004', 'Logitech MX Master 3S Mouse', 'Electronics', 99.99, 52.00, '2026-04-01 08:00:00+00'),
('prod_005', 'Levi''s 511 Slim Jeans', 'Apparel', 59.99, 22.00, '2026-04-01 08:00:00+00');
Now insert some competitor price events. In reality, these would arrive continuously from Kafka:
INSERT INTO price_events VALUES
('ev_001', 'prod_001', 'Sony WH-1000XM5 Headphones', 'Electronics', 'amazon', 329.99, 'USD', '2026-04-01 09:00:00+00'),
('ev_002', 'prod_001', 'Sony WH-1000XM5 Headphones', 'Electronics', 'bestbuy', 339.99, 'USD', '2026-04-01 09:05:00+00'),
('ev_003', 'prod_001', 'Sony WH-1000XM5 Headphones', 'Electronics', 'walmart', 334.99, 'USD', '2026-04-01 09:10:00+00'),
('ev_004', 'prod_001', 'Sony WH-1000XM5 Headphones', 'Electronics', 'amazon', 319.99, 'USD', '2026-04-01 09:30:00+00'),
('ev_005', 'prod_002', 'Nike Air Max 270', 'Footwear', 'amazon', 122.00, 'USD', '2026-04-01 09:00:00+00'),
('ev_006', 'prod_002', 'Nike Air Max 270', 'Footwear', 'zappos', 125.00, 'USD', '2026-04-01 09:05:00+00'),
('ev_007', 'prod_002', 'Nike Air Max 270', 'Footwear', 'footlocker', 118.00, 'USD', '2026-04-01 09:10:00+00'),
('ev_008', 'prod_003', 'Instant Pot Duo 7-in-1', 'Kitchen', 'amazon', 97.99, 'USD', '2026-04-01 09:00:00+00'),
('ev_009', 'prod_003', 'Instant Pot Duo 7-in-1', 'Kitchen', 'walmart', 95.00, 'USD', '2026-04-01 09:05:00+00'),
('ev_010', 'prod_003', 'Instant Pot Duo 7-in-1', 'Kitchen', 'target', 99.99, 'USD', '2026-04-01 09:10:00+00'),
('ev_011', 'prod_004', 'Logitech MX Master 3S Mouse', 'Electronics', 'amazon', 79.99, 'USD', '2026-04-01 09:00:00+00'),
('ev_012', 'prod_004', 'Logitech MX Master 3S Mouse', 'Electronics', 'bestbuy', 89.99, 'USD', '2026-04-01 09:05:00+00'),
('ev_013', 'prod_005', 'Levi''s 511 Slim Jeans', 'Apparel', 'amazon', 57.99, 'USD', '2026-04-01 09:00:00+00'),
('ev_014', 'prod_005', 'Levi''s 511 Slim Jeans', 'Apparel', 'macys', 62.00, 'USD', '2026-04-01 09:05:00+00');
Notice that Amazon repriced the Sony headphones twice (events ev_001 and ev_004). The next step handles exactly this -- keeping only the most recent price per competitor.
Step 2: Deduplicate to the Latest Price per Competitor
A competitor can update prices multiple times per hour. You only care about the current price, not the history. Use a window function inside a materialized view to keep the latest entry per product-competitor pair:
CREATE MATERIALIZED VIEW price_competitor_latest AS
SELECT
product_id,
product_name,
category,
competitor,
competitor_price,
captured_at
FROM (
SELECT
product_id,
product_name,
category,
competitor,
competitor_price,
captured_at,
ROW_NUMBER() OVER (
PARTITION BY product_id, competitor
ORDER BY captured_at DESC
) AS rn
FROM price_events
) ranked
WHERE rn = 1;
The ROW_NUMBER() OVER (PARTITION BY product_id, competitor ORDER BY captured_at DESC) pattern assigns rank 1 to the most recent price event for each product-competitor combination. Only rank 1 rows pass the outer WHERE rn = 1 filter.
Query the view to confirm Amazon's Sony headphone price reflects the most recent update ($319.99, not the earlier $329.99):
SELECT product_id, product_name, competitor, competitor_price, captured_at
FROM price_competitor_latest
ORDER BY product_id, competitor;
product_id | product_name | competitor | competitor_price | captured_at
------------+-----------------------------+------------+------------------+---------------------------
prod_001 | Sony WH-1000XM5 Headphones | amazon | 319.99 | 2026-04-01 09:30:00+00:00
prod_001 | Sony WH-1000XM5 Headphones | bestbuy | 339.99 | 2026-04-01 09:05:00+00:00
prod_001 | Sony WH-1000XM5 Headphones | walmart | 334.99 | 2026-04-01 09:10:00+00:00
prod_002 | Nike Air Max 270 | amazon | 122 | 2026-04-01 09:00:00+00:00
prod_002 | Nike Air Max 270 | footlocker | 118 | 2026-04-01 09:10:00+00:00
prod_002 | Nike Air Max 270 | zappos | 125 | 2026-04-01 09:05:00+00:00
prod_003 | Instant Pot Duo 7-in-1 | amazon | 97.99 | 2026-04-01 09:00:00+00:00
prod_003 | Instant Pot Duo 7-in-1 | target | 99.99 | 2026-04-01 09:10:00+00:00
prod_003 | Instant Pot Duo 7-in-1 | walmart | 95 | 2026-04-01 09:05:00+00:00
prod_004 | Logitech MX Master 3S Mouse | amazon | 79.99 | 2026-04-01 09:00:00+00:00
prod_004 | Logitech MX Master 3S Mouse | bestbuy | 89.99 | 2026-04-01 09:05:00+00:00
prod_005 | Levi's 511 Slim Jeans | amazon | 57.99 | 2026-04-01 09:00:00+00:00
prod_005 | Levi's 511 Slim Jeans | macys | 62 | 2026-04-01 09:05:00+00:00
(13 rows)
Amazon's Sony headphone price is correctly shown as $319.99 (the latest event), not $329.99. The earlier event has been superseded.
Because this is a materialized view, RisingWave maintains it incrementally. When a new price event arrives for Amazon's Sony headphone listing, only the affected partition (prod_001, amazon) is recomputed -- not the entire dataset.
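To make the semantics concrete, the sketch below models in plain Python what the view keeps per key: for each (product_id, competitor) pair, only the event with the greatest captured_at survives, and each new event touches only its own key. It illustrates the logic of ROW_NUMBER() ... WHERE rn = 1, not how RisingWave stores state internally; the dict-based event shape and the make_event helper are assumptions.

```python
from datetime import datetime


def apply_price_event(latest, event):
    """Fold one event into `latest`, a dict keyed by (product_id, competitor).

    Returns True if the stored row for that key changed.
    """
    key = (event["product_id"], event["competitor"])
    current = latest.get(key)
    if current is not None and event["captured_at"] <= current["captured_at"]:
        # Older (or same-time) event: it would not get rn = 1, so ignore it.
        # On an exact tie, SQL's ROW_NUMBER picks one arbitrarily; here the first wins.
        return False
    latest[key] = event
    return True


def make_event(product_id, competitor, price, hh, mm):
    """Hypothetical helper that builds a sample event on 2026-04-01."""
    return {"product_id": product_id, "competitor": competitor,
            "competitor_price": price, "captured_at": datetime(2026, 4, 1, hh, mm)}
```

Replaying ev_001 after ev_004 returns False and leaves $319.99 in place, which is the same outcome the view produces for a late-arriving event.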
Step 3: Compute Price Differentials
With the latest competitor prices available, join them against your product catalog to compute the price gap:
CREATE MATERIALIZED VIEW price_differentials AS
SELECT
cl.product_id,
cl.product_name,
cl.category,
cl.competitor,
cl.competitor_price,
p.our_price,
p.cost,
cl.competitor_price - p.our_price AS price_gap,
ROUND(((cl.competitor_price - p.our_price)
/ p.our_price * 100)::numeric, 2) AS gap_pct,
ROUND((p.our_price - p.cost)::numeric, 2) AS our_margin,
cl.captured_at
FROM price_competitor_latest cl
JOIN price_our_products p ON cl.product_id = p.product_id;
A negative gap_pct means the competitor is cheaper than us; a positive gap_pct means we are cheaper than that competitor. Query the view sorted by the worst undercut:
SELECT product_name, competitor, competitor_price, our_price, price_gap, gap_pct
FROM price_differentials
ORDER BY gap_pct;
product_name | competitor | competitor_price | our_price | price_gap | gap_pct
-----------------------------+------------+------------------+-----------+-------------------+---------
Logitech MX Master 3S Mouse | amazon | 79.99 | 99.99 | -20 | -20.00
Logitech MX Master 3S Mouse | bestbuy | 89.99 | 99.99 | -10 | -10.00
Sony WH-1000XM5 Headphones | amazon | 319.99 | 349.99 | -30 | -8.57
Sony WH-1000XM5 Headphones | walmart | 334.99 | 349.99 | -15 | -4.29
Levi's 511 Slim Jeans | amazon | 57.99 | 59.99 | -2 | -3.33
Sony WH-1000XM5 Headphones | bestbuy | 339.99 | 349.99 | -10 | -2.86
Nike Air Max 270 | footlocker | 118 | 120 | -2 | -1.67
Nike Air Max 270 | amazon | 122 | 120 | 2 | 1.67
Levi's 511 Slim Jeans | macys | 62 | 59.99 | 2.009999999999998 | 3.35
Nike Air Max 270 | zappos | 125 | 120 | 5 | 4.17
Instant Pot Duo 7-in-1 | walmart | 95 | 89.99 | 5.010000000000005 | 5.57
Instant Pot Duo 7-in-1 | amazon | 97.99 | 89.99 | 8 | 8.89
Instant Pot Duo 7-in-1 | target | 99.99 | 89.99 | 10 | 11.11
(13 rows)
The pattern stands out immediately. The Logitech mouse is in crisis: Amazon is selling it for $79.99, a full 20% below our $99.99. All three tracked competitors also undercut the Sony headphones, although only Amazon's gap (-8.57%) exceeds 5%. Meanwhile, the Instant Pot is priced competitively: all three competitors are more expensive than us.
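If you want to sanity-check these numbers outside the database, for example in a unit test for a downstream service, the formulas are easy to reproduce. The sketch below mirrors the view's arithmetic under one stated assumption: it uses Python's decimal module with ROUND_HALF_UP to mimic SQL ROUND(numeric, 2), whereas the view computes in DOUBLE PRECISION first (which is why price_gap shows values like 2.009999999999998), so results could differ in the last digit at exact .005 boundaries. price_gap is a hypothetical helper name.

```python
from decimal import Decimal, ROUND_HALF_UP

CENT = Decimal("0.01")


def price_gap(competitor_price, our_price):
    """Return (price_gap, gap_pct) as price_differentials defines them.

    Negative values mean the competitor is cheaper than us.
    """
    competitor = Decimal(str(competitor_price))
    ours = Decimal(str(our_price))
    gap = competitor - ours
    # (competitor - ours) / ours * 100, rounded half away from zero to 2 places
    gap_pct = (gap / ours * 100).quantize(CENT, rounding=ROUND_HALF_UP)
    return gap, gap_pct
```

For the Logitech mouse, price_gap(79.99, 99.99) gives a gap of -20.00 and a gap_pct of -20.00, matching the first row of the output above.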
Step 4: Build Per-Competitor Alert Views
The price_differentials view gives you the data. The price_gap_alerts view gives you the signal. Use a WHERE gap_pct < -5 filter to surface only the situations that need immediate attention, then assign severity tiers and compute a suggested response price:
CREATE MATERIALIZED VIEW price_gap_alerts AS
SELECT
product_id,
product_name,
category,
competitor,
competitor_price,
our_price,
our_margin,
ROUND(price_gap::numeric, 2) AS price_gap,
gap_pct,
CASE
WHEN gap_pct <= -15 THEN 'CRITICAL'
WHEN gap_pct <= -10 THEN 'HIGH'
WHEN gap_pct <= -5 THEN 'MEDIUM'
ELSE 'LOW'
END AS alert_severity,
ROUND((our_price + price_gap * 0.5)::numeric, 2) AS suggested_price,
captured_at
FROM price_differentials
WHERE gap_pct < -5;
The suggested_price column uses a simple midpoint formula: it gives up half of the price difference to stay competitive without fully matching the competitor. In practice, you would refine this formula to account for minimum margins (the cost field) and category-specific elasticity rules. Because the WHERE clause already drops every row with a gap_pct of -5 or higher, the ELSE 'LOW' branch never fires in this view; it only matters if you relax the filter.
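The tiering and the midpoint rule can be mirrored in a few lines, which is handy for testing threshold changes before touching the view. This sketch assumes gap_pct and price_gap are already computed as in price_differentials; returning None stands in for rows that the WHERE gap_pct < -5 filter removes, and the function names are hypothetical.

```python
from decimal import Decimal, ROUND_HALF_UP


def alert_severity(gap_pct):
    """Severity tier for one competitor price point; None means no alert row."""
    if gap_pct >= -5:
        return None  # excluded by WHERE gap_pct < -5
    if gap_pct <= -15:
        return "CRITICAL"
    if gap_pct <= -10:
        return "HIGH"
    return "MEDIUM"


def suggested_price(our_price, price_gap):
    """Midpoint rule: our_price + price_gap * 0.5, rounded to cents."""
    ours = Decimal(str(our_price))
    gap = Decimal(str(price_gap))
    return (ours + gap * Decimal("0.5")).quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)
```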
Query the alerts view:
SELECT product_name, competitor, competitor_price, our_price,
price_gap, gap_pct, alert_severity, suggested_price
FROM price_gap_alerts
ORDER BY gap_pct;
product_name | competitor | competitor_price | our_price | price_gap | gap_pct | alert_severity | suggested_price
-----------------------------+------------+------------------+-----------+-----------+---------+----------------+-----------------
Logitech MX Master 3S Mouse | amazon | 79.99 | 99.99 | -20 | -20.00 | CRITICAL | 89.99
Logitech MX Master 3S Mouse | bestbuy | 89.99 | 99.99 | -10 | -10.00 | HIGH | 94.99
Sony WH-1000XM5 Headphones | amazon | 319.99 | 349.99 | -30 | -8.57 | MEDIUM | 334.99
(3 rows)
Three rows, three decisions. Amazon is undercutting the Logitech mouse by 20%, a CRITICAL alert that calls for an immediate pricing response. Best Buy's 10% undercut on the same mouse is a HIGH alert. Amazon is also undercutting the Sony headphones by 8.57%, a MEDIUM alert worth reviewing. The suggested prices give the merchandising team a starting point for their repricing decision.
Because this view is materialized and incrementally maintained, the moment Amazon drops the Logitech mouse price further (say, to $69.99), the row in price_gap_alerts updates automatically and the gap_pct recalculates to -30%. No query needs to be re-run. No pipeline needs to be triggered.
Step 5: Roll Up to Category-Level Insights
Individual product alerts tell you what to fix right now. Category-level summaries tell you where your pricing strategy has structural problems. Build a rollup materialized view on top of price_differentials:
CREATE MATERIALIZED VIEW price_summary_by_category AS
SELECT
category,
COUNT(DISTINCT product_id) AS products_tracked,
COUNT(*) AS competitor_price_points,
ROUND(AVG(gap_pct)::numeric, 2) AS avg_gap_pct,
ROUND(MIN(gap_pct)::numeric, 2) AS worst_undercut_pct,
ROUND(MAX(gap_pct)::numeric, 2) AS best_premium_pct,
COUNT(*) FILTER (WHERE gap_pct < -5) AS undercut_count,
COUNT(*) FILTER (WHERE gap_pct > 5) AS premium_count
FROM price_differentials
GROUP BY category;
SELECT category, products_tracked, competitor_price_points,
avg_gap_pct, worst_undercut_pct, best_premium_pct,
undercut_count, premium_count
FROM price_summary_by_category
ORDER BY worst_undercut_pct;
category | products_tracked | competitor_price_points | avg_gap_pct | worst_undercut_pct | best_premium_pct | undercut_count | premium_count
-------------+------------------+-------------------------+-------------+--------------------+------------------+----------------+---------------
Electronics | 2 | 5 | -9.14 | -20.00 | -2.86 | 3 | 0
Apparel | 1 | 2 | 0.01 | -3.33 | 3.35 | 0 | 0
Footwear | 1 | 3 | 1.39 | -1.67 | 4.17 | 0 | 0
Kitchen | 1 | 3 | 8.52 | 5.57 | 11.11 | 0 | 3
(4 rows)
Electronics has a systemic problem: 3 out of 5 competitor price points undercut us by more than 5%, with an average gap of -9.14%. This is not a one-off event; it points to a category-level pricing problem. Kitchen is the opposite story: we are cheaper than all three tracked competitors, whose prices average 8.52% above ours.
This view is the one you would surface on a category manager's dashboard. It updates continuously as new price events arrive, so a category manager checking the dashboard at any point sees the current competitive position.
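For intuition about what each rollup column means, here is the same aggregation over one category's gap_pct values in plain Python. It is a sketch: summarize_category is a hypothetical helper, and the 5-point threshold matches the FILTER clauses in the view.

```python
from decimal import Decimal, ROUND_HALF_UP


def summarize_category(gap_pcts, threshold=Decimal("5")):
    """Aggregate gap_pct values for one category like price_summary_by_category."""
    if not gap_pcts:
        raise ValueError("need at least one price point")
    avg = sum(gap_pcts) / len(gap_pcts)
    return {
        "competitor_price_points": len(gap_pcts),
        "avg_gap_pct": avg.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP),
        "worst_undercut_pct": min(gap_pcts),
        "best_premium_pct": max(gap_pcts),
        # Mirrors COUNT(*) FILTER (WHERE gap_pct < -5) and (WHERE gap_pct > 5)
        "undercut_count": sum(1 for g in gap_pcts if g < -threshold),
        "premium_count": sum(1 for g in gap_pcts if g > threshold),
    }
```

Feeding it the five Electronics gaps from Step 3 reproduces the -9.14 average and the undercut count of 3.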
Step 6: Track Price Volatility with Tumbling Windows
Beyond gap alerts, you want to know which competitors are aggressively repricing and which products are most volatile. Use a TUMBLE window to measure how much a competitor's price swings within each hour:
SELECT
product_id,
product_name,
competitor,
COUNT(*) AS price_updates,
ROUND(MIN(competitor_price)::numeric, 2) AS min_price,
ROUND(MAX(competitor_price)::numeric, 2) AS max_price,
ROUND((MAX(competitor_price) - MIN(competitor_price))::numeric, 2) AS price_swing,
window_start,
window_end
FROM TUMBLE(price_events, captured_at, INTERVAL '1' HOUR)
GROUP BY product_id, product_name, competitor, window_start, window_end
HAVING COUNT(*) > 1
ORDER BY price_swing DESC;
product_id | product_name | competitor | price_updates | min_price | max_price | price_swing | window_start | window_end
------------+----------------------------+------------+---------------+-----------+-----------+-------------+---------------------------+---------------------------
prod_001 | Sony WH-1000XM5 Headphones | amazon | 2 | 319.99 | 329.99 | 10 | 2026-04-01 09:00:00+00:00 | 2026-04-01 10:00:00+00:00
(1 row)
Amazon repriced the Sony headphones twice in a single hour, from $329.99 down to $319.99, a $10 swing. Repeated intra-hour repricing can indicate that a competitor's automated repricing system is actively working this product or category; with only two events here, treat it as a signal to watch rather than a conclusion. Knowing this helps you decide whether to match prices in real time or hold your pricing strategy.
RisingWave's TUMBLE window function assigns each event to a non-overlapping, fixed-size time bucket. Each output row represents one product-competitor pair within one hour-long window. You can wrap this query in a CREATE MATERIALIZED VIEW to maintain it continuously.
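Conceptually, TUMBLE floors each captured_at to the start of its window and the GROUP BY does the rest. The sketch below reproduces that bucketing in Python, assuming timezone-aware UTC timestamps and windows aligned to the Unix epoch; tumble_start and price_swings are hypothetical names, not RisingWave functions.

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from decimal import Decimal

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def tumble_start(ts, size):
    """Start of the fixed, non-overlapping window that contains ts (aware UTC)."""
    return EPOCH + ((ts - EPOCH) // size) * size


def price_swings(events, size=timedelta(hours=1)):
    """Per (product_id, competitor, window): update count and max-min price swing."""
    buckets = defaultdict(list)
    for e in events:
        start = tumble_start(e["captured_at"], size)
        buckets[(e["product_id"], e["competitor"], start)].append(
            Decimal(str(e["competitor_price"])))
    rows = [
        {"product_id": p, "competitor": c, "price_updates": len(prices),
         "price_swing": max(prices) - min(prices),
         "window_start": start, "window_end": start + size}
        for (p, c, start), prices in buckets.items()
        if len(prices) > 1  # HAVING COUNT(*) > 1
    ]
    return sorted(rows, key=lambda r: r["price_swing"], reverse=True)
```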
Step 7: Competitive Position Summary
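The final query combines everything into a per-product competitive position report that pricing teams and dashboards can consume directly. Like the volatility query, it can be wrapped in a materialized view to keep it continuously updated: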
SELECT
d.product_name,
d.category,
d.our_price,
ROUND(MIN(d.competitor_price)::numeric, 2) AS cheapest_competitor,
ROUND(MAX(d.competitor_price)::numeric, 2) AS most_expensive_competitor,
ROUND(AVG(d.competitor_price)::numeric, 2) AS avg_competitor_price,
ROUND(MIN(d.gap_pct)::numeric, 2) AS worst_gap_pct,
COUNT(DISTINCT d.competitor) AS competitors_tracked,
COUNT(*) FILTER (WHERE d.gap_pct < 0) AS competitors_cheaper
FROM price_differentials d
GROUP BY d.product_id, d.product_name, d.category, d.our_price
ORDER BY worst_gap_pct;
product_name | category | our_price | cheapest_competitor | most_expensive_competitor | avg_competitor_price | worst_gap_pct | competitors_tracked | competitors_cheaper
-----------------------------+-------------+-----------+---------------------+---------------------------+----------------------+---------------+---------------------+---------------------
Logitech MX Master 3S Mouse | Electronics | 99.99 | 79.99 | 89.99 | 84.99 | -20.00 | 2 | 2
Sony WH-1000XM5 Headphones | Electronics | 349.99 | 319.99 | 339.99 | 331.66 | -8.57 | 3 | 3
Levi's 511 Slim Jeans | Apparel | 59.99 | 57.99 | 62 | 60.00 | -3.33 | 2 | 1
Nike Air Max 270 | Footwear | 120 | 118 | 125 | 121.67 | -1.67 | 3 | 1
Instant Pot Duo 7-in-1 | Kitchen | 89.99 | 95 | 99.99 | 97.66 | 5.57 | 3 | 0
(5 rows)
This table tells the complete story at a glance. The Logitech mouse is the most urgent problem: both tracked competitors are cheaper than us, with the cheapest at $79.99 against our $99.99. The Instant Pot is the opposite: we are cheaper than all three tracked competitors, and even the cheapest of them (Walmart at $95) is 5.57% above our price.
Step 8: Sink Alerts to Kafka for Downstream Notification
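With price_gap_alerts producing a continuously updated stream of pricing exceptions, the final step is routing those alerts to your notification systems. RisingWave's Kafka sink connector pushes alert rows to a Kafka topic. Because price_gap_alerts is not append-only (a row changes when a competitor reprices and disappears when the gap closes), a FORMAT PLAIN sink must be created with force_append_only = 'true', which forwards inserted and updated rows as plain JSON messages and drops deletions:
CREATE SINK price_alerts_sink FROM price_gap_alerts
WITH (
connector = 'kafka',
properties.bootstrap.server = 'kafka:9092',
topic = 'price-alerts'
)
FORMAT PLAIN ENCODE JSON (
force_append_only = 'true'
);
If downstream systems also need to know when an alert clears, use FORMAT UPSERT ENCODE JSON with primary_key = 'product_id,competitor' instead, and handle the resulting tombstone messages (null values) in the consumer.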
Each alert message on the price-alerts topic looks like this:
{
"product_id": "prod_004",
"product_name": "Logitech MX Master 3S Mouse",
"category": "Electronics",
"competitor": "amazon",
"competitor_price": 79.99,
"our_price": 99.99,
"our_margin": 47.99,
"price_gap": -20.00,
"gap_pct": -20.00,
"alert_severity": "CRITICAL",
"suggested_price": 89.99,
"captured_at": "2026-04-01T09:00:00Z"
}
A lightweight consumer subscribes to this topic and routes alerts based on alert_severity:
import json

import requests
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'price-alerts',
    bootstrap_servers='kafka:9092',
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

SLACK_WEBHOOK = "https://hooks.slack.com/services/YOUR/WEBHOOK/URL"
REPRICING_API = "https://internal-repricing-api/suggest"
SEVERITY_EMOJI = {"CRITICAL": ":rotating_light:", "HIGH": ":warning:", "MEDIUM": ":large_yellow_circle:"}

for message in consumer:
    alert = message.value
    severity = alert["alert_severity"]
    emoji = SEVERITY_EMOJI.get(severity, ":bell:")

    # Send to Slack (gap_pct is negative for an undercut, so show its magnitude)
    slack_text = (
        f"{emoji} *PRICE ALERT [{severity}]*\n"
        f"Product: `{alert['product_name']}`\n"
        f"Competitor: `{alert['competitor']}` is at `${alert['competitor_price']:.2f}` "
        f"({abs(alert['gap_pct']):.1f}% below our `${alert['our_price']:.2f}`)\n"
        f"Suggested response price: `${alert['suggested_price']:.2f}`"
    )
    requests.post(SLACK_WEBHOOK, json={"text": slack_text}, timeout=10)

    # For CRITICAL alerts, also trigger the repricing API
    if severity == "CRITICAL":
        requests.post(REPRICING_API, json={
            "product_id": alert["product_id"],
            "suggested_price": alert["suggested_price"],
            "reason": f"Competitor {alert['competitor']} at {alert['gap_pct']}%"
        }, timeout=10)
The consumer is intentionally thin. All the complex logic -- deduplication, differential computation, severity classification, suggested pricing -- lives in SQL inside RisingWave. The consumer only routes and formats; it holds no state and can be replaced or scaled independently.
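Keeping the consumer thin also makes it easy to test if you separate deciding what to send from actually sending it. A possible refactor, with hypothetical function names, turns each alert into a list of (target, payload) pairs that the consumer loop would then post; it assumes numeric fields arrive as JSON numbers, as in the sample message.

```python
SEVERITY_EMOJI = {"CRITICAL": ":rotating_light:", "HIGH": ":warning:",
                  "MEDIUM": ":large_yellow_circle:"}


def format_slack_text(alert):
    """Render the Slack message body for one alert dict."""
    emoji = SEVERITY_EMOJI.get(alert["alert_severity"], ":bell:")
    return (
        f"{emoji} *PRICE ALERT [{alert['alert_severity']}]*\n"
        f"Product: `{alert['product_name']}`\n"
        f"Competitor: `{alert['competitor']}` is at `${alert['competitor_price']:.2f}` "
        f"({abs(alert['gap_pct']):.1f}% below our `${alert['our_price']:.2f}`)\n"
        f"Suggested response price: `${alert['suggested_price']:.2f}`"
    )


def plan_actions(alert):
    """Return (target, payload) pairs to send; performs no network I/O."""
    actions = [("slack", {"text": format_slack_text(alert)})]
    if alert["alert_severity"] == "CRITICAL":
        actions.append(("repricing_api", {
            "product_id": alert["product_id"],
            "suggested_price": alert["suggested_price"],
            "reason": f"Competitor {alert['competitor']} at {alert['gap_pct']}%",
        }))
    return actions
```

The loop then reduces to posting each payload to the URL for its target, and routing rules can be tested with plain dictionaries.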
How the Materialized View Chain Updates in Real Time
When Amazon drops the Logitech mouse price to $69.99, here is what happens inside RisingWave:
- The new event (prod_004, amazon, $69.99) arrives from Kafka into price_events.
- price_competitor_latest recomputes the Amazon row for prod_004, replacing $79.99 with $69.99.
- price_differentials recomputes the row for (prod_004, amazon): the new price_gap is -30.00 and gap_pct is -30.00.
- price_gap_alerts re-evaluates the row: a gap_pct of -30 still satisfies WHERE gap_pct < -5, and alert_severity stays CRITICAL (it was already in the most severe tier at -20), while gap_pct and suggested_price update (the suggestion moves from $89.99 to $84.99).
- price_summary_by_category updates the Electronics row: avg_gap_pct and worst_undercut_pct shift to reflect the new data.
- The sink pushes the updated alert row to the price-alerts Kafka topic.
This entire chain typically completes within seconds. RisingWave only recomputes the rows affected by the new event; it does not re-scan the entire price_events table. This is the core promise of incremental materialized views: compute once, maintain continuously.
Extending the System
The architecture above is a foundation. Several extensions are straightforward to add in SQL:
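Minimum margin enforcement. Compute suggested_price in price_gap_alerts as GREATEST(our_price + price_gap * 0.5, cost * 1.1), so the suggestion never drops below cost plus 10%. A WHERE condition on the same expression would hide the alert entirely instead of adjusting the price, which is the opposite of what you want when a competitor undercuts you deeply. (A 10% margin on the selling price, rather than a 10% markup on cost, corresponds to a floor of cost / 0.9.)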
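The clamp is easy to reason about in isolation. This sketch mirrors GREATEST(our_price + price_gap * 0.5, cost * 1.1) and, for comparison, computes the lowest cent price that preserves a 10% margin on the selling price; the function names and the 10% defaults are illustrative assumptions.

```python
from decimal import Decimal, ROUND_CEILING, ROUND_HALF_UP

CENT = Decimal("0.01")


def floored_suggested_price(our_price, price_gap, cost, markup=Decimal("0.10")):
    """Midpoint suggestion, never below cost * (1 + markup)."""
    ours, gap, unit_cost = (Decimal(str(v)) for v in (our_price, price_gap, cost))
    midpoint = (ours + gap * Decimal("0.5")).quantize(CENT, rounding=ROUND_HALF_UP)
    # Round the floor up so cent rounding can never push the price below it
    floor = (unit_cost * (1 + markup)).quantize(CENT, rounding=ROUND_CEILING)
    return max(midpoint, floor)


def margin_floor(cost, margin=Decimal("0.10")):
    """Lowest cent price p with (p - cost) / p >= margin, i.e. p >= cost / (1 - margin)."""
    return (Decimal(str(cost)) / (1 - margin)).quantize(CENT, rounding=ROUND_CEILING)
```

For the Logitech mouse (cost $52.00) the floor does not bind and the suggestion stays $89.99; a hypothetical $100 item with an $85 cost and a -$40 gap would be clamped to $93.50 instead of $80.00.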
Competitor-specific rules. Add a configuration table mapping competitors to response strategies (match, beat_by_1pct, ignore) and join it into price_differentials.
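As a rough illustration of how such a configuration table could drive the response, the sketch below applies the three strategies named above to a competitor's price. The competitor-to-strategy mapping is hypothetical, and in RisingWave you would express the same logic as a join plus a CASE expression rather than in Python.

```python
from decimal import Decimal, ROUND_DOWN

# Hypothetical rules table: competitor -> strategy
COMPETITOR_STRATEGY = {"amazon": "match", "bestbuy": "beat_by_1pct", "macys": "ignore"}


def response_price(competitor, competitor_price, default="ignore"):
    """Return a response price for one competitor, or None to take no action."""
    strategy = COMPETITOR_STRATEGY.get(competitor, default)
    price = Decimal(str(competitor_price))
    if strategy == "match":
        return price
    if strategy == "beat_by_1pct":
        # Undercut by 1%, truncating to whole cents so we stay below the competitor
        return (price * Decimal("0.99")).quantize(Decimal("0.01"), rounding=ROUND_DOWN)
    if strategy == "ignore":
        return None
    raise ValueError(f"unknown strategy: {strategy}")
```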
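Price history for trend analysis. When price_events is a table, it retains all events (not just the latest), so you can run time-series queries directly on it to compute 7-day price trends per competitor. A source created with CREATE SOURCE does not store data in RisingWave, so in that setup you would persist the history in a table or materialized view first.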
Multi-currency support. Add a price_exchange_rates table and join it into price_differentials to normalize all prices to a single currency before computing gaps.
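The normalization itself is a single multiplication per row. The sketch assumes rates are stored as USD per unit of the other currency; the rates shown are placeholders, not real market data, and to_usd is a hypothetical helper.

```python
from decimal import Decimal, ROUND_HALF_UP

# Placeholder rates (USD per 1 unit of currency), for illustration only
USD_PER_UNIT = {"USD": Decimal("1"), "EUR": Decimal("1.10"), "GBP": Decimal("1.25")}


def to_usd(amount, currency, rates=USD_PER_UNIT):
    """Normalize a price to USD before computing gaps; unknown currencies fail loudly."""
    try:
        rate = rates[currency]
    except KeyError:
        raise ValueError(f"no exchange rate for {currency}") from None
    return (Decimal(str(amount)) * rate).quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)
```

Failing on a missing rate, rather than defaulting to 1, avoids silently comparing prices in different currencies.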
Each of these is a SQL view, not a new service. The pipeline stays simple.
What Is a Real-Time Price Comparison Engine Built with Streaming SQL?
A real-time price comparison engine built with streaming SQL is a system that ingests competitor price feeds continuously (typically via Kafka), maintains materialized views that always reflect the current state of competitor prices and differentials against your own prices, and surfaces alerts the moment a price gap exceeds a defined threshold. Unlike batch-based pricing pipelines that refresh on a schedule, a streaming approach processes each price event as it arrives. RisingWave implements this using incrementally maintained materialized views: the differential and alert computations update within seconds of a new price event, with no manual triggering required.
How Do Materialized Views Handle Competitor Repricing Events That Arrive Out of Order?
Out-of-order events are handled by the ROW_NUMBER() OVER (PARTITION BY product_id, competitor ORDER BY captured_at DESC) pattern in price_competitor_latest. The view always selects the event with the latest captured_at timestamp for each product-competitor pair. If an older event arrives late (with an earlier captured_at), it ranks below the existing latest event and is filtered out by WHERE rn = 1. This means the view reflects the most recent known price, not the most recently received message. Windowed results, such as the hourly volatility query in Step 6, are a different matter: for production systems with significant out-of-order arrival, you can use RisingWave's watermark mechanism so that each window waits for late events before its results are finalized.
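A watermark is easiest to understand as a running threshold: the largest event time seen so far minus an allowed delay. The class below is a conceptual model under that assumption, not RisingWave's implementation; in RisingWave you declare the watermark on the source or table (along the lines of WATERMARK FOR captured_at AS captured_at - INTERVAL '5 minutes'; check the docs for your version), and the 5-minute delay here is only an example.

```python
from datetime import datetime, timedelta, timezone


class WatermarkTracker:
    """Watermark = max event time seen - allowed delay (conceptual model)."""

    def __init__(self, allowed_delay):
        self.allowed_delay = allowed_delay
        self.max_event_time = None

    @property
    def watermark(self):
        if self.max_event_time is None:
            return None
        return self.max_event_time - self.allowed_delay

    def observe(self, event_time):
        """Record an event; return False if it is already behind the watermark."""
        wm = self.watermark
        if wm is not None and event_time < wm:
            return False  # too late: its window may already be finalized
        if self.max_event_time is None or event_time > self.max_event_time:
            self.max_event_time = event_time
        return True

    def window_closed(self, window_end):
        """A window [start, end) can be finalized once the watermark reaches end."""
        wm = self.watermark
        return wm is not None and wm >= window_end
```

The trade-off is explicit: a longer delay accepts more late events but postpones every window's final result by the same amount.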
What Alert Threshold Should I Use for Price Gap Detection?
The right threshold depends on your category economics. A 5% threshold is a common starting point for fast-moving consumer electronics where shoppers routinely compare prices across multiple retailers. For categories with strong brand loyalty (premium cosmetics, specialty outdoor gear), thresholds of 10-15% may be more appropriate because customers are less price-elastic. The severity tiers in price_gap_alerts (MEDIUM at -5%, HIGH at -10%, CRITICAL at -15%) let different teams respond differently: a MEDIUM alert might trigger an analyst review, while a CRITICAL alert triggers an automated repricing API call. Start with conservative thresholds and tighten them as you measure conversion impact.
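One way to encode per-category thresholds is a small lookup of tier boundaries. The sketch below is hypothetical: the category names and numbers only illustrate the ranges discussed above (5% for fast-moving electronics, around 10-15% for brand-loyal categories), and in the pipeline these values could live in a table joined into price_gap_alerts.

```python
# Hypothetical tier boundaries, in percent below our price:
# (MEDIUM when strictly above, HIGH at or beyond, CRITICAL at or beyond)
CATEGORY_TIERS = {
    "Electronics": (5, 10, 15),
    "Beauty": (10, 15, 20),
}
DEFAULT_TIERS = (5, 10, 15)


def severity_for(category, gap_pct):
    """Return the alert tier for gap_pct in this category, or None for no alert."""
    medium, high, critical = CATEGORY_TIERS.get(category, DEFAULT_TIERS)
    undercut = -gap_pct  # positive when the competitor is cheaper
    if undercut >= critical:
        return "CRITICAL"
    if undercut >= high:
        return "HIGH"
    if undercut > medium:
        return "MEDIUM"
    return None
```

The same 8.57% undercut raises a MEDIUM alert for Electronics but no alert at all for the more loyal Beauty category.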
Can This System Handle Thousands of Products and Dozens of Competitors?
Yes. RisingWave is designed for high-throughput streaming workloads. The materialized view chain processes only the incremental changes triggered by each new price event, not the full product-competitor matrix. For a catalog of 50,000 products with 20 competitors each (1 million price combinations), a price update from Amazon for one product triggers incremental recomputation of only the affected rows in each view, not a full scan of 1 million rows. RisingWave can be deployed on Kubernetes with horizontal scaling to handle millions of price events per day while keeping end-to-end latency in the range of seconds.
Conclusion
Building a real-time price comparison engine with streaming SQL reduces the gap between "a competitor changed their price" and "your team knows about it and has a suggested response" from hours to seconds. Here is the complete system in five SQL objects:
- price_events table (or Kafka source) receives the raw competitor price stream
- price_competitor_latest MV deduplicates to the most recent price per product-competitor pair using a window function
- price_differentials MV joins competitor prices against your catalog to compute gaps and margins
- price_gap_alerts MV filters for significant undercuts, assigns severity tiers, and suggests response prices
- price_summary_by_category MV rolls up to category-level competitive intelligence for strategic review
A Kafka sink routes alerts to Slack, PagerDuty, or a repricing API. The consumer stays thin -- the intelligence lives in SQL.
Ready to build your own price comparison engine? Try RisingWave Cloud free, no credit card required.
Join our Slack community to ask questions and connect with other stream processing developers.

