MQTT: Message Queue #975

Closed
efuturetoday opened this issue Jan 25, 2016 · 18 comments

Comments

@efuturetoday

Hello Dev-Team,
I really like your awesome framework and want to thank you first of all for your hard work!
I want to push sensor data to an MQTT broker periodically.
Therefore I created a semi-automatic timer and restart it after publishing the message with

mqtt.client:publish("/data", json.encode(data), 0, 0, function() tmr.start(0) end)  

I noticed that the timer stops working after some time, because the callback is only called when a PUBACK is received. The same issue occurs with QoS 1.
If I restart the timer unconditionally like this:

mqtt.client:publish("/data", json.encode(data), 0, 0, function() end)
tmr.start(0)

After some time I run into a heap overflow, because each message is added to the message queue.
Is there a way to remove the previous message from the MQTT queue, maybe with an id returned by
mqtt.client:publish, or to implement a callback that fires when the message is removed from the message queue?

Greetings!

@devsaurus
Member

@efuturetoday no, there is no way to remove published messages from the queue.

You definitely have to use QoS=1 to rely on the callback as explained here #561 (comment). Your report suggests that you tried QoS=1 at least once? This should have done the job. Please post the full script, e.g. as a gist.

@efuturetoday
Author

CONFIG = {
    WIFI = {
        SSID = "xxxxxxxxxxx",
        PASSWORD = "xxxxxxxxxx"
    },
    MQTT = {
        SERVER = "192.168.1.156",
        PORT = 1883,
        USER = "user",
        PASSWORD = "password",
        KEEPALIVE = 10
    }
}

SCAN_CFG = {
--    ssid = CONFIG.WIFI.SSID,
    channel = 0,
    show_hidden = 1
}

SERVICES = {
    _services = {},
    _callbacks = {},
    add = function(name, type, callback)
        SERVICES._services[node.chipid()..":"..name] = {
            type = type
        }
        SERVICES._callbacks[name] = callback
    end,
    perform = function(name, args, mailbox)
        print("[MQTT] should perform "..name)
        local ok, err = pcall(function()
            assert(SERVICES._callbacks[name] ~= nil, "service not found on host")
            SERVICES._callbacks[name](args)
        end)
        local data = {}
        data.mailbox = mailbox
        if (ok) then
            data.status = "OK"
        else
            data.status = "ERROR"
            data.message = err
        end
        m:publish("/notify", cjson.encode(data), 1, 0, function(conn) print("[MQTT] notify sent! "..cjson.encode(data)) end)
    end,
    toJSON = function() 
        return cjson.encode(SERVICES._services)
    end
}

RUNNERS = require("runner")

function endsWith(String,End)
   return End=='' or string.sub(String,-string.len(End))==End
end

function publishData(stream, value, id)
    local packet = {
        stream = stream,
        value = value
    }

    print("PUBLISH: "..stream)
    m:publish("/data", cjson.encode(packet), 1, 0, function(client)
        print("y: "..stream)
        tmr.start(id)
    end)
end
function publishAPData(t)
    local found = false
    for bssid,v in pairs(t) do
        local ssid, rssi, authmode, channel = string.match(v, "([^,]+),([^,]+),([^,]+),([^,]*)")
        print(ssid)
        if (ssid == CONFIG.WIFI.SSID) then
            publishData("wifi", {
                ssid = ssid,
                bssid = bssid,
                rssi = rssi,
                authmode = authmode,
                channel = channel
            }, 2)
            return
        end
    end
    if (found == false) then
        print("AP NOT FOUND!")
    end
end

-- CONSTANT SERVICES AND RUNNERS
SERVICES.add("relay1", "relay", function(args)
    assert(args ~= nil, "relay need arguments")
    print("relay1")
end)

RUNNERS.add(1, 10, function(id)
    publishData("heap", node.heap(), id)
end)

RUNNERS.add(2, 20000, function(id)
    --check for specific AP
    wifi.sta.getap(SCAN_CFG, 1, publishAPData)
end)

-- START RUNTIME LOGIC
-- init mqtt client with the configured keepalive (CONFIG.MQTT.KEEPALIVE seconds) and cleansession = 1
m = mqtt.Client("clientid", CONFIG.MQTT.KEEPALIVE, CONFIG.MQTT.USER, CONFIG.MQTT.PASSWORD, 1) -- last argument: cleansession
m:on("connect", function(con)
    print("[MQTT] connected!")

    -- build identification card
    local majorVer, minorVer, devVer, chipId, flashId, flashSize, flashMode, flashSpeed = node.info()

    local rawcode, reason = node.bootreason()
    local packet = {
        card = {
            rawcode = rawcode,
            reason = reason,
            majorVer = majorVer,
            minorVer = minorVer,
            devVer = devVer,
            chipId = chipId,
            flashId = flashId,
            flashSize = flashSize,
            flashMode = flashMode,
            flashSpeed = flashSpeed,
            ip = wifi.sta.getip(),
            mac = wifi.sta.getmac()
        },
        services = SERVICES._services
    }
    m:publish("/identify", cjson.encode(packet), 1, 0, function(conn) print("[MQTT] identification sent!") end)
    m:subscribe(chipId.."/perform", 1, function() print("[MQTT] service listener registrated!") end)

    RUNNERS.start()
end)
m:on("offline", function(conn)
    print("[MQTT] now offline!")

    RUNNERS.stop()
end)
m:on("message", function(conn, topic, data)
  print("[MQTT] message: "..topic..":")
  if (data ~= nil) then
    print("[MQTT] raw data: "..data)
    if (endsWith(topic, "/perform")) then
        local ok, packet = pcall(cjson.decode, data)
        if (ok) then
            SERVICES.perform(packet.service, packet.args, packet.mailbox)
        else
            print("[MQTT] failed to decode packet!")
        end
    end
  end
end)

wifi.setmode(wifi.STATION)
wifi.sta.config(CONFIG.WIFI.SSID, CONFIG.WIFI.PASSWORD, 1)
wifi.sta.eventMonReg(wifi.STA_CONNECTING, function(previousState)
    if (previousState == wifi.STA_GOTIP) then 
        print("[WIFI] lost connection with AP\n\tAttempting to reconnect...")
    else
        print("[WIFI] connecting to AP: "..CONFIG.WIFI.SSID)
    end
end)
wifi.sta.eventMonReg(wifi.STA_GOTIP, function()
    print("[WIFI] got IP: "..wifi.sta.getip())
    -- m:connect(host, port, secure, auto_reconnect, function(client) end)
    m:close()
    m:connect(CONFIG.MQTT.SERVER, CONFIG.MQTT.PORT, 0, 1)
end)
wifi.sta.eventMonStart()

And the runner.lua:

local M = {} --public interface
local timers = {}

function M.add(id, intervallMS, callback)
    timers[id] = true
    tmr.register(id, intervallMS, tmr.ALARM_SEMI, function()
        local ok, err = pcall(callback, id)
        if (ok == false) then
            print("[RUNNER] Error calling callback of timer "..id..": "..err.."\nrestart timer...")
            tmr.start(id)
        end
    end)
end

function M.start()
    M.stop() -- just to be sure
    for key, value in pairs(timers) do
        tmr.start(key)
    end
end

function M.stop()
    for key, value in pairs(timers) do
        tmr.stop(key)
    end
end


return M

@devsaurus you are right, at first I had QoS set to 1, but then the "runner" stopped working after some time and the callback with tmr.start(id) did not fire.
So there is not always 100% guarantee that the callback is fired even with QoS 1?

@devsaurus
Member

So there is not always 100% guarantee that the callback is fired even with QoS 1?

The callback should be fired reliably with QoS > 0. If that's not the case then we'd be looking at some kind of bug. This is what I'd like to figure out.

But your application is quite complex and contains concurrent scheduling of m:publish() commands. This might be the root cause of the inconsistencies you're facing. Here the most important rule is: Don't start subsequent m:publish() commands but wait with the next one until the previous one finished. This is typically achieved by nesting them in the publish callbacks. For example:

    m:publish("/identify", cjson.encode(packet), 1, 0, function(conn)
        RUNNERS.start()
        print("[MQTT] identification sent!")
    end)
    m:subscribe(chipId.."/perform", 1, function() print("[MQTT] service listener registrated!") end)

    -- don't start the RUNNERS immediately after m:publish()...
end)

Handling the concurrency properly is not simple, I assume.

So for further testing please strip down your code to comply with the rule above and make sure that two or more m:publish() are not scheduled in parallel. If the issue with unreliable firing of the callback remains then we can focus on a potential issue in the firmware.
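
The rule above (only one m:publish() in flight at a time) can be sketched as a small queue in plain Lua. This is a minimal sketch, not part of NodeMCU: `newPublishQueue`, `push` and `size` are hypothetical names, and `client` is assumed to expose `publish(topic, payload, qos, retain, callback)` like the `m` object in this thread. It also assumes the callback is actually invoked, which the thread only expects for QoS > 0.

```lua
-- Hypothetical helper (not a NodeMCU API): serializes publishes so that the
-- next one starts only after the previous publish callback has fired.
function newPublishQueue(client)
    local pending = {}   -- messages waiting for their turn
    local busy = false   -- true while one publish is in flight
    local q = {}

    local function sendNext()
        if busy or #pending == 0 then return end
        busy = true
        local msg = table.remove(pending, 1)
        client:publish(msg.topic, msg.payload, msg.qos, msg.retain, function()
            busy = false
            if msg.done then msg.done() end
            sendNext()   -- start the next queued message, if any
        end)
    end

    -- Queue a message; `done` is an optional per-message callback.
    function q.push(topic, payload, qos, retain, done)
        pending[#pending + 1] = {
            topic = topic, payload = payload,
            qos = qos, retain = retain, done = done
        }
        sendNext()
    end

    -- Number of messages still waiting (not counting the one in flight).
    function q.size()
        return #pending
    end

    return q
end
```

With such a helper the runners would call `q.push("/data", cjson.encode(packet), 1, 0, function() tmr.start(id) end)` instead of calling `m:publish()` directly. Note that the queue itself is unbounded, so it does not protect the heap when messages are produced faster than they are acknowledged.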

@markfinn

Here the most important rule is: Don't start subsequent m:publish() commands but wait with the next one until the previous one finished. This is typically achieved by nesting them in the publish callbacks.

You definitely have to use QoS=1 to rely on the callback

If I'm understanding your two statements together, if you publish with a qos=0, you may not ever be able to call publish again. The other issue you linked to mentioned a timeout, but you seem to be saying otherwise here... help?

@devsaurus
Member

@markfinn the statements are related, yes.

if you publish with a qos=0, you may not ever be able to call publish again

Correct if you look at a typical solution.
You can't rely on the callback for QoS=0 because the PUBACK might come too late or not at all (this is a "feature" of QoS=0, I guess). Thus the typical implementation won't publish again. You'd need to extend it and establish some sort of error recovery logic to handle timeouts and resume publishing afterwards. Feasible, but not recommended unless you really have to stick with QoS=0. QoS>0 makes life much easier.

But we're already deep in FAQ topics here. It mentions the asynchronous nature of conn:send() and the same consideration applies to m:publish().
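
The "error recovery logic to handle timeouts" mentioned above could look roughly like the following gate. It is a sketch under assumptions: `newPublishGate`, `tryAcquire` and `release` are hypothetical names, and the caller supplies the current time in milliseconds so the logic stays independent of any particular timer API.

```lua
-- Hypothetical helper (not a NodeMCU API): permits a new publish when the
-- previous one has completed, or when it has been pending for at least
-- timeoutMs milliseconds.
function newPublishGate(timeoutMs)
    local gate = { startedAt = nil }  -- nil means nothing is in flight

    -- Returns true if a publish may start at time `now` (milliseconds).
    function gate.tryAcquire(now)
        if gate.startedAt ~= nil and (now - gate.startedAt) < timeoutMs then
            return false  -- previous publish still pending and not timed out
        end
        gate.startedAt = now
        return true
    end

    -- Call this from the publish callback to mark the publish as finished.
    function gate.release()
        gate.startedAt = nil
    end

    return gate
end
```

On the device, a timer callback would first call `gate.tryAcquire(now)` and only then `m:publish(topic, payload, 0, 0, gate.release)`. Deriving `now` from `tmr.now()` is an assumption here; that counter is in microseconds and wraps around, which a real implementation would have to handle.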

@markfinn

But we're already deep in FAQ topics here

No, we're not. I understand asynchronous events. I'm trying to work out if your two statements are actually true because if they are then once you've sent a publish with a QOS=0 and it gets lost, you can never publish again, and I guess you would need to tear down the mqtt connection and restart. I hope your second statement about a timeout is possible because that would be much nicer than having to reconnect to the server, but it is in contradiction to your statement that:

Here the most important rule is: Don't start subsequent m:publish() commands but wait with the next one until the previous one finished.

@pjsg
Member

pjsg commented Jan 26, 2016

I looked at the code of the mqtt module, and I notice that it doesn't check the return code of espconn_sent() when it tries to send a packet. I suspect that part of the problem is that if two publish calls are made too close together, then the second one doesn't actually do the send and so the callback doesn't get called. It might be worth adding some logging to see if any of the network sending functions fail.

I also note that the code above doesn't check the return code from the publish calls either. I suspect that more error checking all round would be helpful.

@karrots
Contributor

karrots commented Jan 26, 2016

I can't speak for how the NodeMCU code handles the callback; it's been a while since I have looked. What I can say is that with QoS 0 the server never sends a PUBACK back to the client. So depending on how the NodeMCU code functions, it may execute the callback immediately after sending or it may just do nothing.

http://www.hivemq.com/blog/mqtt-essentials-part-6-mqtt-quality-of-service-levels

@efuturetoday
Author

I still think that adding a callback that is called whenever mqtt_socket_sent is entered would be helpful. Then you could even work with QoS = 0 messages and be sure that there is no heap overflow.
Or at least call the callback that is currently reserved for PUBACK right after the send when QoS = 0 (as @karrots mentioned).
For fast concurrent data like the current temperature I don't need QoS 1 and its overhead, because I don't care about lost packets. But even with QoS = 1 I would be happy.
It need some investigation. I will strip down my code to a simple test and will report.
But keep in mind that concurrent calls to m:publish() should add the message to the message queue and should keep track of the PUBACK for each message. I will further check whether the message is received by the broker and whether the callbacks are called.
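
For readings where lost values are acceptable, as described above, memory use can be bounded on the application side by keeping only the newest value while a publish is in flight. The following is a sketch, not an existing API: `newLatestSlot`, `offer` and the `dropped` counter are hypothetical, and the approach relies on the publish callback being invoked, which this thread only expects for QoS > 0.

```lua
-- Hypothetical helper (not a NodeMCU API): at most one publish in flight and
-- at most one value waiting per topic; older waiting values are overwritten.
function newLatestSlot(client, topic, qos)
    local slot = { busy = false, waiting = nil, dropped = 0 }

    local function send(payload)
        slot.busy = true
        client:publish(topic, payload, qos, 0, function()
            slot.busy = false
            if slot.waiting ~= nil then
                local latest = slot.waiting
                slot.waiting = nil
                send(latest)  -- publish the newest value that arrived meanwhile
            end
        end)
    end

    -- Offer a new reading; it replaces any value that is still waiting.
    function slot.offer(payload)
        if slot.busy then
            if slot.waiting ~= nil then
                slot.dropped = slot.dropped + 1  -- an older reading is discarded
            end
            slot.waiting = payload
        else
            send(payload)
        end
    end

    return slot
end
```

The design trades completeness for a fixed memory footprint: however fast readings arrive, only one payload per topic is held back.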

@marcelstoer
Member

Not sure what to make of this. Is it a bug in the firmware (i.e. the MQTT module) and if so where's the hard evidence? If there's no isolated test case then it probably falls under #719.

That being said I remember two related SO questions I commented on: no publish callback after PUBACK http://stackoverflow.com/q/34382676/131929 and a tiny show case of how the queue works http://stackoverflow.com/a/33444636/131929

@karrots
Contributor

karrots commented Jan 28, 2016

This might be related. I noticed a while ago that the MQTT code under certain circumstances would upgrade a QOS 1 queued message to QOS 2. Then the queue would build up because the client never received the third part of the handshake and the server would drop the message. Thus the client would keep sending it.

I couldn't track down what made it do this nor could I see in the code where it might be happening. The closest thing I could see in the logs was that it seemed to be related to getting a message with the ID of 2.

The project I was using NodeMCU MQTT on has since passed (conference badge) so it left my radar.

@devsaurus
Member

It need some investigation. I will strip down my code to a simple test and will report.

@efuturetoday #985 might be helpful there. It adds error checking to client functions and should enable better diagnostics (thanks for the heads-up @pjsg).
As @marcelstoer noted, a testcase to demonstrate the potential flaw in the firmware would be helpful. Otherwise please close this issue.

@SheepWillPrevail

This will make NodeMCU (commit c803756) reboot because of memory problems; heap decreases until NodeMCU resets.

local m = mqtt.Client("nodemcu", 60)
m:connect("raspberrypi", 1883, 0)

local connected = false

m:on("connect", function() connected = true end)
m:on("offline", function() connected = false end)

function sendTelemetry()
    if (connected) then
        local status, temperature, humidity = dht.readxx(7) 
        if (status == dht.OK) then
            m:publish("/temperature", temperature, 0, 0)
            m:publish("/humidity", humidity, 0, 0)
        end
    end
end

tmr.alarm(0, 1000, 1, sendTelemetry)

@pjsg
Member

pjsg commented Jan 31, 2016

Can you please check that each of the publish statements returns 'true'? I think that it is possible that there is a memory leak if they don't -- but it would be nice to know if that was the problem.
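
A small wrapper makes this check easy to apply to every call. It is a sketch: `checkedPublish` and its `log` parameter are hypothetical, and it assumes, as the question above implies, that `publish` returns true when the message was accepted for sending.

```lua
-- Hypothetical wrapper (not a NodeMCU API): reports publishes whose return
-- value is not true. `log` defaults to print.
function checkedPublish(client, topic, payload, qos, retain, callback, log)
    log = log or print
    local ok = client:publish(topic, payload, qos, retain, callback)
    if ok ~= true then
        log("[MQTT] publish to " .. topic .. " failed (returned " .. tostring(ok) .. ")")
        return false
    end
    return true
end
```

In the telemetry example this would replace the two direct calls, e.g. `checkedPublish(m, "/temperature", temperature, 0, 0)`, so that a failing call shows up on the console instead of passing silently.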

@pjsg
Member

pjsg commented Feb 2, 2016

OK -- it fails on my node as well. And there do not appear to be any obvious errors. Will investigate further.

@devsaurus
Member

I'm closing this now with reference to #719 and summary:

@efuturetoday feel free to reopen this issue with an isolated testcase in case your problem wasn't fully addressed here.

@pjsg
Member

pjsg commented Feb 5, 2016

I do note that the bug is not yet fixed, as the PR has not yet been merged.

@devsaurus
Member

I regard #975 (comment) as a separate topic, unrelated to the initial report where coding was discussed. Both got mixed up here and have different resolutions.
The bug report should be branched off into a new issue since the fix in #1002 is still pending in review.

marcelstoer added a commit that referenced this issue Feb 10, 2016
Fix memory exhaustion in mqtt under circumstances from issue #975

7 participants