Skip to content

Commit

Permalink
SNOW-824475 Support Spark 3.4 (#510)
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-mrui authored May 22, 2023
1 parent 78294d0 commit 463678d
Show file tree
Hide file tree
Showing 16 changed files with 168 additions and 72 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/ClusterTest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ jobs:
strategy:
matrix:
scala_version: [ '2.12.11' ]
spark_version: [ '3.3.0' ]
spark_version: [ '3.4.0' ]
use_copy_unload: [ 'true' ]
cloud_provider: [ 'gcp' ]
env:
SNOWFLAKE_TEST_CONFIG_SECRET: ${{ secrets.SNOWFLAKE_TEST_CONFIG_SECRET }}
TEST_SPARK_VERSION: '3.3'
DOCKER_IMAGE_TAG: 'snowflakedb/spark-base:3.3.0'
TEST_SPARK_VERSION: '3.4'
DOCKER_IMAGE_TAG: 'snowflakedb/spark-base:3.4.0'
TEST_SCALA_VERSION: '2.12'
TEST_COMPILE_SCALA_VERSION: '2.12.11'
TEST_SPARK_CONNECTOR_VERSION: '2.11.3'
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/IntegrationTest_2.12.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ jobs:
strategy:
matrix:
scala_version: [ '2.12.11' ]
spark_version: [ '3.3.0' ]
spark_version: [ '3.4.0' ]
use_copy_unload: [ 'true', 'false' ]
cloud_provider: [ 'aws', 'azure' ]
# run_query_in_async can be removed after async mode is stable
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/IntegrationTest_2.13.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ jobs:
strategy:
matrix:
scala_version: [ '2.13.9' ]
spark_version: [ '3.3.0' ]
spark_version: [ '3.4.0' ]
use_copy_unload: [ 'true', 'false' ]
cloud_provider: [ 'aws', 'azure' ]
# run_query_in_async can be removed after async mode is stable
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/IntegrationTest_gcp_2.12.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ jobs:
strategy:
matrix:
scala_version: [ '2.12.11' ]
spark_version: [ '3.3.0' ]
spark_version: [ '3.4.0' ]
use_copy_unload: [ 'false' ]
cloud_provider: [ 'gcp' ]
# run_query_in_async can be removed after async mode is stable
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/IntegrationTest_gcp_2.13.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ jobs:
strategy:
matrix:
scala_version: [ '2.13.9' ]
spark_version: [ '3.3.0' ]
spark_version: [ '3.4.0' ]
use_copy_unload: [ 'false' ]
cloud_provider: [ 'gcp' ]
# run_query_in_async can be removed after async mode is stable
Expand Down
2 changes: 1 addition & 1 deletion ClusterTest/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

val sparkConnectorVersion = "2.11.3"
val scalaVersionMajor = "2.12"
val sparkVersionMajor = "3.3"
val sparkVersionMajor = "3.4"
val sparkVersion = s"${sparkVersionMajor}.0"
val testSparkVersion = sys.props.get("spark.testVersion").getOrElse(sparkVersion)

Expand Down
6 changes: 3 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@

import scala.util.Properties

val sparkVersion = "3.3"
val testSparkVersion = sys.props.get("spark.testVersion").getOrElse("3.3.0")
val sparkVersion = "3.4"
val testSparkVersion = sys.props.get("spark.testVersion").getOrElse("3.4.0")

/*
* Don't change the variable name "sparkConnectorVersion" because
Expand All @@ -41,7 +41,7 @@ lazy val root = project.withId("spark-snowflake").in(file("."))
.settings(
name := "spark-snowflake",
organization := "net.snowflake",
version := s"${sparkConnectorVersion}-spark_3.3",
version := s"${sparkConnectorVersion}-spark_3.4",
scalaVersion := sys.props.getOrElse("SPARK_SCALA_VERSION", default = "2.12.11"),
// Spark 3.2 supports scala 2.12 and 2.13
crossScalaVersions := Seq("2.12.11", "2.13.9"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,15 +334,22 @@ class SimpleNewPushdownIntegrationSuite extends IntegrationSuiteBase {
var result =
sparkSession.sql(s"select o $operator p from df2 where o IS NOT NULL")

testPushdown(
val expectedAdditionQueries = Seq(
s"""SELECT ( CAST ( ( "SUBQUERY_1"."O" $operator "SUBQUERY_1"."P" )
|AS DECIMAL(38, 0) ) ) AS "SUBQUERY_2_COL_0" FROM
|( SELECT * FROM ( SELECT * FROM ( $test_table2 ) AS
|"SF_CONNECTOR_QUERY_ALIAS" ) AS "SUBQUERY_0" WHERE
|( "SUBQUERY_0"."O" IS NOT NULL ) ) AS "SUBQUERY_1"
|AS DECIMAL(38, 0) ) ) AS "SUBQUERY_2_COL_0" FROM
|( SELECT * FROM ( SELECT * FROM ( $test_table2 ) AS
|"SF_CONNECTOR_QUERY_ALIAS" ) AS "SUBQUERY_0" WHERE
|( "SUBQUERY_0"."O" IS NOT NULL ) ) AS "SUBQUERY_1"
""".stripMargin,
result,
// Data in df2 (o, p) values(null, 1), (2, 2), (3, 2), (4, 3)
// From spark 3.4, the CAST operation is not presented in the plan
s"""SELECT ( ( "SUBQUERY_1"."O" $operator "SUBQUERY_1"."P" ) ) AS "SUBQUERY_2_COL_0" FROM
|( SELECT * FROM ( SELECT * FROM ( $test_table2 ) AS
|"SF_CONNECTOR_QUERY_ALIAS" ) AS "SUBQUERY_0" WHERE
|( "SUBQUERY_0"."O" IS NOT NULL ) ) AS "SUBQUERY_1"
""".stripMargin
)

testPushdownMultiplefQueries(expectedAdditionQueries, result,
Seq(Row(4), Row(5), Row(7)),
disablePushDown
)
Expand All @@ -352,14 +359,22 @@ class SimpleNewPushdownIntegrationSuite extends IntegrationSuiteBase {
result =
sparkSession.sql(s"select o $operator p from df2 where o IS NOT NULL")

testPushdown(
val expectedSubtractQueries = Seq(
s"""SELECT ( CAST ( ( "SUBQUERY_1"."O" $operator "SUBQUERY_1"."P" )
|AS DECIMAL(38, 0) ) ) AS "SUBQUERY_2_COL_0" FROM
|( SELECT * FROM ( SELECT * FROM ( $test_table2 ) AS
|"SF_CONNECTOR_QUERY_ALIAS" ) AS "SUBQUERY_0" WHERE
|( "SUBQUERY_0"."O" IS NOT NULL ) ) AS "SUBQUERY_1"
|AS DECIMAL(38, 0) ) ) AS "SUBQUERY_2_COL_0" FROM
|( SELECT * FROM ( SELECT * FROM ( $test_table2 ) AS
|"SF_CONNECTOR_QUERY_ALIAS" ) AS "SUBQUERY_0" WHERE
|( "SUBQUERY_0"."O" IS NOT NULL ) ) AS "SUBQUERY_1"
""".stripMargin,
result,
// From spark 3.4, the CAST operation is not presented in the plan
s"""SELECT ( ( "SUBQUERY_1"."O" $operator "SUBQUERY_1"."P" ) ) AS "SUBQUERY_2_COL_0" FROM
|( SELECT * FROM ( SELECT * FROM ( $test_table2 ) AS
|"SF_CONNECTOR_QUERY_ALIAS" ) AS "SUBQUERY_0" WHERE
|( "SUBQUERY_0"."O" IS NOT NULL ) ) AS "SUBQUERY_1"
""".stripMargin
)

testPushdownMultiplefQueries(expectedSubtractQueries, result,
// Data in df2 (o, p) values(null, 1), (2, 2), (3, 2), (4, 3)
Seq(Row(0), Row(1), Row(1)),
disablePushDown
Expand All @@ -370,15 +385,22 @@ class SimpleNewPushdownIntegrationSuite extends IntegrationSuiteBase {
result =
sparkSession.sql(s"select o $operator p from df2 where o IS NOT NULL")

testPushdown(
val expectedMultiplyQueries = Seq(
s"""SELECT ( CAST ( ( "SUBQUERY_1"."O" $operator "SUBQUERY_1"."P" )
|AS DECIMAL(38, 0) ) ) AS "SUBQUERY_2_COL_0" FROM
|( SELECT * FROM ( SELECT * FROM ( $test_table2 ) AS
|"SF_CONNECTOR_QUERY_ALIAS" ) AS "SUBQUERY_0" WHERE
|( "SUBQUERY_0"."O" IS NOT NULL ) ) AS "SUBQUERY_1"
|AS DECIMAL(38, 0) ) ) AS "SUBQUERY_2_COL_0" FROM
|( SELECT * FROM ( SELECT * FROM ( $test_table2 ) AS
|"SF_CONNECTOR_QUERY_ALIAS" ) AS "SUBQUERY_0" WHERE
|( "SUBQUERY_0"."O" IS NOT NULL ) ) AS "SUBQUERY_1"
""".stripMargin,
result,
// Data in df2 (o, p) values(null, 1), (2, 2), (3, 2), (4, 3)
// From spark 3.4, the CAST operation is not presented in the plan
s"""SELECT ( ( "SUBQUERY_1"."O" $operator "SUBQUERY_1"."P" ) ) AS "SUBQUERY_2_COL_0" FROM
|( SELECT * FROM ( SELECT * FROM ( $test_table2 ) AS
|"SF_CONNECTOR_QUERY_ALIAS" ) AS "SUBQUERY_0" WHERE
|( "SUBQUERY_0"."O" IS NOT NULL ) ) AS "SUBQUERY_1"
""".stripMargin
)

testPushdownMultiplefQueries(expectedMultiplyQueries, result,
Seq(Row(4), Row(6), Row(12)),
disablePushDown
)
Expand All @@ -388,15 +410,22 @@ class SimpleNewPushdownIntegrationSuite extends IntegrationSuiteBase {
result =
sparkSession.sql(s"select o $operator p from df2 where o IS NOT NULL")

testPushdown(
val expectedDivisionQueries = Seq(
s"""SELECT ( CAST ( ( "SUBQUERY_1"."O" $operator "SUBQUERY_1"."P" )
|AS DECIMAL(38, 6) ) ) AS "SUBQUERY_2_COL_0" FROM
|( SELECT * FROM ( SELECT * FROM ( $test_table2 ) AS
|"SF_CONNECTOR_QUERY_ALIAS" ) AS "SUBQUERY_0" WHERE
|( "SUBQUERY_0"."O" IS NOT NULL ) ) AS "SUBQUERY_1"
|AS DECIMAL(38, 6) ) ) AS "SUBQUERY_2_COL_0" FROM
|( SELECT * FROM ( SELECT * FROM ( $test_table2 ) AS
|"SF_CONNECTOR_QUERY_ALIAS" ) AS "SUBQUERY_0" WHERE
|( "SUBQUERY_0"."O" IS NOT NULL ) ) AS "SUBQUERY_1"
""".stripMargin,
result,
// Data in df2 (o, p) values(null, 1), (2, 2), (3, 2), (4, 3)
// From spark 3.4, the CAST operation is not presented in the plan
s"""SELECT ( ( "SUBQUERY_1"."O" $operator "SUBQUERY_1"."P" ) ) AS "SUBQUERY_2_COL_0" FROM
|( SELECT * FROM ( SELECT * FROM ( $test_table2 ) AS
|"SF_CONNECTOR_QUERY_ALIAS" ) AS "SUBQUERY_0" WHERE
|( "SUBQUERY_0"."O" IS NOT NULL ) ) AS "SUBQUERY_1"
""".stripMargin
)

testPushdownMultiplefQueries(expectedDivisionQueries, result,
Seq(Row(1.000000), Row(1.333333), Row(1.500000)),
disablePushDown
)
Expand All @@ -406,15 +435,22 @@ class SimpleNewPushdownIntegrationSuite extends IntegrationSuiteBase {
result =
sparkSession.sql(s"select o $operator p from df2 where o IS NOT NULL")

testPushdown(
val expectedModQueries = Seq(
s"""SELECT ( CAST ( ( "SUBQUERY_1"."O" $operator "SUBQUERY_1"."P" )
|AS DECIMAL(38, 0) ) ) AS "SUBQUERY_2_COL_0" FROM
|( SELECT * FROM ( SELECT * FROM ( $test_table2 ) AS
|"SF_CONNECTOR_QUERY_ALIAS" ) AS "SUBQUERY_0" WHERE
|( "SUBQUERY_0"."O" IS NOT NULL ) ) AS "SUBQUERY_1"
|AS DECIMAL(38, 0) ) ) AS "SUBQUERY_2_COL_0" FROM
|( SELECT * FROM ( SELECT * FROM ( $test_table2 ) AS
|"SF_CONNECTOR_QUERY_ALIAS" ) AS "SUBQUERY_0" WHERE
|( "SUBQUERY_0"."O" IS NOT NULL ) ) AS "SUBQUERY_1"
""".stripMargin,
result,
// Data in df2 (o, p) values(null, 1), (2, 2), (3, 2), (4, 3)
// From spark 3.4, the CAST operation is not presented in the plan
s"""SELECT ( ( "SUBQUERY_1"."O" $operator "SUBQUERY_1"."P" ) ) AS "SUBQUERY_2_COL_0" FROM
|( SELECT * FROM ( SELECT * FROM ( $test_table2 ) AS
|"SF_CONNECTOR_QUERY_ALIAS" ) AS "SUBQUERY_0" WHERE
|( "SUBQUERY_0"."O" IS NOT NULL ) ) AS "SUBQUERY_1"
""".stripMargin
)

testPushdownMultiplefQueries(expectedModQueries, result,
Seq(Row(0), Row(1), Row(1)),
disablePushDown
)
Expand All @@ -425,18 +461,28 @@ class SimpleNewPushdownIntegrationSuite extends IntegrationSuiteBase {
s"select -o, - o + p, - o - p, - ( o + p ), - 3 + o from df2 where o IS NOT NULL"
)

testPushdown(
val expectedUnaryMinusQueries = Seq(
s"""SELECT ( - ( "SUBQUERY_1"."O" ) ) AS "SUBQUERY_2_COL_0" , ( CAST (
|( - ( "SUBQUERY_1"."O" ) + "SUBQUERY_1"."P" ) AS DECIMAL(38, 0) ) )
|AS "SUBQUERY_2_COL_1" , ( CAST ( ( - ( "SUBQUERY_1"."O" ) - "SUBQUERY_1"."P" )
|AS DECIMAL(38, 0) ) ) AS "SUBQUERY_2_COL_2" , ( - ( CAST ( ( "SUBQUERY_1"."O"
|+ "SUBQUERY_1"."P" ) AS DECIMAL(38, 0) ) ) ) AS "SUBQUERY_2_COL_3" , ( CAST ( (
|-3 + "SUBQUERY_1"."O" ) AS DECIMAL(38, 0) ) ) AS "SUBQUERY_2_COL_4" FROM ( SELECT
|* FROM ( SELECT * FROM ( $test_table2 ) AS "SF_CONNECTOR_QUERY_ALIAS" ) AS
|"SUBQUERY_0" WHERE ( "SUBQUERY_0"."O" IS NOT NULL ) ) AS "SUBQUERY_1"
|( - ( "SUBQUERY_1"."O" ) + "SUBQUERY_1"."P" ) AS DECIMAL(38, 0) ) )
|AS "SUBQUERY_2_COL_1" , ( CAST ( ( - ( "SUBQUERY_1"."O" ) - "SUBQUERY_1"."P" )
|AS DECIMAL(38, 0) ) ) AS "SUBQUERY_2_COL_2" , ( - ( CAST ( ( "SUBQUERY_1"."O"
|+ "SUBQUERY_1"."P" ) AS DECIMAL(38, 0) ) ) ) AS "SUBQUERY_2_COL_3" , ( CAST ( (
|-3 + "SUBQUERY_1"."O" ) AS DECIMAL(38, 0) ) ) AS "SUBQUERY_2_COL_4" FROM ( SELECT
|* FROM ( SELECT * FROM ( $test_table2 ) AS "SF_CONNECTOR_QUERY_ALIAS" ) AS
|"SUBQUERY_0" WHERE ( "SUBQUERY_0"."O" IS NOT NULL ) ) AS "SUBQUERY_1"
""".stripMargin,
result,
// Data in df2 (o, p) values(2, 2), (3, 2), (4, 3)
// From spark 3.4, the CAST operation is not presented in the plan
s"""SELECT ( - ( "SUBQUERY_1"."O" ) ) AS "SUBQUERY_2_COL_0" ,
| ( ( - ( "SUBQUERY_1"."O" ) + "SUBQUERY_1"."P" ) ) AS "SUBQUERY_2_COL_1" ,
| ( ( - ( "SUBQUERY_1"."O" ) - "SUBQUERY_1"."P" ) ) AS "SUBQUERY_2_COL_2" ,
| ( - ( ( "SUBQUERY_1"."O" + "SUBQUERY_1"."P" ) ) ) AS "SUBQUERY_2_COL_3" ,
| ( ( -3 + "SUBQUERY_1"."O" ) ) AS "SUBQUERY_2_COL_4"
| FROM ( SELECT * FROM ( SELECT * FROM ( $test_table2 ) AS "SF_CONNECTOR_QUERY_ALIAS" ) AS
| "SUBQUERY_0" WHERE ( "SUBQUERY_0"."O" IS NOT NULL ) ) AS "SUBQUERY_1"
""".stripMargin
)

testPushdownMultiplefQueries(expectedUnaryMinusQueries, result,
Seq(
Row(-2, 0, -4, -4, -1),
Row(-3, -1, -5, -5, 0),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.apache.spark.sql

import net.snowflake.spark.snowflake.{SnowflakeConnectorUtils, TestUtils}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.snowflake.{SFQueryTest, SFTestData, SFTestSessionBase}
Expand Down Expand Up @@ -167,23 +168,41 @@ class SFDataFrameWindowFramesSuite
window.rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing))),
Row("non_numeric", "non_numeric") :: Nil)

// The error message for 3.4 is different.
val expectedErrorMessage1 =
if (TestUtils.compareVersion(SnowflakeConnectorUtils.SUPPORT_SPARK_VERSION, "3.4") >= 0) {
"The data type of the upper bound \"STRING\" does not match the expected data type"
} else {
"The data type of the upper bound 'string' does not match the expected data type"
}
val e1 = intercept[AnalysisException](
df.select(
min("value").over(window.rangeBetween(Window.unboundedPreceding, 1))))
assert(e1.message.contains("The data type of the upper bound 'string' " +
"does not match the expected data type"))

assert(e1.message.contains(expectedErrorMessage1))

// The error message for 3.4 is different.
val expectedErrorMessage2 =
if (TestUtils.compareVersion(SnowflakeConnectorUtils.SUPPORT_SPARK_VERSION, "3.4") >= 0) {
"The data type of the lower bound \"STRING\" does not match the expected data type"
} else {
"The data type of the lower bound 'string' does not match the expected data type"
}
val e2 = intercept[AnalysisException](
df.select(
min("value").over(window.rangeBetween(-1, Window.unboundedFollowing))))
assert(e2.message.contains("The data type of the lower bound 'string' " +
"does not match the expected data type"))

assert(e2.message.contains(expectedErrorMessage2))

// The error message for 3.4 is different.
val expectedErrorMessage3 =
if (TestUtils.compareVersion(SnowflakeConnectorUtils.SUPPORT_SPARK_VERSION, "3.4") >= 0) {
"The data type of the lower bound \"STRING\" does not match the expected data type"
} else {
"The data type of the lower bound 'string' does not match the expected data type"
}
val e3 = intercept[AnalysisException](
df.select(
min("value").over(window.rangeBetween(-1, 1))))
assert(e3.message.contains("The data type of the lower bound 'string' " +
"does not match the expected data type"))
assert(e3.message.contains(expectedErrorMessage3))
}

test("range between should accept int/long values as boundary") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class SFDataFrameWindowFunctionsSuite
}
}

protected lazy val sql = spark.sql _
protected def sql(sqlText: String) = spark.sql(sqlText)

override def spark: SparkSession = getSnowflakeSession()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import org.apache.spark.unsafe.types.CalendarInterval

class SFDateFunctionsSuite extends SFQueryTest with SFTestSessionBase {
import SFTestImplicits._
protected lazy val sql = spark.sql _
protected def sql(sqlText: String) = spark.sql(sqlText)

test("function current_date") {
val df1 = Seq((1, 2), (3, 1)).toDF("a", "b")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ class SnowflakeSparkUtilsSuite extends SFQueryTest with SFTestSessionBase {

import SFTestImplicits._

protected lazy val sql = spark.sql _
protected def sql(sqlText: String) = spark.sql(sqlText)

test("unit test: SnowflakeSparkUtils.getJDBCProviderName") {
assert(SnowflakeSparkUtils.getJDBCProviderName(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ object SnowflakeConnectorUtils {
* Check Spark version, if Spark version matches SUPPORT_SPARK_VERSION enable PushDown,
* otherwise disable it.
*/
val SUPPORT_SPARK_VERSION = "3.3"
val SUPPORT_SPARK_VERSION = "3.4"

def checkVersionAndEnablePushdown(session: SparkSession): Boolean =
if (session.version.startsWith(SUPPORT_SPARK_VERSION)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import net.snowflake.spark.snowflake.{
SnowflakePushdownUnsupportedException,
SnowflakeSQLStatement
}
import org.apache.spark.sql.catalyst.expressions.EvalMode.LEGACY
import org.apache.spark.sql.catalyst.expressions.{
Alias,
Ascending,
Expand Down Expand Up @@ -47,7 +48,13 @@ private[querygeneration] object MiscStatement {
// override val ansiEnabled: Boolean = SQLConf.get.ansiEnabled
// So support to pushdown, if ansiEnabled is false.
// https://github.com/apache/spark/commit/6f51e37eb52f21b50c8d7b15c68bf9969fee3567
case Cast(child, t, _, ansiEnabled) if !ansiEnabled =>
// Spark 3.4 changed the last argument type:
// https://github.com/apache/spark/commit/f8d51b9940b5f1f7c1f37693b10931cbec0a4741
// - Old type: ansiEnabled: Boolean = SQLConf.get.ansiEnabled
// - New Type: evalMode: EvalMode.Value = EvalMode.fromSQLConf(SQLConf.get)
// Currently, there are 3 modes: LEGACY, ANSI, TRY
// support to pushdown, if the mode is LEGACY.
case Cast(child, t, _, evalMode) if evalMode == LEGACY =>
getCastType(t) match {
case Some(cast) =>
// For known unsupported data conversion, raise exception to break the
Expand Down Expand Up @@ -112,7 +119,10 @@ private[querygeneration] object MiscStatement {
// joinCond: Seq[Expression] = Seq.empty
// So support to pushdown, if joinCond is empty.
// https://github.com/apache/spark/commit/806da9d6fae403f88aac42213a58923cf6c2cb05
case ScalarSubquery(subquery, _, _, joinCond) if joinCond.isEmpty =>
// Spark 3.4 introduce join hint. The join hint doesn't affect correctness.
// So it can be ignored in the pushdown process
// https://github.com/apache/spark/commit/0fa9c554fc0b3940a47c3d1c6a5a17ca9a8cee8e
case ScalarSubquery(subquery, _, _, joinCond, _) if joinCond.isEmpty =>
blockStatement(new QueryBuilder(subquery).statement)

case UnscaledValue(child) =>
Expand Down
Loading

0 comments on commit 463678d

Please sign in to comment.