Add UDF compiler skeleton #434

Merged 1 commit on Jul 30, 2020
12 changes: 12 additions & 0 deletions docs/compatibility.md
@@ -267,3 +267,15 @@ Casting from string to timestamp currently has the following limitations.
milliseconds, with 2 digits each for hours, minutes, and seconds, and 6 digits for milliseconds.
Only timezone 'Z' (UTC) is supported. Casting unsupported formats will result in null values.

## UDF to Catalyst Expressions
To speed up UDF processing, spark-rapids introduces a udf-compiler extension that translates UDFs into Catalyst expressions.

To enable this operation on the GPU, set
[`spark.rapids.sql.udfCompiler.enabled`](configs.md#sql.udfCompiler.enabled) to `true`.
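
As a minimal sketch, the setting can also be supplied when the session is built. The extension class name and the config key are taken from this PR; the application name is a placeholder, and the plugin jars are assumed to already be on the classpath.

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: assumes the udf-compiler and sql-plugin jars are on the classpath.
val spark: SparkSession = SparkSession.builder()
  .master("local[1]")
  .appName("UdfCompilerConfigSketch") // placeholder name
  // Register the UDF compiler's resolution rule with Spark.
  .config("spark.sql.extensions", "com.nvidia.spark.udf.Plugin")
  // The compiler is disabled by default, so turn it on explicitly.
  .config("spark.rapids.sql.udfCompiler.enabled", "true")
  .getOrCreate()
```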

However, Spark may produce different results for a compiled UDF and its non-compiled counterpart. For example, for a UDF computing `x/y` where `y` happens to be `0`, the compiled Catalyst expression returns `NULL`, while the original UDF fails the entire job with a `java.lang.ArithmeticException: / by zero`.
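
The difference can be modeled in plain Scala without Spark. This is only an illustration of the two behaviors described above: `udfDivide` and `compiledDivide` are hypothetical names, and `Option` stands in for SQL `NULL`.

```scala
import scala.util.Try

object DivideSemantics {
  // What the original UDF body does on the JVM: integer division by zero throws.
  def udfDivide(x: Int, y: Int): Int = x / y

  // Hypothetical stand-in for the compiled expression's behavior:
  // a zero divisor yields no value (modeling SQL NULL) instead of an exception.
  def compiledDivide(x: Int, y: Int): Option[Int] =
    if (y == 0) None else Some(x / y)

  def main(args: Array[String]): Unit = {
    println(Try(udfDivide(1, 0)))  // Failure(java.lang.ArithmeticException: / by zero)
    println(compiledDivide(1, 0))  // None
    println(compiledDivide(6, 3))  // Some(2)
  }
}
```

In a real job the first case aborts the query, while the second silently produces a null in the output column, so downstream null handling may need to change when the compiler is enabled.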

When translating UDFs to Catalyst expressions, the supported UDF functions are limited:

| Operand type | Operation |
| ------------------------------------------------------------------- | ------------------|
1 change: 1 addition & 0 deletions docs/configs.md
@@ -49,6 +49,7 @@ Name | Description | Default Value
<a name="sql.concurrentGpuTasks"></a>spark.rapids.sql.concurrentGpuTasks|Set the number of tasks that can execute concurrently per GPU. Tasks may temporarily block when the number of concurrent tasks in the executor exceeds this amount. Allowing too many concurrent tasks on the same GPU may lead to GPU out of memory errors.|1
<a name="sql.csvTimestamps.enabled"></a>spark.rapids.sql.csvTimestamps.enabled|When set to true, enables the CSV parser to read timestamps. The default output format for Spark includes a timezone at the end. Anything except the UTC timezone is not supported. Timestamps after 2038 and before 1902 are also not supported.|false
<a name="sql.enabled"></a>spark.rapids.sql.enabled|Enable (true) or disable (false) sql operations on the GPU|true
<a name="sql.udfCompiler.enabled"></a>spark.rapids.sql.udfCompiler.enabled|When set to true, all UDFs are compiled to Catalyst expressions by Catalyst Analyzer|false
<a name="sql.explain"></a>spark.rapids.sql.explain|Explain why some parts of a query were not placed on a GPU or not. Possible values are ALL: print everything, NONE: print nothing, NOT_ON_GPU: print only parts of a query that did not go on the GPU|NONE
<a name="sql.format.csv.enabled"></a>spark.rapids.sql.format.csv.enabled|When set to false disables all csv input and output acceleration. (only input is currently supported anyways)|true
<a name="sql.format.csv.read.enabled"></a>spark.rapids.sql.format.csv.read.enabled|When set to false disables csv input acceleration|true
7 changes: 7 additions & 0 deletions pom.xml
@@ -78,9 +78,16 @@
<module>integration_tests</module>
<module>shims</module>
<module>api_validation</module>
<module>udf-compiler</module>
</modules>

<profiles>
<profile>
<id>udf-compiler</id>
<modules>
<module>udf-compiler</module>
</modules>
</profile>
<profile>
<id>alpha-features</id>
<properties>
@@ -350,6 +350,12 @@ object RapidsConf {
.booleanConf
.createWithDefault(true)

val UDF_COMPILER_ENABLED = conf("spark.rapids.sql.udfCompiler.enabled")
.doc("When set to true, all UDFs will be compiled to Catalyst expressions by Catalyst " +
"Analyzer.")
.booleanConf
.createWithDefault(false)

val INCOMPATIBLE_OPS = conf("spark.rapids.sql.incompatibleOps.enabled")
.doc("For operations that work, but are not 100% compatible with the Spark equivalent " +
"set if they should be enabled by default or disabled by default.")
@@ -752,6 +758,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

lazy val isSqlEnabled: Boolean = get(SQL_ENABLED)

lazy val isUdfCompilerEnabled: Boolean = get(UDF_COMPILER_ENABLED)

lazy val exportColumnarRdd: Boolean = get(EXPORT_COLUMNAR_RDD)

lazy val isIncompatEnabled: Boolean = get(INCOMPATIBLE_OPS)
25 changes: 25 additions & 0 deletions udf-compiler/README.md
@@ -0,0 +1,25 @@
UDF Compiler
============

How to run tests
----------------

From the `rapids-plugin-4-spark` root directory, use this command to run the `OpcodeSuite`:

```
mvn test -DwildcardSuites=com.nvidia.spark.OpcodeSuite
```

How to run spark shell
----------------------

To run the spark-shell, you need a `SPARK_HOME`, the `cudf-0.15-SNAPSHOT-cuda10-1.jar`, and the jars produced by the plugin build. The cudf jar is downloaded into the `~/.m2` directory when `mvn test` (or `mvn package`) is run. Copy the jar from there to an accessible location. In the example below, the cudf jar is assumed to be in a directory `$JARS`:

```
export SPARK_HOME=[your spark distribution directory]
export JARS=[path to cudf 0.15 jar]

$SPARK_HOME/bin/spark-shell \
--jars $JARS/cudf-0.15-SNAPSHOT-cuda10-1.jar,udf-compiler/target/rapids-4-spark-udf-0.2.0-SNAPSHOT.jar,sql-plugin/target/rapids-4-spark-sql_2.12-0.2.0-SNAPSHOT.jar \
--conf spark.sql.extensions="com.nvidia.spark.SQLPlugin,com.nvidia.spark.udf.Plugin"
```
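
Once the shell is up, a session along the following lines could be used to try the extension. This is a sketch under assumptions: `plusOne` is a hypothetical UDF, and because this PR only adds the skeleton (the entry point returns its input unchanged), the printed plans are expected to still contain the original UDF.

```scala
import org.apache.spark.sql.functions.{col, udf}

// Turn the (default-off) UDF compiler on for this session.
spark.conf.set("spark.rapids.sql.udfCompiler.enabled", "true")

// Hypothetical UDF used only for illustration.
val plusOne = udf((x: Long) => x + 1L)

// Print the parsed, analyzed, optimized, and physical plans so that
// any rewrite of the UDF would be visible.
spark.range(5).select(plusOne(col("id")).as("id_plus_one")).explain(true)
```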
137 changes: 137 additions & 0 deletions udf-compiler/pom.xml
@@ -0,0 +1,137 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright (c) 2020, NVIDIA CORPORATION.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>com.nvidia</groupId>
<artifactId>rapids-4-spark-parent</artifactId>
<version>0.2.0-SNAPSHOT</version>
</parent>
<groupId>com.nvidia</groupId>
<artifactId>rapids-4-spark-udf</artifactId>
<version>0.2.0-SNAPSHOT</version>

<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<type>test-jar</type>
<scope>test</scope>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-catalyst_${scala.binary.version}</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.nvidia</groupId>
<artifactId>rapids-4-spark-sql_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>ai.rapids</groupId>
<artifactId>cudf</artifactId>
<classifier>${cuda.version}</classifier>
</dependency>
</dependencies>

<build>
<resources>
<resource>
<!-- Include the properties file to provide the build information. -->
<directory>${project.build.directory}/extra-resources</directory>
<filtering>true</filtering>
</resource>
<resource>
<directory>${project.basedir}/..</directory>
<targetPath>META-INF</targetPath>
<includes>
<!-- The NOTICE will be taken care of by the antrun task below -->
<include>LICENSE</include>
</includes>
</resource>
</resources>
<plugins>
<plugin>
<artifactId>maven-antrun-plugin</artifactId>
<executions>
<execution>
<id>copy-notice</id>
<goals>
<goal>run</goal>
</goals>
<phase>process-resources</phase>
<configuration>
<target>
<!-- copy NOTICE-binary to NOTICE -->
<copy
todir="${project.build.directory}/classes/META-INF/"
verbose="true">
<fileset dir="${project.basedir}/..">
<include name="NOTICE-binary"/>
</fileset>
<mapper type="glob" from="*-binary" to="*"/>
</copy>
</target>
</configuration>
</execution>
</executions>
</plugin>
<!-- disable surefire as we are using scalatest only -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<skipTests>true</skipTests>
</configuration>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.scalatest</groupId>
<artifactId>scalatest-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
59 changes: 59 additions & 0 deletions udf-compiler/src/main/scala/com/nvidia/spark/udf/Plugin.scala
@@ -0,0 +1,59 @@
/*
* Copyright (c) 2019-2020, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.udf

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSessionExtensions
import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, NamedExpression, ScalaUDF}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule

import com.nvidia.spark.rapids.RapidsConf

class Plugin extends Function1[SparkSessionExtensions, Unit] with Logging {
override def apply(extensions: SparkSessionExtensions): Unit = {
logWarning("Installing rapids UDF compiler extensions to Spark. The compiler is disabled" +
s" by default. To enable it, set `${RapidsConf.UDF_COMPILER_ENABLED}` to true")
extensions.injectResolutionRule(_ => LogicalPlanRules())
}
}

case class LogicalPlanRules() extends Rule[LogicalPlan] with Logging {
def replacePartialFunc(plan: LogicalPlan): PartialFunction[Expression, Expression] = {
case d: Expression => attemptToReplaceExpression(plan, d)
}

def attemptToReplaceExpression(plan: LogicalPlan, exp: Expression): Expression = {
exp
// Review comment (PR author): Entry point of the compiler. Implementations will come in the next PR.
}

override def apply(plan: LogicalPlan): LogicalPlan = {
val conf = new RapidsConf(plan.conf)
if (conf.isUdfCompilerEnabled) {
// Review comment (PR author): Controls whether the compiler is used.
plan match {
case project: Project =>
Project(project.projectList.map(e => attemptToReplaceExpression(plan, e))
.asInstanceOf[Seq[NamedExpression]], project.child)
case x => {
x.transformExpressions(replacePartialFunc(plan))
}
}
} else {
plan
}
}
}
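
The skeleton above returns every expression unchanged. One possible shape for the entry point, sketched purely as an assumption about a follow-up implementation, is to match on `ScalaUDF` (which the file already imports) and fall back to the original expression when no translation is available. `UdfEntryPointSketch`, `attemptToReplace`, and the `translate` parameter are hypothetical names, not part of this PR.

```scala
import org.apache.spark.sql.catalyst.expressions.{Expression, ScalaUDF}

object UdfEntryPointSketch {
  // Hypothetical: `translate` stands in for the future bytecode-to-Catalyst
  // compiler and returns None when a UDF cannot be translated.
  def attemptToReplace(
      exp: Expression,
      translate: ScalaUDF => Option[Expression]): Expression =
    exp match {
      // Only Scala UDFs are candidates; keep the UDF if translation fails.
      case udf: ScalaUDF => translate(udf).getOrElse(udf)
      // Every other expression passes through untouched.
      case other => other
    }
}
```

Falling back to the original expression keeps the rule safe to install: a query never fails just because a UDF uses an unsupported operation.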
28 changes: 28 additions & 0 deletions udf-compiler/src/test/resources/log4j.properties
@@ -0,0 +1,28 @@
#
# Copyright (c) 2019, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

log4j.rootCategory=INFO, file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.append=true
log4j.appender.file.file=target/surefire-reports/scala-test-detailed-output.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n

#Just warnings for the console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
63 changes: 63 additions & 0 deletions udf-compiler/src/test/scala/com/nvidia/spark/OpcodeSuite.scala
@@ -0,0 +1,63 @@
/*
* Copyright (c) 2019-2020, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark
import org.apache.spark.SparkConf
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions.{udf => makeUdf}
import org.apache.spark.sql.{Dataset, SparkSession}
import org.scalatest.FunSuite

import org.scalatest.Assertions._
import org.apache.spark.sql.functions.{log => logalias}
import java.nio.charset.Charset

import com.nvidia.spark.rapids.RapidsConf



class OpcodeSuite extends FunSuite {

val conf: SparkConf = new SparkConf()
.set("spark.sql.extensions", "com.nvidia.spark.udf.Plugin")
.set(RapidsConf.EXPLAIN.key, "true")

val spark: SparkSession =
SparkSession.builder()
.master("local[1]")
.appName("OpcodeSuite")
.config(conf)
.getOrCreate()

import spark.implicits._

// Utility Function for checking equivalency of Dataset type
def checkEquiv[T](ds1: Dataset[T], ds2: Dataset[T]) : Unit = {
ds1.explain(true)
ds2.explain(true)
val resultdf = ds1.toDF()
val refdf = ds2.toDF()
ds1.show
ds2.show
val columns = refdf.schema.fields.map(_.name)
val selectiveDifferences = columns.map(col => refdf.select(col).except(resultdf.select(col)))
selectiveDifferences.foreach(diff => assert(diff.count == 0))
ds1.show
ds2.show
}
}
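
The comparison strategy in `checkEquiv` can be tried in isolation. The following standalone sketch needs only Spark (not the plugin) and compares a hypothetical `plusOne` UDF against the equivalent built-in expression, column by column, using the same `except` approach; `CheckEquivSketch` is an illustrative name. Note that `except` is a one-directional set difference (reference minus result), so extra or duplicated rows in the result would not be detected by this check.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object CheckEquivSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("CheckEquivSketch")
      .getOrCreate()
    import spark.implicits._

    val ds = Seq(1, 2, 3).toDS()
    // Hypothetical UDF under test and its built-in reference expression.
    val plusOne = udf((x: Int) => x + 1)
    val result = ds.withColumn("new", plusOne(col("value")))
    val ref = ds.withColumn("new", col("value") + 1)

    // For each column, count reference rows that are missing from the result.
    val missing = ref.columns.map(c => ref.select(c).except(result.select(c)).count())
    assert(missing.forall(_ == 0))

    spark.stop()
  }
}
```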