# blob: 4e8205e39f77832d956c1efec91558500e34d61d [file] [log] [blame]
# Copyright (c) 2009-2021 Arm Limited
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import re
import threading
from .device import Device
# Module-level logger, named after this module, used by the serial runner below
log = logging.getLogger(__name__)
# ANSI CSI escape sequence: ESC '[' <digits and semicolons> <single letter>
_ANSI_ESCAPE_PATTERN = r'\033\[((?:\d|;)*)([a-zA-Z])'
# Compiled once at import time; one variant per supported input type
_ANSI_ESCAPE_BYTES_RE = re.compile(_ANSI_ESCAPE_PATTERN.encode())
# re.ASCII keeps \d limited to 0-9, matching the bytes variant
_ANSI_ESCAPE_STR_RE = re.compile(_ANSI_ESCAPE_PATTERN, re.ASCII)


def strip_escape(string_to_escape):
    """
    Strip ANSI escape sequences (e.g. colour codes) from a line.

    :param string_to_escape: bytes (as read from the serial port) or str to work on
    :return: the input with every escape sequence removed, same kind (bytes/str) as the input
    """
    if isinstance(string_to_escape, str):
        return _ANSI_ESCAPE_STR_RE.sub('', string_to_escape)
    # Single-pass removal of all non-overlapping matches
    return _ANSI_ESCAPE_BYTES_RE.sub(b'', string_to_escape)
class SerialDevice(Device):
    """
    Device runner that exchanges text lines with a target over a serial connection.

    One thread reads lines from the serial connection into ``self.iq`` and another
    writes lines taken from ``self.oq`` to the serial connection. ``self.name``,
    ``self.iq`` and ``self.oq`` are not created here; presumably they are provided
    by the ``Device`` base class.
    """

    def __init__(self, serial_connection, name=None):
        """
        Serial Device runner class containing device handling functions and logging, inherits the Device runner class
        :param serial_connection: Serial connection object
        :param name: Logging name for the client
        """
        self.serial = serial_connection
        self.run = False
        super().__init__(name)
        input_thread_name = '<-- {}'.format(self.name)
        output_thread_name = '--> {}'.format(self.name)
        # Threads are only created here; start() launches them
        self.it = threading.Thread(
            target=self._input_thread, name=input_thread_name)
        self.ot = threading.Thread(
            target=self._output_thread, name=output_thread_name)

    def reset(self, duration=0.25):
        """
        Sends break to serial connection
        :param duration: Break duration
        """
        self.serial.send_break(duration)

    def start(self):
        """
        Start the processing of the serial
        """
        log.info('Starting "%s" runner...', self.name)
        # Must be set before the threads start: both loops run while this flag is true
        self.run = True
        self.it.start()
        self.ot.start()
        log.info('"%s" runner started', self.name)

    def stop(self):
        """
        Stop the processing of the serial

        Safe to call even if start() was never called: threads that are not
        running are not joined.
        """
        log.info('Stopping "%s" runner...', self.name)
        self.run = False
        # Wake the output thread, which blocks in oq.get(), so it can see the cleared flag
        self.oq.put(None)
        for worker in (self.it, self.ot):
            # Joining a never-started thread raises RuntimeError, so only join live threads
            if worker.is_alive():
                worker.join()
        log.info('"%s" runner stoped', self.name)

    def _input_thread(self):
        """
        Read lines from the serial connection, clean them up, log them and queue them on ``self.iq``.

        Runs until ``self.run`` becomes false.
        """
        while self.run:
            line = self.serial.readline()
            if not line:
                # Nothing was read; check the run flag and try again
                continue
            plain_line = strip_escape(line)
            # Testapp uses \r to print characters to the same line, strip those and return only the last part
            # If there is only one \r, don't remove anything.
            if line.count(b'\r') > 1:
                plain_line = plain_line.split(b'\r')[-2]
            # Debug traces use tabulator characters, change those to spaces for readability
            plain_line = plain_line.replace(b'\t', b' ')
            plain_line = plain_line.decode('utf-8', 'ignore')
            log.info('<--|%s| %s', self.name, plain_line.strip())
            # NOTE: only the log message is stripped; the queued line keeps its surrounding whitespace
            self.iq.put(plain_line)

    def _output_thread(self):
        """
        Take lines from ``self.oq`` and write them, newline-terminated and UTF-8 encoded, to the serial connection.

        Runs until ``self.run`` becomes false.
        """
        while self.run:
            line = self.oq.get()
            if line:
                log.info('-->|%s| %s', self.name, line.strip())
                data = line + '\n'
                self.serial.write(data.encode('utf-8'))
            else:
                # Falsy items (the None queued by stop(), or an empty string) are not written
                log.debug('Nothing sent')