Skip to content

Commit

Permalink
WebRTC Client singleton and early STUN DNS resolution (#1812)
Browse files Browse the repository at this point in the history
* STUN DNS resolution and webrtc client singleton

* Add initializer flag for the main context

* Move lock out of Stun context

* Add semaphore for the singleton access

* Lock on the sempahore before freeing to ensure clean shutdown

* Cleanup  sem acquired bool flag

* Use atomic refcounter

* Clang format issues

* Have initialized check in thread

* Move context initialize setting to top of cleanup
  • Loading branch information
disa6302 authored Sep 26, 2023
1 parent 8656c38 commit 1bd85f0
Show file tree
Hide file tree
Showing 12 changed files with 264 additions and 19 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ on:
- master
jobs:
clang-format-check:
runs-on: macos-11
runs-on: macos-latest
steps:
- name: Clone repository
uses: actions/checkout@v3
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,7 @@ extern "C" {
#define STATUS_PEERCONNECTION_CREATE_ANSWER_WITHOUT_REMOTE_DESCRIPTION STATUS_PEERCONNECTION_BASE + 0x00000001
#define STATUS_PEERCONNECTION_CODEC_INVALID STATUS_PEERCONNECTION_BASE + 0x00000002
#define STATUS_PEERCONNECTION_CODEC_MAX_EXCEEDED STATUS_PEERCONNECTION_BASE + 0x00000003
#define STATUS_PEERCONNECTION_UNSUPPORTED_HOSTNAME STATUS_PEERCONNECTION_BASE + 0x00000004
/*!@} */

/////////////////////////////////////////////////////
Expand Down Expand Up @@ -696,9 +697,10 @@ extern "C" {
/**
* Parameterized string for KVS STUN Server
*/
#define KINESIS_VIDEO_STUN_URL_POSTFIX "amazonaws.com"
#define KINESIS_VIDEO_STUN_URL_POSTFIX_CN "amazonaws.com.cn"
#define KINESIS_VIDEO_STUN_URL "stun:stun.kinesisvideo.%s.%s:443"
#define KINESIS_VIDEO_STUN_URL_POSTFIX "amazonaws.com"
#define KINESIS_VIDEO_STUN_URL_POSTFIX_CN "amazonaws.com.cn"
#define KINESIS_VIDEO_STUN_URL "stun:stun.kinesisvideo.%s.%s:443"
#define KINESIS_VIDEO_STUN_URL_WITHOUT_PORT "stun.kinesisvideo.%s.%s"

/**
* Default signaling SSL port
Expand Down
5 changes: 5 additions & 0 deletions src/source/Ice/IceAgent.c
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ STATUS createIceAgent(PCHAR username, PCHAR password, PIceAgentCallbacks pIceAge
pIceAgent->iceServersCount = 0;
for (i = 0; i < MAX_ICE_SERVERS_COUNT; i++) {
if (pRtcConfiguration->iceServers[i].urls[0] != '\0') {
if (STRSTR(pRtcConfiguration->iceServers[i].urls, "stun")) {
pIceAgent->iceServers[pIceAgent->iceServersCount].setIpFn = pIceAgent->iceAgentCallbacks.setStunServerIpFn;
} else {
pIceAgent->iceServers[pIceAgent->iceServersCount].setIpFn = NULL;
}
PROFILE_CALL_WITH_T_OBJ(
retStatus = parseIceServer(&pIceAgent->iceServers[pIceAgent->iceServersCount], (PCHAR) pRtcConfiguration->iceServers[i].urls,
(PCHAR) pRtcConfiguration->iceServers[i].username, (PCHAR) pRtcConfiguration->iceServers[i].credential),
Expand Down
1 change: 1 addition & 0 deletions src/source/Ice/IceAgent.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ typedef struct {
IceInboundPacketFunc inboundPacketFn;
IceConnectionStateChangedFunc connectionStateChangedFn;
IceNewLocalCandidateFunc newLocalCandidateFn;
IceServerSetIpFunc setStunServerIpFn;
} IceAgentCallbacks, *PIceAgentCallbacks;

typedef struct {
Expand Down
17 changes: 16 additions & 1 deletion src/source/Ice/IceUtils.c
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ STATUS parseIceServer(PIceServer pIceServer, PCHAR url, PCHAR username, PCHAR cr
STATUS retStatus = STATUS_SUCCESS;
PCHAR separator = NULL, urlNoPrefix = NULL, paramStart = NULL;
UINT32 port = ICE_STUN_DEFAULT_PORT;
CHAR addressResolved[KVS_IP_ADDRESS_STRING_BUFFER_LEN + 1] = {'\0'};

// username and credential is only mandatory for turn server
CHK(url != NULL && pIceServer != NULL, STATUS_NULL_ARG);
Expand Down Expand Up @@ -249,8 +250,22 @@ STATUS parseIceServer(PIceServer pIceServer, PCHAR url, PCHAR username, PCHAR cr
STRNCPY(pIceServer->url, urlNoPrefix, MAX_ICE_CONFIG_URI_LEN);
}

CHK_STATUS(getIpWithHostName(pIceServer->url, &pIceServer->ipAddress));
if (pIceServer->setIpFn != NULL) {
retStatus = pIceServer->setIpFn(0, pIceServer->url, &pIceServer->ipAddress);
}

// Adding a NULL_ARG check specifically to cover for the case where early STUN
// resolution might not be enabled
if (retStatus == STATUS_NULL_ARG || pIceServer->setIpFn == NULL) {
// Reset the retStatus to ensure the appropriate status code is returned from
// getIpWithHostName
retStatus = STATUS_SUCCESS;
CHK_STATUS(getIpWithHostName(pIceServer->url, &pIceServer->ipAddress));
}

pIceServer->ipAddress.port = (UINT16) getInt16((INT16) port);
getIpAddrStr(&pIceServer->ipAddress, addressResolved, ARRAY_SIZE(addressResolved));
DLOGP("ICE Server address for %s: %s", pIceServer->url, addressResolved);

CleanUp:

Expand Down
1 change: 1 addition & 0 deletions src/source/Ice/IceUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ typedef struct {
CHAR username[MAX_ICE_CONFIG_USER_NAME_LEN + 1];
CHAR credential[MAX_ICE_CONFIG_CREDENTIAL_LEN + 1];
KVS_SOCKET_PROTOCOL transport;
IceServerSetIpFunc setIpFn;
} IceServer, *PIceServer;

STATUS parseIceServer(PIceServer, PCHAR, PCHAR, PCHAR);
Expand Down
4 changes: 0 additions & 4 deletions src/source/Ice/Network.c
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,6 @@ STATUS getIpWithHostName(PCHAR hostname, PKvsIpAddress destIp)
struct in_addr inaddr;

CHAR addr[KVS_IP_ADDRESS_STRING_BUFFER_LEN + 1] = {'\0'};
CHAR addressResolved[KVS_IP_ADDRESS_STRING_BUFFER_LEN + 1] = {'\0'};

CHK(hostname != NULL, STATUS_NULL_ARG);
DLOGI("ICE SERVER Hostname received: %s", hostname);
Expand Down Expand Up @@ -442,12 +441,9 @@ STATUS getIpWithHostName(PCHAR hostname, PKvsIpAddress destIp)
}
freeaddrinfo(res);
CHK_ERR(resolved, STATUS_HOSTNAME_NOT_FOUND, "Could not find network address of %s", hostname);
getIpAddrStr(destIp, addressResolved, ARRAY_SIZE(addressResolved));
DLOGP("ICE Server address for %s with getaddrinfo: %s", hostname, addressResolved);
}

else {
DLOGP("ICE Server address for %s: %s", hostname, addr);
inet_pton(AF_INET, addr, &inaddr);
destIp->family = KVS_IP_FAMILY_TYPE_IPV4;
MEMCPY(destIp->address, &inaddr, IPV4_ADDRESS_LENGTH);
Expand Down
3 changes: 3 additions & 0 deletions src/source/Include_i.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ typedef struct {
// Used for ensuring alignment
#define ALIGN_UP_TO_MACHINE_WORD(x) ROUND_UP((x), SIZEOF(SIZE_T))

typedef STATUS (*IceServerSetIpFunc)(UINT64, PCHAR, PKvsIpAddress);
STATUS getIpAddrStr(PKvsIpAddress pKvsIpAddress, PCHAR pBuffer, UINT32 bufferLen);

////////////////////////////////////////////////////
// Project forward declarations
////////////////////////////////////////////////////
Expand Down
211 changes: 207 additions & 4 deletions src/source/PeerConnection/PeerConnection.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,46 @@

static volatile ATOMIC_BOOL gKvsWebRtcInitialized = (SIZE_T) FALSE;

// Function to get access to the Singleton instance
PWebRtcClientContext getWebRtcClientInstance()
{
static WebRtcClientContext w = {.pStunIpAddrCtx = NULL, .stunCtxlock = INVALID_MUTEX_VALUE, .contextRefCnt = 0, .isContextInitialized = FALSE};
ATOMIC_INCREMENT(&w.contextRefCnt);
return &w;
}

VOID releaseHoldOnInstance(PWebRtcClientContext pWebRtcClientContext)
{
ATOMIC_DECREMENT(&pWebRtcClientContext->contextRefCnt);
}

STATUS createWebRtcClientInstance()
{
PWebRtcClientContext pWebRtcClientContext = getWebRtcClientInstance();
STATUS retStatus = STATUS_SUCCESS;
BOOL locked = FALSE;

CHK_WARN(!ATOMIC_LOAD_BOOL(&pWebRtcClientContext->isContextInitialized), retStatus, "WebRtc client context already initialized, nothing to do");
CHK_ERR(!IS_VALID_MUTEX_VALUE(pWebRtcClientContext->stunCtxlock), retStatus, "Mutex seems to have been created already");

pWebRtcClientContext->stunCtxlock = MUTEX_CREATE(FALSE);
CHK_ERR(IS_VALID_MUTEX_VALUE(pWebRtcClientContext->stunCtxlock), STATUS_NULL_ARG, "Mutex creation failed");
MUTEX_LOCK(pWebRtcClientContext->stunCtxlock);
locked = TRUE;
CHK_WARN(pWebRtcClientContext->pStunIpAddrCtx == NULL, STATUS_INVALID_OPERATION, "STUN object already allocated");
pWebRtcClientContext->pStunIpAddrCtx = (PStunIpAddrContext) MEMCALLOC(1, SIZEOF(StunIpAddrContext));
CHK_ERR(pWebRtcClientContext->pStunIpAddrCtx != NULL, STATUS_NULL_ARG, "Memory allocation for WebRtc client object failed");
pWebRtcClientContext->pStunIpAddrCtx->expirationDuration = 2 * HUNDREDS_OF_NANOS_IN_AN_HOUR;
ATOMIC_STORE_BOOL(&pWebRtcClientContext->isContextInitialized, TRUE);
DLOGI("Initialized WebRTC Client instance");
CleanUp:
if (locked) {
MUTEX_UNLOCK(pWebRtcClientContext->stunCtxlock);
}
releaseHoldOnInstance(pWebRtcClientContext);
return retStatus;
}

STATUS allocateSrtp(PKvsPeerConnection pKvsPeerConnection)
{
DtlsKeyingMaterial dtlsKeyingMaterial;
Expand Down Expand Up @@ -687,6 +727,121 @@ STATUS rtcpReportsCallback(UINT32 timerId, UINT64 currentTime, UINT64 customData
return retStatus;
}

// Not thread safe
STATUS getStunAddr(PStunIpAddrContext pStunIpAddrCtx)
{
INT32 errCode;
STATUS retStatus = STATUS_SUCCESS;
struct addrinfo *rp, *res;
struct sockaddr_in* ipv4Addr;
BOOL resolved = FALSE;

errCode = getaddrinfo(pStunIpAddrCtx->hostname, NULL, NULL, &res);
if (errCode != 0) {
DLOGI("Failed to resolve hostname with errcode: %d", errCode);
retStatus = STATUS_RESOLVE_HOSTNAME_FAILED;
} else {
for (rp = res; rp != NULL && !resolved; rp = rp->ai_next) {
if (rp->ai_family == AF_INET) {
ipv4Addr = (struct sockaddr_in*) rp->ai_addr;
pStunIpAddrCtx->kvsIpAddr.family = KVS_IP_FAMILY_TYPE_IPV4;
pStunIpAddrCtx->kvsIpAddr.port = 0;
MEMCPY(pStunIpAddrCtx->kvsIpAddr.address, &ipv4Addr->sin_addr, IPV4_ADDRESS_LENGTH);
resolved = TRUE;
}
}
freeaddrinfo(res);
}
if (!resolved) {
retStatus = STATUS_RESOLVE_HOSTNAME_FAILED;
}
return retStatus;
}

STATUS onSetStunServerIp(UINT64 customData, PCHAR url, PKvsIpAddress pIpAddr)
{
UNUSED_PARAM(customData);
STATUS retStatus = STATUS_SUCCESS;
BOOL locked = FALSE;
PWebRtcClientContext pWebRtcClientContext = getWebRtcClientInstance();
CHK_WARN(ATOMIC_LOAD_BOOL(&pWebRtcClientContext->isContextInitialized), STATUS_NULL_ARG, "WebRTC Client object Object not initialized");

UINT64 currentTime = GETTIME();

MUTEX_LOCK(pWebRtcClientContext->stunCtxlock);
locked = TRUE;

CHK(STRCMP(url, pWebRtcClientContext->pStunIpAddrCtx->hostname) == 0, STATUS_PEERCONNECTION_UNSUPPORTED_HOSTNAME);

if (pWebRtcClientContext->pStunIpAddrCtx->isIpInitialized) {
DLOGI("Initialized successfully");
if (currentTime > (pWebRtcClientContext->pStunIpAddrCtx->startTime + pWebRtcClientContext->pStunIpAddrCtx->expirationDuration)) {
DLOGI("Expired...need to refresh STUN address");
// Reset start time
pWebRtcClientContext->pStunIpAddrCtx->startTime = 0;
CHK_ERR(getStunAddr(pWebRtcClientContext->pStunIpAddrCtx) == STATUS_SUCCESS, retStatus, "Failed to resolve after cache expiry");
}
MEMCPY(pIpAddr, &pWebRtcClientContext->pStunIpAddrCtx->kvsIpAddr, SIZEOF(pWebRtcClientContext->pStunIpAddrCtx->kvsIpAddr));
} else {
DLOGE("Initialization failed");
}
CleanUp:
if (locked) {
MUTEX_UNLOCK(pWebRtcClientContext->stunCtxlock);
}
DLOGD("Exiting from stun server IP callback");
releaseHoldOnInstance(pWebRtcClientContext);
return retStatus;
}

PVOID resolveStunIceServerIp(PVOID args)
{
UNUSED_PARAM(args);
PWebRtcClientContext pWebRtcClientContext = getWebRtcClientInstance();
BOOL locked = FALSE;
CHAR addressResolved[KVS_IP_ADDRESS_STRING_BUFFER_LEN + 1] = {'\0'};
PCHAR pRegion;
PCHAR pHostnamePostfix;

if (ATOMIC_LOAD_BOOL(&pWebRtcClientContext->isContextInitialized)) {
MUTEX_LOCK(pWebRtcClientContext->stunCtxlock);
locked = TRUE;

if (pWebRtcClientContext->pStunIpAddrCtx == NULL) {
DLOGE("Failed to resolve STUN IP address because webrtc client instance was not created");
} else {
if ((pRegion = GETENV(DEFAULT_REGION_ENV_VAR)) == NULL) {
pRegion = DEFAULT_AWS_REGION;
}

pHostnamePostfix = KINESIS_VIDEO_STUN_URL_POSTFIX;
// If region is in CN, add CN region uri postfix
if (STRSTR(pRegion, "cn-")) {
pHostnamePostfix = KINESIS_VIDEO_STUN_URL_POSTFIX_CN;
}

SNPRINTF(pWebRtcClientContext->pStunIpAddrCtx->hostname, SIZEOF(pWebRtcClientContext->pStunIpAddrCtx->hostname),
KINESIS_VIDEO_STUN_URL_WITHOUT_PORT, pRegion, pHostnamePostfix);
if (getStunAddr(pWebRtcClientContext->pStunIpAddrCtx) == STATUS_SUCCESS) {
getIpAddrStr(&pWebRtcClientContext->pStunIpAddrCtx->kvsIpAddr, addressResolved, ARRAY_SIZE(addressResolved));
DLOGI("ICE Server address for %s with getaddrinfo: %s", pWebRtcClientContext->pStunIpAddrCtx->hostname, addressResolved);
pWebRtcClientContext->pStunIpAddrCtx->isIpInitialized = TRUE;
} else {
DLOGE("Failed to resolve %s", pWebRtcClientContext->pStunIpAddrCtx->hostname);
}
pWebRtcClientContext->pStunIpAddrCtx->startTime = GETTIME();
}
if (locked) {
MUTEX_UNLOCK(pWebRtcClientContext->stunCtxlock);
}
} else {
DLOGW("STUN DNS thread invoked without context being initialized");
}
releaseHoldOnInstance(pWebRtcClientContext);
DLOGD("Exiting from stun server IP resolution thread");
return NULL;
}

STATUS createPeerConnection(PRtcConfiguration pConfiguration, PRtcPeerConnection* ppPeerConnection)
{
ENTERS();
Expand Down Expand Up @@ -738,6 +893,8 @@ STATUS createPeerConnection(PRtcConfiguration pConfiguration, PRtcPeerConnection
iceAgentCallbacks.inboundPacketFn = onInboundPacket;
iceAgentCallbacks.connectionStateChangedFn = onIceConnectionStateChange;
iceAgentCallbacks.newLocalCandidateFn = onNewIceLocalCandidate;
iceAgentCallbacks.setStunServerIpFn = onSetStunServerIp;

CHK_STATUS(createConnectionListener(&pConnectionListener));
// IceAgent will own the lifecycle of pConnectionListener;
CHK_STATUS(createIceAgent(pKvsPeerConnection->localIceUfrag, pKvsPeerConnection->localIcePwd, &iceAgentCallbacks, pConfiguration,
Expand Down Expand Up @@ -1416,7 +1573,7 @@ STATUS initKvsWebRtc(VOID)
ENTERS();
STATUS retStatus = STATUS_SUCCESS;
CHK(!ATOMIC_LOAD_BOOL(&gKvsWebRtcInitialized), retStatus);

DLOGI("Initializing WebRTC library...");
SRAND(GETTIME());

CHK(srtp_init() == srtp_err_status_ok, STATUS_SRTP_INIT_FAILED);
Expand All @@ -1427,12 +1584,15 @@ STATUS initKvsWebRtc(VOID)
KVS_CRYPTO_INIT();
LOG_GIT_HASH();

SET_INSTRUMENTED_ALLOCATORS();
#ifdef ENABLE_DATA_CHANNEL
CHK_STATUS(initSctpSession());
#endif
#ifdef ENABLE_KVS_THREADPOOL
DLOGI("KVS WebRtc library using thread pool");
CHK_STATUS(createWebRtcClientInstance());
CHK_STATUS(createThreadPoolContext());
CHK_STATUS(threadpoolContextPush(resolveStunIceServerIp, NULL));
#endif
ATOMIC_STORE_BOOL(&gKvsWebRtcInitialized, TRUE);

Expand All @@ -1442,6 +1602,48 @@ STATUS initKvsWebRtc(VOID)
return retStatus;
}

STATUS cleanupWebRtcClientInstance()
{
STATUS retStatus = STATUS_SUCCESS;

// Stun object cleanup
PWebRtcClientContext pWebRtcClientContext = getWebRtcClientInstance();

DLOGD("Releasing webrtc client context instance from cleanupWebRtcClientInstance");
releaseHoldOnInstance(pWebRtcClientContext);

CHK_WARN(ATOMIC_LOAD_BOOL(&pWebRtcClientContext->isContextInitialized), STATUS_INVALID_OPERATION,
"WebRtc context not initialized, nothing to clean up");

ATOMIC_STORE_BOOL(&pWebRtcClientContext->isContextInitialized, FALSE);

while (ATOMIC_LOAD(&pWebRtcClientContext->contextRefCnt) > 0) {
DLOGV("Waiting on all references to be returned...%d", pWebRtcClientContext->contextRefCnt);
THREAD_SLEEP(100 * HUNDREDS_OF_NANOS_IN_A_MILLISECOND);
}

/* Start of handling STUN object */
// Need this check to ensure we do not clean up the object in the next
// step while the resolve thread is ongoing
CHK_WARN(pWebRtcClientContext->pStunIpAddrCtx != NULL, STATUS_NULL_ARG, "Destroying STUN object without setting up");
MUTEX_LOCK(pWebRtcClientContext->stunCtxlock);
SAFE_MEMFREE(pWebRtcClientContext->pStunIpAddrCtx);
pWebRtcClientContext->pStunIpAddrCtx = NULL;
DLOGI("Destroyed STUN IP object");
MUTEX_UNLOCK(pWebRtcClientContext->stunCtxlock);
/* End of handling STUN object */

if (IS_VALID_MUTEX_VALUE(pWebRtcClientContext->stunCtxlock)) {
MUTEX_FREE(pWebRtcClientContext->stunCtxlock);
pWebRtcClientContext->stunCtxlock = INVALID_MUTEX_VALUE;
}

DLOGI("Destroyed WebRtc client context");

CleanUp:
return retStatus;
}

STATUS deinitKvsWebRtc(VOID)
{
ENTERS();
Expand All @@ -1454,12 +1656,13 @@ STATUS deinitKvsWebRtc(VOID)

srtp_shutdown();

ATOMIC_STORE_BOOL(&gKvsWebRtcInitialized, FALSE);
#ifdef ENABLE_KVS_THREADPOOL
DLOGI("Destroying KVS Webrtc library threadpool");
cleanupWebRtcClientInstance();
destroyThreadPoolContext();
DLOGI("Destroyed threadpool");
RESET_INSTRUMENTED_ALLOCATORS();
#endif

ATOMIC_STORE_BOOL(&gKvsWebRtcInitialized, FALSE);
CleanUp:

LEAVES();
Expand Down
Loading

0 comments on commit 1bd85f0

Please sign in to comment.