From db237419d96de5e92faafcc91cd54571efe6f755 Mon Sep 17 00:00:00 2001 From: Devavret Makkar Date: Fri, 7 May 2021 17:26:42 +0530 Subject: [PATCH 01/24] Initial changes to get nvcomp integrated Cmake changes (excluding changes needed in nvcomp's cmake) Replace cuIO's snappy compressor with nvcomp --- cpp/CMakeLists.txt | 21 ++++ cpp/cmake/cudf-build-config.cmake.in | 7 ++ cpp/cmake/cudf-config.cmake.in | 3 + cpp/cmake/thirdparty/CUDF_GetnvCOMP.cmake | 41 +++++++ cpp/src/io/comp/snap.cu | 2 +- cpp/src/io/parquet/writer_impl.cu | 135 +++++++++++++++++++++- 6 files changed, 207 insertions(+), 2 deletions(-) create mode 100644 cpp/cmake/thirdparty/CUDF_GetnvCOMP.cmake diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 198690e37ff..499ae8b5797 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -125,6 +125,8 @@ include(cmake/thirdparty/CUDF_GetCPM.cmake) include(cmake/thirdparty/CUDF_FindBoost.cmake) # find jitify include(cmake/thirdparty/CUDF_GetJitify.cmake) +# find nvCOMP +include(cmake/thirdparty/CUDF_GetnvCOMP.cmake) # find thrust/cub include(cmake/thirdparty/CUDF_GetThrust.cmake) # find rmm @@ -475,6 +477,7 @@ target_link_libraries(cudf Boost::filesystem ${ARROW_LIBRARIES} cudf::Thrust + nvCOMP::nvcomp rmm::rmm) if(CUDA_STATIC_RUNTIME) @@ -580,6 +583,10 @@ install(TARGETS cudf DESTINATION lib EXPORT cudf-targets) +install(TARGETS nvcomp + DESTINATION lib + EXPORT cudf-nvcomp-target) + install(DIRECTORY ${CUDF_SOURCE_DIR}/include/cudf ${CUDF_SOURCE_DIR}/include/cudf_test @@ -623,6 +630,11 @@ install(EXPORT cudf-testing-targets NAMESPACE cudf:: DESTINATION "${INSTALL_CONFIGDIR}") +install(EXPORT cudf-nvcomp-target + FILE cudf-nvcomp-target.cmake + NAMESPACE nvCOMP:: + DESTINATION "${INSTALL_CONFIGDIR}") + ################################################################################################ # - build export ------------------------------------------------------------------------------- 
configure_package_config_file(cmake/cudf-build-config.cmake.in ${CUDF_BINARY_DIR}/cudf-config.cmake @@ -656,6 +668,15 @@ if(TARGET gtest) endif() endif() +if(TARGET nvcomp) + get_target_property(nvcomp_is_imported nvcomp IMPORTED) + if(NOT nvcomp_is_imported) + export(TARGETS nvcomp + FILE ${CUDF_BINARY_DIR}/cudf-nvcomp-target.cmake + NAMESPACE nvCOMP::) + endif() +endif() + export(EXPORT cudf-targets FILE ${CUDF_BINARY_DIR}/cudf-targets.cmake NAMESPACE cudf::) diff --git a/cpp/cmake/cudf-build-config.cmake.in b/cpp/cmake/cudf-build-config.cmake.in index ed1926f20f0..84cba8e8680 100644 --- a/cpp/cmake/cudf-build-config.cmake.in +++ b/cpp/cmake/cudf-build-config.cmake.in @@ -71,6 +71,13 @@ else() include(@CUDF_SOURCE_DIR@/cmake/thirdparty/CUDF_GetGTest.cmake) endif() +# find nvCOMP +if(EXISTS "${CMAKE_CURRENT_LIST_DIR}/cudf-nvcomp-target.cmake") + include("${CMAKE_CURRENT_LIST_DIR}/cudf-nvcomp-target.cmake") +else() + include(@CUDF_SOURCE_DIR@/cmake/thirdparty/CUDF_GetnvCOMP.cmake) +endif() + list(POP_FRONT CMAKE_MODULE_PATH) diff --git a/cpp/cmake/cudf-config.cmake.in b/cpp/cmake/cudf-config.cmake.in index 66c669851fa..950c08767f6 100644 --- a/cpp/cmake/cudf-config.cmake.in +++ b/cpp/cmake/cudf-config.cmake.in @@ -83,6 +83,9 @@ find_dependency(ArrowCUDA @CUDF_VERSION_Arrow@) find_dependency(rmm @CUDF_MIN_VERSION_rmm@) +find_dependency(nvCOMP @CUDF_MIN_VERSION_nvCOMP@) +include("${CMAKE_CURRENT_LIST_DIR}/cudf-nvcomp-target.cmake") + set(Thrust_ROOT "${CMAKE_CURRENT_LIST_DIR}/../../../include/libcudf/Thrust") find_dependency(Thrust @CUDF_MIN_VERSION_Thrust@) thrust_create_target(cudf::Thrust FROM_OPTIONS) diff --git a/cpp/cmake/thirdparty/CUDF_GetnvCOMP.cmake b/cpp/cmake/thirdparty/CUDF_GetnvCOMP.cmake new file mode 100644 index 00000000000..4fec448e9a9 --- /dev/null +++ b/cpp/cmake/thirdparty/CUDF_GetnvCOMP.cmake @@ -0,0 +1,41 @@ +#============================================================================= +# Copyright (c) 2021, NVIDIA CORPORATION. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +#============================================================================= + +function(find_and_configure_nvcomp VERSION) + + if(TARGET nvCOMP::nvcomp) + return() + endif() + + # Find or install nvCOMP + CPMFindPackage(NAME nvCOMP + VERSION ${VERSION} + GIT_REPOSITORY https://github.com/NVIDIA/nvcomp.git + GIT_TAG v${VERSION} + GIT_SHALLOW TRUE + OPTIONS ) + + if(NOT TARGET nvCOMP::nvcomp) + add_library(nvCOMP::nvcomp ALIAS nvcomp) + endif() + + # Make sure consumers of cudf can also see nvCOMP::nvcomp target + fix_cmake_global_defaults(nvCOMP::nvcomp) +endfunction() + +set(CUDF_MIN_VERSION_nvCOMP 2.0.0) + +find_and_configure_nvcomp(${CUDF_MIN_VERSION_nvCOMP}) diff --git a/cpp/src/io/comp/snap.cu b/cpp/src/io/comp/snap.cu index 999d02e3a50..526ff95d3d4 100644 --- a/cpp/src/io/comp/snap.cu +++ b/cpp/src/io/comp/snap.cu @@ -257,7 +257,7 @@ static __device__ uint32_t Match60(const uint8_t *src1, * @param[out] outputs Compression status per block * @param[in] count Number of blocks to compress */ -extern "C" __global__ void __launch_bounds__(128) +__global__ void __launch_bounds__(128) snap_kernel(gpu_inflate_input_s *inputs, gpu_inflate_status_s *outputs, int count) { __shared__ __align__(16) snap_state_s state_g; diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index 0b5a8a0d501..0a4a5d96a4e 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu 
@@ -36,6 +36,8 @@ #include #include +#include + #include #include #include @@ -792,6 +794,136 @@ void writer::impl::init_encoder_pages(hostdevice_2dvector & stream.synchronize(); } +template +void print(rmm::device_uvector const &d_vec, std::string label = "") +{ + std::vector h_vec(d_vec.size()); + cudaMemcpy(h_vec.data(), d_vec.data(), d_vec.size() * sizeof(T), cudaMemcpyDeviceToHost); + printf("%s (%lu)\t", label.c_str(), h_vec.size()); + for (auto &&i : h_vec) std::cout << (int)i << " "; + printf("\n"); +} + +template +void print(rmm::device_vector const &d_vec, std::string label = "") +{ + thrust::host_vector h_vec = d_vec; + printf("%s \t", label.c_str()); + for (auto &&i : h_vec) std::cout << i << " "; + printf("\n"); +} + +struct printer { + template + std::enable_if_t(), void> operator()(column_view const &col, + std::string label = "") + { + auto d_vec = rmm::device_vector(col.begin(), col.end()); + print(d_vec, label); + } + template + std::enable_if_t(), void> operator()(column_view const &col, + std::string label = "") + { + CUDF_FAIL("no strings"); + } +}; +void print(column_view const &col, std::string label = "") +{ + cudf::type_dispatcher(col.type(), printer{}, col, label); +} + +void snappy_compress(device_span comp_in, + device_span comp_stat, + rmm::cuda_stream_view stream) +{ + size_t num_comp_pages = comp_in.size(); + size_t temp_size; + nvcompError_t nvcomp_error = + nvcompBatchedSnappyCompressGetTempSize(num_comp_pages, 1 << 20, &temp_size); + // TODO: Continue with uncompressed if nvcomp fails at any step + CUDF_EXPECTS(nvcomp_error == nvcompError_t::nvcompSuccess, "Unable to get temporary size"); + + // Not needed now but nvcomp API makes no promises about future + rmm::device_buffer scratch(temp_size, stream); + // Analogous to comp_in.srcDevice + rmm::device_uvector uncompressed_data_ptrs(num_comp_pages, stream); + // Analogous to comp_in.srcSize + rmm::device_uvector uncompressed_data_sizes(num_comp_pages, stream); + // Analogous to 
comp_in.dstDevice + rmm::device_uvector compressed_data_ptrs(num_comp_pages, stream); + // Analogous to comp_stat.bytes_written + rmm::device_vector compressed_bytes_written(num_comp_pages); + + rmm::device_uvector dstSizes(num_comp_pages, stream); + // nvcomp does not currently use comp_in.dstSize. Cannot assume that the output will fit in + // the space allocated unless one uses the API nvcompBatchedSnappyCompressGetOutputSize() + // TODO: Replace our allocation size computation with value obtained from aforementioned API + + // Prepare the vectors + auto comp_it = thrust::make_zip_iterator(uncompressed_data_ptrs.begin(), + uncompressed_data_sizes.begin(), + compressed_data_ptrs.begin(), + dstSizes.begin()); + thrust::transform(rmm::exec_policy(stream), + comp_in.begin(), + comp_in.end(), + comp_it, + [] __device__(gpu_inflate_input_s in) { + return thrust::make_tuple(in.srcDevice, in.srcSize, in.dstDevice, in.dstSize); + }); + nvcomp_error = nvcompBatchedSnappyCompressAsync(uncompressed_data_ptrs.data(), + uncompressed_data_sizes.data(), + num_comp_pages, + scratch.data(), // Not needed rn but future + scratch.size(), + compressed_data_ptrs.data(), + compressed_bytes_written.data().get(), + stream.value()); + + // rmm::device_uvector new_comp_in(num_comp_pages, stream); + // thrust::transform(rmm::exec_policy(), + // comp_it, + // comp_it + num_comp_pages, + // new_comp_in.begin(), + // [] __device__(auto zip) { + // gpu_inflate_input_s status{}; + // status.srcDevice = thrust::get<0>(zip); + // status.srcSize = thrust::get<1>(zip); + // status.dstDevice = thrust::get<2>(zip); + // status.dstSize = thrust::get<3>(zip); + // return status; + // }); + // gpu_snap(comp_in.data(), comp_stat.data(), num_comp_pages, stream); + + // thrust::transform(rmm::exec_policy(stream), + // comp_stat.begin(), + // comp_stat.end(), + // compressed_bytes_written.begin(), + // [] __device__(gpu_inflate_status_s status) { return status.bytes_written; }); + + 
print(uncompressed_data_sizes, "uncomp size"); + print(dstSizes, "dstSizes from cuIO"); + print(compressed_bytes_written, "written by nvcomp"); + CUDF_EXPECTS(nvcomp_error == nvcompError_t::nvcompSuccess, + "Unable to perform snappy compression"); + + // nvcomp also doesn't use comp_out.status . Even though it has it in the kernel, it doesn't + // expose it from the API and ignores it internally. Need to rely on nvcompError and the + // compression will be all or nothing rather than per page. + // TODO: Get to it or file a request with the nvcomp team to expose per page status output + // The other comp_out field is reserved which is for internal cuIO debugging and can be 0. + thrust::transform(rmm::exec_policy(stream), + compressed_bytes_written.begin(), + compressed_bytes_written.end(), + comp_stat.begin(), + [] __device__(size_t size) { + gpu_inflate_status_s status{}; + status.bytes_written = size; + return status; + }); +} + void writer::impl::encode_pages(hostdevice_2dvector &chunks, device_span pages, uint32_t pages_in_batch, @@ -820,7 +952,8 @@ void writer::impl::encode_pages(hostdevice_2dvector &chunks gpu::EncodePages(batch_pages, comp_in, comp_stat, stream); switch (compression_) { case parquet::Compression::SNAPPY: - CUDA_TRY(gpu_snap(comp_in.data(), comp_stat.data(), pages_in_batch, stream)); + // CUDA_TRY(gpu_snap(comp_in.data(), comp_stat.data(), pages_in_batch, stream)); + snappy_compress(comp_in, comp_stat, stream); break; default: break; } From a5f336347dbdef253160e336fe33a13fc45f79f1 Mon Sep 17 00:00:00 2001 From: Devavret Makkar Date: Thu, 13 May 2021 01:57:39 +0530 Subject: [PATCH 02/24] Using nvcomp provided max compressed buffer size --- cpp/src/io/parquet/page_enc.cu | 17 +++++- cpp/src/io/parquet/parquet_gpu.hpp | 8 ++- cpp/src/io/parquet/writer_impl.cu | 91 ++++++++++++++---------------- cpp/src/io/parquet/writer_impl.hpp | 4 ++ 4 files changed, 66 insertions(+), 54 deletions(-) diff --git a/cpp/src/io/parquet/page_enc.cu 
b/cpp/src/io/parquet/page_enc.cu index bf9114949aa..aee80b1ae7b 100644 --- a/cpp/src/io/parquet/page_enc.cu +++ b/cpp/src/io/parquet/page_enc.cu @@ -410,6 +410,7 @@ __global__ void __launch_bounds__(128) device_span col_desc, statistics_merge_group *page_grstats, statistics_merge_group *chunk_grstats, + size_t max_page_comp_data_size, int32_t num_columns) { // TODO: All writing seems to be done by thread 0. Could be replaced by thrust foreach @@ -439,6 +440,8 @@ __global__ void __launch_bounds__(128) uint32_t page_offset = ck_g.ck_stat_size; uint32_t num_dict_entries = 0; uint32_t comp_page_offset = ck_g.ck_stat_size; + uint32_t page_headers_size = 0; + uint32_t max_page_data_size = 0; uint32_t cur_row = ck_g.start_row; uint32_t ck_max_stats_len = 0; uint32_t max_stats_len = 0; @@ -465,7 +468,9 @@ __global__ void __launch_bounds__(128) page_g.num_leaf_values = ck_g.total_dict_entries; page_g.num_values = ck_g.total_dict_entries; page_offset += page_g.max_hdr_size + page_g.max_data_size; - comp_page_offset += page_g.max_hdr_size + GetMaxCompressedBfrSize(page_g.max_data_size); + comp_page_offset += page_g.max_hdr_size + max_page_comp_data_size; + page_headers_size += page_g.max_hdr_size; + max_page_data_size = max(max_page_data_size, page_g.max_data_size); } __syncwarp(); if (t == 0) { @@ -571,7 +576,9 @@ __global__ void __launch_bounds__(128) pagestats_g.start_chunk = ck_g.first_fragment + page_start; pagestats_g.num_chunks = page_g.num_fragments; page_offset += page_g.max_hdr_size + page_g.max_data_size; - comp_page_offset += page_g.max_hdr_size + GetMaxCompressedBfrSize(page_g.max_data_size); + comp_page_offset += page_g.max_hdr_size + max_page_comp_data_size; + page_headers_size += page_g.max_hdr_size; + max_page_data_size = max(max_page_data_size, page_g.max_data_size); cur_row += rows_in_page; ck_max_stats_len = max(ck_max_stats_len, max_stats_len); } @@ -610,6 +617,8 @@ __global__ void __launch_bounds__(128) ck_g.num_pages = num_pages; ck_g.bfr_size = 
page_offset; ck_g.compressed_size = comp_page_offset; + ck_g.page_headers_size = page_headers_size; + ck_g.max_page_data_size = max_page_data_size; pagestats_g.start_chunk = ck_g.first_page + ck_g.has_dictionary; // Exclude dictionary pagestats_g.num_chunks = num_pages - ck_g.has_dictionary; } @@ -2141,6 +2150,7 @@ void InitFragmentStatistics(device_2dspan groups, * @param[in] num_columns Number of columns * @param[out] page_grstats Setup for page-level stats * @param[out] chunk_grstats Setup for chunk-level stats + * @param[in] max_page_comp_data_size Calculated maximum compressed data size of pages * @param[in] stream CUDA stream to use, default 0 */ void InitEncoderPages(device_2dspan chunks, @@ -2149,12 +2159,13 @@ void InitEncoderPages(device_2dspan chunks, int32_t num_columns, statistics_merge_group *page_grstats, statistics_merge_group *chunk_grstats, + size_t max_page_comp_data_size, rmm::cuda_stream_view stream) { auto num_rowgroups = chunks.size().first; dim3 dim_grid(num_columns, num_rowgroups); // 1 threadblock per rowgroup gpuInitPages<<>>( - chunks, pages, col_desc, page_grstats, chunk_grstats, num_columns); + chunks, pages, col_desc, page_grstats, chunk_grstats, max_page_comp_data_size, num_columns); } /** diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp index 564226c7ff3..335407af19d 100644 --- a/cpp/src/io/parquet/parquet_gpu.hpp +++ b/cpp/src/io/parquet/parquet_gpu.hpp @@ -298,8 +298,10 @@ struct EncColumnChunk { statistics_chunk const *stats; //!< Fragment statistics uint32_t bfr_size; //!< Uncompressed buffer size uint32_t compressed_size; //!< Compressed buffer size - uint32_t start_row; //!< First row of chunk - uint32_t num_rows; //!< Number of rows in chunk + uint32_t max_page_data_size; //!< Max data size (excluding header) of any page in this chunk + uint32_t page_headers_size; //!< Sum of size of all page headers + uint32_t start_row; //!< First row of chunk + uint32_t num_rows; //!< Number of rows in 
chunk uint32_t num_values; //!< Number of values in chunk. Different from num_rows for nested types uint32_t first_fragment; //!< First fragment of chunk EncPage *pages; //!< Ptr to pages that belong to this chunk @@ -480,6 +482,7 @@ void InitFragmentStatistics(cudf::detail::device_2dspan groups * @param[in] num_columns Number of columns * @param[in] page_grstats Setup for page-level stats * @param[in] chunk_grstats Setup for chunk-level stats + * @param[in] max_page_comp_data_size Calculated maximum compressed data size of pages * @param[in] stream CUDA stream to use, default 0 */ void InitEncoderPages(cudf::detail::device_2dspan chunks, @@ -488,6 +491,7 @@ void InitEncoderPages(cudf::detail::device_2dspan chunks, int32_t num_columns, statistics_merge_group *page_grstats = nullptr, statistics_merge_group *chunk_grstats = nullptr, + size_t max_page_comp_data_size = 0, rmm::cuda_stream_view stream = rmm::cuda_stream_default); /** diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index 0a4a5d96a4e..0b07ac01487 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -759,7 +759,7 @@ void writer::impl::build_chunk_dictionaries( gpu::BuildChunkDictionaries(chunks.device_view().flat_view(), dict_scratch.data(), stream); } - gpu::InitEncoderPages(chunks, {}, col_desc, num_columns, nullptr, nullptr, stream); + gpu::InitEncoderPages(chunks, {}, col_desc, num_columns, nullptr, nullptr, 0, stream); chunks.device_to_host(stream, true); } @@ -768,6 +768,7 @@ void writer::impl::init_encoder_pages(hostdevice_2dvector & device_span pages, statistics_chunk *page_stats, statistics_chunk *frag_stats, + size_t max_page_comp_data_size, uint32_t num_columns, uint32_t num_pages, uint32_t num_stats_bfr) @@ -780,6 +781,7 @@ void writer::impl::init_encoder_pages(hostdevice_2dvector & num_columns, (num_stats_bfr) ? page_stats_mrg.data() : nullptr, (num_stats_bfr > num_pages) ? 
page_stats_mrg.data() + num_pages : nullptr, + max_page_comp_data_size, stream); if (num_stats_bfr > 0) { MergeColumnStatistics(page_stats, frag_stats, page_stats_mrg.data(), num_pages, stream); @@ -854,23 +856,18 @@ void snappy_compress(device_span comp_in, rmm::device_uvector compressed_data_ptrs(num_comp_pages, stream); // Analogous to comp_stat.bytes_written rmm::device_vector compressed_bytes_written(num_comp_pages); - - rmm::device_uvector dstSizes(num_comp_pages, stream); // nvcomp does not currently use comp_in.dstSize. Cannot assume that the output will fit in // the space allocated unless one uses the API nvcompBatchedSnappyCompressGetOutputSize() - // TODO: Replace our allocation size computation with value obtained from aforementioned API // Prepare the vectors - auto comp_it = thrust::make_zip_iterator(uncompressed_data_ptrs.begin(), - uncompressed_data_sizes.begin(), - compressed_data_ptrs.begin(), - dstSizes.begin()); + auto comp_it = thrust::make_zip_iterator( + uncompressed_data_ptrs.begin(), uncompressed_data_sizes.begin(), compressed_data_ptrs.begin()); thrust::transform(rmm::exec_policy(stream), comp_in.begin(), comp_in.end(), comp_it, [] __device__(gpu_inflate_input_s in) { - return thrust::make_tuple(in.srcDevice, in.srcSize, in.dstDevice, in.dstSize); + return thrust::make_tuple(in.srcDevice, in.srcSize, in.dstDevice); }); nvcomp_error = nvcompBatchedSnappyCompressAsync(uncompressed_data_ptrs.data(), uncompressed_data_sizes.data(), @@ -881,38 +878,12 @@ void snappy_compress(device_span comp_in, compressed_bytes_written.data().get(), stream.value()); - // rmm::device_uvector new_comp_in(num_comp_pages, stream); - // thrust::transform(rmm::exec_policy(), - // comp_it, - // comp_it + num_comp_pages, - // new_comp_in.begin(), - // [] __device__(auto zip) { - // gpu_inflate_input_s status{}; - // status.srcDevice = thrust::get<0>(zip); - // status.srcSize = thrust::get<1>(zip); - // status.dstDevice = thrust::get<2>(zip); - // status.dstSize = 
thrust::get<3>(zip); - // return status; - // }); - // gpu_snap(comp_in.data(), comp_stat.data(), num_comp_pages, stream); - - // thrust::transform(rmm::exec_policy(stream), - // comp_stat.begin(), - // comp_stat.end(), - // compressed_bytes_written.begin(), - // [] __device__(gpu_inflate_status_s status) { return status.bytes_written; }); - - print(uncompressed_data_sizes, "uncomp size"); - print(dstSizes, "dstSizes from cuIO"); - print(compressed_bytes_written, "written by nvcomp"); CUDF_EXPECTS(nvcomp_error == nvcompError_t::nvcompSuccess, "Unable to perform snappy compression"); - // nvcomp also doesn't use comp_out.status . Even though it has it in the kernel, it doesn't - // expose it from the API and ignores it internally. Need to rely on nvcompError and the - // compression will be all or nothing rather than per page. - // TODO: Get to it or file a request with the nvcomp team to expose per page status output - // The other comp_out field is reserved which is for internal cuIO debugging and can be 0. + // nvcomp also doesn't use comp_out.status . It guarantees that given enough output space, + // compression will succeed. + // The other `comp_out` field is `reserved` which is for internal cuIO debugging and can be 0. 
thrust::transform(rmm::exec_policy(stream), compressed_bytes_written.begin(), compressed_bytes_written.end(), @@ -951,10 +922,7 @@ void writer::impl::encode_pages(hostdevice_2dvector &chunks gpu::EncodePages(batch_pages, comp_in, comp_stat, stream); switch (compression_) { - case parquet::Compression::SNAPPY: - // CUDA_TRY(gpu_snap(comp_in.data(), comp_stat.data(), pages_in_batch, stream)); - snappy_compress(comp_in, comp_stat, stream); - break; + case parquet::Compression::SNAPPY: snappy_compress(comp_in, comp_stat, stream); break; default: break; } // TBD: Not clear if the official spec actually allows dynamically turning off compression at the @@ -1225,16 +1193,36 @@ void writer::impl::write(table_view const &table) build_chunk_dictionaries(chunks, col_desc, num_columns, num_dictionaries); } + // Get the maximum page size across all chunks + size_type max_page_uncomp_data_size = + std::accumulate(chunks.host_view().flat_view().begin(), + chunks.host_view().flat_view().end(), + 0, + [](uint32_t max_page_size, gpu::EncColumnChunk const &chunk) { + return std::max(max_page_size, chunk.max_page_data_size); + }); + + size_t max_page_comp_data_size = 0; + if (compression_ != parquet::Compression::UNCOMPRESSED) { + CUDF_EXPECTS( + nvcompError_t::nvcompSuccess == nvcompBatchedSnappyCompressGetOutputSize( + max_page_uncomp_data_size, &max_page_comp_data_size), + "Error in getting compressed size from nvcomp"); + } + // Initialize batches of rowgroups to encode (mainly to limit peak memory usage) std::vector batch_list; uint32_t num_pages = 0; size_t max_bytes_in_batch = 1024 * 1024 * 1024; // 1GB - TBD: Tune this size_t max_uncomp_bfr_size = 0; + size_t max_comp_bfr_size = 0; size_t max_chunk_bfr_size = 0; uint32_t max_pages_in_batch = 0; size_t bytes_in_batch = 0; + size_t comp_bytes_in_batch = 0; for (uint32_t r = 0, groups_in_batch = 0, pages_in_batch = 0; r <= num_rowgroups; r++) { - size_t rowgroup_size = 0; + size_t rowgroup_size = 0; + size_t comp_rowgroup_size = 
0; if (r < num_rowgroups) { for (int i = 0; i < num_columns; i++) { gpu::EncColumnChunk *ck = &chunks[r][i]; @@ -1242,6 +1230,8 @@ void writer::impl::write(table_view const &table) num_pages += ck->num_pages; pages_in_batch += ck->num_pages; rowgroup_size += ck->bfr_size; + ck->compressed_size = ck->page_headers_size + max_page_comp_data_size * ck->num_pages; + comp_rowgroup_size += ck->compressed_size; max_chunk_bfr_size = std::max(max_chunk_bfr_size, (size_t)std::max(ck->bfr_size, ck->compressed_size)); } @@ -1250,23 +1240,25 @@ void writer::impl::write(table_view const &table) if ((r == num_rowgroups) || (groups_in_batch != 0 && bytes_in_batch + rowgroup_size > max_bytes_in_batch)) { max_uncomp_bfr_size = std::max(max_uncomp_bfr_size, bytes_in_batch); + max_comp_bfr_size = std::max(max_comp_bfr_size, comp_bytes_in_batch); max_pages_in_batch = std::max(max_pages_in_batch, pages_in_batch); if (groups_in_batch != 0) { batch_list.push_back(groups_in_batch); groups_in_batch = 0; } - bytes_in_batch = 0; - pages_in_batch = 0; + bytes_in_batch = 0; + comp_bytes_in_batch = 0; + pages_in_batch = 0; } bytes_in_batch += rowgroup_size; + comp_bytes_in_batch += comp_rowgroup_size; groups_in_batch++; } + // Clear compressed buffer size if compression has been turned off + if (compression_ == parquet::Compression::UNCOMPRESSED) { max_comp_bfr_size = 0; } + // Initialize data pointers in batch - size_t max_comp_bfr_size = - (compression_ != parquet::Compression::UNCOMPRESSED) - ? gpu::GetMaxCompressedBfrSize(max_uncomp_bfr_size, max_pages_in_batch) - : 0; uint32_t num_stats_bfr = (stats_granularity_ != statistics_freq::STATISTICS_NONE) ? num_pages + num_chunks : 0; rmm::device_buffer uncomp_bfr(max_uncomp_bfr_size, stream); @@ -1295,6 +1287,7 @@ void writer::impl::write(table_view const &table) {pages.data(), pages.size()}, (num_stats_bfr) ? page_stats.data() : nullptr, (num_stats_bfr) ? 
frag_stats.data() : nullptr, + max_page_comp_data_size, num_columns, num_pages, num_stats_bfr); diff --git a/cpp/src/io/parquet/writer_impl.hpp b/cpp/src/io/parquet/writer_impl.hpp index 34c8347e79e..c2f0509a08b 100644 --- a/cpp/src/io/parquet/writer_impl.hpp +++ b/cpp/src/io/parquet/writer_impl.hpp @@ -165,6 +165,9 @@ class writer::impl { * @param chunks column chunk array * @param col_desc column description array * @param pages encoder pages array + * @param page_stats page statistics array + * @param frag_stats fragment statistics array + * @param max_page_comp_data_size max compressed size of any page's data + headers * @param num_columns Total number of columns * @param num_pages Total number of pages * @param num_stats_bfr Number of statistics buffers @@ -174,6 +177,7 @@ class writer::impl { device_span pages, statistics_chunk* page_stats, statistics_chunk* frag_stats, + size_t max_page_comp_data_size, uint32_t num_columns, uint32_t num_pages, uint32_t num_stats_bfr); From 61018aa14aa5eef4f76a75f5fcaafd7039c7a20e Mon Sep 17 00:00:00 2001 From: Devavret Makkar Date: Thu, 13 May 2021 02:28:30 +0530 Subject: [PATCH 03/24] Recover from error in nvcomp compressing and encode uncompressed. 
--- cpp/src/io/parquet/writer_impl.cu | 114 ++++++++++++++++-------------- 1 file changed, 61 insertions(+), 53 deletions(-) diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index 0b07ac01487..3b8d945fdff 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -840,59 +840,67 @@ void snappy_compress(device_span comp_in, rmm::cuda_stream_view stream) { size_t num_comp_pages = comp_in.size(); - size_t temp_size; - nvcompError_t nvcomp_error = - nvcompBatchedSnappyCompressGetTempSize(num_comp_pages, 1 << 20, &temp_size); - // TODO: Continue with uncompressed if nvcomp fails at any step - CUDF_EXPECTS(nvcomp_error == nvcompError_t::nvcompSuccess, "Unable to get temporary size"); - - // Not needed now but nvcomp API makes no promises about future - rmm::device_buffer scratch(temp_size, stream); - // Analogous to comp_in.srcDevice - rmm::device_uvector uncompressed_data_ptrs(num_comp_pages, stream); - // Analogous to comp_in.srcSize - rmm::device_uvector uncompressed_data_sizes(num_comp_pages, stream); - // Analogous to comp_in.dstDevice - rmm::device_uvector compressed_data_ptrs(num_comp_pages, stream); - // Analogous to comp_stat.bytes_written - rmm::device_vector compressed_bytes_written(num_comp_pages); - // nvcomp does not currently use comp_in.dstSize. 
Cannot assume that the output will fit in - // the space allocated unless one uses the API nvcompBatchedSnappyCompressGetOutputSize() - - // Prepare the vectors - auto comp_it = thrust::make_zip_iterator( - uncompressed_data_ptrs.begin(), uncompressed_data_sizes.begin(), compressed_data_ptrs.begin()); - thrust::transform(rmm::exec_policy(stream), - comp_in.begin(), - comp_in.end(), - comp_it, - [] __device__(gpu_inflate_input_s in) { - return thrust::make_tuple(in.srcDevice, in.srcSize, in.dstDevice); - }); - nvcomp_error = nvcompBatchedSnappyCompressAsync(uncompressed_data_ptrs.data(), - uncompressed_data_sizes.data(), - num_comp_pages, - scratch.data(), // Not needed rn but future - scratch.size(), - compressed_data_ptrs.data(), - compressed_bytes_written.data().get(), - stream.value()); - - CUDF_EXPECTS(nvcomp_error == nvcompError_t::nvcompSuccess, - "Unable to perform snappy compression"); - - // nvcomp also doesn't use comp_out.status . It guarantees that given enough output space, - // compression will succeed. - // The other `comp_out` field is `reserved` which is for internal cuIO debugging and can be 0. 
- thrust::transform(rmm::exec_policy(stream), - compressed_bytes_written.begin(), - compressed_bytes_written.end(), - comp_stat.begin(), - [] __device__(size_t size) { - gpu_inflate_status_s status{}; - status.bytes_written = size; - return status; - }); + do { + size_t temp_size; + nvcompError_t nvcomp_error = + nvcompBatchedSnappyCompressGetTempSize(num_comp_pages, 1 << 20, &temp_size); + if (nvcomp_error != nvcompError_t::nvcompSuccess) { break; } + + // Not needed now but nvcomp API makes no promises about future + rmm::device_buffer scratch(temp_size, stream); + // Analogous to comp_in.srcDevice + rmm::device_uvector uncompressed_data_ptrs(num_comp_pages, stream); + // Analogous to comp_in.srcSize + rmm::device_uvector uncompressed_data_sizes(num_comp_pages, stream); + // Analogous to comp_in.dstDevice + rmm::device_uvector compressed_data_ptrs(num_comp_pages, stream); + // Analogous to comp_stat.bytes_written + rmm::device_vector compressed_bytes_written(num_comp_pages); + // nvcomp does not currently use comp_in.dstSize. Cannot assume that the output will fit in + // the space allocated unless one uses the API nvcompBatchedSnappyCompressGetOutputSize() + + // Prepare the vectors + auto comp_it = thrust::make_zip_iterator(uncompressed_data_ptrs.begin(), + uncompressed_data_sizes.begin(), + compressed_data_ptrs.begin()); + thrust::transform(rmm::exec_policy(stream), + comp_in.begin(), + comp_in.end(), + comp_it, + [] __device__(gpu_inflate_input_s in) { + return thrust::make_tuple(in.srcDevice, in.srcSize, in.dstDevice); + }); + nvcomp_error = nvcompBatchedSnappyCompressAsync(uncompressed_data_ptrs.data(), + uncompressed_data_sizes.data(), + num_comp_pages, + scratch.data(), // Not needed rn but future + scratch.size(), + compressed_data_ptrs.data(), + compressed_bytes_written.data().get(), + stream.value()); + + if (nvcomp_error != nvcompError_t::nvcompSuccess) { break; } + + // nvcomp also doesn't use comp_out.status . 
It guarantees that given enough output space, + // compression will succeed. + // The other `comp_out` field is `reserved` which is for internal cuIO debugging and can be 0. + thrust::transform(rmm::exec_policy(stream), + compressed_bytes_written.begin(), + compressed_bytes_written.end(), + comp_stat.begin(), + [] __device__(size_t size) { + gpu_inflate_status_s status{}; + status.bytes_written = size; + return status; + }); + return; + } while (0); + + // If we reach this then there was an error in compressing so set an error status for each page + thrust::for_each(rmm::exec_policy(stream), + comp_stat.begin(), + comp_stat.end(), + [] __device__(gpu_inflate_status_s stat) { stat.status = 1; }); } void writer::impl::encode_pages(hostdevice_2dvector &chunks, From 64d7d1c8161738314ac05b2b50dce1d2f72a78b0 Mon Sep 17 00:00:00 2001 From: Devavret Makkar Date: Thu, 13 May 2021 13:29:32 +0530 Subject: [PATCH 04/24] review changes --- cpp/src/io/parquet/parquet_gpu.hpp | 25 +++++++++--------- cpp/src/io/parquet/writer_impl.cu | 41 +----------------------------- 2 files changed, 13 insertions(+), 53 deletions(-) diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp index 335407af19d..824aa38b0f1 100644 --- a/cpp/src/io/parquet/parquet_gpu.hpp +++ b/cpp/src/io/parquet/parquet_gpu.hpp @@ -489,10 +489,10 @@ void InitEncoderPages(cudf::detail::device_2dspan chunks, device_span pages, device_span col_desc, int32_t num_columns, - statistics_merge_group *page_grstats = nullptr, - statistics_merge_group *chunk_grstats = nullptr, - size_t max_page_comp_data_size = 0, - rmm::cuda_stream_view stream = rmm::cuda_stream_default); + statistics_merge_group *page_grstats, + statistics_merge_group *chunk_grstats, + size_t max_page_comp_data_size, + rmm::cuda_stream_view stream); /** * @brief Launches kernel for packing column data into parquet pages @@ -503,9 +503,9 @@ void InitEncoderPages(cudf::detail::device_2dspan chunks, * @param[in] stream CUDA stream to 
use, default 0 */ void EncodePages(device_span pages, - device_span comp_in = {}, - device_span comp_out = {}, - rmm::cuda_stream_view stream = rmm::cuda_stream_default); + device_span comp_in, + device_span comp_out, + rmm::cuda_stream_view stream); /** * @brief Launches kernel to make the compressed vs uncompressed chunk-level decision @@ -513,8 +513,7 @@ void EncodePages(device_span pages, * @param[in,out] chunks Column chunks (updated with actual compressed/uncompressed sizes) * @param[in] stream CUDA stream to use, default 0 */ -void DecideCompression(device_span chunks, - rmm::cuda_stream_view stream = rmm::cuda_stream_default); +void DecideCompression(device_span chunks, rmm::cuda_stream_view stream); /** * @brief Launches kernel to encode page headers @@ -526,10 +525,10 @@ void DecideCompression(device_span chunks, * @param[in] stream CUDA stream to use, default 0 */ void EncodePageHeaders(device_span pages, - device_span comp_out = {}, - device_span page_stats = {}, - const statistics_chunk *chunk_stats = nullptr, - rmm::cuda_stream_view stream = rmm::cuda_stream_default); + device_span comp_out, + device_span page_stats, + const statistics_chunk *chunk_stats, + rmm::cuda_stream_view stream); /** * @brief Launches kernel to gather pages to a single contiguous block per chunk diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index 3b8d945fdff..141b74f2a60 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -796,45 +796,6 @@ void writer::impl::init_encoder_pages(hostdevice_2dvector & stream.synchronize(); } -template -void print(rmm::device_uvector const &d_vec, std::string label = "") -{ - std::vector h_vec(d_vec.size()); - cudaMemcpy(h_vec.data(), d_vec.data(), d_vec.size() * sizeof(T), cudaMemcpyDeviceToHost); - printf("%s (%lu)\t", label.c_str(), h_vec.size()); - for (auto &&i : h_vec) std::cout << (int)i << " "; - printf("\n"); -} - -template -void print(rmm::device_vector const 
&d_vec, std::string label = "") -{ - thrust::host_vector h_vec = d_vec; - printf("%s \t", label.c_str()); - for (auto &&i : h_vec) std::cout << i << " "; - printf("\n"); -} - -struct printer { - template - std::enable_if_t(), void> operator()(column_view const &col, - std::string label = "") - { - auto d_vec = rmm::device_vector(col.begin(), col.end()); - print(d_vec, label); - } - template - std::enable_if_t(), void> operator()(column_view const &col, - std::string label = "") - { - CUDF_FAIL("no strings"); - } -}; -void print(column_view const &col, std::string label = "") -{ - cudf::type_dispatcher(col.type(), printer{}, col, label); -} - void snappy_compress(device_span comp_in, device_span comp_stat, rmm::cuda_stream_view stream) @@ -900,7 +861,7 @@ void snappy_compress(device_span comp_in, thrust::for_each(rmm::exec_policy(stream), comp_stat.begin(), comp_stat.end(), - [] __device__(gpu_inflate_status_s stat) { stat.status = 1; }); + [] __device__(gpu_inflate_status_s & stat) { stat.status = 1; }); } void writer::impl::encode_pages(hostdevice_2dvector &chunks, From 27764e76f9152fac9072d199759222ca74d6b46c Mon Sep 17 00:00:00 2001 From: Devavret Makkar Date: Sat, 15 May 2021 00:14:30 +0530 Subject: [PATCH 05/24] Replace accidental vector with uvector. --- cpp/src/io/parquet/writer_impl.cu | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index 141b74f2a60..79d049ac69a 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -816,7 +816,7 @@ void snappy_compress(device_span comp_in, // Analogous to comp_in.dstDevice rmm::device_uvector compressed_data_ptrs(num_comp_pages, stream); // Analogous to comp_stat.bytes_written - rmm::device_vector compressed_bytes_written(num_comp_pages); + rmm::device_uvector compressed_bytes_written(num_comp_pages, stream); // nvcomp does not currently use comp_in.dstSize. 
Cannot assume that the output will fit in // the space allocated unless one uses the API nvcompBatchedSnappyCompressGetOutputSize() @@ -837,7 +837,7 @@ void snappy_compress(device_span comp_in, scratch.data(), // Not needed rn but future scratch.size(), compressed_data_ptrs.data(), - compressed_bytes_written.data().get(), + compressed_bytes_written.data(), stream.value()); if (nvcomp_error != nvcompError_t::nvcompSuccess) { break; } From 95a57ec6d1eaa3c71ecd6fcb30fc400d3cabede2 Mon Sep 17 00:00:00 2001 From: Devavret Makkar Date: Sat, 15 May 2021 00:16:18 +0530 Subject: [PATCH 06/24] Provide the actual max uncomp page size to nvcomp's temp size estimator rather than a hardcoded value --- cpp/src/io/parquet/writer_impl.cu | 9 +++++++-- cpp/src/io/parquet/writer_impl.hpp | 2 ++ 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index 79d049ac69a..6ad73d6f6f6 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -798,13 +798,14 @@ void writer::impl::init_encoder_pages(hostdevice_2dvector & void snappy_compress(device_span comp_in, device_span comp_stat, + size_t max_page_uncomp_data_size, rmm::cuda_stream_view stream) { size_t num_comp_pages = comp_in.size(); do { size_t temp_size; nvcompError_t nvcomp_error = - nvcompBatchedSnappyCompressGetTempSize(num_comp_pages, 1 << 20, &temp_size); + nvcompBatchedSnappyCompressGetTempSize(num_comp_pages, max_page_uncomp_data_size, &temp_size); if (nvcomp_error != nvcompError_t::nvcompSuccess) { break; } // Not needed now but nvcomp API makes no promises about future @@ -866,6 +867,7 @@ void snappy_compress(device_span comp_in, void writer::impl::encode_pages(hostdevice_2dvector &chunks, device_span pages, + size_t max_page_uncomp_data_size, uint32_t pages_in_batch, uint32_t first_page_in_batch, uint32_t rowgroups_in_batch, @@ -891,7 +893,9 @@ void writer::impl::encode_pages(hostdevice_2dvector &chunks 
gpu::EncodePages(batch_pages, comp_in, comp_stat, stream); switch (compression_) { - case parquet::Compression::SNAPPY: snappy_compress(comp_in, comp_stat, stream); break; + case parquet::Compression::SNAPPY: + snappy_compress(comp_in, comp_stat, max_page_uncomp_data_size, stream); + break; default: break; } // TBD: Not clear if the official spec actually allows dynamically turning off compression at the @@ -1277,6 +1281,7 @@ void writer::impl::write(table_view const &table) encode_pages( chunks, {pages.data(), pages.size()}, + max_page_uncomp_data_size, pages_in_batch, first_page_in_batch, batch_list[b], diff --git a/cpp/src/io/parquet/writer_impl.hpp b/cpp/src/io/parquet/writer_impl.hpp index c2f0509a08b..8848d4ffdc0 100644 --- a/cpp/src/io/parquet/writer_impl.hpp +++ b/cpp/src/io/parquet/writer_impl.hpp @@ -186,6 +186,7 @@ class writer::impl { * * @param chunks column chunk array * @param pages encoder pages array + * @param max_page_uncomp_data_size maximum uncompressed size of any page's data * @param pages_in_batch number of pages in this batch * @param first_page_in_batch first page in batch * @param rowgroups_in_batch number of rowgroups in this batch @@ -195,6 +196,7 @@ class writer::impl { */ void encode_pages(hostdevice_2dvector& chunks, device_span pages, + size_t max_page_uncomp_data_size, uint32_t pages_in_batch, uint32_t first_page_in_batch, uint32_t rowgroups_in_batch, From cc9500abba3c1b3b411caa04d2c126af7997508a Mon Sep 17 00:00:00 2001 From: Devavret Makkar Date: Sat, 15 May 2021 00:45:46 +0530 Subject: [PATCH 07/24] cmake changes requested in review --- cpp/cmake/cudf-config.cmake.in | 1 - cpp/cmake/thirdparty/CUDF_GetnvCOMP.cmake | 3 +-- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/cpp/cmake/cudf-config.cmake.in b/cpp/cmake/cudf-config.cmake.in index 950c08767f6..1de968f6a95 100644 --- a/cpp/cmake/cudf-config.cmake.in +++ b/cpp/cmake/cudf-config.cmake.in @@ -83,7 +83,6 @@ find_dependency(ArrowCUDA @CUDF_VERSION_Arrow@) 
find_dependency(rmm @CUDF_MIN_VERSION_rmm@) -find_dependency(nvCOMP @CUDF_MIN_VERSION_nvCOMP@) include("${CMAKE_CURRENT_LIST_DIR}/cudf-nvcomp-target.cmake") set(Thrust_ROOT "${CMAKE_CURRENT_LIST_DIR}/../../../include/libcudf/Thrust") diff --git a/cpp/cmake/thirdparty/CUDF_GetnvCOMP.cmake b/cpp/cmake/thirdparty/CUDF_GetnvCOMP.cmake index 4fec448e9a9..1272728770c 100644 --- a/cpp/cmake/thirdparty/CUDF_GetnvCOMP.cmake +++ b/cpp/cmake/thirdparty/CUDF_GetnvCOMP.cmake @@ -25,8 +25,7 @@ function(find_and_configure_nvcomp VERSION) VERSION ${VERSION} GIT_REPOSITORY https://github.com/NVIDIA/nvcomp.git GIT_TAG v${VERSION} - GIT_SHALLOW TRUE - OPTIONS ) + GIT_SHALLOW TRUE) if(NOT TARGET nvCOMP::nvcomp) add_library(nvCOMP::nvcomp ALIAS nvcomp) From 40ebd1e04f538bae118f2b99b2379be065331b63 Mon Sep 17 00:00:00 2001 From: Devavret Makkar Date: Wed, 25 Aug 2021 01:24:09 +0530 Subject: [PATCH 08/24] Update parquet writer to use nvcomp 2.1 --- cpp/CMakeLists.txt | 20 +------------- cpp/cmake/thirdparty/CUDF_GetnvCOMP.cmake | 28 ++++++++++--------- cpp/src/io/parquet/writer_impl.cu | 33 ++++++++++++----------- 3 files changed, 35 insertions(+), 46 deletions(-) diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 1bc4e65338c..60f9cd510cd 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -532,7 +532,7 @@ target_link_libraries(cudf PUBLIC ZLIB::ZLIB ${ARROW_LIBRARIES} cudf::Thrust - nvCOMP::nvcomp + nvcomp::nvcomp rmm::rmm PRIVATE cuco::cuco) @@ -651,10 +651,6 @@ install(TARGETS cudf DESTINATION lib EXPORT cudf-targets) -install(TARGETS nvcomp - DESTINATION lib - EXPORT cudf-nvcomp-target) - install(DIRECTORY ${CUDF_SOURCE_DIR}/include/cudf ${CUDF_SOURCE_DIR}/include/cudf_test @@ -688,11 +684,6 @@ install(EXPORT cudf-testing-targets NAMESPACE cudf:: DESTINATION "${INSTALL_CONFIGDIR}") -install(EXPORT cudf-nvcomp-target - FILE cudf-nvcomp-target.cmake - NAMESPACE nvCOMP:: - DESTINATION "${INSTALL_CONFIGDIR}") - 
################################################################################################ # - build export ------------------------------------------------------------------------------- configure_package_config_file(cmake/cudf-build-config.cmake.in ${CUDF_BINARY_DIR}/cudf-config.cmake @@ -710,15 +701,6 @@ if(TARGET gtest) endif() endif() -if(TARGET nvcomp) - get_target_property(nvcomp_is_imported nvcomp IMPORTED) - if(NOT nvcomp_is_imported) - export(TARGETS nvcomp - FILE ${CUDF_BINARY_DIR}/cudf-nvcomp-target.cmake - NAMESPACE nvCOMP::) - endif() -endif() - export(EXPORT cudf-targets FILE ${CUDF_BINARY_DIR}/cudf-targets.cmake NAMESPACE cudf::) diff --git a/cpp/cmake/thirdparty/CUDF_GetnvCOMP.cmake b/cpp/cmake/thirdparty/CUDF_GetnvCOMP.cmake index 1272728770c..c88d462e959 100644 --- a/cpp/cmake/thirdparty/CUDF_GetnvCOMP.cmake +++ b/cpp/cmake/thirdparty/CUDF_GetnvCOMP.cmake @@ -16,25 +16,29 @@ function(find_and_configure_nvcomp VERSION) - if(TARGET nvCOMP::nvcomp) + if(TARGET nvcomp::nvcomp) return() endif() - # Find or install nvCOMP - CPMFindPackage(NAME nvCOMP - VERSION ${VERSION} - GIT_REPOSITORY https://github.com/NVIDIA/nvcomp.git - GIT_TAG v${VERSION} - GIT_SHALLOW TRUE) + # Find or install nvcomp + CPMFindPackage(NAME nvcomp + VERSION ${VERSION} + GITHUB_REPOSITORY NVIDIA/nvcomp + GIT_TAG f5b8dee714bd2970d8230efa95f337c91f080257 + GIT_SHALLOW TRUE + OPTIONS "BUILD_TESTS OFF" + "BUILD_BENCHMARKS OFF" + "BUILD_EXAMPLES OFF" + ) - if(NOT TARGET nvCOMP::nvcomp) - add_library(nvCOMP::nvcomp ALIAS nvcomp) + if(NOT TARGET nvcomp::nvcomp) + add_library(nvcomp::nvcomp ALIAS nvcomp) endif() - # Make sure consumers of cudf can also see nvCOMP::nvcomp target - fix_cmake_global_defaults(nvCOMP::nvcomp) + # Make sure consumers of cudf can also see nvcomp::nvcomp target + fix_cmake_global_defaults(nvcomp::nvcomp) endfunction() -set(CUDF_MIN_VERSION_nvCOMP 2.0.0) +set(CUDF_MIN_VERSION_nvCOMP 2.1.0) find_and_configure_nvcomp(${CUDF_MIN_VERSION_nvCOMP}) diff 
--git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index 5530769a819..06d75d8b575 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -898,9 +898,9 @@ void snappy_compress(device_span comp_in, size_t num_comp_pages = comp_in.size(); do { size_t temp_size; - nvcompError_t nvcomp_error = - nvcompBatchedSnappyCompressGetTempSize(num_comp_pages, max_page_uncomp_data_size, &temp_size); - if (nvcomp_error != nvcompError_t::nvcompSuccess) { break; } + nvcompStatus_t nvcomp_status = nvcompBatchedSnappyCompressGetTempSize( + num_comp_pages, max_page_uncomp_data_size, nvcompBatchedSnappyDefaultOpts, &temp_size); + if (nvcomp_status != nvcompStatus_t::nvcompSuccess) { break; } // Not needed now but nvcomp API makes no promises about future rmm::device_buffer scratch(temp_size, stream); @@ -926,16 +926,18 @@ void snappy_compress(device_span comp_in, [] __device__(gpu_inflate_input_s in) { return thrust::make_tuple(in.srcDevice, in.srcSize, in.dstDevice); }); - nvcomp_error = nvcompBatchedSnappyCompressAsync(uncompressed_data_ptrs.data(), - uncompressed_data_sizes.data(), - num_comp_pages, - scratch.data(), // Not needed rn but future - scratch.size(), - compressed_data_ptrs.data(), - compressed_bytes_written.data(), - stream.value()); - - if (nvcomp_error != nvcompError_t::nvcompSuccess) { break; } + nvcomp_status = nvcompBatchedSnappyCompressAsync(uncompressed_data_ptrs.data(), + uncompressed_data_sizes.data(), + max_page_uncomp_data_size, + num_comp_pages, + scratch.data(), // Not needed rn but future + scratch.size(), + compressed_data_ptrs.data(), + compressed_bytes_written.data(), + nvcompBatchedSnappyDefaultOpts, + stream.value()); + + if (nvcomp_status != nvcompStatus_t::nvcompSuccess) { break; } // nvcomp also doesn't use comp_out.status . It guarantees that given enough output space, // compression will succeed. 
@@ -1257,8 +1259,9 @@ void writer::impl::write(table_view const& table) size_t max_page_comp_data_size = 0; if (compression_ != parquet::Compression::UNCOMPRESSED) { CUDF_EXPECTS( - nvcompError_t::nvcompSuccess == nvcompBatchedSnappyCompressGetOutputSize( - max_page_uncomp_data_size, &max_page_comp_data_size), + nvcompStatus_t::nvcompSuccess == + nvcompBatchedSnappyCompressGetMaxOutputChunkSize( + max_page_uncomp_data_size, nvcompBatchedSnappyDefaultOpts, &max_page_comp_data_size), "Error in getting compressed size from nvcomp"); } From 4a2cb245161059bfcc47e52730743d0c6804443f Mon Sep 17 00:00:00 2001 From: Devavret Makkar Date: Wed, 25 Aug 2021 01:32:14 +0530 Subject: [PATCH 09/24] One more cmake change related to updating nvcomp --- cpp/cmake/cudf-build-config.cmake.in | 7 ------- 1 file changed, 7 deletions(-) diff --git a/cpp/cmake/cudf-build-config.cmake.in b/cpp/cmake/cudf-build-config.cmake.in index a52881459f5..4b5ad8ebb8d 100644 --- a/cpp/cmake/cudf-build-config.cmake.in +++ b/cpp/cmake/cudf-build-config.cmake.in @@ -81,13 +81,6 @@ else() include(@CUDF_SOURCE_DIR@/cmake/thirdparty/CUDF_GetGTest.cmake) endif() -# find nvCOMP -if(EXISTS "${CMAKE_CURRENT_LIST_DIR}/cudf-nvcomp-target.cmake") - include("${CMAKE_CURRENT_LIST_DIR}/cudf-nvcomp-target.cmake") -else() - include(@CUDF_SOURCE_DIR@/cmake/thirdparty/CUDF_GetnvCOMP.cmake) -endif() - list(POP_FRONT CMAKE_MODULE_PATH) From 6019b0f22b1220a304c3e45fb0fc09bc6faffddb Mon Sep 17 00:00:00 2001 From: Devavret Makkar Date: Tue, 31 Aug 2021 23:00:45 +0530 Subject: [PATCH 10/24] Update nvcomp to version with fix for snappy decompressor --- cpp/cmake/thirdparty/CUDF_GetnvCOMP.cmake | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/cmake/thirdparty/CUDF_GetnvCOMP.cmake b/cpp/cmake/thirdparty/CUDF_GetnvCOMP.cmake index c88d462e959..fc1a5e583a9 100644 --- a/cpp/cmake/thirdparty/CUDF_GetnvCOMP.cmake +++ b/cpp/cmake/thirdparty/CUDF_GetnvCOMP.cmake @@ -24,7 +24,7 @@ function(find_and_configure_nvcomp 
VERSION) CPMFindPackage(NAME nvcomp VERSION ${VERSION} GITHUB_REPOSITORY NVIDIA/nvcomp - GIT_TAG f5b8dee714bd2970d8230efa95f337c91f080257 + GIT_TAG 3a12516afdeab4ace01298031757f84b8dda81b7 GIT_SHALLOW TRUE OPTIONS "BUILD_TESTS OFF" "BUILD_BENCHMARKS OFF" From 140d3d0fa0b029f1f9cfd4842f0dd267a0d58a0c Mon Sep 17 00:00:00 2001 From: Devavret Makkar Date: Fri, 3 Sep 2021 02:01:47 +0530 Subject: [PATCH 11/24] Fix allocation size bug When writing statistics, there's not enough space allocated in chunk's compressed buffer. This results in the compressed buffer being written into another chunk's memory. --- cpp/src/io/parquet/page_enc.cu | 1 - cpp/src/io/parquet/writer_impl.cu | 3 ++- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/io/parquet/page_enc.cu b/cpp/src/io/parquet/page_enc.cu index d88ca7a3b2a..5b1d8c846bf 100644 --- a/cpp/src/io/parquet/page_enc.cu +++ b/cpp/src/io/parquet/page_enc.cu @@ -423,7 +423,6 @@ __global__ void __launch_bounds__(128) } ck_g.num_pages = num_pages; ck_g.bfr_size = page_offset; - ck_g.compressed_size = comp_page_offset; ck_g.page_headers_size = page_headers_size; ck_g.max_page_data_size = max_page_data_size; pagestats_g.start_chunk = ck_g.first_page + ck_g.use_dictionary; // Exclude dictionary diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index 06d75d8b575..fff247fd47f 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -1285,7 +1285,8 @@ void writer::impl::write(table_view const& table) num_pages += ck->num_pages; pages_in_batch += ck->num_pages; rowgroup_size += ck->bfr_size; - ck->compressed_size = ck->page_headers_size + max_page_comp_data_size * ck->num_pages; + ck->compressed_size = + ck->ck_stat_size + ck->page_headers_size + max_page_comp_data_size * ck->num_pages; comp_rowgroup_size += ck->compressed_size; max_chunk_bfr_size = std::max(max_chunk_bfr_size, (size_t)std::max(ck->bfr_size, ck->compressed_size)); From 
62d92b4bb9f019dcc9079b52e8b8c9d6e4139446 Mon Sep 17 00:00:00 2001 From: Devavret Makkar Date: Sat, 4 Sep 2021 00:42:20 +0530 Subject: [PATCH 12/24] Update cmake to find nvcomp in new manner --- cpp/CMakeLists.txt | 2 +- .../{CUDF_GetnvCOMP.cmake => get_nvcomp.cmake} | 16 ++++++++-------- 2 files changed, 9 insertions(+), 9 deletions(-) rename cpp/cmake/thirdparty/{CUDF_GetnvCOMP.cmake => get_nvcomp.cmake} (74%) diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 4c4fd39e612..259cc884ee1 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -115,7 +115,7 @@ rapids_cpm_init() # find jitify include(cmake/thirdparty/get_jitify.cmake) # find nvCOMP -include(cmake/thirdparty/CUDF_GetnvCOMP.cmake) +include(cmake/thirdparty/get_nvcomp.cmake) # find thrust/cub include(cmake/thirdparty/get_thrust.cmake) # find rmm diff --git a/cpp/cmake/thirdparty/CUDF_GetnvCOMP.cmake b/cpp/cmake/thirdparty/get_nvcomp.cmake similarity index 74% rename from cpp/cmake/thirdparty/CUDF_GetnvCOMP.cmake rename to cpp/cmake/thirdparty/get_nvcomp.cmake index fc1a5e583a9..ae83a78c3fb 100644 --- a/cpp/cmake/thirdparty/CUDF_GetnvCOMP.cmake +++ b/cpp/cmake/thirdparty/get_nvcomp.cmake @@ -22,21 +22,21 @@ function(find_and_configure_nvcomp VERSION) # Find or install nvcomp CPMFindPackage(NAME nvcomp + GLOBAL_TARGETS nvcomp::nvcomp VERSION ${VERSION} - GITHUB_REPOSITORY NVIDIA/nvcomp - GIT_TAG 3a12516afdeab4ace01298031757f84b8dda81b7 - GIT_SHALLOW TRUE - OPTIONS "BUILD_TESTS OFF" - "BUILD_BENCHMARKS OFF" - "BUILD_EXAMPLES OFF" + CPM_ARGS + GITHUB_REPOSITORY NVIDIA/nvcomp + GIT_TAG 3a12516afdeab4ace01298031757f84b8dda81b7 + # GIT_SHALLOW TRUE + OPTIONS "BUILD_TESTS OFF" + "BUILD_BENCHMARKS OFF" + "BUILD_EXAMPLES OFF" ) if(NOT TARGET nvcomp::nvcomp) add_library(nvcomp::nvcomp ALIAS nvcomp) endif() - # Make sure consumers of cudf can also see nvcomp::nvcomp target - fix_cmake_global_defaults(nvcomp::nvcomp) endfunction() set(CUDF_MIN_VERSION_nvCOMP 2.1.0) From 
3c73be31cac6617604cf2d652900a402fc01395b Mon Sep 17 00:00:00 2001 From: Devavret Makkar Date: Wed, 8 Sep 2021 03:14:25 +0530 Subject: [PATCH 13/24] Make nvcomp private in cmake and update get_nvcomp --- cpp/CMakeLists.txt | 6 +++--- cpp/cmake/thirdparty/get_nvcomp.cmake | 7 +------ 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 259cc884ee1..1ec166653f0 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -510,9 +510,9 @@ target_link_libraries(cudf PUBLIC ZLIB::ZLIB ${ARROW_LIBRARIES} cudf::Thrust - nvcomp::nvcomp rmm::rmm - PRIVATE cuco::cuco) + PRIVATE cuco::cuco + nvcomp::nvcomp) # Add Conda library, and include paths if specified if(TARGET conda_env) @@ -751,7 +751,7 @@ rapids_export(BUILD cudf EXPORT_SET cudf-exports GLOBAL_TARGETS cudf NAMESPACE cudf:: - FINAL_CODE_BLOCK code_string) + FINAL_CODE_BLOCK build_code_string) export(EXPORT cudf-testing-exports FILE ${CUDF_BINARY_DIR}/cudf-testing.cmake diff --git a/cpp/cmake/thirdparty/get_nvcomp.cmake b/cpp/cmake/thirdparty/get_nvcomp.cmake index ae83a78c3fb..d9dd032875d 100644 --- a/cpp/cmake/thirdparty/get_nvcomp.cmake +++ b/cpp/cmake/thirdparty/get_nvcomp.cmake @@ -16,14 +16,9 @@ function(find_and_configure_nvcomp VERSION) - if(TARGET nvcomp::nvcomp) - return() - endif() - # Find or install nvcomp - CPMFindPackage(NAME nvcomp + rapids_cpm_find(nvcomp ${VERSION} GLOBAL_TARGETS nvcomp::nvcomp - VERSION ${VERSION} CPM_ARGS GITHUB_REPOSITORY NVIDIA/nvcomp GIT_TAG 3a12516afdeab4ace01298031757f84b8dda81b7 From e0a013d6bc14bb397dc82d0410f273260d4ff3d3 Mon Sep 17 00:00:00 2001 From: Devavret Makkar Date: Wed, 8 Sep 2021 17:43:31 +0530 Subject: [PATCH 14/24] Add an env var flip switch to choose b/w nvcomp and inbuilt compressor --- cpp/src/io/parquet/writer_impl.cu | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index fff247fd47f..831e082a8bf 100644 --- 
a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -988,9 +988,15 @@ void writer::impl::encode_pages(hostdevice_2dvector& chunks device_span comp_stat{compression_status.data(), compression_status.size()}; gpu::EncodePages(batch_pages, comp_in, comp_stat, stream); + auto env_use_nvcomp = std::getenv("LIBCUDF_USE_NVCOMP"); + bool use_nvcomp = env_use_nvcomp != nullptr ? std::atoi(env_use_nvcomp) : 0; switch (compression_) { case parquet::Compression::SNAPPY: - snappy_compress(comp_in, comp_stat, max_page_uncomp_data_size, stream); + if (use_nvcomp) { + snappy_compress(comp_in, comp_stat, max_page_uncomp_data_size, stream); + } else { + CUDA_TRY(gpu_snap(comp_in.data(), comp_stat.data(), pages_in_batch, stream)); + } break; default: break; } From bfa13661dc7ebb43972b773d0aa992f7478afb32 Mon Sep 17 00:00:00 2001 From: Devavret Makkar Date: Wed, 8 Sep 2021 22:20:29 +0530 Subject: [PATCH 15/24] Static linking nvcomp into libcudf --- cpp/CMakeLists.txt | 2 ++ cpp/cmake/thirdparty/get_nvcomp.cmake | 3 ++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 18e8e14efb1..ad93a24a5d8 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -505,6 +505,8 @@ target_compile_definitions(cudf PUBLIC "SPDLOG_ACTIVE_LEVEL=SPDLOG_LEVEL_${RMM_L # Compile stringified JIT sources first add_dependencies(cudf jitify_preprocess_run) +set_target_properties(nvcomp PROPERTIES POSITION_INDEPENDENT_CODE ON) + # Specify the target module library dependencies target_link_libraries(cudf PUBLIC ZLIB::ZLIB diff --git a/cpp/cmake/thirdparty/get_nvcomp.cmake b/cpp/cmake/thirdparty/get_nvcomp.cmake index d9dd032875d..49a1bac7ee4 100644 --- a/cpp/cmake/thirdparty/get_nvcomp.cmake +++ b/cpp/cmake/thirdparty/get_nvcomp.cmake @@ -23,7 +23,8 @@ function(find_and_configure_nvcomp VERSION) GITHUB_REPOSITORY NVIDIA/nvcomp GIT_TAG 3a12516afdeab4ace01298031757f84b8dda81b7 # GIT_SHALLOW TRUE - OPTIONS "BUILD_TESTS OFF" + 
OPTIONS "BUILD_STATIC ON" + "BUILD_TESTS OFF" "BUILD_BENCHMARKS OFF" "BUILD_EXAMPLES OFF" ) From 203cf151240c8fcc3bbe7cc424cd7c255f314a71 Mon Sep 17 00:00:00 2001 From: Devavret Makkar Date: Fri, 10 Sep 2021 02:19:41 +0530 Subject: [PATCH 16/24] Review changes --- cpp/src/io/parquet/writer_impl.cu | 33 ++++++++++++++++--------------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index 831e082a8bf..0382a7bb7ba 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -890,17 +890,19 @@ void writer::impl::init_encoder_pages(hostdevice_2dvector& stream.synchronize(); } -void snappy_compress(device_span comp_in, +void snappy_compress(device_span comp_in, device_span comp_stat, size_t max_page_uncomp_data_size, rmm::cuda_stream_view stream) { size_t num_comp_pages = comp_in.size(); - do { + try { size_t temp_size; nvcompStatus_t nvcomp_status = nvcompBatchedSnappyCompressGetTempSize( num_comp_pages, max_page_uncomp_data_size, nvcompBatchedSnappyDefaultOpts, &temp_size); - if (nvcomp_status != nvcompStatus_t::nvcompSuccess) { break; } + + CUDF_EXPECTS(nvcomp_status == nvcompStatus_t::nvcompSuccess, + "Error in getting snappy compression scratch size"); // Not needed now but nvcomp API makes no promises about future rmm::device_buffer scratch(temp_size, stream); @@ -937,7 +939,7 @@ void snappy_compress(device_span comp_in, nvcompBatchedSnappyDefaultOpts, stream.value()); - if (nvcomp_status != nvcompStatus_t::nvcompSuccess) { break; } + CUDF_EXPECTS(nvcomp_status == nvcompStatus_t::nvcompSuccess, "Error in snappy compression"); // nvcomp also doesn't use comp_out.status . It guarantees that given enough output space, // compression will succeed. 
@@ -952,13 +954,13 @@ void snappy_compress(device_span comp_in, return status; }); return; - } while (0); - - // If we reach this then there was an error in compressing so set an error status for each page - thrust::for_each(rmm::exec_policy(stream), - comp_stat.begin(), - comp_stat.end(), - [] __device__(gpu_inflate_status_s & stat) { stat.status = 1; }); + } catch (...) { + // If we reach this then there was an error in compressing so set an error status for each page + thrust::for_each(rmm::exec_policy(stream), + comp_stat.begin(), + comp_stat.end(), + [] __device__(gpu_inflate_status_s & stat) { stat.status = 1; }); + }; } void writer::impl::encode_pages(hostdevice_2dvector& chunks, @@ -1264,11 +1266,10 @@ void writer::impl::write(table_view const& table) size_t max_page_comp_data_size = 0; if (compression_ != parquet::Compression::UNCOMPRESSED) { - CUDF_EXPECTS( - nvcompStatus_t::nvcompSuccess == - nvcompBatchedSnappyCompressGetMaxOutputChunkSize( - max_page_uncomp_data_size, nvcompBatchedSnappyDefaultOpts, &max_page_comp_data_size), - "Error in getting compressed size from nvcomp"); + auto status = nvcompBatchedSnappyCompressGetMaxOutputChunkSize( + max_page_uncomp_data_size, nvcompBatchedSnappyDefaultOpts, &max_page_comp_data_size); + CUDF_EXPECTS(status == nvcompStatus_t::nvcompSuccess, + "Error in getting compressed size from nvcomp"); } // Initialize batches of rowgroups to encode (mainly to limit peak memory usage) From 99e4f80b79f3d8118996b5ce91431731712b5ec7 Mon Sep 17 00:00:00 2001 From: Devavret Makkar Date: Sat, 11 Sep 2021 01:44:03 +0530 Subject: [PATCH 17/24] Working orc reader with nvcomp --- cpp/src/io/orc/orc_gpu.h | 10 ++-- cpp/src/io/orc/reader_impl.cu | 86 +++++++++++++++++++++++++++++++++-- cpp/src/io/orc/stripe_init.cu | 26 ++++++----- 3 files changed, 103 insertions(+), 19 deletions(-) diff --git a/cpp/src/io/orc/orc_gpu.h b/cpp/src/io/orc/orc_gpu.h index 004812615eb..30687331c15 100644 --- a/cpp/src/io/orc/orc_gpu.h +++ 
b/cpp/src/io/orc/orc_gpu.h @@ -48,7 +48,8 @@ struct CompressedStreamInfo { copyctl(nullptr), num_compressed_blocks(0), num_uncompressed_blocks(0), - max_uncompressed_size(0) + max_uncompressed_size(0), + max_uncompressed_block_size(0) { } const uint8_t* compressed_data; // [in] base ptr to compressed stream data @@ -60,9 +61,10 @@ struct CompressedStreamInfo { copyctl; // [in] base ptr to copy structure to be filled for uncompressed blocks uint32_t num_compressed_blocks; // [in,out] number of entries in decctl(in), number of compressed // blocks(out) - uint32_t num_uncompressed_blocks; // [in,out] number of entries in copyctl(in), number of - // uncompressed blocks(out) - uint64_t max_uncompressed_size; // [out] maximum uncompressed data size + uint32_t num_uncompressed_blocks; // [in,out] number of entries in copyctl(in), number of + // uncompressed blocks(out) + uint64_t max_uncompressed_size; // [out] maximum uncompressed data size of stream + uint32_t max_uncompressed_block_size; // [out] maximum uncompressed size of any block in stream }; enum StreamIndexType { diff --git a/cpp/src/io/orc/reader_impl.cu b/cpp/src/io/orc/reader_impl.cu index 83be58f5e56..e6d747be606 100644 --- a/cpp/src/io/orc/reader_impl.cu +++ b/cpp/src/io/orc/reader_impl.cu @@ -38,6 +38,8 @@ #include #include +#include + #include #include @@ -552,6 +554,68 @@ class aggregate_orc_metadata { } }; +void snappy_decompress(device_span comp_in, + device_span comp_stat, + size_t max_uncomp_page_size, + rmm::cuda_stream_view stream) +{ + size_t num_blocks = comp_in.size(); + size_t temp_size; + + auto status = + nvcompBatchedSnappyDecompressGetTempSize(num_blocks, max_uncomp_page_size, &temp_size); + CUDF_EXPECTS(nvcompStatus_t::nvcompSuccess == status, + "Unable to get scratch size for snappy decompression"); + + rmm::device_buffer scratch(temp_size, stream); + rmm::device_uvector compressed_data_ptrs(num_blocks, stream); + rmm::device_uvector compressed_data_sizes(num_blocks, stream); + 
rmm::device_uvector uncompressed_data_ptrs(num_blocks, stream); + rmm::device_uvector uncompressed_data_sizes(num_blocks, stream); + + rmm::device_uvector actual_uncompressed_data_sizes(num_blocks, stream); + rmm::device_uvector statuses(num_blocks, stream); + + // Prepare the vectors + auto comp_it = thrust::make_zip_iterator(compressed_data_ptrs.begin(), + compressed_data_sizes.begin(), + uncompressed_data_ptrs.begin(), + uncompressed_data_sizes.data()); + thrust::transform(rmm::exec_policy(stream), + comp_in.begin(), + comp_in.end(), + comp_it, + [] __device__(gpu_inflate_input_s in) { + return thrust::make_tuple(in.srcDevice, in.srcSize, in.dstDevice, in.dstSize); + }); + + status = nvcompBatchedSnappyDecompressAsync(compressed_data_ptrs.data(), + compressed_data_sizes.data(), + uncompressed_data_sizes.data(), + actual_uncompressed_data_sizes.data(), + num_blocks, + scratch.data(), + scratch.size(), + uncompressed_data_ptrs.data(), + statuses.data(), + stream.value()); + CUDF_EXPECTS(nvcompStatus_t::nvcompSuccess == status, "unable to perform snappy decompression"); + + CUDF_EXPECTS(thrust::equal(rmm::exec_policy(stream), + statuses.begin(), + statuses.end(), + thrust::make_constant_iterator(nvcompStatus_t::nvcompSuccess)), + "Error during snappy decompression"); + thrust::for_each_n( + rmm::exec_policy(stream), + thrust::make_counting_iterator(0), + num_blocks, + [=, actual_uncomp_sizes = actual_uncompressed_data_sizes.data()] __device__(auto i) { + comp_stat[i].bytes_written = actual_uncomp_sizes[i]; + comp_stat[i].status = 0; + }); +} + rmm::device_buffer reader::impl::decompress_stripe_data( cudf::detail::hostdevice_2dvector& chunks, const std::vector& stripe_data, @@ -595,9 +659,10 @@ rmm::device_buffer reader::impl::decompress_stripe_data( rmm::device_uvector inflate_out(num_compressed_blocks, stream); // Parse again to populate the decompression input/output buffers - size_t decomp_offset = 0; - uint32_t start_pos = 0; - uint32_t start_pos_uncomp = 
(uint32_t)num_compressed_blocks; + size_t decomp_offset = 0; + uint32_t max_uncomp_block_size = 0; + uint32_t start_pos = 0; + uint32_t start_pos_uncomp = (uint32_t)num_compressed_blocks; for (size_t i = 0; i < compinfo.size(); ++i) { auto dst_base = static_cast(decomp_data.data()); compinfo[i].uncompressed_data = dst_base + decomp_offset; @@ -609,6 +674,8 @@ rmm::device_buffer reader::impl::decompress_stripe_data( decomp_offset += compinfo[i].max_uncompressed_size; start_pos += compinfo[i].num_compressed_blocks; start_pos_uncomp += compinfo[i].num_uncompressed_blocks; + max_uncomp_block_size = + std::max(max_uncomp_block_size, compinfo[i].max_uncompressed_block_size); } compinfo.host_to_device(stream); gpu::ParseCompressedStripeData(compinfo.device_ptr(), @@ -619,13 +686,24 @@ rmm::device_buffer reader::impl::decompress_stripe_data( // Dispatch batches of blocks to decompress if (num_compressed_blocks > 0) { + auto env_use_nvcomp = std::getenv("LIBCUDF_USE_NVCOMP"); + bool use_nvcomp = env_use_nvcomp != nullptr ? 
std::atoi(env_use_nvcomp) : 0; switch (decompressor->GetKind()) { case orc::ZLIB: CUDA_TRY( gpuinflate(inflate_in.data(), inflate_out.data(), num_compressed_blocks, 0, stream)); break; case orc::SNAPPY: - CUDA_TRY(gpu_unsnap(inflate_in.data(), inflate_out.data(), num_compressed_blocks, stream)); + if (use_nvcomp) { + device_span inflate_in_view{inflate_in.data(), + num_compressed_blocks}; + device_span inflate_out_view{inflate_out.data(), + num_compressed_blocks}; + snappy_decompress(inflate_in_view, inflate_out_view, max_uncomp_block_size, stream); + } else { + CUDA_TRY( + gpu_unsnap(inflate_in.data(), inflate_out.data(), num_compressed_blocks, stream)); + } break; default: CUDF_EXPECTS(false, "Unexpected decompression dispatch"); break; } diff --git a/cpp/src/io/orc/stripe_init.cu b/cpp/src/io/orc/stripe_init.cu index abef4f57894..94d8de6561b 100644 --- a/cpp/src/io/orc/stripe_init.cu +++ b/cpp/src/io/orc/stripe_init.cu @@ -45,12 +45,13 @@ extern "C" __global__ void __launch_bounds__(128, 8) gpuParseCompressedStripeDat __syncthreads(); if (strm_id < num_streams) { // Walk through the compressed blocks - const uint8_t* cur = s->info.compressed_data; - const uint8_t* end = cur + s->info.compressed_data_size; - uint8_t* uncompressed = s->info.uncompressed_data; - size_t max_uncompressed_size = 0; - uint32_t num_compressed_blocks = 0; - uint32_t num_uncompressed_blocks = 0; + const uint8_t* cur = s->info.compressed_data; + const uint8_t* end = cur + s->info.compressed_data_size; + uint8_t* uncompressed = s->info.uncompressed_data; + size_t max_uncompressed_size = 0; + uint32_t max_uncompressed_block_size = 0; + uint32_t num_compressed_blocks = 0; + uint32_t num_uncompressed_blocks = 0; while (cur + 3 < end) { uint32_t block_len = shuffle((lane_id == 0) ? 
cur[0] | (cur[1] << 8) | (cur[2] << 16) : 0); uint32_t is_uncompressed = block_len & 1; @@ -60,8 +61,9 @@ extern "C" __global__ void __launch_bounds__(128, 8) gpuParseCompressedStripeDat cur += 3; if (block_len > block_size || cur + block_len > end) { // Fatal - num_compressed_blocks = 0; - max_uncompressed_size = 0; + num_compressed_blocks = 0; + max_uncompressed_size = 0; + max_uncompressed_block_size = 0; break; } // TBD: For some codecs like snappy, it wouldn't be too difficult to get the actual @@ -102,12 +104,14 @@ extern "C" __global__ void __launch_bounds__(128, 8) gpuParseCompressedStripeDat if (init_ctl && lane_id == 0) *init_ctl = s->ctl; cur += block_len; max_uncompressed_size += uncompressed_size; + max_uncompressed_block_size = max(max_uncompressed_block_size, uncompressed_size); } __syncwarp(); if (!lane_id) { - s->info.num_compressed_blocks = num_compressed_blocks; - s->info.num_uncompressed_blocks = num_uncompressed_blocks; - s->info.max_uncompressed_size = max_uncompressed_size; + s->info.num_compressed_blocks = num_compressed_blocks; + s->info.num_uncompressed_blocks = num_uncompressed_blocks; + s->info.max_uncompressed_size = max_uncompressed_size; + s->info.max_uncompressed_block_size = max_uncompressed_block_size; } } From 6721fb8fb1e44d2de6e29fce838e6c88e373932a Mon Sep 17 00:00:00 2001 From: Devavret Makkar Date: Mon, 13 Sep 2021 22:10:41 +0530 Subject: [PATCH 18/24] Merge changes from nvcomp -fPIC --- cpp/CMakeLists.txt | 2 -- cpp/cmake/thirdparty/get_nvcomp.cmake | 3 +-- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index ad93a24a5d8..18e8e14efb1 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -505,8 +505,6 @@ target_compile_definitions(cudf PUBLIC "SPDLOG_ACTIVE_LEVEL=SPDLOG_LEVEL_${RMM_L # Compile stringified JIT sources first add_dependencies(cudf jitify_preprocess_run) -set_target_properties(nvcomp PROPERTIES POSITION_INDEPENDENT_CODE ON) - # Specify the target module 
library dependencies target_link_libraries(cudf PUBLIC ZLIB::ZLIB diff --git a/cpp/cmake/thirdparty/get_nvcomp.cmake b/cpp/cmake/thirdparty/get_nvcomp.cmake index 49a1bac7ee4..cade101cbfd 100644 --- a/cpp/cmake/thirdparty/get_nvcomp.cmake +++ b/cpp/cmake/thirdparty/get_nvcomp.cmake @@ -21,8 +21,7 @@ function(find_and_configure_nvcomp VERSION) GLOBAL_TARGETS nvcomp::nvcomp CPM_ARGS GITHUB_REPOSITORY NVIDIA/nvcomp - GIT_TAG 3a12516afdeab4ace01298031757f84b8dda81b7 - # GIT_SHALLOW TRUE + GIT_TAG 4f4e5713e69473be6e0c8ae483a932f666ae3c2f OPTIONS "BUILD_STATIC ON" "BUILD_TESTS OFF" "BUILD_BENCHMARKS OFF" From 66d49e8cab46f190f27c788b5d01ad6742b9a5e5 Mon Sep 17 00:00:00 2001 From: Devavret Makkar Date: Fri, 17 Sep 2021 01:11:41 +0530 Subject: [PATCH 19/24] Working ORC writer with nvcomp --- cpp/src/io/orc/orc_gpu.h | 1 + cpp/src/io/orc/stripe_enc.cu | 77 +++++++++++++++++++++++++++++++---- cpp/src/io/orc/writer_impl.cu | 17 ++++++-- 3 files changed, 84 insertions(+), 11 deletions(-) diff --git a/cpp/src/io/orc/orc_gpu.h b/cpp/src/io/orc/orc_gpu.h index 30687331c15..ee31b0f4b30 100644 --- a/cpp/src/io/orc/orc_gpu.h +++ b/cpp/src/io/orc/orc_gpu.h @@ -365,6 +365,7 @@ void CompressOrcDataStreams(uint8_t* compressed_data, uint32_t num_compressed_blocks, CompressionKind compression, uint32_t comp_blk_size, + uint32_t max_comp_blk_size, device_2dspan strm_desc, device_2dspan enc_streams, gpu_inflate_input_s* comp_in, diff --git a/cpp/src/io/orc/stripe_enc.cu b/cpp/src/io/orc/stripe_enc.cu index db871784665..b2d0be214ee 100644 --- a/cpp/src/io/orc/stripe_enc.cu +++ b/cpp/src/io/orc/stripe_enc.cu @@ -24,6 +24,9 @@ #include #include +#include + +#include namespace cudf { namespace io { @@ -1110,7 +1113,8 @@ __global__ void __launch_bounds__(256) gpu_inflate_input_s* comp_in, gpu_inflate_status_s* comp_out, uint8_t* compressed_bfr, - uint32_t comp_blk_size) + uint32_t comp_blk_size, + uint32_t max_comp_blk_size) { __shared__ __align__(16) StripeStream ss; __shared__ uint8_t* 
volatile uncomp_base_g; @@ -1135,8 +1139,8 @@ __global__ void __launch_bounds__(256) uint32_t blk_size = min(comp_blk_size, ss.stream_size - min(b * comp_blk_size, ss.stream_size)); blk_in->srcDevice = src + b * comp_blk_size; blk_in->srcSize = blk_size; - blk_in->dstDevice = dst + b * (3 + comp_blk_size) + 3; // reserve 3 bytes for block header - blk_in->dstSize = blk_size; + blk_in->dstDevice = dst + b * (3 + max_comp_blk_size) + 3; // reserve 3 bytes for block header + blk_in->dstSize = max_comp_blk_size; blk_out->bytes_written = blk_size; blk_out->status = 1; blk_out->reserved = 0; @@ -1160,7 +1164,8 @@ __global__ void __launch_bounds__(1024) gpu_inflate_input_s* comp_in, gpu_inflate_status_s* comp_out, uint8_t* compressed_bfr, - uint32_t comp_blk_size) + uint32_t comp_blk_size, + uint32_t max_comp_blk_size) { __shared__ __align__(16) StripeStream ss; __shared__ const uint8_t* volatile comp_src_g; @@ -1271,6 +1276,7 @@ void CompressOrcDataStreams(uint8_t* compressed_data, uint32_t num_compressed_blocks, CompressionKind compression, uint32_t comp_blk_size, + uint32_t max_comp_blk_size, device_2dspan strm_desc, device_2dspan enc_streams, gpu_inflate_input_s* comp_in, @@ -1280,11 +1286,68 @@ void CompressOrcDataStreams(uint8_t* compressed_data, dim3 dim_block_init(256, 1); dim3 dim_grid(strm_desc.size().first, strm_desc.size().second); gpuInitCompressionBlocks<<>>( - strm_desc, enc_streams, comp_in, comp_out, compressed_data, comp_blk_size); - if (compression == SNAPPY) { gpu_snap(comp_in, comp_out, num_compressed_blocks, stream); } + strm_desc, enc_streams, comp_in, comp_out, compressed_data, comp_blk_size, max_comp_blk_size); + if (compression == SNAPPY) { + try { + size_t temp_size; + nvcompStatus_t nvcomp_status = nvcompBatchedSnappyCompressGetTempSize( + num_compressed_blocks, comp_blk_size, nvcompBatchedSnappyDefaultOpts, &temp_size); + + CUDF_EXPECTS(nvcomp_status == nvcompStatus_t::nvcompSuccess, + "Error in getting snappy compression scratch size"); + + 
rmm::device_buffer scratch(temp_size, stream); + rmm::device_uvector uncompressed_data_ptrs(num_compressed_blocks, stream); + rmm::device_uvector uncompressed_data_sizes(num_compressed_blocks, stream); + rmm::device_uvector compressed_data_ptrs(num_compressed_blocks, stream); + rmm::device_uvector compressed_bytes_written(num_compressed_blocks, stream); + + auto comp_it = thrust::make_zip_iterator(uncompressed_data_ptrs.begin(), + uncompressed_data_sizes.begin(), + compressed_data_ptrs.begin()); + thrust::transform(rmm::exec_policy(stream), + comp_in, + comp_in + num_compressed_blocks, + comp_it, + [] __device__(gpu_inflate_input_s in) { + return thrust::make_tuple(in.srcDevice, in.srcSize, in.dstDevice); + }); + nvcomp_status = nvcompBatchedSnappyCompressAsync(uncompressed_data_ptrs.data(), + uncompressed_data_sizes.data(), + max_comp_blk_size, + num_compressed_blocks, + scratch.data(), + scratch.size(), + compressed_data_ptrs.data(), + compressed_bytes_written.data(), + nvcompBatchedSnappyDefaultOpts, + stream.value()); + + CUDF_EXPECTS(nvcomp_status == nvcompStatus_t::nvcompSuccess, "Error in snappy compression"); + + thrust::transform(rmm::exec_policy(stream), + compressed_bytes_written.begin(), + compressed_bytes_written.end(), + comp_out, + [] __device__(size_t size) { + gpu_inflate_status_s status{}; + status.bytes_written = size; + return status; + }); + } catch (...) 
{ + // If we reach this then there was an error in compressing so set an error status for each + // block + thrust::for_each(rmm::exec_policy(stream), + comp_out, + comp_out + num_compressed_blocks, + [] __device__(gpu_inflate_status_s & stat) { stat.status = 1; }); + }; + + // gpu_snap(comp_in, comp_out, num_compressed_blocks, stream); + } dim3 dim_block_compact(1024, 1); gpuCompactCompressedBlocks<<>>( - strm_desc, comp_in, comp_out, compressed_data, comp_blk_size); + strm_desc, comp_in, comp_out, compressed_data, comp_blk_size, max_comp_blk_size); } } // namespace gpu diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu index e0018ed7166..174932d0430 100644 --- a/cpp/src/io/orc/writer_impl.cu +++ b/cpp/src/io/orc/writer_impl.cu @@ -36,6 +36,8 @@ #include #include +#include + #include #include #include @@ -1472,9 +1474,14 @@ void writer::impl::write(table_view const& table) } // Allocate intermediate output stream buffer - size_t compressed_bfr_size = 0; - size_t num_compressed_blocks = 0; - auto stream_output = [&]() { + size_t compressed_bfr_size = 0; + size_t num_compressed_blocks = 0; + size_t max_compressed_block_size = 0; + if (compression_kind_ != NONE) { + nvcompBatchedSnappyCompressGetMaxOutputChunkSize( + compression_blocksize_, nvcompBatchedSnappyDefaultOpts, &max_compressed_block_size); + } + auto stream_output = [&]() { size_t max_stream_size = 0; bool all_device_write = true; @@ -1491,7 +1498,8 @@ void writer::impl::write(table_view const& table) (stream_size + compression_blocksize_ - 1) / compression_blocksize_, 1); stream_size += num_blocks * 3; num_compressed_blocks += num_blocks; - compressed_bfr_size += stream_size; + compressed_bfr_size += (max_compressed_block_size + 3) * num_blocks; + // compressed_bfr_size += stream_size; } max_stream_size = std::max(max_stream_size, stream_size); } @@ -1519,6 +1527,7 @@ void writer::impl::write(table_view const& table) num_compressed_blocks, compression_kind_, compression_blocksize_, 
+ max_compressed_block_size, strm_descs, enc_data.streams, comp_in.device_ptr(), From 4e78529a43ead1f692edc2ff14f8dd12f228d2ed Mon Sep 17 00:00:00 2001 From: Devavret Makkar Date: Fri, 17 Sep 2021 01:25:21 +0530 Subject: [PATCH 20/24] Small cleanups. Device span instead of pointers --- cpp/src/io/orc/orc_gpu.h | 5 +++-- cpp/src/io/orc/stripe_enc.cu | 24 +++++++++++++----------- cpp/src/io/orc/writer_impl.cu | 5 ++--- 3 files changed, 18 insertions(+), 16 deletions(-) diff --git a/cpp/src/io/orc/orc_gpu.h b/cpp/src/io/orc/orc_gpu.h index ee31b0f4b30..88d7e26b3b6 100644 --- a/cpp/src/io/orc/orc_gpu.h +++ b/cpp/src/io/orc/orc_gpu.h @@ -355,6 +355,7 @@ void CompactOrcDataStreams(device_2dspan strm_desc, * @param[in] num_compressed_blocks Total number of compressed blocks * @param[in] compression Type of compression * @param[in] comp_blk_size Compression block size + * @param[in] max_comp_blk_size Max size of any block after compression * @param[in,out] strm_desc StripeStream device array [stripe][stream] * @param[in,out] enc_streams chunk streams device array [column][rowgroup] * @param[out] comp_in Per-block compression input parameters @@ -368,8 +369,8 @@ void CompressOrcDataStreams(uint8_t* compressed_data, uint32_t max_comp_blk_size, device_2dspan strm_desc, device_2dspan enc_streams, - gpu_inflate_input_s* comp_in, - gpu_inflate_status_s* comp_out, + device_span comp_in, + device_span comp_out, rmm::cuda_stream_view stream); /** diff --git a/cpp/src/io/orc/stripe_enc.cu b/cpp/src/io/orc/stripe_enc.cu index b2d0be214ee..3018fe4fec7 100644 --- a/cpp/src/io/orc/stripe_enc.cu +++ b/cpp/src/io/orc/stripe_enc.cu @@ -1105,13 +1105,14 @@ __global__ void __launch_bounds__(1024) * @param[out] comp_out Per-block compression status * @param[in] compressed_bfr Compression output buffer * @param[in] comp_blk_size Compression block size + * @param[in] max_comp_blk_size Max size of any block after compression */ // blockDim {256,1,1} __global__ void __launch_bounds__(256) 
gpuInitCompressionBlocks(device_2dspan strm_desc, device_2dspan streams, // const? - gpu_inflate_input_s* comp_in, - gpu_inflate_status_s* comp_out, + device_span comp_in, + device_span comp_out, uint8_t* compressed_bfr, uint32_t comp_blk_size, uint32_t max_comp_blk_size) @@ -1157,12 +1158,13 @@ __global__ void __launch_bounds__(256) * @param[in] comp_out Per-block compression status * @param[in] compressed_bfr Compression output buffer * @param[in] comp_blk_size Compression block size + * @param[in] max_comp_blk_size Max size of any block after compression */ // blockDim {1024,1,1} __global__ void __launch_bounds__(1024) gpuCompactCompressedBlocks(device_2dspan strm_desc, - gpu_inflate_input_s* comp_in, - gpu_inflate_status_s* comp_out, + device_span comp_in, + device_span comp_out, uint8_t* compressed_bfr, uint32_t comp_blk_size, uint32_t max_comp_blk_size) @@ -1279,8 +1281,8 @@ void CompressOrcDataStreams(uint8_t* compressed_data, uint32_t max_comp_blk_size, device_2dspan strm_desc, device_2dspan enc_streams, - gpu_inflate_input_s* comp_in, - gpu_inflate_status_s* comp_out, + device_span comp_in, + device_span comp_out, rmm::cuda_stream_view stream) { dim3 dim_block_init(256, 1); @@ -1306,8 +1308,8 @@ void CompressOrcDataStreams(uint8_t* compressed_data, uncompressed_data_sizes.begin(), compressed_data_ptrs.begin()); thrust::transform(rmm::exec_policy(stream), - comp_in, - comp_in + num_compressed_blocks, + comp_in.begin(), + comp_in.end(), comp_it, [] __device__(gpu_inflate_input_s in) { return thrust::make_tuple(in.srcDevice, in.srcSize, in.dstDevice); @@ -1328,7 +1330,7 @@ void CompressOrcDataStreams(uint8_t* compressed_data, thrust::transform(rmm::exec_policy(stream), compressed_bytes_written.begin(), compressed_bytes_written.end(), - comp_out, + comp_out.begin(), [] __device__(size_t size) { gpu_inflate_status_s status{}; status.bytes_written = size; @@ -1338,8 +1340,8 @@ void CompressOrcDataStreams(uint8_t* compressed_data, // If we reach this then there 
was an error in compressing so set an error status for each // block thrust::for_each(rmm::exec_policy(stream), - comp_out, - comp_out + num_compressed_blocks, + comp_out.begin(), + comp_out.end(), [] __device__(gpu_inflate_status_s & stat) { stat.status = 1; }); }; diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu index 174932d0430..9660d2b78c5 100644 --- a/cpp/src/io/orc/writer_impl.cu +++ b/cpp/src/io/orc/writer_impl.cu @@ -1499,7 +1499,6 @@ void writer::impl::write(table_view const& table) stream_size += num_blocks * 3; num_compressed_blocks += num_blocks; compressed_bfr_size += (max_compressed_block_size + 3) * num_blocks; - // compressed_bfr_size += stream_size; } max_stream_size = std::max(max_stream_size, stream_size); } @@ -1530,8 +1529,8 @@ void writer::impl::write(table_view const& table) max_compressed_block_size, strm_descs, enc_data.streams, - comp_in.device_ptr(), - comp_out.device_ptr(), + comp_in, + comp_out, stream); strm_descs.device_to_host(stream); comp_out.device_to_host(stream, true); From 8ed68ef29a6b29f79c3bd40a074cd1c117a493ac Mon Sep 17 00:00:00 2001 From: Devavret Makkar Date: Fri, 17 Sep 2021 01:54:30 +0530 Subject: [PATCH 21/24] Here you go: range for loop --- cpp/src/io/orc/writer_impl.cu | 29 +++++++++++++---------------- 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu index 9660d2b78c5..313700ba296 100644 --- a/cpp/src/io/orc/writer_impl.cu +++ b/cpp/src/io/orc/writer_impl.cu @@ -1485,23 +1485,20 @@ void writer::impl::write(table_view const& table) size_t max_stream_size = 0; bool all_device_write = true; - for (size_t stripe_id = 0; stripe_id < segmentation.num_stripes(); stripe_id++) { - for (size_t i = 0; i < num_data_streams; i++) { // TODO range for (at least) - gpu::StripeStream* ss = &strm_descs[stripe_id][i]; - if (!out_sink_->is_device_write_preferred(ss->stream_size)) { all_device_write = false; } - size_t stream_size = 
ss->stream_size; - if (compression_kind_ != NONE) { - ss->first_block = num_compressed_blocks; - ss->bfr_offset = compressed_bfr_size; - - auto num_blocks = std::max( - (stream_size + compression_blocksize_ - 1) / compression_blocksize_, 1); - stream_size += num_blocks * 3; - num_compressed_blocks += num_blocks; - compressed_bfr_size += (max_compressed_block_size + 3) * num_blocks; - } - max_stream_size = std::max(max_stream_size, stream_size); + for (auto& ss : strm_descs.host_view().flat_view()) { + if (!out_sink_->is_device_write_preferred(ss.stream_size)) { all_device_write = false; } + size_t stream_size = ss.stream_size; + if (compression_kind_ != NONE) { + ss.first_block = num_compressed_blocks; + ss.bfr_offset = compressed_bfr_size; + + auto num_blocks = std::max( + (stream_size + compression_blocksize_ - 1) / compression_blocksize_, 1); + stream_size += num_blocks * 3; + num_compressed_blocks += num_blocks; + compressed_bfr_size += (max_compressed_block_size + 3) * num_blocks; } + max_stream_size = std::max(max_stream_size, stream_size); } if (all_device_write) { From 8b471decc38414642c5baa3a82bd0377cd515c1c Mon Sep 17 00:00:00 2001 From: Devavret Makkar Date: Fri, 17 Sep 2021 02:06:29 +0530 Subject: [PATCH 22/24] Add switch to control usage of nvcomp --- cpp/src/io/orc/stripe_enc.cu | 117 ++++++++++++++++++----------------- 1 file changed, 61 insertions(+), 56 deletions(-) diff --git a/cpp/src/io/orc/stripe_enc.cu b/cpp/src/io/orc/stripe_enc.cu index 3018fe4fec7..37b998f54c6 100644 --- a/cpp/src/io/orc/stripe_enc.cu +++ b/cpp/src/io/orc/stripe_enc.cu @@ -1290,62 +1290,67 @@ void CompressOrcDataStreams(uint8_t* compressed_data, gpuInitCompressionBlocks<<>>( strm_desc, enc_streams, comp_in, comp_out, compressed_data, comp_blk_size, max_comp_blk_size); if (compression == SNAPPY) { - try { - size_t temp_size; - nvcompStatus_t nvcomp_status = nvcompBatchedSnappyCompressGetTempSize( - num_compressed_blocks, comp_blk_size, nvcompBatchedSnappyDefaultOpts, 
&temp_size); - - CUDF_EXPECTS(nvcomp_status == nvcompStatus_t::nvcompSuccess, - "Error in getting snappy compression scratch size"); - - rmm::device_buffer scratch(temp_size, stream); - rmm::device_uvector uncompressed_data_ptrs(num_compressed_blocks, stream); - rmm::device_uvector uncompressed_data_sizes(num_compressed_blocks, stream); - rmm::device_uvector compressed_data_ptrs(num_compressed_blocks, stream); - rmm::device_uvector compressed_bytes_written(num_compressed_blocks, stream); - - auto comp_it = thrust::make_zip_iterator(uncompressed_data_ptrs.begin(), - uncompressed_data_sizes.begin(), - compressed_data_ptrs.begin()); - thrust::transform(rmm::exec_policy(stream), - comp_in.begin(), - comp_in.end(), - comp_it, - [] __device__(gpu_inflate_input_s in) { - return thrust::make_tuple(in.srcDevice, in.srcSize, in.dstDevice); - }); - nvcomp_status = nvcompBatchedSnappyCompressAsync(uncompressed_data_ptrs.data(), - uncompressed_data_sizes.data(), - max_comp_blk_size, - num_compressed_blocks, - scratch.data(), - scratch.size(), - compressed_data_ptrs.data(), - compressed_bytes_written.data(), - nvcompBatchedSnappyDefaultOpts, - stream.value()); - - CUDF_EXPECTS(nvcomp_status == nvcompStatus_t::nvcompSuccess, "Error in snappy compression"); - - thrust::transform(rmm::exec_policy(stream), - compressed_bytes_written.begin(), - compressed_bytes_written.end(), - comp_out.begin(), - [] __device__(size_t size) { - gpu_inflate_status_s status{}; - status.bytes_written = size; - return status; - }); - } catch (...) { - // If we reach this then there was an error in compressing so set an error status for each - // block - thrust::for_each(rmm::exec_policy(stream), - comp_out.begin(), - comp_out.end(), - [] __device__(gpu_inflate_status_s & stat) { stat.status = 1; }); - }; - - // gpu_snap(comp_in, comp_out, num_compressed_blocks, stream); + auto env_use_nvcomp = std::getenv("LIBCUDF_USE_NVCOMP"); + bool use_nvcomp = env_use_nvcomp != nullptr ? 
std::atoi(env_use_nvcomp) : 0; + if (use_nvcomp) { + try { + size_t temp_size; + nvcompStatus_t nvcomp_status = nvcompBatchedSnappyCompressGetTempSize( + num_compressed_blocks, comp_blk_size, nvcompBatchedSnappyDefaultOpts, &temp_size); + + CUDF_EXPECTS(nvcomp_status == nvcompStatus_t::nvcompSuccess, + "Error in getting snappy compression scratch size"); + + rmm::device_buffer scratch(temp_size, stream); + rmm::device_uvector uncompressed_data_ptrs(num_compressed_blocks, stream); + rmm::device_uvector uncompressed_data_sizes(num_compressed_blocks, stream); + rmm::device_uvector compressed_data_ptrs(num_compressed_blocks, stream); + rmm::device_uvector compressed_bytes_written(num_compressed_blocks, stream); + + auto comp_it = thrust::make_zip_iterator(uncompressed_data_ptrs.begin(), + uncompressed_data_sizes.begin(), + compressed_data_ptrs.begin()); + thrust::transform(rmm::exec_policy(stream), + comp_in.begin(), + comp_in.end(), + comp_it, + [] __device__(gpu_inflate_input_s in) { + return thrust::make_tuple(in.srcDevice, in.srcSize, in.dstDevice); + }); + nvcomp_status = nvcompBatchedSnappyCompressAsync(uncompressed_data_ptrs.data(), + uncompressed_data_sizes.data(), + max_comp_blk_size, + num_compressed_blocks, + scratch.data(), + scratch.size(), + compressed_data_ptrs.data(), + compressed_bytes_written.data(), + nvcompBatchedSnappyDefaultOpts, + stream.value()); + + CUDF_EXPECTS(nvcomp_status == nvcompStatus_t::nvcompSuccess, "Error in snappy compression"); + + thrust::transform(rmm::exec_policy(stream), + compressed_bytes_written.begin(), + compressed_bytes_written.end(), + comp_out.begin(), + [] __device__(size_t size) { + gpu_inflate_status_s status{}; + status.bytes_written = size; + return status; + }); + } catch (...) 
{ + // If we reach this then there was an error in compressing so set an error status for each + // block + thrust::for_each(rmm::exec_policy(stream), + comp_out.begin(), + comp_out.end(), + [] __device__(gpu_inflate_status_s & stat) { stat.status = 1; }); + }; + + } else { + gpu_snap(comp_in.data(), comp_out.data(), num_compressed_blocks, stream); + } } dim3 dim_block_compact(1024, 1); gpuCompactCompressedBlocks<<>>( From 0569281a2488e874ddd29a1ca917b334076bcd9a Mon Sep 17 00:00:00 2001 From: Devavret Makkar Date: Wed, 22 Sep 2021 05:23:53 +0530 Subject: [PATCH 23/24] Replace magic number 3 with BLOCK_HEADER_SIZE --- cpp/src/io/orc/orc_common.h | 1 + cpp/src/io/orc/stripe_enc.cu | 2 +- cpp/src/io/orc/stripe_init.cu | 14 ++++++++------ cpp/src/io/orc/writer_impl.cu | 8 ++++---- 4 files changed, 14 insertions(+), 11 deletions(-) diff --git a/cpp/src/io/orc/orc_common.h b/cpp/src/io/orc/orc_common.h index ab6788d01f1..de739e8ae19 100644 --- a/cpp/src/io/orc/orc_common.h +++ b/cpp/src/io/orc/orc_common.h @@ -24,6 +24,7 @@ namespace orc { // ORC rows are divided into groups and assigned indexes for faster seeking static constexpr uint32_t default_row_index_stride = 10000; +static constexpr uint32_t BLOCK_HEADER_SIZE = 3; enum CompressionKind : uint8_t { NONE = 0, diff --git a/cpp/src/io/orc/stripe_enc.cu b/cpp/src/io/orc/stripe_enc.cu index 0d41a61d04d..9348d817dad 100644 --- a/cpp/src/io/orc/stripe_enc.cu +++ b/cpp/src/io/orc/stripe_enc.cu @@ -1140,7 +1140,7 @@ __global__ void __launch_bounds__(256) uint32_t blk_size = min(comp_blk_size, ss.stream_size - min(b * comp_blk_size, ss.stream_size)); blk_in->srcDevice = src + b * comp_blk_size; blk_in->srcSize = blk_size; - blk_in->dstDevice = dst + b * (3 + max_comp_blk_size) + 3; // reserve 3 bytes for block header + blk_in->dstDevice = dst + b * (BLOCK_HEADER_SIZE + max_comp_blk_size) + BLOCK_HEADER_SIZE; blk_in->dstSize = max_comp_blk_size; blk_out->bytes_written = blk_size; blk_out->status = 1; diff --git 
a/cpp/src/io/orc/stripe_init.cu b/cpp/src/io/orc/stripe_init.cu index 94d8de6561b..a234e81d488 100644 --- a/cpp/src/io/orc/stripe_init.cu +++ b/cpp/src/io/orc/stripe_init.cu @@ -52,13 +52,13 @@ extern "C" __global__ void __launch_bounds__(128, 8) gpuParseCompressedStripeDat uint32_t max_uncompressed_block_size = 0; uint32_t num_compressed_blocks = 0; uint32_t num_uncompressed_blocks = 0; - while (cur + 3 < end) { + while (cur + BLOCK_HEADER_SIZE < end) { uint32_t block_len = shuffle((lane_id == 0) ? cur[0] | (cur[1] << 8) | (cur[2] << 16) : 0); uint32_t is_uncompressed = block_len & 1; uint32_t uncompressed_size; gpu_inflate_input_s* init_ctl = nullptr; block_len >>= 1; - cur += 3; + cur += BLOCK_HEADER_SIZE; if (block_len > block_size || cur + block_len > end) { // Fatal num_compressed_blocks = 0; @@ -145,12 +145,12 @@ extern "C" __global__ void __launch_bounds__(128, 8) uint32_t num_compressed_blocks = 0; uint32_t max_compressed_blocks = s->info.num_compressed_blocks; - while (cur + 3 < end) { + while (cur + BLOCK_HEADER_SIZE < end) { uint32_t block_len = shuffle((lane_id == 0) ? 
cur[0] | (cur[1] << 8) | (cur[2] << 16) : 0); uint32_t is_uncompressed = block_len & 1; uint32_t uncompressed_size_est, uncompressed_size_actual; block_len >>= 1; - cur += 3; + cur += BLOCK_HEADER_SIZE; if (cur + block_len > end) { break; } if (is_uncompressed) { uncompressed_size_est = block_len; @@ -367,9 +367,11 @@ static __device__ void gpuMapRowIndexToUncompressed(rowindex_state_s* s, for (;;) { uint32_t block_len, is_uncompressed; - if (cur + 3 > end || cur + 3 >= start + compressed_offset) { break; } + if (cur + BLOCK_HEADER_SIZE > end || cur + BLOCK_HEADER_SIZE >= start + compressed_offset) { + break; + } block_len = cur[0] | (cur[1] << 8) | (cur[2] << 16); - cur += 3; + cur += BLOCK_HEADER_SIZE; is_uncompressed = block_len & 1; block_len >>= 1; cur += block_len; diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu index 313700ba296..8a0112deb76 100644 --- a/cpp/src/io/orc/writer_impl.cu +++ b/cpp/src/io/orc/writer_impl.cu @@ -1001,10 +1001,10 @@ void writer::impl::write_index_stream(int32_t stripe_id, record.pos += stream.lengths[type]; while ((record.pos >= 0) && (record.blk_pos >= 0) && (static_cast(record.pos) >= compression_blocksize_) && - (record.comp_pos + 3 + comp_out[record.blk_pos].bytes_written < + (record.comp_pos + BLOCK_HEADER_SIZE + comp_out[record.blk_pos].bytes_written < static_cast(record.comp_size))) { record.pos -= compression_blocksize_; - record.comp_pos += 3 + comp_out[record.blk_pos].bytes_written; + record.comp_pos += BLOCK_HEADER_SIZE + comp_out[record.blk_pos].bytes_written; record.blk_pos += 1; } } @@ -1494,9 +1494,9 @@ void writer::impl::write(table_view const& table) auto num_blocks = std::max( (stream_size + compression_blocksize_ - 1) / compression_blocksize_, 1); - stream_size += num_blocks * 3; + stream_size += num_blocks * BLOCK_HEADER_SIZE; num_compressed_blocks += num_blocks; - compressed_bfr_size += (max_compressed_block_size + 3) * num_blocks; + compressed_bfr_size += (max_compressed_block_size + 
BLOCK_HEADER_SIZE) * num_blocks; } max_stream_size = std::max(max_stream_size, stream_size); } From 11e20e713d9affab99153fe73cedb90f52cc4177 Mon Sep 17 00:00:00 2001 From: Devavret Makkar Date: Wed, 22 Sep 2021 20:12:07 +0530 Subject: [PATCH 24/24] Copyright updates --- cpp/src/io/orc/orc_common.h | 2 +- cpp/src/io/orc/stripe_init.cu | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/io/orc/orc_common.h b/cpp/src/io/orc/orc_common.h index de739e8ae19..eedaa9d4fc2 100644 --- a/cpp/src/io/orc/orc_common.h +++ b/cpp/src/io/orc/orc_common.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019, NVIDIA CORPORATION. + * Copyright (c) 2019-2021, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/cpp/src/io/orc/stripe_init.cu b/cpp/src/io/orc/stripe_init.cu index a234e81d488..d6dbdbe6403 100644 --- a/cpp/src/io/orc/stripe_init.cu +++ b/cpp/src/io/orc/stripe_init.cu @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2020, NVIDIA CORPORATION. + * Copyright (c) 2019-2021, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License.