Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MQTT doesn't detect broker outage and overflows message queue #1772

Closed
romansavrulin opened this issue Feb 1, 2017 · 13 comments
Closed

MQTT doesn't detect broker outage and overflows message queue #1772

romansavrulin opened this issue Feb 1, 2017 · 13 comments

Comments

@romansavrulin
Copy link

romansavrulin commented Feb 1, 2017

Is there any way to know MQTT message queue size or limit it? I use the code listed below to connect to mosquito, handle reconnects and publish dumb test timestamps. And it works fine under stable conditions. But together with #1731 and possibly #1680 this code fails when I disconnect broker from the network and eating up all heap memory. @pjsg , Please can you take a look at that?

Expected behavior

When MQTT client detects broker gone away for some reason (e.g. timeout, which set to 10 seconds in this example) fire any of failure callbacks to stop sending.

Also it will be great to have any (or all) of the following

  • some method to check message queue size or
  • limit it (say in client constructor method) and have an MQ overflow callback method
  • have message queue clear methods
  • have not only publish success callback but also publish failed callback
  • if the message was postponed to be delivered later, return pointer to that message object inside the queue in order to manually remove certain messages from queue

Actual behavior

this doesn't work at all

m:on("offline", function(client)
    publishMqtt:stop()
    print("MQTT: offline")
end)

Failure callback inside the MQTT constructor haven't trigger either

//normal work, I manually trigger heap command on console

> =node.heap()
37272
> =node.heap()
39000
> =node.heap()
37272
> =node.heap()
39000

.... //I disconnected broker from network to test reconnect logic (the script worked fine for about 3 hours before that, giving 39Kb free heap space)

> =node.heap()
15208
> =node.heap()
15136
> =node.heap()

.... //Still no recovery (about 2 minutes later)

448
> =node.heap()
448
> =node.heap()
E:M 48
not enough memory
> =node.heap()
E:M 40
not enough memory
> E:M 32
PUB: timestamp publish failed


.... //Crash into reason

=node.heap()
E:M 48
PANIC: unprotected error in call to Lua API (not enough memory)

Test code

WIFI_SSID = "SSID"
WIFI_PASS = "PWD"
MQTT_BrokerIP = "192.168.10.140"
MQTT_BrokerPort = 1883
MQTT_ClientID = "esp-001"
MQTT_Client_user = "user"
MQTT_Client_password = "password"

wifi.setmode(wifi.STATION)
wifi.sta.config(WIFI_SSID, WIFI_PASS)
wifiStatusPrev = wifi.STA_IDLE
print("WiFi: Connecting to "..WIFI_SSID)
wifi.sta.connect()

--wifi reconnect logic timer
tmr.alarm(0, 500, 1, function()
    wifiStatus = wifi.sta.status()
    if wifiStatus ~= wifiStatusPrev then
        print("WiFi: status change "..wifiStatusPrev.."->"..wifiStatus)
    end
    if (wifiStatusPrev ~= 5 and wifiStatus == 5) then 
        print("WiFi: connected")
        print(wifi.sta.getip())
    elseif (wifiStatus ~= 1 and wifiStatus ~= 5) then
      print("WiFi: Reconnecting to "..WIFI_SSID)
      wifi.sta.connect()
    end
    wifiStatusPrev = wifiStatus
end)

m = mqtt.Client(MQTT_ClientID, 10, MQTT_Client_user, MQTT_Client_password)

m:on("message", function(client, topic, data) 
    print("MQTT: "..topic .. ":" ) 
    if data ~= nil then
        print(data)
    end
end)

m:on("offline", function(client)
    publishMqtt:stop()
    print("MQTT: offline")
end)

--MQTT reconnect logic timer
reconnMqtt = tmr.create()

reconnMqtt:register(10, tmr.ALARM_SEMI, function (t)
  reconnMqtt:interval(500);
  print("MQTT: trying to connect to "..MQTT_BrokerIP..":"..MQTT_BrokerPort);
  m:close()
  m:connect(MQTT_BrokerIP, MQTT_BrokerPort, 0, 1, function(client) 
    print("MQTT: connected")
    publishMqtt:start()
    print("PUB: started")
 end, function(client, reason) 
    publishMqtt:stop()
    print("MQTT: connection failed with reason "..reason)
    reconnMqtt:start()
 end)
end)

