Over-irrigation wastes water and leaches nutrients; under-irrigation stresses crops and cuts yield. A smart irrigation system built on RisingWave, a PostgreSQL-compatible streaming database, continuously evaluates soil moisture percentages, evapotranspiration estimates, and valve states to schedule irrigation runs based on real field conditions—not fixed timers.
Why Smart Irrigation Matters
Agriculture accounts for roughly 70 % of global freshwater withdrawals. Conventional timer-based irrigation ignores actual crop water demand: it runs on schedule whether the soil is already saturated from overnight rain or critically dry after a heat wave. Precision irrigation systems that respond to real sensor data can reduce water consumption by 20–50 % while maintaining or improving yields.
The challenge is that reacting to live sensor data requires a system that:
- Continuously reads soil moisture from dozens to hundreds of field sensors.
- Correlates sensor readings with weather station data (evapotranspiration, rainfall).
- Evaluates per-field irrigation need and generates valve open/close decisions in near-real-time.
- Logs every valve state change for compliance, billing, and agronomic analysis.
Batch analytics jobs that run once per hour cannot power this. A streaming SQL platform that maintains continuously updated views can.
How Streaming SQL Solves This
RisingWave ingests sensor telemetry and weather data from Kafka and maintains materialized views that calculate current irrigation need per field. A lightweight irrigation controller microservice queries these views via the PostgreSQL interface and actuates valves accordingly. All valve events are logged back to RisingWave for a complete audit trail.
Step-by-Step Tutorial
Step 1: Connect the Data Source
-- Soil moisture and related sensors
CREATE SOURCE soil_sensor_stream (
sensor_id VARCHAR, -- e.g. 'SM-F07-N3'
field_id VARCHAR, -- e.g. 'FIELD-007'
valve_zone_id VARCHAR, -- irrigation zone served by one valve
reading_time TIMESTAMPTZ,
soil_moisture_pct DOUBLE PRECISION, -- volumetric water content %
soil_temp_c DOUBLE PRECISION,
ec_dscm DOUBLE PRECISION, -- electrical conductivity
depth_cm SMALLINT
)
WITH (
connector = 'kafka',
topic = 'agri.soil.sensors',
properties.bootstrap.server = 'broker:9092',
scan.startup.mode = 'latest'
)
FORMAT PLAIN ENCODE JSON;
-- Weather station data
CREATE SOURCE weather_station_stream (
station_id VARCHAR, -- e.g. 'WS-FARM-01'
reading_time TIMESTAMPTZ,
temp_c DOUBLE PRECISION,
humidity_pct DOUBLE PRECISION,
wind_speed_ms DOUBLE PRECISION,
solar_rad_wm2 DOUBLE PRECISION,
rainfall_mm DOUBLE PRECISION,
et0_mm DOUBLE PRECISION -- reference evapotranspiration mm/day
)
WITH (
connector = 'kafka',
topic = 'agri.weather',
properties.bootstrap.server = 'broker:9092',
scan.startup.mode = 'latest'
)
FORMAT PLAIN ENCODE JSON;
-- Valve state events logged by the controller
CREATE SOURCE valve_events (
valve_id VARCHAR, -- irrigation valve ID
valve_zone_id VARCHAR,
field_id VARCHAR,
event_time TIMESTAMPTZ,
state VARCHAR, -- 'OPEN','CLOSED'
commanded_by VARCHAR, -- 'SCHEDULER','MANUAL','EMERGENCY_CLOSE'
flow_rate_lpm DOUBLE PRECISION
)
WITH (
connector = 'kafka',
topic = 'agri.valves',
properties.bootstrap.server = 'broker:9092',
scan.startup.mode = 'latest'
)
FORMAT PLAIN ENCODE JSON;
-- Field configuration
CREATE TABLE field_irrigation_config (
field_id VARCHAR,
valve_zone_id VARCHAR,
crop_type VARCHAR,
target_moisture_pct DOUBLE PRECISION, -- optimal midpoint
stress_threshold_pct DOUBLE PRECISION, -- trigger irrigation below this
saturation_pct DOUBLE PRECISION, -- stop irrigation above this
kc_factor DOUBLE PRECISION, -- crop coefficient for ET calculation
PRIMARY KEY (field_id, valve_zone_id)
);
Step 2: Build the Core View
Calculate 15-minute average soil moisture per valve zone and compare with thresholds:
CREATE MATERIALIZED VIEW zone_moisture_status AS
SELECT
s.field_id,
s.valve_zone_id,
window_start,
window_end,
AVG(s.soil_moisture_pct) FILTER (WHERE s.depth_cm = 30) AS avg_moisture_30cm,
MIN(s.soil_moisture_pct) FILTER (WHERE s.depth_cm = 30) AS min_moisture_30cm,
COUNT(DISTINCT s.sensor_id) AS active_sensors,
c.target_moisture_pct,
c.stress_threshold_pct,
c.saturation_pct,
c.kc_factor,
c.crop_type
FROM TUMBLE(soil_sensor_stream s, reading_time, INTERVAL '15 MINUTES')
JOIN field_irrigation_config c
ON s.field_id = c.field_id AND s.valve_zone_id = c.valve_zone_id
GROUP BY s.field_id, s.valve_zone_id, window_start, window_end,
c.target_moisture_pct, c.stress_threshold_pct, c.saturation_pct,
c.kc_factor, c.crop_type;
Incorporate ET and recent rainfall into an irrigation demand index:
CREATE MATERIALIZED VIEW irrigation_demand AS
SELECT
m.field_id,
m.valve_zone_id,
m.window_start,
m.crop_type,
m.avg_moisture_30cm,
m.stress_threshold_pct,
m.saturation_pct,
w.et0_mm,
w.rainfall_mm,
m.kc_factor * w.et0_mm AS crop_et_mm,
m.avg_moisture_30cm - m.stress_threshold_pct AS moisture_deficit,
CASE
WHEN m.avg_moisture_30cm <= m.stress_threshold_pct
AND w.rainfall_mm < 2.0 THEN 'IRRIGATE'
WHEN m.avg_moisture_30cm >= m.saturation_pct THEN 'SATURATED_SKIP'
WHEN w.rainfall_mm >= 5.0 THEN 'RAIN_SKIP'
ELSE 'MONITOR'
END AS irrigation_decision
FROM zone_moisture_status m
JOIN (
SELECT
window_start,
AVG(et0_mm) AS et0_mm,
SUM(rainfall_mm) AS rainfall_mm
FROM TUMBLE(weather_station_stream, reading_time, INTERVAL '15 MINUTES')
GROUP BY window_start
) w ON m.window_start = w.window_start;
Step 3: Alerts and Downstream Integration
CREATE MATERIALIZED VIEW alerts AS
SELECT
field_id,
valve_zone_id,
window_start AS alert_time,
irrigation_decision AS alert_type,
avg_moisture_30cm,
stress_threshold_pct,
crop_et_mm,
rainfall_mm
FROM irrigation_demand
WHERE irrigation_decision IN ('IRRIGATE', 'SATURATED_SKIP');
CREATE SINK irrigation_commands_sink
FROM alerts
WITH (
connector = 'kafka',
topic = 'agri.irrigation.commands',
properties.bootstrap.server = 'broker:9092'
)
FORMAT PLAIN ENCODE JSON;
Comparison Table
| System Type | Decision Latency | Sensor Integration | Weather Adjustment | Audit Logging |
| Fixed-schedule timer | N/A (blind) | None | None | None |
| Simple threshold relay | ~5 min polling | Single sensor | None | Basic |
| Cloud batch platform | 30–60 min | Multi-sensor | Post-hoc | ETL required |
| RisingWave Streaming | < 15 min | Multi-sensor SQL join | Real-time ET | Built-in |
FAQ
Q: Can RisingWave directly open and close valves?
RisingWave is an analytics and decision layer, not an actuator. The irrigation_commands_sink publishes IRRIGATE decisions to Kafka. A lightweight field controller (Raspberry Pi, PLCnext, or any MQTT client) subscribes to the topic and actuates the valve. This separation keeps the real-time analytics decoupled from field hardware.
Q: Our fields have different soil types with different field-capacity parameters. How do we model that?
Add soil type columns to the field_irrigation_config table and set target_moisture_pct and stress_threshold_pct values accordingly. You can even add a soil texture lookup table and join it into the view for fine-grained parameter management without changing the SQL logic.
Q: How do we handle a valve that fails to close and causes over-irrigation?
The valve_events source logs every valve state. You can create a materialized view that flags valves open longer than a configurable maximum duration and sink those alerts to a separate emergency-close topic that the controller monitors with higher priority than regular irrigation commands.
Key Takeaways
- RisingWave continuously evaluates soil moisture, evapotranspiration, and rainfall to produce data-driven irrigation decisions—replacing fixed schedules.
- Multi-sensor aggregation per valve zone with threshold comparison happens in standard SQL that any agronomist or engineer can read and modify.
- Weather-adjusted irrigation decisions prevent both over-irrigation after rain and under-irrigation during heat stress.
- Valve state audit logging is built into the streaming pipeline, providing complete traceability for compliance and post-season analysis.

