Skip to content

Commit

Permalink
Fix the row index stream order in ORC reader (#13242)
Browse files Browse the repository at this point in the history
Fixes #11890 

Use fixed order when reading row index streams.
The order is `PRESENT`, `DATA`, `SECONDARY`/`LENGTH` (maps to `DATA2`). 
If any of these is absent from the column, the relative order of the remaining streams is maintained. Thus, the actual order is always a subsequence of the one above.

Also simplified some logic related to stream order, as we do not need to pass it from the host. Instead, we only pass a bitmap to denote which streams are present.
Updated the xfail test, as it now passes :)

Authors:
  - Vukasin Milovanovic (https://github.com/vuule)

Approvers:
  - Keith Kraus (https://github.com/kkraus14)
  - Yunsong Wang (https://github.com/PointKernel)
  - Bradley Dice (https://github.com/bdice)
  - Vyas Ramasubramani (https://github.com/vyasr)

URL: #13242
  • Loading branch information
vuule authored May 10, 2023
1 parent a1c2eec commit 3d814bd
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 39 deletions.
42 changes: 16 additions & 26 deletions cpp/src/io/orc/reader_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -102,27 +102,18 @@ constexpr type_id to_type_id(const orc::SchemaType& schema,
return type_id::EMPTY;
}

constexpr std::pair<gpu::StreamIndexType, uint32_t> get_index_type_and_pos(
const orc::StreamKind kind, uint32_t skip_count, bool non_child)
gpu::StreamIndexType get_stream_index_type(orc::StreamKind kind)
{
switch (kind) {
case orc::DATA:
skip_count += 1;
skip_count |= (skip_count & 0xff) << 8;
return std::pair(gpu::CI_DATA, skip_count);
case orc::DATA: return gpu::CI_DATA;
case orc::LENGTH:
case orc::SECONDARY:
skip_count += 1;
skip_count |= (skip_count & 0xff) << 16;
return std::pair(gpu::CI_DATA2, skip_count);
case orc::DICTIONARY_DATA: return std::pair(gpu::CI_DICTIONARY, skip_count);
case orc::PRESENT:
skip_count += (non_child ? 1 : 0);
return std::pair(gpu::CI_PRESENT, skip_count);
case orc::ROW_INDEX: return std::pair(gpu::CI_INDEX, skip_count);
case orc::SECONDARY: return gpu::CI_DATA2;
case orc::DICTIONARY_DATA: return gpu::CI_DICTIONARY;
case orc::PRESENT: return gpu::CI_PRESENT;
case orc::ROW_INDEX: return gpu::CI_INDEX;
default:
// Skip this stream as it's not strictly required
return std::pair(gpu::CI_NUM_STREAMS, 0);
return gpu::CI_NUM_STREAMS;
}
}

Expand Down Expand Up @@ -213,16 +204,15 @@ size_t gather_stream_info(const size_t stripe_index,
}
if (col != -1) {
if (src_offset >= stripeinfo->indexLength || use_index) {
// NOTE: skip_count field is temporarily used to track index ordering
auto& chunk = chunks[stripe_index][col];
const auto idx =
get_index_type_and_pos(stream.kind, chunk.skip_count, col == orc2gdf[column_id]);
if (idx.first < gpu::CI_NUM_STREAMS) {
chunk.strm_id[idx.first] = stream_info.size();
chunk.strm_len[idx.first] = stream.length;
chunk.skip_count = idx.second;

if (idx.first == gpu::CI_DICTIONARY) {
auto& chunk = chunks[stripe_index][col];
auto const index_type = get_stream_index_type(stream.kind);
if (index_type < gpu::CI_NUM_STREAMS) {
chunk.strm_id[index_type] = stream_info.size();
chunk.strm_len[index_type] = stream.length;
// NOTE: skip_count field is temporarily used to track the presence of index streams
chunk.skip_count |= 1 << index_type;

if (index_type == gpu::CI_DICTIONARY) {
chunk.dictionary_start = *num_dictionary_entries;
chunk.dict_len = stripefooter->columns[column_id].dictionarySize;
*num_dictionary_entries += stripefooter->columns[column_id].dictionarySize;
Expand Down
39 changes: 33 additions & 6 deletions cpp/src/io/orc/stripe_init.cu
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

#include <cub/cub.cuh>
#include <rmm/cuda_stream_view.hpp>
#include <thrust/copy.h>
#include <thrust/execution_policy.h>

namespace cudf {
namespace io {
Expand Down Expand Up @@ -226,6 +228,30 @@ enum row_entry_state_e {
STORE_INDEX2,
};

/**
 * @brief Calculates the order of index streams based on the index types present in the column.
 *
 * The full order is `CI_PRESENT`, `CI_DATA`, `CI_DATA2`. Index types whose bit is not set in
 * `index_types_bitmap` are dropped, and the relative order of the remaining types is kept.
 * Slots past the last present index type are set to `CI_NUM_STREAMS`, so indexing beyond the
 * number of present streams yields a well-defined sentinel instead of an indeterminate value.
 *
 * @param index_types_bitmap The bitmap of index types showing which index streams are present
 * (bit `1 << index_type` is set when that stream is present)
 *
 * @return The order of index streams, padded with `CI_NUM_STREAMS`
 */
static auto __device__ index_order_from_index_types(uint32_t index_types_bitmap)
{
  constexpr std::array full_order = {CI_PRESENT, CI_DATA, CI_DATA2};

  std::array<uint32_t, full_order.size()> partial_order;
  // Compact the present index types to the front, preserving their relative order
  auto const present_end = thrust::copy_if(thrust::seq,
                                           full_order.cbegin(),
                                           full_order.cend(),
                                           partial_order.begin(),
                                           [index_types_bitmap] __device__(auto index_type) {
                                             // Check if the index type is present
                                             return (index_types_bitmap &
                                                     (uint32_t{1} << index_type)) != 0;
                                           });

  // copy_if only writes the present entries; mark the rest as "no stream" so that the
  // returned array never exposes uninitialized elements
  for (auto it = present_end; it != partial_order.end(); ++it) {
    *it = CI_NUM_STREAMS;
  }

  return partial_order;
}

/**
* @brief Decode a single row group index entry
*
Expand All @@ -239,11 +265,14 @@ static uint32_t __device__ ProtobufParseRowIndexEntry(rowindex_state_s* s,
uint8_t const* const end)
{
constexpr uint32_t pb_rowindexentry_id = ProtofType::FIXEDLEN + 8;
auto const stream_order = index_order_from_index_types(s->chunk.skip_count);

const uint8_t* cur = start;
row_entry_state_e state = NOT_FOUND;
uint32_t length = 0, strm_idx_id = s->chunk.skip_count >> 8, idx_id = 1, ci_id = CI_PRESENT,
pos_end = 0;
uint32_t length = 0;
uint32_t idx_id = 0;
uint32_t pos_end = 0;
uint32_t ci_id = CI_NUM_STREAMS;
while (cur < end) {
uint32_t v = 0;
for (uint32_t l = 0; l <= 28; l += 7) {
Expand Down Expand Up @@ -283,10 +312,8 @@ static uint32_t __device__ ProtobufParseRowIndexEntry(rowindex_state_s* s,
}
break;
case STORE_INDEX0:
ci_id = (idx_id == (strm_idx_id & 0xff)) ? CI_DATA
: (idx_id == ((strm_idx_id >> 8) & 0xff)) ? CI_DATA2
: CI_PRESENT;
idx_id++;
// Start of a new entry; determine the stream index types
ci_id = stream_order[idx_id++];
if (s->is_compressed) {
if (ci_id < CI_PRESENT) s->row_index_entry[0][ci_id] = v;
if (cur >= start + pos_end) return length;
Expand Down
12 changes: 5 additions & 7 deletions python/cudf/cudf/tests/test_orc.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2019-2022, NVIDIA CORPORATION.
# Copyright (c) 2019-2023, NVIDIA CORPORATION.

import datetime
import decimal
Expand Down Expand Up @@ -1896,12 +1896,10 @@ def test_reader_empty_stripe(datadir, fname):
assert_eq(expected, got)


@pytest.mark.xfail(
reason="https://github.com/rapidsai/cudf/issues/11890", raises=RuntimeError
)
def test_reader_unsupported_offsets():
# needs enough data for more than one row group
expected = cudf.DataFrame({"str": ["*"] * 10001}, dtype="string")
# needs enough data for multiple row groups
@pytest.mark.parametrize("data", [["*"] * 10001, ["**", None] * 5001])
def test_reader_row_index_order(data):
expected = cudf.DataFrame({"str": data}, dtype="string")

buffer = BytesIO()
expected.to_pandas().to_orc(buffer)
Expand Down

0 comments on commit 3d814bd

Please sign in to comment.