# blob: e9e540d5208a0b05cbfde33cc76da889a357a221 [file] [log] [blame]
# Copyright 2023 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Log decoder tests."""
from dataclasses import dataclass
import logging
from random import randint
from typing import Any
from unittest import TestCase, main
from pw_log.log_decoder import (
Log,
LogStreamDecoder,
log_decoded_log,
pw_status_code_to_name,
timestamp_parser_ns_since_boot,
)
from pw_log.proto import log_pb2
import pw_tokenizer
# A message with no file name: its plain text, its packed key/value form and
# the token of the packed form. Used by the test that decodes a tokenized
# entry whose metadata carries only the message.
_MESSAGE_NO_FILENAME = 'World'
_MESSAGE_AND_ARGS_NO_FILENAME = f'■msg♦{_MESSAGE_NO_FILENAME}'
_MESSAGE_TOKEN_NO_FILENAME = pw_tokenizer.tokens.pw_tokenizer_65599_hash(
    _MESSAGE_AND_ARGS_NO_FILENAME
)

# Token database shared by every test. Each (token, string) pair below becomes
# one database entry; strings that are absent from this table cannot be
# detokenized by _DETOKENIZER.
_TOKEN_DATABASE = pw_tokenizer.tokens.Database(
    [
        pw_tokenizer.tokens.TokenizedStringEntry(token, string)
        for token, string in (
            (0x01148A48, 'total_dropped'),
            (0x03796798, 'min_queue_remaining'),
            (0x2E668CD6, 'Jello, world!'),
            (0x329481A2, 'parser_errors'),
            (0x7F35A9A5, 'TestName'),
            (0xCC6D3131, 'Jello?'),
            (
                0x144C501D,
                '■msg♦SampleMessage■module♦MODULE■file♦file/path.cc',
            ),
            (0x0000106A, 'ModuleOrMessage'),
            (
                pw_tokenizer.tokens.pw_tokenizer_65599_hash(
                    '■msg♦World■module♦wifi■file♦/path/to/file.cc'
                ),
                '■msg♦World■module♦wifi■file♦/path/to/file.cc',
            ),
            (_MESSAGE_TOKEN_NO_FILENAME, _MESSAGE_AND_ARGS_NO_FILENAME),
        )
    ]
)
_DETOKENIZER = pw_tokenizer.Detokenizer(_TOKEN_DATABASE)
def _create_log_entry_with_tokenized_fields(
    message: str, module: str, file: str, thread: str, line: int, level: int
) -> log_pb2.LogEntry:
    """Builds a LogEntry whose string fields are replaced by their tokens.

    The message, module, file and thread strings are each hashed with
    pw_tokenizer_65599_hash and encoded as a token with no arguments. The
    line and level are packed together into the line_level field.
    """

    def _as_token(text: str) -> bytes:
        # Hash the string, then encode the bare token (no arguments).
        return pw_tokenizer.encode.encode_token_and_args(
            pw_tokenizer.tokens.pw_tokenizer_65599_hash(text)
        )

    return log_pb2.LogEntry(
        message=_as_token(message),
        module=_as_token(module),
        file=_as_token(file),
        line_level=Log.pack_line_level(line, level),
        thread=_as_token(thread),
    )
def _create_random_log_entry() -> log_pb2.LogEntry:
    """Builds a plain-text LogEntry with randomized contents.

    The message number, line, level and thread number are drawn with
    randint; the file is always b'main.cc'.
    """
    # The random draws happen in the order message, line, level, thread.
    message_text = f'message {randint(1,100)}'
    packed_line_level = Log.pack_line_level(
        randint(0, 2000), randint(logging.DEBUG, logging.CRITICAL)
    )
    thread_text = f'thread {randint(1,5)}'
    return log_pb2.LogEntry(
        message=message_text.encode('utf-8'),
        line_level=packed_line_level,
        file=b'main.cc',
        thread=thread_text.encode('utf-8'),
    )
