# blob: b72dad103d110742818377c4b520def0863693bf [file] [log] [blame]
# Copyright 2023 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Write a binary over serial to provision the Gonk FPGA."""
import argparse
from itertools import islice
import logging
import operator
from pathlib import Path
import sys
import time
from typing import Iterable
import serial
from serial import Serial
from serial.tools.list_ports import comports
from serial.tools.miniterm import ConsoleBase, Miniterm, Transform
from pw_cli import log as pw_cli_log
from pw_console import python_logging
import pw_cli.color
import pw_log.log_decoder
from pw_tokenizer import (
database as pw_tokenizer_database,
detokenize,
tokens,
)
from gonk_tools.adc import log_csv_header
from gonk_tools.gonk_log_stream import GonkLogStream
from gonk_tools.firmware_files import (
BUNDLED_ELF,
BUNDLED_FPGA_BINFILE,
bundled_elf_path,
bundled_fpga_binfile_path,
)
# The root logger.
_ROOT_LOG = logging.getLogger()
# Logger used for messages emitted by this host-side tool.
_LOG = logging.getLogger('host')
# Logger that the device-only log file handlers are attached to.
_DEVICE_LOG = logging.getLogger('gonk')
# Logger that the CSV (ADC data) file handler is attached to.
_CSV_LOG = logging.getLogger('gonk.csv')
_COLOR = pw_cli.color.colors()
# Default file names for the --logfile, --device-logfile and --csv-logfile
# command line options.
DEFAULT_HOST_LOGFILE = 'gonk-host-log.txt'
DEFAULT_DEVICE_LOGFILE = 'gonk-device-log.txt'
DEFAULT_CSV_LOGFILE = 'gonk-csv-log.txt'
def _parse_args() -> argparse.Namespace:
    """Parse the command line options of the FPGA provisioning tool.

    Every option's destination name matches a keyword parameter of
    ``gonk_main`` so the resulting namespace can be expanded straight into
    that call.

    Returns:
      The populated argparse namespace.
    """
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument(
        '--databases',
        metavar='elf_or_token_database',
        action=pw_tokenizer_database.LoadTokenDatabases,
        nargs='+',
        help=(
            'ELF or token database files from which to read strings and '
            'tokens. For ELF files, the tokenization domain to read from '
            'may be specified after the path as #domain_name (e.g. '
            'foo.elf#TEST_DOMAIN). Unless specified, only the default '
            'domain ("") is read from ELF files; .* reads all domains. '
            'Globs are expanded to compatible database files.'
        ),
    )
    parser.add_argument(
        '-p',
        '--port',
        type=str,
        help='Serial port path.',
    )
    parser.add_argument(
        '-b',
        '--baudrate',
        type=int,
        default=2000000,
        help='Sets the baudrate for serial communication.',
    )
    parser.add_argument(
        '--product',
        default='GENERIC_F730R8TX',
        help='Use first serial port matching this product name.',
    )
    parser.add_argument(
        '--serial-number',
        help='Use the first serial port matching this number.',
    )
    parser.add_argument(
        '--bitstream-file',
        type=Path,
        help=(
            'FPGA Bitstream file. Can be a filesystem path or "DEFAULT" to '
            'use a Gonk Python tools bundled binary file.'
        ),
    )
    parser.add_argument(
        '--logfile',
        type=Path,
        default=Path(DEFAULT_HOST_LOGFILE),
        help=(
            'Default log file. This will contain host side '
            'log messages only unless the '
            '--merge-device-and-host-logs argument is used. '
            f'Default: "{DEFAULT_HOST_LOGFILE}"'
        ),
    )
    parser.add_argument(
        '--merge-device-and-host-logs',
        action='store_true',
        help=(
            'Include device logs in the default --logfile. '
            'These are normally shown in a separate device '
            'only log file.'
        ),
    )
    parser.add_argument(
        '--log-colors',
        action=argparse.BooleanOptionalAction,
        help=('Colors in log messages.'),
    )
    parser.add_argument(
        '--log-to-stderr',
        action='store_true',
        help=('Log to STDERR'),
    )
    parser.add_argument(
        '--host-logfile',
        type=Path,
        help=(
            'Additional host only log file. Normally all logs in the '
            'default logfile are host only.'
        ),
    )
    parser.add_argument(
        '--device-logfile',
        type=Path,
        default=Path(DEFAULT_DEVICE_LOGFILE),
        help=f'Device only log file. Default: "{DEFAULT_DEVICE_LOGFILE}"',
    )
    parser.add_argument(
        '--json-logfile',
        type=Path,
        help='Device only JSON formatted log file.',
    )
    parser.add_argument(
        '--csv-logfile',
        type=Path,
        default=Path(DEFAULT_CSV_LOGFILE),
        help=(
            'Write ADC data to a CSV formatted log file. '
            f'Default: "{DEFAULT_CSV_LOGFILE}"'
        ),
    )
    parser.add_argument(
        '--truncate-logfiles',
        action=argparse.BooleanOptionalAction,
        default=True,
        help=('Truncate logfiles before logging.'),
    )
    parser.add_argument(
        '--read-only',
        action='store_true',
        help=(
            'Running in read-only disables stdin keyboard interaction with '
            'Gonk and allows running the process in the background.'
        ),
    )
    return parser.parse_args()
