Skip to content

Commit

Permalink
Add hooks to the source code (#200)
Browse files Browse the repository at this point in the history
* Add mutex hooks

* Clean up of code

* Add doxygen comments and fix spell check

* Fix LogError call

* Fix formatting and memory table

* Fix dereference failure

* Update the hook names

* Fix broken builds

* Update the macros and variables

* Reword the briefs of hooks and uncrustify

* Fir formatting

* Protect get packet ID

* Fix formatting
  • Loading branch information
AniruddhaKanhere authored Sep 14, 2022
1 parent 31defb2 commit fed0ad9
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 59 deletions.
1 change: 1 addition & 0 deletions lexicon.txt
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ mqttsubacksuccessqos
mqttsubscribeinfo
mqttsuccess
msb
mutex
mynetworkrecvimplementation
mynetworksendimplementation
mytcpsocketcontext
Expand Down
167 changes: 108 additions & 59 deletions source/core_mqtt.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,39 @@

#include "core_mqtt_default_logging.h"

#ifndef MQTT_PRE_SEND_HOOK

/**
* @brief Hook called before a 'send' operation is executed.
*/
#define MQTT_PRE_SEND_HOOK( pContext )
#endif /* !MQTT_PRE_SEND_HOOK */

#ifndef MQTT_POST_SEND_HOOK

/**
* @brief Hook called after the 'send' operation is complete.
*/
#define MQTT_POST_SEND_HOOK( pContext )
#endif /* !MQTT_POST_SEND_HOOK */

#ifndef MQTT_PRE_STATE_UPDATE_HOOK

/**
* @brief Hook called just before an update to the MQTT state is made.
*/
#define MQTT_PRE_STATE_UPDATE_HOOK( pContext )
#endif /* !MQTT_PRE_STATE_UPDATE_HOOK */

#ifndef MQTT_POST_STATE_UPDATE_HOOK

/**
* @brief Hook called just after an update to the MQTT state has
* been made.
*/
#define MQTT_POST_STATE_UPDATE_HOOK( pContext )
#endif /* !MQTT_POST_STATE_UPDATE_HOOK */

/*-----------------------------------------------------------*/

/**
Expand Down Expand Up @@ -431,23 +464,6 @@ static MQTTStatus_t sendPublishWithoutCopy( MQTTContext_t * pContext,
size_t headerSize,
uint16_t packetId );

/**
* @brief Serializes a PUBLISH message.
*
* @brief param[in] pContext Initialized MQTT context.
* @brief param[in] pPublishInfo MQTT PUBLISH packet parameters.
* @brief param[in] packetId Packet Id of the publish packet.
* @brief param[out] pHeaderSize Size of the serialized PUBLISH header.
*
* @return #MQTTNoMemory if pBuffer is too small to hold the MQTT packet;
* #MQTTBadParameter if invalid parameters are passed;
* #MQTTSuccess otherwise.
*/
static MQTTStatus_t serializePublish( const MQTTContext_t * pContext,
const MQTTPublishInfo_t * pPublishInfo,
uint16_t packetId,
size_t * const pHeaderSize );

/**
* @brief Function to validate #MQTT_Publish parameters.
*
Expand Down Expand Up @@ -1269,22 +1285,31 @@ static MQTTStatus_t sendPublishAcks( MQTTContext_t * pContext,

if( status == MQTTSuccess )
{
MQTT_PRE_SEND_HOOK( pContext );

/* Here, we are not using the vector approach for efficiency. There is just one buffer
* to be sent which can be achieved with a normal send call. */
bytesSent = sendBuffer( pContext,
localBuffer.pBuffer,
MQTT_PUBLISH_ACK_PACKET_SIZE );

MQTT_POST_SEND_HOOK( pContext );
}

