Municipal water utilities lose an estimated 20–30% of treated water to leaks and pipe bursts every year. RisingWave, a PostgreSQL-compatible streaming database, lets operations teams build continuous materialized views over sensor telemetry so that pressure drops, flow anomalies, and zone imbalances surface in seconds, not hours.
Why Water Distribution Monitoring Matters
Modern distribution networks span thousands of kilometers of pipe, hundreds of pressure-regulation valves, and dozens of pump stations. Each node produces a continuous stream of readings: flow rate, inlet/outlet pressure, valve position, and water quality parameters. Batch analytics jobs that run every 15 minutes are too slow to prevent a main break from flooding a neighborhood or to isolate a contamination event before it reaches consumers.
Real-time monitoring closes that gap. With sub-second query latency on live sensor feeds, control-room operators can:
- Detect abnormal pressure gradients between adjacent zone codes before a pipe ruptures.
- Identify unexplained flow increases that signal unauthorized tapping or meter bypass.
- Correlate demand spikes with weather events and pre-position tankers or mobile pumps.
- Produce audit-grade logs for regulatory compliance without extra ETL pipelines.
How Streaming SQL Solves This
RisingWave ingests Kafka, Kinesis, or direct CDC feeds and exposes them as SQL tables. Engineers write CREATE MATERIALIZED VIEW statements—standard SQL—and RisingWave continuously maintains the results as new sensor readings arrive. Views can join live streams with static reference data (pipe diameter, zone topology, allowable pressure ranges) stored in regular tables. Downstream dashboards and alerting systems query those views with ordinary SELECT statements and always get up-to-date answers.
Because RisingWave separates storage from compute, historical replay and real-time processing share the same query layer. A single SQL file can power both live operational dashboards and weekly trend reports.
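For instance, a weekly trend report can be served by an ordinary batch query against the node-level view built in Step 2 below (a sketch; the column names follow the tutorial's water_node_stats_1m view):

SELECT
  date_trunc('day', window_start) AS day,
  zone_code,
  AVG(avg_pressure_bar) AS mean_pressure_bar,
  SUM(sample_count) AS readings
FROM water_node_stats_1m
WHERE window_start > NOW() - INTERVAL '7 DAYS'
GROUP BY date_trunc('day', window_start), zone_code
ORDER BY day, zone_code;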
Step-by-Step Tutorial
Step 1: Connect the Data Source
Sensor telemetry arrives on a Kafka topic. Each message contains the sensor node ID, zone code, timestamp, and readings.
CREATE SOURCE water_sensor_raw (
  node_id VARCHAR,                 -- e.g. 'WN-042'
  zone_code VARCHAR,               -- e.g. 'ZONE-7B'
  event_time TIMESTAMPTZ,
  flow_rate_lpm DOUBLE PRECISION,  -- litres per minute
  pressure_bar DOUBLE PRECISION,
  valve_open_pct SMALLINT,         -- 0–100
  turbidity_ntu DOUBLE PRECISION
)
WITH (
  connector = 'kafka',
  topic = 'water.sensor.telemetry',
  properties.bootstrap.server = 'broker:9092',
  scan.startup.mode = 'latest'
)
FORMAT PLAIN ENCODE JSON;
Load the static zone reference table once from your GIS system:
CREATE TABLE zone_config (
  zone_code VARCHAR PRIMARY KEY,
  min_pressure_bar DOUBLE PRECISION,
  max_pressure_bar DOUBLE PRECISION,
  expected_flow_lpm DOUBLE PRECISION,
  district_name VARCHAR
);
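As a minimal illustration, the table can be populated with ordinary INSERT statements; the zone codes and thresholds below are made-up values, and in practice this data comes from your GIS export:

INSERT INTO zone_config (zone_code, min_pressure_bar, max_pressure_bar, expected_flow_lpm, district_name)
VALUES
  ('ZONE-7B', 2.5, 6.0, 1200.0, 'Northside'),
  ('ZONE-7C', 2.5, 6.0,  950.0, 'Northside');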
Step 2: Build the Core View
Create a 1-minute tumbling-window aggregate that tracks mean and peak values per sensor node:
CREATE MATERIALIZED VIEW water_node_stats_1m AS
SELECT
  node_id,
  zone_code,
  window_start,
  window_end,
  AVG(flow_rate_lpm) AS avg_flow_lpm,
  MAX(flow_rate_lpm) AS peak_flow_lpm,
  AVG(pressure_bar) AS avg_pressure_bar,
  MIN(pressure_bar) AS min_pressure_bar,
  MAX(pressure_bar) AS max_pressure_bar,
  AVG(turbidity_ntu) AS avg_turbidity_ntu,
  COUNT(*) AS sample_count
FROM TUMBLE(water_sensor_raw, event_time, INTERVAL '1 MINUTE')
GROUP BY node_id, zone_code, window_start, window_end;
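The view can be sanity-checked like any PostgreSQL table. A quick query sketch (the node ID is illustrative):

SELECT window_start, avg_flow_lpm, min_pressure_bar, max_pressure_bar
FROM water_node_stats_1m
WHERE node_id = 'WN-042'
ORDER BY window_start DESC
LIMIT 10;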
Join with the zone configuration to compute a zone-level flow balance (total measured flow vs. expected flow):
CREATE MATERIALIZED VIEW zone_flow_balance AS
SELECT
  s.zone_code,
  z.district_name,
  s.window_start,
  SUM(s.avg_flow_lpm) AS total_flow_lpm,
  z.expected_flow_lpm,
  SUM(s.avg_flow_lpm) - z.expected_flow_lpm AS flow_delta_lpm
FROM water_node_stats_1m s
JOIN zone_config z USING (zone_code)
GROUP BY s.zone_code, z.district_name, s.window_start, z.expected_flow_lpm;
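Operators can rank zones by imbalance directly from this view; a delta that stays positive across consecutive windows is a likely sign of leakage or unauthorized tapping. A query sketch:

SELECT zone_code, district_name, window_start, flow_delta_lpm
FROM zone_flow_balance
ORDER BY flow_delta_lpm DESC
LIMIT 5;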
Step 3: Alerts and Downstream Integration
Detect pressure anomalies, high-turbidity events, and excess-flow conditions:
CREATE MATERIALIZED VIEW alerts AS
SELECT
  s.node_id,
  s.zone_code,
  s.window_start AS alert_time,
  CASE
    WHEN s.min_pressure_bar < z.min_pressure_bar THEN 'LOW_PRESSURE'
    WHEN s.max_pressure_bar > z.max_pressure_bar THEN 'HIGH_PRESSURE'
    WHEN s.avg_turbidity_ntu > 4.0 THEN 'HIGH_TURBIDITY'
    WHEN b.flow_delta_lpm > z.expected_flow_lpm * 0.15 THEN 'EXCESS_FLOW'
    ELSE NULL
  END AS alert_type,
  s.avg_pressure_bar,
  s.avg_turbidity_ntu,
  b.flow_delta_lpm
FROM water_node_stats_1m s
JOIN zone_config z USING (zone_code)
JOIN zone_flow_balance b USING (zone_code, window_start)
WHERE
  s.min_pressure_bar < z.min_pressure_bar
  OR s.max_pressure_bar > z.max_pressure_bar
  OR s.avg_turbidity_ntu > 4.0
  OR b.flow_delta_lpm > z.expected_flow_lpm * 0.15;
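Before wiring up downstream systems, the alerts view can be inspected ad hoc. A query sketch for the last hour of alerts:

SELECT alert_time, node_id, zone_code, alert_type
FROM alerts
WHERE alert_time > NOW() - INTERVAL '1 HOUR'
ORDER BY alert_time DESC;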
Publish alerts to a Kafka topic consumed by PagerDuty, Grafana, or a SCADA system:
CREATE SINK water_alerts_sink
FROM alerts
WITH (
  connector = 'kafka',
  topic = 'water.alerts',
  properties.bootstrap.server = 'broker:9092',
  force_append_only = 'true'  -- the alerts view is built on aggregates and joins, so emit inserts only
)
FORMAT PLAIN ENCODE JSON;
Comparison Table
| Approach | Latency | SQL Support | Stateful Joins | Operational Complexity |
|---|---|---|---|---|
| Batch ETL (hourly) | ~60 min | Full | Limited | Medium |
| Spark Structured Streaming | ~10–30 s | Partial | Yes | High |
| Flink SQL | ~1–5 s | Partial | Yes | High |
| RisingWave | < 1 s | Full PostgreSQL | Yes | Low |
FAQ
Q: Can RisingWave handle sensor nodes that send out-of-order events?
Yes. RisingWave supports configurable watermarks and out-of-order event tolerance. You define a WATERMARK FOR event_time AS event_time - INTERVAL '10 SECONDS' clause on the source, and windows wait for late data up to that bound before finalizing results.
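A sketch of the Step 1 source definition with the watermark added (the 10-second tolerance is an example; tune it to how late your sensors actually report):

CREATE SOURCE water_sensor_raw (
  node_id VARCHAR,
  zone_code VARCHAR,
  event_time TIMESTAMPTZ,
  flow_rate_lpm DOUBLE PRECISION,
  pressure_bar DOUBLE PRECISION,
  valve_open_pct SMALLINT,
  turbidity_ntu DOUBLE PRECISION,
  WATERMARK FOR event_time AS event_time - INTERVAL '10 SECONDS'
)
WITH (
  connector = 'kafka',
  topic = 'water.sensor.telemetry',
  properties.bootstrap.server = 'broker:9092',
  scan.startup.mode = 'latest'
)
FORMAT PLAIN ENCODE JSON;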
Q: How do we store raw sensor history for replay and forensics?
Append the source stream to a separate sink backed by S3 or HDFS in Parquet format. RisingWave can re-read that data as a bounded source for historical analysis while the live stream continues.
Q: Can multiple dashboards query the same materialized view simultaneously?
Yes. Materialized views in RisingWave behave like ordinary PostgreSQL tables. Any number of clients can issue SELECT queries against them concurrently without re-running the streaming computation.
Key Takeaways
- RisingWave ingests Kafka sensor telemetry and maintains continuously updated materialized views using standard SQL.
- Tumbling-window aggregates and zone-level joins let operators detect pressure anomalies, turbidity spikes, and flow imbalances in under one second.
- Alerts can be published directly to Kafka, enabling integration with SCADA systems, PagerDuty, and Grafana without additional middleware.
- The PostgreSQL-compatible interface means any BI tool, dashboarding platform, or operations team familiar with SQL can query live data immediately.

