Skip to content

Commit

Permalink
Add macro to break from potential infinite loops (FreeRTOS#349)
Browse files Browse the repository at this point in the history
* Break from potentially infinite loops in connection sharing demo

* Use QoS1 subscription for resilience against network disconnect

Co-authored-by: Oscar Michael Abrina <[email protected]>
  • Loading branch information
muneebahmed10 and yourslab authored Oct 16, 2020
1 parent 15a7337 commit 60ac51d
Showing 1 changed file with 89 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,18 @@
*/
#define mqttexampleUNSUBSCRIBE_COMPLETE_BIT ( 1U << 1 )

/**
* @brief The maximum number of loop iterations to wait before declaring failure.
*
* Each `while` loop waiting for a task notification will wait for a total
* number of ticks equal to `mqttexampleDEMO_TICKS_TO_WAIT` * this number of
* iterations before the loop exits.
*
* @note This value should not be too small, as the reason for a long loop
* may be a loss of network connection.
*/
#define mqttexampleMAX_WAIT_ITERATIONS ( 20 )

/**
* @brief Topic filter used by the subscriber task.
*/
Expand Down Expand Up @@ -823,7 +835,7 @@ static MQTTStatus_t prvMQTTConnect( MQTTContext_t * pxMQTTContext,
{
pxResendSubscriptions[ j ].pTopicFilter = pxSubscriptions[ i ].pcSubscriptionFilter;
pxResendSubscriptions[ j ].topicFilterLength = pxSubscriptions[ i ].usFilterLength;
pxResendSubscriptions[ j ].qos = MQTTQoS0;
pxResendSubscriptions[ j ].qos = MQTTQoS1;
j++;
}
}
Expand Down Expand Up @@ -1546,6 +1558,7 @@ void prvPublishTask( void * pvParameters )
char * payloadBuffers[ mqttexamplePUBLISH_COUNT ];
char * topicBuffers[ mqttexamplePUBLISH_COUNT ];
CommandContext_t * pxContexts[ mqttexamplePUBLISH_COUNT ] = { 0 };
uint32_t ulWaitCounter = 0;

/* We use QoS 1 so that the operation won't be counted as complete until we
* receive the publish acknowledgment. */
Expand All @@ -1571,13 +1584,23 @@ void prvPublishTask( void * pvParameters )
xCommandAdded = prvAddCommandToQueue( &xCommand );
/* Ensure command was added to queue. */
configASSERT( xCommandAdded == pdTRUE );
ulWaitCounter = 0;

while( ( ulNotification & ( 1U << i ) ) != ( 1U << i ) )
{
LogInfo( ( "Waiting for publish %d to complete.", i + 1 ) );
xTaskNotifyWait( 0, ( 1U << i ), &ulNotification, mqttexampleDEMO_TICKS_TO_WAIT );

if( ++ulWaitCounter > mqttexampleMAX_WAIT_ITERATIONS )
{
LogError( ( "Synchronous publish loop iteration %d"
" exceeded maximum wait time.", ( i + 1 ) ) );
break;
}
}

configASSERT( ( ulNotification & ( 1U << i ) ) == ( 1U << i ) );

LogInfo( ( "Publish operation complete. Sleeping for %d ms.\n", mqttexamplePUBLISH_DELAY_SYNC_MS ) );
vTaskDelay( pdMS_TO_TICKS( mqttexamplePUBLISH_DELAY_SYNC_MS ) );
}
Expand Down Expand Up @@ -1629,12 +1652,23 @@ void prvPublishTask( void * pvParameters )
continue;
}

ulWaitCounter = 0;

while( ( ulNotification & ( 1U << i ) ) != ( 1U << i ) )
{
LogInfo( ( "Waiting to free publish context %d.", i + 1 ) );
xTaskNotifyWait( 0, ( 1U << i ), &ulNotification, mqttexampleDEMO_TICKS_TO_WAIT );

if( ++ulWaitCounter > mqttexampleMAX_WAIT_ITERATIONS )
{
LogError( ( "Loop free iteration %d exceeded maximum"
" wait time.", ( i + 1 ) ) );
break;
}
}

