Skip to content

Commit

Permalink
Implement MQTT message queue
Browse files Browse the repository at this point in the history
  • Loading branch information
matjack1 committed Mar 6, 2022
1 parent f22e28e commit 5d74047
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 26 deletions.
2 changes: 1 addition & 1 deletion src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,6 @@ void ICACHE_RAM_ATTR loop()
#endif
}
}
processMqttMessage();
processMqttQueue();
}
}
88 changes: 63 additions & 25 deletions src/mqtt.esp
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
char mqttBuffer[512];
StaticJsonDocument<512> incomingMqttMessage;
bool mqttMessageToProcess = false;

struct MqttMessage {
char command[20];
char uid[20];
char user[64];
char serializedMessage[512];
MqttMessage *nextMessage = NULL;
};
MqttMessage *messageQueue = NULL;

void connectToMqtt()
{
Expand Down Expand Up @@ -380,7 +387,7 @@ void onMqttMessage(char *topic, char *payload, AsyncMqttClientMessageProperties
i++;
}
if(index + len == total) {
mqttBuffer[i + 1] = '\0';
mqttBuffer[i] = '\0';
} else {
return;
}
Expand All @@ -389,7 +396,8 @@ void onMqttMessage(char *topic, char *payload, AsyncMqttClientMessageProperties
Serial.println(mqttBuffer);
#endif

auto error = deserializeJson(incomingMqttMessage, mqttBuffer);
StaticJsonDocument<512> mqttIncomingJson;
auto error = deserializeJson(mqttIncomingJson, mqttBuffer);
if (error)
{
#ifdef DEBUG
Expand All @@ -403,9 +411,9 @@ void onMqttMessage(char *topic, char *payload, AsyncMqttClientMessageProperties
{
// Check if IP was send with command because we only
// accept commands for this where sent IP is equal to device IP
if (incomingMqttMessage.containsKey("doorip"))
if (mqttIncomingJson.containsKey("doorip"))
{
const char *ipadr = incomingMqttMessage["doorip"];
const char *ipadr = mqttIncomingJson["doorip"];
String espIp = WiFi.localIP().toString();
if (!((strcmp(ipadr, espIp.c_str()) == 0) && (ipadr != NULL)))
{
Expand All @@ -424,24 +432,50 @@ void onMqttMessage(char *topic, char *payload, AsyncMqttClientMessageProperties
}
}

mqttMessageToProcess = true;
return;
}

void processMqttMessage() {
if(!mqttMessageToProcess)
if(ESP.getFreeHeap() < 2000)
{
#ifdef DEBUG
Serial.println("Dropping MQTT message, out of memory");
#endif
writeEvent("ERRO", "mqtt", "Dropping MQTT message, out of memory","");
return;
}
mqttMessageToProcess = false;
const char *command = incomingMqttMessage["cmd"];

MqttMessage* incomingMessage = new MqttMessage;

strlcpy(incomingMessage->command, mqttIncomingJson["cmd"], sizeof(incomingMessage->command));
if (mqttIncomingJson.containsKey("uid")) {
strlcpy(incomingMessage->uid, mqttIncomingJson["uid"], sizeof(incomingMessage->uid));
}
if (mqttIncomingJson.containsKey("user")) {
strlcpy(incomingMessage->user, mqttIncomingJson["user"], sizeof(incomingMessage->user));
}
serializeJson(mqttIncomingJson, incomingMessage->serializedMessage);

MqttMessage* lastMessage = messageQueue;
if(lastMessage == NULL)
{
messageQueue = incomingMessage;
}
else {
while(lastMessage->nextMessage != NULL)
{
lastMessage = lastMessage->nextMessage;
}
lastMessage->nextMessage = incomingMessage;
}

return;
}

void processMqttMessage(MqttMessage *incomingMessage) {
char *command = incomingMessage->command;
if (strcmp(command, "getuser") == 0)
{
#ifdef DEBUG
Serial.println("[ INFO ] Get User List");
#endif
getUserList(0);
return;
}

else if (strcmp(command, "listusr") == 0)
Expand All @@ -450,7 +484,6 @@ void processMqttMessage() {
Serial.println("[ INFO ] List users");
#endif
getUserList(1);
return;
}

else if (strcmp(command, "opendoor") == 0)
Expand All @@ -462,7 +495,6 @@ void processMqttMessage() {
mqtt_publish_access(now(), "true", "Always", "MQTT", " ");
activateRelay[0] = true;
previousMillis = millis();
return;
}

else if (strcmp(command, "deletusers") == 0)
Expand All @@ -471,40 +503,38 @@ void processMqttMessage() {
Serial.println("[ INFO ] Delete all users");
#endif
DeleteAllUserFiles();
return;
}

else if (strcmp(command, "deletuid") == 0)
{
#ifdef DEBUG
Serial.println("[ INFO ] Delete a single user by uid");
#endif
const char *uid = incomingMqttMessage["uid"];
const char *uid = incomingMessage->uid;
DeleteUserID(uid);
return;
}

else if (strcmp(command, "adduser") == 0)
{

#ifdef DEBUG
Serial.print("[ INFO ] Add Users: ");
const char *name = incomingMqttMessage["user"];
const char *name = incomingMessage->user;
Serial.println(name);
#endif

const char *uid = incomingMqttMessage["uid"];
const char *uid = incomingMessage->uid;
String filename = "/P/";
filename += uid;
File f = SPIFFS.open(filename, "w+");
// Check if we created the file
if (f)
{
serializeJson(incomingMqttMessage, f);
f.println(incomingMessage->serializedMessage);
}
f.close();
return;
}
free(incomingMessage);
return;
}

Expand Down Expand Up @@ -541,4 +571,12 @@ void onMqttConnect(bool sessionPresent)
Serial.print("[ INFO ] Subscribing at QoS 2, packetId: ");
Serial.println(packetIdSub);
#endif
}
}

void processMqttQueue() {
while(messageQueue != NULL) {
MqttMessage *messageToProcess = messageQueue;
messageQueue = messageToProcess->nextMessage;
processMqttMessage(messageToProcess);
}
}

0 comments on commit 5d74047

Please sign in to comment.