Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement hid_listen subcommand #11923

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/cli.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,6 @@ jobs:
with:
submodules: recursive
- name: Install dependencies
run: pip3 install -r requirements.txt
run: pip3 install -r requirements-dev.txt
- name: Run tests
run: bin/qmk pytest
1 change: 1 addition & 0 deletions lib/python/qmk/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from . import clean
from . import compile
from . import config
from . import console
from . import docs
from . import doctor
from . import fileformat
Expand Down
122 changes: 122 additions & 0 deletions lib/python/qmk/cli/console.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
"""Acquire debugging information from usb hid devices

cli implementation of https://www.pjrc.com/teensy/hid_listen.html

State machine is implemented as follows:

+-+
+-+ More Data?
| +------------+
| | |
+-----+-------+ +-----------------+ +-----+------+ |
| | | | | | |
| Search +----->+ Connect +------>+ Read +<----+
| | | | | |
+-----+-------+ +-----------------+ +------+-----+
^ |
| Disconnect/Error? |
+-----------------------------------------------+
"""
import hid
zvecr marked this conversation as resolved.
Show resolved Hide resolved
import queue
import time

from milc import cli


class StateMachine(queue.Queue):
def transition(self, func, *args):
self.put(lambda: func(self, *args))

def transition_later(self, delay, func, *args):
time.sleep(delay)
self.put(lambda: func(self, *args))

def on_exception(self, func):
self._except_func = func

def run_forever(self):
while True:
try:
f = self.get()
f()
except KeyboardInterrupt:
break
except BaseException as e:
if self._except_func:
self._except_func(e)


def _is_console_hid(x):
return x['usage_page'] == 0xFF31 and x['usage'] == 0x0074


def _is_filtered_device(x):
name = "%04x:%04x" % (x['vendor_id'], x['product_id'])
return name.lower().startswith(cli.args.device.lower())


def _search():
devices = filter(_is_console_hid, hid.enumerate())
if cli.args.device:
devices = filter(_is_filtered_device, devices)

return list(devices)


def list_devices():
cli.log.info('Available devices:')
devices = _search()
for dev in devices:
cli.log.info(" %04x:%04x %s %s", dev['vendor_id'], dev['product_id'], dev['manufacturer_string'], dev['product_string'])


def state_search(sm):
print('.', end='', flush=True)

found = _search()
selected = found[cli.args.index] if found[cli.args.index:] else None

if selected:
sm.transition_later(0.5, state_connect, selected)
else:
sm.transition_later(1, state_search)


def state_connect(sm, selected):
print()
print('Listening to %s:' % selected['path'].decode())
device = hid.Device(path=selected['path'])
sm.transition_later(1, state_read, device)


def state_read(sm, device):
print(device.read(32, 1).decode('ascii'), end='')
sm.transition(state_read, device)


def state_exception(sm, context):
# print(str(e))
print('Device disconnected.')
print('Waiting for new device:')
sm.transition(state_search)


@cli.argument('-d', '--device', help='device to select - uses format <pid>:<vid>.')
@cli.argument('-i', '--index', default=0, help='device index to select.')
@cli.argument('-l', '--list', arg_only=True, action='store_true', help='List available hid_listen devices.')
@cli.subcommand('Acquire debugging information from usb hid devices.', hidden=False if cli.config.user.developer else True)
def console(cli):
"""Acquire debugging information from usb hid devices
"""

if cli.args.list:
return list_devices()

print('Waiting for device:')

sm = StateMachine()
sm.transition(state_search)
sm.on_exception(lambda e: sm.transition(state_exception, e))

sm.run_forever()
1 change: 1 addition & 0 deletions requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
-r requirements.txt

# Python development requirements
hid
nose2
flake8
pep8-naming
Expand Down