Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding ability to configure the channel to restart and to reconnect #7

Merged
merged 1 commit into from
Nov 27, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmake-scripts/libwebsockets-CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ include(ExternalProject)

ExternalProject_Add(project_libwebsockets
GIT_REPOSITORY https://github.com/warmcat/libwebsockets.git
GIT_TAG v3.2.0
GIT_TAG v3.2-stable
PREFIX ${CMAKE_CURRENT_BINARY_DIR}/build
CMAKE_ARGS
-DCMAKE_INSTALL_PREFIX=${CMAKE_SOURCE_DIR}/../local/
Expand Down
22 changes: 19 additions & 3 deletions samples/Common.c
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ STATUS handleRemoteCandidate(PSampleConfiguration pSampleConfiguration, PSignali
return retStatus;
}

STATUS createSampleConfiguration(PSampleConfiguration* ppSampleConfiguration, BOOL isOfferer, BOOL trickleIce, BOOL useTurn)
STATUS createSampleConfiguration(PCHAR channelName, SIGNALING_CHANNEL_ROLE_TYPE roleType, BOOL trickleIce, BOOL useTurn, PSampleConfiguration* ppSampleConfiguration)
{
STATUS retStatus = STATUS_SUCCESS;
PCHAR pAccessKey, pSecretKey, pSessionToken;
Expand Down Expand Up @@ -373,10 +373,26 @@ STATUS createSampleConfiguration(PSampleConfiguration* ppSampleConfiguration, BO
pSampleConfiguration->trickleIce = trickleIce;
pSampleConfiguration->useTurn = useTurn;

if (isOfferer) {
pSampleConfiguration->channelInfo.version = CHANNEL_INFO_CURRENT_VERSION;
pSampleConfiguration->channelInfo.pChannelName = channelName;
pSampleConfiguration->channelInfo.pKmsKeyId = NULL;
pSampleConfiguration->channelInfo.tagCount = 0;
pSampleConfiguration->channelInfo.pTags = NULL;
pSampleConfiguration->channelInfo.channelType = SIGNALING_CHANNEL_TYPE_SINGLE_MASTER;
pSampleConfiguration->channelInfo.channelRoleType = roleType;
pSampleConfiguration->channelInfo.cachingEndpoint = FALSE;
pSampleConfiguration->channelInfo.retry = TRUE;
pSampleConfiguration->channelInfo.reconnect = TRUE;
pSampleConfiguration->channelInfo.pCertPath = pSampleConfiguration->pCaCertPath;
pSampleConfiguration->channelInfo.pControlPlaneUrl = KINESIS_VIDEO_BETA_CONTROL_PLANE_URL;

if (pSampleConfiguration->channelInfo.channelRoleType == SIGNALING_CHANNEL_ROLE_TYPE_VIEWER) {
STRCPY(pSampleConfiguration->peerId, SAMPLE_MASTER_CLIENT_ID);
ATOMIC_STORE_BOOL(&pSampleConfiguration->peerIdReceived, TRUE);
} else {
ATOMIC_STORE_BOOL(&pSampleConfiguration->peerIdReceived, FALSE);
}
ATOMIC_STORE_BOOL(&pSampleConfiguration->peerIdReceived, isOfferer);

ATOMIC_STORE_BOOL(&pSampleConfiguration->peerConnectionStarted, FALSE);
ATOMIC_STORE_BOOL(&pSampleConfiguration->interrupted, FALSE);
ATOMIC_STORE_BOOL(&pSampleConfiguration->candidateGatheringDone, FALSE);
Expand Down
3 changes: 2 additions & 1 deletion samples/Samples.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ typedef struct {
volatile ATOMIC_BOOL interrupted;
volatile ATOMIC_BOOL candidateGatheringDone;
volatile SIZE_T frameIndex;
ChannelInfo channelInfo;
PCHAR pCaCertPath;
PCHAR pRegion;
PAwsCredentialProvider pCredentialProvider;
Expand Down Expand Up @@ -76,7 +77,7 @@ STATUS readFrameFromDisk(PBYTE, PUINT32, PCHAR);
PVOID sendVideoPackets(PVOID);
PVOID sendAudioPackets(PVOID);
PVOID sendGstreamerAudioVideo(PVOID);
STATUS createSampleConfiguration(PSampleConfiguration*, BOOL, BOOL, BOOL);
STATUS createSampleConfiguration(PCHAR, SIGNALING_CHANNEL_ROLE_TYPE, BOOL, BOOL, PSampleConfiguration*);
STATUS freeSampleConfiguration(PSampleConfiguration*);
STATUS viewerMessageReceived(UINT64, PReceivedSignalingMessage);
STATUS signalingClientStateChanged(UINT64, SIGNALING_CLIENT_STATE);
Expand Down
2 changes: 0 additions & 2 deletions samples/gstSample.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ typedef struct __GstAppSrcs {
GstElement* pGstVideoAppSrc;
} GstAppSrcs, *PGstAppSrcs;



#pragma pack(pop, samples_i)

#ifdef __cplusplus
Expand Down
27 changes: 9 additions & 18 deletions samples/kvsWebRTCClientMaster.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ VOID sigintHandler(INT32 sigNum)
INT32 main(INT32 argc, CHAR *argv[])
{
STATUS retStatus = STATUS_SUCCESS;
ChannelInfo channelInfo;
SignalingClientCallbacks signalingClientCallbacks;
SignalingClientInfo clientInfo;
UINT32 frameSize;
Expand All @@ -22,8 +21,11 @@ INT32 main(INT32 argc, CHAR *argv[])
signal(SIGINT, sigintHandler);

// do tricketIce by default
CHK_STATUS(createSampleConfiguration(&pSampleConfiguration, FALSE, TRUE, TRUE));

CHK_STATUS(createSampleConfiguration(argc > 1 ? argv[1] : SAMPLE_CHANNEL_NAME,
SIGNALING_CHANNEL_ROLE_TYPE_MASTER,
TRUE,
TRUE,
&pSampleConfiguration));
// Set the audio and video handlers
pSampleConfiguration->audioSource = sendAudioPackets;
pSampleConfiguration->videoSource = sendVideoPackets;
Expand All @@ -47,22 +49,11 @@ INT32 main(INT32 argc, CHAR *argv[])
clientInfo.loggingLevel = LOG_LEVEL_VERBOSE;
STRCPY(clientInfo.clientId, SAMPLE_MASTER_CLIENT_ID);

MEMSET(&channelInfo, 0x00, SIZEOF(ChannelInfo));
channelInfo.version = CHANNEL_INFO_CURRENT_VERSION;
channelInfo.pChannelName = argc > 1 ? argv[1] : SAMPLE_CHANNEL_NAME;
channelInfo.pKmsKeyId = NULL;
channelInfo.tagCount = 0;
channelInfo.pTags = NULL;
channelInfo.channelType = SIGNALING_CHANNEL_TYPE_SINGLE_MASTER;
channelInfo.channelRoleType = SIGNALING_CHANNEL_ROLE_TYPE_MASTER;
channelInfo.cachingEndpoint = FALSE;
channelInfo.pCertPath = pSampleConfiguration->pCaCertPath;
channelInfo.pControlPlaneUrl = KINESIS_VIDEO_BETA_CONTROL_PLANE_URL;
CHK_STATUS(createSignalingClientSync(&clientInfo,
&channelInfo,
&signalingClientCallbacks,
pSampleConfiguration->pCredentialProvider,
&pSampleConfiguration->signalingClientHandle));
&pSampleConfiguration->channelInfo,
&signalingClientCallbacks,
pSampleConfiguration->pCredentialProvider,
&pSampleConfiguration->signalingClientHandle));

// Initialize the peer connection
CHK_STATUS(initializePeerConnection(pSampleConfiguration));
Expand Down
28 changes: 12 additions & 16 deletions samples/kvsWebRTCClientMasterGstreamerSample.c
Original file line number Diff line number Diff line change
Expand Up @@ -275,15 +275,20 @@ STATUS createSampleGstAppSrcs(PGstAppSrcs* ppGstAppSrc)
INT32 main(INT32 argc, CHAR *argv[])
{
STATUS retStatus = STATUS_SUCCESS;
ChannelInfo channelInfo;
SignalingClientCallbacks signalingClientCallbacks;
SignalingClientInfo clientInfo;
PSampleConfiguration pSampleConfiguration = NULL;
PGstAppSrcs pGstAppSrcs = NULL;

// do trickle-ice by default
CHK_STATUS(createSampleConfiguration(&pSampleConfiguration, FALSE, TRUE, TRUE));
CHK_STATUS(createSampleConfiguration(argc > 1 ? argv[1] : SAMPLE_CHANNEL_NAME,
SIGNALING_CHANNEL_ROLE_TYPE_MASTER,
TRUE,
TRUE,
&pSampleConfiguration));

CHK_STATUS(createSampleGstAppSrcs(&pGstAppSrcs));

pSampleConfiguration->videoSource = sendGstreamerAudioVideo;
pSampleConfiguration->mediaType = SAMPLE_STREAMING_VIDEO_ONLY;
pSampleConfiguration->receiveAudioVideoSource = receiveGstreamerAudioVideo;
Expand Down Expand Up @@ -324,20 +329,11 @@ INT32 main(INT32 argc, CHAR *argv[])
clientInfo.loggingLevel = LOG_LEVEL_VERBOSE;
STRCPY(clientInfo.clientId, SAMPLE_MASTER_CLIENT_ID);

MEMSET(&channelInfo, 0x00, SIZEOF(ChannelInfo));
channelInfo.version = CHANNEL_INFO_CURRENT_VERSION;
channelInfo.pChannelName = argc > 1 ? argv[1] : SAMPLE_CHANNEL_NAME;
channelInfo.pKmsKeyId = NULL;
channelInfo.tagCount = 0;
channelInfo.pTags = NULL;
channelInfo.channelType = SIGNALING_CHANNEL_TYPE_SINGLE_MASTER;
channelInfo.channelRoleType = SIGNALING_CHANNEL_ROLE_TYPE_MASTER;
channelInfo.cachingEndpoint = FALSE;
channelInfo.pCertPath = pSampleConfiguration->pCaCertPath;
channelInfo.pControlPlaneUrl = KINESIS_VIDEO_BETA_CONTROL_PLANE_URL;
CHK_STATUS(createSignalingClientSync(&clientInfo, &channelInfo, &signalingClientCallbacks,
pSampleConfiguration->pCredentialProvider,
&pSampleConfiguration->signalingClientHandle));
CHK_STATUS(createSignalingClientSync(&clientInfo,
&pSampleConfiguration->channelInfo,
&signalingClientCallbacks,
pSampleConfiguration->pCredentialProvider,
&pSampleConfiguration->signalingClientHandle));

// Initialize the peer connection
CHK_STATUS(initializePeerConnection(pSampleConfiguration));
Expand Down
26 changes: 10 additions & 16 deletions samples/kvsWebRTCClientViewer.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,18 @@
INT32 main(INT32 argc, CHAR *argv[])
{
STATUS retStatus = STATUS_SUCCESS;
ChannelInfo channelInfo;
SignalingClientCallbacks signalingClientCallbacks;
SignalingClientInfo clientInfo;
RtcSessionDescriptionInit offerSessionDescriptionInit;
UINT32 buffLen = 0;
SignalingMessage message;
PSampleConfiguration pSampleConfiguration = NULL;
// do trickle-ice by default
CHK_STATUS(createSampleConfiguration(&pSampleConfiguration, TRUE, TRUE, TRUE));
CHK_STATUS(createSampleConfiguration(argc > 1 ? argv[1] : SAMPLE_CHANNEL_NAME,
SIGNALING_CHANNEL_ROLE_TYPE_VIEWER,
TRUE,
TRUE,
&pSampleConfiguration));

// Initalize KVS WebRTC. This must be done before anything else, and must only be done once.
CHK_STATUS(initKvsWebRtc());
Expand All @@ -26,20 +29,11 @@ INT32 main(INT32 argc, CHAR *argv[])
clientInfo.loggingLevel = LOG_LEVEL_VERBOSE;
STRCPY(clientInfo.clientId, SAMPLE_VIEWER_CLIENT_ID);

MEMSET(&channelInfo, 0x00, SIZEOF(ChannelInfo));
channelInfo.version = CHANNEL_INFO_CURRENT_VERSION;
channelInfo.pChannelName = argc > 1 ? argv[1] : SAMPLE_CHANNEL_NAME;
channelInfo.pKmsKeyId = NULL;
channelInfo.tagCount = 0;
channelInfo.pTags = NULL;
channelInfo.channelType = SIGNALING_CHANNEL_TYPE_SINGLE_MASTER;
channelInfo.channelRoleType = SIGNALING_CHANNEL_ROLE_TYPE_VIEWER;
channelInfo.cachingEndpoint = FALSE;
channelInfo.pCertPath = pSampleConfiguration->pCaCertPath;
channelInfo.pControlPlaneUrl = KINESIS_VIDEO_BETA_CONTROL_PLANE_URL;
CHK_STATUS(createSignalingClientSync(&clientInfo, &channelInfo, &signalingClientCallbacks,
pSampleConfiguration->pCredentialProvider,
&pSampleConfiguration->signalingClientHandle));
CHK_STATUS(createSignalingClientSync(&clientInfo,
&pSampleConfiguration->channelInfo,
&signalingClientCallbacks,
pSampleConfiguration->pCredentialProvider,
&pSampleConfiguration->signalingClientHandle));

// Initialize the peer connection
CHK_STATUS(initializePeerConnection(pSampleConfiguration));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ extern "C" {
#define STATUS_SIGNALING_NO_PAYLOAD_IN_MESSAGE STATUS_SIGNALING_BASE + 0x0000002A
#define STATUS_SIGNALING_DUPLICATE_MESSAGE_BEING_SENT STATUS_SIGNALING_BASE + 0x0000002B
#define STATUS_SIGNALING_ICE_TTL_LESS_THAN_GRACE_PERIOD STATUS_SIGNALING_BASE + 0x0000002C
#define STATUS_SIGNALING_DISCONNECTED_CALLBACK_FAILED STATUS_SIGNALING_BASE + 0x0000002D

//
// PeerConnection related errors starting from 0x5e000000
Expand Down Expand Up @@ -807,8 +808,11 @@ typedef struct {
// Endpoint caching TTL
UINT64 endpointCachingPeriod;

// Whether to continuously retry on failures
BOOL continuousRetry;
// Whether to retry the network calls on errors up to max retry times
BOOL retry;

// Whether to reconnect on connection dropped
BOOL reconnect;

// Number of tags associated with the stream
UINT32 tagCount;
Expand Down
6 changes: 4 additions & 2 deletions src/source/Signaling/ChannelInfo.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ STATUS createChannelInfo(PCHAR pChannelName,
SIGNALING_CHANNEL_ROLE_TYPE channelRoleType,
BOOL cachingEndpoint,
UINT64 endpointCachingPeriod,
BOOL continuousRetry,
BOOL retry,
BOOL reconnect,
UINT32 tagCount,
PTag pTags,
PChannelInfo* ppChannelInfo)
Expand Down Expand Up @@ -97,7 +98,8 @@ STATUS createChannelInfo(PCHAR pChannelName,
pChannelInfo->channelRoleType = channelRoleType;
pChannelInfo->cachingEndpoint = cachingEndpoint;
pChannelInfo->endpointCachingPeriod = endpointCachingPeriod;
pChannelInfo->continuousRetry = continuousRetry;
pChannelInfo->retry = retry;
pChannelInfo->reconnect = reconnect;
pChannelInfo->tagCount = tagCount;

// Set the current pointer to the end
Expand Down
2 changes: 1 addition & 1 deletion src/source/Signaling/ChannelInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ extern "C" {
#define SIGNALING_CHANNEL_ROLE_TYPE_MASTER_STR (PCHAR) "MASTER"
#define SIGNALING_CHANNEL_ROLE_TYPE_VIEWER_STR (PCHAR) "VIEWER"

STATUS createChannelInfo(PCHAR, PCHAR, PCHAR, PCHAR, PCHAR, PCHAR, PCHAR, PCHAR, SIGNALING_CHANNEL_TYPE, SIGNALING_CHANNEL_ROLE_TYPE, BOOL, UINT64, BOOL, UINT32, PTag, PChannelInfo*);
STATUS createChannelInfo(PCHAR, PCHAR, PCHAR, PCHAR, PCHAR, PCHAR, PCHAR, PCHAR, SIGNALING_CHANNEL_TYPE, SIGNALING_CHANNEL_ROLE_TYPE, BOOL, UINT64, BOOL, BOOL, UINT32, PTag, PChannelInfo*);

/**
* Frees the channel info object.
Expand Down
47 changes: 41 additions & 6 deletions src/source/Signaling/LwsApiCalls.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ INT32 lwsHttpCallbackRoutine(struct lws *wsi, enum lws_callback_reasons reason,
UINT32 headerCount;
PRequestHeader pRequestHeader;

DLOGV("HTTPS callback with reason %d", reason);

customData = lws_get_opaque_user_data(wsi);
pLwsCallInfo = (PLwsCallInfo) customData;

Expand All @@ -40,8 +42,6 @@ INT32 lwsHttpCallbackRoutine(struct lws *wsi, enum lws_callback_reasons reason,
pRequestInfo = pLwsCallInfo->callInfo.pRequestInfo;
pBuffer = pLwsCallInfo->buffer + LWS_PRE;

DLOGV("HTTPS callback with reason %d", reason);

switch (reason) {
case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
pCurPtr = pDataIn == NULL ? "(None)" : (PCHAR) pDataIn;
Expand Down Expand Up @@ -215,6 +215,8 @@ INT32 lwsWssCallbackRoutine(struct lws *wsi, enum lws_callback_reasons reason,
PSignalingClient pSignalingClient = NULL;
SIZE_T offset, bufferSize;

DLOGV("WSS callback with reason %d", reason);

customData = lws_get_opaque_user_data(wsi);
pSignalingClient = (PSignalingClient) customData;

Expand All @@ -227,8 +229,6 @@ INT32 lwsWssCallbackRoutine(struct lws *wsi, enum lws_callback_reasons reason,
pLwsCallInfo = pSignalingClient->pOngoingCallInfo;
pRequestInfo = pLwsCallInfo->callInfo.pRequestInfo;

DLOGV("WSS callback with reason %d", reason);

switch (reason) {
case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
pCurPtr = pDataIn == NULL ? "(None)" : (PCHAR) pDataIn;
Expand Down Expand Up @@ -263,7 +263,7 @@ INT32 lwsWssCallbackRoutine(struct lws *wsi, enum lws_callback_reasons reason,
case LWS_CALLBACK_CLIENT_CLOSED:
DLOGD("Client WSS closed");

retValue = -1;
CHK_STATUS(terminateConnectionWithStatus(pSignalingClient, SERVICE_CALL_RESULT_OK));

break;

Expand Down Expand Up @@ -338,7 +338,7 @@ INT32 lwsWssCallbackRoutine(struct lws *wsi, enum lws_callback_reasons reason,
LWS_WRITE_TEXT);

if (size < 0) {
DLOGW("Write failed");
DLOGW("Write failed. Returned write size is %d", size);
// Quit
retValue = -1;
CHK(FALSE, retStatus);
Expand Down Expand Up @@ -1169,6 +1169,10 @@ PVOID lwsListenerHandler(PVOID args)
// Make a blocking call
CHK_STATUS(lwsCompleteSync(pLwsCallInfo));

// Fire the re-connector thread
CHK_STATUS(THREAD_CREATE(&pSignalingClient->restarterTid, reconnectHandler, (PVOID) pSignalingClient));
CHK_STATUS(THREAD_DETACH(pSignalingClient->restarterTid));

CleanUp:

if (STATUS_FAILED(retStatus) && pSignalingClient != NULL) {
Expand All @@ -1193,6 +1197,37 @@ PVOID lwsListenerHandler(PVOID args)
return (PVOID) (ULONG_PTR) retStatus;
}

PVOID reconnectHandler(PVOID args)
{
ENTERS();
STATUS retStatus = STATUS_SUCCESS;
PSignalingClient pSignalingClient = (PSignalingClient) args;

CHK(pSignalingClient != NULL, STATUS_NULL_ARG);

while (TRUE) {
// Check for a shutdown
CHK(!ATOMIC_LOAD_BOOL(&pSignalingClient->shutdown), retStatus);

retStatus = stepSignalingStateMachine(pSignalingClient, STATUS_SUCCESS);

// Break out of the loop and terminate the thread
CHK(STATUS_FAILED(retStatus), retStatus);

// Reset the retry count to allow to renew the same state
resetStateMachineRetryCount(pSignalingClient->pStateMachine);
}

CleanUp:

if (pSignalingClient != NULL) {
ATOMIC_STORE(&pSignalingClient->restarterTid, (SIZE_T) INVALID_TID_VALUE);
}

LEAVES();
return (PVOID) (ULONG_PTR) retStatus;
}

STATUS sendLwsMessage(PSignalingClient pSignalingClient, PCHAR pMessageType, PCHAR peerClientId,
PCHAR pMessage, UINT32 messageLen, PCHAR pCorrelationId, UINT32 correlationIdLen)
{
Expand Down
7 changes: 7 additions & 0 deletions src/source/Signaling/LwsApiCalls.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ extern "C" {
#define SIGNALING_SERVICE_API_CALL_CONNECTION_TIMEOUT (2 * HUNDREDS_OF_NANOS_IN_A_SECOND)
#define SIGNALING_SERVICE_API_CALL_COMPLETION_TIMEOUT (5 * HUNDREDS_OF_NANOS_IN_A_SECOND)
#define SIGNALING_SERVICE_API_CALL_TIMEOUT_IN_SECONDS ((SIGNALING_SERVICE_API_CALL_CONNECTION_TIMEOUT + SIGNALING_SERVICE_API_CALL_COMPLETION_TIMEOUT) / HUNDREDS_OF_NANOS_IN_A_SECOND)
#define SIGNALING_SERVICE_TCP_KEEPALIVE_IN_SECONDS 3
#define SIGNALING_SERVICE_TCP_KEEPALIVE_PROBE_COUNT 3
#define SIGNALING_SERVICE_TCP_KEEPALIVE_PROBE_INTERVAL_IN_SECONDS 1
#define SIGNALING_SERVICE_WSS_PING_PONG_INTERVAL_IN_SECONDS 10

// Protocol indexes
#define PROTOCOL_INDEX_HTTPS 0
Expand Down Expand Up @@ -167,6 +171,9 @@ STATUS lwsCompleteSync(PLwsCallInfo);
// LWS listener handler
PVOID lwsListenerHandler(PVOID);

// Retry thread
PVOID reconnectHandler(PVOID);

// LWS callback routine
INT32 lwsHttpCallbackRoutine(struct lws*, enum lws_callback_reasons, PVOID, PVOID, size_t);
INT32 lwsWssCallbackRoutine(struct lws*, enum lws_callback_reasons, PVOID, PVOID, size_t);
Expand Down
Loading