From db237419d96de5e92faafcc91cd54571efe6f755 Mon Sep 17 00:00:00 2001 From: Devavret Makkar Date: Fri, 7 May 2021 17:26:42 +0530 Subject: [PATCH 01/18] Initial changes to get nvcomp integrated Cmake changes (excluding changes needed in nvcomp's cmake) Replace cuIO's snappy compressor with nvcomp --- cpp/CMakeLists.txt | 21 ++++ cpp/cmake/cudf-build-config.cmake.in | 7 ++ cpp/cmake/cudf-config.cmake.in | 3 + cpp/cmake/thirdparty/CUDF_GetnvCOMP.cmake | 41 +++++++ cpp/src/io/comp/snap.cu | 2 +- cpp/src/io/parquet/writer_impl.cu | 135 +++++++++++++++++++++- 6 files changed, 207 insertions(+), 2 deletions(-) create mode 100644 cpp/cmake/thirdparty/CUDF_GetnvCOMP.cmake diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 198690e37ff..499ae8b5797 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -125,6 +125,8 @@ include(cmake/thirdparty/CUDF_GetCPM.cmake) include(cmake/thirdparty/CUDF_FindBoost.cmake) # find jitify include(cmake/thirdparty/CUDF_GetJitify.cmake) +# find nvCOMP +include(cmake/thirdparty/CUDF_GetnvCOMP.cmake) # find thrust/cub include(cmake/thirdparty/CUDF_GetThrust.cmake) # find rmm @@ -475,6 +477,7 @@ target_link_libraries(cudf Boost::filesystem ${ARROW_LIBRARIES} cudf::Thrust + nvCOMP::nvcomp rmm::rmm) if(CUDA_STATIC_RUNTIME) @@ -580,6 +583,10 @@ install(TARGETS cudf DESTINATION lib EXPORT cudf-targets) +install(TARGETS nvcomp + DESTINATION lib + EXPORT cudf-nvcomp-target) + install(DIRECTORY ${CUDF_SOURCE_DIR}/include/cudf ${CUDF_SOURCE_DIR}/include/cudf_test @@ -623,6 +630,11 @@ install(EXPORT cudf-testing-targets NAMESPACE cudf:: DESTINATION "${INSTALL_CONFIGDIR}") +install(EXPORT cudf-nvcomp-target + FILE cudf-nvcomp-target.cmake + NAMESPACE nvCOMP:: + DESTINATION "${INSTALL_CONFIGDIR}") + ################################################################################################ # - build export ------------------------------------------------------------------------------- configure_package_config_file(cmake/cudf-build-config.cmake.in ${CUDF_BINARY_DIR}/cudf-config.cmake @@ -656,6 +668,15 @@ if(TARGET gtest) endif() endif() +if(TARGET nvcomp) + get_target_property(nvcomp_is_imported nvcomp IMPORTED) + if(NOT nvcomp_is_imported) + export(TARGETS nvcomp + FILE ${CUDF_BINARY_DIR}/cudf-nvcomp-target.cmake + NAMESPACE nvCOMP::) + endif() +endif() + export(EXPORT cudf-targets FILE ${CUDF_BINARY_DIR}/cudf-targets.cmake NAMESPACE cudf::) diff --git a/cpp/cmake/cudf-build-config.cmake.in b/cpp/cmake/cudf-build-config.cmake.in index ed1926f20f0..84cba8e8680 100644 --- a/cpp/cmake/cudf-build-config.cmake.in +++ b/cpp/cmake/cudf-build-config.cmake.in @@ -71,6 +71,13 @@ else() include(@CUDF_SOURCE_DIR@/cmake/thirdparty/CUDF_GetGTest.cmake) endif() +# find nvCOMP +if(EXISTS "${CMAKE_CURRENT_LIST_DIR}/cudf-nvcomp-target.cmake") + include("${CMAKE_CURRENT_LIST_DIR}/cudf-nvcomp-target.cmake") +else() + include(@CUDF_SOURCE_DIR@/cmake/thirdparty/CUDF_GetnvCOMP.cmake) +endif() + list(POP_FRONT CMAKE_MODULE_PATH) diff --git a/cpp/cmake/cudf-config.cmake.in b/cpp/cmake/cudf-config.cmake.in index 66c669851fa..950c08767f6 100644 --- a/cpp/cmake/cudf-config.cmake.in +++ b/cpp/cmake/cudf-config.cmake.in @@ -83,6 +83,9 @@ find_dependency(ArrowCUDA @CUDF_VERSION_Arrow@) find_dependency(rmm @CUDF_MIN_VERSION_rmm@) +find_dependency(nvCOMP @CUDF_MIN_VERSION_nvCOMP@) +include("${CMAKE_CURRENT_LIST_DIR}/cudf-nvcomp-target.cmake") + set(Thrust_ROOT "${CMAKE_CURRENT_LIST_DIR}/../../../include/libcudf/Thrust") find_dependency(Thrust @CUDF_MIN_VERSION_Thrust@) thrust_create_target(cudf::Thrust FROM_OPTIONS) diff --git a/cpp/cmake/thirdparty/CUDF_GetnvCOMP.cmake b/cpp/cmake/thirdparty/CUDF_GetnvCOMP.cmake new file mode 100644 index 00000000000..4fec448e9a9 --- /dev/null +++ b/cpp/cmake/thirdparty/CUDF_GetnvCOMP.cmake @@ -0,0 +1,41 @@ +#============================================================================= +# Copyright (c) 2021, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +#============================================================================= + +function(find_and_configure_nvcomp VERSION) + + if(TARGET nvCOMP::nvcomp) + return() + endif() + + # Find or install nvCOMP + CPMFindPackage(NAME nvCOMP + VERSION ${VERSION} + GIT_REPOSITORY https://github.com/NVIDIA/nvcomp.git + GIT_TAG v${VERSION} + GIT_SHALLOW TRUE + OPTIONS ) + + if(NOT TARGET nvCOMP::nvcomp) + add_library(nvCOMP::nvcomp ALIAS nvcomp) + endif() + + # Make sure consumers of cudf can also see nvCOMP::nvcomp target + fix_cmake_global_defaults(nvCOMP::nvcomp) +endfunction() + +set(CUDF_MIN_VERSION_nvCOMP 2.0.0) + +find_and_configure_nvcomp(${CUDF_MIN_VERSION_nvCOMP}) diff --git a/cpp/src/io/comp/snap.cu b/cpp/src/io/comp/snap.cu index 999d02e3a50..526ff95d3d4 100644 --- a/cpp/src/io/comp/snap.cu +++ b/cpp/src/io/comp/snap.cu @@ -257,7 +257,7 @@ static __device__ uint32_t Match60(const uint8_t *src1, * @param[out] outputs Compression status per block * @param[in] count Number of blocks to compress */ -extern "C" __global__ void __launch_bounds__(128) +__global__ void __launch_bounds__(128) snap_kernel(gpu_inflate_input_s *inputs, gpu_inflate_status_s *outputs, int count) { __shared__ __align__(16) snap_state_s state_g; diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index 0b5a8a0d501..0a4a5d96a4e 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -36,6 +36,8 @@ #include #include +#include + #include #include #include @@ -792,6 +794,136 @@ void writer::impl::init_encoder_pages(hostdevice_2dvector & stream.synchronize(); } +template +void print(rmm::device_uvector const &d_vec, std::string label = "") +{ + std::vector h_vec(d_vec.size()); + cudaMemcpy(h_vec.data(), d_vec.data(), d_vec.size() * sizeof(T), cudaMemcpyDeviceToHost); + printf("%s (%lu)\t", label.c_str(), h_vec.size()); + for (auto &&i : h_vec) std::cout << (int)i << " "; + printf("\n"); +} + +template +void print(rmm::device_vector const &d_vec, std::string label = "") +{ + thrust::host_vector h_vec = d_vec; + printf("%s \t", label.c_str()); + for (auto &&i : h_vec) std::cout << i << " "; + printf("\n"); +} + +struct printer { + template + std::enable_if_t(), void> operator()(column_view const &col, + std::string label = "") + { + auto d_vec = rmm::device_vector(col.begin(), col.end()); + print(d_vec, label); + } + template + std::enable_if_t(), void> operator()(column_view const &col, + std::string label = "") + { + CUDF_FAIL("no strings"); + } +}; +void print(column_view const &col, std::string label = "") +{ + cudf::type_dispatcher(col.type(), printer{}, col, label); +} + +void snappy_compress(device_span comp_in, + device_span comp_stat, + rmm::cuda_stream_view stream) +{ + size_t num_comp_pages = comp_in.size(); + size_t temp_size; + nvcompError_t nvcomp_error = + nvcompBatchedSnappyCompressGetTempSize(num_comp_pages, 1 << 20, &temp_size); + // TODO: Continue with uncompressed if nvcomp fails at any step + CUDF_EXPECTS(nvcomp_error == nvcompError_t::nvcompSuccess, "Unable to get temporary size"); + + // Not needed now but nvcomp API makes no promises about future + rmm::device_buffer scratch(temp_size, stream); + // Analogous to comp_in.srcDevice + rmm::device_uvector uncompressed_data_ptrs(num_comp_pages, stream); + // Analogous to comp_in.srcSize + rmm::device_uvector uncompressed_data_sizes(num_comp_pages, stream); + // Analogous to comp_in.dstDevice + rmm::device_uvector compressed_data_ptrs(num_comp_pages, stream); + // Analogous to comp_stat.bytes_written + rmm::device_vector compressed_bytes_written(num_comp_pages); + + rmm::device_uvector dstSizes(num_comp_pages, stream); + // nvcomp does not currently use comp_in.dstSize. Cannot assume that the output will fit in + // the space allocated unless one uses the API nvcompBatchedSnappyCompressGetOutputSize() + // TODO: Replace our allocation size computation with value obtained from aforementioned API + + // Prepare the vectors + auto comp_it = thrust::make_zip_iterator(uncompressed_data_ptrs.begin(), + uncompressed_data_sizes.begin(), + compressed_data_ptrs.begin(), + dstSizes.begin()); + thrust::transform(rmm::exec_policy(stream), + comp_in.begin(), + comp_in.end(), + comp_it, + [] __device__(gpu_inflate_input_s in) { + return thrust::make_tuple(in.srcDevice, in.srcSize, in.dstDevice, in.dstSize); + }); + nvcomp_error = nvcompBatchedSnappyCompressAsync(uncompressed_data_ptrs.data(), + uncompressed_data_sizes.data(), + num_comp_pages, + scratch.data(), // Not needed rn but future + scratch.size(), + compressed_data_ptrs.data(), + compressed_bytes_written.data().get(), + stream.value()); + + // rmm::device_uvector new_comp_in(num_comp_pages, stream); + // thrust::transform(rmm::exec_policy(), + // comp_it, + // comp_it + num_comp_pages, + // new_comp_in.begin(), + // [] __device__(auto zip) { + // gpu_inflate_input_s status{}; + // status.srcDevice = thrust::get<0>(zip); + // status.srcSize = thrust::get<1>(zip); + // status.dstDevice = thrust::get<2>(zip); + // status.dstSize = thrust::get<3>(zip); + // return status; + // }); + // gpu_snap(comp_in.data(), comp_stat.data(), num_comp_pages, stream); + + // thrust::transform(rmm::exec_policy(stream), + // comp_stat.begin(), + // comp_stat.end(), + // compressed_bytes_written.begin(), + // [] __device__(gpu_inflate_status_s status) { return status.bytes_written; }); + + print(uncompressed_data_sizes, "uncomp size"); + print(dstSizes, "dstSizes from cuIO"); + print(compressed_bytes_written, "written by nvcomp"); + CUDF_EXPECTS(nvcomp_error == nvcompError_t::nvcompSuccess, + "Unable to perform snappy compression"); + + // nvcomp also doesn't use comp_out.status . Even though it has it in the kernel, it doesn't + // expose it from the API and ignores it internally. Need to rely on nvcompError and the + // compression will be all or nothing rather than per page. + // TODO: Get to it or file a request with the nvcomp team to expose per page status output + // The other comp_out field is reserved which is for internal cuIO debugging and can be 0. + thrust::transform(rmm::exec_policy(stream), + compressed_bytes_written.begin(), + compressed_bytes_written.end(), + comp_stat.begin(), + [] __device__(size_t size) { + gpu_inflate_status_s status{}; + status.bytes_written = size; + return status; + }); +} + void writer::impl::encode_pages(hostdevice_2dvector &chunks, device_span pages, uint32_t pages_in_batch, @@ -820,7 +952,8 @@ void writer::impl::encode_pages(hostdevice_2dvector &chunks gpu::EncodePages(batch_pages, comp_in, comp_stat, stream); switch (compression_) { case parquet::Compression::SNAPPY: - CUDA_TRY(gpu_snap(comp_in.data(), comp_stat.data(), pages_in_batch, stream)); + // CUDA_TRY(gpu_snap(comp_in.data(), comp_stat.data(), pages_in_batch, stream)); + snappy_compress(comp_in, comp_stat, stream); break; default: break; } From a5f336347dbdef253160e336fe33a13fc45f79f1 Mon Sep 17 00:00:00 2001 From: Devavret Makkar Date: Thu, 13 May 2021 01:57:39 +0530 Subject: [PATCH 02/18] Using nvcomp provided max compressed buffer size --- cpp/src/io/parquet/page_enc.cu | 17 +++++- cpp/src/io/parquet/parquet_gpu.hpp | 8 ++- cpp/src/io/parquet/writer_impl.cu | 91 ++++++++++++++---------------- cpp/src/io/parquet/writer_impl.hpp | 4 ++ 4 files changed, 66 insertions(+), 54 deletions(-) diff --git a/cpp/src/io/parquet/page_enc.cu b/cpp/src/io/parquet/page_enc.cu index bf9114949aa..aee80b1ae7b 100644 --- a/cpp/src/io/parquet/page_enc.cu +++ b/cpp/src/io/parquet/page_enc.cu @@ -410,6 +410,7 @@ __global__ void __launch_bounds__(128) device_span col_desc, statistics_merge_group *page_grstats, statistics_merge_group *chunk_grstats, + size_t max_page_comp_data_size, int32_t num_columns) { // TODO: All writing seems to be done by thread 0. Could be replaced by thrust foreach @@ -439,6 +440,8 @@ __global__ void __launch_bounds__(128) uint32_t page_offset = ck_g.ck_stat_size; uint32_t num_dict_entries = 0; uint32_t comp_page_offset = ck_g.ck_stat_size; + uint32_t page_headers_size = 0; + uint32_t max_page_data_size = 0; uint32_t cur_row = ck_g.start_row; uint32_t ck_max_stats_len = 0; uint32_t max_stats_len = 0; @@ -465,7 +468,9 @@ __global__ void __launch_bounds__(128) page_g.num_leaf_values = ck_g.total_dict_entries; page_g.num_values = ck_g.total_dict_entries; page_offset += page_g.max_hdr_size + page_g.max_data_size; - comp_page_offset += page_g.max_hdr_size + GetMaxCompressedBfrSize(page_g.max_data_size); + comp_page_offset += page_g.max_hdr_size + max_page_comp_data_size; + page_headers_size += page_g.max_hdr_size; + max_page_data_size = max(max_page_data_size, page_g.max_data_size); } __syncwarp(); if (t == 0) { @@ -571,7 +576,9 @@ __global__ void __launch_bounds__(128) pagestats_g.start_chunk = ck_g.first_fragment + page_start; pagestats_g.num_chunks = page_g.num_fragments; page_offset += page_g.max_hdr_size + page_g.max_data_size; - comp_page_offset += page_g.max_hdr_size + GetMaxCompressedBfrSize(page_g.max_data_size); + comp_page_offset += page_g.max_hdr_size + max_page_comp_data_size; + page_headers_size += page_g.max_hdr_size; + max_page_data_size = max(max_page_data_size, page_g.max_data_size); cur_row += rows_in_page; ck_max_stats_len = max(ck_max_stats_len, max_stats_len); } @@ -610,6 +617,8 @@ __global__ void __launch_bounds__(128) ck_g.num_pages = num_pages; ck_g.bfr_size = page_offset; ck_g.compressed_size = comp_page_offset; + ck_g.page_headers_size = page_headers_size; + ck_g.max_page_data_size = max_page_data_size; pagestats_g.start_chunk = ck_g.first_page + ck_g.has_dictionary; // Exclude dictionary pagestats_g.num_chunks = num_pages - ck_g.has_dictionary; } @@ -2141,6 +2150,7 @@ void InitFragmentStatistics(device_2dspan groups, * @param[in] num_columns Number of columns * @param[out] page_grstats Setup for page-level stats * @param[out] chunk_grstats Setup for chunk-level stats + * @param[in] max_page_comp_data_size Calculated maximum compressed data size of pages * @param[in] stream CUDA stream to use, default 0 */ void InitEncoderPages(device_2dspan chunks, @@ -2149,12 +2159,13 @@ void InitEncoderPages(device_2dspan chunks, int32_t num_columns, statistics_merge_group *page_grstats, statistics_merge_group *chunk_grstats, + size_t max_page_comp_data_size, rmm::cuda_stream_view stream) { auto num_rowgroups = chunks.size().first; dim3 dim_grid(num_columns, num_rowgroups); // 1 threadblock per rowgroup gpuInitPages<<>>( - chunks, pages, col_desc, page_grstats, chunk_grstats, num_columns); + chunks, pages, col_desc, page_grstats, chunk_grstats, max_page_comp_data_size, num_columns); } /** diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp index 564226c7ff3..335407af19d 100644 --- a/cpp/src/io/parquet/parquet_gpu.hpp +++ b/cpp/src/io/parquet/parquet_gpu.hpp @@ -298,8 +298,10 @@ struct EncColumnChunk { statistics_chunk const *stats; //!< Fragment statistics uint32_t bfr_size; //!< Uncompressed buffer size uint32_t compressed_size; //!< Compressed buffer size - uint32_t start_row; //!< First row of chunk - uint32_t num_rows; //!< Number of rows in chunk + uint32_t max_page_data_size; //!< Max data size (excuding header) of any page in this chunk + uint32_t page_headers_size; //!< Sum of size of all page headers + uint32_t start_row; //!< First row of chunk + uint32_t num_rows; //!< Number of rows in chunk uint32_t num_values; //!< Number of values in chunk. Different from num_rows for nested types uint32_t first_fragment; //!< First fragment of chunk EncPage *pages; //!< Ptr to pages that belong to this chunk @@ -480,6 +482,7 @@ void InitFragmentStatistics(cudf::detail::device_2dspan groups * @param[in] num_columns Number of columns * @param[in] page_grstats Setup for page-level stats * @param[in] chunk_grstats Setup for chunk-level stats + * @param[in] max_page_comp_data_size Calculated maximum compressed data size of pages * @param[in] stream CUDA stream to use, default 0 */ void InitEncoderPages(cudf::detail::device_2dspan chunks, @@ -488,6 +491,7 @@ void InitEncoderPages(cudf::detail::device_2dspan chunks, int32_t num_columns, statistics_merge_group *page_grstats = nullptr, statistics_merge_group *chunk_grstats = nullptr, + size_t max_page_comp_data_size = 0, rmm::cuda_stream_view stream = rmm::cuda_stream_default); /** diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index 0a4a5d96a4e..0b07ac01487 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -759,7 +759,7 @@ void writer::impl::build_chunk_dictionaries( gpu::BuildChunkDictionaries(chunks.device_view().flat_view(), dict_scratch.data(), stream); } - gpu::InitEncoderPages(chunks, {}, col_desc, num_columns, nullptr, nullptr, stream); + gpu::InitEncoderPages(chunks, {}, col_desc, num_columns, nullptr, nullptr, 0, stream); chunks.device_to_host(stream, true); } @@ -768,6 +768,7 @@ void writer::impl::init_encoder_pages(hostdevice_2dvector & device_span pages, statistics_chunk *page_stats, statistics_chunk *frag_stats, + size_t max_page_comp_data_size, uint32_t num_columns, uint32_t num_pages, uint32_t num_stats_bfr) @@ -780,6 +781,7 @@ void writer::impl::init_encoder_pages(hostdevice_2dvector & num_columns, (num_stats_bfr) ? page_stats_mrg.data() : nullptr, (num_stats_bfr > num_pages) ? page_stats_mrg.data() + num_pages : nullptr, + max_page_comp_data_size, stream); if (num_stats_bfr > 0) { MergeColumnStatistics(page_stats, frag_stats, page_stats_mrg.data(), num_pages, stream); @@ -854,23 +856,18 @@ void snappy_compress(device_span comp_in, rmm::device_uvector compressed_data_ptrs(num_comp_pages, stream); // Analogous to comp_stat.bytes_written rmm::device_vector compressed_bytes_written(num_comp_pages); - - rmm::device_uvector dstSizes(num_comp_pages, stream); // nvcomp does not currently use comp_in.dstSize. Cannot assume that the output will fit in // the space allocated unless one uses the API nvcompBatchedSnappyCompressGetOutputSize() - // TODO: Replace our allocation size computation with value obtained from aforementioned API // Prepare the vectors - auto comp_it = thrust::make_zip_iterator(uncompressed_data_ptrs.begin(), - uncompressed_data_sizes.begin(), - compressed_data_ptrs.begin(), - dstSizes.begin()); + auto comp_it = thrust::make_zip_iterator( + uncompressed_data_ptrs.begin(), uncompressed_data_sizes.begin(), compressed_data_ptrs.begin()); thrust::transform(rmm::exec_policy(stream), comp_in.begin(), comp_in.end(), comp_it, [] __device__(gpu_inflate_input_s in) { - return thrust::make_tuple(in.srcDevice, in.srcSize, in.dstDevice, in.dstSize); + return thrust::make_tuple(in.srcDevice, in.srcSize, in.dstDevice); }); nvcomp_error = nvcompBatchedSnappyCompressAsync(uncompressed_data_ptrs.data(), uncompressed_data_sizes.data(), @@ -881,38 +878,12 @@ void snappy_compress(device_span comp_in, compressed_bytes_written.data().get(), stream.value()); - // rmm::device_uvector new_comp_in(num_comp_pages, stream); - // thrust::transform(rmm::exec_policy(), - // comp_it, - // comp_it + num_comp_pages, - // new_comp_in.begin(), - // [] __device__(auto zip) { - // gpu_inflate_input_s status{}; - // status.srcDevice = thrust::get<0>(zip); - // status.srcSize = thrust::get<1>(zip); - // status.dstDevice = thrust::get<2>(zip); - // status.dstSize = thrust::get<3>(zip); - // return status; - // }); - // gpu_snap(comp_in.data(), comp_stat.data(), num_comp_pages, stream); - - // thrust::transform(rmm::exec_policy(stream), - // comp_stat.begin(), - // comp_stat.end(), - // compressed_bytes_written.begin(), - // [] __device__(gpu_inflate_status_s status) { return status.bytes_written; }); - - print(uncompressed_data_sizes, "uncomp size"); - print(dstSizes, "dstSizes from cuIO"); - print(compressed_bytes_written, "written by nvcomp"); CUDF_EXPECTS(nvcomp_error == nvcompError_t::nvcompSuccess, "Unable to perform snappy compression"); - // nvcomp also doesn't use comp_out.status . Even though it has it in the kernel, it doesn't - // expose it from the API and ignores it internally. Need to rely on nvcompError and the - // compression will be all or nothing rather than per page. - // TODO: Get to it or file a request with the nvcomp team to expose per page status output - // The other comp_out field is reserved which is for internal cuIO debugging and can be 0. + // nvcomp also doesn't use comp_out.status . It guarantees that given enough output space, + // compression will succeed. + // The other `comp_out` field is `reserved` which is for internal cuIO debugging and can be 0. thrust::transform(rmm::exec_policy(stream), compressed_bytes_written.begin(), compressed_bytes_written.end(), @@ -951,10 +922,7 @@ void writer::impl::encode_pages(hostdevice_2dvector &chunks gpu::EncodePages(batch_pages, comp_in, comp_stat, stream); switch (compression_) { - case parquet::Compression::SNAPPY: - // CUDA_TRY(gpu_snap(comp_in.data(), comp_stat.data(), pages_in_batch, stream)); - snappy_compress(comp_in, comp_stat, stream); - break; + case parquet::Compression::SNAPPY: snappy_compress(comp_in, comp_stat, stream); break; default: break; } // TBD: Not clear if the official spec actually allows dynamically turning off compression at the @@ -1225,16 +1193,36 @@ void writer::impl::write(table_view const &table) build_chunk_dictionaries(chunks, col_desc, num_columns, num_dictionaries); } + // Get the maximum page size across all chunks + size_type max_page_uncomp_data_size = + std::accumulate(chunks.host_view().flat_view().begin(), + chunks.host_view().flat_view().end(), + 0, + [](uint32_t max_page_size, gpu::EncColumnChunk const &chunk) { + return std::max(max_page_size, chunk.max_page_data_size); + }); + + size_t max_page_comp_data_size = 0; + if (compression_ != parquet::Compression::UNCOMPRESSED) { + CUDF_EXPECTS( + nvcompError_t::nvcompSuccess == nvcompBatchedSnappyCompressGetOutputSize( + max_page_uncomp_data_size, &max_page_comp_data_size), + "Error in getting compressed size from nvcomp"); + } + // Initialize batches of rowgroups to encode (mainly to limit peak memory usage) std::vector batch_list; uint32_t num_pages = 0; size_t max_bytes_in_batch = 1024 * 1024 * 1024; // 1GB - TBD: Tune this size_t max_uncomp_bfr_size = 0; + size_t max_comp_bfr_size = 0; size_t max_chunk_bfr_size = 0; uint32_t max_pages_in_batch = 0; size_t bytes_in_batch = 0; + size_t comp_bytes_in_batch = 0; for (uint32_t r = 0, groups_in_batch = 0, pages_in_batch = 0; r <= num_rowgroups; r++) { - size_t rowgroup_size = 0; + size_t rowgroup_size = 0; + size_t comp_rowgroup_size = 0; if (r < num_rowgroups) { for (int i = 0; i < num_columns; i++) { gpu::EncColumnChunk *ck = &chunks[r][i]; @@ -1242,6 +1230,8 @@ void writer::impl::write(table_view const &table) num_pages += ck->num_pages; pages_in_batch += ck->num_pages; rowgroup_size += ck->bfr_size; + ck->compressed_size = ck->page_headers_size + max_page_comp_data_size * ck->num_pages; + comp_rowgroup_size += ck->compressed_size; max_chunk_bfr_size = std::max(max_chunk_bfr_size, (size_t)std::max(ck->bfr_size, ck->compressed_size)); } @@ -1250,23 +1240,25 @@ void writer::impl::write(table_view const &table) if ((r == num_rowgroups) || (groups_in_batch != 0 && bytes_in_batch + rowgroup_size > max_bytes_in_batch)) { max_uncomp_bfr_size = std::max(max_uncomp_bfr_size, bytes_in_batch); + max_comp_bfr_size = std::max(max_comp_bfr_size, comp_bytes_in_batch); max_pages_in_batch = std::max(max_pages_in_batch, pages_in_batch); if (groups_in_batch != 0) { batch_list.push_back(groups_in_batch); groups_in_batch = 0; } - bytes_in_batch = 0; - pages_in_batch = 0; + bytes_in_batch = 0; + comp_bytes_in_batch = 0; + pages_in_batch = 0; } bytes_in_batch += rowgroup_size; + comp_bytes_in_batch += comp_rowgroup_size; groups_in_batch++; } + // Clear compressed buffer size if compression has been turned off + if (compression_ == parquet::Compression::UNCOMPRESSED) { max_comp_bfr_size = 0; } + // Initialize data pointers in batch - size_t max_comp_bfr_size = - (compression_ != parquet::Compression::UNCOMPRESSED) - ? gpu::GetMaxCompressedBfrSize(max_uncomp_bfr_size, max_pages_in_batch) - : 0; uint32_t num_stats_bfr = (stats_granularity_ != statistics_freq::STATISTICS_NONE) ? num_pages + num_chunks : 0; rmm::device_buffer uncomp_bfr(max_uncomp_bfr_size, stream); @@ -1295,6 +1287,7 @@ void writer::impl::write(table_view const &table) {pages.data(), pages.size()}, (num_stats_bfr) ? page_stats.data() : nullptr, (num_stats_bfr) ? frag_stats.data() : nullptr, + max_page_comp_data_size, num_columns, num_pages, num_stats_bfr); diff --git a/cpp/src/io/parquet/writer_impl.hpp b/cpp/src/io/parquet/writer_impl.hpp index 34c8347e79e..c2f0509a08b 100644 --- a/cpp/src/io/parquet/writer_impl.hpp +++ b/cpp/src/io/parquet/writer_impl.hpp @@ -165,6 +165,9 @@ class writer::impl { * @param chunks column chunk array * @param col_desc column description array * @param pages encoder pages array + * @param page_stats page statistics array + * @param frag_stats fragment statistics array + * @param max_page_comp_data_size max compressed * @param num_columns Total number of columns * @param num_pages Total number of pages * @param num_stats_bfr Number of statistics buffers @@ -174,6 +177,7 @@ class writer::impl { device_span pages, statistics_chunk* page_stats, statistics_chunk* frag_stats, + size_t max_page_comp_data_size, uint32_t num_columns, uint32_t num_pages, uint32_t num_stats_bfr); From 61018aa14aa5eef4f76a75f5fcaafd7039c7a20e Mon Sep 17 00:00:00 2001 From: Devavret Makkar Date: Thu, 13 May 2021 02:28:30 +0530 Subject: [PATCH 03/18] Recover from error in nvcomp compressing and encode uncompressed. --- cpp/src/io/parquet/writer_impl.cu | 114 ++++++++++++++++-------------- 1 file changed, 61 insertions(+), 53 deletions(-) diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index 0b07ac01487..3b8d945fdff 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -840,59 +840,67 @@ void snappy_compress(device_span comp_in, rmm::cuda_stream_view stream) { size_t num_comp_pages = comp_in.size(); - size_t temp_size; - nvcompError_t nvcomp_error = - nvcompBatchedSnappyCompressGetTempSize(num_comp_pages, 1 << 20, &temp_size); - // TODO: Continue with uncompressed if nvcomp fails at any step - CUDF_EXPECTS(nvcomp_error == nvcompError_t::nvcompSuccess, "Unable to get temporary size"); - - // Not needed now but nvcomp API makes no promises about future - rmm::device_buffer scratch(temp_size, stream); - // Analogous to comp_in.srcDevice - rmm::device_uvector uncompressed_data_ptrs(num_comp_pages, stream); - // Analogous to comp_in.srcSize - rmm::device_uvector uncompressed_data_sizes(num_comp_pages, stream); - // Analogous to comp_in.dstDevice - rmm::device_uvector compressed_data_ptrs(num_comp_pages, stream); - // Analogous to comp_stat.bytes_written - rmm::device_vector compressed_bytes_written(num_comp_pages); - // nvcomp does not currently use comp_in.dstSize. Cannot assume that the output will fit in - // the space allocated unless one uses the API nvcompBatchedSnappyCompressGetOutputSize() - - // Prepare the vectors - auto comp_it = thrust::make_zip_iterator( - uncompressed_data_ptrs.begin(), uncompressed_data_sizes.begin(), compressed_data_ptrs.begin()); - thrust::transform(rmm::exec_policy(stream), - comp_in.begin(), - comp_in.end(), - comp_it, - [] __device__(gpu_inflate_input_s in) { - return thrust::make_tuple(in.srcDevice, in.srcSize, in.dstDevice); - }); - nvcomp_error = nvcompBatchedSnappyCompressAsync(uncompressed_data_ptrs.data(), - uncompressed_data_sizes.data(), - num_comp_pages, - scratch.data(), // Not needed rn but future - scratch.size(), - compressed_data_ptrs.data(), - compressed_bytes_written.data().get(), - stream.value()); - - CUDF_EXPECTS(nvcomp_error == nvcompError_t::nvcompSuccess, - "Unable to perform snappy compression"); - - // nvcomp also doesn't use comp_out.status . It guarantees that given enough output space, - // compression will succeed. - // The other `comp_out` field is `reserved` which is for internal cuIO debugging and can be 0. - thrust::transform(rmm::exec_policy(stream), - compressed_bytes_written.begin(), - compressed_bytes_written.end(), - comp_stat.begin(), - [] __device__(size_t size) { - gpu_inflate_status_s status{}; - status.bytes_written = size; - return status; - }); + do { + size_t temp_size; + nvcompError_t nvcomp_error = + nvcompBatchedSnappyCompressGetTempSize(num_comp_pages, 1 << 20, &temp_size); + if (nvcomp_error != nvcompError_t::nvcompSuccess) { break; } + + // Not needed now but nvcomp API makes no promises about future + rmm::device_buffer scratch(temp_size, stream); + // Analogous to comp_in.srcDevice + rmm::device_uvector uncompressed_data_ptrs(num_comp_pages, stream); + // Analogous to comp_in.srcSize + rmm::device_uvector uncompressed_data_sizes(num_comp_pages, stream); + // Analogous to comp_in.dstDevice + rmm::device_uvector compressed_data_ptrs(num_comp_pages, stream); + // Analogous to comp_stat.bytes_written + rmm::device_vector compressed_bytes_written(num_comp_pages); + // nvcomp does not currently use comp_in.dstSize. Cannot assume that the output will fit in + // the space allocated unless one uses the API nvcompBatchedSnappyCompressGetOutputSize() + + // Prepare the vectors + auto comp_it = thrust::make_zip_iterator(uncompressed_data_ptrs.begin(), + uncompressed_data_sizes.begin(), + compressed_data_ptrs.begin()); + thrust::transform(rmm::exec_policy(stream), + comp_in.begin(), + comp_in.end(), + comp_it, + [] __device__(gpu_inflate_input_s in) { + return thrust::make_tuple(in.srcDevice, in.srcSize, in.dstDevice); + }); + nvcomp_error = nvcompBatchedSnappyCompressAsync(uncompressed_data_ptrs.data(), + uncompressed_data_sizes.data(), + num_comp_pages, + scratch.data(), // Not needed rn but future + scratch.size(), + compressed_data_ptrs.data(), + compressed_bytes_written.data().get(), + stream.value()); + + if (nvcomp_error != nvcompError_t::nvcompSuccess) { break; } + + // nvcomp also doesn't use comp_out.status . It guarantees that given enough output space, + // compression will succeed. + // The other `comp_out` field is `reserved` which is for internal cuIO debugging and can be 0. + thrust::transform(rmm::exec_policy(stream), + compressed_bytes_written.begin(), + compressed_bytes_written.end(), + comp_stat.begin(), + [] __device__(size_t size) { + gpu_inflate_status_s status{}; + status.bytes_written = size; + return status; + }); + return; + } while (0); + + // If we reach this then there was an error in compressing so set an error status for each page + thrust::for_each(rmm::exec_policy(stream), + comp_stat.begin(), + comp_stat.end(), + [] __device__(gpu_inflate_status_s stat) { stat.status = 1; }); } void writer::impl::encode_pages(hostdevice_2dvector &chunks, From 64d7d1c8161738314ac05b2b50dce1d2f72a78b0 Mon Sep 17 00:00:00 2001 From: Devavret Makkar Date: Thu, 13 May 2021 13:29:32 +0530 Subject: [PATCH 04/18] review changes --- cpp/src/io/parquet/parquet_gpu.hpp | 25 +++++++++--------- cpp/src/io/parquet/writer_impl.cu | 41 +----------------------------- 2 files changed, 13 insertions(+), 53 deletions(-) diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp index 335407af19d..824aa38b0f1 100644 --- a/cpp/src/io/parquet/parquet_gpu.hpp +++ b/cpp/src/io/parquet/parquet_gpu.hpp @@ -489,10 +489,10 @@ void InitEncoderPages(cudf::detail::device_2dspan chunks, device_span pages, device_span col_desc, int32_t num_columns, - statistics_merge_group *page_grstats = nullptr, - statistics_merge_group *chunk_grstats = nullptr, - size_t max_page_comp_data_size = 0, - rmm::cuda_stream_view stream = rmm::cuda_stream_default); + statistics_merge_group *page_grstats, + statistics_merge_group *chunk_grstats, + size_t max_page_comp_data_size, + rmm::cuda_stream_view stream); /** * @brief Launches kernel for packing column data into parquet pages @@ -503,9 +503,9 @@ void InitEncoderPages(cudf::detail::device_2dspan chunks, * @param[in] stream CUDA stream to use, default 0 */ void EncodePages(device_span pages, - device_span comp_in = {}, - device_span comp_out = {}, - rmm::cuda_stream_view stream = rmm::cuda_stream_default); + device_span comp_in, + device_span comp_out, + rmm::cuda_stream_view stream); /** * @brief Launches kernel to make the compressed vs uncompressed chunk-level decision @@ -513,8 +513,7 @@ void EncodePages(device_span pages, * @param[in,out] chunks Column chunks (updated with actual compressed/uncompressed sizes) * @param[in] stream CUDA stream to use, default 0 */ -void DecideCompression(device_span chunks, - rmm::cuda_stream_view stream = rmm::cuda_stream_default); +void DecideCompression(device_span chunks, rmm::cuda_stream_view stream); /** * @brief Launches kernel to encode page headers @@ -526,10 +525,10 @@ void DecideCompression(device_span chunks, * @param[in] stream CUDA stream to use, default 0 */ void EncodePageHeaders(device_span pages, - device_span comp_out = {}, - device_span page_stats = {}, - const statistics_chunk *chunk_stats = nullptr, - rmm::cuda_stream_view stream = rmm::cuda_stream_default); + device_span comp_out, + device_span page_stats, + const statistics_chunk *chunk_stats, + rmm::cuda_stream_view stream); /** * @brief Launches kernel to gather pages to a single contiguous block per chunk diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index 3b8d945fdff..141b74f2a60 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -796,45 +796,6 @@ void writer::impl::init_encoder_pages(hostdevice_2dvector & stream.synchronize(); } -template -void print(rmm::device_uvector const &d_vec, std::string label = "") -{ - std::vector h_vec(d_vec.size()); - cudaMemcpy(h_vec.data(), d_vec.data(), d_vec.size() * sizeof(T), cudaMemcpyDeviceToHost); - printf("%s (%lu)\t", label.c_str(), h_vec.size()); - for (auto &&i : h_vec) std::cout << (int)i << " "; - printf("\n"); -} - -template -void print(rmm::device_vector const &d_vec, std::string label = "") -{ - thrust::host_vector h_vec = d_vec; - printf("%s \t", label.c_str()); - for (auto &&i : h_vec) std::cout << i << " "; - printf("\n"); -} - -struct printer { - template - std::enable_if_t(), void> operator()(column_view const &col, - std::string label = "") - { - auto d_vec = rmm::device_vector(col.begin(), col.end()); - print(d_vec, label); - } - template - std::enable_if_t(), void> operator()(column_view const &col, - std::string label = "") - { - CUDF_FAIL("no strings"); - } -}; -void print(column_view const &col, std::string label = "") -{ - cudf::type_dispatcher(col.type(), printer{}, col, label); -} - void snappy_compress(device_span comp_in, device_span comp_stat, rmm::cuda_stream_view stream) @@ -900,7 +861,7 @@ void snappy_compress(device_span comp_in, thrust::for_each(rmm::exec_policy(stream), comp_stat.begin(), comp_stat.end(), - [] __device__(gpu_inflate_status_s stat) { stat.status = 1; }); + [] __device__(gpu_inflate_status_s & stat) { stat.status = 1; }); } void writer::impl::encode_pages(hostdevice_2dvector &chunks, From 27764e76f9152fac9072d199759222ca74d6b46c Mon Sep 17 00:00:00 2001 From: Devavret Makkar Date: Sat, 15 May 2021 00:14:30 +0530 Subject: [PATCH 05/18] Replace accidental vector with uvector. --- cpp/src/io/parquet/writer_impl.cu | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index 141b74f2a60..79d049ac69a 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -816,7 +816,7 @@ void snappy_compress(device_span comp_in, // Analogous to comp_in.dstDevice rmm::device_uvector compressed_data_ptrs(num_comp_pages, stream); // Analogous to comp_stat.bytes_written - rmm::device_vector compressed_bytes_written(num_comp_pages); + rmm::device_uvector compressed_bytes_written(num_comp_pages, stream); // nvcomp does not currently use comp_in.dstSize. Cannot assume that the output will fit in // the space allocated unless one uses the API nvcompBatchedSnappyCompressGetOutputSize() @@ -837,7 +837,7 @@ void snappy_compress(device_span comp_in, scratch.data(), // Not needed rn but future scratch.size(), compressed_data_ptrs.data(), - compressed_bytes_written.data().get(), + compressed_bytes_written.data(), stream.value()); if (nvcomp_error != nvcompError_t::nvcompSuccess) { break; } From 95a57ec6d1eaa3c71ecd6fcb30fc400d3cabede2 Mon Sep 17 00:00:00 2001 From: Devavret Makkar Date: Sat, 15 May 2021 00:16:18 +0530 Subject: [PATCH 06/18] Provide the actual max uncomp page size to nvcomp's temp size estimator rather than a hardcoded value --- cpp/src/io/parquet/writer_impl.cu | 9 +++++++-- cpp/src/io/parquet/writer_impl.hpp | 2 ++ 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index 79d049ac69a..6ad73d6f6f6 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -798,13 +798,14 @@ void writer::impl::init_encoder_pages(hostdevice_2dvector & void snappy_compress(device_span comp_in, device_span comp_stat, + size_t max_page_uncomp_data_size, rmm::cuda_stream_view stream) { size_t num_comp_pages = comp_in.size(); do { size_t temp_size; nvcompError_t nvcomp_error = - nvcompBatchedSnappyCompressGetTempSize(num_comp_pages, 1 << 20, &temp_size); + nvcompBatchedSnappyCompressGetTempSize(num_comp_pages, max_page_uncomp_data_size, &temp_size); if (nvcomp_error != nvcompError_t::nvcompSuccess) { break; } // Not needed now but nvcomp API makes no promises about future @@ -866,6 +867,7 @@ void snappy_compress(device_span comp_in, void writer::impl::encode_pages(hostdevice_2dvector &chunks, device_span pages, + size_t max_page_uncomp_data_size, uint32_t pages_in_batch, uint32_t first_page_in_batch, uint32_t rowgroups_in_batch, @@ -891,7 +893,9 @@ void writer::impl::encode_pages(hostdevice_2dvector &chunks gpu::EncodePages(batch_pages, comp_in, comp_stat, stream); switch (compression_) { - case parquet::Compression::SNAPPY: snappy_compress(comp_in, comp_stat, stream); break; + case parquet::Compression::SNAPPY: + snappy_compress(comp_in, comp_stat, max_page_uncomp_data_size, stream); + break; default: break; } // TBD: Not clear if the official spec actually allows dynamically turning off compression at the @@ -1277,6 +1281,7 @@ void writer::impl::write(table_view const &table) encode_pages( chunks, {pages.data(), pages.size()}, + max_page_uncomp_data_size, pages_in_batch, first_page_in_batch, batch_list[b], diff --git a/cpp/src/io/parquet/writer_impl.hpp b/cpp/src/io/parquet/writer_impl.hpp index c2f0509a08b..8848d4ffdc0 100644 --- a/cpp/src/io/parquet/writer_impl.hpp +++ b/cpp/src/io/parquet/writer_impl.hpp @@ -186,6 +186,7 @@ class writer::impl { * * @param chunks column chunk array * @param pages encoder pages array + * @param max_page_uncomp_data_size maximum uncompressed size of any page's data * @param pages_in_batch number of pages in this batch * @param first_page_in_batch first page in batch * @param rowgroups_in_batch number of rowgroups in this batch @@ -195,6 +196,7 @@ class writer::impl { */ void encode_pages(hostdevice_2dvector& chunks, device_span pages, + size_t max_page_uncomp_data_size, uint32_t pages_in_batch, uint32_t first_page_in_batch, uint32_t rowgroups_in_batch, From cc9500abba3c1b3b411caa04d2c126af7997508a Mon Sep 17 00:00:00 2001 From: Devavret Makkar Date: Sat, 15 May 2021 00:45:46 +0530 Subject: [PATCH 07/18] cmake changes requested in review --- cpp/cmake/cudf-config.cmake.in | 1 - cpp/cmake/thirdparty/CUDF_GetnvCOMP.cmake | 3 +-- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/cpp/cmake/cudf-config.cmake.in b/cpp/cmake/cudf-config.cmake.in index 950c08767f6..1de968f6a95 100644 --- a/cpp/cmake/cudf-config.cmake.in +++ b/cpp/cmake/cudf-config.cmake.in @@ -83,7 +83,6 @@ find_dependency(ArrowCUDA @CUDF_VERSION_Arrow@) find_dependency(rmm @CUDF_MIN_VERSION_rmm@) -find_dependency(nvCOMP @CUDF_MIN_VERSION_nvCOMP@) include("${CMAKE_CURRENT_LIST_DIR}/cudf-nvcomp-target.cmake") set(Thrust_ROOT "${CMAKE_CURRENT_LIST_DIR}/../../../include/libcudf/Thrust") diff --git a/cpp/cmake/thirdparty/CUDF_GetnvCOMP.cmake b/cpp/cmake/thirdparty/CUDF_GetnvCOMP.cmake index 4fec448e9a9..1272728770c 100644 --- a/cpp/cmake/thirdparty/CUDF_GetnvCOMP.cmake +++ b/cpp/cmake/thirdparty/CUDF_GetnvCOMP.cmake @@ -25,8 +25,7 @@ function(find_and_configure_nvcomp VERSION) VERSION ${VERSION} GIT_REPOSITORY https://github.com/NVIDIA/nvcomp.git GIT_TAG v${VERSION} - GIT_SHALLOW TRUE - OPTIONS ) + GIT_SHALLOW TRUE) if(NOT TARGET nvCOMP::nvcomp) add_library(nvCOMP::nvcomp ALIAS nvcomp) From 40ebd1e04f538bae118f2b99b2379be065331b63 Mon Sep 17 00:00:00 2001 From: Devavret Makkar Date: Wed, 25 Aug 2021 01:24:09 +0530 Subject: [PATCH 08/18] Update parquet writer to use nvcomp 2.1 --- cpp/CMakeLists.txt | 20 +------------- cpp/cmake/thirdparty/CUDF_GetnvCOMP.cmake | 28 ++++++++++--------- cpp/src/io/parquet/writer_impl.cu | 33 ++++++++++++----------- 3 files changed, 35 insertions(+), 46 deletions(-) diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 1bc4e65338c..60f9cd510cd 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -532,7 +532,7 @@ target_link_libraries(cudf PUBLIC ZLIB::ZLIB ${ARROW_LIBRARIES} cudf::Thrust - nvCOMP::nvcomp + nvcomp::nvcomp rmm::rmm PRIVATE cuco::cuco) @@ -651,10 +651,6 @@ install(TARGETS cudf DESTINATION lib EXPORT cudf-targets) -install(TARGETS nvcomp - DESTINATION lib - EXPORT cudf-nvcomp-target) - install(DIRECTORY ${CUDF_SOURCE_DIR}/include/cudf ${CUDF_SOURCE_DIR}/include/cudf_test @@ -688,11 +684,6 @@ install(EXPORT cudf-testing-targets NAMESPACE cudf:: DESTINATION "${INSTALL_CONFIGDIR}") -install(EXPORT cudf-nvcomp-target - FILE cudf-nvcomp-target.cmake - NAMESPACE nvCOMP:: - DESTINATION "${INSTALL_CONFIGDIR}") - ################################################################################################ # - build export ------------------------------------------------------------------------------- configure_package_config_file(cmake/cudf-build-config.cmake.in ${CUDF_BINARY_DIR}/cudf-config.cmake @@ -710,15 +701,6 @@ if(TARGET gtest) endif() endif() -if(TARGET nvcomp) - get_target_property(nvcomp_is_imported nvcomp IMPORTED) - if(NOT nvcomp_is_imported) - export(TARGETS nvcomp - FILE ${CUDF_BINARY_DIR}/cudf-nvcomp-target.cmake - NAMESPACE nvCOMP::) - endif() -endif() - export(EXPORT cudf-targets FILE ${CUDF_BINARY_DIR}/cudf-targets.cmake NAMESPACE cudf::) diff --git a/cpp/cmake/thirdparty/CUDF_GetnvCOMP.cmake b/cpp/cmake/thirdparty/CUDF_GetnvCOMP.cmake index 1272728770c..c88d462e959 100644 --- a/cpp/cmake/thirdparty/CUDF_GetnvCOMP.cmake +++ b/cpp/cmake/thirdparty/CUDF_GetnvCOMP.cmake @@ -16,25 +16,29 @@ function(find_and_configure_nvcomp VERSION) - if(TARGET nvCOMP::nvcomp) + if(TARGET nvcomp::nvcomp) return() endif() - # Find or install nvCOMP - CPMFindPackage(NAME nvCOMP - VERSION ${VERSION} - GIT_REPOSITORY https://github.com/NVIDIA/nvcomp.git - GIT_TAG v${VERSION} - GIT_SHALLOW TRUE) + # Find or install nvcomp + CPMFindPackage(NAME nvcomp + VERSION ${VERSION} + GITHUB_REPOSITORY NVIDIA/nvcomp + GIT_TAG f5b8dee714bd2970d8230efa95f337c91f080257 + GIT_SHALLOW TRUE + OPTIONS "BUILD_TESTS OFF" + "BUILD_BENCHMARKS OFF" + "BUILD_EXAMPLES OFF" + ) - if(NOT TARGET nvCOMP::nvcomp) - add_library(nvCOMP::nvcomp ALIAS nvcomp) + if(NOT TARGET nvcomp::nvcomp) + add_library(nvcomp::nvcomp ALIAS nvcomp) endif() - # Make sure consumers of cudf can also see nvCOMP::nvcomp target - fix_cmake_global_defaults(nvCOMP::nvcomp) + # Make sure consumers of cudf can also see nvcomp::nvcomp target + fix_cmake_global_defaults(nvcomp::nvcomp) endfunction() -set(CUDF_MIN_VERSION_nvCOMP 2.0.0) +set(CUDF_MIN_VERSION_nvCOMP 2.1.0) find_and_configure_nvcomp(${CUDF_MIN_VERSION_nvCOMP}) diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index 5530769a819..06d75d8b575 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -898,9 +898,9 @@ void snappy_compress(device_span comp_in, size_t num_comp_pages = comp_in.size(); do { size_t temp_size; - nvcompError_t nvcomp_error = - nvcompBatchedSnappyCompressGetTempSize(num_comp_pages, max_page_uncomp_data_size, &temp_size); - if (nvcomp_error != nvcompError_t::nvcompSuccess) { break; } + nvcompStatus_t nvcomp_status = nvcompBatchedSnappyCompressGetTempSize( + num_comp_pages, max_page_uncomp_data_size, nvcompBatchedSnappyDefaultOpts, &temp_size); + if (nvcomp_status != nvcompStatus_t::nvcompSuccess) { break; } // Not needed now but nvcomp API makes no promises about future rmm::device_buffer scratch(temp_size, stream); @@ -926,16 +926,18 @@ void snappy_compress(device_span comp_in, [] __device__(gpu_inflate_input_s in) { return thrust::make_tuple(in.srcDevice, in.srcSize, in.dstDevice); }); - nvcomp_error = nvcompBatchedSnappyCompressAsync(uncompressed_data_ptrs.data(), - uncompressed_data_sizes.data(), - num_comp_pages, - scratch.data(), // Not needed rn but future - scratch.size(), - compressed_data_ptrs.data(), - compressed_bytes_written.data(), - stream.value()); - - if (nvcomp_error != nvcompError_t::nvcompSuccess) { break; } + nvcomp_status = nvcompBatchedSnappyCompressAsync(uncompressed_data_ptrs.data(), + uncompressed_data_sizes.data(), + max_page_uncomp_data_size, + num_comp_pages, + scratch.data(), // Not needed rn but future + scratch.size(), + compressed_data_ptrs.data(), + compressed_bytes_written.data(), + nvcompBatchedSnappyDefaultOpts, + stream.value()); + + if (nvcomp_status != nvcompStatus_t::nvcompSuccess) { break; } // nvcomp also doesn't use comp_out.status . It guarantees that given enough output space, // compression will succeed. @@ -1257,8 +1259,9 @@ void writer::impl::write(table_view const& table) size_t max_page_comp_data_size = 0; if (compression_ != parquet::Compression::UNCOMPRESSED) { CUDF_EXPECTS( - nvcompError_t::nvcompSuccess == nvcompBatchedSnappyCompressGetOutputSize( - max_page_uncomp_data_size, &max_page_comp_data_size), + nvcompStatus_t::nvcompSuccess == + nvcompBatchedSnappyCompressGetMaxOutputChunkSize( + max_page_uncomp_data_size, nvcompBatchedSnappyDefaultOpts, &max_page_comp_data_size), "Error in getting compressed size from nvcomp"); } From 4a2cb245161059bfcc47e52730743d0c6804443f Mon Sep 17 00:00:00 2001 From: Devavret Makkar Date: Wed, 25 Aug 2021 01:32:14 +0530 Subject: [PATCH 09/18] One more cmake change related to updating nvcomp --- cpp/cmake/cudf-build-config.cmake.in | 7 ------- 1 file changed, 7 deletions(-) diff --git a/cpp/cmake/cudf-build-config.cmake.in b/cpp/cmake/cudf-build-config.cmake.in index a52881459f5..4b5ad8ebb8d 100644 --- a/cpp/cmake/cudf-build-config.cmake.in +++ b/cpp/cmake/cudf-build-config.cmake.in @@ -81,13 +81,6 @@ else() include(@CUDF_SOURCE_DIR@/cmake/thirdparty/CUDF_GetGTest.cmake) endif() -# find nvCOMP -if(EXISTS "${CMAKE_CURRENT_LIST_DIR}/cudf-nvcomp-target.cmake") - include("${CMAKE_CURRENT_LIST_DIR}/cudf-nvcomp-target.cmake") -else() - include(@CUDF_SOURCE_DIR@/cmake/thirdparty/CUDF_GetnvCOMP.cmake) -endif() - list(POP_FRONT CMAKE_MODULE_PATH) From 6019b0f22b1220a304c3e45fb0fc09bc6faffddb Mon Sep 17 00:00:00 2001 From: Devavret Makkar Date: Tue, 31 Aug 2021 23:00:45 +0530 Subject: [PATCH 10/18] Update nvcomp to version with fix for snappy decompressor --- cpp/cmake/thirdparty/CUDF_GetnvCOMP.cmake | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/cmake/thirdparty/CUDF_GetnvCOMP.cmake b/cpp/cmake/thirdparty/CUDF_GetnvCOMP.cmake index c88d462e959..fc1a5e583a9 100644 --- a/cpp/cmake/thirdparty/CUDF_GetnvCOMP.cmake +++ b/cpp/cmake/thirdparty/CUDF_GetnvCOMP.cmake @@ -24,7 +24,7 @@ function(find_and_configure_nvcomp VERSION) CPMFindPackage(NAME nvcomp VERSION ${VERSION} GITHUB_REPOSITORY NVIDIA/nvcomp - GIT_TAG f5b8dee714bd2970d8230efa95f337c91f080257 + GIT_TAG 3a12516afdeab4ace01298031757f84b8dda81b7 GIT_SHALLOW TRUE OPTIONS "BUILD_TESTS OFF" "BUILD_BENCHMARKS OFF" From 140d3d0fa0b029f1f9cfd4842f0dd267a0d58a0c Mon Sep 17 00:00:00 2001 From: Devavret Makkar Date: Fri, 3 Sep 2021 02:01:47 +0530 Subject: [PATCH 11/18] Fix allocation size bug When writing statistics, there's not enough space allocated in chunk's compressed buffer. This results in the compressed buffer being written into another chunk's memory. --- cpp/src/io/parquet/page_enc.cu | 1 - cpp/src/io/parquet/writer_impl.cu | 3 ++- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/io/parquet/page_enc.cu b/cpp/src/io/parquet/page_enc.cu index d88ca7a3b2a..5b1d8c846bf 100644 --- a/cpp/src/io/parquet/page_enc.cu +++ b/cpp/src/io/parquet/page_enc.cu @@ -423,7 +423,6 @@ __global__ void __launch_bounds__(128) } ck_g.num_pages = num_pages; ck_g.bfr_size = page_offset; - ck_g.compressed_size = comp_page_offset; ck_g.page_headers_size = page_headers_size; ck_g.max_page_data_size = max_page_data_size; pagestats_g.start_chunk = ck_g.first_page + ck_g.use_dictionary; // Exclude dictionary diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index 06d75d8b575..fff247fd47f 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -1285,7 +1285,8 @@ void writer::impl::write(table_view const& table) num_pages += ck->num_pages; pages_in_batch += ck->num_pages; rowgroup_size += ck->bfr_size; - ck->compressed_size = ck->page_headers_size + max_page_comp_data_size * ck->num_pages; + ck->compressed_size = + ck->ck_stat_size + ck->page_headers_size + max_page_comp_data_size * ck->num_pages; comp_rowgroup_size += ck->compressed_size; max_chunk_bfr_size = std::max(max_chunk_bfr_size, (size_t)std::max(ck->bfr_size, ck->compressed_size)); From 62d92b4bb9f019dcc9079b52e8b8c9d6e4139446 Mon Sep 17 00:00:00 2001 From: Devavret Makkar Date: Sat, 4 Sep 2021 00:42:20 +0530 Subject: [PATCH 12/18] Update cmake to find nvcomp in new manner --- cpp/CMakeLists.txt | 2 +- .../{CUDF_GetnvCOMP.cmake => get_nvcomp.cmake} | 16 ++++++++-------- 2 files changed, 9 insertions(+), 9 deletions(-) rename cpp/cmake/thirdparty/{CUDF_GetnvCOMP.cmake => get_nvcomp.cmake} (74%) diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 4c4fd39e612..259cc884ee1 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -115,7 +115,7 @@ rapids_cpm_init() # find jitify include(cmake/thirdparty/get_jitify.cmake) # find nvCOMP -include(cmake/thirdparty/CUDF_GetnvCOMP.cmake) +include(cmake/thirdparty/get_nvcomp.cmake) # find thrust/cub include(cmake/thirdparty/get_thrust.cmake) # find rmm diff --git a/cpp/cmake/thirdparty/CUDF_GetnvCOMP.cmake b/cpp/cmake/thirdparty/get_nvcomp.cmake similarity index 74% rename from cpp/cmake/thirdparty/CUDF_GetnvCOMP.cmake rename to cpp/cmake/thirdparty/get_nvcomp.cmake index fc1a5e583a9..ae83a78c3fb 100644 --- a/cpp/cmake/thirdparty/CUDF_GetnvCOMP.cmake +++ b/cpp/cmake/thirdparty/get_nvcomp.cmake @@ -22,21 +22,21 @@ function(find_and_configure_nvcomp VERSION) # Find or install nvcomp CPMFindPackage(NAME nvcomp + GLOBAL_TARGETS nvcomp::nvcomp VERSION ${VERSION} - GITHUB_REPOSITORY NVIDIA/nvcomp - GIT_TAG 3a12516afdeab4ace01298031757f84b8dda81b7 - GIT_SHALLOW TRUE - OPTIONS "BUILD_TESTS OFF" - "BUILD_BENCHMARKS OFF" - "BUILD_EXAMPLES OFF" + CPM_ARGS + GITHUB_REPOSITORY NVIDIA/nvcomp + GIT_TAG 3a12516afdeab4ace01298031757f84b8dda81b7 + # GIT_SHALLOW TRUE + OPTIONS "BUILD_TESTS OFF" + "BUILD_BENCHMARKS OFF" + "BUILD_EXAMPLES OFF" ) if(NOT TARGET nvcomp::nvcomp) add_library(nvcomp::nvcomp ALIAS nvcomp) endif() - # Make sure consumers of cudf can also see nvcomp::nvcomp target - fix_cmake_global_defaults(nvcomp::nvcomp) endfunction() set(CUDF_MIN_VERSION_nvCOMP 2.1.0) From 3c73be31cac6617604cf2d652900a402fc01395b Mon Sep 17 00:00:00 2001 From: Devavret Makkar Date: Wed, 8 Sep 2021 03:14:25 +0530 Subject: [PATCH 13/18] Make nvcomp private in cmake and update get_nvcomp --- cpp/CMakeLists.txt | 6 +++--- cpp/cmake/thirdparty/get_nvcomp.cmake | 7 +------ 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 259cc884ee1..1ec166653f0 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -510,9 +510,9 @@ target_link_libraries(cudf PUBLIC ZLIB::ZLIB ${ARROW_LIBRARIES} cudf::Thrust - nvcomp::nvcomp rmm::rmm - PRIVATE cuco::cuco) + PRIVATE cuco::cuco + nvcomp::nvcomp) # Add Conda library, and include paths if specified if(TARGET conda_env) @@ -751,7 +751,7 @@ rapids_export(BUILD cudf EXPORT_SET cudf-exports GLOBAL_TARGETS cudf NAMESPACE cudf:: - FINAL_CODE_BLOCK code_string) + FINAL_CODE_BLOCK build_code_string) export(EXPORT cudf-testing-exports FILE ${CUDF_BINARY_DIR}/cudf-testing.cmake diff --git a/cpp/cmake/thirdparty/get_nvcomp.cmake b/cpp/cmake/thirdparty/get_nvcomp.cmake index ae83a78c3fb..d9dd032875d 100644 --- a/cpp/cmake/thirdparty/get_nvcomp.cmake +++ b/cpp/cmake/thirdparty/get_nvcomp.cmake @@ -16,14 +16,9 @@ function(find_and_configure_nvcomp VERSION) - if(TARGET nvcomp::nvcomp) - return() - endif() - # Find or install nvcomp - CPMFindPackage(NAME nvcomp + rapids_cpm_find(nvcomp ${VERSION} GLOBAL_TARGETS nvcomp::nvcomp - VERSION ${VERSION} CPM_ARGS GITHUB_REPOSITORY NVIDIA/nvcomp GIT_TAG 3a12516afdeab4ace01298031757f84b8dda81b7 From e0a013d6bc14bb397dc82d0410f273260d4ff3d3 Mon Sep 17 00:00:00 2001 From: Devavret Makkar Date: Wed, 8 Sep 2021 17:43:31 +0530 Subject: [PATCH 14/18] Add an env var flip switch to choose b/w nvcomp and inbuilt compressor --- cpp/src/io/parquet/writer_impl.cu | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index fff247fd47f..831e082a8bf 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -988,9 +988,15 @@ void writer::impl::encode_pages(hostdevice_2dvector& chunks device_span comp_stat{compression_status.data(), compression_status.size()}; gpu::EncodePages(batch_pages, comp_in, comp_stat, stream); + auto env_use_nvcomp = std::getenv("LIBCUDF_USE_NVCOMP"); + bool use_nvcomp = env_use_nvcomp != nullptr ? std::atoi(env_use_nvcomp) : 0; switch (compression_) { case parquet::Compression::SNAPPY: - snappy_compress(comp_in, comp_stat, max_page_uncomp_data_size, stream); + if (use_nvcomp) { + snappy_compress(comp_in, comp_stat, max_page_uncomp_data_size, stream); + } else { + CUDA_TRY(gpu_snap(comp_in.data(), comp_stat.data(), pages_in_batch, stream)); + } break; default: break; } From bfa13661dc7ebb43972b773d0aa992f7478afb32 Mon Sep 17 00:00:00 2001 From: Devavret Makkar Date: Wed, 8 Sep 2021 22:20:29 +0530 Subject: [PATCH 15/18] Static linking nvcomp into libcudf --- cpp/CMakeLists.txt | 2 ++ cpp/cmake/thirdparty/get_nvcomp.cmake | 3 ++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 18e8e14efb1..ad93a24a5d8 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -505,6 +505,8 @@ target_compile_definitions(cudf PUBLIC "SPDLOG_ACTIVE_LEVEL=SPDLOG_LEVEL_${RMM_L # Compile stringified JIT sources first add_dependencies(cudf jitify_preprocess_run) +set_target_properties(nvcomp PROPERTIES POSITION_INDEPENDENT_CODE ON) + # Specify the target module library dependencies target_link_libraries(cudf PUBLIC ZLIB::ZLIB diff --git a/cpp/cmake/thirdparty/get_nvcomp.cmake b/cpp/cmake/thirdparty/get_nvcomp.cmake index d9dd032875d..49a1bac7ee4 100644 --- a/cpp/cmake/thirdparty/get_nvcomp.cmake +++ b/cpp/cmake/thirdparty/get_nvcomp.cmake @@ -23,7 +23,8 @@ function(find_and_configure_nvcomp VERSION) GITHUB_REPOSITORY NVIDIA/nvcomp GIT_TAG 3a12516afdeab4ace01298031757f84b8dda81b7 # GIT_SHALLOW TRUE - OPTIONS "BUILD_TESTS OFF" + OPTIONS "BUILD_STATIC ON" + "BUILD_TESTS OFF" "BUILD_BENCHMARKS OFF" "BUILD_EXAMPLES OFF" ) From 203cf151240c8fcc3bbe7cc424cd7c255f314a71 Mon Sep 17 00:00:00 2001 From: Devavret Makkar Date: Fri, 10 Sep 2021 02:19:41 +0530 Subject: [PATCH 16/18] Review changes --- cpp/src/io/parquet/writer_impl.cu | 33 ++++++++++++++++--------------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index 831e082a8bf..0382a7bb7ba 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -890,17 +890,19 @@ void writer::impl::init_encoder_pages(hostdevice_2dvector& stream.synchronize(); } -void snappy_compress(device_span comp_in, +void snappy_compress(device_span comp_in, device_span comp_stat, size_t max_page_uncomp_data_size, rmm::cuda_stream_view stream) { size_t num_comp_pages = comp_in.size(); - do { + try { size_t temp_size; nvcompStatus_t nvcomp_status = nvcompBatchedSnappyCompressGetTempSize( num_comp_pages, max_page_uncomp_data_size, nvcompBatchedSnappyDefaultOpts, &temp_size); - if (nvcomp_status != nvcompStatus_t::nvcompSuccess) { break; } + + CUDF_EXPECTS(nvcomp_status == nvcompStatus_t::nvcompSuccess, + "Error in getting snappy compression scratch size"); // Not needed now but nvcomp API makes no promises about future rmm::device_buffer scratch(temp_size, stream); @@ -937,7 +939,7 @@ void snappy_compress(device_span comp_in, nvcompBatchedSnappyDefaultOpts, stream.value()); - if (nvcomp_status != nvcompStatus_t::nvcompSuccess) { break; } + CUDF_EXPECTS(nvcomp_status == nvcompStatus_t::nvcompSuccess, "Error in snappy compression"); // nvcomp also doesn't use comp_out.status . It guarantees that given enough output space, // compression will succeed. @@ -952,13 +954,13 @@ void snappy_compress(device_span comp_in, return status; }); return; - } while (0); - - // If we reach this then there was an error in compressing so set an error status for each page - thrust::for_each(rmm::exec_policy(stream), - comp_stat.begin(), - comp_stat.end(), - [] __device__(gpu_inflate_status_s & stat) { stat.status = 1; }); + } catch (...) { + // If we reach this then there was an error in compressing so set an error status for each page + thrust::for_each(rmm::exec_policy(stream), + comp_stat.begin(), + comp_stat.end(), + [] __device__(gpu_inflate_status_s & stat) { stat.status = 1; }); + }; } void writer::impl::encode_pages(hostdevice_2dvector& chunks, @@ -1264,11 +1266,10 @@ void writer::impl::write(table_view const& table) size_t max_page_comp_data_size = 0; if (compression_ != parquet::Compression::UNCOMPRESSED) { - CUDF_EXPECTS( - nvcompStatus_t::nvcompSuccess == - nvcompBatchedSnappyCompressGetMaxOutputChunkSize( - max_page_uncomp_data_size, nvcompBatchedSnappyDefaultOpts, &max_page_comp_data_size), - "Error in getting compressed size from nvcomp"); + auto status = nvcompBatchedSnappyCompressGetMaxOutputChunkSize( + max_page_uncomp_data_size, nvcompBatchedSnappyDefaultOpts, &max_page_comp_data_size); + CUDF_EXPECTS(status == nvcompStatus_t::nvcompSuccess, + "Error in getting compressed size from nvcomp"); } // Initialize batches of rowgroups to encode (mainly to limit peak memory usage) From 99e4f80b79f3d8118996b5ce91431731712b5ec7 Mon Sep 17 00:00:00 2001 From: Devavret Makkar Date: Sat, 11 Sep 2021 01:44:03 +0530 Subject: [PATCH 17/18] Working orc reader with nvcomp --- cpp/src/io/orc/orc_gpu.h | 10 ++-- cpp/src/io/orc/reader_impl.cu | 86 +++++++++++++++++++++++++++++++++-- cpp/src/io/orc/stripe_init.cu | 26 ++++++----- 3 files changed, 103 insertions(+), 19 deletions(-) diff --git a/cpp/src/io/orc/orc_gpu.h b/cpp/src/io/orc/orc_gpu.h index 004812615eb..30687331c15 100644 --- a/cpp/src/io/orc/orc_gpu.h +++ b/cpp/src/io/orc/orc_gpu.h @@ -48,7 +48,8 @@ struct CompressedStreamInfo { copyctl(nullptr), num_compressed_blocks(0), num_uncompressed_blocks(0), - max_uncompressed_size(0) + max_uncompressed_size(0), + max_uncompressed_block_size(0) { } const uint8_t* compressed_data; // [in] base ptr to compressed stream data @@ -60,9 +61,10 @@ struct CompressedStreamInfo { copyctl; // [in] base ptr to copy structure to be filled for uncompressed blocks uint32_t num_compressed_blocks; // [in,out] number of entries in decctl(in), number of compressed // blocks(out) - uint32_t num_uncompressed_blocks; // [in,out] number of entries in copyctl(in), number of - // uncompressed blocks(out) - uint64_t max_uncompressed_size; // [out] maximum uncompressed data size + uint32_t num_uncompressed_blocks; // [in,out] number of entries in copyctl(in), number of + // uncompressed blocks(out) + uint64_t max_uncompressed_size; // [out] maximum uncompressed data size of stream + uint32_t max_uncompressed_block_size; // [out] maximum uncompressed size of any block in stream }; enum StreamIndexType { diff --git a/cpp/src/io/orc/reader_impl.cu b/cpp/src/io/orc/reader_impl.cu index 83be58f5e56..e6d747be606 100644 --- a/cpp/src/io/orc/reader_impl.cu +++ b/cpp/src/io/orc/reader_impl.cu @@ -38,6 +38,8 @@ #include #include +#include + #include #include @@ -552,6 +554,68 @@ class aggregate_orc_metadata { } }; +void snappy_decompress(device_span comp_in, + device_span comp_stat, + size_t max_uncomp_page_size, + rmm::cuda_stream_view stream) +{ + size_t num_blocks = comp_in.size(); + size_t temp_size; + + auto status = + nvcompBatchedSnappyDecompressGetTempSize(num_blocks, max_uncomp_page_size, &temp_size); + CUDF_EXPECTS(nvcompStatus_t::nvcompSuccess == status, + "Unable to get scratch size for snappy decompression"); + + rmm::device_buffer scratch(temp_size, stream); + rmm::device_uvector compressed_data_ptrs(num_blocks, stream); + rmm::device_uvector compressed_data_sizes(num_blocks, stream); + rmm::device_uvector uncompressed_data_ptrs(num_blocks, stream); + rmm::device_uvector uncompressed_data_sizes(num_blocks, stream); + + rmm::device_uvector actual_uncompressed_data_sizes(num_blocks, stream); + rmm::device_uvector statuses(num_blocks, stream); + + // Prepare the vectors + auto comp_it = thrust::make_zip_iterator(compressed_data_ptrs.begin(), + compressed_data_sizes.begin(), + uncompressed_data_ptrs.begin(), + uncompressed_data_sizes.data()); + thrust::transform(rmm::exec_policy(stream), + comp_in.begin(), + comp_in.end(), + comp_it, + [] __device__(gpu_inflate_input_s in) { + return thrust::make_tuple(in.srcDevice, in.srcSize, in.dstDevice, in.dstSize); + }); + + status = nvcompBatchedSnappyDecompressAsync(compressed_data_ptrs.data(), + compressed_data_sizes.data(), + uncompressed_data_sizes.data(), + actual_uncompressed_data_sizes.data(), + num_blocks, + scratch.data(), + scratch.size(), + uncompressed_data_ptrs.data(), + statuses.data(), + stream.value()); + CUDF_EXPECTS(nvcompStatus_t::nvcompSuccess == status, "unable to perform snappy decompression"); + + CUDF_EXPECTS(thrust::equal(rmm::exec_policy(stream), + statuses.begin(), + statuses.end(), + thrust::make_constant_iterator(nvcompStatus_t::nvcompSuccess)), + "Error during snappy decompression"); + thrust::for_each_n( + rmm::exec_policy(stream), + thrust::make_counting_iterator(0), + num_blocks, + [=, actual_uncomp_sizes = actual_uncompressed_data_sizes.data()] __device__(auto i) { + comp_stat[i].bytes_written = actual_uncomp_sizes[i]; + comp_stat[i].status = 0; + }); +} + rmm::device_buffer reader::impl::decompress_stripe_data( cudf::detail::hostdevice_2dvector& chunks, const std::vector& stripe_data, @@ -595,9 +659,10 @@ rmm::device_buffer reader::impl::decompress_stripe_data( rmm::device_uvector inflate_out(num_compressed_blocks, stream); // Parse again to populate the decompression input/output buffers - size_t decomp_offset = 0; - uint32_t start_pos = 0; - uint32_t start_pos_uncomp = (uint32_t)num_compressed_blocks; + size_t decomp_offset = 0; + uint32_t max_uncomp_block_size = 0; + uint32_t start_pos = 0; + uint32_t start_pos_uncomp = (uint32_t)num_compressed_blocks; for (size_t i = 0; i < compinfo.size(); ++i) { auto dst_base = static_cast(decomp_data.data()); compinfo[i].uncompressed_data = dst_base + decomp_offset; @@ -609,6 +674,8 @@ rmm::device_buffer reader::impl::decompress_stripe_data( decomp_offset += compinfo[i].max_uncompressed_size; start_pos += compinfo[i].num_compressed_blocks; start_pos_uncomp += compinfo[i].num_uncompressed_blocks; + max_uncomp_block_size = + std::max(max_uncomp_block_size, compinfo[i].max_uncompressed_block_size); } compinfo.host_to_device(stream); gpu::ParseCompressedStripeData(compinfo.device_ptr(), @@ -619,13 +686,24 @@ rmm::device_buffer reader::impl::decompress_stripe_data( // Dispatch batches of blocks to decompress if (num_compressed_blocks > 0) { + auto env_use_nvcomp = std::getenv("LIBCUDF_USE_NVCOMP"); + bool use_nvcomp = env_use_nvcomp != nullptr ? std::atoi(env_use_nvcomp) : 0; switch (decompressor->GetKind()) { case orc::ZLIB: CUDA_TRY( gpuinflate(inflate_in.data(), inflate_out.data(), num_compressed_blocks, 0, stream)); break; case orc::SNAPPY: - CUDA_TRY(gpu_unsnap(inflate_in.data(), inflate_out.data(), num_compressed_blocks, stream)); + if (use_nvcomp) { + device_span inflate_in_view{inflate_in.data(), + num_compressed_blocks}; + device_span inflate_out_view{inflate_out.data(), + num_compressed_blocks}; + snappy_decompress(inflate_in_view, inflate_out_view, max_uncomp_block_size, stream); + } else { + CUDA_TRY( + gpu_unsnap(inflate_in.data(), inflate_out.data(), num_compressed_blocks, stream)); + } break; default: CUDF_EXPECTS(false, "Unexpected decompression dispatch"); break; } diff --git a/cpp/src/io/orc/stripe_init.cu b/cpp/src/io/orc/stripe_init.cu index abef4f57894..94d8de6561b 100644 --- a/cpp/src/io/orc/stripe_init.cu +++ b/cpp/src/io/orc/stripe_init.cu @@ -45,12 +45,13 @@ extern "C" __global__ void __launch_bounds__(128, 8) gpuParseCompressedStripeDat __syncthreads(); if (strm_id < num_streams) { // Walk through the compressed blocks - const uint8_t* cur = s->info.compressed_data; - const uint8_t* end = cur + s->info.compressed_data_size; - uint8_t* uncompressed = s->info.uncompressed_data; - size_t max_uncompressed_size = 0; - uint32_t num_compressed_blocks = 0; - uint32_t num_uncompressed_blocks = 0; + const uint8_t* cur = s->info.compressed_data; + const uint8_t* end = cur + s->info.compressed_data_size; + uint8_t* uncompressed = s->info.uncompressed_data; + size_t max_uncompressed_size = 0; + uint32_t max_uncompressed_block_size = 0; + uint32_t num_compressed_blocks = 0; + uint32_t num_uncompressed_blocks = 0; while (cur + 3 < end) { uint32_t block_len = shuffle((lane_id == 0) ? cur[0] | (cur[1] << 8) | (cur[2] << 16) : 0); uint32_t is_uncompressed = block_len & 1; @@ -60,8 +61,9 @@ extern "C" __global__ void __launch_bounds__(128, 8) gpuParseCompressedStripeDat cur += 3; if (block_len > block_size || cur + block_len > end) { // Fatal - num_compressed_blocks = 0; - max_uncompressed_size = 0; + num_compressed_blocks = 0; + max_uncompressed_size = 0; + max_uncompressed_block_size = 0; break; } // TBD: For some codecs like snappy, it wouldn't be too difficult to get the actual @@ -102,12 +104,14 @@ extern "C" __global__ void __launch_bounds__(128, 8) gpuParseCompressedStripeDat if (init_ctl && lane_id == 0) *init_ctl = s->ctl; cur += block_len; max_uncompressed_size += uncompressed_size; + max_uncompressed_block_size = max(max_uncompressed_block_size, uncompressed_size); } __syncwarp(); if (!lane_id) { - s->info.num_compressed_blocks = num_compressed_blocks; - s->info.num_uncompressed_blocks = num_uncompressed_blocks; - s->info.max_uncompressed_size = max_uncompressed_size; + s->info.num_compressed_blocks = num_compressed_blocks; + s->info.num_uncompressed_blocks = num_uncompressed_blocks; + s->info.max_uncompressed_size = max_uncompressed_size; + s->info.max_uncompressed_block_size = max_uncompressed_block_size; } } From 6721fb8fb1e44d2de6e29fce838e6c88e373932a Mon Sep 17 00:00:00 2001 From: Devavret Makkar Date: Mon, 13 Sep 2021 22:10:41 +0530 Subject: [PATCH 18/18] Merge changes from nvcomp -fPIC --- cpp/CMakeLists.txt | 2 -- cpp/cmake/thirdparty/get_nvcomp.cmake | 3 +-- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index ad93a24a5d8..18e8e14efb1 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -505,8 +505,6 @@ target_compile_definitions(cudf PUBLIC "SPDLOG_ACTIVE_LEVEL=SPDLOG_LEVEL_${RMM_L # Compile stringified JIT sources first add_dependencies(cudf jitify_preprocess_run) -set_target_properties(nvcomp PROPERTIES POSITION_INDEPENDENT_CODE ON) - # Specify the target module library dependencies target_link_libraries(cudf PUBLIC ZLIB::ZLIB diff --git a/cpp/cmake/thirdparty/get_nvcomp.cmake b/cpp/cmake/thirdparty/get_nvcomp.cmake index 49a1bac7ee4..cade101cbfd 100644 --- a/cpp/cmake/thirdparty/get_nvcomp.cmake +++ b/cpp/cmake/thirdparty/get_nvcomp.cmake @@ -21,8 +21,7 @@ function(find_and_configure_nvcomp VERSION) GLOBAL_TARGETS nvcomp::nvcomp CPM_ARGS GITHUB_REPOSITORY NVIDIA/nvcomp - GIT_TAG 3a12516afdeab4ace01298031757f84b8dda81b7 - # GIT_SHALLOW TRUE + GIT_TAG 4f4e5713e69473be6e0c8ae483a932f666ae3c2f OPTIONS "BUILD_STATIC ON" "BUILD_TESTS OFF" "BUILD_BENCHMARKS OFF"