Skip to content

Commit

Permalink
feat: enable extensions in different threads can receive msgs when on…
Browse files Browse the repository at this point in the history
…_stop
  • Loading branch information
halajohn committed Jan 9, 2025
1 parent 5216bec commit 4f6b51e
Show file tree
Hide file tree
Showing 41 changed files with 543 additions and 157 deletions.
2 changes: 1 addition & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@
"request": "launch",
"program": "${workspaceFolder}/out/linux/x64/tests/standalone/ten_runtime_smoke_test",
"args": [
"--gtest_filter=AudioFrameTest.MultiDestAudioFrame"
"--gtest_filter=ExtensionTest.MultiDestSendInStopPeriod"
],
"cwd": "${workspaceFolder}/out/linux/x64/tests/standalone/",
"env": {
Expand Down
8 changes: 6 additions & 2 deletions core/src/ten_runtime/addon/extension/extension.c
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,12 @@ bool ten_addon_create_extension(ten_env_t *ten_env, const char *addon_name,
ten_addon_on_create_extension_instance_ctx_create(
TEN_ADDON_TYPE_EXTENSION, addon_name, instance_name, cb, cb_data);

ten_runloop_post_task_tail(
int rc = ten_runloop_post_task_tail(
ten_extension_group_get_attached_runloop(extension_group),
ten_extension_thread_create_extension_instance,
extension_group->extension_thread, ctx);
TEN_ASSERT(!rc, "Should not happen.");

return true;
}
}
Expand Down Expand Up @@ -167,10 +169,12 @@ bool ten_addon_destroy_extension(ten_env_t *ten_env, ten_extension_t *extension,
ten_addon_host_on_destroy_instance_ctx_create(addon_host, extension, cb,
cb_data);

ten_runloop_post_task_tail(
int rc = ten_runloop_post_task_tail(
ten_extension_group_get_attached_runloop(extension_group),
ten_extension_thread_destroy_addon_instance,
extension_group->extension_thread, destroy_instance_info);
TEN_ASSERT(!rc, "Should not happen.");

return true;
}
}
Expand Down
26 changes: 17 additions & 9 deletions core/src/ten_runtime/addon/ten_env/on_xxx.c
Original file line number Diff line number Diff line change
Expand Up @@ -185,10 +185,12 @@ static void ten_extension_addon_on_create_instance_done(ten_env_t *self,
ctx->extension = extension;
ctx->addon_context = addon_context;

ten_runloop_post_task_tail(
int rc = ten_runloop_post_task_tail(
ten_extension_group_get_attached_runloop(extension_group),
ten_extension_thread_on_addon_create_extension_done, extension_thread,
ctx);
TEN_ASSERT(!rc, "Should not happen.");

break;
}

Expand Down Expand Up @@ -249,9 +251,10 @@ static void ten_extension_group_addon_on_create_instance_done(ten_env_t *self,
ctx->extension_group = extension_group;
ctx->addon_context = addon_context;

ten_runloop_post_task_tail(
int rc = ten_runloop_post_task_tail(
ten_engine_get_attached_runloop(engine),
ten_engine_on_addon_create_extension_group_done, engine, ctx);
TEN_ASSERT(!rc, "Should not happen.");
break;
}

Expand Down Expand Up @@ -313,9 +316,10 @@ static void ten_protocol_addon_on_create_instance_done(ten_env_t *self,
ctx->protocol = protocol;
ctx->addon_context = addon_context;

ten_runloop_post_task_tail(
int rc = ten_runloop_post_task_tail(
ten_engine_get_attached_runloop(engine),
ten_engine_thread_on_addon_create_protocol_done, engine, ctx);
TEN_ASSERT(!rc, "Should not happen.");
break;
}

Expand All @@ -330,9 +334,10 @@ static void ten_protocol_addon_on_create_instance_done(ten_env_t *self,
ctx->protocol = instance;
ctx->addon_context = addon_context;

ten_runloop_post_task_tail(ten_app_get_attached_runloop(app),
ten_app_thread_on_addon_create_protocol_done,
app, ctx);
int rc = ten_runloop_post_task_tail(
ten_app_get_attached_runloop(app),
ten_app_thread_on_addon_create_protocol_done, app, ctx);
TEN_ASSERT(!rc, "Should not happen.");
break;
}

Expand Down Expand Up @@ -387,9 +392,10 @@ static void ten_addon_loader_addon_on_create_instance_done(ten_env_t *self,
ctx->addon_loader = instance;
ctx->addon_context = addon_context;

ten_runloop_post_task_tail(
int rc = ten_runloop_post_task_tail(
ten_app_get_attached_runloop(app),
ten_app_thread_on_addon_create_addon_loader_done, app, ctx);
TEN_ASSERT(!rc, "Should not happen.");
break;
}

Expand Down Expand Up @@ -489,10 +495,11 @@ void ten_addon_on_destroy_instance_done(ten_env_t *self, void *context) {
ten_engine_check_integrity(engine, false),
"Should not happen.");

ten_runloop_post_task_tail(
int rc = ten_runloop_post_task_tail(
ten_engine_get_attached_runloop(engine),
ten_engine_on_addon_destroy_extension_group_done, engine,
addon_context);
TEN_ASSERT(!rc, "Should not happen.");
break;
}

Expand All @@ -519,10 +526,11 @@ void ten_addon_on_destroy_instance_done(ten_env_t *self, void *context) {
ten_extension_thread_check_integrity(extension_thread, false),
"Should not happen.");

ten_runloop_post_task_tail(
int rc = ten_runloop_post_task_tail(
ten_extension_group_get_attached_runloop(extension_group),
ten_extension_thread_on_addon_destroy_extension_done,
extension_thread, addon_context);
TEN_ASSERT(!rc, "Should not happen.");
break;
}

Expand Down
5 changes: 3 additions & 2 deletions core/src/ten_runtime/app/close.c
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,9 @@ bool ten_app_close(ten_app_t *self, TEN_UNUSED ten_error_t *err) {

self->state = TEN_APP_STATE_CLOSING;

ten_runloop_post_task_tail(ten_app_get_attached_runloop(self),
ten_app_close_task, self, NULL);
int rc = ten_runloop_post_task_tail(ten_app_get_attached_runloop(self),
ten_app_close_task, self, NULL);
TEN_ASSERT(!rc, "Should not happen.");

done:
ten_mutex_unlock(self->state_lock);
Expand Down
7 changes: 4 additions & 3 deletions core/src/ten_runtime/app/engine_interface.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,10 @@ static void ten_app_check_termination_when_engine_closed_async(
ten_app_check_integrity(self, false),
"Should not happen.");

ten_runloop_post_task_tail(ten_app_get_attached_runloop(self),
ten_app_check_termination_when_engine_closed_,
self, engine);
int rc = ten_runloop_post_task_tail(
ten_app_get_attached_runloop(self),
ten_app_check_termination_when_engine_closed_, self, engine);
TEN_ASSERT(!rc, "Should not happen.");
}

// This function is called in the engine thread.
Expand Down
5 changes: 3 additions & 2 deletions core/src/ten_runtime/app/internal.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,9 @@ void ten_app_start(ten_app_t *self) {
ten_app_find_and_set_base_dir(self);

// Add the first task of app.
ten_runloop_post_task_tail(self->loop, ten_app_handle_metadata_task, self,
NULL);
int rc = ten_runloop_post_task_tail(self->loop, ten_app_handle_metadata_task,
self, NULL);
TEN_ASSERT(!rc, "Should not happen.");

ten_runloop_run(self->loop);

Expand Down
6 changes: 4 additions & 2 deletions core/src/ten_runtime/app/migration.c
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ void ten_app_clean_connection_async(ten_app_t *self,
ten_connection_check_integrity(connection, false),
"Should not happen.");

ten_runloop_post_task_tail(ten_app_get_attached_runloop(self),
ten_app_clean_connection_task, connection, NULL);
int rc = ten_runloop_post_task_tail(ten_app_get_attached_runloop(self),
ten_app_clean_connection_task, connection,
NULL);
TEN_ASSERT(!rc, "Should not happen.");
}
5 changes: 3 additions & 2 deletions core/src/ten_runtime/app/msg_interface/common.c
Original file line number Diff line number Diff line change
Expand Up @@ -417,8 +417,9 @@ static void ten_app_handle_in_msgs_async(ten_app_t *self) {
ten_app_check_integrity(self, false),
"Should not happen.");

ten_runloop_post_task_tail(ten_app_get_attached_runloop(self),
ten_app_handle_in_msgs_task, self, NULL);
int rc = ten_runloop_post_task_tail(ten_app_get_attached_runloop(self),
ten_app_handle_in_msgs_task, self, NULL);
TEN_ASSERT(!rc, "Should not happen.");
}

void ten_app_push_to_in_msgs_queue(ten_app_t *self, ten_shared_ptr_t *msg) {
Expand Down
17 changes: 10 additions & 7 deletions core/src/ten_runtime/app/ten_env/metadata.c
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,10 @@ void ten_app_set_property_async(ten_app_t *self, const char *name,
ten_app_set_property_context_t *set_property_context =
set_property_context_create(name, value, cb, cb_data);

ten_runloop_post_task_tail(ten_app_get_attached_runloop(self),
ten_app_set_property_task, self,
set_property_context);
int rc = ten_runloop_post_task_tail(ten_app_get_attached_runloop(self),
ten_app_set_property_task, self,
set_property_context);
TEN_ASSERT(!rc, "Should not happen.");
}

ten_value_t *ten_app_peek_property(ten_app_t *app, const char *name) {
Expand Down Expand Up @@ -162,8 +163,9 @@ void ten_app_peek_property_async(ten_app_t *self, const char *name,
ten_app_peek_property_context_t *context =
ten_app_peek_property_context_create(name, cb, cb_data);

ten_runloop_post_task_tail(self->loop, ten_app_peek_property_task, self,
context);
int rc = ten_runloop_post_task_tail(self->loop, ten_app_peek_property_task,
self, context);
TEN_ASSERT(!rc, "Should not happen.");
}

ten_value_t *ten_app_peek_manifest(ten_app_t *self, const char *name) {
Expand Down Expand Up @@ -241,6 +243,7 @@ void ten_app_peek_manifest_async(ten_app_t *self, const char *name,
ten_app_peek_manifest_context_t *context =
ten_app_peek_manifest_context_create(name, cb, cb_data);

ten_runloop_post_task_tail(self->loop, ten_app_peek_manifest_task, self,
context);
int rc = ten_runloop_post_task_tail(self->loop, ten_app_peek_manifest_task,
self, context);
TEN_ASSERT(!rc, "Should not happen.");
}
6 changes: 4 additions & 2 deletions core/src/ten_runtime/connection/migration.c
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,10 @@ static void ten_connection_on_migration_is_done_or_reset(
// thread if the migration is completed. We must make sure the engine handle
// all the messages in the original order.
ten_runloop_t *loop = ten_connection_get_attached_runloop(self);
ten_runloop_post_task_tail(loop, ten_protocol_on_cleaned_task, protocol,
(void *)is_migration_state_reset);
int rc =
ten_runloop_post_task_tail(loop, ten_protocol_on_cleaned_task, protocol,
(void *)is_migration_state_reset);
TEN_ASSERT(!rc, "Should not happen.");
} else {
TEN_ASSERT(0, "Should not happen.");
}
Expand Down
5 changes: 3 additions & 2 deletions core/src/ten_runtime/engine/internal/close.c
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,9 @@ void ten_engine_close_async(ten_engine_t *self) {
return;
}

ten_runloop_post_task_tail(ten_engine_get_attached_runloop(self),
ten_engine_close_task, self, NULL);
int rc = ten_runloop_post_task_tail(ten_engine_get_attached_runloop(self),
ten_engine_close_task, self, NULL);
TEN_ASSERT(!rc, "Should not happen.");
}

bool ten_engine_is_closing(ten_engine_t *self) {
Expand Down
13 changes: 4 additions & 9 deletions core/src/ten_runtime/engine/internal/extension_interface.c
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,6 @@ static void ten_engine_on_extension_msgs(ten_engine_t *self) {
"When this function is executed, there should be only one "
"destination remaining in the message's dest.");

if (ten_engine_is_closing(self) &&
!ten_msg_type_to_handle_when_closing(msg)) {
// Except some special messages, do not handle the message if the engine
// is closing.
continue;
}

ten_loc_t *dest_loc = ten_msg_get_first_dest_loc(msg);
TEN_ASSERT(dest_loc && ten_loc_check_integrity(dest_loc),
"Should not happen.");
Expand Down Expand Up @@ -141,8 +134,10 @@ static void ten_engine_on_extension_msgs_async(ten_engine_t *self) {
ten_engine_check_integrity(self, false),
"Should not happen.");

ten_runloop_post_task_tail(ten_engine_get_attached_runloop(self),
ten_engine_on_extension_msgs_task, self, NULL);
int rc =
ten_runloop_post_task_tail(ten_engine_get_attached_runloop(self),
ten_engine_on_extension_msgs_task, self, NULL);
TEN_ASSERT(!rc, "Should not happen.");
}

void ten_engine_push_to_extension_msgs_queue(ten_engine_t *self,
Expand Down
7 changes: 4 additions & 3 deletions core/src/ten_runtime/engine/internal/migration.c
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,10 @@ void ten_engine_on_connection_cleaned_async(ten_engine_t *self,
ten_engine_migration_user_data_t *user_data =
ten_engine_migration_user_data_create(connection, cmd);

ten_runloop_post_task_tail(ten_engine_get_attached_runloop(self),
ten_engine_on_connection_cleaned_task, self,
user_data);
int rc = ten_runloop_post_task_tail(ten_engine_get_attached_runloop(self),
ten_engine_on_connection_cleaned_task,
self, user_data);
TEN_ASSERT(!rc, "Should not happen.");
}

void ten_engine_on_connection_closed(ten_connection_t *connection,
Expand Down
11 changes: 4 additions & 7 deletions core/src/ten_runtime/engine/msg_interface/common.c
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,10 @@ void ten_engine_handle_in_msgs_async(ten_engine_t *self) {
ten_engine_check_integrity(self, false),
"Should not happen.");

ten_runloop_post_task_tail(ten_engine_get_attached_runloop(self),
ten_engine_handle_in_msgs_task, self, NULL);
int rc =
ten_runloop_post_task_tail(ten_engine_get_attached_runloop(self),
ten_engine_handle_in_msgs_task, self, NULL);
TEN_ASSERT(!rc, "Should not happen.");
}

void ten_engine_append_to_in_msgs_queue(ten_engine_t *self,
Expand Down Expand Up @@ -238,11 +240,6 @@ bool ten_engine_dispatch_msg(ten_engine_t *self, ten_shared_ptr_t *msg) {
"When this function is executed, there should be only one "
"destination remaining in the message's dest.");

if (ten_engine_is_closing(self)) {
// Do not dispatch the message if the engine is closing.
return true;
}

ten_loc_t *dest_loc = ten_msg_get_first_dest_loc(msg);
TEN_ASSERT(dest_loc && ten_loc_check_integrity(dest_loc),
"Should not happen.");
Expand Down
19 changes: 12 additions & 7 deletions core/src/ten_runtime/extension/ten_env/metadata.c
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,10 @@ bool ten_extension_set_property_async(ten_extension_t *self, const char *path,
ten_extension_set_property_context_t *set_property_context =
set_property_context_create(path, value, cb, cb_data);

ten_runloop_post_task_tail(ten_extension_get_attached_runloop(self),
ten_extension_set_property_task, self,
set_property_context);
int rc = ten_runloop_post_task_tail(ten_extension_get_attached_runloop(self),
ten_extension_set_property_task, self,
set_property_context);
TEN_ASSERT(!rc, "Should not happen.");

return true;
}
Expand Down Expand Up @@ -205,8 +206,10 @@ bool ten_extension_peek_property_async(
ten_extension_peek_property_context_t *context =
ten_extension_peek_property_context_create(path, cb, cb_data);

ten_runloop_post_task_tail(self->extension_thread->runloop,
ten_extension_peek_property_task, self, context);
int rc = ten_runloop_post_task_tail(self->extension_thread->runloop,
ten_extension_peek_property_task, self,
context);
TEN_ASSERT(!rc, "Should not happen.");

return true;
}
Expand Down Expand Up @@ -300,8 +303,10 @@ bool ten_extension_peek_manifest_async(
ten_extension_peek_manifest_context_t *context =
ten_extension_peek_manifest_context_create(path, cb, cb_data);

ten_runloop_post_task_tail(self->extension_thread->runloop,
ten_extension_peek_manifest_task, self, context);
int rc = ten_runloop_post_task_tail(self->extension_thread->runloop,
ten_extension_peek_manifest_task, self,
context);
TEN_ASSERT(!rc, "Should not happen.");

return true;
}
21 changes: 12 additions & 9 deletions core/src/ten_runtime/extension_group/ten_env/metadata.c
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,10 @@ void ten_extension_group_set_property_async(
ten_extension_group_set_property_context_t *set_property_context =
set_property_context_create(name, value, cb, cb_data);

ten_runloop_post_task_tail(ten_extension_group_get_attached_runloop(self),
ten_extension_group_set_property_task, self,
set_property_context);
int rc = ten_runloop_post_task_tail(
ten_extension_group_get_attached_runloop(self),
ten_extension_group_set_property_task, self, set_property_context);
TEN_ASSERT(!rc, "Should not happen.");
}

ten_value_t *ten_extension_group_peek_property(
Expand Down Expand Up @@ -154,9 +155,10 @@ void ten_extension_group_peek_property_async(
ten_extension_group_peek_property_context_t *context =
ten_extension_group_peek_property_context_create(name, cb, cb_data);

ten_runloop_post_task_tail(ten_extension_group_get_attached_runloop(self),
ten_extension_group_peek_property_task, self,
context);
int rc = ten_runloop_post_task_tail(
ten_extension_group_get_attached_runloop(self),
ten_extension_group_peek_property_task, self, context);
TEN_ASSERT(!rc, "Should not happen.");
}

ten_value_t *ten_extension_group_peek_manifest(ten_extension_group_t *self,
Expand Down Expand Up @@ -237,7 +239,8 @@ void ten_extension_group_peek_manifest_async(
ten_extension_group_peek_manifest_context_t *context =
ten_extension_group_peek_manifest_context_create(name, cb, cb_data);

ten_runloop_post_task_tail(ten_extension_group_get_attached_runloop(self),
ten_extension_group_peek_manifest_task, self,
context);
int rc = ten_runloop_post_task_tail(
ten_extension_group_get_attached_runloop(self),
ten_extension_group_peek_manifest_task, self, context);
TEN_ASSERT(!rc, "Should not happen.");
}
Loading

0 comments on commit 4f6b51e

Please sign in to comment.