Building a Smart Irrigation System with Streaming SQL

Over-irrigation wastes water and leaches nutrients; under-irrigation stresses crops and cuts yield. A smart irrigation system built on RisingWave, a PostgreSQL-compatible streaming database, continuously evaluates soil moisture percentages, evapotranspiration estimates, and valve states to schedule irrigation runs based on real field conditions—not fixed timers.

Why Smart Irrigation Matters

Agriculture accounts for roughly 70 % of global freshwater withdrawals. Conventional timer-based irrigation ignores actual crop water demand: it runs on schedule whether the soil is already saturated from overnight rain or critically dry after a heat wave. Precision irrigation systems that respond to real sensor data can reduce water consumption by 20–50 % while maintaining or improving yields.

The challenge is that reacting to live sensor data requires a system that:

  • Continuously reads soil moisture from dozens to hundreds of field sensors.
  • Correlates sensor readings with weather station data (evapotranspiration, rainfall).
  • Evaluates per-field irrigation need and generates valve open/close decisions in near-real-time.
  • Logs every valve state change for compliance, billing, and agronomic analysis.

Batch analytics jobs that run once per hour cannot power this. A streaming SQL platform that maintains continuously updated views can.

How Streaming SQL Solves This

RisingWave ingests sensor telemetry and weather data from Kafka and maintains materialized views that calculate current irrigation need per field. A lightweight irrigation controller microservice queries these views via the PostgreSQL interface and actuates valves accordingly. All valve events are logged back to RisingWave for a complete audit trail.

Step-by-Step Tutorial

Step 1: Connect the Data Source

-- Soil moisture and related sensors
CREATE SOURCE soil_sensor_stream (
    sensor_id          VARCHAR,     -- e.g. 'SM-F07-N3'
    field_id           VARCHAR,     -- e.g. 'FIELD-007'
    valve_zone_id      VARCHAR,     -- irrigation zone served by one valve
    reading_time       TIMESTAMPTZ,
    soil_moisture_pct  DOUBLE PRECISION,   -- volumetric water content %
    soil_temp_c        DOUBLE PRECISION,
    ec_dscm            DOUBLE PRECISION,   -- electrical conductivity
    depth_cm           SMALLINT
)
WITH (
    connector     = 'kafka',
    topic         = 'agri.soil.sensors',
    properties.bootstrap.server = 'broker:9092',
    scan.startup.mode = 'latest'
)
FORMAT PLAIN ENCODE JSON;

-- Weather station data
CREATE SOURCE weather_station_stream (
    station_id         VARCHAR,     -- e.g. 'WS-FARM-01'
    reading_time       TIMESTAMPTZ,
    temp_c             DOUBLE PRECISION,
    humidity_pct       DOUBLE PRECISION,
    wind_speed_ms      DOUBLE PRECISION,
    solar_rad_wm2      DOUBLE PRECISION,
    rainfall_mm        DOUBLE PRECISION,
    et0_mm             DOUBLE PRECISION    -- reference evapotranspiration mm/day
)
WITH (
    connector     = 'kafka',
    topic         = 'agri.weather',
    properties.bootstrap.server = 'broker:9092',
    scan.startup.mode = 'latest'
)
FORMAT PLAIN ENCODE JSON;

-- Valve state events logged by the controller
CREATE SOURCE valve_events (
    valve_id           VARCHAR,     -- irrigation valve ID
    valve_zone_id      VARCHAR,
    field_id           VARCHAR,
    event_time         TIMESTAMPTZ,
    state              VARCHAR,     -- 'OPEN','CLOSED'
    commanded_by       VARCHAR,     -- 'SCHEDULER','MANUAL','EMERGENCY_CLOSE'
    flow_rate_lpm      DOUBLE PRECISION
)
WITH (
    connector     = 'kafka',
    topic         = 'agri.valves',
    properties.bootstrap.server = 'broker:9092',
    scan.startup.mode = 'latest'
)
FORMAT PLAIN ENCODE JSON;

-- Field configuration
CREATE TABLE field_irrigation_config (
    field_id           VARCHAR,
    valve_zone_id      VARCHAR,
    crop_type          VARCHAR,
    target_moisture_pct DOUBLE PRECISION,   -- optimal midpoint
    stress_threshold_pct DOUBLE PRECISION,  -- trigger irrigation below this
    saturation_pct     DOUBLE PRECISION,    -- stop irrigation above this
    kc_factor          DOUBLE PRECISION,    -- crop coefficient for ET calculation
    PRIMARY KEY (field_id, valve_zone_id)
);
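
To make the thresholds concrete, here is a minimal sketch of seeding the configuration table. The field IDs, zone IDs, crop types, and every numeric value are illustrative assumptions, not agronomic recommendations; real values depend on crop, soil, and sensor calibration.

```sql
-- Hypothetical example rows: all identifiers and numbers are placeholders.
INSERT INTO field_irrigation_config
    (field_id, valve_zone_id, crop_type,
     target_moisture_pct, stress_threshold_pct, saturation_pct, kc_factor)
VALUES
    ('FIELD-007', 'ZONE-07-A', 'maize',  28.0, 22.0, 35.0, 1.15),
    ('FIELD-007', 'ZONE-07-B', 'maize',  28.0, 22.0, 35.0, 1.15),
    ('FIELD-012', 'ZONE-12-A', 'tomato', 30.0, 25.0, 38.0, 1.05);

-- Verify that each zone has exactly one configuration row.
SELECT field_id, valve_zone_id, crop_type, stress_threshold_pct, saturation_pct
FROM field_irrigation_config
ORDER BY field_id, valve_zone_id;
```

Because the views in Step 2 use an inner join against this table, a zone without a configuration row produces no output, so missing rows are worth checking for early.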

Step 2: Build the Core View

Calculate 15-minute average soil moisture per valve zone and compare with thresholds:

CREATE MATERIALIZED VIEW zone_moisture_status AS
SELECT
    s.field_id,
    s.valve_zone_id,
    window_start,
    window_end,
    AVG(s.soil_moisture_pct) FILTER (WHERE s.depth_cm = 30) AS avg_moisture_30cm,
    MIN(s.soil_moisture_pct) FILTER (WHERE s.depth_cm = 30) AS min_moisture_30cm,
    COUNT(DISTINCT s.sensor_id)                              AS active_sensors,
    c.target_moisture_pct,
    c.stress_threshold_pct,
    c.saturation_pct,
    c.kc_factor,
    c.crop_type
FROM TUMBLE(soil_sensor_stream, reading_time, INTERVAL '15 MINUTES') AS s
JOIN field_irrigation_config c
    ON s.field_id = c.field_id AND s.valve_zone_id = c.valve_zone_id
GROUP BY s.field_id, s.valve_zone_id, window_start, window_end,
         c.target_moisture_pct, c.stress_threshold_pct, c.saturation_pct,
         c.kc_factor, c.crop_type;
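
Since the averages only consider readings at 30 cm, a zone with no sensor at that depth yields a NULL average, and a zone with a single reporting sensor is fragile. A quick ad hoc check along these lines can surface both cases. The one-hour lookback and the minimum of two sensors are assumptions chosen for illustration.

```sql
-- Sanity check: recent windows with thin sensor coverage or no 30 cm readings.
-- Note that active_sensors counts sensors at every depth, not only 30 cm.
SELECT
    field_id,
    valve_zone_id,
    window_start,
    active_sensors,
    avg_moisture_30cm
FROM zone_moisture_status
WHERE window_start >= NOW() - INTERVAL '1 hour'      -- assumed lookback
  AND (active_sensors < 2                            -- assumed minimum coverage
       OR avg_moisture_30cm IS NULL)                 -- no 30 cm reading in window
ORDER BY window_start DESC, field_id, valve_zone_id;
```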

Incorporate ET and recent rainfall into an irrigation demand index:

CREATE MATERIALIZED VIEW irrigation_demand AS
SELECT
    m.field_id,
    m.valve_zone_id,
    m.window_start,
    m.crop_type,
    m.avg_moisture_30cm,
    m.stress_threshold_pct,
    m.saturation_pct,
    w.et0_mm,
    w.rainfall_mm,
    m.kc_factor * w.et0_mm          AS crop_et_mm,
    -- Positive when the zone is drier than its stress threshold
    m.stress_threshold_pct - m.avg_moisture_30cm AS moisture_deficit,
    CASE
        WHEN m.avg_moisture_30cm <= m.stress_threshold_pct
             AND w.rainfall_mm < 2.0                       THEN 'IRRIGATE'
        WHEN m.avg_moisture_30cm >= m.saturation_pct       THEN 'SATURATED_SKIP'
        WHEN w.rainfall_mm >= 5.0                          THEN 'RAIN_SKIP'
        ELSE 'MONITOR'
    END AS irrigation_decision
FROM zone_moisture_status m
JOIN (
    SELECT
        window_start,
        AVG(et0_mm)     AS et0_mm,
        SUM(rainfall_mm) AS rainfall_mm
    FROM TUMBLE(weather_station_stream, reading_time, INTERVAL '15 MINUTES')
    GROUP BY window_start
) w ON m.window_start = w.window_start;
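
The controller microservice described earlier reads this view over the PostgreSQL interface. A minimal sketch of the query it might poll is shown below, assuming it only cares about the most recent window for each zone; the polling interval and how the result maps to valve commands are left to the controller.

```sql
-- Latest decision per valve zone, keeping only zones that currently need water.
SELECT
    field_id,
    valve_zone_id,
    window_start,
    avg_moisture_30cm,
    stress_threshold_pct,
    irrigation_decision
FROM (
    SELECT
        *,
        ROW_NUMBER() OVER (
            PARTITION BY field_id, valve_zone_id
            ORDER BY window_start DESC
        ) AS rn
    FROM irrigation_demand
) latest
WHERE rn = 1
  AND irrigation_decision = 'IRRIGATE'
ORDER BY field_id, valve_zone_id;
```

Filtering on rn = 1 before checking the decision matters: a zone that needed irrigation an hour ago but is now saturated should not show up in the result.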

Step 3: Alerts and Downstream Integration

CREATE MATERIALIZED VIEW alerts AS
SELECT
    field_id,
    valve_zone_id,
    window_start               AS alert_time,
    irrigation_decision        AS alert_type,
    avg_moisture_30cm,
    stress_threshold_pct,
    crop_et_mm,
    rainfall_mm
FROM irrigation_demand
WHERE irrigation_decision IN ('IRRIGATE', 'SATURATED_SKIP');

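-- Note: alerts is derived from aggregations, so it is not an append-only stream.
-- Depending on the RisingWave version, a FORMAT PLAIN sink may need the
-- force_append_only option, or FORMAT UPSERT with a key, to accept it.
CREATE SINK irrigation_commands_sink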
FROM alerts
WITH (
    connector  = 'kafka',
    topic      = 'agri.irrigation.commands',
    properties.bootstrap.server = 'broker:9092'
)
FORMAT PLAIN ENCODE JSON;

Comparison Table

System Type            | Decision Latency | Sensor Integration    | Weather Adjustment | Audit Logging
Fixed-schedule timer   | N/A (blind)      | None                  | None               | None
Simple threshold relay | ~5 min polling   | Single sensor         | None               | Basic
Cloud batch platform   | 30–60 min        | Multi-sensor          | Post-hoc           | ETL required
RisingWave Streaming   | < 15 min         | Multi-sensor SQL join | Real-time ET       | Built-in

FAQ

Q: Can RisingWave directly open and close valves?

RisingWave is an analytics and decision layer, not an actuator. The irrigation_commands_sink publishes the rows of the alerts view (IRRIGATE and SATURATED_SKIP decisions) to the agri.irrigation.commands Kafka topic. A lightweight field controller (Raspberry Pi, PLCnext, or any MQTT client) subscribes to the topic and actuates the valve: it opens on IRRIGATE and can use SATURATED_SKIP as a signal to stop. This separation keeps the real-time analytics decoupled from field hardware.

Q: Our fields have different soil types with different field-capacity parameters. How do we model that?

Add soil type columns to the field_irrigation_config table and set target_moisture_pct and stress_threshold_pct values accordingly. You can even add a soil texture lookup table and join it into the view for fine-grained parameter management without changing the SQL logic.
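
One way to sketch the lookup-table variant is shown below. The table names soil_texture_params and zone_soil_profile, their columns, and the view zone_soil_thresholds are hypothetical, and the 50 % allowable-depletion rule used to derive a threshold is a common rule of thumb assumed here for illustration, not something this tutorial prescribes.

```sql
-- Hypothetical lookup table: one row per soil texture class.
CREATE TABLE soil_texture_params (
    soil_type           VARCHAR PRIMARY KEY,   -- e.g. 'sandy_loam'
    field_capacity_pct  DOUBLE PRECISION,      -- volumetric water content at field capacity
    wilting_point_pct   DOUBLE PRECISION       -- volumetric water content at wilting point
);

-- Hypothetical mapping from valve zone to soil texture class.
CREATE TABLE zone_soil_profile (
    field_id       VARCHAR,
    valve_zone_id  VARCHAR,
    soil_type      VARCHAR,
    PRIMARY KEY (field_id, valve_zone_id)
);

-- Derive a suggested stress threshold per zone from the soil parameters.
-- Assumption: irrigate once half of the plant-available water is depleted.
CREATE MATERIALIZED VIEW zone_soil_thresholds AS
SELECT
    z.field_id,
    z.valve_zone_id,
    z.soil_type,
    p.field_capacity_pct,
    p.wilting_point_pct,
    p.wilting_point_pct
        + 0.5 * (p.field_capacity_pct - p.wilting_point_pct)
        AS suggested_stress_threshold_pct
FROM zone_soil_profile z
JOIN soil_texture_params p
    ON z.soil_type = p.soil_type;
```

The suggested values can then be copied into field_irrigation_config, or the view can be joined in place of the static thresholds if per-soil tuning should take effect automatically.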

Q: How do we handle a valve that fails to close and causes over-irrigation?

The valve_events source logs every valve state. You can create a materialized view that flags valves open longer than a configurable maximum duration and sink those alerts to a separate emergency-close topic that the controller monitors with higher priority than regular irrigation commands.
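
A minimal sketch of that idea follows, split into two parts: a materialized view that keeps the latest event per valve, and an ad hoc query the controller could run to find valves that have been open too long. The view name valve_latest_state and the two-hour limit are assumptions. Turning the second query into a materialized view with its own sink would rely on RisingWave's temporal filter support, which is not shown here.

```sql
-- Latest known state per valve, maintained incrementally from valve_events.
CREATE MATERIALIZED VIEW valve_latest_state AS
SELECT
    valve_id,
    valve_zone_id,
    field_id,
    event_time,
    state,
    commanded_by
FROM (
    SELECT
        *,
        ROW_NUMBER() OVER (
            PARTITION BY valve_id
            ORDER BY event_time DESC
        ) AS rn
    FROM valve_events
) ranked
WHERE rn = 1;

-- Ad hoc check: valves whose most recent event is OPEN and older than the limit.
SELECT
    valve_id,
    valve_zone_id,
    field_id,
    event_time          AS opened_at,
    NOW() - event_time  AS open_duration
FROM valve_latest_state
WHERE state = 'OPEN'
  AND event_time < NOW() - INTERVAL '2 hours'   -- assumed maximum run time
ORDER BY event_time;
```

A per-zone limit could replace the fixed interval by adding a maximum-duration column to field_irrigation_config and joining on it, at the cost of one more configuration value to maintain.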

Key Takeaways

  • RisingWave continuously evaluates soil moisture, evapotranspiration, and rainfall to produce data-driven irrigation decisions—replacing fixed schedules.
  • Multi-sensor aggregation per valve zone with threshold comparison happens in standard SQL that any agronomist or engineer can read and modify.
  • Weather-adjusted irrigation decisions prevent both over-irrigation after rain and under-irrigation during heat stress.
  • Valve state audit logging is built into the streaming pipeline, providing complete traceability for compliance and post-season analysis.