def get_serial_port(
    product: str | None = None,
    serial_number: str | None = None,
) -> str:
    """Return the device path of the first matching serial port.

    Ports are examined in order of their device path. A port whose serial
    number contains ``serial_number`` wins over any product name match; only
    when no serial number matches are product names compared against
    ``product``. An empty string is returned when nothing matches.
    """
    candidates = sorted(comports(), key=operator.attrgetter('device'))

    # Serial numbers are tried first, product names second.
    search_order = (
        (serial_number, 'serial_number'),
        (product, 'product'),
    )
    for wanted, attribute in search_order:
        if wanted is None:
            continue
        for candidate in candidates:
            reported = getattr(candidate, attribute)
            if reported is not None and wanted in reported:
                return candidate.device

    # No matches found.
    return ''
# Size in bytes that a bitstream file is required to have.
FILE_LENGTH = 135100
# Hex text of the first four bytes a bitstream file must begin with.
FILE_START_STR = 'FF 00 00 FF'
# Hex text of the four bytes that must directly follow FILE_START_STR.
SYNC_START_STR = '7E AA 99 7E'
# Binary forms of the two markers above, used when checking a loaded file.
FILE_START_BYTES = bytes.fromhex(FILE_START_STR)
SYNC_START_BYTES = bytes.fromhex(SYNC_START_STR)
class UnknownSerialDevice(Exception):
    """Raised when no serial port was given and none could be found."""
class IncorrectBinaryFormat(Exception):
    """Raised when an FPGA bitstream file has the wrong size or header."""
def batched(iterable, n):
    """Yield successive tuples of up to ``n`` items taken from ``iterable``.

    Every tuple holds exactly ``n`` items except possibly the last one, which
    holds whatever is left over. Because this is a generator, the check on
    ``n`` only runs once iteration starts.

    Example usage:

    .. code-block:: pycon

       >>> list(batched('ABCDEFG', 3))
       [('A', 'B', 'C'), ('D', 'E', 'F'), ('G',)]

    Raises:
      ValueError: If ``n`` is smaller than one.
    """
    if n < 1:
        raise ValueError('n must be at least one')
    source = iter(iterable)
    while True:
        chunk = tuple(islice(source, n))
        if not chunk:
            # Source exhausted.
            return
        yield chunk
class HandleBinaryData(Transform):
    """Miniterm transform that forwards received bytes to a GonkLogStream."""

    def __init__(self, gonk_log_stream: GonkLogStream) -> None:
        self.gonk_log_stream = gonk_log_stream

    def rx(self, text: str, data: bytes | None = None) -> str:
        """Handle data received from the serial port.

        When raw ``data`` is supplied it is written to the log stream and an
        empty string is returned, so nothing is passed on as console text.
        Without ``data`` the decoded ``text`` is returned unchanged.
        """
        if not data:
            return text
        self.gonk_log_stream.write(data)
        return ''
class DebugSerialIO(Transform):
    """Transform that traces serial traffic on stderr without altering it."""

    def rx(self, text: str) -> str:
        """Trace text received from the serial port, then pass it through."""
        sys.stderr.write(f'[Recv: {text!r}] ')
        sys.stderr.flush()
        return text

    def tx(self, text: str):
        """Trace text about to be sent to the serial port, then pass it on."""
        sys.stderr.write(f' [Send: {text!r}] ')
        sys.stderr.flush()
        return text

    def echo(self, text: str):
        """Return text that is sent and shown on the console, untouched."""
        return text
