Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Type hint arch/bpf #3825

Merged
merged 2 commits into from
Nov 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion .config/mypy/mypy_enabled.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,15 @@ scapy/__main__.py
scapy/all.py
scapy/ansmachine.py
scapy/arch/__init__.py
scapy/arch/bpf/__init__.py
scapy/arch/bpf/consts.py
scapy/arch/bpf/core.py
scapy/arch/bpf/supersocket.py
scapy/arch/common.py
scapy/arch/libpcap.py
scapy/arch/linux.py
scapy/arch/unix.py
scapy/arch/solaris.py
scapy/arch/unix.py
scapy/arch/windows/__init__.py
scapy/arch/windows/native.py
scapy/arch/windows/structures.py
Expand Down Expand Up @@ -87,7 +91,12 @@ scapy/contrib/roce.py
scapy/contrib/tcpao.py

# LIBS
scapy/libs/__init__.py
scapy/libs/ethertypes.py
scapy/libs/extcap.py
scapy/libs/matplot.py
scapy/libs/structures.py
scapy/libs/test_pyx.py

# TEST
test/testsocket.py
Expand Down
17 changes: 12 additions & 5 deletions scapy/arch/bpf/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@
from scapy.libs.structures import bpf_program
from scapy.data import MTU

# Type hints
from typing import (
Any,
Callable,
)

SIOCGIFFLAGS = 0xc0206911
BPF_BUFFER_LENGTH = MTU

Expand All @@ -23,19 +29,20 @@
IOC_IN = 0x80000000
IOC_INOUT = IOC_IN | IOC_OUT

_th = lambda x: x if isinstance(x, int) else ctypes.sizeof(x)
_th = lambda x: x if isinstance(x, int) else ctypes.sizeof(x) # type: Callable[[Any], int] # noqa: E501


def _IOC(inout, group, num, len):
# type: (int, str, int, Any) -> int
return (inout |
((_th(len) & IOCPARM_MASK) << 16) |
(ord(group) << 8) | (num))


_IO = lambda g, n: _IOC(IOC_VOID, g, n, 0)
_IOR = lambda g, n, t: _IOC(IOC_OUT, g, n, t)
_IOW = lambda g, n, t: _IOC(IOC_IN, g, n, t)
_IOWR = lambda g, n, t: _IOC(IOC_INOUT, g, n, t)
_IO = lambda g, n: _IOC(IOC_VOID, g, n, 0) # type: Callable[[str, int], int]
_IOR = lambda g, n, t: _IOC(IOC_OUT, g, n, t) # type: Callable[[str, int, Any], int]
_IOW = lambda g, n, t: _IOC(IOC_IN, g, n, t) # type: Callable[[str, int, Any], int]
_IOWR = lambda g, n, t: _IOC(IOC_INOUT, g, n, t) # type: Callable[[str, int, Any], int]

# Length of some structures
_bpf_stat = 8
Expand Down
32 changes: 26 additions & 6 deletions scapy/arch/bpf/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,18 @@
InterfaceProvider,
NetworkInterface,
network_name,
_GlobInterfaceType,
)
from scapy.pton_ntop import inet_ntop

# Typing
from typing import (
Dict,
List,
Optional,
Tuple,
)

if LINUX:
raise OSError("BPF conflicts with Linux")

Expand Down Expand Up @@ -67,7 +76,10 @@ class if_nameindex(Structure):


def get_if_raw_addr(ifname):
"""Returns the IPv4 address configured on 'ifname', packed with inet_pton.""" # noqa: E501
# type: (_GlobInterfaceType) -> bytes
"""
Returns the IPv4 address configured on 'ifname', packed with inet_pton.
"""

ifname = network_name(ifname)

Expand Down Expand Up @@ -99,6 +111,7 @@ def get_if_raw_addr(ifname):


def get_if_raw_hwaddr(ifname):
# type: (_GlobInterfaceType) -> Tuple[int, bytes]
"""Returns the packed MAC address configured on 'ifname'."""

NULL_MAC_ADDRESS = b'\x00' * 6
Expand Down Expand Up @@ -128,19 +141,19 @@ def get_if_raw_hwaddr(ifname):
raise Scapy_Exception("No MAC address found on %s !" % ifname)

# Pack and return the MAC address
mac = addresses[0].split(' ')[1]
mac = [chr(int(b, 16)) for b in mac.split(':')]
mac = [int(b, 16) for b in addresses[0].split(' ')[1].split(':')]

# Check that the address length is correct
if len(mac) != 6:
raise Scapy_Exception("No MAC address found on %s !" % ifname)

return (ARPHDR_ETHER, ''.join(mac))
return (ARPHDR_ETHER, struct.pack("!BBBBBB", *mac))


# BPF specific functions

def get_dev_bpf():
# type: () -> Tuple[int, int]
"""Returns an opened BPF file object"""

# Get the first available BPF handle
Expand All @@ -160,6 +173,7 @@ def get_dev_bpf():


def attach_filter(fd, bpf_filter, iface):
# type: (int, str, _GlobInterfaceType) -> None
"""Attach a BPF filter to the BPF file descriptor"""
bp = compile_filter(bpf_filter, iface)
# Assign the BPF program to the interface
Expand All @@ -171,6 +185,7 @@ def attach_filter(fd, bpf_filter, iface):
# Interface manipulation functions

def _get_ifindex_list():
# type: () -> List[Tuple[str, int]]
"""
Returns a list containing (iface, index)
"""
Expand All @@ -189,6 +204,7 @@ def _get_ifindex_list():


def _get_if_flags(ifname):
# type: (_GlobInterfaceType) -> Optional[int]
"""Internal function to get interface flags"""
# Get interface flags
try:
Expand All @@ -206,6 +222,7 @@ class BPFInterfaceProvider(InterfaceProvider):
name = "BPF"

def _is_valid(self, dev):
# type: (NetworkInterface) -> bool
if not dev.flags & 0x1: # not IFF_UP
return False
# Get a BPF handle
Expand All @@ -228,17 +245,20 @@ def _is_valid(self, dev):
os.close(fd)

def load(self):
# type: () -> Dict[str, NetworkInterface]
from scapy.fields import FlagValue
data = {}
ips = in6_getifaddr()
for ifname, index in _get_ifindex_list():
try:
ifflags = _get_if_flags(ifname)
ifflags_int = _get_if_flags(ifname)
if ifflags_int is None:
continue
mac = scapy.utils.str2mac(get_if_raw_hwaddr(ifname)[1])
ip = inet_ntop(socket.AF_INET, get_if_raw_addr(ifname))
except Scapy_Exception:
continue
ifflags = FlagValue(ifflags, _iff_flags)
ifflags = FlagValue(ifflags_int, _iff_flags)
if_data = {
"name": ifname,
"network_name": ifname,
Expand Down
Loading