Skip to content

Commit

Permalink
ARROW-10484: [C++] Make Future<> more generic
Browse files Browse the repository at this point in the history
`Future<Status>` and `Future<void>` have been removed in favor of `Future<>`, whose `ValueType` is an empty struct. This is a statusy future but still provides a `result()` member function, which simplifies generic code handling `Future<T>` since special cases which avoid `result()` need not be written. Additionally, `Future<T>` is now convertible to a `Future<>` which views only its status, without allocation of new state/storage. This will expedite observing the statuses of heterogeneous collections of futures.

Details:
- `DeferNotOk` and `ExecuteAndMarkFinished` are no longer members of `Future<>`
- Constructing finished Futures no longer locks the waiter mutex
- Future now holds a `shared_ptr<FutureImpl>` directly. `FutureImpl` stores the Future's result in a type erased allocation.
- `FutureStorage<>` and `FutureStorageBase` are obviated and have been removed
- deleted `Executor::SubmitAsFuture()` since it isn't used and can be trivially replaced: `ex->SubmitAsFuture(f)` -> `DeferNotOk(ex->Submit(f))`

Closes #8591 from bkietz/10484-FuturevoidStatus-could-be

Authored-by: Benjamin Kietzman <[email protected]>
Signed-off-by: Benjamin Kietzman <[email protected]>
  • Loading branch information