def _create_drop_count_message_log_entry(
    drop_count: int, reason: str = ''
) -> log_pb2.LogEntry:
    """Builds a LogEntry that reports drop_count dropped logs.

    The message field is only set (to the UTF-8 encoded reason) when a
    non-empty reason is given.
    """
    entry = log_pb2.LogEntry(dropped=drop_count)
    if not reason:
        return entry
    entry.message = reason.encode('utf-8')
    return entry
class TestLogStreamDecoderBase(TestCase):
    """Shared fixture for the LogStreamDecoder test cases."""

    def setUp(self) -> None:
        """Creates a decoder that stores every decoded log in a list."""
        self.captured_logs: list[Log] = []

        def collect_log(log: Log) -> None:
            # Handler given to the decoder: record each decoded log in order.
            self.captured_logs.append(log)

        def status_code_to_name(msg: str) -> str:
            # Message parser given to the decoder.
            return pw_status_code_to_name(msg)

        self.decoder = LogStreamDecoder(
            decoded_log_handler=collect_log,
            detokenizer=_DETOKENIZER,
            source_name='source',
            timestamp_parser=timestamp_parser_ns_since_boot,
            message_parser=status_code_to_name,
        )

    def _captured_logs_as_str(self) -> str:
        """Returns the captured logs as text, one log per line."""
        return '\n'.join(str(log) for log in self.captured_logs)