--MQTT local timestamp publishing timer
publishMqtt = tmr.create()
publishMqtt:register(500, tmr.ALARM_AUTO, function (t)
  if not m:publish("/ESP/timestamp", tmr.time().."."..((tmr.now()/1000)%1000), 0, 0)
  then
    print("PUB: timestamp publish failed");
  end
end)

reconnMqtt:start()

NodeMCU version

NodeMCU custom build by frightanic.com
branch: master
commit: 81ec366
SSL: false
modules: adc,dht,file,gpio,mqtt,net,node,ow,tmr,uart,wifi
build built on: 2017-01-31 20:44
powered by Lua 5.1.4 on SDK 1.5.4.1(39cb9a32)

Hardware

NodeMCU LUA Amica R2. I guess, it can be reproduced on any ESP8266 module.

@romansavrulin
Copy link
Author

romansavrulin commented Feb 1, 2017

Update:

If I disable reconnect option and modify logic like this

m = mqtt.Client(MQTT_ClientID, 10, MQTT_Client_user, MQTT_Client_password)

m:on("offline", function(client)
    publishMqtt:stop()
    reconnMqtt:start()
    print("MQTT: offline")
end)

--MQTT reconnect logic timer
reconnMqtt = tmr.create()

reconnMqtt:register(10, tmr.ALARM_SEMI, function (t)
  reconnMqtt:interval(500);
  print("MQTT: trying to connect to "..MQTT_BrokerIP..":"..MQTT_BrokerPort);
  m:close()
  m:connect(MQTT_BrokerIP, MQTT_BrokerPort, 0, 0, function(client) 
    print("MQTT: connected")
    publishMqtt:start()
    print("PUB: started")
 end, function(client, reason) 
    publishMqtt:stop()
    print("MQTT: connection failed with reason "..reason)
    reconnMqtt:start()
 end)
end)

i.e. explicitly implement reconnect logic in my code, I get the script stop, because of not triggered any failure callback, while device still responds to wifi events and console

NodeMCU custom build by frightanic.com
	branch: master
	commit: 81ec3665cb5fe68eb8596612485cc206b65659c9
	SSL: false
	modules: adc,dht,file,gpio,mqtt,net,node,ow,tmr,uart,wifi
 build 	built on: 2017-01-31 20:44
 powered by Lua 5.1.4 on SDK 1.5.4.1(39cb9a32)
WiFi: Connecting to SSID
> MQTT: trying to connect to 192.168.10.140:1883
WiFi: status change 0->1
WiFi: status change 1->5
WiFi: connected
192.168.10.198	255.255.255.0	192.168.10.1
MQTT: connection failed with reason -5
MQTT: trying to connect to 192.168.10.140:1883
MQTT: connection failed with reason -5
MQTT: trying to connect to 192.168.10.140:1883

// script stopped here and hanged about an hour, because no fail callback was called

=node.heap() // I ask this in the console
39360
> =node.heap()
39360
// I switched off WiFi router
> WiFi: status change 5->1
WiFi: status change 1->3
WiFi: Reconnecting to VRealnosti
WiFi: status change 3->1
WiFi: status change 1->3
WiFi: Reconnecting to VRealnosti
WiFi: status change 3->1
WiFi: status change 1->5
WiFi: connected
192.168.10.198	255.255.255.0	192.168.10.1

//still no activity
=node.heap()
39296
> 

@devyte
Copy link

devyte commented Feb 1, 2017 via email

@romansavrulin
Copy link
Author

Same on dev with first test case

NodeMCU custom build by frightanic.com
	branch: dev
	commit: 419ec3384ac332d035623d93a36994ca24a6d41b
	SSL: false
	modules: adc,dht,file,gpio,mqtt,net,node,ow,tmr,uart,wifi
 build 	built on: 2017-02-01 19:01
 powered by Lua 5.1.4 on SDK 2.0.0(656edbf)