if( bytesSent == ( int32_t ) MQTT_PUBLISH_ACK_PACKET_SIZE )
{
pContext->controlPacketSent = true;

MQTT_PRE_STATE_UPDATE_HOOK( pContext );

status = MQTT_UpdateStateAck( pContext,
packetId,
packetType,
MQTT_SEND,
&newState );

MQTT_POST_STATE_UPDATE_HOOK( pContext );

if( status != MQTTSuccess )
{
LogError( ( "Failed to update state of publish %hu.",
Expand Down Expand Up @@ -1383,12 +1408,16 @@ static MQTTStatus_t handleIncomingPublish( MQTTContext_t * pContext,

if( status == MQTTSuccess )
{
MQTT_PRE_STATE_UPDATE_HOOK( pContext );

status = MQTT_UpdateStatePublish( pContext,
packetIdentifier,
MQTT_RECEIVE,
publishInfo.qos,
&publishRecordState );

MQTT_POST_STATE_UPDATE_HOOK( pContext );

if( status == MQTTSuccess )
{
LogInfo( ( "State record updated. New state=%s.",
Expand Down Expand Up @@ -1500,12 +1529,16 @@ static MQTTStatus_t handlePublishAcks( MQTTContext_t * pContext,

if( status == MQTTSuccess )
{
MQTT_PRE_STATE_UPDATE_HOOK( pContext );

status = MQTT_UpdateStateAck( pContext,
packetIdentifier,
ackType,
MQTT_RECEIVE,
&publishRecordState );

MQTT_POST_STATE_UPDATE_HOOK( pContext );

if( status == MQTTSuccess )
{
LogInfo( ( "State record updated. New state=%s.",
Expand Down Expand Up @@ -2104,8 +2137,7 @@ static MQTTStatus_t sendConnectWithoutCopy( MQTTContext_t * pContext,
/* Validate arguments. */
if( pConnectInfo == NULL )
{
LogError( ( "Argument cannot be NULL: pConnectInfo=%p, "
"pFixedBuffer=%p.",
LogError( ( "Argument cannot be NULL: pConnectInfo=%p.",
( void * ) pConnectInfo ) );
status = MQTTBadParameter;
}
Expand Down Expand Up @@ -2329,44 +2361,6 @@ static MQTTStatus_t handleSessionResumption( MQTTContext_t * pContext,
return status;
}

/*-----------------------------------------------------------*/

static MQTTStatus_t serializePublish( const MQTTContext_t * pContext,
const MQTTPublishInfo_t * pPublishInfo,
uint16_t packetId,
size_t * const pHeaderSize )
{
MQTTStatus_t status = MQTTSuccess;
size_t remainingLength = 0UL, packetSize = 0UL;

assert( pContext != NULL );
assert( pPublishInfo != NULL );
assert( pHeaderSize != NULL );

/* Get the remaining length and packet size.*/
status = MQTT_GetPublishPacketSize( pPublishInfo,
&remainingLength,
&packetSize );
LogDebug( ( "PUBLISH packet size is %lu and remaining length is %lu.",
( unsigned long ) packetSize,
( unsigned long ) remainingLength ) );

if( status == MQTTSuccess )
{
status = MQTT_SerializePublishHeader( pPublishInfo,
packetId,
remainingLength,
&( pContext->networkBuffer ),
pHeaderSize );
LogDebug( ( "Serialized PUBLISH header size is %lu.",
( unsigned long ) *pHeaderSize ) );
}

return status;
}

/*-----------------------------------------------------------*/

static MQTTStatus_t validatePublishParams( const MQTTContext_t * pContext,
const MQTTPublishInfo_t * pPublishInfo,
uint16_t packetId )
Expand Down Expand Up @@ -2542,8 +2536,12 @@ MQTTStatus_t MQTT_CancelCallback( MQTTContext_t * pContext,
}
else
{
MQTT_PRE_STATE_UPDATE_HOOK( pContext );

status = MQTT_RemoveStateRecord( pContext,
packetId );

MQTT_POST_STATE_UPDATE_HOOK( pContext );
}

return status;
Expand Down Expand Up @@ -2587,10 +2585,14 @@ MQTTStatus_t MQTT_Connect( MQTTContext_t * pContext,

if( status == MQTTSuccess )
{
MQTT_PRE_SEND_HOOK( pContext );

status = sendConnectWithoutCopy( pContext,
pConnectInfo,
pWillInfo,
remainingLength );

MQTT_POST_SEND_HOOK( pContext );
}

/* Read CONNACK from transport layer. */
Expand Down Expand Up @@ -2656,12 +2658,16 @@ MQTTStatus_t MQTT_Subscribe( MQTTContext_t * pContext,

if( status == MQTTSuccess )
{
MQTT_PRE_SEND_HOOK( pContext );

/* Send MQTT SUBSCRIBE packet. */
status = sendSubscribeWithoutCopy( pContext,
pSubscriptionList,
subscriptionCount,
packetId,
remainingLength );

MQTT_POST_SEND_HOOK( pContext );
}

return status;
Expand All @@ -2675,6 +2681,7 @@ MQTTStatus_t MQTT_Publish( MQTTContext_t * pContext,
{
size_t headerSize = 0UL, remainingLength = 0UL, packetSize = 0UL;
MQTTPublishState_t publishStatus = MQTTStateNull;
bool stateUpdateHookExecuted = false;

/* 1 header byte + 4 bytes (maximum) required for encoding the length +
* 2 bytes for topic string. */
Expand All @@ -2701,7 +2708,11 @@ MQTTStatus_t MQTT_Publish( MQTTContext_t * pContext,

if( ( status == MQTTSuccess ) && ( pPublishInfo->qos > MQTTQoS0 ) )
{
/* Reserve state for publish message. Only to be done for QoS1 or QoS2. */
MQTT_PRE_STATE_UPDATE_HOOK( pContext );

/* Set the flag so that the corresponding hook can be called later. */
stateUpdateHookExecuted = true;

status = MQTT_ReserveState( pContext,
packetId,
pPublishInfo->qos );
Expand All @@ -2717,14 +2728,22 @@ MQTTStatus_t MQTT_Publish( MQTTContext_t * pContext,

if( status == MQTTSuccess )
{
/* Take the mutex as multiple send calls are required for sending this
* packet. */
MQTT_PRE_SEND_HOOK( pContext );

status = sendPublishWithoutCopy( pContext,
pPublishInfo,
mqttHeader,
headerSize,
packetId );

/* Give the mutex away for the next taker. */
MQTT_POST_SEND_HOOK( pContext );
}

if( ( status == MQTTSuccess ) && ( pPublishInfo->qos > MQTTQoS0 ) )
if( ( status == MQTTSuccess ) &&
( pPublishInfo->qos > MQTTQoS0 ) )
{
/* Update state machine after PUBLISH is sent.
* Only to be done for QoS1 or QoS2. */
Expand All @@ -2744,6 +2763,13 @@ MQTTStatus_t MQTT_Publish( MQTTContext_t * pContext,
}
}

if( stateUpdateHookExecuted == true )
{
/* Regardless of the status, if the mutex was taken due to the
* packet being of QoS > QoS0, then it should be relinquished. */
MQTT_POST_STATE_UPDATE_HOOK( pContext );
}

if( status != MQTTSuccess )
{
LogError( ( "MQTT PUBLISH failed with status %s.",
Expand Down Expand Up @@ -2797,6 +2823,10 @@ MQTTStatus_t MQTT_Ping( MQTTContext_t * pContext )

if( status == MQTTSuccess )
{
/* Take the mutex as the send call should not be interrupted in
* between. */
MQTT_PRE_SEND_HOOK( pContext );

/* Send the serialized PINGREQ packet to transport layer.
* Here, we do not use the vectored IO approach for efficiency as the
* Ping packet does not have numerous fields which need to be copied
Expand All @@ -2805,6 +2835,9 @@ MQTTStatus_t MQTT_Ping( MQTTContext_t * pContext )
localBuffer.pBuffer,
2U );

/* Give the mutex away. */
MQTT_POST_SEND_HOOK( pContext );

/* It is an error to not send the entire PINGREQ packet. */
if( bytesSent < ( int32_t ) packetSize )
{
Expand Down Expand Up @@ -2852,11 +2885,17 @@ MQTTStatus_t MQTT_Unsubscribe( MQTTContext_t * pContext,

if( status == MQTTSuccess )
{
/* Take the mutex because the below call should not be interrupted. */
MQTT_PRE_SEND_HOOK( pContext );

status = sendUnsubscribeWithoutCopy( pContext,
pSubscriptionList,
subscriptionCount,
packetId,
remainingLength );

/* Give the mutex away. */
MQTT_POST_SEND_HOOK( pContext );
}

return status;
Expand Down Expand Up @@ -2898,13 +2937,19 @@ MQTTStatus_t MQTT_Disconnect( MQTTContext_t * pContext )

if( status == MQTTSuccess )
{
/* Take the mutex because the below call should not be interrupted. */
MQTT_PRE_SEND_HOOK( pContext );

/* Here we do not use vectors as the disconnect packet has fixed fields
* which do not reside in user provided buffers. Thus, it can be sent
* using a simple send call. */
bytesSent = sendBuffer( pContext,
localBuffer.pBuffer,
packetSize );

/* Give the mutex away. */
MQTT_POST_SEND_HOOK( pContext );

if( bytesSent < ( int32_t ) packetSize )
{
LogError( ( "Transport send failed for DISCONNECT packet." ) );
Expand Down Expand Up @@ -2987,6 +3032,8 @@ uint16_t MQTT_GetPacketId( MQTTContext_t * pContext )

if( pContext != NULL )
{
MQTT_PRE_STATE_UPDATE_HOOK( pContext );

packetId = pContext->nextPacketId;

/* A packet ID of zero is not a valid packet ID. When the max ID
Expand All @@ -2999,6 +3046,8 @@ uint16_t MQTT_GetPacketId( MQTTContext_t * pContext )
{
pContext->nextPacketId++;
}

MQTT_POST_STATE_UPDATE_HOOK( pContext );
}

return packetId;
Expand Down

0 comments on commit fed0ad9

Please sign in to comment.