blob: 204075e5fd98a7c7856bd7ef2299863355117d10 [file] [log] [blame]
# Copyright 2021 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Tests for pw_console.log_view"""
import logging
import time
import sys
import unittest
from datetime import datetime
from unittest.mock import MagicMock, patch
from parameterized import parameterized # type: ignore
from prompt_toolkit.data_structures import Point
from prompt_toolkit.formatted_text import FormattedText
from pw_console.log_view import LogView
_PYTHON_3_8 = sys.version_info >= (
3,
8,
)
def _create_log_view():
log_pane = MagicMock()
log_view = LogView(log_pane)
return log_view, log_pane
class TestLogView(unittest.TestCase):
"""Tests for LogView."""
def setUp(self):
self.maxDiff = None # pylint: disable=invalid-name
def _create_log_view_with_logs(self, log_count=100):
log_view, log_pane = _create_log_view()
if log_count > 0:
test_log = logging.getLogger('log_view.test')
with self.assertLogs(test_log, level='DEBUG') as _log_context:
test_log.addHandler(log_view.log_store)
for i in range(log_count):
test_log.debug('Test log %s', i)
return log_view, log_pane
def test_follow_toggle(self) -> None:
log_view, _pane = _create_log_view()
self.assertTrue(log_view.follow)
log_view.toggle_follow()
self.assertFalse(log_view.follow)
def test_follow_scrolls_to_bottom(self) -> None:
log_view, _pane = _create_log_view()
log_view.toggle_follow()
self.assertFalse(log_view.follow)
self.assertEqual(log_view.get_current_line(), 0)
test_log = logging.getLogger('log_view.test')
# Log 5 messagse, current_line should stay at 0
with self.assertLogs(test_log, level='DEBUG') as _log_context:
test_log.addHandler(log_view.log_store)
for i in range(5):
test_log.debug('Test log %s', i)
self.assertEqual(log_view.get_total_count(), 5)
self.assertEqual(log_view.get_current_line(), 0)
# Turn follow on
log_view.toggle_follow()
self.assertTrue(log_view.follow)
# Log another messagse, current_line should move to the last.
with self.assertLogs(test_log, level='DEBUG') as _log_context:
test_log.addHandler(log_view.log_store)
test_log.debug('Test log')
self.assertEqual(log_view.get_total_count(), 6)
self.assertEqual(log_view.get_current_line(), 5)
@parameterized.expand([
('no logs',
80, 10, # window_width, window_height
0, True, # log_count, follow_enabled
0, [0, -1]), # expected_total logs, expected_indices
('logs shorter than window height',
80, 10, # window_width, window_height
7, True, # log_count, follow_enabled
7, [0, 6]), # expected_total logs, expected_indices
('logs larger than window height with follow on',
80, 10, # window_width, window_height
20, True, # log_count, follow_enabled
10, [10, 19]), # expected_total logs, expected_indices
('logs larger than window height with follow off',
80, 10, # window_width, window_height
20, False, # log_count, follow_enabled
10, [0, 9]), # expected_total logs, expected_indices
]) # yapf: disable
def test_get_log_window_indices(
self,
_name,
window_width,
window_height,
log_count,
follow_enabled,
expected_total,
expected_indices,
) -> None:
"""Test get_log_window_indices() with various window sizes and log
counts."""
log_view, _pane = _create_log_view()
if not follow_enabled:
log_view.toggle_follow()
# Make required number of log messages
if log_count > 0:
test_log = logging.getLogger('log_view.test')
with self.assertLogs(test_log, level='DEBUG') as _log_context:
test_log.addHandler(log_view.log_store)
for i in range(log_count):
test_log.debug('Test log %s', i)
# Get indices
start_index, end_index = log_view.get_log_window_indices(
available_width=window_width, available_height=window_height)
self.assertEqual([start_index, end_index], expected_indices)
# Number of logs should equal the height of the window
self.assertEqual((end_index - start_index) + 1, expected_total)
def test_scrolling(self) -> None:
"""Test all scrolling methods."""
log_view, _pane = self._create_log_view_with_logs(log_count=100)
# Page scrolling needs to know the current window height.
log_view._window_height = 10 # pylint: disable=protected-access
# Follow is on by default, current line should be at the end.
self.assertEqual(log_view.get_current_line(), 99)
# Move to the beginning.
log_view.scroll_to_top()
self.assertEqual(log_view.get_current_line(), 0)
# Should not be able to scroll before the beginning.
log_view.scroll_up()
self.assertEqual(log_view.get_current_line(), 0)
log_view.scroll_up_one_page()
self.assertEqual(log_view.get_current_line(), 0)
# Single and multi line movement.
log_view.scroll_down()
self.assertEqual(log_view.get_current_line(), 1)
log_view.scroll(5)
self.assertEqual(log_view.get_current_line(), 6)
log_view.scroll_up()
self.assertEqual(log_view.get_current_line(), 5)
# Page down and up.
log_view.scroll_down_one_page()
self.assertEqual(log_view.get_current_line(), 15)
log_view.scroll_up_one_page()
self.assertEqual(log_view.get_current_line(), 5)
# Move to the end.
log_view.scroll_to_bottom()
self.assertEqual(log_view.get_current_line(), 99)
# Should not be able to scroll beyond the end.
log_view.scroll_down()
self.assertEqual(log_view.get_current_line(), 99)
log_view.scroll_down_one_page()
self.assertEqual(log_view.get_current_line(), 99)
# Move up a bit.
log_view.scroll(-5)
self.assertEqual(log_view.get_current_line(), 94)
# Simulate a mouse click to scroll.
# Click 5 lines above current position.
log_view.scroll_to_position(Point(0, -5))
self.assertEqual(log_view.get_current_line(), 89)
# Click 3 lines below current position.
log_view.scroll_to_position(Point(0, 3))
self.assertEqual(log_view.get_current_line(), 92)
# Clicking if follow is enabled should not scroll.
log_view.toggle_follow()
self.assertTrue(log_view.follow)
self.assertEqual(log_view.get_current_line(), 99)
log_view.scroll_to_position(Point(0, -5))
self.assertEqual(log_view.get_current_line(), 99)
def test_render_content_and_cursor_position(self) -> None:
"""Test render_content results and get_cursor_position
get_cursor_position() should return the correct position depending on
what line is selected."""
# Mock time to always return the same value.
mock_time = MagicMock( # type: ignore
return_value=time.mktime(
datetime(2021, 7, 13, 0, 0, 0).timetuple()))
with patch('time.time', new=mock_time):
log_view, log_pane = self._create_log_view_with_logs(log_count=4)
# Mock needed LogPane functions that pull info from prompt_toolkit.
log_pane.get_horizontal_scroll_amount = MagicMock(return_value=0)
log_pane.current_log_pane_width = 30
log_pane.current_log_pane_height = 10
log_view.scroll_to_top()
log_view.render_content()
self.assertEqual(log_view.get_cursor_position(), Point(x=0, y=0))
log_view.scroll_to_bottom()
log_view.render_content()
self.assertEqual(log_view.get_cursor_position(), Point(x=0, y=3))
expected_line_cache = [
[
('class:log-time', '20210713 00:00:00'),
('', ' '),
('class:log-level-10', 'DEBUG'),
('', ' '),
('', 'Test log 0'),
('', '\n')
],
[
('class:log-time', '20210713 00:00:00'),
('', ' '),
('class:log-level-10', 'DEBUG'),
('', ' '),
('', 'Test log 1'),
('', '\n')
],
[
('class:log-time', '20210713 00:00:00'),
('', ' '),
('class:log-level-10', 'DEBUG'),
('', ' '),
('', 'Test log 2'),
('', '\n')
],
FormattedText([
('class:selected-log-line [SetCursorPosition]', ''),
('class:selected-log-line class:log-time', '20210713 00:00:00'),
('class:selected-log-line ', ' '),
('class:selected-log-line class:log-level-10', 'DEBUG'),
('class:selected-log-line ', ' '),
('class:selected-log-line ', 'Test log 3'),
('class:selected-log-line ', ' \n')
]),
] # yapf: disable
self.assertEqual(
list(log_view._line_fragment_cache # pylint: disable=protected-access
),
expected_line_cache)
def test_clear_scrollback(self) -> None:
"""Test various functions with clearing log scrollback history."""
# pylint: disable=protected-access
# Create log_view with 4 logs
starting_log_count = 4
log_view, _pane = self._create_log_view_with_logs(
log_count=starting_log_count)
# Check setup is correct
self.assertTrue(log_view.follow)
self.assertEqual(log_view.get_current_line(), 3)
self.assertEqual(log_view.get_total_count(), 4)
self.assertEqual(
list(log.record.message
for log in log_view._get_visible_log_lines()),
['Test log 0', 'Test log 1', 'Test log 2', 'Test log 3'])
self.assertEqual(
log_view.get_log_window_indices(available_width=80,
available_height=10), (0, 3))
# Clear scrollback
log_view.clear_scrollback()
# Follow is still on
self.assertTrue(log_view.follow)
self.assertEqual(log_view.hidden_line_count(), 4)
# Current line index should stay the same
self.assertEqual(log_view.get_current_line(), 3)
# Total count should stay the same
self.assertEqual(log_view.get_total_count(), 4)
# No lines returned
self.assertEqual(
list(log.record.message
for log in log_view._get_visible_log_lines()), [])
self.assertEqual(
log_view.get_log_window_indices(available_width=80,
available_height=10), (4, 3))
# Add Log 4 more lines
test_log = logging.getLogger('log_view.test')
with self.assertLogs(test_log, level='DEBUG') as _log_context:
test_log.addHandler(log_view.log_store)
for i in range(4):
test_log.debug('Test log %s', i + starting_log_count)
# Current line
self.assertEqual(log_view.hidden_line_count(), 4)
self.assertEqual(log_view.get_last_log_line_index(), 7)
self.assertEqual(log_view.get_current_line(), 7)
self.assertEqual(log_view.get_total_count(), 8)
# Only the last 4 logs should appear
self.assertEqual(
list(log.record.message
for log in log_view._get_visible_log_lines()),
['Test log 4', 'Test log 5', 'Test log 6', 'Test log 7'])
# Window height == 2
self.assertEqual(
log_view.get_log_window_indices(available_width=80,
available_height=2), (6, 7))
# Window height == 10
self.assertEqual(
log_view.get_log_window_indices(available_width=80,
available_height=10), (4, 7))
log_view.scroll_to_top()
self.assertEqual(log_view.get_current_line(), 4)
log_view.scroll_to_bottom()
self.assertEqual(log_view.get_current_line(), 7)
# Turn follow back on
log_view.toggle_follow()
log_view.undo_clear_scrollback()
# Current line and total are the same
self.assertEqual(log_view.get_current_line(), 7)
self.assertEqual(log_view.get_total_count(), 8)
# All logs should appear
self.assertEqual(
list(log.record.message
for log in log_view._get_visible_log_lines()), [
'Test log 0', 'Test log 1', 'Test log 2', 'Test log 3',
'Test log 4', 'Test log 5', 'Test log 6', 'Test log 7'
])
self.assertEqual(
log_view.get_log_window_indices(available_width=80,
available_height=10), (0, 7))
log_view.scroll_to_top()
self.assertEqual(log_view.get_current_line(), 0)
log_view.scroll_to_bottom()
self.assertEqual(log_view.get_current_line(), 7)
if _PYTHON_3_8:
# pylint: disable=no-name-in-module
from unittest import IsolatedAsyncioTestCase # type: ignore
class TestLogViewFiltering(IsolatedAsyncioTestCase): # pylint: disable=undefined-variable
"""Test LogView log filtering capabilities."""
def _create_log_view_from_list(self, log_messages):
log_view, log_pane = _create_log_view()
test_log = logging.getLogger('log_view.test')
with self.assertLogs(test_log, level='DEBUG') as _log_context:
test_log.addHandler(log_view.log_store)
for log, extra_arg in log_messages:
test_log.debug('%s', log, extra=extra_arg)
return log_view, log_pane
@parameterized.expand([
(
'regex filter',
'log.*item',
[
('Log some item', dict()),
('Log another item', dict()),
('Some exception', dict()),
],
[
'Log some item',
'Log another item',
],
None, # field
False, # invert
),
(
'regex filter with field',
'earth',
[
('Log some item',
dict(extra_metadata_fields={'planet': 'Jupiter'})),
('Log another item',
dict(extra_metadata_fields={'planet': 'Earth'})),
('Some exception',
dict(extra_metadata_fields={'planet': 'Earth'})),
],
[
'Log another item',
'Some exception',
],
'planet', # field
False, # invert
),
(
'regex filter with field inverted',
'earth',
[
('Log some item',
dict(extra_metadata_fields={'planet': 'Jupiter'})),
('Log another item',
dict(extra_metadata_fields={'planet': 'Earth'})),
('Some exception',
dict(extra_metadata_fields={'planet': 'Earth'})),
],
[
'Log some item',
],
'planet', # field
True, # invert
),
]) # yapf: disable
async def test_log_filtering(
self,
_name,
input_text,
input_lines,
expected_matched_lines,
field=None,
invert=False,
) -> None:
"""Test run log view filtering."""
log_view, _log_pane = self._create_log_view_from_list(input_lines)
self.assertEqual(log_view.get_total_count(), len(input_lines))
log_view.new_search(input_text, invert=invert, field=field)
log_view.apply_filter()
await log_view.filter_existing_logs_task
self.assertEqual(log_view.get_total_count(),
len(expected_matched_lines))
self.assertEqual(
[log.record.message for log in log_view.filtered_logs],
expected_matched_lines)
log_view.clear_filters()
self.assertEqual(log_view.get_total_count(), len(input_lines))
if __name__ == '__main__':
unittest.main()