pw_rpc: console_tools module and Watchdog class
Create a simple watchdog class that could be used to track connection
state in a pw_rpc console.
Change-Id: I943b514d7994e78ef52898e639c691e5b6c2c046
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/33100
Reviewed-by: Keir Mierle <keir@google.com>
Reviewed-by: Jennifer Silva <jennifersilva@google.com>
Reviewed-by: Alexei Frolov <frolv@google.com>
Commit-Queue: Wyatt Hepler <hepler@google.com>
diff --git a/pw_rpc/py/BUILD.gn b/pw_rpc/py/BUILD.gn
index 1a2d1e9..5727946 100644
--- a/pw_rpc/py/BUILD.gn
+++ b/pw_rpc/py/BUILD.gn
@@ -25,6 +25,7 @@
"pw_rpc/codegen.py",
"pw_rpc/codegen_nanopb.py",
"pw_rpc/codegen_raw.py",
+ "pw_rpc/console_tools.py",
"pw_rpc/descriptors.py",
"pw_rpc/ids.py",
"pw_rpc/packets.py",
@@ -35,6 +36,7 @@
tests = [
"callback_client_test.py",
"client_test.py",
+ "console_tools_test.py",
"descriptors_test.py",
"ids_test.py",
"packets_test.py",
diff --git a/pw_rpc/py/console_tools_test.py b/pw_rpc/py/console_tools_test.py
new file mode 100644
index 0000000..5da2aba
--- /dev/null
+++ b/pw_rpc/py/console_tools_test.py
@@ -0,0 +1,79 @@
+#!/usr/bin/env python3
+# Copyright 2021 The Pigweed Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+"""Tests encoding HDLC frames."""
+
+import unittest
+from unittest import mock
+
+from pw_rpc.console_tools import Watchdog
+
+
+class TestWatchdog(unittest.TestCase):
+ """Tests the Watchdog class."""
+ def setUp(self):
+ self._reset = mock.Mock()
+ self._expiration = mock.Mock()
+ self._while_expired = mock.Mock()
+
+ self._watchdog = Watchdog(self._reset, self._expiration,
+ self._while_expired, 99999)
+
+ def _trigger_timeout(self):
+ # Don't wait for the timeout -- that's too flaky. Call the internal
+ # timeout function instead.
+ self._watchdog._timeout_expired() # pylint: disable=protected-access
+
+ def test_expiration_callbacks(self):
+ self._watchdog.start()
+
+ self._expiration.not_called()
+
+ self._trigger_timeout()
+
+ self._expiration.assert_called_once_with()
+ self._while_expired.assert_not_called()
+
+ self._trigger_timeout()
+
+ self._expiration.assert_called_once_with()
+ self._while_expired.assert_called_once_with()
+
+ self._trigger_timeout()
+
+ self._expiration.assert_called_once_with()
+ self._while_expired.assert_called()
+
+ def test_reset_not_called_unless_expires(self):
+ self._watchdog.start()
+ self._watchdog.reset()
+
+ self._reset.assert_not_called()
+ self._expiration.assert_not_called()
+ self._while_expired.assert_not_called()
+
+ def test_reset_called_if_expired(self):
+ self._watchdog.start()
+ self._trigger_timeout()
+
+ self._watchdog.reset()
+
+ self._trigger_timeout()
+
+ self._reset.assert_called_once_with()
+ self._expiration.assert_called()
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/pw_rpc/py/pw_rpc/console_tools.py b/pw_rpc/py/pw_rpc/console_tools.py
new file mode 100644
index 0000000..cb43a6e
--- /dev/null
+++ b/pw_rpc/py/pw_rpc/console_tools.py
@@ -0,0 +1,83 @@
+# Copyright 2021 The Pigweed Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+"""Utilities for building tools that interact with pw_rpc."""
+
+import threading
+from typing import Any, Callable
+
+
+class Watchdog:
+ """Simple class that times out unless reset.
+
+ This class could be used, for example, to track a device's connection state
+ for devices that send a periodic heartbeat packet.
+ """
+ def __init__(self,
+ on_reset: Callable[[], Any],
+ on_expiration: Callable[[], Any],
+ while_expired: Callable[[], Any] = lambda: None,
+ timeout_s: float = 1,
+ expired_timeout_s: float = None):
+ """Creates a watchdog; start() must be called to start it.
+
+ Args:
+ on_reset: Function called when the watchdog is reset after having
+ expired.
+ on_expiration: Function called when the timeout expires.
+ while_expired: Function called repeatedly while the watchdog is
+ expired.
+ timeout_s: If reset() is not called for timeout_s, the watchdog
+ expires and calls the on_expiration callback.
+ expired_timeout_s: While expired, the watchdog calls the
+ while_expired callback every expired_timeout_s.
+ """
+ self._on_reset = on_reset
+ self._on_expiration = on_expiration
+ self._while_expired = while_expired
+
+ self.timeout_s = timeout_s
+
+ if expired_timeout_s is None:
+ self.expired_timeout_s = self.timeout_s * 10
+ else:
+ self.expired_timeout_s = expired_timeout_s
+
+ self.expired: bool = False
+ self._watchdog = threading.Timer(0, self._timeout_expired)
+
+ def start(self) -> None:
+ """Starts the watchdog; must be called for the watchdog to work."""
+ self._watchdog.cancel()
+ self._watchdog = threading.Timer(
+ self.expired_timeout_s if self.expired else self.timeout_s,
+ self._timeout_expired)
+ self._watchdog.daemon = True
+ self._watchdog.start()
+
+ def reset(self) -> None:
+ """Resets the timeout; calls the on_reset callback if expired."""
+ if self.expired:
+ self.expired = False
+ self._on_reset()
+
+ self.start()
+
+ def _timeout_expired(self) -> None:
+ if self.expired:
+ self._while_expired()
+ else:
+ self.expired = True
+ self._on_expiration()
+
+ self.start()