Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch of fixes for WorkerThreadPool and ResourceLoader (safe set) #94526

Merged
merged 4 commits into from
Jul 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion core/io/resource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,12 @@
#include <stdio.h>

void Resource::emit_changed() {
emit_signal(CoreStringName(changed));
if (ResourceLoader::is_within_load() && MessageQueue::get_main_singleton() != MessageQueue::get_singleton() && !MessageQueue::get_singleton()->is_flushing()) {
// Let the connection happen on the call queue, later, since signals are not thread-safe.
call_deferred("emit_signal", CoreStringName(changed));
} else {
emit_signal(CoreStringName(changed));
}
}

void Resource::_resource_path_changed() {
Expand Down Expand Up @@ -161,12 +166,22 @@ bool Resource::editor_can_reload_from_file() {
}

void Resource::connect_changed(const Callable &p_callable, uint32_t p_flags) {
if (ResourceLoader::is_within_load() && MessageQueue::get_main_singleton() != MessageQueue::get_singleton() && !MessageQueue::get_singleton()->is_flushing()) {
// Let the check and connection happen on the call queue, later, since signals are not thread-safe.
callable_mp(this, &Resource::connect_changed).call_deferred(p_callable, p_flags);
return;
}
if (!is_connected(CoreStringName(changed), p_callable) || p_flags & CONNECT_REFERENCE_COUNTED) {
connect(CoreStringName(changed), p_callable, p_flags);
}
}

void Resource::disconnect_changed(const Callable &p_callable) {
if (ResourceLoader::is_within_load() && MessageQueue::get_main_singleton() != MessageQueue::get_singleton() && !MessageQueue::get_singleton()->is_flushing()) {
// Let the check and disconnection happen on the call queue, later, since signals are not thread-safe.
callable_mp(this, &Resource::disconnect_changed).call_deferred(p_callable);
return;
}
if (is_connected(CoreStringName(changed), p_callable)) {
disconnect(CoreStringName(changed), p_callable);
}
Expand Down
84 changes: 43 additions & 41 deletions core/io/resource_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -304,39 +304,33 @@ void ResourceLoader::_thread_load_function(void *p_userdata) {
thread_load_mutex.unlock();

// Thread-safe either if it's the current thread or a brand new one.
thread_local bool mq_override_present = false;
CallQueue *own_mq_override = nullptr;
if (load_nesting == 0) {
mq_override_present = false;
load_paths_stack = memnew(Vector<String>);

if (!load_task.dependent_path.is_empty()) {
load_paths_stack->push_back(load_task.dependent_path);
}
if (!Thread::is_main_thread()) {
// Let the caller thread use its own, for added flexibility. Provide one otherwise.
if (MessageQueue::get_singleton() == MessageQueue::get_main_singleton()) {
own_mq_override = memnew(CallQueue);
MessageQueue::set_thread_singleton_override(own_mq_override);
}
mq_override_present = true;
set_current_thread_safe_for_nodes(true);
}
} else {
DEV_ASSERT(load_task.dependent_path.is_empty());
}
// --

Ref<Resource> res = _load(load_task.remapped_path, load_task.remapped_path != load_task.local_path ? load_task.local_path : String(), load_task.type_hint, load_task.cache_mode, &load_task.error, load_task.use_sub_threads, &load_task.progress);
if (mq_override_present) {
Error load_err = OK;
Ref<Resource> res = _load(load_task.remapped_path, load_task.remapped_path != load_task.local_path ? load_task.local_path : String(), load_task.type_hint, load_task.cache_mode, &load_err, load_task.use_sub_threads, &load_task.progress);
if (MessageQueue::get_singleton() != MessageQueue::get_main_singleton()) {
MessageQueue::get_singleton()->flush();
}

thread_load_mutex.lock();

load_task.resource = res;

load_task.progress = 1.0; //it was fully loaded at this point, so force progress to 1.0
load_task.progress = 1.0; // It was fully loaded at this point, so force progress to 1.0.
load_task.error = load_err;
if (load_task.error != OK) {
load_task.status = THREAD_LOAD_FAILED;
} else {
Expand Down Expand Up @@ -473,12 +467,13 @@ Ref<ResourceLoader::LoadToken> ResourceLoader::_load_start(const String &p_path,

if (!ignoring_cache && thread_load_tasks.has(local_path)) {
load_token = Ref<LoadToken>(thread_load_tasks[local_path].load_token);
if (!load_token.is_valid()) {
if (load_token.is_valid()) {
return load_token;
} else {
// The token is dying (reached 0 on another thread).
// Ensure it's killed now so the path can be safely reused right away.
thread_load_tasks[local_path].load_token->clear();
}
return load_token;
}

load_token.instantiate();
Expand Down Expand Up @@ -560,39 +555,46 @@ float ResourceLoader::_dependency_get_progress(const String &p_path) {
}

ResourceLoader::ThreadLoadStatus ResourceLoader::load_threaded_get_status(const String &p_path, float *r_progress) {
MutexLock thread_load_lock(thread_load_mutex);
bool ensure_progress = false;
ThreadLoadStatus status = THREAD_LOAD_IN_PROGRESS;
{
MutexLock thread_load_lock(thread_load_mutex);

if (!user_load_tokens.has(p_path)) {
print_verbose("load_threaded_get_status(): No threaded load for resource path '" + p_path + "' has been initiated or its result has already been collected.");
return THREAD_LOAD_INVALID_RESOURCE;
}
if (!user_load_tokens.has(p_path)) {
print_verbose("load_threaded_get_status(): No threaded load for resource path '" + p_path + "' has been initiated or its result has already been collected.");
return THREAD_LOAD_INVALID_RESOURCE;
}

String local_path = _validate_local_path(p_path);
if (!thread_load_tasks.has(local_path)) {
String local_path = _validate_local_path(p_path);
if (!thread_load_tasks.has(local_path)) {
#ifdef DEV_ENABLED
CRASH_NOW();
CRASH_NOW();
#endif
// On non-dev, be defensive and at least avoid crashing (at this point at least).
return THREAD_LOAD_INVALID_RESOURCE;
}
// On non-dev, be defensive and at least avoid crashing (at this point at least).
return THREAD_LOAD_INVALID_RESOURCE;
}

ThreadLoadTask &load_task = thread_load_tasks[local_path];
ThreadLoadStatus status;
status = load_task.status;
if (r_progress) {
*r_progress = _dependency_get_progress(local_path);
}
ThreadLoadTask &load_task = thread_load_tasks[local_path];
status = load_task.status;
if (r_progress) {
*r_progress = _dependency_get_progress(local_path);
}

// Support userland polling in a loop on the main thread.
if (Thread::is_main_thread() && status == THREAD_LOAD_IN_PROGRESS) {
uint64_t frame = Engine::get_singleton()->get_process_frames();
if (frame == load_task.last_progress_check_main_thread_frame) {
_ensure_load_progress();
} else {
load_task.last_progress_check_main_thread_frame = frame;
// Support userland polling in a loop on the main thread.
if (Thread::is_main_thread() && status == THREAD_LOAD_IN_PROGRESS) {
uint64_t frame = Engine::get_singleton()->get_process_frames();
if (frame == load_task.last_progress_check_main_thread_frame) {
ensure_progress = true;
} else {
load_task.last_progress_check_main_thread_frame = frame;
}
}
}

if (ensure_progress) {
_ensure_load_progress();
}

return status;
}

Expand Down Expand Up @@ -626,13 +628,13 @@ Ref<Resource> ResourceLoader::load_threaded_get(const String &p_path, Error *r_e
if (Thread::is_main_thread() && !load_token->local_path.is_empty()) {
const ThreadLoadTask &load_task = thread_load_tasks[load_token->local_path];
while (load_task.status == THREAD_LOAD_IN_PROGRESS) {
if (!_ensure_load_progress()) {
// This local poll loop is not needed.
break;
}
thread_load_lock.~MutexLock();
bool exit = !_ensure_load_progress();
OS::get_singleton()->delay_usec(1000);
new (&thread_load_lock) MutexLock(thread_load_mutex);
if (exit) {
break;
}
}
}

Expand Down
1 change: 0 additions & 1 deletion core/io/resource_loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,6 @@ class ResourceLoader {
LoadToken *load_token = nullptr;
String local_path;
String remapped_path;
String dependent_path;
String type_hint;
float progress = 0.0f;
float max_reported_progress = 0.0f;
Expand Down
12 changes: 7 additions & 5 deletions core/object/worker_thread_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,9 @@ void WorkerThreadPool::_process_task(Task *p_task) {
CallQueue *call_queue_backup = MessageQueue::get_singleton() != MessageQueue::get_main_singleton() ? MessageQueue::get_singleton() : nullptr;

{
// Tasks must start with this unset. They are free to set-and-forget otherwise.
// Tasks must start with these at default values. They are free to set-and-forget otherwise.
set_current_thread_safe_for_nodes(false);
MessageQueue::set_thread_singleton_override(nullptr);
// Since the WorkerThreadPool is started before the script server,
// its pre-created threads can't have ScriptServer::thread_enter() called on them early.
// Therefore, we do it late at the first opportunity, so in case the task
Expand Down Expand Up @@ -397,26 +398,27 @@ Error WorkerThreadPool::wait_for_task_completion(TaskID p_task_id) {
task->waiting_user++;
}

task_mutex.unlock();

if (caller_pool_thread) {
task_mutex.unlock();
_wait_collaboratively(caller_pool_thread, task);
task_mutex.lock();
task->waiting_pool--;
if (task->waiting_pool == 0 && task->waiting_user == 0) {
tasks.erase(p_task_id);
task_allocator.free(task);
}
} else {
task_mutex.unlock();
task->done_semaphore.wait();
task_mutex.lock();
task->waiting_user--;
if (task->waiting_pool == 0 && task->waiting_user == 0) {
tasks.erase(p_task_id);
task_allocator.free(task);
}
task_mutex.unlock();
}

task_mutex.unlock();
return OK;
}

Expand Down Expand Up @@ -670,7 +672,7 @@ uint32_t WorkerThreadPool::thread_enter_unlock_allowance_zone(BinaryMutex *p_mut

uint32_t WorkerThreadPool::_thread_enter_unlock_allowance_zone(void *p_mutex, bool p_is_binary) {
for (uint32_t i = 0; i < MAX_UNLOCKABLE_MUTEXES; i++) {
if (unlikely(unlockable_mutexes[i] == (uintptr_t)p_mutex)) {
if (unlikely((unlockable_mutexes[i] & ~1) == (uintptr_t)p_mutex)) {
// Already registered in the current thread.
return UINT32_MAX;
}
Expand Down
8 changes: 6 additions & 2 deletions core/templates/command_queue_mt.h
Original file line number Diff line number Diff line change
Expand Up @@ -370,15 +370,19 @@ class CommandQueueMT {
flush_read_ptr += 8;
CommandBase *cmd = reinterpret_cast<CommandBase *>(&command_mem[flush_read_ptr]);
cmd->call();

// Handle potential realloc due to the command and unlock allowance.
cmd = reinterpret_cast<CommandBase *>(&command_mem[flush_read_ptr]);

if (unlikely(cmd->sync)) {
sync_head++;
unlock(); // Give an opportunity to awaiters right away.
sync_cond_var.notify_all();
lock();
// Handle potential realloc happened during unlock.
cmd = reinterpret_cast<CommandBase *>(&command_mem[flush_read_ptr]);
}

// If the command involved reallocating the buffer, the address may have changed.
cmd = reinterpret_cast<CommandBase *>(&command_mem[flush_read_ptr]);
cmd->~CommandBase();

flush_read_ptr += size;
Expand Down
Loading