Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix data corruption when reading ORC files with empty stripes #12160

Merged
27 changes: 11 additions & 16 deletions cpp/src/io/orc/reader_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -967,7 +967,6 @@ table_with_metadata reader::impl::read(size_type skip_rows,
// Association between each ORC column and its cudf::column
_col_meta.orc_col_map.emplace_back(_metadata.get_num_cols(), -1);
std::vector<orc_column_meta> nested_col;
bool is_data_empty = false;

// Get a list of column data types
std::vector<data_type> column_types;
Expand All @@ -991,7 +990,6 @@ table_with_metadata reader::impl::read(size_type skip_rows,

// Map each ORC column to its column
_col_meta.orc_col_map[level][col.id] = column_types.size() - 1;
// TODO: Once MAP type is supported in cuDF, update this for MAP as well
if (col_type == type_id::LIST or col_type == type_id::STRUCT) nested_col.emplace_back(col);
}

Expand Down Expand Up @@ -1051,6 +1049,7 @@ table_with_metadata reader::impl::read(size_type skip_rows,
size_t num_rowgroups = 0;
int stripe_idx = 0;

bool is_level_data_empty = true;
std::vector<std::pair<std::future<size_t>, size_t>> read_tasks;
for (auto const& stripe_source_mapping : selected_stripes) {
// Iterate through the source files selected stripes
Expand All @@ -1070,21 +1069,16 @@ table_with_metadata reader::impl::read(size_type skip_rows,
stream_info,
level == 0);

if (total_data_size == 0) {
CUDF_EXPECTS(stripe_info->indexLength == 0, "Invalid index rowgroup stream data");
// In case ROW GROUP INDEX is not present and all columns are structs with no null
// stream, there is nothing to read at this level.
auto fn_check_dtype = [](auto dtype) { return dtype.id() == type_id::STRUCT; };
CUDF_EXPECTS(std::all_of(column_types.begin(), column_types.end(), fn_check_dtype),
"Expected streams data within stripe");
is_data_empty = true;
}
auto const is_stripe_data_empty = total_data_size == 0;
if (not is_stripe_data_empty) { is_level_data_empty = false; }
CUDF_EXPECTS(not is_stripe_data_empty or stripe_info->indexLength == 0,
"Invalid index rowgroup stream data");

stripe_data.emplace_back(total_data_size, stream);
auto dst_base = static_cast<uint8_t*>(stripe_data.back().data());

// Coalesce consecutive streams into one read
while (not is_data_empty and stream_count < stream_info.size()) {
while (not is_stripe_data_empty and stream_count < stream_info.size()) {
const auto d_dst = dst_base + stream_info[stream_count].dst_pos;
const auto offset = stream_info[stream_count].offset;
auto len = stream_info[stream_count].length;
Expand Down Expand Up @@ -1162,7 +1156,7 @@ table_with_metadata reader::impl::read(size_type skip_rows,
if (chunk.type_kind == orc::TIMESTAMP) {
chunk.timestamp_type_id = _timestamp_type.id();
}
if (not is_data_empty) {
if (not is_stripe_data_empty) {
for (int k = 0; k < gpu::CI_NUM_STREAMS; k++) {
chunk.streams[k] = dst_base + stream_info[chunk.strm_id[k]].dst_pos;
}
Expand Down Expand Up @@ -1199,7 +1193,8 @@ table_with_metadata reader::impl::read(size_type skip_rows,
});
}
// Setup row group descriptors if using indexes
if (_metadata.per_file_metadata[0].ps.compression != orc::NONE and not is_data_empty) {
if (_metadata.per_file_metadata[0].ps.compression != orc::NONE and
not is_level_data_empty) {
auto decomp_data = decompress_stripe_data(chunks,
stripe_data,
*_metadata.per_file_metadata[0].decompressor,
Expand Down Expand Up @@ -1242,7 +1237,7 @@ table_with_metadata reader::impl::read(size_type skip_rows,
out_buffers[level].emplace_back(column_types[i], n_rows, is_nullable, stream, _mr);
}

if (not is_data_empty) {
if (not is_level_data_empty) {
decode_stream_data(chunks,
num_dict_entries,
skip_rows,
Expand All @@ -1256,7 +1251,7 @@ table_with_metadata reader::impl::read(size_type skip_rows,

// Extract information to process nested child columns
if (nested_col.size()) {
if (not is_data_empty) {
if (not is_level_data_empty) {
scan_null_counts(chunks, null_count_prefix_sums[level], stream);
}
row_groups.device_to_host(stream, true);
Expand Down
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
19 changes: 19 additions & 0 deletions python/cudf/cudf/tests/test_orc.py
Original file line number Diff line number Diff line change
Expand Up @@ -1815,3 +1815,22 @@ def test_statistics_string_sum():

file_stats, stripe_stats = cudf.io.orc.read_orc_statistics([buff])
assert_eq(file_stats[0]["str"].get("sum"), sum(len(s) for s in strings))


@pytest.mark.parametrize(
    "fname",
    [
        # One ORC fixture per case; names are resolved relative to `datadir`.
        "TestOrcFile.Hive.OneEmptyMap.orc",
        "TestOrcFile.Hive.OneEmptyList.orc",
        "TestOrcFile.Hive.OneNullStruct.orc",
        "TestOrcFile.Hive.EmptyListStripe.orc",
        "TestOrcFile.Hive.NullStructStripe.orc",
        "TestOrcFile.Hive.AllNulls.orc",
    ],
)
def test_reader_empty_stripe(datadir, fname):
    """Check that cuDF reads each listed ORC fixture the same way pandas does.

    The file named by ``fname`` is read once with ``pd.read_orc`` (used as
    the reference result) and once with ``cudf.read_orc``; the two results
    are then compared with ``assert_eq``.

    NOTE(review): judging by the fixture names, these files presumably
    contain empty or all-null stripes -- confirm against the test data.
    """
    orc_file = datadir / fname

    # Reference result first, then the cuDF reader under test.
    pandas_result = pd.read_orc(orc_file)
    cudf_result = cudf.read_orc(orc_file)

    assert_eq(pandas_result, cudf_result)