SNOW-824475 Support Spark 3.4 #510

Merged · 1 commit · May 22, 2023
6 changes: 3 additions & 3 deletions .github/workflows/ClusterTest.yml
@@ -13,13 +13,13 @@ jobs:
strategy:
matrix:
scala_version: [ '2.12.11' ]
spark_version: [ '3.3.0' ]
spark_version: [ '3.4.0' ]
use_copy_unload: [ 'true' ]
cloud_provider: [ 'gcp' ]
env:
SNOWFLAKE_TEST_CONFIG_SECRET: ${{ secrets.SNOWFLAKE_TEST_CONFIG_SECRET }}
TEST_SPARK_VERSION: '3.3'
DOCKER_IMAGE_TAG: 'snowflakedb/spark-base:3.3.0'
TEST_SPARK_VERSION: '3.4'
DOCKER_IMAGE_TAG: 'snowflakedb/spark-base:3.4.0'
TEST_SCALA_VERSION: '2.12'
TEST_COMPILE_SCALA_VERSION: '2.12.11'
TEST_SPARK_CONNECTOR_VERSION: '2.11.3'
2 changes: 1 addition & 1 deletion .github/workflows/IntegrationTest_2.12.yml
@@ -13,7 +13,7 @@ jobs:
strategy:
matrix:
scala_version: [ '2.12.11' ]
spark_version: [ '3.3.0' ]
spark_version: [ '3.4.0' ]
use_copy_unload: [ 'true', 'false' ]
cloud_provider: [ 'aws', 'azure' ]
# run_query_in_async can be removed after async mode is stable
2 changes: 1 addition & 1 deletion .github/workflows/IntegrationTest_2.13.yml
@@ -13,7 +13,7 @@ jobs:
strategy:
matrix:
scala_version: [ '2.13.9' ]
spark_version: [ '3.3.0' ]
spark_version: [ '3.4.0' ]
use_copy_unload: [ 'true', 'false' ]
cloud_provider: [ 'aws', 'azure' ]
# run_query_in_async can be removed after async mode is stable
2 changes: 1 addition & 1 deletion .github/workflows/IntegrationTest_gcp_2.12.yml
@@ -13,7 +13,7 @@ jobs:
strategy:
matrix:
scala_version: [ '2.12.11' ]
spark_version: [ '3.3.0' ]
spark_version: [ '3.4.0' ]
use_copy_unload: [ 'false' ]
cloud_provider: [ 'gcp' ]
# run_query_in_async can be removed after async mode is stable
2 changes: 1 addition & 1 deletion .github/workflows/IntegrationTest_gcp_2.13.yml
@@ -13,7 +13,7 @@ jobs:
strategy:
matrix:
scala_version: [ '2.13.9' ]
spark_version: [ '3.3.0' ]
spark_version: [ '3.4.0' ]
use_copy_unload: [ 'false' ]
cloud_provider: [ 'gcp' ]
# run_query_in_async can be removed after async mode is stable
2 changes: 1 addition & 1 deletion ClusterTest/build.sbt
@@ -16,7 +16,7 @@

val sparkConnectorVersion = "2.11.3"
val scalaVersionMajor = "2.12"
val sparkVersionMajor = "3.3"
val sparkVersionMajor = "3.4"
val sparkVersion = s"${sparkVersionMajor}.0"
val testSparkVersion = sys.props.get("spark.testVersion").getOrElse(sparkVersion)

6 changes: 3 additions & 3 deletions build.sbt
@@ -16,8 +16,8 @@

import scala.util.Properties

val sparkVersion = "3.3"
val testSparkVersion = sys.props.get("spark.testVersion").getOrElse("3.3.0")
val sparkVersion = "3.4"
val testSparkVersion = sys.props.get("spark.testVersion").getOrElse("3.4.0")

/*
* Don't change the variable name "sparkConnectorVersion" because
@@ -41,7 +41,7 @@ lazy val root = project.withId("spark-snowflake").in(file("."))
.settings(
name := "spark-snowflake",
organization := "net.snowflake",
version := s"${sparkConnectorVersion}-spark_3.3",
version := s"${sparkConnectorVersion}-spark_3.4",
scalaVersion := sys.props.getOrElse("SPARK_SCALA_VERSION", default = "2.12.11"),
// Spark 3.2 supports scala 2.12 and 2.13
crossScalaVersions := Seq("2.12.11", "2.13.9"),
@@ -334,15 +334,22 @@ class SimpleNewPushdownIntegrationSuite extends IntegrationSuiteBase {
var result =
sparkSession.sql(s"select o $operator p from df2 where o IS NOT NULL")

testPushdown(
val expectedAdditionQueries = Seq(
s"""SELECT ( CAST ( ( "SUBQUERY_1"."O" $operator "SUBQUERY_1"."P" )
|AS DECIMAL(38, 0) ) ) AS "SUBQUERY_2_COL_0" FROM
|( SELECT * FROM ( SELECT * FROM ( $test_table2 ) AS
|"SF_CONNECTOR_QUERY_ALIAS" ) AS "SUBQUERY_0" WHERE
|( "SUBQUERY_0"."O" IS NOT NULL ) ) AS "SUBQUERY_1"
|AS DECIMAL(38, 0) ) ) AS "SUBQUERY_2_COL_0" FROM
|( SELECT * FROM ( SELECT * FROM ( $test_table2 ) AS
|"SF_CONNECTOR_QUERY_ALIAS" ) AS "SUBQUERY_0" WHERE
|( "SUBQUERY_0"."O" IS NOT NULL ) ) AS "SUBQUERY_1"
""".stripMargin,
result,
// Data in df2 (o, p) values(null, 1), (2, 2), (3, 2), (4, 3)
// From Spark 3.4, the CAST operation is not present in the plan
s"""SELECT ( ( "SUBQUERY_1"."O" $operator "SUBQUERY_1"."P" ) ) AS "SUBQUERY_2_COL_0" FROM
|( SELECT * FROM ( SELECT * FROM ( $test_table2 ) AS
|"SF_CONNECTOR_QUERY_ALIAS" ) AS "SUBQUERY_0" WHERE
|( "SUBQUERY_0"."O" IS NOT NULL ) ) AS "SUBQUERY_1"
""".stripMargin
)

testPushdownMultiplefQueries(expectedAdditionQueries, result,
Seq(Row(4), Row(5), Row(7)),
disablePushDown
)
@@ -352,14 +359,22 @@ class SimpleNewPushdownIntegrationSuite extends IntegrationSuiteBase {
result =
sparkSession.sql(s"select o $operator p from df2 where o IS NOT NULL")

testPushdown(
val expectedSubtractQueries = Seq(
s"""SELECT ( CAST ( ( "SUBQUERY_1"."O" $operator "SUBQUERY_1"."P" )
|AS DECIMAL(38, 0) ) ) AS "SUBQUERY_2_COL_0" FROM
|( SELECT * FROM ( SELECT * FROM ( $test_table2 ) AS
|"SF_CONNECTOR_QUERY_ALIAS" ) AS "SUBQUERY_0" WHERE
|( "SUBQUERY_0"."O" IS NOT NULL ) ) AS "SUBQUERY_1"
|AS DECIMAL(38, 0) ) ) AS "SUBQUERY_2_COL_0" FROM
|( SELECT * FROM ( SELECT * FROM ( $test_table2 ) AS
|"SF_CONNECTOR_QUERY_ALIAS" ) AS "SUBQUERY_0" WHERE
|( "SUBQUERY_0"."O" IS NOT NULL ) ) AS "SUBQUERY_1"
""".stripMargin,
result,
// From Spark 3.4, the CAST operation is not present in the plan
s"""SELECT ( ( "SUBQUERY_1"."O" $operator "SUBQUERY_1"."P" ) ) AS "SUBQUERY_2_COL_0" FROM
|( SELECT * FROM ( SELECT * FROM ( $test_table2 ) AS
|"SF_CONNECTOR_QUERY_ALIAS" ) AS "SUBQUERY_0" WHERE
|( "SUBQUERY_0"."O" IS NOT NULL ) ) AS "SUBQUERY_1"
""".stripMargin
)

testPushdownMultiplefQueries(expectedSubtractQueries, result,
// Data in df2 (o, p) values(null, 1), (2, 2), (3, 2), (4, 3)
Seq(Row(0), Row(1), Row(1)),
disablePushDown
@@ -370,15 +385,22 @@ class SimpleNewPushdownIntegrationSuite extends IntegrationSuiteBase {
result =
sparkSession.sql(s"select o $operator p from df2 where o IS NOT NULL")

testPushdown(
val expectedMultiplyQueries = Seq(
s"""SELECT ( CAST ( ( "SUBQUERY_1"."O" $operator "SUBQUERY_1"."P" )
|AS DECIMAL(38, 0) ) ) AS "SUBQUERY_2_COL_0" FROM
|( SELECT * FROM ( SELECT * FROM ( $test_table2 ) AS
|"SF_CONNECTOR_QUERY_ALIAS" ) AS "SUBQUERY_0" WHERE
|( "SUBQUERY_0"."O" IS NOT NULL ) ) AS "SUBQUERY_1"
|AS DECIMAL(38, 0) ) ) AS "SUBQUERY_2_COL_0" FROM
|( SELECT * FROM ( SELECT * FROM ( $test_table2 ) AS
|"SF_CONNECTOR_QUERY_ALIAS" ) AS "SUBQUERY_0" WHERE
|( "SUBQUERY_0"."O" IS NOT NULL ) ) AS "SUBQUERY_1"
""".stripMargin,
result,
// Data in df2 (o, p) values(null, 1), (2, 2), (3, 2), (4, 3)
// From Spark 3.4, the CAST operation is not present in the plan
s"""SELECT ( ( "SUBQUERY_1"."O" $operator "SUBQUERY_1"."P" ) ) AS "SUBQUERY_2_COL_0" FROM
|( SELECT * FROM ( SELECT * FROM ( $test_table2 ) AS
|"SF_CONNECTOR_QUERY_ALIAS" ) AS "SUBQUERY_0" WHERE
|( "SUBQUERY_0"."O" IS NOT NULL ) ) AS "SUBQUERY_1"
""".stripMargin
Collaborator:
If I understand correctly, we issue two SQLs now instead of one for this push down? Is that a performance issue?

Contributor (author):
This is test-only and doesn't cause any performance problem in production.

  1. The generated query may differ between Spark versions.
  2. To make the test case work across Spark versions, expectedMultiplyQueries includes both the query for Spark 3.4 and the one for earlier versions. If the generated query matches either one, the test case succeeds.
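For illustration only, a minimal sketch of how a helper along the lines of testPushdownMultiplefQueries could accept several expected query texts and pass when the generated query matches any of them (hypothetical code, not the connector's actual implementation):

// Sketch: compare the pushdown query generated by the connector against a set of
// acceptable query texts, ignoring whitespace differences between them.
def matchesAnyExpectedQuery(generatedQuery: String, expectedQueries: Seq[String]): Boolean = {
  def normalize(s: String): String = s.replaceAll("\\s+", " ").trim
  expectedQueries.exists(expected => normalize(expected) == normalize(generatedQuery))
}

// Usage idea: assert(matchesAnyExpectedQuery(capturedQuery, expectedMultiplyQueries)),
// where capturedQuery stands for whatever query text the test harness recorded.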

)

testPushdownMultiplefQueries(expectedMultiplyQueries, result,
Seq(Row(4), Row(6), Row(12)),
disablePushDown
)
@@ -388,15 +410,22 @@ class SimpleNewPushdownIntegrationSuite extends IntegrationSuiteBase {
result =
sparkSession.sql(s"select o $operator p from df2 where o IS NOT NULL")

testPushdown(
val expectedDivisionQueries = Seq(
s"""SELECT ( CAST ( ( "SUBQUERY_1"."O" $operator "SUBQUERY_1"."P" )
|AS DECIMAL(38, 6) ) ) AS "SUBQUERY_2_COL_0" FROM
|( SELECT * FROM ( SELECT * FROM ( $test_table2 ) AS
|"SF_CONNECTOR_QUERY_ALIAS" ) AS "SUBQUERY_0" WHERE
|( "SUBQUERY_0"."O" IS NOT NULL ) ) AS "SUBQUERY_1"
|AS DECIMAL(38, 6) ) ) AS "SUBQUERY_2_COL_0" FROM
|( SELECT * FROM ( SELECT * FROM ( $test_table2 ) AS
|"SF_CONNECTOR_QUERY_ALIAS" ) AS "SUBQUERY_0" WHERE
|( "SUBQUERY_0"."O" IS NOT NULL ) ) AS "SUBQUERY_1"
""".stripMargin,
result,
// Data in df2 (o, p) values(null, 1), (2, 2), (3, 2), (4, 3)
// From Spark 3.4, the CAST operation is not present in the plan
s"""SELECT ( ( "SUBQUERY_1"."O" $operator "SUBQUERY_1"."P" ) ) AS "SUBQUERY_2_COL_0" FROM
|( SELECT * FROM ( SELECT * FROM ( $test_table2 ) AS
|"SF_CONNECTOR_QUERY_ALIAS" ) AS "SUBQUERY_0" WHERE
|( "SUBQUERY_0"."O" IS NOT NULL ) ) AS "SUBQUERY_1"
""".stripMargin
)

testPushdownMultiplefQueries(expectedDivisionQueries, result,
Seq(Row(1.000000), Row(1.333333), Row(1.500000)),
disablePushDown
)
@@ -406,15 +435,22 @@ class SimpleNewPushdownIntegrationSuite extends IntegrationSuiteBase {
result =
sparkSession.sql(s"select o $operator p from df2 where o IS NOT NULL")

testPushdown(
val expectedModQueries = Seq(
s"""SELECT ( CAST ( ( "SUBQUERY_1"."O" $operator "SUBQUERY_1"."P" )
|AS DECIMAL(38, 0) ) ) AS "SUBQUERY_2_COL_0" FROM
|( SELECT * FROM ( SELECT * FROM ( $test_table2 ) AS
|"SF_CONNECTOR_QUERY_ALIAS" ) AS "SUBQUERY_0" WHERE
|( "SUBQUERY_0"."O" IS NOT NULL ) ) AS "SUBQUERY_1"
|AS DECIMAL(38, 0) ) ) AS "SUBQUERY_2_COL_0" FROM
|( SELECT * FROM ( SELECT * FROM ( $test_table2 ) AS
|"SF_CONNECTOR_QUERY_ALIAS" ) AS "SUBQUERY_0" WHERE
|( "SUBQUERY_0"."O" IS NOT NULL ) ) AS "SUBQUERY_1"
""".stripMargin,
result,
// Data in df2 (o, p) values(null, 1), (2, 2), (3, 2), (4, 3)
// From Spark 3.4, the CAST operation is not present in the plan
s"""SELECT ( ( "SUBQUERY_1"."O" $operator "SUBQUERY_1"."P" ) ) AS "SUBQUERY_2_COL_0" FROM
|( SELECT * FROM ( SELECT * FROM ( $test_table2 ) AS
|"SF_CONNECTOR_QUERY_ALIAS" ) AS "SUBQUERY_0" WHERE
|( "SUBQUERY_0"."O" IS NOT NULL ) ) AS "SUBQUERY_1"
""".stripMargin
)

testPushdownMultiplefQueries(expectedModQueries, result,
Seq(Row(0), Row(1), Row(1)),
disablePushDown
)
@@ -425,18 +461,28 @@ class SimpleNewPushdownIntegrationSuite extends IntegrationSuiteBase {
s"select -o, - o + p, - o - p, - ( o + p ), - 3 + o from df2 where o IS NOT NULL"
)

testPushdown(
val expectedUnaryMinusQueries = Seq(
s"""SELECT ( - ( "SUBQUERY_1"."O" ) ) AS "SUBQUERY_2_COL_0" , ( CAST (
|( - ( "SUBQUERY_1"."O" ) + "SUBQUERY_1"."P" ) AS DECIMAL(38, 0) ) )
|AS "SUBQUERY_2_COL_1" , ( CAST ( ( - ( "SUBQUERY_1"."O" ) - "SUBQUERY_1"."P" )
|AS DECIMAL(38, 0) ) ) AS "SUBQUERY_2_COL_2" , ( - ( CAST ( ( "SUBQUERY_1"."O"
|+ "SUBQUERY_1"."P" ) AS DECIMAL(38, 0) ) ) ) AS "SUBQUERY_2_COL_3" , ( CAST ( (
|-3 + "SUBQUERY_1"."O" ) AS DECIMAL(38, 0) ) ) AS "SUBQUERY_2_COL_4" FROM ( SELECT
|* FROM ( SELECT * FROM ( $test_table2 ) AS "SF_CONNECTOR_QUERY_ALIAS" ) AS
|"SUBQUERY_0" WHERE ( "SUBQUERY_0"."O" IS NOT NULL ) ) AS "SUBQUERY_1"
|( - ( "SUBQUERY_1"."O" ) + "SUBQUERY_1"."P" ) AS DECIMAL(38, 0) ) )
|AS "SUBQUERY_2_COL_1" , ( CAST ( ( - ( "SUBQUERY_1"."O" ) - "SUBQUERY_1"."P" )
|AS DECIMAL(38, 0) ) ) AS "SUBQUERY_2_COL_2" , ( - ( CAST ( ( "SUBQUERY_1"."O"
|+ "SUBQUERY_1"."P" ) AS DECIMAL(38, 0) ) ) ) AS "SUBQUERY_2_COL_3" , ( CAST ( (
|-3 + "SUBQUERY_1"."O" ) AS DECIMAL(38, 0) ) ) AS "SUBQUERY_2_COL_4" FROM ( SELECT
|* FROM ( SELECT * FROM ( $test_table2 ) AS "SF_CONNECTOR_QUERY_ALIAS" ) AS
|"SUBQUERY_0" WHERE ( "SUBQUERY_0"."O" IS NOT NULL ) ) AS "SUBQUERY_1"
""".stripMargin,
result,
// Data in df2 (o, p) values(2, 2), (3, 2), (4, 3)
// From Spark 3.4, the CAST operation is not present in the plan
s"""SELECT ( - ( "SUBQUERY_1"."O" ) ) AS "SUBQUERY_2_COL_0" ,
| ( ( - ( "SUBQUERY_1"."O" ) + "SUBQUERY_1"."P" ) ) AS "SUBQUERY_2_COL_1" ,
| ( ( - ( "SUBQUERY_1"."O" ) - "SUBQUERY_1"."P" ) ) AS "SUBQUERY_2_COL_2" ,
| ( - ( ( "SUBQUERY_1"."O" + "SUBQUERY_1"."P" ) ) ) AS "SUBQUERY_2_COL_3" ,
| ( ( -3 + "SUBQUERY_1"."O" ) ) AS "SUBQUERY_2_COL_4"
| FROM ( SELECT * FROM ( SELECT * FROM ( $test_table2 ) AS "SF_CONNECTOR_QUERY_ALIAS" ) AS
| "SUBQUERY_0" WHERE ( "SUBQUERY_0"."O" IS NOT NULL ) ) AS "SUBQUERY_1"
""".stripMargin
)

testPushdownMultiplefQueries(expectedUnaryMinusQueries, result,
Seq(
Row(-2, 0, -4, -4, -1),
Row(-3, -1, -5, -5, 0),
@@ -1,5 +1,6 @@
package org.apache.spark.sql

import net.snowflake.spark.snowflake.{SnowflakeConnectorUtils, TestUtils}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.snowflake.{SFQueryTest, SFTestData, SFTestSessionBase}
@@ -167,23 +168,41 @@ class SFDataFrameWindowFramesSuite
window.rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing))),
Row("non_numeric", "non_numeric") :: Nil)

// The error message for 3.4 is different.
val expectedErrorMessage1 =
if (TestUtils.compareVersion(SnowflakeConnectorUtils.SUPPORT_SPARK_VERSION, "3.4") >= 0) {
"The data type of the upper bound \"STRING\" does not match the expected data type"
} else {
"The data type of the upper bound 'string' does not match the expected data type"
}
val e1 = intercept[AnalysisException](
df.select(
min("value").over(window.rangeBetween(Window.unboundedPreceding, 1))))
assert(e1.message.contains("The data type of the upper bound 'string' " +
"does not match the expected data type"))

assert(e1.message.contains(expectedErrorMessage1))

// The error message for 3.4 is different.
val expectedErrorMessage2 =
if (TestUtils.compareVersion(SnowflakeConnectorUtils.SUPPORT_SPARK_VERSION, "3.4") >= 0) {
"The data type of the lower bound \"STRING\" does not match the expected data type"
} else {
"The data type of the lower bound 'string' does not match the expected data type"
}
val e2 = intercept[AnalysisException](
df.select(
min("value").over(window.rangeBetween(-1, Window.unboundedFollowing))))
assert(e2.message.contains("The data type of the lower bound 'string' " +
"does not match the expected data type"))

assert(e2.message.contains(expectedErrorMessage2))

// The error message for 3.4 is different.
val expectedErrorMessage3 =
if (TestUtils.compareVersion(SnowflakeConnectorUtils.SUPPORT_SPARK_VERSION, "3.4") >= 0) {
"The data type of the lower bound \"STRING\" does not match the expected data type"
} else {
"The data type of the lower bound 'string' does not match the expected data type"
}
val e3 = intercept[AnalysisException](
df.select(
min("value").over(window.rangeBetween(-1, 1))))
assert(e3.message.contains("The data type of the lower bound 'string' " +
"does not match the expected data type"))
assert(e3.message.contains(expectedErrorMessage3))
}
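The version gate above relies on TestUtils.compareVersion; as a rough sketch (an assumption about its behavior, not the repo's actual code), a numeric comparison of dotted version strings could look like this:

// Returns a negative number, zero, or a positive number when `left` is lower than,
// equal to, or higher than `right`, comparing each dotted component numerically.
def compareVersion(left: String, right: String): Int = {
  val l = left.split("\\.").map(_.toInt)
  val r = right.split("\\.").map(_.toInt)
  l.zipAll(r, 0, 0).map { case (a, b) => a.compare(b) }.find(_ != 0).getOrElse(0)
}

// With SUPPORT_SPARK_VERSION = "3.4", compareVersion("3.4", "3.4") >= 0 holds,
// so the suite expects the new Spark 3.4 error message that quotes "STRING".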

test("range between should accept int/long values as boundary") {
@@ -33,7 +33,7 @@ class SFDataFrameWindowFunctionsSuite
}
}

protected lazy val sql = spark.sql _
protected def sql(sqlText: String) = spark.sql(sqlText)

override def spark: SparkSession = getSnowflakeSession()
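The switch from an eta-expanded lazy val to an explicit def (here and in the suites below) is likely needed because Spark 3.4 adds an overloaded SparkSession.sql(sqlText, args) for parameterized queries, which makes the bare eta-expansion ambiguous; a hedged sketch of the idea, not the PR author's stated rationale:

import org.apache.spark.sql.{DataFrame, SparkSession}

class SqlForwarderSketch(spark: SparkSession) {
  // protected lazy val sql = spark.sql _   // ambiguous once sql(...) has overloads (assumption)
  protected def sql(sqlText: String): DataFrame = spark.sql(sqlText) // picks the single-argument overload explicitly
}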

@@ -34,7 +34,7 @@ import org.apache.spark.unsafe.types.CalendarInterval

class SFDateFunctionsSuite extends SFQueryTest with SFTestSessionBase {
import SFTestImplicits._
protected lazy val sql = spark.sql _
protected def sql(sqlText: String) = spark.sql(sqlText)

test("function current_date") {
val df1 = Seq((1, 2), (3, 1)).toDF("a", "b")
@@ -9,7 +9,7 @@ class SnowflakeSparkUtilsSuite extends SFQueryTest with SFTestSessionBase {

import SFTestImplicits._

protected lazy val sql = spark.sql _
protected def sql(sqlText: String) = spark.sql(sqlText)

test("unit test: SnowflakeSparkUtils.getJDBCProviderName") {
assert(SnowflakeSparkUtils.getJDBCProviderName(
@@ -34,7 +34,7 @@ object SnowflakeConnectorUtils {
* Check Spark version, if Spark version matches SUPPORT_SPARK_VERSION enable PushDown,
* otherwise disable it.
*/
val SUPPORT_SPARK_VERSION = "3.3"
val SUPPORT_SPARK_VERSION = "3.4"

def checkVersionAndEnablePushdown(session: SparkSession): Boolean =
if (session.version.startsWith(SUPPORT_SPARK_VERSION)) {
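For reference, because the check uses startsWith, any 3.4.x session version enables pushdown; a tiny illustrative snippet with hypothetical values:

val supportSparkVersion = "3.4" // value of SUPPORT_SPARK_VERSION after this change
Seq("3.4.0", "3.4.1", "3.3.2").foreach { v =>
  // Only the versions beginning with "3.4" enable pushdown.
  println(s"$v -> pushdown enabled: ${v.startsWith(supportSparkVersion)}")
}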
@@ -8,6 +8,7 @@ import net.snowflake.spark.snowflake.{
SnowflakePushdownUnsupportedException,
SnowflakeSQLStatement
}
import org.apache.spark.sql.catalyst.expressions.EvalMode.LEGACY
import org.apache.spark.sql.catalyst.expressions.{
Alias,
Ascending,
@@ -47,7 +48,13 @@ private[querygeneration] object MiscStatement {
// override val ansiEnabled: Boolean = SQLConf.get.ansiEnabled
// So support to pushdown, if ansiEnabled is false.
// https://github.com/apache/spark/commit/6f51e37eb52f21b50c8d7b15c68bf9969fee3567
case Cast(child, t, _, ansiEnabled) if !ansiEnabled =>
// Spark 3.4 changed the last argument type:
// https://github.com/apache/spark/commit/f8d51b9940b5f1f7c1f37693b10931cbec0a4741
// - Old type: ansiEnabled: Boolean = SQLConf.get.ansiEnabled
// - New type: evalMode: EvalMode.Value = EvalMode.fromSQLConf(SQLConf.get)
// Currently, there are 3 modes: LEGACY, ANSI, TRY.
// Support pushdown only if the mode is LEGACY.
case Cast(child, t, _, evalMode) if evalMode == LEGACY =>
getCastType(t) match {
case Some(cast) =>
// For known unsupported data conversion, raise exception to break the
@@ -112,7 +119,10 @@ private[querygeneration] object MiscStatement {
// joinCond: Seq[Expression] = Seq.empty
// So support to pushdown, if joinCond is empty.
// https://github.com/apache/spark/commit/806da9d6fae403f88aac42213a58923cf6c2cb05
case ScalarSubquery(subquery, _, _, joinCond) if joinCond.isEmpty =>
// Spark 3.4 introduces a join hint. The join hint doesn't affect correctness,
// so it can be ignored in the pushdown process.
// https://github.com/apache/spark/commit/0fa9c554fc0b3940a47c3d1c6a5a17ca9a8cee8e
case ScalarSubquery(subquery, _, _, joinCond, _) if joinCond.isEmpty =>
blockStatement(new QueryBuilder(subquery).statement)

case UnscaledValue(child) =>