#!/usr/bin/env python
# Copyright 2020 The Pigweed Authors
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Simple example script that uses pw_rpc."""
import argparse
import logging
import os
from pathlib import Path
import sys
from typing import Any, Callable, List, Tuple
import serial # type: ignore
import pw_cli.log
from pw_hdlc.rpc import HdlcRpcClient, default_channels
from pw_tokenizer.detokenize import AutoUpdatingDetokenizer, detokenize_base64
# Point the script to the .proto file with our RPC services.
Path(os.environ['PW_ROOT'], 'pw_rpc', 'echo.proto'),
Path(os.environ['PW_PROJECT_ROOT'], 'workshop', '03-rpc',
'remoticon_proto', 'remoticon.proto'),
_LOG = logging.getLogger('factory_test_example')
class TestFailure(Exception):
"""Indicates that a test failed."""
TestFunction = Callable[['TestContext'], Any]
FailedTests = List[Tuple[TestFunction, TestFailure]]
# Set up a detokenizer so logs can be printed in plain text.
_detokenizer = AutoUpdatingDetokenizer(
Path(__file__).parent / 'tokenizer_database.csv')
def _detokenize_and_print(data: bytes):
print(detokenize_base64(_detokenizer, data).decode(errors='replace'))
class TestContext:
"""Used to track the state of ongoing tests."""
def __init__(self, client: HdlcRpcClient):
self.client = client
self.passed: List[TestFunction] = []
self.failed: FailedTests = []
self._current_test: str = ''
def rpcs(self):
"""Returns the RPC accessor object for the device."""
return self.client.rpcs()
def run(self, test: TestFunction) -> None:
"""Runs a function as a factory test."""
try:'%s: Running test', test.__name__)
self.passed.append(test)'%s: PASSED ', test.__name__)
except TestFailure as failure:
self.failed.append((test, failure))'%s: FAILED - %s', test.__name__, failure)
def log_results(self):
"""Write results to the log."""
total = len(self.passed) + len(self.failed)'%d/%d tests passed', len(self.passed), total)
if self.failed:
_LOG.error('%d/%d tests FAILED', len(self.failed), total)
for test, failure in self.failed:
_LOG.warning(' %s: %s', test.__name__, failure)
def expect_ok(result: tuple):
"""Helper to check the result of a unary RPC."""
status, value = result
if status.ok():
return value
raise TestFailure(str(status))
def test_echo(ctx: TestContext) -> None:
"""Tests that the device responds to RPCs as expected."""
for test_str in ['', 'abc', 'This is a test']:
reply = expect_ok(
if reply.msg != test_str:
raise TestFailure(f'Expected {test_str!r}, got {reply.msg!r}')
def test_counter(ctx: TestContext) -> None:
"""Tests that the super loop is iterating."""
stats_1 = expect_ok(ctx.rpcs.remoticon.Superloop.GetStats())
stats_2 = expect_ok(ctx.rpcs.remoticon.Superloop.GetStats())
if stats_1.loop_iterations >= stats_2.loop_iterations:
raise TestFailure(
'Super loop counter is not incrementing! '
f'{stats_1.loop_iterations} >= {stats_2.loop_iterations}')
def test_led(ctx: TestContext) -> None:
"""Tests that the LED functions correctly."""
result = input('Did the LED turn on? (Y/N) ').lower()
_ = ctx.rpcs # WORKSHOP TASK: call the RPC to turn on the LED
if result not in ('y', 'yes', 'yup'):
raise TestFailure('LED did not turn on')
result = input('Did the LED turn off? (Y/N) ').lower()
_ = ctx.rpcs # WORKSHOP TASK: call the RPC to turn off the LED
if result not in ('y', 'yes', 'yup'):
raise TestFailure('LED did not turn off')
def run_tests(device: str, baud: int) -> FailedTests:
"""Runs the factory tests."""
# Set up a pw_rpc client that uses HDLC.
ser = serial.Serial(device, baud, timeout=0.01)
hdlc_client = HdlcRpcClient(lambda:,
output=_detokenize_and_print)'Starting factory tests!')
# Create a TestContext and run the tests with it.
tester = TestContext(hdlc_client)
# WORKSHOP TASK: Add a new test.'Factory tests completed.')
return tester.failed
def main() -> int:
"""Parses the command line args and runs the tests."""
parser = argparse.ArgumentParser(
help='serial device to use')
help='baud rate for the serial device')
return 1 if run_tests(**vars(parser.parse_args())) else 0
if __name__ == '__main__':