
Dedupe proxy rapids shuffle manager byte code #3602

Merged
Merge remote-tracking branch 'origin/branch-21.10' into dedupeProxyRapidsShuffleManagerByteCode
gerashegalov committed Sep 22, 2021

commit d815523605f2fb74a8383f85fb8715ee1704a57e
29 changes: 16 additions & 13 deletions dist/README.md
@@ -17,20 +17,23 @@ Files are: `com.nvidia.spark.rapids.SparkShimServiceProvider.sparkNonSnapshot`,

The new uber jar is structured like:

1. Base common classes are user visible classes. For these we use Spark 3.0.1 versions
2. META-INF/services. This is a file that has to list all the shim versions supported by this jar. The files talked about above for each profile are put into place here for uber jars.
1. Base common classes are user visible classes. For these we use Spark 3.0.1 versions because they are assumed to be
bitwise-identical to the other shims; this assumption is subject to future automatic validation.
2. META-INF/services. This is a file that has to list all the shim versions supported by this jar.
The files talked about above for each profile are put into place here for uber jars. Although we currently do not use
the [ServiceLoader API](https://docs.oracle.com/javase/8/docs/api/java/util/ServiceLoader.html), we use the same service
provider discovery mechanism.
3. META-INF base files are from 3.0.1 - maven, LICENSE, NOTICE, etc
4. shaded dependencies for Spark 3.0.1 in case the base common classes needed them.
5. Spark specific directory for each version of Spark supported in the jar. ie spark301/, spark302/, spark311/, etc.
4. Spark specific directory (aka Parallel World in the jargon of
[ParallelWorldClassloader](https://github.com/openjdk/jdk/blob/jdk8-b120/jaxws/src/share/jaxws_classes/com/sun/istack/internal/tools/ParallelWorldClassLoader.java))
for each version of Spark supported in the jar, i.e., spark301/, spark302/, spark311/, etc.

If you have to change the contents of the uber jar the following files control what goes into the base jar as classes that are not shaded.

1. `unshimmed-base.txt` - this has classes and files that should go into the base jar with their normal package name
(not shaded). This includes user visible classes (e.g., com/nvidia/spark/SQLPlugin), python files, and other files
that aren't version specific. Uses Spark 3.0.1 built jar for these base classes.

2. `unshimmed-extras.txt` - This is applied to all the individual Spark specific verson jars to pull any files that
need to go into the base of the jar and not into the Spark specific directory from all of the other Spark version jars.

3. `unshimmed-spark311.txt` - This is applied to all the Spark 3.1.1 specific verson to pull any files that need
to go into the base of the jar and not into the Spark specific directory from all of the other Spark version jars.
1. `unshimmed-common-from-spark301.txt` - this has classes and files that should go into the base jar with their normal
package name (not shaded). This includes user visible classes (e.g., com/nvidia/spark/SQLPlugin), python files,
and other files that aren't version specific. The Spark 3.0.1 built jar is used for these base classes, as explained above.
2. `unshimmed-from-each-spark3xx.txt` - This is applied to all the individual Spark specific version jars to pull
any files that need to go into the base of the jar and not into the Spark specific directory.
3. `unshimmed-spark311.txt` - This is applied to all the Spark 3.1.1 specific version jars to pull any files that need to go
into the base of the jar and not into the Spark specific directory.
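
To make the layout described above easier to picture, here is a minimal Python sketch (not part of the repository or the build) that lists the service files and the parallel-world prefixes from a locally built dist jar. The jar path and version are hypothetical placeholders, and the actual shim discovery happens inside the plugin itself; this only mirrors the idea of reading `META-INF/services` entries.

```python
# Minimal sketch: inspect a locally built uber jar to see the layout described
# in dist/README.md. The jar path below is a hypothetical placeholder.
import zipfile

DIST_JAR = "dist/target/rapids-4-spark_2.12-21.10.0-SNAPSHOT.jar"  # hypothetical

with zipfile.ZipFile(DIST_JAR) as jar:
    names = jar.namelist()

    # Service-provider style discovery: each file under META-INF/services/
    # lists the shim provider classes bundled in this jar.
    for entry in names:
        if entry.startswith("META-INF/services/") and not entry.endswith("/"):
            providers = jar.read(entry).decode("utf-8").splitlines()
            print(entry, "->", [p for p in providers if p and not p.startswith("#")])

    # Parallel-world directories: one top-level prefix per supported Spark
    # version, e.g. spark301/, spark302/, spark311/.
    worlds = sorted({n.split("/", 1)[0] for n in names if n.startswith("spark3")})
    print("parallel worlds:", worlds)
```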
251 changes: 146 additions & 105 deletions dist/pom.xml

Large diffs are not rendered by default.

File renamed without changes.
@@ -1,5 +1,4 @@
META-INF/services/**
com/nvidia/spark/rapids/*/RapidsShuffleManager*
org/apache/spark/sql/rapids/VisibleShuffleManager*
org/apache/spark/sql/rapids/shims/*/ProxyRapidsShuffleInternalManager*
spark-*-info.properties
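
The patterns above appear to be one of the unshimmed lists described in `dist/README.md`. As a rough illustration of how such globs decide which entries stay at the root of the uber jar rather than inside a Spark-specific directory, here is a short Python sketch; the entry names are hypothetical, and the real build evaluates these lists during the dist assembly with ant-style glob semantics, so `fnmatch` (whose `*` also crosses `/`) is only an approximation.

```python
# Illustrative only: roughly classify jar entries against unshimmed patterns.
# Entry names are hypothetical; the build's actual glob semantics may differ.
from fnmatch import fnmatch

PATTERNS = [
    "META-INF/services/**",
    "com/nvidia/spark/rapids/*/RapidsShuffleManager*",
    "org/apache/spark/sql/rapids/VisibleShuffleManager*",
    "spark-*-info.properties",
]

entries = [
    "META-INF/services/com.nvidia.spark.rapids.SparkShimServiceProvider.spark301",
    "com/nvidia/spark/rapids/spark301/RapidsShuffleManager.class",
    "org/apache/spark/sql/rapids/VisibleShuffleManager.class",
    "spark301/com/nvidia/spark/rapids/shims/spark301/SparkShimImpl.class",
    "spark-301-info.properties",
]

for entry in entries:
    kept = any(fnmatch(entry, p) for p in PATTERNS)
    print(("unshimmed" if kept else "parallel ") + "  " + entry)
```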
4 changes: 2 additions & 2 deletions integration_tests/run_pyspark_from_build.sh
@@ -41,12 +41,12 @@ else
if [ -d "$LOCAL_JAR_PATH" ]; then
CUDF_JARS=$(echo "$LOCAL_JAR_PATH"/cudf-*.jar)
PLUGIN_JARS=$(echo "$LOCAL_JAR_PATH"/rapids-4-spark_*.jar)
TEST_JARS=$(echo "$LOCAL_JAR_PATH"/rapids-4-spark-integration-tests*-$SPARK_SHIM_VER-*.jar)
TEST_JARS=$(echo "$LOCAL_JAR_PATH"/rapids-4-spark-integration-tests*-$SPARK_SHIM_VER*.jar)
UDF_EXAMPLE_JARS=$(echo "$LOCAL_JAR_PATH"/rapids-4-spark-udf-examples*.jar)
else
CUDF_JARS=$(echo "$SCRIPTPATH"/target/dependency/cudf-*.jar)
PLUGIN_JARS=$(echo "$SCRIPTPATH"/../dist/target/rapids-4-spark_*.jar)
TEST_JARS=$(echo "$SCRIPTPATH"/target/rapids-4-spark-integration-tests*-$SPARK_SHIM_VER-*.jar)
TEST_JARS=$(echo "$SCRIPTPATH"/target/rapids-4-spark-integration-tests*-$SPARK_SHIM_VER*.jar)
UDF_EXAMPLE_JARS=$(echo "$SCRIPTPATH"/../udf-examples/target/rapids-4-spark-udf-examples*.jar)
fi
ALL_JARS="$CUDF_JARS $PLUGIN_JARS $TEST_JARS $UDF_EXAMPLE_JARS"
146 changes: 117 additions & 29 deletions integration_tests/src/main/python/arithmetic_ops_test.py
@@ -23,6 +23,21 @@
import pyspark.sql.functions as f
from pyspark.sql.utils import IllegalArgumentException

# No overflow gens here because we just focus on verifying the fallback to CPU when
# enabling ANSI mode. But overflows will fail the tests because CPU runs raise
# exceptions.
_no_overflow_multiply_gens = [
ByteGen(min_val = 1, max_val = 10, special_cases=[]),
ShortGen(min_val = 1, max_val = 100, special_cases=[]),
IntegerGen(min_val = 1, max_val = 1000, special_cases=[]),
LongGen(min_val = 1, max_val = 3000, special_cases=[])]

def _get_overflow_df(spark, data, data_type, expr):
return spark.createDataFrame(
SparkContext.getOrCreate().parallelize([data]),
StructType([StructField('a', data_type)])
).selectExpr(expr)

decimal_gens_not_max_prec = [decimal_gen_neg_scale, decimal_gen_scale_precision,
decimal_gen_same_scale_precision, decimal_gen_64bit]

@@ -38,6 +53,19 @@ def test_addition(data_gen):
f.col('a') + f.col('b')),
conf=allow_negative_scale_of_decimal_conf)

# If it will not overflow for multiply it is good for add too
@pytest.mark.parametrize('data_gen', _no_overflow_multiply_gens, ids=idfn)
def test_addition_ansi_no_overflow(data_gen):
data_type = data_gen.data_type
assert_gpu_and_cpu_are_equal_collect(
lambda spark : binary_op_df(spark, data_gen).select(
f.col('a') + f.lit(100).cast(data_type),
f.lit(-12).cast(data_type) + f.col('b'),
f.lit(None).cast(data_type) + f.col('a'),
f.col('b') + f.lit(None).cast(data_type),
f.col('a') + f.col('b')),
conf={'spark.sql.ansi.enabled': 'true'})

@pytest.mark.parametrize('data_gen', numeric_gens + decimal_gens_not_max_prec, ids=idfn)
def test_subtraction(data_gen):
data_type = data_gen.data_type
@@ -50,6 +78,19 @@ def test_subtraction(data_gen):
f.col('a') - f.col('b')),
conf=allow_negative_scale_of_decimal_conf)

# If it will not overflow for multiply it is good for subtract too
@pytest.mark.parametrize('data_gen', _no_overflow_multiply_gens, ids=idfn)
def test_subtraction_ansi_no_overflow(data_gen):
data_type = data_gen.data_type
assert_gpu_and_cpu_are_equal_collect(
lambda spark : binary_op_df(spark, data_gen).select(
f.col('a') - f.lit(100).cast(data_type),
f.lit(-12).cast(data_type) - f.col('b'),
f.lit(None).cast(data_type) - f.col('a'),
f.col('b') - f.lit(None).cast(data_type),
f.col('a') - f.col('b')),
conf={'spark.sql.ansi.enabled': 'true'})

@pytest.mark.parametrize('data_gen', numeric_gens +
[decimal_gen_neg_scale, decimal_gen_scale_precision, decimal_gen_same_scale_precision, DecimalGen(8, 8)], ids=idfn)
def test_multiplication(data_gen):
@@ -63,22 +104,13 @@ def test_multiplication(data_gen):
f.col('a') * f.col('b')),
conf=allow_negative_scale_of_decimal_conf)

# No overflow gens here because we just focus on verifying the fallback to CPU when
# enabling ANSI mode. But overflows will fail the tests because CPU runs raise
# exceptions.
_no_overflow_multiply_gens = [
ByteGen(min_val = 1, max_val = 10, special_cases=[]),
ShortGen(min_val = 1, max_val = 100, special_cases=[]),
IntegerGen(min_val = 1, max_val = 1000, special_cases=[]),
LongGen(min_val = 1, max_val = 3000, special_cases=[])]

@allow_non_gpu('ProjectExec', 'Alias', 'CheckOverflow', 'Multiply', 'PromotePrecision', 'Cast')
@allow_non_gpu('ProjectExec', 'Alias', 'Multiply', 'Cast')
@pytest.mark.parametrize('data_gen', _no_overflow_multiply_gens, ids=idfn)
def test_multiplication_fallback_when_ansi_enabled(data_gen):
assert_gpu_fallback_collect(
lambda spark : binary_op_df(spark, data_gen).select(
f.col('a') * f.col('b')),
'ProjectExec',
'Multiply',
conf={'spark.sql.ansi.enabled': 'true'})

@pytest.mark.parametrize('data_gen', [float_gen, double_gen,
@@ -175,6 +207,25 @@ def test_unary_minus(data_gen):
lambda spark : unary_op_df(spark, data_gen).selectExpr('-a'),
conf=allow_negative_scale_of_decimal_conf)

@pytest.mark.parametrize('data_gen', _no_overflow_multiply_gens + [float_gen, double_gen] + decimal_gens, ids=idfn)
def test_unary_minus_ansi_no_overflow(data_gen):
conf = copy_and_update(allow_negative_scale_of_decimal_conf, {'spark.sql.ansi.enabled': 'true'})
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, data_gen).selectExpr('-a'),
conf=conf)

@pytest.mark.parametrize('data_type,value', [
(LongType(), LONG_MIN),
(IntegerType(), INT_MIN),
(ShortType(), SHORT_MIN),
(ByteType(), BYTE_MIN)], ids=idfn)
def test_unary_minus_ansi_overflow(data_type, value):
conf = copy_and_update(allow_negative_scale_of_decimal_conf, {'spark.sql.ansi.enabled': 'true'})
assert_gpu_and_cpu_error(
df_fun=lambda spark: _get_overflow_df(spark, [value], data_type, '-a').collect(),
conf=conf,
error_message='ArithmeticException')

# This just ends up being a pass through. There is no good way to force
# a unary positive into a plan, because it gets optimized out, but this
# verifies that we can handle it.
@@ -190,6 +241,29 @@ def test_abs(data_gen):
lambda spark : unary_op_df(spark, data_gen).selectExpr('abs(a)'),
conf=allow_negative_scale_of_decimal_conf)

# ANSI is ignored for abs prior to 3.2.0, but still okay to test it a little more.
@pytest.mark.parametrize('data_gen', _no_overflow_multiply_gens + [float_gen, double_gen] + decimal_gens, ids=idfn)
def test_abs_ansi_no_overflow(data_gen):
conf = copy_and_update(allow_negative_scale_of_decimal_conf, {'spark.sql.ansi.enabled': 'true'})
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, data_gen).selectExpr('abs(a)'),
conf=conf)

# Only run this test for Spark v3.2.0 and later to verify abs will
# throw exceptions for overflow when ANSI mode is enabled.
@pytest.mark.skipif(is_before_spark_320(), reason='SPARK-33275')
@pytest.mark.parametrize('data_type,value', [
(LongType(), LONG_MIN),
(IntegerType(), INT_MIN),
(ShortType(), SHORT_MIN),
(ByteType(), BYTE_MIN)], ids=idfn)
def test_abs_ansi_overflow(data_type, value):
conf = copy_and_update(allow_negative_scale_of_decimal_conf, {'spark.sql.ansi.enabled': 'true'})
assert_gpu_and_cpu_error(
df_fun=lambda spark: _get_overflow_df(spark, [value], data_type, 'abs(a)').collect(),
conf=conf,
error_message='ArithmeticException')

@approximate_float
@pytest.mark.parametrize('data_gen', double_gens, ids=idfn)
def test_asin(data_gen):
@@ -607,47 +681,61 @@ def test_div_overflow_no_exception_when_ansi(expr, ansi_enabled):
func=lambda spark: _get_div_overflow_df(spark, expr),
conf={'spark.sql.ansi.enabled': ansi_enabled})


def _get_add_overflow_df(spark, data, data_type, expr):
return spark.createDataFrame(
SparkContext.getOrCreate().parallelize([data]),
StructType([StructField('a', data_type)])
).selectExpr(expr)

_data_type_expr_for_add_overflow = [
([127], ByteType(), 'a + 1Y'), ([-128], ByteType(), '-1Y + a'),
([32767], ShortType(), 'a + 1S'), ([-32768], ShortType(), '-1S + a'),
([2147483647], IntegerType(), 'a + 1'), ([-2147483648], IntegerType(), '-1 + a'),
([127], ByteType(), 'a + 1Y'),
([-128], ByteType(), '-1Y + a'),
([32767], ShortType(), 'a + 1S'),
([-32768], ShortType(), '-1S + a'),
([2147483647], IntegerType(), 'a + 1'),
([-2147483648], IntegerType(), '-1 + a'),
([9223372036854775807], LongType(), 'a + 1L'),
([-9223372036854775808], LongType(), '-1L + a'),
([3.4028235E38], FloatType(), 'a + a'),
([-3.4028235E38], FloatType(), 'a + a'),
([1.7976931348623157E308], DoubleType(), 'a + a'),
([-1.7976931348623157E308], DoubleType(), 'a + a'),
([Decimal('9'*18 + 'e-0')], DecimalType(precision=18), 'a + a'),
([Decimal('-' + '9'*18 + 'e-0')], DecimalType(precision=18), 'a + a'),
]
([-1.7976931348623157E308], DoubleType(), 'a + a')]

@pytest.mark.parametrize('data,tp,expr', _data_type_expr_for_add_overflow[:12])
@pytest.mark.parametrize('data,tp,expr', _data_type_expr_for_add_overflow)
def test_add_overflow_with_ansi_enabled(data, tp, expr):
ansi_conf = {'spark.sql.ansi.enabled': 'true'}
if isinstance(tp, IntegralType):
assert_gpu_and_cpu_error(
lambda spark: _get_add_overflow_df(spark, data, tp, expr).collect(),
lambda spark: _get_overflow_df(spark, data, tp, expr).collect(),
conf=ansi_conf,
error_message='overflow')
else:
assert_gpu_and_cpu_are_equal_collect(
func=lambda spark: _get_add_overflow_df(spark, data, tp, expr),
func=lambda spark: _get_overflow_df(spark, data, tp, expr),
conf=ansi_conf)


_data_type_expr_for_sub_overflow = [
([-128], ByteType(), 'a - 1Y'),
([-32768], ShortType(), 'a -1S'),
([-2147483648], IntegerType(), 'a - 1'),
([-9223372036854775808], LongType(), 'a - 1L'),
([-3.4028235E38], FloatType(), 'a - cast(1.0 as float)'),
([-1.7976931348623157E308], DoubleType(), 'a - 1.0')]

@pytest.mark.parametrize('data,tp,expr', _data_type_expr_for_sub_overflow)
def test_subtraction_overflow_with_ansi_enabled(data, tp, expr):
ansi_conf = {'spark.sql.ansi.enabled': 'true'}
if isinstance(tp, IntegralType):
assert_gpu_and_cpu_error(
lambda spark: _get_overflow_df(spark, data, tp, expr).collect(),
conf=ansi_conf,
error_message='overflow')
else:
assert_gpu_and_cpu_are_equal_collect(
func=lambda spark: _get_overflow_df(spark, data, tp, expr),
conf=ansi_conf)

@allow_non_gpu('ProjectExec', 'Alias', 'CheckOverflow', 'Add', 'PromotePrecision', 'Cast')
@pytest.mark.parametrize('data,tp,expr', _data_type_expr_for_add_overflow[12:])
@pytest.mark.parametrize('ansi_enabled', ['false','true'])
def test_add_overflow_fallback_for_decimal(data, tp, expr, ansi_enabled):
# Spark will try to promote the precision (to 19) which GPU does not supported now.
assert_gpu_fallback_collect(
lambda spark: _get_add_overflow_df(spark, data, tp, expr),
lambda spark: _get_overflow_df(spark, data, tp, expr),
'ProjectExec',
conf={'spark.sql.ansi.enabled': ansi_enabled})
4 changes: 2 additions & 2 deletions integration_tests/src/main/python/cast_test.py
@@ -83,15 +83,15 @@ def test_cast_string_ts_valid_format():
def test_cast_string_date_fallback():
assert_gpu_fallback_collect(
# Cast back to String because this goes beyond what python can support for years
lambda spark : unary_op_df(spark, StringGen('([0-9]|-|\+){4,12}')).select(f.col('a').cast(DateType()).cast(StringType())),
lambda spark : unary_op_df(spark, StringGen('([0-9]|-|\\+){4,12}')).select(f.col('a').cast(DateType()).cast(StringType())),
'Cast')

@allow_non_gpu('ProjectExec', 'Cast', 'Alias')
@pytest.mark.skipif(is_before_spark_320(), reason="Only in Spark 3.2.0+ do we have issues with extended years")
def test_cast_string_timestamp_fallback():
assert_gpu_fallback_collect(
# Cast back to String because this goes beyond what python can support for years
lambda spark : unary_op_df(spark, StringGen('([0-9]|-|\+){4,12}')).select(f.col('a').cast(TimestampType()).cast(StringType())),
lambda spark : unary_op_df(spark, StringGen('([0-9]|-|\\+){4,12}')).select(f.col('a').cast(TimestampType()).cast(StringType())),
'Cast',
conf = {'spark.rapids.sql.castStringToTimestamp.enabled': 'true'})

6 changes: 3 additions & 3 deletions integration_tests/src/main/python/data_gen.py
@@ -176,11 +176,11 @@ def start(self, rand):
length = _MAX_CHOICES
self._start(rand, lambda : strs[rand.randrange(0, length)])

_BYTE_MIN = -(1 << 7)
_BYTE_MAX = (1 << 7) - 1
BYTE_MIN = -(1 << 7)
BYTE_MAX = (1 << 7) - 1
class ByteGen(DataGen):
"""Generate Bytes"""
def __init__(self, nullable=True, min_val =_BYTE_MIN, max_val = _BYTE_MAX, special_cases=[]):
def __init__(self, nullable=True, min_val = BYTE_MIN, max_val = BYTE_MAX, special_cases=[]):
super().__init__(ByteType(), nullable=nullable, special_cases=special_cases)
self._min_val = min_val
self._max_val = max_val
29 changes: 28 additions & 1 deletion integration_tests/src/main/python/date_time_test.py
@@ -18,7 +18,7 @@
from datetime import date, datetime, timezone
from marks import incompat, allow_non_gpu
from pyspark.sql.types import *
from spark_session import with_spark_session
from spark_session import with_spark_session, is_before_spark_311
import pyspark.sql.functions as f

# We only support literal intervals for TimeSub
@@ -178,13 +178,31 @@ def test_to_unix_timestamp(data_gen):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, data_gen).selectExpr("to_unix_timestamp(a)"))

@allow_non_gpu('ProjectExec,Alias,ToUnixTimestamp,Literal')
@pytest.mark.skipif(is_before_spark_311(), reason='SPARK-33498')
@pytest.mark.parametrize('data_gen', date_n_time_gens, ids=idfn)
def test_to_unix_timestamp_fallback(data_gen):
assert_gpu_fallback_collect(
lambda spark : unary_op_df(spark, data_gen).selectExpr("to_unix_timestamp(a)"),
'ToUnixTimestamp',
conf={'spark.sql.ansi.enabled': 'true'})

@pytest.mark.parametrize('data_gen', date_n_time_gens, ids=idfn)
def test_unix_timestamp_improved(data_gen):
conf = {"spark.rapids.sql.improvedTimeOps.enabled": "true",
"spark.sql.legacy.timeParserPolicy": "CORRECTED"}
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, data_gen).select(f.unix_timestamp(f.col('a'))), conf)

@allow_non_gpu('ProjectExec,Alias,UnixTimestamp,Literal')
@pytest.mark.skipif(is_before_spark_311(), reason='SPARK-33498')
@pytest.mark.parametrize('data_gen', date_n_time_gens, ids=idfn)
def test_unix_timestamp_fallback(data_gen):
assert_gpu_fallback_collect(
lambda spark : unary_op_df(spark, data_gen).select(f.unix_timestamp(f.col("a"))),
'UnixTimestamp',
conf={'spark.sql.ansi.enabled': 'true'})

@pytest.mark.parametrize('data_gen', date_n_time_gens, ids=idfn)
def test_to_unix_timestamp_improved(data_gen):
conf = {"spark.rapids.sql.improvedTimeOps.enabled": "true"}
@@ -207,6 +225,15 @@ def test_string_unix_timestamp(data_gen, date_form):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, data_gen, seed=1).select(f.unix_timestamp(f.col('a'), date_form)))

@allow_non_gpu('ProjectExec,Alias,GetTimestamp,Literal,Cast')
@pytest.mark.skipif(is_before_spark_311(), reason='SPARK-33498')
@pytest.mark.parametrize('data_gen', [StringGen('200[0-9]-0[1-9]-[0-2][1-8]')], ids=idfn)
def test_gettimestamp_fallback(data_gen):
assert_gpu_fallback_collect(
lambda spark : unary_op_df(spark, data_gen).select(f.to_date(f.col("a"), "yyyy-MM-dd")),
'GetTimestamp',
conf={'spark.sql.ansi.enabled': 'true'})

supported_date_formats = ['yyyy-MM-dd', 'yyyy-MM', 'yyyy/MM/dd', 'yyyy/MM', 'dd/MM/yyyy',
'MM-dd', 'MM/dd', 'dd-MM', 'dd/MM']
@pytest.mark.parametrize('date_format', supported_date_formats, ids=idfn)