WiFi: Connecting to VRealnosti
> MQTT: trying to connect to 192.168.10.140:1883
WiFi: status change 0->1
WiFi: status change 1->5
WiFi: connected
192.168.10.198	255.255.255.0	192.168.10.1
MQTT: connection failed with reason -5
MQTT: trying to connect to 192.168.10.140:1883
MQTT: connected
PUB: started
... //just unplugging MQTT broker from network
=node.heap()
35432
> =node.heap()
35368
> =node.heap()
35296
... //to be short
> =node.heap()
3344
> E:M 40
PUB: timestamp publish failed
E:M 40
...
E:M 40
PUB: timestamp publish failed
=node.heap()
E:M 40
PANIC: unprotected error in call to Lua API (not enough memory)

 ets Jan  8 2013,rst cause:2, boot mode:(3,7)

load 0x40100000, len 26160, room 16 
tail 0
chksum 0xb3
load 0x3ffe8000, len 2192, room 8 
tail 8
chksum 0x97
load 0x3ffe8890, len 136, room 0 
tail 8
chksum 0x5b
csum 0x5b

@marcelstoer
Copy link
Member

#1683 hasn't landed yet.

@romansavrulin
Copy link
Author

@marcelstoer, @devyte second test (with callbacks only) confirms that. Is #1683 planned to go into 2.0.0 release?

NodeMCU custom build by frightanic.com
	branch: dev
	commit: 419ec3384ac332d035623d93a36994ca24a6d41b
	SSL: false
	modules: adc,dht,file,gpio,mqtt,net,node,ow,tmr,uart,wifi
 build 	built on: 2017-02-01 19:01
 powered by Lua 5.1.4 on SDK 2.0.0(656edbf)
lua: cannot open init.lua
> dofile("mqtt.lua")
WiFi: Connecting to SSID
> MQTT: trying to connect to 192.168.10.140:1883
WiFi: status change 0->1
WiFi: status change 1->5
WiFi: connected
192.168.10.198	255.255.255.0	192.168.10.1
MQTT: connection failed with reason -5
MQTT: trying to connect to 192.168.10.140:1883
MQTT: connection failed with reason -5
MQTT: trying to connect to 192.168.10.140:1883
//stop
=node.heap()
39016
...
> print(tmr.time().."."..((tmr.now()/1000)%1000))
911.788
> 

@marcelstoer
Copy link
Member

It will be merged to dev sometime after the next master drop - which is due today 😉. So, no, it won't go into 2.0.

See https://github.com/nodemcu/nodemcu-firmware/#releases for a brief description of our release "process".

@romansavrulin
Copy link
Author

romansavrulin commented Feb 1, 2017

Ok, thank you!

What can you say about methods to determine publish queue state I proposed in the first post? How one can manage that queue to prevent memleak? Should I create a separate feature request for that proposal?

@pjsg
Copy link
Member

pjsg commented Feb 2, 2017

There is no current way to figure out what is going on with the current mqtt implementation. It is all a bit of a mess.... Every time I dig into it, I find something more....

