Skip to content

Commit

Permalink
Scripts: WiFi board updater (#2625)
Browse files Browse the repository at this point in the history
* Scripts: wifi updater
* WiFi board updater: lint, process download error
* WiFi board updater: auto cleanup temp dir
* Scripts: fix server address
  • Loading branch information
DrZlo13 authored Jun 8, 2023
1 parent 3226254 commit e5343fd
Showing 1 changed file with 240 additions and 0 deletions.
240 changes: 240 additions & 0 deletions scripts/wifi_board.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
#!/usr/bin/env python3

from flipper.app import App
from serial.tools.list_ports_common import ListPortInfo

import logging
import os
import tempfile
import subprocess
import serial.tools.list_ports as list_ports
import json
import requests
import tarfile


class UpdateDownloader:
UPDATE_SERVER = "https://update.flipperzero.one"
UPDATE_PROJECT = "/blackmagic-firmware"
UPDATE_INDEX = UPDATE_SERVER + UPDATE_PROJECT + "/directory.json"
UPDATE_TYPE = "full_tgz"

CHANNEL_ID_ALIAS = {
"dev": "development",
"rc": "release-candidate",
"r": "release",
"rel": "release",
}

def __init__(self):
self.logger = logging.getLogger()

def download(self, channel_id: str, dir: str) -> bool:
# Aliases
if channel_id in self.CHANNEL_ID_ALIAS:
channel_id = self.CHANNEL_ID_ALIAS[channel_id]

# Make directory
if not os.path.exists(dir):
self.logger.info(f"Creating directory {dir}")
os.makedirs(dir)

# Download json index
self.logger.info(f"Downloading {self.UPDATE_INDEX}")
response = requests.get(self.UPDATE_INDEX)
if response.status_code != 200:
self.logger.error(f"Failed to download {self.UPDATE_INDEX}")
return False

# Parse json index
try:
index = json.loads(response.content)
except Exception as e:
self.logger.error(f"Failed to parse json index: {e}")
return False

# Find channel
channel = None
for channel_candidate in index["channels"]:
if channel_candidate["id"] == channel_id:
channel = channel_candidate
break

# Check if channel found
if channel is None:
self.logger.error(
f"Channel '{channel_id}' not found. Valid channels: {', '.join([c['id'] for c in index['channels']])}"
)
return False

self.logger.info(f"Using channel '{channel_id}'")

# Get latest version
try:
version = channel["versions"][0]
except Exception as e:
self.logger.error(f"Failed to get version: {e}")
return False

self.logger.info(f"Using version '{version['version']}'")

# Get changelog
changelog = None
try:
changelog = version["changelog"]
except Exception as e:
self.logger.error(f"Failed to get changelog: {e}")

# print changelog
if changelog is not None:
self.logger.info(f"Changelog:")
for line in changelog.split("\n"):
if line.strip() == "":
continue
self.logger.info(f" {line}")

# Find file
file_url = None
for file_candidate in version["files"]:
if file_candidate["type"] == self.UPDATE_TYPE:
file_url = file_candidate["url"]
break

if file_url is None:
self.logger.error(f"File not found")
return False

# Make file path
file_name = file_url.split("/")[-1]
file_path = os.path.join(dir, file_name)

# Download file
self.logger.info(f"Downloading {file_url} to {file_path}")
with open(file_path, "wb") as f:
response = requests.get(file_url)
f.write(response.content)

# Unzip tgz
self.logger.info(f"Unzipping {file_path}")
with tarfile.open(file_path, "r") as tar:
tar.extractall(dir)

return True


class Main(App):
def init(self):
self.parser.add_argument("-p", "--port", help="CDC Port", default="auto")
self.parser.add_argument(
"-c", "--channel", help="Channel name", default="release"
)
self.parser.set_defaults(func=self.update)

# logging
self.logger = logging.getLogger()

def find_wifi_board(self) -> bool:
# idk why, but python thinks that list_ports.grep returns tuple[str, str, str]
blackmagics: list[ListPortInfo] = list(list_ports.grep("blackmagic")) # type: ignore
daps: list[ListPortInfo] = list(list_ports.grep("CMSIS-DAP")) # type: ignore

return len(blackmagics) > 0 or len(daps) > 0

def find_wifi_board_bootloader(self):
# idk why, but python thinks that list_ports.grep returns tuple[str, str, str]
ports: list[ListPortInfo] = list(list_ports.grep("ESP32-S2")) # type: ignore

if len(ports) == 0:
# Blackmagic probe serial port not found, will be handled later
pass
elif len(ports) > 1:
raise Exception("More than one WiFi board found")
else:
port = ports[0]
if os.name == "nt":
port.device = f"\\\\.\\{port.device}"
return port.device

def update(self):
try:
port = self.find_wifi_board_bootloader()
except Exception as e:
self.logger.error(f"{e}")
return 1

if self.args.port != "auto":
port = self.args.port

available_ports = [p[0] for p in list(list_ports.comports())]
if port not in available_ports:
self.logger.error(f"Port {port} not found")
return 1

if port is None:
if self.find_wifi_board():
self.logger.error("WiFi board found, but not in bootloader mode.")
self.logger.info("Please hold down BOOT button and press RESET button")
else:
self.logger.error("WiFi board not found")
self.logger.info(
"Please connect WiFi board to your computer, hold down BOOT button and press RESET button"
)
return 1

# get temporary dir
with tempfile.TemporaryDirectory() as temp_dir:
downloader = UpdateDownloader()

# download latest channel update
try:
if not downloader.download(self.args.channel, temp_dir):
self.logger.error(f"Cannot download update")
return 1
except Exception as e:
self.logger.error(f"Cannot download update: {e}")
return 1

with open(os.path.join(temp_dir, "flash.command"), "r") as f:
flash_command = f.read()

flash_command = flash_command.replace("\n", "").replace("\r", "")
flash_command = flash_command.replace("(PORT)", port)

# We can't reset the board after flashing via usb
flash_command = flash_command.replace(
"--after hard_reset", "--after no_reset_stub"
)

args = flash_command.split(" ")[0:]
args = list(filter(None, args))

esptool_params = []
esptool_params.extend(args)

self.logger.info(f'Running command: "{" ".join(args)}" in "{temp_dir}"')

process = subprocess.Popen(
esptool_params,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
cwd=temp_dir,
bufsize=1,
universal_newlines=True,
)

while process.poll() is None:
if process.stdout is not None:
for line in process.stdout:
self.logger.debug(f"{line.strip()}")

if process.returncode != 0:
self.logger.error(f"Failed to flash WiFi board")
else:
self.logger.info("WiFi board flashed successfully")
self.logger.info("Press RESET button on WiFi board to start it")

return process.returncode


if __name__ == "__main__":
Main()()

0 comments on commit e5343fd

Please sign in to comment.