Skip to content

Commit

Permalink
MqttClient revisions
Browse files Browse the repository at this point in the history
Move callbacks into PROGMEM
  • Loading branch information
mikee47 committed Nov 7, 2020
1 parent d89febc commit ff58432
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 44 deletions.
69 changes: 33 additions & 36 deletions Sming/Core/Network/MqttClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,23 @@
#include "Clock.h"

// Content length set to this value to indicate data refers to a stream, not a buffer
#define MQTT_PUBLISH_STREAM 0
static constexpr unsigned MQTT_PUBLISH_STREAM = 0;

mqtt_serialiser_t MqttClient::serialiser;
mqtt_parser_callbacks_t MqttClient::callbacks;
static constexpr uint8_t MQTT_CONNECT_PROTOCOL = 4; // version 3.1.1

const mqtt_parser_callbacks_t MqttClient::callbacks PROGMEM = {
.on_message_begin = staticOnMessageBegin,
.on_data_begin = staticOnDataBegin,
.on_data_payload = staticOnDataPayload,
.on_data_end = staticOnDataEnd,
.on_message_end = staticOnMessageEnd,
};

#define GET_CLIENT() \
auto client = static_cast<MqttClient*>(userData); \
if(client == nullptr) { \
return -1; \
}

static mqtt_message_t* createMessage(mqtt_type_t messageType)
{
Expand Down Expand Up @@ -57,23 +70,14 @@ static bool copyString(mqtt_buffer_t& destBuffer, const String& sourceString)

MqttClient::MqttClient(bool withDefaultPayloadParser, bool autoDestruct) : TcpClient(autoDestruct)
{
// TODO:...
// if(!bitSet(flags, MQTT_CLIENT_CALLBACKS)) {
callbacks.on_message_begin = staticOnMessageBegin;
callbacks.on_data_begin = staticOnDataBegin;
callbacks.on_data_payload = staticOnDataPayload;
callbacks.on_data_end = staticOnDataEnd;
callbacks.on_message_end = staticOnMessageEnd;
// }

mqtt_parser_init(&parser, &callbacks);
mqtt_parser_init(&parser, const_cast<mqtt_parser_callbacks_t*>(&callbacks));
mqtt_serialiser_init(&serialiser);
mqtt_message_init(&incomingMessage);
mqtt_message_init(&connectMessage);

parser.data = this;
connectMessage.common.type = MQTT_TYPE_CONNECT;
connectMessage.connect.protocol_version = 4; // version 3.1.1
connectMessage.connect.protocol_version = MQTT_CONNECT_PROTOCOL;

if(withDefaultPayloadParser) {
setPayloadParser(defaultPayloadParser);
Expand Down Expand Up @@ -116,10 +120,7 @@ int MqttClient::staticOnMessageBegin(void* userData, mqtt_message_t* message)

int MqttClient::staticOnDataBegin(void* userData, mqtt_message_t* message)
{
MqttClient* client = static_cast<MqttClient*>(userData);
if(client == nullptr) {
return -1;
}
GET_CLIENT();

if(client->payloadParser) {
client->payloadState.offset = 0;
Expand All @@ -131,10 +132,7 @@ int MqttClient::staticOnDataBegin(void* userData, mqtt_message_t* message)

int MqttClient::staticOnDataPayload(void* userData, mqtt_message_t* message, const char* data, size_t length)
{
MqttClient* client = static_cast<MqttClient*>(userData);
if(client == nullptr) {
return -1;
}
GET_CLIENT();

if(client->payloadParser) {
return client->payloadParser(client->payloadState, message, data, length);
Expand All @@ -145,10 +143,7 @@ int MqttClient::staticOnDataPayload(void* userData, mqtt_message_t* message, con

int MqttClient::staticOnDataEnd(void* userData, mqtt_message_t* message)
{
MqttClient* client = static_cast<MqttClient*>(userData);
if(client == nullptr) {
return -1;
}
GET_CLIENT();

if(client->payloadParser) {
return client->payloadParser(client->payloadState, message, nullptr, MQTT_PAYLOAD_PARSER_END);
Expand All @@ -159,27 +154,29 @@ int MqttClient::staticOnDataEnd(void* userData, mqtt_message_t* message)

int MqttClient::staticOnMessageEnd(void* userData, mqtt_message_t* message)
{
MqttClient* client = static_cast<MqttClient*>(userData);
if(client == nullptr) {
return -1;
}
GET_CLIENT();
return client->onMessageEnd(message);
}

int MqttClient::onMessageEnd(mqtt_message_t* message)
{
if(message->common.type == MQTT_TYPE_CONNACK) {
if(message->connack.return_code) {
// failure
clearBits(client->flags, MQTT_CLIENT_CONNECTED);
client->setTimeOut(1); // schedule the connection for closing
clearBits(flags, MQTT_CLIENT_CONNECTED);
setTimeOut(1); // schedule the connection for closing

return message->connack.return_code;
}

// success
client->setTimeOut(USHRT_MAX);
setBits(client->flags, MQTT_CLIENT_CONNECTED);
setTimeOut(USHRT_MAX);
setBits(flags, MQTT_CLIENT_CONNECTED);
}

if(client->eventHandler.contains(message->common.type)) {
return client->eventHandler[message->common.type](*client, message);
auto& handler = reinterpret_cast<const HandlerMap&>(eventHandlers)[message->common.type];
if(handler) {
return handler(*this, message);
}

return 0;
Expand Down
18 changes: 10 additions & 8 deletions Sming/Core/Network/MqttClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ class MqttClient : protected TcpClient

void setEventHandler(mqtt_type_t type, MqttDelegate handler)
{
eventHandler[type] = handler;
eventHandlers[type] = handler;
}

/**
Expand All @@ -123,7 +123,7 @@ class MqttClient : protected TcpClient
*/
void setConnectedHandler(MqttDelegate handler)
{
eventHandler[MQTT_TYPE_CONNACK] = handler;
eventHandlers[MQTT_TYPE_CONNACK] = handler;
}

/**
Expand All @@ -134,8 +134,8 @@ class MqttClient : protected TcpClient
*/
void setPublishedHandler(MqttDelegate handler)
{
eventHandler[MQTT_TYPE_PUBACK] = handler;
eventHandler[MQTT_TYPE_PUBREC] = handler;
eventHandlers[MQTT_TYPE_PUBACK] = handler;
eventHandlers[MQTT_TYPE_PUBREC] = handler;
}

/**
Expand All @@ -145,7 +145,7 @@ class MqttClient : protected TcpClient
*/
void setMessageHandler(MqttDelegate handler)
{
eventHandler[MQTT_TYPE_PUBLISH] = handler;
eventHandlers[MQTT_TYPE_PUBLISH] = handler;
}

/**
Expand Down Expand Up @@ -233,6 +233,7 @@ class MqttClient : protected TcpClient
static int staticOnDataPayload(void* user_data, mqtt_message_t* message, const char* data, size_t length);
static int staticOnDataEnd(void* user_data, mqtt_message_t* message);
static int staticOnMessageEnd(void* user_data, mqtt_message_t* message);
int onMessageEnd(mqtt_message_t* message);

#ifndef MQTT_NO_COMPAT
/** @deprecated This method is only for compatibility with the previous release and will be removed soon. */
Expand Down Expand Up @@ -286,7 +287,8 @@ class MqttClient : protected TcpClient
Url url;

// callbacks
HashMap<mqtt_type_t, MqttDelegate> eventHandler;
using HandlerMap = HashMap<mqtt_type_t, MqttDelegate>;
HandlerMap eventHandlers;
MqttPayloadParser payloadParser = nullptr;

// states
Expand All @@ -306,8 +308,8 @@ class MqttClient : protected TcpClient
mqtt_message_t incomingMessage;

// parsers and serializers
static mqtt_serialiser_t serialiser;
static mqtt_parser_callbacks_t callbacks;
mqtt_serialiser_t serialiser;
static const mqtt_parser_callbacks_t callbacks;
mqtt_parser_t parser;

// client flags
Expand Down

0 comments on commit ff58432

Please sign in to comment.