From 76894e1d7bb1245ff2ce6b05d3660e55a9433075 Mon Sep 17 00:00:00 2001 From: Vukasin Milovanovic Date: Thu, 4 Mar 2021 08:42:53 -0800 Subject: [PATCH] Partial clean up of ORC writer (#7324) Issue #6763 Clean up of the code surrounding the column data encode in the ORC writer: 1. Add a 2D version of `hostdevice_vector` (single allocation); 2. Add 2D versions of `host_span` and `device_span`; 3. Add implicit conversions from `hostdevice_vector` to `host_span` and `device_span`. 4. Use the new types to represent collections that currently use flattened `hostdevice_vectors`; 5. Separated a part of `EncChunk` into a separate class, `encoder_chunk_streams`, as this is the only part used after data encode; 6. Add `orc_streams` to represent per-column streams and compute offsets. 7. Partial `writer_impl.cu` code "modernization". 8. Removed redundant size parameters (since 2dspan and 2dvector hold the size info). 9. use `device_uvector` instead of `device_vector`. Authors: - Vukasin Milovanovic (@vuule) Approvers: - Jake Hemstad (@jrhemstad) - Kumar Aatish (@kaatish) - Ram (Ramakrishna Prabhu) (@rgsl888prabhu) URL: https://github.com/rapidsai/cudf/pull/7324 --- cpp/include/cudf/utilities/span.hpp | 87 ++ cpp/src/io/orc/dict_enc.cu | 4 +- cpp/src/io/orc/orc_gpu.h | 111 ++- cpp/src/io/orc/stats_enc.cu | 4 +- cpp/src/io/orc/stripe_data.cu | 8 +- cpp/src/io/orc/stripe_enc.cu | 229 +++-- cpp/src/io/orc/stripe_init.cu | 2 +- cpp/src/io/orc/writer_impl.cu | 918 ++++++++++----------- cpp/src/io/orc/writer_impl.hpp | 240 +++--- cpp/src/io/utilities/hostdevice_vector.hpp | 61 ++ cpp/tests/utilities_tests/span_tests.cu | 66 ++ 11 files changed, 965 insertions(+), 765 deletions(-) diff --git a/cpp/include/cudf/utilities/span.hpp b/cpp/include/cudf/utilities/span.hpp index 1f872a44fec..c13e5ce44ae 100644 --- a/cpp/include/cudf/utilities/span.hpp +++ b/cpp/include/cudf/utilities/span.hpp @@ -135,6 +135,16 @@ struct host_span : public cudf::detail::span_base::value, + void>::type* = 
nullptr> + constexpr host_span(const host_span& other) noexcept + : base(other.data(), other.size()) + { + } }; // ===== device_span =============================================================================== @@ -174,6 +184,83 @@ struct device_span : public cudf::detail::span_base::value, + void>::type* = nullptr> + constexpr device_span(const device_span& other) noexcept + : base(other.data(), other.size()) + { + } }; +namespace detail { + +/** + * @brief Generic class for row-major 2D spans. Not compliant with STL container semantics/syntax. + * + * The index operator returns the corresponding row. + */ +template typename RowType> +class base_2dspan { + public: + using size_type = std::pair; + + constexpr base_2dspan() noexcept = default; + constexpr base_2dspan(T* data, size_t rows, size_t columns) noexcept + : _data{data}, _size{rows, columns} + { + } + base_2dspan(T* data, size_type size) noexcept : _data{data}, _size{size} {} + + constexpr auto data() const noexcept { return _data; } + constexpr auto size() const noexcept { return _size; } + constexpr auto count() const noexcept { return size().first * size().second; } + constexpr bool is_empty() const noexcept { return count() == 0; } + + static constexpr size_t flatten_index(size_t row, size_t column, size_type size) noexcept + { + return row * size.second + column; + } + + constexpr RowType operator[](size_t row) + { + return {this->data() + flatten_index(row, 0, this->size()), this->size().second}; + } + + template + typename OtherRowType, + typename std::enable_if, + RowType>::value, + void>::type* = nullptr> + constexpr base_2dspan(base_2dspan const& other) noexcept + : _data{other.data()}, _size{other.size()} + { + } + + protected: + T* _data = nullptr; + size_type _size{0, 0}; +}; + +/** + * @brief Alias for the 2D span for host data. + * + * Index operator returns rows as `host_span`. + */ +template +using host_2dspan = base_2dspan; + +/** + * @brief Alias for the 2D span for device data. 
+ * + * Index operator returns rows as `device_span`. + */ +template +using device_2dspan = base_2dspan; + +} // namespace detail } // namespace cudf diff --git a/cpp/src/io/orc/dict_enc.cu b/cpp/src/io/orc/dict_enc.cu index 1ee57f4a2fd..de20af1bff4 100644 --- a/cpp/src/io/orc/dict_enc.cu +++ b/cpp/src/io/orc/dict_enc.cu @@ -404,7 +404,7 @@ __global__ void __launch_bounds__(block_size) * @param[in] chunks DictionaryChunk device array [rowgroup][column] * @param[in] num_columns Number of columns * @param[in] num_rowgroups Number of row groups - * @param[in] stream CUDA stream to use, default 0 + * @param[in] stream CUDA stream to use, default `rmm::cuda_stream_default` */ void InitDictionaryIndices(DictionaryChunk *chunks, uint32_t num_columns, @@ -425,7 +425,7 @@ void InitDictionaryIndices(DictionaryChunk *chunks, * @param[in] num_stripes Number of stripes * @param[in] num_rowgroups Number of row groups * @param[in] num_columns Number of columns - * @param[in] stream CUDA stream to use, default 0 + * @param[in] stream CUDA stream to use, default `rmm::cuda_stream_default` */ void BuildStripeDictionaries(StripeDictionary *stripes, StripeDictionary *stripes_host, diff --git a/cpp/src/io/orc/orc_gpu.h b/cpp/src/io/orc/orc_gpu.h index 811d440badd..7ad92e40cb4 100644 --- a/cpp/src/io/orc/orc_gpu.h +++ b/cpp/src/io/orc/orc_gpu.h @@ -124,19 +124,25 @@ struct RowGroup { * @brief Struct to describe an encoder data chunk */ struct EncChunk { - uint8_t *streams[CI_NUM_STREAMS]; // encoded output - int32_t strm_id[CI_NUM_STREAMS]; // stream id or -1 if not present - uint32_t strm_len[CI_NUM_STREAMS]; // in: max length, out: actual length - const uint32_t *valid_map_base; // base ptr of input valid bit map - size_type column_offset; // index of the first element relative to the base memory - const void *column_data_base; // base ptr of input column data - uint32_t start_row; // start row of this chunk - uint32_t num_rows; // number of rows in this chunk - uint32_t valid_rows; 
// max number of valid rows - uint8_t encoding_kind; // column encoding kind (orc::ColumnEncodingKind) - uint8_t type_kind; // column data type (orc::TypeKind) - uint8_t dtype_len; // data type length - uint8_t scale; // scale for decimals or timestamps + const uint32_t *valid_map_base; // base ptr of input valid bit map + size_type column_offset; // index of the first element relative to the base memory + const void *column_data_base; // base ptr of input column data + uint32_t start_row; // start row of this chunk + uint32_t num_rows; // number of rows in this chunk + uint32_t valid_rows; // max number of valid rows + uint8_t encoding_kind; // column encoding kind (orc::ColumnEncodingKind) + uint8_t type_kind; // column data type (orc::TypeKind) + uint8_t dtype_len; // data type length + uint8_t scale; // scale for decimals or timestamps +}; + +/** + * @brief Struct to describe the streams that correspond to a single `EncChunk`. + */ +struct encoder_chunk_streams { + uint8_t *data_ptrs[CI_NUM_STREAMS]; // encoded output + int32_t ids[CI_NUM_STREAMS]; // stream id; -1 if stream is not present + uint32_t lengths[CI_NUM_STREAMS]; // in: max length, out: actual length }; /** @@ -193,7 +199,7 @@ struct StripeDictionary { * @param[in] compression_block_size maximum size of compressed blocks (up to 16M) * @param[in] log2maxcr log2 of maximum compression ratio (used to infer max uncompressed size from *compressed size) - * @param[in] stream CUDA stream to use, default 0 + * @param[in] stream CUDA stream to use, default `rmm::cuda_stream_default` */ void ParseCompressedStripeData(CompressedStreamInfo *strm_info, int32_t num_streams, @@ -206,7 +212,7 @@ void ParseCompressedStripeData(CompressedStreamInfo *strm_info, * * @param[in] strm_info List of compressed streams * @param[in] num_streams Number of compressed streams - * @param[in] stream CUDA stream to use, default 0 + * @param[in] stream CUDA stream to use, default `rmm::cuda_stream_default` */ void 
PostDecompressionReassemble(CompressedStreamInfo *strm_info, int32_t num_streams, @@ -221,7 +227,7 @@ void PostDecompressionReassemble(CompressedStreamInfo *strm_info, * @param[in] num_columns Number of columns * @param[in] num_stripes Number of stripes * @param[in] num_rowgroups Number of row groups - * @param[in] stream CUDA stream to use, default 0 + * @param[in] stream CUDA stream to use, default `rmm::cuda_stream_default` */ void ParseRowGroupIndex(RowGroup *row_groups, CompressedStreamInfo *strm_info, @@ -241,7 +247,7 @@ void ParseRowGroupIndex(RowGroup *row_groups, * @param[in] num_stripes Number of stripes * @param[in] max_rows Maximum number of rows to load * @param[in] first_row Crop all rows below first_row - * @param[in] stream CUDA stream to use, default 0 + * @param[in] stream CUDA stream to use, default `rmm::cuda_stream_default` */ void DecodeNullsAndStringDictionaries(ColumnDesc *chunks, DictionaryEntry *global_dictionary, @@ -265,9 +271,9 @@ void DecodeNullsAndStringDictionaries(ColumnDesc *chunks, * @param[in] row_groups Optional row index data * @param[in] num_rowgroups Number of row groups in row index data * @param[in] rowidx_stride Row index stride - * @param[in] stream CUDA stream to use, default 0 + * @param[in] stream CUDA stream to use, default `rmm::cuda_stream_default` */ -void DecodeOrcColumnData(ColumnDesc *chunks, +void DecodeOrcColumnData(ColumnDesc const *chunks, DictionaryEntry *global_dictionary, uint32_t num_columns, uint32_t num_stripes, @@ -282,79 +288,72 @@ void DecodeOrcColumnData(ColumnDesc *chunks, /** * @brief Launches kernel for encoding column data * - * @param[in] chunks EncChunk device array [rowgroup][column] - * @param[in] num_columns Number of columns - * @param[in] num_rowgroups Number of row groups - * @param[in] stream CUDA stream to use, default 0 + * @param[in] chunks encoder chunk device array [column][rowgroup] + * @param[in, out] streams chunk streams device array [column][rowgroup] + * @param[in] stream 
CUDA stream to use, default `rmm::cuda_stream_default` */ -void EncodeOrcColumnData(EncChunk *chunks, - uint32_t num_columns, - uint32_t num_rowgroups, +void EncodeOrcColumnData(detail::device_2dspan chunks, + detail::device_2dspan streams, rmm::cuda_stream_view stream = rmm::cuda_stream_default); /** * @brief Launches kernel for encoding column dictionaries * * @param[in] stripes Stripe dictionaries device array [stripe][string_column] - * @param[in] chunks EncChunk device array [rowgroup][column] + * @param[in] chunks encoder chunk device array [column][rowgroup] * @param[in] num_string_columns Number of string columns - * @param[in] num_columns Number of columns * @param[in] num_stripes Number of stripes - * @param[in] stream CUDA stream to use, default 0 + * @param[in,out] enc_streams chunk streams device array [column][rowgroup] + * @param[in] stream CUDA stream to use, default `rmm::cuda_stream_default` */ void EncodeStripeDictionaries(StripeDictionary *stripes, - EncChunk *chunks, + detail::device_2dspan chunks, uint32_t num_string_columns, - uint32_t num_columns, uint32_t num_stripes, + detail::device_2dspan enc_streams, rmm::cuda_stream_view stream = rmm::cuda_stream_default); /** * @brief Launches kernel for compacting chunked column data prior to compression * - * @param[in] strm_desc StripeStream device array [stripe][stream] - * @param[in] chunks EncChunk device array [rowgroup][column] - * @param[in] num_stripe_streams Total number of streams - * @param[in] num_columns Number of columns - * @param[in] stream CUDA stream to use, default 0 + * @param[in,out] strm_desc StripeStream device array [stripe][stream] + * @param[in,out] enc_streams chunk streams device array [column][rowgroup] + * @param[in] stream CUDA stream to use, default `rmm::cuda_stream_default` */ -void CompactOrcDataStreams(StripeStream *strm_desc, - EncChunk *chunks, - uint32_t num_stripe_streams, - uint32_t num_columns, +void CompactOrcDataStreams(detail::device_2dspan strm_desc, + 
detail::device_2dspan enc_streams, rmm::cuda_stream_view stream = rmm::cuda_stream_default); /** * @brief Launches kernel(s) for compressing data streams * * @param[in] compressed_data Output compressed blocks - * @param[in] strm_desc StripeStream device array [stripe][stream] - * @param[in] chunks EncChunk device array [rowgroup][column] + * @param[in] num_compressed_blocks Total number of compressed blocks + * @param[in] compression Type of compression + * @param[in] comp_blk_size Compression block size + * @param[in] stream CUDA stream to use, default `rmm::cuda_stream_default` + * @param[in,out] strm_desc StripeStream device array [stripe][stream] + * @param[in,out] enc_streams chunk streams device array [column][rowgroup] * @param[out] comp_in Per-block compression input parameters * @param[out] comp_out Per-block compression status - * @param[in] num_stripe_streams Total number of streams - * @param[in] compression Type of compression - * @param[in] num_compressed_blocks Total number of compressed blocks - * @param[in] stream CUDA stream to use, default 0 */ void CompressOrcDataStreams(uint8_t *compressed_data, - StripeStream *strm_desc, - EncChunk *chunks, - gpu_inflate_input_s *comp_in, - gpu_inflate_status_s *comp_out, - uint32_t num_stripe_streams, uint32_t num_compressed_blocks, CompressionKind compression, uint32_t comp_blk_size, + detail::device_2dspan strm_desc, + detail::device_2dspan enc_streams, + gpu_inflate_input_s *comp_in, + gpu_inflate_status_s *comp_out, rmm::cuda_stream_view stream = rmm::cuda_stream_default); /** * @brief Launches kernel for initializing dictionary chunks * - * @param[in] chunks DictionaryChunk device array [rowgroup][column] + * @param[in,out] chunks DictionaryChunk device array [rowgroup][column] * @param[in] num_columns Number of columns * @param[in] num_rowgroups Number of row groups - * @param[in] stream CUDA stream to use, default 0 + * @param[in] stream CUDA stream to use, default `rmm::cuda_stream_default` */ void 
InitDictionaryIndices(DictionaryChunk *chunks, uint32_t num_columns, @@ -370,7 +369,7 @@ void InitDictionaryIndices(DictionaryChunk *chunks, * @param[in] num_stripes Number of stripes * @param[in] num_rowgroups Number of row groups * @param[in] num_columns Number of columns - * @param[in] stream CUDA stream to use, default 0 + * @param[in] stream CUDA stream to use, default `rmm::cuda_stream_default` */ void BuildStripeDictionaries(StripeDictionary *stripes_dev, StripeDictionary *stripes_host, @@ -388,7 +387,7 @@ void BuildStripeDictionaries(StripeDictionary *stripes_dev, * @param[in] num_columns Number of columns * @param[in] num_rowgroups Number of rowgroups * @param[in] row_index_stride Rowgroup size in rows - * @param[in] stream CUDA stream to use, default 0 + * @param[in] stream CUDA stream to use, default `rmm::cuda_stream_default` */ void orc_init_statistics_groups(statistics_group *groups, const stats_column_desc *cols, @@ -403,7 +402,7 @@ void orc_init_statistics_groups(statistics_group *groups, * @param[in,out] groups Statistics merge groups * @param[in] chunks Statistics chunks * @param[in] statistics_count Number of statistics buffers to encode - * @param[in] stream CUDA stream to use, default 0 + * @param[in] stream CUDA stream to use, default `rmm::cuda_stream_default` */ void orc_init_statistics_buffersize(statistics_merge_group *groups, const statistics_chunk *chunks, diff --git a/cpp/src/io/orc/stats_enc.cu b/cpp/src/io/orc/stats_enc.cu index a30c61d6ef7..15fef5c8b82 100644 --- a/cpp/src/io/orc/stats_enc.cu +++ b/cpp/src/io/orc/stats_enc.cu @@ -371,7 +371,7 @@ __global__ void __launch_bounds__(encode_threads_per_block) * @param[in] num_columns Number of columns * @param[in] num_rowgroups Number of rowgroups * @param[in] row_index_stride Rowgroup size in rows - * @param[in] stream CUDA stream to use, default 0 + * @param[in] stream CUDA stream to use, default `rmm::cuda_stream_default` */ void orc_init_statistics_groups(statistics_group *groups, 
const stats_column_desc *cols, @@ -392,7 +392,7 @@ void orc_init_statistics_groups(statistics_group *groups, * @param[in,out] groups Statistics merge groups * @param[in] chunks Statistics chunks * @param[in] statistics_count Number of statistics buffers to encode - * @param[in] stream CUDA stream to use, default 0 + * @param[in] stream CUDA stream to use, default `rmm::cuda_stream_default` */ void orc_init_statistics_buffersize(statistics_merge_group *groups, const statistics_chunk *chunks, diff --git a/cpp/src/io/orc/stripe_data.cu b/cpp/src/io/orc/stripe_data.cu index 1af5e088c22..4bca725a16b 100644 --- a/cpp/src/io/orc/stripe_data.cu +++ b/cpp/src/io/orc/stripe_data.cu @@ -1374,7 +1374,7 @@ static const __device__ __constant__ uint32_t kTimestampNanoScale[8] = { // blockDim {block_size,1,1} template __global__ void __launch_bounds__(block_size) - gpuDecodeOrcColumnData(ColumnDesc *chunks, + gpuDecodeOrcColumnData(ColumnDesc const *chunks, DictionaryEntry *global_dictionary, timezone_table_view tz_table, const RowGroup *row_groups, @@ -1742,7 +1742,7 @@ __global__ void __launch_bounds__(block_size) * @param[in] num_stripes Number of stripes * @param[in] max_rows Maximum number of rows to load * @param[in] first_row Crop all rows below first_row - * @param[in] stream CUDA stream to use, default 0 + * @param[in] stream CUDA stream to use, default `rmm::cuda_stream_default` */ void __host__ DecodeNullsAndStringDictionaries(ColumnDesc *chunks, DictionaryEntry *global_dictionary, @@ -1771,9 +1771,9 @@ void __host__ DecodeNullsAndStringDictionaries(ColumnDesc *chunks, * @param[in] row_groups Optional row index data * @param[in] num_rowgroups Number of row groups in row index data * @param[in] rowidx_stride Row index stride - * @param[in] stream CUDA stream to use, default 0 + * @param[in] stream CUDA stream to use, default `rmm::cuda_stream_default` */ -void __host__ DecodeOrcColumnData(ColumnDesc *chunks, +void __host__ DecodeOrcColumnData(ColumnDesc const *chunks, 
DictionaryEntry *global_dictionary, uint32_t num_columns, uint32_t num_stripes, diff --git a/cpp/src/io/orc/stripe_enc.cu b/cpp/src/io/orc/stripe_enc.cu index 6e2b7b2ab89..88cad005817 100644 --- a/cpp/src/io/orc/stripe_enc.cu +++ b/cpp/src/io/orc/stripe_enc.cu @@ -26,6 +26,9 @@ namespace cudf { namespace io { namespace orc { namespace gpu { + +using detail::device_2dspan; + constexpr int scratch_buffer_size = 512 * 4; // Apache ORC reader does not handle zero-length patch lists for RLEv2 mode2 @@ -65,6 +68,7 @@ struct orcenc_state_s { uint32_t numvals; // # of non-zero values in current batch (<=nrows) uint32_t numlengths; // # of non-zero values in DATA2 batch uint32_t nnz; // Running count of non-null values + encoder_chunk_streams stream; EncChunk chunk; uint32_t strm_pos[CI_NUM_STREAMS]; uint8_t valid_buf[512]; // valid map bits @@ -122,7 +126,7 @@ template static __device__ void StoreBytes( orcenc_state_s *s, const uint8_t *inbuf, uint32_t inpos, uint32_t count, int t) { - uint8_t *dst = s->chunk.streams[cid] + s->strm_pos[cid]; + uint8_t *dst = s->stream.data_ptrs[cid] + s->strm_pos[cid]; while (count > 0) { uint32_t n = min(count, 512); if (t < n) { dst[t] = inbuf[(inpos + t) & inmask]; } @@ -131,7 +135,7 @@ static __device__ void StoreBytes( count -= n; } __syncthreads(); - if (!t) { s->strm_pos[cid] = static_cast(dst - s->chunk.streams[cid]); } + if (!t) { s->strm_pos[cid] = static_cast(dst - s->stream.data_ptrs[cid]); } } /** @@ -153,7 +157,7 @@ template static __device__ uint32_t ByteRLE( orcenc_state_s *s, const uint8_t *inbuf, uint32_t inpos, uint32_t numvals, uint32_t flush, int t) { - uint8_t *dst = s->chunk.streams[cid] + s->strm_pos[cid]; + uint8_t *dst = s->stream.data_ptrs[cid] + s->strm_pos[cid]; uint32_t out_cnt = 0; while (numvals > 0) { @@ -252,7 +256,7 @@ static __device__ uint32_t ByteRLE( } } } - if (!t) { s->strm_pos[cid] = static_cast(dst - s->chunk.streams[cid]); } + if (!t) { s->strm_pos[cid] = static_cast(dst - 
s->stream.data_ptrs[cid]); } __syncthreads(); return out_cnt; } @@ -364,7 +368,7 @@ static __device__ uint32_t IntegerRLE(orcenc_state_s *s, Storage &temp_storage) { using block_reduce = cub::BlockReduce; - uint8_t *dst = s->chunk.streams[cid] + s->strm_pos[cid]; + uint8_t *dst = s->stream.data_ptrs[cid] + s->strm_pos[cid]; uint32_t out_cnt = 0; __shared__ volatile uint64_t block_vmin; @@ -563,7 +567,7 @@ static __device__ uint32_t IntegerRLE(orcenc_state_s *s, out_cnt += delta_run; } } - if (!t) { s->strm_pos[cid] = static_cast(dst - s->chunk.streams[cid]); } + if (!t) { s->strm_pos[cid] = static_cast(dst - s->stream.data_ptrs[cid]); } __syncthreads(); return out_cnt; } @@ -634,14 +638,14 @@ static const __device__ __constant__ int32_t kTimeScale[10] = { /** * @brief Encode column data * - * @param[in] chunks EncChunk device array [rowgroup][column] - * @param[in] num_columns Number of columns - * @param[in] num_rowgroups Number of row groups + * @param[in] chunks encoder chunks device array [column][rowgroup] + * @param[in, out] streams chunk streams device array [column][rowgroup] */ // blockDim {512,1,1} template __global__ void __launch_bounds__(block_size) - gpuEncodeOrcColumnData(EncChunk *chunks, uint32_t num_columns, uint32_t num_rowgroups) + gpuEncodeOrcColumnData(device_2dspan chunks, + device_2dspan streams) { __shared__ __align__(16) orcenc_state_s state_g; __shared__ union { @@ -654,8 +658,10 @@ __global__ void __launch_bounds__(block_size) uint32_t col_id = blockIdx.x; uint32_t group_id = blockIdx.y; int t = threadIdx.x; - - if (t == 0) s->chunk = chunks[group_id * num_columns + col_id]; + if (t == 0) { + s->chunk = chunks[col_id][group_id]; + s->stream = streams[col_id][group_id]; + } if (t < CI_NUM_STREAMS) { s->strm_pos[t] = 0; } __syncthreads(); if (!t) { @@ -667,8 +673,8 @@ __global__ void __launch_bounds__(block_size) s->nnz = 0; // Dictionary data is encoded in a separate kernel if (s->chunk.encoding_kind == DICTIONARY_V2) { - 
s->strm_pos[CI_DATA2] = s->chunk.strm_len[CI_DATA2]; - s->strm_pos[CI_DICTIONARY] = s->chunk.strm_len[CI_DICTIONARY]; + s->strm_pos[CI_DATA2] = s->stream.lengths[CI_DATA2]; + s->strm_pos[CI_DICTIONARY] = s->stream.lengths[CI_DICTIONARY]; } } __syncthreads(); @@ -708,7 +714,7 @@ __global__ void __launch_bounds__(block_size) s->present_out; // Should always be a multiple of 8 except at the end of the last row group if (nrows_out > ((present_rows < s->chunk.num_rows) ? 130 * 8 : 0)) { uint32_t present_out = s->present_out; - if (s->chunk.strm_id[CI_PRESENT] >= 0) { + if (s->stream.ids[CI_PRESENT] >= 0) { uint32_t flush = (present_rows < s->chunk.num_rows) ? 0 : 7; nrows_out = (nrows_out + flush) >> 3; nrows_out = @@ -722,7 +728,7 @@ __global__ void __launch_bounds__(block_size) __syncthreads(); } // Fetch non-null values - if (!s->chunk.streams[CI_DATA]) { + if (!s->stream.data_ptrs[CI_DATA]) { // Pass-through __syncthreads(); if (!t) { @@ -806,7 +812,7 @@ __global__ void __launch_bounds__(block_size) uint32_t nz = s->buf.u32[511]; uint32_t nz_idx = (s->nnz + t) & 0x3ff; uint32_t len = (t < nz && s->u.strenc.str_data[t]) ? 
s->lengths.u32[nz_idx] : 0; - StoreStringData(s->chunk.streams[CI_DATA] + s->strm_pos[CI_DATA], &s->u.strenc, len, t); + StoreStringData(s->stream.data_ptrs[CI_DATA] + s->strm_pos[CI_DATA], &s->u.strenc, len, t); if (!t) { s->strm_pos[CI_DATA] += s->u.strenc.char_count; } __syncthreads(); } else if (s->chunk.type_kind == BOOLEAN) { @@ -904,11 +910,11 @@ __global__ void __launch_bounds__(block_size) __syncthreads(); } __syncthreads(); - if (t <= CI_PRESENT && s->chunk.strm_id[t] >= 0) { + if (t <= CI_PRESENT && s->stream.ids[t] >= 0) { // Update actual compressed length - chunks[group_id * num_columns + col_id].strm_len[t] = s->strm_pos[t]; - if (!s->chunk.streams[t]) { - chunks[group_id * num_columns + col_id].streams[t] = + streams[col_id][group_id].lengths[t] = s->strm_pos[t]; + if (!s->stream.data_ptrs[t]) { + streams[col_id][group_id].data_ptrs[t] = static_cast(const_cast(s->chunk.column_data_base)) + s->chunk.start_row * s->chunk.dtype_len; } @@ -925,7 +931,9 @@ __global__ void __launch_bounds__(block_size) // blockDim {512,1,1} template __global__ void __launch_bounds__(block_size) - gpuEncodeStringDictionaries(StripeDictionary *stripes, EncChunk *chunks, uint32_t num_columns) + gpuEncodeStringDictionaries(StripeDictionary *stripes, + device_2dspan chunks, + device_2dspan streams) { __shared__ __align__(16) orcenc_state_s state_g; __shared__ typename cub::BlockReduce::TempStorage temp_storage; @@ -933,24 +941,22 @@ __global__ void __launch_bounds__(block_size) orcenc_state_s *const s = &state_g; uint32_t stripe_id = blockIdx.x; uint32_t cid = (blockIdx.y) ? 
CI_DICTIONARY : CI_DATA2; - uint32_t chunk_id; - int t = threadIdx.x; - const nvstrdesc_s *str_desc; - const uint32_t *dict_data; + int t = threadIdx.x; if (t == 0) s->u.dict_stripe = stripes[stripe_id]; __syncthreads(); - chunk_id = s->u.dict_stripe.start_chunk * num_columns + s->u.dict_stripe.column_id; + auto const strm_ptr = &streams[s->u.dict_stripe.column_id][s->u.dict_stripe.start_chunk]; if (t == 0) { - s->chunk = chunks[chunk_id]; + s->chunk = chunks[s->u.dict_stripe.column_id][s->u.dict_stripe.start_chunk]; + s->stream = *strm_ptr; s->strm_pos[cid] = 0; s->numlengths = 0; s->nrows = s->u.dict_stripe.num_strings; s->cur_row = 0; } - str_desc = static_cast(s->u.dict_stripe.column_data_base); - dict_data = s->u.dict_stripe.dict_data; + auto const str_desc = static_cast(s->u.dict_stripe.column_data_base); + auto const dict_data = s->u.dict_stripe.dict_data; __syncthreads(); if (s->chunk.encoding_kind != DICTIONARY_V2) { return; // This column isn't using dictionary encoding -> bail out @@ -964,7 +970,7 @@ __global__ void __launch_bounds__(block_size) const char *ptr = (t < numvals) ? str_desc[string_idx].ptr : 0; uint32_t count = (t < numvals) ? static_cast(str_desc[string_idx].count) : 0; s->u.strenc.str_data[t] = ptr; - StoreStringData(s->chunk.streams[CI_DICTIONARY] + s->strm_pos[CI_DICTIONARY], + StoreStringData(s->stream.data_ptrs[CI_DICTIONARY] + s->strm_pos[CI_DICTIONARY], &s->u.strenc, (ptr) ? 
count : 0, t); @@ -989,46 +995,43 @@ __global__ void __launch_bounds__(block_size) if (t == 0) { s->cur_row += numvals; } __syncthreads(); } - if (t == 0) { chunks[chunk_id].strm_len[cid] = s->strm_pos[cid]; } + if (t == 0) { strm_ptr->lengths[cid] = s->strm_pos[cid]; } } /** * @brief Merge chunked column data into a single contiguous stream * * @param[in] strm_desc StripeStream device array [stripe][stream] - * @param[in] chunks EncChunk device array [rowgroup][column] - * @param[in] num_stripe_streams Total number of streams - * @param[in] num_columns Number of columns + * @param[in,out] streams chunk streams device array [column][rowgroup] */ // blockDim {1024,1,1} __global__ void __launch_bounds__(1024) - gpuCompactOrcDataStreams(StripeStream *strm_desc, EncChunk *chunks, uint32_t num_columns) + gpuCompactOrcDataStreams(device_2dspan strm_desc, + device_2dspan streams) { __shared__ __align__(16) StripeStream ss; __shared__ __align__(16) encoder_chunk_streams strm0; __shared__ uint8_t *volatile ck_curptr_g; __shared__ uint32_t volatile ck_curlen_g; - uint32_t strm_id = blockIdx.x; - uint32_t ck0_id, cid; - uint32_t t = threadIdx.x; - uint8_t *dst_ptr; + auto const stripe_id = blockIdx.x; + auto const stream_id = blockIdx.y; + uint32_t t = threadIdx.x; if (t == 0) { - ss = strm_desc[strm_id]; - ck0 = chunks[ss.first_chunk_id]; + ss = strm_desc[stripe_id][stream_id]; + strm0 = streams[ss.column_id][ss.first_chunk_id]; } __syncthreads(); - ck0_id = ss.first_chunk_id; - cid = ss.stream_type; - dst_ptr = ck0.streams[cid] + ck0.strm_len[cid]; - for (uint32_t g = 1; g < ss.num_chunks; g++) { + auto const cid = ss.stream_type; + auto dst_ptr = strm0.data_ptrs[cid] + strm0.lengths[cid]; + for (auto group = ss.first_chunk_id + 1; group < ss.first_chunk_id + ss.num_chunks; ++group) { uint8_t *src_ptr; uint32_t len; if (t == 0) { - src_ptr = chunks[ck0_id + g * num_columns].streams[cid]; - len = chunks[ck0_id + g * num_columns].strm_len[cid]; - if (src_ptr != dst_ptr) { 
chunks[ck0_id + g * num_columns].streams[cid] = dst_ptr; } + src_ptr = streams[ss.column_id][group].data_ptrs[cid]; + len = streams[ss.column_id][group].lengths[cid]; + if (src_ptr != dst_ptr) { streams[ss.column_id][group].data_ptrs[cid] = dst_ptr; } ck_curptr_g = src_ptr; ck_curlen_g = len; } @@ -1045,7 +1048,7 @@ __global__ void __launch_bounds__(1024) dst_ptr += len; __syncthreads(); } - if (!t) { strm_desc[strm_id].stream_size = dst_ptr - ck0.streams[cid]; } + if (!t) { strm_desc[stripe_id][stream_id].stream_size = dst_ptr - strm0.data_ptrs[cid]; } } /** @@ -1059,24 +1062,26 @@ __global__ void __launch_bounds__(1024) * @param[in] comp_blk_size Compression block size */ // blockDim {256,1,1} -__global__ void __launch_bounds__(256) gpuInitCompressionBlocks(StripeStream *strm_desc, - EncChunk *chunks, - gpu_inflate_input_s *comp_in, - gpu_inflate_status_s *comp_out, - uint8_t *compressed_bfr, - uint32_t comp_blk_size) +__global__ void __launch_bounds__(256) + gpuInitCompressionBlocks(device_2dspan strm_desc, + device_2dspan streams, // const? 
+ gpu_inflate_input_s *comp_in, + gpu_inflate_status_s *comp_out, + uint8_t *compressed_bfr, + uint32_t comp_blk_size) { __shared__ __align__(16) StripeStream ss; __shared__ uint8_t *volatile uncomp_base_g; - uint32_t strm_id = blockIdx.x; - uint32_t t = threadIdx.x; + auto const stripe_id = blockIdx.x; + auto const stream_id = blockIdx.y; + uint32_t t = threadIdx.x; uint32_t num_blocks; uint8_t *src, *dst; if (t == 0) { - ss = strm_desc[strm_id]; - uncomp_base_g = chunks[ss.first_chunk_id].streams[ss.stream_type]; + ss = strm_desc[stripe_id][stream_id]; + uncomp_base_g = streams[ss.column_id][ss.first_chunk_id].data_ptrs[ss.stream_type]; } __syncthreads(); src = uncomp_base_g; @@ -1108,23 +1113,25 @@ __global__ void __launch_bounds__(256) gpuInitCompressionBlocks(StripeStream *st * @param[in] comp_blk_size Compression block size */ // blockDim {1024,1,1} -__global__ void __launch_bounds__(1024) gpuCompactCompressedBlocks(StripeStream *strm_desc, - gpu_inflate_input_s *comp_in, - gpu_inflate_status_s *comp_out, - uint8_t *compressed_bfr, - uint32_t comp_blk_size) +__global__ void __launch_bounds__(1024) + gpuCompactCompressedBlocks(device_2dspan strm_desc, + gpu_inflate_input_s *comp_in, + gpu_inflate_status_s *comp_out, + uint8_t *compressed_bfr, + uint32_t comp_blk_size) { __shared__ __align__(16) StripeStream ss; __shared__ const uint8_t *volatile comp_src_g; __shared__ uint32_t volatile comp_len_g; - uint32_t strm_id = blockIdx.x; - uint32_t t = threadIdx.x; + auto const stripe_id = blockIdx.x; + auto const stream_id = blockIdx.y; + uint32_t t = threadIdx.x; uint32_t num_blocks, b, blk_size; const uint8_t *src; uint8_t *dst; - if (t == 0) ss = strm_desc[strm_id]; + if (t == 0) ss = strm_desc[stripe_id][stream_id]; __syncthreads(); num_blocks = (ss.stream_size > 0) ? 
(ss.stream_size - 1) / comp_blk_size + 1 : 0; @@ -1171,102 +1178,56 @@ __global__ void __launch_bounds__(1024) gpuCompactCompressedBlocks(StripeStream } while (++b < num_blocks); // Update stripe stream with the compressed size if (t == 0) { - strm_desc[strm_id].stream_size = static_cast(dst - (compressed_bfr + ss.bfr_offset)); + strm_desc[stripe_id][stream_id].stream_size = + static_cast(dst - (compressed_bfr + ss.bfr_offset)); } } -/** - * @brief Launches kernel for encoding column data - * - * @param[in] chunks EncChunk device array [rowgroup][column] - * @param[in] num_columns Number of columns - * @param[in] num_rowgroups Number of row groups - * @param[in] stream CUDA stream to use, default 0 - */ -void EncodeOrcColumnData(EncChunk *chunks, - uint32_t num_columns, - uint32_t num_rowgroups, +void EncodeOrcColumnData(device_2dspan chunks, + device_2dspan streams, rmm::cuda_stream_view stream) { dim3 dim_block(512, 1); // 512 threads per chunk - dim3 dim_grid(num_columns, num_rowgroups); - gpuEncodeOrcColumnData<512> - <<>>(chunks, num_columns, num_rowgroups); + dim3 dim_grid(chunks.size().first, chunks.size().second); + gpuEncodeOrcColumnData<512><<>>(chunks, streams); } -/** - * @brief Launches kernel for encoding column dictionaries - * - * @param[in] stripes Stripe dictionaries device array [stripe][string_column] - * @param[in] chunks EncChunk device array [rowgroup][column] - * @param[in] num_string_columns Number of string columns - * @param[in] num_columns Number of columns - * @param[in] num_stripes Number of stripes - * @param[in] stream CUDA stream to use, default 0 - */ void EncodeStripeDictionaries(StripeDictionary *stripes, - EncChunk *chunks, + device_2dspan chunks, uint32_t num_string_columns, - uint32_t num_columns, uint32_t num_stripes, + device_2dspan enc_streams, rmm::cuda_stream_view stream) { dim3 dim_block(512, 1); // 512 threads per dictionary dim3 dim_grid(num_string_columns * num_stripes, 2); gpuEncodeStringDictionaries<512> - 
<<>>(stripes, chunks, num_columns); + <<>>(stripes, chunks, enc_streams); } -/** - * @brief Launches kernel for compacting chunked column data prior to compression - * - * @param[in] strm_desc StripeStream device array [stripe][stream] - * @param[in] chunks EncChunk device array [rowgroup][column] - * @param[in] num_stripe_streams Total number of streams - * @param[in] num_columns Number of columns - * @param[in] stream CUDA stream to use, default 0 - */ -void CompactOrcDataStreams(StripeStream *strm_desc, - EncChunk *chunks, - uint32_t num_stripe_streams, - uint32_t num_columns, +void CompactOrcDataStreams(device_2dspan strm_desc, + device_2dspan enc_streams, rmm::cuda_stream_view stream) { dim3 dim_block(1024, 1); - dim3 dim_grid(num_stripe_streams, 1); - gpuCompactOrcDataStreams<<>>( - strm_desc, chunks, num_columns); + dim3 dim_grid(strm_desc.size().first, strm_desc.size().second); + gpuCompactOrcDataStreams<<>>(strm_desc, enc_streams); } -/** - * @brief Launches kernel(s) for compressing data streams - * - * @param[in] compressed_data Output compressed blocks - * @param[in] strm_desc StripeStream device array [stripe][stream] - * @param[in] chunks EncChunk device array [rowgroup][column] - * @param[out] comp_in Per-block compression input parameters - * @param[out] comp_out Per-block compression status - * @param[in] num_stripe_streams Total number of streams - * @param[in] num_compressed_blocks Total number of compressed blocks - * @param[in] compression Type of compression - * @param[in] comp_blk_size Compression block size - * @param[in] stream CUDA stream to use, default 0 - */ void CompressOrcDataStreams(uint8_t *compressed_data, - StripeStream *strm_desc, - EncChunk *chunks, - gpu_inflate_input_s *comp_in, - gpu_inflate_status_s *comp_out, - uint32_t num_stripe_streams, uint32_t num_compressed_blocks, CompressionKind compression, uint32_t comp_blk_size, + detail::device_2dspan strm_desc, + detail::device_2dspan enc_streams, + gpu_inflate_input_s 
*comp_in, + gpu_inflate_status_s *comp_out, rmm::cuda_stream_view stream) { dim3 dim_block_init(256, 1); - dim3 dim_grid(num_stripe_streams, 1); + dim3 dim_grid(strm_desc.size().first, strm_desc.size().second); gpuInitCompressionBlocks<<>>( - strm_desc, chunks, comp_in, comp_out, compressed_data, comp_blk_size); + strm_desc, enc_streams, comp_in, comp_out, compressed_data, comp_blk_size); if (compression == SNAPPY) { gpu_snap(comp_in, comp_out, num_compressed_blocks, stream); } dim3 dim_block_compact(1024, 1); gpuCompactCompressedBlocks<<>>( diff --git a/cpp/src/io/orc/stripe_init.cu b/cpp/src/io/orc/stripe_init.cu index bd9f6694be4..61917403b41 100644 --- a/cpp/src/io/orc/stripe_init.cu +++ b/cpp/src/io/orc/stripe_init.cu @@ -485,7 +485,7 @@ void __host__ PostDecompressionReassemble(CompressedStreamInfo *strm_info, * @param[in] num_columns Number of columns * @param[in] num_stripes Number of stripes * @param[in] num_rowgroups Number of row groups - * @param[in] stream CUDA stream to use, default 0 + * @param[in] stream CUDA stream to use, default `rmm::cuda_stream_default` */ void __host__ ParseRowGroupIndex(RowGroup *row_groups, CompressedStreamInfo *strm_info, diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu index 7198539a460..1e634849998 100644 --- a/cpp/src/io/orc/writer_impl.cu +++ b/cpp/src/io/orc/writer_impl.cu @@ -26,10 +26,11 @@ #include #include -#include +#include #include #include +#include #include namespace cudf { @@ -160,7 +161,7 @@ class orc_column_view { _data_count(col.size()), _null_count(col.null_count()), _data(col.head() + col.offset() * _type_width), - _nulls(col.nullable() ? 
col.null_mask() : nullptr), + _nulls(col.null_mask()), _column_offset(col.offset()), _clockscale(to_clockscale(col.type().id())), _type_kind(to_orc_type(col.type().id())) @@ -200,7 +201,7 @@ class orc_column_view { dict = host_dict; d_dict = dev_dict; } - auto host_dict_chunk(size_t rowgroup) + auto host_dict_chunk(size_t rowgroup) const { assert(_string_type); return &dict[rowgroup * dict_stride + _str_id]; @@ -223,6 +224,7 @@ class orc_column_view { } auto device_stripe_dict() const { return d_stripe_dict; } + auto id() const noexcept { return _id; } size_t type_width() const noexcept { return _type_width; } size_t data_count() const noexcept { return _data_count; } size_t null_count() const noexcept { return _null_count; } @@ -265,28 +267,64 @@ class orc_column_view { gpu::StripeDictionary *d_stripe_dict = nullptr; }; +std::vector writer::impl::gather_stripe_info( + host_span columns, size_t num_rowgroups) +{ + auto const is_any_column_string = + std::any_of(columns.begin(), columns.end(), [](auto const &col) { return col.is_string(); }); + // Apply rows per stripe limit to limit string dictionaries + size_t const max_stripe_rows = is_any_column_string ? 
1000000 : 5000000; + + std::vector infos; + for (size_t rowgroup = 0, stripe_start = 0, stripe_size = 0; rowgroup < num_rowgroups; + ++rowgroup) { + auto const rowgroup_size = + std::accumulate(columns.begin(), columns.end(), 0ul, [&](size_t total_size, auto const &col) { + if (col.is_string()) { + const auto dt = col.host_dict_chunk(rowgroup); + return total_size + row_index_stride_ + dt->string_char_count; + } else { + return total_size + col.type_width() * row_index_stride_; + } + }); + + if ((rowgroup > stripe_start) && + (stripe_size + rowgroup_size > max_stripe_size_ || + (rowgroup + 1 - stripe_start) * row_index_stride_ > max_stripe_rows)) { + infos.emplace_back(infos.size(), stripe_start, rowgroup - stripe_start); + stripe_start = rowgroup; + stripe_size = 0; + } + stripe_size += rowgroup_size; + if (rowgroup + 1 == num_rowgroups) { + infos.emplace_back(infos.size(), stripe_start, num_rowgroups - stripe_start); + } + } + + return infos; +} + void writer::impl::init_dictionaries(orc_column_view *columns, - size_t num_rows, std::vector const &str_col_ids, uint32_t *dict_data, uint32_t *dict_index, - hostdevice_vector &dict) + hostdevice_vector *dict) { - const size_t num_rowgroups = dict.size() / str_col_ids.size(); + const size_t num_rowgroups = dict->size() / str_col_ids.size(); // Setup per-rowgroup dictionary indexes for each dictionary-aware column for (size_t i = 0; i < str_col_ids.size(); ++i) { auto &str_column = columns[str_col_ids[i]]; str_column.set_dict_stride(str_col_ids.size()); - str_column.attach_dict_chunk(dict.host_ptr(), dict.device_ptr()); + str_column.attach_dict_chunk(dict->host_ptr(), dict->device_ptr()); for (size_t g = 0; g < num_rowgroups; g++) { - auto *ck = &dict[g * str_col_ids.size() + i]; + auto *ck = &(*dict)[g * str_col_ids.size() + i]; ck->valid_map_base = str_column.nulls(); ck->column_offset = str_column.column_offset(); ck->column_data_base = str_column.data(); - ck->dict_data = dict_data + i * num_rows + g * 
row_index_stride_; - ck->dict_index = dict_index + i * num_rows; // Indexed by abs row + ck->dict_data = dict_data + i * str_column.data_count() + g * row_index_stride_; + ck->dict_index = dict_index + i * str_column.data_count(); // Indexed by abs row ck->start_row = g * row_index_stride_; ck->num_rows = std::min(row_index_stride_, std::max(str_column.data_count() - ck->start_row, 0)); @@ -297,51 +335,59 @@ void writer::impl::init_dictionaries(orc_column_view *columns, } } - dict.host_to_device(stream); - gpu::InitDictionaryIndices(dict.device_ptr(), str_col_ids.size(), num_rowgroups, stream); - dict.device_to_host(stream, true); + dict->host_to_device(stream); + gpu::InitDictionaryIndices(dict->device_ptr(), str_col_ids.size(), num_rowgroups, stream); + dict->device_to_host(stream, true); } void writer::impl::build_dictionaries(orc_column_view *columns, - size_t num_rows, std::vector const &str_col_ids, - std::vector const &stripe_list, + host_span stripe_bounds, hostdevice_vector const &dict, uint32_t *dict_index, hostdevice_vector &stripe_dict) { const auto num_rowgroups = dict.size() / str_col_ids.size(); - for (size_t i = 0; i < str_col_ids.size(); i++) { - size_t direct_cost = 0, dict_cost = 0; - auto &str_column = columns[str_col_ids[i]]; + for (size_t col_idx = 0; col_idx < str_col_ids.size(); ++col_idx) { + auto &str_column = columns[str_col_ids[col_idx]]; str_column.attach_stripe_dict(stripe_dict.host_ptr(), stripe_dict.device_ptr()); - for (size_t j = 0, g = 0; j < stripe_list.size(); j++) { - const auto num_chunks = stripe_list[j]; - auto *sd = &stripe_dict[j * str_col_ids.size() + i]; - sd->column_data_base = str_column.host_dict_chunk(0)->column_data_base; - sd->dict_data = str_column.host_dict_chunk(g)->dict_data; - sd->dict_index = dict_index + i * num_rows; // Indexed by abs row - sd->column_id = str_col_ids[i]; - sd->start_chunk = (uint32_t)g; - sd->num_chunks = num_chunks; - sd->num_strings = 0; - sd->dict_char_count = 0; - for (size_t k = g; k 
< g + num_chunks; k++) { - const auto &dt = dict[k * str_col_ids.size() + i]; - sd->num_strings += dt.num_dict_strings; - direct_cost += dt.string_char_count; - dict_cost += dt.dict_char_count + dt.num_dict_strings; - } - - g += num_chunks; + for (auto const &stripe : stripe_bounds) { + auto &sd = stripe_dict[stripe.id * str_col_ids.size() + col_idx]; + sd.column_data_base = str_column.host_dict_chunk(0)->column_data_base; + sd.dict_data = str_column.host_dict_chunk(stripe.first)->dict_data; + sd.dict_index = dict_index + col_idx * str_column.data_count(); // Indexed by abs row + sd.column_id = str_col_ids[col_idx]; + sd.start_chunk = stripe.first; + sd.num_chunks = stripe.size; + sd.dict_char_count = 0; + sd.num_strings = + std::accumulate(stripe.cbegin(), stripe.cend(), 0, [&](auto dt_str_cnt, auto rg_idx) { + const auto &dt = dict[rg_idx * str_col_ids.size() + col_idx]; + return dt_str_cnt + dt.num_dict_strings; + }); } - // Early disable of dictionary if it doesn't look good at the chunk level - if (enable_dictionary_ && dict_cost >= direct_cost) { - for (size_t j = 0; j < stripe_list.size(); j++) { - stripe_dict[j * str_col_ids.size() + i].dict_data = nullptr; + if (enable_dictionary_) { + struct string_column_cost { + size_t direct = 0; + size_t dictionary = 0; + }; + auto const col_cost = + std::accumulate(stripe_bounds.front().cbegin(), + stripe_bounds.back().cend(), + string_column_cost{}, + [&](auto cost, auto rg_idx) -> string_column_cost { + const auto &dt = dict[rg_idx * str_col_ids.size() + col_idx]; + return {cost.dictionary + dt.dict_char_count + dt.num_dict_strings, + cost.direct + dt.string_char_count}; + }); + // Disable dictionary if it does not reduce the output size + if (col_cost.dictionary >= col_cost.direct) { + for (auto const &stripe : stripe_bounds) { + stripe_dict[stripe.id * str_col_ids.size() + col_idx].dict_data = nullptr; + } } } } @@ -350,28 +396,23 @@ void writer::impl::build_dictionaries(orc_column_view *columns, 
gpu::BuildStripeDictionaries(stripe_dict.device_ptr(), stripe_dict.host_ptr(), dict.device_ptr(), - stripe_list.size(), + stripe_bounds.size(), num_rowgroups, str_col_ids.size(), stream); stripe_dict.device_to_host(stream, true); } -std::vector writer::impl::gather_streams(orc_column_view *columns, - size_t num_columns, - size_t num_rows, - std::vector const &stripe_list, - std::vector &strm_ids) +orc_streams writer::impl::create_streams(host_span columns, + host_span stripe_bounds) { // First n + 1 streams are row index streams, including 'column 0' - std::vector streams; - streams.resize(num_columns + 1); - streams[0].column = 0; - streams[0].kind = ROW_INDEX; - streams[0].length = 0; - - for (size_t i = 0; i < num_columns; ++i) { - TypeKind kind = columns[i].orc_kind(); + std::vector streams{{ROW_INDEX, 0, 0}}; // TODO: Separate index and data streams? + streams.resize(columns.size() + 1); + std::vector ids(columns.size() * gpu::CI_NUM_STREAMS, -1); + + for (auto &column : columns) { + TypeKind kind = column.orc_kind(); StreamKind data_kind = DATA; StreamKind data2_kind = LENGTH; ColumnEncodingKind encoding_kind = DIRECT; @@ -380,15 +421,16 @@ std::vector writer::impl::gather_streams(orc_column_view *columns, int64_t data_stream_size = 0; int64_t data2_stream_size = 0; int64_t dict_stream_size = 0; - bool is_nullable; - if (single_write_mode) { - is_nullable = (columns[i].nullable() || columns[i].data_count() < num_rows); - } else { - is_nullable = (i < user_metadata_with_nullability.column_nullable.size()) - ? user_metadata_with_nullability.column_nullable[i] - : true; - } + auto const is_nullable = [&]() { + if (single_write_mode) { + return column.nullable(); + } else { + return (column.id() < user_metadata_with_nullability.column_nullable.size()) + ? 
user_metadata_with_nullability.column_nullable[column.id()] + : true; + } + }(); if (is_nullable) { present_stream_size = ((row_index_stride_ + 7) >> 3); present_stream_size += (present_stream_size + 0x7f) >> 7; @@ -409,9 +451,8 @@ std::vector writer::impl::gather_streams(orc_column_view *columns, break; case TypeKind::FLOAT: // Pass through if no nulls (no RLE encoding for floating point) - data_stream_size = (columns[i].null_count() != 0) - ? div_rowgroups_by(512) * (512 * 4 + 2) - : INT64_C(-1); + data_stream_size = + (column.null_count() != 0) ? div_rowgroups_by(512) * (512 * 4 + 2) : INT64_C(-1); encoding_kind = DIRECT; break; case TypeKind::INT: @@ -421,9 +462,8 @@ std::vector writer::impl::gather_streams(orc_column_view *columns, break; case TypeKind::DOUBLE: // Pass through if no nulls (no RLE encoding for floating point) - data_stream_size = (columns[i].null_count() != 0) - ? div_rowgroups_by(512) * (512 * 8 + 2) - : INT64_C(-1); + data_stream_size = + (column.null_count() != 0) ? 
div_rowgroups_by(512) * (512 * 8 + 2) : INT64_C(-1); encoding_kind = DIRECT; break; case TypeKind::LONG: @@ -436,25 +476,25 @@ std::vector writer::impl::gather_streams(orc_column_view *columns, size_t dict_data_size = 0; size_t dict_strings = 0; size_t dict_lengths_div512 = 0; - for (size_t stripe = 0, g = 0; stripe < stripe_list.size(); stripe++) { - const auto sd = columns[i].host_stripe_dict(stripe); + for (auto const &stripe : stripe_bounds) { + const auto sd = column.host_stripe_dict(stripe.id); enable_dict = (enable_dict && sd->dict_data != nullptr); if (enable_dict) { dict_strings += sd->num_strings; dict_lengths_div512 += (sd->num_strings + 0x1ff) >> 9; dict_data_size += sd->dict_char_count; } - - for (uint32_t k = 0; k < stripe_list[stripe]; k++, g++) { - direct_data_size += columns[i].host_dict_chunk(g)->string_char_count; - } + direct_data_size += std::accumulate( + stripe.cbegin(), stripe.cend(), direct_data_size, [&](auto data_size, auto rg_idx) { + return data_size + column.host_dict_chunk(rg_idx)->string_char_count; + }); } if (enable_dict) { uint32_t dict_bits = 0; for (dict_bits = 1; dict_bits < 32; dict_bits <<= 1) { if (dict_strings <= (1ull << dict_bits)) break; } - const auto valid_count = columns[i].data_count() - columns[i].null_count(); + const auto valid_count = column.data_count() - column.null_count(); dict_data_size += (dict_bits * valid_count + 7) >> 3; } @@ -480,40 +520,65 @@ std::vector writer::impl::gather_streams(orc_column_view *columns, default: CUDF_FAIL("Unsupported ORC type kind"); } - // Initialize the column's metadata - columns[i].set_orc_encoding(encoding_kind); + // Initialize the column's metadata (this is the only reason columns is in/out param) + column.set_orc_encoding(encoding_kind); // Initialize the column's index stream - const auto id = static_cast(1 + i); + const auto id = static_cast(1 + column.id()); streams[id].column = id; streams[id].kind = ROW_INDEX; streams[id].length = 0; // Initialize the column's data 
stream(s) - const auto base = i * gpu::CI_NUM_STREAMS; + const auto base = column.id() * gpu::CI_NUM_STREAMS; if (present_stream_size != 0) { - auto len = static_cast(present_stream_size); - strm_ids[base + gpu::CI_PRESENT] = streams.size(); + auto len = static_cast(present_stream_size); + ids[base + gpu::CI_PRESENT] = streams.size(); streams.push_back(orc::Stream{PRESENT, id, len}); } if (data_stream_size != 0) { - auto len = static_cast(std::max(data_stream_size, 0)); - strm_ids[base + gpu::CI_DATA] = streams.size(); + auto len = static_cast(std::max(data_stream_size, 0)); + ids[base + gpu::CI_DATA] = streams.size(); streams.push_back(orc::Stream{data_kind, id, len}); } if (data2_stream_size != 0) { - auto len = static_cast(std::max(data2_stream_size, 0)); - strm_ids[base + gpu::CI_DATA2] = streams.size(); + auto len = static_cast(std::max(data2_stream_size, 0)); + ids[base + gpu::CI_DATA2] = streams.size(); streams.push_back(orc::Stream{data2_kind, id, len}); } if (dict_stream_size != 0) { - auto len = static_cast(dict_stream_size); - strm_ids[base + gpu::CI_DICTIONARY] = streams.size(); + auto len = static_cast(dict_stream_size); + ids[base + gpu::CI_DICTIONARY] = streams.size(); streams.push_back(orc::Stream{DICTIONARY_DATA, id, len}); } } + return {std::move(streams), std::move(ids)}; +} - return streams; +orc_streams::orc_stream_offsets orc_streams::compute_offsets( + host_span columns, size_t num_rowgroups) const +{ + std::vector strm_offsets(streams.size()); + size_t str_data_size = 0; + size_t rle_data_size = 0; + for (size_t i = 0; i < streams.size(); ++i) { + const auto &stream = streams[i]; + const auto &column = columns[stream.column - 1]; + + if (((stream.kind == DICTIONARY_DATA || stream.kind == LENGTH) && + (column.orc_encoding() == DICTIONARY_V2)) || + ((stream.kind == DATA) && + (column.orc_kind() == TypeKind::STRING && column.orc_encoding() == DIRECT_V2))) { + strm_offsets[i] = str_data_size; + str_data_size += stream.length; + } else { + 
strm_offsets[i] = rle_data_size; + rle_data_size += (stream.length * num_rowgroups + 7) & ~7; + } + } + str_data_size = (str_data_size + 7) & ~7; + + return {std::move(strm_offsets), str_data_size, rle_data_size}; } struct segmented_valid_cnt_input { @@ -521,229 +586,210 @@ struct segmented_valid_cnt_input { std::vector indices; }; -rmm::device_buffer writer::impl::encode_columns(orc_column_view *columns, - size_t num_columns, - size_t num_rows, - size_t num_rowgroups, - std::vector const &str_col_ids, - std::vector const &stripe_list, - std::vector const &streams, - std::vector const &strm_ids, - hostdevice_vector &chunks) +encoded_data writer::impl::encode_columns(host_span columns, + std::vector const &str_col_ids, + host_span stripe_bounds, + orc_streams const &streams) { - // Allocate combined buffer for RLE data and string data output - std::vector strm_offsets(streams.size()); - size_t str_data_size = 0; - auto output = [&]() { - size_t rle_data_size = 0; - for (size_t i = 0; i < streams.size(); ++i) { - const auto &stream = streams[i]; - const auto &column = columns[stream.column - 1]; - - if (((stream.kind == DICTIONARY_DATA || stream.kind == LENGTH) && - (column.orc_encoding() == DICTIONARY_V2)) || - ((stream.kind == DATA) && - (column.orc_kind() == TypeKind::STRING && column.orc_encoding() == DIRECT_V2))) { - strm_offsets[i] = str_data_size; - str_data_size += stream.length; - } else { - strm_offsets[i] = rle_data_size; - rle_data_size += (stream.length * num_rowgroups + 7) & ~7; - } - } - str_data_size = (str_data_size + 7) & ~7; - - return rmm::device_buffer(rle_data_size + str_data_size, stream); - }(); - auto dst_base = static_cast(output.data()); + auto const num_columns = columns.size(); + auto const num_rowgroups = stripes_size(stripe_bounds); + hostdevice_2dvector chunks(num_columns, num_rowgroups); + hostdevice_2dvector chunk_streams(num_columns, num_rowgroups); + auto const stream_offsets = streams.compute_offsets(columns, num_rowgroups); + 
rmm::device_uvector encoded_data(stream_offsets.data_size(), stream); // Initialize column chunks' descriptions - size_t stripe_start = 0; - size_t stripe_id = 0; std::map validity_check_inputs; - for (size_t j = 0; j < num_rowgroups; j++) { - for (size_t i = 0; i < num_columns; i++) { - auto *ck = &chunks[j * num_columns + i]; - ck->start_row = (j * row_index_stride_); - ck->num_rows = std::min(row_index_stride_, num_rows - ck->start_row); - ck->valid_rows = columns[i].data_count(); - ck->encoding_kind = columns[i].orc_encoding(); - ck->type_kind = columns[i].orc_kind(); - if (ck->type_kind == TypeKind::STRING) { - ck->valid_map_base = columns[i].nulls(); - ck->column_offset = columns[i].column_offset(); - ck->column_data_base = (ck->encoding_kind == DICTIONARY_V2) - ? columns[i].host_stripe_dict(stripe_id)->dict_index - : columns[i].data(); - ck->dtype_len = 1; - } else { - ck->valid_map_base = columns[i].nulls(); - ck->column_offset = columns[i].column_offset(); - ck->column_data_base = columns[i].data(); - ck->dtype_len = columns[i].type_width(); - } - ck->scale = columns[i].clockscale(); - - // Only need to check row groups that end within the stripe - if (ck->type_kind == TypeKind::BOOLEAN && columns[i].nullable() && - j + 1 != stripe_start + stripe_list[stripe_id]) { - auto curr_cnt_in = validity_check_inputs.find(i); - if (curr_cnt_in == validity_check_inputs.end()) { - bool unused; - // add new object - std::tie(curr_cnt_in, unused) = validity_check_inputs.insert({i, {columns[i].nulls()}}); + + for (auto const &column : columns) { + for (auto const &stripe : stripe_bounds) { + for (auto rg_idx_it = stripe.cbegin(); rg_idx_it < stripe.cend(); ++rg_idx_it) { + auto const rg_idx = *rg_idx_it; + auto &ck = chunks[column.id()][rg_idx]; + + ck.start_row = (rg_idx * row_index_stride_); + ck.num_rows = std::min(row_index_stride_, column.data_count() - ck.start_row); + ck.valid_rows = column.data_count(); + ck.encoding_kind = column.orc_encoding(); + ck.type_kind = 
column.orc_kind(); + if (ck.type_kind == TypeKind::STRING) { + ck.valid_map_base = column.nulls(); + ck.column_offset = column.column_offset(); + ck.column_data_base = (ck.encoding_kind == DICTIONARY_V2) + ? column.host_stripe_dict(stripe.id)->dict_index + : column.data(); + ck.dtype_len = 1; + } else { + ck.valid_map_base = column.nulls(); + ck.column_offset = column.column_offset(); + ck.column_data_base = column.data(); + ck.dtype_len = column.type_width(); } - // append row group start and end to existing object - curr_cnt_in->second.indices.push_back(ck->start_row); - curr_cnt_in->second.indices.push_back(ck->start_row + ck->num_rows); + ck.scale = column.clockscale(); + // Only need to check row groups that end within the stripe } + } + } - for (int k = 0; k < gpu::CI_NUM_STREAMS; k++) { - const auto strm_id = strm_ids[i * gpu::CI_NUM_STREAMS + k]; - - ck->strm_id[k] = strm_id; - if (strm_id >= 0) { - if ((k == gpu::CI_DICTIONARY) || - (k == gpu::CI_DATA2 && ck->encoding_kind == DICTIONARY_V2)) { - if (j == stripe_start) { - const int32_t dict_stride = columns[i].get_dict_stride(); - const auto stripe = columns[i].host_stripe_dict(stripe_id); - ck->strm_len[k] = (k == gpu::CI_DICTIONARY) - ? 
stripe->dict_char_count - : (((stripe->num_strings + 0x1ff) >> 9) * (512 * 4 + 2)); - if (stripe_id == 0) { - ck->streams[k] = dst_base + strm_offsets[strm_id]; + auto validity_check_indices = [&](size_t col_idx) { + std::vector indices; + for (auto const &stripe : stripe_bounds) { + for (auto rg_idx_it = stripe.cbegin(); rg_idx_it < stripe.cend() - 1; ++rg_idx_it) { + auto const &chunk = chunks[col_idx][*rg_idx_it]; + indices.push_back(chunk.start_row); + indices.push_back(chunk.start_row + chunk.num_rows); + } + } + return indices; + }; + for (auto const &column : columns) { + if (column.orc_kind() == TypeKind::BOOLEAN && column.nullable()) { + validity_check_inputs[column.id()] = {column.nulls(), validity_check_indices(column.id())}; + } + } + for (auto &cnt_in : validity_check_inputs) { + auto const valid_counts = segmented_count_set_bits(cnt_in.second.mask, cnt_in.second.indices); + CUDF_EXPECTS(std::none_of(valid_counts.cbegin(), + valid_counts.cend(), + [](auto valid_count) { return valid_count % 8; }), + "There's currently a bug in encoding boolean columns. Suggested workaround " + "is to convert " + "to " + "int8 type. 
Please see https://github.com/rapidsai/cudf/issues/6763 for " + "more information."); + } + + for (size_t col_idx = 0; col_idx < num_columns; col_idx++) { + auto const &column = columns[col_idx]; + auto col_streams = chunk_streams[col_idx]; + for (auto const &stripe : stripe_bounds) { + for (auto rg_idx_it = stripe.cbegin(); rg_idx_it < stripe.cend(); ++rg_idx_it) { + auto const rg_idx = *rg_idx_it; + auto const &ck = chunks[col_idx][rg_idx]; + auto &strm = col_streams[rg_idx]; + + for (int strm_type = 0; strm_type < gpu::CI_NUM_STREAMS; ++strm_type) { + auto const strm_id = streams.id(col_idx * gpu::CI_NUM_STREAMS + strm_type); + + strm.ids[strm_type] = strm_id; + if (strm_id >= 0) { + if ((strm_type == gpu::CI_DICTIONARY) || + (strm_type == gpu::CI_DATA2 && ck.encoding_kind == DICTIONARY_V2)) { + if (rg_idx_it == stripe.cbegin()) { + const int32_t dict_stride = column.get_dict_stride(); + const auto stripe_dict = column.host_stripe_dict(stripe.id); + strm.lengths[strm_type] = + (strm_type == gpu::CI_DICTIONARY) + ? stripe_dict->dict_char_count + : (((stripe_dict->num_strings + 0x1ff) >> 9) * (512 * 4 + 2)); + if (stripe.id == 0) { + strm.data_ptrs[strm_type] = encoded_data.data() + stream_offsets.offsets[strm_id]; + } else { + auto const &strm_up = col_streams[stripe_dict[-dict_stride].start_chunk]; + strm.data_ptrs[strm_type] = + strm_up.data_ptrs[strm_type] + strm_up.lengths[strm_type]; + } } else { - const auto *ck_up = &chunks[stripe[-dict_stride].start_chunk * num_columns + i]; - ck->streams[k] = ck_up->streams[k] + ck_up->strm_len[k]; + strm.lengths[strm_type] = 0; + strm.data_ptrs[strm_type] = col_streams[rg_idx - 1].data_ptrs[strm_type]; } + } else if (strm_type == gpu::CI_DATA && ck.type_kind == TypeKind::STRING && + ck.encoding_kind == DIRECT_V2) { + strm.lengths[strm_type] = column.host_dict_chunk(rg_idx)->string_char_count; + auto const &prev_strm = col_streams[rg_idx - 1]; + strm.data_ptrs[strm_type] = + (rg_idx == 0) ? 
encoded_data.data() + stream_offsets.offsets[strm_id] + : (prev_strm.data_ptrs[strm_type] + prev_strm.lengths[strm_type]); + } else if (strm_type == gpu::CI_DATA && streams[strm_id].length == 0 && + (ck.type_kind == DOUBLE || ck.type_kind == FLOAT)) { + // Pass-through + strm.lengths[strm_type] = ck.num_rows * ck.dtype_len; + strm.data_ptrs[strm_type] = nullptr; } else { - ck->strm_len[k] = 0; - ck->streams[k] = ck[-num_columns].streams[k]; + strm.lengths[strm_type] = streams[strm_id].length; + strm.data_ptrs[strm_type] = encoded_data.data() + stream_offsets.str_data_size + + stream_offsets.offsets[strm_id] + + streams[strm_id].length * rg_idx; } - } else if (k == gpu::CI_DATA && ck->type_kind == TypeKind::STRING && - ck->encoding_kind == DIRECT_V2) { - ck->strm_len[k] = columns[i].host_dict_chunk(j)->string_char_count; - ck->streams[k] = (j == 0) - ? dst_base + strm_offsets[strm_id] - : (ck[-num_columns].streams[k] + ck[-num_columns].strm_len[k]); - } else if (k == gpu::CI_DATA && streams[strm_id].length == 0 && - (ck->type_kind == DOUBLE || ck->type_kind == FLOAT)) { - // Pass-through - ck->strm_len[k] = ck->num_rows * ck->dtype_len; - ck->streams[k] = nullptr; } else { - ck->strm_len[k] = streams[strm_id].length; - ck->streams[k] = - dst_base + str_data_size + strm_offsets[strm_id] + streams[strm_id].length * j; + strm.lengths[strm_type] = 0; + strm.data_ptrs[strm_type] = nullptr; } - } else { - ck->strm_len[k] = 0; - ck->streams[k] = nullptr; } } } - - // Track the current stripe this rowgroup chunk belongs - if (j + 1 == stripe_start + stripe_list[stripe_id]) { - stripe_start = j + 1; - stripe_id++; - } - } - - for (auto &cnt_in : validity_check_inputs) { - auto const valid_counts = segmented_count_set_bits(cnt_in.second.mask, cnt_in.second.indices); - CUDF_EXPECTS( - std::none_of(valid_counts.cbegin(), - valid_counts.cend(), - [](auto valid_count) { return valid_count % 8; }), - "There's currently a bug in encoding boolean columns. 
Suggested workaround is to convert to " - "int8 type. Please see https://github.com/rapidsai/cudf/issues/6763 for more information."); } chunks.host_to_device(stream); + chunk_streams.host_to_device(stream); + if (!str_col_ids.empty()) { auto d_stripe_dict = columns[str_col_ids[0]].device_stripe_dict(); - gpu::EncodeStripeDictionaries(d_stripe_dict, - chunks.device_ptr(), - str_col_ids.size(), - num_columns, - stripe_list.size(), - stream); + gpu::EncodeStripeDictionaries( + d_stripe_dict, chunks, str_col_ids.size(), stripe_bounds.size(), chunk_streams, stream); } - gpu::EncodeOrcColumnData(chunks.device_ptr(), num_columns, num_rowgroups, stream); + + gpu::EncodeOrcColumnData(chunks, chunk_streams, stream); stream.synchronize(); - return output; + return {std::move(encoded_data), std::move(chunk_streams)}; } std::vector writer::impl::gather_stripes( - size_t num_columns, size_t num_rows, size_t num_index_streams, - size_t num_data_streams, - std::vector const &stripe_list, - hostdevice_vector &chunks, - hostdevice_vector &strm_desc) + host_span stripe_bounds, + hostdevice_2dvector *enc_streams, + hostdevice_2dvector *strm_desc) { - std::vector stripes(stripe_list.size()); - size_t group = 0; - size_t stripe_start = 0; - for (size_t s = 0; s < stripe_list.size(); s++) { - size_t stripe_group_end = group + stripe_list[s]; - - for (size_t i = 0; i < num_columns; i++) { - const auto *ck = &chunks[group * num_columns + i]; + std::vector stripes(stripe_bounds.size()); + for (auto const &stripe : stripe_bounds) { + for (size_t col_idx = 0; col_idx < enc_streams->size().first; col_idx++) { + const auto &strm = (*enc_streams)[col_idx][stripe.first]; // Assign stream data of column data stream(s) for (int k = 0; k < gpu::CI_INDEX; k++) { - const auto stream_id = ck->strm_id[k]; + const auto stream_id = strm.ids[k]; if (stream_id != -1) { - auto *ss = &strm_desc[s * num_data_streams + stream_id - num_index_streams]; + auto *ss = &(*strm_desc)[stripe.id][stream_id - 
num_index_streams]; ss->stream_size = 0; - ss->first_chunk_id = (group * num_columns + i); - ss->num_chunks = (stripe_group_end - group); - ss->column_id = i; + ss->first_chunk_id = stripe.first; + ss->num_chunks = stripe.size; + ss->column_id = col_idx; ss->stream_type = k; } } } - group = stripe_group_end; - size_t stripe_end = std::min(group * row_index_stride_, num_rows); - stripes[s].numberOfRows = stripe_end - stripe_start; - stripe_start = stripe_end; + auto const stripe_group_end = *stripe.cend(); + auto const stripe_end = std::min(stripe_group_end * row_index_stride_, num_rows); + stripes[stripe.id].numberOfRows = stripe_end - stripe.first * row_index_stride_; } - strm_desc.host_to_device(stream); - gpu::CompactOrcDataStreams( - strm_desc.device_ptr(), chunks.device_ptr(), strm_desc.size(), num_columns, stream); - strm_desc.device_to_host(stream); - chunks.device_to_host(stream, true); + strm_desc->host_to_device(stream); + gpu::CompactOrcDataStreams(*strm_desc, *enc_streams, stream); + strm_desc->device_to_host(stream); + enc_streams->device_to_host(stream, true); return stripes; } std::vector> writer::impl::gather_statistic_blobs( - orc_column_view const *columns, - size_t num_columns, - size_t num_rows, - size_t num_rowgroups, - std::vector const &stripe_list, - std::vector const &stripes, - hostdevice_vector &chunks) + host_span columns, host_span stripe_bounds) { - size_t num_stat_blobs = (1 + stripe_list.size()) * num_columns; - size_t num_chunks = chunks.size(); + auto const num_rowgroups = stripes_size(stripe_bounds); + size_t num_stat_blobs = (1 + stripe_bounds.size()) * columns.size(); + size_t num_chunks = num_rowgroups * columns.size(); + std::vector> stat_blobs(num_stat_blobs); - hostdevice_vector stat_desc(num_columns); + hostdevice_vector stat_desc(columns.size()); hostdevice_vector stat_merge(num_stat_blobs); - rmm::device_vector stat_chunks(num_chunks + num_stat_blobs); - rmm::device_vector stat_groups(num_chunks); + rmm::device_uvector 
stat_chunks(num_chunks + num_stat_blobs, stream); + rmm::device_uvector stat_groups(num_chunks, stream); - for (size_t i = 0; i < num_columns; i++) { - stats_column_desc *desc = &stat_desc[i]; - switch (columns[i].orc_kind()) { + for (auto const &column : columns) { + stats_column_desc *desc = &stat_desc[column.id()]; + switch (column.orc_kind()) { case TypeKind::BYTE: desc->stats_dtype = dtype_int8; break; case TypeKind::SHORT: desc->stats_dtype = dtype_int16; break; case TypeKind::INT: desc->stats_dtype = dtype_int32; break; @@ -756,14 +802,14 @@ std::vector> writer::impl::gather_statistic_blobs( case TypeKind::STRING: desc->stats_dtype = dtype_string; break; default: desc->stats_dtype = dtype_none; break; } - desc->num_rows = columns[i].data_count(); - desc->num_values = columns[i].data_count(); - desc->valid_map_base = columns[i].nulls(); - desc->column_offset = columns[i].column_offset(); - desc->column_data_base = columns[i].data(); + desc->num_rows = column.data_count(); + desc->num_values = column.data_count(); + desc->valid_map_base = column.nulls(); + desc->column_offset = column.column_offset(); + desc->column_data_base = column.data(); if (desc->stats_dtype == dtype_timestamp64) { // Timestamp statistics are in milliseconds - switch (columns[i].clockscale()) { + switch (column.clockscale()) { case 9: desc->ts_scale = 1000; break; case 6: desc->ts_scale = 0; break; case 3: desc->ts_scale = -1000; break; @@ -773,48 +819,48 @@ std::vector> writer::impl::gather_statistic_blobs( } else { desc->ts_scale = 0; } - for (size_t k = 0, c = 0; k < stripe_list.size(); k++) { - statistics_merge_group *grp = &stat_merge[i * stripe_list.size() + k]; - grp->col = stat_desc.device_ptr(i); - grp->start_chunk = static_cast(i * num_rowgroups + c); - grp->num_chunks = stripe_list[k]; - c += stripe_list[k]; + for (auto const &stripe : stripe_bounds) { + auto grp = &stat_merge[column.id() * stripe_bounds.size() + stripe.id]; + grp->col = stat_desc.device_ptr(column.id()); + 
grp->start_chunk = static_cast(column.id() * num_rowgroups + stripe.first); + grp->num_chunks = stripe.size; } - statistics_merge_group *col_stats = &stat_merge[stripe_list.size() * num_columns + i]; - col_stats->col = stat_desc.device_ptr(i); - col_stats->start_chunk = static_cast(i * stripe_list.size()); - col_stats->num_chunks = static_cast(stripe_list.size()); + statistics_merge_group *col_stats = + &stat_merge[stripe_bounds.size() * columns.size() + column.id()]; + col_stats->col = stat_desc.device_ptr(column.id()); + col_stats->start_chunk = static_cast(column.id() * stripe_bounds.size()); + col_stats->num_chunks = static_cast(stripe_bounds.size()); } stat_desc.host_to_device(stream); stat_merge.host_to_device(stream); - gpu::orc_init_statistics_groups(stat_groups.data().get(), + gpu::orc_init_statistics_groups(stat_groups.data(), stat_desc.device_ptr(), - num_columns, + columns.size(), num_rowgroups, row_index_stride_, stream); - GatherColumnStatistics(stat_chunks.data().get(), stat_groups.data().get(), num_chunks, stream); - MergeColumnStatistics(stat_chunks.data().get() + num_chunks, - stat_chunks.data().get(), + GatherColumnStatistics(stat_chunks.data(), stat_groups.data(), num_chunks, stream); + MergeColumnStatistics(stat_chunks.data() + num_chunks, + stat_chunks.data(), stat_merge.device_ptr(), - stripe_list.size() * num_columns, + stripe_bounds.size() * columns.size(), stream); - MergeColumnStatistics(stat_chunks.data().get() + num_chunks + stripe_list.size() * num_columns, - stat_chunks.data().get() + num_chunks, - stat_merge.device_ptr(stripe_list.size() * num_columns), - num_columns, + MergeColumnStatistics(stat_chunks.data() + num_chunks + stripe_bounds.size() * columns.size(), + stat_chunks.data() + num_chunks, + stat_merge.device_ptr(stripe_bounds.size() * columns.size()), + columns.size(), stream); gpu::orc_init_statistics_buffersize( - stat_merge.device_ptr(), stat_chunks.data().get() + num_chunks, num_stat_blobs, stream); + 
stat_merge.device_ptr(), stat_chunks.data() + num_chunks, num_stat_blobs, stream); stat_merge.device_to_host(stream, true); hostdevice_vector blobs(stat_merge[num_stat_blobs - 1].start_chunk + stat_merge[num_stat_blobs - 1].num_chunks); gpu::orc_encode_statistics(blobs.device_ptr(), stat_merge.device_ptr(), - stat_chunks.data().get() + num_chunks, + stat_chunks.data() + num_chunks, num_stat_blobs, stream); stat_merge.device_to_host(stream); @@ -831,42 +877,40 @@ std::vector> writer::impl::gather_statistic_blobs( void writer::impl::write_index_stream(int32_t stripe_id, int32_t stream_id, - orc_column_view *columns, - size_t num_columns, - size_t num_data_streams, - size_t group, - size_t groups_in_stripe, - hostdevice_vector const &chunks, - hostdevice_vector const &strm_desc, - hostdevice_vector const &comp_out, - StripeInformation &stripe, - std::vector &streams, + host_span columns, + stripe_rowgroups const &rowgroups_range, + host_2dspan enc_streams, + host_2dspan strm_desc, + host_span comp_out, + StripeInformation *stripe, + orc_streams *streams, ProtobufWriter *pbw) { row_group_index_info present; row_group_index_info data; row_group_index_info data2; - auto kind = TypeKind::STRUCT; + auto kind = TypeKind::STRUCT; + auto const column_id = stream_id - 1; - auto find_record = [=, &strm_desc](gpu::EncChunk const &chunk, gpu::StreamIndexType type) { + auto find_record = [=, &strm_desc](gpu::encoder_chunk_streams const &stream, + gpu::StreamIndexType type) { row_group_index_info record; - if (chunk.strm_id[type] > 0) { + if (stream.ids[type] > 0) { record.pos = 0; if (compression_kind_ != NONE) { - const auto *ss = - &strm_desc[stripe_id * num_data_streams + chunk.strm_id[type] - (num_columns + 1)]; - record.blk_pos = ss->first_block; + auto const &ss = strm_desc[stripe_id][stream.ids[type] - (columns.size() + 1)]; + record.blk_pos = ss.first_block; record.comp_pos = 0; - record.comp_size = ss->stream_size; + record.comp_size = ss.stream_size; } } return record; 
}; - auto scan_record = [=, &comp_out](gpu::EncChunk const &chunk, + auto scan_record = [=, &comp_out](gpu::encoder_chunk_streams const &stream, gpu::StreamIndexType type, row_group_index_info &record) { if (record.pos >= 0) { - record.pos += chunk.strm_len[type]; + record.pos += stream.lengths[type]; while ((record.pos >= 0) && (record.blk_pos >= 0) && (static_cast(record.pos) >= compression_blocksize_) && (record.comp_pos + 3 + comp_out[record.blk_pos].bytes_written < @@ -880,14 +924,14 @@ void writer::impl::write_index_stream(int32_t stripe_id, // TBD: Not sure we need an empty index stream for column 0 if (stream_id != 0) { - const auto &ck = chunks[stream_id - 1]; - present = find_record(ck, gpu::CI_PRESENT); - data = find_record(ck, gpu::CI_DATA); - data2 = find_record(ck, gpu::CI_DATA2); + const auto &strm = enc_streams[column_id][0]; + present = find_record(strm, gpu::CI_PRESENT); + data = find_record(strm, gpu::CI_DATA); + data2 = find_record(strm, gpu::CI_DATA2); // Change string dictionary to int from index point of view - kind = columns[stream_id - 1].orc_kind(); - if (kind == TypeKind::STRING && columns[stream_id - 1].orc_encoding() == DICTIONARY_V2) { + kind = columns[column_id].orc_kind(); + if (kind == TypeKind::STRING && columns[column_id].orc_encoding() == DICTIONARY_V2) { kind = TypeKind::INT; } } @@ -895,48 +939,49 @@ void writer::impl::write_index_stream(int32_t stripe_id, buffer_.resize((compression_kind_ != NONE) ? 
3 : 0); // Add row index entries - for (size_t g = group; g < group + groups_in_stripe; g++) { + std::for_each(rowgroups_range.cbegin(), rowgroups_range.cend(), [&](auto rowgroup) { pbw->put_row_index_entry( present.comp_pos, present.pos, data.comp_pos, data.pos, data2.comp_pos, data2.pos, kind); if (stream_id != 0) { - const auto &ck = chunks[g * num_columns + stream_id - 1]; - scan_record(ck, gpu::CI_PRESENT, present); - scan_record(ck, gpu::CI_DATA, data); - scan_record(ck, gpu::CI_DATA2, data2); + const auto &strm = enc_streams[column_id][rowgroup]; + scan_record(strm, gpu::CI_PRESENT, present); + scan_record(strm, gpu::CI_DATA, data); + scan_record(strm, gpu::CI_DATA2, data2); } - } + }); - streams[stream_id].length = buffer_.size(); + (*streams)[stream_id].length = buffer_.size(); if (compression_kind_ != NONE) { - uint32_t uncomp_ix_len = (uint32_t)(streams[stream_id].length - 3) * 2 + 1; + uint32_t uncomp_ix_len = (uint32_t)((*streams)[stream_id].length - 3) * 2 + 1; buffer_[0] = static_cast(uncomp_ix_len >> 0); buffer_[1] = static_cast(uncomp_ix_len >> 8); buffer_[2] = static_cast(uncomp_ix_len >> 16); } out_sink_->host_write(buffer_.data(), buffer_.size()); - stripe.indexLength += buffer_.size(); + stripe->indexLength += buffer_.size(); } void writer::impl::write_data_stream(gpu::StripeStream const &strm_desc, - gpu::EncChunk const &chunk, + gpu::encoder_chunk_streams const &enc_stream, uint8_t const *compressed_data, uint8_t *stream_out, - StripeInformation &stripe, - std::vector &streams) + StripeInformation *stripe, + orc_streams *streams) { - const auto length = strm_desc.stream_size; - streams[chunk.strm_id[strm_desc.stream_type]].length = length; + const auto length = strm_desc.stream_size; + (*streams)[enc_stream.ids[strm_desc.stream_type]].length = length; if (length != 0) { - const auto *stream_in = (compression_kind_ == NONE) ? 
chunk.streams[strm_desc.stream_type] - : (compressed_data + strm_desc.bfr_offset); + const auto *stream_in = (compression_kind_ == NONE) + ? enc_stream.data_ptrs[strm_desc.stream_type] + : (compressed_data + strm_desc.bfr_offset); CUDA_TRY( cudaMemcpyAsync(stream_out, stream_in, length, cudaMemcpyDeviceToHost, stream.value())); stream.synchronize(); out_sink_->host_write(stream_out, length); } - stripe.dataLength += length; + stripe->dataLength += length; } void writer::impl::add_uncompressed_block_headers(std::vector &v) @@ -1006,11 +1051,8 @@ void writer::impl::init_state() void writer::impl::write(table_view const &table) { CUDF_EXPECTS(not closed, "Data has already been flushed to out and closed"); - size_type num_columns = table.num_columns(); - size_type num_rows = 0; - - // Mapping of string columns for quick look-up - std::vector str_col_ids; + auto const num_columns = table.num_columns(); + auto const num_rows = table.num_rows(); if (user_metadata_with_nullability.column_nullable.size() > 0) { CUDF_EXPECTS( @@ -1021,103 +1063,53 @@ void writer::impl::write(table_view const &table) // Wrapper around cudf columns to attach ORC-specific type info std::vector orc_columns; - orc_columns.reserve(num_columns); // Avoids unnecessary re-allocation - for (auto it = table.begin(); it < table.end(); ++it) { - const auto col = *it; - const auto current_id = orc_columns.size(); - const auto current_str_id = str_col_ids.size(); - - num_rows = std::max(num_rows, col.size()); - orc_columns.emplace_back(current_id, current_str_id, col, user_metadata, stream); + orc_columns.reserve(num_columns); + // Mapping of string columns for quick look-up + std::vector str_col_ids; + for (auto const &column : table) { + auto const current_id = orc_columns.size(); + auto const current_str_id = str_col_ids.size(); + + orc_columns.emplace_back(current_id, current_str_id, column, user_metadata, stream); if (orc_columns.back().is_string()) { str_col_ids.push_back(current_id); } } - 
rmm::device_vector dict_index(str_col_ids.size() * num_rows); - rmm::device_vector dict_data(str_col_ids.size() * num_rows); + rmm::device_uvector dict_index(str_col_ids.size() * num_rows, stream); + rmm::device_uvector dict_data(str_col_ids.size() * num_rows, stream); // Build per-column dictionary indices const auto num_rowgroups = div_by_rowgroups(num_rows); const auto num_dict_chunks = num_rowgroups * str_col_ids.size(); hostdevice_vector dict(num_dict_chunks); - if (str_col_ids.size() != 0) { - init_dictionaries(orc_columns.data(), - num_rows, - str_col_ids, - dict_data.data().get(), - dict_index.data().get(), - dict); + if (!str_col_ids.empty()) { + init_dictionaries(orc_columns.data(), str_col_ids, dict_data.data(), dict_index.data(), &dict); } // Decide stripe boundaries early on, based on uncompressed size - std::vector stripe_list; - for (size_t g = 0, stripe_start = 0, stripe_size = 0; g < num_rowgroups; g++) { - size_t rowgroup_size = 0; - for (int i = 0; i < num_columns; i++) { - if (orc_columns[i].is_string()) { - const auto dt = orc_columns[i].host_dict_chunk(g); - rowgroup_size += 1 * row_index_stride_; - rowgroup_size += dt->string_char_count; - } else { - rowgroup_size += orc_columns[i].type_width() * row_index_stride_; - } - } - - // Apply rows per stripe limit to limit string dictionaries - const size_t max_stripe_rows = !str_col_ids.empty() ? 
1000000 : 5000000; - if ((g > stripe_start) && (stripe_size + rowgroup_size > max_stripe_size_ || - (g + 1 - stripe_start) * row_index_stride_ > max_stripe_rows)) { - stripe_list.push_back(g - stripe_start); - stripe_start = g; - stripe_size = 0; - } - stripe_size += rowgroup_size; - if (g + 1 == num_rowgroups) { stripe_list.push_back(num_rowgroups - stripe_start); } - } + auto const stripe_bounds = gather_stripe_info(orc_columns, num_rowgroups); // Build stripe-level dictionaries - const auto num_stripe_dict = stripe_list.size() * str_col_ids.size(); + const auto num_stripe_dict = stripe_bounds.size() * str_col_ids.size(); hostdevice_vector stripe_dict(num_stripe_dict); - if (str_col_ids.size() != 0) { - build_dictionaries(orc_columns.data(), - num_rows, - str_col_ids, - stripe_list, - dict, - dict_index.data().get(), - stripe_dict); + if (!str_col_ids.empty()) { + build_dictionaries( + orc_columns.data(), str_col_ids, stripe_bounds, dict, dict_index.data(), stripe_dict); } - // Initialize streams - std::vector strm_ids(num_columns * gpu::CI_NUM_STREAMS, -1); - auto streams = - gather_streams(orc_columns.data(), orc_columns.size(), num_rows, stripe_list, strm_ids); - - // Encode column data chunks - const auto num_chunks = num_rowgroups * num_columns; - hostdevice_vector chunks(num_chunks); - auto output = encode_columns(orc_columns.data(), - num_columns, - num_rows, - num_rowgroups, - str_col_ids, - stripe_list, - streams, - strm_ids, - chunks); + auto streams = create_streams(orc_columns, stripe_bounds); + auto enc_data = encode_columns(orc_columns, str_col_ids, stripe_bounds, streams); // Assemble individual disparate column chunks into contiguous data streams - const auto num_index_streams = (num_columns + 1); - const auto num_data_streams = streams.size() - num_index_streams; - const auto num_stripe_streams = stripe_list.size() * num_data_streams; - hostdevice_vector strm_desc(num_stripe_streams); - auto stripes = gather_stripes( - num_columns, num_rows, 
num_index_streams, num_data_streams, stripe_list, chunks, strm_desc); + const auto num_index_streams = (num_columns + 1); + const auto num_data_streams = streams.size() - num_index_streams; + hostdevice_2dvector strm_descs(stripe_bounds.size(), num_data_streams); + auto stripes = + gather_stripes(num_rows, num_index_streams, stripe_bounds, &enc_data.streams, &strm_descs); // Gather column statistics std::vector> column_stats; if (enable_statistics_ && num_columns > 0 && num_rows > 0) { - column_stats = gather_statistic_blobs( - orc_columns.data(), num_columns, num_rows, num_rowgroups, stripe_list, stripes, chunks); + column_stats = gather_statistic_blobs(orc_columns, stripe_bounds); } // Allocate intermediate output stream buffer @@ -1126,9 +1118,9 @@ void writer::impl::write(table_view const &table) auto stream_output = [&]() { size_t max_stream_size = 0; - for (size_t stripe_id = 0; stripe_id < stripe_list.size(); stripe_id++) { - for (size_t i = 0; i < num_data_streams; i++) { - gpu::StripeStream *ss = &strm_desc[stripe_id * num_data_streams + i]; + for (size_t stripe_id = 0; stripe_id < stripe_bounds.size(); stripe_id++) { + for (size_t i = 0; i < num_data_streams; i++) { // TODO range for (at least) + gpu::StripeStream *ss = &strm_descs[stripe_id][i]; size_t stream_size = ss->stream_size; if (compression_kind_ != NONE) { ss->first_block = num_compressed_blocks; @@ -1157,59 +1149,51 @@ void writer::impl::write(table_view const &table) hostdevice_vector comp_out(num_compressed_blocks); hostdevice_vector comp_in(num_compressed_blocks); if (compression_kind_ != NONE) { - strm_desc.host_to_device(stream); + strm_descs.host_to_device(stream); gpu::CompressOrcDataStreams(static_cast(compressed_data.data()), - strm_desc.device_ptr(), - chunks.device_ptr(), - comp_in.device_ptr(), - comp_out.device_ptr(), - num_stripe_streams, num_compressed_blocks, compression_kind_, compression_blocksize_, + strm_descs, + enc_data.streams, + comp_in.device_ptr(), + 
comp_out.device_ptr(), stream); - strm_desc.device_to_host(stream); + strm_descs.device_to_host(stream); comp_out.device_to_host(stream, true); } ProtobufWriter pbw_(&buffer_); // Write stripes - size_t group = 0; - for (size_t stripe_id = 0; stripe_id < stripes.size(); stripe_id++) { - auto groups_in_stripe = div_by_rowgroups(stripes[stripe_id].numberOfRows); - stripes[stripe_id].offset = out_sink_->bytes_written(); + for (size_t stripe_id = 0; stripe_id < stripes.size(); ++stripe_id) { + auto const &rowgroup_range = stripe_bounds[stripe_id]; + auto &stripe = stripes[stripe_id]; + + stripe.offset = out_sink_->bytes_written(); // Column (skippable) index streams appear at the start of the stripe - stripes[stripe_id].indexLength = 0; - for (size_t col_id = 0; col_id <= (size_t)num_columns; col_id++) { + for (size_type stream_id = 0; stream_id <= num_columns; ++stream_id) { write_index_stream(stripe_id, - col_id, - orc_columns.data(), - num_columns, - num_data_streams, - group, - groups_in_stripe, - chunks, - strm_desc, + stream_id, + orc_columns, + rowgroup_range, + enc_data.streams, + strm_descs, comp_out, - stripes[stripe_id], - streams, + &stripe, + &streams, &pbw_); } // Column data consisting one or more separate streams - stripes[stripe_id].dataLength = 0; - for (size_t i = 0; i < num_data_streams; i++) { - const auto &ss = strm_desc[stripe_id * num_data_streams + i]; - const auto &ck = chunks[group * num_columns + ss.column_id]; - - write_data_stream(ss, - ck, + for (auto const &strm_desc : strm_descs[stripe_id]) { + write_data_stream(strm_desc, + enc_data.streams[strm_desc.column_id][rowgroup_range.first], static_cast(compressed_data.data()), stream_output.get(), - stripes[stripe_id], - streams); + &stripe, + &streams); } // Write stripefooter consisting of stream information @@ -1227,16 +1211,14 @@ void writer::impl::write(table_view const &table) } buffer_.resize((compression_kind_ != NONE) ? 
3 : 0); pbw_.write(sf); - stripes[stripe_id].footerLength = buffer_.size(); + stripe.footerLength = buffer_.size(); if (compression_kind_ != NONE) { - uint32_t uncomp_sf_len = (stripes[stripe_id].footerLength - 3) * 2 + 1; + uint32_t uncomp_sf_len = (stripe.footerLength - 3) * 2 + 1; buffer_[0] = static_cast(uncomp_sf_len >> 0); buffer_[1] = static_cast(uncomp_sf_len >> 8); buffer_[2] = static_cast(uncomp_sf_len >> 16); } out_sink_->host_write(buffer_.data(), buffer_.size()); - - group += groups_in_stripe; } if (column_stats.size() != 0) { @@ -1249,24 +1231,27 @@ void writer::impl::write(table_view const &table) pbw_.putb(1 * 8 + PB_TYPE_VARINT); pbw_.put_uint(num_rows); ff.statistics[0] = std::move(buffer_); - for (int i = 0; i < num_columns; i++) { - size_t idx = stripe_list.size() * num_columns + i; - if (idx < column_stats.size()) { ff.statistics[1 + i] = std::move(column_stats[idx]); } + for (int col_idx = 0; col_idx < num_columns; col_idx++) { + size_t idx = stripes.size() * num_columns + col_idx; + if (idx < column_stats.size()) { + ff.statistics[1 + col_idx] = std::move(column_stats[idx]); + } } } // Stripe-level statistics size_t first_stripe = md.stripeStats.size(); - md.stripeStats.resize(first_stripe + stripe_list.size()); - for (size_t stripe_id = 0; stripe_id < stripe_list.size(); stripe_id++) { + md.stripeStats.resize(first_stripe + stripes.size()); + for (size_t stripe_id = 0; stripe_id < stripes.size(); stripe_id++) { md.stripeStats[first_stripe + stripe_id].colStats.resize(1 + num_columns); buffer_.resize(0); pbw_.putb(1 * 8 + PB_TYPE_VARINT); pbw_.put_uint(stripes[stripe_id].numberOfRows); md.stripeStats[first_stripe + stripe_id].colStats[0] = std::move(buffer_); - for (int i = 0; i < num_columns; i++) { - size_t idx = stripe_list.size() * i + stripe_id; + for (int col_idx = 0; col_idx < num_columns; col_idx++) { + size_t idx = stripes.size() * col_idx + stripe_id; if (idx < column_stats.size()) { - md.stripeStats[first_stripe + 
stripe_id].colStats[1 + i] = std::move(column_stats[idx]); + md.stripeStats[first_stripe + stripe_id].colStats[1 + col_idx] = + std::move(column_stats[idx]); } } } @@ -1279,19 +1264,20 @@ void writer::impl::write(table_view const &table) ff.types[0].kind = STRUCT; ff.types[0].subtypes.resize(num_columns); ff.types[0].fieldNames.resize(num_columns); - for (int i = 0; i < num_columns; ++i) { - ff.types[1 + i].kind = orc_columns[i].orc_kind(); - ff.types[0].subtypes[i] = 1 + i; - ff.types[0].fieldNames[i] = orc_columns[i].orc_name(); + for (auto const &column : orc_columns) { + ff.types[1 + column.id()].kind = column.orc_kind(); + ff.types[0].subtypes[column.id()] = 1 + column.id(); + ff.types[0].fieldNames[column.id()] = column.orc_name(); } } else { // verify the user isn't passing mismatched tables CUDF_EXPECTS(ff.types.size() == 1 + orc_columns.size(), "Mismatch in table structure between multiple calls to write"); - for (auto i = 0; i < num_columns; i++) { - CUDF_EXPECTS(ff.types[1 + i].kind == orc_columns[i].orc_kind(), - "Mismatch in column types between multiple calls to write"); - } + CUDF_EXPECTS( + std::all_of(orc_columns.cbegin(), + orc_columns.cend(), + [&](auto const &col) { return ff.types[1 + col.id()].kind == col.orc_kind(); }), + "Mismatch in column types between multiple calls to write"); } ff.stripes.insert(ff.stripes.end(), std::make_move_iterator(stripes.begin()), diff --git a/cpp/src/io/orc/writer_impl.hpp b/cpp/src/io/orc/writer_impl.hpp index cbade92f867..6f651579042 100644 --- a/cpp/src/io/orc/writer_impl.hpp +++ b/cpp/src/io/orc/writer_impl.hpp @@ -28,7 +28,9 @@ #include #include +#include #include +#include #include #include @@ -43,6 +45,75 @@ class orc_column_view; using namespace cudf::io::orc; using namespace cudf::io; +using cudf::detail::device_2dspan; +using cudf::detail::host_2dspan; +using cudf::detail::hostdevice_2dvector; + +/** + * @brief Indices of rowgroups contained in a stripe. 
+ * + * Provides a container-like interface to iterate over rowgroup indices. + */ +struct stripe_rowgroups { + uint32_t id; // stripe id + uint32_t first; // first rowgroup in the stripe + uint32_t size; // number of rowgroups in the stripe + stripe_rowgroups(uint32_t id, uint32_t first, uint32_t size) : id{id}, first{first}, size{size} {} + auto cbegin() const { return thrust::make_counting_iterator(first); } + auto cend() const { return thrust::make_counting_iterator(first + size); } +}; + +/** + * @brief Returns the total number of rowgroups in the list of contiguous stripes. + */ +inline auto stripes_size(host_span stripes) +{ + return !stripes.empty() ? *stripes.back().cend() - stripes.front().first : 0; +} + +/** + * @brief List of per-column ORC streams. + * + * Provides interface to calculate their offsets. + */ +class orc_streams { + public: + orc_streams(std::vector streams, std::vector ids) + : streams{std::move(streams)}, ids{std::move(ids)} + { + } + Stream const& operator[](int idx) const { return streams[idx]; } + Stream& operator[](int idx) { return streams[idx]; } + auto id(int idx) const { return ids[idx]; } + auto& id(int idx) { return ids[idx]; } + auto size() const { return streams.size(); } + + /** + * @brief List of ORC stream offsets and their total size. + */ + struct orc_stream_offsets { + std::vector offsets; + size_t str_data_size = 0; + size_t rle_data_size = 0; + auto data_size() const { return str_data_size + rle_data_size; } + }; + orc_stream_offsets compute_offsets(host_span columns, + size_t num_rowgroups) const; + + operator std::vector const&() const { return streams; } + + private: + std::vector streams; + std::vector ids; +}; + +/** + * @brief ORC per-chunk streams of encoded data.
+ */ +struct encoded_data { + rmm::device_uvector data; // Owning array of the encoded data + hostdevice_2dvector streams; // streams of encoded data, per chunk +}; /** * @brief Implementation for ORC writer @@ -115,171 +186,140 @@ class writer::impl { * @brief Builds up column dictionaries indices * * @param columns List of columns - * @param num_rows Total number of rows * @param str_col_ids List of columns that are strings type * @param dict_data Dictionary data memory * @param dict_index Dictionary index memory * @param dict List of dictionary chunks */ void init_dictionaries(orc_column_view* columns, - size_t num_rows, std::vector const& str_col_ids, uint32_t* dict_data, uint32_t* dict_index, - hostdevice_vector& dict); + hostdevice_vector* dict); /** - * @brief Builds up per-stripe dictionaries for string columns + * @brief Builds up per-stripe dictionaries for string columns. * * @param columns List of columns - * @param num_rows Total number of rows * @param str_col_ids List of columns that are strings type - * @param stripe_list List of stripe boundaries + * @param stripe_bounds List of stripe boundaries * @param dict List of dictionary chunks * @param dict_index List of dictionary indices * @param stripe_dict List of stripe dictionaries */ void build_dictionaries(orc_column_view* columns, - size_t num_rows, std::vector const& str_col_ids, - std::vector const& stripe_list, + host_span stripe_bounds, hostdevice_vector const& dict, uint32_t* dict_index, hostdevice_vector& stripe_dict); /** - * @brief Returns stream information for each column + * @brief Builds up per-column streams. 
* - * @param columns List of columns - * @param num_columns Total number of columns - * @param num_rows Total number of rows - * @param stripe_list List of stripe boundaries - * @param strm_ids List of unique stream identifiers + * @param[in,out] columns List of columns + * @param[in] stripe_bounds List of stripe boundaries + * @return List of stream descriptors + */ + orc_streams create_streams(host_span columns, + host_span stripe_bounds); + + /** + * @brief Gathers stripe information. * - * @return The streams + * @param columns List of columns + * @param num_rowgroups Total number of rowgroups + * @return List of stripe descriptors */ - std::vector gather_streams(orc_column_view* columns, - size_t num_columns, - size_t num_rows, - std::vector const& stripe_list, - std::vector& strm_ids); + std::vector gather_stripe_info(host_span columns, + size_t num_rowgroups); /** - * @brief Encodes the streams as a series of column data chunks + * @brief Encodes the input columns into streams. * * @param columns List of columns - * @param num_columns Total number of columns - * @param num_rows Total number of rows - * @param num_rowgroups Total number of row groups * @param str_col_ids List of columns that are strings type - * @param stripe_list List of stripe boundaries - * @param streams List of columns' index and data streams - * @param strm_ids List of unique stream identifiers - * @param chunks List of column data chunks - * - * @return Device buffer containing encoded data + * @param stripe_bounds List of stripe boundaries + * @param streams List of columns' index and data streams + * @return Encoded data and per-chunk stream descriptors */ - rmm::device_buffer encode_columns(orc_column_view* columns, - size_t num_columns, - size_t num_rows, - size_t num_rowgroups, - std::vector const& str_col_ids, - std::vector const& stripe_list, - std::vector const& streams, - std::vector const& strm_ids, - hostdevice_vector& chunks); + encoded_data
encode_columns(host_span columns, + std::vector const& str_col_ids, + host_span stripe_bounds, + orc_streams const& streams); /** * @brief Returns stripe information after compacting columns' individual data - * chunks into contiguous data streams + * chunks into contiguous data streams. * - * @param num_columns Total number of columns - * @param num_rows Total number of rows - * @param num_index_streams Total number of index streams - * @param num_data_streams Total number of data streams - * @param stripe_list List of stripe boundaries - * @param chunks List of column data chunks - * @param strm_desc List of stream descriptors + * @param[in] num_rows Total number of rows + * @param[in] num_index_streams Total number of index streams + * @param[in] stripe_bounds List of stripe boundaries + * @param[in,out] enc_streams List of encoder chunk streams [column][rowgroup] + * @param[in,out] strm_desc List of stream descriptors [stripe][data_stream] * * @return The stripes' information */ - std::vector gather_stripes(size_t num_columns, - size_t num_rows, - size_t num_index_streams, - size_t num_data_streams, - std::vector const& stripe_list, - hostdevice_vector& chunks, - hostdevice_vector& strm_desc); + std::vector gather_stripes( + size_t num_rows, + size_t num_index_streams, + host_span stripe_bounds, + hostdevice_2dvector* enc_streams, + hostdevice_2dvector* strm_desc); /** * @brief Returns per-stripe and per-file column statistics encoded - * in ORC protobuf format + * in ORC protobuf format. 
* * @param columns List of columns - * @param num_columns Total number of columns - * @param num_rows Total number of rows - * @param num_rowgroups Total number of row groups - * @param stripe_list Number of rowgroups in each stripe - * @param stripes Stripe information - * @param chunks List of column data chunks + * @param stripe_bounds List of stripe boundaries * * @return The statistic blobs */ std::vector> gather_statistic_blobs( - orc_column_view const* columns, - size_t num_columns, - size_t num_rows, - size_t num_rowgroups, - std::vector const& stripe_list, - std::vector const& stripes, - hostdevice_vector& chunks); + host_span columns, host_span stripe_bounds); /** - * @brief Write the specified column's row index stream + * @brief Writes the specified column's row index stream. * - * @param stripe_id Stripe's identifier - * @param stream_id Stream's identifier - * @param columns List of columns - * @param num_columns Total number of columns - * @param num_data_streams Total number of data streams - * @param group Starting row group in the stripe - * @param groups_in_stripe Number of row groups in the stripe - * @param chunks List of all column chunks - * @param strm_desc List of stream descriptors - * @param comp_out Output status for compressed streams - * @param streams List of all streams - * @param pbw Protobuf writer + * @param[in] stripe_id Stripe's identifier + * @param[in] stream_id Stream identifier (column id + 1) + * @param[in] columns List of columns + * @param[in] rowgroups_range Indexes of rowgroups in the stripe + * @param[in] enc_streams List of encoder chunk streams [column][rowgroup] + * @param[in] strm_desc List of stream descriptors + * @param[in] comp_out Output status for compressed streams + * @param[in,out] stripe Stream's parent stripe + * @param[in,out] streams List of all streams + * @param[in,out] pbw Protobuf writer */ void write_index_stream(int32_t stripe_id, int32_t stream_id, - orc_column_view* columns, - size_t 
num_columns, - size_t num_data_streams, - size_t group, - size_t groups_in_stripe, - hostdevice_vector const& chunks, - hostdevice_vector const& strm_desc, - hostdevice_vector const& comp_out, - StripeInformation& stripe, - std::vector& streams, + host_span columns, + stripe_rowgroups const& rowgroups_range, + host_2dspan enc_streams, + host_2dspan strm_desc, + host_span comp_out, + StripeInformation* stripe, + orc_streams* streams, ProtobufWriter* pbw); /** * @brief Write the specified column's data streams * - * @param strm_desc Stream's descriptor - * @param chunk First column chunk of the stream - * @param compressed_data Compressed stream data - * @param stream_out Temporary host output buffer - * @param stripe Stream's parent stripe - * @param streams List of all streams + * @param[in] strm_desc Stream's descriptor + * @param[in] enc_stream Chunk's streams + * @param[in] compressed_data Compressed stream data + * @param[in,out] stream_out Temporary host output buffer + * @param[in,out] stripe Stream's parent stripe + * @param[in,out] streams List of all streams */ void write_data_stream(gpu::StripeStream const& strm_desc, - gpu::EncChunk const& chunk, + gpu::encoder_chunk_streams const& enc_stream, uint8_t const* compressed_data, uint8_t* stream_out, - StripeInformation& stripe, - std::vector& streams); + StripeInformation* stripe, + orc_streams* streams); /** * @brief Insert 3-byte uncompressed block headers in a byte vector diff --git a/cpp/src/io/utilities/hostdevice_vector.hpp b/cpp/src/io/utilities/hostdevice_vector.hpp index 148c5a1ef35..3dc399d669f 100644 --- a/cpp/src/io/utilities/hostdevice_vector.hpp +++ b/cpp/src/io/utilities/hostdevice_vector.hpp @@ -17,6 +17,7 @@ #pragma once #include +#include #include #include @@ -91,6 +92,12 @@ class hostdevice_vector { return reinterpret_cast(d_data.data()) + offset; } + operator cudf::device_span() { return {d_data.data(), max_elements}; } + operator cudf::device_span() const { return {d_data.data(), 
max_elements}; } + + operator cudf::host_span() { return {h_data, max_elements}; } + operator cudf::host_span() const { return {h_data, max_elements}; } + void host_to_device(rmm::cuda_stream_view stream, bool synchronize = false) { CUDA_TRY(cudaMemcpyAsync( @@ -125,3 +132,57 @@ class hostdevice_vector { T *h_data{}; rmm::device_buffer d_data{}; }; + +namespace cudf { +namespace detail { + +/** + * @brief Wrapper around hostdevice_vector to enable two-dimensional indexing. + * + * Does not incur additional allocations. + */ +template +class hostdevice_2dvector { + public: + hostdevice_2dvector(size_t rows, + size_t columns, + rmm::cuda_stream_view stream = rmm::cuda_stream_default) + : _size{rows, columns}, _data{rows * columns, stream} + { + } + + operator device_2dspan() { return {_data.device_ptr(), _size}; } + operator device_2dspan() const { return {_data.device_ptr(), _size}; } + + operator host_2dspan() { return {_data.host_ptr(), _size}; } + operator host_2dspan() const { return {_data.host_ptr(), _size}; } + + host_span operator[](size_t row) + { + return {_data.host_ptr() + host_2dspan::flatten_index(row, 0, _size), _size.second}; + } + + host_span operator[](size_t row) const + { + return {_data.host_ptr() + host_2dspan::flatten_index(row, 0, _size), _size.second}; + } + + auto size() const noexcept { return _size; } + + void host_to_device(rmm::cuda_stream_view stream, bool synchronize = false) + { + _data.host_to_device(stream, synchronize); + } + + void device_to_host(rmm::cuda_stream_view stream, bool synchronize = false) + { + _data.device_to_host(stream, synchronize); + } + + private: + hostdevice_vector _data; + typename host_2dspan::size_type _size; +}; + +} // namespace detail +} // namespace cudf diff --git a/cpp/tests/utilities_tests/span_tests.cu b/cpp/tests/utilities_tests/span_tests.cu index 547894e9f6c..24884c15f64 100644 --- a/cpp/tests/utilities_tests/span_tests.cu +++ b/cpp/tests/utilities_tests/span_tests.cu @@ -15,6 +15,7 @@ */ 
#include +#include #include #include @@ -29,6 +30,9 @@ using cudf::device_span; using cudf::host_span; +using cudf::detail::device_2dspan; +using cudf::detail::host_2dspan; +using cudf::detail::hostdevice_2dvector; template void expect_equivolent(host_span a, host_span b) @@ -237,4 +241,66 @@ TEST(SpanTest, CanUseDeviceSpan) ASSERT_TRUE(h_message[0]); } +/* Fixture declared for the 2D span tests. NOTE(review): the tests below are written with TEST rather than TEST_F, so this fixture appears to be unused by them - confirm whether TEST_F was intended. */ class MdSpanTest : public cudf::test::BaseFixture { +}; + +/* is_empty() must be false for a 1x2 vector and true when either dimension is 0, checked through both the host and the device 2D span. */ TEST(MdSpanTest, CanDetermineEmptiness) +{ + auto const vector = hostdevice_2dvector(1, 2); + auto const no_rows_vector = hostdevice_2dvector(0, 2); + auto const no_columns_vector = hostdevice_2dvector(1, 0); + + EXPECT_FALSE(host_2dspan{vector}.is_empty()); + EXPECT_FALSE(device_2dspan{vector}.is_empty()); + EXPECT_TRUE(host_2dspan{no_rows_vector}.is_empty()); + EXPECT_TRUE(device_2dspan{no_rows_vector}.is_empty()); + EXPECT_TRUE(host_2dspan{no_columns_vector}.is_empty()); + EXPECT_TRUE(device_2dspan{no_columns_vector}.is_empty()); +} + +/* Kernel used by DeviceReadWrite: if element [5][6] equals 5 it is multiplied by 6, otherwise it is set to 5. */ __global__ void readwrite_kernel(device_2dspan result) +{ + if (result[5][6] == 5) { + result[5][6] *= 6; + } else { + result[5][6] = 5; + } +} + +/* Launches readwrite_kernel twice on an 11x23 vector (passed directly, relying on the implicit conversion to a device 2D span), copies the data back to the host with synchronization and expects element [5][6] to be 30: 5 after the first launch, times 6 after the second. NOTE(review): the test never writes the element before the first launch, so the expectation relies on its initial device value not being 5 - confirm what hostdevice_2dvector guarantees about initial contents. */ TEST(MdSpanTest, DeviceReadWrite) +{ + auto vector = hostdevice_2dvector(11, 23); + + readwrite_kernel<<<1, 1>>>(vector); + readwrite_kernel<<<1, 1>>>(vector); + vector.device_to_host(rmm::cuda_stream_default, true); + EXPECT_EQ(vector[5][6], 30); +} + +/* Writes element [5][6] through a host 2D span created from the vector (5, then times 6) and checks that 30 is observed through the row accessor of the vector itself. */ TEST(MdSpanTest, HostReadWrite) +{ + auto vector = hostdevice_2dvector(11, 23); + auto span = host_2dspan{vector}; + span[5][6] = 5; + if (span[5][6] == 5) { span[5][6] *= 6; } + + EXPECT_EQ(vector[5][6], 30); +} + +/* size() of the host and device 2D spans created from a vector must equal size() of that vector. */ TEST(MdSpanTest, CanGetSize) +{ + auto const vector = hostdevice_2dvector(1, 2); + + EXPECT_EQ(host_2dspan{vector}.size(), vector.size()); + EXPECT_EQ(device_2dspan{vector}.size(), vector.size()); +} + +/* count() of the host and device 2D spans over an 11x23 vector must be 11 * 23. */ TEST(MdSpanTest, CanGetCount) +{ + auto const vector = hostdevice_2dvector(11, 23); + + EXPECT_EQ(host_2dspan{vector}.count(), 11ul * 23); + EXPECT_EQ(device_2dspan{vector}.count(), 11ul * 23); +} +
CUDF_TEST_PROGRAM_MAIN() /* macro defined outside this chunk; presumably supplies the entry point of the test program - confirm */