Cities can protect public health by processing air quality sensor networks in real time with RisingWave. Streaming SQL materialized views aggregate PM2.5, NO2, CO, and ozone readings across thousands of sensors into live AQI scores, neighborhood health alerts, and pollution trend analysis—with updates measured in seconds, not hours.
Why Real-Time Air Quality Monitoring Matters
Air quality is one of the most direct environmental factors affecting public health. WHO estimates that 99% of the world's population breathes air exceeding its guidelines, contributing to 7 million premature deaths annually. Cities that can detect and respond to pollution spikes in real time—rather than publishing daily AQI averages—can issue targeted health advisories, trigger traffic restrictions, and alert vulnerable populations before cumulative exposure reaches dangerous levels.
Traditional air quality monitoring relies on a small number of expensive reference-grade stations with 1–24 hour reporting delays. Modern low-cost sensor networks change the economics: cities can deploy hundreds or thousands of sensors at a fraction of the cost of reference stations. But high-density sensor networks generate proportionally more data—a network of 1,000 sensors reading every minute produces 1,440,000 readings per day—and that data must be processed, quality-controlled, and aggregated fast enough to be actionable.
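The arithmetic behind that volume estimate, with a rough payload-size figure added (the 200-byte JSON message size is an assumption for illustration, not a measurement):

```python
# Back-of-the-envelope data volume for a dense sensor network.
# Assumptions: 1,000 sensors, one reading per minute, ~200 bytes per JSON reading.
sensors = 1_000
readings_per_sensor_per_day = 24 * 60   # one reading per minute
bytes_per_reading = 200                 # rough JSON payload size (assumed)

readings_per_day = sensors * readings_per_sensor_per_day
mb_per_day = readings_per_day * bytes_per_reading / 1_000_000

print(readings_per_day)   # 1,440,000 readings/day
print(round(mb_per_day))  # ~288 MB/day of raw JSON
```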
The use case also spans multiple audiences. Citizens need simple AQI scores and health advisories. Environmental regulators need hourly averages and exceedance records. Urban planners need spatial analysis of pollution hotspots. Streaming SQL can serve all three from a single ingestion pipeline.
The Streaming SQL Approach
Sensor readings flow from IoT gateways into Kafka. RisingWave ingests the raw readings, computes the hourly average concentrations that AQI formulas require, maps each pollutant's average onto an AQI sub-index using EPA breakpoint tables, aggregates to neighborhood and zone levels, and surfaces health-alert materialized views. The pipeline requires only SQL—no custom Flink jobs or Python scripts.
Building It Step by Step
Step 1: Data Source
-- Air quality sensor readings (every 1-5 minutes per sensor)
CREATE SOURCE aq_sensor_readings (
sensor_id VARCHAR,
zone_id VARCHAR,
neighborhood_id VARCHAR,
city_id VARCHAR,
lat NUMERIC,
lon NUMERIC,
pm25_ugm3 NUMERIC, -- PM2.5 µg/m³
pm10_ugm3 NUMERIC, -- PM10 µg/m³
no2_ppb NUMERIC, -- NO2 parts per billion
co_ppm NUMERIC, -- CO parts per million
o3_ppb NUMERIC, -- Ozone ppb
so2_ppb NUMERIC, -- SO2 ppb
temperature_c NUMERIC,
humidity_pct NUMERIC,
reading_ts TIMESTAMPTZ
) WITH (
connector = 'kafka',
topic = 'aq.sensors.readings',
properties.bootstrap.server = 'kafka:9092',
scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;
-- Incident and emission event reports
CREATE SOURCE pollution_incidents (
incident_id VARCHAR,
zone_id VARCHAR,
incident_type VARCHAR, -- 'industrial','traffic','fire','other'
severity VARCHAR, -- 'low','medium','high'
reported_ts TIMESTAMPTZ
) WITH (
connector = 'kafka',
topic = 'aq.incidents',
properties.bootstrap.server = 'kafka:9092',
scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;
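For reference, a message on the aq.sensors.readings topic would look like the following. This is a Python sketch matching the CREATE SOURCE schema above; every field value is illustrative, not from a real deployment:

```python
import json
from datetime import datetime, timezone

# Sample payload matching the aq_sensor_readings schema (illustrative values only).
reading = {
    "sensor_id": "snsr-0042",
    "zone_id": "zone-07",
    "neighborhood_id": "nbhd-riverside",
    "city_id": "city-001",
    "lat": 47.6062,
    "lon": -122.3321,
    "pm25_ugm3": 18.4,
    "pm10_ugm3": 31.0,
    "no2_ppb": 22.5,
    "co_ppm": 0.4,
    "o3_ppb": 38.0,
    "so2_ppb": 1.2,
    "temperature_c": 14.5,
    "humidity_pct": 68.0,
    "reading_ts": datetime.now(timezone.utc).isoformat(),
}

payload = json.dumps(reading)  # what an IoT gateway's producer would send to Kafka
print(payload)
```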
Step 2: Core Materialized View
-- Hourly average per sensor (tumbling window; AQI uses time-averaged concentrations)
CREATE MATERIALIZED VIEW sensor_hourly_avg AS
SELECT
sensor_id,
zone_id,
neighborhood_id,
city_id,
window_start,
window_end,
AVG(pm25_ugm3) AS avg_pm25,
AVG(pm10_ugm3) AS avg_pm10,
AVG(no2_ppb) AS avg_no2,
AVG(co_ppm) AS avg_co,
AVG(o3_ppb) AS avg_o3,
MAX(pm25_ugm3) AS max_pm25,
COUNT(*) AS reading_count
FROM TUMBLE(aq_sensor_readings, reading_ts, INTERVAL '1 hour')
GROUP BY sensor_id, zone_id, neighborhood_id, city_id, window_start, window_end;
-- AQI calculation for PM2.5 (US EPA standard breakpoints)
CREATE MATERIALIZED VIEW sensor_aqi AS
SELECT
sensor_id,
zone_id,
neighborhood_id,
window_start,
window_end,
avg_pm25,
avg_no2,
avg_co,
-- PM2.5 sub-index (US EPA breakpoints, linear interpolation within each band)
CASE
WHEN avg_pm25 <= 12.0 THEN ROUND(avg_pm25 * 50.0 / 12.0, 0)
WHEN avg_pm25 <= 35.4 THEN ROUND(51 + (avg_pm25 - 12.1) * 49.0 / 23.3, 0)
WHEN avg_pm25 <= 55.4 THEN ROUND(101 + (avg_pm25 - 35.5) * 49.0 / 19.9, 0)
WHEN avg_pm25 <= 150.4 THEN ROUND(151 + (avg_pm25 - 55.5) * 49.0 / 94.9, 0)
WHEN avg_pm25 <= 250.4 THEN ROUND(201 + (avg_pm25 - 150.5) * 99.0 / 99.9, 0)
ELSE 301
END AS pm25_aqi,
-- AQI category
CASE
WHEN avg_pm25 <= 12.0 THEN 'Good'
WHEN avg_pm25 <= 35.4 THEN 'Moderate'
WHEN avg_pm25 <= 55.4 THEN 'Unhealthy for Sensitive Groups'
WHEN avg_pm25 <= 150.4 THEN 'Unhealthy'
WHEN avg_pm25 <= 250.4 THEN 'Very Unhealthy'
ELSE 'Hazardous'
END AS aqi_category,
reading_count
FROM sensor_hourly_avg;
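The breakpoint math can be sanity-checked outside the database. A plain-Python sketch of the EPA PM2.5 formula — AQI = I_lo + (C − C_lo) × (I_hi − I_lo) / (C_hi − C_lo), with index offsets 51/101/151/201 taken from the EPA's pre-2024 PM2.5 table:

```python
# EPA PM2.5 breakpoint table: (C_lo, C_hi, I_lo, I_hi, category).
PM25_BREAKPOINTS = [
    (0.0, 12.0, 0, 50, "Good"),
    (12.1, 35.4, 51, 100, "Moderate"),
    (35.5, 55.4, 101, 150, "Unhealthy for Sensitive Groups"),
    (55.5, 150.4, 151, 200, "Unhealthy"),
    (150.5, 250.4, 201, 300, "Very Unhealthy"),
    (250.5, 500.4, 301, 500, "Hazardous"),
]

def pm25_aqi(conc: float) -> tuple[int, str]:
    """Return (AQI sub-index, category) for a PM2.5 concentration in µg/m³."""
    c = int(conc * 10) / 10  # EPA truncates PM2.5 to 0.1 µg/m³
    for c_lo, c_hi, i_lo, i_hi, category in PM25_BREAKPOINTS:
        if c <= c_hi:
            # Linear interpolation within the band.
            aqi = i_lo + (c - c_lo) * (i_hi - i_lo) / (c_hi - c_lo)
            return round(aqi), category
    return 500, "Hazardous"

print(pm25_aqi(10.0))   # (42, 'Good')
print(pm25_aqi(35.4))   # (100, 'Moderate')
print(pm25_aqi(60.0))   # (153, 'Unhealthy')
```

Note that the SQL view interpolates on the hourly average; the official NowCast algorithm additionally weights recent hours more heavily, which would be a further refinement.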
-- Neighborhood-level AQI (average across sensors in neighborhood)
CREATE MATERIALIZED VIEW neighborhood_aqi AS
SELECT
neighborhood_id,
city_id,
window_start,
window_end,
AVG(avg_pm25) AS avg_pm25,
AVG(avg_no2) AS avg_no2,
AVG(avg_co) AS avg_co,
MAX(pm25_aqi) AS max_pm25_aqi,
AVG(pm25_aqi) AS avg_pm25_aqi,
COUNT(DISTINCT sensor_id) AS sensor_count,
-- Worst AQI category (driven by the highest sensor sub-index in the neighborhood)
CASE
WHEN MAX(pm25_aqi) <= 50 THEN 'Good'
WHEN MAX(pm25_aqi) <= 100 THEN 'Moderate'
WHEN MAX(pm25_aqi) <= 150 THEN 'Unhealthy for Sensitive Groups'
WHEN MAX(pm25_aqi) <= 200 THEN 'Unhealthy'
WHEN MAX(pm25_aqi) <= 300 THEN 'Very Unhealthy'
ELSE 'Hazardous'
END AS worst_aqi_category
FROM sensor_aqi
GROUP BY neighborhood_id, city_id, window_start, window_end;
Step 3: Alerts and Aggregations
-- Health alert: any sensor's PM2.5 AQI exceeds 150 (Unhealthy; PM2.5 above 55.4 µg/m³)
CREATE MATERIALIZED VIEW health_alerts AS
SELECT
sensor_id,
zone_id,
neighborhood_id,
avg_pm25,
avg_no2,
pm25_aqi,
aqi_category,
window_start,
window_end
FROM sensor_aqi
WHERE pm25_aqi > 150;
CREATE SINK health_alerts_sink
AS SELECT * FROM health_alerts
WITH (
connector = 'kafka',
topic = 'aq.alerts.health',
properties.bootstrap.server = 'kafka:9092',
force_append_only = 'true' -- PLAIN encoding requires an append-only stream
) FORMAT PLAIN ENCODE JSON;
-- Spike detection: PM2.5 increase > 50% vs prior hour
CREATE MATERIALIZED VIEW pm25_spike_alerts AS
SELECT
current_hour.sensor_id,
current_hour.zone_id,
current_hour.avg_pm25 AS current_pm25,
prior_hour.avg_pm25 AS prior_pm25,
ROUND(
(current_hour.avg_pm25 - prior_hour.avg_pm25) * 100.0
/ NULLIF(prior_hour.avg_pm25, 0), 1
) AS pct_increase,
current_hour.window_start
FROM sensor_hourly_avg current_hour
JOIN sensor_hourly_avg prior_hour
ON current_hour.sensor_id = prior_hour.sensor_id
AND prior_hour.window_start = current_hour.window_start - INTERVAL '1 hour'
WHERE current_hour.avg_pm25 > prior_hour.avg_pm25 * 1.5
AND current_hour.avg_pm25 > 20;
Comparison Table
| Approach | Update Frequency | Spatial Resolution | AQI Calculation | Alert Latency |
| --- | --- | --- | --- | --- |
| Reference station network | 1 hour | City-wide (sparse) | Yes | 1–2 hours |
| Cloud IoT platform | 5–15 minutes | Sensor-level | Partial | Minutes |
| EPA AirNow API | 1 hour | County/MSA | Yes | 1 hour |
| RisingWave streaming SQL | Sub-minute | Sensor + zone | Yes (SQL) | Sub-minute |
FAQ
How does RisingWave handle sensor drift and anomalous readings?
You can add data-quality filters in the materialized view definitions. For example, add a WHERE pm25_ugm3 BETWEEN 0 AND 1000 clause to exclude obviously erroneous readings. For more sophisticated QC, join the sensor stream against a sensor_calibration reference table that stores per-sensor correction factors and apply them in the aggregation view.
Can I display the neighborhood AQI data on a map in real time?
Yes. Map visualization libraries such as Mapbox GL JS or Deck.gl can display data from the neighborhood_aqi view, served through a backend API that queries RisingWave over its Postgres-compatible interface. The map refreshes by polling the API every 30–60 seconds, rendering the latest AQI category per neighborhood as a colored overlay.
Does RisingWave support storing long-term historical readings for trend analysis?
RisingWave is optimized for streaming computation, not long-term archival storage. For historical analysis beyond the streaming window, configure a Kafka sink to write processed readings to a data lake (S3 + Parquet) or a time-series database. Dashboards can then combine real-time RisingWave views with historical warehouse queries.
Key Takeaways
- Ingest PM2.5, NO2, CO, and ozone sensor readings into RisingWave from Kafka and compute AQI scores using EPA breakpoint logic in plain SQL CASE expressions.
- Hourly tumbling windows produce the time-averaged concentrations required by AQI standards; neighborhood-level aggregations roll up sensor data for public-facing maps.
- Health alerts fire within seconds of a sensor's hourly average crossing the Unhealthy threshold (AQI > 150), enabling real-time public health advisories.
- PM2.5 spike detection—comparing current vs. prior-hour averages via a self-join on the hourly view—identifies emerging pollution events before they reach alert thresholds.

