Update the aggregate API to take a separate group_by #9027

Merged: 14 commits, Feb 13, 2024
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -615,6 +615,7 @@
- [Allow removing rows using a Filter_Condition.][8861]
- [Added `Table.to_xml`.][8979]
- [Implemented Write support for `S3_File`.][8921]
- [Separate `Group_By` from `columns` into new argument on `aggregate`.][9027]

[debug-shortcuts]:
https://github.com/enso-org/enso/blob/develop/app/gui/docs/product/shortcuts.md#debug
@@ -886,6 +887,7 @@
[8861]: https://github.com/enso-org/enso/pull/8861
[8979]: https://github.com/enso-org/enso/pull/8979
[8921]: https://github.com/enso-org/enso/pull/8921
[9027]: https://github.com/enso-org/enso/pull/9027

#### Enso Compiler

@@ -0,0 +1,13 @@
import project.Data.Text.Text

## A warning that an API is deprecated.
type Deprecated
## PRIVATE
Warning type_name:Text method_name:Text message:Text=""

## PRIVATE

Pretty prints the Deprecated warning.
to_display_text : Text
to_display_text self =
if self.message.is_empty then ("Deprecated: " + self.type_name + "." + self.method_name + " is deprecated.") else self.message
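This warning type is consumed via `Warning.attach`, as the updated `aggregate` method further down does when it encounters an old-style `Group_By`. A minimal sketch of the pattern, with illustrative type and method names:

    # Illustrative names; `result` stands for whatever value the warning annotates.
    warning = Deprecated.Warning "My_Module.My_Type" "old_method" "Deprecated: use `new_method` instead."
    Warning.attach warning result

If `message` is omitted, `to_display_text` falls back to the generated "Deprecated: Type.method is deprecated." text.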
131 changes: 73 additions & 58 deletions distribution/lib/Standard/Database/0.0.0-dev/src/Data/Table.enso
@@ -6,6 +6,7 @@ import Standard.Base.Errors.Common.Additional_Warnings
import Standard.Base.Errors.Common.Incomparable_Values
import Standard.Base.Errors.Common.Index_Out_Of_Bounds
import Standard.Base.Errors.Common.Type_Error
import Standard.Base.Errors.Deprecated.Deprecated
import Standard.Base.Errors.File_Error.File_Error
import Standard.Base.Errors.Illegal_Argument.Illegal_Argument
import Standard.Base.Errors.Illegal_State.Illegal_State
@@ -1720,10 +1721,14 @@ type Table
GROUP Standard.Base.Calculations
ICON sigma

Aggregates the rows in a table using any `Group_By` entries in columns.
The columns argument specifies which additional aggregations to perform and to return.
Aggregates the rows in a table using `group_by` columns.
The columns argument specifies which additional aggregations to perform
and to return.

Arguments:
- group_by: Vector of column identifiers to group by. These will be
included at the start of the resulting table. If no columns are
specified, a single row will be returned with the aggregate columns.
- columns: Vector of `Aggregate_Column` specifying the aggregated table.
Expressions can be used within the aggregate column to perform more
complicated calculations.
@@ -1745,15 +1750,15 @@ type Table
- If a column index is out of range, a `Missing_Input_Columns` is
reported according to the `on_problems` setting, unless
`error_on_missing_columns` is set to `True`, in which case it is
raised as an error. Problems resolving `Group_By` columns are
raised as an error. Problems resolving `group_by` columns are
reported as dataflow errors regardless of these settings, as a
missing grouping will completely change the semantics of the query.
- If a column selector is given as a `Text` and it does not match any
columns in the input table nor is it a valid expression, an
`Invalid_Aggregate_Column` error is raised according to the
`on_problems` settings (unless `error_on_missing_columns` is set to
`True` in which case it will always be an error). Problems resolving
`Group_By` columns are reported as dataflow errors regardless of
`group_by` columns are reported as dataflow errors regardless of
these settings, as a missing grouping will completely change the
semantics of the query.
- If an aggregation fails, an `Invalid_Aggregation` dataflow error is
@@ -1771,63 +1776,73 @@
- If there are more than 10 issues with a single column,
an `Additional_Warnings`.

> Example
Count all the rows

table.aggregate columns=[Aggregate_Column.Count]

> Example
Group by the Key column, count the rows

table.aggregate [Aggregate_Column.Group_By "Key", Aggregate_Column.Count]
table.aggregate ["Key"] [Aggregate_Column.Count]
@group_by Widget_Helpers.make_column_name_vector_selector
@columns Widget_Helpers.make_aggregate_column_vector_selector
aggregate : Vector Aggregate_Column -> Boolean -> Problem_Behavior -> Table ! No_Output_Columns | Invalid_Aggregate_Column | Invalid_Column_Names | Duplicate_Output_Column_Names | Floating_Point_Equality | Invalid_Aggregation | Unquoted_Delimiter | Additional_Warnings
aggregate self columns (error_on_missing_columns=False) (on_problems=Report_Warning) =
validated = Aggregate_Column_Helper.prepare_aggregate_columns self.column_naming_helper columns self error_on_missing_columns=error_on_missing_columns
key_columns = validated.key_columns
key_problems = key_columns.flat_map internal_column->
column = self.make_column internal_column
case column.value_type.is_floating_point of
True -> [Floating_Point_Equality.Error column.name]
False -> []
on_problems.attach_problems_before validated.problems+key_problems <|
resolved_aggregates = validated.valid_columns
key_expressions = key_columns.map .expression
new_ctx = self.context.set_groups key_expressions
problem_builder = Problem_Builder.new
## TODO [RW] here we will perform as many fetches as there are
aggregate columns, but technically we could perform just one
fetch fetching all column types - TODO we should do that. We can
do it here by creating a builder that will gather all requests
from the executed callbacks and create Lazy references that all
point to a single query.
See #6118.
infer_from_database_callback expression =
SQL_Type_Reference.new self.connection self.context expression
dialect = self.connection.dialect
type_mapping = dialect.get_type_mapping
infer_return_type op_kind columns expression =
type_mapping.infer_return_type infer_from_database_callback op_kind columns expression
results = resolved_aggregates.map p->
agg = p.second
new_name = p.first
result = Aggregate_Helper.make_aggregate_column self agg new_name dialect infer_return_type problem_builder
## If the `result` did contain an error, we catch it to be
able to store it in a vector and then we will partition the
created columns and failures.
result.catch Any error->
DB_Wrapped_Error.Value error

partitioned = results.partition (_.is_a DB_Wrapped_Error)

## When working on join we may encounter further issues with having
aggregate columns exposed directly, it may be useful to re-use
the `lift_aggregate` method to push the aggregates into a
subquery.
new_columns = partitioned.second
problem_builder.attach_problems_before on_problems <|
problems = partitioned.first.map .value
on_problems.attach_problems_before problems <|
handle_no_output_columns =
first_problem = if problems.is_empty then Nothing else problems.first
Error.throw (No_Output_Columns.Error first_problem)
if new_columns.is_empty then handle_no_output_columns else
self.updated_context_and_columns new_ctx new_columns subquery=True
aggregate : Vector (Integer | Text | Regex | Aggregate_Column) | Text | Integer | Regex -> Vector Aggregate_Column -> Boolean -> Problem_Behavior -> Table ! No_Output_Columns | Invalid_Aggregate_Column | Invalid_Column_Names | Duplicate_Output_Column_Names | Floating_Point_Equality | Invalid_Aggregation | Unquoted_Delimiter | Additional_Warnings
aggregate self group_by=[] columns=[] (error_on_missing_columns=False) (on_problems=Report_Warning) =
normalized_group_by = Vector.unify_vector_or_element group_by
if normalized_group_by.is_empty && columns.is_empty then Error.throw (No_Output_Columns.Error "At least one column must be specified.") else
validated = Aggregate_Column_Helper.prepare_aggregate_columns self.column_naming_helper normalized_group_by columns self error_on_missing_columns=error_on_missing_columns

key_columns = validated.key_columns
key_problems = key_columns.flat_map internal_column->
column = self.make_column internal_column
case column.value_type.is_floating_point of
True -> [Floating_Point_Equality.Error column.name]
False -> []
on_problems.attach_problems_before validated.problems+key_problems <|
resolved_aggregates = validated.valid_columns
key_expressions = key_columns.map .expression
new_ctx = self.context.set_groups key_expressions
problem_builder = Problem_Builder.new
## TODO [RW] here we will perform as many fetches as there are
aggregate columns, but technically we could perform just one
fetch fetching all column types - TODO we should do that. We can
do it here by creating a builder that will gather all requests
from the executed callbacks and create Lazy references that all
point to a single query.
See #6118.
infer_from_database_callback expression =
SQL_Type_Reference.new self.connection self.context expression
dialect = self.connection.dialect
type_mapping = dialect.get_type_mapping
infer_return_type op_kind columns expression =
type_mapping.infer_return_type infer_from_database_callback op_kind columns expression
results = resolved_aggregates.map p->
agg = p.second
new_name = p.first
result = Aggregate_Helper.make_aggregate_column self agg new_name dialect infer_return_type problem_builder
## If the `result` did contain an error, we catch it to be
able to store it in a vector and then we will partition the
created columns and failures.
result.catch Any error->(DB_Wrapped_Error.Value error)

partitioned = results.partition (_.is_a DB_Wrapped_Error)

## When working on join we may encounter further issues with having
aggregate columns exposed directly, it may be useful to re-use
the `lift_aggregate` method to push the aggregates into a
subquery.
new_columns = partitioned.second
problem_builder.attach_problems_before on_problems <|
problems = partitioned.first.map .value
on_problems.attach_problems_before problems <|
handle_no_output_columns =
first_problem = if problems.is_empty then Nothing else problems.first
Error.throw (No_Output_Columns.Error first_problem)
if new_columns.is_empty then handle_no_output_columns else
result = self.updated_context_and_columns new_ctx new_columns subquery=True
if validated.old_style.not then result else
Warning.attach (Deprecated.Warning "Standard.Database.Data.Aggregate_Column.Aggregate_Column" "Group_By" "Deprecated: `Group_By` constructor has been deprecated, use the `group_by` argument instead.") result

## GROUP Standard.Base.Calculations
ICON dataframe_map_row
@@ -1939,7 +1954,7 @@ type Table
A | Example | France
@group_by Widget_Helpers.make_column_name_vector_selector
@name_column Widget_Helpers.make_column_name_selector
@values (Widget_Helpers.make_aggregate_column_selector include_group_by=False)
@values Widget_Helpers.make_aggregate_column_selector
cross_tab : Vector (Integer | Text | Regex | Aggregate_Column) | Text | Integer | Regex -> (Text | Integer) -> Aggregate_Column | Vector Aggregate_Column -> Problem_Behavior -> Table ! Missing_Input_Columns | Invalid_Aggregate_Column | Floating_Point_Equality | Invalid_Aggregation | Unquoted_Delimiter | Additional_Warnings | Invalid_Column_Names
cross_tab self group_by name_column values=Aggregate_Column.Count (on_problems=Report_Warning) =
## Avoid unused arguments warning. We cannot rename arguments to `_`,
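Stepping back from the diff: grouping moves out of `columns` and into the dedicated `group_by` argument. A before/after sketch with an illustrative column name (the old form still works, but now attaches the `Deprecated` warning):

    # Old style, now deprecated: grouping mixed into the columns vector.
    table.aggregate [Aggregate_Column.Group_By "Region", Aggregate_Column.Count]
    # New style: grouping keys first, aggregations second. The "Region"
    # column appears at the start of the result, followed by the count.
    table.aggregate ["Region"] [Aggregate_Column.Count]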
@@ -64,7 +64,7 @@ type Context
- groups: a list of grouping expressions; for each entry a GROUP BY is
added, and the resulting query can then directly include only the
grouped-by columns or aggregate expressions.
- limit: an optional maximum number of elements that the query should
- limit: an optional maximum number of elements that the query should
return.
Value (from_spec : From_Spec) (where_filters : Vector SQL_Expression) (orders : Vector Order_Descriptor) (groups : Vector SQL_Expression) (limit : Nothing | Integer) (distinct_on : Nothing | Vector SQL_Expression)
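For reference, the `groups` field is populated by the new `aggregate` implementation shown earlier; a condensed sketch of that flow:

    # Each grouping key column contributes its SQL expression; the context
    # then emits one GROUP BY entry per expression.
    key_expressions = key_columns.map .expression
    new_ctx = self.context.set_groups key_expressions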

@@ -228,8 +228,8 @@ type Non_Unique_Key_Recipe
Creates a `Non_Unique_Key` error containing information about an
example group violating the uniqueness constraint.
raise_duplicated_primary_key_error source_table primary_key original_panic =
agg = source_table.aggregate [Aggregate_Column.Count]+(primary_key.map Aggregate_Column.Group_By)
filtered = agg.filter column=0 (Filter_Condition.Greater than=1)
agg = source_table.aggregate primary_key [Aggregate_Column.Count]
filtered = agg.filter column=-1 (Filter_Condition.Greater than=1)
materialized = filtered.read max_rows=1 warn_if_more_rows=False
case materialized.row_count == 0 of
## If we couldn't find a duplicated key, we give up the translation and
@@ -239,8 +239,8 @@ raise_duplicated_primary_key_error source_table primary_key original_panic =
True -> Panic.throw original_panic
False ->
row = materialized.first_row.to_vector
example_count = row.first
example_entry = row.drop 1
example_count = row.last
example_entry = row.drop (Last 1)
Member: I don't understand this change.

Member Author: The count column moved from the first to the last column due to the API change.
Error.throw (Non_Unique_Key.Error primary_key example_entry example_count)
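To make the index change concrete: under the new API the `group_by` key columns lead the result and the aggregates trail it, so the count is now the last column rather than the first. A sketch with an illustrative composite key:

    # Resulting columns are ordered: key_a | key_b | Count
    agg = source_table.aggregate ["key_a", "key_b"] [Aggregate_Column.Count]
    # A violating row therefore decomposes as:
    # count = row.last, offending key = row.drop (Last 1)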

## PRIVATE
@@ -619,15 +619,15 @@ check_duplicate_key_matches_for_delete target_table tmp_table key_columns allow_
Checks if any rows identified by `key_columns` have more than one match between two tables.
check_multiple_rows_match left_table right_table key_columns ~continuation =
joined = left_table.join right_table on=key_columns join_kind=Join_Kind.Inner
counted = joined.aggregate [Aggregate_Column.Count]+(key_columns.map (Aggregate_Column.Group_By _))
duplicates = counted.filter 0 (Filter_Condition.Greater than=1)
counted = joined.aggregate key_columns [Aggregate_Column.Count]
duplicates = counted.filter -1 (Filter_Condition.Greater than=1)
example = duplicates.read max_rows=1 warn_if_more_rows=False
case example.row_count == 0 of
True -> continuation
False ->
row = example.first_row . to_vector
offending_key = row.drop 1
count = row.first
offending_key = row.drop (Last 1)
count = row.last
Error.throw (Multiple_Target_Rows_Matched_For_Update.Error offending_key count)
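The check above follows a reusable recipe with the new API: inner-join on the keys, count rows per key group, and keep any group with more than one match. A compact sketch with illustrative table and column names:

    # Any key with more than one inner-join match indicates ambiguity.
    joined = left_table.join right_table on=["id"] join_kind=Join_Kind.Inner
    counted = joined.aggregate ["id"] [Aggregate_Column.Count]
    duplicates = counted.filter -1 (Filter_Condition.Greater than=1)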

## PRIVATE
@@ -4,11 +4,11 @@ import project.Data.Sort_Column.Sort_Column

## Defines an Aggregate Column
type Aggregate_Column
## Specifies a column to group the rows by.
## PRIVATE
Specifies a column to group the rows by. Deprecated but used internally.

Arguments:
- column: the column (specified by name, expression or index) to group
by.
- column: the column (either name, expression or index) to group by.
- new_name: name of new column.
Group_By (column:Text|Integer|Any) (new_name:Text="") # Any needed because of 6866
