Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fail loudly to avoid data corruption with unsupported input in read_orc #12325

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cpp/src/io/orc/orc_gpu.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ void DecodeNullsAndStringDictionaries(ColumnDesc* chunks,
* @param[in] num_rowgroups Number of row groups in row index data
* @param[in] rowidx_stride Row index stride
* @param[in] level Current nesting level being processed
* @param[out] error_count Number of errors during decode
Copy link
Contributor

@ttnghia ttnghia Dec 8, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not just return this number, instead of using the void return type and modifying this parameter? I understand that this may be a pointer to device memory but we will read it to host anyway, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DecodeOrcColumnData is asynchronous. The fact that we copy chunks to host immediately after calling DecodeOrcColumnData should not impact how its implemented. If we return the error code we are enforcing this synchronization even though it might not be required otherwise.

* @param[in] stream CUDA stream used for device memory operations and kernel launches
*/
void DecodeOrcColumnData(ColumnDesc* chunks,
Expand All @@ -299,6 +300,7 @@ void DecodeOrcColumnData(ColumnDesc* chunks,
uint32_t num_rowgroups,
uint32_t rowidx_stride,
size_t level,
size_type* error_count,
rmm::cuda_stream_view stream);

/**
Expand Down
8 changes: 7 additions & 1 deletion cpp/src/io/orc/reader_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include <cudf/utilities/bit.hpp>
#include <cudf/utilities/error.hpp>
#include <cudf/utilities/traits.hpp>
#include <rmm/device_scalar.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_buffer.hpp>
Expand Down Expand Up @@ -641,6 +642,7 @@ void reader::impl::decode_stream_data(cudf::detail::hostdevice_2dvector<gpu::Col
update_null_mask(chunks, out_buffers, stream, _mr);
}

rmm::device_scalar<size_type> error_count(0, stream);
// Update the null map for child columns
gpu::DecodeOrcColumnData(chunks.base_device_ptr(),
global_dict.data(),
Expand All @@ -652,8 +654,12 @@ void reader::impl::decode_stream_data(cudf::detail::hostdevice_2dvector<gpu::Col
row_groups.size().first,
row_index_stride,
level,
error_count.data(),
stream);
chunks.device_to_host(stream, true);
chunks.device_to_host(stream);
// `value` synchronizes
auto const num_errors = error_count.value(stream);
CUDF_EXPECTS(num_errors == 0, "ORC data decode failed");

std::for_each(col_idx_it + 0, col_idx_it + num_columns, [&](auto col_idx) {
out_buffers[col_idx].null_count() =
Expand Down
12 changes: 10 additions & 2 deletions cpp/src/io/orc/stripe_data.cu
Original file line number Diff line number Diff line change
Expand Up @@ -1378,7 +1378,8 @@ __global__ void __launch_bounds__(block_size)
device_2dspan<RowGroup> row_groups,
size_t first_row,
uint32_t rowidx_stride,
size_t level)
size_t level,
size_type* error_count)
{
__shared__ __align__(16) orcdec_state_s state_g;
using block_reduce = cub::BlockReduce<uint64_t, block_size>;
Expand Down Expand Up @@ -1410,6 +1411,12 @@ __global__ void __launch_bounds__(block_size)
if (t == 0 and is_valid) {
// If we have an index, seek to the initial run and update row positions
if (num_rowgroups > 0) {
if (s->top.data.index.strm_offset[0] > s->chunk.strm_len[CI_DATA]) {
atomicAdd(error_count, 1);
}
if (s->top.data.index.strm_offset[1] > s->chunk.strm_len[CI_DATA2]) {
atomicAdd(error_count, 1);
}
uint32_t ofs0 = min(s->top.data.index.strm_offset[0], s->chunk.strm_len[CI_DATA]);
uint32_t ofs1 = min(s->top.data.index.strm_offset[1], s->chunk.strm_len[CI_DATA2]);
uint32_t rowgroup_rowofs =
Expand Down Expand Up @@ -1884,14 +1891,15 @@ void __host__ DecodeOrcColumnData(ColumnDesc* chunks,
uint32_t num_rowgroups,
uint32_t rowidx_stride,
size_t level,
size_type* error_count,
rmm::cuda_stream_view stream)
{
uint32_t num_chunks = num_columns * num_stripes;
dim3 dim_block(block_size, 1); // 1024 threads per chunk
dim3 dim_grid((num_rowgroups > 0) ? num_columns : num_chunks,
(num_rowgroups > 0) ? num_rowgroups : 1);
gpuDecodeOrcColumnData<block_size><<<dim_grid, dim_block, 0, stream.value()>>>(
chunks, global_dictionary, tz_table, row_groups, first_row, rowidx_stride, level);
chunks, global_dictionary, tz_table, row_groups, first_row, rowidx_stride, level, error_count);
}

} // namespace gpu
Expand Down
13 changes: 13 additions & 0 deletions python/cudf/cudf/tests/test_orc.py
Original file line number Diff line number Diff line change
Expand Up @@ -1834,3 +1834,16 @@ def test_reader_empty_stripe(datadir, fname):
expected = pd.read_orc(path)
got = cudf.read_orc(path)
assert_eq(expected, got)


@pytest.mark.xfail(
reason="https://github.com/rapidsai/cudf/issues/11890", raises=RuntimeError
)
def test_reader_unsupported_offsets():
# needs enough data for more than one row group
expected = cudf.DataFrame({"str": ["*"] * 10001}, dtype="string")

buffer = BytesIO()
expected.to_pandas().to_orc(buffer)
got = cudf.read_orc(buffer)
assert_eq(expected, got)