Skip to content

Commit

Permalink
ARROW-17318: [C++][Dataset] Support async streaming interface for get…
Browse files Browse the repository at this point in the history
…ting fragments in Dataset

Add `GetFragmentsAsync()` and `GetFragmentsAsyncImpl()`
functions to the generic `Dataset` interface, which
allows to produce fragments in a streamed fashion.

This is one of the prerequisites for making
`FileSystemDataset` to support lazy fragment
processing, which, in turn, can be used to start
scan operations without waiting for the entire
dataset to be discovered.

To aid the transition process of moving to async
implementation in `Dataset`/`AsyncScanner` code,
a default implementation for `GetFragmentsAsyncImpl()`
is provided (iterating over `GetFragmentsImpl()`
via a `BackgroundGenerator`).

Tests: unit(release)

Signed-off-by: Pavel Solodovnikov <[email protected]>
  • Loading branch information
ManManson committed Aug 10, 2022
1 parent f0a6eba commit e91396c
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 0 deletions.
26 changes: 26 additions & 0 deletions cpp/src/arrow/dataset/dataset.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "arrow/dataset/dataset_internal.h"
#include "arrow/dataset/scanner.h"
#include "arrow/table.h"
#include "arrow/util/async_generator.h"
#include "arrow/util/bit_util.h"
#include "arrow/util/iterator.h"
#include "arrow/util/logging.h"
Expand Down Expand Up @@ -160,6 +161,31 @@ Result<FragmentIterator> Dataset::GetFragments(compute::Expression predicate) {
: MakeEmptyIterator<std::shared_ptr<Fragment>>();
}

Result<FragmentGenerator> Dataset::GetFragmentsAsync() {
return GetFragmentsAsync(compute::literal(true));
}

Result<FragmentGenerator> Dataset::GetFragmentsAsync(compute::Expression predicate) {
ARROW_ASSIGN_OR_RAISE(
predicate, SimplifyWithGuarantee(std::move(predicate), partition_expression_));
return predicate.IsSatisfiable() ? GetFragmentsAsyncImpl(std::move(predicate))
: MakeEmptyGenerator<std::shared_ptr<Fragment>>();
}

// Default impl delegating the work to `GetFragmentsImpl` and wrapping it into
// BackgroundGenerator/TransferredGenerator, which offloads potentially
// IO-intensive work to the default IO thread pool and then transfers the control
// back to the default CPU thread pool.
Result<FragmentGenerator> Dataset::GetFragmentsAsyncImpl(compute::Expression predicate) {
ARROW_ASSIGN_OR_RAISE(auto iter, GetFragmentsImpl(std::move(predicate)));
ARROW_ASSIGN_OR_RAISE(
auto background_gen,
MakeBackgroundGenerator(std::move(iter), io::default_io_context().executor()));
auto transferred_gen = MakeTransferredGenerator(std::move(background_gen),
::arrow::internal::GetCpuThreadPool());
return transferred_gen;
}

struct VectorRecordBatchGenerator : InMemoryDataset::RecordBatchGenerator {
explicit VectorRecordBatchGenerator(RecordBatchVector batches)
: batches_(std::move(batches)) {}
Expand Down
8 changes: 8 additions & 0 deletions cpp/src/arrow/dataset/dataset.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "arrow/compute/exec/expression.h"
#include "arrow/dataset/type_fwd.h"
#include "arrow/dataset/visibility.h"
#include "arrow/util/async_generator_fwd.h"
#include "arrow/util/macros.h"
#include "arrow/util/mutex.h"
#include "arrow/util/optional.h"
Expand Down Expand Up @@ -134,6 +135,8 @@ class ARROW_DS_EXPORT InMemoryFragment : public Fragment {

/// @}

using FragmentGenerator = AsyncGenerator<std::shared_ptr<Fragment>>;

/// \brief A container of zero or more Fragments.
///
/// A Dataset acts as a union of Fragments, e.g. files deeply nested in a
Expand All @@ -148,6 +151,10 @@ class ARROW_DS_EXPORT Dataset : public std::enable_shared_from_this<Dataset> {
Result<FragmentIterator> GetFragments(compute::Expression predicate);
Result<FragmentIterator> GetFragments();

/// \brief Async versions of `GetFragments`.
Result<FragmentGenerator> GetFragmentsAsync(compute::Expression predicate);
Result<FragmentGenerator> GetFragmentsAsync();

const std::shared_ptr<Schema>& schema() const { return schema_; }

/// \brief An expression which evaluates to true for all data viewed by this Dataset.
Expand All @@ -174,6 +181,7 @@ class ARROW_DS_EXPORT Dataset : public std::enable_shared_from_this<Dataset> {
Dataset(std::shared_ptr<Schema> schema, compute::Expression partition_expression);

virtual Result<FragmentIterator> GetFragmentsImpl(compute::Expression predicate) = 0;
virtual Result<FragmentGenerator> GetFragmentsAsyncImpl(compute::Expression predicate);

std::shared_ptr<Schema> schema_;
compute::Expression partition_expression_ = compute::literal(true);
Expand Down

0 comments on commit e91396c

Please sign in to comment.