# blob: 9699bcd843183d54664b192ec5577c7739484da5 [file] [log] [blame]
# Copyright 2023 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Write a binary over serial to provision the Gonk FPGA."""
import argparse
from datetime import datetime
import importlib.resources
from itertools import islice
import operator
from pathlib import Path
import sys
import time
from typing import Optional
import serial
from serial import Serial
from serial.tools.list_ports import comports
from serial.tools.miniterm import Miniterm, Transform
from google.protobuf.message import DecodeError
import pw_cli.color
from gonk_adc.adc_measurement_pb2 import FramedProto
_COLOR = pw_cli.color.colors()

# File name looked up inside the gonk_fpga package.
_BUNDLED_FPGA_BINFILE_NAME = 'toplevel.bin'
# Stays empty unless the lookup below finds the bundled file.
_BUNDLED_FPGA_BINFILE = ''

# Check for a bundled FPGA bitstream file.
try:
    with importlib.resources.as_file(
            importlib.resources.files('gonk_fpga').joinpath(
                _BUNDLED_FPGA_BINFILE_NAME)) as bin_path:
        if bin_path.is_file():
            _BUNDLED_FPGA_BINFILE = _BUNDLED_FPGA_BINFILE_NAME
except ModuleNotFoundError:
    # gonk_fpga could not be imported, so there is no default bitstream.
    pass
def _parse_args():
    """Build the command line parser and parse ``sys.argv``.

    Returns:
        The argparse namespace with ``port``, ``baudrate``, ``product``,
        ``serial_number`` and ``bitstream_file`` attributes.
    """
    arg_parser = argparse.ArgumentParser(description=__doc__)

    # (flags, keyword options) pairs, registered in help-output order.
    option_table = (
        (
            ('-p', '--port'),
            {
                'type': str,
                'help': 'Serial port path.',
            },
        ),
        (
            ('-b', '--baudrate'),
            {
                'type': int,
                'default': 2000000,
                'help': 'Sets the baudrate for serial communication.',
            },
        ),
        (
            ('--product',),
            {
                'default': 'GENERIC_F730R8TX',
                'help': 'Use first serial port matching this product name.',
            },
        ),
        (
            ('--serial-number',),
            {
                'help': 'Use the first serial port matching this number.',
            },
        ),
        (
            ('--bitstream-file',),
            {
                'type': Path,
                'help': ('FPGA Bitstream file. Can be a filesystem path or '
                         '"DEFAULT" to use a Gonk Python tools bundled '
                         'binary file.'),
            },
        ),
    )
    for flags, options in option_table:
        arg_parser.add_argument(*flags, **options)

    return arg_parser.parse_args()
def get_serial_port(
    product: Optional[str] = None,
    serial_number: Optional[str] = None,
) -> str:
    """Return the device path of the first matching serial port.

    Ports are examined in order of their device path. A serial number match
    takes priority over a product name match; both are substring matches.

    Args:
        product: Text that must appear in the port's product name.
        serial_number: Text that must appear in the port's serial number.

    Returns:
        The matching port's device path, or an empty string if nothing
        matched.
    """
    candidates = sorted(comports(), key=operator.attrgetter('device'))

    # Serial number matches win over product name matches.
    if serial_number is not None:
        for candidate in candidates:
            if (candidate.serial_number is not None
                    and serial_number in candidate.serial_number):
                return candidate.device

    # Fall back to the product name.
    if product is not None:
        for candidate in candidates:
            if (candidate.product is not None
                    and product in candidate.product):
                return candidate.device

    # Nothing matched.
    return ''
# Required size, in bytes, of a bitstream file accepted by
# load_bitstream_file().
FILE_LENGTH = 135100

# A bitstream file must begin with these two 4-byte sequences, in this order.
FILE_START_STR = 'FF 00 00 FF'
FILE_START_BYTES = bytes.fromhex(FILE_START_STR)
SYNC_START_STR = '7E AA 99 7E'
SYNC_START_BYTES = bytes.fromhex(SYNC_START_STR)

# Byte sequence HandleBinaryData uses to find where each binary log packet
# starts in the received serial data.
BIN_LOG_SYNC_START_STR = '0d 99 bb aa 7e'
BIN_LOG_SYNC_START_BYTES = bytes.fromhex(BIN_LOG_SYNC_START_STR)
class UnknownSerialDevice(Exception):
    """Raised when no serial port was given and none could be found."""
class IncorrectBinaryFormat(Exception):
    """Raised when an FPGA bitstream file has the wrong size or header."""
def batched(iterable, n):
    """Yield successive tuples of up to n items taken from iterable.

    The final tuple is shorter when the input does not divide evenly.

    Example usage:

    .. code-block:: pycon

       >>> list(batched('ABCDEFG', 3))
       [('A', 'B', 'C'), ('D', 'E', 'F'), ('G',)]

    Raises:
        ValueError: If n is smaller than one (raised when iteration starts).
    """
    if n < 1:
        raise ValueError('n must be at least one')
    source = iter(iterable)
    while True:
        chunk = tuple(islice(source, n))
        if not chunk:
            return
        yield chunk
