Skip to content

Commit

Permalink
go back to the other approach
Browse files Browse the repository at this point in the history
  • Loading branch information
paleolimbot committed Aug 23, 2022
1 parent f91d84f commit 8dc78c3
Showing 1 changed file with 18 additions and 13 deletions.
31 changes: 18 additions & 13 deletions r/src/compute-exec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,42 +88,47 @@ class ExecPlanReader : public arrow::RecordBatchReader {
*batch_out = batch_result.ValueUnsafe();
} else {
batch_out->reset();
StopProducing();
}

return arrow::Status::OK();
}

arrow::Status Close() {
plan_.reset();
status_ = 2;
StopProducing();
return arrow::Status::OK();
}

const std::shared_ptr<arrow::compute::ExecPlan>& Plan() { return plan_; }

~ExecPlanReader() { StopProducing(); }

private:
std::shared_ptr<arrow::Schema> schema_;
std::shared_ptr<arrow::compute::ExecPlan> plan_;
arrow::AsyncGenerator<arrow::util::optional<compute::ExecBatch>> sink_gen_;
std::shared_ptr<void> stop_producing_;
int status_;

arrow::Status StartProducing() {
ARROW_RETURN_NOT_OK(plan_->StartProducing());
status_ = 1;
return arrow::Status::OK();
}

// If the generator is destroyed before being completely drained, inform plan
const std::shared_ptr<arrow::compute::ExecPlan> plan(plan_);
stop_producing_ = {nullptr, [plan](...) {
bool not_finished_yet = plan->finished().TryAddCallback(
[&plan] { return [plan](const arrow::Status&) {}; });
void StopProducing() {
if (status_ == 1) {
std::shared_ptr<arrow::compute::ExecPlan> plan(plan_);
bool not_finished_yet = plan_->finished().TryAddCallback(
[&plan] { return [plan](const arrow::Status&) {}; });

if (not_finished_yet) {
plan->StopProducing();
}
}};
if (not_finished_yet) {
plan_->StopProducing();
}

return arrow::Status::OK();
status_ = 2;
plan_.reset();
sink_gen_ = arrow::MakeEmptyGenerator<arrow::util::optional<compute::ExecBatch>>();
}
}
};

Expand Down

0 comments on commit 8dc78c3

Please sign in to comment.