
[SPARK-37627][SQL][FOLLOWUP] Separate SortedBucketTransform from BucketTransform #34914

Closed
wants to merge 9 commits

Conversation


@huaxingao huaxingao commented Dec 15, 2021

What changes were proposed in this pull request?

  1. Currently, only a single bucket column is supported in `BucketTransform`; fix the code to make multiple bucket columns work.
  2. Separate `SortedBucketTransform` from `BucketTransform`, and lay out the `arguments` in `SortedBucketTransform` in the format `columns numBuckets sortedColumns`, so there is a way to tell the `columns` and the `sortedColumns` apart.
  3. Add more test coverage.
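The `columns numBuckets sortedColumns` layout can be sketched with a small, self-contained example (hypothetical `Expr`/`Lit`/`Ref` stand-ins for Spark's expression types, not the real connector API): the position of the `numBuckets` literal is what makes the two column groups recoverable from a flat argument array.

```scala
// Hypothetical, simplified stand-ins for Spark's Transform expression types
// (for illustration only -- not the real org.apache.spark.sql.connector API).
object SortedBucketDemo {
  sealed trait Expr
  case class Lit(value: Int) extends Expr   // plays the role of the numBuckets literal
  case class Ref(name: String) extends Expr // plays the role of a NamedReference

  // SortedBucketTransform lays out arguments as: columns, numBuckets, sortedColumns.
  def arguments(columns: Seq[Ref], numBuckets: Lit, sortedColumns: Seq[Ref]): Seq[Expr] =
    (columns :+ numBuckets) ++ sortedColumns

  // The literal's position marks where columns end and sortedColumns begin.
  def split(args: Seq[Expr]): (Int, Seq[Ref], Seq[Ref]) = {
    val pos    = args.indexWhere(_.isInstanceOf[Lit])
    val n      = args(pos).asInstanceOf[Lit].value
    val cols   = args.take(pos).collect { case r: Ref => r }
    val sorted = args.drop(pos + 1).collect { case r: Ref => r }
    (n, cols, sorted)
  }

  def main(argv: Array[String]): Unit = {
    val encoded = arguments(Seq(Ref("a"), Ref("b")), Lit(4), Seq(Ref("c")))
    println(split(encoded)) // (4,List(Ref(a), Ref(b)),List(Ref(c)))
  }
}
```

The same idea drives the extractor discussed below in the thread: scan `arguments` for the integer literal, then take/drop around its position.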

Why are the changes needed?

Fix bugs in BucketTransform and SortedBucketTransform.

Does this PR introduce any user-facing change?

No

How was this patch tested?

New tests

@github-actions github-actions bot added the SQL label Dec 15, 2021
@SparkQA commented Dec 16, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50721/

@SparkQA commented Dec 16, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50721/

@SparkQA commented Dec 16, 2021

Test build #146247 has finished for PR 34914 at commit 7188482.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@huaxingao huaxingao marked this pull request as draft December 16, 2021 05:19
@@ -112,15 +112,9 @@ private[sql] final case class BucketTransform(
    arguments.collect { case named: NamedReference => named }
  }

-  override def arguments: Array[Expression] = numBuckets +: columns.toArray
+  override def arguments: Array[Expression] = (numBuckets +: columns.toArray) ++ sortedColumns
Contributor:
does this really work? I don't see a way for people to extract bucket and sort columns from arguments with the Transform API.

@huaxingao huaxingao marked this pull request as ready for review January 3, 2022 07:24
  override def arguments: Array[Expression] = if (sortedColumns.nonEmpty) {
    (columns.toArray :+ numBuckets) ++ sortedColumns
  } else {
    numBuckets +: columns.toArray
  }
Member:
Shall we keep a consistent order of `columns` and `numBuckets` between the two cases in `arguments`?

Contributor (author):
If there are `sortedColumns`, we need `numBuckets` in between `columns` and `sortedColumns`, because we need a way to figure out which elements in the array are `columns` and which are `sortedColumns`.

  var posOfLit: Int = -1
  var numOfBucket: Int = -1
  s.foreach {
    case Lit(value: Int, IntegerType) =>
Member:
There is an extra space between `case` and `Lit`.

@@ -39,6 +40,7 @@ class TransformExtractorSuite extends SparkFunSuite {
override def describe: String = names.mkString(".")
}


Member:
unnecessary change.

      this.copy(columns = newReferences)
    } else {
      val splits = newReferences.grouped(columns.length).toList
      this.copy(columns = splits(0), sortedColumns = splits(1))
Contributor:
Should it be: `columns = newReferences.take(columns.length), sortedColumns = newReferences.drop(columns.length)`?
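The suggestion matters whenever the bucket-column and sort-column counts differ. A standalone sketch (plain strings standing in for `NamedReference`, a hypothetical simplification) shows why `grouped` can silently drop a sort column while `take`/`drop` cannot:

```scala
// With 1 bucket column and 2 sort columns, grouped(1) produces three groups,
// so splits(1) keeps only the first sort column and "c" is silently lost.
object SplitDemo {
  def main(argv: Array[String]): Unit = {
    val newReferences = Seq("a", "b", "c") // 1 bucket column ("a"), 2 sort columns
    val numBucketCols = 1

    val splits = newReferences.grouped(numBucketCols).toList
    println(splits)        // List(List(a), List(b), List(c)) -- splits(1) loses "c"

    val cols   = newReferences.take(numBucketCols)
    val sorted = newReferences.drop(numBucketCols)
    println((cols, sorted)) // (List(a),List(b, c)) -- nothing lost
  }
}
```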

Contributor (author):
Changed. Thanks!

@@ -104,24 +104,29 @@ private[sql] final case class BucketTransform(
    columns: Seq[NamedReference],
    sortedColumns: Seq[NamedReference] = Seq.empty[NamedReference]) extends RewritableTransform {

-  override val name: String = "bucket"
+  override val name: String = if (sortedColumns.nonEmpty) "sortedBucket" else "bucket"
@cloud-fan (Contributor) commented Jan 5, 2022:
Can we create a new class SortedBucketTransform to be clearer?

Contributor (author):
Added a new class SortedBucketTransform. Thanks!

@@ -1566,18 +1566,22 @@ class DataSourceV2SQLSuite
  test("create table using - with sorted bucket") {
    val identifier = "testcat.table_name"
    withTable(identifier) {
      sql(s"CREATE TABLE $identifier (a int, b string, c int) USING $v2Source PARTITIONED BY (c)" +
        s" CLUSTERED BY (b) SORTED by (a) INTO 4 BUCKETS")
Contributor:
Why change this test?

Contributor (author):
Just want to make sure multiple columns/sortedColumns work ok.

@@ -45,11 +45,11 @@ private[sql] object LogicalExpressions {
  def bucket(numBuckets: Int, references: Array[NamedReference]): BucketTransform =
    BucketTransform(literal(numBuckets, IntegerType), references)

-  def bucket(
+  def sortedBucket(
Contributor:
It's OK to keep the name `bucket`, to match the name of this SQL feature.

    columns: Seq[NamedReference],
    sortedColumns: Seq[NamedReference] = Seq.empty[NamedReference]) extends RewritableTransform {

  override val name: String = "sortedBucket"
Contributor:
`sorted_bucket` is more SQL-ish.

@@ -161,7 +162,11 @@ class InMemoryTable(
      case (v, t) =>
        throw new IllegalArgumentException(s"Match: unsupported argument(s) type - ($v, $t)")
      }
    case BucketTransform(numBuckets, ref, _) =>
    case BucketTransform(numBuckets, ref) =>
Contributor:
I think we can have a single `BucketTransform.unapply` that matches both `BucketTransform` and `SortedBucketTransform`, so that we can have a single case here and avoid duplicated code.
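The shape of such a shared extractor can be sketched standalone (hypothetical simplified model with strings for column references, not Spark's actual classes): return an empty sorted-column list for a plain bucket, so call sites need only one case.

```scala
// One extractor covering both transform shapes, so pattern matches
// don't need separate cases for bucketed and sorted-bucketed tables.
object UnapplyDemo {
  sealed trait Transform
  case class Bucket(n: Int, cols: Seq[String]) extends Transform
  case class SortedBucket(n: Int, cols: Seq[String], sortedCols: Seq[String]) extends Transform

  object BucketLike {
    def unapply(t: Transform): Option[(Int, Seq[String], Seq[String])] = t match {
      case Bucket(n, cols)                   => Some((n, cols, Nil)) // no sort columns
      case SortedBucket(n, cols, sortedCols) => Some((n, cols, sortedCols))
    }
  }

  // A single case handles both variants.
  def describe(t: Transform): String = t match {
    case BucketLike(n, cols, sortedCols) => s"$n buckets on $cols sorted by $sortedCols"
  }

  def main(argv: Array[String]): Unit = {
    println(describe(Bucket(4, Seq("a"))))
    println(describe(SortedBucket(4, Seq("a"), Seq("b"))))
  }
}
```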


  override def withReferences(newReferences: Seq[NamedReference]): Transform = {
    this.copy(columns = newReferences)
  }
}

private[sql] object BucketTransform {
-  def unapply(expr: Expression): Option[(Int, FieldReference, FieldReference)] =
-    expr match {
+  def unapply(expr: Expression): Option[(Int, FieldReference, FieldReference)] = expr match {
Contributor:
Where do we use this `unapply`?

Contributor (author):
This was introduced in #30706 but doesn't seem to be used. I will remove it for now.

  var index: Int = -1
  var posOfLit: Int = -1
  var numOfBucket: Int = -1
  arguments.foreach {
Contributor:
Nit: we can do `arguments.zipWithIndex.foreach`, so that it's much easier to get `posOfLit`.
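The nit can be sketched standalone: `zipWithIndex` removes the manually maintained index counter when locating the `numBuckets` literal (here `Int` stands in for `Lit` and `String` for `NamedReference`, a hypothetical simplification):

```scala
// Locating the numBuckets literal with zipWithIndex instead of a
// hand-incremented index variable.
object ZipWithIndexDemo {
  def main(argv: Array[String]): Unit = {
    val arguments: Seq[Any] = Seq("a", "b", 4, "c") // columns, numBuckets, sortedColumns
    var posOfLit    = -1
    var numOfBucket = -1
    arguments.zipWithIndex.foreach {
      case (n: Int, i) => numOfBucket = n; posOfLit = i // found the literal
      case _           => ()                            // column reference: nothing to track
    }
    println((posOfLit, numOfBucket)) // (2,4)
  }
}
```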

      posOfLit = index
    case _ => index = index + 1
  }
  Some(numOfBucket, FieldReference(arguments.take(posOfLit).map(_.describe)),
Contributor:
We know that the arguments of `bucket`/`sorted_bucket` are all `NamedReference`; how about `arguments.take(posOfLit).map(_.asInstanceOf[NamedReference])`?

      }
      Some(numOfBucket, FieldReference(arguments.take(posOfLit).map(_.describe)),
        FieldReference(arguments.drop(posOfLit + 1).map(_.describe)))
    case NamedTransform("bucket", Seq(Lit(value: Int, IntegerType), Ref(seq: Seq[String]))) =>
Contributor:
This doesn't seem right: it only matches `bucket` with a single bucket column.

Contributor (author):
It seems only a single column is supported in `BucketTransform`. Will fix this.
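The single-column bug can be reproduced in miniature: a `Seq(lit, ref)` pattern matches argument lists of exactly two elements, so a bucket transform with two or more columns falls through. Standalone sketch with `Int`/`String` standing in for `Lit`/`Ref` (hypothetical simplification):

```scala
// Why the old pattern only matched one bucket column, and how a
// varargs pattern accepts any number of columns.
object SingleColumnMatchDemo {
  def matchesOld(args: Seq[Any]): Boolean = args match {
    case Seq(_: Int, _: String) => true // exactly numBuckets + one column
    case _                      => false
  }

  def matchesFixed(args: Seq[Any]): Boolean = args match {
    case Seq(_: Int, _*) => true        // numBuckets + any number of columns
    case _               => false
  }

  def main(argv: Array[String]): Unit = {
    println(matchesOld(Seq(4, "a")))        // true
    println(matchesOld(Seq(4, "a", "b")))   // false -- the bug
    println(matchesFixed(Seq(4, "a", "b"))) // true
  }
}
```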


  override def withReferences(newReferences: Seq[NamedReference]): Transform = {
    this.copy(columns = newReferences)
  }
}

private[sql] object BucketTransform {
-  def unapply(expr: Expression): Option[(Int, FieldReference, FieldReference)] =
-    expr match {
+  def unapply(expr: Expression): Option[(Int, FieldReference, FieldReference)] = expr match {
Member:
Could you add some comments on `unapply` (if it is really used) about what it returns?


  override def withReferences(newReferences: Seq[NamedReference]): Transform = {
    this.copy(columns = newReferences)
  }
}

private[sql] object BucketTransform {
-  def unapply(expr: Expression): Option[(Int, FieldReference, FieldReference)] =
-    expr match {
+  def unapply(expr: Expression): Option[(Int, FieldReference, FieldReference)] = expr match {
Member:
BTW, why does `def unapply(expr: Expression)` address only `BucketTransform`, while `def unapply(transform: Transform)` addresses both `sorted_bucket` and `bucket`?

@viirya (Member) left a comment:

This change is not just "Add tests ...". It's better to update the title and description accordingly before merging.

@huaxingao huaxingao changed the title [SPARK-37627][SQL][FOLLOWUP] Add tests for sorted BucketTransform [SPARK-37627][SQL][FOLLOWUP] Separate SortedBucketTransform from BucketTransform Jan 10, 2022
@huaxingao (author):

> It's better to update the title and description accordingly before merging.

Updated. Thanks!

@cloud-fan (Contributor) left a comment:

LGTM if tests pass

@cloud-fan (Contributor):

thanks, merging to master!

@cloud-fan cloud-fan closed this in 2ed827a Jan 14, 2022
@huaxingao (author):

Thanks!

dchvn pushed a commit to dchvn/spark that referenced this pull request Jan 19, 2022
…etTransform


Closes apache#34914 from huaxingao/sorted_followup.

Lead-authored-by: Huaxin Gao <[email protected]>
Co-authored-by: huaxingao <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>