diff --git a/docs/doxygen/include/size_table.md b/docs/doxygen/include/size_table.md
index 10de57852..1fa92ff64 100644
--- a/docs/doxygen/include/size_table.md
+++ b/docs/doxygen/include/size_table.md
@@ -9,8 +9,8 @@
core_mqtt.c |
- 4.1K |
- 3.5K |
+ 4.4K |
+ 3.8K |
core_mqtt_state.c |
@@ -24,7 +24,7 @@
Total estimates |
- 8.6K |
- 7.0K |
+ 8.9K |
+ 7.3K |
diff --git a/source/core_mqtt.c b/source/core_mqtt.c
index 56ea23450..23324974d 100644
--- a/source/core_mqtt.c
+++ b/source/core_mqtt.c
@@ -266,7 +266,7 @@ static MQTTPubAckType_t getAckFromPacketType( uint8_t packetType );
*
* @return Number of bytes received, or negative number on network error.
*/
-static int32_t recvExact( const MQTTContext_t * pContext,
+static int32_t recvExact( MQTTContext_t * pContext,
size_t bytesToRecv );
/**
@@ -278,7 +278,7 @@ static int32_t recvExact( const MQTTContext_t * pContext,
*
* @return #MQTTRecvFailed or #MQTTNoDataAvailable.
*/
-static MQTTStatus_t discardPacket( const MQTTContext_t * pContext,
+static MQTTStatus_t discardPacket( MQTTContext_t * pContext,
size_t remainingLength,
uint32_t timeoutMs );
@@ -302,7 +302,7 @@ static MQTTStatus_t discardStoredPacket( MQTTContext_t * pContext,
*
* @return #MQTTSuccess or #MQTTRecvFailed.
*/
-static MQTTStatus_t receivePacket( const MQTTContext_t * pContext,
+static MQTTStatus_t receivePacket( MQTTContext_t * pContext,
MQTTPacketInfo_t incomingPacket,
uint32_t remainingTimeMs );
@@ -424,25 +424,28 @@ static MQTTStatus_t validateSubscribeUnsubscribeParams( const MQTTContext_t * pC
* ##MQTTRecvFailed if transport recv failed;
* #MQTTSuccess otherwise.
*/
-static MQTTStatus_t receiveConnack( const MQTTContext_t * pContext,
+static MQTTStatus_t receiveConnack( MQTTContext_t * pContext,
uint32_t timeoutMs,
bool cleanSession,
MQTTPacketInfo_t * pIncomingPacket,
bool * pSessionPresent );
/**
- * @brief Resends pending acks for a re-established MQTT session, or
- * clears existing state records for a clean session.
+ * @brief Resends pending acks for a re-established MQTT session
*
* @param[in] pContext Initialized MQTT context.
- * @param[in] sessionPresent Session present flag received from the MQTT broker.
*
* @return #MQTTSendFailed if transport send during resend failed;
* #MQTTSuccess otherwise.
*/
-static MQTTStatus_t handleSessionResumption( MQTTContext_t * pContext,
- bool sessionPresent );
+static MQTTStatus_t handleUncleanSessionResumption( MQTTContext_t * pContext );
+/**
+ * @brief Clears existing state records for a clean session.
+ *
+ * @param[in] pContext Initialized MQTT context.
+ */
+static void handleCleanSession( MQTTContext_t * pContext );
/**
* @brief Send the publish packet without copying the topic string and payload in
@@ -823,6 +826,11 @@ static int32_t sendMessageVector( MQTTContext_t * pContext,
{
bytesSentOrError = sendResult;
LogError( ( "sendMessageVector: Unable to send packet: Network Error." ) );
+
+ if( pContext->connectStatus == MQTTConnected )
+ {
+ pContext->connectStatus = MQTTDisconnectPending;
+ }
}
else
{
@@ -902,6 +910,11 @@ static int32_t sendBuffer( MQTTContext_t * pContext,
{
bytesSentOrError = sendResult;
LogError( ( "sendBuffer: Unable to send packet: Network Error." ) );
+
+ if( pContext->connectStatus == MQTTConnected )
+ {
+ pContext->connectStatus = MQTTDisconnectPending;
+ }
}
else
{
@@ -961,7 +974,7 @@ static MQTTPubAckType_t getAckFromPacketType( uint8_t packetType )
/*-----------------------------------------------------------*/
-static int32_t recvExact( const MQTTContext_t * pContext,
+static int32_t recvExact( MQTTContext_t * pContext,
size_t bytesToRecv )
{
uint8_t * pIndex = NULL;
@@ -997,6 +1010,15 @@ static int32_t recvExact( const MQTTContext_t * pContext,
( long int ) bytesRecvd ) );
totalBytesRecvd = bytesRecvd;
receiveError = true;
+
+ MQTT_PRE_STATE_UPDATE_HOOK( pContext );
+
+ if( pContext->connectStatus == MQTTConnected )
+ {
+ pContext->connectStatus = MQTTDisconnectPending;
+ }
+
+ MQTT_POST_STATE_UPDATE_HOOK( pContext );
}
else if( bytesRecvd > 0 )
{
@@ -1038,7 +1060,7 @@ static int32_t recvExact( const MQTTContext_t * pContext,
/*-----------------------------------------------------------*/
-static MQTTStatus_t discardPacket( const MQTTContext_t * pContext,
+static MQTTStatus_t discardPacket( MQTTContext_t * pContext,
size_t remainingLength,
uint32_t timeoutMs )
{
@@ -1174,7 +1196,7 @@ static MQTTStatus_t discardStoredPacket( MQTTContext_t * pContext,
/*-----------------------------------------------------------*/
-static MQTTStatus_t receivePacket( const MQTTContext_t * pContext,
+static MQTTStatus_t receivePacket( MQTTContext_t * pContext,
MQTTPacketInfo_t incomingPacket,
uint32_t remainingTimeMs )
{
@@ -1264,6 +1286,7 @@ static MQTTStatus_t sendPublishAcks( MQTTContext_t * pContext,
uint8_t packetTypeByte = 0U;
MQTTPubAckType_t packetType;
MQTTFixedBuffer_t localBuffer;
+ MQTTConnectionStatus_t connectStatus;
uint8_t pubAckPacket[ MQTT_PUBLISH_ACK_PACKET_SIZE ];
localBuffer.pBuffer = pubAckPacket;
@@ -1283,18 +1306,33 @@ static MQTTStatus_t sendPublishAcks( MQTTContext_t * pContext,
if( status == MQTTSuccess )
{
- MQTT_PRE_SEND_HOOK( pContext );
+ MQTT_PRE_STATE_UPDATE_HOOK( pContext );
- /* Here, we are not using the vector approach for efficiency. There is just one buffer
- * to be sent which can be achieved with a normal send call. */
- sendResult = sendBuffer( pContext,
- localBuffer.pBuffer,
- MQTT_PUBLISH_ACK_PACKET_SIZE );
+ connectStatus = pContext->connectStatus;
+
+ if( connectStatus != MQTTConnected )
+ {
+ status = ( connectStatus == MQTTNotConnected ) ? MQTTStatusNotConnected : MQTTStatusDisconnectPending;
+ }
+
+ if( status == MQTTSuccess )
+ {
+ /* Here, we are not using the vector approach for efficiency. There is just one buffer
+ * to be sent which can be achieved with a normal send call. */
+ sendResult = sendBuffer( pContext,
+ localBuffer.pBuffer,
+ MQTT_PUBLISH_ACK_PACKET_SIZE );
- MQTT_POST_SEND_HOOK( pContext );
+ if( sendResult < ( int32_t ) MQTT_PUBLISH_ACK_PACKET_SIZE )
+ {
+ status = MQTTSendFailed;
+ }
+ }
+
+ MQTT_POST_STATE_UPDATE_HOOK( pContext );
}
- if( sendResult == ( int32_t ) MQTT_PUBLISH_ACK_PACKET_SIZE )
+ if( status == MQTTSuccess )
{
pContext->controlPacketSent = true;
@@ -1320,7 +1358,6 @@ static MQTTStatus_t sendPublishAcks( MQTTContext_t * pContext,
"PacketSize=%lu.",
( unsigned int ) packetTypeByte, ( long int ) sendResult,
MQTT_PUBLISH_ACK_PACKET_SIZE ) );
- status = MQTTSendFailed;
}
}
@@ -1680,6 +1717,15 @@ static MQTTStatus_t receiveSingleIteration( MQTTContext_t * pContext,
{
/* The receive function has failed. Bubble up the error up to the user. */
status = MQTTRecvFailed;
+
+ MQTT_PRE_STATE_UPDATE_HOOK( pContext );
+
+ if( pContext->connectStatus == MQTTConnected )
+ {
+ pContext->connectStatus = MQTTDisconnectPending;
+ }
+
+ MQTT_POST_STATE_UPDATE_HOOK( pContext );
}
else if( ( recvBytes == 0 ) && ( pContext->index == 0U ) )
{
@@ -2301,7 +2347,7 @@ static MQTTStatus_t sendConnectWithoutCopy( MQTTContext_t * pContext,
/*-----------------------------------------------------------*/
-static MQTTStatus_t receiveConnack( const MQTTContext_t * pContext,
+static MQTTStatus_t receiveConnack( MQTTContext_t * pContext,
uint32_t timeoutMs,
bool cleanSession,
MQTTPacketInfo_t * pIncomingPacket,
@@ -2425,8 +2471,7 @@ static MQTTStatus_t receiveConnack( const MQTTContext_t * pContext,
/*-----------------------------------------------------------*/
-static MQTTStatus_t handleSessionResumption( MQTTContext_t * pContext,
- bool sessionPresent )
+static MQTTStatus_t handleUncleanSessionResumption( MQTTContext_t * pContext )
{
MQTTStatus_t status = MQTTSuccess;
MQTTStateCursor_t cursor = MQTT_STATE_CURSOR_INITIALIZER;
@@ -2435,43 +2480,43 @@ static MQTTStatus_t handleSessionResumption( MQTTContext_t * pContext,
assert( pContext != NULL );
- /* Reset the index and clear the buffer when a new session is established. */
- pContext->index = 0;
- ( void ) memset( pContext->networkBuffer.pBuffer, 0, pContext->networkBuffer.size );
+ /* Get the next packet ID for which a PUBREL need to be resent. */
+ packetId = MQTT_PubrelToResend( pContext, &cursor, &state );
- if( sessionPresent == true )
+ /* Resend all the PUBREL acks after session is reestablished. */
+ while( ( packetId != MQTT_PACKET_ID_INVALID ) &&
+ ( status == MQTTSuccess ) )
{
- /* Get the next packet ID for which a PUBREL need to be resent. */
+ status = sendPublishAcks( pContext, packetId, state );
+
packetId = MQTT_PubrelToResend( pContext, &cursor, &state );
+ }
- /* Resend all the PUBREL acks after session is reestablished. */
- while( ( packetId != MQTT_PACKET_ID_INVALID ) &&
- ( status == MQTTSuccess ) )
- {
- status = sendPublishAcks( pContext, packetId, state );
+ return status;
+}
- packetId = MQTT_PubrelToResend( pContext, &cursor, &state );
- }
- }
- else
+static void handleCleanSession( MQTTContext_t * pContext )
+{
+ assert( pContext != NULL );
+
+ /* Reset the index and clear the buffer when a new session is established. */
+ pContext->index = 0;
+ ( void ) memset( pContext->networkBuffer.pBuffer, 0, pContext->networkBuffer.size );
+
+ if( pContext->outgoingPublishRecordMaxCount > 0U )
{
/* Clear any existing records if a new session is established. */
- if( pContext->outgoingPublishRecordMaxCount > 0U )
- {
- ( void ) memset( pContext->outgoingPublishRecords,
- 0x00,
- pContext->outgoingPublishRecordMaxCount * sizeof( *pContext->outgoingPublishRecords ) );
- }
-
- if( pContext->incomingPublishRecordMaxCount > 0U )
- {
- ( void ) memset( pContext->incomingPublishRecords,
- 0x00,
- pContext->incomingPublishRecordMaxCount * sizeof( *pContext->incomingPublishRecords ) );
- }
+ ( void ) memset( pContext->outgoingPublishRecords,
+ 0x00,
+ pContext->outgoingPublishRecordMaxCount * sizeof( *pContext->outgoingPublishRecords ) );
}
- return status;
+ if( pContext->incomingPublishRecordMaxCount > 0U )
+ {
+ ( void ) memset( pContext->incomingPublishRecords,
+ 0x00,
+ pContext->incomingPublishRecordMaxCount * sizeof( *pContext->incomingPublishRecords ) );
+ }
}
static MQTTStatus_t validatePublishParams( const MQTTContext_t * pContext,
@@ -2668,6 +2713,47 @@ MQTTStatus_t MQTT_CancelCallback( const MQTTContext_t * pContext,
/*-----------------------------------------------------------*/
+MQTTStatus_t MQTT_CheckConnectStatus( MQTTContext_t * pContext )
+{
+ MQTTConnectionStatus_t connectStatus;
+ MQTTStatus_t status = MQTTSuccess;
+
+ if( pContext == NULL )
+ {
+ LogError( ( "Argument cannot be NULL: pContext=%p",
+ ( void * ) pContext ) );
+ status = MQTTBadParameter;
+ }
+
+ if( status == MQTTSuccess )
+ {
+ MQTT_PRE_STATE_UPDATE_HOOK( pContext );
+
+ connectStatus = pContext->connectStatus;
+
+ MQTT_POST_STATE_UPDATE_HOOK( pContext );
+
+ switch( connectStatus )
+ {
+ case MQTTConnected:
+ status = MQTTStatusConnected;
+ break;
+
+ case MQTTDisconnectPending:
+ status = MQTTStatusDisconnectPending;
+ break;
+
+ default:
+ status = MQTTStatusNotConnected;
+ break;
+ }
+ }
+
+ return status;
+}
+
+/*-----------------------------------------------------------*/
+
MQTTStatus_t MQTT_Connect( MQTTContext_t * pContext,
const MQTTConnectInfo_t * pConnectInfo,
const MQTTPublishInfo_t * pWillInfo,
@@ -2677,6 +2763,7 @@ MQTTStatus_t MQTT_Connect( MQTTContext_t * pContext,
size_t remainingLength = 0UL, packetSize = 0UL;
MQTTStatus_t status = MQTTSuccess;
MQTTPacketInfo_t incomingPacket = { 0 };
+ MQTTConnectionStatus_t connectStatus;
incomingPacket.type = ( uint8_t ) 0;
@@ -2704,45 +2791,89 @@ MQTTStatus_t MQTT_Connect( MQTTContext_t * pContext,
if( status == MQTTSuccess )
{
- MQTT_PRE_SEND_HOOK( pContext );
+ MQTT_PRE_STATE_UPDATE_HOOK( pContext );
- status = sendConnectWithoutCopy( pContext,
- pConnectInfo,
- pWillInfo,
- remainingLength );
+ connectStatus = pContext->connectStatus;
- MQTT_POST_SEND_HOOK( pContext );
- }
+ if( connectStatus != MQTTNotConnected )
+ {
+ status = ( connectStatus == MQTTConnected ) ? MQTTStatusConnected : MQTTStatusDisconnectPending;
+ }
- /* Read CONNACK from transport layer. */
- if( status == MQTTSuccess )
- {
- status = receiveConnack( pContext,
- timeoutMs,
- pConnectInfo->cleanSession,
- &incomingPacket,
- pSessionPresent );
+ if( status == MQTTSuccess )
+ {
+ status = sendConnectWithoutCopy( pContext,
+ pConnectInfo,
+ pWillInfo,
+ remainingLength );
+ }
+
+ /* Read CONNACK from transport layer. */
+ if( status == MQTTSuccess )
+ {
+ status = receiveConnack( pContext,
+ timeoutMs,
+ pConnectInfo->cleanSession,
+ &incomingPacket,
+ pSessionPresent );
+ }
+
+ if( ( status == MQTTSuccess ) && ( *pSessionPresent != true ) )
+ {
+ handleCleanSession( pContext );
+ }
+
+ if( status == MQTTSuccess )
+ {
+ pContext->connectStatus = MQTTConnected;
+ /* Initialize keep-alive fields after a successful connection. */
+ pContext->keepAliveIntervalSec = pConnectInfo->keepAliveSeconds;
+ pContext->waitingForPingResp = false;
+ pContext->pingReqSendTimeMs = 0U;
+ }
+
+ MQTT_POST_STATE_UPDATE_HOOK( pContext );
}
- if( status == MQTTSuccess )
+ if( ( status == MQTTSuccess ) && ( *pSessionPresent == true ) )
{
- /* Resend PUBRELs when reestablishing a session, or clear records for new sessions. */
- status = handleSessionResumption( pContext, *pSessionPresent );
+ /* Resend PUBRELs when reestablishing a session */
+ status = handleUncleanSessionResumption( pContext );
}
if( status == MQTTSuccess )
{
LogInfo( ( "MQTT connection established with the broker." ) );
- pContext->connectStatus = MQTTConnected;
- /* Initialize keep-alive fields after a successful connection. */
- pContext->keepAliveIntervalSec = pConnectInfo->keepAliveSeconds;
- pContext->waitingForPingResp = false;
- pContext->pingReqSendTimeMs = 0U;
+ }
+ else if( ( status == MQTTStatusConnected ) || ( status == MQTTStatusDisconnectPending ) )
+ {
+ LogInfo( ( "MQTT Connection is either already established or a disconnect is pending, return status = %s.",
+ MQTT_Status_strerror( status ) ) );
+ }
+ else if( pContext == NULL )
+ {
+ LogError( ( "MQTT connection failed with status = %s.",
+ MQTT_Status_strerror( status ) ) );
}
else
{
LogError( ( "MQTT connection failed with status = %s.",
MQTT_Status_strerror( status ) ) );
+
+ MQTT_PRE_STATE_UPDATE_HOOK( pContext );
+
+ if( pContext->connectStatus == MQTTConnected )
+ {
+ /* This will only be executed if after the connack is received
+ * the retransmits fail for some reason on an unclean session
+ * connection. In this case we need to retry the re-transmits
+ * which can only be done using the connect API and that can only
+ * be done once we are disconnected, hence we ask the user to
+ * call disconnect here */
+ pContext->connectStatus = MQTTDisconnectPending;
+ }
+
+ MQTT_POST_STATE_UPDATE_HOOK( pContext );
}
return status;
@@ -2755,6 +2886,7 @@ MQTTStatus_t MQTT_Subscribe( MQTTContext_t * pContext,
size_t subscriptionCount,
uint16_t packetId )
{
+ MQTTConnectionStatus_t connectStatus;
size_t remainingLength = 0UL, packetSize = 0UL;
/* Validate arguments. */
@@ -2777,16 +2909,26 @@ MQTTStatus_t MQTT_Subscribe( MQTTContext_t * pContext,
if( status == MQTTSuccess )
{
- MQTT_PRE_SEND_HOOK( pContext );
+ MQTT_PRE_STATE_UPDATE_HOOK( pContext );
+
+ connectStatus = pContext->connectStatus;
+
+ if( connectStatus != MQTTConnected )
+ {
+ status = ( connectStatus == MQTTNotConnected ) ? MQTTStatusNotConnected : MQTTStatusDisconnectPending;
+ }
- /* Send MQTT SUBSCRIBE packet. */
- status = sendSubscribeWithoutCopy( pContext,
- pSubscriptionList,
- subscriptionCount,
- packetId,
- remainingLength );
+ if( status == MQTTSuccess )
+ {
+ /* Send MQTT SUBSCRIBE packet. */
+ status = sendSubscribeWithoutCopy( pContext,
+ pSubscriptionList,
+ subscriptionCount,
+ packetId,
+ remainingLength );
+ }
- MQTT_POST_SEND_HOOK( pContext );
+ MQTT_POST_STATE_UPDATE_HOOK( pContext );
}
return status;
@@ -2802,7 +2944,7 @@ MQTTStatus_t MQTT_Publish( MQTTContext_t * pContext,
size_t remainingLength = 0UL;
size_t packetSize = 0UL;
MQTTPublishState_t publishStatus = MQTTStateNull;
- bool stateUpdateHookExecuted = false;
+ MQTTConnectionStatus_t connectStatus;
/* Maximum number of bytes required by the 'fixed' part of the PUBLISH
* packet header according to the MQTT specifications.
@@ -2836,67 +2978,71 @@ MQTTStatus_t MQTT_Publish( MQTTContext_t * pContext,
&headerSize );
}
- if( ( status == MQTTSuccess ) && ( pPublishInfo->qos > MQTTQoS0 ) )
+ if( status == MQTTSuccess )
{
+ /* Take the mutex as multiple send calls are required for sending this
+ * packet. */
MQTT_PRE_STATE_UPDATE_HOOK( pContext );
- /* Set the flag so that the corresponding hook can be called later. */
- stateUpdateHookExecuted = true;
+ connectStatus = pContext->connectStatus;
- status = MQTT_ReserveState( pContext,
- packetId,
- pPublishInfo->qos );
-
- /* State already exists for a duplicate packet.
- * If a state doesn't exist, it will be handled as a new publish in
- * state engine. */
- if( ( status == MQTTStateCollision ) && ( pPublishInfo->dup == true ) )
+ if( connectStatus != MQTTConnected )
{
- status = MQTTSuccess;
+ status = ( connectStatus == MQTTNotConnected ) ? MQTTStatusNotConnected : MQTTStatusDisconnectPending;
}
- }
- if( status == MQTTSuccess )
- {
- /* Take the mutex as multiple send calls are required for sending this
- * packet. */
- MQTT_PRE_SEND_HOOK( pContext );
+ if( ( status == MQTTSuccess ) && ( pPublishInfo->qos > MQTTQoS0 ) )
+ {
+ /* Set the flag so that the corresponding hook can be called later. */
- status = sendPublishWithoutCopy( pContext,
- pPublishInfo,
- mqttHeader,
- headerSize,
- packetId );
+ status = MQTT_ReserveState( pContext,
+ packetId,
+ pPublishInfo->qos );
- /* Give the mutex away for the next taker. */
- MQTT_POST_SEND_HOOK( pContext );
- }
+ /* State already exists for a duplicate packet.
+ * If a state doesn't exist, it will be handled as a new publish in
+ * state engine. */
+ if( ( status == MQTTStateCollision ) && ( pPublishInfo->dup == true ) )
+ {
+ status = MQTTSuccess;
+ }
+ }
- if( ( status == MQTTSuccess ) &&
- ( pPublishInfo->qos > MQTTQoS0 ) )
- {
- /* Update state machine after PUBLISH is sent.
- * Only to be done for QoS1 or QoS2. */
- status = MQTT_UpdateStatePublish( pContext,
- packetId,
- MQTT_SEND,
- pPublishInfo->qos,
- &publishStatus );
+ if( status == MQTTSuccess )
+ {
+ status = sendPublishWithoutCopy( pContext,
+ pPublishInfo,
+ mqttHeader,
+ headerSize,
+ packetId );
+ }
- if( status != MQTTSuccess )
+ if( ( status == MQTTSuccess ) &&
+ ( pPublishInfo->qos > MQTTQoS0 ) )
{
- LogError( ( "Update state for publish failed with status %s."
- " However PUBLISH packet was sent to the broker."
- " Any further handling of ACKs for the packet Id"
- " will fail.",
- MQTT_Status_strerror( status ) ) );
+ /* Update state machine after PUBLISH is sent.
+ * Only to be done for QoS1 or QoS2. */
+ status = MQTT_UpdateStatePublish( pContext,
+ packetId,
+ MQTT_SEND,
+ pPublishInfo->qos,
+ &publishStatus );
+
+ if( status != MQTTSuccess )
+ {
+ LogError( ( "Update state for publish failed with status %s."
+ " However PUBLISH packet was sent to the broker."
+ " Any further handling of ACKs for the packet Id"
+ " will fail.",
+ MQTT_Status_strerror( status ) ) );
+ }
}
- }
- if( stateUpdateHookExecuted == true )
- {
- /* Regardless of the status, if the mutex was taken due to the
- * packet being of QoS > QoS0, then it should be relinquished. */
+ /* mutex should be released and not before updating the state
+ * because we need to make sure that the state is updated
+ * after sending the publish packet, before the receive
+ * loop receives ack for this and would want to update its state
+ */
MQTT_POST_STATE_UPDATE_HOOK( pContext );
}
@@ -2919,6 +3065,7 @@ MQTTStatus_t MQTT_Ping( MQTTContext_t * pContext )
/* MQTT ping packets are of fixed length. */
uint8_t pingreqPacket[ 2U ];
MQTTFixedBuffer_t localBuffer;
+ MQTTConnectionStatus_t connectStatus;
localBuffer.pBuffer = pingreqPacket;
localBuffer.size = sizeof( pingreqPacket );
@@ -2956,32 +3103,42 @@ MQTTStatus_t MQTT_Ping( MQTTContext_t * pContext )
{
/* Take the mutex as the send call should not be interrupted in
* between. */
- MQTT_PRE_SEND_HOOK( pContext );
- /* Send the serialized PINGREQ packet to transport layer.
- * Here, we do not use the vectored IO approach for efficiency as the
- * Ping packet does not have numerous fields which need to be copied
- * from the user provided buffers. Thus it can be sent directly. */
- sendResult = sendBuffer( pContext,
- localBuffer.pBuffer,
- packetSize );
+ MQTT_PRE_STATE_UPDATE_HOOK( pContext );
- /* Give the mutex away. */
- MQTT_POST_SEND_HOOK( pContext );
+ connectStatus = pContext->connectStatus;
- /* It is an error to not send the entire PINGREQ packet. */
- if( sendResult < ( int32_t ) packetSize )
+ if( connectStatus != MQTTConnected )
{
- LogError( ( "Transport send failed for PINGREQ packet." ) );
- status = MQTTSendFailed;
+ status = ( connectStatus == MQTTNotConnected ) ? MQTTStatusNotConnected : MQTTStatusDisconnectPending;
}
- else
+
+ if( status == MQTTSuccess )
{
- pContext->pingReqSendTimeMs = pContext->lastPacketTxTime;
- pContext->waitingForPingResp = true;
- LogDebug( ( "Sent %ld bytes of PINGREQ packet.",
- ( long int ) sendResult ) );
+ /* Send the serialized PINGREQ packet to transport layer.
+ * Here, we do not use the vectored IO approach for efficiency as the
+ * Ping packet does not have numerous fields which need to be copied
+ * from the user provided buffers. Thus it can be sent directly. */
+ sendResult = sendBuffer( pContext,
+ localBuffer.pBuffer,
+ packetSize );
+
+ /* It is an error to not send the entire PINGREQ packet. */
+ if( sendResult < ( int32_t ) packetSize )
+ {
+ LogError( ( "Transport send failed for PINGREQ packet." ) );
+ status = MQTTSendFailed;
+ }
+ else
+ {
+ pContext->pingReqSendTimeMs = pContext->lastPacketTxTime;
+ pContext->waitingForPingResp = true;
+ LogDebug( ( "Sent %ld bytes of PINGREQ packet.",
+ ( long int ) sendResult ) );
+ }
}
+
+ MQTT_POST_STATE_UPDATE_HOOK( pContext );
}
return status;
@@ -2994,6 +3151,7 @@ MQTTStatus_t MQTT_Unsubscribe( MQTTContext_t * pContext,
size_t subscriptionCount,
uint16_t packetId )
{
+ MQTTConnectionStatus_t connectStatus;
size_t remainingLength = 0UL, packetSize = 0UL;
/* Validate arguments. */
@@ -3017,16 +3175,25 @@ MQTTStatus_t MQTT_Unsubscribe( MQTTContext_t * pContext,
if( status == MQTTSuccess )
{
/* Take the mutex because the below call should not be interrupted. */
- MQTT_PRE_SEND_HOOK( pContext );
+ MQTT_PRE_STATE_UPDATE_HOOK( pContext );
- status = sendUnsubscribeWithoutCopy( pContext,
- pSubscriptionList,
- subscriptionCount,
- packetId,
- remainingLength );
+ connectStatus = pContext->connectStatus;
+
+ if( connectStatus != MQTTConnected )
+ {
+ status = ( connectStatus == MQTTNotConnected ) ? MQTTStatusNotConnected : MQTTStatusDisconnectPending;
+ }
- /* Give the mutex away. */
- MQTT_POST_SEND_HOOK( pContext );
+ if( status == MQTTSuccess )
+ {
+ status = sendUnsubscribeWithoutCopy( pContext,
+ pSubscriptionList,
+ subscriptionCount,
+ packetId,
+ remainingLength );
+ }
+
+ MQTT_POST_STATE_UPDATE_HOOK( pContext );
}
return status;
@@ -3041,6 +3208,7 @@ MQTTStatus_t MQTT_Disconnect( MQTTContext_t * pContext )
MQTTStatus_t status = MQTTSuccess;
MQTTFixedBuffer_t localBuffer;
uint8_t disconnectPacket[ 2U ];
+ MQTTConnectionStatus_t connectStatus;
localBuffer.pBuffer = disconnectPacket;
localBuffer.size = 2U;
@@ -3069,38 +3237,46 @@ MQTTStatus_t MQTT_Disconnect( MQTTContext_t * pContext )
if( status == MQTTSuccess )
{
/* Take the mutex because the below call should not be interrupted. */
- MQTT_PRE_SEND_HOOK( pContext );
-
- /* Here we do not use vectors as the disconnect packet has fixed fields
- * which do not reside in user provided buffers. Thus, it can be sent
- * using a simple send call. */
- sendResult = sendBuffer( pContext,
- localBuffer.pBuffer,
- packetSize );
+ MQTT_PRE_STATE_UPDATE_HOOK( pContext );
- /* Give the mutex away. */
- MQTT_POST_SEND_HOOK( pContext );
+ connectStatus = pContext->connectStatus;
- if( sendResult < ( int32_t ) packetSize )
+ if( connectStatus == MQTTNotConnected )
{
- LogError( ( "Transport send failed for DISCONNECT packet." ) );
- status = MQTTSendFailed;
+ status = MQTTStatusNotConnected;
}
- else
+
+ if( status == MQTTSuccess )
{
- LogDebug( ( "Sent %ld bytes of DISCONNECT packet.",
- ( long int ) sendResult ) );
- }
- }
+ LogInfo( ( "Disconnected from the broker." ) );
+ pContext->connectStatus = MQTTNotConnected;
- if( status == MQTTSuccess )
- {
- LogInfo( ( "Disconnected from the broker." ) );
- pContext->connectStatus = MQTTNotConnected;
+ /* Reset the index and clean the buffer on a successful disconnect. */
+ pContext->index = 0;
+ ( void ) memset( pContext->networkBuffer.pBuffer, 0, pContext->networkBuffer.size );
+
+ LogError( ( "MQTT Connection Disconnected Successfully" ) );
+
+ /* Here we do not use vectors as the disconnect packet has fixed fields
+ * which do not reside in user provided buffers. Thus, it can be sent
+ * using a simple send call. */
+ sendResult = sendBuffer( pContext,
+ localBuffer.pBuffer,
+ packetSize );
- /* Reset the index and clean the buffer on a successful disconnect. */
- pContext->index = 0;
- ( void ) memset( pContext->networkBuffer.pBuffer, 0, pContext->networkBuffer.size );
+ if( sendResult < ( int32_t ) packetSize )
+ {
+ LogError( ( "Transport send failed for DISCONNECT packet." ) );
+ status = MQTTSendFailed;
+ }
+ else
+ {
+ LogDebug( ( "Sent %ld bytes of DISCONNECT packet.",
+ ( long int ) sendResult ) );
+ }
+ }
+
+ MQTT_POST_STATE_UPDATE_HOOK( pContext );
}
return status;
@@ -3372,6 +3548,18 @@ const char * MQTT_Status_strerror( MQTTStatus_t status )
str = "MQTTNeedMoreBytes";
break;
+ case MQTTStatusConnected:
+ str = "MQTTStatusConnected";
+ break;
+
+ case MQTTStatusNotConnected:
+ str = "MQTTStatusNotConnected";
+ break;
+
+ case MQTTStatusDisconnectPending:
+ str = "MQTTStatusDisconnectPending";
+ break;
+
default:
str = "Invalid MQTT Status code";
break;
diff --git a/source/include/core_mqtt.h b/source/include/core_mqtt.h
index 8e0bad49e..ba5219348 100644
--- a/source/include/core_mqtt.h
+++ b/source/include/core_mqtt.h
@@ -107,8 +107,9 @@ typedef void (* MQTTEventCallback_t )( struct MQTTContext * pContext,
*/
typedef enum MQTTConnectionStatus
{
- MQTTNotConnected, /**< @brief MQTT Connection is inactive. */
- MQTTConnected /**< @brief MQTT Connection is active. */
+ MQTTNotConnected, /**< @brief MQTT Connection is inactive. */
+ MQTTConnected, /**< @brief MQTT Connection is active. */
+ MQTTDisconnectPending /**< @brief MQTT Connection needs to be disconnected as a transport error has occurred. */
} MQTTConnectionStatus_t;
/**
@@ -414,6 +415,25 @@ MQTTStatus_t MQTT_InitStatefulQoS( MQTTContext_t * pContext,
size_t incomingPublishCount );
/* @[declare_mqtt_initstatefulqos] */
+/**
+ * @brief Checks the MQTT connection status with the broker.
+ *
+ * @param[in] pContext Initialized MQTT context.
+ *
+ * @return #MQTTBadParameter if invalid parameters are passed;
+ * #MQTTStatusConnected if the MQTT connection is established with the broker.
+ * #MQTTStatusNotConnected if the MQTT connection is broker.
+ * #MQTTStatusDisconnectPending if Transport Interface has failed and MQTT connection needs to be closed.
+ *
+ * Example
+ * @code{c}
+ *
+ * @endcode
+ */
+/* @[declare_mqtt_checkconnectstatus] */
+MQTTStatus_t MQTT_CheckConnectStatus( MQTTContext_t * pContext );
+/* @[declare_mqtt_checkconnectstatus] */
+
/**
* @brief Establish an MQTT session.
*
diff --git a/source/include/core_mqtt_serializer.h b/source/include/core_mqtt_serializer.h
index 4837ee654..86b3c6396 100644
--- a/source/include/core_mqtt_serializer.h
+++ b/source/include/core_mqtt_serializer.h
@@ -96,9 +96,12 @@ typedef enum MQTTStatus
MQTTIllegalState, /**< An illegal state in the state record. */
MQTTStateCollision, /**< A collision with an existing state record entry. */
MQTTKeepAliveTimeout, /**< Timeout while waiting for PINGRESP. */
- MQTTNeedMoreBytes /**< MQTT_ProcessLoop/MQTT_ReceiveLoop has received
+ MQTTNeedMoreBytes, /**< MQTT_ProcessLoop/MQTT_ReceiveLoop has received
incomplete data; it should be called again (probably after
a delay). */
+ MQTTStatusConnected, /**< MQTT connection is established with the broker */
+ MQTTStatusNotConnected, /**< MQTT connection is not established with the broker */
+ MQTTStatusDisconnectPending /**< Transport Interface has failed and MQTT connection needs to be closed */
} MQTTStatus_t;
/**
diff --git a/test/unit-test/core_mqtt_utest.c b/test/unit-test/core_mqtt_utest.c
index c9acae5aa..131f255df 100644
--- a/test/unit-test/core_mqtt_utest.c
+++ b/test/unit-test/core_mqtt_utest.c
@@ -1062,6 +1062,11 @@ static void expectProcessLoopCalls( MQTTContext_t * const pContext,
}
}
+ if( expectMoreCalls && ( pContext->connectStatus != MQTTConnected ) )
+ {
+ expectMoreCalls = false;
+ }
+
/* Update the state based on the sent packet. */
if( expectMoreCalls )
{
@@ -1156,6 +1161,46 @@ void test_MQTT_Init_Invalid_Params( void )
TEST_ASSERT_EQUAL( MQTTBadParameter, mqttStatus );
}
+/* ========================================================================== */
+
+/**
+ * @brief Test that NULL parameter causes MQTT_CheckConnectStatus to return MQTTBadParameter.
+ */
+void test_MQTT_CheckConnectStatus_invalid_params( void )
+{
+ MQTTStatus_t mqttStatus = { 0 };
+
+ mqttStatus = MQTT_CheckConnectStatus( NULL );
+ TEST_ASSERT_EQUAL( MQTTBadParameter, mqttStatus );
+}
+
+/**
+ * @brief Test that MQTT_CheckConnectStatus returns correct status corresponding to the connection status.
+ */
+void test_MQTT_CheckConnectStatus_return_correct_status( void )
+{
+ MQTTStatus_t mqttStatus = { 0 };
+ MQTTContext_t context = { 0 };
+ TransportInterface_t transport = { 0 };
+ MQTTFixedBuffer_t networkBuffer = { 0 };
+
+ setupTransportInterface( &transport );
+ setupNetworkBuffer( &networkBuffer );
+
+ context.connectStatus = MQTTConnected;
+ mqttStatus = MQTT_CheckConnectStatus( &context );
+ TEST_ASSERT_EQUAL( MQTTStatusConnected, mqttStatus );
+
+ context.connectStatus = MQTTNotConnected;
+ mqttStatus = MQTT_CheckConnectStatus( &context );
+ TEST_ASSERT_EQUAL( MQTTStatusNotConnected, mqttStatus );
+
+ context.connectStatus = MQTTDisconnectPending;
+ mqttStatus = MQTT_CheckConnectStatus( &context );
+ TEST_ASSERT_EQUAL( MQTTStatusDisconnectPending, mqttStatus );
+}
+
+
/* ========================================================================== */
static uint8_t * MQTT_SerializeConnectFixedHeader_cb( uint8_t * pIndex,
@@ -1172,6 +1217,72 @@ static uint8_t * MQTT_SerializeConnectFixedHeader_cb( uint8_t * pIndex,
return pIndex;
}
+/**
+ * @brief Test MQTT_Connect, when the status is already MQTTConnected.
+ */
+void test_MQTT_Connect_sendConnect_already_connected( void )
+{
+ MQTTContext_t mqttContext = { 0 };
+ MQTTConnectInfo_t connectInfo = { 0 };
+ uint32_t timeout = 2;
+ bool sessionPresent;
+ MQTTStatus_t status;
+ TransportInterface_t transport = { 0 };
+ MQTTFixedBuffer_t networkBuffer = { 0 };
+ size_t remainingLength;
+ size_t packetSize;
+
+ memset( &mqttContext, 0x0, sizeof( mqttContext ) );
+ MQTT_Init( &mqttContext, &transport, getTime, eventCallback, &networkBuffer );
+
+ /* set MQTT Connection status as connected*/
+ mqttContext.connectStatus = MQTTConnected;
+
+ MQTT_SerializeConnectFixedHeader_Stub( MQTT_SerializeConnectFixedHeader_cb );
+ MQTT_GetConnectPacketSize_ExpectAnyArgsAndReturn( MQTTSuccess );
+ MQTT_GetConnectPacketSize_IgnoreArg_pPacketSize();
+ MQTT_GetConnectPacketSize_IgnoreArg_pRemainingLength();
+ MQTT_GetConnectPacketSize_ReturnThruPtr_pPacketSize( &packetSize );
+ MQTT_GetConnectPacketSize_ReturnThruPtr_pRemainingLength( &remainingLength );
+
+ status = MQTT_Connect( &mqttContext, &connectInfo, NULL, timeout, &sessionPresent );
+
+ TEST_ASSERT_EQUAL_INT( MQTTStatusConnected, status );
+}
+
+/**
+ * @brief Test MQTT_Connect, when the status is MQTTDisconnectPending.
+ */
+void test_MQTT_Connect_sendConnect_disconnect_pending( void )
+{
+ MQTTContext_t mqttContext = { 0 };
+ MQTTConnectInfo_t connectInfo = { 0 };
+ uint32_t timeout = 2;
+ bool sessionPresent;
+ MQTTStatus_t status;
+ TransportInterface_t transport = { 0 };
+ MQTTFixedBuffer_t networkBuffer = { 0 };
+ size_t remainingLength;
+ size_t packetSize;
+
+ memset( &mqttContext, 0x0, sizeof( mqttContext ) );
+ MQTT_Init( &mqttContext, &transport, getTime, eventCallback, &networkBuffer );
+
+ /* set MQTT Connection status as connected*/
+ mqttContext.connectStatus = MQTTDisconnectPending;
+
+ MQTT_SerializeConnectFixedHeader_Stub( MQTT_SerializeConnectFixedHeader_cb );
+ MQTT_GetConnectPacketSize_ExpectAnyArgsAndReturn( MQTTSuccess );
+ MQTT_GetConnectPacketSize_IgnoreArg_pPacketSize();
+ MQTT_GetConnectPacketSize_IgnoreArg_pRemainingLength();
+ MQTT_GetConnectPacketSize_ReturnThruPtr_pPacketSize( &packetSize );
+ MQTT_GetConnectPacketSize_ReturnThruPtr_pRemainingLength( &remainingLength );
+
+ status = MQTT_Connect( &mqttContext, &connectInfo, NULL, timeout, &sessionPresent );
+
+ TEST_ASSERT_EQUAL_INT( MQTTStatusDisconnectPending, status );
+}
+
/**
* @brief Test MQTT_Connect, except for receiving the CONNACK.
*/
@@ -1312,6 +1423,7 @@ void test_MQTT_Connect_sendConnect2( void )
packetSize = 13;
remainingLength = 11;
mqttContext.transportInterface.send = transportSendFailure;
+ mqttContext.transportInterface.writev = NULL;
MQTT_SerializeConnectFixedHeader_Stub( MQTT_SerializeConnectFixedHeader_cb );
MQTT_GetConnectPacketSize_ExpectAnyArgsAndReturn( MQTTSuccess );
MQTT_GetConnectPacketSize_IgnoreArg_pPacketSize();
@@ -1891,12 +2003,32 @@ void test_MQTT_Connect_resendPendingAcks( void )
MQTT_SerializeAck_ExpectAnyArgsAndReturn( MQTTBadParameter );
MQTT_PubrelToResend_ExpectAnyArgsAndReturn( MQTT_PACKET_TYPE_INVALID );
status = MQTT_Connect( &mqttContext, &connectInfo, NULL, timeout, &sessionPresentResult );
+ TEST_ASSERT_EQUAL_INT( MQTTBadParameter, status );
+ TEST_ASSERT_EQUAL_INT( MQTTDisconnectPending, mqttContext.connectStatus );
+ TEST_ASSERT_TRUE( sessionPresentResult );
+
+ /* Test 3. One packet found in ack pending state, but Transport Send failed. */
+ sessionPresentResult = false;
+ mqttContext.connectStatus = MQTTNotConnected;
+ mqttContext.keepAliveIntervalSec = 0;
+ mqttContext.transportInterface.send = transportSendFailure;
+ MQTT_GetIncomingPacketTypeAndLength_ExpectAnyArgsAndReturn( MQTTSuccess );
+ MQTT_GetIncomingPacketTypeAndLength_ReturnThruPtr_pIncomingPacket( &incomingPacket );
+ MQTT_DeserializeAck_ExpectAnyArgsAndReturn( MQTTSuccess );
+ MQTT_DeserializeAck_ReturnThruPtr_pSessionPresent( &sessionPresent );
+ MQTT_PubrelToResend_ExpectAnyArgsAndReturn( packetIdentifier );
+ MQTT_PubrelToResend_ReturnThruPtr_pState( &pubRelState );
+ MQTT_SerializeAck_ExpectAnyArgsAndReturn( MQTTSuccess );
+ MQTT_PubrelToResend_ExpectAnyArgsAndReturn( MQTT_PACKET_TYPE_INVALID );
+ status = MQTT_Connect( &mqttContext, &connectInfo, NULL, timeout, &sessionPresentResult );
TEST_ASSERT_EQUAL_INT( MQTTSendFailed, status );
- TEST_ASSERT_EQUAL_INT( MQTTNotConnected, mqttContext.connectStatus );
+ TEST_ASSERT_EQUAL_INT( MQTTDisconnectPending, mqttContext.connectStatus );
TEST_ASSERT_TRUE( sessionPresentResult );
+ mqttContext.transportInterface.send = transportSendSuccess;
- /* Test 3. One packet found in ack pending state, Sent
+ /* Test 4. One packet found in ack pending state, Sent
* PUBREL successfully. */
+ mqttContext.connectStatus = MQTTNotConnected;
MQTT_GetIncomingPacketTypeAndLength_ExpectAnyArgsAndReturn( MQTTSuccess );
MQTT_GetIncomingPacketTypeAndLength_ReturnThruPtr_pIncomingPacket( &incomingPacket );
MQTT_DeserializeAck_ExpectAnyArgsAndReturn( MQTTSuccess );
@@ -1913,7 +2045,7 @@ void test_MQTT_Connect_resendPendingAcks( void )
TEST_ASSERT_EQUAL_INT( MQTTConnected, mqttContext.connectStatus );
TEST_ASSERT_EQUAL_INT( connectInfo.keepAliveSeconds, mqttContext.keepAliveIntervalSec );
- /* Test 4. Three packets found in ack pending state. Sent PUBREL successfully
+ /* Test 5. Three packets found in ack pending state. Sent PUBREL successfully
* for first and failed for second and no attempt for third. */
mqttContext.keepAliveIntervalSec = 0;
mqttContext.connectStatus = MQTTNotConnected;
@@ -1935,11 +2067,12 @@ void test_MQTT_Connect_resendPendingAcks( void )
/* Query for any remaining packets pending to ack. */
MQTT_PubrelToResend_ExpectAnyArgsAndReturn( packetIdentifier + 2 );
status = MQTT_Connect( &mqttContext, &connectInfo, NULL, timeout, &sessionPresent );
- TEST_ASSERT_EQUAL_INT( MQTTSendFailed, status );
- TEST_ASSERT_EQUAL_INT( MQTTNotConnected, mqttContext.connectStatus );
+ TEST_ASSERT_EQUAL_INT( MQTTBadParameter, status );
+ TEST_ASSERT_EQUAL_INT( MQTTDisconnectPending, mqttContext.connectStatus );
- /* Test 5. Two packets found in ack pending state. Sent PUBREL successfully
+ /* Test 6. Two packets found in ack pending state. Sent PUBREL successfully
* for first and failed for second. */
+ mqttContext.connectStatus = MQTTNotConnected;
MQTT_GetIncomingPacketTypeAndLength_ExpectAnyArgsAndReturn( MQTTSuccess );
MQTT_GetIncomingPacketTypeAndLength_ReturnThruPtr_pIncomingPacket( &incomingPacket );
MQTT_DeserializeAck_ExpectAnyArgsAndReturn( MQTTSuccess );
@@ -2429,6 +2562,7 @@ void test_MQTT_Publish_DuplicatePublish( void )
mqttContext.outgoingPublishRecordMaxCount = 10;
mqttContext.outgoingPublishRecords = outgoingPublishRecord;
+ mqttContext.connectStatus = MQTTConnected;
publishInfo.qos = MQTTQoS1;
publishInfo.dup = true;
@@ -2442,6 +2576,40 @@ void test_MQTT_Publish_DuplicatePublish( void )
TEST_ASSERT_EQUAL_INT( MQTTSuccess, status );
}
+/**
+ * @brief Test that MQTT_Publish works as intended when the connection status is anything but MQTTConnected.
+ */
+void test_MQTT_Publish_not_connected( void )
+{
+ MQTTContext_t mqttContext = { 0 };
+ MQTTPublishInfo_t publishInfo = { 0 };
+ TransportInterface_t transport = { 0 };
+ MQTTFixedBuffer_t networkBuffer = { 0 };
+ MQTTStatus_t status;
+
+ setupTransportInterface( &transport );
+ setupNetworkBuffer( &networkBuffer );
+ transport.send = transportSendFailure;
+
+ memset( &mqttContext, 0x0, sizeof( mqttContext ) );
+ memset( &publishInfo, 0x0, sizeof( publishInfo ) );
+ MQTT_Init( &mqttContext, &transport, getTime, eventCallback, &networkBuffer );
+
+ /* Test 1 connection status is MQTTNotConnected */
+ mqttContext.connectStatus = MQTTNotConnected;
+ MQTT_GetPublishPacketSize_ExpectAnyArgsAndReturn( MQTTSuccess );
+ MQTT_SerializePublishHeaderWithoutTopic_ExpectAnyArgsAndReturn( MQTTSuccess );
+ status = MQTT_Publish( &mqttContext, &publishInfo, 10 );
+ TEST_ASSERT_EQUAL_INT( MQTTStatusNotConnected, status );
+
+ /* Test 2 connection status is MQTTDisconnectPending */
+ mqttContext.connectStatus = MQTTDisconnectPending;
+ MQTT_GetPublishPacketSize_ExpectAnyArgsAndReturn( MQTTSuccess );
+ MQTT_SerializePublishHeaderWithoutTopic_ExpectAnyArgsAndReturn( MQTTSuccess );
+ status = MQTT_Publish( &mqttContext, &publishInfo, 10 );
+ TEST_ASSERT_EQUAL_INT( MQTTStatusDisconnectPending, status );
+}
+
/**
* @brief Test that MQTT_Publish works as intended.
*/
@@ -2470,6 +2638,7 @@ void test_MQTT_Publish_DuplicatePublish_UpdateFailed( void )
mqttContext.outgoingPublishRecordMaxCount = 10;
mqttContext.outgoingPublishRecords = outgoingPublishRecord;
+ mqttContext.connectStatus = MQTTConnected;
publishInfo.qos = MQTTQoS1;
publishInfo.dup = true;
@@ -2513,6 +2682,7 @@ void test_MQTT_Publish_WriteVSendsPartialBytes( void )
mqttContext.outgoingPublishRecordMaxCount = 10;
mqttContext.outgoingPublishRecords = outgoingPublishRecord;
+ mqttContext.connectStatus = MQTTConnected;
publishInfo.qos = MQTTQoS1;
publishInfo.dup = false;
@@ -2547,6 +2717,7 @@ void test_MQTT_Publish7( void )
MQTT_GetPublishPacketSize_ExpectAnyArgsAndReturn( MQTTSuccess );
MQTT_SerializePublishHeaderWithoutTopic_ExpectAnyArgsAndReturn( MQTTSuccess );
+ mqttContext.connectStatus = MQTTConnected;
/* We need sendPacket to be called with at least 1 byte to send, so that
* it can return failure. This argument is the output of serializing the
@@ -2576,6 +2747,8 @@ void test_MQTT_Publish8( void )
memset( &publishInfo, 0x0, sizeof( publishInfo ) );
MQTT_Init( &mqttContext, &transport, getTime, eventCallback, &networkBuffer );
+ mqttContext.connectStatus = MQTTConnected;
+
/* We want to test the first call to sendPacket within sendPublish succeeding,
* and the second one failing. */
mqttContext.transportInterface.send = transportSendSucceedThenFail;
@@ -2606,6 +2779,8 @@ void test_MQTT_Publish9( void )
memset( &publishInfo, 0x0, sizeof( publishInfo ) );
MQTT_Init( &mqttContext, &transport, getTime, eventCallback, &networkBuffer );
+ mqttContext.connectStatus = MQTTConnected;
+
MQTT_GetPublishPacketSize_ExpectAnyArgsAndReturn( MQTTSuccess );
MQTT_SerializePublishHeaderWithoutTopic_ExpectAnyArgsAndReturn( MQTTSuccess );
mqttContext.transportInterface.send = transportSendSuccess;
@@ -2632,6 +2807,8 @@ void test_MQTT_Publish10( void )
memset( &publishInfo, 0x0, sizeof( publishInfo ) );
MQTT_Init( &mqttContext, &transport, getTime, eventCallback, &networkBuffer );
+ mqttContext.connectStatus = MQTTConnected;
+
/* Test that sending a publish without a payload succeeds. */
publishInfo.pPayload = NULL;
publishInfo.payloadLength = 0;
@@ -2661,6 +2838,8 @@ void test_MQTT_Publish11( void )
memset( &publishInfo, 0x0, sizeof( publishInfo ) );
MQTT_Init( &mqttContext, &transport, getTime, eventCallback, &networkBuffer );
+ mqttContext.connectStatus = MQTTConnected;
+
/* Restore the test payload and length. */
publishInfo.pPayload = "Test";
publishInfo.payloadLength = 4;
@@ -2710,6 +2889,8 @@ void test_MQTT_Publish12( void )
mqttContext.outgoingPublishRecords->qos = MQTTQoS2;
+ mqttContext.connectStatus = MQTTConnected;
+
expectedState = MQTTPublishSend;
MQTT_GetPublishPacketSize_ExpectAnyArgsAndReturn( MQTTSuccess );
@@ -2746,6 +2927,8 @@ void test_MQTT_Publish13( void )
memset( &publishInfo, 0x0, sizeof( publishInfo ) );
MQTT_Init( &mqttContext, &transport, getTime, eventCallback, &networkBuffer );
+ mqttContext.connectStatus = MQTTConnected;
+
MQTT_InitStatefulQoS( &mqttContext,
&outgoingRecords, 4,
&incomingRecords, 4 );
@@ -2781,6 +2964,8 @@ void test_MQTT_Publish14( void )
memset( &publishInfo, 0x0, sizeof( publishInfo ) );
MQTT_Init( &mqttContext, &transport, getTime, eventCallback, &networkBuffer );
+ mqttContext.connectStatus = MQTTConnected;
+
/* Duplicate publish. dup flag is marked by application. */
publishInfo.dup = true;
@@ -2811,6 +2996,8 @@ void test_MQTT_Publish15( void )
memset( &publishInfo, 0x0, sizeof( publishInfo ) );
MQTT_Init( &mqttContext, &transport, getTime, eventCallback, &networkBuffer );
+ mqttContext.connectStatus = MQTTConnected;
+
/* Duplicate publish. dup flag is marked by application.
* State record is not present. */
publishInfo.dup = true;
@@ -2846,6 +3033,8 @@ void test_MQTT_Publish_Send_Timeout( void )
/* Initialize the MQTT context. */
MQTT_Init( &mqttContext, &transport, getTime, eventCallback, &networkBuffer );
+ mqttContext.connectStatus = MQTTConnected;
+
/* Setup for making sure that the test results in calling sendPacket function
* where calls to transport send function are made (repeatedly to send packet
* over the network).*/
@@ -2863,6 +3052,32 @@ void test_MQTT_Publish_Send_Timeout( void )
/* ========================================================================== */
+/**
+ * @brief Test that MQTT_Disconnect works as intended when the connection is already disconnected.
+ */
+void test_MQTT_Disconnect_already_disconnected( void )
+{
+ MQTTContext_t mqttContext = { 0 };
+ MQTTStatus_t status;
+ TransportInterface_t transport = { 0 };
+ MQTTFixedBuffer_t networkBuffer = { 0 };
+ size_t disconnectSize = 2;
+
+ setupTransportInterface( &transport );
+ setupNetworkBuffer( &networkBuffer );
+
+ memset( &mqttContext, 0x0, sizeof( mqttContext ) );
+ MQTT_Init( &mqttContext, &transport, getTime, eventCallback, &networkBuffer );
+ mqttContext.connectStatus = MQTTNotConnected;
+
+ /* Send failure with network error. */
+ MQTT_GetDisconnectPacketSize_ExpectAnyArgsAndReturn( MQTTSuccess );
+ MQTT_GetDisconnectPacketSize_ReturnThruPtr_pPacketSize( &disconnectSize );
+ MQTT_SerializeDisconnect_ExpectAnyArgsAndReturn( MQTTSuccess );
+ status = MQTT_Disconnect( &mqttContext );
+ TEST_ASSERT_EQUAL( MQTTStatusNotConnected, status );
+}
+
/**
* @brief Test that MQTT_Disconnect works as intended.
*/
@@ -2923,6 +3138,7 @@ void test_MQTT_Disconnect2( void )
MQTT_SerializeDisconnect_ExpectAnyArgsAndReturn( MQTTSuccess );
status = MQTT_Disconnect( &mqttContext );
TEST_ASSERT_EQUAL( MQTTSendFailed, status );
+ TEST_ASSERT_EQUAL( MQTTNotConnected, mqttContext.connectStatus );
}
/**
@@ -3015,6 +3231,52 @@ void test_MQTT_Disconnect4( void )
/* At disconnect, the buffer is cleared of any pending packets. */
TEST_ASSERT_EACH_EQUAL_UINT8( 0, mqttBuffer, MQTT_TEST_BUFFER_LENGTH );
}
+
+/**
+ * @brief Test that MQTT_Disconnect works as intended when status is disconnect pending.
+ */
+void test_MQTT_Disconnect4_status_disconnect_pending( void )
+{
+ MQTTContext_t mqttContext = { 0 };
+ MQTTStatus_t status;
+ uint8_t buffer[ 10 ];
+ uint8_t * bufPtr = buffer;
+ NetworkContext_t networkContext = { 0 };
+ TransportInterface_t transport = { 0 };
+ MQTTFixedBuffer_t networkBuffer = { 0 };
+ size_t disconnectSize = 2;
+
+ /* Fill the buffer with garbage data. */
+ memset( mqttBuffer, 0xAB, MQTT_TEST_BUFFER_LENGTH );
+
+ setupTransportInterface( &transport );
+ setupNetworkBuffer( &networkBuffer );
+ networkContext.buffer = &bufPtr;
+ transport.pNetworkContext = &networkContext;
+ transport.recv = transportRecvSuccess;
+ transport.send = transportSendFailure;
+ transport.writev = NULL;
+
+ memset( &mqttContext, 0x0, sizeof( mqttContext ) );
+ MQTT_Init( &mqttContext, &transport, getTime, eventCallback, &networkBuffer );
+ mqttContext.connectStatus = MQTTDisconnectPending;
+
+ /* Successful send. */
+ mqttContext.transportInterface.send = mockSend;
+ MQTT_GetDisconnectPacketSize_ExpectAnyArgsAndReturn( MQTTSuccess );
+ MQTT_GetDisconnectPacketSize_ReturnThruPtr_pPacketSize( &disconnectSize );
+ MQTT_SerializeDisconnect_ExpectAnyArgsAndReturn( MQTTSuccess );
+ MQTT_SerializeDisconnect_Stub( MQTT_SerializeDisconnect_stub );
+ /* Write a disconnect packet into the buffer. */
+ mqttBuffer[ 0 ] = MQTT_PACKET_TYPE_DISCONNECT;
+
+ status = MQTT_Disconnect( &mqttContext );
+
+ TEST_ASSERT_EQUAL( MQTTSuccess, status );
+ TEST_ASSERT_EQUAL( MQTTNotConnected, mqttContext.connectStatus );
+ /* At disconnect, the buffer is cleared of any pending packets. */
+ TEST_ASSERT_EACH_EQUAL_UINT8( 0, mqttBuffer, MQTT_TEST_BUFFER_LENGTH );
+}
/* ========================================================================== */
/**
@@ -3222,6 +3484,8 @@ void test_MQTT_ProcessLoop_HandleKeepAlive1( void )
mqttStatus = MQTT_Init( &context, &transport, getTime, eventCallback, &networkBuffer );
+ context.connectStatus = MQTTConnected;
+
/* Verify MQTTSuccess is returned. */
MQTT_GetPingreqPacketSize_ExpectAnyArgsAndReturn( MQTTSuccess );
MQTT_GetPingreqPacketSize_ReturnThruPtr_pPacketSize( &pingreqSize );
@@ -3273,6 +3537,25 @@ void test_MQTT_ProcessLoop_RecvFailed( void )
transport.recv = transportRecvFailure;
setupNetworkBuffer( &networkBuffer );
+ mqttStatus = MQTT_Init( &context, &transport, getTime, eventCallback, &networkBuffer );
+ context.connectStatus = MQTTConnected;
+
+ mqttStatus = MQTT_ProcessLoop( &context );
+
+ TEST_ASSERT_EQUAL( MQTTRecvFailed, mqttStatus );
+}
+
+void test_MQTT_ProcessLoop_RecvFailed_disconnected( void )
+{
+ MQTTContext_t context = { 0 };
+ TransportInterface_t transport = { 0 };
+ MQTTFixedBuffer_t networkBuffer = { 0 };
+ MQTTStatus_t mqttStatus;
+
+ setupTransportInterface( &transport );
+ transport.recv = transportRecvFailure;
+ setupNetworkBuffer( &networkBuffer );
+
mqttStatus = MQTT_Init( &context, &transport, getTime, eventCallback, &networkBuffer );
mqttStatus = MQTT_ProcessLoop( &context );
@@ -3325,6 +3608,8 @@ void test_MQTT_ProcessLoop_discardPacket_second_recv_fail( void )
mqttStatus = MQTT_Init( &context, &transport, getTime, eventCallback, &networkBuffer );
TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus );
+ context.connectStatus = MQTTConnected;
+
context.networkBuffer.size = 20;
incomingPacket.type = currentPacketType;
@@ -3362,6 +3647,8 @@ void test_MQTT_ProcessLoop_handleIncomingPublish_Happy_Path1( void )
mqttStatus = MQTT_InitStatefulQoS( &context, NULL, 0, pIncomingCallback, 10 );
TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus );
+ context.connectStatus = MQTTConnected;
+
modifyIncomingPacketStatus = MQTTSuccess;
/* Assume QoS = 1 so that a PUBACK will be sent after receiving PUBLISH.
@@ -3399,6 +3686,8 @@ void test_MQTT_ProcessLoop_handleIncomingPublish_Happy_Path2( void )
mqttStatus = MQTT_InitStatefulQoS( &context, NULL, 0, pIncomingPublish, 10 );
TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus );
+ context.connectStatus = MQTTConnected;
+
modifyIncomingPacketStatus = MQTTSuccess;
/* Assume QoS = 2 so that a PUBREC will be sent after receiving PUBLISH.
@@ -3443,6 +3732,8 @@ void test_MQTT_ProcessLoop_handleIncomingPublish_Happy_Path3( void )
&outgoingRecords, 4,
&incomingRecords, 4 );
+ context.connectStatus = MQTTConnected;
+
modifyIncomingPacketStatus = MQTTSuccess;
/* Duplicate QoS1 publish received.
@@ -3489,6 +3780,8 @@ void test_MQTT_ProcessLoop_handleIncomingPublish_Happy_Path4( void )
mqttStatus = MQTT_InitStatefulQoS( &context, NULL, 0, pIncomingPublish, 10 );
TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus );
+ context.connectStatus = MQTTConnected;
+
modifyIncomingPacketStatus = MQTTSuccess;
/* Duplicate QoS2 publish received.
@@ -3530,6 +3823,8 @@ void test_MQTT_ProcessLoop_handleIncomingPublish_Happy_Path5( void )
mqttStatus = MQTT_InitStatefulQoS( &context, NULL, 0, pIncomingPublish, 10 );
TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus );
+ context.connectStatus = MQTTConnected;
+
modifyIncomingPacketStatus = MQTTSuccess;
/* A publish is received when already a state record exists, but dup
@@ -3568,6 +3863,8 @@ void test_MQTT_ProcessLoop_handleIncomingPublish_Happy_Path6( void )
mqttStatus = MQTT_InitStatefulQoS( &context, NULL, 0, pIncomingPublish, 10 );
TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus );
+ context.connectStatus = MQTTConnected;
+
modifyIncomingPacketStatus = MQTTSuccess;
/* Duplicate QoS2 publish received with no collision.
@@ -3640,6 +3937,7 @@ void test_MQTT_ProcessLoop_handleIncomingPublish_Error_Paths( void )
expectParams.pPubInfo = &publishInfo;
/* The other loop parameter fields are irrelevant. */
expectProcessLoopCalls( &context, &expectParams );
+
TEST_ASSERT_FALSE( isEventCallbackInvoked );
}
@@ -3711,6 +4009,8 @@ void test_MQTT_ProcessLoop_handleIncomingAck_Happy_Paths( void )
mqttStatus = MQTT_Init( &context, &transport, getTime, eventCallback, &networkBuffer );
TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus );
+ context.connectStatus = MQTTConnected;
+
modifyIncomingPacketStatus = MQTTSuccess;
/* Mock the receiving of a PUBACK packet type and expect the appropriate
@@ -3809,6 +4109,8 @@ void test_MQTT_ProcessLoop_handleIncomingAck_Error_Paths( void )
mqttStatus = MQTT_Init( &context, &transport, getTime, eventCallback, &networkBuffer );
TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus );
+ context.connectStatus = MQTTConnected;
+
modifyIncomingPacketStatus = MQTTSuccess;
/* Verify that MQTTBadResponse is propagated when deserialization fails upon
@@ -3828,8 +4130,34 @@ void test_MQTT_ProcessLoop_handleIncomingAck_Error_Paths( void )
expectParams.stateAfterDeserialize = MQTTPubRelSend;
expectParams.serializeStatus = MQTTNoMemory;
expectParams.stateAfterSerialize = MQTTStateNull;
- expectParams.processLoopStatus = MQTTSendFailed;
+ expectParams.processLoopStatus = MQTTNoMemory;
+ expectProcessLoopCalls( &context, &expectParams );
+
+ /* Verify that MQTTStatusNotConnected propagated when receiving a any ACK,
+ * here PUBREC but thr connection status is MQTTNotConnected. */
+ currentPacketType = MQTT_PACKET_TYPE_PUBREC;
+ /* Set expected return values in the loop. */
+ resetProcessLoopParams( &expectParams );
+ expectParams.stateAfterDeserialize = MQTTPubRelSend;
+ expectParams.stateAfterSerialize = MQTTPubCompPending;
+ expectParams.serializeStatus = MQTTSuccess;
+ expectParams.processLoopStatus = MQTTStatusNotConnected;
+ context.connectStatus = MQTTNotConnected;
+ expectProcessLoopCalls( &context, &expectParams );
+ context.connectStatus = MQTTConnected;
+
+ /* Verify that MQTTStatusNotConnected propagated when receiving a any ACK,
+ * here PUBREC but thr connection status is MQTTNotConnected. */
+ currentPacketType = MQTT_PACKET_TYPE_PUBREC;
+ /* Set expected return values in the loop. */
+ resetProcessLoopParams( &expectParams );
+ expectParams.stateAfterDeserialize = MQTTPubRelSend;
+ expectParams.stateAfterSerialize = MQTTPubCompPending;
+ expectParams.serializeStatus = MQTTSuccess;
+ expectParams.processLoopStatus = MQTTStatusDisconnectPending;
+ context.connectStatus = MQTTDisconnectPending;
expectProcessLoopCalls( &context, &expectParams );
+ context.connectStatus = MQTTConnected;
/* Verify that MQTTBadResponse is propagated when deserialization fails upon
* receiving a PUBACK. */
@@ -3898,6 +4226,8 @@ void test_MQTT_ProcessLoop_handleKeepAlive_Happy_Paths1( void )
mqttStatus = MQTT_Init( &context, &transport, getTime, eventCallback, &networkBuffer );
TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus );
+ context.connectStatus = MQTTConnected;
+
context.waitingForPingResp = false;
context.keepAliveIntervalSec = 0;
expectParams.incomingPublish = false;
@@ -4183,6 +4513,8 @@ void test_MQTT_ProcessLoop_handleKeepAlive_Error_Paths3( void )
mqttStatus = MQTT_Init( &context, &transport, getTime, eventCallback, &networkBuffer );
TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus );
+ context.connectStatus = MQTTConnected;
+
globalEntryTime = PACKET_RX_TIMEOUT_MS + 1;
context.keepAliveIntervalSec = 0;
context.lastPacketTxTime = 0;
@@ -4224,6 +4556,8 @@ void test_MQTT_ProcessLoop_handleKeepAlive_Error_Paths4( void )
mqttStatus = MQTT_Init( &context, &transport, getTime, eventCallback, &networkBuffer );
TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus );
+ context.connectStatus = MQTTConnected;
+
globalEntryTime = PACKET_RX_TIMEOUT_MS + 1;
context.keepAliveIntervalSec = ( PACKET_TX_TIMEOUT_MS / 1000 ) + 1U;
context.lastPacketTxTime = 0;
@@ -4368,6 +4702,8 @@ void test_MQTT_ProcessLoop_Timer_Overflow( void )
mqttStatus = MQTT_InitStatefulQoS( &context, NULL, 0, incomingPublishRecords, 10 );
TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus );
+ context.connectStatus = MQTTConnected;
+
MQTT_ProcessIncomingPacketTypeAndLength_ExpectAnyArgsAndReturn( MQTTSuccess );
MQTT_ProcessIncomingPacketTypeAndLength_ReturnThruPtr_pIncomingPacket( &incomingPacket );
/* Assume QoS = 1 so that a PUBACK will be sent after receiving PUBLISH. */
@@ -4571,6 +4907,8 @@ void test_MQTT_Subscribe_happy_path( void )
&incomingRecords, 4 );
TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus );
+ context.connectStatus = MQTTConnected;
+
/* Verify MQTTSuccess is returned with the following mocks. */
MQTT_GetSubscribePacketSize_ExpectAnyArgsAndReturn( MQTTSuccess );
MQTT_GetSubscribePacketSize_ReturnThruPtr_pPacketSize( &packetSize );
@@ -4583,6 +4921,58 @@ void test_MQTT_Subscribe_happy_path( void )
TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus );
}
+/**
+ * @brief This test case verifies that MQTT_Subscribe does not return success if the connect status
+ * is anythin but MQTTConnected.
+ */
+void test_MQTT_Subscribe_happy_path_not_connected( void )
+{
+ MQTTStatus_t mqttStatus;
+ MQTTContext_t context = { 0 };
+ TransportInterface_t transport = { 0 };
+ MQTTFixedBuffer_t networkBuffer = { 0 };
+ MQTTSubscribeInfo_t subscribeInfo = { 0 };
+ size_t remainingLength = MQTT_SAMPLE_REMAINING_LENGTH;
+ size_t packetSize = MQTT_SAMPLE_REMAINING_LENGTH;
+ MQTTPubAckInfo_t incomingRecords = { 0 };
+ MQTTPubAckInfo_t outgoingRecords = { 0 };
+
+ setupTransportInterface( &transport );
+ setupNetworkBuffer( &networkBuffer );
+ setupSubscriptionInfo( &subscribeInfo );
+
+ /* Initialize context. */
+ mqttStatus = MQTT_Init( &context, &transport, getTime, eventCallback, &networkBuffer );
+ TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus );
+
+ mqttStatus = MQTT_InitStatefulQoS( &context,
+ &outgoingRecords, 4,
+ &incomingRecords, 4 );
+ TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus );
+
+ /* Test 1 connect status is MQTTNotConnected */
+ context.connectStatus = MQTTNotConnected;
+ /* Verify MQTTSuccess is returned with the following mocks. */
+ MQTT_GetSubscribePacketSize_ExpectAnyArgsAndReturn( MQTTSuccess );
+ MQTT_GetSubscribePacketSize_ReturnThruPtr_pPacketSize( &packetSize );
+ MQTT_GetSubscribePacketSize_ReturnThruPtr_pRemainingLength( &remainingLength );
+ MQTT_SerializeSubscribeHeader_Stub( MQTT_SerializeSubscribedHeader_cb );
+ /* Expect the above calls when running MQTT_Subscribe. */
+ mqttStatus = MQTT_Subscribe( &context, &subscribeInfo, 1, MQTT_FIRST_VALID_PACKET_ID );
+ TEST_ASSERT_EQUAL( MQTTStatusNotConnected, mqttStatus );
+
+ /* Test 2 connect status is MQTTDisconnectPending*/
+ context.connectStatus = MQTTDisconnectPending;
+ /* Verify MQTTSuccess is returned with the following mocks. */
+ MQTT_GetSubscribePacketSize_ExpectAnyArgsAndReturn( MQTTSuccess );
+ MQTT_GetSubscribePacketSize_ReturnThruPtr_pPacketSize( &packetSize );
+ MQTT_GetSubscribePacketSize_ReturnThruPtr_pRemainingLength( &remainingLength );
+ MQTT_SerializeSubscribeHeader_Stub( MQTT_SerializeSubscribedHeader_cb );
+ /* Expect the above calls when running MQTT_Subscribe. */
+ mqttStatus = MQTT_Subscribe( &context, &subscribeInfo, 1, MQTT_FIRST_VALID_PACKET_ID );
+ TEST_ASSERT_EQUAL( MQTTStatusDisconnectPending, mqttStatus );
+}
+
/**
* @brief This test case verifies that MQTT_Subscribe returns successfully
* when valid parameters are passed and all bytes are sent.
@@ -4612,6 +5002,8 @@ void test_MQTT_Subscribe_happy_path1( void )
&incomingRecords, 4 );
TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus );
+ context.connectStatus = MQTTConnected;
+
/* Verify MQTTSuccess is returned with the following mocks. */
MQTT_GetSubscribePacketSize_ExpectAnyArgsAndReturn( MQTTSuccess );
MQTT_GetSubscribePacketSize_ReturnThruPtr_pPacketSize( &packetSize );
@@ -4654,6 +5046,8 @@ void test_MQTT_Subscribe_happy_path2( void )
&incomingRecords, 4 );
TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus );
+ context.connectStatus = MQTTConnected;
+
/* Verify MQTTSuccess is returned with the following mocks. */
MQTT_GetSubscribePacketSize_ExpectAnyArgsAndReturn( MQTTSuccess );
MQTT_GetSubscribePacketSize_ReturnThruPtr_pPacketSize( &packetSize );
@@ -4711,6 +5105,8 @@ void test_MQTT_Subscribe_MultipleSubscriptions( void )
&incomingRecords, 4 );
TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus );
+ context.connectStatus = MQTTConnected;
+
/* Verify MQTTSuccess is returned with the following mocks. */
MQTT_GetSubscribePacketSize_ExpectAnyArgsAndReturn( MQTTSuccess );
MQTT_GetSubscribePacketSize_ReturnThruPtr_pPacketSize( &packetSize );
@@ -4748,6 +5144,9 @@ void test_MQTT_Subscribe_error_paths1( void )
/* Initialize context. */
mqttStatus = MQTT_Init( &context, &transport, getTime, eventCallback, &networkBuffer );
TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus );
+
+ context.connectStatus = MQTTConnected;
+
/* Verify MQTTSendFailed is propagated when transport interface returns an error. */
MQTT_GetSubscribePacketSize_ExpectAnyArgsAndReturn( MQTTSuccess );
MQTT_GetSubscribePacketSize_ReturnThruPtr_pPacketSize( &packetSize );
@@ -4785,6 +5184,8 @@ void test_MQTT_Subscribe_error_paths2( void )
mqttStatus = MQTT_Init( &context, &transport, getTime, eventCallback, &networkBuffer );
TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus );
+ context.connectStatus = MQTTConnected;
+
MQTT_GetSubscribePacketSize_ExpectAnyArgsAndReturn( MQTTSuccess );
MQTT_GetSubscribePacketSize_ReturnThruPtr_pPacketSize( &packetSize );
MQTT_GetSubscribePacketSize_ReturnThruPtr_pRemainingLength( &remainingLength );
@@ -4793,6 +5194,44 @@ void test_MQTT_Subscribe_error_paths2( void )
TEST_ASSERT_EQUAL( MQTTSendFailed, mqttStatus );
}
+/**
+ * @brief This test case verifies that MQTT_Subscribe returns MQTTSendFailed
+ * if transport interface fails to send and the connection status is converted to
+ * MQTTDisconnectPending
+ */
+void test_MQTT_Subscribe_error_paths_with_transport_failure( void )
+{
+ MQTTStatus_t mqttStatus;
+ MQTTContext_t context = { 0 };
+ TransportInterface_t transport = { 0 };
+ MQTTFixedBuffer_t networkBuffer = { 0 };
+ MQTTSubscribeInfo_t subscribeInfo = { 0 };
+ size_t remainingLength = MQTT_SAMPLE_REMAINING_LENGTH;
+ size_t packetSize = MQTT_SAMPLE_REMAINING_LENGTH;
+
+ /* Verify that an error is propagated when transport interface returns an error. */
+ setupNetworkBuffer( &networkBuffer );
+ setupSubscriptionInfo( &subscribeInfo );
+ subscribeInfo.qos = MQTTQoS0;
+ setupTransportInterface( &transport );
+ transport.writev = NULL;
+ transport.send = transportSendFailure;
+
+ /* Initialize context. */
+ mqttStatus = MQTT_Init( &context, &transport, getTime, eventCallback, &networkBuffer );
+ TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus );
+
+ context.connectStatus = MQTTConnected;
+
+ MQTT_GetSubscribePacketSize_ExpectAnyArgsAndReturn( MQTTSuccess );
+ MQTT_GetSubscribePacketSize_ReturnThruPtr_pPacketSize( &packetSize );
+ MQTT_GetSubscribePacketSize_ReturnThruPtr_pRemainingLength( &remainingLength );
+ MQTT_SerializeSubscribeHeader_Stub( MQTT_SerializeSubscribedHeader_cb );
+ mqttStatus = MQTT_Subscribe( &context, &subscribeInfo, 1, MQTT_FIRST_VALID_PACKET_ID );
+ TEST_ASSERT_EQUAL( MQTTSendFailed, mqttStatus );
+ TEST_ASSERT_EQUAL( MQTTDisconnectPending, context.connectStatus );
+}
+
/**
* @brief This test case verifies that MQTT_Subscribe returns MQTTSendFailed
* if transport interface send fails and timer overflows.
@@ -4827,6 +5266,8 @@ void test_MQTT_Subscribe_error_paths_timerOverflowCheck( void )
mqttStatus = MQTT_Init( &context, &transport, getTimeMock, eventCallback, &networkBuffer );
TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus );
+ context.connectStatus = MQTTConnected;
+
MQTT_GetSubscribePacketSize_ExpectAnyArgsAndReturn( MQTTSuccess );
MQTT_GetSubscribePacketSize_ReturnThruPtr_pPacketSize( &packetSize );
MQTT_GetSubscribePacketSize_ReturnThruPtr_pRemainingLength( &remainingLength );
@@ -4870,6 +5311,8 @@ void test_MQTT_Subscribe_error_paths_timerOverflowCheck1( void )
mqttStatus = MQTT_Init( &context, &transport, getTimeMockBigTimeStep, eventCallback, &networkBuffer );
TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus );
+ context.connectStatus = MQTTConnected;
+
MQTT_GetSubscribePacketSize_ExpectAnyArgsAndReturn( MQTTSuccess );
MQTT_GetSubscribePacketSize_ReturnThruPtr_pPacketSize( &packetSize );
MQTT_GetSubscribePacketSize_ReturnThruPtr_pRemainingLength( &remainingLength );
@@ -4944,6 +5387,9 @@ void test_MQTT_Unsubscribe_happy_path( void )
/* Initialize context. */
mqttStatus = MQTT_Init( &context, &transport, getTime, eventCallback, &networkBuffer );
TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus );
+
+ context.connectStatus = MQTTConnected;
+
/* Verify MQTTSuccess is returned with the following mocks. */
MQTT_SerializeUnsubscribeHeader_Stub( MQTT_SerializeUnsubscribeHeader_cb );
MQTT_GetUnsubscribePacketSize_ExpectAnyArgsAndReturn( MQTTSuccess );
@@ -4955,6 +5401,50 @@ void test_MQTT_Unsubscribe_happy_path( void )
TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus );
}
+/**
+ * @brief This test case verifies that MQTT_Unsubscribe does not return success
+ * when the connection status is anything but MQTTConnected
+ */
+void test_MQTT_Unsubscribe_not_connected( void )
+{
+ MQTTStatus_t mqttStatus;
+ MQTTContext_t context = { 0 };
+ TransportInterface_t transport = { 0 };
+ MQTTFixedBuffer_t networkBuffer = { 0 };
+ MQTTSubscribeInfo_t subscribeInfo = { 0 };
+ size_t remainingLength = MQTT_SAMPLE_REMAINING_LENGTH;
+ size_t packetSize = MQTT_SAMPLE_REMAINING_LENGTH;
+
+ setupTransportInterface( &transport );
+ setupNetworkBuffer( &networkBuffer );
+ setupSubscriptionInfo( &subscribeInfo );
+ subscribeInfo.qos = MQTTQoS0;
+
+ /* Initialize context. */
+ mqttStatus = MQTT_Init( &context, &transport, getTime, eventCallback, &networkBuffer );
+ TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus );
+
+ /* Test 1 Connection status is MQTTNotConnected*/
+ context.connectStatus = MQTTNotConnected;
+ /* Verify MQTTSuccess is returned with the following mocks. */
+ MQTT_GetUnsubscribePacketSize_ExpectAnyArgsAndReturn( MQTTSuccess );
+ MQTT_GetUnsubscribePacketSize_ReturnThruPtr_pPacketSize( &packetSize );
+ MQTT_GetUnsubscribePacketSize_ReturnThruPtr_pRemainingLength( &remainingLength );
+ /* Expect the above calls when running MQTT_Unsubscribe. */
+ mqttStatus = MQTT_Unsubscribe( &context, &subscribeInfo, 1, MQTT_FIRST_VALID_PACKET_ID );
+ TEST_ASSERT_EQUAL( MQTTStatusNotConnected, mqttStatus );
+
+ /* Test 2 Connection status is MQTTDisconnectPending*/
+ context.connectStatus = MQTTDisconnectPending;
+ /* Verify MQTTSuccess is returned with the following mocks. */
+ MQTT_GetUnsubscribePacketSize_ExpectAnyArgsAndReturn( MQTTSuccess );
+ MQTT_GetUnsubscribePacketSize_ReturnThruPtr_pPacketSize( &packetSize );
+ MQTT_GetUnsubscribePacketSize_ReturnThruPtr_pRemainingLength( &remainingLength );
+ /* Expect the above calls when running MQTT_Unsubscribe. */
+ mqttStatus = MQTT_Unsubscribe( &context, &subscribeInfo, 1, MQTT_FIRST_VALID_PACKET_ID );
+ TEST_ASSERT_EQUAL( MQTTStatusDisconnectPending, mqttStatus );
+}
+
/**
* @brief This test case verifies that MQTT_Subscribe returns successfully
* when valid parameters are passed and all bytes are sent.
@@ -4984,6 +5474,8 @@ void test_MQTT_Unsubscribe_happy_path1( void )
&incomingRecords, 4 );
TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus );
+ context.connectStatus = MQTTConnected;
+
/* Verify MQTTSuccess is returned with the following mocks. */
MQTT_GetUnsubscribePacketSize_ExpectAnyArgsAndReturn( MQTTSuccess );
MQTT_GetUnsubscribePacketSize_ReturnThruPtr_pPacketSize( &packetSize );
@@ -5027,6 +5519,8 @@ void test_MQTT_unsubscribe_happy_path2( void )
&incomingRecords, 4 );
TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus );
+ context.connectStatus = MQTTConnected;
+
/* Verify MQTTSuccess is returned with the following mocks. */
MQTT_GetUnsubscribePacketSize_ExpectAnyArgsAndReturn( MQTTSuccess );
MQTT_GetUnsubscribePacketSize_ReturnThruPtr_pPacketSize( &packetSize );
@@ -5084,6 +5578,8 @@ void test_MQTT_Unsubscribe_MultipleSubscriptions( void )
&incomingRecords, 4 );
TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus );
+ context.connectStatus = MQTTConnected;
+
/* Verify MQTTSuccess is returned with the following mocks. */
MQTT_GetUnsubscribePacketSize_ExpectAnyArgsAndReturn( MQTTSuccess );
MQTT_GetUnsubscribePacketSize_ReturnThruPtr_pPacketSize( &packetSize );
@@ -5123,6 +5619,9 @@ void test_MQTT_Unsubscribe_error_path1( void )
/* Initialize context. */
mqttStatus = MQTT_Init( &context, &transport, getTime, eventCallback, &networkBuffer );
TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus );
+
+ context.connectStatus = MQTTConnected;
+
MQTT_SerializeUnsubscribeHeader_Stub( MQTT_SerializeUnsubscribeHeader_cb );
/* Verify MQTTSendFailed is propagated when transport interface returns an error. */
MQTT_GetUnsubscribePacketSize_ExpectAnyArgsAndReturn( MQTTSuccess );
@@ -5163,6 +5662,8 @@ void test_MQTT_Unsubscribe_error_path2( void )
mqttStatus = MQTT_Init( &context, &transport, getTime, eventCallback, &networkBuffer );
TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus );
+ context.connectStatus = MQTTConnected;
+
MQTT_SerializeUnsubscribeHeader_Stub( MQTT_SerializeUnsubscribeHeader_cb );
transport.send = transportSendNoBytes; /* Use the mock function that returns zero bytes sent. */
MQTT_GetUnsubscribePacketSize_ExpectAnyArgsAndReturn( MQTTSuccess );
@@ -5206,6 +5707,9 @@ void test_MQTT_Ping_happy_path( void )
/* Initialize context. */
mqttStatus = MQTT_Init( &context, &transport, getTime, eventCallback, &networkBuffer );
TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus );
+
+ context.connectStatus = MQTTConnected;
+
/* Verify MQTTSuccess is returned. */
MQTT_GetPingreqPacketSize_ExpectAnyArgsAndReturn( MQTTSuccess );
MQTT_GetPingreqPacketSize_ReturnThruPtr_pPacketSize( &pingreqSize );
@@ -5218,6 +5722,46 @@ void test_MQTT_Ping_happy_path( void )
TEST_ASSERT_TRUE( context.waitingForPingResp );
}
+/**
+ * @brief This test case verifies that MQTT_Ping does not returns success
+ * if the connection status is anything but MQTTConnect.
+ */
+void test_MQTT_Ping_not_connected( void )
+{
+ MQTTStatus_t mqttStatus;
+ MQTTContext_t context = { 0 };
+ TransportInterface_t transport = { 0 };
+ MQTTFixedBuffer_t networkBuffer = { 0 };
+ size_t pingreqSize = MQTT_PACKET_PINGREQ_SIZE;
+
+ setupTransportInterface( &transport );
+ setupNetworkBuffer( &networkBuffer );
+
+ /* Initialize context. */
+ mqttStatus = MQTT_Init( &context, &transport, getTime, eventCallback, &networkBuffer );
+ TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus );
+
+ /* Test 1 when the connection status is MQTTNotConnected*/
+ context.connectStatus = MQTTNotConnected;
+ /* Verify MQTTSuccess is returned. */
+ MQTT_GetPingreqPacketSize_ExpectAnyArgsAndReturn( MQTTSuccess );
+ MQTT_GetPingreqPacketSize_ReturnThruPtr_pPacketSize( &pingreqSize );
+ MQTT_SerializePingreq_ExpectAnyArgsAndReturn( MQTTSuccess );
+ /* Expect the above calls when running MQTT_Ping. */
+ mqttStatus = MQTT_Ping( &context );
+ TEST_ASSERT_EQUAL( MQTTStatusNotConnected, mqttStatus );
+
+ /* Test 2 when the connection status is MQTTDisconnectPending*/
+ context.connectStatus = MQTTDisconnectPending;
+ /* Verify MQTTSuccess is returned. */
+ MQTT_GetPingreqPacketSize_ExpectAnyArgsAndReturn( MQTTSuccess );
+ MQTT_GetPingreqPacketSize_ReturnThruPtr_pPacketSize( &pingreqSize );
+ MQTT_SerializePingreq_ExpectAnyArgsAndReturn( MQTTSuccess );
+ /* Expect the above calls when running MQTT_Ping. */
+ mqttStatus = MQTT_Ping( &context );
+ TEST_ASSERT_EQUAL( MQTTStatusDisconnectPending, mqttStatus );
+}
+
/**
* @brief This test case verifies that MQTT_Ping returns MQTTSendFailed
* if transport interface send returns an error.
@@ -5241,6 +5785,9 @@ void test_MQTT_Ping_error_path( void )
/* Initialize context. */
mqttStatus = MQTT_Init( &context, &transport, getTime, eventCallback, &networkBuffer );
TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus );
+
+ context.connectStatus = MQTTConnected;
+
/* Verify MQTTSendFailed is propagated when transport interface returns an error. */
MQTT_GetPingreqPacketSize_ExpectAnyArgsAndReturn( MQTTSuccess );
MQTT_GetPingreqPacketSize_ReturnThruPtr_pPacketSize( &pingreqSize );
@@ -5255,6 +5802,9 @@ void test_MQTT_Ping_error_path( void )
/* Initialize context. */
mqttStatus = MQTT_Init( &context, &transport, getTime, eventCallback, &networkBuffer );
TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus );
+
+ context.connectStatus = MQTTConnected;
+
MQTT_GetPingreqPacketSize_ExpectAnyArgsAndReturn( MQTTSuccess );
MQTT_GetPingreqPacketSize_ReturnThruPtr_pPacketSize( &pingreqSize );
MQTT_SerializePingreq_ExpectAnyArgsAndReturn( MQTTSuccess );
@@ -5266,6 +5816,9 @@ void test_MQTT_Ping_error_path( void )
/* Initialize context. */
mqttStatus = MQTT_Init( &context, &transport, getTime, eventCallback, &networkBuffer );
TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus );
+
+ context.connectStatus = MQTTConnected;
+
/* Verify MQTTBadParameter is propagated when getting PINGREQ packet size fails. */
MQTT_GetPingreqPacketSize_ExpectAnyArgsAndReturn( MQTTBadParameter );
MQTT_GetPingreqPacketSize_ReturnThruPtr_pPacketSize( &pingreqSize );
@@ -5853,7 +6406,19 @@ void test_MQTT_Status_strerror( void )
str = MQTT_Status_strerror( status );
TEST_ASSERT_EQUAL_STRING( "MQTTNeedMoreBytes", str );
- status = MQTTNeedMoreBytes + 1;
+ status = MQTTStatusConnected;
+ str = MQTT_Status_strerror( status );
+ TEST_ASSERT_EQUAL_STRING( "MQTTStatusConnected", str );
+
+ status = MQTTStatusNotConnected;
+ str = MQTT_Status_strerror( status );
+ TEST_ASSERT_EQUAL_STRING( "MQTTStatusNotConnected", str );
+
+ status = MQTTStatusDisconnectPending;
+ str = MQTT_Status_strerror( status );
+ TEST_ASSERT_EQUAL_STRING( "MQTTStatusDisconnectPending", str );
+
+ status = MQTTStatusDisconnectPending + 1;
str = MQTT_Status_strerror( status );
TEST_ASSERT_EQUAL_STRING( "Invalid MQTT Status code", str );
}
diff --git a/tools/cmock/coverage.cmake b/tools/cmock/coverage.cmake
index 570f92758..e56e0417c 100644
--- a/tools/cmock/coverage.cmake
+++ b/tools/cmock/coverage.cmake
@@ -15,8 +15,9 @@ execute_process( COMMAND lcov --directory ${CMAKE_BINARY_DIR}
--initial
--capture
--rc lcov_branch_coverage=1
- --include "*source*"
--output-file=${CMAKE_BINARY_DIR}/base_coverage.info
+ --include "*source*"
+
)
file(GLOB files "${CMAKE_BINARY_DIR}/bin/tests/*")
@@ -59,8 +60,8 @@ execute_process(
--add-tracefile ${CMAKE_BINARY_DIR}/base_coverage.info
--add-tracefile ${CMAKE_BINARY_DIR}/second_coverage.info
--output-file ${CMAKE_BINARY_DIR}/coverage.info
- --include "*source*"
--rc lcov_branch_coverage=1
+ --include "*source*"
)
execute_process(
COMMAND genhtml --rc lcov_branch_coverage=1