A city with 100,000 deployed IoT sensors—traffic, parking, air quality, water, waste, lighting—generates tens of millions of events per hour. RisingWave, a PostgreSQL-compatible streaming database, provides the processing layer that ingests this volume from Kafka, normalizes heterogeneous sensor schemas, and maintains live aggregations across every sensor type in a single, operationally simple deployment.
Why Smart City Sensor Scale Demands a Streaming Architecture
Smart city initiatives start small—a pilot of 50 air quality sensors or 200 parking sensors—and succeed. Then city leadership asks to expand to every neighborhood, every street, every asset. The sensor count jumps from hundreds to tens of thousands, and the data volume jumps proportionally.
Batch-oriented architectures that worked for the pilot break at scale. A nightly ETL job that processed 500,000 sensor readings in two hours cannot process 50 million readings in the same window. More importantly, the analytics value of sensor data degrades with latency: a water-main pressure anomaly detected 12 hours after the event is a post-mortem, not an intervention.
The heterogeneity problem compounds the scale problem. Smart city sensor networks are not homogeneous—they include equipment from dozens of vendors across multiple procurement cycles, each with its own data format, reporting interval, and schema. A streaming data platform must normalize these schemas without requiring a custom integration for each sensor type.
RisingWave addresses both problems: it scales horizontally to handle high-volume ingestion, and it supports schema normalization through source definitions and view transformations in standard SQL.
The Streaming SQL Approach
The architecture has four layers:
- Kafka topics per sensor domain — traffic, parking, environment, utilities, each with a defined schema
- RisingWave source definitions — one per topic, with schema enforcement
- Normalization views — transform vendor-specific fields into a canonical smart city data model
- Analytics views — zone-level, city-level, and cross-domain aggregations that surface operational KPIs
The canonical data model uses (sensor_id, zone_id, metric_name, value, unit, event_ts) tuples, enabling cross-domain analytics without per-domain materialized views.
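As a concrete illustration of the normalization layer, here is a minimal sketch. The vendor topic acme.parking.raw and its field names are hypothetical, and the view joins the sensor_registry table defined in Step 1 below for zone enrichment:
-- Hypothetical vendor-specific source; field names follow the vendor, not the canonical model
CREATE SOURCE acme_parking_raw (
device VARCHAR, -- vendor's sensor identifier
bay_state INT, -- 0 = free, 1 = occupied (vendor convention)
ts TIMESTAMPTZ
) WITH (
connector = 'kafka',
topic = 'acme.parking.raw',
properties.bootstrap.server = 'kafka:9092',
scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;
-- Normalization view: map vendor fields onto the canonical tuple,
-- enriching zone and district from the sensor registry (Step 1)
CREATE VIEW acme_parking_normalized AS
SELECT
e.device AS sensor_id,
'parking' AS sensor_type,
r.zone_id,
r.district_id,
'occupancy' AS metric_name,
e.bay_state::NUMERIC AS metric_value,
'count' AS metric_unit,
0 AS quality_flag,
e.ts AS event_ts
FROM acme_parking_raw e
JOIN sensor_registry r ON r.sensor_id = e.device;
Per-vendor views like this can then be combined with UNION ALL or written back to the canonical Kafka topic through a sink, so the downstream aggregations never see vendor-specific shapes.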
Building It Step by Step
Step 1: Data Source
-- Canonical sensor event schema (normalized from multiple sources)
CREATE SOURCE sensor_events_canonical (
sensor_id VARCHAR,
sensor_type VARCHAR, -- 'traffic','parking','aq','water','waste','lighting'
zone_id VARCHAR,
district_id VARCHAR,
city_id VARCHAR,
metric_name VARCHAR, -- e.g., 'vehicle_count','pm25','occupancy','pressure'
metric_value NUMERIC,
metric_unit VARCHAR,
quality_flag INT, -- 0=good, 1=suspect, 2=bad
firmware_ver VARCHAR,
event_ts TIMESTAMPTZ
) WITH (
connector = 'kafka',
topic = 'smartcity.sensors.canonical',
properties.bootstrap.server = 'kafka:9092',
scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;
-- Sensor registry (static metadata)
CREATE TABLE sensor_registry (
sensor_id VARCHAR PRIMARY KEY,
sensor_type VARCHAR,
zone_id VARCHAR,
district_id VARCHAR,
install_date DATE,
vendor VARCHAR,
asset_tag VARCHAR,
lat NUMERIC,
lon NUMERIC,
active BOOLEAN
);
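The registry is slowly changing metadata, so it can be populated with ordinary inserts; in practice it would be bulk-loaded from the city's asset-management export. A small sketch with a hypothetical sensor:
-- Hypothetical registry row; a real deployment would bulk-load from an asset system
INSERT INTO sensor_registry VALUES (
'AQ-0042', 'aq', 'zone-12', 'district-3',
DATE '2024-03-01', 'AcmeSense', 'ASSET-88213',
40.7128, -74.0060, true
);
-- Sanity check: every active sensor should resolve to a zone
SELECT zone_id, COUNT(*) AS sensors
FROM sensor_registry
WHERE active
GROUP BY zone_id;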
Step 2: Core Materialized View
-- Per-sensor per-metric 5-minute summary (all sensor types unified)
CREATE MATERIALIZED VIEW sensor_metrics_5min AS
SELECT
sensor_id,
sensor_type,
zone_id,
district_id,
metric_name,
metric_unit,
window_start,
window_end,
AVG(metric_value) FILTER (WHERE quality_flag = 0) AS avg_value,
MIN(metric_value) FILTER (WHERE quality_flag = 0) AS min_value,
MAX(metric_value) FILTER (WHERE quality_flag = 0) AS max_value,
COUNT(*) AS total_readings,
COUNT(*) FILTER (WHERE quality_flag = 0) AS good_readings,
COUNT(*) FILTER (WHERE quality_flag = 2) AS bad_readings,
ROUND(
COUNT(*) FILTER (WHERE quality_flag = 0) * 100.0
/ NULLIF(COUNT(*), 0), 1
) AS data_quality_pct
FROM TUMBLE(sensor_events_canonical, event_ts, INTERVAL '5 minutes')
GROUP BY sensor_id, sensor_type, zone_id, district_id,
metric_name, metric_unit, window_start, window_end;
-- Zone-level KPI rollup: one row per zone, sensor type, metric, and 5-minute window
CREATE MATERIALIZED VIEW zone_kpis AS
SELECT
zone_id,
district_id,
sensor_type,
metric_name,
metric_unit,
window_start,
window_end,
AVG(avg_value) AS zone_avg,
MAX(max_value) AS zone_max,
MIN(min_value) AS zone_min,
COUNT(DISTINCT sensor_id) AS active_sensors,
AVG(data_quality_pct) AS avg_data_quality_pct
FROM sensor_metrics_5min
GROUP BY zone_id, district_id, sensor_type, metric_name,
metric_unit, window_start, window_end;
-- Sensor health dashboard: sensors whose data quality over the last hour falls below 80%
CREATE MATERIALIZED VIEW sensor_health AS
SELECT
sensor_id,
sensor_type,
zone_id,
AVG(data_quality_pct) AS avg_quality_pct_1h,
SUM(bad_readings) AS total_bad_readings_1h,
SUM(total_readings) AS total_readings_1h,
MAX(window_end) AS last_window_end
FROM sensor_metrics_5min
WHERE window_start >= NOW() - INTERVAL '1 hour'
GROUP BY sensor_id, sensor_type, zone_id
HAVING AVG(data_quality_pct) < 80;
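Dashboards and APIs query these views with ordinary SQL; because the views are maintained incrementally, reads are cheap lookups rather than scans over raw events. A hedged example (the zone ID and metric are illustrative) pulling the latest PM2.5 rollup for one zone:
-- Latest 5-minute air-quality rollup for a hypothetical zone
SELECT zone_id, metric_name, zone_avg, zone_max, active_sensors, window_end
FROM zone_kpis
WHERE zone_id = 'zone-12'
AND metric_name = 'pm25'
ORDER BY window_end DESC
LIMIT 1;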
Step 3: Alerts and Aggregations
-- Cross-domain anomaly: a sensor's 5-minute average deviates sharply from its zone average
-- (sigma_deviation is reported for context; the filter uses a proxy threshold, see below)
CREATE MATERIALIZED VIEW cross_zone_anomalies AS
SELECT
s.sensor_id,
s.sensor_type,
s.zone_id,
s.metric_name,
s.avg_value AS sensor_avg,
z.zone_avg AS zone_avg,
z.zone_max AS zone_max,
ROUND(
ABS(s.avg_value - z.zone_avg) /
NULLIF(STDDEV(s.avg_value) OVER (
PARTITION BY s.zone_id, s.metric_name, s.window_start
), 0), 2
) AS sigma_deviation,
s.window_start
FROM sensor_metrics_5min s
JOIN zone_kpis z
ON s.zone_id = z.zone_id
AND s.metric_name = z.metric_name
AND s.window_start = z.window_start
-- Proxy threshold: absolute deviation greater than 30% of the (non-zero) zone average
WHERE ABS(s.avg_value - z.zone_avg) > 3 * NULLIF(z.zone_avg * 0.1, 0);
CREATE SINK anomaly_alerts_sink
AS SELECT
sensor_id, sensor_type, zone_id,
metric_name, sensor_avg, zone_avg, window_start
FROM cross_zone_anomalies
WITH (
connector = 'kafka',
topic = 'smartcity.alerts.anomalies',
properties.bootstrap.server = 'kafka:9092',
force_append_only = 'true' -- upstream view is not append-only; emit inserts only
) FORMAT PLAIN ENCODE JSON;
-- Silent sensor alert: active sensors not reporting for > 15 minutes (or never reporting)
CREATE MATERIALIZED VIEW silent_sensors AS
SELECT
r.sensor_id,
r.sensor_type,
r.zone_id,
MAX(e.event_ts) AS last_seen_ts,
NOW() - MAX(e.event_ts) AS silence_duration
FROM sensor_registry r
LEFT JOIN sensor_events_canonical e ON r.sensor_id = e.sensor_id
WHERE r.active = true
GROUP BY r.sensor_id, r.sensor_type, r.zone_id
HAVING NOW() - MAX(e.event_ts) > INTERVAL '15 minutes'
OR MAX(e.event_ts) IS NULL;
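Like the anomaly view, the silent-sensor view can drive a downstream workflow. A sketch assuming a hypothetical maintenance topic; force_append_only is set because the view itself is updated in place as sensors recover:
-- Hypothetical Kafka topic consumed by a field-maintenance dispatch service
CREATE SINK silent_sensor_alerts_sink AS
SELECT sensor_id, sensor_type, zone_id, last_seen_ts
FROM silent_sensors
WITH (
connector = 'kafka',
topic = 'smartcity.alerts.silent-sensors',
properties.bootstrap.server = 'kafka:9092',
force_append_only = 'true'
) FORMAT PLAIN ENCODE JSON;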
Comparison Table
| Architecture | Sensor Scale | Latency | Cross-Domain Joins | Ops Complexity |
|---|---|---|---|---|
| Per-domain batch ETL | Thousands | Hours | Manual | High |
| Apache Kafka + Flink | Hundreds of thousands | Seconds | Complex (code) | Very high |
| Cloud IoT platform | Millions | Minutes | Limited | Medium |
| RisingWave streaming SQL | Millions | Sub-second | Native SQL | Low |
FAQ
How do you handle sensors with different reporting intervals in the same topic?
The TUMBLE window collects all readings that arrive within the window period, regardless of each sensor's reporting rate. A sensor reporting every 10 seconds and one reporting every 5 minutes both contribute to the same 5-minute aggregate window: the 5-minute sensor contributes one reading, the 10-second sensor contributes roughly 30. The total_readings count in sensor_metrics_5min reflects this difference; good_readings additionally filters on quality_flag.
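If the implied reporting rate is itself useful, for example to spot a sensor that silently drops to half its expected rate, it can be derived from the same rollup. A minimal sketch:
-- Approximate per-sensor reporting interval inferred from the 5-minute rollup
SELECT sensor_id, sensor_type, window_start, total_readings,
ROUND(300.0 / NULLIF(total_readings, 0), 1) AS approx_interval_seconds
FROM sensor_metrics_5min;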
What is the recommended Kafka topic partitioning strategy for city-scale sensor data?
Partition by zone_id or district_id rather than sensor_id. This clusters sensors in the same geographic area on the same Kafka partition, which improves locality for zone-level aggregations in RisingWave. For very high-density zones, use a composite partition key like zone_id + sensor_type.
Can RisingWave handle seasonal sensor fleet expansions without downtime?
Yes. Adding new sensor IDs to an existing Kafka topic does not require changes to RisingWave source definitions or materialized views. New sensors are automatically included in zone aggregations the moment their first event arrives in the topic.
Key Takeaways
- A canonical sensor event schema (sensor_id, metric_name, metric_value, unit) enables cross-domain analytics across traffic, parking, environment, and utilities sensors with a single set of materialized views.
- The sensor_metrics_5min view, unified across all sensor types, reduces the number of views to maintain from one per domain to one per aggregation granularity.
- Silent-sensor detection via a LEFT JOIN between the sensor registry and the event stream surfaces maintenance needs automatically, without scheduled ping jobs.
- Cross-domain anomaly detection identifies outlier sensors within their zone, enabling early fault detection and quality-based data exclusion.

