Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix ORC reading of files with struct columns that have null values #9005

Merged
merged 5 commits into from
Aug 17, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/src/io/orc/orc_gpu.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ struct ColumnDesc {
int32_t decimal_scale; // number of fractional decimal digits for decimal type
int32_t ts_clock_rate; // output timestamp clock frequency (0=default, 1000=ms, 1000000000=ns)
column_validity_info parent_validity_info; // consists of parent column valid_map and null count
uint32_t* parent_null_count_prefix_sums; // per-stripe prefix sums of parent column's null count
};

/**
Expand Down
69 changes: 62 additions & 7 deletions cpp/src/io/orc/reader_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -759,6 +759,49 @@ void update_null_mask(cudf::detail::hostdevice_2dvector<gpu::ColumnDesc>& chunks
}
}

/**
* @brief Compute the per-stripe prefix sum of null count, for each struct column in the current
* layer.
*/
void scan_null_counts(cudf::detail::hostdevice_2dvector<gpu::ColumnDesc> const& chunks,
cudf::host_span<rmm::device_uvector<uint32_t>> prefix_sums,
rmm::cuda_stream_view stream)
{
auto const num_stripes = chunks.size().first;
if (num_stripes == 0) return;

auto const num_columns = chunks.size().second;
std::vector<thrust::pair<size_type, cudf::device_span<uint32_t>>> prefix_sums_to_update;
for (auto col_idx = 0ul; col_idx < num_columns; ++col_idx) {
// Null counts sums are only needed for children of struct columns
if (chunks[0][col_idx].type_kind == STRUCT) {
prefix_sums_to_update.emplace_back(col_idx, prefix_sums[col_idx]);
}
}
auto const d_prefix_sums_to_update =
cudf::detail::make_device_uvector_async(prefix_sums_to_update, stream);

thrust::for_each(rmm::exec_policy(stream),
d_prefix_sums_to_update.begin(),
d_prefix_sums_to_update.end(),
[chunks = cudf::detail::device_2dspan<gpu::ColumnDesc const>{chunks}] __device__(
auto const& idx_psums) {
auto const col_idx = idx_psums.first;
auto const psums = idx_psums.second;

thrust::transform(
thrust::seq,
thrust::make_counting_iterator(0),
thrust::make_counting_iterator(0) + psums.size(),
psums.begin(),
[&](auto stripe_idx) { return chunks[stripe_idx][col_idx].null_count; });

thrust::inclusive_scan(thrust::seq, psums.begin(), psums.end(), psums.begin());
});
// `prefix_sums_to_update` goes out of scope, copy has to be done before we return
stream.synchronize();
}

void reader::impl::decode_stream_data(cudf::detail::hostdevice_2dvector<gpu::ColumnDesc>& chunks,
size_t num_dicts,
size_t skip_rows,
Expand Down Expand Up @@ -817,8 +860,6 @@ void reader::impl::decode_stream_data(cudf::detail::hostdevice_2dvector<gpu::Col
[&](auto null_count, auto const stripe_idx) {
return null_count + chunks[stripe_idx][col_idx].null_count;
});
// Add parent null count in case this is a child column of a struct
out_buffers[col_idx].null_count() += chunks[0][col_idx].parent_validity_info.null_count;
});
}