class TestLogStreamDecoderDecodingFunctionality(TestLogStreamDecoderBase):
    """Tests LogStreamDecoder decoding functionality."""

    # Failure messages for the pw::Status substitution tests. Tests that
    # expect a substitution use the first; tests that expect the message to be
    # passed through untouched use the second.
    _STATUS_NOT_EXTRACTED = 'Status was not extracted from log message.'
    _MESSAGE_MODIFIED = 'Log message was unexpectedly modified.'

    def _assert_plain_entry_decodes_to(
        self, raw_message: bytes, expected_message: str, msg: Any = None
    ) -> None:
        """Decodes a non-tokenized entry and checks the resulting Log.

        The entry is always built with file 'my/path/file.cc', line 123 and
        INFO level; only the message varies between callers.

        Args:
          raw_message: Bytes placed in the LogEntry message field.
          expected_message: Message the decoded Log is expected to carry.
          msg: Optional failure message forwarded to assertEqual.
        """
        expected_log = Log(
            message=expected_message,
            file_and_line='my/path/file.cc:123',
            level=logging.INFO,
            source_name=self.decoder.source_name,
            timestamp='0:00',
        )
        result = self.decoder.parse_log_entry_proto(
            log_pb2.LogEntry(
                message=raw_message,
                file=b'my/path/file.cc',
                line_level=Log.pack_line_level(123, logging.INFO),
            )
        )
        self.assertEqual(result, expected_log, msg=msg)

    def test_parse_log_entry_valid_non_tokenized(self) -> None:
        """Test that valid LogEntry protos are parsed correctly."""
        self._assert_plain_entry_decodes_to(b'Hello', 'Hello')

    def test_parse_log_entry_valid_packed_message(self) -> None:
        """Test that metadata packed in a plain-text message is extracted."""
        log_with_metadata_in_message = Log(
            message='World',
            file_and_line='/path/to/file.cc',
            level=logging.DEBUG,
            source_name=self.decoder.source_name,
            module_name='wifi',
            timestamp='0:00',
        )
        result = self.decoder.parse_log_entry_proto(
            log_pb2.LogEntry(
                message='■msg♦World■module♦wifi■file♦/path/to/file.cc'.encode(
                    'utf-8'
                ),
                line_level=Log.pack_line_level(0, logging.DEBUG),
                timestamp=100,
            )
        )
        self.assertEqual(result, log_with_metadata_in_message)

    def test_parse_log_entry_valid_logs_drop_message(self) -> None:
        """Test that an entry with a drop count becomes a drop message."""
        dropped_message = Log(
            message='Dropped 30 logs due to buffer too small',
            level=logging.WARNING,
            source_name=self.decoder.source_name,
        )
        result = self.decoder.parse_log_entry_proto(
            log_pb2.LogEntry(message=b'buffer too small', dropped=30)
        )
        self.assertEqual(result, dropped_message)

    def test_parse_log_entry_valid_tokenized(self) -> None:
        """Test that tokenized LogEntry protos are parsed correctly."""
        message = 'Jello, world!'
        module_name = 'TestName'
        file = 'parser_errors'
        thread_name = 'Jello?'
        line = 123
        level = logging.INFO
        # The expectation is derived from the same line/level that are packed
        # into the entry, so the two cannot drift apart.
        expected_log = Log(
            message=message,
            module_name=module_name,
            file_and_line=f'{file}:{line}',
            level=level,
            source_name=self.decoder.source_name,
            timestamp='0:00',
            thread_name=thread_name,
        )
        log_entry = _create_log_entry_with_tokenized_fields(
            message, module_name, file, thread_name, line, level
        )
        result = self.decoder.parse_log_entry_proto(log_entry)
        self.assertEqual(result, expected_log, msg='Log was not detokenized')

    def test_tokenized_contents_not_detokenized(self) -> None:
        """Test fields with tokens not in the database are not detokenized."""
        # The following strings do not have tokens in the device token db.
        message_not_in_db = 'device is shutting down.'
        module_name_not_in_db = 'Battery'
        file_not_in_db = 'charger.cc'
        thread_name_not_in_db = 'BatteryStatus'
        line = 123
        level = logging.INFO
        log_entry = _create_log_entry_with_tokenized_fields(
            message_not_in_db,
            module_name_not_in_db,
            file_not_in_db,
            thread_name_not_in_db,
            line,
            level,
        )
        # The expected field values are whatever the detokenizer itself
        # produces for tokens it does not know.
        message = pw_tokenizer.proto.decode_optionally_tokenized(
            _DETOKENIZER, log_entry.message
        )
        module = pw_tokenizer.proto.decode_optionally_tokenized(
            _DETOKENIZER, log_entry.module
        )
        file = pw_tokenizer.proto.decode_optionally_tokenized(
            _DETOKENIZER, log_entry.file
        )
        thread = pw_tokenizer.proto.decode_optionally_tokenized(
            _DETOKENIZER, log_entry.thread
        )
        expected_log = Log(
            message=message,
            module_name=module,
            file_and_line=f'{file}:{line}',
            level=level,
            source_name=self.decoder.source_name,
            timestamp='0:00',
            thread_name=thread,
        )
        result = self.decoder.parse_log_entry_proto(log_entry)
        self.assertEqual(
            result, expected_log, msg='Log was unexpectedly detokenized'
        )

    def test_extracting_log_entry_fields_from_tokenized_metadata(self) -> None:
        """Test that tokenized metadata can be used to extract other fields."""
        metadata = '■msg♦World■module♦wifi■file♦/path/to/file.cc'
        thread_name = 'M0Log'
        log_entry = log_pb2.LogEntry(
            message=pw_tokenizer.encode.encode_token_and_args(
                pw_tokenizer.tokens.pw_tokenizer_65599_hash(metadata)
            ),
            line_level=Log.pack_line_level(0, logging.DEBUG),
            thread=thread_name.encode('utf-8'),
        )
        log_with_metadata_in_message = Log(
            message='World',
            file_and_line='/path/to/file.cc',
            level=logging.DEBUG,
            source_name=self.decoder.source_name,
            module_name='wifi',
            timestamp='0:00',
            thread_name=thread_name,
        )
        result = self.decoder.parse_log_entry_proto(log_entry)
        # A mismatch here means the message, module or file was not recovered
        # from the detokenized metadata string.
        self.assertEqual(
            result,
            log_with_metadata_in_message,
            msg='Fields were not extracted from the tokenized metadata.',
        )

    def test_extracting_status_argument_from_log_message(self) -> None:
        """Test extract status from log message."""
        self._assert_plain_entry_decodes_to(
            b'Could not start flux capacitor: pw::Status=7',
            'Could not start flux capacitor: PERMISSION_DENIED',
            msg=self._STATUS_NOT_EXTRACTED,
        )
        self._assert_plain_entry_decodes_to(
            b'Error connecting to server: pw::Status=14',
            'Error connecting to server: UNAVAILABLE',
            msg=self._STATUS_NOT_EXTRACTED,
        )

    def test_extracting_status_with_improper_spacing(self) -> None:
        """Test spaces before pw::Status are ignored."""
        # No space before the status argument.
        self._assert_plain_entry_decodes_to(
            b'Error connecting to server:pw::Status=14',
            'Error connecting to server:UNAVAILABLE',
            msg=self._STATUS_NOT_EXTRACTED,
        )
        # One space before the status argument.
        self._assert_plain_entry_decodes_to(
            b'Error connecting to server: pw::Status=14',
            'Error connecting to server: UNAVAILABLE',
            msg=self._STATUS_NOT_EXTRACTED,
        )

    def test_not_extracting_status_extra_space_before_code(self) -> None:
        """Test a space between 'pw::Status=' and the code is not allowed."""
        self._assert_plain_entry_decodes_to(
            b'Could not start flux capacitor: pw::Status= 7',
            'Could not start flux capacitor: pw::Status= 7',
            msg=self._MESSAGE_MODIFIED,
        )

    def test_not_extracting_status_new_line_before_code(self) -> None:
        """Test a new line between 'pw::Status=' and the code is not allowed."""
        self._assert_plain_entry_decodes_to(
            b'Could not start flux capacitor: pw::Status=\n7',
            'Could not start flux capacitor: pw::Status=\n7',
            msg=self._MESSAGE_MODIFIED,
        )

    def test_not_extracting_status_from_log_message_with_improper_format(
        self,
    ) -> None:
        """Test status not extracted from log message with incorrect format."""
        self._assert_plain_entry_decodes_to(
            b'Could not start flux capacitor: pw::Status12',
            'Could not start flux capacitor: pw::Status12',
            msg=self._MESSAGE_MODIFIED,
        )

    def test_status_code_in_message_does_not_exist(self) -> None:
        """Test a status code unknown to pw_status is left as is."""
        self._assert_plain_entry_decodes_to(
            b'Could not start flux capacitor: pw::Status=17',
            'Could not start flux capacitor: pw::Status=17',
            msg=self._MESSAGE_MODIFIED,
        )

    def test_status_code_in_message_is_negative(self) -> None:
        """Test a negative status code is left as is."""
        self._assert_plain_entry_decodes_to(
            b'Could not start flux capacitor: pw::Status=-1',
            'Could not start flux capacitor: pw::Status=-1',
            msg=self._MESSAGE_MODIFIED,
        )

    def test_status_code_is_name(self) -> None:
        """Test if the status code format includes the name instead."""
        self._assert_plain_entry_decodes_to(
            b'Cannot use flux capacitor: pw::Status=PERMISSION_DENIED',
            'Cannot use flux capacitor: pw::Status=PERMISSION_DENIED',
            msg=self._MESSAGE_MODIFIED,
        )

    def test_spelling_mistakes_with_status_keyword(self) -> None:
        """Test spelling mistakes with status keyword."""
        self._assert_plain_entry_decodes_to(
            b'Could not start flux capacitor: pw::Rtatus=12',
            'Could not start flux capacitor: pw::Rtatus=12',
            msg=self._MESSAGE_MODIFIED,
        )

    def test_spelling_mistakes_with_status_keyword_lowercase_s(self) -> None:
        """Test a lowercase 's' in the status keyword is not accepted."""
        self._assert_plain_entry_decodes_to(
            b'Could not start flux capacitor: pw::status=13',
            'Could not start flux capacitor: pw::status=13',
            msg=self._MESSAGE_MODIFIED,
        )

    def test_status_code_at_beginning_of_message(self) -> None:
        """Test embedded status argument is found at the message start."""
        self._assert_plain_entry_decodes_to(
            b'pw::Status=14 to connect to server.',
            'UNAVAILABLE to connect to server.',
            msg=self._STATUS_NOT_EXTRACTED,
        )

    def test_status_code_in_the_middle_of_message(self) -> None:
        """Test embedded status argument is found mid-message."""
        self._assert_plain_entry_decodes_to(
            b'Connection error: pw::Status=14 connecting to server.',
            'Connection error: UNAVAILABLE connecting to server.',
            msg=self._STATUS_NOT_EXTRACTED,
        )

    def test_status_code_with_no_surrounding_spaces(self) -> None:
        """Test embedded status argument is found without spaces around it."""
        self._assert_plain_entry_decodes_to(
            b'Connection error:pw::Status=14connecting to server.',
            'Connection error:UNAVAILABLEconnecting to server.',
            msg=self._STATUS_NOT_EXTRACTED,
        )

    def test_multiple_status_arguments_in_log_message(self) -> None:
        """Test replacement of multiple status arguments into status string."""
        self._assert_plain_entry_decodes_to(
            b'Connection error: pw::Status=14 and pw::Status=7.',
            'Connection error: UNAVAILABLE and PERMISSION_DENIED.',
            msg=self._STATUS_NOT_EXTRACTED,
        )

    def test_no_filename_in_message_parses_successfully(self) -> None:
        """Test that if the file name is not present the log entry is parsed."""
        thread_name = 'thread'
        log_entry = log_pb2.LogEntry(
            message=pw_tokenizer.encode.encode_token_and_args(
                _MESSAGE_TOKEN_NO_FILENAME
            ),
            line_level=Log.pack_line_level(123, logging.DEBUG),
            thread=thread_name.encode('utf-8'),
        )
        expected_log = Log(
            message=_MESSAGE_NO_FILENAME,
            file_and_line=':123',
            level=logging.DEBUG,
            source_name=self.decoder.source_name,
            timestamp='0:00',
            thread_name=thread_name,
        )
        result = self.decoder.parse_log_entry_proto(log_entry)
        self.assertEqual(result, expected_log)

    def test_log_decoded_log(self) -> None:
        """Test that the logger correctly formats a decoded log."""
        test_log = Log(
            message="SampleMessage",
            level=logging.DEBUG,
            timestamp='1:30',
            module_name="MyModule",
            source_name="MySource",
            thread_name="MyThread",
            file_and_line='my_file.cc:123',
            metadata_fields={'field1': 432, 'field2': 'value'},
        )

        class CapturingLogger(logging.Logger):
            """Captures values passed to log().

            Tests that calls to log() have the correct level, extra arguments
            and the message format string and arguments match.
            """

            @dataclass(frozen=True)
            class LoggerLog:
                """Represents a process log() call."""

                level: int
                message: str
                kwargs: Any

            def __init__(self):
                super().__init__(name="CapturingLogger")
                self.log_calls: list[CapturingLogger.LoggerLog] = []

            def log(self, level, msg, *args, **kwargs) -> None:
                # Apply the %-format arguments immediately so the test can
                # compare against the fully rendered message.
                log = CapturingLogger.LoggerLog(
                    level=level, message=msg % args, kwargs=kwargs
                )
                self.log_calls.append(log)

        test_logger = CapturingLogger()
        log_decoded_log(test_log, test_logger)
        self.assertEqual(len(test_logger.log_calls), 1)
        self.assertEqual(test_logger.log_calls[0].level, test_log.level)
        self.assertEqual(
            test_logger.log_calls[0].message,
            '[%s] %s %s %s %s'
            % (
                test_log.source_name,
                test_log.module_name,
                test_log.timestamp,
                test_log.message,
                test_log.file_and_line,
            ),
        )
        self.assertEqual(
            test_logger.log_calls[0].kwargs['extra']['extra_metadata_fields'],
            test_log.metadata_fields,
        )
