Skip to content

Commit

Permalink
[pipelineX](fix) Fix pip scanner context bug (apache#29229)
Browse files Browse the repository at this point in the history
  • Loading branch information
Gabriel39 authored and HappenLee committed Jan 12, 2024
1 parent 0bda910 commit 648deaf
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 36 deletions.
5 changes: 2 additions & 3 deletions be/src/pipeline/exec/scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1239,10 +1239,9 @@ template <typename Derived>
Status ScanLocalState<Derived>::_start_scanners(
const std::list<vectorized::VScannerSPtr>& scanners) {
auto& p = _parent->cast<typename Derived::Parent>();
_scanner_ctx = PipScannerContext::create_shared(
_scanner_ctx = PipXScannerContext::create_shared(
state(), this, p._output_tuple_desc, p.output_row_descriptor(), scanners, p.limit(),
state()->scan_queue_mem_limit(), p._col_distribute_ids, 1, _scan_dependency,
_finish_dependency);
state()->scan_queue_mem_limit(), _scan_dependency, _finish_dependency);
return Status::OK();
}

Expand Down
118 changes: 86 additions & 32 deletions be/src/vec/exec/scan/pip_scanner_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,9 @@
#include "runtime/descriptors.h"
#include "scanner_context.h"

namespace doris {
namespace doris::pipeline {

namespace pipeline {

class PipScannerContext : public vectorized::ScannerContext {
class PipScannerContext final : public vectorized::ScannerContext {
ENABLE_FACTORY_CREATOR(PipScannerContext);

public:
Expand All @@ -41,19 +39,6 @@ class PipScannerContext : public vectorized::ScannerContext {
_col_distribute_ids(col_distribute_ids),
_need_colocate_distribute(!_col_distribute_ids.empty()) {}

PipScannerContext(RuntimeState* state, ScanLocalStateBase* local_state,
const TupleDescriptor* output_tuple_desc,
const RowDescriptor* output_row_descriptor,
const std::list<vectorized::VScannerSPtr>& scanners, int64_t limit_,
int64_t max_bytes_in_blocks_queue, const std::vector<int>& col_distribute_ids,
const int num_parallel_instances,
std::shared_ptr<pipeline::ScanDependency> dependency,
std::shared_ptr<pipeline::Dependency> finish_dependency)
: vectorized::ScannerContext(state, output_tuple_desc, output_row_descriptor, scanners,
limit_, max_bytes_in_blocks_queue, num_parallel_instances,
local_state, dependency, finish_dependency),
_need_colocate_distribute(false) {}

Status get_block_from_queue(RuntimeState* state, vectorized::BlockUPtr* block, bool* eos,
int id, bool wait = false) override {
{
Expand Down Expand Up @@ -95,9 +80,6 @@ class PipScannerContext : public vectorized::ScannerContext {

if (_blocks_queues[id].empty()) {
this->reschedule_scanner_ctx();
if (_dependency) {
_dependency->block();
}
}
}

Expand Down Expand Up @@ -180,9 +162,6 @@ class PipScannerContext : public vectorized::ScannerContext {
for (int j = i; j < block_size; j += queue_size) {
_blocks_queues[queue].emplace_back(std::move(blocks[j]));
}
if (_dependency) {
_dependency->set_ready();
}
}
_next_queue_to_feed = queue + 1 < queue_size ? queue + 1 : 0;
}
Expand Down Expand Up @@ -232,9 +211,6 @@ class PipScannerContext : public vectorized::ScannerContext {
_blocks_queues[i].emplace_back(std::move(_colocate_blocks[i]));
_colocate_mutable_blocks[i]->clear();
}
if (_dependency) {
_dependency->set_ready();
}
}
}
}
Expand All @@ -248,7 +224,7 @@ class PipScannerContext : public vectorized::ScannerContext {
return res;
}

private:
protected:
int _next_queue_to_feed = 0;
std::vector<std::unique_ptr<std::mutex>> _queue_mutexs;
std::vector<std::list<vectorized::BlockUPtr>> _blocks_queues;
Expand Down Expand Up @@ -286,9 +262,6 @@ class PipScannerContext : public vectorized::ScannerContext {
std::lock_guard<std::mutex> queue_l(*_queue_mutexs[loc]);
_blocks_queues[loc].emplace_back(std::move(_colocate_blocks[loc]));
}
if (_dependency) {
_dependency->set_ready();
}
_colocate_blocks[loc] = get_free_block();
_colocate_mutable_blocks[loc]->set_mutable_columns(
_colocate_blocks[loc]->mutate_columns());
Expand All @@ -297,5 +270,86 @@ class PipScannerContext : public vectorized::ScannerContext {
}
};

} // namespace pipeline
} // namespace doris
// Scanner context variant that serves blocks from the single shared
// `_blocks_queue` of vectorized::ScannerContext. The base constructor is always
// given a literal 1 in the slot right after `max_bytes_in_blocks_queue`
// (presumably the number of parallel instances -- confirm against the
// ScannerContext constructor).
class PipXScannerContext final : public vectorized::ScannerContext {
    ENABLE_FACTORY_CREATOR(PipXScannerContext);

public:
    PipXScannerContext(RuntimeState* state, ScanLocalStateBase* local_state,
                       const TupleDescriptor* output_tuple_desc,
                       const RowDescriptor* output_row_descriptor,
                       const std::list<vectorized::VScannerSPtr>& scanners, int64_t limit_,
                       int64_t max_bytes_in_blocks_queue,
                       std::shared_ptr<pipeline::ScanDependency> dependency,
                       std::shared_ptr<pipeline::Dependency> finish_dependency)
            : vectorized::ScannerContext(state, output_tuple_desc, output_row_descriptor, scanners,
                                         limit_, max_bytes_in_blocks_queue, 1, local_state,
                                         dependency, finish_dependency) {}

    // Hands the next queued block to the caller through `*block`.
    //
    // The whole call runs under `_transfer_lock`. Outcomes:
    //   - a non-OK context status (including a cancellation detected here) is
    //     returned as-is;
    //   - an empty queue yields OK with `*eos = done()` and `*block` untouched;
    //   - otherwise the front block is popped and the blocks behind it are
    //     folded into it for as long as the combined row count stays below
    //     `state->batch_size()`.
    // `id` and `wait` are not read by this override.
    Status get_block_from_queue(RuntimeState* state, vectorized::BlockUPtr* block, bool* eos,
                                int id, bool wait = false) override {
        std::unique_lock transfer_guard(_transfer_lock);

        if (state->is_cancelled()) {
            set_status_on_error(Status::Cancelled("cancelled"), false);
        }
        if (!status().ok()) {
            return _process_status;
        }

        if (_blocks_queue.empty()) {
            *eos = done();
            return Status::OK();
        }
        if (_process_status.is<ErrorCode::CANCELLED>()) {
            *eos = true;
            return Status::OK();
        }

        *block = std::move(_blocks_queue.front());
        _blocks_queue.pop_front();

        // Collect the follow-up blocks that still fit into one batch.
        std::vector<vectorized::BlockUPtr> pending_merge;
        auto total_rows = (*block)->rows();
        while (!_blocks_queue.empty()) {
            const auto next_rows = _blocks_queue.front()->rows();
            if (total_rows + next_rows >= state->batch_size()) {
                break;
            }
            total_rows += next_rows;
            pending_merge.emplace_back(std::move(_blocks_queue.front()));
            _blocks_queue.pop_front();
        }

        // Queue drained: request more scanning and block the scan dependency.
        if (_blocks_queue.empty()) {
            this->reschedule_scanner_ctx();
            _dependency->block();
        }

        // Every block leaving the queue is subtracted from the byte counter.
        _cur_bytes_in_queue -= (*block)->allocated_bytes();
        if (pending_merge.empty()) {
            return Status::OK();
        }

        vectorized::MutableBlock merged(block->get());
        for (auto& extra : pending_merge) {
            _cur_bytes_in_queue -= extra->allocated_bytes();
            // The merge result is intentionally discarded.
            static_cast<void>(merged.merge(*extra));
            return_free_block(std::move(extra));
        }
        (*block)->set_columns(std::move(merged.mutable_columns()));

        return Status::OK();
    }

    // Submits this context to `_scanner_scheduler` unless it is already done.
    // A successful submit bumps `_num_scheduling_ctx`; a failed one is recorded
    // through set_status_on_error().
    void reschedule_scanner_ctx() override {
        if (done()) {
            return;
        }
        auto submit_status = _scanner_scheduler->submit(shared_from_this());
        // TODO(wb): reconsider whether the scan context should be marked as
        // failed once submitting has failed many times.
        if (!submit_status.ok()) {
            set_status_on_error(submit_status, false);
            return;
        }
        _num_scheduling_ctx++;
    }
};

} // namespace doris::pipeline
3 changes: 3 additions & 0 deletions be/src/vec/exec/scan/scanner_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,9 @@ void ScannerContext::append_blocks_to_queue(std::vector<vectorized::BlockUPtr>&
_blocks_queue.push_back(std::move(b));
}
blocks.clear();
if (_dependency) {
_dependency->set_ready();
}
_blocks_queue_added_cv.notify_one();
_queued_blocks_memory_usage->add(_cur_bytes_in_queue - old_bytes_in_queue);
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/exec/scan/scanner_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ class ScannerContext : public std::enable_shared_from_this<ScannerContext> {

SimplifiedScanScheduler* get_simple_scan_scheduler() { return _simple_scan_scheduler; }

void reschedule_scanner_ctx();
virtual void reschedule_scanner_ctx();

// the unique id of this context
std::string ctx_id;
Expand Down

0 comments on commit 648deaf

Please sign in to comment.