Add separate factory script and device log files

If no log-file is provided write logs to BUILD_WORKSPACE_DIRECTORY as:
Device logs: ./factory-logs-20240801175511-device.txt
Factory logs: ./factory-logs-20240801175511-operator.txt

Change all print() statements to print_message() which prints then
logs to the new factory log file with colors removed.

Add a re-flash message when failing to connect to a device.

Add the last 5 sensor readings to the title of each _sample_until()
progress bar.

Change-Id: I465f20ec183c0fbea5c03f70a98096b5819d346e
Reviewed-on: https://pigweed-internal-review.git.corp.google.com/c/pigweed/showcase/rp2/+/73669
Reviewed-by: Keir Mierle <keir@google.com>
Commit-Queue: Anthony DiGirolamo <tonymd@google.com>
diff --git a/tools/sense/factory.py b/tools/sense/factory.py
index e4df1e9..fd3dbe5 100644
--- a/tools/sense/factory.py
+++ b/tools/sense/factory.py
@@ -18,11 +18,13 @@
 from dataclasses import dataclass
 from datetime import datetime
 from enum import Enum
+import functools
 import logging
 import math
 import os
 from pathlib import Path
 import pwd
+import re
 import sys
 import threading
 import time
@@ -30,7 +32,7 @@
 
 from prompt_toolkit import prompt
 from prompt_toolkit.patch_stdout import patch_stdout
-from prompt_toolkit.formatted_text import ANSI
+from prompt_toolkit.formatted_text import ANSI, to_plain_text
 from prompt_toolkit.shortcuts import ProgressBar
 
 from factory_pb import factory_pb2
@@ -45,7 +47,78 @@
     get_device_connection,
 )
 
-_LOG = logging.getLogger(__file__)
+_LOG = logging.getLogger(__package__)
+_DEVICE_LOG = logging.getLogger('rpc_device')
+_FACTORY_LOG = logging.getLogger('sense_factory')
+
+_COLOR = colors()
+
+_ANSI_SEQUENCE_REGEX = re.compile(r'\x1b[^m]*m')
+
+
+def strip_ansi(text: str):
+    """Strip out ANSI escape sequences."""
+    return _ANSI_SEQUENCE_REGEX.sub('', text)
+
+
+def print_message(*args, **kwargs) -> None:
+    """Print text normally and log the same message to _FACTORY_LOG."""
+    print(*args, **kwargs)
+    text = ''
+    if args:
+        text = strip_ansi(' '.join(args))
+    _FACTORY_LOG.info(text)
+
+
+def prompt_user_action(message: str) -> None:
+    """Prints a prompt for a user action."""
+    print_message(f'{_COLOR.yellow(">>>")} {message}')
+
+
+def prompt_enter(
+    message: str,
+    key_prompt: str = 'Press Enter to continue...',
+) -> None:
+    """Prints a prompt for a user action."""
+
+    with patch_stdout():
+        prompt_fragments = [
+            ('', '\n'),
+            ('ansiyellow', '>>> '),
+        ]
+        prompt_fragments.extend(ANSI(message).__pt_formatted_text__())
+        if key_prompt:
+            prompt_fragments.extend(
+                [
+                    ('', f'\n{key_prompt}'),
+                ]
+            )
+        for line in to_plain_text(prompt_fragments).splitlines():
+            _FACTORY_LOG.info(line)
+        prompt(prompt_fragments)
+
+
+def prompt_yn(message: str) -> bool:
+    """Prints a prompt for a yes/no response from the user."""
+    with patch_stdout():
+        prompt_fragments = [
+            ('', '\n'),
+            ('ansiyellow', '>>> '),
+        ]
+        prompt_fragments.extend(ANSI(message).__pt_formatted_text__())
+        prompt_fragments.extend(
+            [
+                ('', ' [Y/n] '),
+            ]
+        )
+        for line in to_plain_text(prompt_fragments).splitlines():
+            _FACTORY_LOG.info(line)
+        result = prompt(prompt_fragments)
+        _FACTORY_LOG.info('%s', result)
+
+    if not result or result.lower() == 'y':
+        return True
+    return False
 
 
 class TestTimeoutError(Exception):
