Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add hooks to the source code #200

Merged
merged 16 commits into from
Sep 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lexicon.txt
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ mqttsubacksuccessqos
mqttsubscribeinfo
mqttsuccess
msb
mutex
mynetworkrecvimplementation
mynetworksendimplementation
mytcpsocketcontext
Expand Down
167 changes: 108 additions & 59 deletions source/core_mqtt.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,39 @@

#include "core_mqtt_default_logging.h"

#ifndef MQTT_PRE_SEND_HOOK

/**
* @brief Hook called before a 'send' operation is executed.
*/
#define MQTT_PRE_SEND_HOOK( pContext )
#endif /* !MQTT_PRE_SEND_HOOK */

#ifndef MQTT_POST_SEND_HOOK

/**
* @brief Hook called after the 'send' operation is complete.
*/
#define MQTT_POST_SEND_HOOK( pContext )
#endif /* !MQTT_POST_SEND_HOOK */

#ifndef MQTT_PRE_STATE_UPDATE_HOOK

/**
* @brief Hook called just before an update to the MQTT state is made.
*/
#define MQTT_PRE_STATE_UPDATE_HOOK( pContext )
#endif /* !MQTT_PRE_STATE_UPDATE_HOOK */

#ifndef MQTT_POST_STATE_UPDATE_HOOK

/**
* @brief Hook called just after an update to the MQTT state has
* been made.
*/
#define MQTT_POST_STATE_UPDATE_HOOK( pContext )
#endif /* !MQTT_POST_STATE_UPDATE_HOOK */

/*-----------------------------------------------------------*/

/**
Expand Down Expand Up @@ -431,23 +464,6 @@ static MQTTStatus_t sendPublishWithoutCopy( MQTTContext_t * pContext,
size_t headerSize,
uint16_t packetId );

/**
* @brief Serializes a PUBLISH message.
*
* @brief param[in] pContext Initialized MQTT context.
* @brief param[in] pPublishInfo MQTT PUBLISH packet parameters.
* @brief param[in] packetId Packet Id of the publish packet.
* @brief param[out] pHeaderSize Size of the serialized PUBLISH header.
*
* @return #MQTTNoMemory if pBuffer is too small to hold the MQTT packet;
* #MQTTBadParameter if invalid parameters are passed;
* #MQTTSuccess otherwise.
*/
static MQTTStatus_t serializePublish( const MQTTContext_t * pContext,
const MQTTPublishInfo_t * pPublishInfo,
uint16_t packetId,
size_t * const pHeaderSize );

/**
* @brief Function to validate #MQTT_Publish parameters.
*
Expand Down Expand Up @@ -1269,22 +1285,31 @@ static MQTTStatus_t sendPublishAcks( MQTTContext_t * pContext,

if( status == MQTTSuccess )
{
MQTT_PRE_SEND_HOOK( pContext );

/* Here, we are not using the vector approach for efficiency. There is just one buffer
* to be sent which can be achieved with a normal send call. */
bytesSent = sendBuffer( pContext,
localBuffer.pBuffer,
MQTT_PUBLISH_ACK_PACKET_SIZE );

MQTT_POST_SEND_HOOK( pContext );
}

