diff --git a/r/src/compute-exec.cpp b/r/src/compute-exec.cpp index 6647c35d49d76..8b20d581062d7 100644 --- a/r/src/compute-exec.cpp +++ b/r/src/compute-exec.cpp @@ -88,42 +88,47 @@ class ExecPlanReader : public arrow::RecordBatchReader { *batch_out = batch_result.ValueUnsafe(); } else { batch_out->reset(); + StopProducing(); } return arrow::Status::OK(); } arrow::Status Close() { - plan_.reset(); - status_ = 2; + StopProducing(); return arrow::Status::OK(); } const std::shared_ptr& Plan() { return plan_; } + ~ExecPlanReader() { StopProducing(); } + private: std::shared_ptr schema_; std::shared_ptr plan_; arrow::AsyncGenerator> sink_gen_; - std::shared_ptr stop_producing_; int status_; arrow::Status StartProducing() { ARROW_RETURN_NOT_OK(plan_->StartProducing()); status_ = 1; + return arrow::Status::OK(); + } - // If the generator is destroyed before being completely drained, inform plan - const std::shared_ptr plan(plan_); - stop_producing_ = {nullptr, [plan](...) { - bool not_finished_yet = plan->finished().TryAddCallback( - [&plan] { return [plan](const arrow::Status&) {}; }); + void StopProducing() { + if (status_ == 1) { + std::shared_ptr plan(plan_); + bool not_finished_yet = plan_->finished().TryAddCallback( + [&plan] { return [plan](const arrow::Status&) {}; }); - if (not_finished_yet) { - plan->StopProducing(); - } - }}; + if (not_finished_yet) { + plan_->StopProducing(); + } - return arrow::Status::OK(); + status_ = 2; + plan_.reset(); + sink_gen_ = arrow::MakeEmptyGenerator>(); + } } };