Add separate factory script and device log files
If no log-file is provided write logs to BUILD_WORKSPACE_DIRECTORY as:
Device logs: ./factory-logs-20240801175511-device.txt
Factory logs: ./factory-logs-20240801175511-operator.txt
Change all print() statements to print_message() which prints then
logs to the new factory log file with colors removed.
Add a re-flash message when failing to connect to a device.
Add the last 5 sensor readings to the title of each _sample_until()
progress bar.
Change-Id: I465f20ec183c0fbea5c03f70a98096b5819d346e
Reviewed-on: https://pigweed-internal-review.git.corp.google.com/c/pigweed/showcase/rp2/+/73669
Reviewed-by: Keir Mierle <keir@google.com>
Commit-Queue: Anthony DiGirolamo <tonymd@google.com>
diff --git a/tools/sense/factory.py b/tools/sense/factory.py
index e4df1e9..fd3dbe5 100644
--- a/tools/sense/factory.py
+++ b/tools/sense/factory.py
@@ -18,11 +18,13 @@
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
+import functools
import logging
import math
import os
from pathlib import Path
import pwd
+import re
import sys
import threading
import time
@@ -30,7 +32,7 @@
from prompt_toolkit import prompt
from prompt_toolkit.patch_stdout import patch_stdout
-from prompt_toolkit.formatted_text import ANSI
+from prompt_toolkit.formatted_text import ANSI, to_plain_text
from prompt_toolkit.shortcuts import ProgressBar
from factory_pb import factory_pb2
@@ -45,7 +47,78 @@
get_device_connection,
)
-_LOG = logging.getLogger(__file__)
+_LOG = logging.getLogger(__package__)
+_DEVICE_LOG = logging.getLogger('rpc_device')
+_FACTORY_LOG = logging.getLogger('sense_factory')
+
+_COLOR = colors()
+
+_ANSI_SEQUENCE_REGEX = re.compile(r'\x1b[^m]*m')
+
+
+def strip_ansi(text: str):
+ """Strip out ANSI escape sequences."""
+ return _ANSI_SEQUENCE_REGEX.sub('', text)
+
+
+def print_message(*args, **kwargs) -> None:
+ """Print text normally and log the same message to _FACTORY_LOG."""
+ print(*args, **kwargs)
+ text = ''
+ if args:
+ text = strip_ansi(' '.join(args))
+ _FACTORY_LOG.info(text)
+
+
+def prompt_user_action(message: str) -> None:
+ """Prints a prompt for a user action."""
+ print_message(f'{_COLOR.yellow(">>>")} {message}')
+
+
+def prompt_enter(
+ message: str,
+ key_prompt: str = 'Press Enter to continue...',
+) -> None:
+ """Prints a prompt for a user action."""
+
+ with patch_stdout():
+ prompt_fragments = [
+ ('', '\n'),
+ ('ansiyellow', '>>> '),
+ ]
+ prompt_fragments.extend(ANSI(message).__pt_formatted_text__())
+ if key_prompt:
+ prompt_fragments.extend(
+ [
+ ('', f'\n{key_prompt}'),
+ ]
+ )
+ for line in to_plain_text(prompt_fragments).splitlines():
+ _FACTORY_LOG.info(line)
+ prompt(prompt_fragments)
+
+
+def prompt_yn(message: str) -> bool:
+ """Prints a prompt for a yes/no response from the user."""
+ with patch_stdout():
+ prompt_fragments = [
+ ('', '\n'),
+ ('ansiyellow', '>>> '),
+ ]
+ prompt_fragments.extend(ANSI(message).__pt_formatted_text__())
+ prompt_fragments.extend(
+ [
+ ('', ' [Y/n] '),
+ ]
+ )
+ for line in to_plain_text(prompt_fragments).splitlines():
+ _FACTORY_LOG.info(line)
+ result = prompt(prompt_fragments)
+ _FACTORY_LOG.info('%s', result)
+
+ if not result or result.lower() == 'y':
+ return True
+ return False
class TestTimeoutError(Exception):
@@ -91,67 +164,25 @@
"""Runs the hardware test."""
raise NotImplementedError(self.name)
- def prompt(self, message: str) -> None:
- """Prints a prompt for a user action."""
- print(f'{colors().yellow(">>>")} {message}')
-
- def prompt_enter(
- self,
- message: str,
- key_prompt: str = 'Press Enter to continue...',
- ) -> None:
- """Prints a prompt for a user action."""
-
- with patch_stdout():
- prompt_fragments = [
- ('', '\n'),
- ('ansiyellow', '>>> '),
- ]
- prompt_fragments.extend(ANSI(message).__pt_formatted_text__())
- prompt_fragments.extend(
- [
- ('', f'\n{key_prompt}'),
- ]
- )
- prompt(prompt_fragments)
-
- def prompt_yn(self, message: str) -> bool:
- """Prints a prompt for a yes/no response from the user."""
- with patch_stdout():
- prompt_fragments = [
- ('', '\n'),
- ('ansiyellow', '>>> '),
- ]
- prompt_fragments.extend(ANSI(message).__pt_formatted_text__())
- prompt_fragments.extend(
- [
- ('', ' [Y/n] '),
- ]
- )
- result = prompt(prompt_fragments)
- if not result or result.lower() == 'y':
- return True
- return False
-
def pass_test(self, name: str) -> None:
"""Records a test as passed."""
self.executed_tests.append((name, Test.Status.PASS))
self.passed_tests += 1
- print(f'{colors().green("PASS:")} {name}')
+ print_message(f'{_COLOR.green("PASS:")} {name}')
self.blink_led(Color(0, 255, 0))
def fail_test(self, name: str) -> None:
"""Records a test as failed."""
self.executed_tests.append((name, Test.Status.FAIL))
self.failed_tests += 1
- print(f'{colors().bold_red("FAIL:")} {name}')
+ print_message(f'{_COLOR.bold_red("FAIL:")} {name}')
self.blink_led(Color(255, 0, 0))
def skip_test(self, name: str) -> None:
"""Records a test as skipped."""
self.executed_tests.append((name, Test.Status.SKIP))
self.skipped_tests += 1
- print(f'{colors().yellow("SKIP:")} {name}')
+ print_message(f'{_COLOR.yellow("SKIP:")} {name}')
def set_led(self, color: Color) -> Status:
"""Sets the device's RGB LED to the specified color."""
@@ -218,7 +249,7 @@
def _test_button_press(self, button: Button) -> None:
test_name = button.name.lower().replace(' ', '_')
- self.prompt(f'Press {button.name}')
+ prompt_user_action(f'Press {button.name}')
try:
# Wait for a button press event followed by a release event.
@@ -255,10 +286,10 @@
status = self.unset_led()
assert status is Status.OK
- self._test_led(Color(255, 255, 255, 'white', colors().bold_white))
- self._test_led(Color(255, 0, 0, 'red', colors().red))
- self._test_led(Color(0, 255, 0, 'green', colors().green))
- self._test_led(Color(0, 0, 255, 'blue', colors().blue))
+ self._test_led(Color(255, 255, 255, 'white', _COLOR.bold_white))
+ self._test_led(Color(255, 0, 0, 'red', _COLOR.red))
+ self._test_led(Color(0, 255, 0, 'green', _COLOR.green))
+ self._test_led(Color(0, 0, 255, 'blue', _COLOR.blue))
self._test_led(Color(0, 0, 0, 'off'))
status = self.unset_led()
@@ -271,7 +302,7 @@
self.set_led(color)
- if self.prompt_yn(f'Is the Enviro+ LED {color.formatted_name}?'):
+ if prompt_yn(f'Is the Enviro+ LED {color.formatted_name}?'):
self.pass_test(test_name)
else:
self.fail_test(test_name)
@@ -302,13 +333,15 @@
f'max: {self.max_value}, mean: {self.mean_value}'
)
-
- def print_formatted(self, indent: int = 0, units: str | None = None) -> None:
+ def print_formatted(
+ self, indent: int = 0, units: str | None = None
+ ) -> None:
suffix = units if units is not None else ''
- print(f'{" " * indent}Samples {self.count}')
- print(f'{" " * indent}Min {self.min_value:.2f}{suffix}')
- print(f'{" " * indent}Max {self.max_value:.2f}{suffix}')
- print(f'{" " * indent}Mean {self.max_value:.2f}{suffix}')
+ print_message(f'{" " * indent}Samples {self.count}')
+ print_message(f'{" " * indent}Min {self.min_value:.2f}{suffix}')
+ print_message(f'{" " * indent}Max {self.max_value:.2f}{suffix}')
+ print_message(f'{" " * indent}Mean {self.max_value:.2f}{suffix}')
+
def _sample_until(
max_samples: int,
@@ -332,8 +365,22 @@
samples = Samples()
recent_samples = [None for _ in range(5)]
+ def _get_title(message, recent_samples) -> str:
+ """Dynamic progress bar title with the last 5 sample readings."""
+ text = 'Reading'
+ if message:
+ text = message
+ if any(recent_samples):
+ text += ' - '
+ text += ', '.join(
+ f'{i:.2f}' for i in recent_samples if i is not None
+ )
+ return text
+
with patch_stdout():
- with ProgressBar(title=message) as pb:
+ with ProgressBar(
+ title=functools.partial(_get_title, message, recent_samples)
+ ) as pb:
for i in pb(range(max_samples)):
status, response = sample_rpc_method()
if status is not Status.OK:
@@ -369,7 +416,9 @@
if status is not Status.OK:
return False
- print(colors().bold_white('\nSetting LTR559 sensor to proximity mode.'))
+ print_message(
+ _COLOR.bold_white('\nSetting LTR559 sensor to proximity mode.')
+ )
result = self._test_prox()
status, _ = self._factory_service.EndTest(
@@ -387,7 +436,9 @@
if status is not Status.OK:
return False
- print(colors().bold_white('\nSetting LTR559 sensor to ambient mode.'))
+ print_message(
+ _COLOR.bold_white('\nSetting LTR559 sensor to ambient mode.')
+ )
result = self._test_light()
status, _ = self._factory_service.EndTest(
@@ -399,7 +450,7 @@
return result
def _test_prox(self) -> bool:
- self.prompt_enter('Place your Enviro+ pack in a well-lit area')
+ prompt_enter('Place your Enviro+ pack in a well-lit area')
with patch_stdout():
with ProgressBar(title='Getting initial sensor readings') as pb:
@@ -411,11 +462,11 @@
baseline_samples.update(response.value)
- print(colors().green(' DONE'))
+ print_message(_COLOR.green(' DONE'))
baseline_samples.print_formatted(indent=4)
- print()
- self.prompt_enter('Fully cover the LIGHT sensor')
+ print_message()
+ prompt_enter('Fully cover the LIGHT sensor')
success, samples = _sample_until(
50,
@@ -429,8 +480,8 @@
return False
self.pass_test('ltr559_prox_near')
- print()
- self.prompt_enter('Fully uncover the LIGHT sensor')
+ print_message()
+ prompt_enter('Fully uncover the LIGHT sensor')
success, samples = _sample_until(
50,
@@ -446,9 +497,7 @@
return True
def _test_light(self) -> bool:
- self.prompt_enter(
- 'Place your Enviro+ pack in an area with neutral light'
- )
+ prompt_enter('Place your Enviro+ pack in an area with neutral light')
with patch_stdout():
with ProgressBar(title='Getting initial sensor readings') as pb:
baseline_samples = Samples()
@@ -460,11 +509,11 @@
baseline_samples.update(response.lux)
- print(colors().green(' DONE'))
+ print_message(_COLOR.green(' DONE'))
baseline_samples.print_formatted(indent=4, units='lux')
- print()
- self.prompt_enter('Cover the LIGHT sensor with your finger')
+ print_message()
+ prompt_enter('Cover the LIGHT sensor with your finger')
success, samples = _sample_until(
100,
@@ -479,8 +528,8 @@
self.pass_test('ltr559_light_dark')
- print()
- self.prompt_enter('Shine a light directly at the LIGHT sensor')
+ print_message()
+ prompt_enter('Shine a light directly at the LIGHT sensor')
success, samples = _sample_until(
100,
@@ -496,8 +545,8 @@
self.pass_test('ltr559_light_bright')
- print()
- self.prompt_enter('Return the LIGHT sensor to its original position')
+ print_message()
+ prompt_enter('Return the LIGHT sensor to its original position')
success, samples = _sample_until(
100,
@@ -545,16 +594,14 @@
return gas_result and temp_result
def _test_gas_sensor(self) -> bool:
- print(
- colors().bold_white(
- '\nTesting gas resistance in the BME688 sensor.'
- )
+ print_message(
+ _COLOR.bold_white('\nTesting gas resistance in the BME688 sensor.')
)
- print(
+ print_message(
"To test the BME688's gas sensor, you need an alcohol-based "
'\nsolution. E.g. dip a cotton swab in rubbing alcohol.'
)
- if not self.prompt_yn('Are you able to continue this test?'):
+ if not prompt_yn('Are you able to continue this test?'):
self.skip_test('bme688_gas_resistance')
return True
@@ -569,10 +616,10 @@
delay=0.25,
message='Getting initial sensor readings',
)
- print(colors().green(' DONE'))
+ print_message(_COLOR.green(' DONE'))
baseline_samples.print_formatted(indent=4)
- self.prompt_enter(
+ prompt_enter(
'Move the alcohol close to the BME688 sensor.',
key_prompt='Press Enter to begin measuring...',
)
@@ -592,7 +639,7 @@
self.pass_test('bme688_gas_resistance_poor')
- self.prompt_enter('Move the alcohol away from the BME688 sensor')
+ prompt_enter('Move the alcohol away from the BME688 sensor')
success, samples = _sample_until(
50,
self._air_sensor_service.Measure,
@@ -611,7 +658,9 @@
return True
def _test_temperature(self) -> bool:
- print(colors().bold_white("\nTesting BME688's temperature sensor."))
+ print_message(
+ _COLOR.bold_white("\nTesting BME688's temperature sensor.")
+ )
def log_received(_):
return False
@@ -624,10 +673,10 @@
delay=0.25,
message='Getting initial sensor readings',
)
- print(colors().green(' DONE'))
+ print_message(_COLOR.green(' DONE'))
baseline_samples.print_formatted(indent=4, units='C')
- self.prompt_enter(
+ prompt_enter(
'Put your finger on the BME688 sensor to increase its temperature',
key_prompt='Press Enter to begin measuring...',
)
@@ -648,7 +697,7 @@
self.pass_test('bme688_temperature_hot')
- self.prompt_enter(
+ prompt_enter(
'Remove your finger from the BME688 sensor',
key_prompt='Press Enter to begin measuring...',
)
@@ -677,16 +726,20 @@
time: datetime
def print_formatted(self):
- print(f'Operator: {colors().bold_white(self.operator)}')
- print(f'Date: {colors().bold_white(self.time.strftime("%Y/%m/%d %H:%M:%S"))}')
- print(f'Device flash ID: {colors().bold_white(f"{self.device_id:x}")}')
+ print_message(f'Operator: {_COLOR.bold_white(self.operator)}')
+ print_message(
+ f'Date: {_COLOR.bold_white(self.time.strftime("%Y/%m/%d %H:%M:%S"))}'
+ )
+ print_message(
+ f'Device flash ID: {_COLOR.bold_white(f"{self.device_id:x}")}'
+ )
def _run_tests(run_metadata: FactoryRunMetadata, rpcs) -> bool:
- print()
- print('===========================')
- print('Pigweed Sense Factory Tests')
- print('===========================')
+ print_message()
+ print_message('===========================')
+ print_message('Pigweed Sense Factory Tests')
+ print_message('===========================')
run_metadata.print_formatted()
tests_to_run = [
@@ -696,22 +749,22 @@
Bme688Test(rpcs),
]
- print()
- print(f'{len(tests_to_run)} tests will be performed:')
+ print_message()
+ print_message(f'{len(tests_to_run)} tests will be performed:')
for test in tests_to_run:
- print(f' - {test.name}')
+ print_message(f' - {test.name}')
- input('\nPress Enter when you are ready to begin. ')
- print(colors().green('\nStarting hardware tests.'))
+ prompt_enter('Press Enter when you are ready to begin ', key_prompt='')
+ print_message(_COLOR.green('Starting hardware tests.'))
success = True
for num, test in enumerate(tests_to_run):
start_msg = f'[{num + 1}/{len(tests_to_run)}] Running test {test.name}'
- print()
- print('=' * len(start_msg))
- print(start_msg)
- print('=' * len(start_msg))
+ print_message()
+ print_message('=' * len(start_msg))
+ print_message(start_msg)
+ print_message('=' * len(start_msg))
if not test.run():
success = False
@@ -728,10 +781,10 @@
failed = 0
skipped = 0
- print()
- print('============')
- print('Test Summary')
- print('============')
+ print_message()
+ print_message('============')
+ print_message('Test Summary')
+ print_message('============')
run_metadata.print_formatted()
@@ -740,21 +793,22 @@
failed += test_suite.failed_tests
skipped += test_suite.skipped_tests
- print()
- print(colors().bold_white(test_suite.name))
+ print_message()
+ print_message(_COLOR.bold_white(test_suite.name))
for name, status in test_suite.executed_tests:
if status is Test.Status.PASS:
- print(f' {colors().green("PASS")} | {name}')
+ print_message(f' {_COLOR.green("PASS")} | {name}')
elif status is Test.Status.FAIL:
- print(f' {colors().red("FAIL")} | {name}')
+ print_message(f' {_COLOR.red("FAIL")} | {name}')
elif status is Test.Status.SKIP:
- print(f' {colors().yellow("SKIP")} | {name}')
+ print_message(f' {_COLOR.yellow("SKIP")} | {name}')
- print(f'\n{passed} tests passed, {failed} tests failed', end='')
+ test_result_message = f'\n{passed} tests passed, {failed} tests failed'
if skipped > 0:
- print(f', {skipped} tests skipped', end='')
- print('.')
- print('=' * 40)
+ test_result_message += f', {skipped} tests skipped'
+ test_result_message += '.'
+ print_message(test_result_message)
+ print_message('=' * 40)
def _parse_args() -> argparse.Namespace:
@@ -772,12 +826,30 @@
now = datetime.now()
if log_file is None:
run_time = now.strftime('%Y%m%d%H%M%S')
- log_file = Path(f'factory-logs-{run_time}.txt')
+ log_directory = Path(os.environ.get('BUILD_WORKSPACE_DIRECTORY', '.'))
+ device_log_file = log_directory / f'factory-logs-{run_time}-device.txt'
+ factory_log_file = (
+ log_directory / f'factory-logs-{run_time}-operator.txt'
+ )
+ else:
+ device_log_file = Path(f'{log_file.stem}-device{log_file.suffix}')
+ factory_log_file = Path(f'{log_file.stem}-operator{log_file.suffix}')
+
+ _DEVICE_LOG.propagate = False
+ pw_cli.log.install(
+ level=logging.DEBUG,
+ use_color=False,
+ log_file=device_log_file,
+ logger=_DEVICE_LOG,
+ )
pw_cli.log.install(
level=logging.DEBUG,
use_color=False,
- log_file=log_file,
+ hide_timestamp=False,
+ message_format='%(message)s',
+ log_file=factory_log_file,
+ logger=_FACTORY_LOG,
)
device_connection = get_device_connection(setup_logging=False)
@@ -786,32 +858,41 @@
# Open the connection to the device.
with device_connection as device:
- print('Connected to attached device')
+ print_message('Connected to attached device')
+
+ print_message(f'Device logs: {device_log_file.resolve()}')
+ print_message(f'Factory logs: {factory_log_file.resolve()}')
try:
_, device_info = device.rpcs.factory.Factory.GetDeviceInfo()
except RpcError as e:
if e.status is Status.NOT_FOUND:
- print(
- colors().red(
+ print_message(
+ _COLOR.red(
'No factory service exists on the connected device.'
),
file=sys.stderr,
)
- print('', file=sys.stderr)
- print(
+ print_message('', file=sys.stderr)
+ print_message(
'Make sure that your Pico device is running an up-to-date factory app:',
file=sys.stderr,
)
- print('', file=sys.stderr)
- print(' bazelisk run //apps/factory:flash', file=sys.stderr)
- print('', file=sys.stderr)
+ print_message('', file=sys.stderr)
+ print_message(
+ ' bazelisk run //apps/factory:flash', file=sys.stderr
+ )
+ print_message('', file=sys.stderr)
else:
- print(f'Failed to query device info: {e}', file=sys.stderr)
+ print_message(
+ f'Failed to query device info: {e}', file=sys.stderr
+ )
return 1
except RpcTimeout as e:
- print(
- f'Timed out while trying to query device info: {e}',
+ print_message(
+ f'Timed out while trying to query device info: {e}\n\n'
+ 'Please re-flash your device and try again:\n'
+ ' bazelisk run //apps/factory:flash',
file=sys.stderr,
)
return 1
@@ -825,9 +906,10 @@
except KeyboardInterrupt:
# Turn off the LED if it was on when tests were interrupted.
device.rpcs.blinky.Blinky.SetRgb(hex=0, brightness=0)
- print('\nCtrl-C detected, exiting.')
+ print_message('\nCtrl-C detected, exiting.')
- print(f'Device logs written to {log_file.resolve()}')
+ print_message(f'Device logs written to {device_log_file.resolve()}')
+ print_message(f'Factory logs written to {factory_log_file.resolve()}')
return exit_code