Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scripts: WiFi board updater #2625

Merged
merged 5 commits into from
Jun 8, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
240 changes: 240 additions & 0 deletions scripts/wifi_board.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
#!/usr/bin/env python3

from flipper.app import App
from serial.tools.list_ports_common import ListPortInfo

import logging
import os
import tempfile
import subprocess
import serial.tools.list_ports as list_ports
import json
import requests
import tarfile


class UpdateDownloader:
UPDATE_SERVER = "https://update.flipperzero.one"
UPDATE_PROJECT = "/blackmagic-firmware"
UPDATE_INDEX = UPDATE_SERVER + UPDATE_PROJECT + "/directory.json"
UPDATE_TYPE = "full_tgz"

CHANNEL_ID_ALIAS = {
"dev": "development",
"rc": "release-candidate",
"r": "release",
"rel": "release",
}

def __init__(self):
self.logger = logging.getLogger()

def download(self, channel_id: str, dir: str) -> bool:
# Aliases
if channel_id in self.CHANNEL_ID_ALIAS:
channel_id = self.CHANNEL_ID_ALIAS[channel_id]

# Make directory
if not os.path.exists(dir):
self.logger.info(f"Creating directory {dir}")
os.makedirs(dir)

# Download json index
self.logger.info(f"Downloading {self.UPDATE_INDEX}")
response = requests.get(self.UPDATE_INDEX)
if response.status_code != 200:
self.logger.error(f"Failed to download {self.UPDATE_INDEX}")
return False

# Parse json index
try:
index = json.loads(response.content)
except Exception as e:
self.logger.error(f"Failed to parse json index: {e}")
return False

# Find channel
channel = None
for channel_candidate in index["channels"]:
if channel_candidate["id"] == channel_id:
channel = channel_candidate
break

# Check if channel found
if channel is None:
self.logger.error(
f"Channel '{channel_id}' not found. Valid channels: {', '.join([c['id'] for c in index['channels']])}"
)
return False

self.logger.info(f"Using channel '{channel_id}'")

# Get latest version
try:
version = channel["versions"][0]
except Exception as e:
self.logger.error(f"Failed to get version: {e}")
return False

self.logger.info(f"Using version '{version['version']}'")

# Get changelog
changelog = None
try:
changelog = version["changelog"]
except Exception as e:
self.logger.error(f"Failed to get changelog: {e}")

# print changelog
if changelog is not None:
self.logger.info(f"Changelog:")
for line in changelog.split("\n"):
if line.strip() == "":
continue
self.logger.info(f" {line}")

# Find file
file_url = None
for file_candidate in version["files"]:
if file_candidate["type"] == self.UPDATE_TYPE:
file_url = file_candidate["url"]
break

if file_url is None:
self.logger.error(f"File not found")
return False

# Make file path
file_name = file_url.split("/")[-1]
file_path = os.path.join(dir, file_name)

# Download file
self.logger.info(f"Downloading {file_url} to {file_path}")
with open(file_path, "wb") as f:
response = requests.get(file_url)
f.write(response.content)

# Unzip tgz
self.logger.info(f"Unzipping {file_path}")
with tarfile.open(file_path, "r") as tar:
tar.extractall(dir)

return True


class Main(App):
def init(self):
self.parser.add_argument("-p", "--port", help="CDC Port", default="auto")
self.parser.add_argument(
"-c", "--channel", help="Channel name", default="release"
)
self.parser.set_defaults(func=self.update)

# logging
self.logger = logging.getLogger()

def find_wifi_board(self) -> bool:
# idk why, but python thinks that list_ports.grep returns tuple[str, str, str]
blackmagics: list[ListPortInfo] = list(list_ports.grep("blackmagic")) # type: ignore
daps: list[ListPortInfo] = list(list_ports.grep("CMSIS-DAP")) # type: ignore

return len(blackmagics) > 0 or len(daps) > 0

def find_wifi_board_bootloader(self):
# idk why, but python thinks that list_ports.grep returns tuple[str, str, str]
ports: list[ListPortInfo] = list(list_ports.grep("ESP32-S2")) # type: ignore

if len(ports) == 0:
# Blackmagic probe serial port not found, will be handled later
pass
elif len(ports) > 1:
raise Exception("More than one WiFi board found")
else:
port = ports[0]
if os.name == "nt":
port.device = f"\\\\.\\{port.device}"
return port.device

def update(self):
try:
port = self.find_wifi_board_bootloader()
except Exception as e:
self.logger.error(f"{e}")
return 1

if self.args.port != "auto":
port = self.args.port

available_ports = [p[0] for p in list(list_ports.comports())]
if port not in available_ports:
self.logger.error(f"Port {port} not found")
return 1

if port is None:
if self.find_wifi_board():
self.logger.error("WiFi board found, but not in bootloader mode.")
self.logger.info("Please hold down BOOT button and press RESET button")
else:
self.logger.error("WiFi board not found")
self.logger.info(
"Please connect WiFi board to your computer, hold down BOOT button and press RESET button"
)
return 1

# get temporary dir
with tempfile.TemporaryDirectory() as temp_dir:
downloader = UpdateDownloader()

# download latest channel update
try:
if not downloader.download(self.args.channel, temp_dir):
self.logger.error(f"Cannot download update")
return 1
except Exception as e:
self.logger.error(f"Cannot download update: {e}")
return 1

with open(os.path.join(temp_dir, "flash.command"), "r") as f:
flash_command = f.read()

flash_command = flash_command.replace("\n", "").replace("\r", "")
flash_command = flash_command.replace("(PORT)", port)

# We can't reset the board after flashing via usb
flash_command = flash_command.replace(
"--after hard_reset", "--after no_reset_stub"
)

args = flash_command.split(" ")[0:]
args = list(filter(None, args))

esptool_params = []
esptool_params.extend(args)

self.logger.info(f'Running command: "{" ".join(args)}" in "{temp_dir}"')

process = subprocess.Popen(
esptool_params,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
cwd=temp_dir,
bufsize=1,
universal_newlines=True,
)

while process.poll() is None:
if process.stdout is not None:
for line in process.stdout:
self.logger.debug(f"{line.strip()}")

if process.returncode != 0:
self.logger.error(f"Failed to flash WiFi board")
else:
self.logger.info("WiFi board flashed successfully")
self.logger.info("Press RESET button on WiFi board to start it")

return process.returncode


if __name__ == "__main__":
Main()()