| #!/usr/bin/env python3 |
| # |
| # Copyright (c) 2025 Nordic Semiconductor ASA |
| # |
| # SPDX-License-Identifier: Apache-2.0 |
| |
| """ |
| This script is used to install TLS credentials on a device via a serial connection. |
| It supports both deleting and writing credentials, as well as checking for their existence. |
| It also verifies the hash of the installed credentials against the expected hash. |
| |
| This script is based on https://github.com/nRFCloud/utils/, specifically |
| "command_interface.py" and "device_credentials_installer.py". |
| """ |
| |
| import argparse |
| import base64 |
| import hashlib |
| import logging |
| import math |
| import os |
| import sys |
| import time |
| |
| import serial |
| |
# Configure logging: timestamped records, INFO level by default.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)

# Terminator sequences that can be appended to every command line sent to the device.
CMD_TERM_DICT = {
    'NULL': '\0',
    'CR': '\r',
    'LF': '\n',
    'CRLF': '\r\n',
}
# 'CR' is the default termination value for the at_host library in the nRF Connect SDK
cmd_term_key = 'CR'

# Credential type names used in the 'cred' shell commands; the integer certificate
# type given on the command line is an index into this list.
TLS_CRED_TYPES = ["CA", "SERV", "PK"]
# Number of base64 characters sent with each 'cred buf' command.
TLS_CRED_CHUNK_SIZE = 48
# Read timeout handed to the serial port; also the amount subtracted from the
# wait budget for every empty read in wait_for_prompt().
serial_timeout = 1
# Serial port handle shared by write_line() and wait_for_prompt(); assigned in main().
ser = None
| |
| |
class TLSCredShellInterface:
    """Drive the device's TLS credentials shell ('cred' commands) over a line transport.

    The transport is injected as two callables, so this class holds no serial state itself.

    Args:
        serial_write_line: Callable taking one command string and sending it to the device.
        serial_wait_for_response: Callable invoked as ``(success_text, failure_text)`` with
            an optional ``store=`` keyword. It must return ``(result, output)``, where
            ``result`` is truthy when the success text was seen and ``output`` is the
            captured line (bytes) matching ``store``, or None.
        verbose: When true, commands sent through write_raw() are logged at debug level.
    """

    def __init__(self, serial_write_line, serial_wait_for_response, verbose):
        self.serial_write_line = serial_write_line
        self.serial_wait_for_response = serial_wait_for_response
        self.verbose = verbose

    @staticmethod
    def _cred_type_is_valid(cred_type):
        """Return True if cred_type is an index into TLS_CRED_TYPES; log an error otherwise."""
        if 0 <= cred_type < len(TLS_CRED_TYPES):
            return True
        logger.error(
            f"Invalid credential type: {cred_type}. Range [0, {len(TLS_CRED_TYPES) - 1}]."
        )
        return False

    def write_raw(self, command):
        """Send a single shell command line, logging it first when verbose is set."""
        if self.verbose:
            logger.debug(f'-> {command}')
        self.serial_write_line(command)

    def write_credential(self, sectag, cred_type, cred_text):
        """Upload a credential into the device buffer and add it under sectag.

        Args:
            sectag: Security tag to store the credential under.
            cred_type: Index into TLS_CRED_TYPES.
            cred_text: Credential contents (PEM text).

        Returns:
            The result reported for the final 'cred add' command; False if cred_type is
            invalid or any chunk could not be stored.
        """
        # Validate before sending anything so that an invalid type does not leave a
        # partially filled credential buffer on the device.
        if not self._cred_type_is_valid(cred_type):
            return False

        # Because the Zephyr shell does not support multi-line commands,
        # we must base-64 encode our PEM strings and install them as if they were binary.
        # Yes, this does mean we are base-64 encoding a string which is already mostly base-64.
        # We could alternatively strip the ===== BEGIN/END XXXX ===== header/footer, and then pass
        # everything else directly as a binary payload (using BIN mode instead of BINT, since
        # MBedTLS uses the NULL terminator to determine if the credential is raw DER, or is a
        # PEM string). But this will fail for multi-CA installs, such as CoAP.

        # text -> bytes -> base64 bytes -> base64 text
        encoded = base64.b64encode(cred_text.encode()).decode()
        self.write_raw("cred buf clear")
        for start in range(0, len(encoded), TLS_CRED_CHUNK_SIZE):
            chunk = encoded[start : start + TLS_CRED_CHUNK_SIZE]
            self.write_raw(f"cred buf {chunk}")
            result, output = self.serial_wait_for_response("Stored", "RX ring buffer full")
            if not result:
                # Report the specific cause when the transport captured it.
                if output and b"RX ring buffer full" in output:
                    logger.error(f"Failed to store chunk in the device: {output}")
                else:
                    logger.error("Failed to store chunk in the device: unknown error")
                return False

        self.write_raw(f"cred add {sectag} {TLS_CRED_TYPES[cred_type]} DEFAULT bint")
        result, _ = self.serial_wait_for_response("Added TLS credential", "already exists")
        # NOTE(review): presumably gives the device time to settle after the add - confirm.
        time.sleep(1)
        return result

    def delete_credential(self, sectag, cred_type):
        """Delete the credential of the given type stored under sectag.

        Returns:
            The result reported for the 'cred del' command (falsy when the device says
            there is no such credential); False if cred_type is invalid.
        """
        if not self._cred_type_is_valid(cred_type):
            return False
        self.write_raw(f'cred del {sectag} {TLS_CRED_TYPES[cred_type]}')
        result, _ = self.serial_wait_for_response(
            "Deleted TLS credential", "There is no TLS credential"
        )
        # NOTE(review): presumably gives the device time to settle after the delete - confirm.
        time.sleep(2)
        return result

    def check_credential_exists(self, sectag, cred_type, get_hash=True):
        """Query 'cred list' for one credential and optionally extract its hash.

        The captured listing line is parsed as comma-separated fields, with the hash in
        the third field and a status code in the fourth ("0" meaning the hash is usable).

        Args:
            sectag: Security tag to look up.
            cred_type: Index into TLS_CRED_TYPES.
            get_hash: When False, only existence is reported.

        Returns:
            A tuple ``(exists, cred_hash)``; cred_hash is None when it was not requested,
            could not be parsed, or the device reported a non-zero status for it.
        """
        # Without this check a negative index would silently select another type and an
        # out-of-range one would raise IndexError.
        if not self._cred_type_is_valid(cred_type):
            return False, None

        type_name = TLS_CRED_TYPES[cred_type]
        self.write_raw(f'cred list {sectag} {type_name}')
        _, output = self.serial_wait_for_response(
            "1 credentials found.",
            "0 credentials found.",
            store=f"{sectag},{type_name}",
        )

        if not output:
            return False, None

        if not get_hash:
            return True, None

        # Serial noise must not abort the run with a UnicodeDecodeError.
        text = output.decode(errors="replace")
        data = text.split(",")
        logger.debug(f"Cred list output: {data}")
        if len(data) < 4:
            logger.error("Invalid output format from device, skipping hash check.")
            return False, None
        cred_hash = data[2].strip()
        status_code = data[3].strip()

        if status_code != "0":
            logger.warning(f"Error retrieving credential hash: {text.strip()}.")
            logger.warning("Device might not support credential digests.")
            return True, None

        return True, cred_hash

    def calculate_expected_hash(self, cred_text):
        """Return the base64 SHA-256 digest of cred_text (UTF-8) followed by a NUL byte."""
        cred_hash = hashlib.sha256(cred_text.encode('utf-8') + b'\x00')
        return base64.b64encode(cred_hash.digest()).decode()

    def check_cred_command(self):
        """Check that the device shell provides the 'cred' command.

        Returns:
            True when the command's help text is seen, False otherwise.
        """
        logger.info("Checking for 'cred' command existence...")
        self.serial_write_line("cred")
        result, output = self.serial_wait_for_response(
            "TLS Credentials Commands", "command not found", store="cred"
        )
        logger.debug(f"Result: {result}, Output: {output}")
        # Test for the explicit "command not found" reply first: it arrives together with
        # a falsy result, so checking the result first would hide the configuration hint.
        if output and b"command not found" in output:
            logger.error("Device does not support 'cred' command.")
            logger.error("Hint: Add 'CONFIG_TLS_CREDENTIALS_SHELL=y' to your prj.conf file.")
            return False
        if not result:
            logger.error("Device did not respond to 'cred' command.")
            return False
        logger.info("'cred' command found.")
        return True
