Commit c928c89
[GLUTEN-7243][VL] Suspend the Velox task while reading an input Java iterator to make the task spillable (apache#7748)

zhztheplayer authored Nov 5, 2024
1 parent 5c3545a commit c928c89
Showing 4 changed files with 52 additions and 39 deletions.
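The change works as follows: whenever the native pipeline leaves Velox execution to pull data from a Java input iterator over JNI, the driver is wrapped in a suspended section, so the task reports zero running threads and can be spilled by other tasks in the meantime. The toy program below is a minimal, self-contained sketch of why that matters; ToyTask, ScopedSuspend, and slowJavaHasNext are hypothetical stand-ins, not Gluten or Velox APIs, for the task's running-thread counter, velox::exec::SuspendedSection, and the JNI-backed ResultIterator respectively.

// toy_suspend.cc: hypothetical, simplified model; not Gluten/Velox code.
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

// Stand-in for a Velox task: tracks how many driver threads are running on it.
class ToyTask {
 public:
  void enterThread() {
    std::lock_guard<std::mutex> l(mu_);
    ++running_;
  }
  void leaveThread() {
    {
      std::lock_guard<std::mutex> l(mu_);
      --running_;
    }
    cv_.notify_all();
  }
  // A spill request must wait until the task has zero running threads, mirroring
  // the invariant described in the commit; without suspension it could wait forever.
  void requestSpill() {
    std::unique_lock<std::mutex> l(mu_);
    cv_.wait(l, [this] { return running_ == 0; });
    std::cout << "spilled while the task had zero running threads\n";
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  int running_{0};
};

// RAII guard that marks the current driver as "off thread" for its scope,
// loosely analogous to velox::exec::SuspendedSection.
class ScopedSuspend {
 public:
  explicit ScopedSuspend(ToyTask& task) : task_(task) { task_.leaveThread(); }
  ~ScopedSuspend() { task_.enterThread(); }

 private:
  ToyTask& task_;
};

// Stand-in for ResultIterator::hasNext(), which re-enters Spark through JNI
// and may block for a long time.
bool slowJavaHasNext() {
  std::this_thread::sleep_for(std::chrono::milliseconds(200));
  return true;
}

int main() {
  ToyTask task;
  task.enterThread(); // the driver thread starts running on the task

  std::thread spiller([&] { task.requestSpill(); }); // concurrent spill request
  std::this_thread::sleep_for(std::chrono::milliseconds(50)); // let it reach the wait

  {
    // Suspend before leaving Velox execution; the pending spill can proceed
    // even though this thread is about to block inside "Java" for a while.
    ScopedSuspend suspend(task);
    (void)slowJavaHasNext();
  }

  task.leaveThread();
  spiller.join();
  return 0;
}

Without the suspended scope, the spiller thread in this sketch would wait for the running-thread count to reach zero while the driver sits blocked inside the Java call, which is the hang described in GLUTEN-7243.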
cpp/velox/compute/WholeStageResultIterator.cc: 12 changes (0 additions, 12 deletions)

@@ -226,18 +226,6 @@ int64_t WholeStageResultIterator::spillFixedSize(int64_t size) {
   std::string logPrefix{"Spill[" + poolName + "]: "};
   int64_t shrunken = memoryManager_->shrink(size);
   if (spillStrategy_ == "auto") {
-    if (task_->numThreads() != 0) {
-      // Task should have zero running threads, otherwise there's
-      // possibility that this spill call hangs. See https://github.com/apache/incubator-gluten/issues/7243.
-      // As of now, non-zero running threads usually happens when:
-      // 1. Task A spills task B;
-      // 2. Task A trys to grow buffers created by task B, during which spill is requested on task A again;
-      LOG(INFO) << fmt::format(
-          "{} spill is requested on a task {} that has non-zero running threads, which is not currently supported. Skipping.",
-          logPrefix,
-          task_->taskId());
-      return shrunken;
-    }
     int64_t remaining = size - shrunken;
     LOG(INFO) << fmt::format("{} trying to request spill for {}.", logPrefix, velox::succinctBytes(remaining));
     auto mm = memoryManager_->getMemoryManager();
cpp/velox/operators/plannodes/RowVectorStream.h: 70 changes (48 additions, 22 deletions)

@@ -26,57 +26,81 @@ namespace gluten {
 class RowVectorStream {
  public:
   explicit RowVectorStream(
+      facebook::velox::exec::DriverCtx* driverCtx,
       facebook::velox::memory::MemoryPool* pool,
-      std::shared_ptr<ResultIterator> iterator,
+      ResultIterator* iterator,
       const facebook::velox::RowTypePtr& outputType)
-      : iterator_(std::move(iterator)), outputType_(outputType), pool_(pool) {}
+      : driverCtx_(driverCtx), pool_(pool), outputType_(outputType), iterator_(iterator) {}
 
   bool hasNext() {
-    if (!finished_) {
-      finished_ = !iterator_->hasNext();
+    if (finished_) {
+      return false;
     }
-    return !finished_;
+    bool hasNext;
+    {
+      // We are leaving Velox task execution and are probably entering Spark code through JNI. Suspend the current
+      // driver to make the current task open to spilling.
+      //
+      // When a task is getting spilled, it should have been suspended so has zero running threads, otherwise there's
+      // possibility that this spill call hangs. See https://github.com/apache/incubator-gluten/issues/7243.
+      // As of now, non-zero running threads usually happens when:
+      // 1. Task A spills task B;
+      // 2. Task A trys to grow buffers created by task B, during which spill is requested on task A again.
+      facebook::velox::exec::SuspendedSection(driverCtx_->driver);
+      hasNext = iterator_->hasNext();
+    }
+    if (!hasNext) {
+      finished_ = true;
+    }
+    return hasNext;
   }
 
   // Convert arrow batch to rowvector and use new output columns
   facebook::velox::RowVectorPtr next() {
     if (finished_) {
       return nullptr;
     }
-    const std::shared_ptr<VeloxColumnarBatch>& vb = VeloxColumnarBatch::from(pool_, iterator_->next());
+    std::shared_ptr<ColumnarBatch> cb;
+    {
+      // We are leaving Velox task execution and are probably entering Spark code through JNI. Suspend the current
+      // driver to make the current task open to spilling.
+      facebook::velox::exec::SuspendedSection(driverCtx_->driver);
+      cb = iterator_->next();
+    }
+    const std::shared_ptr<VeloxColumnarBatch>& vb = VeloxColumnarBatch::from(pool_, cb);
     auto vp = vb->getRowVector();
     VELOX_DCHECK(vp != nullptr);
     return std::make_shared<facebook::velox::RowVector>(
         vp->pool(), outputType_, facebook::velox::BufferPtr(0), vp->size(), vp->children());
   }
 
  private:
-  bool finished_{false};
-  std::shared_ptr<ResultIterator> iterator_;
-  const facebook::velox::RowTypePtr outputType_;
+  facebook::velox::exec::DriverCtx* driverCtx_;
   facebook::velox::memory::MemoryPool* pool_;
+  const facebook::velox::RowTypePtr outputType_;
+  ResultIterator* iterator_;
+
+  bool finished_{false};
 };
 
 class ValueStreamNode final : public facebook::velox::core::PlanNode {
  public:
   ValueStreamNode(
       const facebook::velox::core::PlanNodeId& id,
       const facebook::velox::RowTypePtr& outputType,
-      std::unique_ptr<RowVectorStream> valueStream)
-      : facebook::velox::core::PlanNode(id), outputType_(outputType), valueStream_(std::move(valueStream)) {
-    VELOX_CHECK_NOT_NULL(valueStream_);
-  }
+      std::shared_ptr<ResultIterator> iterator)
+      : facebook::velox::core::PlanNode(id), outputType_(outputType), iterator_(std::move(iterator)) {}
 
   const facebook::velox::RowTypePtr& outputType() const override {
     return outputType_;
   }
 
   const std::vector<facebook::velox::core::PlanNodePtr>& sources() const override {
-    return kEmptySources;
+    return kEmptySources_;
   };
 
-  RowVectorStream* rowVectorStream() const {
-    return valueStream_.get();
+  ResultIterator* iterator() const {
+    return iterator_.get();
   }
 
   std::string_view name() const override {
@@ -91,8 +115,8 @@ class ValueStreamNode final : public facebook::velox::core::PlanNode {
   void addDetails(std::stringstream& stream) const override{};
 
   const facebook::velox::RowTypePtr outputType_;
-  std::unique_ptr<RowVectorStream> valueStream_;
-  const std::vector<facebook::velox::core::PlanNodePtr> kEmptySources;
+  std::shared_ptr<ResultIterator> iterator_;
+  const std::vector<facebook::velox::core::PlanNodePtr> kEmptySources_;
 };
 
 class ValueStream : public facebook::velox::exec::SourceOperator {
@@ -107,15 +131,17 @@ class ValueStream : public facebook::velox::exec::SourceOperator {
             operatorId,
             valueStreamNode->id(),
             valueStreamNode->name().data()) {
-    valueStream_ = valueStreamNode->rowVectorStream();
+    ResultIterator* itr = valueStreamNode->iterator();
+    VELOX_CHECK_NOT_NULL(itr);
+    rvStream_ = std::make_unique<RowVectorStream>(driverCtx, pool(), itr, outputType_);
   }
 
   facebook::velox::RowVectorPtr getOutput() override {
     if (finished_) {
       return nullptr;
     }
-    if (valueStream_->hasNext()) {
-      return valueStream_->next();
+    if (rvStream_->hasNext()) {
+      return rvStream_->next();
     } else {
       finished_ = true;
       return nullptr;
@@ -132,7 +158,7 @@ class ValueStream : public facebook::velox::exec::SourceOperator {
 
  private:
   bool finished_ = false;
-  RowVectorStream* valueStream_;
+  std::unique_ptr<RowVectorStream> rvStream_;
 };
 
 class RowVectorStreamOperatorTranslator : public facebook::velox::exec::Operator::PlanNodeTranslator {
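A second, smaller point in this file is the ownership change: the plan node now keeps the shared ResultIterator, while each ValueStream operator builds its own RowVectorStream bound to its DriverCtx and holds only a non-owning raw pointer to the iterator, presumably because the DriverCtx needed for the suspended section only exists once the operator is constructed. The sketch below restates that split with simplified, hypothetical types; it is not the actual Gluten classes.

// ownership_sketch.cc: hypothetical, simplified model of the iterator ownership split.
#include <iostream>
#include <memory>

struct ResultIterator { // stand-in for the JNI-backed iterator
  bool hasNext() { return false; }
};

struct DriverCtx {}; // stand-in for facebook::velox::exec::DriverCtx

// The plan node owns the iterator for the lifetime of the plan.
class ValueStreamNode {
 public:
  explicit ValueStreamNode(std::shared_ptr<ResultIterator> iterator)
      : iterator_(std::move(iterator)) {}
  ResultIterator* iterator() const { return iterator_.get(); }

 private:
  std::shared_ptr<ResultIterator> iterator_;
};

// Each operator instance wraps the shared iterator in its own per-driver stream.
class RowVectorStream {
 public:
  RowVectorStream(DriverCtx* driverCtx, ResultIterator* iterator)
      : driverCtx_(driverCtx), iterator_(iterator) {}
  bool hasNext() { return iterator_->hasNext(); } // the real code suspends the driver here

 private:
  DriverCtx* driverCtx_;
  ResultIterator* iterator_; // non-owning; the plan node keeps it alive
};

int main() {
  ValueStreamNode node(std::make_shared<ResultIterator>());
  DriverCtx ctx;
  RowVectorStream stream(&ctx, node.iterator());
  std::cout << std::boolalpha << stream.hasNext() << "\n";
  return 0;
}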
cpp/velox/substrait/SubstraitToVeloxPlan.cc: 3 changes (1 addition, 2 deletions)

@@ -1129,8 +1129,7 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::constructValueStreamNode(
     VELOX_CHECK_LT(streamIdx, inputIters_.size(), "Could not find stream index {} in input iterator list.", streamIdx);
     iterator = inputIters_[streamIdx];
   }
-  auto valueStream = std::make_unique<RowVectorStream>(pool_, iterator, outputType);
-  auto node = std::make_shared<ValueStreamNode>(nextPlanNodeId(), outputType, std::move(valueStream));
+  auto node = std::make_shared<ValueStreamNode>(nextPlanNodeId(), outputType, std::move(iterator));
 
   auto splitInfo = std::make_shared<SplitInfo>();
   splitInfo->isStream = true;
Fourth changed file: 3 additions, 3 deletions

@@ -62,16 +62,16 @@ trait GlutenPlan extends SparkPlan with Convention.KnownBatchType with LogLevelU
    * Validate whether this SparkPlan supports to be transformed into substrait node in Native Code.
    */
   final def doValidate(): ValidationResult = {
-    val schemaVaidationResult = BackendsApiManager.getValidatorApiInstance
+    val schemaValidationResult = BackendsApiManager.getValidatorApiInstance
       .doSchemaValidate(schema)
       .map {
         reason =>
           ValidationResult.failed(s"Found schema check failure for $schema, due to: $reason")
       }
       .getOrElse(ValidationResult.succeeded)
-    if (!schemaVaidationResult.ok()) {
+    if (!schemaValidationResult.ok()) {
       TestStats.addFallBackClassName(this.getClass.toString)
-      return schemaVaidationResult
+      return schemaValidationResult
     }
     try {
       TransformerState.enterValidation