class HandleBinaryData(Transform):
    """Miniterm transform to handle incoming byte data.

    Decoded text is passed through unchanged until BIN_LOG_SYNC_START_BYTES
    first appears in the raw bytes. From then on the decoded text is dropped
    and every complete packet (the bytes from one sync marker up to the
    next) is parsed as a FramedProto and rendered as one text line.
    """

    def __init__(self) -> None:
        # Raw bytes received so far that are not yet part of an emitted
        # packet.
        self.data = bytes()
        self.timestamp_prefix = ' delta_microseconds:'
        self.vbus_prefix = ' vbus:'
        self.vshunt_prefix = ' vshunt:'
        self.start_time = time.time()
        self.time_format = '%Y%m%d %H:%M:%S.%f'
        # Latched to True the first time the sync marker is seen.
        self.binary_format_started = False

    def _parse_proto(self, proto_bytes: bytes) -> str:
        """Render one packet as a single newline-terminated text line.

        Args:
            proto_bytes: One packet, including its leading sync marker.

        Returns:
            A line with the host time, packet size, timestamp, vshunt values
            and vbus values, or 'FramedProto.DecodeError' followed by a
            newline if the bytes do not parse.
        """
        framed_proto = FramedProto()
        try:
            framed_proto.ParseFromString(proto_bytes)
        except DecodeError:
            # TODO(tonymd): Handle failed packets.
            return 'FramedProto.DecodeError\n'

        measurements = framed_proto.payload.adc_measurements
        vbus_values = [str(measure.vbus_value) for measure in measurements]
        vshunt_values = [
            str(measure.vshunt_value) for measure in measurements
        ]

        # TODO(tonymd): Use Python logging and separate output file
        output = [
            # Host time
            datetime.now().strftime(self.time_format),
            # Update byte size
            f' size: {len(proto_bytes)} ',
            self.timestamp_prefix,
            # Delta microseconds
            str(framed_proto.payload.timestamp),
            # Vshunt values
            self.vshunt_prefix,
            ','.join(vshunt_values),
            # Vbus values
            self.vbus_prefix,
            ','.join(vbus_values),
        ]
        return ' '.join(output) + '\n'

    def _take_complete_packets(self) -> list:
        """Remove and return every complete packet buffered in self.data.

        A packet is complete once the sync marker that starts the following
        packet has arrived. Bytes ahead of the first marker are dropped and
        the trailing, still incomplete packet stays buffered.
        """
        marker = BIN_LOG_SYNC_START_BYTES
        packets = []
        start = self.data.find(marker)
        while start >= 0:
            end = self.data.find(marker, start + len(marker))
            if end < 0:
                break
            packets.append(self.data[start:end])
            start = end
        if start > 0:
            self.data = self.data[start:]
        return packets

    def rx(self, text, data=None):
        """Text received from the serial port.

        Args:
            text: The decoded text for this chunk of serial data.
            data: The raw bytes that text was decoded from, if available.

        Returns:
            text unchanged while no sync marker has been seen (or when no
            raw data is supplied); otherwise one formatted line per packet
            completed by this call, which is an empty string when no packet
            was completed.
        """
        if not data:
            return text

        # Concat data with any existing captures
        self.data += data

        if not self.binary_format_started:
            if BIN_LOG_SYNC_START_BYTES not in self.data:
                # Keep only the tail that could be the beginning of a marker
                # split across two reads, so the buffer stays bounded while
                # plain text is streaming.
                self.data = self.data[-(len(BIN_LOG_SYNC_START_BYTES) - 1):]
                return text
            self.binary_format_started = True

        # Emit every packet that is complete, not just the first one, so a
        # read containing several packets does not build up a backlog.
        return ''.join(
            self._parse_proto(packet)
            for packet in self._take_complete_packets())

    def tx(self, text):
        """Text to be sent to the serial port."""
        return text

    def echo(self, text):
        """Text to be sent but displayed on console."""
        return text
class MinitermBinary(Miniterm):
    """Miniterm whose reader also passes raw bytes to HandleBinaryData."""

    def reader(self):
        """loop and copy serial->console"""
        try:
            while self.alive and self._reader_alive:
                # Read all that is there or wait for one byte
                pending = self.serial.in_waiting or 1
                received = self.serial.read(pending)
                if not received:
                    continue

                if self.raw:
                    self.console.write_bytes(received)
                    continue

                decoded = self.rx_decoder.decode(received)
                for rx_filter in self.rx_transformations:
                    # Only HandleBinaryData accepts the raw bytes as a
                    # second argument.
                    if isinstance(rx_filter, HandleBinaryData):
                        decoded = rx_filter.rx(decoded, received)
                    else:
                        decoded = rx_filter.rx(decoded)
                self.console.write(decoded)
        except serial.SerialException:
            self.alive = False
            self.console.cancel()
            raise  # XXX handle instead of re-raise?
