Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch memcpy the last offsets for output buffers of str and list cols in PQ reader #16905

Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
74ee6ae
Add capability to batch memcpy the last offsets to str and list out_bufs
mhaseeb123 Sep 25, 2024
cab885d
Move `WriteFinalOffsetsBatched` out of the for loop
mhaseeb123 Sep 25, 2024
b15e3d3
Generalize the API and ORC changes by @vuule
mhaseeb123 Sep 25, 2024
50dcd71
Use make_zeroed_device_uvector_async instead
mhaseeb123 Sep 25, 2024
bd44ca0
Merge branch 'branch-24.12' into fea-batch-memcpy-list-str-output-buf…
mhaseeb123 Sep 26, 2024
800b271
Add gtest for batched_memcpy
mhaseeb123 Sep 26, 2024
31a755b
Update cpp/include/cudf/io/detail/batched_memcpy.hpp
mhaseeb123 Sep 26, 2024
b29329b
Update cpp/include/cudf/io/detail/batched_memcpy.hpp
mhaseeb123 Sep 26, 2024
4efb989
Comments update
mhaseeb123 Sep 26, 2024
cc2829f
Address reviewer comments
mhaseeb123 Sep 27, 2024
78d68a8
Style fix
mhaseeb123 Sep 27, 2024
d42da45
Remove the unnecessary iterator
mhaseeb123 Sep 27, 2024
8d5640d
Move batched_memxxx to include/detail/utilities
mhaseeb123 Sep 27, 2024
9e063af
Minor changes from reviews
mhaseeb123 Sep 30, 2024
cf98118
Merge branch 'branch-24.12' into fea-batch-memcpy-list-str-output-buf…
mhaseeb123 Sep 30, 2024
2372fbb
Minor updates
mhaseeb123 Sep 30, 2024
6100c94
Merge branch 'fea-batch-memcpy-list-str-output-buff-offsets' of https…
mhaseeb123 Sep 30, 2024
4ea0930
Minor comment update
mhaseeb123 Oct 1, 2024
3eea6e2
Minor comment update
mhaseeb123 Oct 1, 2024
6d078c2
Style fix and add to CI.
mhaseeb123 Oct 1, 2024
1cc4e1f
Revert erroneous commit
mhaseeb123 Oct 1, 2024
042cfc0
Update cpp/include/cudf/detail/utilities/batched_memcpy.hpp
mhaseeb123 Oct 2, 2024
eee6f6d
Apply suggestions from review
mhaseeb123 Oct 2, 2024
828e0ac
Minor updates from review
mhaseeb123 Oct 2, 2024
ecc4252
Minor
mhaseeb123 Oct 2, 2024
4bd83db
Merge branch 'branch-24.12' into fea-batch-memcpy-list-str-output-buf…
mhaseeb123 Oct 2, 2024
871854b
Update cpp/src/io/parquet/page_data.cu
mhaseeb123 Oct 3, 2024
3e30777
Comments update.
mhaseeb123 Oct 3, 2024
16540a1
Merge branch 'branch-24.12' into fea-batch-memcpy-list-str-output-buf…
mhaseeb123 Oct 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions cpp/benchmarks/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -392,11 +392,6 @@ ConfigureNVBench(JSON_READER_NVBENCH io/json/nested_json.cpp io/json/json_reader
ConfigureNVBench(JSON_READER_OPTION_NVBENCH io/json/json_reader_option.cpp)
ConfigureNVBench(JSON_WRITER_NVBENCH io/json/json_writer.cpp)

# ##################################################################################################
# * multi buffer memset benchmark
# ----------------------------------------------------------------------
ConfigureNVBench(BATCHED_MEMSET_BENCH io/utilities/batched_memset_bench.cpp)
Copy link
Member Author

@mhaseeb123 mhaseeb123 Sep 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is simply a reimplementation of one of the benchmarks in parquet_reader_input.cpp, so I'm removing it altogether.


# ##################################################################################################
# * io benchmark ---------------------------------------------------------------------
ConfigureNVBench(MULTIBYTE_SPLIT_NVBENCH io/text/multibyte_split.cpp)
Expand Down
101 changes: 0 additions & 101 deletions cpp/benchmarks/io/utilities/batched_memset_bench.cpp

This file was deleted.

70 changes: 70 additions & 0 deletions cpp/include/cudf/detail/utilities/batched_memcpy.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <cudf/detail/iterator.cuh>
#include <cudf/utilities/memory_resource.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_buffer.hpp>

#include <cub/device/device_memcpy.cuh>
#include <cuda/functional>
#include <thrust/iterator/constant_iterator.h>

#include <cstddef>

namespace CUDF_EXPORT cudf {
namespace detail {

/**
* @brief A helper function that copies a vector of vectors from source to destination addresses in
* a batched manner.
*
* @tparam SrcIterator The type of the source address iterator
* @tparam DstIterator The type of the destination address iterator
* @tparam SizeIterator The type of the buffer size iterator
mhaseeb123 marked this conversation as resolved.
Show resolved Hide resolved
*
* @param[in] src_iter Iterator to source addresses
* @param[in] dst_iter Iterator to destination addresses
* @param[in] size_iter Iterator to the vector sizes (in bytes)
* @param[in] num_buffs Number of buffers to be copied
* @param[in] stream CUDA stream to use
mhaseeb123 marked this conversation as resolved.
Show resolved Hide resolved
*/
template <typename SrcIterator, typename DstIterator, typename SizeIterator>
void batched_memcpy_async(SrcIterator src_iter,
DstIterator dst_iter,
SizeIterator size_iter,
size_t num_buffs,
rmm::cuda_stream_view stream)
{
// Get temp storage needed for cub::DeviceMemcpy::Batched
size_t temp_storage_bytes = 0;
cub::DeviceMemcpy::Batched(
nullptr, temp_storage_bytes, src_iter, dst_iter, size_iter, num_buffs, stream.value());

// Allocate temporary storage
rmm::device_buffer d_temp_storage{temp_storage_bytes, stream.value()};

// Perform copies
mhaseeb123 marked this conversation as resolved.
Show resolved Hide resolved
cub::DeviceMemcpy::Batched(d_temp_storage.data(),
temp_storage_bytes,
src_iter,
dst_iter,
size_iter,
num_buffs,
stream.value());
}

} // namespace detail
} // namespace CUDF_EXPORT cudf
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
#include <thrust/transform.h>

namespace CUDF_EXPORT cudf {
namespace io::detail {
namespace detail {

/**
* @brief A helper function that takes in a vector of device spans and memsets them to the
Expand Down Expand Up @@ -78,5 +78,5 @@ void batched_memset(std::vector<cudf::device_span<T>> const& bufs,
d_temp_storage.data(), temp_storage_bytes, iter_in, iter_out, sizes, num_bufs, stream);
}

} // namespace io::detail
} // namespace detail
} // namespace CUDF_EXPORT cudf
63 changes: 43 additions & 20 deletions cpp/src/io/orc/stripe_enc.cu
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "orc_gpu.hpp"

#include <cudf/column/column_device_view.cuh>
#include <cudf/detail/utilities/batched_memcpy.hpp>
#include <cudf/detail/utilities/integer_utils.hpp>
#include <cudf/detail/utilities/logger.hpp>
#include <cudf/detail/utilities/vector_factories.hpp>
Expand Down Expand Up @@ -1087,37 +1088,42 @@ CUDF_KERNEL void __launch_bounds__(block_size)
/**
* @brief Merge chunked column data into a single contiguous stream
*
* @param[in,out] strm_desc StripeStream device array [stripe][stream]
* @param[in,out] streams List of encoder chunk streams [column][rowgroup]
* @param[in] strm_desc StripeStream device array [stripe][stream]
* @param[in] streams List of encoder chunk streams [column][rowgroup]
* @param[out] srcs List of source encoder chunk stream data addresses
* @param[out] dsts List of destination StripeStream data addresses
* @param[out] sizes List of stream sizes in bytes
*/
// blockDim {compact_streams_block_size,1,1}
CUDF_KERNEL void __launch_bounds__(compact_streams_block_size)
gpuCompactOrcDataStreams(device_2dspan<StripeStream> strm_desc,
device_2dspan<encoder_chunk_streams> streams)
gpuInitBatchedMemcpy(device_2dspan<StripeStream const> strm_desc,
device_2dspan<encoder_chunk_streams> streams,
device_span<uint8_t*> srcs,
device_span<uint8_t*> dsts,
device_span<size_t> sizes)
{
__shared__ __align__(16) StripeStream ss;

auto const stripe_id = blockIdx.x;
auto const stripe_id = blockIdx.x * compact_streams_block_size + threadIdx.x;
mhaseeb123 marked this conversation as resolved.
Show resolved Hide resolved
auto const stream_id = blockIdx.y;
auto const t = threadIdx.x;
if (stripe_id >= strm_desc.size().first) { return; }

if (t == 0) { ss = strm_desc[stripe_id][stream_id]; }
__syncthreads();
auto const out_id = stream_id * strm_desc.size().first + stripe_id;
StripeStream ss = strm_desc[stripe_id][stream_id];

if (ss.data_ptr == nullptr) { return; }

auto const cid = ss.stream_type;
auto dst_ptr = ss.data_ptr;
for (auto group = ss.first_chunk_id; group < ss.first_chunk_id + ss.num_chunks; ++group) {
auto const out_id = stream_id * streams.size().second + group;
srcs[out_id] = streams[ss.column_id][group].data_ptrs[cid];
dsts[out_id] = dst_ptr;

// Also update the stream here, data will be copied in a separate kernel
streams[ss.column_id][group].data_ptrs[cid] = dst_ptr;

auto const len = streams[ss.column_id][group].lengths[cid];
if (len > 0) {
auto const src_ptr = streams[ss.column_id][group].data_ptrs[cid];
for (uint32_t i = t; i < len; i += blockDim.x) {
dst_ptr[i] = src_ptr[i];
}
__syncthreads();
}
if (t == 0) { streams[ss.column_id][group].data_ptrs[cid] = dst_ptr; }
// len is the size (in bytes) of the current stream.
sizes[out_id] = len;
dst_ptr += len;
}
}
Expand Down Expand Up @@ -1325,9 +1331,26 @@ void CompactOrcDataStreams(device_2dspan<StripeStream> strm_desc,
device_2dspan<encoder_chunk_streams> enc_streams,
rmm::cuda_stream_view stream)
{
auto const num_rowgroups = enc_streams.size().second;
auto const num_streams = strm_desc.size().second;
auto const num_stripes = strm_desc.size().first;
auto const num_chunks = num_rowgroups * num_streams;
auto srcs = cudf::detail::make_zeroed_device_uvector_async<uint8_t*>(
num_chunks, stream, rmm::mr::get_current_device_resource());
auto dsts = cudf::detail::make_zeroed_device_uvector_async<uint8_t*>(
num_chunks, stream, rmm::mr::get_current_device_resource());
auto lengths = cudf::detail::make_zeroed_device_uvector_async<size_t>(
num_chunks, stream, rmm::mr::get_current_device_resource());

dim3 dim_block(compact_streams_block_size, 1);
dim3 dim_grid(strm_desc.size().first, strm_desc.size().second);
gpuCompactOrcDataStreams<<<dim_grid, dim_block, 0, stream.value()>>>(strm_desc, enc_streams);
dim3 dim_grid(cudf::util::div_rounding_up_unsafe(num_stripes, compact_streams_block_size),
strm_desc.size().second);
gpuInitBatchedMemcpy<<<dim_grid, dim_block, 0, stream.value()>>>(
strm_desc, enc_streams, srcs, dsts, lengths);

// Copy streams in a batched manner.
cudf::detail::batched_memcpy_async(
srcs.begin(), dsts.begin(), lengths.begin(), lengths.size(), stream);
}

std::optional<writer_compression_statistics> CompressOrcDataStreams(
Expand Down
29 changes: 29 additions & 0 deletions cpp/src/io/parquet/page_data.cu
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
#include "page_data.cuh"
#include "page_decode.cuh"

#include <cudf/detail/utilities/batched_memcpy.hpp>

#include <rmm/exec_policy.hpp>

#include <thrust/reduce.h>
Expand Down Expand Up @@ -466,4 +468,31 @@ void __host__ DecodeSplitPageData(cudf::detail::hostdevice_span<PageInfo> pages,
}
}

/**
* @copydoc cudf::io::parquet::detail::WriteFinalOffsets
*/
mhaseeb123 marked this conversation as resolved.
Show resolved Hide resolved
void WriteFinalOffsets(host_span<size_type const> offsets,
host_span<size_type* const> buff_addrs,
rmm::cuda_stream_view stream)
{
// Copy offsets to device and create an iterator
auto d_src_data = cudf::detail::make_device_uvector_async(
offsets, stream, cudf::get_current_device_resource_ref());
// Iterator for the source (scalar) data
auto src_iter = cudf::detail::make_counting_transform_iterator(
static_cast<std::size_t>(0),
cuda::proclaim_return_type<size_type*>(
[src = d_src_data.begin()] __device__(std::size_t i) { return src + i; }));

// Copy buffer addresses to device and create an iterator
auto d_dst_addrs = cudf::detail::make_device_uvector_async(
buff_addrs, stream, cudf::get_current_device_resource_ref());
// size_iter is simply a constant iterator of sizeof(size_type) bytes.
auto size_iter = thrust::make_constant_iterator(sizeof(size_type));

// Copy offsets to buffers in batched manner.
cudf::detail::batched_memcpy_async(
src_iter, d_dst_addrs.begin(), size_iter, offsets.size(), stream);
}

} // namespace cudf::io::parquet::detail
12 changes: 12 additions & 0 deletions cpp/src/io/parquet/parquet_gpu.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -797,6 +797,18 @@ void DecodeSplitPageData(cudf::detail::hostdevice_span<PageInfo> pages,
kernel_error::pointer error_code,
rmm::cuda_stream_view stream);

/**
* @brief Writes the final offsets to the corresponding list and string buffer end addresses in a
* batched manner.
*
* The two spans are treated as parallel arrays: the value `offsets[i]` is written to the address
* `buff_addrs[i]`, one `size_type` per entry. The number of writes is `offsets.size()`, so
* `buff_addrs` is expected to contain at least that many addresses. The writes are issued on
* `stream` as a single batched device copy.
*
* @param offsets Host span of final offsets
* @param buff_addrs Host span of corresponding output col buffer end addresses
* @param stream CUDA stream to use
*/
void WriteFinalOffsets(host_span<size_type const> offsets,
host_span<size_type* const> buff_addrs,
rmm::cuda_stream_view stream);

/**
* @brief Launches kernel for reading the string column data stored in the pages
*
Expand Down
Loading
Loading