diff --git a/app/resources/plugins/MCH22BToolchain/mch2022-tools/README.md b/app/resources/plugins/MCH22BToolchain/mch2022-tools/README.md index 9dc076941..99b906347 100644 --- a/app/resources/plugins/MCH22BToolchain/mch2022-tools/README.md +++ b/app/resources/plugins/MCH22BToolchain/mch2022-tools/README.md @@ -3,34 +3,92 @@ This directory contains command line tools for use with your badge. To use them, you will need to `pip install pyusb`. -### AppFS +### Application management The AppFS contains binary ESP32 apps - standalone firmwares that can be booted and used as apps. -`webusb_ls.py` +`app_list.py` + Lists all apps on the AppFS. -`webusb_push.py {target_name} {filename} [--run]` +`app_push.py {file} {name} {title} {version} [--run]` + Installs an ESP32 app to the AppFS. The `--run` flag will also immediately start the app after installing. -`webusb_boot.py {app_name}` +`app_pull.py {name} {target}` + +Downloads the ESP32 app binary to your computer, the file will be saved at the location provided as target. + +`app_run.py {name}` + Boots an ESP32 app with the specified name. -`webusb_rm.py {app_name}` +`webusb_remove.py {app_name}` + Removes an ESP32 app from the AppFS. -### Filesystem -`webusb_fat_ls.py [path]` +### FAT filesystem +`filesystem_list.py [path] [--recursive]` + Returns a directory listing for the specified path. -`webusb_fat_push.py {filename} {target_location}` -Uploads a local file to the FAT filesystem. -`target_location` should always start with `/flash` or `/sdcard` and include the target filename. +`filesystem_push.py {name} {target}` + +Uploads file `{name}` to location `{target}` on the filesystem of the badge. +`target` should always start with `/internal` or `/sd` and the target path should always end with a filename. + +`filesystem_pull.py {name} {target}` + +Downloads file `{name}` from the filesystem of the badge to location `{target}` on your computer. +`name` should always start with `/internal` or `/sd` and the path should always end with a filename. + +`filesystem_remove.py {name}` + +Removes a file or a directory from the filesystem of the badge. In case of a directory the directory is removed recursively. +`name` should always start with `/internal` or `/sd`. + +`filesystem_create_directory.py {name}` + +Creates a directory on the filesystem of the badge. +`name` should always start with `/internal` or `/sd`. + +`filesystem_exists.py {name}` + +Checks if a file exists on the filesystem of the badge. +`name` should always start with `/internal` or `/sd`. + +### Configuration management + +`configuration_list.py [namespace]` + +Lists all configuration entries in the NVS partition of the badge. The optional `namespace` argument allows for filtering the results, returning only the entries in the provided namespace. + +`configuration_read.py {namespace} {key} {type}` + +Reads the value of a configuration entry with key `{key}` in namespace `{namespace}`. Valid types are u8 (8-bit unsigned integer), i8 (8-bit signed integer), u16 (16-bit unsigned integer), i16 (16-bit signed integer), u32 (32-bit unsigned integer), i32 (32-bit signed integer), u64 (64-bit unsigned integer), i64 (64-bit signed integer), string (a text string) and blob (binary data). + +Note that reading entries of type `blob` will output raw binary data to stdout. You might want to pipe the output to another program (`python configuration_read.py example example blob | xxd`) or to a file (`python configuration_read.py example example blob > data.bin`). + +`configuration_write.py {namespace} {key} {type} [value]` + +Writes the value of a configuration entry with key `{key}` in namespace `{namespace}`. Valid types are u8 (8-bit unsigned integer), i8 (8-bit signed integer), u16 (16-bit unsigned integer), i16 (16-bit signed integer), u32 (32-bit unsigned integer), i32 (32-bit signed integer), u64 (64-bit unsigned integer), i64 (64-bit signed integer), string (a text string) and blob (binary data). + +The value can either be provided using optional command line argument `[value]` or by writing a value to stdin (`echo "test" | python configuration_write.py owner nickname string`). Writing to stdin can also be used for storing binary data to a configuration entry of type `blob`. + +`configuration_remove.py {namespace} {key}` + +Removes a configuration entry with key `{key}` in namespace `{namespace}` from the NVS partition. ### FPGA -`webusb_fpga.py {filename} [bindings]` -Loads a bit stream from a file into the FPGA. +`fpga.py {filename} [bindings]` + +Loads a bit stream from a file into the FPGA. This tool also allows for uploading and presenting files to the FPGA via the SPI interface that connects the FPGA to the ESP32. + +### Other +`exit.py` + +Reboots the badge, exiting webusb mode. + +`information.py` -### General -`webusb_reset.py` -Reboots the device. +Returns usage information about the FAT filesystems and the AppFS filesystem diff --git a/app/resources/plugins/MCH22BToolchain/mch2022-tools/app_list.py b/app/resources/plugins/MCH22BToolchain/mch2022-tools/app_list.py new file mode 100755 index 000000000..117a55668 --- /dev/null +++ b/app/resources/plugins/MCH22BToolchain/mch2022-tools/app_list.py @@ -0,0 +1,28 @@ +#!/usr/bin/env python3 +from webusb import * +import argparse +import sys +import time + +def listapps(): + applist = badge.app_list() + if not applist == None: + print("\x1b[4m{: <48}\x1b[0m \x1b[4m{: <64}\x1b[0m \x1b[4m{: <8}\x1b[0m \x1b[4m{: <10}\x1b[0m".format("Name", "Title", "Version", "Size")) + for app in applist: + size = str(app["size"]) + " B" + if app["size"] > 1024: + size = str(round(app["size"] / 1024, 2)) + " KB" + print("{: <48} {: <64} {: <8} {: <10}".format(app["name"].decode("ascii", errors="ignore"), app["title"].decode("ascii", errors="ignore"), str(app["version"]), size)) + else: + print(location.decode("ascii") + " ** Failed to load application list **") + +parser = argparse.ArgumentParser(description='MCH2022 badge application list tool') +args = parser.parse_args() + +badge = Badge() + +if not badge.begin(): + print("Failed to connect") + sys.exit(1) + +listapps() diff --git a/app/resources/plugins/MCH22BToolchain/mch2022-tools/app_pull.py b/app/resources/plugins/MCH22BToolchain/mch2022-tools/app_pull.py new file mode 100755 index 000000000..02ed9a30b --- /dev/null +++ b/app/resources/plugins/MCH22BToolchain/mch2022-tools/app_pull.py @@ -0,0 +1,31 @@ +#!/usr/bin/env python3 + +from webusb import * +import argparse +import sys +import time + +parser = argparse.ArgumentParser(description='MCH2022 badge app download tool') +parser.add_argument("name", help="Remote app") +parser.add_argument("target", help="Local file") +args = parser.parse_args() + +name = args.name +target = args.target + +badge = Badge() + +if not badge.begin(): + print("Failed to connect") + sys.exit(1) + +result = badge.app_read(name.encode("ascii", "ignore")) + +if result: + with open(target, "wb") as f: + f.write(result) + f.truncate(len(result)) + print("App downloaded succesfully") +else: + print("Failed to download app") + sys.exit(1) diff --git a/app/resources/plugins/MCH22BToolchain/mch2022-tools/app_push.py b/app/resources/plugins/MCH22BToolchain/mch2022-tools/app_push.py new file mode 100755 index 000000000..215ad8629 --- /dev/null +++ b/app/resources/plugins/MCH22BToolchain/mch2022-tools/app_push.py @@ -0,0 +1,47 @@ +#!/usr/bin/env python3 + +from webusb import * +import argparse +import sys +import time + +parser = argparse.ArgumentParser(description='MCH2022 badge app upload tool') +parser.add_argument("file", help="Application binary") +parser.add_argument("name", help="Application name") +parser.add_argument("title", help="Application title") +parser.add_argument("version", type=int, help="Application version") +parser.add_argument('--run', '-r', '-R', action='store_true', help="Run application after uploading") +args = parser.parse_args() + +name = args.name.encode("ascii", "ignore") +title = args.title.encode("ascii", "ignore") +version = args.version +if version < 0: + version = 0 + +with open(args.file, "rb") as f: + data = f.read() + +badge = Badge() + +if not badge.begin(): + print("Failed to connect") + sys.exit(1) + +result = badge.app_write(name, title, version, data) + +if result: + print("App installed succesfully") +else: + print("Failed to install app") + sys.exit(1) + +if args.run: + result = badge.app_run(name) + + if result: + badge.reset(False) + print("Started") + else: + print("Failed to start") + sys.exit(1) diff --git a/app/resources/plugins/MCH22BToolchain/mch2022-tools/app_remove.py b/app/resources/plugins/MCH22BToolchain/mch2022-tools/app_remove.py new file mode 100755 index 000000000..7d28967bb --- /dev/null +++ b/app/resources/plugins/MCH22BToolchain/mch2022-tools/app_remove.py @@ -0,0 +1,26 @@ +#!/usr/bin/env python3 + +from webusb import * +import argparse +import sys +import time + +parser = argparse.ArgumentParser(description='MCH2022 badge app removal tool') +parser.add_argument("name", help="Name of app to be removed") +args = parser.parse_args() + +name = args.name + +badge = Badge() + +if not badge.begin(): + print("Failed to connect") + sys.exit(1) + +result = badge.app_remove(name.encode('ascii', "ignore")) + +if result: + print("Removed") +else: + print("Failed to remove") + sys.exit(1) diff --git a/app/resources/plugins/MCH22BToolchain/mch2022-tools/app_run.py b/app/resources/plugins/MCH22BToolchain/mch2022-tools/app_run.py new file mode 100755 index 000000000..bcfc4f892 --- /dev/null +++ b/app/resources/plugins/MCH22BToolchain/mch2022-tools/app_run.py @@ -0,0 +1,29 @@ +#!/usr/bin/env python3 + +from webusb import * +import argparse +import sys +import time + +parser = argparse.ArgumentParser(description='MCH2022 badge app run tool') +parser.add_argument("name", help="Name of app to be started") +parser.add_argument("--command", "-c", help="String to be stored in memory", required=False) +args = parser.parse_args() + +name = args.name +command = args.command + +badge = Badge() + +if not badge.begin(): + print("Failed to connect") + sys.exit(1) + +result = badge.app_run(name.encode('ascii', "ignore"), command.encode('ascii', "ignore") if command else None) + +if result: + badge.reset(False) + print("Started") +else: + print("Failed to start") + sys.exit(1) diff --git a/app/resources/plugins/MCH22BToolchain/mch2022-tools/configuration_list.py b/app/resources/plugins/MCH22BToolchain/mch2022-tools/configuration_list.py new file mode 100755 index 000000000..9328bbdee --- /dev/null +++ b/app/resources/plugins/MCH22BToolchain/mch2022-tools/configuration_list.py @@ -0,0 +1,32 @@ +#!/usr/bin/env python3 +from webusb import * +import argparse +import sys +import time + +parser = argparse.ArgumentParser(description='MCH2022 badge NVS list tool') +parser.add_argument("namespace", help="Namespace", nargs='?', default=None) +args = parser.parse_args() + +badge = Badge() + +if not badge.begin(): + print("Failed to connect") + sys.exit(1) + +print("\x1b[4m{: <32}\x1b[0m \x1b[4m{: <32}\x1b[0m \x1b[4m{: <8}\x1b[0m \x1b[4m{: <10}\x1b[0m \x1b[4m{: <32}\x1b[0m".format("Namespace", "Key", "Type", "Size", "Value")) +badge.printGarbage = True +if args.namespace: + entries = badge.nvs_list(args.namespace) +else: + entries = badge.nvs_list() + +if not entries: + print("Failed to read data") +else: + for namespace in entries: + for entry in entries[namespace]: + value = "(skipped)" + if entry["size"] < 64 and badge.nvs_should_read(entry["type"]): + value = str(badge.nvs_read(namespace, entry["key"], entry["type"])) + print("{: <32} {: <32} {: <8} {:10d} {}".format(namespace, entry["key"], badge.nvs_type_to_name(entry["type"]), entry["size"], value)) diff --git a/app/resources/plugins/MCH22BToolchain/mch2022-tools/configuration_read.py b/app/resources/plugins/MCH22BToolchain/mch2022-tools/configuration_read.py new file mode 100755 index 000000000..d4c2b6f78 --- /dev/null +++ b/app/resources/plugins/MCH22BToolchain/mch2022-tools/configuration_read.py @@ -0,0 +1,28 @@ +#!/usr/bin/env python3 +from webusb import * +import argparse +import sys +import time +import sys + +parser = argparse.ArgumentParser(description='MCH2022 badge NVS read tool') +parser.add_argument("namespace", help="Namespace") +parser.add_argument("key", help="Key") +parser.add_argument("type", help="Type, one of u8, i8, u16, i16, u32, i32, u64, i64, string or blob") +args = parser.parse_args() + +badge = Badge() + +type_name = args.type.lower() +type_number = badge.nvs_name_to_type(type_name) + +if not badge.begin(): + print("Failed to connect") + sys.exit(1) + +value = badge.nvs_read(args.namespace, args.key, type_number) + +if type_name == "blob": + sys.stdout.buffer.write(value) +else: + print(value) diff --git a/app/resources/plugins/MCH22BToolchain/mch2022-tools/configuration_remove.py b/app/resources/plugins/MCH22BToolchain/mch2022-tools/configuration_remove.py new file mode 100755 index 000000000..dcc015f8e --- /dev/null +++ b/app/resources/plugins/MCH22BToolchain/mch2022-tools/configuration_remove.py @@ -0,0 +1,25 @@ +#!/usr/bin/env python3 +from webusb import * +import argparse +import sys +import time +import sys + +parser = argparse.ArgumentParser(description='MCH2022 badge NVS remove tool') +parser.add_argument("namespace", help="Namespace") +parser.add_argument("key", help="Key") +args = parser.parse_args() + +badge = Badge() + +if not badge.begin(): + print("Failed to connect") + sys.exit(1) + +result = badge.nvs_remove(args.namespace, args.key) + +if result: + print("Entry removed") +else: + print("Failed to remove entry") + sys.exit(1) diff --git a/app/resources/plugins/MCH22BToolchain/mch2022-tools/configuration_write.py b/app/resources/plugins/MCH22BToolchain/mch2022-tools/configuration_write.py new file mode 100755 index 000000000..be1b92887 --- /dev/null +++ b/app/resources/plugins/MCH22BToolchain/mch2022-tools/configuration_write.py @@ -0,0 +1,38 @@ +#!/usr/bin/env python3 +from webusb import * +import argparse +import sys +import time +import sys + +parser = argparse.ArgumentParser(description='MCH2022 badge NVS write tool') +parser.add_argument("namespace", help="Namespace") +parser.add_argument("key", help="Key") +parser.add_argument("type", help="Type, one of u8, i8, u16, i16, u32, i32, u64, i64, string or blob") +parser.add_argument("value", help="Value, optional. If no value is provided the application will read from stdin", nargs='?', default=None) +args = parser.parse_args() + +value = args.value + +if not value: + value = sys.stdin.buffer.read() + +badge = Badge() + +type_name = args.type.lower() +type_number = badge.nvs_name_to_type(type_name) + +if type_name in ["u8", "i8", "u16", "i16", "u32", "i32", "u64", "i64"]: + value = int(value) + +if not badge.begin(): + print("Failed to connect") + sys.exit(1) + +result = badge.nvs_write(args.namespace, args.key, type_number, value) + +if result: + print("Value stored") +else: + print("Failed to store value") + sys.exit(1) diff --git a/app/resources/plugins/MCH22BToolchain/mch2022-tools/generate_tls_cert.sh b/app/resources/plugins/MCH22BToolchain/mch2022-tools/development/generate_tls_cert.sh similarity index 100% rename from app/resources/plugins/MCH22BToolchain/mch2022-tools/generate_tls_cert.sh rename to app/resources/plugins/MCH22BToolchain/mch2022-tools/development/generate_tls_cert.sh diff --git a/app/resources/plugins/MCH22BToolchain/mch2022-tools/exit.py b/app/resources/plugins/MCH22BToolchain/mch2022-tools/exit.py new file mode 100755 index 000000000..6e5c4dd34 --- /dev/null +++ b/app/resources/plugins/MCH22BToolchain/mch2022-tools/exit.py @@ -0,0 +1,4 @@ +#!/usr/bin/env python3 +from webusb import * +badge = Badge() +badge.reset() diff --git a/app/resources/plugins/MCH22BToolchain/mch2022-tools/filesystem_create_directory.py b/app/resources/plugins/MCH22BToolchain/mch2022-tools/filesystem_create_directory.py new file mode 100755 index 000000000..dd42df2fe --- /dev/null +++ b/app/resources/plugins/MCH22BToolchain/mch2022-tools/filesystem_create_directory.py @@ -0,0 +1,33 @@ +#!/usr/bin/env python3 + +from webusb import * +import argparse +import sys +import time + +parser = argparse.ArgumentParser(description='MCH2022 badge FAT filesystem directory creation tool') +parser.add_argument("name", help="directory name") +args = parser.parse_args() + +name = args.name + +if not (name.startswith("/internal") or name.startswith("/sd")): + print("Path should always start with /internal or /sd") + sys.exit(1) + +if name.endswith("/"): + name = name[:-1] + +badge = Badge() + +if not badge.begin(): + print("Failed to connect") + sys.exit(1) + +result = badge.fs_create_directory(name.encode('ascii', "ignore")) + +if result: + print("Directory created") +else: + print("Failed to create directory") + sys.exit(1) diff --git a/app/resources/plugins/MCH22BToolchain/mch2022-tools/filesystem_exists.py b/app/resources/plugins/MCH22BToolchain/mch2022-tools/filesystem_exists.py new file mode 100755 index 000000000..3e0ac78e9 --- /dev/null +++ b/app/resources/plugins/MCH22BToolchain/mch2022-tools/filesystem_exists.py @@ -0,0 +1,33 @@ +#!/usr/bin/env python3 + +from webusb import * +import argparse +import sys +import time + +parser = argparse.ArgumentParser(description='MCH2022 badge FAT filesystem file exists check tool') +parser.add_argument("name", help="Name of file") +args = parser.parse_args() + +name = args.name + +if not (name.startswith("/internal") or name.startswith("/sd")): + print("Path should always start with /internal or /sd") + sys.exit(1) + +if name.endswith("/"): + name = name[:-1] + +badge = Badge() + +if not badge.begin(): + print("Failed to connect") + sys.exit(1) + +result = badge.fs_file_exists(name.encode('ascii', "ignore")) + +if result: + print("File exists") +else: + print("File does not exist") + sys.exit(1) diff --git a/app/resources/plugins/MCH22BToolchain/mch2022-tools/filesystem_list.py b/app/resources/plugins/MCH22BToolchain/mch2022-tools/filesystem_list.py new file mode 100755 index 000000000..0c6e18830 --- /dev/null +++ b/app/resources/plugins/MCH22BToolchain/mch2022-tools/filesystem_list.py @@ -0,0 +1,60 @@ +#!/usr/bin/env python3 +from webusb import * +import argparse +import sys +import time + +def listdir(location, recursive = False): + filelist = badge.fs_list(location) + if not filelist == None: + for f in filelist: + sizestring = "" + modifiedstring = "" + if f["stat"]: + if f["type"] != 2: + s = f["stat"]["size"] + if s >= 1024: + sizestring = str(round(s / 1024, 2)) + " KB" + else: + sizestring = str(s) + " B" + modifiedstring = datetime.utcfromtimestamp(f["stat"]["modified"]).strftime('%Y-%m-%d %H:%M:%S') + typestring = "File" + if f["type"] == 2: + typestring = "Dir" + newlocation = location + b"/" + f["name"] + locationstring = newlocation.decode("ascii", errors="ignore") + print("{: <5} {: <64} {: <12} {: <19}".format(typestring, locationstring, sizestring, modifiedstring)) + if f["type"] == 2 and recursive: + listdir(newlocation, recursive) + else: + print(location.decode("ascii") + " ** Failed to open directory **") + +parser = argparse.ArgumentParser(description='MCH2022 badge FAT filesystem directory list tool') +parser.add_argument("name", help="directory name") +parser.add_argument('--recursive', '-r', '-R', action='store_true') +args = parser.parse_args() + +name = args.name +recursive = args.recursive + +if name == "/": + print("\x1b[4m{: <5}\x1b[0m \x1b[4m{: <64}\x1b[0m \x1b[4m{: <12}\x1b[0m \x1b[4m{: <19}\x1b[0m".format("Type", "Name", "Size", "Modified")) + print("{: <5} {: <64} {: <12} {: <19}".format("Dir", "/internal", "", "")) + print("{: <5} {: <64} {: <12} {: <19}".format("Dir", "/sd", "", "")) + sys.exit(0) + +if not (name.startswith("/internal") or name.startswith("/sd")): + print("Path should always start with /internal or /sd") + sys.exit(1) + +if name.endswith("/"): + name = name[:-1] + +badge = Badge() + +if not badge.begin(): + print("Failed to connect") + sys.exit(1) + +print("\x1b[4m{: <5}\x1b[0m \x1b[4m{: <64}\x1b[0m \x1b[4m{: <12}\x1b[0m \x1b[4m{: <19}\x1b[0m".format("Type", "Name", "Size", "Modified")) +listdir(name.encode("ascii"), recursive) diff --git a/app/resources/plugins/MCH22BToolchain/mch2022-tools/filesystem_pull.py b/app/resources/plugins/MCH22BToolchain/mch2022-tools/filesystem_pull.py new file mode 100755 index 000000000..4c039d57d --- /dev/null +++ b/app/resources/plugins/MCH22BToolchain/mch2022-tools/filesystem_pull.py @@ -0,0 +1,38 @@ +#!/usr/bin/env python3 + +from webusb import * +import argparse +import sys +import time + +parser = argparse.ArgumentParser(description='MCH2022 badge FAT filesystem file download tool') +parser.add_argument("name", help="Remote file") +parser.add_argument("target", help="Local file") +args = parser.parse_args() + +name = args.name +target = args.target + +if not (name.startswith("/internal") or name.startswith("/sd")): + print("Path should always start with /internal or /sd") + sys.exit(1) + +if name.endswith("/"): + name = name[:-1] + +badge = Badge() + +if not badge.begin(): + print("Failed to connect") + sys.exit(1) + +result = badge.fs_read_file(name.encode("ascii", "ignore")) + +if result: + with open(target, "wb") as f: + f.write(result) + f.truncate(len(result)) + print("File downloaded succesfully") +else: + print("Failed to download file") + sys.exit(1) diff --git a/app/resources/plugins/MCH22BToolchain/mch2022-tools/filesystem_push.py b/app/resources/plugins/MCH22BToolchain/mch2022-tools/filesystem_push.py new file mode 100755 index 000000000..25984bf10 --- /dev/null +++ b/app/resources/plugins/MCH22BToolchain/mch2022-tools/filesystem_push.py @@ -0,0 +1,38 @@ +#!/usr/bin/env python3 + +from webusb import * +import argparse +import sys +import time + +parser = argparse.ArgumentParser(description='MCH2022 badge FAT filesystem file upload tool') +parser.add_argument("name", help="Local file") +parser.add_argument("target", help="Remote file") +args = parser.parse_args() + +name = args.name +target = args.target + +with open(name, "rb") as f: + data = f.read() + +if not (target.startswith("/internal") or target.startswith("/sd")): + print("Path should always start with /internal or /sd") + sys.exit(1) + +if target.endswith("/"): + target = target[:-1] + +badge = Badge() + +if not badge.begin(): + print("Failed to connect") + sys.exit(1) + +result = badge.fs_write_file(target.encode("ascii", "ignore"), data) + +if result: + print("File pushed succesfully") +else: + print("Failed to push file") + sys.exit(1) diff --git a/app/resources/plugins/MCH22BToolchain/mch2022-tools/filesystem_remove.py b/app/resources/plugins/MCH22BToolchain/mch2022-tools/filesystem_remove.py new file mode 100755 index 000000000..61fca56e0 --- /dev/null +++ b/app/resources/plugins/MCH22BToolchain/mch2022-tools/filesystem_remove.py @@ -0,0 +1,33 @@ +#!/usr/bin/env python3 + +from webusb import * +import argparse +import sys +import time + +parser = argparse.ArgumentParser(description='MCH2022 badge FAT filesystem file removal tool') +parser.add_argument("name", help="Name of file or directory to be removed") +args = parser.parse_args() + +name = args.name + +if not (name.startswith("/internal") or name.startswith("/sd")): + print("Path should always start with /internal or /sd") + sys.exit(1) + +if name.endswith("/"): + name = name[:-1] + +badge = Badge() + +if not badge.begin(): + print("Failed to connect") + sys.exit(1) + +result = badge.fs_remove(name.encode('ascii', "ignore")) + +if result: + print("Removed") +else: + print("Failed to remove") + sys.exit(1) diff --git a/app/resources/plugins/MCH22BToolchain/mch2022-tools/webusb_fpga.py b/app/resources/plugins/MCH22BToolchain/mch2022-tools/fpga.py similarity index 92% rename from app/resources/plugins/MCH22BToolchain/mch2022-tools/webusb_fpga.py rename to app/resources/plugins/MCH22BToolchain/mch2022-tools/fpga.py index e89303c1e..f226f376b 100755 --- a/app/resources/plugins/MCH22BToolchain/mch2022-tools/webusb_fpga.py +++ b/app/resources/plugins/MCH22BToolchain/mch2022-tools/fpga.py @@ -76,15 +76,18 @@ def usb_tx(title, data): else: print("ESP32 already in FPGA download mode, connecting...") -while True: - data = None +requested_len = 4 + 4 - 1 # 2 times "FPGA" if we are unlucky and "PGAFPGA" got transmitted +data = bytearray() +timeout = time.monotonic() + 5 + +while (data.find(b'FPGA') == -1): + if len(data) >= requested_len * 50 or time.monotonic() > timeout: # ~50 tries or 5 seconds + raise ValueError("Badge does not answer") try: - data = bytes(esp32_ep_in.read(32)) + data += bytes(esp32_ep_in.read(32)) except Exception as e: #print(e) pass - if (data == b"FPGA"): - break esp32_ep_out.write(b'FPGA') diff --git a/app/resources/plugins/MCH22BToolchain/mch2022-tools/fsoverbus/webusb.py b/app/resources/plugins/MCH22BToolchain/mch2022-tools/fsoverbus/webusb.py new file mode 100644 index 000000000..b9ffdd6fd --- /dev/null +++ b/app/resources/plugins/MCH22BToolchain/mch2022-tools/fsoverbus/webusb.py @@ -0,0 +1,335 @@ +import os +import sys +import usb.core +import usb.util +from enum import Enum +import struct +import time + +# Print iterations progress +def printProgressBar (iteration, total, prefix = '', suffix = '', decimals = 1, length = 100, fill = '█', printEnd = "\r"): + """ + Call in a loop to create terminal progress bar + @params: + iteration - Required : current iteration (Int) + total - Required : total iterations (Int) + prefix - Optional : prefix string (Str) + suffix - Optional : suffix string (Str) + decimals - Optional : positive number of decimals in percent complete (Int) + length - Optional : character length of bar (Int) + fill - Optional : bar fill character (Str) + printEnd - Optional : end character (e.g. "\r", "\r\n") (Str) + """ + percent = ("{0:." + str(decimals) + "f}").format(100 * (iteration / float(total))) + filledLength = int(length * iteration // total) + bar = fill * filledLength + '-' * (length - filledLength) + print(f'\r{prefix} |{bar}| {percent}% {suffix}', end = printEnd) + # Print New Line on Complete + if iteration == total: + print() + +def printProgressTime( + totalMilliSeconds, + steps=10, + prefix="", + suffix="", + decimals=1, + length=20, + fill="█", + printEnd="\r", +): + """ + prints progress while sleeping totalMilliSeconds, bar is updated every `steps` milliseconds + other params + """ + for i in range(0, totalMilliSeconds, steps): + printProgressBar( + i, + totalMilliSeconds - steps, + prefix, + suffix, + decimals, + length, + fill, + printEnd, + ) + time.sleep(steps / 1000) + +class Commands(Enum): + EXECFILE = 0 + HEARTBEAT = 1 + APPFSBOOT = 3 + GETDIR = 4096 + READFILE = 4097 + WRITEFILE = 4098 + DELFILE = 4099 + DUPLFILE = 4100 + MVFILE = 4101 + MAKEDIR = 4102 + APPFSFDIR = 4103 + APPFSDEL = 4104 + APPFSWRITE = 4105 + +class WebUSBPacket(): + def __init__(self, command, message_id, payload=None): + self.command = command + self.message_id = message_id + if (payload == None): + self.payload = b"" + else: + self.payload = payload + + def getMessage(self): + return self._generateHeader() + self.payload + + def _generateHeader(self): + return struct.pack("= self.PAYLOADHEADERLEN: + if show_hourglass: + print() + command, payloadlen, verif, message_id = struct.unpack_from("= (payloadlen + self.PAYLOADHEADERLEN): + return (command, message_id, response[self.PAYLOADHEADERLEN:]) + raise Exception("Timeout in receiving") + + + def sendPacket(self, packet, transfersize=2048, show_response_hourglass=False): + transfersize = 2048 + msg = packet.getMessage() + starttime = time.time() + + if len(msg) > transfersize: + printProgressBar(0, len(msg) // transfersize, length=20) + for i in range(0, len(msg), transfersize): + self.ep_out.write(msg[i:i+transfersize]) + time.sleep(0.01) + if len(msg) > transfersize: + printProgressBar(i//transfersize, len(msg) // transfersize, length=20) + #self.ep_out.write(packet.getMessage()) + print(f"transfer speed: {(len(msg)/(time.time()-starttime)/1024):0.2f} kb/s") + command, message_id, data = self.receiveResponse(show_hourglass=show_response_hourglass) + if message_id != packet.message_id: + raise Exception("Mismatch in id") + if command != packet.command.value: + raise Exception("Mismatch in command") + return data + + def sendHeartbeat(self): + """ + Send heartbeat towards the badges + + parameters: + None + + returns: + bool : True if badge responded, false if no response + """ + + data = self.sendPacket(WebUSBPacket(Commands.HEARTBEAT, self.getMessageId())) + return data.decode().rstrip('\x00') == "ok" + + def getFSDir(self, dir): + """ + Get files and directories on the badge filesystem + + parameters: + dir (str) : Directory to get contents from + + returns: + res (dict) : Dict containing 'dir' which is the requested directory, 'files' list of files in the directory, 'dirs' list of directories + + """ + + data = self.sendPacket(WebUSBPacket(Commands.GETDIR, self.getMessageId(), dir.encode(encoding='ascii'))) + data = data.decode() + data = data.split("\n") + result = dict() + result["dir"] = data[0] + result["files"] = list() + result["dirs"] = list() + for i in range(1, len(data)): + fd = data[i] + if fd[0] == "f": + result["files"].append(fd[1:]) + else: + result["dirs"].append(fd[1:]) + return result + + def pushFSfile(self, filename, file): + """ + Upload file to fs + root path should /flash or /sdcard + + parameters: + filename (str) : name of the file + file (bytes) : file contents as byte array + + returns: + bool : true if file was uploaded + """ + + payload = filename.encode(encoding='ascii') + b"\x00" + file + data = self.sendPacket(WebUSBPacket(Commands.WRITEFILE, self.getMessageId(), payload)) + return data.decode().rstrip('\x00') == "ok" + + def appfsUpload(self, appname, file): + """ + Upload app to appfs + + parameters: + appname (str) : name of the app + file (bytes) : the app to be upload + + returns: + bool : true if app was uploaded succesfully + """ + + payload = appname.encode(encoding="ascii") + b"\x00" + file + data = self.sendPacket(WebUSBPacket(Commands.APPFSWRITE, self.getMessageId(), payload), show_response_hourglass=True) + return data.decode().rstrip('\x00') == "ok" + + def appfsRemove(self, appname): + """ + Remove app from appfs + + parameters: + appname (str) : name of the app to be removed + + returns: + bool : true if app was deleted succesfully + """ + + payload = appname.encode(encoding="ascii") + b"\x00" + data = self.sendPacket(WebUSBPacket(Commands.APPFSDEL, self.getMessageId(), payload)) + return data.decode().rstrip('\x00') == "ok" + + def appfsExecute(self, appname): + """ + Execute app from appfs + + parameters: + appname (str) : name of the app to be executed + + returns: + bool : true if app was executed + + """ + + + payload = appname.encode(encoding="ascii") + b"\x00" + data = self.sendPacket(WebUSBPacket(Commands.APPFSBOOT, self.getMessageId(), payload)) + return data.decode().rstrip('\x00') == "ok" + + def appfsList(self): + """ + Get apps from appfs + + parameters: + None + + returns: + list : list of dicts containing 'name' and 'size' + """ + + data = self.sendPacket(WebUSBPacket(Commands.APPFSFDIR, self.getMessageId())) + num_apps, = struct.unpack_from("= 20: + header = struct.unpack(" 0: + try: + new_data = bytes(esp32_ep_in.read(esp32_ep_in.wMaxPacketSize, 20)) + data += new_data + if len(new_data) > 0: + t = 20 + except Exception as e: + t-=1 + +parse_response(data) + +device.ctrl_transfer(request_type_out, REQUEST_STATE, 0x0000, webusb_esp32.bInterfaceNumber) diff --git a/app/resources/plugins/MCH22BToolchain/mch2022-tools/webusb_fat_push.py b/app/resources/plugins/MCH22BToolchain/mch2022-tools/fsoverbus/webusb_fat_push.py similarity index 100% rename from app/resources/plugins/MCH22BToolchain/mch2022-tools/webusb_fat_push.py rename to app/resources/plugins/MCH22BToolchain/mch2022-tools/fsoverbus/webusb_fat_push.py diff --git a/app/resources/plugins/MCH22BToolchain/mch2022-tools/webusb_push.py b/app/resources/plugins/MCH22BToolchain/mch2022-tools/fsoverbus/webusb_push.py similarity index 100% rename from app/resources/plugins/MCH22BToolchain/mch2022-tools/webusb_push.py rename to app/resources/plugins/MCH22BToolchain/mch2022-tools/fsoverbus/webusb_push.py diff --git a/app/resources/plugins/MCH22BToolchain/mch2022-tools/webusb_reset.py b/app/resources/plugins/MCH22BToolchain/mch2022-tools/fsoverbus/webusb_reset.py similarity index 100% rename from app/resources/plugins/MCH22BToolchain/mch2022-tools/webusb_reset.py rename to app/resources/plugins/MCH22BToolchain/mch2022-tools/fsoverbus/webusb_reset.py diff --git a/app/resources/plugins/MCH22BToolchain/mch2022-tools/webusb_rm.py b/app/resources/plugins/MCH22BToolchain/mch2022-tools/fsoverbus/webusb_rm.py similarity index 100% rename from app/resources/plugins/MCH22BToolchain/mch2022-tools/webusb_rm.py rename to app/resources/plugins/MCH22BToolchain/mch2022-tools/fsoverbus/webusb_rm.py diff --git a/app/resources/plugins/MCH22BToolchain/mch2022-tools/information.py b/app/resources/plugins/MCH22BToolchain/mch2022-tools/information.py new file mode 100644 index 000000000..7618567a3 --- /dev/null +++ b/app/resources/plugins/MCH22BToolchain/mch2022-tools/information.py @@ -0,0 +1,36 @@ +#!/usr/bin/env python3 + +from webusb import * +import argparse +import sys +import time + +parser = argparse.ArgumentParser(description='MCH2022 badge filesystem info tool') +args = parser.parse_args() + +badge = Badge() + +if not badge.begin(): + print("Failed to connect") + sys.exit(1) + +try: + info = badge.info().split(" ") + print("Device name: {}".format(info[0])) + print("Firmware version: {}".format(info[1])) +except: + pass + +result = badge.fs_state() + +internal_size = result["internal"]["size"] +internal_free = result["internal"]["free"] +sdcard_size = result["sd"]["size"] +sdcard_free = result["sd"]["free"] +appfs_size = result["app"]["size"] +appfs_free = result["app"]["free"] + +badge.printProgressBar(appfs_size - appfs_free, appfs_size, 'Internal APP filesystem: {: <32}'.format('{} of {} KB used'.format(int((appfs_size-appfs_free) / 1024), int(appfs_size / 1024))), '', 0, printEnd="\r\n") +badge.printProgressBar(internal_size - internal_free, internal_size, 'Internal FAT filesystem: {: <32}'.format('{} of {} KB used'.format(int((internal_size-internal_free) / 1024), int(internal_size / 1024))), '', 0, printEnd="\r\n") +if sdcard_size > 0: + badge.printProgressBar(sdcard_size - sdcard_free, sdcard_size, 'SD card FAT filesystem: {: <32}'.format('{} of {} KB used'.format(int((sdcard_size-sdcard_free) / 1024), int(sdcard_size / 1024))), '', 0, printEnd="\r\n") diff --git a/app/resources/plugins/MCH22BToolchain/mch2022-tools/shell.nix b/app/resources/plugins/MCH22BToolchain/mch2022-tools/shell.nix new file mode 100644 index 000000000..f428ec428 --- /dev/null +++ b/app/resources/plugins/MCH22BToolchain/mch2022-tools/shell.nix @@ -0,0 +1,7 @@ +{ pkgs ? import {} }: + +pkgs.mkShell { + packages = [ + (pkgs.python3.withPackages (ps: [ ps.pyusb ])) + ]; +} diff --git a/app/resources/plugins/MCH22BToolchain/mch2022-tools/webusb.py b/app/resources/plugins/MCH22BToolchain/mch2022-tools/webusb.py old mode 100644 new mode 100755 index e0b8b8d68..73ed159e5 --- a/app/resources/plugins/MCH22BToolchain/mch2022-tools/webusb.py +++ b/app/resources/plugins/MCH22BToolchain/mch2022-tools/webusb.py @@ -1,329 +1,751 @@ +#!/usr/bin/env python3 + import os import usb.core import usb.util -from enum import Enum -import struct +import binascii import time +import sys +import struct +from datetime import datetime + +class Badge: + # Defined in webusb_task.c of the RP2040 firmware + REQUEST_STATE = 0x22 + REQUEST_RESET = 0x23 + REQUEST_BAUDRATE = 0x24 + REQUEST_MODE = 0x25 + REQUEST_MODE_GET = 0x26 + REQUEST_FW_VERSION_GET = 0x27 + + # Defined in main.c of the ESP32 firmware + BOOT_MODE_NORMAL = 0x00 + BOOT_MODE_WEBUSB_LEGACY = 0x01 + BOOT_MODE_FPGA_DOWNLOAD = 0x02 + BOOT_MODE_WEBUSB = 0x03 + + MAGIC = 0xFEEDF00D -# Print iterations progress -def printProgressBar (iteration, total, prefix = '', suffix = '', decimals = 1, length = 100, fill = '█', printEnd = "\r"): - """ - Call in a loop to create terminal progress bar - @params: - iteration - Required : current iteration (Int) - total - Required : total iterations (Int) - prefix - Optional : prefix string (Str) - suffix - Optional : suffix string (Str) - decimals - Optional : positive number of decimals in percent complete (Int) - length - Optional : character length of bar (Int) - fill - Optional : bar fill character (Str) - printEnd - Optional : end character (e.g. "\r", "\r\n") (Str) - """ - percent = ("{0:." + str(decimals) + "f}").format(100 * (iteration / float(total))) - filledLength = int(length * iteration // total) - bar = fill * filledLength + '-' * (length - filledLength) - print(f'\r{prefix} |{bar}| {percent}% {suffix}', end = printEnd) - # Print New Line on Complete - if iteration == total: - print() - -def printProgressTime( - totalMilliSeconds, - steps=10, - prefix="", - suffix="", - decimals=1, - length=20, - fill="█", - printEnd="\r", -): - """ - prints progress while sleeping totalMilliSeconds, bar is updated every `steps` milliseconds - other params - """ - for i in range(0, totalMilliSeconds, steps): - printProgressBar( - i, - totalMilliSeconds - steps, - prefix, - suffix, - decimals, - length, - fill, - printEnd, - ) - time.sleep(steps / 1000) - -class Commands(Enum): - EXECFILE = 0 - HEARTBEAT = 1 - APPFSBOOT = 3 - GETDIR = 4096 - READFILE = 4097 - WRITEFILE = 4098 - DELFILE = 4099 - DUPLFILE = 4100 - MVFILE = 4101 - MAKEDIR = 4102 - APPFSFDIR = 4103 - APPFSDEL = 4104 - APPFSWRITE = 4105 - -class WebUSBPacket(): - def __init__(self, command, message_id, payload=None): - self.command = command - self.message_id = message_id - if (payload == None): - self.payload = b"" - else: - self.payload = payload - - def getMessage(self): - return self._generateHeader() + self.payload - - def _generateHeader(self): - return struct.pack(" 0: + self.esp32_ep_out.write(payload) + + def receive_data(self, timeout = 100): + while timeout > 0: try: - self.ep_in.read(128) + new_data = bytes(self.esp32_ep_in.read(self.esp32_ep_in.wMaxPacketSize, 5)) + self.rx_data += new_data + if len(new_data) > 0: + timeout = 5 except Exception as e: + timeout-=1 + + def receive_packets(self, timeout = 100): + garbage = bytearray() + self.receive_data(timeout) + if len(self.rx_data) < 20: # length of header + return False + while len(self.rx_data) >= 20: + header = struct.unpack("= self.PAYLOADHEADERLEN: - if show_hourglass: - print() - command, payloadlen, verif, message_id = struct.unpack_from("= (payloadlen + self.PAYLOADHEADERLEN): - return (command, message_id, response[self.PAYLOADHEADERLEN:]) - raise Exception("Timeout in receiving") - + #print("Receive payload length", command_ascii, payload_length) + self.rx_data = self.rx_data[20:] + payload = self.rx_data[:payload_length] + self.rx_data = self.rx_data[payload_length:] + payload_crc_check = binascii.crc32(payload) + if payload_crc != payload_crc_check: + #print("payload", payload) + print("Payload CRC doesn't match {:08X} {:08X}".format(payload_crc, payload_crc_check)) + #continue + self.packets.append({ + "identifier": identifier, + "command": command_ascii, + "payload": payload + }) + if len(garbage) > 0 and self.printGarbage: + print("Garbage:", garbage, garbage.decode("ascii", "ignore")) + return True - def sendPacket(self, packet, transfersize=2048, show_response_hourglass=False): - transfersize = 2048 - msg = packet.getMessage() - starttime = time.time() - - if len(msg) > transfersize: - printProgressBar(0, len(msg) // transfersize, length=20) - for i in range(0, len(msg), transfersize): - self.ep_out.write(msg[i:i+transfersize]) - time.sleep(0.01) - if len(msg) > transfersize: - printProgressBar(i//transfersize, len(msg) // transfersize, length=20) - #self.ep_out.write(packet.getMessage()) - print(f"transfer speed: {(len(msg)/(time.time()-starttime)/1024):0.2f} kb/s") - command, message_id, data = self.receiveResponse(show_hourglass=show_response_hourglass) - if message_id != packet.message_id: - raise Exception("Mismatch in id") - if command != packet.command.value: - raise Exception("Mismatch in command") - return data - - def sendHeartbeat(self): - """ - Send heartbeat towards the badges - - parameters: - None - - returns: - bool : True if badge responded, false if no response - """ - - data = self.sendPacket(WebUSBPacket(Commands.HEARTBEAT, self.getMessageId())) - return data.decode().rstrip('\x00') == "ok" - - def getFSDir(self, dir): - """ - Get files and directories on the badge filesystem - - parameters: - dir (str) : Directory to get contents from - - returns: - res (dict) : Dict containing 'dir' which is the requested directory, 'files' list of files in the directory, 'dirs' list of directories - - """ - - data = self.sendPacket(WebUSBPacket(Commands.GETDIR, self.getMessageId(), dir.encode(encoding='ascii'))) - data = data.decode() - data = data.split("\n") - result = dict() - result["dir"] = data[0] - result["files"] = list() - result["dirs"] = list() - for i in range(1, len(data)): - fd = data[i] - if fd[0] == "f": - result["files"].append(fd[1:]) - else: - result["dirs"].append(fd[1:]) - return result - - def pushFSfile(self, filename, file): - """ - Upload file to fs - root path should /flash or /sdcard - - parameters: - filename (str) : name of the file - file (bytes) : file contents as byte array - - returns: - bool : true if file was uploaded - """ - - payload = filename.encode(encoding='ascii') + b"\x00" + file - data = self.sendPacket(WebUSBPacket(Commands.WRITEFILE, self.getMessageId(), payload)) - return data.decode().rstrip('\x00') == "ok" - - def appfsUpload(self, appname, file): - """ - Upload app to appfs - - parameters: - appname (str) : name of the app - file (bytes) : the app to be upload - - returns: - bool : true if app was uploaded succesfully - """ - - payload = appname.encode(encoding="ascii") + b"\x00" + file - data = self.sendPacket(WebUSBPacket(Commands.APPFSWRITE, self.getMessageId(), payload), show_response_hourglass=True) - return data.decode().rstrip('\x00') == "ok" + def receive_packet(self, timeout = 100): + self.receive_packets(timeout) + packet = None + if len(self.packets) > 0: + packet = self.packets[0] + self.packets = self.packets[1:] + return packet - def appfsRemove(self, appname): - """ - Remove app from appfs - - parameters: - appname (str) : name of the app to be removed + def peek_packet(self, timeout = 100): + self.receive_packets(timeout) + packet = None + if len(self.packets) > 0: + packet = self.packets[0] + return packet + + def sync(self): + self.receive_packets() + self.packets = [] + self.send_packet(b"SYNC") + response = self.receive_packet() + if not response: + return False + if not response["command"] == b"SYNC": + return False + if len(response["payload"]) != 2 or (struct.unpack(" Install experimental firmware on your badge.") + print() + print("The tools for older firmwares can be found in the 'fsoverbus' subfolder of this repository") + sys.exit(1) + return False + return True + + def ping(self, payload): + self.send_packet(b"PING", payload) + response = self.receive_packet() + if not response: + return False + if not response["command"] == b"PING": + print("No PING", response["command"]) + return False + if not response["payload"] == payload: + print("Payload mismatch", payload, response["payload"]) + for i in range(len(payload)): + print(i, int(payload[i]), int(response["payload"][i])) + return False + return True + + def info(self): + self.send_packet(b"INFO") + response = self.receive_packet() + if not response: + return False + if not response["command"] == b"INFO": + print("No INFO", response["command"]) + return False + return response["payload"].decode("ascii", "ignore") + + def fs_list(self, payload): + self.send_packet(b"FSLS", payload + b"\0") + response = self.receive_packet() + if not response: + print("No response") + return None + if not response["command"] == b"FSLS": + if not response["command"] == b"ERR5": # Failed to open directory + print("No FSLS", response["command"]) + return None + payload = response["payload"] + + output = [] - returns: - bool : true if app was deleted succesfully - """ + while len(payload) > 0: + data = payload[:1 + 4] + payload = payload[1 + 4:] + (item_type, item_name_length) = struct.unpack(" 0: + name_length = header = struct.unpack(" 0: + namespace_name_length = struct.unpack("