Skip to content

Commit

Permalink
[TOOLS] Add scripts for sniffing H4 & H5 HCI UART w/ btmon
Browse files Browse the repository at this point in the history
  • Loading branch information
darthcloud committed Dec 31, 2024
1 parent 6e06026 commit c59ccdd
Show file tree
Hide file tree
Showing 2 changed files with 383 additions and 0 deletions.
189 changes: 189 additions & 0 deletions tools/btmon_h4.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
# SPDX-FileCopyrightText: 2024 Jacques Gagnon
# SPDX-License-Identifier: Apache-2.0
import struct
import subprocess
import sys
import time
from argparse import ArgumentParser
from enum import IntEnum
from glob import glob
from serial import Serial
from socket import socket, AF_INET, SOCK_STREAM


socat = None
btmon = None
sock = None


class H4Type(IntEnum):
CMD_PKT = 1
ACL_PKT = 2
SCO_PKT = 3
EVT_PKT = 4
ISO_PKT = 5


class BtsnoopOpcode(IntEnum):
NEW_INDEX = 0
DEL_INDEX = 1
COMMAND_PKT = 2
EVENT_PKT = 3
ACL_TX_PKT = 4
ACL_RX_PKT = 5
SCO_TX_PKT = 6
SCO_RX_PKT = 7
OPEN_INDEX = 8
CLOSE_INDEX = 9
INDEX_INFO = 10
VENDOR_DIAG = 11
SYSTEM_NOTE = 12
USER_LOGGING = 13
CTRL_OPEN = 14
CTRL_CLOSE = 15
CTRL_COMMAND = 16
CTRL_EVENT = 17
ISO_TX_PKT = 18
ISO_RX_PKT = 19


def btsnoop_opcode(tx, type):
opcode = None

if type == H4Type.CMD_PKT:
opcode = BtsnoopOpcode.COMMAND_PKT
elif type == H4Type.ACL_PKT:
if tx:
opcode = BtsnoopOpcode.ACL_TX_PKT
else:
opcode = BtsnoopOpcode.ACL_RX_PKT
elif type == H4Type.SCO_PKT:
if tx:
opcode = BtsnoopOpcode.SCO_TX_PKT
else:
opcode = BtsnoopOpcode.SCO_RX_PKT
elif type == H4Type.EVT_PKT:
opcode = BtsnoopOpcode.EVENT_PKT
elif type == H4Type.ISO_PKT:
if tx:
opcode = BtsnoopOpcode.ISO_TX_PKT
else:
opcode = BtsnoopOpcode.ISO_RX_PKT
return opcode


def parse_args():
parser = ArgumentParser()
parser.add_argument('--tty0', help='TTY connected to H4 HCI TX|RX')
parser.add_argument('--tty1', help='TTY connected to H4 HCI TX|RX')
parser.add_argument('-b', '--baud', type=int, default=115200, help='TTY baudrate')
parser.add_argument('-d', '--data', type=int, default=8, help='TTY data bytesize')
parser.add_argument('-p', '--parity', default='N', help='TTY parity: N, E, O, M, S')
parser.add_argument('-s', '--stop', type=float, default=1, help='TTY stopbits')
parser.add_argument('-w', '--write', help='Save trace in btsnoop format')
return parser.parse_args()


def except_hook(type, value, tb):
global socat, btmon, sock

print('')

if isinstance(sock, socket):
sock.close()
print('Socket closed')
if isinstance(btmon, subprocess.Popen):
btmon.kill()
print('btmon closed')
if isinstance(socat, subprocess.Popen):
socat.kill()
print('socat closed')

if type is KeyboardInterrupt:
print('Capture ended sucessfully')
sys.exit(0)
else:
import traceback
error = ''.join(traceback.format_exception(type, value, tb))
print(error)


def main():
global socat, btmon, sock

# Get arguments
args = parse_args()

# Use first two USB tty by default
if args.tty0 is None and args.tty1 is None:
args.tty0, args.tty1, *_ = glob('/dev/ttyUSB*')

# Deinit hook on failure or keyboard exit
sys.excepthook = except_hook

# Create virtual tty
socat_cmd = [
'socat',
'PTY,raw,echo=0,link=/tmp/socat,nonblock,group-late=dialout,mode=660,b921600',
'TCP-LISTEN:8008,reuseaddr,fork'
]
socat = subprocess.Popen(socat_cmd, stdout = subprocess.DEVNULL, stderr = subprocess.DEVNULL)
time.sleep(1)