| |
| |
def write_line(line, hidden=False):
    """Send one command line to the device over the shared serial port.

    The terminator selected by cmd_term_key is appended and the result is written
    UTF-8 encoded.

    Args:
        line: Command text to send.
        hidden: When true, the command is not echoed to the debug log.
    """
    if not hidden:
        logger.debug(f'-> {line}')
    terminator = CMD_TERM_DICT[cmd_term_key]
    payload = (line + terminator).encode('utf-8')
    ser.write(payload)
| |
| |
def wait_for_prompt(val1='uart:~$ ', val2=None, timeout=15, store=None):
    """Read lines from the serial port until a success or failure marker is seen.

    Args:
        val1: Text (str or bytes) whose appearance in a line means success.
        val2: Optional text (str or bytes) whose appearance in a line means failure.
        timeout: Wait budget. It is reduced by serial_timeout for every empty read and
            the wait is abandoned when it reaches zero. A negative value is never
            reduced, so the wait only ends when a marker is seen. Zero returns at once.
        store: Optional text (str or bytes); the last line read that contains it is
            returned as the output.

    Returns:
        A tuple ``(retval, output)``: retval is True only when val1 was seen; output is
        the captured line as bytes, or None when nothing matched store. On a read error
        or an uninitialized port, ``(False, None)`` is returned.
    """
    found = False
    retval = False
    output = None
    # Bound up front so the diagnostic below is safe even if the loop never runs.
    line = b''

    if not ser:
        logger.error('Serial interface not initialized')
        return False, None

    if isinstance(val1, str):
        val1 = val1.encode()

    if isinstance(val2, str):
        val2 = val2.encode()

    if isinstance(store, str):
        store = store.encode()

    ser.flush()

    while not found and timeout != 0:
        try:
            line = ser.readline()
        except serial.SerialException as e:
            logger.error(f"Error reading from serial interface: {e}")
            return False, None
        except Exception as e:
            logger.error(f"Unexpected error: {e}")
            return False, None

        if line == b'\r\n':
            continue

        if not line:
            # An empty read means the port's read timeout elapsed. Clamp at zero so a
            # budget that is not a multiple of serial_timeout cannot step over the
            # loop's exit condition and spin forever.
            if timeout > 0:
                timeout = max(0, timeout - serial_timeout)
            continue

        logger.debug(f'<- {line.decode("utf-8", errors="replace")}')

        # Capture independently of the marker checks, so that a terminating line which
        # also matches store (e.g. an error reply) is still handed back to the caller.
        if store is not None and store in line:
            output = line

        if val1 in line:
            found = True
            retval = True
        elif val2 is not None and val2 in line:
            found = True
            retval = False

        if b'\n' not in line:
            logger.debug('')

    ser.flush()
    if store is not None and output is None:
        logger.error(f'String {store} not detected in line {line}')

    # The loop only ends without a match when the wait budget has run out.
    if not found:
        logger.error('Serial timeout waiting for prompt')

    return retval, output
