Skip to content

Commit

Permalink
Replace MQTT_MAX_TRIES disabling feature with an increasing reconnect…
Browse files Browse the repository at this point in the history
…ion interval (issue #215)
  • Loading branch information
xoseperez committed Sep 15, 2017
1 parent 97f3557 commit 3c848d1
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 118 deletions.
8 changes: 5 additions & 3 deletions code/espurna/config/general.h
Original file line number Diff line number Diff line change
Expand Up @@ -390,9 +390,11 @@ PROGMEM const char* const custom_reset_string[] = {
#define MQTT_RETAIN true // MQTT retain flag
#define MQTT_QOS 0 // MQTT QoS value for all messages
#define MQTT_KEEPALIVE 30 // MQTT keepalive value
#define MQTT_RECONNECT_DELAY 10000 // Try to reconnect after 10s
#define MQTT_TRY_INTERVAL 30000 // Timeframe for disconnect retries
#define MQTT_MAX_TRIES 5 // After these many retries during the previous MQTT_TRY_INTERVAL the board will reset

#define MQTT_RECONNECT_DELAY_MIN 5000 // Try to reconnect in 5 seconds upon disconnection
#define MQTT_RECONNECT_DELAY_STEP 5000 // Increase the reconnect delay in 5 seconds after each failed attempt
#define MQTT_RECONNECT_DELAY_MAX 120000 // Set reconnect time to 2 minutes at most

#define MQTT_SKIP_RETAINED 1 // Skip retained messages on connection
#define MQTT_SKIP_TIME 1000 // Skip messages for 1 second anter connection

Expand Down
213 changes: 98 additions & 115 deletions code/espurna/mqtt.ino
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ WiFiClientSecure _mqtt_client_secure;
#endif // MQTT_USE_ASYNC

bool _mqtt_enabled = MQTT_ENABLED;
unsigned char _mqtt_connection_tries = 0;
unsigned long _mqtt_reconnect_delay = MQTT_RECONNECT_DELAY_MIN;
String _mqtt_topic;
String _mqtt_setter;
String _mqtt_getter;
Expand Down Expand Up @@ -216,6 +216,7 @@ void _mqttCallback(unsigned int type, const char * topic, const char * payload)
void _mqttOnConnect() {

DEBUG_MSG_P(PSTR("[MQTT] Connected!\n"));
_mqtt_reconnect_delay = MQTT_RECONNECT_DELAY_MIN;

#if MQTT_SKIP_RETAINED
_mqtt_connected_at = millis();
Expand Down Expand Up @@ -316,137 +317,134 @@ bool mqttEnabled() {

void mqttConnect() {

if (_mqtt_enabled & !_mqtt.connected()) {

// Disable MQTT after MQTT_MAX_TRIES attemps in a row
#if MQTT_MAX_TRIES > 0
static unsigned long last_try = millis();
if (millis() - last_try < MQTT_TRY_INTERVAL) {
if (++_mqtt_connection_tries > MQTT_MAX_TRIES) {
DEBUG_MSG_P(PSTR("[MQTT] MQTT_MAX_TRIES met, disabling MQTT\n"));
mqttEnabled(false);
_mqtt_connection_tries = 0;
return;
}
} else {
_mqtt_connection_tries = 0;
}
last_try = millis();
#endif
// Do not connect if disabled
if (!_mqtt_enabled) return;

if (_mqtt_user) free(_mqtt_user);
if (_mqtt_pass) free(_mqtt_pass);
// Do not connect if already connected
if (_mqtt.connected()) return;

char * host = strdup(getSetting("mqttServer", MQTT_SERVER).c_str());
if (strlen(host) == 0) return;
unsigned int port = getSetting("mqttPort", MQTT_PORT).toInt();
_mqtt_user = strdup(getSetting("mqttUser", MQTT_USER).c_str());
_mqtt_pass = strdup(getSetting("mqttPassword", MQTT_PASS).c_str());
if (_mqtt_will) free(_mqtt_will);
_mqtt_will = strdup((_mqtt_topic + MQTT_TOPIC_STATUS).c_str());
// Check reconnect interval
static unsigned long last = 0;
if (millis() - last < _mqtt_reconnect_delay) return;
last = millis();

DEBUG_MSG_P(PSTR("[MQTT] Connecting to broker at %s:%d\n"), host, port);
// Increase the reconnect delay
_mqtt_reconnect_delay += MQTT_RECONNECT_DELAY_STEP;
if (_mqtt_reconnect_delay > MQTT_RECONNECT_DELAY_MAX) {
_mqtt_reconnect_delay = MQTT_RECONNECT_DELAY_MAX;
}

#if MQTT_USE_ASYNC
if (_mqtt_user) free(_mqtt_user);
if (_mqtt_pass) free(_mqtt_pass);

_mqtt.setServer(host, port);
_mqtt.setKeepAlive(MQTT_KEEPALIVE).setCleanSession(false);
_mqtt.setWill(_mqtt_will, MQTT_QOS, MQTT_RETAIN, "0");
if ((strlen(_mqtt_user) > 0) && (strlen(_mqtt_pass) > 0)) {
DEBUG_MSG_P(PSTR("[MQTT] Connecting as user %s\n"), _mqtt_user);
_mqtt.setCredentials(_mqtt_user, _mqtt_pass);
}
char * host = strdup(getSetting("mqttServer", MQTT_SERVER).c_str());
if (strlen(host) == 0) return;
unsigned int port = getSetting("mqttPort", MQTT_PORT).toInt();
_mqtt_user = strdup(getSetting("mqttUser", MQTT_USER).c_str());
_mqtt_pass = strdup(getSetting("mqttPassword", MQTT_PASS).c_str());
if (_mqtt_will) free(_mqtt_will);
_mqtt_will = strdup((_mqtt_topic + MQTT_TOPIC_STATUS).c_str());

#if ASYNC_TCP_SSL_ENABLED
DEBUG_MSG_P(PSTR("[MQTT] Connecting to broker at %s:%d\n"), host, port);
Serial.println(millis() / 1000);

bool secure = getSetting("mqttUseSSL", MQTT_SSL_ENABLED).toInt() == 1;
_mqtt.setSecure(secure);
if (secure) {
DEBUG_MSG_P(PSTR("[MQTT] Using SSL\n"));
unsigned char fp[20] = {0};
if (mqttFormatFP(getSetting("mqttFP", MQTT_SSL_FINGERPRINT).c_str(), fp)) {
_mqtt.addServerFingerprint(fp);
} else {
DEBUG_MSG_P(PSTR("[MQTT] Wrong fingerprint\n"));
}
#if MQTT_USE_ASYNC

_mqtt.setServer(host, port);
_mqtt.setKeepAlive(MQTT_KEEPALIVE).setCleanSession(false);
_mqtt.setWill(_mqtt_will, MQTT_QOS, MQTT_RETAIN, "0");
if ((strlen(_mqtt_user) > 0) && (strlen(_mqtt_pass) > 0)) {
DEBUG_MSG_P(PSTR("[MQTT] Connecting as user %s\n"), _mqtt_user);
_mqtt.setCredentials(_mqtt_user, _mqtt_pass);
}

#if ASYNC_TCP_SSL_ENABLED

bool secure = getSetting("mqttUseSSL", MQTT_SSL_ENABLED).toInt() == 1;
_mqtt.setSecure(secure);
if (secure) {
DEBUG_MSG_P(PSTR("[MQTT] Using SSL\n"));
unsigned char fp[20] = {0};
if (mqttFormatFP(getSetting("mqttFP", MQTT_SSL_FINGERPRINT).c_str(), fp)) {
_mqtt.addServerFingerprint(fp);
} else {
DEBUG_MSG_P(PSTR("[MQTT] Wrong fingerprint\n"));
}
}

#endif // ASYNC_TCP_SSL_ENABLED
#endif // ASYNC_TCP_SSL_ENABLED

DEBUG_MSG_P(PSTR("[MQTT] Will topic: %s\n"), _mqtt_will);
DEBUG_MSG_P(PSTR("[MQTT] QoS: %d\n"), MQTT_QOS);
DEBUG_MSG_P(PSTR("[MQTT] Retain flag: %d\n"), MQTT_RETAIN);
DEBUG_MSG_P(PSTR("[MQTT] Will topic: %s\n"), _mqtt_will);
DEBUG_MSG_P(PSTR("[MQTT] QoS: %d\n"), MQTT_QOS);
DEBUG_MSG_P(PSTR("[MQTT] Retain flag: %d\n"), MQTT_RETAIN);

_mqtt.connect();
_mqtt.connect();

#else // not MQTT_USE_ASYNC
#else // not MQTT_USE_ASYNC

bool response = true;
bool response = true;

#if ASYNC_TCP_SSL_ENABLED
#if ASYNC_TCP_SSL_ENABLED

bool secure = getSetting("mqttUseSSL", MQTT_SSL_ENABLED).toInt() == 1;
if (secure) {
DEBUG_MSG_P(PSTR("[MQTT] Using SSL\n"));
if (_mqtt_client_secure.connect(host, port)) {
char fp[60] = {0};
if (mqttFormatFP(getSetting("mqttFP", MQTT_SSL_FINGERPRINT).c_str(), fp)) {
if (_mqtt_client_secure.verify(fp, host)) {
_mqtt.setClient(_mqtt_client_secure);
} else {
DEBUG_MSG_P(PSTR("[MQTT] Invalid fingerprint\n"));
response = false;
}
_mqtt_client_secure.stop();
yield();
bool secure = getSetting("mqttUseSSL", MQTT_SSL_ENABLED).toInt() == 1;
if (secure) {
DEBUG_MSG_P(PSTR("[MQTT] Using SSL\n"));
if (_mqtt_client_secure.connect(host, port)) {
char fp[60] = {0};
if (mqttFormatFP(getSetting("mqttFP", MQTT_SSL_FINGERPRINT).c_str(), fp)) {
if (_mqtt_client_secure.verify(fp, host)) {
_mqtt.setClient(_mqtt_client_secure);
} else {
DEBUG_MSG_P(PSTR("[MQTT] Wrong fingerprint\n"));
DEBUG_MSG_P(PSTR("[MQTT] Invalid fingerprint\n"));
response = false;
}
_mqtt_client_secure.stop();
yield();
} else {
DEBUG_MSG_P(PSTR("[MQTT] Client connection failed\n"));
DEBUG_MSG_P(PSTR("[MQTT] Wrong fingerprint\n"));
response = false;
}

} else {
_mqtt.setClient(_mqtt_client);
DEBUG_MSG_P(PSTR("[MQTT] Client connection failed\n"));
response = false;
}

#else // not ASYNC_TCP_SSL_ENABLED

} else {
_mqtt.setClient(_mqtt_client);
}

#endif // ASYNC_TCP_SSL_ENABLED

if (response) {
#else // not ASYNC_TCP_SSL_ENABLED

_mqtt.setServer(host, port);
_mqtt.setClient(_mqtt_client);

if ((strlen(_mqtt_user) > 0) && (strlen(_mqtt_pass) > 0)) {
DEBUG_MSG_P(PSTR("[MQTT] Connecting as user %s\n"), _mqtt_user);
response = _mqtt.connect(getIdentifier().c_str(), _mqtt_user, _mqtt_pass, _mqtt_will, MQTT_QOS, MQTT_RETAIN, "0");
} else {
response = _mqtt.connect(getIdentifier().c_str(), _mqtt_will, MQTT_QOS, MQTT_RETAIN, "0");
}
#endif // ASYNC_TCP_SSL_ENABLED

DEBUG_MSG_P(PSTR("[MQTT] Will topic: %s\n"), _mqtt_will);
DEBUG_MSG_P(PSTR("[MQTT] QoS: %d\n"), MQTT_QOS);
DEBUG_MSG_P(PSTR("[MQTT] Retain flag: %d\n"), MQTT_RETAIN);
if (response) {

}
_mqtt.setServer(host, port);

if (response) {
_mqttOnConnect();
_mqtt_connected = true;
if ((strlen(_mqtt_user) > 0) && (strlen(_mqtt_pass) > 0)) {
DEBUG_MSG_P(PSTR("[MQTT] Connecting as user %s\n"), _mqtt_user);
response = _mqtt.connect(getIdentifier().c_str(), _mqtt_user, _mqtt_pass, _mqtt_will, MQTT_QOS, MQTT_RETAIN, "0");
} else {
DEBUG_MSG_P(PSTR("[MQTT] Connection failed\n"));
response = _mqtt.connect(getIdentifier().c_str(), _mqtt_will, MQTT_QOS, MQTT_RETAIN, "0");
}

#endif // MQTT_USE_ASYNC
DEBUG_MSG_P(PSTR("[MQTT] Will topic: %s\n"), _mqtt_will);
DEBUG_MSG_P(PSTR("[MQTT] QoS: %d\n"), MQTT_QOS);
DEBUG_MSG_P(PSTR("[MQTT] Retain flag: %d\n"), MQTT_RETAIN);

free(host);
}

}
if (response) {
_mqttOnConnect();
} else {
DEBUG_MSG_P(PSTR("[MQTT] Connection failed\n"));
}

#endif // MQTT_USE_ASYNC

free(host);

}

Expand All @@ -463,7 +461,6 @@ void mqttConfigure() {
_mqtt_forward = !_mqtt_getter.equals(_mqtt_setter);

// Enable
_mqtt_connection_tries = 0;
if (getSetting("mqttServer", MQTT_SERVER).length() == 0) {
mqttEnabled(false);
} else {
Expand Down Expand Up @@ -553,22 +550,14 @@ void mqttSetup() {

void mqttLoop() {

#if MQTT_USE_ASYNC
if (WiFi.status() != WL_CONNECTED) return;

if (!_mqtt_enabled) return;
if (WiFi.status() != WL_CONNECTED) return;
if (_mqtt.connected()) return;
#if MQTT_USE_ASYNC

static unsigned long last = 0;
if (millis() - last > MQTT_RECONNECT_DELAY) {
last = millis();
mqttConnect();
}
mqttConnect();

#else // not MQTT_USE_ASYNC

if (WiFi.status() != WL_CONNECTED) return;

if (_mqtt.connected()) {

_mqtt.loop();
Expand All @@ -580,13 +569,7 @@ void mqttLoop() {
_mqtt_connected = false;
}

if (_mqtt_enabled) {
static unsigned long last = 0;
if (millis() - last > MQTT_RECONNECT_DELAY) {
last = millis();
mqttConnect();
}
}
mqttConnect();

}

Expand Down

0 comments on commit 3c848d1

Please sign in to comment.