| # Copyright 2021 The Pigweed Authors |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| # use this file except in compliance with the License. You may obtain a copy of |
| # the License at |
| # |
| # https://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations under |
| # the License. |
| """Utilities for building tools that interact with pw_rpc.""" |
| |
| import threading |
| from typing import Any, Callable |
| |
| |
| class Watchdog: |
| """Simple class that times out unless reset. |
| |
| This class could be used, for example, to track a device's connection state |
| for devices that send a periodic heartbeat packet. |
| """ |
| def __init__(self, |
| on_reset: Callable[[], Any], |
| on_expiration: Callable[[], Any], |
| while_expired: Callable[[], Any] = lambda: None, |
| timeout_s: float = 1, |
| expired_timeout_s: float = None): |
| """Creates a watchdog; start() must be called to start it. |
| |
| Args: |
| on_reset: Function called when the watchdog is reset after having |
| expired. |
| on_expiration: Function called when the timeout expires. |
| while_expired: Function called repeatedly while the watchdog is |
| expired. |
| timeout_s: If reset() is not called for timeout_s, the watchdog |
| expires and calls the on_expiration callback. |
| expired_timeout_s: While expired, the watchdog calls the |
| while_expired callback every expired_timeout_s. |
| """ |
| self._on_reset = on_reset |
| self._on_expiration = on_expiration |
| self._while_expired = while_expired |
| |
| self.timeout_s = timeout_s |
| |
| if expired_timeout_s is None: |
| self.expired_timeout_s = self.timeout_s * 10 |
| else: |
| self.expired_timeout_s = expired_timeout_s |
| |
| self.expired: bool = False |
| self._watchdog = threading.Timer(0, self._timeout_expired) |
| |
| def start(self) -> None: |
| """Starts the watchdog; must be called for the watchdog to work.""" |
| self._watchdog.cancel() |
| self._watchdog = threading.Timer( |
| self.expired_timeout_s if self.expired else self.timeout_s, |
| self._timeout_expired) |
| self._watchdog.daemon = True |
| self._watchdog.start() |
| |
| def reset(self) -> None: |
| """Resets the timeout; calls the on_reset callback if expired.""" |
| if self.expired: |
| self.expired = False |
| self._on_reset() |
| |
| self.start() |
| |
| def _timeout_expired(self) -> None: |
| if self.expired: |
| self._while_expired() |
| else: |
| self.expired = True |
| self._on_expiration() |
| |
| self.start() |