Skip to content

Commit

Permalink
[Opt](scanner-scheduler) Optimize `BlockingQueue`, `BlockingPriorityQueue` and change remote scan thread pool. (apache#26784)
Browse files (browse the repository at this point in the history)

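- Optimize `BlockingQueue` and `BlockingPriorityQueue` by releasing the lock before calling `notify_one` (instead of notifying while the lock is still held), and by notifying only when a waiter has been counted, to reduce lock contention. Ref: https://www.boost.org/doc/libs/1_54_0/boost/thread/sync_bounded_queue.hpp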
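- Change the remote scan thread pool from `ThreadPool` to `PriorityThreadPool`, so remote scan tasks are submitted with a priority, as local scan tasks already are.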

Before:
```
mysql> select  sum(lo_partkey)  from  lineorder;
+-----------------+
| sum(lo_partkey) |
+-----------------+
| 300021444265405 |
+-----------------+
1 row in set (1.11 sec)
```

After:
```
mysql> select  sum(lo_partkey)  from  lineorder;
+-----------------+
| sum(lo_partkey) |
+-----------------+
| 300021444265405 |
+-----------------+
1 row in set (0.80 sec)
```
  • Loading branch information
kaka11chen committed Nov 15, 2023
1 parent e0387ad commit d3b1e1b
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 30 deletions.
54 changes: 43 additions & 11 deletions be/src/util/blocking_priority_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ class BlockingPriorityQueue {
_max_element(max_elements),
_upgrade_counter(0),
_total_get_wait_time(0),
_total_put_wait_time(0) {}
_total_put_wait_time(0),
_get_waiting(0),
_put_waiting(0) {}

// Get an element from the queue, waiting indefinitely (or until timeout) for one to become available.
// Returns false if we were shut down prior to getting the element, and there
Expand All @@ -55,17 +57,29 @@ class BlockingPriorityQueue {
bool wait_successful = false;
#if !defined(USE_BTHREAD_SCANNER)
if (timeout_ms > 0) {
wait_successful = _get_cv.wait_for(unique_lock, std::chrono::milliseconds(timeout_ms),
[this] { return _shutdown || !_queue.empty(); });
while (!(_shutdown || !_queue.empty())) {
++_get_waiting;
if (_get_cv.wait_for(unique_lock, std::chrono::milliseconds(timeout_ms)) ==
std::cv_status::timeout) {
// timeout
wait_successful = _shutdown || !_queue.empty();
break;
}
}
} else {
_get_cv.wait(unique_lock, [this] { return _shutdown || !_queue.empty(); });
while (!(_shutdown || !_queue.empty())) {
++_get_waiting;
_get_cv.wait(unique_lock);
}
wait_successful = true;
}
#else
if (timeout_ms > 0) {
wait_successful = true;
while (!(_shutdown || !_queue.empty())) {
if (_get_cv.wait_for(unique_lock, timeout_ms * 1000) != 0) {
++_get_waiting;
if (_get_cv.wait_for(unique_lock, std::chrono::milliseconds(timeout_ms)) ==
std::cv_status::timeout) {
// timeout
wait_successful = _shutdown || !_queue.empty();
break;
Expand Down Expand Up @@ -95,7 +109,11 @@ class BlockingPriorityQueue {
*out = _queue.top();
_queue.pop();
++_upgrade_counter;
_put_cv.notify_one();
if (_put_waiting > 0) {
--_put_waiting;
unique_lock.unlock();
_put_cv.notify_one();
}
return true;
} else {
assert(_shutdown);
Expand Down Expand Up @@ -131,7 +149,11 @@ class BlockingPriorityQueue {
_queue.pop();
++_upgrade_counter;
_total_get_wait_time += timer.elapsed_time();
_put_cv.notify_one();
if (_put_waiting > 0) {
--_put_waiting;
unique_lock.unlock();
_put_cv.notify_one();
}
return true;
}

Expand All @@ -145,6 +167,7 @@ class BlockingPriorityQueue {
timer.start();
std::unique_lock unique_lock(_lock);
while (!(_shutdown || _queue.size() < _max_element)) {
++_put_waiting;
_put_cv.wait(unique_lock);
}
_total_put_wait_time += timer.elapsed_time();
Expand All @@ -154,7 +177,11 @@ class BlockingPriorityQueue {
}

_queue.push(val);
_get_cv.notify_one();
if (_get_waiting > 0) {
--_get_waiting;
unique_lock.unlock();
_get_cv.notify_one();
}
return true;
}

Expand All @@ -163,8 +190,11 @@ class BlockingPriorityQueue {
std::unique_lock unique_lock(_lock);
if (_queue.size() < _max_element && !_shutdown) {
_queue.push(val);
unique_lock.unlock();
_get_cv.notify_one();
if (_get_waiting > 0) {
--_get_waiting;
unique_lock.unlock();
_get_cv.notify_one();
}
return true;
}
return false;
Expand Down Expand Up @@ -204,6 +234,8 @@ class BlockingPriorityQueue {
int _upgrade_counter;
std::atomic<uint64_t> _total_get_wait_time;
std::atomic<uint64_t> _total_put_wait_time;
size_t _get_waiting;
size_t _put_waiting;
};

} // namespace doris
} // namespace doris
34 changes: 28 additions & 6 deletions be/src/util/blocking_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ class BlockingQueue {
: _shutdown(false),
_max_elements(max_elements),
_total_get_wait_time(0),
_total_put_wait_time(0) {}
_total_put_wait_time(0),
_get_waiting(0),
_put_waiting(0) {}

// Get an element from the queue, waiting indefinitely for one to become available.
// Returns false if we were shut down prior to getting the element, and there
Expand All @@ -50,13 +52,20 @@ class BlockingQueue {
MonotonicStopWatch timer;
timer.start();
std::unique_lock<std::mutex> unique_lock(_lock);
_get_cv.wait(unique_lock, [this] { return _shutdown || !_list.empty(); });
while (!(_shutdown || !_list.empty())) {
++_get_waiting;
_get_cv.wait(unique_lock);
}
_total_get_wait_time += timer.elapsed_time();

if (!_list.empty()) {
*out = _list.front();
_list.pop_front();
_put_cv.notify_one();
if (_put_waiting > 0) {
--_put_waiting;
unique_lock.unlock();
_put_cv.notify_one();
}
return true;
} else {
assert(_shutdown);
Expand All @@ -70,15 +79,22 @@ class BlockingQueue {
MonotonicStopWatch timer;
timer.start();
std::unique_lock<std::mutex> unique_lock(_lock);
_put_cv.wait(unique_lock, [this] { return _shutdown || _list.size() < _max_elements; });
while (!(_shutdown || _list.size() < _max_elements)) {
++_put_waiting;
_put_cv.wait(unique_lock);
}
_total_put_wait_time += timer.elapsed_time();

if (_shutdown) {
return false;
}

_list.push_back(val);
_get_cv.notify_one();
if (_get_waiting > 0) {
--_get_waiting;
unique_lock.unlock();
_get_cv.notify_one();
}
return true;
}

Expand All @@ -98,7 +114,11 @@ class BlockingQueue {
}

_list.push_back(val);
_get_cv.notify_one();
if (_get_waiting > 0) {
--_get_waiting;
unique_lock.unlock();
_get_cv.notify_one();
}
return true;
}

Expand Down Expand Up @@ -136,6 +156,8 @@ class BlockingQueue {
std::list<T> _list;
std::atomic<uint64_t> _total_get_wait_time;
std::atomic<uint64_t> _total_put_wait_time;
size_t _get_waiting;
size_t _put_waiting;
};

} // namespace doris
24 changes: 12 additions & 12 deletions be/src/vec/exec/scan/scanner_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ ScannerScheduler::~ScannerScheduler() {

_scheduler_pool->wait();
_local_scan_thread_pool->join();
_remote_scan_thread_pool->join();
_limited_scan_thread_pool->wait();
_group_local_scan_thread_pool->wait();

for (int i = 0; i < QUEUE_NUM; i++) {
delete _pending_queues[i];
Expand All @@ -109,14 +112,9 @@ Status ScannerScheduler::init(ExecEnv* env) {
config::doris_scanner_thread_pool_queue_size, "local_scan");

// 3. remote scan thread pool
_remote_thread_pool_max_size = config::doris_max_remote_scanner_thread_pool_thread_num != -1
? config::doris_max_remote_scanner_thread_pool_thread_num
: std::max(512, CpuInfo::num_cores() * 10);
ThreadPoolBuilder("RemoteScanThreadPool")
.set_min_threads(config::doris_scanner_thread_pool_thread_num) // 48 default
.set_max_threads(_remote_thread_pool_max_size)
.set_max_queue_size(config::doris_scanner_thread_pool_queue_size)
.build(&_remote_scan_thread_pool);
_remote_scan_thread_pool = std::make_unique<PriorityThreadPool>(
config::doris_remote_scanner_thread_pool_thread_num,
config::doris_remote_scanner_thread_pool_queue_size, "RemoteScanThreadPool");

// 4. limited scan thread pool
ThreadPoolBuilder("LimitedScanThreadPool")
Expand Down Expand Up @@ -226,9 +224,12 @@ void ScannerScheduler::_schedule_scanners(ScannerContext* ctx) {
task.priority = nice;
ret = _local_scan_thread_pool->offer(task);
} else {
ret = _remote_scan_thread_pool->submit_func([this, scanner = *iter, ctx] {
PriorityThreadPool::Task task;
task.work_function = [this, scanner = *iter, ctx] {
this->_scanner_scan(this, ctx, scanner);
});
};
task.priority = nice;
ret = _remote_scan_thread_pool->offer(task);
}
if (ret) {
this_run.erase(iter++);
Expand Down Expand Up @@ -422,7 +423,7 @@ void ScannerScheduler::_register_metrics() {
REGISTER_HOOK_METRIC(remote_scan_thread_pool_queue_size,
[this]() { return _remote_scan_thread_pool->get_queue_size(); });
REGISTER_HOOK_METRIC(remote_scan_thread_pool_thread_num,
[this]() { return _remote_scan_thread_pool->num_threads(); });
[this]() { return _remote_scan_thread_pool->get_active_threads(); });
REGISTER_HOOK_METRIC(limited_scan_thread_pool_queue_size,
[this]() { return _limited_scan_thread_pool->get_queue_size(); });
REGISTER_HOOK_METRIC(limited_scan_thread_pool_thread_num,
Expand All @@ -437,5 +438,4 @@ void ScannerScheduler::_deregister_metrics() {
DEREGISTER_HOOK_METRIC(limited_scan_thread_pool_queue_size);
DEREGISTER_HOOK_METRIC(limited_scan_thread_pool_thread_num);
}

} // namespace doris::vectorized
2 changes: 1 addition & 1 deletion be/src/vec/exec/scan/scanner_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ class ScannerScheduler {
// _remote_scan_thread_pool is for remote scan task(cold data on s3, hdfs, etc.)
// _limited_scan_thread_pool is a special pool for queries with resource limit
std::unique_ptr<PriorityThreadPool> _local_scan_thread_pool;
std::unique_ptr<ThreadPool> _remote_scan_thread_pool;
std::unique_ptr<PriorityThreadPool> _remote_scan_thread_pool;
std::unique_ptr<ThreadPool> _limited_scan_thread_pool;

// true is the scheduler is closed.
Expand Down

0 comments on commit d3b1e1b

Please sign in to comment.