| |
| |
def parse_args(in_args):
    """Parse the installer's command-line options.

    Args:
        in_args: Argument strings, without the program name.

    Returns:
        The argparse.Namespace holding the parsed options.
    """
    parser = argparse.ArgumentParser(
        description="Device Credentials Installer",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
        allow_abbrev=False,
    )

    def add_switch(short_flag, long_flag, help_text):
        # Boolean option that stays False unless given on the command line.
        parser.add_argument(
            short_flag, long_flag, help=help_text, action='store_true', default=False
        )

    # Options are registered in the order they should appear in --help.
    parser.add_argument(
        "-p",
        "--port",
        type=str,
        help="Specify which serial port to open",
        default="/dev/ttyACM1",
    )
    add_switch("-x", "--xonxoff", "Enable software flow control for serial connection")
    add_switch(
        "-r", "--rtscts-off", "Disable hardware (RTS/CTS) flow control for serial connection"
    )
    add_switch("-f", "--dsrdtr", "Enable hardware (DSR/DTR) flow control for serial connection")
    add_switch("-d", "--delete", "Delete sectag from device first")
    parser.add_argument(
        "-l",
        "--local-cert-file",
        type=str,
        help="Filepath to a local certificate (PEM) to use for the device",
        required=True,
    )
    parser.add_argument(
        "-t",
        "--cert-type",
        type=int,
        help="Certificate type to use for the device",
        default=1,
    )
    parser.add_argument(
        "-S",
        "--sectag",
        type=int,
        help="integer: Security tag to use",
        default=16842753,
    )
    add_switch("-H", "--check-hash", "Check hash of the credential after writing")
    add_switch("-v", "--verbose", "Enable verbose output")

    return parser.parse_args(in_args)
