From 87b68b59a026347fe45af86089b59fa0fb38ddb8 Mon Sep 17 00:00:00 2001 From: embak Date: Fri, 31 Jul 2020 22:08:05 -0400 Subject: [PATCH] Add support for EZIO4O --- config.yaml | 55 ++ insteon_mqtt/config.py | 1 + insteon_mqtt/device/EZIO4O.py | 868 ++++++++++++++++++++++++++++++++ insteon_mqtt/device/__init__.py | 1 + insteon_mqtt/mqtt/EZIO4O.py | 236 +++++++++ insteon_mqtt/mqtt/__init__.py | 1 + insteon_mqtt/mqtt/config.py | 2 + tests/mqtt/test_EZIO4O.py | 418 +++++++++++++++ tests/mqtt/test_config.py | 6 +- tests/util/helpers/main.py | 20 +- 10 files changed, 1603 insertions(+), 5 deletions(-) create mode 100644 insteon_mqtt/device/EZIO4O.py create mode 100644 insteon_mqtt/mqtt/EZIO4O.py create mode 100644 tests/mqtt/test_EZIO4O.py diff --git a/config.yaml b/config.yaml index 3826a190..66b6256d 100644 --- a/config.yaml +++ b/config.yaml @@ -124,6 +124,10 @@ insteon: thermostat: # - aa.bb.cc: 'downstairs' + # EZIO4O 4 output relay modules + #ezio4o: + # - aa.bb.cc: 'relays' + #========================================================================== # # MQTT configuration @@ -919,4 +923,55 @@ mqtt: scene_topic: 'insteon/{{address}}/scene/{{button}}' scene_payload: '{ "cmd" : "{{value.lower()}}" }' + #------------------------------------------------------------------------ + # EZIO4O 4 relay output module + #------------------------------------------------------------------------ + # EZIO4O is a 4 relay output wall plug module from Smartenit. + # Each relay has a normally open and a normally closed contact. + # Device relay 1 to 4 correspond to topics button 1 to 4. + # In Home Assistant use MQTT switch with a configuration like: + # switch: + # - platform: mqtt + # state_topic: 'insteon/aa.bb.cc/state/1' + # command_topic: 'insteon/aa.bb.cc/set/1' + # switch: + # - platform: mqtt + # state_topic: 'insteon/aa.bb.cc/state/2' + # command_topic: 'insteon/aa.bb.cc/set/2' + ezio4o: + # Output state change topic and template. This message is sent + # whenever the device state changes for any reason. Available + # variables for templating are: + # address = 'aa.bb.cc' + # name = 'device name' + # button = 1 to 4 (relay number) + # on = 0/1 + # on_str = 'off'/'on' + # mode = 'normal'/'fast'/'instant' + # fast = 0/1 + # instant = 0/1 + # reason = 'device'/'scene'/'command'/'refresh'/'...' + state_topic: "insteon/{{address}}/state/{{button}}" + state_payload: "{{on_str.upper()}}" + + # Input on/off command. Similar functionality to the cmd_topic but only + # for turning the device on and off. If reason is input, is will be + # passed through to the state_payload. The output of passing the payload + # through the template must match the following: + # { "cmd" : 'on'/'off', "button": button, + # ["mode" : 'normal'/'fast'/'instant'], + # ["fast" : 1/0], ["instant" : 1/0], ["reason" : "..."] } + # Available variables for templating are: + # value = the input payload + # json = the input payload converted to json. Use json.VAR to extract + # a variable from a json payload. + on_off_topic: "insteon/{{address}}/set/{{button}}" + on_off_payload: '{ "cmd" : "{{value.lower()}}" }' + + # Scene on/off command. This triggers the scene broadcast on the outlet + # in the same way clicking the button would. The inputs are the same as + # those for the on_off topic and payload. + scene_topic: "insteon/{{address}}/scene/{{button}}" + scene_payload: '{ "cmd" : "{{value.lower()}}" }' + #---------------------------------------------------------------- diff --git a/insteon_mqtt/config.py b/insteon_mqtt/config.py index f07f5f80..17cc0ad3 100644 --- a/insteon_mqtt/config.py +++ b/insteon_mqtt/config.py @@ -18,6 +18,7 @@ # class to use and any extra keyword args to pass to the constructor. 'dimmer' : (device.Dimmer, {}), 'battery_sensor' : (device.BatterySensor, {}), + "ezio4o": (device.EZIO4O, {}), 'fan_linc' : (device.FanLinc, {}), 'io_linc' : (device.IOLinc, {}), 'keypad_linc' : (device.KeypadLinc, {'dimmer' : True}), diff --git a/insteon_mqtt/device/EZIO4O.py b/insteon_mqtt/device/EZIO4O.py new file mode 100644 index 00000000..0abef7b7 --- /dev/null +++ b/insteon_mqtt/device/EZIO4O.py @@ -0,0 +1,868 @@ +#=========================================================================== +# +# Smartenit EZIO4O - 4 relay output device +# +#=========================================================================== +import functools +from .Base import Base +from ..CommandSeq import CommandSeq +from .. import handler +from .. import log +from .. import message as Msg +from .. import on_off +from ..Signal import Signal +from .. import util + +LOG = log.get_logger() + + +# EZIOxx Flags settings definition +EZIO4xx_flags = { + "analog-input": { + "options": {"none": 0b00000000, "command": 0b00000001, "interval": 0b00000011}, + "mask": 0b00000011, + "default": "none", + }, + "alarms": { + "options": {True: 0b00000100, False: 0b00000000}, + "mask": 0b00000100, + "default": False, + }, + "debounce": { + "options": {True: 0b00001000, False: 0b00000000}, + "mask": 0b00001000, + "default": False, + }, + "one-wire": { + "options": {True: 0b00010000, False: 0b00000000}, + "mask": 0b00010000, + "default": False, + }, + "output-timers-unit": { + "options": {"second": 0b00100000, "minute": 0b00000000}, + "mask": 0b00100000, + "default": "minute", + }, + "broadcast-change": { + "options": {True: 0b01000000, False: 0b00000000}, + "mask": 0b01000000, + "default": False, + }, + "output-timers-enable": { + "options": {True: 0b100000000, False: 0b00000000}, + "mask": 0b10000000, + "default": False, + }, +} + + +class EZIO4O(Base): + """Smartenit EZIO4O - 4 relay output device. + + This class can be used to model the EZIO4O device which has 4 outputs. + Each output has a normally close and a normally open contact. They are + independent switches and are controlled via group 1 to group 4 inputs. + + State changes are communicated by emitting signals. Other classes can + connect to these signals to perform an action when a change is made to + the device (like sending MQTT messages). Supported signals are: + + - signal_on_off( Device, int group, bool is_on, on_off.Mode mode, str + reason ): Sent whenever an output is turned on or off. + Group will be 1 to 4 matching the corresponding device + output. + """ + + def __init__(self, protocol, modem, address, name=None): + """Constructor + + Args: + protocol (Protocol): The Protocol object used to communicate + with the Insteon network. This is needed to allow the + device to send messages to the PLM modem. + modem (Modem): The Insteon modem used to find other devices. + address (Address): The address of the device. + name (str): Nice alias name to use for the device. + """ + super().__init__(protocol, modem, address, name) + + self._is_on = [False, False, False, False] # output state + + # Support on/off style signals. + # API: func(Device, int group, bool is_on, on_off.Mode mode, + # str reason) + self.signal_on_off = Signal() + + # Remote (mqtt) commands mapped to methods calls. Add to the + # base class defined commands. + self.cmd_map.update( + { + "on": self.on, + "off": self.off, + "set_flags": self.set_flags, + "set": self.set, + "scene": self.scene, + } + ) + + # EZIOxx configuration port settings. See set_flags(). + self._flag_value = None + + # Special callback to run when receiving a broadcast clean up. See + # scene() for details. + self.broadcast_done = None + self.broadcast_reason = "" + + # NOTE: EZIO4O does NOT include the group in the ACK of an on/off + # command. So there is no way to tell which output is being ACK'ed + # if we send multiple messages to it. Each time on or off is called, + # it pushes the output to this list so that when the ACK/NAK arrives, + # we can pop it off and know which output was commanded. + self._which_output = [] + + #----------------------------------------------------------------------- + def pair(self, on_done=None): + """Pair the device with the modem. + + This only needs to be called one time. It will set the device + as a controller and the modem as a responder so the modem will + see group broadcasts and report them to us. + + The device must already be a responder to the modem (push set + on the modem, then set on the device) so we can update it's + database. + + Args: + on_done: Finished callback. This is called when the command has + completed. Signature is: on_done(success, msg, data) + """ + LOG.info("EZIO4O %s pairing", self.label) + + # Build a sequence of calls to the do the pairing. This insures each + # call finishes and works before calling the next one. We have to do + # this for device db manipulation because we need to know the memory + # layout on the device before making changes. + seq = CommandSeq(self.protocol, "EZIO4O paired", on_done) + + # Start with a refresh command - since we're changing the db, it must + # be up to date or bad things will happen. + seq.add(self.refresh) + + # Add the device as a responder to the modem on group 1. This is + # probably already there - and maybe needs to be there before we can + # even issue any commands but this check insures that the link is + # present on the device and the modem. + seq.add( + self.db_add_resp_of, + 0x01, + self.modem.addr, + 0x01, + refresh=False, + local_data=[0x0, 0x0, 0x00], + ) + + # Start the sequence running. This will return so the + # network event loop can process everything and the on_done callbacks + # will chain everything together. + seq.run() + + #----------------------------------------------------------------------- + def refresh(self, force=False, on_done=None): + """Refresh the current device state and database if needed. + + This sends a ping to the device. The reply has the current device + state (on/off, level, etc) and the current db delta value which is + checked against the current db value. If the current db is out of + date, it will trigger a download of the database. + + This will send out an updated signal for the current device status + whenever possible (like dimmer levels). + + Args: + force (bool): If true, will force a refresh of the device database + even if the delta value matches as well as a re-query of the + device model information even if it is already known. + on_done: Finished callback. This is called when the command has + completed. Signature is: on_done(success, msg, data) + """ + LOG.info("EZIO4O %s cmd: status refresh", self.label) + + # NOTE: EZIO4O cmd1=0x4F cmd2=0x02 will report the output state. + seq = CommandSeq(self.protocol, "Device refreshed", on_done) + + # This sends a refresh ping which will respond w/ the current + # database delta field. The handler checks that against the current + # value. If it's different, it will send a database download command + # to the device to update the database. + msg = Msg.OutStandard.direct(self.addr, 0x4F, 0x02) + msg_handler = handler.DeviceRefresh( + self, self.handle_refresh, force, on_done, num_retry=3 + ) + seq.add_msg(msg, msg_handler) + + # If model number is not known, or force true, run get_model + self.addRefreshData(seq, force) + + # Run all the commands. + seq.run() + + #----------------------------------------------------------------------- + def on( + self, group=0x01, level=None, mode=on_off.Mode.NORMAL, reason="", on_done=None + ): + """Turn the device on. + + NOTE: This does NOT simulate a button press on the device - it just + changes the state of the device. It will not trigger any responders + that are linked to this device. To simulate a button press, call the + scene() method. + + This will send the command to the device to update it's state. When + we get an ACK of the result, we'll change our internal state and emit + the state changed signals. + + Args: + group (int): The group to send the command to. Group 1 to 4 matching + output 1 to 4. + level (int): If non zero, turn the device on. Should be in the + range 0 to 255. Only dimmers use the intermediate values, all + other devices look at level=0 or level>0. + mode (on_off.Mode): The type of command to send (normal, fast, etc). + reason (str): This is optional and is used to identify why the + command was sent. It is passed through to the output signal + when the state changes - nothing else is done with it. + on_done: Finished callback. This is called when the command has + completed. Signature is: on_done(success, msg, data) + """ + LOG.info("EZIO4O %s grp: %s cmd: on", self.label, group) + assert 1 <= group <= 4 + assert level >= 0 and level <= 0xFF + assert isinstance(mode, on_off.Mode) + + # Use a standard message to send "output on" (0x45) command for the output + msg = Msg.OutStandard.direct(self.addr, 0x45, group - 1) + + # Use the standard command handler which will notify us when + # the command is ACK'ed. + callback = functools.partial(self.handle_ack, reason=reason) + msg_handler = handler.StandardCmd(msg, callback, on_done) + + # See __init__ code comments for what this is for. + self._which_output.append(group) + + # Send the message to the PLM modem for protocol. + self.send(msg, msg_handler) + + #----------------------------------------------------------------------- + def off(self, group=0x01, mode=on_off.Mode.NORMAL, reason="", on_done=None): + """Turn the device off. + + NOTE: This does NOT simulate a button press on the device - it just + changes the state of the device. It will not trigger any responders + that are linked to this device. To simulate a button press, call the + scene() method. + + This will send the command to the device to update it's state. When + we get an ACK of the result, we'll change our internal state and emit + the state changed signals. + + Args: + group (int): The group to send the command to. Group 1 to 4 matching + output 1 to 4. + mode (on_off.Mode): The type of command to send (normal, fast, etc). + reason (str): This is optional and is used to identify why the + command was sent. It is passed through to the output signal + when the state changes - nothing else is done with it. + on_done: Finished callback. This is called when the command has + completed. Signature is: on_done(success, msg, data) + """ + LOG.info("EZIO4O %s grp: %s cmd: off", self.label, group) + assert 1 <= group <= 4 + assert isinstance(mode, on_off.Mode) + + # Use a standard message to send "output off" (0x46) command for the output + msg = Msg.OutStandard.direct(self.addr, 0x46, group - 1) + + # Use the standard command handler which will notify us when the + # command is ACK'ed. + callback = functools.partial(self.handle_ack, reason=reason) + msg_handler = handler.StandardCmd(msg, callback, on_done) + + # See __init__ code comments for what this is for. + self._which_output.append(group) + + # Send the message to the PLM modem for protocol. + self.send(msg, msg_handler) + + #----------------------------------------------------------------------- + def set(self, level, group=0x01, mode=on_off.Mode.NORMAL, reason="", on_done=None): + """Turn the device on or off. Level zero will be off. + + NOTE: This does NOT simulate a button press on the device - it just + changes the state of the device. It will not trigger any responders + that are linked to this device. To simulate a button press, call the + scene() method. + + This will send the command to the device to update it's state. When + we get an ACK of the result, we'll change our internal state and emit + the state changed signals. + + Args: + level (int): If non zero, turn the device on. Should be in the + range 0 to 255. Only dimmers use the intermediate values, all + other devices look at level=0 or level>0. + group (int): The group to send the command to. Group 1 to 4 matching + output 1 to 4. + mode (on_off.Mode): The type of command to send (normal, fast, etc). + on_done: Finished callback. This is called when the command has + completed. Signature is: on_done(success, msg, data) + """ + if level: + self.on(group, level, mode, reason, on_done) + else: + self.off(group, mode, reason, on_done) + + #----------------------------------------------------------------------- + def scene(self, is_on, group=0x01, reason="", on_done=None): + """Trigger a scene on the device. + + Triggering a scene is the same as simulating a button press on the + device. It will change the state of the device and notify responders + that are linked ot the device to be updated. + + Args: + is_on (bool): True for an on command, False for an off command. + group (int): The output on the device to stimulate. Group 1 to 4 + matching output 1 to 4. + reason (str): This is optional and is used to identify why the + command was sent. It is passed through to the output signal + when the state changes - nothing else is done with it. + on_done: Finished callback. This is called when the command has + completed. Signature is: on_done(success, msg, data) + """ + LOG.info( + "EZIO4O %s grp: %s cmd: scene %s", + self.label, + group, + "on" if is_on else "off", + ) + assert 1 <= group <= 4 + + on_done = util.make_callback(on_done) + + # From here search modem scene (link group resp) into device link + # database matching to the desired output (group-1) in data 2 + # Will use the first entry in the device database with responder + # link from the modem with a non 0 group + + modem_scene = 0 + + entries = self.db.find_all(self.modem.addr, is_controller=False) + for e in entries: + if e.group != 1 and e.data[2] == group - 1: + modem_scene = e.group + break + + if not modem_scene: + LOG.error( + "EZIO4O %s Can't trigger scene %s - there is no responder " + "from the modem in the device db", + self.label, + group, + ) + on_done(False, "Failed to send scene command", None) + return + + # Tell the modem to send it's virtual scene broadcast to the device + LOG.info( + "EZIO4O %s triggering modem scene %s for device output %s", + self.label, + modem_scene, + group, + ) + self.modem.scene(is_on, modem_scene, on_done=on_done, reason=reason) + + #----------------------------------------------------------------------- + def link_data(self, is_controller, group, data=None): + """Create default device 3 byte link data. + + This is the 3 byte field (D1, D2, D3) stored in the device database + entry. This overrides the defaults specified in base.py for + specific values used by EZIO4O. + + For controllers, the default fields are: + D1: unknown (0x00) + D2: discrete output action (0x00) + D3: the group number -1 on the local device (0x00) + + For responders, the default fields are: + D1: unknown (0x00) + D2: discrete output action (0x00) + D3: the group number -1 on the local device (0x00) + + Args: + is_controller (bool): True if the device is the controller, false + if it's the responder. + group (int): The group number of the controller button or the + group number of the responding button. + data (bytes[3]): Optional 3 byte data entry. If this is None, + defaults are returned. Otherwise it must be a 3 element list. + Any element that is not None is replaced with the default. + + Returns: + bytes[3]: Returns a list of 3 bytes to use as D1,D2,D3. + """ + + # Controller data. + if is_controller: + defaults = [0x03, 0x00, group] + + # Responder data. + else: + defaults = [0x00, 0x00, group - 1] + + # For each field, use the input if not -1, else the default. + return util.resolve_data3(defaults, data) + + #----------------------------------------------------------------------- + def link_data_to_pretty(self, is_controller, data): + """Converts Link Data1-3 to Human Readable Attributes + + This takes a list of the data values 1-3 and returns a dict with + the human readable attributes as keys and the human readable values + as values. + + Args: + is_controller (bool): True if the device is the controller, false + if it's the responder. + data (list[3]): List of three data values. + + Returns: + list[3]: list, containing a dict of the human readable values + """ + if is_controller: + ret = [{"data_1": data[0]}, {"data_2": data[1]}, {"group": data[2]}] + else: + ret = [{"data_1": data[0]}, {"data_2": data[1]}, {"group": data[2] + 1}] + return ret + + #----------------------------------------------------------------------- + def link_data_from_pretty(self, is_controller, data): + """Converts Link Data1-3 from Human Readable Attributes + + This takes a dict of the human readable attributes as keys and their + associated values and returns a list of the data1-3 values. + + Args: + is_controller (bool): True if the device is the controller, false + if it's the responder. + data (dict[3]): Dict of three data values. + + Returns: + list[3]: List of Data1-3 values + """ + data_1 = None + if "data_1" in data: + data_1 = data["data_1"] + data_2 = None + if "data_2" in data: + data_2 = data["data_2"] + data_3 = None + if "data_3" in data: + data_3 = data["data_3"] + if "group" in data: + if is_controller: + data_3 = data["group"] + else: + data_3 = data["group"] - 1 + return [data_1, data_2, data_3] + + #----------------------------------------------------------------------- + def set_flags(self, on_done, **kwargs): + """Set internal device flags. + + This command is used to change EZIOxx Configuration Port settings. + Valid flags are: + - analog_input = {none, command, interval} + Set the analog input options. valid options are: + none : Analog input is not used (default) + command : Analog input used, conversion on command + interval : Analog input used, conversion on fixed interval + Default = none + - alarms = {on, off} + Send Broadcast on Sensor Alarm. + Default = off + - debounce = {on, off} + Send Broadcast on Sensor Alarm. + Default = off + - one-wire = {on, off} + Enable 1-Wire port + Default = off + - output-timers-unit = { second, minute } + Select the output timers unit. + Default = minute + - broadcast-change = {on, off} + Enable broadcast of output and input port change + Default = 0ff + - output-timers-enable = {on, off} + Enable output timers if greater than 0 + Default = off + + Args: + kwargs: Key=value pairs of the flags to change. + on_done: Finished callback. This is called when the command has + completed. Signature is: on_done(success, msg, data) + """ + LOG.info("EZIO4O %s cmd: set flags", self.label) + + # TODO initialize flags on first run + # Initialise flag value by reading the device Configuration Port settings + if self._flag_value is None: + LOG.info( + "EZIO4O %s cmd: flags not initialized - run get-flags first", self.label + ) + return + + # Check the input flags to make sure only ones we can understand were + # passed in. + valid_flags = EZIO4xx_flags.keys() + flags_to_set = kwargs.keys() + + unknown = set(flags_to_set).difference(valid_flags) + if unknown: + raise Exception( + "EZIO4O Unknown flags input: %s.\n Valid " + "flags are: %s" % (unknown, valid_flags) + ) + + # Construct the flag register to write + new_flag_value = self._flag_value + + for field in list(flags_to_set): + if True in EZIO4xx_flags[field]["options"]: + option = util.input_bool(kwargs, field) + else: + option = util.input_choice( + kwargs, field, EZIO4xx_flags[field]["options"].keys() + ) + + if option is not None: + value = EZIO4xx_flags[field]["options"][option] + mask = EZIO4xx_flags[field]["mask"] + new_flag_value = (new_flag_value & ~mask) | (value & mask) + else: + raise Exception( + "EZIO4O Unknown option: %s for flag: %s.\n Valid " + "options are: %s" + % (option, field, EZIO4xx_flags[field]["options"].keys()) + ) + + # Use a standard message to send "write configuration to port" (0x4D) command + msg = Msg.OutStandard.direct(self.addr, 0x4D, new_flag_value) + + # Use the standard command handler which will notify us when the + # command is ACK'ed. + callback = functools.partial(self.handle_flags) + msg_handler = handler.StandardCmd(msg, callback, on_done) + + # Send the message to the PLM modem for protocol. + self.send(msg, msg_handler) + + #----------------------------------------------------------------------- + def get_flags(self, on_done=None): + """get internal device flags. + + This command is used to read EZIOxx Configuration Port settings. + See set_flags() for the settings description + + Args: + kwargs: Key=value pairs of the flags to change. + on_done: Finished callback. This is called when the command has + completed. Signature is: on_done(success, msg, data) + """ + LOG.info("EZIO4O %s cmd: get flags", self.label) + + # Check the input flags to make sure only ones we can understand were + # passed in. + + # Use a standard message to send "read configuration to port" (0x4E) command + msg = Msg.OutStandard.direct(self.addr, 0x4E, 0x00) + + # Use the standard command handler which will notify us when the + # command is ACK'ed. + callback = functools.partial(self.handle_flags) + msg_handler = handler.StandardCmd(msg, callback, on_done) + + # Send the message to the PLM modem for protocol. + self.send(msg, msg_handler) + + #----------------------------------------------------------------------- + def handle_flags(self, msg, on_done): + """Callback for flags settings commanded messages. + + This callback is run when we get a reply back from set or read flags commands. + If the command was ACK'ed, we know it worked so we'll update the internal + state of flags. + + Args: + msg (message.InpStandard): The reply message from the device. + The on/off level will be in the cmd2 field. + on_done: Finished callback. This is called when the command has + completed. Signature is: on_done(success, msg, data) + reason (str): This is optional and is used to identify why the + command was sent. It is passed through to the output signal + when the state changes - nothing else is done with it. + """ + assert msg.cmd1 in [0x4D, 0x4E] + + # If this it the ACK we're expecting, update the internal + # state. + if msg.flags.type == Msg.Flags.Type.DIRECT_ACK: + LOG.debug("EZIO4O %s Flag ACK: %s", self.label, msg) + bits = msg.cmd2 + self._flag_value = bits + LOG.ui("EZIO4O %s operating flags: %s", self.label, "{:08b}".format(bits)) + + for field in EZIO4xx_flags: + flag_bits = bits & EZIO4xx_flags[field]["mask"] + option = "unknown" + for flag_option, option_bits in EZIO4xx_flags[field]["options"].items(): + if flag_bits == option_bits: + option = flag_option + LOG.ui("%s : %s", field, option) + + on_done(True, "EZIO4O %s flags updated" % self.label, None) + + elif msg.flags.type == Msg.Flags.Type.DIRECT_NAK: + LOG.error("EZIO4O %s flags NAK error: %s", self.label, msg) + on_done(False, "EZIO4O %s flags update failed" % self.label, None) + + #----------------------------------------------------------------------- + def handle_broadcast(self, msg): + """Handle broadcast messages from this device. + + This is called automatically by the system (via handle.Broadcast) + when we receive a message from the device. + + The broadcast message from a device is sent when the device is + triggered. The message has the group ID in it. We'll update the + device state and look up the group in the all link database. For + each device that is in the group (as a responder), we'll call + handle_group_cmd() on that device to trigger it. This way all the + devices in the group are updated to the correct values when we see + the broadcast message. + + Args: + msg (InpStandard): Broadcast message from the device. + """ + + # ACK of the broadcast. + if msg.cmd1 == 0x06: + LOG.info("EZIO4O %s broadcast ACK grp: %s", self.label, msg.group) + if self.broadcast_done: + self.broadcast_done() + self.broadcast_done = None + return + + # unknown broadcast + else: + LOG.info( + "EZIO4O %s unknown broadcast grp: %s, msg: %s", + self.label, + msg.group, + msg, + ) + + # This will find all the devices we're the controller of for + # this group and call their handle_group_cmd() methods to + # update their states since they will have seen the group + # broadcast and updated (without sending anything out). + super().handle_broadcast(msg) + + #----------------------------------------------------------------------- + def handle_refresh(self, msg): + """Callback for handling refresh() responses. + + This is called when we get a response to the refresh() command. The + refresh command reply will contain the current device state in cmd2 + and this updates the device with that value. It is called by + handler.DeviceRefresh when we can an ACK for the refresh command. + + Args: + msg (message.InpStandard): The refresh message reply. The current + device state is in the msg.cmd2 field. + """ + LOG.debug("EZIO4O %s refresh response %s", self.label, msg) + + if 0x00 <= msg.cmd2 <= 0x0F: + for i in range(4): + if util.bit_get(msg.cmd2, i): + is_on = True + else: + is_on = False + + # State change for output + if is_on != self._is_on[i]: + self._set_is_on(i + 1, is_on, reason=on_off.REASON_REFRESH) + else: + LOG.error("EZIO4O %s unknown refresh response %s", self.label, msg) + + #----------------------------------------------------------------------- + def handle_ack(self, msg, on_done, reason=""): + """Callback for standard commanded messages. + + This callback is run when we get a reply back from one of our + commands to the device. If the command was ACK'ed, we know it worked + so we'll update the internal state of the device and emit the signals + to notify others of the state change. + + Args: + msg (message.InpStandard): The reply message from the device. + The on/off level will be in the cmd2 field. + on_done: Finished callback. This is called when the command has + completed. Signature is: on_done(success, msg, data) + reason (str): This is optional and is used to identify why the + command was sent. It is passed through to the output signal + when the state changes - nothing else is done with it. + """ + assert 0x00 <= msg.cmd2 <= 0x0F + assert msg.cmd1 in [0x45, 0x46] + + LOG.debug("EZIO4O %s ACK response %s", self.label, msg) + + # Get the last output we were commanding. The message doesn't tell + # us which output it was so we have to track it here. See __init__ + # code comments for more info. + if not self._which_output: + LOG.error("EZIO4O %s ACK error. No output ID's were saved", self.label) + on_done(False, "EZIO4O update failed - no ID's saved", None) + return + + group = self._which_output.pop(0) + + # If this it the ACK we're expecting, update the internal + # state and emit our signals. + if msg.flags.type == Msg.Flags.Type.DIRECT_ACK: + LOG.debug("EZIO4O %s ACK: %s", self.label, msg) + + for i in range(4): + if util.bit_get(msg.cmd2, i): + is_on = True + else: + is_on = False + + # State change for the output and all outputs with state change + if is_on != self._is_on[i] or i == group - 1: + self._set_is_on(i + 1, is_on, reason=on_off.REASON_REFRESH) + on_done( + True, "EZIO4O state %s updated to: %s" % (i + 1, is_on), None + ) + + elif msg.flags.type == Msg.Flags.Type.DIRECT_NAK: + LOG.error("EZIO4O %s NAK error: %s", self.label, msg) + on_done(False, "EZIO4O state %s update failed" % group, None) + + #----------------------------------------------------------------------- + def handle_scene(self, msg, on_done, reason=""): + """Callback for scene simulation commanded messages. + + This callback is run when we get a reply back from triggering a scene + on the device. If the command was ACK'ed, we know it worked. The + device will then send out standard broadcast messages which will + trigger other updates for the scene devices. + + Args: + msg (message.InpStandard): The reply message from the device. + on_done: Finished callback. This is called when the command has + completed. Signature is: on_done(success, msg, data) + reason (str): This is optional and is used to identify why the + command was sent. It is passed through to the output signal + when the state changes - nothing else is done with it. + """ + # DEBUG + LOG.debug("EZIO4O %s handle scene %s", self.label, msg) + + # Call the callback. We don't change state here - the device will + # send a regular broadcast message which will run handle_broadcast + # which will then update the state. + if msg.flags.type == Msg.Flags.Type.DIRECT_ACK: + LOG.debug("EZIO4O %s ACK: %s", self.label, msg) + + elif msg.flags.type == Msg.Flags.Type.DIRECT_NAK: + LOG.error("EZIO4O %s NAK error: %s", self.label, msg) + self.broadcast_reason = None + on_done(False, "EZIO4O Scene trigger failed", None) + + else: + LOG.debug("EZIO4O %s broadcast ACK: %s", self.label, msg) + + #----------------------------------------------------------------------- + def handle_group_cmd(self, addr, msg): + """Respond to a group command for this device. + + This is called when this device is a responder to a scene. The + device that received the broadcast message (handle_broadcast) will + call this method for every device that is linked to it. The device + should look up the responder entry for the group in it's all link + database and update it's state accordingly. + + Args: + addr (Address): The device that sent the message. This is the + controller in the scene. + msg (InpStandard): Broadcast message from the device. Use + msg.group to find the group and msg.cmd1 for the command. + """ + + # Make sure we're really a responder to this message. This shouldn't + # ever occur. + entry = self.db.find(addr, msg.group, is_controller=False) + if not entry: + LOG.error( + "EZIO4O %s has no group %s entry from %s", self.label, msg.group, addr + ) + return + + # The local button being modified is stored in the db entry. + localGroup = entry.data[2] + 1 + + # Handle on/off commands codes. + if on_off.Mode.is_valid(msg.cmd1): + is_on, mode = on_off.Mode.decode(msg.cmd1) + self._set_is_on(localGroup, is_on, mode, on_off.REASON_SCENE) + + else: + LOG.warning("EZIO4O %s unknown group cmd %#04x", self.label, msg.cmd1) + + #----------------------------------------------------------------------- + def _set_is_on(self, group, is_on, mode=on_off.Mode.NORMAL, reason=""): + """Update the device on/off state. + + This will change the internal state and emit the state changed + signals. It is called by whenever we're informed that the device has + changed state. + + Args: + group (int): The group to update (1 to 4). + is_on (bool): True if the switch is on, False if it isn't. + mode (on_off.Mode): The type of on/off that was triggered (normal, + fast, etc). + reason (str): This is optional and is used to identify why the + command was sent. It is passed through to the output signal + when the state changes - nothing else is done with it. + """ + is_on = bool(is_on) + + LOG.info( + "EZIO4O %s setting grp: %s to %s %s %s", + self.label, + group, + is_on, + mode, + reason, + ) + self._is_on[group - 1] = is_on + + # Notify others that the output state has changed. + self.signal_on_off.emit(self, group, is_on, mode, reason) + + #----------------------------------------------------------------------- diff --git a/insteon_mqtt/device/__init__.py b/insteon_mqtt/device/__init__.py index 4359a23f..132c0eda 100644 --- a/insteon_mqtt/device/__init__.py +++ b/insteon_mqtt/device/__init__.py @@ -29,6 +29,7 @@ from .Base import Base from .BatterySensor import BatterySensor from .Dimmer import Dimmer +from .EZIO4O import EZIO4O from .FanLinc import FanLinc from .IOLinc import IOLinc from .KeypadLinc import KeypadLinc diff --git a/insteon_mqtt/mqtt/EZIO4O.py b/insteon_mqtt/mqtt/EZIO4O.py new file mode 100644 index 00000000..69f0c6b7 --- /dev/null +++ b/insteon_mqtt/mqtt/EZIO4O.py @@ -0,0 +1,236 @@ +#=========================================================================== +# +# EZIO4O 4 relay output device +# +#=========================================================================== +import functools +from .. import log +from .. import on_off +from .MsgTemplate import MsgTemplate +from . import util + +LOG = log.get_logger() + + +class EZIO4O: + """MQTT interface to Smartenit EZIO4O 4 relay output device. + + This class connects to a device.EZIO4O object and converts it's + output state changes to MQTT messages. It also subscribes to topics to + allow input MQTT messages to change the state of the device. + + EZIO4O will report their state (state/output) and can be commanded to turn + on and off (set/output topic) or trigger a device scene with which the modem + is a responder (scene/output topic). + """ + + def __init__(self, mqtt, device): + """Constructor + + Args: + mqtt (mqtt.Mqtt): The MQTT main interface. + device (device.Outlet): The Insteon object to link to. + """ + self.mqtt = mqtt + self.device = device + + # Output state change reporting template. + self.msg_state = MsgTemplate( + topic="insteon/{{address}}/state/{{button}}", payload="{{on_str.lower()}}" + ) + + # Input on/off command template. + self.msg_on_off = MsgTemplate( + topic="insteon/{{address}}/set/{{button}}", + payload='{ "cmd" : "{{value.lower()}}" }', + ) + + # Input scene on/off command template. + self.msg_scene = MsgTemplate( + topic="insteon/{{address}}/scene/{{button}}", + payload='{ "cmd" : "{{value.lower()}}" }', + ) + + device.signal_on_off.connect(self._insteon_on_off) + + #----------------------------------------------------------------------- + def load_config(self, config, qos=None): + """Load values from a configuration data object. + + Args: + config (dict: The configuration dictionary to load from. The object + config is stored in config['ezio4o']. + qos (int): The default quality of service level to use. + """ + data = config.get("ezio4o", None) + if not data: + return + + self.msg_state.load_config(data, "state_topic", "state_payload", qos) + self.msg_on_off.load_config(data, "on_off_topic", "on_off_payload", qos) + self.msg_scene.load_config(data, "scene_topic", "scene_payload", qos) + + #----------------------------------------------------------------------- + def subscribe(self, link, qos): + """Subscribe to any MQTT topics the object needs. + + Subscriptions are used when the object has things that can be + commanded to change. + + Args: + link (network.Mqtt): The MQTT network client to use. + qos (int): The quality of service to use. + """ + # Connect input topics for groups 1 to 4 (one for each relay). + # Create a function that will call the input callback with the right + # group number set for each socket. + for group in [1, 2, 3, 4]: + handler = functools.partial(self._input_on_off, group=group) + data = self.template_data(button=group) + + topic = self.msg_on_off.render_topic(data) + link.subscribe(topic, qos, handler) + + handler = functools.partial(self._input_scene, group=group) + topic = self.msg_scene.render_topic(data) + link.subscribe(topic, qos, handler) + + #----------------------------------------------------------------------- + def unsubscribe(self, link): + """Unsubscribe to any MQTT topics the object was subscribed to. + + Args: + link (network.Mqtt): The MQTT network client to use. + """ + for group in [1, 2, 3, 4]: + data = self.template_data(button=group) + + topic = self.msg_on_off.render_topic(data) + link.unsubscribe(topic) + + topic = self.msg_scene.render_topic(data) + link.unsubscribe(topic) + + #----------------------------------------------------------------------- + def template_data( + self, is_on=None, button=None, mode=on_off.Mode.NORMAL, reason=None + ): + """Create the Jinja templating data variables for on/off messages. + + Args: + button (int): The button (group) ID (1-4) of the EZIO4O relay + that was triggered. + is_on (bool): True for on, False for off. If None, on/off and + mode attributes are not added to the data. + mode (on_off.Mode): The on/off mode state. + reason (str): The reason the device was triggered. This is an + arbitrary string set into the template variables. + + Returns: + dict: Returns a dict with the variables available for templating. + """ + data = { + "address": self.device.addr.hex, + "name": self.device.name if self.device.name else self.device.addr.hex, + } + + if button is not None: + data["button"] = button + + if is_on is not None: + data["on"] = 1 if is_on else 0 + data["on_str"] = "on" if is_on else "off" + data["mode"] = str(mode) + data["fast"] = 1 if mode == on_off.Mode.FAST else 0 + data["instant"] = 1 if mode == on_off.Mode.INSTANT else 0 + data["reason"] = reason if reason is not None else "" + + return data + + #----------------------------------------------------------------------- + def _insteon_on_off(self, device, group, is_on, mode=on_off.Mode.NORMAL, reason=""): + """Device active on/off callback. + + This is triggered via signal when the Insteon device turns on or off. + It will publish an MQTT message with the new state. + + Args: + device (device.Outlet): The Insteon device that changed. + group (int): The relay number (1 to 4) that was changed. + is_on (bool): True for on, False for off. If None, on/off and + mode attributes are not added to the data. + mode (on_off.Mode): The on/off mode state. + reason (str): The reason the device was triggered. This is an + arbitrary string set into the template variables. + """ + LOG.info( + "MQTT received on/off %s grp: %s on: %s %s '%s'", + device.label, + group, + is_on, + mode, + reason, + ) + + data = self.template_data(is_on, group, mode, reason=reason) + self.msg_state.publish(self.mqtt, data) + + #----------------------------------------------------------------------- + def _input_on_off(self, client, data, message, group): + """Handle an input on/off change MQTT message. + + This is called when we receive a message on the on/off MQTT topic + subscription. Parse the message and pass the command to the Insteon + device. + + Args: + client (paho.Client): The paho mqtt client (self.link). + data: Optional user data (unused). + message: MQTT message - has attrs: topic, payload, qos, retain. + """ + LOG.debug( + "EZIO4O output %s message %s %s", group, message.topic, message.payload + ) + + # Parse the input MQTT message. + data = self.msg_on_off.to_json(message.payload) + LOG.info("EZIO4O input command: %s", data) + + try: + # Tell the device to update it's state. + is_on, mode = util.parse_on_off(data) + reason = data.get("reason", "") + self.device.set(level=is_on, group=group, mode=mode, reason=reason) + except: + LOG.exception("Invalid EZIO4O on/off command: %s", data) + + #----------------------------------------------------------------------- + def _input_scene(self, client, data, message, group): + """Handle an input scene MQTT message. + + This is called when we receive a message on the scene trigger MQTT + topic subscription. Parse the message and pass the command to the + Insteon device. + + Args: + client (paho.Client): The paho mqtt client (self.link). + data: Optional user data (unused). + message: MQTT message - has attrs: topic, payload, qos, retain. + """ + LOG.debug( + "EZIO4O output %s message %s %s", group, message.topic, message.payload + ) + + # Parse the input MQTT message. + data = self.msg_scene.to_json(message.payload) + LOG.info("EZIO4O input command: %s", data) + + try: + # Scenes don't support modes so don't parse that element. + is_on = util.parse_on_off(data, have_mode=False) + reason = data.get("reason", "") + self.device.scene(is_on, group, reason) + except: + LOG.exception("Invalid EZIO4O scene command: %s", data) + + #----------------------------------------------------------------------- diff --git a/insteon_mqtt/mqtt/__init__.py b/insteon_mqtt/mqtt/__init__.py index e405d5ff..8d7045e0 100644 --- a/insteon_mqtt/mqtt/__init__.py +++ b/insteon_mqtt/mqtt/__init__.py @@ -20,6 +20,7 @@ from .BatterySensor import BatterySensor from .Dimmer import Dimmer +from .EZIO4O import EZIO4O from .FanLinc import FanLinc from .IOLinc import IOLinc from .KeypadLinc import KeypadLinc diff --git a/insteon_mqtt/mqtt/config.py b/insteon_mqtt/mqtt/config.py index 9d470cb1..0b2e361f 100644 --- a/insteon_mqtt/mqtt/config.py +++ b/insteon_mqtt/mqtt/config.py @@ -15,6 +15,7 @@ from ..Modem import Modem from .BatterySensor import BatterySensor from .Dimmer import Dimmer +from .EZIO4O import EZIO4O from .FanLinc import FanLinc from .IOLinc import IOLinc from .KeypadLinc import KeypadLinc @@ -32,6 +33,7 @@ Modem : MqttModem, device.BatterySensor : BatterySensor, device.Dimmer : Dimmer, + device.EZIO4O : EZIO4O, device.FanLinc : FanLinc, device.IOLinc : IOLinc, device.KeypadLinc : KeypadLinc, diff --git a/tests/mqtt/test_EZIO4O.py b/tests/mqtt/test_EZIO4O.py new file mode 100644 index 00000000..e3e8fa12 --- /dev/null +++ b/tests/mqtt/test_EZIO4O.py @@ -0,0 +1,418 @@ +#=========================================================================== +# +# Tests for: insteon_mqtt/mqtt/EZIO4O.py +# +# pylint: disable=redefined-outer-name +#=========================================================================== +import pytest +import insteon_mqtt as IM +import helpers as H + +# from .. import message as Msg + + +# NOTE about mocking: Don't mock classes directly being used by the class +# being tested. If we do that, then we're not testing whether the class +# actually works with the class it depends on. For example, let's say we're +# testing class A which depends on B and C. If we mock B and C and then +# later the API's for B and C change, the test will still pass because in the +# test those are fake interfaces. Part of what the test or A needs to catch +# are changes in classes that A depends on not being updated in A. So the +# correct test pattern is to always use the actual classes that A depends on +# and mock the classes that those dependencies depend on. + + +# Create our MQTT object to test as well as the linked Insteon object and a +# mocked MQTT client to publish to. +@pytest.fixture +def setup(mock_paho_mqtt, tmpdir): + proto = H.main.MockProtocol() + modem = H.main.MockModem(tmpdir) + addr = IM.Address(1, 2, 3) + name = "device name" + dev = IM.device.EZIO4O(proto, modem, addr, name) + + link = IM.network.Mqtt() + mqttModem = H.mqtt.MockModem() + mqtt = IM.mqtt.Mqtt(link, mqttModem) + mdev = IM.mqtt.EZIO4O(mqtt, dev) + + return H.Data( + addr=addr, name=name, dev=dev, mdev=mdev, link=link, proto=proto, modem=modem + ) + + +class MockSceneDevice: + def __init__(self, addr=None, is_controller=False, group=None, link_data=None): + self.addr = addr + self.is_controller = is_controller + self.group = group + if link_data: + self.link_data = link_data + elif group: + self.link_data = [0, 0, group] + else: + self.link_data = [0, 0, 0] + + +#=========================================================================== +class Test_EZIO4O: + #----------------------------------------------------------------------- + def test_pubsub(self, setup): + mdev, addr, link = setup.getAll(["mdev", "addr", "link"]) + + mdev.subscribe(link, 2) + assert len(link.client.sub) == 8 + assert link.client.sub[0] == dict(topic="insteon/%s/set/1" % addr.hex, qos=2) + assert link.client.sub[1] == dict(topic="insteon/%s/scene/1" % addr.hex, qos=2) + assert link.client.sub[2] == dict(topic="insteon/%s/set/2" % addr.hex, qos=2) + assert link.client.sub[3] == dict(topic="insteon/%s/scene/2" % addr.hex, qos=2) + assert link.client.sub[4] == dict(topic="insteon/%s/set/3" % addr.hex, qos=2) + assert link.client.sub[5] == dict(topic="insteon/%s/scene/3" % addr.hex, qos=2) + assert link.client.sub[6] == dict(topic="insteon/%s/set/4" % addr.hex, qos=2) + assert link.client.sub[7] == dict(topic="insteon/%s/scene/4" % addr.hex, qos=2) + + mdev.unsubscribe(link) + assert len(link.client.unsub) == 8 + assert link.client.unsub[0] == dict(topic="insteon/%s/set/1" % addr.hex) + assert link.client.unsub[1] == dict(topic="insteon/%s/scene/1" % addr.hex) + assert link.client.unsub[2] == dict(topic="insteon/%s/set/2" % addr.hex) + assert link.client.unsub[3] == dict(topic="insteon/%s/scene/2" % addr.hex) + assert link.client.unsub[4] == dict(topic="insteon/%s/set/3" % addr.hex) + assert link.client.unsub[5] == dict(topic="insteon/%s/scene/3" % addr.hex) + assert link.client.unsub[6] == dict(topic="insteon/%s/set/4" % addr.hex) + assert link.client.unsub[7] == dict(topic="insteon/%s/scene/4" % addr.hex) + + #----------------------------------------------------------------------- + def test_template(self, setup): + mdev, addr, name = setup.getAll(["mdev", "addr", "name"]) + + data = mdev.template_data() + right = {"address": addr.hex, "name": name} + assert data == right + + data = mdev.template_data( + is_on=True, button=1, reason="something", mode=IM.on_off.Mode.FAST + ) + right = { + "address": addr.hex, + "name": name, + "button": 1, + "on": 1, + "on_str": "on", + "reason": "something", + "mode": "fast", + "fast": 1, + "instant": 0, + } + assert data == right + + data = mdev.template_data(is_on=False, button=2) + right = { + "address": addr.hex, + "name": name, + "button": 2, + "on": 0, + "on_str": "off", + "reason": "", + "mode": "normal", + "fast": 0, + "instant": 0, + } + assert data == right + + data = mdev.template_data(is_on=False, button=3) + right = { + "address": addr.hex, + "name": name, + "button": 3, + "on": 0, + "on_str": "off", + "reason": "", + "mode": "normal", + "fast": 0, + "instant": 0, + } + assert data == right + + data = mdev.template_data(is_on=False, button=4) + right = { + "address": addr.hex, + "name": name, + "button": 4, + "on": 0, + "on_str": "off", + "reason": "", + "mode": "normal", + "fast": 0, + "instant": 0, + } + assert data == right + + #----------------------------------------------------------------------- + def test_mqtt(self, setup): + mdev, dev, link = setup.getAll(["mdev", "dev", "link"]) + topic = "insteon/%s" % setup.addr.hex + + # Should do nothing + mdev.load_config({}) + + # Send an on/off signal + dev.signal_on_off.emit(dev, 1, True) + dev.signal_on_off.emit(dev, 2, False) + dev.signal_on_off.emit(dev, 3, True) + dev.signal_on_off.emit(dev, 4, False) + assert len(link.client.pub) == 4 + assert link.client.pub[0] == dict( + topic="%s/state/1" % topic, payload="on", qos=0, retain=True + ) + assert link.client.pub[1] == dict( + topic="%s/state/2" % topic, payload="off", qos=0, retain=True + ) + assert link.client.pub[2] == dict( + topic="%s/state/3" % topic, payload="on", qos=0, retain=True + ) + assert link.client.pub[3] == dict( + topic="%s/state/4" % topic, payload="off", qos=0, retain=True + ) + link.client.clear() + + #----------------------------------------------------------------------- + def test_config(self, setup): + mdev, dev, link = setup.getAll(["mdev", "dev", "link"]) + + config = { + "ezio4o": { + "state_topic": "foo/{{address}}/{{button}}", + "state_payload": "{{button}} {{on}} {{on_str.upper()}}", + } + } + qos = 3 + mdev.load_config(config, qos) + + stopic = "foo/%s" % setup.addr.hex + + # Send an on/off signal + dev.signal_on_off.emit(dev, 1, True) + dev.signal_on_off.emit(dev, 2, False) + dev.signal_on_off.emit(dev, 3, True) + dev.signal_on_off.emit(dev, 4, False) + assert len(link.client.pub) == 4 + assert link.client.pub[0] == dict( + topic=stopic + "/1", payload="1 1 ON", qos=qos, retain=True + ) + assert link.client.pub[1] == dict( + topic=stopic + "/2", payload="2 0 OFF", qos=qos, retain=True + ) + assert link.client.pub[2] == dict( + topic=stopic + "/3", payload="3 1 ON", qos=qos, retain=True + ) + assert link.client.pub[3] == dict( + topic=stopic + "/4", payload="4 0 OFF", qos=qos, retain=True + ) + link.client.clear() + + #----------------------------------------------------------------------- + def test_input_on_off(self, setup): + mdev, link, proto = setup.getAll(["mdev", "link", "proto"]) + + # button in topic + qos = 2 + config = { + "ezio4o": { + "on_off_topic": "foo/{{address}}/{{button}}", + "on_off_payload": ( + '{ "cmd" : "{{json.on.lower()}}",' + '"mode" : "{{json.mode.lower()}}" }' + ), + } + } + mdev.load_config(config, qos=qos) + + mdev.subscribe(link, qos) + + # for all group standard message + for i in range(4): + # turn on + proto.clear() + payload = b'{ "on" : "ON", "mode" : "NORMAL" }' + link.publish(link.client.sub[i * 2].topic, payload, qos, retain=False) + assert len(proto.sent) == 1 + assert proto.sent[0].msg.cmd1 == 0x45 + assert proto.sent[0].msg.cmd2 == i + assert isinstance(proto.sent[0].msg, IM.message.OutStandard) + + # turn off + proto.clear() + payload = b'{ "on" : "OFF", "mode" : "FAST" }' + link.publish(link.client.sub[i * 2].topic, payload, qos, retain=False) + assert len(proto.sent) == 1 + assert proto.sent[0].msg.cmd1 == 0x46 + assert proto.sent[0].msg.cmd2 == i + assert isinstance(proto.sent[0].msg, IM.message.OutStandard) + + # test error payload + link.publish(link.client.sub[0].topic, b"asdf", qos, False) + + #----------------------------------------------------------------------- + def test_input_on_off_reason(self, setup): + mdev, link, proto = setup.getAll(["mdev", "link", "proto"]) + + # button in topic + qos = 2 + config = { + "ezio4o": { + "on_off_topic": "foo/{{address}}/{{button}}", + "on_off_payload": ( + '{ "cmd" : "{{json.on.lower()}}",' + '"mode" : "{{json.mode.lower()}}",' + '"reason" : "{{json.reason}}" }' + ), + } + } + mdev.load_config(config, qos=qos) + + mdev.subscribe(link, qos) + + # for all group standard message + for i in range(4): + # turn on + proto.clear() + payload = b'{ "on" : "ON", "mode" : "FAST", "reason" : "baz" }' + link.publish(link.client.sub[i * 2].topic, payload, qos, retain=False) + assert len(proto.sent) == 1 + assert proto.sent[0].msg.cmd1 == 0x45 + assert proto.sent[0].msg.cmd2 == i + assert isinstance(proto.sent[0].msg, IM.message.OutStandard) + cb = proto.sent[0].handler.callback + assert cb.keywords == {"reason": "baz"} + + # turn off + proto.clear() + payload = b'{ "on" : "OFF", "mode" : "NORMAL", "reason" : "ABC" }' + link.publish(link.client.sub[i * 2].topic, payload, qos, retain=False) + assert len(proto.sent) == 1 + assert proto.sent[0].msg.cmd1 == 0x46 + assert proto.sent[0].msg.cmd2 == i + assert isinstance(proto.sent[0].msg, IM.message.OutStandard) + cb = proto.sent[0].handler.callback + assert cb.keywords == {"reason": "ABC"} + + # test error payload + link.publish(link.client.sub[0].topic, b"asdf", qos, False) + + #----------------------------------------------------------------------- + def test_input_scene(self, setup): + dev, mdev, link, modem = setup.getAll(["dev", "mdev", "link", "modem"]) + + qos = 2 + config = { + "ezio4o": { + "scene_topic": "foo/{{address}}/{{button}}", + "scene_payload": ('{ "cmd" : "{{json.on.lower()}}" }'), + } + } + mdev.load_config(config, qos=qos) + + mdev.subscribe(link, qos) + + # add device to the modem database + modem.add(dev) + + # add device default responder db link with modem + local = MockSceneDevice(dev.addr, False, 1, [0, 0, 1]) + remote = MockSceneDevice(modem.addr, True, 1, [0, 0, 1]) + dev.db.add_from_config(remote, local) + + # add device scenes responder db links for each group with modem + # scene/1 to scene/4 -> device responder for modem group 50 to 53 + for i in range(4): + local.group = i + 1 + local.link_data = [0, 0, i] + remote.group = 1 + dev.db.add_from_config(remote, local) + + remote.group = i + 50 + dev.db.add_from_config(remote, local) + + # for all group trigger modem ALL-Link Broadcast -> modem scene + for i in range(4): + # turn off + modem.scenes = [] + payload = b'{ "on" : "OFF" }' + link.publish(link.client.sub[i * 2 + 1].topic, payload, qos, retain=False) + assert len(modem.scenes) == 1 + assert modem.scenes[0][0] == 0 # is_on + assert modem.scenes[0][1] == i + 50 # group + + # turn on + modem.scenes = [] + payload = b'{ "on" : "ON" }' + link.publish(link.client.sub[i * 2 + 1].topic, payload, qos, retain=False) + assert modem.scenes[0][0] == 1 # is_on + assert modem.scenes[0][1] == i + 50 # group + + # test error payload + link.publish(link.client.sub[1].topic, b"asdf", qos, False) + + #----------------------------------------------------------------------- + def test_input_scene_reason(self, setup): + dev, mdev, link, modem = setup.getAll(["dev", "mdev", "link", "modem"]) + + qos = 2 + config = { + "ezio4o": { + "scene_topic": "foo/{{address}}/{{button}}", + "scene_payload": ( + '{ "cmd" : "{{json.on.lower()}}",' '"reason" : "{{json.reason}}"}' + ), + } + } + mdev.load_config(config, qos=qos) + + mdev.subscribe(link, qos) + + # add device to the modem database + modem.add(dev) + + # add device default responder db link with modem + local = MockSceneDevice(dev.addr, False, 1, [0, 0, 1]) + remote = MockSceneDevice(modem.addr, True, 1, [0, 0, 1]) + dev.db.add_from_config(remote, local) + + # add device scenes responder db links for each group with modem + # scene/1 to scene/4 -> device responder for modem group 50 to 53 + for i in range(4): + local.group = i + 1 + local.link_data = [0, 0, i] + remote.group = 1 + dev.db.add_from_config(remote, local) + + remote.group = i + 50 + dev.db.add_from_config(remote, local) + + # for all group trigger modem ALL-Link Broadcast -> modem scene + for i in range(4): + # turn off + modem.scenes = [] + payload = b'{ "on" : "OFF", "reason" : "a b c" }' + link.publish(link.client.sub[i * 2 + 1].topic, payload, qos, retain=False) + assert len(modem.scenes) == 1 + assert modem.scenes[0][0] == 0 # is_on + assert modem.scenes[0][1] == i + 50 # group + assert modem.scenes[0][2] == "a b c" # reason + + # turn on + modem.scenes = [] + payload = b'{ "on" : "ON", "reason" : "zyx" }' + link.publish(link.client.sub[i * 2 + 1].topic, payload, qos, retain=False) + assert len(modem.scenes) == 1 + assert modem.scenes[0][0] == 1 # is_on + assert modem.scenes[0][1] == i + 50 # group + assert modem.scenes[0][2] == "zyx" # reason + + # test error payload + link.publish(link.client.sub[1].topic, b"asdf", qos, False) + + +#=========================================================================== diff --git a/tests/mqtt/test_config.py b/tests/mqtt/test_config.py index 0a1d5ad8..48b8821b 100644 --- a/tests/mqtt/test_config.py +++ b/tests/mqtt/test_config.py @@ -14,9 +14,9 @@ def test_find(self, tmpdir): modem = H.main.MockModem(tmpdir) addr = IM.Address(1, 2, 3) - types = ["BatterySensor", "Dimmer", "FanLinc", "IOLinc", "KeypadLinc", - "Leak", "Motion", "Outlet", "SmokeBridge", "Switch", - "Thermostat"] + types = ["BatterySensor", "Dimmer", "EZIO4O", "FanLinc", "IOLinc", + "KeypadLinc","Leak", "Motion", "Outlet", "SmokeBridge", + "Switch","Thermostat"] instances = [] for t in types: dev = getattr(IM.device, t) diff --git a/tests/util/helpers/main.py b/tests/util/helpers/main.py index 981b1e7b..4f6cfd30 100644 --- a/tests/util/helpers/main.py +++ b/tests/util/helpers/main.py @@ -16,9 +16,25 @@ def __init__(self, save_path): self.addr = IM.Address(0x20, 0x30, 0x40) self.save_path = str(save_path) self.scenes = [] + self.devices = {} + self.device_names = {} - def scene(self, is_on, group, num_retry=3, on_done=None): - self.scenes.append((is_on, group)) + def add(self, device): + self.devices[device.addr.id] = device + if device.name: + self.device_names[device.name] = device + + def find(self, addr): + device = self.devices.get(addr.id, None) + return device + + def remove(self, device): + self.devices.pop(device.addr.id, None) + if device.name: + self.device_names.pop(device.name, None) + + def scene(self, is_on, group, num_retry=3, on_done=None, reason=""): + self.scenes.append((is_on, group, reason)) #===========================================================================