Skip to content

Commit

Permalink
Incorporating PIC state machine level retry changes into webrtc signa…
Browse files Browse the repository at this point in the history
…ling state machine (#1341)

* Incorporating PIC state machine level retry changes into webrtc signaling state machine

* Add a n optional check for free retry strategy

* Remove unused goto label

* add missing sleep in get token state machine execute API

* update log line

* Update Producer hash

* Adding a high level retry strategy while creating signaling client
  • Loading branch information
kateyanurag authored Dec 6, 2021
1 parent 99583a9 commit 877d116
Show file tree
Hide file tree
Showing 12 changed files with 552 additions and 66 deletions.
2 changes: 1 addition & 1 deletion CMake/Dependencies/libkvsCommonLws-CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ include(ExternalProject)

ExternalProject_Add(libkvsCommonLws-download
GIT_REPOSITORY https://github.com/awslabs/amazon-kinesis-video-streams-producer-c.git
GIT_TAG 99c1a8cd8cec88f99c9c4ce3944b53ae341d1491
GIT_TAG 0d2ee038235ee9bd9c33d7788a7dc49ffd53a8d7
PREFIX ${CMAKE_CURRENT_BINARY_DIR}/build
CMAKE_ARGS
-DCMAKE_INSTALL_PREFIX=${OPEN_SRC_INSTALL_PREFIX}
Expand Down
1 change: 1 addition & 0 deletions samples/Common.c
Original file line number Diff line number Diff line change
Expand Up @@ -782,6 +782,7 @@ STATUS createSampleConfiguration(PCHAR channelName, SIGNALING_CHANNEL_ROLE_TYPE
pSampleConfiguration->clientInfo.version = SIGNALING_CLIENT_INFO_CURRENT_VERSION;
pSampleConfiguration->clientInfo.loggingLevel = logLevel;
pSampleConfiguration->clientInfo.cacheFilePath = NULL; // Use the default path
pSampleConfiguration->clientInfo.signalingClientCreationMaxRetryAttempts = DEFAULT_CREATE_SIGNALING_CLIENT_RETRY_ATTEMPTS;
pSampleConfiguration->iceCandidatePairStatsTimerId = MAX_UINT32;
pSampleConfiguration->pregenerateCertTimerId = MAX_UINT32;

Expand Down
2 changes: 2 additions & 0 deletions samples/Samples.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ extern "C" {
#define MASTER_DATA_CHANNEL_MESSAGE "This message is from the KVS Master"
#define VIEWER_DATA_CHANNEL_MESSAGE "This message is from the KVS Viewer"

#define DEFAULT_CREATE_SIGNALING_CLIENT_RETRY_ATTEMPTS 3

/* Uncomment the following line in order to enable IoT credentials checks in the provided samples */
//#define IOT_CORE_ENABLE_CREDENTIALS 1

Expand Down
22 changes: 12 additions & 10 deletions src/include/com/amazonaws/kinesis/video/webrtcclient/Include.h
Original file line number Diff line number Diff line change
Expand Up @@ -1177,16 +1177,18 @@ typedef struct {
* @brief Populate Signaling client with client ID and application log level
*/
typedef struct {
UINT32 version; //!< Version of the structure
CHAR clientId[MAX_SIGNALING_CLIENT_ID_LEN + 1]; //!< Client id to use. Defines if the client is a producer/consumer
UINT32 loggingLevel; //!< Verbosity level for the logging. One of LOG_LEVEL_XXX
//!< values or the default verbosity will be assumed. Currently,
//!< default value is LOG_LEVEL_WARNING
PCHAR cacheFilePath; //!< File cache path override. The default
//!< path is "./.SignalingCache_vN" which might not work for
//!< devices which have read only partition where the code is
//!< located. For default value or when file caching is not
//!< being used this value can be NULL or point to an EMPTY_STRING.
UINT32 version; //!< Version of the structure
CHAR clientId[MAX_SIGNALING_CLIENT_ID_LEN + 1]; //!< Client id to use. Defines if the client is a producer/consumer
UINT32 loggingLevel; //!< Verbosity level for the logging. One of LOG_LEVEL_XXX
//!< values or the default verbosity will be assumed. Currently,
//!< default value is LOG_LEVEL_WARNING
PCHAR cacheFilePath; //!< File cache path override. The default
//!< path is "./.SignalingCache_vN" which might not work for
//!< devices which have read only partition where the code is
//!< located. For default value or when file caching is not
//!< being used this value can be NULL or point to an EMPTY_STRING.
KvsRetryStrategyCallbacks signalingRetryStrategyCallbacks; //!< Retry strategy callbacks used while creating signaling client
UINT32 signalingClientCreationMaxRetryAttempts; //!< Max attempts to create signaling client before returning error to the caller
} SignalingClientInfo, *PSignalingClientInfo;

/**
Expand Down
81 changes: 64 additions & 17 deletions src/source/Ice/IceAgentStateMachine.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,70 @@
* Static definitions of the states
*/
StateMachineState ICE_AGENT_STATE_MACHINE_STATES[] = {
{ICE_AGENT_STATE_NEW, ICE_AGENT_STATE_NONE | ICE_AGENT_STATE_NEW, fromNewIceAgentState, executeNewIceAgentState, INFINITE_RETRY_COUNT_SENTINEL,
STATUS_ICE_INVALID_STATE},
{ICE_AGENT_STATE_CHECK_CONNECTION, ICE_AGENT_STATE_NEW | ICE_AGENT_STATE_CHECK_CONNECTION, fromCheckConnectionIceAgentState,
executeCheckConnectionIceAgentState, INFINITE_RETRY_COUNT_SENTINEL, STATUS_ICE_INVALID_STATE},
{ICE_AGENT_STATE_CONNECTED, ICE_AGENT_STATE_CHECK_CONNECTION | ICE_AGENT_STATE_CONNECTED, fromConnectedIceAgentState,
executeConnectedIceAgentState, INFINITE_RETRY_COUNT_SENTINEL, STATUS_ICE_INVALID_STATE},
{ICE_AGENT_STATE_NOMINATING, ICE_AGENT_STATE_CONNECTED | ICE_AGENT_STATE_NOMINATING, fromNominatingIceAgentState, executeNominatingIceAgentState,
INFINITE_RETRY_COUNT_SENTINEL, STATUS_ICE_INVALID_STATE},
{ICE_AGENT_STATE_READY, ICE_AGENT_STATE_CONNECTED | ICE_AGENT_STATE_NOMINATING | ICE_AGENT_STATE_READY | ICE_AGENT_STATE_DISCONNECTED,
fromReadyIceAgentState, executeReadyIceAgentState, INFINITE_RETRY_COUNT_SENTINEL, STATUS_ICE_INVALID_STATE},
{ICE_AGENT_STATE_DISCONNECTED,
ICE_AGENT_STATE_CHECK_CONNECTION | ICE_AGENT_STATE_CONNECTED | ICE_AGENT_STATE_NOMINATING | ICE_AGENT_STATE_READY | ICE_AGENT_STATE_DISCONNECTED,
fromDisconnectedIceAgentState, executeDisconnectedIceAgentState, INFINITE_RETRY_COUNT_SENTINEL, STATUS_ICE_INVALID_STATE},
{ICE_AGENT_STATE_FAILED,
ICE_AGENT_STATE_CHECK_CONNECTION | ICE_AGENT_STATE_CONNECTED | ICE_AGENT_STATE_NOMINATING | ICE_AGENT_STATE_READY |
ICE_AGENT_STATE_DISCONNECTED | ICE_AGENT_STATE_FAILED,
fromFailedIceAgentState, executeFailedIceAgentState, INFINITE_RETRY_COUNT_SENTINEL, STATUS_ICE_INVALID_STATE},
{
ICE_AGENT_STATE_NEW,
ICE_AGENT_STATE_NONE | ICE_AGENT_STATE_NEW,
fromNewIceAgentState,
executeNewIceAgentState,
NULL,
INFINITE_RETRY_COUNT_SENTINEL,
STATUS_ICE_INVALID_STATE
},
{
ICE_AGENT_STATE_CHECK_CONNECTION,
ICE_AGENT_STATE_NEW | ICE_AGENT_STATE_CHECK_CONNECTION,
fromCheckConnectionIceAgentState,
executeCheckConnectionIceAgentState,
NULL,
INFINITE_RETRY_COUNT_SENTINEL,
STATUS_ICE_INVALID_STATE
},
{
ICE_AGENT_STATE_CONNECTED,
ICE_AGENT_STATE_CHECK_CONNECTION | ICE_AGENT_STATE_CONNECTED,
fromConnectedIceAgentState,
executeConnectedIceAgentState,
NULL,
INFINITE_RETRY_COUNT_SENTINEL,
STATUS_ICE_INVALID_STATE
},
{
ICE_AGENT_STATE_NOMINATING,
ICE_AGENT_STATE_CONNECTED | ICE_AGENT_STATE_NOMINATING,
fromNominatingIceAgentState,
executeNominatingIceAgentState,
NULL,
INFINITE_RETRY_COUNT_SENTINEL,
STATUS_ICE_INVALID_STATE
},
{
ICE_AGENT_STATE_READY,
ICE_AGENT_STATE_CONNECTED | ICE_AGENT_STATE_NOMINATING | ICE_AGENT_STATE_READY | ICE_AGENT_STATE_DISCONNECTED,
fromReadyIceAgentState,
executeReadyIceAgentState,
NULL,
INFINITE_RETRY_COUNT_SENTINEL,
STATUS_ICE_INVALID_STATE
},
{
ICE_AGENT_STATE_DISCONNECTED,
ICE_AGENT_STATE_CHECK_CONNECTION | ICE_AGENT_STATE_CONNECTED | ICE_AGENT_STATE_NOMINATING | ICE_AGENT_STATE_READY | ICE_AGENT_STATE_DISCONNECTED,
fromDisconnectedIceAgentState,
executeDisconnectedIceAgentState,
NULL,
INFINITE_RETRY_COUNT_SENTINEL,
STATUS_ICE_INVALID_STATE
},
{
ICE_AGENT_STATE_FAILED,
ICE_AGENT_STATE_CHECK_CONNECTION | ICE_AGENT_STATE_CONNECTED | ICE_AGENT_STATE_NOMINATING | ICE_AGENT_STATE_READY |
ICE_AGENT_STATE_DISCONNECTED | ICE_AGENT_STATE_FAILED,
fromFailedIceAgentState,
executeFailedIceAgentState,
NULL,
INFINITE_RETRY_COUNT_SENTINEL,
STATUS_ICE_INVALID_STATE
},
};

UINT32 ICE_AGENT_STATE_MACHINE_STATE_COUNT = ARRAY_SIZE(ICE_AGENT_STATE_MACHINE_STATES);
Expand Down
88 changes: 86 additions & 2 deletions src/source/Signaling/Client.c
Original file line number Diff line number Diff line change
@@ -1,13 +1,68 @@
#define LOG_CLASS "SignalingClient"
#include "../Include_i.h"

STATUS createRetryStrategyForCreatingSignalingClient(PSignalingClientInfo pClientInfo, PKvsRetryStrategy pKvsRetryStrategy)
{
ENTERS();
STATUS retStatus = STATUS_SUCCESS;

CHK(pKvsRetryStrategy != NULL, STATUS_NULL_ARG);

if (pClientInfo->signalingRetryStrategyCallbacks.createRetryStrategyFn == NULL ||
pClientInfo->signalingRetryStrategyCallbacks.freeRetryStrategyFn == NULL ||
pClientInfo->signalingRetryStrategyCallbacks.executeRetryStrategyFn == NULL) {

DLOGV("Using exponential backoff retry strategy for creating signaling client");
pClientInfo->signalingRetryStrategyCallbacks.createRetryStrategyFn = exponentialBackoffRetryStrategyCreate;
pClientInfo->signalingRetryStrategyCallbacks.freeRetryStrategyFn = exponentialBackoffRetryStrategyFree;
pClientInfo->signalingRetryStrategyCallbacks.executeRetryStrategyFn = getExponentialBackoffRetryStrategyWaitTime;
}

// Create retry strategy will use default config 'DEFAULT_EXPONENTIAL_BACKOFF_CONFIGURATION' defined in -
// https://github.com/awslabs/amazon-kinesis-video-streams-pic/blob/develop/src/utils/include/com/amazonaws/kinesis/video/utils/Include.h
CHK_STATUS(pClientInfo->signalingRetryStrategyCallbacks.createRetryStrategyFn(pKvsRetryStrategy));

CHK(pKvsRetryStrategy->retryStrategyType == KVS_RETRY_STRATEGY_EXPONENTIAL_BACKOFF_WAIT, STATUS_INTERNAL_ERROR);
CHK(pKvsRetryStrategy->pRetryStrategy != NULL, STATUS_INTERNAL_ERROR);

CleanUp:

if (STATUS_FAILED(retStatus)) {
DLOGE("Some internal error occurred while setting up retry strategy for creating signaling client [0x%08x]", retStatus);
pClientInfo->signalingRetryStrategyCallbacks.freeRetryStrategyFn(pKvsRetryStrategy);
}

LEAVES();
return retStatus;
}

STATUS freeRetryStrategyForCreatingSignalingClient(PSignalingClientInfo pClientInfo, PKvsRetryStrategy pKvsRetryStrategy)
{
ENTERS();
STATUS retStatus = STATUS_SUCCESS;

CHK(pClientInfo != NULL && pKvsRetryStrategy != NULL, STATUS_NULL_ARG);

if (pKvsRetryStrategy->pRetryStrategy != NULL) {
pClientInfo->signalingRetryStrategyCallbacks.freeRetryStrategyFn(pKvsRetryStrategy);
}

CleanUp:

LEAVES();
return retStatus;
}

STATUS createSignalingClientSync(PSignalingClientInfo pClientInfo, PChannelInfo pChannelInfo, PSignalingClientCallbacks pCallbacks,
PAwsCredentialProvider pCredentialProvider, PSIGNALING_CLIENT_HANDLE pSignalingHandle)
{
ENTERS();
STATUS retStatus = STATUS_SUCCESS;
PSignalingClient pSignalingClient = NULL;
SignalingClientInfoInternal signalingClientInfoInternal;
KvsRetryStrategy createSignalingClientRetryStrategy = {NULL, NULL, KVS_RETRY_STRATEGY_DISABLED};
UINT32 signalingClientCreationMaxRetryCount;
UINT64 signalingClientCreationWaitTime;

DLOGV("Creating Signaling Client Sync");
CHK(pSignalingHandle != NULL && pClientInfo != NULL, STATUS_NULL_ARG);
Expand All @@ -16,16 +71,45 @@ STATUS createSignalingClientSync(PSignalingClientInfo pClientInfo, PChannelInfo
MEMSET(&signalingClientInfoInternal, 0x00, SIZEOF(signalingClientInfoInternal));
signalingClientInfoInternal.signalingClientInfo = *pClientInfo;

CHK_STATUS(createSignalingSync(&signalingClientInfoInternal, pChannelInfo, pCallbacks, pCredentialProvider, &pSignalingClient));
CHK_STATUS(createRetryStrategyForCreatingSignalingClient(pClientInfo, &createSignalingClientRetryStrategy));

*pSignalingHandle = TO_SIGNALING_CLIENT_HANDLE(pSignalingClient);
signalingClientCreationMaxRetryCount = pClientInfo->signalingClientCreationMaxRetryAttempts;
while (TRUE) {
retStatus = createSignalingSync(&signalingClientInfoInternal, pChannelInfo, pCallbacks, pCredentialProvider, &pSignalingClient);
// NOTE: This will retry on all status codes except SUCCESS.
// This includes status codes for bad arguments, internal non-recoverable errors etc.
// Retrying on non-recoverable errors is useless, but it is quite complex to segregate recoverable
// and non-recoverable errors at this layer. So to simplify, we would retry on all non-success status codes.
// It is the application's responsibility to fix any validation/null-arg/bad configuration type errors.
CHK(retStatus != STATUS_SUCCESS, retStatus);

DLOGE("Create Signaling Sync API returned [0x%08x] %d\n", retStatus, signalingClientCreationMaxRetryCount);
if (signalingClientCreationMaxRetryCount <= 0) {
break;
}

// Wait before attempting to create signaling client
CHK_STATUS(pClientInfo->signalingRetryStrategyCallbacks.executeRetryStrategyFn(
&createSignalingClientRetryStrategy, &signalingClientCreationWaitTime));

DLOGE("Attempting to back off for [%lf] milliseconds before creating signaling client again. "
"Signaling client creation retry count [%d]",
retStatus, signalingClientCreationWaitTime/1000.0, signalingClientCreationMaxRetryCount);
THREAD_SLEEP(signalingClientCreationWaitTime);
signalingClientCreationMaxRetryCount--;
}

CleanUp:

if (STATUS_FAILED(retStatus)) {
DLOGE("Create signaling client API failed with return code [0x%08x]", retStatus);
freeSignaling(&pSignalingClient);
} else {
*pSignalingHandle = TO_SIGNALING_CLIENT_HANDLE(pSignalingClient);
}

freeRetryStrategyForCreatingSignalingClient(pClientInfo, &createSignalingClientRetryStrategy);

LEAVES();
return retStatus;
}
Expand Down
69 changes: 69 additions & 0 deletions src/source/Signaling/Signaling.c
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ STATUS createSignalingSync(PSignalingClientInfoInternal pClientInfo, PChannelInf
// Store the credential provider
pSignalingClient->pCredentialProvider = pCredentialProvider;

CHK_STATUS(configureRetryStrategyForSignalingStateMachine(pSignalingClient));

// Create the state machine
CHK_STATUS(createStateMachine(SIGNALING_STATE_MACHINE_STATES, SIGNALING_STATE_MACHINE_STATE_COUNT,
CUSTOM_DATA_FROM_SIGNALING_CLIENT(pSignalingClient), signalingGetCurrentTime,
Expand Down Expand Up @@ -207,6 +209,8 @@ STATUS freeSignaling(PSignalingClient* ppSignalingClient)

freeStateMachine(pSignalingClient->pStateMachine);

freeClientRetryStrategy(pSignalingClient);

freeChannelInfo(&pSignalingClient->pChannelInfo);

stackQueueFree(pSignalingClient->pMessageQueue);
Expand Down Expand Up @@ -268,6 +272,71 @@ STATUS freeSignaling(PSignalingClient* ppSignalingClient)
return retStatus;
}

STATUS setupDefaultRetryStrategyForSignalingStateMachine(PSignalingClient pSignalingClient) {
ENTERS();
STATUS retStatus = STATUS_SUCCESS;
PKvsRetryStrategyCallbacks pKvsRetryStrategyCallbacks = &(pSignalingClient->clientInfo.signalingStateMachineRetryStrategyCallbacks);

// Use default as exponential backoff wait
pKvsRetryStrategyCallbacks->createRetryStrategyFn = exponentialBackoffRetryStrategyCreate;
pKvsRetryStrategyCallbacks->freeRetryStrategyFn = exponentialBackoffRetryStrategyFree;
pKvsRetryStrategyCallbacks->executeRetryStrategyFn = getExponentialBackoffRetryStrategyWaitTime;
pKvsRetryStrategyCallbacks->getCurrentRetryAttemptNumberFn = getExponentialBackoffRetryCount;

// Use a default exponential backoff config for state machine level retries
pSignalingClient->clientInfo.signalingStateMachineRetryStrategy.pRetryStrategyConfig =
(PRetryStrategyConfig)&DEFAULT_SIGNALING_STATE_MACHINE_EXPONENTIAL_BACKOFF_RETRY_CONFIGURATION;

LEAVES();
return retStatus;
}

STATUS configureRetryStrategyForSignalingStateMachine(PSignalingClient pSignalingClient) {
ENTERS();
STATUS retStatus = STATUS_SUCCESS;
PKvsRetryStrategyCallbacks pKvsRetryStrategyCallbacks = NULL;

CHK(pSignalingClient != NULL, STATUS_NULL_ARG);
pKvsRetryStrategyCallbacks = &(pSignalingClient->clientInfo.signalingStateMachineRetryStrategyCallbacks);

// If the callbacks for retry strategy are already set, then use that otherwise
// build the client with a default retry strategy.
if (pKvsRetryStrategyCallbacks->createRetryStrategyFn == NULL ||
pKvsRetryStrategyCallbacks->freeRetryStrategyFn == NULL ||
pKvsRetryStrategyCallbacks->executeRetryStrategyFn == NULL ||
pKvsRetryStrategyCallbacks->getCurrentRetryAttemptNumberFn == NULL) {

CHK_STATUS(setupDefaultRetryStrategyForSignalingStateMachine(pSignalingClient));
}

CHK_STATUS(pKvsRetryStrategyCallbacks->createRetryStrategyFn(
&(pSignalingClient->clientInfo.signalingStateMachineRetryStrategy)));

CleanUp:

LEAVES();
return retStatus;
}

STATUS freeClientRetryStrategy(PSignalingClient pSignalingClient) {
ENTERS();
STATUS retStatus = STATUS_SUCCESS;
PKvsRetryStrategyCallbacks pKvsRetryStrategyCallbacks = NULL;

CHK(pSignalingClient != NULL, STATUS_NULL_ARG);

pKvsRetryStrategyCallbacks = &(pSignalingClient->clientInfo.signalingStateMachineRetryStrategyCallbacks);
CHK(pKvsRetryStrategyCallbacks->freeRetryStrategyFn != NULL, STATUS_SUCCESS);

CHK_STATUS(pKvsRetryStrategyCallbacks->freeRetryStrategyFn(
&(pSignalingClient->clientInfo.signalingStateMachineRetryStrategy)));

CleanUp:

LEAVES();
return retStatus;
}

STATUS terminateOngoingOperations(PSignalingClient pSignalingClient)
{
ENTERS();
Expand Down
Loading

0 comments on commit 877d116

Please sign in to comment.