Skip to content

Commit

Permalink
Added Null Memset
Browse files Browse the repository at this point in the history
  • Loading branch information
sdrp713 committed Jun 27, 2024
1 parent d263d14 commit ddb59cb
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 21 deletions.
2 changes: 2 additions & 0 deletions cpp/src/io/parquet/reader_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/strings/detail/utilities.hpp>

#include <io/utilities/multibuffer_memset.hpp>

#include <rmm/resource_ref.hpp>

#include <thrust/iterator/counting_iterator.h>
Expand Down
21 changes: 18 additions & 3 deletions cpp/src/io/parquet/reader_impl_preprocess.cu
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
#include <cudf/detail/utilities/integer_utils.hpp>
#include <cudf/detail/utilities/vector_factories.hpp>

#include <io/utilities/multibuffer_memset.hpp>


#include <rmm/exec_policy.hpp>

#include <cuda/functional>
Expand Down Expand Up @@ -1482,6 +1485,9 @@ void reader::impl::allocate_columns(read_mode mode, size_t skip_rows, size_t num
// buffers if they are not part of a list hierarchy. mark down
// if we have any list columns that need further processing.
bool has_lists = false;
std::vector<cudf::device_span<uint8_t>> memset_bufs;
std::vector<cudf::device_span<uint8_t>> nullmask_bufs;

for (size_t idx = 0; idx < _input_columns.size(); idx++) {
auto const& input_col = _input_columns[idx];
size_t const max_depth = input_col.nesting_depth();
Expand All @@ -1502,13 +1508,14 @@ void reader::impl::allocate_columns(read_mode mode, size_t skip_rows, size_t num
// we're going to start null mask as all valid and then turn bits off if necessary
out_buf.create_with_mask(
out_buf.type.id() == type_id::LIST && l_idx < max_depth ? num_rows + 1 : num_rows,
cudf::mask_state::ALL_VALID,
cudf::mask_state::UNINITIALIZED,
_stream,
_mr);
memset_bufs.push_back(cudf::device_span<uint8_t>((uint8_t *)(out_buf.data()), out_buf.data_size()));
nullmask_bufs.push_back(cudf::device_span<uint8_t>((uint8_t *)(out_buf.null_mask()), out_buf.null_mask_size()));
}
}
}

// compute output column sizes by examining the pages of the -input- columns
if (has_lists) {
std::vector<input_col_info> h_cols_info;
Expand Down Expand Up @@ -1581,11 +1588,19 @@ void reader::impl::allocate_columns(read_mode mode, size_t skip_rows, size_t num

// allocate
// we're going to start null mask as all valid and then turn bits off if necessary
out_buf.create_with_mask(size, cudf::mask_state::ALL_VALID, _stream, _mr);
out_buf.create_with_mask(size, cudf::mask_state::UNINITIALIZED, _stream, _mr);
memset_bufs.push_back(cudf::device_span<uint8_t>(static_cast<uint8_t *>(out_buf.data()), out_buf.data_size()));
nullmask_bufs.push_back(cudf::device_span<uint8_t>((uint8_t *)(out_buf.null_mask()), out_buf.null_mask_size()));

}
}
}
}
// for (size_t i = 0; i < memset_bufs.size(); i++) {
// fprintf(stderr, "Buf %lu size: %lu \n", i, memset_bufs[i].size());
// }
multibuffer_memset(memset_bufs, 0, _stream, _mr);
multibuffer_memset(nullmask_bufs, 0xFF, _stream, _mr);
}

