Urban safety operations centers juggle thousands of simultaneous data points—CCTV event classifications, crowd occupancy counts, 911 dispatch IDs, sensor-triggered alarms—yet most analytics still run on batch pipelines that lag by minutes or hours. RisingWave, a PostgreSQL-compatible streaming database, lets safety teams maintain continuously refreshed views over these feeds so commanders can act on facts, not stale summaries.
Why Urban Safety Analytics Matters
A crowd crush, a vehicle intrusion, or an active-threat event can escalate from incident to catastrophe in under two minutes. Safety operations centers that rely on manual video monitoring or 15-minute batch reports are structurally unable to coordinate a proportionate response at that speed.
Streaming analytics changes the calculus. By continuously aggregating occupancy counts per zone code, correlating dispatch IDs with sensor alarms, and flagging event classification sequences that match known threat patterns, a streaming SQL platform can surface actionable intelligence faster than any human analyst reviewing individual camera feeds.
Key capabilities that streaming SQL enables for urban safety:
- Real-time crowd density thresholds per zone that trigger automated gate control or PA announcements.
- Correlation of multiple event classifications (e.g., audio gunshot detection + motion anomaly + 911 call) within a configurable time window.
- Live dispatch board showing response times per incident and flagging SLA breaches.
- Rolling hotspot maps that update every 30 seconds rather than nightly.
How Streaming SQL Solves This
RisingWave consumes event streams from diverse sources—Kafka topics fed by video analytics platforms, IoT sensor gateways, and CAD (computer-aided dispatch) systems—and exposes them as SQL tables. Operators write CREATE MATERIALIZED VIEW once; RisingWave maintains results incrementally as new events arrive. The same SQL interface works for live dashboards and post-incident forensic queries.
Because RisingWave is PostgreSQL-compatible, it integrates natively with Grafana (via the Postgres data source plugin), Metabase, and any JDBC/ODBC-capable tool without custom connectors.
Step-by-Step Tutorial
Step 1: Connect the Data Source
Create three sources representing the main feeds in a smart-city safety deployment.
-- Video analytics & IoT sensor events
CREATE SOURCE safety_events (
event_id VARCHAR,
sensor_node_id VARCHAR, -- e.g. 'CAM-N-014'
zone_code VARCHAR, -- e.g. 'PLAZA-3'
event_time TIMESTAMPTZ,
event_class VARCHAR, -- 'CROWD_DENSITY','MOTION_ANOMALY','AUDIO_ALARM'
confidence_score DOUBLE PRECISION, -- 0.0–1.0
occupancy_count INTEGER
)
WITH (
connector = 'kafka',
topic = 'city.safety.events',
properties.bootstrap.server = 'broker:9092',
scan.startup.mode = 'latest'
)
FORMAT PLAIN ENCODE JSON;
-- Computer-aided dispatch feed
CREATE SOURCE dispatch_events (
dispatch_id VARCHAR, -- e.g. 'D-20240402-00312'
zone_code VARCHAR,
call_time TIMESTAMPTZ,
unit_assigned VARCHAR,
response_time_s INTEGER, -- seconds from call to on-scene
incident_type VARCHAR,
status VARCHAR -- 'DISPATCHED','ON_SCENE','CLEARED'
)
WITH (
connector = 'kafka',
topic = 'city.dispatch',
properties.bootstrap.server = 'broker:9092',
scan.startup.mode = 'latest'
)
FORMAT PLAIN ENCODE JSON;
Step 2: Build the Core View
Aggregate occupancy and event counts per zone in 1-minute tumbling windows:
CREATE MATERIALIZED VIEW zone_safety_summary_1m AS
SELECT
zone_code,
window_start,
window_end,
MAX(occupancy_count) AS peak_occupancy,
COUNT(*) AS total_events,
COUNT(*) FILTER (WHERE event_class = 'CROWD_DENSITY') AS crowd_events,
COUNT(*) FILTER (WHERE event_class = 'MOTION_ANOMALY') AS motion_events,
COUNT(*) FILTER (WHERE event_class = 'AUDIO_ALARM') AS audio_events,
AVG(confidence_score) AS avg_confidence
FROM TUMBLE(safety_events, event_time, INTERVAL '1 MINUTE')
GROUP BY zone_code, window_start, window_end;
Build a live dispatch performance view that joins CAD events with sensor activity:
CREATE MATERIALIZED VIEW dispatch_response_live AS
SELECT
d.dispatch_id,
d.zone_code,
d.incident_type,
d.call_time,
d.response_time_s,
d.status,
s.peak_occupancy,
s.total_events AS concurrent_sensor_events,
CASE WHEN d.response_time_s > 480 THEN true ELSE false END AS sla_breach
FROM dispatch_events d
LEFT JOIN zone_safety_summary_1m s
ON d.zone_code = s.zone_code
AND d.call_time BETWEEN s.window_start AND s.window_end;
Step 3: Alerts and Downstream Integration
Detect correlated multi-signal threat patterns within a 2-minute window:
CREATE MATERIALIZED VIEW alerts AS
SELECT
zone_code,
window_start AS alert_time,
'MULTI_SIGNAL_THREAT' AS alert_type,
peak_occupancy,
audio_events,
motion_events,
total_events
FROM zone_safety_summary_1m
WHERE
audio_events >= 1
AND motion_events >= 2
AND peak_occupancy > 50;
CREATE SINK safety_alerts_sink
FROM alerts
WITH (
connector = 'kafka',
topic = 'city.safety.alerts',
properties.bootstrap.server = 'broker:9092'
)
FORMAT PLAIN ENCODE JSON;
Comparison Table
| Capability | Batch ETL | Spark Streaming | Flink SQL | RisingWave |
| Multi-source stream join | Complex | Moderate | Yes | Yes, native SQL |
| Time-window correlation | Manual | Code-heavy | SQL | Standard SQL |
| PostgreSQL wire protocol | No | No | No | Yes |
| Grafana integration | Via plugin | Via plugin | Via plugin | Native Postgres DS |
| Setup complexity | Low | High | High | Low |
FAQ
Q: How does RisingWave handle events from cameras that occasionally go offline and send bursts when connectivity is restored?
RisingWave supports watermark-based late-event handling. You configure an acceptable out-of-order tolerance on each source; events arriving within that window are included in the correct time bucket, and later arrivals are either included in a correction or dropped based on your policy.
Q: Can we run geospatial queries to find incidents within a radius of a given coordinate?
RisingWave supports PostGIS-compatible spatial types and functions. You can store zone centroids as GEOMETRY columns in a reference table and join them with event streams using ST_DWithin or similar predicates.
Q: Is the dispatch response view updated in real time as units clear incidents?
Yes. Both dispatch_events and safety_events are live sources. The materialized view recomputes incrementally whenever either stream produces new records, so status changes propagate to the view within seconds.
Key Takeaways
- Streaming SQL over video analytics, IoT sensors, and CAD feeds enables sub-second threat correlation that batch pipelines cannot achieve.
- RisingWave's PostgreSQL compatibility eliminates the need for custom connectors to Grafana, Metabase, or any JDBC tool.
- Multi-signal alert logic (crowd density + motion + audio) written in plain SQL is easier to audit and modify than equivalent code in Flink or Spark.
- Continuous dispatch performance views give commanders a live SLA dashboard with zero additional infrastructure.

