Water utilities and agricultural irrigation districts face strict regulatory limits on turbidity, pH, and chemical concentrations. Contamination events that take hours to surface in batch reports can lead to regulatory penalties, crop damage, and public health incidents. RisingWave, a PostgreSQL-compatible streaming database, processes sensor telemetry in real time and fires alerts the moment readings deviate from safe ranges.
Why Real-Time Water Quality Monitoring Matters
Water quality in agricultural irrigation and municipal supply networks can degrade rapidly. A broken backflow preventer can introduce fertilizer runoff into a potable supply line within minutes. A storm event can spike turbidity in a reservoir above the treatment plant's capacity in under an hour. A pH excursion in hydroponic irrigation can destroy an entire crop cycle in 24–48 hours.
Regulations such as the U.S. Safe Drinking Water Act (administered by the EPA), the EU Water Framework Directive, and state-level agricultural water standards require utilities to monitor and report on multiple parameters continuously. Meeting those obligations with daily lab samples or hourly batch reports leaves unacceptable detection gaps.
Real-time streaming analytics enables:
- Immediate contamination detection: turbidity spikes, pH excursions, or conductivity anomalies trigger alerts within the sensor's next reporting interval.
- Regulatory auto-reporting: continuous logging of all readings in a queryable store provides the data needed for automated compliance reports.
- Source correlation: joining upstream and downstream sensor readings identifies where in the distribution network a quality event originated.
- Predictive maintenance: gradual drift in EC or pH values that stays within regulatory limits can still indicate sensor fouling or chemical dosing system degradation—flaggable before limits are breached.
How Streaming SQL Solves This
RisingWave ingests water quality telemetry from Kafka (fed by sensor gateways via MQTT bridges) and maintains continuously updated materialized views. Regulatory thresholds stored in reference tables are joined with live readings, so the limit logic requires no code changes when standards are updated—just an UPDATE statement on the reference table.
Step-by-Step Tutorial
Step 1: Connect the Data Source
-- Water quality sensor stream
CREATE SOURCE water_quality_raw (
sensor_id VARCHAR, -- e.g. 'WQ-INTAKE-04'
node_location VARCHAR, -- 'INTAKE','TREATMENT','DISTRIBUTION','ENDPOINT'
field_id VARCHAR, -- for agricultural systems; NULL for municipal
reading_time TIMESTAMPTZ,
turbidity_ntu DOUBLE PRECISION, -- NTU
ph_value DOUBLE PRECISION, -- 0–14
ec_dscm DOUBLE PRECISION, -- electrical conductivity dS/m
do_mgl DOUBLE PRECISION, -- dissolved oxygen mg/L
temp_c DOUBLE PRECISION,
chlorine_mgl DOUBLE PRECISION, -- residual chlorine mg/L
nitrate_mgl DOUBLE PRECISION -- mg/L as NO3
)
WITH (
connector = 'kafka',
topic = 'water.quality.telemetry',
properties.bootstrap.server = 'broker:9092',
scan.startup.mode = 'latest'
)
FORMAT PLAIN ENCODE JSON;
-- Regulatory thresholds by node location and use type
CREATE TABLE water_quality_limits (
node_location VARCHAR,
use_type VARCHAR, -- 'POTABLE','IRRIGATION','TREATED_EFFLUENT'
max_turbidity DOUBLE PRECISION,
min_ph DOUBLE PRECISION,
max_ph DOUBLE PRECISION,
max_ec_dscm DOUBLE PRECISION,
min_do_mgl DOUBLE PRECISION,
min_chlorine_mgl DOUBLE PRECISION,
max_nitrate_mgl DOUBLE PRECISION,
PRIMARY KEY (node_location, use_type)
);
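For illustration, the limits table might be seeded as follows. The numeric values below are placeholders loosely modeled on common potable-water and irrigation guidance, not authoritative regulatory limits—substitute the thresholds that apply in your jurisdiction:

```sql
-- Example threshold rows; values are illustrative, not regulatory advice
INSERT INTO water_quality_limits VALUES
    ('DISTRIBUTION', 'POTABLE',    1.0, 6.5, 8.5, 2.5, 5.0, 0.2, 45.0),
    ('ENDPOINT',     'IRRIGATION', 5.0, 5.5, 7.5, 3.0, 4.0, 0.0, 50.0);

-- Tightening a limit later is a plain UPDATE; the downstream views pick it
-- up automatically, with no pipeline redeploy
UPDATE water_quality_limits
SET max_turbidity = 0.5
WHERE node_location = 'DISTRIBUTION' AND use_type = 'POTABLE';
```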
-- Network topology for source-correlation queries
CREATE TABLE sensor_network (
sensor_id VARCHAR PRIMARY KEY,
node_location VARCHAR,
upstream_sensor VARCHAR, -- sensor_id of the next upstream sensor
use_type VARCHAR,
field_id VARCHAR
);
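A minimal topology for a single intake-to-endpoint chain could look like this (the sensor IDs and field ID are made up for the example; the intake node has no upstream sensor, hence NULL):

```sql
-- Illustrative topology: intake -> treatment -> distribution -> field endpoint
INSERT INTO sensor_network VALUES
    ('WQ-INTAKE-04', 'INTAKE',       NULL,           'POTABLE',    NULL),
    ('WQ-TREAT-01',  'TREATMENT',    'WQ-INTAKE-04', 'POTABLE',    NULL),
    ('WQ-DIST-07',   'DISTRIBUTION', 'WQ-TREAT-01',  'POTABLE',    NULL),
    ('WQ-END-12',    'ENDPOINT',     'WQ-DIST-07',   'IRRIGATION', 'FIELD-3');
```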
Step 2: Build the Core View
Aggregate readings per sensor in 5-minute tumbling windows and flag limit breaches:
CREATE MATERIALIZED VIEW water_quality_5m AS
SELECT
r.sensor_id,
r.node_location,
r.field_id,
window_start,
window_end,
AVG(r.turbidity_ntu) AS avg_turbidity,
MAX(r.turbidity_ntu) AS max_turbidity,
AVG(r.ph_value) AS avg_ph,
MIN(r.ph_value) AS min_ph,
MAX(r.ph_value) AS max_ph,
AVG(r.ec_dscm) AS avg_ec,
AVG(r.do_mgl) AS avg_do,
AVG(r.chlorine_mgl) AS avg_chlorine,
AVG(r.nitrate_mgl) AS avg_nitrate,
COUNT(*) AS sample_count
FROM TUMBLE(water_quality_raw, reading_time, INTERVAL '5 MINUTES') AS r
GROUP BY r.sensor_id, r.node_location, r.field_id, window_start, window_end;
Join with limits to classify each window:
CREATE MATERIALIZED VIEW water_quality_compliance AS
SELECT
q.sensor_id,
q.node_location,
q.field_id,
q.window_start,
q.avg_turbidity,
q.avg_ph,
q.avg_ec,
q.avg_do,
q.avg_chlorine,
q.avg_nitrate,
l.use_type,
CASE -- evaluates top-down: a window with multiple breaches reports only the first matching status
WHEN q.avg_turbidity > l.max_turbidity THEN 'TURBIDITY_BREACH'
WHEN q.avg_ph < l.min_ph OR q.avg_ph > l.max_ph THEN 'PH_BREACH'
WHEN q.avg_ec > l.max_ec_dscm THEN 'EC_BREACH'
WHEN q.avg_do < l.min_do_mgl THEN 'LOW_DO'
WHEN q.avg_chlorine < l.min_chlorine_mgl THEN 'LOW_CHLORINE'
WHEN q.avg_nitrate > l.max_nitrate_mgl THEN 'NITRATE_BREACH'
ELSE 'COMPLIANT'
END AS compliance_status
FROM water_quality_5m q
JOIN sensor_network sn USING (sensor_id)
JOIN water_quality_limits l
ON q.node_location = l.node_location
AND sn.use_type = l.use_type;
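Because the view is continuously maintained and RisingWave speaks the PostgreSQL wire protocol, any psql-compatible client can query it directly. A quick operational check might look like this (an ad-hoc sketch, not part of the pipeline):

```sql
-- Ad-hoc check from any PostgreSQL client: most recent breach windows
SELECT sensor_id, node_location, window_start, compliance_status
FROM water_quality_compliance
WHERE compliance_status != 'COMPLIANT'
ORDER BY window_start DESC
LIMIT 20;
```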
Step 3: Alerts and Downstream Integration
CREATE MATERIALIZED VIEW alerts AS
SELECT
sensor_id,
node_location,
field_id,
window_start AS alert_time,
compliance_status AS alert_type,
avg_turbidity,
avg_ph,
avg_ec,
avg_chlorine,
avg_nitrate
FROM water_quality_compliance
WHERE compliance_status != 'COMPLIANT';
CREATE SINK water_quality_alerts_sink
FROM alerts
WITH (
connector = 'kafka',
topic = 'water.quality.alerts',
properties.bootstrap.server = 'broker:9092'
)
FORMAT PLAIN ENCODE JSON (
force_append_only = 'true' -- alerts derives from an aggregation; drop retraction messages
);
Comparison Table
| Monitoring Method | Detection Lag | Multi-parameter | Compliance Log | SQL Query |
| --- | --- | --- | --- | --- |
| Daily lab sampling | 24 h | Yes | Manual | No |
| SCADA with fixed rules | 5–15 min | Limited | SCADA historian | Proprietary |
| Time-series DB (InfluxDB) | ~1 min | Yes | TTL-limited | Flux (non-SQL) |
| RisingWave | < 5 min | Yes, SQL join | Persistent | Full PostgreSQL |
FAQ
Q: How do we correlate a downstream contamination event back to its upstream source?
The sensor_network table stores upstream_sensor for each node. Join water_quality_compliance to sensor_network to resolve each sensor's upstream neighbor, then join the view to itself on that upstream sensor_id and filter for windows where the downstream sensor breaches limits but the upstream sensor does not. This narrows the contamination entry point to the pipe segment between the two nodes.
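A minimal sketch of that self-join, reusing the views defined above (the view name is illustrative; matching on equal window_start assumes both sensors share the same 5-minute window alignment):

```sql
-- Attribute a breach to the segment between a compliant upstream sensor
-- and a breaching downstream sensor, within the same 5-minute window
CREATE MATERIALIZED VIEW contamination_entry_points AS
SELECT
    d.sensor_id        AS downstream_sensor,
    sn.upstream_sensor AS upstream_sensor,
    d.window_start,
    d.compliance_status
FROM water_quality_compliance d
JOIN sensor_network sn ON d.sensor_id = sn.sensor_id
JOIN water_quality_compliance u
  ON u.sensor_id = sn.upstream_sensor
 AND u.window_start = d.window_start
WHERE d.compliance_status != 'COMPLIANT'
  AND u.compliance_status = 'COMPLIANT';
```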
Q: Our sensors report pH and turbidity at different frequencies. Does RisingWave handle mixed-frequency streams?
Yes. Each sensor's readings land in the same water_quality_raw source table and are aggregated independently in tumbling windows. Sensors that report less frequently simply contribute fewer samples per window; the sample_count column indicates data density for each aggregation window.
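That density column also makes data gaps queryable. A sketch for surfacing sparse windows (the threshold of 3 samples per 5-minute window is an assumption; tune it to your fleet's slowest reporting interval):

```sql
-- Windows with sparse data: a slow-reporting sensor or a gateway outage
SELECT sensor_id, window_start, sample_count
FROM water_quality_5m
WHERE sample_count < 3;
```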
Q: Can we export compliance reports automatically to meet regulatory submission deadlines?
Create an additional sink to a PostgreSQL table or S3-backed Iceberg table that accumulates daily compliance summaries. A scheduled query or cron job can then extract and format those summaries as CSV or JSON for regulatory submission without any manual data extraction.
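One way to sketch that pipeline, assuming a reporting PostgreSQL instance is available (the JDBC URL, credentials, and target table name below are placeholders):

```sql
-- Daily per-sensor compliance summary over the 5-minute compliance windows
CREATE MATERIALIZED VIEW daily_compliance_summary AS
SELECT
    sensor_id,
    window_start::DATE AS report_date,
    COUNT(*) AS windows_total,
    COUNT(*) FILTER (WHERE compliance_status != 'COMPLIANT') AS windows_breached
FROM water_quality_compliance
GROUP BY sensor_id, window_start::DATE;

-- Hypothetical JDBC sink to a reporting database; connection details are placeholders
CREATE SINK daily_compliance_sink
FROM daily_compliance_summary
WITH (
    connector = 'jdbc',
    jdbc.url = 'jdbc:postgresql://reporting-db:5432/compliance?user=report&password=secret',
    table.name = 'daily_compliance_summary',
    type = 'upsert',
    primary_key = 'sensor_id,report_date'
);
```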
Key Takeaways
- RisingWave detects turbidity spikes, pH excursions, EC anomalies, and chlorine residual deficiencies within minutes of occurrence using continuously maintained SQL views.
- Regulatory thresholds stored in a reference table can be updated with a plain SQL UPDATE statement; no code deployment is required.
- Network topology joins enable automatic upstream-source attribution for contamination events, reducing investigation time from hours to minutes.
- The PostgreSQL wire protocol means compliance data is immediately accessible to any reporting tool or regulatory API connector.

