From da4235e3c08b46a8ff7a5207a93c03db75ce5515 Mon Sep 17 00:00:00 2001 From: Pavel Solodovnikov Date: Fri, 5 Aug 2022 15:59:33 +0300 Subject: [PATCH] ARROW-17318: [C++][Dataset] Support async streaming interface for getting fragments in Dataset Add `GetFragmentsAsync()` and `GetFragmentsAsyncImpl()` functions to the generic `Dataset` interface, which allow fragments to be produced in a streamed fashion. This is one of the prerequisites for making `FileSystemDataset` support lazy fragment processing, which, in turn, can be used to start scan operations without waiting for the entire dataset to be discovered. To aid the transition process of moving to an async implementation in `Dataset`/`AsyncScanner` code, a default implementation for `GetFragmentsAsyncImpl()` is provided (yielding a VectorGenerator over the fragments vector, which is stored by every implementation of the `Dataset` interface at the moment). Tests: unit(release) Signed-off-by: Pavel Solodovnikov --- cpp/src/arrow/dataset/dataset.cc | 19 +++++++++++++++++++ cpp/src/arrow/dataset/dataset.h | 8 ++++++++ 2 files changed, 27 insertions(+) diff --git a/cpp/src/arrow/dataset/dataset.cc b/cpp/src/arrow/dataset/dataset.cc index 1e4c9b7f719bb..4ac43e136e135 100644 --- a/cpp/src/arrow/dataset/dataset.cc +++ b/cpp/src/arrow/dataset/dataset.cc @@ -160,6 +160,25 @@ Result Dataset::GetFragments(compute::Expression predicate) { : MakeEmptyIterator>(); } +Result Dataset::GetFragmentsAsync() { + return GetFragmentsAsync(compute::literal(true)); +} + +Result Dataset::GetFragmentsAsync(compute::Expression predicate) { + ARROW_ASSIGN_OR_RAISE( + predicate, SimplifyWithGuarantee(std::move(predicate), partition_expression_)); + return predicate.IsSatisfiable() ? 
GetFragmentsAsyncImpl(std::move(predicate)) + : MakeEmptyGenerator>(); +} + +// Default impl delegating the work to `GetFragmentsImpl` and wrapping it into a +// VectorGenerator +Result Dataset::GetFragmentsAsyncImpl(compute::Expression predicate) { + ARROW_ASSIGN_OR_RAISE(auto iter, GetFragmentsImpl(std::move(predicate))); + ARROW_ASSIGN_OR_RAISE(auto vec, iter.ToVector()); + return MakeVectorGenerator(std::move(vec)); +} + struct VectorRecordBatchGenerator : InMemoryDataset::RecordBatchGenerator { explicit VectorRecordBatchGenerator(RecordBatchVector batches) : batches_(std::move(batches)) {} diff --git a/cpp/src/arrow/dataset/dataset.h b/cpp/src/arrow/dataset/dataset.h index 9f4fee52154a9..cd8a4ee587e11 100644 --- a/cpp/src/arrow/dataset/dataset.h +++ b/cpp/src/arrow/dataset/dataset.h @@ -28,6 +28,7 @@ #include "arrow/compute/exec/expression.h" #include "arrow/dataset/type_fwd.h" #include "arrow/dataset/visibility.h" +#include "arrow/util/async_generator.h" #include "arrow/util/macros.h" #include "arrow/util/mutex.h" #include "arrow/util/optional.h" @@ -134,6 +135,8 @@ class ARROW_DS_EXPORT InMemoryFragment : public Fragment { /// @} +using FragmentGenerator = AsyncGenerator>; + /// \brief A container of zero or more Fragments. /// /// A Dataset acts as a union of Fragments, e.g. files deeply nested in a @@ -148,6 +151,10 @@ class ARROW_DS_EXPORT Dataset : public std::enable_shared_from_this { Result GetFragments(compute::Expression predicate); Result GetFragments(); + /// \brief Async versions of `GetFragments`. + Result GetFragmentsAsync(compute::Expression predicate); + Result GetFragmentsAsync(); + const std::shared_ptr& schema() const { return schema_; } /// \brief An expression which evaluates to true for all data viewed by this Dataset. 
@@ -174,6 +181,7 @@ class ARROW_DS_EXPORT Dataset : public std::enable_shared_from_this { Dataset(std::shared_ptr schema, compute::Expression partition_expression); virtual Result GetFragmentsImpl(compute::Expression predicate) = 0; + virtual Result GetFragmentsAsyncImpl(compute::Expression predicate); std::shared_ptr schema_; compute::Expression partition_expression_ = compute::literal(true);