From 6441b088f665944852d40a75f6a96f2bf5bbb759 Mon Sep 17 00:00:00 2001 From: xxxxl_sun <31622273+sunxilin@users.noreply.github.com> Date: Thu, 28 Nov 2024 17:48:49 +0800 Subject: [PATCH] feat!: add error handler to the send_xxx methods in Go binding (#344) Co-authored-by: Hu Yueh-Wei --- .../binding/go/ten_env/ten_env_internal.h | 4 + .../binding/go/interface/ten/ten_env.go | 85 ++++++++++------ .../binding/go/interface/ten/ten_env.h | 10 +- .../go/interface/ten/ten_env_export.go | 27 ++++++ .../native/ten_env/ten_env_send_audio_frame.c | 97 ++++++++++++++++--- .../go/native/ten_env/ten_env_send_cmd.c | 2 +- .../go/native/ten_env/ten_env_send_data.c | 94 ++++++++++++++---- .../native/ten_env/ten_env_send_video_frame.c | 77 +++++++++++++-- .../extension/extension_a/extension.go | 6 ++ .../extension/extension_b/extension.go | 13 ++- .../extension/extension_a/extension.go | 61 +++++++++++- .../extension/addon_a/extension.go | 2 +- 12 files changed, 395 insertions(+), 83 deletions(-) diff --git a/core/include_internal/ten_runtime/binding/go/ten_env/ten_env_internal.h b/core/include_internal/ten_runtime/binding/go/ten_env/ten_env_internal.h index c0845a62e9..3400e4ea3c 100644 --- a/core/include_internal/ten_runtime/binding/go/ten_env/ten_env_internal.h +++ b/core/include_internal/ten_runtime/binding/go/ten_env/ten_env_internal.h @@ -62,6 +62,10 @@ extern void tenGoOnCmdResult(ten_go_handle_t ten_env_bridge, ten_go_handle_t result_handler, ten_go_error_t cgo_error); +extern void tenGoOnError(ten_go_handle_t ten_env_bridge, + ten_go_handle_t error_handler, + ten_go_error_t cgo_error); + TEN_RUNTIME_PRIVATE_API ten_go_callback_info_t *ten_go_callback_info_create( ten_go_handle_t handler_id); diff --git a/core/src/ten_runtime/binding/go/interface/ten/ten_env.go b/core/src/ten_runtime/binding/go/interface/ten/ten_env.go index a43045e006..83cb289740 100644 --- a/core/src/ten_runtime/binding/go/interface/ten/ten_env.go +++ b/core/src/ten_runtime/binding/go/interface/ten/ten_env.go @@ -11,7 +11,6 @@ package ten import "C" import ( - "fmt" "runtime" "strings" "unsafe" @@ -21,14 +20,18 @@ type ( // ResultHandler is a function type that represents a handler for the result // of a command. ResultHandler func(TenEnv, CmdResult, error) + + // ErrorHandler is a function type that represents a handler for errors of a + // non-command type message. + ErrorHandler func(TenEnv, error) ) // TenEnv represents the interface for the TEN (Run Time Environment) component. type TenEnv interface { SendCmd(cmd Cmd, handler ResultHandler) error - SendData(data Data) error - SendVideoFrame(videoFrame VideoFrame) error - SendAudioFrame(audioFrame AudioFrame) error + SendData(data Data, handler ErrorHandler) error + SendVideoFrame(videoFrame VideoFrame, handler ErrorHandler) error + SendAudioFrame(audioFrame AudioFrame, handler ErrorHandler) error ReturnResult(result CmdResult, cmd Cmd) error ReturnResultDirectly(result CmdResult) error @@ -193,7 +196,7 @@ func tenGoCAsyncApiCallback( }() } -func (p *tenEnv) SendData(data Data) error { +func (p *tenEnv) SendData(data Data, handler ErrorHandler) error { if data == nil { return newTenError( ErrnoInvalidArgument, @@ -203,15 +206,16 @@ func (p *tenEnv) SendData(data Data) error { defer data.keepAlive() - // Create a channel to wait for the async operation in C to complete. - done := make(chan error, 1) - callbackHandle := newGoHandle(done) + cb := goHandleNil + if handler != nil { + cb = newGoHandle(handler) + } err := withCGO(func() error { apiStatus := C.ten_go_ten_env_send_data( p.cPtr, data.getCPtr(), - C.uintptr_t(callbackHandle), + cHandle(cb), ) err := withCGoError(&apiStatus) return err @@ -219,17 +223,16 @@ func (p *tenEnv) SendData(data Data) error { if err != nil { // Clean up the handle if there was an error. - loadAndDeleteGoHandle(callbackHandle) - return err + loadAndDeleteGoHandle(cb) } - // Wait for the async operation to complete. - err = <-done - return err } -func (p *tenEnv) SendVideoFrame(videoFrame VideoFrame) error { +func (p *tenEnv) SendVideoFrame( + videoFrame VideoFrame, + handler ErrorHandler, +) error { if videoFrame == nil { return newTenError( ErrnoInvalidArgument, @@ -239,16 +242,32 @@ func (p *tenEnv) SendVideoFrame(videoFrame VideoFrame) error { defer videoFrame.keepAlive() - return withCGO(func() error { + cb := goHandleNil + if handler != nil { + cb = newGoHandle(handler) + } + + err := withCGO(func() error { apiStatus := C.ten_go_ten_env_send_video_frame( p.cPtr, videoFrame.getCPtr(), + cHandle(cb), ) return withCGoError(&apiStatus) }) + + if err != nil { + // Clean up the handle if there was an error. + loadAndDeleteGoHandle(cb) + } + + return err } -func (p *tenEnv) SendAudioFrame(audioFrame AudioFrame) error { +func (p *tenEnv) SendAudioFrame( + audioFrame AudioFrame, + handler ErrorHandler, +) error { if audioFrame == nil { return newTenError( ErrnoInvalidArgument, @@ -256,22 +275,28 @@ func (p *tenEnv) SendAudioFrame(audioFrame AudioFrame) error { ) } - res, ok := p.process(func() any { - defer audioFrame.keepAlive() + defer audioFrame.keepAlive() - if res := C.ten_go_ten_env_send_audio_frame(p.cPtr, audioFrame.getCPtr()); !res { - return newTenError( - ErrnoGeneric, - fmt.Sprintf("Failed to SendAudioFrame (%v)", audioFrame), - ) - } - return nil - }).(error) - if ok { - return res + cb := goHandleNil + if handler != nil { + cb = newGoHandle(handler) } - return nil + err := withCGO(func() error { + apiStatus := C.ten_go_ten_env_send_audio_frame( + p.cPtr, + audioFrame.getCPtr(), + cHandle(cb), + ) + return withCGoError(&apiStatus) + }) + + if err != nil { + // Clean up the handle if there was an error. + loadAndDeleteGoHandle(cb) + } + + return err } func (p *tenEnv) OnConfigureDone() error { diff --git a/core/src/ten_runtime/binding/go/interface/ten/ten_env.h b/core/src/ten_runtime/binding/go/interface/ten/ten_env.h index 4976d9eeb2..f3887de0df 100644 --- a/core/src/ten_runtime/binding/go/interface/ten/ten_env.h +++ b/core/src/ten_runtime/binding/go/interface/ten/ten_env.h @@ -48,13 +48,15 @@ ten_go_error_t ten_go_ten_env_send_cmd(uintptr_t bridge_addr, ten_go_error_t ten_go_ten_env_send_data(uintptr_t bridge_addr, uintptr_t data_bridge_addr, - uintptr_t callback_handle); + ten_go_handle_t handler_id); ten_go_error_t ten_go_ten_env_send_video_frame( - uintptr_t bridge_addr, uintptr_t video_frame_bridge_addr); + uintptr_t bridge_addr, uintptr_t video_frame_bridge_addr, + ten_go_handle_t handler_id); -bool ten_go_ten_env_send_audio_frame(uintptr_t bridge_addr, - uintptr_t audio_frame_bridge_addr); +ten_go_error_t ten_go_ten_env_send_audio_frame( + uintptr_t bridge_addr, uintptr_t audio_frame_bridge_addr, + ten_go_handle_t handler_id); bool ten_go_ten_env_is_cmd_connected(uintptr_t bridge_addr, const char *cmd_name); diff --git a/core/src/ten_runtime/binding/go/interface/ten/ten_env_export.go b/core/src/ten_runtime/binding/go/interface/ten/ten_env_export.go index db0db77bea..c85c2ac9cc 100644 --- a/core/src/ten_runtime/binding/go/interface/ten/ten_env_export.go +++ b/core/src/ten_runtime/binding/go/interface/ten/ten_env_export.go @@ -86,6 +86,33 @@ func tenGoOnCmdResult( cb.(ResultHandler)(tenEnvObj, cr, err) } +//export tenGoOnError +func tenGoOnError( + tenEnvObjID C.uintptr_t, + errorHandler C.uintptr_t, + cgoError C.ten_go_error_t, +) { + tenEnvObj, ok := handle(tenEnvObjID).get().(TenEnv) + if !ok { + panic( + fmt.Sprintf( + "Failed to get ten env from handle map, id: %d.", + uintptr(tenEnvObjID), + ), + ) + } + + cb := loadAndDeleteGoHandle(goHandle(errorHandler)) + if cb == nil || cb == goHandleNil { + // Should not happen. + panic("The error handler is not found from handle map.") + } + + err := withCGoError(&cgoError) + + cb.(ErrorHandler)(tenEnvObj, err) +} + //export tenGoSetPropertyCallback func tenGoSetPropertyCallback( tenEnvObjID C.uintptr_t, diff --git a/core/src/ten_runtime/binding/go/native/ten_env/ten_env_send_audio_frame.c b/core/src/ten_runtime/binding/go/native/ten_env/ten_env_send_audio_frame.c index 748daa0b8c..d0e9f566a3 100644 --- a/core/src/ten_runtime/binding/go/native/ten_env/ten_env_send_audio_frame.c +++ b/core/src/ten_runtime/binding/go/native/ten_env/ten_env_send_audio_frame.c @@ -6,6 +6,7 @@ // #include +#include "include_internal/ten_runtime/binding/go/internal/common.h" #include "include_internal/ten_runtime/binding/go/msg/msg.h" #include "include_internal/ten_runtime/binding/go/ten_env/ten_env.h" #include "include_internal/ten_runtime/binding/go/ten_env/ten_env_internal.h" @@ -19,10 +20,12 @@ typedef struct ten_env_notify_send_audio_frame_info_t { ten_shared_ptr_t *c_audio_frame; + ten_go_handle_t callback_handle; } ten_env_notify_send_audio_frame_info_t; static ten_env_notify_send_audio_frame_info_t * -ten_env_notify_send_audio_frame_info_create(ten_shared_ptr_t *c_audio_frame) { +ten_env_notify_send_audio_frame_info_create(ten_shared_ptr_t *c_audio_frame, + ten_go_handle_t callback_handle) { TEN_ASSERT(c_audio_frame, "Invalid argument."); ten_env_notify_send_audio_frame_info_t *info = @@ -30,6 +33,7 @@ ten_env_notify_send_audio_frame_info_create(ten_shared_ptr_t *c_audio_frame) { TEN_ASSERT(info, "Failed to allocate memory."); info->c_audio_frame = c_audio_frame; + info->callback_handle = callback_handle; return info; } @@ -43,30 +47,92 @@ static void ten_env_notify_send_audio_frame_info_destroy( info->c_audio_frame = NULL; } + info->callback_handle = 0; + TEN_FREE(info); } +static void proxy_handle_audio_frame_error( + ten_env_t *ten_env, TEN_UNUSED ten_shared_ptr_t *cmd_result, + void *callback_info_, ten_error_t *err) { + ten_go_callback_info_t *callback_info = callback_info_; + TEN_ASSERT(callback_info, "Should not happen."); + + ten_go_error_t cgo_error; + ten_go_error_init_with_errno(&cgo_error, TEN_ERRNO_OK); + + if (err) { + ten_go_error_from_error(&cgo_error, err); + } + + TEN_ASSERT(callback_info->callback_id != TEN_GO_NO_RESPONSE_HANDLER, + "Should not happen."); + + ten_go_ten_env_t *ten_env_bridge = ten_go_ten_env_wrap(ten_env); + + tenGoOnError(ten_env_bridge->bridge.go_instance, callback_info->callback_id, + cgo_error); + + ten_go_callback_info_destroy(callback_info); +} + static void ten_env_proxy_notify_send_audio_frame(ten_env_t *ten_env, - void *user_audio_frame) { - TEN_ASSERT(user_audio_frame, "Invalid argument."); + void *user_data) { + TEN_ASSERT(user_data, "Invalid argument."); TEN_ASSERT(ten_env && ten_env_check_integrity(ten_env, true), "Should not happen."); - ten_env_notify_send_audio_frame_info_t *notify_info = user_audio_frame; + ten_env_notify_send_audio_frame_info_t *notify_info = user_data; + TEN_ASSERT(notify_info, "Should not happen."); + + ten_go_error_t cgo_error; + ten_go_error_init_with_errno(&cgo_error, TEN_ERRNO_OK); ten_error_t err; ten_error_init(&err); - TEN_UNUSED bool res = ten_env_send_audio_frame( - ten_env, notify_info->c_audio_frame, NULL, NULL, NULL); + bool res = false; + + if (notify_info->callback_handle == TEN_GO_NO_RESPONSE_HANDLER) { + res = ten_env_send_audio_frame(ten_env, notify_info->c_audio_frame, NULL, + NULL, &err); + if (!res) { + // The error cannot be handled by the developer, all we can do is to log + // the error. + TEN_LOGE( + "Failed to send audio frame, but no error handler is provided." + "errno: %d, errmsg: %s", + ten_error_errno(&err), ten_error_errmsg(&err)); + } + } else { + ten_go_callback_info_t *info = + ten_go_callback_info_create(notify_info->callback_handle); + + res = ten_env_send_audio_frame(ten_env, notify_info->c_audio_frame, + proxy_handle_audio_frame_error, notify_info, + &err); + + if (!res) { + // Prepare error information to pass to Go. + ten_go_error_from_error(&cgo_error, &err); + + ten_go_ten_env_t *ten_env_bridge = ten_go_ten_env_wrap(ten_env); + + tenGoOnError(ten_env_bridge->bridge.go_instance, + notify_info->callback_handle, cgo_error); + + ten_go_callback_info_destroy(info); + } + } ten_error_deinit(&err); ten_env_notify_send_audio_frame_info_destroy(notify_info); } -bool ten_go_ten_env_send_audio_frame(uintptr_t bridge_addr, - uintptr_t audio_frame_bridge_addr) { +ten_go_error_t ten_go_ten_env_send_audio_frame( + uintptr_t bridge_addr, uintptr_t audio_frame_bridge_addr, + ten_go_handle_t handler_id) { ten_go_ten_env_t *self = ten_go_ten_env_reinterpret(bridge_addr); TEN_ASSERT(self && ten_go_ten_env_check_integrity(self), "Should not happen."); @@ -75,25 +141,30 @@ bool ten_go_ten_env_send_audio_frame(uintptr_t bridge_addr, TEN_ASSERT(audio_frame && ten_go_msg_check_integrity(audio_frame), "Should not happen."); - bool result = true; - TEN_GO_TEN_ENV_IS_ALIVE_REGION_BEGIN(self, result = false;); + ten_go_error_t cgo_error; + ten_go_error_init_with_errno(&cgo_error, TEN_ERRNO_OK); + + TEN_GO_TEN_ENV_IS_ALIVE_REGION_BEGIN( + self, { ten_go_error_set_errno(&cgo_error, TEN_ERRNO_TEN_IS_CLOSED); }); ten_error_t err; ten_error_init(&err); ten_env_notify_send_audio_frame_info_t *notify_info = ten_env_notify_send_audio_frame_info_create( - ten_go_msg_move_c_msg(audio_frame)); + ten_go_msg_move_c_msg(audio_frame), + handler_id <= 0 ? TEN_GO_NO_RESPONSE_HANDLER : handler_id); if (!ten_env_proxy_notify(self->c_ten_env_proxy, ten_env_proxy_notify_send_audio_frame, notify_info, false, &err)) { ten_env_notify_send_audio_frame_info_destroy(notify_info); - result = false; + ten_go_error_from_error(&cgo_error, &err); } ten_error_deinit(&err); TEN_GO_TEN_ENV_IS_ALIVE_REGION_END(self); + ten_is_close: - return result; + return cgo_error; } diff --git a/core/src/ten_runtime/binding/go/native/ten_env/ten_env_send_cmd.c b/core/src/ten_runtime/binding/go/native/ten_env/ten_env_send_cmd.c index 78e4b2ee80..fc2c1d4073 100644 --- a/core/src/ten_runtime/binding/go/native/ten_env/ten_env_send_cmd.c +++ b/core/src/ten_runtime/binding/go/native/ten_env/ten_env_send_cmd.c @@ -18,7 +18,6 @@ #include "ten_utils/lib/alloc.h" #include "ten_utils/lib/error.h" #include "ten_utils/macro/check.h" -#include "ten_utils/macro/mark.h" typedef struct ten_env_notify_send_cmd_info_t { ten_shared_ptr_t *c_cmd; @@ -61,6 +60,7 @@ static void ten_env_proxy_notify_send_cmd(ten_env_t *ten_env, void *user_data) { "Should not happen."); ten_env_notify_send_cmd_info_t *notify_info = user_data; + TEN_ASSERT(notify_info, "Should not happen."); ten_error_t err; ten_error_init(&err); diff --git a/core/src/ten_runtime/binding/go/native/ten_env/ten_env_send_data.c b/core/src/ten_runtime/binding/go/native/ten_env/ten_env_send_data.c index e2f9cd2d06..39555e56cb 100644 --- a/core/src/ten_runtime/binding/go/native/ten_env/ten_env_send_data.c +++ b/core/src/ten_runtime/binding/go/native/ten_env/ten_env_send_data.c @@ -20,22 +20,24 @@ #include "ten_utils/lib/alloc.h" #include "ten_utils/lib/error.h" #include "ten_utils/lib/smart_ptr.h" +#include "ten_utils/log/log.h" #include "ten_utils/macro/check.h" +#include "ten_utils/macro/mark.h" typedef struct ten_env_notify_send_data_info_t { - ten_go_msg_t *data_bridge; - uintptr_t callback_handle; + ten_shared_ptr_t *c_data; + ten_go_handle_t callback_handle; } ten_env_notify_send_data_info_t; static ten_env_notify_send_data_info_t *ten_env_notify_send_data_info_create( - ten_go_msg_t *data_bridge, uintptr_t callback_handle) { - TEN_ASSERT(data_bridge, "Invalid argument."); + ten_shared_ptr_t *c_data, ten_go_handle_t callback_handle) { + TEN_ASSERT(c_data, "Invalid argument."); ten_env_notify_send_data_info_t *info = TEN_MALLOC(sizeof(ten_env_notify_send_data_info_t)); TEN_ASSERT(info, "Failed to allocate memory."); - info->data_bridge = data_bridge; + info->c_data = c_data; info->callback_handle = callback_handle; return info; @@ -45,9 +47,40 @@ static void ten_env_notify_send_data_info_destroy( ten_env_notify_send_data_info_t *info) { TEN_ASSERT(info, "Invalid argument."); + if (info->c_data) { + ten_shared_ptr_destroy(info->c_data); + info->c_data = NULL; + } + + info->callback_handle = 0; + TEN_FREE(info); } +static void proxy_handle_data_error(ten_env_t *ten_env, + TEN_UNUSED ten_shared_ptr_t *cmd_result, + void *callback_info_, ten_error_t *err) { + ten_go_callback_info_t *callback_info = callback_info_; + TEN_ASSERT(callback_info, "Should not happen."); + + ten_go_error_t cgo_error; + ten_go_error_init_with_errno(&cgo_error, TEN_ERRNO_OK); + + if (err) { + ten_go_error_from_error(&cgo_error, err); + } + + TEN_ASSERT(callback_info->callback_id != TEN_GO_NO_RESPONSE_HANDLER, + "Should not happen."); + + ten_go_ten_env_t *ten_env_bridge = ten_go_ten_env_wrap(ten_env); + + tenGoOnError(ten_env_bridge->bridge.go_instance, callback_info->callback_id, + cgo_error); + + ten_go_callback_info_destroy(callback_info); +} + static void ten_env_proxy_notify_send_data(ten_env_t *ten_env, void *user_data) { TEN_ASSERT(user_data, "Invalid argument."); @@ -55,6 +88,7 @@ static void ten_env_proxy_notify_send_data(ten_env_t *ten_env, "Should not happen."); ten_env_notify_send_data_info_t *notify_info = user_data; + TEN_ASSERT(notify_info, "Should not happen."); ten_go_error_t cgo_error; ten_go_error_init_with_errno(&cgo_error, TEN_ERRNO_OK); @@ -62,35 +96,53 @@ static void ten_env_proxy_notify_send_data(ten_env_t *ten_env, ten_error_t err; ten_error_init(&err); - bool res = ten_env_send_data(ten_env, notify_info->data_bridge->c_msg, NULL, - NULL, &err); - if (res) { - // `send_data` succeeded, transferring the ownership of the data message out - // of the Go data message. - ten_shared_ptr_destroy(notify_info->data_bridge->c_msg); - notify_info->data_bridge->c_msg = NULL; + bool res = false; + + if (notify_info->callback_handle == TEN_GO_NO_RESPONSE_HANDLER) { + res = ten_env_send_data(ten_env, notify_info->c_data, NULL, NULL, &err); + if (!res) { + // The error cannot be handled by the developer, all we can do is to log + // the error. + TEN_LOGE( + "Failed to send data, but no error handler is provided. errno: %d, " + "errmsg: %s", + ten_error_errno(&err), ten_error_errmsg(&err)); + } } else { - // Prepare error information to pass to Go. - ten_go_error_from_error(&cgo_error, &err); - } + ten_go_callback_info_t *info = + ten_go_callback_info_create(notify_info->callback_handle); + res = ten_env_send_data(ten_env, notify_info->c_data, + proxy_handle_data_error, info, &err); + + if (!res) { + // Prepare error information to pass to Go. + ten_go_error_from_error(&cgo_error, &err); - // Call back into Go to signal that the async operation in C is complete. - tenGoCAsyncApiCallback(notify_info->callback_handle, cgo_error); + ten_go_ten_env_t *ten_env_bridge = ten_go_ten_env_wrap(ten_env); + + tenGoOnError(ten_env_bridge->bridge.go_instance, + notify_info->callback_handle, cgo_error); + + ten_go_callback_info_destroy(info); + } + } ten_error_deinit(&err); + // The notify_info is no longer needed. ten_env_notify_send_data_info_destroy(notify_info); } ten_go_error_t ten_go_ten_env_send_data(uintptr_t bridge_addr, uintptr_t data_bridge_addr, - uintptr_t callback_handle) { + ten_go_handle_t handler_id) { ten_go_ten_env_t *self = ten_go_ten_env_reinterpret(bridge_addr); TEN_ASSERT(self && ten_go_ten_env_check_integrity(self), "Should not happen."); ten_go_msg_t *data = ten_go_msg_reinterpret(data_bridge_addr); TEN_ASSERT(data && ten_go_msg_check_integrity(data), "Should not happen."); + TEN_ASSERT(ten_go_msg_c_msg(data), "Should not happen."); ten_go_error_t cgo_error; ten_go_error_init_with_errno(&cgo_error, TEN_ERRNO_OK); @@ -102,7 +154,9 @@ ten_go_error_t ten_go_ten_env_send_data(uintptr_t bridge_addr, ten_error_init(&err); ten_env_notify_send_data_info_t *notify_info = - ten_env_notify_send_data_info_create(data, callback_handle); + ten_env_notify_send_data_info_create( + ten_go_msg_move_c_msg(data), + handler_id <= 0 ? TEN_GO_NO_RESPONSE_HANDLER : handler_id); if (!ten_env_proxy_notify(self->c_ten_env_proxy, ten_env_proxy_notify_send_data, notify_info, false, @@ -112,8 +166,8 @@ ten_go_error_t ten_go_ten_env_send_data(uintptr_t bridge_addr, ten_go_error_from_error(&cgo_error, &err); } - ten_error_deinit(&err); TEN_GO_TEN_ENV_IS_ALIVE_REGION_END(self); + ten_error_deinit(&err); ten_is_close: return cgo_error; diff --git a/core/src/ten_runtime/binding/go/native/ten_env/ten_env_send_video_frame.c b/core/src/ten_runtime/binding/go/native/ten_env/ten_env_send_video_frame.c index 1ea6375888..16cbbf20de 100644 --- a/core/src/ten_runtime/binding/go/native/ten_env/ten_env_send_video_frame.c +++ b/core/src/ten_runtime/binding/go/native/ten_env/ten_env_send_video_frame.c @@ -22,10 +22,12 @@ typedef struct ten_env_notify_send_video_frame_info_t { ten_shared_ptr_t *c_video_frame; + ten_go_handle_t callback_handle; } ten_env_notify_send_video_frame_info_t; static ten_env_notify_send_video_frame_info_t * -ten_env_notify_send_video_frame_info_create(ten_shared_ptr_t *c_video_frame) { +ten_env_notify_send_video_frame_info_create(ten_shared_ptr_t *c_video_frame, + ten_go_handle_t callback_handle) { TEN_ASSERT(c_video_frame, "Invalid argument."); ten_env_notify_send_video_frame_info_t *info = @@ -33,6 +35,7 @@ ten_env_notify_send_video_frame_info_create(ten_shared_ptr_t *c_video_frame) { TEN_ASSERT(info, "Failed to allocate memory."); info->c_video_frame = c_video_frame; + info->callback_handle = callback_handle; return info; } @@ -46,9 +49,35 @@ static void ten_env_notify_send_video_frame_info_destroy( info->c_video_frame = NULL; } + info->callback_handle = 0; + TEN_FREE(info); } +static void proxy_handle_video_frame_error( + ten_env_t *ten_env, TEN_UNUSED ten_shared_ptr_t *cmd_result, + void *callback_info_, ten_error_t *err) { + ten_go_callback_info_t *callback_info = callback_info_; + TEN_ASSERT(callback_info, "Should not happen."); + + ten_go_error_t cgo_error; + ten_go_error_init_with_errno(&cgo_error, TEN_ERRNO_OK); + + if (err) { + ten_go_error_from_error(&cgo_error, err); + } + + TEN_ASSERT(callback_info->callback_id != TEN_GO_NO_RESPONSE_HANDLER, + "Should not happen."); + + ten_go_ten_env_t *ten_env_bridge = ten_go_ten_env_wrap(ten_env); + + tenGoOnError(ten_env_bridge->bridge.go_instance, callback_info->callback_id, + cgo_error); + + ten_go_callback_info_destroy(callback_info); +} + static void ten_env_proxy_notify_send_video_frame(ten_env_t *ten_env, void *user_video_frame) { TEN_ASSERT(user_video_frame, "Invalid argument."); @@ -56,12 +85,45 @@ static void ten_env_proxy_notify_send_video_frame(ten_env_t *ten_env, "Should not happen."); ten_env_notify_send_video_frame_info_t *notify_info = user_video_frame; + TEN_ASSERT(notify_info, "Should not happen."); + + ten_go_error_t cgo_error; + ten_go_error_init_with_errno(&cgo_error, TEN_ERRNO_OK); ten_error_t err; ten_error_init(&err); - TEN_UNUSED bool res = ten_env_send_video_frame( - ten_env, notify_info->c_video_frame, NULL, NULL, NULL); + bool res = false; + + if (notify_info->callback_handle == TEN_GO_NO_RESPONSE_HANDLER) { + res = ten_env_send_video_frame(ten_env, notify_info->c_video_frame, NULL, + NULL, &err); + if (!res) { + // The error cannot be handled by the developer, all we can do is to log + // the error. + TEN_LOGE( + "Failed to send video frame, but no error handler is provided. " + "errno: %d, errmsg: %s", + ten_error_errno(&err), ten_error_errmsg(&err)); + } + } else { + ten_go_callback_info_t *info = + ten_go_callback_info_create(notify_info->callback_handle); + res = ten_env_send_video_frame(ten_env, notify_info->c_video_frame, + proxy_handle_video_frame_error, info, &err); + + if (!res) { + // Prepare error information to pass to Go. + ten_go_error_from_error(&cgo_error, &err); + + ten_go_ten_env_t *ten_env_bridge = ten_go_ten_env_wrap(ten_env); + + tenGoOnError(ten_env_bridge->bridge.go_instance, + notify_info->callback_handle, cgo_error); + + ten_go_callback_info_destroy(info); + } + } ten_error_deinit(&err); @@ -69,7 +131,8 @@ static void ten_env_proxy_notify_send_video_frame(ten_env_t *ten_env, } ten_go_error_t ten_go_ten_env_send_video_frame( - uintptr_t bridge_addr, uintptr_t video_frame_bridge_addr) { + uintptr_t bridge_addr, uintptr_t video_frame_bridge_addr, + ten_go_handle_t handler_id) { ten_go_ten_env_t *self = ten_go_ten_env_reinterpret(bridge_addr); TEN_ASSERT(self && ten_go_ten_env_check_integrity(self), "Should not happen."); @@ -89,7 +152,8 @@ ten_go_error_t ten_go_ten_env_send_video_frame( ten_env_notify_send_video_frame_info_t *notify_info = ten_env_notify_send_video_frame_info_create( - ten_go_msg_move_c_msg(video_frame)); + ten_go_msg_move_c_msg(video_frame), + handler_id <= 0 ? TEN_GO_NO_RESPONSE_HANDLER : handler_id); if (!ten_env_proxy_notify(self->c_ten_env_proxy, ten_env_proxy_notify_send_video_frame, notify_info, @@ -98,8 +162,9 @@ ten_go_error_t ten_go_ten_env_send_video_frame( ten_go_error_from_error(&cgo_error, &err); } - ten_error_deinit(&err); TEN_GO_TEN_ENV_IS_ALIVE_REGION_END(self); + ten_error_deinit(&err); + ten_is_close: return cgo_error; } diff --git a/tests/ten_runtime/integration/go/frequently_cgo_call_go/frequently_cgo_call_go_app/ten_packages/extension/extension_a/extension.go b/tests/ten_runtime/integration/go/frequently_cgo_call_go/frequently_cgo_call_go_app/ten_packages/extension/extension_a/extension.go index b059535cda..ddbe80bf88 100644 --- a/tests/ten_runtime/integration/go/frequently_cgo_call_go/frequently_cgo_call_go_app/ten_packages/extension/extension_a/extension.go +++ b/tests/ten_runtime/integration/go/frequently_cgo_call_go/frequently_cgo_call_go_app/ten_packages/extension/extension_a/extension.go @@ -28,6 +28,7 @@ func newExtensionA(name string) ten.Extension { func (p *extensionA) OnStart(tenEnv ten.TenEnv) { go func() { var wg sync.WaitGroup + var counter int32 wg.Add(concurrency) @@ -43,6 +44,11 @@ func (p *extensionA) OnStart(tenEnv ten.TenEnv) { if err != nil { fmt.Printf("Error in goroutine %d: %v\n", i, err) } + + if atomic.AddInt32(&counter, 1)%5000 == 0 { + fmt.Printf("extension_a %d goroutines completed\n", counter) + } + }(i % 100) } diff --git a/tests/ten_runtime/integration/go/frequently_cgo_call_go/frequently_cgo_call_go_app/ten_packages/extension/extension_b/extension.go b/tests/ten_runtime/integration/go/frequently_cgo_call_go/frequently_cgo_call_go_app/ten_packages/extension/extension_b/extension.go index 56c29fa423..acad8f016c 100644 --- a/tests/ten_runtime/integration/go/frequently_cgo_call_go/frequently_cgo_call_go_app/ten_packages/extension/extension_b/extension.go +++ b/tests/ten_runtime/integration/go/frequently_cgo_call_go/frequently_cgo_call_go_app/ten_packages/extension/extension_b/extension.go @@ -41,11 +41,13 @@ func (p *extensionB) OnCmd( var count uint32 = 0 // An empty string in cmd is permitted. - if em, err := cmd.GetPropertyString("empty_string"); err != nil || em != "" { + if em, err := cmd.GetPropertyString("empty_string"); err != nil || + em != "" { panic("Should not happen.") } - if em, err := cmd.GetPropertyBytes("some_bytes"); err != nil || len(em) != 3 { + if em, err := cmd.GetPropertyBytes("some_bytes"); err != nil || + len(em) != 3 { panic("Should not happen.") } @@ -66,7 +68,12 @@ func (p *extensionB) OnCmd( panic("should not happen") } - if atomic.AddUint32(&count, 1) == concurrency { + total := atomic.AddUint32(&count, 1) + if total%5000 == 0 { + fmt.Printf("extension_b %d goroutine done\n", total) + } + + if total == concurrency { done <- struct{}{} } }(i % 100) diff --git a/tests/ten_runtime/integration/go/no_dest_go/no_dest_go_app/ten_packages/extension/extension_a/extension.go b/tests/ten_runtime/integration/go/no_dest_go/no_dest_go_app/ten_packages/extension/extension_a/extension.go index c30fc458f5..064493abea 100644 --- a/tests/ten_runtime/integration/go/no_dest_go/no_dest_go_app/ten_packages/extension/extension_a/extension.go +++ b/tests/ten_runtime/integration/go/no_dest_go/no_dest_go_app/ten_packages/extension/extension_a/extension.go @@ -16,10 +16,10 @@ type baseExtension struct { ten.DefaultExtension } -func (ext *baseExtension) OnStart(tenEnv ten.TenEnv) { +func (ext *aExtension) OnStart(tenEnv ten.TenEnv) { tenEnv.LogDebug("OnStart") - noDestCmd, _ := ten.NewCmd("unknown") + noDestCmd, _ := ten.NewCmd("unknownCmd") tenEnv.SendCmd(noDestCmd, func(te ten.TenEnv, cr ten.CmdResult, err error) { if err == nil { panic("SendCmd should fail if no destination is found.") @@ -27,11 +27,60 @@ func (ext *baseExtension) OnStart(tenEnv ten.TenEnv) { tenEnv.LogInfo("SendCmd failed as expected, err: " + err.Error()) - tenEnv.OnStartDone() + ext.counter++ + + if ext.counter == 4 { + tenEnv.OnStartDone() + } + }) + + noDestData, _ := ten.NewData("unknownData") + tenEnv.SendData(noDestData, func(te ten.TenEnv, err error) { + if err == nil { + panic("SendData should fail if no destination is found.") + } + + tenEnv.LogInfo("SendData failed as expected, err: " + err.Error()) + + ext.counter++ + + if ext.counter == 4 { + tenEnv.OnStartDone() + } + }) + + noDestVideoFrame, _ := ten.NewVideoFrame("unknownVideoFrame") + tenEnv.SendVideoFrame(noDestVideoFrame, func(te ten.TenEnv, err error) { + if err == nil { + panic("SendVideoFrame should fail if no destination is found.") + } + + tenEnv.LogInfo("SendVideoFrame failed as expected, err: " + err.Error()) + + ext.counter++ + + if ext.counter == 4 { + tenEnv.OnStartDone() + } + }) + + noDestAudioFrame, _ := ten.NewAudioFrame("unknownAudioFrame") + tenEnv.SendAudioFrame(noDestAudioFrame, func(te ten.TenEnv, err error) { + if err == nil { + panic("SendAudioFrame should fail if no destination is found.") + } + + tenEnv.LogInfo("SendAudioFrame failed as expected, err: " + err.Error()) + + ext.counter++ + + if ext.counter == 4 { + tenEnv.OnStartDone() + } }) } -func (ext *baseExtension) OnStop(tenEnv ten.TenEnv) { +func (ext *aExtension) OnStop(tenEnv ten.TenEnv) { tenEnv.LogDebug("OnStop") tenEnv.OnStopDone() @@ -39,10 +88,12 @@ func (ext *baseExtension) OnStop(tenEnv ten.TenEnv) { type aExtension struct { baseExtension + + counter int } func newAExtension(name string) ten.Extension { - return &aExtension{} + return &aExtension{counter: 0} } func (p *aExtension) OnCmd( diff --git a/tests/ten_runtime/integration/go/two_extension_one_group_cmd_go/two_extension_one_group_cmd_go_app/ten_packages/extension/addon_a/extension.go b/tests/ten_runtime/integration/go/two_extension_one_group_cmd_go/two_extension_one_group_cmd_go_app/ten_packages/extension/addon_a/extension.go index 3044c46b23..59a6ef903b 100644 --- a/tests/ten_runtime/integration/go/two_extension_one_group_cmd_go/two_extension_one_group_cmd_go_app/ten_packages/extension/addon_a/extension.go +++ b/tests/ten_runtime/integration/go/two_extension_one_group_cmd_go/two_extension_one_group_cmd_go_app/ten_packages/extension/addon_a/extension.go @@ -92,7 +92,7 @@ func (p *aExtension) OnCmd( panic("data SetPropertyBytes failed") } - err = tenEnv.SendData(data) + err = tenEnv.SendData(data, nil) if err != nil { panic("aExtension SendData failed") }