Skip to content

Commit

Permalink
Update OTA image tool (#20491)
Browse files Browse the repository at this point in the history
* Update OTA image tool

- allow removing headers
- allow re-writing headers

* Read/write in chunks
  • Loading branch information
cecille authored Jul 13, 2022
1 parent b682834 commit cbabee6
Showing 1 changed file with 100 additions and 6 deletions.
106 changes: 100 additions & 6 deletions src/app/ota_image_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
from chip.tlv import TLVReader, TLVWriter, uint # noqa: E402

HEADER_MAGIC = 0x1BEEF11E
FIXED_HEADER_FORMAT = '<IQI'

DIGEST_ALGORITHM_ID = dict(
sha256=1,
Expand Down Expand Up @@ -176,10 +177,9 @@ def generate_header(header_tlv: bytes, payload_size: int):
Generate OTA image header
"""

fixed_header_format = '<IQI'
fixed_header = struct.pack(fixed_header_format,
fixed_header = struct.pack(FIXED_HEADER_FORMAT,
HEADER_MAGIC,
struct.calcsize(fixed_header_format) +
struct.calcsize(FIXED_HEADER_FORMAT) +
len(header_tlv) + payload_size,
len(header_tlv))

Expand Down Expand Up @@ -219,15 +219,39 @@ def parse_header(args: object):
"""

with open(args.image_file, 'rb') as file:
fixed_header_format = '<IQI'
fixed_header = file.read(struct.calcsize(fixed_header_format))
fixed_header = file.read(struct.calcsize(FIXED_HEADER_FORMAT))
magic, total_size, header_size = struct.unpack(
fixed_header_format, fixed_header)
FIXED_HEADER_FORMAT, fixed_header)
header_tlv = TLVReader(file.read(header_size)).get()['Any']

return magic, total_size, header_size, header_tlv


def full_header_size(args: object) -> int:
"""
Returns the size of the fixed header + header
"""
_magic, _total_size, header_size, _header_tlv = parse_header(args)
return struct.calcsize(FIXED_HEADER_FORMAT) + header_size


def read_chunk(file, size=1024):
while data := file.read(size):
yield data


def remove_header(args: object) -> None:
"""
Removes the header from args.image_file and writes to args.output_file
"""
image_start = full_header_size(args)
with open(args.image_file, 'rb') as file:
with open(args.output_file, 'wb') as outfile:
file.seek(image_start)
for chunk in read_chunk(file):
outfile.write(chunk)


def show_header(args: object):
"""
Parse and present OTA image header in human-readable form
Expand All @@ -252,6 +276,47 @@ def show_header(args: object):
print(f' [{tag}] {tag_name}: {value}')


def update_header_args(args: object) -> None:
"""
Generates an image file with a new header
New header values can be specified in args, otherwise the values from args.image_file are used.
The new file is written out to args.output_file.
"""
_magic, _total_size, _header_size, header_tlv = parse_header(args)

payload_size = header_tlv[HeaderTag.PAYLOAD_SIZE]
payload_digest = header_tlv[HeaderTag.DIGEST]

if args.vendor_id is None:
args.vendor_id = header_tlv[HeaderTag.VENDOR_ID]
if args.product_id is None:
args.product_id = header_tlv[HeaderTag.PRODUCT_ID]
if args.version is None:
args.version = header_tlv[HeaderTag.VERSION]
if args.version_str is None:
args.version_str = header_tlv[HeaderTag.VERSION_STRING]
if args.digest_algorithm is None:
val = header_tlv[HeaderTag.DIGEST_TYPE]
args.digest_algorithm = next(key for key, value in DIGEST_ALGORITHM_ID.items() if value == val)
if args.min_version is None and HeaderTag.MIN_VERSION in header_tlv:
args.min_version = header_tlv[HeaderTag.MIN_VERSION]
if args.max_version is None and HeaderTag.MAX_VERSION in header_tlv:
args.max_version = header_tlv[HeaderTag.MAX_VERSION]
if args.release_notes is None and HeaderTag.RELEASE_NOTES_URL in header_tlv:
args.release_notes = header_tlv[HeaderTag.RELEASE_NOTES_URL]

new_header_tlv = generate_header_tlv(args, payload_size, payload_digest)
header = generate_header(new_header_tlv, payload_size)

with open(args.image_file, 'rb') as infile:
with open(args.output_file, 'wb') as outfile:
outfile.write(header)
infile.seek(full_header_size(args))
for chunk in read_chunk(infile):
outfile.write(chunk)


def main():
def any_base_int(s): return int(s, 0)

Expand Down Expand Up @@ -284,13 +349,42 @@ def any_base_int(s): return int(s, 0)
show_parser = subcommands.add_parser('show', help='Show OTA image info')
show_parser.add_argument('image_file', help='Path to OTA image file')

extract_tool = subcommands.add_parser('extract', help='Remove the OTA header from an image file')
extract_tool.add_argument('image_file', help='Path to OTA image file with header')
extract_tool.add_argument('output_file', help='Path to put the output file (no header)')

change_tool = subcommands.add_parser('change_header', help='Change the specified values in the header')
change_tool.add_argument('-v', '--vendor-id', type=any_base_int,
help='Vendor ID')
change_tool.add_argument('-p', '--product-id', type=any_base_int,
help='Product ID')
change_tool.add_argument('-vn', '--version', type=any_base_int,
help='Software version (numeric)')
change_tool.add_argument('-vs', '--version-str',
help='Software version (string)')
change_tool.add_argument('-da', '--digest-algorithm', choices=DIGEST_ALL_ALGORITHMS,
help='Digest algorithm')
change_tool.add_argument('-mi', '--min-version', type=any_base_int,
help='Minimum software version that can be updated to this image')
change_tool.add_argument('-ma', '--max-version', type=any_base_int,
help='Maximum software version that can be updated to this image')
change_tool.add_argument(
'-rn', '--release-notes', help='Release note URL')
change_tool.add_argument('image_file',
help='Path to input OTA file')
change_tool.add_argument('output_file', help='Path to output OTA file')

args = parser.parse_args()

if args.subcommand == 'create':
validate_header_attributes(args)
generate_image(args)
elif args.subcommand == 'show':
show_header(args)
elif args.subcommand == 'extract':
remove_header(args)
elif args.subcommand == 'change_header':
update_header_args(args)


if __name__ == "__main__":
Expand Down

0 comments on commit cbabee6

Please sign in to comment.