Skip to content

Commit

Permalink
Added onDelivery callback for messages with QoS 1 or 2.
Browse files Browse the repository at this point in the history
  • Loading branch information
Slavey Karadzhov committed Feb 24, 2016
1 parent 5b9a747 commit 3d909ae
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 3 deletions.
21 changes: 19 additions & 2 deletions Sming/SmingCore/Network/MqttClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,16 @@ bool MqttClient::publish(String topic, String message, bool retained /* = false*
return res > 0;
}

bool MqttClient::publishWithQoS(String topic, String message, int QoS, bool retained /* = false*/)
bool MqttClient::publishWithQoS(String topic, String message, int QoS, bool retained /* = false*/, MqttMessageDeliveredCallback onDelivery /* = NULL */)
{
int res = mqtt_publish_with_qos(&broker, topic.c_str(), message.c_str(), retained, QoS, NULL);
uint16_t msgId = 0;
int res = mqtt_publish_with_qos(&broker, topic.c_str(), message.c_str(), retained, QoS, &msgId);
if(QoS == 0 && onDelivery) {
debugf("The delivery callback is ignored for QoS 0.");
}
else if(QoS >0 && onDelivery && msgId) {
onDeliveryQueue[msgId] = onDelivery;
}
return res > 0;
}

Expand Down Expand Up @@ -255,6 +262,16 @@ err_t MqttClient::onReceive(pbuf *buf)
debugf("WRONG SIZES: %d: %d", lenTopic, lenMsg);
}
}
else if (type == MQTT_MSG_PUBACK || type == MQTT_MSG_PUBREC) {
// message with QoS 1 or 2 was received and this is the confirmation
const uint16_t msgId = mqtt_parse_msg_id(buffer);
debugf("message with id: %d was delivered", msgId);
if(onDeliveryQueue.contains(msgId)) {
// there is a callback for this message
onDeliveryQueue[msgId](msgId, type);
onDeliveryQueue.remove(msgId);
}
}
}
}
else
Expand Down
5 changes: 4 additions & 1 deletion Sming/SmingCore/Network/MqttClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@
#include "TcpClient.h"
#include "../Delegate.h"
#include "../../Wiring/WString.h"
#include "../../Wiring/WHashMap.h"
#include "../../Services/libemqtt/libemqtt.h"

//typedef void (*MqttStringSubscriptionCallback)(String topic, String message);
typedef Delegate<void(String topic, String message)> MqttStringSubscriptionCallback;
typedef Delegate<void(uint16_t msgId, int type)> MqttMessageDeliveredCallback;

class MqttClient;
class URL;
Expand All @@ -41,7 +43,7 @@ class MqttClient: protected TcpClient
__forceinline TcpClientState getConnectionState() { return TcpClient::getConnectionState(); }

bool publish(String topic, String message, bool retained = false);
bool publishWithQoS(String topic, String message, int QoS, bool retained = false);
bool publishWithQoS(String topic, String message, int QoS, bool retained = false, MqttMessageDeliveredCallback onDelivery = NULL);

bool subscribe(String topic);
bool unsubscribe(String topic);
Expand All @@ -64,6 +66,7 @@ class MqttClient: protected TcpClient
MqttStringSubscriptionCallback callback;
int keepAlive = 20;
unsigned long lastMessage;
HashMap<uint16_t, MqttMessageDeliveredCallback> onDeliveryQueue;
};

#endif /* _SMING_CORE_NETWORK_MqttClient_H_ */
6 changes: 6 additions & 0 deletions samples/MqttClient_Hello/app/application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ void checkMQTTDisconnect(TcpClient& client, bool flag){
procTimer.initializeMs(2 * 1000, startMqttClient).start(); // every 2 seconds
}

void onMessageDelivered(uint16_t msgId, int type) {
Serial.printf("Message with id %d and QoS %d was delivered successfully.", msgId, (type==MQTT_MSG_PUBREC? 2: 1));
}

// Publish our message
void publishMessage()
{
Expand All @@ -50,6 +54,8 @@ void publishMessage()

Serial.println("Let's publish message now!");
mqtt.publish("main/frameworks/sming", "Hello friends, from Internet of things :)"); // or publishWithQoS

mqtt.publishWithQoS("important/frameworks/sming", "Request Return Delivery", 1, false, onMessageDelivered);
}

// Callback for messages, arrived from MQTT server
Expand Down

0 comments on commit 3d909ae

Please sign in to comment.