Decrease overhead of WorkerThreadPool task processing #72716

Closed
wants to merge 1 commit
69 changes: 57 additions & 12 deletions core/object/worker_thread_pool.cpp
@@ -49,6 +49,33 @@ void WorkerThreadPool::_process_task_queue() {
 	_process_task(task);
 }
 
+// We want a batch size that's small enough to avoid unbalanced threads,
+// but large enough to minimize cache contention and synchronization overhead.
+// This currently uses a placeholder formula; it should probably be
+// adjusted once there's more multithreading code available to test with.
+static int calculate_batch_size(int p_num_elements, int p_num_threads) {
+#ifndef DEBUG_ENABLED
+	// Prevent the batch size from becoming too large on systems with
+	// low core counts. This lets threads that finish early pick up
+	// another batch instead of just idling.
+	// Disabled in debug mode so that larger batch sizes can be reached
+	// with a smaller number of elements, making testing more efficient.
+	p_num_threads = MAX(p_num_threads, 8);
+#endif
+
+	// Divide the workload evenly among all available threads.
+	// With this method, any threads that finish processing early will
+	// still have to wait for the slowest thread in the group.
+	int batch_size = p_num_elements / p_num_threads;
+
+	// Cap the batch size, since context switching costs become
+	// negligible past a certain point.
+	// 4096 may or may not be that point.
+	batch_size = MIN(batch_size, 4096);
+
+	return MAX(batch_size, 1);
+}
+
 void WorkerThreadPool::_process_task(Task *p_task) {
 	bool low_priority = p_task->low_priority;
 	int pool_thread_index = -1;
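
To see how the formula behaves at the extremes, here is a minimal standalone sketch (not part of the diff), substituting std::min/std::max for Godot's MIN/MAX macros; the example inputs are arbitrary:

#include <algorithm>
#include <cstdio>

// Standalone restatement of calculate_batch_size() above, with std::min /
// std::max in place of Godot's MIN/MAX macros (illustrative only).
static int calculate_batch_size(int p_num_elements, int p_num_threads) {
	p_num_threads = std::max(p_num_threads, 8); // release-mode floor on the thread count
	int batch_size = p_num_elements / p_num_threads;
	batch_size = std::min(batch_size, 4096); // cap past which context-switch cost is assumed negligible
	return std::max(batch_size, 1);
}

int main() {
	std::printf("%d\n", calculate_batch_size(100000, 4)); // 100000 / max(4, 8) = 12500 -> capped to 4096
	std::printf("%d\n", calculate_batch_size(100, 16));   // 100 / 16 = 6
	std::printf("%d\n", calculate_batch_size(3, 16));     // 3 / 16 = 0 -> floored to 1
	return 0;
}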
@@ -72,29 +99,47 @@ void WorkerThreadPool::_process_task(Task *p_task) {
 	if (p_task->group) {
 		// Handling a group
 		bool do_post = false;
-		Callable::CallError ce;
-		Variant ret;
-		Variant arg;
-		Variant *argptr = &arg;
-
-		while (true) {
-			uint32_t work_index = p_task->group->index.postincrement();
+		// Process nearby indices in the same thread for better caching
+		const uint32_t batch_size = calculate_batch_size(p_task->group->max, p_task->group->tasks_used);
+
+		while (true) {
+			uint32_t work_index = p_task->group->index.postadd(batch_size);
 			if (work_index >= p_task->group->max) {
 				break;
 			}
+
+#ifdef DEBUG_ENABLED
+			// Probably random enough
+			const uint32_t rand = hash_murmur3_one_32(work_index, (uintptr_t)p_task);
+
+			// Make execution order more random so that race conditions
+			// have a higher chance of being found. We run this outside
+			// the batch to minimize the performance impact.
+			OS::get_singleton()->delay_usec(rand % 10);
+#endif
+
+			uint32_t local_batch = MIN(batch_size, p_task->group->max - work_index);
+
 			if (p_task->native_group_func) {
-				p_task->native_group_func(p_task->native_func_userdata, work_index);
+				for (uint32_t i = 0; i < local_batch; i++) {
+					p_task->native_group_func(p_task->native_func_userdata, work_index + i);
+				}
 			} else if (p_task->template_userdata) {
-				p_task->template_userdata->callback_indexed(work_index);
+				p_task->template_userdata->callback_range(work_index, work_index + local_batch);
 			} else {
-				arg = work_index;
-				p_task->callable.callp((const Variant **)&argptr, 1, ret, ce);
+				Callable::CallError ce;
+				Variant ret;
+				Variant arg;
+				Variant *argptr = &arg;
+				for (uint32_t i = 0; i < local_batch; i++) {
+					arg = work_index + i;
+					p_task->callable.callp((const Variant **)&argptr, 1, ret, ce);
+				}
 			}
 
 			// This is the only way to ensure posting is done when all tasks are really complete.
-			uint32_t completed_amount = p_task->group->completed_index.increment();
+			uint32_t completed_amount = p_task->group->completed_index.add(local_batch);
 
 			if (completed_amount == p_task->group->max) {
 				do_post = true;
 			}
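
The loop above boils down to a batched self-scheduling pattern: claim a contiguous range with one atomic fetch-add, process it locally, then bump the completion counter once per batch. A minimal standalone sketch, assuming std::atomic in place of Godot's SafeNumeric (whose postadd() returns the pre-add value, like fetch_add, while add() returns the post-add value); Group and process_group are hypothetical names:

#include <algorithm>
#include <atomic>
#include <cstdint>

// Hypothetical stand-ins for the group state used in the diff.
struct Group {
	std::atomic<uint32_t> index{0};           // next unclaimed element
	std::atomic<uint32_t> completed_index{0}; // elements fully processed
	uint32_t max = 0;                         // total number of elements
};

// Each thread claims `batch_size` contiguous indices with a single atomic
// fetch-add, processes them locally, and bumps the completion counter once
// per batch instead of once per element.
template <typename F>
bool process_group(Group &g, uint32_t batch_size, F &&work) {
	bool do_post = false;
	while (true) {
		uint32_t work_index = g.index.fetch_add(batch_size); // like postadd()
		if (work_index >= g.max) {
			break;
		}
		uint32_t local_batch = std::min(batch_size, g.max - work_index);
		for (uint32_t i = 0; i < local_batch; i++) {
			work(work_index + i);
		}
		// SafeNumeric::add() returns the post-add value; fetch_add returns
		// the pre-add value, so add local_batch before comparing against max.
		if (g.completed_index.fetch_add(local_batch) + local_batch == g.max) {
			do_post = true; // only the thread finishing the last element posts
		}
	}
	return do_post;
}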
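The DEBUG_ENABLED block, meanwhile, acts as a scheduling fuzzer: a per-index pseudo-random delay changes the thread interleaving on each run so latent race conditions surface more often under test. A sketch of the idea, using the murmur3 finalizer as a stand-in for Godot's hash_murmur3_one_32:

#include <chrono>
#include <cstdint>
#include <thread>

// Cheap hash used to turn (work_index, task pointer) into a delay.
// This is the murmur3 32-bit finalizer, not Godot's exact function.
static uint32_t mix32(uint32_t x) {
	x ^= x >> 16;
	x *= 0x85ebca6bu;
	x ^= x >> 13;
	x *= 0xc2b2ae35u;
	x ^= x >> 16;
	return x;
}

// Debug-only jitter: a 0-9 microsecond sleep derived from inputs that
// differ per element and per task, randomizing execution order.
static void jitter(uint32_t work_index, const void *task) {
	uint32_t r = mix32(work_index ^ static_cast<uint32_t>(reinterpret_cast<uintptr_t>(task)));
	std::this_thread::sleep_for(std::chrono::microseconds(r % 10));
}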
8 changes: 5 additions & 3 deletions core/object/worker_thread_pool.h
@@ -55,7 +55,7 @@ class WorkerThreadPool : public Object {
 
 	struct BaseTemplateUserdata {
 		virtual void callback() {}
-		virtual void callback_indexed(uint32_t p_index) {}
+		virtual void callback_range(uint32_t p_from, uint32_t p_to) {}
 		virtual ~BaseTemplateUserdata() {}
 	};
 
@@ -154,8 +154,10 @@
 		C *instance;
 		M method;
 		U userdata;
-		virtual void callback_indexed(uint32_t p_index) override {
-			(instance->*method)(p_index, userdata);
+		virtual void callback_range(uint32_t p_from, uint32_t p_to) override {
+			for (uint32_t p_index = p_from; p_index < p_to; p_index++) {
+				(instance->*method)(p_index, userdata);
+			}
 		}
 	};

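The header change moves the iteration inside the virtual call: one callback_range() dispatch now covers a whole batch, where callback_indexed() previously cost one virtual call per element. A minimal sketch of that design choice (PrintRange and its printf body are illustrative stand-ins for the method/userdata plumbing in the header):

#include <cstdint>
#include <cstdio>

// Simplified mirror of the interface above: the range loop lives inside a
// single virtual call, so each batch costs one dispatch rather than one
// dispatch per element (as the old callback_indexed() did).
struct BaseTemplateUserdata {
	virtual void callback_range(uint32_t p_from, uint32_t p_to) {}
	virtual ~BaseTemplateUserdata() {}
};

// Illustrative subclass standing in for the method/userdata template.
struct PrintRange : BaseTemplateUserdata {
	virtual void callback_range(uint32_t p_from, uint32_t p_to) override {
		for (uint32_t p_index = p_from; p_index < p_to; p_index++) {
			std::printf("element %u\n", p_index); // stand-in for (instance->*method)(p_index, userdata)
		}
	}
};

int main() {
	PrintRange r;
	r.callback_range(8, 12); // one virtual dispatch covers the whole batch [8, 12)
	return 0;
}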