From 78bb4a3738c1f4bbd369986547af5d72b09f8c62 Mon Sep 17 00:00:00 2001 From: Konstantin Ilichev Date: Mon, 9 Dec 2024 01:02:41 +0000 Subject: [PATCH] Add support for Multipoint Groups and telemetry * Add gRPC based API to interface with MCM Agent. * Add Manager to control the life cycle of Multipoint Groups. * Add basic support for Bridges. * Add Manager to control the life cycle of Bridges. * Add metrics to the connection base class to support telemetry. * Add metric collector engine to support telemetry. * Add printing call stack when the app crashes. * Minor fixes to the build system. Signed-off-by: Konstantin Ilichev --- build.sh | 2 +- ffmpeg-plugin/build-ffmpeg.sh | 2 + media-proxy/CMakeLists.txt | 14 +- media-proxy/include/api_server_grpc.h | 73 ---- media-proxy/include/mesh/client_api.h | 2 +- media-proxy/include/mesh/conn.h | 29 +- media-proxy/include/mesh/conn_registry.h | 5 +- media-proxy/include/mesh/manager_bridges.h | 39 ++ media-proxy/include/mesh/manager_local.h | 14 +- media-proxy/include/mesh/manager_multipoint.h | 77 ++++ media-proxy/include/mesh/metrics.h | 79 ++++ media-proxy/include/mesh/metrics_collector.h | 46 +++ media-proxy/include/mesh/mocked_bridge.h | 36 ++ media-proxy/include/mesh/multipoint.h | 47 +++ media-proxy/include/mesh/proxy_api.h | 54 +++ media-proxy/src/api_server_grpc.cc | 83 ---- media-proxy/src/media_proxy.cc | 74 +++- media-proxy/src/mesh/client_api.cc | 26 +- media-proxy/src/mesh/conn.cc | 80 ++-- media-proxy/src/mesh/conn_local.cc | 36 +- media-proxy/src/mesh/conn_local_tx.cc | 7 +- media-proxy/src/mesh/manager_bridges.cc | 115 ++++++ media-proxy/src/mesh/manager_local.cc | 123 ++++-- media-proxy/src/mesh/manager_multipoint.cc | 356 ++++++++++++++++++ media-proxy/src/mesh/metrics.cc | 33 ++ media-proxy/src/mesh/metrics_collector.cc | 89 +++++ media-proxy/src/mesh/multipoint.cc | 158 ++++++++ media-proxy/src/mesh/proxy_api.cc | 331 ++++++++++++++++ protos/controller.proto | 257 ------------- protos/mediaproxy.proto | 101 
++++- protos/sdk.proto | 2 +- sdk/src/mesh_client_api.cc | 20 +- 32 files changed, 1852 insertions(+), 558 deletions(-) delete mode 100644 media-proxy/include/api_server_grpc.h create mode 100644 media-proxy/include/mesh/manager_bridges.h create mode 100644 media-proxy/include/mesh/manager_multipoint.h create mode 100644 media-proxy/include/mesh/metrics.h create mode 100644 media-proxy/include/mesh/metrics_collector.h create mode 100644 media-proxy/include/mesh/mocked_bridge.h create mode 100644 media-proxy/include/mesh/multipoint.h create mode 100644 media-proxy/include/mesh/proxy_api.h delete mode 100644 media-proxy/src/api_server_grpc.cc create mode 100644 media-proxy/src/mesh/manager_bridges.cc create mode 100644 media-proxy/src/mesh/manager_multipoint.cc create mode 100644 media-proxy/src/mesh/metrics.cc create mode 100644 media-proxy/src/mesh/metrics_collector.cc create mode 100644 media-proxy/src/mesh/multipoint.cc create mode 100644 media-proxy/src/mesh/proxy_api.cc delete mode 100644 protos/controller.proto diff --git a/build.sh b/build.sh index e3f299ed..c6cae466 100755 --- a/build.sh +++ b/build.sh @@ -38,6 +38,6 @@ as_root ldconfig export LD_LIBRARY_PATH="${PREFIX_DIR}/usr/local/lib:/usr/local/lib64" "${MCM_BUILD_DIR}/bin/sdk_unit_tests" "${MCM_BUILD_DIR}/bin/media_proxy_unit_tests" -ln -s "${MCM_BUILD_DIR}" "${SCRIPT_DIR}/build" +ln -sf "${MCM_BUILD_DIR}" "${SCRIPT_DIR}/build" log_info "Build Succeeded" diff --git a/ffmpeg-plugin/build-ffmpeg.sh b/ffmpeg-plugin/build-ffmpeg.sh index 6beff4dd..c9965639 100755 --- a/ffmpeg-plugin/build-ffmpeg.sh +++ b/ffmpeg-plugin/build-ffmpeg.sh @@ -15,6 +15,8 @@ BUILD_DIR="${BUILD_DIR:-${REPOSITORY_DIR}/_build}" lib_setup_ffmpeg_dir_and_version "${FFMPEG_VER:-7.0}" export FFMPEG_DIR="${BUILD_DIR}/${FFMPEG_SUB_DIR}" +cp -f "${SCRIPT_DIR}/mcm_"* "${FFMPEG_DIR}/libavdevice/" + make -C "${FFMPEG_DIR}" -j "$(nproc)" as_root make -C "${FFMPEG_DIR}" install diff --git a/media-proxy/CMakeLists.txt b/media-proxy/CMakeLists.txt 
index 6176cef4..86f75eed 100644 --- a/media-proxy/CMakeLists.txt +++ b/media-proxy/CMakeLists.txt @@ -10,6 +10,18 @@ project(MediaProxy VERSION 0.0.1 LANGUAGES CXX C) set(CMAKE_CXX_STANDARD 20) set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -pthread") +# Set the build type to Debug by default +if(NOT CMAKE_BUILD_TYPE) + set(CMAKE_BUILD_TYPE Debug CACHE STRING "Choose the type of build." FORCE) +endif() + +# Enable debug symbols +set(CMAKE_C_FLAGS_DEBUG "${CMAKE_C_FLAGS_DEBUG} -g") +set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -g") + +# Ensure that the binary is not stripped +set(CMAKE_EXE_LINKER_FLAGS_DEBUG "${CMAKE_EXE_LINKER_FLAGS_DEBUG} -Wl,--no-strip") + # Enabled by default option(ENABLE_ZERO_COPY "Enable zero-copy mode to avoid memory copying between Media Proxy and MTL" ON) IF(ENABLE_ZERO_COPY) @@ -36,7 +48,6 @@ endif() # Define the .proto files set(PROTO_FILES ${PROTO_DIR}/mediaproxy.proto - ${PROTO_DIR}/controller.proto ${PROTO_DIR}/sdk.proto ) @@ -111,6 +122,7 @@ target_link_libraries(media_proxy_lib PUBLIC m ${MTL_LIB} ${MEMIF_LIB} ${LIBFABR add_executable(media_proxy ${proxy_srcs} src/media_proxy.cc) target_link_libraries(media_proxy PRIVATE media_proxy_lib) +target_link_libraries(media_proxy PRIVATE dl) install(TARGETS media_proxy DESTINATION ${CMAKE_INSTALL_PATH} COMPONENT media_proxy) install(FILES imtl.json DESTINATION ${CMAKE_CONFIG_PATH} COMPONENT config) diff --git a/media-proxy/include/api_server_grpc.h b/media-proxy/include/api_server_grpc.h deleted file mode 100644 index ac42e641..00000000 --- a/media-proxy/include/api_server_grpc.h +++ /dev/null @@ -1,73 +0,0 @@ -/* - * SPDX-FileCopyrightText: Copyright (c) 2024 Intel Corporation - * - * SPDX-License-Identifier: BSD-3-Clause - */ - -#include "controller.grpc.pb.h" -#include "proxy_context.h" -#include -#include -#include -#include -#include -#include - -#pragma once - -using grpc::Server; -using grpc::ServerBuilder; -using grpc::ServerContext; -using grpc::Status; - -using controller::Configure; 
-using controller::ControlReply; -using controller::RxControlRequest; -using controller::St20pRxOps; -using controller::StInit; -using controller::StopControlRequest; -using controller::StRxPort; -using controller::TxControlRequest; - -using controller::MsmDataPlane; -using controller::StreamData; -using controller::StreamResult; - -using controller::Health; -using controller::HealthCheckRequest; -using controller::HealthCheckResponse; -using controller::HealthCheckResponse_ServingStatus; - -class ConfigureServiceImpl final : public Configure::Service { -public: - ConfigureServiceImpl(ProxyContext* ctx); - Status TxStart(ServerContext* context, const TxControlRequest* request, ControlReply* reply) override; - Status RxStart(ServerContext* context, const RxControlRequest* request, ControlReply* reply) override; - Status TxStop(ServerContext* context, const StopControlRequest* request, ControlReply* reply) override; - Status RxStop(ServerContext* context, const StopControlRequest* request, ControlReply* reply) override; - Status Stop(ServerContext* context, const StopControlRequest* request, ControlReply* reply) override; - -private: - ProxyContext* m_ctx; -}; - -class MsmDataPlaneServiceImpl final : public MsmDataPlane::Service { -public: - MsmDataPlaneServiceImpl(ProxyContext* ctx); - Status stream_add_del(ServerContext* context, const StreamData* request, StreamResult* reply) override; - -private: - ProxyContext* m_ctx; -}; - -class HealthServiceImpl final : public Health::Service { -public: - HealthServiceImpl(ProxyContext* ctx); - Status Check(ServerContext* context, const HealthCheckRequest* request, HealthCheckResponse* reply) override; - Status Watch(ServerContext* context, const HealthCheckRequest* request, HealthCheckResponse* reply) override; - -private: - ProxyContext* m_ctx; -}; - -void RunRPCServer(ProxyContext* ctx); diff --git a/media-proxy/include/mesh/client_api.h b/media-proxy/include/mesh/client_api.h index eb380db8..309295ff 100644 --- 
a/media-proxy/include/mesh/client_api.h +++ b/media-proxy/include/mesh/client_api.h @@ -11,7 +11,7 @@ namespace mesh { -void RunClientAPIServer(context::Context& ctx); +void RunSDKAPIServer(context::Context& ctx); } // namespace mesh diff --git a/media-proxy/include/mesh/conn.h b/media-proxy/include/mesh/conn.h index 29f86345..0bc793a9 100644 --- a/media-proxy/include/mesh/conn.h +++ b/media-proxy/include/mesh/conn.h @@ -10,6 +10,7 @@ #include #include #include "concurrency.h" +#include "metrics.h" namespace mesh::connection { @@ -67,6 +68,7 @@ enum class Result { error_out_of_memory, error_general_failure, error_context_cancelled, + // TODO: more error codes to be added... }; @@ -76,7 +78,7 @@ enum class Result { * Base abstract class of connection. All connection implementations must * inherit this class. */ -class Connection { +class Connection : public telemetry::MetricsProvider { public: Connection(); @@ -86,7 +88,8 @@ class Connection { State state(); Status status(); - Result set_link(context::Context& ctx, Connection *new_link); + virtual Result set_link(context::Context& ctx, Connection *new_link, + Connection *requester = nullptr); Connection * link(); Result establish(context::Context& ctx); @@ -117,22 +120,30 @@ class Connection { uint32_t& sent); virtual void on_delete(context::Context& ctx) {} - Kind _kind; // must be properly set in the derived classes ctor - Connection *_link; + Kind _kind = Kind::undefined; // must be properly set in the derived class ctor + Connection *_link = nullptr; + std::atomic setting_link = false; // held in set_link() + std::atomic transmitting = false; // held in on_receive() struct { std::atomic inbound_bytes; std::atomic outbound_bytes; - std::atomic transactions_successful; + std::atomic transactions_succeeded; std::atomic transactions_failed; std::atomic errors; + + int64_t prev_timestamp_ms; + uint64_t prev_inbound_bytes; + uint64_t prev_outbound_bytes; + uint32_t prev_errors; + uint32_t prev_transactions_succeeded; 
} metrics; private: - std::atomic _state; - std::atomic _status; - std::atomic setting_link; // held in set_link() - std::atomic transmitting; // held in on_receive() + virtual void collect(telemetry::Metric& metric, const int64_t& timestamp_ms); + + std::atomic _state = State::not_configured; + std::atomic _status = Status::initial; }; const char * kind2str(Kind kind, bool brief = false); diff --git a/media-proxy/include/mesh/conn_registry.h b/media-proxy/include/mesh/conn_registry.h index 63f3fcc7..7628d8e5 100644 --- a/media-proxy/include/mesh/conn_registry.h +++ b/media-proxy/include/mesh/conn_registry.h @@ -21,7 +21,7 @@ namespace mesh::connection { */ class Registry { public: - int add(std::string& id, Connection *conn) { + int add(const std::string& id, Connection *conn) { std::unique_lock lk(mx); if (conns.contains(id)) return -1; @@ -62,6 +62,9 @@ class Registry { // TODO: Check if the mutex is still needed. The local connection manager // has it own mutex, which makes this one redundant. + // UPD: The bridges manager is in a single thread flow of gRPC stream + // handler of StartCommandQueueRequest, so the mutex here is redundant. + // Consider removing this mutex. 
std::shared_mutex mx; }; diff --git a/media-proxy/include/mesh/manager_bridges.h b/media-proxy/include/mesh/manager_bridges.h new file mode 100644 index 00000000..f54e3e74 --- /dev/null +++ b/media-proxy/include/mesh/manager_bridges.h @@ -0,0 +1,39 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2024 Intel Corporation + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#ifndef MANAGER_BRIDGES_H +#define MANAGER_BRIDGES_H + +#include +#include "concurrency.h" +#include "conn_registry.h" + +namespace mesh::connection { + +class BridgesManager { +public: + int create_bridge(context::Context& ctx, Connection*& bridge, + const std::string& id, Kind kind); + + int delete_bridge(context::Context& ctx, const std::string& id); + + Connection * get_bridge(context::Context& ctx, const std::string& id); + + void shutdown(context::Context& ctx); + + void lock(); + void unlock(); + +private: + Registry registry; // This registry uses Agent assigned ids + std::shared_mutex mx; +}; + +extern BridgesManager bridges_manager; + +} // namespace mesh::connection + +#endif // MANAGER_BRIDGES_H diff --git a/media-proxy/include/mesh/manager_local.h b/media-proxy/include/mesh/manager_local.h index dc902634..7cb88264 100644 --- a/media-proxy/include/mesh/manager_local.h +++ b/media-proxy/include/mesh/manager_local.h @@ -16,15 +16,21 @@ namespace mesh::connection { class LocalManager { public: - int create_connection(context::Context& ctx, std::string& id, - mcm_conn_param *param, memif_conn_param *memif_param); + int create_connection_sdk(context::Context& ctx, std::string& id, + mcm_conn_param *param, memif_conn_param *memif_param); - int delete_connection(context::Context& ctx, const std::string& id); + int delete_connection_sdk(context::Context& ctx, const std::string& id); + + Connection * get_connection(context::Context& ctx, const std::string& id); void shutdown(context::Context& ctx); + void lock(); + void unlock(); + private: - Registry registry; + Registry registry_sdk; // This
registry uses SDK ids + Registry registry; // This registry uses Agent assigned ids std::shared_mutex mx; }; diff --git a/media-proxy/include/mesh/manager_multipoint.h b/media-proxy/include/mesh/manager_multipoint.h new file mode 100644 index 00000000..d15b4c03 --- /dev/null +++ b/media-proxy/include/mesh/manager_multipoint.h @@ -0,0 +1,77 @@ +#ifndef MANAGER_MULTIPOINT_H +#define MANAGER_MULTIPOINT_H + +#include +#include +#include +#include +#include +#include +#include +#include "multipoint.h" + +namespace mesh::multipoint { + +class GroupChangeConfig { +public: + std::string group_id; + std::vector added_conn_ids; + std::vector deleted_conn_ids; + std::vector added_bridge_ids; + std::vector deleted_bridge_ids; +}; + +class GroupConfig { +public: + std::vector conn_ids; + std::vector bridge_ids; +}; + +class Config { +public: + std::unordered_map groups; +}; + +class GroupManager { +public: + Result apply_config(context::Context& ctx, const Config& new_cfg); + Result reconcile_config(context::Context& ctx, + std::vector added_groups, + std::vector deleted_groups, + std::vector updated_groups); +private: + Result associate(context::Context& ctx, Group *group, Connection *conn); + + int add_group(const std::string& id, Group *group) { + std::unique_lock lk(mx); + if (groups.contains(id)) + return -1; + groups[id] = group; + return 0; + } + + bool delete_group(const std::string& id) { + std::unique_lock lk(mx); + return groups.erase(id) > 0; + } + + Group * get_group(const std::string& id) { + std::shared_lock lk(mx); + auto it = groups.find(id); + if (it != groups.end()) { + return it->second; + } + return nullptr; + } + + Config cfg; + + std::unordered_map groups; + std::shared_mutex mx; +}; + +extern GroupManager group_manager; + +} // namespace mesh::multipoint + +#endif // MANAGER_MULTIPOINT_H diff --git a/media-proxy/include/mesh/metrics.h b/media-proxy/include/mesh/metrics.h new file mode 100644 index 00000000..317f37e5 --- /dev/null +++ 
b/media-proxy/include/mesh/metrics.h @@ -0,0 +1,79 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2024 Intel Corporation + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#ifndef METRICS_H +#define METRICS_H + +#include +#include +#include + +namespace mesh::telemetry { + +class MetricField { +public: + MetricField(const std::string& name, const std::string& str_value) + : name(name), value(str_value) {} + + MetricField(const std::string& name, uint64_t& uint_value) + : name(name), value(uint_value) {} + + MetricField(const std::string& name, double& double_value) + : name(name), value(double_value) {} + + MetricField(const std::string& name, bool bool_value) + : name(name), value(bool_value) {} + + std::string name; + std::variant value; +}; + +class Metric { +public: + Metric(int64_t timestamp_ms) + : timestamp_ms(timestamp_ms) {} + + void addFieldString(const std::string& name, const std::string& str_value) { + fields.emplace_back(name, str_value); + } + + void addFieldUint64(const std::string& name, uint64_t uint_value) { + fields.emplace_back(name, uint_value); + } + + void addFieldDouble(const std::string& name, double double_value) { + fields.emplace_back(name, double_value); + } + + void addFieldBool(const std::string& name, bool bool_value) { + fields.emplace_back(name, bool_value); + } + + int64_t timestamp_ms; + std::string provider_id; + std::vector fields; +}; + +class MetricsProvider { +public: + void assign_id(const std::string& id); + + std::string id; + +protected: + MetricsProvider(); + virtual ~MetricsProvider(); + + virtual void collect(Metric& metric, const int64_t& timestamp_ms) {} + // virtual std::unordered_map metrics_map(); + // virtual void reset(uint64_t mask) {} + + friend class MetricsCollector; +}; + +} // namespace mesh::telemetry + +#endif // METRICS_H diff --git a/media-proxy/include/mesh/metrics_collector.h b/media-proxy/include/mesh/metrics_collector.h new file mode 100644 index 00000000..fb2e6a73 --- /dev/null +++ 
b/media-proxy/include/mesh/metrics_collector.h @@ -0,0 +1,46 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2024 Intel Corporation + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#ifndef METRICS_COLLECTOR_H +#define METRICS_COLLECTOR_H + +#include "metrics.h" +#include "concurrency.h" +#include + +namespace mesh::telemetry { + +class Registry { +public: + void register_provider(MetricsProvider *provider); + void unregister_provider(MetricsProvider *provider); + void lock(); + void unlock(); + +protected: + std::list providers; + std::mutex mx; + + friend class MetricsCollector; +}; + +class MetricsCollector : MetricsProvider { +public: + MetricsCollector() { assign_id("collector"); } + + void run(context::Context& ctx); + +private: + virtual void collect(Metric& metric, const int64_t& timestamp_ms); + + std::atomic total; +}; + +extern Registry registry; + +} // namespace mesh::telemetry + +#endif // METRICS_COLLECTOR_H diff --git a/media-proxy/include/mesh/mocked_bridge.h b/media-proxy/include/mesh/mocked_bridge.h new file mode 100644 index 00000000..dfd326ae --- /dev/null +++ b/media-proxy/include/mesh/mocked_bridge.h @@ -0,0 +1,36 @@ +#ifndef MOCKED_BRIDGE_H +#define MOCKED_BRIDGE_H + +#include "conn.h" + +namespace mesh::connection { + +class MockedBridge : public Connection { +public: + MockedBridge() {} + + void configure(context::Context& ctx, Kind kind) { + _kind = kind; + set_state(ctx, State::configured); + } + + Result on_establish(context::Context& ctx) override { + set_state(ctx, State::active); + return Result::success; + } + + Result on_shutdown(context::Context& ctx) override { + set_state(ctx, State::closed); + return Result::success; + } + + Result on_receive(context::Context& ctx, void *ptr, uint32_t sz, + uint32_t& sent) override { + sent = sz; + return Result::success; + } +}; + +} // namespace mesh::connection + +#endif // MOCKED_BRIDGE_H diff --git a/media-proxy/include/mesh/multipoint.h b/media-proxy/include/mesh/multipoint.h new 
file mode 100644 index 00000000..c7e294b2 --- /dev/null +++ b/media-proxy/include/mesh/multipoint.h @@ -0,0 +1,47 @@ +#ifndef MULTIPOINT_H +#define MULTIPOINT_H + +#include "conn.h" +#include + +namespace mesh::multipoint { + +using namespace mesh::connection; + +class Group : public Connection { + +public: + Group(const std::string& group_id); + ~Group() override; + + void configure(context::Context& ctx); + + Result set_link(context::Context& ctx, Connection *new_link, + Connection *requester = nullptr) override; + + Result assign_input(context::Context& ctx, Connection *input); + Result add_output(context::Context& ctx, Connection *output); + + void delete_all_outputs() { + outputs.clear(); + } + + int outputs_num() { + return outputs.size(); + } + +private: + Result on_establish(context::Context& ctx) override; + Result on_receive(context::Context& ctx, void *ptr, uint32_t sz, + uint32_t& sent) override; + Result on_shutdown(context::Context& ctx) override; + void on_delete(context::Context& ctx) override; + +private: + std::list outputs; + +}; + +} // namespace mesh::multipoint + +#endif // MULTIPOINT_H diff --git a/media-proxy/include/mesh/proxy_api.h b/media-proxy/include/mesh/proxy_api.h new file mode 100644 index 00000000..6404feff --- /dev/null +++ b/media-proxy/include/mesh/proxy_api.h @@ -0,0 +1,54 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2024 Intel Corporation + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#ifndef PROXY_API_H +#define PROXY_API_H + +#include +#include "mediaproxy.grpc.pb.h" +#include "metrics.h" +#include "concurrency.h" + +using mediaproxy::CommandReply; + +namespace mesh { + +using grpc::Channel; +using mediaproxy::ProxyAPI; + +class ProxyAPIClient { +public: + ProxyAPIClient(std::shared_ptr channel) + : stub_(ProxyAPI::NewStub(channel)) {} + + int RegisterConnection(std::string& conn_id, std::string& kind); + int UnregisterConnection(const std::string& conn_id); + int SendMetrics(const std::vector& metrics); + int 
StartCommandQueue(context::Context& ctx); + int SendCommandReply(CommandReply& request); + + int Run(context::Context& ctx); + void Shutdown(); + +protected: + int RegisterMediaProxy(uint32_t sdk_port); + int UnregisterMediaProxy(); + + friend int RunProxyAPIClient(context::Context& ctx); + +private: + std::unique_ptr stub_; + std::string proxy_id; + std::jthread th; +}; + +extern std::unique_ptr proxyApiClient; + +int RunProxyAPIClient(context::Context& ctx); + +} // namespace mesh + +#endif // PROXY_API_H diff --git a/media-proxy/src/api_server_grpc.cc b/media-proxy/src/api_server_grpc.cc deleted file mode 100644 index 736bab2b..00000000 --- a/media-proxy/src/api_server_grpc.cc +++ /dev/null @@ -1,83 +0,0 @@ -/* - * SPDX-FileCopyrightText: Copyright (c) 2024 Intel Corporation - * - * SPDX-License-Identifier: BSD-3-Clause - */ - -#include "api_server_grpc.h" - -ConfigureServiceImpl::ConfigureServiceImpl(ProxyContext* ctx) - : m_ctx(ctx) -{ -} - -Status ConfigureServiceImpl::TxStart(ServerContext* context, const TxControlRequest* request, ControlReply* reply) -{ - std::cout << "\nReceived command: TxStart." << std::endl; - return Status::OK; -} - -Status ConfigureServiceImpl::RxStart(ServerContext* context, const RxControlRequest* request, ControlReply* reply) -{ - std::cout << "\nReceived command: RxStart." << std::endl; - return Status::OK; -} - -Status ConfigureServiceImpl::TxStop(ServerContext* context, const StopControlRequest* request, ControlReply* reply) -{ - std::cout << "\nReceived command: Stop." << std::endl; - return Status::OK; -} - -Status ConfigureServiceImpl::RxStop(ServerContext* context, const StopControlRequest* request, ControlReply* reply) -{ - std::cout << "\nReceived command: Stop." << std::endl; - return Status::OK; -} - -Status ConfigureServiceImpl::Stop(ServerContext* context, const StopControlRequest* request, ControlReply* reply) -{ - std::cout << "\nReceived command: Stop." 
<< std::endl; - return Status::OK; -} - -MsmDataPlaneServiceImpl::MsmDataPlaneServiceImpl(ProxyContext* ctx) - : m_ctx(ctx) -{ -} - -Status MsmDataPlaneServiceImpl::stream_add_del(ServerContext* context, const StreamData* request, StreamResult* reply) -{ - return Status::OK; -} - -HealthServiceImpl::HealthServiceImpl(ProxyContext* ctx) - : m_ctx(ctx) -{ -} - -Status HealthServiceImpl::Check(ServerContext* context, const HealthCheckRequest* request, HealthCheckResponse* reply) -{ - reply->set_status(controller::HealthCheckResponse_ServingStatus_SERVING); - - return Status::OK; -} - -Status HealthServiceImpl::Watch(ServerContext* context, const HealthCheckRequest* request, HealthCheckResponse* reply) -{ - return Status::OK; -} - -void RunRPCServer(ProxyContext* ctx) -{ - ConfigureServiceImpl service(ctx); - - ServerBuilder builder; - builder.AddListeningPort(ctx->getRPCListenAddress(), grpc::InsecureServerCredentials()); - builder.RegisterService(&service); - - std::unique_ptr server(builder.BuildAndStart()); - INFO("gRPC Server listening on %s", ctx->getRPCListenAddress().c_str()); - - server->Wait(); -} diff --git a/media-proxy/src/media_proxy.cc b/media-proxy/src/media_proxy.cc index 6638f5ff..9672ddf5 100644 --- a/media-proxy/src/media_proxy.cc +++ b/media-proxy/src/media_proxy.cc @@ -7,7 +7,6 @@ #include #include -#include "api_server_grpc.h" #include "api_server_tcp.h" #include @@ -15,6 +14,11 @@ #include "client_api.h" #include "logger.h" #include "manager_local.h" +#include "proxy_api.h" +#include "metrics_collector.h" + +#include +#include #ifndef IMTL_CONFIG_PATH #define IMTL_CONFIG_PATH "./imtl.json" @@ -49,6 +53,39 @@ void usage(FILE* fp, const char* path) DEFAULT_TCP_PORT); } +void PrintStackTrace() { + const int max_frames = 128; + void* buffer[max_frames]; + int num_frames = backtrace(buffer, max_frames); + char** symbols = backtrace_symbols(buffer, num_frames); + + std::cerr << "Stack trace:" << std::endl; + for (int i = 0; i < num_frames; ++i) { + 
Dl_info info; + if (dladdr(buffer[i], &info) && info.dli_sname) { + char* demangled = nullptr; + int status = -1; + if (info.dli_sname[0] == '_') { + demangled = abi::__cxa_demangle(info.dli_sname, nullptr, 0, &status); + } + std::cerr << " " << (status == 0 ? demangled : info.dli_sname) << " + " + << ((char*)buffer[i] - (char*)info.dli_saddr) << " at " + << info.dli_fname << std::endl; + free(demangled); + } else { + std::cerr << symbols[i] << std::endl; + } + } + + free(symbols); +} + +void SignalHandler(int signal) { + std::cerr << "Error: signal " << signal << std::endl; + PrintStackTrace(); + exit(1); +} + using namespace mesh; // Main context with cancellation @@ -56,6 +93,8 @@ auto ctx = context::WithCancel(context::Background()); int main(int argc, char* argv[]) { + signal(SIGSEGV, SignalHandler); + std::string grpc_port = DEFAULT_GRPC_PORT; std::string tcp_port = DEFAULT_TCP_PORT; std::string dev_port = DEFAULT_DEV_PORT; @@ -119,8 +158,6 @@ int main(int argc, char* argv[]) proxy_ctx->setDevicePort(dev_port); proxy_ctx->setDataPlaneAddress(dp_ip); - // mesh::Experiment1(); - // Intercept shutdown signals to cancel the main context auto signal_handler = [](int sig) { if (sig == SIGINT || sig == SIGTERM) { @@ -131,30 +168,45 @@ int main(int argc, char* argv[]) std::signal(SIGINT, signal_handler); std::signal(SIGTERM, signal_handler); - /* start gRPC server */ - std::jthread rpcThread(RunRPCServer, proxy_ctx); + // mesh::Experiment3(ctx); + + // Start ProxyAPI client + RunProxyAPIClient(ctx); + + // Start metrics collector + std::thread metricsCollectorThread([&]() { + telemetry::MetricsCollector collector; + collector.run(ctx); + }); /* start TCP server */ std::thread tcpThread(RunTCPServer, proxy_ctx); - // Start ClientAPI server - std::thread clientApiThread([]() { RunClientAPIServer(ctx); }); + // Start SDK API server + auto sdk_ctx = context::WithCancel(context::Background()); + std::thread sdkApiThread([&]() { RunSDKAPIServer(sdk_ctx); }); // Wait until 
the main context is cancelled ctx.done(); - clientApiThread.join(); - // Stop Local connection manager log::info("Shutting down Local conn manager"); auto tctx = context::WithTimeout(context::Background(), - std::chrono::milliseconds(5000)); + std::chrono::milliseconds(3000)); connection::local_manager.shutdown(ctx); + metricsCollectorThread.join(); + + // Shutdown ProxyAPI client + proxyApiClient->Shutdown(); + + // Shutdown SDK API server + sdk_ctx.cancel(); + sdkApiThread.join(); + log::info("Media Proxy exited"); exit(0); - rpcThread.join(); tcpThread.join(); delete (proxy_ctx); diff --git a/media-proxy/src/mesh/client_api.cc b/media-proxy/src/mesh/client_api.cc index dde83fb5..35b6022a 100644 --- a/media-proxy/src/mesh/client_api.cc +++ b/media-proxy/src/mesh/client_api.cc @@ -22,13 +22,13 @@ using grpc::ServerBuilder; using grpc::ServerContext; using grpc::Status; using grpc::StatusCode; -using sdk::ClientAPI; +using sdk::SDKAPI; using sdk::CreateConnectionRequest; using sdk::CreateConnectionResponse; using sdk::DeleteConnectionRequest; using sdk::DeleteConnectionResponse; -class ClientAPIServiceImpl final : public ClientAPI::Service { +class SDKAPIServiceImpl final : public SDKAPI::Service { public: Status CreateConnection(ServerContext* sctx, const CreateConnectionRequest* req, CreateConnectionResponse* resp) override { @@ -48,7 +48,7 @@ class ClientAPIServiceImpl final : public ClientAPI::Service { std::string conn_id; auto& mgr = connection::local_manager; - int err = mgr.create_connection(ctx, conn_id, &param, &memif_param); + int err = mgr.create_connection_sdk(ctx, conn_id, &param, &memif_param); if (err) { log::error("create_local_conn() failed (%d)", err); return Status(StatusCode::INTERNAL, @@ -63,8 +63,8 @@ class ClientAPIServiceImpl final : public ClientAPI::Service { sizeof(memif_conn_param)); resp->set_memif_conn_param(memif_param_str); - log::info("Connection created")("id", resp->conn_id()) - ("client_id", resp->client_id()); + log::info("[SDK]
Connection created")("id", resp->conn_id()) + ("client_id", resp->client_id()); return Status::OK; } @@ -72,33 +72,33 @@ class ClientAPIServiceImpl final : public ClientAPI::Service { DeleteConnectionResponse* resp) override { auto ctx = context::WithCancel(context::Background()); - const auto& conn_id = req->conn_id(); + auto conn_id = req->conn_id(); auto& mgr = connection::local_manager; - int err = mgr.delete_connection(ctx, conn_id); + int err = mgr.delete_connection_sdk(ctx, conn_id); if (err) log::error("delete_local_conn err (%d)", err); else - log::info("Connection deleted")("id", req->conn_id()) - ("client_id", req->client_id()); + log::info("[SDK] Connection deleted")("id", req->conn_id()) + ("client_id", req->client_id()); return Status::OK; } }; -void RunClientAPIServer(context::Context& ctx) { +void RunSDKAPIServer(context::Context& ctx) { std::string server_address("0.0.0.0:50050"); // gRPC default 50051 - ClientAPIServiceImpl service; + SDKAPIServiceImpl service; ServerBuilder builder; builder.AddListeningPort(server_address, grpc::InsecureServerCredentials()); builder.RegisterService(&service); std::unique_ptr server(builder.BuildAndStart()); - log::info("Client API Server listening on %s", server_address.c_str()); + log::info("SDK API Server listening on %s", server_address.c_str()); std::jthread th([&]() { ctx.done(); - log::info("Shutting down Client API Server"); + log::info("Shutting down SDK API Server"); server->Shutdown(); }); diff --git a/media-proxy/src/mesh/conn.cc b/media-proxy/src/mesh/conn.cc index 186e4e11..05517ecd 100644 --- a/media-proxy/src/mesh/conn.cc +++ b/media-proxy/src/mesh/conn.cc @@ -5,23 +5,13 @@ */ #include "conn.h" +#include namespace mesh::connection { Connection::Connection() { - _kind = Kind::undefined; - _state = State::not_configured; - _status = Status::initial; - setting_link = false; - transmitting = false; - _link = nullptr; - - metrics.inbound_bytes = 0; - metrics.outbound_bytes = 0; - 
metrics.transactions_successful = 0; - metrics.transactions_failed = 0; - metrics.errors = 0; + std::memset(&metrics, 0, sizeof(metrics)); } Connection::~Connection() @@ -133,19 +123,19 @@ Result Connection::transmit(context::Context& ctx, void *ptr, uint32_t sz) metrics.inbound_bytes += sz; - transmitting.store(true); - setting_link.wait(true); + transmitting.store(true, std::memory_order_release); + setting_link.wait(true, std::memory_order_acquire); uint32_t sent = 0; Result res = _link->do_receive(ctx, ptr, sz, sent); - transmitting.store(false); + transmitting.store(false, std::memory_order_release); transmitting.notify_one(); metrics.outbound_bytes += sent; if (res == Result::success) - metrics.transactions_successful++; + metrics.transactions_succeeded++; else metrics.transactions_failed++; @@ -167,7 +157,7 @@ Result Connection::do_receive(context::Context& ctx, void *ptr, uint32_t sz, metrics.outbound_bytes += sent; if (res == Result::success) - metrics.transactions_successful++; + metrics.transactions_succeeded++; else metrics.transactions_failed++; @@ -187,8 +177,11 @@ Result Connection::on_receive(context::Context& ctx, void *ptr, uint32_t sz, return Result::error_not_supported; } -Result Connection::set_link(context::Context& ctx, Connection *new_link) +Result Connection::set_link(context::Context& ctx, Connection *new_link, + Connection *requester) { + (void)requester; + // WARNING: Changing a link will affect the hot path of Data Plane. // Avoid any unnecessary operations that can increase latency. @@ -198,12 +191,12 @@ Result Connection::set_link(context::Context& ctx, Connection *new_link) // TODO: generate an Event (conn_changing_link). // Use context to cancel sending the Event. 
- setting_link.store(true); - transmitting.wait(true); + setting_link.store(true, std::memory_order_release); + transmitting.wait(true, std::memory_order_acquire); _link = new_link; - setting_link.store(false); + setting_link.store(false, std::memory_order_release); setting_link.notify_one(); // TODO: generate a post Event (conn_link_changed). @@ -225,6 +218,50 @@ Result Connection::set_result(Result res) return res; } +void Connection::collect(telemetry::Metric& metric, const int64_t& timestamp_ms) +{ + uint64_t in = metrics.inbound_bytes; + uint64_t out = metrics.outbound_bytes; + uint32_t strn = metrics.transactions_succeeded; + + metric.addFieldString("state", state2str(state())); + metric.addFieldBool("link", link() != nullptr); + metric.addFieldUint64("in", in); + metric.addFieldUint64("out", out); + metric.addFieldUint64("strn", strn); + metric.addFieldUint64("ftrn", metrics.transactions_failed); + metric.addFieldUint64("err", metrics.errors); + + auto dt = timestamp_ms - metrics.prev_timestamp_ms; + + if (metrics.prev_inbound_bytes) { + uint64_t bw = (in - metrics.prev_inbound_bytes); + bw = (bw * 8 * 1000) / dt; + metric.addFieldDouble("inbw", (bw/1000)/1000.0); + } + metrics.prev_inbound_bytes = in; + + if (metrics.prev_outbound_bytes) { + uint64_t bw = (out - metrics.prev_outbound_bytes); + bw = (bw * 8 * 1000) / dt; + metric.addFieldDouble("outbw", (bw/1000)/1000.0); + } + metrics.prev_outbound_bytes = out; + + if (metrics.prev_transactions_succeeded) { + uint64_t tps = (strn - metrics.prev_transactions_succeeded); + tps = (tps * 10 * 1000) / dt; + metric.addFieldDouble("tps", tps/10.0); + } + metrics.prev_transactions_succeeded = strn; + + metrics.prev_timestamp_ms = timestamp_ms; + + auto errors_delta = metrics.errors - metrics.prev_errors; + metrics.prev_errors = metrics.errors; + metric.addFieldUint64("errd", errors_delta); +} + static const char str_unknown[] = "?unknown?"; const char * kind2str(Kind kind, bool brief) @@ -274,7 +311,6 @@ const char 
* result2str(Result res) case Result::error_bad_argument: return "bad argument"; case Result::error_out_of_memory: return "out of memory"; case Result::error_general_failure: return "general failure"; - case Result::error_context_cancelled: return "context cancelled"; default: return str_unknown; } } diff --git a/media-proxy/src/mesh/conn_local.cc b/media-proxy/src/mesh/conn_local.cc index 3bc0ddb4..277b39e9 100644 --- a/media-proxy/src/mesh/conn_local.cc +++ b/media-proxy/src/mesh/conn_local.cc @@ -68,23 +68,22 @@ Result Local::on_establish(context::Context& ctx) auto ret = memif_create_socket(&memif_socket, &memif_socket_args, NULL); if (ret != MEMIF_ERR_SUCCESS) { log::error("memif_create_socket: %s", memif_strerror(ret)); - return Result::error_general_failure; + return set_result(Result::error_general_failure); } memif_conn_args.socket = memif_socket; - log::debug("Create memif interface."); + // log::debug("Create memif interface."); ret = memif_create(&memif_conn, &memif_conn_args, Local::callback_on_connect, Local::callback_on_disconnect, Local::callback_on_interrupt, this); if (ret != MEMIF_ERR_SUCCESS) { log::error("memif_create: %s", memif_strerror(ret)); - return Result::error_general_failure; + return set_result(Result::error_general_failure); } // Start the memif event loop. - // TODO: Replace ctx with a context passed at creation. 
try { th = std::jthread([this]() { for (;;) { @@ -96,11 +95,11 @@ Result Local::on_establish(context::Context& ctx) } catch (const std::system_error& e) { log::error("thread create failed (%d)", ret); - return Result::error_out_of_memory; + return set_result(Result::error_out_of_memory); } set_state(ctx, State::active); - return Result::success; + return set_result(Result::success); } int Local::callback_on_connect(memif_conn_handle_t conn, void *private_ctx) @@ -112,6 +111,7 @@ int Local::callback_on_connect(memif_conn_handle_t conn, void *private_ctx) int err = memif_refill_queue(_this->memif_conn, 0, -1, 0); if (err != MEMIF_ERR_SUCCESS) { log::error("memif_refill_queue: %s", memif_strerror(err)); + _this->metrics.errors++; return err; } @@ -119,7 +119,7 @@ int Local::callback_on_connect(memif_conn_handle_t conn, void *private_ctx) print_memif_details(_this->memif_conn); - log::debug("Memif ready"); + // log::debug("Memif ready"); return MEMIF_ERR_SUCCESS; } @@ -136,9 +136,11 @@ int Local::callback_on_disconnect(memif_conn_handle_t conn, void *private_ctx) _this->ready = false; auto err = memif_cancel_poll_event(_this->memif_socket); - if (err != MEMIF_ERR_SUCCESS) + if (err != MEMIF_ERR_SUCCESS) { log::error("on_disconnect memif_cancel_poll_event: %s", memif_strerror(err)); + _this->metrics.errors++; + } return MEMIF_ERR_SUCCESS; } @@ -163,26 +165,30 @@ int Local::callback_on_interrupt(memif_conn_handle_t conn, void *private_ctx, err = memif_rx_burst(_this->memif_conn, qid, &shm_bufs, 1, &buf_num); if (err != MEMIF_ERR_SUCCESS && err != MEMIF_ERR_NOBUF) { log::error("memif_rx_burst: %s", memif_strerror(err)); + _this->metrics.errors++; return err; } _this->on_memif_receive(shm_bufs.data, shm_bufs.len); err = memif_refill_queue(_this->memif_conn, qid, buf_num, 0); - if (err != MEMIF_ERR_SUCCESS) + if (err != MEMIF_ERR_SUCCESS) { log::error("memif_refill_queue: %s", memif_strerror(err)); + _this->metrics.errors++; + } return 0; } Result 
Local::on_shutdown(context::Context& ctx) { - log::debug("Memif shutdown"); + // log::debug("Memif shutdown"); auto err = memif_cancel_poll_event(memif_socket); if (err != MEMIF_ERR_SUCCESS) { log::error("on_shutdown memif_cancel_poll_event: %s", memif_strerror(err)); + metrics.errors++; } th.join(); @@ -199,19 +205,19 @@ Result Local::on_shutdown(context::Context& ctx) uint64_t out = metrics.outbound_bytes; log::info("Local %s conn shutdown", kind2str(_kind, true)) - ("frames", metrics.transactions_successful) + ("frames", metrics.transactions_succeeded) ("in", in)("out", out)("equal", in == out); uint64_t errors = metrics.errors; uint64_t failures = metrics.transactions_failed; if (errors || failures) - log::error("Local %s conn shutdown", kind2str(_kind, true)) - ("frames_failed", failures) - ("errors", errors); + log::warn("Local %s conn shutdown", kind2str(_kind, true)) + ("frames_failed", failures) + ("errors", errors); set_state(ctx, State::closed); - return Result::success; + return set_result(Result::success); } } // namespace mesh::connection diff --git a/media-proxy/src/mesh/conn_local_tx.cc b/media-proxy/src/mesh/conn_local_tx.cc index 298acc16..677887a5 100644 --- a/media-proxy/src/mesh/conn_local_tx.cc +++ b/media-proxy/src/mesh/conn_local_tx.cc @@ -42,12 +42,12 @@ Result LocalTx::on_receive(context::Context& ctx, void *ptr, uint32_t sz, if (err != MEMIF_ERR_SUCCESS) { log::error("rx_st20p_consume_frame: Failed to alloc memif buffer: %s", memif_strerror(err)); - return Result::error_general_failure; + return set_result(Result::error_general_failure); } if (!shm_bufs.data) { log::error("Local Tx: shm_bufs.data == NULL"); - return Result::error_general_failure; + return set_result(Result::error_general_failure); } memcpy(shm_bufs.data, ptr, sz); @@ -57,9 +57,10 @@ Result LocalTx::on_receive(context::Context& ctx, void *ptr, uint32_t sz, err = memif_tx_burst(memif_conn, qid, &shm_bufs, rx_buf_num, &rx); if (err != MEMIF_ERR_SUCCESS) { 
log::error("rx_st20p_consume_frame memif_tx_burst: %s", memif_strerror(err)); + metrics.errors++; } - return Result::success; + return set_result(Result::success); } } // namespace mesh::connection diff --git a/media-proxy/src/mesh/manager_bridges.cc b/media-proxy/src/mesh/manager_bridges.cc new file mode 100644 index 00000000..afe1e796 --- /dev/null +++ b/media-proxy/src/mesh/manager_bridges.cc @@ -0,0 +1,115 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2024 Intel Corporation + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#include "manager_bridges.h" +#include "conn_registry.h" +#include "logger.h" +#include "mocked_bridge.h" + +namespace mesh::connection { + +BridgesManager bridges_manager; + +int BridgesManager::create_bridge(context::Context& ctx, Connection*& bridge, + const std::string& id, Kind kind) +{ + // DEBUG + auto mocked_bridge = new(std::nothrow) MockedBridge; + if (!mocked_bridge) + return -ENOMEM; + // DEBUG + + // if (param->type == is_tx) + // conn = new(std::nothrow) LocalRx; + // else + // conn = new(std::nothrow) LocalTx; + + // if (!conn) + // return -ENOMEM; + + // auto res = bridge->configure_memif(ctx, &memif_ops, frame_size); + // if (res != Result::success) { + // delete conn; + // return -1; + // } + + mocked_bridge->configure(ctx, kind); + + bridge = mocked_bridge; + + auto res = bridge->establish(ctx); + if (res != Result::success) { + delete bridge; + return -1; + } + + lock(); + thread::Defer d([this]{ unlock(); }); + + // Assign id accessed by metrics collector. 
+ bridge->assign_id(id); + + registry.add(id, bridge); + + return 0; +} + +int BridgesManager::delete_bridge(context::Context& ctx, const std::string& id) +{ + auto bridge = registry.get(id); + if (!bridge) + return -1; + + { + lock(); + thread::Defer d([this]{ unlock(); }); + + if (bridge->link()) { + // log::debug("Shutdown the link"); + bridge->link()->set_link(ctx, nullptr, bridge); + bridge->set_link(ctx, nullptr); + } + + registry.remove(id); + } + + auto res = bridge->shutdown(ctx); + delete bridge; + + return 0; +} + +Connection * BridgesManager::get_bridge(context::Context& ctx, + const std::string& id) +{ + return registry.get(id); +} + +void BridgesManager::shutdown(context::Context& ctx) +{ + auto ids = registry.get_all_ids(); + + for (const std::string& id : ids) { + auto err = delete_bridge(ctx, id); + if (err) + log::error("Error deleting bridge (%d)", err) + ("bridge_id", id); + } +} + +void BridgesManager::lock() +{ + // log::debug("Bridges Manager mx lock"); + mx.lock(); +} + +void BridgesManager::unlock() +{ + // log::debug("Bridges Manager mx unlock"); + mx.unlock(); +} + +} // namespace mesh::connection diff --git a/media-proxy/src/mesh/manager_local.cc b/media-proxy/src/mesh/manager_local.cc index 5cfcb30f..ae18a238 100644 --- a/media-proxy/src/mesh/manager_local.cc +++ b/media-proxy/src/mesh/manager_local.cc @@ -11,17 +11,18 @@ #include "uuid.h" #include "logger.h" #include -#include +#include "proxy_api.h" namespace mesh::connection { LocalManager local_manager; -std::string tx_id, rx_id; +// Temporary Multipoint group business logic. 
+// std::string tx_id, rx_id; -int LocalManager::create_connection(context::Context& ctx, std::string& id, - mcm_conn_param *param, - memif_conn_param *memif_param) +int LocalManager::create_connection_sdk(context::Context& ctx, std::string& id, + mcm_conn_param *param, + memif_conn_param *memif_param) { if (!param) return -1; @@ -29,7 +30,7 @@ int LocalManager::create_connection(context::Context& ctx, std::string& id, bool found = false; for (int i = 0; i < 5; i++) { id = generate_uuid_v4(); - int err = registry.add(id, nullptr); + int err = registry_sdk.add(id, nullptr); if (!err) { found = true; break; @@ -78,47 +79,74 @@ int LocalManager::create_connection(context::Context& ctx, std::string& id, conn->get_params(memif_param); - std::unique_lock lk(mx); + // Prepare parameters to register in Media Proxy + std::string kind = param->type == is_tx ? "rx" : "tx"; - registry.replace(id, conn); + lock(); + thread::Defer d([this]{ unlock(); }); - // Temporary Multipoint group business logic. - // TODO: Remove when Multipoint Groups are implemented. - if (param->type == is_tx) { - auto tx_conn = registry.get(tx_id); - if (tx_conn) { - conn->set_link(ctx, tx_conn); - tx_conn->set_link(ctx, conn); - } - rx_id = id; - } else { - auto rx_conn = registry.get(rx_id); - if (rx_conn) { - conn->set_link(ctx, rx_conn); - rx_conn->set_link(ctx, conn); - } - tx_id = id; + // Register local connection in Media Proxy + std::string agent_assigned_id; + int err = proxyApiClient->RegisterConnection(agent_assigned_id, kind); + if (err) { + delete conn; + return -1; } + // Assign id accessed by metrics collector. + conn->assign_id(agent_assigned_id); + + registry_sdk.replace(id, conn); + registry.add(agent_assigned_id, conn); + // log::debug("Added local conn")("conn_id", conn->id)("id", id); + + // // Temporary Multipoint group business logic. + // // TODO: Remove when Multipoint Groups are implemented. 
+ // if (param->type == is_tx) { + // auto tx_conn = registry_sdk.get(tx_id); + // if (tx_conn) { + // conn->set_link(ctx, tx_conn); + // tx_conn->set_link(ctx, conn); + // } + // rx_id = id; + // } else { + // auto rx_conn = registry_sdk.get(rx_id); + // if (rx_conn) { + // conn->set_link(ctx, rx_conn); + // rx_conn->set_link(ctx, conn); + // } + // tx_id = id; + // } + return 0; } -int LocalManager::delete_connection(context::Context& ctx, const std::string& id) +int LocalManager::delete_connection_sdk(context::Context& ctx, const std::string& id) { - std::unique_lock lk(mx); - - auto conn = registry.get(id); + auto conn = registry_sdk.get(id); if (!conn) return -1; - auto link = conn->link(); - if (link) { - log::debug("Shutdown the link"); - link->set_link(ctx, nullptr); - conn->set_link(ctx, nullptr); - } + log::debug("Delete local conn")("conn_id", conn->id)("id", id); + + { + lock(); + thread::Defer d([this]{ unlock(); }); + + int err = proxyApiClient->UnregisterConnection(conn->id); + if (err) { + // TODO: Handle the error. 
+ } + + if (conn->link()) { + // log::debug("Shutdown the link"); + conn->link()->set_link(ctx, nullptr, conn); + conn->set_link(ctx, nullptr); + } - registry.remove(id); + registry.remove(conn->id); + registry_sdk.remove(id); + } auto res = conn->shutdown(ctx); delete conn; @@ -126,15 +154,34 @@ int LocalManager::delete_connection(context::Context& ctx, const std::string& id return 0; } +Connection * LocalManager::get_connection(context::Context& ctx, + const std::string& id) +{ + return registry.get(id); +} + void LocalManager::shutdown(context::Context& ctx) { - auto ids = registry.get_all_ids(); + auto ids = registry_sdk.get_all_ids(); for (const std::string& id : ids) { - auto err = this->delete_connection(ctx, id); + auto err = delete_connection_sdk(ctx, id); if (err) - log::error("Error deleting local conn (%d)", err); + log::error("Error deleting local conn (%d)", err) + ("conn_id", id); } } +void LocalManager::lock() +{ + // log::debug("Local Manager mx lock"); + mx.lock(); +} + +void LocalManager::unlock() +{ + // log::debug("Local Manager mx unlock"); + mx.unlock(); +} + } // namespace mesh::connection diff --git a/media-proxy/src/mesh/manager_multipoint.cc b/media-proxy/src/mesh/manager_multipoint.cc new file mode 100644 index 00000000..6c19871f --- /dev/null +++ b/media-proxy/src/mesh/manager_multipoint.cc @@ -0,0 +1,356 @@ +#include "manager_multipoint.h" +#include +#include +#include +#include "logger.h" +#include "manager_local.h" +#include "manager_bridges.h" + +namespace mesh::multipoint { + +GroupManager group_manager; + +Result GroupManager::apply_config(context::Context& ctx, const Config& new_cfg) +{ + std::unordered_set current_group_ids; + std::unordered_set new_group_ids; + + for (const auto& [id, _] : cfg.groups) + current_group_ids.insert(id); + + for (const auto& [id, _] : new_cfg.groups) + new_group_ids.insert(id); + + std::vector added_groups_ids; + std::vector deleted_groups_ids; + std::vector common_groups_ids; + + for (const auto& 
id : new_group_ids) { + if (current_group_ids.find(id) == current_group_ids.end()) + added_groups_ids.emplace_back(id); + } + for (const auto& id : current_group_ids) { + if (new_group_ids.find(id) == new_group_ids.end()) + deleted_groups_ids.emplace_back(id); + else + common_groups_ids.emplace_back(id); + } + + std::vector added_groups, deleted_groups, updated_groups; + + // Lambda function to find added and deleted connection ids in the group + auto fn = [&](std::string group_id, + const std::vector& current_ids, + const std::vector& new_ids, + std::vector& added_ids, + std::vector& deleted_ids) { + + std::unordered_set current_set(current_ids.begin(), + current_ids.end()); + std::unordered_set new_set(new_ids.begin(), + new_ids.end()); + + for (const auto& id : new_ids) { + if (current_set.find(id) == current_set.end()) + added_ids.emplace_back(id); + } + + for (const auto& id : current_ids) { + if (new_set.find(id) == new_set.end()) + deleted_ids.emplace_back(id); + } + }; + + for (const auto& group_id : common_groups_ids) { + auto it_current = cfg.groups.find(group_id); + if (it_current == cfg.groups.end()) + continue; + + auto it_new = new_cfg.groups.find(group_id); + if (it_new == new_cfg.groups.end()) + continue; + + std::vector added_conn_ids; + std::vector deleted_conn_ids; + + fn(group_id, it_current->second.conn_ids, it_new->second.conn_ids, + added_conn_ids, deleted_conn_ids); + + std::vector added_bridge_ids; + std::vector deleted_bridge_ids; + + fn(group_id, it_current->second.bridge_ids, it_new->second.bridge_ids, + added_bridge_ids, deleted_bridge_ids); + + if (added_conn_ids.empty() && deleted_conn_ids.empty() && + added_bridge_ids.empty() && deleted_bridge_ids.empty()) + continue; + + updated_groups.emplace_back(group_id, added_conn_ids, deleted_conn_ids, + added_bridge_ids, deleted_bridge_ids); + } + + for (const auto& group_id : added_groups_ids) { + auto it = new_cfg.groups.find(group_id); + if (it != new_cfg.groups.end()) + 
added_groups.emplace_back(group_id, it->second.conn_ids, + std::vector{}, + it->second.bridge_ids); + } + + for (const auto& group_id : deleted_groups_ids) { + auto it = cfg.groups.find(group_id); + if (it != cfg.groups.end()) + deleted_groups.emplace_back(group_id, std::vector{}, + it->second.conn_ids, + std::vector{}, + it->second.bridge_ids); + } + + cfg = std::move(new_cfg); + + // // Print added_groups_ids + // log::debug("----------------------------------------------"); + // for (const auto& group : added_groups) { + // log::debug("Added Group ID: %s", group.group_id.c_str()); + // for (const auto& id : group.added_conn_ids) { + // log::debug("Added Conn ID: %s", id.c_str()); + // } + // for (const auto& id : group.deleted_conn_ids) { + // log::debug("???Deleted Conn ID: %s", id.c_str()); + // } + // } + + // // Print deleted_groups_ids + // log::debug("----------------------------------------------"); + // for (const auto& group : deleted_groups) { + // log::debug("Deleted Group ID: %s", group.group_id.c_str()); + // for (const auto& id : group.added_conn_ids) { + // log::debug("???Added Conn ID: %s", id.c_str()); + // } + // for (const auto& id : group.deleted_conn_ids) { + // log::debug("Deleted Conn ID: %s", id.c_str()); + // } + // } + + // // Print updated_groups + // log::debug("----------------------------------------------"); + // for (const auto& group : updated_groups) { + // log::debug("Updated Group ID: %s", group.group_id.c_str()); + // for (const auto& id : group.added_conn_ids) { + // log::debug("Added Conn ID: %s", id.c_str()); + // } + // for (const auto& id : group.deleted_conn_ids) { + // log::debug("Deleted Conn ID: %s", id.c_str()); + // } + // } + // log::debug("----------------------------------------------"); + + if (ctx.cancelled()) + return Result::error_context_cancelled; + + return reconcile_config(ctx, added_groups, deleted_groups, updated_groups); +} + +Result GroupManager::reconcile_config(context::Context& ctx, + std::vector 
added_groups, + std::vector deleted_groups, + std::vector updated_groups) +{ + local_manager.lock(); + thread::Defer d([]{ local_manager.unlock(); }); + + // Delete entire groups, including associated connections and bridges + for (const auto& cfg : deleted_groups) { + Group *group = get_group(cfg.group_id); + if (!group) { + log::error("[RECONCILE] Delete group: not found")("group_id", cfg.group_id); + continue; + } + + log::info("[RECONCILE] Delete group and its conns")("group_id", cfg.group_id); + + if (group->link()) { + group->link()->set_link(ctx, nullptr); + group->set_link(ctx, nullptr); + } + + group->shutdown(ctx); + group->delete_all_outputs(); + + for (const auto& bridge_id : cfg.deleted_bridge_ids) { + auto err = bridges_manager.delete_bridge(ctx, bridge_id); + if (err) + log::error("[RECONCILE] Delete group del bridge: not found") + ("group_id", cfg.group_id) + ("bridge_id", bridge_id); + } + + delete_group(cfg.group_id); + delete group; + } + + // Delete some connections and bridges in existing groups + for (const auto& cfg : updated_groups) { + Group *group = get_group(cfg.group_id); + if (!group) { + log::error("[RECONCILE] Update group del: not found") + ("group_id", cfg.group_id) + ("conns", cfg.deleted_conn_ids.size()); + continue; + } + + for (const auto& conn_id : cfg.deleted_conn_ids) { + auto conn = local_manager.get_connection(ctx, conn_id); + if (!conn) { + // log::error("[RECONCILE] Delete conn: not found") + // ("group_id", cfg.group_id) + // ("conn_id", conn_id); + continue; + } + + log::info("[RECONCILE] Delete conn")("group_id", cfg.group_id) + ("conn_id", conn_id); + + if (conn->link()) { + conn->link()->set_link(ctx, nullptr, conn); + conn->set_link(ctx, nullptr); + } + } + + for (const auto& bridge_id : cfg.deleted_bridge_ids) { + auto err = bridges_manager.delete_bridge(ctx, bridge_id); + if (err) + log::error("[RECONCILE] Update group del bridge: not found") + ("group_id", cfg.group_id) + ("bridge_id", bridge_id); + } + } + + // 
Lambda function for adding local connections to a group + auto add_conns = [this, &ctx](Group *group, std::vector conn_ids) { + for (const auto& conn_id : conn_ids) { + auto conn = local_manager.get_connection(ctx, conn_id); + if (!conn) { + log::error("[RECONCILE] Add conn: not found") + ("group_id", group->id) + ("conn_id", conn_id); + continue; + } + + log::info("[RECONCILE] Add conn")("group_id", group->id) + ("conn_id", conn_id); + + auto res = associate(ctx, group, conn); + if (res != Result::success) + log::error("[RECONCILE] Add conn wrong kind") + ("group_id", group->id) + ("conn_id", conn_id); + } + }; + + // Lambda function for adding bridges to a group + auto add_bridges = [this, &ctx](Group *group, + std::vector bridge_ids) { + for (const auto& bridge_id : bridge_ids) { + Connection *bridge; + Kind kind = Kind::transmitter; + + // DEBUG + auto err = bridges_manager.create_bridge(ctx, bridge, bridge_id, kind); + // DEBUG + if (!bridge) { + log::error("[RECONCILE] Add bridge: ") + ("group_id", group->id) + ("bridge_id", bridge_id); + ("kind", kind2str(kind)); + continue; + } + + log::info("[RECONCILE] Add bridge")("group_id", group->id) + ("bridge_id", bridge_id); + + auto res = associate(ctx, group, bridge); + if (res != Result::success) + log::error("[RECONCILE] Add bridge wrong kind") + ("group_id", group->id) + ("bridge_id", bridge_id); + } + }; + + // Add new groups and their connections and bridges + for (const auto& cfg : added_groups) { + auto group = new(std::nothrow) Group(cfg.group_id); + if (!group) + return Result::error_out_of_memory; + + log::info("[RECONCILE] Add group")("group_id", group->id) + ("conns", cfg.added_conn_ids.size()) + ("bridges", cfg.added_bridge_ids.size()); + + group->configure(ctx); + auto res = group->establish(ctx); + if (res != Result::success) { + log::error("[RECONCILE] Group establish err: %s", result2str(res)) + ("group_id", group->id); + } + + auto err = add_group(cfg.group_id, group); + if (err) { + 
log::error("[RECONCILE] Add group err: %d", err)("group_id", cfg.group_id); + delete group; + continue; + } + + add_conns(group, cfg.added_conn_ids); + add_bridges(group, cfg.added_bridge_ids); + } + + // Add new connections to existing groups + for (const auto& cfg : updated_groups) { + Group *group = get_group(cfg.group_id); + if (!group) { + log::error("[RECONCILE] Update group: not found")("group_id", cfg.group_id); + continue; + } + + add_conns(group, cfg.added_conn_ids); + add_bridges(group, cfg.added_bridge_ids); + } + + log::info("[RECONCILE] Completed")("groups", groups.size()); + for (const auto& pair : groups) { + Group* group = pair.second; + + log::info("- Group") + ("group_id", group->id) + ("input", group->link() ? "assigned" : "n/a") + ("outputs", group->outputs_num()); + + } + + // TODO: Remove all bridges that are not associated to any group. + + return Result::success; +} + +Result GroupManager::associate(context::Context& ctx, Group *group, + Connection *conn) { + switch (conn->kind()) { + case Kind::receiver: + if (group->assign_input(ctx, conn) == Result::success) + conn->set_link(ctx, group); + return Result::success; + + case Kind::transmitter: + if (conn->set_link(ctx, group) == Result::success) + group->add_output(ctx, conn); + return Result::success; + + default: + return Result::error_bad_argument; + } +}; + + +} // namespace mesh::multipoint diff --git a/media-proxy/src/mesh/metrics.cc b/media-proxy/src/mesh/metrics.cc new file mode 100644 index 00000000..ced133da --- /dev/null +++ b/media-proxy/src/mesh/metrics.cc @@ -0,0 +1,33 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2024 Intel Corporation + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#include "metrics.h" +#include "metrics_collector.h" +#include "logger.h" + +namespace mesh::telemetry { + +MetricsProvider::MetricsProvider() +{ + registry.register_provider(this); +} + +MetricsProvider::~MetricsProvider() +{ + registry.unregister_provider(this); +} + +void 
MetricsProvider::assign_id(const std::string& id) +{ + this->id = id; +} + +// std::unordered_map Provider::metrics_map() +// { +// return std::unordered_map(); +// } + +} // namespace mesh::telemetry diff --git a/media-proxy/src/mesh/metrics_collector.cc b/media-proxy/src/mesh/metrics_collector.cc new file mode 100644 index 00000000..0d76690f --- /dev/null +++ b/media-proxy/src/mesh/metrics_collector.cc @@ -0,0 +1,89 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2024 Intel Corporation + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#include "metrics_collector.h" +#include "proxy_api.h" +#include "logger.h" +#include "manager_local.h" + +namespace mesh::telemetry { + +Registry registry; + +void Registry::register_provider(MetricsProvider *provider) +{ + std::lock_guard lk(mx); + providers.push_back(provider); +} + +void Registry::unregister_provider(MetricsProvider *provider) +{ + std::lock_guard lk(mx); + providers.remove(provider); +} + +void Registry::lock() { + mx.lock(); +} + +void Registry::unlock() { + mx.unlock(); +} + +void MetricsCollector::run(context::Context& ctx) +{ + std::vector metrics; + + while (!ctx.cancelled()) { + auto now = std::chrono::system_clock::now().time_since_epoch(); + int64_t timestamp_ms = + std::chrono::duration_cast(now).count(); + + // Temporarily block all known metric providers and entities + // responsible for creation or deleting metric providers + // from being deleted. The hot path of connections is not affected + // by this locking. + connection::local_manager.lock(); + registry.lock(); + + uint64_t n = 0; + for (auto provider : registry.providers) { + // Don't collect a metric if no id is assigned to the provider. + if (provider->id.empty()) + continue; + + metrics.emplace_back(timestamp_ms); + auto& metric = metrics.back(); + provider->collect(metric, timestamp_ms); + + // Remove the metric if no fields were added (highly unlikely). 
+ if (metric.fields.empty()) { + metrics.pop_back(); + } else { + metric.provider_id = provider->id; + n++; + } + } + total += n; + + // Finally remove locks in reverse order. + registry.unlock(); + connection::local_manager.unlock(); + + proxyApiClient->SendMetrics(metrics); + + metrics.clear(); + + thread::Sleep(ctx, std::chrono::milliseconds(1000)); + } +} + +void MetricsCollector::collect(Metric& metric, const int64_t& timestamp_ms) +{ + metric.addFieldUint64("total", total); +} + +} // namespace mesh::telemetry diff --git a/media-proxy/src/mesh/multipoint.cc b/media-proxy/src/mesh/multipoint.cc new file mode 100644 index 00000000..d132eff8 --- /dev/null +++ b/media-proxy/src/mesh/multipoint.cc @@ -0,0 +1,158 @@ +#include "multipoint.h" +#include "logger.h" + +namespace mesh::multipoint { + +using namespace mesh::connection; + +Group::Group(const std::string& group_id) : Connection() +{ + _kind = Kind::transmitter; + id = group_id; +} + +Group::~Group() +{ +} + +void Group::configure(context::Context& ctx) +{ + set_state(ctx, State::configured); +} + +Result Group::set_link(context::Context& ctx, Connection *new_link, + Connection *requester) +{ + if (!new_link && requester) { + // Remove the requester as the group input + if (requester == _link) { + log::info("[GROUP] Remove input")("group_id", id)("id", requester->id); + return Connection::set_link(ctx, nullptr); + } + + // Remove the requester from the group outputs list + for (auto it = outputs.begin(); it != outputs.end(); ++it) { + if (*it != requester) + continue; + + log::info("[GROUP] Delete output")("group_id", id)("id", requester->id); + + setting_link.store(true, std::memory_order_release); + transmitting.wait(true, std::memory_order_acquire); + + outputs.erase(it); + + setting_link.store(false, std::memory_order_release); + setting_link.notify_one(); + break; + } + return Result::success; + } + + // log::info("[GROUP] Set link")("group_id", id)("new_link", new_link) + // ("requester", requester); + 
return Connection::set_link(ctx, new_link); +} + +Result Group::assign_input(context::Context& ctx, Connection *input) { + if (input->kind() != Kind::receiver) + return Result::error_bad_argument; + + log::info("[GROUP] Assign input")("group_id", id)("id", input->id); + + return set_link(ctx, input); +} + +Result Group::add_output(context::Context& ctx, Connection *output) { + if (output->kind() != Kind::transmitter) + return Result::error_bad_argument; + + log::info("[GROUP] Add output")("group_id", id)("id", output->id); + + setting_link.store(true, std::memory_order_release); + transmitting.wait(true, std::memory_order_acquire); + + outputs.emplace_back(output); + + setting_link.store(false, std::memory_order_release); + setting_link.notify_one(); + + return Result::success; +} + +Result Group::on_establish(context::Context& ctx) +{ + set_state(ctx, State::active); + set_status(ctx, Status::healthy); + + return Result::success; +} + +Result Group::on_receive(context::Context& ctx, void *ptr, + uint32_t sz, uint32_t& sent) +{ + if (state() != State::active) + return set_result(Result::error_wrong_state); + + if (!_link) + return set_result(Result::error_no_link_assigned); + + metrics.inbound_bytes += sz; + + auto res = Result::success; + uint32_t total_sent = 0; + uint32_t errors = 0; + + if (outputs.empty()) + return Result::error_no_link_assigned; + + transmitting.store(true, std::memory_order_release); + setting_link.wait(true, std::memory_order_acquire); + + for (Connection *output : outputs) { + if (!output) { + errors++; + continue; + } + + uint32_t out_sent = 0; + res = output->do_receive(ctx, ptr, sz, out_sent); + + total_sent += out_sent; + + if (res != Result::success) + errors++; + } + + transmitting.store(false, std::memory_order_release); + transmitting.notify_one(); + + sent = sz; + metrics.outbound_bytes += total_sent; + metrics.errors += errors; + + if (!errors) + metrics.transactions_succeeded++; + else + metrics.transactions_failed++; + + return 
Result::success; +} + +/* + * Shuts the group down: clears the link (set_link with nullptr), empties + * `outputs`, then sets the state to closed and the status to shutdown. + * Always returns Result::success. + */ +Result Group::on_shutdown(context::Context& ctx) +{ + set_link(ctx, nullptr); + + outputs.clear(); + + set_state(ctx, State::closed); + set_status(ctx, Status::shutdown); + + return Result::success; +} + +/* Empty in this revision: performs no work on delete. */ +void Group::on_delete(context::Context& ctx) +{ +} + +} // namespace mesh::multipoint diff --git a/media-proxy/src/mesh/proxy_api.cc b/media-proxy/src/mesh/proxy_api.cc new file mode 100644 index 00000000..f80f01e8 --- /dev/null +++ b/media-proxy/src/mesh/proxy_api.cc @@ -0,0 +1,331 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2024 Intel Corporation + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#include "proxy_api.h" + +#include +#include +#include + +#include +#include +#include "logger.h" +#include "manager_multipoint.h" + +namespace mesh { + +using grpc::Channel; +using grpc::ClientContext; +using grpc::ClientReader; +using grpc::Status; +using mediaproxy::ProxyAPI; +using mediaproxy::RegisterMediaProxyRequest; +using mediaproxy::RegisterMediaProxyReply; +using mediaproxy::UnregisterMediaProxyRequest; +using mediaproxy::UnregisterMediaProxyReply; +using mediaproxy::RegisterConnectionRequest; +using mediaproxy::RegisterConnectionReply; +using mediaproxy::UnregisterConnectionRequest; +using mediaproxy::UnregisterConnectionReply; +using mediaproxy::SendMetricsRequest; +using mediaproxy::SendMetricsReply; +using mediaproxy::Metric; +using mediaproxy::MetricField; +using mediaproxy::StartCommandQueueRequest; +using mediaproxy::CommandRequest; +using mediaproxy::CommandReplyReceipt; +using mediaproxy::DebugReply; +using mediaproxy::ApplyConfigReply; + +/* + * Global client instance; assigned in RunProxyAPIClient(). + * NOTE(review): the angle-bracket arguments of the #include directives above + * and of this std::unique_ptr look stripped in this copy of the patch -- + * confirm against the original commit. + */ +std::unique_ptr proxyApiClient; + +/* + * Sends the RegisterMediaProxy RPC carrying sdk_port, with a 5 second deadline. + * On success stores reply.proxy_id() in proxy_id and returns 0; on failure + * logs the gRPC error message and returns -1. + */ +int ProxyAPIClient::RegisterMediaProxy(uint32_t sdk_port) +{ + RegisterMediaProxyRequest request; + request.set_sdk_port(sdk_port); + + RegisterMediaProxyReply reply; + ClientContext context; + context.set_deadline(std::chrono::system_clock::now() + + std::chrono::seconds(5)); + + Status status = stub_->RegisterMediaProxy(&context, request, &reply); 
+ + if (status.ok()) { + proxy_id = reply.proxy_id(); + return 0; + } else { + log::error("RegisterMediaProxy RPC failed: %s", + status.error_message().c_str()); + return -1; + } +} + +/* + * Sends the UnregisterMediaProxy RPC for the stored proxy_id, with a 5 second + * deadline. Returns 0 on success; logs the error and returns -1 otherwise. + */ +int ProxyAPIClient::UnregisterMediaProxy() +{ + UnregisterMediaProxyRequest request; + request.set_proxy_id(proxy_id); + + UnregisterMediaProxyReply reply; + ClientContext context; + context.set_deadline(std::chrono::system_clock::now() + + std::chrono::seconds(5)); + + Status status = stub_->UnregisterMediaProxy(&context, request, &reply); + + if (status.ok()) { + return 0; + } else { + log::error("UnregisterMediaProxy RPC failed: %s", + status.error_message().c_str()); + return -1; + } +} + +/* + * Sends the RegisterConnection RPC with the stored proxy_id and the given kind, + * with a 5 second deadline. On success writes reply.conn_id() into the conn_id + * output parameter and returns 0; on failure logs the error, leaves conn_id + * untouched and returns -1. + * NOTE(review): kind is a non-const reference but is only read here. + */ +int ProxyAPIClient::RegisterConnection(std::string& conn_id, std::string& kind) +{ + RegisterConnectionRequest request; + request.set_proxy_id(proxy_id); + request.set_kind(kind); + + RegisterConnectionReply reply; + ClientContext context; + context.set_deadline(std::chrono::system_clock::now() + + std::chrono::seconds(5)); + + Status status = stub_->RegisterConnection(&context, request, &reply); + + if (status.ok()) { + conn_id = reply.conn_id(); + return 0; + } else { + log::error("RegisterConnection RPC failed: %s", + status.error_message().c_str()); + return -1; + } +} + +/* + * Sends the UnregisterConnection RPC for conn_id and the stored proxy_id, with + * a 5 second deadline. Returns 0 on success; logs the error and returns -1 + * otherwise. + */ +int ProxyAPIClient::UnregisterConnection(const std::string& conn_id) +{ + UnregisterConnectionRequest request; + request.set_conn_id(conn_id); + request.set_proxy_id(proxy_id); + + UnregisterConnectionReply reply; + ClientContext context; + context.set_deadline(std::chrono::system_clock::now() + + std::chrono::seconds(5)); + + Status status = stub_->UnregisterConnection(&context, request, &reply); + + if (status.ok()) { + return 0; + } else { + log::error("UnregisterConnection RPC failed: %s", + status.error_message().c_str()); + return -1; + } +} + +/* + * Copies each metric (timestamp_ms, provider_id and its fields) into a + * SendMetricsRequest and sends it with a 5 second deadline. A field whose + * value matches none of the four checked alternatives is sent with only its + * name set. Returns 0 on success; logs the error and returns -1 otherwise. + * NOTE(review): the template arguments of std::vector, std::holds_alternative + * and std::get look stripped in this copy of the patch, so the four branches + * read identically -- confirm the intended types against the original commit. + */ +int ProxyAPIClient::SendMetrics(const std::vector& metrics) +{ + SendMetricsRequest request; + + for (const auto& metric : metrics) { + Metric* out_metric = 
request.add_metrics(); + out_metric->set_timestamp_ms(metric.timestamp_ms); + out_metric->set_provider_id(metric.provider_id); + + for (const auto& field : metric.fields) { + MetricField* out_field = out_metric->add_fields(); + out_field->set_name(field.name); + if (std::holds_alternative(field.value)) { + out_field->set_str_value(std::get(field.value)); + } else if (std::holds_alternative(field.value)) { + out_field->set_uint_value(std::get(field.value)); + } else if (std::holds_alternative(field.value)) { + out_field->set_double_value(std::get(field.value)); + } else if (std::holds_alternative(field.value)) { + out_field->set_bool_value(std::get(field.value)); + } + } + } + + SendMetricsReply reply; + ClientContext context; + context.set_deadline(std::chrono::system_clock::now() + + std::chrono::seconds(5)); + + Status status = stub_->SendMetrics(&context, request, &reply); + + if (!status.ok()) { + log::error("Failed to send metrics: %s", + status.error_message().c_str()); + return -1; + } + return 0; +} + +/* + * Sends the given CommandReply with a 5 second deadline. Returns 0 on + * success; logs the error and returns -1 otherwise. + */ +int ProxyAPIClient::SendCommandReply(CommandReply& request) +{ + CommandReplyReceipt reply; + + ClientContext context; + context.set_deadline(std::chrono::system_clock::now() + + std::chrono::seconds(5)); + + Status status = stub_->SendCommandReply(&context, request, &reply); + + if (!status.ok()) { + log::error("SendCommandReply RPC failed: %s", + status.error_message().c_str()); + return -1; + } + + // log::debug("Command reply sent successfully"); + return 0; +} + +/* + * Opens the StartCommandQueue stream for the stored proxy_id and handles each + * CommandRequest read from it until Read() returns false. No deadline is set + * on this call. Every reply echoes the request's req_id and carries proxy_id. + * - Debug: logs the text and replies with a fixed string. + * - ApplyConfig: copies each group's conn_ids and bridge_ids into a + * multipoint::Config keyed by group_id, passes it to + * multipoint::group_manager.apply_config() and replies with an empty + * ApplyConfigReply; bridges are only logged and the apply_config() result + * is not inspected. + * - Any other command: logs an error and sends no reply. + * Returns -1 if Finish() reports a status that is neither OK nor CANCELLED, + * otherwise 0. + * NOTE(review): shutdown_thread calls ctx.done() before TryCancel(); this + * presumably blocks until ctx is cancelled -- confirm. If so, and the stream + * ends first, the std::jthread destructor at function exit would wait for that + * cancellation too. + */ +int ProxyAPIClient::StartCommandQueue(context::Context& ctx) +{ + StartCommandQueueRequest request; + request.set_proxy_id(proxy_id); + + ClientContext context; + + std::jthread shutdown_thread([&]() { + ctx.done(); + context.TryCancel(); + }); + + std::unique_ptr> reader( + stub_->StartCommandQueue(&context, request)); + + CommandRequest command_request; + while (reader->Read(&command_request)) { + CommandReply result_request; + result_request.set_req_id(command_request.req_id()); + 
result_request.set_proxy_id(proxy_id); + + switch (command_request.command_case()) { + + case CommandRequest::kDebug: + { + log::debug("Received Debug command: %s", + command_request.debug().in_text().c_str()) + ("req_id", command_request.req_id()); + + + // Create and populate the CommandReply message + DebugReply* reply = new DebugReply(); + reply->set_out_text("Okay Okay!"); + result_request.set_allocated_debug(reply); + + SendCommandReply(result_request); + } + break; + + case CommandRequest::kApplyConfig: + { + auto& req = command_request.apply_config(); + + multipoint::Config config; + + log::info("[AGENT] ApplyConfig") + ("groups", req.groups_size()) + ("bridges", req.bridges_size()); + + for (const auto& group : req.groups()) { + multipoint::GroupConfig group_config; + + log::debug("- Multipoint Group") + ("group_id", group.group_id()) + ("conns", group.conn_ids_size()) + ("bridges", group.bridge_ids_size()); + + for (const auto& conn_id : group.conn_ids()) { + // log::debug("-- Conn")("conn_id", conn_id); + group_config.conn_ids.emplace_back(conn_id); + } + for (const auto& bridge_id : group.bridge_ids()) { + // log::debug("-- Bridge")("bridge_id", bridge_id); + group_config.bridge_ids.emplace_back(bridge_id); + } + + config.groups[group.group_id()] = group_config; + } + + for (const auto& bridge : req.bridges()) { + log::debug("- Bridge config") + ("bridge_id", bridge.bridge_id()) + ("type", bridge.type()); + // TODO: Add more configuration fields if needed + } + + multipoint::group_manager.apply_config(ctx, config); + // TODO: Do we need to check and return the result here? 
+ + ApplyConfigReply* reply = new ApplyConfigReply(); + result_request.set_allocated_apply_config(reply); + + SendCommandReply(result_request); + } + break; + + // case CommandRequest::kResetMetrics: + // std::cout << "Received Reset Metrics Command: " << request->reset_metrics().metric_id() << std::endl; + // break; + + default: + log::error("Unknown proxy command") + ("req_id", command_request.req_id()); + break; + } + + } + + Status status = reader->Finish(); + if (!status.ok() && status.error_code() != grpc::StatusCode::CANCELLED) { + log::error("StartCommandQueue RPC failed: %s", + status.error_message().c_str()); + return -1; + } + return 0; +} + +/* + * Registers the proxy with a hard-coded SDK port of 12345 (the result of + * RegisterMediaProxy() is not checked) and starts `th`, which runs + * StartCommandQueue(ctx). The thread lambda captures ctx by reference. + * Returns 0 once the thread is started, or -1 if std::jthread construction + * throws std::system_error. + */ +int ProxyAPIClient::Run(context::Context& ctx) +{ + RegisterMediaProxy(12345); + + try { + th = std::jthread([&]() { + StartCommandQueue(ctx); + }); + } + catch (const std::system_error& e) { + log::error("Proxy API thread create failed (out of memory)"); + return -1; + } + + return 0; +} + +/* + * Calls UnregisterMediaProxy() (return value ignored), then joins `th`. + * NOTE(review): join() is unconditional; std::jthread::join() throws + * std::system_error when the thread is not joinable, e.g. if Run() never + * started it -- confirm callers guarantee the order. + */ +void ProxyAPIClient::Shutdown() +{ + UnregisterMediaProxy(); + + th.join(); +} + +/* + * Creates the global proxyApiClient on an insecure channel to + * "localhost:50051" and returns the result of its Run(ctx). + */ +int RunProxyAPIClient(context::Context& ctx) +{ + proxyApiClient = std::make_unique( + grpc::CreateChannel("localhost:50051", grpc::InsecureChannelCredentials())); + + return proxyApiClient->Run(ctx); +} + +} // namespace mesh diff --git a/protos/controller.proto b/protos/controller.proto deleted file mode 100644 index 18ab486d..00000000 --- a/protos/controller.proto +++ /dev/null @@ -1,257 +0,0 @@ -/* - * SPDX-FileCopyrightText: Copyright (c) 2024 Intel Corporation - * - * SPDX-License-Identifier: BSD-3-Clause - */ - -syntax = "proto3"; - -package controller; - -service Configure { - rpc TxStart (TxControlRequest) returns (ControlReply) {} - rpc RxStart (RxControlRequest) returns (ControlReply) {} - rpc TxStop (StopControlRequest) returns (ControlReply) {} - rpc RxStop (StopControlRequest) returns (ControlReply) {} - rpc Stop (StopControlRequest) returns (ControlReply) {} -} - -enum StPmdType { - ST_PMD_DPDK_USER = 0; - ST_PMD_DPDK_AF_XDP = 1; - 
ST_PMD_DPDK_AF_PACKET = 2; - ST_PMD_KERNEL_SOCKET = 3; - ST_PMD_NATIVE_AF_XDP = 4; - ST_PMD_TYPE_MAX = 5; -}; - -enum StLogLevel { - MTL_LOG_LEVEL_DEBUG = 0; - MTL_LOG_LEVEL_INFO = 1; - MTL_LOG_LEVEL_WARNING = 2; - MTL_LOG_LEVEL_ERROR = 3; - MTL_LOG_LEVEL_MAX = 4; -}; - -enum StFps { - ST_FPS_P59_94 = 0; - ST_FPS_P50 = 1; - ST_FPS_P29_97 = 2; - ST_FPS_P25 = 3; - ST_FPS_P119_88 = 4; - ST_FPS_P120 = 5; - ST_FPS_P100 = 6; - ST_FPS_P60 = 7; - ST_FPS_P30 = 8; - ST_FPS_P24 = 9; - ST_FPS_P23_98 = 10; - ST_FPS_MAX = 11; -}; - -enum St20Fmt { - ST20_FMT_YUV_422_10BIT = 0; - ST20_FMT_YUV_422_8BIT = 1; - ST20_FMT_YUV_422_12BIT = 2; - ST20_FMT_YUV_422_16BIT = 3; - ST20_FMT_YUV_420_8BIT = 4; - ST20_FMT_YUV_420_10BIT = 5; - ST20_FMT_YUV_420_12BIT = 6; - ST20_FMT_RGB_8BIT = 7; - ST20_FMT_RGB_10BIT = 8; - ST20_FMT_RGB_12BIT = 9; - ST20_FMT_RGB_16BIT = 10; - ST20_FMT_YUV_444_8BIT = 11; - ST20_FMT_YUV_444_10BIT = 12; - ST20_FMT_YUV_444_12BIT = 13; - ST20_FMT_YUV_444_16BIT = 14; - ST20_FMT_MAX = 15; -}; - -enum StFrameFmt { - ST_FRAME_FMT_YUV422PLANAR10LE = 0; - ST_FRAME_FMT_V210 = 1; - ST_FRAME_FMT_YUV422PLANAR8 = 2; - ST_FRAME_FMT_YUV422PACKED8 = 3; - ST_FRAME_FMT_YUV422RFC4175PG2BE10 = 4; - ST_FRAME_FMT_ARGB = 8; - ST_FRAME_FMT_BGRA = 9; - ST_FRAME_FMT_RGB8 = 10; - ST_FRAME_FMT_JPEGXS_CODESTREAM = 24; - ST_FRAME_FMT_MAX = 25; -}; - -enum StPluginDevice { - ST_PLUGIN_DEVICE_AUTO = 0; - ST_PLUGIN_DEVICE_CPU = 1; - ST_PLUGIN_DEVICE_GPU = 2; - ST_PLUGIN_DEVICE_FPGA = 3; - ST_PLUGIN_DEVICE_TEST = 4; - ST_PLUGIN_DEVICE_MAX = 5; -}; - -message StInit { - uint32 number_ports = 1; - string primary_port = 2; - string redundant_port = 3; - StPmdType pmd_type = 4; - repeated uint32 primary_sip_addr = 5; - repeated uint32 redundant_sip_addr = 6; - uint64 flags = 7; - StLogLevel log_level = 8; - string logical_cores = 9; - uint32 tx_sessions_cnt_max = 10; - uint32 rx_sessions_cnt_max = 11; -} - -message StRxPort { - uint32 number_ports = 1; - uint32 payload_type = 2; - string port = 3; - 
repeated uint32 sip_addr = 4; - uint32 udp_port = 5; -} - -message StTxPort { - uint32 number_ports = 1; - uint32 payload_type = 2; - string port = 3; - repeated uint32 dip_addr = 4; - uint32 udp_port = 5; -} - -message St20pRxOps { - StRxPort rx_port = 1; - string name = 2; - uint32 width = 3; - uint32 height = 4; - uint32 framebuffer_cnt = 5; - StFps fps = 6; - St20Fmt transport_fmt = 7; - StFrameFmt output_fmt = 8; - StPluginDevice device = 9; -} - -message St20pTxOps { - StTxPort tx_port = 1; - string name = 2; - uint32 width = 3; - uint32 height = 4; - uint32 framebuffer_cnt = 5; - StFps fps = 6; - St20Fmt transport_fmt = 7; - StFrameFmt input_fmt = 8; - StPluginDevice device = 9; - repeated uint32 tx_dst_mac = 10; -} - -message MemIFOps { - string app_name = 1; - string interface_name = 2; - string socket_path = 3; -} - -message TxControlRequest { - StInit st_init = 1; - St20pTxOps st20_tx = 2; - MemIFOps memif_ops = 3; -} - -message TxSessionParams { - string name = 1; - StTxPort tx_port = 2; - uint32 width = 3; - uint32 height = 4; - StFps fps = 6; - StFrameFmt format = 8; -} - -message RxControlRequest { - StInit st_init = 1; - St20pRxOps st20_rx = 2; - MemIFOps memif_ops = 3; -} - -message StopControlRequest { - int32 session_id = 1; - // stop parameters... 
-} - -message ControlReply { - string message = 1; -} - -// for MSM Control Plane -enum StreamOperation { - CREATE = 0; - UPDATE = 1; - DELETE = 2; - ADD_EP = 3; - DEL_EP = 4; - UPD_EP = 5; -} - -enum ProxyProtocol { - TCP = 0; - UDP = 1; - QUIC = 2; - RTP = 3; -} - -enum Encap { - TCP_IP = 0; - UDP_IP = 1; - QUIC_IP = 2; - RTP_UDP = 3; - RTP_UDP_MUX = 4; - RTP_TCP = 5; - RTP_TCP_MUX = 6; - RTP_QUIC_STREAM = 7; - RTP_QUIC_DGRAM = 8; -} - -message Endpoint { - string ip = 1; - uint32 port = 2; - uint32 quic_stream = 3; - uint32 encap = 4; -} - -message StreamData { - uint32 id = 1; - StreamOperation operation = 2; - ProxyProtocol protocol = 3; - Endpoint endpoint = 4; - bool enable = 5; -} - -message StreamResult { - bool success = 1; - string error_message = 2; -} - -service MsmDataPlane { - rpc stream_add_del (StreamData) returns (StreamResult) {} -} - -// Health check request to find out the readiness/liveness. -message HealthCheckRequest { - string service = 1; -} - -// Health check response. -message HealthCheckResponse { - enum ServingStatus { - UNKNOWN = 0; - SERVING = 1; - NOT_SERVING = 2; - SERVICE_UNKNOWN = 3; // Used only by the Watch method. - - } - ServingStatus status = 1; -} - -// Health check request. 
-service Health { - rpc Check(HealthCheckRequest) returns (HealthCheckResponse); - rpc Watch(HealthCheckRequest) returns (HealthCheckResponse); -} diff --git a/protos/mediaproxy.proto b/protos/mediaproxy.proto index 281a33bb..fa415048 100644 --- a/protos/mediaproxy.proto +++ b/protos/mediaproxy.proto @@ -4,11 +4,11 @@ syntax = "proto3"; -option go_package = "control-plane-agent/api/proxy/proto/mediaproxy"; - package mediaproxy; -service ControlAPI { +option go_package = "control-plane-agent/api/proxy/proto/mediaproxy"; + +service ProxyAPI { rpc RegisterMediaProxy (RegisterMediaProxyRequest) returns (RegisterMediaProxyReply) {} rpc UnregisterMediaProxy (UnregisterMediaProxyRequest) returns (UnregisterMediaProxyReply) {} @@ -17,14 +17,35 @@ service ControlAPI { rpc UnregisterConnection (UnregisterConnectionRequest) returns (UnregisterConnectionReply) {} - rpc StartCommandQueue (StartCommandQueueRequest) returns (stream CommandMessage) {} + rpc StartCommandQueue (StartCommandQueueRequest) returns (stream CommandRequest) {} + + rpc SendCommandReply (CommandReply) returns (CommandReplyReceipt) {} - // rpc ReportCommandResult (ReportCommandResultRequest) returns (ReportCommandResultReply) {} + rpc SendMetrics (SendMetricsRequest) returns (SendMetricsReply) {} } -// message ReportCommandResultRequest { -// string proxy_id = 1; // id assigned by Agent at registration -// } +/* A named metric value; `value` holds one of string, uint64, double or bool. */ +message MetricField { + string name = 1; + oneof value { + string str_value = 2; + uint64 uint_value = 3; + double double_value = 4; + bool bool_value = 5; + } +} + +/* One metric sample: timestamp_ms, a provider id and a list of fields. */ +message Metric { + int64 timestamp_ms = 1; + string provider_id = 2; + repeated MetricField fields = 3; +} + +message SendMetricsRequest { + repeated Metric metrics = 1; +} + +message SendMetricsReply { +} message RegisterMediaProxyRequest { uint32 sdk_port = 1; // SDK API port - something to identify Media Proxy @@ -39,12 +60,11 @@ message UnregisterMediaProxyRequest { } message UnregisterMediaProxyReply { - // Empty reply } message 
RegisterConnectionRequest { string proxy_id = 1; - uint32 kind = 2; + string kind = 2; uint32 conn_type = 3; uint32 payload_type = 4; uint64 buffer_size = 5; @@ -60,14 +80,65 @@ message UnregisterConnectionRequest { } message UnregisterConnectionReply { - // Empty reply } message StartCommandQueueRequest { - string proxy_id = 1; // Media Proxy id + string proxy_id = 1; +} + +/* Message streamed by StartCommandQueue; `command` selects the payload. */ +message CommandRequest { + string req_id = 1; + oneof command { + ApplyConfigRequest apply_config = 6; + ResetMetricsRequest reset_metrics = 100; + DebugRequest debug = 101; + } +} + +message MultipointGroup { + string group_id = 1; + repeated string conn_ids = 2; + repeated string bridge_ids = 3; +} + +message Bridge { + string bridge_id = 1; + string type = 2; + // TODO: Add more configuration fields +} + +message ApplyConfigRequest { + repeated MultipointGroup groups = 1; + repeated Bridge bridges = 2; +} + +message ResetMetricsRequest { +} + +message DebugRequest { + string in_text = 1; +} + +/* Argument of SendCommandReply: a req_id, a proxy_id and one reply payload. */ +message CommandReply { + string req_id = 1; + string proxy_id = 2; + oneof reply { + ApplyConfigReply apply_config = 7; + ResetMetricsReply reset_metrics = 100; + DebugReply debug = 101; + } +} + +message ApplyConfigReply { + repeated string error = 1; +} + +message ResetMetricsReply { +} + +message DebugReply { + string out_text = 1; } -message CommandMessage { - string opcode = 1; - string id = 2; +message CommandReplyReceipt { } diff --git a/protos/sdk.proto b/protos/sdk.proto index 59004841..2c32ff88 100644 --- a/protos/sdk.proto +++ b/protos/sdk.proto @@ -6,7 +6,7 @@ syntax = "proto3"; package sdk; -service ClientAPI { +service SDKAPI { rpc CreateConnection (CreateConnectionRequest) returns (CreateConnectionResponse); rpc DeleteConnection (DeleteConnectionRequest) returns (DeleteConnectionResponse); } diff --git a/sdk/src/mesh_client_api.cc b/sdk/src/mesh_client_api.cc index a36e8785..ffa001bc 100644 --- a/sdk/src/mesh_client_api.cc +++ b/sdk/src/mesh_client_api.cc @@ -17,7 +17,7 @@ using 
grpc::Channel; using grpc::ClientContext; using grpc::Status; -using sdk::ClientAPI; +using sdk::SDKAPI; using sdk::CreateConnectionRequest; using sdk::CreateConnectionResponse; using sdk::DeleteConnectionRequest; @@ -25,10 +25,10 @@ using sdk::DeleteConnectionResponse; using namespace mesh; -class ClientAPIClient { +class SDKAPIClient { public: - ClientAPIClient(std::shared_ptr channel) - : stub_(ClientAPI::NewStub(channel)) {} + SDKAPIClient(std::shared_ptr channel) + : stub_(SDKAPI::NewStub(channel)) {} int CreateConnection(std::string& conn_id, mcm_conn_param *param, memif_conn_param *memif_param) { @@ -94,21 +94,21 @@ class ClientAPIClient { std::string client_id; private: - std::unique_ptr stub_; + std::unique_ptr stub_; }; void * mesh_grpc_create_client() { auto client = new(std::nothrow) - ClientAPIClient(grpc::CreateChannel("localhost:50050", // gRPC default 50051 - grpc::InsecureChannelCredentials())); + SDKAPIClient(grpc::CreateChannel("localhost:50050", // gRPC default 50051 + grpc::InsecureChannelCredentials())); return client; } void mesh_grpc_destroy_client(void *client) { if (client) { - auto cli = static_cast(client); + auto cli = static_cast(client); delete cli; } } @@ -118,7 +118,7 @@ class GrpcConn { // This declaration must go first to allow proper type casting. mcm_conn_context *handle; - ClientAPIClient *client; + SDKAPIClient *client; std::string conn_id; }; @@ -132,7 +132,7 @@ void * mesh_grpc_create_conn(void *client, mcm_conn_param *param) if (!client) return NULL; - auto cli = static_cast(client); + auto cli = static_cast(client); auto conn = new(std::nothrow) GrpcConn(); if (!conn)