std::vector<size_t> reader::impl::calculate_page_string_offsets()
Expand Down
6 changes: 3 additions & 3 deletions cpp/src/io/utilities/column_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ void cudf::io::detail::inline_column_buffer::allocate_strings_data(rmm::cuda_str
{
CUDF_EXPECTS(type.id() == type_id::STRING, "allocate_strings_data called for non-string column");
// size + 1 for final offset. _string_data will be initialized later.
_data = create_data(data_type{type_id::INT32}, size + 1, stream, _mr, true);
_data = create_data(data_type{type_id::INT32}, size + 1, stream, _mr);
}

void cudf::io::detail::inline_column_buffer::create_string_data(size_t num_bytes,
Expand Down Expand Up @@ -103,12 +103,12 @@ void column_buffer_base<string_policy>::create_with_mask(size_type _size,

// list columns store a buffer of int32's as offsets to represent
// their individual rows
case type_id::LIST: _data = create_data(data_type{type_id::INT32}, size, stream, _mr, false); break;
case type_id::LIST: _data = create_data(data_type{type_id::INT32}, size, stream, _mr); break;

// struct columns store no data themselves. just validity and children.
case type_id::STRUCT: break;

default: _data = create_data(type, size, stream, _mr, false); break;
default: _data = create_data(type, size, stream, _mr); break;
}
if (is_nullable) {
_null_mask =
Expand Down
7 changes: 2 additions & 5 deletions cpp/src/io/utilities/column_buffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,12 @@ namespace detail {
inline rmm::device_buffer create_data(data_type type,
size_type size,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr,
bool is_string)
rmm::device_async_resource_ref mr)
{
std::size_t data_size = size_of(type) * size;

rmm::device_buffer data(data_size, stream, mr);
if (is_string) {
CUDF_CUDA_TRY(cudaMemsetAsync(data.data(), 0, data_size, stream.value()));
}
// CUDF_CUDA_TRY(cudaMemsetAsync(data.data(), 0, data_size, stream.value()));

return data;
}
Expand Down
19 changes: 10 additions & 9 deletions cpp/src/io/utilities/multibuffer_memset.cu
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@

struct memset_task {
uint64_t size;
uint8_t * data;
uint64_t * data;
} typedef memset_task;

// 1 task == 1 block
Expand All @@ -47,15 +47,14 @@ __global__ void memset_kernel(memset_task * tasks, int8_t value)
// block stride over task.begin, task.end
auto buf = task.data;
uint64_t end = task.size;
uint64_t bytes_left = (task.size + blockDim.x - 1) / blockDim.x;
bytes_left *= 8;
uint64_t memsets_left = (task.size + blockDim.x - 1) / blockDim.x;
uint64_t t = threadIdx.x;
while(bytes_left > 0){
while(memsets_left > 0){
if (t < end) {
buf[t] = value;
}
t += blockDim.x;
bytes_left -= 1;
memsets_left -= 1;
}
}

Expand Down Expand Up @@ -104,19 +103,21 @@ void multibuffer_memset(std::vector<cudf::device_span<uint8_t>> & bufs,
tasks.begin(),
cuda::proclaim_return_type<memset_task>(
[offsets = offsets.data(), offset_size = num_bufs + 1, gpu_bufs = gpu_bufs.data(), bytes_per_task] __device__(cudf::size_type task) {
auto buf_idx = thrust::lower_bound(thrust::seq, offsets, offsets + offset_size, task);
auto buf_idx = thrust::upper_bound(thrust::seq, offsets, offsets + offset_size, task);
if (*buf_idx > task) {buf_idx -= 1;}
size_t start = (task - *buf_idx) * bytes_per_task;
size_t end = (start + bytes_per_task) <= gpu_bufs[buf_idx - offsets].size() ? start + bytes_per_task : gpu_bufs[buf_idx - offsets].size();
memset_task ret;
ret.size = end - start;
ret.data = gpu_bufs[buf_idx - offsets].data() + start;
ret.size = (end - start) / 8;
ret.data = (uint64_t *)(gpu_bufs[buf_idx - offsets].data() + start);
return ret;
}
)
);

// launch cuda kernel
memset_kernel<<<total_tasks, threads_per_block>>>(tasks.data(), value);
if (total_tasks != 0) {
memset_kernel<<<total_tasks, threads_per_block>>>(tasks.data(), value);
}

}
2 changes: 1 addition & 1 deletion cpp/tests/utilities_tests/multibuffer_memset_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ TEST(MultiBufferTestIntegral, BasicTest2)
TEST(MultiBufferTestIntegral, BasicTest3)
{
long NUM_BUFS = 8;
std::vector<long> BUF_SIZES{100, 200, 300, 400, 500, 600, 700, 800};
std::vector<long> BUF_SIZES{100, 0, 300, 400, 500, 600, 700, 800};

// Device init
std::vector<cudf::device_span<uint8_t>> bufs;
Expand Down

0 comments on commit ddb59cb

Please sign in to comment.