disable filter pushdown (#1)
stefankandic authored Jan 23, 2024
1 parent 76702c4 commit 5f6b579
Showing 6 changed files with 55 additions and 4 deletions.
@@ -279,4 +279,22 @@ object DataSourceUtils extends PredicateHelper {
       dataFilters.flatMap(extractPredicatesWithinOutputSet(_, partitionSet))
     (ExpressionSet(partitionFilters ++ extraPartitionFilter).toSeq, dataFilters)
   }
+
+  /**
+   * Determines whether a filter should be pushed down to the data source or not.
+   *
+   * @param expression The filter expression to be evaluated.
+   * @return A boolean indicating whether the filter should be pushed down or not.
+   */
+  def shouldPushFilter(expression: Expression): Boolean = expression match {
+    case attr: AttributeReference =>
+      attr.dataType match {
+        // don't push down filters for string columns with non-default collation
+        // as it could lead to incorrect results
+        case st: StringType => st.isDefaultCollation
+        case _ => true
+      }
+
+    case _ => expression.children.forall(shouldPushFilter)
+  }
 }
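
As a quick orientation for readers of the diff, here is a minimal sketch of how shouldPushFilter behaves on concrete Catalyst expressions. The column names are hypothetical, and the collated case is only described in a comment because constructing a non-default-collation StringType is specific to this branch's API:

    import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
    import org.apache.spark.sql.execution.datasources.DataSourceUtils
    import org.apache.spark.sql.types.{IntegerType, StringType}

    // Hypothetical columns: an integer column and a default-collation string column.
    val id = AttributeReference("id", IntegerType)()
    val name = AttributeReference("name", StringType)()

    DataSourceUtils.shouldPushFilter(EqualTo(id, Literal(1)))        // true: no strings involved
    DataSourceUtils.shouldPushFilter(EqualTo(name, Literal("abc")))  // true: default collation
    // For an attribute typed STRING COLLATE 'sr_ci_ai', isDefaultCollation is false,
    // so the AttributeReference case returns false and children.forall rejects
    // the enclosing predicate as a whole.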
@@ -160,8 +160,11 @@ object FileSourceStrategy extends Strategy with PredicateHelper with Logging {
     // - filters that need to be evaluated again after the scan
     val filterSet = ExpressionSet(filters)
 
+    val deterministicFiltersToPush = filters
+      .filter(_.deterministic)
+      .filter(f => DataSourceUtils.shouldPushFilter(f))
     val normalizedFilters = DataSourceStrategy.normalizeExprs(
-      filters.filter(_.deterministic), l.output)
+      deterministicFiltersToPush, l.output)
 
     val partitionColumns =
       l.resolve(
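
The reordering above is easier to see on data: a hedged sketch (hypothetical filters, this branch's DataSourceUtils) of the two-stage filtering that now feeds normalizeExprs:

    import org.apache.spark.sql.catalyst.expressions._
    import org.apache.spark.sql.execution.datasources.DataSourceUtils
    import org.apache.spark.sql.types.IntegerType

    val a = AttributeReference("a", IntegerType)()
    val filters: Seq[Expression] = Seq(
      EqualTo(a, Literal(1)),                                 // deterministic and pushable
      GreaterThan(MonotonicallyIncreasingID(), Literal(10L))  // non-deterministic
    )

    val deterministicFiltersToPush = filters
      .filter(_.deterministic)
      .filter(f => DataSourceUtils.shouldPushFilter(f))
    // => Seq(EqualTo(a, Literal(1))); a predicate over a collated string column
    //    would survive the first filter but be dropped by the second.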
@@ -63,7 +63,8 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] {
           _))
         if filters.nonEmpty && fsRelation.partitionSchema.nonEmpty =>
       val normalizedFilters = DataSourceStrategy.normalizeExprs(
-        filters.filter(f => f.deterministic && !SubqueryExpression.hasSubquery(f)),
+        filters.filter(f => f.deterministic && !SubqueryExpression.hasSubquery(f)
+          && DataSourceUtils.shouldPushFilter(f)),
         logicalRelation.output)
       val (partitionKeyFilters, _) = DataSourceUtils
         .getPartitionFiltersAndDataFilters(partitionSchema, normalizedFilters)
@@ -380,7 +380,7 @@ private[parquet] class ParquetRowConverter(
         throw QueryExecutionErrors.cannotCreateParquetConverterForDecimalTypeError(
           t, parquetType.toString)
 
-    case StringType =>
+    case _: StringType =>
       new ParquetStringConverter(updater)
 
     // As long as the parquet type is INT64 timestamp, whether logical annotation
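
The one-character change above carries the weight of a type-hierarchy change: StringType stops being a single value and becomes a class with one instance per collation. A standalone toy model (not Spark code) of why the pattern had to change:

    // Toy model: a class with many instances plus a default companion instance.
    class StringType(val collationId: Int)
    object StringType extends StringType(0)

    def describe(dt: Any): String = dt match {
      // `case StringType` would be an equality check against the companion object
      // and would miss collated instances; `_: StringType` matches them all.
      case st: StringType => s"string type, collationId=${st.collationId}"
      case _ => "something else"
    }

    describe(StringType)          // "string type, collationId=0"
    describe(new StringType(42))  // "string type, collationId=42"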
@@ -70,7 +70,9 @@ abstract class FileScanBuilder(
   }
 
   override def pushFilters(filters: Seq[Expression]): Seq[Expression] = {
-    val (deterministicFilters, nonDeterminsticFilters) = filters.partition(_.deterministic)
+    val (deterministicFilters, nonDeterminsticFilters) = filters
+      .filter(f => DataSourceUtils.shouldPushFilter(f))
+      .partition(_.deterministic)
     val (partitionFilters, dataFilters) =
       DataSourceUtils.getPartitionFiltersAndDataFilters(partitionSchema, deterministicFilters)
     this.partitionFilters = partitionFilters
27 changes: 27 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala
@@ -308,4 +308,31 @@ class CollationSuite extends QueryTest
       checkAnswer(sql(s"SELECT COUNT(DISTINCT c1) FROM $tableName"), Seq(Row(1)))
     }
   }
+
+  test("disable filter pushdown") {
+    val tableName = "parquet_dummy_t2"
+    val collation = "'sr_ci_ai'"
+    withTable(tableName) {
+      spark.sql(
+        s"""
+           | CREATE TABLE $tableName(c1 STRING COLLATE $collation) USING PARQUET
+           |""".stripMargin)
+      spark.sql(s"INSERT INTO $tableName VALUES ('aaa')")
+      spark.sql(s"INSERT INTO $tableName VALUES ('AAA')")
+
+      val filters = Seq(
+        (">=", Seq(Row("aaa"), Row("AAA"))),
+        ("<=", Seq(Row("aaa"), Row("AAA"))),
+        (">", Seq()),
+        ("<", Seq()),
+        ("!=", Seq())
+      )
+
+      filters.foreach { filter =>
+        val df = sql(s"SELECT * FROM $tableName WHERE c1 ${filter._1} collate('aaa', $collation)")
+        assert(df.queryExecution.toString().contains("PushedFilters: []"))
+        checkAnswer(df, filter._2)
+      }
+    }
+  }
 }
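
The expected rows follow from the collation itself: under sr_ci_ai (Serbian, case- and accent-insensitive) 'aaa' and 'AAA' compare as equal, which is exactly what a binary-ordered Parquet pushdown would get wrong. A rough standalone approximation using the JDK collator:

    import java.text.Collator
    import java.util.Locale

    // PRIMARY strength ignores case and accent differences, approximating sr_ci_ai.
    val collator = Collator.getInstance(new Locale("sr"))
    collator.setStrength(Collator.PRIMARY)

    collator.compare("aaa", "AAA")  // 0: equal under the collation
    // Hence c1 >= 'aaa' and c1 <= 'aaa' return both rows, while c1 > 'aaa',
    // c1 < 'aaa' and c1 != 'aaa' return nothing.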
