Add a shim for Databricks 11.3 spark330db [databricks] (#7152)
* introduce non330db directories
* ShimExtractValue
* GpuPredicateHelper now extends and shims PredicateHelper
* Allow passing TEST_PARALLEL to test.sh so integration tests can run on a small instance
* No need to override getSparkShimVersion with the same implementation in every shim (see the sketch after this list)
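A hedged sketch of that consolidation (ExampleShimBase is an illustrative name; the actual base trait in the plugin may differ):

    import com.nvidia.spark.rapids.{ShimLoader, ShimVersion}

    // Illustrative only: with the one-line implementation hoisted into a shared base,
    // the per-version SparkShimImpl objects inherit it instead of each repeating
    // "override def getSparkShimVersion: ShimVersion = ShimLoader.getShimVersion".
    trait ExampleShimBase {
      def getSparkShimVersion: ShimVersion = ShimLoader.getShimVersion
    }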
 
Fixes #6879 

Signed-off-by: Gera Shegalov <[email protected]>

Signed-off-by: Ahmed Hussein (amahussein) <[email protected]>
Co-authored-by: Ahmed Hussein (amahussein) <[email protected]>

Signed-off-by: Niranjan Artal <[email protected]>
Co-authored-by: Niranjan Artal <[email protected]>
gerashegalov and nartal1 authored Dec 13, 2022
1 parent 2828596 commit a42e328
Showing 78 changed files with 907 additions and 399 deletions.
4 changes: 2 additions & 2 deletions jenkins/databricks/build.sh
@@ -323,9 +323,9 @@ install_dependencies()
initialize
if [[ $SKIP_DEP_INSTALL == "1" ]]
then
echo "SKIP_DEP_INSTALL is set to $SKIP_DEP_INSTALL. Skipping dependencies."
echo "!!!! SKIP_DEP_INSTALL is set to $SKIP_DEP_INSTALL. Skipping install-file for dependencies."
else
# Install required dependencies.
echo "!!!! Installing dependendecies. Set SKIP_DEP_INSTALL=1 to speed up reruns of build.sh"
install_dependencies
fi
# Build the RAPIDS plugin by running package command for databricks
2 changes: 1 addition & 1 deletion jenkins/databricks/test.sh
@@ -144,7 +144,7 @@ export PYSP_TEST_spark_eventLog_enabled=true
mkdir -p /tmp/spark-events

## limit parallelism to avoid OOM kill
export TEST_PARALLEL=4
export TEST_PARALLEL=${TEST_PARALLEL:-4}
if [ -d "$LOCAL_JAR_PATH" ]; then
if [[ $TEST_MODE == "DEFAULT" ]]; then
## Run tests with jars in the LOCAL_JAR_PATH dir downloading from the dependency repo
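Since TEST_PARALLEL is now only a default inside test.sh, a caller can override it from the environment. A minimal usage sketch (the value 2 and the invocation style are illustrative):

    # Reduce test parallelism to fit a small instance; an exported value now
    # takes precedence over the script's default of 4.
    export TEST_PARALLEL=2
    bash jenkins/databricks/test.sh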
50 changes: 49 additions & 1 deletion pom.xml
@@ -449,6 +449,7 @@
<spark.test.version>${spark330.version}</spark.test.version>
<parquet.hadoop.version>1.12.2</parquet.hadoop.version>
<spark.shim.sources>${spark330.sources}</spark.shim.sources>
<iceberg.version>${spark330.iceberg.version}</iceberg.version>
<spark.shim.test.sources>${project.basedir}/src/test/${buildver}/scala</spark.shim.test.sources>
</properties>
<modules>
@@ -477,6 +478,7 @@
<spark.test.version>${spark331.version}</spark.test.version>
<parquet.hadoop.version>1.12.2</parquet.hadoop.version>
<spark.shim.sources>${spark331.sources}</spark.shim.sources>
<iceberg.version>${spark330.iceberg.version}</iceberg.version>
<spark.shim.test.sources>${project.basedir}/src/test/${buildver}/scala</spark.shim.test.sources>
</properties>
<modules>
@@ -505,6 +507,7 @@
<spark.test.version>${spark332.version}</spark.test.version>
<parquet.hadoop.version>1.12.2</parquet.hadoop.version>
<spark.shim.sources>${spark332.sources}</spark.shim.sources>
<iceberg.version>${spark330.iceberg.version}</iceberg.version>
<spark.shim.test.sources>${project.basedir}/src/test/${buildver}/scala</spark.shim.test.sources>
</properties>
<modules>
@@ -533,6 +536,7 @@
<spark.test.version>${spark340.version}</spark.test.version>
<parquet.hadoop.version>1.12.3</parquet.hadoop.version>
<spark.shim.sources>${spark340.sources}</spark.shim.sources>
<iceberg.version>${spark330.iceberg.version}</iceberg.version>
<spark.shim.test.sources>${project.basedir}/src/test/${buildver}/scala</spark.shim.test.sources>
</properties>
<modules>
@@ -561,6 +565,7 @@
<spark.test.version>${spark330cdh.version}</spark.test.version>
<parquet.hadoop.version>1.10.99.7.1.8.0-801</parquet.hadoop.version>
<spark.shim.sources>${spark330cdh.sources}</spark.shim.sources>
<iceberg.version>${spark330.iceberg.version}</iceberg.version>
<spark.shim.test.sources>${project.basedir}/src/test/${buildver}/scala</spark.shim.test.sources>
</properties>
<repositories>
@@ -586,6 +591,48 @@
<module>aggregator</module>
</modules>
</profile>
<profile>
<!-- Note Databricks requires 2 properties -Ddatabricks and -Dbuildver=330db -->
<!-- Note that 330db backports many features from Spark3.4.0 -->
<id>release330db</id>
<activation>
<property>
<name>buildver</name>
<value>330db</value>
</property>
</activation>
<properties>
<!-- Use Spark version, as Delta Lake provided by the Databricks environment -->
<delta.version>${spark.version}</delta.version>
<!-- Downgrade scala plugin version due to: https://github.com/sbt/sbt/issues/4305 -->
<scala.plugin.version>3.4.4</scala.plugin.version>
<shim.module.name>spark330db</shim.module.name>
<spark.version.classifier>spark330db</spark.version.classifier>
<!--
Note that we are using the Spark version for all of the Databricks dependencies as well.
The jenkins/databricks/build.sh script handles installing the jars as maven artifacts.
This is to make it easier and not have to change version numbers for each individual dependency
and deal with differences between Databricks versions
-->
<spark.version>${spark330db.version}</spark.version>
<spark.test.version>${spark330db.version}</spark.test.version>
<hadoop.client.version>3.3.1</hadoop.client.version>
<rat.consoleOutput>true</rat.consoleOutput>
<parquet.hadoop.version>1.12.0</parquet.hadoop.version>
<spark.shim.sources>${spark330db.sources}</spark.shim.sources>
<iceberg.version>${spark330.iceberg.version}</iceberg.version>
<spark.shim.test.sources>${project.basedir}/src/test/${buildver}/scala</spark.shim.test.sources>
</properties>
<modules>
<module>dist</module>
<module>integration_tests</module>
<module>shuffle-plugin</module>
<module>sql-plugin</module>
<module>tests</module>
<module>udf-compiler</module>
<module>aggregator</module>
</modules>
</profile>
<profile>
<id>udf-compiler</id>
<modules>
@@ -683,6 +730,7 @@
<spark332.version>3.3.2-SNAPSHOT</spark332.version>
<spark340.version>3.4.0-SNAPSHOT</spark340.version>
<spark330cdh.version>3.3.0.3.3.7180.0-274</spark330cdh.version>
<spark330db.version>3.3.0-databricks</spark330db.version>
<mockito.version>3.6.0</mockito.version>
<scala.plugin.version>4.3.0</scala.plugin.version>
<maven.jar.plugin.version>3.2.0</maven.jar.plugin.version>
@@ -700,7 +748,7 @@
<maven.clean.plugin.version>3.1.0</maven.clean.plugin.version>
<maven.scalastyle.skip>false</maven.scalastyle.skip>
<dist.jar.compress>true</dist.jar.compress>

<spark330.iceberg.version>0.14.1</spark330.iceberg.version>
<!--
If true, disables verification that all Shims be built as of one and the same git
commit hash. Do not use for CI!
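Per the comment in the new profile, selecting this shim requires both the -Ddatabricks property and -Dbuildver=330db. A hedged example invocation (the install goal and -DskipTests are illustrative):

    # Activate the release330db profile for a Databricks 11.3 build.
    mvn -B install -Ddatabricks -Dbuildver=330db -DskipTests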
18 changes: 16 additions & 2 deletions sql-plugin/pom.xml
@@ -504,13 +504,15 @@
<pathconvert property="spark311.sources" pathsep=",">
<dirset dir="${project.basedir}/src/main">
<patternset id="spark311+.pattern">
<include name="311+-non330db/*"/>
<include name="311+-nondb/*"/>
<include name="311until320-all/*"/>
<include name="311until320-noncdh/*"/>
<include name="311until320-nondb/*"/>
<include name="311until330-all/*"/>
<include name="311until330-nondb/*"/>
<include name="311until340-all/*"/>
<include name="311until340-non330db/*"/>
<include name="311until340-nondb/*"/>
<include name="pre320-treenode/*"/>
</patternset>
@@ -551,6 +553,7 @@
<include name="320until330-noncdh/*"/>
<include name="320until330-nondb/*"/>
<include name="320until340-all/*"/>
<include name="320until340-non330db/*"/>
<include name="delta-lake-common/*"/>
<include name="post320-treenode/*"/>
</patternset>
@@ -582,14 +585,15 @@
<pathconvert property="spark330.sources" pathsep=",">
<dirset dir="${project.basedir}/src/main">
<patternset id="spark330+.pattern">
<!-- inherit from 321+ with exceptions -->
<patternset refid="spark321+.pattern"/>
<exclude name="*until330*/*"/>

<!-- uniquely 330+ -->
<include name="330+/*"/>
<include name="330+-nondb/*"/>
<include name="330+-noncdh/*"/>
<include name="330until340/*"/>
<include name="330until340-nondb/*"/>
</patternset>
<include name="330/*"/>
</dirset>
Expand All @@ -615,7 +619,7 @@
<!-- inherit from 331+ with exceptions -->
<patternset refid="spark331+.pattern"/>
<exclude name="*until340*/*"/>

<include name="340+-and-330db/*"/>
<!-- uniquely 340+ -->
<include name="340+/*"/>
</patternset>
@@ -665,10 +669,20 @@
<exclude name="*nondb*/*"/>

<include name="311+-db/*"/>
<include name="321+-db/*"/>
</patternset>
<include name="321db/*"/>
</dirset>
</pathconvert>
<pathconvert property="spark330db.sources" pathsep=",">
<dirset dir="${project.basedir}/src/main">
<patternset refid="spark321db+.pattern"/>
<patternset refid="spark330+.pattern"/>
<include name="340+-and-330db/*"/>
<include name="330db/*"/>
<exclude name="*non330db*/*"/>
</dirset>
</pathconvert>
</target>
</configuration>
</execution>
@@ -24,7 +24,7 @@ trait ShimLeafExecNode extends LeafExecNode {
// For AQE support in Databricks, all Exec nodes implement computeStats(). This is actually
// a recursive call to traverse the entire physical plan to aggregate this number. For the
// end of the computation, this means that all LeafExecNodes must implement this method to
// avoid a stack overflow. For now, based on feedback from Databricks, Long.MaxValue is
// sufficient to satisfy this computation.
override def computeStats(): Statistics = {
Statistics(
@@ -40,4 +40,6 @@ trait ShimDataSourceV2ScanExecBase extends DataSourceV2ScanExecBase {
sizeInBytes = Long.MaxValue
)
}

def ordering: Option[Seq[org.apache.spark.sql.catalyst.expressions.SortOrder]] = None
}
@@ -16,8 +16,6 @@

package com.nvidia.spark.rapids.shims

import com.nvidia.spark.rapids._
import org.apache.spark.sql.catalyst.expressions._

object SparkShimImpl extends Spark331PlusShims with Spark320until340Shims {
override def getSparkShimVersion: ShimVersion = ShimLoader.getShimVersion
}
trait ShimExtractValue extends ExtractValue
@@ -16,15 +16,11 @@

package com.nvidia.spark.rapids.shims

import com.nvidia.spark.rapids._
import org.apache.parquet.schema.MessageType

import org.apache.spark.sql.execution.datasources.parquet.ParquetFilters

object SparkShimImpl extends Spark31XShims {

override def getSparkShimVersion: ShimVersion = ShimLoader.getShimVersion

override def hasCastFloatTimestampUpcast: Boolean = false

override def reproduceEmptyStringBug: Boolean = true
@@ -0,0 +1,39 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.shims

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.rapids._

trait ShimPredicateHelper extends PredicateHelper {
// SPARK-30027 from 3.2.0
// If one expression and its children are null intolerant, it is null intolerant.
protected def isNullIntolerant(expr: Expression): Boolean = expr match {
case e: NullIntolerant => e.children.forall(isNullIntolerant)
case _ => false
}

override protected def splitConjunctivePredicates(
condition: Expression
): Seq[Expression] = {
condition match {
case GpuAnd(cond1, cond2) =>
splitConjunctivePredicates(cond1) ++ splitConjunctivePredicates(cond2)
case other => super.splitConjunctivePredicates(condition)
}
}
}
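A minimal sketch of how the "GpuPredicateHelper now extends and shims PredicateHelper" change could be wired up, assuming the helper only needs to mix in the new trait (GpuPredicateHelperDemo and conjuncts are illustrative names):

    import com.nvidia.spark.rapids.shims.ShimPredicateHelper
    import org.apache.spark.sql.catalyst.expressions.Expression

    // Illustrative only: mixing in the shim gives a GPU-side helper Spark's
    // PredicateHelper API, so GpuAnd conjunctions are split the same way And is.
    trait GpuPredicateHelper extends ShimPredicateHelper

    object GpuPredicateHelperDemo extends GpuPredicateHelper {
      // splitConjunctivePredicates is protected; surface it for the demo.
      def conjuncts(cond: Expression): Seq[Expression] =
        splitConjunctivePredicates(cond)
    }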
@@ -16,15 +16,11 @@

package com.nvidia.spark.rapids.shims

import com.nvidia.spark.rapids._
import org.apache.parquet.schema.MessageType

import org.apache.spark.sql.execution.datasources.parquet.ParquetFilters

object SparkShimImpl extends Spark31XShims {

override def getSparkShimVersion: ShimVersion = ShimLoader.getShimVersion

override def hasCastFloatTimestampUpcast: Boolean = true

override def reproduceEmptyStringBug: Boolean = true
@@ -16,16 +16,12 @@

package com.nvidia.spark.rapids.shims

import com.nvidia.spark.rapids._
import org.apache.parquet.schema.MessageType

import org.apache.spark.sql.execution.datasources.DataSourceUtils
import org.apache.spark.sql.execution.datasources.parquet.ParquetFilters

object SparkShimImpl extends Spark31XdbShims {

override def getSparkShimVersion: ShimVersion = ShimLoader.getShimVersion

override def getParquetFilters(
schema: MessageType,
pushDownDate: Boolean,
@@ -16,16 +16,12 @@

package com.nvidia.spark.rapids.shims

import com.nvidia.spark.rapids._
import org.apache.parquet.schema.MessageType

import org.apache.spark.sql.execution.datasources.DataSourceUtils
import org.apache.spark.sql.execution.datasources.parquet.ParquetFilters

object SparkShimImpl extends Spark31XShims {

override def getSparkShimVersion: ShimVersion = ShimLoader.getShimVersion

override def getParquetFilters(
schema: MessageType,
pushDownDate: Boolean,
@@ -16,16 +16,12 @@

package com.nvidia.spark.rapids.shims

import com.nvidia.spark.rapids._
import org.apache.parquet.schema.MessageType

import org.apache.spark.sql.execution.datasources.DataSourceUtils
import org.apache.spark.sql.execution.datasources.parquet.ParquetFilters

object SparkShimImpl extends Spark31XShims {

override def getSparkShimVersion: ShimVersion = ShimLoader.getShimVersion

override def getParquetFilters(
schema: MessageType,
pushDownDate: Boolean,
@@ -45,4 +41,6 @@ object SparkShimImpl extends Spark31XShims {
override def hasCastFloatTimestampUpcast: Boolean = true

override def isCastingStringToNegDecimalScaleSupported: Boolean = true

override def reproduceEmptyStringBug: Boolean = true
}
@@ -0,0 +1,33 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.shims

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.rapids._

trait ShimPredicateHelper extends PredicateHelper {
// SPARK-30027 provides isNullIntolerant
override protected def splitConjunctivePredicates(
condition: Expression
): Seq[Expression] = {
condition match {
case GpuAnd(cond1, cond2) =>
splitConjunctivePredicates(cond1) ++ splitConjunctivePredicates(cond2)
case other => super.splitConjunctivePredicates(condition)
}
}
}