Expand All @@ -841,6 +882,7 @@ void reader::impl::aggregate_child_meta(cudf::detail::host_2dspan<gpu::ColumnDes
num_child_rows.resize(_selected_columns[level + 1].size());
std::fill(num_child_rows.begin(), num_child_rows.end(), 0);
parent_column_data.resize(number_of_child_chunks);
_col_meta.parent_column_index.resize(number_of_child_chunks);
_col_meta.child_start_row.resize(number_of_child_chunks);
_col_meta.num_child_rows_per_stripe.resize(number_of_child_chunks);
_col_meta.rwgrp_meta.resize(num_of_rowgroups * num_child_cols);
Expand Down Expand Up @@ -899,7 +941,8 @@ void reader::impl::aggregate_child_meta(cudf::detail::host_2dspan<gpu::ColumnDes
auto num_rows = out_buffers[parent_col_idx].size;

for (uint32_t id = 0; id < p_col.num_children; id++) {
const auto child_col_idx = index + id;
const auto child_col_idx = index + id;
_col_meta.parent_column_index[child_col_idx] = parent_col_idx;
if (type == type_id::STRUCT) {
parent_column_data[child_col_idx] = {parent_valid_map, parent_null_count};
// Number of rows in child will remain same as parent in case of struct column
Expand Down Expand Up @@ -1042,6 +1085,7 @@ table_with_metadata reader::impl::read(size_type skip_rows,
std::vector<std::vector<column_buffer>> out_buffers(_selected_columns.size());
std::vector<column_name_info> schema_info;
std::vector<std::vector<rmm::device_buffer>> lvl_stripe_data(_selected_columns.size());
std::vector<std::vector<rmm::device_uvector<uint32_t>>> null_count_prefix_sums;
table_metadata out_metadata;

// There are no columns in the table
Expand Down Expand Up @@ -1124,6 +1168,14 @@ table_with_metadata reader::impl::read(size_type skip_rows,
// Logically view streams as columns
std::vector<orc_stream_info> stream_info;

null_count_prefix_sums.emplace_back();
null_count_prefix_sums.back().reserve(_selected_columns[level].size());
std::generate_n(
std::back_inserter(null_count_prefix_sums.back()), _selected_columns[level].size(), [&]() {
return cudf::detail::make_zeroed_device_uvector_async<uint32_t>(total_num_stripes,
stream);
});

// Tracker for eventually deallocating compressed and uncompressed data
auto& stripe_data = lvl_stripe_data[level];

Expand Down Expand Up @@ -1207,10 +1259,12 @@ table_with_metadata reader::impl::read(size_type skip_rows,
? stripe_info->numberOfRows
: _col_meta.num_child_rows_per_stripe[stripe_idx * num_columns + col_idx];
chunk.column_num_rows = (level == 0) ? num_rows : _col_meta.num_child_rows[col_idx];
chunk.parent_validity_info.valid_map_base =
(level == 0) ? nullptr : _col_meta.parent_column_data[col_idx].valid_map_base;
chunk.parent_validity_info.null_count =
(level == 0) ? 0 : _col_meta.parent_column_data[col_idx].null_count;
chunk.parent_validity_info =
(level == 0) ? column_validity_info{} : _col_meta.parent_column_data[col_idx];
chunk.parent_null_count_prefix_sums =
(level == 0)
? nullptr
: null_count_prefix_sums[level - 1][_col_meta.parent_column_index[col_idx]].data();
chunk.encoding_kind = stripe_footer->columns[selected_columns[col_idx].id].kind;
chunk.type_kind = _metadata->per_file_metadata[stripe_source_mapping.source_idx]
.ff.types[selected_columns[col_idx].id]
Expand Down Expand Up @@ -1336,6 +1390,7 @@ table_with_metadata reader::impl::read(size_type skip_rows,

// Extract information to process nested child columns
if (nested_col.size()) {
scan_null_counts(chunks, null_count_prefix_sums[level], stream);
row_groups.device_to_host(stream, true);
aggregate_child_meta(chunks, row_groups, out_buffers[level], nested_col, level);
}
Expand Down
1 change: 1 addition & 0 deletions cpp/src/io/orc/reader_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ struct reader_column_meta {

std::vector<column_validity_info>
parent_column_data; // consists of parent column valid_map and null count
std::vector<size_type> parent_column_index;

std::vector<uint32_t> child_start_row; // start row of child columns [stripe][column]
std::vector<uint32_t>
Expand Down
17 changes: 13 additions & 4 deletions cpp/src/io/orc/stripe_data.cu
Original file line number Diff line number Diff line change
Expand Up @@ -1167,8 +1167,17 @@ __global__ void __launch_bounds__(block_size)
// No present stream: all rows are valid
s->vals.u32[t] = ~0;
}
while (s->top.nulls_desc_row < s->chunk.num_rows) {
uint32_t nrows_max = min(s->chunk.num_rows - s->top.nulls_desc_row, blockDim.x * 32);
auto const prev_parent_null_count =
(s->chunk.parent_null_count_prefix_sums != nullptr && stripe > 0)
? s->chunk.parent_null_count_prefix_sums[stripe - 1]
: 0;
auto const parent_null_count =
(s->chunk.parent_null_count_prefix_sums != nullptr)
? s->chunk.parent_null_count_prefix_sums[stripe] - prev_parent_null_count
: 0;
auto const num_elems = s->chunk.num_rows - parent_null_count;
while (s->top.nulls_desc_row < num_elems) {
uint32_t nrows_max = min(num_elems - s->top.nulls_desc_row, blockDim.x * 32);
uint32_t nrows;
size_t row_in;

Expand All @@ -1187,7 +1196,7 @@ __global__ void __launch_bounds__(block_size)
}
__syncthreads();

row_in = s->chunk.start_row + s->top.nulls_desc_row;
row_in = s->chunk.start_row + s->top.nulls_desc_row - prev_parent_null_count;
if (row_in + nrows > first_row && row_in < first_row + max_num_rows &&
s->chunk.valid_map_base != NULL) {
int64_t dst_row = row_in - first_row;
Expand Down Expand Up @@ -1251,7 +1260,7 @@ __global__ void __launch_bounds__(block_size)
// Sum up the valid counts and infer null_count
null_count = block_reduce(temp_storage.bk_storage).Sum(null_count);
if (t == 0) {
chunks[chunk_id].null_count = null_count;
chunks[chunk_id].null_count = parent_null_count + null_count;
chunks[chunk_id].skip_count = s->chunk.skip_count;
}
} else {
Expand Down
4 changes: 2 additions & 2 deletions python/cudf/cudf/tests/test_orc.py
Original file line number Diff line number Diff line change
Expand Up @@ -844,7 +844,7 @@ def test_orc_string_stream_offset_issue():


# Data is generated using pyorc module
def generate_list_struct_buff(size=28000):
def generate_list_struct_buff(size=100_000):
galipremsagar marked this conversation as resolved.
Show resolved Hide resolved
rd = random.Random(1)
np.random.seed(seed=1)

Expand Down Expand Up @@ -963,7 +963,7 @@ def generate_list_struct_buff(size=28000):
["lvl2_struct", "lvl1_struct"],
],
)
@pytest.mark.parametrize("num_rows", [0, 15, 1005, 10561, 28000])
@pytest.mark.parametrize("num_rows", [0, 15, 1005, 10561, 28000, 100_000])
galipremsagar marked this conversation as resolved.
Show resolved Hide resolved
@pytest.mark.parametrize("use_index", [True, False])
def test_lists_struct_nests(
columns, num_rows, use_index,
Expand Down