What do you think that the behavior ought to be when connection is lost to the server (even if that fact hasn't yet been discovered). My inclination is that we ought to have a max backlog of messages (settable when the client is created), and the publishing of a message would fail (either with an error code, or throwing an error).

There needs to be a design doc for how the mqtt client is supposed to work -- then the implementation can probably be adjusted to match the specification.

@romansavrulin
Copy link
Author

romansavrulin commented Feb 2, 2017

@pjsg The first thing that should be clarified is ambiguous callback system. What we have now is two set of them: 1st inside mqtt.connect method and 2nd inside mqtt.client:on(). What for? I cannot say clearly what callbacks will be triggered and when. It seems that in current releases mqtt.connect callbacks overrides ones inside mqtt.on. I personally prefer shorter and clear function calls, so will vote for mqtt.on methods. It will be also useful to either extend on offline with error code, or declare new event on error to make that API more expressive.

Then

  • on offline will be called on any disconnect event despite the reason, giving the user quick way to understand that connection has been lost and reconnection is needed
  • on error will be called on any error inside mqtt and give more detailed explanation of what happened, including disconnects, broker gone away (timeouts), unability to publish messages, queue overflows, etc... Certain event will be tagged by error code, just like in failure callback on connect method

BTW, I've checked the following script:

m:on("offline", function(client)
    publishMqtt:stop()
    print("MQTT: connection failed with reason "..reason)
    reconnMqtt:start()
end)

m:on("connect", function(client)
    print("MQTT: connected")
    publishMqtt:start()
    print("PUB: started")
end)

--MQTT reconnect logic timer
reconnMqtt = tmr.create()

reconnMqtt:register(10, tmr.ALARM_SEMI, function (t)
  reconnMqtt:interval(500);
  print("MQTT: trying to connect to "..MQTT_BrokerIP..":"..MQTT_BrokerPort);
  m:close()
  m:connect(MQTT_BrokerIP, MQTT_BrokerPort, 0[, 1])
end)

reconnMqtt:start()

and found out that offline callback is not called on connection errors on dev branch (due to #1683 ?). Reconnect flag doesn't affect that. I'll try to build #1683 locally and test it too.

p.s. @marcelstoer @pjsg , where can we discuss possible API/implementation changes for MQTT in more detail?

@pjsg
Copy link
Member

pjsg commented Mar 22, 2017

The MQTT api is weird (as you note). My feeling is that we aren't going to make progress on this until there is a concrete proposal for the "fixed" API -- documented somewhere. This could be as simple as forking the repo, and then committing a new version of mqtt.md into a private repo that we could all look at and work on.

My two current nodemcu projects are not using mqtt, so I'm not highly motivated to drive this.

@BugerDread
Copy link

BugerDread commented Mar 24, 2017

Hi, I found this conversation while googling around similar problem - my node was not able to recognize that broker is unavailable and after some time running without broker it crashed and rebooted - againg and again, until broker becomes online. So I used your testcode (with a little modifications) to check if it is really because of that:

m:on("offline", function(client)

is never triggered or there is another bug in my code

In my case I get success with FW 1.5.4.1-final. With 2.0.0 master / dev it tooks around 2.5minutes for the node to find out that broker is offline, m:on("offline")... was never fired, but "callback function for when the connection could not be established" (means the last one in "m:connect") was fired and reconnection started

see my notes for more details:
http://buger.dread.cz/nodemcu-mqtt-client-doesnt-detect-broker-outage-and-overflows-message-queue.html

Im rewriting my code to do following with every published message:

  • start timeout timer (lets say 1sec)
  • publish the message, in ACK callback just stop the timeout timer
  • if timeout timer expires, it means that there was problem with broker - stop publishing, reconnect and start publishing again if reconnect succesfull
    ... so far it seems to work fine

@NicolSpies
Copy link

Hi, I also experienced that the "on offline" event does not fire when the connection is lost. I believe the 2.5 minutes might be when the lwt is published by the broker when the broker stops receiving the keep alive heartbeat from the mqtt client and the broker then closes the connection. This then fires the offline event.

I explored it further #1406 and found the control reports that it is connected but becomes unresponsive after time of inactivity resulting in a publish failure that can only be corrected by restarting the ESP and re-establishing a new MQTT connection.

When the connection becomes "stale" re-connection is also not possible as the mqtt client still reports that it is connected to the broker. This is before the broker closes the connection.

I devised similar workarounds:

  • I simulated activity by doing "dummy" publishing to a nonexistent topic every minute, a true return indicates that the control/connection is alive. This action prevented the connection becoming "stale".
  • I used a 3G network and found that "standard" 3G network phenomena causes temporary broker connectivity issues. This often corrects itself so I only reboot the ESP after 2-3 consecutive dummy publish failures.

@BugerDread
Copy link

Here is updated info about my investigation around this bug...

http://buger.dread.cz/updated-nodemcu-mqtt-client-doesnt-detect-broker-outage-and-overflows-message-queue.html

In short - Im going back to 1.5.4.1, because it seems does not suffer with this bug and overall uses less heap. Will see if 2.0.0.0 will be fixed in future.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

6 participants