Skip to content

Commit

Permalink
Revert "[Opt](scanner-scheduler) Optimize BlockingQueue, `BlockingP…
Browse files Browse the repository at this point in the history
…riorityQueue` and change remote scan thread pool. (apache#26784)"

This is only a test to see if the commit causing load performance drop
This reverts commit 0491437.
  • Loading branch information
freemandealer committed Dec 21, 2023
1 parent ddcfba0 commit d7bc877
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 73 deletions.
50 changes: 10 additions & 40 deletions be/src/util/blocking_priority_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,7 @@ class BlockingPriorityQueue {
_max_element(max_elements),
_upgrade_counter(0),
_total_get_wait_time(0),
_total_put_wait_time(0),
_get_waiting(0),
_put_waiting(0) {}
_total_put_wait_time(0) {}

// Get an element from the queue, waiting indefinitely (or until timeout) for one to become available.
// Returns false if we were shut down prior to getting the element, and there
Expand All @@ -55,20 +53,10 @@ class BlockingPriorityQueue {
std::unique_lock unique_lock(_lock);
bool wait_successful = false;
if (timeout_ms > 0) {
while (!(_shutdown || !_queue.empty())) {
++_get_waiting;
if (_get_cv.wait_for(unique_lock, std::chrono::milliseconds(timeout_ms)) ==
std::cv_status::timeout) {
// timeout
wait_successful = _shutdown || !_queue.empty();
break;
}
}
wait_successful = _get_cv.wait_for(unique_lock, std::chrono::milliseconds(timeout_ms),
[this] { return _shutdown || !_queue.empty(); });
} else {
while (!(_shutdown || !_queue.empty())) {
++_get_waiting;
_get_cv.wait(unique_lock);
}
_get_cv.wait(unique_lock, [this] { return _shutdown || !_queue.empty(); });
wait_successful = true;
}
_total_get_wait_time += timer.elapsed_time();
Expand All @@ -88,11 +76,7 @@ class BlockingPriorityQueue {
*out = _queue.top();
_queue.pop();
++_upgrade_counter;
if (_put_waiting > 0) {
--_put_waiting;
unique_lock.unlock();
_put_cv.notify_one();
}
_put_cv.notify_one();
return true;
} else {
assert(_shutdown);
Expand Down Expand Up @@ -128,11 +112,7 @@ class BlockingPriorityQueue {
_queue.pop();
++_upgrade_counter;
_total_get_wait_time += timer.elapsed_time();
if (_put_waiting > 0) {
--_put_waiting;
unique_lock.unlock();
_put_cv.notify_one();
}
_put_cv.notify_one();
return true;
}

Expand All @@ -146,7 +126,6 @@ class BlockingPriorityQueue {
timer.start();
std::unique_lock unique_lock(_lock);
while (!(_shutdown || _queue.size() < _max_element)) {
++_put_waiting;
_put_cv.wait(unique_lock);
}
_total_put_wait_time += timer.elapsed_time();
Expand All @@ -156,11 +135,7 @@ class BlockingPriorityQueue {
}

_queue.push(val);
if (_get_waiting > 0) {
--_get_waiting;
unique_lock.unlock();
_get_cv.notify_one();
}
_get_cv.notify_one();
return true;
}

Expand All @@ -169,11 +144,8 @@ class BlockingPriorityQueue {
std::unique_lock unique_lock(_lock);
if (_queue.size() < _max_element && !_shutdown) {
_queue.push(val);
if (_get_waiting > 0) {
--_get_waiting;
unique_lock.unlock();
_get_cv.notify_one();
}
unique_lock.unlock();
_get_cv.notify_one();
return true;
}
return false;
Expand Down Expand Up @@ -213,8 +185,6 @@ class BlockingPriorityQueue {
int _upgrade_counter;
std::atomic<uint64_t> _total_get_wait_time;
std::atomic<uint64_t> _total_put_wait_time;
size_t _get_waiting;
size_t _put_waiting;
};

} // namespace doris
} // namespace doris
34 changes: 6 additions & 28 deletions be/src/util/blocking_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,7 @@ class BlockingQueue {
: _shutdown(false),
_max_elements(max_elements),
_total_get_wait_time(0),
_total_put_wait_time(0),
_get_waiting(0),
_put_waiting(0) {}
_total_put_wait_time(0) {}

// Get an element from the queue, waiting indefinitely for one to become available.
// Returns false if we were shut down prior to getting the element, and there
Expand All @@ -52,20 +50,13 @@ class BlockingQueue {
MonotonicStopWatch timer;
timer.start();
std::unique_lock<std::mutex> unique_lock(_lock);
while (!(_shutdown || !_list.empty())) {
++_get_waiting;
_get_cv.wait(unique_lock);
}
_get_cv.wait(unique_lock, [this] { return _shutdown || !_list.empty(); });
_total_get_wait_time += timer.elapsed_time();

if (!_list.empty()) {
*out = _list.front();
_list.pop_front();
if (_put_waiting > 0) {
--_put_waiting;
unique_lock.unlock();
_put_cv.notify_one();
}
_put_cv.notify_one();
return true;
} else {
assert(_shutdown);
Expand All @@ -79,22 +70,15 @@ class BlockingQueue {
MonotonicStopWatch timer;
timer.start();
std::unique_lock<std::mutex> unique_lock(_lock);
while (!(_shutdown || _list.size() < _max_elements)) {
++_put_waiting;
_put_cv.wait(unique_lock);
}
_put_cv.wait(unique_lock, [this] { return _shutdown || _list.size() < _max_elements; });
_total_put_wait_time += timer.elapsed_time();

if (_shutdown) {
return false;
}

_list.push_back(val);
if (_get_waiting > 0) {
--_get_waiting;
unique_lock.unlock();
_get_cv.notify_one();
}
_get_cv.notify_one();
return true;
}

Expand All @@ -114,11 +98,7 @@ class BlockingQueue {
}

_list.push_back(val);
if (_get_waiting > 0) {
--_get_waiting;
unique_lock.unlock();
_get_cv.notify_one();
}
_get_cv.notify_one();
return true;
}

Expand Down Expand Up @@ -156,8 +136,6 @@ class BlockingQueue {
std::list<T> _list;
std::atomic<uint64_t> _total_get_wait_time;
std::atomic<uint64_t> _total_put_wait_time;
size_t _get_waiting;
size_t _put_waiting;
};

} // namespace doris
8 changes: 4 additions & 4 deletions be/src/vec/exec/scan/scanner_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ void ScannerScheduler::stop() {

_scheduler_pool->wait();
_local_scan_thread_pool->join();
_remote_scan_thread_pool->join();
_remote_scan_thread_pool->wait();
_limited_scan_thread_pool->wait();
_group_local_scan_thread_pool->wait();

Expand Down Expand Up @@ -259,8 +259,7 @@ void ScannerScheduler::_schedule_scanners(std::shared_ptr<ScannerContext> ctx) {
work_func, ctx, ctx->get_task_group()->local_scan_task_entity(), nice};
ret = _task_group_local_scan_queue->push_back(scan_task);
} else {
PriorityThreadPool::Task task;
task.work_function = [this, scanner = *iter, ctx] {
ret = _remote_scan_thread_pool->submit_func([this, scanner = *iter, ctx] {
this->_scanner_scan(this, ctx, scanner);
};
task.priority = nice;
Expand Down Expand Up @@ -451,7 +450,7 @@ void ScannerScheduler::_register_metrics() {
REGISTER_HOOK_METRIC(remote_scan_thread_pool_queue_size,
[this]() { return _remote_scan_thread_pool->get_queue_size(); });
REGISTER_HOOK_METRIC(remote_scan_thread_pool_thread_num,
[this]() { return _remote_scan_thread_pool->get_active_threads(); });
[this]() { return _remote_scan_thread_pool->num_threads(); });
REGISTER_HOOK_METRIC(limited_scan_thread_pool_queue_size,
[this]() { return _limited_scan_thread_pool->get_queue_size(); });
REGISTER_HOOK_METRIC(limited_scan_thread_pool_thread_num,
Expand All @@ -472,4 +471,5 @@ void ScannerScheduler::_deregister_metrics() {
DEREGISTER_HOOK_METRIC(group_local_scan_thread_pool_queue_size);
DEREGISTER_HOOK_METRIC(group_local_scan_thread_pool_thread_num);
}

} // namespace doris::vectorized
2 changes: 1 addition & 1 deletion be/src/vec/exec/scan/scanner_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ class ScannerScheduler {
// _remote_scan_thread_pool is for remote scan task(cold data on s3, hdfs, etc.)
// _limited_scan_thread_pool is a special pool for queries with resource limit
std::unique_ptr<PriorityThreadPool> _local_scan_thread_pool;
std::unique_ptr<PriorityThreadPool> _remote_scan_thread_pool;
std::unique_ptr<ThreadPool> _remote_scan_thread_pool;
std::unique_ptr<ThreadPool> _limited_scan_thread_pool;

std::unique_ptr<taskgroup::ScanTaskTaskGroupQueue> _task_group_local_scan_queue;
Expand Down

0 comments on commit d7bc877

Please sign in to comment.