Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Itho HRU400 heat recovery unit (868MHz) #145

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions RFXtrx/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,49 @@ def send_command(self, transport, sound):
pkt.set_transmit(self.subtype, 0, self.id1, self.id2, sound)
transport.send(pkt.data)


class FanDevice(RFXtrxDevice):
""" Concrete class for a Fan control device """
def __init__(self, pkt):
super().__init__(pkt)
self.subtype = pkt.subtype
self.id_combined = pkt.id_combined

def send_low(self, transport):
""" Send a 'Low' command using the given transport """
self.send_command(transport, lowlevel.Fan.Commands.LOW.value)

def send_medium(self, transport):
""" Send a 'Medium' command using the given transport """
self.send_command(transport, lowlevel.Fan.Commands.MEDIUM.value)

def send_high(self, transport):
""" Send a 'High' command using the given transport """
self.send_command(transport, lowlevel.Fan.Commands.HIGH.value)

def send_timer10(self, transport):
""" Send a 'Timer 10 min' command using the given transport """
self.send_command(transport, lowlevel.Fan.Commands.TIMER10.value)

def send_timer20(self, transport):
""" Send a 'Timer 20 min' command using the given transport """
self.send_command(transport, lowlevel.Fan.Commands.TIMER20.value)

def send_timer30(self, transport):
""" Send a 'Timer 30 min' command using the given transport """
self.send_command(transport, lowlevel.Fan.Commands.TIMER30.value)

def send_command(self, transport, command):
""" Send a command using the given transport """
pkt = lowlevel.Fan()
pkt.set_transmit(
self.subtype,
self.id_combined,
command
)
transport.send(pkt.data)


###############################################################################
# get_device_from_pkt method
###############################################################################
Expand All @@ -466,6 +509,8 @@ def get_device_from_pkt(pkt):
device = SecurityDevice(pkt)
elif isinstance(pkt, lowlevel.Funkbus):
device = FunkDevice(pkt)
elif isinstance(pkt, lowlevel.Fan):
device = FanDevice(pkt)
else:
device = RFXtrxDevice(pkt)
return device
Expand Down Expand Up @@ -911,6 +956,7 @@ def reset(self):
def close(self):
""" close connection to rfxtrx device """
self._run_event.clear()
self.sock.shutdown(socket.SHUT_RDWR)
self.sock.close()


Expand Down
115 changes: 114 additions & 1 deletion RFXtrx/lowlevel.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
# pylint: disable=C0302,R0902,R0903,R0911,R0913
# pylint: disable= too-many-lines, too-many-statements

from enum import Enum

