From ab015e032a8aafd3b44d06c8805eec9fb4b65285 Mon Sep 17 00:00:00 2001
From: Aniruddha Kanhere <60444055+AniruddhaKanhere@users.noreply.github.com>
Date: Thu, 1 Sep 2022 22:33:16 -0700
Subject: [PATCH 01/13] Add mutex hooks
---
source/core_mqtt.c | 117 ++++++++++++++++++++++++++++++++++++++-------
1 file changed, 99 insertions(+), 18 deletions(-)
diff --git a/source/core_mqtt.c b/source/core_mqtt.c
index 887194fd9..cd22135f7 100644
--- a/source/core_mqtt.c
+++ b/source/core_mqtt.c
@@ -31,6 +31,22 @@
#include "core_mqtt_state.h"
#include "core_mqtt_default_logging.h"
+#ifndef MQTT_SEND_MUTEX_TAKE
+ #define MQTT_SEND_MUTEX_TAKE( pContext )
+#endif /* !MQTT_SEND_MUTEX_TAKE */
+
+#ifndef MQTT_SEND_MUTEX_GIVE
+ #define MQTT_SEND_MUTEX_GIVE( pContext )
+#endif /* !MQTT_SEND_MUTEX_GIVE */
+
+#ifndef MQTT_STATE_UPDATE_MUTEX_TAKE
+ #define MQTT_STATE_UPDATE_MUTEX_TAKE( pContext )
+#endif /* !MQTT_STATE_UPDATE_MUTEX_TAKE */
+
+#ifndef MQTT_STATE_UPDATE_MUTEX_GIVE
+ #define MQTT_STATE_UPDATE_MUTEX_GIVE( pContext )
+#endif /* !MQTT_STATE_UPDATE_MUTEX_GIVE */
+
/*-----------------------------------------------------------*/
/**
@@ -1270,23 +1286,32 @@ static MQTTStatus_t sendPublishAcks( MQTTContext_t * pContext,
if( status == MQTTSuccess )
{
+ MQTT_SEND_MUTEX_TAKE( pContext );
+
/* Here, we are not using the vector approach for efficiency. There is just one buffer
* to be sent which can be achieved with a normal send call. */
bytesSent = sendBuffer( pContext,
localBuffer.pBuffer,
MQTT_PUBLISH_ACK_PACKET_SIZE,
MQTT_SEND_RETRY_TIMEOUT_MS );
+
+ MQTT_SEND_MUTEX_GIVE( pContext );
}
if( bytesSent == ( int32_t ) MQTT_PUBLISH_ACK_PACKET_SIZE )
{
pContext->controlPacketSent = true;
+
+ MQTT_STATE_UPDATE_MUTEX_TAKE( pContext );
+
status = MQTT_UpdateStateAck( pContext,
packetId,
packetType,
MQTT_SEND,
&newState );
+ MQTT_STATE_UPDATE_MUTEX_GIVE( pContext );
+
if( status != MQTTSuccess )
{
LogError( ( "Failed to update state of publish %hu.",
@@ -1375,12 +1400,16 @@ static MQTTStatus_t handleIncomingPublish( MQTTContext_t * pContext,
if( status == MQTTSuccess )
{
+ MQTT_STATE_UPDATE_MUTEX_TAKE( pContext );
+
status = MQTT_UpdateStatePublish( pContext,
packetIdentifier,
MQTT_RECEIVE,
publishInfo.qos,
&publishRecordState );
+ MQTT_STATE_UPDATE_MUTEX_GIVE( pContext );
+
if( status == MQTTSuccess )
{
LogInfo( ( "State record updated. New state=%s.",
@@ -1492,12 +1521,16 @@ static MQTTStatus_t handlePublishAcks( MQTTContext_t * pContext,
if( status == MQTTSuccess )
{
+ MQTT_STATE_UPDATE_MUTEX_TAKE( pContext );
+
status = MQTT_UpdateStateAck( pContext,
packetIdentifier,
ackType,
MQTT_RECEIVE,
&publishRecordState );
+ MQTT_STATE_UPDATE_MUTEX_GIVE( pContext );
+
if( status == MQTTSuccess )
{
LogInfo( ( "State record updated. New state=%s.",
@@ -2440,9 +2473,13 @@ MQTTStatus_t MQTT_CancelCallback( MQTTContext_t * pContext,
{
MQTTStatus_t status;
+ MQTT_STATE_UPDATE_MUTEX_TAKE( pContext );
+
status = MQTT_RemoveStateRecord( pContext,
packetId );
+ MQTT_STATE_UPDATE_MUTEX_GIVE( pContext );
+
return status;
}
@@ -2484,10 +2521,14 @@ MQTTStatus_t MQTT_Connect( MQTTContext_t * pContext,
if( status == MQTTSuccess )
{
+ MQTT_SEND_MUTEX_TAKE( pContext );
+
status = sendConnectWithoutCopy( pContext,
pConnectInfo,
pWillInfo,
remainingLength );
+
+ MQTT_SEND_MUTEX_GIVE( pContext );
}
/* Read CONNACK from transport layer. */
@@ -2553,12 +2594,16 @@ MQTTStatus_t MQTT_Subscribe( MQTTContext_t * pContext,
if( status == MQTTSuccess )
{
+ MQTT_SEND_MUTEX_TAKE( pContext );
+
/* Send MQTT SUBSCRIBE packet. */
status = sendSubscribeWithoutCopy( pContext,
pSubscriptionList,
subscriptionCount,
packetId,
remainingLength );
+
+ MQTT_SEND_MUTEX_GIVE( pContext );
}
return status;
@@ -2598,10 +2643,12 @@ MQTTStatus_t MQTT_Publish( MQTTContext_t * pContext,
if( ( status == MQTTSuccess ) && ( pPublishInfo->qos > MQTTQoS0 ) )
{
- /* Reserve state for publish message. Only to be done for QoS1 or QoS2. */
+ /* Take the mutex required to update the state. */
+ MQTT_STATE_UPDATE_MUTEX_TAKE( pContext );
+
status = MQTT_ReserveState( pContext,
- packetId,
- pPublishInfo->qos );
+ packetId,
+ pPublishInfo->qos );
/* State already exists for a duplicate packet.
* If a state doesn't exist, it will be handled as a new publish in
@@ -2614,31 +2661,46 @@ MQTTStatus_t MQTT_Publish( MQTTContext_t * pContext,
if( status == MQTTSuccess )
{
+ /* Take the mutex as multiple send calls are required for sending this
+ * packet. */
+ MQTT_SEND_MUTEX_TAKE( pContext );
+
status = sendPublishWithoutCopy( pContext,
pPublishInfo,
mqttHeader,
headerSize,
packetId );
+
+ /* Give the mutex away for the next taker. */
+ MQTT_SEND_MUTEX_GIVE( pContext );
}
- if( ( status == MQTTSuccess ) && ( pPublishInfo->qos > MQTTQoS0 ) )
+ if( pPublishInfo->qos > MQTTQoS0 )
{
- /* Update state machine after PUBLISH is sent.
- * Only to be done for QoS1 or QoS2. */
- status = MQTT_UpdateStatePublish( pContext,
- packetId,
- MQTT_SEND,
- pPublishInfo->qos,
- &publishStatus );
-
- if( status != MQTTSuccess )
+ if( status == MQTTSuccess )
{
- LogError( ( "Update state for publish failed with status %s."
- " However PUBLISH packet was sent to the broker."
- " Any further handling of ACKs for the packet Id"
- " will fail.",
- MQTT_Status_strerror( status ) ) );
+ /* Update state machine after PUBLISH is sent.
+ * Only to be done for QoS1 or QoS2. */
+ status = MQTT_UpdateStatePublish( pContext,
+ packetId,
+ MQTT_SEND,
+ pPublishInfo->qos,
+ &publishStatus );
+
+
+ if( status != MQTTSuccess )
+ {
+ LogError( ( "Update state for publish failed with status %s."
+ " However PUBLISH packet was sent to the broker."
+ " Any further handling of ACKs for the packet Id"
+ " will fail.",
+ MQTT_Status_strerror( status ) ) );
+ }
}
+
+ /* Regardless of the status, if the mutex was taken due to the
+ * packet being of QoS > QoS0, then it should be relinquished. */
+ MQTT_STATE_UPDATE_MUTEX_GIVE( pContext );
}
if( status != MQTTSuccess )
@@ -2694,6 +2756,10 @@ MQTTStatus_t MQTT_Ping( MQTTContext_t * pContext )
if( status == MQTTSuccess )
{
+ /* Take the mutex as the send call should not be interrupted in
+ * between. */
+ MQTT_SEND_MUTEX_TAKE( pContext );
+
/* Send the serialized PINGREQ packet to transport layer.
* Here, we do not use the vectored IO approach for efficiency as the
* Ping packet does not have numerous fields which need to be copied
@@ -2703,6 +2769,9 @@ MQTTStatus_t MQTT_Ping( MQTTContext_t * pContext )
2U,
MQTT_SEND_RETRY_TIMEOUT_MS );
+ /* Give the mutex away. */
+ MQTT_SEND_MUTEX_GIVE( pContext );
+
/* It is an error to not send the entire PINGREQ packet. */
if( bytesSent < ( int32_t ) packetSize )
{
@@ -2750,11 +2819,17 @@ MQTTStatus_t MQTT_Unsubscribe( MQTTContext_t * pContext,
if( status == MQTTSuccess )
{
+ /* Take the mutex because the below call should not be interrupted. */
+ MQTT_SEND_MUTEX_TAKE( pContext );
+
status = sendUnsubscribeWithoutCopy( pContext,
pSubscriptionList,
subscriptionCount,
packetId,
remainingLength );
+
+ /* Give the mutex away. */
+ MQTT_SEND_MUTEX_GIVE( pContext );
}
return status;
@@ -2796,6 +2871,9 @@ MQTTStatus_t MQTT_Disconnect( MQTTContext_t * pContext )
if( status == MQTTSuccess )
{
+ /* Take the mutex because the below call should not be interrupted. */
+ MQTT_SEND_MUTEX_TAKE( pContext );
+
/* Here we do not use vectors as the disconnect packet has fixed fields
* which do not reside in user provided buffers. Thus, it can be sent
* using a simple send call. */
@@ -2804,6 +2882,9 @@ MQTTStatus_t MQTT_Disconnect( MQTTContext_t * pContext )
packetSize,
MQTT_SEND_RETRY_TIMEOUT_MS );
+ /* Give the mutex away. */
+ MQTT_SEND_MUTEX_GIVE( pContext );
+
if( bytesSent < ( int32_t ) packetSize )
{
LogError( ( "Transport send failed for DISCONNECT packet." ) );
From 35556acfa6b5d3f1e639f0a64b739957e25655ec Mon Sep 17 00:00:00 2001
From: Aniruddha Kanhere <60444055+AniruddhaKanhere@users.noreply.github.com>
Date: Fri, 2 Sep 2022 13:37:00 -0700
Subject: [PATCH 02/13] Clean up of code
---
source/core_mqtt.c | 75 ++++------------------------------------------
1 file changed, 6 insertions(+), 69 deletions(-)
diff --git a/source/core_mqtt.c b/source/core_mqtt.c
index cd22135f7..7aecae85b 100644
--- a/source/core_mqtt.c
+++ b/source/core_mqtt.c
@@ -69,8 +69,7 @@
*/
static int32_t sendBuffer( MQTTContext_t * pContext,
const uint8_t * pBufferToSend,
- size_t bytesToSend,
- uint32_t timeout );
+ size_t bytesToSend );
/**
* @brief Sends MQTT connect without copying the users data into any buffer.
@@ -444,23 +443,6 @@ static MQTTStatus_t sendPublishWithoutCopy( MQTTContext_t * pContext,
size_t headerSize,
uint16_t packetId );
-/**
- * @brief Serializes a PUBLISH message.
- *
- * @brief param[in] pContext Initialized MQTT context.
- * @brief param[in] pPublishInfo MQTT PUBLISH packet parameters.
- * @brief param[in] packetId Packet Id of the publish packet.
- * @brief param[out] pHeaderSize Size of the serialized PUBLISH header.
- *
- * @return #MQTTNoMemory if pBuffer is too small to hold the MQTT packet;
- * #MQTTBadParameter if invalid parameters are passed;
- * #MQTTSuccess otherwise.
- */
-static MQTTStatus_t serializePublish( const MQTTContext_t * pContext,
- const MQTTPublishInfo_t * pPublishInfo,
- uint16_t packetId,
- size_t * const pHeaderSize );
-
/**
* @brief Function to validate #MQTT_Publish parameters.
*
@@ -803,8 +785,7 @@ static int32_t sendMessageVector( MQTTContext_t * pContext,
{
sendResult = sendBuffer( pContext,
pIoVectIterator->iov_base,
- pIoVectIterator->iov_len,
- ( timeoutTime - pContext->getTime() ) );
+ pIoVectIterator->iov_len );
}
if( sendResult >= 0 )
@@ -843,14 +824,12 @@ static int32_t sendMessageVector( MQTTContext_t * pContext,
static int32_t sendBuffer( MQTTContext_t * pContext,
const uint8_t * pBufferToSend,
- size_t bytesToSend,
- uint32_t timeout )
+ size_t bytesToSend )
{
const uint8_t * pIndex = pBufferToSend;
size_t bytesRemaining = bytesToSend;
int32_t totalBytesSent = 0, bytesSent;
uint32_t lastSendTimeMs = 0U, timeSinceLastSendMs = 0U;
- uint32_t timeoutTime;
bool sendError = false;
assert( pContext != NULL );
@@ -859,7 +838,6 @@ static int32_t sendBuffer( MQTTContext_t * pContext,
assert( pIndex != NULL );
bytesRemaining = bytesToSend;
- timeoutTime = pContext->getTime() + timeout;
/* Loop until the entire packet is sent. */
while( ( bytesRemaining > 0UL ) && ( sendError == false ) )
@@ -1292,8 +1270,7 @@ static MQTTStatus_t sendPublishAcks( MQTTContext_t * pContext,
* to be sent which can be achieved with a normal send call. */
bytesSent = sendBuffer( pContext,
localBuffer.pBuffer,
- MQTT_PUBLISH_ACK_PACKET_SIZE,
- MQTT_SEND_RETRY_TIMEOUT_MS );
+ MQTT_PUBLISH_ACK_PACKET_SIZE );
MQTT_SEND_MUTEX_GIVE( pContext );
}
@@ -2332,44 +2309,6 @@ static MQTTStatus_t handleSessionResumption( MQTTContext_t * pContext,
return status;
}
-/*-----------------------------------------------------------*/
-
-static MQTTStatus_t serializePublish( const MQTTContext_t * pContext,
- const MQTTPublishInfo_t * pPublishInfo,
- uint16_t packetId,
- size_t * const pHeaderSize )
-{
- MQTTStatus_t status = MQTTSuccess;
- size_t remainingLength = 0UL, packetSize = 0UL;
-
- assert( pContext != NULL );
- assert( pPublishInfo != NULL );
- assert( pHeaderSize != NULL );
-
- /* Get the remaining length and packet size.*/
- status = MQTT_GetPublishPacketSize( pPublishInfo,
- &remainingLength,
- &packetSize );
- LogDebug( ( "PUBLISH packet size is %lu and remaining length is %lu.",
- ( unsigned long ) packetSize,
- ( unsigned long ) remainingLength ) );
-
- if( status == MQTTSuccess )
- {
- status = MQTT_SerializePublishHeader( pPublishInfo,
- packetId,
- remainingLength,
- &( pContext->networkBuffer ),
- pHeaderSize );
- LogDebug( ( "Serialized PUBLISH header size is %lu.",
- ( unsigned long ) *pHeaderSize ) );
- }
-
- return status;
-}
-
-/*-----------------------------------------------------------*/
-
static MQTTStatus_t validatePublishParams( const MQTTContext_t * pContext,
const MQTTPublishInfo_t * pPublishInfo,
uint16_t packetId )
@@ -2766,8 +2705,7 @@ MQTTStatus_t MQTT_Ping( MQTTContext_t * pContext )
* from the user provided buffers. Thus it can be sent directly. */
bytesSent = sendBuffer( pContext,
localBuffer.pBuffer,
- 2U,
- MQTT_SEND_RETRY_TIMEOUT_MS );
+ 2U );
/* Give the mutex away. */
MQTT_SEND_MUTEX_GIVE( pContext );
@@ -2879,8 +2817,7 @@ MQTTStatus_t MQTT_Disconnect( MQTTContext_t * pContext )
* using a simple send call. */
bytesSent = sendBuffer( pContext,
localBuffer.pBuffer,
- packetSize,
- MQTT_SEND_RETRY_TIMEOUT_MS );
+ packetSize );
/* Give the mutex away. */
MQTT_SEND_MUTEX_GIVE( pContext );
From aa4756948ca6d8afac1fb1ccbea4c8d282492981 Mon Sep 17 00:00:00 2001
From: Aniruddha Kanhere <60444055+AniruddhaKanhere@users.noreply.github.com>
Date: Fri, 2 Sep 2022 13:45:11 -0700
Subject: [PATCH 03/13] Add doxygen comments and fix spell check
---
lexicon.txt | 1 +
source/core_mqtt.c | 21 +++++++++++++++++++++
2 files changed, 22 insertions(+)
diff --git a/lexicon.txt b/lexicon.txt
index 255889b2b..be64ae6a7 100644
--- a/lexicon.txt
+++ b/lexicon.txt
@@ -194,6 +194,7 @@ mqttsubacksuccessqos
mqttsubscribeinfo
mqttsuccess
msb
+mutex
mynetworkrecvimplementation
mynetworksendimplementation
mytcpsocketcontext
diff --git a/source/core_mqtt.c b/source/core_mqtt.c
index 7aecae85b..2bee35554 100644
--- a/source/core_mqtt.c
+++ b/source/core_mqtt.c
@@ -32,18 +32,39 @@
#include "core_mqtt_default_logging.h"
#ifndef MQTT_SEND_MUTEX_TAKE
+ /**
+ * @brief Macro which should point to a function which can acquire a
+ * mutex with infinite timeout when multiple senders are using the
+ * coreMQTT library. The mutex will serialize the access to send calls
+ * which should be made in order to keep the MQTT connection intact.
+ */
#define MQTT_SEND_MUTEX_TAKE( pContext )
#endif /* !MQTT_SEND_MUTEX_TAKE */
#ifndef MQTT_SEND_MUTEX_GIVE
+ /**
+ * @brief Macro which should point to a function which can release the
+ * mutex acquired with #MQTT_SEND_MUTEX_TAKE.
+ */
#define MQTT_SEND_MUTEX_GIVE( pContext )
#endif /* !MQTT_SEND_MUTEX_GIVE */
#ifndef MQTT_STATE_UPDATE_MUTEX_TAKE
+ /**
+ * @brief Macro which should point to a function which can acquire a
+ * mutex with infinite timeout when multiple senders are using the
+ * coreMQTT library. The mutex will serialize the access to the state
+ * data structure which holds the state of incoming and outgoing
+ * publishes.
+ */
#define MQTT_STATE_UPDATE_MUTEX_TAKE( pContext )
#endif /* !MQTT_STATE_UPDATE_MUTEX_TAKE */
#ifndef MQTT_STATE_UPDATE_MUTEX_GIVE
+ /**
+ * @brief Macro which should point to a function which can release the
+ * mutex acquired with #MQTT_STATE_UPDATE_MUTEX_TAKE.
+ */
#define MQTT_STATE_UPDATE_MUTEX_GIVE( pContext )
#endif /* !MQTT_STATE_UPDATE_MUTEX_GIVE */
From b8ed96942c731dfc157981314d51095f18f63aca Mon Sep 17 00:00:00 2001
From: Aniruddha Kanhere <60444055+AniruddhaKanhere@users.noreply.github.com>
Date: Fri, 2 Sep 2022 13:49:01 -0700
Subject: [PATCH 04/13] Fix LogError call
---
source/core_mqtt.c | 5 ++---
1 file changed, 2 insertions(+), 3 deletions(-)
diff --git a/source/core_mqtt.c b/source/core_mqtt.c
index 2bee35554..25656acd5 100644
--- a/source/core_mqtt.c
+++ b/source/core_mqtt.c
@@ -55,7 +55,7 @@
* mutex with infinite timeout when multiple senders are using the
* coreMQTT library. The mutex will serialize the access to the state
* data structure which holds the state of incoming and outgoing
- * publishes.
+ publishes.
*/
#define MQTT_STATE_UPDATE_MUTEX_TAKE( pContext )
#endif /* !MQTT_STATE_UPDATE_MUTEX_TAKE */
@@ -2112,8 +2112,7 @@ static MQTTStatus_t sendConnectWithoutCopy( MQTTContext_t * pContext,
/* Validate arguments. */
if( pConnectInfo == NULL )
{
- LogError( ( "Argument cannot be NULL: pConnectInfo=%p, "
- "pFixedBuffer=%p.",
+ LogError( ( "Argument cannot be NULL: pConnectInfo=%p.",
( void * ) pConnectInfo ) );
status = MQTTBadParameter;
}
From dc9c5b8887d891f240f6f9a8e5b2f7128fe1d700 Mon Sep 17 00:00:00 2001
From: Aniruddha Kanhere <60444055+AniruddhaKanhere@users.noreply.github.com>
Date: Fri, 2 Sep 2022 21:18:12 +0000
Subject: [PATCH 05/13] Fix formatting and memory table
---
docs/doxygen/include/size_table.md | 4 +--
source/core_mqtt.c | 53 +++++++++++++++---------------
2 files changed, 29 insertions(+), 28 deletions(-)
diff --git a/docs/doxygen/include/size_table.md b/docs/doxygen/include/size_table.md
index 5997a82b7..fab96b276 100644
--- a/docs/doxygen/include/size_table.md
+++ b/docs/doxygen/include/size_table.md
@@ -9,7 +9,7 @@
core_mqtt.c |
- 3.8K |
+ 3.7K |
3.2K |
@@ -24,7 +24,7 @@
Total estimates |
- 8.1K |
+ 8.0K |
6.5K |
diff --git a/source/core_mqtt.c b/source/core_mqtt.c
index 25656acd5..a3b5242bd 100644
--- a/source/core_mqtt.c
+++ b/source/core_mqtt.c
@@ -32,39 +32,43 @@
#include "core_mqtt_default_logging.h"
#ifndef MQTT_SEND_MUTEX_TAKE
- /**
- * @brief Macro which should point to a function which can acquire a
- * mutex with infinite timeout when multiple senders are using the
- * coreMQTT library. The mutex will serialize the access to send calls
- * which should be made in order to keep the MQTT connection intact.
- */
+
+/**
+ * @brief Macro which should point to a function which can acquire a
+ * mutex with infinite timeout when multiple senders are using the
+ * coreMQTT library. The mutex will serialize the access to send calls
+ * which should be made in order to keep the MQTT connection intact.
+ */
#define MQTT_SEND_MUTEX_TAKE( pContext )
#endif /* !MQTT_SEND_MUTEX_TAKE */
#ifndef MQTT_SEND_MUTEX_GIVE
- /**
- * @brief Macro which should point to a function which can release the
- * mutex acquired with #MQTT_SEND_MUTEX_TAKE.
- */
+
+/**
+ * @brief Macro which should point to a function which can release the
+ * mutex acquired with #MQTT_SEND_MUTEX_TAKE.
+ */
#define MQTT_SEND_MUTEX_GIVE( pContext )
#endif /* !MQTT_SEND_MUTEX_GIVE */
#ifndef MQTT_STATE_UPDATE_MUTEX_TAKE
- /**
- * @brief Macro which should point to a function which can acquire a
- * mutex with infinite timeout when multiple senders are using the
- * coreMQTT library. The mutex will serialize the access to the state
- * data structure which holds the state of incoming and outgoing
- publishes.
- */
+
+/**
+ * @brief Macro which should point to a function which can acquire a
+ * mutex with infinite timeout when multiple senders are using the
+ * coreMQTT library. The mutex will serialize the access to the state
+ * data structure which holds the state of incoming and outgoing
+ * publishes.
+ */
#define MQTT_STATE_UPDATE_MUTEX_TAKE( pContext )
#endif /* !MQTT_STATE_UPDATE_MUTEX_TAKE */
#ifndef MQTT_STATE_UPDATE_MUTEX_GIVE
- /**
- * @brief Macro which should point to a function which can release the
- * mutex acquired with #MQTT_STATE_UPDATE_MUTEX_TAKE.
- */
+
+/**
+ * @brief Macro which should point to a function which can release the
+ * mutex acquired with #MQTT_STATE_UPDATE_MUTEX_TAKE.
+ */
#define MQTT_STATE_UPDATE_MUTEX_GIVE( pContext )
#endif /* !MQTT_STATE_UPDATE_MUTEX_GIVE */
@@ -1399,13 +1403,11 @@ static MQTTStatus_t handleIncomingPublish( MQTTContext_t * pContext,
if( status == MQTTSuccess )
{
MQTT_STATE_UPDATE_MUTEX_TAKE( pContext );
-
status = MQTT_UpdateStatePublish( pContext,
packetIdentifier,
MQTT_RECEIVE,
publishInfo.qos,
&publishRecordState );
-
MQTT_STATE_UPDATE_MUTEX_GIVE( pContext );
if( status == MQTTSuccess )
@@ -2606,8 +2608,8 @@ MQTTStatus_t MQTT_Publish( MQTTContext_t * pContext,
MQTT_STATE_UPDATE_MUTEX_TAKE( pContext );
status = MQTT_ReserveState( pContext,
- packetId,
- pPublishInfo->qos );
+ packetId,
+ pPublishInfo->qos );
/* State already exists for a duplicate packet.
* If a state doesn't exist, it will be handled as a new publish in
@@ -2646,7 +2648,6 @@ MQTTStatus_t MQTT_Publish( MQTTContext_t * pContext,
pPublishInfo->qos,
&publishStatus );
-
if( status != MQTTSuccess )
{
LogError( ( "Update state for publish failed with status %s."
From 44c99c128d0e870df451734dca1ed99e43d428cf Mon Sep 17 00:00:00 2001
From: Aniruddha Kanhere <60444055+AniruddhaKanhere@users.noreply.github.com>
Date: Fri, 2 Sep 2022 21:24:28 +0000
Subject: [PATCH 06/13] Fix dereference failure
---
source/core_mqtt.c | 40 ++++++++++++++++++++++------------------
1 file changed, 22 insertions(+), 18 deletions(-)
diff --git a/source/core_mqtt.c b/source/core_mqtt.c
index a3b5242bd..804f812fb 100644
--- a/source/core_mqtt.c
+++ b/source/core_mqtt.c
@@ -2578,6 +2578,7 @@ MQTTStatus_t MQTT_Publish( MQTTContext_t * pContext,
{
size_t headerSize = 0UL, remainingLength = 0UL, packetSize = 0UL;
MQTTPublishState_t publishStatus = MQTTStateNull;
+ bool mutexTaken = false;
/* 1 header byte + 4 bytes (maximum) required for encoding the length +
* 2 bytes for topic string. */
@@ -2606,6 +2607,8 @@ MQTTStatus_t MQTT_Publish( MQTTContext_t * pContext,
{
/* Take the mutex required to update the state. */
MQTT_STATE_UPDATE_MUTEX_TAKE( pContext );
+ /* Set the flag so that the mutex can be released later. */
+ mutexTaken = true;
status = MQTT_ReserveState( pContext,
packetId,
@@ -2636,28 +2639,29 @@ MQTTStatus_t MQTT_Publish( MQTTContext_t * pContext,
MQTT_SEND_MUTEX_GIVE( pContext );
}
- if( pPublishInfo->qos > MQTTQoS0 )
+ if( ( status == MQTTSuccess ) &&
+ ( pPublishInfo->qos > MQTTQoS0 ) )
{
- if( status == MQTTSuccess )
- {
- /* Update state machine after PUBLISH is sent.
- * Only to be done for QoS1 or QoS2. */
- status = MQTT_UpdateStatePublish( pContext,
- packetId,
- MQTT_SEND,
- pPublishInfo->qos,
- &publishStatus );
+ /* Update state machine after PUBLISH is sent.
+ * Only to be done for QoS1 or QoS2. */
+ status = MQTT_UpdateStatePublish( pContext,
+ packetId,
+ MQTT_SEND,
+ pPublishInfo->qos,
+ &publishStatus );
- if( status != MQTTSuccess )
- {
- LogError( ( "Update state for publish failed with status %s."
- " However PUBLISH packet was sent to the broker."
- " Any further handling of ACKs for the packet Id"
- " will fail.",
- MQTT_Status_strerror( status ) ) );
- }
+ if( status != MQTTSuccess )
+ {
+ LogError( ( "Update state for publish failed with status %s."
+ " However PUBLISH packet was sent to the broker."
+ " Any further handling of ACKs for the packet Id"
+ " will fail.",
+ MQTT_Status_strerror( status ) ) );
}
+ }
+ if( mutexTaken == true )
+ {
/* Regardless of the status, if the mutex was taken due to the
* packet being of QoS > QoS0, then it should be relinquished. */
MQTT_STATE_UPDATE_MUTEX_GIVE( pContext );
From f3a8c18262de3d84ab996e08cc49d8ac5707bcbc Mon Sep 17 00:00:00 2001
From: Aniruddha Kanhere <60444055+AniruddhaKanhere@users.noreply.github.com>
Date: Mon, 5 Sep 2022 17:17:09 -0700
Subject: [PATCH 07/13] Update the hook names
---
source/core_mqtt.c | 76 +++++++++++++++++++++++-----------------------
1 file changed, 38 insertions(+), 38 deletions(-)
diff --git a/source/core_mqtt.c b/source/core_mqtt.c
index 25656acd5..a9ca5acc8 100644
--- a/source/core_mqtt.c
+++ b/source/core_mqtt.c
@@ -31,25 +31,25 @@
#include "core_mqtt_state.h"
#include "core_mqtt_default_logging.h"
-#ifndef MQTT_SEND_MUTEX_TAKE
+#ifndef MQTT_PRE_SEND_HOOK
/**
* @brief Macro which should point to a function which can acquire a
* mutex with infinite timeout when multiple senders are using the
* coreMQTT library. The mutex will serialize the access to send calls
* which should be made in order to keep the MQTT connection intact.
*/
- #define MQTT_SEND_MUTEX_TAKE( pContext )
-#endif /* !MQTT_SEND_MUTEX_TAKE */
+ #define MQTT_PRE_SEND_HOOK( pContext )
+#endif /* !MQTT_PRE_SEND_HOOK */
-#ifndef MQTT_SEND_MUTEX_GIVE
+#ifndef MQTT_POST_SEND_HOOK
/**
* @brief Macro which should point to a function which can release the
- * mutex acquired with #MQTT_SEND_MUTEX_TAKE.
+ * mutex acquired with #MQTT_PRE_SEND_HOOK.
*/
- #define MQTT_SEND_MUTEX_GIVE( pContext )
-#endif /* !MQTT_SEND_MUTEX_GIVE */
+ #define MQTT_POST_SEND_HOOK( pContext )
+#endif /* !MQTT_POST_SEND_HOOK */
-#ifndef MQTT_STATE_UPDATE_MUTEX_TAKE
+#ifndef MQTT_PRE_STATE_UPDATE_HOOK
/**
* @brief Macro which should point to a function which can acquire a
* mutex with infinite timeout when multiple senders are using the
@@ -57,16 +57,16 @@
* data structure which holds the state of incoming and outgoing
publishes.
*/
- #define MQTT_STATE_UPDATE_MUTEX_TAKE( pContext )
-#endif /* !MQTT_STATE_UPDATE_MUTEX_TAKE */
+ #define MQTT_PRE_STATE_UPDATE_HOOK( pContext )
+#endif /* !MQTT_PRE_STATE_UPDATE_HOOK */
-#ifndef MQTT_STATE_UPDATE_MUTEX_GIVE
+#ifndef MQTT_POST_STATE_UPDATE_HOOK
/**
* @brief Macro which should point to a function which can release the
- * mutex acquired with #MQTT_STATE_UPDATE_MUTEX_TAKE.
+ * mutex acquired with #MQTT_PRE_STATE_UPDATE_HOOK.
*/
- #define MQTT_STATE_UPDATE_MUTEX_GIVE( pContext )
-#endif /* !MQTT_STATE_UPDATE_MUTEX_GIVE */
+ #define MQTT_POST_STATE_UPDATE_HOOK( pContext )
+#endif /* !MQTT_POST_STATE_UPDATE_HOOK */
/*-----------------------------------------------------------*/
@@ -1285,7 +1285,7 @@ static MQTTStatus_t sendPublishAcks( MQTTContext_t * pContext,
if( status == MQTTSuccess )
{
- MQTT_SEND_MUTEX_TAKE( pContext );
+ MQTT_PRE_SEND_HOOK( pContext );
/* Here, we are not using the vector approach for efficiency. There is just one buffer
* to be sent which can be achieved with a normal send call. */
@@ -1293,14 +1293,14 @@ static MQTTStatus_t sendPublishAcks( MQTTContext_t * pContext,
localBuffer.pBuffer,
MQTT_PUBLISH_ACK_PACKET_SIZE );
- MQTT_SEND_MUTEX_GIVE( pContext );
+ MQTT_POST_SEND_HOOK( pContext );
}
if( bytesSent == ( int32_t ) MQTT_PUBLISH_ACK_PACKET_SIZE )
{
pContext->controlPacketSent = true;
- MQTT_STATE_UPDATE_MUTEX_TAKE( pContext );
+ MQTT_PRE_STATE_UPDATE_HOOK( pContext );
status = MQTT_UpdateStateAck( pContext,
packetId,
@@ -1308,7 +1308,7 @@ static MQTTStatus_t sendPublishAcks( MQTTContext_t * pContext,
MQTT_SEND,
&newState );
- MQTT_STATE_UPDATE_MUTEX_GIVE( pContext );
+ MQTT_POST_STATE_UPDATE_HOOK( pContext );
if( status != MQTTSuccess )
{
@@ -1398,7 +1398,7 @@ static MQTTStatus_t handleIncomingPublish( MQTTContext_t * pContext,
if( status == MQTTSuccess )
{
- MQTT_STATE_UPDATE_MUTEX_TAKE( pContext );
+ MQTT_PRE_STATE_UPDATE_HOOK( pContext );
status = MQTT_UpdateStatePublish( pContext,
packetIdentifier,
@@ -1406,7 +1406,7 @@ static MQTTStatus_t handleIncomingPublish( MQTTContext_t * pContext,
publishInfo.qos,
&publishRecordState );
- MQTT_STATE_UPDATE_MUTEX_GIVE( pContext );
+ MQTT_POST_STATE_UPDATE_HOOK( pContext );
if( status == MQTTSuccess )
{
@@ -1519,7 +1519,7 @@ static MQTTStatus_t handlePublishAcks( MQTTContext_t * pContext,
if( status == MQTTSuccess )
{
- MQTT_STATE_UPDATE_MUTEX_TAKE( pContext );
+ MQTT_PRE_STATE_UPDATE_HOOK( pContext );
status = MQTT_UpdateStateAck( pContext,
packetIdentifier,
@@ -1527,7 +1527,7 @@ static MQTTStatus_t handlePublishAcks( MQTTContext_t * pContext,
MQTT_RECEIVE,
&publishRecordState );
- MQTT_STATE_UPDATE_MUTEX_GIVE( pContext );
+ MQTT_POST_STATE_UPDATE_HOOK( pContext );
if( status == MQTTSuccess )
{
@@ -2432,12 +2432,12 @@ MQTTStatus_t MQTT_CancelCallback( MQTTContext_t * pContext,
{
MQTTStatus_t status;
- MQTT_STATE_UPDATE_MUTEX_TAKE( pContext );
+ MQTT_PRE_STATE_UPDATE_HOOK( pContext );
status = MQTT_RemoveStateRecord( pContext,
packetId );
- MQTT_STATE_UPDATE_MUTEX_GIVE( pContext );
+ MQTT_POST_STATE_UPDATE_HOOK( pContext );
return status;
}
@@ -2480,14 +2480,14 @@ MQTTStatus_t MQTT_Connect( MQTTContext_t * pContext,
if( status == MQTTSuccess )
{
- MQTT_SEND_MUTEX_TAKE( pContext );
+ MQTT_PRE_SEND_HOOK( pContext );
status = sendConnectWithoutCopy( pContext,
pConnectInfo,
pWillInfo,
remainingLength );
- MQTT_SEND_MUTEX_GIVE( pContext );
+ MQTT_POST_SEND_HOOK( pContext );
}
/* Read CONNACK from transport layer. */
@@ -2553,7 +2553,7 @@ MQTTStatus_t MQTT_Subscribe( MQTTContext_t * pContext,
if( status == MQTTSuccess )
{
- MQTT_SEND_MUTEX_TAKE( pContext );
+ MQTT_PRE_SEND_HOOK( pContext );
/* Send MQTT SUBSCRIBE packet. */
status = sendSubscribeWithoutCopy( pContext,
@@ -2562,7 +2562,7 @@ MQTTStatus_t MQTT_Subscribe( MQTTContext_t * pContext,
packetId,
remainingLength );
- MQTT_SEND_MUTEX_GIVE( pContext );
+ MQTT_POST_SEND_HOOK( pContext );
}
return status;
@@ -2603,7 +2603,7 @@ MQTTStatus_t MQTT_Publish( MQTTContext_t * pContext,
if( ( status == MQTTSuccess ) && ( pPublishInfo->qos > MQTTQoS0 ) )
{
/* Take the mutex required to update the state. */
- MQTT_STATE_UPDATE_MUTEX_TAKE( pContext );
+ MQTT_PRE_STATE_UPDATE_HOOK( pContext );
status = MQTT_ReserveState( pContext,
packetId,
@@ -2622,7 +2622,7 @@ MQTTStatus_t MQTT_Publish( MQTTContext_t * pContext,
{
/* Take the mutex as multiple send calls are required for sending this
* packet. */
- MQTT_SEND_MUTEX_TAKE( pContext );
+ MQTT_PRE_SEND_HOOK( pContext );
status = sendPublishWithoutCopy( pContext,
pPublishInfo,
@@ -2631,7 +2631,7 @@ MQTTStatus_t MQTT_Publish( MQTTContext_t * pContext,
packetId );
/* Give the mutex away for the next taker. */
- MQTT_SEND_MUTEX_GIVE( pContext );
+ MQTT_POST_SEND_HOOK( pContext );
}
if( pPublishInfo->qos > MQTTQoS0 )
@@ -2659,7 +2659,7 @@ MQTTStatus_t MQTT_Publish( MQTTContext_t * pContext,
/* Regardless of the status, if the mutex was taken due to the
* packet being of QoS > QoS0, then it should be relinquished. */
- MQTT_STATE_UPDATE_MUTEX_GIVE( pContext );
+ MQTT_POST_STATE_UPDATE_HOOK( pContext );
}
if( status != MQTTSuccess )
@@ -2717,7 +2717,7 @@ MQTTStatus_t MQTT_Ping( MQTTContext_t * pContext )
{
/* Take the mutex as the send call should not be interrupted in
* between. */
- MQTT_SEND_MUTEX_TAKE( pContext );
+ MQTT_PRE_SEND_HOOK( pContext );
/* Send the serialized PINGREQ packet to transport layer.
* Here, we do not use the vectored IO approach for efficiency as the
@@ -2728,7 +2728,7 @@ MQTTStatus_t MQTT_Ping( MQTTContext_t * pContext )
2U );
/* Give the mutex away. */
- MQTT_SEND_MUTEX_GIVE( pContext );
+ MQTT_POST_SEND_HOOK( pContext );
/* It is an error to not send the entire PINGREQ packet. */
if( bytesSent < ( int32_t ) packetSize )
@@ -2778,7 +2778,7 @@ MQTTStatus_t MQTT_Unsubscribe( MQTTContext_t * pContext,
if( status == MQTTSuccess )
{
/* Take the mutex because the below call should not be interrupted. */
- MQTT_SEND_MUTEX_TAKE( pContext );
+ MQTT_PRE_SEND_HOOK( pContext );
status = sendUnsubscribeWithoutCopy( pContext,
pSubscriptionList,
@@ -2787,7 +2787,7 @@ MQTTStatus_t MQTT_Unsubscribe( MQTTContext_t * pContext,
remainingLength );
/* Give the mutex away. */
- MQTT_SEND_MUTEX_GIVE( pContext );
+ MQTT_POST_SEND_HOOK( pContext );
}
return status;
@@ -2830,7 +2830,7 @@ MQTTStatus_t MQTT_Disconnect( MQTTContext_t * pContext )
if( status == MQTTSuccess )
{
/* Take the mutex because the below call should not be interrupted. */
- MQTT_SEND_MUTEX_TAKE( pContext );
+ MQTT_PRE_SEND_HOOK( pContext );
/* Here we do not use vectors as the disconnect packet has fixed fields
* which do not reside in user provided buffers. Thus, it can be sent
@@ -2840,7 +2840,7 @@ MQTTStatus_t MQTT_Disconnect( MQTTContext_t * pContext )
packetSize );
/* Give the mutex away. */
- MQTT_SEND_MUTEX_GIVE( pContext );
+ MQTT_POST_SEND_HOOK( pContext );
if( bytesSent < ( int32_t ) packetSize )
{
From e8735edb3fc356d3cfc6cd10fe67e85013ed41cf Mon Sep 17 00:00:00 2001
From: Aniruddha Kanhere <60444055+AniruddhaKanhere@users.noreply.github.com>
Date: Mon, 5 Sep 2022 17:25:00 -0700
Subject: [PATCH 08/13] Fix broken builds
---
source/core_mqtt.c | 56 +---------------------------------------------
1 file changed, 1 insertion(+), 55 deletions(-)
diff --git a/source/core_mqtt.c b/source/core_mqtt.c
index 3ec4ae005..63399af29 100644
--- a/source/core_mqtt.c
+++ b/source/core_mqtt.c
@@ -31,7 +31,6 @@
#include "core_mqtt_state.h"
#include "core_mqtt_default_logging.h"
-<<<<<<< HEAD
#ifndef MQTT_PRE_SEND_HOOK
/**
* @brief Macro which should point to a function which can acquire a
@@ -68,48 +67,6 @@
*/
#define MQTT_POST_STATE_UPDATE_HOOK( pContext )
#endif /* !MQTT_POST_STATE_UPDATE_HOOK */
-=======
-#ifndef MQTT_SEND_MUTEX_TAKE
-
-/**
- * @brief Macro which should point to a function which can acquire a
- * mutex with infinite timeout when multiple senders are using the
- * coreMQTT library. The mutex will serialize the access to send calls
- * which should be made in order to keep the MQTT connection intact.
- */
- #define MQTT_SEND_MUTEX_TAKE( pContext )
-#endif /* !MQTT_SEND_MUTEX_TAKE */
-
-#ifndef MQTT_SEND_MUTEX_GIVE
-
-/**
- * @brief Macro which should point to a function which can release the
- * mutex acquired with #MQTT_SEND_MUTEX_TAKE.
- */
- #define MQTT_SEND_MUTEX_GIVE( pContext )
-#endif /* !MQTT_SEND_MUTEX_GIVE */
-
-#ifndef MQTT_STATE_UPDATE_MUTEX_TAKE
-
-/**
- * @brief Macro which should point to a function which can acquire a
- * mutex with infinite timeout when multiple senders are using the
- * coreMQTT library. The mutex will serialize the access to the state
- * data structure which holds the state of incoming and outgoing
- * publishes.
- */
- #define MQTT_STATE_UPDATE_MUTEX_TAKE( pContext )
-#endif /* !MQTT_STATE_UPDATE_MUTEX_TAKE */
-
-#ifndef MQTT_STATE_UPDATE_MUTEX_GIVE
-
-/**
- * @brief Macro which should point to a function which can release the
- * mutex acquired with #MQTT_STATE_UPDATE_MUTEX_TAKE.
- */
- #define MQTT_STATE_UPDATE_MUTEX_GIVE( pContext )
-#endif /* !MQTT_STATE_UPDATE_MUTEX_GIVE */
->>>>>>> 44c99c128d0e870df451734dca1ed99e43d428cf
/*-----------------------------------------------------------*/
@@ -1441,23 +1398,15 @@ static MQTTStatus_t handleIncomingPublish( MQTTContext_t * pContext,
if( status == MQTTSuccess )
{
-<<<<<<< HEAD
MQTT_PRE_STATE_UPDATE_HOOK( pContext );
-=======
- MQTT_STATE_UPDATE_MUTEX_TAKE( pContext );
->>>>>>> 44c99c128d0e870df451734dca1ed99e43d428cf
status = MQTT_UpdateStatePublish( pContext,
packetIdentifier,
MQTT_RECEIVE,
publishInfo.qos,
&publishRecordState );
-<<<<<<< HEAD
MQTT_POST_STATE_UPDATE_HOOK( pContext );
-=======
- MQTT_STATE_UPDATE_MUTEX_GIVE( pContext );
->>>>>>> 44c99c128d0e870df451734dca1ed99e43d428cf
if( status == MQTTSuccess )
{
@@ -2655,13 +2604,10 @@ MQTTStatus_t MQTT_Publish( MQTTContext_t * pContext,
if( ( status == MQTTSuccess ) && ( pPublishInfo->qos > MQTTQoS0 ) )
{
/* Take the mutex required to update the state. */
-<<<<<<< HEAD
MQTT_PRE_STATE_UPDATE_HOOK( pContext );
-=======
- MQTT_STATE_UPDATE_MUTEX_TAKE( pContext );
+
/* Set the flag so that the mutex can be released later. */
mutexTaken = true;
->>>>>>> 44c99c128d0e870df451734dca1ed99e43d428cf
status = MQTT_ReserveState( pContext,
packetId,
From 2fc7b973117a594f7cd81b73c134febe7f0d53a9 Mon Sep 17 00:00:00 2001
From: Aniruddha Kanhere <60444055+AniruddhaKanhere@users.noreply.github.com>
Date: Mon, 5 Sep 2022 17:40:54 -0700
Subject: [PATCH 09/13] Update the macros and variables
---
source/core_mqtt.c | 9 ++++-----
1 file changed, 4 insertions(+), 5 deletions(-)
diff --git a/source/core_mqtt.c b/source/core_mqtt.c
index 63399af29..cf7030bd1 100644
--- a/source/core_mqtt.c
+++ b/source/core_mqtt.c
@@ -2576,7 +2576,7 @@ MQTTStatus_t MQTT_Publish( MQTTContext_t * pContext,
{
size_t headerSize = 0UL, remainingLength = 0UL, packetSize = 0UL;
MQTTPublishState_t publishStatus = MQTTStateNull;
- bool mutexTaken = false;
+ bool stateUpdateHookExecuted = false;
/* 1 header byte + 4 bytes (maximum) required for encoding the length +
* 2 bytes for topic string. */
@@ -2603,11 +2603,10 @@ MQTTStatus_t MQTT_Publish( MQTTContext_t * pContext,
if( ( status == MQTTSuccess ) && ( pPublishInfo->qos > MQTTQoS0 ) )
{
- /* Take the mutex required to update the state. */
MQTT_PRE_STATE_UPDATE_HOOK( pContext );
- /* Set the flag so that the mutex can be released later. */
- mutexTaken = true;
+ /* Set the flag so that the corresponding hook can be called later. */
+ stateUpdateHookExecuted = true;
status = MQTT_ReserveState( pContext,
packetId,
@@ -2659,7 +2658,7 @@ MQTTStatus_t MQTT_Publish( MQTTContext_t * pContext,
}
}
- if( mutexTaken == true )
+ if( stateUpdateHookExecuted == true )
{
/* Regardless of the status, if the mutex was taken due to the
* packet being of QoS > QoS0, then it should be relinquished. */
From 6813b15cbb5a409b829c4494161462a27462b239 Mon Sep 17 00:00:00 2001
From: Aniruddha Kanhere <60444055+AniruddhaKanhere@users.noreply.github.com>
Date: Tue, 6 Sep 2022 06:53:02 +0000
Subject: [PATCH 10/13] Reword the briefs of hooks and uncrustify
---
source/core_mqtt.c | 38 +++++++++++++++++---------------------
1 file changed, 17 insertions(+), 21 deletions(-)
diff --git a/source/core_mqtt.c b/source/core_mqtt.c
index cf7030bd1..79e90933d 100644
--- a/source/core_mqtt.c
+++ b/source/core_mqtt.c
@@ -32,39 +32,35 @@
#include "core_mqtt_default_logging.h"
#ifndef MQTT_PRE_SEND_HOOK
- /**
- * @brief Macro which should point to a function which can acquire a
- * mutex with infinite timeout when multiple senders are using the
- * coreMQTT library. The mutex will serialize the access to send calls
- * which should be made in order to keep the MQTT connection intact.
- */
+
+/**
+ * @brief Hook called before a 'send' operation is executed.
+ */
#define MQTT_PRE_SEND_HOOK( pContext )
#endif /* !MQTT_PRE_SEND_HOOK */
#ifndef MQTT_POST_SEND_HOOK
- /**
- * @brief Macro which should point to a function which can release the
- * mutex acquired with #MQTT_PRE_SEND_HOOK.
- */
+
+/**
+ * @brief Hook called after the 'send' operation is complete.
+ */
#define MQTT_POST_SEND_HOOK( pContext )
#endif /* !MQTT_POST_SEND_HOOK */
#ifndef MQTT_PRE_STATE_UPDATE_HOOK
- /**
- * @brief Macro which should point to a function which can acquire a
- * mutex with infinite timeout when multiple senders are using the
- * coreMQTT library. The mutex will serialize the access to the state
- * data structure which holds the state of incoming and outgoing
- publishes.
- */
+
+/**
+ * @brief Hook called just before an update to the MQTT state is made.
+ */
#define MQTT_PRE_STATE_UPDATE_HOOK( pContext )
#endif /* !MQTT_PRE_STATE_UPDATE_HOOK */
#ifndef MQTT_POST_STATE_UPDATE_HOOK
- /**
- * @brief Macro which should point to a function which can release the
- * mutex acquired with #MQTT_PRE_STATE_UPDATE_HOOK.
- */
+
+/**
+ * @brief Hook called just after an update to the MQTT state has
+ * been made.
+ */
#define MQTT_POST_STATE_UPDATE_HOOK( pContext )
#endif /* !MQTT_POST_STATE_UPDATE_HOOK */
From ea62cdfe7b6f266897a01cdd0a521e4c58f955a3 Mon Sep 17 00:00:00 2001
From: Aniruddha Kanhere <60444055+AniruddhaKanhere@users.noreply.github.com>
Date: Mon, 12 Sep 2022 23:44:09 +0000
Subject: [PATCH 11/13] Fir formatting
---
source/core_mqtt.c | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/source/core_mqtt.c b/source/core_mqtt.c
index 384d4a2ea..879487756 100644
--- a/source/core_mqtt.c
+++ b/source/core_mqtt.c
@@ -2537,7 +2537,7 @@ MQTTStatus_t MQTT_CancelCallback( MQTTContext_t * pContext,
else
{
MQTT_PRE_STATE_UPDATE_HOOK( pContext );
-
+
status = MQTT_RemoveStateRecord( pContext,
packetId );
From 0e7df2e4dd293d717de1178259a474a14793c8b1 Mon Sep 17 00:00:00 2001
From: Aniruddha Kanhere <60444055+AniruddhaKanhere@users.noreply.github.com>
Date: Mon, 12 Sep 2022 17:31:01 -0700
Subject: [PATCH 12/13] Protect get packet ID
---
source/core_mqtt.c | 2 ++
1 file changed, 2 insertions(+)
diff --git a/source/core_mqtt.c b/source/core_mqtt.c
index 879487756..f1d0f1826 100644
--- a/source/core_mqtt.c
+++ b/source/core_mqtt.c
@@ -3032,6 +3032,7 @@ uint16_t MQTT_GetPacketId( MQTTContext_t * pContext )
if( pContext != NULL )
{
+ MQTT_PRE_STATE_UPDATE_HOOK( pContext );
packetId = pContext->nextPacketId;
/* A packet ID of zero is not a valid packet ID. When the max ID
@@ -3044,6 +3045,7 @@ uint16_t MQTT_GetPacketId( MQTTContext_t * pContext )
{
pContext->nextPacketId++;
}
+ MQTT_POST_STATE_UPDATE_HOOK( pContext );
}
return packetId;
From 92bf1a29ca8d5bf55af19ad082b5369b5bb6dc0e Mon Sep 17 00:00:00 2001
From: Aniruddha Kanhere <60444055+AniruddhaKanhere@users.noreply.github.com>
Date: Tue, 13 Sep 2022 14:23:21 -0700
Subject: [PATCH 13/13] Fix formatting
---
source/core_mqtt.c | 2 ++
1 file changed, 2 insertions(+)
diff --git a/source/core_mqtt.c b/source/core_mqtt.c
index f1d0f1826..43f47ccc4 100644
--- a/source/core_mqtt.c
+++ b/source/core_mqtt.c
@@ -3033,6 +3033,7 @@ uint16_t MQTT_GetPacketId( MQTTContext_t * pContext )
if( pContext != NULL )
{
MQTT_PRE_STATE_UPDATE_HOOK( pContext );
+
packetId = pContext->nextPacketId;
/* A packet ID of zero is not a valid packet ID. When the max ID
@@ -3045,6 +3046,7 @@ uint16_t MQTT_GetPacketId( MQTTContext_t * pContext )
{
pContext->nextPacketId++;
}
+
MQTT_POST_STATE_UPDATE_HOOK( pContext );
}