| # Copyright 2023 The Pigweed Authors |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| # use this file except in compliance with the License. You may obtain a copy of |
| # the License at |
| # |
| # https://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations under |
| # the License. |
| """Write a binary over serial to provision the Gonk FPGA.""" |
| |
| import argparse |
| from itertools import islice |
| import logging |
| import operator |
| from pathlib import Path |
| import sys |
| import time |
| from typing import Iterable |
| |
| import serial |
| from serial import Serial |
| from serial.tools.list_ports import comports |
| from serial.tools.miniterm import ConsoleBase, Miniterm, Transform |
| |
| from pw_cli import log as pw_cli_log |
| from pw_console import python_logging |
| import pw_cli.color |
| import pw_log.log_decoder |
| from pw_tokenizer import ( |
| database as pw_tokenizer_database, |
| detokenize, |
| tokens, |
| ) |
| |
| from gonk_tools.adc import log_csv_header |
| from gonk_tools.gonk_log_stream import GonkLogStream |
| from gonk_tools.firmware_files import ( |
| BUNDLED_ELF, |
| BUNDLED_FPGA_BINFILE, |
| bundled_elf_path, |
| bundled_fpga_binfile_path, |
| ) |
| |
# Default file names for the --logfile, --device-logfile and --csv-logfile
# command line options.
DEFAULT_HOST_LOGFILE = 'gonk-host-log.txt'
DEFAULT_DEVICE_LOGFILE = 'gonk-device-log.txt'
DEFAULT_CSV_LOGFILE = 'gonk-csv-log.txt'

# Loggers used by this module: the root logger, one for messages produced by
# this host-side tool, one for logs received from the device and one for the
# CSV lines written to the CSV log file.
_ROOT_LOG = logging.getLogger()
_LOG = logging.getLogger('host')
_DEVICE_LOG = logging.getLogger('gonk')
_CSV_LOG = logging.getLogger('gonk.csv')

_COLOR = pw_cli.color.colors()
| |
| |
def _parse_args() -> argparse.Namespace:
    """Build the command line parser and parse ``sys.argv``.

    Returns:
      The parsed arguments. The attribute names match keyword parameters of
      ``gonk_main`` so the namespace can be passed to it with ``**vars()``.
    """
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument(
        '--databases',
        metavar='elf_or_token_database',
        action=pw_tokenizer_database.LoadTokenDatabases,
        nargs='+',
        help=(
            'ELF or token database files from which to read strings and '
            'tokens. For ELF files, the tokenization domain to read from '
            'may be specified after the path as #domain_name (e.g. '
            'foo.elf#TEST_DOMAIN). Unless specified, only the default '
            'domain ("") is read from ELF files; .* reads all domains. '
            'Globs are expanded to compatible database files.'
        ),
    )
    parser.add_argument(
        '-p',
        '--port',
        type=str,
        help='Serial port path.',
    )
    parser.add_argument(
        '-b',
        '--baudrate',
        type=int,
        default=2000000,
        help='Sets the baudrate for serial communication.',
    )
    parser.add_argument(
        '--product',
        default='GENERIC_F730R8TX',
        help='Use first serial port matching this product name.',
    )
    parser.add_argument(
        '--serial-number',
        help='Use the first serial port matching this number.',
    )
    parser.add_argument(
        '--bitstream-file',
        type=Path,
        help=(
            'FPGA Bitstream file. Can be a filesystem path or "DEFAULT" to '
            'use a Gonk Python tools bundled binary file.'
        ),
    )
    parser.add_argument(
        '--logfile',
        type=Path,
        default=Path(DEFAULT_HOST_LOGFILE),
        help=(
            'Default log file. This will contain host side '
            'log messages only unless the '
            '--merge-device-and-host-logs argument is used. '
            f'Default: "{DEFAULT_HOST_LOGFILE}"'
        ),
    )
    parser.add_argument(
        '--merge-device-and-host-logs',
        action='store_true',
        help=(
            'Include device logs in the default --logfile. '
            'These are normally shown in a separate device '
            'only log file.'
        ),
    )
    parser.add_argument(
        '--log-colors',
        action=argparse.BooleanOptionalAction,
        help=('Colors in log messages.'),
    )
    parser.add_argument(
        '--log-to-stderr',
        action='store_true',
        help=('Log to STDERR'),
    )
    parser.add_argument(
        '--host-logfile',
        type=Path,
        help=(
            'Additional host only log file. Normally all logs in the '
            'default logfile are host only.'
        ),
    )
    parser.add_argument(
        '--device-logfile',
        type=Path,
        default=Path(DEFAULT_DEVICE_LOGFILE),
        help=f'Device only log file. Default: "{DEFAULT_DEVICE_LOGFILE}"',
    )
    parser.add_argument(
        '--json-logfile',
        type=Path,
        help='Device only JSON formatted log file.',
    )
    parser.add_argument(
        '--csv-logfile',
        type=Path,
        default=Path(DEFAULT_CSV_LOGFILE),
        help=(
            'Write ADC data to a CSV formatted log file. '
            f'Default: "{DEFAULT_CSV_LOGFILE}"'
        ),
    )
    parser.add_argument(
        '--truncate-logfiles',
        action=argparse.BooleanOptionalAction,
        default=True,
        help=('Truncate logfiles before logging.'),
    )
    parser.add_argument(
        '--read-only',
        action='store_true',
        help=(
            'Running in read-only disables stdin keyboard interaction with '
            'Gonk and allows running the process in the background.'
        ),
    )

    return parser.parse_args()
