From e5e22107680b986bc41ace087c86c166b54588e5 Mon Sep 17 00:00:00 2001 From: Ivan Kravets Date: Sat, 14 May 2022 23:30:36 +0300 Subject: [PATCH] Improved automatic detection of a testing serial port // Resolve #4076 --- HISTORY.rst | 1 + docs | 2 +- platformio/device/commands/monitor.py | 43 +++-------- platformio/device/helpers.py | 13 ---- platformio/device/serial.py | 91 +++++++++++++++++++++++ platformio/test/runners/readers/serial.py | 66 ++++++---------- 6 files changed, 128 insertions(+), 88 deletions(-) delete mode 100644 platformio/device/helpers.py create mode 100644 platformio/device/serial.py diff --git a/HISTORY.rst b/HISTORY.rst index 09d43582ae..f960d9c8ba 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -60,6 +60,7 @@ Please check the `Migration guide from 5.x to 6.0 `__ option (`issue #3132 `_) - Generate reports in JUnit and JSON formats using the `pio test `__ command (`issue #2891 `_) - Provide more information when the native program crashed on a host (errored with a non-zero return code) (`issue #3429 `_) + - Improved automatic detection of a testing serial port (`issue #4076 `_) - Fixed an issue when command line parameters (``--ignore``, ``--filter``) do not override values defined in the |PIOCONF| (`issue #3845 `_) - Renamed the "test_build_project_src" project configuration option to the `test_build_src `__ - Removed the "test_transport" option in favor of the `Custom "unity_config.h" `_ diff --git a/docs b/docs index 1bf2eb97b3..a57afb023a 160000 --- a/docs +++ b/docs @@ -1 +1 @@ -Subproject commit 1bf2eb97b39a9515c2e65e3afba0cb05f694fde8 +Subproject commit a57afb023af0b3adbe6cc1bbff2cf287d9e02b0f diff --git a/platformio/device/commands/monitor.py b/platformio/device/commands/monitor.py index 2a7a94a12d..e13cfdc7ad 100644 --- a/platformio/device/commands/monitor.py +++ b/platformio/device/commands/monitor.py @@ -14,14 +14,13 @@ import os import sys -from fnmatch import fnmatch import click from serial.tools import miniterm from platformio import exception, fs from platformio.device.filters.base import register_filters -from platformio.device.list import list_serial_ports +from platformio.device.serial import scan_serial_port from platformio.platform.factory import PlatformFactory from platformio.project.config import ProjectConfig from platformio.project.exception import NotPlatformIOProjectError @@ -100,40 +99,22 @@ def device_monitor_cmd(**kwargs): # pylint: disable=too-many-branches project_options = {} platform = None - try: - with fs.cd(kwargs["project_dir"]): + with fs.cd(kwargs["project_dir"]): + try: project_options = get_project_options(kwargs["environment"]) kwargs = apply_project_monitor_options(kwargs, project_options) if "platform" in project_options: platform = PlatformFactory.new(project_options["platform"]) - except NotPlatformIOProjectError: - pass - - with fs.cd(kwargs["project_dir"]): + except NotPlatformIOProjectError: + pass register_filters(platform=platform, options=kwargs) - - if not kwargs["port"]: - ports = list_serial_ports(filter_hwid=True) - if len(ports) == 1: - kwargs["port"] = ports[0]["port"] - elif "platform" in project_options and "board" in project_options: - with fs.cd(kwargs["project_dir"]): - board_hwids = platform.board_config(project_options["board"]).get( - "build.hwids", [] - ) - for item in ports: - for hwid in board_hwids: - hwid_str = ("%s:%s" % (hwid[0], hwid[1])).replace("0x", "") - if hwid_str in item["hwid"]: - kwargs["port"] = item["port"] - break - if kwargs["port"]: - break - elif kwargs["port"] and (set(["*", "?", "[", "]"]) & set(kwargs["port"])): - for item in list_serial_ports(): - if fnmatch(item["port"], kwargs["port"]): - kwargs["port"] = item["port"] - break + kwargs["port"] = scan_serial_port( + initial_port=kwargs["port"], + board_config=platform.board_config(project_options.get("board")) + if platform and project_options.get("board") + else None, + upload_protocol=project_options.get("upload_port"), + ) # override system argv with patched options sys.argv = ["monitor"] + project_options_to_monitor_argv( diff --git a/platformio/device/helpers.py b/platformio/device/helpers.py deleted file mode 100644 index b051490361..0000000000 --- a/platformio/device/helpers.py +++ /dev/null @@ -1,13 +0,0 @@ -# Copyright (c) 2014-present PlatformIO -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. diff --git a/platformio/device/serial.py b/platformio/device/serial.py new file mode 100644 index 0000000000..2d9a2f7bb9 --- /dev/null +++ b/platformio/device/serial.py @@ -0,0 +1,91 @@ +# Copyright (c) 2014-present PlatformIO +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from fnmatch import fnmatch + +import serial + +from platformio.compat import IS_WINDOWS +from platformio.device.list import list_serial_ports + + +def is_pattern_port(port): + if not port: + return False + return set(["*", "?", "[", "]"]) & set(port) + + +def is_serial_port_ready(port, timeout=1): + try: + serial.Serial(port, timeout=timeout).close() + return True + except: # pylint: disable=bare-except + pass + return False + + +def scan_serial_port( + initial_port, board_config=None, upload_protocol=None, ensure_ready=False +): + if initial_port: + if not is_pattern_port(initial_port): + return initial_port + return match_serial_port(initial_port) + port = None + if upload_protocol and upload_protocol.startswith("blackmagic"): + port = scan_blackmagic_serial_port() + if not port and board_config: + port = scan_board_serial_port(board_config) + if port: + return port + + # pick the first PID:VID USB device + for item in list_serial_ports(): + if ensure_ready and not is_serial_port_ready(item["port"]): + continue + port = item["port"] + if "VID:PID" in item["hwid"]: + return port + + return port + + +def match_serial_port(pattern): + for item in list_serial_ports(): + if fnmatch(item["port"], pattern): + return item["port"] + return None + + +def scan_blackmagic_serial_port(): + for item in list_serial_ports(): + port = item["port"] + if IS_WINDOWS and port.startswith("COM") and len(port) > 4: + port = "\\\\.\\%s" % port + if "GDB" in item["description"]: + return port + return None + + +def scan_board_serial_port(board_config): + board_hwids = board_config.get("build.hwids", []) + if not board_hwids: + return None + for item in list_serial_ports(filter_hwid=True): + port = item["port"] + for hwid in board_hwids: + hwid_str = ("%s:%s" % (hwid[0], hwid[1])).replace("0x", "") + if hwid_str in item["hwid"]: + return port + return None diff --git a/platformio/test/runners/readers/serial.py b/platformio/test/runners/readers/serial.py index 3b9ae2d883..1f41fac290 100644 --- a/platformio/test/runners/readers/serial.py +++ b/platformio/test/runners/readers/serial.py @@ -17,7 +17,7 @@ import click import serial -from platformio.device.list import list_serial_ports +from platformio.device.serial import scan_serial_port from platformio.exception import UserSideException @@ -37,7 +37,7 @@ def begin(self): try: ser = serial.serial_for_url( - self.test_runner.get_test_port() or self.autodetect_test_port(), + self.resolve_test_port(), do_not_open=True, baudrate=self.test_runner.get_test_speed(), timeout=self.SERIAL_TIMEOUT, @@ -62,48 +62,28 @@ def begin(self): self.test_runner.on_testing_data_output(ser.read(ser.in_waiting or 1)) ser.close() - def autodetect_test_port(self): - board = self.test_runner.project_config.get( - f"env:{self.test_runner.test_suite.env_name}", "board" + def resolve_test_port(self): + project_options = self.test_runner.project_config.items( + env=self.test_runner.test_suite.env_name, as_dict=True ) - board_hwids = self.test_runner.platform.board_config(board).get( - "build.hwids", [] + scan_options = dict( + initial_port=self.test_runner.get_test_port(), + board_config=self.test_runner.platform.board_config( + project_options["board"] + ), + upload_protocol=project_options.get("upload_port"), + ensure_ready=True, ) - port = None - elapsed = 0 - while elapsed < 5 and not port: - for item in list_serial_ports(): - port = item["port"] - for hwid in board_hwids: - hwid_str = ("%s:%s" % (hwid[0], hwid[1])).replace("0x", "") - if hwid_str in item["hwid"] and self.is_serial_port_ready(port): - return port - - if port and not self.is_serial_port_ready(port): - port = None - - if not port: - sleep(0.25) - elapsed += 0.25 - if not port: - raise UserSideException( - "Please specify `test_port` for environment or use " - "global `--test-port` option." - ) - return port - - @staticmethod - def is_serial_port_ready(port, timeout=3): - if not port: - return False elapsed = 0 - while elapsed < timeout: - try: - serial.Serial(port, timeout=1).close() - return True - except: # pylint: disable=bare-except - pass - sleep(1) - elapsed += 1 - return False + while elapsed < 5: + port = scan_serial_port(**scan_options) + if port: + return port + sleep(0.25) + elapsed += 0.25 + + raise UserSideException( + "Please specify `test_port` for environment or use " + "global `--test-port` option." + )