From 3a44dcb10e43e614e3181e546670ec055112dcee Mon Sep 17 00:00:00 2001 From: SG Date: Fri, 28 Apr 2023 17:00:47 +0300 Subject: [PATCH 1/4] Scripts: wifi updater --- scripts/wifi_board.py | 231 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 231 insertions(+) create mode 100755 scripts/wifi_board.py diff --git a/scripts/wifi_board.py b/scripts/wifi_board.py new file mode 100755 index 00000000000..b3ad811c729 --- /dev/null +++ b/scripts/wifi_board.py @@ -0,0 +1,231 @@ +#!/usr/bin/env python3 + +from flipper.app import App +from serial.tools.list_ports_common import ListPortInfo + +import logging +import os +import tempfile +import subprocess +import serial.tools.list_ports as list_ports +import json +import requests +import tarfile + +class UpdateDownloader: + UPDATE_SERVER = "https://test-update.flipperzero.one" + UPDATE_PROJECT = "/blackmagic-firmware" + UPDATE_INDEX = UPDATE_SERVER + UPDATE_PROJECT + "/directory.json" + UPDATE_TYPE = "full_tgz" + + CHANNEL_ID_ALIAS = { + "dev": "development", + "rc": "release-candidate", + "r": "release", + "rel": "release", + } + + def __init__(self): + self.logger = logging.getLogger() + + def download(self, channel_id: str, dir: str) -> bool: + # Aliases + if channel_id in self.CHANNEL_ID_ALIAS: + channel_id = self.CHANNEL_ID_ALIAS[channel_id] + + # Make directory + if not os.path.exists(dir): + self.logger.info(f"Creating directory {dir}") + os.makedirs(dir) + + # Download json index + self.logger.info(f"Downloading {self.UPDATE_INDEX}") + response = requests.get(self.UPDATE_INDEX) + if response.status_code != 200: + self.logger.error(f"Failed to download {self.UPDATE_INDEX}") + return False + + # Parse json index + try: + index = json.loads(response.content) + except Exception as e: + self.logger.error(f"Failed to parse json index: {e}") + return False + + # Find channel + channel = None + for channel_candidate in index["channels"]: + if channel_candidate["id"] == channel_id: + channel = channel_candidate + break + + # Check if channel found + if channel is None: + self.logger.error(f"Channel '{channel_id}' not found. Valid channels: {', '.join([c['id'] for c in index['channels']])}") + return False + + self.logger.info(f"Using channel '{channel_id}'") + + # Get latest version + try: + version = channel["versions"][0] + except Exception as e: + self.logger.error(f"Failed to get version: {e}") + return False + + self.logger.info(f"Using version '{version['version']}'") + + # Get changelog + changelog = None + try: + changelog = version["changelog"] + except Exception as e: + self.logger.error(f"Failed to get changelog: {e}") + + # print changelog + if changelog is not None: + self.logger.info(f"Changelog:") + for line in changelog.split("\n"): + if line.strip() == "": + continue + self.logger.info(f" {line}") + + # Find file + file_url = None + for file_candidate in version['files']: + if file_candidate['type'] == self.UPDATE_TYPE: + file_url = file_candidate['url'] + break + + if file_url is None: + self.logger.error(f"File not found") + return False + + # Make file path + file_name = file_url.split("/")[-1] + file_path = os.path.join(dir, file_name) + + # Download file + self.logger.info(f"Downloading {file_url} to {file_path}") + with open(file_path, "wb") as f: + response = requests.get(file_url) + f.write(response.content) + + # Unzip tgz + self.logger.info(f"Unzipping {file_path}") + with tarfile.open(file_path, "r") as tar: + tar.extractall(dir) + + return True + + +class Main(App): + def init(self): + self.parser.add_argument("-p", "--port", help="CDC Port", default="auto") + self.parser.add_argument("-c", "--channel", help="Channel name", default="development") + self.parser.set_defaults(func=self.update) + + # logging + self.logger = logging.getLogger() + + def find_wifi_board(self) -> bool: + # idk why, but python thinks that list_ports.grep returns tuple[str, str, str] + blackmagics: list[ListPortInfo] = list(list_ports.grep("blackmagic")) # type: ignore + daps: list[ListPortInfo] = list(list_ports.grep("CMSIS-DAP")) # type: ignore + + return len(blackmagics) > 0 or len(daps) > 0 + + def find_wifi_board_bootloader(self): + # idk why, but python thinks that list_ports.grep returns tuple[str, str, str] + ports: list[ListPortInfo] = list(list_ports.grep("ESP32-S2")) # type: ignore + + if len(ports) == 0: + # Blackmagic probe serial port not found, will be handled later + pass + elif len(ports) > 1: + raise Exception("More than one WiFi board found") + else: + port = ports[0] + if os.name == "nt": + port.device = f"\\\\.\\{port.device}" + return port.device + + def update(self): + try: + port = self.find_wifi_board_bootloader() + except Exception as e: + self.logger.error(f"{e}") + return 1 + + if self.args.port != "auto": + port = self.args.port + + available_ports = [p[0] for p in list(list_ports.comports())] + if port not in available_ports: + self.logger.error(f"Port {port} not found") + return 1 + + if port is None: + if self.find_wifi_board(): + self.logger.error("WiFi board found, but not in bootloader mode.") + self.logger.info("Please hold down BOOT button and press RESET button") + else: + self.logger.error("WiFi board not found") + self.logger.info( + "Please connect WiFi board to your computer, hold down BOOT button and press RESET button" + ) + return 1 + + # get temporary dir + dir = tempfile.TemporaryDirectory() + downloader = UpdateDownloader() + + # download + downloader.download(self.args.channel, dir.name) + + with open(os.path.join(dir.name, "flash.command"), "r") as f: + flash_command = f.read() + + flash_command = flash_command.replace("\n", "").replace("\r", "") + flash_command = flash_command.replace("(PORT)", port) + + # We can't reset the board after flashing via usb + flash_command = flash_command.replace( + "--after hard_reset", "--after no_reset_stub" + ) + + args = flash_command.split(" ")[0:] + args = list(filter(None, args)) + + esptool_params = [] + esptool_params.extend(args) + + self.logger.info(f'Running command: "{" ".join(args)}" in "{dir.name}"') + + process = subprocess.Popen( + esptool_params, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + cwd=dir.name, + bufsize=1, + universal_newlines=True, + ) + + while process.poll() is None: + if process.stdout is not None: + for line in process.stdout: + self.logger.debug(f"{line.strip()}") + + dir.cleanup() + + if process.returncode != 0: + self.logger.error(f"Failed to flash WiFi board") + else: + self.logger.info("WiFi board flashed successfully") + self.logger.info("Press RESET button on WiFi board to start it") + + return process.returncode + + +if __name__ == "__main__": + Main()() \ No newline at end of file From ecd13d62e75f3d568604021bdfc05919b5234627 Mon Sep 17 00:00:00 2001 From: SG Date: Fri, 28 Apr 2023 17:15:23 +0300 Subject: [PATCH 2/4] WiFi board updater: lint, process download error --- scripts/wifi_board.py | 45 +++++++++++++++++++++++++++---------------- 1 file changed, 28 insertions(+), 17 deletions(-) diff --git a/scripts/wifi_board.py b/scripts/wifi_board.py index b3ad811c729..37322835e97 100755 --- a/scripts/wifi_board.py +++ b/scripts/wifi_board.py @@ -12,6 +12,7 @@ import requests import tarfile + class UpdateDownloader: UPDATE_SERVER = "https://test-update.flipperzero.one" UPDATE_PROJECT = "/blackmagic-firmware" @@ -32,7 +33,7 @@ def download(self, channel_id: str, dir: str) -> bool: # Aliases if channel_id in self.CHANNEL_ID_ALIAS: channel_id = self.CHANNEL_ID_ALIAS[channel_id] - + # Make directory if not os.path.exists(dir): self.logger.info(f"Creating directory {dir}") @@ -44,14 +45,14 @@ def download(self, channel_id: str, dir: str) -> bool: if response.status_code != 200: self.logger.error(f"Failed to download {self.UPDATE_INDEX}") return False - + # Parse json index try: index = json.loads(response.content) except Exception as e: self.logger.error(f"Failed to parse json index: {e}") return False - + # Find channel channel = None for channel_candidate in index["channels"]: @@ -61,7 +62,9 @@ def download(self, channel_id: str, dir: str) -> bool: # Check if channel found if channel is None: - self.logger.error(f"Channel '{channel_id}' not found. Valid channels: {', '.join([c['id'] for c in index['channels']])}") + self.logger.error( + f"Channel '{channel_id}' not found. Valid channels: {', '.join([c['id'] for c in index['channels']])}" + ) return False self.logger.info(f"Using channel '{channel_id}'") @@ -72,7 +75,7 @@ def download(self, channel_id: str, dir: str) -> bool: except Exception as e: self.logger.error(f"Failed to get version: {e}") return False - + self.logger.info(f"Using version '{version['version']}'") # Get changelog @@ -89,18 +92,18 @@ def download(self, channel_id: str, dir: str) -> bool: if line.strip() == "": continue self.logger.info(f" {line}") - + # Find file file_url = None - for file_candidate in version['files']: - if file_candidate['type'] == self.UPDATE_TYPE: - file_url = file_candidate['url'] + for file_candidate in version["files"]: + if file_candidate["type"] == self.UPDATE_TYPE: + file_url = file_candidate["url"] break if file_url is None: self.logger.error(f"File not found") return False - + # Make file path file_name = file_url.split("/")[-1] file_path = os.path.join(dir, file_name) @@ -115,14 +118,16 @@ def download(self, channel_id: str, dir: str) -> bool: self.logger.info(f"Unzipping {file_path}") with tarfile.open(file_path, "r") as tar: tar.extractall(dir) - + return True - + class Main(App): def init(self): self.parser.add_argument("-p", "--port", help="CDC Port", default="auto") - self.parser.add_argument("-c", "--channel", help="Channel name", default="development") + self.parser.add_argument( + "-c", "--channel", help="Channel name", default="development" + ) self.parser.set_defaults(func=self.update) # logging @@ -179,9 +184,15 @@ def update(self): # get temporary dir dir = tempfile.TemporaryDirectory() downloader = UpdateDownloader() - - # download - downloader.download(self.args.channel, dir.name) + + # download latest channel update + try: + if not downloader.download(self.args.channel, dir.name): + self.logger.error(f"Cannot download update") + return 1 + except Exception as e: + self.logger.error(f"Cannot download update: {e}") + return 1 with open(os.path.join(dir.name, "flash.command"), "r") as f: flash_command = f.read() @@ -228,4 +239,4 @@ def update(self): if __name__ == "__main__": - Main()() \ No newline at end of file + Main()() From 3308a29387d6f5ab95d2c224b1cdeecf29c87622 Mon Sep 17 00:00:00 2001 From: SG Date: Fri, 28 Apr 2023 17:20:48 +0300 Subject: [PATCH 3/4] WiFi board updater: auto cleanup temp dir --- scripts/wifi_board.py | 72 +++++++++++++++++++++---------------------- 1 file changed, 35 insertions(+), 37 deletions(-) diff --git a/scripts/wifi_board.py b/scripts/wifi_board.py index 37322835e97..64533654dae 100755 --- a/scripts/wifi_board.py +++ b/scripts/wifi_board.py @@ -182,52 +182,50 @@ def update(self): return 1 # get temporary dir - dir = tempfile.TemporaryDirectory() - downloader = UpdateDownloader() - - # download latest channel update - try: - if not downloader.download(self.args.channel, dir.name): - self.logger.error(f"Cannot download update") + with tempfile.TemporaryDirectory() as temp_dir: + downloader = UpdateDownloader() + + # download latest channel update + try: + if not downloader.download(self.args.channel, temp_dir): + self.logger.error(f"Cannot download update") + return 1 + except Exception as e: + self.logger.error(f"Cannot download update: {e}") return 1 - except Exception as e: - self.logger.error(f"Cannot download update: {e}") - return 1 - with open(os.path.join(dir.name, "flash.command"), "r") as f: - flash_command = f.read() + with open(os.path.join(temp_dir, "flash.command"), "r") as f: + flash_command = f.read() - flash_command = flash_command.replace("\n", "").replace("\r", "") - flash_command = flash_command.replace("(PORT)", port) + flash_command = flash_command.replace("\n", "").replace("\r", "") + flash_command = flash_command.replace("(PORT)", port) - # We can't reset the board after flashing via usb - flash_command = flash_command.replace( - "--after hard_reset", "--after no_reset_stub" - ) - - args = flash_command.split(" ")[0:] - args = list(filter(None, args)) + # We can't reset the board after flashing via usb + flash_command = flash_command.replace( + "--after hard_reset", "--after no_reset_stub" + ) - esptool_params = [] - esptool_params.extend(args) + args = flash_command.split(" ")[0:] + args = list(filter(None, args)) - self.logger.info(f'Running command: "{" ".join(args)}" in "{dir.name}"') + esptool_params = [] + esptool_params.extend(args) - process = subprocess.Popen( - esptool_params, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - cwd=dir.name, - bufsize=1, - universal_newlines=True, - ) + self.logger.info(f'Running command: "{" ".join(args)}" in "{temp_dir}"') - while process.poll() is None: - if process.stdout is not None: - for line in process.stdout: - self.logger.debug(f"{line.strip()}") + process = subprocess.Popen( + esptool_params, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + cwd=temp_dir, + bufsize=1, + universal_newlines=True, + ) - dir.cleanup() + while process.poll() is None: + if process.stdout is not None: + for line in process.stdout: + self.logger.debug(f"{line.strip()}") if process.returncode != 0: self.logger.error(f"Failed to flash WiFi board") From cdbffb1ed8f5f2a569b6f1fccf2779ae19766014 Mon Sep 17 00:00:00 2001 From: SG Date: Thu, 8 Jun 2023 10:54:59 +0300 Subject: [PATCH 4/4] Scripts: fix server address --- scripts/wifi_board.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/scripts/wifi_board.py b/scripts/wifi_board.py index 64533654dae..3f89ebdc656 100755 --- a/scripts/wifi_board.py +++ b/scripts/wifi_board.py @@ -14,7 +14,7 @@ class UpdateDownloader: - UPDATE_SERVER = "https://test-update.flipperzero.one" + UPDATE_SERVER = "https://update.flipperzero.one" UPDATE_PROJECT = "/blackmagic-firmware" UPDATE_INDEX = UPDATE_SERVER + UPDATE_PROJECT + "/directory.json" UPDATE_TYPE = "full_tgz" @@ -126,7 +126,7 @@ class Main(App): def init(self): self.parser.add_argument("-p", "--port", help="CDC Port", default="auto") self.parser.add_argument( - "-c", "--channel", help="Channel name", default="development" + "-c", "--channel", help="Channel name", default="release" ) self.parser.set_defaults(func=self.update)