Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Incorporating PIC state machine level retry changes into webrtc signaling state machine #1326

Merged
merged 10 commits into from
Nov 22, 2021
2 changes: 1 addition & 1 deletion CMake/Dependencies/libkvsCommonLws-CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ include(ExternalProject)

ExternalProject_Add(libkvsCommonLws-download
GIT_REPOSITORY https://github.com/awslabs/amazon-kinesis-video-streams-producer-c.git
GIT_TAG 99c1a8cd8cec88f99c9c4ce3944b53ae341d1491
GIT_TAG 9a995a5793b4024f19912be9a319993b1e16005c
PREFIX ${CMAKE_CURRENT_BINARY_DIR}/build
CMAKE_ARGS
-DCMAKE_INSTALL_PREFIX=${OPEN_SRC_INSTALL_PREFIX}
Expand Down
30 changes: 30 additions & 0 deletions samples/Common.c
Original file line number Diff line number Diff line change
Expand Up @@ -697,6 +697,29 @@ STATUS lookForSslCert(PSampleConfiguration* ppSampleConfiguration)
return retStatus;
}

/**
 * Fills in the retry-related members of a SignalingClientInfo with the
 * exponential backoff strategy used by the samples.
 *
 * The create/free/execute callbacks are pointed at the exponential backoff
 * implementation, a strategy object is then created through the create
 * callback with a NULL (default) configuration, and finally the maximum number
 * of signaling client creation attempts is set to
 * MAX_CREATE_SIGNALING_CLIENT_RETRIES.
 *
 * @param pSignalingClientInfo - client info to populate; must not be NULL
 *
 * @return STATUS_NULL_ARG when pSignalingClientInfo is NULL, the status
 *         propagated by CHK_STATUS if the create callback fails, otherwise
 *         STATUS_SUCCESS
 */
STATUS setupDefaultSignalingClientRetryStrategy(PSignalingClientInfo pSignalingClientInfo)
{
    ENTERS();
    STATUS retStatus = STATUS_SUCCESS;
    PRetryStrategy pCreatedStrategy = NULL;

    CHK(pSignalingClientInfo != NULL, STATUS_NULL_ARG);

    // Wire up the callbacks first: the create callback is invoked through the struct just below
    pSignalingClientInfo->signalingClientRetryStrategy.executeRetryStrategyFn = getExponentialBackoffRetryStrategyWaitTime;
    pSignalingClientInfo->signalingClientRetryStrategy.freeRetryStrategyFn = exponentialBackoffRetryStrategyFree;
    pSignalingClientInfo->signalingClientRetryStrategy.createRetryStrategyFn = exponentialBackoffRetryStrategyCreate;
    pSignalingClientInfo->signalingClientRetryStrategy.retryStrategyType = KVS_RETRY_STRATEGY_EXPONENTIAL_BACKOFF_WAIT;

    // A NULL config asks the create callback to use its default configuration
    CHK_STATUS(pSignalingClientInfo->signalingClientRetryStrategy.createRetryStrategyFn(NULL, &pCreatedStrategy));

    // Only publish the strategy object and the retry limit once creation has succeeded
    pSignalingClientInfo->signalingClientRetryStrategy.pRetryStrategy = pCreatedStrategy;
    pSignalingClientInfo->signalingClientCreationMaxRetryCount = MAX_CREATE_SIGNALING_CLIENT_RETRIES;

CleanUp:
    LEAVES();
    return retStatus;
}

