From a7d4e096328442ebe6c423c55cbb21e1add2ff61 Mon Sep 17 00:00:00 2001 From: Jeroen van Straten Date: Wed, 22 Jun 2022 19:39:29 +0200 Subject: [PATCH] fix: specify how function arguments are to be bound BREAKING CHANGE: function argument bindings were open to interpretation before, and were often produced incorrectly; therefore, this change semantically shifts some responsibilities from the consumers to the producers. --- proto/substrait/algebra.proto | 187 +++++++++++++++++-- site/docs/expressions/aggregate_functions.md | 12 +- 2 files changed, 178 insertions(+), 21 deletions(-) diff --git a/proto/substrait/algebra.proto b/proto/substrait/algebra.proto index fb07ba02f..1c9f87359 100644 --- a/proto/substrait/algebra.proto +++ b/proto/substrait/algebra.proto @@ -478,47 +478,146 @@ message Expression { } } + // A scalar function call. message ScalarFunction { - // points to a function_anchor defined in this plan + // Points to a function_anchor defined in this plan, which must refer + // to a scalar function in the associated YAML file. Required; avoid + // using anchor/reference zero. uint32 function_reference = 1; + + // The arguments to be bound to the function. This must have exactly the + // number of arguments specified in the function definition, and the + // argument types must also match exactly: + // + // - Value arguments must be bound using FunctionArgument.value, and + // the expression in that must yield a value of a type that a function + // overload is defined for. + // - Type arguments must be bound using FunctionArgument.type. + // - Required enum arguments must be bound using FunctionArgument.enum + // followed by Enum.specified, with a string that case-insensitively + // matches one of the allowed options. + // - Optional enum arguments must be bound using FunctionArgument.enum + // followed by either Enum.specified or Enum.unspecified. If specified, + // the string must case-insensitively match one of the allowed options. repeated FunctionArgument arguments = 4; + + // Must be set to the return type of the function, exactly as derived + // using the declaration in the extension. Type output_type = 3; - // deprecated; use args instead + // Deprecated; use arguments instead. repeated Expression args = 2 [deprecated = true]; } + // A window function call. message WindowFunction { - // points to a function_anchor defined in this plan + // Points to a function_anchor defined in this plan, which must refer + // to a window function in the associated YAML file. Required; 0 is + // considered to be a valid anchor/reference. uint32 function_reference = 1; - repeated Expression partitions = 2; + + // The arguments to be bound to the function. This must have exactly the + // number of arguments specified in the function definition, and the + // argument types must also match exactly: + // + // - Value arguments must be bound using FunctionArgument.value, and + // the expression in that must yield a value of a type that a function + // overload is defined for. + // - Type arguments must be bound using FunctionArgument.type, and a + // function overload must be defined for that type. + // - Required enum arguments must be bound using FunctionArgument.enum + // followed by Enum.specified, with a string that case-insensitively + // matches one of the allowed options. + // - Optional enum arguments must be bound using FunctionArgument.enum + // followed by either Enum.specified or Enum.unspecified. If specified, + // the string must case-insensitively match one of the allowed options. + repeated FunctionArgument arguments = 9; + + // Must be set to the return type of the function, exactly as derived + // using the declaration in the extension. + Type output_type = 7; + + // Describes which part of the window function to perform within the + // context of distributed algorithms. Required. Must be set to + // INITIAL_TO_RESULT for window functions that are not decomposable. + AggregationPhase phase = 6; + + // If specified, the records that are part of the window defined by + // upper_bound and lower_bound are ordered according to this list + // before they are aggregated. The first sort field has the highest + // priority; only if a sort field determines two records to be equivalent + // is the next field queried. This field is optional, and is only allowed + // if the window function is defined to support sorting. repeated SortField sorts = 3; - Bound upper_bound = 4; + + // Specifies whether equivalent records are merged before being aggregated. + // Optional, defaults to AGGREGATION_INVOCATION_ALL. + AggregateFunction.AggregationInvocation invocation = 10; + + // When one or more partition expressions are specified, two records are + // considered to be in the same partition if and only if these expressions + // yield an equal tuple of values for both. When computing the window + // function, only the subset of records within the bounds that are also in + // the same partition as the current record are aggregated. + repeated Expression partitions = 2; + + // Defines the record relative to the current record from which the window + // extends. The bound is inclusive. If the lower bound indexes a record + // greater than the upper bound, TODO (null range/no records passed? + // wrapping around as if lower/upper were swapped? error? null?). + // Optional; defaults to the start of the partition. Bound lower_bound = 5; - AggregationPhase phase = 6; - Type output_type = 7; - repeated FunctionArgument arguments = 9; - // deprecated; use args instead + // Defines the record relative to the current record up to which the window + // extends. The bound is inclusive. If the upper bound indexes a record + // less than the lower bound, TODO (null range/no records passed? + // wrapping around as if lower/upper were swapped? error? null?). + // Optional; defaults to the end of the partition. + Bound upper_bound = 4; + + // Deprecated; use arguments instead. repeated Expression args = 8 [deprecated = true]; + // Defines one of the two boundaries for the window of a window function. message Bound { + // Defines that the bound extends this far back from the current record. message Preceding { + // A strictly positive integer specifying the number of records that + // the window extends back from the current record. Required. Use + // CurrentRow for offset zero and Following for negative offsets. int64 offset = 1; } + // Defines that the bound extends this far ahead of the current record. message Following { + // A strictly positive integer specifying the number of records that + // the window extends ahead of the current record. Required. Use + // CurrentRow for offset zero and Preceding for negative offsets. int64 offset = 1; } + // Defines that the bound extends to or from the current record. message CurrentRow {} + // Defines an "unbounded bound": for lower bounds this means the start + // of the partition, and for upper bounds this means the end of the + // partition. message Unbounded {} oneof kind { + // The bound extends some number of records behind the current record. Preceding preceding = 1; + + // The bound extends some number of records ahead of the current + // record. Following following = 2; + + // The bound extends to the current record. CurrentRow current_row = 3; + + // The bound extends to the start of the partition or the end of the + // partition, depending on whether this represents the upper or lower + // bound. Unbounded unbounded = 4; } } @@ -826,33 +925,91 @@ message SortField { } } +// Describes which part of an aggregation or window function to perform within +// the context of distributed algorithms. enum AggregationPhase { + // Implies `INTERMEDIATE_TO_RESULT`. AGGREGATION_PHASE_UNSPECIFIED = 0; + + // Specifies that the function should be run only up to the point of + // generating an intermediate value, to be further aggregated later using + // INTERMEDIATE_TO_INTERMEDIATE or INTERMEDIATE_TO_RESULT. AGGREGATION_PHASE_INITIAL_TO_INTERMEDIATE = 1; + + // Specifies that the inputs of the aggregate or window function are the + // intermediate values of the function, and that the output should also be + // an intermediate value, to be further aggregated later using + // INTERMEDIATE_TO_INTERMEDIATE or INTERMEDIATE_TO_RESULT. AGGREGATION_PHASE_INTERMEDIATE_TO_INTERMEDIATE = 2; + + // A complete invocation: the function should aggregate the given set of + // inputs to yield a single return value. This style must be used for + // aggregate or window functions that are not decomposable. AGGREGATION_PHASE_INITIAL_TO_RESULT = 3; + + // Specifies that the inputs of the aggregate or window function are the + // intermediate values of the function, generated previously using + // INITIAL_TO_INTERMEDIATE and possibly INTERMEDIATE_TO_INTERMEDIATE calls. + // This call should combine the intermediate values to yield the final + // return value. AGGREGATION_PHASE_INTERMEDIATE_TO_RESULT = 4; } +// An aggregate function. message AggregateFunction { - // points to a function_anchor defined in this plan + // Points to a function_anchor defined in this plan, which must refer + // to an aggregate function in the associated YAML file. Required; 0 is + // considered to be a valid anchor/reference. uint32 function_reference = 1; + + // The arguments to be bound to the function. This must have exactly the + // number of arguments specified in the function definition, and the + // argument types must also match exactly: + // + // - Value arguments must be bound using FunctionArgument.value, and + // the expression in that must yield a value of a type that a function + // overload is defined for. + // - Type arguments must be bound using FunctionArgument.type, and a + // function overload must be defined for that type. + // - Required enum arguments must be bound using FunctionArgument.enum + // followed by Enum.specified, with a string that case-insensitively + // matches one of the allowed options. + // - Optional enum arguments must be bound using FunctionArgument.enum + // followed by either Enum.specified or Enum.unspecified. If specified, + // the string must case-insensitively match one of the allowed options. repeated FunctionArgument arguments = 7; - repeated SortField sorts = 3; - AggregationPhase phase = 4; + + // Must be set to the return type of the function, exactly as derived + // using the declaration in the extension. Type output_type = 5; + + // Describes which part of the aggregation to perform within the context of + // distributed algorithms. Required. Must be set to INITIAL_TO_RESULT for + // aggregate functions that are not decomposable. + AggregationPhase phase = 4; + + // If specified, the aggregated records are ordered according to this list + // before they are aggregated. The first sort field has the highest + // priority; only if a sort field determines two records to be equivalent is + // the next field queried. This field is optional. + repeated SortField sorts = 3; + + // Specifies whether equivalent records are merged before being aggregated. + // Optional, defaults to AGGREGATION_INVOCATION_ALL. AggregationInvocation invocation = 6; - // deprecated; use args instead + // deprecated; use arguments instead repeated Expression args = 2 [deprecated = true]; + // Method in which equivalent records are merged before being aggregated. enum AggregationInvocation { + // This default value implies AGGREGATION_INVOCATION_ALL. AGGREGATION_INVOCATION_UNSPECIFIED = 0; - // Use all values in aggregation calculation + // Use all values in the aggregation calculation. AGGREGATION_INVOCATION_ALL = 1; - // Use only distinct values in aggregation calculation + // Use only distinct values in the aggregation calculation. AGGREGATION_INVOCATION_DISTINCT = 2; } } diff --git a/site/docs/expressions/aggregate_functions.md b/site/docs/expressions/aggregate_functions.md index 156895b6c..e40b61a9a 100644 --- a/site/docs/expressions/aggregate_functions.md +++ b/site/docs/expressions/aggregate_functions.md @@ -4,11 +4,11 @@ Aggregate functions are functions that define an operation which consumes values Aggregate function signatures contain all the properties defined for [scalar functions](scalar_functions.md). Additionally, they contain the properties below: -| Property | Description | Required | -| ------------------------ | ------------------------------------------------------------- | ------------------------------- | -| Inherits | All properties defined for scalar function. | N/A | -| Ordered | Whether this aggregation function should allow user ordering. | Optional, defaults to false | -| Maximum set size | Maximum allowed set size as an unsigned integer. | Optional, defaults to unlimited | +| Property | Description | Required | +| ------------------------ | --------------------------------------------------------------- | ------------------------------- | +| Inherits | All properties defined for scalar function. | N/A | +| Ordered | Whether the result of this function is sensitive to sort order. | Optional, defaults to false | +| Maximum set size | Maximum allowed set size as an unsigned integer. | Optional, defaults to unlimited | | Decomposable | Whether the function can be executed in one or more intermediate steps. Valid options are: `NONE`, `ONE`, `MANY`, describing how intermediate steps can be taken. | Optional, defaults to `NONE` | | Intermediate Output Type | If the function is decomposable, represents the intermediate output type that is used, if the function is defined as either `ONE` or `MANY` decomposable. Will be a struct in many cases. | Required for `ONE` and `MANY`. | | Invocation | Whether the function uses all or only distinct values in the aggregation calculation. Valid options are: `ALL`, `DISTINCT`. | Optional, defaults to `ALL` | @@ -22,5 +22,5 @@ When binding an aggregate function, the binding must include the following addit | Property | Description | | -------- | ------------------------------------------------------------ | | Phase | Describes the input type of the data: [INITIAL_TO_INTERMEDIATE, INTERMEDIATE_TO_INTERMEDIATE, INITIAL_TO_RESULT, INTERMEDIATE_TO_RESULT] describing what portion of the operation is required. For functions that are NOT decomposable, the only valid option will be INITIAL_TO_RESULT. | -| Ordering | One or more ordering keys along with key order (ASC\|DESC\|NULL FIRST, etc.), declared similar to the sort keys in an `ORDER BY` relational operation. Only allowed in cases where the function signature supports ordering. | +| Ordering | Zero or more ordering keys along with key order (ASC\|DESC\|NULL FIRST, etc.), declared similar to the sort keys in an `ORDER BY` relational operation. If no sorts are specified, the records are not sorted prior to being passed to the aggregate function. |