Modern farms deploy dozens to hundreds of sensor types: soil moisture probes, EC meters, weather stations, flow meters, and satellite-derived indices. Getting all that data into a unified real-time analytics layer used to require complex custom pipelines. RisingWave, a PostgreSQL-compatible streaming database, provides a single SQL interface for ingesting, joining, and analyzing heterogeneous agricultural sensor streams without custom stream-processing code.
Why Unified Agricultural Sensor Analytics Matters
A precision agriculture platform that operates on siloed data misses the most valuable insights. Soil moisture in isolation tells you the current water status; combined with EC values, pH, soil temperature, recent weather station readings, and NDVI trends, it tells you whether a crop is heading toward drought stress, salt stress, or disease. Each of those conditions requires a different intervention.
The challenge is that farm sensors are heterogeneous by nature:
- Different manufacturers: each sensor brand uses its own data schema and reporting format.
- Different transmission protocols: Modbus RTU, 4–20 mA analog, LoRaWAN, Wi-Fi, NB-IoT.
- Different reporting intervals: some probes report every 5 minutes; satellite NDVI updates may arrive once or twice per week.
- Different coordinate systems: GPS coordinates, field ID codes, irrigation zone codes, and row/bed numbers all need to map to the same farm geography.
A streaming SQL platform that normalizes these inputs into a unified schema lets agronomists write analytics once and have them work across the entire farm network.
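As a sketch of what a gateway's normalization step might look like: the function below maps two hypothetical vendor payload shapes (the "acme" and "terraprobe" names and their field layouts are invented for illustration) onto the unified soil-sensor schema used throughout this tutorial.

```python
import json
from datetime import datetime, timezone

def normalize_soil_reading(vendor: str, payload: dict) -> dict:
    """Map a vendor-specific soil-probe payload to the unified schema.

    Both vendor formats here are hypothetical examples.
    """
    if vendor == "acme":
        # This vendor reports moisture as a 0-1 fraction and epoch seconds.
        return {
            "sensor_id": payload["dev"],
            "field_id": payload["plot"],
            "reading_time": datetime.fromtimestamp(
                payload["ts"], tz=timezone.utc
            ).isoformat(),
            "soil_moisture_pct": payload["vwc"] * 100.0,
            "soil_temp_c": payload["t_soil"],
            "depth_cm": payload["depth_cm"],
        }
    if vendor == "terraprobe":
        # This vendor already reports percent moisture and ISO timestamps.
        return {
            "sensor_id": payload["serial"],
            "field_id": payload["field"],
            "reading_time": payload["time"],
            "soil_moisture_pct": payload["moisture_pct"],
            "soil_temp_c": payload["temp_c"],
            "depth_cm": payload["depth"],
        }
    raise ValueError(f"unknown vendor: {vendor}")

record = normalize_soil_reading(
    "acme",
    {"dev": "SS-F12-BED4", "plot": "NORTH-BLOCK-12", "ts": 1700000000,
     "vwc": 0.235, "t_soil": 18.4, "depth_cm": 30},
)
print(json.dumps(record))
```

Whatever the transport protocol, every gateway emits the same JSON shape to the same per-class Kafka topic, so the SQL layer never sees vendor differences.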
How Streaming SQL Solves This
RisingWave ingests normalized sensor data from a Kafka backbone (sensors feed into the backbone via protocol-specific gateways) and exposes it as SQL tables. Once data is in the SQL layer, any combination of sensor types can be joined, filtered, and aggregated in standard SQL. The PostgreSQL wire protocol means existing farm management software—Trimble, AgWorld, custom Python notebooks—can query live analytics with no integration work.
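For instance, a Python notebook can read live summaries over the standard PostgreSQL protocol. This sketch assumes the psycopg2 driver and the field_sensor_summary_15m view built in Step 2 below; the host name is a placeholder (port 4566 and user root are RisingWave's defaults).

```python
def latest_summary_query(field_id: str) -> tuple:
    """Build a parameterized query for the most recent window of one field."""
    sql = (
        "SELECT window_start, avg_moisture, avg_ec, ndvi_score "
        "FROM field_sensor_summary_15m "
        "WHERE field_id = %s "
        "ORDER BY window_start DESC LIMIT 1"
    )
    return sql, (field_id,)

if __name__ == "__main__":
    import psycopg2  # any PostgreSQL driver works against RisingWave

    conn = psycopg2.connect(
        host="risingwave-host", port=4566, user="root", dbname="dev"
    )
    with conn.cursor() as cur:
        cur.execute(*latest_summary_query("NORTH-BLOCK-12"))
        print(cur.fetchone())
```

Because the query is plain SQL over a plain PostgreSQL connection, the same pattern works from Grafana, a BI tool, or any ORM.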
Step-by-Step Tutorial
Step 1: Connect the Data Source
Define sources for each major sensor category. Use a Kafka topic per sensor class for schema stability.
-- Soil sensors (moisture, EC, temperature)
CREATE SOURCE soil_sensors (
    sensor_id VARCHAR,           -- e.g. 'SS-F12-BED4'
    field_id VARCHAR,            -- e.g. 'NORTH-BLOCK-12'
    weather_station_id VARCHAR,  -- nearest weather station
    reading_time TIMESTAMPTZ,
    soil_moisture_pct DOUBLE PRECISION,
    ec_dscm DOUBLE PRECISION,
    soil_temp_c DOUBLE PRECISION,
    ph_value DOUBLE PRECISION,
    depth_cm SMALLINT
)
WITH (
    connector = 'kafka',
    topic = 'agri.sensors.soil',
    properties.bootstrap.server = 'broker:9092',
    scan.startup.mode = 'latest'
)
FORMAT PLAIN ENCODE JSON;
-- Weather stations
CREATE SOURCE weather_stations (
    station_id VARCHAR,  -- e.g. 'WS-FARM-NORTH'
    reading_time TIMESTAMPTZ,
    temp_c DOUBLE PRECISION,
    humidity_pct DOUBLE PRECISION,
    wind_speed_ms DOUBLE PRECISION,
    solar_rad_wm2 DOUBLE PRECISION,
    rainfall_mm DOUBLE PRECISION,
    et0_mm_day DOUBLE PRECISION  -- Penman-Monteith ET₀
)
WITH (
    connector = 'kafka',
    topic = 'agri.sensors.weather',
    properties.bootstrap.server = 'broker:9092',
    scan.startup.mode = 'latest'
)
FORMAT PLAIN ENCODE JSON;
-- NDVI and canopy indices (satellite / drone)
CREATE SOURCE ndvi_feed (
    field_id VARCHAR,
    image_time TIMESTAMPTZ,
    ndvi_score DOUBLE PRECISION,
    ndvi_7d_delta DOUBLE PRECISION,
    canopy_cover_pct DOUBLE PRECISION,
    source_platform VARCHAR  -- 'SENTINEL2','PLANET','DRONE'
)
WITH (
    connector = 'kafka',
    topic = 'agri.sensors.ndvi',
    properties.bootstrap.server = 'broker:9092',
    scan.startup.mode = 'latest'
)
FORMAT PLAIN ENCODE JSON;
-- Farm reference data
CREATE TABLE field_registry (
    field_id VARCHAR PRIMARY KEY,
    farm_id VARCHAR,
    crop_type VARCHAR,
    area_ha DOUBLE PRECISION,
    irrigation_type VARCHAR,  -- 'DRIP','SPRINKLER','FLOOD'
    weather_station_id VARCHAR
);
Step 2: Build the Core View
Create a unified per-field sensor summary over 15-minute tumbling windows (the materialized view itself updates incrementally as events arrive):
CREATE MATERIALIZED VIEW field_sensor_summary_15m AS
SELECT
    s.field_id,
    f.farm_id,
    f.crop_type,
    f.irrigation_type,
    s.window_start,
    s.window_end,
    -- Soil metrics
    s.avg_moisture,
    s.avg_ec,
    s.avg_ph,
    s.avg_soil_temp,
    -- Weather metrics
    w.avg_temp_c,
    w.total_rainfall_mm,
    w.avg_et0,
    -- NDVI from the 4-hour lookback window (NULL when no pass occurred)
    n.ndvi_score,
    n.ndvi_7d_delta,
    n.canopy_cover_pct
FROM (
    SELECT
        field_id,
        weather_station_id,
        window_start,
        window_end,
        AVG(soil_moisture_pct) FILTER (WHERE depth_cm = 30) AS avg_moisture,
        AVG(ec_dscm) AS avg_ec,
        AVG(ph_value) AS avg_ph,
        AVG(soil_temp_c) AS avg_soil_temp
    FROM TUMBLE(soil_sensors, reading_time, INTERVAL '15 MINUTES')
    GROUP BY field_id, weather_station_id, window_start, window_end
) s
JOIN field_registry f USING (field_id)
JOIN (
    SELECT
        station_id,
        window_start,
        AVG(temp_c) AS avg_temp_c,
        SUM(rainfall_mm) AS total_rainfall_mm,
        AVG(et0_mm_day) AS avg_et0
    FROM TUMBLE(weather_stations, reading_time, INTERVAL '15 MINUTES')
    GROUP BY station_id, window_start
) w ON s.weather_station_id = w.station_id AND s.window_start = w.window_start
LEFT JOIN ndvi_feed n
    ON s.field_id = n.field_id
    AND n.image_time BETWEEN s.window_start - INTERVAL '4 HOURS' AND s.window_end;
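Once the view exists, agronomists can query it like any table. For example, an illustrative ad-hoc query to surface the driest fields in the most recent windows:

```sql
-- Ad-hoc query: fields with the lowest soil moisture right now
SELECT field_id, crop_type, window_start, avg_moisture, avg_et0
FROM field_sensor_summary_15m
WHERE window_start > NOW() - INTERVAL '1 HOUR'
ORDER BY avg_moisture ASC
LIMIT 10;
```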
Build a cross-farm comparison view for portfolio-level monitoring:
CREATE MATERIALIZED VIEW farm_daily_summary AS
SELECT
    farm_id,
    crop_type,
    DATE_TRUNC('day', window_start) AS summary_date,
    COUNT(DISTINCT field_id) AS monitored_fields,
    AVG(avg_moisture) AS farm_avg_moisture,
    AVG(avg_ec) AS farm_avg_ec,
    AVG(ndvi_score) AS farm_avg_ndvi,
    SUM(total_rainfall_mm) AS total_rainfall_mm,
    AVG(avg_et0) AS avg_et0
FROM field_sensor_summary_15m
GROUP BY farm_id, crop_type, DATE_TRUNC('day', window_start);
Step 3: Alerts and Downstream Integration
CREATE MATERIALIZED VIEW alerts AS
SELECT
    field_id,
    farm_id,
    crop_type,
    window_start AS alert_time,
    CASE
        WHEN avg_moisture < 20.0 AND total_rainfall_mm < 1.0 THEN 'DROUGHT_RISK'
        WHEN avg_ec > 2.5 THEN 'EC_STRESS'
        WHEN avg_ph < 5.5 OR avg_ph > 7.5 THEN 'PH_ANOMALY'
        WHEN ndvi_7d_delta < -0.1 AND ndvi_score IS NOT NULL THEN 'CANOPY_DECLINE'
        ELSE NULL
    END AS alert_type,
    avg_moisture,
    avg_ec,
    avg_ph,
    ndvi_score,
    ndvi_7d_delta
FROM field_sensor_summary_15m
WHERE
    (avg_moisture < 20.0 AND total_rainfall_mm < 1.0)
    OR avg_ec > 2.5
    OR avg_ph < 5.5 OR avg_ph > 7.5
    OR ndvi_7d_delta < -0.1;
CREATE SINK agri_sensor_alerts_sink
FROM alerts
WITH (
    connector = 'kafka',
    topic = 'agri.alerts',
    properties.bootstrap.server = 'broker:9092'
)
FORMAT PLAIN ENCODE JSON;
Comparison Table
| Architecture | Schema Flexibility | Multi-sensor Join | Historical Query | PostgreSQL API |
|---|---|---|---|---|
| Per-sensor database silos | High (per-silo) | Manual ETL | Per-silo | Varies |
| InfluxDB time-series | Medium | Limited | Yes | No |
| Flink + data lake | High | Yes (complex) | Via batch | No |
| RisingWave | High | Native SQL | Yes | Yes |
FAQ
Q: Our LoRaWAN sensors send data to a LoRa network server (e.g., TTN or ChirpStack), not directly to Kafka. How do we connect them?
Most LoRa network servers support MQTT or HTTP webhook integrations. Deploy a lightweight bridge (e.g., a Node.js or Python service) that subscribes to the network server's MQTT output, normalizes the payload to your agreed schema, and publishes to the appropriate Kafka topic. RisingWave then ingests from Kafka as normal.
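A minimal sketch of such a bridge, assuming the paho-mqtt and kafka-python libraries; the MQTT topic filter, host names, and uplink payload shape are illustrative (each network server documents its own JSON format):

```python
import json

def uplink_to_soil_record(uplink: dict) -> dict:
    """Map a (hypothetical) LoRaWAN uplink payload to the unified soil schema."""
    decoded = uplink["decoded_payload"]
    return {
        "sensor_id": uplink["device_id"],
        "field_id": decoded["field_id"],
        "reading_time": uplink["received_at"],
        "soil_moisture_pct": decoded["moisture"],
        "soil_temp_c": decoded["temp"],
        "depth_cm": decoded["depth_cm"],
    }

if __name__ == "__main__":
    # Wiring sketch: subscribe to the LoRa network server's MQTT output
    # and republish normalized records to the Kafka backbone.
    import paho.mqtt.client as mqtt
    from kafka import KafkaProducer

    producer = KafkaProducer(
        bootstrap_servers="broker:9092",
        value_serializer=lambda v: json.dumps(v).encode(),
    )

    def on_message(client, userdata, msg):
        record = uplink_to_soil_record(json.loads(msg.payload))
        producer.send("agri.sensors.soil", record)

    client = mqtt.Client()
    client.on_message = on_message
    client.connect("lorawan-server", 1883)
    client.subscribe("application/+/device/+/event/up")
    client.loop_forever()
```

Keeping the payload mapping in a pure function makes it easy to unit-test per vendor before wiring it to live brokers.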
Q: Satellite NDVI updates arrive only 2–3 times per week. How does the LEFT JOIN with 15-minute soil windows work for multi-day gaps?
The LEFT JOIN with the time-range condition n.image_time BETWEEN s.window_start - INTERVAL '4 HOURS' AND s.window_end will return NULL for ndvi_score in windows where no satellite pass occurred. To carry the most recent NDVI score forward, create an intermediate materialized view that keeps the latest NDVI record per field (for example, with the grouped Top-N pattern using ROW_NUMBER), then join that into the summary view on field_id alone.
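One way to sketch the carry-forward, using a grouped Top-N subquery to retain only the newest NDVI record per field (the view name latest_ndvi is illustrative):

```sql
-- Latest NDVI reading per field, regardless of image age
CREATE MATERIALIZED VIEW latest_ndvi AS
SELECT field_id, image_time, ndvi_score, ndvi_7d_delta, canopy_cover_pct
FROM (
    SELECT *,
        ROW_NUMBER() OVER (
            PARTITION BY field_id ORDER BY image_time DESC
        ) AS rn
    FROM ndvi_feed
) t
WHERE rn = 1;
```

The summary view can then LEFT JOIN latest_ndvi on field_id alone instead of using the time-range condition.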
Q: How do we add a new sensor type without disrupting existing views?
Add a CREATE SOURCE statement for the new sensor topic, then update only the views that need the new data; everything else continues to run unchanged. Sources and materialized views are independent objects in RisingWave, so adding a sensor class is purely additive: no existing source or view needs to be dropped or rebuilt.
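For example, adding a hypothetical irrigation flow-meter class (topic name and schema are illustrative) touches nothing else:

```sql
-- New sensor class: irrigation flow meters
CREATE SOURCE flow_meters (
    meter_id VARCHAR,
    field_id VARCHAR,
    reading_time TIMESTAMPTZ,
    flow_rate_lpm DOUBLE PRECISION
)
WITH (
    connector = 'kafka',
    topic = 'agri.sensors.flow',
    properties.bootstrap.server = 'broker:9092',
    scan.startup.mode = 'latest'
)
FORMAT PLAIN ENCODE JSON;
```

Existing materialized views keep streaming; only views that want flow data are created or revised.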
Key Takeaways
- A Kafka backbone that normalizes heterogeneous sensor protocols into per-class topics enables a single RisingWave instance to serve the entire farm's analytics needs.
- Joining soil sensors, weather stations, and NDVI feeds in a single 15-minute summary view gives agronomists a complete field picture that no single sensor type provides alone.
- Farm-level daily summaries aggregate across all fields for portfolio monitoring without a separate BI layer.
- The PostgreSQL interface makes live farm analytics accessible to existing tools—Python notebooks, Grafana dashboards, farm management platforms—without custom connectors.