From 6f77db721956a6c632cc01af283c23929b43f0f2 Mon Sep 17 00:00:00 2001 From: Hu Yueh-Wei Date: Thu, 14 Nov 2024 14:49:13 +0800 Subject: [PATCH] chore: remove codes (#277) --- .../ten_runtime/protocol/asynced/internal.h | 165 ----- .../protocol/asynced/protocol_asynced.h | 298 --------- core/src/ten_runtime/protocol/BUILD.gn | 5 +- .../src/ten_runtime/protocol/asynced/BUILD.gn | 11 - .../ten_runtime/protocol/asynced/external.c | 156 ----- .../ten_runtime/protocol/asynced/internal.c | 94 --- .../protocol/asynced/protocol_asynced.c | 613 ------------------ 7 files changed, 1 insertion(+), 1341 deletions(-) delete mode 100644 core/include_internal/ten_runtime/protocol/asynced/internal.h delete mode 100644 core/include_internal/ten_runtime/protocol/asynced/protocol_asynced.h delete mode 100644 core/src/ten_runtime/protocol/asynced/BUILD.gn delete mode 100644 core/src/ten_runtime/protocol/asynced/external.c delete mode 100644 core/src/ten_runtime/protocol/asynced/internal.c delete mode 100644 core/src/ten_runtime/protocol/asynced/protocol_asynced.c diff --git a/core/include_internal/ten_runtime/protocol/asynced/internal.h b/core/include_internal/ten_runtime/protocol/asynced/internal.h deleted file mode 100644 index a83f1bb636..0000000000 --- a/core/include_internal/ten_runtime/protocol/asynced/internal.h +++ /dev/null @@ -1,165 +0,0 @@ -// -// Copyright © 2024 Agora -// This file is part of TEN Framework, an open source project. -// Licensed under the Apache License, Version 2.0, with certain conditions. -// Refer to the "LICENSE" file in the root directory for more information. -// -#pragma once - -// The ten_protocol_asynced_t acts as a bridge between the implementation -// protocol (we consider it as the 'external') and the ten world (we consider it -// as the 'internal'). Some fields and functions of ten_protocol_asynced_t can -// only be accessed from the external, and others are internal. So we split the -// functions of ten_protocol_asynced_t into two parts. The functions in -// 'internal.h' can only be accessed from the ten world. In other words, all the -// functions _must_ be declared with 'TEN_RUNTIME_PRIVATE_API'. - -#include "ten_runtime/ten_config.h" - -#include - -typedef struct ten_protocol_asynced_t ten_protocol_asynced_t; - -typedef void (*ten_protocol_asynced_task_handler_func_t)( - ten_protocol_asynced_t *self, void *arg); - -typedef struct ten_protocol_asynced_task_t { - ten_protocol_asynced_task_handler_func_t handler; - void *arg; -} ten_protocol_asynced_task_t; - -// The implementation protocol is always being closed from the ten world due to -// the closure of its owner (i.e., ten_connection_t or ten_app_t). And in the -// meantime, the resources in the implementation (ex: physical connection) maybe -// closed from the implementation thread. -// -// The brief closing flow of the 'asynced' protocol is as follows: -// -// 1. If the protocol is being closed from the ten world, ex: the ten_app_t is -// closed. -// -// Note that ten_libws_worker_t will implement the 'ten_closeable_t' -// interface too. -// -// ten_protocol_t ten_protocol_asynced_t ten_libws_worker_t -// (ten_closeable_t) (ten_closeable_t) -// ^ | ^ close_async() -// | | | | -// +--- underlying_resource ---+ +-- action_to_close() --+ -// -// // trigger by ten_connection_close() or ten_app_close() -// ten_protocol_close() { -// ten_closeable_close() -// } -// -// ten_closeable_close() { -// close_owned() ---------------+ -// } | -// V -// ten_closeable_close() { -// action_to_close() -------+ -// } | -// V -// // in ten world -// action_to_close() { -// close_async() -// } -// -// // in the external -// // thread -// close() { -// close_conn() -// } -// -// on_conn_close() { -// on_closed() -// } -// -// on_closed() { -// +------------------ closed_done_async() -// | } -// V -// // in the ten world -// action_to_close_done() { -// +--------------- on_closed() -// | } -// V -// ten_closeable_on_closed() { -// on_closed() -// // continue to call ten_connection_t::on_closed()... -// } -// -// 2. If the resources in the implementation are closed first, ex: the physical -// connection is broken. -// -// ten_protocol_t ten_protocol_asynced_t ten_libws_worker_t -// (ten_closeable_t) (ten_closeable_t) -// ^ | ^ close_async() -// | | | | -// +--- underlying_resource ----+ +-- intend_to_close() --+ -// -// // in the external -// // thread -// on_conn_close() { -// +------------------ intend_to_close_async() -// | } -// V -// // in the ten world -// intend_to_close() { -// | -// | -// is_closing_root() -------------------------+ -// } | -// | -// is_closing_root() { <-------------------------------------------------+ -// if (is_root) { -// ten_closeable_close() -// } else { -// announce_intend_to_close() -// } -// } -// -// ten_closeable_close() { -// // Start to close, same as the case 1. -// } - -TEN_RUNTIME_PRIVATE_API void ten_protocol_asynced_init_closeable( - ten_protocol_asynced_t *self); - -TEN_RUNTIME_PRIVATE_API void ten_protocol_asynced_intends_to_close_task( - void *self_, void *arg); - -TEN_RUNTIME_PRIVATE_API void ten_protocol_asynced_on_impl_closed_task(void *self_, - void *arg); - -/** - * @brief If the protocol attaches to a connection (i.e., the 'ten_connection_t' - * object), it's not always safe to retrieve the runloop of the base protocol - * (i.e., the 'ten_protocol_t' object) as the connection might be in migration. - * - * @return true if the 'migration_state' is INIT or DONE, otherwise false. - */ -TEN_RUNTIME_PRIVATE_API bool -ten_protocol_asynced_safe_to_retrieve_runtime_runloop( - ten_protocol_asynced_t *self); - -/** - * @brief Submit a task to the runloop of the base protocol (i.e., the - * 'ten_protocol_t') if the connection migration is not started or has been - * completed. Otherwise, cache the task in the - * 'ten_protocol_asynced_t::pending_task_queue'. - * - * @param handler_if_in_migration The handler to process the task once the - * migration is completed. This function will be called from the external - * protocol thread. - * - * @param runloop_task_handler The handler to process the task in the ten - * app/engine thread. - */ -TEN_RUNTIME_PRIVATE_API void ten_protocol_asynced_post_task_to_ten( - ten_protocol_asynced_t *self, - ten_protocol_asynced_task_handler_func_t handler_if_in_migration, - void (*runloop_task_handler)(void *, void *), void *arg); - -TEN_RUNTIME_PRIVATE_API void ten_protocol_asynced_close_impl( - ten_protocol_asynced_t *self, void *arg); diff --git a/core/include_internal/ten_runtime/protocol/asynced/protocol_asynced.h b/core/include_internal/ten_runtime/protocol/asynced/protocol_asynced.h deleted file mode 100644 index 96cbc32a6c..0000000000 --- a/core/include_internal/ten_runtime/protocol/asynced/protocol_asynced.h +++ /dev/null @@ -1,298 +0,0 @@ -// -// Copyright © 2024 Agora -// This file is part of TEN Framework, an open source project. -// Licensed under the Apache License, Version 2.0, with certain conditions. -// Refer to the "LICENSE" file in the root directory for more information. -// -#pragma once - -#include "ten_runtime/ten_config.h" - -#include "include_internal/ten_runtime/common/closeable.h" -#include "include_internal/ten_runtime/connection/connection.h" -#include "include_internal/ten_runtime/protocol/protocol.h" - -typedef struct ten_protocol_asynced_t ten_protocol_asynced_t; -typedef struct ten_protocol_asynced_creation_info_t - ten_protocol_asynced_creation_info_t; - -/** - * @brief Used to trigger the implementation protocol to do something in its - * thread from the TEN world. - * - * @param cb The callback function to be called in the implementation protocol - * thread later. - * - * @param arg The extra argument to be passed to the callback function. - * - * @note The implementation protocol should call the callback function in its - * own thread. Note that increasing the reference count of 'self' and 'arg' - * before calling this function to ensure those objects are valid when the - * callback function is called. - */ -typedef void (*ten_protocol_asynced_post_task_to_impl_func_t)( - ten_protocol_asynced_t *self, - void (*cb)(ten_protocol_asynced_t *, void *arg), void *arg); - -typedef void (*ten_protocol_asynced_on_created_func_t)( - ten_protocol_asynced_t *protocol, - ten_protocol_asynced_creation_info_t *info); - -typedef struct ten_protocol_asynced_creation_info_t { - ten_protocol_asynced_on_created_func_t on_created; - void *user_data; -} ten_protocol_asynced_creation_info_t; - -/** - * @brief As the implementation protocol might have its own runloop, which means - * that the implementation protocol and ten base protocol (i.e., ten_protocol_t) - * belongs to different threads. The messages between the implementation - * protocol and base protocol can not be exchanged directly (i.e., a function - * should always be called in the same thread). So we provide the - * 'ten_protocol_asynced_t', which is a bridge between the implementation and - * base protocols, and the 'ten_protocol_asynced_t' will care about the thread - * safety of the message exchanging between those two sides. A simple - * demonstration is as follows. - * - * 1) Messages from the implementation protocol to the base protocol: - * - * | implementation | -- sync call --> | ten_protocol_asynced_t | ---+ - * | - * | base protocol | <-- sync call -- [runloop of base protocol] <-----+ - * - * 2) Messages from the base protocol to the implementation protocol: - * - * | base protocol | -- sync call --> | ten_protocol_asynced_t | ----+ - * | - * | implementation | <-- sync call -- [runloop of implementation] <---+ - * - * Basically, the ten_protocol_asynced_t will hold the runloop of both the - * implementation protocol and the base protocol. - * - * - The runloop of the base protocol will be retrieved by - * 'ten_protocol_get_attached_runloop(ten_protocol_asynced_t::base)'. - * - * - The 'ten_protocol_asynced_t::trigger_impl' is a standard interface that the - * implementation must implement, which will be used to submit tasks to the - * runloop of the implementation protocol. - * - * The apis called from the implementation protocol on the - * ten_protocol_asynced_t, or the callbacks registered from the implementation - * protocol in the ten_protocol_asynced_t, are always called synchronously in - * the implementation thread. In the meanwhile, the apis in the base protocol - * are always called synchronously in the ten app or engine thread. - * - * @note As the ten_protocol_asynced_t is a bridge between two different - * threads, some fields will be accessed from the implementation thread, and - * others will be accessed from the ten world. Details are as follows. - * - * - The 'base', 'closeable', 'trigger_impl' are accessed fields from the ten - * world. - * - * - Others are accessed in the implementation thread. - * - * Please keep in mind that, _NO_ fields are read or written in both sides. - */ -typedef struct ten_protocol_asynced_t { - ten_protocol_t base; - - /** - * The ten_protocol_asynced_t is an underlying resource of the base protocol. - * - * @note All protocol instances are created and initted in the ten world, - * including this 'ten_protocol_asynced_t'. So this 'closeable' belongs to the - * ten world, and the underlying resource (ex: this closeable) and its owner - * (ex: the base protocol) must be in the same thread. - */ - ten_closeable_t closeable; - - /** - * @brief The closeable reference of the implementation belongs to the - * external thread. As the 'ten_protocol_asynced_t::closeable' and - * 'impl_closeable' belongs to different threads, the 'impl_closeable' could - * not be an underlying resource of 'ten_protocol_asynced_t::closeable' by - * calling 'ten_closeable_add_underlying_resource()'. The correct way to close - * the implementation protocol is that register a 'intend_to_close' hook in - * 'ten_protocol_asynced_t::closeable', and switch to the implementation - * protocol thread to close 'impl_closeable' in the 'intend_to_close' hook. - * - * @note Do _NOT_ access this field in the ten world. - */ - ten_closeable_t *impl_closeable; - - /** - * According to the comments of 'ten_connection_t::migration_state', there - * might be race conditions if the asynced protocol reads/writes the - * 'ten_connection_t::migration_state' in the implementation protocol thread. - * - * First, the 'ten_connection_t::migration_state' is assigned to 'INIT' in the - * app thread when the connection (i.e., the 'ten_connection_t' object) is - * created. - * - * Then the implementation protocol will retrieve the value of - * 'ten_connection_t::migration_state' in its thread when handling the first - * message, the value will be 'INIT'. The implementation protocol will - * retrieve the correct value because it access the memory of - * 'ten_connection_t' after the libws runloop task - * (ten_libws_worker_on_protocol_created_task). In other words, the reading of - * 'ten_connection_t::migration_state' is after the writing of - * 'ten_connection_t::migration_state'. Refer to - * 'ten_libws_server_on_protocol_created_async()'. - * - * But when the migration is completed in the engine thread, there might be - * race condition when the implementation protocol reads the value. Ex: - * - * | Engine thread | Implementation protocol thread | - * |-------------------------|--------------------------------| - * t1 | | read state | - * |-------------------------|--------------------------------| - * t2 | write state | | - * |-------------------------|--------------------------------| - * t3 | acquire in_lock | | - * | pop into in_msgs queue | | - * |-------------------------|--------------------------------| - * t4 | | acquire in_lock | - * | | push from in_msgs queue | - * |-------------------------|--------------------------------| - * - * As the 'write state' and 'read state' operation is not protected by - * 'in_lock', the above 'write state' and 'acquire in_lock' in the engine - * thread operations are not atomic, neither are the 'read state' and 'acquire - * in_lock' in the implementation protocol thread. So the engine thread might - * acquire the 'in_lock' before the implementation protocol thread (i.e., t3 < - * t4). In this case, the engine thread could not retrieve the pending - * messages from the 'in_msgs' list, as the implementation protocol thread has - * not put the messages into the list yet. And it's too heavy to use the - * 'in_lock' to protect the 'migration_state'. - * - * So, keep a mirror of 'ten_connection_t::migration_state' in the asynced - * protocol, and obey the following rules. - * - * - The 'ten_connection_t::migration_state' will only be accessed in the TEN - * world. - * - * - The 'ten_protocol_asynced_t::migration_state' will only be accessed in - * the implementation protocol thread. - * - * - When the migration is completed in the engine thread, or reset in the app - * thread (ex, no engine was found), the 'ten_connection_t::migration_state' - * will be updated to 'DONE' or 'INIT'. And then - * 'ten_protocol_asynced_t::migration_state' will be synced through the - * runloop task (i.e., the 'ten_protocol_t::on_cleaned_for_external()' - * callback) to ensure that the implementation protocol could retrieve the - * correct runloop of the connection. - * - * And there is no need to use any mutex lock to protect the - * 'migration_state' in both sides. - * - * Refer 'ten_protocol_asynced_on_input_async()' to know about how the - * 'migrate_state' will be changed. - */ - TEN_CONNECTION_MIGRATION_STATE migration_state; - - /** - * @brief According to the design principle, the implementation protocol only - * care about its resources such as the physical connections. The life cycle - * (ex: closing and destroying) of the protocol objects (including the - * 'ten_protocol_t' and the corresponding implementation protocol) could only - * be managed by the TEN runtime. In other words, if the physical connection - * is broken, the implementation protocol should not close itself, instead, it - * should send an event to the TEN runtime, and the protocol (i.e., the - * 'ten_protocol_t' object) will be closed from the ten world if needed. So - * there might be some messages/events from the implementation protocol that - * might not be able to be transmitted to their desired destination due to - * the following reasons: - * - * - The physical connection is broken in the implementation protocol, but at - * the same time, the corresponding 'ten_connection_t' object is in the - * migration. It's not safe to retrieve the runloop of the - * 'ten_connection_t'. - * - * - The implementation protocol receives messages from the client side, but - * at the same time, the corresponding 'ten_connection_t' object is in the - * migration. - * - * The 'pending_task_queue' is used to cache those messages/events. - * - * @note This queue _must_ be read/written in the external protocol thread, - * this queue will _not_ be protected by any mutex lock. - */ - ten_list_t pending_task_queue; // ten_protocol_asynced_task_t - - ten_protocol_asynced_post_task_to_impl_func_t post_task_to_impl; -} ten_protocol_asynced_t; - -TEN_RUNTIME_API ten_protocol_asynced_creation_info_t * -ten_protocol_asynced_creation_info_create( - ten_protocol_asynced_on_created_func_t on_created, void *user_data); - -TEN_RUNTIME_API void ten_protocol_asynced_creation_info_destroy( - ten_protocol_asynced_creation_info_t *self); - -TEN_RUNTIME_API void ten_protocol_asynced_init( - ten_protocol_asynced_t *self, const char *name, - ten_addon_host_t *addon_host, ten_protocol_on_output_func_t on_output, - ten_protocol_listen_func_t on_listen, - ten_protocol_connect_to_func_t on_connect_to, - ten_protocol_asynced_post_task_to_impl_func_t post_task_to_impl); - -TEN_RUNTIME_API void ten_protocol_asynced_deinit(ten_protocol_asynced_t *self); - -/** - * @brief Call this function when the protocol receives a TEN message, and want - * to send that message into the TEN world. The protocol can be a server - * (listening protocol) or a client (communication protocol). The message may be - * a request received by a server, or the result received by a client. - */ -TEN_RUNTIME_API bool ten_protocol_asynced_on_input_async( - ten_protocol_asynced_t *self, ten_shared_ptr_t *msg); - -/** - * @brief The protocol acts as a client, call this function after connecting to - * server or disconnecting from server. - * @param is_connected true if the connection has been established, otherwise - * false. - */ -TEN_RUNTIME_API void ten_protocol_asynced_on_connected_async( - ten_protocol_asynced_t *self, bool is_connected); - -/** - * @brief Create a new protocol when a client request is accepted. - * - * @return Whether the 'protocol creating' task is submitted to the TEN app - * runloop. - */ -TEN_RUNTIME_API bool ten_protocol_asynced_on_client_accepted_async( - ten_protocol_asynced_t *listening_protocol, - ten_protocol_asynced_creation_info_t *info); - -/** - * @brief Get the protocol name from its manifest. - */ -TEN_RUNTIME_API const char *ten_protocol_asynced_get_name( - ten_protocol_asynced_t *self); - -TEN_RUNTIME_API void ten_protocol_asynced_set_impl_closeable( - ten_protocol_asynced_t *self, ten_closeable_t *impl); - -/** - * @brief The implementation protocol has been closed from its thread, then - * switch to the ten world to continue to close the base protocol. - */ -TEN_RUNTIME_API void ten_protocol_asynced_on_impl_closed_async( - ten_protocol_asynced_t *self); - -/** - * @brief The closeable of the implementation protocol (i.e., @a impl) could not - * be the directly underlying resource of the ten_protocol_asynced_t::closeable, - * as they belong to different threads. So we need to set the necessary hooks in - * those two closeable objects to ensure that the behaviors will be correct. Ex: - * the 'intend_to_close', 'is_closing_root' behaviors. In some cases, the - * relevant ten_protocol_asynced_t of the implementation protocol is not created - * in time (ex: the libws worker), the implementation protocol could not call - * the above 'ten_protocol_asynced_set_impl_closeable()' once it is created. So - * we provide this function for the implementation protocol to set the default - * behaviors once it is created. - */ -TEN_RUNTIME_PRIVATE_API void -ten_protocol_asynced_set_default_closeable_behavior(ten_closeable_t *impl); diff --git a/core/src/ten_runtime/protocol/BUILD.gn b/core/src/ten_runtime/protocol/BUILD.gn index e9b4472dec..c8010c0d07 100644 --- a/core/src/ten_runtime/protocol/BUILD.gn +++ b/core/src/ten_runtime/protocol/BUILD.gn @@ -8,8 +8,5 @@ import("//build/ten_runtime/glob.gni") glob("protocol") { file_list = all_native_files - deps = [ - "asynced", - "integrated", - ] + deps = [ "integrated" ] } diff --git a/core/src/ten_runtime/protocol/asynced/BUILD.gn b/core/src/ten_runtime/protocol/asynced/BUILD.gn deleted file mode 100644 index a06ee37b57..0000000000 --- a/core/src/ten_runtime/protocol/asynced/BUILD.gn +++ /dev/null @@ -1,11 +0,0 @@ -# -# Copyright © 2024 Agora -# This file is part of TEN Framework, an open source project. -# Licensed under the Apache License, Version 2.0, with certain conditions. -# Refer to the "LICENSE" file in the root directory for more information. -# -import("//build/ten_runtime/glob.gni") - -glob("asynced") { - file_list = all_native_files -} diff --git a/core/src/ten_runtime/protocol/asynced/external.c b/core/src/ten_runtime/protocol/asynced/external.c deleted file mode 100644 index 688785bf6f..0000000000 --- a/core/src/ten_runtime/protocol/asynced/external.c +++ /dev/null @@ -1,156 +0,0 @@ -// -// Copyright © 2024 Agora -// This file is part of TEN Framework, an open source project. -// Licensed under the Apache License, Version 2.0, with certain conditions. -// Refer to the "LICENSE" file in the root directory for more information. -// -#include "include_internal/ten_runtime/common/closeable.h" -#include "include_internal/ten_runtime/protocol/asynced/internal.h" -#include "include_internal/ten_runtime/protocol/asynced/protocol_asynced.h" -#include "include_internal/ten_runtime/protocol/protocol.h" -#include "ten_utils/macro/check.h" -#include "ten_utils/lib/ref.h" -#include "ten_utils/macro/mark.h" - -// @{ -// intend_to_close. - -// As 'ten_protocol_asynced_impl_on_intend_to_close()' and -// 'ten_protocol_asynced_handle_intends_to_close()' will call each other, we -// declare it here. -static void ten_protocol_asynced_impl_on_intend_to_close( - ten_closeable_t *impl, void *self_, void *on_intend_to_close_data); - -static void ten_protocol_asynced_handle_intends_to_close( - ten_protocol_asynced_t *self, TEN_UNUSED void *arg) { - TEN_ASSERT(self, "Invalid argument."); - TEN_ASSERT(self->impl_closeable && - ten_closeable_check_integrity(self->impl_closeable, true), - "Access across threads."); - - ten_protocol_asynced_impl_on_intend_to_close(self->impl_closeable, self, - NULL); -} - -// When the implementation protocol wants to close, it needs to notify the -// protocol in the TEN world, and this notification action would across threads. -static void ten_protocol_asynced_impl_on_intend_to_close( - ten_closeable_t *impl, void *self_, - TEN_UNUSED void *on_intend_to_close_data) { - TEN_ASSERT( - impl && ten_closeable_check_integrity(impl, true), - "This function is always called in the implementation protocol thread."); - - ten_protocol_asynced_t *self = (ten_protocol_asynced_t *)self_; - TEN_ASSERT(self, "Invalid argument."); - - ten_protocol_asynced_post_task_to_ten( - self, ten_protocol_asynced_handle_intends_to_close, - ten_protocol_asynced_intends_to_close_task, NULL); -} -// @} - -static void ten_protocol_asynced_impl_on_closed( - ten_closeable_t *impl, void *self_, void *on_closed_data, - ten_closeable_on_closed_done_func_t on_closed_done) { - TEN_ASSERT( - impl && ten_closeable_check_integrity(impl, true), - "This function is always called in the implementation protocol thread."); - - // The ten_protocol_asynced_t::closeable is not the directly owner of - // ten_protocol_asynced_t::impl_closeable, as they are in different threads. - // Since the async protocol only has the external implementation protocol as - // its resource, there is not much else to do within the on_closed() callback. - // It simply needs to invoke the on_closed_done() callback to notify @a impl - // that all the tasks it needs to do for the 'closed' event have been - // completed. - if (on_closed_done) { - on_closed_done(impl, self_, on_closed_data); - } -} - -static void ten_protocol_asynced_impl_on_closed_all_done( - TEN_UNUSED ten_closeable_t *impl, void *self_, - TEN_UNUSED void *on_closed_all_done_data) { - ten_protocol_asynced_t *self = (ten_protocol_asynced_t *)self_; - TEN_ASSERT(self, "Invalid argument."); - - // The implementation protocol has been closed, so the connection could not be - // in migration. - ten_protocol_asynced_post_task_to_ten( - self, NULL, ten_protocol_asynced_on_impl_closed_task, NULL); -} - -void ten_protocol_asynced_set_impl_closeable(ten_protocol_asynced_t *self, - ten_closeable_t *impl) { - TEN_ASSERT(self, "Invalid argument."); - TEN_ASSERT( - impl && ten_closeable_check_integrity(impl, true), - "This function is always called in the implementation protocol thread."); - - self->impl_closeable = impl; - - // The closeable of the implementation (i.e., 'self->impl_closeable') belongs - // to the implementation protocol thread, so we have to customize the - // 'intend_to_close' and 'on_closed' hook to do the thread context switch. - ten_closeable_add_be_notified( - self->impl_closeable, self, ten_protocol_asynced_impl_on_intend_to_close, - NULL, ten_protocol_asynced_impl_on_closed, NULL, - ten_protocol_asynced_impl_on_closed_all_done, NULL); -} - -static bool ten_protocol_asynced_is_closing_root_myself( - TEN_UNUSED ten_closeable_t *self, TEN_UNUSED ten_closeable_t *underlying, - TEN_UNUSED void *on_closing_root_not_found_data) { - // The closeable of the implementation protocol will be the root in its own - // world, as it could _not_ be the directly underlying resource of the - // ten_protocol_asynced_t::closeable. In other words, the - // 'belong_to_resources' of the closeable of the implementation protocol is - // EMPTY. However, the closeable of the implementation protocol could not be - // the closing root, as the resources in the implementation world are a - // subtree of the base protocol (i.e., ten_protocol_t) in the ten world. The - // duty of ten_protocol_asynced_t::closeable is to connect the resources in - // the two worlds. - return false; -} - -void ten_protocol_asynced_set_default_closeable_behavior( - ten_closeable_t *impl) { - TEN_ASSERT(impl && ten_closeable_check_integrity(impl, true), - "Access across threads."); - - ten_closeable_set_is_closing_root_myself( - impl, ten_protocol_asynced_is_closing_root_myself, NULL); -} - -void ten_protocol_asynced_on_impl_closed_async(ten_protocol_asynced_t *self) { - TEN_ASSERT(self, "Should not happen."); - - ten_protocol_t *base_protocol = &self->base; - TEN_ASSERT(base_protocol, "Should not happen."); - TEN_ASSERT( - // TEN_NOLINTNEXTLINE(thread-check) - // By design, it can be called from any threads when the - // implementation is closed. When the implementation is - // closing, it may have to switch to its own thread to do some - // cleanup. When the implementation closure is done, it can - // call this function directly from its thread without needing - // to switch to the thread of the base protocol. - ten_protocol_check_integrity(base_protocol, false), "Should not happen."); - - ten_protocol_asynced_post_task_to_ten( - self, NULL, ten_protocol_asynced_on_impl_closed_task, NULL); -} - -void ten_protocol_asynced_close_impl(ten_protocol_asynced_t *self, - TEN_UNUSED void *arg) { - TEN_ASSERT(self, "Invalid argument."); - TEN_ASSERT( - self->impl_closeable && - ten_closeable_check_integrity(self->impl_closeable, true), - "This function is always called in the implementation protocol thread."); - - ten_closeable_close(self->impl_closeable); - - ten_ref_dec_ref(&self->base.ref); -} diff --git a/core/src/ten_runtime/protocol/asynced/internal.c b/core/src/ten_runtime/protocol/asynced/internal.c deleted file mode 100644 index 1fb6f37c76..0000000000 --- a/core/src/ten_runtime/protocol/asynced/internal.c +++ /dev/null @@ -1,94 +0,0 @@ -// -// Copyright © 2024 Agora -// This file is part of TEN Framework, an open source project. -// Licensed under the Apache License, Version 2.0, with certain conditions. -// Refer to the "LICENSE" file in the root directory for more information. -// -#include "include_internal/ten_runtime/protocol/asynced/internal.h" - -#include "include_internal/ten_runtime/common/closeable.h" -#include "include_internal/ten_runtime/protocol/asynced/protocol_asynced.h" -#include "include_internal/ten_runtime/protocol/close.h" -#include "include_internal/ten_runtime/protocol/protocol.h" -#include "ten_utils/macro/check.h" -#include "ten_utils/lib/ref.h" -#include "ten_utils/macro/field.h" -#include "ten_utils/macro/mark.h" - -void ten_protocol_asynced_on_impl_closed_task(void *self_, - TEN_UNUSED void *arg) { - ten_protocol_asynced_t *self = (ten_protocol_asynced_t *)self_; - TEN_ASSERT(self, "Invalid argument."); - TEN_ASSERT(ten_closeable_check_integrity(&self->closeable, true), - "This function must be called in the ten world."); - - ten_protocol_t *base_protocol = &self->base; - TEN_ASSERT(ten_protocol_check_integrity(base_protocol, true), - "Invalid argument."); - - ten_closeable_action_to_close_myself_done(&self->closeable, NULL); - - ten_ref_dec_ref(&base_protocol->ref); -} - -static void ten_protocol_asynced_action_to_close_myself( - ten_closeable_t *closeable, TEN_UNUSED void *action_to_close_myself_data, - TEN_UNUSED ten_closeable_action_to_close_myself_done_func_t - action_to_close_myself_done) { - TEN_ASSERT(closeable && ten_closeable_check_integrity(closeable, true), - "Access across threads."); - - ten_protocol_asynced_t *self = - CONTAINER_OF_FROM_OFFSET(closeable, closeable->offset_in_impl); - TEN_ASSERT(self, "Invalid argument."); - - ten_protocol_t *base_protocol = &self->base; - TEN_ASSERT(ten_protocol_check_integrity(base_protocol, true), - "Access across threads."); - - ten_ref_inc_ref(&base_protocol->ref); - - // Note that we can not read 'ten_protocol_asynced_t::impl_closeable' here - // as it should be accessed in the implementation protocol thread. - self->post_task_to_impl(self, ten_protocol_asynced_close_impl, NULL); -} - -void ten_protocol_asynced_init_closeable(ten_protocol_asynced_t *self) { - TEN_ASSERT(self, "Invalid argument."); - - ten_closeable_init(&self->closeable, - offsetof(ten_protocol_asynced_t, closeable)); - - // The closure of the ten_protocol_asynced_t is triggered by its owner (i.e., - // ten_protocol_t) from the ten world, but the implementation protocol runs in - // another thread. So what the ten_protocol_asynced_t does in closing itself - // is to do the thread context switch and to close the implementation in the - // external thread. - ten_closeable_set_action_to_close_myself( - &self->closeable, ten_protocol_asynced_action_to_close_myself, NULL); - - ten_closeable_add_underlying_resource( - &self->base.closeable, &self->closeable, NULL, NULL, - (ten_closeable_on_intend_to_close_func_t) - ten_protocol_on_impl_intends_to_close, - NULL, ten_protocol_on_impl_closed_all_done, NULL); - - // This field _MUST_ be assigned in the implementation protocol thread. - self->impl_closeable = NULL; -} - -void ten_protocol_asynced_intends_to_close_task(void *self_, - TEN_UNUSED void *arg) { - ten_protocol_asynced_t *self = (ten_protocol_asynced_t *)self_; - TEN_ASSERT(self, "Invalid argument."); - TEN_ASSERT(ten_closeable_check_integrity(&self->closeable, true), - "This function is always called in the ten world."); - - ten_protocol_t *base_protocol = &self->base; - TEN_ASSERT(ten_protocol_check_integrity(base_protocol, true), - "Access across threads."); - - ten_closeable_intend_to_close(&self->closeable, NULL); - - ten_ref_dec_ref(&base_protocol->ref); -} diff --git a/core/src/ten_runtime/protocol/asynced/protocol_asynced.c b/core/src/ten_runtime/protocol/asynced/protocol_asynced.c deleted file mode 100644 index 966cfe5979..0000000000 --- a/core/src/ten_runtime/protocol/asynced/protocol_asynced.c +++ /dev/null @@ -1,613 +0,0 @@ -// -// Copyright © 2024 Agora -// This file is part of TEN Framework, an open source project. -// Licensed under the Apache License, Version 2.0, with certain conditions. -// Refer to the "LICENSE" file in the root directory for more information. -// -#include "include_internal/ten_runtime/protocol/asynced/protocol_asynced.h" - -#include "include_internal/ten_runtime/addon/addon.h" -#include "include_internal/ten_runtime/addon/protocol/protocol.h" -#include "include_internal/ten_runtime/app/app.h" -#include "include_internal/ten_runtime/app/migration.h" -#include "include_internal/ten_runtime/app/msg_interface/common.h" -#include "include_internal/ten_runtime/common/closeable.h" -#include "include_internal/ten_runtime/connection/connection.h" -#include "include_internal/ten_runtime/connection/migration.h" -#include "include_internal/ten_runtime/engine/engine.h" -#include "include_internal/ten_runtime/engine/internal/migration.h" -#include "include_internal/ten_runtime/msg/msg.h" -#include "include_internal/ten_runtime/protocol/asynced/internal.h" -#include "include_internal/ten_runtime/protocol/close.h" -#include "include_internal/ten_runtime/protocol/protocol.h" -#include "include_internal/ten_runtime/remote/remote.h" -#include "include_internal/ten_runtime/ten_env/ten_env.h" -#include "ten_runtime/app/app.h" -#include "ten_runtime/protocol/close.h" -#include "ten_utils/container/list.h" -#include "ten_utils/container/list_node_ptr.h" -#include "ten_utils/container/list_ptr.h" -#include "ten_utils/io/runloop.h" -#include "ten_utils/lib/alloc.h" -#include "ten_utils/lib/ref.h" -#include "ten_utils/lib/smart_ptr.h" -#include "ten_utils/lib/string.h" -#include "ten_utils/macro/check.h" -#include "ten_utils/macro/mark.h" -#include "ten_utils/macro/memory.h" -#include "ten_utils/sanitizer/thread_check.h" - -static void ten_protocol_asynced_migrate(ten_protocol_asynced_t *self, - ten_engine_t *engine, - ten_connection_t *connection, - TEN_UNUSED ten_shared_ptr_t *cmd) { - TEN_ASSERT(self && ten_protocol_check_integrity(&self->base, true), - "Should not happen."); - TEN_ASSERT(engine->app && ten_app_check_integrity(engine->app, true), - "The function is called in the app thread, and will migrate the " - "protocol to the protocol thread."); - - // TEN_NOLINTNEXTLINE(thread-check) - // thread-check: This function will be called when starting to migrate the - // connection from the TEN app to the TEN engine. So the - // 'ten_engine_on_connection_cleaned_async()' function uses the async tasks to - // ensure the thread safety. - TEN_ASSERT(engine && ten_engine_check_integrity(engine, false), - "Should not happen."); - TEN_ASSERT(connection && ten_connection_check_integrity(connection, true), - "Should not happen."); - - // We are in the app thread now, so we can call 'ten_app_clean_connection()' - // here directly. - ten_app_clean_connection(engine->app, connection); - - // We need to switch to the engine thread to do some operations which need to - // be happened in the engine thread. - ten_engine_on_connection_cleaned_async(engine, connection, cmd); -} - -static void ten_protocol_asynced_on_input(void *self_, void *arg) { - ten_protocol_asynced_t *self = (ten_protocol_asynced_t *)self_; - TEN_ASSERT(self && ten_protocol_check_integrity(&self->base, true), - "Should not happen."); - - ten_shared_ptr_t *msg = (ten_shared_ptr_t *)arg; - TEN_ASSERT(msg && ten_msg_check_integrity(msg), "Invalid argument."); - - if (!ten_protocol_is_closing(&self->base)) { - ten_protocol_on_input(&self->base, msg); - } - - ten_shared_ptr_destroy(msg); - - // The task is completed, so delete a reference to the 'protocol' to reflect - // this fact. - ten_ref_dec_ref(&self->base.ref); -} - -ten_protocol_asynced_creation_info_t *ten_protocol_asynced_creation_info_create( - ten_protocol_asynced_on_created_func_t on_created, void *user_data) { - TEN_ASSERT(on_created, "Invalid argument."); - - ten_protocol_asynced_creation_info_t *self = - (ten_protocol_asynced_creation_info_t *)TEN_MALLOC( - sizeof(ten_protocol_asynced_creation_info_t)); - TEN_ASSERT(self, "Failed to allocate memory."); - - self->on_created = on_created; - self->user_data = user_data; - - return self; -} - -void ten_protocol_asynced_creation_info_destroy( - ten_protocol_asynced_creation_info_t *self) { - TEN_ASSERT(self, "Invalid argument."); - - TEN_FREE(self); -} - -static void ten_protocol_asynced_on_base_protocol_cleaned_task( - ten_protocol_asynced_t *self, void *arg) { - TEN_ASSERT(self, "Invalid argument."); - - ten_protocol_t *base_protocol = &self->base; - // TEN_NOLINTNEXTLINE(thread-check) - // thread-check: This function must be called in the implementation protocol - // thread after the migration is completed, as the - // 'ten_protocol_asynced_t::migration_state' field must be accessed from the - // implementation protocol thread. So do not check thread integrity of the - // base protocol. - TEN_ASSERT(ten_protocol_check_integrity(base_protocol, false), - "Invalid argument."); - - bool is_migration_state_reset = arg; - - if (is_migration_state_reset) { - TEN_LOGD("The connection migration is reset."); - self->migration_state = TEN_CONNECTION_MIGRATION_STATE_INIT; - } else { - TEN_LOGD("The connection migration is completed."); - self->migration_state = TEN_CONNECTION_MIGRATION_STATE_DONE; - } - - ten_list_foreach (&self->pending_task_queue, iter) { - ten_protocol_asynced_task_t *task = ten_ptr_listnode_get(iter.node); - TEN_ASSERT(task && task->handler, "Should not happen."); - - task->handler(self, task->arg); - } - - ten_list_clear(&self->pending_task_queue); - - ten_ref_dec_ref(&base_protocol->ref); -} - -static void ten_protocol_asynced_on_base_protocol_cleaned( - ten_protocol_asynced_t *self, bool is_migration_state_reset) { - TEN_ASSERT(self, "Invalid argument."); - - ten_protocol_t *base_protocol = &self->base; - TEN_ASSERT(ten_protocol_check_integrity(base_protocol, true), - "This function is always called in the engine thread when the " - "migration in the TEN world is completed."); - - ten_sanitizer_thread_check_set_belonging_thread_to_current_thread( - &self->closeable.thread_check); - - // The below 'trigger_impl()' function will post a runloop task, so increase - // the reference count of the base protocol here. - ten_ref_inc_ref(&base_protocol->ref); - - self->post_task_to_impl(self, - ten_protocol_asynced_on_base_protocol_cleaned_task, - (void *)is_migration_state_reset); // NOLINT -} - -void ten_protocol_asynced_init( - ten_protocol_asynced_t *self, const char *name, - ten_addon_host_t *addon_host, ten_protocol_on_output_func_t on_output, - ten_protocol_listen_func_t listen, - ten_protocol_connect_to_func_t connect_to, - ten_protocol_asynced_post_task_to_impl_func_t post_task_to_impl) { - TEN_ASSERT(self && name && addon_host, "Should not happen."); - TEN_ASSERT(post_task_to_impl, - "The 'trigger_impl' could not be NULL, it's used to notify the " - "implementation protocol from the TEN world."); - - // The 'ten_protocol_asynced_t::closeable' is an underlying resource of - // 'ten_protocol_t::closeable', so the 'ten_protocol_asynced_t' does _not_ - // need to register the 'ten_protocol_t::close' callback. - ten_protocol_init(&self->base, name, NULL, on_output, listen, connect_to, - (ten_protocol_migrate_func_t)ten_protocol_asynced_migrate, - /* clean */ NULL); - - ten_protocol_asynced_init_closeable(self); - - // Note that the value of 'ten_protocol_asynced_t::migration_state' is only - // _meaningful_ in the flow of the input messages handling (i.e., when the - // implementation protocol receives messages from the client side), in other - // words, we only need to consider the migration flow only if there is any - // input messages coming from this protocol, therefore, in the very beginning, - // we set the default value to 'DONE', but not 'INIT' here. - // - // When there is any client connects to this protocol, it implies that there - // will be some input messages coming from this protocol, so we will set - // 'ten_protocol_asynced_t::migration_state' to INIT in - // 'ten_protocol_asynced_on_client_accepted()'. - self->migration_state = TEN_CONNECTION_MIGRATION_STATE_DONE; - - ten_list_init(&self->pending_task_queue); - - self->post_task_to_impl = post_task_to_impl; - - self->base.on_cleaned_for_external = - (ten_protocol_on_cleaned_for_external_func_t) - ten_protocol_asynced_on_base_protocol_cleaned; -} - -void ten_protocol_asynced_deinit(ten_protocol_asynced_t *self) { - TEN_ASSERT(self && - // TEN_NOLINTNEXTLINE(thread-check) - // thread-check: The belonging thread of the 'protocol' is - // ended when this function is called, so we can not check - // thread integrity here. - ten_protocol_check_integrity(&self->base, false), - "Should not happen."); - - TEN_ASSERT(ten_list_is_empty(&self->pending_task_queue), - "The pending tasks should be processed before the protocol is " - "destroyed."); - - ten_closeable_deinit(&self->closeable); - ten_protocol_deinit(&self->base); -} - -/** - * @brief Handles the cached messages once the connection migration is completed - * or reset. - * - * @note Only the first message will be transferred to the TEN world before - * the migration is completed. And it's safe to retrieve the attached runloop of - * the base protocol (i.e., the base class of ten_protocol_asynced_t -- - * ten_protocol_t) only if the 'migration_state' is INIT (i.e., no message has - * been received yet) or DONE (i.e., the migration has been completed), - * otherwise, the messages should be cached until the migration is completed. - * - * The reason why do not use the 'ten_protocol_t::in_msgs' queue to cache - * those messages is that the 'ten_protocol_t::in_msgs' queue will be accessed - * both in the TEN world and the implementation protocol, in other words, the - * reading and writing of 'ten_protocol_t::in_msgs' _must_ be protected by - * 'ten_protocol_t::in_lock'. However, the queue for cached messages will be - * only accessed in the implementation protocol world, no mutex lock is - * needed. So it's better to use a separate queue for cached messages instead. - */ -static void ten_protocol_asynced_on_input_task(ten_protocol_asynced_t *self, - void *arg) { - TEN_ASSERT(self && arg, "Invalid argument."); - - ten_protocol_t *base_protocol = &self->base; - - // TEN_NOLINTNEXTLINE(thread-check) - // thread-check: This function is called from the implementation protocol - // thread to handle the pending task once the migration is completed or reset, - // so do not check the thread integrity of the base protocol. - TEN_ASSERT(ten_protocol_check_integrity(base_protocol, false), - "Invalid argument."); - - ten_shared_ptr_t *msg = (ten_shared_ptr_t *)arg; - - ten_protocol_asynced_on_input_async(self, msg); - ten_shared_ptr_destroy(msg); -} - -/** - * This function is called from the implementation protocol thread when it - * receives a message from the client side, and it wants to transfer the message - * to the TEN world through the runloop of the related connection (i.e., the - * 'ten_connection_t' object). The brief flow is as follows: - * - * - When the first message comes, i.e., the - * 'ten_protocol_asynced_t::migration_state' is 'INIT' now, the - * 'ten_connection_t::migration_state' must be 'INIT' too, as the connection - * migration will be started only if the connection handles messages. So it's - * safe to retrieve the runloop of the related connection in this stage. And - * only one message could be transferred to the TEN world as the connection - * migration is always asynchronous and the migration must _not_ be executed - * twice. So 'ten_protocol_asynced_t::migration_state' will be set to - * 'FIRST_MSG' to ensure that the connection only handle one message before - * the migration is completed. - * - * - Before the migration is completed, i.e., the - * 'ten_protocol_asynced_t::migration_state' is not 'DONE', all the subsequent - * messages should be cached, and the closing flow of the asynced protocol - * should be frozen. - * - * - Once the migration is completed in the TEN world, the implementation - * protocol thread will receive an event, and the - * 'ten_protocol_asynced_on_base_protocol_cleaned_task()' function will be - * executed. Then the 'ten_protocol_asynced_t::migration_state' will be set to - * 'DONE', and it's time to handle the pending closing flow and messages. - * - * - The subsequent messages will be transferred to the TEN world directly as - * the migration is completed, the runloop of the related connection will be - * correct. - * - */ -bool ten_protocol_asynced_on_input_async(ten_protocol_asynced_t *self, - ten_shared_ptr_t *msg) { - TEN_ASSERT(self && msg, "Should not happen."); - - ten_protocol_t *base_protocol = &self->base; - // TEN_NOLINTNEXTLINE(thread-check) - // thread-check: By design, this function is used to send messages to the - // runtime from the implemented protocol. And the implemented protocol has its - // own thread. - TEN_ASSERT(ten_protocol_check_integrity(base_protocol, false), - "Should not happen."); - - // The connection protocol is created in the implemented thread, so we can - // access its fields directly here. Refer to - // 'ten_protocol_asynced_on_client_accepted()'. - // - // Note that the 'ten_protocol_t::role' will be updated when the engine or app - // handles the message, so we can _not_ read it here. Refer to - // 'ten_connection_handle_command_from_external_client()'. - TEN_ASSERT(ten_protocol_attach_to(base_protocol) == - TEN_PROTOCOL_ATTACH_TO_CONNECTION, - "Should not happen."); - - if (ten_protocol_is_closing(&self->base)) { - TEN_LOGD( - "Protocol asynced[%p] is closing, could not handle messages any more.", - self); - return false; - } - - msg = ten_shared_ptr_clone(msg); - ten_protocol_asynced_post_task_to_ten(self, - ten_protocol_asynced_on_input_task, - ten_protocol_asynced_on_input, msg); - - if (self->migration_state == TEN_CONNECTION_MIGRATION_STATE_INIT) { - // Only transfer one message to the TEN world before the migration is - // completed, as the protocol side could not know whether the connection - // needs to be migrated or not. And the migration is always asynchronous, so - // transfer only one message to determine whether the connection needs to be - // migrated. - self->migration_state = TEN_CONNECTION_MIGRATION_STATE_FIRST_MSG; - } - - return true; -} - -static void ten_protocol_asynced_on_connected(void *self_, void *arg) { - ten_protocol_asynced_t *protocol = (ten_protocol_asynced_t *)self_; - TEN_ASSERT(protocol && ten_protocol_check_integrity(&protocol->base, true), - "Should not happen."); - - if (protocol->base.on_server_connected) { - protocol->base.on_server_connected(&protocol->base, (bool)arg); - } - - // The task is completed, so delete a reference to the 'protocol' to reflect - // this fact. - ten_ref_dec_ref(&protocol->base.ref); -} - -void ten_protocol_asynced_on_connected_async(ten_protocol_asynced_t *self, - bool is_connected) { - TEN_ASSERT(self, "Invalid argument."); - - ten_protocol_t *base_protocol = &self->base; - - // TEN_NOLINTNEXTLINE(thread-check) - // thread-check: This function is intended to be called in - // different threads. - TEN_ASSERT(ten_protocol_check_integrity(base_protocol, false), - "Should not happen."); - - // TODO(Wei): Does the access to the 'is_closed' field is safe among different - // threads? - TEN_ASSERT(!base_protocol->is_closed, - "The protocol could not connect to remote after it is closed."); - - // This function is called from the 'external protocol' thread, when the - // 'connect_to' request is processed. But the TEN protocol maybe closing when - // this function is called, because the TEN engine is closing. Even if the TEN - // protocol is closing, the following 'on_connected' task still has to be - // submitted, otherwise, there will be memory leaks, because the 'connect_to' - // cmd can not be destroyed (this action will be triggered in - // 'ten_protocol_asynced_on_connected'). - // - // And it's fine to execute the 'on_connected' task, because the asynced - // protocol has not been closed, the TEN protocol and remote won't be closed. - - // Before posting a runloop task, we have to add a reference to the - // 'protocol', so that it will not be destroyed _before_ the runloop task is - // executed. - ten_ref_inc_ref(&base_protocol->ref); - - ten_runloop_t *loop = ten_protocol_get_attached_runloop(base_protocol); - TEN_ASSERT(loop, - "The connection migration is completed by default in 'connect_to' " - "scenario, so the runloop could not be NULL."); - - ten_runloop_post_task_tail(loop, ten_protocol_asynced_on_connected, self, - (void *)is_connected); // NOLINT -} - -static void ten_app_thread_on_client_protocol_created(ten_env_t *ten_env, - ten_protocol_t *instance, - void *cb_data) { - TEN_ASSERT(ten_env && ten_env_check_integrity(ten_env, true), - "Should not happen."); - - ten_app_t *app = ten_env_get_attached_app(ten_env); - TEN_ASSERT(app && ten_app_check_integrity(app, true), "Should not happen."); - - ten_protocol_asynced_creation_info_t *info = - (ten_protocol_asynced_creation_info_t *)cb_data; - TEN_ASSERT(info, "Invalid argument."); - - ten_protocol_asynced_t *listening_protocol = info->user_data; - TEN_ASSERT(listening_protocol && - ten_protocol_check_integrity(&listening_protocol->base, true), - "Invalid argument."); - - ten_protocol_asynced_t *protocol = (ten_protocol_asynced_t *)instance; - if (!protocol) { - TEN_LOGE("Failed to create the protocol instance."); - info->on_created(NULL, info); - return; - } - - // Those implementation protocols are used for handling the messages from - // the client side, and the related connection (i.e., the 'ten_connection_t' - // object) might need to be migrated, so set the value to 'INIT' as the - // default value is 'DONE'. Refer to 'ten_protocol_asynced_init()'. - protocol->migration_state = TEN_CONNECTION_MIGRATION_STATE_INIT; - - ten_protocol_attach_to_app_and_thread(&protocol->base, app); - - if (listening_protocol->base.on_client_accepted) { - listening_protocol->base.on_client_accepted(&listening_protocol->base, - &protocol->base); - } - - info->on_created(protocol, info); -} - -static void ten_protocol_asynced_on_client_accepted(void *self, void *info_) { - ten_protocol_asynced_t *listening_protocol = (ten_protocol_asynced_t *)self; - TEN_ASSERT(listening_protocol, "Should not happen."); - - ten_protocol_asynced_creation_info_t *info = - (ten_protocol_asynced_creation_info_t *)info_; - TEN_ASSERT(info, "Invalid argument."); - - if (ten_protocol_is_closing(&listening_protocol->base)) { - info->on_created(NULL, info); - } else { - ten_protocol_t *listening_base_protocol = &listening_protocol->base; - TEN_ASSERT(listening_base_protocol && - ten_protocol_check_integrity(listening_base_protocol, true), - "Invalid argument."); - TEN_ASSERT(listening_base_protocol->role == TEN_PROTOCOL_ROLE_LISTEN && - ten_protocol_attach_to(listening_base_protocol) == - TEN_PROTOCOL_ATTACH_TO_APP, - "Should not happen."); - - ten_addon_host_t *addon_host = listening_base_protocol->addon_host; - TEN_ASSERT(addon_host && ten_addon_host_check_integrity(addon_host) && - addon_host->type == TEN_ADDON_TYPE_PROTOCOL, - "Should not happen."); - - ten_app_t *app = listening_base_protocol->attached_target.app; - TEN_ASSERT(app && ten_app_check_integrity(app, true), "Should not happen."); - - ten_error_t err; - ten_error_init(&err); - - // We can _not_ know whether the protocol role is - // 'TEN_PROTOCOL_ROLE_IN_INTERNAL' or 'TEN_PROTOCOL_ROLE_IN_EXTERNAL' - // until the message received from the protocol is processed. Refer to - // 'ten_connection_on_msgs()' and - // 'ten_connection_handle_command_from_external_client()'. - bool rc = ten_addon_create_protocol( - app->ten_env, ten_string_get_raw_str(&addon_host->name), - ten_string_get_raw_str(&addon_host->name), TEN_PROTOCOL_ROLE_IN_DEFAULT, - ten_app_thread_on_client_protocol_created, info, NULL); - TEN_ASSERT(rc, "Failed to create protocol, err: %s", - ten_error_errmsg(&err)); - - ten_error_deinit(&err); - } - - // The task is completed, so delete a reference to the 'protocol' to reflect - // this fact. - ten_ref_dec_ref(&listening_protocol->base.ref); -} - -bool ten_protocol_asynced_on_client_accepted_async( - ten_protocol_asynced_t *listening_protocol, - ten_protocol_asynced_creation_info_t *info) { - TEN_ASSERT(listening_protocol, "Should not happen."); - - if (ten_protocol_is_closing(&listening_protocol->base)) { - TEN_LOGD( - "Protocol asynced[%p] is closing, could not receive client request.", - listening_protocol); - return false; - } - - // Before posting a runloop task, we have to add a reference to the - // 'protocol', so that it will not be destroyed _before_ the runloop task is - // executed. - ten_ref_inc_ref(&listening_protocol->base.ref); - - // TODO(xilin): Replace pushing a task to the runloop with wrapping it into - // 'ten_env' apis. - - ten_runloop_t *loop = - ten_protocol_get_attached_runloop(&listening_protocol->base); - TEN_ASSERT(loop, - "The attached runloop of the listen protocol is always the app's, " - "it could not be NULL."); - - ten_runloop_post_task_tail(loop, ten_protocol_asynced_on_client_accepted, - listening_protocol, info); - - return true; -} - -const char *ten_protocol_asynced_get_name(ten_protocol_asynced_t *self) { - TEN_ASSERT(self && ten_protocol_check_integrity(&self->base, true), - "Access across threads."); - - ten_addon_host_t *addon_host = self->base.addon_host; - TEN_ASSERT(addon_host && ten_addon_host_check_integrity(addon_host), - "Invalid argument."); - - ten_value_t *item = ten_value_object_peek(&addon_host->manifest, "name"); - TEN_ASSERT(item, "Failed to get protocol name from its manifest."); - - const char *name = ten_value_peek_raw_str(item); - return name; -} - -bool ten_protocol_asynced_safe_to_retrieve_runtime_runloop( - ten_protocol_asynced_t *self) { - TEN_ASSERT(self, "Invalid argument."); - - // TEN_NOLINTNEXTLINE(thread-check) - // thread-check: This function is always called in the implementation protocol - // thread. - TEN_ASSERT(ten_protocol_check_integrity(&self->base, false), - "Invalid argument."); - - return self->migration_state == TEN_CONNECTION_MIGRATION_STATE_INIT || - self->migration_state == TEN_CONNECTION_MIGRATION_STATE_DONE; -} - -static ten_protocol_asynced_task_t *ten_protocol_asynced_task_create( - ten_protocol_asynced_task_handler_func_t handler, void *arg) { - ten_protocol_asynced_task_t *self = - TEN_MALLOC(sizeof(ten_protocol_asynced_task_t)); - TEN_ASSERT(self, "Failed to allocate memory."); - - self->handler = handler; - self->arg = arg; - - return self; -} - -static void ten_protocol_asynced_task_destroy( - ten_protocol_asynced_task_t *self) { - TEN_ASSERT(self, "Invalid argument."); - - self->handler = NULL; - self->arg = NULL; - - TEN_FREE(self); -} - -void ten_protocol_asynced_post_task_to_ten( - ten_protocol_asynced_t *self, - ten_protocol_asynced_task_handler_func_t handler_if_in_migration, - void (*runloop_task_handler)(void *, void *), void *arg) { - TEN_ASSERT(self && runloop_task_handler, "Invalid argument."); - - ten_protocol_t *base_protocol = &self->base; - - // TEN_NOLINTNEXTLINE(thread-check) - // thread-check: This function is always called in the implementation protocol - // thread. All fields of the 'ten_protocol_asynced_t' accessed here are only - // read/written in the implementation protocol thread. - TEN_ASSERT(ten_protocol_check_integrity(base_protocol, false), - "Invalid argument."); - - if (ten_protocol_asynced_safe_to_retrieve_runtime_runloop(self)) { - ten_runloop_t *loop = ten_protocol_get_attached_runloop(base_protocol); - TEN_ASSERT(loop, "Invalid argument."); - - ten_ref_inc_ref(&base_protocol->ref); - - ten_runloop_post_task_tail(loop, runloop_task_handler, self, arg); - } else { - if (handler_if_in_migration) { - // Create a pending task in the implementation protocol, and will continue - // to handle it in the implementation protocol when the migration is - // completed. - ten_protocol_asynced_task_t *pending_task = - ten_protocol_asynced_task_create(handler_if_in_migration, arg); - ten_list_push_ptr_back( - &self->pending_task_queue, pending_task, - (ten_ptr_listnode_destroy_func_t)ten_protocol_asynced_task_destroy); - } else { - TEN_ASSERT(0, - "The 'handler_if_in_migration' is required if the connection " - "is in migration."); - } - } -}