Real-Time Water Quality Monitoring for Utilities

Real-Time Water Quality Monitoring for Utilities

Water utilities and agricultural irrigation districts face strict regulatory limits on turbidity, pH, and chemical concentrations. Contamination events that take hours to surface in batch reports can lead to regulatory penalties, crop damage, and public health incidents. RisingWave, a PostgreSQL-compatible streaming database, processes sensor telemetry in real time and fires alerts the moment readings deviate from safe ranges.

Why Real-Time Water Quality Monitoring Matters

Water quality in agricultural irrigation and municipal supply networks can degrade rapidly. A broken backflow preventer can introduce fertilizer runoff into a potable supply line within minutes. A storm event can spike turbidity in a reservoir above the treatment plant's capacity in under an hour. A pH excursion in hydroponic irrigation can destroy an entire crop cycle in 24–48 hours.

Regulations such as the EPA's Safe Drinking Water Act, the EU Water Framework Directive, and state-level agricultural water standards require utilities to monitor and report on multiple parameters continuously. Meeting those obligations with daily lab samples or hourly batch reports leaves unacceptable detection gaps.

Real-time streaming analytics enables:

  • Immediate contamination detection: turbidity spikes, pH excursions, or conductivity anomalies trigger alerts within the sensor's next reporting interval.
  • Regulatory auto-reporting: continuous logging of all readings in a queryable store provides the data needed for automated compliance reports.
  • Source correlation: joining upstream and downstream sensor readings identifies where in the distribution network a quality event originated.
  • Predictive maintenance: gradual drift in EC or pH values that stays within regulatory limits can still indicate sensor fouling or chemical dosing system degradation—flaggable before limits are breached.

How Streaming SQL Solves This

RisingWave ingests water quality telemetry from Kafka (fed by sensor gateways via MQTT bridges) and maintains continuously updated materialized views. Regulatory thresholds stored in reference tables are joined with live readings so that limit logic requires no code changes when standards are updated—just a UPDATE statement on the reference table.

Step-by-Step Tutorial

Step 1: Connect the Data Source

-- Water quality sensor stream
CREATE SOURCE water_quality_raw (
    sensor_id       VARCHAR,      -- e.g. 'WQ-INTAKE-04'
    node_location   VARCHAR,      -- 'INTAKE','TREATMENT','DISTRIBUTION','ENDPOINT'
    field_id        VARCHAR,      -- for agricultural systems; NULL for municipal
    reading_time    TIMESTAMPTZ,
    turbidity_ntu   DOUBLE PRECISION,   -- NTU
    ph_value        DOUBLE PRECISION,   -- 0–14
    ec_dscm         DOUBLE PRECISION,   -- electrical conductivity dS/m
    do_mgl          DOUBLE PRECISION,   -- dissolved oxygen mg/L
    temp_c          DOUBLE PRECISION,
    chlorine_mgl    DOUBLE PRECISION,   -- residual chlorine mg/L
    nitrate_mgl     DOUBLE PRECISION    -- mg/L as NO3
)
WITH (
    connector     = 'kafka',
    topic         = 'water.quality.telemetry',
    properties.bootstrap.server = 'broker:9092',
    scan.startup.mode = 'latest'
)
FORMAT PLAIN ENCODE JSON;

-- Regulatory thresholds by node location and use type
CREATE TABLE water_quality_limits (
    node_location    VARCHAR,
    use_type         VARCHAR,    -- 'POTABLE','IRRIGATION','TREATED_EFFLUENT'
    max_turbidity    DOUBLE PRECISION,
    min_ph           DOUBLE PRECISION,
    max_ph           DOUBLE PRECISION,
    max_ec_dscm      DOUBLE PRECISION,
    min_do_mgl       DOUBLE PRECISION,
    min_chlorine_mgl DOUBLE PRECISION,
    max_nitrate_mgl  DOUBLE PRECISION,
    PRIMARY KEY (node_location, use_type)
);

-- Network topology for source-correlation queries
CREATE TABLE sensor_network (
    sensor_id        VARCHAR PRIMARY KEY,
    node_location    VARCHAR,
    upstream_sensor  VARCHAR,    -- sensor_id of the next upstream sensor
    use_type         VARCHAR,
    field_id         VARCHAR
);

Step 2: Build the Core View

Aggregate readings per sensor in 5-minute tumbling windows and flag limit breaches:

