Real-Time Cold Chain Monitoring with IoT and SQL

Cold chain violations — temperature excursions during storage or transport — can render pharmaceutical shipments unusable, spoil perishable food, and expose companies to regulatory liability. With RisingWave, a PostgreSQL-compatible streaming database, you can ingest IoT sensor data and detect excursions within seconds, not hours.

Why Cold Chain Monitoring Must Be Real-Time

The cold chain carries cargo that is sensitive to environmental conditions: vaccines, blood products, fresh produce, frozen goods, and specialty chemicals. Every link in that chain — a refrigerated truck, a cold storage room, a pharmaceutical freezer — must maintain strict temperature ranges. A two-hour excursion that goes undetected can destroy an entire pallet.

Traditional cold chain monitoring collects data in batches: sensors log readings locally, the logger is downloaded at delivery, and any excursion is discovered after the fact. Even connected IoT systems often poll at 5- or 15-minute intervals, so a brief spike between polls can be missed entirely, a sustained excursion is caught only at the next reading, and the cumulative exposure metric (Mean Kinetic Temperature, or MKT) is computed overnight.

Real-time monitoring means every sensor reading is evaluated immediately upon arrival. Alerts fire within seconds of an excursion beginning. Cumulative exposure metrics are maintained continuously. And the full sensor stream is available for regulatory compliance audits — all computed with streaming SQL, without a separate monitoring application.

How IoT Cold Chain Streaming Works

IoT temperature sensors transmit readings (temperature, humidity, door-open events) over MQTT or directly to Kafka. RisingWave consumes these readings as a streaming source and maintains materialized views that:

  1. Track the current condition of every monitored location
  2. Detect excursions in real time (temperature outside the acceptable range)
  3. Compute cumulative exposure metrics over rolling time windows
  4. Generate audit-ready logs of every reading

All of this runs continuously, updated with sub-second latency as each sensor transmission arrives.

Step-by-Step Tutorial

Step 1: Set Up the Data Source

Define the Kafka source for IoT sensor readings and the reference table for monitoring zones:

-- IoT sensor readings from cold chain assets
CREATE SOURCE cold_chain_readings (
    sensor_id        VARCHAR,
    asset_id         VARCHAR,   -- truck, container, room ID
    asset_type       VARCHAR,   -- 'REEFER_TRUCK','COLD_ROOM','FREEZER','PALLET'
    event_time       TIMESTAMPTZ,
    temperature_c    DOUBLE PRECISION,
    humidity_pct     DOUBLE PRECISION,
    door_open        BOOLEAN,
    battery_pct      DOUBLE PRECISION,
    signal_strength  INTEGER
)
WITH (
    connector    = 'kafka',
    topic        = 'cold-chain-sensors',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
)
FORMAT PLAIN ENCODE JSON;

-- Zone configuration: acceptable ranges per asset type or specific asset
CREATE TABLE monitoring_zones (
    asset_id         VARCHAR PRIMARY KEY,
    asset_name       VARCHAR,
    asset_type       VARCHAR,
    min_temp_c       DOUBLE PRECISION,
    max_temp_c       DOUBLE PRECISION,
    min_humidity_pct DOUBLE PRECISION,
    max_humidity_pct DOUBLE PRECISION,
    alert_delay_sec  INTEGER,   -- excursion must persist this long before alert
    cargo_type       VARCHAR,   -- e.g., 'FROZEN_FOOD','PHARMA_2_8','PHARMA_15_25'
    shipment_id      VARCHAR
);
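
The views in the following steps join every reading against monitoring_zones, so they return rows only for assets that have a zone entry. As a minimal sketch, the statement below seeds two zones. All values here (asset IDs, names, temperature and humidity limits, delays, shipment IDs) are hypothetical placeholders, not recommended limits; use the ranges from your own quality specifications.

```sql
-- Hypothetical example zones; every value below is illustrative only
INSERT INTO monitoring_zones (
    asset_id, asset_name, asset_type,
    min_temp_c, max_temp_c,
    min_humidity_pct, max_humidity_pct,
    alert_delay_sec, cargo_type, shipment_id
) VALUES
    ('TRUCK-001', 'Reefer Truck 1', 'REEFER_TRUCK',
       2.0,   8.0, 30.0,  60.0, 120, 'PHARMA_2_8',  'SHP-1001'),
    ('FRZ-007',   'Freezer 7',      'FREEZER',
     -25.0, -18.0,  0.0, 100.0, 300, 'FROZEN_FOOD', 'SHP-1002');
```

Because monitoring_zones is an ordinary table, limits can be changed later with UPDATE, and the materialized views that join against it pick up the new values.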

Step 2: Build the Core Materialized View

Create a live asset status view that shows the current temperature and humidity for every monitored asset, and whether it is within specification:

CREATE MATERIALIZED VIEW live_cold_chain_status AS
SELECT
    r.asset_id,
    z.asset_name,
    z.asset_type,
    z.cargo_type,
    z.shipment_id,
    r.temperature_c,
    r.humidity_pct,
    r.door_open,
    r.battery_pct,
    r.event_time                        AS last_reading,
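    -- Caveat: RisingWave may reject NOW() outside WHERE/HAVING/ON/FROM in a
    -- streaming query; if so, compute reading_age at query time instead
    NOW() - r.event_time                AS reading_age,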
    -- In-spec check
    r.temperature_c >= z.min_temp_c
    AND r.temperature_c <= z.max_temp_c AS temp_in_spec,
    r.humidity_pct >= z.min_humidity_pct
    AND r.humidity_pct <= z.max_humidity_pct AS humidity_in_spec,
    -- Deviation magnitude
    GREATEST(
        z.min_temp_c - r.temperature_c,
        r.temperature_c - z.max_temp_c,
        0
    )                                   AS temp_deviation_c,
    z.min_temp_c,
    z.max_temp_c
FROM (
    SELECT DISTINCT ON (asset_id)
        asset_id, temperature_c, humidity_pct,
        door_open, battery_pct, event_time
    FROM cold_chain_readings
    ORDER BY asset_id, event_time DESC
) r
JOIN monitoring_zones z ON r.asset_id = z.asset_id;
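
Once the view exists, it can be queried like any PostgreSQL table. The ad hoc query below is one way a dashboard or on-call engineer might list the assets that are currently furthest out of range; it uses only columns defined in live_cold_chain_status above.

```sql
-- Assets currently outside their temperature range, worst deviation first
SELECT
    asset_id,
    asset_name,
    cargo_type,
    temperature_c,
    min_temp_c,
    max_temp_c,
    temp_deviation_c,
    last_reading
FROM live_cold_chain_status
WHERE NOT temp_in_spec
ORDER BY temp_deviation_c DESC;
```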

Step 3: Add Alerts and Aggregations

Create the excursion alert view (assets currently out of specification or silent for too long), plus windowed temperature aggregates, a door-open log, and a fleet compliance summary:

-- Active excursion alerts
CREATE MATERIALIZED VIEW active_excursion_alerts AS
SELECT
    asset_id,
    asset_name,
    cargo_type,
    shipment_id,
    temperature_c,
    min_temp_c,
    max_temp_c,
    temp_deviation_c,
    door_open,
    last_reading,
    reading_age
FROM live_cold_chain_status
WHERE NOT temp_in_spec
   OR NOT humidity_in_spec
   OR reading_age > INTERVAL '15 minutes';  -- stale sensor alert

-- 30-minute tumbling windows: average, min, and max temperature per asset
-- A coarse input for Mean Kinetic Temperature (MKT) approximation
CREATE MATERIALIZED VIEW temperature_rolling_30min AS
SELECT
    window_start,
    window_end,
    r.asset_id,                    -- qualified: asset_id exists in both inputs
    AVG(r.temperature_c)           AS avg_temp_c,
    MAX(r.temperature_c)           AS max_temp_c,
    MIN(r.temperature_c)           AS min_temp_c,
    MAX(r.temperature_c) - MIN(r.temperature_c) AS temp_range_c,
    COUNT(*)                       AS reading_count,
    -- Count readings outside the zone's range in either direction
    SUM(CASE WHEN r.temperature_c > z.max_temp_c
               OR r.temperature_c < z.min_temp_c
             THEN 1 ELSE 0 END)    AS excursion_readings,
    AVG(r.humidity_pct)            AS avg_humidity_pct
FROM TUMBLE(cold_chain_readings, event_time, INTERVAL '30 minutes') r
JOIN monitoring_zones z ON r.asset_id = z.asset_id
GROUP BY window_start, window_end, r.asset_id;

-- Door-open readings: each reading taken while a door was open
-- (a starting point for estimating exposure duration)
CREATE MATERIALIZED VIEW door_open_events AS
SELECT
    asset_id,
    event_time          AS opened_at,
    temperature_c       AS temp_at_open,
    humidity_pct        AS humidity_at_open
FROM cold_chain_readings
WHERE door_open = TRUE;

-- Fleet cold chain compliance summary: % of assets, per cargo type,
-- with no out-of-range temperature reading in each 5-minute window
CREATE MATERIALIZED VIEW compliance_summary AS
SELECT
    window_start,
    window_end,
    z.cargo_type,
    COUNT(DISTINCT r.asset_id)                       AS monitored_assets,
    -- An asset is compliant only if none of its readings in the window is out of range
    COUNT(DISTINCT r.asset_id) - COUNT(DISTINCT CASE
        WHEN r.temperature_c NOT BETWEEN z.min_temp_c AND z.max_temp_c
        THEN r.asset_id END)                         AS compliant_assets,
    ROUND(
        100.0 * (COUNT(DISTINCT r.asset_id) - COUNT(DISTINCT CASE
            WHEN r.temperature_c NOT BETWEEN z.min_temp_c AND z.max_temp_c
            THEN r.asset_id END))::NUMERIC /
        NULLIF(COUNT(DISTINCT r.asset_id), 0),
    1)                                               AS compliance_rate_pct
FROM TUMBLE(cold_chain_readings, event_time, INTERVAL '5 minutes') r
JOIN monitoring_zones z ON r.asset_id = z.asset_id
GROUP BY window_start, window_end, z.cargo_type;
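
The alert_delay_sec column in monitoring_zones is defined in Step 1 but not used by any of the views above. One possible way to apply it, sketched here under stated assumptions, is to track each asset's most recent in-spec reading and alert only when the latest out-of-spec reading is at least alert_delay_sec later. The view names last_in_spec_reading and persistent_excursion_alerts are hypothetical. The measured duration runs from the last good reading, so it slightly overestimates the true excursion length, and an asset that has never reported an in-spec reading will not appear.

```sql
-- Most recent reading per asset that was inside its temperature range
CREATE MATERIALIZED VIEW last_in_spec_reading AS
SELECT
    r.asset_id,
    MAX(r.event_time) AS last_ok_at
FROM cold_chain_readings r
JOIN monitoring_zones z ON r.asset_id = z.asset_id
WHERE r.temperature_c BETWEEN z.min_temp_c AND z.max_temp_c
GROUP BY r.asset_id;

-- Assets whose current excursion has lasted at least alert_delay_sec
CREATE MATERIALIZED VIEW persistent_excursion_alerts AS
SELECT
    s.asset_id,
    s.asset_name,
    s.shipment_id,
    s.temperature_c,
    s.temp_deviation_c,
    l.last_ok_at,
    s.last_reading,
    s.last_reading - l.last_ok_at AS out_of_spec_for
FROM live_cold_chain_status s
JOIN last_in_spec_reading l ON s.asset_id = l.asset_id
JOIN monitoring_zones z     ON s.asset_id = z.asset_id
WHERE NOT s.temp_in_spec
  -- Compare elapsed seconds since the last good reading with the zone's delay
  AND EXTRACT(EPOCH FROM (s.last_reading - l.last_ok_at)) >= z.alert_delay_sec;
```

Filtering on persistence trades a little latency for fewer false alarms, for example when a door is opened briefly during loading.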

How This Compares to Traditional Approaches

Aspect | Data Logger (Download on Arrival) | Polled Monitoring (5-min intervals) | RisingWave Streaming
--- | --- | --- | ---
Excursion detection | After delivery | Up to 5 minutes after event | Within seconds
Regulatory audit trail | Manual report | Batch export | Continuous materialized log
Stale sensor detection | Not possible | Depends on poll logic | Native (reading_age column)
Fleet-wide compliance view | Manual aggregation | Periodic refresh | Always current
Alert latency | Hours to days | Up to 5 minutes | Sub-second
MKT computation | Post-trip, manual | Scheduled job | Rolling window, continuous

FAQ

How do I handle sensors that transmit intermittently over cellular networks?

The reading_age column in live_cold_chain_status shows how old the latest reading is for each asset. The active_excursion_alerts view flags assets where reading_age > 15 minutes, surfacing connectivity issues alongside temperature excursions. For gap-filling, you can use session window functions to detect silent periods and interpolate expected readings.
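
As a rough sketch of detecting silent periods after the fact, the view below compares each reading with the previous one from the same asset. Note that it uses the LAG window function rather than session windows, which is a simpler substitute and assumes your RisingWave version supports LAG in streaming queries. The view name reading_gaps is hypothetical, and the 15-minute threshold simply mirrors the stale-sensor threshold used in active_excursion_alerts.

```sql
-- Gaps longer than 15 minutes between consecutive readings of one asset
CREATE MATERIALIZED VIEW reading_gaps AS
SELECT
    asset_id,
    prev_event_time              AS gap_start,
    event_time                   AS gap_end,
    event_time - prev_event_time AS gap_length
FROM (
    SELECT
        asset_id,
        event_time,
        -- Timestamp of the previous reading for the same asset
        LAG(event_time) OVER (
            PARTITION BY asset_id
            ORDER BY event_time
        ) AS prev_event_time
    FROM cold_chain_readings
) t
WHERE event_time - prev_event_time > INTERVAL '15 minutes';
```

A gap only appears in this view once the next reading arrives, so it complements the live stale-sensor alert rather than replacing it.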

Can RisingWave store the complete sensor history for regulatory compliance?

Yes. The raw cold_chain_readings source retains data based on your Kafka retention policy. For long-term archival, define an Iceberg sink that writes all sensor readings to object storage. Your compliance queries can then run against either the live RisingWave views or the Iceberg archive, depending on the time range needed.
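
A minimal sketch of such a sink is shown below. The sink name, bucket, endpoint, region, credentials, catalog, database, and table names are all placeholders invented for illustration, and the example assumes the target Iceberg table already exists. Option names can differ between RisingWave versions and catalog types, so check the Iceberg sink documentation for your release before using it.

```sql
-- Append every raw sensor reading to an Iceberg table on object storage
CREATE SINK cold_chain_archive AS
SELECT
    sensor_id, asset_id, asset_type, event_time,
    temperature_c, humidity_pct, door_open,
    battery_pct, signal_strength
FROM cold_chain_readings
WITH (
    connector      = 'iceberg',
    type           = 'append-only',   -- readings are never updated or deleted
    warehouse.path = 's3://example-bucket/cold-chain-warehouse',  -- placeholder
    s3.endpoint    = 'https://s3.us-east-1.amazonaws.com',        -- placeholder
    s3.region      = 'us-east-1',                                 -- placeholder
    s3.access.key  = '<access-key>',
    s3.secret.key  = '<secret-key>',
    catalog.type   = 'storage',
    catalog.name   = 'cold_chain',        -- placeholder
    database.name  = 'compliance',        -- placeholder
    table.name     = 'sensor_readings'    -- placeholder
);
```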

How do I compute Mean Kinetic Temperature (MKT) in RisingWave?

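MKT is a single temperature that reflects the cumulative thermal stress of a varying temperature history. For n evenly spaced readings it is MKT = (ΔH/R) / (-ln((1/n) * Σ exp(-ΔH/(R*T_i)))), where ΔH is the activation energy, R is the gas constant, and T_i is each reading in Kelvin. Because the inner term is an average, it maps directly onto SQL: take AVG over EXP of each reading, then LN of the result. A materialized view can maintain this over a time window and update it continuously as new readings arrive; the result is an approximation whenever readings are not evenly spaced.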
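
As a rough sketch of that calculation, the view below computes MKT per asset over 24-hour hopping windows that advance every hour. The view name mkt_24h, the window sizes, and the constant 10000 K for ΔH/R (the conventional ΔH of about 83.144 kJ/mol divided by R = 8.3144 J/(mol·K)) are assumptions for illustration; substitute the activation energy appropriate to your product.

```sql
-- Approximate Mean Kinetic Temperature per asset, 24-hour window, hourly hop
CREATE MATERIALIZED VIEW mkt_24h AS
SELECT
    window_start,
    window_end,
    asset_id,
    COUNT(*) AS reading_count,
    -- MKT in Kelvin = (dH/R) / -ln(mean(exp(-(dH/R) / T_kelvin)))
    -- dH/R is assumed to be 10000 K; 273.15 converts between Celsius and Kelvin
    10000.0::DOUBLE PRECISION
        / (-LN(AVG(EXP(-10000.0::DOUBLE PRECISION / (temperature_c + 273.15)))))
        - 273.15 AS mkt_c
FROM HOP(cold_chain_readings, event_time,
         INTERVAL '1 hour', INTERVAL '24 hours')
WHERE temperature_c IS NOT NULL
GROUP BY window_start, window_end, asset_id;
```

Each reading is weighted equally here, so sensors that transmit at irregular intervals will skew the result toward periods with denser readings.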

Key Takeaways

  • RisingWave ingests IoT sensor streams from Kafka and maintains always-current materialized views of cold chain status, detecting temperature excursions within seconds of each sensor transmission — not at the next poll or batch run.
  • The active_excursion_alerts view surfaces both out-of-spec conditions and stale sensors (connectivity failures) in the same interface, reducing the operational burden of monitoring multiple alert channels.
  • Rolling window aggregations like temperature_rolling_30min and compliance_summary provide the data foundation for regulatory reporting and MKT computation without any separate scheduled jobs.

