How to Stream Agricultural Sensor Data into Real-Time Analytics

Modern farms deploy hundreds of sensors across many types: soil moisture probes, EC meters, weather stations, flow meters, and satellite-derived indices. Getting all that data into a unified real-time analytics layer used to require complex custom pipelines. RisingWave, a PostgreSQL-compatible streaming database, provides a single SQL interface for ingesting, joining, and analyzing heterogeneous agricultural sensor streams without custom stream-processing code.

Why Unified Agricultural Sensor Analytics Matters

A precision agriculture platform that operates on siloed data misses the most valuable insights. Soil moisture in isolation tells you the current water status; combined with EC values, pH, soil temperature, recent weather station readings, and NDVI trends, it tells you whether a crop is heading toward drought stress, salt stress, or disease. Each of those conditions requires a different intervention.

The challenge is that farm sensors are heterogeneous by nature:

  • Different manufacturers: each sensor brand uses its own data schema and reporting format.
  • Different transmission protocols: Modbus RTU, 4–20 mA analog, LoRaWAN, Wi-Fi, NB-IoT.
  • Different reporting intervals: some probes report every 5 minutes; satellite NDVI updates may arrive once or twice per week.
  • Different coordinate systems: GPS coordinates, field ID codes, irrigation zone codes, and row/bed numbers all need to map to the same farm geography.

A streaming SQL platform that normalizes these inputs into a unified schema lets agronomists write analytics once and have them work across the entire farm network.

How Streaming SQL Solves This

RisingWave ingests normalized sensor data from a Kafka backbone (sensors feed into the backbone via protocol-specific gateways) and exposes it as SQL tables. Once data is in the SQL layer, any combination of sensor types can be joined, filtered, and aggregated in standard SQL. The PostgreSQL wire protocol means existing farm management software—Trimble, AgWorld, custom Python notebooks—can query live analytics with no integration work.

Step-by-Step Tutorial

Step 1: Connect the Data Source

Define sources for each major sensor category. Use a Kafka topic per sensor class for schema stability.

-- Soil sensors (moisture, EC, temperature)
CREATE SOURCE soil_sensors (
    sensor_id         VARCHAR,      -- e.g. 'SS-F12-BED4'
    field_id          VARCHAR,      -- e.g. 'NORTH-BLOCK-12'
    weather_station_id VARCHAR,     -- nearest weather station
    reading_time      TIMESTAMPTZ,
    soil_moisture_pct DOUBLE PRECISION,
    ec_dscm           DOUBLE PRECISION,
    soil_temp_c       DOUBLE PRECISION,
    ph_value          DOUBLE PRECISION,
    depth_cm          SMALLINT
)
WITH (
    connector     = 'kafka',
    topic         = 'agri.sensors.soil',
    properties.bootstrap.server = 'broker:9092',
    scan.startup.mode = 'latest'
)
FORMAT PLAIN ENCODE JSON;

-- Weather stations
CREATE SOURCE weather_stations (
    station_id        VARCHAR,      -- e.g. 'WS-FARM-NORTH'
    reading_time      TIMESTAMPTZ,
    temp_c            DOUBLE PRECISION,
    humidity_pct      DOUBLE PRECISION,
    wind_speed_ms     DOUBLE PRECISION,
    solar_rad_wm2     DOUBLE PRECISION,
    rainfall_mm       DOUBLE PRECISION,
    et0_mm_day        DOUBLE PRECISION    -- Penman-Monteith ET₀
)
WITH (
    connector     = 'kafka',
    topic         = 'agri.sensors.weather',
    properties.bootstrap.server = 'broker:9092',
    scan.startup.mode = 'latest'
)
FORMAT PLAIN ENCODE JSON;

-- NDVI and canopy indices (satellite / drone)
CREATE SOURCE ndvi_feed (
    field_id          VARCHAR,
    image_time        TIMESTAMPTZ,
    ndvi_score        DOUBLE PRECISION,
    ndvi_7d_delta     DOUBLE PRECISION,
    canopy_cover_pct  DOUBLE PRECISION,
    source_platform   VARCHAR        -- 'SENTINEL2','PLANET','DRONE'
)
WITH (
    connector     = 'kafka',
    topic         = 'agri.sensors.ndvi',
    properties.bootstrap.server = 'broker:9092',
    scan.startup.mode = 'latest'
)
FORMAT PLAIN ENCODE JSON;

-- Farm reference data
CREATE TABLE field_registry (
    field_id            VARCHAR PRIMARY KEY,
    farm_id             VARCHAR,
    crop_type           VARCHAR,
    area_ha             DOUBLE PRECISION,
    irrigation_type     VARCHAR,      -- 'DRIP','SPRINKLER','FLOOD'
    weather_station_id  VARCHAR
);

Step 2: Build the Core View

Create a unified per-field sensor summary over 15-minute tumbling windows; the materialized view updates continuously as new readings arrive:

CREATE MATERIALIZED VIEW field_sensor_summary_15m AS
SELECT
    s.field_id,
    f.farm_id,
    f.crop_type,
    f.irrigation_type,
    s.window_start,
    s.window_end,
    -- Soil metrics
    s.avg_moisture,
    s.avg_ec,
    s.avg_ph,
    s.avg_soil_temp,
    -- Weather metrics
    w.avg_temp_c,
    w.total_rainfall_mm,
    w.avg_et0,
    -- NDVI (any reading from 4 hours before the window through its end)
    n.ndvi_score,
    n.ndvi_7d_delta,
    n.canopy_cover_pct
FROM (
    SELECT
        field_id,
        weather_station_id,
        window_start,
        window_end,
        AVG(soil_moisture_pct) FILTER (WHERE depth_cm = 30) AS avg_moisture,
        AVG(ec_dscm)                                        AS avg_ec,
        AVG(ph_value)                                       AS avg_ph,
        AVG(soil_temp_c)                                    AS avg_soil_temp
    FROM TUMBLE(soil_sensors, reading_time, INTERVAL '15 MINUTES')
    GROUP BY field_id, weather_station_id, window_start, window_end
) s
JOIN field_registry f USING (field_id)
JOIN (
    SELECT
        station_id,
        window_start,
        AVG(temp_c)        AS avg_temp_c,
        SUM(rainfall_mm)   AS total_rainfall_mm,
        AVG(et0_mm_day)    AS avg_et0
    FROM TUMBLE(weather_stations, reading_time, INTERVAL '15 MINUTES')
    GROUP BY station_id, window_start
) w ON s.weather_station_id = w.station_id AND s.window_start = w.window_start
LEFT JOIN ndvi_feed n
    ON s.field_id = n.field_id
   AND n.image_time BETWEEN s.window_start - INTERVAL '4 HOURS' AND s.window_end;
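Because the materialized view is queryable over the PostgreSQL wire protocol, agronomists can pull the latest picture for any field with plain SQL (the field ID here is illustrative):

```sql
-- Most recent 15-minute summary for one field
SELECT window_start, avg_moisture, avg_ec, avg_et0, ndvi_score
FROM field_sensor_summary_15m
WHERE field_id = 'NORTH-BLOCK-12'
ORDER BY window_start DESC
LIMIT 1;
```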

Build a cross-farm comparison view for portfolio-level monitoring:

CREATE MATERIALIZED VIEW farm_daily_summary AS
SELECT
    farm_id,
    crop_type,
    DATE_TRUNC('day', window_start) AS summary_date,
    COUNT(DISTINCT field_id)        AS monitored_fields,
    AVG(avg_moisture)               AS farm_avg_moisture,
    AVG(avg_ec)                     AS farm_avg_ec,
    AVG(ndvi_score)                 AS farm_avg_ndvi,
    SUM(total_rainfall_mm)          AS total_rainfall_mm,  -- note: fields sharing a station each contribute that station's reading
    AVG(avg_et0)                    AS avg_et0
FROM field_sensor_summary_15m
GROUP BY farm_id, crop_type, DATE_TRUNC('day', window_start);

Step 3: Alerts and Downstream Integration

CREATE MATERIALIZED VIEW alerts AS
SELECT
    field_id,
    farm_id,
    crop_type,
    window_start          AS alert_time,
    CASE
        WHEN avg_moisture < 20.0 AND total_rainfall_mm < 1.0 THEN 'DROUGHT_RISK'
        WHEN avg_ec > 2.5                                    THEN 'EC_STRESS'
        WHEN avg_ph < 5.5 OR avg_ph > 7.5                    THEN 'PH_ANOMALY'
        WHEN ndvi_score IS NOT NULL AND ndvi_7d_delta < -0.1 THEN 'CANOPY_DECLINE'
        ELSE NULL
    END AS alert_type,
    avg_moisture,
    avg_ec,
    avg_ph,
    ndvi_score,
    ndvi_7d_delta
FROM field_sensor_summary_15m
WHERE
    (avg_moisture < 20.0 AND total_rainfall_mm < 1.0)
    OR avg_ec > 2.5
    OR avg_ph < 5.5 OR avg_ph > 7.5
    OR ndvi_7d_delta < -0.1;

CREATE SINK agri_sensor_alerts_sink
FROM alerts
WITH (
    connector  = 'kafka',
    topic      = 'agri.alerts',
    properties.bootstrap.server = 'broker:9092'
)
FORMAT PLAIN ENCODE JSON;
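With FORMAT PLAIN ENCODE JSON, each row of the alerts view reaches agri.alerts as a JSON object, so a downstream irrigation controller or notification service would consume messages shaped roughly like this (values illustrative):

```json
{
  "field_id": "NORTH-BLOCK-12",
  "farm_id": "FARM-NORTH",
  "crop_type": "TOMATO",
  "alert_time": "2024-06-01T08:15:00Z",
  "alert_type": "DROUGHT_RISK",
  "avg_moisture": 17.2,
  "avg_ec": 1.9,
  "avg_ph": 6.3,
  "ndvi_score": 0.71,
  "ndvi_7d_delta": -0.02
}
```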

Comparison Table

| Architecture | Schema Flexibility | Multi-sensor Join | Historical Query | PostgreSQL API |
| --- | --- | --- | --- | --- |
| Per-sensor database silos | High (per-silo) | Manual ETL | Per-silo | Varies |
| InfluxDB time-series | Medium | Limited | Yes | No |
| Flink + data lake | High | Yes (complex) | Via batch | No |
| RisingWave | High | Native SQL | Yes | Yes |

FAQ

Q: Our LoRaWAN sensors send data to a LoRa network server (e.g., TTN or ChirpStack), not directly to Kafka. How do we connect them?

Most LoRa network servers support MQTT or HTTP webhook integrations. Deploy a lightweight bridge (e.g., a Node.js or Python service) that subscribes to the network server's MQTT output, normalizes the payload to your agreed schema, and publishes to the appropriate Kafka topic. RisingWave then ingests from Kafka as normal.
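A minimal sketch of the normalization step in Python. The MQTT subscription and Kafka producer wiring are omitted, and the incoming payload layout (a ChirpStack-style uplink with decoded fields under "object") plus the device registry are assumptions you would adapt to your network server:

```python
import json

# Hypothetical mapping from LoRa device EUIs to farm geography;
# in practice this would come from a device registry or database.
DEVICE_REGISTRY = {
    "a81758fffe030001": {"field_id": "NORTH-BLOCK-12",
                         "weather_station_id": "WS-FARM-NORTH"},
}

def normalize_soil_uplink(raw: bytes) -> dict:
    """Convert a ChirpStack-style uplink into the agri.sensors.soil schema."""
    uplink = json.loads(raw)
    dev_eui = uplink["deviceInfo"]["devEui"]
    decoded = uplink["object"]          # decoder-produced sensor fields
    geo = DEVICE_REGISTRY[dev_eui]
    return {
        "sensor_id": f"LORA-{dev_eui}",
        "field_id": geo["field_id"],
        "weather_station_id": geo["weather_station_id"],
        "reading_time": uplink["time"],
        "soil_moisture_pct": float(decoded["moisture"]),
        "ec_dscm": float(decoded["ec"]),
        "soil_temp_c": float(decoded["temperature"]),
        "ph_value": float(decoded["ph"]),
        "depth_cm": int(decoded["depth_cm"]),
    }
```

The bridge would call normalize_soil_uplink on each MQTT message and publish the result to agri.sensors.soil with a Kafka client library.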

Q: Satellite NDVI updates arrive only 2–3 times per week. How does the LEFT JOIN with 15-minute soil windows work for multi-day gaps?

The LEFT JOIN with the time-range condition n.image_time BETWEEN s.window_start - INTERVAL '4 HOURS' AND s.window_end will return NULL for ndvi_score in windows where no satellite pass occurred. To carry the most recent NDVI score forward, create an intermediate materialized view using LAST_VALUE or a JOIN LATERAL that selects the latest NDVI record per field regardless of time, then join that into the summary view.
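One way to sketch the carry-forward is a Top-N materialized view (ROW_NUMBER with rank 1) that keeps only the latest NDVI record per field; the view and alias names here are illustrative:

```sql
CREATE MATERIALIZED VIEW latest_ndvi_per_field AS
SELECT field_id, image_time, ndvi_score, ndvi_7d_delta, canopy_cover_pct
FROM (
    SELECT *,
           ROW_NUMBER() OVER (PARTITION BY field_id
                              ORDER BY image_time DESC) AS rn
    FROM ndvi_feed
) t
WHERE rn = 1;
```

The summary view would then LEFT JOIN latest_ndvi_per_field on field_id alone instead of using the time-range condition.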

Q: How do we add a new sensor type without disrupting existing views?

Create a new source with a CREATE SOURCE statement for the new sensor topic, then update only the views that need the new data. All other sources and views continue running unchanged, because each view depends only on the sources it references.
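For example, adding irrigation flow meters would require only a new source (topic and column names here are illustrative) plus updates to whichever views consume it:

```sql
CREATE SOURCE flow_meters (
    meter_id      VARCHAR,
    field_id      VARCHAR,
    reading_time  TIMESTAMPTZ,
    flow_lpm      DOUBLE PRECISION   -- liters per minute
)
WITH (
    connector = 'kafka',
    topic     = 'agri.sensors.flow',
    properties.bootstrap.server = 'broker:9092',
    scan.startup.mode = 'latest'
)
FORMAT PLAIN ENCODE JSON;
```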

Key Takeaways

  • A Kafka backbone that normalizes heterogeneous sensor protocols into per-class topics enables a single RisingWave instance to serve the entire farm's analytics needs.
  • Joining soil sensors, weather stations, and NDVI feeds in a single 15-minute summary view gives agronomists a complete field picture that no single sensor type provides alone.
  • Farm-level daily summaries aggregate across all fields for portfolio monitoring without a separate BI layer.
  • The PostgreSQL interface makes live farm analytics accessible to existing tools—Python notebooks, Grafana dashboards, farm management platforms—without custom connectors.
