Skip to content

Commit

Permalink
feat!: add error handler to the send_xxx methods in Go binding (#344)
Browse files Browse the repository at this point in the history
Co-authored-by: Hu Yueh-Wei <[email protected]>
  • Loading branch information
sunxilin and halajohn authored Nov 28, 2024
1 parent e219331 commit 6441b08
Show file tree
Hide file tree
Showing 12 changed files with 395 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ extern void tenGoOnCmdResult(ten_go_handle_t ten_env_bridge,
ten_go_handle_t result_handler,
ten_go_error_t cgo_error);

extern void tenGoOnError(ten_go_handle_t ten_env_bridge,
ten_go_handle_t error_handler,
ten_go_error_t cgo_error);

TEN_RUNTIME_PRIVATE_API ten_go_callback_info_t *ten_go_callback_info_create(
ten_go_handle_t handler_id);

Expand Down
85 changes: 55 additions & 30 deletions core/src/ten_runtime/binding/go/interface/ten/ten_env.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ package ten
import "C"

import (
"fmt"
"runtime"
"strings"
"unsafe"
Expand All @@ -21,14 +20,18 @@ type (
// ResultHandler is a function type that represents a handler for the result
// of a command.
ResultHandler func(TenEnv, CmdResult, error)

// ErrorHandler is a function type that represents a handler for errors of a
// non-command type message.
ErrorHandler func(TenEnv, error)
)

// TenEnv represents the interface for the TEN (Run Time Environment) component.
type TenEnv interface {
SendCmd(cmd Cmd, handler ResultHandler) error
SendData(data Data) error
SendVideoFrame(videoFrame VideoFrame) error
SendAudioFrame(audioFrame AudioFrame) error
SendData(data Data, handler ErrorHandler) error
SendVideoFrame(videoFrame VideoFrame, handler ErrorHandler) error
SendAudioFrame(audioFrame AudioFrame, handler ErrorHandler) error

ReturnResult(result CmdResult, cmd Cmd) error
ReturnResultDirectly(result CmdResult) error
Expand Down Expand Up @@ -193,7 +196,7 @@ func tenGoCAsyncApiCallback(
}()
}

func (p *tenEnv) SendData(data Data) error {
func (p *tenEnv) SendData(data Data, handler ErrorHandler) error {
if data == nil {
return newTenError(
ErrnoInvalidArgument,
Expand All @@ -203,33 +206,33 @@ func (p *tenEnv) SendData(data Data) error {

defer data.keepAlive()

// Create a channel to wait for the async operation in C to complete.
done := make(chan error, 1)
callbackHandle := newGoHandle(done)
cb := goHandleNil
if handler != nil {
cb = newGoHandle(handler)
}

err := withCGO(func() error {
apiStatus := C.ten_go_ten_env_send_data(
p.cPtr,
data.getCPtr(),
C.uintptr_t(callbackHandle),
cHandle(cb),
)
err := withCGoError(&apiStatus)
return err
})

if err != nil {
// Clean up the handle if there was an error.
loadAndDeleteGoHandle(callbackHandle)
return err
loadAndDeleteGoHandle(cb)
}

// Wait for the async operation to complete.
err = <-done

return err
}

func (p *tenEnv) SendVideoFrame(videoFrame VideoFrame) error {
func (p *tenEnv) SendVideoFrame(
videoFrame VideoFrame,
handler ErrorHandler,
) error {
if videoFrame == nil {
return newTenError(
ErrnoInvalidArgument,
Expand All @@ -239,39 +242,61 @@ func (p *tenEnv) SendVideoFrame(videoFrame VideoFrame) error {

defer videoFrame.keepAlive()

return withCGO(func() error {
cb := goHandleNil
if handler != nil {
cb = newGoHandle(handler)
}

err := withCGO(func() error {
apiStatus := C.ten_go_ten_env_send_video_frame(
p.cPtr,
videoFrame.getCPtr(),
cHandle(cb),
)
return withCGoError(&apiStatus)
})

if err != nil {
// Clean up the handle if there was an error.
loadAndDeleteGoHandle(cb)
}

return err
}

func (p *tenEnv) SendAudioFrame(audioFrame AudioFrame) error {
func (p *tenEnv) SendAudioFrame(
audioFrame AudioFrame,
handler ErrorHandler,
) error {
if audioFrame == nil {
return newTenError(
ErrnoInvalidArgument,
"audioFrame is required.",
)
}

res, ok := p.process(func() any {
defer audioFrame.keepAlive()
defer audioFrame.keepAlive()

if res := C.ten_go_ten_env_send_audio_frame(p.cPtr, audioFrame.getCPtr()); !res {
return newTenError(
ErrnoGeneric,
fmt.Sprintf("Failed to SendAudioFrame (%v)", audioFrame),
)
}
return nil
}).(error)
if ok {
return res
cb := goHandleNil
if handler != nil {
cb = newGoHandle(handler)
}

return nil
err := withCGO(func() error {
apiStatus := C.ten_go_ten_env_send_audio_frame(
p.cPtr,
audioFrame.getCPtr(),
cHandle(cb),
)
return withCGoError(&apiStatus)
})

if err != nil {
// Clean up the handle if there was an error.
loadAndDeleteGoHandle(cb)
}

return err
}

func (p *tenEnv) OnConfigureDone() error {
Expand Down
10 changes: 6 additions & 4 deletions core/src/ten_runtime/binding/go/interface/ten/ten_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,15 @@ ten_go_error_t ten_go_ten_env_send_cmd(uintptr_t bridge_addr,

ten_go_error_t ten_go_ten_env_send_data(uintptr_t bridge_addr,
uintptr_t data_bridge_addr,
uintptr_t callback_handle);
ten_go_handle_t handler_id);

ten_go_error_t ten_go_ten_env_send_video_frame(
uintptr_t bridge_addr, uintptr_t video_frame_bridge_addr);
uintptr_t bridge_addr, uintptr_t video_frame_bridge_addr,
ten_go_handle_t handler_id);

bool ten_go_ten_env_send_audio_frame(uintptr_t bridge_addr,
uintptr_t audio_frame_bridge_addr);
ten_go_error_t ten_go_ten_env_send_audio_frame(
uintptr_t bridge_addr, uintptr_t audio_frame_bridge_addr,
ten_go_handle_t handler_id);

bool ten_go_ten_env_is_cmd_connected(uintptr_t bridge_addr,
const char *cmd_name);
Expand Down
27 changes: 27 additions & 0 deletions core/src/ten_runtime/binding/go/interface/ten/ten_env_export.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,33 @@ func tenGoOnCmdResult(
cb.(ResultHandler)(tenEnvObj, cr, err)
}

//export tenGoOnError
func tenGoOnError(
tenEnvObjID C.uintptr_t,
errorHandler C.uintptr_t,
cgoError C.ten_go_error_t,
) {
tenEnvObj, ok := handle(tenEnvObjID).get().(TenEnv)
if !ok {
panic(
fmt.Sprintf(
"Failed to get ten env from handle map, id: %d.",
uintptr(tenEnvObjID),
),
)
}

cb := loadAndDeleteGoHandle(goHandle(errorHandler))
if cb == nil || cb == goHandleNil {
// Should not happen.
panic("The error handler is not found from handle map.")
}

err := withCGoError(&cgoError)

cb.(ErrorHandler)(tenEnvObj, err)
}

//export tenGoSetPropertyCallback
func tenGoSetPropertyCallback(
tenEnvObjID C.uintptr_t,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
//
#include <stdlib.h>

#include "include_internal/ten_runtime/binding/go/internal/common.h"
#include "include_internal/ten_runtime/binding/go/msg/msg.h"
#include "include_internal/ten_runtime/binding/go/ten_env/ten_env.h"
#include "include_internal/ten_runtime/binding/go/ten_env/ten_env_internal.h"
Expand All @@ -19,17 +20,20 @@

typedef struct ten_env_notify_send_audio_frame_info_t {
ten_shared_ptr_t *c_audio_frame;
ten_go_handle_t callback_handle;
} ten_env_notify_send_audio_frame_info_t;

static ten_env_notify_send_audio_frame_info_t *
ten_env_notify_send_audio_frame_info_create(ten_shared_ptr_t *c_audio_frame) {
ten_env_notify_send_audio_frame_info_create(ten_shared_ptr_t *c_audio_frame,
ten_go_handle_t callback_handle) {
TEN_ASSERT(c_audio_frame, "Invalid argument.");

ten_env_notify_send_audio_frame_info_t *info =
TEN_MALLOC(sizeof(ten_env_notify_send_audio_frame_info_t));
TEN_ASSERT(info, "Failed to allocate memory.");

info->c_audio_frame = c_audio_frame;
info->callback_handle = callback_handle;

return info;
}
Expand All @@ -43,30 +47,92 @@ static void ten_env_notify_send_audio_frame_info_destroy(
info->c_audio_frame = NULL;
}

info->callback_handle = 0;

TEN_FREE(info);
}

static void proxy_handle_audio_frame_error(
ten_env_t *ten_env, TEN_UNUSED ten_shared_ptr_t *cmd_result,
void *callback_info_, ten_error_t *err) {
ten_go_callback_info_t *callback_info = callback_info_;
TEN_ASSERT(callback_info, "Should not happen.");

ten_go_error_t cgo_error;
ten_go_error_init_with_errno(&cgo_error, TEN_ERRNO_OK);

if (err) {
ten_go_error_from_error(&cgo_error, err);
}

TEN_ASSERT(callback_info->callback_id != TEN_GO_NO_RESPONSE_HANDLER,
"Should not happen.");

ten_go_ten_env_t *ten_env_bridge = ten_go_ten_env_wrap(ten_env);

tenGoOnError(ten_env_bridge->bridge.go_instance, callback_info->callback_id,
cgo_error);

ten_go_callback_info_destroy(callback_info);
}

static void ten_env_proxy_notify_send_audio_frame(ten_env_t *ten_env,
void *user_audio_frame) {
TEN_ASSERT(user_audio_frame, "Invalid argument.");
void *user_data) {
TEN_ASSERT(user_data, "Invalid argument.");
TEN_ASSERT(ten_env && ten_env_check_integrity(ten_env, true),
"Should not happen.");

ten_env_notify_send_audio_frame_info_t *notify_info = user_audio_frame;
ten_env_notify_send_audio_frame_info_t *notify_info = user_data;
TEN_ASSERT(notify_info, "Should not happen.");

ten_go_error_t cgo_error;
ten_go_error_init_with_errno(&cgo_error, TEN_ERRNO_OK);

ten_error_t err;
ten_error_init(&err);

TEN_UNUSED bool res = ten_env_send_audio_frame(
ten_env, notify_info->c_audio_frame, NULL, NULL, NULL);
bool res = false;

if (notify_info->callback_handle == TEN_GO_NO_RESPONSE_HANDLER) {
res = ten_env_send_audio_frame(ten_env, notify_info->c_audio_frame, NULL,
NULL, &err);
if (!res) {
// The error cannot be handled by the developer, all we can do is to log
// the error.
TEN_LOGE(
"Failed to send audio frame, but no error handler is provided."
"errno: %d, errmsg: %s",
ten_error_errno(&err), ten_error_errmsg(&err));
}
} else {
ten_go_callback_info_t *info =
ten_go_callback_info_create(notify_info->callback_handle);

res = ten_env_send_audio_frame(ten_env, notify_info->c_audio_frame,
proxy_handle_audio_frame_error, notify_info,
&err);

if (!res) {
// Prepare error information to pass to Go.
ten_go_error_from_error(&cgo_error, &err);

ten_go_ten_env_t *ten_env_bridge = ten_go_ten_env_wrap(ten_env);

tenGoOnError(ten_env_bridge->bridge.go_instance,
notify_info->callback_handle, cgo_error);

ten_go_callback_info_destroy(info);
}
}

ten_error_deinit(&err);

ten_env_notify_send_audio_frame_info_destroy(notify_info);
}

bool ten_go_ten_env_send_audio_frame(uintptr_t bridge_addr,
uintptr_t audio_frame_bridge_addr) {
ten_go_error_t ten_go_ten_env_send_audio_frame(
uintptr_t bridge_addr, uintptr_t audio_frame_bridge_addr,
ten_go_handle_t handler_id) {
ten_go_ten_env_t *self = ten_go_ten_env_reinterpret(bridge_addr);
TEN_ASSERT(self && ten_go_ten_env_check_integrity(self),
"Should not happen.");
Expand All @@ -75,25 +141,30 @@ bool ten_go_ten_env_send_audio_frame(uintptr_t bridge_addr,
TEN_ASSERT(audio_frame && ten_go_msg_check_integrity(audio_frame),
"Should not happen.");

bool result = true;
TEN_GO_TEN_ENV_IS_ALIVE_REGION_BEGIN(self, result = false;);
ten_go_error_t cgo_error;
ten_go_error_init_with_errno(&cgo_error, TEN_ERRNO_OK);

TEN_GO_TEN_ENV_IS_ALIVE_REGION_BEGIN(
self, { ten_go_error_set_errno(&cgo_error, TEN_ERRNO_TEN_IS_CLOSED); });

ten_error_t err;
ten_error_init(&err);

ten_env_notify_send_audio_frame_info_t *notify_info =
ten_env_notify_send_audio_frame_info_create(
ten_go_msg_move_c_msg(audio_frame));
ten_go_msg_move_c_msg(audio_frame),
handler_id <= 0 ? TEN_GO_NO_RESPONSE_HANDLER : handler_id);

if (!ten_env_proxy_notify(self->c_ten_env_proxy,
ten_env_proxy_notify_send_audio_frame, notify_info,
false, &err)) {
ten_env_notify_send_audio_frame_info_destroy(notify_info);
result = false;
ten_go_error_from_error(&cgo_error, &err);
}

ten_error_deinit(&err);
TEN_GO_TEN_ENV_IS_ALIVE_REGION_END(self);

ten_is_close:
return result;
return cgo_error;
}
Loading

0 comments on commit 6441b08

Please sign in to comment.