Skip to content

Commit

Permalink
Send MQTT message when done processing one
Browse files Browse the repository at this point in the history
This allows waiting on the other end to avoid sending too many messages, and also sends ACK so that we can be sure that the message has been received properly.
  • Loading branch information
matjack1 committed Mar 6, 2022
1 parent 5d74047 commit 76076d3
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 26 deletions.
20 changes: 6 additions & 14 deletions src/log.esp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
void extern mqtt_publish_event(JsonDocument *root);
void extern mqttPublishEvent(JsonDocument *root);

void ICACHE_FLASH_ATTR writeEvent(String type, String src, String desc, String data)
{
Expand All @@ -12,7 +12,7 @@ void ICACHE_FLASH_ATTR writeEvent(String type, String src, String desc, String d
{
root["cmd"] = "event";
root["door"] = config.deviceHostname;
mqtt_publish_event(&root);
mqttPublishEvent(&root);
}
else // log to file
{
Expand All @@ -33,18 +33,10 @@ void ICACHE_FLASH_ATTR writeLatest(String uid, String username, int acctype)
root["username"] = username;
root["acctype"] = acctype;
root["timestamp"] = now();
if ((config.mqttEvents) && (config.mqttEnabled == 1))
{ // log to MQTT
//root["cmd"] = "access";
//mqtt_publish_event(&root);
}
else // log to file
{
File latestlog = SPIFFS.open("/latestlog.json", "a");
serializeJson(root, latestlog);
latestlog.print("\n");
latestlog.close();
}
File latestlog = SPIFFS.open("/latestlog.json", "a");
serializeJson(root, latestlog);
latestlog.print("\n");
latestlog.close();
}

size_t lastPos; // position counter for fast seek
Expand Down
34 changes: 22 additions & 12 deletions src/mqtt.esp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,22 @@ void onMqttDisconnect(AsyncMqttClientDisconnectReason reason)
}
}

void mqttPublishEvent(JsonDocument *root)
{
if (mqttClient.connected())
{
String stopic(mqttTopic);
stopic = stopic + "/send";
String mqttBuffer;
serializeJson(*root, mqttBuffer);
mqttClient.publish(stopic.c_str(), 0, false, mqttBuffer.c_str());
#ifdef DEBUG
Serial.print("[ INFO ] Mqtt Publish:");
Serial.println(mqttBuffer);
#endif
}
}

void mqtt_publish_boot(time_t boot_time)
{
String stopic(config.mqttTopic);
Expand Down Expand Up @@ -288,18 +304,6 @@ void mqtt_publish_io(String const &io, String const &state)
}
}

void mqtt_publish_event(JsonDocument *root)
{
if (mqttClient.connected())
{
String stopic(config.mqttTopic);
stopic = stopic + "/send";
String mqttBuffer;
serializeJson(*root, mqttBuffer);
mqttClient.publish(stopic.c_str(), 0, false, mqttBuffer.c_str());
}
}

void onMqttPublish(uint16_t packetId)
{
writeEvent("INFO", "mqtt", "MQTT publish acknowledged", String(packetId));
Expand Down Expand Up @@ -465,6 +469,12 @@ void onMqttMessage(char *topic, char *payload, AsyncMqttClientMessageProperties
lastMessage->nextMessage = incomingMessage;
}

DynamicJsonDocument root(512);
root["type"] = mqttIncomingJson["cmd"];
root["ip"] = WiFi.localIP().toString();
root["hostname"] = deviceHostname;
mqttPublishEvent(&root);

return;
}

Expand Down

0 comments on commit 76076d3

Please sign in to comment.