Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

module mqtt: add compatibility to paho_mqtt v2 #707

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 36 additions & 25 deletions modules/mqtt/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import inspect
import datetime

from importlib.metadata import version

import paho.mqtt.client as mqtt

from lib.model.module import Module
Expand All @@ -43,7 +45,7 @@


class Mqtt(Module):
version = '1.7.6'
version = '1.8.0'
longname = 'MQTT module for SmartHomeNG'

__plugif_CallbackTopics = {} # for plugin interface
Expand Down Expand Up @@ -119,6 +121,17 @@ def __init__(self, sh):
if self.password == '':
self.password = None

#
# check paho.mqtt version
#
self.paho_ver = 0
try:
self.paho_ver = int(version('paho_mqtt').split('.')[0])
except Exception as e:
self.logger.error(f'unable to determin paho_mqtt version: {e}')
if self.paho_ver not in (1, 2):
self.logger.error(f'unsupported version of paho_mqtt installed: {version("paho_mqtt")}, module will possibly not work')

# tls ...
# ca_certs ...

Expand Down Expand Up @@ -166,11 +179,6 @@ def __init__(self, sh):
# tls ...
# ca_certs ...

if not self._connect_to_broker():
# self._init_complete = False
# return
pass


def start(self):
"""
Expand All @@ -180,6 +188,8 @@ def start(self):
"""
self.logger.dbghigh(self.translate("Methode '{method}' aufgerufen", {'method': 'start()'}))

self._connect_to_broker()

# self.alive = True
if (self.birth_topic != '') and (self.birth_payload != ''):
self._client.publish(self.birth_topic, self.birth_payload, self.qos, retain=True)
Expand All @@ -188,7 +198,7 @@ def start(self):
# set the name of the paho thread for this plugin instance
try:
self._client._thread.name = 'modules.' + self.get_fullname() + ".paho_client"
except:
except Exception:
self.logger.warning("Unable to set name for paho thread")


Expand Down Expand Up @@ -217,7 +227,12 @@ def _connect_to_broker(self):
"""
clientname = platform.uname()[1] + '.' + self.broker_client
self.logger.info("Connecting to broker '{}:{}'. Starting mqtt client '{}'".format(self.broker_ip, self.broker_port, clientname))
self._client = mqtt.Client(client_id=clientname)
if self.paho_ver <= 1:
self.logger.info('starting mqtt client with API version 1')
self._client = mqtt.Client(client_id=clientname)
else:
self.logger.info('starting mqtt client with API version 2')
self._client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1, client_id=clientname)

# set testament, if configured
if (self.last_will_topic != '') and (self.last_will_payload != ''):
Expand Down Expand Up @@ -550,7 +565,7 @@ def _on_mqtt_message(self, client, userdata, message):
msg_topic = message.topic.split('/')

equal = False
if (len(wc_topic) == len(msg_topic)) or (wc_topic[len(wc_topic)-1] == '#' and (len(wc_topic) <= len(msg_topic))):
if (len(wc_topic) == len(msg_topic)) or (wc_topic[len(wc_topic) - 1] == '#' and (len(wc_topic) <= len(msg_topic))):
equal = True
for i in range(len(wc_topic)):
if not (wc_topic[i] == msg_topic[i] or wc_topic[i] == '+' or wc_topic[i] == '#'):
Expand Down Expand Up @@ -687,19 +702,16 @@ def _unsubscribe_broker_infos(self):
self._client.unsubscribe('$SYS/broker/load/messages/sent/15min')
return


def _on_connect(self, client, userdata, flags, rc):
def _on_connect(self, client, userdata, flags, reason_code, properties=None):
"""
Callback function called on connect
"""
self._connect_result = mqtt.connack_string(rc)

if rc == 0:
if reason_code == 0:
if self._got_disconnected:
self.logger.notice("Reconnected to broker")
self._got_disconnected = False
else:
self.logger.info("Connection returned result '{}' (userdata={}) ".format(mqtt.connack_string(rc), userdata))
self.logger.info(f"Connection returned result '{str(reason_code)}' (userdata={userdata})")
self._connected = True

self._subscribe_broker_infos()
Expand All @@ -708,31 +720,30 @@ def _on_connect(self, client, userdata, flags, rc):
for topic in self._subscribed_topics:
item = self._subscribed_topics[topic]
self._client.subscribe(topic, qos=self._get_qos_forTopic(topic, item))
#self.logger.info("Listening on topic '{}' for item '{}'".format(topic, item.property.path))

self.logger.info("self._subscribed_topics = {}".format(self._subscribed_topics))
self.logger.info(f"self._subscribed_topics = {self._subscribed_topics}")

return

msg = "Connection returned result '{}': {} (client={}, userdata={}, self._client={})".format(str(rc), mqtt.connack_string(rc), client, userdata, self._client)
if rc == 5:
msg = f"Connection returned result '{reason_code}': {str(reason_code)} (client={client}, userdata={userdata}, self._client={self._client})"
if (self.paho_ver == 1 and reason_code == 5) or (self.paho_ver == 2 and reason_code == 'Not authorized'):
self.logger.error(msg)
self._disconnect_from_broker()
else:
self.logger.warning(msg)


def _on_disconnect(self, client, userdata, rc):
def _on_disconnect(self, client, userdata, reason_code, properties=None):
"""
Callback function called on disconnect
"""
if rc == 0:
self.logger.info(f"Disconnection was successful (rc={rc})")
elif rc == 7:
self.logger.warning(f"Disconnected from broker with returncode '{rc}'")
if reason_code == 0:
self.logger.info(f"Disconnection was successful ({reason_code})")
elif reason_code == "Unspecified error":
self.logger.warning(f"Disconnected from broker with error '{reason_code}'")
self._got_disconnected = True
else:
self.logger.notice(f"Disconnection returned result '{rc}'")
self.logger.notice(f"Disconnection returned result '{reason_code}'")
return


Expand Down
2 changes: 1 addition & 1 deletion modules/mqtt/module.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
module:
# Global plugin attributes
classname: Mqtt
version: 1.7.6
version: 1.8.0
sh_minversion: 1.6a
# sh_maxversion: # maximum shNG version to use this plugin (leave empty if latest)
description:
Expand Down