Cold chain violations — temperature excursions during storage or transport — can render pharmaceutical shipments unusable, spoil perishable food, and expose companies to regulatory liability. With RisingWave, a PostgreSQL-compatible streaming database, you can ingest IoT sensor data and detect excursions within seconds, not hours.
Why Cold Chain Monitoring Must Be Real-Time
The cold chain carries cargo that is sensitive to environmental conditions: vaccines, blood products, fresh produce, frozen goods, and specialty chemicals. Every link in that chain — a refrigerated truck, a cold storage room, a pharmaceutical freezer — must maintain strict temperature ranges. A two-hour excursion that goes undetected can destroy an entire pallet.
Traditional cold chain monitoring collects data in batches: sensors log readings locally, the logger is downloaded at delivery, and any excursion is discovered after the fact. Even connected IoT systems often rely on periodic polling at 5- or 15-minute intervals — meaning a sensor that briefly spiked above the acceptable range may be caught only at the next reading, and the cumulative exposure (Mean Kinetic Temperature, or MKT) is computed overnight.
Real-time monitoring means every sensor reading is evaluated immediately upon arrival. Alerts fire within seconds of an excursion beginning. Cumulative exposure metrics are maintained continuously. And the full sensor stream is available for regulatory compliance audits — all computed with streaming SQL, without a separate monitoring application.
How IoT Cold Chain Streaming Works
IoT temperature sensors transmit readings (temperature, humidity, door-open events) over MQTT or directly to Kafka. RisingWave consumes these readings as a streaming source and maintains materialized views that:
- Track the current condition of every monitored location
- Detect excursions in real time (temperature outside the acceptable range)
- Compute cumulative exposure metrics over rolling time windows
- Generate audit-ready logs of every reading
All of this runs continuously, updated with sub-second latency as each sensor transmission arrives.
Step-by-Step Tutorial
Step 1: Set Up the Data Source
Define the Kafka source for IoT sensor readings and the reference table for monitoring zones:
```sql
-- IoT sensor readings from cold chain assets
CREATE SOURCE cold_chain_readings (
    sensor_id VARCHAR,
    asset_id VARCHAR,        -- truck, container, room ID
    asset_type VARCHAR,      -- 'REEFER_TRUCK','COLD_ROOM','FREEZER','PALLET'
    event_time TIMESTAMPTZ,
    temperature_c DOUBLE PRECISION,
    humidity_pct DOUBLE PRECISION,
    door_open BOOLEAN,
    battery_pct DOUBLE PRECISION,
    signal_strength INTEGER
)
WITH (
    connector = 'kafka',
    topic = 'cold-chain-sensors',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
)
FORMAT PLAIN ENCODE JSON;

-- Zone configuration: acceptable ranges per asset type or specific asset
CREATE TABLE monitoring_zones (
    asset_id VARCHAR PRIMARY KEY,
    asset_name VARCHAR,
    asset_type VARCHAR,
    min_temp_c DOUBLE PRECISION,
    max_temp_c DOUBLE PRECISION,
    min_humidity_pct DOUBLE PRECISION,
    max_humidity_pct DOUBLE PRECISION,
    alert_delay_sec INTEGER, -- excursion must persist this long before alert
    cargo_type VARCHAR,      -- e.g., 'FROZEN_FOOD','PHARMA_2_8','PHARMA_15_25'
    shipment_id VARCHAR
);
```
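Before the views can classify readings, `monitoring_zones` needs rows. A minimal seed, with hypothetical asset IDs and the common 2–8 °C pharma range (adjust values to your cargo specifications):

```sql
-- Illustrative zone rows; asset IDs and shipment IDs are placeholders
INSERT INTO monitoring_zones VALUES
    ('truck-007', 'Reefer Truck 7', 'REEFER_TRUCK',   2.0,   8.0, 20.0, 60.0, 120, 'PHARMA_2_8',  'SHIP-2024-0193'),
    ('room-cs1',  'Cold Room CS-1', 'COLD_ROOM',      2.0,   8.0, 30.0, 65.0, 300, 'PHARMA_2_8',  NULL),
    ('frz-02',    'Freezer 2',      'FREEZER',      -25.0, -15.0, NULL, NULL,  60, 'FROZEN_FOOD', NULL);
```

Values follow the column order of the table definition; assets without humidity requirements leave the humidity bounds NULL.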
Step 2: Build the Core Materialized View
Create a live asset status view that shows the current temperature and humidity for every monitored asset, and whether it is within specification:
```sql
CREATE MATERIALIZED VIEW live_cold_chain_status AS
SELECT
    r.asset_id,
    z.asset_name,
    z.asset_type,
    z.cargo_type,
    z.shipment_id,
    r.temperature_c,
    r.humidity_pct,
    r.door_open,
    r.battery_pct,
    r.event_time AS last_reading,
    NOW() - r.event_time AS reading_age,
    -- In-spec checks
    r.temperature_c >= z.min_temp_c
        AND r.temperature_c <= z.max_temp_c AS temp_in_spec,
    r.humidity_pct >= z.min_humidity_pct
        AND r.humidity_pct <= z.max_humidity_pct AS humidity_in_spec,
    -- Deviation magnitude (0 when in spec)
    GREATEST(
        z.min_temp_c - r.temperature_c,
        r.temperature_c - z.max_temp_c,
        0
    ) AS temp_deviation_c,
    z.min_temp_c,
    z.max_temp_c
FROM (
    -- Latest reading per asset, via the streaming Top-N pattern
    SELECT asset_id, temperature_c, humidity_pct,
           door_open, battery_pct, event_time
    FROM (
        SELECT *, ROW_NUMBER() OVER (
            PARTITION BY asset_id ORDER BY event_time DESC
        ) AS rn
        FROM cold_chain_readings
    ) ranked
    WHERE rn = 1
) r
JOIN monitoring_zones z ON r.asset_id = z.asset_id;
```
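Because the view is PostgreSQL-compatible, any `psql` client or dashboard can query it directly. For example, to list everything currently out of temperature spec, worst deviation first:

```sql
-- Point-in-time query against the continuously maintained view
SELECT asset_id, temperature_c, min_temp_c, max_temp_c, temp_deviation_c
FROM live_cold_chain_status
WHERE NOT temp_in_spec
ORDER BY temp_deviation_c DESC;
```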
Step 3: Add Alerts and Aggregations
Create the excursion alert view — assets currently out of specification — and rolling exposure metrics:
```sql
-- Active excursion alerts
CREATE MATERIALIZED VIEW active_excursion_alerts AS
SELECT
    asset_id,
    asset_name,
    cargo_type,
    shipment_id,
    temperature_c,
    min_temp_c,
    max_temp_c,
    temp_deviation_c,
    door_open,
    last_reading,
    reading_age
FROM live_cold_chain_status
WHERE NOT temp_in_spec
   OR NOT humidity_in_spec
   OR reading_age > INTERVAL '15 minutes'; -- stale sensor alert
```
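Rather than polling this view, a notification service can consume alert rows as they change. A sketch using RisingWave's Kafka sink, with `cold-chain-alerts` as an illustrative topic name:

```sql
-- Deliver alert-state changes to Kafka for the downstream pager/notifier
CREATE SINK excursion_alert_sink FROM active_excursion_alerts
WITH (
    connector = 'kafka',
    topic = 'cold-chain-alerts',
    properties.bootstrap.server = 'kafka:9092',
    primary_key = 'asset_id'
)
FORMAT UPSERT ENCODE JSON;
```

UPSERT format is used because the view is updating: an asset's alert row appears when it goes out of spec and is retracted when it recovers.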
```sql
-- 30-minute window: average and max temperature per asset
-- Used for Mean Kinetic Temperature (MKT) approximation
CREATE MATERIALIZED VIEW temperature_rolling_30min AS
SELECT
    window_start,
    window_end,
    r.asset_id,
    AVG(r.temperature_c) AS avg_temp_c,
    MAX(r.temperature_c) AS max_temp_c,
    MIN(r.temperature_c) AS min_temp_c,
    MAX(r.temperature_c) - MIN(r.temperature_c) AS temp_range_c,
    COUNT(*) AS reading_count,
    SUM(CASE WHEN r.temperature_c > z.max_temp_c
        THEN 1 ELSE 0 END) AS excursion_readings,
    AVG(r.humidity_pct) AS avg_humidity_pct
FROM TUMBLE(cold_chain_readings, event_time, INTERVAL '30 minutes') r
JOIN monitoring_zones z ON r.asset_id = z.asset_id
GROUP BY window_start, window_end, r.asset_id;
```
```sql
-- Door-open events: track duration of potential exposure
CREATE MATERIALIZED VIEW door_open_events AS
SELECT
    asset_id,
    event_time AS opened_at,
    temperature_c AS temp_at_open,
    humidity_pct AS humidity_at_open
FROM cold_chain_readings
WHERE door_open = TRUE;
```
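The view above captures openings but not how long a door stayed open. One sketch for estimating that, assuming sensors report door state on every transmission, pairs each reading with the previous one per asset via a window function:

```sql
-- Sketch: detect door-state transitions and the time between readings
CREATE MATERIALIZED VIEW door_state_transitions AS
SELECT
    asset_id,
    event_time,
    door_open,
    LAG(door_open) OVER (
        PARTITION BY asset_id ORDER BY event_time
    ) AS prev_door_open,
    event_time - LAG(event_time) OVER (
        PARTITION BY asset_id ORDER BY event_time
    ) AS time_since_prev_reading
FROM cold_chain_readings;
```

Filtering for `prev_door_open AND NOT door_open` yields closing events; summing `time_since_prev_reading` over the open stretch approximates exposure duration.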
```sql
-- Fleet cold chain compliance summary: % of 5-minute windows in spec
CREATE MATERIALIZED VIEW compliance_summary AS
SELECT
    window_start,
    window_end,
    z.cargo_type,
    COUNT(DISTINCT r.asset_id) AS monitored_assets,
    COUNT(DISTINCT CASE
        WHEN r.temperature_c BETWEEN z.min_temp_c AND z.max_temp_c
        THEN r.asset_id END) AS compliant_assets,
    ROUND(
        100.0 * COUNT(DISTINCT CASE
            WHEN r.temperature_c BETWEEN z.min_temp_c AND z.max_temp_c
            THEN r.asset_id END)::NUMERIC /
        NULLIF(COUNT(DISTINCT r.asset_id), 0),
        1) AS compliance_rate_pct
FROM TUMBLE(cold_chain_readings, event_time, INTERVAL '5 minutes') r
JOIN monitoring_zones z ON r.asset_id = z.asset_id
GROUP BY window_start, window_end, z.cargo_type;
```
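A fleet dashboard can read this summary with an ordinary query; for example, the most recent compliance rates per cargo type:

```sql
-- Latest 5-minute compliance windows, newest first
SELECT window_start, cargo_type, compliance_rate_pct,
       compliant_assets, monitored_assets
FROM compliance_summary
ORDER BY window_start DESC, cargo_type
LIMIT 20;
```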
How This Compares to Traditional Approaches
| Aspect | Data Logger (Download on Arrival) | Polled Monitoring (5-min intervals) | RisingWave Streaming |
| --- | --- | --- | --- |
| Excursion detection | After delivery | Up to 5 minutes after event | Within seconds |
| Regulatory audit trail | Manual report | Batch export | Continuous materialized log |
| Stale sensor detection | Not possible | Depends on poll logic | Native (`reading_age` view) |
| Fleet-wide compliance view | Manual aggregation | Periodic refresh | Always current |
| Alert latency | Hours to days | Up to 5 minutes | Sub-second |
| MKT computation | Post-trip, manual | Scheduled job | Rolling window, continuous |
FAQ
How do I handle sensors that transmit intermittently over cellular networks?
The `reading_age` column in `live_cold_chain_status` shows how old the latest reading is for each asset. The `active_excursion_alerts` view flags assets where `reading_age` exceeds 15 minutes, surfacing connectivity issues alongside temperature excursions. For gap-filling, you can use session window functions to detect silent periods and interpolate expected readings.
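As one sketch of gap detection, a window function can compute the silence between consecutive transmissions per asset:

```sql
-- Sketch: per-asset transmission gaps; a large gap marks a silent period
CREATE MATERIALIZED VIEW transmission_gaps AS
SELECT
    asset_id,
    event_time,
    event_time - LAG(event_time) OVER (
        PARTITION BY asset_id ORDER BY event_time
    ) AS gap_since_prev
FROM cold_chain_readings;
```

Filtering for `gap_since_prev > INTERVAL '10 minutes'` (threshold illustrative) surfaces the silent periods worth investigating.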
Can RisingWave store the complete sensor history for regulatory compliance?
Yes. The raw `cold_chain_readings` source retains data based on your Kafka retention policy. For long-term archival, define an Iceberg sink that writes all sensor readings to object storage. Your compliance queries can then run against either the live RisingWave views or the Iceberg archive, depending on the time range needed.
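A sketch of such an archive sink; the warehouse path, database, and table names are placeholders, and catalog and object-store credential options (omitted here) depend on your deployment:

```sql
-- Append every raw reading to an Iceberg table for long-term audit
CREATE SINK cold_chain_archive FROM cold_chain_readings
WITH (
    connector = 'iceberg',
    type = 'append-only',
    warehouse.path = 's3a://compliance-archive/warehouse',
    database.name = 'cold_chain',
    table.name = 'sensor_readings'
);
```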
How do I compute Mean Kinetic Temperature (MKT) in RisingWave?
MKT requires computing exp(-ΔH/(R*T)) for each reading where ΔH is the activation energy and T is temperature in Kelvin. This can be expressed as a SQL expression using the EXP and LN functions. You can define a materialized view that computes an approximation of MKT over any rolling time window, updated continuously as new readings arrive.
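A sketch of that computation, assuming the conventional activation energy ΔH = 83.144 kJ/mol (so ΔH/R ≈ 10,000 K) and a 24-hour tumbling window:

```sql
-- MKT per asset per day; 10000.0 approximates ΔH/R in Kelvin,
-- and temperature_c + 273.15 converts readings to Kelvin
CREATE MATERIALIZED VIEW mkt_daily AS
SELECT
    window_start,
    window_end,
    asset_id,
    10000.0 / (-LN(AVG(EXP(-10000.0 / (temperature_c + 273.15)))))
        - 273.15 AS mkt_c
FROM TUMBLE(cold_chain_readings, event_time, INTERVAL '24 hours')
GROUP BY window_start, window_end, asset_id;
```

This is the standard MKT formula, MKT = (ΔH/R) / (−ln(mean of e^(−ΔH/(R·T)))), expressed directly in SQL and refreshed as readings arrive.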
Key Takeaways
- RisingWave ingests IoT sensor streams from Kafka and maintains always-current materialized views of cold chain status, detecting temperature excursions within seconds of each sensor transmission — not at the next poll or batch run.
- The `active_excursion_alerts` view surfaces both out-of-spec conditions and stale sensors (connectivity failures) in the same interface, reducing the operational burden of monitoring multiple alert channels.
- Rolling window aggregations like `temperature_rolling_30min` and `compliance_summary` provide the data foundation for regulatory reporting and MKT computation without any separate scheduled jobs.
Ready to try this yourself? Get started with RisingWave in minutes. Join our Slack community.