| |
| |
def main(in_args):
    """Install a TLS credential on the device and exit with a status code.

    Exit codes:
        0: credential written (and verified, when requested)
        1: device shell has no usable 'cred' command
        2: serial port could not be opened
        3: local certificate file is missing or unreadable
        4: credential not found on the device after writing (with --check-hash)
        5: writing the credential failed
        6: device-reported hash differs from the expected hash

    Args:
        in_args: Command-line argument strings, without the program name.
    """
    global ser

    args = parse_args(in_args)

    if args.verbose:
        logger.setLevel(logging.DEBUG)

    if not os.path.isfile(args.local_cert_file):
        logger.error(f'Local certificate file {args.local_cert_file} does not exist')
        sys.exit(3)

    # Read the credential before touching the port so an unreadable file fails fast
    # and does not leave a serial session half started.
    try:
        with open(args.local_cert_file, encoding='utf-8') as f:
            dev_bytes = f.read()
    except (OSError, UnicodeDecodeError) as e:
        logger.error(f'Failed to read local certificate file {args.local_cert_file}: {e}')
        sys.exit(3)

    logger.info(f'Opening port {args.port}')
    try:
        ser = serial.Serial(
            args.port,
            115200,
            xonxoff=args.xonxoff,
            rtscts=(not args.rtscts_off),
            dsrdtr=args.dsrdtr,
            timeout=serial_timeout,
        )
        ser.reset_input_buffer()
        ser.reset_output_buffer()
    except FileNotFoundError:
        logger.error(f'Specified port {args.port} does not exist or cannot be accessed')
        sys.exit(2)
    except serial.SerialException as e:
        logger.error(f'Failed to open serial port {args.port}: {e}')
        sys.exit(2)

    # sys.exit() raises SystemExit, so the finally clause closes the port on every path.
    try:
        cred_if = TLSCredShellInterface(write_line, wait_for_prompt, args.verbose)
        if not cred_if.check_cred_command():
            sys.exit(1)

        if args.delete:
            logger.info(f'Deleting sectag {args.sectag}...')
            # Best effort: the credential may simply not exist yet.
            cred_if.delete_credential(args.sectag, args.cert_type)

        logger.info(f'Writing sectag {args.sectag}...')
        result = cred_if.write_credential(args.sectag, args.cert_type, dev_bytes)
        if not result:
            logger.error(
                f'Failed to write credential for sectag {args.sectag}, it may already exist'
            )
            sys.exit(5)

        result, cred_hash = cred_if.check_credential_exists(
            args.sectag, args.cert_type, args.check_hash
        )
        if args.check_hash:
            logger.debug(f'Checking hash for sectag {args.sectag}...')
            if not result:
                logger.error(f'Failed to check credential existence for sectag {args.sectag}')
                sys.exit(4)
            # cred_hash is None when the device could not provide a digest; in that
            # case the comparison is skipped.
            if cred_hash:
                logger.debug(f'Credential hash: {cred_hash}')
                expected_hash = cred_if.calculate_expected_hash(dev_bytes)
                if cred_hash != expected_hash:
                    logger.error(
                        f'Hash mismatch for sectag {args.sectag}. '
                        f'Exp: {expected_hash}, got: {cred_hash}'
                    )
                    sys.exit(6)
        logger.info(f'Credential for sectag {args.sectag} written successfully')
        sys.exit(0)
    finally:
        ser.close()
| |
| |
def run():
    """Script entry point: run main() on the process arguments.

    A Ctrl-C during execution is reported and turned into exit status 1.
    """
    cli_args = sys.argv[1:]
    try:
        main(cli_args)
    except KeyboardInterrupt:
        logger.info("Execution interrupted by user (Ctrl-C). Exiting...")
        sys.exit(1)


# Only start the installer when executed as a script, not when imported.
if __name__ == '__main__':
    run()