From 11761c3701e43c0805158fec9e3c555bdf2d1b12 Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Tue, 12 Mar 2024 16:26:06 -0500 Subject: [PATCH 1/5] Update JsonToStructs and ScanJson to have white space normalization Signed-off-by: Robert (Bobby) Evans --- .../src/main/python/json_matrix_test.py | 297 +++++++++++++++++- .../test/resources/int_array_formatted.json | 12 + .../int_mixed_array_struct_formatted.json | 21 ++ .../test/resources/int_struct_formatted.json | 5 + .../nvidia/spark/rapids/ColumnCastUtil.scala | 149 +++++---- .../spark/sql/rapids/GpuJsonReadCommon.scala | 52 ++- 6 files changed, 457 insertions(+), 79 deletions(-) create mode 100644 integration_tests/src/test/resources/int_array_formatted.json create mode 100644 integration_tests/src/test/resources/int_mixed_array_struct_formatted.json create mode 100644 integration_tests/src/test/resources/int_struct_formatted.json diff --git a/integration_tests/src/main/python/json_matrix_test.py b/integration_tests/src/main/python/json_matrix_test.py index 823d8e0fe0e..ebc91a45f1e 100644 --- a/integration_tests/src/main/python/json_matrix_test.py +++ b/integration_tests/src/main/python/json_matrix_test.py @@ -52,14 +52,16 @@ def read_json_as_text(spark, data_path, column_name): 'spark.rapids.sql.format.json.read.enabled': 'true', 'spark.rapids.sql.json.read.float.enabled': 'true', 'spark.rapids.sql.json.read.double.enabled': 'true', - 'spark.rapids.sql.json.read.decimal.enabled': 'true' + 'spark.rapids.sql.json.read.decimal.enabled': 'true', + 'spark.rapids.sql.json.read.mixedTypesAsString.enabled': 'true' } _enable_json_to_structs_conf = { 'spark.rapids.sql.expression.JsonToStructs': 'true', 'spark.rapids.sql.json.read.float.enabled': 'true', 'spark.rapids.sql.json.read.double.enabled': 'true', - 'spark.rapids.sql.json.read.decimal.enabled': 'true' + 'spark.rapids.sql.json.read.decimal.enabled': 'true', + 'spark.rapids.sql.json.read.mixedTypesAsString.enabled': 'true' } _enable_get_json_object_conf = { @@ -601,7 +603,10 @@ def test_json_tuple_dec_locale_non_aribic(std_input_path): "sci_formatted_strings.json", "decimal_locale_formatted_strings.json", "single_quoted_strings.json", - "boolean_formatted.json"] + "boolean_formatted.json", + "int_array_formatted.json", + "int_struct_formatted.json", + "int_mixed_array_struct_formatted.json"] @pytest.mark.parametrize('input_file', COMMON_TEST_FILES) @pytest.mark.parametrize('read_func', [read_json_df]) # we have done so many tests already that we don't need both read func. They are the same @@ -672,7 +677,20 @@ def test_from_json_longs(std_input_path, input_file): conf =_enable_json_to_structs_conf) @pytest.mark.parametrize('dt', [DecimalType(38,0), DecimalType(38,10), DecimalType(10,2)], ids=idfn) -@pytest.mark.parametrize('input_file', COMMON_TEST_FILES) +@pytest.mark.parametrize('input_file', [ + "int_formatted.json", + pytest.param("float_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15280')), + pytest.param("sci_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15280')), + "int_formatted_strings.json", + "float_formatted_strings.json", + "sci_formatted_strings.json", + "decimal_locale_formatted_strings.json", + "single_quoted_strings.json", + "boolean_formatted.json", + "invalid_ridealong_columns.json", + "int_array_formatted.json", + "int_struct_formatted.json", + "int_mixed_array_struct_formatted.json"]) @pytest.mark.parametrize('read_func', [read_json_df]) # we have done so many tests already that we don't need both read func. They are the same def test_scan_json_decs(std_input_path, read_func, spark_tmp_table_factory, input_file, dt): assert_gpu_and_cpu_are_equal_collect( @@ -682,7 +700,20 @@ def test_scan_json_decs(std_input_path, read_func, spark_tmp_table_factory, inpu conf=_enable_all_types_json_scan_conf) @pytest.mark.parametrize('dt', [DecimalType(38,0), DecimalType(38,10), DecimalType(10,2)], ids=idfn) -@pytest.mark.parametrize('input_file', COMMON_TEST_FILES) +@pytest.mark.parametrize('input_file', [ + "int_formatted.json", + pytest.param("float_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15280')), + pytest.param("sci_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15280')), + "int_formatted_strings.json", + "float_formatted_strings.json", + "sci_formatted_strings.json", + "decimal_locale_formatted_strings.json", + "single_quoted_strings.json", + "boolean_formatted.json", + "invalid_ridealong_columns.json", + "int_array_formatted.json", + "int_struct_formatted.json", + "int_mixed_array_struct_formatted.json"]) @allow_non_gpu(TEXT_INPUT_EXEC, *non_utc_allow) # https://github.com/NVIDIA/spark-rapids/issues/10453 def test_from_json_decs(std_input_path, input_file, dt): schema = StructType([StructField("data", dt)]) @@ -701,7 +732,10 @@ def test_from_json_decs(std_input_path, input_file, dt): "decimal_locale_formatted_strings.json", pytest.param("single_quoted_strings.json", marks=pytest.mark.xfail(condition=is_before_spark_330(), reason='https://github.com/NVIDIA/spark-rapids/issues/10495')), pytest.param("boolean_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10479')), - pytest.param("invalid_ridealong_columns.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10534'))]) + pytest.param("invalid_ridealong_columns.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10534')), + pytest.param("int_array_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15278')), + "int_struct_formatted.json", + pytest.param("int_mixed_array_struct_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15278'))]) @pytest.mark.parametrize('read_func', [read_json_df]) def test_scan_json_strings(std_input_path, read_func, spark_tmp_table_factory, input_file): assert_gpu_and_cpu_are_equal_collect( @@ -720,7 +754,10 @@ def test_scan_json_strings(std_input_path, read_func, spark_tmp_table_factory, i "decimal_locale_formatted_strings.json", "single_quoted_strings.json", pytest.param("boolean_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10479')), - pytest.param("invalid_ridealong_columns.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10534'))]) + pytest.param("invalid_ridealong_columns.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10534')), + pytest.param("int_array_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15278')), + "int_struct_formatted.json", + pytest.param("int_mixed_array_struct_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15278'))]) @allow_non_gpu(TEXT_INPUT_EXEC, *non_utc_allow) # https://github.com/NVIDIA/spark-rapids/issues/10453 def test_from_json_strings(std_input_path, input_file): schema = StructType([StructField("data", StringType())]) @@ -738,7 +775,10 @@ def test_from_json_strings(std_input_path, input_file): "decimal_locale_formatted_strings.json", "single_quoted_strings.json", pytest.param("boolean_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10218')), - pytest.param("invalid_ridealong_columns.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10534'))]) + pytest.param("invalid_ridealong_columns.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10534')), + pytest.param("int_array_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10218')), + pytest.param("int_struct_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10218')), + pytest.param("int_mixed_array_struct_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10218'))]) @allow_non_gpu(TEXT_INPUT_EXEC) def test_get_json_object_formats(std_input_path, input_file): assert_gpu_and_cpu_are_equal_collect( @@ -755,7 +795,10 @@ def test_get_json_object_formats(std_input_path, input_file): "decimal_locale_formatted_strings.json", "single_quoted_strings.json", pytest.param("boolean_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10218')), - pytest.param("invalid_ridealong_columns.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10534'))]) + pytest.param("invalid_ridealong_columns.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10534')), + pytest.param("int_array_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10218')), + pytest.param("int_struct_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10218')), + pytest.param("int_mixed_array_struct_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10218'))]) @allow_non_gpu(TEXT_INPUT_EXEC) def test_json_tuple_formats(std_input_path, input_file): assert_gpu_and_cpu_are_equal_collect( @@ -783,13 +826,16 @@ def test_from_json_bools(std_input_path, input_file): @pytest.mark.parametrize('input_file', [ "int_formatted.json", pytest.param("float_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10481')), - "sci_formatted.json", + pytest.param("sci_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15280')), "int_formatted_strings.json", pytest.param("float_formatted_strings.json", marks=pytest.mark.xfail(condition=is_before_spark_330(), reason='https://issues.apache.org/jira/browse/SPARK-38060')), "sci_formatted_strings.json", "decimal_locale_formatted_strings.json", "single_quoted_strings.json", - "boolean_formatted.json"]) + "boolean_formatted.json", + "int_array_formatted.json", + "int_struct_formatted.json", + "int_mixed_array_struct_formatted.json"]) @pytest.mark.parametrize('read_func', [read_json_df]) def test_scan_json_floats(std_input_path, read_func, spark_tmp_table_factory, input_file): assert_gpu_and_cpu_are_equal_collect( @@ -802,13 +848,16 @@ def test_scan_json_floats(std_input_path, read_func, spark_tmp_table_factory, in @pytest.mark.parametrize('input_file', [ "int_formatted.json", pytest.param("float_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10481')), - "sci_formatted.json", + pytest.param("sci_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15280')), "int_formatted_strings.json", pytest.param("float_formatted_strings.json", marks=pytest.mark.xfail(condition=is_before_spark_330(), reason='https://issues.apache.org/jira/browse/SPARK-38060')), "sci_formatted_strings.json", "decimal_locale_formatted_strings.json", "single_quoted_strings.json", - "boolean_formatted.json"]) + "boolean_formatted.json", + "int_array_formatted.json", + "int_struct_formatted.json", + "int_mixed_array_struct_formatted.json"]) @allow_non_gpu(TEXT_INPUT_EXEC, *non_utc_allow) # https://github.com/NVIDIA/spark-rapids/issues/10453 def test_from_json_floats(std_input_path, input_file): schema = StructType([StructField("data", FloatType())]) @@ -820,13 +869,16 @@ def test_from_json_floats(std_input_path, input_file): @pytest.mark.parametrize('input_file', [ "int_formatted.json", pytest.param("float_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10481')), - "sci_formatted.json", + pytest.param("sci_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15280')), "int_formatted_strings.json", pytest.param("float_formatted_strings.json", marks=pytest.mark.xfail(condition=is_before_spark_330(), reason='https://issues.apache.org/jira/browse/SPARK-38060')), "sci_formatted_strings.json", "decimal_locale_formatted_strings.json", "single_quoted_strings.json", - "boolean_formatted.json"]) + "boolean_formatted.json", + "int_array_formatted.json", + "int_struct_formatted.json", + "int_mixed_array_struct_formatted.json"]) @pytest.mark.parametrize('read_func', [read_json_df]) def test_scan_json_doubles(std_input_path, read_func, spark_tmp_table_factory, input_file): assert_gpu_and_cpu_are_equal_collect( @@ -839,13 +891,16 @@ def test_scan_json_doubles(std_input_path, read_func, spark_tmp_table_factory, i @pytest.mark.parametrize('input_file', [ "int_formatted.json", pytest.param("float_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10481')), - "sci_formatted.json", + pytest.param("sci_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15280')), "int_formatted_strings.json", pytest.param("float_formatted_strings.json", marks=pytest.mark.xfail(condition=is_before_spark_330(), reason='https://issues.apache.org/jira/browse/SPARK-38060')), "sci_formatted_strings.json", "decimal_locale_formatted_strings.json", "single_quoted_strings.json", - "boolean_formatted.json"]) + "boolean_formatted.json", + "int_array_formatted.json", + "int_struct_formatted.json", + "int_mixed_array_struct_formatted.json"]) @allow_non_gpu(TEXT_INPUT_EXEC, *non_utc_allow) # https://github.com/NVIDIA/spark-rapids/issues/10453 def test_from_json_doubles(std_input_path, input_file): schema = StructType([StructField("data", DoubleType())]) @@ -853,4 +908,212 @@ def test_from_json_doubles(std_input_path, input_file): lambda spark : read_json_as_text(spark, std_input_path + '/' + input_file, "json").select(f.col('json'), f.from_json(f.col('json'), schema)), conf =_enable_json_to_structs_conf) +@pytest.mark.parametrize('input_file', [ + pytest.param("int_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("float_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("sci_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("int_formatted_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("float_formatted_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("sci_formatted_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("decimal_locale_formatted_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("single_quoted_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("boolean_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("int_array_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10573')), + "int_struct_formatted.json", + pytest.param("int_mixed_array_struct_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260'))]) +@pytest.mark.parametrize('read_func', [read_json_df]) # we have done so many tests already that we don't need both read func. They are the same +def test_scan_json_long_arrays(std_input_path, read_func, spark_tmp_table_factory, input_file): + assert_gpu_and_cpu_are_equal_collect( + read_func(std_input_path + '/' + input_file, + StructType([StructField("data", ArrayType(LongType()))]), + spark_tmp_table_factory), + conf=_enable_all_types_json_scan_conf) + +@pytest.mark.parametrize('input_file', [ + pytest.param("int_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("float_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("sci_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("int_formatted_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("float_formatted_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("sci_formatted_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("decimal_locale_formatted_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("single_quoted_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("boolean_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("int_array_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10573')), + "int_struct_formatted.json", + pytest.param("int_mixed_array_struct_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260'))]) +@allow_non_gpu(TEXT_INPUT_EXEC, *non_utc_allow) # https://github.com/NVIDIA/spark-rapids/issues/10453 +def test_from_json_long_arrays(std_input_path, input_file): + schema = StructType([StructField("data", ArrayType(LongType()))]) + assert_gpu_and_cpu_are_equal_collect( + lambda spark : read_json_as_text(spark, std_input_path + '/' + input_file, "json").select(f.col('json'), f.from_json(f.col('json'), schema)), + conf =_enable_json_to_structs_conf) + +@pytest.mark.parametrize('input_file', [ + pytest.param("int_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("float_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("sci_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("int_formatted_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("float_formatted_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("sci_formatted_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("decimal_locale_formatted_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("single_quoted_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("boolean_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("int_array_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10574')), + "int_struct_formatted.json", + pytest.param("int_mixed_array_struct_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260'))]) +@pytest.mark.parametrize('read_func', [read_json_df]) # we have done so many tests already that we don't need both read func. They are the same +def test_scan_json_string_arrays(std_input_path, read_func, spark_tmp_table_factory, input_file): + assert_gpu_and_cpu_are_equal_collect( + read_func(std_input_path + '/' + input_file, + StructType([StructField("data", ArrayType(StringType()))]), + spark_tmp_table_factory), + conf=_enable_all_types_json_scan_conf) + +@pytest.mark.parametrize('input_file', [ + pytest.param("int_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("float_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("sci_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("int_formatted_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("float_formatted_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("sci_formatted_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("decimal_locale_formatted_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("single_quoted_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("boolean_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("int_array_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10574')), + "int_struct_formatted.json", + pytest.param("int_mixed_array_struct_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260'))]) +@allow_non_gpu(TEXT_INPUT_EXEC, *non_utc_allow) # https://github.com/NVIDIA/spark-rapids/issues/10453 +def test_from_json_string_arrays(std_input_path, input_file): + schema = StructType([StructField("data", ArrayType(StringType()))]) + assert_gpu_and_cpu_are_equal_collect( + lambda spark : read_json_as_text(spark, std_input_path + '/' + input_file, "json").select(f.col('json'), f.from_json(f.col('json'), schema)), + conf =_enable_json_to_structs_conf) + +@pytest.mark.parametrize('input_file', [ + pytest.param("int_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("float_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("sci_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("int_formatted_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("float_formatted_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("sci_formatted_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("decimal_locale_formatted_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("single_quoted_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("boolean_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + "int_array_formatted.json", + "int_struct_formatted.json", + pytest.param("int_mixed_array_struct_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260'))]) +@pytest.mark.parametrize('read_func', [read_json_df]) # we have done so many tests already that we don't need both read func. They are the same +def test_scan_json_long_structs(std_input_path, read_func, spark_tmp_table_factory, input_file): + assert_gpu_and_cpu_are_equal_collect( + read_func(std_input_path + '/' + input_file, + StructType([StructField("data", StructType([StructField("A", LongType()),StructField("B", LongType())]))]), + spark_tmp_table_factory), + conf=_enable_all_types_json_scan_conf) + +@pytest.mark.parametrize('input_file', [ + pytest.param("int_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("float_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("sci_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("int_formatted_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("float_formatted_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("sci_formatted_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("decimal_locale_formatted_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("single_quoted_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("boolean_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + "int_array_formatted.json", + "int_struct_formatted.json", + pytest.param("int_mixed_array_struct_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260'))]) +@allow_non_gpu(TEXT_INPUT_EXEC, *non_utc_allow) # https://github.com/NVIDIA/spark-rapids/issues/10453 +def test_from_json_long_structs(std_input_path, input_file): + schema = StructType([StructField("data", StructType([StructField("A", LongType()),StructField("B", LongType())]))]) + assert_gpu_and_cpu_are_equal_collect( + lambda spark : read_json_as_text(spark, std_input_path + '/' + input_file, "json").select(f.col('json'), f.from_json(f.col('json'), schema)), + conf =_enable_json_to_structs_conf) + +@pytest.mark.parametrize('input_file', [ + pytest.param("int_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("float_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("sci_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("int_formatted_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("float_formatted_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("sci_formatted_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("decimal_locale_formatted_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("single_quoted_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("boolean_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + "int_array_formatted.json", + "int_struct_formatted.json", + pytest.param("int_mixed_array_struct_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260'))]) + +@pytest.mark.parametrize('read_func', [read_json_df]) # we have done so many tests already that we don't need both read func. They are the same +def test_scan_json_string_structs(std_input_path, read_func, spark_tmp_table_factory, input_file): + assert_gpu_and_cpu_are_equal_collect( + read_func(std_input_path + '/' + input_file, + StructType([StructField("data", StructType([StructField("A", StringType()),StructField("B", StringType())]))]), + spark_tmp_table_factory), + conf=_enable_all_types_json_scan_conf) + +@pytest.mark.parametrize('input_file', [ + pytest.param("int_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("float_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("sci_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("int_formatted_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("float_formatted_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("sci_formatted_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("decimal_locale_formatted_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("single_quoted_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("boolean_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + "int_array_formatted.json", + "int_struct_formatted.json", + pytest.param("int_mixed_array_struct_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260'))]) +@allow_non_gpu(TEXT_INPUT_EXEC, *non_utc_allow) # https://github.com/NVIDIA/spark-rapids/issues/10453 +def test_from_json_string_structs(std_input_path, input_file): + schema = StructType([StructField("data", StructType([StructField("A", StringType()),StructField("B", StringType())]))]) + assert_gpu_and_cpu_are_equal_collect( + lambda spark : read_json_as_text(spark, std_input_path + '/' + input_file, "json").select(f.col('json'), f.from_json(f.col('json'), schema)), + conf =_enable_json_to_structs_conf) + +@pytest.mark.parametrize('dt', [DecimalType(38,0), DecimalType(10,2)], ids=idfn) +@pytest.mark.parametrize('input_file', [ + pytest.param("int_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("float_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("sci_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("int_formatted_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("float_formatted_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("sci_formatted_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("decimal_locale_formatted_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("single_quoted_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("boolean_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("int_array_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10573')), # This does not fail on 38,0 + "int_struct_formatted.json", + pytest.param("int_mixed_array_struct_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260'))]) +@pytest.mark.parametrize('read_func', [read_json_df]) # we have done so many tests already that we don't need both read func. They are the same +def test_scan_json_dec_arrays(std_input_path, read_func, spark_tmp_table_factory, input_file, dt): + assert_gpu_and_cpu_are_equal_collect( + read_func(std_input_path + '/' + input_file, + StructType([StructField("data", ArrayType(dt))]), + spark_tmp_table_factory), + conf=_enable_all_types_json_scan_conf) + +@pytest.mark.parametrize('dt', [DecimalType(38,0), DecimalType(10,2)], ids=idfn) +@pytest.mark.parametrize('input_file', [ + pytest.param("int_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("float_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("sci_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("int_formatted_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("float_formatted_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("sci_formatted_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("decimal_locale_formatted_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("single_quoted_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("boolean_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), + pytest.param("int_array_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10573')), # This does not fail on 38,0 + "int_struct_formatted.json", + pytest.param("int_mixed_array_struct_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260'))]) +@allow_non_gpu(TEXT_INPUT_EXEC, *non_utc_allow) # https://github.com/NVIDIA/spark-rapids/issues/10453 +def test_from_json_dec_arrays(std_input_path, input_file, dt): + schema = StructType([StructField("data", ArrayType(dt))]) + assert_gpu_and_cpu_are_equal_collect( + lambda spark : read_json_as_text(spark, std_input_path + '/' + input_file, "json").select(f.col('json'), f.from_json(f.col('json'), schema)), + conf =_enable_json_to_structs_conf) + diff --git a/integration_tests/src/test/resources/int_array_formatted.json b/integration_tests/src/test/resources/int_array_formatted.json new file mode 100644 index 00000000000..e241cfe58c1 --- /dev/null +++ b/integration_tests/src/test/resources/int_array_formatted.json @@ -0,0 +1,12 @@ +{"data": [1,0]} +{"data": [-1,,100]} +{"data": [-0,5,6,7 ,8 , 9 ]} +{"data": [0]} +{"data": [127, -128]} +{"data": []} +{"data": [32767, -32768]} +{"data": [2147483647, -2147483648]} +{"data": [9223372036854775807,-9223372036854775808]} +{"data": [9223372036854775808, -9223372036854775809]} +{"data": [99999999999999999999999999999999999999, -99999999999999999999999999999999999999]} +{"data": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]} diff --git a/integration_tests/src/test/resources/int_mixed_array_struct_formatted.json b/integration_tests/src/test/resources/int_mixed_array_struct_formatted.json new file mode 100644 index 00000000000..9a48be7ff2b --- /dev/null +++ b/integration_tests/src/test/resources/int_mixed_array_struct_formatted.json @@ -0,0 +1,21 @@ +{"data": {"A": 0, "B": 1}} +{"data": [1,0]} +{"data": {"A": 1}} +{"data": [-1,,100]} +{"data": {"B": 50}} +{"data": [0]} +{"data": null} +{"data": []} +{"data": {"B": -128, "A": 127}} +{"data": [127, -128]} +{"data": {"A": 32767, "B": -32767}} +{"data": [32767, -32768]} +{"data": {"A": 214783647, "B": -2147483648}} +{"data": [2147483647, -2147483648]} +{"data": {"A": 9223372036854775807, "B": -9223372036854775808}} +{"data": [9223372036854775807,-9223372036854775808]} +{"data": {"A": 9223372036854775808,, "B": -9223372036854775809}} +{"data": [9223372036854775808, -9223372036854775809]} +{"data": {"B": 99999999999999999999999999999999999999, "A": -99999999999999999999999999999999999999}} +{"data": [99999999999999999999999999999999999999, -99999999999999999999999999999999999999]} +{"data": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]} diff --git a/integration_tests/src/test/resources/int_struct_formatted.json b/integration_tests/src/test/resources/int_struct_formatted.json new file mode 100644 index 00000000000..e3ac75fbf14 --- /dev/null +++ b/integration_tests/src/test/resources/int_struct_formatted.json @@ -0,0 +1,5 @@ +{"data": {"A": 0, "B": 1}} +{"data": {"A": 1}} +{"data": {"B": 50}} +{"data": {"B": -128, "A": 127}} +{"data": {"B": 99999999999999999999, "A": -9999999999999999999}} diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnCastUtil.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnCastUtil.scala index 9dca6b87155..2ef075792ec 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnCastUtil.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnCastUtil.scala @@ -45,12 +45,18 @@ object ColumnCastUtil { * * @param cv the view to be updated * @param dt the Spark's data type of the input view (if applicable) + * @param nestedMismatchHandler a function that can handle a mismatch between nesting. This can + * include things like when a STRING is found, but a nested type is + * needed, or when a nested value is returned by CUDF but a + * non-nested type is expected. * @param convert the partial function used to convert the data. If this matches and returns * a updated view this function takes ownership of that view. * @return None if there were no changes to the view or the updated view along with anything else * that needs to be closed. */ - def deepTransformView(cv: ColumnView, dt: Option[DataType] = None) + def deepTransformView(cv: ColumnView, dt: Option[DataType] = None, + nestedMismatchHandler: Option[(ColumnView, DataType) => + (Option[ColumnView], ArrayBuffer[AutoCloseable])] = None) (convert: PartialFunction[(ColumnView, Option[DataType]), ColumnView]): (Option[ColumnView], ArrayBuffer[AutoCloseable]) = { closeOnExcept(ArrayBuffer.empty[AutoCloseable]) { needsClosing => @@ -64,71 +70,90 @@ object ColumnCastUtil { // Recurse down if needed and check children cv.getType.getTypeId match { case DType.DTypeEnum.STRUCT => - withResource(ArrayBuffer.empty[ColumnView]) { tmpNeedsClosed => - val structFields = dt match { - case None => Array.empty[StructField] - case Some(t: StructType) => t.fields - case Some(t) => /* this should never be reach out */ + val (structFields, transformedCv) = dt match { + case None => (Array.empty[StructField], None) + case Some(t: StructType) => (t.fields, None) + case Some(t) => + nestedMismatchHandler.map { handler => + // The fields is ignored + (Array.empty[StructField], Some(handler(cv, t))) + }.getOrElse { throw new IllegalStateException("Invalid input DataType: " + - s"Expect StructType but got ${t.toString}") - } - var childrenUpdated = false - val newChildren = ArrayBuffer.empty[ColumnView] - (0 until cv.getNumChildren).foreach { index => - val child = cv.getChildColumnView(index) - tmpNeedsClosed += child - val childDt = if (structFields.nonEmpty) { - Some(structFields(index).dataType) - } else { - None + s"CUDF returned STRUCT Spark asked for ${t.toString}") } - val (updatedChild, needsClosingChild) = deepTransformView(child, childDt)(convert) - needsClosing ++= needsClosingChild - updatedChild match { - case Some(newChild) => - newChildren += newChild - childrenUpdated = true - case None => - newChildren += child + } + transformedCv.map { + case (updatedData, needsClosingData) => + needsClosing ++= needsClosingData + (updatedData, needsClosing) + }.getOrElse { + withResource(ArrayBuffer.empty[ColumnView]) { tmpNeedsClosed => + var childrenUpdated = false + val newChildren = ArrayBuffer.empty[ColumnView] + (0 until cv.getNumChildren).foreach { index => + val child = cv.getChildColumnView(index) + tmpNeedsClosed += child + val childDt = if (structFields.nonEmpty) { + Some(structFields(index).dataType) + } else { + None + } + val (updatedChild, needsClosingChild) = deepTransformView(child, childDt, + nestedMismatchHandler)(convert) + needsClosing ++= needsClosingChild + updatedChild match { + case Some(newChild) => + newChildren += newChild + childrenUpdated = true + case None => + newChildren += child + } } - } - if (childrenUpdated) { - withResource(cv.getValid) { valid => - val ret = new ColumnView(DType.STRUCT, cv.getRowCount, - Optional.empty[java.lang.Long](), valid, null, newChildren.toArray) - (Some(ret), needsClosing) + if (childrenUpdated) { + withResource(cv.getValid) { valid => + val ret = new ColumnView(DType.STRUCT, cv.getRowCount, + Optional.empty[java.lang.Long](), valid, null, newChildren.toArray) + (Some(ret), needsClosing) + } + } else { + (None, needsClosing) } - } else { - (None, needsClosing) } } case DType.DTypeEnum.LIST => - withResource(cv.getChildColumnView(0)) { child => - // A ColumnView of LIST type may have data type is ArrayType or MapType in Spark. - // If it is a MapType, its child will be a column of type struct. - // In such cases, we need to generate the corresponding Spark's data type - // for the child column as a StructType. - val childDt = dt match { - case None => None - case Some(t: ArrayType) => Some(t.elementType) - case Some(_: BinaryType) => Some(ByteType) - case Some(t: MapType) => Some(StructType(Array( - StructField("key", t.keyType, nullable = false), - StructField("value", t.valueType, nullable = t.valueContainsNull)))) - case Some(t) => /* this should never be reach out */ - throw new IllegalStateException("Invalid input DataType: " + - s"Expect ArrayType/BinaryType/MapType but got ${t.toString}") - } - val (updatedData, needsClosingData) = deepTransformView(child, childDt)(convert) - needsClosing ++= needsClosingData - updatedData match { - case Some(updated) => - (Some(GpuListUtils.replaceListDataColumnAsView(cv, updated)), needsClosing) - case None => - (None, needsClosing) + // A ColumnView of LIST was found. There are some types that we can auto-transform, + // but, in some cases we need to fall back to other processing. + val (childDt, transformedResult) = dt match { + case None => (None, None) + case Some(t: ArrayType) => (Some(t.elementType), None) + case Some(_: BinaryType) => (Some(ByteType), None) + case Some(t: MapType) => (Some(StructType(Array( + StructField("key", t.keyType, nullable = false), + StructField("value", t.valueType, nullable = t.valueContainsNull)))), None) + case Some(t) => + nestedMismatchHandler.map { handler => + (None, Some(handler(cv, t))) + }.getOrElse { + withResource(cv.getChildColumnView(0)) { child => + throw new IllegalStateException("Invalid input DataType: " + + s"CUDF returned LIST[${child.getType}] We expect Spark to want an " + + s"ArrayType/BinaryType/MapType but got ${t.toString}") + } + } + } + val (updatedData, needsClosingData) = transformedResult.getOrElse { + withResource(cv.getChildColumnView(0)) { child => + val (ud, nc) = deepTransformView(child, childDt, nestedMismatchHandler)(convert) + ud match { + case Some(updated) => + (Some(GpuListUtils.replaceListDataColumnAsView(cv, updated)), nc) + case None => + (None, nc) + } } } - + needsClosing ++= needsClosingData + (updatedData, needsClosing) case _ => (None, needsClosing) } @@ -143,13 +168,19 @@ object ColumnCastUtil { * * @param cv the vector to be updated * @param dt the Spark's data type of the input vector (if applicable) + * @param nestedMismatchHandler a function that can handle a mismatch between nesting. This can + * include things like when a STRING is found, but a nested type is + * needed, or when a nested value is returned by CUDF but a + * non-nested type is expected. * @param convert the partial function used to convert the data. If this matches and returns * a updated view this function takes ownership of that view. * @return the updated vector */ - def deepTransform(cv: ColumnVector, dt: Option[DataType] = None) + def deepTransform(cv: ColumnVector, dt: Option[DataType] = None, + nestedMismatchHandler: Option[(ColumnView, DataType) => + (Option[ColumnView], ArrayBuffer[AutoCloseable])] = None) (convert: PartialFunction[(ColumnView, Option[DataType]), ColumnView]): ColumnVector = { - val (retView, needsClosed) = deepTransformView(cv, dt)(convert) + val (retView, needsClosed) = deepTransformView(cv, dt, nestedMismatchHandler)(convert) withResource(needsClosed) { _ => retView match { case Some(updated) => diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuJsonReadCommon.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuJsonReadCommon.scala index 20f3e202ced..8ac8dcd6e02 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuJsonReadCommon.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuJsonReadCommon.scala @@ -19,15 +19,17 @@ package org.apache.spark.sql.rapids import java.util.Locale +import scala.collection.mutable.ArrayBuffer + import ai.rapids.cudf.{BinaryOp, CaptureGroups, ColumnVector, ColumnView, DType, RegexProgram, Scalar, Schema, Table} -import com.nvidia.spark.rapids.{ColumnCastUtil, GpuCast, GpuColumnVector, GpuTextBasedPartitionReader} +import com.nvidia.spark.rapids.{ColumnCastUtil, GpuCast, GpuColumnVector, GpuScalar, GpuTextBasedPartitionReader} import com.nvidia.spark.rapids.Arm.withResource import com.nvidia.spark.rapids.RapidsPluginImplicits.AutoCloseableProducingArray import com.nvidia.spark.rapids.jni.CastStrings import org.apache.spark.sql.catalyst.json.{GpuJsonUtils, JSONOptions} import org.apache.spark.sql.rapids.shims.GpuJsonToStructsShim -import org.apache.spark.sql.types._ +import org.apache.spark.sql.types.{DataType, _} /** * This is a utility method intended to provide common functionality between JsonToStructs and @@ -266,10 +268,53 @@ object GpuJsonReadCommon { private def timestampFormat(options: JSONOptions): String = GpuJsonUtils.timestampFormatInRead(options) + private def throwMismatchException(cv: ColumnView, + dt: DataType): (Option[ColumnView], ArrayBuffer[AutoCloseable]) = { + throw new IllegalStateException(s"Don't know how to transform $cv to $dt for JSON") + } + + private def nestedColumnViewMismatchTransform(cv: ColumnView, + dt: DataType): (Option[ColumnView], ArrayBuffer[AutoCloseable]) = { + // In the future we should be able to convert strings to maps/etc, but for + // now we are working around issues where CUDF is not returning a STRING for nested + // types when asked for it. + cv.getType match { + case DType.LIST => + dt match { + case ByteType | ShortType | IntegerType | LongType | + BooleanType | FloatType | DoubleType | + _: DecimalType | _: StructType => + // This is all nulls + val rows = cv.getRowCount().toInt + val ret = withResource(GpuScalar.from(null, dt)) { nullScalar => + ColumnVector.fromScalar(nullScalar, rows) + } + (Some(ret.asInstanceOf[ColumnView]), ArrayBuffer(ret)) + case _ => + throwMismatchException(cv, dt) + } + case DType.STRUCT => + dt match { + case _: ArrayType => + // This is all nulls + val rows = cv.getRowCount().toInt + val ret = withResource(GpuScalar.from(null, dt)) { nullScalar => + ColumnVector.fromScalar(nullScalar, rows) + } + (Some(ret.asInstanceOf[ColumnView]), ArrayBuffer(ret)) + case _ => + throwMismatchException(cv, dt) + } + case _ => + throwMismatchException(cv, dt) + } + } + private def convertToDesiredType(inputCv: ColumnVector, topLevelType: DataType, options: JSONOptions): ColumnVector = { - ColumnCastUtil.deepTransform(inputCv, Some(topLevelType)) { + ColumnCastUtil.deepTransform(inputCv, Some(topLevelType), + Some(nestedColumnViewMismatchTransform)) { case (cv, Some(BooleanType)) if cv.getType == DType.STRING => castJsonStringToBool(cv) case (cv, Some(DateType)) if cv.getType == DType.STRING => @@ -324,6 +369,7 @@ object GpuJsonReadCommon { ai.rapids.cudf.JSONOptions.builder() .withRecoverWithNull(true) .withMixedTypesAsStrings(enableMixedTypes) + .withNormalizeWhitespace(true) .withKeepQuotes(true) .withNormalizeSingleQuotes(options.allowSingleQuotes) .build() From 8369bd9c7f4b9fc3c2c81e06dd04a03a688a2ee4 Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Wed, 13 Mar 2024 15:37:49 -0500 Subject: [PATCH 2/5] Some more fixes --- integration_tests/src/main/python/json_matrix_test.py | 4 ++-- integration_tests/src/main/python/json_test.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/integration_tests/src/main/python/json_matrix_test.py b/integration_tests/src/main/python/json_matrix_test.py index ebc91a45f1e..0156a5c1c9a 100644 --- a/integration_tests/src/main/python/json_matrix_test.py +++ b/integration_tests/src/main/python/json_matrix_test.py @@ -1001,7 +1001,7 @@ def test_from_json_string_arrays(std_input_path, input_file): pytest.param("single_quoted_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), pytest.param("boolean_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), "int_array_formatted.json", - "int_struct_formatted.json", + pytest.param("int_struct_formatted.json", marks=pytest.mark.xfail(condition=is_before_spark_342(),reason='https://github.com/NVIDIA/spark-rapids/issues/10588')), pytest.param("int_mixed_array_struct_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260'))]) @pytest.mark.parametrize('read_func', [read_json_df]) # we have done so many tests already that we don't need both read func. They are the same def test_scan_json_long_structs(std_input_path, read_func, spark_tmp_table_factory, input_file): @@ -1022,7 +1022,7 @@ def test_scan_json_long_structs(std_input_path, read_func, spark_tmp_table_facto pytest.param("single_quoted_strings.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), pytest.param("boolean_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260')), "int_array_formatted.json", - "int_struct_formatted.json", + pytest.param("int_struct_formatted.json", marks=pytest.mark.xfail(condition=is_before_spark_342(),reason='https://github.com/NVIDIA/spark-rapids/issues/10588')), pytest.param("int_mixed_array_struct_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15260'))]) @allow_non_gpu(TEXT_INPUT_EXEC, *non_utc_allow) # https://github.com/NVIDIA/spark-rapids/issues/10453 def test_from_json_long_structs(std_input_path, input_file): diff --git a/integration_tests/src/main/python/json_test.py b/integration_tests/src/main/python/json_test.py index 890eb43b4a8..c4759669ce5 100644 --- a/integration_tests/src/main/python/json_test.py +++ b/integration_tests/src/main/python/json_test.py @@ -314,7 +314,7 @@ def do_read(spark): 'floats.json', 'floats_leading_zeros.json', 'floats_invalid.json', - pytest.param('floats_edge_cases.json', marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/4647')), + 'floats_edge_cases.json', 'decimals.json', 'dates.json', 'dates_invalid.json', From ff77330fbce4bdf4d8a3bfc2f8ed5728e82a11d4 Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Wed, 13 Mar 2024 15:54:04 -0500 Subject: [PATCH 3/5] Update reason for test failure --- integration_tests/src/main/python/json_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration_tests/src/main/python/json_test.py b/integration_tests/src/main/python/json_test.py index c4759669ce5..6ddebd1b9eb 100644 --- a/integration_tests/src/main/python/json_test.py +++ b/integration_tests/src/main/python/json_test.py @@ -307,7 +307,7 @@ def do_read(spark): 'boolean.json', pytest.param('boolean_invalid.json', marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/4779')), 'ints.json', - pytest.param('ints_invalid.json', marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/4793')), + pytest.param('ints_invalid.json', marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/4940')), # This fails for dates, as not all are invalid 'nan_and_inf.json', pytest.param('nan_and_inf_strings.json', marks=pytest.mark.skipif(is_before_spark_330(), reason='https://issues.apache.org/jira/browse/SPARK-38060 fixed in Spark 3.3.0')), 'nan_and_inf_invalid.json', From 6079f1d450f03fa06af0080776461c21f4f78151 Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Thu, 14 Mar 2024 10:14:15 -0500 Subject: [PATCH 4/5] Review comments --- .../main/scala/com/nvidia/spark/rapids/ColumnCastUtil.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnCastUtil.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnCastUtil.scala index 2ef075792ec..6a82447412a 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnCastUtil.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnCastUtil.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021-2023, NVIDIA CORPORATION. + * Copyright (c) 2021-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -56,9 +56,9 @@ object ColumnCastUtil { */ def deepTransformView(cv: ColumnView, dt: Option[DataType] = None, nestedMismatchHandler: Option[(ColumnView, DataType) => - (Option[ColumnView], ArrayBuffer[AutoCloseable])] = None) + (Option[ColumnView], Seq[AutoCloseable])] = None) (convert: PartialFunction[(ColumnView, Option[DataType]), ColumnView]): - (Option[ColumnView], ArrayBuffer[AutoCloseable]) = { + (Option[ColumnView], Seq[AutoCloseable]) = { closeOnExcept(ArrayBuffer.empty[AutoCloseable]) { needsClosing => val updated = convert.lift((cv, dt)) needsClosing ++= updated From 0bda0fb868cadbdb636a656aff79706b1b90ee30 Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Fri, 15 Mar 2024 08:34:30 -0500 Subject: [PATCH 5/5] Review comments and Scala2.13 fixes --- .../com/nvidia/spark/rapids/ColumnCastUtil.scala | 14 +++++++------- .../spark/sql/rapids/GpuJsonReadCommon.scala | 10 ++++------ 2 files changed, 11 insertions(+), 13 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnCastUtil.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnCastUtil.scala index 6a82447412a..af4bc53ab4c 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnCastUtil.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnCastUtil.scala @@ -65,7 +65,7 @@ object ColumnCastUtil { updated match { case Some(newCv) => - (Some(newCv), needsClosing) + (Some(newCv), needsClosing.toSeq) case None => // Recurse down if needed and check children cv.getType.getTypeId match { @@ -85,7 +85,7 @@ object ColumnCastUtil { transformedCv.map { case (updatedData, needsClosingData) => needsClosing ++= needsClosingData - (updatedData, needsClosing) + (updatedData, needsClosing.toSeq) }.getOrElse { withResource(ArrayBuffer.empty[ColumnView]) { tmpNeedsClosed => var childrenUpdated = false @@ -113,10 +113,10 @@ object ColumnCastUtil { withResource(cv.getValid) { valid => val ret = new ColumnView(DType.STRUCT, cv.getRowCount, Optional.empty[java.lang.Long](), valid, null, newChildren.toArray) - (Some(ret), needsClosing) + (Some(ret), needsClosing.toSeq) } } else { - (None, needsClosing) + (None, needsClosing.toSeq) } } } @@ -153,9 +153,9 @@ object ColumnCastUtil { } } needsClosing ++= needsClosingData - (updatedData, needsClosing) + (updatedData, needsClosing.toSeq) case _ => - (None, needsClosing) + (None, needsClosing.toSeq) } } } @@ -178,7 +178,7 @@ object ColumnCastUtil { */ def deepTransform(cv: ColumnVector, dt: Option[DataType] = None, nestedMismatchHandler: Option[(ColumnView, DataType) => - (Option[ColumnView], ArrayBuffer[AutoCloseable])] = None) + (Option[ColumnView], Seq[AutoCloseable])] = None) (convert: PartialFunction[(ColumnView, Option[DataType]), ColumnView]): ColumnVector = { val (retView, needsClosed) = deepTransformView(cv, dt, nestedMismatchHandler)(convert) withResource(needsClosed) { _ => diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuJsonReadCommon.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuJsonReadCommon.scala index 8ac8dcd6e02..cf908fbb557 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuJsonReadCommon.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuJsonReadCommon.scala @@ -19,8 +19,6 @@ package org.apache.spark.sql.rapids import java.util.Locale -import scala.collection.mutable.ArrayBuffer - import ai.rapids.cudf.{BinaryOp, CaptureGroups, ColumnVector, ColumnView, DType, RegexProgram, Scalar, Schema, Table} import com.nvidia.spark.rapids.{ColumnCastUtil, GpuCast, GpuColumnVector, GpuScalar, GpuTextBasedPartitionReader} import com.nvidia.spark.rapids.Arm.withResource @@ -269,12 +267,12 @@ object GpuJsonReadCommon { GpuJsonUtils.timestampFormatInRead(options) private def throwMismatchException(cv: ColumnView, - dt: DataType): (Option[ColumnView], ArrayBuffer[AutoCloseable]) = { + dt: DataType): (Option[ColumnView], Seq[AutoCloseable]) = { throw new IllegalStateException(s"Don't know how to transform $cv to $dt for JSON") } private def nestedColumnViewMismatchTransform(cv: ColumnView, - dt: DataType): (Option[ColumnView], ArrayBuffer[AutoCloseable]) = { + dt: DataType): (Option[ColumnView], Seq[AutoCloseable]) = { // In the future we should be able to convert strings to maps/etc, but for // now we are working around issues where CUDF is not returning a STRING for nested // types when asked for it. @@ -289,7 +287,7 @@ object GpuJsonReadCommon { val ret = withResource(GpuScalar.from(null, dt)) { nullScalar => ColumnVector.fromScalar(nullScalar, rows) } - (Some(ret.asInstanceOf[ColumnView]), ArrayBuffer(ret)) + (Some(ret.asInstanceOf[ColumnView]), Seq(ret)) case _ => throwMismatchException(cv, dt) } @@ -301,7 +299,7 @@ object GpuJsonReadCommon { val ret = withResource(GpuScalar.from(null, dt)) { nullScalar => ColumnVector.fromScalar(nullScalar, rows) } - (Some(ret.asInstanceOf[ColumnView]), ArrayBuffer(ret)) + (Some(ret.asInstanceOf[ColumnView]), Seq(ret)) case _ => throwMismatchException(cv, dt) }