
feat: add expand rel #368

Merged · 20 commits · merged Aug 10, 2023
Conversation

@JkSelf (Contributor Author) commented Nov 9, 2022

Adds an expand relation. This relation can be used to create near-duplicate
copies of each input row, based on templates describing how to create the
copies. It is used within Spark to implement operations such as aggregate
rollup and pivot longer.
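For a concrete picture of the template idea, here is a minimal illustrative sketch (not this PR's actual proto or API; the `expand` helper and the "keep" marker are hypothetical): each template either keeps a column's value or substitutes a replacement, and every input row yields one near-duplicate copy per template.

```python
# Illustrative sketch only (not part of this PR): each template either
# keeps a column's value or substitutes a replacement (e.g. None), and
# every input row yields one near-duplicate copy per template.
def expand(rows, templates):
    for row in rows:
        for template in templates:
            yield {
                col: (val if template.get(col) == "keep" else template[col])
                for col, val in row.items()
            }

rows = [{"country": "US", "region": "North America", "sales": 100}]
templates = [
    {"country": "keep", "region": "keep", "sales": "keep"},
    {"country": "keep", "region": None,   "sales": "keep"},
    {"country": None,   "region": None,   "sales": "keep"},
]
print(list(expand(rows, templates)))  # one input row -> three output rows
```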

@JkSelf (Contributor Author) commented Nov 9, 2022

@jacques-n @rui-mo Please help review. Thanks.

@CLAassistant commented Nov 9, 2022

CLA assistant check
All committers have signed the CLA.

@jacques-n (Contributor) commented:

Can you add site content that describes what expand would do? I'm not familiar with the operator.

@JkSelf (Contributor Author) commented Nov 16, 2022

@jacques-n Updated the description document.
Currently the Aggregate Rel output mainly consists of the grouping sets expressions and measures, while we need the grouping sets expressions, aggregate expressions, and group ids. So the Aggregate Rel may not meet the needs of Expand very well.

@JkSelf (Contributor Author) commented Nov 16, 2022

cc @FelixYBW @baibaichen

@JkSelf changed the title from "feat: Add expand Rel" to "feat: add expand rel in proto" on Dec 1, 2022
@github-actions commented
ACTION NEEDED

Substrait follows the Conventional Commits specification for release automation.

The PR title and description are used as the merge commit message. Please update your PR title and description to match the specification.

@JkSelf changed the title from "feat: add expand rel in proto" to "feat: add expand rel" on Mar 15, 2023
@JkSelf (Contributor Author) commented Mar 15, 2023

Just for reference: Spark added the Expand operator to support the GROUPING SETS, ROLLUP, and CUBE expressions in PR#1567. It applies all the GroupExpressions to every input row, producing multiple output rows per input row. To add the Expand operator in Gluten, we need to add ExpandRel support in the proto.
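For anyone who wants to see the operator, a minimal PySpark repro sketch (illustrative, not from this PR; assumes a local Spark session is available) that surfaces the Expand node in the physical plan:

```python
# Sketch (assumes PySpark is installed): ROLLUP makes Spark insert an
# Expand node into the physical plan, visible via explain().
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()
df = spark.createDataFrame(
    [("US", "North America", 100), ("France", "Europe", 400)],
    ["country", "region", "sales"],
)
df.createOrReplaceTempView("sales")
spark.sql(
    "SELECT country, region, SUM(sales) AS total "
    "FROM sales GROUP BY ROLLUP (country, region)"
).explain()  # the physical plan contains an Expand operator above the scan
```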

@baibaichen commented
@jacques-n @westonpace

The Expand operator was introduced to support cube, rollup, and grouping sets (see the PR in @JkSelf's comment). A naive way to support these is to scan the source input once per group and then union the results. Expand avoids such multiple scans by outputting multiple rows for each input row.

Furthermore, grouping sets are also used to optimize multiple distinct aggregations; see SPARK-9241 and CALCITE-732.

I did some research on translating Aggregate over Expand to a Substrait-style aggregate. It works for normal grouping sets, cube, and rollup thanks to their fixed patterns. However, Expand just outputs multiple rows and can be configured with different patterns (e.g. SPARK-9241); I failed to translate in such cases.

@westonpace (Member) commented:

I think I understand that EXPAND is a helper that is used by Spark to calculate an aggregate that contains grouping sets. I'm not quite sure I understand how it works. Does each copy get sent to a different output? Can you share a Spark physical plan that makes use of EXPAND?

Does it look something like this?

SELECT Country, Region, SUM(Sales) AS TotalSales
FROM Sales
GROUP BY ROLLUP (Country, Region);

Translates to...

SCAN -> EXPAND --- AGG(keys={country,region}) --- UNION
               \-- AGG(keys={country}) ---------/
               \-- AGG(keys={}) ---------------/

@baibaichen commented Mar 17, 2023

> SCAN -> EXPAND --- AGG(keys={country,region}) --- UNION
>                \-- AGG(keys={country}) ---------/
>                \-- AGG(keys={}) ---------------/

The physical plan looks like Scan -> Expand(output, projections) -> Aggregate(groupby, measure)

Expand:
  output = sales, Country, Region, grouping_id(Literal)  /* Output is the input schema for the next aggregate operator */
  projections = [                                        /* Projections are the actual inputs for the next aggregate operator */
     [sales, Country, Region, 0],
     [sales, Country, Null,   1],
     [sales, Null, Null,      6]
  ]

Aggregate:
  groupby = Country, Region, grouping_id
  measure = sum(sales)

By inserting nulls and the grouping_id, Spark eliminates the union.
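To make the mechanics concrete, a minimal Python sketch (the `apply_expand` helper is hypothetical, not Spark's code) of applying the projections above to each input row; strings read an input column, anything else is a literal standing in for the nulls and grouping_id:

```python
# Hypothetical sketch (not Spark's code): apply every projection to each
# input row; a string means "read that input column", anything else is a
# literal (None for NULL, ints for the grouping_id).
def apply_expand(rows, projections):
    out = []
    for row in rows:
        for projection in projections:
            out.append([
                row[expr] if isinstance(expr, str) else expr
                for expr in projection
            ])
    return out

# The projections from the plan above.
projections = [
    ["sales", "country", "region", 0],
    ["sales", "country", None,     1],
    ["sales", None,      None,     6],
]
rows = [{"sales": 100, "country": "US", "region": "North America"}]
print(apply_expand(rows, projections))
# [[100, 'US', 'North America', 0], [100, 'US', None, 1], [100, None, None, 6]]
```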

@westonpace (Member) commented:

Ok. I think I understand now. So, in the above example, if the input data was:

| Country | Region        | Sales |
|---------|---------------|-------|
| US      | North America | 100   |
| US      | North America | 200   |
| Canada  | North America | 300   |
| France  | Europe        | 400   |

Then the output data would be:

| Country | Region        | Sales |
|---------|---------------|-------|
| US      | North America | 100   |
| US      | NULL          | 100   |
| NULL    | NULL          | 100   |
| US      | North America | 200   |
| US      | NULL          | 200   |
| NULL    | NULL          | 200   |
| Canada  | North America | 300   |
| Canada  | NULL          | 300   |
| NULL    | NULL          | 300   |
| France  | Europe        | 400   |
| France  | NULL          | 400   |
| NULL    | NULL          | 400   |

@westonpace (Member) commented:

Is this correct?

@JkSelf (Contributor Author) commented Mar 17, 2023

@westonpace Yes. The output also contains the grouping_id column:

| Sales | Country | Region        | grouping_id |
|-------|---------|---------------|-------------|
| 100   | US      | North America | 0           |
| 100   | US      | NULL          | 1           |
| 100   | NULL    | NULL          | 3           |
| 200   | US      | North America | 0           |
| 200   | US      | NULL          | 1           |
| 200   | NULL    | NULL          | 3           |
| 300   | Canada  | North America | 0           |
| 300   | Canada  | NULL          | 1           |
| 300   | NULL    | NULL          | 3           |
| 400   | France  | Europe        | 0           |
| 400   | France  | NULL          | 1           |
| 400   | NULL    | NULL          | 3           |

@baibaichen commented
@JkSelf As we discussed offline, it's better to follow Spark's semantics to define ExpandRel.

@JkSelf (Contributor Author) commented Mar 20, 2023

@baibaichen @westonpace I have updated ExpandRel based on Spark's semantics. Please help review again.

// A list of expression groupings that the aggregation measures should be calculated for.
repeated Grouping groupings = 4;

message Grouping {


@baibaichen: Where is the ExpandRel output?

@JkSelf (Contributor Author):

Do we need to define the output of the physical operator? It seems the output is defined by the consumer.


@baibaichen: How does the Aggregate operator reference the output of Expand?

@JkSelf (Contributor Author):

@baibaichen Added the output in ExpandRel.

@westonpace (Member):

I disagree with this. Substrait relations do not typically include names for the output. See the ProjectRel for an example.

@JkSelf (Contributor Author):

One question: Spark generates different projections in Expand for grouping sets, rollup, and cube expressions. How do we handle these different cases without the output?

@westonpace (Member):

> My question is: adding a field can simplify the work of the consumer side, so why do we need to omit this field and make the consumer side more complicated? In addition, these assumptions may also be broken in the future.

I thought this would be very similar to how ProjectRel is handled. For example:

ProjectRel {
  expressions = [some_function(selection(3))]
}

The output's nullability will be determined by the function some_function. For example, if some_function is abs (absolute value) then the type and nullability will be the same as column 3. If some_function is is_null then the type will be bool and the output will be non-null.

In Expand I think this is a little harder. For example, if we have:

[
  [field("country"), field("region")],
  [field("country"), literal(null, string())],
  [literal(null, string()), literal(null, string())]
]

We have to make sure:

  • The output type must be the same for each position (e.g. field("country") and literal(null, string()) must have the same output type; field("region") and literal(null, string()) must have the same output type).
  • If any output type in a position is nullable then the output type must be nullable.

Putting this together the code might look like:

def calculate_output(groupings):
  output_columns = []
  # Use the first grouping to determine output types
  for expression in groupings[0]:
    output_columns.append(expression.output_type)
  # Make sure the rest of the groupings have the same output types
  # and check to see if they are nullable
  for grouping in groupings[1:]:
    for col_idx in range(len(grouping)):
      # Types must be the same, but nullability can differ
      if output_columns[col_idx].type != grouping[col_idx].output_type.type:
        raise Exception("All output types for a column must be the same")
      if grouping[col_idx].output_type.nullable:
        output_columns[col_idx].nullable = True
  return output_columns

Note: I think the above algorithm works for both "unreferenced columns" and "measure columns". You don't need to rely on assumptions, because the expression literal(null, string()) will always have a nullable output type, so the output for that column will always be nullable.
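For reference, a self-contained, runnable version of the algorithm above; the OutputType/Expr model is a stand-in assumption, not Substrait's actual type classes:

```python
# Runnable version of the sketch above. OutputType/Expr are hypothetical
# stand-ins for Substrait's type and expression classes.
from dataclasses import dataclass

@dataclass
class OutputType:
    kind: str        # the type class, e.g. "string"
    nullable: bool

@dataclass
class Expr:
    output_type: OutputType

def calculate_output(groupings):
    # Copy the first grouping's types so we can widen nullability later.
    output_columns = [
        OutputType(e.output_type.kind, e.output_type.nullable)
        for e in groupings[0]
    ]
    for grouping in groupings[1:]:
        for col_idx, expr in enumerate(grouping):
            # Type classes must match; nullability is OR-ed per position.
            if output_columns[col_idx].kind != expr.output_type.kind:
                raise ValueError("All output types for a column must be the same")
            if expr.output_type.nullable:
                output_columns[col_idx].nullable = True
    return output_columns

# ROLLUP(country, region): the null literals force both columns nullable.
field = lambda: Expr(OutputType("string", False))
null_lit = lambda: Expr(OutputType("string", True))
groupings = [
    [field(), field()],
    [field(), null_lit()],
    [null_lit(), null_lit()],
]
print(calculate_output(groupings))
# [OutputType(kind='string', nullable=True), OutputType(kind='string', nullable=True)]
```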

> why do we need to omit this field and make the consumer side more complicated?

The downsides I see are:

  1. It is harder to produce the plan. The producer now has to calculate the output.
  2. It is possible for a plan to be invalid if the output doesn't match the expression (e.g. the output is int but the expression return type is bool).
  3. The plan is larger than it needs to be.
  4. This is different than ProjectRel and other relations, where we expect the consumer to be able to calculate the output schema.

> One question: Spark generates different projections in Expand for grouping sets, rollup, and cube expressions. How do we handle these different cases without the output?

I think that is ok. If you have ROLLUP then you have:

[
  [field("country"), field("region")],
  [field("country"), literal(null, string())],
  [literal(null, string()), literal(null, string())]
]

If you have CUBE then you have:

[
  [field("country"), field("region")],
  [field("country"), literal(null, string())],
  [literal(null, string()), field("region")],
  [literal(null, string()), literal(null, string())]
]
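The producer-side difference between the two is mechanical. A hedged sketch (hypothetical helpers, not Spark's implementation) of generating these projection patterns:

```python
# Hypothetical sketch (not Spark's implementation): ROLLUP nulls out
# suffixes of the key list, CUBE nulls out every subset; everything else
# about the Expand projections is identical.
from itertools import combinations

def rollup_sets(keys):
    # Prefixes, longest first: (country, region), (country,), ()
    return [tuple(keys[:i]) for i in range(len(keys), -1, -1)]

def cube_sets(keys):
    # All subsets, largest first.
    return [s for n in range(len(keys), -1, -1) for s in combinations(keys, n)]

def to_projection(keys, kept):
    # Keep the field where the key survives, a typed null literal otherwise.
    return [f'field("{k}")' if k in kept else "literal(null, string())"
            for k in keys]

keys = ["country", "region"]
print([to_projection(keys, s) for s in rollup_sets(keys)])  # 3 projections
print([to_projection(keys, s) for s in cube_sets(keys)])    # 4 projections
```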

@baibaichen:

@westonpace thanks.

> def calculate_output(groupings):
>   ...

1. We assume the type should be the same at the same position; that is true now, but I don't think it will always be true.
2. The con is that all consumers would need similar code.

> 4. This is different than ProjectRel and other relations where we expect the consumer to be able to calculate the output schema.

I agree with the 4th point. So let's go on with calculate_output until we find it can't meet the requirements.

@JkSelf (Contributor Author):

@westonpace Thanks for your detailed explanation. I removed the output type in ExpandRel. Please help review again.

@baibaichen commented Mar 30, 2023

> @westonpace Thanks for your detailed explanation. I removed the output type in ExpandRel. Please help review again.

@JkSelf Thank you

@JkSelf force-pushed the add-expandrel branch 2 times, most recently from ae42cc3 to a404d8c on March 21, 2023 09:23
@westonpace (Member) commented:

This will need a description in the markdown as well. I have created a PR into your branch with a suggestion.

@baibaichen commented
@westonpace ok to merge?

@westonpace previously approved these changes Mar 30, 2023

@westonpace (Member) left a comment:

I'm happy with where this is. I'll give @jacques-n a chance to comment further before merging.

@westonpace (Member) commented:

Also, this PR will need a rebase.

@JkSelf requested a review from cpcloud as a code owner on August 3, 2023 02:53
@JkSelf (Contributor Author) commented Aug 3, 2023

@westonpace Thanks for your review. I have updated the PR; can you help review again?

@westonpace previously approved these changes Aug 3, 2023

@westonpace (Member) left a comment:

One minor suggestion. @jacques-n can you take another look at this one now? I think the markdown / spec is consistent with the proto files now.

site/docs/relations/physical_relations.md (review suggestion, outdated and resolved)
JkSelf and others added 20 commits August 3, 2023 10:08
@jacques-n (Contributor) left a comment:

Looks good to me.

// each `switching_field` must have the same number of expressions
// all expressions within a switching field must be the same type class but can differ in nullability.
// this column will be nullable if any of the expressions are nullable.
repeated Expression duplicates = 1;
Member:

Is there any value in having a repeated inside a repeated? Seems like this should just be a single item (to be consistent with consistent_field above).

@westonpace (Member) commented Aug 22, 2023

The outer repeated (fields) is columns and the inner repeated (duplicates) is rows.

So if our goal is to take input:

| X | Y | Z |
|---|---|---|
| 1 | 2 | 3 |

and generate:

| X    | Y    | Z |
|------|------|---|
| 1    | 2    | 3 |
| 1    | NULL | 3 |
| NULL | NULL | 3 |

Then we need something like...

fields: [
  { duplicates: [ field(x), field(x), NULL ] },
  { duplicates: [ field(y), NULL, NULL ] },
  { consistent: field(z) }
]
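A minimal consumer-side sketch (hypothetical names, assuming the column-major encoding described above) that reproduces this example:

```python
# Hypothetical consumer sketch, assuming the column-major encoding above:
# `fields` is per output column; each switching field carries one
# expression per emitted copy, a consistent field carries exactly one.
def expand_row(row, fields):
    # Number of copies = length of any duplicates list.
    n_copies = next(len(exprs) for kind, exprs in fields
                    if kind == "duplicates")

    def evaluate(expr):
        # A string names an input column; None stands for a NULL literal.
        return row[expr] if expr is not None else None

    return [
        [evaluate(exprs[i]) if kind == "duplicates" else evaluate(exprs)
         for kind, exprs in fields]
        for i in range(n_copies)
    ]

fields = [
    ("duplicates", ["x", "x", None]),   # column X: 1, 1, NULL
    ("duplicates", ["y", None, None]),  # column Y: 2, NULL, NULL
    ("consistent", "z"),                # column Z: always 3
]
print(expand_row({"x": 1, "y": 2, "z": 3}, fields))
# [[1, 2, 3], [1, None, 3], [None, None, 3]]
```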

Member:

Thanks for the clarification. I've sent #546 to expand the documentation.