configASSERT( ( ulNotification & ( 1U < i ) ) == ( 1U << i ) );

vPortFree( pxContexts[ i ] );
vPortFree( topicBuffers[ i ] );
vPortFree( payloadBuffers[ i ] );
Expand Down Expand Up @@ -1666,11 +1700,12 @@ void prvSubscribeTask( void * pvParameters )
uint32_t ulNotification = 0;
CommandContext_t xContext;
PublishElement_t xReceivedPublish;
uint32_t ulWaitCounter = 0;

/* The QoS does not affect when subscribe operations are marked completed
* as it does for publishes. Since the QoS does not impact this demo, we
* will use QoS 0, as it is the simplest. */
xSubscribeInfo.qos = MQTTQoS0;
* as it does for publishes. However, we still use QoS 1 here so that the
* broker will resend publishes if there is a network disconnect. */
xSubscribeInfo.qos = MQTTQoS1;
xSubscribeInfo.pTopicFilter = mqttexampleSUBSCRIBE_TOPIC_FILTER;
xSubscribeInfo.topicFilterLength = ( uint16_t ) strlen( xSubscribeInfo.pTopicFilter );
LogInfo( ( "Topic filter: %.*s", xSubscribeInfo.topicFilterLength, xSubscribeInfo.pTopicFilter ) );
Expand All @@ -1689,13 +1724,27 @@ void prvSubscribeTask( void * pvParameters )
/* Ensure command was added to queue. */
configASSERT( xCommandAdded == pdTRUE );

/* This demo relies on the server processing our subscription before any publishes.
* Since this demo uses multiple tasks, we do not retry failed subscriptions, as the
* server has likely already processed our first publish, and so this demo will not
* complete successfully. */
while( ( ulNotification & mqttexampleSUBSCRIBE_COMPLETE_BIT ) != mqttexampleSUBSCRIBE_COMPLETE_BIT )
{
LogInfo( ( "Waiting for subscribe operation to complete." ) );
xTaskNotifyWait( 0, mqttexampleSUBSCRIBE_COMPLETE_BIT, &ulNotification, mqttexampleDEMO_TICKS_TO_WAIT );

if( ++ulWaitCounter > mqttexampleMAX_WAIT_ITERATIONS )
{
LogError( ( "Subscribe Loop exceeded maximum wait time." ) );
break;
}
}

configASSERT( ( ulNotification & mqttexampleSUBSCRIBE_COMPLETE_BIT ) == mqttexampleSUBSCRIBE_COMPLETE_BIT );
configASSERT( xContext.xReturnStatus == MQTTSuccess );

LogInfo( ( "Operation wait complete.\n" ) );
ulWaitCounter = 0;

while( 1 )
{
Expand All @@ -1712,6 +1761,8 @@ void prvSubscribeTask( void * pvParameters )
LogInfo( ( "Received publish on topic %.*s", pxReceivedPublish->topicNameLength, pxReceivedPublish->pTopicName ) );
LogInfo( ( "Message payload: %.*s\n", ( int ) pxReceivedPublish->payloadLength, ( const char * ) pxReceivedPublish->pPayload ) );
usNumReceived++;
/* Reset the wait counter every time a publish is received. */
ulWaitCounter = 0;
}

/* Break if all publishes have been received. */
Expand All @@ -1720,6 +1771,17 @@ void prvSubscribeTask( void * pvParameters )
break;
}

/* Break if we have been stuck in this loop for too long. The total wait
* here will be ( (loop delay + queue check delay) * `mqttexampleMAX_WAIT_ITERATIONS` ).
* For example, with a 1000 ms queue delay, a 400 ms loop delay, and a
* maximum iteration of 20, this will wait 28 seconds after receiving
* the last publish. */
if( ++ulWaitCounter > mqttexampleMAX_WAIT_ITERATIONS )
{
LogError( ( "Publish receive loop exceeded maximum wait time." ) );
break;
}

