diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index 0c0ccd42410148..de3214b0465dc7 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -1239,10 +1239,9 @@ template Status ScanLocalState::_start_scanners( const std::list& scanners) { auto& p = _parent->cast(); - _scanner_ctx = PipScannerContext::create_shared( + _scanner_ctx = PipXScannerContext::create_shared( state(), this, p._output_tuple_desc, p.output_row_descriptor(), scanners, p.limit(), - state()->scan_queue_mem_limit(), p._col_distribute_ids, 1, _scan_dependency, - _finish_dependency); + state()->scan_queue_mem_limit(), _scan_dependency, _finish_dependency); return Status::OK(); } diff --git a/be/src/vec/exec/scan/pip_scanner_context.h b/be/src/vec/exec/scan/pip_scanner_context.h index fbf59fffab20a5..9a717ec08b26d8 100644 --- a/be/src/vec/exec/scan/pip_scanner_context.h +++ b/be/src/vec/exec/scan/pip_scanner_context.h @@ -21,11 +21,9 @@ #include "runtime/descriptors.h" #include "scanner_context.h" -namespace doris { +namespace doris::pipeline { -namespace pipeline { - -class PipScannerContext : public vectorized::ScannerContext { +class PipScannerContext final : public vectorized::ScannerContext { ENABLE_FACTORY_CREATOR(PipScannerContext); public: @@ -41,19 +39,6 @@ class PipScannerContext : public vectorized::ScannerContext { _col_distribute_ids(col_distribute_ids), _need_colocate_distribute(!_col_distribute_ids.empty()) {} - PipScannerContext(RuntimeState* state, ScanLocalStateBase* local_state, - const TupleDescriptor* output_tuple_desc, - const RowDescriptor* output_row_descriptor, - const std::list& scanners, int64_t limit_, - int64_t max_bytes_in_blocks_queue, const std::vector& col_distribute_ids, - const int num_parallel_instances, - std::shared_ptr dependency, - std::shared_ptr finish_dependency) - : vectorized::ScannerContext(state, output_tuple_desc, output_row_descriptor, scanners, - limit_, max_bytes_in_blocks_queue, num_parallel_instances, - local_state, dependency, finish_dependency), - _need_colocate_distribute(false) {} - Status get_block_from_queue(RuntimeState* state, vectorized::BlockUPtr* block, bool* eos, int id, bool wait = false) override { { @@ -95,9 +80,6 @@ class PipScannerContext : public vectorized::ScannerContext { if (_blocks_queues[id].empty()) { this->reschedule_scanner_ctx(); - if (_dependency) { - _dependency->block(); - } } } @@ -180,9 +162,6 @@ class PipScannerContext : public vectorized::ScannerContext { for (int j = i; j < block_size; j += queue_size) { _blocks_queues[queue].emplace_back(std::move(blocks[j])); } - if (_dependency) { - _dependency->set_ready(); - } } _next_queue_to_feed = queue + 1 < queue_size ? queue + 1 : 0; } @@ -232,9 +211,6 @@ class PipScannerContext : public vectorized::ScannerContext { _blocks_queues[i].emplace_back(std::move(_colocate_blocks[i])); _colocate_mutable_blocks[i]->clear(); } - if (_dependency) { - _dependency->set_ready(); - } } } } @@ -248,7 +224,7 @@ class PipScannerContext : public vectorized::ScannerContext { return res; } -private: +protected: int _next_queue_to_feed = 0; std::vector> _queue_mutexs; std::vector> _blocks_queues; @@ -286,9 +262,6 @@ class PipScannerContext : public vectorized::ScannerContext { std::lock_guard queue_l(*_queue_mutexs[loc]); _blocks_queues[loc].emplace_back(std::move(_colocate_blocks[loc])); } - if (_dependency) { - _dependency->set_ready(); - } _colocate_blocks[loc] = get_free_block(); _colocate_mutable_blocks[loc]->set_mutable_columns( _colocate_blocks[loc]->mutate_columns()); @@ -297,5 +270,86 @@ class PipScannerContext : public vectorized::ScannerContext { } }; -} // namespace pipeline -} // namespace doris +class PipXScannerContext final : public vectorized::ScannerContext { + ENABLE_FACTORY_CREATOR(PipXScannerContext); + +public: + PipXScannerContext(RuntimeState* state, ScanLocalStateBase* local_state, + const TupleDescriptor* output_tuple_desc, + const RowDescriptor* output_row_descriptor, + const std::list& scanners, int64_t limit_, + int64_t max_bytes_in_blocks_queue, + std::shared_ptr dependency, + std::shared_ptr finish_dependency) + : vectorized::ScannerContext(state, output_tuple_desc, output_row_descriptor, scanners, + limit_, max_bytes_in_blocks_queue, 1, local_state, + dependency, finish_dependency) {} + Status get_block_from_queue(RuntimeState* state, vectorized::BlockUPtr* block, bool* eos, + int id, bool wait = false) override { + std::unique_lock l(_transfer_lock); + if (state->is_cancelled()) { + set_status_on_error(Status::Cancelled("cancelled"), false); + } + + if (!status().ok()) { + return _process_status; + } + + std::vector merge_blocks; + if (_blocks_queue.empty()) { + *eos = done(); + return Status::OK(); + } + if (_process_status.is()) { + *eos = true; + return Status::OK(); + } + *block = std::move(_blocks_queue.front()); + _blocks_queue.pop_front(); + + auto rows = (*block)->rows(); + while (!_blocks_queue.empty()) { + const auto add_rows = (*_blocks_queue.front()).rows(); + if (rows + add_rows < state->batch_size()) { + rows += add_rows; + merge_blocks.emplace_back(std::move(_blocks_queue.front())); + _blocks_queue.pop_front(); + } else { + break; + } + } + + if (_blocks_queue.empty()) { + this->reschedule_scanner_ctx(); + _dependency->block(); + } + + _cur_bytes_in_queue -= (*block)->allocated_bytes(); + if (!merge_blocks.empty()) { + vectorized::MutableBlock m(block->get()); + for (auto& merge_block : merge_blocks) { + _cur_bytes_in_queue -= merge_block->allocated_bytes(); + static_cast(m.merge(*merge_block)); + return_free_block(std::move(merge_block)); + } + (*block)->set_columns(std::move(m.mutable_columns())); + } + + return Status::OK(); + } + + void reschedule_scanner_ctx() override { + if (done()) { + return; + } + auto state = _scanner_scheduler->submit(shared_from_this()); + //todo(wb) rethinking is it better to mark current scan_context failed when submit failed many times? + if (state.ok()) { + _num_scheduling_ctx++; + } else { + set_status_on_error(state, false); + } + } +}; + +} // namespace doris::pipeline diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp index 954c294574f33d..16bb1ce8487f4a 100644 --- a/be/src/vec/exec/scan/scanner_context.cpp +++ b/be/src/vec/exec/scan/scanner_context.cpp @@ -259,6 +259,9 @@ void ScannerContext::append_blocks_to_queue(std::vector& _blocks_queue.push_back(std::move(b)); } blocks.clear(); + if (_dependency) { + _dependency->set_ready(); + } _blocks_queue_added_cv.notify_one(); _queued_blocks_memory_usage->add(_cur_bytes_in_queue - old_bytes_in_queue); } diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h index 6a3e8553f8fc3a..035d396bf65660 100644 --- a/be/src/vec/exec/scan/scanner_context.h +++ b/be/src/vec/exec/scan/scanner_context.h @@ -170,7 +170,7 @@ class ScannerContext : public std::enable_shared_from_this { SimplifiedScanScheduler* get_simple_scan_scheduler() { return _simple_scan_scheduler; } - void reschedule_scanner_ctx(); + virtual void reschedule_scanner_ctx(); // the unique id of this context std::string ctx_id;