Manufacturers can cut excess inventory and prevent stockouts by treating inventory data as a continuous stream rather than a batch report. RisingWave, a PostgreSQL-compatible streaming database, lets you write plain SQL over live machine, ERP, and WMS events to maintain real-time inventory positions, flag reorder points, and compute inventory turns without custom application code.
Why Inventory Optimization Matters
In discrete and process manufacturing, inventory is often the largest balance-sheet item a company controls directly. Too much stock ties up working capital, inflates carrying costs, and hides quality defects—problems that compound across a multi-level bill of materials (BOM). Too little stock causes line stoppages that cost far more than the parts themselves.
The traditional answer—nightly batch reconciliation fed into an ERP—fails in three ways. First, the data is stale; a 12-hour-old inventory position is useless for a line running 2,000 units per shift. Second, aggregates are coarse; averages smooth over the intra-shift peaks that actually cause stockouts. Third, alerts are reactive; by the time a report surfaces a problem, the line has already stopped.
Modern factories generate rich real-time signals: WMS scan events, MES production completions, purchase-order acknowledgements, and quality-hold flags. The missing piece is a layer that joins these streams continuously, maintains running aggregates, and surfaces actionable numbers the moment they change.
The Streaming SQL Approach
RisingWave ingests events from Kafka topics and persists continuously updated query results as materialized views. Every time a new event arrives, only the affected rows in the view are recomputed—no full table scans, no batch windows. Application dashboards and alerting systems query materialized views exactly like normal PostgreSQL tables, with sub-second latency.
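For example, once the materialized views below exist, any PostgreSQL client (psql, JDBC, a BI tool) can read them with ordinary SQL; the view name here anticipates the one built in Step 2:
-- A dashboard-style check: how many SKU/location pairs are at or below zero?
SELECT COUNT(*) AS stocked_out_positions
FROM inventory_position
WHERE on_hand_qty <= 0;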
For inventory optimization the key objects are:
- Raw-material positions maintained from WMS receipt and issue events
- WIP positions maintained from MES start/complete/scrap events
- Finished-goods positions maintained from shipping events
- Reorder-point alerts triggered when any SKU crosses a configured threshold
- Inventory-turns views that aggregate issues against average on-hand across configurable windows
Building It Step by Step
Step 1: Data Sources
Connect RisingWave to the Kafka topics that carry WMS and MES events.
CREATE SOURCE wms_events (
    event_id VARCHAR,
    event_type VARCHAR, -- 'receipt', 'issue', 'adjust', 'hold'
    sku_id VARCHAR,
    location_id VARCHAR,
    qty_delta NUMERIC,
    unit_cost NUMERIC,
    event_ts TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'wms.inventory.events',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'earliest' -- replay full history so positions include past receipts and issues
) FORMAT PLAIN ENCODE JSON;
CREATE SOURCE mes_events (
    order_id VARCHAR,
    sku_id VARCHAR,
    bom_level INT,
    event_type VARCHAR, -- 'start', 'complete', 'scrap'
    qty NUMERIC,
    scrap_qty NUMERIC,
    work_center_id VARCHAR,
    event_ts TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'mes.production.events',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'earliest' -- replay full history so WIP positions are complete
) FORMAT PLAIN ENCODE JSON;
Step 2: Core Materialized View
Build a running inventory position per SKU and location. The view accumulates all WMS deltas since the beginning of the stream; materialized-view state is persisted by RisingWave so it survives restarts.
CREATE MATERIALIZED VIEW inventory_position AS
SELECT
    sku_id,
    location_id,
    SUM(qty_delta) AS on_hand_qty,
    SUM(qty_delta * unit_cost) AS on_hand_value,
    COUNT(*) FILTER (WHERE event_type = 'receipt') AS receipt_count,
    COUNT(*) FILTER (WHERE event_type = 'issue') AS issue_count,
    COUNT(*) FILTER (WHERE event_type = 'hold') AS hold_count,
    MAX(event_ts) AS last_movement_ts
FROM wms_events
GROUP BY sku_id, location_id;
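Dashboards can query the view immediately; the lookup below is served from maintained state rather than recomputed (the SKU and location IDs are made-up examples):
-- Hypothetical IDs, illustrating a point lookup from a dashboard
SELECT sku_id, location_id, on_hand_qty, on_hand_value, last_movement_ts
FROM inventory_position
WHERE sku_id = 'SKU-1042' AND location_id = 'WH1-A03';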
-- WIP by work center aggregated from MES events
CREATE MATERIALIZED VIEW wip_position AS
SELECT
    work_center_id,
    sku_id,
    COALESCE(SUM(qty) FILTER (WHERE event_type = 'start'), 0) AS units_started,
    COALESCE(SUM(qty) FILTER (WHERE event_type = 'complete'), 0) AS units_completed,
    COALESCE(SUM(scrap_qty), 0) AS total_scrap,
    -- COALESCE guards against NULL when a work center has no events of a given type yet
    COALESCE(SUM(qty) FILTER (WHERE event_type = 'start'), 0)
        - COALESCE(SUM(qty) FILTER (WHERE event_type = 'complete'), 0)
        - COALESCE(SUM(scrap_qty), 0) AS wip_qty,
    MAX(event_ts) AS last_event_ts
FROM mes_events
GROUP BY work_center_id, sku_id;
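Since wip_position behaves like a regular table, plant-level rollups are ad-hoc queries rather than new pipelines; a quick sketch:
-- Total WIP units per work center, heaviest first
SELECT work_center_id, SUM(wip_qty) AS total_wip_units
FROM wip_position
GROUP BY work_center_id
ORDER BY total_wip_units DESC;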
-- Rolling 30-day inventory turns: daily tumbling windows plus a 30-day rolling frame.
-- The inner query computes end-of-day on-hand as a cumulative sum of daily net change;
-- averaging raw daily deltas would not give average on-hand.
CREATE MATERIALIZED VIEW inventory_turns_30d AS
SELECT
    sku_id,
    window_start,
    window_end,
    SUM(daily_issues) OVER (
        PARTITION BY sku_id ORDER BY window_start
        ROWS BETWEEN 29 PRECEDING AND CURRENT ROW
    ) AS issues_30d,
    AVG(eod_on_hand) OVER (
        PARTITION BY sku_id ORDER BY window_start
        ROWS BETWEEN 29 PRECEDING AND CURRENT ROW
    ) AS avg_on_hand_30d
FROM (
    SELECT
        sku_id,
        window_start,
        window_end,
        COALESCE(SUM(ABS(qty_delta)) FILTER (WHERE event_type = 'issue'), 0) AS daily_issues,
        SUM(SUM(qty_delta)) OVER (
            PARTITION BY sku_id ORDER BY window_start
            ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
        ) AS eod_on_hand
    FROM TUMBLE(wms_events, event_ts, INTERVAL '1 day')
    GROUP BY sku_id, window_start, window_end
) daily;
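The turns ratio itself falls out at query time; a minimal sketch, where annualizing by 365/30 is an assumption about how you want the figure reported:
-- Annualized turns = 30-day issues / 30-day average on-hand, scaled to a year
SELECT sku_id, window_end,
    issues_30d / NULLIF(avg_on_hand_30d, 0) * (365.0 / 30) AS annualized_turns
FROM inventory_turns_30d;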
Step 3: Alerts and Aggregations
Define per-SKU reorder points in a reference table, then create an alert view that joins live positions against thresholds.
-- Static reference table for reorder parameters
CREATE TABLE sku_reorder_params (
    sku_id VARCHAR PRIMARY KEY,
    reorder_point NUMERIC,
    order_qty NUMERIC,
    lead_time_days INT,
    safety_stock NUMERIC
);
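Seed the table with per-SKU parameters, typically exported from ERP planning data; the rows below are purely illustrative:
-- Hypothetical SKUs and thresholds for demonstration
INSERT INTO sku_reorder_params VALUES
    ('SKU-1042', 500, 2000, 7, 150),
    ('SKU-2210', 50, 200, 21, 25);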
-- Alert view: SKUs below reorder point right now
CREATE MATERIALIZED VIEW reorder_alerts AS
SELECT
    p.sku_id,
    p.location_id,
    p.on_hand_qty,
    r.reorder_point,
    r.order_qty,
    r.lead_time_days,
    (r.reorder_point - p.on_hand_qty) AS qty_below_rop, -- positive: how far below the reorder point
    p.last_movement_ts AS alert_ts -- now() cannot be projected in a materialized view
FROM inventory_position p
JOIN sku_reorder_params r ON p.sku_id = r.sku_id
WHERE p.on_hand_qty < r.reorder_point;
-- Push reorder alerts to Kafka for ERP consumption
CREATE SINK reorder_alerts_sink
AS SELECT * FROM reorder_alerts
WITH (
    connector = 'kafka',
    topic = 'manufacturing.alerts.reorder',
    properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON (force_append_only = 'true'); -- alert view updates in place; emit inserts only
-- High-scrap alert: work centers with scrap rate > 3% in the last hour
CREATE MATERIALIZED VIEW scrap_rate_alerts AS
SELECT
    work_center_id,
    sku_id,
    SUM(scrap_qty) AS scrap_qty_1h,
    SUM(qty) FILTER (WHERE event_type = 'start') AS started_qty_1h,
    ROUND(
        SUM(scrap_qty) * 100.0 /
        NULLIF(SUM(qty) FILTER (WHERE event_type = 'start'), 0),
        2
    ) AS scrap_rate_pct,
    window_start,
    window_end
FROM TUMBLE(mes_events, event_ts, INTERVAL '1 hour')
GROUP BY work_center_id, sku_id, window_start, window_end
HAVING SUM(scrap_qty) * 100.0 /
    NULLIF(SUM(qty) FILTER (WHERE event_type = 'start'), 0) > 3.0;
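A matching sink can forward scrap alerts the same way; the topic name below is an assumption, and force_append_only is needed because the hourly view updates in place:
CREATE SINK scrap_rate_alerts_sink
AS SELECT * FROM scrap_rate_alerts
WITH (
    connector = 'kafka',
    topic = 'manufacturing.alerts.scrap', -- assumed topic name
    properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON (force_append_only = 'true');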
How the Approaches Compare
| Approach | Data Freshness | Alert Latency | Infrastructure | SQL Skill Required |
| --- | --- | --- | --- | --- |
| Nightly ERP batch | 12–24 hours | Next business day | ERP only | Moderate |
| Hourly Spark job | 1 hour | 1 hour | Spark cluster + scheduler | High |
| Change-data-capture + custom app | Minutes | Minutes | CDC pipeline + app servers | High |
| RisingWave streaming SQL | Sub-second | Sub-second | Single cluster | Standard SQL |
FAQ
Can RisingWave join the live inventory stream with static BOM data?
Yes. You can load BOM tables into RisingWave as regular tables and join them with streaming sources in a materialized view. The join is maintained incrementally—when a new WMS event arrives, RisingWave looks up the relevant BOM rows and updates only the affected view rows.
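A minimal sketch, assuming a bom table keyed by parent SKU (the table and column names are hypothetical):
CREATE TABLE bom (
    parent_sku_id VARCHAR,
    component_sku_id VARCHAR,
    qty_per NUMERIC
);
-- Component demand implied by issues of parent SKUs, maintained incrementally
CREATE MATERIALIZED VIEW component_demand AS
SELECT
    b.component_sku_id,
    SUM(ABS(w.qty_delta) * b.qty_per) AS implied_component_demand
FROM wms_events w
JOIN bom b ON w.sku_id = b.parent_sku_id
WHERE w.event_type = 'issue'
GROUP BY b.component_sku_id;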
How does RisingWave handle late-arriving events from offline scanners?
RisingWave supports watermark-based late-event handling. You configure an allowed lateness on your source, and events arriving within that window are incorporated into the correct time bucket without recomputing the entire view.
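For example, the Step 1 source can be declared with a watermark so that scanners syncing up to 15 minutes late still land in the correct daily or hourly bucket (the delay value is an assumption you tune to your network):
CREATE SOURCE wms_events (
    event_id VARCHAR,
    event_type VARCHAR,
    sku_id VARCHAR,
    location_id VARCHAR,
    qty_delta NUMERIC,
    unit_cost NUMERIC,
    event_ts TIMESTAMPTZ,
    WATERMARK FOR event_ts AS event_ts - INTERVAL '15 minutes'
) WITH (
    connector = 'kafka',
    topic = 'wms.inventory.events',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;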
What happens to materialized view state during a RisingWave restart?
RisingWave persists all materialized-view state to object storage (S3 or compatible). On restart, the service recovers state from the last checkpoint and replays only the events since that checkpoint from Kafka, so inventory positions remain accurate without replaying the entire event history.
Key Takeaways
- Replace nightly batch inventory reconciliation with continuously updated materialized views in RisingWave.
- Model inventory position as an incremental sum over WMS events; model WIP as MES starts minus completions and scrap.
- Use TUMBLE windows for rolling inventory-turn calculations, and join live positions against static reorder-point tables for sub-second alerts.
- Push reorder and scrap-rate alerts downstream via Kafka sinks, feeding ERP purchasing workflows automatically.
- The entire pipeline requires only PostgreSQL-compatible SQL—no custom streaming application code.