class MinitermBinary(Miniterm):
    """Miniterm whose reader also passes raw bytes to HandleBinaryData."""

    def reader(self):
        """Loop and copy serial->console.

        HandleBinaryData transforms are given both the decoded text and the
        raw bytes of each read; every other transform only sees the text.
        """
        try:
            while self.alive and self._reader_alive:
                # Read all that is there or wait for one byte
                incoming = self.serial.read(self.serial.in_waiting or 1)
                if not incoming:
                    continue
                if self.raw:
                    self.console.write_bytes(incoming)
                    continue
                decoded = self.rx_decoder.decode(incoming)
                for transform in self.rx_transformations:
                    if isinstance(transform, HandleBinaryData):
                        decoded = transform.rx(decoded, incoming)
                    else:
                        decoded = transform.rx(decoded)
                self.console.write(decoded)
        except serial.SerialException:
            # Mark the terminal as stopped and cancel the console before
            # re-raising.
            self.alive = False
            self.console.cancel()
            raise
class MinitermBinaryReadOnly(MinitermBinary):
    """MinitermBinary variant that never reads keyboard input.

    Selected by ``gonk_main`` when ``read_only`` is set. The parent
    constructor is deliberately not called so that a plain ``ConsoleBase`` can
    be used as the console.
    """

    # pylint: disable=too-many-instance-attributes,super-init-not-called
    def __init__(self, serial_instance, echo=False, eol='crlf', filters=()):
        """Set up the terminal state around ``serial_instance``.

        Args:
          serial_instance: The serial port object to monitor.
          echo: Stored as the terminal's echo setting.
          eol: Stored as the terminal's end-of-line mode name.
          filters: Stored as the terminal's filter names.
        """
        # Use ConsoleBase
        self.console = ConsoleBase()
        # Remaining lines are identical to Miniterm init().
        self.serial = serial_instance
        self.echo = echo
        self.raw = False
        self.input_encoding = 'UTF-8'
        self.output_encoding = 'UTF-8'
        self.eol = eol
        self.filters = filters
        self.update_transformations()
        self.exit_character = chr(0x1D)  # GS/CTRL+]
        self.menu_character = chr(0x14)  # Menu: CTRL+T
        self.alive = None
        self._reader_alive = None
        self.receiver_thread = None
        self.rx_decoder = None
        self.tx_decoder = None

    def writer(self) -> None:
        """Idle until the terminal is stopped; nothing is ever sent.

        Replaces the keyboard loop with a sleep that polls ``self.alive``
        every half second.
        """
        while self.alive:
            time.sleep(0.5)
def load_bitstream_file(bitstream_file: Path | None) -> bytes:
"""Check for valid bitstream file and load it as bytes."""
bitstream_bytes = b''
if not bitstream_file or str(bitstream_file) == 'DEFAULT':
if not BUNDLED_FPGA_BINFILE:
raise FileNotFoundError('No default bitstream file is available.')
bitstream_path = bundled_fpga_binfile_path()
if not bitstream_path:
raise FileNotFoundError('No default bitstream file is available.')
bitstream_bytes = bitstream_path.read_bytes()
else:
if not bitstream_file.is_file():
raise FileNotFoundError(f'\nUnable to load "{bitstream_file}"')
bitstream_bytes = bitstream_file.read_bytes()
if (
len(bitstream_bytes) != 135100
or bitstream_bytes[0:8] != FILE_START_BYTES + SYNC_START_BYTES
):
raise IncorrectBinaryFormat(
f'the bitstream file must be:\n'
f' {FILE_LENGTH} bytes in length\n'
f' Start with "{FILE_START_STR} {SYNC_START_STR}"'
)
return bitstream_bytes
def write_bitstream_file(
    bitstream_bytes: bytes, serial_instance: Serial
) -> None:
    """Send ``bitstream_bytes`` to the serial port, eight bytes per write.

    The port is flushed once after the final write, and the sum of the byte
    counts reported by the individual writes is logged.
    """
    _LOG.info('Sending bitstream...')

    written_bytes: int = 0
    # Write out the bitstream in batches.
    for chunk in batched(bitstream_bytes, 8):
        # A write that reports nothing contributes zero to the total.
        written_bytes += serial_instance.write(chunk) or 0

    serial_instance.flush()
    _LOG.info('Done sending bitstream. Wrote %d', written_bytes)
