Skip to content

Commit

Permalink
Move common implementation from ST2110Tx/ST2110Rx to ST2110
Browse files Browse the repository at this point in the history
  • Loading branch information
tszumski committed Dec 3, 2024
1 parent b154f4b commit 94c59be
Show file tree
Hide file tree
Showing 5 changed files with 175 additions and 243 deletions.
128 changes: 107 additions & 21 deletions media-proxy/include/mesh/st2110.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,39 +17,125 @@ namespace mesh::connection {
#define ST_APP_PAYLOAD_TYPE_ST20 (112)
#define ST_APP_PAYLOAD_TYPE_ST22 (114)

int mesh_video_format_to_st_format(int fmt, st_frame_fmt& st_fmt);
int mesh_audio_format_to_st_format(int fmt, st30_fmt& st_fmt);
int mesh_audio_sampling_to_st_sampling(int sampling, st30_sampling& st_sampling);
int mesh_audio_ptime_to_st_ptime(int ptime, st30_ptime& st_ptime);

void *get_frame_data_ptr(st_frame *src);
void *get_frame_data_ptr(st30_frame *src);

void get_mtl_dev_params(mtl_init_params& st_param, const std::string& dev_port,
mtl_log_level log_level, const char local_ip_addr[MESH_IP_ADDRESS_SIZE]);
mtl_handle get_mtl_device(const std::string& dev_port, mtl_log_level log_level,
const char local_ip_addr[MESH_IP_ADDRESS_SIZE], int& session_id);

/**
* ST2110
*
* Base abstract class of SPMTE ST2110-xx bridge. ST2110Rx/ST2110Tx
* inherit this class.
*/
class ST2110 : public Connection {
template <typename FRAME, typename HANDLE, typename OPS> class ST2110 : public Connection {
public:
static int mesh_video_format_to_st_format(int fmt, st_frame_fmt& st_fmt);
static int mesh_audio_format_to_st_format(int fmt, st30_fmt& st_fmt);
static int mesh_audio_sampling_to_st_sampling(int sampling, st30_sampling& st_sampling);
static int mesh_audio_ptime_to_st_ptime(int ptime, st30_ptime& st_ptime);
virtual ~ST2110() {
shutdown(_ctx);
if (ops.name)
free((void *)ops.name);
};

static void *get_frame_data_ptr(st_frame *src);
static void *get_frame_data_ptr(st30_frame *src);
protected:
mtl_handle mtl_device = nullptr;
HANDLE mtl_session = nullptr;
OPS ops = {0};
size_t transfer_size = 0;
std::atomic<bool> frame_available;
context::Context _ctx = context::WithCancel(context::Background());

static void get_mtl_dev_params(mtl_init_params& st_param, const std::string& dev_port,
mtl_log_level log_level,
const char local_ip_addr[MESH_IP_ADDRESS_SIZE]);
static mtl_handle get_mtl_device(const std::string& dev_port, mtl_log_level log_level,
const char local_ip_addr[MESH_IP_ADDRESS_SIZE],
int& session_id);
virtual FRAME *get_frame(HANDLE) = 0;
virtual int put_frame(HANDLE, FRAME *) = 0;
virtual HANDLE create_session(mtl_handle, OPS *) = 0;
virtual int close_session(HANDLE) = 0;

virtual ~ST2110() {};
static int frame_available_cb(void *ptr) {
auto _this = static_cast<ST2110 *>(ptr);
if (!_this) {
return -1;
}

protected:
static int frame_available_cb(void *ptr);
void init_frame_available();
void notify_frame_available();
void wait_frame_available();
_this->notify_frame_available();

mtl_handle mtl_device = nullptr;
std::atomic<bool> frame_available;
return 0;
}

void notify_frame_available() {
frame_available.store(true, std::memory_order_release);
frame_available.notify_one();
}

void wait_frame_available() {
frame_available.wait(false, std::memory_order_acquire);
frame_available = false;
}

virtual int configure_common(context::Context& ctx, const std::string& dev_port,
const MeshConfig_ST2110& cfg_st2110) {
int session_id = 0;
mtl_device = get_mtl_device(dev_port, MTL_LOG_LEVEL_CRIT, cfg_st2110.local_ip_addr, session_id);
if (!mtl_device) {
log::error("Failed to get MTL device");
return -1;
}

strlcpy(ops.port.port[MTL_PORT_P], dev_port.c_str(), MTL_PORT_MAX_LEN);
ops.port.num_port = 1;

char session_name[NAME_MAX] = "";
snprintf(session_name, NAME_MAX, "mcm_mtl_%d", session_id);
if (ops.name)
free((void *)ops.name);
ops.name = strdup(session_name);
ops.framebuff_cnt = 4;

ops.priv = this; // app handle register to lib
ops.notify_frame_available = frame_available_cb;

log::info("ST2110: configure")
("port", ops.port.port[MTL_PORT_P])
("num_port", (int)ops.port.num_port)
("udp_port", ops.port.udp_port[MTL_PORT_P])
("name", ops.name)
("framebuff_cnt", ops.framebuff_cnt);

return 0;
}

Result on_establish(context::Context& ctx) override {
_ctx = context::WithCancel(ctx);
frame_available = false;

mtl_session = create_session(mtl_device, &ops);
if (!mtl_session) {
log::error("Failed to create session");
set_state(ctx, State::closed);
return set_result(Result::error_general_failure);
}

set_state(ctx, State::active);
return set_result(Result::success);
}

Result on_shutdown(context::Context& ctx) override {
_ctx.cancel();
notify_frame_available();

if (mtl_session) {
close_session(mtl_session);
mtl_session = nullptr;
}
set_state(ctx, State::closed);
return set_result(Result::success);
};
};

} // namespace mesh::connection
Expand Down
115 changes: 34 additions & 81 deletions media-proxy/include/mesh/st2110rx.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,124 +11,77 @@ namespace mesh::connection {
* Base abstract class of ST2110Rx. ST2110_20Rx/ST2110_22Rx/ST2110_30Rx
* inherit this class.
*/
template <typename FRAME, typename HANDLE, typename OPS> class ST2110Rx : public ST2110 {
template <typename FRAME, typename HANDLE, typename OPS>
class ST2110Rx : public ST2110<FRAME, HANDLE, OPS> {
public:
ST2110Rx() { _kind = Kind::receiver; }

~ST2110Rx() {
shutdown(_ctx);
if (ops.name)
free((void *)ops.name);
}
ST2110Rx() { this->_kind = Kind::receiver; }

protected:
HANDLE mtl_session = nullptr;
OPS ops = {0};
size_t transfer_size = 0;
std::jthread frame_thread_handle;
context::Context _ctx = context::WithCancel(context::Background());

virtual FRAME *get_frame(HANDLE) = 0;
virtual int put_frame(HANDLE, FRAME *) = 0;
virtual HANDLE create_session(mtl_handle, OPS *) = 0;
virtual int close_session(HANDLE) = 0;

int configure_common(context::Context& ctx, const std::string& dev_port,
const MeshConfig_ST2110& cfg_st2110) {
int session_id = 0;
mtl_device = get_mtl_device(dev_port, MTL_LOG_LEVEL_CRIT, cfg_st2110.local_ip_addr, session_id);
if (!mtl_device) {
log::error("Failed to get MTL device");
return -1;
}

inet_pton(AF_INET, cfg_st2110.remote_ip_addr, ops.port.ip_addr[MTL_PORT_P]);
inet_pton(AF_INET, cfg_st2110.local_ip_addr, ops.port.mcast_sip_addr[MTL_PORT_P]);
ops.port.udp_port[MTL_PORT_P] = cfg_st2110.local_port;

strlcpy(ops.port.port[MTL_PORT_P], dev_port.c_str(), MTL_PORT_MAX_LEN);
ops.port.num_port = 1;
const MeshConfig_ST2110& cfg_st2110) override{
ST2110<FRAME, HANDLE, OPS>::configure_common(ctx, dev_port, cfg_st2110);

char session_name[NAME_MAX] = "";
snprintf(session_name, NAME_MAX, "mcm_mtl_rx_%d", session_id);
if (ops.name)
free((void *)ops.name);
ops.name = strdup(session_name);
ops.framebuff_cnt = 4;

ops.priv = this; // app handle register to lib
ops.notify_frame_available = frame_available_cb;
inet_pton(AF_INET, cfg_st2110.remote_ip_addr, this->ops.port.ip_addr[MTL_PORT_P]);
inet_pton(AF_INET, cfg_st2110.local_ip_addr, this->ops.port.mcast_sip_addr[MTL_PORT_P]);
this->ops.port.udp_port[MTL_PORT_P] = cfg_st2110.local_port;

log::info("ST2110Rx: configure")
("port", ops.port.port[MTL_PORT_P])
("ip_addr", std::to_string(ops.port.ip_addr[MTL_PORT_P][0]) + " " +
std::to_string(ops.port.ip_addr[MTL_PORT_P][1]) + " " +
std::to_string(ops.port.ip_addr[MTL_PORT_P][2]) + " " +
std::to_string(ops.port.ip_addr[MTL_PORT_P][3]))
("mcast_sip_addr", std::to_string(ops.port.mcast_sip_addr[MTL_PORT_P][0]) + " " +
std::to_string(ops.port.mcast_sip_addr[MTL_PORT_P][1]) + " " +
std::to_string(ops.port.mcast_sip_addr[MTL_PORT_P][2]) + " " +
std::to_string(ops.port.mcast_sip_addr[MTL_PORT_P][3]))
("num_port", ops.port.num_port)
("udp_port", ops.port.udp_port[MTL_PORT_P])
("name", ops.name)
("framebuff_cnt", ops.framebuff_cnt);

("ip_addr", std::to_string(this->ops.port.ip_addr[MTL_PORT_P][0]) + " " +
std::to_string(this->ops.port.ip_addr[MTL_PORT_P][1]) + " " +
std::to_string(this->ops.port.ip_addr[MTL_PORT_P][2]) + " " +
std::to_string(this->ops.port.ip_addr[MTL_PORT_P][3]))
("mcast_sip_addr", std::to_string(this->ops.port.mcast_sip_addr[MTL_PORT_P][0]) + " " +
std::to_string(this->ops.port.mcast_sip_addr[MTL_PORT_P][1]) + " " +
std::to_string(this->ops.port.mcast_sip_addr[MTL_PORT_P][2]) + " " +
std::to_string(this->ops.port.mcast_sip_addr[MTL_PORT_P][3]));
return 0;
}

Result on_establish(context::Context& ctx) override {
_ctx = context::WithCancel(ctx);
init_frame_available();

mtl_session = create_session(mtl_device, &ops);
if (!mtl_session) {
log::error("Failed to create session");
set_state(ctx, State::closed);
return set_result(Result::error_general_failure);
Result res = ST2110<FRAME, HANDLE, OPS>::on_establish(ctx);
if (res != Result::success) {
return res;
}

/* Start MTL session thread. */
try {
frame_thread_handle = std::jthread(&ST2110Rx::frame_thread, this);
} catch (const std::system_error& e) {
log::error("Failed to create thread");
set_state(ctx, State::closed);
return set_result(Result::error_out_of_memory);
this->set_state(ctx, State::closed);
return this->set_result(Result::error_out_of_memory);
}

set_state(ctx, State::active);
return set_result(Result::success);
this->set_state(ctx, State::active);
return this->set_result(Result::success);
}

Result on_shutdown(context::Context& ctx) override {
_ctx.cancel();
notify_frame_available();
Result res = ST2110<FRAME, HANDLE, OPS>::on_shutdown(ctx);
if (res != Result::success) {
return res;
}

frame_thread_handle.join();

if (mtl_session) {
close_session(mtl_session);
mtl_session = nullptr;
}
set_state(ctx, State::closed);
return set_result(Result::success);
this->set_state(ctx, State::closed);
return this->set_result(Result::success);
};

virtual void on_delete(context::Context& ctx) override {}

private:
void frame_thread() {
while (!_ctx.cancelled()) {
while (!this->_ctx.cancelled()) {
// Get full buffer from MTL
FRAME *frame_ptr = get_frame(mtl_session);
FRAME *frame_ptr = this->get_frame(this->mtl_session);
if (frame_ptr) {
// Forward buffer to emulated receiver
transmit(_ctx, get_frame_data_ptr(frame_ptr), transfer_size);
this->transmit(this->_ctx, get_frame_data_ptr(frame_ptr), this->transfer_size);
// Return used buffer to MTL
put_frame(mtl_session, frame_ptr);
this->put_frame(this->mtl_session, frame_ptr);
} else {
wait_frame_available();
this->wait_frame_available();
}
}
}
Expand Down
Loading

0 comments on commit 94c59be

Please sign in to comment.