From 19eeafef1768bfc7624bc2b69c6815d1fb49ad65 Mon Sep 17 00:00:00 2001 From: Nils Weiss Date: Mon, 23 Sep 2024 15:50:10 +0200 Subject: [PATCH] Cleanup DoIP sockets (#4533) * Cleanup DoIP sockets * change get_addr_info --- scapy/contrib/automotive/doip.py | 101 +++---------------------------- test/contrib/automotive/doip.uts | 8 +-- 2 files changed, 14 insertions(+), 95 deletions(-) diff --git a/scapy/contrib/automotive/doip.py b/scapy/contrib/automotive/doip.py index fe514b9811b..b9d279bcf6b 100644 --- a/scapy/contrib/automotive/doip.py +++ b/scapy/contrib/automotive/doip.py @@ -39,7 +39,7 @@ ) from scapy.layers.inet import TCP, UDP from scapy.packet import Packet, bind_layers, bind_bottom_up -from scapy.supersocket import StreamSocket, SSLStreamSocket +from scapy.supersocket import SSLStreamSocket # ISO 13400-2 sect 9.2 @@ -361,21 +361,23 @@ def __init__(self, self.force_tls = force_tls self.context = context try: - self._init_socket(socket.AF_INET) + self._init_socket() except Exception: self.close() raise - def _init_socket(self, sock_family=socket.AF_INET): - # type: (int) -> None + def _init_socket(self): + # type: () -> None connected = False + addrinfo = socket.getaddrinfo(self.ip, self.port, proto=socket.IPPROTO_TCP) + sock_family = addrinfo[0][0] + s = socket.socket(sock_family, socket.SOCK_STREAM) s.settimeout(5) s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) if not self.force_tls: - addrinfo = socket.getaddrinfo(self.ip, self.port, proto=socket.IPPROTO_TCP) s.connect(addrinfo[0][-1]) connected = True DoIPSSLStreamSocket.__init__(self, s) @@ -450,66 +452,7 @@ def _activate_routing(self): # type: (...) -> int return -1 -class DoIPSocket6(DoIPSocket): - """Socket for DoIP communication. This sockets automatically - sends a routing activation request as soon as a TCP or TLS connection is - established. - - :param ip: IPv6 address of destination - :param port: destination port, usually 13400 - :param tls_port: destination port for TLS connection, usually 3496 - :param activate_routing: If true, routing activation request is - automatically sent - :param source_address: DoIP source address - :param target_address: DoIP target address, this is automatically - determined if routing activation request is sent - :param activation_type: This allows to set a different activation type for - the routing activation request - :param reserved_oem: Optional parameter to set value for reserved_oem field - of routing activation request - :param force_tls: Skip establishing of a TCP connection and directly try to - connect via SSL/TLS - :param context: Optional ssl.SSLContext object for initialization of ssl socket - connections. - - Example: - >>> socket = DoIPSocket6("2001:16b8:3f0e:2f00:21a:37ff:febf:edb9") - >>> socket_link_local = DoIPSocket6("fe80::30e8:80ff:fe07:6d43%eth1") - >>> pkt = DoIP(payload_type=0x8001, source_address=0xe80, target_address=0x1000) / UDS() / UDS_RDBI(identifiers=[0x1000]) - >>> resp = socket.sr1(pkt, timeout=1) - """ # noqa: E501 - - def __init__(self, - ip='::1', # type: str - port=13400, # type: int - tls_port=3496, # type: int - activate_routing=True, # type: bool - source_address=0xe80, # type: int - target_address=0, # type: int - activation_type=0, # type: int - reserved_oem=b"", # type: bytes - force_tls=False, # type: bool - context=None # type: Optional[ssl.SSLContext] - ): # type: (...) -> None - self.ip = ip - self.port = port - self.tls_port = tls_port - self.activate_routing = activate_routing - self.source_address = source_address - self.target_address = target_address - self.activation_type = activation_type - self.reserved_oem = reserved_oem - self.buffer = b"" - self.force_tls = force_tls - self.context = context - try: - self._init_socket(socket.AF_INET6) - except Exception: - self.close() - raise - - -class _UDS_DoIPSocketBase(StreamSocket): +class UDS_DoIPSocket(DoIPSocket): """ Application-Layer socket for DoIP endpoints. This socket takes care about the encapsulation of UDS packets into DoIP packets. @@ -524,8 +467,8 @@ def send(self, x): # type: (Union[Packet, bytes]) -> int if isinstance(x, UDS): pkt = DoIP(payload_type=0x8001, - source_address=self.source_address, # type: ignore - target_address=self.target_address # type: ignore + source_address=self.source_address, + target_address=self.target_address ) / x else: pkt = x @@ -545,28 +488,4 @@ def recv(self, x=MTU, **kwargs): else: return pkt - -class UDS_DoIPSocket(_UDS_DoIPSocketBase, DoIPSocket): - """ - Application-Layer socket for DoIP endpoints. This socket takes care about - the encapsulation of UDS packets into DoIP packets. - - Example: - >>> socket = UDS_DoIPSocket("169.254.117.238") - >>> pkt = UDS() / UDS_RDBI(identifiers=[0x1000]) - >>> resp = socket.sr1(pkt, timeout=1) - """ - pass - - -class UDS_DoIPSocket6(_UDS_DoIPSocketBase, DoIPSocket6): - """ - Application-Layer socket for DoIP endpoints. This socket takes care about - the encapsulation of UDS packets into DoIP packets. - - Example: - >>> socket = UDS_DoIPSocket6("2001:16b8:3f0e:2f00:21a:37ff:febf:edb9") - >>> pkt = UDS() / UDS_RDBI(identifiers=[0x1000]) - >>> resp = socket.sr1(pkt, timeout=1) - """ pass diff --git a/test/contrib/automotive/doip.uts b/test/contrib/automotive/doip.uts index b246b542191..7f4467c6d76 100644 --- a/test/contrib/automotive/doip.uts +++ b/test/contrib/automotive/doip.uts @@ -528,7 +528,7 @@ def server(): server_thread = threading.Thread(target=server) server_thread.start() server_up.wait(timeout=1) -sock = DoIPSocket6(activate_routing=False) +sock = DoIPSocket(ip="::1", activate_routing=False) pkts = sock.sniff(timeout=1, count=2) server_thread.join(timeout=1) @@ -668,7 +668,7 @@ server_up.wait(timeout=1) context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) context.check_hostname = False context.verify_mode = ssl.CERT_NONE -sock = DoIPSocket6(activate_routing=False, force_tls=True, context=context) +sock = DoIPSocket(ip="::1", activate_routing=False, force_tls=True, context=context) pkts = sock.sniff(timeout=1, count=2) server_thread.join(timeout=1) @@ -705,7 +705,7 @@ server_up.wait(timeout=1) context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) context.check_hostname = False context.verify_mode = ssl.CERT_NONE -sock = UDS_DoIPSocket6(activate_routing=False, force_tls=True, context=context) +sock = UDS_DoIPSocket(ip="::1", activate_routing=False, force_tls=True, context=context) pkts = sock.sniff(timeout=1, count=2) server_thread.join(timeout=1) @@ -765,7 +765,7 @@ context.check_hostname = False context.verify_mode = ssl.CERT_NONE -sock = UDS_DoIPSocket6(ip="::1", context=context) +sock = UDS_DoIPSocket(ip="::1", context=context) pkts = sock.sniff(timeout=1, count=2) server_tcp_thread.join(timeout=1)