LogInfo( ( "No messages queued, received %u publishes, sleeping for %d ms\n",
usNumReceived,
mqttexampleSUBSCRIBE_TASK_DELAY_MS ) );
Expand All @@ -1739,13 +1801,21 @@ void prvSubscribeTask( void * pvParameters )
/* Ensure command was added to queue. */
configASSERT( xCommandAdded == pdTRUE );
LogInfo( ( "Starting wait on operation\n" ) );
ulWaitCounter = 0;

while( ( ulNotification & mqttexampleUNSUBSCRIBE_COMPLETE_BIT ) != mqttexampleUNSUBSCRIBE_COMPLETE_BIT )
{
LogInfo( ( "Waiting for unsubscribe operation to complete." ) );
xTaskNotifyWait( 0, mqttexampleUNSUBSCRIBE_COMPLETE_BIT, &ulNotification, mqttexampleDEMO_TICKS_TO_WAIT );

if( ++ulWaitCounter > mqttexampleMAX_WAIT_ITERATIONS )
{
LogError( ( "Unsubscribe loop exceeded maximum wait time." ) );
break;
}
}

configASSERT( ( ulNotification & mqttexampleUNSUBSCRIBE_COMPLETE_BIT ) == mqttexampleUNSUBSCRIBE_COMPLETE_BIT );
LogInfo( ( "Operation wait complete.\n" ) );

/* Create command to stop command loop. */
Expand Down Expand Up @@ -1773,6 +1843,7 @@ static void prvMQTTDemoTask( void * pvParameters )
uint32_t ulNotification = 0;
Command_t xCommand;
MQTTStatus_t xMQTTStatus;
uint32_t ulWaitCounter = 0;

( void ) pvParameters;

Expand Down Expand Up @@ -1838,22 +1909,36 @@ static void prvMQTTDemoTask( void * pvParameters )

LogInfo( ( "Running command loop" ) );
prvCommandLoop();
ulWaitCounter = 0;

/* Delete created tasks and queues.
* Wait for subscriber task to exit before cleaning up. */
while( ( ulNotification & mqttexampleSUBSCRIBE_TASK_COMPLETE_BIT ) != mqttexampleSUBSCRIBE_TASK_COMPLETE_BIT )
{
LogInfo( ( "Waiting for subscribe task to exit." ) );
xTaskNotifyWait( 0, mqttexampleSUBSCRIBE_TASK_COMPLETE_BIT, &ulNotification, mqttexampleDEMO_TICKS_TO_WAIT );

if( ++ulWaitCounter > mqttexampleMAX_WAIT_ITERATIONS )
{
LogError( ( "Subscribe task exceeded maximum wait time." ) );
break;
}
}

configASSERT( ( ulNotification & mqttexampleSUBSCRIBE_TASK_COMPLETE_BIT ) == mqttexampleSUBSCRIBE_TASK_COMPLETE_BIT );
ulWaitCounter = 0;

/* Wait for publishing task to exit before cleaning up. */
while( ( ulNotification & mqttexamplePUBLISHER_TASK_COMPLETE_BIT ) != mqttexamplePUBLISHER_TASK_COMPLETE_BIT )
{
LogInfo( ( "Waiting for publish task to exit." ) );
xTaskNotifyWait( 0, mqttexamplePUBLISHER_TASK_COMPLETE_BIT, &ulNotification, mqttexampleDEMO_TICKS_TO_WAIT );

if( ++ulWaitCounter > mqttexampleMAX_WAIT_ITERATIONS )
{
LogError( ( "Publish task exceeded maximum wait time." ) );
break;
}
}

configASSERT( ( ulNotification & mqttexamplePUBLISHER_TASK_COMPLETE_BIT ) == mqttexamplePUBLISHER_TASK_COMPLETE_BIT );
Expand Down

0 comments on commit 60ac51d

Please sign in to comment.