STATUS createSampleConfiguration(PCHAR channelName, SIGNALING_CHANNEL_ROLE_TYPE roleType, BOOL trickleIce, BOOL useTurn,
PSampleConfiguration* ppSampleConfiguration)
{
Expand Down Expand Up @@ -782,6 +805,8 @@ STATUS createSampleConfiguration(PCHAR channelName, SIGNALING_CHANNEL_ROLE_TYPE
pSampleConfiguration->clientInfo.version = SIGNALING_CLIENT_INFO_CURRENT_VERSION;
pSampleConfiguration->clientInfo.loggingLevel = logLevel;
pSampleConfiguration->clientInfo.cacheFilePath = NULL; // Use the default path
CHK_STATUS(setupDefaultSignalingClientRetryStrategy(&pSampleConfiguration->clientInfo));
disa6302 marked this conversation as resolved.
Show resolved Hide resolved

pSampleConfiguration->iceCandidatePairStatsTimerId = MAX_UINT32;
pSampleConfiguration->pregenerateCertTimerId = MAX_UINT32;

Expand Down Expand Up @@ -1063,6 +1088,11 @@ STATUS freeSampleConfiguration(PSampleConfiguration* ppSampleConfiguration)
SAFE_MEMFREE(pSampleConfiguration->pVideoFrameBuffer);
SAFE_MEMFREE(pSampleConfiguration->pAudioFrameBuffer);

if (pSampleConfiguration->clientInfo.signalingClientRetryStrategy.freeRetryStrategyFn != NULL) {
CHK_STATUS(pSampleConfiguration->clientInfo.signalingClientRetryStrategy.freeRetryStrategyFn(
&(pSampleConfiguration->clientInfo.signalingClientRetryStrategy.pRetryStrategy)));
}

if (IS_VALID_CVAR_VALUE(pSampleConfiguration->cvar) && IS_VALID_MUTEX_VALUE(pSampleConfiguration->sampleConfigurationObjLock)) {
CVAR_BROADCAST(pSampleConfiguration->cvar);
MUTEX_LOCK(pSampleConfiguration->sampleConfigurationObjLock);
Expand Down
1 change: 1 addition & 0 deletions samples/Samples.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ extern "C" {

#define SAMPLE_HASH_TABLE_BUCKET_COUNT 50
#define SAMPLE_HASH_TABLE_BUCKET_LENGTH 2
#define MAX_CREATE_SIGNALING_CLIENT_RETRIES 3

#define IOT_CORE_CREDENTIAL_ENDPOINT ((PCHAR) "AWS_IOT_CORE_CREDENTIAL_ENDPOINT")
#define IOT_CORE_CERT ((PCHAR) "AWS_IOT_CORE_CERT")
Expand Down
36 changes: 24 additions & 12 deletions src/include/com/amazonaws/kinesis/video/webrtcclient/Include.h
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,8 @@ extern "C" {
#define STATUS_SIGNALING_DELETE_CALL_FAILED STATUS_SIGNALING_BASE + 0x00000031
#define STATUS_SIGNALING_INVALID_METRICS_VERSION STATUS_SIGNALING_BASE + 0x00000032
#define STATUS_SIGNALING_INVALID_CLIENT_INFO_CACHE_FILE_PATH_LEN STATUS_SIGNALING_BASE + 0x00000033
#define STATUS_SIGNALING_LWS_CALL_FAILED STATUS_SIGNALING_BASE + 0x00000034


/*!@} */

Expand Down Expand Up @@ -495,7 +497,7 @@ extern "C" {
/**
* Version of signaling client
*/
#define SIGNALING_CLIENT_CURRENT_VERSION 0
#define SIGNALING_CLIENT_CURRENT_VERSION 1

/**
* Version of SignalingChannelDescription structure
Expand Down Expand Up @@ -573,6 +575,11 @@ extern "C" {
*/
#define SIGNALING_CONNECT_STATE_TIMEOUT (15 * HUNDREDS_OF_NANOS_IN_A_SECOND)

/**
* Default disconnect sync API timeout
*/
#define SIGNALING_DISCONNECT_STATE_TIMEOUT (15 * HUNDREDS_OF_NANOS_IN_A_SECOND)

/**
* Default refresh ICE server config API timeout
*/
Expand Down Expand Up @@ -635,7 +642,7 @@ extern "C" {
/**
* Signaling states default retry count. This will evaluate to the last call being made 20 seconds in which will hit a timeout first.
*/
#define SIGNALING_STATES_DEFAULT_RETRY_COUNT 10
#define SIGNALING_STATES_DEFAULT_RETRY_COUNT 1

/**
* Signaling caching policy default TTL period
Expand All @@ -653,6 +660,9 @@ extern "C" {
typedef UINT64 SIGNALING_CLIENT_HANDLE;
typedef SIGNALING_CLIENT_HANDLE* PSIGNALING_CLIENT_HANDLE;

typedef KvsRetryStrategy SignalingClientRetryStrategy;
typedef PKvsRetryStrategy PSignalingClientRetryStrategy;

/**
* @brief This is a sentinel indicating an invalid handle value
*/
Expand Down Expand Up @@ -1170,16 +1180,18 @@ typedef struct {
* @brief Populate Signaling client with client ID and application log level
*/
typedef struct {
UINT32 version; //!< Version of the structure
CHAR clientId[MAX_SIGNALING_CLIENT_ID_LEN + 1]; //!< Client id to use. Defines if the client is a producer/consumer
UINT32 loggingLevel; //!< Verbosity level for the logging. One of LOG_LEVEL_XXX
//!< values or the default verbosity will be assumed. Currently,
//!< default value is LOG_LEVEL_WARNING
PCHAR cacheFilePath; //!< File cache path override. The default
//!< path is "./.SignalingCache_vN" which might not work for
//!< devices which have read only partition where the code is
//!< located. For default value or when file caching is not
//!< being used this value can be NULL or point to an EMPTY_STRING.
UINT32 version; //!< Version of the structure
CHAR clientId[MAX_SIGNALING_CLIENT_ID_LEN + 1]; //!< Client id to use. Defines if the client is a producer/consumer
UINT32 loggingLevel; //!< Verbosity level for the logging. One of LOG_LEVEL_XXX
//!< values or the default verbosity will be assumed. Currently,
//!< default value is LOG_LEVEL_WARNING
PCHAR cacheFilePath; //!< File cache path override. The default
//!< path is "./.SignalingCache_vN" which might not work for
//!< devices which have read only partition where the code is
//!< located. For default value or when file caching is not
//!< being used this value can be NULL or point to an EMPTY_STRING.
SignalingClientRetryStrategy signalingClientRetryStrategy; //!< Retry strategy used while creating signaling client
UINT32 signalingClientCreationMaxRetryCount; //!< Maximum attempts which createSignalingClientSync API will make on failures to create signaling client
} SignalingClientInfo, *PSignalingClientInfo;

/**
Expand Down
47 changes: 46 additions & 1 deletion src/source/Signaling/Client.c
Original file line number Diff line number Diff line change
@@ -1,13 +1,37 @@
#define LOG_CLASS "SignalingClient"
#include "../Include_i.h"

/**
 * Checks that the retry configuration carried in the client info can be used
 * to drive signaling client creation.
 *
 * @param pClientInfo - client info whose retry strategy members are validated
 *
 * @return STATUS_NULL_ARG if pClientInfo is NULL, if the retry strategy type is
 *         not greater than KVS_RETRY_STRATEGY_DISABLED, or if either the
 *         strategy object or the execute callback is NULL;
 *         STATUS_NOT_IMPLEMENTED if signalingClientCreationMaxRetryCount is 0;
 *         STATUS_SUCCESS otherwise
 */
STATUS validateSignalingClientRetryStrategy(PSignalingClientInfo pClientInfo)
{
    ENTERS();
    STATUS retStatus = STATUS_SUCCESS;
    PSignalingClientRetryStrategy pSignalingClientRetryStrategy = NULL;

    CHK(pClientInfo != NULL, STATUS_NULL_ARG);

    pSignalingClientRetryStrategy = &(pClientInfo->signalingClientRetryStrategy);

    // The strategy must be enabled and must provide both a strategy object and
    // the callback used to compute the wait time.
    CHK(pSignalingClientRetryStrategy->retryStrategyType > KVS_RETRY_STRATEGY_DISABLED &&
            pSignalingClientRetryStrategy->pRetryStrategy != NULL && pSignalingClientRetryStrategy->executeRetryStrategyFn != NULL,
        STATUS_NULL_ARG);

    // A retry count of 0 would mean no creation attempt is ever made
    CHK(pClientInfo->signalingClientCreationMaxRetryCount > 0, STATUS_NOT_IMPLEMENTED);

CleanUp:

    LEAVES();
    return retStatus;
}

STATUS createSignalingClientSync(PSignalingClientInfo pClientInfo, PChannelInfo pChannelInfo, PSignalingClientCallbacks pCallbacks,
PAwsCredentialProvider pCredentialProvider, PSIGNALING_CLIENT_HANDLE pSignalingHandle)
{
ENTERS();
STATUS retStatus = STATUS_SUCCESS;
PSignalingClient pSignalingClient = NULL;
PSignalingClientRetryStrategy pSignalingClientRetryStrategy = NULL;
SignalingClientInfoInternal signalingClientInfoInternal;
UINT32 signalingClientCreationMaxRetryCount;
UINT64 signalingClientCreationWaitTime;

DLOGV("Creating Signaling Client Sync");
CHK(pSignalingHandle != NULL && pClientInfo != NULL, STATUS_NULL_ARG);
Expand All @@ -16,7 +40,28 @@ STATUS createSignalingClientSync(PSignalingClientInfo pClientInfo, PChannelInfo
MEMSET(&signalingClientInfoInternal, 0x00, SIZEOF(signalingClientInfoInternal));
signalingClientInfoInternal.signalingClientInfo = *pClientInfo;

CHK_STATUS(createSignalingSync(&signalingClientInfoInternal, pChannelInfo, pCallbacks, pCredentialProvider, &pSignalingClient));
CHK_STATUS(validateSignalingClientRetryStrategy(pClientInfo));
disa6302 marked this conversation as resolved.
Show resolved Hide resolved

signalingClientCreationMaxRetryCount = pClientInfo->signalingClientCreationMaxRetryCount;
pSignalingClientRetryStrategy = &(pClientInfo->signalingClientRetryStrategy);

while (signalingClientCreationMaxRetryCount > 0) {
// Wait before creating the signaling client to ensure the first call from a large
// client fleet will be spread across the wait time window.
CHK_STATUS(pSignalingClientRetryStrategy->executeRetryStrategyFn(pSignalingClientRetryStrategy->pRetryStrategy, &signalingClientCreationWaitTime));
DLOGV("Attempting to back off for [%lf] milliseconds before creating signaling client. Signaling client creation retry count [%d]",
signalingClientCreationWaitTime/1000.0, signalingClientCreationMaxRetryCount);
THREAD_SLEEP(signalingClientCreationWaitTime);

retStatus = createSignalingSync(&signalingClientInfoInternal, pChannelInfo, pCallbacks, pCredentialProvider, &pSignalingClient);
if (retStatus == STATUS_SUCCESS) {
break;
}
signalingClientCreationMaxRetryCount--;
}

DLOGV("Create signaling client returned [%" PRId64 "].", retStatus);
CHK_STATUS(retStatus);

*pSignalingHandle = TO_SIGNALING_CLIENT_HANDLE(pSignalingClient);

Expand Down
70 changes: 58 additions & 12 deletions src/source/Signaling/LwsApiCalls.c
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,16 @@ INT32 lwsHttpCallbackRoutine(struct lws* wsi, enum lws_callback_reasons reason,
lwsl_hexdump_debug(pDataIn, dataSize);

if (dataSize != 0) {
CHK(NULL != (pLwsCallInfo->callInfo.responseData = (PCHAR) MEMALLOC(dataSize)), STATUS_NOT_ENOUGH_MEMORY);
CHK(NULL != (pLwsCallInfo->callInfo.responseData = (PCHAR) MEMALLOC(dataSize+1)), STATUS_NOT_ENOUGH_MEMORY);
MEMCPY(pLwsCallInfo->callInfo.responseData, pDataIn, dataSize);
pLwsCallInfo->callInfo.responseData[dataSize] = '\0';
pLwsCallInfo->callInfo.responseDataLen = (UINT32) dataSize;

if (pLwsCallInfo->callInfo.callResult != SERVICE_CALL_RESULT_OK) {
DLOGW("Received client http read response: %s", pLwsCallInfo->callInfo.responseData);
} else {
DLOGV("Received client http read response: %s", pLwsCallInfo->callInfo.responseData);
}
}

break;
Expand Down Expand Up @@ -632,6 +639,10 @@ STATUS lwsCompleteSync(PLwsCallInfo pCallInfo)
return retStatus;
}

/**
 * Decides from the response body whether a failed service call may be retried.
 *
 * A response body containing "Signature expired" marks the failure as
 * non-retryable. A missing call info, a NULL response body or an empty
 * response carries no such marker and is therefore reported as retryable;
 * the NULL checks matter because callers invoke this before they verify that
 * a response body was received.
 *
 * @param pCallInfo - call info holding responseData / responseDataLen
 *
 * @return non-zero when the failure may be retried, zero when the response
 *         contains "Signature expired"
 */
BOOL isCallResultFailureRetryable(PCallInfo pCallInfo)
{
    // Short-circuit keeps a NULL haystack from ever reaching STRNSTR
    return (pCallInfo == NULL || pCallInfo->responseData == NULL || pCallInfo->responseDataLen == 0 ||
            STRNSTR(pCallInfo->responseData, "Signature expired", pCallInfo->responseDataLen) == NULL);
}

//////////////////////////////////////////////////////////////////////////
// API calls
//////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -678,8 +689,12 @@ STATUS describeChannelLws(PSignalingClient pSignalingClient, UINT64 time)
pResponseStr = pLwsCallInfo->callInfo.responseData;
resultLen = pLwsCallInfo->callInfo.responseDataLen;

CHK_ERR(isCallResultFailureRetryable(&pLwsCallInfo->callInfo), STATUS_SIGNALING_DESCRIBE_CALL_FAILED,
"DescribeChannel API call failed with: %s and will not be retried.", pResponseStr);

// Early return if we have a non-success result
CHK((SERVICE_CALL_RESULT) ATOMIC_LOAD(&pSignalingClient->result) == SERVICE_CALL_RESULT_OK && resultLen != 0 && pResponseStr != NULL, retStatus);
CHK((SERVICE_CALL_RESULT) ATOMIC_LOAD(&pSignalingClient->result) == SERVICE_CALL_RESULT_OK && resultLen != 0 && pResponseStr != NULL,
STATUS_SIGNALING_LWS_CALL_FAILED);

// Parse the response
jsmn_init(&parser);
Expand Down Expand Up @@ -753,6 +768,10 @@ STATUS describeChannelLws(PSignalingClient pSignalingClient, UINT64 time)

CleanUp:

if (STATUS_FAILED(retStatus)) {
DLOGE("Call Failed with Status: 0x%08x", retStatus);
}

freeLwsCallInfo(&pLwsCallInfo);

LEAVES();
Expand Down Expand Up @@ -823,8 +842,12 @@ STATUS createChannelLws(PSignalingClient pSignalingClient, UINT64 time)
pResponseStr = pLwsCallInfo->callInfo.responseData;
resultLen = pLwsCallInfo->callInfo.responseDataLen;

CHK_ERR(isCallResultFailureRetryable(&pLwsCallInfo->callInfo), STATUS_SIGNALING_CREATE_CALL_FAILED,
"CreateChannel API call failed with: %s and will not be retried.", pResponseStr);

// Early return if we have a non-success result
CHK((SERVICE_CALL_RESULT) ATOMIC_LOAD(&pSignalingClient->result) == SERVICE_CALL_RESULT_OK && resultLen != 0 && pResponseStr != NULL, retStatus);
CHK((SERVICE_CALL_RESULT) ATOMIC_LOAD(&pSignalingClient->result) == SERVICE_CALL_RESULT_OK && resultLen != 0 && pResponseStr != NULL,
STATUS_SIGNALING_LWS_CALL_FAILED);

// Parse out the ARN
jsmn_init(&parser);
Expand All @@ -848,6 +871,10 @@ STATUS createChannelLws(PSignalingClient pSignalingClient, UINT64 time)

CleanUp:

if (STATUS_FAILED(retStatus)) {
DLOGE("Call Failed with Status: 0x%08x", retStatus);
}

freeLwsCallInfo(&pLwsCallInfo);

LEAVES();
Expand Down Expand Up @@ -897,8 +924,12 @@ STATUS getChannelEndpointLws(PSignalingClient pSignalingClient, UINT64 time)
pResponseStr = pLwsCallInfo->callInfo.responseData;
resultLen = pLwsCallInfo->callInfo.responseDataLen;

CHK_ERR(isCallResultFailureRetryable(&pLwsCallInfo->callInfo), STATUS_SIGNALING_GET_ENDPOINT_CALL_FAILED,
"GetChannelEndpoint API call failed with: %s and will not be retried.", pResponseStr);

// Early return if we have a non-success result
CHK((SERVICE_CALL_RESULT) ATOMIC_LOAD(&pSignalingClient->result) == SERVICE_CALL_RESULT_OK && resultLen != 0 && pResponseStr != NULL, retStatus);
CHK((SERVICE_CALL_RESULT) ATOMIC_LOAD(&pSignalingClient->result) == SERVICE_CALL_RESULT_OK && resultLen != 0 && pResponseStr != NULL,
STATUS_SIGNALING_LWS_CALL_FAILED);

// Parse and extract the endpoints
jsmn_init(&parser);
Expand Down Expand Up @@ -973,6 +1004,10 @@ STATUS getChannelEndpointLws(PSignalingClient pSignalingClient, UINT64 time)

CleanUp:

if (STATUS_FAILED(retStatus)) {
DLOGE("Call Failed with Status: 0x%08x", retStatus);
}

freeLwsCallInfo(&pLwsCallInfo);

LEAVES();
Expand Down Expand Up @@ -1028,8 +1063,12 @@ STATUS getIceConfigLws(PSignalingClient pSignalingClient, UINT64 time)
pResponseStr = pLwsCallInfo->callInfo.responseData;
resultLen = pLwsCallInfo->callInfo.responseDataLen;

CHK_ERR(isCallResultFailureRetryable(&pLwsCallInfo->callInfo), STATUS_SIGNALING_GET_ICE_CONFIG_CALL_FAILED,
"GetIceConfig API call failed with: %s and will not be retried.", pResponseStr);

// Early return if we have a non-success result
CHK((SERVICE_CALL_RESULT) ATOMIC_LOAD(&pSignalingClient->result) == SERVICE_CALL_RESULT_OK && resultLen != 0 && pResponseStr != NULL, retStatus);
CHK((SERVICE_CALL_RESULT) ATOMIC_LOAD(&pSignalingClient->result) == SERVICE_CALL_RESULT_OK && resultLen != 0 && pResponseStr != NULL,
STATUS_SIGNALING_LWS_CALL_FAILED);

// Parse the response
jsmn_init(&parser);
Expand Down Expand Up @@ -1094,6 +1133,10 @@ STATUS getIceConfigLws(PSignalingClient pSignalingClient, UINT64 time)

CleanUp:

if (STATUS_FAILED(retStatus)) {
DLOGE("Call Failed with Status: 0x%08x", retStatus);
}

freeLwsCallInfo(&pLwsCallInfo);

LEAVES();
Expand Down Expand Up @@ -1145,13 +1188,18 @@ STATUS deleteChannelLws(PSignalingClient pSignalingClient, UINT64 time)

// Early return if we have a non-success result and it's not a resource not found
result = ATOMIC_LOAD(&pSignalingClient->result);
CHK((SERVICE_CALL_RESULT) result == SERVICE_CALL_RESULT_OK || (SERVICE_CALL_RESULT) result == SERVICE_CALL_RESOURCE_NOT_FOUND, retStatus);
CHK((SERVICE_CALL_RESULT) result == SERVICE_CALL_RESULT_OK || (SERVICE_CALL_RESULT) result == SERVICE_CALL_RESOURCE_NOT_FOUND,
STATUS_SIGNALING_LWS_CALL_FAILED);

// Mark the channel as deleted
ATOMIC_STORE_BOOL(&pSignalingClient->deleted, TRUE);

CleanUp:

if (STATUS_FAILED(retStatus)) {
DLOGE("Call Failed with Status: 0x%08x", retStatus);
}

freeLwsCallInfo(&pLwsCallInfo);

LEAVES();
Expand Down Expand Up @@ -1376,14 +1424,12 @@ PVOID reconnectHandler(PVOID args)
// thread but there is a slight chance of a race condition.
CHK(!ATOMIC_LOAD_BOOL(&pSignalingClient->shutdown), retStatus);

// Set the time out before execution
pSignalingClient->stepUntil = GETTIME() + SIGNALING_CONNECT_STATE_TIMEOUT;

// Update the diagnostics info
ATOMIC_INCREMENT(&pSignalingClient->diagnostics.numberOfReconnects);

// Attempt to reconnect by driving the state machine to connected state
CHK_STATUS(stepSignalingStateMachine(pSignalingClient, retStatus));
CHK_STATUS(signalingStateMachineIterator(pSignalingClient, GETTIME() + SIGNALING_CONNECT_STATE_TIMEOUT,
SIGNALING_STATE_CONNECTED));

CleanUp:

Expand Down Expand Up @@ -1804,7 +1850,7 @@ STATUS receiveLwsMessage(PSignalingClient pSignalingClient, PCHAR pMessage, UINT
SAFE_MEMFREE(pSignalingMessageWrapper);

// Iterate the state machinery
CHK_STATUS(stepSignalingStateMachine(pSignalingClient, retStatus));
CHK_STATUS(signalingStateMachineIterator(pSignalingClient, GETTIME() + SIGNALING_CONNECT_STATE_TIMEOUT, SIGNALING_STATE_CONNECTED));

CHK(FALSE, retStatus);
break;
Expand All @@ -1817,7 +1863,7 @@ STATUS receiveLwsMessage(PSignalingClient pSignalingClient, PCHAR pMessage, UINT
SAFE_MEMFREE(pSignalingMessageWrapper);

// Iterate the state machinery
CHK_STATUS(stepSignalingStateMachine(pSignalingClient, retStatus));
CHK_STATUS(signalingStateMachineIterator(pSignalingClient, GETTIME() + SIGNALING_CONNECT_STATE_TIMEOUT, SIGNALING_STATE_CONNECTED));

CHK(FALSE, retStatus);
break;
Expand Down
Loading