City environmental officers need a single, continuously updated view of air quality, noise pollution, and microclimatic conditions across every district—but sensor data arrives from hundreds of heterogeneous IoT nodes at different rates and in different formats. RisingWave, a PostgreSQL-compatible streaming database, normalizes and aggregates these streams in real time so a public-facing dashboard always shows the current state of the city's environment, not yesterday's batch report.
Why a City-Wide Environmental Dashboard Matters
Regulatory pressure on urban air quality is intensifying globally. Cities must report hourly PM2.5, NO₂, and ozone averages to national environment agencies, publish real-time air quality indices for residents, and trigger low-emission zone enforcement when thresholds are breached. Doing this with nightly batch jobs creates compliance gaps and delays public health warnings by hours.
A real-time environmental dashboard built on streaming SQL delivers:
- Immediate threshold alerts: when PM2.5 in a zone exceeds WHO limits, push notifications and digital signage updates fire within seconds.
- Regulatory compliance logging: every sensor reading is timestamped and retained, producing an audit trail without separate archiving pipelines.
- Cross-parameter correlation: combine PM2.5 spikes with wind direction and traffic density to attribute pollution sources automatically.
- Public transparency: a live web map updated every minute builds civic trust and reduces freedom-of-information (FOI) requests.
How Streaming SQL Solves This
RisingWave ingests multiple Kafka topics—one per sensor type—and joins them in SQL. Materialized views aggregate readings into hourly and daily AQI values that comply with standard calculation methods. The same database serves the real-time dashboard and the regulatory export API, eliminating the dual-pipeline problem common in smart-city data architectures.
Step-by-Step Tutorial
Step 1: Connect the Data Source
-- Air quality sensor stream
CREATE SOURCE air_quality_raw (
node_id VARCHAR, -- e.g. 'AQ-NW-031'
zone_code VARCHAR, -- e.g. 'NORTH-WEST-3'
event_time TIMESTAMPTZ,
pm25_ugm3 DOUBLE PRECISION, -- µg/m³
pm10_ugm3 DOUBLE PRECISION,
no2_ppb DOUBLE PRECISION,
o3_ppb DOUBLE PRECISION,
co_ppm DOUBLE PRECISION,
temp_c DOUBLE PRECISION,
humidity_pct DOUBLE PRECISION
)
WITH (
connector = 'kafka',
topic = 'city.env.airquality',
properties.bootstrap.server = 'broker:9092',
scan.startup.mode = 'latest'
)
FORMAT PLAIN ENCODE JSON;
-- Noise sensor stream
CREATE SOURCE noise_raw (
node_id VARCHAR,
zone_code VARCHAR,
event_time TIMESTAMPTZ,
noise_db DOUBLE PRECISION, -- dB(A)
peak_db DOUBLE PRECISION
)
WITH (
connector = 'kafka',
topic = 'city.env.noise',
properties.bootstrap.server = 'broker:9092',
scan.startup.mode = 'latest'
)
FORMAT PLAIN ENCODE JSON;
-- Zone reference: regulatory thresholds
CREATE TABLE zone_env_thresholds (
zone_code VARCHAR PRIMARY KEY,
district_name VARCHAR,
pm25_limit_ugm3 DOUBLE PRECISION, -- e.g. 15 for WHO 24-hr guideline
noise_limit_db DOUBLE PRECISION,
land_use_class VARCHAR -- 'RESIDENTIAL','COMMERCIAL','INDUSTRIAL'
);
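To make the examples below runnable, seed the reference table with a few rows. The zone codes, district names, and limits here are purely illustrative placeholders, not real regulatory values:

-- Illustrative seed data; substitute your city's actual zones and limits
INSERT INTO zone_env_thresholds VALUES
('NORTH-WEST-3', 'Northwood', 15.0, 55.0, 'RESIDENTIAL'),
('CENTRAL-1', 'City Centre', 15.0, 65.0, 'COMMERCIAL'),
('EAST-7', 'Dockside', 15.0, 70.0, 'INDUSTRIAL');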
Step 2: Build the Core Views
First compute hourly AQI-relevant averages per zone; then combine 1-minute air-quality and noise aggregates with the regulatory thresholds:
CREATE MATERIALIZED VIEW air_quality_hourly AS
SELECT
a.zone_code,
window_start,
window_end,
ROUND(AVG(a.pm25_ugm3)::NUMERIC, 2) AS avg_pm25,
ROUND(MAX(a.pm25_ugm3)::NUMERIC, 2) AS peak_pm25,
ROUND(AVG(a.pm10_ugm3)::NUMERIC, 2) AS avg_pm10,
ROUND(AVG(a.no2_ppb)::NUMERIC, 2) AS avg_no2,
ROUND(AVG(a.o3_ppb)::NUMERIC, 2) AS avg_o3,
ROUND(AVG(a.temp_c)::NUMERIC, 1) AS avg_temp_c,
ROUND(AVG(a.humidity_pct)::NUMERIC, 1) AS avg_humidity,
COUNT(DISTINCT a.node_id) AS active_nodes
FROM TUMBLE(air_quality_raw, event_time, INTERVAL '1 HOUR') AS a
GROUP BY a.zone_code, window_start, window_end;
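With the hourly view in place, the regulatory export API can read completed hours with a plain SELECT. A minimal sketch, where the reporting-day timestamps are placeholders supplied by the caller:

-- Hourly zone averages for one reporting day (timestamps are placeholders)
SELECT zone_code, window_start, avg_pm25, peak_pm25, avg_no2, avg_o3, active_nodes
FROM air_quality_hourly
WHERE window_start >= '2024-05-01 00:00:00+00'
  AND window_end <= '2024-05-02 00:00:00+00'
ORDER BY zone_code, window_start;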
-- Combine air quality and noise into a unified zone summary (1 minute)
CREATE MATERIALIZED VIEW zone_env_summary_1m AS
SELECT
aq.zone_code,
t.district_name,
t.land_use_class,
aq.window_start,
aq.avg_pm25,
aq.peak_pm25,
aq.avg_no2,
aq.avg_o3,
n.avg_noise_db,
n.peak_noise_db,
t.pm25_limit_ugm3,
t.noise_limit_db,
CASE
WHEN aq.avg_pm25 > t.pm25_limit_ugm3 * 1.5 THEN 'VERY_UNHEALTHY'
WHEN aq.avg_pm25 > t.pm25_limit_ugm3 THEN 'UNHEALTHY'
WHEN aq.avg_pm25 > t.pm25_limit_ugm3 * 0.7 THEN 'MODERATE'
ELSE 'GOOD'
END AS aqi_category
FROM (
SELECT zone_code, window_start,
AVG(pm25_ugm3) AS avg_pm25,
MAX(pm25_ugm3) AS peak_pm25,
AVG(no2_ppb) AS avg_no2,
AVG(o3_ppb) AS avg_o3
FROM TUMBLE(air_quality_raw, event_time, INTERVAL '1 MINUTE')
GROUP BY zone_code, window_start
) aq
JOIN (
SELECT zone_code, window_start,
AVG(noise_db) AS avg_noise_db,
MAX(peak_db) AS peak_noise_db
FROM TUMBLE(noise_raw, event_time, INTERVAL '1 MINUTE')
GROUP BY zone_code, window_start
) n USING (zone_code, window_start)
JOIN zone_env_thresholds t USING (zone_code);
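Before wiring up alerts, it is worth sanity-checking the summary view while sensors are streaming. One simple smoke test is to pull the most recent windows:

-- Smoke test: most recent per-zone summaries, worst air first
SELECT zone_code, window_start, avg_pm25, aqi_category, avg_noise_db
FROM zone_env_summary_1m
ORDER BY window_start DESC, avg_pm25 DESC
LIMIT 20;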
Step 3: Alerts and Downstream Integration
CREATE MATERIALIZED VIEW alerts AS
SELECT
zone_code,
district_name,
window_start AS alert_time,
CASE
WHEN aqi_category IN ('UNHEALTHY','VERY_UNHEALTHY') THEN 'AIR_QUALITY_BREACH'
WHEN avg_noise_db > noise_limit_db THEN 'NOISE_LIMIT_BREACH'
END AS alert_type,
aqi_category,
avg_pm25,
peak_pm25,
avg_noise_db
FROM zone_env_summary_1m
WHERE aqi_category IN ('UNHEALTHY','VERY_UNHEALTHY')
OR avg_noise_db > noise_limit_db;
CREATE SINK env_alerts_sink
FROM alerts
WITH (
connector = 'kafka',
topic = 'city.env.alerts',
properties.bootstrap.server = 'broker:9092',
-- alerts is built on aggregations and can emit updates;
-- FORMAT PLAIN requires an append-only stream
force_append_only = 'true'
)
FORMAT PLAIN ENCODE JSON;
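If downstream consumers must also see corrections when a window's aggregate is revised by late-arriving data, an upsert-encoded sink is one alternative; the compacted topic name and key columns below are assumptions for illustration:

-- Alternative sketch: upsert sink so revised alerts overwrite earlier ones
CREATE SINK env_alerts_upsert_sink
FROM alerts
WITH (
connector = 'kafka',
topic = 'city.env.alerts.compacted',
properties.bootstrap.server = 'broker:9092',
primary_key = 'zone_code,alert_time'
)
FORMAT UPSERT ENCODE JSON;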
Comparison Table
| Feature | Nightly Batch | InfluxDB + Grafana | RisingWave |
|---|---|---|---|
| Multi-sensor SQL join | ETL required | Not native | Native SQL |
| Sub-minute aggregations | No | Yes | Yes |
| Regulatory-grade history | Manual archiving | TTL retention | Built-in persistence |
| Alert routing | Cron + scripts | Grafana alerts | SQL SINK to Kafka |
| PostgreSQL compatibility | Via adapter | No | Yes |
FAQ
Q: How do we serve the public dashboard without exposing the internal Kafka cluster?
RisingWave exposes a standard PostgreSQL wire protocol endpoint. Your dashboard backend (Node.js, Python, etc.) connects to RisingWave the same way it would connect to PostgreSQL and runs SELECT queries against the materialized views. The Kafka cluster remains internal.
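For example, the dashboard backend might render the current map state with a latest-window query along these lines (one simple approach, not the only one):

-- Current per-zone state for the public map
SELECT zone_code, district_name, aqi_category, avg_pm25, avg_noise_db
FROM zone_env_summary_1m
WHERE window_start = (SELECT MAX(window_start) FROM zone_env_summary_1m);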
Q: Can we store 5-year historical data for trend analysis in the same system?
RisingWave persists all materialized view data internally, and historical windows remain queryable with standard SQL range predicates (e.g. WHERE window_start BETWEEN ... AND ...). For very long retention periods, you can also sink data to object storage (S3/GCS) in Parquet format and query it via RisingWave's Apache Iceberg or file source connectors.
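A long-range trend query over the retained hourly view is ordinary SQL; the zone code and dates below are placeholders:

-- Monthly PM2.5 trend for one zone over a multi-year range
SELECT date_trunc('month', window_start) AS month,
AVG(avg_pm25) AS monthly_avg_pm25
FROM air_quality_hourly
WHERE zone_code = 'NORTH-WEST-3'
AND window_start BETWEEN '2020-01-01' AND '2024-12-31'
GROUP BY date_trunc('month', window_start)
ORDER BY month;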
Q: Our sensors have different reporting intervals—some every 10 seconds, some every 5 minutes. Does that cause issues?
No. Each source is ingested independently, and the tumbling windows simply aggregate whatever data arrives within each interval. Sparse sensors produce fewer samples per window; the COUNT(DISTINCT node_id) column in the view tells you how many sensors contributed, so you can flag windows with insufficient data coverage.
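For instance, to flag hourly windows where fewer sensors reported than expected (the threshold of 3 nodes is an arbitrary illustration):

-- Hourly windows with suspiciously low sensor coverage
SELECT zone_code, window_start, active_nodes
FROM air_quality_hourly
WHERE active_nodes < 3
ORDER BY window_start DESC;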
Key Takeaways
- RisingWave unifies heterogeneous IoT streams (air quality, noise, weather) in a single SQL layer, eliminating the need for separate per-sensor pipelines.
- Continuously maintained materialized views provide sub-minute environmental summaries that are always current for public dashboards.
- AQI category logic written in SQL is auditable, version-controllable, and updatable without redeploying stream processors.
- Threshold alerts sink directly to Kafka for routing to push notification services, digital signage controllers, and regulatory reporting systems.

