Real-Time Livestock Monitoring with Streaming SQL

Smart livestock farms attach RFID ear tags and biosensors to each animal to stream temperature, activity, and location data continuously. With RisingWave, a PostgreSQL-compatible streaming database, you can write SQL queries that detect fever, estrus, or injury in real time — enabling faster intervention and better animal welfare outcomes.

Why Livestock Monitoring Matters

Animal health events are time-sensitive. A cow with a temperature above 39.5 °C for more than 30 minutes is likely febrile; delay in treatment leads to production losses and potential herd spread. Similarly, detecting estrus windows from accelerometer activity spikes can improve conception rates significantly.

Traditional farm management software collects data in daily or hourly batch reports. By the time a vet reviews a flag, the treatment window may have passed. The volume and velocity of modern ear-tag telemetry — thousands of readings per minute for a large herd — demands a streaming approach.

Relevant signals include:

  • Body temperature (°C) from subcutaneous implants or ear-tag sensors
  • Activity index from tri-axis accelerometers (steps per minute, lying/standing ratio)
  • Animal ID from passive RFID tags (ISO 11784/11785)
  • Location from UWB or BLE triangulation (zone ID)
  • Rumination time (minutes per hour) — a drop often precedes illness

How Streaming SQL Solves This

In RisingWave, you define sources that connect to Kafka or MQTT brokers, then write CREATE MATERIALIZED VIEW statements using standard window functions and joins. RisingWave maintains these views incrementally as new telemetry arrives, so each query reads precomputed, up-to-date results instead of rescanning raw data.

There is no need to write a Flink job or manage a complex stream-processing topology. Veterinary staff can query the system with any PostgreSQL client, and alerts flow automatically to sinks like Kafka topics, webhooks, or databases.

Step-by-Step Tutorial

Step 1: Data Source

Ingest ear-tag telemetry from a Kafka topic. Each record carries the animal's RFID tag ID and farm-internal ID, species, zone, body temperature, accelerometer-derived activity index, rumination minutes, and an event timestamp.

CREATE SOURCE livestock_telemetry (
    tag_id          VARCHAR,        -- ISO 11784 RFID tag
    animal_id       VARCHAR,        -- farm internal ID
    species         VARCHAR,        -- cattle, sheep, pig, etc.
    zone_id         VARCHAR,        -- barn/paddock zone
    temp_celsius    DOUBLE PRECISION,
    activity_index  DOUBLE PRECISION,  -- steps/min equivalent
    rumination_min  DOUBLE PRECISION,  -- rumination minutes in last hour
    event_time      TIMESTAMPTZ
)
WITH (
    connector      = 'kafka',
    topic          = 'farm.livestock.telemetry',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
)
-- JSON fields are mapped to the columns above by name, including event_time.
FORMAT PLAIN ENCODE JSON;

Step 2: Core View

Compute 30-minute tumbling-window baselines per animal, then compare each reading against the baseline of the window it falls in. An elevated temperature, a sharp drop in activity, or low rumination each raises its own boolean flag.

CREATE MATERIALIZED VIEW animal_baselines AS
SELECT
    animal_id,
    species,
    window_start,
    window_end,
    AVG(temp_celsius)   AS avg_temp,
    AVG(activity_index) AS avg_activity,
    AVG(rumination_min) AS avg_rumination,
    COUNT(*)            AS reading_count
FROM TUMBLE(livestock_telemetry, event_time, INTERVAL '30 MINUTES')
GROUP BY animal_id, species, window_start, window_end;
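
Once the view exists, it can be inspected like an ordinary table from any PostgreSQL client. A minimal sketch, assuming a placeholder animal ID ('COW-0042') that is not part of the original tutorial:

```sql
-- Show the five most recent 30-minute baselines for one animal.
-- 'COW-0042' is a hypothetical ID used only for illustration.
SELECT
    window_start,
    window_end,
    avg_temp,
    avg_activity,
    avg_rumination,
    reading_count
FROM animal_baselines
WHERE animal_id = 'COW-0042'
ORDER BY window_start DESC
LIMIT 5;
```

A low reading_count for a window is worth checking before trusting its averages, since a baseline built from one or two readings is noisy.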

CREATE MATERIALIZED VIEW animal_health_scores AS
SELECT
    t.tag_id,
    t.animal_id,
    t.species,
    t.zone_id,
    t.event_time,
    t.temp_celsius,
    t.activity_index,
    t.rumination_min,
    b.avg_temp,
    b.avg_activity,
    -- Fever flag: temperature above species threshold
    CASE t.species
        WHEN 'cattle' THEN t.temp_celsius > 39.5
        WHEN 'sheep'  THEN t.temp_celsius > 40.0
        WHEN 'pig'    THEN t.temp_celsius > 40.5
        ELSE t.temp_celsius > 40.0
    END AS fever_flag,
    -- Activity anomaly: drop > 40% from 30-min baseline
    CASE
        WHEN b.avg_activity > 0
        THEN (b.avg_activity - t.activity_index) / b.avg_activity > 0.4
        ELSE FALSE
    END AS low_activity_flag,
    -- Rumination anomaly: below 30 min/hr suggests digestive issue
    t.rumination_min < 30 AS low_rumination_flag
FROM livestock_telemetry t
LEFT JOIN animal_baselines b
    ON  t.animal_id   = b.animal_id
    AND t.event_time >= b.window_start
    AND t.event_time <  b.window_end;
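
The fever flag above fires on a single hot reading, whereas the motivating example describes fever as a temperature that stays above 39.5 °C for 30 minutes. One way to approximate that sustained condition is to require that the minimum temperature in a window exceeds the threshold. The sketch below assumes a hypothetical view name and an arbitrary minimum of three readings per window; neither comes from the original tutorial.

```sql
-- Sketch: cattle whose temperature stayed above 39.5 °C for an entire
-- 30-minute tumbling window. The view name and the 3-reading minimum are
-- illustrative assumptions.
CREATE MATERIALIZED VIEW sustained_fever_cattle AS
SELECT
    animal_id,
    window_start,
    window_end,
    MIN(temp_celsius) AS min_temp,
    MAX(temp_celsius) AS max_temp,
    COUNT(*)          AS reading_count
FROM TUMBLE(livestock_telemetry, event_time, INTERVAL '30 MINUTES')
WHERE species = 'cattle'
GROUP BY animal_id, window_start, window_end
-- If the lowest reading exceeds the threshold, every reading in the window did.
HAVING MIN(temp_celsius) > 39.5
   AND COUNT(*) >= 3;
```

Because tumbling windows are aligned to fixed boundaries, a fever that begins mid-window is only confirmed once the next full window has stayed above the threshold.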

Step 3: Alerts and Sinks

Combine flags into a health alert and route to the farm management system.

CREATE MATERIALIZED VIEW health_alerts AS
SELECT
    tag_id,
    animal_id,
    species,
    zone_id,
    event_time,
    temp_celsius,
    activity_index,
    rumination_min,
    fever_flag,
    low_activity_flag,
    low_rumination_flag,
    CASE
        WHEN fever_flag AND low_activity_flag THEN 'CRITICAL'
        WHEN fever_flag OR (low_activity_flag AND low_rumination_flag) THEN 'WARNING'
        ELSE 'INFO'
    END AS severity
FROM animal_health_scores
WHERE fever_flag OR low_activity_flag OR low_rumination_flag;

CREATE SINK health_alerts_sink
FROM health_alerts
WITH (
    connector  = 'kafka',
    topic      = 'farm.health.alerts',
    properties.bootstrap.server = 'kafka:9092'
)
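-- health_alerts can receive updates when a window's baseline changes, so it
-- is not append-only. force_append_only turns updates into plain inserts and
-- drops deletes; verify this option against your RisingWave version.
FORMAT PLAIN ENCODE JSON (
    force_append_only = 'true'
);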
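
Besides feeding the sink, the health_alerts view can be queried ad hoc from any PostgreSQL client such as psql. The following is a sketch of the kind of summary a vet might run; the one-hour lookback is an arbitrary choice for illustration.

```sql
-- Count alerts per zone and severity over the last hour.
SELECT
    zone_id,
    severity,
    COUNT(*)                  AS alert_count,
    COUNT(DISTINCT animal_id) AS animals_affected
FROM health_alerts
WHERE event_time > NOW() - INTERVAL '1 hour'
GROUP BY zone_id, severity
ORDER BY zone_id, severity;
```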

Comparison Table

Approach                 | Detection Latency | Requires Custom Code | Queryable by Vets
Daily batch reports      | 12–24 hours       | No                   | Yes (slow)
Hourly ETL pipeline      | 1 hour            | Yes                  | Yes
Custom stream processor  | Seconds           | Yes (complex)        | No (needs adapter)
RisingWave streaming SQL | Seconds           | No                   | Yes (PostgreSQL)

FAQ

Q: Can I join ear-tag data with a reference table of animal records?
Yes. Create a regular PostgreSQL-compatible table in RisingWave with animal metadata (breed, age, weight, owner) and join it with the streaming view. The join is evaluated incrementally as new telemetry arrives.
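
As a rough illustration of such a join, the sketch below enriches alerts with animal metadata. The table animal_records, the view health_alerts_enriched, and all of the metadata column names are hypothetical.

```sql
-- Hypothetical reference table; names and columns are illustrative only.
CREATE TABLE animal_records (
    animal_id   VARCHAR PRIMARY KEY,
    breed       VARCHAR,
    birth_date  DATE,
    weight_kg   DOUBLE PRECISION,
    owner_name  VARCHAR
);

-- Enrich each alert with the animal's metadata. LEFT JOIN keeps alerts for
-- animals that have no record yet (their metadata columns are NULL).
CREATE MATERIALIZED VIEW health_alerts_enriched AS
SELECT
    a.animal_id,
    a.event_time,
    a.severity,
    a.temp_celsius,
    r.breed,
    r.birth_date,
    r.weight_kg,
    r.owner_name
FROM health_alerts a
LEFT JOIN animal_records r
    ON a.animal_id = r.animal_id;
```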

Q: How do I handle different temperature thresholds for different breeds?
Store breed-specific thresholds in a lookup table and join it to the health scores view. The CASE expression in the materialized view can then reference the looked-up threshold value.
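
A minimal sketch of the lookup-table approach follows. Since the telemetry stream carries no breed column, it assumes a hypothetical animal_breeds table that maps each animal to its breed, plus a hypothetical fever_thresholds table; the fallback of 40.0 °C mirrors the default branch of the tutorial's CASE expression.

```sql
-- Hypothetical lookup tables; names and columns are illustrative only.
CREATE TABLE animal_breeds (
    animal_id  VARCHAR PRIMARY KEY,
    breed      VARCHAR
);

CREATE TABLE fever_thresholds (
    species        VARCHAR,
    breed          VARCHAR,
    fever_celsius  DOUBLE PRECISION,
    PRIMARY KEY (species, breed)
);

-- Compare each reading with the threshold for the animal's species and breed.
CREATE MATERIALIZED VIEW breed_fever_flags AS
SELECT
    t.animal_id,
    t.species,
    ab.breed,
    t.event_time,
    t.temp_celsius,
    -- Fall back to 40.0 °C when no breed-specific threshold is found.
    t.temp_celsius > COALESCE(th.fever_celsius, CAST(40.0 AS DOUBLE PRECISION))
        AS fever_flag
FROM livestock_telemetry t
LEFT JOIN animal_breeds ab
    ON t.animal_id = ab.animal_id
LEFT JOIN fever_thresholds th
    ON  t.species = th.species
    AND ab.breed  = th.breed;
```

Keeping thresholds in a table means a vet can adjust them with an ordinary UPDATE instead of redefining the view.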

Q: What if the same animal reports from multiple zone sensors simultaneously?
Add a deduplication step using DISTINCT ON (animal_id, event_time) or a tumbling window with MAX(event_time) grouping to keep the most recent reading per animal per window.
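
The sketch below shows one possible deduplication view. It uses a ROW_NUMBER() filter rather than DISTINCT ON, which should give the same one-row-per-key result; the view name is hypothetical, and breaking ties by zone_id is an arbitrary assumption.

```sql
-- Keep exactly one reading per animal and timestamp.
CREATE MATERIALIZED VIEW livestock_telemetry_dedup AS
SELECT
    tag_id,
    animal_id,
    species,
    zone_id,
    temp_celsius,
    activity_index,
    rumination_min,
    event_time
FROM (
    SELECT
        *,
        ROW_NUMBER() OVER (
            PARTITION BY animal_id, event_time
            ORDER BY zone_id          -- arbitrary but deterministic tie-break
        ) AS rn
    FROM livestock_telemetry
) AS ranked
WHERE rn = 1;
```

Downstream views such as the baselines can then read from the deduplicated view instead of the raw source.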

Key Takeaways

  • RisingWave ingests RFID and biosensor telemetry from Kafka and makes health anomalies queryable in near real time using standard SQL.
  • Combining temperature, activity, and rumination signals in a single materialized view can reduce false positives compared to single-metric thresholds.
  • Alerts flow automatically to farm management systems, enabling earlier veterinary intervention and better herd outcomes.
  • The PostgreSQL-compatible interface means farm staff can build dashboards and run ad hoc queries with any standard BI tool.