| |
| |
def get_serial_port(
    product: str | None = None,
    serial_number: str | None = None,
) -> str:
    """Return the device path of the first matching serial port.

    Ports are examined in order of their device path. A port whose serial
    number contains ``serial_number`` is preferred; only when no such port
    exists are the product names compared against ``product``.

    Args:
      product: Substring to look for in each port's product name.
      serial_number: Substring to look for in each port's serial number.

    Returns:
      The device path of the first match, or an empty string if nothing
      matched.
    """
    candidates = sorted(comports(), key=operator.attrgetter('device'))

    # Serial number matches take priority over product name matches.
    if serial_number is not None:
        for candidate in candidates:
            found = candidate.serial_number
            if found is not None and serial_number in found:
                return candidate.device

    # Fall back to the product name.
    if product is not None:
        for candidate in candidates:
            found = candidate.product
            if found is not None and product in found:
                return candidate.device

    # Nothing matched.
    return ''
| |
| |
# Size in bytes that a bitstream file is required to have.
FILE_LENGTH = 135100

# A bitstream file is required to begin with these two 4-byte sequences, in
# this order. The *_STR forms are used in error messages.
FILE_START_STR = 'FF 00 00 FF'
FILE_START_BYTES = bytes.fromhex(FILE_START_STR)
SYNC_START_STR = '7E AA 99 7E'
SYNC_START_BYTES = bytes.fromhex(SYNC_START_STR)
| |
| |
class UnknownSerialDevice(Exception):
    """Raised when no serial port could be determined for the device."""
| |
| |
class IncorrectBinaryFormat(Exception):
    """Raised when the FPGA bitstream file has an unexpected size or header."""
| |
| |
def batched(iterable, n):
    """Batch data into tuples of length n. The last batch may be shorter.

    Args:
      iterable: Any iterable; it is consumed lazily.
      n: Maximum number of items per batch. Must be at least one.

    Yields:
      Tuples holding up to ``n`` consecutive items of ``iterable``.

    Raises:
      ValueError: If ``n`` is less than one. Because this is a generator the
        error is raised when iteration starts, not when ``batched`` is called.

    Example usage:

    .. code-block:: pycon

       >>> list(batched('ABCDEFG', 3))
       [('A', 'B', 'C'), ('D', 'E', 'F'), ('G',)]
    """
    if n < 1:
        raise ValueError('n must be at least one')
    it = iter(iterable)
    # islice() yields nothing once the iterator is exhausted, which makes the
    # tuple empty (falsy) and ends the loop.
    while batch := tuple(islice(it, n)):
        yield batch
| |
| |
class HandleBinaryData(Transform):
    """Miniterm transform that forwards received bytes to a GonkLogStream."""

    def __init__(self, gonk_log_stream: GonkLogStream) -> None:
        self.gonk_log_stream = gonk_log_stream

    def rx(self, text: str, data: bytes | None = None) -> str:
        """Handle data received from the serial port.

        Args:
          text: The decoded text for the received data.
          data: The raw received bytes, if available.

        Returns:
          An empty string when ``data`` was written to the log stream,
          otherwise ``text`` unchanged.
        """
        if not data:
            return text
        self.gonk_log_stream.write(data)
        return ''
| |
| |
class DebugSerialIO(Transform):
    """Miniterm transform that echoes serial traffic to stderr."""

    def rx(self, text: str) -> str:
        """Report text received from the serial port and pass it through."""
        sys.stderr.write(f'[Recv: {text!r}] ')
        sys.stderr.flush()
        return text

    def tx(self, text: str):
        """Report text about to be sent to the serial port; pass it through."""
        sys.stderr.write(f' [Send: {text!r}] ')
        sys.stderr.flush()
        return text

    def echo(self, text: str):
        """Return the text to display on the console unchanged."""
        return text
