From 7193abbb64f82c0be16d985b289d4abb3ec071c1 Mon Sep 17 00:00:00 2001 From: Yunsong Wang Date: Mon, 6 Dec 2021 15:10:00 -0500 Subject: [PATCH 01/11] Fix a typo --- cpp/include/cudf/io/parquet.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/include/cudf/io/parquet.hpp b/cpp/include/cudf/io/parquet.hpp index 88cf7416506..2fff01bbafd 100644 --- a/cpp/include/cudf/io/parquet.hpp +++ b/cpp/include/cudf/io/parquet.hpp @@ -46,7 +46,7 @@ constexpr size_type default_row_group_size_rows = 1000000; class parquet_reader_options_builder; /** - * @brief Settings or `read_parquet()`. + * @brief Settings for `read_parquet()`. */ class parquet_reader_options { source_info _source; From 6ddab85adff3571f599e9d0a789e94c57a6e20cf Mon Sep 17 00:00:00 2001 From: Yunsong Wang Date: Mon, 6 Dec 2021 16:11:58 -0500 Subject: [PATCH 02/11] Remove unused stream argument --- cpp/include/cudf/io/detail/parquet.hpp | 2 -- cpp/src/io/functions.cpp | 3 +-- cpp/src/io/parquet/reader_impl.cu | 1 - 3 files changed, 1 insertion(+), 5 deletions(-) diff --git a/cpp/include/cudf/io/detail/parquet.hpp b/cpp/include/cudf/io/detail/parquet.hpp index 98922ad10a4..a18bd450640 100644 --- a/cpp/include/cudf/io/detail/parquet.hpp +++ b/cpp/include/cudf/io/detail/parquet.hpp @@ -54,12 +54,10 @@ class reader { * * @param sources Input `datasource` objects to read the dataset from * @param options Settings for controlling reading behavior - * @param stream CUDA stream used for device memory operations and kernel launches * @param mr Device memory resource to use for device memory allocation */ explicit reader(std::vector>&& sources, parquet_reader_options const& options, - rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr); /** diff --git a/cpp/src/io/functions.cpp b/cpp/src/io/functions.cpp index 402e212f07b..613ccf203cb 100644 --- a/cpp/src/io/functions.cpp +++ b/cpp/src/io/functions.cpp @@ -402,8 +402,7 @@ table_with_metadata read_parquet(parquet_reader_options const& options, CUDF_FUNC_RANGE(); auto datasources = make_datasources(options.get_source()); - auto reader = std::make_unique( - std::move(datasources), options, rmm::cuda_stream_default, mr); + auto reader = std::make_unique(std::move(datasources), options, mr); return reader->read(options); } diff --git a/cpp/src/io/parquet/reader_impl.cu b/cpp/src/io/parquet/reader_impl.cu index 28144276066..a400c360dfa 100644 --- a/cpp/src/io/parquet/reader_impl.cu +++ b/cpp/src/io/parquet/reader_impl.cu @@ -1772,7 +1772,6 @@ table_with_metadata reader::impl::read(size_type skip_rows, // Forward to implementation reader::reader(std::vector>&& sources, parquet_reader_options const& options, - rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) : _impl(std::make_unique(std::move(sources), options, mr)) { From 6f395dd0aa5e7480cfdf748484c8f07b6e49b368 Mon Sep 17 00:00:00 2001 From: Yunsong Wang Date: Mon, 6 Dec 2021 16:20:20 -0500 Subject: [PATCH 03/11] Add is_one_level_list member function --- cpp/src/io/parquet/parquet.hpp | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/cpp/src/io/parquet/parquet.hpp b/cpp/src/io/parquet/parquet.hpp index 4390d1c788f..af0b1a13f35 100644 --- a/cpp/src/io/parquet/parquet.hpp +++ b/cpp/src/io/parquet/parquet.hpp @@ -198,6 +198,11 @@ struct SchemaElement { // }; // } bool is_stub() const { return repetition_type == REPEATED && num_children == 1; } + + // One-level LIST encoding: Only allows required lists with required cells: + // repeated value_type name + bool is_one_level_list() const { return repetition_type == REPEATED and num_children == 0; } + // in parquet terms, a group is a level of nesting in the schema. a group // can be a struct or a list bool is_struct() const From 974b105ad3ca138f8c382f5181ba16e35ec75565 Mon Sep 17 00:00:00 2001 From: Yunsong Wang Date: Mon, 6 Dec 2021 16:41:05 -0500 Subject: [PATCH 04/11] Add one level encoding handling --- cpp/src/io/parquet/reader_impl.cu | 32 ++++++++++++++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) diff --git a/cpp/src/io/parquet/reader_impl.cu b/cpp/src/io/parquet/reader_impl.cu index a400c360dfa..78838060225 100644 --- a/cpp/src/io/parquet/reader_impl.cu +++ b/cpp/src/io/parquet/reader_impl.cu @@ -411,6 +411,9 @@ class aggregate_metadata { // walk upwards, skipping repeated fields while (schema_index > 0) { if (!pfm.schema[schema_index].is_stub()) { depth++; } + // schema of one-level encoding list doesn't contain nesting information, so we need to + // manually add an extra nesting level + if (pfm.schema[schema_index].is_one_level_list()) { depth++; } schema_index = pfm.schema[schema_index].parent_idx; } return depth; @@ -596,7 +599,10 @@ class aggregate_metadata { // if we're at the root, this is a new output column auto const col_type = - to_type_id(schema_elem, strings_to_categorical, timestamp_type_id, strict_decimal_types); + schema_elem.is_one_level_list() + ? type_id::LIST + : to_type_id( + schema_elem, strings_to_categorical, timestamp_type_id, strict_decimal_types); auto const dtype = col_type == type_id::DECIMAL32 || col_type == type_id::DECIMAL64 ? data_type{col_type, numeric::scale_type{-schema_elem.decimal_scale}} : data_type{col_type}; @@ -629,6 +635,26 @@ class aggregate_metadata { if (schema_elem.num_children == 0) { input_column_info& input_col = input_columns.emplace_back(input_column_info{schema_idx, schema_elem.name}); + + // set up child output column for one-level encoding list + if (schema_elem.is_one_level_list()) { + // determine the element data type + auto const element_type = to_type_id( + schema_elem, strings_to_categorical, timestamp_type_id, strict_decimal_types); + auto const element_dtype = + element_type == type_id::DECIMAL32 || element_type == type_id::DECIMAL64 + ? data_type{element_type, numeric::scale_type{-schema_elem.decimal_scale}} + : data_type{element_type}; + + column_buffer element_col(element_dtype, schema_elem.repetition_type == OPTIONAL); + // store the index of this element + nesting.push_back(static_cast(output_col.children.size())); + // TODO: not sure if we should assign a name or leave it blank + element_col.name = "elemenet"; + + output_col.children.push_back(std::move(element_col)); + } + std::copy(nesting.cbegin(), nesting.cend(), std::back_inserter(input_col.nesting)); path_is_valid = true; // If we're able to reach leaf then path is valid } @@ -849,6 +875,10 @@ void generate_depth_remappings(std::map, std::ve // if this is a repeated field, map it one level deeper shallowest = cur_schema.is_stub() ? cur_depth + 1 : cur_depth; } + // if it's one-level encoding list + else if (cur_schema.is_one_level_list()) { + shallowest = cur_depth - 1; + } if (!cur_schema.is_stub()) { cur_depth--; } schema_idx = cur_schema.parent_idx; } From 39f5c3133d6083f30a292b73e460def4deb98816 Mon Sep 17 00:00:00 2001 From: Yunsong Wang Date: Mon, 6 Dec 2021 16:43:27 -0500 Subject: [PATCH 05/11] Update comments --- cpp/src/io/parquet/parquet.hpp | 1 + 1 file changed, 1 insertion(+) diff --git a/cpp/src/io/parquet/parquet.hpp b/cpp/src/io/parquet/parquet.hpp index af0b1a13f35..a168c4cdb5e 100644 --- a/cpp/src/io/parquet/parquet.hpp +++ b/cpp/src/io/parquet/parquet.hpp @@ -199,6 +199,7 @@ struct SchemaElement { // } bool is_stub() const { return repetition_type == REPEATED && num_children == 1; } + // https://github.com/apache/parquet-cpp/blob/master/src/parquet/schema.h // One-level LIST encoding: Only allows required lists with required cells: // repeated value_type name bool is_one_level_list() const { return repetition_type == REPEATED and num_children == 0; } From e0b9d55a452d1a5e24d66c2540a00018ee359aeb Mon Sep 17 00:00:00 2001 From: Yunsong Wang Date: Mon, 6 Dec 2021 16:55:44 -0500 Subject: [PATCH 06/11] Fix a typo --- cpp/src/io/parquet/page_data.cu | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/cpp/src/io/parquet/page_data.cu b/cpp/src/io/parquet/page_data.cu index 337d9faec20..a3d53f65ad2 100644 --- a/cpp/src/io/parquet/page_data.cu +++ b/cpp/src/io/parquet/page_data.cu @@ -1643,10 +1643,10 @@ extern "C" __global__ void __launch_bounds__(block_size) bool has_repetition = s->col.max_level[level_type::REPETITION] > 0; - // optimization : it might be useful to have a version of gpuDecodeStream that could go - // wider than 1 warp. Currently it only only uses 1 warp so that it can overlap work - // with the value decoding step when in the actual value decoding kernel. however during - // this preprocess step we have no such limits - we could go as wide as block_size + // optimization : it might be useful to have a version of gpuDecodeStream that could go wider than + // 1 warp. Currently it only uses 1 warp so that it can overlap work with the value decoding step + // when in the actual value decoding kernel. However, during this preprocess step we have no such + // limits - we could go as wide as block_size if (t < 32) { constexpr int batch_size = 32; int target_input_count = batch_size; From 7632f32f15c8ba41ff5460f6c6e2a9f433e96faa Mon Sep 17 00:00:00 2001 From: Yunsong Wang Date: Mon, 6 Dec 2021 17:07:16 -0500 Subject: [PATCH 07/11] Fix a typo --- cpp/src/io/parquet/reader_impl.cu | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/io/parquet/reader_impl.cu b/cpp/src/io/parquet/reader_impl.cu index 78838060225..98b3384d54a 100644 --- a/cpp/src/io/parquet/reader_impl.cu +++ b/cpp/src/io/parquet/reader_impl.cu @@ -650,7 +650,7 @@ class aggregate_metadata { // store the index of this element nesting.push_back(static_cast(output_col.children.size())); // TODO: not sure if we should assign a name or leave it blank - element_col.name = "elemenet"; + element_col.name = "element"; output_col.children.push_back(std::move(element_col)); } From df6c1c67dc40dbaf6dadd15e50fcf22de0d4951c Mon Sep 17 00:00:00 2001 From: Yunsong Wang Date: Tue, 7 Dec 2021 14:25:16 -0500 Subject: [PATCH 08/11] Update the link --- cpp/src/io/parquet/parquet.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/io/parquet/parquet.hpp b/cpp/src/io/parquet/parquet.hpp index a168c4cdb5e..cd57414d98b 100644 --- a/cpp/src/io/parquet/parquet.hpp +++ b/cpp/src/io/parquet/parquet.hpp @@ -199,7 +199,7 @@ struct SchemaElement { // } bool is_stub() const { return repetition_type == REPEATED && num_children == 1; } - // https://github.com/apache/parquet-cpp/blob/master/src/parquet/schema.h + // https://github.com/apache/parquet-cpp/blob/642da05/src/parquet/schema.h#L49-L50 // One-level LIST encoding: Only allows required lists with required cells: // repeated value_type name bool is_one_level_list() const { return repetition_type == REPEATED and num_children == 0; } From 42154045071c7eb9d3231b269876f0b34eabe118 Mon Sep 17 00:00:00 2001 From: Yunsong Wang Date: Tue, 7 Dec 2021 14:59:32 -0500 Subject: [PATCH 09/11] Add one-level list parquet file for pytest --- .../tests/data/parquet/one_level_list.parquet | Bin 0 -> 255 bytes 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 python/cudf/cudf/tests/data/parquet/one_level_list.parquet diff --git a/python/cudf/cudf/tests/data/parquet/one_level_list.parquet b/python/cudf/cudf/tests/data/parquet/one_level_list.parquet new file mode 100644 index 0000000000000000000000000000000000000000..f10d3a10290450d77c0d6ab9c590e19b4edd0a09 GIT binary patch literal 255 zcmZWkOA5k344t;qQc#yEQ3PkK^G`#&BACS97IF82p}+q@dOxld*E?K8b0cLHp`|UMz%yQh< zr{o<1b~xG4x~lGAW6`I~5+v|L9`b?DQSDPNL=H&6g0F=>@K=AU8^tHX?M2krhrUlw zb@bHrt*p*bGF!}&Xeb}$QAL@MB2z-=QpPINN*+wUoTox3X}UD?g;A-=&B2Vw8?P+z F^&c5tD4qZS literal 0 HcmV?d00001 From 51984a4be5d4934618ac48dd6dc698ef2cfd5af1 Mon Sep 17 00:00:00 2001 From: Yunsong Wang Date: Tue, 7 Dec 2021 17:57:24 -0500 Subject: [PATCH 10/11] Add pytest for one-level list encoding --- python/cudf/cudf/tests/test_parquet.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py index 516ee0d17d3..947d21aa8e5 100644 --- a/python/cudf/cudf/tests/test_parquet.py +++ b/python/cudf/cudf/tests/test_parquet.py @@ -2255,6 +2255,15 @@ def test_parquet_reader_brotli(datadir): assert_eq(expect, got) +def test_parquet_reader_one_level_list(datadir): + fname = datadir / "one_level_list.parquet" + + expect = pd.read_parquet(fname) + got = cudf.read_parquet(fname).to_pandas(nullable=True) + + assert_eq(expect, got) + + @pytest.mark.parametrize("size_bytes", [4_000_000, 1_000_000, 600_000]) @pytest.mark.parametrize("size_rows", [1_000_000, 100_000, 10_000]) def test_parquet_writer_row_group_size( From 786b4562010a04c5d5373749c6ff9fe20235bfea Mon Sep 17 00:00:00 2001 From: Yunsong Wang Date: Wed, 8 Dec 2021 21:22:28 -0500 Subject: [PATCH 11/11] Resolve conflicts + add to_data_type function --- cpp/src/io/parquet/reader_impl.cu | 28 +++++++++++++++------------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/cpp/src/io/parquet/reader_impl.cu b/cpp/src/io/parquet/reader_impl.cu index 8091cf6a3f8..69d480edf85 100644 --- a/cpp/src/io/parquet/reader_impl.cu +++ b/cpp/src/io/parquet/reader_impl.cu @@ -185,6 +185,16 @@ type_id to_type_id(SchemaElement const& schema, return type_id::EMPTY; } +/** + * @brief Converts cuDF type enum to column logical type + */ +data_type to_data_type(type_id t_id, SchemaElement const& schema) +{ + return t_id == type_id::DECIMAL32 || t_id == type_id::DECIMAL64 || t_id == type_id::DECIMAL128 + ? data_type{t_id, numeric::scale_type{-schema.decimal_scale}} + : data_type{t_id}; +} + /** * @brief Function that returns the required the number of bits to store a value */ @@ -602,12 +612,8 @@ class aggregate_metadata { auto const col_type = schema_elem.is_one_level_list() ? type_id::LIST - : to_type_id( - schema_elem, strings_to_categorical, timestamp_type_id, strict_decimal_types); - auto const dtype = col_type == type_id::DECIMAL32 || col_type == type_id::DECIMAL64 || - col_type == type_id::DECIMAL128 - ? data_type{col_type, numeric::scale_type{-schema_elem.decimal_scale}} - : data_type{col_type}; + : to_type_id(schema_elem, strings_to_categorical, timestamp_type_id); + auto const dtype = to_data_type(col_type, schema_elem); column_buffer output_col(dtype, schema_elem.repetition_type == OPTIONAL); // store the index of this element if inserted in out_col_array @@ -641,13 +647,9 @@ class aggregate_metadata { // set up child output column for one-level encoding list if (schema_elem.is_one_level_list()) { // determine the element data type - auto const element_type = to_type_id( - schema_elem, strings_to_categorical, timestamp_type_id, strict_decimal_types); - auto const element_dtype = - element_type == type_id::DECIMAL32 || element_type == type_id::DECIMAL64 || - element_type == type_id::DECIMAL128 - ? data_type{element_type, numeric::scale_type{-schema_elem.decimal_scale}} - : data_type{element_type}; + auto const element_type = + to_type_id(schema_elem, strings_to_categorical, timestamp_type_id); + auto const element_dtype = to_data_type(element_type, schema_elem); column_buffer element_col(element_dtype, schema_elem.repetition_type == OPTIONAL); // store the index of this element