Weather is the largest uncontrolled variable in crop production. RisingWave, a PostgreSQL-compatible streaming database, lets precision agriculture platforms continuously correlate weather station readings—temperature, humidity, wind speed, solar radiation—with soil sensor data and NDVI scores to quantify weather-driven stress in real time and trigger automated responses before the crop shows visible symptoms.
Why Real-Time Weather Impact Analysis Matters
Batch-processed weather analytics tell you what happened to your crops yesterday. Streaming analytics tell you what is happening now and what will happen in the next 2–4 hours if conditions hold. For precision agriculture, that difference is the gap between prevention and recovery.
Consider three common weather-driven stress scenarios:
Frost risk: a temperature drop below a crop-specific threshold triggers a frost protection irrigation cycle if detected 30–60 minutes in advance; the same event detected 3 hours later in a batch report leaves burnt foliage and potentially lost yield.
Heat stress accumulation: sustained temperatures above 35 °C during flowering cause irreversible yield loss in most grain crops. Detecting a multi-hour heat accumulation event within the first window allows an emergency cooling irrigation or shade deployment.
Disease pressure buildup: many fungal pathogens (downy mildew, grey mold) require sustained leaf wetness combined with high humidity. A streaming view that tracks leaf wetness hours per field can trigger a predictive fungicide application before spores germinate.
None of these interventions is possible with nightly batch processing. They require a system that watches live weather data continuously and correlates it with field-level sensor readings in seconds.
How Streaming SQL Solves This
RisingWave ingests weather station telemetry and soil sensor data from Kafka and maintains materialized views that calculate weather-derived agronomic indices: growing degree days (GDD), heat accumulation, estimated evapotranspiration, and leaf wetness hours. These views join seamlessly with crop threshold tables so that alert logic is data-driven rather than hard-coded.
Step-by-Step Tutorial
Step 1: Connect the Data Source
-- Weather station telemetry
CREATE SOURCE weather_raw (
station_id VARCHAR, -- e.g. 'WS-BLOCK-A'
reading_time TIMESTAMPTZ,
temp_c DOUBLE PRECISION,
min_temp_c DOUBLE PRECISION,
max_temp_c DOUBLE PRECISION,
humidity_pct DOUBLE PRECISION,
wind_speed_ms DOUBLE PRECISION,
solar_rad_wm2 DOUBLE PRECISION,
rainfall_mm DOUBLE PRECISION,
leaf_wetness DOUBLE PRECISION -- 0.0 (dry) to 1.0 (wet)
)
WITH (
connector = 'kafka',
topic = 'agri.weather.raw',
properties.bootstrap.server = 'broker:9092',
scan.startup.mode = 'latest'
)
FORMAT PLAIN ENCODE JSON;
-- Soil and canopy sensors
CREATE SOURCE field_sensors (
sensor_id VARCHAR,
field_id VARCHAR,
weather_station_id VARCHAR,
reading_time TIMESTAMPTZ,
soil_moisture_pct DOUBLE PRECISION,
soil_temp_c DOUBLE PRECISION,
canopy_temp_c DOUBLE PRECISION, -- from infrared sensor if available
ec_dscm DOUBLE PRECISION
)
WITH (
connector = 'kafka',
topic = 'agri.field.sensors',
properties.bootstrap.server = 'broker:9092',
scan.startup.mode = 'latest'
)
FORMAT PLAIN ENCODE JSON;
-- Crop thermal requirements
CREATE TABLE crop_weather_thresholds (
crop_type VARCHAR,
growth_stage VARCHAR,
base_temp_c DOUBLE PRECISION, -- GDD base temperature
max_temp_c DOUBLE PRECISION, -- GDD cap temperature
frost_threshold_c DOUBLE PRECISION, -- frost damage temperature
heat_stress_c DOUBLE PRECISION, -- heat stress onset
max_leaf_wetness_h DOUBLE PRECISION, -- disease pressure threshold (hours)
field_id VARCHAR,
PRIMARY KEY (field_id, growth_stage)
);
-- Field-to-station mapping
CREATE TABLE field_registry (
field_id VARCHAR PRIMARY KEY,
farm_id VARCHAR,
crop_type VARCHAR,
weather_station_id VARCHAR,
area_ha DOUBLE PRECISION
);
Step 2: Build the Core View
Calculate hourly weather indices per weather station:
CREATE MATERIALIZED VIEW weather_indices_1h AS
SELECT
station_id,
window_start,
window_end,
AVG(temp_c) AS avg_temp_c,
MIN(min_temp_c) AS min_temp_c,
MAX(max_temp_c) AS max_temp_c,
AVG(humidity_pct) AS avg_humidity,
AVG(wind_speed_ms) AS avg_wind_ms,
SUM(rainfall_mm) AS total_rainfall_mm,
AVG(solar_rad_wm2) AS avg_solar_rad,
-- Leaf wetness hours: fraction of hour where wetness > 0.5
SUM(CASE WHEN leaf_wetness > 0.5 THEN 1.0/COUNT(*) OVER (PARTITION BY station_id, window_start) ELSE 0 END) AS leaf_wetness_hours,
-- Growing degree day contribution (requires per-crop base/max, applied in next view)
AVG(temp_c) AS mean_temp_for_gdd
FROM TUMBLE(weather_raw, reading_time, INTERVAL '1 HOUR')
GROUP BY station_id, window_start, window_end;
Join with crop thresholds to produce field-level weather impact scores:
CREATE MATERIALIZED VIEW field_weather_impact_1h AS
SELECT
f.field_id,
f.farm_id,
f.crop_type,
w.window_start,
w.avg_temp_c,
w.min_temp_c,
w.max_temp_c,
w.avg_humidity,
w.total_rainfall_mm,
w.leaf_wetness_hours,
-- Growing degree days (capped at max_temp)
GREATEST(0,
LEAST(w.mean_temp_for_gdd, t.max_temp_c) - t.base_temp_c
) AS gdd_contribution,
-- Stress classification
CASE
WHEN w.min_temp_c <= t.frost_threshold_c THEN 'FROST_RISK'
WHEN w.max_temp_c >= t.heat_stress_c THEN 'HEAT_STRESS'
WHEN w.avg_humidity > 85 AND w.leaf_wetness_hours > t.max_leaf_wetness_h
THEN 'DISEASE_PRESSURE'
WHEN w.total_rainfall_mm > 20 AND s.avg_moisture > 70 THEN 'WATERLOGGING_RISK'
ELSE 'NORMAL'
END AS weather_stress_class
FROM field_registry f
JOIN weather_indices_1h w
ON f.weather_station_id = w.station_id
JOIN crop_weather_thresholds t
ON f.field_id = t.field_id AND f.crop_type = t.crop_type
LEFT JOIN (
SELECT
field_id,
window_start,
AVG(soil_moisture_pct) FILTER (WHERE depth_cm = 30) AS avg_moisture
FROM TUMBLE(field_sensors, reading_time, INTERVAL '1 HOUR')
GROUP BY field_id, window_start
) s ON f.field_id = s.field_id AND w.window_start = s.window_start;
Step 3: Alerts and Downstream Integration
CREATE MATERIALIZED VIEW alerts AS
SELECT
field_id,
farm_id,
crop_type,
window_start AS alert_time,
weather_stress_class AS alert_type,
avg_temp_c,
min_temp_c,
max_temp_c,
avg_humidity,
leaf_wetness_hours,
gdd_contribution,
total_rainfall_mm
FROM field_weather_impact_1h
WHERE weather_stress_class != 'NORMAL';
CREATE SINK weather_impact_alerts_sink
FROM alerts
WITH (
connector = 'kafka',
topic = 'agri.weather.alerts',
properties.bootstrap.server = 'broker:9092'
)
FORMAT PLAIN ENCODE JSON;
Comparison Table
| Method | Latency | GDD Calculation | Disease Model | Field-level Join |
| Daily agronomic report | 12–24 h | Batch | Batch | Manual lookup |
| Weather API + rule engine | ~30 min | Scheduled script | Static rules | No |
| Flink + custom code | ~5 min | Code-defined | Code-defined | Complex |
| RisingWave | < 1 h (per window) | In-SQL | Threshold SQL | Native join |
FAQ
Q: Can we accumulate GDD across the entire growing season, not just per hour?
Yes. Create a second materialized view that sums gdd_contribution from field_weather_impact_1h grouped by field_id and crop_type using a GROUP BY field_id, crop_type without a time window. This produces a running cumulative GDD total per field. In RisingWave, non-windowed aggregations over append-only sources accumulate results continuously.
Q: Weather stations sometimes transmit erroneous spikes (e.g., -99.9 °C on sensor fault). How do we filter those?
Add a WHERE filter on the source or in the weather_indices_1h view:
WHERE temp_c BETWEEN -20 AND 55
AND humidity_pct BETWEEN 0 AND 100
Out-of-range readings are excluded from aggregations, and the COUNT(*) in the window tells you how many valid readings contributed.
Q: How do we model a multi-day disease pressure index that requires cumulative leaf wetness hours?
Define a hopping window view that accumulates leaf wetness hours over a 48-hour or 72-hour lookback period:
SELECT station_id, window_start,
SUM(leaf_wetness_hours) AS cumulative_lw_48h
FROM HOP(weather_indices_1h, window_start, INTERVAL '1 HOUR', INTERVAL '48 HOURS')
GROUP BY station_id, window_start;
This gives you a rolling 48-hour leaf wetness accumulation updated every hour without a separate batch job.
Key Takeaways
- Streaming weather impact analysis gives precision agriculture platforms a frost, heat stress, and disease pressure alert within the first affected hour—not the next morning.
- Growing degree day calculation in SQL is readable, auditable, and updatable by agronomists without engineering support.
- Joining weather station indices with field-level soil sensor data and crop-specific thresholds produces field-accurate alerts rather than generic area forecasts.
- Cumulative indices (GDD season total, rolling leaf wetness hours) are maintained continuously by RisingWave without cron jobs or batch pipelines.

