diff --git a/cpp/src/parquet/stream_reader.cc b/cpp/src/parquet/stream_reader.cc index 9a7cc8cdf8617..0fecb1bf24615 100644 --- a/cpp/src/parquet/stream_reader.cc +++ b/cpp/src/parquet/stream_reader.cc @@ -16,6 +16,7 @@ // under the License. #include "parquet/stream_reader.h" +#include "arrow/util/decimal.h" #include #include @@ -35,7 +36,7 @@ constexpr int64_t StreamReader::kBatchSizeOne; // then it will allow the Parquet file to use the converted type // NONE. // -static const std::set > +static const std::set> converted_type_exceptions = {{ConvertedType::INT_32, ConvertedType::NONE}, {ConvertedType::INT_64, ConvertedType::NONE}, {ConvertedType::INT_32, ConvertedType::DECIMAL}, @@ -275,6 +276,45 @@ StreamReader& StreamReader::operator>>(optional& v) { return *this; } +StreamReader& StreamReader::operator>>(optional<::arrow::Decimal128>& v) { + const auto& node = nodes_[column_index_]; + if (node->physical_type() == Type::FIXED_LEN_BYTE_ARRAY) { + const int type_length = node->type_length(); + CheckColumn(Type::FIXED_LEN_BYTE_ARRAY, ConvertedType::DECIMAL, type_length); + + FixedLenByteArray flba; + if (ReadOptional(&flba)) { + PARQUET_ASSIGN_OR_THROW(v, + ::arrow::Decimal128::FromBigEndian(flba.ptr, type_length)); + } else { + v.reset(); + } + } else if (node->physical_type() == Type::BYTE_ARRAY) { + CheckColumn(Type::BYTE_ARRAY, ConvertedType::DECIMAL); + + ByteArray ba; + if (ReadOptional(&ba)) { + PARQUET_ASSIGN_OR_THROW(v, ::arrow::Decimal128::FromBigEndian(ba.ptr, ba.len)); + } else { + v.reset(); + } + } else { + ParquetException::NYI("Decimal128 is not implemented for non-binary types"); + } + return *this; +} + +StreamReader& StreamReader::operator>>(::arrow::Decimal128& v) { + const auto& node = nodes_[column_index_]; + std::optional<::arrow::Decimal128> maybe_v; + *this >> maybe_v; + if (!maybe_v.has_value()) { + ThrowReadFailedException(node); + } + v = std::move(maybe_v.value()); + return *this; +} + void StreamReader::ReadFixedLength(char* ptr, int len) { CheckColumn(Type::FIXED_LEN_BYTE_ARRAY, ConvertedType::NONE, len); FixedLenByteArray flba; diff --git a/cpp/src/parquet/stream_reader.h b/cpp/src/parquet/stream_reader.h index e16f8ee694c2c..a7dadac92c892 100644 --- a/cpp/src/parquet/stream_reader.h +++ b/cpp/src/parquet/stream_reader.h @@ -134,6 +134,8 @@ class PARQUET_EXPORT StreamReader { StreamReader& operator>>(std::string& v); + StreamReader& operator>>(::arrow::Decimal128& v); + // Input operators for optional fields. StreamReader& operator>>(optional& v); @@ -166,6 +168,8 @@ class PARQUET_EXPORT StreamReader { StreamReader& operator>>(optional& v); + StreamReader& operator>>(optional<::arrow::Decimal128>& v); + template StreamReader& operator>>(optional>& v) { CheckColumn(Type::FIXED_LEN_BYTE_ARRAY, ConvertedType::NONE, N); diff --git a/cpp/src/parquet/stream_reader_test.cc b/cpp/src/parquet/stream_reader_test.cc index aa0ff25b10dbe..fed036bca546a 100644 --- a/cpp/src/parquet/stream_reader_test.cc +++ b/cpp/src/parquet/stream_reader_test.cc @@ -26,6 +26,7 @@ #include #include "arrow/io/file.h" +#include "arrow/util/decimal.h" #include "parquet/exception.h" #include "parquet/test_util.h" @@ -886,7 +887,7 @@ TEST_F(TestReadingDataFiles, Int32Decimal) { auto reader = StreamReader{std::move(file_reader)}; int32_t x; - int i; + int i = 0; for (i = 1; !reader.eof(); ++i) { reader >> x >> EndRow; @@ -903,7 +904,7 @@ TEST_F(TestReadingDataFiles, Int64Decimal) { auto reader = StreamReader{std::move(file_reader)}; int64_t x; - int i; + int i = 0; for (i = 1; !reader.eof(); ++i) { reader >> x >> EndRow; @@ -912,5 +913,39 @@ TEST_F(TestReadingDataFiles, Int64Decimal) { EXPECT_EQ(i, 25); } +TEST_F(TestReadingDataFiles, FLBADecimal) { + PARQUET_ASSIGN_OR_THROW(auto infile, ::arrow::io::ReadableFile::Open( + GetDataFile("fixed_length_decimal.parquet"))); + + auto file_reader = ParquetFileReader::Open(infile); + auto reader = StreamReader{std::move(file_reader)}; + + ::arrow::Decimal128 x; + int i = 0; + + for (i = 1; !reader.eof(); ++i) { + reader >> x >> EndRow; + EXPECT_EQ(x, ::arrow::Decimal128(i * 100)); + } + EXPECT_EQ(i, 25); +} + +TEST_F(TestReadingDataFiles, ByteArrayDecimal) { + PARQUET_ASSIGN_OR_THROW(auto infile, ::arrow::io::ReadableFile::Open( + GetDataFile("byte_array_decimal.parquet"))); + + auto file_reader = ParquetFileReader::Open(infile); + auto reader = StreamReader{std::move(file_reader)}; + + ::arrow::Decimal128 x; + int i = 0; + + for (i = 1; !reader.eof(); ++i) { + reader >> x >> EndRow; + EXPECT_EQ(x, ::arrow::Decimal128(i * 100)); + } + EXPECT_EQ(i, 25); +} + } // namespace test } // namespace parquet