diff --git a/cpp/src/io/orc/reader_impl.cu b/cpp/src/io/orc/reader_impl.cu index fbe44eff5ad..1561737da48 100644 --- a/cpp/src/io/orc/reader_impl.cu +++ b/cpp/src/io/orc/reader_impl.cu @@ -102,27 +102,18 @@ constexpr type_id to_type_id(const orc::SchemaType& schema, return type_id::EMPTY; } -constexpr std::pair get_index_type_and_pos( - const orc::StreamKind kind, uint32_t skip_count, bool non_child) +gpu::StreamIndexType get_stream_index_type(orc::StreamKind kind) { switch (kind) { - case orc::DATA: - skip_count += 1; - skip_count |= (skip_count & 0xff) << 8; - return std::pair(gpu::CI_DATA, skip_count); + case orc::DATA: return gpu::CI_DATA; case orc::LENGTH: - case orc::SECONDARY: - skip_count += 1; - skip_count |= (skip_count & 0xff) << 16; - return std::pair(gpu::CI_DATA2, skip_count); - case orc::DICTIONARY_DATA: return std::pair(gpu::CI_DICTIONARY, skip_count); - case orc::PRESENT: - skip_count += (non_child ? 1 : 0); - return std::pair(gpu::CI_PRESENT, skip_count); - case orc::ROW_INDEX: return std::pair(gpu::CI_INDEX, skip_count); + case orc::SECONDARY: return gpu::CI_DATA2; + case orc::DICTIONARY_DATA: return gpu::CI_DICTIONARY; + case orc::PRESENT: return gpu::CI_PRESENT; + case orc::ROW_INDEX: return gpu::CI_INDEX; default: // Skip this stream as it's not strictly required - return std::pair(gpu::CI_NUM_STREAMS, 0); + return gpu::CI_NUM_STREAMS; } } @@ -213,16 +204,15 @@ size_t gather_stream_info(const size_t stripe_index, } if (col != -1) { if (src_offset >= stripeinfo->indexLength || use_index) { - // NOTE: skip_count field is temporarily used to track index ordering - auto& chunk = chunks[stripe_index][col]; - const auto idx = - get_index_type_and_pos(stream.kind, chunk.skip_count, col == orc2gdf[column_id]); - if (idx.first < gpu::CI_NUM_STREAMS) { - chunk.strm_id[idx.first] = stream_info.size(); - chunk.strm_len[idx.first] = stream.length; - chunk.skip_count = idx.second; - - if (idx.first == gpu::CI_DICTIONARY) { + auto& chunk = chunks[stripe_index][col]; + auto const index_type = get_stream_index_type(stream.kind); + if (index_type < gpu::CI_NUM_STREAMS) { + chunk.strm_id[index_type] = stream_info.size(); + chunk.strm_len[index_type] = stream.length; + // NOTE: skip_count field is temporarily used to track the presence of index streams + chunk.skip_count |= 1 << index_type; + + if (index_type == gpu::CI_DICTIONARY) { chunk.dictionary_start = *num_dictionary_entries; chunk.dict_len = stripefooter->columns[column_id].dictionarySize; *num_dictionary_entries += stripefooter->columns[column_id].dictionarySize; diff --git a/cpp/src/io/orc/stripe_init.cu b/cpp/src/io/orc/stripe_init.cu index 11813677b95..6c0f0767b73 100644 --- a/cpp/src/io/orc/stripe_init.cu +++ b/cpp/src/io/orc/stripe_init.cu @@ -21,6 +21,8 @@ #include #include +#include +#include namespace cudf { namespace io { @@ -226,6 +228,30 @@ enum row_entry_state_e { STORE_INDEX2, }; +/** + * @brief Calculates the order of index streams based on the index types present in the column. + * + * @param index_types_bitmap The bitmap of index types showing which index streams are present + * + * @return The order of index streams + */ +static auto __device__ index_order_from_index_types(uint32_t index_types_bitmap) +{ + constexpr std::array full_order = {CI_PRESENT, CI_DATA, CI_DATA2}; + + std::array partial_order; + thrust::copy_if(thrust::seq, + full_order.cbegin(), + full_order.cend(), + partial_order.begin(), + [index_types_bitmap] __device__(auto index_type) { + // Check if the index type is present + return index_types_bitmap & (1 << index_type); + }); + + return partial_order; +} + /** * @brief Decode a single row group index entry * @@ -239,11 +265,14 @@ static uint32_t __device__ ProtobufParseRowIndexEntry(rowindex_state_s* s, uint8_t const* const end) { constexpr uint32_t pb_rowindexentry_id = ProtofType::FIXEDLEN + 8; + auto const stream_order = index_order_from_index_types(s->chunk.skip_count); const uint8_t* cur = start; row_entry_state_e state = NOT_FOUND; - uint32_t length = 0, strm_idx_id = s->chunk.skip_count >> 8, idx_id = 1, ci_id = CI_PRESENT, - pos_end = 0; + uint32_t length = 0; + uint32_t idx_id = 0; + uint32_t pos_end = 0; + uint32_t ci_id = CI_NUM_STREAMS; while (cur < end) { uint32_t v = 0; for (uint32_t l = 0; l <= 28; l += 7) { @@ -283,10 +312,8 @@ static uint32_t __device__ ProtobufParseRowIndexEntry(rowindex_state_s* s, } break; case STORE_INDEX0: - ci_id = (idx_id == (strm_idx_id & 0xff)) ? CI_DATA - : (idx_id == ((strm_idx_id >> 8) & 0xff)) ? CI_DATA2 - : CI_PRESENT; - idx_id++; + // Start of a new entry; determine the stream index types + ci_id = stream_order[idx_id++]; if (s->is_compressed) { if (ci_id < CI_PRESENT) s->row_index_entry[0][ci_id] = v; if (cur >= start + pos_end) return length; diff --git a/python/cudf/cudf/tests/test_orc.py b/python/cudf/cudf/tests/test_orc.py index 33095761fde..7fcad5df9f1 100644 --- a/python/cudf/cudf/tests/test_orc.py +++ b/python/cudf/cudf/tests/test_orc.py @@ -1,4 +1,4 @@ -# Copyright (c) 2019-2022, NVIDIA CORPORATION. +# Copyright (c) 2019-2023, NVIDIA CORPORATION. import datetime import decimal @@ -1896,12 +1896,10 @@ def test_reader_empty_stripe(datadir, fname): assert_eq(expected, got) -@pytest.mark.xfail( - reason="https://github.com/rapidsai/cudf/issues/11890", raises=RuntimeError -) -def test_reader_unsupported_offsets(): - # needs enough data for more than one row group - expected = cudf.DataFrame({"str": ["*"] * 10001}, dtype="string") +# needs enough data for multiple row groups +@pytest.mark.parametrize("data", [["*"] * 10001, ["**", None] * 5001]) +def test_reader_row_index_order(data): + expected = cudf.DataFrame({"str": data}, dtype="string") buffer = BytesIO() expected.to_pandas().to_orc(buffer)