Window Functions for PostgreSQL Design Overview [v08]

Abstract

Window functions in SQL are an OLAP feature that provides ranking, cumulative computation, and partitioned aggregation. Many commercial RDBMSs such as Oracle, MS SQL Server and DB2 have implemented part of this specification, while open source RDBMSs including PostgreSQL, MySQL and Firebird have not yet. Implementing this functionality in PostgreSQL would not only help many users migrate from those RDBMSs to PostgreSQL but also encourage OLAP applications such as BI (Business Intelligence) tools that analyze large data sets. The specification was first defined in SQL:2003 and improved in SQL:2008.

The first proposal: http://archives.postgresql.org/pgsql-hackers/2008-06/msg00380.php

The subsequent discussion: http://archives.postgresql.org/pgsql-hackers/2008-07/msg00232.php

The discussion about window function APIs: http://archives.postgresql.org/pgsql-hackers/2008-09/msg00021.php

The classification of window functions and introduction of Buffering Strategy: http://archives.postgresql.org/pgsql-hackers/2008-10/msg00892.php

patch v08: http://umitanuki.net/pgsql/wfv08/window_functions.patch.20081031.gz

Git repository with patch v08 applied: http://git.postgresql.org/git/~davidfetter/window_functions/.git

sample SQL: http://umitanuki.net/pgsql/wfv08/sample.sql

The following describes the design as implemented in the patch so far.

Roadmap

The features below may be dropped for 8.4.

Without these features, this stage is roughly comparable to SQL Server 2005, while Oracle and DB2 implement almost the full specification. Nevertheless, if the current window function API design is accepted by the community, sliding window frames should not be difficult to implement for 8.4.

Terminology

When this was posted and discussed on the -hackers list, the terminology caused some confusion, so please keep the following definitions in mind.

window function

An expression evaluated in a Window node: a rank function, an aggregate function, ntile, lead or lag, first_value or last_value, or nth_value. In a Window node, only the TargetEntries that contain a window expression are evaluated; the other entries are evaluated in a lower (outer plan) node such as a scan, join, or aggregate. A window function is represented by a WFunc node.

non-aggregate window function

A function of this type may return a different value for each row. Because it needs to know about and operate on the "current window", a new mechanism must be added to PostgreSQL. This category includes the functions newly defined by the spec, such as ROW_NUMBER(), RANK(), DENSE_RANK(), LEAD() and LAG(). Since v06, a new attribute pg_proc.proiswfunc indicates that a function can be called as a (non-aggregate) window function.

window aggregate

The remaining kind of window function. A function of this type scans the tuples in the specified window frame and returns the same value as long as the frame does not slide. Existing aggregate functions can be used as they are, with nothing new to add. Since v06, plain aggregate functions are treated as a special subset of window functions; see eval_windowaggregate() in nodeWindow.c.

normal aggregate (group aggregate)

The ordinary aggregate that PostgreSQL already has. "Normal" means "not windowed". Some SQL spec documents call it a "group aggregate".

frame

Also called a sliding or moving window; it is expressed in SQL syntax by "ROWS BETWEEN...", "RANGE BETWEEN...", "CURRENT ROW...", etc. The frame slides row by row within a partitioned window, and window functions and aggregates can access any row within this moving frame, so some mechanism is needed to avoid wasting memory on rows that can no longer be reached.

partition

A partition is a set of rows determined by the PARTITION BY clause: rows whose PARTITION BY values are peers (equal) belong to the same partition. Even with a moving frame, the contents of a partition never change.

to shrink

The window frame shrinks. In particular, when the window function APIs and WindowObject refer to a "shrinking frame", they mean that preceding rows which have fallen out of the frame for the current row are cut off. How the frame shrinks depends on the PRECEDING specification of the frame clause, and sometimes it does not shrink at all. In any case, as the current row advances, the frame never takes in rows before its start.

to extend

The window frame extends; this is the opposite of "to shrink". As the current row advances, an extending frame takes in rows that follow the current frame. As with shrinking, how the frame extends depends on the FOLLOWING specification of the frame clause, and sometimes it does not extend at all. It never cuts off rows at its end.
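To make "to shrink" and "to extend" concrete, here is a minimal C sketch for a frame written as ROWS BETWEEN p PRECEDING AND f FOLLOWING. It is not code from the patch: FrameBounds, frame_bounds(), frame_shrinking_num(), frame_extended_num() and FRAME_UNBOUNDED are hypothetical names chosen for illustration, rows are identified by their 0-based position in the partition, and UNBOUNDED is modeled as LONG_MAX.

```c
#include <assert.h>
#include <limits.h>

/* Hypothetical stand-in for UNBOUNDED PRECEDING / UNBOUNDED FOLLOWING. */
#define FRAME_UNBOUNDED LONG_MAX

/* Hypothetical frame of the current row: rows head..tail, 0-based. */
typedef struct
{
    long head;  /* first row position in the frame */
    long tail;  /* last row position in the frame */
} FrameBounds;

/* Frame of row cur for ROWS BETWEEN p PRECEDING AND f FOLLOWING,
 * clipped to a partition of nrows rows. */
FrameBounds
frame_bounds(long cur, long p, long f, long nrows)
{
    FrameBounds b;

    assert(nrows > 0 && cur >= 0 && cur < nrows && p >= 0 && f >= 0);
    /* compare first instead of computing cur - p / cur + f, so LONG_MAX cannot overflow */
    b.head = (p >= cur) ? 0 : cur - p;
    b.tail = (f >= nrows - 1 - cur) ? nrows - 1 : cur + f;
    return b;
}

/* Rows cut off at the head when the current row advances ("to shrink"). */
long
frame_shrinking_num(FrameBounds prev, FrameBounds next)
{
    return next.head - prev.head;
}

/* Rows fed in at the tail when the current row advances ("to extend"). */
long
frame_extended_num(FrameBounds prev, FrameBounds next)
{
    return next.tail - prev.tail;
}
```

For example, with p = f = 1 in a 5-row partition, moving from row 1 (frame [0, 2]) to row 2 (frame [1, 3]) shrinks the frame by one row and extends it by one row, while with p = FRAME_UNBOUNDED the head stays at row 0, so the frame never shrinks.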

How it works

The sample table looks like this:

sample=# SELECT * FROM empsalary;
  depname  | empno | salary | enroll_date 
-----------+-------+--------+-------------
 develop   |    10 |   5200 | 2007-08-01
 sales     |     1 |   5000 | 2006-10-01
 personnel |     5 |   3500 | 2007-12-10
 sales     |     4 |   4800 | 2007-08-08
 sales     |     6 |   5500 | 2007-01-02
 personnel |     2 |   3900 | 2006-12-23
 develop   |     7 |   4200 | 2008-01-01
 develop   |     9 |   4500 | 2008-01-01
 sales     |     3 |   4800 | 2007-08-01
 develop   |     8 |   6000 | 2006-10-01
 develop   |    11 |   5200 | 2007-08-15
(11 rows)

Now let's run a windowed query.

sample=# SELECT 
sample-#   depname,
sample-#   empno,
sample-#   salary,
sample-#   sum(salary) OVER (PARTITION BY depname)
sample-# FROM 
sample-#   empsalary;
  depname  | empno | salary |  sum  
-----------+-------+--------+-------
 develop   |    10 |   5200 | 25100
 develop   |     7 |   4200 | 25100
 develop   |     9 |   4500 | 25100
 develop   |     8 |   6000 | 25100
 develop   |    11 |   5200 | 25100
 personnel |     2 |   3900 |  7400
 personnel |     5 |   3500 |  7400
 sales     |     3 |   4800 | 20100
 sales     |     1 |   5000 | 20100
 sales     |     4 |   4800 | 20100
 sales     |     6 |   5500 | 20100
(11 rows)

As you can see, the sum column holds the result of SUM() for each depname, while every input row is still returned individually instead of being collapsed as with GROUP BY.
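The per-partition result above can be modeled outside the executor. The following C sketch is an illustration only, not the patch's code: it assumes the input already arrives sorted by the partition key (as the Sort node below a Window node provides), buffers one partition at a time by index range, and writes the partition total to every row of that partition. The function name partition_sums() is hypothetical.

```c
#include <assert.h>
#include <string.h>

/*
 * Hypothetical sketch of sum(val) OVER (PARTITION BY key) over rows already
 * sorted by key.  Each partition is read completely (first pass), then its
 * total is emitted for every row of that partition (second pass).
 */
void
partition_sums(const char *const *keys, const long *vals, long n, long *out)
{
    long start = 0;

    assert(n >= 0);
    while (start < n)
    {
        long end = start;
        long total = 0;
        long i;

        /* rows belong to the same partition while their keys are peers */
        while (end < n && strcmp(keys[end], keys[start]) == 0)
            total += vals[end++];

        /* second pass over the buffered partition: same value for every row */
        for (i = start; i < end; i++)
            out[i] = total;

        start = end;
    }
}
```

Fed the empsalary rows sorted by depname, it produces 25100, 7400 and 20100 for the develop, personnel and sales rows respectively, matching the output above.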

A ranking window function works like this:

sample=# SELECT
sample-#   depname,
sample-#   empno,
sample-#   salary,
sample-#   rank() OVER (PARTITION BY depname ORDER BY salary)
sample-# FROM
sample-#   empsalary;
  depname  | empno | salary | rank 
-----------+-------+--------+------
 develop   |     7 |   4200 |    1
 develop   |     9 |   4500 |    2
 develop   |    10 |   5200 |    3
 develop   |    11 |   5200 |    3
 develop   |     8 |   6000 |    5
 personnel |     5 |   3500 |    1
 personnel |     2 |   3900 |    2
 sales     |     4 |   4800 |    1
 sales     |     3 |   4800 |    1
 sales     |     1 |   5000 |    3
 sales     |     6 |   5500 |    4
(11 rows)

The next example shows a window function combined with a GROUP BY clause.

sample=# SELECT
sample=#   y,
sample=#   m,
sample=#   SUM(SUM(people)) OVER (PARTITION BY y),
sample=#   AVG(people)
sample=# FROM(
sample=#   SELECT
sample=#     EXTRACT(YEAR FROM accident_date) AS y,
sample=#     EXTRACT(MONTH FROM accident_date) AS m,
sample=#     *
sample=#   FROM
sample=#     accident
sample=# )s
sample=# GROUP BY y, m
sample=# ORDER BY y, m;
  y   | m  | sum  |        avg         
------+----+------+--------------------
 2005 |  1 | 1698 | 3.5161290322580645
 2005 |  2 | 1698 | 4.8928571428571429
 2005 |  3 | 1698 | 4.3870967741935484
 2005 |  4 | 1698 | 4.7333333333333333
 2005 |  5 | 1698 | 5.0967741935483871
 2005 |  6 | 1698 | 5.2666666666666667
 2005 |  7 | 1698 | 4.8709677419354839
 2005 |  8 | 1698 | 4.7419354838709677
 2005 |  9 | 1698 | 4.8000000000000000
 2005 | 10 | 1698 | 4.8709677419354839
 2005 | 11 | 1698 | 4.1333333333333333
 2005 | 12 | 1698 | 4.5483870967741935
 2006 |  1 | 1740 | 4.3870967741935484
 2006 |  2 | 1740 | 4.5000000000000000
 2006 |  3 | 1740 | 4.8387096774193548
 2006 |  4 | 1740 | 5.0333333333333333
 2006 |  5 | 1740 | 4.4838709677419355
 2006 |  6 | 1740 | 4.1333333333333333
 2006 |  7 | 1740 | 5.1935483870967742
 2006 |  8 | 1740 | 4.7419354838709677
 2006 |  9 | 1740 | 3.8333333333333333
 2006 | 10 | 1740 | 6.2258064516129032
 2006 | 11 | 1740 | 4.4333333333333333
 2006 | 12 | 1740 | 5.3225806451612903
(24 rows)

Any expression can be used as a window function argument or in the PARTITION BY/ORDER BY clause, as long as it satisfies the same conditions that apply to expressions in a query with normal aggregates.

Next, the WINDOW clause:

sample=# SELECT depname, empno, salary, sum(salary) OVER w FROM empsalary WINDOW w AS (PARTITION BY depname);
  depname  | empno | salary |  sum  
-----------+-------+--------+-------
 develop   |    11 |   5200 | 25100
 develop   |     7 |   4200 | 25100
 develop   |     9 |   4500 | 25100
 develop   |     8 |   6000 | 25100
 develop   |    10 |   5200 | 25100
 personnel |     5 |   3500 |  7400
 personnel |     2 |   3900 |  7400
 sales     |     3 |   4800 | 14600
 sales     |     1 |   5000 | 14600
 sales     |     4 |   4800 | 14600
(10 rows)

Note that a window definition that is not referenced by any window function is ignored.

Now comes the basic cumulative aggregate.

sample=# SELECT sum(i) OVER (ORDER BY i) FROM generate_series(1, 10) i;
 sum
-----
  1
  3
  6
 10
 15
 21
 28
 36
 45
 55
(10 rows)

With an ORDER BY clause in the window specification, the frame is implicitly defined as ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, so sum() returns the total of the values from the first row of the partition up to the current row.
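Because this frame never shrinks and grows by exactly one row per current row, the cached aggregate state can be carried forward, so the transition function runs once per row. A minimal C sketch of that behavior, assuming a sum over long values; CumSumState, cumsum_trans() and cumulative_sum() are hypothetical names, not patch code:

```c
#include <assert.h>

/*
 * Hypothetical window aggregate over ROWS BETWEEN UNBOUNDED PRECEDING AND
 * CURRENT ROW.  The frame only extends, so the cached transition value is
 * reused and each row is fed to the transition function exactly once.
 */
typedef struct
{
    long transvalue;   /* running sum: the cached aggregate state */
    long ntrans;       /* how many times the transition function ran */
} CumSumState;

/* Transition step: add one row's value to the state. */
void
cumsum_trans(CumSumState *st, long value)
{
    st->transvalue += value;
    st->ntrans++;
}

void
cumulative_sum(const long *vals, long n, long *out, long *ntrans)
{
    CumSumState st = {0, 0};
    long i;

    assert(n >= 0 && ntrans != 0);
    for (i = 0; i < n; i++)
    {
        /* the frame extended by exactly one row: feed only that row */
        cumsum_trans(&st, vals[i]);
        out[i] = st.transvalue;   /* the final function is the identity for sum */
    }
    *ntrans = st.ntrans;
}
```

For 1 through 10 it produces the 1, 3, 6, ..., 55 shown above with exactly ten transition calls, rather than the 55 that re-aggregating every frame from its head would take.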

Changes from v06

Limitation & TODO

Added entries for pg_proc (oid)

All of the above are implemented in backend/utils/adt/wfunc.c and declared in include/utils/builtins.h.

Added files

Added nodes

primnode

parser

planner

executor

Special structs

These are all in nodeWindow.c

Added keywords in parsing (alphabetical order)

Note: ROWS is still non-reserved.

Plan

EXPLAIN
SELECT 
  sum(salary) OVER (PARTITION BY depname) AS dep_sum
  ,sum(salary) OVER (PARTITION BY extract(YEAR FROM enroll_date)) AS year_sum
  ,*
FROM 
  empsalary;
                                       QUERY PLAN                                        
-----------------------------------------------------------------------------------------
 Window  (cost=127.23..129.83 rows=1040 width=48)
   ->  Sort  (cost=127.23..129.83 rows=1040 width=48)
         Sort Key: (date_part('year'::text, (enroll_date)::timestamp without time zone))
         ->  Window  (cost=72.52..75.12 rows=1040 width=48)
               ->  Sort  (cost=72.52..75.12 rows=1040 width=48)
                     Sort Key: depname
                     ->  Seq Scan on empsalary  (cost=0.00..20.40 rows=1040 width=48)

This plan is inefficient, because a Window node is implicitly added, together with a Sort node, for each distinct window. Ideally, all of the windowing and sorting would be packed into a single Window node. In the current plan, the Sort node uses Tuplesort as you would expect, and the Window node then uses a Tuplestore to hold the tuples of each partition. This is probably the worst possible plan, and there should be ways to improve it.

The design of window function

The final design

The final window function design is described in nodeWindow.c:

* A window function is defined as a function marked as wfunc in pg_proc. This
* mark means that the function can handle the window function APIs, which allow
* it to access arbitrary rows within the window.
*
* The Window node can evaluate aggregate functions as well, treating them as a
* special case. The aggregated result is cached in a WindowStatePerAgg struct and
* is reused unless the frame has shrunk. If the frame has not shrunk but has
* extended, the unprocessed rows are passed to the trans function and the result
* is finalized again. If it has neither shrunk nor extended, the result is
* reused without additional trans/final function calls. If it has shrunk, the
* cached result cannot be used, so the node reinitializes the state and
* aggregates the values from the head of the frame again. This is not efficient,
* so an aggregate function can also be a window function, which can subtract the
* values of the rows leaving the frame before the next execution, to avoid
* processing the whole aggregate again. fcinfo->context will be a WindowState
* instead of an AggState if the aggregate function is called as a window
* function.

...

*
* Note that the core concept of window functions is that "they can access
* arbitrary rows within the frame as they want". As long as this rule is
* kept, any kind of optimization is allowed.

Points are:

This is not implemented yet, but with this design the FRAME clause can be supported and window aggregates optimized soon. Currently a window aggregate caches its result unless the frame shrinks; once the frame shrinks, the aggregate must be recalculated over all the tuples in the frame. If the frame does not shrink but extends, the aggregate continues calling the transition function, starting from the transition value returned by the previous call. To avoid the inefficiency in the shrinking case, aggregates can be defined as window functions. The pseudo code looks like this:

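typedef struct
{
	int64		result;		/* number of rows currently in the frame */
} count_context;

Datum
count_window(PG_FUNCTION_ARGS)
{
	WindowObject winobj = PG_WINDOW_OBJECT();
	count_context *context;

	/* allocate the per-function state, zeroed, on the first call */
	if (fcinfo->flinfo->fn_extra == NULL)
		fcinfo->flinfo->fn_extra =
			MemoryContextAllocZero(fcinfo->flinfo->fn_mcxt,
								   sizeof(count_context));
	context = (count_context *) fcinfo->flinfo->fn_extra;

	/*
	 * Add the rows that entered the frame and subtract the rows that left
	 * it, instead of recounting the whole frame after it shrinks.
	 */
	context->result += WinFrameExtendedNum(winobj);
	context->result -= WinFrameShrinkingNum(winobj);

	PG_RETURN_INT64(context->result);
}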
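The shrinking case can also be illustrated outside PostgreSQL. The C sketch below is not the patch's code: it assumes a frame of ROWS BETWEEN k PRECEDING AND CURRENT ROW (which v08 does not support yet) and compares adding entering rows and subtracting leaving rows against recomputing the whole frame for every row. moving_sum_incremental() and moving_sum_recompute() are hypothetical names.

```c
#include <assert.h>

/*
 * Hypothetical incremental window sum over ROWS BETWEEN k PRECEDING AND
 * CURRENT ROW: the row that enters the frame is added, and the rows that
 * leave it are subtracted, instead of restarting from the frame head.
 */
void
moving_sum_incremental(const long *vals, long n, long k, long *out)
{
    long sum = 0;
    long head = 0;       /* first row of the previous frame */
    long i;

    assert(n >= 0 && k >= 0);
    for (i = 0; i < n; i++)
    {
        long new_head = (i - k < 0) ? 0 : i - k;

        sum += vals[i];               /* frame extended by row i */
        while (head < new_head)       /* frame shrank: drop the leaving rows */
            sum -= vals[head++];
        out[i] = sum;
    }
}

/* Hypothetical reference version: re-aggregate the whole frame for every row. */
void
moving_sum_recompute(const long *vals, long n, long k, long *out)
{
    long i, j;

    for (i = 0; i < n; i++)
    {
        long head = (i - k < 0) ? 0 : i - k;

        out[i] = 0;
        for (j = head; j <= i; j++)
            out[i] += vals[j];
    }
}
```

Both give 1, 3, 6, 9, 12, 15 for the values 1 through 6 with k = 2, but the incremental version touches each row at most twice. Subtraction works for sum() and count(); aggregates without an inverse, such as max(), would still have to recompute when the frame shrinks.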

window function APIs

## nodeWindow.h ##
/* window function APIs */
#define PG_WINDOW_OBJECT(n) ((WindowObject) ((WindowState *) fcinfo->context)->winobj)
#define PG_WINDOW_ARG(n) ((ExprState *) (DatumGetPointer(fcinfo->arg[n])))

#define WINDOW_SEEK_CURRENT 0
#define WINDOW_SEEK_HEAD 1
#define WINDOW_SEEK_TAIL 2

/*
 * private in nodeWindow.c but exposed as window function APIs
 */
typedef struct WindowObjectData *WindowObject;
typedef struct WindowIterData *WindowIter;

extern int64 WinRowCurrentPos(WindowObject winobj);
extern Datum WinRowGetArg(WindowObject winobj, ExprState *argstate, bool *isnull);
extern bool WinRowGetTuple(WindowObject winobj, TupleTableSlot *slot);

extern bool WinFrameShrinked(WindowObject winobj);
extern bool WinFrameExtended(WindowObject winobj);
extern int64 WinFrameGetRowNum(WindowObject winobj);
extern Datum WinFrameGetArg(WindowObject winobj, ExprState *argstate, 
			int relpos, int seektype, bool *isnull, bool *isout);
extern bool WinFrameGetTuple(WindowObject winobj, TupleTableSlot *slot,
			int relpos, int seektype, bool *isout);
extern int WinFrameShrinkingNum(WindowObject winobj);
extern int WinFrameExtendedNum(WindowObject winobj);

extern int64 WinPartGetRowNum(WindowObject winobj);
extern Datum WinPartGetArg(WindowObject winobj, ExprState *argstate,
			int relpos, int seektype, bool *isnull, bool *isout);
extern bool WinPartGetTuple(WindowObject winobj, TupleTableSlot *slot,
			int relpos, int seektype, bool *isout);

extern WindowIter WinFrameStartIter(WindowObject winobj, int pos);
extern WindowIter WinPartStartIter(WindowObject winobj, int pos);
extern bool WinIterNext(WindowIter iter);
extern Datum WinIterGetArg(WindowIter iter, ExprState *argstate, bool *isnull);
extern bool WinIterGetTuple(WindowIter iter, TupleTableSlot *slot);
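The relpos/seektype pair is not explained beyond the declarations above. One plausible reading, assumed here and not confirmed by the patch, is that the target row is the seek anchor (current row, first row, or last row) plus relpos, with *isout reporting a target outside the partition. The self-contained C model below uses the hypothetical name part_get_arg() over an array standing in for the buffered partition, with the WINDOW_SEEK_* values copied from nodeWindow.h.

```c
#include <assert.h>
#include <stdbool.h>

/* Same values as the WINDOW_SEEK_* macros in nodeWindow.h above. */
#define WINDOW_SEEK_CURRENT 0
#define WINDOW_SEEK_HEAD 1
#define WINDOW_SEEK_TAIL 2

/*
 * Hypothetical model of WinPartGetArg()-style addressing over a buffered
 * partition vals[0 .. nrows-1] (assumed semantics, not taken from the patch):
 * target row = anchor chosen by seektype + relpos.  A target outside the
 * partition sets *isout and yields a null, as lead()/lag() need at the edges.
 */
long
part_get_arg(const long *vals, long nrows, long current,
             int relpos, int seektype, bool *isnull, bool *isout)
{
    long anchor;
    long target;

    assert(nrows > 0 && current >= 0 && current < nrows);
    switch (seektype)
    {
        case WINDOW_SEEK_HEAD:
            anchor = 0;            /* first row of the partition */
            break;
        case WINDOW_SEEK_TAIL:
            anchor = nrows - 1;    /* last row of the partition */
            break;
        default:
            assert(seektype == WINDOW_SEEK_CURRENT);
            anchor = current;      /* the current row */
            break;
    }
    target = anchor + relpos;
    *isout = (target < 0 || target >= nrows);
    *isnull = *isout;
    return *isout ? 0 : vals[target];
}
```

Under this assumption, lead(x, 1) is relpos = 1 from WINDOW_SEEK_CURRENT and lag(x, 1) is relpos = -1; lag() on the first row sets *isout, so the function would return NULL there. The same addressing applied to the frame instead of the partition would give nth_value(x, n) as relpos = n - 1 from WINDOW_SEEK_HEAD.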

the Buffering Strategy

## plannodes.h ##
/* ----------------
 *		window buffering strategy ID
 * ----------------
 */
#define WINDOW_BUFFER_ROW		1	/* no buffering, only CURRENT ROW */
#define WINDOW_BUFFER_FRAME		2	/* including ROWS/RANGE */
#define WINDOW_BUFFER_PARTITION	4	/* all inclusive */

window function classification and the Buffering Strategies

- row_number() -- row buffering
 Obviously no buffering is needed; all you need is the current position in the partition.

- rank() -- row buffering
 You need two rows, the previous row and the current row, to determine
whether the rank goes up. But the function can cache the previous row,
so only row buffering is needed.

- dense_rank() -- row buffering
 Same as rank().

- percent_rank() -- partition buffering
 In addition to the rank() requirement, you have to see the last row in
the partition to count how many rows it contains. The spec says: defined
as (RK-1)/(NR-1), where RK is defined to be the RANK of R and NR is
defined to be the number of rows in the window partition of R.

- cume_dist() -- partition buffering
 Almost the same as percent_rank(); partition buffering.

- ntile() -- partition buffering
 You have to know how many rows are in the partition to determine how
many rows go into each bucket.

- lead() -- partition buffering
 The spec says: returns the value of VE evaluated on a row that is
OFFSET number of rows after R within P (partition).

- lag() -- partition buffering
 Same as lead().

- first_value() -- frame buffering
 The spec says: return the value of VE evaluated on the first row of
the window frame.

- last_value() -- frame buffering
 Same as first_value().

- nth_value() -- frame buffering
 Same as first_value().

- aggregates -- frame buffering
 Aggregates all rows in the frame.

So I propose three Window node buffering strategies,
row/frame/partition buffering, so as to avoid unnecessary row
buffering. Each window function has a buffering strategy number, and
the planner detects the minimum strategy needed for the execution. If
the node contains only row_number() then it needs row buffering, but if
it contains row_number() and lead() then partition buffering is needed.
Until the window function APIs are made public, the strategy numbers
for each function can be defined in the source code as macros or
something, and when they are public we must have pg_wfunc or something
to register the strategy. If a function violates its declared strategy
by touching a different API, for example if row_number() is about to
call window_paritition_rows(), an error will occur.

Quoted from http://archives.postgresql.org/pgsql-hackers/2008-10/msg00892.php
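The three functions in the list above that need partition buffering can be sketched as below. This is an illustrative C model, not the patch's implementation: it assumes the whole partition is already buffered and its single long ORDER BY keys are sorted, and the helper names are hypothetical. The ntile() distribution (bucket sizes differ by at most one, larger buckets first) follows the usual SQL behavior.

```c
#include <assert.h>

/*
 * Hypothetical partition-buffered functions over one partition whose sorted
 * ORDER BY keys are keys[0 .. nrows-1].  Each result depends on nrows (NR).
 */

/* rank() of row i: 1 + the position of the first peer of row i. */
static long
rank_at(const long *keys, long i)
{
    while (i > 0 && keys[i - 1] == keys[i])
        i--;
    return i + 1;
}

/* percent_rank = (RK - 1) / (NR - 1), taken as 0 for a one-row partition. */
double
percent_rank_at(const long *keys, long nrows, long i)
{
    assert(i >= 0 && i < nrows);
    if (nrows == 1)
        return 0.0;
    return (double) (rank_at(keys, i) - 1) / (double) (nrows - 1);
}

/* cume_dist = (rows preceding row i or peer with it) / NR */
double
cume_dist_at(const long *keys, long nrows, long i)
{
    long last = i;

    assert(i >= 0 && i < nrows);
    while (last + 1 < nrows && keys[last + 1] == keys[i])
        last++;
    return (double) (last + 1) / (double) nrows;
}

/* ntile(nbuckets): 1-based bucket of row i when NR rows are split into
 * nbuckets groups whose sizes differ by at most one, larger groups first. */
long
ntile_at(long nrows, long nbuckets, long i)
{
    long base, extra, big_rows;

    assert(nbuckets > 0 && i >= 0 && i < nrows);
    base = nrows / nbuckets;        /* minimum rows per bucket */
    extra = nrows % nbuckets;       /* buckets that get one extra row */
    big_rows = extra * (base + 1);  /* rows covered by the larger buckets */
    if (i < big_rows)
        return i / (base + 1) + 1;
    return extra + (i - big_rows) / base + 1;   /* base > 0 here */
}
```

For the develop salaries 4200, 4500, 5200, 5200, 6000, percent_rank gives 0, 0.25, 0.5, 0.5, 1, and ntile(3) gives buckets 1, 1, 2, 2, 3. Each result depends on NR, which is why none of them can be returned before the last row of the partition has been read.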

As described above, the Buffering Strategies avoid storing rows that none of the functions in the node require; in other words, rows are stored only as needed. If the node contains only row_number() and rank(), the Window node stores no rows other than the current row (Row Buffering); if it contains lead(), the node stores all the rows of the partition (Partition Buffering), even if row_number() is also present.

Note: Currently only the Partition and Row strategies are supported; when Frame Buffering is needed, Partition Buffering is used instead. This is because the only supported frames are "ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING" and "ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW". The former is exactly the same as Partition Buffering, and for the latter a dedicated Frame Buffering would add complexity for little gain in efficiency.
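The planner-side choice described above can be sketched as a small C function. The per-function strategies follow the classification list; the lookup table and the names strategy_of() and choose_buffer_strategy() are hypothetical stand-ins for the macros or catalog entry the proposal mentions, and any name not in the table is assumed to be an aggregate (frame buffering). Since the IDs grow with the amount buffered, the required strategy is taken as the maximum, with Frame mapped to Partition as in v08.

```c
#include <assert.h>
#include <string.h>

#define WINDOW_BUFFER_ROW       1   /* no buffering, only CURRENT ROW */
#define WINDOW_BUFFER_FRAME     2   /* including ROWS/RANGE */
#define WINDOW_BUFFER_PARTITION 4   /* all inclusive */

/* Hypothetical table of declared strategies, following the classification. */
static const struct
{
    const char *name;
    int         strategy;
} wfunc_strategies[] = {
    {"row_number", WINDOW_BUFFER_ROW},
    {"rank", WINDOW_BUFFER_ROW},
    {"dense_rank", WINDOW_BUFFER_ROW},
    {"percent_rank", WINDOW_BUFFER_PARTITION},
    {"cume_dist", WINDOW_BUFFER_PARTITION},
    {"ntile", WINDOW_BUFFER_PARTITION},
    {"lead", WINDOW_BUFFER_PARTITION},
    {"lag", WINDOW_BUFFER_PARTITION},
    {"first_value", WINDOW_BUFFER_FRAME},
    {"last_value", WINDOW_BUFFER_FRAME},
    {"nth_value", WINDOW_BUFFER_FRAME},
};

/* Names not in the table are assumed to be aggregates: frame buffering. */
static int
strategy_of(const char *fname)
{
    size_t i;

    for (i = 0; i < sizeof(wfunc_strategies) / sizeof(wfunc_strategies[0]); i++)
        if (strcmp(wfunc_strategies[i].name, fname) == 0)
            return wfunc_strategies[i].strategy;
    return WINDOW_BUFFER_FRAME;
}

/* Hypothetical planner step: the widest strategy any function needs. */
int
choose_buffer_strategy(const char *const *fnames, int nfuncs)
{
    int needed = WINDOW_BUFFER_ROW;
    int i;

    assert(nfuncs > 0);
    for (i = 0; i < nfuncs; i++)
    {
        int s = strategy_of(fnames[i]);

        if (s > needed)
            needed = s;
    }
    /* v08 has no separate Frame Buffering, so fall back to Partition */
    if (needed == WINDOW_BUFFER_FRAME)
        needed = WINDOW_BUFFER_PARTITION;
    return needed;
}
```

With row_number() and rank() only, this returns WINDOW_BUFFER_ROW; adding lead(), or using sum() alone, yields WINDOW_BUFFER_PARTITION.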

Former discussions about window function design

The ideas below are *former* proposals for how window functions could be structured. Their pros and cons have been taken into account in the current design.

to extend the current aggregate, proposed by Simon Riggs

CREATE AGGREGATE window_func() 
(
	sfunc = ...,
	stype = ...,
	wfunc = ...,
	initcond = 
)

For each row we would execute the transition function (sfunc) then, if
there is a window function (wfunc) then we call that to return a value
for this tuple (so in that case we execute two functions per tuple in
the window). If wfunc is not set then we return the transition datatype
itself.
http://archives.postgresql.org/pgsql-hackers/2008-07/msg00236.php

Objection: A window aggregate is the same as a group aggregate. Also, some window functions need a full scan of the rows *before* returning values.

to define them as non-user-defined functions, proposed by Simon Riggs

So that would mean we don't provide a mechanism for user-defined
windowed aggregate functions at all. Which solves the discussion about
how to pass generic info through to them (at least long enough to get
the first implementation done).
http://archives.postgresql.org/pgsql-hackers/2008-07/msg00239.php

Objection: As mentioned, hiding the function definitions from external users makes the implementation easier. However, this is odd because other function types are extensible, and the SQL spec may add more functions later. Some unification seems necessary.

to introduce Window Object and pass it to the functions, proposed by Hitoshi Harada

Just idea, how about pass window object to a function? We'll provide
window operation API then in the function you take window object
through fcinfo:

Datum func(PG_FUNCTION_ARGS)
{
  Datum v;
  WindowObject w = get_window(fcinfo);
  HeapTuple htup_current = window_current_row(w);
  HeapTuple htup_prev = window_preceding(w, 1);
  /* do something */
  PG_RETURN_DATUM(v);
}
http://archives.postgresql.org/pgsql-hackers/2008-07/msg00254.php

Objection: Performance must be considered; some optimization mechanism is required.

discussion after v04

More valuable discussion about the design starts here: http://archives.postgresql.org/pgsql-hackers/2008-09/msg00021.php

1. Implement Window node, with the capability to invoke an aggregate function, using the above API. Implement required parser/planner changes. Implement a few simple ranking aggregates using the API.
2. Implement glue to call normal aggregates through the new API
3. Implement the optimization to drop rows that are no longer needed (signal_cutoff can be a no-op until this phase)
4. Implement window framing (the frame can always be all rows in the partition, or all rows until the current row, until this phase)
5. Expose the new API to user-defined aggregates. It can be an internal API only used by built-in functions until this phase

I believe you already have phase 1 in your patch, except for the API changes

It seems that we must add something like a Window object mechanism that represents a window frame, to describe the logical window. At the same time, care is needed not to hurt performance.

Things to be discussed

Reference

This document is current as of 2008/10/31 and was written by Hitoshi Harada (umi.tanuki@gmail.com).