| #!/usr/bin/env python3 |
| |
| # |
| # Copyright (c) 2022 Project CHIP Authors |
| # All rights reserved. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| """ |
| Matter OTA (Over-the-air update) image utility. |
| |
| Usage examples: |
| |
| Creating OTA image file: |
| ./ota_image_tool.py create -v 0xDEAD -p 0xBEEF -vn 1 -vs "1.0" -da sha256 my-firmware.bin my-firmware.ota |
| |
| Showing OTA image file info: |
| ./ota_image_tool.py show my-firmware.ota |
| """ |
| |
| import argparse |
| import hashlib |
| import os |
| import struct |
| import sys |
| from enum import IntEnum |
| |
| sys.path.insert(0, os.path.join( |
| os.path.dirname(__file__), '../controller/python')) |
| from chip.tlv import TLVReader, TLVWriter, uint # noqa: E402 isort:skip |
| |
| HEADER_MAGIC = 0x1BEEF11E |
| FIXED_HEADER_FORMAT = '<IQI' |
| |
| DIGEST_ALGORITHM_ID = dict( |
| sha256=1, |
| sha256_128=2, |
| sha256_120=3, |
| sha256_96=4, |
| sha256_64=5, |
| sha256_32=6, |
| sha384=7, |
| sha512=8, |
| sha3_224=9, |
| sha3_256=10, |
| sha3_384=11, |
| sha3_512=12, |
| ) |
| |
| DIGEST_ALL_ALGORITHMS = hashlib.algorithms_available.intersection( |
| DIGEST_ALGORITHM_ID.keys()) |
| |
| # Buffer size used for file reads to ensure large files do not need to be loaded |
| # into memory fully before processing. |
| PAYLOAD_BUFFER_SIZE = 16 * 1024 |
| |
| |
| class HeaderTag(IntEnum): |
| VENDOR_ID = 0 |
| PRODUCT_ID = 1 |
| VERSION = 2 |
| VERSION_STRING = 3 |
| PAYLOAD_SIZE = 4 |
| MIN_VERSION = 5 |
| MAX_VERSION = 6 |
| RELEASE_NOTES_URL = 7 |
| DIGEST_TYPE = 8 |
| DIGEST = 9 |
| |
| |
| def warn(message: str): |
| sys.stderr.write(f'warning: {message}\n') |
| |
| |
| def error(message: str): |
| sys.stderr.write(f'error: {message}\n') |
| sys.exit(1) |
| |
| |
| def validate_header_attributes(args: object): |
| """ |
| Validate attributes to be stored in OTA image header |
| """ |
| |
| if args.vendor_id == 0: |
| error('Vendor ID is zero') |
| |
| if args.product_id == 0: |
| error('Product ID is zero') |
| |
| if not 1 <= len(args.version_str) <= 64: |
| error('Software version string is not of length 1-64') |
| |
| if args.min_version is not None and args.min_version >= args.version: |
| error('Minimum applicable version is greater or equal to software version') |
| |
| if args.max_version is not None and args.max_version >= args.version: |
| error('Maximum applicable version is greater or equal to software version') |
| |
| if args.min_version is not None and args.max_version is not None and args.max_version < args.min_version: |
| error('Minimum applicable version is greater than maximum applicable version') |
| |
| if args.release_notes is not None: |
| if not 1 <= len(args.release_notes) <= 256: |
| error('Release notes URL must be of length 1-256') |
| |
| if not args.release_notes.startswith('https://'): |
| warn('Release notes URL does not start with "https://"') |
| |
| |
| def generate_payload_summary(args: object): |
| """ |
| Calculate total size and hash of all concatenated input payload files |
| """ |
| |
| total_size = 0 |
| digest = hashlib.new(args.digest_algorithm) |
| |
| if digest.digest_size < (256 // 8): |
| warn('Using digest length below 256 bits is not recommended') |
| |
| for path in args.input_files: |
| with open(path, 'rb') as file: |
| while True: |
| chunk = file.read(PAYLOAD_BUFFER_SIZE) |
| if not chunk: |
| break |
| total_size += len(chunk) |
| digest.update(chunk) |
| |
| return total_size, digest.digest() |
| |
| |
| def generate_header_tlv(args: object, payload_size: int, payload_digest: bytes): |
| """ |
| Generate anonymous TLV structure with fields describing the OTA image contents |
| """ |
| |
| fields = { |
| HeaderTag.VENDOR_ID: uint(args.vendor_id), |
| HeaderTag.PRODUCT_ID: uint(args.product_id), |
| HeaderTag.VERSION: uint(args.version), |
| HeaderTag.VERSION_STRING: args.version_str, |
| HeaderTag.PAYLOAD_SIZE: uint(payload_size), |
| HeaderTag.DIGEST_TYPE: uint(DIGEST_ALGORITHM_ID[args.digest_algorithm]), |
| HeaderTag.DIGEST: payload_digest, |
| } |
| |
| if args.min_version is not None: |
| fields.update({HeaderTag.MIN_VERSION: uint(args.min_version)}) |
| |
| if args.max_version is not None: |
| fields.update({HeaderTag.MAX_VERSION: uint(args.max_version)}) |
| |
| if args.release_notes is not None: |
| fields.update({HeaderTag.RELEASE_NOTES_URL: args.release_notes}) |
| |
| writer = TLVWriter() |
| writer.put(None, fields) |
| |
| return writer.encoding |
| |
| |
| def generate_header(header_tlv: bytes, payload_size: int): |
| """ |
| Generate OTA image header |
| """ |
| |
| fixed_header = struct.pack(FIXED_HEADER_FORMAT, |
| HEADER_MAGIC, |
| struct.calcsize(FIXED_HEADER_FORMAT) + |
| len(header_tlv) + payload_size, |
| len(header_tlv)) |
| |
| return fixed_header + header_tlv |
| |
| |
| def write_image(args: object, header: bytes): |
| """ |
| Write OTA image file consisting of header and concatenated payload files |
| """ |
| |
| with open(args.output_file, 'wb') as out_file: |
| out_file.write(header) |
| |
| for path in args.input_files: |
| with open(path, 'rb') as file: |
| while True: |
| chunk = file.read(PAYLOAD_BUFFER_SIZE) |
| if not chunk: |
| break |
| out_file.write(chunk) |
| |
| |
| def generate_image(args: object): |
| """ |
| Generate OTA image header and write it along with payload files to the OTA image file |
| """ |
| payload_size, payload_digest = generate_payload_summary(args) |
| header_tlv = generate_header_tlv(args, payload_size, payload_digest) |
| header = generate_header(header_tlv, payload_size) |
| write_image(args, header) |
| |
| |
| def parse_header(args: object): |
| """ |
| Parse OTA image header |
| """ |
| |
| with open(args.image_file, 'rb') as file: |
| fixed_header = file.read(struct.calcsize(FIXED_HEADER_FORMAT)) |
| magic, total_size, header_size = struct.unpack( |
| FIXED_HEADER_FORMAT, fixed_header) |
| header_tlv = TLVReader(file.read(header_size)).get()['Any'] |
| |
| return magic, total_size, header_size, header_tlv |
| |
| |
| def full_header_size(args: object) -> int: |
| """ |
| Returns the size of the fixed header + header |
| """ |
| _magic, _total_size, header_size, _header_tlv = parse_header(args) |
| return struct.calcsize(FIXED_HEADER_FORMAT) + header_size |
| |
| |
| def read_chunk(file, size=1024): |
| while data := file.read(size): |
| yield data |
| |
| |
| def remove_header(args: object) -> None: |
| """ |
| Removes the header from args.image_file and writes to args.output_file |
| """ |
| image_start = full_header_size(args) |
| with open(args.image_file, 'rb') as file: |
| with open(args.output_file, 'wb') as outfile: |
| file.seek(image_start) |
| for chunk in read_chunk(file): |
| outfile.write(chunk) |
| |
| |
| def show_header(args: object): |
| """ |
| Parse and present OTA image header in human-readable form |
| """ |
| |
| magic, total_size, header_size, header_tlv = parse_header(args) |
| |
| print(f'Magic: {magic:x}') |
| print(f'Total Size: {total_size}') |
| print(f'Header Size: {header_size}') |
| print('Header TLV:') |
| |
| for tag in header_tlv: |
| tag_name = HeaderTag(tag).name.replace('_', ' ').title() |
| value = header_tlv[tag] |
| |
| if isinstance(value, bytes): |
| value = value.hex() |
| elif isinstance(value, int): |
| value = f'{value} (0x{value:x})' |
| |
| print(f' [{tag}] {tag_name}: {value}') |
| |
| |
| def update_header_args(args: object) -> None: |
| """ |
| Generates an image file with a new header |
| |
| New header values can be specified in args, otherwise the values from args.image_file are used. |
| The new file is written out to args.output_file. |
| """ |
| _magic, _total_size, _header_size, header_tlv = parse_header(args) |
| |
| payload_size = header_tlv[HeaderTag.PAYLOAD_SIZE] |
| payload_digest = header_tlv[HeaderTag.DIGEST] |
| |
| if args.vendor_id is None: |
| args.vendor_id = header_tlv[HeaderTag.VENDOR_ID] |
| if args.product_id is None: |
| args.product_id = header_tlv[HeaderTag.PRODUCT_ID] |
| if args.version is None: |
| args.version = header_tlv[HeaderTag.VERSION] |
| if args.version_str is None: |
| args.version_str = header_tlv[HeaderTag.VERSION_STRING] |
| if args.digest_algorithm is None: |
| val = header_tlv[HeaderTag.DIGEST_TYPE] |
| args.digest_algorithm = next(key for key, value in DIGEST_ALGORITHM_ID.items() if value == val) |
| if args.min_version is None and HeaderTag.MIN_VERSION in header_tlv: |
| args.min_version = header_tlv[HeaderTag.MIN_VERSION] |
| if args.max_version is None and HeaderTag.MAX_VERSION in header_tlv: |
| args.max_version = header_tlv[HeaderTag.MAX_VERSION] |
| if args.release_notes is None and HeaderTag.RELEASE_NOTES_URL in header_tlv: |
| args.release_notes = header_tlv[HeaderTag.RELEASE_NOTES_URL] |
| |
| new_header_tlv = generate_header_tlv(args, payload_size, payload_digest) |
| header = generate_header(new_header_tlv, payload_size) |
| |
| with open(args.image_file, 'rb') as infile: |
| with open(args.output_file, 'wb') as outfile: |
| outfile.write(header) |
| infile.seek(full_header_size(args)) |
| for chunk in read_chunk(infile): |
| outfile.write(chunk) |
| |
| |
| def main(): |
| def any_base_int(s): return int(s, 0) |
| |
| parser = argparse.ArgumentParser( |
| description='Matter OTA (Over-the-air update) image utility', fromfile_prefix_chars='@') |
| subcommands = parser.add_subparsers( |
| dest='subcommand', title='valid subcommands', required=True) |
| |
| create_parser = subcommands.add_parser('create', help='Create OTA image') |
| create_parser.add_argument('-v', '--vendor-id', type=any_base_int, |
| required=True, help='Vendor ID') |
| create_parser.add_argument('-p', '--product-id', type=any_base_int, |
| required=True, help='Product ID') |
| create_parser.add_argument('-vn', '--version', type=any_base_int, |
| required=True, help='Software version (numeric)') |
| create_parser.add_argument('-vs', '--version-str', required=True, |
| help='Software version (string)') |
| create_parser.add_argument('-da', '--digest-algorithm', choices=DIGEST_ALL_ALGORITHMS, |
| required=True, help='Digest algorithm') |
| create_parser.add_argument('-mi', '--min-version', type=any_base_int, |
| help='Minimum software version that can be updated to this image') |
| create_parser.add_argument('-ma', '--max-version', type=any_base_int, |
| help='Maximum software version that can be updated to this image') |
| create_parser.add_argument( |
| '-rn', '--release-notes', help='Release note URL') |
| create_parser.add_argument('input_files', nargs='+', |
| help='Path to input image payload file') |
| create_parser.add_argument('output_file', help='Path to output image file') |
| |
| show_parser = subcommands.add_parser('show', help='Show OTA image info') |
| show_parser.add_argument('image_file', help='Path to OTA image file') |
| |
| extract_tool = subcommands.add_parser('extract', help='Remove the OTA header from an image file') |
| extract_tool.add_argument('image_file', help='Path to OTA image file with header') |
| extract_tool.add_argument('output_file', help='Path to put the output file (no header)') |
| |
| change_tool = subcommands.add_parser('change_header', help='Change the specified values in the header') |
| change_tool.add_argument('-v', '--vendor-id', type=any_base_int, |
| help='Vendor ID') |
| change_tool.add_argument('-p', '--product-id', type=any_base_int, |
| help='Product ID') |
| change_tool.add_argument('-vn', '--version', type=any_base_int, |
| help='Software version (numeric)') |
| change_tool.add_argument('-vs', '--version-str', |
| help='Software version (string)') |
| change_tool.add_argument('-da', '--digest-algorithm', choices=DIGEST_ALL_ALGORITHMS, |
| help='Digest algorithm') |
| change_tool.add_argument('-mi', '--min-version', type=any_base_int, |
| help='Minimum software version that can be updated to this image') |
| change_tool.add_argument('-ma', '--max-version', type=any_base_int, |
| help='Maximum software version that can be updated to this image') |
| change_tool.add_argument( |
| '-rn', '--release-notes', help='Release note URL') |
| change_tool.add_argument('image_file', |
| help='Path to input OTA file') |
| change_tool.add_argument('output_file', help='Path to output OTA file') |
| |
| args = parser.parse_args() |
| |
| if args.subcommand == 'create': |
| validate_header_attributes(args) |
| generate_image(args) |
| elif args.subcommand == 'show': |
| show_header(args) |
| elif args.subcommand == 'extract': |
| remove_header(args) |
| elif args.subcommand == 'change_header': |
| update_header_args(args) |
| |
| |
| if __name__ == "__main__": |
| main() |