class TestLogStreamDecoderLogDropDetectionFunctionality(
    TestLogStreamDecoderBase
):
    """Tests LogStreamDecoder log drop detection functionality."""

    def _assert_captured_log_count(self, expected_count: int) -> None:
        """Checks how many logs were captured, listing them on failure."""
        self.assertEqual(
            len(self.captured_logs),
            expected_count,
            msg=(
                'Unexpected number of messages received: '
                f'{self._captured_logs_as_str()}'
            ),
        )

    def test_log_drops_transport_error(self) -> None:
        """Tests log drops at transport."""
        log_entry_in_log_entries_1 = 3
        log_entries_1 = log_pb2.LogEntries(
            first_entry_sequence_id=0,
            entries=[
                _create_random_log_entry()
                for _ in range(log_entry_in_log_entries_1)
            ],
        )
        self.decoder.parse_log_entries_proto(log_entries_1)
        # Assume a second LogEntries was dropped with 5 log entries, i.e. a
        # log_entries_2 with first_entry_sequence_id = 3 and 5 log entries.
        log_entry_in_log_entries_2 = 5
        log_entry_in_log_entries_3 = 3
        log_entries_3 = log_pb2.LogEntries(
            # The third batch starts after everything that preceded it: the
            # entries of the first batch plus the entries of the lost one.
            first_entry_sequence_id=log_entry_in_log_entries_1
            + log_entry_in_log_entries_2,
            entries=[
                _create_random_log_entry()
                for _ in range(log_entry_in_log_entries_3)
            ],
        )
        self.decoder.parse_log_entries_proto(log_entries_3)
        # The drop message is placed where the dropped logs were detected.
        self._assert_captured_log_count(
            log_entry_in_log_entries_1 + 1 + log_entry_in_log_entries_3
        )
        self.assertEqual(
            (
                f'Dropped {log_entry_in_log_entries_2} logs due to '
                f'{LogStreamDecoder.DROP_REASON_LOSS_AT_TRANSPORT}'
            ),
            self.captured_logs[log_entry_in_log_entries_1].message,
        )

    def test_log_drops_source_not_connected(self) -> None:
        """Tests log drops when source of the logs was not connected."""
        log_entry_in_log_entries = 4
        drop_count = 7
        # The very first batch already starts at a non-zero sequence ID.
        log_entries = log_pb2.LogEntries(
            first_entry_sequence_id=drop_count,
            entries=[
                _create_random_log_entry()
                for _ in range(log_entry_in_log_entries)
            ],
        )
        self.decoder.parse_log_entries_proto(log_entries)
        # The drop message is placed where the log drops were detected.
        self._assert_captured_log_count(1 + log_entry_in_log_entries)
        self.assertEqual(
            (
                f'Dropped {drop_count} logs due to '
                f'{LogStreamDecoder.DROP_REASON_SOURCE_NOT_CONNECTED}'
            ),
            self.captured_logs[0].message,
        )

    def test_log_drops_source_enqueue_failure_no_message(self) -> None:
        """Tests source-reported log drops that carry no reason."""
        drop_count = 5
        log_entries = log_pb2.LogEntries(
            first_entry_sequence_id=0,
            entries=[
                _create_random_log_entry(),
                _create_random_log_entry(),
                _create_drop_count_message_log_entry(drop_count),
                _create_random_log_entry(),
            ],
        )
        self.decoder.parse_log_entries_proto(log_entries)
        # The drop message is placed where the log drops were detected.
        self._assert_captured_log_count(4)
        self.assertEqual(
            (
                f'Dropped {drop_count} logs due to '
                f'{LogStreamDecoder.DROP_REASON_SOURCE_ENQUEUE_FAILURE}'
            ),
            self.captured_logs[2].message,
        )

    def test_log_drops_source_enqueue_failure_with_message(self) -> None:
        """Tests source-reported log drops that carry a reason."""
        drop_count = 8
        reason = 'Flux Capacitor exploded'
        log_entries = log_pb2.LogEntries(
            first_entry_sequence_id=0,
            entries=[
                _create_random_log_entry(),
                _create_random_log_entry(),
                _create_drop_count_message_log_entry(drop_count, reason),
                _create_random_log_entry(),
            ],
        )
        self.decoder.parse_log_entries_proto(log_entries)
        # The drop message is placed where the log drops were detected.
        self._assert_captured_log_count(4)
        # The reason is expected in lower case in the drop message.
        self.assertEqual(
            f'Dropped {drop_count} logs due to {reason.lower()}',
            self.captured_logs[2].message,
        )
# Run the tests with unittest's main() when this file is executed as a script.
if __name__ == '__main__':
    main()