Skip to content

Commit

Permalink
Merge pull request #12160 from vuule/bug-read_orc-empty-map-column
Browse files: browse the repository at this point in the history
Fix data corruption when reading ORC files with empty stripes
  • Loading branch information
jolorunyomi authored Nov 22, 2022
2 parents 49f983d + f15080f commit ed35f67
Show file tree
Hide file tree
Showing 8 changed files with 30 additions and 16 deletions.
27 changes: 11 additions & 16 deletions cpp/src/io/orc/reader_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -967,7 +967,6 @@ table_with_metadata reader::impl::read(size_type skip_rows,
// Association between each ORC column and its cudf::column
_col_meta.orc_col_map.emplace_back(_metadata.get_num_cols(), -1);
std::vector<orc_column_meta> nested_col;
bool is_data_empty = false;

// Get a list of column data types
std::vector<data_type> column_types;
Expand All @@ -991,7 +990,6 @@ table_with_metadata reader::impl::read(size_type skip_rows,

// Map each ORC column to its column
_col_meta.orc_col_map[level][col.id] = column_types.size() - 1;
// TODO: Once MAP type is supported in cuDF, update this for MAP as well
if (col_type == type_id::LIST or col_type == type_id::STRUCT) nested_col.emplace_back(col);
}

Expand Down Expand Up @@ -1051,6 +1049,7 @@ table_with_metadata reader::impl::read(size_type skip_rows,
size_t num_rowgroups = 0;
int stripe_idx = 0;

bool is_level_data_empty = true;
std::vector<std::pair<std::future<size_t>, size_t>> read_tasks;
for (auto const& stripe_source_mapping : selected_stripes) {
// Iterate through the source files selected stripes
Expand All @@ -1070,21 +1069,16 @@ table_with_metadata reader::impl::read(size_type skip_rows,
stream_info,
level == 0);

if (total_data_size == 0) {
CUDF_EXPECTS(stripe_info->indexLength == 0, "Invalid index rowgroup stream data");
// In case ROW GROUP INDEX is not present and all columns are structs with no null
// stream, there is nothing to read at this level.
auto fn_check_dtype = [](auto dtype) { return dtype.id() == type_id::STRUCT; };
CUDF_EXPECTS(std::all_of(column_types.begin(), column_types.end(), fn_check_dtype),
"Expected streams data within stripe");
is_data_empty = true;
}
auto const is_stripe_data_empty = total_data_size == 0;
if (not is_stripe_data_empty) { is_level_data_empty = false; }
CUDF_EXPECTS(not is_stripe_data_empty or stripe_info->indexLength == 0,
"Invalid index rowgroup stream data");

stripe_data.emplace_back(total_data_size, stream);
auto dst_base = static_cast<uint8_t*>(stripe_data.back().data());

// Coalesce consecutive streams into one read
while (not is_data_empty and stream_count < stream_info.size()) {
while (not is_stripe_data_empty and stream_count < stream_info.size()) {
const auto d_dst = dst_base + stream_info[stream_count].dst_pos;
const auto offset = stream_info[stream_count].offset;
auto len = stream_info[stream_count].length;
Expand Down Expand Up @@ -1162,7 +1156,7 @@ table_with_metadata reader::impl::read(size_type skip_rows,
if (chunk.type_kind == orc::TIMESTAMP) {
chunk.timestamp_type_id = _timestamp_type.id();
}
if (not is_data_empty) {
if (not is_stripe_data_empty) {
for (int k = 0; k < gpu::CI_NUM_STREAMS; k++) {
chunk.streams[k] = dst_base + stream_info[chunk.strm_id[k]].dst_pos;
}
Expand Down Expand Up @@ -1199,7 +1193,8 @@ table_with_metadata reader::impl::read(size_type skip_rows,
});
}
// Setup row group descriptors if using indexes
if (_metadata.per_file_metadata[0].ps.compression != orc::NONE and not is_data_empty) {
if (_metadata.per_file_metadata[0].ps.compression != orc::NONE and
not is_level_data_empty) {
auto decomp_data = decompress_stripe_data(chunks,
stripe_data,
*_metadata.per_file_metadata[0].decompressor,
Expand Down Expand Up @@ -1242,7 +1237,7 @@ table_with_metadata reader::impl::read(size_type skip_rows,
out_buffers[level].emplace_back(column_types[i], n_rows, is_nullable, stream, _mr);
}

if (not is_data_empty) {
if (not is_level_data_empty) {
decode_stream_data(chunks,
num_dict_entries,
skip_rows,
Expand All @@ -1256,7 +1251,7 @@ table_with_metadata reader::impl::read(size_type skip_rows,

// Extract information to process nested child columns
if (nested_col.size()) {
if (not is_data_empty) {
if (not is_level_data_empty) {
scan_null_counts(chunks, null_count_prefix_sums[level], stream);
}
row_groups.device_to_host(stream, true);
Expand Down
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
19 changes: 19 additions & 0 deletions python/cudf/cudf/tests/test_orc.py
Original file line number Diff line number Diff line change
Expand Up @@ -1815,3 +1815,22 @@ def test_statistics_string_sum():

file_stats, stripe_stats = cudf.io.orc.read_orc_statistics([buff])
assert_eq(file_stats[0]["str"].get("sum"), sum(len(s) for s in strings))


@pytest.mark.parametrize(
    "fname",
    [
        "TestOrcFile.Hive.OneEmptyMap.orc",
        "TestOrcFile.Hive.OneEmptyList.orc",
        "TestOrcFile.Hive.OneNullStruct.orc",
        "TestOrcFile.Hive.EmptyListStripe.orc",
        "TestOrcFile.Hive.NullStructStripe.orc",
        "TestOrcFile.Hive.AllNulls.orc",
    ],
)
def test_reader_empty_stripe(datadir, fname):
    """Check that cudf reads each listed ORC file the same way pandas does.

    The file under ``datadir`` is loaded once with ``pd.read_orc`` (the
    reference result) and once with ``cudf.read_orc``; the two results are
    then compared with ``assert_eq``.

    NOTE(review): judging by the file names, the inputs contain empty or
    all-null nested data / stripes; the file contents are not visible from
    this test, so confirm against the test data.
    """
    orc_file = datadir / fname

    # Reference result first, then the reader under test.
    pandas_result = pd.read_orc(orc_file)
    cudf_result = cudf.read_orc(orc_file)

    assert_eq(pandas_result, cudf_result)

0 comments on commit ed35f67

Please sign in to comment.