Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARROW-17350: [C++] Create a scheduler for asynchronous work #13912

Merged
merged 10 commits into from
Sep 2, 2022
29 changes: 17 additions & 12 deletions cpp/src/arrow/compute/exec/exec_plan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,24 +76,21 @@ struct ExecPlanImpl : public ExecPlan {

Result<Future<>> BeginExternalTask() {
Future<> completion_future = Future<>::Make();
ARROW_ASSIGN_OR_RAISE(bool task_added,
task_group_.AddTaskIfNotEnded(completion_future));
if (task_added) {
return std::move(completion_future);
if (async_scheduler_->AddSimpleTask(
[completion_future] { return completion_future; })) {
return completion_future;
}
// Return an invalid future if we were already finished to signal to the
// caller that they should not begin the task
return Future<>{};
}

Status ScheduleTask(std::function<Status()> fn) {
auto executor = exec_context_->executor();
if (!executor) return fn();
// Adds a task which submits fn to the executor and tracks its progress. If we're
// already stopping then the task is ignored and fn is not executed.
return task_group_
.AddTaskIfNotEnded([executor, fn]() { return executor->Submit(std::move(fn)); })
.status();
// aborted then the task is ignored and fn is not executed.
async_scheduler_->AddSimpleTask(
[executor, fn]() { return executor->Submit(std::move(fn)); });
return Status::OK();
}

Status ScheduleTask(std::function<Status(size_t)> fn) {
Expand All @@ -113,6 +110,8 @@ struct ExecPlanImpl : public ExecPlan {
return task_scheduler_->StartTaskGroup(GetThreadIndex(), task_group_id, num_tasks);
}

util::AsyncTaskScheduler* async_scheduler() { return async_scheduler_.get(); }

Status Validate() const {
if (nodes_.empty()) {
return Status::Invalid("ExecPlan has no node");
Expand Down Expand Up @@ -197,7 +196,8 @@ struct ExecPlanImpl : public ExecPlan {
void EndTaskGroup() {
bool expected = false;
if (group_ended_.compare_exchange_strong(expected, true)) {
task_group_.End().AddCallback([this](const Status& st) {
async_scheduler_->End();
async_scheduler_->OnFinished().AddCallback([this](const Status& st) {
MARK_SPAN(span_, error_st_ & st);
END_SPAN(span_);
finished_.MarkFinished(error_st_ & st);
Expand Down Expand Up @@ -328,7 +328,8 @@ struct ExecPlanImpl : public ExecPlan {

ThreadIndexer thread_indexer_;
std::atomic<bool> group_ended_{false};
util::AsyncTaskGroup task_group_;
std::unique_ptr<util::AsyncTaskScheduler> async_scheduler_ =
util::AsyncTaskScheduler::Make();
std::unique_ptr<TaskScheduler> task_scheduler_ = TaskScheduler::Make();
};

Expand Down Expand Up @@ -386,6 +387,10 @@ Status ExecPlan::StartTaskGroup(int task_group_id, int64_t num_tasks) {
return ToDerived(this)->StartTaskGroup(task_group_id, num_tasks);
}

util::AsyncTaskScheduler* ExecPlan::async_scheduler() {
return ToDerived(this)->async_scheduler();
}

Status ExecPlan::Validate() { return ToDerived(this)->Validate(); }

Status ExecPlan::StartProducing() { return ToDerived(this)->StartProducing(); }
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/arrow/compute/exec/exec_plan.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ class ARROW_EXPORT ExecPlan : public std::enable_shared_from_this<ExecPlan> {
/// \param num_tasks The number of times to run the task
Status StartTaskGroup(int task_group_id, int64_t num_tasks);

util::AsyncTaskScheduler* async_scheduler();

/// The initial inputs
const NodeVector& sources() const;

Expand Down
20 changes: 16 additions & 4 deletions cpp/src/arrow/compute/exec/tpch_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "arrow/compute/exec/tpch_node.h"
#include "arrow/buffer.h"
#include "arrow/compute/exec/exec_plan.h"
#include "arrow/util/async_util.h"
#include "arrow/util/formatting.h"
#include "arrow/util/future.h"
#include "arrow/util/io_util.h"
Expand Down Expand Up @@ -3374,13 +3375,18 @@ class TpchNode : public ExecNode {
[[noreturn]] void InputFinished(ExecNode*, int) override { NoInputs(); }

Status StartProducing() override {
return generator_->StartProducing(
num_running_++;
ARROW_RETURN_NOT_OK(generator_->StartProducing(
plan_->max_concurrency(),
[this](ExecBatch batch) { this->OutputBatchCallback(std::move(batch)); },
[this](int64_t num_batches) { this->FinishedCallback(num_batches); },
[this](std::function<Status(size_t)> func) -> Status {
return this->ScheduleTaskCallback(std::move(func));
});
}));
if (--num_running_ == 0) {
finished_.MarkFinished(Status::OK());
}
return Status::OK();
}

void PauseProducing(ExecNode* output, int32_t counter) override {
Expand Down Expand Up @@ -3408,23 +3414,29 @@ class TpchNode : public ExecNode {

void FinishedCallback(int64_t total_num_batches) {
outputs_[0]->InputFinished(this, static_cast<int>(total_num_batches));
finished_.MarkFinished();
finished_generating_.store(true);
}

Status ScheduleTaskCallback(std::function<Status(size_t)> func) {
if (finished_.is_finished()) return Status::OK();
if (finished_generating_.load()) return Status::OK();
num_running_++;
return plan_->ScheduleTask([this, func](size_t thread_index) {
Status status = func(thread_index);
if (!status.ok()) {
StopProducing();
ErrorIfNotOk(status);
}
if (--num_running_ == 0) {
finished_.MarkFinished(Status::OK());
}
return status;
});
}

const char* name_;
std::unique_ptr<TpchTableGenerator> generator_;
std::atomic<bool> finished_generating_{false};
std::atomic<int> num_running_{0};
};

class TpchGenImpl : public TpchGen {
Expand Down
Loading