IoT data deduplication at scale means identifying and removing duplicate sensor readings—caused by retransmissions, at-least-once delivery semantics, or network retries—continuously as events flow through the pipeline, before they corrupt aggregates or trigger false alerts. RisingWave, a PostgreSQL-compatible streaming database, handles this with native SQL deduplication patterns.
Why IoT Data Deduplication Is Critical
IoT data pipelines almost universally operate with at-least-once delivery semantics. MQTT brokers, Kafka producers at the edge, and cellular modems all retry on connection drops. The result: the same sensor reading arrives multiple times in the event stream, sometimes seconds apart, sometimes hours later.
For analytics, duplicates are toxic. A temperature reading that arrives three times skews the average toward that value: averaging {20.0, 21.0, 21.0, 21.0} instead of {20.0, 21.0} shifts the mean from 20.5 to 20.75. A production event counted twice inflates the apparent throughput. An alert triggered by a duplicate fires a second time, flooding the notification system.
Traditional deduplication approaches—checking incoming data against a window of seen message IDs in application code—don't scale. At 100,000 sensors, each retrying during edge network disruptions, the volume of potential duplicates requires a systematic, scalable approach.
How Streaming SQL Solves This
RisingWave's DISTINCT ON and window function patterns provide two complementary deduplication strategies:
- ID-based deduplication: deduplicate on a unique message ID or (device_id, sequence_number) combination
- Time-window deduplication: deduplicate semantically identical readings within a configurable time window
Both patterns are expressed as materialized views. The deduplication logic runs continuously and incrementally; you query the deduplicated view, not the raw source.
Step-by-Step Tutorial
Step 1: Connect Your Data Source
Raw sensor readings from Kafka may contain duplicates at the application level:
CREATE SOURCE raw_sensor_events (
message_id VARCHAR, -- UUID generated at the sensor
device_id VARCHAR,
sequence_num BIGINT, -- monotonic counter per device
sensor_type VARCHAR,
value DOUBLE PRECISION,
unit VARCHAR,
quality SMALLINT,
ingested_at TIMESTAMPTZ, -- Kafka ingestion timestamp
event_time TIMESTAMPTZ -- sensor-local timestamp
) WITH (
connector = 'kafka',
topic = 'raw-sensor-events',
properties.bootstrap.server = 'kafka:9092',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;
Step 2: Build the Real-Time View
Strategy 1: Deduplicate by message ID
Keep only the first occurrence of each message_id; later duplicates are discarded. Note that this view holds state for every message_id it has seen—the FAQ below covers bounding that state with a time filter:
CREATE MATERIALIZED VIEW deduplicated_by_id AS
SELECT DISTINCT ON (message_id)
message_id,
device_id,
sequence_num,
sensor_type,
value,
unit,
quality,
event_time,
ingested_at
FROM raw_sensor_events
ORDER BY message_id, ingested_at ASC; -- keep earliest ingestion of each message_id
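Downstream aggregates should read from the deduplicated view, not the raw source. A minimal sketch of an hourly average built on top of it (the view name hourly_avg_by_device is illustrative; it assumes the deduplicated_by_id view above):

```sql
-- Hourly averages computed over deduplicated events, so
-- retransmissions cannot skew the mean.
CREATE MATERIALIZED VIEW hourly_avg_by_device AS
SELECT
    device_id,
    sensor_type,
    window_start,
    AVG(value) AS avg_value
FROM TUMBLE(deduplicated_by_id, event_time, INTERVAL '1 HOUR')
GROUP BY device_id, sensor_type, window_start;
```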
Strategy 2: Deduplicate by (device, sequence number)
If message_id is not available, deduplicate on the device's own sequence counter:
CREATE MATERIALIZED VIEW deduplicated_by_seq AS
SELECT DISTINCT ON (device_id, sequence_num)
device_id,
sequence_num,
sensor_type,
value,
unit,
quality,
event_time,
ingested_at
FROM raw_sensor_events
ORDER BY device_id, sequence_num, ingested_at ASC;
Step 3: Window-Based Aggregations
Strategy 3: Semantic deduplication within a time window
For devices that don't generate message IDs or sequence numbers, deduplicate on identical (device_id, sensor_type, value, event_time) combinations. This collapses retransmissions that carry the same payload and sensor timestamp into a single row:
CREATE MATERIALIZED VIEW semantically_deduplicated AS
SELECT
device_id,
sensor_type,
value,
unit,
quality,
event_time,
MIN(ingested_at) AS first_seen -- when the reading was first observed
FROM raw_sensor_events
GROUP BY device_id, sensor_type, value, unit, quality, event_time;
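To sanity-check the collapse behavior, an ad-hoc comparison of raw event counts against surviving rows can be run for a single device (the device ID 'device-001' is hypothetical):

```sql
-- Ad-hoc check: raw events vs. rows surviving semantic deduplication
-- for one device. 'device-001' is a placeholder ID.
SELECT
    (SELECT COUNT(*) FROM raw_sensor_events
     WHERE device_id = 'device-001') AS raw_events,
    (SELECT COUNT(*) FROM semantically_deduplicated
     WHERE device_id = 'device-001') AS deduplicated_rows;
```

If the two counts match, the device is not producing semantic duplicates; a large gap points to heavy retransmission.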
Track the deduplication rate to monitor data quality over time:
CREATE MATERIALIZED VIEW dedup_quality_stats AS
SELECT
window_start,
window_end,
COUNT(*) AS total_raw_events,
COUNT(DISTINCT message_id) AS unique_messages,
COUNT(*) - COUNT(DISTINCT message_id) AS duplicate_count,
ROUND(
(COUNT(*) - COUNT(DISTINCT message_id)) * 100.0
/ NULLIF(COUNT(*), 0), 2
) AS duplicate_rate_pct
FROM TUMBLE(raw_sensor_events, ingested_at, INTERVAL '5 MINUTES')
GROUP BY window_start, window_end;
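The stats view can then be queried on demand. A simple sketch pulling the most recent windows that actually contained duplicates:

```sql
-- Ad-hoc query: recent 5-minute windows where duplicates appeared.
SELECT window_start, duplicate_count, duplicate_rate_pct
FROM dedup_quality_stats
WHERE duplicate_count > 0
ORDER BY window_start DESC
LIMIT 10;
```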
Identify the devices with the highest duplicate rates—often indicating connectivity problems:
CREATE MATERIALIZED VIEW device_dedup_stats AS
SELECT
device_id,
window_start,
window_end,
COUNT(*) AS total_events,
COUNT(DISTINCT message_id) AS unique_events,
COUNT(*) - COUNT(DISTINCT message_id) AS duplicates,
ROUND(
(COUNT(*) - COUNT(DISTINCT message_id)) * 100.0
/ NULLIF(COUNT(*), 0), 2
) AS dup_rate_pct
FROM TUMBLE(raw_sensor_events, ingested_at, INTERVAL '10 MINUTES')
GROUP BY device_id, window_start, window_end;
Step 4: Alerts and Sinks
Forward clean, deduplicated events to a downstream Kafka topic for consumption by other services:
CREATE SINK clean_sensor_events_sink
FROM deduplicated_by_id
WITH (
connector = 'kafka',
topic = 'clean-sensor-events',
properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;
Alert when a device's duplicate rate spikes—a potential indicator of network instability or sensor firmware issues:
CREATE MATERIALIZED VIEW high_dup_rate_alerts AS
SELECT
device_id,
dup_rate_pct,
duplicates,
total_events,
window_end AS detected_at,
'HIGH_DUPLICATE_RATE' AS alert_type
FROM device_dedup_stats
WHERE dup_rate_pct > 20.0; -- more than 20% duplicates is abnormal
CREATE SINK dedup_alerts_sink
FROM high_dup_rate_alerts
WITH (
connector = 'kafka',
topic = 'data-quality-alerts',
properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;
Comparison Table
| Approach | Latency | Scale | Accuracy | Complexity |
| --- | --- | --- | --- | --- |
| Application-code deduplication | Sub-second | Low (in-memory cache) | High (with full cache) | High |
| Kafka deduplication (idempotent producer) | Sub-second | Medium | Partial (producer-side only) | Medium |
| Database UNIQUE constraint | Milliseconds | Medium | High | Low |
| Streaming SQL (RisingWave) | Sub-second | High | High (configurable strategy) | Low |
FAQ
How large a window does RisingWave maintain for deduplication state?
RisingWave maintains state incrementally for DISTINCT ON views. The state size grows with the number of unique keys, not with time. For ID-based deduplication, state scales with the number of unique message IDs in the stream. You can bound state size using a time-based filter on ingested_at to purge old entries.
What if duplicate messages arrive hours apart (late retransmissions)?
Add a time filter to your deduplication view: WHERE ingested_at > NOW() - INTERVAL '2 HOURS'. This bounds the deduplication window and the state size. Events older than 2 hours are treated as distinct. Adjust the interval based on your retransmission timeout policy.
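Putting the filter together with Strategy 1 gives a bounded-state variant (the view name deduplicated_recent is illustrative; the 2-hour interval should match your retransmission timeout):

```sql
-- ID-based deduplication with state bounded to roughly 2 hours.
-- The temporal filter lets RisingWave evict rows once ingested_at
-- falls outside the interval.
CREATE MATERIALIZED VIEW deduplicated_recent AS
SELECT DISTINCT ON (message_id)
    message_id,
    device_id,
    sensor_type,
    value,
    event_time,
    ingested_at
FROM raw_sensor_events
WHERE ingested_at > NOW() - INTERVAL '2 HOURS'
ORDER BY message_id, ingested_at ASC;
```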
Should deduplication happen at the edge or in RisingWave?
Both layers are complementary. Edge deduplication (using MQTT QoS 2 or idempotent Kafka producers) reduces unnecessary network traffic. RisingWave deduplication catches cross-session retransmissions and ensures the analytics layer always sees clean data regardless of edge behavior.
Key Takeaways
- RisingWave's DISTINCT ON pattern provides efficient ID-based and semantic deduplication as continuously maintained materialized views.
- Deduplication quality statistics—duplicate rate per device per time window—surface network and firmware issues automatically.
- Clean, deduplicated events are forwarded to a downstream Kafka topic via a sink, ensuring all consumers see consistent data.
- High duplicate rate alerts detect connectivity problems at the device level before they affect data quality across the fleet.

