diff --git a/cpp/include/cudf/io/detail/parquet.hpp b/cpp/include/cudf/io/detail/parquet.hpp index 98922ad10a4..a18bd450640 100644 --- a/cpp/include/cudf/io/detail/parquet.hpp +++ b/cpp/include/cudf/io/detail/parquet.hpp @@ -54,12 +54,10 @@ class reader { * * @param sources Input `datasource` objects to read the dataset from * @param options Settings for controlling reading behavior - * @param stream CUDA stream used for device memory operations and kernel launches * @param mr Device memory resource to use for device memory allocation */ explicit reader(std::vector>&& sources, parquet_reader_options const& options, - rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr); /** diff --git a/cpp/include/cudf/io/parquet.hpp b/cpp/include/cudf/io/parquet.hpp index a2771d6400f..2215f24b550 100644 --- a/cpp/include/cudf/io/parquet.hpp +++ b/cpp/include/cudf/io/parquet.hpp @@ -46,7 +46,7 @@ constexpr size_type default_row_group_size_rows = 1000000; class parquet_reader_options_builder; /** - * @brief Settings or `read_parquet()`. + * @brief Settings for `read_parquet()`. */ class parquet_reader_options { source_info _source; diff --git a/cpp/src/io/functions.cpp b/cpp/src/io/functions.cpp index 402e212f07b..613ccf203cb 100644 --- a/cpp/src/io/functions.cpp +++ b/cpp/src/io/functions.cpp @@ -402,8 +402,7 @@ table_with_metadata read_parquet(parquet_reader_options const& options, CUDF_FUNC_RANGE(); auto datasources = make_datasources(options.get_source()); - auto reader = std::make_unique( - std::move(datasources), options, rmm::cuda_stream_default, mr); + auto reader = std::make_unique(std::move(datasources), options, mr); return reader->read(options); } diff --git a/cpp/src/io/parquet/page_data.cu b/cpp/src/io/parquet/page_data.cu index 95a79fceb63..b31888b6ac2 100644 --- a/cpp/src/io/parquet/page_data.cu +++ b/cpp/src/io/parquet/page_data.cu @@ -1557,10 +1557,10 @@ extern "C" __global__ void __launch_bounds__(block_size) bool has_repetition = s->col.max_level[level_type::REPETITION] > 0; - // optimization : it might be useful to have a version of gpuDecodeStream that could go - // wider than 1 warp. Currently it only only uses 1 warp so that it can overlap work - // with the value decoding step when in the actual value decoding kernel. however during - // this preprocess step we have no such limits - we could go as wide as block_size + // optimization : it might be useful to have a version of gpuDecodeStream that could go wider than + // 1 warp. Currently it only uses 1 warp so that it can overlap work with the value decoding step + // when in the actual value decoding kernel. However, during this preprocess step we have no such + // limits - we could go as wide as block_size if (t < 32) { constexpr int batch_size = 32; int target_input_count = batch_size; diff --git a/cpp/src/io/parquet/parquet.hpp b/cpp/src/io/parquet/parquet.hpp index 4390d1c788f..cd57414d98b 100644 --- a/cpp/src/io/parquet/parquet.hpp +++ b/cpp/src/io/parquet/parquet.hpp @@ -198,6 +198,12 @@ struct SchemaElement { // }; // } bool is_stub() const { return repetition_type == REPEATED && num_children == 1; } + + // https://github.com/apache/parquet-cpp/blob/642da05/src/parquet/schema.h#L49-L50 + // One-level LIST encoding: Only allows required lists with required cells: + // repeated value_type name + bool is_one_level_list() const { return repetition_type == REPEATED and num_children == 0; } + // in parquet terms, a group is a level of nesting in the schema. a group // can be a struct or a list bool is_struct() const diff --git a/cpp/src/io/parquet/reader_impl.cu b/cpp/src/io/parquet/reader_impl.cu index 8ff0d14ffda..69d480edf85 100644 --- a/cpp/src/io/parquet/reader_impl.cu +++ b/cpp/src/io/parquet/reader_impl.cu @@ -185,6 +185,16 @@ type_id to_type_id(SchemaElement const& schema, return type_id::EMPTY; } +/** + * @brief Converts cuDF type enum to column logical type + */ +data_type to_data_type(type_id t_id, SchemaElement const& schema) +{ + return t_id == type_id::DECIMAL32 || t_id == type_id::DECIMAL64 || t_id == type_id::DECIMAL128 + ? data_type{t_id, numeric::scale_type{-schema.decimal_scale}} + : data_type{t_id}; +} + /** * @brief Function that returns the required the number of bits to store a value */ @@ -414,6 +424,9 @@ class aggregate_metadata { // walk upwards, skipping repeated fields while (schema_index > 0) { if (!pfm.schema[schema_index].is_stub()) { depth++; } + // schema of one-level encoding list doesn't contain nesting information, so we need to + // manually add an extra nesting level + if (pfm.schema[schema_index].is_one_level_list()) { depth++; } schema_index = pfm.schema[schema_index].parent_idx; } return depth; @@ -596,11 +609,11 @@ class aggregate_metadata { } // if we're at the root, this is a new output column - auto const col_type = to_type_id(schema_elem, strings_to_categorical, timestamp_type_id); - auto const dtype = col_type == type_id::DECIMAL32 || col_type == type_id::DECIMAL64 || - col_type == type_id::DECIMAL128 - ? data_type{col_type, numeric::scale_type{-schema_elem.decimal_scale}} - : data_type{col_type}; + auto const col_type = + schema_elem.is_one_level_list() + ? type_id::LIST + : to_type_id(schema_elem, strings_to_categorical, timestamp_type_id); + auto const dtype = to_data_type(col_type, schema_elem); column_buffer output_col(dtype, schema_elem.repetition_type == OPTIONAL); // store the index of this element if inserted in out_col_array @@ -630,6 +643,23 @@ class aggregate_metadata { if (schema_elem.num_children == 0) { input_column_info& input_col = input_columns.emplace_back(input_column_info{schema_idx, schema_elem.name}); + + // set up child output column for one-level encoding list + if (schema_elem.is_one_level_list()) { + // determine the element data type + auto const element_type = + to_type_id(schema_elem, strings_to_categorical, timestamp_type_id); + auto const element_dtype = to_data_type(element_type, schema_elem); + + column_buffer element_col(element_dtype, schema_elem.repetition_type == OPTIONAL); + // store the index of this element + nesting.push_back(static_cast(output_col.children.size())); + // TODO: not sure if we should assign a name or leave it blank + element_col.name = "element"; + + output_col.children.push_back(std::move(element_col)); + } + std::copy(nesting.cbegin(), nesting.cend(), std::back_inserter(input_col.nesting)); path_is_valid = true; // If we're able to reach leaf then path is valid } @@ -850,6 +880,10 @@ void generate_depth_remappings(std::map, std::ve // if this is a repeated field, map it one level deeper shallowest = cur_schema.is_stub() ? cur_depth + 1 : cur_depth; } + // if it's one-level encoding list + else if (cur_schema.is_one_level_list()) { + shallowest = cur_depth - 1; + } if (!cur_schema.is_stub()) { cur_depth--; } schema_idx = cur_schema.parent_idx; } @@ -1770,7 +1804,6 @@ table_with_metadata reader::impl::read(size_type skip_rows, // Forward to implementation reader::reader(std::vector>&& sources, parquet_reader_options const& options, - rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) : _impl(std::make_unique(std::move(sources), options, mr)) { diff --git a/python/cudf/cudf/tests/data/parquet/one_level_list.parquet b/python/cudf/cudf/tests/data/parquet/one_level_list.parquet new file mode 100644 index 00000000000..f10d3a10290 Binary files /dev/null and b/python/cudf/cudf/tests/data/parquet/one_level_list.parquet differ diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py index 597ae6c05c0..c52dab5c72f 100644 --- a/python/cudf/cudf/tests/test_parquet.py +++ b/python/cudf/cudf/tests/test_parquet.py @@ -2269,6 +2269,15 @@ def test_parquet_reader_brotli(datadir): assert_eq(expect, got) +def test_parquet_reader_one_level_list(datadir): + fname = datadir / "one_level_list.parquet" + + expect = pd.read_parquet(fname) + got = cudf.read_parquet(fname).to_pandas(nullable=True) + + assert_eq(expect, got) + + @pytest.mark.parametrize("size_bytes", [4_000_000, 1_000_000, 600_000]) @pytest.mark.parametrize("size_rows", [1_000_000, 100_000, 10_000]) def test_parquet_writer_row_group_size(