diff --git a/cpp/include/cudf/io/detail/parquet.hpp b/cpp/include/cudf/io/detail/parquet.hpp
index 8c7a7a21978..7f107017864 100644
--- a/cpp/include/cudf/io/detail/parquet.hpp
+++ b/cpp/include/cudf/io/detail/parquet.hpp
@@ -81,6 +81,62 @@ class reader {
   table_with_metadata read(parquet_reader_options const& options);
 };
 
+/**
+ * @brief The reader class that supports iterative reading of a given file.
+ *
+ * This class intentionally subclasses the `reader` class with private inheritance to hide the
+ * `reader::read()` API. As such, only chunked reading APIs are supported.
+ */
+class chunked_reader : private reader {
+ public:
+  /**
+   * @brief Constructor from a read size limit and an array of data sources with reader options.
+   *
+   * The typical usage should be similar to this:
+   * ```
+   *  do {
+   *    auto const chunk = reader.read_chunk();
+   *    // Process chunk
+   *  } while (reader.has_next());
+   * ```
+   *
+   * If `chunk_read_limit == 0` (i.e., no reading limit), a call to `read_chunk()` will read the
+   * whole file and return a table containing all rows.
+   *
+   * @param chunk_read_limit Limit on total number of bytes to be returned per read,
+   *        or `0` if there is no limit
+   * @param sources Input `datasource` objects to read the dataset from
+   * @param options Settings for controlling reading behavior
+   * @param stream CUDA stream used for device memory operations and kernel launches
+   * @param mr Device memory resource to use for device memory allocation
+   */
+  explicit chunked_reader(std::size_t chunk_read_limit,
+                          std::vector<std::unique_ptr<datasource>>&& sources,
+                          parquet_reader_options const& options,
+                          rmm::cuda_stream_view stream,
+                          rmm::mr::device_memory_resource* mr);
+
+  /**
+   * @brief Destructor explicitly declared to avoid being inlined in the header.
+   *
+   * Since the declaration of the internal `_impl` object does not exist in this header, this
+   * destructor needs to be defined in a separate source file which can access that object's
+   * declaration.
+   */
+  ~chunked_reader();
+
+  /**
+   * @copydoc cudf::io::chunked_parquet_reader::has_next
+   */
+  [[nodiscard]] bool has_next() const;
+
+  /**
+   * @copydoc cudf::io::chunked_parquet_reader::read_chunk
+   */
+  [[nodiscard]] table_with_metadata read_chunk() const;
+};
+
 /**
  * @brief Class to write parquet dataset data into columns.
  */
diff --git a/cpp/include/cudf/io/parquet.hpp b/cpp/include/cudf/io/parquet.hpp
index c5425de308c..f3facae098d 100644
--- a/cpp/include/cudf/io/parquet.hpp
+++ b/cpp/include/cudf/io/parquet.hpp
@@ -399,6 +399,74 @@ table_with_metadata read_parquet(
   parquet_reader_options const& options,
   rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());
 
+/**
+ * @brief The chunked parquet reader class to read a Parquet file iteratively into a series of
+ * tables, chunk by chunk.
+ *
+ * This class is designed to address the problem of reading very large Parquet files whose column
+ * sizes exceed the limit that can be stored in a cudf column. By reading the file content in
+ * chunks using this class, each chunk is guaranteed to have its size stay within the given limit.
+ */
+class chunked_parquet_reader {
+ public:
+  /**
+   * @brief Default constructor, this should never be used.
+   *
+   * This is added just to satisfy Cython.
+   */
+  chunked_parquet_reader() = default;
+
+  /**
+   * @brief Constructor for chunked reader.
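+   *
+   * A typical usage pattern would look similar to this (a sketch only; `chunk_read_limit` and
+   * `options` below stand in for caller-provided values):
+   * ```
+   *  cudf::io::chunked_parquet_reader reader(chunk_read_limit, options);
+   *  do {
+   *    auto const chunk = reader.read_chunk();
+   *    // Process chunk.tbl
+   *  } while (reader.has_next());
+   * ```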
+   *
+   * This constructor requires the same `parquet_reader_options` parameter as in
+   * `cudf::read_parquet()`, and an additional parameter to specify the byte size limit of the
+   * output table for each read.
+   *
+   * @param chunk_read_limit Limit on total number of bytes to be returned per read,
+   *        or `0` if there is no limit
+   * @param options The options used to read the Parquet file
+   * @param mr Device memory resource to use for device memory allocation
+   */
+  chunked_parquet_reader(
+    std::size_t chunk_read_limit,
+    parquet_reader_options const& options,
+    rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());
+
+  /**
+   * @brief Destructor, destroying the internal reader instance.
+   *
+   * Since the declaration of the internal `reader` object does not exist in this header, this
+   * destructor needs to be defined in a separate source file which can access that object's
+   * declaration.
+   */
+  ~chunked_parquet_reader();
+
+  /**
+   * @brief Check if there is any data in the given file that has not yet been read.
+   *
+   * @return A boolean value indicating if there is any data left to read
+   */
+  [[nodiscard]] bool has_next() const;
+
+  /**
+   * @brief Read a chunk of rows in the given Parquet file.
+   *
+   * The sequence of returned tables, if concatenated in order, is guaranteed to form a complete
+   * dataset equivalent to reading the entire given file at once.
+   *
+   * An empty table will be returned if the given file is empty, or if all the data in the file has
+   * been read and returned by the previous calls.
+   *
+   * @return An output `cudf::table` along with its metadata
+   */
+  [[nodiscard]] table_with_metadata read_chunk() const;
+
+ private:
+  std::unique_ptr<cudf::io::detail::parquet::chunked_reader> reader;
+};
+
 /** @} */  // end of group
 /**
  * @addtogroup io_writers
diff --git a/cpp/src/io/functions.cpp b/cpp/src/io/functions.cpp
index c244a30dc75..1a5a43d2b90 100644
--- a/cpp/src/io/functions.cpp
+++ b/cpp/src/io/functions.cpp
@@ -501,6 +501,45 @@ std::unique_ptr<std::vector<uint8_t>> write_parquet(parquet_writer_options const
   return writer->close(options.get_column_chunks_file_paths());
 }
 
+/**
+ * @copydoc cudf::io::chunked_parquet_reader::chunked_parquet_reader
+ */
+chunked_parquet_reader::chunked_parquet_reader(std::size_t chunk_read_limit,
+                                               parquet_reader_options const& options,
+                                               rmm::mr::device_memory_resource* mr)
+  : reader{std::make_unique<detail_parquet::chunked_reader>(chunk_read_limit,
+                                                            make_datasources(options.get_source()),
+                                                            options,
+                                                            cudf::get_default_stream(),
+                                                            mr)}
+{
+}
+
+/**
+ * @copydoc cudf::io::chunked_parquet_reader::~chunked_parquet_reader
+ */
+chunked_parquet_reader::~chunked_parquet_reader() = default;
+
+/**
+ * @copydoc cudf::io::chunked_parquet_reader::has_next
+ */
+bool chunked_parquet_reader::has_next() const
+{
+  CUDF_FUNC_RANGE();
+  CUDF_EXPECTS(reader != nullptr, "Reader has not been constructed properly.");
+  return reader->has_next();
+}
+
+/**
+ * @copydoc cudf::io::chunked_parquet_reader::read_chunk
+ */
+table_with_metadata chunked_parquet_reader::read_chunk() const
+{
+  CUDF_FUNC_RANGE();
+  CUDF_EXPECTS(reader != nullptr, "Reader has not been constructed properly.");
+  return reader->read_chunk();
+}
+
 /**
  * @copydoc cudf::io::parquet_chunked_writer::parquet_chunked_writer
  */
diff --git a/cpp/src/io/parquet/page_data.cu b/cpp/src/io/parquet/page_data.cu
index b36826002f4..c580aa5bbc0 100644
--- a/cpp/src/io/parquet/page_data.cu
+++ b/cpp/src/io/parquet/page_data.cu
@@ -18,8 +18,10 @@
 #include
 #include
+#include
 #include
 #include
+#include
 #include
 #include
@@ -52,6 +54,8 @@ namespace io {
 namespace parquet {
 namespace gpu {
+namespace { + struct page_state_s { const uint8_t* data_start; const uint8_t* data_end; @@ -281,13 +285,18 @@ __device__ void gpuDecodeStream( * 31) * @param[in] t Warp1 thread ID (0..31) * - * @return The new output position + * @return A pair containing the new output position, and the total length of strings decoded (this + * will only be valid on thread 0 and if sizes_only is true) */ -__device__ int gpuDecodeDictionaryIndices(volatile page_state_s* s, int target_pos, int t) +template +__device__ cuda::std::pair gpuDecodeDictionaryIndices(volatile page_state_s* s, + int target_pos, + int t) { const uint8_t* end = s->data_end; int dict_bits = s->dict_bits; int pos = s->dict_pos; + int str_len = 0; while (pos < target_pos) { int is_literal, batch_len; @@ -332,8 +341,11 @@ __device__ int gpuDecodeDictionaryIndices(volatile page_state_s* s, int target_p __syncwarp(); is_literal = shuffle(is_literal); batch_len = shuffle(batch_len); + + // compute dictionary index. + int dict_idx = 0; if (t < batch_len) { - int dict_idx = s->dict_val; + dict_idx = s->dict_val; if (is_literal) { int32_t ofs = (t - ((batch_len + 7) & ~7)) * dict_bits; const uint8_t* p = s->data_start + (ofs >> 3); @@ -353,11 +365,36 @@ __device__ int gpuDecodeDictionaryIndices(volatile page_state_s* s, int target_p dict_idx &= (1 << dict_bits) - 1; } } - s->dict_idx[(pos + t) & (non_zero_buffer_size - 1)] = dict_idx; + + // if we're not computing sizes, store off the dictionary index + if constexpr (!sizes_only) { s->dict_idx[(pos + t) & (non_zero_buffer_size - 1)] = dict_idx; } + } + + // if we're computing sizes, add the length(s) + if constexpr (sizes_only) { + int const len = [&]() { + if (t >= batch_len) { return 0; } + // we may end up decoding more indices than we asked for. so don't include those in the + // size calculation + if (pos + t >= target_pos) { return 0; } + // TODO: refactor this with gpuGetStringData / gpuGetStringSize + uint32_t const dict_pos = (s->dict_bits > 0) ? dict_idx * sizeof(string_index_pair) : 0; + if (target_pos && dict_pos < (uint32_t)s->dict_size) { + const auto* src = reinterpret_cast(s->dict_base + dict_pos); + return src->second; + } + return 0; + }(); + + using WarpReduce = cub::WarpReduce; + __shared__ typename WarpReduce::TempStorage temp_storage; + // note: str_len will only be valid on thread 0. 
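+      // each lane contributes the byte length of the string its dictionary index refers to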
+ str_len += WarpReduce(temp_storage).Sum(len); } + pos += batch_len; } - return pos; + return {pos, str_len}; } /** @@ -424,17 +461,20 @@ __device__ int gpuDecodeRleBooleans(volatile page_state_s* s, int target_pos, in } /** - * @brief Parses the length and position of strings + * @brief Parses the length and position of strings and returns total length of all strings + * processed * * @param[in,out] s Page state input/output * @param[in] target_pos Target output position * @param[in] t Thread ID * - * @return The new output position + * @return Total length of strings processed */ -__device__ void gpuInitStringDescriptors(volatile page_state_s* s, int target_pos, int t) +__device__ size_type gpuInitStringDescriptors(volatile page_state_s* s, int target_pos, int t) { - int pos = s->dict_pos; + int pos = s->dict_pos; + int total_len = 0; + // This step is purely serial if (!t) { const uint8_t* cur = s->data_start; @@ -453,21 +493,26 @@ __device__ void gpuInitStringDescriptors(volatile page_state_s* s, int target_po s->dict_idx[pos & (non_zero_buffer_size - 1)] = k; s->str_len[pos & (non_zero_buffer_size - 1)] = len; k += len; + total_len += len; pos++; } s->dict_val = k; __threadfence_block(); } + + return total_len; } /** - * @brief Output a string descriptor + * @brief Retrieves string information for a string at the specified source position * - * @param[in,out] s Page state input/output + * @param[in] s Page state input * @param[in] src_pos Source position - * @param[in] dstv Pointer to row output data (string descriptor or 32-bit hash) + * + * @return A pair containing a pointer to the string and its length */ -inline __device__ void gpuOutputString(volatile page_state_s* s, int src_pos, void* dstv) +inline __device__ cuda::std::pair gpuGetStringData(volatile page_state_s* s, + int src_pos) { const char* ptr = nullptr; size_t len = 0; @@ -490,6 +535,20 @@ inline __device__ void gpuOutputString(volatile page_state_s* s, int src_pos, vo len = s->str_len[src_pos & (non_zero_buffer_size - 1)]; } } + + return {ptr, len}; +} + +/** + * @brief Output a string descriptor + * + * @param[in,out] s Page state input/output + * @param[in] src_pos Source position + * @param[in] dstv Pointer to row output data (string descriptor or 32-bit hash) + */ +inline __device__ void gpuOutputString(volatile page_state_s* s, int src_pos, void* dstv) +{ + auto [ptr, len] = gpuGetStringData(s, src_pos); if (s->dtype_len == 4) { // Output hash. This hash value is used if the option to convert strings to // categoricals is enabled. The seed value is chosen arbitrarily. 
@@ -818,14 +877,17 @@ static __device__ void gpuOutputGeneric(volatile page_state_s* s, * @param[in, out] s The local page state to be filled in * @param[in] p The global page to be copied from * @param[in] chunks The global list of chunks - * @param[in] num_rows Maximum number of rows to read * @param[in] min_row Crop all rows below min_row + * @param[in] num_rows Maximum number of rows to read + * @param[in] is_decode_step If we are setting up for the decode step (instead of the preprocess + * step) */ static __device__ bool setupLocalPageInfo(page_state_s* const s, PageInfo const* p, device_span chunks, size_t min_row, - size_t num_rows) + size_t num_rows, + bool is_decode_step) { int t = threadIdx.x; int chunk_idx; @@ -926,17 +988,25 @@ static __device__ bool setupLocalPageInfo(page_state_s* const s, s->dtype_len = 8; // Convert to 64-bit timestamp } - // first row within the page to output - if (page_start_row >= min_row) { - s->first_row = 0; - } else { - s->first_row = (int32_t)min(min_row - page_start_row, (size_t)s->page.num_rows); - } - // # of rows within the page to output - s->num_rows = s->page.num_rows; - if ((page_start_row + s->first_row) + s->num_rows > min_row + num_rows) { - s->num_rows = - (int32_t)max((int64_t)(min_row + num_rows - (page_start_row + s->first_row)), INT64_C(0)); + // NOTE: s->page.num_rows, s->col.chunk_row, s->first_row and s->num_rows will be + // invalid/bogus during first pass of the preprocess step for nested types. this is ok + // because we ignore these values in that stage. + { + auto const max_row = min_row + num_rows; + + // if we are totally outside the range of the input, do nothing + if ((page_start_row > max_row) || (page_start_row + s->page.num_rows < min_row)) { + s->first_row = 0; + s->num_rows = 0; + } + // otherwise + else { + s->first_row = page_start_row >= min_row ? 0 : min_row - page_start_row; + auto const max_page_rows = s->page.num_rows - s->first_row; + s->num_rows = (page_start_row + s->first_row) + max_page_rows <= max_row + ? max_page_rows + : max_row - (page_start_row + s->first_row); + } } // during the decoding step we need to offset the global output buffers @@ -944,7 +1014,11 @@ static __device__ bool setupLocalPageInfo(page_state_s* const s, // is responsible for. // - for flat schemas, we can do this directly by using row counts // - for nested schemas, these offsets are computed during the preprocess step - if (s->col.column_data_base != nullptr) { + // + // NOTE: in a chunked read situation, s->col.column_data_base and s->col.valid_map_base + // will be aliased to memory that has been freed when we get here in the non-decode step, so + // we cannot check against nullptr. we'll just check a flag directly. + if (is_decode_step) { int max_depth = s->col.max_nesting_depth; for (int idx = 0; idx < max_depth; idx++) { PageNestingInfo* pni = &s->page.nesting[idx]; @@ -954,12 +1028,13 @@ static __device__ bool setupLocalPageInfo(page_state_s* const s, if (s->col.max_level[level_type::REPETITION] == 0) { output_offset = page_start_row >= min_row ? page_start_row - min_row : 0; } - // for schemas with lists, we've already got the exactly value precomputed + // for schemas with lists, we've already got the exact value precomputed else { output_offset = pni->page_start_value; } pni->data_out = static_cast(s->col.column_data_base[idx]); + if (pni->data_out != nullptr) { // anything below max depth with a valid data pointer must be a list, so the // element size is the size of the offset type. 
@@ -1036,6 +1111,7 @@ static __device__ bool setupLocalPageInfo(page_state_s* const s, s->page.skipped_leaf_values = 0; s->input_value_count = 0; s->input_row_count = 0; + s->input_leaf_count = 0; s->row_index_lower_bound = -1; } @@ -1064,13 +1140,14 @@ static __device__ bool setupLocalPageInfo(page_state_s* const s, // if we're in the decoding step, jump directly to the first // value we care about - if (s->col.column_data_base != nullptr) { + if (is_decode_step) { s->input_value_count = s->page.skipped_values > -1 ? s->page.skipped_values : 0; } else { - s->input_value_count = 0; - s->input_leaf_count = 0; - s->page.skipped_values = -1; - s->page.skipped_leaf_values = -1; + s->input_value_count = 0; + s->input_leaf_count = 0; + s->page.skipped_values = + -1; // magic number to indicate it hasn't been set for use inside UpdatePageSizes + s->page.skipped_leaf_values = 0; } } @@ -1397,7 +1474,7 @@ static __device__ void gpuUpdatePageSizes(page_state_s* s, bool bounds_set) { // max nesting depth of the column - int max_depth = s->col.max_nesting_depth; + int const max_depth = s->col.max_nesting_depth; // how many input level values we've processed in the page so far int input_value_count = s->input_value_count; // how many leaf values we've processed in the page so far @@ -1411,11 +1488,10 @@ static __device__ void gpuUpdatePageSizes(page_state_s* s, start_depth, end_depth, d, s, input_value_count, target_input_value_count, t); // count rows and leaf values - int is_new_row = start_depth == 0 ? 1 : 0; - uint32_t warp_row_count_mask = ballot(is_new_row); - int is_new_leaf = (d >= s->page.nesting[max_depth - 1].max_def_level) ? 1 : 0; - uint32_t warp_leaf_count_mask = ballot(is_new_leaf); - + int const is_new_row = start_depth == 0 ? 1 : 0; + uint32_t const warp_row_count_mask = ballot(is_new_row); + int const is_new_leaf = (d >= s->page.nesting[max_depth - 1].max_def_level) ? 1 : 0; + uint32_t const warp_leaf_count_mask = ballot(is_new_leaf); // is this thread within row bounds? on the first pass we don't know the bounds, so we will be // computing the full size of the column. on the second pass, we will know our actual row // bounds, so the computation will cap sizes properly. @@ -1429,8 +1505,8 @@ static __device__ void gpuUpdatePageSizes(page_state_s* s, ? 1 : 0; - uint32_t row_bounds_mask = ballot(in_row_bounds); - int first_thread_in_range = __ffs(row_bounds_mask) - 1; + uint32_t const row_bounds_mask = ballot(in_row_bounds); + int const first_thread_in_range = __ffs(row_bounds_mask) - 1; // if we've found the beginning of the first row, mark down the position // in the def/repetition buffer (skipped_values) and the data buffer (skipped_leaf_values) @@ -1443,13 +1519,15 @@ static __device__ void gpuUpdatePageSizes(page_state_s* s, } } - // increment counts across all nesting depths + // increment value counts across all nesting depths for (int s_idx = 0; s_idx < max_depth; s_idx++) { - // if we are within the range of nesting levels we should be adding value indices for - int in_nesting_bounds = (s_idx >= start_depth && s_idx <= end_depth && in_row_bounds) ? 1 : 0; + PageNestingInfo* pni = &s->page.nesting[s_idx]; - uint32_t count_mask = ballot(in_nesting_bounds); - if (!t) { s->page.nesting[s_idx].size += __popc(count_mask); } + // if we are within the range of nesting levels we should be adding value indices for + int const in_nesting_bounds = + (s_idx >= start_depth && s_idx <= end_depth && in_row_bounds) ? 
1 : 0; + uint32_t const count_mask = ballot(in_nesting_bounds); + if (!t) { pni->batch_size += __popc(count_mask); } } input_value_count += min(32, (target_input_value_count - input_value_count)); @@ -1465,6 +1543,21 @@ static __device__ void gpuUpdatePageSizes(page_state_s* s, } } +__device__ size_type gpuGetStringSize(page_state_s* s, int target_count, int t) +{ + auto dict_target_pos = target_count; + size_type str_len = 0; + if (s->dict_base) { + auto const [new_target_pos, len] = gpuDecodeDictionaryIndices(s, target_count, t); + dict_target_pos = new_target_pos; + str_len = len; + } else if ((s->col.data_type & 7) == BYTE_ARRAY) { + str_len = gpuInitStringDescriptors(s, target_count, t); + } + if (!t) { *(volatile int32_t*)&s->dict_pos = dict_target_pos; } + return str_len; +} + /** * @brief Kernel for computing per-page column size information for all nesting levels. * @@ -1473,17 +1566,20 @@ static __device__ void gpuUpdatePageSizes(page_state_s* s, * @param pages List of pages * @param chunks List of column chunks * @param min_row Row index to start reading at - * @param num_rows Maximum number of rows to read. Pass as INT_MAX to guarantee reading all rows. - * @param trim_pass Whether or not this is the trim pass. We first have to compute + * @param num_rows Maximum number of rows to read. Pass as INT_MAX to guarantee reading all rows + * @param is_base_pass Whether or not this is the base pass. We first have to compute * the full size information of every page before we come through in a second (trim) pass - * to determine what subset of rows in this page we should be reading. + * to determine what subset of rows in this page we should be reading + * @param compute_string_sizes Whether or not we should be computing string sizes + * (PageInfo::str_bytes) as part of the pass */ __global__ void __launch_bounds__(block_size) gpuComputePageSizes(PageInfo* pages, device_span chunks, size_t min_row, size_t num_rows, - bool trim_pass) + bool is_base_pass, + bool compute_string_sizes) { __shared__ __align__(16) page_state_s state_g; @@ -1492,34 +1588,81 @@ __global__ void __launch_bounds__(block_size) int t = threadIdx.x; PageInfo* pp = &pages[page_idx]; + if (!setupLocalPageInfo(s, pp, chunks, min_row, num_rows, false)) { return; } + + if (!t) { + s->page.skipped_values = -1; + s->page.skipped_leaf_values = 0; + s->page.str_bytes = 0; + s->input_row_count = 0; + s->input_value_count = 0; + + // in the base pass, we're computing the number of rows, make sure we visit absolutely + // everything + if (is_base_pass) { + s->first_row = 0; + s->num_rows = INT_MAX; + s->row_index_lower_bound = -1; + } + } + // we only need to preprocess hierarchies with repetition in them (ie, hierarchies // containing lists anywhere within). bool const has_repetition = chunks[pp->chunk_idx].max_level[level_type::REPETITION] > 0; - if (!has_repetition) { return; } + compute_string_sizes = + compute_string_sizes && ((s->col.data_type & 7) == BYTE_ARRAY && s->dtype_len != 4); + + // various early out optimizations: + + // - if this is a flat hierarchy (no lists) and is not a string column. in this case we don't need + // to do + // the expensive work of traversing the level data to determine sizes. we can just compute it + // directly. 
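+  //   (with no lists, every input value maps to exactly one output value, so each nesting
+  //    level's size is simply the page's num_input_values)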
+ if (!has_repetition && !compute_string_sizes) { + int d = 0; + while (d < s->page.num_nesting_levels) { + auto const i = d + t; + if (i < s->page.num_nesting_levels) { + if (is_base_pass) { pp->nesting[i].size = pp->num_input_values; } + pp->nesting[i].batch_size = pp->num_input_values; + } + d += blockDim.x; + } + return; + } + + // - if this page is not at the beginning or end of the trim bounds, the batch size is + // the full page size + if (!is_base_pass && s->num_rows == s->page.num_rows) { + int d = 0; + while (d < s->page.num_nesting_levels) { + auto const i = d + t; + if (i < s->page.num_nesting_levels) { pp->nesting[i].batch_size = pp->nesting[i].size; } + d += blockDim.x; + } + return; + } - if (!setupLocalPageInfo(s, pp, chunks, trim_pass ? min_row : 0, trim_pass ? num_rows : INT_MAX)) { + // - if this page is completely trimmed, zero out sizes. + if (!is_base_pass && s->num_rows == 0) { + int d = 0; + while (d < s->page.num_nesting_levels) { + auto const i = d + t; + if (i < s->page.num_nesting_levels) { pp->nesting[i].batch_size = 0; } + d += blockDim.x; + } return; } + // at this point we are going to be fully recomputing batch information + // zero sizes int d = 0; while (d < s->page.num_nesting_levels) { - if (d + t < s->page.num_nesting_levels) { s->page.nesting[d + t].size = 0; } + if (d + t < s->page.num_nesting_levels) { s->page.nesting[d + t].batch_size = 0; } d += blockDim.x; } - if (!t) { - s->page.skipped_values = -1; - s->page.skipped_leaf_values = -1; - s->input_row_count = 0; - s->input_value_count = 0; - // if this isn't the trim pass, make sure we visit absolutely everything - if (!trim_pass) { - s->first_row = 0; - s->num_rows = INT_MAX; - s->row_index_lower_bound = -1; - } - } __syncthreads(); // optimization : it might be useful to have a version of gpuDecodeStream that could go wider than @@ -1532,25 +1675,51 @@ __global__ void __launch_bounds__(block_size) while (!s->error && s->input_value_count < s->num_input_values) { // decode repetition and definition levels. these will attempt to decode at // least up to the target, but may decode a few more. - gpuDecodeStream(s->rep, s, target_input_count, t, level_type::REPETITION); + if (has_repetition) { + gpuDecodeStream(s->rep, s, target_input_count, t, level_type::REPETITION); + } gpuDecodeStream(s->def, s, target_input_count, t, level_type::DEFINITION); __syncwarp(); // we may have decoded different amounts from each stream, so only process what we've been - int actual_input_count = - min(s->lvl_count[level_type::REPETITION], s->lvl_count[level_type::DEFINITION]); + int actual_input_count = has_repetition ? 
min(s->lvl_count[level_type::REPETITION], + s->lvl_count[level_type::DEFINITION]) + : s->lvl_count[level_type::DEFINITION]; // process what we got back - gpuUpdatePageSizes(s, actual_input_count, t, trim_pass); + gpuUpdatePageSizes(s, actual_input_count, t, !is_base_pass); + if (compute_string_sizes) { + auto const str_len = gpuGetStringSize(s, s->input_leaf_count, t); + if (!t) { s->page.str_bytes += str_len; } + } + target_input_count = actual_input_count + batch_size; __syncwarp(); } } - // update # rows in the actual page + + // update output results: + // - real number of rows for the whole page + // - nesting sizes for the whole page + // - skipped value information for trimmed pages + // - string bytes + if (is_base_pass) { + // nesting level 0 is the root column, so the size is also the # of rows + if (!t) { pp->num_rows = s->page.nesting[0].batch_size; } + + // store off this batch size as the "full" size + int d = 0; + while (d < s->page.num_nesting_levels) { + auto const i = d + t; + if (i < s->page.num_nesting_levels) { pp->nesting[i].size = pp->nesting[i].batch_size; } + d += blockDim.x; + } + } + if (!t) { - pp->num_rows = s->page.nesting[0].size; pp->skipped_values = s->page.skipped_values; pp->skipped_leaf_values = s->page.skipped_leaf_values; + pp->str_bytes = s->page.str_bytes; } } @@ -1577,7 +1746,10 @@ __global__ void __launch_bounds__(block_size) gpuDecodePageData( int t = threadIdx.x; int out_thread0; - if (!setupLocalPageInfo(s, &pages[page_idx], chunks, min_row, num_rows)) { return; } + if (!setupLocalPageInfo(s, &pages[page_idx], chunks, min_row, num_rows, true)) { return; } + + // if we have no rows to do (eg, in a skip_rows/num_rows case) + if (s->num_rows == 0) { return; } if (s->dict_base) { out_thread0 = (s->dict_bits > 0) ? 
64 : 32; @@ -1614,7 +1786,7 @@ __global__ void __launch_bounds__(block_size) gpuDecodePageData( // WARP1: Decode dictionary indices, booleans or string positions if (s->dict_base) { - src_target_pos = gpuDecodeDictionaryIndices(s, src_target_pos, t & 0x1f); + src_target_pos = gpuDecodeDictionaryIndices(s, src_target_pos, t & 0x1f).first; } else if ((s->col.data_type & 7) == BOOLEAN) { src_target_pos = gpuDecodeRleBooleans(s, src_target_pos, t & 0x1f); } else if ((s->col.data_type & 7) == BYTE_ARRAY) { @@ -1701,71 +1873,18 @@ __global__ void __launch_bounds__(block_size) gpuDecodePageData( } } -struct chunk_row_output_iter { - PageInfo* p; - using value_type = size_type; - using difference_type = size_type; - using pointer = size_type*; - using reference = size_type&; - using iterator_category = thrust::output_device_iterator_tag; - - __host__ __device__ chunk_row_output_iter operator+(int i) - { - return chunk_row_output_iter{p + i}; - } - - __host__ __device__ void operator++() { p++; } - - __device__ reference operator[](int i) { return p[i].chunk_row; } - __device__ reference operator*() { return p->chunk_row; } - __device__ void operator=(value_type v) { p->chunk_row = v; } -}; - -struct start_offset_output_iterator { - PageInfo* pages; - int* page_indices; - int cur_index; - int src_col_schema; - int nesting_depth; - int empty = 0; - using value_type = size_type; - using difference_type = size_type; - using pointer = size_type*; - using reference = size_type&; - using iterator_category = thrust::output_device_iterator_tag; - - __host__ __device__ start_offset_output_iterator operator+(int i) - { - return start_offset_output_iterator{ - pages, page_indices, cur_index + i, src_col_schema, nesting_depth}; - } - - __host__ __device__ void operator++() { cur_index++; } - - __device__ reference operator[](int i) { return dereference(cur_index + i); } - __device__ reference operator*() { return dereference(cur_index); } - - private: - __device__ reference dereference(int index) - { - PageInfo const& p = pages[page_indices[index]]; - if (p.src_col_schema != src_col_schema || p.flags & PAGEINFO_FLAGS_DICTIONARY) { return empty; } - return p.nesting[nesting_depth].page_start_value; - } -}; +} // anonymous namespace /** - * @copydoc cudf::io::parquet::gpu::PreprocessColumnData + * @copydoc cudf::io::parquet::gpu::ComputePageSizes */ -void PreprocessColumnData(hostdevice_vector& pages, - hostdevice_vector const& chunks, - std::vector& input_columns, - std::vector& output_columns, - size_t num_rows, - size_t min_row, - bool uses_custom_row_bounds, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr) +void ComputePageSizes(hostdevice_vector& pages, + hostdevice_vector const& chunks, + size_t min_row, + size_t num_rows, + bool compute_num_rows, + bool compute_string_sizes, + rmm::cuda_stream_view stream) { dim3 dim_block(block_size, 1); dim3 dim_grid(pages.size(), 1); // 1 threadblock per page @@ -1776,124 +1895,7 @@ void PreprocessColumnData(hostdevice_vector& pages, // If uses_custom_row_bounds is set to true, we have to do a second pass later that "trims" // the starting and ending read values to account for these bounds. gpuComputePageSizes<<>>( - pages.device_ptr(), - chunks, - // if uses_custom_row_bounds is false, include all possible rows. - uses_custom_row_bounds ? min_row : 0, - uses_custom_row_bounds ? 
num_rows : INT_MAX, - !uses_custom_row_bounds); - - // computes: - // PageInfo::chunk_row for all pages - // Note: this is doing some redundant work for pages in flat hierarchies. chunk_row has already - // been computed during header decoding. the overall amount of work here is very small though. - auto key_input = thrust::make_transform_iterator( - pages.device_ptr(), [] __device__(PageInfo const& page) { return page.chunk_idx; }); - auto page_input = thrust::make_transform_iterator( - pages.device_ptr(), [] __device__(PageInfo const& page) { return page.num_rows; }); - thrust::exclusive_scan_by_key(rmm::exec_policy(stream), - key_input, - key_input + pages.size(), - page_input, - chunk_row_output_iter{pages.device_ptr()}); - - // computes: - // PageNestingInfo::size for each level of nesting, for each page, taking row bounds into account. - // PageInfo::skipped_values, which tells us where to start decoding in the input . - // It is only necessary to do this second pass if uses_custom_row_bounds is set (if the user has - // specified artifical bounds). - if (uses_custom_row_bounds) { - gpuComputePageSizes<<>>( - pages.device_ptr(), chunks, min_row, num_rows, true); - } - - // ordering of pages is by input column schema, repeated across row groups. so - // if we had 3 columns, each with 2 pages, and 1 row group, our schema values might look like - // - // 1, 1, 2, 2, 3, 3 - // - // However, if we had more than one row group, the pattern would be - // - // 1, 1, 2, 2, 3, 3, 1, 1, 2, 2, 3, 3 - // ^ row group 0 | - // ^ row group 1 - // - // To use exclusive_scan_by_key, the ordering we actually want is - // - // 1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3 - // - // We also need to preserve key-relative page ordering, so we need to use a stable sort. - rmm::device_uvector page_keys(pages.size(), stream); - rmm::device_uvector page_index(pages.size(), stream); - { - thrust::transform(rmm::exec_policy(stream), - pages.device_ptr(), - pages.device_ptr() + pages.size(), - page_keys.begin(), - [] __device__(PageInfo const& page) { return page.src_col_schema; }); - - thrust::sequence(rmm::exec_policy(stream), page_index.begin(), page_index.end()); - thrust::stable_sort_by_key(rmm::exec_policy(stream), - page_keys.begin(), - page_keys.end(), - page_index.begin(), - thrust::less()); - } - - // compute output column sizes by examining the pages of the -input- columns - for (size_t idx = 0; idx < input_columns.size(); idx++) { - auto const& input_col = input_columns[idx]; - auto src_col_schema = input_col.schema_idx; - size_t max_depth = input_col.nesting_depth(); - - auto* cols = &output_columns; - for (size_t l_idx = 0; l_idx < input_col.nesting_depth(); l_idx++) { - auto& out_buf = (*cols)[input_col.nesting[l_idx]]; - cols = &out_buf.children; - - // size iterator. indexes pages by sorted order - auto size_input = thrust::make_transform_iterator( - page_index.begin(), - [src_col_schema, l_idx, pages = pages.device_ptr()] __device__(int index) { - auto const& page = pages[index]; - if (page.src_col_schema != src_col_schema || page.flags & PAGEINFO_FLAGS_DICTIONARY) { - return 0; - } - return page.nesting[l_idx].size; - }); - - // if this buffer is part of a list hierarchy, we need to determine it's - // final size and allocate it here. - // - // for struct columns, higher levels of the output columns are shared between input - // columns. so don't compute any given level more than once. 
- if ((out_buf.user_data & PARQUET_COLUMN_BUFFER_FLAG_HAS_LIST_PARENT) && out_buf.size == 0) { - int size = thrust::reduce(rmm::exec_policy(stream), size_input, size_input + pages.size()); - - // if this is a list column add 1 for non-leaf levels for the terminating offset - if (out_buf.type.id() == type_id::LIST && l_idx < max_depth) { size++; } - - // allocate - out_buf.create(size, stream, mr); - } - - // for nested hierarchies, compute per-page start offset - if (input_col.has_repetition) { - thrust::exclusive_scan_by_key(rmm::exec_policy(stream), - page_keys.begin(), - page_keys.end(), - size_input, - start_offset_output_iterator{pages.device_ptr(), - page_index.begin(), - 0, - static_cast(src_col_schema), - static_cast(l_idx)}); - } - } - } - - // retrieve pages back - pages.device_to_host(stream); + pages.device_ptr(), chunks, min_row, num_rows, compute_num_rows, compute_string_sizes); } /** @@ -1905,6 +1907,8 @@ void __host__ DecodePageData(hostdevice_vector& pages, size_t min_row, rmm::cuda_stream_view stream) { + CUDF_EXPECTS(pages.size() > 0, "There is no page to decode"); + dim3 dim_block(block_size, 1); dim3 dim_grid(pages.size(), 1); // 1 threadblock per page diff --git a/cpp/src/io/parquet/page_hdr.cu b/cpp/src/io/parquet/page_hdr.cu index 19922bf7022..ffb4cb60a20 100644 --- a/cpp/src/io/parquet/page_hdr.cu +++ b/cpp/src/io/parquet/page_hdr.cu @@ -367,6 +367,7 @@ __global__ void __launch_bounds__(128) // definition levels bs->page.chunk_row = 0; bs->page.num_rows = 0; + bs->page.str_bytes = 0; } num_values = bs->ck.num_values; page_info = bs->ck.page_info; diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp index 7849e05eb68..ccf4b056ae8 100644 --- a/cpp/src/io/parquet/parquet_gpu.hpp +++ b/cpp/src/io/parquet/parquet_gpu.hpp @@ -95,10 +95,13 @@ struct PageNestingInfo { // set at initialization int32_t max_def_level; int32_t max_rep_level; + cudf::type_id type; // type of the corresponding cudf output column + bool nullable; // set during preprocessing int32_t size; // this page/nesting-level's row count contribution to the output column, if fully // decoded + int32_t batch_size; // the size of the page for this batch int32_t page_start_value; // absolute output start index in output column data // set during data decoding @@ -152,6 +155,9 @@ struct PageInfo { int skipped_values; // # of values skipped in the actual data stream. int skipped_leaf_values; + // for string columns only, the size of all the chars in the string for + // this page. only valid/computed during the base preprocess pass + int32_t str_bytes; // nesting information (input/output) for each page int num_nesting_levels; @@ -238,7 +244,7 @@ struct ColumnChunkDesc { }; /** - * @brief The struct to store raw/intermediate file data before parsing. + * @brief Struct to store raw/intermediate file data before parsing. */ struct file_intermediate_data { std::vector> raw_page_data; @@ -248,6 +254,23 @@ struct file_intermediate_data { hostdevice_vector page_nesting_info{}; }; +/** + * @brief Struct to store intermediate page data for parsing each chunk of rows in chunked reading. + */ +struct chunk_intermediate_data { + rmm::device_uvector page_keys{0, rmm::cuda_stream_default}; + rmm::device_uvector page_index{0, rmm::cuda_stream_default}; + rmm::device_uvector str_dict_index{0, rmm::cuda_stream_default}; +}; + +/** + * @brief Structs to identify the reading row range for each chunk of rows in chunked reading. 
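+ *
+ * The range is expressed as the starting row index (`skip_rows`) and the number of rows to read
+ * (`num_rows`).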
+ */ +struct chunk_read_info { + size_t skip_rows; + size_t num_rows; +}; + /** * @brief Struct describing an encoder column */ @@ -378,35 +401,35 @@ void BuildStringDictionaryIndex(ColumnChunkDesc* chunks, rmm::cuda_stream_view stream); /** - * @brief Preprocess column information for nested schemas. + * @brief Compute page output size information. * - * There are several pieces of information we can't compute directly from row counts in - * the parquet headers when dealing with nested schemas. - * - The total sizes of all output columns at all nesting levels - * - The starting output buffer offset for each page, for each nesting level - * For flat schemas, these values are computed during header decoding (see gpuDecodePageHeaders) + * When dealing with nested hierarchies (those that contain lists), or when doing a chunked + * read, we need to obtain more information up front than we have with just the row counts. + * + * - We need to determine the sizes of each output cudf column per page + * - We need to determine information about where to start decoding the value stream + * if we are using custom user bounds (skip_rows / num_rows) + * - We need to determine actual number of top level rows per page + * - If we are doing a chunked read, we need to determine the total string size per page * - * Note : this function is where output device memory is allocated for nested columns. * * @param pages All pages to be decoded * @param chunks All chunks to be decoded - * @param input_columns Input column information - * @param output_columns Output column information * @param num_rows Maximum number of rows to read * @param min_rows crop all rows below min_row - * @param uses_custom_row_bounds Whether or not num_rows and min_rows represents user-specific - * bounds - * @param stream Cuda stream + * @param compute_num_rows If set to true, the num_rows field in PageInfo will be + * computed + * @param compute_string_sizes If set to true, the str_bytes field in PageInfo will + * be computed + * @param stream CUDA stream to use, default 0 */ -void PreprocessColumnData(hostdevice_vector& pages, - hostdevice_vector const& chunks, - std::vector& input_columns, - std::vector& output_columns, - size_t num_rows, - size_t min_row, - bool uses_custom_row_bounds, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr); +void ComputePageSizes(hostdevice_vector& pages, + hostdevice_vector const& chunks, + size_t num_rows, + size_t min_row, + bool compute_num_rows, + bool compute_string_sizes, + rmm::cuda_stream_view stream); /** * @brief Launches kernel for reading the column data stored in the pages diff --git a/cpp/src/io/parquet/reader.cpp b/cpp/src/io/parquet/reader.cpp index 6be6987b7cb..1321e8073d7 100644 --- a/cpp/src/io/parquet/reader.cpp +++ b/cpp/src/io/parquet/reader.cpp @@ -40,4 +40,19 @@ table_with_metadata reader::read(parquet_reader_options const& options) options.get_row_groups()); } +chunked_reader::chunked_reader(std::size_t chunk_read_limit, + std::vector>&& sources, + parquet_reader_options const& options, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) +{ + _impl = std::make_unique(chunk_read_limit, std::move(sources), options, stream, mr); +} + +chunked_reader::~chunked_reader() = default; + +bool chunked_reader::has_next() const { return _impl->has_next(); } + +table_with_metadata chunked_reader::read_chunk() const { return _impl->read_chunk(); } + } // namespace cudf::io::detail::parquet diff --git a/cpp/src/io/parquet/reader_impl.cpp 
b/cpp/src/io/parquet/reader_impl.cpp index a61f63f6645..84d8cfc273f 100644 --- a/cpp/src/io/parquet/reader_impl.cpp +++ b/cpp/src/io/parquet/reader_impl.cpp @@ -28,22 +28,8 @@ void reader::impl::decode_page_data(size_t skip_rows, size_t num_rows) auto& pages = _file_itm_data.pages_info; auto& page_nesting = _file_itm_data.page_nesting_info; - auto is_dict_chunk = [](const gpu::ColumnChunkDesc& chunk) { - return (chunk.data_type & 0x7) == BYTE_ARRAY && chunk.num_dict_pages > 0; - }; - - // Count the number of string dictionary entries - // NOTE: Assumes first page in the chunk is always the dictionary page - size_t total_str_dict_indexes = 0; - for (size_t c = 0, page_count = 0; c < chunks.size(); c++) { - if (is_dict_chunk(chunks[c])) { total_str_dict_indexes += pages[page_count].num_input_values; } - page_count += chunks[c].max_num_pages; - } - - // Build index for string dictionaries since they can't be indexed - // directly due to variable-sized elements - auto str_dict_index = cudf::detail::make_zeroed_device_uvector_async( - total_str_dict_indexes, _stream); + // Should not reach here if there is no page data. + CUDF_EXPECTS(pages.size() > 0, "There is no page to decode"); size_t const sum_max_depths = std::accumulate( chunks.begin(), chunks.end(), 0, [&](size_t cursum, gpu::ColumnChunkDesc const& chunk) { @@ -58,16 +44,11 @@ void reader::impl::decode_page_data(size_t skip_rows, size_t num_rows) auto chunk_offsets = std::vector(); // Update chunks with pointers to column data. - for (size_t c = 0, page_count = 0, str_ofs = 0, chunk_off = 0; c < chunks.size(); c++) { + for (size_t c = 0, page_count = 0, chunk_off = 0; c < chunks.size(); c++) { input_column_info const& input_col = _input_columns[chunks[c].src_col_index]; CUDF_EXPECTS(input_col.schema_idx == chunks[c].src_col_schema, "Column/page schema index mismatch"); - if (is_dict_chunk(chunks[c])) { - chunks[c].str_dict_index = str_dict_index.data() + str_ofs; - str_ofs += pages[page_count].num_input_values; - } - size_t max_depth = _metadata->get_output_nesting_depth(chunks[c].src_col_schema); chunk_offsets.push_back(chunk_off); @@ -139,18 +120,15 @@ void reader::impl::decode_page_data(size_t skip_rows, size_t num_rows) chunk_nested_valids.host_to_device(_stream); chunk_nested_data.host_to_device(_stream); - if (total_str_dict_indexes > 0) { - gpu::BuildStringDictionaryIndex(chunks.device_ptr(), chunks.size(), _stream); - } - gpu::DecodePageData(pages, chunks, num_rows, skip_rows, _stream); + pages.device_to_host(_stream); page_nesting.device_to_host(_stream); _stream.synchronize(); // for list columns, add the final offset to every offset buffer. // TODO : make this happen in more efficiently. Maybe use thrust::for_each - // on each buffer. Or potentially do it in PreprocessColumnData + // on each buffer. 
// Note : the reason we are doing this here instead of in the decode kernel is // that it is difficult/impossible for a given page to know that it is writing the very // last value that should then be followed by a terminator (because rows can span @@ -211,7 +189,20 @@ reader::impl::impl(std::vector>&& sources, parquet_reader_options const& options, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) - : _stream(stream), _mr(mr), _sources(std::move(sources)) + : impl(0 /*chunk_read_limit*/, + std::forward>>(sources), + options, + stream, + mr) +{ +} + +reader::impl::impl(std::size_t chunk_read_limit, + std::vector>&& sources, + parquet_reader_options const& options, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) + : _stream{stream}, _mr{mr}, _sources{std::move(sources)}, _chunk_read_limit{chunk_read_limit} { // Open and parse the source dataset metadata _metadata = std::make_unique(_sources); @@ -233,6 +224,14 @@ reader::impl::impl(std::vector>&& sources, options.is_enabled_use_pandas_metadata(), _strings_to_categorical, _timestamp_type.id()); + + // Save the states of the output buffers for reuse in `chunk_read()`. + // Don't need to do it if we read the file all at once. + if (_chunk_read_limit > 0) { + for (auto const& buff : _output_buffers) { + _output_buffers_template.emplace_back(column_buffer::empty_like(buff)); + } + } } void reader::impl::prepare_data(size_type skip_rows, @@ -240,39 +239,61 @@ void reader::impl::prepare_data(size_type skip_rows, bool uses_custom_row_bounds, host_span const> row_group_indices) { + if (_file_preprocessed) { return; } + const auto [skip_rows_corrected, num_rows_corrected, row_groups_info] = _metadata->select_row_groups(row_group_indices, skip_rows, num_rows); - _skip_rows = skip_rows_corrected; - _num_rows = num_rows_corrected; if (num_rows_corrected > 0 && row_groups_info.size() != 0 && _input_columns.size() != 0) { load_and_decompress_data(row_groups_info, num_rows_corrected); + preprocess_pages( + skip_rows_corrected, num_rows_corrected, uses_custom_row_bounds, _chunk_read_limit); + + if (_chunk_read_limit == 0) { // read the whole file at once + CUDF_EXPECTS(_chunk_read_info.size() == 1, + "Reading the whole file should yield only one chunk."); + } } + + _file_preprocessed = true; } table_with_metadata reader::impl::read_chunk_internal(bool uses_custom_row_bounds) { - auto out_metadata = table_metadata{}; + // If `_output_metadata` has been constructed, just copy it over. + auto out_metadata = _output_metadata ? table_metadata{*_output_metadata} : table_metadata{}; // output cudf columns as determined by the top level schema auto out_columns = std::vector>{}; out_columns.reserve(_output_buffers.size()); - if (_num_rows == 0) { return finalize_output(out_metadata, out_columns); } + if (!has_next() || _chunk_read_info.size() == 0) { + return finalize_output(out_metadata, out_columns); + } - allocate_columns(_skip_rows, _num_rows, uses_custom_row_bounds); + auto const& read_info = _chunk_read_info[_current_read_chunk++]; - decode_page_data(_skip_rows, _num_rows); + // Allocate memory buffers for the output columns. + allocate_columns(read_info.skip_rows, read_info.num_rows, uses_custom_row_bounds); - // Create the final output cudf columns + // Parse data into the output buffers. + decode_page_data(read_info.skip_rows, read_info.num_rows); + + // Create the final output cudf columns. for (size_t i = 0; i < _output_buffers.size(); ++i) { - auto const metadata = _reader_column_schema.has_value() - ? 
std::make_optional((*_reader_column_schema)[i]) - : std::nullopt; - column_name_info& col_name = out_metadata.schema_info.emplace_back(""); - out_columns.emplace_back(make_column(_output_buffers[i], &col_name, metadata, _stream, _mr)); + auto const metadata = _reader_column_schema.has_value() + ? std::make_optional((*_reader_column_schema)[i]) + : std::nullopt; + // Only construct `out_metadata` if `_output_metadata` has not been cached. + if (!_output_metadata) { + column_name_info& col_name = out_metadata.schema_info.emplace_back(""); + out_columns.emplace_back(make_column(_output_buffers[i], &col_name, metadata, _stream, _mr)); + } else { + out_columns.emplace_back(make_column(_output_buffers[i], nullptr, metadata, _stream, _mr)); + } } + // Add empty columns if needed. return finalize_output(out_metadata, out_columns); } @@ -281,21 +302,30 @@ table_with_metadata reader::impl::finalize_output(table_metadata& out_metadata, { // Create empty columns as needed (this can happen if we've ended up with no actual data to read) for (size_t i = out_columns.size(); i < _output_buffers.size(); ++i) { - column_name_info& col_name = out_metadata.schema_info.emplace_back(""); - out_columns.emplace_back(io::detail::empty_like(_output_buffers[i], &col_name, _stream, _mr)); + if (!_output_metadata) { + column_name_info& col_name = out_metadata.schema_info.emplace_back(""); + out_columns.emplace_back(io::detail::empty_like(_output_buffers[i], &col_name, _stream, _mr)); + } else { + out_columns.emplace_back(io::detail::empty_like(_output_buffers[i], nullptr, _stream, _mr)); + } } - // Return column names (must match order of returned columns) - out_metadata.column_names.resize(_output_buffers.size()); - for (size_t i = 0; i < _output_column_schemas.size(); i++) { - auto const& schema = _metadata->get_schema(_output_column_schemas[i]); - out_metadata.column_names[i] = schema.name; - } + if (!_output_metadata) { + // Return column names (must match order of returned columns) + out_metadata.column_names.resize(_output_buffers.size()); + for (size_t i = 0; i < _output_column_schemas.size(); i++) { + auto const& schema = _metadata->get_schema(_output_column_schemas[i]); + out_metadata.column_names[i] = schema.name; + } - // Return user metadata - out_metadata.per_file_user_data = _metadata->get_key_value_metadata(); - out_metadata.user_data = {out_metadata.per_file_user_data[0].begin(), - out_metadata.per_file_user_data[0].end()}; + // Return user metadata + out_metadata.per_file_user_data = _metadata->get_key_value_metadata(); + out_metadata.user_data = {out_metadata.per_file_user_data[0].begin(), + out_metadata.per_file_user_data[0].end()}; + + // Finally, save the output table metadata into `_output_metadata` for reuse next time. + _output_metadata = std::make_unique(out_metadata); + } return {std::make_unique(std::move(out_columns)), std::move(out_metadata)}; } @@ -305,8 +335,36 @@ table_with_metadata reader::impl::read(size_type skip_rows, bool uses_custom_row_bounds, host_span const> row_group_indices) { + CUDF_EXPECTS(_chunk_read_limit == 0, "Reading the whole file must not have non-zero byte_limit."); prepare_data(skip_rows, num_rows, uses_custom_row_bounds, row_group_indices); return read_chunk_internal(uses_custom_row_bounds); } +table_with_metadata reader::impl::read_chunk() +{ + // Reset the output buffers to their original states (right after reader construction). + // Don't need to do it if we read the file all at once. 
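+  // Each buffer is re-created empty from the `_output_buffers_template` saved at construction
+  // time, so every chunk starts decoding into a clean set of columns.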
+ if (_chunk_read_limit > 0) { + _output_buffers.resize(0); + for (auto const& buff : _output_buffers_template) { + _output_buffers.emplace_back(column_buffer::empty_like(buff)); + } + } + + prepare_data(0 /*skip_rows*/, + -1 /*num_rows, `-1` means unlimited*/, + true /*uses_custom_row_bounds*/, + {} /*row_group_indices, empty means read all row groups*/); + return read_chunk_internal(true); +} + +bool reader::impl::has_next() +{ + prepare_data(0 /*skip_rows*/, + -1 /*num_rows, `-1` means unlimited*/, + true /*uses_custom_row_bounds*/, + {} /*row_group_indices, empty means read all row groups*/); + return _current_read_chunk < _chunk_read_info.size(); +} + } // namespace cudf::io::detail::parquet diff --git a/cpp/src/io/parquet/reader_impl.hpp b/cpp/src/io/parquet/reader_impl.hpp index b53487c824b..6d42e9fab84 100644 --- a/cpp/src/io/parquet/reader_impl.hpp +++ b/cpp/src/io/parquet/reader_impl.hpp @@ -38,7 +38,6 @@ #include namespace cudf::io::detail::parquet { - /** * @brief Implementation for Parquet reader */ @@ -47,6 +46,9 @@ class reader::impl { /** * @brief Constructor from an array of dataset sources with reader options. * + * By using this constructor, each call to `read()` or `read_chunk()` will perform reading the + * entire given file. + * * @param sources Dataset sources * @param options Settings for controlling reading behavior * @param stream CUDA stream used for device memory operations and kernel launches @@ -73,6 +75,46 @@ class reader::impl { bool uses_custom_row_bounds, host_span const> row_group_indices); + /** + * @brief Constructor from a chunk read limit and an array of dataset sources with reader options. + * + * By using this constructor, the reader will support iterative (chunked) reading through + * `has_next() ` and `read_chunk()`. For example: + * ``` + * do { + * auto const chunk = reader.read_chunk(); + * // Process chunk + * } while (reader.has_next()); + * + * ``` + * + * Reading the whole given file at once through `read()` function is still supported if + * `chunk_read_limit == 0` (i.e., no reading limit). + * In such case, `read_chunk()` will also return rows of the entire file. + * + * @param chunk_read_limit Limit on total number of bytes to be returned per read, + * or `0` if there is no limit + * @param sources Dataset sources + * @param options Settings for controlling reading behavior + * @param stream CUDA stream used for device memory operations and kernel launches + * @param mr Device memory resource to use for device memory allocation + */ + explicit impl(std::size_t chunk_read_limit, + std::vector>&& sources, + parquet_reader_options const& options, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr); + + /** + * @copydoc cudf::io::chunked_parquet_reader::has_next + */ + bool has_next(); + + /** + * @copydoc cudf::io::chunked_parquet_reader::read_chunk + */ + table_with_metadata read_chunk(); + private: /** * @brief Perform the necessary data preprocessing for parsing file later on. @@ -94,6 +136,29 @@ class reader::impl { void load_and_decompress_data(std::vector const& row_groups_info, size_type num_rows); + /** + * @brief Perform some preprocessing for page data and also compute the split locations + * {skip_rows, num_rows} for chunked reading. 
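+   *
+   * When `chunk_read_limit > 0`, the cumulative output sizes of the pages are also computed so
+   * that the read can be split into chunks whose output tables stay under that limit.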
+ * + * There are several pieces of information we can't compute directly from row counts in + * the parquet headers when dealing with nested schemas: + * - The total sizes of all output columns at all nesting levels + * - The starting output buffer offset for each page, for each nesting level + * + * For flat schemas, these values are computed during header decoding (see gpuDecodePageHeaders). + * + * @param skip_rows Crop all rows below skip_rows + * @param num_rows Maximum number of rows to read + * @param uses_custom_row_bounds Whether or not num_rows and skip_rows represents user-specific + * bounds + * @param chunk_read_limit Limit on total number of bytes to be returned per read, + * or `0` if there is no limit + */ + void preprocess_pages(size_t skip_rows, + size_t num_rows, + bool uses_custom_row_bounds, + size_t chunk_read_limit); + /** * @brief Allocate nesting information storage for all pages and set pointers to it. * @@ -158,17 +223,26 @@ class reader::impl { // Buffers for generating output columns std::vector _output_buffers; + // Buffers copied from `_output_buffers` after construction for reuse + std::vector _output_buffers_template; + // _output_buffers associated schema indices std::vector _output_column_schemas; + // _output_buffers associated metadata + std::unique_ptr _output_metadata; + bool _strings_to_categorical = false; std::optional> _reader_column_schema; data_type _timestamp_type{type_id::EMPTY}; + // Variables used for chunked reading: cudf::io::parquet::gpu::file_intermediate_data _file_itm_data; - - size_type _skip_rows{0}; - size_type _num_rows{0}; + cudf::io::parquet::gpu::chunk_intermediate_data _chunk_itm_data; + std::vector _chunk_read_info; + std::size_t _chunk_read_limit{0}; + std::size_t _current_read_chunk{0}; + bool _file_preprocessed{false}; }; } // namespace cudf::io::detail::parquet diff --git a/cpp/src/io/parquet/reader_impl_preprocess.cu b/cpp/src/io/parquet/reader_impl_preprocess.cu index ca2009d3c74..38fce7d3263 100644 --- a/cpp/src/io/parquet/reader_impl_preprocess.cu +++ b/cpp/src/io/parquet/reader_impl_preprocess.cu @@ -20,17 +20,30 @@ #include #include +#include +#include #include #include +#include #include +#include +#include +#include +#include +#include #include +#include +#include +#include +#include +#include +#include #include namespace cudf::io::detail::parquet { - namespace { /** @@ -157,7 +170,7 @@ void generate_depth_remappings(std::map, std::ve } /** - * @brief Function that returns the required the number of bits to store a value + * @brief Return the required number of bits to store a value. */ template [[nodiscard]] T required_bits(uint32_t max_level) @@ -197,7 +210,7 @@ template } /** - * @brief Reads compressed page data to device memory + * @brief Reads compressed page data to device memory. 
* * @param sources Dataset sources * @param page_data Buffers to hold compressed page data for each chunk @@ -606,6 +619,9 @@ void reader::impl::allocate_nesting_info() pni[cur_depth].max_def_level = cur_schema.max_definition_level; pni[cur_depth].max_rep_level = cur_schema.max_repetition_level; pni[cur_depth].size = 0; + pni[cur_depth].type = + to_type_id(cur_schema, _strings_to_categorical, _timestamp_type.id()); + pni[cur_depth].nullable = cur_schema.repetition_type == OPTIONAL; } // move up the hierarchy @@ -721,6 +737,7 @@ void reader::impl::load_and_decompress_data(std::vector const& r for (auto& task : read_rowgroup_tasks) { task.wait(); } + CUDF_EXPECTS(remaining_rows <= 0, "All rows data must be read."); // Process dataset chunk pages into output columns @@ -762,11 +779,669 @@ void reader::impl::load_and_decompress_data(std::vector const& r } } -void reader::impl::allocate_columns(size_t min_row, size_t total_rows, bool uses_custom_row_bounds) +namespace { + +struct cumulative_row_info { + size_t row_count; // cumulative row count + size_t size_bytes; // cumulative size in bytes + int key; // schema index +}; + +#if defined(PREPROCESS_DEBUG) +void print_pages(hostdevice_vector& pages, rmm::cuda_stream_view _stream) +{ + pages.device_to_host(_stream, true); + for (size_t idx = 0; idx < pages.size(); idx++) { + auto const& p = pages[idx]; + // skip dictionary pages + if (p.flags & gpu::PAGEINFO_FLAGS_DICTIONARY) { continue; } + printf( + "P(%lu, s:%d): chunk_row(%d), num_rows(%d), skipped_values(%d), skipped_leaf_values(%d)\n", + idx, + p.src_col_schema, + p.chunk_row, + p.num_rows, + p.skipped_values, + p.skipped_leaf_values); + } +} + +void print_cumulative_page_info(hostdevice_vector& pages, + rmm::device_uvector const& page_index, + rmm::device_uvector const& c_info, + rmm::cuda_stream_view stream) +{ + pages.device_to_host(stream, true); + + printf("------------\nCumulative sizes by page\n"); + + std::vector schemas(pages.size()); + std::vector h_page_index(pages.size()); + cudaMemcpy( + h_page_index.data(), page_index.data(), sizeof(int) * pages.size(), cudaMemcpyDeviceToHost); + std::vector h_cinfo(pages.size()); + cudaMemcpy(h_cinfo.data(), + c_info.data(), + sizeof(cumulative_row_info) * pages.size(), + cudaMemcpyDeviceToHost); + auto schema_iter = cudf::detail::make_counting_transform_iterator( + 0, [&](size_type i) { return pages[h_page_index[i]].src_col_schema; }); + thrust::copy(thrust::seq, schema_iter, schema_iter + pages.size(), schemas.begin()); + auto last = thrust::unique(thrust::seq, schemas.begin(), schemas.end()); + schemas.resize(last - schemas.begin()); + printf("Num schemas: %lu\n", schemas.size()); + + for (size_t idx = 0; idx < schemas.size(); idx++) { + printf("Schema %d\n", schemas[idx]); + for (size_t pidx = 0; pidx < pages.size(); pidx++) { + auto const& page = pages[h_page_index[pidx]]; + if (page.flags & gpu::PAGEINFO_FLAGS_DICTIONARY || page.src_col_schema != schemas[idx]) { + continue; + } + printf("\tP: {%lu, %lu}\n", h_cinfo[pidx].row_count, h_cinfo[pidx].size_bytes); + } + } +} + +void print_cumulative_row_info( + host_span sizes, + std::string const& label, + std::optional> splits = std::nullopt) +{ + if (splits.has_value()) { + printf("------------\nSplits\n"); + for (size_t idx = 0; idx < splits->size(); idx++) { + printf("{%lu, %lu}\n", splits.value()[idx].skip_rows, splits.value()[idx].num_rows); + } + } + + printf("------------\nCumulative sizes %s\n", label.c_str()); + for (size_t idx = 0; idx < sizes.size(); idx++) { + printf("{%lu, %lu, 
%d}", sizes[idx].row_count, sizes[idx].size_bytes, sizes[idx].key);
+    if (splits.has_value()) {
+      // if we have a split at this row count and this is the last instance of this row count
+      auto start = thrust::make_transform_iterator(
+        splits->begin(), [](gpu::chunk_read_info const& i) { return i.skip_rows; });
+      auto end               = start + splits->size();
+      auto split             = std::find(start, end, sizes[idx].row_count);
+      auto const split_index = [&]() -> int {
+        if (split != end &&
+            ((idx == sizes.size() - 1) || (sizes[idx + 1].row_count > sizes[idx].row_count))) {
+          return static_cast<int>(std::distance(start, split));
+        }
+        return idx == 0 ? 0 : -1;
+      }();
+      if (split_index >= 0) {
+        printf(" <-- split {%lu, %lu}",
+               splits.value()[split_index].skip_rows,
+               splits.value()[split_index].num_rows);
+      }
+    }
+    printf("\n");
+  }
+}
+#endif  // PREPROCESS_DEBUG
+
+/**
+ * @brief Functor which reduces two cumulative_row_info structs of the same key.
+ */
+struct cumulative_row_sum {
+  cumulative_row_info operator()
+    __device__(cumulative_row_info const& a, cumulative_row_info const& b) const
+  {
+    return cumulative_row_info{a.row_count + b.row_count, a.size_bytes + b.size_bytes, a.key};
+  }
+};
+
+/**
+ * @brief Functor which computes the total data size for a given type of cudf column.
+ *
+ * In the case of strings, the return size does not include the chars themselves. That
+ * information is tracked separately (see PageInfo::str_bytes).
+ */
+struct row_size_functor {
+  __device__ size_t validity_size(size_t num_rows, bool nullable)
+  {
+    return nullable ? (cudf::util::div_rounding_up_safe(num_rows, size_t{32}) * 4) : 0;
+  }
+
+  template <typename T>
+  __device__ size_t operator()(size_t num_rows, bool nullable)
+  {
+    auto const element_size = sizeof(device_storage_type_t<T>);
+    return (element_size * num_rows) + validity_size(num_rows, nullable);
+  }
+};
+
+template <>
+__device__ size_t row_size_functor::operator()<list_view>(size_t num_rows, bool nullable)
+{
+  auto const offset_size = sizeof(offset_type);
+  // NOTE: Adding the + 1 offset here isn't strictly correct. There will only be 1 extra offset
+  // for the entire column, whereas this is adding an extra offset per page. So we will get a
+  // small over-estimate of the real size, on the order of (# of pages * 4) bytes. It seems better
+  // to overestimate size somewhat than to underestimate it and potentially generate chunks
+  // that are too large.
+  return (offset_size * (num_rows + 1)) + validity_size(num_rows, nullable);
+}
+
+template <>
+__device__ size_t row_size_functor::operator()<struct_view>(size_t num_rows, bool nullable)
+{
+  return validity_size(num_rows, nullable);
+}
+
+template <>
+__device__ size_t row_size_functor::operator()<string_view>(size_t num_rows, bool nullable)
+{
+  // only returns the size of offsets and validity. the size of the actual string chars
+  // is tracked separately.
+  auto const offset_size = sizeof(offset_type);
+  // see note about offsets in the list_view template.
+  return (offset_size * (num_rows + 1)) + validity_size(num_rows, nullable);
+}
+
+/**
+ * @brief Functor which computes the total output cudf data size for all of
+ * the data in this page.
+ *
+ * Sums across all nesting levels.
+ */
+struct get_cumulative_row_info {
+  gpu::PageInfo const* const pages;
+
+  __device__ cumulative_row_info operator()(size_type index)
+  {
+    auto const& page = pages[index];
+    if (page.flags & gpu::PAGEINFO_FLAGS_DICTIONARY) {
+      return cumulative_row_info{0, 0, page.src_col_schema};
+    }
+
+    // total nested size, not counting string data
+    auto iter =
+      cudf::detail::make_counting_transform_iterator(0, [page, index] __device__(size_type i) {
+        auto const& pni = page.nesting[i];
+        return cudf::type_dispatcher(
+          data_type{pni.type}, row_size_functor{}, pni.size, pni.nullable);
+      });
+
+    size_t const row_count = static_cast<size_t>(page.nesting[0].size);
+    return {row_count,
+            thrust::reduce(thrust::seq, iter, iter + page.num_nesting_levels) + page.str_bytes,
+            page.src_col_schema};
+  }
+};
+
+/**
+ * @brief Functor which computes the effective size of all input columns by page.
+ *
+ * For a given row, we want to find the cost of all pages for all columns involved
+ * in loading up to that row. The complication here is that not all pages are the
+ * same size between columns. Example:
+ *
+ *               page row counts
+ * Column A:    0 <----> 100 <----> 200
+ * Column B:    0 <---------------> 200 <--------> 400
+                                    |
+ * if we decide to split at row 100, we don't really know the actual amount of bytes in column B
+ * at that point. So we have to proceed as if we are taking the bytes from all 200 rows of that
+ * page. Essentially, a conservative over-estimate of the real size.
+ */
+struct row_total_size {
+  cumulative_row_info const* c_info;
+  size_type const* key_offsets;
+  size_t num_keys;
+
+  __device__ cumulative_row_info operator()(cumulative_row_info const& i)
+  {
+    // sum sizes for each input column at this row
+    size_t sum = 0;
+    for (int idx = 0; idx < num_keys; idx++) {
+      auto const start = key_offsets[idx];
+      auto const end   = key_offsets[idx + 1];
+      auto iter        = cudf::detail::make_counting_transform_iterator(
+        0, [&] __device__(size_type i) { return c_info[i].row_count; });
+      auto const page_index =
+        thrust::lower_bound(thrust::seq, iter + start, iter + end, i.row_count) - iter;
+      sum += c_info[page_index].size_bytes;
+    }
+    return {i.row_count, sum, i.key};
+  }
+};
+
+/**
+ * @brief Given a vector of cumulative {row_count, byte_size} pairs and a chunk read
+ * limit, determine the set of splits.
+ *
+ * @param sizes Vector of cumulative {row_count, byte_size} pairs
+ * @param num_rows Total number of rows to read
+ * @param chunk_read_limit Limit on total number of bytes to be returned per read, for all columns
+ */
+std::vector<gpu::chunk_read_info> find_splits(std::vector<cumulative_row_info> const& sizes,
+                                              size_t num_rows,
+                                              size_t chunk_read_limit)
+{
+  // now we have an array of {row_count, real output bytes}. just walk through it and generate
+  // splits.
+  // TODO: come up with a clever way to do this entirely in parallel. For now, as long as batch
+  // sizes are reasonably large, this shouldn't iterate too many times
+  std::vector<gpu::chunk_read_info> splits;
+  {
+    size_t cur_pos             = 0;
+    size_t cur_cumulative_size = 0;
+    size_t cur_row_count       = 0;
+    auto start = thrust::make_transform_iterator(sizes.begin(), [&](cumulative_row_info const& i) {
+      return i.size_bytes - cur_cumulative_size;
+    });
+    auto end = start + sizes.size();
+    while (cur_row_count < num_rows) {
+      int64_t split_pos =
+        thrust::lower_bound(thrust::seq, start + cur_pos, end, chunk_read_limit) - start;
+
+      // if we're past the end, or if the returned bucket is larger than the chunk_read_limit,
+      // move back one.
+ if (static_cast(split_pos) >= sizes.size() || + (sizes[split_pos].size_bytes - cur_cumulative_size > chunk_read_limit)) { + split_pos--; + } + + // best-try. if we can't find something that'll fit, we have to go bigger. we're doing this in + // a loop because all of the cumulative sizes for all the pages are sorted into one big list. + // so if we had two columns, both of which had an entry {1000, 10000}, that entry would be in + // the list twice. so we have to iterate until we skip past all of them. The idea is that we + // either do this, or we have to call unique() on the input first. + while (split_pos < (static_cast(sizes.size()) - 1) && + (split_pos < 0 || sizes[split_pos].row_count == cur_row_count)) { + split_pos++; + } + + auto const start_row = cur_row_count; + cur_row_count = sizes[split_pos].row_count; + splits.push_back(gpu::chunk_read_info{start_row, cur_row_count - start_row}); + cur_pos = split_pos; + cur_cumulative_size = sizes[split_pos].size_bytes; + } + } + // print_cumulative_row_info(sizes, "adjusted", splits); + + return splits; +} + +/** + * @brief Given a set of pages that have had their sizes computed by nesting level and + * a limit on total read size, generate a set of {skip_rows, num_rows} pairs representing + * a set of reads that will generate output columns of total size <= `chunk_read_limit` bytes. + * + * @param pages All pages in the file + * @param id Additional intermediate information required to process the pages + * @param num_rows Total number of rows to read + * @param chunk_read_limit Limit on total number of bytes to be returned per read, for all columns + * @param stream CUDA stream to use, default 0 + */ +std::vector compute_splits(hostdevice_vector& pages, + gpu::chunk_intermediate_data const& id, + size_t num_rows, + size_t chunk_read_limit, + rmm::cuda_stream_view stream) +{ + auto const& page_keys = id.page_keys; + auto const& page_index = id.page_index; + + // generate cumulative row counts and sizes + rmm::device_uvector c_info(page_keys.size(), stream); + // convert PageInfo to cumulative_row_info + auto page_input = thrust::make_transform_iterator(page_index.begin(), + get_cumulative_row_info{pages.device_ptr()}); + thrust::inclusive_scan_by_key(rmm::exec_policy(stream), + page_keys.begin(), + page_keys.end(), + page_input, + c_info.begin(), + thrust::equal_to{}, + cumulative_row_sum{}); + // print_cumulative_page_info(pages, page_index, c_info, stream); + + // sort by row count + rmm::device_uvector c_info_sorted{c_info, stream}; + thrust::sort(rmm::exec_policy(stream), + c_info_sorted.begin(), + c_info_sorted.end(), + [] __device__(cumulative_row_info const& a, cumulative_row_info const& b) { + return a.row_count < b.row_count; + }); + + std::vector h_c_info_sorted(c_info_sorted.size()); + cudaMemcpy(h_c_info_sorted.data(), + c_info_sorted.data(), + sizeof(cumulative_row_info) * c_info_sorted.size(), + cudaMemcpyDeviceToHost); + // print_cumulative_row_info(h_c_info_sorted, "raw"); + + // generate key offsets (offsets to the start of each partition of keys). 
worst case is 1 page per + // key + rmm::device_uvector key_offsets(page_keys.size() + 1, stream); + auto const key_offsets_end = thrust::reduce_by_key(rmm::exec_policy(stream), + page_keys.begin(), + page_keys.end(), + thrust::make_constant_iterator(1), + thrust::make_discard_iterator(), + key_offsets.begin()) + .second; + size_t const num_unique_keys = key_offsets_end - key_offsets.begin(); + thrust::exclusive_scan( + rmm::exec_policy(stream), key_offsets.begin(), key_offsets.end(), key_offsets.begin()); + + // adjust the cumulative info such that for each row count, the size includes any pages that span + // that row count. this is so that if we have this case: + // page row counts + // Column A: 0 <----> 100 <----> 200 + // Column B: 0 <---------------> 200 <--------> 400 + // | + // if we decide to split at row 100, we don't really know the actual amount of bytes in column B + // at that point. So we have to proceed as if we are taking the bytes from all 200 rows of that + // page. + // + rmm::device_uvector aggregated_info(c_info.size(), stream); + thrust::transform(rmm::exec_policy(stream), + c_info_sorted.begin(), + c_info_sorted.end(), + aggregated_info.begin(), + row_total_size{c_info.data(), key_offsets.data(), num_unique_keys}); + + // bring back to the cpu + std::vector h_aggregated_info(aggregated_info.size()); + cudaMemcpyAsync(h_aggregated_info.data(), + aggregated_info.data(), + sizeof(cumulative_row_info) * c_info.size(), + cudaMemcpyDeviceToHost, + stream); + stream.synchronize(); + + return find_splits(h_aggregated_info, num_rows, chunk_read_limit); +} + +struct get_page_chunk_idx { + __device__ size_type operator()(gpu::PageInfo const& page) { return page.chunk_idx; } +}; + +struct get_page_num_rows { + __device__ size_type operator()(gpu::PageInfo const& page) { return page.num_rows; } +}; + +struct get_page_schema { + __device__ size_type operator()(gpu::PageInfo const& page) { return page.src_col_schema; } +}; + +/** + * @brief Returns the size field of a PageInfo struct for a given depth, keyed by schema. + */ +struct get_page_nesting_size { + size_type const src_col_schema; + size_type const depth; + gpu::PageInfo const* const pages; + + __device__ size_type operator()(int index) const + { + auto const& page = pages[index]; + if (page.src_col_schema != src_col_schema || page.flags & gpu::PAGEINFO_FLAGS_DICTIONARY) { + return 0; + } + return page.nesting[depth].batch_size; + } +}; + +/** + * @brief Writes to the chunk_row field of the PageInfo struct. + */ +struct chunk_row_output_iter { + gpu::PageInfo* p; + using value_type = size_type; + using difference_type = size_type; + using pointer = size_type*; + using reference = size_type&; + using iterator_category = thrust::output_device_iterator_tag; + + __host__ __device__ chunk_row_output_iter operator+(int i) + { + return chunk_row_output_iter{p + i}; + } + + __host__ __device__ void operator++() { p++; } + + __device__ reference operator[](int i) { return p[i].chunk_row; } + __device__ reference operator*() { return p->chunk_row; } +}; + +/** + * @brief Writes to the page_start_value field of the PageNestingInfo struct, keyed by schema. 
+ */ +struct start_offset_output_iterator { + gpu::PageInfo* pages; + int const* page_indices; + int cur_index; + int src_col_schema; + int nesting_depth; + int empty = 0; + using value_type = size_type; + using difference_type = size_type; + using pointer = size_type*; + using reference = size_type&; + using iterator_category = thrust::output_device_iterator_tag; + + constexpr void operator=(start_offset_output_iterator const& other) + { + pages = other.pages; + page_indices = other.page_indices; + cur_index = other.cur_index; + src_col_schema = other.src_col_schema; + nesting_depth = other.nesting_depth; + } + + constexpr start_offset_output_iterator operator+(int i) + { + return start_offset_output_iterator{ + pages, page_indices, cur_index + i, src_col_schema, nesting_depth}; + } + + constexpr void operator++() { cur_index++; } + + __device__ reference operator[](int i) { return dereference(cur_index + i); } + __device__ reference operator*() { return dereference(cur_index); } + + private: + __device__ reference dereference(int index) + { + gpu::PageInfo const& p = pages[page_indices[index]]; + if (p.src_col_schema != src_col_schema || p.flags & gpu::PAGEINFO_FLAGS_DICTIONARY) { + return empty; + } + return p.nesting[nesting_depth].page_start_value; + } +}; + +} // anonymous namespace + +void reader::impl::preprocess_pages(size_t skip_rows, + size_t num_rows, + bool uses_custom_row_bounds, + size_t chunk_read_limit) +{ + auto& chunks = _file_itm_data.chunks; + auto& pages = _file_itm_data.pages_info; + + // iterate over all input columns and determine if they contain lists so we can further + // preprocess them. + bool has_lists = false; + for (size_t idx = 0; idx < _input_columns.size(); idx++) { + auto const& input_col = _input_columns[idx]; + size_t const max_depth = input_col.nesting_depth(); + + auto* cols = &_output_buffers; + for (size_t l_idx = 0; l_idx < max_depth; l_idx++) { + auto& out_buf = (*cols)[input_col.nesting[l_idx]]; + cols = &out_buf.children; + + // if this has a list parent, we have to get column sizes from the + // data computed during gpu::ComputePageSizes + if (out_buf.user_data & PARQUET_COLUMN_BUFFER_FLAG_HAS_LIST_PARENT) { + has_lists = true; + break; + } + } + if (has_lists) { break; } + } + + // generate string dict indices if necessary + { + auto is_dict_chunk = [](const gpu::ColumnChunkDesc& chunk) { + return (chunk.data_type & 0x7) == BYTE_ARRAY && chunk.num_dict_pages > 0; + }; + + // Count the number of string dictionary entries + // NOTE: Assumes first page in the chunk is always the dictionary page + size_t total_str_dict_indexes = 0; + for (size_t c = 0, page_count = 0; c < chunks.size(); c++) { + if (is_dict_chunk(chunks[c])) { + total_str_dict_indexes += pages[page_count].num_input_values; + } + page_count += chunks[c].max_num_pages; + } + + // Build index for string dictionaries since they can't be indexed + // directly due to variable-sized elements + _chunk_itm_data.str_dict_index = + cudf::detail::make_zeroed_device_uvector_async(total_str_dict_indexes, + _stream); + + // Update chunks with pointers to string dict indices + for (size_t c = 0, page_count = 0, str_ofs = 0; c < chunks.size(); c++) { + input_column_info const& input_col = _input_columns[chunks[c].src_col_index]; + CUDF_EXPECTS(input_col.schema_idx == chunks[c].src_col_schema, + "Column/page schema index mismatch"); + if (is_dict_chunk(chunks[c])) { + chunks[c].str_dict_index = _chunk_itm_data.str_dict_index.data() + str_ofs; + str_ofs += pages[page_count].num_input_values; + } + 
+      // column_data_base will always point to leaf data, even for nested types.
+      page_count += chunks[c].max_num_pages;
+    }
+
+    if (total_str_dict_indexes > 0) {
+      chunks.host_to_device(_stream);
+      gpu::BuildStringDictionaryIndex(chunks.device_ptr(), chunks.size(), _stream);
+    }
+  }
+
+  // intermediate data we will need for further chunked reads
+  if (has_lists || chunk_read_limit > 0) {
+    // computes:
+    // PageNestingInfo::num_rows for each page. the true number of rows (taking repetition into
+    // account), not just the number of values. PageNestingInfo::size for each level of nesting, for
+    // each page.
+    //
+    // we will be applying a later "trim" pass if skip_rows/num_rows is being used, which can happen
+    // if:
+    // - user has passed custom row bounds
+    // - we will be doing a chunked read
+    gpu::ComputePageSizes(pages,
+                          chunks,
+                          0,  // 0-max size_t. process all possible rows
+                          std::numeric_limits<size_t>::max(),
+                          true,                  // compute num_rows
+                          chunk_read_limit > 0,  // compute string sizes
+                          _stream);
+
+    // computes:
+    // PageInfo::chunk_row (the absolute start row index) for all pages
+    // Note: this is doing some redundant work for pages in flat hierarchies. chunk_row has already
+    // been computed during header decoding. the overall amount of work here is very small though.
+    auto key_input  = thrust::make_transform_iterator(pages.device_ptr(), get_page_chunk_idx{});
+    auto page_input = thrust::make_transform_iterator(pages.device_ptr(), get_page_num_rows{});
+    thrust::exclusive_scan_by_key(rmm::exec_policy(_stream),
+                                  key_input,
+                                  key_input + pages.size(),
+                                  page_input,
+                                  chunk_row_output_iter{pages.device_ptr()});
+
+    // compute page ordering.
+    //
+    // ordering of pages is by input column schema, repeated across row groups. so
+    // if we had 3 columns, each with 2 pages, and 1 row group, our schema values might look like
+    //
+    // 1, 1, 2, 2, 3, 3
+    //
+    // However, if we had more than one row group, the pattern would be
+    //
+    // 1, 1, 2, 2, 3, 3, 1, 1, 2, 2, 3, 3
+    // ^ row group 0           |
+    //                         ^ row group 1
+    //
+    // To use exclusive_scan_by_key, the ordering we actually want is
+    //
+    // 1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3
+    //
+    // We also need to preserve key-relative page ordering, so we need to use a stable sort.
+    _chunk_itm_data.page_keys  = rmm::device_uvector<int>(pages.size(), _stream);
+    _chunk_itm_data.page_index = rmm::device_uvector<int>(pages.size(), _stream);
+    auto& page_keys            = _chunk_itm_data.page_keys;
+    auto& page_index           = _chunk_itm_data.page_index;
+    {
+      thrust::transform(rmm::exec_policy(_stream),
+                        pages.device_ptr(),
+                        pages.device_ptr() + pages.size(),
+                        page_keys.begin(),
+                        get_page_schema{});
+
+      thrust::sequence(rmm::exec_policy(_stream), page_index.begin(), page_index.end());
+      thrust::stable_sort_by_key(rmm::exec_policy(_stream),
+                                 page_keys.begin(),
+                                 page_keys.end(),
+                                 page_index.begin(),
+                                 thrust::less<int>());
+    }
+
+    // retrieve pages back
+    pages.device_to_host(_stream, true);
+
+#if defined(PREPROCESS_DEBUG)
+    print_pages(pages, _stream);
+#endif
+  }
+
+  // compute splits if necessary. otherwise return a single split representing
+  // the whole file.
+  _chunk_read_info = chunk_read_limit > 0
+                       ? compute_splits(pages, _chunk_itm_data, num_rows, chunk_read_limit, _stream)
+                       : std::vector<gpu::chunk_read_info>{{skip_rows, num_rows}};
+}
+
+void reader::impl::allocate_columns(size_t skip_rows, size_t num_rows, bool uses_custom_row_bounds)
 {
   auto const& chunks = _file_itm_data.chunks;
   auto& pages        = _file_itm_data.pages_info;
 
+  // Should not reach here if there is no page data.
+  CUDF_EXPECTS(pages.size() > 0, "There is no page to parse");
+
+  // computes:
+  // PageNestingInfo::batch_size for each level of nesting, for each page, taking row bounds into
+  // account. PageInfo::skipped_values, which tells us where to start decoding in the input to
+  // respect the user bounds. It is only necessary to do this second pass if uses_custom_row_bounds
+  // is set (if the user has specified artificial bounds).
+  if (uses_custom_row_bounds) {
+    gpu::ComputePageSizes(pages,
+                          chunks,
+                          skip_rows,
+                          num_rows,
+                          false,  // num_rows is already computed
+                          false,  // no need to compute string sizes
+                          _stream);
+#if defined(PREPROCESS_DEBUG)
+    print_pages(pages, _stream);
+#endif
+  }
+
   // iterate over all input columns and allocate any associated output
   // buffers if they are not part of a list hierarchy. mark down
   // if we have any list columns that need further processing.
@@ -780,8 +1455,8 @@ void reader::impl::allocate_columns(size_t min_row, size_t total_rows, bool uses
       auto& out_buf = (*cols)[input_col.nesting[l_idx]];
       cols          = &out_buf.children;
 
-      // if this has a list parent, we will have to do further work in gpu::PreprocessColumnData
-      // to know how big this buffer actually is.
+      // if this has a list parent, we have to get column sizes from the
+      // data computed during gpu::ComputePageSizes
       if (out_buf.user_data & PARQUET_COLUMN_BUFFER_FLAG_HAS_LIST_PARENT) {
         has_lists = true;
       }
@@ -789,25 +1464,63 @@ void reader::impl::allocate_columns(size_t min_row, size_t total_rows, bool uses
       else if (out_buf.size == 0) {
         // add 1 for the offset if this is a list column
         out_buf.create(
-          out_buf.type.id() == type_id::LIST && l_idx < max_depth ? total_rows + 1 : total_rows,
+          out_buf.type.id() == type_id::LIST && l_idx < max_depth ? num_rows + 1 : num_rows,
           _stream,
           _mr);
       }
     }
   }
 
-  // if we have columns containing lists, further preprocessing is necessary.
+  // compute output column sizes by examining the pages of the -input- columns
   if (has_lists) {
-    gpu::PreprocessColumnData(pages,
-                              chunks,
-                              _input_columns,
-                              _output_buffers,
-                              total_rows,
-                              min_row,
-                              uses_custom_row_bounds,
-                              _stream,
-                              _mr);
-    _stream.synchronize();
+    auto& page_keys  = _chunk_itm_data.page_keys;
+    auto& page_index = _chunk_itm_data.page_index;
+    for (size_t idx = 0; idx < _input_columns.size(); idx++) {
+      auto const& input_col = _input_columns[idx];
+      auto src_col_schema   = input_col.schema_idx;
+      size_t max_depth      = input_col.nesting_depth();
+
+      auto* cols = &_output_buffers;
+      for (size_t l_idx = 0; l_idx < input_col.nesting_depth(); l_idx++) {
+        auto& out_buf = (*cols)[input_col.nesting[l_idx]];
+        cols          = &out_buf.children;
+
+        // size iterator. indexes pages by sorted order
+        auto size_input = thrust::make_transform_iterator(
+          page_index.begin(),
+          get_page_nesting_size{src_col_schema, static_cast<size_type>(l_idx), pages.device_ptr()});
+
+        // if this buffer is part of a list hierarchy, we need to determine its
+        // final size and allocate it here.
+        //
+        // for struct columns, higher levels of the output columns are shared between input
+        // columns. so don't compute any given level more than once.
+ if ((out_buf.user_data & PARQUET_COLUMN_BUFFER_FLAG_HAS_LIST_PARENT) && out_buf.size == 0) { + int size = + thrust::reduce(rmm::exec_policy(_stream), size_input, size_input + pages.size()); + + // if this is a list column add 1 for non-leaf levels for the terminating offset + if (out_buf.type.id() == type_id::LIST && l_idx < max_depth) { size++; } + + // allocate + out_buf.create(size, _stream, _mr); + } + + // for nested hierarchies, compute per-page start offset + if (input_col.has_repetition) { + thrust::exclusive_scan_by_key( + rmm::exec_policy(_stream), + page_keys.begin(), + page_keys.end(), + size_input, + start_offset_output_iterator{pages.device_ptr(), + page_index.begin(), + 0, + static_cast(src_col_schema), + static_cast(l_idx)}); + } + } + } } } diff --git a/cpp/src/io/utilities/column_buffer.cpp b/cpp/src/io/utilities/column_buffer.cpp index de145486662..89ba5c598e8 100644 --- a/cpp/src/io/utilities/column_buffer.cpp +++ b/cpp/src/io/utilities/column_buffer.cpp @@ -55,6 +55,33 @@ void column_buffer::create(size_type _size, } } +namespace { + +/** + * @brief Recursively copy `name` and `user_data` fields of one buffer to another. + * + * @param buff The old output buffer + * @param new_buff The new output buffer + */ +void copy_buffer_data(column_buffer const& buff, column_buffer& new_buff) +{ + new_buff.name = buff.name; + new_buff.user_data = buff.user_data; + for (auto const& child : buff.children) { + auto& new_child = new_buff.children.emplace_back(column_buffer(child.type, child.is_nullable)); + copy_buffer_data(child, new_child); + } +} + +} // namespace + +column_buffer column_buffer::empty_like(column_buffer const& input) +{ + auto new_buff = column_buffer(input.type, input.is_nullable); + copy_buffer_data(input, new_buff); + return new_buff; +} + /** * @copydoc cudf::io::detail::make_column */ diff --git a/cpp/src/io/utilities/column_buffer.hpp b/cpp/src/io/utilities/column_buffer.hpp index 8ae3d39a3ba..8f181157fae 100644 --- a/cpp/src/io/utilities/column_buffer.hpp +++ b/cpp/src/io/utilities/column_buffer.hpp @@ -104,10 +104,14 @@ struct column_buffer { { return static_cast(_null_mask.data()); } - auto null_mask_size() { return _null_mask.size(); }; + auto null_mask_size() { return _null_mask.size(); } auto& null_count() { return _null_count; } + // Create a new column_buffer that has empty data but with the same basic information as the + // input column, including same type, nullability, name, and user_data. 
+ static column_buffer empty_like(column_buffer const& input); + std::unique_ptr> _strings; rmm::device_buffer _data{}; rmm::device_buffer _null_mask{}; diff --git a/cpp/tests/CMakeLists.txt b/cpp/tests/CMakeLists.txt index c602ccc7374..bdf74368ffe 100644 --- a/cpp/tests/CMakeLists.txt +++ b/cpp/tests/CMakeLists.txt @@ -223,7 +223,7 @@ ConfigureTest(DECOMPRESSION_TEST io/comp/decomp_test.cpp) ConfigureTest(CSV_TEST io/csv_test.cpp) ConfigureTest(FILE_IO_TEST io/file_io_test.cpp) ConfigureTest(ORC_TEST io/orc_test.cpp) -ConfigureTest(PARQUET_TEST io/parquet_test.cpp) +ConfigureTest(PARQUET_TEST io/parquet_test.cpp io/parquet_chunked_reader_test.cpp) ConfigureTest(JSON_TEST io/json_test.cpp io/json_chunked_reader.cpp) ConfigureTest(JSON_TYPE_CAST_TEST io/json_type_cast_test.cu) ConfigureTest(NESTED_JSON_TEST io/nested_json_test.cpp io/json_tree.cpp) diff --git a/cpp/tests/io/parquet_chunked_reader_test.cpp b/cpp/tests/io/parquet_chunked_reader_test.cpp new file mode 100644 index 00000000000..76a65857e6f --- /dev/null +++ b/cpp/tests/io/parquet_chunked_reader_test.cpp @@ -0,0 +1,887 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include + +#include + +#include +#include + +namespace { +// Global environment for temporary files +auto const temp_env = static_cast( + ::testing::AddGlobalTestEnvironment(new cudf::test::TempDirTestEnvironment)); + +using int32s_col = cudf::test::fixed_width_column_wrapper; +using int64s_col = cudf::test::fixed_width_column_wrapper; +using strings_col = cudf::test::strings_column_wrapper; +using structs_col = cudf::test::structs_column_wrapper; +using int32s_lists_col = cudf::test::lists_column_wrapper; + +auto write_file(std::vector>& input_columns, + std::string const& filename, + bool nullable, + std::size_t max_page_size_bytes = cudf::io::default_max_page_size_bytes, + std::size_t max_page_size_rows = cudf::io::default_max_page_size_rows) +{ + // Just shift nulls of the next column by one position to avoid having all nulls in the same + // table rows. + if (nullable) { + // Generate deterministic bitmask instead of random bitmask for easy computation of data size. 
+ auto const valid_iter = cudf::detail::make_counting_transform_iterator( + 0, [](cudf::size_type i) { return i % 4 != 3; }); + + cudf::size_type offset{0}; + for (auto& col : input_columns) { + auto const col_typeid = col->type().id(); + col->set_null_mask( + cudf::test::detail::make_null_mask(valid_iter + offset, valid_iter + col->size() + offset)); + + if (col_typeid == cudf::type_id::STRUCT) { + auto const null_mask = col->view().null_mask(); + auto const null_count = col->null_count(); + + for (cudf::size_type idx = 0; idx < col->num_children(); ++idx) { + cudf::structs::detail::superimpose_parent_nulls(null_mask, + null_count, + col->child(idx), + cudf::get_default_stream(), + rmm::mr::get_current_device_resource()); + } + } + + if (col_typeid == cudf::type_id::LIST || col_typeid == cudf::type_id::STRUCT || + col_typeid == cudf::type_id::STRING) { + col = cudf::purge_nonempty_nulls(col->view()); + } + } + } + + auto input_table = std::make_unique(std::move(input_columns)); + auto filepath = + temp_env->get_temp_filepath(nullable ? filename + "_nullable.parquet" : filename + ".parquet"); + + auto const write_opts = + cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, *input_table) + .max_page_size_bytes(max_page_size_bytes) + .max_page_size_rows(max_page_size_rows) + .build(); + cudf::io::write_parquet(write_opts); + + return std::pair{std::move(input_table), std::move(filepath)}; +} + +auto chunked_read(std::string const& filepath, std::size_t byte_limit) +{ + auto const read_opts = + cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath}).build(); + auto reader = cudf::io::chunked_parquet_reader(byte_limit, read_opts); + + auto num_chunks = 0; + auto out_tables = std::vector>{}; + + do { + auto chunk = reader.read_chunk(); + // If the input file is empty, the first call to `read_chunk` will return an empty table. + // Thus, we only check for non-empty output table from the second call. 
+ if (num_chunks > 0) { + CUDF_EXPECTS(chunk.tbl->num_rows() != 0, "Number of rows in the new chunk is zero."); + } + ++num_chunks; + out_tables.emplace_back(std::move(chunk.tbl)); + } while (reader.has_next()); + + auto out_tviews = std::vector{}; + for (auto const& tbl : out_tables) { + out_tviews.emplace_back(tbl->view()); + } + + return std::pair(cudf::concatenate(out_tviews), num_chunks); +} + +} // namespace + +struct ParquetChunkedReaderTest : public cudf::test::BaseFixture { +}; + +TEST_F(ParquetChunkedReaderTest, TestChunkedReadNoData) +{ + std::vector> input_columns; + input_columns.emplace_back(int32s_col{}.release()); + input_columns.emplace_back(int64s_col{}.release()); + + auto const [expected, filepath] = write_file(input_columns, "chunked_read_empty", false); + auto const [result, num_chunks] = chunked_read(filepath, 1'000); + EXPECT_EQ(num_chunks, 1); + EXPECT_EQ(result->num_rows(), 0); + EXPECT_EQ(result->num_columns(), 2); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected, *result); +} + +TEST_F(ParquetChunkedReaderTest, TestChunkedReadSimpleData) +{ + auto constexpr num_rows = 40'000; + + auto const generate_input = [num_rows](bool nullable) { + std::vector> input_columns; + auto const value_iter = thrust::make_counting_iterator(0); + input_columns.emplace_back(int32s_col(value_iter, value_iter + num_rows).release()); + input_columns.emplace_back(int64s_col(value_iter, value_iter + num_rows).release()); + + return write_file(input_columns, "chunked_read_simple", nullable); + }; + + { + auto const [expected, filepath] = generate_input(false); + auto const [result, num_chunks] = chunked_read(filepath, 240'000); + EXPECT_EQ(num_chunks, 2); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected, *result); + } + + { + auto const [expected, filepath] = generate_input(true); + auto const [result, num_chunks] = chunked_read(filepath, 240'000); + EXPECT_EQ(num_chunks, 2); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected, *result); + } +} + +TEST_F(ParquetChunkedReaderTest, TestChunkedReadBoundaryCases) +{ + // Tests some specific boundary conditions in the split calculations. 
+ + auto constexpr num_rows = 40'000; + + auto const [expected, filepath] = [num_rows]() { + std::vector> input_columns; + auto const value_iter = thrust::make_counting_iterator(0); + input_columns.emplace_back(int32s_col(value_iter, value_iter + num_rows).release()); + return write_file(input_columns, "chunked_read_simple_boundary", false /*nullable*/); + }(); + + // Test with zero limit: everything will be read in one chunk + { + auto const [result, num_chunks] = chunked_read(filepath, 0); + EXPECT_EQ(num_chunks, 1); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected, *result); + } + + // Test with a very small limit: 1 byte + { + auto const [result, num_chunks] = chunked_read(filepath, 1); + EXPECT_EQ(num_chunks, 2); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected, *result); + } + + // Test with a very large limit + { + auto const [result, num_chunks] = chunked_read(filepath, 2L << 40); + EXPECT_EQ(num_chunks, 1); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected, *result); + } + + // Test with a limit slightly less than one page of data + { + auto const [result, num_chunks] = chunked_read(filepath, 79'000); + EXPECT_EQ(num_chunks, 2); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected, *result); + } + + // Test with a limit exactly the size one page of data + { + auto const [result, num_chunks] = chunked_read(filepath, 80'000); + EXPECT_EQ(num_chunks, 2); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected, *result); + } + + // Test with a limit slightly more the size one page of data + { + auto const [result, num_chunks] = chunked_read(filepath, 81'000); + EXPECT_EQ(num_chunks, 2); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected, *result); + } + + // Test with a limit slightly less than two pages of data + { + auto const [result, num_chunks] = chunked_read(filepath, 159'000); + EXPECT_EQ(num_chunks, 2); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected, *result); + } + + // Test with a limit exactly the size of two pages of data minus one byte + { + auto const [result, num_chunks] = chunked_read(filepath, 159'999); + EXPECT_EQ(num_chunks, 2); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected, *result); + } + + // Test with a limit exactly the size of two pages of data + { + auto const [result, num_chunks] = chunked_read(filepath, 160'000); + EXPECT_EQ(num_chunks, 1); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected, *result); + } + + // Test with a limit slightly more the size two pages of data + { + auto const [result, num_chunks] = chunked_read(filepath, 161'000); + EXPECT_EQ(num_chunks, 1); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected, *result); + } +} + +TEST_F(ParquetChunkedReaderTest, TestChunkedReadWithString) +{ + auto constexpr num_rows = 60'000; + + auto const generate_input = [num_rows](bool nullable) { + std::vector> input_columns; + auto const value_iter = thrust::make_counting_iterator(0); + + // ints Page total bytes cumulative bytes + // 20000 rows of 4 bytes each = A0 80000 80000 + // 20000 rows of 4 bytes each = A1 80000 160000 + // 20000 rows of 4 bytes each = A2 80000 240000 + input_columns.emplace_back(int32s_col(value_iter, value_iter + num_rows).release()); + + // strings Page total bytes cumulative bytes + // 20000 rows of 1 char each (20000 + 80004) = B0 100004 100004 + // 20000 rows of 4 chars each (80000 + 80004) = B1 160004 260008 + // 20000 rows of 16 chars each (320000 + 80004) = B2 400004 660012 + auto const strings = std::vector{"a", "bbbb", "cccccccccccccccc"}; + auto const str_iter = cudf::detail::make_counting_transform_iterator(0, [&](int32_t i) { + if (i < 20000) { return strings[0]; } + if (i < 40000) { return 
strings[1]; } + return strings[2]; + }); + input_columns.emplace_back(strings_col(str_iter, str_iter + num_rows).release()); + + // Cumulative sizes: + // A0 + B0 : 180004 + // A1 + B1 : 420008 + // A2 + B2 : 900012 + // skip_rows / num_rows + // byte_limit==500000 should give 2 chunks: {0, 40000}, {40000, 20000} + // byte_limit==1000000 should give 1 chunks: {0, 60000}, + return write_file(input_columns, + "chunked_read_with_strings", + nullable, + 512 * 1024, // 512KB per page + 20000 // 20k rows per page + ); + }; + + auto const [expected_no_null, filepath_no_null] = generate_input(false); + auto const [expected_with_nulls, filepath_with_nulls] = generate_input(true); + + // Test with zero limit: everything will be read in one chunk + { + auto const [result, num_chunks] = chunked_read(filepath_no_null, 0); + EXPECT_EQ(num_chunks, 1); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_no_null, *result); + } + { + auto const [result, num_chunks] = chunked_read(filepath_with_nulls, 0); + EXPECT_EQ(num_chunks, 1); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_with_nulls, *result); + } + + // Test with a very small limit: 1 byte + { + auto const [result, num_chunks] = chunked_read(filepath_no_null, 1); + EXPECT_EQ(num_chunks, 3); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_no_null, *result); + } + { + auto const [result, num_chunks] = chunked_read(filepath_with_nulls, 1); + EXPECT_EQ(num_chunks, 3); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_with_nulls, *result); + } + + // Test with a very large limit + { + auto const [result, num_chunks] = chunked_read(filepath_no_null, 2L << 40); + EXPECT_EQ(num_chunks, 1); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_no_null, *result); + } + { + auto const [result, num_chunks] = chunked_read(filepath_with_nulls, 2L << 40); + EXPECT_EQ(num_chunks, 1); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_with_nulls, *result); + } + + // Other tests: + + { + auto const [result, num_chunks] = chunked_read(filepath_no_null, 500'000); + EXPECT_EQ(num_chunks, 2); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_no_null, *result); + } + { + auto const [result, num_chunks] = chunked_read(filepath_with_nulls, 500'000); + EXPECT_EQ(num_chunks, 2); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_with_nulls, *result); + } + + { + auto const [result, num_chunks] = chunked_read(filepath_no_null, 1'000'000); + EXPECT_EQ(num_chunks, 1); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_no_null, *result); + } + { + auto const [result, num_chunks] = chunked_read(filepath_with_nulls, 1'000'000); + EXPECT_EQ(num_chunks, 1); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_with_nulls, *result); + } +} + +TEST_F(ParquetChunkedReaderTest, TestChunkedReadWithStructs) +{ + auto constexpr num_rows = 100'000; + + auto const generate_input = [num_rows](bool nullable) { + std::vector> input_columns; + auto const int_iter = thrust::make_counting_iterator(0); + input_columns.emplace_back(int32s_col(int_iter, int_iter + num_rows).release()); + input_columns.emplace_back([=] { + auto child1 = int32s_col(int_iter, int_iter + num_rows); + auto child2 = int32s_col(int_iter + num_rows, int_iter + num_rows * 2); + + auto const str_iter = cudf::detail::make_counting_transform_iterator( + 0, [&](int32_t i) { return std::to_string(i); }); + auto child3 = strings_col{str_iter, str_iter + num_rows}; + + return structs_col{{child1, child2, child3}}.release(); + }()); + + return write_file(input_columns, + "chunked_read_with_structs", + nullable, + 512 * 1024, // 512KB per page + 20000 // 20k rows per page + ); + }; + + auto const [expected_no_null, 
filepath_no_null] = generate_input(false); + auto const [expected_with_nulls, filepath_with_nulls] = generate_input(true); + + // Test with zero limit: everything will be read in one chunk + { + auto const [result, num_chunks] = chunked_read(filepath_no_null, 0); + EXPECT_EQ(num_chunks, 1); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_no_null, *result); + } + { + auto const [result, num_chunks] = chunked_read(filepath_with_nulls, 0); + EXPECT_EQ(num_chunks, 1); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_with_nulls, *result); + } + + // Test with a very small limit: 1 byte + { + auto const [result, num_chunks] = chunked_read(filepath_no_null, 1); + EXPECT_EQ(num_chunks, 5); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_no_null, *result); + } + { + auto const [result, num_chunks] = chunked_read(filepath_with_nulls, 1); + EXPECT_EQ(num_chunks, 5); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_with_nulls, *result); + } + + // Test with a very large limit + { + auto const [result, num_chunks] = chunked_read(filepath_no_null, 2L << 40); + EXPECT_EQ(num_chunks, 1); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_no_null, *result); + } + { + auto const [result, num_chunks] = chunked_read(filepath_with_nulls, 2L << 40); + EXPECT_EQ(num_chunks, 1); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_with_nulls, *result); + } + + // Other tests: + + { + auto const [result, num_chunks] = chunked_read(filepath_no_null, 500'000); + EXPECT_EQ(num_chunks, 5); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_no_null, *result); + } + { + auto const [result, num_chunks] = chunked_read(filepath_with_nulls, 500'000); + EXPECT_EQ(num_chunks, 5); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_with_nulls, *result); + } +} + +TEST_F(ParquetChunkedReaderTest, TestChunkedReadWithListsNoNulls) +{ + auto constexpr num_rows = 100'000; + + auto const [expected, filepath] = [num_rows]() { + std::vector> input_columns; + // 20000 rows in 1 page consist of: + // + // 20001 offsets : 80004 bytes + // 30000 ints : 120000 bytes + // total : 200004 bytes + auto const template_lists = int32s_lists_col{ + int32s_lists_col{}, int32s_lists_col{0}, int32s_lists_col{1, 2}, int32s_lists_col{3, 4, 5}}; + + auto const gather_iter = + cudf::detail::make_counting_transform_iterator(0, [&](int32_t i) { return i % 4; }); + auto const gather_map = int32s_col(gather_iter, gather_iter + num_rows); + input_columns.emplace_back( + std::move(cudf::gather(cudf::table_view{{template_lists}}, gather_map)->release().front())); + + return write_file(input_columns, + "chunked_read_with_lists_no_null", + false /*nullable*/, + 512 * 1024, // 512KB per page + 20000 // 20k rows per page + ); + }(); + + // Test with zero limit: everything will be read in one chunk + { + auto const [result, num_chunks] = chunked_read(filepath, 0); + EXPECT_EQ(num_chunks, 1); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected, *result); + } + + // Test with a very small limit: 1 byte + { + auto const [result, num_chunks] = chunked_read(filepath, 1); + EXPECT_EQ(num_chunks, 5); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected, *result); + } + + // Test with a very large limit + { + auto const [result, num_chunks] = chunked_read(filepath, 2L << 40); + EXPECT_EQ(num_chunks, 1); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected, *result); + } + + // chunk size slightly less than 1 page (forcing it to be at least 1 page per read) + { + auto const [result, num_chunks] = chunked_read(filepath, 200'000); + EXPECT_EQ(num_chunks, 5); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected, *result); + } + + // chunk size exactly 1 page + { + auto const [result, 
num_chunks] = chunked_read(filepath, 200'004); + EXPECT_EQ(num_chunks, 5); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected, *result); + } + + // chunk size 2 pages. 3 chunks (2 pages + 2 pages + 1 page) + { + auto const [result, num_chunks] = chunked_read(filepath, 400'008); + EXPECT_EQ(num_chunks, 3); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected, *result); + } + + // chunk size 2 pages minus one byte: each chunk will be just one page + { + auto const [result, num_chunks] = chunked_read(filepath, 400'007); + EXPECT_EQ(num_chunks, 5); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected, *result); + } +} + +TEST_F(ParquetChunkedReaderTest, TestChunkedReadWithListsHavingNulls) +{ + auto constexpr num_rows = 100'000; + + auto const [expected, filepath] = [num_rows]() { + std::vector> input_columns; + // 20000 rows in 1 page consist of: + // + // 625 validity words : 2500 bytes (a null every 4 rows: null at indices [3, 7, 11, ...]) + // 20001 offsets : 80004 bytes + // 15000 ints : 60000 bytes + // total : 142504 bytes + auto const template_lists = + int32s_lists_col{// these will all be null + int32s_lists_col{}, + int32s_lists_col{0}, + int32s_lists_col{1, 2}, + int32s_lists_col{3, 4, 5, 6, 7, 8, 9} /* this list will be nullified out */}; + auto const gather_iter = + cudf::detail::make_counting_transform_iterator(0, [&](int32_t i) { return i % 4; }); + auto const gather_map = int32s_col(gather_iter, gather_iter + num_rows); + input_columns.emplace_back( + std::move(cudf::gather(cudf::table_view{{template_lists}}, gather_map)->release().front())); + + return write_file(input_columns, + "chunked_read_with_lists_nulls", + true /*nullable*/, + 512 * 1024, // 512KB per page + 20000 // 20k rows per page + ); + }(); + + // Test with zero limit: everything will be read in one chunk + { + auto const [result, num_chunks] = chunked_read(filepath, 0); + EXPECT_EQ(num_chunks, 1); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected, *result); + } + + // Test with a very small limit: 1 byte + { + auto const [result, num_chunks] = chunked_read(filepath, 1); + EXPECT_EQ(num_chunks, 5); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected, *result); + } + + // Test with a very large limit + { + auto const [result, num_chunks] = chunked_read(filepath, 2L << 40); + EXPECT_EQ(num_chunks, 1); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected, *result); + } + + // chunk size slightly less than 1 page (forcing it to be at least 1 page per read) + { + auto const [result, num_chunks] = chunked_read(filepath, 142'500); + EXPECT_EQ(num_chunks, 5); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected, *result); + } + + // chunk size exactly 1 page + { + auto const [result, num_chunks] = chunked_read(filepath, 142'504); + EXPECT_EQ(num_chunks, 5); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected, *result); + } + + // chunk size 2 pages. 
3 chunks (2 pages + 2 pages + 1 page) + { + auto const [result, num_chunks] = chunked_read(filepath, 285'008); + EXPECT_EQ(num_chunks, 3); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected, *result); + } + + // chunk size 2 pages minus 1 byte: each chunk will be just one page + { + auto const [result, num_chunks] = chunked_read(filepath, 285'007); + EXPECT_EQ(num_chunks, 5); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected, *result); + } +} + +TEST_F(ParquetChunkedReaderTest, TestChunkedReadWithStructsOfLists) +{ + auto constexpr num_rows = 100'000; + + auto const generate_input = [num_rows](bool nullable) { + std::vector> input_columns; + auto const int_iter = thrust::make_counting_iterator(0); + input_columns.emplace_back(int32s_col(int_iter, int_iter + num_rows).release()); + input_columns.emplace_back([=] { + std::vector> child_columns; + child_columns.emplace_back(int32s_col(int_iter, int_iter + num_rows).release()); + child_columns.emplace_back( + int32s_col(int_iter + num_rows, int_iter + num_rows * 2).release()); + + auto const str_iter = cudf::detail::make_counting_transform_iterator(0, [&](int32_t i) { + return std::to_string(i) + "++++++++++++++++++++" + std::to_string(i); + }); + child_columns.emplace_back(strings_col{str_iter, str_iter + num_rows}.release()); + + auto const template_lists = int32s_lists_col{ + int32s_lists_col{}, int32s_lists_col{0}, int32s_lists_col{0, 1}, int32s_lists_col{0, 1, 2}}; + auto const gather_iter = + cudf::detail::make_counting_transform_iterator(0, [&](int32_t i) { return i % 4; }); + auto const gather_map = int32s_col(gather_iter, gather_iter + num_rows); + child_columns.emplace_back( + std::move(cudf::gather(cudf::table_view{{template_lists}}, gather_map)->release().front())); + + return structs_col(std::move(child_columns)).release(); + }()); + + return write_file(input_columns, + "chunked_read_with_structs_of_lists", + nullable, + 512 * 1024, // 512KB per page + 20000 // 20k rows per page + ); + }; + + auto const [expected_no_null, filepath_no_null] = generate_input(false); + auto const [expected_with_nulls, filepath_with_nulls] = generate_input(true); + + // Test with zero limit: everything will be read in one chunk + { + auto const [result, num_chunks] = chunked_read(filepath_no_null, 0); + EXPECT_EQ(num_chunks, 1); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_no_null, *result); + } + { + auto const [result, num_chunks] = chunked_read(filepath_with_nulls, 0); + EXPECT_EQ(num_chunks, 1); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_with_nulls, *result); + } + + // Test with a very small limit: 1 byte + { + auto const [result, num_chunks] = chunked_read(filepath_no_null, 1); + EXPECT_EQ(num_chunks, 10); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_no_null, *result); + } + { + auto const [result, num_chunks] = chunked_read(filepath_with_nulls, 1); + EXPECT_EQ(num_chunks, 5); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_with_nulls, *result); + } + + // Test with a very large limit + { + auto const [result, num_chunks] = chunked_read(filepath_no_null, 2L << 40); + EXPECT_EQ(num_chunks, 1); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_no_null, *result); + } + { + auto const [result, num_chunks] = chunked_read(filepath_with_nulls, 2L << 40); + EXPECT_EQ(num_chunks, 1); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_with_nulls, *result); + } + + // Other tests: + + // for these tests, different columns get written to different numbers of pages so it's a + // little tricky to describe the expected results by page counts. 
To get an idea of how + // these values are chosen, see the debug output from the call to print_cumulative_row_info() in + // reader_impl_preprocess.cu -> find_splits() + + { + auto const [result, num_chunks] = chunked_read(filepath_no_null, 1'000'000); + EXPECT_EQ(num_chunks, 7); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_no_null, *result); + } + + { + auto const [result, num_chunks] = chunked_read(filepath_no_null, 1'500'000); + EXPECT_EQ(num_chunks, 4); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_no_null, *result); + } + + { + auto const [result, num_chunks] = chunked_read(filepath_no_null, 2'000'000); + EXPECT_EQ(num_chunks, 4); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_no_null, *result); + } + + { + auto const [result, num_chunks] = chunked_read(filepath_no_null, 5'000'000); + EXPECT_EQ(num_chunks, 2); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_no_null, *result); + } + + { + auto const [result, num_chunks] = chunked_read(filepath_with_nulls, 1'000'000); + EXPECT_EQ(num_chunks, 5); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_with_nulls, *result); + } + + { + auto const [result, num_chunks] = chunked_read(filepath_with_nulls, 1'500'000); + EXPECT_EQ(num_chunks, 5); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_with_nulls, *result); + } + + { + auto const [result, num_chunks] = chunked_read(filepath_with_nulls, 2'000'000); + EXPECT_EQ(num_chunks, 3); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_with_nulls, *result); + } + + { + auto const [result, num_chunks] = chunked_read(filepath_with_nulls, 5'000'000); + EXPECT_EQ(num_chunks, 1); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_with_nulls, *result); + } +} + +TEST_F(ParquetChunkedReaderTest, TestChunkedReadWithListsOfStructs) +{ + auto constexpr num_rows = 100'000; + + auto const generate_input = [num_rows](bool nullable) { + std::vector> input_columns; + auto const int_iter = thrust::make_counting_iterator(0); + input_columns.emplace_back(int32s_col(int_iter, int_iter + num_rows).release()); + + auto offsets = std::vector{}; + offsets.reserve(num_rows * 2); + cudf::size_type num_structs = 0; + for (int i = 0; i < num_rows; ++i) { + offsets.push_back(num_structs); + auto const new_list_size = i % 4; + num_structs += new_list_size; + } + offsets.push_back(num_structs); + + auto const make_structs_col = [=] { + auto child1 = int32s_col(int_iter, int_iter + num_structs); + auto child2 = int32s_col(int_iter + num_structs, int_iter + num_structs * 2); + + auto const str_iter = cudf::detail::make_counting_transform_iterator( + 0, [&](int32_t i) { return std::to_string(i) + std::to_string(i) + std::to_string(i); }); + auto child3 = strings_col{str_iter, str_iter + num_structs}; + + return structs_col{{child1, child2, child3}}.release(); + }; + + input_columns.emplace_back( + cudf::make_lists_column(static_cast(offsets.size() - 1), + int32s_col(offsets.begin(), offsets.end()).release(), + make_structs_col(), + 0, + rmm::device_buffer{})); + + return write_file(input_columns, + "chunked_read_with_lists_of_structs", + nullable, + 512 * 1024, // 512KB per page + 20000 // 20k rows per page + ); + }; + + auto const [expected_no_null, filepath_no_null] = generate_input(false); + auto const [expected_with_nulls, filepath_with_nulls] = generate_input(true); + + // Test with zero limit: everything will be read in one chunk + { + auto const [result, num_chunks] = chunked_read(filepath_no_null, 0); + EXPECT_EQ(num_chunks, 1); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_no_null, *result); + } + { + auto const [result, num_chunks] = chunked_read(filepath_with_nulls, 0); + 
EXPECT_EQ(num_chunks, 1); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_with_nulls, *result); + } + + // Test with a very small limit: 1 byte + { + auto const [result, num_chunks] = chunked_read(filepath_no_null, 1); + EXPECT_EQ(num_chunks, 10); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_no_null, *result); + } + { + auto const [result, num_chunks] = chunked_read(filepath_with_nulls, 1); + EXPECT_EQ(num_chunks, 5); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_with_nulls, *result); + } + + // Test with a very large limit + { + auto const [result, num_chunks] = chunked_read(filepath_no_null, 2L << 40); + EXPECT_EQ(num_chunks, 1); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_no_null, *result); + } + { + auto const [result, num_chunks] = chunked_read(filepath_with_nulls, 2L << 40); + EXPECT_EQ(num_chunks, 1); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_with_nulls, *result); + } + + // for these tests, different columns get written to different numbers of pages so it's a + // little tricky to describe the expected results by page counts. To get an idea of how + // these values are chosen, see the debug output from the call to print_cumulative_row_info() in + // reader_impl_preprocess.cu -> find_splits() + { + auto const [result, num_chunks] = chunked_read(filepath_no_null, 1'000'000); + EXPECT_EQ(num_chunks, 7); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_no_null, *result); + } + + { + auto const [result, num_chunks] = chunked_read(filepath_no_null, 1'500'000); + EXPECT_EQ(num_chunks, 4); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_no_null, *result); + } + + { + auto const [result, num_chunks] = chunked_read(filepath_no_null, 2'000'000); + EXPECT_EQ(num_chunks, 4); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_no_null, *result); + } + + { + auto const [result, num_chunks] = chunked_read(filepath_no_null, 5'000'000); + EXPECT_EQ(num_chunks, 2); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_no_null, *result); + } + + { + auto const [result, num_chunks] = chunked_read(filepath_with_nulls, 1'000'000); + EXPECT_EQ(num_chunks, 5); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_with_nulls, *result); + } + + { + auto const [result, num_chunks] = chunked_read(filepath_with_nulls, 1'500'000); + EXPECT_EQ(num_chunks, 4); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_with_nulls, *result); + } + + { + auto const [result, num_chunks] = chunked_read(filepath_with_nulls, 2'000'000); + EXPECT_EQ(num_chunks, 3); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_with_nulls, *result); + } + + { + auto const [result, num_chunks] = chunked_read(filepath_with_nulls, 5'000'000); + EXPECT_EQ(num_chunks, 1); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_with_nulls, *result); + } +}
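For reference, a minimal sketch (not part of the diff) of how the chunked reading API added here can be driven end-to-end, mirroring the `chunked_read` test helper above; the file path and the 1 MB read limit are placeholder values:

```cpp
// Minimal usage sketch for cudf::io::chunked_parquet_reader (not part of the diff).
// The path argument and the 1 MB chunk read limit are placeholders.
#include <cudf/concatenate.hpp>
#include <cudf/io/parquet.hpp>
#include <cudf/table/table.hpp>

#include <memory>
#include <string>
#include <utility>
#include <vector>

std::unique_ptr<cudf::table> read_parquet_in_chunks(std::string const& path)
{
  auto const options =
    cudf::io::parquet_reader_options::builder(cudf::io::source_info{path}).build();
  auto reader = cudf::io::chunked_parquet_reader(1024 * 1024, options);

  std::vector<std::unique_ptr<cudf::table>> chunks;
  std::vector<cudf::table_view> views;
  do {
    auto chunk = reader.read_chunk();  // each returned table stays within the byte limit
    views.push_back(chunk.tbl->view());
    chunks.push_back(std::move(chunk.tbl));
  } while (reader.has_next());

  // Concatenating the chunks in order reproduces the result of a single read_parquet() call.
  return cudf::concatenate(views);
}
```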