From 3fb09d173d98a78fbda218d32de5bb8df1d1db0f Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Thu, 17 Nov 2022 18:35:40 -0800 Subject: [PATCH] Implement chunked Parquet reader (#11867) This adds chunked Parquet reader, which can perform chunked reading for accessing files by an iterative manner. Instead of reading the input file all at once, we can read it chunk by chunk, each chunk can be limited to be small enough to not exceed the cudf internal limit (2GB/2 billions rows): ``` auto reader = cudf::io::chunked_parquet_reader(byte_limit, read_opts); do { auto const chunk = reader.read_chunk(); // Process chunk } while (reader.has_next()); ``` Authors: - Nghia Truong (https://github.com/ttnghia) - https://github.com/nvdbaranec Approvers: - Yunsong Wang (https://github.com/PointKernel) - Vukasin Milovanovic (https://github.com/vuule) URL: https://github.com/rapidsai/cudf/pull/11867 --- cpp/include/cudf/io/detail/parquet.hpp | 56 ++ cpp/include/cudf/io/parquet.hpp | 68 ++ cpp/src/io/functions.cpp | 39 + cpp/src/io/parquet/page_data.cu | 510 +++++------ cpp/src/io/parquet/page_hdr.cu | 1 + cpp/src/io/parquet/parquet_gpu.hpp | 67 +- cpp/src/io/parquet/reader.cpp | 15 + cpp/src/io/parquet/reader_impl.cpp | 162 ++-- cpp/src/io/parquet/reader_impl.hpp | 82 +- cpp/src/io/parquet/reader_impl_preprocess.cu | 749 +++++++++++++++- cpp/src/io/utilities/column_buffer.cpp | 27 + cpp/src/io/utilities/column_buffer.hpp | 6 +- cpp/tests/CMakeLists.txt | 2 +- cpp/tests/io/parquet_chunked_reader_test.cpp | 887 +++++++++++++++++++ 14 files changed, 2320 insertions(+), 351 deletions(-) create mode 100644 cpp/tests/io/parquet_chunked_reader_test.cpp diff --git a/cpp/include/cudf/io/detail/parquet.hpp b/cpp/include/cudf/io/detail/parquet.hpp index 8c7a7a21978..7f107017864 100644 --- a/cpp/include/cudf/io/detail/parquet.hpp +++ b/cpp/include/cudf/io/detail/parquet.hpp @@ -81,6 +81,62 @@ class reader { table_with_metadata read(parquet_reader_options const& options); }; +/** + * @brief The reader class that supports iterative reading of a given file. + * + * This class intentionally subclasses the `reader` class with private inheritance to hide the + * `reader::read()` API. As such, only chunked reading APIs are supported. + */ +class chunked_reader : private reader { + public: + /** + * @brief Constructor from a read size limit and an array of data sources with reader options. + * + * The typical usage should be similar to this: + * ``` + * do { + * auto const chunk = reader.read_chunk(); + * // Process chunk + * } while (reader.has_next()); + * + * ``` + * + * If `chunk_read_limit == 0` (i.e., no reading limit), a call to `read_chunk()` will read the + * whole file and return a table containing all rows. + * + * @param chunk_read_limit Limit on total number of bytes to be returned per read, + * or `0` if there is no limit + * @param sources Input `datasource` objects to read the dataset from + * @param options Settings for controlling reading behavior + * @param stream CUDA stream used for device memory operations and kernel launches. + * @param mr Device memory resource to use for device memory allocation + */ + explicit chunked_reader(std::size_t chunk_read_limit, + std::vector>&& sources, + parquet_reader_options const& options, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr); + + /** + * @brief Destructor explicitly-declared to avoid inlined in header. + * + * Since the declaration of the internal `_impl` object does not exist in this header, this + * destructor needs to be defined in a separate source file which can access to that object's + * declaration. + */ + ~chunked_reader(); + + /** + * @copydoc cudf::io::chunked_parquet_reader::has_next + */ + [[nodiscard]] bool has_next() const; + + /** + * @copydoc cudf::io::chunked_parquet_reader::read_chunk + */ + [[nodiscard]] table_with_metadata read_chunk() const; +}; + /** * @brief Class to write parquet dataset data into columns. */ diff --git a/cpp/include/cudf/io/parquet.hpp b/cpp/include/cudf/io/parquet.hpp index c5425de308c..f3facae098d 100644 --- a/cpp/include/cudf/io/parquet.hpp +++ b/cpp/include/cudf/io/parquet.hpp @@ -399,6 +399,74 @@ table_with_metadata read_parquet( parquet_reader_options const& options, rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); +/** + * @brief The chunked parquet reader class to read Parquet file iteratively in to a series of + * tables, chunk by chunk. + * + * This class is designed to address the reading issue when reading very large Parquet files such + * that the sizes of their column exceed the limit that can be stored in cudf column. By reading the + * file content by chunks using this class, each chunk is guaranteed to have its sizes stay within + * the given limit. + */ +class chunked_parquet_reader { + public: + /** + * @brief Default constructor, this should never be used. + * + * This is added just to satisfy cython. + */ + chunked_parquet_reader() = default; + + /** + * @brief Constructor for chunked reader. + * + * This constructor requires the same `parquet_reader_option` parameter as in + * `cudf::read_parquet()`, and an additional parameter to specify the size byte limit of the + * output table for each reading. + * + * @param chunk_read_limit Limit on total number of bytes to be returned per read, + * or `0` if there is no limit + * @param options The options used to read Parquet file + * @param mr Device memory resource to use for device memory allocation + */ + chunked_parquet_reader( + std::size_t chunk_read_limit, + parquet_reader_options const& options, + rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); + + /** + * @brief Destructor, destroying the internal reader instance. + * + * Since the declaration of the internal `reader` object does not exist in this header, this + * destructor needs to be defined in a separate source file which can access to that object's + * declaration. + */ + ~chunked_parquet_reader(); + + /** + * @brief Check if there is any data in the given file has not yet read. + * + * @return A boolean value indicating if there is any data left to read + */ + [[nodiscard]] bool has_next() const; + + /** + * @brief Read a chunk of rows in the given Parquet file. + * + * The sequence of returned tables, if concatenated by their order, guarantees to form a complete + * dataset as reading the entire given file at once. + * + * An empty table will be returned if the given file is empty, or all the data in the file has + * been read and returned by the previous calls. + * + * @return An output `cudf::table` along with its metadata + */ + [[nodiscard]] table_with_metadata read_chunk() const; + + private: + std::unique_ptr reader; +}; + /** @} */ // end of group /** * @addtogroup io_writers diff --git a/cpp/src/io/functions.cpp b/cpp/src/io/functions.cpp index c244a30dc75..1a5a43d2b90 100644 --- a/cpp/src/io/functions.cpp +++ b/cpp/src/io/functions.cpp @@ -501,6 +501,45 @@ std::unique_ptr> write_parquet(parquet_writer_options const return writer->close(options.get_column_chunks_file_paths()); } +/** + * @copydoc cudf::io::chunked_parquet_reader::chunked_parquet_reader + */ +chunked_parquet_reader::chunked_parquet_reader(std::size_t chunk_read_limit, + parquet_reader_options const& options, + rmm::mr::device_memory_resource* mr) + : reader{std::make_unique(chunk_read_limit, + make_datasources(options.get_source()), + options, + cudf::get_default_stream(), + mr)} +{ +} + +/** + * @copydoc cudf::io::chunked_parquet_reader::~chunked_parquet_reader + */ +chunked_parquet_reader::~chunked_parquet_reader() = default; + +/** + * @copydoc cudf::io::chunked_parquet_reader::has_next + */ +bool chunked_parquet_reader::has_next() const +{ + CUDF_FUNC_RANGE(); + CUDF_EXPECTS(reader != nullptr, "Reader has not been constructed properly."); + return reader->has_next(); +} + +/** + * @copydoc cudf::io::chunked_parquet_reader::read_chunk + */ +table_with_metadata chunked_parquet_reader::read_chunk() const +{ + CUDF_FUNC_RANGE(); + CUDF_EXPECTS(reader != nullptr, "Reader has not been constructed properly."); + return reader->read_chunk(); +} + /** * @copydoc cudf::io::parquet_chunked_writer::parquet_chunked_writer */ diff --git a/cpp/src/io/parquet/page_data.cu b/cpp/src/io/parquet/page_data.cu index b36826002f4..c580aa5bbc0 100644 --- a/cpp/src/io/parquet/page_data.cu +++ b/cpp/src/io/parquet/page_data.cu @@ -18,8 +18,10 @@ #include #include +#include #include #include +#include #include #include @@ -52,6 +54,8 @@ namespace io { namespace parquet { namespace gpu { +namespace { + struct page_state_s { const uint8_t* data_start; const uint8_t* data_end; @@ -281,13 +285,18 @@ __device__ void gpuDecodeStream( * 31) * @param[in] t Warp1 thread ID (0..31) * - * @return The new output position + * @return A pair containing the new output position, and the total length of strings decoded (this + * will only be valid on thread 0 and if sizes_only is true) */ -__device__ int gpuDecodeDictionaryIndices(volatile page_state_s* s, int target_pos, int t) +template +__device__ cuda::std::pair gpuDecodeDictionaryIndices(volatile page_state_s* s, + int target_pos, + int t) { const uint8_t* end = s->data_end; int dict_bits = s->dict_bits; int pos = s->dict_pos; + int str_len = 0; while (pos < target_pos) { int is_literal, batch_len; @@ -332,8 +341,11 @@ __device__ int gpuDecodeDictionaryIndices(volatile page_state_s* s, int target_p __syncwarp(); is_literal = shuffle(is_literal); batch_len = shuffle(batch_len); + + // compute dictionary index. + int dict_idx = 0; if (t < batch_len) { - int dict_idx = s->dict_val; + dict_idx = s->dict_val; if (is_literal) { int32_t ofs = (t - ((batch_len + 7) & ~7)) * dict_bits; const uint8_t* p = s->data_start + (ofs >> 3); @@ -353,11 +365,36 @@ __device__ int gpuDecodeDictionaryIndices(volatile page_state_s* s, int target_p dict_idx &= (1 << dict_bits) - 1; } } - s->dict_idx[(pos + t) & (non_zero_buffer_size - 1)] = dict_idx; + + // if we're not computing sizes, store off the dictionary index + if constexpr (!sizes_only) { s->dict_idx[(pos + t) & (non_zero_buffer_size - 1)] = dict_idx; } + } + + // if we're computing sizes, add the length(s) + if constexpr (sizes_only) { + int const len = [&]() { + if (t >= batch_len) { return 0; } + // we may end up decoding more indices than we asked for. so don't include those in the + // size calculation + if (pos + t >= target_pos) { return 0; } + // TODO: refactor this with gpuGetStringData / gpuGetStringSize + uint32_t const dict_pos = (s->dict_bits > 0) ? dict_idx * sizeof(string_index_pair) : 0; + if (target_pos && dict_pos < (uint32_t)s->dict_size) { + const auto* src = reinterpret_cast(s->dict_base + dict_pos); + return src->second; + } + return 0; + }(); + + using WarpReduce = cub::WarpReduce; + __shared__ typename WarpReduce::TempStorage temp_storage; + // note: str_len will only be valid on thread 0. + str_len += WarpReduce(temp_storage).Sum(len); } + pos += batch_len; } - return pos; + return {pos, str_len}; } /** @@ -424,17 +461,20 @@ __device__ int gpuDecodeRleBooleans(volatile page_state_s* s, int target_pos, in } /** - * @brief Parses the length and position of strings + * @brief Parses the length and position of strings and returns total length of all strings + * processed * * @param[in,out] s Page state input/output * @param[in] target_pos Target output position * @param[in] t Thread ID * - * @return The new output position + * @return Total length of strings processed */ -__device__ void gpuInitStringDescriptors(volatile page_state_s* s, int target_pos, int t) +__device__ size_type gpuInitStringDescriptors(volatile page_state_s* s, int target_pos, int t) { - int pos = s->dict_pos; + int pos = s->dict_pos; + int total_len = 0; + // This step is purely serial if (!t) { const uint8_t* cur = s->data_start; @@ -453,21 +493,26 @@ __device__ void gpuInitStringDescriptors(volatile page_state_s* s, int target_po s->dict_idx[pos & (non_zero_buffer_size - 1)] = k; s->str_len[pos & (non_zero_buffer_size - 1)] = len; k += len; + total_len += len; pos++; } s->dict_val = k; __threadfence_block(); } + + return total_len; } /** - * @brief Output a string descriptor + * @brief Retrieves string information for a string at the specified source position * - * @param[in,out] s Page state input/output + * @param[in] s Page state input * @param[in] src_pos Source position - * @param[in] dstv Pointer to row output data (string descriptor or 32-bit hash) + * + * @return A pair containing a pointer to the string and its length */ -inline __device__ void gpuOutputString(volatile page_state_s* s, int src_pos, void* dstv) +inline __device__ cuda::std::pair gpuGetStringData(volatile page_state_s* s, + int src_pos) { const char* ptr = nullptr; size_t len = 0; @@ -490,6 +535,20 @@ inline __device__ void gpuOutputString(volatile page_state_s* s, int src_pos, vo len = s->str_len[src_pos & (non_zero_buffer_size - 1)]; } } + + return {ptr, len}; +} + +/** + * @brief Output a string descriptor + * + * @param[in,out] s Page state input/output + * @param[in] src_pos Source position + * @param[in] dstv Pointer to row output data (string descriptor or 32-bit hash) + */ +inline __device__ void gpuOutputString(volatile page_state_s* s, int src_pos, void* dstv) +{ + auto [ptr, len] = gpuGetStringData(s, src_pos); if (s->dtype_len == 4) { // Output hash. This hash value is used if the option to convert strings to // categoricals is enabled. The seed value is chosen arbitrarily. @@ -818,14 +877,17 @@ static __device__ void gpuOutputGeneric(volatile page_state_s* s, * @param[in, out] s The local page state to be filled in * @param[in] p The global page to be copied from * @param[in] chunks The global list of chunks - * @param[in] num_rows Maximum number of rows to read * @param[in] min_row Crop all rows below min_row + * @param[in] num_rows Maximum number of rows to read + * @param[in] is_decode_step If we are setting up for the decode step (instead of the preprocess + * step) */ static __device__ bool setupLocalPageInfo(page_state_s* const s, PageInfo const* p, device_span chunks, size_t min_row, - size_t num_rows) + size_t num_rows, + bool is_decode_step) { int t = threadIdx.x; int chunk_idx; @@ -926,17 +988,25 @@ static __device__ bool setupLocalPageInfo(page_state_s* const s, s->dtype_len = 8; // Convert to 64-bit timestamp } - // first row within the page to output - if (page_start_row >= min_row) { - s->first_row = 0; - } else { - s->first_row = (int32_t)min(min_row - page_start_row, (size_t)s->page.num_rows); - } - // # of rows within the page to output - s->num_rows = s->page.num_rows; - if ((page_start_row + s->first_row) + s->num_rows > min_row + num_rows) { - s->num_rows = - (int32_t)max((int64_t)(min_row + num_rows - (page_start_row + s->first_row)), INT64_C(0)); + // NOTE: s->page.num_rows, s->col.chunk_row, s->first_row and s->num_rows will be + // invalid/bogus during first pass of the preprocess step for nested types. this is ok + // because we ignore these values in that stage. + { + auto const max_row = min_row + num_rows; + + // if we are totally outside the range of the input, do nothing + if ((page_start_row > max_row) || (page_start_row + s->page.num_rows < min_row)) { + s->first_row = 0; + s->num_rows = 0; + } + // otherwise + else { + s->first_row = page_start_row >= min_row ? 0 : min_row - page_start_row; + auto const max_page_rows = s->page.num_rows - s->first_row; + s->num_rows = (page_start_row + s->first_row) + max_page_rows <= max_row + ? max_page_rows + : max_row - (page_start_row + s->first_row); + } } // during the decoding step we need to offset the global output buffers @@ -944,7 +1014,11 @@ static __device__ bool setupLocalPageInfo(page_state_s* const s, // is responsible for. // - for flat schemas, we can do this directly by using row counts // - for nested schemas, these offsets are computed during the preprocess step - if (s->col.column_data_base != nullptr) { + // + // NOTE: in a chunked read situation, s->col.column_data_base and s->col.valid_map_base + // will be aliased to memory that has been freed when we get here in the non-decode step, so + // we cannot check against nullptr. we'll just check a flag directly. + if (is_decode_step) { int max_depth = s->col.max_nesting_depth; for (int idx = 0; idx < max_depth; idx++) { PageNestingInfo* pni = &s->page.nesting[idx]; @@ -954,12 +1028,13 @@ static __device__ bool setupLocalPageInfo(page_state_s* const s, if (s->col.max_level[level_type::REPETITION] == 0) { output_offset = page_start_row >= min_row ? page_start_row - min_row : 0; } - // for schemas with lists, we've already got the exactly value precomputed + // for schemas with lists, we've already got the exact value precomputed else { output_offset = pni->page_start_value; } pni->data_out = static_cast(s->col.column_data_base[idx]); + if (pni->data_out != nullptr) { // anything below max depth with a valid data pointer must be a list, so the // element size is the size of the offset type. @@ -1036,6 +1111,7 @@ static __device__ bool setupLocalPageInfo(page_state_s* const s, s->page.skipped_leaf_values = 0; s->input_value_count = 0; s->input_row_count = 0; + s->input_leaf_count = 0; s->row_index_lower_bound = -1; } @@ -1064,13 +1140,14 @@ static __device__ bool setupLocalPageInfo(page_state_s* const s, // if we're in the decoding step, jump directly to the first // value we care about - if (s->col.column_data_base != nullptr) { + if (is_decode_step) { s->input_value_count = s->page.skipped_values > -1 ? s->page.skipped_values : 0; } else { - s->input_value_count = 0; - s->input_leaf_count = 0; - s->page.skipped_values = -1; - s->page.skipped_leaf_values = -1; + s->input_value_count = 0; + s->input_leaf_count = 0; + s->page.skipped_values = + -1; // magic number to indicate it hasn't been set for use inside UpdatePageSizes + s->page.skipped_leaf_values = 0; } } @@ -1397,7 +1474,7 @@ static __device__ void gpuUpdatePageSizes(page_state_s* s, bool bounds_set) { // max nesting depth of the column - int max_depth = s->col.max_nesting_depth; + int const max_depth = s->col.max_nesting_depth; // how many input level values we've processed in the page so far int input_value_count = s->input_value_count; // how many leaf values we've processed in the page so far @@ -1411,11 +1488,10 @@ static __device__ void gpuUpdatePageSizes(page_state_s* s, start_depth, end_depth, d, s, input_value_count, target_input_value_count, t); // count rows and leaf values - int is_new_row = start_depth == 0 ? 1 : 0; - uint32_t warp_row_count_mask = ballot(is_new_row); - int is_new_leaf = (d >= s->page.nesting[max_depth - 1].max_def_level) ? 1 : 0; - uint32_t warp_leaf_count_mask = ballot(is_new_leaf); - + int const is_new_row = start_depth == 0 ? 1 : 0; + uint32_t const warp_row_count_mask = ballot(is_new_row); + int const is_new_leaf = (d >= s->page.nesting[max_depth - 1].max_def_level) ? 1 : 0; + uint32_t const warp_leaf_count_mask = ballot(is_new_leaf); // is this thread within row bounds? on the first pass we don't know the bounds, so we will be // computing the full size of the column. on the second pass, we will know our actual row // bounds, so the computation will cap sizes properly. @@ -1429,8 +1505,8 @@ static __device__ void gpuUpdatePageSizes(page_state_s* s, ? 1 : 0; - uint32_t row_bounds_mask = ballot(in_row_bounds); - int first_thread_in_range = __ffs(row_bounds_mask) - 1; + uint32_t const row_bounds_mask = ballot(in_row_bounds); + int const first_thread_in_range = __ffs(row_bounds_mask) - 1; // if we've found the beginning of the first row, mark down the position // in the def/repetition buffer (skipped_values) and the data buffer (skipped_leaf_values) @@ -1443,13 +1519,15 @@ static __device__ void gpuUpdatePageSizes(page_state_s* s, } } - // increment counts across all nesting depths + // increment value counts across all nesting depths for (int s_idx = 0; s_idx < max_depth; s_idx++) { - // if we are within the range of nesting levels we should be adding value indices for - int in_nesting_bounds = (s_idx >= start_depth && s_idx <= end_depth && in_row_bounds) ? 1 : 0; + PageNestingInfo* pni = &s->page.nesting[s_idx]; - uint32_t count_mask = ballot(in_nesting_bounds); - if (!t) { s->page.nesting[s_idx].size += __popc(count_mask); } + // if we are within the range of nesting levels we should be adding value indices for + int const in_nesting_bounds = + (s_idx >= start_depth && s_idx <= end_depth && in_row_bounds) ? 1 : 0; + uint32_t const count_mask = ballot(in_nesting_bounds); + if (!t) { pni->batch_size += __popc(count_mask); } } input_value_count += min(32, (target_input_value_count - input_value_count)); @@ -1465,6 +1543,21 @@ static __device__ void gpuUpdatePageSizes(page_state_s* s, } } +__device__ size_type gpuGetStringSize(page_state_s* s, int target_count, int t) +{ + auto dict_target_pos = target_count; + size_type str_len = 0; + if (s->dict_base) { + auto const [new_target_pos, len] = gpuDecodeDictionaryIndices(s, target_count, t); + dict_target_pos = new_target_pos; + str_len = len; + } else if ((s->col.data_type & 7) == BYTE_ARRAY) { + str_len = gpuInitStringDescriptors(s, target_count, t); + } + if (!t) { *(volatile int32_t*)&s->dict_pos = dict_target_pos; } + return str_len; +} + /** * @brief Kernel for computing per-page column size information for all nesting levels. * @@ -1473,17 +1566,20 @@ static __device__ void gpuUpdatePageSizes(page_state_s* s, * @param pages List of pages * @param chunks List of column chunks * @param min_row Row index to start reading at - * @param num_rows Maximum number of rows to read. Pass as INT_MAX to guarantee reading all rows. - * @param trim_pass Whether or not this is the trim pass. We first have to compute + * @param num_rows Maximum number of rows to read. Pass as INT_MAX to guarantee reading all rows + * @param is_base_pass Whether or not this is the base pass. We first have to compute * the full size information of every page before we come through in a second (trim) pass - * to determine what subset of rows in this page we should be reading. + * to determine what subset of rows in this page we should be reading + * @param compute_string_sizes Whether or not we should be computing string sizes + * (PageInfo::str_bytes) as part of the pass */ __global__ void __launch_bounds__(block_size) gpuComputePageSizes(PageInfo* pages, device_span chunks, size_t min_row, size_t num_rows, - bool trim_pass) + bool is_base_pass, + bool compute_string_sizes) { __shared__ __align__(16) page_state_s state_g; @@ -1492,34 +1588,81 @@ __global__ void __launch_bounds__(block_size) int t = threadIdx.x; PageInfo* pp = &pages[page_idx]; + if (!setupLocalPageInfo(s, pp, chunks, min_row, num_rows, false)) { return; } + + if (!t) { + s->page.skipped_values = -1; + s->page.skipped_leaf_values = 0; + s->page.str_bytes = 0; + s->input_row_count = 0; + s->input_value_count = 0; + + // in the base pass, we're computing the number of rows, make sure we visit absolutely + // everything + if (is_base_pass) { + s->first_row = 0; + s->num_rows = INT_MAX; + s->row_index_lower_bound = -1; + } + } + // we only need to preprocess hierarchies with repetition in them (ie, hierarchies // containing lists anywhere within). bool const has_repetition = chunks[pp->chunk_idx].max_level[level_type::REPETITION] > 0; - if (!has_repetition) { return; } + compute_string_sizes = + compute_string_sizes && ((s->col.data_type & 7) == BYTE_ARRAY && s->dtype_len != 4); + + // various early out optimizations: + + // - if this is a flat hierarchy (no lists) and is not a string column. in this case we don't need + // to do + // the expensive work of traversing the level data to determine sizes. we can just compute it + // directly. + if (!has_repetition && !compute_string_sizes) { + int d = 0; + while (d < s->page.num_nesting_levels) { + auto const i = d + t; + if (i < s->page.num_nesting_levels) { + if (is_base_pass) { pp->nesting[i].size = pp->num_input_values; } + pp->nesting[i].batch_size = pp->num_input_values; + } + d += blockDim.x; + } + return; + } + + // - if this page is not at the beginning or end of the trim bounds, the batch size is + // the full page size + if (!is_base_pass && s->num_rows == s->page.num_rows) { + int d = 0; + while (d < s->page.num_nesting_levels) { + auto const i = d + t; + if (i < s->page.num_nesting_levels) { pp->nesting[i].batch_size = pp->nesting[i].size; } + d += blockDim.x; + } + return; + } - if (!setupLocalPageInfo(s, pp, chunks, trim_pass ? min_row : 0, trim_pass ? num_rows : INT_MAX)) { + // - if this page is completely trimmed, zero out sizes. + if (!is_base_pass && s->num_rows == 0) { + int d = 0; + while (d < s->page.num_nesting_levels) { + auto const i = d + t; + if (i < s->page.num_nesting_levels) { pp->nesting[i].batch_size = 0; } + d += blockDim.x; + } return; } + // at this point we are going to be fully recomputing batch information + // zero sizes int d = 0; while (d < s->page.num_nesting_levels) { - if (d + t < s->page.num_nesting_levels) { s->page.nesting[d + t].size = 0; } + if (d + t < s->page.num_nesting_levels) { s->page.nesting[d + t].batch_size = 0; } d += blockDim.x; } - if (!t) { - s->page.skipped_values = -1; - s->page.skipped_leaf_values = -1; - s->input_row_count = 0; - s->input_value_count = 0; - // if this isn't the trim pass, make sure we visit absolutely everything - if (!trim_pass) { - s->first_row = 0; - s->num_rows = INT_MAX; - s->row_index_lower_bound = -1; - } - } __syncthreads(); // optimization : it might be useful to have a version of gpuDecodeStream that could go wider than @@ -1532,25 +1675,51 @@ __global__ void __launch_bounds__(block_size) while (!s->error && s->input_value_count < s->num_input_values) { // decode repetition and definition levels. these will attempt to decode at // least up to the target, but may decode a few more. - gpuDecodeStream(s->rep, s, target_input_count, t, level_type::REPETITION); + if (has_repetition) { + gpuDecodeStream(s->rep, s, target_input_count, t, level_type::REPETITION); + } gpuDecodeStream(s->def, s, target_input_count, t, level_type::DEFINITION); __syncwarp(); // we may have decoded different amounts from each stream, so only process what we've been - int actual_input_count = - min(s->lvl_count[level_type::REPETITION], s->lvl_count[level_type::DEFINITION]); + int actual_input_count = has_repetition ? min(s->lvl_count[level_type::REPETITION], + s->lvl_count[level_type::DEFINITION]) + : s->lvl_count[level_type::DEFINITION]; // process what we got back - gpuUpdatePageSizes(s, actual_input_count, t, trim_pass); + gpuUpdatePageSizes(s, actual_input_count, t, !is_base_pass); + if (compute_string_sizes) { + auto const str_len = gpuGetStringSize(s, s->input_leaf_count, t); + if (!t) { s->page.str_bytes += str_len; } + } + target_input_count = actual_input_count + batch_size; __syncwarp(); } } - // update # rows in the actual page + + // update output results: + // - real number of rows for the whole page + // - nesting sizes for the whole page + // - skipped value information for trimmed pages + // - string bytes + if (is_base_pass) { + // nesting level 0 is the root column, so the size is also the # of rows + if (!t) { pp->num_rows = s->page.nesting[0].batch_size; } + + // store off this batch size as the "full" size + int d = 0; + while (d < s->page.num_nesting_levels) { + auto const i = d + t; + if (i < s->page.num_nesting_levels) { pp->nesting[i].size = pp->nesting[i].batch_size; } + d += blockDim.x; + } + } + if (!t) { - pp->num_rows = s->page.nesting[0].size; pp->skipped_values = s->page.skipped_values; pp->skipped_leaf_values = s->page.skipped_leaf_values; + pp->str_bytes = s->page.str_bytes; } } @@ -1577,7 +1746,10 @@ __global__ void __launch_bounds__(block_size) gpuDecodePageData( int t = threadIdx.x; int out_thread0; - if (!setupLocalPageInfo(s, &pages[page_idx], chunks, min_row, num_rows)) { return; } + if (!setupLocalPageInfo(s, &pages[page_idx], chunks, min_row, num_rows, true)) { return; } + + // if we have no rows to do (eg, in a skip_rows/num_rows case) + if (s->num_rows == 0) { return; } if (s->dict_base) { out_thread0 = (s->dict_bits > 0) ? 64 : 32; @@ -1614,7 +1786,7 @@ __global__ void __launch_bounds__(block_size) gpuDecodePageData( // WARP1: Decode dictionary indices, booleans or string positions if (s->dict_base) { - src_target_pos = gpuDecodeDictionaryIndices(s, src_target_pos, t & 0x1f); + src_target_pos = gpuDecodeDictionaryIndices(s, src_target_pos, t & 0x1f).first; } else if ((s->col.data_type & 7) == BOOLEAN) { src_target_pos = gpuDecodeRleBooleans(s, src_target_pos, t & 0x1f); } else if ((s->col.data_type & 7) == BYTE_ARRAY) { @@ -1701,71 +1873,18 @@ __global__ void __launch_bounds__(block_size) gpuDecodePageData( } } -struct chunk_row_output_iter { - PageInfo* p; - using value_type = size_type; - using difference_type = size_type; - using pointer = size_type*; - using reference = size_type&; - using iterator_category = thrust::output_device_iterator_tag; - - __host__ __device__ chunk_row_output_iter operator+(int i) - { - return chunk_row_output_iter{p + i}; - } - - __host__ __device__ void operator++() { p++; } - - __device__ reference operator[](int i) { return p[i].chunk_row; } - __device__ reference operator*() { return p->chunk_row; } - __device__ void operator=(value_type v) { p->chunk_row = v; } -}; - -struct start_offset_output_iterator { - PageInfo* pages; - int* page_indices; - int cur_index; - int src_col_schema; - int nesting_depth; - int empty = 0; - using value_type = size_type; - using difference_type = size_type; - using pointer = size_type*; - using reference = size_type&; - using iterator_category = thrust::output_device_iterator_tag; - - __host__ __device__ start_offset_output_iterator operator+(int i) - { - return start_offset_output_iterator{ - pages, page_indices, cur_index + i, src_col_schema, nesting_depth}; - } - - __host__ __device__ void operator++() { cur_index++; } - - __device__ reference operator[](int i) { return dereference(cur_index + i); } - __device__ reference operator*() { return dereference(cur_index); } - - private: - __device__ reference dereference(int index) - { - PageInfo const& p = pages[page_indices[index]]; - if (p.src_col_schema != src_col_schema || p.flags & PAGEINFO_FLAGS_DICTIONARY) { return empty; } - return p.nesting[nesting_depth].page_start_value; - } -}; +} // anonymous namespace /** - * @copydoc cudf::io::parquet::gpu::PreprocessColumnData + * @copydoc cudf::io::parquet::gpu::ComputePageSizes */ -void PreprocessColumnData(hostdevice_vector& pages, - hostdevice_vector const& chunks, - std::vector& input_columns, - std::vector& output_columns, - size_t num_rows, - size_t min_row, - bool uses_custom_row_bounds, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr) +void ComputePageSizes(hostdevice_vector& pages, + hostdevice_vector const& chunks, + size_t min_row, + size_t num_rows, + bool compute_num_rows, + bool compute_string_sizes, + rmm::cuda_stream_view stream) { dim3 dim_block(block_size, 1); dim3 dim_grid(pages.size(), 1); // 1 threadblock per page @@ -1776,124 +1895,7 @@ void PreprocessColumnData(hostdevice_vector& pages, // If uses_custom_row_bounds is set to true, we have to do a second pass later that "trims" // the starting and ending read values to account for these bounds. gpuComputePageSizes<<>>( - pages.device_ptr(), - chunks, - // if uses_custom_row_bounds is false, include all possible rows. - uses_custom_row_bounds ? min_row : 0, - uses_custom_row_bounds ? num_rows : INT_MAX, - !uses_custom_row_bounds); - - // computes: - // PageInfo::chunk_row for all pages - // Note: this is doing some redundant work for pages in flat hierarchies. chunk_row has already - // been computed during header decoding. the overall amount of work here is very small though. - auto key_input = thrust::make_transform_iterator( - pages.device_ptr(), [] __device__(PageInfo const& page) { return page.chunk_idx; }); - auto page_input = thrust::make_transform_iterator( - pages.device_ptr(), [] __device__(PageInfo const& page) { return page.num_rows; }); - thrust::exclusive_scan_by_key(rmm::exec_policy(stream), - key_input, - key_input + pages.size(), - page_input, - chunk_row_output_iter{pages.device_ptr()}); - - // computes: - // PageNestingInfo::size for each level of nesting, for each page, taking row bounds into account. - // PageInfo::skipped_values, which tells us where to start decoding in the input . - // It is only necessary to do this second pass if uses_custom_row_bounds is set (if the user has - // specified artifical bounds). - if (uses_custom_row_bounds) { - gpuComputePageSizes<<>>( - pages.device_ptr(), chunks, min_row, num_rows, true); - } - - // ordering of pages is by input column schema, repeated across row groups. so - // if we had 3 columns, each with 2 pages, and 1 row group, our schema values might look like - // - // 1, 1, 2, 2, 3, 3 - // - // However, if we had more than one row group, the pattern would be - // - // 1, 1, 2, 2, 3, 3, 1, 1, 2, 2, 3, 3 - // ^ row group 0 | - // ^ row group 1 - // - // To use exclusive_scan_by_key, the ordering we actually want is - // - // 1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3 - // - // We also need to preserve key-relative page ordering, so we need to use a stable sort. - rmm::device_uvector page_keys(pages.size(), stream); - rmm::device_uvector page_index(pages.size(), stream); - { - thrust::transform(rmm::exec_policy(stream), - pages.device_ptr(), - pages.device_ptr() + pages.size(), - page_keys.begin(), - [] __device__(PageInfo const& page) { return page.src_col_schema; }); - - thrust::sequence(rmm::exec_policy(stream), page_index.begin(), page_index.end()); - thrust::stable_sort_by_key(rmm::exec_policy(stream), - page_keys.begin(), - page_keys.end(), - page_index.begin(), - thrust::less()); - } - - // compute output column sizes by examining the pages of the -input- columns - for (size_t idx = 0; idx < input_columns.size(); idx++) { - auto const& input_col = input_columns[idx]; - auto src_col_schema = input_col.schema_idx; - size_t max_depth = input_col.nesting_depth(); - - auto* cols = &output_columns; - for (size_t l_idx = 0; l_idx < input_col.nesting_depth(); l_idx++) { - auto& out_buf = (*cols)[input_col.nesting[l_idx]]; - cols = &out_buf.children; - - // size iterator. indexes pages by sorted order - auto size_input = thrust::make_transform_iterator( - page_index.begin(), - [src_col_schema, l_idx, pages = pages.device_ptr()] __device__(int index) { - auto const& page = pages[index]; - if (page.src_col_schema != src_col_schema || page.flags & PAGEINFO_FLAGS_DICTIONARY) { - return 0; - } - return page.nesting[l_idx].size; - }); - - // if this buffer is part of a list hierarchy, we need to determine it's - // final size and allocate it here. - // - // for struct columns, higher levels of the output columns are shared between input - // columns. so don't compute any given level more than once. - if ((out_buf.user_data & PARQUET_COLUMN_BUFFER_FLAG_HAS_LIST_PARENT) && out_buf.size == 0) { - int size = thrust::reduce(rmm::exec_policy(stream), size_input, size_input + pages.size()); - - // if this is a list column add 1 for non-leaf levels for the terminating offset - if (out_buf.type.id() == type_id::LIST && l_idx < max_depth) { size++; } - - // allocate - out_buf.create(size, stream, mr); - } - - // for nested hierarchies, compute per-page start offset - if (input_col.has_repetition) { - thrust::exclusive_scan_by_key(rmm::exec_policy(stream), - page_keys.begin(), - page_keys.end(), - size_input, - start_offset_output_iterator{pages.device_ptr(), - page_index.begin(), - 0, - static_cast(src_col_schema), - static_cast(l_idx)}); - } - } - } - - // retrieve pages back - pages.device_to_host(stream); + pages.device_ptr(), chunks, min_row, num_rows, compute_num_rows, compute_string_sizes); } /** @@ -1905,6 +1907,8 @@ void __host__ DecodePageData(hostdevice_vector& pages, size_t min_row, rmm::cuda_stream_view stream) { + CUDF_EXPECTS(pages.size() > 0, "There is no page to decode"); + dim3 dim_block(block_size, 1); dim3 dim_grid(pages.size(), 1); // 1 threadblock per page diff --git a/cpp/src/io/parquet/page_hdr.cu b/cpp/src/io/parquet/page_hdr.cu index 19922bf7022..ffb4cb60a20 100644 --- a/cpp/src/io/parquet/page_hdr.cu +++ b/cpp/src/io/parquet/page_hdr.cu @@ -367,6 +367,7 @@ __global__ void __launch_bounds__(128) // definition levels bs->page.chunk_row = 0; bs->page.num_rows = 0; + bs->page.str_bytes = 0; } num_values = bs->ck.num_values; page_info = bs->ck.page_info; diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp index 7849e05eb68..ccf4b056ae8 100644 --- a/cpp/src/io/parquet/parquet_gpu.hpp +++ b/cpp/src/io/parquet/parquet_gpu.hpp @@ -95,10 +95,13 @@ struct PageNestingInfo { // set at initialization int32_t max_def_level; int32_t max_rep_level; + cudf::type_id type; // type of the corresponding cudf output column + bool nullable; // set during preprocessing int32_t size; // this page/nesting-level's row count contribution to the output column, if fully // decoded + int32_t batch_size; // the size of the page for this batch int32_t page_start_value; // absolute output start index in output column data // set during data decoding @@ -152,6 +155,9 @@ struct PageInfo { int skipped_values; // # of values skipped in the actual data stream. int skipped_leaf_values; + // for string columns only, the size of all the chars in the string for + // this page. only valid/computed during the base preprocess pass + int32_t str_bytes; // nesting information (input/output) for each page int num_nesting_levels; @@ -238,7 +244,7 @@ struct ColumnChunkDesc { }; /** - * @brief The struct to store raw/intermediate file data before parsing. + * @brief Struct to store raw/intermediate file data before parsing. */ struct file_intermediate_data { std::vector> raw_page_data; @@ -248,6 +254,23 @@ struct file_intermediate_data { hostdevice_vector page_nesting_info{}; }; +/** + * @brief Struct to store intermediate page data for parsing each chunk of rows in chunked reading. + */ +struct chunk_intermediate_data { + rmm::device_uvector page_keys{0, rmm::cuda_stream_default}; + rmm::device_uvector page_index{0, rmm::cuda_stream_default}; + rmm::device_uvector str_dict_index{0, rmm::cuda_stream_default}; +}; + +/** + * @brief Structs to identify the reading row range for each chunk of rows in chunked reading. + */ +struct chunk_read_info { + size_t skip_rows; + size_t num_rows; +}; + /** * @brief Struct describing an encoder column */ @@ -378,35 +401,35 @@ void BuildStringDictionaryIndex(ColumnChunkDesc* chunks, rmm::cuda_stream_view stream); /** - * @brief Preprocess column information for nested schemas. + * @brief Compute page output size information. * - * There are several pieces of information we can't compute directly from row counts in - * the parquet headers when dealing with nested schemas. - * - The total sizes of all output columns at all nesting levels - * - The starting output buffer offset for each page, for each nesting level - * For flat schemas, these values are computed during header decoding (see gpuDecodePageHeaders) + * When dealing with nested hierarchies (those that contain lists), or when doing a chunked + * read, we need to obtain more information up front than we have with just the row counts. + * + * - We need to determine the sizes of each output cudf column per page + * - We need to determine information about where to start decoding the value stream + * if we are using custom user bounds (skip_rows / num_rows) + * - We need to determine actual number of top level rows per page + * - If we are doing a chunked read, we need to determine the total string size per page * - * Note : this function is where output device memory is allocated for nested columns. * * @param pages All pages to be decoded * @param chunks All chunks to be decoded - * @param input_columns Input column information - * @param output_columns Output column information * @param num_rows Maximum number of rows to read * @param min_rows crop all rows below min_row - * @param uses_custom_row_bounds Whether or not num_rows and min_rows represents user-specific - * bounds - * @param stream Cuda stream + * @param compute_num_rows If set to true, the num_rows field in PageInfo will be + * computed + * @param compute_string_sizes If set to true, the str_bytes field in PageInfo will + * be computed + * @param stream CUDA stream to use, default 0 */ -void PreprocessColumnData(hostdevice_vector& pages, - hostdevice_vector const& chunks, - std::vector& input_columns, - std::vector& output_columns, - size_t num_rows, - size_t min_row, - bool uses_custom_row_bounds, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr); +void ComputePageSizes(hostdevice_vector& pages, + hostdevice_vector const& chunks, + size_t num_rows, + size_t min_row, + bool compute_num_rows, + bool compute_string_sizes, + rmm::cuda_stream_view stream); /** * @brief Launches kernel for reading the column data stored in the pages diff --git a/cpp/src/io/parquet/reader.cpp b/cpp/src/io/parquet/reader.cpp index 6be6987b7cb..1321e8073d7 100644 --- a/cpp/src/io/parquet/reader.cpp +++ b/cpp/src/io/parquet/reader.cpp @@ -40,4 +40,19 @@ table_with_metadata reader::read(parquet_reader_options const& options) options.get_row_groups()); } +chunked_reader::chunked_reader(std::size_t chunk_read_limit, + std::vector>&& sources, + parquet_reader_options const& options, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) +{ + _impl = std::make_unique(chunk_read_limit, std::move(sources), options, stream, mr); +} + +chunked_reader::~chunked_reader() = default; + +bool chunked_reader::has_next() const { return _impl->has_next(); } + +table_with_metadata chunked_reader::read_chunk() const { return _impl->read_chunk(); } + } // namespace cudf::io::detail::parquet diff --git a/cpp/src/io/parquet/reader_impl.cpp b/cpp/src/io/parquet/reader_impl.cpp index a61f63f6645..84d8cfc273f 100644 --- a/cpp/src/io/parquet/reader_impl.cpp +++ b/cpp/src/io/parquet/reader_impl.cpp @@ -28,22 +28,8 @@ void reader::impl::decode_page_data(size_t skip_rows, size_t num_rows) auto& pages = _file_itm_data.pages_info; auto& page_nesting = _file_itm_data.page_nesting_info; - auto is_dict_chunk = [](const gpu::ColumnChunkDesc& chunk) { - return (chunk.data_type & 0x7) == BYTE_ARRAY && chunk.num_dict_pages > 0; - }; - - // Count the number of string dictionary entries - // NOTE: Assumes first page in the chunk is always the dictionary page - size_t total_str_dict_indexes = 0; - for (size_t c = 0, page_count = 0; c < chunks.size(); c++) { - if (is_dict_chunk(chunks[c])) { total_str_dict_indexes += pages[page_count].num_input_values; } - page_count += chunks[c].max_num_pages; - } - - // Build index for string dictionaries since they can't be indexed - // directly due to variable-sized elements - auto str_dict_index = cudf::detail::make_zeroed_device_uvector_async( - total_str_dict_indexes, _stream); + // Should not reach here if there is no page data. + CUDF_EXPECTS(pages.size() > 0, "There is no page to decode"); size_t const sum_max_depths = std::accumulate( chunks.begin(), chunks.end(), 0, [&](size_t cursum, gpu::ColumnChunkDesc const& chunk) { @@ -58,16 +44,11 @@ void reader::impl::decode_page_data(size_t skip_rows, size_t num_rows) auto chunk_offsets = std::vector(); // Update chunks with pointers to column data. - for (size_t c = 0, page_count = 0, str_ofs = 0, chunk_off = 0; c < chunks.size(); c++) { + for (size_t c = 0, page_count = 0, chunk_off = 0; c < chunks.size(); c++) { input_column_info const& input_col = _input_columns[chunks[c].src_col_index]; CUDF_EXPECTS(input_col.schema_idx == chunks[c].src_col_schema, "Column/page schema index mismatch"); - if (is_dict_chunk(chunks[c])) { - chunks[c].str_dict_index = str_dict_index.data() + str_ofs; - str_ofs += pages[page_count].num_input_values; - } - size_t max_depth = _metadata->get_output_nesting_depth(chunks[c].src_col_schema); chunk_offsets.push_back(chunk_off); @@ -139,18 +120,15 @@ void reader::impl::decode_page_data(size_t skip_rows, size_t num_rows) chunk_nested_valids.host_to_device(_stream); chunk_nested_data.host_to_device(_stream); - if (total_str_dict_indexes > 0) { - gpu::BuildStringDictionaryIndex(chunks.device_ptr(), chunks.size(), _stream); - } - gpu::DecodePageData(pages, chunks, num_rows, skip_rows, _stream); + pages.device_to_host(_stream); page_nesting.device_to_host(_stream); _stream.synchronize(); // for list columns, add the final offset to every offset buffer. // TODO : make this happen in more efficiently. Maybe use thrust::for_each - // on each buffer. Or potentially do it in PreprocessColumnData + // on each buffer. // Note : the reason we are doing this here instead of in the decode kernel is // that it is difficult/impossible for a given page to know that it is writing the very // last value that should then be followed by a terminator (because rows can span @@ -211,7 +189,20 @@ reader::impl::impl(std::vector>&& sources, parquet_reader_options const& options, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) - : _stream(stream), _mr(mr), _sources(std::move(sources)) + : impl(0 /*chunk_read_limit*/, + std::forward>>(sources), + options, + stream, + mr) +{ +} + +reader::impl::impl(std::size_t chunk_read_limit, + std::vector>&& sources, + parquet_reader_options const& options, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) + : _stream{stream}, _mr{mr}, _sources{std::move(sources)}, _chunk_read_limit{chunk_read_limit} { // Open and parse the source dataset metadata _metadata = std::make_unique(_sources); @@ -233,6 +224,14 @@ reader::impl::impl(std::vector>&& sources, options.is_enabled_use_pandas_metadata(), _strings_to_categorical, _timestamp_type.id()); + + // Save the states of the output buffers for reuse in `chunk_read()`. + // Don't need to do it if we read the file all at once. + if (_chunk_read_limit > 0) { + for (auto const& buff : _output_buffers) { + _output_buffers_template.emplace_back(column_buffer::empty_like(buff)); + } + } } void reader::impl::prepare_data(size_type skip_rows, @@ -240,39 +239,61 @@ void reader::impl::prepare_data(size_type skip_rows, bool uses_custom_row_bounds, host_span const> row_group_indices) { + if (_file_preprocessed) { return; } + const auto [skip_rows_corrected, num_rows_corrected, row_groups_info] = _metadata->select_row_groups(row_group_indices, skip_rows, num_rows); - _skip_rows = skip_rows_corrected; - _num_rows = num_rows_corrected; if (num_rows_corrected > 0 && row_groups_info.size() != 0 && _input_columns.size() != 0) { load_and_decompress_data(row_groups_info, num_rows_corrected); + preprocess_pages( + skip_rows_corrected, num_rows_corrected, uses_custom_row_bounds, _chunk_read_limit); + + if (_chunk_read_limit == 0) { // read the whole file at once + CUDF_EXPECTS(_chunk_read_info.size() == 1, + "Reading the whole file should yield only one chunk."); + } } + + _file_preprocessed = true; } table_with_metadata reader::impl::read_chunk_internal(bool uses_custom_row_bounds) { - auto out_metadata = table_metadata{}; + // If `_output_metadata` has been constructed, just copy it over. + auto out_metadata = _output_metadata ? table_metadata{*_output_metadata} : table_metadata{}; // output cudf columns as determined by the top level schema auto out_columns = std::vector>{}; out_columns.reserve(_output_buffers.size()); - if (_num_rows == 0) { return finalize_output(out_metadata, out_columns); } + if (!has_next() || _chunk_read_info.size() == 0) { + return finalize_output(out_metadata, out_columns); + } - allocate_columns(_skip_rows, _num_rows, uses_custom_row_bounds); + auto const& read_info = _chunk_read_info[_current_read_chunk++]; - decode_page_data(_skip_rows, _num_rows); + // Allocate memory buffers for the output columns. + allocate_columns(read_info.skip_rows, read_info.num_rows, uses_custom_row_bounds); - // Create the final output cudf columns + // Parse data into the output buffers. + decode_page_data(read_info.skip_rows, read_info.num_rows); + + // Create the final output cudf columns. for (size_t i = 0; i < _output_buffers.size(); ++i) { - auto const metadata = _reader_column_schema.has_value() - ? std::make_optional((*_reader_column_schema)[i]) - : std::nullopt; - column_name_info& col_name = out_metadata.schema_info.emplace_back(""); - out_columns.emplace_back(make_column(_output_buffers[i], &col_name, metadata, _stream, _mr)); + auto const metadata = _reader_column_schema.has_value() + ? std::make_optional((*_reader_column_schema)[i]) + : std::nullopt; + // Only construct `out_metadata` if `_output_metadata` has not been cached. + if (!_output_metadata) { + column_name_info& col_name = out_metadata.schema_info.emplace_back(""); + out_columns.emplace_back(make_column(_output_buffers[i], &col_name, metadata, _stream, _mr)); + } else { + out_columns.emplace_back(make_column(_output_buffers[i], nullptr, metadata, _stream, _mr)); + } } + // Add empty columns if needed. return finalize_output(out_metadata, out_columns); } @@ -281,21 +302,30 @@ table_with_metadata reader::impl::finalize_output(table_metadata& out_metadata, { // Create empty columns as needed (this can happen if we've ended up with no actual data to read) for (size_t i = out_columns.size(); i < _output_buffers.size(); ++i) { - column_name_info& col_name = out_metadata.schema_info.emplace_back(""); - out_columns.emplace_back(io::detail::empty_like(_output_buffers[i], &col_name, _stream, _mr)); + if (!_output_metadata) { + column_name_info& col_name = out_metadata.schema_info.emplace_back(""); + out_columns.emplace_back(io::detail::empty_like(_output_buffers[i], &col_name, _stream, _mr)); + } else { + out_columns.emplace_back(io::detail::empty_like(_output_buffers[i], nullptr, _stream, _mr)); + } } - // Return column names (must match order of returned columns) - out_metadata.column_names.resize(_output_buffers.size()); - for (size_t i = 0; i < _output_column_schemas.size(); i++) { - auto const& schema = _metadata->get_schema(_output_column_schemas[i]); - out_metadata.column_names[i] = schema.name; - } + if (!_output_metadata) { + // Return column names (must match order of returned columns) + out_metadata.column_names.resize(_output_buffers.size()); + for (size_t i = 0; i < _output_column_schemas.size(); i++) { + auto const& schema = _metadata->get_schema(_output_column_schemas[i]); + out_metadata.column_names[i] = schema.name; + } - // Return user metadata - out_metadata.per_file_user_data = _metadata->get_key_value_metadata(); - out_metadata.user_data = {out_metadata.per_file_user_data[0].begin(), - out_metadata.per_file_user_data[0].end()}; + // Return user metadata + out_metadata.per_file_user_data = _metadata->get_key_value_metadata(); + out_metadata.user_data = {out_metadata.per_file_user_data[0].begin(), + out_metadata.per_file_user_data[0].end()}; + + // Finally, save the output table metadata into `_output_metadata` for reuse next time. + _output_metadata = std::make_unique(out_metadata); + } return {std::make_unique(std::move(out_columns)), std::move(out_metadata)}; } @@ -305,8 +335,36 @@ table_with_metadata reader::impl::read(size_type skip_rows, bool uses_custom_row_bounds, host_span const> row_group_indices) { + CUDF_EXPECTS(_chunk_read_limit == 0, "Reading the whole file must not have non-zero byte_limit."); prepare_data(skip_rows, num_rows, uses_custom_row_bounds, row_group_indices); return read_chunk_internal(uses_custom_row_bounds); } +table_with_metadata reader::impl::read_chunk() +{ + // Reset the output buffers to their original states (right after reader construction). + // Don't need to do it if we read the file all at once. + if (_chunk_read_limit > 0) { + _output_buffers.resize(0); + for (auto const& buff : _output_buffers_template) { + _output_buffers.emplace_back(column_buffer::empty_like(buff)); + } + } + + prepare_data(0 /*skip_rows*/, + -1 /*num_rows, `-1` means unlimited*/, + true /*uses_custom_row_bounds*/, + {} /*row_group_indices, empty means read all row groups*/); + return read_chunk_internal(true); +} + +bool reader::impl::has_next() +{ + prepare_data(0 /*skip_rows*/, + -1 /*num_rows, `-1` means unlimited*/, + true /*uses_custom_row_bounds*/, + {} /*row_group_indices, empty means read all row groups*/); + return _current_read_chunk < _chunk_read_info.size(); +} + } // namespace cudf::io::detail::parquet diff --git a/cpp/src/io/parquet/reader_impl.hpp b/cpp/src/io/parquet/reader_impl.hpp index b53487c824b..6d42e9fab84 100644 --- a/cpp/src/io/parquet/reader_impl.hpp +++ b/cpp/src/io/parquet/reader_impl.hpp @@ -38,7 +38,6 @@ #include namespace cudf::io::detail::parquet { - /** * @brief Implementation for Parquet reader */ @@ -47,6 +46,9 @@ class reader::impl { /** * @brief Constructor from an array of dataset sources with reader options. * + * By using this constructor, each call to `read()` or `read_chunk()` will perform reading the + * entire given file. + * * @param sources Dataset sources * @param options Settings for controlling reading behavior * @param stream CUDA stream used for device memory operations and kernel launches @@ -73,6 +75,46 @@ class reader::impl { bool uses_custom_row_bounds, host_span const> row_group_indices); + /** + * @brief Constructor from a chunk read limit and an array of dataset sources with reader options. + * + * By using this constructor, the reader will support iterative (chunked) reading through + * `has_next() ` and `read_chunk()`. For example: + * ``` + * do { + * auto const chunk = reader.read_chunk(); + * // Process chunk + * } while (reader.has_next()); + * + * ``` + * + * Reading the whole given file at once through `read()` function is still supported if + * `chunk_read_limit == 0` (i.e., no reading limit). + * In such case, `read_chunk()` will also return rows of the entire file. + * + * @param chunk_read_limit Limit on total number of bytes to be returned per read, + * or `0` if there is no limit + * @param sources Dataset sources + * @param options Settings for controlling reading behavior + * @param stream CUDA stream used for device memory operations and kernel launches + * @param mr Device memory resource to use for device memory allocation + */ + explicit impl(std::size_t chunk_read_limit, + std::vector>&& sources, + parquet_reader_options const& options, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr); + + /** + * @copydoc cudf::io::chunked_parquet_reader::has_next + */ + bool has_next(); + + /** + * @copydoc cudf::io::chunked_parquet_reader::read_chunk + */ + table_with_metadata read_chunk(); + private: /** * @brief Perform the necessary data preprocessing for parsing file later on. @@ -94,6 +136,29 @@ class reader::impl { void load_and_decompress_data(std::vector const& row_groups_info, size_type num_rows); + /** + * @brief Perform some preprocessing for page data and also compute the split locations + * {skip_rows, num_rows} for chunked reading. + * + * There are several pieces of information we can't compute directly from row counts in + * the parquet headers when dealing with nested schemas: + * - The total sizes of all output columns at all nesting levels + * - The starting output buffer offset for each page, for each nesting level + * + * For flat schemas, these values are computed during header decoding (see gpuDecodePageHeaders). + * + * @param skip_rows Crop all rows below skip_rows + * @param num_rows Maximum number of rows to read + * @param uses_custom_row_bounds Whether or not num_rows and skip_rows represents user-specific + * bounds + * @param chunk_read_limit Limit on total number of bytes to be returned per read, + * or `0` if there is no limit + */ + void preprocess_pages(size_t skip_rows, + size_t num_rows, + bool uses_custom_row_bounds, + size_t chunk_read_limit); + /** * @brief Allocate nesting information storage for all pages and set pointers to it. * @@ -158,17 +223,26 @@ class reader::impl { // Buffers for generating output columns std::vector _output_buffers; + // Buffers copied from `_output_buffers` after construction for reuse + std::vector _output_buffers_template; + // _output_buffers associated schema indices std::vector _output_column_schemas; + // _output_buffers associated metadata + std::unique_ptr _output_metadata; + bool _strings_to_categorical = false; std::optional> _reader_column_schema; data_type _timestamp_type{type_id::EMPTY}; + // Variables used for chunked reading: cudf::io::parquet::gpu::file_intermediate_data _file_itm_data; - - size_type _skip_rows{0}; - size_type _num_rows{0}; + cudf::io::parquet::gpu::chunk_intermediate_data _chunk_itm_data; + std::vector _chunk_read_info; + std::size_t _chunk_read_limit{0}; + std::size_t _current_read_chunk{0}; + bool _file_preprocessed{false}; }; } // namespace cudf::io::detail::parquet diff --git a/cpp/src/io/parquet/reader_impl_preprocess.cu b/cpp/src/io/parquet/reader_impl_preprocess.cu index ca2009d3c74..38fce7d3263 100644 --- a/cpp/src/io/parquet/reader_impl_preprocess.cu +++ b/cpp/src/io/parquet/reader_impl_preprocess.cu @@ -20,17 +20,30 @@ #include #include +#include +#include #include #include +#include #include +#include +#include +#include +#include +#include #include +#include +#include +#include +#include +#include +#include #include namespace cudf::io::detail::parquet { - namespace { /** @@ -157,7 +170,7 @@ void generate_depth_remappings(std::map, std::ve } /** - * @brief Function that returns the required the number of bits to store a value + * @brief Return the required number of bits to store a value. */ template [[nodiscard]] T required_bits(uint32_t max_level) @@ -197,7 +210,7 @@ template } /** - * @brief Reads compressed page data to device memory + * @brief Reads compressed page data to device memory. * * @param sources Dataset sources * @param page_data Buffers to hold compressed page data for each chunk @@ -606,6 +619,9 @@ void reader::impl::allocate_nesting_info() pni[cur_depth].max_def_level = cur_schema.max_definition_level; pni[cur_depth].max_rep_level = cur_schema.max_repetition_level; pni[cur_depth].size = 0; + pni[cur_depth].type = + to_type_id(cur_schema, _strings_to_categorical, _timestamp_type.id()); + pni[cur_depth].nullable = cur_schema.repetition_type == OPTIONAL; } // move up the hierarchy @@ -721,6 +737,7 @@ void reader::impl::load_and_decompress_data(std::vector const& r for (auto& task : read_rowgroup_tasks) { task.wait(); } + CUDF_EXPECTS(remaining_rows <= 0, "All rows data must be read."); // Process dataset chunk pages into output columns @@ -762,11 +779,669 @@ void reader::impl::load_and_decompress_data(std::vector const& r } } -void reader::impl::allocate_columns(size_t min_row, size_t total_rows, bool uses_custom_row_bounds) +namespace { + +struct cumulative_row_info { + size_t row_count; // cumulative row count + size_t size_bytes; // cumulative size in bytes + int key; // schema index +}; + +#if defined(PREPROCESS_DEBUG) +void print_pages(hostdevice_vector& pages, rmm::cuda_stream_view _stream) +{ + pages.device_to_host(_stream, true); + for (size_t idx = 0; idx < pages.size(); idx++) { + auto const& p = pages[idx]; + // skip dictionary pages + if (p.flags & gpu::PAGEINFO_FLAGS_DICTIONARY) { continue; } + printf( + "P(%lu, s:%d): chunk_row(%d), num_rows(%d), skipped_values(%d), skipped_leaf_values(%d)\n", + idx, + p.src_col_schema, + p.chunk_row, + p.num_rows, + p.skipped_values, + p.skipped_leaf_values); + } +} + +void print_cumulative_page_info(hostdevice_vector& pages, + rmm::device_uvector const& page_index, + rmm::device_uvector const& c_info, + rmm::cuda_stream_view stream) +{ + pages.device_to_host(stream, true); + + printf("------------\nCumulative sizes by page\n"); + + std::vector schemas(pages.size()); + std::vector h_page_index(pages.size()); + cudaMemcpy( + h_page_index.data(), page_index.data(), sizeof(int) * pages.size(), cudaMemcpyDeviceToHost); + std::vector h_cinfo(pages.size()); + cudaMemcpy(h_cinfo.data(), + c_info.data(), + sizeof(cumulative_row_info) * pages.size(), + cudaMemcpyDeviceToHost); + auto schema_iter = cudf::detail::make_counting_transform_iterator( + 0, [&](size_type i) { return pages[h_page_index[i]].src_col_schema; }); + thrust::copy(thrust::seq, schema_iter, schema_iter + pages.size(), schemas.begin()); + auto last = thrust::unique(thrust::seq, schemas.begin(), schemas.end()); + schemas.resize(last - schemas.begin()); + printf("Num schemas: %lu\n", schemas.size()); + + for (size_t idx = 0; idx < schemas.size(); idx++) { + printf("Schema %d\n", schemas[idx]); + for (size_t pidx = 0; pidx < pages.size(); pidx++) { + auto const& page = pages[h_page_index[pidx]]; + if (page.flags & gpu::PAGEINFO_FLAGS_DICTIONARY || page.src_col_schema != schemas[idx]) { + continue; + } + printf("\tP: {%lu, %lu}\n", h_cinfo[pidx].row_count, h_cinfo[pidx].size_bytes); + } + } +} + +void print_cumulative_row_info( + host_span sizes, + std::string const& label, + std::optional> splits = std::nullopt) +{ + if (splits.has_value()) { + printf("------------\nSplits\n"); + for (size_t idx = 0; idx < splits->size(); idx++) { + printf("{%lu, %lu}\n", splits.value()[idx].skip_rows, splits.value()[idx].num_rows); + } + } + + printf("------------\nCumulative sizes %s\n", label.c_str()); + for (size_t idx = 0; idx < sizes.size(); idx++) { + printf("{%lu, %lu, %d}", sizes[idx].row_count, sizes[idx].size_bytes, sizes[idx].key); + if (splits.has_value()) { + // if we have a split at this row count and this is the last instance of this row count + auto start = thrust::make_transform_iterator( + splits->begin(), [](gpu::chunk_read_info const& i) { return i.skip_rows; }); + auto end = start + splits->size(); + auto split = std::find(start, end, sizes[idx].row_count); + auto const split_index = [&]() -> int { + if (split != end && + ((idx == sizes.size() - 1) || (sizes[idx + 1].row_count > sizes[idx].row_count))) { + return static_cast(std::distance(start, split)); + } + return idx == 0 ? 0 : -1; + }(); + if (split_index >= 0) { + printf(" <-- split {%lu, %lu}", + splits.value()[split_index].skip_rows, + splits.value()[split_index].num_rows); + } + } + printf("\n"); + } +} +#endif // PREPROCESS_DEBUG + +/** + * @brief Functor which reduces two cumulative_row_info structs of the same key. + */ +struct cumulative_row_sum { + cumulative_row_info operator() + __device__(cumulative_row_info const& a, cumulative_row_info const& b) const + { + return cumulative_row_info{a.row_count + b.row_count, a.size_bytes + b.size_bytes, a.key}; + } +}; + +/** + * @brief Functor which computes the total data size for a given type of cudf column. + * + * In the case of strings, the return size does not include the chars themselves. That + * information is tracked separately (see PageInfo::str_bytes). + */ +struct row_size_functor { + __device__ size_t validity_size(size_t num_rows, bool nullable) + { + return nullable ? (cudf::util::div_rounding_up_safe(num_rows, size_t{32}) * 4) : 0; + } + + template + __device__ size_t operator()(size_t num_rows, bool nullable) + { + auto const element_size = sizeof(device_storage_type_t); + return (element_size * num_rows) + validity_size(num_rows, nullable); + } +}; + +template <> +__device__ size_t row_size_functor::operator()(size_t num_rows, bool nullable) +{ + auto const offset_size = sizeof(offset_type); + // NOTE: Adding the + 1 offset here isn't strictly correct. There will only be 1 extra offset + // for the entire column, whereas this is adding an extra offset per page. So we will get a + // small over-estimate of the real size of the order : # of pages * 4 bytes. It seems better + // to overestimate size somewhat than to underestimate it and potentially generate chunks + // that are too large. + return (offset_size * (num_rows + 1)) + validity_size(num_rows, nullable); +} + +template <> +__device__ size_t row_size_functor::operator()(size_t num_rows, bool nullable) +{ + return validity_size(num_rows, nullable); +} + +template <> +__device__ size_t row_size_functor::operator()(size_t num_rows, bool nullable) +{ + // only returns the size of offsets and validity. the size of the actual string chars + // is tracked separately. + auto const offset_size = sizeof(offset_type); + // see note about offsets in the list_view template. + return (offset_size * (num_rows + 1)) + validity_size(num_rows, nullable); +} + +/** + * @brief Functor which computes the total output cudf data size for all of + * the data in this page. + * + * Sums across all nesting levels. + */ +struct get_cumulative_row_info { + gpu::PageInfo const* const pages; + + __device__ cumulative_row_info operator()(size_type index) + { + auto const& page = pages[index]; + if (page.flags & gpu::PAGEINFO_FLAGS_DICTIONARY) { + return cumulative_row_info{0, 0, page.src_col_schema}; + } + + // total nested size, not counting string data + auto iter = + cudf::detail::make_counting_transform_iterator(0, [page, index] __device__(size_type i) { + auto const& pni = page.nesting[i]; + return cudf::type_dispatcher( + data_type{pni.type}, row_size_functor{}, pni.size, pni.nullable); + }); + + size_t const row_count = static_cast(page.nesting[0].size); + return {row_count, + thrust::reduce(thrust::seq, iter, iter + page.num_nesting_levels) + page.str_bytes, + page.src_col_schema}; + } +}; + +/** + * @brief Functor which computes the effective size of all input columns by page. + * + * For a given row, we want to find the cost of all pages for all columns involved + * in loading up to that row. The complication here is that not all pages are the + * same size between columns. Example: + * + * page row counts + * Column A: 0 <----> 100 <----> 200 + * Column B: 0 <---------------> 200 <--------> 400 + | + * if we decide to split at row 100, we don't really know the actual amount of bytes in column B + * at that point. So we have to proceed as if we are taking the bytes from all 200 rows of that + * page. Essentially, a conservative over-estimate of the real size. + */ +struct row_total_size { + cumulative_row_info const* c_info; + size_type const* key_offsets; + size_t num_keys; + + __device__ cumulative_row_info operator()(cumulative_row_info const& i) + { + // sum sizes for each input column at this row + size_t sum = 0; + for (int idx = 0; idx < num_keys; idx++) { + auto const start = key_offsets[idx]; + auto const end = key_offsets[idx + 1]; + auto iter = cudf::detail::make_counting_transform_iterator( + 0, [&] __device__(size_type i) { return c_info[i].row_count; }); + auto const page_index = + thrust::lower_bound(thrust::seq, iter + start, iter + end, i.row_count) - iter; + sum += c_info[page_index].size_bytes; + } + return {i.row_count, sum, i.key}; + } +}; + +/** + * @brief Given a vector of cumulative {row_count, byte_size} pairs and a chunk read + * limit, determine the set of splits. + * + * @param sizes Vector of cumulative {row_count, byte_size} pairs + * @param num_rows Total number of rows to read + * @param chunk_read_limit Limit on total number of bytes to be returned per read, for all columns + */ +std::vector find_splits(std::vector const& sizes, + size_t num_rows, + size_t chunk_read_limit) +{ + // now we have an array of {row_count, real output bytes}. just walk through it and generate + // splits. + // TODO: come up with a clever way to do this entirely in parallel. For now, as long as batch + // sizes are reasonably large, this shouldn't iterate too many times + std::vector splits; + { + size_t cur_pos = 0; + size_t cur_cumulative_size = 0; + size_t cur_row_count = 0; + auto start = thrust::make_transform_iterator(sizes.begin(), [&](cumulative_row_info const& i) { + return i.size_bytes - cur_cumulative_size; + }); + auto end = start + sizes.size(); + while (cur_row_count < num_rows) { + int64_t split_pos = + thrust::lower_bound(thrust::seq, start + cur_pos, end, chunk_read_limit) - start; + + // if we're past the end, or if the returned bucket is > than the chunk_read_limit, move back + // one. + if (static_cast(split_pos) >= sizes.size() || + (sizes[split_pos].size_bytes - cur_cumulative_size > chunk_read_limit)) { + split_pos--; + } + + // best-try. if we can't find something that'll fit, we have to go bigger. we're doing this in + // a loop because all of the cumulative sizes for all the pages are sorted into one big list. + // so if we had two columns, both of which had an entry {1000, 10000}, that entry would be in + // the list twice. so we have to iterate until we skip past all of them. The idea is that we + // either do this, or we have to call unique() on the input first. + while (split_pos < (static_cast(sizes.size()) - 1) && + (split_pos < 0 || sizes[split_pos].row_count == cur_row_count)) { + split_pos++; + } + + auto const start_row = cur_row_count; + cur_row_count = sizes[split_pos].row_count; + splits.push_back(gpu::chunk_read_info{start_row, cur_row_count - start_row}); + cur_pos = split_pos; + cur_cumulative_size = sizes[split_pos].size_bytes; + } + } + // print_cumulative_row_info(sizes, "adjusted", splits); + + return splits; +} + +/** + * @brief Given a set of pages that have had their sizes computed by nesting level and + * a limit on total read size, generate a set of {skip_rows, num_rows} pairs representing + * a set of reads that will generate output columns of total size <= `chunk_read_limit` bytes. + * + * @param pages All pages in the file + * @param id Additional intermediate information required to process the pages + * @param num_rows Total number of rows to read + * @param chunk_read_limit Limit on total number of bytes to be returned per read, for all columns + * @param stream CUDA stream to use, default 0 + */ +std::vector compute_splits(hostdevice_vector& pages, + gpu::chunk_intermediate_data const& id, + size_t num_rows, + size_t chunk_read_limit, + rmm::cuda_stream_view stream) +{ + auto const& page_keys = id.page_keys; + auto const& page_index = id.page_index; + + // generate cumulative row counts and sizes + rmm::device_uvector c_info(page_keys.size(), stream); + // convert PageInfo to cumulative_row_info + auto page_input = thrust::make_transform_iterator(page_index.begin(), + get_cumulative_row_info{pages.device_ptr()}); + thrust::inclusive_scan_by_key(rmm::exec_policy(stream), + page_keys.begin(), + page_keys.end(), + page_input, + c_info.begin(), + thrust::equal_to{}, + cumulative_row_sum{}); + // print_cumulative_page_info(pages, page_index, c_info, stream); + + // sort by row count + rmm::device_uvector c_info_sorted{c_info, stream}; + thrust::sort(rmm::exec_policy(stream), + c_info_sorted.begin(), + c_info_sorted.end(), + [] __device__(cumulative_row_info const& a, cumulative_row_info const& b) { + return a.row_count < b.row_count; + }); + + std::vector h_c_info_sorted(c_info_sorted.size()); + cudaMemcpy(h_c_info_sorted.data(), + c_info_sorted.data(), + sizeof(cumulative_row_info) * c_info_sorted.size(), + cudaMemcpyDeviceToHost); + // print_cumulative_row_info(h_c_info_sorted, "raw"); + + // generate key offsets (offsets to the start of each partition of keys). worst case is 1 page per + // key + rmm::device_uvector key_offsets(page_keys.size() + 1, stream); + auto const key_offsets_end = thrust::reduce_by_key(rmm::exec_policy(stream), + page_keys.begin(), + page_keys.end(), + thrust::make_constant_iterator(1), + thrust::make_discard_iterator(), + key_offsets.begin()) + .second; + size_t const num_unique_keys = key_offsets_end - key_offsets.begin(); + thrust::exclusive_scan( + rmm::exec_policy(stream), key_offsets.begin(), key_offsets.end(), key_offsets.begin()); + + // adjust the cumulative info such that for each row count, the size includes any pages that span + // that row count. this is so that if we have this case: + // page row counts + // Column A: 0 <----> 100 <----> 200 + // Column B: 0 <---------------> 200 <--------> 400 + // | + // if we decide to split at row 100, we don't really know the actual amount of bytes in column B + // at that point. So we have to proceed as if we are taking the bytes from all 200 rows of that + // page. + // + rmm::device_uvector aggregated_info(c_info.size(), stream); + thrust::transform(rmm::exec_policy(stream), + c_info_sorted.begin(), + c_info_sorted.end(), + aggregated_info.begin(), + row_total_size{c_info.data(), key_offsets.data(), num_unique_keys}); + + // bring back to the cpu + std::vector h_aggregated_info(aggregated_info.size()); + cudaMemcpyAsync(h_aggregated_info.data(), + aggregated_info.data(), + sizeof(cumulative_row_info) * c_info.size(), + cudaMemcpyDeviceToHost, + stream); + stream.synchronize(); + + return find_splits(h_aggregated_info, num_rows, chunk_read_limit); +} + +struct get_page_chunk_idx { + __device__ size_type operator()(gpu::PageInfo const& page) { return page.chunk_idx; } +}; + +struct get_page_num_rows { + __device__ size_type operator()(gpu::PageInfo const& page) { return page.num_rows; } +}; + +struct get_page_schema { + __device__ size_type operator()(gpu::PageInfo const& page) { return page.src_col_schema; } +}; + +/** + * @brief Returns the size field of a PageInfo struct for a given depth, keyed by schema. + */ +struct get_page_nesting_size { + size_type const src_col_schema; + size_type const depth; + gpu::PageInfo const* const pages; + + __device__ size_type operator()(int index) const + { + auto const& page = pages[index]; + if (page.src_col_schema != src_col_schema || page.flags & gpu::PAGEINFO_FLAGS_DICTIONARY) { + return 0; + } + return page.nesting[depth].batch_size; + } +}; + +/** + * @brief Writes to the chunk_row field of the PageInfo struct. + */ +struct chunk_row_output_iter { + gpu::PageInfo* p; + using value_type = size_type; + using difference_type = size_type; + using pointer = size_type*; + using reference = size_type&; + using iterator_category = thrust::output_device_iterator_tag; + + __host__ __device__ chunk_row_output_iter operator+(int i) + { + return chunk_row_output_iter{p + i}; + } + + __host__ __device__ void operator++() { p++; } + + __device__ reference operator[](int i) { return p[i].chunk_row; } + __device__ reference operator*() { return p->chunk_row; } +}; + +/** + * @brief Writes to the page_start_value field of the PageNestingInfo struct, keyed by schema. + */ +struct start_offset_output_iterator { + gpu::PageInfo* pages; + int const* page_indices; + int cur_index; + int src_col_schema; + int nesting_depth; + int empty = 0; + using value_type = size_type; + using difference_type = size_type; + using pointer = size_type*; + using reference = size_type&; + using iterator_category = thrust::output_device_iterator_tag; + + constexpr void operator=(start_offset_output_iterator const& other) + { + pages = other.pages; + page_indices = other.page_indices; + cur_index = other.cur_index; + src_col_schema = other.src_col_schema; + nesting_depth = other.nesting_depth; + } + + constexpr start_offset_output_iterator operator+(int i) + { + return start_offset_output_iterator{ + pages, page_indices, cur_index + i, src_col_schema, nesting_depth}; + } + + constexpr void operator++() { cur_index++; } + + __device__ reference operator[](int i) { return dereference(cur_index + i); } + __device__ reference operator*() { return dereference(cur_index); } + + private: + __device__ reference dereference(int index) + { + gpu::PageInfo const& p = pages[page_indices[index]]; + if (p.src_col_schema != src_col_schema || p.flags & gpu::PAGEINFO_FLAGS_DICTIONARY) { + return empty; + } + return p.nesting[nesting_depth].page_start_value; + } +}; + +} // anonymous namespace + +void reader::impl::preprocess_pages(size_t skip_rows, + size_t num_rows, + bool uses_custom_row_bounds, + size_t chunk_read_limit) +{ + auto& chunks = _file_itm_data.chunks; + auto& pages = _file_itm_data.pages_info; + + // iterate over all input columns and determine if they contain lists so we can further + // preprocess them. + bool has_lists = false; + for (size_t idx = 0; idx < _input_columns.size(); idx++) { + auto const& input_col = _input_columns[idx]; + size_t const max_depth = input_col.nesting_depth(); + + auto* cols = &_output_buffers; + for (size_t l_idx = 0; l_idx < max_depth; l_idx++) { + auto& out_buf = (*cols)[input_col.nesting[l_idx]]; + cols = &out_buf.children; + + // if this has a list parent, we have to get column sizes from the + // data computed during gpu::ComputePageSizes + if (out_buf.user_data & PARQUET_COLUMN_BUFFER_FLAG_HAS_LIST_PARENT) { + has_lists = true; + break; + } + } + if (has_lists) { break; } + } + + // generate string dict indices if necessary + { + auto is_dict_chunk = [](const gpu::ColumnChunkDesc& chunk) { + return (chunk.data_type & 0x7) == BYTE_ARRAY && chunk.num_dict_pages > 0; + }; + + // Count the number of string dictionary entries + // NOTE: Assumes first page in the chunk is always the dictionary page + size_t total_str_dict_indexes = 0; + for (size_t c = 0, page_count = 0; c < chunks.size(); c++) { + if (is_dict_chunk(chunks[c])) { + total_str_dict_indexes += pages[page_count].num_input_values; + } + page_count += chunks[c].max_num_pages; + } + + // Build index for string dictionaries since they can't be indexed + // directly due to variable-sized elements + _chunk_itm_data.str_dict_index = + cudf::detail::make_zeroed_device_uvector_async(total_str_dict_indexes, + _stream); + + // Update chunks with pointers to string dict indices + for (size_t c = 0, page_count = 0, str_ofs = 0; c < chunks.size(); c++) { + input_column_info const& input_col = _input_columns[chunks[c].src_col_index]; + CUDF_EXPECTS(input_col.schema_idx == chunks[c].src_col_schema, + "Column/page schema index mismatch"); + if (is_dict_chunk(chunks[c])) { + chunks[c].str_dict_index = _chunk_itm_data.str_dict_index.data() + str_ofs; + str_ofs += pages[page_count].num_input_values; + } + + // column_data_base will always point to leaf data, even for nested types. + page_count += chunks[c].max_num_pages; + } + + if (total_str_dict_indexes > 0) { + chunks.host_to_device(_stream); + gpu::BuildStringDictionaryIndex(chunks.device_ptr(), chunks.size(), _stream); + } + } + + // intermediate data we will need for further chunked reads + if (has_lists || chunk_read_limit > 0) { + // computes: + // PageNestingInfo::num_rows for each page. the true number of rows (taking repetition into + // account), not just the number of values. PageNestingInfo::size for each level of nesting, for + // each page. + // + // we will be applying a later "trim" pass if skip_rows/num_rows is being used, which can happen + // if: + // - user has passed custom row bounds + // - we will be doing a chunked read + gpu::ComputePageSizes(pages, + chunks, + 0, // 0-max size_t. process all possible rows + std::numeric_limits::max(), + true, // compute num_rows + chunk_read_limit > 0, // compute string sizes + _stream); + + // computes: + // PageInfo::chunk_row (the absolute start row index) for all pages + // Note: this is doing some redundant work for pages in flat hierarchies. chunk_row has already + // been computed during header decoding. the overall amount of work here is very small though. + auto key_input = thrust::make_transform_iterator(pages.device_ptr(), get_page_chunk_idx{}); + auto page_input = thrust::make_transform_iterator(pages.device_ptr(), get_page_num_rows{}); + thrust::exclusive_scan_by_key(rmm::exec_policy(_stream), + key_input, + key_input + pages.size(), + page_input, + chunk_row_output_iter{pages.device_ptr()}); + + // compute page ordering. + // + // ordering of pages is by input column schema, repeated across row groups. so + // if we had 3 columns, each with 2 pages, and 1 row group, our schema values might look like + // + // 1, 1, 2, 2, 3, 3 + // + // However, if we had more than one row group, the pattern would be + // + // 1, 1, 2, 2, 3, 3, 1, 1, 2, 2, 3, 3 + // ^ row group 0 | + // ^ row group 1 + // + // To use exclusive_scan_by_key, the ordering we actually want is + // + // 1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3 + // + // We also need to preserve key-relative page ordering, so we need to use a stable sort. + _chunk_itm_data.page_keys = rmm::device_uvector(pages.size(), _stream); + _chunk_itm_data.page_index = rmm::device_uvector(pages.size(), _stream); + auto& page_keys = _chunk_itm_data.page_keys; + auto& page_index = _chunk_itm_data.page_index; + { + thrust::transform(rmm::exec_policy(_stream), + pages.device_ptr(), + pages.device_ptr() + pages.size(), + page_keys.begin(), + get_page_schema{}); + + thrust::sequence(rmm::exec_policy(_stream), page_index.begin(), page_index.end()); + thrust::stable_sort_by_key(rmm::exec_policy(_stream), + page_keys.begin(), + page_keys.end(), + page_index.begin(), + thrust::less()); + } + + // retrieve pages back + pages.device_to_host(_stream, true); + +#if defined(PREPROCESS_DEBUG) + print_pages(pages, _stream); +#endif + } + + // compute splits if necessary. otherwise retun a single split representing + // the whole file. + _chunk_read_info = chunk_read_limit > 0 + ? compute_splits(pages, _chunk_itm_data, num_rows, chunk_read_limit, _stream) + : std::vector{{skip_rows, num_rows}}; +} + +void reader::impl::allocate_columns(size_t skip_rows, size_t num_rows, bool uses_custom_row_bounds) { auto const& chunks = _file_itm_data.chunks; auto& pages = _file_itm_data.pages_info; + // Should not reach here if there is no page data. + CUDF_EXPECTS(pages.size() > 0, "There is no page to parse"); + + // computes: + // PageNestingInfo::batch_size for each level of nesting, for each page, taking row bounds into + // account. PageInfo::skipped_values, which tells us where to start decoding in the input to + // respect the user bounds. It is only necessary to do this second pass if uses_custom_row_bounds + // is set (if the user has specified artifical bounds). + if (uses_custom_row_bounds) { + gpu::ComputePageSizes(pages, + chunks, + skip_rows, + num_rows, + false, // num_rows is already computed + false, // no need to compute string sizes + _stream); +#if defined(PREPROCESS_DEBUG) + print_pages(pages, _stream); +#endif + } + // iterate over all input columns and allocate any associated output // buffers if they are not part of a list hierarchy. mark down // if we have any list columns that need further processing. @@ -780,8 +1455,8 @@ void reader::impl::allocate_columns(size_t min_row, size_t total_rows, bool uses auto& out_buf = (*cols)[input_col.nesting[l_idx]]; cols = &out_buf.children; - // if this has a list parent, we will have to do further work in gpu::PreprocessColumnData - // to know how big this buffer actually is. + // if this has a list parent, we have to get column sizes from the + // data computed during gpu::ComputePageSizes if (out_buf.user_data & PARQUET_COLUMN_BUFFER_FLAG_HAS_LIST_PARENT) { has_lists = true; } @@ -789,25 +1464,63 @@ void reader::impl::allocate_columns(size_t min_row, size_t total_rows, bool uses else if (out_buf.size == 0) { // add 1 for the offset if this is a list column out_buf.create( - out_buf.type.id() == type_id::LIST && l_idx < max_depth ? total_rows + 1 : total_rows, + out_buf.type.id() == type_id::LIST && l_idx < max_depth ? num_rows + 1 : num_rows, _stream, _mr); } } } - // if we have columns containing lists, further preprocessing is necessary. + // compute output column sizes by examining the pages of the -input- columns if (has_lists) { - gpu::PreprocessColumnData(pages, - chunks, - _input_columns, - _output_buffers, - total_rows, - min_row, - uses_custom_row_bounds, - _stream, - _mr); - _stream.synchronize(); + auto& page_keys = _chunk_itm_data.page_keys; + auto& page_index = _chunk_itm_data.page_index; + for (size_t idx = 0; idx < _input_columns.size(); idx++) { + auto const& input_col = _input_columns[idx]; + auto src_col_schema = input_col.schema_idx; + size_t max_depth = input_col.nesting_depth(); + + auto* cols = &_output_buffers; + for (size_t l_idx = 0; l_idx < input_col.nesting_depth(); l_idx++) { + auto& out_buf = (*cols)[input_col.nesting[l_idx]]; + cols = &out_buf.children; + + // size iterator. indexes pages by sorted order + auto size_input = thrust::make_transform_iterator( + page_index.begin(), + get_page_nesting_size{src_col_schema, static_cast(l_idx), pages.device_ptr()}); + + // if this buffer is part of a list hierarchy, we need to determine it's + // final size and allocate it here. + // + // for struct columns, higher levels of the output columns are shared between input + // columns. so don't compute any given level more than once. + if ((out_buf.user_data & PARQUET_COLUMN_BUFFER_FLAG_HAS_LIST_PARENT) && out_buf.size == 0) { + int size = + thrust::reduce(rmm::exec_policy(_stream), size_input, size_input + pages.size()); + + // if this is a list column add 1 for non-leaf levels for the terminating offset + if (out_buf.type.id() == type_id::LIST && l_idx < max_depth) { size++; } + + // allocate + out_buf.create(size, _stream, _mr); + } + + // for nested hierarchies, compute per-page start offset + if (input_col.has_repetition) { + thrust::exclusive_scan_by_key( + rmm::exec_policy(_stream), + page_keys.begin(), + page_keys.end(), + size_input, + start_offset_output_iterator{pages.device_ptr(), + page_index.begin(), + 0, + static_cast(src_col_schema), + static_cast(l_idx)}); + } + } + } } } diff --git a/cpp/src/io/utilities/column_buffer.cpp b/cpp/src/io/utilities/column_buffer.cpp index de145486662..89ba5c598e8 100644 --- a/cpp/src/io/utilities/column_buffer.cpp +++ b/cpp/src/io/utilities/column_buffer.cpp @@ -55,6 +55,33 @@ void column_buffer::create(size_type _size, } } +namespace { + +/** + * @brief Recursively copy `name` and `user_data` fields of one buffer to another. + * + * @param buff The old output buffer + * @param new_buff The new output buffer + */ +void copy_buffer_data(column_buffer const& buff, column_buffer& new_buff) +{ + new_buff.name = buff.name; + new_buff.user_data = buff.user_data; + for (auto const& child : buff.children) { + auto& new_child = new_buff.children.emplace_back(column_buffer(child.type, child.is_nullable)); + copy_buffer_data(child, new_child); + } +} + +} // namespace + +column_buffer column_buffer::empty_like(column_buffer const& input) +{ + auto new_buff = column_buffer(input.type, input.is_nullable); + copy_buffer_data(input, new_buff); + return new_buff; +} + /** * @copydoc cudf::io::detail::make_column */ diff --git a/cpp/src/io/utilities/column_buffer.hpp b/cpp/src/io/utilities/column_buffer.hpp index 8ae3d39a3ba..8f181157fae 100644 --- a/cpp/src/io/utilities/column_buffer.hpp +++ b/cpp/src/io/utilities/column_buffer.hpp @@ -104,10 +104,14 @@ struct column_buffer { { return static_cast(_null_mask.data()); } - auto null_mask_size() { return _null_mask.size(); }; + auto null_mask_size() { return _null_mask.size(); } auto& null_count() { return _null_count; } + // Create a new column_buffer that has empty data but with the same basic information as the + // input column, including same type, nullability, name, and user_data. + static column_buffer empty_like(column_buffer const& input); + std::unique_ptr> _strings; rmm::device_buffer _data{}; rmm::device_buffer _null_mask{}; diff --git a/cpp/tests/CMakeLists.txt b/cpp/tests/CMakeLists.txt index c602ccc7374..bdf74368ffe 100644 --- a/cpp/tests/CMakeLists.txt +++ b/cpp/tests/CMakeLists.txt @@ -223,7 +223,7 @@ ConfigureTest(DECOMPRESSION_TEST io/comp/decomp_test.cpp) ConfigureTest(CSV_TEST io/csv_test.cpp) ConfigureTest(FILE_IO_TEST io/file_io_test.cpp) ConfigureTest(ORC_TEST io/orc_test.cpp) -ConfigureTest(PARQUET_TEST io/parquet_test.cpp) +ConfigureTest(PARQUET_TEST io/parquet_test.cpp io/parquet_chunked_reader_test.cpp) ConfigureTest(JSON_TEST io/json_test.cpp io/json_chunked_reader.cpp) ConfigureTest(JSON_TYPE_CAST_TEST io/json_type_cast_test.cu) ConfigureTest(NESTED_JSON_TEST io/nested_json_test.cpp io/json_tree.cpp) diff --git a/cpp/tests/io/parquet_chunked_reader_test.cpp b/cpp/tests/io/parquet_chunked_reader_test.cpp new file mode 100644 index 00000000000..76a65857e6f --- /dev/null +++ b/cpp/tests/io/parquet_chunked_reader_test.cpp @@ -0,0 +1,887 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include + +#include + +#include +#include + +namespace { +// Global environment for temporary files +auto const temp_env = static_cast( + ::testing::AddGlobalTestEnvironment(new cudf::test::TempDirTestEnvironment)); + +using int32s_col = cudf::test::fixed_width_column_wrapper; +using int64s_col = cudf::test::fixed_width_column_wrapper; +using strings_col = cudf::test::strings_column_wrapper; +using structs_col = cudf::test::structs_column_wrapper; +using int32s_lists_col = cudf::test::lists_column_wrapper; + +auto write_file(std::vector>& input_columns, + std::string const& filename, + bool nullable, + std::size_t max_page_size_bytes = cudf::io::default_max_page_size_bytes, + std::size_t max_page_size_rows = cudf::io::default_max_page_size_rows) +{ + // Just shift nulls of the next column by one position to avoid having all nulls in the same + // table rows. + if (nullable) { + // Generate deterministic bitmask instead of random bitmask for easy computation of data size. + auto const valid_iter = cudf::detail::make_counting_transform_iterator( + 0, [](cudf::size_type i) { return i % 4 != 3; }); + + cudf::size_type offset{0}; + for (auto& col : input_columns) { + auto const col_typeid = col->type().id(); + col->set_null_mask( + cudf::test::detail::make_null_mask(valid_iter + offset, valid_iter + col->size() + offset)); + + if (col_typeid == cudf::type_id::STRUCT) { + auto const null_mask = col->view().null_mask(); + auto const null_count = col->null_count(); + + for (cudf::size_type idx = 0; idx < col->num_children(); ++idx) { + cudf::structs::detail::superimpose_parent_nulls(null_mask, + null_count, + col->child(idx), + cudf::get_default_stream(), + rmm::mr::get_current_device_resource()); + } + } + + if (col_typeid == cudf::type_id::LIST || col_typeid == cudf::type_id::STRUCT || + col_typeid == cudf::type_id::STRING) { + col = cudf::purge_nonempty_nulls(col->view()); + } + } + } + + auto input_table = std::make_unique(std::move(input_columns)); + auto filepath = + temp_env->get_temp_filepath(nullable ? filename + "_nullable.parquet" : filename + ".parquet"); + + auto const write_opts = + cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, *input_table) + .max_page_size_bytes(max_page_size_bytes) + .max_page_size_rows(max_page_size_rows) + .build(); + cudf::io::write_parquet(write_opts); + + return std::pair{std::move(input_table), std::move(filepath)}; +} + +auto chunked_read(std::string const& filepath, std::size_t byte_limit) +{ + auto const read_opts = + cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath}).build(); + auto reader = cudf::io::chunked_parquet_reader(byte_limit, read_opts); + + auto num_chunks = 0; + auto out_tables = std::vector>{}; + + do { + auto chunk = reader.read_chunk(); + // If the input file is empty, the first call to `read_chunk` will return an empty table. + // Thus, we only check for non-empty output table from the second call. + if (num_chunks > 0) { + CUDF_EXPECTS(chunk.tbl->num_rows() != 0, "Number of rows in the new chunk is zero."); + } + ++num_chunks; + out_tables.emplace_back(std::move(chunk.tbl)); + } while (reader.has_next()); + + auto out_tviews = std::vector{}; + for (auto const& tbl : out_tables) { + out_tviews.emplace_back(tbl->view()); + } + + return std::pair(cudf::concatenate(out_tviews), num_chunks); +} + +} // namespace + +struct ParquetChunkedReaderTest : public cudf::test::BaseFixture { +}; + +TEST_F(ParquetChunkedReaderTest, TestChunkedReadNoData) +{ + std::vector> input_columns; + input_columns.emplace_back(int32s_col{}.release()); + input_columns.emplace_back(int64s_col{}.release()); + + auto const [expected, filepath] = write_file(input_columns, "chunked_read_empty", false); + auto const [result, num_chunks] = chunked_read(filepath, 1'000); + EXPECT_EQ(num_chunks, 1); + EXPECT_EQ(result->num_rows(), 0); + EXPECT_EQ(result->num_columns(), 2); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected, *result); +} + +TEST_F(ParquetChunkedReaderTest, TestChunkedReadSimpleData) +{ + auto constexpr num_rows = 40'000; + + auto const generate_input = [num_rows](bool nullable) { + std::vector> input_columns; + auto const value_iter = thrust::make_counting_iterator(0); + input_columns.emplace_back(int32s_col(value_iter, value_iter + num_rows).release()); + input_columns.emplace_back(int64s_col(value_iter, value_iter + num_rows).release()); + + return write_file(input_columns, "chunked_read_simple", nullable); + }; + + { + auto const [expected, filepath] = generate_input(false); + auto const [result, num_chunks] = chunked_read(filepath, 240'000); + EXPECT_EQ(num_chunks, 2); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected, *result); + } + + { + auto const [expected, filepath] = generate_input(true); + auto const [result, num_chunks] = chunked_read(filepath, 240'000); + EXPECT_EQ(num_chunks, 2); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected, *result); + } +} + +TEST_F(ParquetChunkedReaderTest, TestChunkedReadBoundaryCases) +{ + // Tests some specific boundary conditions in the split calculations. + + auto constexpr num_rows = 40'000; + + auto const [expected, filepath] = [num_rows]() { + std::vector> input_columns; + auto const value_iter = thrust::make_counting_iterator(0); + input_columns.emplace_back(int32s_col(value_iter, value_iter + num_rows).release()); + return write_file(input_columns, "chunked_read_simple_boundary", false /*nullable*/); + }(); + + // Test with zero limit: everything will be read in one chunk + { + auto const [result, num_chunks] = chunked_read(filepath, 0); + EXPECT_EQ(num_chunks, 1); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected, *result); + } + + // Test with a very small limit: 1 byte + { + auto const [result, num_chunks] = chunked_read(filepath, 1); + EXPECT_EQ(num_chunks, 2); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected, *result); + } + + // Test with a very large limit + { + auto const [result, num_chunks] = chunked_read(filepath, 2L << 40); + EXPECT_EQ(num_chunks, 1); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected, *result); + } + + // Test with a limit slightly less than one page of data + { + auto const [result, num_chunks] = chunked_read(filepath, 79'000); + EXPECT_EQ(num_chunks, 2); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected, *result); + } + + // Test with a limit exactly the size one page of data + { + auto const [result, num_chunks] = chunked_read(filepath, 80'000); + EXPECT_EQ(num_chunks, 2); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected, *result); + } + + // Test with a limit slightly more the size one page of data + { + auto const [result, num_chunks] = chunked_read(filepath, 81'000); + EXPECT_EQ(num_chunks, 2); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected, *result); + } + + // Test with a limit slightly less than two pages of data + { + auto const [result, num_chunks] = chunked_read(filepath, 159'000); + EXPECT_EQ(num_chunks, 2); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected, *result); + } + + // Test with a limit exactly the size of two pages of data minus one byte + { + auto const [result, num_chunks] = chunked_read(filepath, 159'999); + EXPECT_EQ(num_chunks, 2); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected, *result); + } + + // Test with a limit exactly the size of two pages of data + { + auto const [result, num_chunks] = chunked_read(filepath, 160'000); + EXPECT_EQ(num_chunks, 1); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected, *result); + } + + // Test with a limit slightly more the size two pages of data + { + auto const [result, num_chunks] = chunked_read(filepath, 161'000); + EXPECT_EQ(num_chunks, 1); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected, *result); + } +} + +TEST_F(ParquetChunkedReaderTest, TestChunkedReadWithString) +{ + auto constexpr num_rows = 60'000; + + auto const generate_input = [num_rows](bool nullable) { + std::vector> input_columns; + auto const value_iter = thrust::make_counting_iterator(0); + + // ints Page total bytes cumulative bytes + // 20000 rows of 4 bytes each = A0 80000 80000 + // 20000 rows of 4 bytes each = A1 80000 160000 + // 20000 rows of 4 bytes each = A2 80000 240000 + input_columns.emplace_back(int32s_col(value_iter, value_iter + num_rows).release()); + + // strings Page total bytes cumulative bytes + // 20000 rows of 1 char each (20000 + 80004) = B0 100004 100004 + // 20000 rows of 4 chars each (80000 + 80004) = B1 160004 260008 + // 20000 rows of 16 chars each (320000 + 80004) = B2 400004 660012 + auto const strings = std::vector{"a", "bbbb", "cccccccccccccccc"}; + auto const str_iter = cudf::detail::make_counting_transform_iterator(0, [&](int32_t i) { + if (i < 20000) { return strings[0]; } + if (i < 40000) { return strings[1]; } + return strings[2]; + }); + input_columns.emplace_back(strings_col(str_iter, str_iter + num_rows).release()); + + // Cumulative sizes: + // A0 + B0 : 180004 + // A1 + B1 : 420008 + // A2 + B2 : 900012 + // skip_rows / num_rows + // byte_limit==500000 should give 2 chunks: {0, 40000}, {40000, 20000} + // byte_limit==1000000 should give 1 chunks: {0, 60000}, + return write_file(input_columns, + "chunked_read_with_strings", + nullable, + 512 * 1024, // 512KB per page + 20000 // 20k rows per page + ); + }; + + auto const [expected_no_null, filepath_no_null] = generate_input(false); + auto const [expected_with_nulls, filepath_with_nulls] = generate_input(true); + + // Test with zero limit: everything will be read in one chunk + { + auto const [result, num_chunks] = chunked_read(filepath_no_null, 0); + EXPECT_EQ(num_chunks, 1); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_no_null, *result); + } + { + auto const [result, num_chunks] = chunked_read(filepath_with_nulls, 0); + EXPECT_EQ(num_chunks, 1); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_with_nulls, *result); + } + + // Test with a very small limit: 1 byte + { + auto const [result, num_chunks] = chunked_read(filepath_no_null, 1); + EXPECT_EQ(num_chunks, 3); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_no_null, *result); + } + { + auto const [result, num_chunks] = chunked_read(filepath_with_nulls, 1); + EXPECT_EQ(num_chunks, 3); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_with_nulls, *result); + } + + // Test with a very large limit + { + auto const [result, num_chunks] = chunked_read(filepath_no_null, 2L << 40); + EXPECT_EQ(num_chunks, 1); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_no_null, *result); + } + { + auto const [result, num_chunks] = chunked_read(filepath_with_nulls, 2L << 40); + EXPECT_EQ(num_chunks, 1); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_with_nulls, *result); + } + + // Other tests: + + { + auto const [result, num_chunks] = chunked_read(filepath_no_null, 500'000); + EXPECT_EQ(num_chunks, 2); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_no_null, *result); + } + { + auto const [result, num_chunks] = chunked_read(filepath_with_nulls, 500'000); + EXPECT_EQ(num_chunks, 2); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_with_nulls, *result); + } + + { + auto const [result, num_chunks] = chunked_read(filepath_no_null, 1'000'000); + EXPECT_EQ(num_chunks, 1); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_no_null, *result); + } + { + auto const [result, num_chunks] = chunked_read(filepath_with_nulls, 1'000'000); + EXPECT_EQ(num_chunks, 1); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_with_nulls, *result); + } +} + +TEST_F(ParquetChunkedReaderTest, TestChunkedReadWithStructs) +{ + auto constexpr num_rows = 100'000; + + auto const generate_input = [num_rows](bool nullable) { + std::vector> input_columns; + auto const int_iter = thrust::make_counting_iterator(0); + input_columns.emplace_back(int32s_col(int_iter, int_iter + num_rows).release()); + input_columns.emplace_back([=] { + auto child1 = int32s_col(int_iter, int_iter + num_rows); + auto child2 = int32s_col(int_iter + num_rows, int_iter + num_rows * 2); + + auto const str_iter = cudf::detail::make_counting_transform_iterator( + 0, [&](int32_t i) { return std::to_string(i); }); + auto child3 = strings_col{str_iter, str_iter + num_rows}; + + return structs_col{{child1, child2, child3}}.release(); + }()); + + return write_file(input_columns, + "chunked_read_with_structs", + nullable, + 512 * 1024, // 512KB per page + 20000 // 20k rows per page + ); + }; + + auto const [expected_no_null, filepath_no_null] = generate_input(false); + auto const [expected_with_nulls, filepath_with_nulls] = generate_input(true); + + // Test with zero limit: everything will be read in one chunk + { + auto const [result, num_chunks] = chunked_read(filepath_no_null, 0); + EXPECT_EQ(num_chunks, 1); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_no_null, *result); + } + { + auto const [result, num_chunks] = chunked_read(filepath_with_nulls, 0); + EXPECT_EQ(num_chunks, 1); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_with_nulls, *result); + } + + // Test with a very small limit: 1 byte + { + auto const [result, num_chunks] = chunked_read(filepath_no_null, 1); + EXPECT_EQ(num_chunks, 5); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_no_null, *result); + } + { + auto const [result, num_chunks] = chunked_read(filepath_with_nulls, 1); + EXPECT_EQ(num_chunks, 5); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_with_nulls, *result); + } + + // Test with a very large limit + { + auto const [result, num_chunks] = chunked_read(filepath_no_null, 2L << 40); + EXPECT_EQ(num_chunks, 1); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_no_null, *result); + } + { + auto const [result, num_chunks] = chunked_read(filepath_with_nulls, 2L << 40); + EXPECT_EQ(num_chunks, 1); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_with_nulls, *result); + } + + // Other tests: + + { + auto const [result, num_chunks] = chunked_read(filepath_no_null, 500'000); + EXPECT_EQ(num_chunks, 5); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_no_null, *result); + } + { + auto const [result, num_chunks] = chunked_read(filepath_with_nulls, 500'000); + EXPECT_EQ(num_chunks, 5); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_with_nulls, *result); + } +} + +TEST_F(ParquetChunkedReaderTest, TestChunkedReadWithListsNoNulls) +{ + auto constexpr num_rows = 100'000; + + auto const [expected, filepath] = [num_rows]() { + std::vector> input_columns; + // 20000 rows in 1 page consist of: + // + // 20001 offsets : 80004 bytes + // 30000 ints : 120000 bytes + // total : 200004 bytes + auto const template_lists = int32s_lists_col{ + int32s_lists_col{}, int32s_lists_col{0}, int32s_lists_col{1, 2}, int32s_lists_col{3, 4, 5}}; + + auto const gather_iter = + cudf::detail::make_counting_transform_iterator(0, [&](int32_t i) { return i % 4; }); + auto const gather_map = int32s_col(gather_iter, gather_iter + num_rows); + input_columns.emplace_back( + std::move(cudf::gather(cudf::table_view{{template_lists}}, gather_map)->release().front())); + + return write_file(input_columns, + "chunked_read_with_lists_no_null", + false /*nullable*/, + 512 * 1024, // 512KB per page + 20000 // 20k rows per page + ); + }(); + + // Test with zero limit: everything will be read in one chunk + { + auto const [result, num_chunks] = chunked_read(filepath, 0); + EXPECT_EQ(num_chunks, 1); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected, *result); + } + + // Test with a very small limit: 1 byte + { + auto const [result, num_chunks] = chunked_read(filepath, 1); + EXPECT_EQ(num_chunks, 5); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected, *result); + } + + // Test with a very large limit + { + auto const [result, num_chunks] = chunked_read(filepath, 2L << 40); + EXPECT_EQ(num_chunks, 1); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected, *result); + } + + // chunk size slightly less than 1 page (forcing it to be at least 1 page per read) + { + auto const [result, num_chunks] = chunked_read(filepath, 200'000); + EXPECT_EQ(num_chunks, 5); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected, *result); + } + + // chunk size exactly 1 page + { + auto const [result, num_chunks] = chunked_read(filepath, 200'004); + EXPECT_EQ(num_chunks, 5); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected, *result); + } + + // chunk size 2 pages. 3 chunks (2 pages + 2 pages + 1 page) + { + auto const [result, num_chunks] = chunked_read(filepath, 400'008); + EXPECT_EQ(num_chunks, 3); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected, *result); + } + + // chunk size 2 pages minus one byte: each chunk will be just one page + { + auto const [result, num_chunks] = chunked_read(filepath, 400'007); + EXPECT_EQ(num_chunks, 5); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected, *result); + } +} + +TEST_F(ParquetChunkedReaderTest, TestChunkedReadWithListsHavingNulls) +{ + auto constexpr num_rows = 100'000; + + auto const [expected, filepath] = [num_rows]() { + std::vector> input_columns; + // 20000 rows in 1 page consist of: + // + // 625 validity words : 2500 bytes (a null every 4 rows: null at indices [3, 7, 11, ...]) + // 20001 offsets : 80004 bytes + // 15000 ints : 60000 bytes + // total : 142504 bytes + auto const template_lists = + int32s_lists_col{// these will all be null + int32s_lists_col{}, + int32s_lists_col{0}, + int32s_lists_col{1, 2}, + int32s_lists_col{3, 4, 5, 6, 7, 8, 9} /* this list will be nullified out */}; + auto const gather_iter = + cudf::detail::make_counting_transform_iterator(0, [&](int32_t i) { return i % 4; }); + auto const gather_map = int32s_col(gather_iter, gather_iter + num_rows); + input_columns.emplace_back( + std::move(cudf::gather(cudf::table_view{{template_lists}}, gather_map)->release().front())); + + return write_file(input_columns, + "chunked_read_with_lists_nulls", + true /*nullable*/, + 512 * 1024, // 512KB per page + 20000 // 20k rows per page + ); + }(); + + // Test with zero limit: everything will be read in one chunk + { + auto const [result, num_chunks] = chunked_read(filepath, 0); + EXPECT_EQ(num_chunks, 1); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected, *result); + } + + // Test with a very small limit: 1 byte + { + auto const [result, num_chunks] = chunked_read(filepath, 1); + EXPECT_EQ(num_chunks, 5); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected, *result); + } + + // Test with a very large limit + { + auto const [result, num_chunks] = chunked_read(filepath, 2L << 40); + EXPECT_EQ(num_chunks, 1); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected, *result); + } + + // chunk size slightly less than 1 page (forcing it to be at least 1 page per read) + { + auto const [result, num_chunks] = chunked_read(filepath, 142'500); + EXPECT_EQ(num_chunks, 5); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected, *result); + } + + // chunk size exactly 1 page + { + auto const [result, num_chunks] = chunked_read(filepath, 142'504); + EXPECT_EQ(num_chunks, 5); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected, *result); + } + + // chunk size 2 pages. 3 chunks (2 pages + 2 pages + 1 page) + { + auto const [result, num_chunks] = chunked_read(filepath, 285'008); + EXPECT_EQ(num_chunks, 3); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected, *result); + } + + // chunk size 2 pages minus 1 byte: each chunk will be just one page + { + auto const [result, num_chunks] = chunked_read(filepath, 285'007); + EXPECT_EQ(num_chunks, 5); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected, *result); + } +} + +TEST_F(ParquetChunkedReaderTest, TestChunkedReadWithStructsOfLists) +{ + auto constexpr num_rows = 100'000; + + auto const generate_input = [num_rows](bool nullable) { + std::vector> input_columns; + auto const int_iter = thrust::make_counting_iterator(0); + input_columns.emplace_back(int32s_col(int_iter, int_iter + num_rows).release()); + input_columns.emplace_back([=] { + std::vector> child_columns; + child_columns.emplace_back(int32s_col(int_iter, int_iter + num_rows).release()); + child_columns.emplace_back( + int32s_col(int_iter + num_rows, int_iter + num_rows * 2).release()); + + auto const str_iter = cudf::detail::make_counting_transform_iterator(0, [&](int32_t i) { + return std::to_string(i) + "++++++++++++++++++++" + std::to_string(i); + }); + child_columns.emplace_back(strings_col{str_iter, str_iter + num_rows}.release()); + + auto const template_lists = int32s_lists_col{ + int32s_lists_col{}, int32s_lists_col{0}, int32s_lists_col{0, 1}, int32s_lists_col{0, 1, 2}}; + auto const gather_iter = + cudf::detail::make_counting_transform_iterator(0, [&](int32_t i) { return i % 4; }); + auto const gather_map = int32s_col(gather_iter, gather_iter + num_rows); + child_columns.emplace_back( + std::move(cudf::gather(cudf::table_view{{template_lists}}, gather_map)->release().front())); + + return structs_col(std::move(child_columns)).release(); + }()); + + return write_file(input_columns, + "chunked_read_with_structs_of_lists", + nullable, + 512 * 1024, // 512KB per page + 20000 // 20k rows per page + ); + }; + + auto const [expected_no_null, filepath_no_null] = generate_input(false); + auto const [expected_with_nulls, filepath_with_nulls] = generate_input(true); + + // Test with zero limit: everything will be read in one chunk + { + auto const [result, num_chunks] = chunked_read(filepath_no_null, 0); + EXPECT_EQ(num_chunks, 1); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_no_null, *result); + } + { + auto const [result, num_chunks] = chunked_read(filepath_with_nulls, 0); + EXPECT_EQ(num_chunks, 1); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_with_nulls, *result); + } + + // Test with a very small limit: 1 byte + { + auto const [result, num_chunks] = chunked_read(filepath_no_null, 1); + EXPECT_EQ(num_chunks, 10); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_no_null, *result); + } + { + auto const [result, num_chunks] = chunked_read(filepath_with_nulls, 1); + EXPECT_EQ(num_chunks, 5); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_with_nulls, *result); + } + + // Test with a very large limit + { + auto const [result, num_chunks] = chunked_read(filepath_no_null, 2L << 40); + EXPECT_EQ(num_chunks, 1); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_no_null, *result); + } + { + auto const [result, num_chunks] = chunked_read(filepath_with_nulls, 2L << 40); + EXPECT_EQ(num_chunks, 1); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_with_nulls, *result); + } + + // Other tests: + + // for these tests, different columns get written to different numbers of pages so it's a + // little tricky to describe the expected results by page counts. To get an idea of how + // these values are chosen, see the debug output from the call to print_cumulative_row_info() in + // reader_impl_preprocess.cu -> find_splits() + + { + auto const [result, num_chunks] = chunked_read(filepath_no_null, 1'000'000); + EXPECT_EQ(num_chunks, 7); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_no_null, *result); + } + + { + auto const [result, num_chunks] = chunked_read(filepath_no_null, 1'500'000); + EXPECT_EQ(num_chunks, 4); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_no_null, *result); + } + + { + auto const [result, num_chunks] = chunked_read(filepath_no_null, 2'000'000); + EXPECT_EQ(num_chunks, 4); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_no_null, *result); + } + + { + auto const [result, num_chunks] = chunked_read(filepath_no_null, 5'000'000); + EXPECT_EQ(num_chunks, 2); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_no_null, *result); + } + + { + auto const [result, num_chunks] = chunked_read(filepath_with_nulls, 1'000'000); + EXPECT_EQ(num_chunks, 5); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_with_nulls, *result); + } + + { + auto const [result, num_chunks] = chunked_read(filepath_with_nulls, 1'500'000); + EXPECT_EQ(num_chunks, 5); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_with_nulls, *result); + } + + { + auto const [result, num_chunks] = chunked_read(filepath_with_nulls, 2'000'000); + EXPECT_EQ(num_chunks, 3); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_with_nulls, *result); + } + + { + auto const [result, num_chunks] = chunked_read(filepath_with_nulls, 5'000'000); + EXPECT_EQ(num_chunks, 1); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_with_nulls, *result); + } +} + +TEST_F(ParquetChunkedReaderTest, TestChunkedReadWithListsOfStructs) +{ + auto constexpr num_rows = 100'000; + + auto const generate_input = [num_rows](bool nullable) { + std::vector> input_columns; + auto const int_iter = thrust::make_counting_iterator(0); + input_columns.emplace_back(int32s_col(int_iter, int_iter + num_rows).release()); + + auto offsets = std::vector{}; + offsets.reserve(num_rows * 2); + cudf::size_type num_structs = 0; + for (int i = 0; i < num_rows; ++i) { + offsets.push_back(num_structs); + auto const new_list_size = i % 4; + num_structs += new_list_size; + } + offsets.push_back(num_structs); + + auto const make_structs_col = [=] { + auto child1 = int32s_col(int_iter, int_iter + num_structs); + auto child2 = int32s_col(int_iter + num_structs, int_iter + num_structs * 2); + + auto const str_iter = cudf::detail::make_counting_transform_iterator( + 0, [&](int32_t i) { return std::to_string(i) + std::to_string(i) + std::to_string(i); }); + auto child3 = strings_col{str_iter, str_iter + num_structs}; + + return structs_col{{child1, child2, child3}}.release(); + }; + + input_columns.emplace_back( + cudf::make_lists_column(static_cast(offsets.size() - 1), + int32s_col(offsets.begin(), offsets.end()).release(), + make_structs_col(), + 0, + rmm::device_buffer{})); + + return write_file(input_columns, + "chunked_read_with_lists_of_structs", + nullable, + 512 * 1024, // 512KB per page + 20000 // 20k rows per page + ); + }; + + auto const [expected_no_null, filepath_no_null] = generate_input(false); + auto const [expected_with_nulls, filepath_with_nulls] = generate_input(true); + + // Test with zero limit: everything will be read in one chunk + { + auto const [result, num_chunks] = chunked_read(filepath_no_null, 0); + EXPECT_EQ(num_chunks, 1); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_no_null, *result); + } + { + auto const [result, num_chunks] = chunked_read(filepath_with_nulls, 0); + EXPECT_EQ(num_chunks, 1); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_with_nulls, *result); + } + + // Test with a very small limit: 1 byte + { + auto const [result, num_chunks] = chunked_read(filepath_no_null, 1); + EXPECT_EQ(num_chunks, 10); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_no_null, *result); + } + { + auto const [result, num_chunks] = chunked_read(filepath_with_nulls, 1); + EXPECT_EQ(num_chunks, 5); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_with_nulls, *result); + } + + // Test with a very large limit + { + auto const [result, num_chunks] = chunked_read(filepath_no_null, 2L << 40); + EXPECT_EQ(num_chunks, 1); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_no_null, *result); + } + { + auto const [result, num_chunks] = chunked_read(filepath_with_nulls, 2L << 40); + EXPECT_EQ(num_chunks, 1); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_with_nulls, *result); + } + + // for these tests, different columns get written to different numbers of pages so it's a + // little tricky to describe the expected results by page counts. To get an idea of how + // these values are chosen, see the debug output from the call to print_cumulative_row_info() in + // reader_impl_preprocess.cu -> find_splits() + { + auto const [result, num_chunks] = chunked_read(filepath_no_null, 1'000'000); + EXPECT_EQ(num_chunks, 7); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_no_null, *result); + } + + { + auto const [result, num_chunks] = chunked_read(filepath_no_null, 1'500'000); + EXPECT_EQ(num_chunks, 4); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_no_null, *result); + } + + { + auto const [result, num_chunks] = chunked_read(filepath_no_null, 2'000'000); + EXPECT_EQ(num_chunks, 4); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_no_null, *result); + } + + { + auto const [result, num_chunks] = chunked_read(filepath_no_null, 5'000'000); + EXPECT_EQ(num_chunks, 2); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_no_null, *result); + } + + { + auto const [result, num_chunks] = chunked_read(filepath_with_nulls, 1'000'000); + EXPECT_EQ(num_chunks, 5); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_with_nulls, *result); + } + + { + auto const [result, num_chunks] = chunked_read(filepath_with_nulls, 1'500'000); + EXPECT_EQ(num_chunks, 4); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_with_nulls, *result); + } + + { + auto const [result, num_chunks] = chunked_read(filepath_with_nulls, 2'000'000); + EXPECT_EQ(num_chunks, 3); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_with_nulls, *result); + } + + { + auto const [result, num_chunks] = chunked_read(filepath_with_nulls, 5'000'000); + EXPECT_EQ(num_chunks, 1); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_with_nulls, *result); + } +}