blob: c6bda2f0a4ea3611ac50501daf294c2d9d147fb1 [file] [log] [blame]
# Copyright (c) 2023 Nordic Semiconductor ASA
#
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations
import io
import logging
import os
import threading
from pathlib import Path
logger = logging.getLogger(__name__)
class FifoHandler:
"""Creates FIFO file for reading and writing."""
def __init__(self, fifo: str | Path):
"""
:param fifo: path to fifo file
"""
self._fifo_in = str(fifo) + '.in'
self._fifo_out = str(fifo) + '.out'
self.file_in: io.BytesIO | None = None
self.file_out: io.BytesIO | None = None
self._threads: list[threading.Thread] = []
@staticmethod
def _make_fifo_file(filename: str) -> None:
if os.path.exists(filename):
os.unlink(filename)
os.mkfifo(filename)
logger.debug('Created new fifo file: %s', filename)
@property
def is_open(self) -> bool:
try:
return bool(
self.file_in is not None and self.file_out is not None
and self.file_in.fileno() and self.file_out.fileno()
)
except ValueError:
return False
def connect(self):
self._make_fifo_file(self._fifo_in)
self._make_fifo_file(self._fifo_out)
self._threads = [
threading.Thread(target=self._open_fifo_in, daemon=True),
threading.Thread(target=self._open_fifo_out, daemon=True)
]
for t in self._threads:
t.start()
def _open_fifo_in(self):
self.file_in = open(self._fifo_in, 'wb', buffering=0)
def _open_fifo_out(self):
self.file_out = open(self._fifo_out, 'rb', buffering=0)
def disconnect(self):
if self.file_in is not None:
self.file_in.close()
if self.file_out is not None:
self.file_out.close()
for t in self._threads:
t.join(timeout=1)
logger.debug(f'Unlink {self._fifo_in}')
os.unlink(self._fifo_in)
logger.debug(f'Unlink {self._fifo_out}')
os.unlink(self._fifo_out)
def read(self, __size: int | None = None) -> bytes:
return self.file_out.read(__size) # type: ignore[union-attr]
def readline(self, __size: int | None = None) -> bytes:
line = self.file_out.readline(__size) # type: ignore[union-attr]
return line
def write(self, __buffer: bytes) -> int:
return self.file_in.write(__buffer) # type: ignore[union-attr]
def flush(self):
if self.file_in:
self.file_in.flush()
if self.file_out:
self.file_out.flush()
def fileno(self) -> int:
return self.file_out.fileno() # type: ignore[union-attr]