Skip to content

Commit

Permalink
Cleanup DoIP sockets
Browse files Browse the repository at this point in the history
  • Loading branch information
polybassa committed Sep 17, 2024
1 parent 30b0398 commit 91b8a8d
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 92 deletions.
99 changes: 11 additions & 88 deletions scapy/contrib/automotive/doip.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
)
from scapy.layers.inet import TCP, UDP
from scapy.packet import Packet, bind_layers, bind_bottom_up
from scapy.supersocket import StreamSocket, SSLStreamSocket
from scapy.supersocket import SSLStreamSocket


# ISO 13400-2 sect 9.2
Expand Down Expand Up @@ -350,6 +350,12 @@ def __init__(self,
force_tls=False, # type: bool
context=None # type: Optional[ssl.SSLContext]
): # type: (...) -> None

if ':' in ip:
self.ip_family = socket.AF_INET6
else:
self.ip_family = socket.AF_INET

self.ip = ip
self.port = port
self.tls_port = tls_port
Expand All @@ -361,7 +367,7 @@ def __init__(self,
self.force_tls = force_tls
self.context = context
try:
self._init_socket(socket.AF_INET)
self._init_socket(self.ip_family)
except Exception:
self.close()
raise
Expand Down Expand Up @@ -450,66 +456,7 @@ def _activate_routing(self): # type: (...) -> int
return -1


class DoIPSocket6(DoIPSocket):
"""Socket for DoIP communication. This sockets automatically
sends a routing activation request as soon as a TCP or TLS connection is
established.
:param ip: IPv6 address of destination
:param port: destination port, usually 13400
:param tls_port: destination port for TLS connection, usually 3496
:param activate_routing: If true, routing activation request is
automatically sent
:param source_address: DoIP source address
:param target_address: DoIP target address, this is automatically
determined if routing activation request is sent
:param activation_type: This allows to set a different activation type for
the routing activation request
:param reserved_oem: Optional parameter to set value for reserved_oem field
of routing activation request
:param force_tls: Skip establishing of a TCP connection and directly try to
connect via SSL/TLS
:param context: Optional ssl.SSLContext object for initialization of ssl socket
connections.
Example:
>>> socket = DoIPSocket6("2001:16b8:3f0e:2f00:21a:37ff:febf:edb9")
>>> socket_link_local = DoIPSocket6("fe80::30e8:80ff:fe07:6d43%eth1")
>>> pkt = DoIP(payload_type=0x8001, source_address=0xe80, target_address=0x1000) / UDS() / UDS_RDBI(identifiers=[0x1000])
>>> resp = socket.sr1(pkt, timeout=1)
""" # noqa: E501

def __init__(self,
ip='::1', # type: str
port=13400, # type: int
tls_port=3496, # type: int
activate_routing=True, # type: bool
source_address=0xe80, # type: int
target_address=0, # type: int
activation_type=0, # type: int
reserved_oem=b"", # type: bytes
force_tls=False, # type: bool
context=None # type: Optional[ssl.SSLContext]
): # type: (...) -> None
self.ip = ip
self.port = port
self.tls_port = tls_port
self.activate_routing = activate_routing
self.source_address = source_address
self.target_address = target_address
self.activation_type = activation_type
self.reserved_oem = reserved_oem
self.buffer = b""
self.force_tls = force_tls
self.context = context
try:
self._init_socket(socket.AF_INET6)
except Exception:
self.close()
raise


class _UDS_DoIPSocketBase(StreamSocket):
class UDS_DoIPSocket(DoIPSocket):
"""
Application-Layer socket for DoIP endpoints. This socket takes care about
the encapsulation of UDS packets into DoIP packets.
Expand All @@ -524,8 +471,8 @@ def send(self, x):
# type: (Union[Packet, bytes]) -> int
if isinstance(x, UDS):
pkt = DoIP(payload_type=0x8001,
source_address=self.source_address, # type: ignore
target_address=self.target_address # type: ignore
source_address=self.source_address,
target_address=self.target_address
) / x
else:
pkt = x
Expand All @@ -545,28 +492,4 @@ def recv(self, x=MTU, **kwargs):
else:
return pkt


class UDS_DoIPSocket(_UDS_DoIPSocketBase, DoIPSocket):
"""
Application-Layer socket for DoIP endpoints. This socket takes care about
the encapsulation of UDS packets into DoIP packets.
Example:
>>> socket = UDS_DoIPSocket("169.254.117.238")
>>> pkt = UDS() / UDS_RDBI(identifiers=[0x1000])
>>> resp = socket.sr1(pkt, timeout=1)
"""
pass


class UDS_DoIPSocket6(_UDS_DoIPSocketBase, DoIPSocket6):
"""
Application-Layer socket for DoIP endpoints. This socket takes care about
the encapsulation of UDS packets into DoIP packets.
Example:
>>> socket = UDS_DoIPSocket6("2001:16b8:3f0e:2f00:21a:37ff:febf:edb9")
>>> pkt = UDS() / UDS_RDBI(identifiers=[0x1000])
>>> resp = socket.sr1(pkt, timeout=1)
"""
pass
8 changes: 4 additions & 4 deletions test/contrib/automotive/doip.uts
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,7 @@ def server():
server_thread = threading.Thread(target=server)
server_thread.start()
server_up.wait(timeout=1)
sock = DoIPSocket6(activate_routing=False)
sock = DoIPSocket(ip="::1", activate_routing=False)

pkts = sock.sniff(timeout=1, count=2)
server_thread.join(timeout=1)
Expand Down Expand Up @@ -668,7 +668,7 @@ server_up.wait(timeout=1)
context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
sock = DoIPSocket6(activate_routing=False, force_tls=True, context=context)
sock = DoIPSocket(ip="::1", activate_routing=False, force_tls=True, context=context)

pkts = sock.sniff(timeout=1, count=2)
server_thread.join(timeout=1)
Expand Down Expand Up @@ -705,7 +705,7 @@ server_up.wait(timeout=1)
context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
sock = UDS_DoIPSocket6(activate_routing=False, force_tls=True, context=context)
sock = UDS_DoIPSocket(ip="::1", activate_routing=False, force_tls=True, context=context)

pkts = sock.sniff(timeout=1, count=2)
server_thread.join(timeout=1)
Expand Down Expand Up @@ -765,7 +765,7 @@ context.check_hostname = False
context.verify_mode = ssl.CERT_NONE


sock = UDS_DoIPSocket6(ip="::1", context=context)
sock = UDS_DoIPSocket(ip="::1", context=context)

pkts = sock.sniff(timeout=1, count=2)
server_tcp_thread.join(timeout=1)
Expand Down

0 comments on commit 91b8a8d

Please sign in to comment.