@@ -91,67 +164,25 @@
         """Runs the hardware test."""
         raise NotImplementedError(self.name)
 
-    def prompt(self, message: str) -> None:
-        """Prints a prompt for a user action."""
-        print(f'{colors().yellow(">>>")} {message}')
-
-    def prompt_enter(
-            self,
-            message: str,
-            key_prompt: str = 'Press Enter to continue...',
-        ) -> None:
-        """Prints a prompt for a user action."""
-
-        with patch_stdout():
-            prompt_fragments = [
-                ('', '\n'),
-                ('ansiyellow', '>>> '),
-            ]
-            prompt_fragments.extend(ANSI(message).__pt_formatted_text__())
-            prompt_fragments.extend(
-                [
-                    ('', f'\n{key_prompt}'),
-                ]
-            )
-            prompt(prompt_fragments)
-
-    def prompt_yn(self, message: str) -> bool:
-        """Prints a prompt for a yes/no response from the user."""
-        with patch_stdout():
-            prompt_fragments = [
-                ('', '\n'),
-                ('ansiyellow', '>>> '),
-            ]
-            prompt_fragments.extend(ANSI(message).__pt_formatted_text__())
-            prompt_fragments.extend(
-                [
-                    ('', ' [Y/n] '),
-                ]
-            )
-            result = prompt(prompt_fragments)
-        if not result or result.lower() == 'y':
-            return True
-        return False
-
     def pass_test(self, name: str) -> None:
         """Records a test as passed."""
         self.executed_tests.append((name, Test.Status.PASS))
         self.passed_tests += 1
-        print(f'{colors().green("PASS:")} {name}')
+        print_message(f'{_COLOR.green("PASS:")} {name}')
         self.blink_led(Color(0, 255, 0))
 
     def fail_test(self, name: str) -> None:
         """Records a test as failed."""
         self.executed_tests.append((name, Test.Status.FAIL))
         self.failed_tests += 1
-        print(f'{colors().bold_red("FAIL:")} {name}')
+        print_message(f'{_COLOR.bold_red("FAIL:")} {name}')
         self.blink_led(Color(255, 0, 0))
 
     def skip_test(self, name: str) -> None:
         """Records a test as skipped."""
         self.executed_tests.append((name, Test.Status.SKIP))
         self.skipped_tests += 1
-        print(f'{colors().yellow("SKIP:")} {name}')
+        print_message(f'{_COLOR.yellow("SKIP:")} {name}')
 
     def set_led(self, color: Color) -> Status:
         """Sets the device's RGB LED to the specified color."""
@@ -218,7 +249,7 @@
 
     def _test_button_press(self, button: Button) -> None:
         test_name = button.name.lower().replace(' ', '_')
-        self.prompt(f'Press {button.name}')
+        prompt_user_action(f'Press {button.name}')
 
         try:
             # Wait for a button press event followed by a release event.
@@ -255,10 +286,10 @@
         status = self.unset_led()
         assert status is Status.OK
 
-        self._test_led(Color(255, 255, 255, 'white', colors().bold_white))
-        self._test_led(Color(255, 0, 0, 'red', colors().red))
-        self._test_led(Color(0, 255, 0, 'green', colors().green))
-        self._test_led(Color(0, 0, 255, 'blue', colors().blue))
+        self._test_led(Color(255, 255, 255, 'white', _COLOR.bold_white))
+        self._test_led(Color(255, 0, 0, 'red', _COLOR.red))
+        self._test_led(Color(0, 255, 0, 'green', _COLOR.green))
+        self._test_led(Color(0, 0, 255, 'blue', _COLOR.blue))
         self._test_led(Color(0, 0, 0, 'off'))
 
         status = self.unset_led()
