Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update Mqtt Client #1273

Merged
merged 13 commits into from
Aug 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG_LATEST.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@

## Added

- rssi in Network Status Page

## Fixed

- issue in espMqttClient on low mem

## Changed

- mqtt free mem check 60k
2 changes: 1 addition & 1 deletion interface/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
"@preact/compat": "^17.1.2",
"@prefresh/vite": "^2.4.1",
"@table-library/react-table-library": "4.1.7",
"@types/lodash-es": "^4.17.8",
"@types/lodash-es": "^4.17.9",
"@types/node": "^20.5.7",
"@types/react": "^18.2.21",
"@types/react-dom": "^18.2.7",
Expand Down
2 changes: 1 addition & 1 deletion interface/src/framework/network/NetworkStatusForm.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ const NetworkStatusForm: FC = () => {
<SettingsInputAntennaIcon />
</Avatar>
</ListItemAvatar>
<ListItemText primary="SSID" secondary={data.ssid} />
<ListItemText primary="SSID (RSSI)" secondary={data.ssid + ' (' + data.rssi + ' dBm)'} />
</ListItem>
<Divider variant="inset" component="li" />
</>
Expand Down
10 changes: 5 additions & 5 deletions interface/yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1339,12 +1339,12 @@ __metadata:
languageName: node
linkType: hard

"@types/lodash-es@npm:^4.17.8":
version: 4.17.8
resolution: "@types/lodash-es@npm:4.17.8"
"@types/lodash-es@npm:^4.17.9":
version: 4.17.9
resolution: "@types/lodash-es@npm:4.17.9"
dependencies:
"@types/lodash": "*"
checksum: 950771d406c842814dd22217adba5e01bd06b3c21e97900d3c3816f38580e132894400b5d83a7962645fa284d8478614bdcc50755255ad15024311b7b8ed8520
checksum: 9fe82df0ec14e2aad50a1bf6488c4457e3378fcc77f5806fbc8035904ef0848b70e50037b13d9bddb66d3a30b425d2998a4a438a5024efe7431b63fde0920378
languageName: node
linkType: hard

Expand Down Expand Up @@ -1586,7 +1586,7 @@ __metadata:
"@prefresh/vite": ^2.4.1
"@table-library/react-table-library": 4.1.7
"@types/babel__core": ^7
"@types/lodash-es": ^4.17.8
"@types/lodash-es": ^4.17.9
"@types/node": ^20.5.7
"@types/react": ^18.2.21
"@types/react-dom": ^18.2.7
Expand Down
34 changes: 13 additions & 21 deletions lib/espMqttClient/src/MqttClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ bool MqttClient::disconnected() const {
}

bool MqttClient::connect() {
bool result = true;
bool result = false;
if (_state == State::disconnected) {
EMC_SEMAPHORE_TAKE();
if (_addPacketFront(_cleanSession,
Expand All @@ -116,19 +116,21 @@ bool MqttClient::connect() {
_willPayloadLength,
(uint16_t)(_keepAlive / 1000), // 32b to 16b doesn't overflow because it comes from 16b orignally
_clientId)) {
result = true;
_state = State::connectingTcp1;
#if defined(ARDUINO_ARCH_ESP32)
if (_useInternalTask == espMqttClientTypes::UseInternalTask::YES) {
vTaskResume(_taskHandle);
}
#endif
_state = State::connectingTcp1;
} else {
EMC_SEMAPHORE_GIVE();
emc_log_e("Could not create CONNECT packet");
_onError(0, Error::OUT_OF_MEMORY);
result = false;
}
EMC_SEMAPHORE_GIVE();
} else if (_state <= State::connected) { // already connected or connecting
result = true;
}
return result;
}
Expand Down Expand Up @@ -196,6 +198,14 @@ const char * MqttClient::getClientId() const {
return _clientId;
}

size_t MqttClient::queueSize() {
size_t ret = 0;
EMC_SEMAPHORE_TAKE();
ret = _outbox.size();
EMC_SEMAPHORE_GIVE();
return ret;
}

void MqttClient::loop() {
switch ((State)_state) { // modified by proddy for EMS-ESP compiling standalone
case State::disconnected:
Expand Down Expand Up @@ -335,7 +345,6 @@ int MqttClient::_sendPacket() {
size_t wantToWrite = 0;
size_t written = 0;
if (packet && (wantToWrite == written)) {
// mixing signed with unsigned here but safe because of MQTT packet size limits
wantToWrite = packet->packet.available(_bytesSent);
if (wantToWrite == 0) {
EMC_SEMAPHORE_GIVE();
Expand Down Expand Up @@ -630,9 +639,6 @@ void MqttClient::_onPubcomp() {
// if it doesn't match the ID, return
if ((it.get()->packet.packetType()) == PacketType.PUBREL) {
if (it.get()->packet.packetId() == idToMatch) {
// if (!_addPacket(PacketType.PUBCOMP, idToMatch)) {
// emc_log_e("Could not create PUBCOMP packet");
// }
callback = true;
_outbox.remove(it);
break;
Expand Down Expand Up @@ -697,20 +703,6 @@ void MqttClient::_onUnsuback() {
}
}

uint16_t MqttClient::getQueue() const {
EMC_SEMAPHORE_TAKE();
espMqttClientInternals::Outbox<OutgoingPacket>::Iterator it = _outbox.front();
uint16_t count = 0;
while (it) {
// if (it.get()->packet.packetType() == PacketType.PUBLISH) {
++count;
// }
++it;
}
EMC_SEMAPHORE_GIVE();
return count;
}

void MqttClient::_clearQueue(int clearData) {
emc_log_i("clearing queue (clear session: %d)", clearData);
EMC_SEMAPHORE_TAKE();
Expand Down
23 changes: 16 additions & 7 deletions lib/espMqttClient/src/MqttClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class MqttClient {
uint16_t publish(const char * topic, uint8_t qos, bool retain, espMqttClientTypes::PayloadCallback callback, size_t length);
void clearQueue(bool deleteSessionData = false); // Not MQTT compliant and may cause unpredictable results when `deleteSessionData` = true!
const char * getClientId() const;
uint16_t getQueue() const;
size_t queueSize(); // No const because of mutex
void loop();

protected:
Expand Down Expand Up @@ -131,8 +131,9 @@ class MqttClient {
uint32_t timeSent;
espMqttClientInternals::Packet packet;
template <typename... Args>
OutgoingPacket(uint32_t t, espMqttClientTypes::Error error, Args &&... args)
: timeSent(t)
OutgoingPacket(uint32_t t, espMqttClientTypes::Error & error, Args &&... args)
: // NOLINT(runtime/references)
timeSent(t)
, packet(error, std::forward<Args>(args)...) {
}
};
Expand All @@ -150,18 +151,26 @@ class MqttClient {
bool _addPacket(Args &&... args) {
espMqttClientTypes::Error error(espMqttClientTypes::Error::SUCCESS);
espMqttClientInternals::Outbox<OutgoingPacket>::Iterator it = _outbox.emplace(0, error, std::forward<Args>(args)...);
if (it && error == espMqttClientTypes::Error::SUCCESS)
if (it && error == espMqttClientTypes::Error::SUCCESS) {
return true;
return false;
} else {
if (it)
_outbox.remove(it);
return false;
}
}

template <typename... Args>
bool _addPacketFront(Args &&... args) {
espMqttClientTypes::Error error(espMqttClientTypes::Error::SUCCESS);
espMqttClientInternals::Outbox<OutgoingPacket>::Iterator it = _outbox.emplaceFront(0, error, std::forward<Args>(args)...);
if (it && error == espMqttClientTypes::Error::SUCCESS)
if (it && error == espMqttClientTypes::Error::SUCCESS) {
return true;
return false;
} else {
if (it)
_outbox.remove(it);
return false;
}
}

void _checkOutbox();
Expand Down
10 changes: 10 additions & 0 deletions lib/espMqttClient/src/Outbox.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,16 @@ class Outbox {
return false;
}

size_t size() const {
Node* n = _first;
size_t count = 0;
while (n) {
n = n->next;
++count;
}
return count;
}

private:
Node* _first;
Node* _last;
Expand Down
6 changes: 3 additions & 3 deletions lib/espMqttClient/src/Packets/Packet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ Packet::Packet(espMqttClientTypes::Error& error,
(password ? 2 + strlen(password) : 0);

// allocate memory
if (!_allocate(remainingLength)) {
if (!_allocate(remainingLength, false)) {
error = espMqttClientTypes::Error::OUT_OF_MEMORY;
return;
}
Expand Down Expand Up @@ -300,8 +300,8 @@ Packet::Packet(espMqttClientTypes::Error& error, MQTTPacketType type)
}


bool Packet::_allocate(size_t remainingLength) {
if (EMC_GET_FREE_MEMORY() < EMC_MIN_FREE_MEMORY) {
bool Packet::_allocate(size_t remainingLength, bool check) {
if (check && EMC_GET_FREE_MEMORY() < EMC_MIN_FREE_MEMORY) {
emc_log_w("Packet buffer not allocated: low memory");
return false;
}
Expand Down
2 changes: 1 addition & 1 deletion lib/espMqttClient/src/Packets/Packet.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ class Packet {

private:
// pass remainingLength = total size - header - remainingLengthLength!
bool _allocate(size_t remainingLength);
bool _allocate(size_t remainingLength, bool check = true);

// fills header and returns index of next available byte in buffer
size_t _fillPublishHeader(uint16_t packetId,
Expand Down
21 changes: 16 additions & 5 deletions src/mqtt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ void Mqtt::resubscribe() {

// Main MQTT loop - sends out top item on publish queue
void Mqtt::loop() {
queuecount_ = mqttClient_->queueSize();

// exit if MQTT is not enabled or if there is no network connection
if (!connected()) {
return;
Expand All @@ -142,7 +144,7 @@ void Mqtt::loop() {
EMSESP::publish_sensor_values(false);
}

queuecount_ = mqttClient_->getQueue();
// wait for empty queue before sending scheduled device messages
if (queuecount_ > 0) {
return;
}
Expand Down Expand Up @@ -482,7 +484,7 @@ void Mqtt::on_connect() {

connecting_ = true;
connectcount_++; // count # reconnects. not currently used.
queuecount_ = 0;
queuecount_ = mqttClient_->queueSize();

load_settings(); // reload MQTT settings - in case they have changes

Expand Down Expand Up @@ -510,7 +512,7 @@ void Mqtt::on_connect() {
// publish to the last will topic (see Mqtt::start() function) to say we're alive
queue_publish_retain("status", "online", true); // with retain on

mqtt_publish_fails_ = 0; // reset fail count to 0
// mqtt_publish_fails_ = 0; // reset fail count to 0
}

// Home Assistant Discovery - the main master Device called EMS-ESP
Expand Down Expand Up @@ -590,14 +592,23 @@ bool Mqtt::queue_message(const uint8_t operation, const std::string & topic, con
if (!mqtt_enabled_ || topic.empty()) {
return false; // quit, not using MQTT
}
// check free mem
if (ESP.getFreeHeap() < 60 * 1204) {
if (operation == Operation::PUBLISH) {
mqtt_message_id_++;
mqtt_publish_fails_++;
}
LOG_DEBUG("%s failed: low memory", operation == Operation::PUBLISH ? "Publish" : operation == Operation::SUBSCRIBE ? "Subscribe" : "Unsubscribe");
return false; // quit
}

uint16_t packet_id = 0;
char fulltopic[MQTT_TOPIC_MAX_SIZE];

if (topic.find(discovery_prefix_) == 0) {
strlcpy(fulltopic, topic.c_str(), sizeof(fulltopic)); // leave topic as it is
strlcpy(fulltopic, topic.c_str(), sizeof(fulltopic)); // leave discovery topic as it is
} else {
// it's a discovery topic, added the mqtt base to the topic path
// it's not a discovery topic, added the mqtt base to the topic path
snprintf(fulltopic, sizeof(fulltopic), "%s/%s", mqtt_base_.c_str(), topic.c_str()); // uses base
}

Expand Down
2 changes: 1 addition & 1 deletion src/version.h
Original file line number Diff line number Diff line change
@@ -1 +1 @@
#define EMSESP_APP_VERSION "3.7.0-dev.1"
#define EMSESP_APP_VERSION "3.7.0-dev.2"
1 change: 1 addition & 0 deletions src/web/WebEntityService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ WebEntityService::WebEntityService(AsyncWebServer * server, FS * fs, SecurityMan
void WebEntityService::begin() {
_fsPersistence.readFromFS();
EMSESP::logger().info("Starting Custom entity service");
Mqtt::subscribe(EMSdevice::DeviceType::CUSTOM, "custom/#", nullptr); // use empty function callback
}

// this creates the entity file, saving it to the FS
Expand Down
1 change: 1 addition & 0 deletions src/web/WebSchedulerService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ WebSchedulerService::WebSchedulerService(AsyncWebServer * server, FS * fs, Secur
void WebSchedulerService::begin() {
_fsPersistence.readFromFS();
EMSESP::logger().info("Starting Scheduler service");
Mqtt::subscribe(EMSdevice::DeviceType::SCHEDULER, "scheduler/#", nullptr); // use empty function callback
}

// this creates the scheduler file, saving it to the FS
Expand Down
4 changes: 2 additions & 2 deletions src/web/WebStatusService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,9 @@ void WebStatusService::webStatusService(AsyncWebServerRequest * request) {
if (Mqtt::enabled()) {
statJson = statsJson.createNestedObject();
statJson["id"] = 5;
statJson["s"] = Mqtt::publish_count();
statJson["s"] = Mqtt::publish_count() - Mqtt::publish_fails();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

statJson["s"] is for success, publish_count() is all messages. Same correction for quality calculation.

statJson["f"] = Mqtt::publish_fails();
statJson["q"] = Mqtt::publish_count() == 0 ? 100 : 100 - (uint8_t)((100 * Mqtt::publish_fails()) / (Mqtt::publish_count() + Mqtt::publish_fails()));
statJson["q"] = Mqtt::publish_count() == 0 ? 100 : 100 - (uint8_t)((100 * Mqtt::publish_fails()) / Mqtt::publish_count());
}

statJson = statsJson.createNestedObject();
Expand Down