diff --git a/ci/release/apply_wheel_modifications.sh b/ci/release/apply_wheel_modifications.sh index 9d9758f1f15..0c55c4b9141 100755 --- a/ci/release/apply_wheel_modifications.sh +++ b/ci/release/apply_wheel_modifications.sh @@ -6,12 +6,6 @@ VERSION=${1} CUDA_SUFFIX=${2} -# __init__.py versions -sed -i "s/__version__ = .*/__version__ = \"${VERSION}\"/g" python/cudf/cudf/__init__.py -sed -i "s/__version__ = .*/__version__ = \"${VERSION}\"/g" python/dask_cudf/dask_cudf/__init__.py -sed -i "s/__version__ = .*/__version__ = \"${VERSION}\"/g" python/cudf_kafka/cudf_kafka/__init__.py -sed -i "s/__version__ = .*/__version__ = \"${VERSION}\"/g" python/custreamz/custreamz/__init__.py - # pyproject.toml versions sed -i "s/^version = .*/version = \"${VERSION}\"/g" python/cudf/pyproject.toml sed -i "s/^version = .*/version = \"${VERSION}\"/g" python/dask_cudf/pyproject.toml diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu index 1ee90bde1d2..d3bb0a45c12 100644 --- a/cpp/src/io/orc/writer_impl.cu +++ b/cpp/src/io/orc/writer_impl.cu @@ -55,6 +55,7 @@ #include #include #include +#include #include #include @@ -332,6 +333,136 @@ size_type orc_table_view::num_rows() const noexcept return columns.empty() ? 0 : columns.front().size(); } +orc_streams::orc_stream_offsets orc_streams::compute_offsets( + host_span columns, size_t num_rowgroups) const +{ + std::vector strm_offsets(streams.size()); + size_t non_rle_data_size = 0; + size_t rle_data_size = 0; + for (size_t i = 0; i < streams.size(); ++i) { + const auto& stream = streams[i]; + + auto const is_rle_data = [&]() { + // First stream is an index stream, don't check types, etc. + if (!stream.column_index().has_value()) return true; + + auto const& column = columns[stream.column_index().value()]; + // Dictionary encoded string column - dictionary characters or + // directly encoded string - column characters + if (column.orc_kind() == TypeKind::STRING && + ((stream.kind == DICTIONARY_DATA && column.orc_encoding() == DICTIONARY_V2) || + (stream.kind == DATA && column.orc_encoding() == DIRECT_V2))) + return false; + // Decimal data + if (column.orc_kind() == TypeKind::DECIMAL && stream.kind == DATA) return false; + + // Everything else uses RLE + return true; + }(); + // non-RLE and RLE streams are separated in the buffer that stores encoded data + // The computed offsets do not take the streams of the other type into account + if (is_rle_data) { + strm_offsets[i] = rle_data_size; + rle_data_size += (stream.length + 7) & ~7; + } else { + strm_offsets[i] = non_rle_data_size; + non_rle_data_size += stream.length; + } + } + non_rle_data_size = (non_rle_data_size + 7) & ~7; + + return {std::move(strm_offsets), non_rle_data_size, rle_data_size}; +} + +namespace { +struct string_length_functor { + __device__ inline size_type operator()(int const i) const + { + // we translate from 0 -> num_chunks * 2 because each statistic has a min and max + // string and we need to calculate lengths for both. + if (i >= num_chunks * 2) return 0; + + // min strings are even values, max strings are odd values of i + auto const should_copy_min = i % 2 == 0; + // index of the chunk + auto const idx = i / 2; + auto& str_val = should_copy_min ? stripe_stat_chunks[idx].min_value.str_val + : stripe_stat_chunks[idx].max_value.str_val; + auto const str = stripe_stat_merge[idx].stats_dtype == dtype_string; + return str ? str_val.length : 0; + } + + int const num_chunks; + statistics_chunk const* stripe_stat_chunks; + statistics_merge_group const* stripe_stat_merge; +}; + +__global__ void copy_string_data(char* string_pool, + size_type* offsets, + statistics_chunk* chunks, + statistics_merge_group const* groups) +{ + auto const idx = blockIdx.x / 2; + if (groups[idx].stats_dtype == dtype_string) { + // min strings are even values, max strings are odd values of i + auto const should_copy_min = blockIdx.x % 2 == 0; + auto& str_val = should_copy_min ? chunks[idx].min_value.str_val : chunks[idx].max_value.str_val; + auto dst = &string_pool[offsets[blockIdx.x]]; + auto src = str_val.ptr; + + for (int i = threadIdx.x; i < str_val.length; i += blockDim.x) { + dst[i] = src[i]; + } + if (threadIdx.x == 0) { str_val.ptr = dst; } + } +} + +} // namespace + +void persisted_statistics::persist(int num_table_rows, + bool single_write_mode, + intermediate_statistics& intermediate_stats, + rmm::cuda_stream_view stream) +{ + if (not single_write_mode) { + // persist the strings in the chunks into a string pool and update pointers + auto const num_chunks = static_cast(intermediate_stats.stripe_stat_chunks.size()); + // min offset and max offset + 1 for total size + rmm::device_uvector offsets((num_chunks * 2) + 1, stream); + + auto iter = cudf::detail::make_counting_transform_iterator( + 0, + string_length_functor{num_chunks, + intermediate_stats.stripe_stat_chunks.data(), + intermediate_stats.stripe_stat_merge.device_ptr()}); + thrust::exclusive_scan(rmm::exec_policy(stream), iter, iter + offsets.size(), offsets.begin()); + + // pull size back to host + auto const total_string_pool_size = offsets.element(num_chunks * 2, stream); + if (total_string_pool_size > 0) { + rmm::device_uvector string_pool(total_string_pool_size, stream); + + // offsets describes where in the string pool each string goes. Going with the simple + // approach for now, but it is possible something fancier with breaking up each thread into + // copying x bytes instead of a single string is the better method since we are dealing in + // min/max strings they almost certainly will not be uniform length. + copy_string_data<<>>( + string_pool.data(), + offsets.data(), + intermediate_stats.stripe_stat_chunks.data(), + intermediate_stats.stripe_stat_merge.device_ptr()); + string_pools.emplace_back(std::move(string_pool)); + } + } + + stripe_stat_chunks.emplace_back(std::move(intermediate_stats.stripe_stat_chunks)); + stripe_stat_merge.emplace_back(std::move(intermediate_stats.stripe_stat_merge)); + stats_dtypes = std::move(intermediate_stats.stats_dtypes); + col_types = std::move(intermediate_stats.col_types); + num_rows = num_table_rows; +} + +namespace { /** * @brief Gathers stripe information. * @@ -442,12 +573,26 @@ void init_dictionaries(orc_table_view& orc_table, dict->device_to_host(stream, true); } -void writer::impl::build_dictionaries(orc_table_view& orc_table, - host_span stripe_bounds, - hostdevice_2dvector const& dict, - host_span> dict_index, - host_span dictionary_enabled, - hostdevice_2dvector& stripe_dict) +/** + * @brief Builds up per-stripe dictionaries for string columns. + * + * @param orc_table Non-owning view of a cuDF table w/ ORC-related info + * @param stripe_bounds List of stripe boundaries + * @param dict List of dictionary chunks [rowgroup][column] + * @param dict_index List of dictionary indices + * @param dictionary_enabled Whether dictionary encoding is enabled for a given column + * @param stripe_dict List of stripe dictionaries + * @param enable_dictionary Whether dictionary is enabled + * @param stream CUDA stream used for device memory operations and kernel launches + */ +void build_dictionaries(orc_table_view& orc_table, + host_span stripe_bounds, + hostdevice_2dvector const& dict, + host_span> dict_index, + host_span dictionary_enabled, + hostdevice_2dvector& stripe_dict, + bool enable_dictionary, + rmm::cuda_stream_view stream) { const auto num_rowgroups = dict.size().first; @@ -471,7 +616,7 @@ void writer::impl::build_dictionaries(orc_table_view& orc_table, sd.leaf_column = dict[0][dict_idx].leaf_column; } - if (enable_dictionary_) { + if (enable_dictionary) { struct string_column_cost { size_t direct = 0; size_t dictionary = 0; @@ -555,9 +700,20 @@ auto comp_block_alignment(CompressionKind compression_kind) return 1u << nvcomp::compress_output_alignment_bits(to_nvcomp_compression_type(compression_kind)); } -orc_streams writer::impl::create_streams(host_span columns, - file_segmentation const& segmentation, - std::map const& decimal_column_sizes) +/** + * @brief Builds up per-column streams. + * + * @param[in,out] columns List of columns + * @param[in] segmentation stripe and rowgroup ranges + * @param[in] decimal_column_sizes Sizes of encoded decimal columns + * @return List of stream descriptors + */ +orc_streams create_streams(host_span columns, + file_segmentation const& segmentation, + std::map const& decimal_column_sizes, + bool enable_dictionary, + CompressionKind compression_kind, + bool single_write_mode) { // 'column 0' row index stream std::vector streams{{ROW_INDEX, 0}}; // TODO: Separate index and data streams? @@ -600,7 +756,7 @@ orc_streams writer::impl::create_streams(host_span columns, auto add_stream = [&](gpu::StreamIndexType index_type, StreamKind kind, TypeKind type_kind, size_t size) { - auto const max_alignment_padding = uncomp_block_alignment(compression_kind_) - 1; + auto const max_alignment_padding = uncomp_block_alignment(compression_kind) - 1; const auto base = column.index() * gpu::CI_NUM_STREAMS; ids[base + index_type] = streams.size(); streams.push_back(orc::Stream{ @@ -637,7 +793,7 @@ orc_streams writer::impl::create_streams(host_span columns, column.set_orc_encoding(DIRECT); break; case TypeKind::STRING: { - bool enable_dict = enable_dictionary_; + bool enable_dict = enable_dictionary; size_t dict_data_size = 0; size_t dict_strings = 0; size_t dict_lengths_div512 = 0; @@ -712,47 +868,6 @@ orc_streams writer::impl::create_streams(host_span columns, return {std::move(streams), std::move(ids), std::move(types)}; } -orc_streams::orc_stream_offsets orc_streams::compute_offsets( - host_span columns, size_t num_rowgroups) const -{ - std::vector strm_offsets(streams.size()); - size_t non_rle_data_size = 0; - size_t rle_data_size = 0; - for (size_t i = 0; i < streams.size(); ++i) { - const auto& stream = streams[i]; - - auto const is_rle_data = [&]() { - // First stream is an index stream, don't check types, etc. - if (!stream.column_index().has_value()) return true; - - auto const& column = columns[stream.column_index().value()]; - // Dictionary encoded string column - dictionary characters or - // directly encoded string - column characters - if (column.orc_kind() == TypeKind::STRING && - ((stream.kind == DICTIONARY_DATA && column.orc_encoding() == DICTIONARY_V2) || - (stream.kind == DATA && column.orc_encoding() == DIRECT_V2))) - return false; - // Decimal data - if (column.orc_kind() == TypeKind::DECIMAL && stream.kind == DATA) return false; - - // Everything else uses RLE - return true; - }(); - // non-RLE and RLE streams are separated in the buffer that stores encoded data - // The computed offsets do not take the streams of the other type into account - if (is_rle_data) { - strm_offsets[i] = rle_data_size; - rle_data_size += (stream.length + 7) & ~7; - } else { - strm_offsets[i] = non_rle_data_size; - non_rle_data_size += stream.length; - } - } - non_rle_data_size = (non_rle_data_size + 7) & ~7; - - return {std::move(strm_offsets), non_rle_data_size, rle_data_size}; -} - std::vector> calculate_aligned_rowgroup_bounds( orc_table_view const& orc_table, file_segmentation const& segmentation, @@ -1093,11 +1208,23 @@ encoded_data encode_columns(orc_table_view const& orc_table, return {std::move(encoded_data), std::move(chunk_streams)}; } -std::vector writer::impl::gather_stripes( +/** + * @brief Returns stripe information after compacting columns' individual data + * chunks into contiguous data streams. + * + * @param[in] num_index_streams Total number of index streams + * @param[in] segmentation stripe and rowgroup ranges + * @param[in,out] enc_streams List of encoder chunk streams [column][rowgroup] + * @param[in,out] strm_desc List of stream descriptors [stripe][data_stream] + * @param[in] stream CUDA stream used for device memory operations and kernel launches + * @return The stripes' information + */ +std::vector gather_stripes( size_t num_index_streams, file_segmentation const& segmentation, hostdevice_2dvector* enc_streams, - hostdevice_2dvector* strm_desc) + hostdevice_2dvector* strm_desc, + rmm::cuda_stream_view stream) { if (segmentation.num_stripes() == 0) { return {}; } std::vector stripes(segmentation.num_stripes()); @@ -1165,16 +1292,25 @@ hostdevice_vector allocate_and_encode_blobs( return blobs; } -writer::impl::intermediate_statistics writer::impl::gather_statistic_blobs( - statistics_freq const stats_freq, - orc_table_view const& orc_table, - file_segmentation const& segmentation) +/** + * @brief Returns column statistics in an intermediate format. + * + * @param statistics_freq Frequency of statistics to be included in the output file + * @param orc_table Table information to be written + * @param segmentation stripe and rowgroup ranges + * @param stream CUDA stream used for device memory operations and kernel launches + * @return The statistic information + */ +intermediate_statistics gather_statistic_blobs(statistics_freq const stats_freq, + orc_table_view const& orc_table, + file_segmentation const& segmentation, + rmm::cuda_stream_view stream) { auto const num_rowgroup_blobs = segmentation.rowgroups.count(); auto const num_stripe_blobs = segmentation.num_stripes() * orc_table.num_columns(); auto const are_statistics_enabled = stats_freq != statistics_freq::STATISTICS_NONE; if (not are_statistics_enabled or num_rowgroup_blobs + num_stripe_blobs == 0) { - return writer::impl::intermediate_statistics{stream}; + return intermediate_statistics{stream}; } hostdevice_vector stat_desc(orc_table.num_columns(), stream); @@ -1292,8 +1428,17 @@ writer::impl::intermediate_statistics writer::impl::gather_statistic_blobs( std::move(col_types)}; } -writer::impl::encoded_footer_statistics writer::impl::finish_statistic_blobs( - int num_stripes, writer::impl::persisted_statistics& per_chunk_stats) +/** + * @brief Returns column statistics encoded in ORC protobuf format stored in the footer. + * + * @param num_stripes number of stripes in the data + * @param incoming_stats intermediate statistics returned from `gather_statistic_blobs` + * @param stream CUDA stream used for device memory operations and kernel launches + * @return The encoded statistic blobs + */ +encoded_footer_statistics finish_statistic_blobs(int num_stripes, + persisted_statistics& per_chunk_stats, + rmm::cuda_stream_view stream) { auto stripe_size_iter = thrust::make_transform_iterator(per_chunk_stats.stripe_stat_merge.begin(), [](auto const& i) { return i.size(); }); @@ -1383,16 +1528,36 @@ writer::impl::encoded_footer_statistics writer::impl::finish_statistic_blobs( return {std::move(stripe_blobs), std::move(file_blobs)}; } -void writer::impl::write_index_stream(int32_t stripe_id, - int32_t stream_id, - host_span columns, - file_segmentation const& segmentation, - host_2dspan enc_streams, - host_2dspan strm_desc, - host_span comp_res, - std::vector const& rg_stats, - StripeInformation* stripe, - orc_streams* streams) +/** + * @brief Writes the specified column's row index stream. + * + * @param[in] stripe_id Stripe's identifier + * @param[in] stream_id Stream identifier (column id + 1) + * @param[in] columns List of columns + * @param[in] segmentation stripe and rowgroup ranges + * @param[in] enc_streams List of encoder chunk streams [column][rowgroup] + * @param[in] strm_desc List of stream descriptors + * @param[in] comp_res Output status for compressed streams + * @param[in] rg_stats row group level statistics + * @param[in,out] stripe Stream's parent stripe + * @param[in,out] streams List of all streams + * @param[in] compression_kind The compression kind + * @param[in] compression_blocksize The block size used for compression + * @param[in] out_sink Sink for writing data + */ +void write_index_stream(int32_t stripe_id, + int32_t stream_id, + host_span columns, + file_segmentation const& segmentation, + host_2dspan enc_streams, + host_2dspan strm_desc, + host_span comp_res, + std::vector const& rg_stats, + StripeInformation* stripe, + orc_streams* streams, + CompressionKind compression_kind, + size_t compression_blocksize, + std::unique_ptr const& out_sink) { row_group_index_info present; row_group_index_info data; @@ -1404,7 +1569,7 @@ void writer::impl::write_index_stream(int32_t stripe_id, row_group_index_info record; if (stream.ids[type] > 0) { record.pos = 0; - if (compression_kind_ != NONE) { + if (compression_kind != NONE) { auto const& ss = strm_desc[stripe_id][stream.ids[type] - (columns.size() + 1)]; record.blk_pos = ss.first_block; record.comp_pos = 0; @@ -1419,10 +1584,10 @@ void writer::impl::write_index_stream(int32_t stripe_id, if (record.pos >= 0) { record.pos += stream.lengths[type]; while ((record.pos >= 0) && (record.blk_pos >= 0) && - (static_cast(record.pos) >= compression_blocksize_) && + (static_cast(record.pos) >= compression_blocksize) && (record.comp_pos + block_header_size + comp_res[record.blk_pos].bytes_written < static_cast(record.comp_size))) { - record.pos -= compression_blocksize_; + record.pos -= compression_blocksize; record.comp_pos += block_header_size + comp_res[record.blk_pos].bytes_written; record.blk_pos += 1; } @@ -1444,7 +1609,7 @@ void writer::impl::write_index_stream(int32_t stripe_id, } } - ProtobufWriter pbw((compression_kind_ != NONE) ? 3 : 0); + ProtobufWriter pbw((compression_kind != NONE) ? 3 : 0); // Add row index entries auto const& rowgroups_range = segmentation.stripes[stripe_id]; @@ -1469,22 +1634,39 @@ void writer::impl::write_index_stream(int32_t stripe_id, }); (*streams)[stream_id].length = pbw.size(); - if (compression_kind_ != NONE) { + if (compression_kind != NONE) { uint32_t uncomp_ix_len = (uint32_t)((*streams)[stream_id].length - 3) * 2 + 1; pbw.buffer()[0] = static_cast(uncomp_ix_len >> 0); pbw.buffer()[1] = static_cast(uncomp_ix_len >> 8); pbw.buffer()[2] = static_cast(uncomp_ix_len >> 16); } - out_sink_->host_write(pbw.data(), pbw.size()); + out_sink->host_write(pbw.data(), pbw.size()); stripe->indexLength += pbw.size(); } -std::future writer::impl::write_data_stream(gpu::StripeStream const& strm_desc, - gpu::encoder_chunk_streams const& enc_stream, - uint8_t const* compressed_data, - uint8_t* stream_out, - StripeInformation* stripe, - orc_streams* streams) +/** + * @brief Write the specified column's data streams + * + * @param[in] strm_desc Stream's descriptor + * @param[in] enc_stream Chunk's streams + * @param[in] compressed_data Compressed stream data + * @param[in,out] stream_out Temporary host output buffer + * @param[in,out] stripe Stream's parent stripe + * @param[in,out] streams List of all streams + * @param[in] compression_kind The compression kind + * @param[in] out_sink Sink for writing data + * @param[in] stream CUDA stream used for device memory operations and kernel launches + * @return An std::future that should be synchronized to ensure the writing is complete + */ +std::future write_data_stream(gpu::StripeStream const& strm_desc, + gpu::encoder_chunk_streams const& enc_stream, + uint8_t const* compressed_data, + uint8_t* stream_out, + StripeInformation* stripe, + orc_streams* streams, + CompressionKind compression_kind, + std::unique_ptr const& out_sink, + rmm::cuda_stream_view stream) { const auto length = strm_desc.stream_size; (*streams)[enc_stream.ids[strm_desc.stream_type]].length = length; @@ -1492,18 +1674,18 @@ std::future writer::impl::write_data_stream(gpu::StripeStream const& strm_ return std::async(std::launch::deferred, [] {}); } - const auto* stream_in = (compression_kind_ == NONE) ? enc_stream.data_ptrs[strm_desc.stream_type] - : (compressed_data + strm_desc.bfr_offset); + const auto* stream_in = (compression_kind == NONE) ? enc_stream.data_ptrs[strm_desc.stream_type] + : (compressed_data + strm_desc.bfr_offset); auto write_task = [&]() { - if (out_sink_->is_device_write_preferred(length)) { - return out_sink_->device_write_async(stream_in, length, stream); + if (out_sink->is_device_write_preferred(length)) { + return out_sink->device_write_async(stream_in, length, stream); } else { CUDF_CUDA_TRY( cudaMemcpyAsync(stream_out, stream_in, length, cudaMemcpyDefault, stream.value())); stream.synchronize(); - out_sink_->host_write(stream_out, length); + out_sink->host_write(stream_out, length); return std::async(std::launch::deferred, [] {}); } }(); @@ -1511,18 +1693,27 @@ std::future writer::impl::write_data_stream(gpu::StripeStream const& strm_ return write_task; } -void writer::impl::add_uncompressed_block_headers(std::vector& v) +/** + * @brief Insert 3-byte uncompressed block headers in a byte vector + * + * @param compression_kind The compression kind + * @param compression_blocksize The block size used for compression + * @param v The destitation byte vector to write, which must include initial 3-byte header + */ +void add_uncompressed_block_headers(CompressionKind compression_kind, + size_t compression_blocksize, + std::vector& v) { - if (compression_kind_ != NONE) { + if (compression_kind != NONE) { size_t uncomp_len = v.size() - 3, pos = 0, block_len; - while (uncomp_len > compression_blocksize_) { - block_len = compression_blocksize_ * 2 + 1; + while (uncomp_len > compression_blocksize) { + block_len = compression_blocksize * 2 + 1; v[pos + 0] = static_cast(block_len >> 0); v[pos + 1] = static_cast(block_len >> 8); v[pos + 2] = static_cast(block_len >> 16); - pos += 3 + compression_blocksize_; + pos += 3 + compression_blocksize; v.insert(v.begin() + pos, 3, 0); - uncomp_len -= compression_blocksize_; + uncomp_len -= compression_blocksize; } block_len = uncomp_len * 2 + 1; v[pos + 0] = static_cast(block_len >> 0); @@ -1531,58 +1722,6 @@ void writer::impl::add_uncompressed_block_headers(std::vector& v) } } -writer::impl::impl(std::unique_ptr sink, - orc_writer_options const& options, - SingleWriteMode mode, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr) - : _mr(mr), - stream(stream), - max_stripe_size{options.get_stripe_size_bytes(), options.get_stripe_size_rows()}, - row_index_stride{options.get_row_index_stride()}, - compression_kind_(to_orc_compression(options.get_compression())), - compression_blocksize_(compression_block_size(compression_kind_)), - stats_freq_(options.get_statistics_freq()), - single_write_mode(mode == SingleWriteMode::YES), - kv_meta(options.get_key_value_metadata()), - out_sink_(std::move(sink)) -{ - if (options.get_metadata()) { - table_meta = std::make_unique(*options.get_metadata()); - } - init_state(); -} - -writer::impl::impl(std::unique_ptr sink, - chunked_orc_writer_options const& options, - SingleWriteMode mode, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr) - : _mr(mr), - stream(stream), - max_stripe_size{options.get_stripe_size_bytes(), options.get_stripe_size_rows()}, - row_index_stride{options.get_row_index_stride()}, - compression_kind_(to_orc_compression(options.get_compression())), - compression_blocksize_(compression_block_size(compression_kind_)), - stats_freq_(options.get_statistics_freq()), - single_write_mode(mode == SingleWriteMode::YES), - kv_meta(options.get_key_value_metadata()), - out_sink_(std::move(sink)) -{ - if (options.get_metadata()) { - table_meta = std::make_unique(*options.get_metadata()); - } - init_state(); -} - -writer::impl::~impl() { close(); } - -void writer::impl::init_state() -{ - // Write file header - out_sink_->host_write(MAGIC, std::strlen(MAGIC)); -} - void pushdown_lists_null_mask(orc_column_view const& col, device_span d_columns, bitmask_type const* parent_pd_mask, @@ -2018,48 +2157,6 @@ string_dictionaries allocate_dictionaries(orc_table_view const& orc_table, std::move(is_dict_enabled)}; } -struct string_length_functor { - __device__ inline size_type operator()(int const i) const - { - // we translate from 0 -> num_chunks * 2 because each statistic has a min and max - // string and we need to calculate lengths for both. - if (i >= num_chunks * 2) return 0; - - // min strings are even values, max strings are odd values of i - auto const should_copy_min = i % 2 == 0; - // index of the chunk - auto const idx = i / 2; - auto& str_val = should_copy_min ? stripe_stat_chunks[idx].min_value.str_val - : stripe_stat_chunks[idx].max_value.str_val; - auto const str = stripe_stat_merge[idx].stats_dtype == dtype_string; - return str ? str_val.length : 0; - } - - int const num_chunks; - statistics_chunk const* stripe_stat_chunks; - statistics_merge_group const* stripe_stat_merge; -}; - -__global__ void copy_string_data(char* string_pool, - size_type* offsets, - statistics_chunk* chunks, - statistics_merge_group const* groups) -{ - auto const idx = blockIdx.x / 2; - if (groups[idx].stats_dtype == dtype_string) { - // min strings are even values, max strings are odd values of i - auto const should_copy_min = blockIdx.x % 2 == 0; - auto& str_val = should_copy_min ? chunks[idx].min_value.str_val : chunks[idx].max_value.str_val; - auto dst = &string_pool[offsets[blockIdx.x]]; - auto src = str_val.ptr; - - for (int i = threadIdx.x; i < str_val.length; i += blockDim.x) { - dst[i] = src[i]; - } - if (threadIdx.x == 0) { str_val.ptr = dst; } - } -} - size_t max_compression_output_size(CompressionKind compression_kind, uint32_t compression_blocksize) { if (compression_kind == NONE) return 0; @@ -2068,60 +2165,14 @@ size_t max_compression_output_size(CompressionKind compression_kind, uint32_t co compression_blocksize); } -void writer::impl::persisted_statistics::persist(int num_table_rows, - bool single_write_mode, - intermediate_statistics& intermediate_stats, - rmm::cuda_stream_view stream) -{ - if (not single_write_mode) { - // persist the strings in the chunks into a string pool and update pointers - auto const num_chunks = static_cast(intermediate_stats.stripe_stat_chunks.size()); - // min offset and max offset + 1 for total size - rmm::device_uvector offsets((num_chunks * 2) + 1, stream); - - auto iter = cudf::detail::make_counting_transform_iterator( - 0, - string_length_functor{num_chunks, - intermediate_stats.stripe_stat_chunks.data(), - intermediate_stats.stripe_stat_merge.device_ptr()}); - thrust::exclusive_scan(rmm::exec_policy(stream), iter, iter + offsets.size(), offsets.begin()); - - // pull size back to host - auto const total_string_pool_size = offsets.element(num_chunks * 2, stream); - if (total_string_pool_size > 0) { - rmm::device_uvector string_pool(total_string_pool_size, stream); - - // offsets describes where in the string pool each string goes. Going with the simple - // approach for now, but it is possible something fancier with breaking up each thread into - // copying x bytes instead of a single string is the better method since we are dealing in - // min/max strings they almost certainly will not be uniform length. - copy_string_data<<>>( - string_pool.data(), - offsets.data(), - intermediate_stats.stripe_stat_chunks.data(), - intermediate_stats.stripe_stat_merge.device_ptr()); - string_pools.emplace_back(std::move(string_pool)); - } - } - - stripe_stat_chunks.emplace_back(std::move(intermediate_stats.stripe_stat_chunks)); - stripe_stat_merge.emplace_back(std::move(intermediate_stats.stripe_stat_merge)); - stats_dtypes = std::move(intermediate_stats.stats_dtypes); - col_types = std::move(intermediate_stats.col_types); - num_rows = num_table_rows; -} - -void writer::impl::write(table_view const& table) +std::unique_ptr make_table_meta(table_view const& input) { - CUDF_EXPECTS(not closed, "Data has already been flushed to out and closed"); - auto const num_rows = table.num_rows(); - - if (not table_meta) { table_meta = std::make_unique(table); } + auto table_meta = std::make_unique(input); // Fill unnamed columns' names in table_meta std::function add_default_name = [&](column_in_metadata& col_meta, std::string default_name) { - if (col_meta.get_name().empty()) col_meta.set_name(default_name); + if (col_meta.get_name().empty()) { col_meta.set_name(default_name); } for (size_type i = 0; i < col_meta.num_children(); ++i) { add_default_name(col_meta.child(i), std::to_string(i)); } @@ -2130,9 +2181,51 @@ void writer::impl::write(table_view const& table) add_default_name(table_meta->column_metadata[i], "_col" + std::to_string(i)); } - auto const d_table = table_device_view::create(table, stream); + return table_meta; +} + +/** + * @brief Perform the processing steps needed to convert the input table into the output ORC data + * for writing, such as compression and ORC encoding. + * + * @param input The input table + * @param table_meta The table metadata + * @param max_stripe_size Maximum size of stripes in the output file + * @param row_index_stride The row index stride + * @param enable_dictionary Whether dictionary is enabled + * @param compression_kind The compression kind + * @param compression_blocksize The block size used for compression + * @param stats_freq Column statistics granularity type for parquet/orc writers + * @param single_write_mode Flag to indicate if there is only a single table write + * @param out_sink Sink for writing data + * @param stream CUDA stream used for device memory operations and kernel launches + * @return A tuple of the intermediate results containing the processed data + */ +std::tuple, + hostdevice_2dvector, + encoded_data, + file_segmentation, + std::vector, + orc_table_view, + rmm::device_buffer, + intermediate_statistics, + pinned_buffer> +convert_table_to_orc_data(table_view const& input, + table_input_metadata const& table_meta, + stripe_size_limits max_stripe_size, + size_type row_index_stride, + bool enable_dictionary, + CompressionKind compression_kind, + size_t compression_blocksize, + statistics_freq stats_freq, + bool single_write_mode, + data_sink const& out_sink, + rmm::cuda_stream_view stream) +{ + auto const input_tview = table_device_view::create(input, stream); - auto orc_table = make_orc_table_view(table, *d_table, *table_meta, stream); + auto orc_table = make_orc_table_view(input, *input_tview, table_meta, stream); auto const pd_masks = init_pushdown_null_masks(orc_table, stream); @@ -2152,7 +2245,7 @@ void writer::impl::write(table_view const& table) } // Decide stripe boundaries based on rowgroups and dict chunks - auto const segmentation = + auto segmentation = calculate_segmentation(orc_table.columns, std::move(rowgroup_bounds), max_stripe_size); // Build stripe-level dictionaries @@ -2164,15 +2257,22 @@ void writer::impl::write(table_view const& table) dict, dictionaries.index, dictionaries.dictionary_enabled, - stripe_dict); + stripe_dict, + enable_dictionary, + stream); } auto dec_chunk_sizes = decimal_chunk_sizes(orc_table, segmentation, stream); - auto const uncompressed_block_align = uncomp_block_alignment(compression_kind_); - auto const compressed_block_align = comp_block_alignment(compression_kind_); - auto streams = - create_streams(orc_table.columns, segmentation, decimal_column_sizes(dec_chunk_sizes.rg_sizes)); + auto const uncompressed_block_align = uncomp_block_alignment(compression_kind); + auto const compressed_block_align = comp_block_alignment(compression_kind); + + auto streams = create_streams(orc_table.columns, + segmentation, + decimal_column_sizes(dec_chunk_sizes.rg_sizes), + enable_dictionary, + compression_kind, + single_write_mode); auto enc_data = encode_columns(orc_table, std::move(dictionaries), std::move(dec_chunk_sizes), @@ -2181,152 +2281,314 @@ void writer::impl::write(table_view const& table) uncompressed_block_align, stream); + auto const num_rows = input.num_rows(); + // Assemble individual disparate column chunks into contiguous data streams size_type const num_index_streams = (orc_table.num_columns() + 1); const auto num_data_streams = streams.size() - num_index_streams; hostdevice_2dvector strm_descs( segmentation.num_stripes(), num_data_streams, stream); - auto stripes = gather_stripes(num_index_streams, segmentation, &enc_data.streams, &strm_descs); - - if (num_rows > 0) { - // Allocate intermediate output stream buffer - size_t compressed_bfr_size = 0; - size_t num_compressed_blocks = 0; - - auto const max_compressed_block_size = - max_compression_output_size(compression_kind_, compression_blocksize_); - auto const padded_max_compressed_block_size = - util::round_up_unsafe(max_compressed_block_size, compressed_block_align); - auto const padded_block_header_size = - util::round_up_unsafe(block_header_size, compressed_block_align); - - auto stream_output = [&]() { - size_t max_stream_size = 0; - bool all_device_write = true; - - for (auto& ss : strm_descs.host_view().flat_view()) { - if (!out_sink_->is_device_write_preferred(ss.stream_size)) { all_device_write = false; } - size_t stream_size = ss.stream_size; - if (compression_kind_ != NONE) { - ss.first_block = num_compressed_blocks; - ss.bfr_offset = compressed_bfr_size; - - auto num_blocks = std::max( - (stream_size + compression_blocksize_ - 1) / compression_blocksize_, 1); - stream_size += num_blocks * block_header_size; - num_compressed_blocks += num_blocks; - compressed_bfr_size += - (padded_block_header_size + padded_max_compressed_block_size) * num_blocks; - } - max_stream_size = std::max(max_stream_size, stream_size); - } + auto stripes = + gather_stripes(num_index_streams, segmentation, &enc_data.streams, &strm_descs, stream); + + if (num_rows == 0) { + return {std::move(streams), + hostdevice_vector{}, // comp_results + std::move(strm_descs), + std::move(enc_data), + std::move(segmentation), + std::move(stripes), + std::move(orc_table), + rmm::device_buffer{}, // compressed_data + intermediate_statistics{stream}, + pinned_buffer{nullptr, cudaFreeHost}}; + } - if (all_device_write) { - return pinned_buffer{nullptr, cudaFreeHost}; - } else { - return pinned_buffer{[](size_t size) { - uint8_t* ptr = nullptr; - CUDF_CUDA_TRY(cudaMallocHost(&ptr, size)); - return ptr; - }(max_stream_size), - cudaFreeHost}; + // Allocate intermediate output stream buffer + size_t compressed_bfr_size = 0; + size_t num_compressed_blocks = 0; + + auto const max_compressed_block_size = + max_compression_output_size(compression_kind, compression_blocksize); + auto const padded_max_compressed_block_size = + util::round_up_unsafe(max_compressed_block_size, compressed_block_align); + auto const padded_block_header_size = + util::round_up_unsafe(block_header_size, compressed_block_align); + + auto stream_output = [&]() { + size_t max_stream_size = 0; + bool all_device_write = true; + + for (auto& ss : strm_descs.host_view().flat_view()) { + if (!out_sink.is_device_write_preferred(ss.stream_size)) { all_device_write = false; } + size_t stream_size = ss.stream_size; + if (compression_kind != NONE) { + ss.first_block = num_compressed_blocks; + ss.bfr_offset = compressed_bfr_size; + + auto num_blocks = + std::max((stream_size + compression_blocksize - 1) / compression_blocksize, 1); + stream_size += num_blocks * block_header_size; + num_compressed_blocks += num_blocks; + compressed_bfr_size += + (padded_block_header_size + padded_max_compressed_block_size) * num_blocks; } - }(); + max_stream_size = std::max(max_stream_size, stream_size); + } - // Compress the data streams - rmm::device_buffer compressed_data(compressed_bfr_size, stream); - hostdevice_vector comp_results(num_compressed_blocks, stream); - thrust::fill(rmm::exec_policy(stream), - comp_results.d_begin(), - comp_results.d_end(), - compression_result{0, compression_status::FAILURE}); - if (compression_kind_ != NONE) { - strm_descs.host_to_device(stream); - gpu::CompressOrcDataStreams(static_cast(compressed_data.data()), - num_compressed_blocks, - compression_kind_, - compression_blocksize_, - max_compressed_block_size, - compressed_block_align, - strm_descs, - enc_data.streams, - comp_results, - stream); - - // deallocate encoded data as it is not needed anymore - enc_data.data = rmm::device_uvector{0, stream}; - - strm_descs.device_to_host(stream); - comp_results.device_to_host(stream, true); + if (all_device_write) { + return pinned_buffer{nullptr, cudaFreeHost}; + } else { + return pinned_buffer{[](size_t size) { + uint8_t* ptr = nullptr; + CUDF_CUDA_TRY(cudaMallocHost(&ptr, size)); + return ptr; + }(max_stream_size), + cudaFreeHost}; } + }(); + + // Compress the data streams + rmm::device_buffer compressed_data(compressed_bfr_size, stream); + hostdevice_vector comp_results(num_compressed_blocks, stream); + thrust::fill(rmm::exec_policy(stream), + comp_results.d_begin(), + comp_results.d_end(), + compression_result{0, compression_status::FAILURE}); + if (compression_kind != NONE) { + strm_descs.host_to_device(stream); + gpu::CompressOrcDataStreams(static_cast(compressed_data.data()), + num_compressed_blocks, + compression_kind, + compression_blocksize, + max_compressed_block_size, + compressed_block_align, + strm_descs, + enc_data.streams, + comp_results, + stream); + + // deallocate encoded data as it is not needed anymore + enc_data.data = rmm::device_uvector{0, stream}; + + strm_descs.device_to_host(stream); + comp_results.device_to_host(stream, true); + } - auto intermediate_stats = gather_statistic_blobs(stats_freq_, orc_table, segmentation); + auto intermediate_stats = gather_statistic_blobs(stats_freq, orc_table, segmentation, stream); + + return {std::move(streams), + std::move(comp_results), + std::move(strm_descs), + std::move(enc_data), + std::move(segmentation), + std::move(stripes), + std::move(orc_table), + std::move(compressed_data), + std::move(intermediate_stats), + std::move(stream_output)}; +} - if (intermediate_stats.stripe_stat_chunks.size() > 0) { - persisted_stripe_statistics.persist( - orc_table.num_rows(), single_write_mode, intermediate_stats, stream); +} // namespace + +writer::impl::impl(std::unique_ptr sink, + orc_writer_options const& options, + SingleWriteMode mode, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) + : _mr(mr), + stream(stream), + max_stripe_size{options.get_stripe_size_bytes(), options.get_stripe_size_rows()}, + row_index_stride{options.get_row_index_stride()}, + compression_kind_(to_orc_compression(options.get_compression())), + compression_blocksize_(compression_block_size(compression_kind_)), + stats_freq_(options.get_statistics_freq()), + single_write_mode(mode == SingleWriteMode::YES), + kv_meta(options.get_key_value_metadata()), + out_sink_(std::move(sink)) +{ + if (options.get_metadata()) { + table_meta = std::make_unique(*options.get_metadata()); + } + init_state(); +} + +writer::impl::impl(std::unique_ptr sink, + chunked_orc_writer_options const& options, + SingleWriteMode mode, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) + : _mr(mr), + stream(stream), + max_stripe_size{options.get_stripe_size_bytes(), options.get_stripe_size_rows()}, + row_index_stride{options.get_row_index_stride()}, + compression_kind_(to_orc_compression(options.get_compression())), + compression_blocksize_(compression_block_size(compression_kind_)), + stats_freq_(options.get_statistics_freq()), + single_write_mode(mode == SingleWriteMode::YES), + kv_meta(options.get_key_value_metadata()), + out_sink_(std::move(sink)) +{ + if (options.get_metadata()) { + table_meta = std::make_unique(*options.get_metadata()); + } + init_state(); +} + +writer::impl::~impl() { close(); } + +void writer::impl::init_state() +{ + // Write file header + out_sink_->host_write(MAGIC, std::strlen(MAGIC)); +} + +void writer::impl::write(table_view const& input) +{ + CUDF_EXPECTS(not closed, "Data has already been flushed to out and closed"); + + if (not table_meta) { table_meta = make_table_meta(input); } + + // All kinds of memory allocation and data compressions/encoding are performed here. + // If any error occurs, such as out-of-memory exception, the internal state of the current writer + // is still intact. + // Note that `out_sink_` is intentionally passed by const reference to prevent accidentally + // writing anything to it. + auto [streams, + comp_results, + strm_descs, + enc_data, + segmentation, + stripes, + orc_table, + compressed_data, + intermediate_stats, + stream_output] = [&] { + try { + return convert_table_to_orc_data(input, + *table_meta, + max_stripe_size, + row_index_stride, + enable_dictionary_, + compression_kind_, + compression_blocksize_, + stats_freq_, + single_write_mode, + *out_sink_, + stream); + } catch (...) { // catch any exception type + CUDF_LOG_ERROR( + "ORC writer encountered exception during processing. " + "No data has been written to the sink."); + throw; // this throws the same exception } + }(); - // Write stripes - std::vector> write_tasks; - for (size_t stripe_id = 0; stripe_id < stripes.size(); ++stripe_id) { - auto& stripe = stripes[stripe_id]; - - stripe.offset = out_sink_->bytes_written(); - - // Column (skippable) index streams appear at the start of the stripe - for (size_type stream_id = 0; stream_id < num_index_streams; ++stream_id) { - write_index_stream(stripe_id, - stream_id, - orc_table.columns, - segmentation, - enc_data.streams, - strm_descs, - comp_results, - intermediate_stats.rowgroup_blobs, - &stripe, - &streams); - } + // Compression/encoding were all successful. Now write the intermediate results. + write_orc_data_to_sink(streams, + comp_results, + strm_descs, + enc_data, + segmentation, + stripes, + orc_table, + compressed_data, + intermediate_stats, + stream_output.get()); + + // Update data into the footer. This needs to be called even when num_rows==0. + add_table_to_footer_data(orc_table, stripes); +} - // Column data consisting one or more separate streams - for (auto const& strm_desc : strm_descs[stripe_id]) { - write_tasks.push_back(write_data_stream( - strm_desc, - enc_data.streams[strm_desc.column_id][segmentation.stripes[stripe_id].first], - static_cast(compressed_data.data()), - stream_output.get(), - &stripe, - &streams)); - } +void writer::impl::write_orc_data_to_sink(orc_streams& streams, + hostdevice_vector const& comp_results, + hostdevice_2dvector const& strm_descs, + encoded_data const& enc_data, + file_segmentation const& segmentation, + std::vector& stripes, + orc_table_view const& orc_table, + rmm::device_buffer const& compressed_data, + intermediate_statistics& intermediate_stats, + uint8_t* stream_output) +{ + if (orc_table.num_rows() == 0) { return; } - // Write stripefooter consisting of stream information - StripeFooter sf; - sf.streams = streams; - sf.columns.resize(orc_table.num_columns() + 1); - sf.columns[0].kind = DIRECT; - for (size_t i = 1; i < sf.columns.size(); ++i) { - sf.columns[i].kind = orc_table.column(i - 1).orc_encoding(); - sf.columns[i].dictionarySize = - (sf.columns[i].kind == DICTIONARY_V2) - ? orc_table.column(i - 1).host_stripe_dict(stripe_id)->num_strings - : 0; - if (orc_table.column(i - 1).orc_kind() == TIMESTAMP) { sf.writerTimezone = "UTC"; } - } - ProtobufWriter pbw((compression_kind_ != NONE) ? 3 : 0); - pbw.write(sf); - stripe.footerLength = pbw.size(); - if (compression_kind_ != NONE) { - uint32_t uncomp_sf_len = (stripe.footerLength - 3) * 2 + 1; - pbw.buffer()[0] = static_cast(uncomp_sf_len >> 0); - pbw.buffer()[1] = static_cast(uncomp_sf_len >> 8); - pbw.buffer()[2] = static_cast(uncomp_sf_len >> 16); - } - out_sink_->host_write(pbw.data(), pbw.size()); + if (intermediate_stats.stripe_stat_chunks.size() > 0) { + persisted_stripe_statistics.persist( + orc_table.num_rows(), single_write_mode, intermediate_stats, stream); + } + + // Write stripes + std::vector> write_tasks; + for (size_t stripe_id = 0; stripe_id < stripes.size(); ++stripe_id) { + auto& stripe = stripes[stripe_id]; + + stripe.offset = out_sink_->bytes_written(); + + // Column (skippable) index streams appear at the start of the stripe + size_type const num_index_streams = (orc_table.num_columns() + 1); + for (size_type stream_id = 0; stream_id < num_index_streams; ++stream_id) { + write_index_stream(stripe_id, + stream_id, + orc_table.columns, + segmentation, + enc_data.streams, + strm_descs, + comp_results, + intermediate_stats.rowgroup_blobs, + &stripe, + &streams, + compression_kind_, + compression_blocksize_, + out_sink_); } - for (auto const& task : write_tasks) { - task.wait(); + + // Column data consisting one or more separate streams + for (auto const& strm_desc : strm_descs[stripe_id]) { + write_tasks.push_back(write_data_stream( + strm_desc, + enc_data.streams[strm_desc.column_id][segmentation.stripes[stripe_id].first], + static_cast(compressed_data.data()), + stream_output, + &stripe, + &streams, + compression_kind_, + out_sink_, + stream)); } + + // Write stripefooter consisting of stream information + StripeFooter sf; + sf.streams = streams; + sf.columns.resize(orc_table.num_columns() + 1); + sf.columns[0].kind = DIRECT; + for (size_t i = 1; i < sf.columns.size(); ++i) { + sf.columns[i].kind = orc_table.column(i - 1).orc_encoding(); + sf.columns[i].dictionarySize = + (sf.columns[i].kind == DICTIONARY_V2) + ? orc_table.column(i - 1).host_stripe_dict(stripe_id)->num_strings + : 0; + if (orc_table.column(i - 1).orc_kind() == TIMESTAMP) { sf.writerTimezone = "UTC"; } + } + ProtobufWriter pbw((compression_kind_ != NONE) ? 3 : 0); + pbw.write(sf); + stripe.footerLength = pbw.size(); + if (compression_kind_ != NONE) { + uint32_t uncomp_sf_len = (stripe.footerLength - 3) * 2 + 1; + pbw.buffer()[0] = static_cast(uncomp_sf_len >> 0); + pbw.buffer()[1] = static_cast(uncomp_sf_len >> 8); + pbw.buffer()[2] = static_cast(uncomp_sf_len >> 16); + } + out_sink_->host_write(pbw.data(), pbw.size()); + } + for (auto const& task : write_tasks) { + task.wait(); } +} + +void writer::impl::add_table_to_footer_data(orc_table_view const& orc_table, + std::vector& stripes) +{ if (ff.headerLength == 0) { // First call ff.headerLength = std::strlen(MAGIC); @@ -2372,7 +2634,7 @@ void writer::impl::write(table_view const& table) ff.stripes.insert(ff.stripes.end(), std::make_move_iterator(stripes.begin()), std::make_move_iterator(stripes.end())); - ff.numberOfRows += num_rows; + ff.numberOfRows += orc_table.num_rows(); } void writer::impl::close() @@ -2381,7 +2643,8 @@ void writer::impl::close() closed = true; PostScript ps; - auto const statistics = finish_statistic_blobs(ff.stripes.size(), persisted_stripe_statistics); + auto const statistics = + finish_statistic_blobs(ff.stripes.size(), persisted_stripe_statistics, stream); // File-level statistics if (not statistics.file_level.empty()) { @@ -2425,7 +2688,7 @@ void writer::impl::close() if (md.stripeStats.size() != 0) { ProtobufWriter pbw((compression_kind_ != NONE) ? 3 : 0); pbw.write(md); - add_uncompressed_block_headers(pbw.buffer()); + add_uncompressed_block_headers(compression_kind_, compression_blocksize_, pbw.buffer()); ps.metadataLength = pbw.size(); out_sink_->host_write(pbw.data(), pbw.size()); } else { @@ -2433,7 +2696,7 @@ void writer::impl::close() } ProtobufWriter pbw((compression_kind_ != NONE) ? 3 : 0); pbw.write(ff); - add_uncompressed_block_headers(pbw.buffer()); + add_uncompressed_block_headers(compression_kind_, compression_blocksize_, pbw.buffer()); // Write postscript metadata ps.footerLength = pbw.size(); diff --git a/cpp/src/io/orc/writer_impl.hpp b/cpp/src/io/orc/writer_impl.hpp index 691fba6bac2..27d74e45b46 100644 --- a/cpp/src/io/orc/writer_impl.hpp +++ b/cpp/src/io/orc/writer_impl.hpp @@ -176,6 +176,72 @@ struct stripe_size_limits { size_type rows; }; +/** + * @brief Statistics data stored between calls to write for chunked writes + * + */ +struct intermediate_statistics { + explicit intermediate_statistics(rmm::cuda_stream_view stream) : stripe_stat_chunks(0, stream) {} + + intermediate_statistics(std::vector rb, + rmm::device_uvector sc, + hostdevice_vector smg, + std::vector sdt, + std::vector sct) + : rowgroup_blobs(std::move(rb)), + stripe_stat_chunks(std::move(sc)), + stripe_stat_merge(std::move(smg)), + stats_dtypes(std::move(sdt)), + col_types(std::move(sct)) + { + } + + // blobs for the rowgroups. Not persisted + std::vector rowgroup_blobs; + + rmm::device_uvector stripe_stat_chunks; + hostdevice_vector stripe_stat_merge; + std::vector stats_dtypes; + std::vector col_types; +}; + +/** + * @brief used for chunked writes to persist data between calls to write. + * + */ +struct persisted_statistics { + void clear() + { + stripe_stat_chunks.clear(); + stripe_stat_merge.clear(); + string_pools.clear(); + stats_dtypes.clear(); + col_types.clear(); + num_rows = 0; + } + + void persist(int num_table_rows, + bool single_write_mode, + intermediate_statistics& intermediate_stats, + rmm::cuda_stream_view stream); + + std::vector> stripe_stat_chunks; + std::vector> stripe_stat_merge; + std::vector> string_pools; + std::vector stats_dtypes; + std::vector col_types; + int num_rows = 0; +}; + +/** + * @brief Protobuf encoded statistics created at file close + * + */ +struct encoded_footer_statistics { + std::vector stripe_level; + std::vector file_level; +}; + /** * @brief Implementation for ORC writer */ @@ -227,7 +293,7 @@ class writer::impl { /** * @brief Writes a single subtable as part of a larger ORC file/table write. * - * @param[in] table The table information to be written + * @param table The table information to be written */ void write(table_view const& table); @@ -238,186 +304,41 @@ class writer::impl { private: /** - * @brief Builds up per-stripe dictionaries for string columns. - * - * @param orc_table Non-owning view of a cuDF table w/ ORC-related info - * @param stripe_bounds List of stripe boundaries - * @param dict List of dictionary chunks [rowgroup][column] - * @param dict_index List of dictionary indices - * @param dictionary_enabled Whether dictionary encoding is enabled for a given column - * @param stripe_dict List of stripe dictionaries - */ - void build_dictionaries(orc_table_view& orc_table, - host_span stripe_bounds, - hostdevice_2dvector const& dict, - host_span> dict_index, - host_span dictionary_enabled, - hostdevice_2dvector& stripe_dict); - - /** - * @brief Builds up per-column streams. - * - * @param[in,out] columns List of columns - * @param[in] segmentation stripe and rowgroup ranges - * @param[in] decimal_column_sizes Sizes of encoded decimal columns - * @return List of stream descriptors - */ - orc_streams create_streams(host_span columns, - file_segmentation const& segmentation, - std::map const& decimal_column_sizes); - - /** - * @brief Returns stripe information after compacting columns' individual data - * chunks into contiguous data streams. - * - * @param[in] num_index_streams Total number of index streams - * @param[in] segmentation stripe and rowgroup ranges - * @param[in,out] enc_streams List of encoder chunk streams [column][rowgroup] - * @param[in,out] strm_desc List of stream descriptors [stripe][data_stream] + * @brief Write the intermediate ORC data into the data sink. * - * @return The stripes' information - */ - std::vector gather_stripes( - size_t num_index_streams, - file_segmentation const& segmentation, - hostdevice_2dvector* enc_streams, - hostdevice_2dvector* strm_desc); - - /** - * @brief Statistics data stored between calls to write for chunked writes - * - */ - struct intermediate_statistics { - explicit intermediate_statistics(rmm::cuda_stream_view stream) - : stripe_stat_chunks(0, stream){}; - intermediate_statistics(std::vector rb, - rmm::device_uvector sc, - hostdevice_vector smg, - std::vector sdt, - std::vector sct) - : rowgroup_blobs(std::move(rb)), - stripe_stat_chunks(std::move(sc)), - stripe_stat_merge(std::move(smg)), - stats_dtypes(std::move(sdt)), - col_types(std::move(sct)){}; - - // blobs for the rowgroups. Not persisted - std::vector rowgroup_blobs; - - rmm::device_uvector stripe_stat_chunks; - hostdevice_vector stripe_stat_merge; - std::vector stats_dtypes; - std::vector col_types; - }; - - /** - * @brief used for chunked writes to persist data between calls to write. - * - */ - struct persisted_statistics { - void clear() - { - stripe_stat_chunks.clear(); - stripe_stat_merge.clear(); - string_pools.clear(); - stats_dtypes.clear(); - col_types.clear(); - num_rows = 0; - } - - void persist(int num_table_rows, - bool single_write_mode, - intermediate_statistics& intermediate_stats, - rmm::cuda_stream_view stream); - - std::vector> stripe_stat_chunks; - std::vector> stripe_stat_merge; - std::vector> string_pools; - std::vector stats_dtypes; - std::vector col_types; - int num_rows = 0; - }; - - /** - * @brief Protobuf encoded statistics created at file close - * - */ - struct encoded_footer_statistics { - std::vector stripe_level; - std::vector file_level; - }; - - /** - * @brief Returns column statistics in an intermediate format. - * - * @param statistics_freq Frequency of statistics to be included in the output file - * @param orc_table Table information to be written - * @param segmentation stripe and rowgroup ranges - * @return The statistic information - */ - intermediate_statistics gather_statistic_blobs(statistics_freq const statistics_freq, - orc_table_view const& orc_table, - file_segmentation const& segmentation); - - /** - * @brief Returns column statistics encoded in ORC protobuf format stored in the footer. - * - * @param num_stripes number of stripes in the data - * @param incoming_stats intermediate statistics returned from `gather_statistic_blobs` - * @return The encoded statistic blobs - */ - encoded_footer_statistics finish_statistic_blobs( - int num_stripes, writer::impl::persisted_statistics& incoming_stats); - - /** - * @brief Writes the specified column's row index stream. - * - * @param[in] stripe_id Stripe's identifier - * @param[in] stream_id Stream identifier (column id + 1) - * @param[in] columns List of columns - * @param[in] segmentation stripe and rowgroup ranges - * @param[in] enc_streams List of encoder chunk streams [column][rowgroup] - * @param[in] strm_desc List of stream descriptors - * @param[in] comp_out Output status for compressed streams - * @param[in] rg_stats row group level statistics - * @param[in,out] stripe Stream's parent stripe - * @param[in,out] streams List of all streams - */ - void write_index_stream(int32_t stripe_id, - int32_t stream_id, - host_span columns, - file_segmentation const& segmentation, - host_2dspan enc_streams, - host_2dspan strm_desc, - host_span comp_out, - std::vector const& rg_stats, - StripeInformation* stripe, - orc_streams* streams); - - /** - * @brief Write the specified column's data streams + * The intermediate data is generated from processing (compressing/encoding) an cuDF input table + * by `process_for_write` called in the `write()` function. * - * @param[in] strm_desc Stream's descriptor - * @param[in] enc_stream Chunk's streams - * @param[in] compressed_data Compressed stream data - * @param[in,out] stream_out Temporary host output buffer - * @param[in,out] stripe Stream's parent stripe - * @param[in,out] streams List of all streams - * @return An std::future that should be synchronized to ensure the writing is complete + * @param streams List of stream descriptors + * @param comp_results Status of data compression + * @param strm_descs List of stream descriptors + * @param enc_data ORC per-chunk streams of encoded data + * @param segmentation Description of how the ORC file is segmented into stripes and rowgroups + * @param stripes List of stripe description + * @param orc_table Non-owning view of a cuDF table that includes ORC-related information + * @param compressed_data Compressed stream data + * @param intermediate_stats Statistics data stored between calls to write + * @param stream_output Temporary host output buffer */ - std::future write_data_stream(gpu::StripeStream const& strm_desc, - gpu::encoder_chunk_streams const& enc_stream, - uint8_t const* compressed_data, - uint8_t* stream_out, - StripeInformation* stripe, - orc_streams* streams); + void write_orc_data_to_sink(orc_streams& streams, + hostdevice_vector const& comp_results, + hostdevice_2dvector const& strm_descs, + encoded_data const& enc_data, + file_segmentation const& segmentation, + std::vector& stripes, + orc_table_view const& orc_table, + rmm::device_buffer const& compressed_data, + intermediate_statistics& intermediate_stats, + uint8_t* stream_output); /** - * @brief Insert 3-byte uncompressed block headers in a byte vector + * @brief Add the processed table data into the internal file footer. * - * @param byte_vector Raw data (must include initial 3-byte header) + * @param orc_table Non-owning view of a cuDF table that includes ORC-related information + * @param stripes List of stripe description */ - void add_uncompressed_block_headers(std::vector& byte_vector); + void add_table_to_footer_data(orc_table_view const& orc_table, + std::vector& stripes); private: rmm::mr::device_memory_resource* _mr = nullptr; diff --git a/python/cudf/cudf/core/cut.py b/python/cudf/cudf/core/cut.py index 6590cf2940d..ccf730c91fb 100644 --- a/python/cudf/cudf/core/cut.py +++ b/python/cudf/cudf/core/cut.py @@ -1,4 +1,4 @@ -# Copyright (c) 2021-2022, NVIDIA CORPORATION. +# Copyright (c) 2021-2023, NVIDIA CORPORATION. from collections import abc @@ -279,12 +279,8 @@ def cut( if labels is not None: if labels is not ordered and len(set(labels)) != len(labels): # when we have duplicate labels and ordered is False, we - # should allow duplicate categories. The categories are - # returned in order - new_data = [interval_labels[i][0] for i in index_labels.values] - return cudf.CategoricalIndex( - new_data, categories=sorted(set(labels)), ordered=False - ) + # should allow duplicate categories. + return interval_labels[index_labels] col = build_categorical_column( categories=interval_labels, diff --git a/python/cudf/cudf/core/index.py b/python/cudf/cudf/core/index.py index 413e005b798..d1408fec160 100644 --- a/python/cudf/cudf/core/index.py +++ b/python/cudf/cudf/core/index.py @@ -1403,7 +1403,7 @@ def __repr__(self): @_cudf_nvtx_annotate def __getitem__(self, index): res = self._get_elements_from_column(index) - if not isinstance(index, int): + if isinstance(res, ColumnBase): res = as_index(res) res.name = self.name return res diff --git a/python/cudf/cudf/tests/test_index.py b/python/cudf/cudf/tests/test_index.py index d043b917251..0b0c5fba7fa 100644 --- a/python/cudf/cudf/tests/test_index.py +++ b/python/cudf/cudf/tests/test_index.py @@ -2886,3 +2886,22 @@ def test_index_to_pandas_nullable(data, expected_dtype): expected = pd.Index(data, dtype=expected_dtype) assert_eq(pi, expected) + + +class TestIndexScalarGetItem: + @pytest.fixture( + params=[range(1, 10, 2), [1, 2, 3], ["a", "b", "c"], [1.5, 2.5, 3.5]] + ) + def index_values(self, request): + return request.param + + @pytest.fixture(params=[int, np.int8, np.int32, np.int64]) + def i(self, request): + return request.param(1) + + def test_scalar_getitem(self, index_values, i): + index = cudf.Index(index_values) + + assert not isinstance(index[i], cudf.Index) + assert index[i] == index_values[i] + assert_eq(index, index.to_pandas())