@@ -271,7 +302,7 @@
 
         self.set_led(color)
 
-        if self.prompt_yn(f'Is the Enviro+ LED {color.formatted_name}?'):
+        if prompt_yn(f'Is the Enviro+ LED {color.formatted_name}?'):
             self.pass_test(test_name)
         else:
             self.fail_test(test_name)
@@ -302,13 +333,15 @@
             f'max: {self.max_value}, mean: {self.mean_value}'
         )
 
-
-    def print_formatted(self, indent: int = 0, units: str | None = None) -> None:
+    def print_formatted(
+        self, indent: int = 0, units: str | None = None
+    ) -> None:
         suffix = units if units is not None else ''
-        print(f'{" " * indent}Samples   {self.count}')
-        print(f'{" " * indent}Min       {self.min_value:.2f}{suffix}')
-        print(f'{" " * indent}Max       {self.max_value:.2f}{suffix}')
-        print(f'{" " * indent}Mean      {self.max_value:.2f}{suffix}')
+        print_message(f'{" " * indent}Samples   {self.count}')
+        print_message(f'{" " * indent}Min       {self.min_value:.2f}{suffix}')
+        print_message(f'{" " * indent}Max       {self.max_value:.2f}{suffix}')
+        print_message(f'{" " * indent}Mean      {self.max_value:.2f}{suffix}')
+
 
 def _sample_until(
     max_samples: int,
@@ -332,8 +365,22 @@
     samples = Samples()
     recent_samples = [None for _ in range(5)]
 
+    def _get_title(message, recent_samples) -> str:
+        """Dynamic progress bar title with the last 5 sample readings."""
+        text = 'Reading'
+        if message:
+            text = message
+        if any(recent_samples):
+            text += ' - '
+            text += ', '.join(
+                f'{i:.2f}' for i in recent_samples if i is not None
+            )
+        return text
+
     with patch_stdout():
-        with ProgressBar(title=message) as pb:
+        with ProgressBar(
+            title=functools.partial(_get_title, message, recent_samples)
+        ) as pb:
             for i in pb(range(max_samples)):
                 status, response = sample_rpc_method()
                 if status is not Status.OK:
@@ -369,7 +416,9 @@
         if status is not Status.OK:
             return False
 
-        print(colors().bold_white('\nSetting LTR559 sensor to proximity mode.'))
+        print_message(
+            _COLOR.bold_white('\nSetting LTR559 sensor to proximity mode.')
+        )
         result = self._test_prox()
 
         status, _ = self._factory_service.EndTest(
@@ -387,7 +436,9 @@
         if status is not Status.OK:
             return False
 
-        print(colors().bold_white('\nSetting LTR559 sensor to ambient mode.'))
+        print_message(
+            _COLOR.bold_white('\nSetting LTR559 sensor to ambient mode.')
+        )
         result = self._test_light()
 
         status, _ = self._factory_service.EndTest(
@@ -399,7 +450,7 @@
         return result
 
     def _test_prox(self) -> bool:
-        self.prompt_enter('Place your Enviro+ pack in a well-lit area')
+        prompt_enter('Place your Enviro+ pack in a well-lit area')
 
         with patch_stdout():
             with ProgressBar(title='Getting initial sensor readings') as pb:
@@ -411,11 +462,11 @@
 
                     baseline_samples.update(response.value)
 
-        print(colors().green(' DONE'))
+        print_message(_COLOR.green(' DONE'))
         baseline_samples.print_formatted(indent=4)
 
-        print()
-        self.prompt_enter('Fully cover the LIGHT sensor')
+        print_message()
+        prompt_enter('Fully cover the LIGHT sensor')
 
         success, samples = _sample_until(
             50,
@@ -429,8 +480,8 @@
             return False
 
         self.pass_test('ltr559_prox_near')
-        print()
-        self.prompt_enter('Fully uncover the LIGHT sensor')
+        print_message()
+        prompt_enter('Fully uncover the LIGHT sensor')
 
         success, samples = _sample_until(
             50,
@@ -446,9 +497,7 @@
         return True
 
     def _test_light(self) -> bool:
-        self.prompt_enter(
-            'Place your Enviro+ pack in an area with neutral light'
-        )
+        prompt_enter('Place your Enviro+ pack in an area with neutral light')
         with patch_stdout():
             with ProgressBar(title='Getting initial sensor readings') as pb:
                 baseline_samples = Samples()
@@ -460,11 +509,11 @@
 
                     baseline_samples.update(response.lux)
 
-        print(colors().green(' DONE'))
+        print_message(_COLOR.green(' DONE'))
         baseline_samples.print_formatted(indent=4, units='lux')
 
-        print()
-        self.prompt_enter('Cover the LIGHT sensor with your finger')
+        print_message()
+        prompt_enter('Cover the LIGHT sensor with your finger')
 
         success, samples = _sample_until(
             100,
@@ -479,8 +528,8 @@
 
         self.pass_test('ltr559_light_dark')
 
-        print()
-        self.prompt_enter('Shine a light directly at the LIGHT sensor')
+        print_message()
+        prompt_enter('Shine a light directly at the LIGHT sensor')
 
         success, samples = _sample_until(
             100,
@@ -496,8 +545,8 @@
 
         self.pass_test('ltr559_light_bright')
 
-        print()
-        self.prompt_enter('Return the LIGHT sensor to its original position')
+        print_message()
+        prompt_enter('Return the LIGHT sensor to its original position')
 
         success, samples = _sample_until(
             100,
@@ -545,16 +594,14 @@
         return gas_result and temp_result
 
     def _test_gas_sensor(self) -> bool:
-        print(
-            colors().bold_white(
-                '\nTesting gas resistance in the BME688 sensor.'
-            )
+        print_message(
+            _COLOR.bold_white('\nTesting gas resistance in the BME688 sensor.')
         )
-        print(
+        print_message(
             "To test the BME688's gas sensor, you need an alcohol-based "
             '\nsolution. E.g. dip a cotton swab in rubbing alcohol.'
         )
-        if not self.prompt_yn('Are you able to continue this test?'):
+        if not prompt_yn('Are you able to continue this test?'):
             self.skip_test('bme688_gas_resistance')
             return True
 
@@ -569,10 +616,10 @@
             delay=0.25,
             message='Getting initial sensor readings',
         )
-        print(colors().green(' DONE'))
+        print_message(_COLOR.green(' DONE'))
         baseline_samples.print_formatted(indent=4)
 
-        self.prompt_enter(
+        prompt_enter(
             'Move the alcohol close to the BME688 sensor.',
             key_prompt='Press Enter to begin measuring...',
         )
@@ -592,7 +639,7 @@
 
         self.pass_test('bme688_gas_resistance_poor')
 
-        self.prompt_enter('Move the alcohol away from the BME688 sensor')
+        prompt_enter('Move the alcohol away from the BME688 sensor')
         success, samples = _sample_until(
             50,
             self._air_sensor_service.Measure,
@@ -611,7 +658,9 @@
         return True
 
     def _test_temperature(self) -> bool:
-        print(colors().bold_white("\nTesting BME688's temperature sensor."))
+        print_message(
+            _COLOR.bold_white("\nTesting BME688's temperature sensor.")
+        )
 
         def log_received(_):
             return False
@@ -624,10 +673,10 @@
             delay=0.25,
             message='Getting initial sensor readings',
         )
-        print(colors().green(' DONE'))
+        print_message(_COLOR.green(' DONE'))
         baseline_samples.print_formatted(indent=4, units='C')
 
-        self.prompt_enter(
+        prompt_enter(
             'Put your finger on the BME688 sensor to increase its temperature',
             key_prompt='Press Enter to begin measuring...',
         )
@@ -648,7 +697,7 @@
 
         self.pass_test('bme688_temperature_hot')
 
-        self.prompt_enter(
+        prompt_enter(
             'Remove your finger from the BME688 sensor',
             key_prompt='Press Enter to begin measuring...',
         )
@@ -677,16 +726,20 @@
     time: datetime
 
     def print_formatted(self):
-        print(f'Operator: {colors().bold_white(self.operator)}')
-        print(f'Date: {colors().bold_white(self.time.strftime("%Y/%m/%d %H:%M:%S"))}')
-        print(f'Device flash ID: {colors().bold_white(f"{self.device_id:x}")}')
+        print_message(f'Operator: {_COLOR.bold_white(self.operator)}')
+        print_message(
+            f'Date: {_COLOR.bold_white(self.time.strftime("%Y/%m/%d %H:%M:%S"))}'
+        )
+        print_message(
+            f'Device flash ID: {_COLOR.bold_white(f"{self.device_id:x}")}'
+        )
 
 
 def _run_tests(run_metadata: FactoryRunMetadata, rpcs) -> bool:
-    print()
-    print('===========================')
-    print('Pigweed Sense Factory Tests')
-    print('===========================')
+    print_message()
+    print_message('===========================')
+    print_message('Pigweed Sense Factory Tests')
+    print_message('===========================')
     run_metadata.print_formatted()
 
     tests_to_run = [
@@ -696,22 +749,22 @@
         Bme688Test(rpcs),
     ]
 
-    print()
-    print(f'{len(tests_to_run)} tests will be performed:')
+    print_message()
+    print_message(f'{len(tests_to_run)} tests will be performed:')
     for test in tests_to_run:
-        print(f'  - {test.name}')
+        print_message(f'  - {test.name}')
 
-    input('\nPress Enter when you are ready to begin. ')
-    print(colors().green('\nStarting hardware tests.'))
+    prompt_enter('Press Enter when you are ready to begin ', key_prompt='')
+    print_message(_COLOR.green('Starting hardware tests.'))
 
     success = True
 
     for num, test in enumerate(tests_to_run):
         start_msg = f'[{num + 1}/{len(tests_to_run)}] Running test {test.name}'
-        print()
-        print('=' * len(start_msg))
-        print(start_msg)
-        print('=' * len(start_msg))
+        print_message()
+        print_message('=' * len(start_msg))
+        print_message(start_msg)
+        print_message('=' * len(start_msg))
 
         if not test.run():
             success = False
@@ -728,10 +781,10 @@
     failed = 0
     skipped = 0
 
-    print()
-    print('============')
-    print('Test Summary')
-    print('============')
+    print_message()
+    print_message('============')
+    print_message('Test Summary')
+    print_message('============')
 
     run_metadata.print_formatted()
 
@@ -740,21 +793,22 @@
         failed += test_suite.failed_tests
         skipped += test_suite.skipped_tests
 
-        print()
-        print(colors().bold_white(test_suite.name))
+        print_message()
+        print_message(_COLOR.bold_white(test_suite.name))
         for name, status in test_suite.executed_tests:
             if status is Test.Status.PASS:
-                print(f'  {colors().green("PASS")} | {name}')
+                print_message(f'  {_COLOR.green("PASS")} | {name}')
             elif status is Test.Status.FAIL:
-                print(f'  {colors().red("FAIL")} | {name}')
+                print_message(f'  {_COLOR.red("FAIL")} | {name}')
             elif status is Test.Status.SKIP:
-                print(f'  {colors().yellow("SKIP")} | {name}')
+                print_message(f'  {_COLOR.yellow("SKIP")} | {name}')
 
-    print(f'\n{passed} tests passed, {failed} tests failed', end='')
+    test_result_message = f'\n{passed} tests passed, {failed} tests failed'
     if skipped > 0:
-        print(f', {skipped} tests skipped', end='')
-    print('.')
-    print('=' * 40)
+        test_result_message += f', {skipped} tests skipped'
+    test_result_message += '.'
+    print_message(test_result_message)
+    print_message('=' * 40)
 
 
 def _parse_args() -> argparse.Namespace:
@@ -772,12 +826,30 @@
     now = datetime.now()
     if log_file is None:
         run_time = now.strftime('%Y%m%d%H%M%S')
-        log_file = Path(f'factory-logs-{run_time}.txt')
+        log_directory = Path(os.environ.get('BUILD_WORKSPACE_DIRECTORY', '.'))
+        device_log_file = log_directory / f'factory-logs-{run_time}-device.txt'
+        factory_log_file = (
+            log_directory / f'factory-logs-{run_time}-operator.txt'
+        )
+    else:
+        device_log_file = Path(f'{log_file.stem}-device{log_file.suffix}')
+        factory_log_file = Path(f'{log_file.stem}-operator{log_file.suffix}')
+
+    _DEVICE_LOG.propagate = False
+    pw_cli.log.install(
+        level=logging.DEBUG,
+        use_color=False,
+        log_file=device_log_file,
+        logger=_DEVICE_LOG,
+    )
 
     pw_cli.log.install(
         level=logging.DEBUG,
         use_color=False,
-        log_file=log_file,
+        hide_timestamp=False,
+        message_format='%(message)s',
+        log_file=factory_log_file,
+        logger=_FACTORY_LOG,
     )
 
     device_connection = get_device_connection(setup_logging=False)
@@ -786,32 +858,41 @@
 
     # Open the connection to the device.
     with device_connection as device:
-        print('Connected to attached device')
+        print_message('Connected to attached device')
+
+        print_message(f'Device logs: {device_log_file.resolve()}')
+        print_message(f'Factory logs: {factory_log_file.resolve()}')
 
         try:
             _, device_info = device.rpcs.factory.Factory.GetDeviceInfo()
         except RpcError as e:
             if e.status is Status.NOT_FOUND:
-                print(
-                    colors().red(
+                print_message(
+                    _COLOR.red(
                         'No factory service exists on the connected device.'
                     ),
                     file=sys.stderr,
                 )
-                print('', file=sys.stderr)
-                print(
+                print_message('', file=sys.stderr)
+                print_message(
                     'Make sure that your Pico device is running an up-to-date factory app:',
                     file=sys.stderr,
                 )
-                print('', file=sys.stderr)
-                print('  bazelisk run //apps/factory:flash', file=sys.stderr)
-                print('', file=sys.stderr)
+                print_message('', file=sys.stderr)
+                print_message(
+                    '  bazelisk run //apps/factory:flash', file=sys.stderr
+                )
+                print_message('', file=sys.stderr)
             else:
-                print(f'Failed to query device info: {e}', file=sys.stderr)
+                print_message(
+                    f'Failed to query device info: {e}', file=sys.stderr
+                )
             return 1
         except RpcTimeout as e:
-            print(
-                f'Timed out while trying to query device info: {e}',
+            print_message(
+                f'Timed out while trying to query device info: {e}\n\n'
+                'Please re-flash your device and try again:\n'
+                '  bazelisk run //apps/factory:flash',
                 file=sys.stderr,
             )
             return 1
@@ -825,9 +906,10 @@
         except KeyboardInterrupt:
             # Turn off the LED if it was on when tests were interrupted.
             device.rpcs.blinky.Blinky.SetRgb(hex=0, brightness=0)
-            print('\nCtrl-C detected, exiting.')
+            print_message('\nCtrl-C detected, exiting.')
 
-    print(f'Device logs written to {log_file.resolve()}')
+    print_message(f'Device logs written to {device_log_file.resolve()}')
+    print_message(f'Factory logs written to {factory_log_file.resolve()}')
     return exit_code