From 86fc7d15e3af13690106745f52e79cc07a957d66 Mon Sep 17 00:00:00 2001 From: Dakshit Babbar <100972343+DakshitBabbar@users.noreply.github.com> Date: Fri, 27 Sep 2024 09:30:00 +0530 Subject: [PATCH] MQTT Connection Status Thread Safety (#305) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Description ----------- Following is a brief summary of changes: 1. Check the connected flag before doing any send operation on the connection 2. Make all the APIs that do send operations, thread safe 3. Update the connected flag within MQTT_Disconnect regardless of the return status of the send operation Following are the specifics of the changes: 1. Add 3 new MQTTStatus_t values: MQTTStatusConnected, MQTTStatusNotConnected and MQTTStatusDisconnectPending 2. Added 1 new MQTTConnectionStatus_t value: MQTTDisconnectPending 3. Update the MQTT_Status_strerror function to handle the new MQTTStatus_t values 4. Add a new API function MQTT_CheckConnectStatus() that will check the value of the context→connectStatus flag safely. 5. Add this API to the core_mqtt.h file to make it available to users 6. Check the connected flag before doing any Send operation (following API's are updated) a. sendPublishAcks b. MQTT_Connect c. MQTT_Subscribe d. MQTT_Publish e. MQTT_Ping f. MQTT_Unsubscribe g. MQTT_Disconnect 7. Use the MQTT_PRE_STATE_UPDATE_HOOK() and MQTT_POST_STATE_UPDATE_HOOK() to make the send APIs thread safe 8. The connect status is set to MQTTDisconnectPending whenever a transport send or receive function returns a negative error code 9. `const` keyword for the the MQTTStatus_t is removed in the input parameters for the receive functions as we need to update the connection status when the receive function returns a negative error code Relevant Explanations --------------- - MQTT_PRE_SEND_HOOK(): The Pre and Post Send hook Macros are not required now, as the sending logic will be within the pre and post state update hook itself. (because we cannot allow other threads to change the connection state of the application until a send operation is complete). - I have split the handleSessionResumption function. The part of that function which was handling the clean session has been added within the mutex calls in the [MQTT_Connect API](https://github.com/FreeRTOS/coreMQTT/pull/305/files#diff-2534a3c0229ae9af3801f2a5c6a24eeef2cd0a686671f0371a11d2718ba4fdd6R2828) and the unclean session part is handled by this new function that is [called outside the mutex calls](https://github.com/FreeRTOS/coreMQTT/pull/305/files#diff-2534a3c0229ae9af3801f2a5c6a24eeef2cd0a686671f0371a11d2718ba4fdd6R2866). Pending Tasks --------------- - [ ] Doxygen example for the new API - [x] Unit Test Updates - [x] CBMC Proof --------- Co-authored-by: Dakshit Babbar Co-authored-by: GitHub Action --- docs/doxygen/include/size_table.md | 8 +- source/core_mqtt.c | 554 ++++++++++++++++-------- source/include/core_mqtt.h | 24 +- source/include/core_mqtt_serializer.h | 5 +- test/unit-test/core_mqtt_utest.c | 581 +++++++++++++++++++++++++- tools/cmock/coverage.cmake | 5 +- 6 files changed, 977 insertions(+), 200 deletions(-) diff --git a/docs/doxygen/include/size_table.md b/docs/doxygen/include/size_table.md index 10de57852..1fa92ff64 100644 --- a/docs/doxygen/include/size_table.md +++ b/docs/doxygen/include/size_table.md @@ -9,8 +9,8 @@ core_mqtt.c -
4.1K
-
3.5K
+
4.4K
+
3.8K
core_mqtt_state.c @@ -24,7 +24,7 @@ Total estimates -
8.6K
-
7.0K
+
8.9K
+
7.3K
diff --git a/source/core_mqtt.c b/source/core_mqtt.c index 56ea23450..23324974d 100644 --- a/source/core_mqtt.c +++ b/source/core_mqtt.c @@ -266,7 +266,7 @@ static MQTTPubAckType_t getAckFromPacketType( uint8_t packetType ); * * @return Number of bytes received, or negative number on network error. */ -static int32_t recvExact( const MQTTContext_t * pContext, +static int32_t recvExact( MQTTContext_t * pContext, size_t bytesToRecv ); /** @@ -278,7 +278,7 @@ static int32_t recvExact( const MQTTContext_t * pContext, * * @return #MQTTRecvFailed or #MQTTNoDataAvailable. */ -static MQTTStatus_t discardPacket( const MQTTContext_t * pContext, +static MQTTStatus_t discardPacket( MQTTContext_t * pContext, size_t remainingLength, uint32_t timeoutMs ); @@ -302,7 +302,7 @@ static MQTTStatus_t discardStoredPacket( MQTTContext_t * pContext, * * @return #MQTTSuccess or #MQTTRecvFailed. */ -static MQTTStatus_t receivePacket( const MQTTContext_t * pContext, +static MQTTStatus_t receivePacket( MQTTContext_t * pContext, MQTTPacketInfo_t incomingPacket, uint32_t remainingTimeMs ); @@ -424,25 +424,28 @@ static MQTTStatus_t validateSubscribeUnsubscribeParams( const MQTTContext_t * pC * ##MQTTRecvFailed if transport recv failed; * #MQTTSuccess otherwise. */ -static MQTTStatus_t receiveConnack( const MQTTContext_t * pContext, +static MQTTStatus_t receiveConnack( MQTTContext_t * pContext, uint32_t timeoutMs, bool cleanSession, MQTTPacketInfo_t * pIncomingPacket, bool * pSessionPresent ); /** - * @brief Resends pending acks for a re-established MQTT session, or - * clears existing state records for a clean session. + * @brief Resends pending acks for a re-established MQTT session * * @param[in] pContext Initialized MQTT context. - * @param[in] sessionPresent Session present flag received from the MQTT broker. * * @return #MQTTSendFailed if transport send during resend failed; * #MQTTSuccess otherwise. */ -static MQTTStatus_t handleSessionResumption( MQTTContext_t * pContext, - bool sessionPresent ); +static MQTTStatus_t handleUncleanSessionResumption( MQTTContext_t * pContext ); +/** + * @brief Clears existing state records for a clean session. + * + * @param[in] pContext Initialized MQTT context. + */ +static void handleCleanSession( MQTTContext_t * pContext ); /** * @brief Send the publish packet without copying the topic string and payload in @@ -823,6 +826,11 @@ static int32_t sendMessageVector( MQTTContext_t * pContext, { bytesSentOrError = sendResult; LogError( ( "sendMessageVector: Unable to send packet: Network Error." ) ); + + if( pContext->connectStatus == MQTTConnected ) + { + pContext->connectStatus = MQTTDisconnectPending; + } } else { @@ -902,6 +910,11 @@ static int32_t sendBuffer( MQTTContext_t * pContext, { bytesSentOrError = sendResult; LogError( ( "sendBuffer: Unable to send packet: Network Error." ) ); + + if( pContext->connectStatus == MQTTConnected ) + { + pContext->connectStatus = MQTTDisconnectPending; + } } else { @@ -961,7 +974,7 @@ static MQTTPubAckType_t getAckFromPacketType( uint8_t packetType ) /*-----------------------------------------------------------*/ -static int32_t recvExact( const MQTTContext_t * pContext, +static int32_t recvExact( MQTTContext_t * pContext, size_t bytesToRecv ) { uint8_t * pIndex = NULL; @@ -997,6 +1010,15 @@ static int32_t recvExact( const MQTTContext_t * pContext, ( long int ) bytesRecvd ) ); totalBytesRecvd = bytesRecvd; receiveError = true; + + MQTT_PRE_STATE_UPDATE_HOOK( pContext ); + + if( pContext->connectStatus == MQTTConnected ) + { + pContext->connectStatus = MQTTDisconnectPending; + } + + MQTT_POST_STATE_UPDATE_HOOK( pContext ); } else if( bytesRecvd > 0 ) { @@ -1038,7 +1060,7 @@ static int32_t recvExact( const MQTTContext_t * pContext, /*-----------------------------------------------------------*/ -static MQTTStatus_t discardPacket( const MQTTContext_t * pContext, +static MQTTStatus_t discardPacket( MQTTContext_t * pContext, size_t remainingLength, uint32_t timeoutMs ) { @@ -1174,7 +1196,7 @@ static MQTTStatus_t discardStoredPacket( MQTTContext_t * pContext, /*-----------------------------------------------------------*/ -static MQTTStatus_t receivePacket( const MQTTContext_t * pContext, +static MQTTStatus_t receivePacket( MQTTContext_t * pContext, MQTTPacketInfo_t incomingPacket, uint32_t remainingTimeMs ) { @@ -1264,6 +1286,7 @@ static MQTTStatus_t sendPublishAcks( MQTTContext_t * pContext, uint8_t packetTypeByte = 0U; MQTTPubAckType_t packetType; MQTTFixedBuffer_t localBuffer; + MQTTConnectionStatus_t connectStatus; uint8_t pubAckPacket[ MQTT_PUBLISH_ACK_PACKET_SIZE ]; localBuffer.pBuffer = pubAckPacket; @@ -1283,18 +1306,33 @@ static MQTTStatus_t sendPublishAcks( MQTTContext_t * pContext, if( status == MQTTSuccess ) { - MQTT_PRE_SEND_HOOK( pContext ); + MQTT_PRE_STATE_UPDATE_HOOK( pContext ); - /* Here, we are not using the vector approach for efficiency. There is just one buffer - * to be sent which can be achieved with a normal send call. */ - sendResult = sendBuffer( pContext, - localBuffer.pBuffer, - MQTT_PUBLISH_ACK_PACKET_SIZE ); + connectStatus = pContext->connectStatus; + + if( connectStatus != MQTTConnected ) + { + status = ( connectStatus == MQTTNotConnected ) ? MQTTStatusNotConnected : MQTTStatusDisconnectPending; + } + + if( status == MQTTSuccess ) + { + /* Here, we are not using the vector approach for efficiency. There is just one buffer + * to be sent which can be achieved with a normal send call. */ + sendResult = sendBuffer( pContext, + localBuffer.pBuffer, + MQTT_PUBLISH_ACK_PACKET_SIZE ); - MQTT_POST_SEND_HOOK( pContext ); + if( sendResult < ( int32_t ) MQTT_PUBLISH_ACK_PACKET_SIZE ) + { + status = MQTTSendFailed; + } + } + + MQTT_POST_STATE_UPDATE_HOOK( pContext ); } - if( sendResult == ( int32_t ) MQTT_PUBLISH_ACK_PACKET_SIZE ) + if( status == MQTTSuccess ) { pContext->controlPacketSent = true; @@ -1320,7 +1358,6 @@ static MQTTStatus_t sendPublishAcks( MQTTContext_t * pContext, "PacketSize=%lu.", ( unsigned int ) packetTypeByte, ( long int ) sendResult, MQTT_PUBLISH_ACK_PACKET_SIZE ) ); - status = MQTTSendFailed; } } @@ -1680,6 +1717,15 @@ static MQTTStatus_t receiveSingleIteration( MQTTContext_t * pContext, { /* The receive function has failed. Bubble up the error up to the user. */ status = MQTTRecvFailed; + + MQTT_PRE_STATE_UPDATE_HOOK( pContext ); + + if( pContext->connectStatus == MQTTConnected ) + { + pContext->connectStatus = MQTTDisconnectPending; + } + + MQTT_POST_STATE_UPDATE_HOOK( pContext ); } else if( ( recvBytes == 0 ) && ( pContext->index == 0U ) ) { @@ -2301,7 +2347,7 @@ static MQTTStatus_t sendConnectWithoutCopy( MQTTContext_t * pContext, /*-----------------------------------------------------------*/ -static MQTTStatus_t receiveConnack( const MQTTContext_t * pContext, +static MQTTStatus_t receiveConnack( MQTTContext_t * pContext, uint32_t timeoutMs, bool cleanSession, MQTTPacketInfo_t * pIncomingPacket, @@ -2425,8 +2471,7 @@ static MQTTStatus_t receiveConnack( const MQTTContext_t * pContext, /*-----------------------------------------------------------*/ -static MQTTStatus_t handleSessionResumption( MQTTContext_t * pContext, - bool sessionPresent ) +static MQTTStatus_t handleUncleanSessionResumption( MQTTContext_t * pContext ) { MQTTStatus_t status = MQTTSuccess; MQTTStateCursor_t cursor = MQTT_STATE_CURSOR_INITIALIZER; @@ -2435,43 +2480,43 @@ static MQTTStatus_t handleSessionResumption( MQTTContext_t * pContext, assert( pContext != NULL ); - /* Reset the index and clear the buffer when a new session is established. */ - pContext->index = 0; - ( void ) memset( pContext->networkBuffer.pBuffer, 0, pContext->networkBuffer.size ); + /* Get the next packet ID for which a PUBREL need to be resent. */ + packetId = MQTT_PubrelToResend( pContext, &cursor, &state ); - if( sessionPresent == true ) + /* Resend all the PUBREL acks after session is reestablished. */ + while( ( packetId != MQTT_PACKET_ID_INVALID ) && + ( status == MQTTSuccess ) ) { - /* Get the next packet ID for which a PUBREL need to be resent. */ + status = sendPublishAcks( pContext, packetId, state ); + packetId = MQTT_PubrelToResend( pContext, &cursor, &state ); + } - /* Resend all the PUBREL acks after session is reestablished. */ - while( ( packetId != MQTT_PACKET_ID_INVALID ) && - ( status == MQTTSuccess ) ) - { - status = sendPublishAcks( pContext, packetId, state ); + return status; +} - packetId = MQTT_PubrelToResend( pContext, &cursor, &state ); - } - } - else +static void handleCleanSession( MQTTContext_t * pContext ) +{ + assert( pContext != NULL ); + + /* Reset the index and clear the buffer when a new session is established. */ + pContext->index = 0; + ( void ) memset( pContext->networkBuffer.pBuffer, 0, pContext->networkBuffer.size ); + + if( pContext->outgoingPublishRecordMaxCount > 0U ) { /* Clear any existing records if a new session is established. */ - if( pContext->outgoingPublishRecordMaxCount > 0U ) - { - ( void ) memset( pContext->outgoingPublishRecords, - 0x00, - pContext->outgoingPublishRecordMaxCount * sizeof( *pContext->outgoingPublishRecords ) ); - } - - if( pContext->incomingPublishRecordMaxCount > 0U ) - { - ( void ) memset( pContext->incomingPublishRecords, - 0x00, - pContext->incomingPublishRecordMaxCount * sizeof( *pContext->incomingPublishRecords ) ); - } + ( void ) memset( pContext->outgoingPublishRecords, + 0x00, + pContext->outgoingPublishRecordMaxCount * sizeof( *pContext->outgoingPublishRecords ) ); } - return status; + if( pContext->incomingPublishRecordMaxCount > 0U ) + { + ( void ) memset( pContext->incomingPublishRecords, + 0x00, + pContext->incomingPublishRecordMaxCount * sizeof( *pContext->incomingPublishRecords ) ); + } } static MQTTStatus_t validatePublishParams( const MQTTContext_t * pContext, @@ -2668,6 +2713,47 @@ MQTTStatus_t MQTT_CancelCallback( const MQTTContext_t * pContext, /*-----------------------------------------------------------*/ +MQTTStatus_t MQTT_CheckConnectStatus( MQTTContext_t * pContext ) +{ + MQTTConnectionStatus_t connectStatus; + MQTTStatus_t status = MQTTSuccess; + + if( pContext == NULL ) + { + LogError( ( "Argument cannot be NULL: pContext=%p", + ( void * ) pContext ) ); + status = MQTTBadParameter; + } + + if( status == MQTTSuccess ) + { + MQTT_PRE_STATE_UPDATE_HOOK( pContext ); + + connectStatus = pContext->connectStatus; + + MQTT_POST_STATE_UPDATE_HOOK( pContext ); + + switch( connectStatus ) + { + case MQTTConnected: + status = MQTTStatusConnected; + break; + + case MQTTDisconnectPending: + status = MQTTStatusDisconnectPending; + break; + + default: + status = MQTTStatusNotConnected; + break; + } + } + + return status; +} + +/*-----------------------------------------------------------*/ + MQTTStatus_t MQTT_Connect( MQTTContext_t * pContext, const MQTTConnectInfo_t * pConnectInfo, const MQTTPublishInfo_t * pWillInfo, @@ -2677,6 +2763,7 @@ MQTTStatus_t MQTT_Connect( MQTTContext_t * pContext, size_t remainingLength = 0UL, packetSize = 0UL; MQTTStatus_t status = MQTTSuccess; MQTTPacketInfo_t incomingPacket = { 0 }; + MQTTConnectionStatus_t connectStatus; incomingPacket.type = ( uint8_t ) 0; @@ -2704,45 +2791,89 @@ MQTTStatus_t MQTT_Connect( MQTTContext_t * pContext, if( status == MQTTSuccess ) { - MQTT_PRE_SEND_HOOK( pContext ); + MQTT_PRE_STATE_UPDATE_HOOK( pContext ); - status = sendConnectWithoutCopy( pContext, - pConnectInfo, - pWillInfo, - remainingLength ); + connectStatus = pContext->connectStatus; - MQTT_POST_SEND_HOOK( pContext ); - } + if( connectStatus != MQTTNotConnected ) + { + status = ( connectStatus == MQTTConnected ) ? MQTTStatusConnected : MQTTStatusDisconnectPending; + } - /* Read CONNACK from transport layer. */ - if( status == MQTTSuccess ) - { - status = receiveConnack( pContext, - timeoutMs, - pConnectInfo->cleanSession, - &incomingPacket, - pSessionPresent ); + if( status == MQTTSuccess ) + { + status = sendConnectWithoutCopy( pContext, + pConnectInfo, + pWillInfo, + remainingLength ); + } + + /* Read CONNACK from transport layer. */ + if( status == MQTTSuccess ) + { + status = receiveConnack( pContext, + timeoutMs, + pConnectInfo->cleanSession, + &incomingPacket, + pSessionPresent ); + } + + if( ( status == MQTTSuccess ) && ( *pSessionPresent != true ) ) + { + handleCleanSession( pContext ); + } + + if( status == MQTTSuccess ) + { + pContext->connectStatus = MQTTConnected; + /* Initialize keep-alive fields after a successful connection. */ + pContext->keepAliveIntervalSec = pConnectInfo->keepAliveSeconds; + pContext->waitingForPingResp = false; + pContext->pingReqSendTimeMs = 0U; + } + + MQTT_POST_STATE_UPDATE_HOOK( pContext ); } - if( status == MQTTSuccess ) + if( ( status == MQTTSuccess ) && ( *pSessionPresent == true ) ) { - /* Resend PUBRELs when reestablishing a session, or clear records for new sessions. */ - status = handleSessionResumption( pContext, *pSessionPresent ); + /* Resend PUBRELs when reestablishing a session */ + status = handleUncleanSessionResumption( pContext ); } if( status == MQTTSuccess ) { LogInfo( ( "MQTT connection established with the broker." ) ); - pContext->connectStatus = MQTTConnected; - /* Initialize keep-alive fields after a successful connection. */ - pContext->keepAliveIntervalSec = pConnectInfo->keepAliveSeconds; - pContext->waitingForPingResp = false; - pContext->pingReqSendTimeMs = 0U; + } + else if( ( status == MQTTStatusConnected ) || ( status == MQTTStatusDisconnectPending ) ) + { + LogInfo( ( "MQTT Connection is either already established or a disconnect is pending, return status = %s.", + MQTT_Status_strerror( status ) ) ); + } + else if( pContext == NULL ) + { + LogError( ( "MQTT connection failed with status = %s.", + MQTT_Status_strerror( status ) ) ); } else { LogError( ( "MQTT connection failed with status = %s.", MQTT_Status_strerror( status ) ) ); + + MQTT_PRE_STATE_UPDATE_HOOK( pContext ); + + if( pContext->connectStatus == MQTTConnected ) + { + /* This will only be executed if after the connack is received + * the retransmits fail for some reason on an unclean session + * connection. In this case we need to retry the re-transmits + * which can only be done using the connect API and that can only + * be done once we are disconnected, hence we ask the user to + * call disconnect here */ + pContext->connectStatus = MQTTDisconnectPending; + } + + MQTT_POST_STATE_UPDATE_HOOK( pContext ); } return status; @@ -2755,6 +2886,7 @@ MQTTStatus_t MQTT_Subscribe( MQTTContext_t * pContext, size_t subscriptionCount, uint16_t packetId ) { + MQTTConnectionStatus_t connectStatus; size_t remainingLength = 0UL, packetSize = 0UL; /* Validate arguments. */ @@ -2777,16 +2909,26 @@ MQTTStatus_t MQTT_Subscribe( MQTTContext_t * pContext, if( status == MQTTSuccess ) { - MQTT_PRE_SEND_HOOK( pContext ); + MQTT_PRE_STATE_UPDATE_HOOK( pContext ); + + connectStatus = pContext->connectStatus; + + if( connectStatus != MQTTConnected ) + { + status = ( connectStatus == MQTTNotConnected ) ? MQTTStatusNotConnected : MQTTStatusDisconnectPending; + } - /* Send MQTT SUBSCRIBE packet. */ - status = sendSubscribeWithoutCopy( pContext, - pSubscriptionList, - subscriptionCount, - packetId, - remainingLength ); + if( status == MQTTSuccess ) + { + /* Send MQTT SUBSCRIBE packet. */ + status = sendSubscribeWithoutCopy( pContext, + pSubscriptionList, + subscriptionCount, + packetId, + remainingLength ); + } - MQTT_POST_SEND_HOOK( pContext ); + MQTT_POST_STATE_UPDATE_HOOK( pContext ); } return status; @@ -2802,7 +2944,7 @@ MQTTStatus_t MQTT_Publish( MQTTContext_t * pContext, size_t remainingLength = 0UL; size_t packetSize = 0UL; MQTTPublishState_t publishStatus = MQTTStateNull; - bool stateUpdateHookExecuted = false; + MQTTConnectionStatus_t connectStatus; /* Maximum number of bytes required by the 'fixed' part of the PUBLISH * packet header according to the MQTT specifications. @@ -2836,67 +2978,71 @@ MQTTStatus_t MQTT_Publish( MQTTContext_t * pContext, &headerSize ); } - if( ( status == MQTTSuccess ) && ( pPublishInfo->qos > MQTTQoS0 ) ) + if( status == MQTTSuccess ) { + /* Take the mutex as multiple send calls are required for sending this + * packet. */ MQTT_PRE_STATE_UPDATE_HOOK( pContext ); - /* Set the flag so that the corresponding hook can be called later. */ - stateUpdateHookExecuted = true; + connectStatus = pContext->connectStatus; - status = MQTT_ReserveState( pContext, - packetId, - pPublishInfo->qos ); - - /* State already exists for a duplicate packet. - * If a state doesn't exist, it will be handled as a new publish in - * state engine. */ - if( ( status == MQTTStateCollision ) && ( pPublishInfo->dup == true ) ) + if( connectStatus != MQTTConnected ) { - status = MQTTSuccess; + status = ( connectStatus == MQTTNotConnected ) ? MQTTStatusNotConnected : MQTTStatusDisconnectPending; } - } - if( status == MQTTSuccess ) - { - /* Take the mutex as multiple send calls are required for sending this - * packet. */ - MQTT_PRE_SEND_HOOK( pContext ); + if( ( status == MQTTSuccess ) && ( pPublishInfo->qos > MQTTQoS0 ) ) + { + /* Set the flag so that the corresponding hook can be called later. */ - status = sendPublishWithoutCopy( pContext, - pPublishInfo, - mqttHeader, - headerSize, - packetId ); + status = MQTT_ReserveState( pContext, + packetId, + pPublishInfo->qos ); - /* Give the mutex away for the next taker. */ - MQTT_POST_SEND_HOOK( pContext ); - } + /* State already exists for a duplicate packet. + * If a state doesn't exist, it will be handled as a new publish in + * state engine. */ + if( ( status == MQTTStateCollision ) && ( pPublishInfo->dup == true ) ) + { + status = MQTTSuccess; + } + } - if( ( status == MQTTSuccess ) && - ( pPublishInfo->qos > MQTTQoS0 ) ) - { - /* Update state machine after PUBLISH is sent. - * Only to be done for QoS1 or QoS2. */ - status = MQTT_UpdateStatePublish( pContext, - packetId, - MQTT_SEND, - pPublishInfo->qos, - &publishStatus ); + if( status == MQTTSuccess ) + { + status = sendPublishWithoutCopy( pContext, + pPublishInfo, + mqttHeader, + headerSize, + packetId ); + } - if( status != MQTTSuccess ) + if( ( status == MQTTSuccess ) && + ( pPublishInfo->qos > MQTTQoS0 ) ) { - LogError( ( "Update state for publish failed with status %s." - " However PUBLISH packet was sent to the broker." - " Any further handling of ACKs for the packet Id" - " will fail.", - MQTT_Status_strerror( status ) ) ); + /* Update state machine after PUBLISH is sent. + * Only to be done for QoS1 or QoS2. */ + status = MQTT_UpdateStatePublish( pContext, + packetId, + MQTT_SEND, + pPublishInfo->qos, + &publishStatus ); + + if( status != MQTTSuccess ) + { + LogError( ( "Update state for publish failed with status %s." + " However PUBLISH packet was sent to the broker." + " Any further handling of ACKs for the packet Id" + " will fail.", + MQTT_Status_strerror( status ) ) ); + } } - } - if( stateUpdateHookExecuted == true ) - { - /* Regardless of the status, if the mutex was taken due to the - * packet being of QoS > QoS0, then it should be relinquished. */ + /* mutex should be released and not before updating the state + * because we need to make sure that the state is updated + * after sending the publish packet, before the receive + * loop receives ack for this and would want to update its state + */ MQTT_POST_STATE_UPDATE_HOOK( pContext ); } @@ -2919,6 +3065,7 @@ MQTTStatus_t MQTT_Ping( MQTTContext_t * pContext ) /* MQTT ping packets are of fixed length. */ uint8_t pingreqPacket[ 2U ]; MQTTFixedBuffer_t localBuffer; + MQTTConnectionStatus_t connectStatus; localBuffer.pBuffer = pingreqPacket; localBuffer.size = sizeof( pingreqPacket ); @@ -2956,32 +3103,42 @@ MQTTStatus_t MQTT_Ping( MQTTContext_t * pContext ) { /* Take the mutex as the send call should not be interrupted in * between. */ - MQTT_PRE_SEND_HOOK( pContext ); - /* Send the serialized PINGREQ packet to transport layer. - * Here, we do not use the vectored IO approach for efficiency as the - * Ping packet does not have numerous fields which need to be copied - * from the user provided buffers. Thus it can be sent directly. */ - sendResult = sendBuffer( pContext, - localBuffer.pBuffer, - packetSize ); + MQTT_PRE_STATE_UPDATE_HOOK( pContext ); - /* Give the mutex away. */ - MQTT_POST_SEND_HOOK( pContext ); + connectStatus = pContext->connectStatus; - /* It is an error to not send the entire PINGREQ packet. */ - if( sendResult < ( int32_t ) packetSize ) + if( connectStatus != MQTTConnected ) { - LogError( ( "Transport send failed for PINGREQ packet." ) ); - status = MQTTSendFailed; + status = ( connectStatus == MQTTNotConnected ) ? MQTTStatusNotConnected : MQTTStatusDisconnectPending; } - else + + if( status == MQTTSuccess ) { - pContext->pingReqSendTimeMs = pContext->lastPacketTxTime; - pContext->waitingForPingResp = true; - LogDebug( ( "Sent %ld bytes of PINGREQ packet.", - ( long int ) sendResult ) ); + /* Send the serialized PINGREQ packet to transport layer. + * Here, we do not use the vectored IO approach for efficiency as the + * Ping packet does not have numerous fields which need to be copied + * from the user provided buffers. Thus it can be sent directly. */ + sendResult = sendBuffer( pContext, + localBuffer.pBuffer, + packetSize ); + + /* It is an error to not send the entire PINGREQ packet. */ + if( sendResult < ( int32_t ) packetSize ) + { + LogError( ( "Transport send failed for PINGREQ packet." ) ); + status = MQTTSendFailed; + } + else + { + pContext->pingReqSendTimeMs = pContext->lastPacketTxTime; + pContext->waitingForPingResp = true; + LogDebug( ( "Sent %ld bytes of PINGREQ packet.", + ( long int ) sendResult ) ); + } } + + MQTT_POST_STATE_UPDATE_HOOK( pContext ); } return status; @@ -2994,6 +3151,7 @@ MQTTStatus_t MQTT_Unsubscribe( MQTTContext_t * pContext, size_t subscriptionCount, uint16_t packetId ) { + MQTTConnectionStatus_t connectStatus; size_t remainingLength = 0UL, packetSize = 0UL; /* Validate arguments. */ @@ -3017,16 +3175,25 @@ MQTTStatus_t MQTT_Unsubscribe( MQTTContext_t * pContext, if( status == MQTTSuccess ) { /* Take the mutex because the below call should not be interrupted. */ - MQTT_PRE_SEND_HOOK( pContext ); + MQTT_PRE_STATE_UPDATE_HOOK( pContext ); - status = sendUnsubscribeWithoutCopy( pContext, - pSubscriptionList, - subscriptionCount, - packetId, - remainingLength ); + connectStatus = pContext->connectStatus; + + if( connectStatus != MQTTConnected ) + { + status = ( connectStatus == MQTTNotConnected ) ? MQTTStatusNotConnected : MQTTStatusDisconnectPending; + } - /* Give the mutex away. */ - MQTT_POST_SEND_HOOK( pContext ); + if( status == MQTTSuccess ) + { + status = sendUnsubscribeWithoutCopy( pContext, + pSubscriptionList, + subscriptionCount, + packetId, + remainingLength ); + } + + MQTT_POST_STATE_UPDATE_HOOK( pContext ); } return status; @@ -3041,6 +3208,7 @@ MQTTStatus_t MQTT_Disconnect( MQTTContext_t * pContext ) MQTTStatus_t status = MQTTSuccess; MQTTFixedBuffer_t localBuffer; uint8_t disconnectPacket[ 2U ]; + MQTTConnectionStatus_t connectStatus; localBuffer.pBuffer = disconnectPacket; localBuffer.size = 2U; @@ -3069,38 +3237,46 @@ MQTTStatus_t MQTT_Disconnect( MQTTContext_t * pContext ) if( status == MQTTSuccess ) { /* Take the mutex because the below call should not be interrupted. */ - MQTT_PRE_SEND_HOOK( pContext ); - - /* Here we do not use vectors as the disconnect packet has fixed fields - * which do not reside in user provided buffers. Thus, it can be sent - * using a simple send call. */ - sendResult = sendBuffer( pContext, - localBuffer.pBuffer, - packetSize ); + MQTT_PRE_STATE_UPDATE_HOOK( pContext ); - /* Give the mutex away. */ - MQTT_POST_SEND_HOOK( pContext ); + connectStatus = pContext->connectStatus; - if( sendResult < ( int32_t ) packetSize ) + if( connectStatus == MQTTNotConnected ) { - LogError( ( "Transport send failed for DISCONNECT packet." ) ); - status = MQTTSendFailed; + status = MQTTStatusNotConnected; } - else + + if( status == MQTTSuccess ) { - LogDebug( ( "Sent %ld bytes of DISCONNECT packet.", - ( long int ) sendResult ) ); - } - } + LogInfo( ( "Disconnected from the broker." ) ); + pContext->connectStatus = MQTTNotConnected; - if( status == MQTTSuccess ) - { - LogInfo( ( "Disconnected from the broker." ) ); - pContext->connectStatus = MQTTNotConnected; + /* Reset the index and clean the buffer on a successful disconnect. */ + pContext->index = 0; + ( void ) memset( pContext->networkBuffer.pBuffer, 0, pContext->networkBuffer.size ); + + LogError( ( "MQTT Connection Disconnected Successfully" ) ); + + /* Here we do not use vectors as the disconnect packet has fixed fields + * which do not reside in user provided buffers. Thus, it can be sent + * using a simple send call. */ + sendResult = sendBuffer( pContext, + localBuffer.pBuffer, + packetSize ); - /* Reset the index and clean the buffer on a successful disconnect. */ - pContext->index = 0; - ( void ) memset( pContext->networkBuffer.pBuffer, 0, pContext->networkBuffer.size ); + if( sendResult < ( int32_t ) packetSize ) + { + LogError( ( "Transport send failed for DISCONNECT packet." ) ); + status = MQTTSendFailed; + } + else + { + LogDebug( ( "Sent %ld bytes of DISCONNECT packet.", + ( long int ) sendResult ) ); + } + } + + MQTT_POST_STATE_UPDATE_HOOK( pContext ); } return status; @@ -3372,6 +3548,18 @@ const char * MQTT_Status_strerror( MQTTStatus_t status ) str = "MQTTNeedMoreBytes"; break; + case MQTTStatusConnected: + str = "MQTTStatusConnected"; + break; + + case MQTTStatusNotConnected: + str = "MQTTStatusNotConnected"; + break; + + case MQTTStatusDisconnectPending: + str = "MQTTStatusDisconnectPending"; + break; + default: str = "Invalid MQTT Status code"; break; diff --git a/source/include/core_mqtt.h b/source/include/core_mqtt.h index 8e0bad49e..ba5219348 100644 --- a/source/include/core_mqtt.h +++ b/source/include/core_mqtt.h @@ -107,8 +107,9 @@ typedef void (* MQTTEventCallback_t )( struct MQTTContext * pContext, */ typedef enum MQTTConnectionStatus { - MQTTNotConnected, /**< @brief MQTT Connection is inactive. */ - MQTTConnected /**< @brief MQTT Connection is active. */ + MQTTNotConnected, /**< @brief MQTT Connection is inactive. */ + MQTTConnected, /**< @brief MQTT Connection is active. */ + MQTTDisconnectPending /**< @brief MQTT Connection needs to be disconnected as a transport error has occurred. */ } MQTTConnectionStatus_t; /** @@ -414,6 +415,25 @@ MQTTStatus_t MQTT_InitStatefulQoS( MQTTContext_t * pContext, size_t incomingPublishCount ); /* @[declare_mqtt_initstatefulqos] */ +/** + * @brief Checks the MQTT connection status with the broker. + * + * @param[in] pContext Initialized MQTT context. + * + * @return #MQTTBadParameter if invalid parameters are passed; + * #MQTTStatusConnected if the MQTT connection is established with the broker. + * #MQTTStatusNotConnected if the MQTT connection is broker. + * #MQTTStatusDisconnectPending if Transport Interface has failed and MQTT connection needs to be closed. + * + * Example + * @code{c} + * + * @endcode + */ +/* @[declare_mqtt_checkconnectstatus] */ +MQTTStatus_t MQTT_CheckConnectStatus( MQTTContext_t * pContext ); +/* @[declare_mqtt_checkconnectstatus] */ + /** * @brief Establish an MQTT session. * diff --git a/source/include/core_mqtt_serializer.h b/source/include/core_mqtt_serializer.h index 4837ee654..86b3c6396 100644 --- a/source/include/core_mqtt_serializer.h +++ b/source/include/core_mqtt_serializer.h @@ -96,9 +96,12 @@ typedef enum MQTTStatus MQTTIllegalState, /**< An illegal state in the state record. */ MQTTStateCollision, /**< A collision with an existing state record entry. */ MQTTKeepAliveTimeout, /**< Timeout while waiting for PINGRESP. */ - MQTTNeedMoreBytes /**< MQTT_ProcessLoop/MQTT_ReceiveLoop has received + MQTTNeedMoreBytes, /**< MQTT_ProcessLoop/MQTT_ReceiveLoop has received incomplete data; it should be called again (probably after a delay). */ + MQTTStatusConnected, /**< MQTT connection is established with the broker */ + MQTTStatusNotConnected, /**< MQTT connection is not established with the broker */ + MQTTStatusDisconnectPending /**< Transport Interface has failed and MQTT connection needs to be closed */ } MQTTStatus_t; /** diff --git a/test/unit-test/core_mqtt_utest.c b/test/unit-test/core_mqtt_utest.c index c9acae5aa..131f255df 100644 --- a/test/unit-test/core_mqtt_utest.c +++ b/test/unit-test/core_mqtt_utest.c @@ -1062,6 +1062,11 @@ static void expectProcessLoopCalls( MQTTContext_t * const pContext, } } + if( expectMoreCalls && ( pContext->connectStatus != MQTTConnected ) ) + { + expectMoreCalls = false; + } + /* Update the state based on the sent packet. */ if( expectMoreCalls ) { @@ -1156,6 +1161,46 @@ void test_MQTT_Init_Invalid_Params( void ) TEST_ASSERT_EQUAL( MQTTBadParameter, mqttStatus ); } +/* ========================================================================== */ + +/** + * @brief Test that NULL parameter causes MQTT_CheckConnectStatus to return MQTTBadParameter. + */ +void test_MQTT_CheckConnectStatus_invalid_params( void ) +{ + MQTTStatus_t mqttStatus = { 0 }; + + mqttStatus = MQTT_CheckConnectStatus( NULL ); + TEST_ASSERT_EQUAL( MQTTBadParameter, mqttStatus ); +} + +/** + * @brief Test that MQTT_CheckConnectStatus returns correct status corresponding to the connection status. + */ +void test_MQTT_CheckConnectStatus_return_correct_status( void ) +{ + MQTTStatus_t mqttStatus = { 0 }; + MQTTContext_t context = { 0 }; + TransportInterface_t transport = { 0 }; + MQTTFixedBuffer_t networkBuffer = { 0 }; + + setupTransportInterface( &transport ); + setupNetworkBuffer( &networkBuffer ); + + context.connectStatus = MQTTConnected; + mqttStatus = MQTT_CheckConnectStatus( &context ); + TEST_ASSERT_EQUAL( MQTTStatusConnected, mqttStatus ); + + context.connectStatus = MQTTNotConnected; + mqttStatus = MQTT_CheckConnectStatus( &context ); + TEST_ASSERT_EQUAL( MQTTStatusNotConnected, mqttStatus ); + + context.connectStatus = MQTTDisconnectPending; + mqttStatus = MQTT_CheckConnectStatus( &context ); + TEST_ASSERT_EQUAL( MQTTStatusDisconnectPending, mqttStatus ); +} + + /* ========================================================================== */ static uint8_t * MQTT_SerializeConnectFixedHeader_cb( uint8_t * pIndex, @@ -1172,6 +1217,72 @@ static uint8_t * MQTT_SerializeConnectFixedHeader_cb( uint8_t * pIndex, return pIndex; } +/** + * @brief Test MQTT_Connect, when the status is already MQTTConnected. + */ +void test_MQTT_Connect_sendConnect_already_connected( void ) +{ + MQTTContext_t mqttContext = { 0 }; + MQTTConnectInfo_t connectInfo = { 0 }; + uint32_t timeout = 2; + bool sessionPresent; + MQTTStatus_t status; + TransportInterface_t transport = { 0 }; + MQTTFixedBuffer_t networkBuffer = { 0 }; + size_t remainingLength; + size_t packetSize; + + memset( &mqttContext, 0x0, sizeof( mqttContext ) ); + MQTT_Init( &mqttContext, &transport, getTime, eventCallback, &networkBuffer ); + + /* set MQTT Connection status as connected*/ + mqttContext.connectStatus = MQTTConnected; + + MQTT_SerializeConnectFixedHeader_Stub( MQTT_SerializeConnectFixedHeader_cb ); + MQTT_GetConnectPacketSize_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_GetConnectPacketSize_IgnoreArg_pPacketSize(); + MQTT_GetConnectPacketSize_IgnoreArg_pRemainingLength(); + MQTT_GetConnectPacketSize_ReturnThruPtr_pPacketSize( &packetSize ); + MQTT_GetConnectPacketSize_ReturnThruPtr_pRemainingLength( &remainingLength ); + + status = MQTT_Connect( &mqttContext, &connectInfo, NULL, timeout, &sessionPresent ); + + TEST_ASSERT_EQUAL_INT( MQTTStatusConnected, status ); +} + +/** + * @brief Test MQTT_Connect, when the status is MQTTDisconnectPending. + */ +void test_MQTT_Connect_sendConnect_disconnect_pending( void ) +{ + MQTTContext_t mqttContext = { 0 }; + MQTTConnectInfo_t connectInfo = { 0 }; + uint32_t timeout = 2; + bool sessionPresent; + MQTTStatus_t status; + TransportInterface_t transport = { 0 }; + MQTTFixedBuffer_t networkBuffer = { 0 }; + size_t remainingLength; + size_t packetSize; + + memset( &mqttContext, 0x0, sizeof( mqttContext ) ); + MQTT_Init( &mqttContext, &transport, getTime, eventCallback, &networkBuffer ); + + /* set MQTT Connection status as connected*/ + mqttContext.connectStatus = MQTTDisconnectPending; + + MQTT_SerializeConnectFixedHeader_Stub( MQTT_SerializeConnectFixedHeader_cb ); + MQTT_GetConnectPacketSize_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_GetConnectPacketSize_IgnoreArg_pPacketSize(); + MQTT_GetConnectPacketSize_IgnoreArg_pRemainingLength(); + MQTT_GetConnectPacketSize_ReturnThruPtr_pPacketSize( &packetSize ); + MQTT_GetConnectPacketSize_ReturnThruPtr_pRemainingLength( &remainingLength ); + + status = MQTT_Connect( &mqttContext, &connectInfo, NULL, timeout, &sessionPresent ); + + TEST_ASSERT_EQUAL_INT( MQTTStatusDisconnectPending, status ); +} + /** * @brief Test MQTT_Connect, except for receiving the CONNACK. */ @@ -1312,6 +1423,7 @@ void test_MQTT_Connect_sendConnect2( void ) packetSize = 13; remainingLength = 11; mqttContext.transportInterface.send = transportSendFailure; + mqttContext.transportInterface.writev = NULL; MQTT_SerializeConnectFixedHeader_Stub( MQTT_SerializeConnectFixedHeader_cb ); MQTT_GetConnectPacketSize_ExpectAnyArgsAndReturn( MQTTSuccess ); MQTT_GetConnectPacketSize_IgnoreArg_pPacketSize(); @@ -1891,12 +2003,32 @@ void test_MQTT_Connect_resendPendingAcks( void ) MQTT_SerializeAck_ExpectAnyArgsAndReturn( MQTTBadParameter ); MQTT_PubrelToResend_ExpectAnyArgsAndReturn( MQTT_PACKET_TYPE_INVALID ); status = MQTT_Connect( &mqttContext, &connectInfo, NULL, timeout, &sessionPresentResult ); + TEST_ASSERT_EQUAL_INT( MQTTBadParameter, status ); + TEST_ASSERT_EQUAL_INT( MQTTDisconnectPending, mqttContext.connectStatus ); + TEST_ASSERT_TRUE( sessionPresentResult ); + + /* Test 3. One packet found in ack pending state, but Transport Send failed. */ + sessionPresentResult = false; + mqttContext.connectStatus = MQTTNotConnected; + mqttContext.keepAliveIntervalSec = 0; + mqttContext.transportInterface.send = transportSendFailure; + MQTT_GetIncomingPacketTypeAndLength_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_GetIncomingPacketTypeAndLength_ReturnThruPtr_pIncomingPacket( &incomingPacket ); + MQTT_DeserializeAck_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_DeserializeAck_ReturnThruPtr_pSessionPresent( &sessionPresent ); + MQTT_PubrelToResend_ExpectAnyArgsAndReturn( packetIdentifier ); + MQTT_PubrelToResend_ReturnThruPtr_pState( &pubRelState ); + MQTT_SerializeAck_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_PubrelToResend_ExpectAnyArgsAndReturn( MQTT_PACKET_TYPE_INVALID ); + status = MQTT_Connect( &mqttContext, &connectInfo, NULL, timeout, &sessionPresentResult ); TEST_ASSERT_EQUAL_INT( MQTTSendFailed, status ); - TEST_ASSERT_EQUAL_INT( MQTTNotConnected, mqttContext.connectStatus ); + TEST_ASSERT_EQUAL_INT( MQTTDisconnectPending, mqttContext.connectStatus ); TEST_ASSERT_TRUE( sessionPresentResult ); + mqttContext.transportInterface.send = transportSendSuccess; - /* Test 3. One packet found in ack pending state, Sent + /* Test 4. One packet found in ack pending state, Sent * PUBREL successfully. */ + mqttContext.connectStatus = MQTTNotConnected; MQTT_GetIncomingPacketTypeAndLength_ExpectAnyArgsAndReturn( MQTTSuccess ); MQTT_GetIncomingPacketTypeAndLength_ReturnThruPtr_pIncomingPacket( &incomingPacket ); MQTT_DeserializeAck_ExpectAnyArgsAndReturn( MQTTSuccess ); @@ -1913,7 +2045,7 @@ void test_MQTT_Connect_resendPendingAcks( void ) TEST_ASSERT_EQUAL_INT( MQTTConnected, mqttContext.connectStatus ); TEST_ASSERT_EQUAL_INT( connectInfo.keepAliveSeconds, mqttContext.keepAliveIntervalSec ); - /* Test 4. Three packets found in ack pending state. Sent PUBREL successfully + /* Test 5. Three packets found in ack pending state. Sent PUBREL successfully * for first and failed for second and no attempt for third. */ mqttContext.keepAliveIntervalSec = 0; mqttContext.connectStatus = MQTTNotConnected; @@ -1935,11 +2067,12 @@ void test_MQTT_Connect_resendPendingAcks( void ) /* Query for any remaining packets pending to ack. */ MQTT_PubrelToResend_ExpectAnyArgsAndReturn( packetIdentifier + 2 ); status = MQTT_Connect( &mqttContext, &connectInfo, NULL, timeout, &sessionPresent ); - TEST_ASSERT_EQUAL_INT( MQTTSendFailed, status ); - TEST_ASSERT_EQUAL_INT( MQTTNotConnected, mqttContext.connectStatus ); + TEST_ASSERT_EQUAL_INT( MQTTBadParameter, status ); + TEST_ASSERT_EQUAL_INT( MQTTDisconnectPending, mqttContext.connectStatus ); - /* Test 5. Two packets found in ack pending state. Sent PUBREL successfully + /* Test 6. Two packets found in ack pending state. Sent PUBREL successfully * for first and failed for second. */ + mqttContext.connectStatus = MQTTNotConnected; MQTT_GetIncomingPacketTypeAndLength_ExpectAnyArgsAndReturn( MQTTSuccess ); MQTT_GetIncomingPacketTypeAndLength_ReturnThruPtr_pIncomingPacket( &incomingPacket ); MQTT_DeserializeAck_ExpectAnyArgsAndReturn( MQTTSuccess ); @@ -2429,6 +2562,7 @@ void test_MQTT_Publish_DuplicatePublish( void ) mqttContext.outgoingPublishRecordMaxCount = 10; mqttContext.outgoingPublishRecords = outgoingPublishRecord; + mqttContext.connectStatus = MQTTConnected; publishInfo.qos = MQTTQoS1; publishInfo.dup = true; @@ -2442,6 +2576,40 @@ void test_MQTT_Publish_DuplicatePublish( void ) TEST_ASSERT_EQUAL_INT( MQTTSuccess, status ); } +/** + * @brief Test that MQTT_Publish works as intended when the connection status is anything but MQTTConnected. + */ +void test_MQTT_Publish_not_connected( void ) +{ + MQTTContext_t mqttContext = { 0 }; + MQTTPublishInfo_t publishInfo = { 0 }; + TransportInterface_t transport = { 0 }; + MQTTFixedBuffer_t networkBuffer = { 0 }; + MQTTStatus_t status; + + setupTransportInterface( &transport ); + setupNetworkBuffer( &networkBuffer ); + transport.send = transportSendFailure; + + memset( &mqttContext, 0x0, sizeof( mqttContext ) ); + memset( &publishInfo, 0x0, sizeof( publishInfo ) ); + MQTT_Init( &mqttContext, &transport, getTime, eventCallback, &networkBuffer ); + + /* Test 1 connection status is MQTTNotConnected */ + mqttContext.connectStatus = MQTTNotConnected; + MQTT_GetPublishPacketSize_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_SerializePublishHeaderWithoutTopic_ExpectAnyArgsAndReturn( MQTTSuccess ); + status = MQTT_Publish( &mqttContext, &publishInfo, 10 ); + TEST_ASSERT_EQUAL_INT( MQTTStatusNotConnected, status ); + + /* Test 2 connection status is MQTTDisconnectPending */ + mqttContext.connectStatus = MQTTDisconnectPending; + MQTT_GetPublishPacketSize_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_SerializePublishHeaderWithoutTopic_ExpectAnyArgsAndReturn( MQTTSuccess ); + status = MQTT_Publish( &mqttContext, &publishInfo, 10 ); + TEST_ASSERT_EQUAL_INT( MQTTStatusDisconnectPending, status ); +} + /** * @brief Test that MQTT_Publish works as intended. */ @@ -2470,6 +2638,7 @@ void test_MQTT_Publish_DuplicatePublish_UpdateFailed( void ) mqttContext.outgoingPublishRecordMaxCount = 10; mqttContext.outgoingPublishRecords = outgoingPublishRecord; + mqttContext.connectStatus = MQTTConnected; publishInfo.qos = MQTTQoS1; publishInfo.dup = true; @@ -2513,6 +2682,7 @@ void test_MQTT_Publish_WriteVSendsPartialBytes( void ) mqttContext.outgoingPublishRecordMaxCount = 10; mqttContext.outgoingPublishRecords = outgoingPublishRecord; + mqttContext.connectStatus = MQTTConnected; publishInfo.qos = MQTTQoS1; publishInfo.dup = false; @@ -2547,6 +2717,7 @@ void test_MQTT_Publish7( void ) MQTT_GetPublishPacketSize_ExpectAnyArgsAndReturn( MQTTSuccess ); MQTT_SerializePublishHeaderWithoutTopic_ExpectAnyArgsAndReturn( MQTTSuccess ); + mqttContext.connectStatus = MQTTConnected; /* We need sendPacket to be called with at least 1 byte to send, so that * it can return failure. This argument is the output of serializing the @@ -2576,6 +2747,8 @@ void test_MQTT_Publish8( void ) memset( &publishInfo, 0x0, sizeof( publishInfo ) ); MQTT_Init( &mqttContext, &transport, getTime, eventCallback, &networkBuffer ); + mqttContext.connectStatus = MQTTConnected; + /* We want to test the first call to sendPacket within sendPublish succeeding, * and the second one failing. */ mqttContext.transportInterface.send = transportSendSucceedThenFail; @@ -2606,6 +2779,8 @@ void test_MQTT_Publish9( void ) memset( &publishInfo, 0x0, sizeof( publishInfo ) ); MQTT_Init( &mqttContext, &transport, getTime, eventCallback, &networkBuffer ); + mqttContext.connectStatus = MQTTConnected; + MQTT_GetPublishPacketSize_ExpectAnyArgsAndReturn( MQTTSuccess ); MQTT_SerializePublishHeaderWithoutTopic_ExpectAnyArgsAndReturn( MQTTSuccess ); mqttContext.transportInterface.send = transportSendSuccess; @@ -2632,6 +2807,8 @@ void test_MQTT_Publish10( void ) memset( &publishInfo, 0x0, sizeof( publishInfo ) ); MQTT_Init( &mqttContext, &transport, getTime, eventCallback, &networkBuffer ); + mqttContext.connectStatus = MQTTConnected; + /* Test that sending a publish without a payload succeeds. */ publishInfo.pPayload = NULL; publishInfo.payloadLength = 0; @@ -2661,6 +2838,8 @@ void test_MQTT_Publish11( void ) memset( &publishInfo, 0x0, sizeof( publishInfo ) ); MQTT_Init( &mqttContext, &transport, getTime, eventCallback, &networkBuffer ); + mqttContext.connectStatus = MQTTConnected; + /* Restore the test payload and length. */ publishInfo.pPayload = "Test"; publishInfo.payloadLength = 4; @@ -2710,6 +2889,8 @@ void test_MQTT_Publish12( void ) mqttContext.outgoingPublishRecords->qos = MQTTQoS2; + mqttContext.connectStatus = MQTTConnected; + expectedState = MQTTPublishSend; MQTT_GetPublishPacketSize_ExpectAnyArgsAndReturn( MQTTSuccess ); @@ -2746,6 +2927,8 @@ void test_MQTT_Publish13( void ) memset( &publishInfo, 0x0, sizeof( publishInfo ) ); MQTT_Init( &mqttContext, &transport, getTime, eventCallback, &networkBuffer ); + mqttContext.connectStatus = MQTTConnected; + MQTT_InitStatefulQoS( &mqttContext, &outgoingRecords, 4, &incomingRecords, 4 ); @@ -2781,6 +2964,8 @@ void test_MQTT_Publish14( void ) memset( &publishInfo, 0x0, sizeof( publishInfo ) ); MQTT_Init( &mqttContext, &transport, getTime, eventCallback, &networkBuffer ); + mqttContext.connectStatus = MQTTConnected; + /* Duplicate publish. dup flag is marked by application. */ publishInfo.dup = true; @@ -2811,6 +2996,8 @@ void test_MQTT_Publish15( void ) memset( &publishInfo, 0x0, sizeof( publishInfo ) ); MQTT_Init( &mqttContext, &transport, getTime, eventCallback, &networkBuffer ); + mqttContext.connectStatus = MQTTConnected; + /* Duplicate publish. dup flag is marked by application. * State record is not present. */ publishInfo.dup = true; @@ -2846,6 +3033,8 @@ void test_MQTT_Publish_Send_Timeout( void ) /* Initialize the MQTT context. */ MQTT_Init( &mqttContext, &transport, getTime, eventCallback, &networkBuffer ); + mqttContext.connectStatus = MQTTConnected; + /* Setup for making sure that the test results in calling sendPacket function * where calls to transport send function are made (repeatedly to send packet * over the network).*/ @@ -2863,6 +3052,32 @@ void test_MQTT_Publish_Send_Timeout( void ) /* ========================================================================== */ +/** + * @brief Test that MQTT_Disconnect works as intended when the connection is already disconnected. + */ +void test_MQTT_Disconnect_already_disconnected( void ) +{ + MQTTContext_t mqttContext = { 0 }; + MQTTStatus_t status; + TransportInterface_t transport = { 0 }; + MQTTFixedBuffer_t networkBuffer = { 0 }; + size_t disconnectSize = 2; + + setupTransportInterface( &transport ); + setupNetworkBuffer( &networkBuffer ); + + memset( &mqttContext, 0x0, sizeof( mqttContext ) ); + MQTT_Init( &mqttContext, &transport, getTime, eventCallback, &networkBuffer ); + mqttContext.connectStatus = MQTTNotConnected; + + /* Send failure with network error. */ + MQTT_GetDisconnectPacketSize_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_GetDisconnectPacketSize_ReturnThruPtr_pPacketSize( &disconnectSize ); + MQTT_SerializeDisconnect_ExpectAnyArgsAndReturn( MQTTSuccess ); + status = MQTT_Disconnect( &mqttContext ); + TEST_ASSERT_EQUAL( MQTTStatusNotConnected, status ); +} + /** * @brief Test that MQTT_Disconnect works as intended. */ @@ -2923,6 +3138,7 @@ void test_MQTT_Disconnect2( void ) MQTT_SerializeDisconnect_ExpectAnyArgsAndReturn( MQTTSuccess ); status = MQTT_Disconnect( &mqttContext ); TEST_ASSERT_EQUAL( MQTTSendFailed, status ); + TEST_ASSERT_EQUAL( MQTTNotConnected, mqttContext.connectStatus ); } /** @@ -3015,6 +3231,52 @@ void test_MQTT_Disconnect4( void ) /* At disconnect, the buffer is cleared of any pending packets. */ TEST_ASSERT_EACH_EQUAL_UINT8( 0, mqttBuffer, MQTT_TEST_BUFFER_LENGTH ); } + +/** + * @brief Test that MQTT_Disconnect works as intended when status is disconnect pending. + */ +void test_MQTT_Disconnect4_status_disconnect_pending( void ) +{ + MQTTContext_t mqttContext = { 0 }; + MQTTStatus_t status; + uint8_t buffer[ 10 ]; + uint8_t * bufPtr = buffer; + NetworkContext_t networkContext = { 0 }; + TransportInterface_t transport = { 0 }; + MQTTFixedBuffer_t networkBuffer = { 0 }; + size_t disconnectSize = 2; + + /* Fill the buffer with garbage data. */ + memset( mqttBuffer, 0xAB, MQTT_TEST_BUFFER_LENGTH ); + + setupTransportInterface( &transport ); + setupNetworkBuffer( &networkBuffer ); + networkContext.buffer = &bufPtr; + transport.pNetworkContext = &networkContext; + transport.recv = transportRecvSuccess; + transport.send = transportSendFailure; + transport.writev = NULL; + + memset( &mqttContext, 0x0, sizeof( mqttContext ) ); + MQTT_Init( &mqttContext, &transport, getTime, eventCallback, &networkBuffer ); + mqttContext.connectStatus = MQTTDisconnectPending; + + /* Successful send. */ + mqttContext.transportInterface.send = mockSend; + MQTT_GetDisconnectPacketSize_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_GetDisconnectPacketSize_ReturnThruPtr_pPacketSize( &disconnectSize ); + MQTT_SerializeDisconnect_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_SerializeDisconnect_Stub( MQTT_SerializeDisconnect_stub ); + /* Write a disconnect packet into the buffer. */ + mqttBuffer[ 0 ] = MQTT_PACKET_TYPE_DISCONNECT; + + status = MQTT_Disconnect( &mqttContext ); + + TEST_ASSERT_EQUAL( MQTTSuccess, status ); + TEST_ASSERT_EQUAL( MQTTNotConnected, mqttContext.connectStatus ); + /* At disconnect, the buffer is cleared of any pending packets. */ + TEST_ASSERT_EACH_EQUAL_UINT8( 0, mqttBuffer, MQTT_TEST_BUFFER_LENGTH ); +} /* ========================================================================== */ /** @@ -3222,6 +3484,8 @@ void test_MQTT_ProcessLoop_HandleKeepAlive1( void ) mqttStatus = MQTT_Init( &context, &transport, getTime, eventCallback, &networkBuffer ); + context.connectStatus = MQTTConnected; + /* Verify MQTTSuccess is returned. */ MQTT_GetPingreqPacketSize_ExpectAnyArgsAndReturn( MQTTSuccess ); MQTT_GetPingreqPacketSize_ReturnThruPtr_pPacketSize( &pingreqSize ); @@ -3273,6 +3537,25 @@ void test_MQTT_ProcessLoop_RecvFailed( void ) transport.recv = transportRecvFailure; setupNetworkBuffer( &networkBuffer ); + mqttStatus = MQTT_Init( &context, &transport, getTime, eventCallback, &networkBuffer ); + context.connectStatus = MQTTConnected; + + mqttStatus = MQTT_ProcessLoop( &context ); + + TEST_ASSERT_EQUAL( MQTTRecvFailed, mqttStatus ); +} + +void test_MQTT_ProcessLoop_RecvFailed_disconnected( void ) +{ + MQTTContext_t context = { 0 }; + TransportInterface_t transport = { 0 }; + MQTTFixedBuffer_t networkBuffer = { 0 }; + MQTTStatus_t mqttStatus; + + setupTransportInterface( &transport ); + transport.recv = transportRecvFailure; + setupNetworkBuffer( &networkBuffer ); + mqttStatus = MQTT_Init( &context, &transport, getTime, eventCallback, &networkBuffer ); mqttStatus = MQTT_ProcessLoop( &context ); @@ -3325,6 +3608,8 @@ void test_MQTT_ProcessLoop_discardPacket_second_recv_fail( void ) mqttStatus = MQTT_Init( &context, &transport, getTime, eventCallback, &networkBuffer ); TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus ); + context.connectStatus = MQTTConnected; + context.networkBuffer.size = 20; incomingPacket.type = currentPacketType; @@ -3362,6 +3647,8 @@ void test_MQTT_ProcessLoop_handleIncomingPublish_Happy_Path1( void ) mqttStatus = MQTT_InitStatefulQoS( &context, NULL, 0, pIncomingCallback, 10 ); TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus ); + context.connectStatus = MQTTConnected; + modifyIncomingPacketStatus = MQTTSuccess; /* Assume QoS = 1 so that a PUBACK will be sent after receiving PUBLISH. @@ -3399,6 +3686,8 @@ void test_MQTT_ProcessLoop_handleIncomingPublish_Happy_Path2( void ) mqttStatus = MQTT_InitStatefulQoS( &context, NULL, 0, pIncomingPublish, 10 ); TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus ); + context.connectStatus = MQTTConnected; + modifyIncomingPacketStatus = MQTTSuccess; /* Assume QoS = 2 so that a PUBREC will be sent after receiving PUBLISH. @@ -3443,6 +3732,8 @@ void test_MQTT_ProcessLoop_handleIncomingPublish_Happy_Path3( void ) &outgoingRecords, 4, &incomingRecords, 4 ); + context.connectStatus = MQTTConnected; + modifyIncomingPacketStatus = MQTTSuccess; /* Duplicate QoS1 publish received. @@ -3489,6 +3780,8 @@ void test_MQTT_ProcessLoop_handleIncomingPublish_Happy_Path4( void ) mqttStatus = MQTT_InitStatefulQoS( &context, NULL, 0, pIncomingPublish, 10 ); TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus ); + context.connectStatus = MQTTConnected; + modifyIncomingPacketStatus = MQTTSuccess; /* Duplicate QoS2 publish received. @@ -3530,6 +3823,8 @@ void test_MQTT_ProcessLoop_handleIncomingPublish_Happy_Path5( void ) mqttStatus = MQTT_InitStatefulQoS( &context, NULL, 0, pIncomingPublish, 10 ); TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus ); + context.connectStatus = MQTTConnected; + modifyIncomingPacketStatus = MQTTSuccess; /* A publish is received when already a state record exists, but dup @@ -3568,6 +3863,8 @@ void test_MQTT_ProcessLoop_handleIncomingPublish_Happy_Path6( void ) mqttStatus = MQTT_InitStatefulQoS( &context, NULL, 0, pIncomingPublish, 10 ); TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus ); + context.connectStatus = MQTTConnected; + modifyIncomingPacketStatus = MQTTSuccess; /* Duplicate QoS2 publish received with no collision. @@ -3640,6 +3937,7 @@ void test_MQTT_ProcessLoop_handleIncomingPublish_Error_Paths( void ) expectParams.pPubInfo = &publishInfo; /* The other loop parameter fields are irrelevant. */ expectProcessLoopCalls( &context, &expectParams ); + TEST_ASSERT_FALSE( isEventCallbackInvoked ); } @@ -3711,6 +4009,8 @@ void test_MQTT_ProcessLoop_handleIncomingAck_Happy_Paths( void ) mqttStatus = MQTT_Init( &context, &transport, getTime, eventCallback, &networkBuffer ); TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus ); + context.connectStatus = MQTTConnected; + modifyIncomingPacketStatus = MQTTSuccess; /* Mock the receiving of a PUBACK packet type and expect the appropriate @@ -3809,6 +4109,8 @@ void test_MQTT_ProcessLoop_handleIncomingAck_Error_Paths( void ) mqttStatus = MQTT_Init( &context, &transport, getTime, eventCallback, &networkBuffer ); TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus ); + context.connectStatus = MQTTConnected; + modifyIncomingPacketStatus = MQTTSuccess; /* Verify that MQTTBadResponse is propagated when deserialization fails upon @@ -3828,8 +4130,34 @@ void test_MQTT_ProcessLoop_handleIncomingAck_Error_Paths( void ) expectParams.stateAfterDeserialize = MQTTPubRelSend; expectParams.serializeStatus = MQTTNoMemory; expectParams.stateAfterSerialize = MQTTStateNull; - expectParams.processLoopStatus = MQTTSendFailed; + expectParams.processLoopStatus = MQTTNoMemory; + expectProcessLoopCalls( &context, &expectParams ); + + /* Verify that MQTTStatusNotConnected propagated when receiving a any ACK, + * here PUBREC but thr connection status is MQTTNotConnected. */ + currentPacketType = MQTT_PACKET_TYPE_PUBREC; + /* Set expected return values in the loop. */ + resetProcessLoopParams( &expectParams ); + expectParams.stateAfterDeserialize = MQTTPubRelSend; + expectParams.stateAfterSerialize = MQTTPubCompPending; + expectParams.serializeStatus = MQTTSuccess; + expectParams.processLoopStatus = MQTTStatusNotConnected; + context.connectStatus = MQTTNotConnected; + expectProcessLoopCalls( &context, &expectParams ); + context.connectStatus = MQTTConnected; + + /* Verify that MQTTStatusNotConnected propagated when receiving a any ACK, + * here PUBREC but thr connection status is MQTTNotConnected. */ + currentPacketType = MQTT_PACKET_TYPE_PUBREC; + /* Set expected return values in the loop. */ + resetProcessLoopParams( &expectParams ); + expectParams.stateAfterDeserialize = MQTTPubRelSend; + expectParams.stateAfterSerialize = MQTTPubCompPending; + expectParams.serializeStatus = MQTTSuccess; + expectParams.processLoopStatus = MQTTStatusDisconnectPending; + context.connectStatus = MQTTDisconnectPending; expectProcessLoopCalls( &context, &expectParams ); + context.connectStatus = MQTTConnected; /* Verify that MQTTBadResponse is propagated when deserialization fails upon * receiving a PUBACK. */ @@ -3898,6 +4226,8 @@ void test_MQTT_ProcessLoop_handleKeepAlive_Happy_Paths1( void ) mqttStatus = MQTT_Init( &context, &transport, getTime, eventCallback, &networkBuffer ); TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus ); + context.connectStatus = MQTTConnected; + context.waitingForPingResp = false; context.keepAliveIntervalSec = 0; expectParams.incomingPublish = false; @@ -4183,6 +4513,8 @@ void test_MQTT_ProcessLoop_handleKeepAlive_Error_Paths3( void ) mqttStatus = MQTT_Init( &context, &transport, getTime, eventCallback, &networkBuffer ); TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus ); + context.connectStatus = MQTTConnected; + globalEntryTime = PACKET_RX_TIMEOUT_MS + 1; context.keepAliveIntervalSec = 0; context.lastPacketTxTime = 0; @@ -4224,6 +4556,8 @@ void test_MQTT_ProcessLoop_handleKeepAlive_Error_Paths4( void ) mqttStatus = MQTT_Init( &context, &transport, getTime, eventCallback, &networkBuffer ); TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus ); + context.connectStatus = MQTTConnected; + globalEntryTime = PACKET_RX_TIMEOUT_MS + 1; context.keepAliveIntervalSec = ( PACKET_TX_TIMEOUT_MS / 1000 ) + 1U; context.lastPacketTxTime = 0; @@ -4368,6 +4702,8 @@ void test_MQTT_ProcessLoop_Timer_Overflow( void ) mqttStatus = MQTT_InitStatefulQoS( &context, NULL, 0, incomingPublishRecords, 10 ); TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus ); + context.connectStatus = MQTTConnected; + MQTT_ProcessIncomingPacketTypeAndLength_ExpectAnyArgsAndReturn( MQTTSuccess ); MQTT_ProcessIncomingPacketTypeAndLength_ReturnThruPtr_pIncomingPacket( &incomingPacket ); /* Assume QoS = 1 so that a PUBACK will be sent after receiving PUBLISH. */ @@ -4571,6 +4907,8 @@ void test_MQTT_Subscribe_happy_path( void ) &incomingRecords, 4 ); TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus ); + context.connectStatus = MQTTConnected; + /* Verify MQTTSuccess is returned with the following mocks. */ MQTT_GetSubscribePacketSize_ExpectAnyArgsAndReturn( MQTTSuccess ); MQTT_GetSubscribePacketSize_ReturnThruPtr_pPacketSize( &packetSize ); @@ -4583,6 +4921,58 @@ void test_MQTT_Subscribe_happy_path( void ) TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus ); } +/** + * @brief This test case verifies that MQTT_Subscribe does not return success if the connect status + * is anythin but MQTTConnected. + */ +void test_MQTT_Subscribe_happy_path_not_connected( void ) +{ + MQTTStatus_t mqttStatus; + MQTTContext_t context = { 0 }; + TransportInterface_t transport = { 0 }; + MQTTFixedBuffer_t networkBuffer = { 0 }; + MQTTSubscribeInfo_t subscribeInfo = { 0 }; + size_t remainingLength = MQTT_SAMPLE_REMAINING_LENGTH; + size_t packetSize = MQTT_SAMPLE_REMAINING_LENGTH; + MQTTPubAckInfo_t incomingRecords = { 0 }; + MQTTPubAckInfo_t outgoingRecords = { 0 }; + + setupTransportInterface( &transport ); + setupNetworkBuffer( &networkBuffer ); + setupSubscriptionInfo( &subscribeInfo ); + + /* Initialize context. */ + mqttStatus = MQTT_Init( &context, &transport, getTime, eventCallback, &networkBuffer ); + TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus ); + + mqttStatus = MQTT_InitStatefulQoS( &context, + &outgoingRecords, 4, + &incomingRecords, 4 ); + TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus ); + + /* Test 1 connect status is MQTTNotConnected */ + context.connectStatus = MQTTNotConnected; + /* Verify MQTTSuccess is returned with the following mocks. */ + MQTT_GetSubscribePacketSize_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_GetSubscribePacketSize_ReturnThruPtr_pPacketSize( &packetSize ); + MQTT_GetSubscribePacketSize_ReturnThruPtr_pRemainingLength( &remainingLength ); + MQTT_SerializeSubscribeHeader_Stub( MQTT_SerializeSubscribedHeader_cb ); + /* Expect the above calls when running MQTT_Subscribe. */ + mqttStatus = MQTT_Subscribe( &context, &subscribeInfo, 1, MQTT_FIRST_VALID_PACKET_ID ); + TEST_ASSERT_EQUAL( MQTTStatusNotConnected, mqttStatus ); + + /* Test 2 connect status is MQTTDisconnectPending*/ + context.connectStatus = MQTTDisconnectPending; + /* Verify MQTTSuccess is returned with the following mocks. */ + MQTT_GetSubscribePacketSize_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_GetSubscribePacketSize_ReturnThruPtr_pPacketSize( &packetSize ); + MQTT_GetSubscribePacketSize_ReturnThruPtr_pRemainingLength( &remainingLength ); + MQTT_SerializeSubscribeHeader_Stub( MQTT_SerializeSubscribedHeader_cb ); + /* Expect the above calls when running MQTT_Subscribe. */ + mqttStatus = MQTT_Subscribe( &context, &subscribeInfo, 1, MQTT_FIRST_VALID_PACKET_ID ); + TEST_ASSERT_EQUAL( MQTTStatusDisconnectPending, mqttStatus ); +} + /** * @brief This test case verifies that MQTT_Subscribe returns successfully * when valid parameters are passed and all bytes are sent. @@ -4612,6 +5002,8 @@ void test_MQTT_Subscribe_happy_path1( void ) &incomingRecords, 4 ); TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus ); + context.connectStatus = MQTTConnected; + /* Verify MQTTSuccess is returned with the following mocks. */ MQTT_GetSubscribePacketSize_ExpectAnyArgsAndReturn( MQTTSuccess ); MQTT_GetSubscribePacketSize_ReturnThruPtr_pPacketSize( &packetSize ); @@ -4654,6 +5046,8 @@ void test_MQTT_Subscribe_happy_path2( void ) &incomingRecords, 4 ); TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus ); + context.connectStatus = MQTTConnected; + /* Verify MQTTSuccess is returned with the following mocks. */ MQTT_GetSubscribePacketSize_ExpectAnyArgsAndReturn( MQTTSuccess ); MQTT_GetSubscribePacketSize_ReturnThruPtr_pPacketSize( &packetSize ); @@ -4711,6 +5105,8 @@ void test_MQTT_Subscribe_MultipleSubscriptions( void ) &incomingRecords, 4 ); TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus ); + context.connectStatus = MQTTConnected; + /* Verify MQTTSuccess is returned with the following mocks. */ MQTT_GetSubscribePacketSize_ExpectAnyArgsAndReturn( MQTTSuccess ); MQTT_GetSubscribePacketSize_ReturnThruPtr_pPacketSize( &packetSize ); @@ -4748,6 +5144,9 @@ void test_MQTT_Subscribe_error_paths1( void ) /* Initialize context. */ mqttStatus = MQTT_Init( &context, &transport, getTime, eventCallback, &networkBuffer ); TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus ); + + context.connectStatus = MQTTConnected; + /* Verify MQTTSendFailed is propagated when transport interface returns an error. */ MQTT_GetSubscribePacketSize_ExpectAnyArgsAndReturn( MQTTSuccess ); MQTT_GetSubscribePacketSize_ReturnThruPtr_pPacketSize( &packetSize ); @@ -4785,6 +5184,8 @@ void test_MQTT_Subscribe_error_paths2( void ) mqttStatus = MQTT_Init( &context, &transport, getTime, eventCallback, &networkBuffer ); TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus ); + context.connectStatus = MQTTConnected; + MQTT_GetSubscribePacketSize_ExpectAnyArgsAndReturn( MQTTSuccess ); MQTT_GetSubscribePacketSize_ReturnThruPtr_pPacketSize( &packetSize ); MQTT_GetSubscribePacketSize_ReturnThruPtr_pRemainingLength( &remainingLength ); @@ -4793,6 +5194,44 @@ void test_MQTT_Subscribe_error_paths2( void ) TEST_ASSERT_EQUAL( MQTTSendFailed, mqttStatus ); } +/** + * @brief This test case verifies that MQTT_Subscribe returns MQTTSendFailed + * if transport interface fails to send and the connection status is converted to + * MQTTDisconnectPending + */ +void test_MQTT_Subscribe_error_paths_with_transport_failure( void ) +{ + MQTTStatus_t mqttStatus; + MQTTContext_t context = { 0 }; + TransportInterface_t transport = { 0 }; + MQTTFixedBuffer_t networkBuffer = { 0 }; + MQTTSubscribeInfo_t subscribeInfo = { 0 }; + size_t remainingLength = MQTT_SAMPLE_REMAINING_LENGTH; + size_t packetSize = MQTT_SAMPLE_REMAINING_LENGTH; + + /* Verify that an error is propagated when transport interface returns an error. */ + setupNetworkBuffer( &networkBuffer ); + setupSubscriptionInfo( &subscribeInfo ); + subscribeInfo.qos = MQTTQoS0; + setupTransportInterface( &transport ); + transport.writev = NULL; + transport.send = transportSendFailure; + + /* Initialize context. */ + mqttStatus = MQTT_Init( &context, &transport, getTime, eventCallback, &networkBuffer ); + TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus ); + + context.connectStatus = MQTTConnected; + + MQTT_GetSubscribePacketSize_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_GetSubscribePacketSize_ReturnThruPtr_pPacketSize( &packetSize ); + MQTT_GetSubscribePacketSize_ReturnThruPtr_pRemainingLength( &remainingLength ); + MQTT_SerializeSubscribeHeader_Stub( MQTT_SerializeSubscribedHeader_cb ); + mqttStatus = MQTT_Subscribe( &context, &subscribeInfo, 1, MQTT_FIRST_VALID_PACKET_ID ); + TEST_ASSERT_EQUAL( MQTTSendFailed, mqttStatus ); + TEST_ASSERT_EQUAL( MQTTDisconnectPending, context.connectStatus ); +} + /** * @brief This test case verifies that MQTT_Subscribe returns MQTTSendFailed * if transport interface send fails and timer overflows. @@ -4827,6 +5266,8 @@ void test_MQTT_Subscribe_error_paths_timerOverflowCheck( void ) mqttStatus = MQTT_Init( &context, &transport, getTimeMock, eventCallback, &networkBuffer ); TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus ); + context.connectStatus = MQTTConnected; + MQTT_GetSubscribePacketSize_ExpectAnyArgsAndReturn( MQTTSuccess ); MQTT_GetSubscribePacketSize_ReturnThruPtr_pPacketSize( &packetSize ); MQTT_GetSubscribePacketSize_ReturnThruPtr_pRemainingLength( &remainingLength ); @@ -4870,6 +5311,8 @@ void test_MQTT_Subscribe_error_paths_timerOverflowCheck1( void ) mqttStatus = MQTT_Init( &context, &transport, getTimeMockBigTimeStep, eventCallback, &networkBuffer ); TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus ); + context.connectStatus = MQTTConnected; + MQTT_GetSubscribePacketSize_ExpectAnyArgsAndReturn( MQTTSuccess ); MQTT_GetSubscribePacketSize_ReturnThruPtr_pPacketSize( &packetSize ); MQTT_GetSubscribePacketSize_ReturnThruPtr_pRemainingLength( &remainingLength ); @@ -4944,6 +5387,9 @@ void test_MQTT_Unsubscribe_happy_path( void ) /* Initialize context. */ mqttStatus = MQTT_Init( &context, &transport, getTime, eventCallback, &networkBuffer ); TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus ); + + context.connectStatus = MQTTConnected; + /* Verify MQTTSuccess is returned with the following mocks. */ MQTT_SerializeUnsubscribeHeader_Stub( MQTT_SerializeUnsubscribeHeader_cb ); MQTT_GetUnsubscribePacketSize_ExpectAnyArgsAndReturn( MQTTSuccess ); @@ -4955,6 +5401,50 @@ void test_MQTT_Unsubscribe_happy_path( void ) TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus ); } +/** + * @brief This test case verifies that MQTT_Unsubscribe does not return success + * when the connection status is anything but MQTTConnected + */ +void test_MQTT_Unsubscribe_not_connected( void ) +{ + MQTTStatus_t mqttStatus; + MQTTContext_t context = { 0 }; + TransportInterface_t transport = { 0 }; + MQTTFixedBuffer_t networkBuffer = { 0 }; + MQTTSubscribeInfo_t subscribeInfo = { 0 }; + size_t remainingLength = MQTT_SAMPLE_REMAINING_LENGTH; + size_t packetSize = MQTT_SAMPLE_REMAINING_LENGTH; + + setupTransportInterface( &transport ); + setupNetworkBuffer( &networkBuffer ); + setupSubscriptionInfo( &subscribeInfo ); + subscribeInfo.qos = MQTTQoS0; + + /* Initialize context. */ + mqttStatus = MQTT_Init( &context, &transport, getTime, eventCallback, &networkBuffer ); + TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus ); + + /* Test 1 Connection status is MQTTNotConnected*/ + context.connectStatus = MQTTNotConnected; + /* Verify MQTTSuccess is returned with the following mocks. */ + MQTT_GetUnsubscribePacketSize_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_GetUnsubscribePacketSize_ReturnThruPtr_pPacketSize( &packetSize ); + MQTT_GetUnsubscribePacketSize_ReturnThruPtr_pRemainingLength( &remainingLength ); + /* Expect the above calls when running MQTT_Unsubscribe. */ + mqttStatus = MQTT_Unsubscribe( &context, &subscribeInfo, 1, MQTT_FIRST_VALID_PACKET_ID ); + TEST_ASSERT_EQUAL( MQTTStatusNotConnected, mqttStatus ); + + /* Test 2 Connection status is MQTTDisconnectPending*/ + context.connectStatus = MQTTDisconnectPending; + /* Verify MQTTSuccess is returned with the following mocks. */ + MQTT_GetUnsubscribePacketSize_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_GetUnsubscribePacketSize_ReturnThruPtr_pPacketSize( &packetSize ); + MQTT_GetUnsubscribePacketSize_ReturnThruPtr_pRemainingLength( &remainingLength ); + /* Expect the above calls when running MQTT_Unsubscribe. */ + mqttStatus = MQTT_Unsubscribe( &context, &subscribeInfo, 1, MQTT_FIRST_VALID_PACKET_ID ); + TEST_ASSERT_EQUAL( MQTTStatusDisconnectPending, mqttStatus ); +} + /** * @brief This test case verifies that MQTT_Subscribe returns successfully * when valid parameters are passed and all bytes are sent. @@ -4984,6 +5474,8 @@ void test_MQTT_Unsubscribe_happy_path1( void ) &incomingRecords, 4 ); TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus ); + context.connectStatus = MQTTConnected; + /* Verify MQTTSuccess is returned with the following mocks. */ MQTT_GetUnsubscribePacketSize_ExpectAnyArgsAndReturn( MQTTSuccess ); MQTT_GetUnsubscribePacketSize_ReturnThruPtr_pPacketSize( &packetSize ); @@ -5027,6 +5519,8 @@ void test_MQTT_unsubscribe_happy_path2( void ) &incomingRecords, 4 ); TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus ); + context.connectStatus = MQTTConnected; + /* Verify MQTTSuccess is returned with the following mocks. */ MQTT_GetUnsubscribePacketSize_ExpectAnyArgsAndReturn( MQTTSuccess ); MQTT_GetUnsubscribePacketSize_ReturnThruPtr_pPacketSize( &packetSize ); @@ -5084,6 +5578,8 @@ void test_MQTT_Unsubscribe_MultipleSubscriptions( void ) &incomingRecords, 4 ); TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus ); + context.connectStatus = MQTTConnected; + /* Verify MQTTSuccess is returned with the following mocks. */ MQTT_GetUnsubscribePacketSize_ExpectAnyArgsAndReturn( MQTTSuccess ); MQTT_GetUnsubscribePacketSize_ReturnThruPtr_pPacketSize( &packetSize ); @@ -5123,6 +5619,9 @@ void test_MQTT_Unsubscribe_error_path1( void ) /* Initialize context. */ mqttStatus = MQTT_Init( &context, &transport, getTime, eventCallback, &networkBuffer ); TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus ); + + context.connectStatus = MQTTConnected; + MQTT_SerializeUnsubscribeHeader_Stub( MQTT_SerializeUnsubscribeHeader_cb ); /* Verify MQTTSendFailed is propagated when transport interface returns an error. */ MQTT_GetUnsubscribePacketSize_ExpectAnyArgsAndReturn( MQTTSuccess ); @@ -5163,6 +5662,8 @@ void test_MQTT_Unsubscribe_error_path2( void ) mqttStatus = MQTT_Init( &context, &transport, getTime, eventCallback, &networkBuffer ); TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus ); + context.connectStatus = MQTTConnected; + MQTT_SerializeUnsubscribeHeader_Stub( MQTT_SerializeUnsubscribeHeader_cb ); transport.send = transportSendNoBytes; /* Use the mock function that returns zero bytes sent. */ MQTT_GetUnsubscribePacketSize_ExpectAnyArgsAndReturn( MQTTSuccess ); @@ -5206,6 +5707,9 @@ void test_MQTT_Ping_happy_path( void ) /* Initialize context. */ mqttStatus = MQTT_Init( &context, &transport, getTime, eventCallback, &networkBuffer ); TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus ); + + context.connectStatus = MQTTConnected; + /* Verify MQTTSuccess is returned. */ MQTT_GetPingreqPacketSize_ExpectAnyArgsAndReturn( MQTTSuccess ); MQTT_GetPingreqPacketSize_ReturnThruPtr_pPacketSize( &pingreqSize ); @@ -5218,6 +5722,46 @@ void test_MQTT_Ping_happy_path( void ) TEST_ASSERT_TRUE( context.waitingForPingResp ); } +/** + * @brief This test case verifies that MQTT_Ping does not returns success + * if the connection status is anything but MQTTConnect. + */ +void test_MQTT_Ping_not_connected( void ) +{ + MQTTStatus_t mqttStatus; + MQTTContext_t context = { 0 }; + TransportInterface_t transport = { 0 }; + MQTTFixedBuffer_t networkBuffer = { 0 }; + size_t pingreqSize = MQTT_PACKET_PINGREQ_SIZE; + + setupTransportInterface( &transport ); + setupNetworkBuffer( &networkBuffer ); + + /* Initialize context. */ + mqttStatus = MQTT_Init( &context, &transport, getTime, eventCallback, &networkBuffer ); + TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus ); + + /* Test 1 when the connection status is MQTTNotConnected*/ + context.connectStatus = MQTTNotConnected; + /* Verify MQTTSuccess is returned. */ + MQTT_GetPingreqPacketSize_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_GetPingreqPacketSize_ReturnThruPtr_pPacketSize( &pingreqSize ); + MQTT_SerializePingreq_ExpectAnyArgsAndReturn( MQTTSuccess ); + /* Expect the above calls when running MQTT_Ping. */ + mqttStatus = MQTT_Ping( &context ); + TEST_ASSERT_EQUAL( MQTTStatusNotConnected, mqttStatus ); + + /* Test 2 when the connection status is MQTTDisconnectPending*/ + context.connectStatus = MQTTDisconnectPending; + /* Verify MQTTSuccess is returned. */ + MQTT_GetPingreqPacketSize_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_GetPingreqPacketSize_ReturnThruPtr_pPacketSize( &pingreqSize ); + MQTT_SerializePingreq_ExpectAnyArgsAndReturn( MQTTSuccess ); + /* Expect the above calls when running MQTT_Ping. */ + mqttStatus = MQTT_Ping( &context ); + TEST_ASSERT_EQUAL( MQTTStatusDisconnectPending, mqttStatus ); +} + /** * @brief This test case verifies that MQTT_Ping returns MQTTSendFailed * if transport interface send returns an error. @@ -5241,6 +5785,9 @@ void test_MQTT_Ping_error_path( void ) /* Initialize context. */ mqttStatus = MQTT_Init( &context, &transport, getTime, eventCallback, &networkBuffer ); TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus ); + + context.connectStatus = MQTTConnected; + /* Verify MQTTSendFailed is propagated when transport interface returns an error. */ MQTT_GetPingreqPacketSize_ExpectAnyArgsAndReturn( MQTTSuccess ); MQTT_GetPingreqPacketSize_ReturnThruPtr_pPacketSize( &pingreqSize ); @@ -5255,6 +5802,9 @@ void test_MQTT_Ping_error_path( void ) /* Initialize context. */ mqttStatus = MQTT_Init( &context, &transport, getTime, eventCallback, &networkBuffer ); TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus ); + + context.connectStatus = MQTTConnected; + MQTT_GetPingreqPacketSize_ExpectAnyArgsAndReturn( MQTTSuccess ); MQTT_GetPingreqPacketSize_ReturnThruPtr_pPacketSize( &pingreqSize ); MQTT_SerializePingreq_ExpectAnyArgsAndReturn( MQTTSuccess ); @@ -5266,6 +5816,9 @@ void test_MQTT_Ping_error_path( void ) /* Initialize context. */ mqttStatus = MQTT_Init( &context, &transport, getTime, eventCallback, &networkBuffer ); TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus ); + + context.connectStatus = MQTTConnected; + /* Verify MQTTBadParameter is propagated when getting PINGREQ packet size fails. */ MQTT_GetPingreqPacketSize_ExpectAnyArgsAndReturn( MQTTBadParameter ); MQTT_GetPingreqPacketSize_ReturnThruPtr_pPacketSize( &pingreqSize ); @@ -5853,7 +6406,19 @@ void test_MQTT_Status_strerror( void ) str = MQTT_Status_strerror( status ); TEST_ASSERT_EQUAL_STRING( "MQTTNeedMoreBytes", str ); - status = MQTTNeedMoreBytes + 1; + status = MQTTStatusConnected; + str = MQTT_Status_strerror( status ); + TEST_ASSERT_EQUAL_STRING( "MQTTStatusConnected", str ); + + status = MQTTStatusNotConnected; + str = MQTT_Status_strerror( status ); + TEST_ASSERT_EQUAL_STRING( "MQTTStatusNotConnected", str ); + + status = MQTTStatusDisconnectPending; + str = MQTT_Status_strerror( status ); + TEST_ASSERT_EQUAL_STRING( "MQTTStatusDisconnectPending", str ); + + status = MQTTStatusDisconnectPending + 1; str = MQTT_Status_strerror( status ); TEST_ASSERT_EQUAL_STRING( "Invalid MQTT Status code", str ); } diff --git a/tools/cmock/coverage.cmake b/tools/cmock/coverage.cmake index 570f92758..e56e0417c 100644 --- a/tools/cmock/coverage.cmake +++ b/tools/cmock/coverage.cmake @@ -15,8 +15,9 @@ execute_process( COMMAND lcov --directory ${CMAKE_BINARY_DIR} --initial --capture --rc lcov_branch_coverage=1 - --include "*source*" --output-file=${CMAKE_BINARY_DIR}/base_coverage.info + --include "*source*" + ) file(GLOB files "${CMAKE_BINARY_DIR}/bin/tests/*") @@ -59,8 +60,8 @@ execute_process( --add-tracefile ${CMAKE_BINARY_DIR}/base_coverage.info --add-tracefile ${CMAKE_BINARY_DIR}/second_coverage.info --output-file ${CMAKE_BINARY_DIR}/coverage.info - --include "*source*" --rc lcov_branch_coverage=1 + --include "*source*" ) execute_process( COMMAND genhtml --rc lcov_branch_coverage=1