if( bytesSent == ( int32_t ) MQTT_PUBLISH_ACK_PACKET_SIZE )
{
pContext->controlPacketSent = true;

Copy link
Contributor

@paulbartell paulbartell Sep 13, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it be necessary to keep hold of the mutex between the sendPacket and MQTT_UpdateStateAck calls?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think not.
They are decoupled altogether. Also, the state will be protected by its mutex.

MQTT_PRE_STATE_UPDATE_HOOK( pContext );

status = MQTT_UpdateStateAck( pContext,
packetId,
packetType,
MQTT_SEND,
&newState );

MQTT_POST_STATE_UPDATE_HOOK( pContext );

if( status != MQTTSuccess )
{
LogError( ( "Failed to update state of publish %hu.",
Expand Down Expand Up @@ -1383,12 +1408,16 @@ static MQTTStatus_t handleIncomingPublish( MQTTContext_t * pContext,

if( status == MQTTSuccess )
{
MQTT_PRE_STATE_UPDATE_HOOK( pContext );

status = MQTT_UpdateStatePublish( pContext,
packetIdentifier,
MQTT_RECEIVE,
publishInfo.qos,
&publishRecordState );

MQTT_POST_STATE_UPDATE_HOOK( pContext );

if( status == MQTTSuccess )
{
LogInfo( ( "State record updated. New state=%s.",
Expand Down Expand Up @@ -1500,12 +1529,16 @@ static MQTTStatus_t handlePublishAcks( MQTTContext_t * pContext,

if( status == MQTTSuccess )
{
MQTT_PRE_STATE_UPDATE_HOOK( pContext );

status = MQTT_UpdateStateAck( pContext,
packetIdentifier,
ackType,
MQTT_RECEIVE,
&publishRecordState );

MQTT_POST_STATE_UPDATE_HOOK( pContext );

if( status == MQTTSuccess )
{
LogInfo( ( "State record updated. New state=%s.",
Expand Down Expand Up @@ -2104,8 +2137,7 @@ static MQTTStatus_t sendConnectWithoutCopy( MQTTContext_t * pContext,
/* Validate arguments. */
if( pConnectInfo == NULL )
{
LogError( ( "Argument cannot be NULL: pConnectInfo=%p, "
"pFixedBuffer=%p.",
LogError( ( "Argument cannot be NULL: pConnectInfo=%p.",
( void * ) pConnectInfo ) );
status = MQTTBadParameter;
}
Expand Down Expand Up @@ -2329,44 +2361,6 @@ static MQTTStatus_t handleSessionResumption( MQTTContext_t * pContext,
return status;
}

/*-----------------------------------------------------------*/

static MQTTStatus_t serializePublish( const MQTTContext_t * pContext,
const MQTTPublishInfo_t * pPublishInfo,
uint16_t packetId,
size_t * const pHeaderSize )
{
MQTTStatus_t status = MQTTSuccess;
size_t remainingLength = 0UL, packetSize = 0UL;

assert( pContext != NULL );
assert( pPublishInfo != NULL );
assert( pHeaderSize != NULL );

/* Get the remaining length and packet size.*/
status = MQTT_GetPublishPacketSize( pPublishInfo,
&remainingLength,
&packetSize );
LogDebug( ( "PUBLISH packet size is %lu and remaining length is %lu.",
( unsigned long ) packetSize,
( unsigned long ) remainingLength ) );

if( status == MQTTSuccess )
{
status = MQTT_SerializePublishHeader( pPublishInfo,
packetId,
remainingLength,
&( pContext->networkBuffer ),
pHeaderSize );
LogDebug( ( "Serialized PUBLISH header size is %lu.",
( unsigned long ) *pHeaderSize ) );
}

return status;
}

/*-----------------------------------------------------------*/

static MQTTStatus_t validatePublishParams( const MQTTContext_t * pContext,
const MQTTPublishInfo_t * pPublishInfo,
uint16_t packetId )
Expand Down Expand Up @@ -2542,8 +2536,12 @@ MQTTStatus_t MQTT_CancelCallback( MQTTContext_t * pContext,
}
else
{
MQTT_PRE_STATE_UPDATE_HOOK( pContext );

status = MQTT_RemoveStateRecord( pContext,
packetId );

MQTT_POST_STATE_UPDATE_HOOK( pContext );
}

return status;
Expand Down Expand Up @@ -2587,10 +2585,14 @@ MQTTStatus_t MQTT_Connect( MQTTContext_t * pContext,

if( status == MQTTSuccess )
{
MQTT_PRE_SEND_HOOK( pContext );

status = sendConnectWithoutCopy( pContext,
pConnectInfo,
pWillInfo,
remainingLength );

MQTT_POST_SEND_HOOK( pContext );
}

/* Read CONNACK from transport layer. */
Expand Down Expand Up @@ -2656,12 +2658,16 @@ MQTTStatus_t MQTT_Subscribe( MQTTContext_t * pContext,

if( status == MQTTSuccess )
{
MQTT_PRE_SEND_HOOK( pContext );

/* Send MQTT SUBSCRIBE packet. */
status = sendSubscribeWithoutCopy( pContext,
pSubscriptionList,
subscriptionCount,
packetId,
remainingLength );

MQTT_POST_SEND_HOOK( pContext );
}

return status;
Expand All @@ -2675,6 +2681,7 @@ MQTTStatus_t MQTT_Publish( MQTTContext_t * pContext,
{
size_t headerSize = 0UL, remainingLength = 0UL, packetSize = 0UL;
MQTTPublishState_t publishStatus = MQTTStateNull;
bool stateUpdateHookExecuted = false;

/* 1 header byte + 4 bytes (maximum) required for encoding the length +
* 2 bytes for topic string. */
Expand All @@ -2701,7 +2708,11 @@ MQTTStatus_t MQTT_Publish( MQTTContext_t * pContext,

if( ( status == MQTTSuccess ) && ( pPublishInfo->qos > MQTTQoS0 ) )
{
/* Reserve state for publish message. Only to be done for QoS1 or QoS2. */
MQTT_PRE_STATE_UPDATE_HOOK( pContext );

/* Set the flag so that the corresponding hook can be called later. */
stateUpdateHookExecuted = true;

status = MQTT_ReserveState( pContext,
packetId,
pPublishInfo->qos );
Expand All @@ -2717,14 +2728,22 @@ MQTTStatus_t MQTT_Publish( MQTTContext_t * pContext,

if( status == MQTTSuccess )
{
/* Take the mutex as multiple send calls are required for sending this
* packet. */
MQTT_PRE_SEND_HOOK( pContext );

status = sendPublishWithoutCopy( pContext,
pPublishInfo,
mqttHeader,
headerSize,
packetId );

/* Give the mutex away for the next taker. */
MQTT_POST_SEND_HOOK( pContext );
}

if( ( status == MQTTSuccess ) && ( pPublishInfo->qos > MQTTQoS0 ) )
if( ( status == MQTTSuccess ) &&
( pPublishInfo->qos > MQTTQoS0 ) )
{
/* Update state machine after PUBLISH is sent.
* Only to be done for QoS1 or QoS2. */
Expand All @@ -2744,6 +2763,13 @@ MQTTStatus_t MQTT_Publish( MQTTContext_t * pContext,
}
}

if( stateUpdateHookExecuted == true )
{
/* Regardless of the status, if the mutex was taken due to the
* packet being of QoS > QoS0, then it should be relinquished. */
MQTT_POST_STATE_UPDATE_HOOK( pContext );
}

if( status != MQTTSuccess )
{
LogError( ( "MQTT PUBLISH failed with status %s.",
Expand Down Expand Up @@ -2797,6 +2823,10 @@ MQTTStatus_t MQTT_Ping( MQTTContext_t * pContext )

if( status == MQTTSuccess )
{
/* Take the mutex as the send call should not be interrupted in
* between. */
MQTT_PRE_SEND_HOOK( pContext );

/* Send the serialized PINGREQ packet to transport layer.
* Here, we do not use the vectored IO approach for efficiency as the
* Ping packet does not have numerous fields which need to be copied
Expand All @@ -2805,6 +2835,9 @@ MQTTStatus_t MQTT_Ping( MQTTContext_t * pContext )
localBuffer.pBuffer,
2U );

/* Give the mutex away. */
MQTT_POST_SEND_HOOK( pContext );

/* It is an error to not send the entire PINGREQ packet. */
if( bytesSent < ( int32_t ) packetSize )
{
Expand Down Expand Up @@ -2852,11 +2885,17 @@ MQTTStatus_t MQTT_Unsubscribe( MQTTContext_t * pContext,

if( status == MQTTSuccess )
{
/* Take the mutex because the below call should not be interrupted. */
MQTT_PRE_SEND_HOOK( pContext );

status = sendUnsubscribeWithoutCopy( pContext,
pSubscriptionList,
subscriptionCount,
packetId,
remainingLength );

/* Give the mutex away. */
MQTT_POST_SEND_HOOK( pContext );
}

return status;
Expand Down Expand Up @@ -2898,13 +2937,19 @@ MQTTStatus_t MQTT_Disconnect( MQTTContext_t * pContext )

if( status == MQTTSuccess )
{
/* Take the mutex because the below call should not be interrupted. */
MQTT_PRE_SEND_HOOK( pContext );

/* Here we do not use vectors as the disconnect packet has fixed fields
* which do not reside in user provided buffers. Thus, it can be sent
* using a simple send call. */
bytesSent = sendBuffer( pContext,
localBuffer.pBuffer,
packetSize );

/* Give the mutex away. */
MQTT_POST_SEND_HOOK( pContext );

if( bytesSent < ( int32_t ) packetSize )
{
LogError( ( "Transport send failed for DISCONNECT packet." ) );
Expand Down Expand Up @@ -2987,6 +3032,8 @@ uint16_t MQTT_GetPacketId( MQTTContext_t * pContext )

if( pContext != NULL )
{
MQTT_PRE_STATE_UPDATE_HOOK( pContext );

packetId = pContext->nextPacketId;

/* A packet ID of zero is not a valid packet ID. When the max ID
Expand All @@ -2999,6 +3046,8 @@ uint16_t MQTT_GetPacketId( MQTTContext_t * pContext )
{
pContext->nextPacketId++;
}

MQTT_POST_STATE_UPDATE_HOOK( pContext );
}

return packetId;
Expand Down