blob: cc1c42c3d5c1d9715d9baeb118c04b8823d3550f [file] [log] [blame]
# Copyright (c) 2009-2021 Arm Limited
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import re
from telnetlib import Telnet
from time import sleep
log = logging.getLogger(__name__)
class TelnetConnection:
"""
Telnet Connection class containing telnet connection handling functions
:param host: host name
:param port: prot number
"""
def __init__(self, host=None, port=0):
self.telnet = Telnet()
self.host = host
self.port = port
self.is_open = False
self.output_line = bytearray()
def open(self):
"""
Open telnet connection
"""
try:
self.telnet.open(self.host, self.port)
except Exception as e:
log.error('Open telnet connection to {}:{} failed {}'.format(self.host, self.port, e))
return None
self.is_open = True
def __strip_escape(self, string_to_escape) -> str:
"""
Strip escape characters from string.
:param string_to_escape: string to work on
:return: stripped string
"""
raw_ansi_pattern = r'\033\[((?:\d|;)*)([a-zA-Z])'
ansi_pattern = raw_ansi_pattern.encode()
ansi_eng = re.compile(ansi_pattern)
matches = []
for match in ansi_eng.finditer(string_to_escape):
matches.append(match)
matches.reverse()
for match in matches:
start = match.start()
end = match.end()
string_to_escape = string_to_escape[0:start] + string_to_escape[end:]
return string_to_escape
def __formatline(self, line) -> str:
output_line = self.__strip_escape(line)
# Testapp uses \r to print characters to the same line, strip those and return only the last part
# If there is only one \r, don't remove anything.
if b'\r' in line and line.count(b'\r') > 1:
output_line = output_line.split(b'\r')[-2]
# Debug traces use tabulator characters, change those to spaces for readability
output_line = output_line.replace(b'\t', b' ')
output_line = output_line.decode('utf-8', 'ignore')
output_line.rstrip()
return output_line
def readline(self):
"""
Read line from telnet session
:return: One line from telnet stream
"""
if not self.is_open:
return None
try:
self.output_line.extend(self.telnet.read_until(b"\n", 1))
if b"\n" in self.output_line:
output = self.__formatline(self.output_line)
self.output_line.clear()
return output
except Exception as e:
log.error('Telnet read failed {}'.format(e))
return None
return None
def write(self, data):
"""
Write data to telnet input
:param data: Data to send [bytes array]
"""
if not self.is_open:
return None
try:
data = data + '\n'
for item in data:
self.telnet.write(item.encode('utf-8'))
sleep(0.03)
except Exception as e:
log.error('Telnet write failed {}'.format(e))
return None
def close(self):
"""
Close telnet connection
"""
self.telnet.close()
self.is_open = False
def set_port(self, port):
"""
set port number of telnet connection
"""
self.port = port
def get_port(self):
"""
Get port number of telnet connection
:return: Port Nnumber
"""
return self.port