diff --git a/ltctplugin/upk2esphome/cli.py b/ltctplugin/upk2esphome/cli.py index c1c0441..0cd90d2 100644 --- a/ltctplugin/upk2esphome/cli.py +++ b/ltctplugin/upk2esphome/cli.py @@ -106,7 +106,6 @@ def kickstart(url: str, output: click.File, **kwargs): work = UpkThread( url=url, on_storage=lambda data: write_upk(data, output, **kwargs), - on_error=error, ) work.start() @@ -155,7 +154,6 @@ def firmware(file: click.File, output: click.File, **kwargs): work = UpkThread( file=file.name, on_storage=lambda data: write_upk(data, output, **kwargs), - on_error=error, ) work.start() diff --git a/ltctplugin/upk2esphome/gui.py b/ltctplugin/upk2esphome/gui.py index 6b146ae..1fbb632 100644 --- a/ltctplugin/upk2esphome/gui.py +++ b/ltctplugin/upk2esphome/gui.py @@ -232,15 +232,6 @@ def OnStorageData(self, storage: dict): self.data = storage self.DoUpdate() - def OnStorageError(self, error_text: str): - self.data = None - self.DoUpdate() - wx.MessageBox( - message=error_text, - caption="Error", - style=wx.ICON_ERROR, - ) - def OnSchemaResponse(self, response: dict): self.schema_response = response self.DoUpdate() @@ -261,10 +252,10 @@ def OnFileDrop(self, *files): if not isfile(file): return self.last_dir = dirname(file) + self.data = None work = UpkThread( file=file, on_storage=self.OnStorageData, - on_error=self.OnStorageError, ) self.StartWork(work) @@ -340,10 +331,10 @@ def OnDoKickstart(self, force_custom: bool = False): debug(f"Kickstart URL: {url}") self.last_url = url + self.data = None work = UpkThread( url=url, on_storage=self.OnStorageData, - on_error=self.OnStorageError, ) self.StartWork(work) @@ -392,10 +383,10 @@ def OnDoDumpClick(self): return file = dialog.GetPath() self.last_dir = dirname(file) + self.data = None work = UpkThread( file=file, on_storage=self.OnStorageData, - on_error=self.OnStorageError, ) self.StartWork(work) @@ -431,7 +422,6 @@ def OnSchemaDownloadClick(self): and self.last_result.config.schema_id ), on_success=self.OnSchemaResponse, - on_error=self.OnSchemaError, ) self.StartWork(work) diff --git a/ltctplugin/upk2esphome/work.py b/ltctplugin/upk2esphome/work.py index f1b0cad..3f41b62 100644 --- a/ltctplugin/upk2esphome/work.py +++ b/ltctplugin/upk2esphome/work.py @@ -1,6 +1,6 @@ # Copyright (c) Kuba SzczodrzyƄski 2023-5-13. -from logging import debug, error, warning +from logging import debug, warning from os import stat from os.path import isfile from pprint import pprint @@ -13,7 +13,6 @@ from bk7231tools.analysis.kvstorage import KVStorage from ltchiptool.gui.work.base import BaseThread from ltchiptool.util.flash import ClickProgressCallback -from ltchiptool.util.logging import LoggingHandler class UpkThread(BaseThread): @@ -22,13 +21,11 @@ def __init__( file: str = None, url: str = None, on_storage: Callable[[dict], None] = None, - on_error: Callable[[str], None] = None, ): super().__init__() self.file = file self.url = url self.on_storage = on_storage - self.on_error = on_error def run_impl(self): if self.file is not None: @@ -44,8 +41,7 @@ def run_url(self, url: str): url = urlparse(url) url = url.netloc if not url: - self.on_error(f"Invalid URL: {url}") - return + raise ValueError(f"Invalid URL: {url}") offset = 0x1E0000 start = 0x1E0000 - offset @@ -58,10 +54,8 @@ def run_url(self, url: str): bar.on_message(f"Resolving {url}...") try: ip = gethostbyname(url) - except Exception as e: - LoggingHandler.get().emit_exception(e, no_hook=True) - self.on_error(f"Couldn't find hostname: {url}") - return + except Exception: + raise ConnectionError(f"Couldn't find hostname: {url}") url = f"http://{ip}/hub/flash_read" params = dict(offset=0, length=init_size) @@ -70,11 +64,10 @@ def run_url(self, url: str): with requests.get(url, params, timeout=5.0) as r: data = r.content if len(data) != init_size: - self.on_error( + raise RuntimeError( f"Incomplete response read: {len(data)}/{init_size}\n\n" f"Is the chip running Kickstart firmware?" ) - return while start < end and self.should_run(): bar.on_message(f"Reading from 0x{offset + start:06X}") @@ -100,13 +93,11 @@ def run_url(self, url: str): def run_file(self, file: str): # read storage from file if not isfile(file): - self.on_error("File not found") - return + raise FileNotFoundError(f"File '{file}' not found") size = stat(file).st_size if size > 0x400000: # file too large - self.on_error("File larger than 4 MiB, refusing to load!") - return + raise RuntimeError(f"File larger than 4 MiB ({size}), refusing to load!") if size == 0x200000: # probably full flash dump with open(file, "rb") as f: @@ -124,31 +115,33 @@ def run_data(self, data: bytes): # parse raw storage result = KVStorage.find_storage(data) if not result: - self.on_error("File doesn't contain known storage area") - return + raise ValueError("File doesn't contain known storage area") _, data = result try: kvs = KVStorage.decrypt_and_unpack(data) - except Exception as e: - self.on_error("Couldn't parse storage data - see program logs") - error("Storage keys failed", exc_info=e) - return + except Exception: + raise RuntimeError("Couldn't unpack storage data - see program logs") keys = list(kvs.indexes.keys()) debug(f"Found {len(keys)} keys! {keys}") if not keys: - self.on_error("No keys found in storage! Is the data corrupt?") - pprint(kvs) - return + # noinspection PyBroadException + try: + pprint(kvs) + except Exception: + pass + raise RuntimeError("No keys found in storage! Is the data corrupt?") try: storage = kvs.read_all_values_parsed() - except Exception as e: - self.on_error("Couldn't extract storage data - see program logs") - error("Storage values failed", exc_info=e) - pprint(kvs) - return + except Exception: + # noinspection PyBroadException + try: + pprint(kvs) + except Exception: + pass + raise RuntimeError("Couldn't parse storage data - see program logs") self.on_storage(storage) diff --git a/ltctplugin/upk2esphome/work_schema.py b/ltctplugin/upk2esphome/work_schema.py index 9262d1a..d9c6724 100644 --- a/ltctplugin/upk2esphome/work_schema.py +++ b/ltctplugin/upk2esphome/work_schema.py @@ -20,7 +20,6 @@ def __init__( license_: dict | None, schema_id: str | None, on_success: Callable[[dict], None] = None, - on_error: Callable[[str], None] = None, ): super().__init__() self.device = device or {} @@ -28,7 +27,6 @@ def __init__( self.license = license_ or {} self.schema_id = schema_id self.on_success = on_success - self.on_error = on_error def run_impl(self): auth = (SCHEMA_PULL_USER, SCHEMA_PULL_PASS) @@ -48,14 +46,11 @@ def run_impl(self): and not self.device.get("productKey", None) and not self.device.get("factoryPin", None) ): - self.on_error("Missing FK/PK/FC") - return + raise ValueError("Missing FK/PK/FC") if self.license.get("uuid", None) and not self.license.get("authKey", None): - self.on_error("Missing License Auth Key") - return + raise ValueError("Missing License Auth Key") if not self.license.get("uuid", None) and self.license.get("authKey", None): - self.on_error("Missing License UUID") - return + raise ValueError("Missing License UUID") data = dict( device=self.device, @@ -64,8 +59,7 @@ def run_impl(self): ) with requests.post(url=SCHEMA_PULL_URL, json=data, auth=auth) as r: if r.status_code != 200: - self.on_error( + raise RuntimeError( r.json().get("message", f"Response status code {r.status_code}"), ) - return self.on_success(r.json())