CREATE MATERIALIZED VIEW water_quality_5m AS
SELECT
    r.sensor_id,
    r.node_location,
    r.field_id,
    window_start,
    window_end,
    AVG(r.turbidity_ntu)  AS avg_turbidity,
    MAX(r.turbidity_ntu)  AS max_turbidity,
    AVG(r.ph_value)       AS avg_ph,
    MIN(r.ph_value)       AS min_ph,
    MAX(r.ph_value)       AS max_ph,
    AVG(r.ec_dscm)        AS avg_ec,
    AVG(r.do_mgl)         AS avg_do,
    AVG(r.chlorine_mgl)   AS avg_chlorine,
    AVG(r.nitrate_mgl)    AS avg_nitrate,
    COUNT(*)              AS sample_count
FROM TUMBLE(water_quality_raw r, reading_time, INTERVAL '5 MINUTES')
GROUP BY r.sensor_id, r.node_location, r.field_id, window_start, window_end;

Join with limits to classify each window:

CREATE MATERIALIZED VIEW water_quality_compliance AS
SELECT
    q.sensor_id,
    q.node_location,
    q.field_id,
    q.window_start,
    q.avg_turbidity,
    q.avg_ph,
    q.avg_ec,
    q.avg_do,
    q.avg_chlorine,
    q.avg_nitrate,
    l.use_type,
    CASE
        WHEN q.avg_turbidity > l.max_turbidity         THEN 'TURBIDITY_BREACH'
        WHEN q.avg_ph < l.min_ph OR q.avg_ph > l.max_ph THEN 'PH_BREACH'
        WHEN q.avg_ec > l.max_ec_dscm                  THEN 'EC_BREACH'
        WHEN q.avg_do < l.min_do_mgl                   THEN 'LOW_DO'
        WHEN q.avg_chlorine < l.min_chlorine_mgl       THEN 'LOW_CHLORINE'
        WHEN q.avg_nitrate > l.max_nitrate_mgl         THEN 'NITRATE_BREACH'
        ELSE 'COMPLIANT'
    END AS compliance_status
FROM water_quality_5m q
JOIN sensor_network sn USING (sensor_id)
JOIN water_quality_limits l
    ON q.node_location = l.node_location
   AND sn.use_type = l.use_type;

Step 3: Alerts and Downstream Integration

CREATE MATERIALIZED VIEW alerts AS
SELECT
    sensor_id,
    node_location,
    field_id,
    window_start      AS alert_time,
    compliance_status AS alert_type,
    avg_turbidity,
    avg_ph,
    avg_ec,
    avg_chlorine,
    avg_nitrate
FROM water_quality_compliance
WHERE compliance_status != 'COMPLIANT';

CREATE SINK water_quality_alerts_sink
FROM alerts
WITH (
    connector  = 'kafka',
    topic      = 'water.quality.alerts',
    properties.bootstrap.server = 'broker:9092'
)
FORMAT PLAIN ENCODE JSON;

Comparison Table

Monitoring MethodDetection LagMulti-parameterCompliance LogSQL Query
Daily lab sampling24 hYesManualNo
SCADA with fixed rules5–15 minLimitedSCADA historianProprietary
Time-series DB (InfluxDB)~1 minYesTTL-limitedFlux (non-SQL)
RisingWave< 5 minYes, SQL joinPersistentFull PostgreSQL

FAQ

Q: How do we correlate a downstream contamination event back to its upstream source?

The sensor_network table stores upstream_sensor for each node. Join the water_quality_compliance view with itself on sensor_id = upstream_sensor and filter where the downstream sensor breaches limits but the upstream does not. This narrows the contamination entry point to the pipe segment between the two nodes.

Q: Our sensors report pH and turbidity at different frequencies. Does RisingWave handle mixed-frequency streams?

Yes. Each sensor's readings land in the same water_quality_raw source table and are aggregated independently in tumbling windows. Sensors that report less frequently simply contribute fewer samples per window; the sample_count column indicates data density for each aggregation window.

Q: Can we export compliance reports automatically to meet regulatory submission deadlines?

Create an additional sink to a PostgreSQL table or S3-backed Iceberg table that accumulates daily compliance summaries. A scheduled query or cron job can then extract and format those summaries as CSV or JSON for regulatory submission without any manual data extraction.

Key Takeaways

  • RisingWave detects turbidity spikes, pH excursions, EC anomalies, and chlorine residual deficiencies within minutes of occurrence using continuously maintained SQL views.
  • Regulatory thresholds stored in a reference table can be updated with a simple SQL UPDATE—no code deployment required.
  • Network topology joins enable automatic upstream-source attribution for contamination events, reducing investigation time from hours to minutes.
  • The PostgreSQL wire protocol means compliance data is immediately accessible to any reporting tool or regulatory API connector.

Best-in-Class Event Streaming
for Agents, Apps, and Analytics
GitHubXLinkedInSlackYouTube
Sign up for our to stay updated.