Merge branch 'branch-0.5' into struct_hash
jlowe authored Apr 5, 2021
2 parents 6c1562c + 7cbbc12 commit c806ce5
Showing 56 changed files with 1,470 additions and 550 deletions.
9 changes: 0 additions & 9 deletions docs/compatibility.md
@@ -309,15 +309,6 @@ Also, the GPU does not support casting from strings containing hex values.

To enable this operation on the GPU, set
[`spark.rapids.sql.castStringToFloat.enabled`](configs.md#sql.castStringToFloat.enabled) to `true`.
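For illustration, a minimal sketch of enabling this config on a session. The session setup and data are hypothetical; only the config key comes from the text above:

    from pyspark.sql import SparkSession

    # Hypothetical session; spark.rapids.sql.castStringToFloat.enabled is the
    # documented key, everything else here is illustrative setup.
    spark = (SparkSession.builder
             .config("spark.rapids.sql.castStringToFloat.enabled", "true")
             .getOrCreate())

    # With the RAPIDS plugin active, this cast may now run on the GPU,
    # subject to the compatibility caveats in this section.
    spark.createDataFrame([("1.25",), ("3.5e2",)], ["s"]) \
         .selectExpr("CAST(s AS DOUBLE)").show()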

### String to Integral Types

The GPU will return incorrect results for strings representing values greater than Long.MaxValue or
less than Long.MinValue. The correct behavior would be to return null for these values, but the GPU
currently overflows and returns an incorrect integer value.

To enable this operation on the GPU, set
[`spark.rapids.sql.castStringToInteger.enabled`](configs.md#sql.castStringToInteger.enabled) to `true`.

### String to Date

2 changes: 2 additions & 0 deletions docs/configs.md
@@ -51,6 +51,7 @@ Name | Description | Default Value
<a name="shuffle.ucx.managementServerHost"></a>spark.rapids.shuffle.ucx.managementServerHost|The host to be used to start the management server|null
<a name="shuffle.ucx.useWakeup"></a>spark.rapids.shuffle.ucx.useWakeup|When set to true, use UCX's event-based progress (epoll) in order to wake up the progress thread when needed, instead of a hot loop.|true
<a name="sql.batchSizeBytes"></a>spark.rapids.sql.batchSizeBytes|Set the target number of bytes for a GPU batch. Splits sizes for input data is covered by separate configs. The maximum setting is 2 GB to avoid exceeding the cudf row count limit of a column.|2147483647
<a name="sql.castDecimalToString.enabled"></a>spark.rapids.sql.castDecimalToString.enabled|When set to true, casting from decimal to string is supported on the GPU. The GPU does NOT produce exact same string as spark produces, but producing strings which are semantically equal. For instance, given input BigDecimal(123, -2), the GPU produces "12300", which spark produces "1.23E+4".|false
<a name="sql.castFloatToDecimal.enabled"></a>spark.rapids.sql.castFloatToDecimal.enabled|Casting from floating point types to decimal on the GPU returns results that have tiny difference compared to results returned from CPU.|false
<a name="sql.castFloatToIntegralTypes.enabled"></a>spark.rapids.sql.castFloatToIntegralTypes.enabled|Casting from floating point types to integral types on the GPU supports a slightly different range of values when using Spark 3.1.0 or later. Refer to the CAST documentation for more details.|false
<a name="sql.castFloatToString.enabled"></a>spark.rapids.sql.castFloatToString.enabled|Casting from floating point types to string on the GPU returns results that have a different precision than the default results of Spark.|false
@@ -158,6 +159,7 @@ Name | SQL Function(s) | Description | Default Value | Notes
<a name="sql.expression.Floor"></a>spark.rapids.sql.expression.Floor|`floor`|Floor of a number|true|None|
<a name="sql.expression.FromUnixTime"></a>spark.rapids.sql.expression.FromUnixTime|`from_unixtime`|Get the string from a unix timestamp|true|None|
<a name="sql.expression.GetArrayItem"></a>spark.rapids.sql.expression.GetArrayItem| |Gets the field at `ordinal` in the Array|true|None|
<a name="sql.expression.GetJsonObject"></a>spark.rapids.sql.expression.GetJsonObject|`get_json_object`|Extracts a json object from path|true|None|
<a name="sql.expression.GetMapValue"></a>spark.rapids.sql.expression.GetMapValue| |Gets Value from a Map based on a key|true|None|
<a name="sql.expression.GetStructField"></a>spark.rapids.sql.expression.GetStructField| |Gets the named field of the struct|true|None|
<a name="sql.expression.GetTimestamp"></a>spark.rapids.sql.expression.GetTimestamp| |Gets timestamps from strings using given pattern.|true|None|
86 changes: 77 additions & 9 deletions docs/supported_ops.md
@@ -144,12 +144,12 @@ Accelerator supports are described below.
<td>S*</td>
<td>S</td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><em>PS* (missing nested BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT)</em></td>
<td><b>NS</b></td>
</tr>
<tr>
@@ -379,7 +379,7 @@ Accelerator supports are described below.
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><em>PS* (missing nested BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT)</em></td>
<td><b>NS</b></td>
</tr>
<tr>
@@ -5817,6 +5817,74 @@ Accelerator support is described below.
<td><b>NS</b></td>
</tr>
<tr>
<td rowSpan="3">GetJsonObject</td>
<td rowSpan="3">`get_json_object`</td>
<td rowSpan="3">Extracts a json object from path</td>
<td rowSpan="3">None</td>
<td rowSpan="3">project</td>
<td>json</td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td>S</td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
</tr>
<tr>
<td>path</td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td><em>PS (Literal value only)</em></td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
</tr>
<tr>
<td>result</td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td>S</td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
</tr>
<tr>
<td rowSpan="6">GetMapValue</td>
<td rowSpan="6"> </td>
<td rowSpan="6">Gets Value from a Map based on a key</td>
@@ -12421,7 +12489,7 @@ Accelerator support is described below.
<td><b>NS</b></td>
<td><b>NS</b></td>
<td> </td>
<td><b>NS</b></td>
<td><em>PS* (missing nested BINARY, CALENDAR, ARRAY, STRUCT, UDT)</em></td>
<td><b>NS</b></td>
</tr>
<tr>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td> </td>
<td><b>NS</b></td>
<td><em>PS* (missing nested BINARY, CALENDAR, ARRAY, STRUCT, UDT)</em></td>
<td><b>NS</b></td>
</tr>
<tr>
@@ -18049,7 +18117,7 @@ and the accelerator produces the same result.
<td><b>NS</b></td>
<td> </td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td>S</td>
<td>S*</td>
<td> </td>
<td> </td>
@@ -18175,7 +18243,7 @@ and the accelerator produces the same result.
<td> </td>
<td> </td>
<td> </td>
<td><b>NS</b></td>
<td><em>PS (the struct's children must also support being cast to string)</em></td>
<td> </td>
<td> </td>
<td> </td>
@@ -18453,7 +18521,7 @@ and the accelerator produces the same result.
<td><b>NS</b></td>
<td> </td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td>S</td>
<td>S*</td>
<td> </td>
<td> </td>
@@ -18579,7 +18647,7 @@ and the accelerator produces the same result.
<td> </td>
<td> </td>
<td> </td>
<td><b>NS</b></td>
<td><em>PS (the struct's children must also support being cast to string)</em></td>
<td> </td>
<td> </td>
<td> </td>
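Per the GetJsonObject rows above, both the JSON input and the result are strings, and the path is only supported as a literal value. A hypothetical PySpark usage consistent with that (assumes an active session named spark; data is illustrative):

    from pyspark.sql import functions as F

    df = spark.createDataFrame([('{"store": {"fruit": "apple"}}',)], ["jsoncol"])
    # The path argument is a literal, as the GPU requires.
    df.select(F.get_json_object("jsoncol", "$.store.fruit").alias("fruit")).show()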
1 change: 0 additions & 1 deletion docs/tuning-guide.md
@@ -209,5 +209,4 @@ performance.
- [`spark.rapids.sql.variableFloatAgg.enabled`](configs.md#sql.variableFloatAgg.enabled)
- [`spark.rapids.sql.hasNans`](configs.md#sql.hasNans)
- [`spark.rapids.sql.castFloatToString.enabled`](configs.md#sql.castFloatToString.enabled)
- [`spark.rapids.sql.castStringToInteger.enabled`](configs.md#sql.castStringToInteger.enabled)
- [`spark.rapids.sql.castStringToFloat.enabled`](configs.md#sql.castStringToFloat.enabled)
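A hedged sketch of relaxing these configs at runtime (assumes an active session named spark; whether each trade-off is acceptable depends on the workload):

    # Illustrative only: each setting trades exact CPU-identical behavior
    # for keeping more of the plan on the GPU.
    spark.conf.set("spark.rapids.sql.variableFloatAgg.enabled", "true")
    spark.conf.set("spark.rapids.sql.hasNans", "false")
    spark.conf.set("spark.rapids.sql.castFloatToString.enabled", "true")
    spark.conf.set("spark.rapids.sql.castStringToFloat.enabled", "true")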
60 changes: 27 additions & 33 deletions integration_tests/run_pyspark_from_build.sh
@@ -84,11 +84,11 @@ else
then
# With xdist 0 and 1 are the same parallelism but
# 0 is more efficient
TEST_PARALLEL_OPTS=""
TEST_PARALLEL_OPTS=()
MEMORY_FRACTION='1'
else
MEMORY_FRACTION=`python -c "print(1/($TEST_PARALLEL + 1))"`
TEST_PARALLEL_OPTS="-n $TEST_PARALLEL"
TEST_PARALLEL_OPTS=("-n" "$TEST_PARALLEL")
fi
RUN_DIR="$SCRIPTPATH"/target/run_dir
mkdir -p "$RUN_DIR"
## Under cloud environment, overwrite the '--std_input_path' param to point to the distributed file path
INPUT_PATH=${INPUT_PATH:-"$SCRIPTPATH"}

if [[ "${TEST_PARALLEL_OPTS}" != "" ]];
RUN_TESTS_COMMAND=("$SCRIPTPATH"/runtests.py
--rootdir
"$LOCAL_ROOTDIR"
"$LOCAL_ROOTDIR"/src/main/python)

TEST_COMMON_OPTS=(-v
-rfExXs
"$TEST_TAGS"
--std_input_path="$INPUT_PATH"/src/test/resources
--color=yes
$TEST_TYPE_PARAM
"$TEST_ARGS"
$RUN_TEST_PARAMS
"$@")

export PYSP_TEST_spark_driver_extraClassPath="${ALL_JARS// /:}"
export PYSP_TEST_spark_driver_extraJavaOptions="-ea -Duser.timezone=UTC $COVERAGE_SUBMIT_FLAGS"
export PYSP_TEST_spark_executor_extraJavaOptions='-ea -Duser.timezone=UTC'
export PYSP_TEST_spark_ui_showConsoleProgress='false'
export PYSP_TEST_spark_sql_session_timeZone='UTC'
export PYSP_TEST_spark_sql_shuffle_partitions='12'
if ((${#TEST_PARALLEL_OPTS[@]} > 0));
then
export PYSP_TEST_spark_driver_extraClassPath="${ALL_JARS// /:}"
export PYSP_TEST_spark_driver_extraJavaOptions="-ea -Duser.timezone=UTC $COVERAGE_SUBMIT_FLAGS"
export PYSP_TEST_spark_executor_extraJavaOptions='-ea -Duser.timezone=UTC'
export PYSP_TEST_spark_ui_showConsoleProgress='false'
export PYSP_TEST_spark_sql_session_timeZone='UTC'
export PYSP_TEST_spark_sql_shuffle_partitions='12'
export PYSP_TEST_spark_rapids_memory_gpu_allocFraction=$MEMORY_FRACTION
export PYSP_TEST_spark_rapids_memory_gpu_maxAllocFraction=$MEMORY_FRACTION

python \
"$SCRIPTPATH"/runtests.py --rootdir "$LOCAL_ROOTDIR" "$LOCAL_ROOTDIR"/src/main/python \
$TEST_PARALLEL_OPTS \
-v -rfExXs "$TEST_TAGS" \
--std_input_path="$INPUT_PATH"/src/test/resources/ \
--color=yes \
$TEST_TYPE_PARAM \
"$TEST_ARGS" \
$RUN_TEST_PARAMS \
"$@"
python "${RUN_TESTS_COMMAND[@]}" "${TEST_PARALLEL_OPTS[@]}" "${TEST_COMMON_OPTS[@]}"
else
"$SPARK_HOME"/bin/spark-submit --jars "${ALL_JARS// /,}" \
--conf "spark.driver.extraJavaOptions=-ea -Duser.timezone=UTC $COVERAGE_SUBMIT_FLAGS" \
--conf 'spark.executor.extraJavaOptions=-ea -Duser.timezone=UTC' \
--conf 'spark.sql.session.timeZone=UTC' \
--conf 'spark.sql.shuffle.partitions=12' \
$SPARK_SUBMIT_FLAGS \
"$SCRIPTPATH"/runtests.py --rootdir "$LOCAL_ROOTDIR" "$LOCAL_ROOTDIR"/src/main/python \
-v -rfExXs "$TEST_TAGS" \
--std_input_path="$INPUT_PATH"/src/test/resources/ \
--color=yes \
$TEST_TYPE_PARAM \
"$TEST_ARGS" \
$RUN_TEST_PARAMS \
"$@"
--driver-java-options "$PYSP_TEST_spark_driver_extraJavaOptions" \
$SPARK_SUBMIT_FLAGS "${RUN_TESTS_COMMAND[@]}" "${TEST_COMMON_OPTS[@]}"
fi
fi
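The switch from a TEST_PARALLEL_OPTS string to a bash array above means an empty option set contributes zero arguments rather than one empty string. The same idea expressed in Python terms, for intuition (the command is illustrative):

    import subprocess

    parallel_opts = []  # analogous to TEST_PARALLEL_OPTS=()
    # Splatting an empty list adds no argv entries, just as "${arr[@]}"
    # expands to nothing for an empty bash array.
    subprocess.run(["echo", *parallel_opts, "done"], check=True)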
12 changes: 8 additions & 4 deletions integration_tests/src/main/python/asserts.py
@@ -1,4 +1,4 @@
# Copyright (c) 2020, NVIDIA CORPORATION.
# Copyright (c) 2020-2021, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -23,6 +23,7 @@
from spark_session import with_cpu_session, with_gpu_session
import time
import types as pytypes
import data_gen

def _assert_equal(cpu, gpu, float_check, path):
    t = type(cpu)
@@ -99,7 +100,7 @@ class _RowCmp(object):
"""Allows for sorting Rows in a consistent way"""
def __init__(self, wrapped):
#TODO will need others for maps, etc
if isinstance(wrapped, Row):
if isinstance(wrapped, Row) or isinstance(wrapped, list):
self.wrapped = [_RowCmp(c) for c in wrapped]
else:
self.wrapped = wrapped
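The added list branch matters because collected array columns come back as Python lists, and comparing lists that may contain None raises TypeError unless every element is wrapped. A simplified standalone sketch of the idea (a toy class, not the real _RowCmp):

    class Cmp:
        """Toy _RowCmp: wrap values so None compares consistently."""
        def __init__(self, v):
            self.v = [Cmp(c) for c in v] if isinstance(v, list) else v
        def __eq__(self, other):
            return self.v == other.v
        def __lt__(self, other):
            if self.v is None or other.v is None:
                return other.v is not None  # None sorts before everything else
            return self.v < other.v

    data = [[1, None], None, [1, 2]]
    print(sorted(data, key=Cmp))  # [None, [1, None], [1, 2]] -- no TypeError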
@@ -356,7 +357,7 @@ def assert_gpu_and_cpu_row_counts_equal(func, conf={}):
"""
_assert_gpu_and_cpu_are_equal(func, 'COUNT', conf=conf)

def assert_gpu_and_cpu_are_equal_sql(df_fun, table_name, sql, conf=None):
def assert_gpu_and_cpu_are_equal_sql(df_fun, table_name, sql, conf=None, debug=False):
"""
Assert that the specified SQL query produces equal results on CPU and GPU.
:param df_fun: a function that will create the dataframe
@@ -370,7 +371,10 @@ def assert_gpu_and_cpu_are_equal_sql(df_fun, table_name, sql, conf=None):
    def do_it_all(spark):
        df = df_fun(spark)
        df.createOrReplaceTempView(table_name)
        return spark.sql(sql)
        if debug:
            return data_gen.debug_df(spark.sql(sql))
        else:
            return spark.sql(sql)
    assert_gpu_and_cpu_are_equal_collect(do_it_all, conf)
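A hypothetical test invocation using the new debug flag (table name, data, and query are illustrative):

    # Prints the query result via data_gen.debug_df before the usual
    # CPU-vs-GPU comparison runs.
    assert_gpu_and_cpu_are_equal_sql(
        lambda spark: spark.createDataFrame([(1,), (2,)], ["a"]),
        "debug_tbl",
        "SELECT a + 1 AS b FROM debug_tbl",
        debug=True)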

def assert_py4j_exception(func, error_message):
30 changes: 24 additions & 6 deletions integration_tests/src/main/python/data_gen.py
@@ -299,7 +299,7 @@ def start(self, rand):
POS_FLOAT_NAN_MAX_VALUE = struct.unpack('f', struct.pack('I', 0x7fffffff))[0]
class FloatGen(DataGen):
    """Generate floats, with some built in corner cases."""
    def __init__(self, nullable=True,
            no_nans=False, special_cases=None):
        self._no_nans = no_nans
        if special_cases is None:
@@ -334,7 +334,7 @@ def gen_float():
POS_DOUBLE_NAN_MAX_VALUE = struct.unpack('d', struct.pack('L', 0x7fffffffffffffff))[0]
class DoubleGen(DataGen):
    """Generate doubles, with some built in corner cases."""
    def __init__(self, min_exp=DOUBLE_MIN_EXP, max_exp=DOUBLE_MAX_EXP, no_nans=False,
            nullable=True, special_cases = None):
        self._min_exp = min_exp
        self._max_exp = max_exp
@@ -447,7 +447,7 @@ def __init__(self, start=None, end=None, nullable=True):

        self._start_day = self._to_days_since_epoch(start)
        self._end_day = self._to_days_since_epoch(end)

        self.with_special_case(start)
        self.with_special_case(end)

@@ -652,9 +652,27 @@ def gen_scalar_value(data_gen, seed=0, force_no_nulls=False):
    v = list(gen_scalar_values(data_gen, 1, seed=seed, force_no_nulls=force_no_nulls))
    return v[0]

def debug_df(df):
    """print out the contents of a dataframe for debugging."""
    print('COLLECTED\n{}'.format(df.collect()))
def debug_df(df, path = None, file_format = 'json', num_parts = 1):
    """Print out or save the contents and the schema of a dataframe for debugging."""

    if path is not None:
        # Save the dataframe and its schema
        # The schema can be re-created by using DataType.fromJson and used
        # for loading the dataframe
        file_name = f"{path}.{file_format}"
        schema_file_name = f"{path}.schema.json"

        df.coalesce(num_parts).write.format(file_format).save(file_name)
        print(f"SAVED df output for debugging at {file_name}")

        schema_json = df.schema.json()
        schema_file = open(schema_file_name, 'w')
        schema_file.write(schema_json)
        schema_file.close()
        print(f"SAVED df schema for debugging at {schema_file_name}")
    else:
        print('COLLECTED\n{}'.format(df.collect()))

    df.explain()
    df.printSchema()
    return df
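As the comment above notes, the saved schema can be re-created with DataType.fromJson and used to reload the saved data. A hypothetical round trip (assumes an active session named spark and a dataframe df; paths are illustrative):

    import json
    from pyspark.sql.types import DataType

    # Save the dataframe plus its schema for later inspection.
    debug_df(df, path="/tmp/debug_run", file_format="json")

    # Later: restore the schema and reload the saved output.
    with open("/tmp/debug_run.schema.json") as f:
        schema = DataType.fromJson(json.load(f))
    restored = spark.read.schema(schema).json("/tmp/debug_run.json")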
(Diffs for the remaining changed files are not shown.)