Skip to content

Commit

Permalink
Add support for Multipoint Groups and telemetry (#288)
Browse files Browse the repository at this point in the history
* Add gRPC based API to interface with MCM Agent.
* Add Manager to control the life cycle of Multipoint Groups.
* Add basic support for Bridges.
* Add Manager to control the life cycle of Bridges.
* Add metrics to the connection base class to support telemetry.
* Add metric collector engine to support telemetry.
* Add printing call stack when the app crashes.
* Minor fixes to the build system.

Signed-off-by: Konstantin Ilichev <[email protected]>
  • Loading branch information
ko80 authored Dec 11, 2024
1 parent c65ef80 commit 55c8b29
Show file tree
Hide file tree
Showing 32 changed files with 1,852 additions and 558 deletions.
2 changes: 1 addition & 1 deletion build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,6 @@ as_root ldconfig
export LD_LIBRARY_PATH="${PREFIX_DIR}/usr/local/lib:/usr/local/lib64"
"${MCM_BUILD_DIR}/bin/sdk_unit_tests"
"${MCM_BUILD_DIR}/bin/media_proxy_unit_tests"
ln -s "${MCM_BUILD_DIR}" "${SCRIPT_DIR}/build"
ln -sf "${MCM_BUILD_DIR}" "${SCRIPT_DIR}/build"

log_info "Build Succeeded"
2 changes: 2 additions & 0 deletions ffmpeg-plugin/build-ffmpeg.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ BUILD_DIR="${BUILD_DIR:-${REPOSITORY_DIR}/_build}"
lib_setup_ffmpeg_dir_and_version "${FFMPEG_VER:-7.0}"
export FFMPEG_DIR="${BUILD_DIR}/${FFMPEG_SUB_DIR}"

cp -f "${SCRIPT_DIR}/mcm_"* "${FFMPEG_DIR}/libavdevice/"

make -C "${FFMPEG_DIR}" -j "$(nproc)"
as_root make -C "${FFMPEG_DIR}" install

Expand Down
14 changes: 13 additions & 1 deletion media-proxy/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,18 @@ project(MediaProxy VERSION 0.0.1 LANGUAGES CXX C)
set(CMAKE_CXX_STANDARD 20)
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -pthread")

# Set the build type to Debug by default
if(NOT CMAKE_BUILD_TYPE)
set(CMAKE_BUILD_TYPE Debug CACHE STRING "Choose the type of build." FORCE)
endif()

# Enable debug symbols
set(CMAKE_C_FLAGS_DEBUG "${CMAKE_C_FLAGS_DEBUG} -g")
set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -g")

# Ensure that the binary is not stripped
set(CMAKE_EXE_LINKER_FLAGS_DEBUG "${CMAKE_EXE_LINKER_FLAGS_DEBUG} -Wl,--no-strip")

# Enabled by default
option(ENABLE_ZERO_COPY "Enable zero-copy mode to avoid memory copying between Media Proxy and MTL" ON)
IF(ENABLE_ZERO_COPY)
Expand All @@ -36,7 +48,6 @@ endif()
# Define the .proto files
set(PROTO_FILES
${PROTO_DIR}/mediaproxy.proto
${PROTO_DIR}/controller.proto
${PROTO_DIR}/sdk.proto
)

Expand Down Expand Up @@ -111,6 +122,7 @@ target_link_libraries(media_proxy_lib PUBLIC m ${MTL_LIB} ${MEMIF_LIB} ${LIBFABR

add_executable(media_proxy ${proxy_srcs} src/media_proxy.cc)
target_link_libraries(media_proxy PRIVATE media_proxy_lib)
target_link_libraries(media_proxy PRIVATE dl)

install(TARGETS media_proxy DESTINATION ${CMAKE_INSTALL_PATH} COMPONENT media_proxy)
install(FILES imtl.json DESTINATION ${CMAKE_CONFIG_PATH} COMPONENT config)
73 changes: 0 additions & 73 deletions media-proxy/include/api_server_grpc.h

This file was deleted.

2 changes: 1 addition & 1 deletion media-proxy/include/mesh/client_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

namespace mesh {

void RunClientAPIServer(context::Context& ctx);
void RunSDKAPIServer(context::Context& ctx);

} // namespace mesh

Expand Down
29 changes: 20 additions & 9 deletions media-proxy/include/mesh/conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <atomic>
#include <cstddef>
#include "concurrency.h"
#include "metrics.h"

namespace mesh::connection {

Expand Down Expand Up @@ -67,6 +68,7 @@ enum class Result {
error_out_of_memory,
error_general_failure,
error_context_cancelled,

// TODO: more error codes to be added...
};

Expand All @@ -76,7 +78,7 @@ enum class Result {
* Base abstract class of connection. All connection implementations must
* inherit this class.
*/
class Connection {
class Connection : public telemetry::MetricsProvider {

public:
Connection();
Expand All @@ -86,7 +88,8 @@ class Connection {
State state();
Status status();

Result set_link(context::Context& ctx, Connection *new_link);
virtual Result set_link(context::Context& ctx, Connection *new_link,
Connection *requester = nullptr);
Connection * link();

Result establish(context::Context& ctx);
Expand Down Expand Up @@ -117,22 +120,30 @@ class Connection {
uint32_t& sent);
virtual void on_delete(context::Context& ctx) {}

Kind _kind; // must be properly set in the derived classes ctor
Connection *_link;
Kind _kind = Kind::undefined; // must be properly set in the derived class ctor
Connection *_link = nullptr;
std::atomic<bool> setting_link = false; // held in set_link()
std::atomic<bool> transmitting = false; // held in on_receive()

struct {
std::atomic<uint64_t> inbound_bytes;
std::atomic<uint64_t> outbound_bytes;
std::atomic<uint32_t> transactions_successful;
std::atomic<uint32_t> transactions_succeeded;
std::atomic<uint32_t> transactions_failed;
std::atomic<uint32_t> errors;

int64_t prev_timestamp_ms;
uint64_t prev_inbound_bytes;
uint64_t prev_outbound_bytes;
uint32_t prev_errors;
uint32_t prev_transactions_succeeded;
} metrics;

private:
std::atomic<State> _state;
std::atomic<Status> _status;
std::atomic<bool> setting_link; // held in set_link()
std::atomic<bool> transmitting; // held in on_receive()
virtual void collect(telemetry::Metric& metric, const int64_t& timestamp_ms);

std::atomic<State> _state = State::not_configured;
std::atomic<Status> _status = Status::initial;
};

const char * kind2str(Kind kind, bool brief = false);
Expand Down
5 changes: 4 additions & 1 deletion media-proxy/include/mesh/conn_registry.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ namespace mesh::connection {
*/
class Registry {
public:
int add(std::string& id, Connection *conn) {
int add(const std::string& id, Connection *conn) {
std::unique_lock lk(mx);
if (conns.contains(id))
return -1;
Expand Down Expand Up @@ -62,6 +62,9 @@ class Registry {

// TODO: Check if the mutex is still needed. The local connection manager
// has it own mutex, which makes this one redundant.
// UPD: The bridges manager is in a single thread flow of gRPC stream
// handler of StartCommandQueueRequest, so the mutex here is redundant.
// Consider removing this mutex.
std::shared_mutex mx;
};

Expand Down
39 changes: 39 additions & 0 deletions media-proxy/include/mesh/manager_bridges.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2024 Intel Corporation
*
* SPDX-License-Identifier: BSD-3-Clause
*/

#ifndef MANAGER_BRIDGES_H
#define MANAGER_BRIDGES_H

#include <string>
#include "concurrency.h"
#include "conn_registry.h"

namespace mesh::connection {

class BridgesManager {
public:
int create_bridge(context::Context& ctx, Connection*& bridge,
const std::string& id, Kind kind);

int delete_bridge(context::Context& ctx, const std::string& id);

Connection * get_bridge(context::Context& ctx, const std::string& id);

void shutdown(context::Context& ctx);

void lock();
void unlock();

private:
Registry registry; // This regustry uses Agent assigned ids
std::shared_mutex mx;
};

extern BridgesManager bridges_manager;

} // namespace mesh::connection

#endif // MANAGER_BRIDGES_H
14 changes: 10 additions & 4 deletions media-proxy/include/mesh/manager_local.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,21 @@ namespace mesh::connection {

class LocalManager {
public:
int create_connection(context::Context& ctx, std::string& id,
mcm_conn_param *param, memif_conn_param *memif_param);
int create_connection_sdk(context::Context& ctx, std::string& id,
mcm_conn_param *param, memif_conn_param *memif_param);

int delete_connection(context::Context& ctx, const std::string& id);
int delete_connection_sdk(context::Context& ctx, const std::string& id);

Connection * get_connection(context::Context& ctx, const std::string& id);

void shutdown(context::Context& ctx);

void lock();
void unlock();

private:
Registry registry;
Registry registry_sdk; // This registry uses SDK ids
Registry registry; // This registry uses Agent assigned ids
std::shared_mutex mx;
};

Expand Down
77 changes: 77 additions & 0 deletions media-proxy/include/mesh/manager_multipoint.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
#ifndef MANAGER_MULTIPOINT_H
#define MANAGER_MULTIPOINT_H

#include <string>
#include <vector>
#include <unordered_map>
#include <mutex>
#include <shared_mutex>
#include <memory>
#include <iostream>
#include "multipoint.h"

namespace mesh::multipoint {

class GroupChangeConfig {
public:
std::string group_id;
std::vector<std::string> added_conn_ids;
std::vector<std::string> deleted_conn_ids;
std::vector<std::string> added_bridge_ids;
std::vector<std::string> deleted_bridge_ids;
};

class GroupConfig {
public:
std::vector<std::string> conn_ids;
std::vector<std::string> bridge_ids;
};

class Config {
public:
std::unordered_map<std::string, GroupConfig> groups;
};

class GroupManager {
public:
Result apply_config(context::Context& ctx, const Config& new_cfg);
Result reconcile_config(context::Context& ctx,
std::vector<GroupChangeConfig> added_groups,
std::vector<GroupChangeConfig> deleted_groups,
std::vector<GroupChangeConfig> updated_groups);
private:
Result associate(context::Context& ctx, Group *group, Connection *conn);

int add_group(const std::string& id, Group *group) {
std::unique_lock lk(mx);
if (groups.contains(id))
return -1;
groups[id] = group;
return 0;
}

bool delete_group(const std::string& id) {
std::unique_lock lk(mx);
return groups.erase(id) > 0;
}

Group * get_group(const std::string& id) {
std::shared_lock lk(mx);
auto it = groups.find(id);
if (it != groups.end()) {
return it->second;
}
return nullptr;
}

Config cfg;

std::unordered_map<std::string, Group *> groups;
std::shared_mutex mx;
};

extern GroupManager group_manager;

} // namespace mesh::multipoint

#endif // MANAGER_MULTIPOINT_H
Loading

0 comments on commit 55c8b29

Please sign in to comment.