diff --git a/cpp/include/cudf/io/types.hpp b/cpp/include/cudf/io/types.hpp index 56833502fdf..e9a93894f7d 100644 --- a/cpp/include/cudf/io/types.hpp +++ b/cpp/include/cudf/io/types.hpp @@ -94,6 +94,7 @@ enum statistics_freq { STATISTICS_NONE = 0, ///< No column statistics STATISTICS_ROWGROUP = 1, ///< Per-Rowgroup column statistics STATISTICS_PAGE = 2, ///< Per-page column statistics + STATISTICS_COLUMN = 3, ///< Full column and offset indices. Implies STATISTICS_ROWGROUP }; /** diff --git a/cpp/src/io/parquet/page_enc.cu b/cpp/src/io/parquet/page_enc.cu index baa1b164c35..b80a6c93135 100644 --- a/cpp/src/io/parquet/page_enc.cu +++ b/cpp/src/io/parquet/page_enc.cu @@ -1171,6 +1171,12 @@ __global__ void __launch_bounds__(128) gpuDecideCompression(device_span 0x7f) { @@ -1237,6 +1243,35 @@ class header_encoder { current_field_index = field; } + inline __device__ void field_list_begin(int field, size_t len, int type) + { + current_header_ptr = cpw_put_fldh(current_header_ptr, field, current_field_index, ST_FLD_LIST); + current_header_ptr = cpw_put_uint8( + current_header_ptr, static_cast((std::min(len, size_t{0xfu}) << 4) | type)); + if (len >= 0xf) { current_header_ptr = cpw_put_uint32(current_header_ptr, len); } + current_field_index = 0; + } + + inline __device__ void field_list_end(int field) { current_field_index = field; } + + inline __device__ void put_bool(bool value) + { + current_header_ptr = cpw_put_uint8(current_header_ptr, value ? 
ST_FLD_TRUE : ST_FLD_FALSE); + } + + inline __device__ void put_binary(const void* value, uint32_t length) + { + current_header_ptr = cpw_put_uint32(current_header_ptr, length); + memcpy(current_header_ptr, value, length); + current_header_ptr += length; + } + + template + inline __device__ void put_int64(T value) + { + current_header_ptr = cpw_put_int64(current_header_ptr, static_cast(value)); + } + template inline __device__ void field_int32(int field, T value) { @@ -1286,12 +1321,13 @@ static __device__ void byte_reverse128(__int128_t v, void* dst) d_char_ptr); } -__device__ uint8_t* EncodeStatistics(uint8_t* start, - const statistics_chunk* s, - uint8_t dtype, - void* scratch) +__device__ void get_extremum(const statistics_val* stats_val, + statistics_dtype dtype, + void* scratch, + const void** val, + uint32_t* len) { - uint8_t *end, dtype_len; + uint8_t dtype_len; switch (dtype) { case dtype_bool: dtype_len = 1; break; case dtype_int8: @@ -1307,37 +1343,40 @@ __device__ uint8_t* EncodeStatistics(uint8_t* start, case dtype_string: default: dtype_len = 0; break; } + + if (dtype == dtype_string) { + *len = stats_val->str_val.length; + *val = stats_val->str_val.ptr; + } else { + *len = dtype_len; + if (dtype == dtype_float32) { // Convert from double to float32 + auto const fp_scratch = static_cast(scratch); + fp_scratch[0] = stats_val->fp_val; + *val = scratch; + } else if (dtype == dtype_decimal128) { + byte_reverse128(stats_val->d128_val, scratch); + *val = scratch; + } else { + *val = stats_val; + } + } +} + +__device__ uint8_t* EncodeStatistics(uint8_t* start, + const statistics_chunk* s, + statistics_dtype dtype, + void* scratch) +{ + uint8_t* end; header_encoder encoder(start); encoder.field_int64(3, s->null_count); if (s->has_minmax) { const void *vmin, *vmax; uint32_t lmin, lmax; - if (dtype == dtype_string) { - lmin = s->min_value.str_val.length; - vmin = s->min_value.str_val.ptr; - lmax = s->max_value.str_val.length; - vmax = s->max_value.str_val.ptr; 
- } else { - lmin = lmax = dtype_len; - if (dtype == dtype_float32) { // Convert from double to float32 - auto const fp_scratch = static_cast(scratch); - fp_scratch[0] = s->min_value.fp_val; - fp_scratch[1] = s->max_value.fp_val; - vmin = &fp_scratch[0]; - vmax = &fp_scratch[1]; - } else if (dtype == dtype_decimal128) { - auto const d128_scratch = static_cast(scratch); - byte_reverse128(s->min_value.d128_val, d128_scratch); - byte_reverse128(s->max_value.d128_val, &d128_scratch[16]); - vmin = &d128_scratch[0]; - vmax = &d128_scratch[16]; - } else { - vmin = &s->min_value; - vmax = &s->max_value; - } - } + get_extremum(&s->max_value, dtype, scratch, &vmax, &lmax); encoder.field_binary(5, vmax, lmax); + get_extremum(&s->min_value, dtype, scratch, &vmin, &lmin); encoder.field_binary(6, vmin, lmin); } encoder.end(&end); @@ -1355,7 +1394,7 @@ __global__ void __launch_bounds__(128) __shared__ __align__(8) parquet_column_device_view col_g; __shared__ __align__(8) EncColumnChunk ck_g; __shared__ __align__(8) EncPage page_g; - __shared__ __align__(8) unsigned char scratch[32]; + __shared__ __align__(8) unsigned char scratch[16]; uint32_t t = threadIdx.x; @@ -1479,6 +1518,176 @@ __global__ void __launch_bounds__(1024) } } +/** + * @brief Tests if statistics are comparable given the column's + * physical and converted types + */ +static __device__ bool is_comparable(Type ptype, ConvertedType ctype) +{ + switch (ptype) { + case Type::BOOLEAN: + case Type::INT32: + case Type::INT64: + case Type::FLOAT: + case Type::DOUBLE: + case Type::BYTE_ARRAY: return true; + case Type::FIXED_LEN_BYTE_ARRAY: + if (ctype == ConvertedType::DECIMAL) { return true; } + [[fallthrough]]; + default: return false; + } +} + +/** + * @brief Compares two values. + * @return -1 if v1 < v2, 0 if v1 == v2, 1 if v1 > v2 + */ +template +constexpr __device__ int32_t compare(T& v1, T& v2) +{ + return (v1 > v2) - (v1 < v2); +} + +/** + * @brief Compares two statistics_val structs. 
+ * @return < 0 if v1 < v2, 0 if v1 == v2, > 0 if v1 > v2 + */ +static __device__ int32_t compare_values(Type ptype, + ConvertedType ctype, + const statistics_val& v1, + const statistics_val& v2) +{ + switch (ptype) { + case Type::BOOLEAN: return compare(v1.u_val, v2.u_val); + case Type::INT32: + case Type::INT64: + switch (ctype) { + case ConvertedType::UINT_8: + case ConvertedType::UINT_16: + case ConvertedType::UINT_32: + case ConvertedType::UINT_64: return compare(v1.u_val, v2.u_val); + default: // assume everything else is signed + return compare(v1.i_val, v2.i_val); + } + case Type::FLOAT: + case Type::DOUBLE: return compare(v1.fp_val, v2.fp_val); + case Type::BYTE_ARRAY: return static_cast(v1.str_val).compare(v2.str_val); + case Type::FIXED_LEN_BYTE_ARRAY: + if (ctype == ConvertedType::DECIMAL) { return compare(v1.d128_val, v2.d128_val); } + } + // calling is_comparable() should prevent reaching here + CUDF_UNREACHABLE("Trying to compare non-comparable type"); + return 0; +} + +/** + * @brief Determine if a set of statstistics are in ascending order. + */ +static __device__ bool is_ascending(const statistics_chunk* s, + Type ptype, + ConvertedType ctype, + uint32_t num_pages) +{ + for (uint32_t i = 1; i < num_pages; i++) { + if (compare_values(ptype, ctype, s[i - 1].min_value, s[i].min_value) > 0 || + compare_values(ptype, ctype, s[i - 1].max_value, s[i].max_value) > 0) { + return false; + } + } + return true; +} + +/** + * @brief Determine if a set of statstistics are in descending order. + */ +static __device__ bool is_descending(const statistics_chunk* s, + Type ptype, + ConvertedType ctype, + uint32_t num_pages) +{ + for (uint32_t i = 1; i < num_pages; i++) { + if (compare_values(ptype, ctype, s[i - 1].min_value, s[i].min_value) < 0 || + compare_values(ptype, ctype, s[i - 1].max_value, s[i].max_value) < 0) { + return false; + } + } + return true; +} + +/** + * @brief Determine the ordering of a set of statistics. 
+ */ +static __device__ int32_t calculate_boundary_order(const statistics_chunk* s, + Type ptype, + ConvertedType ctype, + uint32_t num_pages) +{ + if (not is_comparable(ptype, ctype)) { return BoundaryOrder::UNORDERED; } + if (is_ascending(s, ptype, ctype, num_pages)) { + return BoundaryOrder::ASCENDING; + } else if (is_descending(s, ptype, ctype, num_pages)) { + return BoundaryOrder::DESCENDING; + } + return BoundaryOrder::UNORDERED; +} + +// blockDim(1, 1, 1) +__global__ void __launch_bounds__(1) + gpuEncodeColumnIndexes(device_span chunks, + device_span column_stats) +{ + const void *vmin, *vmax; + uint32_t lmin, lmax; + uint8_t* col_idx_end; + unsigned char scratch[16]; + + if (column_stats.empty()) { return; } + + EncColumnChunk* ck_g = &chunks[blockIdx.x]; + uint32_t num_pages = ck_g->num_pages; + parquet_column_device_view col_g = *ck_g->col_desc; + size_t first_data_page = ck_g->use_dictionary ? 1 : 0; + uint32_t pageidx = ck_g->first_page; + + header_encoder encoder(ck_g->column_index_blob); + + // null_pages + encoder.field_list_begin(1, num_pages - first_data_page, ST_FLD_TRUE); + for (uint32_t page = first_data_page; page < num_pages; page++) { + encoder.put_bool(column_stats[pageidx + page].non_nulls == 0); + } + encoder.field_list_end(1); + // min_values + encoder.field_list_begin(2, num_pages - first_data_page, ST_FLD_BINARY); + for (uint32_t page = first_data_page; page < num_pages; page++) { + get_extremum(&column_stats[pageidx + page].min_value, col_g.stats_dtype, scratch, &vmin, &lmin); + encoder.put_binary(vmin, lmin); + } + encoder.field_list_end(2); + // max_values + encoder.field_list_begin(3, num_pages - first_data_page, ST_FLD_BINARY); + for (uint32_t page = first_data_page; page < num_pages; page++) { + get_extremum(&column_stats[pageidx + page].max_value, col_g.stats_dtype, scratch, &vmax, &lmax); + encoder.put_binary(vmax, lmax); + } + encoder.field_list_end(3); + // boundary_order + encoder.field_int32(4, + 
calculate_boundary_order(&column_stats[first_data_page + pageidx], + col_g.physical_type, + col_g.converted_type, + num_pages - first_data_page)); + // null_counts + encoder.field_list_begin(5, num_pages - first_data_page, ST_FLD_I64); + for (uint32_t page = first_data_page; page < num_pages; page++) { + encoder.put_int64(column_stats[pageidx + page].null_count); + } + encoder.field_list_end(5); + encoder.end(&col_idx_end, false); + + ck_g->column_index_size = static_cast(col_idx_end - ck_g->column_index_blob); +} + /** * @brief Functor to get definition level value for a nested struct column until the leaf level or * the first list level. @@ -2067,6 +2276,13 @@ void GatherPages(device_span chunks, gpuGatherPages<<>>(chunks, pages); } +void EncodeColumnIndexes(device_span chunks, + device_span column_stats, + rmm::cuda_stream_view stream) +{ + gpuEncodeColumnIndexes<<>>(chunks, column_stats); +} + } // namespace gpu } // namespace parquet } // namespace io diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp index 1de6be38b3d..1a2422acc12 100644 --- a/cpp/src/io/parquet/parquet_gpu.hpp +++ b/cpp/src/io/parquet/parquet_gpu.hpp @@ -237,8 +237,8 @@ struct ColumnChunkDesc { * @brief Struct describing an encoder column */ struct parquet_column_device_view : stats_column_desc { - Type physical_type; //!< physical data type - uint8_t converted_type; //!< logical data type + Type physical_type; //!< physical data type + ConvertedType converted_type; //!< logical data type uint8_t level_bits; //!< bits to encode max definition (lower nibble) & repetition (upper nibble) //!< levels constexpr uint8_t num_def_level_bits() { return level_bits & 0xf; } @@ -358,6 +358,8 @@ struct EncColumnChunk { uint16_t* dict_index; //!< Index of value in dictionary page. 
column[dict_data[dict_index[row]]] uint8_t dict_rle_bits; //!< Bit size for encoding dictionary indices bool use_dictionary; //!< True if the chunk uses dictionary encoding + uint8_t* column_index_blob; //!< Binary blob containing encoded column index for this chunk + uint32_t column_index_size; //!< Size of column index blob }; /** @@ -632,6 +634,17 @@ void GatherPages(device_span chunks, device_span pages, rmm::cuda_stream_view stream); +/** + * @brief Launches kernel to calculate ColumnIndex information per chunk + * + * @param[in,out] chunks Column chunks + * @param[in] column_stats Page-level statistics to be encoded + * @param[in] stream CUDA stream to use + */ +void EncodeColumnIndexes(device_span chunks, + device_span column_stats, + rmm::cuda_stream_view stream); + } // namespace gpu } // namespace parquet } // namespace io diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index 70f4201c04b..2ae0f76598e 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -82,6 +82,35 @@ parquet::Compression to_parquet_compression(compression_type compression) } } +/** + * @brief Function to calculate the memory needed to encode the column index of the given + * column chunk + */ +size_t column_index_buffer_size(gpu::EncColumnChunk* ck) +{ + // encoding the column index for a given chunk requires: + // each list (4 of them) requires 6 bytes of overhead + // (1 byte field header, 1 byte type, 4 bytes length) + // 1 byte overhead for boundary_order + // 1 byte overhead for termination + // sizeof(char) for boundary_order + // sizeof(bool) * num_pages for null_pages + // (ck_max_stats_len + 4) * num_pages * 2 for min/max values + // (each binary requires 4 bytes length + ck_max_stats_len) + // sizeof(int64_t) * num_pages for null_counts + // + // so 26 bytes overhead + sizeof(char) + + // (sizeof(bool) + sizeof(int64_t) + 2 * (4 + ck_max_stats_len)) * num_pages + // + // we already have ck->ck_stat_size = 
48 + 2 * ck_max_stats_len + // all of the overhead and non-stats data can fit in under 48 bytes + // + // so we can simply use ck_stat_size * num_pages + // + // calculating this per-chunk because the sizes can be wildly different + return ck->ck_stat_size * ck->num_pages; +} + } // namespace struct aggregate_writer_metadata { @@ -185,6 +214,8 @@ struct aggregate_writer_metadata { int64_t num_rows = 0; std::vector row_groups; std::vector key_value_metadata; + std::vector offset_indexes; + std::vector> column_indexes; }; std::vector files; std::string created_by = ""; @@ -244,15 +275,17 @@ struct leaf_schema_fn { template std::enable_if_t, void> operator()() { - col_schema.type = Type::INT32; - col_schema.stats_dtype = statistics_dtype::dtype_int32; + col_schema.type = Type::INT32; + col_schema.converted_type = ConvertedType::INT_32; + col_schema.stats_dtype = statistics_dtype::dtype_int32; } template std::enable_if_t, void> operator()() { - col_schema.type = Type::INT64; - col_schema.stats_dtype = statistics_dtype::dtype_int64; + col_schema.type = Type::INT64; + col_schema.converted_type = ConvertedType::INT_64; + col_schema.stats_dtype = statistics_dtype::dtype_int64; } template @@ -659,6 +692,7 @@ struct parquet_column_view { [[nodiscard]] column_view cudf_column_view() const { return cudf_col; } [[nodiscard]] parquet::Type physical_type() const { return schema_node.type; } + [[nodiscard]] parquet::ConvertedType converted_type() const { return schema_node.converted_type; } std::vector const& get_path_in_schema() { return path_in_schema; } @@ -814,8 +848,9 @@ gpu::parquet_column_device_view parquet_column_view::get_device_view( desc.rep_values = _rep_level.data(); desc.def_values = _def_level.data(); } - desc.num_rows = cudf_col.size(); - desc.physical_type = physical_type(); + desc.num_rows = cudf_col.size(); + desc.physical_type = physical_type(); + desc.converted_type = converted_type(); desc.level_bits = CompactProtocolReader::NumRequiredBits(max_rep_level()) 
<< 4 | CompactProtocolReader::NumRequiredBits(max_def_level()); @@ -1136,7 +1171,8 @@ void writer::impl::encode_pages(hostdevice_2dvector& chunks uint32_t rowgroups_in_batch, uint32_t first_rowgroup, const statistics_chunk* page_stats, - const statistics_chunk* chunk_stats) + const statistics_chunk* chunk_stats, + const statistics_chunk* column_stats) { auto batch_pages = pages.subspan(first_page_in_batch, pages_in_batch); @@ -1170,6 +1206,12 @@ void writer::impl::encode_pages(hostdevice_2dvector& chunks EncodePageHeaders(batch_pages, comp_stats, batch_pages_stats, chunk_stats, stream); GatherPages(d_chunks_in_batch.flat_view(), pages, stream); + if (column_stats != nullptr) { + auto batch_column_stats = + device_span(column_stats + first_page_in_batch, pages_in_batch); + EncodeColumnIndexes(d_chunks_in_batch.flat_view(), batch_column_stats, stream); + } + auto h_chunks_in_batch = chunks.host_view().subspan(first_rowgroup, rowgroups_in_batch); CUDF_CUDA_TRY(cudaMemcpyAsync(h_chunks_in_batch.data(), d_chunks_in_batch.data(), @@ -1492,6 +1534,7 @@ void writer::impl::write(table_view const& table, std::vector co size_type max_pages_in_batch = 0; size_t bytes_in_batch = 0; size_t comp_bytes_in_batch = 0; + size_t column_index_bfr_size = 0; for (size_type r = 0, groups_in_batch = 0, pages_in_batch = 0; r <= num_rowgroups; r++) { size_t rowgroup_size = 0; size_t comp_rowgroup_size = 0; @@ -1505,6 +1548,9 @@ void writer::impl::write(table_view const& table, std::vector co comp_rowgroup_size += ck->compressed_size; max_chunk_bfr_size = std::max(max_chunk_bfr_size, (size_t)std::max(ck->bfr_size, ck->compressed_size)); + if (stats_granularity_ == statistics_freq::STATISTICS_COLUMN) { + column_index_bfr_size += column_index_buffer_size(ck); + } } } // TBD: We may want to also shorten the batch if we have enough pages (not just based on size) @@ -1534,6 +1580,7 @@ void writer::impl::write(table_view const& table, std::vector co (stats_granularity_ != 
statistics_freq::STATISTICS_NONE) ? num_pages + num_chunks : 0; rmm::device_buffer uncomp_bfr(max_uncomp_bfr_size, stream); rmm::device_buffer comp_bfr(max_comp_bfr_size, stream); + rmm::device_buffer col_idx_bfr(column_index_bfr_size, stream); rmm::device_uvector pages(num_pages, stream); // This contains stats for both the pages and the rowgroups. TODO: make them separate. @@ -1541,13 +1588,18 @@ void writer::impl::write(table_view const& table, std::vector co for (auto b = 0, r = 0; b < static_cast(batch_list.size()); b++) { auto bfr = static_cast(uncomp_bfr.data()); auto bfr_c = static_cast(comp_bfr.data()); + auto bfr_i = static_cast(col_idx_bfr.data()); for (auto j = 0; j < batch_list[b]; j++, r++) { for (auto i = 0; i < num_columns; i++) { gpu::EncColumnChunk& ck = chunks[r][i]; ck.uncompressed_bfr = bfr; ck.compressed_bfr = bfr_c; + ck.column_index_blob = bfr_i; bfr += ck.bfr_size; bfr_c += ck.compressed_size; + if (stats_granularity_ == statistics_freq::STATISTICS_COLUMN) { + bfr_i += column_index_buffer_size(&ck); + } } } } @@ -1584,7 +1636,9 @@ void writer::impl::write(table_view const& table, std::vector co r, (stats_granularity_ == statistics_freq::STATISTICS_PAGE) ? page_stats.data() : nullptr, (stats_granularity_ != statistics_freq::STATISTICS_NONE) ? page_stats.data() + num_pages - : nullptr); + : nullptr, + (stats_granularity_ == statistics_freq::STATISTICS_COLUMN) ? 
page_stats.data() : nullptr); + std::vector> write_tasks; for (; r < rnext; r++) { int p = rg_to_part[r]; @@ -1651,6 +1705,58 @@ void writer::impl::write(table_view const& table, std::vector co task.wait(); } } + + if (stats_granularity_ == statistics_freq::STATISTICS_COLUMN) { + // need pages on host to create offset_indexes + thrust::host_vector h_pages = cudf::detail::make_host_vector_async(pages, stream); + stream.synchronize(); + + // add column and offset indexes to metadata + for (auto b = 0, r = 0; b < static_cast(batch_list.size()); b++) { + auto const rnext = r + batch_list[b]; + auto curr_page_idx = chunks[r][0].first_page; + for (; r < rnext; r++) { + int p = rg_to_part[r]; + int global_r = global_rowgroup_base[p] + r - first_rg_in_part[p]; + auto const& row_group = md->file(p).row_groups[global_r]; + for (auto i = 0; i < num_columns; i++) { + gpu::EncColumnChunk const& ck = chunks[r][i]; + auto const& column_chunk_meta = row_group.columns[i].meta_data; + + // start transfer of the column index + std::vector column_idx; + column_idx.resize(ck.column_index_size); + CUDF_CUDA_TRY(cudaMemcpyAsync(column_idx.data(), + ck.column_index_blob, + ck.column_index_size, + cudaMemcpyDeviceToHost, + stream.value())); + + // calculate offsets while the column index is transfering + int64_t curr_pg_offset = column_chunk_meta.data_page_offset; + + OffsetIndex offset_idx; + for (uint32_t pg = 0; pg < ck.num_pages; pg++) { + auto const& enc_page = h_pages[curr_page_idx++]; + + // skip dict pages + if (enc_page.page_type != PageType::DATA_PAGE) { continue; } + + int32_t this_page_size = enc_page.hdr_size + enc_page.max_data_size; + // first_row_idx is relative to start of row group + PageLocation loc{curr_pg_offset, this_page_size, enc_page.start_row - ck.start_row}; + offset_idx.page_locations.push_back(loc); + curr_pg_offset += this_page_size; + } + + stream.synchronize(); + md->file(p).offset_indexes.push_back(offset_idx); + 
md->file(p).column_indexes.push_back(column_idx); + } + } + } + } + last_write_successful = true; } @@ -1664,6 +1770,35 @@ std::unique_ptr> writer::impl::close( std::vector buffer; CompactProtocolWriter cpw(&buffer); file_ender_s fendr; + + if (stats_granularity_ == statistics_freq::STATISTICS_COLUMN) { + auto& fmd = md->file(p); + + // write column indices, updating column metadata along the way + int chunkidx = 0; + for (auto& r : fmd.row_groups) { + for (auto& c : r.columns) { + auto const& index = fmd.column_indexes[chunkidx++]; + c.column_index_offset = out_sink_[p]->bytes_written(); + c.column_index_length = index.size(); + out_sink_[p]->host_write(index.data(), index.size()); + } + } + + // write offset indices, updating column metadata along the way + chunkidx = 0; + for (auto& r : fmd.row_groups) { + for (auto& c : r.columns) { + auto const& offsets = fmd.offset_indexes[chunkidx++]; + buffer.resize(0); + int32_t len = cpw.write(offsets); + c.offset_index_offset = out_sink_[p]->bytes_written(); + c.offset_index_length = len; + out_sink_[p]->host_write(buffer.data(), buffer.size()); + } + } + } + buffer.resize(0); fendr.footer_len = static_cast(cpw.write(md->get_metadata(p))); fendr.magic = parquet_magic; diff --git a/cpp/src/io/parquet/writer_impl.hpp b/cpp/src/io/parquet/writer_impl.hpp index 731f7006400..0a21c73ae7d 100644 --- a/cpp/src/io/parquet/writer_impl.hpp +++ b/cpp/src/io/parquet/writer_impl.hpp @@ -181,6 +181,7 @@ class writer::impl { * @param first_rowgroup first rowgroup in batch * @param page_stats optional page-level statistics (nullptr if none) * @param chunk_stats optional chunk-level statistics (nullptr if none) + * @param column_stats optional page-level statistics for column index (nullptr if none) */ void encode_pages(hostdevice_2dvector& chunks, device_span pages, @@ -190,7 +191,8 @@ class writer::impl { uint32_t rowgroups_in_batch, uint32_t first_rowgroup, const statistics_chunk* page_stats, - const statistics_chunk* chunk_stats); + 
const statistics_chunk* chunk_stats, + const statistics_chunk* column_stats); private: // TODO : figure out if we want to keep this. It is currently unused. diff --git a/cpp/tests/io/parquet_test.cpp b/cpp/tests/io/parquet_test.cpp index 66a4a463ba5..41bf399ad9a 100644 --- a/cpp/tests/io/parquet_test.cpp +++ b/cpp/tests/io/parquet_test.cpp @@ -196,25 +196,24 @@ std::unique_ptr make_parquet_list_col( // given a datasource pointing to a parquet file, read the footer // of the file to populate the FileMetaData pointed to by file_meta_data. -// returns true on success, false if the file metadata cannot be parsed. -// throws cudf::logic_error if the file is invalid. -bool read_footer(std::unique_ptr& source, - cudf_io::parquet::FileMetaData* file_meta_data) +// throws cudf::logic_error if the file or metadata is invalid. +void read_footer(const std::unique_ptr& source, + cudf::io::parquet::FileMetaData* file_meta_data) { - constexpr auto header_len = sizeof(cudf_io::parquet::file_header_s); - constexpr auto ender_len = sizeof(cudf_io::parquet::file_ender_s); + constexpr auto header_len = sizeof(cudf::io::parquet::file_header_s); + constexpr auto ender_len = sizeof(cudf::io::parquet::file_ender_s); const auto len = source->size(); const auto header_buffer = source->host_read(0, header_len); const auto header = - reinterpret_cast(header_buffer->data()); + reinterpret_cast(header_buffer->data()); const auto ender_buffer = source->host_read(len - ender_len, ender_len); - const auto ender = reinterpret_cast(ender_buffer->data()); + const auto ender = reinterpret_cast(ender_buffer->data()); // checks for valid header, footer, and file length CUDF_EXPECTS(len > header_len + ender_len, "Incorrect data source"); - CUDF_EXPECTS(header->magic == cudf_io::parquet::parquet_magic && - ender->magic == cudf_io::parquet::parquet_magic, + CUDF_EXPECTS(header->magic == cudf::io::parquet::parquet_magic && + ender->magic == cudf::io::parquet::parquet_magic, "Corrupted header or footer"); 
CUDF_EXPECTS(ender->footer_len != 0 && ender->footer_len <= (len - header_len - ender_len), "Incorrect footer length"); @@ -223,25 +222,78 @@ bool read_footer(std::unique_ptr& source, // seek backwards from the end of the file (footer_length + 8 bytes of ender) const auto footer_buffer = source->host_read(len - ender->footer_len - ender_len, ender->footer_len); - cudf_io::parquet::CompactProtocolReader cp(footer_buffer->data(), ender->footer_len); + cudf::io::parquet::CompactProtocolReader cp(footer_buffer->data(), ender->footer_len); // returns true on success - return cp.read(file_meta_data); + bool res = cp.read(file_meta_data); + CUDF_EXPECTS(res, "Cannot parse file metadata"); +} + +// read column index from datasource at location indicated by chunk, +// parse and return as a ColumnIndex struct. +// throws cudf::logic_error if the chunk data is invalid. +cudf::io::parquet::ColumnIndex read_column_index( + const std::unique_ptr& source, const cudf::io::parquet::ColumnChunk& chunk) +{ + CUDF_EXPECTS(chunk.column_index_offset > 0, "Cannot find column index"); + CUDF_EXPECTS(chunk.column_index_length > 0, "Invalid column index length"); + + cudf::io::parquet::ColumnIndex colidx; + const auto ci_buf = source->host_read(chunk.column_index_offset, chunk.column_index_length); + cudf::io::parquet::CompactProtocolReader cp(ci_buf->data(), ci_buf->size()); + bool res = cp.read(&colidx); + CUDF_EXPECTS(res, "Cannot parse column index"); + return colidx; +} + +// read offset index from datasource at location indicated by chunk, +// parse and return as an OffsetIndex struct. +// throws cudf::logic_error if the chunk data is invalid. 
+cudf::io::parquet::OffsetIndex read_offset_index( + const std::unique_ptr& source, const cudf::io::parquet::ColumnChunk& chunk) +{ + CUDF_EXPECTS(chunk.offset_index_offset > 0, "Cannot find offset index"); + CUDF_EXPECTS(chunk.offset_index_length > 0, "Invalid offset index length"); + + cudf::io::parquet::OffsetIndex offidx; + const auto oi_buf = source->host_read(chunk.offset_index_offset, chunk.offset_index_length); + cudf::io::parquet::CompactProtocolReader cp(oi_buf->data(), oi_buf->size()); + bool res = cp.read(&offidx); + CUDF_EXPECTS(res, "Cannot parse offset index"); + return offidx; } // parse the statistics_blob on chunk and return as a Statistics struct. // throws cudf::logic_error if the chunk statistics_blob is invalid. -cudf_io::parquet::Statistics parse_statistics(const cudf_io::parquet::ColumnChunk& chunk) +cudf::io::parquet::Statistics parse_statistics(const cudf::io::parquet::ColumnChunk& chunk) { auto& stats_blob = chunk.meta_data.statistics_blob; CUDF_EXPECTS(stats_blob.size() > 0, "Invalid statistics length"); - cudf_io::parquet::Statistics stats; - cudf_io::parquet::CompactProtocolReader cp(stats_blob.data(), stats_blob.size()); - CUDF_EXPECTS(cp.read(&stats), "Cannot parse column statistics"); + cudf::io::parquet::Statistics stats; + cudf::io::parquet::CompactProtocolReader cp(stats_blob.data(), stats_blob.size()); + bool res = cp.read(&stats); + CUDF_EXPECTS(res, "Cannot parse column statistics"); return stats; } +// read page header from datasource at location indicated by page_loc, +// parse and return as a PageHeader struct. +// throws cudf::logic_error if the page_loc data is invalid. 
+cudf::io::parquet::PageHeader read_page_header(const std::unique_ptr& source, + const cudf::io::parquet::PageLocation& page_loc) +{ + CUDF_EXPECTS(page_loc.offset > 0, "Cannot find page header"); + CUDF_EXPECTS(page_loc.compressed_page_size > 0, "Invalid page header length"); + + cudf::io::parquet::PageHeader page_hdr; + const auto page_buf = source->host_read(page_loc.offset, page_loc.compressed_page_size); + cudf::io::parquet::CompactProtocolReader cp(page_buf->data(), page_buf->size()); + bool res = cp.read(&page_hdr); + CUDF_EXPECTS(res, "Cannot parse page header"); + return page_hdr; +} + // Base test fixture for tests struct ParquetWriterTest : public cudf::test::BaseFixture { }; @@ -260,6 +312,12 @@ struct ParquetWriterNumericTypeTest : public ParquetWriterTest { auto type() { return cudf::data_type{cudf::type_to_id()}; } }; +// Typed test fixture for comparable type tests +template +struct ParquetWriterComparableTypeTest : public ParquetWriterTest { + auto type() { return cudf::data_type{cudf::type_to_id()}; } +}; + // Typed test fixture for timestamp type tests template struct ParquetWriterChronoTypeTest : public ParquetWriterTest { @@ -282,6 +340,9 @@ struct ParquetWriterSchemaTest : public ParquetWriterTest { // TODO: Replace with `NumericTypes` when unsigned support is added. 
Issue #5352 using SupportedTypes = cudf::test::Types; TYPED_TEST_SUITE(ParquetWriterNumericTypeTest, SupportedTypes); +using ComparableAndFixedTypes = + cudf::test::Concat; +TYPED_TEST_SUITE(ParquetWriterComparableTypeTest, ComparableAndFixedTypes); TYPED_TEST_SUITE(ParquetWriterChronoTypeTest, cudf::test::ChronoTypes); using SupportedTimestampTypes = cudf::test::Types; @@ -3389,28 +3450,26 @@ TEST_F(ParquetWriterTest, CheckPageRows) auto expected = std::make_unique(std::move(cols)); EXPECT_EQ(1, expected->num_columns()); - auto filepath = temp_env->get_temp_filepath("CheckPageRows.parquet"); - cudf_io::parquet_writer_options out_opts = - cudf_io::parquet_writer_options::builder(cudf_io::sink_info{filepath}, expected->view()) + auto const filepath = temp_env->get_temp_filepath("CheckPageRows.parquet"); + const cudf::io::parquet_writer_options out_opts = + cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected->view()) .max_page_size_rows(page_rows); - cudf_io::write_parquet(out_opts); + cudf::io::write_parquet(out_opts); // check first page header and make sure it has only page_rows values - auto source = cudf_io::datasource::create(filepath); - cudf_io::parquet::FileMetaData fmd; + auto const source = cudf::io::datasource::create(filepath); + cudf::io::parquet::FileMetaData fmd; - CUDF_EXPECTS(read_footer(source, &fmd), "Cannot parse metadata"); + read_footer(source, &fmd); CUDF_EXPECTS(fmd.row_groups.size() > 0, "No row groups found"); CUDF_EXPECTS(fmd.row_groups[0].columns.size() == 1, "Invalid number of columns"); - auto& first_chunk = fmd.row_groups[0].columns[0].meta_data; + auto const& first_chunk = fmd.row_groups[0].columns[0].meta_data; CUDF_EXPECTS(first_chunk.data_page_offset > 0, "Invalid location for first data page"); - // read first data page header. sizeof(ph) is not exact, but the thrift encoded + // read first data page header. 
sizeof(PageHeader) is not exact, but the thrift encoded // version should be smaller than size of the struct. - cudf_io::parquet::PageHeader ph; - const auto pg_hdr_buf = source->host_read(first_chunk.data_page_offset, sizeof(ph)); - cudf_io::parquet::CompactProtocolReader cp(pg_hdr_buf->data(), pg_hdr_buf->size()); - CUDF_EXPECTS(cp.read(&ph), "Cannot read page header"); + auto const ph = read_page_header( + source, {first_chunk.data_page_offset, sizeof(cudf::io::parquet::PageHeader), 0}); EXPECT_EQ(ph.data_page_header.num_values, page_rows); } @@ -3433,15 +3492,15 @@ TEST_F(ParquetWriterTest, Decimal128Stats) cols.push_back(col0.release()); auto expected = std::make_unique
(std::move(cols)); - auto filepath = temp_env->get_temp_filepath("Decimal128Stats.parquet"); - cudf_io::parquet_writer_options out_opts = - cudf_io::parquet_writer_options::builder(cudf_io::sink_info{filepath}, expected->view()); - cudf_io::write_parquet(out_opts); + auto const filepath = temp_env->get_temp_filepath("Decimal128Stats.parquet"); + const cudf::io::parquet_writer_options out_opts = + cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected->view()); + cudf::io::write_parquet(out_opts); - auto source = cudf_io::datasource::create(filepath); - cudf_io::parquet::FileMetaData fmd; + auto const source = cudf::io::datasource::create(filepath); + cudf::io::parquet::FileMetaData fmd; - CUDF_EXPECTS(read_footer(source, &fmd), "Cannot parse metadata"); + read_footer(source, &fmd); auto const stats = parse_statistics(fmd.row_groups[0].columns[0]); @@ -3449,6 +3508,693 @@ TEST_F(ParquetWriterTest, Decimal128Stats) EXPECT_EQ(expected_max, stats.max_value); } +// ============================================================================= +// ---- test data for stats sort order tests +// need at least 3 pages, and min page count is 5000, so need at least 15000 values. +// use 20000 to be safe. +static constexpr int num_ordered_rows = 20000; +static constexpr int page_size_for_ordered_tests = 5000; + +namespace { +namespace testdata { +// ----- most numerics. scale by 100 so all values fit in a single byte + +template +std::enable_if_t && !std::is_same_v, + cudf::test::fixed_width_column_wrapper> +ascending() +{ + int start = std::is_signed_v ? 
-num_ordered_rows / 2 : 0; + auto elements = + cudf::detail::make_counting_transform_iterator(start, [](auto i) { return i / 100; }); + return cudf::test::fixed_width_column_wrapper(elements, elements + num_ordered_rows); +} + +template +std::enable_if_t && !std::is_same_v, + cudf::test::fixed_width_column_wrapper> +descending() +{ + if (std::is_signed_v) { + auto elements = cudf::detail::make_counting_transform_iterator(-num_ordered_rows / 2, + [](auto i) { return -i / 100; }); + return cudf::test::fixed_width_column_wrapper(elements, elements + num_ordered_rows); + } else { + auto elements = cudf::detail::make_counting_transform_iterator( + 0, [](auto i) { return (num_ordered_rows - i) / 100; }); + return cudf::test::fixed_width_column_wrapper(elements, elements + num_ordered_rows); + } +} + +template +std::enable_if_t && !std::is_same_v, + cudf::test::fixed_width_column_wrapper> +unordered() +{ + if (std::is_signed_v) { + auto elements = cudf::detail::make_counting_transform_iterator( + -num_ordered_rows / 2, [](auto i) { return (i % 2 ? i : -i) / 100; }); + return cudf::test::fixed_width_column_wrapper(elements, elements + num_ordered_rows); + } else { + auto elements = cudf::detail::make_counting_transform_iterator( + 0, [](auto i) { return (i % 2 ? i : num_ordered_rows - i) / 100; }); + return cudf::test::fixed_width_column_wrapper(elements, elements + num_ordered_rows); + } +} + +// ----- bool + +template +std::enable_if_t, cudf::test::fixed_width_column_wrapper> ascending() +{ + auto elements = cudf::detail::make_counting_transform_iterator( + 0, [](auto i) { return i < num_ordered_rows / 2 ? false : true; }); + return cudf::test::fixed_width_column_wrapper(elements, elements + num_ordered_rows); +} + +template +std::enable_if_t, cudf::test::fixed_width_column_wrapper> descending() +{ + auto elements = cudf::detail::make_counting_transform_iterator( + 0, [](auto i) { return i < num_ordered_rows / 2 ? 
true : false; }); + return cudf::test::fixed_width_column_wrapper(elements, elements + num_ordered_rows); +} + +template +std::enable_if_t, cudf::test::fixed_width_column_wrapper> unordered() +{ + auto elements = cudf::detail::make_counting_transform_iterator(0, [](auto i) { + switch (i / page_size_for_ordered_tests) { + case 0: return true; + case 1: return false; + case 2: return true; + default: return false; + } + }); + return cudf::test::fixed_width_column_wrapper(elements, elements + num_ordered_rows); +} + +// ----- fixed point types + +template +std::enable_if_t(), cudf::test::fixed_width_column_wrapper> ascending() +{ + auto elements = cudf::detail::make_counting_transform_iterator( + -num_ordered_rows / 2, [](auto i) { return T(i, numeric::scale_type{0}); }); + return cudf::test::fixed_width_column_wrapper(elements, elements + num_ordered_rows); +} + +template +std::enable_if_t(), cudf::test::fixed_width_column_wrapper> descending() +{ + auto elements = cudf::detail::make_counting_transform_iterator( + -num_ordered_rows / 2, [](auto i) { return T(-i, numeric::scale_type{0}); }); + return cudf::test::fixed_width_column_wrapper(elements, elements + num_ordered_rows); +} + +template +std::enable_if_t(), cudf::test::fixed_width_column_wrapper> unordered() +{ + auto elements = cudf::detail::make_counting_transform_iterator( + -num_ordered_rows / 2, [](auto i) { return T(i % 2 ? 
i : -i, numeric::scale_type{0}); }); + return cudf::test::fixed_width_column_wrapper(elements, elements + num_ordered_rows); +} + +// ----- chrono types +// ----- timestamp + +template +std::enable_if_t(), cudf::test::fixed_width_column_wrapper> ascending() +{ + auto elements = cudf::detail::make_counting_transform_iterator( + 0, [](auto i) { return T(typename T::duration(i)); }); + return cudf::test::fixed_width_column_wrapper(elements, elements + num_ordered_rows); +} + +template +std::enable_if_t(), cudf::test::fixed_width_column_wrapper> descending() +{ + auto elements = cudf::detail::make_counting_transform_iterator( + 0, [](auto i) { return T(typename T::duration(num_ordered_rows - i)); }); + return cudf::test::fixed_width_column_wrapper(elements, elements + num_ordered_rows); +} + +template +std::enable_if_t(), cudf::test::fixed_width_column_wrapper> unordered() +{ + auto elements = cudf::detail::make_counting_transform_iterator( + 0, [](auto i) { return T(typename T::duration(i % 2 ? i : num_ordered_rows - i)); }); + return cudf::test::fixed_width_column_wrapper(elements, elements + num_ordered_rows); +} + +// ----- duration + +template +std::enable_if_t(), cudf::test::fixed_width_column_wrapper> ascending() +{ + auto elements = cudf::detail::make_counting_transform_iterator(0, [](auto i) { return T(i); }); + return cudf::test::fixed_width_column_wrapper(elements, elements + num_ordered_rows); +} + +template +std::enable_if_t(), cudf::test::fixed_width_column_wrapper> descending() +{ + auto elements = cudf::detail::make_counting_transform_iterator( + 0, [](auto i) { return T(num_ordered_rows - i); }); + return cudf::test::fixed_width_column_wrapper(elements, elements + num_ordered_rows); +} + +template +std::enable_if_t(), cudf::test::fixed_width_column_wrapper> unordered() +{ + auto elements = cudf::detail::make_counting_transform_iterator( + 0, [](auto i) { return T(i % 2 ? 
i : num_ordered_rows - i); }); + return cudf::test::fixed_width_column_wrapper(elements, elements + num_ordered_rows); +} + +// ----- string_view + +template +std::enable_if_t, cudf::test::strings_column_wrapper> +ascending() +{ + char buf[10]; + auto elements = cudf::detail::make_counting_transform_iterator(0, [&buf](auto i) { + sprintf(buf, "%09d", i); + return std::string(buf); + }); + return cudf::test::strings_column_wrapper(elements, elements + num_ordered_rows); +} + +template +std::enable_if_t, cudf::test::strings_column_wrapper> +descending() +{ + char buf[10]; + auto elements = cudf::detail::make_counting_transform_iterator(0, [&buf](auto i) { + sprintf(buf, "%09d", num_ordered_rows - i); + return std::string(buf); + }); + return cudf::test::strings_column_wrapper(elements, elements + num_ordered_rows); +} + +template +std::enable_if_t, cudf::test::strings_column_wrapper> +unordered() +{ + char buf[10]; + auto elements = cudf::detail::make_counting_transform_iterator(0, [&buf](auto i) { + sprintf(buf, "%09d", (i % 2 == 0) ? i : (num_ordered_rows - i)); + return std::string(buf); + }); + return cudf::test::strings_column_wrapper(elements, elements + num_ordered_rows); +} + +} // namespace testdata +} // anonymous namespace + +TYPED_TEST(ParquetWriterComparableTypeTest, ThreeColumnSorted) +{ + using T = TypeParam; + + auto col1 = testdata::ascending(); + auto col2 = testdata::descending(); + auto col3 = testdata::unordered(); + + std::vector> cols; + cols.push_back(col1.release()); + cols.push_back(col2.release()); + cols.push_back(col3.release()); + auto const expected = std::make_unique
(std::move(cols)); + + auto const filepath = temp_env->get_temp_filepath("ThreeColumnSorted.parquet"); + const cudf::io::parquet_writer_options out_opts = + cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected->view()) + .max_page_size_rows(page_size_for_ordered_tests) + .stats_level(cudf::io::statistics_freq::STATISTICS_COLUMN); + cudf::io::write_parquet(out_opts); + + auto const source = cudf::io::datasource::create(filepath); + cudf::io::parquet::FileMetaData fmd; + + read_footer(source, &fmd); + CUDF_EXPECTS(fmd.row_groups.size() > 0, "No row groups found"); + + auto const& columns = fmd.row_groups[0].columns; + CUDF_EXPECTS(columns.size() == static_cast(expected->num_columns()), + "Invalid number of columns"); + + // now check that the boundary order for chunk 1 is ascending, + // chunk 2 is descending, and chunk 3 is unordered + cudf::io::parquet::BoundaryOrder expected_orders[] = { + cudf::io::parquet::BoundaryOrder::ASCENDING, + cudf::io::parquet::BoundaryOrder::DESCENDING, + cudf::io::parquet::BoundaryOrder::UNORDERED}; + + for (std::size_t i = 0; i < columns.size(); i++) { + auto const ci = read_column_index(source, columns[i]); + EXPECT_EQ(ci.boundary_order, expected_orders[i]); + } +} + +// utility functions for column index tests + +// compare two values. return -1 if v1 < v2, +// 0 if v1 == v2, and 1 if v1 > v2. +template +int32_t compare(T& v1, T& v2) +{ + return (v1 > v2) - (v1 < v2); +} + +// compare two binary statistics blobs based on their physical +// and converted types. returns -1 if v1 < v2, 0 if v1 == v2, and +// 1 if v1 > v2. 
+int32_t compare_binary(const std::vector& v1, + const std::vector& v2, + cudf::io::parquet::Type ptype, + cudf::io::parquet::ConvertedType ctype) +{ + switch (ptype) { + case cudf::io::parquet::INT32: + switch (ctype) { + case cudf::io::parquet::UINT_8: + case cudf::io::parquet::UINT_16: + case cudf::io::parquet::UINT_32: + return compare(*(reinterpret_cast(v1.data())), + *(reinterpret_cast(v2.data()))); + default: + return compare(*(reinterpret_cast(v1.data())), + *(reinterpret_cast(v2.data()))); + } + + case cudf::io::parquet::INT64: + if (ctype == cudf::io::parquet::UINT_64) { + return compare(*(reinterpret_cast(v1.data())), + *(reinterpret_cast(v2.data()))); + } + return compare(*(reinterpret_cast(v1.data())), + *(reinterpret_cast(v2.data()))); + + case cudf::io::parquet::FLOAT: + return compare(*(reinterpret_cast(v1.data())), + *(reinterpret_cast(v2.data()))); + + case cudf::io::parquet::DOUBLE: + return compare(*(reinterpret_cast(v1.data())), + *(reinterpret_cast(v2.data()))); + + case cudf::io::parquet::BYTE_ARRAY: { + int32_t v1sz = v1.size(); + int32_t v2sz = v2.size(); + int32_t ret = memcmp(v1.data(), v2.data(), std::min(v1sz, v2sz)); + if (ret != 0 or v1sz == v2sz) { return ret; } + return v1sz - v2sz; + } + + default: CUDF_FAIL("Invalid type in compare_binary"); + } + + return 0; +} + +TEST_F(ParquetWriterTest, CheckColumnOffsetIndex) +{ + constexpr auto num_rows = 100000; + + // fixed length strings + auto str1_elements = cudf::detail::make_counting_transform_iterator(0, [](auto i) { + char buf[30]; + sprintf(buf, "%012d", i); + return std::string(buf); + }); + auto col0 = cudf::test::strings_column_wrapper(str1_elements, str1_elements + num_rows); + + auto col1_data = random_values(num_rows); + auto col2_data = random_values(num_rows); + auto col3_data = random_values(num_rows); + auto col4_data = random_values(num_rows); + auto col5_data = random_values(num_rows); + auto col6_data = random_values(num_rows); + + auto col1 = 
cudf::test::fixed_width_column_wrapper(col1_data.begin(), col1_data.end()); + auto col2 = cudf::test::fixed_width_column_wrapper(col2_data.begin(), col2_data.end()); + auto col3 = cudf::test::fixed_width_column_wrapper(col3_data.begin(), col3_data.end()); + auto col4 = cudf::test::fixed_width_column_wrapper(col4_data.begin(), col4_data.end()); + auto col5 = cudf::test::fixed_width_column_wrapper(col5_data.begin(), col5_data.end()); + auto col6 = cudf::test::fixed_width_column_wrapper(col6_data.begin(), col6_data.end()); + + // mixed length strings + auto str2_elements = cudf::detail::make_counting_transform_iterator(0, [](auto i) { + char buf[30]; + sprintf(buf, "%d", i); + return std::string(buf); + }); + auto col7 = cudf::test::strings_column_wrapper(str2_elements, str2_elements + num_rows); + + std::vector> cols; + cols.push_back(col0.release()); + cols.push_back(col1.release()); + cols.push_back(col2.release()); + cols.push_back(col3.release()); + cols.push_back(col4.release()); + cols.push_back(col5.release()); + cols.push_back(col6.release()); + cols.push_back(col7.release()); + auto const expected = std::make_unique
(std::move(cols)); + + auto const filepath = temp_env->get_temp_filepath("CheckColumnOffsetIndex.parquet"); + const cudf::io::parquet_writer_options out_opts = + cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected->view()) + .stats_level(cudf::io::statistics_freq::STATISTICS_COLUMN) + .max_page_size_rows(20000); + cudf::io::write_parquet(out_opts); + + auto const source = cudf::io::datasource::create(filepath); + cudf::io::parquet::FileMetaData fmd; + + read_footer(source, &fmd); + + for (size_t r = 0; r < fmd.row_groups.size(); r++) { + auto const& rg = fmd.row_groups[r]; + for (size_t c = 0; c < rg.columns.size(); c++) { + auto const& chunk = rg.columns[c]; + + // loop over offsets, read each page header, make sure it's a data page and that + // the first row index is correct + auto const oi = read_offset_index(source, chunk); + + int64_t num_vals = 0; + for (size_t o = 0; o < oi.page_locations.size(); o++) { + auto const& page_loc = oi.page_locations[o]; + auto const ph = read_page_header(source, page_loc); + EXPECT_EQ(ph.type, cudf::io::parquet::PageType::DATA_PAGE); + EXPECT_EQ(page_loc.first_row_index, num_vals); + num_vals += ph.data_page_header.num_values; + } + + // loop over page stats from the column index. check that stats.min <= page.min + // and stats.max >= page.max for each page. 
+ auto const ci = read_column_index(source, chunk); + auto const stats = parse_statistics(chunk); + + // schema indexing starts at 1 + auto const ptype = fmd.schema[c + 1].type; + auto const ctype = fmd.schema[c + 1].converted_type; + for (size_t p = 0; p < ci.min_values.size(); p++) { + // null_pages should always be false + EXPECT_FALSE(ci.null_pages[p]); + // null_counts should always be 0 + EXPECT_EQ(ci.null_counts[p], 0); + EXPECT_TRUE(compare_binary(stats.min_value, ci.min_values[p], ptype, ctype) <= 0); + } + for (size_t p = 0; p < ci.max_values.size(); p++) + EXPECT_TRUE(compare_binary(stats.max_value, ci.max_values[p], ptype, ctype) >= 0); + } + } +} + +TEST_F(ParquetWriterTest, CheckColumnOffsetIndexNulls) +{ + constexpr auto num_rows = 100000; + + // fixed length strings + auto str1_elements = cudf::detail::make_counting_transform_iterator(0, [](auto i) { + char buf[30]; + sprintf(buf, "%012d", i); + return std::string(buf); + }); + auto col0 = cudf::test::strings_column_wrapper(str1_elements, str1_elements + num_rows); + + auto col1_data = random_values(num_rows); + auto col2_data = random_values(num_rows); + auto col3_data = random_values(num_rows); + auto col4_data = random_values(num_rows); + auto col5_data = random_values(num_rows); + auto col6_data = random_values(num_rows); + + auto valids = + cudf::detail::make_counting_transform_iterator(0, [](auto i) { return i % 2 == 0; }); + + // add null values for all but first column + auto col1 = + cudf::test::fixed_width_column_wrapper(col1_data.begin(), col1_data.end(), valids); + auto col2 = + cudf::test::fixed_width_column_wrapper(col2_data.begin(), col2_data.end(), valids); + auto col3 = + cudf::test::fixed_width_column_wrapper(col3_data.begin(), col3_data.end(), valids); + auto col4 = + cudf::test::fixed_width_column_wrapper(col4_data.begin(), col4_data.end(), valids); + auto col5 = + cudf::test::fixed_width_column_wrapper(col5_data.begin(), col5_data.end(), valids); + auto col6 = + 
cudf::test::fixed_width_column_wrapper(col6_data.begin(), col6_data.end(), valids); + + // mixed length strings + auto str2_elements = cudf::detail::make_counting_transform_iterator(0, [](auto i) { + char buf[30]; + sprintf(buf, "%d", i); + return std::string(buf); + }); + auto col7 = cudf::test::strings_column_wrapper(str2_elements, str2_elements + num_rows, valids); + + std::vector> cols; + cols.push_back(col0.release()); + cols.push_back(col1.release()); + cols.push_back(col2.release()); + cols.push_back(col3.release()); + cols.push_back(col4.release()); + cols.push_back(col5.release()); + cols.push_back(col6.release()); + cols.push_back(col7.release()); + auto expected = std::make_unique
(std::move(cols)); + + auto const filepath = temp_env->get_temp_filepath("CheckColumnOffsetIndexNulls.parquet"); + const cudf::io::parquet_writer_options out_opts = + cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected->view()) + .stats_level(cudf::io::statistics_freq::STATISTICS_COLUMN) + .max_page_size_rows(20000); + cudf::io::write_parquet(out_opts); + + auto const source = cudf::io::datasource::create(filepath); + cudf::io::parquet::FileMetaData fmd; + + read_footer(source, &fmd); + + for (size_t r = 0; r < fmd.row_groups.size(); r++) { + auto const& rg = fmd.row_groups[r]; + for (size_t c = 0; c < rg.columns.size(); c++) { + auto const& chunk = rg.columns[c]; + + // loop over offsets, read each page header, make sure it's a data page and that + // the first row index is correct + auto const oi = read_offset_index(source, chunk); + + int64_t num_vals = 0; + for (size_t o = 0; o < oi.page_locations.size(); o++) { + auto const& page_loc = oi.page_locations[o]; + auto const ph = read_page_header(source, page_loc); + EXPECT_EQ(ph.type, cudf::io::parquet::PageType::DATA_PAGE); + EXPECT_EQ(page_loc.first_row_index, num_vals); + num_vals += ph.data_page_header.num_values; + } + + // loop over page stats from the column index. check that stats.min <= page.min + // and stats.max >= page.max for each page. 
+ auto const ci = read_column_index(source, chunk); + auto const stats = parse_statistics(chunk); + + // schema indexing starts at 1 + auto const ptype = fmd.schema[c + 1].type; + auto const ctype = fmd.schema[c + 1].converted_type; + for (size_t p = 0; p < ci.min_values.size(); p++) { + EXPECT_FALSE(ci.null_pages[p]); + if (c > 0) { // first column has no nulls + EXPECT_GT(ci.null_counts[p], 0); + } else { + EXPECT_EQ(ci.null_counts[p], 0); + } + EXPECT_TRUE(compare_binary(stats.min_value, ci.min_values[p], ptype, ctype) <= 0); + } + for (size_t p = 0; p < ci.max_values.size(); p++) { + EXPECT_TRUE(compare_binary(stats.max_value, ci.max_values[p], ptype, ctype) >= 0); + } + } + } +} + +TEST_F(ParquetWriterTest, CheckColumnOffsetIndexNullColumn) +{ + constexpr auto num_rows = 100000; + + // fixed length strings + auto str1_elements = cudf::detail::make_counting_transform_iterator(0, [](auto i) { + char buf[30]; + sprintf(buf, "%012d", i); + return std::string(buf); + }); + auto col0 = cudf::test::strings_column_wrapper(str1_elements, str1_elements + num_rows); + + auto col1_data = random_values(num_rows); + auto col2_data = random_values(num_rows); + + // col1 is all nulls + auto valids = cudf::detail::make_counting_transform_iterator(0, [](auto i) { return false; }); + auto col1 = + cudf::test::fixed_width_column_wrapper(col1_data.begin(), col1_data.end(), valids); + auto col2 = cudf::test::fixed_width_column_wrapper(col2_data.begin(), col2_data.end()); + + // mixed length strings + auto str2_elements = cudf::detail::make_counting_transform_iterator(0, [](auto i) { + char buf[30]; + sprintf(buf, "%d", i); + return std::string(buf); + }); + auto col3 = cudf::test::strings_column_wrapper(str2_elements, str2_elements + num_rows); + + std::vector> cols; + cols.push_back(col0.release()); + cols.push_back(col1.release()); + cols.push_back(col2.release()); + cols.push_back(col3.release()); + auto expected = std::make_unique
(std::move(cols)); + + auto const filepath = temp_env->get_temp_filepath("CheckColumnOffsetIndexNullColumn.parquet"); + const cudf::io::parquet_writer_options out_opts = + cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected->view()) + .stats_level(cudf::io::statistics_freq::STATISTICS_COLUMN) + .max_page_size_rows(20000); + cudf::io::write_parquet(out_opts); + + auto const source = cudf::io::datasource::create(filepath); + cudf::io::parquet::FileMetaData fmd; + + read_footer(source, &fmd); + + for (size_t r = 0; r < fmd.row_groups.size(); r++) { + auto const& rg = fmd.row_groups[r]; + for (size_t c = 0; c < rg.columns.size(); c++) { + auto const& chunk = rg.columns[c]; + + // loop over offsets, read each page header, make sure it's a data page and that + // the first row index is correct + auto const oi = read_offset_index(source, chunk); + + int64_t num_vals = 0; + for (size_t o = 0; o < oi.page_locations.size(); o++) { + auto const& page_loc = oi.page_locations[o]; + auto const ph = read_page_header(source, page_loc); + EXPECT_EQ(ph.type, cudf::io::parquet::PageType::DATA_PAGE); + EXPECT_EQ(page_loc.first_row_index, num_vals); + num_vals += ph.data_page_header.num_values; + } + + // loop over page stats from the column index. check that stats.min <= page.min + // and stats.max >= page.max for each non-empty page. 
+ auto const ci = read_column_index(source, chunk); + auto const stats = parse_statistics(chunk); + + // schema indexing starts at 1 + auto const ptype = fmd.schema[c + 1].type; + auto const ctype = fmd.schema[c + 1].converted_type; + for (size_t p = 0; p < ci.min_values.size(); p++) { + // check that null_pages is true for column 1 + if (c == 1) { + EXPECT_TRUE(ci.null_pages[p]); + EXPECT_GT(ci.null_counts[p], 0); + } + if (not ci.null_pages[p]) { + EXPECT_EQ(ci.null_counts[p], 0); + EXPECT_TRUE(compare_binary(stats.min_value, ci.min_values[p], ptype, ctype) <= 0); + } + } + for (size_t p = 0; p < ci.max_values.size(); p++) { + if (not ci.null_pages[p]) { + EXPECT_TRUE(compare_binary(stats.max_value, ci.max_values[p], ptype, ctype) >= 0); + } + } + } + } +} + +TEST_F(ParquetWriterTest, CheckColumnOffsetIndexStruct) +{ + auto c0 = testdata::ascending(); + + auto sc0 = testdata::ascending(); + auto sc1 = testdata::descending(); + auto sc2 = testdata::unordered(); + + std::vector> struct_children; + struct_children.push_back(sc0.release()); + struct_children.push_back(sc1.release()); + struct_children.push_back(sc2.release()); + cudf::test::structs_column_wrapper c1(std::move(struct_children)); + + auto listgen = cudf::detail::make_counting_transform_iterator( + 0, [](auto i) { return i % 2 == 0 ? 
i / 2 : num_ordered_rows - (i / 2); }); + auto list = + cudf::test::fixed_width_column_wrapper(listgen, listgen + 2 * num_ordered_rows); + auto offgen = cudf::detail::make_counting_transform_iterator(0, [](auto i) { return i * 2; }); + auto offsets = + cudf::test::fixed_width_column_wrapper(offgen, offgen + num_ordered_rows + 1); + + auto c2 = cudf::make_lists_column(num_ordered_rows, offsets.release(), list.release(), 0, {}); + + table_view expected({c0, c1, *c2}); + + auto const filepath = temp_env->get_temp_filepath("CheckColumnOffsetIndexStruct.parquet"); + const cudf::io::parquet_writer_options out_opts = + cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected) + .stats_level(cudf::io::statistics_freq::STATISTICS_COLUMN) + .max_page_size_rows(page_size_for_ordered_tests); + cudf::io::write_parquet(out_opts); + + auto const source = cudf::io::datasource::create(filepath); + cudf::io::parquet::FileMetaData fmd; + + read_footer(source, &fmd); + + // hard coded schema indices. + // TODO find a way to do this without magic + size_t colidxs[] = {1, 3, 4, 5, 8}; + for (size_t r = 0; r < fmd.row_groups.size(); r++) { + auto const& rg = fmd.row_groups[r]; + for (size_t c = 0; c < rg.columns.size(); c++) { + size_t colidx = colidxs[c]; + auto const& chunk = rg.columns[c]; + + // loop over offsets, read each page header, make sure it's a data page and that + // the first row index is correct + auto const oi = read_offset_index(source, chunk); + + int64_t num_vals = 0; + for (size_t o = 0; o < oi.page_locations.size(); o++) { + auto const& page_loc = oi.page_locations[o]; + auto const ph = read_page_header(source, page_loc); + EXPECT_EQ(ph.type, cudf::io::parquet::PageType::DATA_PAGE); + // last column has 2 values per row + EXPECT_EQ(page_loc.first_row_index * (c == rg.columns.size() - 1 ? 2 : 1), num_vals); + num_vals += ph.data_page_header.num_values; + } + + // loop over page stats from the column index. 
check that stats.min <= page.min + // and stats.max >= page.max for each page. + auto const ci = read_column_index(source, chunk); + auto const stats = parse_statistics(chunk); + + auto const ptype = fmd.schema[colidx].type; + auto const ctype = fmd.schema[colidx].converted_type; + for (size_t p = 0; p < ci.min_values.size(); p++) { + EXPECT_TRUE(compare_binary(stats.min_value, ci.min_values[p], ptype, ctype) <= 0); + } + for (size_t p = 0; p < ci.max_values.size(); p++) { + EXPECT_TRUE(compare_binary(stats.max_value, ci.max_values[p], ptype, ctype) >= 0); + } + } + } +} + TEST_F(ParquetReaderTest, EmptyColumnsParam) { srand(31337);