From 60ac51d7f1302ef97c6daa60d4f1f174c691855f Mon Sep 17 00:00:00 2001 From: Muneeb Ahmed <54290492+muneebahmed10@users.noreply.github.com> Date: Fri, 16 Oct 2020 14:25:09 -0700 Subject: [PATCH] Add macro to break from potential infinite loops (#349) * Break from potentially infinite loops in connection sharing demo * Use QoS1 subscription for resilience against network disconnect Co-authored-by: Oscar Michael Abrina --- .../DemoTasks/MultitaskMQTTExample.c | 93 ++++++++++++++++++- 1 file changed, 89 insertions(+), 4 deletions(-) diff --git a/FreeRTOS-Plus/Demo/coreMQTT_Windows_Simulator/MQTT_Multitask/DemoTasks/MultitaskMQTTExample.c b/FreeRTOS-Plus/Demo/coreMQTT_Windows_Simulator/MQTT_Multitask/DemoTasks/MultitaskMQTTExample.c index 295778e47b2..c052a86c561 100644 --- a/FreeRTOS-Plus/Demo/coreMQTT_Windows_Simulator/MQTT_Multitask/DemoTasks/MultitaskMQTTExample.c +++ b/FreeRTOS-Plus/Demo/coreMQTT_Windows_Simulator/MQTT_Multitask/DemoTasks/MultitaskMQTTExample.c @@ -250,6 +250,18 @@ */ #define mqttexampleUNSUBSCRIBE_COMPLETE_BIT ( 1U << 1 ) +/** + * @brief The maximum number of loop iterations to wait before declaring failure. + * + * Each `while` loop waiting for a task notification will wait for a total + * number of ticks equal to `mqttexampleDEMO_TICKS_TO_WAIT` * this number of + * iterations before the loop exits. + * + * @note This value should not be too small, as the reason for a long loop + * may be a loss of network connection. + */ +#define mqttexampleMAX_WAIT_ITERATIONS ( 20 ) + /** * @brief Topic filter used by the subscriber task. */ @@ -823,7 +835,7 @@ static MQTTStatus_t prvMQTTConnect( MQTTContext_t * pxMQTTContext, { pxResendSubscriptions[ j ].pTopicFilter = pxSubscriptions[ i ].pcSubscriptionFilter; pxResendSubscriptions[ j ].topicFilterLength = pxSubscriptions[ i ].usFilterLength; - pxResendSubscriptions[ j ].qos = MQTTQoS0; + pxResendSubscriptions[ j ].qos = MQTTQoS1; j++; } } @@ -1546,6 +1558,7 @@ void prvPublishTask( void * pvParameters ) char * payloadBuffers[ mqttexamplePUBLISH_COUNT ]; char * topicBuffers[ mqttexamplePUBLISH_COUNT ]; CommandContext_t * pxContexts[ mqttexamplePUBLISH_COUNT ] = { 0 }; + uint32_t ulWaitCounter = 0; /* We use QoS 1 so that the operation won't be counted as complete until we * receive the publish acknowledgment. */ @@ -1571,13 +1584,23 @@ void prvPublishTask( void * pvParameters ) xCommandAdded = prvAddCommandToQueue( &xCommand ); /* Ensure command was added to queue. */ configASSERT( xCommandAdded == pdTRUE ); + ulWaitCounter = 0; while( ( ulNotification & ( 1U << i ) ) != ( 1U << i ) ) { LogInfo( ( "Waiting for publish %d to complete.", i + 1 ) ); xTaskNotifyWait( 0, ( 1U << i ), &ulNotification, mqttexampleDEMO_TICKS_TO_WAIT ); + + if( ++ulWaitCounter > mqttexampleMAX_WAIT_ITERATIONS ) + { + LogError( ( "Synchronous publish loop iteration %d" + " exceeded maximum wait time.", ( i + 1 ) ) ); + break; + } } + configASSERT( ( ulNotification & ( 1U << i ) ) == ( 1U << i ) ); + LogInfo( ( "Publish operation complete. Sleeping for %d ms.\n", mqttexamplePUBLISH_DELAY_SYNC_MS ) ); vTaskDelay( pdMS_TO_TICKS( mqttexamplePUBLISH_DELAY_SYNC_MS ) ); } @@ -1629,12 +1652,23 @@ void prvPublishTask( void * pvParameters ) continue; } + ulWaitCounter = 0; + while( ( ulNotification & ( 1U << i ) ) != ( 1U << i ) ) { LogInfo( ( "Waiting to free publish context %d.", i + 1 ) ); xTaskNotifyWait( 0, ( 1U << i ), &ulNotification, mqttexampleDEMO_TICKS_TO_WAIT ); + + if( ++ulWaitCounter > mqttexampleMAX_WAIT_ITERATIONS ) + { + LogError( ( "Loop free iteration %d exceeded maximum" + " wait time.", ( i + 1 ) ) ); + break; + } } + configASSERT( ( ulNotification & ( 1U < i ) ) == ( 1U << i ) ); + vPortFree( pxContexts[ i ] ); vPortFree( topicBuffers[ i ] ); vPortFree( payloadBuffers[ i ] ); @@ -1666,11 +1700,12 @@ void prvSubscribeTask( void * pvParameters ) uint32_t ulNotification = 0; CommandContext_t xContext; PublishElement_t xReceivedPublish; + uint32_t ulWaitCounter = 0; /* The QoS does not affect when subscribe operations are marked completed - * as it does for publishes. Since the QoS does not impact this demo, we - * will use QoS 0, as it is the simplest. */ - xSubscribeInfo.qos = MQTTQoS0; + * as it does for publishes. However, we still use QoS 1 here so that the + * broker will resend publishes if there is a network disconnect. */ + xSubscribeInfo.qos = MQTTQoS1; xSubscribeInfo.pTopicFilter = mqttexampleSUBSCRIBE_TOPIC_FILTER; xSubscribeInfo.topicFilterLength = ( uint16_t ) strlen( xSubscribeInfo.pTopicFilter ); LogInfo( ( "Topic filter: %.*s", xSubscribeInfo.topicFilterLength, xSubscribeInfo.pTopicFilter ) ); @@ -1689,13 +1724,27 @@ void prvSubscribeTask( void * pvParameters ) /* Ensure command was added to queue. */ configASSERT( xCommandAdded == pdTRUE ); + /* This demo relies on the server processing our subscription before any publishes. + * Since this demo uses multiple tasks, we do not retry failed subscriptions, as the + * server has likely already processed our first publish, and so this demo will not + * complete successfully. */ while( ( ulNotification & mqttexampleSUBSCRIBE_COMPLETE_BIT ) != mqttexampleSUBSCRIBE_COMPLETE_BIT ) { LogInfo( ( "Waiting for subscribe operation to complete." ) ); xTaskNotifyWait( 0, mqttexampleSUBSCRIBE_COMPLETE_BIT, &ulNotification, mqttexampleDEMO_TICKS_TO_WAIT ); + + if( ++ulWaitCounter > mqttexampleMAX_WAIT_ITERATIONS ) + { + LogError( ( "Subscribe Loop exceeded maximum wait time." ) ); + break; + } } + configASSERT( ( ulNotification & mqttexampleSUBSCRIBE_COMPLETE_BIT ) == mqttexampleSUBSCRIBE_COMPLETE_BIT ); + configASSERT( xContext.xReturnStatus == MQTTSuccess ); + LogInfo( ( "Operation wait complete.\n" ) ); + ulWaitCounter = 0; while( 1 ) { @@ -1712,6 +1761,8 @@ void prvSubscribeTask( void * pvParameters ) LogInfo( ( "Received publish on topic %.*s", pxReceivedPublish->topicNameLength, pxReceivedPublish->pTopicName ) ); LogInfo( ( "Message payload: %.*s\n", ( int ) pxReceivedPublish->payloadLength, ( const char * ) pxReceivedPublish->pPayload ) ); usNumReceived++; + /* Reset the wait counter every time a publish is received. */ + ulWaitCounter = 0; } /* Break if all publishes have been received. */ @@ -1720,6 +1771,17 @@ void prvSubscribeTask( void * pvParameters ) break; } + /* Break if we have been stuck in this loop for too long. The total wait + * here will be ( (loop delay + queue check delay) * `mqttexampleMAX_WAIT_ITERATIONS` ). + * For example, with a 1000 ms queue delay, a 400 ms loop delay, and a + * maximum iteration of 20, this will wait 28 seconds after receiving + * the last publish. */ + if( ++ulWaitCounter > mqttexampleMAX_WAIT_ITERATIONS ) + { + LogError( ( "Publish receive loop exceeded maximum wait time." ) ); + break; + } + LogInfo( ( "No messages queued, received %u publishes, sleeping for %d ms\n", usNumReceived, mqttexampleSUBSCRIBE_TASK_DELAY_MS ) ); @@ -1739,13 +1801,21 @@ void prvSubscribeTask( void * pvParameters ) /* Ensure command was added to queue. */ configASSERT( xCommandAdded == pdTRUE ); LogInfo( ( "Starting wait on operation\n" ) ); + ulWaitCounter = 0; while( ( ulNotification & mqttexampleUNSUBSCRIBE_COMPLETE_BIT ) != mqttexampleUNSUBSCRIBE_COMPLETE_BIT ) { LogInfo( ( "Waiting for unsubscribe operation to complete." ) ); xTaskNotifyWait( 0, mqttexampleUNSUBSCRIBE_COMPLETE_BIT, &ulNotification, mqttexampleDEMO_TICKS_TO_WAIT ); + + if( ++ulWaitCounter > mqttexampleMAX_WAIT_ITERATIONS ) + { + LogError( ( "Unsubscribe loop exceeded maximum wait time." ) ); + break; + } } + configASSERT( ( ulNotification & mqttexampleUNSUBSCRIBE_COMPLETE_BIT ) == mqttexampleUNSUBSCRIBE_COMPLETE_BIT ); LogInfo( ( "Operation wait complete.\n" ) ); /* Create command to stop command loop. */ @@ -1773,6 +1843,7 @@ static void prvMQTTDemoTask( void * pvParameters ) uint32_t ulNotification = 0; Command_t xCommand; MQTTStatus_t xMQTTStatus; + uint32_t ulWaitCounter = 0; ( void ) pvParameters; @@ -1838,6 +1909,7 @@ static void prvMQTTDemoTask( void * pvParameters ) LogInfo( ( "Running command loop" ) ); prvCommandLoop(); + ulWaitCounter = 0; /* Delete created tasks and queues. * Wait for subscriber task to exit before cleaning up. */ @@ -1845,15 +1917,28 @@ static void prvMQTTDemoTask( void * pvParameters ) { LogInfo( ( "Waiting for subscribe task to exit." ) ); xTaskNotifyWait( 0, mqttexampleSUBSCRIBE_TASK_COMPLETE_BIT, &ulNotification, mqttexampleDEMO_TICKS_TO_WAIT ); + + if( ++ulWaitCounter > mqttexampleMAX_WAIT_ITERATIONS ) + { + LogError( ( "Subscribe task exceeded maximum wait time." ) ); + break; + } } configASSERT( ( ulNotification & mqttexampleSUBSCRIBE_TASK_COMPLETE_BIT ) == mqttexampleSUBSCRIBE_TASK_COMPLETE_BIT ); + ulWaitCounter = 0; /* Wait for publishing task to exit before cleaning up. */ while( ( ulNotification & mqttexamplePUBLISHER_TASK_COMPLETE_BIT ) != mqttexamplePUBLISHER_TASK_COMPLETE_BIT ) { LogInfo( ( "Waiting for publish task to exit." ) ); xTaskNotifyWait( 0, mqttexamplePUBLISHER_TASK_COMPLETE_BIT, &ulNotification, mqttexampleDEMO_TICKS_TO_WAIT ); + + if( ++ulWaitCounter > mqttexampleMAX_WAIT_ITERATIONS ) + { + LogError( ( "Publish task exceeded maximum wait time." ) ); + break; + } } configASSERT( ( ulNotification & mqttexamplePUBLISHER_TASK_COMPLETE_BIT ) == mqttexamplePUBLISHER_TASK_COMPLETE_BIT );