Commit

rename NoEliminateSort to Clustering
wecharyu committed Dec 15, 2024
1 parent bc15571 commit cec5209
Showing 6 changed files with 15 additions and 14 deletions.
@@ -959,16 +959,17 @@ case class Sort(
 }
 
 /**
- * A special Sort node whose underlying Sort would not be eliminated
- * by [[org.apache.spark.sql.catalyst.optimizer.EliminateSorts]].
+ * @param cluster The clustering expressions
+ * @param global True means global clustering apply for entire data set,
+ *               False means clustering only apply within the partition.
+ * @param child Child logical plan
  */
-case class NoEliminateSort(
-    order: Seq[SortOrder],
+case class Clustering(
+    cluster: Seq[SortOrder],
     global: Boolean,
-    child: LogicalPlan,
-    hint: Option[SortHint] = None) extends UnaryNode {
+    child: LogicalPlan) extends UnaryNode {
   override def output: Seq[Attribute] = child.output
-  override protected def withNewChildInternal(newChild: LogicalPlan): NoEliminateSort =
+  override protected def withNewChildInternal(newChild: LogicalPlan): Clustering =
     copy(child = newChild)
 }
 
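For orientation, a minimal sketch of constructing the renamed node (not part of the commit; it only assumes spark-catalyst on the classpath, and the LocalRelation child is illustrative). Clustering keeps the shape of Sort minus the hint parameter: clustering expressions, a global flag, and a child plan.

    import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference, SortOrder}
    import org.apache.spark.sql.catalyst.plans.logical.{Clustering, LocalRelation}
    import org.apache.spark.sql.types.StringType

    val col = AttributeReference("val", StringType)()
    // global = false: rows are clustered only within each partition.
    val plan = Clustering(Seq(SortOrder(col, Ascending)), global = false, LocalRelation(col))
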
@@ -993,7 +993,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       }
       case logical.Sort(sortExprs, global, child, _) =>
         execution.SortExec(sortExprs, global, planLater(child)) :: Nil
-      case logical.NoEliminateSort(sortExprs, global, child, _) =>
+      case logical.Clustering(sortExprs, global, child) =>
         execution.SortExec(sortExprs, global, planLater(child)) :: Nil
       case logical.Project(projectList, child) =>
         execution.ProjectExec(projectList, planLater(child)) :: Nil
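
Both nodes plan to the same SortExec, so the distinction is purely logical: EliminateSorts pattern-matches on Sort and may remove it, while a Clustering node passes through untouched. The simplified, hypothetical rule below illustrates the mechanism (it is not the real EliminateSorts, which has many more cases):

    import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Sort}
    import org.apache.spark.sql.catalyst.rules.Rule

    object DropAllSorts extends Rule[LogicalPlan] {
      def apply(plan: LogicalPlan): LogicalPlan = plan transform {
        // Matches only Sort; Clustering is a different type, so the
        // clustering required by V1 writes survives optimization.
        case Sort(_, _, child, _) => child
      }
    }
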
@@ -21,7 +21,7 @@ import org.apache.spark.sql.catalyst.SQLConfHelper
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Attribute, AttributeMap, AttributeSet, BitwiseAnd, Empty2Null, Expression, HiveHash, Literal, NamedExpression, Pmod, SortOrder}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, NoEliminateSort, Project}
+import org.apache.spark.sql.catalyst.plans.logical.{Clustering, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.SparkPlan
@@ -107,7 +107,7 @@ object V1Writes extends Rule[LogicalPlan] with SQLConfHelper {
     if (orderingMatched) {
       empty2NullPlan
     } else {
-      NoEliminateSort(requiredOrdering, global = false, empty2NullPlan)
+      Clustering(requiredOrdering, global = false, empty2NullPlan)
     }
   }
 }
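
For illustration, a hypothetical repro of the branch above (the table and session names are assumptions): an insert into a table partitioned by val requires rows clustered by val, and with no user-specified ORDER BY the ordering does not match, so V1Writes injects the node seen in the explain output below.

    // Assumes a SparkSession `spark` and an existing `source` table.
    spark.sql("CREATE TABLE t(key INT) PARTITIONED BY (val STRING) USING parquet")
    spark.sql("INSERT INTO t SELECT key, val FROM source")
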
@@ -1152,7 +1152,7 @@ InsertIntoHadoopFsRelationCommand file:[not included in comparison]/{warehouse_d
 == Optimized Logical Plan ==
 InsertIntoHadoopFsRelationCommand file:[not included in comparison]/{warehouse_dir}/explain_temp5, false, [val#x], Parquet, [path=file:[not included in comparison]/{warehouse_dir}/explain_temp5], Append, `spark_catalog`.`default`.`explain_temp5`, org.apache.spark.sql.execution.datasources.CatalogFileIndex(file:[not included in comparison]/{warehouse_dir}/explain_temp5), [key, val]
 +- WriteFiles
-   +- NoEliminateSort [val#x ASC NULLS FIRST], false
+   +- Clustering [val#x ASC NULLS FIRST], false
       +- Project [key#x, empty2null(val#x) AS val#x]
          +- Relation spark_catalog.default.explain_temp4[key#x,val#x] parquet
 
@@ -1044,7 +1044,7 @@ InsertIntoHadoopFsRelationCommand file:[not included in comparison]/{warehouse_d
 == Optimized Logical Plan ==
 InsertIntoHadoopFsRelationCommand file:[not included in comparison]/{warehouse_dir}/explain_temp5, false, [val#x], Parquet, [path=file:[not included in comparison]/{warehouse_dir}/explain_temp5], Append, `spark_catalog`.`default`.`explain_temp5`, org.apache.spark.sql.execution.datasources.CatalogFileIndex(file:[not included in comparison]/{warehouse_dir}/explain_temp5), [key, val]
 +- WriteFiles
-   +- NoEliminateSort [val#x ASC NULLS FIRST], false
+   +- Clustering [val#x ASC NULLS FIRST], false
       +- Project [key#x, empty2null(val#x) AS val#x]
          +- Relation spark_catalog.default.explain_temp4[key#x,val#x] parquet
 
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources
 
 import org.apache.spark.sql.{QueryTest, Row}
 import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference, NullsFirst, SortOrder}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, NoEliminateSort, Sort}
+import org.apache.spark.sql.catalyst.plans.logical.{Clustering, LogicalPlan, Sort}
 import org.apache.spark.sql.execution.{QueryExecution, SortExec}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
 import org.apache.spark.sql.internal.SQLConf
@@ -94,7 +94,7 @@ trait V1WriteCommandSuiteBase extends SQLTestUtils {
     // Check whether exists a logical sort node of the write query.
     // If user specified sort matches required ordering, the sort node may not at the top of query.
     def isSort(plan: LogicalPlan): Boolean =
-      plan.isInstanceOf[Sort] || plan.isInstanceOf[NoEliminateSort]
+      plan.isInstanceOf[Sort] || plan.isInstanceOf[Clustering]
     assert(optimizedPlan.exists(isSort(_)) == hasLogicalSort,
       s"Expect hasLogicalSort: $hasLogicalSort, Actual: ${optimizedPlan.exists(isSort(_))}")
 
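A hypothetical use of this helper in a concrete test, assuming the suite's surrounding executeAndCheckOrdering method is unchanged: a partitioned write with no user ORDER BY still contains a logical "sort" node, now in the form of Clustering, so hasLogicalSort stays true after the rename.

    // Sketch: the optimized plan contains Clustering rather than Sort,
    // and isSort above accepts either.
    executeAndCheckOrdering(hasLogicalSort = true, orderingMatched = true) {
      sql("INSERT INTO t SELECT key, val FROM source")
    }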