# Launch Bluez's btmon on our virtual tty
btmon_cmd = ['btmon', '--tty', '/tmp/socat', '--tty-speed', '921600']
if args.write is not None:
btmon_cmd.extend(['-w', args.write])
btmon = subprocess.Popen(btmon_cmd)

# Connect to virtual tty socket
sock = socket(AF_INET, SOCK_STREAM)
sock.connect(('localhost', 8008))

# Setup ttys we want to sniff H4 from
ttys = []
if args.tty0 is not None:
ttys.append(Serial(port=args.tty0, baudrate=args.baud, bytesize=args.data, parity=args.parity, stopbits=args.stop))
if args.tty1 is not None:
ttys.append(Serial(port=args.tty1, baudrate=args.baud, bytesize=args.data, parity=args.parity, stopbits=args.stop))

# ttys read & socket send loop
while True:
for tty in ttys:
if tty.in_waiting:
# Get H4 header header byte
h4_hdr = tty.read(1)[0]

# Best-effort timestamp, no way to know precisely when this got into the HW buffer
ts = time.perf_counter_ns() // 100000

if h4_hdr == H4Type.CMD_PKT:
# Flag the TX tty
# This assume we get some cmd pkt before any ACL one
tty.dir = 'TX'

pkt = tty.read(3)
data_len = struct.unpack('<HB', pkt)[1]
elif h4_hdr == H4Type.ACL_PKT:
pkt = tty.read(4)
data_len = struct.unpack('<HH', pkt)[1]
elif h4_hdr == H4Type.EVT_PKT:
pkt = tty.read(2)
data_len = struct.unpack('<BB', pkt)[1]
else:
continue

pkt += tty.read(data_len)

# Get BTSNOOP opcode base on type & direction
opcode = btsnoop_opcode(hasattr(tty, 'dir'), h4_hdr)
if opcode is None:
continue

# Craft btmon tty header & send to socat socket
btmon_hdr = struct.pack("<HHBBBI", len(pkt) + 4 + 5, opcode, 0, 5, 8, ts)
sock.send(btmon_hdr + pkt)


if __name__ == "__main__":
main()
194 changes: 194 additions & 0 deletions tools/btmon_h5.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
# SPDX-FileCopyrightText: 2024 Jacques Gagnon
# SPDX-License-Identifier: Apache-2.0
import struct
import subprocess
import sys
import time
from argparse import ArgumentParser
from enum import IntEnum
from glob import glob
from serial import Serial
from socket import socket, AF_INET, SOCK_STREAM


socat = None
btmon = None
sock = None


class H5Type(IntEnum):
ACK_PKT = 0
CMD_PKT = 1
ACL_PKT = 2
SCO_PKT = 3
EVT_PKT = 4
ISO_PKT = 5
VENDOR_PKT = 14
LC_PKT = 15


class BtsnoopOpcode(IntEnum):
NEW_INDEX = 0
DEL_INDEX = 1
COMMAND_PKT = 2
EVENT_PKT = 3
ACL_TX_PKT = 4
ACL_RX_PKT = 5
SCO_TX_PKT = 6
SCO_RX_PKT = 7
OPEN_INDEX = 8
CLOSE_INDEX = 9
INDEX_INFO = 10
VENDOR_DIAG = 11
SYSTEM_NOTE = 12
USER_LOGGING = 13
CTRL_OPEN = 14
CTRL_CLOSE = 15
CTRL_COMMAND = 16
CTRL_EVENT = 17
ISO_TX_PKT = 18
ISO_RX_PKT = 19


def btsnoop_opcode(tx, type):
opcode = None

if type == H5Type.CMD_PKT:
opcode = BtsnoopOpcode.COMMAND_PKT
elif type == H5Type.ACL_PKT:
if tx:
opcode = BtsnoopOpcode.ACL_TX_PKT
else:
opcode = BtsnoopOpcode.ACL_RX_PKT
elif type == H5Type.SCO_PKT:
if tx:
opcode = BtsnoopOpcode.SCO_TX_PKT
else:
opcode = BtsnoopOpcode.SCO_RX_PKT
elif type == H5Type.EVT_PKT:
opcode = BtsnoopOpcode.EVENT_PKT
elif type == H5Type.ISO_PKT:
if tx:
opcode = BtsnoopOpcode.ISO_TX_PKT
else:
opcode = BtsnoopOpcode.ISO_RX_PKT
return opcode