def load_bitstream_file(bitstream_file: Path) -> bytes:
    """Check for valid bitstream file and load it as bytes.

    Args:
        bitstream_file: Filesystem path of the bitstream, or the literal
            ``DEFAULT`` to load the file bundled in the ``gonk_fpga``
            package.

    Returns:
        The validated bitstream contents.

    Raises:
        FileNotFoundError: If ``DEFAULT`` is requested but no bundled file
            was detected, or the given path is not a file.
        IncorrectBinaryFormat: If the contents are not FILE_LENGTH bytes
            long or do not start with FILE_START_BYTES + SYNC_START_BYTES.
    """
    if str(bitstream_file) == 'DEFAULT':
        if not _BUNDLED_FPGA_BINFILE:
            raise FileNotFoundError('No default bitstream file is available.')
        bitstream_path = importlib.resources.files('gonk_fpga').joinpath(
            _BUNDLED_FPGA_BINFILE)
        bitstream_bytes = bitstream_path.read_bytes()
    else:
        if not bitstream_file.is_file():
            raise FileNotFoundError(f'\nUnable to load "{bitstream_file}"')
        bitstream_bytes = bitstream_file.read_bytes()

    # Validate against the same constants the error message reports.
    expected_header = FILE_START_BYTES + SYNC_START_BYTES
    if (len(bitstream_bytes) != FILE_LENGTH
            or not bitstream_bytes.startswith(expected_header)):
        raise IncorrectBinaryFormat(
            f'the bitstream file must be:\n'
            f'  {FILE_LENGTH} bytes in length\n'
            f'  Start with "{FILE_START_STR} {SYNC_START_STR}"')

    return bitstream_bytes
def write_bitstream_file(bitstream_bytes: bytes,
                         serial_instance: Serial) -> None:
    """Write a series of bytes to serial.

    The data is sent in consecutive 8-byte chunks, the port is flushed once
    after the last chunk, and the total reported by the write calls is
    printed.

    Args:
        bitstream_bytes: The bytes to send.
        serial_instance: The serial port to write to.
    """
    batch_size = 8

    # Write out the bitstream in batches.
    print('Sending bitstream...')
    written_bytes: int = 0
    for offset in range(0, len(bitstream_bytes), batch_size):
        # Slice the buffer so write() receives a bytes object rather than a
        # tuple of ints built one byte at a time.
        result = serial_instance.write(
            bitstream_bytes[offset:offset + batch_size])
        if result:
            written_bytes += result
    serial_instance.flush()
    print(f'Done sending bitstream. Wrote {written_bytes}')
def main(
    baudrate: int,
    bitstream_file: Optional[Path],
    port: Optional[str] = None,
    product: Optional[str] = None,
    serial_number: Optional[str] = None,
) -> int:
    """Write a bitstream file over serial while monitoring output.

    Args:
        baudrate: Baudrate used to open the serial port.
        bitstream_file: Bitstream to send, or None to only monitor output.
        port: Serial port path. When None, a port is looked up using
            product and serial_number.
        product: Product name used to look up the port.
        serial_number: Serial number used to look up the port.

    Returns:
        0 once the monitor has been shut down.

    Raises:
        UnknownSerialDevice: If no port was given and none was found.
        FileNotFoundError: If the bitstream file cannot be loaded.
        IncorrectBinaryFormat: If the bitstream file fails validation.
    """
    # Init serial port.
    if port is None:
        port = get_serial_port(product=product, serial_number=serial_number)
    if not port:
        raise UnknownSerialDevice(
            'No --serial-number --product or --port path provided.')

    # Load and validate the bitstream before touching the serial port or the
    # terminal, so a bad file fails without leaving the port open or the
    # monitor threads running.
    bitstream_bytes = (load_bitstream_file(bitstream_file)
                       if bitstream_file else None)

    serial_instance = Serial(port=port, baudrate=baudrate, timeout=0.1)

    # Use pyserial miniterm to monitor received data.
    miniterm = MinitermBinary(
        serial_instance,
        echo=False,
        eol='crlf',
        filters=(),
    )
    # Use Ctrl-C as the exit character. (Miniterm default is Ctrl-])
    miniterm.exit_character = chr(0x03)
    miniterm.set_rx_encoding('utf-8', errors='backslashreplace')
    miniterm.set_tx_encoding('utf-8')
    miniterm.rx_transformations.append(HandleBinaryData())

    # Start monitoring serial data.
    miniterm.start()

    # Send the bitstream_file.
    if bitstream_bytes is not None:
        # Wait a couple seconds to print early log messages from Gonk.
        time.sleep(2)
        write_bitstream_file(bitstream_bytes, serial_instance)

    # Wait for ctrl-c, then shutdown miniterm.
    try:
        miniterm.join(True)
    except KeyboardInterrupt:
        pass
    sys.stderr.write('\n--- exit ---\n')
    miniterm.join()
    miniterm.close()
    return 0
if __name__ == '__main__':
    # Each parsed option is forwarded to main() as a keyword argument and
    # main()'s return value becomes the process exit status.
    sys.exit(main(**_parse_args().__dict__))