Skip to content

Commit

Permalink
GH-44784: [C++][Parquet] Add arrow::Result version of `parquet::arrow::OpenFile()` (#44785)
Browse files Browse the repository at this point in the history

### Rationale for this change

We're migrating APIs that return `arrow::Status` and pass their result through an output parameter to APIs that return `arrow::Result`.

### What changes are included in this PR?

* Add `arrow::Result<std::unique_ptr<FileReader>> parquet::arrow::OpenFile()`
* Deprecate `arrow::Status parquet::arrow::OpenFile()`
* Use the added `arrow::Result` version in our code base

### Are these changes tested?

Yes.

### Are there any user-facing changes?

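Yes. A new `parquet::arrow::OpenFile()` overload returning `arrow::Result<std::unique_ptr<FileReader>>` is added, and the existing `arrow::Status` overload is deprecated as of 19.0.0.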
* GitHub Issue: #44784

Authored-by: Sutou Kouhei <[email protected]>
Signed-off-by: Sutou Kouhei <[email protected]>
  • Loading branch information
kou authored Nov 20, 2024
1 parent 1302889 commit 33e8cbb
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 52 deletions.
26 changes: 14 additions & 12 deletions c_glib/parquet-glib/arrow-file-reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,12 +134,13 @@ gparquet_arrow_file_reader_new_arrow(GArrowSeekableInputStream *source, GError *
{
auto arrow_random_access_file = garrow_seekable_input_stream_get_raw(source);
auto arrow_memory_pool = arrow::default_memory_pool();
std::unique_ptr<parquet::arrow::FileReader> parquet_arrow_file_reader;
auto status = parquet::arrow::OpenFile(arrow_random_access_file,
arrow_memory_pool,
&parquet_arrow_file_reader);
if (garrow_error_check(error, status, "[parquet][arrow][file-reader][new-arrow]")) {
return gparquet_arrow_file_reader_new_raw(parquet_arrow_file_reader.release());
auto parquet_arrow_file_reader_result =
parquet::arrow::OpenFile(arrow_random_access_file, arrow_memory_pool);
if (garrow::check(error,
parquet_arrow_file_reader_result,
"[parquet][arrow][file-reader][new-arrow]")) {
return gparquet_arrow_file_reader_new_raw(
parquet_arrow_file_reader_result->release());
} else {
return NULL;
}
Expand Down Expand Up @@ -168,12 +169,13 @@ gparquet_arrow_file_reader_new_path(const gchar *path, GError **error)
std::shared_ptr<arrow::io::RandomAccessFile> arrow_random_access_file =
arrow_memory_mapped_file.ValueOrDie();
auto arrow_memory_pool = arrow::default_memory_pool();
std::unique_ptr<parquet::arrow::FileReader> parquet_arrow_file_reader;
auto status = parquet::arrow::OpenFile(arrow_random_access_file,
arrow_memory_pool,
&parquet_arrow_file_reader);
if (garrow::check(error, status, "[parquet][arrow][file-reader][new-path]")) {
return gparquet_arrow_file_reader_new_raw(parquet_arrow_file_reader.release());
auto parquet_arrow_file_reader_result =
parquet::arrow::OpenFile(arrow_random_access_file, arrow_memory_pool);
if (garrow::check(error,
parquet_arrow_file_reader_result,
"[parquet][arrow][file-reader][new-path]")) {
return gparquet_arrow_file_reader_new_raw(
parquet_arrow_file_reader_result->release());
} else {
return NULL;
}
Expand Down
2 changes: 1 addition & 1 deletion cpp/examples/arrow/parquet_read_write.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ arrow::Status ReadFullFile(std::string path_to_file) {

// Open Parquet file reader
std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
ARROW_RETURN_NOT_OK(parquet::arrow::OpenFile(input, pool, &arrow_reader));
ARROW_ASSIGN_OR_RAISE(arrow_reader, parquet::arrow::OpenFile(input, pool));

// Read entire file as a single Arrow table
std::shared_ptr<arrow::Table> table;
Expand Down
16 changes: 8 additions & 8 deletions cpp/examples/parquet/parquet_arrow/reader_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ void read_whole_file() {
arrow::default_memory_pool()));

std::unique_ptr<parquet::arrow::FileReader> reader;
PARQUET_THROW_NOT_OK(
parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &reader));
PARQUET_ASSIGN_OR_THROW(reader,
parquet::arrow::OpenFile(infile, arrow::default_memory_pool()));
std::shared_ptr<arrow::Table> table;
PARQUET_THROW_NOT_OK(reader->ReadTable(&table));
std::cout << "Loaded " << table->num_rows() << " rows in " << table->num_columns()
Expand All @@ -85,8 +85,8 @@ void read_single_rowgroup() {
arrow::default_memory_pool()));

std::unique_ptr<parquet::arrow::FileReader> reader;
PARQUET_THROW_NOT_OK(
parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &reader));
PARQUET_ASSIGN_OR_THROW(reader,
parquet::arrow::OpenFile(infile, arrow::default_memory_pool()));
std::shared_ptr<arrow::Table> table;
PARQUET_THROW_NOT_OK(reader->RowGroup(0)->ReadTable(&table));
std::cout << "Loaded " << table->num_rows() << " rows in " << table->num_columns()
Expand All @@ -102,8 +102,8 @@ void read_single_column() {
arrow::default_memory_pool()));

std::unique_ptr<parquet::arrow::FileReader> reader;
PARQUET_THROW_NOT_OK(
parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &reader));
PARQUET_ASSIGN_OR_THROW(reader,
parquet::arrow::OpenFile(infile, arrow::default_memory_pool()));
std::shared_ptr<arrow::ChunkedArray> array;
PARQUET_THROW_NOT_OK(reader->ReadColumn(0, &array));
PARQUET_THROW_NOT_OK(arrow::PrettyPrint(*array, 4, &std::cout));
Expand All @@ -122,8 +122,8 @@ void read_single_column_chunk() {
arrow::default_memory_pool()));

std::unique_ptr<parquet::arrow::FileReader> reader;
PARQUET_THROW_NOT_OK(
parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &reader));
PARQUET_ASSIGN_OR_THROW(reader,
parquet::arrow::OpenFile(infile, arrow::default_memory_pool()));
std::shared_ptr<arrow::ChunkedArray> array;
PARQUET_THROW_NOT_OK(reader->RowGroup(0)->Column(0)->Read(&array));
PARQUET_THROW_NOT_OK(arrow::PrettyPrint(*array, 4, &std::cout));
Expand Down
50 changes: 21 additions & 29 deletions cpp/src/parquet/arrow/arrow_reader_writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -448,9 +448,8 @@ void DoSimpleRoundtrip(const std::shared_ptr<Table>& table, bool use_threads,
ASSERT_NO_FATAL_FAILURE(
WriteTableToBuffer(table, row_group_size, arrow_properties, &buffer));

std::unique_ptr<FileReader> reader;
ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
::arrow::default_memory_pool(), &reader));
ASSERT_OK_AND_ASSIGN(auto reader, OpenFile(std::make_shared<BufferReader>(buffer),
::arrow::default_memory_pool()));

reader->set_use_threads(use_threads);
if (column_subset.size() > 0) {
Expand Down Expand Up @@ -1095,8 +1094,7 @@ TYPED_TEST(TestParquetIO, SingleColumnTableRequiredChunkedWriteArrowIO) {

auto source = std::make_shared<BufferReader>(pbuffer);
std::shared_ptr<::arrow::Table> out;
std::unique_ptr<FileReader> reader;
ASSERT_OK_NO_THROW(OpenFile(source, ::arrow::default_memory_pool(), &reader));
ASSERT_OK_AND_ASSIGN(auto reader, OpenFile(source, ::arrow::default_memory_pool()));
ASSERT_NO_FATAL_FAILURE(this->ReadTableFromFile(std::move(reader), &out));
ASSERT_EQ(1, out->num_columns());
ASSERT_EQ(values->length(), out->num_rows());
Expand Down Expand Up @@ -2295,9 +2293,8 @@ TEST(TestArrowReadWrite, ReadSingleRowGroup) {
ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(table, num_rows / 2,
default_arrow_writer_properties(), &buffer));

std::unique_ptr<FileReader> reader;
ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
::arrow::default_memory_pool(), &reader));
ASSERT_OK_AND_ASSIGN(auto reader, OpenFile(std::make_shared<BufferReader>(buffer),
::arrow::default_memory_pool()));

ASSERT_EQ(2, reader->num_row_groups());

Expand Down Expand Up @@ -2357,9 +2354,8 @@ TEST(TestArrowReadWrite, ReadTableManually) {
ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(expected, num_rows / 2,
default_arrow_writer_properties(), &buffer));

std::unique_ptr<FileReader> reader;
ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
::arrow::default_memory_pool(), &reader));
ASSERT_OK_AND_ASSIGN(auto reader, OpenFile(std::make_shared<BufferReader>(buffer),
::arrow::default_memory_pool()));

ASSERT_EQ(2, reader->num_row_groups());

Expand Down Expand Up @@ -2476,9 +2472,8 @@ TEST(TestArrowReadWrite, CoalescedReadsAndNonCoalescedReads) {
ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(expected, num_rows / 2,
default_arrow_writer_properties(), &buffer));

std::unique_ptr<FileReader> reader;
ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
::arrow::default_memory_pool(), &reader));
ASSERT_OK_AND_ASSIGN(auto reader, OpenFile(std::make_shared<BufferReader>(buffer),
::arrow::default_memory_pool()));

ASSERT_EQ(2, reader->num_row_groups());

Expand Down Expand Up @@ -2594,9 +2589,8 @@ TEST(TestArrowReadWrite, ScanContents) {
ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(table, num_rows / 2,
default_arrow_writer_properties(), &buffer));

std::unique_ptr<FileReader> reader;
ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
::arrow::default_memory_pool(), &reader));
ASSERT_OK_AND_ASSIGN(auto reader, OpenFile(std::make_shared<BufferReader>(buffer),
::arrow::default_memory_pool()));

int64_t num_rows_returned = 0;
ASSERT_OK_NO_THROW(reader->ScanContents({}, 256, &num_rows_returned));
Expand Down Expand Up @@ -2689,18 +2683,17 @@ TEST(TestArrowReadWrite, ListLargeRecords) {
ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(table, row_group_size,
default_arrow_writer_properties(), &buffer));

std::unique_ptr<FileReader> reader;
ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
::arrow::default_memory_pool(), &reader));
ASSERT_OK_AND_ASSIGN(auto reader, OpenFile(std::make_shared<BufferReader>(buffer),
::arrow::default_memory_pool()));

// Read everything
std::shared_ptr<Table> result;
ASSERT_OK_NO_THROW(reader->ReadTable(&result));
ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*table, *result));

// Read 1 record at a time
ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
::arrow::default_memory_pool(), &reader));
ASSERT_OK_AND_ASSIGN(reader, OpenFile(std::make_shared<BufferReader>(buffer),
::arrow::default_memory_pool()));

std::unique_ptr<ColumnReader> col_reader;
ASSERT_OK(reader->GetColumn(0, &col_reader));
Expand Down Expand Up @@ -2974,9 +2967,8 @@ TEST(ArrowReadWrite, DecimalStats) {
ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(table, /*row_group_size=*/100,
default_arrow_writer_properties(), &buffer));

std::unique_ptr<FileReader> reader;
ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
::arrow::default_memory_pool(), &reader));
ASSERT_OK_AND_ASSIGN(auto reader, OpenFile(std::make_shared<BufferReader>(buffer),
::arrow::default_memory_pool()));

std::shared_ptr<Scalar> min, max;
ReadSingleColumnFileStatistics(std::move(reader), &min, &max);
Expand Down Expand Up @@ -3575,8 +3567,8 @@ class TestNestedSchemaRead : public ::testing::TestWithParam<Repetition::type> {

void InitReader() {
ASSERT_OK_AND_ASSIGN(auto buffer, nested_parquet_->Finish());
ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
::arrow::default_memory_pool(), &reader_));
ASSERT_OK_AND_ASSIGN(reader_, OpenFile(std::make_shared<BufferReader>(buffer),
::arrow::default_memory_pool()));
}

void InitNewParquetFile(const std::shared_ptr<GroupNode>& schema, int num_rows) {
Expand Down Expand Up @@ -5344,8 +5336,8 @@ TEST(TestArrowReadWrite, MultithreadedWrite) {

// Read to verify the data.
std::shared_ptr<Table> result;
std::unique_ptr<FileReader> reader;
ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer), pool, &reader));
ASSERT_OK_AND_ASSIGN(auto reader,
OpenFile(std::make_shared<BufferReader>(buffer), pool));
ASSERT_OK_NO_THROW(reader->ReadTable(&result));
ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*table, *result));
}
Expand Down
7 changes: 6 additions & 1 deletion cpp/src/parquet/arrow/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1372,9 +1372,14 @@ Result<std::unique_ptr<FileReader>> FileReaderBuilder::Build() {

Status OpenFile(std::shared_ptr<::arrow::io::RandomAccessFile> file, MemoryPool* pool,
std::unique_ptr<FileReader>* reader) {
return OpenFile(std::move(file), pool).Value(reader);
}

Result<std::unique_ptr<FileReader>> OpenFile(
std::shared_ptr<::arrow::io::RandomAccessFile> file, MemoryPool* pool) {
FileReaderBuilder builder;
RETURN_NOT_OK(builder.Open(std::move(file)));
return builder.memory_pool(pool)->Build(reader);
return builder.memory_pool(pool)->Build();
}

namespace internal {
Expand Down
10 changes: 10 additions & 0 deletions cpp/src/parquet/arrow/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -357,11 +357,21 @@ class PARQUET_EXPORT FileReaderBuilder {
/// \brief Build FileReader from Arrow file and MemoryPool
///
/// Advanced settings are supported through the FileReaderBuilder class.
///
/// \deprecated Deprecated in 19.0.0. Use arrow::Result version instead.
ARROW_DEPRECATED("Deprecated in 19.0.0. Use arrow::Result version instead.")
PARQUET_EXPORT
::arrow::Status OpenFile(std::shared_ptr<::arrow::io::RandomAccessFile>,
::arrow::MemoryPool* allocator,
std::unique_ptr<FileReader>* reader);

/// \brief Build FileReader from Arrow file and MemoryPool
///
/// Advanced settings are supported through the FileReaderBuilder class.
PARQUET_EXPORT
::arrow::Result<std::unique_ptr<FileReader>> OpenFile(
std::shared_ptr<::arrow::io::RandomAccessFile>, ::arrow::MemoryPool* allocator);

/// @}

PARQUET_EXPORT
Expand Down
4 changes: 3 additions & 1 deletion cpp/src/parquet/arrow/reconstruct_internal_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,9 @@ class FileTester {
protected:
Status Open(std::shared_ptr<Buffer> buffer, MemoryPool* pool) {
pool_ = pool;
return OpenFile(std::make_shared<BufferReader>(buffer), pool_, &file_reader_);
ARROW_ASSIGN_OR_RAISE(file_reader_,
OpenFile(std::make_shared<BufferReader>(buffer), pool_));
return Status::OK();
}

MemoryPool* pool_;
Expand Down

0 comments on commit 33e8cbb

Please sign in to comment.