diff --git a/core/io/resource.cpp b/core/io/resource.cpp
index ff12dc58518a..6177cba6a40c 100644
--- a/core/io/resource.cpp
+++ b/core/io/resource.cpp
@@ -40,12 +40,12 @@
 #include <stdio.h>
 
 void Resource::emit_changed() {
-	if (ResourceLoader::is_within_load() && MessageQueue::get_main_singleton() != MessageQueue::get_singleton() && !MessageQueue::get_singleton()->is_flushing()) {
-		// Let the connection happen on the call queue, later, since signals are not thread-safe.
-		call_deferred("emit_signal", CoreStringName(changed));
-	} else {
-		emit_signal(CoreStringName(changed));
+	if (ResourceLoader::is_within_load() && !Thread::is_main_thread()) {
+		ResourceLoader::resource_changed_emit(this);
+		return;
 	}
+
+	emit_signal(CoreStringName(changed));
 }
 
 void Resource::_resource_path_changed() {
@@ -166,22 +166,22 @@ bool Resource::editor_can_reload_from_file() {
 }
 
 void Resource::connect_changed(const Callable &p_callable, uint32_t p_flags) {
-	if (ResourceLoader::is_within_load() && MessageQueue::get_main_singleton() != MessageQueue::get_singleton() && !MessageQueue::get_singleton()->is_flushing()) {
-		// Let the check and connection happen on the call queue, later, since signals are not thread-safe.
-		callable_mp(this, &Resource::connect_changed).call_deferred(p_callable, p_flags);
+	if (ResourceLoader::is_within_load() && !Thread::is_main_thread()) {
+		ResourceLoader::resource_changed_connect(this, p_callable, p_flags);
 		return;
 	}
+
 	if (!is_connected(CoreStringName(changed), p_callable) || p_flags & CONNECT_REFERENCE_COUNTED) {
 		connect(CoreStringName(changed), p_callable, p_flags);
 	}
 }
 
 void Resource::disconnect_changed(const Callable &p_callable) {
-	if (ResourceLoader::is_within_load() && MessageQueue::get_main_singleton() != MessageQueue::get_singleton() && !MessageQueue::get_singleton()->is_flushing()) {
-		// Let the check and disconnection happen on the call queue, later, since signals are not thread-safe.
-		callable_mp(this, &Resource::disconnect_changed).call_deferred(p_callable);
+	if (ResourceLoader::is_within_load() && !Thread::is_main_thread()) {
+		ResourceLoader::resource_changed_disconnect(this, p_callable);
 		return;
 	}
+
 	if (is_connected(CoreStringName(changed), p_callable)) {
 		disconnect(CoreStringName(changed), p_callable);
 	}
diff --git a/core/io/resource_loader.cpp b/core/io/resource_loader.cpp
index 7cf101b0de5e..a083a91a634e 100644
--- a/core/io/resource_loader.cpp
+++ b/core/io/resource_loader.cpp
@@ -31,6 +31,7 @@
 #include "resource_loader.h"
 
 #include "core/config/project_settings.h"
+#include "core/core_bind.h"
 #include "core/io/file_access.h"
 #include "core/io/resource_importer.h"
 #include "core/object/script_language.h"
@@ -234,17 +235,22 @@ void ResourceLoader::LoadToken::clear() {
 		// User-facing tokens shouldn't be deleted until completely claimed.
 		DEV_ASSERT(user_rc == 0 && user_path.is_empty());
 
-		if (!local_path.is_empty()) { // Empty is used for the special case where the load task is not registered.
-			DEV_ASSERT(thread_load_tasks.has(local_path));
-			ThreadLoadTask &load_task = thread_load_tasks[local_path];
-			if (load_task.task_id && !load_task.awaited) {
-				task_to_await = load_task.task_id;
+		if (!local_path.is_empty()) {
+			if (task_if_unregistered) {
+				memdelete(task_if_unregistered);
+				task_if_unregistered = nullptr;
+			} else {
+				DEV_ASSERT(thread_load_tasks.has(local_path));
+				ThreadLoadTask &load_task = thread_load_tasks[local_path];
+				if (load_task.task_id && !load_task.awaited) {
+					task_to_await = load_task.task_id;
+				}
+				// Removing a task which is still in progress would be catastrophic.
+				// Tokens must be alive until the task thread function is done.
+				DEV_ASSERT(load_task.status == THREAD_LOAD_FAILED || load_task.status == THREAD_LOAD_LOADED);
+				thread_load_tasks.erase(local_path);
 			}
-			// Removing a task which is still in progress would be catastrophic.
-			// Tokens must be alive until the task thread function is done.
-			DEV_ASSERT(load_task.status == THREAD_LOAD_FAILED || load_task.status == THREAD_LOAD_LOADED);
-			thread_load_tasks.erase(local_path);
-			local_path.clear();
+			local_path.clear(); // Mark as already cleared.
 		}
 	}
 
@@ -324,6 +330,9 @@ void ResourceLoader::_run_load_task(void *p_userdata) {
 		}
 	}
 
+	ThreadLoadTask *curr_load_task_backup = curr_load_task;
+	curr_load_task = &load_task;
+
 	// Thread-safe either if it's the current thread or a brand new one.
 	CallQueue *own_mq_override = nullptr;
 	if (load_nesting == 0) {
@@ -451,6 +460,8 @@ void ResourceLoader::_run_load_task(void *p_userdata) {
 		}
 		DEV_ASSERT(load_paths_stack.is_empty());
 	}
+
+	curr_load_task = curr_load_task_backup;
 }
 
 static String _validate_local_path(const String &p_path) {
@@ -521,9 +532,7 @@ Ref<ResourceLoader::LoadToken> ResourceLoader::_load_start(const String &p_path,
 
 	Ref<LoadToken> load_token;
 	bool must_not_register = false;
-	ThreadLoadTask unregistered_load_task; // Once set, must be valid up to the call to do the load.
 	ThreadLoadTask *load_task_ptr = nullptr;
-	bool run_on_current_thread = false;
 	{
 		MutexLock thread_load_lock(thread_load_mutex);
 
@@ -578,12 +587,11 @@ Ref<ResourceLoader::LoadToken> ResourceLoader::_load_start(const String &p_path,
 				}
 			}
 
-			// If we want to ignore cache, but there's another task loading it, we can't add this one to the map and we also have to finish within scope.
+			// If we want to ignore cache, but there's another task loading it, we can't add this one to the map.
 			must_not_register = ignoring_cache && thread_load_tasks.has(local_path);
 			if (must_not_register) {
-				load_token->local_path.clear();
-				unregistered_load_task = load_task;
-				load_task_ptr = &unregistered_load_task;
+				load_token->task_if_unregistered = memnew(ThreadLoadTask(load_task));
+				load_task_ptr = load_token->task_if_unregistered;
 			} else {
 				DEV_ASSERT(!thread_load_tasks.has(local_path));
 				HashMap<String, ResourceLoader::ThreadLoadTask>::Iterator E = thread_load_tasks.insert(local_path, load_task);
@@ -591,9 +599,7 @@ Ref<ResourceLoader::LoadToken> ResourceLoader::_load_start(const String &p_path,
 			}
 		}
 
-		run_on_current_thread = must_not_register || p_thread_mode == LOAD_THREAD_FROM_CURRENT;
-
-		if (run_on_current_thread) {
+		if (p_thread_mode == LOAD_THREAD_FROM_CURRENT) {
 			// The current thread may happen to be a thread from the pool.
 			WorkerThreadPool::TaskID tid = WorkerThreadPool::get_singleton()->get_caller_task_id();
 			if (tid != WorkerThreadPool::INVALID_TASK_ID) {
@@ -606,11 +612,8 @@ Ref<ResourceLoader::LoadToken> ResourceLoader::_load_start(const String &p_path,
 		}
 	} // MutexLock(thread_load_mutex).
 
-	if (run_on_current_thread) {
+	if (p_thread_mode == LOAD_THREAD_FROM_CURRENT) {
 		_run_load_task(load_task_ptr);
-		if (must_not_register) {
-			load_token->res_if_unregistered = load_task_ptr->resource;
-		}
 	}
 
 	return load_token;
@@ -738,7 +741,10 @@ Ref<Resource> ResourceLoader::_load_complete_inner(LoadToken &p_load_token, Erro
 		*r_error = OK;
 	}
 
-	if (!p_load_token.local_path.is_empty()) {
+	ThreadLoadTask *load_task_ptr = nullptr;
+	if (p_load_token.task_if_unregistered) {
+		load_task_ptr = p_load_token.task_if_unregistered;
+	} else {
 		if (!thread_load_tasks.has(p_load_token.local_path)) {
 			if (r_error) {
 				*r_error = ERR_BUG;
@@ -809,22 +815,47 @@ Ref<Resource> ResourceLoader::_load_complete_inner(LoadToken &p_load_token, Erro
 			load_task.error = FAILED;
 		}
 
-		Ref<Resource> resource = load_task.resource;
-		if (r_error) {
-			*r_error = load_task.error;
-		}
-		return resource;
-	} else {
-		// Special case of an unregistered task.
-		// The resource should have been loaded by now.
-		Ref<Resource> resource = p_load_token.res_if_unregistered;
-		if (!resource.is_valid()) {
-			if (r_error) {
-				*r_error = FAILED;
+		load_task_ptr = &load_task;
+	}
+
+	Ref<Resource> resource = load_task_ptr->resource;
+	if (r_error) {
+		*r_error = load_task_ptr->error;
+	}
+
+	if (resource.is_valid()) {
+		if (curr_load_task) {
+			// A task awaiting another => Let the awaiter accumulate the resource changed connections.
+			DEV_ASSERT(curr_load_task != load_task_ptr);
+			for (const ThreadLoadTask::ResourceChangedConnection &rcc : load_task_ptr->resource_changed_connections) {
+				curr_load_task->resource_changed_connections.push_back(rcc);
+			}
+		} else {
+			// A leaf task being awaited => Propagate the resource changed connections.
+			if (Thread::is_main_thread()) {
+				// On the main thread it's safe to migrate the connections to the standard signal mechanism.
+				for (const ThreadLoadTask::ResourceChangedConnection &rcc : load_task_ptr->resource_changed_connections) {
+					if (rcc.callable.is_valid()) {
+						rcc.source->connect_changed(rcc.callable, rcc.flags);
+					}
+				}
+			} else {
+				// On non-main threads, we have to queue and call it done when processed.
+				if (!load_task_ptr->resource_changed_connections.is_empty()) {
+					for (const ThreadLoadTask::ResourceChangedConnection &rcc : load_task_ptr->resource_changed_connections) {
+						if (rcc.callable.is_valid()) {
+							MessageQueue::get_main_singleton()->push_callable(callable_mp(rcc.source, &Resource::connect_changed).bind(rcc.callable, rcc.flags));
+						}
+					}
+					core_bind::Semaphore done;
+					MessageQueue::get_main_singleton()->push_callable(callable_mp(&done, &core_bind::Semaphore::post));
+					done.wait();
+				}
 			}
 		}
-		return resource;
 	}
+
+	return resource;
 }
 
 bool ResourceLoader::_ensure_load_progress() {
@@ -838,6 +869,50 @@ bool ResourceLoader::_ensure_load_progress() {
 	return true;
 }
 
+void ResourceLoader::resource_changed_connect(Resource *p_source, const Callable &p_callable, uint32_t p_flags) {
+	print_lt(vformat("%d\t%ud:%s\t" FUNCTION_STR "\t%d", Thread::get_caller_id(), p_source->get_instance_id(), p_source->get_class(), p_callable.get_object_id()));
+
+	MutexLock lock(thread_load_mutex);
+
+	for (const ThreadLoadTask::ResourceChangedConnection &rcc : curr_load_task->resource_changed_connections) {
+		if (unlikely(rcc.source == p_source && rcc.callable == p_callable)) {
+			return;
+		}
+	}
+
+	ThreadLoadTask::ResourceChangedConnection rcc;
+	rcc.source = p_source;
+	rcc.callable = p_callable;
+	rcc.flags = p_flags;
+	curr_load_task->resource_changed_connections.push_back(rcc);
+}
+
+void ResourceLoader::resource_changed_disconnect(Resource *p_source, const Callable &p_callable) {
+	print_lt(vformat("%d\t%ud:%s\t" FUNCTION_STR "t%d", Thread::get_caller_id(), p_source->get_instance_id(), p_source->get_class(), p_callable.get_object_id()));
+
+	MutexLock lock(thread_load_mutex);
+
+	for (uint32_t i = 0; i < curr_load_task->resource_changed_connections.size(); ++i) {
+		const ThreadLoadTask::ResourceChangedConnection &rcc = curr_load_task->resource_changed_connections[i];
+		if (unlikely(rcc.source == p_source && rcc.callable == p_callable)) {
+			curr_load_task->resource_changed_connections.remove_at_unordered(i);
+			return;
+		}
+	}
+}
+
+void ResourceLoader::resource_changed_emit(Resource *p_source) {
+	print_lt(vformat("%d\t%ud:%s\t" FUNCTION_STR, Thread::get_caller_id(), p_source->get_instance_id(), p_source->get_class()));
+
+	MutexLock lock(thread_load_mutex);
+
+	for (const ThreadLoadTask::ResourceChangedConnection &rcc : curr_load_task->resource_changed_connections) {
+		if (unlikely(rcc.source == p_source)) {
+			rcc.callable.call();
+		}
+	}
+}
+
 Ref<Resource> ResourceLoader::ensure_resource_ref_override_for_outer_load(const String &p_path, const String &p_res_type) {
 	ERR_FAIL_COND_V(load_nesting == 0, Ref<Resource>()); // It makes no sense to use this from nesting level 0.
 	const String &local_path = _validate_local_path(p_path);
@@ -1368,6 +1443,7 @@ bool ResourceLoader::timestamp_on_load = false;
 thread_local int ResourceLoader::load_nesting = 0;
 thread_local Vector<String> ResourceLoader::load_paths_stack;
 thread_local HashMap<int, HashMap<String, Ref<Resource>>> ResourceLoader::res_ref_overrides;
+thread_local ResourceLoader::ThreadLoadTask *ResourceLoader::curr_load_task = nullptr;
 
 SafeBinaryMutex<ResourceLoader::BINARY_MUTEX_TAG> &_get_res_loader_mutex() {
 	return ResourceLoader::thread_load_mutex;
diff --git a/core/io/resource_loader.h b/core/io/resource_loader.h
index f75bf019fb54..caaf9f8f45dd 100644
--- a/core/io/resource_loader.h
+++ b/core/io/resource_loader.h
@@ -106,6 +106,8 @@ class ResourceLoader {
 		MAX_LOADERS = 64
 	};
 
+	struct ThreadLoadTask;
+
 public:
 	enum ThreadLoadStatus {
 		THREAD_LOAD_INVALID_RESOURCE,
@@ -124,7 +126,7 @@ class ResourceLoader {
 		String local_path;
 		String user_path;
 		uint32_t user_rc = 0; // Having user RC implies regular RC incremented in one, until the user RC reaches zero.
-		Ref<Resource> res_if_unregistered;
+		ThreadLoadTask *task_if_unregistered = nullptr;
 
 		void clear();
 
@@ -187,6 +189,13 @@ class ResourceLoader {
 		Ref<Resource> resource;
 		bool use_sub_threads = false;
 		HashSet<String> sub_tasks;
+
+		struct ResourceChangedConnection {
+			Resource *source = nullptr;
+			Callable callable;
+			uint32_t flags = 0;
+		};
+		LocalVector<ResourceChangedConnection> resource_changed_connections;
 	};
 
 	static void _run_load_task(void *p_userdata);
@@ -194,6 +203,7 @@ class ResourceLoader {
 	static thread_local int load_nesting;
 	static thread_local HashMap<int, HashMap<String, Ref<Resource>>> res_ref_overrides; // Outermost key is nesting level.
 	static thread_local Vector<String> load_paths_stack;
+	static thread_local ThreadLoadTask *curr_load_task;
 
 	static SafeBinaryMutex<BINARY_MUTEX_TAG> thread_load_mutex;
 	friend SafeBinaryMutex<BINARY_MUTEX_TAG> &_get_res_loader_mutex();
@@ -214,6 +224,10 @@ class ResourceLoader {
 
 	static bool is_within_load() { return load_nesting > 0; };
 
+	static void resource_changed_connect(Resource *p_source, const Callable &p_callable, uint32_t p_flags);
+	static void resource_changed_disconnect(Resource *p_source, const Callable &p_callable);
+	static void resource_changed_emit(Resource *p_source);
+
 	static Ref<Resource> load(const String &p_path, const String &p_type_hint = "", ResourceFormatLoader::CacheMode p_cache_mode = ResourceFormatLoader::CACHE_MODE_REUSE, Error *r_error = nullptr);
 	static bool exists(const String &p_path, const String &p_type_hint = "");