From 8d00323029349ad82073cbc52404589399c08706 Mon Sep 17 00:00:00 2001 From: Kuhu Shukla Date: Sat, 11 Dec 2021 12:31:47 -0600 Subject: [PATCH 1/7] Initial changes for parquet Signed-off-by: Kuhu Shukla --- .../src/main/python/parquet_test.py | 105 +++++++++++++----- .../src/main/python/parquet_write_test.py | 3 +- .../nvidia/spark/rapids/GpuOverrides.scala | 6 +- .../spark/rapids/GpuParquetScanBase.scala | 3 - 4 files changed, 82 insertions(+), 35 deletions(-) diff --git a/integration_tests/src/main/python/parquet_test.py b/integration_tests/src/main/python/parquet_test.py index 858ec932bef..2185449eb4d 100644 --- a/integration_tests/src/main/python/parquet_test.py +++ b/integration_tests/src/main/python/parquet_test.py @@ -74,7 +74,7 @@ def read_parquet_sql(data_path): @pytest.mark.parametrize('read_func', [read_parquet_df, read_parquet_sql]) @pytest.mark.parametrize('reader_confs', reader_opt_confs) @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) -def test_read_round_trip(spark_tmp_path, parquet_gens, read_func, reader_confs, v1_enabled_list): +def test_parquet_read_round_trip(spark_tmp_path, parquet_gens, read_func, reader_confs, v1_enabled_list): gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)] data_path = spark_tmp_path + '/PARQUET_DATA' with_cpu_session( @@ -90,12 +90,33 @@ def test_read_round_trip(spark_tmp_path, parquet_gens, read_func, reader_confs, assert_gpu_and_cpu_are_equal_collect(read_func(data_path), conf=all_confs) + +@pytest.mark.parametrize('parquet_gens', [decimal_128_gens_no_neg], ids=idfn) +@pytest.mark.parametrize('read_func', [read_parquet_df]) +@pytest.mark.parametrize('reader_confs', reader_opt_confs) +@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) +def test_parquet_read_round_trip_decimal128(spark_tmp_path, parquet_gens, read_func, reader_confs, v1_enabled_list): + gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)] + data_path = spark_tmp_path + '/PARQUET_DATA' + print ('KUHU tmp path = ' + str(spark_tmp_path)) + with_cpu_session( + lambda spark : gen_df(spark, gen_list).write.parquet(data_path), + conf=rebase_write_corrected_conf) + all_confs = copy_and_update(reader_confs, { + 'spark.sql.sources.useV1SourceList': v1_enabled_list, + # set the int96 rebase mode values because its LEGACY in databricks which will preclude this op from running on GPU + 'spark.sql.legacy.parquet.int96RebaseModeInRead' : 'CORRECTED', + 'spark.sql.legacy.parquet.datetimeRebaseModeInRead': 'CORRECTED'}) + # once https://github.com/NVIDIA/spark-rapids/issues/1126 is in we can remove spark.sql.legacy.parquet.datetimeRebaseModeInRead config which is a workaround + # for nested timestamp/date support + assert_gpu_and_cpu_are_equal_collect(lambda spark : spark.read.parquet(data_path), conf=all_confs) + @allow_non_gpu('FileSourceScanExec') @pytest.mark.parametrize('read_func', [read_parquet_df, read_parquet_sql]) @pytest.mark.parametrize('disable_conf', ['spark.rapids.sql.format.parquet.enabled', 'spark.rapids.sql.format.parquet.read.enabled']) def test_parquet_fallback(spark_tmp_path, read_func, disable_conf): data_gens = [string_gen, - byte_gen, short_gen, int_gen, long_gen, boolean_gen] + decimal_gens + byte_gen, short_gen, int_gen, long_gen, boolean_gen] + decimal_gens + decimal_128_gens_no_neg gen_list = [('_c' + str(i), gen) for i, gen in enumerate(data_gens)] gen = StructGen(gen_list, nullable=False) @@ -116,7 +137,7 @@ def test_parquet_fallback(spark_tmp_path, read_func, disable_conf): 
@pytest.mark.parametrize('compress', parquet_compress_options) @pytest.mark.parametrize('reader_confs', reader_opt_confs) @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) -def test_compress_read_round_trip(spark_tmp_path, compress, v1_enabled_list, reader_confs): +def test_parquet_compress_read_round_trip(spark_tmp_path, compress, v1_enabled_list, reader_confs): data_path = spark_tmp_path + '/PARQUET_DATA' with_cpu_session( lambda spark : binary_op_df(spark, long_gen).write.parquet(data_path), @@ -131,13 +152,13 @@ def test_compress_read_round_trip(spark_tmp_path, compress, v1_enabled_list, rea string_gen, date_gen, # Once https://github.com/NVIDIA/spark-rapids/issues/132 is fixed replace this with # timestamp_gen - TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))] + decimal_gens + TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))] + decimal_gens + decimal_128_gens_no_neg @pytest.mark.parametrize('parquet_gen', parquet_pred_push_gens, ids=idfn) @pytest.mark.parametrize('read_func', [read_parquet_df, read_parquet_sql]) @pytest.mark.parametrize('reader_confs', reader_opt_confs) @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) -def test_pred_push_round_trip(spark_tmp_path, parquet_gen, read_func, v1_enabled_list, reader_confs): +def test_parquet_pred_push_round_trip(spark_tmp_path, parquet_gen, read_func, v1_enabled_list, reader_confs): data_path = spark_tmp_path + '/PARQUET_DATA' gen_list = [('a', RepeatSeqGen(parquet_gen, 100)), ('b', parquet_gen)] s0 = gen_scalar(parquet_gen, force_no_nulls=True) @@ -162,7 +183,7 @@ def test_pred_push_round_trip(spark_tmp_path, parquet_gen, read_func, v1_enabled @pytest.mark.parametrize('reader_confs', reader_opt_confs) @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) @pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1126') -def test_ts_read_round_trip_nested(gen, spark_tmp_path, ts_write, ts_rebase, v1_enabled_list, reader_confs): +def test_parquet_ts_read_round_trip_nested(gen, spark_tmp_path, ts_write, ts_rebase, v1_enabled_list, reader_confs): data_path = spark_tmp_path + '/PARQUET_DATA' with_cpu_session( lambda spark : unary_op_df(spark, gen).write.parquet(data_path), @@ -220,11 +241,11 @@ def test_ts_read_fails_datetime_legacy(gen, spark_tmp_path, ts_write, ts_rebase, @pytest.mark.parametrize('parquet_gens', [[byte_gen, short_gen, DecimalGen(precision=7, scale=3)], decimal_gens, [ArrayGen(DecimalGen(7,2), max_length=10)], - [StructGen([['child0', DecimalGen(7, 2)]])]], ids=idfn) + [StructGen([['child0', DecimalGen(7, 2)]])], decimal_128_gens_no_neg], ids=idfn) @pytest.mark.parametrize('read_func', [read_parquet_df, read_parquet_sql]) @pytest.mark.parametrize('reader_confs', reader_opt_confs) @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) -def test_decimal_read_legacy(spark_tmp_path, parquet_gens, read_func, reader_confs, v1_enabled_list): +def test_parquet_decimal_read_legacy(spark_tmp_path, parquet_gens, read_func, reader_confs, v1_enabled_list): gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)] data_path = spark_tmp_path + '/PARQUET_DATA' with_cpu_session( @@ -236,14 +257,14 @@ def test_decimal_read_legacy(spark_tmp_path, parquet_gens, read_func, reader_con parquet_gens_legacy_list = [[byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen, string_gen, boolean_gen, DateGen(start=date(1590, 1, 1)), - TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))] + decimal_gens, + 
TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))] + decimal_gens + decimal_128_gens_no_neg, pytest.param([timestamp_gen], marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/133')), pytest.param([date_gen], marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/133'))] @pytest.mark.parametrize('parquet_gens', parquet_gens_legacy_list, ids=idfn) @pytest.mark.parametrize('reader_confs', reader_opt_confs) @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) -def test_read_round_trip_legacy(spark_tmp_path, parquet_gens, v1_enabled_list, reader_confs): +def test_parquet_read_round_trip_legacy(spark_tmp_path, parquet_gens, v1_enabled_list, reader_confs): gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)] data_path = spark_tmp_path + '/PARQUET_DATA' with_cpu_session( @@ -256,12 +277,12 @@ def test_read_round_trip_legacy(spark_tmp_path, parquet_gens, v1_enabled_list, r @pytest.mark.parametrize('reader_confs', reader_opt_confs) @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) -def test_simple_partitioned_read(spark_tmp_path, v1_enabled_list, reader_confs): +def test_parquet_simple_partitioned_read(spark_tmp_path, v1_enabled_list, reader_confs): # Once https://github.com/NVIDIA/spark-rapids/issues/133 and https://github.com/NVIDIA/spark-rapids/issues/132 are fixed # we should go with a more standard set of generators parquet_gens = [byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen, string_gen, boolean_gen, DateGen(start=date(1590, 1, 1)), - TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))] + decimal_gens + TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))] + decimal_gens + decimal_128_gens_no_neg gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)] first_data_path = spark_tmp_path + '/PARQUET_DATA/key=0/key2=20' with_cpu_session( @@ -281,10 +302,36 @@ def test_simple_partitioned_read(spark_tmp_path, v1_enabled_list, reader_confs): lambda spark : spark.read.parquet(data_path), conf=all_confs) + +@pytest.mark.parametrize('reader_confs', reader_opt_confs) +@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) +def test_parquet_simple_partitioned_read_decimal128(spark_tmp_path, v1_enabled_list, reader_confs): + # Once https://github.com/NVIDIA/spark-rapids/issues/133 and https://github.com/NVIDIA/spark-rapids/issues/132 are fixed + # we should go with a more standard set of generators + parquet_gens = decimal_128_gens_no_neg + gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)] + first_data_path = spark_tmp_path + '/PARQUET_DATA/key=0/key2=20' + with_cpu_session( + lambda spark : gen_df(spark, gen_list).write.parquet(first_data_path), + conf=rebase_write_legacy_conf) + second_data_path = spark_tmp_path + '/PARQUET_DATA/key=1/key2=21' + with_cpu_session( + lambda spark : gen_df(spark, gen_list).write.parquet(second_data_path), + conf=rebase_write_corrected_conf) + third_data_path = spark_tmp_path + '/PARQUET_DATA/key=2/key2=22' + with_cpu_session( + lambda spark : gen_df(spark, gen_list).write.parquet(third_data_path), + conf=rebase_write_corrected_conf) + data_path = spark_tmp_path + '/PARQUET_DATA' + all_confs = copy_and_update(reader_confs, {'spark.sql.sources.useV1SourceList': v1_enabled_list}) + assert_gpu_and_cpu_are_equal_collect( + lambda spark : spark.read.parquet(data_path), + conf=all_confs) + # In this we are reading the data, but only reading the key the data 
was partitioned by @pytest.mark.parametrize('reader_confs', reader_opt_confs) @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) -def test_partitioned_read_just_partitions(spark_tmp_path, v1_enabled_list, reader_confs): +def test_parquet_partitioned_read_just_partitions(spark_tmp_path, v1_enabled_list, reader_confs): parquet_gens = [byte_gen] gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)] first_data_path = spark_tmp_path + '/PARQUET_DATA/key=0' @@ -303,7 +350,7 @@ def test_partitioned_read_just_partitions(spark_tmp_path, v1_enabled_list, reade @pytest.mark.parametrize('reader_confs', reader_opt_confs) @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) -def test_read_schema_missing_cols(spark_tmp_path, v1_enabled_list, reader_confs): +def test_parquet_read_schema_missing_cols(spark_tmp_path, v1_enabled_list, reader_confs): # Once https://github.com/NVIDIA/spark-rapids/issues/133 and https://github.com/NVIDIA/spark-rapids/issues/132 are fixed # we should go with a more standard set of generators parquet_gens = [byte_gen, short_gen, int_gen, long_gen] @@ -328,12 +375,12 @@ def test_read_schema_missing_cols(spark_tmp_path, v1_enabled_list, reader_confs) @pytest.mark.parametrize('reader_confs', reader_opt_confs) @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) -def test_read_merge_schema(spark_tmp_path, v1_enabled_list, reader_confs): +def test_parquet_read_merge_schema(spark_tmp_path, v1_enabled_list, reader_confs): # Once https://github.com/NVIDIA/spark-rapids/issues/133 and https://github.com/NVIDIA/spark-rapids/issues/132 are fixed # we should go with a more standard set of generators parquet_gens = [byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen, string_gen, boolean_gen, DateGen(start=date(1590, 1, 1)), - TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))] + decimal_gens + TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))] + decimal_gens + decimal_128_gens_no_neg first_gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)] first_data_path = spark_tmp_path + '/PARQUET_DATA/key=0' with_cpu_session( @@ -352,12 +399,12 @@ def test_read_merge_schema(spark_tmp_path, v1_enabled_list, reader_confs): @pytest.mark.parametrize('reader_confs', reader_opt_confs) @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) -def test_read_merge_schema_from_conf(spark_tmp_path, v1_enabled_list, reader_confs): +def test_parquet_read_merge_schema_from_conf(spark_tmp_path, v1_enabled_list, reader_confs): # Once https://github.com/NVIDIA/spark-rapids/issues/133 and https://github.com/NVIDIA/spark-rapids/issues/132 are fixed # we should go with a more standard set of generators parquet_gens = [byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen, string_gen, boolean_gen, DateGen(start=date(1590, 1, 1)), - TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))] + decimal_gens + TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))] + decimal_gens + decimal_128_gens_no_neg first_gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)] first_data_path = spark_tmp_path + '/PARQUET_DATA/key=0' with_cpu_session( @@ -378,7 +425,7 @@ def test_read_merge_schema_from_conf(spark_tmp_path, v1_enabled_list, reader_con @pytest.mark.parametrize('reader_confs', reader_opt_confs) @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) -def test_input_meta(spark_tmp_path, v1_enabled_list, reader_confs): +def 
test_parquet_input_meta(spark_tmp_path, v1_enabled_list, reader_confs): first_data_path = spark_tmp_path + '/PARQUET_DATA/key=0' with_cpu_session( lambda spark : unary_op_df(spark, long_gen).write.parquet(first_data_path)) @@ -403,7 +450,7 @@ def test_input_meta(spark_tmp_path, v1_enabled_list, reader_confs): @pytest.mark.parametrize('reader_confs', reader_opt_confs) @pytest.mark.parametrize('disable_conf', ['spark.rapids.sql.format.parquet.enabled', 'spark.rapids.sql.format.orc.parquet.enabled']) @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) -def test_input_meta_fallback(spark_tmp_path, v1_enabled_list, reader_confs, disable_conf): +def test_parquet_input_meta_fallback(spark_tmp_path, v1_enabled_list, reader_confs, disable_conf): first_data_path = spark_tmp_path + '/PARQUET_DATA/key=0' with_cpu_session( lambda spark : unary_op_df(spark, long_gen).write.parquet(first_data_path)) @@ -485,7 +532,7 @@ def test_small_file_memory(spark_tmp_path, v1_enabled_list): ([["struct", StructGen([["c_1", StringGen()], ["case_insensitive", LongGen()], ["c_3", ShortGen()]])]], [["stRUct", StructGen([["CASE_INSENSITIVE", LongGen()]])]]), ] - +# TODO CHECK FOR DECIMAL?? @pytest.mark.parametrize('data_gen,read_schema', _nested_pruning_schemas, ids=idfn) @pytest.mark.parametrize('reader_confs', reader_opt_confs) @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) @@ -593,7 +640,7 @@ def test_disorder_read_schema(spark_tmp_table_factory, reader_confs, v1_enabled_ @pytest.mark.parametrize('reader_confs', reader_opt_confs, ids=idfn) @pytest.mark.parametrize('enable_dictionary', ["true", "false"], ids=idfn) @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) -def test_reading_from_unaligned_pages_basic_filters(spark_tmp_path, reader_confs, enable_dictionary, v1_enabled_list): +def test_parquet_reading_from_unaligned_pages_basic_filters(spark_tmp_path, reader_confs, enable_dictionary, v1_enabled_list): all_confs = copy_and_update(reader_confs, {'spark.sql.sources.useV1SourceList': v1_enabled_list}) data_path = spark_tmp_path + '/PARQUET_UNALIGNED_DATA' with_cpu_session(lambda spark : spark.range(0, 2000)\ @@ -611,7 +658,7 @@ def test_reading_from_unaligned_pages_basic_filters(spark_tmp_path, reader_confs @pytest.mark.parametrize('reader_confs', reader_opt_confs, ids=idfn) @pytest.mark.parametrize('enable_dictionary', ["true", "false"], ids=idfn) @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) -def test_reading_from_unaligned_pages_all_types(spark_tmp_path, reader_confs, enable_dictionary, v1_enabled_list): +def test_parquet_reading_from_unaligned_pages_all_types(spark_tmp_path, reader_confs, enable_dictionary, v1_enabled_list): all_confs = copy_and_update(reader_confs, {'spark.sql.sources.useV1SourceList': v1_enabled_list}) data_path = spark_tmp_path + '/PARQUET_UNALIGNED_DATA' with_cpu_session(lambda spark : spark.range(0, 2000)\ @@ -622,6 +669,7 @@ def test_reading_from_unaligned_pages_all_types(spark_tmp_path, reader_confs, en "cast(id as double) as _6", # DECIMAL128 IS NOT SUPPORTED YET "cast(id as decimal(20,0)) as _7", "cast(id as decimal(10,0)) as _7", + "cast(id as decimal(30,0)) as _8", "cast(cast(1618161925 + (id * 60 * 60 * 24) as timestamp) as date) as _9", "cast(1618161925 + id as timestamp) as _10")\ .coalesce(1)\ @@ -637,7 +685,7 @@ def test_reading_from_unaligned_pages_all_types(spark_tmp_path, reader_confs, en @pytest.mark.parametrize('reader_confs', reader_opt_confs, ids=idfn) @pytest.mark.parametrize('enable_dictionary', ["true", "false"], ids=idfn) 
@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) -def test_reading_from_unaligned_pages_all_types_dict_optimized(spark_tmp_path, reader_confs, enable_dictionary, v1_enabled_list): +def test_parquet_reading_from_unaligned_pages_all_types_dict_optimized(spark_tmp_path, reader_confs, enable_dictionary, v1_enabled_list): all_confs = copy_and_update(reader_confs, {'spark.sql.sources.useV1SourceList': v1_enabled_list}) data_path = spark_tmp_path + '/PARQUET_UNALIGNED_DATA' with_cpu_session(lambda spark : spark.range(0, 2000)\ @@ -649,9 +697,10 @@ def test_reading_from_unaligned_pages_all_types_dict_optimized(spark_tmp_path, r "cast(id % 10 as double) as _6", # DECIMAL128 IS NOT SUPPORTED YET "cast(id % 10 as decimal(20,0)) as _7", "cast(id % 10 as decimal(10,0)) as _7", - "cast(id % 2 as boolean) as _8", - "cast(cast(1618161925 + ((id % 10) * 60 * 60 * 24) as timestamp) as date) as _9", - "cast(1618161925 + (id % 10) as timestamp) as _10")\ + "cast(id % 10 as decimal(20,0)) as _8", + "cast(id % 2 as boolean) as _9", + "cast(cast(1618161925 + ((id % 10) * 60 * 60 * 24) as timestamp) as date) as _10", + "cast(1618161925 + (id % 10) as timestamp) as _11")\ .coalesce(1)\ .write\ .option("parquet.page.size", "4096") @@ -665,7 +714,7 @@ def test_reading_from_unaligned_pages_all_types_dict_optimized(spark_tmp_path, r @pytest.mark.parametrize('reader_confs', reader_opt_confs, ids=idfn) @pytest.mark.parametrize('enable_dictionary', ["true", "false"], ids=idfn) @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) -def test_reading_from_unaligned_pages_basic_filters_with_nulls(spark_tmp_path, reader_confs, enable_dictionary, v1_enabled_list): +def test_parquet_reading_from_unaligned_pages_basic_filters_with_nulls(spark_tmp_path, reader_confs, enable_dictionary, v1_enabled_list): # insert 50 null values in [400, 450) to verify that they are skipped during processing row # range [500, 1000) against the second page of col_2 [400, 800) all_confs = copy_and_update(reader_confs, {'spark.sql.sources.useV1SourceList': v1_enabled_list}) diff --git a/integration_tests/src/main/python/parquet_write_test.py b/integration_tests/src/main/python/parquet_write_test.py index 7a5f9ff14e1..728d35b1d89 100644 --- a/integration_tests/src/main/python/parquet_write_test.py +++ b/integration_tests/src/main/python/parquet_write_test.py @@ -32,7 +32,8 @@ coalesce_parquet_file_reader_conf={'spark.rapids.sql.format.parquet.reader.type': 'COALESCING'} reader_opt_confs = [original_parquet_file_reader_conf, multithreaded_parquet_file_reader_conf, coalesce_parquet_file_reader_conf] -parquet_decimal_gens=[decimal_gen_default, decimal_gen_scale_precision, decimal_gen_same_scale_precision, decimal_gen_64bit] +parquet_decimal_gens=[decimal_gen_default, decimal_gen_scale_precision, decimal_gen_same_scale_precision, decimal_gen_64bit, + decimal_gen_20_2, decimal_gen_36_5, decimal_gen_38_0, decimal_gen_38_10] parquet_decimal_struct_gen= StructGen([['child'+str(ind), sub_gen] for ind, sub_gen in enumerate(parquet_decimal_gens)]) writer_confs={'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': 'CORRECTED', 'spark.sql.legacy.parquet.int96RebaseModeInWrite': 'CORRECTED'} diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index 919ffc775e3..d65501ceb43 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -801,9 +801,9 @@ 
object GpuOverrides extends Logging { cudfWrite = TypeSig.none, sparkSig = TypeSig.atomics)), (ParquetFormatType, FileFormatChecks( - cudfRead = (TypeSig.commonCudfTypes + TypeSig.DECIMAL_64 + TypeSig.STRUCT + TypeSig.ARRAY + + cudfRead = (TypeSig.commonCudfTypes + TypeSig.DECIMAL_128_FULL + TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.MAP).nested(), - cudfWrite = (TypeSig.commonCudfTypes + TypeSig.DECIMAL_64 + TypeSig.STRUCT + + cudfWrite = (TypeSig.commonCudfTypes + TypeSig.DECIMAL_128_FULL + TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.MAP).nested(), sparkSig = (TypeSig.atomics + TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.MAP + TypeSig.UDT).nested())), @@ -3460,7 +3460,7 @@ object GpuOverrides extends Logging { exec[DataWritingCommandExec]( "Writing data", ExecChecks((TypeSig.commonCudfTypes + TypeSig.DECIMAL_128_FULL.withPsNote( - TypeEnum.DECIMAL, "128bit decimal only supported for Orc") + + TypeEnum.DECIMAL, "128bit decimal only supported for Orc and Parquet") + TypeSig.STRUCT.withPsNote(TypeEnum.STRUCT, "Only supported for Parquet") + TypeSig.MAP.withPsNote(TypeEnum.MAP, "Only supported for Parquet") + TypeSig.ARRAY.withPsNote(TypeEnum.ARRAY, "Only supported for Parquet")).nested(), diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScanBase.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScanBase.scala index 34589cacae9..e94ea0e77ca 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScanBase.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScanBase.scala @@ -1100,7 +1100,6 @@ class MultiFileParquetPartitionReader( isSchemaCaseSensitive) val parseOpts = ParquetOptions.builder() .withTimeUnit(DType.TIMESTAMP_MICROSECONDS) - .enableStrictDecimalType(true) .includeColumn(includeColumns: _*).build() // About to start using the GPU @@ -1374,7 +1373,6 @@ class MultiFileCloudParquetPartitionReader( isSchemaCaseSensitive) val parseOpts = ParquetOptions.builder() .withTimeUnit(DType.TIMESTAMP_MICROSECONDS) - .enableStrictDecimalType(true) .includeColumn(includeColumns: _*).build() // about to start using the GPU @@ -1515,7 +1513,6 @@ class ParquetPartitionReader( isSchemaCaseSensitive) val parseOpts = ParquetOptions.builder() .withTimeUnit(DType.TIMESTAMP_MICROSECONDS) - .enableStrictDecimalType(true) .includeColumn(includeColumns: _*).build() // about to start using the GPU From 721820e48a5bc69a4281db8a2709b3e89bf0c438 Mon Sep 17 00:00:00 2001 From: Kuhu Shukla Date: Tue, 14 Dec 2021 17:46:28 -0600 Subject: [PATCH 2/7] Remove additional tests --- .../src/main/python/parquet_test.py | 45 ------------------- 1 file changed, 45 deletions(-) diff --git a/integration_tests/src/main/python/parquet_test.py b/integration_tests/src/main/python/parquet_test.py index 2185449eb4d..b7b36c4885e 100644 --- a/integration_tests/src/main/python/parquet_test.py +++ b/integration_tests/src/main/python/parquet_test.py @@ -91,26 +91,6 @@ def test_parquet_read_round_trip(spark_tmp_path, parquet_gens, read_func, reader conf=all_confs) -@pytest.mark.parametrize('parquet_gens', [decimal_128_gens_no_neg], ids=idfn) -@pytest.mark.parametrize('read_func', [read_parquet_df]) -@pytest.mark.parametrize('reader_confs', reader_opt_confs) -@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) -def test_parquet_read_round_trip_decimal128(spark_tmp_path, parquet_gens, read_func, reader_confs, v1_enabled_list): - gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)] - data_path = spark_tmp_path + '/PARQUET_DATA' - print ('KUHU 
tmp path = ' + str(spark_tmp_path)) - with_cpu_session( - lambda spark : gen_df(spark, gen_list).write.parquet(data_path), - conf=rebase_write_corrected_conf) - all_confs = copy_and_update(reader_confs, { - 'spark.sql.sources.useV1SourceList': v1_enabled_list, - # set the int96 rebase mode values because its LEGACY in databricks which will preclude this op from running on GPU - 'spark.sql.legacy.parquet.int96RebaseModeInRead' : 'CORRECTED', - 'spark.sql.legacy.parquet.datetimeRebaseModeInRead': 'CORRECTED'}) - # once https://github.com/NVIDIA/spark-rapids/issues/1126 is in we can remove spark.sql.legacy.parquet.datetimeRebaseModeInRead config which is a workaround - # for nested timestamp/date support - assert_gpu_and_cpu_are_equal_collect(lambda spark : spark.read.parquet(data_path), conf=all_confs) - @allow_non_gpu('FileSourceScanExec') @pytest.mark.parametrize('read_func', [read_parquet_df, read_parquet_sql]) @pytest.mark.parametrize('disable_conf', ['spark.rapids.sql.format.parquet.enabled', 'spark.rapids.sql.format.parquet.read.enabled']) @@ -303,31 +283,6 @@ def test_parquet_simple_partitioned_read(spark_tmp_path, v1_enabled_list, reader conf=all_confs) -@pytest.mark.parametrize('reader_confs', reader_opt_confs) -@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) -def test_parquet_simple_partitioned_read_decimal128(spark_tmp_path, v1_enabled_list, reader_confs): - # Once https://github.com/NVIDIA/spark-rapids/issues/133 and https://github.com/NVIDIA/spark-rapids/issues/132 are fixed - # we should go with a more standard set of generators - parquet_gens = decimal_128_gens_no_neg - gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)] - first_data_path = spark_tmp_path + '/PARQUET_DATA/key=0/key2=20' - with_cpu_session( - lambda spark : gen_df(spark, gen_list).write.parquet(first_data_path), - conf=rebase_write_legacy_conf) - second_data_path = spark_tmp_path + '/PARQUET_DATA/key=1/key2=21' - with_cpu_session( - lambda spark : gen_df(spark, gen_list).write.parquet(second_data_path), - conf=rebase_write_corrected_conf) - third_data_path = spark_tmp_path + '/PARQUET_DATA/key=2/key2=22' - with_cpu_session( - lambda spark : gen_df(spark, gen_list).write.parquet(third_data_path), - conf=rebase_write_corrected_conf) - data_path = spark_tmp_path + '/PARQUET_DATA' - all_confs = copy_and_update(reader_confs, {'spark.sql.sources.useV1SourceList': v1_enabled_list}) - assert_gpu_and_cpu_are_equal_collect( - lambda spark : spark.read.parquet(data_path), - conf=all_confs) - # In this we are reading the data, but only reading the key the data was partitioned by @pytest.mark.parametrize('reader_confs', reader_opt_confs) @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) From 0e3ad04dbb6379656b9aa3f44ff8d6d97014a600 Mon Sep 17 00:00:00 2001 From: Kuhu Shukla Date: Thu, 16 Dec 2021 14:38:20 -0600 Subject: [PATCH 3/7] Some map tests and uint64 changes after basic manual tests --- integration_tests/src/main/python/data_gen.py | 1 + integration_tests/src/main/python/parquet_write_test.py | 2 +- .../scala/com/nvidia/spark/rapids/GpuParquetScanBase.scala | 6 ++++-- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/integration_tests/src/main/python/data_gen.py b/integration_tests/src/main/python/data_gen.py index 2d5807efafe..5c3a8e33649 100644 --- a/integration_tests/src/main/python/data_gen.py +++ b/integration_tests/src/main/python/data_gen.py @@ -949,6 +949,7 @@ def gen_scalars_for_sql(data_gen, count, seed=0, force_no_nulls=False): 
decimal_64_map_gens = [MapGen(key_gen=gen, value_gen=gen, nullable=False) for gen in [DecimalGen(7, 3, nullable=False), DecimalGen(12, 2, nullable=False), DecimalGen(18, -3, nullable=False)]] decimal_128_map_gens = [MapGen(key_gen=gen, value_gen=gen, nullable=False) for gen in [DecimalGen(20, 2, nullable=False), DecimalGen(36, 5, nullable=False), DecimalGen(38, 38, nullable=False), DecimalGen(36, -5, nullable=False)]] +decimal_128_no_neg_map_gens = [MapGen(key_gen=gen, value_gen=gen, nullable=False) for gen in [DecimalGen(20, 2, nullable=False), DecimalGen(36, 5, nullable=False), DecimalGen(38, 38, nullable=False)]] # Some map gens, but not all because of nesting map_gens_sample = all_basic_map_gens + [MapGen(StringGen(pattern='key_[0-9]', nullable=False), ArrayGen(string_gen), max_length=10), diff --git a/integration_tests/src/main/python/parquet_write_test.py b/integration_tests/src/main/python/parquet_write_test.py index 2a64976aad3..99a616cbb36 100644 --- a/integration_tests/src/main/python/parquet_write_test.py +++ b/integration_tests/src/main/python/parquet_write_test.py @@ -56,7 +56,7 @@ def limited_int96(): parquet_basic_map_gens = [MapGen(f(nullable=False), f()) for f in [BooleanGen, ByteGen, ShortGen, IntegerGen, LongGen, FloatGen, DoubleGen, DateGen, - limited_timestamp]] + [simple_string_to_string_map_gen] + limited_timestamp]] + [simple_string_to_string_map_gen] + decimal_128_no_neg_map_gens parquet_struct_gen = [StructGen([['child' + str(ind), sub_gen] for ind, sub_gen in enumerate(parquet_basic_gen)]), StructGen([['child0', StructGen([['child1', byte_gen]])]]), diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScanBase.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScanBase.scala index e94ea0e77ca..e9a70c67d34 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScanBase.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScanBase.scala @@ -787,7 +787,8 @@ trait ParquetPartitionReaderBase extends Logging with Arm with ScanWithMetrics field => { if (field.isPrimitive) { val t = field.getOriginalType - (t == OriginalType.UINT_8) || (t == OriginalType.UINT_16) || (t == OriginalType.UINT_32) + (t == OriginalType.UINT_8) || (t == OriginalType.UINT_16) || + (t == OriginalType.UINT_32) || (t == OriginalType.UINT_64) } else { existsUnsignedType(field.asGroupType) } @@ -796,7 +797,8 @@ trait ParquetPartitionReaderBase extends Logging with Arm with ScanWithMetrics } def needDecimalCast(cv: ColumnView, dt: DataType): Boolean = { - cv.getType.isDecimalType && !GpuColumnVector.getNonNestedRapidsType(dt).equals(cv.getType()) + cv.getType.isDecimalType && !GpuColumnVector.getNonNestedRapidsType(dt).equals(cv.getType()) || + cv.getType.equals(DType.UINT64) } def needUnsignedToSignedCast(cv: ColumnView, dt: DataType): Boolean = { From 85e4ae14b7f131ae05717499f0d223d85c25d410 Mon Sep 17 00:00:00 2001 From: Kuhu Shukla Date: Tue, 21 Dec 2021 08:54:47 -0600 Subject: [PATCH 4/7] Add comment for Uint64 --- .../scala/com/nvidia/spark/rapids/GpuParquetScanBase.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScanBase.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScanBase.scala index e9a70c67d34..c1391694cee 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScanBase.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScanBase.scala @@ -797,6 +797,10 @@ trait 
ParquetPartitionReaderBase extends Logging with Arm with ScanWithMetrics } def needDecimalCast(cv: ColumnView, dt: DataType): Boolean = { + // UINT64 is casted to Decimal(20,0) by Spark to accommodate + // the largest possible values this type can take. Other Unsigned data types are converted to + // basic types like LongType, this is analogous to that except we spill over to large + // decimal/ints. cv.getType.isDecimalType && !GpuColumnVector.getNonNestedRapidsType(dt).equals(cv.getType()) || cv.getType.equals(DType.UINT64) } From 13f8d7cf2b6b5be00b9ea61b908e218cba2a1976 Mon Sep 17 00:00:00 2001 From: Kuhu Shukla Date: Tue, 21 Dec 2021 11:44:07 -0600 Subject: [PATCH 5/7] Add support documentation --- docs/supported_ops.md | 18 +++++++++--------- .../com/nvidia/spark/rapids/GpuOverrides.scala | 4 ++-- .../src/main/resources/supportedDataSource.csv | 2 +- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/docs/supported_ops.md b/docs/supported_ops.md index 100cf7acee3..03700e75ec5 100644 --- a/docs/supported_ops.md +++ b/docs/supported_ops.md @@ -626,7 +626,7 @@ Accelerator supports are described below. S PS
UTC is only supported TZ for TIMESTAMP
S
-PS
128bit decimal only supported for Orc
+PS
128bit decimal only supported for Orc and Parquet
NS
NS
NS
@@ -17462,13 +17462,13 @@ dates or timestamps, or for a lack of type coercion support.
S
PS
UTC is only supported TZ for TIMESTAMP
S
-PS
max DECIMAL precision of 18
+S
NS
-PS
max child DECIMAL precision of 18;
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, UDT
-PS
max child DECIMAL precision of 18;
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, UDT
-PS
max child DECIMAL precision of 18;
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, UDT
+PS
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, UDT
+PS
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, UDT
+PS
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, UDT
NS
@@ -17483,13 +17483,13 @@ dates or timestamps, or for a lack of type coercion support.
S
PS
UTC is only supported TZ for TIMESTAMP
S
-PS
max DECIMAL precision of 18
+S
NS
-PS
max child DECIMAL precision of 18;
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, UDT
-PS
max child DECIMAL precision of 18;
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, UDT
-PS
max child DECIMAL precision of 18;
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, UDT
+PS
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, UDT
+PS
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, UDT
+PS
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, UDT
NS diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index 7ef252fedb4..92d313d24b9 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -824,8 +824,8 @@ object GpuOverrides extends Logging { cudfWrite = TypeSig.none, sparkSig = TypeSig.atomics)), (ParquetFormatType, FileFormatChecks( - cudfRead = (TypeSig.commonCudfTypes + TypeSig.DECIMAL_128_FULL + TypeSig.STRUCT + TypeSig.ARRAY + - TypeSig.MAP).nested(), + cudfRead = (TypeSig.commonCudfTypes + TypeSig.DECIMAL_128_FULL + TypeSig.STRUCT + + TypeSig.ARRAY + TypeSig.MAP).nested(), cudfWrite = (TypeSig.commonCudfTypes + TypeSig.DECIMAL_128_FULL + TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.MAP).nested(), sparkSig = (TypeSig.atomics + TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.MAP + diff --git a/tools/src/main/resources/supportedDataSource.csv b/tools/src/main/resources/supportedDataSource.csv index e003ee02d9a..8e9b8ea1186 100644 --- a/tools/src/main/resources/supportedDataSource.csv +++ b/tools/src/main/resources/supportedDataSource.csv @@ -2,5 +2,5 @@ Format,Direction,BOOLEAN,BYTE,SHORT,INT,LONG,FLOAT,DOUBLE,DATE,TIMESTAMP,STRING, CSV,read,CO,CO,CO,CO,CO,CO,CO,CO,CO,S,NS,NA,NS,NA,NA,NA,NA,NA ORC,read,S,S,S,S,S,S,S,S,PS,S,S,NA,NS,NA,PS,PS,PS,NS ORC,write,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA -Parquet,read,S,S,S,S,S,S,S,S,PS,S,PS,NA,NS,NA,PS,PS,PS,NS +Parquet,read,S,S,S,S,S,S,S,S,PS,S,S,NA,NS,NA,PS,PS,PS,NS Parquet,write,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA From 37ff0906c3991e5d93e4ec208510cde334d94176 Mon Sep 17 00:00:00 2001 From: Kuhu Shukla Date: Wed, 22 Dec 2021 11:38:12 -0600 Subject: [PATCH 6/7] Add scala test for UINT64 parquet Signed-off-by: Kuhu Shukla --- tests/src/test/resources/test_unsigned64.parquet | Bin 0 -> 1089 bytes .../nvidia/spark/rapids/ParquetScanSuite.scala | 13 +++++++++++++ 2 files changed, 13 insertions(+) create mode 100644 tests/src/test/resources/test_unsigned64.parquet diff --git a/tests/src/test/resources/test_unsigned64.parquet b/tests/src/test/resources/test_unsigned64.parquet new file mode 100644 index 0000000000000000000000000000000000000000..5d828e3a2399be0bdffa40cc1f958de9a995a286 GIT binary patch literal 1089 zcmcIk&1(};5P$pjCEact(3-d1l@)vG5@;H2OiGa!I-7o=rkVynQV=AKOA<`dn52!~ zJc>t8UIfqLAK=NeM-l%PPlC?8-K|yHV+Y=wnfGRX^PAbcRG_*fg{;W+vNWZbVJssw zHmNavazd~~%2=IXq5O_SGHJ=nS&EiPnRJ&~#)$81LyMwOO=URlrl#w!x0v@$bER;K zf+Zb0HEehLo#4q~yEj@cNN$yu)nK4`L4mTASi67uH6 zdn#qHX6|+7VL`{iKdpHv9QP9Qq($*Jlz2V+3-TTQ^gQpZZr9tjIE;2hgY_x)IT?*`Y zp8q$zdAI!O(mH-NR^;}H`jX3Jd6kZZt+>$SC|z;-n`N1f>txU2>zwCcthd54k+bz( zPc!DY_B2$h_a5C02W5Ze*GLwCWLNC~cL2uVH(?8)38khw^?iUHvPuBANTVNX^vLuJ z(K#Y)Am|FZn>z@CNeX(COjX4nlZABv_G-6Z)^0rP@Ar1|chN^&jHCFC2SiOBwQ$E! 
z+VDOWg;%dtJNpX0v_SF7#2~DTr1k(yDc%t%<@ylfJqw)}4yZgd@|{s*`$gvjr)`=8 zLZXg`w6@0VtNr@m`C%|xZ1nq!_;WrO&m9GWVf&yrSIAw@Ejd|yL-dT^<4dUEpT{3V Cmf+_A literal 0 HcmV?d00001 diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/ParquetScanSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/ParquetScanSuite.scala index 4ba3808175a..a2787fc5451 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/ParquetScanSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/ParquetScanSuite.scala @@ -138,4 +138,17 @@ class ParquetScanSuite extends SparkQueryCompareTestSuite { assumeCondition = (_ => (VersionUtils.isSpark320OrLater, "Spark version not 3.2.0+"))) { frame => frame.select(col("*")) } + + /** Parquet file with 2 columns + * + * + */ + * + testSparkResultsAreEqual("Test Parquet unsigned int: uint64", + frameFromParquet("test_unsigned64.parquet"), + // CPU version throws an exception when Spark < 3.2, so skip when Spark < 3.2. + // The exception is like "Parquet type not supported: INT32 (UINT_8)" + assumeCondition = (_ => (VersionUtils.isSpark320OrLater, "Spark version not 3.2.0+"))) { + frame => frame.select(col("*")) + } } From e231a1bd72617dc50e31ff590ed749436b8b5c20 Mon Sep 17 00:00:00 2001 From: Kuhu Shukla Date: Wed, 22 Dec 2021 11:44:52 -0600 Subject: [PATCH 7/7] Fix doc indent Signed-off-by: Kuhu Shukla --- .../com/nvidia/spark/rapids/ParquetScanSuite.scala | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/ParquetScanSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/ParquetScanSuite.scala index a2787fc5451..38515b90f55 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/ParquetScanSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/ParquetScanSuite.scala @@ -139,11 +139,12 @@ class ParquetScanSuite extends SparkQueryCompareTestSuite { frame => frame.select(col("*")) } - /** Parquet file with 2 columns - * - * - */ - * + /** + * Parquet file with 2 columns + * + * + */ + testSparkResultsAreEqual("Test Parquet unsigned int: uint64", frameFromParquet("test_unsigned64.parquet"), // CPU version throws an exception when Spark < 3.2, so skip when Spark < 3.2.
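
A note on the generator names used throughout the integration tests above: decimal_128_gens_no_neg and the decimal_gen_20_2 / decimal_gen_36_5 / decimal_gen_38_0 / decimal_gen_38_10 generators are referenced by this patch series but defined elsewhere, in integration_tests/src/main/python/data_gen.py. The sketch below shows roughly what such definitions could look like; the names and precision/scale pairs are inferred from the patch rather than copied from the real file, and the way the "_no_neg" variants keep generated values non-negative is not shown, so treat it as illustrative only.

    # Illustrative sketch only; the real definitions live in data_gen.py and may differ.
    # DecimalGen(precision, scale) is the generator class already used by these tests.
    decimal_gen_20_2 = DecimalGen(20, 2)    # just above the 18-digit limit of 64-bit decimals
    decimal_gen_36_5 = DecimalGen(36, 5)
    decimal_gen_38_0 = DecimalGen(38, 0)    # maximum Spark decimal precision, integral values
    decimal_gen_38_10 = DecimalGen(38, 10)

    # Precisions above 18 typically make Spark write the column as a fixed-length
    # binary DECIMAL, which cuDF reads back as DECIMAL128, the code path this
    # patch series enables.
    decimal_128_gens_no_neg = [decimal_gen_20_2, decimal_gen_36_5,
                               decimal_gen_38_0, decimal_gen_38_10]

The UINT64 handling added to needDecimalCast in GpuParquetScanBase.scala follows the same arithmetic as the comment added in patch 4/7: the largest unsigned 64-bit value is 2^64 - 1 = 18446744073709551615, which needs 20 decimal digits, so Spark surfaces Parquet UINT64 columns as DECIMAL(20,0) rather than LongType, which is why needDecimalCast is also made to return true for DType.UINT64 column views.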