blob: 2398cd8728e3f1e0ed8cdeca93e4fcb6eed6c861 [file] [log] [blame]
# Copyright (c) 2023 Nordic Semiconductor ASA
#
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations
import io
import logging
import os
import threading
import time
from pathlib import Path
logger = logging.getLogger(__name__)
class FifoHandler:
"""
Class dedicated for handling communication over POSIX system FIFO (named
pipes).
"""
def __init__(self, fifo_path: str | Path, timeout: float):
"""
:param fifo_path: path to basic fifo name
:param timeout: timeout for establishing connection over FIFO
"""
self._fifo_out_path = str(fifo_path) + '.out'
self._fifo_in_path = str(fifo_path) + '.in'
self._fifo_out_file: io.FileIO | None = None
self._fifo_in_file: io.FileIO | None = None
self._open_fifo_thread: threading.Thread | None = None
self._opening_monitor_thread: threading.Thread | None = None
self._fifo_opened: threading.Event = threading.Event()
self._stop_waiting_for_opening: threading.Event = threading.Event()
self._timeout = timeout
def initiate_connection(self) -> None:
"""
Opening FIFO could be a blocking operation (it requires also opening
FIFO on the other side - by separate program/process). So, to avoid
blockage, execute opening FIFO in separate thread and additionally run
in second thread opening time monitor to alternatively unblock first
thread when timeout will expire.
"""
self._stop_waiting_for_opening.clear()
self._make_fifo_file(self._fifo_out_path)
self._make_fifo_file(self._fifo_in_path)
if self._open_fifo_thread is None:
self._open_fifo_thread = threading.Thread(target=self._open_fifo, daemon=True)
self._open_fifo_thread.start()
if self._opening_monitor_thread is None:
self._opening_monitor_thread = threading.Thread(target=self._opening_monitor, daemon=True)
self._opening_monitor_thread.start()
@staticmethod
def _make_fifo_file(filename: str) -> None:
if not os.path.exists(filename):
os.mkfifo(filename)
def _open_fifo(self) -> None:
self._fifo_out_file = open(self._fifo_out_path, 'rb', buffering=0)
self._fifo_in_file = open(self._fifo_in_path, 'wb', buffering=0)
if not self._stop_waiting_for_opening.is_set():
self._fifo_opened.set()
def _opening_monitor(self) -> None:
"""
Monitor opening FIFO operation - if timeout was expired (or disconnect
was called in the meantime), then interrupt opening FIFO in other
thread.
"""
timeout_time: float = time.time() + self._timeout
while time.time() < timeout_time and not self._stop_waiting_for_opening.is_set():
if self._fifo_opened.is_set():
return
time.sleep(0.1)
self._stop_waiting_for_opening.set()
self._unblock_open_fifo_operation()
def _unblock_open_fifo_operation(self) -> None:
"""
This is workaround for unblocking opening FIFO operation - imitate
opening FIFO "on the other side".
"""
if os.path.exists(self._fifo_out_path):
open(self._fifo_out_path, 'wb', buffering=0)
if os.path.exists(self._fifo_in_path):
open(self._fifo_in_path, 'rb', buffering=0)
def disconnect(self) -> None:
self._stop_waiting_for_opening.set()
if self._open_fifo_thread and self._open_fifo_thread.is_alive():
self._open_fifo_thread.join(timeout=1)
self._open_fifo_thread = None
if self._opening_monitor_thread and self._opening_monitor_thread.is_alive():
self._opening_monitor_thread.join(timeout=1)
self._opening_monitor_thread = None
self._fifo_opened.clear()
if self._fifo_out_file:
self._fifo_out_file.close()
if self._fifo_in_file:
self._fifo_in_file.close()
if os.path.exists(self._fifo_out_path):
os.unlink(self._fifo_out_path)
if os.path.exists(self._fifo_in_path):
os.unlink(self._fifo_in_path)
@property
def is_open(self) -> bool:
try:
return bool(
self._fifo_opened.is_set()
and self._fifo_in_file is not None and self._fifo_out_file is not None
and self._fifo_in_file.fileno() and self._fifo_out_file.fileno()
)
except ValueError:
return False
def read(self, __size: int = -1) -> bytes:
return self._fifo_out_file.read(__size) # type: ignore[union-attr]
def readline(self, __size: int | None = None) -> bytes:
return self._fifo_out_file.readline(__size) # type: ignore[union-attr]
def write(self, __buffer: bytes) -> int:
return self._fifo_in_file.write(__buffer) # type: ignore[union-attr]
def flush_write(self) -> None:
if self._fifo_in_file:
self._fifo_in_file.flush()
def flush_read(self) -> None:
if self._fifo_out_file:
self._fifo_out_file.flush()