From e41b167eab6333150007d2c82747234068a899f4 Mon Sep 17 00:00:00 2001 From: farmio Date: Sun, 20 Feb 2022 21:40:04 +0100 Subject: [PATCH 1/8] add IP-Secure tunnelling frame parsers --- changelog.md | 4 + examples/ip_secure_calculations.py | 331 +++++++++++++++++++++++++++++ xknx/knxip/knxip.py | 16 ++ xknx/knxip/knxip_enum.py | 17 ++ xknx/knxip/secure_wrapper.py | 88 ++++++++ xknx/knxip/session_authenticate.py | 72 +++++++ xknx/knxip/session_request.py | 61 ++++++ xknx/knxip/session_response.py | 72 +++++++ xknx/knxip/session_status.py | 63 ++++++ 9 files changed, 724 insertions(+) create mode 100644 examples/ip_secure_calculations.py create mode 100644 xknx/knxip/secure_wrapper.py create mode 100644 xknx/knxip/session_authenticate.py create mode 100644 xknx/knxip/session_request.py create mode 100644 xknx/knxip/session_response.py create mode 100644 xknx/knxip/session_status.py diff --git a/changelog.md b/changelog.md index 8d46a38c6..b617b11c6 100644 --- a/changelog.md +++ b/changelog.md @@ -2,6 +2,10 @@ ## Unreleased changes +### Protocol + +- add SessionRequest, SessionResponse, SessionAuthenticate, SessionStatus, SecureWrapper Frame parser + ### Internals - Drop support for Python 3.8 to follow Home Assistant changes diff --git a/examples/ip_secure_calculations.py b/examples/ip_secure_calculations.py new file mode 100644 index 000000000..8631fcf01 --- /dev/null +++ b/examples/ip_secure_calculations.py @@ -0,0 +1,331 @@ +"""Cryptographical calculations of KNX specification example frames.""" +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.asymmetric.x25519 import ( + X25519PrivateKey, + X25519PublicKey, +) +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes +from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC + + +def bytes_xor(a: bytes, b: bytes) -> bytes: # pylint: disable=invalid-name + """XOR two bytes values.""" + return (int.from_bytes(a, "big") ^ int.from_bytes(b, "big")).to_bytes(len(a), "big") + + +def byte_pad(data: bytes, block_size: int) -> bytes: + """Padd data with 0x00 until its length is a multiple of block_size.""" + padding = bytes(block_size - (len(data) % block_size)) + return data + padding + + +def sha256_hash(data: bytes) -> bytes: + """Calculate SHA256 hash of data.""" + digest = hashes.Hash(hashes.SHA256()) + digest.update(data) + return digest.finalize() + + +def calculate_message_authentication_code_cbc( + key: bytes, + additional_data: bytes, + payload: bytes = bytes(), + block_0: bytes = bytes(16), + # counter_0: bytes = bytes(16), +) -> bytes: + """Calculate the message authentication code (MAC) for a message with AES-CBC.""" + blocks = ( + block_0 + len(additional_data).to_bytes(2, "big") + additional_data + payload + ) + y_cipher = Cipher(algorithms.AES(key), modes.CBC(bytes(16))) + y_encryptor = y_cipher.encryptor() + y_blocks = ( + y_encryptor.update(byte_pad(blocks, block_size=16)) + y_encryptor.finalize() + ) + # only calculate, no ctr encryption + return y_blocks[-16:] + # s_cipher = Cipher(algorithms.AES(key), modes.CTR(counter_0)) + # s_encryptor = s_cipher.encryptor() + # return s_encryptor.update(y_blocks[-16:]) + s_encryptor.finalize() + + +def encrypt_data_ctr( + key: bytes, + payload: bytes, + counter_0: bytes = bytes(16), +) -> bytes: + """ + Encrypt data with AES-CTR. + + Payload is expected `MAC CBC + Plain KNX/IP frame` or `MAC CBC` only. + MAC shall be encrypted with coutner 0, KNXnet/IP frame with incremented counters. + Encrypted MAC is appended to the end of encrypted payload data (if there is any). + """ + s_cipher = Cipher(algorithms.AES(key), modes.CTR(counter_0)) + s_encryptor = s_cipher.encryptor() + mac = s_encryptor.update(payload[:16]) + data = s_encryptor.update(payload[16:]) + s_encryptor.finalize() + return data + mac + + +def decrypt_ctr( + session_key: bytes, + payload: bytes, + counter_0: bytes = bytes(16), +) -> bytes: + """ + Decrypt data from SecureWrapper. + + MAC is expected to be the last 16 octets of the payload. This will be sliced and + decoded first with counter 0. + Returns a tuple of (KNX/IP frame bytes, MAC TR for verification). + """ + cipher = Cipher(algorithms.AES(session_key), modes.CTR(counter_0)) + decryptor = cipher.decryptor() + mac_tr = decryptor.update(payload[-16:]) # MAC is encrypted with counter 0 + decrypted_data = decryptor.update(payload[:-16]) + decryptor.finalize() + + return (decrypted_data, mac_tr) + + +def calculate_wrapper( + session_key: bytes, + encapsulated_frame: bytes, + secure_session_id: bytes = bytes.fromhex("00 01"), + sequence_number: bytes = bytes.fromhex("00 00 00 00 00 00"), + serial_number: bytes = bytes.fromhex("00 fa 12 34 56 78"), + message_tag: bytes = bytes.fromhex("af fe"), +) -> bytes: + """Calculate the payload and mac for a secure wrapper.""" + print("# SecureWrapper") + + total_length = ( + 6 # KNX/IP Header + + len(secure_session_id) + + len(sequence_number) + + len(serial_number) + + len(message_tag) + + len(encapsulated_frame) + + 16 # MAC + ) + wrapper_header = bytes.fromhex("06 10 09 50") + total_length.to_bytes(2, "big") + + a_data = wrapper_header + secure_session_id + p_data = encapsulated_frame + q_payload_length = len(p_data).to_bytes(2, "big") + + b_0_secure_wrapper = ( + sequence_number + serial_number + message_tag + q_payload_length + ) + ctr_0_secure_wrapper = ( + sequence_number + serial_number + message_tag + bytes.fromhex("ff") + bytes(1) + ) # last octet is the coutner to increment by 1 each step + + mac_cbc = calculate_message_authentication_code_cbc( + session_key, + additional_data=a_data, + payload=p_data, + block_0=b_0_secure_wrapper, + ) + encrypted_data = encrypt_data_ctr( + session_key, + payload=mac_cbc + p_data, + counter_0=ctr_0_secure_wrapper, + ) + + # encrypted data + # ctr_1_secure_wrapper = (int.from_bytes(ctr_0_secure_wrapper, "big") + 1).to_bytes( + # 16, "big" + # ) + # cipher = Cipher(algorithms.AES(session_key), modes.CTR(ctr_1_secure_wrapper)) + # encryptor = cipher.encryptor() + # enc_frame = encryptor.update(p_data) + encryptor.finalize() + + print(f"encrypted_data: {encrypted_data[16:].hex()}") + + dec_frame, mac_tr = decrypt_ctr( + session_key, + payload=encrypted_data, + counter_0=ctr_0_secure_wrapper, + ) + assert dec_frame == p_data + assert mac_tr == mac_cbc # verification of MAC + + return encrypted_data + + +def main(): + """Recalculate KNX specification example frames.""" + ################ + # SessionRequest + ################ + print("# SessionRequest") + client_private_key = X25519PrivateKey.from_private_bytes( + bytes.fromhex( + "b8 fa bd 62 66 5d 8b 9e 8a 9d 8b 1f 4b ca 42 c8 c2 78 9a 61 10 f5 0e 9d d7 85 b3 ed e8 83 f3 78" + ) + ) + client_public_key_raw = client_private_key.public_key().public_bytes( + serialization.Encoding.Raw, serialization.PublicFormat.Raw + ) # append to SessionRequest (15-46) + print(f"Public key: {client_public_key_raw.hex()}") + + ################# + # SessionResponse + ################# + print("# SessionResponse") + peer_public_key = X25519PublicKey.from_public_bytes( + bytes.fromhex( + "bd f0 99 90 99 23 14 3e f0 a5 de 0b 3b e3 68 7b c5 bd 3c f5 f9 e6 f9 01 69 9c d8 70 ec 1f f8 24" + ) + ) + pub_keys_xor = bytes_xor( + client_public_key_raw, + peer_public_key.public_bytes( + encoding=serialization.Encoding.Raw, + format=serialization.PublicFormat.Raw, + ), + ) + + peer_device_authentication_password = "trustme" + kdf = PBKDF2HMAC( + algorithm=hashes.SHA256(), + length=16, + salt=b"device-authentication-code.1.secure.ip.knx.org", + iterations=65536, + ) + # TODO: is encoding "latin-1" correct? (also used for device names in DIBs) + peer_device_authentication_code = kdf.derive( + peer_device_authentication_password.encode("latin-1") + ) + assert peer_device_authentication_code == bytes.fromhex( + "e1 58 e4 01 20 47 bd 6c c4 1a af bc 5c 04 c1 fc" + ) + + _a_data = bytes.fromhex( + "06 10 09 52 00 38 00 01 b7 52 be 24 64 59 26 0f 6b 0c 48 01 fb d5 a6 75 99 f8 3b 40 57 b3 ef 1e 79 e4 69 ac 17 23 4e 15" + ) + ctr_0_session_response = ( + b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\x00" + ) + message_authentication_code_cbc = calculate_message_authentication_code_cbc( + peer_device_authentication_code, + additional_data=_a_data[:8] + + pub_keys_xor, # knx_ip_header + secure_session_id + bytes_xor(client_pub_key, server_pub_key) + ) + message_authentication_code = encrypt_data_ctr( + peer_device_authentication_code, + message_authentication_code_cbc, + counter_0=ctr_0_session_response, + ) + assert message_authentication_code == bytes.fromhex( + "a9 22 50 5a aa 43 61 63 57 0b d5 49 4c 2d f2 a3" + ) + + ecdh_shared_secret = client_private_key.exchange(peer_public_key) + print(f"ECDH shared secret: {ecdh_shared_secret.hex()}") + + session_key = sha256_hash(ecdh_shared_secret)[:16] + print(f"Session key: {session_key.hex()}") + + _, mac_tr = decrypt_ctr( + peer_device_authentication_code, + payload=message_authentication_code, + counter_0=ctr_0_session_response, + ) + assert mac_tr == message_authentication_code_cbc # verification of MAC + + ##################### + # SessionAuthenticate + ##################### + # shall be wrapped in SecureWrapper + print("# SessionAuthenticate") + + password_string = "secret" + kdf = PBKDF2HMAC( + algorithm=hashes.SHA256(), + length=16, + salt=b"user-password.1.secure.ip.knx.org", + iterations=65536, + ) + password_hash = kdf.derive(password_string.encode("latin-1")) + print(f"Password hash: {password_hash.hex(' ')}") + assert password_hash == bytes.fromhex( + "03 fc ed b6 66 60 25 1e c8 1a 1a 71 69 01 69 6a" + ) + + authenticate_wrapper = bytes.fromhex( + "06 10 09 50 00 3e 00 01 00 00 00 00 00 00 00 fa 12 34 56 78 af fe" + "79 15 a4 f3 6e 6e 42 08" + "d2 8b 4a 20 7d 8f 35 c0" + "d1 38 c2 6a 7b 5e 71 69" + "52 db a8 e7 e4 bd 80 bd" + "7d 86 8a 3a e7 87 49 de" + ) + session_authenticate = bytes.fromhex( + "06 10 09 53 00 18 00 01" + "1f 1d 59 ea 9f 12 a1 52 e5 d9 72 7f 08 46 2c de" # MAC + ) + mac_cbc_authenticate = calculate_message_authentication_code_cbc( + password_hash, + additional_data=session_authenticate[:8] + pub_keys_xor, + block_0=bytes(16), + ) + ctr_0_session_authenticate = ( + b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\x00" + ) + assert ( + encrypt_data_ctr( + password_hash, + mac_cbc_authenticate, + counter_0=ctr_0_session_authenticate, + ) + == session_authenticate[8:] + ) + assert ( + calculate_wrapper( + session_key, + encapsulated_frame=session_authenticate, + secure_session_id=authenticate_wrapper[6:8], + sequence_number=authenticate_wrapper[8:14], + serial_number=authenticate_wrapper[14:20], + message_tag=authenticate_wrapper[20:22], + ) + == authenticate_wrapper[22:] + ) + # verify MAC + _, mac_tr = decrypt_ctr( + password_hash, + payload=session_authenticate[8:], + counter_0=ctr_0_session_authenticate, + ) + assert mac_tr == mac_cbc_authenticate + + ############### + # SessionStatus + ############### + # shall be wrapped in SecureWrapper + print("# SessionStatus") + + status_wrapper = bytes.fromhex( + "06 10 09 50 00 2e 00 01 00 00 00 00 00 00 00 fa aa aa aa aa af fe" + "26 15 6d b5 c7 49 88 8f" + "a3 73 c3 e0 b4 bd e4 49" + "7c 39 5e 4b 1c 2f 46 a1" + ) + session_status = bytes.fromhex("06 10 09 54 00 08 00 00") + assert ( + calculate_wrapper( + session_key, + encapsulated_frame=session_status, + secure_session_id=status_wrapper[6:8], + sequence_number=status_wrapper[8:14], + serial_number=status_wrapper[14:20], + message_tag=status_wrapper[20:22], + ) + == status_wrapper[22:] + ) + + +if __name__ == "__main__": + main() diff --git a/xknx/knxip/knxip.py b/xknx/knxip/knxip.py index 5767b64c8..569071b94 100644 --- a/xknx/knxip/knxip.py +++ b/xknx/knxip/knxip.py @@ -24,6 +24,11 @@ from .routing_indication import RoutingIndication from .search_request import SearchRequest from .search_response import SearchResponse +from .secure_wrapper import SecureWrapper +from .session_authenticate import SessionAuthenticate +from .session_request import SessionRequest +from .session_response import SessionResponse +from .session_status import SessionStatus from .tunnelling_ack import TunnellingAck from .tunnelling_request import TunnellingRequest @@ -74,6 +79,17 @@ def init(self, service_type_ident: KNXIPServiceType) -> KNXIPBody: # Routing elif service_type_ident == KNXIPServiceType.ROUTING_INDICATION: body = RoutingIndication(self.xknx) + # Secure + elif service_type_ident == KNXIPServiceType.SECURE_WRAPPER: + body = SecureWrapper(self.xknx) + elif service_type_ident == KNXIPServiceType.SESSION_AUTHENTICATE: + body = SessionAuthenticate(self.xknx) + elif service_type_ident == KNXIPServiceType.SESSION_REQUEST: + body = SessionRequest(self.xknx) + elif service_type_ident == KNXIPServiceType.SESSION_RESPONSE: + body = SessionResponse(self.xknx) + elif service_type_ident == KNXIPServiceType.SESSION_STATUS: + body = SessionStatus(self.xknx) else: raise CouldNotParseKNXIP( f"KNXIPServiceType not implemented: {service_type_ident.name}" diff --git a/xknx/knxip/knxip_enum.py b/xknx/knxip/knxip_enum.py index 6b5913cec..6f1815ba1 100644 --- a/xknx/knxip/knxip_enum.py +++ b/xknx/knxip/knxip_enum.py @@ -203,3 +203,20 @@ class DIBServiceFamily(Enum): # Object Server'. OBJECT_SERVER = 0x08 + + +class SecureSessionStatusCode(Enum): + """Enum class for KNX/IP Secure session status codes.""" + + # The user could successfully be authenticated + STATUS_AUTHENTICATION_SUCCESS = 0x00 + # An error occurred during secure session handshake + STATUS_AUTHENTICATION_FAILED = 0x01 + # The session is not (yet) authenticated + STATUS_UNAUTHENTICATED = 0x02 + # A timeout occurred during secure session handshake + STATUS_TIMEOUT = 0x03 + # The secure session shall be closed + STATUS_CLOSE = 0x04 + # Prevent inactivity on the secure session closing it with timeout error + STATUS_KEEPALIVE = 0x05 diff --git a/xknx/knxip/secure_wrapper.py b/xknx/knxip/secure_wrapper.py new file mode 100644 index 000000000..aa88dc237 --- /dev/null +++ b/xknx/knxip/secure_wrapper.py @@ -0,0 +1,88 @@ +""" +Module for Serialization and Deserialization of KNX Secure Wrapper. + +When KNXnet/IP frames are to be sent over a secured connection, each frame including +the KNXnet/IP header shall be completely encapsulated as encrypted payload inside a +SECURE_WRAPPER frame that adds some extra information needed to decrypt the frame and +for ensuring data integrity and freshness. +""" +from __future__ import annotations + +from typing import TYPE_CHECKING, Final + +from xknx.exceptions import CouldNotParseKNXIP + +from .body import KNXIPBody +from .knxip_enum import KNXIPServiceType + +if TYPE_CHECKING: + from xknx.xknx import XKNX + +# 2 octets secure session identifier +# 6 octets sequence information +# 6 octets KNX serial number +# 2 octets message tag +SECURITY_INFORMATION_LENGTH: Final = 16 +# 6 octets for KNX/IP header +# 2 for smallest payload (eg. SessionStatus) +# 16 for message authentication code +MINIMUM_PAYLOAD_LENGTH: Final = 24 + + +class SecureWrapper(KNXIPBody): + """Representation of a KNX Secure Wrapper.""" + + SERVICE_TYPE = KNXIPServiceType.SECURE_WRAPPER + + def __init__( + self, + xknx: XKNX, + secure_session_id: int = 0, + sequence_information: int = 0, + serial_number: int = 0, + message_tag: int = 0, + encrypted_data: bytes = bytes(0), + ): + """Initialize SecureWrapper object.""" + super().__init__(xknx) + self.secure_session_id = secure_session_id + self.sequence_information = sequence_information + self.serial_number = serial_number + self.message_tag = message_tag + self.encrypted_data = encrypted_data + + def calculated_length(self) -> int: + """Get length of KNX/IP body.""" + return SECURITY_INFORMATION_LENGTH + len(self.encrypted_data) + + def from_knx(self, raw: bytes) -> int: + """Parse/deserialize from KNX/IP raw data.""" + if len(raw) < (SECURITY_INFORMATION_LENGTH + MINIMUM_PAYLOAD_LENGTH): + raise CouldNotParseKNXIP("SecureWrapper has invalid length") + self.secure_session_id = int.from_bytes(raw[:2], "big") + self.sequence_information = int.from_bytes(raw[2:8], "big") + self.serial_number = int.from_bytes(raw[8:14], "big") + self.message_tag = int.from_bytes(raw[14:16], "big") + self.encrypted_data = raw[16:] + return len(raw) + + def to_knx(self) -> bytes: + """Serialize to KNX/IP raw data.""" + return ( + self.secure_session_id.to_bytes(2, "big") + + self.sequence_information.to_bytes(6, "big") + + self.serial_number.to_bytes(6, "big") + + self.message_tag.to_bytes(2, "big") + + self.encrypted_data + ) + + def __str__(self) -> str: + """Return object as readable string.""" + return ( + f"" + ) diff --git a/xknx/knxip/session_authenticate.py b/xknx/knxip/session_authenticate.py new file mode 100644 index 000000000..d9371141e --- /dev/null +++ b/xknx/knxip/session_authenticate.py @@ -0,0 +1,72 @@ +""" +Module for Serialization and Deserialization of KNX Session Authenticates. + +The SESSION_AUTHENTICATE shall be sent by the KNXnet/IP secure client to the +control endpoint of the KNXnet/IP secure server after the Diffie-Hellman handshake +to authenticate the user against the server device. +""" +from __future__ import annotations + +from typing import TYPE_CHECKING, Final + +from xknx.exceptions import CouldNotParseKNXIP + +from .body import KNXIPBody +from .knxip_enum import KNXIPServiceType + +if TYPE_CHECKING: + from xknx.xknx import XKNX + + +class SessionAuthenticate(KNXIPBody): + """Representation of a KNX Session Authenticate.""" + + SERVICE_TYPE = KNXIPServiceType.SESSION_AUTHENTICATE + LENGTH: Final = 18 + + def __init__( + self, + xknx: XKNX, + user_id: int = 0x02, + message_authentication_code: bytes = bytes(16), + ): + """Initialize SessionAuthenticate object.""" + super().__init__(xknx) + # 00h: Reserved, shall not be used + # 01h: Management level access + # 02h – 7Fh: User level access + # 80h – FFh: Reserved, shall not be used + self.user_id = user_id # TODO maybe use an Enum instead of int + self.message_authentication_code = message_authentication_code + + def calculated_length(self) -> int: + """Get length of KNX/IP body.""" + return SessionAuthenticate.LENGTH + + def from_knx(self, raw: bytes) -> int: + """Parse/deserialize from KNX/IP raw data.""" + if len(raw) != SessionAuthenticate.LENGTH: + raise CouldNotParseKNXIP("SessionAuthenticate has wrong length") + self.user_id = raw[1] + self.message_authentication_code = raw[2:] + return SessionAuthenticate.LENGTH + + def to_knx(self) -> bytes: + """Serialize to KNX/IP raw data.""" + return ( + bytes( + ( + 0x00, # reserved + self.user_id, + ) + ) + + self.message_authentication_code + ) + + def __str__(self) -> str: + """Return object as readable string.""" + return ( + f"" + ) diff --git a/xknx/knxip/session_request.py b/xknx/knxip/session_request.py new file mode 100644 index 000000000..59fb6ac4b --- /dev/null +++ b/xknx/knxip/session_request.py @@ -0,0 +1,61 @@ +""" +Module for Serialization and Deserialization of KNX Session Requests. + +The SESSION_REQUEST is used to initiate the secure connection setup +handshake for a new secure communication session. +""" +from __future__ import annotations + +from typing import TYPE_CHECKING, Final + +from xknx.exceptions import CouldNotParseKNXIP + +from .body import KNXIPBody +from .hpai import HPAI +from .knxip_enum import HostProtocol, KNXIPServiceType + +if TYPE_CHECKING: + from xknx.xknx import XKNX + + +class SessionRequest(KNXIPBody): + """Representation of a KNX Session Request.""" + + SERVICE_TYPE = KNXIPServiceType.SESSION_REQUEST + # 8 octets for the UDP/TCP HPAI and 32 octets for the client’s ECDH public value + LENGTH: Final = 40 + + def __init__( + self, + xknx: XKNX, + control_endpoint: HPAI = HPAI(protocol=HostProtocol.IPV4_TCP), + ecdh_client_public_key: bytes = bytes(32), + ): + """Initialize SessionRequest object.""" + super().__init__(xknx) + self.control_endpoint = control_endpoint + self.ecdh_client_public_key = ecdh_client_public_key + + def calculated_length(self) -> int: + """Get length of KNX/IP body.""" + return SessionRequest.LENGTH + + def from_knx(self, raw: bytes) -> int: + """Parse/deserialize from KNX/IP raw data.""" + if len(raw) != SessionRequest.LENGTH: + raise CouldNotParseKNXIP("SessionRequest has wrong length") + pos = self.control_endpoint.from_knx(raw) + self.ecdh_client_public_key = raw[pos:] + return SessionRequest.LENGTH + + def to_knx(self) -> bytes: + """Serialize to KNX/IP raw data.""" + return self.control_endpoint.to_knx() + self.ecdh_client_public_key + + def __str__(self) -> str: + """Return object as readable string.""" + return ( + f"" + ) diff --git a/xknx/knxip/session_response.py b/xknx/knxip/session_response.py new file mode 100644 index 000000000..fb240c3c1 --- /dev/null +++ b/xknx/knxip/session_response.py @@ -0,0 +1,72 @@ +""" +Module for Serialization and Deserialization of KNX Session Responses. + +The SESSION_RESPONSE shall be sent by the KNXnet/IP secure server to the secure +client's control endpoint in response to a received secure session request frame. +""" +from __future__ import annotations + +from typing import TYPE_CHECKING, Final + +from xknx.exceptions import CouldNotParseKNXIP + +from .body import KNXIPBody +from .knxip_enum import KNXIPServiceType + +if TYPE_CHECKING: + from xknx.xknx import XKNX + + +class SessionResponse(KNXIPBody): + """Representation of a KNX Session Response.""" + + SERVICE_TYPE = KNXIPServiceType.SESSION_RESPONSE + # 2 octets secure session identifier + # 32 octets for the servers’s ECDH public value + # 16 octets for the message authentication code + LENGTH: Final = 50 + + def __init__( + self, + xknx: XKNX, + secure_session_id: int = 0, + ecdh_server_public_key: bytes = bytes(32), + message_authentication_code: bytes = bytes(16), + ): + """Initialize SessionResponse object.""" + super().__init__(xknx) + self.ecdh_server_public_key = ecdh_server_public_key + # secure session identifier 0 shall in general be reserved for + # multicast data and shall not be used for unicast connections + self.secure_session_id = secure_session_id + self.message_authentication_code = message_authentication_code + + def calculated_length(self) -> int: + """Get length of KNX/IP body.""" + return SessionResponse.LENGTH + + def from_knx(self, raw: bytes) -> int: + """Parse/deserialize from KNX/IP raw data.""" + if len(raw) != SessionResponse.LENGTH: + raise CouldNotParseKNXIP("SessionResponse has wrong length") + self.secure_session_id = int.from_bytes(raw[:2], "big") + self.ecdh_server_public_key = raw[2:34] + self.message_authentication_code = raw[34:] + return SessionResponse.LENGTH + + def to_knx(self) -> bytes: + """Serialize to KNX/IP raw data.""" + return ( + self.secure_session_id.to_bytes(2, "big") + + self.ecdh_server_public_key + + self.message_authentication_code + ) + + def __str__(self) -> str: + """Return object as readable string.""" + return ( + f"" + ) diff --git a/xknx/knxip/session_status.py b/xknx/knxip/session_status.py new file mode 100644 index 000000000..4a83190d7 --- /dev/null +++ b/xknx/knxip/session_status.py @@ -0,0 +1,63 @@ +""" +Module for Serialization and Deserialization of KNX Session Status. + +A SESSION_STATUS may be sent by the KNXnet/IP secure server to the KNXnet/IP secure client +or by the KNXnet/IP secure client to the KNXnet/IP secure server in any stage of the +secure session handshake to indicate an error condition or status information. +""" +from __future__ import annotations + +from typing import TYPE_CHECKING, Final + +from xknx.exceptions import CouldNotParseKNXIP + +from .body import KNXIPBody +from .knxip_enum import KNXIPServiceType, SecureSessionStatusCode + +if TYPE_CHECKING: + from xknx.xknx import XKNX + + +class SessionStatus(KNXIPBody): + """Representation of a KNX Session Status.""" + + SERVICE_TYPE = KNXIPServiceType.SESSION_STATUS + LENGTH: Final = 2 + + def __init__( + self, + xknx: XKNX, + status: SecureSessionStatusCode = SecureSessionStatusCode.STATUS_KEEPALIVE, + ): + """Initialize SessionStatus object.""" + super().__init__(xknx) + self.status = status + + def calculated_length(self) -> int: + """Get length of KNX/IP body.""" + return SessionStatus.LENGTH + + def from_knx(self, raw: bytes) -> int: + """Parse/deserialize from KNX/IP raw data.""" + if len(raw) != SessionStatus.LENGTH: + raise CouldNotParseKNXIP("SessionStatus has wrong length") + try: + self.status = SecureSessionStatusCode(raw[0]) + except ValueError as err: + raise CouldNotParseKNXIP( + f"SessionStatus has unsupported status code: {raw[0]}" + ) from err + return SessionStatus.LENGTH + + def to_knx(self) -> bytes: + """Serialize to KNX/IP raw data.""" + return bytes( + ( + self.status.value, + 0x00, # reserved + ) + ) + + def __str__(self) -> str: + """Return object as readable string.""" + return f'' From ac85f16f7e9d278973d95643f9303ded33df9adf Mon Sep 17 00:00:00 2001 From: farmio Date: Mon, 21 Feb 2022 00:19:54 +0100 Subject: [PATCH 2/8] add `cryptography` to requirements --- requirements/production.txt | 1 + setup.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/requirements/production.txt b/requirements/production.txt index b00a84e7c..7730378b4 100644 --- a/requirements/production.txt +++ b/requirements/production.txt @@ -1 +1,2 @@ +cryptography==35.0.0 netifaces==0.11.0 diff --git a/setup.py b/setup.py index 74fff9d10..06b0af9dc 100644 --- a/setup.py +++ b/setup.py @@ -12,7 +12,7 @@ with open(path.join(THIS_DIRECTORY, "xknx/__version__.py"), encoding="utf-8") as fp: exec(fp.read(), VERSION) -REQUIRES = ["netifaces>=0.11.0"] +REQUIRES = ["cryptography>=35.0.0", "netifaces>=0.11.0"] setup( name="xknx", From 5f8b96871ee90f81ebdb447e0f22629c47063b75 Mon Sep 17 00:00:00 2001 From: farmio Date: Tue, 22 Feb 2022 09:57:37 +0100 Subject: [PATCH 3/8] store MAC in extra attribute in SecureWrapper --- examples/ip_secure_calculations.py | 16 +++++++++------- xknx/knxip/secure_wrapper.py | 27 +++++++++++++++++++++------ 2 files changed, 30 insertions(+), 13 deletions(-) diff --git a/examples/ip_secure_calculations.py b/examples/ip_secure_calculations.py index 8631fcf01..2ac91f406 100644 --- a/examples/ip_secure_calculations.py +++ b/examples/ip_secure_calculations.py @@ -51,20 +51,21 @@ def calculate_message_authentication_code_cbc( def encrypt_data_ctr( key: bytes, - payload: bytes, + mac_cbc: bytes, + payload: bytes = bytes(), counter_0: bytes = bytes(16), ) -> bytes: """ Encrypt data with AES-CTR. - Payload is expected `MAC CBC + Plain KNX/IP frame` or `MAC CBC` only. + Payload is optional; expected plain KNX/IP frame bytes. MAC shall be encrypted with coutner 0, KNXnet/IP frame with incremented counters. Encrypted MAC is appended to the end of encrypted payload data (if there is any). """ s_cipher = Cipher(algorithms.AES(key), modes.CTR(counter_0)) s_encryptor = s_cipher.encryptor() - mac = s_encryptor.update(payload[:16]) - data = s_encryptor.update(payload[16:]) + s_encryptor.finalize() + mac = s_encryptor.update(mac_cbc) + data = s_encryptor.update(payload) + s_encryptor.finalize() return data + mac @@ -129,7 +130,8 @@ def calculate_wrapper( ) encrypted_data = encrypt_data_ctr( session_key, - payload=mac_cbc + p_data, + mac_cbc=mac_cbc, + payload=p_data, counter_0=ctr_0_secure_wrapper, ) @@ -215,7 +217,7 @@ def main(): ) message_authentication_code = encrypt_data_ctr( peer_device_authentication_code, - message_authentication_code_cbc, + mac_cbc=message_authentication_code_cbc, counter_0=ctr_0_session_response, ) assert message_authentication_code == bytes.fromhex( @@ -277,7 +279,7 @@ def main(): assert ( encrypt_data_ctr( password_hash, - mac_cbc_authenticate, + mac_cbc=mac_cbc_authenticate, counter_0=ctr_0_session_authenticate, ) == session_authenticate[8:] diff --git a/xknx/knxip/secure_wrapper.py b/xknx/knxip/secure_wrapper.py index aa88dc237..433cbeea5 100644 --- a/xknx/knxip/secure_wrapper.py +++ b/xknx/knxip/secure_wrapper.py @@ -25,8 +25,14 @@ SECURITY_INFORMATION_LENGTH: Final = 16 # 6 octets for KNX/IP header # 2 for smallest payload (eg. SessionStatus) -# 16 for message authentication code -MINIMUM_PAYLOAD_LENGTH: Final = 24 +MINIMUM_PAYLOAD_LENGTH: Final = 2 +MESSAGE_AUTHENTICATION_CODE_LENGTH: Final = 16 + +SECURE_WRAPPER_MINIMUM_LENGTH: Final = ( + SECURITY_INFORMATION_LENGTH + + MINIMUM_PAYLOAD_LENGTH + + MESSAGE_AUTHENTICATION_CODE_LENGTH +) class SecureWrapper(KNXIPBody): @@ -42,6 +48,7 @@ def __init__( serial_number: int = 0, message_tag: int = 0, encrypted_data: bytes = bytes(0), + message_authentication_code: bytes = bytes(16), ): """Initialize SecureWrapper object.""" super().__init__(xknx) @@ -50,20 +57,26 @@ def __init__( self.serial_number = serial_number self.message_tag = message_tag self.encrypted_data = encrypted_data + self.message_authentication_code = message_authentication_code def calculated_length(self) -> int: """Get length of KNX/IP body.""" - return SECURITY_INFORMATION_LENGTH + len(self.encrypted_data) + return ( + SECURITY_INFORMATION_LENGTH + + len(self.encrypted_data) + + MESSAGE_AUTHENTICATION_CODE_LENGTH + ) def from_knx(self, raw: bytes) -> int: """Parse/deserialize from KNX/IP raw data.""" - if len(raw) < (SECURITY_INFORMATION_LENGTH + MINIMUM_PAYLOAD_LENGTH): + if len(raw) < SECURE_WRAPPER_MINIMUM_LENGTH: raise CouldNotParseKNXIP("SecureWrapper has invalid length") self.secure_session_id = int.from_bytes(raw[:2], "big") self.sequence_information = int.from_bytes(raw[2:8], "big") self.serial_number = int.from_bytes(raw[8:14], "big") self.message_tag = int.from_bytes(raw[14:16], "big") - self.encrypted_data = raw[16:] + self.encrypted_data = raw[16:-MESSAGE_AUTHENTICATION_CODE_LENGTH] + self.message_authentication_code = raw[-MESSAGE_AUTHENTICATION_CODE_LENGTH:] return len(raw) def to_knx(self) -> bytes: @@ -74,6 +87,7 @@ def to_knx(self) -> bytes: + self.serial_number.to_bytes(6, "big") + self.message_tag.to_bytes(2, "big") + self.encrypted_data + + self.message_authentication_code ) def __str__(self) -> str: @@ -84,5 +98,6 @@ def __str__(self) -> str: f'sequence_information="{self.sequence_information}" ' f'serial_number="{self.serial_number}" ' f'message_tag="{self.message_tag}" ' - f"encrypted_data={self.encrypted_data.hex()} />" + f'encrypted_data={self.encrypted_data.hex()}" ' + f"message_authentication_code={self.message_authentication_code.hex()} />" ) From 7a6db12b9e479b7b5382ae9a379ed78cb92b6ea6 Mon Sep 17 00:00:00 2001 From: farmio Date: Tue, 22 Feb 2022 17:15:48 +0100 Subject: [PATCH 4/8] test frame parsers --- test/knxip_tests/secure_wrapper_test.py | 65 +++++++++++++++++++ test/knxip_tests/session_authenticate_test.py | 38 +++++++++++ test/knxip_tests/session_request_test.py | 38 +++++++++++ test/knxip_tests/session_response_test.py | 50 ++++++++++++++ test/knxip_tests/session_status_test.py | 35 ++++++++++ test/knxip_tests/tunnelling_ack_test.py | 2 +- test/knxip_tests/tunnelling_request_test.py | 2 +- xknx/knxip/__init__.py | 10 +++ 8 files changed, 238 insertions(+), 2 deletions(-) create mode 100644 test/knxip_tests/secure_wrapper_test.py create mode 100644 test/knxip_tests/session_authenticate_test.py create mode 100644 test/knxip_tests/session_request_test.py create mode 100644 test/knxip_tests/session_response_test.py create mode 100644 test/knxip_tests/session_status_test.py diff --git a/test/knxip_tests/secure_wrapper_test.py b/test/knxip_tests/secure_wrapper_test.py new file mode 100644 index 000000000..2f5229813 --- /dev/null +++ b/test/knxip_tests/secure_wrapper_test.py @@ -0,0 +1,65 @@ +"""Unit test for KNX/IP SecureWrapper objects.""" +from xknx import XKNX +from xknx.knxip import KNXIPFrame, SecureWrapper + + +class TestKNXIPSecureWrapper: + """Test class for KNX/IP SecureWrapper objects.""" + + def test_secure_wrapper(self): + """Test parsing and streaming secure wrapper KNX/IP packet.""" + sequence_number = bytes.fromhex("00 00 00 00 00 00") + sequence_number_int = int.from_bytes(sequence_number, "big") + knx_serial_number = bytes.fromhex("00 fa 12 34 56 78") + knx_serial_number_int = int.from_bytes(knx_serial_number, "big") + message_tag = bytes.fromhex("af fe") + message_tag_int = int.from_bytes(message_tag, "big") + encrypted_data = bytes.fromhex( + "79 15 a4 f3 6e 6e 42 08" # SessionAuthenticate Frame + "d2 8b 4a 20 7d 8f 35 c0" + "d1 38 c2 6a 7b 5e 71 69" + ) + message_authentication_code = bytes.fromhex( + "52 db a8 e7 e4 bd 80 bd 7d 86 8a 3a e7 87 49 de" + ) + raw = ( + bytes.fromhex( + "06 10 09 50 00 3e" # KNXnet/IP header + "00 01" # Secure Session Identifier + ) + + sequence_number + + knx_serial_number + + message_tag + + encrypted_data + + message_authentication_code + ) + xknx = XKNX() + knxipframe = KNXIPFrame(xknx) + knxipframe.from_knx(raw) + + assert isinstance(knxipframe.body, SecureWrapper) + assert knxipframe.body.secure_session_id == 1 + assert knxipframe.body.sequence_information == sequence_number_int + assert knxipframe.body.serial_number == knx_serial_number_int + assert knxipframe.body.message_tag == message_tag_int + assert knxipframe.body.encrypted_data == encrypted_data + assert ( + knxipframe.body.message_authentication_code == message_authentication_code + ) + + assert knxipframe.to_knx() == raw + + secure_wrapper = SecureWrapper( + xknx, + secure_session_id=1, + sequence_information=sequence_number_int, + serial_number=knx_serial_number_int, + message_tag=message_tag_int, + encrypted_data=encrypted_data, + message_authentication_code=message_authentication_code, + ) + knxipframe2 = KNXIPFrame.init_from_body(secure_wrapper) + + print(knxipframe2.to_knx().hex()) + print(raw.hex()) + assert knxipframe2.to_knx() == raw diff --git a/test/knxip_tests/session_authenticate_test.py b/test/knxip_tests/session_authenticate_test.py new file mode 100644 index 000000000..93689cb12 --- /dev/null +++ b/test/knxip_tests/session_authenticate_test.py @@ -0,0 +1,38 @@ +"""Unit test for KNX/IP SessionAuthenticate objects.""" +from xknx import XKNX +from xknx.knxip import KNXIPFrame, SessionAuthenticate + + +class TestKNXIPSessionAuthenticate: + """Test class for KNX/IP SessionAuthenticate objects.""" + + def test_session_authenticate(self): + """Test parsing and streaming session authenticate KNX/IP packet.""" + message_authentication_code = bytes.fromhex( + "1f 1d 59 ea 9f 12 a1 52" # Message Authentication Code + "e5 d9 72 7f 08 46 2c de" + ) + raw = ( + bytes.fromhex("06 10 09 53 00 18" "00 01") # KNXnet/IP header # User ID + + message_authentication_code + ) + xknx = XKNX() + knxipframe = KNXIPFrame(xknx) + knxipframe.from_knx(raw) + + assert isinstance(knxipframe.body, SessionAuthenticate) + assert knxipframe.body.user_id == 1 + assert ( + knxipframe.body.message_authentication_code == message_authentication_code + ) + + assert knxipframe.to_knx() == raw + + session_authenticate = SessionAuthenticate( + xknx, + user_id=1, + message_authentication_code=message_authentication_code, + ) + knxipframe2 = KNXIPFrame.init_from_body(session_authenticate) + + assert knxipframe2.to_knx() == raw diff --git a/test/knxip_tests/session_request_test.py b/test/knxip_tests/session_request_test.py new file mode 100644 index 000000000..b4d042a26 --- /dev/null +++ b/test/knxip_tests/session_request_test.py @@ -0,0 +1,38 @@ +"""Unit test for KNX/IP SessionRequest objects.""" +from xknx import XKNX +from xknx.knxip import HPAI, KNXIPFrame, SessionRequest +from xknx.knxip.knxip_enum import HostProtocol + + +class TestKNXIPSessionRequest: + """Test class for KNX/IP SessionRequest objects.""" + + def test_session_request(self): + """Test parsing and streaming session request KNX/IP packet.""" + public_key = bytes.fromhex( + "0a a2 27 b4 fd 7a 32 31" # Diffie-Hellman Client Public Value X + "9b a9 96 0a c0 36 ce 0e" + "5c 45 07 b5 ae 55 16 1f" + "10 78 b1 dc fb 3c b6 31" + ) + raw = ( + bytes.fromhex( + "06 10 09 51 00 2e" # KNXnet/IP header + "08 02 00 00 00 00 00 00" # HPAI TCPv4 + ) + + public_key + ) + xknx = XKNX() + knxipframe = KNXIPFrame(xknx) + knxipframe.from_knx(raw) + + assert isinstance(knxipframe.body, SessionRequest) + assert knxipframe.body.control_endpoint == HPAI(protocol=HostProtocol.IPV4_TCP) + assert knxipframe.body.ecdh_client_public_key == public_key + + assert knxipframe.to_knx() == raw + + session_request = SessionRequest(xknx, ecdh_client_public_key=public_key) + knxipframe2 = KNXIPFrame.init_from_body(session_request) + + assert knxipframe2.to_knx() == raw diff --git a/test/knxip_tests/session_response_test.py b/test/knxip_tests/session_response_test.py new file mode 100644 index 000000000..c07dc4214 --- /dev/null +++ b/test/knxip_tests/session_response_test.py @@ -0,0 +1,50 @@ +"""Unit test for KNX/IP SessionResponse objects.""" +from xknx import XKNX +from xknx.knxip import KNXIPFrame, SessionResponse + + +class TestKNXIPSessionResponse: + """Test class for KNX/IP SessionResponse objects.""" + + def test_session_response(self): + """Test parsing and streaming session response KNX/IP packet.""" + public_key = bytes.fromhex( + "bd f0 99 90 99 23 14 3e" # Diffie-Hellman Server Public Value Y + "f0 a5 de 0b 3b e3 68 7b" + "c5 bd 3c f5 f9 e6 f9 01" + "69 9c d8 70 ec 1f f8 24" + ) + message_authentication_code = bytes.fromhex( + "a9 22 50 5a aa 43 61 63" # Message Authentication Code + "57 0b d5 49 4c 2d f2 a3" + ) + raw = ( + bytes.fromhex( + "06 10 09 52 00 38" # KNXnet/IP header + "00 01" # Secure Session Identifier + ) + + public_key + + message_authentication_code + ) + xknx = XKNX() + knxipframe = KNXIPFrame(xknx) + knxipframe.from_knx(raw) + + assert isinstance(knxipframe.body, SessionResponse) + assert knxipframe.body.secure_session_id == 1 + assert knxipframe.body.ecdh_server_public_key == public_key + assert ( + knxipframe.body.message_authentication_code == message_authentication_code + ) + + assert knxipframe.to_knx() == raw + + session_response = SessionResponse( + xknx, + secure_session_id=1, + ecdh_server_public_key=public_key, + message_authentication_code=message_authentication_code, + ) + knxipframe2 = KNXIPFrame.init_from_body(session_response) + + assert knxipframe2.to_knx() == raw diff --git a/test/knxip_tests/session_status_test.py b/test/knxip_tests/session_status_test.py new file mode 100644 index 000000000..61a2c9c3f --- /dev/null +++ b/test/knxip_tests/session_status_test.py @@ -0,0 +1,35 @@ +"""Unit test for KNX/IP SessionStatus objects.""" +from xknx import XKNX +from xknx.knxip import KNXIPFrame, SessionStatus +from xknx.knxip.knxip_enum import SecureSessionStatusCode + + +class TestKNXIPSessionStatus: + """Test class for KNX/IP SessionStatus objects.""" + + def test_session_status(self): + """Test parsing and streaming session status KNX/IP packet.""" + raw = bytes.fromhex( + "06 10 09 54 00 08" # KNXnet/IP header + "00" # status code 00h STATUS_AUTHENTICATION_SUCCESS + "00" # reserved + ) + xknx = XKNX() + knxipframe = KNXIPFrame(xknx) + knxipframe.from_knx(raw) + + assert isinstance(knxipframe.body, SessionStatus) + assert ( + knxipframe.body.status + == SecureSessionStatusCode.STATUS_AUTHENTICATION_SUCCESS + ) + + assert knxipframe.to_knx() == raw + + session_status = SessionStatus( + xknx, + status=SecureSessionStatusCode.STATUS_AUTHENTICATION_SUCCESS, + ) + knxipframe2 = KNXIPFrame.init_from_body(session_status) + + assert knxipframe2.to_knx() == raw diff --git a/test/knxip_tests/tunnelling_ack_test.py b/test/knxip_tests/tunnelling_ack_test.py index e279fba2a..332a16a96 100644 --- a/test/knxip_tests/tunnelling_ack_test.py +++ b/test/knxip_tests/tunnelling_ack_test.py @@ -6,7 +6,7 @@ from xknx.knxip import ErrorCode, KNXIPFrame, TunnellingAck -class TestKNXIPTunnelingAck: +class TestKNXIPTunnellingAck: """Test class for KNX/IP TunnellingAck objects.""" def test_connect_request(self): diff --git a/test/knxip_tests/tunnelling_request_test.py b/test/knxip_tests/tunnelling_request_test.py index 84727309b..7f94f5df4 100644 --- a/test/knxip_tests/tunnelling_request_test.py +++ b/test/knxip_tests/tunnelling_request_test.py @@ -9,7 +9,7 @@ from xknx.telegram.apci import GroupValueWrite -class TestKNXIPTunnelingRequest: +class TestKNXIPTunnellingRequest: """Test class for KNX/IP TunnellingRequest objects.""" def test_connect_request(self): diff --git a/xknx/knxip/__init__.py b/xknx/knxip/__init__.py index 5f3866d8d..f610a4f55 100644 --- a/xknx/knxip/__init__.py +++ b/xknx/knxip/__init__.py @@ -28,6 +28,11 @@ from .routing_indication import RoutingIndication from .search_request import SearchRequest from .search_response import SearchResponse +from .secure_wrapper import SecureWrapper +from .session_authenticate import SessionAuthenticate +from .session_request import SessionRequest +from .session_response import SessionResponse +from .session_status import SessionStatus from .tunnelling_ack import TunnellingAck from .tunnelling_request import TunnellingRequest @@ -62,6 +67,11 @@ "RoutingIndication", "SearchRequest", "SearchResponse", + "SecureWrapper", + "SessionAuthenticate", + "SessionRequest", + "SessionResponse", + "SessionStatus", "TunnellingAck", "TunnellingRequest", ] From 8b22c8d2dc67bf9f3aaea3e61b22fc71d4e7d5fc Mon Sep 17 00:00:00 2001 From: farmio Date: Fri, 25 Feb 2022 13:29:21 +0100 Subject: [PATCH 5/8] first working draft of initializing a secure tunnelling session --- xknx/io/request_response/__init__.py | 2 + xknx/io/request_response/authenticate.py | 44 +++ xknx/io/request_response/request_response.py | 36 +- xknx/io/request_response/session.py | 42 +++ xknx/io/secure_session.py | 327 +++++++++++++++++++ xknx/io/transport/__init__.py | 2 +- xknx/io/transport/tcp_transport.py | 54 ++- xknx/io/tunnel.py | 120 ++++++- 8 files changed, 591 insertions(+), 36 deletions(-) create mode 100644 xknx/io/request_response/authenticate.py create mode 100644 xknx/io/request_response/session.py create mode 100644 xknx/io/secure_session.py diff --git a/xknx/io/request_response/__init__.py b/xknx/io/request_response/__init__.py index 23127c85e..443ce2f08 100644 --- a/xknx/io/request_response/__init__.py +++ b/xknx/io/request_response/__init__.py @@ -3,8 +3,10 @@ of specific KNX/IP Packets used for tunnelling connections. """ # flake8: noqa +from .authenticate import Authenticate from .connect import Connect from .connectionstate import ConnectionState from .disconnect import Disconnect from .request_response import RequestResponse +from .session import Session from .tunnelling import Tunnelling diff --git a/xknx/io/request_response/authenticate.py b/xknx/io/request_response/authenticate.py new file mode 100644 index 000000000..9dac86f52 --- /dev/null +++ b/xknx/io/request_response/authenticate.py @@ -0,0 +1,44 @@ +"""Abstraction to send SessionAuthenticate and wait for SessionStatus.""" +from __future__ import annotations + +from typing import TYPE_CHECKING + +from xknx.knxip import KNXIPFrame, SessionAuthenticate, SessionStatus + +from .request_response import RequestResponse + +if TYPE_CHECKING: + from xknx.io.transport import KNXIPTransport + from xknx.xknx import XKNX + + +class Authenticate(RequestResponse): + """Class to send a SessionAuthenticate and wait for SessionStatus.""" + + def __init__( + self, + xknx: XKNX, + transport: KNXIPTransport, + user_id: int, + message_authentication_code: bytes, + ): + """Initialize Session class.""" + super().__init__(xknx, transport, SessionStatus, timeout_in_seconds=10) + self.user_id = user_id + self.message_authentication_code = message_authentication_code + self.response: SessionStatus | None = None + + def create_knxipframe(self) -> KNXIPFrame: + """Create KNX/IP Frame object to be sent to device.""" + return KNXIPFrame.init_from_body( + SessionAuthenticate( + self.xknx, + user_id=self.user_id, + message_authentication_code=self.message_authentication_code, + ) + ) + + def on_success_hook(self, knxipframe: KNXIPFrame) -> None: + """Set communication channel and identifier after having received a valid answer.""" + assert isinstance(knxipframe.body, SessionStatus) + self.response = knxipframe.body diff --git a/xknx/io/request_response/request_response.py b/xknx/io/request_response/request_response.py index 3c0476810..d16d05633 100644 --- a/xknx/io/request_response/request_response.py +++ b/xknx/io/request_response/request_response.py @@ -11,7 +11,7 @@ from xknx.exceptions import CommunicationError from xknx.io.transport import KNXIPTransport -from xknx.knxip import HPAI, ErrorCode, KNXIPBodyResponse, KNXIPFrame +from xknx.knxip import HPAI, ErrorCode, KNXIPBody, KNXIPBodyResponse, KNXIPFrame if TYPE_CHECKING: from xknx.xknx import XKNX @@ -26,13 +26,13 @@ def __init__( self, xknx: XKNX, transport: KNXIPTransport, - awaited_response_class: type[KNXIPBodyResponse], - timeout_in_seconds: float = 1.0, + awaited_response_class: type[KNXIPBody], + timeout_in_seconds: float = 5.0, ): """Initialize RequstResponse class.""" self.xknx = xknx self.transport = transport - self.awaited_response_class: type[KNXIPBodyResponse] = awaited_response_class + self.awaited_response_class: type[KNXIPBody] = awaited_response_class self.response_received_event = asyncio.Event() self.success = False self.timeout_in_seconds = timeout_in_seconds @@ -79,22 +79,20 @@ def response_rec_callback( if not isinstance(knxipframe.body, self.awaited_response_class): logger.warning("Could not understand knxipframe") return - self.response_status_code = knxipframe.body.status_code self.response_received_event.set() - if knxipframe.body.status_code == ErrorCode.E_NO_ERROR: - self.success = True - self.on_success_hook(knxipframe) - else: - self.on_error_hook(knxipframe) + + if isinstance(knxipframe.body, KNXIPBodyResponse): + self.response_status_code = knxipframe.body.status_code + if knxipframe.body.status_code != ErrorCode.E_NO_ERROR: + logger.debug( + "Error: KNX bus responded to request of type '%s' with error in '%s': %s", + self.__class__.__name__, + self.awaited_response_class.__name__, + knxipframe.body.status_code, + ) + return + self.success = True + self.on_success_hook(knxipframe) def on_success_hook(self, knxipframe: KNXIPFrame) -> None: """Do something after having received a valid answer. May be overwritten in derived class.""" - - def on_error_hook(self, knxipframe: KNXIPFrame) -> None: - """Do something after having received error within given time. May be overwritten in derived class.""" - logger.debug( - "Error: KNX bus responded to request of type '%s' with error in '%s': %s", - self.__class__.__name__, - self.awaited_response_class.__name__, - knxipframe.body.status_code, # type: ignore - ) diff --git a/xknx/io/request_response/session.py b/xknx/io/request_response/session.py new file mode 100644 index 000000000..f8e8f4a44 --- /dev/null +++ b/xknx/io/request_response/session.py @@ -0,0 +1,42 @@ +"""Abstraction to send SessionRequest and wait for SessionResponse.""" +from __future__ import annotations + +from typing import TYPE_CHECKING + +from xknx.knxip import KNXIPFrame, SessionRequest, SessionResponse + +from .request_response import RequestResponse + +if TYPE_CHECKING: + from xknx.io.transport import KNXIPTransport + from xknx.xknx import XKNX + + +class Session(RequestResponse): + """Class to send a SessionRequest and wait for SessionResponse.""" + + def __init__( + self, xknx: XKNX, transport: KNXIPTransport, ecdh_client_public_key: bytes + ): + """Initialize Session class.""" + # TODO: increase timeout to timeoutAuthentication: 10sec ? + super().__init__(xknx, transport, SessionResponse) + self.ecdh_client_public_key = ecdh_client_public_key + # TODO: make RequestResponse generic for response class + # maybe replace self.success with self.response None check + # remove on_success_hook in favour of using knxipframe.body directly + self.response: SessionResponse | None = None + + def create_knxipframe(self) -> KNXIPFrame: + """Create KNX/IP Frame object to be sent to device.""" + return KNXIPFrame.init_from_body( + SessionRequest( + self.xknx, + ecdh_client_public_key=self.ecdh_client_public_key, + ) + ) + + def on_success_hook(self, knxipframe: KNXIPFrame) -> None: + """Set communication channel and identifier after having received a valid answer.""" + assert isinstance(knxipframe.body, SessionResponse) + self.response = knxipframe.body diff --git a/xknx/io/secure_session.py b/xknx/io/secure_session.py new file mode 100644 index 000000000..d06d9b294 --- /dev/null +++ b/xknx/io/secure_session.py @@ -0,0 +1,327 @@ +"""SecureSession is an abstraction for handling a KNXnet/IP Secure session.""" +from __future__ import annotations + +import logging +from typing import TYPE_CHECKING, cast + +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.asymmetric.x25519 import ( + X25519PrivateKey, + X25519PublicKey, +) +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes +from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC + +from xknx.exceptions import CommunicationError +from xknx.knxip import KNXIPFrame, SecureWrapper, SessionResponse + +if TYPE_CHECKING: + from xknx.xknx import XKNX + +logger = logging.getLogger("xknx.log") + + +COUNTER_0_HANDSHAKE = ( # used in SessionResponse and SessionAuthenticate + b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\x00" +) + + +def bytes_xor(a: bytes, b: bytes) -> bytes: # pylint: disable=invalid-name + """XOR two bytes values.""" + return (int.from_bytes(a, "big") ^ int.from_bytes(b, "big")).to_bytes(len(a), "big") + + +def byte_pad(data: bytes, block_size: int) -> bytes: + """Padd data with 0x00 until its length is a multiple of block_size.""" + padding = bytes(block_size - (len(data) % block_size)) + return data + padding + + +def sha256_hash(data: bytes) -> bytes: + """Calculate SHA256 hash of data.""" + digest = hashes.Hash(hashes.SHA256()) + digest.update(data) + return digest.finalize() + + +def calculate_message_authentication_code_cbc( + key: bytes, + additional_data: bytes, + payload: bytes = bytes(), + block_0: bytes = bytes(16), +) -> bytes: + """Calculate the message authentication code (MAC) for a message with AES-CBC.""" + blocks = ( + block_0 + len(additional_data).to_bytes(2, "big") + additional_data + payload + ) + y_cipher = Cipher(algorithms.AES(key), modes.CBC(bytes(16))) + y_encryptor = y_cipher.encryptor() # type: ignore[no-untyped-call] + y_blocks = ( + y_encryptor.update(byte_pad(blocks, block_size=16)) + y_encryptor.finalize() + ) + # only calculate, no ctr encryption + return cast(bytes, y_blocks[-16:]) + + +def encrypt_data_ctr( + key: bytes, + payload: bytes, + counter_0: bytes = COUNTER_0_HANDSHAKE, +) -> tuple[bytes, bytes]: + """ + Encrypt data with AES-CTR. + + Payload is expected `MAC CBC + Plain KNX/IP frame` or `MAC CBC` only. + MAC shall be encrypted with coutner 0, KNXnet/IP frame with incremented counters. + Returns a tuple of encrypted data (if there is any) and encrypted MAC. + """ + s_cipher = Cipher(algorithms.AES(key), modes.CTR(counter_0)) + s_encryptor = s_cipher.encryptor() # type: ignore[no-untyped-call] + mac = s_encryptor.update(payload[:16]) + encrypted_data = s_encryptor.update(payload[16:]) + s_encryptor.finalize() + return (encrypted_data, mac) + + +def decrypt_ctr( + session_key: bytes, + payload: bytes, + counter_0: bytes = COUNTER_0_HANDSHAKE, +) -> tuple[bytes, bytes]: + """ + Decrypt data from SecureWrapper. + + MAC is expected to be the last 16 octets of the payload. This will be sliced and + decoded first with counter 0. + Returns a tuple of (KNX/IP frame bytes, MAC TR for verification). + """ + cipher = Cipher(algorithms.AES(session_key), modes.CTR(counter_0)) + decryptor = cipher.decryptor() # type: ignore[no-untyped-call] + mac_tr = decryptor.update(payload[-16:]) # MAC is encrypted with counter 0 + decrypted_data = decryptor.update(payload[:-16]) + decryptor.finalize() + + return (decrypted_data, mac_tr) + + +def derive_device_authentication_password(device_authentication_password: str) -> bytes: + """Derive device authentication password.""" + kdf = PBKDF2HMAC( + algorithm=hashes.SHA256(), + length=16, + salt=b"device-authentication-code.1.secure.ip.knx.org", + iterations=65536, + ) + return kdf.derive(device_authentication_password.encode("latin-1")) + + +def derive_user_password(password_string: str) -> bytes: + """Derive user password.""" + kdf = PBKDF2HMAC( + algorithm=hashes.SHA256(), + length=16, + salt=b"user-password.1.secure.ip.knx.org", + iterations=65536, + ) + return kdf.derive(password_string.encode("latin-1")) + + +class SecureSession: + """Class for handling a KNXnet/IP Secure session.""" + + def __init__( + self, + xknx: XKNX, + device_authentication_password: str, + user_id: int, + user_password: str, + ) -> None: + """Initialize SecureSession class.""" + self.xknx = xknx + self._device_authentication_code = derive_device_authentication_password( + device_authentication_password + ) + self.user_id = user_id + self._user_password = derive_user_password(user_password) + + self._private_key: X25519PrivateKey + self.public_key: bytes + self._peer_public_key: X25519PublicKey + self.session_id: int + self._session_key: bytes + + self.message_tag = bytes.fromhex("00 00") # use 0x00 0x00 for tunneling + self.serial_number = bytes.fromhex("00 fa 12 34 56 78") # TODO configurable? + self._sequence_number = 0 + self.initialized = False + + def increment_sequence_number(self) -> bytes: + """Increment sequence number. Return byte representation of current sequence number.""" + next_sn = self._sequence_number.to_bytes(6, "big") + self._sequence_number += 1 + return next_sn + + def initialize(self) -> bytes: + """Initialize secure session.""" + self._private_key = X25519PrivateKey.generate() + self.public_key = self._private_key.public_key().public_bytes( + serialization.Encoding.Raw, serialization.PublicFormat.Raw + ) + return self.public_key + + def handshake(self, session_response: SessionResponse) -> bytes: + """ + Handshake with device. + + Returns a SessionAuthenticate KNX/IP body. + """ + self._peer_public_key = X25519PublicKey.from_public_bytes( + session_response.ecdh_server_public_key + ) + self.session_id = session_response.secure_session_id + # verify SessionResponse MAC + # TODO: get header data from actual KNX/IP frame + response_header_data = bytes.fromhex("06 10 09 52 00 38") + pub_keys_xor = bytes_xor( + self.public_key, + session_response.ecdh_server_public_key, + ) + response_mac_cbc = calculate_message_authentication_code_cbc( + self._device_authentication_code, + additional_data=response_header_data + + self.session_id.to_bytes(2, "big") + + pub_keys_xor, # knx_ip_header + secure_session_id + bytes_xor(client_pub_key, server_pub_key) + ) + _, mac_tr = decrypt_ctr( + self._device_authentication_code, + payload=session_response.message_authentication_code, + ) + if mac_tr != response_mac_cbc: + raise CommunicationError("SessionResponse MAC verification failed.") + # calculate session key + ecdh_shared_secret = self._private_key.exchange(self._peer_public_key) + self._session_key = sha256_hash(ecdh_shared_secret)[:16] + self.initialized = True + # generate SessionAuthenticate MAC + authenticate_header_data = bytes.fromhex("06 10 09 53 00 18") + authenticate_mac_cbc = calculate_message_authentication_code_cbc( + key=self._user_password, + additional_data=authenticate_header_data + + bytes(1) # reserved + + self.user_id.to_bytes(1, "big") + + pub_keys_xor, + block_0=bytes(16), + ) + _, authenticate_mac = encrypt_data_ctr( + key=self._user_password, + payload=authenticate_mac_cbc, + ) + return authenticate_mac + + # def _block_0_secure_wrapper( + # self, sequence_number: bytes, payload_length: int + # ) -> bytes: + # """Return block 0 for SecureWrapper.""" + # return ( + # sequence_number + # + self.serial_number + # + self.message_tag + # + payload_length.to_bytes(2, "big") + # ) + + # def _counter_0_secure_wrapper(self, sequence_number: bytes) -> bytes: + # """Return counter 0 for SecureWrapper.""" + # # octet 14 (0xFF) is constant + # # last octet is the coutner to increment by 1 each step + # return ( + # sequence_number + # + self.serial_number + # + self.message_tag + # + bytes.fromhex("ff 00") + # ) + + def encrypt_frame(self, plain_frame: KNXIPFrame) -> KNXIPFrame: + """Wrap KNX/IP frame in SecureWrapper.""" + sequence_number = self.increment_sequence_number() + plain_payload = plain_frame.to_knx() # P + payload_length = len(plain_payload) # Q + # 6 KNXnet/IP header, 2 session_id, 6 sequence_number, 6 serial_number, 2 message_tag, 16 MAC = 38 + total_length = 38 + payload_length + # TODO: get header data and total_length from SecureWrapper class + wrapper_header = bytes.fromhex("06 10 09 50") + total_length.to_bytes(2, "big") + + mac_cbc = calculate_message_authentication_code_cbc( + key=self._session_key, + additional_data=wrapper_header + self.session_id.to_bytes(2, "big"), + payload=plain_payload, + block_0=( + sequence_number + + self.serial_number + + self.message_tag + + payload_length.to_bytes(2, "big") + ), + ) + encrypted_data, mac = encrypt_data_ctr( + key=self._session_key, + payload=mac_cbc + plain_payload, + counter_0=( + sequence_number + + self.serial_number + + self.message_tag + + bytes.fromhex("ff 00") + ), + ) + return KNXIPFrame.init_from_body( + SecureWrapper( + self.xknx, + secure_session_id=self.session_id, + sequence_information=int.from_bytes( + sequence_number, "big" + ), # TODO: remove encoding, decoding, encoding + serial_number=int.from_bytes(self.serial_number, "big"), + message_tag=int.from_bytes(self.message_tag, "big"), + encrypted_data=encrypted_data, + message_authentication_code=mac, + ) + ) + + def decrypt_frame(self, encrypted_frame: KNXIPFrame) -> KNXIPFrame: + """Unwrap and verify KNX/IP frame from SecureWrapper.""" + # TODO: get raw data from KNXIPFrame class directly instead of recalculating it with to_knx() + # TODO: refactor so assert isn't needed (maybe subclass SecureWrapper from KNXIPFrame instead of being an attribute) + assert isinstance(encrypted_frame.body, SecureWrapper) + assert encrypted_frame.body.secure_session_id == self.session_id + session_id_bytes = encrypted_frame.body.secure_session_id.to_bytes(2, "big") + wrapper_header = encrypted_frame.header.to_knx() + sequence_number_bytes = encrypted_frame.body.sequence_information.to_bytes( + 6, "big" + ) # TODO: remove encoding, decoding, encoding + serial_number_bytes = encrypted_frame.body.serial_number.to_bytes(6, "big") + message_tag_bytes = encrypted_frame.body.message_tag.to_bytes(2, "big") + + dec_frame, mac_tr = decrypt_ctr( + self._session_key, + payload=encrypted_frame.body.encrypted_data + + encrypted_frame.body.message_authentication_code, + counter_0=( + sequence_number_bytes + + serial_number_bytes + + message_tag_bytes + + bytes.fromhex("ff 00") + ), + ) + mac_cbc = calculate_message_authentication_code_cbc( + key=self._session_key, + additional_data=wrapper_header + session_id_bytes, + payload=dec_frame, + block_0=( + sequence_number_bytes + + serial_number_bytes + + message_tag_bytes + + len(dec_frame).to_bytes(2, "big") + ), + ) + assert mac_cbc == mac_tr + + knxipframe = KNXIPFrame(self.xknx) + knxipframe.from_knx(dec_frame) + # TODO: handle KNX/IP frame parsing errors or just put raw back into transport ? + return knxipframe diff --git a/xknx/io/transport/__init__.py b/xknx/io/transport/__init__.py index a7227e37f..8f4636a40 100644 --- a/xknx/io/transport/__init__.py +++ b/xknx/io/transport/__init__.py @@ -3,5 +3,5 @@ """ # flake8: noqa from .ip_transport import KNXIPTransport -from .tcp_transport import TCPTransport +from .tcp_transport import SecureTCPTransport, TCPTransport from .udp_transport import UDPTransport diff --git a/xknx/io/transport/tcp_transport.py b/xknx/io/transport/tcp_transport.py index a9deb70d3..bfaddf70f 100644 --- a/xknx/io/transport/tcp_transport.py +++ b/xknx/io/transport/tcp_transport.py @@ -11,7 +11,8 @@ from typing import TYPE_CHECKING, Callable, cast from xknx.exceptions import CommunicationError, CouldNotParseKNXIP, IncompleteKNXIPFrame -from xknx.knxip import HPAI, HostProtocol, KNXIPFrame +from xknx.io.secure_session import SecureSession +from xknx.knxip import HPAI, HostProtocol, KNXIPFrame, SecureWrapper from .ip_transport import KNXIPTransport @@ -58,7 +59,7 @@ def __init__( xknx: XKNX, remote_addr: tuple[str, int], ): - """Initialize UDPTransport class.""" + """Initialize TCPTransport class.""" self.xknx = xknx self.remote_addr = remote_addr self.remote_hpai = HPAI(*remote_addr, protocol=HostProtocol.IPV4_TCP) @@ -132,3 +133,52 @@ def send(self, knxipframe: KNXIPFrame, addr: tuple[str, int] | None = None) -> N raise CommunicationError("Transport not connected") self.transport.write(knxipframe.to_knx()) + + +class SecureTCPTransport(TCPTransport): + """Class for handling (sending and receiving) secure TCP packets.""" + + def __init__( + self, + xknx: XKNX, + remote_addr: tuple[str, int], + secure_session: SecureSession, + ): + """Initialize SecureTCPTransport class.""" + self.secure_session = secure_session + super().__init__(xknx, remote_addr) + + def send(self, knxipframe: KNXIPFrame, addr: tuple[str, int] | None = None) -> None: + """Send KNXIPFrame to socket. `addr` is ignored on TCP.""" + knx_logger.debug( + "Sending to %s at %s:\n%s", + self.remote_hpai, + time.time(), + knxipframe, + ) + if self.secure_session.initialized: + knxipframe = self.secure_session.encrypt_frame(plain_frame=knxipframe) + super().send(knxipframe, addr) + + def handle_knxipframe(self, knxipframe: KNXIPFrame, source: HPAI) -> None: + """Handle secure KNXIPFrame. Callback for having received data over TCP.""" + if isinstance(knxipframe.body, SecureWrapper): + if not self.secure_session.initialized: + raise CouldNotParseKNXIP( + "Received SecureWrapper with Secure session not initialized" + ) + try: + knxipframe = self.secure_session.decrypt_frame(knxipframe) + except CouldNotParseKNXIP as couldnotparseknxip: + # TODO: log raw data of unsupported frame + knx_logger.debug( + "Unsupported encrypted KNXIPFrame: %s", + couldnotparseknxip.description, + ) + return + knx_logger.debug( + "Decrypted frame at %s:\n%s", + time.time(), + knxipframe, + ) + super().handle_knxipframe(knxipframe, source) diff --git a/xknx/io/tunnel.py b/xknx/io/tunnel.py index 9a29be6f0..1124520da 100644 --- a/xknx/io/tunnel.py +++ b/xknx/io/tunnel.py @@ -24,14 +24,23 @@ TunnellingAck, TunnellingRequest, ) +from xknx.knxip.knxip_enum import SecureSessionStatusCode from xknx.telegram import IndividualAddress, Telegram, TelegramDirection from .const import HEARTBEAT_RATE from .gateway_scanner import GatewayDescriptor from .interface import Interface -from .request_response import Connect, ConnectionState, Disconnect, Tunnelling +from .request_response import ( + Authenticate, + Connect, + ConnectionState, + Disconnect, + Session, + Tunnelling, +) +from .secure_session import SecureSession from .self_description import DescriptionQuery -from .transport import KNXIPTransport, TCPTransport, UDPTransport +from .transport import KNXIPTransport, SecureTCPTransport, TCPTransport, UDPTransport if TYPE_CHECKING: from xknx.xknx import XKNX @@ -85,9 +94,9 @@ def _init_transport(self) -> None: # set up self.transport @abstractmethod - def _get_hpai(self) -> HPAI: - """Return local HPAI for this tunnel.""" - # local HPAI used for control and data endpoint + async def setup_tunnel(self) -> None: + """Set up tunnel before sending a ConnectionRequest.""" + # eg. set local HPAI used for control and data endpoint #################### # @@ -102,7 +111,7 @@ async def connect(self) -> bool: ) try: await self.transport.connect() - self.local_hpai = self._get_hpai() + await self.setup_tunnel() await self._connect_request() except (OSError, CommunicationError) as ex: logger.debug( @@ -423,6 +432,8 @@ def __init__( """Initialize Tunnel class.""" self.gateway_ip = gateway_ip self.gateway_port = gateway_port + # TCP always uses 0.0.0.0:0 + self.local_hpai = HPAI(protocol=HostProtocol.IPV4_TCP) super().__init__( xknx=xknx, @@ -438,10 +449,8 @@ def _init_transport(self) -> None: (self.gateway_ip, self.gateway_port), ) - def _get_hpai(self) -> HPAI: - """Return local HPAI for this tunnel.""" - # TCP always uses 0.0.0.0:0 - return HPAI(protocol=HostProtocol.IPV4_TCP) + async def setup_tunnel(self) -> None: + """Set up tunnel before sending a ConnectionRequest.""" async def _tunnelling_request(self, telegram: Telegram) -> bool: """Send Telegram to tunnelling device.""" @@ -512,12 +521,13 @@ def _init_transport(self) -> None: multicast=False, ) - def _get_hpai(self) -> HPAI: - """Return local HPAI for this tunnel.""" + async def setup_tunnel(self) -> None: + """Set up tunnel before sending a ConnectionRequest.""" if self.route_back: - return HPAI() + self.local_hpai = HPAI() + return (local_addr, local_port) = self.transport.getsockname() - return HPAI(ip_addr=local_addr, port=local_port) + self.local_hpai = HPAI(ip_addr=local_addr, port=local_port) # OUTGOING REQUESTS @@ -568,3 +578,85 @@ def _send_tunnelling_ack( self.transport.send( KNXIPFrame.init_from_body(ack), addr=self._data_endpoint_addr ) + + +class SecureTunnel(TCPTunnel): + """Class for handling KNX/IP secure TCP tunnels.""" + + transport: SecureTCPTransport + + def __init__( + self, + xknx: XKNX, + gateway_ip: str, + gateway_port: int, + telegram_received_callback: TelegramCallbackType | None = None, + auto_reconnect: bool = True, + auto_reconnect_wait: int = 3, + device_authentication_password: str = "hello_device", + user_id: int = 2, + user_password: str = "hello_user_2", + ): + """Initialize SecureTunnel class.""" + # TODO: store derived passwords here and pass them to SecureSession init + # in setup_tunnel instead of using .initialize() + # TODO: store passowrds in connection_config and use them from there - maybe read .knxkeys file + self.secure_session = SecureSession( + xknx, device_authentication_password, user_id, user_password + ) + super().__init__( + xknx=xknx, + gateway_ip=gateway_ip, + gateway_port=gateway_port, + telegram_received_callback=telegram_received_callback, + auto_reconnect=auto_reconnect, + auto_reconnect_wait=auto_reconnect_wait, + ) + + def _init_transport(self) -> None: + """Initialize transport transport.""" + self.transport = SecureTCPTransport( + self.xknx, + (self.gateway_ip, self.gateway_port), + secure_session=self.secure_session, + ) + + async def setup_tunnel(self) -> None: + """Set up tunnel before sending a ConnectionRequest.""" + # setup secure session + public_key = self.secure_session.initialize() + request_session = Session( + self.xknx, + transport=self.transport, + ecdh_client_public_key=public_key, + ) + await request_session.start() + if request_session.response is None: + raise CommunicationError( + "Secure session could not be established. No response received." + ) + authenticate_mac = self.secure_session.handshake(request_session.response) + + # TODO: authentication and everything else after now + # shall be wrapped in SecureWrapper + # TODO: on connection loss restore to not decrypt frames + # and renew Session (private key) + + request_authentication = Authenticate( + self.xknx, + transport=self.transport, + user_id=self.secure_session.user_id, + message_authentication_code=authenticate_mac, + ) + await request_authentication.start() + if request_authentication.response is None: + raise CommunicationError( + "Secure session could not be established. No response received." + ) + if ( # TODO: look for status in request/response and use `success` instead of response ? + request_authentication.response.status + != SecureSessionStatusCode.STATUS_AUTHENTICATION_SUCCESS + ): + raise CommunicationError( + f"Secure session authentication failed: {request_authentication.response.status}" + ) From e366616ce371ba24d3d660bda83b205ce12c5808 Mon Sep 17 00:00:00 2001 From: farmio Date: Fri, 25 Feb 2022 22:22:35 +0100 Subject: [PATCH 6/8] fix stupid super-order-bug --- xknx/io/transport/tcp_transport.py | 13 ++----------- xknx/io/tunnel.py | 6 ++---- 2 files changed, 4 insertions(+), 15 deletions(-) diff --git a/xknx/io/transport/tcp_transport.py b/xknx/io/transport/tcp_transport.py index bfaddf70f..b9d3ff1be 100644 --- a/xknx/io/transport/tcp_transport.py +++ b/xknx/io/transport/tcp_transport.py @@ -150,13 +150,8 @@ def __init__( def send(self, knxipframe: KNXIPFrame, addr: tuple[str, int] | None = None) -> None: """Send KNXIPFrame to socket. `addr` is ignored on TCP.""" - knx_logger.debug( - "Sending to %s at %s:\n%s", - self.remote_hpai, - time.time(), - knxipframe, - ) if self.secure_session.initialized: + knx_logger.debug("Encrypting frame: %s", knxipframe) knxipframe = self.secure_session.encrypt_frame(plain_frame=knxipframe) super().send(knxipframe, addr) @@ -176,9 +171,5 @@ def handle_knxipframe(self, knxipframe: KNXIPFrame, source: HPAI) -> None: couldnotparseknxip.description, ) return - knx_logger.debug( - "Decrypted frame at %s:\n%s", - time.time(), - knxipframe, - ) + knx_logger.debug("Decrypted frame: %s", knxipframe) super().handle_knxipframe(knxipframe, source) diff --git a/xknx/io/tunnel.py b/xknx/io/tunnel.py index 1124520da..a426da5b9 100644 --- a/xknx/io/tunnel.py +++ b/xknx/io/tunnel.py @@ -432,15 +432,14 @@ def __init__( """Initialize Tunnel class.""" self.gateway_ip = gateway_ip self.gateway_port = gateway_port - # TCP always uses 0.0.0.0:0 - self.local_hpai = HPAI(protocol=HostProtocol.IPV4_TCP) - super().__init__( xknx=xknx, telegram_received_callback=telegram_received_callback, auto_reconnect=auto_reconnect, auto_reconnect_wait=auto_reconnect_wait, ) + # TCP always uses 0.0.0.0:0 + self.local_hpai = HPAI(protocol=HostProtocol.IPV4_TCP) def _init_transport(self) -> None: """Initialize transport transport.""" @@ -504,7 +503,6 @@ def __init__( self.local_ip = local_ip self.local_port = local_port self.route_back = route_back - super().__init__( xknx=xknx, telegram_received_callback=telegram_received_callback, From ce1cda0cf6caa30f7bf8d33004e8b37f5da3f7a1 Mon Sep 17 00:00:00 2001 From: farmio Date: Fri, 25 Feb 2022 22:28:17 +0100 Subject: [PATCH 7/8] reset timeout to normal --- xknx/io/request_response/request_response.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/xknx/io/request_response/request_response.py b/xknx/io/request_response/request_response.py index d16d05633..6ac72fd5d 100644 --- a/xknx/io/request_response/request_response.py +++ b/xknx/io/request_response/request_response.py @@ -27,7 +27,7 @@ def __init__( xknx: XKNX, transport: KNXIPTransport, awaited_response_class: type[KNXIPBody], - timeout_in_seconds: float = 5.0, + timeout_in_seconds: float = 1.0, ): """Initialize RequstResponse class.""" self.xknx = xknx From 0d05e9cd499e7cf65fba8ee56f78509b9d6add44 Mon Sep 17 00:00:00 2001 From: farmio Date: Sat, 26 Feb 2022 07:46:33 +0100 Subject: [PATCH 8/8] readability of encrypt/decrypt function parameters --- xknx/io/secure_session.py | 32 +++++++++++++++++--------------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/xknx/io/secure_session.py b/xknx/io/secure_session.py index d06d9b294..896031525 100644 --- a/xknx/io/secure_session.py +++ b/xknx/io/secure_session.py @@ -47,7 +47,7 @@ def sha256_hash(data: bytes) -> bytes: def calculate_message_authentication_code_cbc( key: bytes, additional_data: bytes, - payload: bytes = bytes(), + payload: bytes = b"", block_0: bytes = bytes(16), ) -> bytes: """Calculate the message authentication code (MAC) for a message with AES-CBC.""" @@ -65,39 +65,40 @@ def calculate_message_authentication_code_cbc( def encrypt_data_ctr( key: bytes, - payload: bytes, + mac_cbc: bytes, + payload: bytes = b"", counter_0: bytes = COUNTER_0_HANDSHAKE, ) -> tuple[bytes, bytes]: """ Encrypt data with AES-CTR. - Payload is expected `MAC CBC + Plain KNX/IP frame` or `MAC CBC` only. + Payload is expected a full Plain KNX/IP frame with header. MAC shall be encrypted with coutner 0, KNXnet/IP frame with incremented counters. Returns a tuple of encrypted data (if there is any) and encrypted MAC. """ s_cipher = Cipher(algorithms.AES(key), modes.CTR(counter_0)) s_encryptor = s_cipher.encryptor() # type: ignore[no-untyped-call] - mac = s_encryptor.update(payload[:16]) - encrypted_data = s_encryptor.update(payload[16:]) + s_encryptor.finalize() + mac = s_encryptor.update(mac_cbc) + encrypted_data = s_encryptor.update(payload) + s_encryptor.finalize() return (encrypted_data, mac) def decrypt_ctr( session_key: bytes, - payload: bytes, + mac: bytes, + payload: bytes = b"", counter_0: bytes = COUNTER_0_HANDSHAKE, ) -> tuple[bytes, bytes]: """ Decrypt data from SecureWrapper. - MAC is expected to be the last 16 octets of the payload. This will be sliced and - decoded first with counter 0. + MAC will be decoded first with counter 0. Returns a tuple of (KNX/IP frame bytes, MAC TR for verification). """ cipher = Cipher(algorithms.AES(session_key), modes.CTR(counter_0)) decryptor = cipher.decryptor() # type: ignore[no-untyped-call] - mac_tr = decryptor.update(payload[-16:]) # MAC is encrypted with counter 0 - decrypted_data = decryptor.update(payload[:-16]) + decryptor.finalize() + mac_tr = decryptor.update(mac) # MAC is encrypted with counter 0 + decrypted_data = decryptor.update(payload) + decryptor.finalize() return (decrypted_data, mac_tr) @@ -192,7 +193,7 @@ def handshake(self, session_response: SessionResponse) -> bytes: ) _, mac_tr = decrypt_ctr( self._device_authentication_code, - payload=session_response.message_authentication_code, + mac=session_response.message_authentication_code, ) if mac_tr != response_mac_cbc: raise CommunicationError("SessionResponse MAC verification failed.") @@ -212,7 +213,7 @@ def handshake(self, session_response: SessionResponse) -> bytes: ) _, authenticate_mac = encrypt_data_ctr( key=self._user_password, - payload=authenticate_mac_cbc, + mac_cbc=authenticate_mac_cbc, ) return authenticate_mac @@ -261,7 +262,8 @@ def encrypt_frame(self, plain_frame: KNXIPFrame) -> KNXIPFrame: ) encrypted_data, mac = encrypt_data_ctr( key=self._session_key, - payload=mac_cbc + plain_payload, + mac_cbc=mac_cbc, + payload=plain_payload, counter_0=( sequence_number + self.serial_number @@ -299,8 +301,8 @@ def decrypt_frame(self, encrypted_frame: KNXIPFrame) -> KNXIPFrame: dec_frame, mac_tr = decrypt_ctr( self._session_key, - payload=encrypted_frame.body.encrypted_data - + encrypted_frame.body.message_authentication_code, + mac=encrypted_frame.body.message_authentication_code, + payload=encrypted_frame.body.encrypted_data, counter_0=( sequence_number_bytes + serial_number_bytes