Warehouse Inventory Tracking with CDC and Streaming SQL

Warehouse inventory levels change continuously — items are received, picked, packed, shipped, and returned every minute. Maintaining an accurate real-time view of stock across locations, SKUs, and bins requires capturing every database change as it happens. RisingWave, a PostgreSQL-compatible streaming database, makes this straightforward through Change Data Capture (CDC) combined with incremental materialized views.

Why Real-Time Inventory Visibility Matters

Inventory inaccuracy is a root cause of two of the most expensive warehouse problems: stockouts and overstock. When a system shows 50 units in a location but the physical count is 12, pickers waste time searching for stock that isn't there. When it shows 200 units but the shelves actually hold 600, buyers over-order, tying up capital in excess inventory.

Traditional warehouse management systems (WMS) update a central database transactionally. Individual transactions are accurate, but aggregate views — total on-hand across locations, available-to-promise quantities, pick face reorder triggers — are typically computed by scheduled jobs that run every 15 to 60 minutes. During those intervals, the aggregate view is stale.

Real-time inventory tracking means every transaction that changes inventory — a GRN (goods received note), a pick event, a cycle count adjustment, a putaway — immediately reflects in all downstream views. No waiting for the next batch job.

How CDC-Powered Inventory Streaming Works

RisingWave connects directly to a PostgreSQL or MySQL WMS database using CDC connectors (postgres-cdc or mysql-cdc). Instead of polling the database or relying on application-level hooks, CDC reads the database's replication log (WAL for PostgreSQL, binlog for MySQL) and captures every committed change — inserts, updates, and deletes — as a stream of events.

RisingWave ingests these change events and maintains materialized views that reflect the current state of inventory. The result: aggregate views that are accurate to within milliseconds of the most recent transaction, with no changes required to the WMS application.
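For illustration, the change event for a pick of five units might look like the following Debezium-style envelope (the field values here are hypothetical, and the exact payload shape depends on connector configuration):

{
  "op": "c",
  "before": null,
  "after": {
    "transaction_id": 90817,
    "sku_id": 1203,
    "location_id": 77,
    "warehouse_id": 1,
    "txn_type": "PICK",
    "quantity_delta": -5,
    "transaction_time": "2024-05-01T14:03:22Z",
    "operator_id": 42,
    "reference_doc": "ORD-5531"
  },
  "source": { "db": "warehouse", "schema": "public", "table": "inventory_transactions" }
}

RisingWave applies each such event to its internal state, so downstream materialized views reflect the change without any polling.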

Step-by-Step Tutorial

Step 1: Set Up the Data Source

Connect RisingWave to the WMS PostgreSQL database using the postgres-cdc connector. Because the change stream carries updates and deletes, each captured table is declared with CREATE TABLE and a primary key. The inventory_transactions table captures every movement:

-- CDC table mirroring the WMS inventory_transactions table.
-- postgres-cdc tables need a primary key so that updates and
-- deletes from the WAL can be applied correctly.
CREATE TABLE inventory_transactions (
    transaction_id   BIGINT PRIMARY KEY,
    sku_id           BIGINT,
    location_id      BIGINT,
    warehouse_id     BIGINT,
    txn_type         VARCHAR,      -- RECEIVE/PICK/PUTAWAY/ADJUST/RETURN/TRANSFER
    quantity_delta   INT,          -- positive = in, negative = out
    transaction_time TIMESTAMPTZ,
    operator_id      BIGINT,
    reference_doc    VARCHAR
)
WITH (
    connector         = 'postgres-cdc',
    hostname          = 'wms-db.internal',
    port              = '5432',
    username          = 'risingwave_reader',
    password          = 'secret',
    database.name     = 'warehouse',
    schema.name       = 'public',
    table.name        = 'inventory_transactions',
    slot.name         = 'rw_inventory_slot'
);

Also capture the product and location reference tables:

CREATE TABLE skus (
    sku_id          BIGINT PRIMARY KEY,
    sku_code        VARCHAR,
    description     VARCHAR,
    unit_of_measure VARCHAR,
    reorder_point   INT,
    max_capacity    INT
)
WITH (
    connector         = 'postgres-cdc',
    hostname          = 'wms-db.internal',
    port              = '5432',
    username          = 'risingwave_reader',
    password          = 'secret',
    database.name     = 'warehouse',
    schema.name       = 'public',
    table.name        = 'skus',
    slot.name         = 'rw_skus_slot'
);

CREATE TABLE storage_locations (
    location_id  BIGINT PRIMARY KEY,
    warehouse_id BIGINT,
    zone         VARCHAR,
    aisle        VARCHAR,
    bay          VARCHAR,
    level        VARCHAR
)
WITH (
    connector         = 'postgres-cdc',
    hostname          = 'wms-db.internal',
    port              = '5432',
    username          = 'risingwave_reader',
    password          = 'secret',
    database.name     = 'warehouse',
    schema.name       = 'public',
    table.name        = 'storage_locations',
    slot.name         = 'rw_locations_slot'
);

Step 2: Build the Core Materialized View

Create the live inventory position view — on-hand quantity per SKU per location, continuously updated as CDC events arrive:

CREATE MATERIALIZED VIEW live_inventory_positions AS
SELECT
    t.warehouse_id,
    t.location_id,
    l.zone,
    l.aisle,
    l.bay,
    l.level,
    t.sku_id,
    s.sku_code,
    s.description,
    s.unit_of_measure,
    s.reorder_point,
    s.max_capacity,
    SUM(t.quantity_delta)   AS qty_on_hand,
    -- RisingWave restricts now() in materialized views to temporal
    -- filters, so freshness (now() - last_movement) is left to query time
    MAX(t.transaction_time) AS last_movement
FROM inventory_transactions t
JOIN storage_locations l ON t.location_id = l.location_id
JOIN skus s              ON t.sku_id      = s.sku_id
GROUP BY
    t.warehouse_id, t.location_id, l.zone, l.aisle, l.bay, l.level,
    t.sku_id, s.sku_code, s.description, s.unit_of_measure,
    s.reorder_point, s.max_capacity;
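Because RisingWave speaks the PostgreSQL wire protocol, the view can be queried from psql or any Postgres client. A quick spot check might look like this (the zone value 'A' is illustrative):

-- Current on-hand in zone 'A', lowest stock first
SELECT location_id, sku_code, qty_on_hand, last_movement
FROM live_inventory_positions
WHERE zone = 'A'
ORDER BY qty_on_hand
LIMIT 20;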

Build the warehouse-level SKU summary (aggregated across all locations):

CREATE MATERIALIZED VIEW inventory_sku_summary AS
SELECT
    warehouse_id,
    sku_id,
    sku_code,
    description,
    unit_of_measure,
    reorder_point,
    SUM(qty_on_hand)     AS total_qty_on_hand,
    COUNT(*)             AS location_count,
    SUM(CASE WHEN qty_on_hand > 0 THEN 1 ELSE 0 END) AS stocked_location_count,
    MAX(last_movement)   AS last_movement_any_location
FROM live_inventory_positions
GROUP BY warehouse_id, sku_id, sku_code, description,
         unit_of_measure, reorder_point;
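With the summary view in place, a network-wide position lookup for a single SKU becomes a one-row-per-warehouse query (the sku_code value below is a placeholder):

-- Total on-hand for one SKU across all locations
SELECT warehouse_id, total_qty_on_hand, stocked_location_count,
       last_movement_any_location
FROM inventory_sku_summary
WHERE sku_code = 'WIDGET-42';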

Step 3: Add Alerts and Aggregations

Create a reorder alert view that surfaces SKUs below their reorder point in real time:

CREATE MATERIALIZED VIEW reorder_alerts AS
SELECT
    warehouse_id,
    sku_id,
    sku_code,
    description,
    total_qty_on_hand,
    reorder_point,
    reorder_point - total_qty_on_hand AS deficit,
    last_movement_any_location
FROM inventory_sku_summary
WHERE total_qty_on_hand < reorder_point
  AND total_qty_on_hand >= 0;

-- Hourly inventory movement throughput per zone
CREATE MATERIALIZED VIEW zone_throughput_per_hour AS
SELECT
    window_start,
    window_end,
    l.zone,
    t.warehouse_id,
    COUNT(*)                                          AS transaction_count,
    SUM(ABS(t.quantity_delta))                        AS units_moved,
    COUNT(DISTINCT t.sku_id)                          AS distinct_skus_moved,
    SUM(CASE WHEN t.txn_type = 'PICK'    THEN 1 ELSE 0 END) AS picks,
    SUM(CASE WHEN t.txn_type = 'RECEIVE' THEN 1 ELSE 0 END) AS receipts,
    SUM(CASE WHEN t.txn_type = 'RETURN'  THEN 1 ELSE 0 END) AS returns
FROM TUMBLE(inventory_transactions, transaction_time, INTERVAL '1 hour') t
JOIN storage_locations l ON t.location_id = l.location_id
GROUP BY window_start, window_end, l.zone, t.warehouse_id;

-- Negative inventory detection (data quality alert)
CREATE MATERIALIZED VIEW negative_inventory_alerts AS
SELECT
    warehouse_id,
    location_id,
    sku_id,
    sku_code,
    qty_on_hand,
    last_movement
FROM live_inventory_positions
WHERE qty_on_hand < 0;
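To deliver alerts to downstream systems rather than polling the views, a sink can be attached. The sketch below streams reorder_alerts to a Kafka topic; the broker address and topic name are assumptions, and the exact sink options should be checked against the RisingWave version in use:

-- Hypothetical Kafka sink for reorder alerts (broker and topic are placeholders)
CREATE SINK reorder_alerts_kafka FROM reorder_alerts
WITH (
    connector                   = 'kafka',
    properties.bootstrap.server = 'kafka.internal:9092',
    topic                       = 'reorder-alerts',
    primary_key                 = 'warehouse_id,sku_id'
)
FORMAT UPSERT ENCODE JSON;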

How This Compares to Traditional Approaches

| Aspect                       | Scheduled Batch Aggregation | Application-Level Events    | RisingWave CDC         |
|------------------------------|-----------------------------|-----------------------------|------------------------|
| Inventory freshness          | 15–60 minutes               | Near-real-time but fragile  | Milliseconds           |
| WMS code changes required    | No                          | Yes (event hooks)           | No                     |
| Handles schema changes       | Manual rework               | Manual rework               | Automatic via CDC      |
| Handles deletes/corrections  | Often missed                | Depends on implementation   | Captured from WAL      |
| Historical replay            | Not possible                | Not possible                | Yes (from WAL offset)  |
| Aggregate view consistency   | Eventual (stale)            | Complex to guarantee        | Strong (incremental)   |

The CDC approach captures every database change — including corrections, reversals, and administrative adjustments — that application-level event hooks often miss.

FAQ

Does CDC have any performance impact on the WMS database?

The postgres-cdc connector reads the WAL (Write-Ahead Log), which PostgreSQL already writes for its own replication. The additional load is minimal — comparable to adding a single read replica. You should create a dedicated replication slot for RisingWave and monitor slot lag to ensure it does not fall too far behind.
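Slot lag is visible on the PostgreSQL side through the standard pg_replication_slots catalog (run this against the WMS database, not RisingWave):

-- WAL bytes not yet confirmed by each RisingWave replication slot
SELECT slot_name,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(),
                                      confirmed_flush_lsn)) AS slot_lag
FROM pg_replication_slots
WHERE slot_name LIKE 'rw_%';

Note that an inactive slot retains WAL on the source indefinitely, so it is worth alerting on this value.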

How do I handle initial load when the warehouse already has inventory?

RisingWave's CDC connector performs a snapshot of the source table when first configured, then transitions to streaming from the WAL. The initial snapshot is handled automatically, so live_inventory_positions is populated from existing data before streaming begins.

Can I track inventory across multiple warehouses and databases?

Yes. Define multiple CDC sources — one per warehouse database — and include the warehouse_id in your materialized views to partition results by facility. Views like inventory_sku_summary can aggregate across all sources once all CDC streams are ingested.

Key Takeaways

  • RisingWave's postgres-cdc and mysql-cdc connectors capture every WMS database change — including updates and deletes — without requiring any application code changes, enabling real-time inventory views with millisecond freshness.
  • Materialized views like live_inventory_positions and reorder_alerts update incrementally as CDC events arrive, replacing the batch aggregation jobs that introduce staleness in traditional WMS architectures.
  • The CDC approach captures corrections and reversals that application-level event hooks frequently miss, making it a more complete and reliable foundation for inventory accuracy.

Ready to try this yourself? Get started with RisingWave in minutes. Join our Slack community.
