Skip to content

Commit

Permalink
feat!: provide error_handler in send_data/A/Vframe (#341)
Browse files Browse the repository at this point in the history
* feat: provide error_handler in send_data/A/Vframe

* feat: add error param in callback of send_cmd in go binding

* fix: refine codes

---------

Co-authored-by: Hu Yueh-Wei <[email protected]>
  • Loading branch information
sunxilin and halajohn authored Nov 28, 2024
1 parent 4e40411 commit 88859c2
Show file tree
Hide file tree
Showing 189 changed files with 2,358 additions and 1,752 deletions.
116 changes: 99 additions & 17 deletions core/include/ten_runtime/binding/cpp/detail/ten_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
#include "ten_utils/lib/error.h"
#include "ten_utils/log/log.h"
#include "ten_utils/macro/check.h"
#include "ten_utils/macro/mark.h"
#include "ten_utils/value/value.h"
#include "ten_utils/value/value_json.h"

Expand All @@ -47,7 +46,9 @@ class ten_env_proxy_t;
class ten_env_internal_accessor_t;

using result_handler_func_t =
std::function<void(ten_env_t &, std::unique_ptr<cmd_result_t>)>;
std::function<void(ten_env_t &, std::unique_ptr<cmd_result_t>, error_t *)>;

using error_handler_func_t = std::function<void(ten_env_t &, error_t *)>;

class ten_env_t {
public:
Expand All @@ -72,7 +73,9 @@ class ten_env_t {
err);
}

bool send_data(std::unique_ptr<data_t> &&data, error_t *err = nullptr) {
bool send_data(std::unique_ptr<data_t> &&data,
error_handler_func_t &&error_handler = nullptr,
error_t *err = nullptr) {
TEN_ASSERT(c_ten_env && data, "Should not happen.");

if (!data) {
Expand All @@ -88,20 +91,39 @@ class ten_env_t {
return false;
}

auto rc = ten_env_send_data(
c_ten_env, data->get_underlying_msg(),
err != nullptr ? err->get_internal_representation() : nullptr);
auto rc = false;

if (error_handler == nullptr) {
rc = ten_env_send_data(
c_ten_env, data->get_underlying_msg(), nullptr, nullptr,
err != nullptr ? err->get_internal_representation() : nullptr);
} else {
auto *error_handler_ptr =
new error_handler_func_t(std::move(error_handler));

rc = ten_env_send_data(
c_ten_env, data->get_underlying_msg(), proxy_handle_error,
error_handler_ptr,
err != nullptr ? err->get_internal_representation() : nullptr);
if (!rc) {
delete error_handler_ptr;
}
}

if (rc) {
// Only when the data has been sent successfully, we should give back
// the ownership of the data message to the TEN runtime.
auto *cpp_data_ptr = data.release();
delete cpp_data_ptr;
} else {
TEN_LOGE("Failed to send_data: %s.", data->get_name());
}

return rc;
}

bool send_video_frame(std::unique_ptr<video_frame_t> &&frame,
error_handler_func_t &&error_handler = nullptr,
error_t *err = nullptr) {
TEN_ASSERT(c_ten_env, "Should not happen.");

Expand All @@ -110,21 +132,39 @@ class ten_env_t {
return false;
}

auto rc = ten_env_send_video_frame(
c_ten_env, frame->get_underlying_msg(),
err != nullptr ? err->get_internal_representation() : nullptr);
auto rc = false;

if (error_handler == nullptr) {
rc = ten_env_send_video_frame(
c_ten_env, frame->get_underlying_msg(), nullptr, nullptr,
err != nullptr ? err->get_internal_representation() : nullptr);
} else {
auto *error_handler_ptr =
new error_handler_func_t(std::move(error_handler));

rc = ten_env_send_video_frame(
c_ten_env, frame->get_underlying_msg(), proxy_handle_error,
error_handler_ptr,
err != nullptr ? err->get_internal_representation() : nullptr);
if (!rc) {
delete error_handler_ptr;
}
}

if (rc) {
// Only when the message has been sent successfully, we should give back
// the ownership of the message to the TEN runtime.
auto *cpp_frame_ptr = frame.release();
delete cpp_frame_ptr;
} else {
TEN_LOGE("Failed to send_video_frame %s.", frame->get_name());
}

return rc;
}

bool send_audio_frame(std::unique_ptr<audio_frame_t> &&frame,
error_handler_func_t &&error_handler = nullptr,
error_t *err = nullptr) {
TEN_ASSERT(c_ten_env, "Should not happen.");

Expand All @@ -133,15 +173,32 @@ class ten_env_t {
return false;
}

auto rc = ten_env_send_audio_frame(
c_ten_env, frame->get_underlying_msg(),
err != nullptr ? err->get_internal_representation() : nullptr);
auto rc = false;

if (error_handler == nullptr) {
rc = ten_env_send_audio_frame(
c_ten_env, frame->get_underlying_msg(), nullptr, nullptr,
err != nullptr ? err->get_internal_representation() : nullptr);
} else {
auto *error_handler_ptr =
new error_handler_func_t(std::move(error_handler));

rc = ten_env_send_audio_frame(
c_ten_env, frame->get_underlying_msg(), proxy_handle_error,
error_handler_ptr,
err != nullptr ? err->get_internal_representation() : nullptr);
if (!rc) {
delete error_handler_ptr;
}
}

if (rc) {
// Only when the message has been sent successfully, we should give back
// the ownership of the message to the TEN runtime.
auto *cpp_frame_ptr = frame.release();
delete cpp_frame_ptr;
} else {
TEN_LOGE("Failed to send_audio_frame %s.", frame->get_name());
}

return rc;
Expand Down Expand Up @@ -675,10 +732,9 @@ class ten_env_t {
return rc;
}

static void proxy_handle_result(TEN_UNUSED ten_extension_t *extension,
::ten_env_t *ten_env,
ten_shared_ptr_t *c_cmd_result,
void *cb_data) {
static void proxy_handle_result(::ten_env_t *ten_env,
ten_shared_ptr_t *c_cmd_result, void *cb_data,
ten_error_t *err) {
auto *result_handler = static_cast<result_handler_func_t *>(cb_data);
auto *cpp_ten_env =
static_cast<ten_env_t *>(ten_binding_handle_get_me_in_target_lang(
Expand All @@ -700,7 +756,12 @@ class ten_env_t {
// executing, processing should be based on this cached value.
bool is_completed = ten_cmd_result_is_completed(c_cmd_result, nullptr);

(*result_handler)(*cpp_ten_env, std::move(cmd_result));
if (err != nullptr) {
error_t cpp_err(err, false);
(*result_handler)(*cpp_ten_env, std::move(cmd_result), &cpp_err);
} else {
(*result_handler)(*cpp_ten_env, std::move(cmd_result), nullptr);
}

if (is_completed) {
// Only when is_final is true should the result handler be cleared.
Expand All @@ -709,6 +770,27 @@ class ten_env_t {
delete result_handler;
}
}

static void proxy_handle_error(::ten_env_t *ten_env,
ten_shared_ptr_t *c_cmd_result, void *cb_data,
ten_error_t *err) {
TEN_ASSERT(c_cmd_result == nullptr, "Should not happen.");

auto *error_handler = static_cast<error_handler_func_t *>(cb_data);
auto *cpp_ten_env =
static_cast<ten_env_t *>(ten_binding_handle_get_me_in_target_lang(
reinterpret_cast<ten_binding_handle_t *>(ten_env)));

if (err == nullptr) {
(*error_handler)(*cpp_ten_env, nullptr);
} else {
error_t cpp_err(err, false);
(*error_handler)(*cpp_ten_env, &cpp_err);
}

// The error handler should be cleared.
delete error_handler;
};
};

} // namespace ten
25 changes: 17 additions & 8 deletions core/include/ten_runtime/binding/cpp/detail/test/env_tester.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ namespace ten {
class ten_env_tester_t;
class extension_tester_t;

using ten_env_tester_send_cmd_result_handler_func_t =
std::function<void(ten_env_tester_t &, std::unique_ptr<cmd_result_t>)>;
using ten_env_tester_send_cmd_result_handler_func_t = std::function<void(
ten_env_tester_t &, std::unique_ptr<cmd_result_t>, error_t *)>;

class ten_env_tester_t {
public:
Expand Down Expand Up @@ -194,19 +194,28 @@ class ten_env_tester_t {
}

static void proxy_handle_result(::ten_env_tester_t *c_ten_env_tester,
ten_shared_ptr_t *c_cmd_result,
void *cb_data) {
ten_shared_ptr_t *c_cmd_result, void *cb_data,
ten_error_t *err) {
auto *result_handler =
static_cast<ten_env_tester_send_cmd_result_handler_func_t *>(cb_data);
auto *cpp_ten_env_tester = static_cast<ten_env_tester_t *>(
ten_binding_handle_get_me_in_target_lang(
reinterpret_cast<ten_binding_handle_t *>(c_ten_env_tester)));

auto cmd_result = cmd_result_t::create(
// Clone a C shared_ptr to be owned by the C++ instance.
ten_shared_ptr_clone(c_cmd_result));
std::unique_ptr<cmd_result_t> cmd_result = nullptr;

(*result_handler)(*cpp_ten_env_tester, std::move(cmd_result));
if (c_cmd_result != nullptr) {
cmd_result = cmd_result_t::create(
// Clone a C shared_ptr to be owned by the C++ instance.
ten_shared_ptr_clone(c_cmd_result));
}

if (err != nullptr) {
error_t cpp_err(err, false);
(*result_handler)(*cpp_ten_env_tester, std::move(cmd_result), &cpp_err);
} else {
(*result_handler)(*cpp_ten_env_tester, std::move(cmd_result), nullptr);
}

if (ten_cmd_result_is_final(c_cmd_result, nullptr)) {
// Only when is_final is true should the result handler be cleared.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
namespace ten {

using ten_client_proxy_send_cmd_result_handler_func_t =
std::function<void(std::unique_ptr<ten::cmd_result_t>)>;
std::function<void(std::unique_ptr<ten::cmd_result_t>, error_t *err)>;

class ten_client_proxy_event_handler_t {
public:
Expand Down Expand Up @@ -86,12 +86,13 @@ class ten_client_proxy_internal_impl_t : public ten::extension_tester_t {
}
}

static void proxy_on_cmd_response(
static void proxy_on_cmd_result(
std::unique_ptr<ten::cmd_result_t> cmd_result,
const ten::ten_client_proxy_send_cmd_result_handler_func_t
&result_handler) {
&result_handler,
error_t *err) {
if (result_handler) {
result_handler(std::move(cmd_result));
result_handler(std::move(cmd_result), err);
}
}

Expand All @@ -118,8 +119,9 @@ class ten_client_proxy_internal_impl_t : public ten::extension_tester_t {
ten_env_tester.send_cmd(
std::move(*cmd_shared),
[result_handler](ten::ten_env_tester_t & /*ten_env_tester*/,
std::unique_ptr<ten::cmd_result_t> cmd_result) {
proxy_on_cmd_response(std::move(cmd_result), result_handler);
std::unique_ptr<ten::cmd_result_t> cmd_result,
error_t *err) {
proxy_on_cmd_result(std::move(cmd_result), result_handler, err);
});
},
nullptr);
Expand Down
33 changes: 18 additions & 15 deletions core/include/ten_runtime/ten_env/internal/send.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,38 +11,41 @@
#include <stdbool.h>

#include "ten_utils/lib/error.h"
#include "ten_utils/lib/json.h"
#include "ten_utils/lib/smart_ptr.h"

typedef struct ten_env_t ten_env_t;
typedef struct ten_extension_t ten_extension_t;

typedef void (*ten_env_cmd_result_handler_func_t)(
ten_extension_t *extension, ten_env_t *ten_env,
ten_shared_ptr_t *cmd_result, void *cmd_result_handler_user_data);
typedef void (*ten_env_msg_result_handler_func_t)(
ten_env_t *ten_env, ten_shared_ptr_t *cmd_result,
void *cmd_result_handler_user_data, ten_error_t *err);

typedef bool (*ten_env_send_cmd_func_t)(
ten_env_t *self, ten_shared_ptr_t *cmd,
ten_env_cmd_result_handler_func_t result_handler,
ten_env_msg_result_handler_func_t result_handler,
void *result_handler_user_data, ten_error_t *err);

TEN_RUNTIME_API bool ten_env_send_cmd(
ten_env_t *self, ten_shared_ptr_t *cmd,
ten_env_cmd_result_handler_func_t result_handler,
ten_env_msg_result_handler_func_t result_handler,
void *result_handler_user_data, ten_error_t *err);

TEN_RUNTIME_API bool ten_env_send_cmd_ex(
ten_env_t *self, ten_shared_ptr_t *cmd,
ten_env_cmd_result_handler_func_t result_handler,
ten_env_msg_result_handler_func_t result_handler,
void *result_handler_user_data, ten_error_t *err);

TEN_RUNTIME_API bool ten_env_send_data(ten_env_t *self, ten_shared_ptr_t *data,
ten_error_t *err);
TEN_RUNTIME_API bool ten_env_send_data(
ten_env_t *self, ten_shared_ptr_t *data,
ten_env_msg_result_handler_func_t result_handler,
void *result_handler_user_data, ten_error_t *err);

TEN_RUNTIME_API bool ten_env_send_video_frame(ten_env_t *self,
ten_shared_ptr_t *frame,
ten_error_t *err);
TEN_RUNTIME_API bool ten_env_send_video_frame(
ten_env_t *self, ten_shared_ptr_t *frame,
ten_env_msg_result_handler_func_t result_handler,
void *result_handler_user_data, ten_error_t *err);

TEN_RUNTIME_API bool ten_env_send_audio_frame(ten_env_t *self,
ten_shared_ptr_t *frame,
ten_error_t *err);
TEN_RUNTIME_API bool ten_env_send_audio_frame(
ten_env_t *self, ten_shared_ptr_t *frame,
ten_env_msg_result_handler_func_t result_handler,
void *result_handler_user_data, ten_error_t *err);
3 changes: 2 additions & 1 deletion core/include/ten_runtime/test/env_tester.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ TEN_RUNTIME_API bool ten_env_tester_on_start_done(ten_env_tester_t *self,
ten_error_t *err);

typedef void (*ten_env_tester_cmd_result_handler_func_t)(
ten_env_tester_t *self, ten_shared_ptr_t *cmd_result, void *user_data);
ten_env_tester_t *self, ten_shared_ptr_t *cmd_result, void *user_data,
ten_error_t *error);

TEN_RUNTIME_API bool ten_env_tester_send_cmd(
ten_env_tester_t *self, ten_shared_ptr_t *cmd,
Expand Down
18 changes: 9 additions & 9 deletions core/include_internal/ten_runtime/binding/go/internal/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,15 @@ TEN_RUNTIME_PRIVATE_API void ten_go_bridge_destroy_c_part(
TEN_RUNTIME_PRIVATE_API void ten_go_bridge_destroy_go_part(
ten_go_bridge_t *self);

TEN_RUNTIME_PRIVATE_API void ten_go_status_init_with_errno(
ten_go_status_t *self, ten_errno_t errno);
TEN_RUNTIME_PRIVATE_API void ten_go_error_init_with_errno(ten_go_error_t *self,
ten_errno_t errno);

TEN_RUNTIME_PRIVATE_API void ten_go_status_from_error(ten_go_status_t *self,
ten_error_t *err);
TEN_RUNTIME_PRIVATE_API void ten_go_error_from_error(ten_go_error_t *self,
ten_error_t *err);

TEN_RUNTIME_PRIVATE_API void ten_go_status_set_errno(ten_go_status_t *self,
ten_errno_t errno);
TEN_RUNTIME_PRIVATE_API void ten_go_error_set_errno(ten_go_error_t *self,
ten_errno_t errno);

TEN_RUNTIME_PRIVATE_API void ten_go_status_set(ten_go_status_t *self,
ten_errno_t errno,
const char *msg);
TEN_RUNTIME_PRIVATE_API void ten_go_error_set(ten_go_error_t *self,
ten_errno_t errno,
const char *msg);
4 changes: 2 additions & 2 deletions core/include_internal/ten_runtime/binding/go/internal/json.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@
#include "ten_utils/lib/json.h"

TEN_RUNTIME_PRIVATE_API ten_json_t *ten_go_json_loads(const void *json_bytes,
int json_bytes_len,
ten_go_status_t *status);
int json_bytes_len,
ten_go_error_t *status);
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,4 @@ TEN_RUNTIME_PRIVATE_API ten_go_handle_t
ten_go_ten_env_go_handle(ten_go_ten_env_t *self);

extern void tenGoCAsyncApiCallback(ten_go_handle_t callback,
ten_go_status_t status);
ten_go_error_t cgo_error);
Loading

0 comments on commit 88859c2

Please sign in to comment.