Building a City-Wide Environmental Dashboard

City environmental officers need a single, continuously updated view of air quality, noise pollution, and microclimatic conditions across every district—but sensor data arrives from hundreds of heterogeneous IoT nodes at different rates and in different formats. RisingWave, a PostgreSQL-compatible streaming database, normalizes and aggregates these streams in real time so a public-facing dashboard always shows the current state of the city's environment, not yesterday's batch report.

Why a City-Wide Environmental Dashboard Matters

Regulatory pressure on urban air quality is intensifying globally. Cities must report hourly PM2.5, NO₂, and ozone averages to national environment agencies, publish real-time air quality indices for residents, and trigger low-emission zone enforcement when thresholds are breached. Doing this with nightly batch jobs creates compliance gaps and delays public health warnings by hours.

A real-time environmental dashboard built on streaming SQL delivers:

  • Immediate threshold alerts: when PM2.5 in a zone exceeds WHO limits, push notifications and digital signage updates fire within seconds.
  • Regulatory compliance logging: every sensor reading is timestamped and retained, producing an audit trail without separate archiving pipelines.
  • Cross-parameter correlation: combine PM2.5 spikes with wind direction and traffic density to attribute pollution sources automatically.
  • Public transparency: a live web map updated every minute builds civic trust and reduces FOI requests.

How Streaming SQL Solves This

RisingWave ingests multiple Kafka topics—one per sensor type—and joins them in SQL. Materialized views aggregate readings into hourly and daily AQI values that comply with standard calculation methods. The same database serves the real-time dashboard and the regulatory export API, eliminating the dual-pipeline problem common in smart-city data architectures.

Step-by-Step Tutorial

Step 1: Connect the Data Source

-- Air quality sensor stream
CREATE SOURCE air_quality_raw (
    node_id       VARCHAR,        -- e.g. 'AQ-NW-031'
    zone_code     VARCHAR,        -- e.g. 'NORTH-WEST-3'
    event_time    TIMESTAMPTZ,
    pm25_ugm3     DOUBLE PRECISION,   -- µg/m³
    pm10_ugm3     DOUBLE PRECISION,
    no2_ppb       DOUBLE PRECISION,
    o3_ppb        DOUBLE PRECISION,
    co_ppm        DOUBLE PRECISION,
    temp_c        DOUBLE PRECISION,
    humidity_pct  DOUBLE PRECISION
)
WITH (
    connector     = 'kafka',
    topic         = 'city.env.airquality',
    properties.bootstrap.server = 'broker:9092',
    scan.startup.mode = 'latest'
)
FORMAT PLAIN ENCODE JSON;
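For reference, a message on the city.env.airquality topic matching this schema might look like the following (all values are illustrative):

```json
{
  "node_id": "AQ-NW-031",
  "zone_code": "NORTH-WEST-3",
  "event_time": "2024-05-12T08:41:07Z",
  "pm25_ugm3": 18.4,
  "pm10_ugm3": 27.9,
  "no2_ppb": 21.3,
  "o3_ppb": 34.0,
  "co_ppm": 0.4,
  "temp_c": 16.2,
  "humidity_pct": 61.5
}
```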

-- Noise sensor stream
CREATE SOURCE noise_raw (
    node_id       VARCHAR,
    zone_code     VARCHAR,
    event_time    TIMESTAMPTZ,
    noise_db      DOUBLE PRECISION,   -- dB(A)
    peak_db       DOUBLE PRECISION
)
WITH (
    connector     = 'kafka',
    topic         = 'city.env.noise',
    properties.bootstrap.server = 'broker:9092',
    scan.startup.mode = 'latest'
)
FORMAT PLAIN ENCODE JSON;

-- Zone reference: regulatory thresholds
CREATE TABLE zone_env_thresholds (
    zone_code          VARCHAR PRIMARY KEY,
    district_name      VARCHAR,
    pm25_limit_ugm3    DOUBLE PRECISION,  -- e.g. 15 for WHO 24-hr guideline
    noise_limit_db     DOUBLE PRECISION,
    land_use_class     VARCHAR            -- 'RESIDENTIAL','COMMERCIAL','INDUSTRIAL'
);
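The thresholds table is reference data that you seed once and update as regulations change. A minimal sketch, using the WHO 2021 24-hour PM2.5 guideline of 15 µg/m³ and hypothetical zone codes, district names, and noise caps:

```sql
-- Illustrative rows only; real values come from your regulator
INSERT INTO zone_env_thresholds VALUES
    ('NORTH-WEST-3', 'Northwest District',   15.0, 55.0, 'RESIDENTIAL'),
    ('CENTRAL-1',    'City Centre',          15.0, 65.0, 'COMMERCIAL'),
    ('EAST-7',       'East Industrial Park', 15.0, 70.0, 'INDUSTRIAL');
```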

Step 2: Build the Core View

Compute hourly AQI-relevant averages and join with regulatory thresholds:

CREATE MATERIALIZED VIEW air_quality_hourly AS
SELECT
    a.zone_code,
    window_start,
    window_end,
    ROUND(AVG(a.pm25_ugm3)::NUMERIC, 2)     AS avg_pm25,
    ROUND(MAX(a.pm25_ugm3)::NUMERIC, 2)     AS peak_pm25,
    ROUND(AVG(a.pm10_ugm3)::NUMERIC, 2)     AS avg_pm10,
    ROUND(AVG(a.no2_ppb)::NUMERIC, 2)       AS avg_no2,
    ROUND(AVG(a.o3_ppb)::NUMERIC, 2)        AS avg_o3,
    ROUND(AVG(a.temp_c)::NUMERIC, 1)        AS avg_temp_c,
    ROUND(AVG(a.humidity_pct)::NUMERIC, 1)  AS avg_humidity,
    COUNT(DISTINCT a.node_id)               AS active_nodes
FROM TUMBLE(air_quality_raw, event_time, INTERVAL '1 HOUR') AS a
GROUP BY a.zone_code, window_start, window_end;
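Once the view is populated, the regulatory export API can read the most recent completed hours with ordinary SQL. A sketch of such a query (the lookback is an assumption):

```sql
-- Latest completed hourly averages per zone
SELECT zone_code, window_start, avg_pm25, peak_pm25, avg_no2, active_nodes
FROM air_quality_hourly
WHERE window_end <= NOW()
ORDER BY window_start DESC, zone_code
LIMIT 20;
```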

-- Combine air quality and noise into a unified zone summary (1 minute)
CREATE MATERIALIZED VIEW zone_env_summary_1m AS
SELECT
    aq.zone_code,
    t.district_name,
    t.land_use_class,
    aq.window_start,
    aq.avg_pm25,
    aq.peak_pm25,
    aq.avg_no2,
    aq.avg_o3,
    n.avg_noise_db,
    n.peak_noise_db,
    t.pm25_limit_ugm3,
    t.noise_limit_db,
    CASE
        WHEN aq.avg_pm25 > t.pm25_limit_ugm3 * 1.5 THEN 'VERY_UNHEALTHY'
        WHEN aq.avg_pm25 > t.pm25_limit_ugm3        THEN 'UNHEALTHY'
        WHEN aq.avg_pm25 > t.pm25_limit_ugm3 * 0.7  THEN 'MODERATE'
        ELSE 'GOOD'
    END AS aqi_category
FROM (
    SELECT zone_code, window_start,
           AVG(pm25_ugm3) AS avg_pm25,
           MAX(pm25_ugm3) AS peak_pm25,
           AVG(no2_ppb)   AS avg_no2,
           AVG(o3_ppb)    AS avg_o3
    FROM TUMBLE(air_quality_raw, event_time, INTERVAL '1 MINUTE')
    GROUP BY zone_code, window_start
) aq
JOIN (
    SELECT zone_code, window_start,
           AVG(noise_db) AS avg_noise_db,
           MAX(peak_db)  AS peak_noise_db
    FROM TUMBLE(noise_raw, event_time, INTERVAL '1 MINUTE')
    GROUP BY zone_code, window_start
) n USING (zone_code, window_start)
JOIN zone_env_thresholds t USING (zone_code);
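The live map backend can poll this minute-level summary on each refresh. A sketch of the query it might run (the two-minute lookback is an assumption chosen to tolerate slightly late windows):

```sql
-- Current environmental state per zone for the public map
SELECT zone_code, district_name, aqi_category, avg_pm25, avg_noise_db
FROM zone_env_summary_1m
WHERE window_start >= NOW() - INTERVAL '2 MINUTES'
ORDER BY zone_code;
```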

Step 3: Alerts and Downstream Integration

CREATE MATERIALIZED VIEW alerts AS
SELECT
    zone_code,
    district_name,
    window_start             AS alert_time,
    CASE
        WHEN aqi_category IN ('UNHEALTHY','VERY_UNHEALTHY') THEN 'AIR_QUALITY_BREACH'
        WHEN avg_noise_db > noise_limit_db                  THEN 'NOISE_LIMIT_BREACH'
    END AS alert_type,
    aqi_category,
    avg_pm25,
    peak_pm25,
    avg_noise_db
FROM zone_env_summary_1m
WHERE aqi_category IN ('UNHEALTHY','VERY_UNHEALTHY')
   OR avg_noise_db > noise_limit_db;

CREATE SINK env_alerts_sink
FROM alerts
WITH (
    connector  = 'kafka',
    topic      = 'city.env.alerts',
    properties.bootstrap.server = 'broker:9092'
)
FORMAT PLAIN ENCODE JSON;
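Before wiring downstream consumers to the Kafka topic, you can sanity-check the alert stream directly, since the materialized view is queryable like any table:

```sql
-- Most recent alerts, newest first
SELECT alert_time, zone_code, alert_type, aqi_category, avg_pm25, avg_noise_db
FROM alerts
ORDER BY alert_time DESC
LIMIT 10;
```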

Comparison Table

| Feature                  | Nightly Batch    | InfluxDB + Grafana | RisingWave           |
|--------------------------|------------------|--------------------|----------------------|
| Multi-sensor SQL join    | ETL required     | Not native         | Native SQL           |
| Sub-minute aggregations  | No               | Yes                | Yes                  |
| Regulatory-grade history | Manual archiving | TTL retention      | Built-in persistence |
| Alert routing            | Cron + scripts   | Grafana alerts     | SQL SINK to Kafka    |
| PostgreSQL compatibility | Via adapter      | No                 | Yes                  |

FAQ

Q: How do we serve the public dashboard without exposing the internal Kafka cluster?

RisingWave exposes a standard PostgreSQL wire protocol endpoint. Your dashboard backend (Node.js, Python, etc.) connects to RisingWave the same way it would connect to PostgreSQL and runs SELECT queries against the materialized views. The Kafka cluster remains internal.
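In other words, the backend treats RisingWave like Postgres. For example, an API endpoint serving a single district might issue a read like this (the district name is illustrative):

```sql
-- Latest summary row for one district, served over the Postgres wire protocol
SELECT zone_code, window_start, aqi_category, avg_pm25, avg_noise_db
FROM zone_env_summary_1m
WHERE district_name = 'Northwest District'   -- illustrative name
ORDER BY window_start DESC
LIMIT 1;
```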

Q: Can we store 5-year historical data for trend analysis in the same system?

RisingWave persists all materialized view data internally and supports range queries with standard SQL WHERE event_time BETWEEN ... AND ... clauses. For very long retention periods, you can also sink data to object storage (S3/GCS) in Parquet format and query it via RisingWave's Iceberg or file source connectors.
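A trend query over the retained history is plain SQL against the hourly view. A sketch (the date range is illustrative):

```sql
-- Monthly PM2.5 trend per zone over the retained history
SELECT zone_code,
       DATE_TRUNC('month', window_start) AS month,
       AVG(avg_pm25)                     AS monthly_avg_pm25
FROM air_quality_hourly
WHERE window_start BETWEEN '2020-01-01' AND '2024-12-31'
GROUP BY zone_code, DATE_TRUNC('month', window_start)
ORDER BY zone_code, month;
```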

Q: Our sensors have different reporting intervals—some every 10 seconds, some every 5 minutes. Does that cause issues?

No. Each source is ingested independently, and the tumbling windows simply aggregate whatever data arrives within each interval. Sparse sensors produce fewer samples per window; the COUNT(DISTINCT node_id) column in the view tells you how many sensors contributed, so you can flag windows with insufficient data coverage.
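For example, hourly windows with too few reporting sensors can be flagged for review; the minimum-node threshold here is an assumption you would tune per zone:

```sql
-- Hourly windows with insufficient sensor coverage
SELECT zone_code, window_start, active_nodes
FROM air_quality_hourly
WHERE active_nodes < 3   -- assumed minimum coverage for a valid hourly average
ORDER BY window_start DESC;
```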

Key Takeaways

  • RisingWave unifies heterogeneous IoT streams (air quality, noise, weather) in a single SQL layer, eliminating the need for separate per-sensor pipelines.
  • Continuously maintained materialized views provide sub-minute environmental summaries that are always current for public dashboards.
  • AQI category logic written in SQL is auditable, version-controllable, and updatable without redeploying stream processors.
  • Threshold alerts sink directly to Kafka for routing to push notification services, digital signage controllers, and regulatory reporting systems.
