Skip to content

Commit

Permalink
Merge pull request #1273 from MichaelDvP/dev
Browse files Browse the repository at this point in the history
update Mqtt Client
  • Loading branch information
proddy authored Aug 31, 2023
2 parents d6ae552 + 4778206 commit f60197e
Show file tree
Hide file tree
Showing 14 changed files with 77 additions and 47 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG_LATEST.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@

## Added

- rssi in Network Status Page

## Fixed

- issue in espMqttClient on low mem

## Changed

- mqtt free mem check 60k
2 changes: 1 addition & 1 deletion interface/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
"@preact/compat": "^17.1.2",
"@prefresh/vite": "^2.4.1",
"@table-library/react-table-library": "4.1.7",
"@types/lodash-es": "^4.17.8",
"@types/lodash-es": "^4.17.9",
"@types/node": "^20.5.7",
"@types/react": "^18.2.21",
"@types/react-dom": "^18.2.7",
Expand Down
2 changes: 1 addition & 1 deletion interface/src/framework/network/NetworkStatusForm.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ const NetworkStatusForm: FC = () => {
<SettingsInputAntennaIcon />
</Avatar>
</ListItemAvatar>
<ListItemText primary="SSID" secondary={data.ssid} />
<ListItemText primary="SSID (RSSI)" secondary={data.ssid + ' (' + data.rssi + ' dBm)'} />
</ListItem>
<Divider variant="inset" component="li" />
</>
Expand Down
10 changes: 5 additions & 5 deletions interface/yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1339,12 +1339,12 @@ __metadata:
languageName: node
linkType: hard

"@types/lodash-es@npm:^4.17.8":
version: 4.17.8
resolution: "@types/lodash-es@npm:4.17.8"
"@types/lodash-es@npm:^4.17.9":
version: 4.17.9
resolution: "@types/lodash-es@npm:4.17.9"
dependencies:
"@types/lodash": "*"
checksum: 950771d406c842814dd22217adba5e01bd06b3c21e97900d3c3816f38580e132894400b5d83a7962645fa284d8478614bdcc50755255ad15024311b7b8ed8520
checksum: 9fe82df0ec14e2aad50a1bf6488c4457e3378fcc77f5806fbc8035904ef0848b70e50037b13d9bddb66d3a30b425d2998a4a438a5024efe7431b63fde0920378
languageName: node
linkType: hard

Expand Down Expand Up @@ -1586,7 +1586,7 @@ __metadata:
"@prefresh/vite": ^2.4.1
"@table-library/react-table-library": 4.1.7
"@types/babel__core": ^7
"@types/lodash-es": ^4.17.8
"@types/lodash-es": ^4.17.9
"@types/node": ^20.5.7
"@types/react": ^18.2.21
"@types/react-dom": ^18.2.7
Expand Down
34 changes: 13 additions & 21 deletions lib/espMqttClient/src/MqttClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ bool MqttClient::disconnected() const {
}

bool MqttClient::connect() {
bool result = true;
bool result = false;
if (_state == State::disconnected) {
EMC_SEMAPHORE_TAKE();
if (_addPacketFront(_cleanSession,
Expand All @@ -116,19 +116,21 @@ bool MqttClient::connect() {
_willPayloadLength,
(uint16_t)(_keepAlive / 1000), // 32b to 16b doesn't overflow because it comes from 16b orignally
_clientId)) {
result = true;
_state = State::connectingTcp1;
#if defined(ARDUINO_ARCH_ESP32)
if (_useInternalTask == espMqttClientTypes::UseInternalTask::YES) {
vTaskResume(_taskHandle);
}
#endif
_state = State::connectingTcp1;
} else {
EMC_SEMAPHORE_GIVE();
emc_log_e("Could not create CONNECT packet");
_onError(0, Error::OUT_OF_MEMORY);
result = false;
}
EMC_SEMAPHORE_GIVE();
} else if (_state <= State::connected) { // already connected or connecting
result = true;
}
return result;
}
Expand Down Expand Up @@ -196,6 +198,14 @@ const char * MqttClient::getClientId() const {
return _clientId;
}

size_t MqttClient::queueSize() {
size_t ret = 0;
EMC_SEMAPHORE_TAKE();
ret = _outbox.size();
EMC_SEMAPHORE_GIVE();
return ret;
}

void MqttClient::loop() {
switch ((State)_state) { // modified by proddy for EMS-ESP compiling standalone
case State::disconnected:
Expand Down Expand Up @@ -335,7 +345,6 @@ int MqttClient::_sendPacket() {
size_t wantToWrite = 0;
size_t written = 0;
if (packet && (wantToWrite == written)) {
// mixing signed with unsigned here but safe because of MQTT packet size limits
wantToWrite = packet->packet.available(_bytesSent);
if (wantToWrite == 0) {
EMC_SEMAPHORE_GIVE();
Expand Down Expand Up @@ -630,9 +639,6 @@ void MqttClient::_onPubcomp() {
// if it doesn't match the ID, return
if ((it.get()->packet.packetType()) == PacketType.PUBREL) {
if (it.get()->packet.packetId() == idToMatch) {
// if (!_addPacket(PacketType.PUBCOMP, idToMatch)) {
// emc_log_e("Could not create PUBCOMP packet");
// }
callback = true;
_outbox.remove(it);
break;
Expand Down Expand Up @@ -697,20 +703,6 @@ void MqttClient::_onUnsuback() {
}
}

uint16_t MqttClient::getQueue() const {
EMC_SEMAPHORE_TAKE();
espMqttClientInternals::Outbox<OutgoingPacket>::Iterator it = _outbox.front();
uint16_t count = 0;
while (it) {
// if (it.get()->packet.packetType() == PacketType.PUBLISH) {
++count;
// }
++it;
}
EMC_SEMAPHORE_GIVE();
return count;
}

void MqttClient::_clearQueue(int clearData) {
emc_log_i("clearing queue (clear session: %d)", clearData);
EMC_SEMAPHORE_TAKE();
Expand Down
23 changes: 16 additions & 7 deletions lib/espMqttClient/src/MqttClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class MqttClient {
uint16_t publish(const char * topic, uint8_t qos, bool retain, espMqttClientTypes::PayloadCallback callback, size_t length);
void clearQueue(bool deleteSessionData = false); // Not MQTT compliant and may cause unpredictable results when `deleteSessionData` = true!
const char * getClientId() const;
uint16_t getQueue() const;
size_t queueSize(); // No const because of mutex
void loop();

protected:
Expand Down Expand Up @@ -131,8 +131,9 @@ class MqttClient {
uint32_t timeSent;
espMqttClientInternals::Packet packet;
template <typename... Args>
OutgoingPacket(uint32_t t, espMqttClientTypes::Error error, Args &&... args)
: timeSent(t)
OutgoingPacket(uint32_t t, espMqttClientTypes::Error & error, Args &&... args)
: // NOLINT(runtime/references)
timeSent(t)
, packet(error, std::forward<Args>(args)...) {
}
};
Expand All @@ -150,18 +151,26 @@ class MqttClient {
bool _addPacket(Args &&... args) {
espMqttClientTypes::Error error(espMqttClientTypes::Error::SUCCESS);
espMqttClientInternals::Outbox<OutgoingPacket>::Iterator it = _outbox.emplace(0, error, std::forward<Args>(args)...);
if (it && error == espMqttClientTypes::Error::SUCCESS)
if (it && error == espMqttClientTypes::Error::SUCCESS) {
return true;
return false;
} else {
if (it)
_outbox.remove(it);
return false;
}
}

template <typename... Args>
bool _addPacketFront(Args &&... args) {
espMqttClientTypes::Error error(espMqttClientTypes::Error::SUCCESS);
espMqttClientInternals::Outbox<OutgoingPacket>::Iterator it = _outbox.emplaceFront(0, error, std::forward<Args>(args)...);
if (it && error == espMqttClientTypes::Error::SUCCESS)
if (it && error == espMqttClientTypes::Error::SUCCESS) {
return true;
return false;
} else {
if (it)
_outbox.remove(it);
return false;
}
}

void _checkOutbox();
Expand Down
10 changes: 10 additions & 0 deletions lib/espMqttClient/src/Outbox.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,16 @@ class Outbox {
return false;
}

size_t size() const {
Node* n = _first;
size_t count = 0;
while (n) {
n = n->next;
++count;
}
return count;
}

private:
Node* _first;
Node* _last;
Expand Down
6 changes: 3 additions & 3 deletions lib/espMqttClient/src/Packets/Packet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ Packet::Packet(espMqttClientTypes::Error& error,
(password ? 2 + strlen(password) : 0);

// allocate memory
if (!_allocate(remainingLength)) {
if (!_allocate(remainingLength, false)) {
error = espMqttClientTypes::Error::OUT_OF_MEMORY;
return;
}
Expand Down Expand Up @@ -300,8 +300,8 @@ Packet::Packet(espMqttClientTypes::Error& error, MQTTPacketType type)
}


bool Packet::_allocate(size_t remainingLength) {
if (EMC_GET_FREE_MEMORY() < EMC_MIN_FREE_MEMORY) {
bool Packet::_allocate(size_t remainingLength, bool check) {
if (check && EMC_GET_FREE_MEMORY() < EMC_MIN_FREE_MEMORY) {
emc_log_w("Packet buffer not allocated: low memory");
return false;
}
Expand Down
2 changes: 1 addition & 1 deletion lib/espMqttClient/src/Packets/Packet.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ class Packet {

private:
// pass remainingLength = total size - header - remainingLengthLength!
bool _allocate(size_t remainingLength);
bool _allocate(size_t remainingLength, bool check = true);

// fills header and returns index of next available byte in buffer
size_t _fillPublishHeader(uint16_t packetId,
Expand Down
21 changes: 16 additions & 5 deletions src/mqtt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ void Mqtt::resubscribe() {

// Main MQTT loop - sends out top item on publish queue
void Mqtt::loop() {
queuecount_ = mqttClient_->queueSize();

// exit if MQTT is not enabled or if there is no network connection
if (!connected()) {
return;
Expand All @@ -142,7 +144,7 @@ void Mqtt::loop() {
EMSESP::publish_sensor_values(false);
}

queuecount_ = mqttClient_->getQueue();
// wait for empty queue before sending scheduled device messages
if (queuecount_ > 0) {
return;
}
Expand Down Expand Up @@ -482,7 +484,7 @@ void Mqtt::on_connect() {

connecting_ = true;
connectcount_++; // count # reconnects. not currently used.
queuecount_ = 0;
queuecount_ = mqttClient_->queueSize();

load_settings(); // reload MQTT settings - in case they have changes

Expand Down Expand Up @@ -510,7 +512,7 @@ void Mqtt::on_connect() {
// publish to the last will topic (see Mqtt::start() function) to say we're alive
queue_publish_retain("status", "online", true); // with retain on

mqtt_publish_fails_ = 0; // reset fail count to 0
// mqtt_publish_fails_ = 0; // reset fail count to 0
}

// Home Assistant Discovery - the main master Device called EMS-ESP
Expand Down Expand Up @@ -590,14 +592,23 @@ bool Mqtt::queue_message(const uint8_t operation, const std::string & topic, con
if (!mqtt_enabled_ || topic.empty()) {
return false; // quit, not using MQTT
}
// check free mem
if (ESP.getFreeHeap() < 60 * 1204) {
if (operation == Operation::PUBLISH) {
mqtt_message_id_++;
mqtt_publish_fails_++;
}
LOG_DEBUG("%s failed: low memory", operation == Operation::PUBLISH ? "Publish" : operation == Operation::SUBSCRIBE ? "Subscribe" : "Unsubscribe");
return false; // quit
}

uint16_t packet_id = 0;
char fulltopic[MQTT_TOPIC_MAX_SIZE];

if (topic.find(discovery_prefix_) == 0) {
strlcpy(fulltopic, topic.c_str(), sizeof(fulltopic)); // leave topic as it is
strlcpy(fulltopic, topic.c_str(), sizeof(fulltopic)); // leave discovery topic as it is
} else {
// it's a discovery topic, added the mqtt base to the topic path
// it's not a discovery topic, added the mqtt base to the topic path
snprintf(fulltopic, sizeof(fulltopic), "%s/%s", mqtt_base_.c_str(), topic.c_str()); // uses base
}

Expand Down
2 changes: 1 addition & 1 deletion src/version.h
Original file line number Diff line number Diff line change
@@ -1 +1 @@
#define EMSESP_APP_VERSION "3.7.0-dev.1"
#define EMSESP_APP_VERSION "3.7.0-dev.2"
1 change: 1 addition & 0 deletions src/web/WebEntityService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ WebEntityService::WebEntityService(AsyncWebServer * server, FS * fs, SecurityMan
void WebEntityService::begin() {
_fsPersistence.readFromFS();
EMSESP::logger().info("Starting Custom entity service");
Mqtt::subscribe(EMSdevice::DeviceType::CUSTOM, "custom/#", nullptr); // use empty function callback
}

// this creates the entity file, saving it to the FS
Expand Down
1 change: 1 addition & 0 deletions src/web/WebSchedulerService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ WebSchedulerService::WebSchedulerService(AsyncWebServer * server, FS * fs, Secur
void WebSchedulerService::begin() {
_fsPersistence.readFromFS();
EMSESP::logger().info("Starting Scheduler service");
Mqtt::subscribe(EMSdevice::DeviceType::SCHEDULER, "scheduler/#", nullptr); // use empty function callback
}

// this creates the scheduler file, saving it to the FS
Expand Down
4 changes: 2 additions & 2 deletions src/web/WebStatusService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,9 @@ void WebStatusService::webStatusService(AsyncWebServerRequest * request) {
if (Mqtt::enabled()) {
statJson = statsJson.createNestedObject();
statJson["id"] = 5;
statJson["s"] = Mqtt::publish_count();
statJson["s"] = Mqtt::publish_count() - Mqtt::publish_fails();
statJson["f"] = Mqtt::publish_fails();
statJson["q"] = Mqtt::publish_count() == 0 ? 100 : 100 - (uint8_t)((100 * Mqtt::publish_fails()) / (Mqtt::publish_count() + Mqtt::publish_fails()));
statJson["q"] = Mqtt::publish_count() == 0 ? 100 : 100 - (uint8_t)((100 * Mqtt::publish_fails()) / Mqtt::publish_count());
}

statJson = statsJson.createNestedObject();
Expand Down

0 comments on commit f60197e

Please sign in to comment.