###############################################################################
# Packet class
###############################################################################
Expand Down Expand Up @@ -140,7 +142,13 @@ class Status(Packet):
],
[
"keeloq",
"homeconfort"
"homeconfort",
"undecoded",
"undecoded",
"itho rft", # 868Mhz
"undecoded", # Orcon 868MHz
"undecoded", # Itho CVE, HRU ECO 868MHz
"undecoded" # Itho_CVE RFT 868MHz
]
]
"""
Expand Down Expand Up @@ -3014,6 +3022,110 @@ def _set_strings(self):
else:
self.time_string = self.__UNKNOWN_TIME.format(self.time)

###############################################################################
# Fan class
###############################################################################


class Fan(Packet):
"""
Data class for the Fan packet type
"""

class Types(Enum):
""" Type constants """
ITHO_RFT = 0x0D

class Commands(Enum):
""" Command constants """
LOW = 0x01
MEDIUM = 0x02
HIGH = 0x03
TIMER10 = 0x04
TIMER20 = 0x05
TIMER30 = 0x06
JOIN = 0x09
LEAVE = 0x0A
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's not add enums here. If we want them, they should be added to all types.


TYPES = {0x0D: 'Itho RFT'}

COMMANDS = {0x01: 'Low',
0x02: 'Medium',
0x03: 'High',
0x04: 'Timer 10 min',
0x05: 'Timer 20 min',
0x06: 'Timer 30 min',
0x09: 'Join',
0x0A: 'Leave'}

def __str__(self):
return ("Fan [subtype={0}, seqnbr={1}, id={2}, cmnd={3}]") \
.format(self.type_string, self.seqnbr, self.id_string,
self.cmnd_string)

def __init__(self):
"""Constructor"""
super().__init__()
self.id1 = None
self.id2 = None
self.id3 = None
self.id_combined = None
self.cmnd = None
self.cmnd_string = None

def load_receive(self, data):
"""Load data from a bytearray"""
self.data = data
self.packetlength = data[0]
self.packettype = data[1]
self.subtype = data[2]
self.seqnbr = data[3]
self.id1 = data[4]
self.id2 = data[5]
self.id3 = data[6]
self.id_combined = (self.id1 << 16) + (self.id2 << 8) + self.id3
self.cmnd = data[7]
self._set_strings()

def set_transmit(self, subtype, id_combined, cmnd):
"""Load data from individual data fields"""
self.packetlength = 0x08
self.packettype = 0x17
self.subtype = subtype
self.seqnbr = 0
self.id_combined = id_combined
self.id1 = id_combined >> 16 & 0xff
self.id2 = id_combined >> 8 & 0xff
self.id3 = id_combined & 0xff
self.cmnd = cmnd
self.data = bytearray([self.packetlength,
self.packettype,
self.subtype,
self.seqnbr,
self.id1, self.id2, self.id3,
self.cmnd,
0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00])
self._set_strings()

def _set_strings(self):
self.id_string = "{0:06x}".format(self.id_combined)
if self.subtype in self.TYPES:
self.type_string = self.TYPES[self.subtype]
else:
# Degrade nicely for yet unknown subtypes
self.type_string = self._UNKNOWN_TYPE.format(self.packettype,
self.subtype)

if self.cmnd is not None:
if (self.subtype == self.Types.ITHO_RFT.value
and self.cmnd in self.COMMANDS):
self.cmnd_string = self.COMMANDS[self.cmnd]
else:
self.cmnd_string = self._UNKNOWN_CMND.format(self.cmnd)


###############################################################################

PACKET_TYPES = {
0x01: Status,
Expand All @@ -3025,6 +3137,7 @@ def _set_strings(self):
0x14: Lighting5,
0x15: Lighting6,
0x16: Chime,
0x17: Fan,
0x19: RollerTrol,
0x1A: Rfy,
0x1E: Funkbus,
Expand Down
4 changes: 3 additions & 1 deletion examples/receive.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,12 @@ def main():
rfxcom_device = sys.argv[1]
else:
rfxcom_device = '/dev/serial/by-id/usb-RFXCOM_RFXtrx433_A1Y0NJGR-if00-port0'
rfxcom_device = ('192.168.2.169', 10001)
rfxcom_device = ('192.168.2.247', 10001)

modes_list = sys.argv[2].split() if len(sys.argv) > 2 else None
print ("modes: ", modes_list)
core = RFXtrx.Core(rfxcom_device, print_callback, modes=modes_list)
core = RFXtrx.Connect(rfxcom_device, print_callback, transport_protocol=RFXtrx.PyNetworkTransport, modes=modes_list)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't really have both serial and network in same example.

print (core)
while True:
Expand Down
33 changes: 25 additions & 8 deletions examples/send.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,37 @@
# along with pyRFXtrx. See the file COPYING.txt in the distribution.
# If not, see <http://www.gnu.org/licenses/>.

import logging
import sys
sys.path.append("../")

NOT TESTED


from RFXtrx.pyserial import PySerialTransport
from RFXtrx import LightingDevice
from RFXtrx import PySerialTransport, PyNetworkTransport
from RFXtrx import FanDevice
from time import sleep

logging.basicConfig(level=logging.DEBUG)

transport = PySerialTransport('/dev/cu.usbserial-05VN8GHS')
transport = PyNetworkTransport(('192.168.2.247', 10001))
Copy link
Collaborator

@elupus elupus May 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove these extra stuff. Pretty much all changes in this file.

transport.reset()

transport.send(b'\x0D\x00\x00\x01\x02\x00\x00'
b'\x00\x00\x00\x00\x00\x00\x00')
event = transport.receive_blocking()
print(event)

#transport.send(b'\x0D\x00\x00\x03\x07\x00\x00'
# b'\x00\x00\x00\x00\x00\x00\x00')
#event = transport.receive_blocking()
#print(event)

while True:
event = transport.receive_blocking()
if isinstance(event.device, LightingDevice):
print(event)
if hasattr(event, 'device') and isinstance(event.device, FanDevice):
sleep(5)
event.device.send_off(transport)
ack = transport.receive_blocking()
print(f"Sending high command")
event.device.send_high(transport)

# transport.close()

35 changes: 35 additions & 0 deletions tests/test_fan.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from unittest import TestCase

import RFXtrx
from RFXtrx.lowlevel import Fan

class packetTestCase(TestCase):

def test_parse_bytes(self):
data = [0x08, 0x17, 0x0D, 0x00, 0x12, 0x34, 0x56, 0x01, 0x00]
packet = RFXtrx.lowlevel.parse(data)

self.assertEqual(Fan, type(packet))
self.assertEqual(packet.id1, 0x12)
self.assertEqual(packet.id2, 0x34)
self.assertEqual(packet.id3, 0x56)
self.assertEqual(packet.id_string,'123456')
self.assertEqual(packet.cmnd, 0x01)
self.assertEqual(packet.cmnd_string, 'Low')
self.assertEqual(str(packet), 'Fan [subtype=Itho RFT, seqnbr=0, id=123456, cmnd=Low]')


def test_set_transmit(self):
packet = RFXtrx.lowlevel.Fan()
packet.set_transmit(0x0D, 0x123456, Fan.Commands.MEDIUM.value)
print(packet)

self.assertEqual(packet.packetlength, 8)
self.assertEqual(packet.packettype, 0x17)
self.assertEqual(packet.subtype, 0x0D)
self.assertEqual(packet.id_combined, 0x123456)
self.assertEqual(packet.id1, 0x12)
self.assertEqual(packet.id2, 0x34)
self.assertEqual(packet.id3, 0x56)
self.assertEqual(packet.cmnd, 0x02)
self.assertEqual(packet.data, bytearray([0x08, 0x17, 0x0D, 0x00, 0x12, 0x34, 0x56, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]))