| |
| |
class MinitermBinary(Miniterm):
    """Miniterm whose reader also passes raw bytes to HandleBinaryData."""

    def reader(self):
        """Loop and copy serial->console.

        Each chunk read from the serial port is decoded and run through the
        rx transformations. ``HandleBinaryData`` transformations additionally
        receive the undecoded bytes.
        """
        try:
            while self.alive and self._reader_alive:
                # Read everything that is pending, or wait for one byte.
                received = self.serial.read(self.serial.in_waiting or 1)
                if not received:
                    continue

                if self.raw:
                    self.console.write_bytes(received)
                    continue

                decoded = self.rx_decoder.decode(received)
                for transform in self.rx_transformations:
                    if isinstance(transform, HandleBinaryData):
                        decoded = transform.rx(decoded, received)
                    else:
                        decoded = transform.rx(decoded)
                self.console.write(decoded)
        except serial.SerialException:
            # Mark the terminal as stopped and cancel the console before
            # propagating the error.
            self.alive = False
            self.console.cancel()
            raise
| |
| |
class MinitermBinaryReadOnly(MinitermBinary):
    """MinitermBinary variant that never reads keyboard input.

    The console is a plain ``ConsoleBase`` and ``writer`` sends nothing to the
    serial port; it only idles until the terminal is stopped.
    """

    # pylint: disable=too-many-instance-attributes,super-init-not-called
    def __init__(self, serial_instance, echo=False, eol='crlf', filters=()):
        # Use ConsoleBase
        self.console = ConsoleBase()
        # Remaining lines are identical to Miniterm init().
        self.serial = serial_instance
        self.echo = echo
        self.raw = False
        self.input_encoding = 'UTF-8'
        self.output_encoding = 'UTF-8'
        self.eol = eol
        self.filters = filters
        # Called only after echo, eol and filters have been assigned.
        self.update_transformations()
        self.exit_character = chr(0x1D)  # GS/CTRL+]
        self.menu_character = chr(0x14)  # Menu: CTRL+T
        self.alive = None
        self._reader_alive = None
        self.receiver_thread = None
        self.rx_decoder = None
        self.tx_decoder = None

    def writer(self) -> None:
        """Idle until the terminal is no longer alive; nothing is sent."""
        # The loop condition is the only exit check needed; poll it twice a
        # second.
        while self.alive:
            time.sleep(0.5)
| |
| |
| def load_bitstream_file(bitstream_file: Path | None) -> bytes: |
| """Check for valid bitstream file and load it as bytes.""" |
| bitstream_bytes = b'' |
| |
| if not bitstream_file or str(bitstream_file) == 'DEFAULT': |
| if not BUNDLED_FPGA_BINFILE: |
| raise FileNotFoundError('No default bitstream file is available.') |
| |
| bitstream_path = bundled_fpga_binfile_path() |
| if not bitstream_path: |
| raise FileNotFoundError('No default bitstream file is available.') |
| |
| bitstream_bytes = bitstream_path.read_bytes() |
| else: |
| if not bitstream_file.is_file(): |
| raise FileNotFoundError(f'\nUnable to load "{bitstream_file}"') |
| bitstream_bytes = bitstream_file.read_bytes() |
| |
| if ( |
| len(bitstream_bytes) != 135100 |
| or bitstream_bytes[0:8] != FILE_START_BYTES + SYNC_START_BYTES |
| ): |
| raise IncorrectBinaryFormat( |
| f'the bitstream file must be:\n' |
| f' {FILE_LENGTH} bytes in length\n' |
| f' Start with "{FILE_START_STR} {SYNC_START_STR}"' |
| ) |
| |
| return bitstream_bytes |
| |
| |
def write_bitstream_file(
    bitstream_bytes: bytes, serial_instance: Serial
) -> None:
    """Write a series of bytes to serial.

    The data is sent in consecutive 8-byte chunks (the last one may be
    shorter) and the port is flushed once everything has been written.

    Args:
      bitstream_bytes: The data to send.
      serial_instance: The open serial port to write to.
    """
    chunk_size = 8

    # Write out the bitstream in batches.
    _LOG.info('Sending bitstream...')
    written_bytes: int = 0
    for offset in range(0, len(bitstream_bytes), chunk_size):
        # Slicing keeps each chunk a bytes object, so write() gets bytes
        # directly instead of a tuple of ints it has to convert.
        result = serial_instance.write(
            bitstream_bytes[offset : offset + chunk_size]
        )
        # write() may not report a count; only add it when it does.
        if result:
            written_bytes += result
    serial_instance.flush()
    _LOG.info('Done sending bitstream. Wrote %d', written_bytes)
