Parquet reader optimization to address V100 regression. #12577

Merged
144 changes: 94 additions & 50 deletions cpp/src/io/parquet/page_data.cu
@@ -51,6 +51,11 @@ constexpr int non_zero_buffer_size = block_size * 2;

constexpr int rolling_index(int index) { return index & (non_zero_buffer_size - 1); }

// Use up to 512 bytes of shared memory as a cache for nesting information.
// As of 1/20/23, this gives us a max nesting depth of 10 (after which it falls back to
// global memory). This handles all but the most extreme cases.
constexpr int max_cacheable_nesting_decode_info = (512) / sizeof(PageNestingDecodeInfo);
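For context, a minimal host-side sketch of how this capacity and the global-memory fallback interact; the struct layout below is a hypothetical stand-in, not the actual PageNestingDecodeInfo:

#include <cstdint>
#include <cstdio>

// Hypothetical stand-in roughly the size of a per-level decode-info record (~48 bytes).
struct DecodeInfoSketch {
  int32_t max_def_level, start_depth, end_depth, page_start_value;
  int32_t value_count, valid_count, null_count, valid_map_offset;
  uint8_t* data_out;
  uint32_t* valid_map;
};

constexpr int max_cacheable = 512 / sizeof(DecodeInfoSketch);  // 10 levels with this layout

int main()
{
  int const test_depths[] = {3, max_cacheable, 20};
  for (int nesting_info_size : test_depths) {
    bool const can_use_decode_cache = nesting_info_size <= max_cacheable;
    std::printf("depth %2d -> %s\n",
                nesting_info_size,
                can_use_decode_cache ? "shared-memory cache" : "global-memory fallback");
  }
}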

struct page_state_s {
const uint8_t* data_start;
const uint8_t* data_end;
@@ -90,6 +95,13 @@ struct page_state_s {
const uint8_t* lvl_start[NUM_LEVEL_TYPES]; // [def,rep]
int32_t lvl_count[NUM_LEVEL_TYPES]; // how many of each of the streams we've decoded
int32_t row_index_lower_bound; // lower bound of row indices we should process

// A shared-memory cache of frequently used data when decoding. The source of this data is
// normally stored in global memory, which can yield poor performance. So, when possible,
// we copy that info here prior to decoding.
PageNestingDecodeInfo nesting_decode_cache[max_cacheable_nesting_decode_info];
// points to either nesting_decode_cache above when possible, or to the global source otherwise
PageNestingDecodeInfo* pndi;
};

/**
@@ -927,21 +939,40 @@ static __device__ bool setupLocalPageInfo(page_state_s* const s,
int chunk_idx;

// Fetch page info
if (t == 0) s->page = *p;
if (!t) s->page = *p;
__syncthreads();

if (s->page.flags & PAGEINFO_FLAGS_DICTIONARY) { return false; }
// Fetch column chunk info
chunk_idx = s->page.chunk_idx;
if (t == 0) { s->col = chunks[chunk_idx]; }
if (!t) { s->col = chunks[chunk_idx]; }

// zero nested value and valid counts
// if we can use the decode cache, set it up now
auto const can_use_decode_cache = s->page.nesting_info_size <= max_cacheable_nesting_decode_info;
if (can_use_decode_cache) {
int d = 0;
while (d < s->page.nesting_info_size) {
if (d + t < s->page.nesting_info_size) {
// these values need to be copied over from global
s->nesting_decode_cache[d + t].max_def_level = s->page.nesting_decode[d + t].max_def_level;
s->nesting_decode_cache[d + t].page_start_value =
s->page.nesting_decode[d + t].page_start_value;
s->nesting_decode_cache[d + t].start_depth = s->page.nesting_decode[d + t].start_depth;
s->nesting_decode_cache[d + t].end_depth = s->page.nesting_decode[d + t].end_depth;
}
d += blockDim.x;
}
}
if (!t) { s->pndi = can_use_decode_cache ? s->nesting_decode_cache : s->page.nesting_decode; }
__syncthreads();

// zero counts
int d = 0;
while (d < s->page.num_output_nesting_levels) {
if (d + t < s->page.num_output_nesting_levels) {
s->page.nesting[d + t].valid_count = 0;
s->page.nesting[d + t].value_count = 0;
s->page.nesting[d + t].null_count = 0;
s->pndi[d + t].valid_count = 0;
s->pndi[d + t].value_count = 0;
s->pndi[d + t].null_count = 0;
}
d += blockDim.x;
}
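Both loops above (the cache copy and the count zeroing) use the same block-strided pattern: thread t handles elements t, t + blockDim.x, t + 2 * blockDim.x, and so on, with a sync once the whole range is covered. A self-contained sketch of that pattern, with hypothetical names not taken from this file:

#include <cuda_runtime.h>

// Hypothetical kernel: each block cooperatively copies n ints (n <= 1024) from
// global to shared memory using the block-strided loop shape shown above.
__global__ void strided_copy_sketch(int const* global_src, int n)
{
  __shared__ int cache[1024];
  int const t = threadIdx.x;

  int d = 0;
  while (d < n) {
    // threads whose index falls past the end simply skip the copy this round
    if (d + t < n) { cache[d + t] = global_src[d + t]; }
    d += blockDim.x;
  }
  __syncthreads();  // after this, every thread in the block can read cache[0..n)
  // ... decoding would proceed against cache instead of global_src ...
}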
@@ -1076,7 +1107,7 @@ static __device__ bool setupLocalPageInfo(page_state_s* const s,
if (is_decode_step) {
int max_depth = s->col.max_nesting_depth;
for (int idx = 0; idx < max_depth; idx++) {
PageNestingInfo* pni = &s->page.nesting[idx];
PageNestingDecodeInfo* pndi = &s->pndi[idx];

size_t output_offset;
// schemas without lists
@@ -1085,21 +1116,21 @@ static __device__ bool setupLocalPageInfo(page_state_s* const s,
}
// for schemas with lists, we've already got the exact value precomputed
else {
output_offset = pni->page_start_value;
output_offset = pndi->page_start_value;
}

pni->data_out = static_cast<uint8_t*>(s->col.column_data_base[idx]);
pndi->data_out = static_cast<uint8_t*>(s->col.column_data_base[idx]);

if (pni->data_out != nullptr) {
if (pndi->data_out != nullptr) {
// anything below max depth with a valid data pointer must be a list, so the
// element size is the size of the offset type.
uint32_t len = idx < max_depth - 1 ? sizeof(cudf::size_type) : s->dtype_len;
pni->data_out += (output_offset * len);
pndi->data_out += (output_offset * len);
}
pni->valid_map = s->col.valid_map_base[idx];
if (pni->valid_map != nullptr) {
pni->valid_map += output_offset >> 5;
pni->valid_map_offset = (int32_t)(output_offset & 0x1f);
pndi->valid_map = s->col.valid_map_base[idx];
if (pndi->valid_map != nullptr) {
pndi->valid_map += output_offset >> 5;
pndi->valid_map_offset = (int32_t)(output_offset & 0x1f);
}
}
}
@@ -1217,26 +1248,26 @@ static __device__ bool setupLocalPageInfo(page_state_s* const s,
* @brief Store a validity mask containing value_count bits into the output validity buffer of the
* page.
*
* @param[in,out] pni The page/nesting information to store the mask in. The validity map offset is
* @param[in,out] pndi The page/nesting information to store the mask in. The validity map offset is
* also updated
* @param[in] valid_mask The validity mask to be stored
* @param[in] value_count # of bits in the validity mask
*/
static __device__ void store_validity(PageNestingInfo* pni,
static __device__ void store_validity(PageNestingDecodeInfo* pndi,
uint32_t valid_mask,
int32_t value_count)
{
int word_offset = pni->valid_map_offset / 32;
int bit_offset = pni->valid_map_offset % 32;
int word_offset = pndi->valid_map_offset / 32;
int bit_offset = pndi->valid_map_offset % 32;
// if we fit entirely in the output word
if (bit_offset + value_count <= 32) {
auto relevant_mask = static_cast<uint32_t>((static_cast<uint64_t>(1) << value_count) - 1);

if (relevant_mask == ~0) {
pni->valid_map[word_offset] = valid_mask;
pndi->valid_map[word_offset] = valid_mask;
} else {
atomicAnd(pni->valid_map + word_offset, ~(relevant_mask << bit_offset));
atomicOr(pni->valid_map + word_offset, (valid_mask & relevant_mask) << bit_offset);
atomicAnd(pndi->valid_map + word_offset, ~(relevant_mask << bit_offset));
atomicOr(pndi->valid_map + word_offset, (valid_mask & relevant_mask) << bit_offset);
}
}
// we're going to spill over into the next word.
@@ -1250,17 +1281,17 @@ static __device__ void store_validity(PageNestingInfo* pni,
// first word. strip bits_left bits off the beginning and store that
uint32_t relevant_mask = ((1 << bits_left) - 1);
uint32_t mask_word0 = valid_mask & relevant_mask;
atomicAnd(pni->valid_map + word_offset, ~(relevant_mask << bit_offset));
atomicOr(pni->valid_map + word_offset, mask_word0 << bit_offset);
atomicAnd(pndi->valid_map + word_offset, ~(relevant_mask << bit_offset));
atomicOr(pndi->valid_map + word_offset, mask_word0 << bit_offset);

// second word. strip the remainder of the bits off the end and store that
relevant_mask = ((1 << (value_count - bits_left)) - 1);
uint32_t mask_word1 = valid_mask & (relevant_mask << bits_left);
atomicAnd(pni->valid_map + word_offset + 1, ~(relevant_mask));
atomicOr(pni->valid_map + word_offset + 1, mask_word1 >> bits_left);
atomicAnd(pndi->valid_map + word_offset + 1, ~(relevant_mask));
atomicOr(pndi->valid_map + word_offset + 1, mask_word1 >> bits_left);
}

pni->valid_map_offset += value_count;
pndi->valid_map_offset += value_count;
}
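A host-side sketch of the word-boundary split performed above, using plain stores in place of atomicAnd/atomicOr (an assumed simplification for illustration only):

#include <cstdint>
#include <cstdio>

// Write value_count bits of valid_mask into bitmap starting at bit `offset`,
// mirroring the one-word / two-word cases of store_validity.
void store_validity_sketch(uint32_t* bitmap, int offset, uint32_t valid_mask, int value_count)
{
  int const word_offset = offset / 32;
  int const bit_offset  = offset % 32;
  if (bit_offset + value_count <= 32) {  // fits entirely in the current word
    uint32_t const relevant = static_cast<uint32_t>((uint64_t{1} << value_count) - 1);
    bitmap[word_offset] = (bitmap[word_offset] & ~(relevant << bit_offset)) |
                          ((valid_mask & relevant) << bit_offset);
  } else {  // spills into the next word
    int const bits_left = 32 - bit_offset;
    uint32_t relevant   = (1u << bits_left) - 1;
    bitmap[word_offset] = (bitmap[word_offset] & ~(relevant << bit_offset)) |
                          ((valid_mask & relevant) << bit_offset);
    relevant = (1u << (value_count - bits_left)) - 1;
    bitmap[word_offset + 1] = (bitmap[word_offset + 1] & ~relevant) |
                              ((valid_mask >> bits_left) & relevant);
  }
}

int main()
{
  uint32_t bitmap[2] = {0, 0};
  // 6 valid bits starting at bit 28: 4 land in word 0, the remaining 2 in word 1
  store_validity_sketch(bitmap, 28, 0x3f, 6);
  std::printf("word0=0x%08x word1=0x%08x\n",
              static_cast<unsigned>(bitmap[0]),
              static_cast<unsigned>(bitmap[1]));
}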

/**
@@ -1294,8 +1325,8 @@ inline __device__ void get_nesting_bounds(int& start_depth,
// bound what nesting levels we apply values to
if (s->col.max_level[level_type::REPETITION] > 0) {
int r = s->rep[index];
start_depth = s->page.nesting[r].start_depth;
end_depth = s->page.nesting[d].end_depth;
start_depth = s->pndi[r].start_depth;
end_depth = s->pndi[d].end_depth;
}
// for columns without repetition (even ones involving structs) we always
// traverse the entire hierarchy.
@@ -1326,6 +1357,8 @@ static __device__ void gpuUpdateValidityOffsetsAndRowIndices(int32_t target_inpu
// how many rows we've processed in the page so far
int input_row_count = s->input_row_count;

PageNestingDecodeInfo* pndi_base = s->pndi;

// process until we've reached the target
while (input_value_count < target_input_value_count) {
// determine the nesting bounds for this thread (the range of nesting depths we
@@ -1367,14 +1400,14 @@ static __device__ void gpuUpdateValidityOffsetsAndRowIndices(int32_t target_inpu
// walk from 0 to max_depth
uint32_t next_thread_value_count, next_warp_value_count;
for (int s_idx = 0; s_idx < max_depth; s_idx++) {
PageNestingInfo* pni = &s->page.nesting[s_idx];
PageNestingDecodeInfo* pndi = &pndi_base[s_idx];

// if we are within the range of nesting levels we should be adding value indices for
int const in_nesting_bounds =
((s_idx >= start_depth && s_idx <= end_depth) && in_row_bounds) ? 1 : 0;

// everything up to the max_def_level is a non-null value
uint32_t const is_valid = d >= pni->max_def_level && in_nesting_bounds ? 1 : 0;
uint32_t const is_valid = d >= pndi->max_def_level && in_nesting_bounds ? 1 : 0;

// compute warp and thread valid counts
uint32_t const warp_valid_mask =
@@ -1395,8 +1428,8 @@ static __device__ void gpuUpdateValidityOffsetsAndRowIndices(int32_t target_inpu

// if this is the value column emit an index for value decoding
if (is_valid && s_idx == max_depth - 1) {
int const src_pos = pni->valid_count + thread_valid_count;
int const dst_pos = pni->value_count + thread_value_count;
int const src_pos = pndi->valid_count + thread_valid_count;
int const dst_pos = pndi->value_count + thread_value_count;
// nz_idx is a mapping of src buffer indices to destination buffer indices
s->nz_idx[rolling_index(src_pos)] = dst_pos;
}
@@ -1414,12 +1447,11 @@ static __device__ void gpuUpdateValidityOffsetsAndRowIndices(int32_t target_inpu
// if we're -not- at a leaf column and we're within nesting/row bounds
// and we have a valid data_out pointer, it implies this is a list column, so
// emit an offset.
if (in_nesting_bounds && pni->data_out != nullptr) {
int const idx = pni->value_count + thread_value_count;
cudf::size_type const ofs = s->page.nesting[s_idx + 1].value_count +
next_thread_value_count +
s->page.nesting[s_idx + 1].page_start_value;
(reinterpret_cast<cudf::size_type*>(pni->data_out))[idx] = ofs;
if (in_nesting_bounds && pndi->data_out != nullptr) {
int const idx = pndi->value_count + thread_value_count;
cudf::size_type const ofs = pndi_base[s_idx + 1].value_count + next_thread_value_count +
pndi_base[s_idx + 1].page_start_value;
(reinterpret_cast<cudf::size_type*>(pndi->data_out))[idx] = ofs;
}
}

@@ -1441,14 +1473,14 @@ static __device__ void gpuUpdateValidityOffsetsAndRowIndices(int32_t target_inpu

// increment count of valid values, count of total values, and update validity mask
if (!t) {
if (pni->valid_map != nullptr && warp_valid_mask_bit_count > 0) {
if (pndi->valid_map != nullptr && warp_valid_mask_bit_count > 0) {
uint32_t const warp_output_valid_mask = warp_valid_mask >> first_thread_in_write_range;
store_validity(pni, warp_output_valid_mask, warp_valid_mask_bit_count);
store_validity(pndi, warp_output_valid_mask, warp_valid_mask_bit_count);

pni->null_count += warp_valid_mask_bit_count - __popc(warp_output_valid_mask);
pndi->null_count += warp_valid_mask_bit_count - __popc(warp_output_valid_mask);
}
pni->valid_count += warp_valid_count;
pni->value_count += warp_value_count;
pndi->valid_count += warp_valid_count;
pndi->value_count += warp_value_count;
}

// propagate value counts for the next level
@@ -1463,7 +1495,7 @@ static __device__ void gpuUpdateValidityOffsetsAndRowIndices(int32_t target_inpu
// update
if (!t) {
// update valid value count for decoding and total # of values we've processed
s->nz_count = s->page.nesting[max_depth - 1].valid_count;
s->nz_count = pndi_base[max_depth - 1].valid_count;
s->input_value_count = input_value_count;
s->input_row_count = input_row_count;
}
@@ -1543,9 +1575,9 @@ static __device__ void gpuUpdatePageSizes(page_state_s* s,
start_depth, end_depth, d, s, input_value_count, target_input_value_count, t);

// count rows and leaf values
int const is_new_row = start_depth == 0 ? 1 : 0;
uint32_t const warp_row_count_mask = ballot(is_new_row);
int const is_new_leaf = (d >= s->page.nesting[max_depth - 1].max_def_level) ? 1 : 0;
int const is_new_row = start_depth == 0 ? 1 : 0;
uint32_t const warp_row_count_mask = ballot(is_new_row);
int const is_new_leaf = (d >= s->pndi[max_depth - 1].max_def_level) ? 1 : 0;
uint32_t const warp_leaf_count_mask = ballot(is_new_leaf);
// is this thread within row bounds? on the first pass we don't know the bounds, so we will be
// computing the full size of the column. on the second pass, we will know our actual row
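The row and leaf counting above relies on warp votes: each thread contributes a predicate bit via ballot(), and a population count over the resulting mask gives the per-warp total. A standalone sketch of that idiom using the raw CUDA intrinsics (__ballot_sync/__popc) rather than this file's ballot() helper; the kernel and its parameters are hypothetical:

#include <cuda_runtime.h>

// Counts how many of the first n values are zero (e.g. "is this a new row?"),
// one warp vote at a time. Assumes blockDim.x is a multiple of 32.
__global__ void warp_count_sketch(int const* values, int n, int* block_total)
{
  int const t    = threadIdx.x;
  int const pred = (t < n && values[t] == 0) ? 1 : 0;
  unsigned const mask = __ballot_sync(0xffffffffu, pred);      // one bit per lane in the warp
  if ((t % 32) == 0) { atomicAdd(block_total, __popc(mask)); } // lane 0 adds the warp's count
}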
@@ -1808,6 +1840,8 @@ __global__ void __launch_bounds__(block_size) gpuDecodePageData(
((s->col.data_type & 7) == BOOLEAN || (s->col.data_type & 7) == BYTE_ARRAY) ? 64 : 32;
}

PageNestingDecodeInfo* pni_base = s->pndi;

// skipped_leaf_values will always be 0 for flat hierarchies.
uint32_t skipped_leaf_values = s->page.skipped_leaf_values;
while (!s->error && (s->input_value_count < s->num_input_values || s->src_pos < s->nz_count)) {
@@ -1875,8 +1909,7 @@ __global__ void __launch_bounds__(block_size) gpuDecodePageData(
int leaf_level_index = s->col.max_nesting_depth - 1;

uint32_t dtype_len = s->dtype_len;
void* dst =
s->page.nesting[leaf_level_index].data_out + static_cast<size_t>(dst_pos) * dtype_len;
void* dst = pni_base[leaf_level_index].data_out + static_cast<size_t>(dst_pos) * dtype_len;
if (dtype == BYTE_ARRAY) {
if (s->col.converted_type == DECIMAL) {
auto const [ptr, len] = gpuGetStringData(s, val_src_pos);
@@ -1931,6 +1964,17 @@ __global__ void __launch_bounds__(block_size) gpuDecodePageData(
}
__syncthreads();
}

// if we are using the nesting decode cache, copy null count back
if (s->pndi == s->nesting_decode_cache) {
int d = 0;
while (d < s->page.num_output_nesting_levels) {
if (d + t < s->page.num_output_nesting_levels) {
s->page.nesting_decode[d + t].null_count = s->nesting_decode_cache[d + t].null_count;
}
d += blockDim.x;
}
}
}
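Taken together, the pattern added by this change is: stage the per-level decode state into shared memory when it fits, decode through a pointer that targets either the shared cache or the original global copy, then write back only the fields the kernel mutated (here, null_count). A condensed, hypothetical sketch of that round trip, not the actual kernel:

#include <cuda_runtime.h>
#include <cstdint>

struct LevelInfoSketch { int32_t max_def_level; int32_t null_count; };  // hypothetical

constexpr int kMaxCached = 10;  // analogous to max_cacheable_nesting_decode_info

__global__ void decode_with_cache_sketch(LevelInfoSketch* global_info, int num_levels)
{
  __shared__ LevelInfoSketch cache[kMaxCached];
  __shared__ LevelInfoSketch* info;  // points at the cache when it fits, global memory otherwise

  int const t = threadIdx.x;
  bool const cacheable = num_levels <= kMaxCached;

  // stage the per-level state into shared memory
  if (cacheable) {
    for (int d = t; d < num_levels; d += blockDim.x) { cache[d] = global_info[d]; }
  }
  if (t == 0) { info = cacheable ? cache : global_info; }
  __syncthreads();

  // ... the decode loop would read and update info[level] here ...

  __syncthreads();
  // write back only what was mutated so the global copy stays correct
  if (cacheable) {
    for (int d = t; d < num_levels; d += blockDim.x) {
      global_info[d].null_count = info[d].null_count;
    }
  }
}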

} // anonymous namespace