From ddb59cb2c03a17e54d657daca7bae843312e6bbf Mon Sep 17 00:00:00 2001 From: Rahul Prabhu Date: Thu, 27 Jun 2024 11:42:04 -0700 Subject: [PATCH] Added Null Memset --- cpp/src/io/parquet/reader_impl.cpp | 2 ++ cpp/src/io/parquet/reader_impl_preprocess.cu | 21 ++++++++++++++++--- cpp/src/io/utilities/column_buffer.cpp | 6 +++--- cpp/src/io/utilities/column_buffer.hpp | 7 ++----- cpp/src/io/utilities/multibuffer_memset.cu | 19 +++++++++-------- .../multibuffer_memset_tests.cpp | 2 +- 6 files changed, 36 insertions(+), 21 deletions(-) diff --git a/cpp/src/io/parquet/reader_impl.cpp b/cpp/src/io/parquet/reader_impl.cpp index 1bd2fae281c..c117152e225 100644 --- a/cpp/src/io/parquet/reader_impl.cpp +++ b/cpp/src/io/parquet/reader_impl.cpp @@ -24,6 +24,8 @@ #include #include +#include + #include #include diff --git a/cpp/src/io/parquet/reader_impl_preprocess.cu b/cpp/src/io/parquet/reader_impl_preprocess.cu index 9df5c362cdd..ca30a77e920 100644 --- a/cpp/src/io/parquet/reader_impl_preprocess.cu +++ b/cpp/src/io/parquet/reader_impl_preprocess.cu @@ -22,6 +22,9 @@ #include #include +#include + + #include #include @@ -1482,6 +1485,9 @@ void reader::impl::allocate_columns(read_mode mode, size_t skip_rows, size_t num // buffers if they are not part of a list hierarchy. mark down // if we have any list columns that need further processing. bool has_lists = false; + std::vector> memset_bufs; + std::vector> nullmask_bufs; + for (size_t idx = 0; idx < _input_columns.size(); idx++) { auto const& input_col = _input_columns[idx]; size_t const max_depth = input_col.nesting_depth(); @@ -1502,13 +1508,14 @@ void reader::impl::allocate_columns(read_mode mode, size_t skip_rows, size_t num // we're going to start null mask as all valid and then turn bits off if necessary out_buf.create_with_mask( out_buf.type.id() == type_id::LIST && l_idx < max_depth ? 
num_rows + 1 : num_rows, - cudf::mask_state::ALL_VALID, + cudf::mask_state::UNINITIALIZED, _stream, _mr); + memset_bufs.push_back(cudf::device_span((uint8_t *)(out_buf.data()), out_buf.data_size())); + nullmask_bufs.push_back(cudf::device_span((uint8_t *)(out_buf.null_mask()), out_buf.null_mask_size())); } } } - // compute output column sizes by examining the pages of the -input- columns if (has_lists) { std::vector h_cols_info; @@ -1581,11 +1588,19 @@ void reader::impl::allocate_columns(read_mode mode, size_t skip_rows, size_t num // allocate // we're going to start null mask as all valid and then turn bits off if necessary - out_buf.create_with_mask(size, cudf::mask_state::ALL_VALID, _stream, _mr); + out_buf.create_with_mask(size, cudf::mask_state::UNINITIALIZED, _stream, _mr); + memset_bufs.push_back(cudf::device_span(static_cast(out_buf.data()), out_buf.data_size())); + nullmask_bufs.push_back(cudf::device_span((uint8_t *)(out_buf.null_mask()), out_buf.null_mask_size())); + } } } } + // for (size_t i = 0; i < memset_bufs.size(); i++) { + // fprintf(stderr, "Buf %lu size: %lu \n", i, memset_bufs[i].size()); + // } + multibuffer_memset(memset_bufs, 0, _stream, _mr); + multibuffer_memset(nullmask_bufs, 0xFF, _stream, _mr); } std::vector reader::impl::calculate_page_string_offsets() diff --git a/cpp/src/io/utilities/column_buffer.cpp b/cpp/src/io/utilities/column_buffer.cpp index f76ad188fd1..35908335357 100644 --- a/cpp/src/io/utilities/column_buffer.cpp +++ b/cpp/src/io/utilities/column_buffer.cpp @@ -60,7 +60,7 @@ void cudf::io::detail::inline_column_buffer::allocate_strings_data(rmm::cuda_str { CUDF_EXPECTS(type.id() == type_id::STRING, "allocate_strings_data called for non-string column"); // size + 1 for final offset. _string_data will be initialized later. 
- _data = create_data(data_type{type_id::INT32}, size + 1, stream, _mr, true); + _data = create_data(data_type{type_id::INT32}, size + 1, stream, _mr); } void cudf::io::detail::inline_column_buffer::create_string_data(size_t num_bytes, @@ -103,12 +103,12 @@ void column_buffer_base::create_with_mask(size_type _size, // list columns store a buffer of int32's as offsets to represent // their individual rows - case type_id::LIST: _data = create_data(data_type{type_id::INT32}, size, stream, _mr, false); break; + case type_id::LIST: _data = create_data(data_type{type_id::INT32}, size, stream, _mr); break; // struct columns store no data themselves. just validity and children. case type_id::STRUCT: break; - default: _data = create_data(type, size, stream, _mr, false); break; + default: _data = create_data(type, size, stream, _mr); break; } if (is_nullable) { _null_mask = diff --git a/cpp/src/io/utilities/column_buffer.hpp b/cpp/src/io/utilities/column_buffer.hpp index b97c7fddee9..c69ea0439ba 100644 --- a/cpp/src/io/utilities/column_buffer.hpp +++ b/cpp/src/io/utilities/column_buffer.hpp @@ -52,15 +52,12 @@ namespace detail { inline rmm::device_buffer create_data(data_type type, size_type size, rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr, - bool is_string) + rmm::device_async_resource_ref mr) { std::size_t data_size = size_of(type) * size; rmm::device_buffer data(data_size, stream, mr); - if (is_string) { - CUDF_CUDA_TRY(cudaMemsetAsync(data.data(), 0, data_size, stream.value())); - } + // CUDF_CUDA_TRY(cudaMemsetAsync(data.data(), 0, data_size, stream.value())); return data; } diff --git a/cpp/src/io/utilities/multibuffer_memset.cu b/cpp/src/io/utilities/multibuffer_memset.cu index d71f4a639a1..395538a27a1 100644 --- a/cpp/src/io/utilities/multibuffer_memset.cu +++ b/cpp/src/io/utilities/multibuffer_memset.cu @@ -36,7 +36,7 @@ struct memset_task { uint64_t size; - uint8_t * data; + uint64_t * data; } typedef memset_task; // 1 task == 1 block @@ 
-47,15 +47,14 @@ __global__ void memset_kernel(memset_task * tasks, int8_t value) // block stride over task.begin, task.end auto buf = task.data; uint64_t end = task.size; - uint64_t bytes_left = (task.size + blockDim.x - 1) / blockDim.x; - bytes_left *= 8; + uint64_t memsets_left = (task.size + blockDim.x - 1) / blockDim.x; uint64_t t = threadIdx.x; - while(bytes_left > 0){ + while(memsets_left > 0){ if (t < end) { buf[t] = value; } t += blockDim.x; - bytes_left -= 1; + memsets_left -= 1; } } @@ -104,19 +103,21 @@ void multibuffer_memset(std::vector> & bufs, tasks.begin(), cuda::proclaim_return_type( [offsets = offsets.data(), offset_size = num_bufs + 1, gpu_bufs = gpu_bufs.data(), bytes_per_task] __device__(cudf::size_type task) { - auto buf_idx = thrust::lower_bound(thrust::seq, offsets, offsets + offset_size, task); + auto buf_idx = thrust::upper_bound(thrust::seq, offsets, offsets + offset_size, task); if (*buf_idx > task) {buf_idx -= 1;} size_t start = (task - *buf_idx) * bytes_per_task; size_t end = (start + bytes_per_task) <= gpu_bufs[buf_idx - offsets].size() ? 
start + bytes_per_task : gpu_bufs[buf_idx - offsets].size(); memset_task ret; - ret.size = end - start; - ret.data = gpu_bufs[buf_idx - offsets].data() + start; + ret.size = (end - start) / 8; + ret.data = (uint64_t *)(gpu_bufs[buf_idx - offsets].data() + start); return ret; } ) ); // launch cuda kernel - memset_kernel<<>>(tasks.data(), value); + if (total_tasks != 0) { + memset_kernel<<>>(tasks.data(), value); + } } diff --git a/cpp/tests/utilities_tests/multibuffer_memset_tests.cpp b/cpp/tests/utilities_tests/multibuffer_memset_tests.cpp index ed10ad3fcf4..4ab0daf33fb 100644 --- a/cpp/tests/utilities_tests/multibuffer_memset_tests.cpp +++ b/cpp/tests/utilities_tests/multibuffer_memset_tests.cpp @@ -81,7 +81,7 @@ TEST(MultiBufferTestIntegral, BasicTest2) TEST(MultiBufferTestIntegral, BasicTest3) { long NUM_BUFS = 8; - std::vector BUF_SIZES{100, 200, 300, 400, 500, 600, 700, 800}; + std::vector BUF_SIZES{100, 0, 300, 400, 500, 600, 700, 800}; // Device init std::vector> bufs;