Real-time fulfillment monitoring means tracking every order through the pick → pack → ship → deliver pipeline as it happens, computing time-at-stage metrics, and alerting on SLA breaches seconds after they occur — not after the customer calls to complain.
Why This Matters for E-Commerce
Fulfillment is where the customer's patience runs thinnest. A promised 2-day delivery missed by a day generates a support ticket, a negative review, and sometimes a chargeback. The cost of a single fulfillment failure can far exceed the margin on the original order.
Most e-commerce operations run their fulfillment monitoring on batch analytics: warehouse management system (WMS) exports are processed nightly or hourly, dashboards show where orders were yesterday, and SLA breach alerts fire after the breach has already cost you. By the time your operations team sees that 200 orders are stuck at the packing stage, they're already late.
Streaming fulfillment monitoring changes the response time from hours to seconds:
- A pick-stage order that hasn't moved to pack within 30 minutes triggers an alert
- A shipment that hasn't received a carrier scan within 2 hours of label creation flags for investigation
- Last-mile delivery delays surface on your operations dashboard before the customer checks their tracking link
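The stall rules above reduce to a small classifier. This sketch applies the same thresholds to plain timestamps; the function name and the idea of returning a list of alert tags are illustrative assumptions, not part of the pipeline itself:

```python
from datetime import datetime, timedelta, timezone

# Thresholds from the rules above (assumed defaults; tune per warehouse/carrier)
PICK_TO_PACK_LIMIT = timedelta(minutes=30)
LABEL_TO_SCAN_LIMIT = timedelta(hours=2)

def stall_alerts(picked_at, packed_at, labeled_at, carrier_scanned_at, now):
    """Return the stall alerts that apply to one order right now."""
    alerts = []
    # Picked but not packed within the pick-to-pack limit
    if picked_at and not packed_at and now - picked_at > PICK_TO_PACK_LIMIT:
        alerts.append("pick_stall")
    # Label created but no carrier scan within the scan limit
    if labeled_at and not carrier_scanned_at and now - labeled_at > LABEL_TO_SCAN_LIMIT:
        alerts.append("missing_carrier_scan")
    return alerts

now = datetime(2024, 6, 1, 12, 0, tzinfo=timezone.utc)
print(stall_alerts(now - timedelta(minutes=45), None, None, None, now))  # ['pick_stall']
```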
Real-time fulfillment data also enables better carrier performance analysis: which carriers miss their SLAs, which routes have chronic delays, and how promised delivery windows compare to actuals.
How Streaming SQL Solves This
Each stage of the fulfillment pipeline emits events: a WMS publishes pick events, packing station software publishes pack events, the shipping integration publishes label and carrier scan events, and last-mile carrier APIs publish delivery events. Each event arrives in Kafka.
RisingWave, a PostgreSQL-compatible streaming database, ingests these events and maintains materialized views of order status, time-at-stage metrics, and SLA compliance. When a new event arrives, only the affected order's metrics update — the entire pipeline history doesn't need reprocessing.
This architecture mirrors how SHOPLINE built their real-time analytics layer: all commerce events flow through Kafka into RisingWave, which maintains the serving layer for merchant dashboards with a 76.7% improvement in API response times compared to their previous batch approach.
Step-by-Step Tutorial
Step 1: Data Source
Create a source for fulfillment events from Kafka. Each record represents one stage transition in the fulfillment pipeline.
```sql
CREATE SOURCE fulfillment_events (
    event_id VARCHAR,
    order_id VARCHAR,
    customer_id VARCHAR,
    -- 'picked', 'packed', 'labeled', 'carrier_scanned', 'in_transit',
    -- 'out_for_delivery', 'delivered', 'failed'
    stage VARCHAR,
    warehouse_id VARCHAR,
    carrier VARCHAR,
    tracking_number VARCHAR,
    promised_delivery_ts TIMESTAMPTZ,
    event_ts TIMESTAMPTZ
)
WITH (
    connector = 'kafka',
    topic = 'fulfillment_events',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;
```
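For testing the source end to end, a producer can publish JSON records matching this schema. The helper below only builds the payload; the `event_id` convention and field values are made up for the sketch, and actually sending requires a running broker plus a Kafka client such as kafka-python (an assumption, shown commented out):

```python
import json
from datetime import datetime, timezone

def make_event(order_id, stage, **fields):
    """Build one fulfillment event matching the Kafka source schema above."""
    event = {
        "event_id": f"{order_id}-{stage}",  # assumed ID convention for this sketch
        "order_id": order_id,
        "stage": stage,
        "event_ts": datetime.now(timezone.utc).isoformat(),
    }
    event.update(fields)  # customer_id, warehouse_id, carrier, ...
    return json.dumps(event)

payload = make_event("ord-1001", "picked",
                     customer_id="cust-42", warehouse_id="wh-east-1",
                     carrier="ups", tracking_number="1Z999",
                     promised_delivery_ts="2024-06-03T17:00:00Z")
# To publish (assumes a reachable broker and the kafka-python package):
# from kafka import KafkaProducer
# KafkaProducer(bootstrap_servers="kafka:9092").send("fulfillment_events", payload.encode())
print(payload)
```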
Step 2: Core Materialized View — Order Fulfillment Status
Compute the current stage of each order, time elapsed since each key stage, and whether the promised delivery window is still achievable.
```sql
CREATE MATERIALIZED VIEW order_fulfillment_status AS
SELECT
    order_id,
    customer_id,
    warehouse_id,
    carrier,
    tracking_number,
    MAX(event_ts) FILTER (WHERE stage = 'picked') AS picked_at,
    MAX(event_ts) FILTER (WHERE stage = 'packed') AS packed_at,
    MAX(event_ts) FILTER (WHERE stage = 'labeled') AS labeled_at,
    MAX(event_ts) FILTER (WHERE stage = 'carrier_scanned') AS carrier_scanned_at,
    MAX(event_ts) FILTER (WHERE stage = 'delivered') AS delivered_at,
    MAX(promised_delivery_ts) AS promised_delivery_ts,
    -- Current stage: stage of the most recent event for this order
    (ARRAY_AGG(stage ORDER BY event_ts DESC))[1] AS current_stage,
    -- Time from pick to pack
    EXTRACT(EPOCH FROM (
        MAX(event_ts) FILTER (WHERE stage = 'packed')
        - MAX(event_ts) FILTER (WHERE stage = 'picked')
    )) / 60 AS pick_to_pack_minutes,
    -- Time from label creation to first carrier scan
    EXTRACT(EPOCH FROM (
        MAX(event_ts) FILTER (WHERE stage = 'carrier_scanned')
        - MAX(event_ts) FILTER (WHERE stage = 'labeled')
    )) / 60 AS label_to_scan_minutes
FROM fulfillment_events
GROUP BY order_id, customer_id, warehouse_id, carrier, tracking_number;
```
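The per-order aggregation is easier to see in miniature. This sketch replays the same computation over a Python list of events for one order (stages and timestamps are made up, and arrive out of order on purpose):

```python
from datetime import datetime

events = [  # one order's events, in arrival order (not event-time order)
    {"stage": "packed", "event_ts": datetime(2024, 6, 1, 10, 40)},
    {"stage": "picked", "event_ts": datetime(2024, 6, 1, 10, 15)},
    {"stage": "labeled", "event_ts": datetime(2024, 6, 1, 10, 55)},
]

def latest(stage):
    """Mirrors MAX(event_ts) FILTER (WHERE stage = ...)."""
    ts = [e["event_ts"] for e in events if e["stage"] == stage]
    return max(ts) if ts else None

# Mirrors (ARRAY_AGG(stage ORDER BY event_ts DESC))[1]: stage of the newest event
current_stage = max(events, key=lambda e: e["event_ts"])["stage"]
pick_to_pack_minutes = (latest("packed") - latest("picked")).total_seconds() / 60

print(current_stage, pick_to_pack_minutes)  # labeled 25.0
```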
Step 3: SLA Breach Detection and Carrier Performance
Identify orders breaching their promised delivery SLA and compute carrier-level SLA compliance metrics.
```sql
-- Orders at risk of SLA breach or already breached.
-- In RisingWave, now() in a streaming query may only appear in a temporal
-- filter in the WHERE clause, so breached and at-risk orders are selected
-- by two filtered branches rather than a CASE expression over now().
CREATE MATERIALIZED VIEW sla_breach_alerts AS
SELECT
    order_id,
    customer_id,
    current_stage,
    carrier,
    promised_delivery_ts,
    'breached' AS sla_status
FROM order_fulfillment_status
WHERE delivered_at IS NULL
  AND promised_delivery_ts < now()
UNION ALL
SELECT
    order_id,
    customer_id,
    current_stage,
    carrier,
    promised_delivery_ts,
    'at_risk' AS sla_status
FROM order_fulfillment_status
WHERE delivered_at IS NULL
  AND promised_delivery_ts >= now()
  AND promised_delivery_ts < now() + INTERVAL '4 hours';
```
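The status classification itself is just a threshold check. This sketch applies the same 4-hour at-risk window used above to plain timestamps; the function shape is an illustrative assumption:

```python
from datetime import datetime, timedelta

AT_RISK_WINDOW = timedelta(hours=4)  # window used in the view above

def sla_status(promised, delivered, now):
    """'breached' past the promise, 'at_risk' within 4 hours of it, else 'on_track'."""
    if delivered is not None:
        return "breached" if delivered > promised else "on_track"
    if now > promised:
        return "breached"
    if now > promised - AT_RISK_WINDOW:
        return "at_risk"
    return "on_track"

now = datetime(2024, 6, 1, 12, 0)
print(sla_status(datetime(2024, 6, 1, 14, 0), None, now))  # at_risk
```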
```sql
-- Carrier SLA compliance summary (delivered orders only; the WHERE clause
-- makes per-aggregate delivered_at IS NOT NULL filters unnecessary)
CREATE MATERIALIZED VIEW carrier_performance AS
SELECT
    carrier,
    COUNT(order_id) AS delivered_orders,
    COUNT(order_id) FILTER (WHERE delivered_at <= promised_delivery_ts)
        AS on_time_deliveries,
    COUNT(order_id) FILTER (WHERE delivered_at > promised_delivery_ts)
        AS late_deliveries,
    COUNT(order_id) FILTER (WHERE delivered_at <= promised_delivery_ts)::NUMERIC
        / NULLIF(COUNT(order_id), 0) * 100 AS on_time_rate_pct,
    AVG(pick_to_pack_minutes) AS avg_pick_to_pack_min,
    AVG(label_to_scan_minutes) AS avg_label_to_scan_min
FROM order_fulfillment_status
WHERE delivered_at IS NOT NULL
GROUP BY carrier;
```
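The on-time rate uses NULLIF to avoid dividing by zero before any order has been delivered. The equivalent guard, in miniature:

```python
def on_time_rate_pct(on_time, delivered_total):
    """Percentage of delivered orders that met the promise; None when nothing delivered yet."""
    if delivered_total == 0:  # plays the role of NULLIF(..., 0) yielding NULL
        return None
    return on_time / delivered_total * 100

print(on_time_rate_pct(47, 50))  # 94.0
print(on_time_rate_pct(0, 0))   # None
```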
Step 4: Serving Layer — Sink to Operations Dashboard
Push real-time fulfillment status and SLA alerts to your operations database.
```sql
CREATE SINK order_status_sink
FROM order_fulfillment_status
WITH (
    connector = 'jdbc',
    jdbc.url = 'jdbc:postgresql://opsdb:5432/fulfillment?user=rw&password=secret',
    table.name = 'order_fulfillment_live',
    type = 'upsert',
    primary_key = 'order_id'
);

CREATE SINK sla_alerts_sink
FROM sla_breach_alerts
WITH (
    connector = 'jdbc',
    jdbc.url = 'jdbc:postgresql://opsdb:5432/fulfillment?user=rw&password=secret',
    table.name = 'sla_alerts_live',
    type = 'upsert',
    primary_key = 'order_id'
);
```
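A dashboard backend then reads the sink target table like any other Postgres table. This sketch only builds the statement; executing it assumes the opsdb instance from the sink configuration above and a driver such as psycopg2 (shown commented out), and the 30-minute threshold is an illustrative default:

```python
# Query against the table written by order_status_sink above
SLOW_PICK_QUERY = """
SELECT order_id, current_stage, pick_to_pack_minutes
FROM order_fulfillment_live
WHERE pick_to_pack_minutes > %(threshold)s
ORDER BY pick_to_pack_minutes DESC
LIMIT 20
"""

def fetch_slow_picks(conn, threshold_minutes=30):
    """Return orders whose pick-to-pack time exceeds the threshold."""
    with conn.cursor() as cur:
        cur.execute(SLOW_PICK_QUERY, {"threshold": threshold_minutes})
        return cur.fetchall()

# Usage (assumes a reachable opsdb and the psycopg2 package):
# import psycopg2
# conn = psycopg2.connect("postgresql://rw:secret@opsdb:5432/fulfillment")
# print(fetch_slow_picks(conn))
```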
Comparison Table
| Capability | WMS Batch Export | Streaming SQL |
| --- | --- | --- |
| Order stage visibility | Hourly or daily | Seconds after each event |
| SLA breach detection | After the fact | 4+ hours before breach |
| Carrier performance reporting | Weekly | Continuously updated |
| Stuck order detection | Next-day review | Minutes after stage stall |
| Customer-facing tracking freshness | Delayed | Near real-time |
FAQ
Q: How do I handle out-of-order events (e.g., a carrier scan arriving before the label event)?
RisingWave supports watermarks for event-time processing. For fulfillment, a small out-of-order tolerance (a few minutes) is usually sufficient. The ARRAY_AGG(stage ORDER BY event_ts DESC) pattern in the core view handles ordering by event timestamp rather than ingestion order.
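The event-time-ordering claim is easy to verify in miniature: sorting by event_ts recovers the true stage sequence regardless of arrival order (the data here is made up):

```python
from datetime import datetime

# Events as they arrived: the carrier scan beat the label event to Kafka
arrived = [
    ("carrier_scanned", datetime(2024, 6, 1, 11, 2)),
    ("labeled", datetime(2024, 6, 1, 11, 0)),
    ("picked", datetime(2024, 6, 1, 10, 0)),
]

# Ordering by event timestamp, as ARRAY_AGG(stage ORDER BY event_ts DESC) does,
# recovers the true pipeline sequence independent of arrival order
by_event_time = sorted(arrived, key=lambda e: e[1])
print([stage for stage, _ in by_event_time])  # ['picked', 'labeled', 'carrier_scanned']
```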
Q: Can I track fulfillment performance per warehouse?
Add warehouse_id to the GROUP BY clause in the carrier performance view. You can create a separate warehouse_performance materialized view grouping by warehouse_id to compare pick-to-pack times and SLA compliance across facilities.
Q: How does this integrate with customer-facing tracking pages?
Sink the order_fulfillment_status view to the same PostgreSQL database your customer-facing API reads from. The customer's tracking page then reflects the most recent fulfillment stage within seconds of each warehouse or carrier event.
Key Takeaways
- Streaming fulfillment monitoring tracks every pick → pack → ship → deliver stage in real time, not in batch exports
- SLA breach detection hours in advance gives operations teams time to intervene before customers are impacted
- Carrier performance metrics (on-time rate, average stage durations) update continuously, enabling data-driven carrier decisions
- RisingWave's incremental materialized view model handles high fulfillment event volumes efficiently — only affected orders update on each new event
- The same streaming foundation that powers SHOPLINE's merchant dashboards applies directly to fulfillment operations monitoring

