# blob: da84716a1deec0e02d066bddb0f36ec319df9946 [file] [log] [blame]
# Copyright (c) 2023 Nordic Semiconductor ASA
#
# SPDX-License-Identifier: Apache-2.0
import logging
import os
import sys
import threading
import time
from argparse import ArgumentParser
from zen_of_python import zen_of_python
class FifoFile:
    """File-like helper that opens ``filename`` from a background thread.

    Opening a FIFO can block until the other end of the pipe is opened
    (POSIX semantics), so the ``open`` call is performed off the calling
    thread. ``write`` and ``read`` wait for that background attempt to finish
    and are silent no-ops when the file could not be opened.

    Usable as a context manager: ``__enter__`` calls :meth:`open` and
    ``__exit__`` calls :meth:`close`.
    """

    def __init__(self, filename, mode):
        """Remember the path and mode; nothing is opened yet.

        :param filename: path of the file (expected to be created externally)
        :param mode: mode string handed to the builtin ``open`` (binary,
            because the file is opened with ``buffering=0``)
        """
        self.filename = filename
        self.mode = mode
        self.thread = None  # background opener, set by open()
        self.file = None  # unbuffered file object, set by _open() on success
        self.logger = logging.getLogger(__name__)

    def _open(self):
        """Wait up to ~2 seconds for the path to exist, then open it.

        Logs an error and leaves ``self.file`` as ``None`` when the path does
        not show up in time.
        """
        self.logger.info(f'Creating fifo file: {self.filename}')
        end_time = time.time() + 2
        while not os.path.exists(self.filename):
            time.sleep(0.1)
            if time.time() > end_time:
                self.logger.error(f'Did not able create fifo file: {self.filename}')
                return
        self.file = open(self.filename, self.mode, buffering=0)
        self.logger.info(f'File created: {self.filename}')

    def _wait_until_opened(self):
        """Block until the opener thread started by :meth:`open` has finished.

        Does nothing when :meth:`open` has not been called. The wait is
        unbounded, mirroring how long the underlying ``open`` call itself
        may block.
        """
        opener = self.thread
        if opener is not None and opener is not threading.current_thread():
            opener.join()

    def open(self):
        """Start opening the file in a daemon thread and return immediately."""
        # Pass the bound method itself; calling it here would run the whole
        # (potentially blocking) open in the caller and hand Thread a None target.
        opener = threading.Thread(target=self._open, daemon=True)
        opener.start()
        self.thread = opener

    def write(self, data):
        """Write ``data`` (bytes) to the file; no-op if it was never opened."""
        self._wait_until_opened()
        if self.file:
            self.file.write(data)

    def read(self):
        """Return one line from the file, or ``None`` if it was never opened."""
        self._wait_until_opened()
        if self.file:
            return self.file.readline()
        return None

    def close(self):
        """Give the opener up to one second to finish, then close the file.

        Safe to call even when :meth:`open` was never called.
        """
        # Join first so that a file opened at the last moment is still closed.
        if self.thread is not None:
            self.thread.join(1)
        if self.file:
            self.file.close()
        self.logger.info(f'Closed file: {self.filename}')

    def __enter__(self):
        self.open()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
def main():
    """Write every line of ``zen_of_python`` to ``<file>.out``.

    ``<file>`` is the single positional command-line argument. While the
    lines are written, ``<file>.in`` is held open for reading as well; nothing
    is read from it here.

    :return: always 0, used as the process exit code
    """
    logging.basicConfig(level='DEBUG')

    arg_parser = ArgumentParser()
    arg_parser.add_argument('file')
    base_path = arg_parser.parse_args().file

    read_path = base_path + '.in'
    write_path = base_path + '.out'

    log = logging.getLogger(__name__)
    log.info('Start')

    with FifoFile(write_path, 'wb') as wf:
        with FifoFile(read_path, 'rb'):
            for line in zen_of_python:
                payload = f'{line}\n'.encode('utf-8')
                wf.write(payload)
            # give a moment for external programs to collect all outputs
            time.sleep(1)

    return 0
if __name__ == '__main__':
    # Run the script and hand main()'s return value to the shell as exit status.
    exit_code = main()
    sys.exit(exit_code)