diff --git a/CHANGELOG.md b/CHANGELOG.md
index 9610e699b23..88667eaf55b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -21,6 +21,7 @@
 - PR #6139 Add column conversion to big endian byte list.
 - PR #6220 Add `list_topics()` to supply list of underlying Kafka connection topics
 - PR #6254 Add `cudf::make_dictionary_from_scalar` factory function
+- PR #6318 Add support for reading Struct and map types from Parquet files
 - PR #6315 Native code for string-map lookups, for cudf-java
 - PR #6302 Add custom dataframe accessors
 - PR #6301 Add JNI bindings to nvcomp
diff --git a/cpp/include/cudf/io/types.hpp b/cpp/include/cudf/io/types.hpp
index 19aab6a46db..a50ab95195d 100644
--- a/cpp/include/cudf/io/types.hpp
+++ b/cpp/include/cudf/io/types.hpp
@@ -89,6 +89,19 @@ enum statistics_freq {
   STATISTICS_PAGE    = 2,  //!< Per-page column statistics
 };
 
+/**
+ * @brief Detailed name information for output columns.
+ *
+ * The hierarchy of children matches the hierarchy of children in the output
+ * cudf columns.
+ */
+struct column_name_info {
+  std::string name;
+  std::vector<column_name_info> children;
+  column_name_info(std::string const& _name) : name(_name) {}
+  column_name_info() = default;
+};
+
 /**
  * @brief Table metadata for io readers/writers (primarily column names)
  * For nested types (structs, maps, unions), the ordering of names in the column_names vector
@@ -105,7 +118,9 @@ enum statistics_freq {
  *   f5    f6
  */
 struct table_metadata {
-  std::vector<std::string> column_names;  //!< Names of columns contained in the table
+  std::vector<std::string> column_names;  //!< Names of columns contained in the table
+  std::vector<column_name_info>
+    schema_info;  //!< Detailed name information for the entire output hierarchy
   std::map<std::string, std::string> user_data;  //!< Format-dependent metadata as key-values pairs
 };
diff --git a/cpp/src/io/csv/reader_impl.cu b/cpp/src/io/csv/reader_impl.cu
index 1328c36e5ef..ced8fc90cb0 100644
--- a/cpp/src/io/csv/reader_impl.cu
+++ b/cpp/src/io/csv/reader_impl.cu
@@ -660,7 +660,7 @@ std::vector<column_buffer> reader::impl::decode_data(std::vector<data_type> const
         is_final_allocation ? mr_ : rmm::mr::get_current_device_resource());
 
       out_buffer.name = col_names_[col];
-      out_buffers.emplace_back(out_buffer);
+      out_buffers.emplace_back(std::move(out_buffer));
       active_col++;
     }
   }
diff --git a/cpp/src/io/parquet/page_data.cu b/cpp/src/io/parquet/page_data.cu
index 9c3ba66eb07..b64870d61b6 100644
--- a/cpp/src/io/parquet/page_data.cu
+++ b/cpp/src/io/parquet/page_data.cu
@@ -22,6 +22,7 @@
 #include
 #include
 #include
+#include
 
 #include
@@ -1016,24 +1017,27 @@ static __device__ bool setupLocalPageInfo(page_state_s *const s,
   // - for flat schemas, we can do this directly by using row counts
   // - for nested schemas, these offsets are computed during the preprocess step
   if (s->col.column_data_base != nullptr) {
-    int max_depth = s->col.max_level[level_type::REPETITION];
-    for (int idx = 0; idx <= max_depth; idx++) {
+    int max_depth = s->col.max_nesting_depth;
+    for (int idx = 0; idx < max_depth; idx++) {
       PageNestingInfo *pni = &s->page.nesting[idx];
 
       size_t output_offset;
-      if (max_depth == 0) {
+      // schemas without lists
+      if (s->col.max_level[level_type::REPETITION] == 0) {
         output_offset = page_start_row >= min_row ? page_start_row - min_row : 0;
       }
-      // for nested schemas, we've already got the exactly value precomputed
+      // for schemas with lists, we've already got the exact value precomputed
       else {
         output_offset = pni->page_start_value;
       }
 
-      // anything below max depth is an offset
-      uint32_t len = idx < max_depth ? sizeof(cudf::size_type) : s->dtype_len;
-
-      pni->data_out =
-        reinterpret_cast<uint8_t *>(s->col.column_data_base[idx]) + (output_offset * len);
+      pni->data_out = reinterpret_cast<uint8_t *>(s->col.column_data_base[idx]);
+      if (pni->data_out != nullptr) {
+        // anything below max depth with a valid data pointer must be a list, so the
+        // element size is the size of the offset type.
+        uint32_t len = idx < max_depth - 1 ? sizeof(cudf::size_type) : s->dtype_len;
+        pni->data_out += (output_offset * len);
+      }
       pni->valid_map = s->col.valid_map_base[idx];
       if (pni->valid_map != nullptr) {
         pni->valid_map += output_offset >> 5;
@@ -1187,6 +1191,49 @@ static __device__ void store_validity(PageNestingInfo *pni,
   pni->valid_map_offset += value_count;
 }
 
+/**
+ * @brief Compute the nesting bounds within the hierarchy to add values to, and the definition
+ * level up to which we should consider the added values non-null.
+ *
+ * @param[out] start_depth The start nesting depth
+ * @param[out] end_depth The end nesting depth (inclusive)
+ * @param[out] d The definition level up to which added values are not-null. If t is out of
+ * bounds, d will be -1
+ * @param[in] s Local page information
+ * @param[in] input_value_count The current count of input level values we have processed
+ * @param[in] target_input_value_count The desired # of input level values we want to process
+ * @param[in] t Thread index
+ */
+inline __device__ void get_nesting_bounds(int &start_depth,
+                                          int &end_depth,
+                                          int &d,
+                                          page_state_s *s,
+                                          int input_value_count,
+                                          int32_t target_input_value_count,
+                                          int t)
+{
+  start_depth = -1;
+  end_depth   = -1;
+  d           = -1;
+  if (input_value_count + t < target_input_value_count) {
+    int index = rolling_index(input_value_count + t);
+    d         = s->def[index];
+    // if we have repetition (there are list columns involved) we have to
+    // bound what nesting levels we apply values to
+    if (s->col.max_level[level_type::REPETITION] > 0) {
+      int r       = s->rep[index];
+      start_depth = s->page.nesting[r].start_depth;
+      end_depth   = s->page.nesting[d].end_depth;
+    }
+    // for columns without repetition (even ones involving structs) we always
+    // traverse the entire hierarchy.
+    else {
+      start_depth = 0;
+      end_depth   = s->col.max_nesting_depth - 1;
+    }
+  }
+}
+
 /**
  * @brief Process a batch of incoming repetition/definition level values and generate
  * validity, nested column offsets (where appropriate) and decoding indices.
@@ -1200,7 +1247,7 @@ static __device__ void gpuUpdateValidityOffsetsAndRowIndices(int32_t target_inpu
   int t)
 {
   // max nesting depth of the column
-  int max_depth = s->col.max_level[level_type::REPETITION];
+  int max_depth = s->col.max_nesting_depth;
   // how many (input) values we've processed in the page so far
   int input_value_count = s->input_value_count;
   // how many rows we've processed in the page so far
@@ -1208,19 +1255,11 @@ static __device__ void gpuUpdateValidityOffsetsAndRowIndices(int32_t target_inpu
 
   // process until we've reached the target
   while (input_value_count < target_input_value_count) {
-    // determine the nesting bounds for this thread
-    int start_depth = -1;
-    int end_depth   = -1;
-    int d           = -1;
-    if (input_value_count + t < target_input_value_count) {
-      int index = rolling_index(input_value_count + t);
-      // important : we don't decode repetition levels for flat schemas. we can assume the
-      // repetition level is always 0.
-      int r       = max_depth == 0 ?
0 : s->rep[index]; - start_depth = r; - d = s->def[index]; - end_depth = s->page.nesting[d].d_remap; - } + // determine the nesting bounds for this thread (the range of nesting depths we + // will generate new value indices and validity bits for) + int start_depth, end_depth, d; + get_nesting_bounds( + start_depth, end_depth, d, s, input_value_count, target_input_value_count, t); // 4 interesting things to track: // thread_value_count : # of output values from the view of this thread @@ -1247,22 +1286,23 @@ static __device__ void gpuUpdateValidityOffsetsAndRowIndices(int32_t target_inpu // compute warp and thread value counts uint32_t warp_count_mask = BALLOT((0 >= start_depth && 0 <= end_depth) && in_row_bounds ? 1 : 0); + warp_value_count = __popc(warp_count_mask); // Note : ((1 << t) - 1) implies "for all threads before me" thread_value_count = __popc(warp_count_mask & ((1 << t) - 1)); - // always walk from 0 to max_depth even if our start and end depths are different. - // otherwise we'd have thread/warp synchronization issues on the BALLOT() and WarpReduce() - // calls. + // walk from 0 to max_depth uint32_t next_thread_value_count, next_warp_value_count; - for (int s_idx = 0; s_idx <= max_depth; s_idx++) { + for (int s_idx = 0; s_idx < max_depth; s_idx++) { PageNestingInfo *pni = &s->page.nesting[s_idx]; - int in_bounds = ((s_idx >= start_depth && s_idx <= end_depth) && in_row_bounds) ? 1 : 0; + // if we are within the range of nesting levels we should be adding value indices for + int in_nesting_bounds = + ((s_idx >= start_depth && s_idx <= end_depth) && in_row_bounds) ? 1 : 0; - // everything up to the max_def_level is a real value + // everything up to the max_def_level is a non-null value uint32_t is_valid = 0; - if (d >= pni->max_def_level && in_bounds) { is_valid = 1; } + if (d >= pni->max_def_level && in_nesting_bounds) { is_valid = 1; } // compute warp and thread valid counts uint32_t warp_valid_mask; @@ -1282,25 +1322,27 @@ static __device__ void gpuUpdateValidityOffsetsAndRowIndices(int32_t target_inpu thread_valid_count = __popc(warp_valid_mask & ((1 << thread_value_count) - 1)); warp_valid_count = __popc(warp_valid_mask); - // if this is the value column emit an index - if (is_valid && s_idx == max_depth) { + // if this is the value column emit an index for value decoding + if (is_valid && s_idx == max_depth - 1) { int idx = pni->valid_count + thread_valid_count; int ofs = pni->value_count + thread_value_count; s->nz_idx[rolling_index(idx)] = ofs; } // compute warp and thread value counts for the -next- nesting level. we need to - // do this for here nested schemas so that we can emit an offset for the -current- nesting + // do this for nested schemas so that we can emit an offset for the -current- nesting // level. more concretely : the offset for the current nesting level == current length of the // next nesting level - if (s_idx < max_depth) { + if (s_idx < max_depth - 1) { uint32_t next_warp_count_mask = BALLOT((s_idx + 1 >= start_depth && s_idx + 1 <= end_depth && in_row_bounds) ? 1 : 0); next_warp_value_count = __popc(next_warp_count_mask); next_thread_value_count = __popc(next_warp_count_mask & ((1 << t) - 1)); - // if we're -not- at a leaf column, and we're within row bounds, emit an offset - if (in_bounds) { + // if we're -not- at a leaf column and we're within nesting/row bounds + // and we have a valid data_out pointer, it implies this is a list column, so + // emit an offset. 
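// [Editorial illustration, not part of the patch] The offset emitted for
// nesting level N is the running value count of level N+1. For a LIST<INT32>
// column holding [[7, 8], [9]], decoding fills the two nesting levels as:
//   depth 0 (offsets): 0 2     <- start of each row's range in the child
//   depth 1 (values):  7 8 9
// The terminating offset (3 here) is appended after decode; see the
// "add the final offset to every offset buffer" loop later in this patch.
// Struct levels have no offset buffer, hence the data_out nullptr check below.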
+ if (in_nesting_bounds && pni->data_out != nullptr) { int idx = pni->value_count + thread_value_count; cudf::size_type ofs = s->page.nesting[s_idx + 1].value_count + next_thread_value_count + s->page.nesting[s_idx + 1].page_start_value; @@ -1329,7 +1371,7 @@ static __device__ void gpuUpdateValidityOffsetsAndRowIndices(int32_t target_inpu // update if (!t) { // update valid value count for decoding and total # of values we've processed - s->nz_count = s->page.nesting[max_depth].valid_count; + s->nz_count = s->page.nesting[max_depth - 1].valid_count; s->input_value_count = input_value_count; s->input_row_count = input_row_count; } @@ -1352,23 +1394,22 @@ static __device__ void gpuUpdateValidityOffsetsAndRowIndices(int32_t target_inpu */ __device__ void gpuDecodeLevels(page_state_s *s, int32_t target_leaf_count, int t) { - int max_depth = s->col.max_level[level_type::REPETITION]; + bool has_repetition = s->col.max_level[level_type::REPETITION] > 0; constexpr int batch_size = 32; int cur_leaf_count = target_leaf_count; while (!s->error && s->nz_count < target_leaf_count && s->input_value_count < s->num_input_values) { - // only need to decode repetition levels in the case of a nested schema - if (max_depth > 0) { gpuDecodeStream(s->rep, s, cur_leaf_count, t, level_type::REPETITION); } + if (has_repetition) { gpuDecodeStream(s->rep, s, cur_leaf_count, t, level_type::REPETITION); } gpuDecodeStream(s->def, s, cur_leaf_count, t, level_type::DEFINITION); SYNCWARP(); // because the rep and def streams are encoded seperately, we cannot request an exact // # of values to be decoded at once. we can only process the lowest # of decoded rep/def // levels we get. - int actual_leaf_count = max_depth > 0 ? min(s->lvl_count[level_type::REPETITION], - s->lvl_count[level_type::DEFINITION]) - : s->lvl_count[level_type::DEFINITION]; + int actual_leaf_count = has_repetition ? min(s->lvl_count[level_type::REPETITION], + s->lvl_count[level_type::DEFINITION]) + : s->lvl_count[level_type::DEFINITION]; // process what we got back gpuUpdateValidityOffsetsAndRowIndices(actual_leaf_count, s, t); @@ -1396,7 +1437,8 @@ static __device__ void gpuUpdatePageSizes(page_state_s *s, bool bounds_set) { // max nesting depth of the column - int max_depth = s->col.max_level[level_type::REPETITION]; + int max_depth = s->col.max_nesting_depth; + // bool has_repetition = s->col.max_level[level_type::REPETITION] > 0 ? true : false; // how many input level values we've processed in the page so far int input_value_count = s->input_value_count; // how many leaf values we've processed in the page so far @@ -1405,22 +1447,14 @@ static __device__ void gpuUpdatePageSizes(page_state_s *s, int input_row_count = s->input_row_count; while (input_value_count < target_input_value_count) { - // determine the nesting bounds for this thread - int start_depth = -1; - int end_depth = -1; - int d = -1; - if (input_value_count + t < target_input_value_count) { - int index = rolling_index(input_value_count + t); - int r = s->rep[index]; - start_depth = r; - d = s->def[index]; - end_depth = s->page.nesting[d].d_remap; - } + int start_depth, end_depth, d; + get_nesting_bounds( + start_depth, end_depth, d, s, input_value_count, target_input_value_count, t); // count rows and leaf values int is_new_row = start_depth == 0 ? 1 : 0; uint32_t warp_row_count_mask = BALLOT(is_new_row); - int is_new_leaf = (d >= s->page.nesting[max_depth].max_def_level) ? 1 : 0; + int is_new_leaf = (d >= s->page.nesting[max_depth - 1].max_def_level) ? 
1 : 0; uint32_t warp_leaf_count_mask = BALLOT(is_new_leaf); // is this thread within row bounds? on the first pass we don't know the bounds, so we will be @@ -1451,9 +1485,11 @@ static __device__ void gpuUpdatePageSizes(page_state_s *s, } // increment counts across all nesting depths - for (int s_idx = 0; s_idx <= max_depth; s_idx++) { - int in_bounds = (s_idx >= start_depth && s_idx <= end_depth && in_row_bounds) ? 1 : 0; - uint32_t count_mask = BALLOT(in_bounds); + for (int s_idx = 0; s_idx < max_depth; s_idx++) { + // if we are within the range of nesting levels we should be adding value indices for + int in_nesting_bounds = (s_idx >= start_depth && s_idx <= end_depth && in_row_bounds) ? 1 : 0; + + uint32_t count_mask = BALLOT(in_nesting_bounds); if (!t) { s->page.nesting[s_idx].size += __popc(count_mask); } } @@ -1526,6 +1562,8 @@ extern "C" __global__ void __launch_bounds__(NTHREADS) } __syncthreads(); + bool has_repetition = s->col.max_level[level_type::REPETITION] > 0; + // optimization : it might be useful to have a version of gpuDecodeStream that could go // wider than 1 warp. Currently it only only uses 1 warp so that it can overlap work // with the value decoding step when in the actual value decoding kernel. however during @@ -1536,13 +1574,16 @@ extern "C" __global__ void __launch_bounds__(NTHREADS) while (!s->error && s->input_value_count < s->num_input_values) { // decode repetition and definition levels. these will attempt to decode at // least up to the target, but may decode a few more. - gpuDecodeStream(s->rep, s, target_input_count, t, level_type::REPETITION); + if (has_repetition) { + gpuDecodeStream(s->rep, s, target_input_count, t, level_type::REPETITION); + } gpuDecodeStream(s->def, s, target_input_count, t, level_type::DEFINITION); SYNCWARP(); // we may have decoded different amounts from each stream, so only process what we've been - int actual_input_count = - min(s->lvl_count[level_type::REPETITION], s->lvl_count[level_type::DEFINITION]); + int actual_input_count = has_repetition ? 
min(s->lvl_count[level_type::REPETITION], + s->lvl_count[level_type::DEFINITION]) + : s->lvl_count[level_type::DEFINITION]; // process what we got back gpuUpdatePageSizes(s, actual_input_count, t, trim_pass); @@ -1637,7 +1678,7 @@ extern "C" __global__ void __launch_bounds__(NTHREADS) if (out_pos < target_pos && output_value_idx >= 0 && output_value_idx < s->num_input_values) { // nesting level that is storing actual leaf values - int leaf_level_index = s->col.max_level[level_type::REPETITION]; + int leaf_level_index = s->col.max_nesting_depth - 1; uint32_t dtype_len = s->dtype_len; uint8_t *dst = s->page.nesting[leaf_level_index].data_out + @@ -1689,7 +1730,7 @@ struct chunk_row_output_iter { struct start_offset_output_iterator { PageInfo *p; - int col_index; + int src_col_schema; int nesting_depth; int empty = 0; using value_type = size_type; @@ -1700,24 +1741,20 @@ struct start_offset_output_iterator { start_offset_output_iterator operator+ __host__ __device__(int i) { - return start_offset_output_iterator{p + i, col_index, nesting_depth}; + return start_offset_output_iterator{p + i, src_col_schema, nesting_depth}; } void operator++ __host__ __device__() { p++; } reference operator[] __device__(int i) { return dereference(p + i); } reference operator*__device__() { return dereference(p); } - void operator= __device__(value_type v) - { - if (p->column_idx == col_index && !(p->flags & PAGEINFO_FLAGS_DICTIONARY)) { - p->nesting[nesting_depth].page_start_value = 2; - } - } private: reference __device__ dereference(PageInfo *p) { - if (p->column_idx != col_index || p->flags & PAGEINFO_FLAGS_DICTIONARY) { return empty; } + if (p->src_col_schema != src_col_schema || p->flags & PAGEINFO_FLAGS_DICTIONARY) { + return empty; + } return p->nesting[nesting_depth].page_start_value; } }; @@ -1727,10 +1764,12 @@ struct start_offset_output_iterator { */ cudaError_t PreprocessColumnData(hostdevice_vector &pages, hostdevice_vector const &chunks, - std::vector>> &nested_info, + std::vector &input_columns, + std::vector &output_columns, size_t num_rows, size_t min_row, - cudaStream_t stream) + cudaStream_t stream, + rmm::mr::device_memory_resource *mr) { dim3 dim_block(NTHREADS, 1); dim3 dim_grid(pages.size(), 1); // 1 threadblock per page @@ -1764,35 +1803,49 @@ cudaError_t PreprocessColumnData(hostdevice_vector &pages, // back, this value will get overwritten later on). 
pages.device_to_host(stream, true); - // computes: - // output column sizes for each level of nesting (summing PageNestingInfo::size) - // per-page start offsets for each level of nesting - for (size_t idx = 0; idx < nested_info.size(); idx++) { - size_t max_depth = nested_info[idx].size() - 1; - for (size_t l_idx = 0; l_idx <= max_depth; l_idx++) { - // column size - auto page_input = thrust::make_transform_iterator( - pages.device_ptr(), [idx, l_idx] __device__(PageInfo const &page) { - if (page.column_idx != idx || page.flags & PAGEINFO_FLAGS_DICTIONARY) { return 0; } + // compute output column sizes by examining the pages of the -input- columns + for (size_t idx = 0; idx < input_columns.size(); idx++) { + auto const &input_col = input_columns[idx]; + auto src_col_schema = input_col.schema_idx; + size_t max_depth = input_col.nesting_depth(); + + auto *cols = &output_columns; + for (size_t l_idx = 0; l_idx < input_col.nesting_depth(); l_idx++) { + auto &out_buf = (*cols)[input_col.nesting[l_idx]]; + cols = &out_buf.children; + + auto size_input = thrust::make_transform_iterator( + pages.device_ptr(), [src_col_schema, l_idx] __device__(PageInfo const &page) { + if (page.src_col_schema != src_col_schema || page.flags & PAGEINFO_FLAGS_DICTIONARY) { + return 0; + } return page.nesting[l_idx].size; }); - nested_info[idx][l_idx].first = - thrust::reduce(rmm::exec_policy(stream)->on(stream), page_input, page_input + pages.size()); - // add 1 for non-leaf levels for the terminating offset - if (l_idx < max_depth) { nested_info[idx][l_idx].first++; } + // column size + // for struct columns, higher levels of the output columns are shared between input + // columns. so don't compute any given level more than once. + if (out_buf.size == 0) { + int size = thrust::reduce( + rmm::exec_policy(stream)->on(stream), size_input, size_input + pages.size()); + + // if this is a list column add 1 for non-leaf levels for the terminating offset + if (out_buf.type.id() == type_id::LIST && l_idx < max_depth) { size++; } + + // allocate + out_buf.create(size, stream, mr); + } // per-page start offset auto key_input = thrust::make_transform_iterator( - pages.device_ptr(), [] __device__(PageInfo const &page) { return page.column_idx; }); + pages.device_ptr(), [] __device__(PageInfo const &page) { return page.src_col_schema; }); thrust::exclusive_scan_by_key( rmm::exec_policy(stream)->on(stream), key_input, key_input + pages.size(), - page_input, - start_offset_output_iterator{pages.device_ptr(), - static_cast(chunks[idx].dst_col_index), - static_cast(l_idx)}); + size_input, + start_offset_output_iterator{ + pages.device_ptr(), static_cast(src_col_schema), static_cast(l_idx)}); } } diff --git a/cpp/src/io/parquet/page_hdr.cu b/cpp/src/io/parquet/page_hdr.cu index 72f14d8de25..4793499361e 100644 --- a/cpp/src/io/parquet/page_hdr.cu +++ b/cpp/src/io/parquet/page_hdr.cu @@ -218,10 +218,10 @@ extern "C" __global__ void __launch_bounds__(128) PageInfo *page_info; if (!t) { - bs->base = bs->cur = bs->ck.compressed_data; - bs->end = bs->base + bs->ck.compressed_size; - bs->page.chunk_idx = chunk; - bs->page.column_idx = bs->ck.dst_col_index; + bs->base = bs->cur = bs->ck.compressed_data; + bs->end = bs->base + bs->ck.compressed_size; + bs->page.chunk_idx = chunk; + bs->page.src_col_schema = bs->ck.src_col_schema; // this computation is only valid for flat schemas. 
for nested schemas, // they will be recomputed in the preprocess step by examining repetition and // definition levels diff --git a/cpp/src/io/parquet/parquet.cpp b/cpp/src/io/parquet/parquet.cpp index 270cb68087d..1b8dcf126db 100644 --- a/cpp/src/io/parquet/parquet.cpp +++ b/cpp/src/io/parquet/parquet.cpp @@ -290,7 +290,7 @@ PARQUET_END_STRUCT() */ bool CompactProtocolReader::InitSchema(FileMetaData *md) { - if (WalkSchema(md->schema) != md->schema.size()) return false; + if (WalkSchema(md) != md->schema.size()) return false; /* Inside FileMetaData, there is a std::vector of RowGroups and each RowGroup contains a * a std::vector of ColumnChunks. Each ColumnChunk has a member ColumnMetaData, which contains @@ -313,24 +313,19 @@ bool CompactProtocolReader::InitSchema(FileMetaData *md) }(); if (it == md->schema.cend()) return false; current_schema_index = std::distance(md->schema.cbegin(), it); - - // if the schema index is already pointing at a nested type, we'll leave it alone. - if (column.schema_idx < 0 || - md->schema[column.schema_idx].converted_type != parquet::LIST) { - column.schema_idx = current_schema_index; - } - column.leaf_schema_idx = current_schema_index; - parent = current_schema_index; + column.schema_idx = current_schema_index; + parent = current_schema_index; } } } + return true; } /** * @brief Populates each node in the schema tree * - * @param[out] schema Current node + * @param[out] md File metadata * @param[in] idx Current node index * @param[in] parent_idx Parent node index * @param[in] max_def_level Max definition level @@ -339,10 +334,10 @@ bool CompactProtocolReader::InitSchema(FileMetaData *md) * @return The node index that was populated */ int CompactProtocolReader::WalkSchema( - std::vector &schema, int idx, int parent_idx, int max_def_level, int max_rep_level) + FileMetaData *md, int idx, int parent_idx, int max_def_level, int max_rep_level) { - if (idx >= 0 && (size_t)idx < schema.size()) { - SchemaElement *e = &schema[idx]; + if (idx >= 0 && (size_t)idx < md->schema.size()) { + SchemaElement *e = &md->schema[idx]; if (e->repetition_type == OPTIONAL) { ++max_def_level; } else if (e->repetition_type == REPEATED) { @@ -352,12 +347,13 @@ int CompactProtocolReader::WalkSchema( e->max_definition_level = max_def_level; e->max_repetition_level = max_rep_level; e->parent_idx = parent_idx; - parent_idx = idx; + + parent_idx = idx; ++idx; if (e->num_children > 0) { for (int i = 0; i < e->num_children; i++) { int idx_old = idx; - idx = WalkSchema(schema, idx, parent_idx, max_def_level, max_rep_level); + idx = WalkSchema(md, idx, parent_idx, max_def_level, max_rep_level); if (idx <= idx_old) break; // Error } } diff --git a/cpp/src/io/parquet/parquet.hpp b/cpp/src/io/parquet/parquet.hpp index da2593213b2..01d4e4e85df 100644 --- a/cpp/src/io/parquet/parquet.hpp +++ b/cpp/src/io/parquet/parquet.hpp @@ -49,7 +49,7 @@ struct file_ender_s { * * Parquet is a strongly-typed format so the file layout can be interpreted as * as a schema tree. - **/ + */ struct SchemaElement { Type type = UNDEFINED_TYPE; ConvertedType converted_type = UNKNOWN; @@ -73,6 +73,38 @@ struct SchemaElement { name == other.name && num_children == other.num_children && decimal_scale == other.decimal_scale && decimal_precision == other.decimal_precision; } + + // the parquet format is a little squishy when it comes to interpreting + // repeated fields. sometimes repeated fields act as "stubs" in the schema + // that don't represent a true nesting level. 
+  //
+  // this is the case with plain lists:
+  //
+  // optional group my_list (LIST) {
+  //   repeated group element {        <-- not part of the output hierarchy
+  //     required binary str (UTF8);
+  //   };
+  // }
+  //
+  // However, for backwards compatibility reasons, there are a few special cases, namely
+  // List<Struct<>> (which also corresponds to how the map type is specified), where
+  // this does not hold true
+  //
+  // optional group my_list (LIST) {
+  //   repeated group element {        <-- part of the hierarchy because it represents a struct
+  //     required binary str (UTF8);
+  //     required int32 num;
+  //   };
+  // }
+  bool is_stub() const { return repetition_type == REPEATED && num_children == 1; }
+
+  // in parquet terms, a group is a level of nesting in the schema. a group
+  // can be a struct or a list
+  bool is_struct() const
+  {
+    return type == UNDEFINED_TYPE &&
+           // this assumption might be a little weak.
+           ((repetition_type != REPEATED) || (repetition_type == REPEATED && num_children == 2));
+  }
 };
 
 /**
@@ -114,10 +146,6 @@ struct ColumnChunk {
 
   // Following fields are derived from other fields
   int schema_idx = -1;  // Index in flattened schema (derived from path_in_schema)
-  // if this is a non-nested type, this index will be the same as schema_idx.
-  // for a nested type, this will point to the fundamental leaf type schema
-  // element (int, string, etc)
-  int leaf_schema_idx = -1;
 };
 
 /**
@@ -308,7 +336,7 @@ class CompactProtocolReader {
   bool InitSchema(FileMetaData *md);
 
  protected:
-  int WalkSchema(std::vector<SchemaElement> &schema,
+  int WalkSchema(FileMetaData *md,
                  int idx           = 0,
                  int parent_idx    = 0,
                  int max_def_level = 0,
diff --git a/cpp/src/io/parquet/parquet_common.hpp b/cpp/src/io/parquet/parquet_common.hpp
index 9d2017faa54..366217cb4a4 100644
--- a/cpp/src/io/parquet/parquet_common.hpp
+++ b/cpp/src/io/parquet/parquet_common.hpp
@@ -18,6 +18,9 @@
 
 #include
 
+#include
+#include
+
 namespace cudf {
 namespace io {
 namespace parquet {
diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp
index 2c90b96aabd..2247926b316 100644
--- a/cpp/src/io/parquet/parquet_gpu.hpp
+++ b/cpp/src/io/parquet/parquet_gpu.hpp
@@ -19,13 +19,28 @@
 #include
 #include
 #include
+#include
 
 #include
+#include
 
 #include
 #include
 
 namespace cudf {
 namespace io {
 namespace parquet {
+
+/**
+ * @brief Struct representing an input column in the file.
+ */
+struct input_column_info {
+  int schema_idx;
+  std::string name;
+  // size == nesting depth. the associated real output
+  // buffer index in the dest column for each level of nesting.
+  std::vector<int> nesting;
+  auto nesting_depth() const { return nesting.size(); }
+};
+
 namespace gpu {
 
 /**
@@ -57,15 +72,14 @@ struct nvstrdesc_s {
  * @brief Nesting information
  */
 struct PageNestingInfo {
-  // input definition levels are remapped with this into
-  // the corresponding output PageNestingInfo struct
-  // within the same PageInfo.
-  // eg.
-  // PageNestingInfo *out = &page.nesting[page.nesting[d].d_remap];
-  int32_t d_remap;
+  // input repetition/definition levels are remapped with these values
+  // into the corresponding real output nesting depths.
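// [Editorial illustration, not part of the patch] For a plain LIST<INT32>
// column (one list level, two output nesting depths), an incoming repetition
// level of 0 maps to start_depth 0 (a new row begins) while repetition level 1
// maps to start_depth 1 (another element in the current row's list); the
// definition level likewise selects end_depth, i.e. how deep the value reaches.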
+ int32_t start_depth; + int32_t end_depth; // set at initialization int32_t max_def_level; + int32_t max_rep_level; // set during preprocessing int32_t size; // this page/nesting-level's size contribution to the output column @@ -76,7 +90,7 @@ struct PageNestingInfo { int32_t value_count; // total # of values decoded in this page/nesting-level int32_t valid_map_offset; // current offset in bits relative to valid_map uint8_t *data_out; // pointer into output buffer - uint32_t *valid_map; // pointer into output validity buferr + uint32_t *valid_map; // pointer into output validity buffer }; /** @@ -95,12 +109,12 @@ struct PageInfo { // - In the case of a nested schema, you have to decode the repetition and definition // levels to extract actual column values int32_t num_input_values; - int32_t chunk_row; // starting row of this page relative to the start of the chunk - int32_t num_rows; // number of rows in this page - int32_t chunk_idx; // column chunk this page belongs to - int32_t column_idx; // column index this page belongs to - uint8_t flags; // PAGEINFO_FLAGS_XXX - uint8_t encoding; // Encoding for data or dictionary page + int32_t chunk_row; // starting row of this page relative to the start of the chunk + int32_t num_rows; // number of rows in this page + int32_t chunk_idx; // column chunk this page belongs to + int32_t src_col_schema; // schema index of this column + uint8_t flags; // PAGEINFO_FLAGS_XXX + uint8_t encoding; // Encoding for data or dictionary page uint8_t definition_level_encoding; // Encoding used for definition levels (data page) uint8_t repetition_level_encoding; // Encoding used for repetition levels (data page) @@ -126,20 +140,22 @@ struct ColumnChunkDesc { uint32_t num_rows_, int16_t max_definition_level_, int16_t max_repetition_level_, + int16_t max_nesting_depth_, uint8_t def_level_bits_, uint8_t rep_level_bits_, int8_t codec_, int8_t converted_type_, int8_t decimal_scale_, int32_t ts_clock_rate_, - int32_t dst_col_index_, - int32_t src_col_index_) + int32_t src_col_index_, + int32_t src_col_schema_) : compressed_data(compressed_data_), compressed_size(compressed_size_), num_values(num_values_), start_row(start_row_), num_rows(num_rows_), max_level{max_definition_level_, max_repetition_level_}, + max_nesting_depth{max_nesting_depth_}, data_type(datatype_ | (datatype_length_ << 3)), level_bits{def_level_bits_, rep_level_bits_}, num_data_pages(0), @@ -153,8 +169,8 @@ struct ColumnChunkDesc { converted_type(converted_type_), decimal_scale(decimal_scale_), ts_clock_rate(ts_clock_rate_), - dst_col_index(dst_col_index_), - src_col_index(src_col_index_) + src_col_index(src_col_index_), + src_col_schema(src_col_schema_) { } @@ -164,6 +180,7 @@ struct ColumnChunkDesc { size_t start_row; // starting row of this chunk uint32_t num_rows; // number of rows in this chunk int16_t max_level[level_type::NUM_LEVEL_TYPES]; // max definition/repetition level + int16_t max_nesting_depth; // max nesting depth of the output uint16_t data_type; // basic column data type, ((type_length << 3) | // parquet::Type) uint8_t @@ -181,8 +198,8 @@ struct ColumnChunkDesc { int8_t decimal_scale; // decimal scale pow(10, -decimal_scale) int32_t ts_clock_rate; // output timestamp clock frequency (0=default, 1000=ms, 1000000000=ns) - int32_t dst_col_index; // my output column index - int32_t src_col_index; // my source (order in the file) column index + int32_t src_col_index; // my input column index + int32_t src_col_schema; // my schema index in the file }; /** @@ -315,12 +332,14 @@ cudaError_t 
BuildStringDictionaryIndex(ColumnChunkDesc *chunks, * the parquet headers when dealing with nested schemas. * - The total sizes of all output columns at all nesting levels * - The starting output buffer offset for each page, for each nesting level - * * For flat schemas, these values are computed during header decoding (see gpuDecodePageHeaders) * + * Note : this function is where output device memory is allocated for nested columns. + * * @param[in,out] pages All pages to be decoded * @param[in] chunks All chunks to be decoded - * @param[in,out] nested_info Per-output column nesting information (size, nullability) + * @param[in,out] input_columns Input column information + * @param[in,out] output_columns Output column information * @param[in] num_rows Maximum number of rows to read * @param[in] min_rows crop all rows below min_row * @param[in] stream Cuda stream @@ -329,10 +348,12 @@ cudaError_t BuildStringDictionaryIndex(ColumnChunkDesc *chunks, */ cudaError_t PreprocessColumnData(hostdevice_vector &pages, hostdevice_vector const &chunks, - std::vector>> &nested_sizes, + std::vector &input_columns, + std::vector &output_columns, size_t num_rows, size_t min_row, - cudaStream_t stream = (cudaStream_t)0); + cudaStream_t stream, + rmm::mr::device_memory_resource *mr); /** * @brief Launches kernel for reading the column data stored in the pages diff --git a/cpp/src/io/parquet/reader_impl.cu b/cpp/src/io/parquet/reader_impl.cu index 8feb688946a..f36546e542d 100644 --- a/cpp/src/io/parquet/reader_impl.cu +++ b/cpp/src/io/parquet/reader_impl.cu @@ -44,6 +44,10 @@ namespace parquet { using namespace cudf::io::parquet; using namespace cudf::io; +// bit space we are reserving in column_buffer::user_data +constexpr uint32_t PARQUET_COLUMN_BUFFER_SCHEMA_MASK = (0xffffff); +constexpr uint32_t PARQUET_COLUMN_BUFFER_FLAG_LIST_TERMINATED = (1 << 24); + namespace { /** * @brief Function that translates Parquet datatype to cuDF type enum @@ -84,11 +88,16 @@ type_id to_type_id(SchemaElement const &schema, } break; + // maps are just List>. + case parquet::MAP: case parquet::LIST: return type_id::LIST; default: break; } + // is it simply a struct? + if (schema.is_struct()) { return type_id::STRUCT; } + // Physical storage type supported by Parquet; controls the on-disk storage // format in combination with the encoding type. 
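// [Editorial illustration; the schema below is assumed, not from the patch]
// A parquet map column such as
//   optional group my_map (MAP) {
//     repeated group key_value {
//       required binary key (UTF8);
//       optional int32 value;
//     };
//   }
// is read as LIST<STRUCT<STRING, INT32>>: the MAP converted type maps to
// type_id::LIST above, and the repeated key_value group has two children, so
// is_struct() treats it as a real struct rather than a list stub.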
switch (physical) { @@ -233,7 +242,6 @@ class aggregate_metadata { std::map const agg_keyval_map; size_type const num_rows; size_type const num_row_groups; - std::vector const column_names; /** * @brief Create a metadata object from each element in the source vector */ @@ -281,27 +289,13 @@ class aggregate_metadata { return sum + pfm.row_groups.size(); }); } - std::vector gather_column_names() - { - for (auto const &pfm : per_file_metadata) { - if (pfm.row_groups.size() != 0) { - std::vector column_names; - for (const auto &chunk : pfm.row_groups[0].columns) { - column_names.emplace_back(name_from_path(chunk.meta_data.path_in_schema)); - } - return column_names; - } - } - return {}; - } public: aggregate_metadata(std::vector> const &sources) : per_file_metadata(metadatas_from_sources(sources)), agg_keyval_map(merge_keyval_metadata()), num_rows(calc_num_rows()), - num_row_groups(calc_num_row_groups()), - column_names(gather_column_names()) + num_row_groups(calc_num_row_groups()) { // Verify that the input files have matching numbers of columns size_type num_cols = -1; @@ -321,52 +315,51 @@ class aggregate_metadata { } } - auto const &get_row_group(size_type idx, size_type src_idx) const + auto const &get_row_group(size_type row_group_index, size_type src_idx) const { CUDF_EXPECTS(src_idx >= 0 && src_idx < static_cast(per_file_metadata.size()), "invalid source index"); - return per_file_metadata[src_idx].row_groups[idx]; + return per_file_metadata[src_idx].row_groups[row_group_index]; + } + + auto const &get_column_metadata(size_type row_group_index, + size_type src_idx, + int schema_idx) const + { + auto col = std::find_if( + per_file_metadata[src_idx].row_groups[row_group_index].columns.begin(), + per_file_metadata[src_idx].row_groups[row_group_index].columns.end(), + [schema_idx](ColumnChunk const &col) { return col.schema_idx == schema_idx ? 
true : false; });
+    CUDF_EXPECTS(col != std::end(per_file_metadata[src_idx].row_groups[row_group_index].columns),
+                 "Found no metadata for schema index");
+    return col->meta_data;
   }
 
   auto get_num_rows() const { return num_rows; }
 
   auto get_num_row_groups() const { return num_row_groups; }
 
-  auto const &get_schema(int idx) const { return per_file_metadata[0].schema[idx]; }
+  auto const &get_schema(int schema_idx) const { return per_file_metadata[0].schema[schema_idx]; }
 
   auto const &get_key_value_metadata() const { return agg_keyval_map; }
 
-  inline SchemaElement const &get_column_schema(int col_index) const
-  {
-    auto &pfm = per_file_metadata[0];
-    return pfm.schema[pfm.row_groups[0].columns[col_index].schema_idx];
-  }
-
-  inline int get_column_leaf_schema_index(int col_index) const
-  {
-    return per_file_metadata[0].row_groups[0].columns[col_index].leaf_schema_idx;
-  }
-
-  inline SchemaElement const &get_column_leaf_schema(int col_index) const
-  {
-    return per_file_metadata[0].schema[get_column_leaf_schema_index(col_index)];
-  }
-
-  inline int get_nesting_depth(int col_index)
+  /**
+   * @brief Gets the concrete nesting depth of output cudf columns
+   *
+   * @param schema_index Schema index of the input column
+   *
+   * @return The nesting depth
+   */
+  inline int get_output_nesting_depth(int schema_index) const
   {
     auto &pfm = per_file_metadata[0];
-
-    // see : the "Nested Types" section here
-    // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
-    int index = get_column_leaf_schema_index(col_index);
     int depth = 0;
 
     // walk upwards, skipping repeated fields
-    while (index > 0) {
-      if (pfm.schema[index].repetition_type != REPEATED) { depth++; }
-      index = pfm.schema[index].parent_idx;
+    while (schema_index > 0) {
+      if (!pfm.schema[schema_index].is_stub()) { depth++; }
+      schema_index = pfm.schema[schema_index].parent_idx;
     }
-
     return depth;
   }
 
@@ -495,38 +488,266 @@ class aggregate_metadata {
     return selection;
   }
 
+  /**
+   * @brief Build input and output column structures based on schema input. Recursive.
+   *
+   * @param[in,out] schema_idx Schema index to build information for. This value gets
+   * incremented as the function recurses.
+   * @param[out] input_columns Input column information (source data in the file)
+   * @param[out] output_columns Output column structure (resulting cudf columns)
+   * @param[in,out] nesting A stack keeping track of child column indices so we can
+   * reproduce the linear list of output columns that correspond to an input column.
+   * @param[in] strings_to_categorical Type conversion parameter
+   * @param[in] timestamp_type_id Type conversion parameter
+   */
+  void build_column_info(int &schema_idx,
+                         std::vector<input_column_info> &input_columns,
+                         std::vector<column_buffer> &output_columns,
+                         std::deque<int> &nesting,
+                         bool strings_to_categorical,
+                         type_id timestamp_type_id) const
+  {
+    int start_schema_idx = schema_idx;
+    auto const &schema   = get_schema(schema_idx);
+    schema_idx++;
+
+    // if I am a stub, continue on
+    if (schema.is_stub()) {
+      // is this legit?
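      // [Editorial sketch of a stub; the schema is assumed, not from the patch] In
      //   optional group my_list (LIST) {
      //     repeated group element { required int32 v; };
      //   }
      // "element" is REPEATED with exactly one child, so is_stub() is true: it adds
      // no output column and the walk simply recurses into its single child.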
+ CUDF_EXPECTS(schema.num_children == 1, "Unexpected number of children for stub"); + build_column_info(schema_idx, + input_columns, + output_columns, + nesting, + strings_to_categorical, + timestamp_type_id); + return; + } + + // if we're at the root, this is a new output column + int index = (int)output_columns.size(); + nesting.push_back(static_cast(output_columns.size())); + output_columns.emplace_back( + data_type{to_type_id(schema, strings_to_categorical, timestamp_type_id)}, + schema.repetition_type == OPTIONAL ? true : false); + column_buffer &output_col = output_columns.back(); + output_col.name = schema.name; + + // build each child + for (int idx = 0; idx < schema.num_children; idx++) { + build_column_info(schema_idx, + input_columns, + output_col.children, + nesting, + strings_to_categorical, + timestamp_type_id); + } + + // if I have no children, we're at a leaf and I'm an input column (that is, one with actual + // data stored) so add me to the list. + if (schema.num_children == 0) { + input_columns.emplace_back(input_column_info{start_schema_idx, schema.name}); + input_column_info &input_col = input_columns.back(); + std::copy(nesting.begin(), nesting.end(), std::back_inserter(input_col.nesting)); + } + + nesting.pop_back(); + } + /** * @brief Filters and reduces down to a selection of columns * * @param use_names List of column names to select * @param include_index Whether to always include the PANDAS index column(s) + * @param strings_to_categorical Type conversion parameter + * @param timestamp_type_id Type conversion parameter * - * @return List of column names + * @return input column information, output column information, list of output column schema + * indices */ - auto select_columns(std::vector const &use_names, bool include_index) const + auto select_columns(std::vector const &use_names, + bool include_index, + bool strings_to_categorical, + type_id timestamp_type_id) const { - std::vector> selection; + auto const &pfm = per_file_metadata[0]; + + // determine the list of output columns + // + // there is not necessarily a 1:1 mapping between input columns and output columns. + // For example, parquet does not explicitly store a ColumnChunkDesc for struct columns. + // The "structiness" is simply implied by the schema. For example, this schema: + // required group field_id=1 name { + // required binary field_id=2 firstname (String); + // required binary field_id=3 middlename (String); + // required binary field_id=4 lastname (String); + // } + // will only contain 3 internal columns of data (firstname, middlename, lastname). But of + // course "name" is ultimately the struct column we want to return. + // + // "firstname", "middlename" and "lastname" represent the input columns in the file that we + // process to produce the final cudf "name" column. 
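// [Editorial worked example, based on the schema above] select_columns() would
// return one output column_buffer ("name", a struct with three string children)
// and three input_column_info entries whose nesting vectors index into that
// output hierarchy:
//   firstname  -> nesting = {0, 0}   (output column 0, child 0)
//   middlename -> nesting = {0, 1}
//   lastname   -> nesting = {0, 2}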
+    //
+    std::vector<int> output_column_schemas;
     if (use_names.empty()) {
-      // No columns specified; include all in the dataset
-      for (const auto &name : column_names) { selection.emplace_back(selection.size(), name); }
+      // walk the schema and choose all top level columns
+      for (size_t schema_idx = 1; schema_idx < pfm.schema.size(); schema_idx++) {
+        auto const &schema = pfm.schema[schema_idx];
+        if (schema.parent_idx == 0) { output_column_schemas.push_back(schema_idx); }
+      }
     } else {
       // Load subset of columns; include PANDAS index unless excluded
       std::vector<std::string> local_use_names = use_names;
       if (include_index) { add_pandas_index_names(local_use_names); }
       for (const auto &use_name : local_use_names) {
-        for (size_t i = 0; i < column_names.size(); ++i) {
-          if (column_names[i] == use_name) {
-            selection.emplace_back(i, column_names[i]);
-            break;
-          }
+        for (size_t schema_idx = 1; schema_idx < pfm.schema.size(); schema_idx++) {
+          auto const &schema = pfm.schema[schema_idx];
+          if (use_name == schema.name) { output_column_schemas.push_back(schema_idx); }
         }
       }
     }
 
-    return selection;
+    // construct input and output column info
+    std::vector<column_buffer> output_columns;
+    output_columns.reserve(output_column_schemas.size());
+    std::vector<input_column_info> input_columns;
+    std::deque<int> nesting;
+    for (size_t idx = 0; idx < output_column_schemas.size(); idx++) {
+      int schema_index = output_column_schemas[idx];
+      build_column_info(schema_index,
+                        input_columns,
+                        output_columns,
+                        nesting,
+                        strings_to_categorical,
+                        timestamp_type_id);
+    }
+
+    return std::make_tuple(
+      std::move(input_columns), std::move(output_columns), std::move(output_column_schemas));
   }
 };
 
+/**
+ * @brief Generate depth remappings for repetition and definition levels.
+ *
+ * When dealing with columns that contain lists, we must examine incoming
+ * repetition and definition level pairs to determine what range of output nesting
+ * is indicated when adding new values. This function generates the mappings of
+ * the R/D levels to those start/end bounds
+ *
+ * @param remap Maps column schema index to the R/D remapping vectors for that column
+ * @param src_col_schema The column schema to generate the new mapping for
+ * @param md File metadata information
+ *
+ */
+void generate_depth_remappings(
+  std::map<int, std::pair<std::vector<int>, std::vector<int>>> &remap,
+  int src_col_schema,
+  aggregate_metadata const &md)
+{
+  // already generated for this level
+  if (remap.find(src_col_schema) != remap.end()) { return; }
+  auto schema   = md.get_schema(src_col_schema);
+  int max_depth = md.get_output_nesting_depth(src_col_schema);
+
+  CUDF_EXPECTS(remap.find(src_col_schema) == remap.end(),
+               "Attempting to remap a schema more than once");
+  auto inserted =
+    remap.insert(std::pair<int, std::pair<std::vector<int>, std::vector<int>>>{src_col_schema, {}});
+  auto &depth_remap = inserted.first->second;
+
+  std::vector<int> &rep_depth_remap = (depth_remap.first);
+  rep_depth_remap.resize(schema.max_repetition_level + 1);
+  std::vector<int> &def_depth_remap = (depth_remap.second);
+  def_depth_remap.resize(schema.max_definition_level + 1);
+
+  // the key:
+  // for incoming level values R/D
+  // add values starting at the shallowest nesting level X that has repetition level R
+  // until you reach the deepest nesting level Y that corresponds to the repetition level R1
+  // held by the nesting level that has definition level D
+  //
+  // Example: a 3 level struct with a list at the bottom
+  //
+  //            R / D   Depth
+  // level0     0 / 1     0
+  // level1     0 / 2     1
+  // level2     0 / 3     2
+  // list       0 / 3     3
+  // element    1 / 4     4
+  //
+  // incoming R/D : 0, 0 -> add values from depth 0 to 3 (def level 0 always maps to depth 0)
+  // incoming R/D : 0, 1 -> add values from depth 0 to 3
+  // incoming R/D : 0, 2 -> add values from depth 0 to 3
+  // incoming R/D : 1, 4 -> add values from depth 4 to 4
+  //
+  // Note : the -validity- of values is simply checked by comparing the incoming D value against
+  // the D value of the given nesting level (incoming D >= the D for the nesting level == valid,
+  // otherwise NULL). The tricky part is determining what nesting levels to add values at.
+  //
+  // For schemas with no repetition level (no lists), X is always 0 and Y is always max nesting
+  // depth.
+  //
+
+  // compute "X" from above
+  for (int s_idx = schema.max_repetition_level; s_idx >= 0; s_idx--) {
+    auto find_shallowest = [&](int r) {
+      int shallowest = -1;
+      int cur_depth  = max_depth - 1;
+      int schema_idx = src_col_schema;
+      while (schema_idx > 0) {
+        auto cur_schema = md.get_schema(schema_idx);
+        if (cur_schema.max_repetition_level == r) {
+          // if this is a repeated field, map it one level deeper
+          shallowest = cur_schema.is_stub() ? cur_depth + 1 : cur_depth;
+        }
+        if (!cur_schema.is_stub()) { cur_depth--; }
+        schema_idx = cur_schema.parent_idx;
+      }
+      return shallowest;
+    };
+    rep_depth_remap[s_idx] = find_shallowest(s_idx);
+  }
+
+  // compute "Y" from above
+  for (int s_idx = schema.max_definition_level; s_idx >= 0; s_idx--) {
+    auto find_deepest = [&](int d) {
+      SchemaElement prev_schema;
+      int schema_idx = src_col_schema;
+      int r1         = 0;
+      while (schema_idx > 0) {
+        SchemaElement cur_schema = md.get_schema(schema_idx);
+        if (cur_schema.max_definition_level == d) {
+          // if this is a repeated field, map it one level deeper
+          r1 = cur_schema.is_stub() ? prev_schema.max_repetition_level
+                                    : cur_schema.max_repetition_level;
+          break;
+        }
+        prev_schema = cur_schema;
+        schema_idx  = cur_schema.parent_idx;
+      }
+
+      // we now know R1 from above.
return the deepest nesting level that has the + // same repetition level + schema_idx = src_col_schema; + int depth = max_depth - 1; + while (schema_idx > 0) { + SchemaElement cur_schema = md.get_schema(schema_idx); + if (cur_schema.max_repetition_level == r1) { + // if this is a repeated field, map it one level deeper + depth = cur_schema.is_stub() ? depth + 1 : depth; + break; + } + if (!cur_schema.is_stub()) { depth--; } + prev_schema = cur_schema; + schema_idx = cur_schema.parent_idx; + } + return depth; + }; + def_depth_remap[s_idx] = find_deepest(s_idx); + } +} + /** * @copydoc cudf::io::detail::parquet::read_column_chunks */ @@ -731,25 +952,19 @@ rmm::device_buffer reader::impl::decompress_page_data( /** * @copydoc cudf::io::detail::parquet::allocate_nesting_info */ -void reader::impl::allocate_nesting_info( - hostdevice_vector const &chunks, - hostdevice_vector &pages, - hostdevice_vector &page_nesting_info, - std::vector>> &col_nesting_info, - int num_columns, - cudaStream_t stream) +void reader::impl::allocate_nesting_info(hostdevice_vector const &chunks, + hostdevice_vector &pages, + hostdevice_vector &page_nesting_info, + cudaStream_t stream) { - // resize col_nesting_info - col_nesting_info.resize(num_columns); - // compute total # of page_nesting infos needed and allocate space. doing this in one // buffer to keep it to a single gpu allocation size_t const total_page_nesting_infos = std::accumulate( chunks.host_ptr(), chunks.host_ptr() + chunks.size(), 0, [&](int total, auto &chunk) { - auto const src_col_index = chunk.src_col_index; - // the leaf schema represents the bottom of the nested hierarchy - auto const &leaf_schema = _metadata->get_column_leaf_schema(src_col_index); - auto const per_page_nesting_info_size = leaf_schema.max_definition_level + 1; + // the schema of the input column + auto const &schema = _metadata->get_schema(chunk.src_col_schema); + auto const per_page_nesting_info_size = max( + schema.max_definition_level + 1, _metadata->get_output_nesting_depth(chunk.src_col_schema)); return total + (per_page_nesting_info_size * chunk.num_data_pages); }); @@ -762,9 +977,10 @@ void reader::impl::allocate_nesting_info( int target_page_index = 0; int src_info_index = 0; for (size_t idx = 0; idx < chunks.size(); idx++) { - int src_col_index = chunks[idx].src_col_index; - auto &leaf_schema = _metadata->get_column_leaf_schema(src_col_index); - int per_page_nesting_info_size = leaf_schema.max_definition_level + 1; + int src_col_schema = chunks[idx].src_col_schema; + auto &schema = _metadata->get_schema(src_col_schema); + auto const per_page_nesting_info_size = + max(schema.max_definition_level + 1, _metadata->get_output_nesting_depth(src_col_schema)); // skip my dict pages target_page_index += chunks[idx].num_dict_pages; @@ -782,62 +998,64 @@ void reader::impl::allocate_nesting_info( // fill in int nesting_info_index = 0; + std::map, std::vector>> depth_remapping; for (size_t idx = 0; idx < chunks.size(); idx++) { - int dst_col_index = chunks[idx].dst_col_index; - int src_col_index = chunks[idx].src_col_index; + int src_col_schema = chunks[idx].src_col_schema; - // the leaf schema represents the bottom of the nested hierarchy - auto &leaf_schema = _metadata->get_column_leaf_schema(src_col_index); - // real depth of the output cudf column hiearchy (1 == no nesting, 2 == 1 level, etc) - int max_depth = _metadata->get_nesting_depth(src_col_index); + // schema of the input column + auto &schema = _metadata->get_schema(src_col_schema); + // real depth of the output cudf column 
hierarchy (1 == no nesting, 2 == 1 level, etc) + int max_depth = _metadata->get_output_nesting_depth(src_col_schema); // # of nesting infos stored per page for this column - size_t per_page_nesting_info_size = leaf_schema.max_definition_level + 1; + auto const per_page_nesting_info_size = max(schema.max_definition_level + 1, max_depth); - col_nesting_info[dst_col_index].resize(max_depth); + // if this column has lists, generate depth remapping + std::map, std::vector>> depth_remapping; + if (schema.max_repetition_level > 0) { + generate_depth_remappings(depth_remapping, src_col_schema, *_metadata); + } // fill in host-side nesting info - int schema_idx = _metadata->get_column_leaf_schema_index(src_col_index); - auto cur_schema = _metadata->get_schema(schema_idx); - int output_col_idx = max_depth - 1; + int schema_idx = src_col_schema; + auto cur_schema = _metadata->get_schema(schema_idx); + int cur_depth = max_depth - 1; while (schema_idx > 0) { - // repetition type for this level - FieldRepetitionType repetition_type = cur_schema.repetition_type; - - int d = cur_schema.max_definition_level; - - // set nullability on the column - if (repetition_type != REPEATED) { - col_nesting_info[dst_col_index][output_col_idx].second = - repetition_type == OPTIONAL ? true : false; - } - - // initialize each page within the chunk - for (int p_idx = 0; p_idx < chunks[idx].num_data_pages; p_idx++) { - gpu::PageNestingInfo *pni = - &page_nesting_info[nesting_info_index + (p_idx * per_page_nesting_info_size)]; - int input_index = d; - int output_index = output_col_idx; - - // values indexed by definition level - pni[input_index].d_remap = output_col_idx; + // stub columns (basically the inner field of a list scheme element) are not real columns. + // we can ignore them for the purposes of output nesting info + if (!cur_schema.is_stub()) { + // initialize each page within the chunk + for (int p_idx = 0; p_idx < chunks[idx].num_data_pages; p_idx++) { + gpu::PageNestingInfo *pni = + &page_nesting_info[nesting_info_index + (p_idx * per_page_nesting_info_size)]; + + // if we have lists, set our start and end depth remappings + if (schema.max_repetition_level > 0) { + auto remap = depth_remapping.find(src_col_schema); + CUDF_EXPECTS(remap != depth_remapping.end(), + "Could not find depth remapping for schema"); + std::vector const &rep_depth_remap = (remap->second.first); + std::vector const &def_depth_remap = (remap->second.second); + + for (size_t m = 0; m < rep_depth_remap.size(); m++) { + pni[m].start_depth = rep_depth_remap[m]; + } + for (size_t m = 0; m < def_depth_remap.size(); m++) { + pni[m].end_depth = def_depth_remap[m]; + } + } - // REPEATED fields are not "real" output cudf columns. they just represent a level of - // nesting. - if (repetition_type != REPEATED) { // values indexed by output column index - pni[output_index].max_def_level = d; - pni[output_index].size = 0; - // definition 0 always remaps to column 0. 
- if (output_index == 0) { pni[output_index].d_remap = 0; } + pni[cur_depth].max_def_level = cur_schema.max_definition_level; + pni[cur_depth].max_rep_level = cur_schema.max_repetition_level; + pni[cur_depth].size = 0; } - } - // move up the hierarchy + // move up the hierarchy + cur_depth--; + } - // if this was a REPEATED field, it represents a level of nesting, so - // move up the output column - if (repetition_type == REPEATED) { output_col_idx--; } + // next schema schema_idx = cur_schema.parent_idx; cur_schema = _metadata->get_schema(schema_idx); } @@ -850,21 +1068,36 @@ void reader::impl::allocate_nesting_info( } /** - * @copydoc cudf::io::detail::parquet::preprocess_nested_columns + * @copydoc cudf::io::detail::parquet::preprocess_columns */ -void reader::impl::preprocess_nested_columns( - hostdevice_vector &chunks, - hostdevice_vector &pages, - hostdevice_vector &page_nesting_info, - std::vector>> &nested_info, - size_t min_row, - size_t total_rows, - cudaStream_t stream) +void reader::impl::preprocess_columns(hostdevice_vector &chunks, + hostdevice_vector &pages, + size_t min_row, + size_t total_rows, + bool has_lists, + cudaStream_t stream) { - // preprocess per-nesting level sizes by page - CUDA_TRY(gpu::PreprocessColumnData(pages, chunks, nested_info, total_rows, min_row, stream)); - - CUDA_TRY(cudaStreamSynchronize(stream)); + // TODO : we should be selectively preprocessing only columns that have + // lists in them instead of doing them all if even one contains lists. + + // if there are no lists, simply allocate every allocate every output + // column to be of size num_rows + if (!has_lists) { + std::function &)> create_columns = + [&](std::vector &cols) { + for (size_t idx = 0; idx < cols.size(); idx++) { + auto &col = cols[idx]; + col.create(total_rows, stream, _mr); + create_columns(col.children); + } + }; + create_columns(_output_columns); + } else { + // preprocess per-nesting level sizes by page + CUDA_TRY(gpu::PreprocessColumnData( + pages, chunks, _input_columns, _output_columns, total_rows, min_row, stream, _mr)); + CUDA_TRY(cudaStreamSynchronize(stream)); + } } /** @@ -875,7 +1108,6 @@ void reader::impl::decode_page_data(hostdevice_vector &chu hostdevice_vector &page_nesting, size_t min_row, size_t total_rows, - std::vector &out_buffers, cudaStream_t stream) { auto is_dict_chunk = [](const gpu::ColumnChunkDesc &chunk) { @@ -900,31 +1132,72 @@ void reader::impl::decode_page_data(hostdevice_vector &chu // Update chunks with pointers to column data. 
for (size_t c = 0, page_count = 0, str_ofs = 0; c < chunks.size(); c++) { + input_column_info const &input_col = _input_columns[chunks[c].src_col_index]; + CUDF_EXPECTS(input_col.schema_idx == chunks[c].src_col_schema, + "Column/page schema index mismatch"); + if (is_dict_chunk(chunks[c])) { chunks[c].str_dict_index = str_dict_index.data().get() + str_ofs; str_ofs += pages[page_count].num_input_values; } - size_t max_depth = chunks[c].max_level[gpu::level_type::REPETITION]; + size_t max_depth = _metadata->get_output_nesting_depth(chunks[c].src_col_schema); // allocate (gpu) an array of pointers to validity data of size : nesting depth - chunk_nested_valids.emplace_back(hostdevice_vector{max_depth + 1}); + chunk_nested_valids.emplace_back(hostdevice_vector{max_depth}); hostdevice_vector &valids = chunk_nested_valids.back(); chunks[c].valid_map_base = valids.device_ptr(); // allocate (gpu) an array of pointers to out data of size : nesting depth - chunk_nested_data.emplace_back(hostdevice_vector{max_depth + 1}); + chunk_nested_data.emplace_back(hostdevice_vector{max_depth}); hostdevice_vector &data = chunk_nested_data.back(); chunks[c].column_data_base = data.device_ptr(); - // fill in the arrays on the host - column_buffer *buf = &out_buffers[chunks[c].dst_col_index]; - for (size_t idx = 0; idx <= max_depth; idx++) { - valids[idx] = buf->null_mask(); - data[idx] = buf->data(); - if (idx < max_depth) { - CUDF_EXPECTS(buf->children.size() > 0, "Encountered a malformed column_buffer"); - buf = &buf->children[0]; + // fill in the arrays on the host. there are some important considerations to + // take into account here for nested columns. specifically, with structs + // there is sharing of output buffers between input columns. consider this schema + // + // required group field_id=1 name { + // required binary field_id=2 firstname (String); + // required binary field_id=3 middlename (String); + // required binary field_id=4 lastname (String); + // } + // + // there are 3 input columns of data here (firstname, middlename, lastname), but + // only 1 output column (name). The structure of the output column buffers looks like + // the schema itself + // + // struct (name) + // string (firstname) + // string (middlename) + // string (lastname) + // + // The struct column can contain validity information. the problem is, the decode + // step for the input columns will all attempt to decode this validity information + // because each one has it's own copy of the repetition/definition levels. but + // since this is all happening in parallel it would mean multiple blocks would + // be stomping all over the same memory randomly. to work around this, we set + // things up so that only 1 child of any given nesting level fills in the + // data (offsets in the case of lists) or validity information for the higher + // levels of the hierarchy that are shared. In this case, it would mean we + // would just choose firstname to be the one that decodes the validity for name. + // + // we do this by only handing out the pointers to the first child we come across. 
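// [Editorial illustration] Continuing the "name" example above: firstname's
// chunk is processed first, so the struct level's user_data records
// firstname's schema index as the owner and that chunk receives the real
// validity/data pointers for "name"; middlename and lastname then see a
// different owning schema and get nullptr at that level, so they skip it.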
+    //
+    auto *cols = &_output_columns;
+    for (size_t idx = 0; idx < max_depth; idx++) {
+      auto &out_buf = (*cols)[input_col.nesting[idx]];
+      cols          = &out_buf.children;
+
+      int owning_schema = out_buf.user_data & PARQUET_COLUMN_BUFFER_SCHEMA_MASK;
+      if (owning_schema == 0 || owning_schema == input_col.schema_idx) {
+        valids[idx] = out_buf.null_mask();
+        data[idx]   = out_buf.data();
+        out_buf.user_data |=
+          static_cast<uint32_t>(input_col.schema_idx) & PARQUET_COLUMN_BUFFER_SCHEMA_MASK;
+      } else {
+        valids[idx] = nullptr;
+        data[idx]   = nullptr;
       }
     }

@@ -947,45 +1220,61 @@ void reader::impl::decode_page_data(hostdevice_vector<gpu::ColumnChunkDesc> &chu
   page_nesting.device_to_host(stream);
   cudaStreamSynchronize(stream);

-  // for nested schemas, add the final offset to every offset buffer.
+  // for list columns, add the final offset to every offset buffer.
   // TODO : make this happen more efficiently. Maybe use thrust::for_each
   // on each buffer. Or potentially do it in PreprocessColumnData
   // Note : the reason we are doing this here instead of in the decode kernel is
   // that it is difficult/impossible for a given page to know that it is writing the very
   // last value that should then be followed by a terminator (because rows can span
   // page boundaries).
-  for (size_t idx = 0; idx < out_buffers.size(); idx++) {
-    column_buffer *out = &out_buffers[idx];
-    int depth          = 0;
-    while (out->children.size() != 0) {
-      int offset = out->children[0].size;
-      if (out->children[0].children.size() > 0) { offset--; }
-      cudaMemcpy(((int32_t *)out->data()) + (out->size - 1),
-                 &offset,
-                 sizeof(offset),
-                 cudaMemcpyHostToDevice);
-      depth++;
-      out = &out->children[0];
+  for (size_t idx = 0; idx < _input_columns.size(); idx++) {
+    input_column_info const &input_col = _input_columns[idx];
+
+    auto *cols = &_output_columns;
+    for (size_t l_idx = 0; l_idx < input_col.nesting_depth(); l_idx++) {
+      auto &out_buf = (*cols)[input_col.nesting[l_idx]];
+      cols          = &out_buf.children;
+
+      if (out_buf.type.id() != type_id::LIST ||
+          (out_buf.user_data & PARQUET_COLUMN_BUFFER_FLAG_LIST_TERMINATED)) {
+        continue;
+      }
+      CUDF_EXPECTS(l_idx < input_col.nesting_depth() - 1, "Encountered a leaf list column");
+      auto &child = (*cols)[input_col.nesting[l_idx + 1]];
+
+      // the final offset for a list at level N is the size of its child
+      int offset = child.type.id() == type_id::LIST ? child.size - 1 : child.size;
+      cudaMemcpyAsync(((int32_t *)out_buf.data()) + (out_buf.size - 1),
+                      &offset,
+                      sizeof(offset),
+                      cudaMemcpyHostToDevice,
+                      stream);
+      out_buf.user_data |= PARQUET_COLUMN_BUFFER_FLAG_LIST_TERMINATED;
     }
   }

   // update null counts in the final column buffers
-  for (size_t i = 0; i < pages.size(); i++) {
-    gpu::PageInfo *pi = &pages[i];
+  for (size_t idx = 0; idx < pages.size(); idx++) {
+    gpu::PageInfo *pi = &pages[idx];
     if (pi->flags & gpu::PAGEINFO_FLAGS_DICTIONARY) { continue; }
-    gpu::ColumnChunkDesc *col = &chunks[pi->chunk_idx];
-    column_buffer *out        = &out_buffers[col->dst_col_index];
+    gpu::ColumnChunkDesc *col          = &chunks[pi->chunk_idx];
+    input_column_info const &input_col = _input_columns[col->src_col_index];

     int index                 = pi->nesting - page_nesting.device_ptr();
     gpu::PageNestingInfo *pni = &page_nesting[index];

-    int max_depth = col->max_level[gpu::level_type::REPETITION];
-    for (int idx = 0; idx <= max_depth; idx++) {
-      out->null_count() += pni[idx].value_count - pni[idx].valid_count;
+    auto *cols = &_output_columns;
+    for (size_t l_idx = 0; l_idx < input_col.nesting_depth(); l_idx++) {
+      auto &out_buf = (*cols)[input_col.nesting[l_idx]];
+      cols          = &out_buf.children;

-      if (idx < max_depth) { out = &out->children[0]; }
+      // if I wasn't the one who wrote out the validity bits, skip it
+      if (chunk_nested_valids[pi->chunk_idx][l_idx] == nullptr) { continue; }
+      out_buf.null_count() += pni[l_idx].value_count - pni[l_idx].valid_count;
     }
   }
+
+  cudaStreamSynchronize(stream);
 }

 reader::impl::impl(std::vector<std::unique_ptr<datasource>> &&sources,
@@ -996,10 +1285,6 @@ reader::impl::impl(std::vector<std::unique_ptr<datasource>> &&sources,
   // Open and parse the source dataset metadata
   _metadata = std::make_unique<aggregate_metadata>(_sources);

-  // Select only columns required by the options
-  _selected_columns =
-    _metadata->select_columns(options.get_columns(), options.is_enabled_use_pandas_metadata());
-
   // Override output timestamp resolution if requested
   if (options.get_timestamp_type().id() != type_id::EMPTY) {
     _timestamp_type = options.get_timestamp_type();
@@ -1007,6 +1292,13 @@ reader::impl::impl(std::vector<std::unique_ptr<datasource>> &&sources,

   // Strings may be returned as either string or categorical columns
   _strings_to_categorical = options.is_enabled_convert_strings_to_categories();
+
+  // Select only columns required by the options
+  std::tie(_input_columns, _output_columns, _output_column_schemas) =
+    _metadata->select_columns(options.get_columns(),
+                              options.is_enabled_use_pandas_metadata(),
+                              _strings_to_categorical,
+                              _timestamp_type.id());
 }

 table_with_metadata reader::impl::read(size_type skip_rows,
@@ -1018,24 +1310,16 @@ table_with_metadata reader::impl::read(size_type skip_rows,
   const auto selected_row_groups =
     _metadata->select_row_groups(row_group_list, skip_rows, num_rows);

-  // Get a list of column data types
-  std::vector<data_type> column_types;
-  if (_metadata->get_num_row_groups() != 0) {
-    for (const auto &col : _selected_columns) {
-      auto &col_schema = _metadata->get_column_schema(col.first);
-      auto col_type    = to_type_id(col_schema, _strings_to_categorical, _timestamp_type.id());
-      CUDF_EXPECTS(col_type != type_id::EMPTY, "Unknown type");
-      column_types.emplace_back(col_type);
-    }
-  }
+  table_metadata out_metadata;

+  // output cudf columns as determined by the top level schema
   std::vector<std::unique_ptr<column>> out_columns;
-  out_columns.reserve(column_types.size());
+  out_columns.reserve(_output_columns.size());

-  if (selected_row_groups.size() != 0 && column_types.size() != 0) {
+  if (selected_row_groups.size() != 0 && _input_columns.size() != 0) {
     // Descriptors for all the chunks that make up the selected columns
-    const auto num_columns = _selected_columns.size();
-    const auto num_chunks  = selected_row_groups.size() * num_columns;
+    const auto num_input_columns = _input_columns.size();
+    const auto num_chunks        = selected_row_groups.size() * num_input_columns;
     hostdevice_vector<gpu::ColumnChunkDesc> chunks(0, num_chunks, stream);

     // Association between each column chunk and its source
@@ -1047,8 +1331,8 @@ table_with_metadata reader::impl::read(size_type skip_rows,
     // Keep track of column chunk file offsets
     std::vector<size_t> column_chunk_offsets(num_chunks);

-    // information needed allocate columns (including potential nesting)
-    bool has_nesting = false;
+    // if there are lists present, we need to preprocess
+    bool has_lists = false;

     // Initialize column chunk information
     size_t total_decompressed_size = 0;
@@ -1060,23 +1344,18 @@ table_with_metadata reader::impl::read(size_type skip_rows,
       auto const row_group_rows = std::min(remaining_rows, row_group.num_rows);
       auto const io_chunk_idx   = chunks.size();

-      for (size_t i = 0; i < num_columns; ++i) {
-        auto col       = _selected_columns[i];
-        auto &col_meta = row_group.columns[col.first].meta_data;
-
-        // the leaf schema represents the -values- encoded in the data, which in the case
-        // of nested types, is different from the # of rows
-        auto &leaf_schema = _metadata->get_column_leaf_schema(col.first);
+      // generate ColumnChunkDesc objects for everything to be decoded (all input columns)
+      for (size_t i = 0; i < num_input_columns; ++i) {
+        auto col = _input_columns[i];
+        // look up metadata
+        auto &col_meta = _metadata->get_column_metadata(rg.index, rg.source_index, col.schema_idx);
+        auto &schema   = _metadata->get_schema(col.schema_idx);

-        // this file contains nesting and will require a preprocess
-        if (_metadata->get_nesting_depth(col.first) > 1) { has_nesting = true; }
+        // this column contains repetition levels and will require a preprocess
+        if (schema.max_repetition_level > 0) { has_lists = true; }

         // Spec requires each row group to contain exactly one chunk for every
         // column. If there are too many or too few, continue with best effort
-        if (col.second != name_from_path(col_meta.path_in_schema)) {
-          std::cerr << "Detected mismatched column chunk" << std::endl;
-          continue;
-        }
         if (chunks.size() >= chunks.max_size()) {
           std::cerr << "Detected too many column chunks" << std::endl;
           continue;
@@ -1085,12 +1364,13 @@ table_with_metadata reader::impl::read(size_type skip_rows,
         int32_t type_width;
         int32_t clock_rate;
         int8_t converted_type;
+
         std::tie(type_width, clock_rate, converted_type) =
-          conversion_info(column_types[i].id(),
+          conversion_info(to_type_id(schema, _strings_to_categorical, _timestamp_type.id()),
                           _timestamp_type.id(),
-                          leaf_schema.type,
-                          leaf_schema.converted_type,
-                          leaf_schema.type_length);
+                          schema.type,
+                          schema.converted_type,
+                          schema.type_length);

         column_chunk_offsets[chunks.size()] =
           (col_meta.dictionary_page_offset != 0)
@@ -1100,20 +1380,21 @@ table_with_metadata reader::impl::read(size_type skip_rows,
         chunks.insert(gpu::ColumnChunkDesc(col_meta.total_compressed_size,
                                            nullptr,
                                            col_meta.num_values,
-                                           leaf_schema.type,
+                                           schema.type,
                                            type_width,
                                            row_group_start,
                                            row_group_rows,
-                                           leaf_schema.max_definition_level,
-                                           leaf_schema.max_repetition_level,
-                                           required_bits(leaf_schema.max_definition_level),
-                                           required_bits(leaf_schema.max_repetition_level),
+                                           schema.max_definition_level,
+                                           schema.max_repetition_level,
+                                           _metadata->get_output_nesting_depth(col.schema_idx),
+                                           required_bits(schema.max_definition_level),
+                                           required_bits(schema.max_repetition_level),
                                            col_meta.codec,
                                            converted_type,
-                                           leaf_schema.decimal_scale,
+                                           schema.decimal_scale,
                                            clock_rate,
                                            i,
-                                           col.first));
+                                           col.schema_idx));

         // Map each column chunk to its column index and its source index
         chunk_source_map[chunks.size() - 1] = row_group_source;
@@ -1154,94 +1435,62 @@ table_with_metadata reader::impl::read(size_type skip_rows,
       }
     }

+    // build output column info
+    // walk the schema, building out_buffers that mirror what our final cudf columns will look
+    // like. important : there is not necessarily a 1:1 mapping between input columns and output
+    // columns. For example, parquet does not explicitly store a ColumnChunkDesc for struct
+    // columns. The "structiness" is simply implied by the schema. For example, this schema:
+    //  required group field_id=1 name {
+    //    required binary field_id=2 firstname (String);
+    //    required binary field_id=3 middlename (String);
+    //    required binary field_id=4 lastname (String);
+    // }
+    // will only contain 3 columns of data (firstname, middlename, lastname). But of course
+    // "name" is a struct column that we want to return, so we have to make sure that we
+    // create it ourselves.
+    //
+    // std::vector output_info = build_output_column_info();
+
+    // nesting information (sizes, etc) stored -per page-
+    // note : even for flat schemas, we allocate 1 level of "nesting" info
     hostdevice_vector<gpu::PageNestingInfo> page_nesting_info;

-    // nesting information at the column level.
-    //  - total column size per nesting level
-    //  - nullability per nesting level
-    std::vector<std::vector<std::pair<int, bool>>> col_nesting_info;
-
-    // even for flat schemas, we allocate 1 level of "nesting" info
-    allocate_nesting_info(
-      chunks, pages, page_nesting_info, col_nesting_info, num_columns, stream);
-
-    // for nested schemas, we have to do some further preprocessing to determine:
-    //  - real column output sizes per level of nesting (in a flat schema, there's only 1 level of
-    //    nesting and it's size is the row count)
+    allocate_nesting_info(chunks, pages, page_nesting_info, stream);
+
+    // - compute column sizes and allocate output buffers.
+    //   important:
+    //   for nested schemas, we have to do some further preprocessing to determine:
+    //    - real column output sizes per level of nesting (in a flat schema, there's only 1 level
+    //      of nesting and its size is the row count)
     //
-    //  - output buffer offset values per-page, per nesting-level for the purposes of decoding.
-    if (has_nesting) {
-      preprocess_nested_columns(
-        chunks, pages, page_nesting_info, col_nesting_info, skip_rows, num_rows, stream);
-    }
-
-    std::vector<column_buffer> out_buffers;
-    out_buffers.reserve(column_types.size());
-    for (size_t i = 0; i < column_types.size(); ++i) {
-      auto col          = _selected_columns[i];
-      auto &leaf_schema = _metadata->get_column_leaf_schema(col.first);
-
-      int output_depth = leaf_schema.max_repetition_level + 1;
-
-      // nested schemas : sizes and nullability come from preprocess step
-      if (output_depth > 1) {
-        // the root buffer
-        out_buffers.emplace_back(column_buffer{column_types[i],
-                                               col_nesting_info[i][0].first,
-                                               col_nesting_info[i][0].second,
-                                               stream,
-                                               _mr});
-        column_buffer *col = &out_buffers[out_buffers.size() - 1];
-        // nested buffers
-        for (int idx = 1; idx < output_depth - 1; idx++) {
-          // note : all levels in a list column besides the leaf are offsets, so their length is
-          // always +1
-          col->children.push_back(column_buffer{column_types[i],
-                                                col_nesting_info[i][idx].first,
-                                                col_nesting_info[i][idx].second,
-                                                stream,
-                                                _mr});
-          col = &col->children[0];
-        }
-
-        // leaf buffer - plain data type. int, string, etc
-        col->children.push_back(column_buffer{
-          data_type{to_type_id(leaf_schema, _strings_to_categorical, _timestamp_type.id())},
-          col_nesting_info[i][output_depth - 1].first,
-          col_nesting_info[i][output_depth - 1].second,
-          stream,
-          _mr});
-      }
-      // flat schemas can infer sizes directly from # of rows
-      else {
-        // note : num_rows == # values for non-nested types
-        bool is_nullable = leaf_schema.max_definition_level != 0;
-        out_buffers.emplace_back(
-          column_buffer{column_types[i], num_rows, is_nullable, stream, _mr});
-      }
-    }
+    //    - for nested schemas, output buffer offset values per-page, per nesting-level for the
+    //      purposes of decoding.
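+    //
+    // (illustrative example: a List<List<int>> column holding the 2 rows
+    //  [[1, 2], [3]] and [[4]] needs a size at every nesting level --
+    //  2 top-level lists, 3 inner lists, 4 leaf values -- and only the first
+    //  of those can be inferred from the row count.)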
+    preprocess_columns(chunks, pages, skip_rows, num_rows, has_lists, stream);

     // decoding of column data itself
-    decode_page_data(chunks, pages, page_nesting_info, skip_rows, num_rows, out_buffers, stream);
+    decode_page_data(chunks, pages, page_nesting_info, skip_rows, num_rows, stream);

     // create the final output cudf columns
-    for (size_t i = 0; i < column_types.size(); ++i) {
-      out_columns.emplace_back(make_column(out_buffers[i], stream, _mr));
+    for (size_t i = 0; i < _output_columns.size(); ++i) {
+      out_metadata.schema_info.push_back(column_name_info{""});
+      out_columns.emplace_back(
+        make_column(_output_columns[i], stream, _mr, &out_metadata.schema_info.back()));
       }
     }
   }

-  // Create empty columns as needed
-  for (size_t i = out_columns.size(); i < column_types.size(); ++i) {
-    out_columns.emplace_back(make_empty_column(column_types[i]));
+  // Create empty columns as needed (this can happen if we've ended up with no actual data to read)
+  for (size_t i = out_columns.size(); i < _output_columns.size(); ++i) {
+    out_metadata.schema_info.push_back(column_name_info{""});
+    out_columns.emplace_back(make_empty_column(_output_columns[i].type));
   }

-  table_metadata out_metadata;
   // Return column names (must match order of returned columns)
-  out_metadata.column_names.resize(_selected_columns.size());
-  for (size_t i = 0; i < _selected_columns.size(); i++) {
-    out_metadata.column_names[i] = _selected_columns[i].second;
+  out_metadata.column_names.resize(_output_columns.size());
+  for (size_t i = 0; i < _output_column_schemas.size(); i++) {
+    auto const &schema           = _metadata->get_schema(_output_column_schemas[i]);
+    out_metadata.column_names[i] = schema.name;
   }
+
   // Return user metadata
   out_metadata.user_data = _metadata->get_key_value_metadata();
diff --git a/cpp/src/io/parquet/reader_impl.hpp b/cpp/src/io/parquet/reader_impl.hpp
index b37be0fb160..c192b65f0b0 100644
--- a/cpp/src/io/parquet/reader_impl.hpp
+++ b/cpp/src/io/parquet/reader_impl.hpp
@@ -144,15 +144,11 @@ class reader::impl {
    * @param chunks List of column chunk descriptors
    * @param pages List of page information
    * @param page_nesting_info The allocated nesting info structs.
-   * @param col_nesting_info Per-column, per-nesting level size and nullability information.
-   * @param num_columns Number of columns in the output
    * @param stream CUDA stream used for device memory operations and kernel launches.
    */
   void allocate_nesting_info(hostdevice_vector<gpu::ColumnChunkDesc> const &chunks,
                              hostdevice_vector<gpu::PageInfo> &pages,
                              hostdevice_vector<gpu::PageNestingInfo> &page_nesting_info,
-                             std::vector<std::vector<std::pair<int, bool>>> &col_nesting_info,
-                             int num_columns,
                              cudaStream_t stream);

   /**
@@ -167,19 +163,18 @@ class reader::impl {
    *
    * @param[in,out] chunks All chunks to be decoded
    * @param[in,out] pages All pages to be decoded
-   * @param[in,out] page_nesting info Per column-chunk nesting information
-   * @param[in,out] nested_info Per-output column nesting information (size, nullability)
-   * @param[in] num_rows Maximum number of rows to read
    * @param[in] min_rows crop all rows below min_row
+   * @param[in] total_rows Maximum number of rows to read
+   * @param[in] has_lists Whether or not this data contains lists and requires
+   * a preprocess.
    * @param[in] stream Cuda stream
    */
-  void preprocess_nested_columns(hostdevice_vector<gpu::ColumnChunkDesc> &chunks,
-                                 hostdevice_vector<gpu::PageInfo> &pages,
-                                 hostdevice_vector<gpu::PageNestingInfo> &page_nesting_info,
-                                 std::vector<std::vector<std::pair<int, bool>>> &nested_info,
-                                 size_t min_row,
-                                 size_t total_rows,
-                                 cudaStream_t stream);
+  void preprocess_columns(hostdevice_vector<gpu::ColumnChunkDesc> &chunks,
+                          hostdevice_vector<gpu::PageInfo> &pages,
+                          size_t min_row,
+                          size_t total_rows,
+                          bool has_lists,
+                          cudaStream_t stream);

   /**
    * @brief Converts the page data and outputs to columns.
@@ -189,7 +184,6 @@ class reader::impl {
    * @param page_nesting Page nesting array
    * @param min_row Minimum number of rows from start
    * @param total_rows Number of rows to output
-   * @param out_buffers Output columns' device buffers
    * @param stream CUDA stream used for device memory operations and kernel launches.
    */
   void decode_page_data(hostdevice_vector<gpu::ColumnChunkDesc> &chunks,
                         hostdevice_vector<gpu::PageInfo> &pages,
                         hostdevice_vector<gpu::PageNestingInfo> &page_nesting,
                         size_t min_row,
                         size_t total_rows,
-                        std::vector<column_buffer> &out_buffers,
                         cudaStream_t stream);

  private:
@@ -205,7 +198,13 @@ class reader::impl {
   std::vector<std::unique_ptr<datasource>> _sources;
   std::unique_ptr<aggregate_metadata> _metadata;

-  std::vector<std::pair<int, std::string>> _selected_columns;
+  // input columns to be processed
+  std::vector<input_column_info> _input_columns;
+  // output columns to be generated
+  std::vector<column_buffer> _output_columns;
+  // _output_columns associated schema indices
+  std::vector<int> _output_column_schemas;
+
   bool _strings_to_categorical = false;
   data_type _timestamp_type{type_id::EMPTY};
 };
diff --git a/cpp/src/io/utilities/column_buffer.hpp b/cpp/src/io/utilities/column_buffer.hpp
index 0996d823d85..ac41ac1bcf7 100644
--- a/cpp/src/io/utilities/column_buffer.hpp
+++ b/cpp/src/io/utilities/column_buffer.hpp
@@ -22,6 +22,7 @@
 #pragma once

 #include
+#include
 #include
 #include
 #include
@@ -73,13 +74,37 @@ struct column_buffer {

   column_buffer() = default;

+  // construct without a known size. call create() later to actually
+  // allocate memory
+  column_buffer(data_type _type, bool _is_nullable) : type(_type), is_nullable(_is_nullable) {}
+
+  // construct with a known size. allocates memory
   column_buffer(data_type _type,
                 size_type _size,
-                bool is_nullable                    = true,
+                bool _is_nullable                   = true,
                 cudaStream_t stream                 = 0,
                 rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource())
-    : type(_type), size(_size), _null_count(0)
+    : type(_type), is_nullable(_is_nullable), _null_count(0)
   {
+    create(_size, stream, mr);
+  }
+
+  // move constructor
+  column_buffer(column_buffer&& col) = default;
+  column_buffer& operator=(column_buffer&& col) = default;
+
+  // copy constructor
+  column_buffer(column_buffer const& col) = delete;
+  column_buffer& operator=(column_buffer const& col) = delete;
+
+  // instantiate a column of known type with a specified size. Allows deferred creation for
+  // preprocessing steps such as in the Parquet reader
+  void create(size_type _size,
+              cudaStream_t stream                 = 0,
+              rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource())
+  {
+    size = _size;
+
     switch (type.id()) {
       case type_id::STRING: _strings.resize(size); break;
@@ -87,6 +112,9 @@ struct column_buffer {
       // their individual rows
       case type_id::LIST: _data = create_data(data_type{type_id::INT32}, size, stream, mr); break;

+      // struct columns store no data themselves. just validity and children.
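+      // (e.g. a STRUCT<INT32, STRING> buffer allocates no _data here; the null
+      //  mask is created below when is_nullable is set, and each field lives in
+      //  its own entry of 'children'.)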
+      case type_id::STRUCT: break;
+
       default: _data = create_data(type, size, stream, mr); break;
     }
     if (is_nullable) { _null_mask = create_null_mask(size, mask_state::ALL_NULL, stream, mr); }
@@ -109,9 +137,11 @@ struct column_buffer {
   rmm::device_buffer _null_mask{};
   size_type _null_count{0};

+  bool is_nullable{false};
   data_type type{type_id::EMPTY};
   size_type size{0};
   std::vector<column_buffer> children;
+  uint32_t user_data{0};  // arbitrary user data
   std::string name;
 };

@@ -130,21 +160,36 @@ namespace {
 std::unique_ptr<column> make_column(
   column_buffer& buffer,
   cudaStream_t stream                 = 0,
-  rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource())
+  rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource(),
+  column_name_info* schema_info       = nullptr)
 {
   using str_pair = thrust::pair<const char*, size_t>;

+  if (schema_info != nullptr) { schema_info->name = buffer.name; }
+
   switch (buffer.type.id()) {
-    case type_id::STRING: return make_strings_column(buffer._strings, stream, mr);
+    case type_id::STRING:
+      if (schema_info != nullptr) {
+        schema_info->children.push_back(column_name_info{"offsets"});
+        schema_info->children.push_back(column_name_info{"chars"});
+      }
+      return make_strings_column(buffer._strings, stream, mr);

     case type_id::LIST: {
       // make offsets column
       auto offsets =
         std::make_unique<column>(data_type{type_id::INT32}, buffer.size, std::move(buffer._data));

+      column_name_info* child_info = nullptr;
+      if (schema_info != nullptr) {
+        schema_info->children.push_back(column_name_info{"offsets"});
+        schema_info->children.push_back(column_name_info{""});
+        child_info = &schema_info->children.back();
+      }
+
       // make child column
       CUDF_EXPECTS(buffer.children.size() > 0, "Encountered malformed column_buffer");
-      auto child = make_column(buffer.children[0], stream, mr);
+      auto child = make_column(buffer.children[0], stream, mr, child_info);

       // make the final list column (note : size is the # of offsets, so our actual # of rows is 1
       // less)
@@ -157,6 +202,29 @@ std::unique_ptr<column> make_column(
                               mr);
     } break;

+    case type_id::STRUCT: {
+      std::vector<std::unique_ptr<column>> output_children;
+      output_children.reserve(buffer.children.size());
+      std::transform(buffer.children.begin(),
+                     buffer.children.end(),
+                     std::back_inserter(output_children),
+                     [&](column_buffer& col) {
+                       column_name_info* child_info = nullptr;
+                       if (schema_info != nullptr) {
+                         schema_info->children.push_back(column_name_info{""});
+                         child_info = &schema_info->children.back();
+                       }
+                       return make_column(col, stream, mr, child_info);
+                     });
+
+      return make_structs_column(buffer.size,
+                                 std::move(output_children),
+                                 buffer._null_count,
+                                 std::move(buffer._null_mask),
+                                 stream,
+                                 mr);
+    } break;
+
     default: {
       return std::make_unique<column>(buffer.type,
                                       buffer.size,
diff --git a/python/cudf/cudf/_lib/cpp/io/types.pxd b/python/cudf/cudf/_lib/cpp/io/types.pxd
index a01b66b303c..907d7763579 100644
--- a/python/cudf/cudf/_lib/cpp/io/types.pxd
+++ b/python/cudf/cudf/_lib/cpp/io/types.pxd
@@ -41,11 +41,16 @@ cdef extern from "cudf/io/types.hpp" \
         STATISTICS_ROWGROUP = 1,
         STATISTICS_PAGE = 2,

+    cdef cppclass column_name_info:
+        string name
+        vector[column_name_info] children
+
     cdef cppclass table_metadata:
         table_metadata() except +

         vector[string] column_names
         map[string, string] user_data
+        vector[column_name_info] schema_info

     cdef cppclass table_metadata_with_nullability(table_metadata):
         table_metadata_with_nullability() except +
diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx
index e34888c9d8e..4edc552a7fb 100644
--- a/python/cudf/cudf/_lib/parquet.pyx
+++ b/python/cudf/cudf/_lib/parquet.pyx
@@ -11,7 +11,11 @@ import json
 from cython.operator import dereference
 import numpy as np

-from cudf.utils.dtypes import np_to_pa_dtype, is_categorical_dtype
+from cudf.utils.dtypes import (
+    np_to_pa_dtype,
+    is_categorical_dtype,
+    is_struct_dtype
+)
 from libc.stdlib cimport free
 from libc.stdint cimport uint8_t
 from libcpp.memory cimport shared_ptr, unique_ptr, make_unique
@@ -21,6 +25,7 @@ from libcpp.vector cimport vector
 from libcpp.utility cimport move
 from libcpp cimport bool

+
 from cudf._lib.cpp.types cimport data_type, size_type
 from cudf._lib.table cimport Table
 from cudf._lib.cpp.table.table cimport table
@@ -40,6 +45,7 @@ from cudf._lib.cpp.io.parquet cimport (
     merge_rowgroup_metadata as parquet_merge_metadata,
     pq_chunked_state
 )
+from cudf._lib.column cimport Column
 from cudf._lib.io.utils cimport (
     make_source_info,
     make_sink_info
@@ -220,6 +226,8 @@ cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None,
             )
         )

+    _update_struct_field_names(df, c_out_table.metadata.schema_info)
+
     if df.empty and meta is not None:
         cols_dtype_map = {}
         for col in meta['columns']:
@@ -484,3 +492,37 @@ cdef vector[string] _get_column_names(Table table, object index):
             column_names.push_back(str.encode(col_name))

     return column_names
+
+
+cdef _update_struct_field_names(
+    Table table,
+    vector[cudf_io_types.column_name_info]& schema_info
+):
+    for i, (name, col) in enumerate(table._data.items()):
+        table._data[name] = _update_column_struct_field_names(
+            col, schema_info[i]
+        )
+
+cdef Column _update_column_struct_field_names(
+    Column col,
+    cudf_io_types.column_name_info& info
+):
+    cdef vector[string] field_names
+
+    if is_struct_dtype(col):
+        field_names.reserve(len(col.base_children))
+        for i in range(info.children.size()):
+            field_names.push_back(info.children[i].name)
+        col = col._rename_fields(
+            field_names
+        )
+
+    if col.children:
+        children = list(col.children)
+        for i, child in enumerate(children):
+            children[i] = _update_column_struct_field_names(
+                child,
+                info.children[i]
+            )
+        col.set_base_children(tuple(children))
+
+    return col
diff --git a/python/cudf/cudf/_lib/types.pyx b/python/cudf/cudf/_lib/types.pyx
index 6d24f494462..5998f9ec2f9 100644
--- a/python/cudf/cudf/_lib/types.pyx
+++ b/python/cudf/cudf/_lib/types.pyx
@@ -178,18 +178,16 @@ cdef dtype_from_lists_column_view(column_view cv):
         return ListDtype(np.dtype("int8"))
     else:
         return ListDtype(
-            cudf_to_np_types[<underlying_type_t_type_id> child.type().id()]
+            dtype_from_column_view(child)
         )

-
 cdef dtype_from_structs_column_view(column_view cv):
-    cdef column_view child
-    fields = {}
-    for i in range(cv.num_children()):
-        fields[str(i)] = dtype_from_column_view(cv.child(i))
+    fields = {
+        str(i): dtype_from_column_view(cv.child(i))
+        for i in range(cv.num_children())
+    }
     return StructDtype(fields)

-
 cdef dtype_from_column_view(column_view cv):
     cdef libcudf_types.type_id tid = cv.type().id()
     if tid == libcudf_types.type_id.LIST:
diff --git a/python/cudf/cudf/core/column/column.py b/python/cudf/cudf/core/column/column.py
index 1a7ed282309..235bff95d17 100644
--- a/python/cudf/cudf/core/column/column.py
+++ b/python/cudf/cudf/core/column/column.py
@@ -313,30 +313,14 @@ def to_arrow(self):
             codes = self.codes.astype(signed_type)
             categories = self.categories

-            out_indices = libcudf.interop.to_arrow(
-                libcudf.table.Table(
-                    cudf.core.column_accessor.ColumnAccessor({"None": codes})
-                ),
-                ["None"],
-                keep_index=False,
-            )
-            out_dictionary = libcudf.interop.to_arrow(
-                libcudf.table.Table(
-                    cudf.core.column_accessor.ColumnAccessor(
-                        {"None": categories}
-                    )
-                ),
-                ["None"],
-                keep_index=False,
-            )
+            out_indices = codes.to_arrow()
+            out_dictionary = categories.to_arrow()

             return pa.DictionaryArray.from_arrays(
-                out_indices["None"].chunk(0),
-                out_dictionary["None"].chunk(0),
-                ordered=self.ordered,
+                out_indices, out_dictionary, ordered=self.ordered,
             )

-        elif isinstance(self, cudf.core.column.StringColumn) and (
+        if isinstance(self, cudf.core.column.StringColumn) and (
             self.null_count == len(self)
         ):
             return pa.NullArray.from_buffers(
diff --git a/python/cudf/cudf/core/column/lists.py b/python/cudf/cudf/core/column/lists.py
index 7a3fdd9d4f7..d019a2c767e 100644
--- a/python/cudf/cudf/core/column/lists.py
+++ b/python/cudf/cudf/core/column/lists.py
@@ -1,5 +1,7 @@
 # Copyright (c) 2020, NVIDIA CORPORATION.

+import pyarrow as pa
+
 from cudf.core.column import ColumnBase
 from cudf.core.column.methods import ColumnMethodsMixin
 from cudf.utils.dtypes import is_list_dtype
@@ -48,6 +50,25 @@ def offsets(self):
     def list(self, parent=None):
         return ListMethods(self, parent=parent)

+    def to_arrow(self):
+        offsets = self.offsets.to_arrow()
+        elements = (
+            pa.nulls(len(self.elements))
+            if len(self.elements) == self.elements.null_count
+            else self.elements.to_arrow()
+        )
+        pa_type = pa.list_(elements.type)
+
+        if self.nullable:
+            nbuf = self.mask.to_host_array().view("int8")
+            nbuf = pa.py_buffer(nbuf)
+            buffers = (nbuf, offsets.buffers()[1])
+        else:
+            buffers = offsets.buffers()
+        return pa.ListArray.from_buffers(
+            pa_type, len(self), buffers, children=[elements]
+        )
+

 class ListMethods(ColumnMethodsMixin):
     """
diff --git a/python/cudf/cudf/core/column/struct.py b/python/cudf/cudf/core/column/struct.py
index 9655bef5ace..577738a2dca 100644
--- a/python/cudf/cudf/core/column/struct.py
+++ b/python/cudf/cudf/core/column/struct.py
@@ -12,7 +12,7 @@ def base_size(self):
         if not self.base_children:
             return 0
         else:
-            return len(self.base_children[0]) - 1
+            return len(self.base_children[0])

     @classmethod
     def from_arrow(self, data):
@@ -40,8 +40,6 @@ def from_arrow(self, data):
         )

     def to_arrow(self):
-        pa_type = self.dtype.to_arrow()
-
         children = [
             pa.nulls(len(child))
             if len(child) == child.null_count
@@ -49,6 +47,13 @@ def to_arrow(self):
             else child.to_arrow()
             for child in self.children
         ]

+        pa_type = pa.struct(
+            {
+                field: child.type
+                for field, child in zip(self.dtype.fields, children)
+            }
+        )
+
         if self.nullable:
             nbuf = self.mask.to_host_array().view("int8")
             nbuf = pa.py_buffer(nbuf)
diff --git a/python/cudf/cudf/core/dataframe.py b/python/cudf/cudf/core/dataframe.py
index 92d99bab123..26a8e0216bb 100644
--- a/python/cudf/cudf/core/dataframe.py
+++ b/python/cudf/cudf/core/dataframe.py
@@ -45,6 +45,7 @@
     is_list_like,
     is_scalar,
     is_string_dtype,
+    is_struct_dtype,
     numeric_normalize_types,
 )
 from cudf.utils.utils import OrderedColumnDict
@@ -1175,7 +1176,7 @@ def _clean_nulls_from_dataframe(self, df):
         filling with `<NA>` values.
""" for col in df._data: - if is_list_dtype(df._data[col]): + if is_list_dtype(df._data[col]) or is_struct_dtype(df._data[col]): # TODO we need to handle this pass elif df._data[col].has_nulls: diff --git a/python/cudf/cudf/core/dtypes.py b/python/cudf/cudf/core/dtypes.py index 7a706ddebe0..afe24ab826b 100644 --- a/python/cudf/cudf/core/dtypes.py +++ b/python/cudf/cudf/core/dtypes.py @@ -117,8 +117,8 @@ def __init__(self, element_type): if isinstance(element_type, ListDtype): self._typ = pa.list_(element_type._typ) else: - element_type = cudf.utils.dtypes.np_to_pa_dtype( - np.dtype(element_type) + element_type = cudf.utils.dtypes.cudf_dtype_to_pa_type( + element_type ) self._typ = pa.list_(element_type) diff --git a/python/cudf/cudf/core/frame.py b/python/cudf/cudf/core/frame.py index c50cf72275f..4e976badc75 100644 --- a/python/cudf/cudf/core/frame.py +++ b/python/cudf/cudf/core/frame.py @@ -21,7 +21,6 @@ is_numerical_dtype, is_scalar, min_scalar_type, - min_signed_type, ) @@ -1962,93 +1961,10 @@ def to_arrow(self): b: int64 index: int64 """ - - data = self.copy(deep=False) - - codes = {} - # saving the name as they might get changed from int to str - codes_keys = [] - categories = {} - # saving the name as they might get changed from int to str - names = self._data.names - null_arrays_names = [] - - for name in names: - col = self._data[name] - if isinstance(col, cudf.core.column.CategoricalColumn,): - # arrow doesn't support unsigned codes - signed_type = ( - min_signed_type(col.categories.size) - if col.codes.size > 0 - else np.int8 - ) - codes[str(name)] = col.codes.astype(signed_type) - categories[str(name)] = col.categories - codes_keys.append(name) - elif isinstance( - col, cudf.core.column.StringColumn - ) and col.null_count == len(col): - null_arrays_names.append(name) - - data = libcudf.table.Table( - data._data.select_by_label( - [ - name - for name in names - if name not in codes_keys + null_arrays_names - ] - ) + return pa.Table.from_pydict( + {name: col.to_arrow() for name, col in self._data.items()} ) - out_table = pa.table([]) - if data._num_columns > 0: - out_table = libcudf.interop.to_arrow( - data, data._data.names, keep_index=False - ) - - if len(codes) > 0: - codes_table = libcudf.table.Table(codes) - indices = libcudf.interop.to_arrow( - codes_table, codes_table._data.names, keep_index=False - ) - dictionaries = dict( - (name, categories[name].to_arrow()) - for name in categories.keys() - ) - for name in codes_keys: - # as name can be interger in case of cudf - actual_name = name - name = str(name) - dict_array = pa.DictionaryArray.from_arrays( - indices[name].chunk(0), - _get_dictionary_array(dictionaries[name]), - ordered=self._data[actual_name].ordered, - ) - - if out_table.num_columns == 0: - out_table = pa.table({name: dict_array}) - else: - try: - out_table = out_table.add_column( - names.index(actual_name), name, dict_array - ) - except pa.lib.ArrowInvalid: - out_table = out_table.append_column(name, dict_array) - - if len(null_arrays_names): - for name in null_arrays_names: - null_array = pa.NullArray.from_buffers( - pa.null(), len(self), [pa.py_buffer((b""))] - ) - if out_table.num_columns == 0: - out_table = pa.table({name: null_array}) - else: - out_table = out_table.add_column( - names.index(name), name, null_array - ) - - return out_table - def drop_duplicates( self, subset=None, @@ -2158,7 +2074,9 @@ def _copy_categories(self, other, include_index=True): (cudf.core.index.CategoricalIndex, cudf.MultiIndex), ) ): - 
-            self._index._copy_categories(other._index, include_index=False)
+            self._index._postprocess_columns(
+                other._index, include_index=False
+            )
         # When other._index is a CategoricalIndex, there is
         # possibility that corresponding self._index be GenericIndex
         # with codes. So to update even the class signature, we
diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py
index 9f743f8a10b..9851ef81825 100644
--- a/python/cudf/cudf/tests/test_parquet.py
+++ b/python/cudf/cudf/tests/test_parquet.py
@@ -770,6 +770,43 @@ def string_gen(first_val, i):
     return strings[int_gen(first_val, i) % len(strings)]


+def list_row_gen(
+    gen, first_val, list_size, lists_per_row, include_validity=False
+):
+    """
+    Generate a single row for a List<List<>> column based on input parameters.
+
+    Args:
+        gen: A callable which generates an individual leaf element based on an
+            absolute index.
+        first_val : Generate the column as if it had started at 'first_val'
+            instead of 0.
+        list_size : Size of each generated list.
+        lists_per_row : Number of lists to generate per row.
+        include_validity : Whether or not to include nulls as part of the
+            column. If true, it will add a selection of nulls at both the
+            topmost row level and at the leaf level.
+
+    Returns:
+        The generated list column.
+    """
+
+    def L(list_size, first_val):
+        return [
+            (gen(first_val, i) if i % 2 == 0 else None)
+            if include_validity
+            else (gen(first_val, i))
+            for i in range(list_size)
+        ]
+
+    return [
+        (L(list_size, first_val + (list_size * i)) if i % 2 == 0 else None)
+        if include_validity
+        else L(list_size, first_val + (list_size * i))
+        for i in range(lists_per_row)
+    ]
+
+
 def list_gen(
     gen, skiprows, num_rows, lists_per_row, list_size, include_validity=False
 ):
@@ -925,6 +962,146 @@ def test_parquet_reader_list_num_rows(skip, tmpdir):
     assert_eq(expect, got, check_dtype=False)


+def struct_gen(gen, skip_rows, num_rows, include_validity=False):
+    """
+    Generate a struct column based on input parameters.
+
+    Args:
+        gen: An array of callables, one per struct field, which generate an
+            individual value based on an absolute index.
+        skip_rows : Generate the column as if it had started at 'skip_rows'
+            instead of 0. The intent here is to emulate the skip_rows
+            parameter of the parquet reader.
+        num_rows : Number of rows to generate. The number of fields in the
+            struct is implied by the number of callables in 'gen'.
+        include_validity : Whether or not to include nulls as part of the
+            column. If true, it will add a selection of nulls at both the
+            field level and at the value level.
+
+    Returns:
+        The generated struct column.
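+
+    Example (illustrative):
+        # a two-field struct column of 10 rows with nulls mixed in, built the
+        # same way the tests below use it
+        data = struct_gen([int_gen, string_gen], 0, 10, include_validity=True)
+        expect = pa.Table.from_pydict({"struct": data})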
+ """ + + def R(first_val, num_fields): + return { + "col" + + str(f): (gen[f](first_val, first_val) if f % 4 != 0 else None) + if include_validity + else (gen[f](first_val, first_val)) + for f in range(len(gen)) + } + + return [ + (R((i + skip_rows), len(gen)) if (i + skip_rows) % 4 != 0 else None) + if include_validity + else R((i + skip_rows), len(gen)) + for i in range(num_rows) + ] + + +@pytest.mark.parametrize( + "data", + [ + # struct + [ + {"a": 1, "b": 2}, + {"a": 10, "b": 20}, + {"a": None, "b": 22}, + {"a": None, "b": None}, + {"a": 15, "b": None}, + ], + # struct-of-list + [ + {"a": 1, "b": 2, "c": [1, 2, 3]}, + {"a": 10, "b": 20, "c": [4, 5]}, + {"a": None, "b": 22, "c": [6]}, + {"a": None, "b": None, "c": None}, + {"a": 15, "b": None, "c": [-1, -2]}, + None, + {"a": 100, "b": 200, "c": [-10, None, -20]}, + ], + # list-of-struct + [ + [{"a": 1, "b": 2}, {"a": 2, "b": 3}, {"a": 4, "b": 5}], + None, + [{"a": 10, "b": 20}], + [{"a": 100, "b": 200}, {"a": None, "b": 300}, None], + ], + # struct-of-struct + [ + {"a": 1, "b": {"inner_a": 10, "inner_b": 20}, "c": 2}, + {"a": 3, "b": {"inner_a": 30, "inner_b": 40}, "c": 4}, + {"a": 5, "b": {"inner_a": 50, "inner_b": None}, "c": 6}, + {"a": 7, "b": None, "c": 8}, + {"a": None, "b": {"inner_a": None, "inner_b": None}, "c": None}, + None, + {"a": None, "b": {"inner_a": None, "inner_b": 100}, "c": 10}, + ], + ], +) +def test_parquet_reader_struct_basic(tmpdir, data): + expect = pa.Table.from_pydict({"struct": data}) + fname = tmpdir.join("test_parquet_reader_struct_basic.parquet") + pa.parquet.write_table(expect, fname) + assert os.path.exists(fname) + got = cudf.read_parquet(fname) + assert expect.equals(got.to_arrow()) + + +def test_parquet_reader_struct_los_large(tmpdir): + num_rows = 256 + list_size = 64 + data = [ + struct_gen([string_gen, int_gen, string_gen], 0, list_size, False) + if i % 2 == 0 + else None + for i in range(num_rows) + ] + expect = pa.Table.from_pydict({"los": data}) + fname = tmpdir.join("test_parquet_reader_struct_los_large.parquet") + pa.parquet.write_table(expect, fname) + assert os.path.exists(fname) + got = cudf.read_parquet(fname) + assert expect.equals(got.to_arrow()) + + +@pytest.mark.parametrize( + "params", [[3, 4, 32, False], [3, 4, 32, True], [100, 25, 256, True]] +) +def test_parquet_reader_struct_sol_table(tmpdir, params): + # Struct> + lists_per_row = params[0] + list_size = params[1] + num_rows = params[2] + include_validity = params[3] + + def list_gen_wrapped(x, y): + return list_row_gen( + int_gen, x * list_size * lists_per_row, list_size, lists_per_row + ) + + def string_list_gen_wrapped(x, y): + return list_row_gen( + string_gen, + x * list_size * lists_per_row, + list_size, + lists_per_row, + include_validity, + ) + + data = struct_gen( + [int_gen, string_gen, list_gen_wrapped, string_list_gen_wrapped], + 0, + num_rows, + include_validity, + ) + expect = pa.Table.from_pydict({"sol": data}) + fname = tmpdir.join("test_parquet_reader_struct_sol_table.parquet") + pa.parquet.write_table(expect, fname) + assert os.path.exists(fname) + got = cudf.read_parquet(fname) + assert expect.equals(got.to_arrow()) + + @pytest.mark.filterwarnings("ignore:Using CPU") def test_parquet_writer_cpu_pyarrow(tmpdir, pdf, gdf): pdf_fname = tmpdir.join("pdf.parquet")