
Watermark

A Watermark in stream processing is a mechanism used to track the progress of event time in a data stream. It is a timestamp that signifies that the system believes all events with event times older than the watermark have been observed. Watermarks are crucial for enabling correct and consistent event-time processing, especially when dealing with out-of-order events or distributed data sources.

Essentially, a watermark is a heuristic: "I don't expect to see any more events older than time X."

Purpose of Watermarks

  1. Handling Out-of-Order Events: Data streams often contain events that arrive out of their natural time order due to network latency, distributed sources, or variations in event generation. Watermarks provide a way for the system to "wait" for a certain period for late events before finalizing computations for a given event time window.
  2. Triggering Window Computations: In event-time windowing, watermarks are used to determine when a time window can be considered "closed" or complete for processing. When a watermark passes the end of a window, the system can be reasonably confident that most (if not all) events belonging to that window have arrived, and it can then trigger the window's computation (e.g., aggregation).
  3. Progress Tracking: They provide a notion of time progression within the streaming system that is tied to the data itself, rather than just the wall-clock time of the processing machines.
  4. State Management: Watermarks can inform when it's safe to discard old state for windowed operations, as events older than the watermark are not expected to affect already-closed windows.

How Watermarks Work

  1. Generation: Watermarks can be generated in several ways:
    • Source-based: Emitted by the source system itself when it has a good understanding of event time progression (e.g., Kafka topics partitioned by event time).
    • Ingestion-based: Generated upon data ingestion into the streaming system based on observed event timestamps.
    • Derived within the pipeline: Calculated from the maximum event time observed so far across parallel operator instances, minus a configured delay (sometimes called "slack" or bounded out-of-orderness). This delay is separate from the allowed lateness described under item 3.
  2. Propagation: Watermarks flow through the dataflow graph alongside the data records. Operators update their internal event time clocks based on the watermarks they receive. For operators with multiple inputs, the watermark is typically the minimum of the watermarks from all its inputs.
  3. Allowed Lateness: Systems often allow for a configurable "allowed lateness" period. This means that even after a watermark passes the end of a window, events that are "late" but within this allowed lateness period might still be incorporated into the window's computation (potentially triggering an update to the window result). Events arriving after the allowed lateness period are typically considered "too late" and might be dropped or processed separately.
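
The three steps above can be sketched in a few lines of Python. This is a minimal illustration, not the implementation of any particular engine: the class and function names (BoundedOutOfOrdernessWatermark, combine_watermarks, classify) are hypothetical, event times are plain seconds, and the rule that a window fires once the watermark reaches its end is an assumption, since systems differ on the exact boundary.

```python
from dataclasses import dataclass
from typing import Iterable


@dataclass
class BoundedOutOfOrdernessWatermark:
    """Hypothetical generator: watermark = max event time seen - delay."""
    delay: float                           # seconds of disorder to tolerate
    max_event_time: float = float("-inf")  # nothing observed yet

    def observe(self, event_time: float) -> float:
        # max() only ever grows, so the watermark never moves backwards.
        self.max_event_time = max(self.max_event_time, event_time)
        return self.current()

    def current(self) -> float:
        return self.max_event_time - self.delay


def combine_watermarks(input_watermarks: Iterable[float]) -> float:
    """A multi-input operator is only as far along as its slowest input."""
    return min(input_watermarks)


def classify(window_end: float, watermark: float, allowed_lateness: float) -> str:
    """How an event belonging to the window ending at window_end is treated."""
    if watermark < window_end:
        return "on_time"      # window has not fired yet
    if watermark < window_end + allowed_lateness:
        return "late_update"  # window fired, result may be revised
    return "too_late"         # dropped or handled separately


gen = BoundedOutOfOrdernessWatermark(delay=5.0)
for t in (101.0, 103.0, 102.0):  # out-of-order arrivals
    gen.observe(t)

merged = combine_watermarks([gen.current(), 90.0])
```

Here gen.current() is 98.0 (103 minus the 5-second delay), and merged is 90.0 because the second, slower input holds the operator back.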

Example

Imagine a stream of sensor readings, each with an event_timestamp.

  • The system observes events with timestamps like 10:00:01, 10:00:03, 10:00:02 (out of order).
  • A watermark generator might track the maximum observed event time (e.g., 10:00:03).
  • It then subtracts a small delay (e.g., 5 seconds) to estimate the watermark: watermark = max_event_time - delay. So, the watermark might be 09:59:58.
  • This watermark (09:59:58) signals to downstream operators that they are unlikely to see events older than 09:59:58.
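  • If a tumbling window is defined for [09:59:00, 10:00:00), when the watermark advances past 10:00:00, this window can be closed and its results emitted. With the 5-second delay above, that happens only after events timestamped around 10:00:05 or later have been observed.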
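
The walk-through above can be replayed as a small Python simulation. It is a sketch under stated assumptions: the calendar date, the event values, and the helper names (ts, run) are invented for illustration, and a watermark equal to the window end is treated as having passed it.

```python
from datetime import datetime, timedelta

DELAY = timedelta(seconds=5)
WINDOW_START = datetime(2024, 1, 1, 9, 59, 0)  # the date is arbitrary
WINDOW_END = datetime(2024, 1, 1, 10, 0, 0)


def ts(hour, minute, second):
    """Hypothetical helper: build a timestamp on the arbitrary example date."""
    return datetime(2024, 1, 1, hour, minute, second)


def run(events):
    """Feed (event_time, value) pairs in arrival order.

    Returns the emitted sum for [WINDOW_START, WINDOW_END), or None if the
    window never closed, plus the watermark after each event.
    """
    max_event_time = None
    window_sum = 0
    emitted = None
    trace = []
    for event_time, value in events:
        # Accumulate only while the window is still open.
        if emitted is None and WINDOW_START <= event_time < WINDOW_END:
            window_sum += value
        if max_event_time is None or event_time > max_event_time:
            max_event_time = event_time
        watermark = max_event_time - DELAY
        trace.append(watermark)
        # Assumption: the window fires once the watermark reaches its end.
        if emitted is None and watermark >= WINDOW_END:
            emitted = window_sum
    return emitted, trace


events = [
    (ts(9, 59, 50), 4),
    (ts(10, 0, 1), 1),
    (ts(10, 0, 3), 1),   # watermark becomes 09:59:58
    (ts(9, 59, 58), 6),  # out of order, but the window is still open
    (ts(10, 0, 2), 1),
    (ts(10, 0, 5), 1),   # watermark becomes 10:00:00, window closes
]
emitted, trace = run(events)
```

With these inputs the window emits 10 (4 + 6): the out-of-order 09:59:58 reading is still counted because the watermark had not yet reached 10:00:00 when it arrived.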

Watermarks in RisingWave

RisingWave uses watermarks for its event-time processing capabilities, especially in conjunction with windowed aggregations and joins defined in Streaming SQL.

  • You can define watermarks on sources using the WATERMARK FOR <event_time_column> AS <event_time_column> - INTERVAL '<delay>' clause in CREATE SOURCE or CREATE TABLE.
  • This tells RisingWave how to track event time progress for that source.
  • RisingWave then uses these watermarks internally to correctly process time-sensitive operations and trigger window emissions.

-- A watermark on a table requires the table to be append-only.
CREATE TABLE my_data_stream (
    event_id INT,
    event_timestamp TIMESTAMP,
    value INT,
    -- The watermark trails the largest event_timestamp seen so far by 5 seconds.
    WATERMARK FOR event_timestamp AS event_timestamp - INTERVAL '5' SECOND
) APPEND ONLY
WITH (
    connector = 'kafka',
    topic = 'my_topic',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;

-- Watermarks will then be used implicitly when processing this stream,
-- for example, in windowed aggregations:
SELECT
    window_start,
    SUM(value)
FROM TUMBLE(my_data_stream, event_timestamp, INTERVAL '1' MINUTE)
GROUP BY window_start, window_end;

Challenges and Considerations

  • Choosing the Right Delay/Lateness: Setting the watermark delay too short can lead to premature window closings and dropped late data. Setting it too long can increase latency as the system waits longer. This often requires understanding the characteristics of the data sources.
  • Idle Sources: If a source or a partition of a source becomes idle (stops sending data), its watermark may not advance, potentially stalling downstream processing for operators that depend on the minimum of multiple input watermarks. Systems need mechanisms to handle this (e.g., idle source detection).
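  • Skew: Significant event time skew between parallel instances or partitions holds the combined watermark back to the slowest input, which delays results and forces operators to keep state for the faster inputs longer.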
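
One common way to handle the idle-source problem is to leave inputs that have been silent for too long out of the minimum. The sketch below illustrates that idea only; the class name IdleAwareWatermarkMerger, the idle_timeout parameter, and the use of plain seconds for both event time and wall-clock time are assumptions, not the API of any specific system.

```python
from typing import Dict, Optional


class IdleAwareWatermarkMerger:
    """Hypothetical merger: min over inputs that reported recently."""

    def __init__(self, idle_timeout: float):
        self.idle_timeout = idle_timeout       # wall-clock seconds
        self.watermarks: Dict[str, float] = {}
        self.last_seen: Dict[str, float] = {}
        self.emitted: Optional[float] = None   # last combined watermark

    def update(self, input_id: str, watermark: float, now: float) -> None:
        # Per-input watermarks are kept monotonic.
        previous = self.watermarks.get(input_id, float("-inf"))
        self.watermarks[input_id] = max(previous, watermark)
        self.last_seen[input_id] = now

    def combined(self, now: float) -> Optional[float]:
        # Ignore inputs that have been silent longer than idle_timeout.
        active = [
            wm for input_id, wm in self.watermarks.items()
            if now - self.last_seen[input_id] <= self.idle_timeout
        ]
        if active:
            candidate = min(active)
            # The combined watermark must never move backwards.
            if self.emitted is None or candidate > self.emitted:
                self.emitted = candidate
        return self.emitted


merger = IdleAwareWatermarkMerger(idle_timeout=30.0)
merger.update("p0", 100.0, now=0.0)
merger.update("p1", 40.0, now=0.0)
both_active = merger.combined(now=10.0)   # p1 holds the minimum

merger.update("p0", 160.0, now=50.0)
p1_idle = merger.combined(now=50.0)       # p1 silent for 50s, so ignored

merger.update("p1", 45.0, now=60.0)
p1_resumed = merger.combined(now=60.0)    # does not fall back to 45
```

The trade-off shows up in the last step: once p1 resumes behind the already-advanced watermark, its events may be treated as late. This is why the idle timeout needs the same care as the watermark delay.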

Watermarks are a sophisticated yet essential concept for achieving reliable and accurate results in distributed event-time stream processing.