class FpgaProvisioner:
    """Sends the FPGA bitstream the first time the device asks for it."""

    def __init__(
        self,
        bitstream_file: Path | None,
        serial_instance: Serial,
    ) -> None:
        """Load the bitstream up front and remember the port to send it on."""
        self.serial_instance = serial_instance
        self.bitstream_bytes = load_bitstream_file(bitstream_file)
        self.fpga_provisioned = False

    def log_handler(self, log: pw_log.log_decoder.Log) -> None:
        """Watch Gonk logs and provision the FPGA when it is requested.

        The bitstream is written the first time a log message containing
        'Waiting for bitstream' is seen; all later logs are ignored.
        """
        already_sent = self.fpga_provisioned
        if already_sent or 'Waiting for bitstream' not in log.message:
            return

        # Send the bitstream_file.
        write_bitstream_file(self.bitstream_bytes, self.serial_instance)
        self.fpga_provisioned = True
def gonk_main(
    # pylint: disable=too-many-arguments,too-many-locals
    baudrate: int,
    databases: Iterable,
    bitstream_file: Path | None,
    port: str | None = None,
    product: str | None = None,
    serial_number: str | None = None,
    logfile: Path | None = None,
    host_logfile: Path | None = None,
    device_logfile: Path | None = None,
    json_logfile: Path | None = None,
    csv_logfile: Path | None = None,
    log_colors: bool | None = False,
    merge_device_and_host_logs: bool = False,
    verbose: bool = False,
    log_to_stderr: bool = False,
    truncate_logfiles: bool = True,
    read_only: bool = False,
) -> int:
    """Write a bitstream file over serial while monitoring output.

    Sets up logging, opens the serial port, and runs a miniterm session whose
    received data is fed to a GonkLogStream. An FpgaProvisioner is registered
    as a log message handler on that stream.

    Args:
      baudrate: Baudrate used to open the serial port.
      databases: Token databases used for detokenizing. If empty or None, the
        bundled ELF is used when one is available.
      bitstream_file: FPGA bitstream path, passed to FpgaProvisioner.
      port: Serial port path. If None, one is looked up from ``product`` and
        ``serial_number``.
      product: Product name used when looking up a serial port.
      serial_number: Serial number used when looking up a serial port.
      logfile: Default log file. A temporary file is created if not given.
      host_logfile: Optional extra log file attached to the root logger.
      device_logfile: Optional log file attached to the device logger.
      json_logfile: Optional JSON formatted log file for the device logger.
      csv_logfile: Optional log file attached to the CSV logger.
      log_colors: Whether to use colors in log messages; None means True.
      merge_device_and_host_logs: Also send device logs to ``logfile``.
      verbose: Log at DEBUG level instead of INFO.
      log_to_stderr: Additionally install a log handler without a log file.
      truncate_logfiles: Empty each log file before logging to it.
      read_only: Use the miniterm variant that does not read the keyboard.

    Returns:
      0 once the miniterm session has been shut down.

    Raises:
      UnknownSerialDevice: If no port was given and none could be found.
    """
    # Work on a private list: the caller may pass None (no --databases on the
    # command line) or an iterable that cannot be appended to or unpacked.
    databases = list(databases) if databases else []
    if not databases and BUNDLED_ELF:
        elf_file = bundled_elf_path()
        if elf_file:
            databases.append(
                pw_tokenizer_database.load_token_database(elf_file)
            )

    if not logfile:
        # Create a temp logfile to prevent logs from appearing over stdout. This
        # would corrupt the prompt toolkit UI.
        logfile = Path(python_logging.create_temp_log_file())

    if truncate_logfiles:
        logfile.write_text('', encoding='utf-8')

    if log_colors is None:
        log_colors = True

    colors = pw_cli.color.colors(log_colors)
    log_level = logging.DEBUG if verbose else logging.INFO
    logger_name_format = colors.cyan('%(name)s')
    logger_message_format = f'[{logger_name_format}] %(levelname)s %(message)s'

    pw_cli_log.install(
        level=log_level,
        use_color=log_colors,
        hide_timestamp=False,
        log_file=logfile,
        message_format=logger_message_format,
    )

    if device_logfile:
        if truncate_logfiles:
            device_logfile.write_text('', encoding='utf-8')
        pw_cli_log.install(
            level=log_level,
            use_color=log_colors,
            hide_timestamp=False,
            log_file=device_logfile,
            logger=_DEVICE_LOG,
            message_format=logger_message_format,
        )

    if host_logfile:
        if truncate_logfiles:
            host_logfile.write_text('', encoding='utf-8')
        pw_cli_log.install(
            level=log_level,
            use_color=log_colors,
            hide_timestamp=False,
            log_file=host_logfile,
            logger=_ROOT_LOG,
            message_format=logger_message_format,
        )

    # By default don't send device logs to the root logger.
    _DEVICE_LOG.propagate = False
    if merge_device_and_host_logs:
        # Add device logs to the default logfile.
        pw_cli_log.install(
            level=log_level,
            use_color=log_colors,
            hide_timestamp=False,
            log_file=logfile,
            logger=_DEVICE_LOG,
            message_format=logger_message_format,
        )

    if json_logfile:
        if truncate_logfiles:
            json_logfile.write_text('', encoding='utf-8')
        json_filehandler = logging.FileHandler(json_logfile, encoding='utf-8')
        json_filehandler.setLevel(log_level)
        json_filehandler.setFormatter(python_logging.JsonLogFormatter())
        _DEVICE_LOG.addHandler(json_filehandler)

    # Don't send CSV lines to the root logger.
    _CSV_LOG.propagate = False
    if csv_logfile:
        # Decide whether a header is needed before anything touches the file:
        # creating the FileHandler below opens (and so creates) the file,
        # after which an is_file() check could never report a new file.
        csv_needs_header = truncate_logfiles or not csv_logfile.is_file()
        if truncate_logfiles:
            csv_logfile.write_text('', encoding='utf-8')
        csv_filehandler = logging.FileHandler(csv_logfile, encoding='utf-8')
        csv_filehandler.setLevel(logging.NOTSET)
        csv_filehandler.setFormatter(logging.Formatter(fmt='%(message)s'))
        _CSV_LOG.addHandler(csv_filehandler)
        # Write the csv header if new file or truncate_logfiles is on.
        if csv_needs_header:
            log_csv_header()

    if log_to_stderr:
        pw_cli_log.install(
            level=log_level,
            use_color=log_colors,
            hide_timestamp=False,
            message_format=logger_message_format,
        )

    _LOG.setLevel(log_level)
    _DEVICE_LOG.setLevel(log_level)
    _ROOT_LOG.setLevel(log_level)

    # Init serial port.
    if port is None:
        port = get_serial_port(product=product, serial_number=serial_number)
    if not port:
        raise UnknownSerialDevice(
            'No --serial-number --product or --port path provided.'
        )

    serial_instance = Serial(port=port, baudrate=baudrate, timeout=0.1)
    try:
        detokenizer = detokenize.Detokenizer(
            tokens.Database.merged(*databases), show_errors=True
        )

        # Use pyserial miniterm to monitor received data.
        miniterm_impl = MinitermBinary
        if read_only:
            miniterm_impl = MinitermBinaryReadOnly

        miniterm = miniterm_impl(
            serial_instance,
            echo=True,
            eol='lf',
            filters=(),
        )
        # Use Ctrl-C as the exit character. (Miniterm default is Ctrl-])
        miniterm.exit_character = chr(0x03)
        miniterm.set_rx_encoding('utf-8', errors='backslashreplace')
        miniterm.set_tx_encoding('utf-8')

        log_message_handlers = []
        fpga_provisioner = FpgaProvisioner(bitstream_file, serial_instance)
        log_message_handlers.append(fpga_provisioner.log_handler)

        gonk_log_stream = GonkLogStream(
            detokenizer, log_message_handlers=log_message_handlers
        )

        miniterm.rx_transformations.append(HandleBinaryData(gonk_log_stream))
        miniterm.tx_transformations.append(DebugSerialIO())

        # Start monitoring serial data.
        miniterm.start()

        # Wait for ctrl-c, then shutdown miniterm.
        try:
            miniterm.join(True)
        except KeyboardInterrupt:
            miniterm.stop()

        sys.stderr.write('\n--- exit ---\n')
        miniterm.join()
        miniterm.close()
    finally:
        # Release the port even when setup above fails (for example when the
        # bitstream file is rejected).
        serial_instance.close()

    return 0
def main():
    """Run gonk_main with the parsed command line and exit with its result."""
    cli_options = vars(_parse_args())
    exit_code = gonk_main(**cli_options)
    sys.exit(exit_code)
# Only run the tool when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()