| |
| |
class FpgaProvisioner:
    """Sends the FPGA bitstream once the device logs that it is waiting."""

    def __init__(
        self,
        bitstream_file: Path | None,
        serial_instance: Serial,
    ) -> None:
        # The bitstream is loaded and validated up front, so a bad file is
        # reported at construction time.
        self.serial_instance = serial_instance
        self.bitstream_bytes = load_bitstream_file(bitstream_file)
        self.fpga_provisioned = False

    def log_handler(self, log: pw_log.log_decoder.Log) -> None:
        """Send the bitstream when a log asks for it.

        The bitstream is written the first time a log message containing
        'Waiting for bitstream' is seen; later logs are ignored.
        """
        bitstream_requested = (
            not self.fpga_provisioned
            and 'Waiting for bitstream' in log.message
        )
        if bitstream_requested:
            write_bitstream_file(self.bitstream_bytes, self.serial_instance)
            self.fpga_provisioned = True
| |
| |
def gonk_main(
    # pylint: disable=too-many-arguments,too-many-locals
    baudrate: int,
    databases: Iterable,
    bitstream_file: Path | None,
    port: str | None = None,
    product: str | None = None,
    serial_number: str | None = None,
    logfile: Path | None = None,
    host_logfile: Path | None = None,
    device_logfile: Path | None = None,
    json_logfile: Path | None = None,
    csv_logfile: Path | None = None,
    log_colors: bool | None = False,
    merge_device_and_host_logs: bool = False,
    verbose: bool = False,
    log_to_stderr: bool = False,
    truncate_logfiles: bool = True,
    read_only: bool = False,
) -> int:
    """Write a bitstream file over serial while monitoring output.

    Sets up logging, opens the serial port and runs a miniterm session on it
    until it exits or Ctrl-C is pressed. The bitstream is sent when the device
    logs that it is waiting for one.

    Args:
      baudrate: Baudrate for the serial port.
      databases: Token databases used to detokenize device logs. When empty or
        None, the bundled ELF file is used if one is available.
      bitstream_file: FPGA bitstream file; see ``load_bitstream_file``.
      port: Serial port path. If None, the port is looked up using ``product``
        and ``serial_number``.
      product: Product name used to find the serial port.
      serial_number: Serial number used to find the serial port.
      logfile: Main log file. A temporary file is created if not provided.
      host_logfile: Optional additional log file attached to the root logger.
      device_logfile: Optional log file for device logs.
      json_logfile: Optional JSON formatted log file for device logs.
      csv_logfile: Optional log file for CSV lines.
      log_colors: Whether to use colors in log messages. None means True.
      merge_device_and_host_logs: Also write device logs to ``logfile``.
      verbose: Log at DEBUG level instead of INFO.
      log_to_stderr: Install an additional log handler without a log file.
      truncate_logfiles: Empty each log file before logging to it.
      read_only: Use the miniterm implementation without keyboard input.

    Returns:
      0 once the miniterm session has ended.

    Raises:
      UnknownSerialDevice: If no serial port was given or found.
      FileNotFoundError: If the bitstream file cannot be found.
      IncorrectBinaryFormat: If the bitstream file fails validation.
    """

    # ``databases`` can be None (the --databases option has no default), so
    # normalize it to a list that is always safe to unpack below.
    token_databases = list(databases) if databases else []
    if not token_databases and BUNDLED_ELF:
        elf_file = bundled_elf_path()
        if elf_file:
            token_databases.append(
                pw_tokenizer_database.load_token_database(elf_file)
            )

    if not logfile:
        # Create a temp logfile to prevent logs from appearing over stdout. This
        # would corrupt the prompt toolkit UI.
        logfile = Path(python_logging.create_temp_log_file())

    if truncate_logfiles:
        logfile.write_text('', encoding='utf-8')

    if log_colors is None:
        log_colors = True
    colors = pw_cli.color.colors(log_colors)
    log_level = logging.DEBUG if verbose else logging.INFO
    logger_name_format = colors.cyan('%(name)s')
    logger_message_format = f'[{logger_name_format}] %(levelname)s %(message)s'

    pw_cli_log.install(
        level=log_level,
        use_color=log_colors,
        hide_timestamp=False,
        log_file=logfile,
        message_format=logger_message_format,
    )

    if device_logfile:
        if truncate_logfiles:
            device_logfile.write_text('', encoding='utf-8')
        pw_cli_log.install(
            level=log_level,
            use_color=log_colors,
            hide_timestamp=False,
            log_file=device_logfile,
            logger=_DEVICE_LOG,
            message_format=logger_message_format,
        )
    if host_logfile:
        if truncate_logfiles:
            host_logfile.write_text('', encoding='utf-8')
        pw_cli_log.install(
            level=log_level,
            use_color=log_colors,
            hide_timestamp=False,
            log_file=host_logfile,
            logger=_ROOT_LOG,
            message_format=logger_message_format,
        )

    # By default don't send device logs to the root logger.
    _DEVICE_LOG.propagate = False
    if merge_device_and_host_logs:
        # Add device logs to the default logfile.
        pw_cli_log.install(
            level=log_level,
            use_color=log_colors,
            hide_timestamp=False,
            log_file=logfile,
            logger=_DEVICE_LOG,
            message_format=logger_message_format,
        )

    if json_logfile:
        if truncate_logfiles:
            json_logfile.write_text('', encoding='utf-8')
        json_filehandler = logging.FileHandler(json_logfile, encoding='utf-8')
        json_filehandler.setLevel(log_level)
        json_filehandler.setFormatter(python_logging.JsonLogFormatter())
        _DEVICE_LOG.addHandler(json_filehandler)

    # Don't send CSV lines to the root logger.
    _CSV_LOG.propagate = False
    if csv_logfile:
        # Check this first: both truncating and creating the FileHandler
        # create the file, after which it can no longer be detected as new.
        csv_is_new_file = not csv_logfile.is_file()
        if truncate_logfiles:
            csv_logfile.write_text('', encoding='utf-8')
        csv_filehandler = logging.FileHandler(csv_logfile, encoding='utf-8')
        csv_filehandler.setLevel(logging.NOTSET)
        csv_filehandler.setFormatter(logging.Formatter(fmt='%(message)s'))
        _CSV_LOG.addHandler(csv_filehandler)

        # Write the csv header if new file or truncate_logfiles is on.
        if truncate_logfiles or csv_is_new_file:
            log_csv_header()

    if log_to_stderr:
        pw_cli_log.install(
            level=log_level,
            use_color=log_colors,
            hide_timestamp=False,
            message_format=logger_message_format,
        )

    _LOG.setLevel(log_level)
    _DEVICE_LOG.setLevel(log_level)
    _ROOT_LOG.setLevel(log_level)

    # Init serial port.
    if port is None:
        port = get_serial_port(product=product, serial_number=serial_number)

    if not port:
        raise UnknownSerialDevice(
            'No --serial-number --product or --port path provided.'
        )

    # The with block closes the serial port on every exit path, including
    # errors raised during setup (e.g. an invalid bitstream file).
    with Serial(port=port, baudrate=baudrate, timeout=0.1) as serial_instance:
        detokenizer = detokenize.Detokenizer(
            tokens.Database.merged(*token_databases), show_errors=True
        )

        # Use pyserial miniterm to monitor received data.
        miniterm_impl = MinitermBinary
        if read_only:
            miniterm_impl = MinitermBinaryReadOnly

        miniterm = miniterm_impl(
            serial_instance,
            echo=True,
            eol='lf',
            filters=(),
        )

        # Use Ctrl-C as the exit character. (Miniterm default is Ctrl-])
        miniterm.exit_character = chr(0x03)
        miniterm.set_rx_encoding('utf-8', errors='backslashreplace')
        miniterm.set_tx_encoding('utf-8')

        log_message_handlers = []
        fpga_provisioner = FpgaProvisioner(bitstream_file, serial_instance)
        log_message_handlers.append(fpga_provisioner.log_handler)

        gonk_log_stream = GonkLogStream(
            detokenizer, log_message_handlers=log_message_handlers
        )
        miniterm.rx_transformations.append(HandleBinaryData(gonk_log_stream))
        miniterm.tx_transformations.append(DebugSerialIO())

        # Start monitoring serial data.
        miniterm.start()

        # Wait for ctrl-c, then shutdown miniterm.
        try:
            miniterm.join(True)
        except KeyboardInterrupt:
            miniterm.stop()
        sys.stderr.write('\n--- exit ---\n')
        miniterm.join()

    return 0
| |
| |
def main():
    """Parse the command line, run gonk_main and exit with its result."""
    parsed_args = _parse_args()
    exit_code = gonk_main(**vars(parsed_args))
    sys.exit(exit_code)


if __name__ == '__main__':
    main()