Skip to content

Commit

Permalink
Fix null_count of columns returned by chunked_parquet_reader (#13111)

Browse files Browse the repository at this point in the history

Chunked Parquet reader returns columns with incorrect null counts - the counts are cumulative sums that include all previous chunks.
Root cause is that `nesting_decode_cache` is not copied back to `nesting_decode` when `gpuDecodePageData` returns early, so previously computed null counts are only reset in the cache.
With this PR, we use RAII to make sure cached decode info is always copied back in `gpuDecodePageData`.
Also fixed `column_buffer::empty_like` to return zero null count and empty null mask.

Authors:
  - Vukasin Milovanovic (https://github.com/vuule)

Approvers:
  - https://github.com/nvdbaranec
  - Vyas Ramasubramani (https://github.com/vyasr)

URL: #13111
  • Loading branch information
vuule authored Apr 13, 2023
1 parent 4b34831 commit 5764ba5
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 27 deletions.
39 changes: 25 additions & 14 deletions cpp/src/io/parquet/page_data.cu
Original file line number Diff line number Diff line change
Expand Up @@ -994,7 +994,10 @@ static __device__ bool setupLocalPageInfo(page_state_s* const s,
int chunk_idx;

// Fetch page info
if (!t) s->page = *p;
if (!t) {
s->page = *p;
s->nesting_info = nullptr;
}
__syncthreads();

if (s->page.flags & PAGEINFO_FLAGS_DICTIONARY) { return false; }
Expand Down Expand Up @@ -1889,6 +1892,26 @@ __global__ void __launch_bounds__(block_size)
}
}

// RAII helper: on destruction, flushes per-level null counts from the
// shared-memory decode cache back to the page's `nesting_decode` array.
// Guarantees the copy-back happens on every exit path of the decode kernel,
// including early returns (e.g. when setupLocalPageInfo fails), so null
// counts are never left stale in global memory.
struct null_count_back_copier {
  page_state_s* s;  // per-block page decode state
  int t;            // threadIdx.x of the owning thread
  __device__ ~null_count_back_copier()
  {
    // Nothing to flush unless the decode cache is actually in use.
    if (s->nesting_info == nullptr or s->nesting_info != s->nesting_decode_cache) { return; }
    // Block-strided copy: each thread handles nesting depths t, t+blockDim.x, ...
    for (int thread_depth = t; thread_depth < s->page.num_output_nesting_levels;
         thread_depth += blockDim.x) {
      s->page.nesting_decode[thread_depth].null_count =
        s->nesting_decode_cache[thread_depth].null_count;
    }
  }
};

/**
 * @brief Kernel for decoding the column data stored in the pages
 *
Expand All @@ -1913,6 +1936,7 @@ __global__ void __launch_bounds__(block_size) gpuDecodePageData(
int page_idx = blockIdx.x;
int t = threadIdx.x;
int out_thread0;
[[maybe_unused]] null_count_back_copier _{s, t};

if (!setupLocalPageInfo(s, &pages[page_idx], chunks, min_row, num_rows, true)) { return; }

Expand Down Expand Up @@ -2065,19 +2089,6 @@ __global__ void __launch_bounds__(block_size) gpuDecodePageData(
}
__syncthreads();
}

// if we are using the nesting decode cache, copy null count back
if (s->nesting_info == s->nesting_decode_cache) {
int depth = 0;
while (depth < s->page.num_output_nesting_levels) {
int const thread_depth = depth + t;
if (thread_depth < s->page.num_output_nesting_levels) {
s->page.nesting_decode[thread_depth].null_count =
s->nesting_decode_cache[thread_depth].null_count;
}
depth += blockDim.x;
}
}
}

} // anonymous namespace
Expand Down
17 changes: 4 additions & 13 deletions cpp/src/io/utilities/column_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -241,13 +241,8 @@ std::unique_ptr<column> empty_like(column_buffer& buffer,
auto child = empty_like(buffer.children[0], child_info, stream, mr);

// make the final list column
return make_lists_column(0,
std::move(offsets),
std::move(child),
buffer._null_count,
std::move(buffer._null_mask),
stream,
mr);
return make_lists_column(
0, std::move(offsets), std::move(child), 0, rmm::device_buffer{0, stream, mr}, stream, mr);
} break;

case type_id::STRUCT: {
Expand All @@ -265,12 +260,8 @@ std::unique_ptr<column> empty_like(column_buffer& buffer,
return empty_like(col, child_info, stream, mr);
});

return make_structs_column(0,
std::move(output_children),
buffer._null_count,
std::move(buffer._null_mask),
stream,
mr);
return make_structs_column(
0, std::move(output_children), 0, rmm::device_buffer{0, stream, mr}, stream, mr);
} break;

default: return cudf::make_empty_column(buffer.type);
Expand Down
31 changes: 31 additions & 0 deletions cpp/tests/io/parquet_chunked_reader_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -920,3 +920,34 @@ TEST_F(ParquetChunkedReaderTest, TestChunkedReadWithListsOfStructs)
CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_with_nulls, *result);
}
}

TEST_F(ParquetChunkedReaderTest, TestChunkedReadNullCount)
{
  auto constexpr num_rows      = 100'000;
  auto const rows_per_page     = num_rows / 5;  // 20k rows per page

  // Build a single int32 column of all 1s where every fourth row is null.
  auto const ones = cudf::detail::make_counting_transform_iterator(0, [](auto) { return 1; });
  auto const validity =
    cudf::detail::make_counting_transform_iterator(0, [](auto i) { return i % 4 != 3; });
  cudf::test::fixed_width_column_wrapper<int32_t> col{ones, ones + num_rows, validity};
  std::vector<std::unique_ptr<cudf::column>> cols;
  cols.push_back(col.release());
  auto const expected = std::make_unique<cudf::table>(std::move(cols));

  // Write the table with a fixed page-row limit so every page is the same size.
  auto const filepath = temp_env->get_temp_filepath("chunked_reader_null_count.parquet");
  auto const write_opts =
    cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, *expected)
      .max_page_size_rows(rows_per_page)
      .build();
  cudf::io::write_parquet(write_opts);

  // Size the chunk byte limit to one page's worth of int32 data, so each
  // read_chunk() returns exactly one page.
  auto const read_opts =
    cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath}).build();
  auto const byte_limit = rows_per_page * sizeof(int);
  auto reader           = cudf::io::chunked_parquet_reader(byte_limit, read_opts);

  do {
    // Every fourth row is null, so each chunk's null count must be exactly
    // one quarter of the page's rows — not a cumulative sum across chunks.
    EXPECT_EQ(reader.read_chunk().tbl->get_column(0).null_count(), rows_per_page / 4);
  } while (reader.has_next());
}

0 comments on commit 5764ba5

Please sign in to comment.