Window functions are a common feature in databases and stream processing systems. For each input row, they compute an aggregate over a window of rows around that row, or fetch a value from a row at a specified offset before or after it.

Some other streaming systems call window functions Over Aggregation. RisingWave added support for window functions in version 1.1. In our implementation, the operator that evaluates window function calls is named the OverWindow operator. This article analyzes its design and implementation.


Basic examples


First, let's demonstrate the basic usage of window functions with two simple examples. For a complete syntax explanation, please refer to the RisingWave user documentation.


Example 1


The following example continuously calculates the price difference from the last update for each stock price update event.

CREATE MATERIALIZED VIEW mv AS
SELECT
  stock_id,
  event_time,
  price - LAG(price) OVER (PARTITION BY stock_id ORDER BY event_time) AS price_diff
FROM stock_prices;

The LAG window function returns the price of the row that immediately precedes the current row among rows with the same stock_id, ordered by event_time. Its counterpart, LEAD, returns the next row instead (in time order, a more "leading" row). We refer to this type of window function as a general-purpose window function, consistent with the concept in PostgreSQL.
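To make the semantics of LAG concrete, here is a minimal Python sketch of what the query computes in batch form. It is an illustration only, not how RisingWave evaluates the query; the function name price_diffs and the tuple layout are assumptions made for this example.

```python
from collections import defaultdict

def price_diffs(events):
    """events: iterable of (stock_id, event_time, price) tuples (hypothetical layout)."""
    partitions = defaultdict(list)
    for stock_id, event_time, price in events:
        partitions[stock_id].append((event_time, price))  # PARTITION BY stock_id

    out = []
    for stock_id, rows in partitions.items():
        rows.sort()        # ORDER BY event_time
        prev_price = None  # LAG(price) is NULL for the first row of a partition
        for event_time, price in rows:
            diff = None if prev_price is None else price - prev_price
            out.append((stock_id, event_time, diff))
            prev_price = price
    return out

result = price_diffs([("A", 1, 10), ("A", 2, 13), ("B", 1, 7), ("A", 3, 12)])
# result == [("A", 1, None), ("A", 2, 3), ("A", 3, -1), ("B", 1, None)]
```

The first row of each partition has no predecessor, so its difference is NULL (None here), which matches the SQL behavior of subtracting a NULL.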


Example 2


The second example calculates, for each order, the average amount of the previous 10 orders placed by the same user.

CREATE MATERIALIZED VIEW mv AS
SELECT
  user_id,
  amount,
  AVG(amount) OVER (
    PARTITION BY user_id
    ORDER BY order_time
    ROWS BETWEEN 10 PRECEDING AND 1 PRECEDING
  ) AS recent_10_orders_avg_amount
FROM orders;

The AVG function here is an aggregate function. In RisingWave, all aggregate functions can be used as window functions, followed by an OVER clause to specify the computing window. We call this type of window function an Aggregate Window Function. Similarly, this aligns with PostgreSQL’s concept, making it easier for users to understand.
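The frame ROWS BETWEEN 10 PRECEDING AND 1 PRECEDING can be read as "up to ten rows before the current row, excluding the current row itself". The following Python sketch illustrates that reading for a single partition. The function name avg_over_preceding and its parameters are hypothetical, and the input is assumed to be already sorted by order_time.

```python
def avg_over_preceding(amounts, start=10, end=1):
    """amounts: one user's order amounts, already sorted by order_time.

    Emulates AVG(amount) OVER (ROWS BETWEEN `start` PRECEDING AND `end` PRECEDING).
    """
    results = []
    for i in range(len(amounts)):
        lo = max(0, i - start)    # first row of the frame (clamped at partition start)
        hi = max(0, i - end + 1)  # one past the last row of the frame
        frame = amounts[lo:hi]
        # AVG over an empty frame is NULL
        results.append(sum(frame) / len(frame) if frame else None)
    return results

# With a smaller frame for readability: ROWS BETWEEN 2 PRECEDING AND 1 PRECEDING
small = avg_over_preceding([10, 20, 30], start=2, end=1)
# small == [None, 10.0, 15.0]
```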


Two output modes


In a previous article, "Deep dive into the RisingWave stream processing engine (part 3): trigger mechanism", we introduced the two output strategies of the RisingWave streaming engine: the default general mode, "Emit-On-Update", and the EOWC mode, "Emit-On-Window-Close", which can be enabled through keywords. The OverWindow operator supports both modes.


General mode (Emit-On-Update)


In general mode, upon receiving an input change, the OverWindow operator immediately finds the range of rows affected by that change and recalculates the window function results for all rows in that range.

The two SQL examples in the previous section are calculated using the general mode.


EOWC mode (Emit-On-Window-Close)


By adding the EMIT ON WINDOW CLOSE keyword to the query, you can use the EOWC output mode.

Before further explaining, we would like to remind you that since the EOWC mode is still an experimental feature, its behavior and syntax may change. For example, the syntax changed in version 1.2, adjusting the position of the EMIT ON WINDOW CLOSE keyword. So please refer to the documentation for the version you are using.

In EOWC mode, OverWindow only outputs a row when it receives a watermark that "submerges" both the ORDER BY column of the row and the row's corresponding window. This is slightly different from the EOWC mode of the HashAgg operator, where receiving a watermark on the GROUP BY column signifies that the groups before the watermark are "complete" and can be output.

In OverWindow, two conditions must be met before outputting: first, the "completion" of the ORDER BY column, meaning the input row is semantically allowed to be visible downstream, and second, the "completion" of the window defined by the window function, meaning the last row of the input row's corresponding window is also visible downstream.
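The two conditions can be written down as a small predicate. The sketch below is a simplification under stated assumptions: a watermark w is taken to mean that no later row will arrive with a timestamp below w, so a row counts as "complete" once its timestamp is below w; the frame is a ROWS frame ending a fixed number of rows after the current row; and the function name emittable is hypothetical.

```python
def emittable(sorted_ts, i, following, watermark):
    """Can row i of one partition be output in EOWC mode?

    sorted_ts: timestamps of the partition's rows, in ORDER BY order.
    following: number of rows after the current row that the frame covers.
    """
    # Condition 1: the row itself is below the watermark.
    if sorted_ts[i] >= watermark:
        return False
    # Condition 2: the last row of the row's window has arrived
    # and is below the watermark as well.
    last = i + following
    return last < len(sorted_ts) and sorted_ts[last] < watermark

ts = [1, 2, 5, 9]
flags = [emittable(ts, i, following=1, watermark=6) for i in range(len(ts))]
# flags == [True, True, False, False]
```

In this example the row at timestamp 5 is itself below the watermark, but its window extends to the row at timestamp 9, which is not, so it has to wait.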

For performance reasons, we wrote two separate executor implementations, one for the general mode and one for the EOWC mode (though much of the code is shared), so that each can fully exploit the semantics of its output mode. The following sections introduce them in turn.


EOWC version: The art of sliding


The algorithm of the EOWC version of the OverWindow operator (hereinafter referred to as EowcOverWindow) is somewhat simpler than that of the general version, so we introduce it first.

As previously mentioned, before outputting an input row and its window function calculation results, EowcOverWindow needs to meet two conditions: condition ①, "completion" of the ORDER BY column, and condition ②, "completion" of the window of the input row.

That is to say, even if the frame of the window function is ROWS BETWEEN 10 PRECEDING AND 1 PRECEDING, so that condition ② for the CURRENT ROW appears to be met as soon as condition ① is met for the row before it, EowcOverWindow still waits for condition ① of the CURRENT ROW itself before outputting. Another way to see this: treat every input column included in the output as LAG(?, 0). The CURRENT ROW is then always part of its own window, which makes condition ① a prerequisite for condition ②.

Based on this, we implement EowcOverWindow in two stages. For an input row:

  1. The first stage waits for condition ① to be met. Once met, the row is released to the second stage.
  2. The second stage waits for condition ② to be met. Once met, the window function result is calculated.

The window function is actually calculated only after both conditions are met, which avoids a large amount of wasted computation. This is slightly different from the EOWC implementation of the HashAgg operator (which will be introduced in future articles): in HashAgg, each group of input changes results in at most one row of output change, whereas in OverWindow every single input row can lead to multiple rows of output, which significantly magnifies both computation and I/O costs.


First stage: SortBuffer


The first stage is a buffer for input rows. Because watermarks are non-decreasing, it is easy to make the first stage emit rows in order, so we named it SortBuffer. Furthermore, we introduced an operator named EowcSort to decouple SortBuffer from the second stage, allowing SortBuffer to be reused elsewhere. Thus, the EowcOverWindow operator uses EowcSort as its upstream, and internally it only needs to implement the second stage for ordered input rows that already meet condition ①.
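The idea of a watermark-driven sort buffer can be sketched in a few lines of Python. This is an in-memory illustration only: the class below merely borrows the SortBuffer name, its methods insert and advance_watermark are hypothetical, and it assumes that a watermark w releases every buffered row with a timestamp below w. It does not model RisingWave's state table or recovery.

```python
import heapq

class SortBuffer:
    """Buffers rows and releases them in timestamp order as the watermark advances."""

    def __init__(self):
        self._heap = []
        self._seq = 0  # tie-breaker so rows with equal timestamps keep arrival order

    def insert(self, ts, row):
        heapq.heappush(self._heap, (ts, self._seq, row))
        self._seq += 1

    def advance_watermark(self, watermark):
        """Pop every buffered row with ts < watermark, smallest timestamp first."""
        released = []
        while self._heap and self._heap[0][0] < watermark:
            ts, _, row = heapq.heappop(self._heap)
            released.append((ts, row))
        return released

buf = SortBuffer()
for ts, row in [(5, "c"), (1, "a"), (3, "b")]:
    buf.insert(ts, row)
first = buf.advance_watermark(4)    # [(1, "a"), (3, "b")]
second = buf.advance_watermark(10)  # [(5, "c")]
```

Because each watermark is at least as large as the previous one, every batch released later contains only timestamps at or above the earlier watermark, so concatenating the batches yields a globally ordered stream.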


Second stage: Sliding window


Since the actual calculation of the window function is performed only after condition ② is met, EowcOverWindow needs to first store input rows in its internal state table in order according to the PARTITION BY and ORDER BY columns. Moreover, for each partition, EowcOverWindow maintains in memory the CURRENT ROW and its corresponding window ("current window") that are waiting for the window to complete (this in-memory structure can be rebuilt from the state table during recovery).

When some input rows enter EowcOverWindow from SortBuffer, EowcOverWindow finds the above-mentioned in-memory structure for the corresponding partition. If the "current window" is complete, it outputs the "current row" and the window function calculation results on the "current window", and then slides the "current row" and its window to the next row, repeating this process until the "current window" is no longer complete. As the window slides, some of the oldest rows are moved out of the "current window", allowing EowcOverWindow to clear them from the state table.
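The sliding process for a single partition can be sketched as follows. This is a simplified model under several assumptions: one SUM call with a ROWS frame of `preceding` rows before and `following` rows after the current row, rows arriving already ordered, and a plain Python list standing in for both the state table and the in-memory structure. The class name PartitionWindow is hypothetical.

```python
class PartitionWindow:
    """Sliding "current row" and "current window" for one partition."""

    def __init__(self, preceding, following):
        self.preceding = preceding
        self.following = following
        self.rows = []  # buffered values, oldest still-needed row first
        self.curr = 0   # index of the "current row" within self.rows

    def append(self, x):
        """Add the next ordered row and return the outputs that become ready."""
        self.rows.append(x)
        outputs = []
        # The "current window" is complete once its last row has arrived.
        while self.curr + self.following < len(self.rows):
            start = max(0, self.curr - self.preceding)
            end = self.curr + self.following + 1
            outputs.append((self.rows[self.curr], sum(self.rows[start:end])))
            # Slide to the next row, then evict rows no later window needs.
            self.curr += 1
            evict = max(0, self.curr - self.preceding)
            del self.rows[:evict]
            self.curr -= evict
        return outputs

w = PartitionWindow(preceding=1, following=1)
steps = [w.append(x) for x in [5, 3, 8, 9]]
# steps == [[], [(5, 8)], [(3, 16)], [(8, 20)]]
```

Each output pairs the current row's value with the sum over its window. After the last call only two values remain buffered, which mirrors how the oldest rows can be cleared as the window slides.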

Now, let's walk through the two stages with an example. Consider the following query (once again, the EOWC mode is still experimental and its syntax changed in version 1.2, so please refer to the documentation for the version you are using):

CREATE MATERIALIZED VIEW mv AS
SELECT
  SUM(x) OVER (PARTITION BY p ORDER BY ts ROWS 1 PRECEDING),
  SUM(x) OVER (PARTITION BY p ORDER BY ts ROWS BETWEEN CURRENT ROW AND 1 FOLLOWING),
  LEAD(x, 1) OVER (PARTITION BY p ORDER BY ts)
FROM t
EMIT ON WINDOW CLOSE;

These three window function calls have the same PARTITION BY and ORDER BY clauses (in real-world scenarios, the optimizer first splits the query into multiple OverWindow operators if the PARTITION BY or ORDER BY clauses differ), but different window frames. Additionally, a watermark with a 5-minute delay is defined on the ts column.

Next, an animation demonstrates the algorithm. Before watching it, let's look at the meanings of the arrows it uses:

Figure: Arrows used in the animation.

Now, the implementation algorithm of EowcOverWindow can be understood through the following animation:

Figure: Animation explaining the EowcOverWindow algorithm.


General version: Aesthetics of symmetry


Compared to the EOWC version, the general version of OverWindow (hereinafter referred to as GeneralOverWindow) looks simpler on the surface but is actually more complex to implement.

In GeneralOverWindow, the ORDER BY column usually does not come with a watermark definition, meaning the ORDER BY column of the input rows can be any value (in real scenarios, this means there can be data from days ago being inserted, updated, or deleted). Thus, unlike EowcOverWindow, which always knows the "current window," GeneralOverWindow needs to find the corresponding window for the incoming row first. After that it can compute the window function result.

For example, consider the query example from the last section (removing the EMIT ON WINDOW CLOSE keyword), assuming we have the following data:

ts     pk   x
10:00  100  5
10:02  101  3
10:10  103  9
10:17  104  0

Now, a new row 10:06 102 8 is inserted, as shown below. (Here we'll discuss only insertions. Similar logic applies for updates or deletions.)

ts     pk   x
10:00  100  5
10:02  101  3
10:06  102  8  <-- insert
10:10  103  9
10:17  104  0

According to the frames specified in the query (together, they span one row before and one row after the current row), to calculate the window function results for the row pk = 102, we need to find one row before and one row after it, meaning the "current window" for the CURRENT ROW of 102 ranges from row 101 to row 103.

However, we can quickly see that the "current window" found by starting from the newly inserted row and extending in both directions according to the window frame yields only one output row, the one for the new insertion. The newly inserted row may also belong to the windows of other rows that have already been output, so those previously output rows must be updated as well. Therefore, we need to change our algorithm.

Instead of considering the currently inserted/updated/deleted row as the CURRENT ROW to identify the window, we should treat it as the last row of some window A and the first row of another window B. By identifying windows A and B, we can correctly generate new outputs for all affected rows.

Continuing with the same data example, we treat row 102 as the last row of window A and search backward: the CURRENT ROW of window A is row 101, and therefore the first row of window A is row 100. We mark the first row (row 100) as first_frame_start and the CURRENT ROW (row 101) as first_curr_row. Symmetrically, treating row 102 as the first row of window B and searching forward, we find that the CURRENT ROW of window B is row 103 and the last row of window B is row 104. These are marked as last_curr_row and last_frame_end respectively. This process is demonstrated in the following animation:
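For ROWS frames, this symmetric search reduces to simple index arithmetic. The Python sketch below illustrates the idea under stated assumptions: the partition's keys are held in a sorted list that already contains the changed row, the frame covers `preceding` rows before and `following` rows after the current row, and positions are clamped at the partition boundaries. The function name affected_range is hypothetical.

```python
def affected_range(keys, changed, preceding, following):
    """Return (first_frame_start, first_curr_row, last_curr_row, last_frame_end).

    keys: the partition's row keys in ORDER BY order, including the changed row.
    """
    i = keys.index(changed)
    last = len(keys) - 1

    # Window A: the changed row is its last row, so its CURRENT ROW lies
    # `following` rows earlier and its first row `preceding` rows before that.
    first_curr_row = max(0, i - following)
    first_frame_start = max(0, first_curr_row - preceding)

    # Window B: the changed row is its first row, so its CURRENT ROW lies
    # `preceding` rows later and its last row `following` rows after that.
    last_curr_row = min(last, i + preceding)
    last_frame_end = min(last, last_curr_row + following)

    return (keys[first_frame_start], keys[first_curr_row],
            keys[last_curr_row], keys[last_frame_end])

bounds = affected_range([100, 101, 102, 103, 104], changed=102, preceding=1, following=1)
# bounds == (100, 101, 103, 104)
```

With one row preceding and one row following, the result matches the walkthrough: rows 101 to 103 need new outputs, and rows 100 to 104 must be read to compute them.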

Figure: Animation showing the process of identifying the affected window range in GeneralOverWindow.

After identifying the entire range (first_frame_start, first_curr_row, last_curr_row, last_frame_end) affected by the new input row (corresponding to the four horizontal lines in the animation), we can reuse the second stage code of EowcOverWindow to calculate the new output results from first_curr_row to last_curr_row, as shown in the following animation:
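As an illustration of this recomputation step, the following Python sketch evaluates the three window function calls of the example query for the affected rows only. It is a batch-style simplification, not the sliding-window code the operator reuses. The function name recompute and the (pk, x) row layout are assumptions, and the row indices are passed in directly.

```python
def recompute(rows, first_curr, last_curr):
    """rows: (pk, x) tuples of one partition, sorted by ts.

    Recomputes outputs only for indices first_curr..last_curr (inclusive) and
    returns {pk: (sum_1_preceding, sum_1_following, lead_1)}.
    """
    xs = [x for _, x in rows]
    changes = {}
    for i in range(first_curr, last_curr + 1):
        sum_1_preceding = sum(xs[max(0, i - 1):i + 1])  # ROWS 1 PRECEDING
        sum_1_following = sum(xs[i:i + 2])              # CURRENT ROW AND 1 FOLLOWING
        lead_1 = xs[i + 1] if i + 1 < len(xs) else None  # LEAD(x, 1)
        changes[rows[i][0]] = (sum_1_preceding, sum_1_following, lead_1)
    return changes

rows = [(100, 5), (101, 3), (102, 8), (103, 9), (104, 0)]
updated = recompute(rows, first_curr=1, last_curr=3)
# updated == {101: (8, 11, 8), 102: (11, 17, 9), 103: (17, 9, 0)}
```

Before the insertion, row 101 would have produced (8, 12, 9) and row 103 would have produced (12, 9, 0), so both previously output rows do change, while rows 100 and 104 are read but keep their outputs.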

Figure: Reusing the second stage code of EowcOverWindow.

Conclusion

This article provides a brief introduction to RisingWave’s implementation of window functions. If you are interested in delving deeper, you are welcome to visit the RisingWave documentation and tutorial website.

RisingWave is an open-source distributed streaming database under the Apache 2.0 license. It aims to make stream computing easy to use. RisingWave achieves efficient complex queries, instant dynamic scaling, and fast fault recovery by separating data and compute. This helps users build stable and performant stream computing systems effortlessly. Using RisingWave to process streaming data is similar to using PostgreSQL. By creating real-time materialized views, users can quickly write stream computing logic and access these materialized views for immediate and consistent querying of stream computing results.

Yuchao Qian

Kernel Engineer