Skip to content

Commit

Permalink
Update bk7231tools, migrate to KVStorage
Browse files Browse the repository at this point in the history
  • Loading branch information
kuba2k2 committed Nov 18, 2023
1 parent 73680dc commit 835eaea
Show file tree
Hide file tree
Showing 4 changed files with 387 additions and 322 deletions.
40 changes: 25 additions & 15 deletions ltctplugin/upk2esphome/work.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
# Copyright (c) Kuba Szczodrzyński 2023-5-13.

from logging import debug, warning
from logging import debug, error, warning
from os import stat
from os.path import isfile
from pprint import pprint
from socket import gethostbyname
from time import sleep
from typing import Callable
from urllib.parse import urlparse

import requests
from bk7231tools.analysis.storage import TuyaStorage
from bk7231tools.analysis.kvstorage import KVStorage
from ltchiptool.gui.work.base import BaseThread
from ltchiptool.util.flash import ClickProgressCallback
from ltchiptool.util.logging import LoggingHandler
Expand Down Expand Up @@ -102,9 +103,9 @@ def run_file(self, file: str):
self.on_error("File not found")
return
size = stat(file).st_size
if size > 0x200000:
if size > 0x400000:
# file too large
self.on_error("File larger than 2 MiB, refusing to load!")
self.on_error("File larger than 4 MiB, refusing to load!")
return
if size == 0x200000:
# probably full flash dump
Expand All @@ -121,25 +122,34 @@ def run_file(self, file: str):

def run_data(self, data: bytes):
# parse raw storage
st = TuyaStorage()
if st.load_raw(data, allow_incomplete=True) is None:
result = KVStorage.find_storage(data)
if not result:
self.on_error("File doesn't contain known storage area")
return
if not st.decrypt():
self.on_error("Couldn't decrypt the storage area")

_, data = result
try:
kvs = KVStorage.decrypt_and_unpack(data)
except Exception as e:
self.on_error("Couldn't parse storage data - see program logs")
error("Storage keys failed", exc_info=e)
return
keys = st.find_all_keys()

keys = list(kvs.indexes.keys())
debug(f"Found {len(keys)} keys! {keys}")
if not keys:
self.on_error("No keys found in storage! Is the data corrupt?")
try:
from hexdump import hexdump
pprint(kvs)
return

hexdump(st.data)
except (ImportError, ModuleNotFoundError):
pass
try:
storage = kvs.read_all_values_parsed()
except Exception as e:
self.on_error("Couldn't extract storage data - see program logs")
error("Storage values failed", exc_info=e)
pprint(kvs)
return
storage = st.read_all_keys()

self.on_storage(storage)

def stop(self):
Expand Down
Loading

0 comments on commit 835eaea

Please sign in to comment.