diff --git a/cpp/src/io/orc/orc.h b/cpp/src/io/orc/orc.h index 474f404be0f..71be53243dc 100644 --- a/cpp/src/io/orc/orc.h +++ b/cpp/src/io/orc/orc.h @@ -539,10 +539,7 @@ class OrcDecompressor { }; /** - * @brief Stores orc id for each column and its adjacent number of children - * in case of struct or number of children in case of list column. - * If list column has struct column, then all child columns of that struct are treated as child - * column of list. + * @brief Stores orc id for each column and number of children in that column. * * @code{.pseudo} * Consider following data where a struct has two members and a list column @@ -559,11 +556,18 @@ class OrcDecompressor { * */ struct orc_column_meta { - // orc_column_meta(uint32_t _id, uint32_t _num_children) : id(_id), num_children(_num_children){}; uint32_t id; // orc id for the column uint32_t num_children; // number of children at the same level of nesting in case of struct }; +/** + * @brief Stores column's validity map and null count + */ +struct column_validity_info { + uint32_t* valid_map_base; + uint32_t null_count; +}; + /** * @brief A helper class for ORC file metadata. Provides some additional * convenience methods for initializing and accessing metadata. diff --git a/cpp/src/io/orc/orc_gpu.h b/cpp/src/io/orc/orc_gpu.h index fa91dd13755..3ccc2c672c9 100644 --- a/cpp/src/io/orc/orc_gpu.h +++ b/cpp/src/io/orc/orc_gpu.h @@ -24,6 +24,7 @@ #include #include #include +#include "orc.h" #include "orc_common.h" #include @@ -109,6 +110,7 @@ struct ColumnDesc { uint8_t dtype_len; // data type length (for types that can be mapped to different sizes) int32_t decimal_scale; // number of fractional decimal digits for decimal type int32_t ts_clock_rate; // output timestamp clock frequency (0=default, 1000=ms, 1000000000=ns) + column_validity_info parent_validity_info; // consists of parent column valid_map and null count }; /** diff --git a/cpp/src/io/orc/reader_impl.cu b/cpp/src/io/orc/reader_impl.cu index b2b4538994e..8e47da98a7c 100644 --- a/cpp/src/io/orc/reader_impl.cu +++ b/cpp/src/io/orc/reader_impl.cu @@ -28,6 +28,7 @@ #include #include +#include #include #include @@ -180,7 +181,8 @@ size_t gather_stream_info(const size_t stripe_index, bool use_index, size_t* num_dictionary_entries, cudf::detail::hostdevice_2dvector& chunks, - std::vector& stream_info) + std::vector& stream_info, + bool apply_struct_map) { uint64_t src_offset = 0; uint64_t dst_offset = 0; @@ -193,7 +195,7 @@ size_t gather_stream_info(const size_t stripe_index, auto const column_id = *stream.column_id; auto col = orc2gdf[column_id]; - if (col == -1) { + if (col == -1 and apply_struct_map) { // A struct-type column has no data itself, but rather child columns // for each of its fields. There is only a PRESENT stream, which // needs to be included for the reader. @@ -464,57 +466,42 @@ class aggregate_orc_metadata { /** * @brief Adds column as per the request and saves metadata about children. - * Struct children are in the same level as struct, only list column - * children are pushed to next level. + * Children of a column will be added to the next level. * * @param selection A vector that saves list of columns as per levels of nesting. * @param types A vector of schema types of columns. * @param level current level of nesting. * @param id current column id that needs to be added. * @param has_timestamp_column True if timestamp column present and false otherwise. - * - * @return returns number of child columns at same level in case of struct and next level in case - * of list + * @param has_nested_column True if any of the selected column is a nested type. */ - uint32_t add_column(std::vector>& selection, - std::vector const& types, - const size_t level, - const uint32_t id, - bool& has_timestamp_column, - bool& has_list_column) + void add_column(std::vector>& selection, + std::vector const& types, + const size_t level, + const uint32_t id, + bool& has_timestamp_column, + bool& has_nested_column) { - uint32_t num_lvl_child_columns = 0; if (level == selection.size()) { selection.emplace_back(); } selection[level].push_back({id, 0}); const int col_id = selection[level].size() - 1; if (types[id].kind == orc::TIMESTAMP) { has_timestamp_column = true; } switch (types[id].kind) { - case orc::LIST: { - uint32_t lvl_cols = 0; - if (not types[id].subtypes.empty()) { - has_list_column = true; - // Since list column needs to be processed before its child can be processed, + case orc::LIST: + case orc::STRUCT: { + has_nested_column = true; + for (const auto child_id : types[id].subtypes) { + // Since nested column needs to be processed before its child can be processed, // child column is being added to next level - lvl_cols = - add_column(selection, types, level + 1, id + 1, has_timestamp_column, has_list_column); + add_column( + selection, types, level + 1, child_id, has_timestamp_column, has_nested_column); } - // The list child column may be a struct in which case lvl_cols will be > 1 - selection[level][col_id].num_children = lvl_cols; + selection[level][col_id].num_children = types[id].subtypes.size(); } break; - case orc::STRUCT: - for (const auto child_id : types[id].subtypes) { - num_lvl_child_columns += - add_column(selection, types, level, child_id, has_timestamp_column, has_list_column); - } - selection[level][col_id].num_children = num_lvl_child_columns; - break; - default: break; } - - return num_lvl_child_columns + 1; } /** @@ -522,11 +509,12 @@ class aggregate_orc_metadata { * * @param use_names List of column names to select * @param has_timestamp_column True if timestamp column present and false otherwise + * @param has_nested_column True if any of the selected column is a nested type. * * @return Vector of list of ORC column meta-data */ std::vector> select_columns( - std::vector const& use_names, bool& has_timestamp_column, bool& has_list_column) + std::vector const& use_names, bool& has_timestamp_column, bool& has_nested_column) { auto const& pfm = per_file_metadata[0]; std::vector> selection; @@ -543,7 +531,7 @@ class aggregate_orc_metadata { auto col_id = pfm.ff.types[0].subtypes[index]; if (pfm.get_column_name(col_id) == use_name) { name_found = true; - add_column(selection, pfm.ff.types, 0, col_id, has_timestamp_column, has_list_column); + add_column(selection, pfm.ff.types, 0, col_id, has_timestamp_column, has_nested_column); // Should start with next index index = i + 1; break; @@ -553,7 +541,7 @@ class aggregate_orc_metadata { } } else { for (auto const& col_id : pfm.ff.types[0].subtypes) { - add_column(selection, pfm.ff.types, 0, col_id, has_timestamp_column, has_list_column); + add_column(selection, pfm.ff.types, 0, col_id, has_timestamp_column, has_nested_column); } } @@ -683,6 +671,94 @@ rmm::device_buffer reader::impl::decompress_stripe_data( return decomp_data; } +/** + * @brief Updates null mask of columns whose parent is a struct column. + * If struct column has null element, that row would be + * skipped while writing child column in ORC, so we need to insert the missing null + * elements in child column. + * There is another behavior from pyspark, where if the child column doesn't have any null + * elements, it will not have present stream, so in that case parent null mask need to be + * copied to child column. + * + * @param chunks Vector of list of column chunk descriptors + * @param out_buffers Output columns' device buffers + * @param stream CUDA stream used for device memory operations and kernel launches. + * @param mr Device memory resource to use for device memory allocation + */ +void update_null_mask(cudf::detail::hostdevice_2dvector& chunks, + std::vector& out_buffers, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) +{ + const auto num_stripes = chunks.size().first; + const auto num_columns = chunks.size().second; + bool is_mask_updated = false; + + for (size_t col_idx = 0; col_idx < num_columns; ++col_idx) { + if (chunks[0][col_idx].parent_validity_info.valid_map_base != nullptr) { + if (not is_mask_updated) { + chunks.device_to_host(stream, true); + is_mask_updated = true; + } + + auto parent_valid_map_base = chunks[0][col_idx].parent_validity_info.valid_map_base; + auto child_valid_map_base = out_buffers[col_idx].null_mask(); + auto child_mask_len = + chunks[0][col_idx].column_num_rows - chunks[0][col_idx].parent_validity_info.null_count; + auto parent_mask_len = chunks[0][col_idx].column_num_rows; + + if (child_valid_map_base != nullptr) { + rmm::device_uvector dst_idx(child_mask_len, stream); + // Copy indexes at which the parent has valid value. + thrust::copy_if(rmm::exec_policy(stream), + thrust::make_counting_iterator(0), + thrust::make_counting_iterator(0) + parent_mask_len, + dst_idx.begin(), + [parent_valid_map_base] __device__(auto idx) { + return bit_is_set(parent_valid_map_base, idx); + }); + + auto merged_null_mask = cudf::detail::create_null_mask( + parent_mask_len, mask_state::ALL_NULL, rmm::cuda_stream_view(stream), mr); + auto merged_mask = static_cast(merged_null_mask.data()); + uint32_t* dst_idx_ptr = dst_idx.data(); + // Copy child valid bits from child column to valid indexes, this will merge both child and + // parent null masks + thrust::for_each(rmm::exec_policy(stream), + thrust::make_counting_iterator(0), + thrust::make_counting_iterator(0) + dst_idx.size(), + [child_valid_map_base, dst_idx_ptr, merged_mask] __device__(auto idx) { + if (bit_is_set(child_valid_map_base, idx)) { + cudf::set_bit(merged_mask, dst_idx_ptr[idx]); + }; + }); + + out_buffers[col_idx]._null_mask = std::move(merged_null_mask); + + } else { + // Since child column doesn't have a mask, copy parent null mask + auto mask_size = bitmask_allocation_size_bytes(parent_mask_len); + out_buffers[col_idx]._null_mask = + rmm::device_buffer(static_cast(parent_valid_map_base), mask_size, stream, mr); + } + } + } + + thrust::counting_iterator col_idx_it(0); + thrust::counting_iterator stripe_idx_it(0); + + if (is_mask_updated) { + // Update chunks with pointers to column data which might have been changed. + std::for_each(stripe_idx_it, stripe_idx_it + num_stripes, [&](auto stripe_idx) { + std::for_each(col_idx_it, col_idx_it + num_columns, [&](auto col_idx) { + auto& chunk = chunks[stripe_idx][col_idx]; + chunk.valid_map_base = out_buffers[col_idx].null_mask(); + }); + }); + chunks.host_to_device(stream, true); + } +} + void reader::impl::decode_stream_data(cudf::detail::hostdevice_2dvector& chunks, size_t num_dicts, size_t skip_rows, @@ -695,22 +771,31 @@ void reader::impl::decode_stream_data(cudf::detail::hostdevice_2dvector col_idx_it(0); + thrust::counting_iterator stripe_idx_it(0); // Update chunks with pointers to column data - for (size_t i = 0; i < num_stripes; ++i) { - for (size_t j = 0; j < num_columns; ++j) { - auto& chunk = chunks[i][j]; - chunk.column_data_base = out_buffers[j].data(); - chunk.valid_map_base = out_buffers[j].null_mask(); - } - } + std::for_each(stripe_idx_it, stripe_idx_it + num_stripes, [&](auto stripe_idx) { + std::for_each(col_idx_it, col_idx_it + num_columns, [&](auto col_idx) { + auto& chunk = chunks[stripe_idx][col_idx]; + chunk.column_data_base = out_buffers[col_idx].data(); + chunk.valid_map_base = out_buffers[col_idx].null_mask(); + }); + }); // Allocate global dictionary for deserializing rmm::device_uvector global_dict(num_dicts, stream); - chunks.host_to_device(stream); + chunks.host_to_device(stream, true); gpu::DecodeNullsAndStringDictionaries( chunks.base_device_ptr(), global_dict.data(), num_columns, num_stripes, skip_rows, stream); + + if (level > 0) { + // Update nullmasks for children if parent was a struct and had null mask + update_null_mask(chunks, out_buffers, stream, _mr); + } + + // Update the null map for child columns gpu::DecodeOrcColumnData(chunks.base_device_ptr(), global_dict.data(), row_groups, @@ -724,16 +809,23 @@ void reader::impl::decode_stream_data(cudf::detail::hostdevice_2dvector chunks, cudf::detail::host_2dspan row_groups, + std::vector& out_buffers, std::vector const& list_col, const int32_t level) { @@ -743,10 +835,12 @@ void reader::impl::aggregate_child_meta(cudf::detail::host_2dspan(out_buffers[parent_col_idx].null_count()); + auto parent_valid_map = out_buffers[parent_col_idx].null_mask(); + auto num_rows = out_buffers[parent_col_idx].size; + + for (uint32_t id = 0; id < p_col.num_children; id++) { + const auto child_col_idx = index + id; + if (type == type_id::STRUCT) { + parent_column_data[child_col_idx] = {parent_valid_map, parent_null_count}; + // Number of rows in child will remain same as parent in case of struct column + num_child_rows[child_col_idx] = num_rows; + } else { + parent_column_data[child_col_idx] = {nullptr, 0}; + } + } index += p_col.num_children; }); } @@ -861,13 +973,9 @@ column_buffer&& reader::impl::assemble_buffer(const int32_t orc_col_id, col_buffer.name = _metadata->get_column_name(0, orc_col_id); switch (col_buffer.type.id()) { case type_id::LIST: - col_buffer.children.emplace_back( - assemble_buffer(_metadata->get_col_type(orc_col_id).subtypes[0], col_buffers, level + 1)); - break; - case type_id::STRUCT: for (auto const& col : _metadata->get_col_type(orc_col_id).subtypes) { - col_buffer.children.emplace_back(assemble_buffer(col, col_buffers, level)); + col_buffer.children.emplace_back(assemble_buffer(col, col_buffers, level + 1)); } break; @@ -884,16 +992,14 @@ void reader::impl::create_columns(std::vector>&& col_ std::vector& schema_info, rmm::cuda_stream_view stream) { - for (size_t i = 0; i < _selected_columns[0].size();) { - auto const& col_meta = _selected_columns[0][i]; - schema_info.emplace_back(""); - - auto col_buffer = assemble_buffer(col_meta.id, col_buffers, 0); - out_columns.emplace_back(make_column(col_buffer, &schema_info.back(), stream, _mr)); - - // Need to skip child columns of struct which are at the same level and have been processed - i += (col_buffers[0][i].type.id() == type_id::STRUCT) ? col_meta.num_children + 1 : 1; - } + std::transform(_selected_columns[0].begin(), + _selected_columns[0].end(), + std::back_inserter(out_columns), + [&](auto const col_meta) { + schema_info.emplace_back(""); + auto col_buffer = assemble_buffer(col_meta.id, col_buffers, 0); + return make_column(col_buffer, &schema_info.back(), stream, _mr); + }); } reader::impl::impl(std::vector>&& sources, @@ -906,7 +1012,7 @@ reader::impl::impl(std::vector>&& sources, // Select only columns required by the options _selected_columns = - _metadata->select_columns(options.get_columns(), _has_timestamp_column, _has_list_column); + _metadata->select_columns(options.get_columns(), _has_timestamp_column, _has_nested_column); // Override output timestamp resolution if requested if (options.get_timestamp_type().id() != type_id::EMPTY) { @@ -928,8 +1034,8 @@ table_with_metadata reader::impl::read(size_type skip_rows, const std::vector>& stripes, rmm::cuda_stream_view stream) { - CUDF_EXPECTS(skip_rows == 0 or (not _has_list_column), - "skip_rows is not supported by list column"); + CUDF_EXPECTS(skip_rows == 0 or (not _has_nested_column), + "skip_rows is not supported by nested columns"); std::vector> out_columns; // buffer and stripe data are stored as per nesting level @@ -944,14 +1050,13 @@ table_with_metadata reader::impl::read(size_type skip_rows, // Select only stripes required (aka row groups) const auto selected_stripes = _metadata->select_stripes(stripes, skip_rows, num_rows); - // Iterates through levels of nested columns, struct columns and its children will be - // in the same level since child column also have same number of rows, - // list column children will be 1 level down compared to parent. + // Iterates through levels of nested columns, child column will be one level down + // compared to parent column. for (size_t level = 0; level < _selected_columns.size(); level++) { auto& selected_columns = _selected_columns[level]; // Association between each ORC column and its cudf::column _col_meta.orc_col_map.emplace_back(_metadata->get_num_cols(), -1); - std::vector list_col; + std::vector nested_col; // Get a list of column data types std::vector column_types; @@ -979,20 +1084,18 @@ table_with_metadata reader::impl::read(size_type skip_rows, // Map each ORC column to its column _col_meta.orc_col_map[level][col.id] = column_types.size() - 1; - if (col_type == type_id::LIST) list_col.emplace_back(col); + if (col_type == type_id::LIST or col_type == type_id::STRUCT) nested_col.emplace_back(col); } // If no rows or stripes to read, return empty columns if (num_rows <= 0 || selected_stripes.empty()) { - for (size_t i = 0; i < _selected_columns[0].size();) { - auto const& col_meta = _selected_columns[0][i]; - auto const schema = _metadata->get_schema(col_meta.id); - schema_info.emplace_back(""); - out_columns.push_back( - std::move(create_empty_column(col_meta.id, schema_info.back(), stream))); - // Since struct children will be in the same level, have to skip them. - i += (schema.kind == orc::STRUCT) ? col_meta.num_children + 1 : 1; - } + std::transform(_selected_columns[0].begin(), + _selected_columns[0].end(), + std::back_inserter(out_columns), + [&](auto const col_meta) { + schema_info.emplace_back(""); + return create_empty_column(col_meta.id, schema_info.back(), stream); + }); break; } else { // Get the total number of stripes across all input files. @@ -1045,7 +1148,8 @@ table_with_metadata reader::impl::read(size_type skip_rows, use_index, &num_dict_entries, chunks, - stream_info); + stream_info, + level == 0); CUDF_EXPECTS(total_data_size > 0, "Expected streams data within stripe"); @@ -1101,10 +1205,17 @@ table_with_metadata reader::impl::read(size_type skip_rows, ? stripe_info->numberOfRows : _col_meta.num_child_rows_per_stripe[stripe_idx * num_columns + col_idx]; chunk.column_num_rows = (level == 0) ? num_rows : _col_meta.num_child_rows[col_idx]; - chunk.encoding_kind = stripe_footer->columns[selected_columns[col_idx].id].kind; - chunk.type_kind = _metadata->per_file_metadata[stripe_source_mapping.source_idx] + chunk.parent_validity_info.valid_map_base = + (level == 0) ? nullptr : _col_meta.parent_column_data[col_idx].valid_map_base; + chunk.parent_validity_info.null_count = + (level == 0) ? 0 : _col_meta.parent_column_data[col_idx].null_count; + chunk.encoding_kind = stripe_footer->columns[selected_columns[col_idx].id].kind; + chunk.type_kind = _metadata->per_file_metadata[stripe_source_mapping.source_idx] .ff.types[selected_columns[col_idx].id] .kind; + // num_child_rows for a struct column will be same, for other nested types it will be + // calculated. + chunk.num_child_rows = (chunk.type_kind != orc::STRUCT) ? 0 : chunk.num_rows; auto const decimal_as_float64 = should_convert_decimal_column_to_float(_decimal_cols_as_float, _metadata->per_file_metadata[0], @@ -1218,14 +1329,14 @@ table_with_metadata reader::impl::read(size_type skip_rows, level, stream); - // Extract information to process list child columns - if (list_col.size()) { + // Extract information to process nested child columns + if (nested_col.size()) { row_groups.device_to_host(stream, true); - aggregate_child_meta(chunks, row_groups, list_col, level); + aggregate_child_meta(chunks, row_groups, out_buffers[level], nested_col, level); } // ORC stores number of elements at each row, so we need to generate offsets from that - if (list_col.size()) { + if (nested_col.size()) { std::vector buff_data; std::for_each( out_buffers[level].begin(), out_buffers[level].end(), [&buff_data](auto& out_buffer) { @@ -1235,8 +1346,10 @@ table_with_metadata reader::impl::read(size_type skip_rows, } }); - auto const dev_buff_data = cudf::detail::make_device_uvector_async(buff_data, stream); - generate_offsets_for_list(dev_buff_data, stream); + if (buff_data.size()) { + auto const dev_buff_data = cudf::detail::make_device_uvector_async(buff_data, stream); + generate_offsets_for_list(dev_buff_data, stream); + } } } } diff --git a/cpp/src/io/orc/reader_impl.hpp b/cpp/src/io/orc/reader_impl.hpp index 1769fb6f193..9f6f7b82b35 100644 --- a/cpp/src/io/orc/reader_impl.hpp +++ b/cpp/src/io/orc/reader_impl.hpp @@ -53,8 +53,12 @@ class aggregate_orc_metadata; */ struct reader_column_meta { std::vector> - orc_col_map; // Mapping between column id in orc to processing order. - std::vector num_child_rows; // number of rows in child columns + orc_col_map; // Mapping between column id in orc to processing order. + std::vector num_child_rows; // number of rows in child columns + + std::vector + parent_column_data; // consists of parent column valid_map and null count + std::vector child_start_row; // start row of child columns [stripe][column] std::vector num_child_rows_per_stripe; // number of rows of child columns [stripe][column] @@ -151,12 +155,14 @@ class reader::impl { * @brief Aggregate child metadata from parent column chunks. * * @param chunks Vector of list of parent column chunks. - * @param chunks Vector of list of parent column row groups. + * @param row_groups Vector of list of row index descriptors + * @param out_buffers Column buffers for columns. * @param list_col Vector of column metadata of list type parent columns. * @param level Current nesting level being processed. */ void aggregate_child_meta(cudf::detail::host_2dspan chunks, cudf::detail::host_2dspan row_groups, + std::vector& out_buffers, std::vector const& list_col, const int32_t level); @@ -207,7 +213,7 @@ class reader::impl { bool _use_index = true; bool _use_np_dtypes = true; bool _has_timestamp_column = false; - bool _has_list_column = false; + bool _has_nested_column = false; std::vector _decimal_cols_as_float; data_type _timestamp_type{type_id::EMPTY}; reader_column_meta _col_meta; diff --git a/cpp/src/io/orc/stripe_data.cu b/cpp/src/io/orc/stripe_data.cu index 903f9475e2a..923dc366d74 100644 --- a/cpp/src/io/orc/stripe_data.cu +++ b/cpp/src/io/orc/stripe_data.cu @@ -1152,7 +1152,7 @@ __global__ void __launch_bounds__(block_size) if (t == 0) s->chunk = chunks[chunk_id]; __syncthreads(); - const size_t max_num_rows = s->chunk.column_num_rows; + const size_t max_num_rows = s->chunk.column_num_rows - s->chunk.parent_validity_info.null_count; if (is_nulldec) { uint32_t null_count = 0; @@ -1186,6 +1186,7 @@ __global__ void __launch_bounds__(block_size) nrows = nrows_max; } __syncthreads(); + row_in = s->chunk.start_row + s->top.nulls_desc_row; if (row_in + nrows > first_row && row_in < first_row + max_num_rows && s->chunk.valid_map_base != NULL) { @@ -1334,7 +1335,7 @@ static __device__ void DecodeRowPositions(orcdec_state_s* s, s->top.data.cur_row + s->top.data.nrows < s->top.data.end_row) { uint32_t nrows = min(s->top.data.end_row - (s->top.data.cur_row + s->top.data.nrows), min((row_decoder_buffer_size - s->u.rowdec.nz_count) * 2, blockDim.x)); - if (s->chunk.strm_len[CI_PRESENT] > 0) { + if (s->chunk.valid_map_base != NULL) { // We have a present stream uint32_t rmax = s->top.data.end_row - min((uint32_t)first_row, s->top.data.end_row); uint32_t r = (uint32_t)(s->top.data.cur_row + s->top.data.nrows + t - first_row); diff --git a/cpp/src/io/orc/stripe_init.cu b/cpp/src/io/orc/stripe_init.cu index 317b7255718..3ee725c6e2f 100644 --- a/cpp/src/io/orc/stripe_init.cu +++ b/cpp/src/io/orc/stripe_init.cu @@ -456,8 +456,10 @@ extern "C" __global__ void __launch_bounds__(128, 8) ((uint32_t*)&row_groups[(s->rowgroup_start + i) * num_columns + blockIdx.x])[j] = ((volatile uint32_t*)&s->rowgroups[i])[j]; } - row_groups[(s->rowgroup_start + i) * num_columns + blockIdx.x].num_rows = num_rows; - row_groups[(s->rowgroup_start + i) * num_columns + blockIdx.x].start_row = start_row; + row_groups[(s->rowgroup_start + i) * num_columns + blockIdx.x].num_rows = num_rows; + // Updating in case of struct + row_groups[(s->rowgroup_start + i) * num_columns + blockIdx.x].num_child_rows = num_rows; + row_groups[(s->rowgroup_start + i) * num_columns + blockIdx.x].start_row = start_row; } __syncthreads(); if (t == 0) { s->rowgroup_start += num_rowgroups; } diff --git a/python/cudf/cudf/tests/data/orc/TestOrcFile.testPySparkStruct.orc b/python/cudf/cudf/tests/data/orc/TestOrcFile.testPySparkStruct.orc new file mode 100644 index 00000000000..7748e901bce Binary files /dev/null and b/python/cudf/cudf/tests/data/orc/TestOrcFile.testPySparkStruct.orc differ diff --git a/python/cudf/cudf/tests/test_orc.py b/python/cudf/cudf/tests/test_orc.py index 1a785d28b48..41e96eb7782 100644 --- a/python/cudf/cudf/tests/test_orc.py +++ b/python/cudf/cudf/tests/test_orc.py @@ -843,9 +843,10 @@ def test_orc_string_stream_offset_issue(): assert_eq(df, cudf.read_orc(buffer)) +# Data is generated using pyorc module def generate_list_struct_buff(size=28000): - rd = random.Random(0) - np.random.seed(seed=0) + rd = random.Random(1) + np.random.seed(seed=1) buff = BytesIO() @@ -875,16 +876,21 @@ def generate_list_struct_buff(size=28000): schema = po.Struct(**schema) lvl3_list = [ - [ + rd.choice( [ + None, [ - rd.choice([None, np.random.randint(1, 3)]) - for z in range(np.random.randint(1, 3)) - ] - for z in range(np.random.randint(0, 3)) + [ + [ + rd.choice([None, np.random.randint(1, 3)]) + for z in range(np.random.randint(1, 3)) + ] + for z in range(np.random.randint(0, 3)) + ] + for y in range(np.random.randint(0, 3)) + ], ] - for y in range(np.random.randint(0, 3)) - ] + ) for x in range(size) ] lvl1_list = [ @@ -895,15 +901,21 @@ def generate_list_struct_buff(size=28000): for x in range(size) ] lvl1_struct = [ - (np.random.randint(0, 3), np.random.randint(0, 3)) for x in range(size) + rd.choice([None, (np.random.randint(0, 3), np.random.randint(0, 3))]) + for x in range(size) ] lvl2_struct = [ - ( - rd.choice([None, np.random.randint(0, 3)]), - ( - rd.choice([None, np.random.randint(0, 3)]), - np.random.randint(0, 3), - ), + rd.choice( + [ + None, + ( + rd.choice([None, np.random.randint(0, 3)]), + ( + rd.choice([None, np.random.randint(0, 3)]), + np.random.randint(0, 3), + ), + ), + ] ) for x in range(size) ] @@ -953,47 +965,48 @@ def generate_list_struct_buff(size=28000): ) @pytest.mark.parametrize("num_rows", [0, 15, 1005, 10561, 28000]) @pytest.mark.parametrize("use_index", [True, False]) -@pytest.mark.parametrize("skip_rows", [0, 101, 1007, 27000]) def test_lists_struct_nests( - columns, num_rows, use_index, skip_rows, + columns, num_rows, use_index, ): - has_lists = ( - any("list" in col_name for col_name in columns) if columns else True + gdf = cudf.read_orc( + list_struct_buff, + columns=columns, + num_rows=num_rows, + use_index=use_index, ) - if has_lists and skip_rows > 0: - with pytest.raises( - RuntimeError, match="skip_rows is not supported by list column" - ): - cudf.read_orc( - list_struct_buff, - columns=columns, - num_rows=num_rows, - use_index=use_index, - skiprows=skip_rows, - ) + pyarrow_tbl = pyarrow.orc.ORCFile(list_struct_buff).read() + + pyarrow_tbl = ( + pyarrow_tbl[:num_rows] + if columns is None + else pyarrow_tbl.select(columns)[:num_rows] + ) + + if num_rows > 0: + assert pyarrow_tbl.equals(gdf.to_arrow()) else: - gdf = cudf.read_orc( - list_struct_buff, - columns=columns, - num_rows=num_rows, - use_index=use_index, - skiprows=skip_rows, - ) + assert_eq(pyarrow_tbl.to_pandas(), gdf) - pyarrow_tbl = pyarrow.orc.ORCFile(list_struct_buff).read() - pyarrow_tbl = ( - pyarrow_tbl[skip_rows : skip_rows + num_rows] - if columns is None - else pyarrow_tbl.select(columns)[skip_rows : skip_rows + num_rows] +@pytest.mark.parametrize("columns", [None, ["lvl1_struct"], ["lvl1_list"]]) +def test_skip_rows_for_nested_types(columns): + with pytest.raises( + RuntimeError, match="skip_rows is not supported by nested column" + ): + cudf.read_orc( + list_struct_buff, columns=columns, use_index=True, skiprows=5, ) - if num_rows > 0: - assert_eq(True, pyarrow_tbl.equals(gdf.to_arrow())) - else: - assert_eq(pyarrow_tbl.to_pandas(), gdf) + +def test_pyspark_struct(datadir): + path = datadir / "TestOrcFile.testPySparkStruct.orc" + + pdf = pa.orc.ORCFile(path).read().to_pandas() + gdf = cudf.read_orc(path) + + assert_eq(pdf, gdf) @pytest.mark.parametrize(