Skip to content

Commit

Permalink
apacheGH-34633: [C++][Parquet] Fix StreamReader to read decimals (apa…
Browse files Browse the repository at this point in the history
…che#34720)

### Rationale for this change

StreamReader only supports reading decimals stored as INT32 or INT64. Users cannot read decimals stored as FIXED_LEN_BYTE_ARRAY or BYTE_ARRAY.

### What changes are included in this PR?

Allow StreamReader to read decimals into the arrow::Decimal128 type.

### Are these changes tested?

Added two test cases that read decimals from FIXED_LEN_BYTE_ARRAY and BYTE_ARRAY columns.

### Are there any user-facing changes?

Yes, users can now read decimals from a Parquet file directly into arrow::Decimal128.
* Closes: apache#34633

Authored-by: Gang Wu <[email protected]>
Signed-off-by: Weston Pace <[email protected]>
  • Loading branch information
wgtmac authored and ArgusLi committed May 15, 2023
1 parent c662f09 commit 73763c5
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 3 deletions.
42 changes: 41 additions & 1 deletion cpp/src/parquet/stream_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

#include "parquet/stream_reader.h"
#include "arrow/util/decimal.h"

#include <set>
#include <utility>
Expand All @@ -35,7 +36,7 @@ constexpr int64_t StreamReader::kBatchSizeOne;
// then it will allow the Parquet file to use the converted type
// NONE.
//
static const std::set<std::pair<ConvertedType::type, ConvertedType::type> >
static const std::set<std::pair<ConvertedType::type, ConvertedType::type>>
converted_type_exceptions = {{ConvertedType::INT_32, ConvertedType::NONE},
{ConvertedType::INT_64, ConvertedType::NONE},
{ConvertedType::INT_32, ConvertedType::DECIMAL},
Expand Down Expand Up @@ -275,6 +276,45 @@ StreamReader& StreamReader::operator>>(optional<std::string>& v) {
return *this;
}

// Reads an optional DECIMAL column into an arrow::Decimal128.
//
// The column's physical type selects how the bytes are obtained:
//   - FIXED_LEN_BYTE_ARRAY: the node's type_length() bytes are decoded.
//   - BYTE_ARRAY: the length carried by the value itself is decoded.
// In both cases the bytes are passed to Decimal128::FromBigEndian and the
// result is assigned to `v` through PARQUET_ASSIGN_OR_THROW. When
// ReadOptional() returns false, `v` is reset instead. Any other physical
// type ends in ParquetException::NYI.
StreamReader& StreamReader::operator>>(optional<::arrow::Decimal128>& v) {
  const auto& column_node = nodes_[column_index_];

  switch (column_node->physical_type()) {
    case Type::FIXED_LEN_BYTE_ARRAY: {
      // The width comes from the column node rather than from the value.
      const int byte_width = column_node->type_length();
      CheckColumn(Type::FIXED_LEN_BYTE_ARRAY, ConvertedType::DECIMAL, byte_width);

      FixedLenByteArray fixed_bytes;
      if (!ReadOptional(&fixed_bytes)) {
        v.reset();
      } else {
        PARQUET_ASSIGN_OR_THROW(
            v, ::arrow::Decimal128::FromBigEndian(fixed_bytes.ptr, byte_width));
      }
      break;
    }
    case Type::BYTE_ARRAY: {
      CheckColumn(Type::BYTE_ARRAY, ConvertedType::DECIMAL);

      ByteArray var_bytes;
      if (!ReadOptional(&var_bytes)) {
        v.reset();
      } else {
        PARQUET_ASSIGN_OR_THROW(
            v, ::arrow::Decimal128::FromBigEndian(var_bytes.ptr, var_bytes.len));
      }
      break;
    }
    default:
      ParquetException::NYI("Decimal128 is not implemented for non-binary types");
  }

  return *this;
}

// Reads a DECIMAL value into `v` by delegating to the optional overload.
// If that overload leaves the optional empty, a read-failed exception is
// raised for the column that was being read.
StreamReader& StreamReader::operator>>(::arrow::Decimal128& v) {
  // Bind the node before the read; it is needed afterwards for the error
  // path (presumably the read advances column_index_ -- confirm in ReadOptional).
  const auto& column_node = nodes_[column_index_];

  std::optional<::arrow::Decimal128> decoded;
  *this >> decoded;

  if (!decoded) {
    ThrowReadFailedException(column_node);
  }

  v = std::move(decoded).value();
  return *this;
}

void StreamReader::ReadFixedLength(char* ptr, int len) {
CheckColumn(Type::FIXED_LEN_BYTE_ARRAY, ConvertedType::NONE, len);
FixedLenByteArray flba;
Expand Down
4 changes: 4 additions & 0 deletions cpp/src/parquet/stream_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,8 @@ class PARQUET_EXPORT StreamReader {

StreamReader& operator>>(std::string& v);

// Reads a DECIMAL value into an arrow::Decimal128. Implemented on top of
// the optional<::arrow::Decimal128> overload; raises a read-failed
// exception when that overload yields no value.
StreamReader& operator>>(::arrow::Decimal128& v);

// Input operators for optional fields.

StreamReader& operator>>(optional<bool>& v);
Expand Down Expand Up @@ -166,6 +168,8 @@ class PARQUET_EXPORT StreamReader {

StreamReader& operator>>(optional<std::string>& v);

// Reads a DECIMAL column whose physical type is FIXED_LEN_BYTE_ARRAY or
// BYTE_ARRAY into an arrow::Decimal128; `v` is reset when no value is read.
// Other physical types are reported as not yet implemented.
StreamReader& operator>>(optional<::arrow::Decimal128>& v);

template <std::size_t N>
StreamReader& operator>>(optional<std::array<char, N>>& v) {
CheckColumn(Type::FIXED_LEN_BYTE_ARRAY, ConvertedType::NONE, N);
Expand Down
39 changes: 37 additions & 2 deletions cpp/src/parquet/stream_reader_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <utility>

#include "arrow/io/file.h"
#include "arrow/util/decimal.h"
#include "parquet/exception.h"
#include "parquet/test_util.h"

Expand Down Expand Up @@ -886,7 +887,7 @@ TEST_F(TestReadingDataFiles, Int32Decimal) {
auto reader = StreamReader{std::move(file_reader)};

int32_t x;
int i;
int i = 0;

for (i = 1; !reader.eof(); ++i) {
reader >> x >> EndRow;
Expand All @@ -903,7 +904,7 @@ TEST_F(TestReadingDataFiles, Int64Decimal) {
auto reader = StreamReader{std::move(file_reader)};

int64_t x;
int i;
int i = 0;

for (i = 1; !reader.eof(); ++i) {
reader >> x >> EndRow;
Expand All @@ -912,5 +913,39 @@ TEST_F(TestReadingDataFiles, Int64Decimal) {
EXPECT_EQ(i, 25);
}

// Streams every row of fixed_length_decimal.parquet as a Decimal128 and
// expects the k-th row (1-based) to equal Decimal128(k * 100). The counter
// ends at 25, i.e. 24 rows were consumed before eof().
TEST_F(TestReadingDataFiles, FLBADecimal) {
  PARQUET_ASSIGN_OR_THROW(auto infile, ::arrow::io::ReadableFile::Open(
                                           GetDataFile("fixed_length_decimal.parquet")));

  StreamReader reader{ParquetFileReader::Open(infile)};

  ::arrow::Decimal128 value;
  int row = 1;

  while (!reader.eof()) {
    reader >> value >> EndRow;
    EXPECT_EQ(value, ::arrow::Decimal128(row * 100));
    ++row;
  }
  EXPECT_EQ(row, 25);
}

// Streams every row of byte_array_decimal.parquet as a Decimal128 and
// expects the k-th row (1-based) to equal Decimal128(k * 100). The counter
// ends at 25, i.e. 24 rows were consumed before eof().
TEST_F(TestReadingDataFiles, ByteArrayDecimal) {
  PARQUET_ASSIGN_OR_THROW(auto infile, ::arrow::io::ReadableFile::Open(
                                           GetDataFile("byte_array_decimal.parquet")));

  StreamReader reader{ParquetFileReader::Open(infile)};

  ::arrow::Decimal128 value;
  int row = 1;

  while (!reader.eof()) {
    reader >> value >> EndRow;
    EXPECT_EQ(value, ::arrow::Decimal128(row * 100));
    ++row;
  }
  EXPECT_EQ(row, 25);
}

} // namespace test
} // namespace parquet

0 comments on commit 73763c5

Please sign in to comment.