Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-2663] [SQL] Support the Grouping Set #1567

Closed
wants to merge 6 commits into from

Conversation

chenghao-intel
Copy link
Contributor

Add support for GROUPING SETS, ROLLUP, CUBE and the the virtual column GROUPING__ID.

More details on how to use the `GROUPING SETS" can be found at: https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation,+Cube,+Grouping+and+Rollup
https://issues.apache.org/jira/secure/attachment/12676811/grouping_set.pdf

The generic idea of the implementations are :
1 Replace the ROLLUP, CUBE with GROUPING SETS
2 Explode each of the input row, and then feed them to Aggregate

  • Each grouping set are represented as the bit mask for the GroupBy Expression List, for each bit, 1 means the expression is selected, otherwise 0 (left is the lower bit, and right is the higher bit in the GroupBy Expression List)
  • Several of projections are constructed according to the grouping sets, and within each projection(Seq[Expression), we replace those expressions with Literal(null) if it's not selected in the grouping set (based on the bit mask)
  • Output Schema of Explode is child.output :+ grouping__id
  • GroupBy Expressions of Aggregate is GroupBy Expression List :+ grouping__id
  • Keep the Aggregation expressions the same for the Aggregate

The expressions substitutions happen in Logic Plan analyzing, so we will benefit from the Logical Plan optimization (e.g. expression constant folding, and map side aggregation etc.), Only an Explosive operator added for Physical Plan, which will explode the rows according the pre-set projections.

A known issue will be done in the follow up PR:

  • Optimization ColumnPruning is not supported yet for Explosive node.

@SparkQA
Copy link

SparkQA commented Jul 24, 2014

QA tests have started for PR 1567. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17097/consoleFull

@SparkQA
Copy link

SparkQA commented Jul 24, 2014

QA results for PR 1567:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
case class GroupingSet(bitmasks: Seq[Int],
case class Cube(groupByExprs: Seq[Expression],
case class Rollup(groupByExprs: Seq[Expression],
case class VirtualColumn(name: String, dataType: DataType = StringType, nullable: Boolean = false)
case class GroupingSetExpansion(
case class GroupingSetExpansion(

For more information see test ouptut:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17097/consoleFull

@SparkQA
Copy link

SparkQA commented Jul 24, 2014

QA tests have started for PR 1567. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17114/consoleFull

@SparkQA
Copy link

SparkQA commented Jul 24, 2014

QA results for PR 1567:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
case class GroupingSet(bitmasks: Seq[Int],
case class Cube(groupByExprs: Seq[Expression],
case class Rollup(groupByExprs: Seq[Expression],
case class VirtualColumn(name: String, dataType: DataType = StringType, nullable: Boolean = false)
case class GroupingSetExpansion(
case class GroupingSetExpansion(

For more information see test ouptut:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17114/consoleFull

@SparkQA
Copy link

SparkQA commented Sep 3, 2014

QA tests have started for PR 1567 at commit 0325be5.

  • This patch merges cleanly.

@SparkQA
Copy link

SparkQA commented Sep 3, 2014

QA tests have finished for PR 1567 at commit 0325be5.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class GroupingSet(bitmasks: Seq[Int],
    • case class Cube(groupByExprs: Seq[Expression],
    • case class Rollup(groupByExprs: Seq[Expression],
    • protected case class AttributeEquals(val a: Attribute)
    • case class VirtualColumn(name: String, dataType: DataType = StringType, nullable: Boolean = false)
    • case class GroupingSetExpansion(
    • case class GroupingSetExpansion(

@chenghao-intel
Copy link
Contributor Author

retest this please.

1 similar comment
@chenghao-intel
Copy link
Contributor Author

retest this please.

@chenghao-intel
Copy link
Contributor Author

test this please

@SparkQA
Copy link

SparkQA commented Sep 4, 2014

QA tests have started for PR 1567 at commit 0325be5.

  • This patch merges cleanly.

@SparkQA
Copy link

SparkQA commented Sep 4, 2014

QA tests have finished for PR 1567 at commit 0325be5.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class GroupingSet(bitmasks: Seq[Int],
    • case class Cube(groupByExprs: Seq[Expression],
    • case class Rollup(groupByExprs: Seq[Expression],
    • protected case class AttributeEquals(val a: Attribute)
    • case class VirtualColumn(name: String, dataType: DataType = StringType, nullable: Boolean = false)
    • case class GroupingSetExpansion(
    • case class GroupingSetExpansion(

@chenghao-intel
Copy link
Contributor Author

test this please.

@SparkQA
Copy link

SparkQA commented Oct 20, 2014

QA tests have started for PR 1567 at commit 88b939e.

  • This patch merges cleanly.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21910/
Test FAILed.

@SparkQA
Copy link

SparkQA commented Oct 20, 2014

QA tests have finished for PR 1567 at commit 88b939e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class GroupingSet(bitmasks: Seq[Int],
    • case class Cube(groupByExprs: Seq[Expression],
    • case class Rollup(groupByExprs: Seq[Expression],
    • case class VirtualColumn(name: String, dataType: DataType = StringType, nullable: Boolean = false)
    • case class GroupingSetExpansion(
    • case class GroupingSetExpansion(

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21911/
Test FAILed.

@SparkQA
Copy link

SparkQA commented Oct 21, 2014

QA tests have started for PR 1567 at commit 49b4955.

  • This patch merges cleanly.

@chenghao-intel
Copy link
Contributor Author

Rebased, failed in CliSuite.

@marmbrus @rxin , not sure if you got time to review this. Sorry, it's big PR. I can provide a rough design doc if you think that will be more helpful.

@SparkQA
Copy link

SparkQA commented Oct 21, 2014

QA tests have finished for PR 1567 at commit 49b4955.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class ReconnectWorker(masterUrl: String) extends DeployMessage
    • case class GroupingSet(bitmasks: Seq[Int],
    • case class Cube(groupByExprs: Seq[Expression],
    • case class Rollup(groupByExprs: Seq[Expression],
    • case class VirtualColumn(name: String, dataType: DataType = StringType, nullable: Boolean = false)
    • case class GroupingSetExpansion(
    • case class GroupingSetExpansion(

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21945/
Test PASSed.

@rxin
Copy link
Contributor

rxin commented Oct 21, 2014

A short design doc would be nice. Just talk about the high level design and how it is implemented. Thanks.

@marmbrus
Copy link
Contributor

Yeah, please do post a design doc. Also, sorry for not reviewing this earlier. This will be a good feature to have.

I did a quick pass and I have two high level concerns (though I did not look in much detail):

  • The creation of bit vectors seems like a very implementation focused physical concern. I'm curious if this could be restricted to the actual physical operator.
  • Adding a new type of attribute reference for virtual columns might be a lot of overhead. Is this really necessary?

@SparkQA
Copy link

SparkQA commented Oct 23, 2014

QA tests have started for PR 1567 at commit 76f474e.

  • This patch merges cleanly.

@SparkQA
Copy link

SparkQA commented Oct 23, 2014

QA tests have finished for PR 1567 at commit 76f474e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class GroupingSet(bitmasks: Seq[Int],
    • case class Cube(groupByExprs: Seq[Expression],
    • case class Rollup(groupByExprs: Seq[Expression],
    • case class VirtualColumn(name: String, dataType: DataType = StringType, nullable: Boolean = false)
    • case class GroupingSetExpansion(
    • case class GroupingSetExpansion(

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22073/
Test PASSed.

@chenghao-intel
Copy link
Contributor Author

@rxin @marmbrus , I've uploaded an draft design doc in jira. https://issues.apache.org/jira/secure/attachment/12676811/grouping_set.pdf, sorry it doesn't cover every detail, let me know if you have any confusion.

@marmbrus :

The creation of bit vectors seems like a very implementation focused physical concern. I'm curious if this could be restricted to the actual physical operator.

Yeah, It's very reasonable, I was thinking of this either.
However, the bit vectors stuff don't rely on physical execution engine, and it's slightly different with the Aggregate, which has the optimization of mapside aggregation for spark execution.

Besides, the attribute reference pass down to the parent logical operator need to be correctly set in logical plan analyzing.

Anyway, I will consider your suggestion, after all, we should keep the Logical Plan for "describing what to do", not "how to do".

Adding a new type of attribute reference for virtual columns might be a lot of overhead. Is this really necessary?

A concrete VirtualColumn instance is very helpful in attribute referencing, and pattern matching, probably better than a name convention. Sorry, maybe I didn't understand your mean, we can discuss that in the code review.

object ResolveGroupingSet extends Rule[LogicalPlan] {
/**
* Extract attribute set according to the grouping id
* @param bitmask bitmask to represent the validity of the attribute sequence
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what valid means here and elsewhere. Do you mean the bitmask indicates which attributes are selected perhaps?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, exactly.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be clearer to change the wording then. invalid sounds like something is broken.

@marmbrus
Copy link
Contributor

Okay, this is looking really good / clean. Most of my comments are about documentation since this is a very complicated feature.

@SparkQA
Copy link

SparkQA commented Dec 17, 2014

Test build #24535 has started for PR 1567 at commit 3c1df19.

  • This patch merges cleanly.

@SparkQA
Copy link

SparkQA commented Dec 17, 2014

Test build #24535 has finished for PR 1567 at commit 3c1df19.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class GroupExpression(children: Seq[Expression]) extends Expression
    • case class Expand(
    • trait GroupingAnalytics extends UnaryNode
    • case class GroupingSets(
    • case class Cube(
    • case class Rollup(
    • case class Expand(

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24535/
Test PASSed.

@chenghao-intel
Copy link
Contributor Author

@marmbrus I still have something need to be updated, I will let you know when it's ready.

@marmbrus
Copy link
Contributor

Cool, can you through WIP in the title while its being worked on?

@SparkQA
Copy link

SparkQA commented Dec 18, 2014

Test build #24563 has started for PR 1567 at commit fe65fcc.

  • This patch merges cleanly.

@chenghao-intel
Copy link
Contributor Author

Thank you @marmbrus , I've finished the updating, will add "WIP" next time. :)
Can you review the code again?

@SparkQA
Copy link

SparkQA commented Dec 18, 2014

Test build #24563 has finished for PR 1567 at commit fe65fcc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class GroupExpression(children: Seq[Expression]) extends Expression
    • case class Expand(
    • trait GroupingAnalytics extends UnaryNode
    • case class GroupingSets(
    • case class Cube(
    • case class Rollup(
    • case class Expand(

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24563/
Test PASSed.

@chenghao-intel
Copy link
Contributor Author

@marmbrus , any more comment on this?

@marmbrus
Copy link
Contributor

Thanks! Merged to master.

@asfgit asfgit closed this in f728e0f Dec 19, 2014
@harakiro
Copy link

What version of Spark will this be released under? Is it in 1.2? Is there a Jira to track this functionality that I could reference. Thanks so much for the work on this feature!

@rxin
Copy link
Contributor

rxin commented Feb 25, 2015

@harakiro the jira ticket is in the title of the pull request: https://issues.apache.org/jira/browse/SPARK-2663

private[this] var idx = -1 // -1 means the initial state
private[this] var input: Row = _

override final def hasNext = (-1 < idx && idx < groups.length) || iter.hasNext
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fyi you probably want to move groups.length into a variable to avoid running this everytime.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i.e.

private[this] val groupLength = groups.length

and then just reference groupLength

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rxin, the groups is in the type of Array, not Seq, probably it does not impact the performance a lot. Anyway, thank you for pointing out this, I can update that along with some other PR.

@chenghao-intel chenghao-intel deleted the grouping_sets branch July 2, 2015 08:41
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants