IoT Data Deduplication at Scale with Streaming SQL

IoT data deduplication at scale means identifying and removing duplicate sensor readings—caused by retransmissions, at-least-once delivery semantics, or network retries—continuously as events flow through the pipeline, before they corrupt aggregates or trigger false alerts. RisingWave, a PostgreSQL-compatible streaming database, handles this with native SQL deduplication patterns.

Why IoT Data Deduplication Is Critical

IoT data pipelines almost universally operate with at-least-once delivery semantics. MQTT brokers, Kafka producers at the edge, and cellular modems all retry on connection drops. The result: the same sensor reading arrives multiple times in the event stream, sometimes seconds apart, sometimes hours later.

For analytics, duplicates are toxic. A temperature reading that arrives three times is weighted threefold in the average, pulling the mean toward its value. A production event counted twice doubles the apparent throughput. An alert triggered by a duplicate fires a second time, flooding the notification system.

Traditional deduplication approaches—checking incoming data against a window of seen message IDs in application code—don't scale. At 100,000 sensors, each retrying during edge network disruptions, the volume of potential duplicates demands a systematic, scalable approach.
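To see why the application-code approach breaks down, consider a minimal sketch of it: a capacity-bounded in-memory cache of seen message IDs (the class and capacity here are illustrative, not from any real pipeline). Once the cache fills, the oldest IDs are evicted, and a late retransmission of an evicted ID slips through as a "new" event:

```python
from collections import OrderedDict

class SeenIdCache:
    """Illustrative application-level dedup: a bounded cache of seen IDs."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.seen = OrderedDict()

    def is_duplicate(self, message_id):
        if message_id in self.seen:
            return True
        if len(self.seen) >= self.capacity:
            self.seen.popitem(last=False)  # evict the oldest ID to stay bounded
        self.seen[message_id] = True
        return False

cache = SeenIdCache(capacity=2)
cache.is_duplicate("a")   # False: first occurrence
cache.is_duplicate("a")   # True: caught while still cached
cache.is_duplicate("b")
cache.is_duplicate("c")   # cache full, evicts "a"
cache.is_duplicate("a")   # False: the late duplicate slips through
```

At fleet scale the cache must either grow without bound or accept exactly this kind of miss; a streaming database keeps the equivalent state incrementally and durably instead.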

How Streaming SQL Solves This

RisingWave's DISTINCT ON and window function patterns provide two complementary deduplication strategies:

  1. ID-based deduplication: deduplicate on a unique message ID or (device_id, sequence_number) combination
  2. Time-window deduplication: deduplicate semantically identical readings within a configurable time window

Both patterns are expressed as materialized views. The deduplication logic runs continuously and incrementally; you query the deduplicated view, not the raw source.

Step-by-Step Tutorial

Step 1: Connect Your Data Source

Raw sensor readings from Kafka may contain duplicates at the application level:

CREATE SOURCE raw_sensor_events (
    message_id    VARCHAR,   -- UUID generated at the sensor
    device_id     VARCHAR,
    sequence_num  BIGINT,    -- monotonic counter per device
    sensor_type   VARCHAR,
    value         DOUBLE PRECISION,
    unit          VARCHAR,
    quality       SMALLINT,
    ingested_at   TIMESTAMPTZ,  -- Kafka ingestion timestamp
    event_time    TIMESTAMPTZ   -- sensor-local timestamp
) WITH (
    connector = 'kafka',
    topic = 'raw-sensor-events',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;
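With FORMAT PLAIN ENCODE JSON, each Kafka message is a flat JSON object whose keys match the source's column names. A sketch of what a producer-side payload might look like (device ID, reading, and units are made up for illustration; ingested_at is typically stamped by the ingestion layer rather than the sensor):

```python
import json
from datetime import datetime, timezone
from uuid import uuid4

# Illustrative payload matching the raw_sensor_events schema.
event = {
    "message_id": str(uuid4()),     # UUID generated at the sensor
    "device_id": "sensor-0042",
    "sequence_num": 1017,           # monotonic counter per device
    "sensor_type": "temperature",
    "value": 21.7,
    "unit": "celsius",
    "quality": 1,
    "event_time": datetime.now(timezone.utc).isoformat(),
}
payload = json.dumps(event)  # one JSON object per Kafka message
```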

Step 2: Build the Real-Time View

Strategy 1: Deduplicate by message ID

Keep only the first occurrence of each message_id; later duplicates are discarded:

CREATE MATERIALIZED VIEW deduplicated_by_id AS
SELECT DISTINCT ON (message_id)
    message_id,
    device_id,
    sequence_num,
    sensor_type,
    value,
    unit,
    quality,
    event_time,
    ingested_at
FROM raw_sensor_events
ORDER BY message_id, ingested_at ASC;  -- keep earliest ingestion of each message_id
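The DISTINCT ON semantics can be pictured in plain Python: for each message_id, keep the single row with the smallest ingested_at. A minimal sketch over made-up rows:

```python
# Plain-Python picture of DISTINCT ON (message_id) ... ORDER BY message_id, ingested_at ASC:
# per message_id, retain only the earliest-ingested row.
rows = [
    {"message_id": "m1", "value": 21.7, "ingested_at": "2024-01-01T10:00:05Z"},
    {"message_id": "m1", "value": 21.7, "ingested_at": "2024-01-01T10:00:01Z"},  # earliest m1
    {"message_id": "m2", "value": 19.2, "ingested_at": "2024-01-01T10:00:02Z"},
]

deduplicated = {}
for row in rows:
    kept = deduplicated.get(row["message_id"])
    if kept is None or row["ingested_at"] < kept["ingested_at"]:
        deduplicated[row["message_id"]] = row  # replace with the earlier arrival
```

Unlike this batch loop, the materialized view maintains the per-key minimum incrementally as new events stream in.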

Strategy 2: Deduplicate by (device, sequence number)

If message_id is not available, deduplicate on the device's own sequence counter:

CREATE MATERIALIZED VIEW deduplicated_by_seq AS
SELECT DISTINCT ON (device_id, sequence_num)
    device_id,
    sequence_num,
    sensor_type,
    value,
    unit,
    quality,
    event_time,
    ingested_at
FROM raw_sensor_events
ORDER BY device_id, sequence_num, ingested_at ASC;

Step 3: Window-Based Aggregations

Strategy 3: Semantic deduplication within a time window

For devices that don't generate message IDs or sequence numbers, deduplicate on identical (device_id, sensor_type, value, event_time) combinations. Grouping on the full payload collapses exact retransmissions into a single row while preserving the earliest arrival time:

CREATE MATERIALIZED VIEW semantically_deduplicated AS
SELECT
    device_id,
    sensor_type,
    value,
    unit,
    quality,
    event_time,
    MIN(ingested_at) AS first_seen
FROM raw_sensor_events
GROUP BY device_id, sensor_type, value, unit, quality, event_time;
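In effect this is a group-by over the full payload with a MIN aggregate. A plain-Python equivalent over illustrative rows (last element is the ingestion time):

```python
# Rows: (device_id, sensor_type, value, unit, quality, event_time, ingested_at).
rows = [
    ("dev-1", "temp", 21.7, "celsius", 1, "10:00:00Z", "10:00:02Z"),
    ("dev-1", "temp", 21.7, "celsius", 1, "10:00:00Z", "10:00:09Z"),  # retransmission
    ("dev-1", "temp", 21.9, "celsius", 1, "10:00:30Z", "10:00:31Z"),
]

groups = {}
for *payload, ingested_at in rows:
    key = tuple(payload)
    # Identical payloads collapse to one row carrying the earliest arrival.
    groups[key] = min(groups.get(key, ingested_at), ingested_at)
```

Two distinct readings survive; the retransmitted copy merges into the first, keeping its original first_seen timestamp.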

Track the deduplication rate to monitor data quality over time:

CREATE MATERIALIZED VIEW dedup_quality_stats AS
SELECT
    window_start,
    window_end,
    COUNT(*)                          AS total_raw_events,
    COUNT(DISTINCT message_id)        AS unique_messages,
    COUNT(*) - COUNT(DISTINCT message_id) AS duplicate_count,
    ROUND(
        (COUNT(*) - COUNT(DISTINCT message_id)) * 100.0
            / NULLIF(COUNT(*), 0), 2
    )                                 AS duplicate_rate_pct
FROM TUMBLE(raw_sensor_events, ingested_at, INTERVAL '5 MINUTES')
GROUP BY window_start, window_end;
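The arithmetic inside each tumbling window is straightforward; here it is in plain Python over an illustrative batch of message IDs from one 5-minute window:

```python
# Message IDs observed in one window (illustrative).
window_ids = ["m1", "m1", "m2", "m3", "m3", "m3"]

total_raw_events = len(window_ids)
unique_messages = len(set(window_ids))
duplicate_count = total_raw_events - unique_messages
# NULLIF(COUNT(*), 0) in the SQL guards the same division by zero.
duplicate_rate_pct = (
    round(duplicate_count * 100.0 / total_raw_events, 2) if total_raw_events else 0.0
)
```

Six raw events with three unique IDs yields three duplicates, a 50% duplicate rate for that window.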

Identify the devices with the highest duplicate rates—often indicating connectivity problems:

CREATE MATERIALIZED VIEW device_dedup_stats AS
SELECT
    device_id,
    window_start,
    window_end,
    COUNT(*)                                  AS total_events,
    COUNT(DISTINCT message_id)                AS unique_events,
    COUNT(*) - COUNT(DISTINCT message_id)     AS duplicates,
    ROUND(
        (COUNT(*) - COUNT(DISTINCT message_id)) * 100.0
            / NULLIF(COUNT(*), 0), 2
    )                                         AS dup_rate_pct
FROM TUMBLE(raw_sensor_events, ingested_at, INTERVAL '10 MINUTES')
GROUP BY device_id, window_start, window_end;

Step 4: Alerts and Sinks

Forward clean, deduplicated events to a downstream Kafka topic for consumption by other services:

CREATE SINK clean_sensor_events_sink
FROM deduplicated_by_id
WITH (
    connector = 'kafka',
    topic = 'clean-sensor-events',
    properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;

Alert when a device's duplicate rate spikes—a potential indicator of network instability or sensor firmware issues:

CREATE MATERIALIZED VIEW high_dup_rate_alerts AS
SELECT
    device_id,
    dup_rate_pct,
    duplicates,
    total_events,
    window_end AS detected_at,
    'HIGH_DUPLICATE_RATE' AS alert_type
FROM device_dedup_stats
WHERE dup_rate_pct > 20.0;  -- more than 20% duplicates is abnormal

CREATE SINK dedup_alerts_sink
FROM high_dup_rate_alerts
WITH (
    connector = 'kafka',
    topic = 'data-quality-alerts',
    properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;

Comparison Table

Approach                                  | Latency      | Scale                 | Accuracy                     | Complexity
Application-code deduplication            | Sub-second   | Low (in-memory cache) | High (with full cache)       | High
Kafka deduplication (idempotent producer) | Sub-second   | Medium                | Partial (producer-side only) | Medium
Database UNIQUE constraint                | Milliseconds | Medium                | High                         | Low
Streaming SQL (RisingWave)                | Sub-second   | High                  | High (configurable strategy) | Low

FAQ

How large a window does RisingWave maintain for deduplication state?

RisingWave maintains state incrementally for DISTINCT ON views. The state size grows with the number of unique keys, not with time. For ID-based deduplication, state scales with the number of unique message IDs in the stream. You can bound state size using a time-based filter on ingested_at to purge old entries.

What if duplicate messages arrive hours apart (late retransmissions)?

Add a time filter to your deduplication view: WHERE ingested_at > NOW() - INTERVAL '2 HOURS'. This bounds the deduplication window and the state size. Events older than 2 hours are treated as distinct. Adjust the interval based on your retransmission timeout policy.
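The state-bounding idea can be sketched in plain Python: IDs older than the retransmission horizon are evicted, so a copy arriving after the horizon is kept as a distinct event (times are seconds and the events are made up for illustration):

```python
HORIZON = 2 * 60 * 60  # 2 hours, mirroring INTERVAL '2 HOURS'

def dedup_with_horizon(events, horizon=HORIZON):
    """events: (ingest_time_seconds, message_id) pairs in arrival order."""
    seen = {}   # message_id -> ingest time of the kept occurrence
    kept = []
    for ts, message_id in events:
        # Evict state that has aged out of the deduplication window.
        for mid in [m for m, t in seen.items() if ts - t > horizon]:
            del seen[mid]
        if message_id not in seen:
            kept.append((ts, message_id))
            seen[message_id] = ts
    return kept

events = [(0, "a"), (60, "a"), (3 * 60 * 60, "a")]  # retransmit after 1 min and 3 h
result = dedup_with_horizon(events)  # the 3-hour-late copy counts as distinct
```

The one-minute retransmission is dropped; the three-hour-late copy falls outside the horizon and is kept, exactly the trade-off the time filter makes in exchange for bounded state.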

Should deduplication happen at the edge or in RisingWave?

Both layers are complementary. Edge deduplication (using MQTT QoS 2 or idempotent Kafka producers) reduces unnecessary network traffic. RisingWave deduplication catches cross-session retransmissions and ensures the analytics layer always sees clean data regardless of edge behavior.

Key Takeaways

  • RisingWave's DISTINCT ON pattern provides efficient ID-based and semantic deduplication as continuously maintained materialized views.
  • Deduplication quality statistics—duplicate rate per device per time window—surface network and firmware issues automatically.
  • Clean, deduplicated events are forwarded to a downstream Kafka topic via a sink, ensuring all consumers see consistent data.
  • High duplicate rate alerts detect connectivity problems at the device level before they affect data quality across the fleet.
