diff --git a/core/io/resource.cpp b/core/io/resource.cpp index c045c0fc7420..432adb88da9f 100644 --- a/core/io/resource.cpp +++ b/core/io/resource.cpp @@ -40,7 +40,12 @@ #include void Resource::emit_changed() { - emit_signal(CoreStringName(changed)); + if (ResourceLoader::is_within_load() && MessageQueue::get_main_singleton() != MessageQueue::get_singleton() && !MessageQueue::get_singleton()->is_flushing()) { + // Let the connection happen on the call queue, later, since signals are not thread-safe. + call_deferred("emit_signal", CoreStringName(changed)); + } else { + emit_signal(CoreStringName(changed)); + } } void Resource::_resource_path_changed() { @@ -161,12 +166,22 @@ bool Resource::editor_can_reload_from_file() { } void Resource::connect_changed(const Callable &p_callable, uint32_t p_flags) { + if (ResourceLoader::is_within_load() && MessageQueue::get_main_singleton() != MessageQueue::get_singleton() && !MessageQueue::get_singleton()->is_flushing()) { + // Let the check and connection happen on the call queue, later, since signals are not thread-safe. + callable_mp(this, &Resource::connect_changed).call_deferred(p_callable, p_flags); + return; + } if (!is_connected(CoreStringName(changed), p_callable) || p_flags & CONNECT_REFERENCE_COUNTED) { connect(CoreStringName(changed), p_callable, p_flags); } } void Resource::disconnect_changed(const Callable &p_callable) { + if (ResourceLoader::is_within_load() && MessageQueue::get_main_singleton() != MessageQueue::get_singleton() && !MessageQueue::get_singleton()->is_flushing()) { + // Let the check and disconnection happen on the call queue, later, since signals are not thread-safe. + callable_mp(this, &Resource::disconnect_changed).call_deferred(p_callable); + return; + } if (is_connected(CoreStringName(changed), p_callable)) { disconnect(CoreStringName(changed), p_callable); } diff --git a/core/io/resource_loader.cpp b/core/io/resource_loader.cpp index 20dd192da1a9..d606db620c19 100644 --- a/core/io/resource_loader.cpp +++ b/core/io/resource_loader.cpp @@ -304,31 +304,24 @@ void ResourceLoader::_thread_load_function(void *p_userdata) { thread_load_mutex.unlock(); // Thread-safe either if it's the current thread or a brand new one. - thread_local bool mq_override_present = false; CallQueue *own_mq_override = nullptr; if (load_nesting == 0) { - mq_override_present = false; load_paths_stack = memnew(Vector); - if (!load_task.dependent_path.is_empty()) { - load_paths_stack->push_back(load_task.dependent_path); - } if (!Thread::is_main_thread()) { // Let the caller thread use its own, for added flexibility. Provide one otherwise. if (MessageQueue::get_singleton() == MessageQueue::get_main_singleton()) { own_mq_override = memnew(CallQueue); MessageQueue::set_thread_singleton_override(own_mq_override); } - mq_override_present = true; set_current_thread_safe_for_nodes(true); } - } else { - DEV_ASSERT(load_task.dependent_path.is_empty()); } // -- - Ref res = _load(load_task.remapped_path, load_task.remapped_path != load_task.local_path ? load_task.local_path : String(), load_task.type_hint, load_task.cache_mode, &load_task.error, load_task.use_sub_threads, &load_task.progress); - if (mq_override_present) { + Error load_err = OK; + Ref res = _load(load_task.remapped_path, load_task.remapped_path != load_task.local_path ? load_task.local_path : String(), load_task.type_hint, load_task.cache_mode, &load_err, load_task.use_sub_threads, &load_task.progress); + if (MessageQueue::get_singleton() != MessageQueue::get_main_singleton()) { MessageQueue::get_singleton()->flush(); } @@ -336,7 +329,8 @@ void ResourceLoader::_thread_load_function(void *p_userdata) { load_task.resource = res; - load_task.progress = 1.0; //it was fully loaded at this point, so force progress to 1.0 + load_task.progress = 1.0; // It was fully loaded at this point, so force progress to 1.0. + load_task.error = load_err; if (load_task.error != OK) { load_task.status = THREAD_LOAD_FAILED; } else { @@ -473,12 +467,13 @@ Ref ResourceLoader::_load_start(const String &p_path, if (!ignoring_cache && thread_load_tasks.has(local_path)) { load_token = Ref(thread_load_tasks[local_path].load_token); - if (!load_token.is_valid()) { + if (load_token.is_valid()) { + return load_token; + } else { // The token is dying (reached 0 on another thread). // Ensure it's killed now so the path can be safely reused right away. thread_load_tasks[local_path].load_token->clear(); } - return load_token; } load_token.instantiate(); @@ -560,39 +555,46 @@ float ResourceLoader::_dependency_get_progress(const String &p_path) { } ResourceLoader::ThreadLoadStatus ResourceLoader::load_threaded_get_status(const String &p_path, float *r_progress) { - MutexLock thread_load_lock(thread_load_mutex); + bool ensure_progress = false; + ThreadLoadStatus status = THREAD_LOAD_IN_PROGRESS; + { + MutexLock thread_load_lock(thread_load_mutex); - if (!user_load_tokens.has(p_path)) { - print_verbose("load_threaded_get_status(): No threaded load for resource path '" + p_path + "' has been initiated or its result has already been collected."); - return THREAD_LOAD_INVALID_RESOURCE; - } + if (!user_load_tokens.has(p_path)) { + print_verbose("load_threaded_get_status(): No threaded load for resource path '" + p_path + "' has been initiated or its result has already been collected."); + return THREAD_LOAD_INVALID_RESOURCE; + } - String local_path = _validate_local_path(p_path); - if (!thread_load_tasks.has(local_path)) { + String local_path = _validate_local_path(p_path); + if (!thread_load_tasks.has(local_path)) { #ifdef DEV_ENABLED - CRASH_NOW(); + CRASH_NOW(); #endif - // On non-dev, be defensive and at least avoid crashing (at this point at least). - return THREAD_LOAD_INVALID_RESOURCE; - } + // On non-dev, be defensive and at least avoid crashing (at this point at least). + return THREAD_LOAD_INVALID_RESOURCE; + } - ThreadLoadTask &load_task = thread_load_tasks[local_path]; - ThreadLoadStatus status; - status = load_task.status; - if (r_progress) { - *r_progress = _dependency_get_progress(local_path); - } + ThreadLoadTask &load_task = thread_load_tasks[local_path]; + status = load_task.status; + if (r_progress) { + *r_progress = _dependency_get_progress(local_path); + } - // Support userland polling in a loop on the main thread. - if (Thread::is_main_thread() && status == THREAD_LOAD_IN_PROGRESS) { - uint64_t frame = Engine::get_singleton()->get_process_frames(); - if (frame == load_task.last_progress_check_main_thread_frame) { - _ensure_load_progress(); - } else { - load_task.last_progress_check_main_thread_frame = frame; + // Support userland polling in a loop on the main thread. + if (Thread::is_main_thread() && status == THREAD_LOAD_IN_PROGRESS) { + uint64_t frame = Engine::get_singleton()->get_process_frames(); + if (frame == load_task.last_progress_check_main_thread_frame) { + ensure_progress = true; + } else { + load_task.last_progress_check_main_thread_frame = frame; + } } } + if (ensure_progress) { + _ensure_load_progress(); + } + return status; } @@ -626,13 +628,13 @@ Ref ResourceLoader::load_threaded_get(const String &p_path, Error *r_e if (Thread::is_main_thread() && !load_token->local_path.is_empty()) { const ThreadLoadTask &load_task = thread_load_tasks[load_token->local_path]; while (load_task.status == THREAD_LOAD_IN_PROGRESS) { - if (!_ensure_load_progress()) { - // This local poll loop is not needed. - break; - } thread_load_lock.~MutexLock(); + bool exit = !_ensure_load_progress(); OS::get_singleton()->delay_usec(1000); new (&thread_load_lock) MutexLock(thread_load_mutex); + if (exit) { + break; + } } } diff --git a/core/io/resource_loader.h b/core/io/resource_loader.h index 46df79ea221e..5f1831f0d988 100644 --- a/core/io/resource_loader.h +++ b/core/io/resource_loader.h @@ -170,7 +170,6 @@ class ResourceLoader { LoadToken *load_token = nullptr; String local_path; String remapped_path; - String dependent_path; String type_hint; float progress = 0.0f; float max_reported_progress = 0.0f; diff --git a/core/object/worker_thread_pool.cpp b/core/object/worker_thread_pool.cpp index caf4ed3835a0..a873bc1f09c9 100644 --- a/core/object/worker_thread_pool.cpp +++ b/core/object/worker_thread_pool.cpp @@ -59,8 +59,9 @@ void WorkerThreadPool::_process_task(Task *p_task) { CallQueue *call_queue_backup = MessageQueue::get_singleton() != MessageQueue::get_main_singleton() ? MessageQueue::get_singleton() : nullptr; { - // Tasks must start with this unset. They are free to set-and-forget otherwise. + // Tasks must start with these at default values. They are free to set-and-forget otherwise. set_current_thread_safe_for_nodes(false); + MessageQueue::set_thread_singleton_override(nullptr); // Since the WorkerThreadPool is started before the script server, // its pre-created threads can't have ScriptServer::thread_enter() called on them early. // Therefore, we do it late at the first opportunity, so in case the task @@ -397,16 +398,17 @@ Error WorkerThreadPool::wait_for_task_completion(TaskID p_task_id) { task->waiting_user++; } - task_mutex.unlock(); - if (caller_pool_thread) { + task_mutex.unlock(); _wait_collaboratively(caller_pool_thread, task); + task_mutex.lock(); task->waiting_pool--; if (task->waiting_pool == 0 && task->waiting_user == 0) { tasks.erase(p_task_id); task_allocator.free(task); } } else { + task_mutex.unlock(); task->done_semaphore.wait(); task_mutex.lock(); task->waiting_user--; @@ -414,9 +416,9 @@ Error WorkerThreadPool::wait_for_task_completion(TaskID p_task_id) { tasks.erase(p_task_id); task_allocator.free(task); } - task_mutex.unlock(); } + task_mutex.unlock(); return OK; } @@ -670,7 +672,7 @@ uint32_t WorkerThreadPool::thread_enter_unlock_allowance_zone(BinaryMutex *p_mut uint32_t WorkerThreadPool::_thread_enter_unlock_allowance_zone(void *p_mutex, bool p_is_binary) { for (uint32_t i = 0; i < MAX_UNLOCKABLE_MUTEXES; i++) { - if (unlikely(unlockable_mutexes[i] == (uintptr_t)p_mutex)) { + if (unlikely((unlockable_mutexes[i] & ~1) == (uintptr_t)p_mutex)) { // Already registered in the current thread. return UINT32_MAX; } diff --git a/core/templates/command_queue_mt.h b/core/templates/command_queue_mt.h index 0748e9cb8373..1e6c6e42a96e 100644 --- a/core/templates/command_queue_mt.h +++ b/core/templates/command_queue_mt.h @@ -370,15 +370,19 @@ class CommandQueueMT { flush_read_ptr += 8; CommandBase *cmd = reinterpret_cast(&command_mem[flush_read_ptr]); cmd->call(); + + // Handle potential realloc due to the command and unlock allowance. + cmd = reinterpret_cast(&command_mem[flush_read_ptr]); + if (unlikely(cmd->sync)) { sync_head++; unlock(); // Give an opportunity to awaiters right away. sync_cond_var.notify_all(); lock(); + // Handle potential realloc happened during unlock. + cmd = reinterpret_cast(&command_mem[flush_read_ptr]); } - // If the command involved reallocating the buffer, the address may have changed. - cmd = reinterpret_cast(&command_mem[flush_read_ptr]); cmd->~CommandBase(); flush_read_ptr += size;