def parse_args():
parser = ArgumentParser()
parser.add_argument('--tty0', help='TTY connected to H5 HCI TX|RX')
parser.add_argument('--tty1', help='TTY connected to H5 HCI TX|RX')
parser.add_argument('-b', '--baud', type=int, default=1500000, help='TTY baudrate')
parser.add_argument('-d', '--data', type=int, default=8, help='TTY data bytesize')
parser.add_argument('-p', '--parity', default='E', help='TTY parity: N, E, O, M, S')
parser.add_argument('-s', '--stop', type=float, default=1, help='TTY stopbits')
parser.add_argument('-w', '--write', help='Save trace in btsnoop format')
return parser.parse_args()


def except_hook(type, value, tb):
global socat, btmon, sock

print('')

if isinstance(sock, socket):
sock.close()
print('Socket closed')
if isinstance(btmon, subprocess.Popen):
btmon.kill()
print('btmon closed')
if isinstance(socat, subprocess.Popen):
socat.kill()
print('socat closed')

if type is KeyboardInterrupt:
print('Capture ended sucessfully')
sys.exit(0)
else:
import traceback
error = ''.join(traceback.format_exception(type, value, tb))
print(error)


def main():
global socat, btmon, sock

# Get arguments
args = parse_args()

# Use first two USB tty by default
if args.tty0 is None and args.tty1 is None:
args.tty0, args.tty1, *_ = glob('/dev/ttyUSB*')

# Deinit hook on failure or keyboard exit
sys.excepthook = except_hook

# Create virtual tty
socat_cmd = [
'socat',
'PTY,raw,echo=0,link=/tmp/socat,nonblock,group-late=dialout,mode=660,b921600',
'TCP-LISTEN:8008,reuseaddr,fork'
]
socat = subprocess.Popen(socat_cmd, stdout = subprocess.DEVNULL, stderr = subprocess.DEVNULL)
time.sleep(1)

# Launch Bluez's btmon on our virtual tty
btmon_cmd = ['btmon', '--tty', '/tmp/socat', '--tty-speed', '921600']
if args.write is not None:
btmon_cmd.extend(['-w', args.write])
btmon = subprocess.Popen(btmon_cmd)

# Connect to virtual tty socket
sock = socket(AF_INET, SOCK_STREAM)
sock.connect(('localhost', 8008))

# Setup ttys we want to sniff H5 from
ttys = []
if args.tty0 is not None:
ttys.append(Serial(port=args.tty0, baudrate=args.baud, bytesize=args.data, parity=args.parity, stopbits=args.stop))
if args.tty1 is not None:
ttys.append(Serial(port=args.tty1, baudrate=args.baud, bytesize=args.data, parity=args.parity, stopbits=args.stop))

# ttys read & socket send loop
while True:
for tty in ttys:
if tty.in_waiting:
# Get slip frame from tty
slip = tty.read_until(expected=b'\xC0')

# Best-effort timestamp, no way to know precisely when this got into the HW buffer
ts = time.perf_counter_ns() // 100000

# Decode slip frame
pkt = slip[:-1].replace(b'\xDB\xDC', b'\xC0').replace(b'\xDB\xDD', b'\xDB')
if len(pkt) < 4:
continue

# Validate H5 header
chksum = sum(pkt[0:4]) % 256
if chksum != 0xFF:
continue

# Extract payload len & type
len_type = struct.unpack("<BHB", pkt[0:4])[1]
payload_len = len_type >> 4
type = len_type & 0xF

# Flag the TX tty
# This assume we get some cmd pkt before any ACL one
if type == 1:
tty.dir = 'TX'

# Get BTSNOOP opcode base on type & direction
opcode = btsnoop_opcode(hasattr(tty, 'dir'), type)
if opcode is None:
continue

# Craft btmon tty header & send to socat socket
btmon_hdr = struct.pack("<HHBBBI", payload_len + 4 + 5, opcode, 0, 5, 8, ts)
sock.send(btmon_hdr + pkt[4:4 + payload_len])


if __name__ == "__main__":
main()

0 comments on commit c59ccdd

Please sign in to comment.