Skip to content

Commit

Permalink
ARROW-13388 [C++][Parquet] Enable DELTA_LENGTH_BYTE_ARRAY decoder (#1…
Browse files Browse the repository at this point in the history
…3386)

DeltaLengthByteArrayDecoder is already implemented, so this commit enables it in the column reader and decoder factory to support decoding DELTA_LENGTH_BYTE_ARRAY-encoded columns.

Authored-by: Muthunagappan Muthuraman <[email protected]>
Signed-off-by: Sutou Kouhei <[email protected]>
  • Loading branch information
sfc-gh-mmuthuraman authored Jun 28, 2022
1 parent ce3ccdd commit a376968
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 3 deletions.
9 changes: 7 additions & 2 deletions cpp/src/parquet/column_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -804,8 +804,13 @@ class ColumnReaderImplBase {
decoders_[static_cast<int>(encoding)] = std::move(decoder);
break;
}
case Encoding::DELTA_LENGTH_BYTE_ARRAY:
ParquetException::NYI("Unsupported encoding");
case Encoding::DELTA_LENGTH_BYTE_ARRAY: {
auto decoder =
MakeTypedDecoder<DType>(Encoding::DELTA_LENGTH_BYTE_ARRAY, descr_);
current_decoder_ = decoder.get();
decoders_[static_cast<int>(encoding)] = std::move(decoder);
break;
}

default:
throw ParquetException("Unknown encoding type.");
Expand Down
5 changes: 5 additions & 0 deletions cpp/src/parquet/encoding.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2757,6 +2757,11 @@ std::unique_ptr<Decoder> MakeDecoder(Type::type type_num, Encoding::type encodin
return std::unique_ptr<Decoder>(new DeltaByteArrayDecoder(descr));
}
throw ParquetException("DELTA_BYTE_ARRAY only supports BYTE_ARRAY");
} else if (encoding == Encoding::DELTA_LENGTH_BYTE_ARRAY) {
if (type_num == Type::BYTE_ARRAY) {
return std::unique_ptr<Decoder>(new DeltaLengthByteArrayDecoder(descr));
}
throw ParquetException("DELTA_LENGTH_BYTE_ARRAY only supports BYTE_ARRAY");
} else {
ParquetException::NYI("Selected encoding is not supported");
}
Expand Down
84 changes: 84 additions & 0 deletions cpp/src/parquet/reader_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,90 @@ void CheckRowGroupMetadata(const RowGroupMetaData* rg_metadata,
}
}

class TestTextDeltaLengthByteArray : public ::testing::Test {
public:
void SetUp() {
reader_ = ParquetFileReader::OpenFile(data_file("delta_length_byte_array.parquet"));
}

void TearDown() {}

protected:
std::unique_ptr<ParquetFileReader> reader_;
};

// Reads column 0 of the first row group value-by-value through a
// ByteArrayScanner and checks that row i holds
// "apple_banana_mango" + to_string(i * i), for 1000 non-null rows, after
// which the scanner must report that it is exhausted.
TEST_F(TestTextDeltaLengthByteArray, TestTextScanner) {
  auto group = reader_->RowGroup(0);

  // column 0
  auto scanner = std::make_shared<ByteArrayScanner>(group->Column(0));
  ByteArray val;
  bool is_null = false;
  const std::string expected_prefix("apple_banana_mango");
  for (int i = 0; i < 1000; ++i) {
    ASSERT_TRUE(scanner->HasNext());
    ASSERT_TRUE(scanner->NextValue(&val, &is_null));
    ASSERT_FALSE(is_null);
    const std::string expected = expected_prefix + std::to_string(i * i);
    // ASSERT_EQ (rather than ASSERT_TRUE on ==) prints both lengths on failure.
    ASSERT_EQ(expected.length(), val.len);
    ASSERT_EQ(::arrow::util::string_view(reinterpret_cast<const char*>(val.ptr), val.len),
              expected);
  }

  // All 1000 values were consumed: nothing more may be returned.
  ASSERT_FALSE(scanner->HasNext());
  ASSERT_FALSE(scanner->NextValue(&val, &is_null));
}

// Verifies the file/row-group metadata of the sample file, checks that
// column 0 advertises the DELTA_LENGTH_BYTE_ARRAY encoding, and then reads the
// column in batches of 25 through ReadBatch(), expecting row r to hold
// "apple_banana_mango" + to_string(r * r).
TEST_F(TestTextDeltaLengthByteArray, TestBatchRead) {
  auto group = reader_->RowGroup(0);

  // column 0
  auto col = std::dynamic_pointer_cast<ByteArrayReader>(group->Column(0));
  // A failed downcast yields a null pointer; fail the test cleanly instead of
  // dereferencing it below.
  ASSERT_TRUE(col != nullptr);

  // This file only has 1000 rows
  ASSERT_EQ(1000, reader_->metadata()->num_rows());
  // This file only has 1 row group
  ASSERT_EQ(1, reader_->metadata()->num_row_groups());
  // Size of the metadata is 105 bytes
  ASSERT_EQ(105, reader_->metadata()->size());
  // This row group must have 1000 rows
  ASSERT_EQ(1000, group->metadata()->num_rows());

  // Check if the column is encoded with DELTA_LENGTH_BYTE_ARRAY
  auto col_chunk = group->metadata()->ColumnChunk(0);

  // Bind the encodings once so begin()/end() are guaranteed to come from the
  // same container, whatever encodings() returns.
  const auto& encodings = col_chunk->encodings();
  ASSERT_TRUE(std::find(encodings.begin(), encodings.end(),
                        Encoding::DELTA_LENGTH_BYTE_ARRAY) != encodings.end());

  ASSERT_TRUE(col->HasNext());
  int64_t values_read = 0;
  int64_t curr_batch_read = 0;
  const std::string expected_prefix("apple_banana_mango");
  while (values_read < 1000) {
    const int16_t batch_size = 25;
    int16_t def_levels[batch_size];
    int16_t rep_levels[batch_size];
    ByteArray values[batch_size];

    auto levels_read =
        col->ReadBatch(batch_size, def_levels, rep_levels, values, &curr_batch_read);
    // Every batch is expected to be full; a short batch aborts the test here,
    // so the loop cannot spin without making progress.
    ASSERT_EQ(batch_size, levels_read);
    ASSERT_EQ(batch_size, curr_batch_read);
    for (int16_t i = 0; i < batch_size; i++) {
      // Absolute row index of this value within the column.
      const int64_t row = i + values_read;
      const auto expected = expected_prefix + std::to_string(row * row);
      // ASSERT_EQ (rather than ASSERT_TRUE on ==) prints both lengths on failure.
      ASSERT_EQ(expected.length(), values[i].len);
      ASSERT_EQ(::arrow::util::string_view(reinterpret_cast<const char*>(values[i].ptr),
                                           values[i].len),
                expected);
    }
    values_read += curr_batch_read;
  }

  // Now read past the end of the file
  ASSERT_FALSE(col->HasNext());
}

class TestAllTypesPlain : public ::testing::Test {
public:
void SetUp() { reader_ = ParquetFileReader::OpenFile(alltypes_plain()); }
Expand Down

0 comments on commit a376968

Please sign in to comment.