Skip to content

Commit

Permalink
Fix a row index entry error in ORC writer issue (#10989)
Browse files Browse the repository at this point in the history
Issue #10755

Fixes an issue in protobuf writer where the length on the row index entry was being written into a single byte. This would cause errors when the size is larger than 127.
The issue was uncovered when row group statistics were added. String statistics contain copies to min/max strings, so the size is unbounded.
This PR changes the protobuf writer to write the entry size as a generic uint, allowing larger entries.
Also fixed `start_row` in row group info array in the reader (unrelated).

Authors:
  - Vukasin Milovanovic (https://github.com/vuule)

Approvers:
  - Ram (Ramakrishna Prabhu) (https://github.com/rgsl888prabhu)
  - David Wendt (https://github.com/davidwendt)
  - GALI PREM SAGAR (https://github.com/galipremsagar)

URL: #10989
  • Loading branch information
vuule committed May 31, 2022
1 parent d0b4e30 commit 69fc6aa
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 27 deletions.
56 changes: 32 additions & 24 deletions cpp/src/io/orc/orc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -212,51 +212,59 @@ void ProtobufWriter::put_row_index_entry(int32_t present_blk,
TypeKind kind,
ColStatsBlob const* stats)
{
size_t sz = 0, lpos;
put_uint(encode_field_number(1, ProtofType::FIXEDLEN)); // 1:RowIndex.entry
lpos = m_buf->size();
put_byte(0xcd); // sz+2
put_uint(encode_field_number(1, ProtofType::FIXEDLEN)); // 1:positions[packed=true]
put_byte(0xcd); // sz
if (present_blk >= 0) sz += put_uint(present_blk);
std::vector<uint8_t> positions_data;
ProtobufWriter position_writer(&positions_data);
auto const positions_size_offset = position_writer.put_uint(
encode_field_number(1, ProtofType::FIXEDLEN)); // 1:positions[packed=true]
position_writer.put_byte(0xcd); // positions size placeholder
uint32_t positions_size = 0;
if (present_blk >= 0) positions_size += position_writer.put_uint(present_blk);
if (present_ofs >= 0) {
sz += put_uint(present_ofs);
sz += put_byte(0); // run pos = 0
sz += put_byte(0); // bit pos = 0
positions_size += position_writer.put_uint(present_ofs);
positions_size += position_writer.put_byte(0); // run pos = 0
positions_size += position_writer.put_byte(0); // bit pos = 0
}
if (data_blk >= 0) { sz += put_uint(data_blk); }
if (data_blk >= 0) { positions_size += position_writer.put_uint(data_blk); }
if (data_ofs >= 0) {
sz += put_uint(data_ofs);
positions_size += position_writer.put_uint(data_ofs);
if (kind != STRING && kind != FLOAT && kind != DOUBLE && kind != DECIMAL) {
// RLE run pos always zero (assumes RLE aligned with row index boundaries)
sz += put_byte(0);
positions_size += position_writer.put_byte(0);
if (kind == BOOLEAN) {
// bit position in byte, always zero
sz += put_byte(0);
positions_size += position_writer.put_byte(0);
}
}
}
// INT kind can be passed in to bypass 2nd stream index (dictionary length streams)
if (kind != INT) {
if (data2_blk >= 0) { sz += put_uint(data2_blk); }
if (data2_blk >= 0) { positions_size += position_writer.put_uint(data2_blk); }
if (data2_ofs >= 0) {
sz += put_uint(data2_ofs);
positions_size += position_writer.put_uint(data2_ofs);
// RLE run pos always zero (assumes RLE aligned with row index boundaries)
sz += put_byte(0);
positions_size += position_writer.put_byte(0);
}
}
// size of the field 1
m_buf->data()[lpos + 2] = (uint8_t)(sz);
positions_data[positions_size_offset] = static_cast<uint8_t>(positions_size);

auto const stats_size = (stats == nullptr)
? 0
: varint_size(encode_field_number<decltype(*stats)>(2)) +
varint_size(stats->size()) + stats->size();
auto const entry_size = positions_data.size() + stats_size;

// 1:RowIndex.entry
put_uint(encode_field_number(1, ProtofType::FIXEDLEN));
put_uint(entry_size);
put_bytes<uint8_t>(positions_data);

if (stats != nullptr) {
sz += put_uint(encode_field_number<decltype(*stats)>(2)); // 2: statistics
put_uint(encode_field_number<decltype(*stats)>(2)); // 2: statistics
// Statistics field contains its length as varint and dtype specific data (encoded on the GPU)
sz += put_uint(stats->size());
sz += put_bytes<typename ColStatsBlob::value_type>(*stats);
put_uint(stats->size());
put_bytes<typename ColStatsBlob::value_type>(*stats);
}

// size of the whole row index entry
m_buf->data()[lpos] = (uint8_t)(sz + 2);
}

size_t ProtobufWriter::write(const PostScript& s)
Expand Down
11 changes: 11 additions & 0 deletions cpp/src/io/orc/orc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,17 @@ class ProtobufWriter {
put_byte(static_cast<uint8_t>(v));
return l;
}

uint32_t varint_size(uint64_t val)
{
auto len = 1u;
while (val > 0x7f) {
val >>= 7;
++len;
}
return len;
}

uint32_t put_int(int64_t v)
{
int64_t s = (v < 0);
Expand Down
6 changes: 3 additions & 3 deletions cpp/src/io/orc/stripe_init.cu
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,8 @@ enum row_entry_state_e {
* @return bytes consumed
*/
static uint32_t __device__ ProtobufParseRowIndexEntry(rowindex_state_s* s,
const uint8_t* start,
const uint8_t* end)
uint8_t const* const start,
uint8_t const* const end)
{
constexpr uint32_t pb_rowindexentry_id = ProtofType::FIXEDLEN + 8;

Expand Down Expand Up @@ -471,7 +471,7 @@ __global__ void __launch_bounds__(128, 8) gpuParseRowGroupIndex(RowGroup* row_gr
: row_groups[(s->rowgroup_start + i) * num_columns + blockIdx.x].num_rows;
auto const start_row =
(use_base_stride)
? rowidx_stride
? i * rowidx_stride
: row_groups[(s->rowgroup_start + i) * num_columns + blockIdx.x].start_row;
for (int j = t4; j < rowgroup_size4; j += 4) {
((uint32_t*)&row_groups[(s->rowgroup_start + i) * num_columns + blockIdx.x])[j] =
Expand Down
14 changes: 14 additions & 0 deletions python/cudf/cudf/tests/test_orc.py
Original file line number Diff line number Diff line change
Expand Up @@ -1729,3 +1729,17 @@ def test_orc_reader_zstd_compression(list_struct_buff):
assert_eq(expected, got)
except RuntimeError:
pytest.mark.xfail(reason="zstd support is not enabled")


def test_writer_protobuf_large_rowindexentry():
s = [
"Length of the two strings needs to add up to at least ~120",
"So that the encoded statistics are larger than 128 bytes",
] * 5001 # generate more than 10K rows to have two row groups
df = cudf.DataFrame({"s1": s})

buff = BytesIO()
df.to_orc(buff)

got = cudf.read_orc(buff)
assert_frame_equal(df, got)

0 comments on commit 69fc6aa

Please sign in to comment.