bkietz committed Nov 12, 2020
1 parent 40a7a6f commit 3051604
Show file tree
Hide file tree
Showing 11 changed files with 257 additions and 235 deletions.
2 changes: 1 addition & 1 deletion cpp/src/arrow/flight/flight_benchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ Status DoSinglePerfRun(FlightClient* client, bool test_put, PerformanceStats* st
// }

ARROW_ASSIGN_OR_RAISE(auto pool, ThreadPool::Make(FLAGS_num_threads));
std::vector<Future<Status>> tasks;
std::vector<Future<>> tasks;
for (const auto& endpoint : plan->endpoints()) {
ARROW_ASSIGN_OR_RAISE(auto task, pool->Submit(ConsumeStream, endpoint));
tasks.push_back(std::move(task));
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/io/caching.cc
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ ReadRangeCache::ReadRangeCache(std::shared_ptr<RandomAccessFile> file, AsyncCont
impl_->options = options;
}

ReadRangeCache::~ReadRangeCache() {}
ReadRangeCache::~ReadRangeCache() = default;

Status ReadRangeCache::Cache(std::vector<ReadRange> ranges) {
ranges = internal::CoalesceReadRanges(std::move(ranges), impl_->options.hole_size_limit,
Expand Down
8 changes: 2 additions & 6 deletions cpp/src/arrow/io/interfaces.cc
Original file line number Diff line number Diff line change
Expand Up @@ -128,13 +128,9 @@ Future<std::shared_ptr<Buffer>> RandomAccessFile::ReadAsync(const AsyncContext&
TaskHints hints;
hints.io_size = nbytes;
hints.external_id = ctx.external_id;
auto maybe_fut = ctx.executor->Submit(std::move(hints), [self, position, nbytes] {
return DeferNotOk(ctx.executor->Submit(std::move(hints), [self, position, nbytes] {
return self->ReadAt(position, nbytes);
});
if (!maybe_fut.ok()) {
return Future<std::shared_ptr<Buffer>>::MakeFinished(maybe_fut.status());
}
return *std::move(maybe_fut);
}));
}

// Default WillNeed() implementation: no-op
Expand Down
6 changes: 6 additions & 0 deletions cpp/src/arrow/result.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#pragma once

#include <cstddef>
#include <new>
#include <string>
#include <type_traits>
Expand Down Expand Up @@ -423,6 +424,8 @@ class ARROW_MUST_USE_TYPE Result : public util::EqualityComparable<Result<T>> {

void Destroy() {
if (ARROW_PREDICT_TRUE(status_.ok())) {
static_assert(offsetof(Result<T>, status_) == 0,
"Status is guaranteed to be at the start of Result<>");
internal::launder(reinterpret_cast<const T*>(&data_))->~T();
}
}
Expand All @@ -448,6 +451,9 @@ class ARROW_MUST_USE_TYPE Result : public util::EqualityComparable<Result<T>> {
/// WARNING: ARROW_ASSIGN_OR_RAISE expands into multiple statements;
/// it cannot be used in a single statement (e.g. as the body of an if
/// statement without {})!
///
/// WARNING: ARROW_ASSIGN_OR_RAISE `std::move`s its right operand. If you have
/// an lvalue Result which you *don't* want to move out of cast appropriately.
#define ARROW_ASSIGN_OR_RAISE(lhs, rexpr) \
ARROW_ASSIGN_OR_RAISE_IMPL(ARROW_ASSIGN_OR_RAISE_NAME(_error_or_value, __COUNTER__), \
lhs, rexpr);
Expand Down
49 changes: 29 additions & 20 deletions cpp/src/arrow/result_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ namespace {
using ::testing::Eq;

StatusCode kErrorCode = StatusCode::Invalid;
constexpr char kErrorMessage[] = "Invalid argument";
constexpr const char* kErrorMessage = "Invalid argument";

const int kIntElement = 42;
constexpr char kStringElement[] =
constexpr const char* kStringElement =
"The Answer to the Ultimate Question of Life, the Universe, and Everything";

// A data type without a default constructor.
Expand All @@ -46,6 +46,10 @@ struct Foo {
std::string baz;

explicit Foo(int value) : bar(value), baz(kStringElement) {}

bool operator==(const Foo& other) const {
return (bar == other.bar) && (baz == other.baz);
}
};

// A data type with only copy constructors.
Expand All @@ -59,7 +63,7 @@ struct CopyOnlyDataType {
};

struct ImplicitlyCopyConvertible {
ImplicitlyCopyConvertible(const CopyOnlyDataType& co) // NOLINT(runtime/explicit)
ImplicitlyCopyConvertible(const CopyOnlyDataType& co) // NOLINT runtime/explicit
: copy_only(co) {}

CopyOnlyDataType copy_only;
Expand All @@ -72,9 +76,9 @@ struct MoveOnlyDataType {
MoveOnlyDataType(const MoveOnlyDataType& other) = delete;
MoveOnlyDataType& operator=(const MoveOnlyDataType& other) = delete;

MoveOnlyDataType(MoveOnlyDataType&& other) { MoveFrom(other); }
MoveOnlyDataType(MoveOnlyDataType&& other) { MoveFrom(&other); }
MoveOnlyDataType& operator=(MoveOnlyDataType&& other) {
MoveFrom(other);
MoveFrom(&other);
return *this;
}

Expand All @@ -87,17 +91,17 @@ struct MoveOnlyDataType {
}
}

void MoveFrom(MoveOnlyDataType& other) {
void MoveFrom(MoveOnlyDataType* other) {
Destroy();
data = other.data;
other.data = nullptr;
data = other->data;
other->data = nullptr;
}

int* data = nullptr;
};

struct ImplicitlyMoveConvertible {
ImplicitlyMoveConvertible(MoveOnlyDataType&& mo) // NOLINT(runtime/explicit)
ImplicitlyMoveConvertible(MoveOnlyDataType&& mo) // NOLINT runtime/explicit
: move_only(std::move(mo)) {}

MoveOnlyDataType move_only;
Expand Down Expand Up @@ -128,6 +132,10 @@ struct HeapAllocatedObject {
}

~HeapAllocatedObject() { delete value; }

bool operator==(const HeapAllocatedObject& other) const {
return *value == *other.value;
}
};

// Constructs a Foo.
Expand Down Expand Up @@ -165,14 +173,6 @@ struct StringVectorCtor {
std::vector<std::string> operator()() { return {kStringElement, kErrorMessage}; }
};

bool operator==(const Foo& lhs, const Foo& rhs) {
return (lhs.bar == rhs.bar) && (lhs.baz == rhs.baz);
}

bool operator==(const HeapAllocatedObject& lhs, const HeapAllocatedObject& rhs) {
return *lhs.value == *rhs.value;
}

// Returns an rvalue reference to the Result<T> object pointed to by
// |result|.
template <class T>
Expand All @@ -184,9 +184,8 @@ Result<T>&& MoveResult(Result<T>* result) {
template <typename T>
class ResultTest : public ::testing::Test {};

typedef ::testing::Types<IntCtor, FooCtor, StringCtor, StringVectorCtor,
HeapAllocatedObjectCtor>
TestTypes;
using TestTypes = ::testing::Types<IntCtor, FooCtor, StringCtor, StringVectorCtor,
HeapAllocatedObjectCtor>;

TYPED_TEST_SUITE(ResultTest, TestTypes);

Expand Down Expand Up @@ -715,5 +714,15 @@ TEST(ResultTest, Equality) {
}
}

TEST(ResultTest, ViewAsStatus) {
Result<int> ok(3);
Result<int> err(Status::Invalid("error"));

auto ViewAsStatus = [](const void* ptr) { return static_cast<const Status*>(ptr); };

EXPECT_EQ(ViewAsStatus(&ok), &ok.status());
EXPECT_EQ(ViewAsStatus(&err), &err.status());
}

} // namespace
} // namespace arrow
12 changes: 9 additions & 3 deletions cpp/src/arrow/util/future.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class FutureWaiterImpl : public FutureWaiter {
}
}

~FutureWaiterImpl() {
~FutureWaiterImpl() override {
for (auto future : futures_) {
future->RemoveWaiter(this);
}
Expand Down Expand Up @@ -174,9 +174,9 @@ FutureWaiterImpl* GetConcreteWaiter(FutureWaiter* waiter) {

} // namespace

FutureWaiter::FutureWaiter() {}
FutureWaiter::FutureWaiter() = default;

FutureWaiter::~FutureWaiter() {}
FutureWaiter::~FutureWaiter() = default;

std::unique_ptr<FutureWaiter> FutureWaiter::Make(Kind kind,
std::vector<FutureImpl*> futures) {
Expand Down Expand Up @@ -277,6 +277,12 @@ std::unique_ptr<FutureImpl> FutureImpl::Make() {
return std::unique_ptr<FutureImpl>(new ConcreteFutureImpl());
}

std::unique_ptr<FutureImpl> FutureImpl::MakeFinished(FutureState state) {
std::unique_ptr<ConcreteFutureImpl> ptr(new ConcreteFutureImpl());
ptr->state_ = state;
return std::move(ptr);
}

FutureImpl::FutureImpl() : state_(FutureState::PENDING) {}

FutureState FutureImpl::SetWaiter(FutureWaiter* w, int future_num) {
Expand Down
Loading

0 comments on commit 3051604

Please sign in to comment.