#!/usr/bin/env python3
# Copyright 2020 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Tests for detokenize."""
import base64
import concurrent.futures
import datetime as dt
import functools
import io
import os
from pathlib import Path
import struct
import tempfile
from typing import Any, Callable, NamedTuple
import unittest
from unittest import mock
from pw_tokenizer import database
from pw_tokenizer import detokenize
from pw_tokenizer import elf_reader
from pw_tokenizer import tokens
# This function is not part of this test. It was used to generate the bytes
# literals for EMPTY_ELF and ELF_WITH_TOKENIZER_SECTIONS. It takes a path and
# returns the file's contents formatted as a Python bytes literal, ready to
# copy into Python source code.
def path_to_byte_string(path):
with open(path, 'rb') as fd:
data = fd.read()
output = []
indices = iter(range(len(data)))
while True:
line = ''
while len(line) < 70:
try:
i = next(indices)
except StopIteration:
break
line += repr(data[i : i + 1])[2:-1].replace("'", r'\'')
if not line:
return ''.join(output)
output.append(" b'{}'\n".format(''.join(line)))
# This is an empty ELF file. It was created from the ELF file for
# tokenize_test.cc with the command:
#
# arm-none-eabi-objcopy -S --only-section NO_SECTIONS_PLEASE <ELF> <OUTPUT>
#
# The resulting ELF was converted to a Python bytes literal using the
# path_to_byte_string function above.
EMPTY_ELF = (
b'\x7fELF\x01\x01\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x00(\x00\x01'
b'\x00\x00\x00\xd1\x83\x00\x084\x00\x00\x00\xe0\x00\x00\x00\x00\x04\x00\x05'
b'4\x00 \x00\x05\x00(\x00\x02\x00\x01\x00\x01\x00\x00\x00\xd4\x00\x00\x00'
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x07\x00'
b'\x00\x00\x00\x00\x01\x00\x01\x00\x00\x00\xd4\x00\x00\x00\x00\x00\x00\x00'
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x06\x00\x00\x00\x00\x00'
b'\x01\x00\x01\x00\x00\x00\xd4\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
b'\x00\x00\x00\x00\x00\x00\x00\x00\x06\x00\x00\x00\x00\x00\x01\x00\x01\x00'
b'\x00\x00\xd4\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
b'\x00\x00\x00\x00\x06\x00\x00\x00\x00\x00\x01\x00\x01\x00\x00\x00\xd4\x00'
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
b'\x06\x00\x00\x00\x00\x00\x01\x00\x00.shstrtab\x00\x00\x00\x00\x00\x00\x00'
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01'
b'\x00\x00\x00\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xd4\x00\x00'
b'\x00\x0b\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00'
b'\x00\x00\x00'
)
# This is an ELF file with only the pw_tokenizer sections. It was created
# from a tokenize_test binary built for the STM32F429i Discovery board. The
# pw_tokenizer sections were extracted with this command:
#
# arm-none-eabi-objcopy -S --only-section ".pw_tokenizer*" <ELF> <OUTPUT>
#
ELF_WITH_TOKENIZER_SECTIONS_PATH = Path(__file__).parent.joinpath(
'example_binary_with_tokenized_strings.elf'
)
ELF_WITH_TOKENIZER_SECTIONS = ELF_WITH_TOKENIZER_SECTIONS_PATH.read_bytes()
TOKENS_IN_ELF = 22
TOKENS_IN_ELF_WITH_TOKENIZER_SECTIONS = 26
# 0x2e668cd6 is 'Jello, world!' (which is also used in database_test.py).
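# The token bytes are the 32-bit hash packed little-endian:
# struct.pack('<I', 0x2E668CD6) == b'\xd6\x8c\x66\x2e'.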
JELLO_WORLD_TOKEN = b'\xd6\x8c\x66\x2e'
class DetokenizeTest(unittest.TestCase):
"""Tests the detokenize.Detokenizer."""
def test_simple(self):
detok = detokenize.Detokenizer(
tokens.Database(
[
tokens.TokenizedStringEntry(
0xCDAB, '%02d %s %c%%', date_removed=dt.datetime.now()
)
]
)
)
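# Payload: token 0xCDAB (little-endian), varint 0x02 -> 1, length-prefixed
# 'Two', varint 0x66 -> 51 ('3'); '%%' renders a literal '%'.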
self.assertEqual(
str(detok.detokenize(b'\xab\xcd\0\0\x02\x03Two\x66')), '01 Two 3%'
)
def test_detokenize_extra_data_is_unsuccessful(self):
detok = detokenize.Detokenizer(
tokens.Database(
[
tokens.TokenizedStringEntry(
1, 'no args', date_removed=dt.datetime(1, 1, 1)
)
]
)
)
result = detok.detokenize(b'\x01\0\0\0\x04args')
self.assertEqual(len(result.failures), 1)
string, args, remaining = result.failures[0]
self.assertEqual('no args', string)
self.assertFalse(args)
self.assertEqual(b'\x04args', remaining)
self.assertEqual('no args', string)
self.assertEqual('no args', str(result))
def test_detokenize_zero_extend_short_token_with_no_args(self):
detok = detokenize.Detokenizer(
tokens.Database(
[tokens.TokenizedStringEntry(0xCDAB, 'This token is 16 bits')]
)
)
self.assertEqual(
str(detok.detokenize(b'\xab\xcd')), 'This token is 16 bits'
)
def test_detokenize_missing_data_is_unsuccessful(self):
detok = detokenize.Detokenizer(
tokens.Database(
[
tokens.TokenizedStringEntry(
2, '%s', date_removed=dt.datetime(1, 1, 1)
)
]
)
)
result = detok.detokenize(b'\x02\0\0\0')
string, args, remaining = result.failures[0]
self.assertEqual('%s', string)
self.assertEqual(len(args), 1)
self.assertEqual(b'', remaining)
self.assertEqual(len(result.failures), 1)
self.assertEqual('%s', str(result))
def test_detokenize_missing_data_with_errors_is_unsuccessful(self):
detok = detokenize.Detokenizer(
tokens.Database(
[
tokens.TokenizedStringEntry(
2, '%s', date_removed=dt.datetime(1, 1, 1)
)
]
),
show_errors=True,
)
result = detok.detokenize(b'\x02\0\0\0')
string, args, remaining = result.failures[0]
self.assertIn('%s MISSING', string)
self.assertEqual(len(args), 1)
self.assertEqual(b'', remaining)
self.assertEqual(len(result.failures), 1)
self.assertIn('%s MISSING', str(result))
def test_unparsed_data(self):
detok = detokenize.Detokenizer(
tokens.Database(
[
tokens.TokenizedStringEntry(
1, 'no args', date_removed=dt.datetime(100, 1, 1)
),
]
)
)
result = detok.detokenize(b'\x01\0\0\0o_o')
self.assertFalse(result.ok())
self.assertEqual('no args', str(result))
self.assertIn('o_o', repr(result))
self.assertIn('decoding failed', result.error_message())
def test_empty_db(self):
detok = detokenize.Detokenizer(io.BytesIO(EMPTY_ELF))
self.assertFalse(detok.detokenize(b'\x12\x34\0\0').ok())
self.assertIn(
'unknown token', detok.detokenize(b'1234').error_message()
)
self.assertIn('unknown token', repr(detok.detokenize(b'1234')))
self.assertEqual(
'$' + base64.b64encode(b'1234').decode(),
str(detok.detokenize(b'1234')),
)
self.assertIsNone(detok.detokenize(b'').token)
def test_empty_db_show_errors(self):
detok = detokenize.Detokenizer(io.BytesIO(EMPTY_ELF), show_errors=True)
self.assertFalse(detok.detokenize(b'\x12\x34\0\0').ok())
self.assertIn(
'unknown token', detok.detokenize(b'1234').error_message()
)
self.assertIn('unknown token', repr(detok.detokenize(b'1234')))
self.assertIn('unknown token', str(detok.detokenize(b'1234')))
self.assertIsNone(detok.detokenize(b'').token)
def test_missing_token_show_errors(self):
detok = detokenize.Detokenizer(io.BytesIO(EMPTY_ELF), show_errors=True)
self.assertIn('missing token', detok.detokenize(b'').error_message())
self.assertIn('missing token', str(detok.detokenize(b'')))
def test_missing_token(self):
detok = detokenize.Detokenizer(io.BytesIO(EMPTY_ELF))
self.assertIn('missing token', detok.detokenize(b'').error_message())
self.assertEqual('$', str(detok.detokenize(b'')))
def test_unknown_shorter_token_show_error(self):
detok = detokenize.Detokenizer(io.BytesIO(EMPTY_ELF), show_errors=True)
self.assertIn('unknown token', detok.detokenize(b'1').error_message())
self.assertIn('unknown token', str(detok.detokenize(b'1')))
self.assertIn('unknown token', repr(detok.detokenize(b'1')))
self.assertIn('unknown token', detok.detokenize(b'123').error_message())
self.assertIn('unknown token', str(detok.detokenize(b'123')))
self.assertIn('unknown token', repr(detok.detokenize(b'123')))
def test_unknown_shorter_token(self):
detok = detokenize.Detokenizer(io.BytesIO(EMPTY_ELF))
self.assertEqual(
'unknown token 00000001', detok.detokenize(b'\1').error_message()
)
self.assertEqual(
'$' + base64.b64encode(b'\1\0\0\0').decode(),
str(detok.detokenize(b'\1')),
)
self.assertIn('unknown token 00000001', repr(detok.detokenize(b'\1')))
self.assertEqual(
'unknown token 00030201',
detok.detokenize(b'\1\2\3').error_message(),
)
self.assertEqual(
'$' + base64.b64encode(b'\1\2\3\0').decode(),
str(detok.detokenize(b'\1\2\3')),
)
self.assertIn(
'unknown token 00030201', repr(detok.detokenize(b'\1\2\3'))
)
def test_decode_from_elf_data(self):
detok = detokenize.Detokenizer(io.BytesIO(ELF_WITH_TOKENIZER_SECTIONS))
self.assertTrue(detok.detokenize(JELLO_WORLD_TOKEN).ok())
self.assertEqual(
str(detok.detokenize(JELLO_WORLD_TOKEN)), 'Jello, world!'
)
undecoded_args = detok.detokenize(JELLO_WORLD_TOKEN + b'some junk')
self.assertFalse(undecoded_args.ok())
self.assertEqual(str(undecoded_args), 'Jello, world!')
self.assertTrue(detok.detokenize(b'\0\0\0\0').ok())
self.assertEqual(str(detok.detokenize(b'\0\0\0\0')), '')
def test_decode_from_elf_file(self):
"""Test decoding from an elf file."""
detok = detokenize.Detokenizer(io.BytesIO(ELF_WITH_TOKENIZER_SECTIONS))
expected_tokens = frozenset(detok.database.token_to_entries.keys())
with tempfile.NamedTemporaryFile('wb', delete=False) as elf:
try:
elf.write(ELF_WITH_TOKENIZER_SECTIONS)
elf.close()
# Open ELF by file object
with open(elf.name, 'rb') as fd:
detok = detokenize.Detokenizer(fd)
self.assertEqual(
expected_tokens,
frozenset(detok.database.token_to_entries.keys()),
)
# Open ELF by path
detok = detokenize.Detokenizer(elf.name)
self.assertEqual(
expected_tokens,
frozenset(detok.database.token_to_entries.keys()),
)
# Open ELF by elf_reader.Elf
with open(elf.name, 'rb') as fd:
detok = detokenize.Detokenizer(elf_reader.Elf(fd))
self.assertEqual(
expected_tokens,
frozenset(detok.database.token_to_entries.keys()),
)
finally:
os.unlink(elf.name)
def test_decode_from_csv_file(self):
detok = detokenize.Detokenizer(io.BytesIO(ELF_WITH_TOKENIZER_SECTIONS))
expected_tokens = frozenset(detok.database.token_to_entries.keys())
csv_database = str(detok.database)
self.assertEqual(len(csv_database.splitlines()), TOKENS_IN_ELF)
with tempfile.NamedTemporaryFile('w', delete=False) as csv_file:
try:
csv_file.write(csv_database)
csv_file.close()
# Open CSV by path
detok = detokenize.Detokenizer(csv_file.name)
self.assertEqual(
expected_tokens,
frozenset(detok.database.token_to_entries.keys()),
)
# Open CSV by file object
with open(csv_file.name) as fd:
detok = detokenize.Detokenizer(fd)
self.assertEqual(
expected_tokens,
frozenset(detok.database.token_to_entries.keys()),
)
finally:
os.unlink(csv_file.name)
def test_create_detokenizer_with_token_database(self):
detok = detokenize.Detokenizer(io.BytesIO(ELF_WITH_TOKENIZER_SECTIONS))
expected_tokens = frozenset(detok.database.token_to_entries.keys())
detok = detokenize.Detokenizer(detok.database)
self.assertEqual(
expected_tokens, frozenset(detok.database.token_to_entries.keys())
)
class DetokenizeWithCollisions(unittest.TestCase):
"""Tests collision resolution."""
def setUp(self):
super().setUp()
token = 0xBAAD
# Database with several conflicting tokens.
self.detok = detokenize.Detokenizer(
tokens.Database(
[
tokens.TokenizedStringEntry(
token, 'REMOVED', date_removed=dt.datetime(9, 1, 1)
),
tokens.TokenizedStringEntry(token, 'newer'),
tokens.TokenizedStringEntry(
token, 'A: %d', date_removed=dt.datetime(30, 5, 9)
),
tokens.TokenizedStringEntry(
token, 'B: %c', date_removed=dt.datetime(30, 5, 10)
),
tokens.TokenizedStringEntry(token, 'C: %s'),
tokens.TokenizedStringEntry(token, '%d%u'),
tokens.TokenizedStringEntry(token, '%s%u %d'),
tokens.TokenizedStringEntry(1, '%s'),
tokens.TokenizedStringEntry(1, '%d'),
tokens.TokenizedStringEntry(2, 'Three %s %s %s'),
tokens.TokenizedStringEntry(2, 'Five %d %d %d %d %s'),
]
)
)
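# Per the tests below, collision resolution favors entries that decode
# successfully, decode all of the data, and were most recently present in
# the database.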
def test_collision_no_args_favors_most_recently_present(self):
no_args = self.detok.detokenize(b'\xad\xba\0\0')
self.assertFalse(no_args.ok())
self.assertEqual(len(no_args.successes), 2)
self.assertEqual(len(no_args.failures), 5)
self.assertEqual(len(no_args.matches()), 7)
self.assertEqual(str(no_args), 'newer')
self.assertEqual(len(no_args.best_result()[1]), 0)
self.assertEqual(no_args.best_result()[0], 'newer')
def test_collision_one_integer_arg_favors_most_recently_present(self):
multiple_correct = self.detok.detokenize(b'\xad\xba\0\0\x7a')
self.assertFalse(multiple_correct.ok())
self.assertIn('ERROR', repr(multiple_correct))
self.assertEqual(len(multiple_correct.successes), 2)
self.assertEqual(len(multiple_correct.failures), 5)
self.assertEqual(len(multiple_correct.matches()), 7)
self.assertEqual(str(multiple_correct), 'B: =')
def test_collision_one_integer_arg_favor_successful_decode(self):
# One string decodes successfully, since the arg is out of range for %c.
int_arg = self.detok.detokenize(b'\xad\xba\0\0\xfe\xff\xff\xff\x0f')
self.assertTrue(int_arg.ok())
self.assertEqual(str(int_arg), 'A: 2147483647')
def test_collision_one_string_arg_favors_successful_decode(self):
# One string decodes successfully, since decoding the argument as an
# integer does not decode all the data.
string_arg = self.detok.detokenize(b'\xad\xba\0\0\x02Hi')
self.assertTrue(string_arg.ok())
self.assertEqual(str(string_arg), 'C: Hi')
def test_collision_one_string_arg_favors_decoding_all_data(self):
result = self.detok.detokenize(b'\1\0\0\0\x83hi')
self.assertEqual(len(result.failures), 2)
# Should resolve to the string since %d would leave one byte behind.
self.assertEqual(str(result), '%s')
def test_collision_multiple_args_favors_decoding_more_arguments(self):
result = self.detok.detokenize(b'\2\0\0\0\1\2\1\4\5')
self.assertEqual(len(result.matches()), 2)
self.assertEqual(result.matches()[0][0], 'Five -1 1 -1 2 %s')
self.assertEqual(result.matches()[1][0], 'Three \2 \4 %s')
def test_collision_multiple_args_favors_decoding_all_arguments(self):
unambiguous = self.detok.detokenize(b'\xad\xba\0\0\x01#\x00\x01')
self.assertTrue(unambiguous.ok())
self.assertEqual(len(unambiguous.matches()), 7)
self.assertEqual('#0 -1', str(unambiguous))
self.assertIn('#0 -1', repr(unambiguous))
class ManualPoolExecutor(concurrent.futures.Executor):
"""A stubbed pool executor that captures the most recent work request
and holds it until the public process method is manually called."""
def __init__(self):
super().__init__()
self._func = None
# pylint: disable=arguments-differ
def submit(self, func, *args, **kwargs):
"""Submits work to the pool, stashing the partial for later use."""
self._func = functools.partial(func, *args, **kwargs)
def process(self):
"""Processes the latest func submitted to the pool."""
if self._func is not None:
self._func()
self._func = None
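# AutoUpdatingDetokenizerTest uses this executor to control exactly when the
# background database reload runs.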
class InlinePoolExecutor(concurrent.futures.Executor):
"""A stubbed pool executor that runs work immediately, inline."""
# pylint: disable=arguments-differ
def submit(self, func, *args, **kwargs):
"""Submits work to the pool, stashing the partial for later use."""
func(*args, **kwargs)
@mock.patch('os.path.getmtime')
class AutoUpdatingDetokenizerTest(unittest.TestCase):
"""Tests the AutoUpdatingDetokenizer class."""
def test_update(self, mock_getmtime):
"""Tests the update command."""
db = database.load_token_database(
io.BytesIO(ELF_WITH_TOKENIZER_SECTIONS)
)
self.assertEqual(len(db), TOKENS_IN_ELF)
the_time = [100]
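# os.path.getmtime is mocked to return an ever-earlier time, so every poll
# sees the database file as modified and triggers a reload.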
def move_back_time_if_file_exists(path):
if os.path.exists(path):
the_time[0] -= 1
return the_time[0]
raise FileNotFoundError
mock_getmtime.side_effect = move_back_time_if_file_exists
with tempfile.NamedTemporaryFile('wb', delete=False) as file:
try:
file.close()
pool = ManualPoolExecutor()
detok = detokenize.AutoUpdatingDetokenizer(
file.name, min_poll_period_s=0, pool=pool
)
self.assertFalse(detok.detokenize(JELLO_WORLD_TOKEN).ok())
with open(file.name, 'wb') as fd:
tokens.write_binary(db, fd)
# After the change but before the pool runs in another thread,
# the token should not exist.
self.assertFalse(detok.detokenize(JELLO_WORLD_TOKEN).ok())
# After the pool is allowed to process, it should.
pool.process()
self.assertTrue(detok.detokenize(JELLO_WORLD_TOKEN).ok())
finally:
os.unlink(file.name)
def test_update_with_directory(self, mock_getmtime):
"""Tests the update command with a directory format database."""
db = database.load_token_database(
io.BytesIO(ELF_WITH_TOKENIZER_SECTIONS)
)
self.assertEqual(len(db), TOKENS_IN_ELF)
the_time = [100]
def move_back_time_if_file_exists(path):
if os.path.exists(path):
the_time[0] -= 1
return the_time[0]
raise FileNotFoundError
mock_getmtime.side_effect = move_back_time_if_file_exists
with tempfile.TemporaryDirectory() as dbdir:
with tempfile.NamedTemporaryFile(
'wb', delete=False, suffix='.pw_tokenizer.csv', dir=dbdir
) as matching_suffix_file, tempfile.NamedTemporaryFile(
'wb', delete=False, suffix='.not.right', dir=dbdir
) as mismatched_suffix_file:
try:
matching_suffix_file.close()
mismatched_suffix_file.close()
pool = ManualPoolExecutor()
detok = detokenize.AutoUpdatingDetokenizer(
dbdir, min_poll_period_s=0, pool=pool
)
self.assertFalse(detok.detokenize(JELLO_WORLD_TOKEN).ok())
with open(mismatched_suffix_file.name, 'wb') as fd:
tokens.write_csv(db, fd)
pool.process()
self.assertFalse(detok.detokenize(JELLO_WORLD_TOKEN).ok())
with open(matching_suffix_file.name, 'wb') as fd:
tokens.write_csv(db, fd)
# After the change but before the pool runs in another
# thread, the token should not exist.
self.assertFalse(detok.detokenize(JELLO_WORLD_TOKEN).ok())
pool.process()
# After the pool is allowed to process, it should.
self.assertTrue(detok.detokenize(JELLO_WORLD_TOKEN).ok())
finally:
os.unlink(mismatched_suffix_file.name)
os.unlink(matching_suffix_file.name)
os.rmdir(dbdir)
# The database stays around if the file is deleted.
self.assertTrue(detok.detokenize(JELLO_WORLD_TOKEN).ok())
def test_no_update_if_time_is_same(self, mock_getmtime):
mock_getmtime.return_value = 100
with tempfile.NamedTemporaryFile('wb', delete=False) as file:
try:
tokens.write_csv(
database.load_token_database(
io.BytesIO(ELF_WITH_TOKENIZER_SECTIONS)
),
file,
)
file.close()
detok = detokenize.AutoUpdatingDetokenizer(
file.name, min_poll_period_s=0, pool=InlinePoolExecutor()
)
self.assertTrue(detok.detokenize(JELLO_WORLD_TOKEN).ok())
# Empty the database, but keep the mock modified time the same.
with open(file.name, 'wb'):
pass
self.assertTrue(detok.detokenize(JELLO_WORLD_TOKEN).ok())
self.assertTrue(detok.detokenize(JELLO_WORLD_TOKEN).ok())
# Move back time so the now-empty file is reloaded.
mock_getmtime.return_value = 50
self.assertFalse(detok.detokenize(JELLO_WORLD_TOKEN).ok())
finally:
os.unlink(file.name)
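# Database paths may end in '#<domain regex>'; '.*' matches every domain, so
# more tokens load than from the default domain alone
# (TOKENS_IN_ELF_WITH_TOKENIZER_SECTIONS vs. TOKENS_IN_ELF).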
def test_token_domain_in_str(self, _) -> None:
"""Tests a str containing a domain"""
detok = detokenize.AutoUpdatingDetokenizer(
f'{ELF_WITH_TOKENIZER_SECTIONS_PATH}#.*',
min_poll_period_s=0,
pool=InlinePoolExecutor(),
)
self.assertEqual(
len(detok.database), TOKENS_IN_ELF_WITH_TOKENIZER_SECTIONS
)
def test_token_domain_in_path(self, _) -> None:
"""Tests a Path() containing a domain"""
detok = detokenize.AutoUpdatingDetokenizer(
Path(f'{ELF_WITH_TOKENIZER_SECTIONS_PATH}#.*'),
min_poll_period_s=0,
pool=InlinePoolExecutor(),
)
self.assertEqual(
len(detok.database), TOKENS_IN_ELF_WITH_TOKENIZER_SECTIONS
)
def test_token_no_domain_in_str(self, _) -> None:
"""Tests a str without a domain"""
detok = detokenize.AutoUpdatingDetokenizer(
str(ELF_WITH_TOKENIZER_SECTIONS_PATH),
min_poll_period_s=0,
pool=InlinePoolExecutor(),
)
self.assertEqual(len(detok.database), TOKENS_IN_ELF)
def test_token_no_domain_in_path(self, _) -> None:
"""Tests a Path() without a domain"""
detok = detokenize.AutoUpdatingDetokenizer(
ELF_WITH_TOKENIZER_SECTIONS_PATH,
min_poll_period_s=0,
pool=InlinePoolExecutor(),
)
self.assertEqual(len(detok.database), TOKENS_IN_ELF)
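# Default transform for NestedMessageParserTest: shifts each byte up by one,
# e.g. b'$abcd' -> b'%bcde'.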
def _next_char(message: bytes) -> bytes:
return bytes(b + 1 for b in message)
class NestedMessageParserTest(unittest.TestCase):
"""Tests parsing prefixed messages."""
class _Case(NamedTuple):
data: bytes
expected: bytes
title: str
transform: Callable[[bytes], bytes] = _next_char
TRANSFORM_TEST_CASES = (
_Case(b'$abcd', b'%bcde', 'single message'),
_Case(
b'$$WHAT?$abc$WHY? is this $ok $',
b'%%WHAT?%bcd%WHY? is this %ok %',
'message and non-message',
),
_Case(b'$1$', b'%1%', 'empty message'),
_Case(b'$abc$defgh', b'%bcd%efghh', 'sequential message'),
_Case(
b'w$abcx$defygh$$abz',
b'w$ABCx$DEFygh$$ABz',
'interspersed start/end non-message',
bytes.upper,
),
_Case(
b'$abcx$defygh$$ab',
b'$ABCx$DEFygh$$AB',
'interspersed start/end message ',
bytes.upper,
),
)
def setUp(self) -> None:
self.decoder = detokenize.NestedMessageParser('$', 'abcdefg')
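# A message starts at the '$' prefix and continues while characters from
# 'abcdefg' follow; any other byte ends the message.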
def test_transform_io(self) -> None:
for data, expected, title, transform in self.TRANSFORM_TEST_CASES:
self.assertEqual(
expected,
b''.join(
self.decoder.transform_io(io.BytesIO(data), transform)
),
f'{title}: {data!r}',
)
def test_transform_bytes_with_flush(self) -> None:
for data, expected, title, transform in self.TRANSFORM_TEST_CASES:
self.assertEqual(
expected,
self.decoder.transform(data, transform, flush=True),
f'{title}: {data!r}',
)
def test_transform_bytes_sequential(self) -> None:
transform = lambda message: message.upper().replace(b'$', b'*')
self.assertEqual(self.decoder.transform(b'abc$abcd', transform), b'abc')
self.assertEqual(self.decoder.transform(b'$', transform), b'*ABCD')
self.assertEqual(self.decoder.transform(b'$b', transform), b'*')
self.assertEqual(self.decoder.transform(b'', transform), b'')
self.assertEqual(self.decoder.transform(b' ', transform), b'*B ')
self.assertEqual(self.decoder.transform(b'hello', transform), b'hello')
self.assertEqual(self.decoder.transform(b'?? $ab', transform), b'?? ')
self.assertEqual(
self.decoder.transform(b'123$ab4$56$a', transform), b'*AB123*AB4*56'
)
self.assertEqual(
self.decoder.transform(b'bc', transform, flush=True), b'*ABC'
)
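# Each entry is an input chunk followed by the (is_message, piece) pairs that
# read_messages() should yield for it; parser state carries across chunks.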
MESSAGES_TEST: Any = (
(b'123$abc456$a', (False, b'123'), (True, b'$abc'), (False, b'456')),
(b'7$abcd', (True, b'$a'), (False, b'7')),
(b'e',),
(b'',),
(b'$', (True, b'$abcde')),
(b'$', (True, b'$')),
(b'$a$b$c', (True, b'$'), (True, b'$a'), (True, b'$b')),
(b'1', (True, b'$c'), (False, b'1')),
(b'',),
(b'?', (False, b'?')),
(b'!@', (False, b'!@')),
(b'%^&', (False, b'%^&')),
)
def test_read_messages(self) -> None:
for step in self.MESSAGES_TEST:
data: bytes = step[0]
pieces: tuple[tuple[bool, bytes], ...] = step[1:]
self.assertEqual(tuple(self.decoder.read_messages(data)), pieces)
def test_read_messages_flush(self) -> None:
self.assertEqual(
list(self.decoder.read_messages(b'123$a')), [(False, b'123')]
)
self.assertEqual(list(self.decoder.read_messages(b'b')), [])
self.assertEqual(
list(self.decoder.read_messages(b'', flush=True)), [(True, b'$ab')]
)
def test_read_messages_io(self) -> None:
# Rework the read_messages test data for stream input.
data = io.BytesIO(b''.join(step[0] for step in self.MESSAGES_TEST))
expected_pieces = sum((step[1:] for step in self.MESSAGES_TEST), ())
result = self.decoder.read_messages_io(data)
for expected_is_message, expected_data in expected_pieces:
if expected_is_message:
is_message, piece = next(result)
self.assertTrue(is_message)
self.assertEqual(expected_data, piece)
else: # the IO version yields non-messages byte by byte
for byte in expected_data:
is_message, piece = next(result)
self.assertFalse(is_message)
self.assertEqual(bytes([byte]), piece)
class DetokenizeNested(unittest.TestCase):
"""Tests detokenizing nested tokens"""
def test_nested_hashed_arg(self):
detok = detokenize.Detokenizer(
tokens.Database(
[
tokens.TokenizedStringEntry(0xA, 'tokenized argument'),
tokens.TokenizedStringEntry(
2,
'This is a ' + '$#%08x',
),
]
)
)
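# The varint argument 0x14 zigzag-decodes to 10 (0xA), so '$#%08x' expands to
# '$#0000000a', which detokenizes to 'tokenized argument'.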
self.assertEqual(
str(detok.detokenize(b'\x02\0\0\0\x14')),
'This is a tokenized argument',
)
def test_nested_base64_arg(self):
detok = detokenize.Detokenizer(
tokens.Database(
[
tokens.TokenizedStringEntry(1, 'base64 argument'),
tokens.TokenizedStringEntry(2, 'This is a %s'),
]
)
)
self.assertEqual(
str(detok.detokenize(b'\x02\0\0\0\x09$AQAAAA==')), # token for 1
'This is a base64 argument',
)
def test_deeply_nested_arg(self):
detok = detokenize.Detokenizer(
tokens.Database(
[
tokens.TokenizedStringEntry(1, '$10#0000000005'),
tokens.TokenizedStringEntry(2, 'This is a $#%08x'),
tokens.TokenizedStringEntry(3, 'deeply nested argument'),
tokens.TokenizedStringEntry(4, '$AQAAAA=='),
tokens.TokenizedStringEntry(5, '$AwAAAA=='),
]
)
)
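# 0x08 zigzag-decodes to 4: '$#00000004' -> token 4 -> '$AQAAAA==' (token 1)
# -> '$10#0000000005' (token 5) -> '$AwAAAA==' (token 3) -> the final string.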
self.assertEqual(
str(detok.detokenize(b'\x02\0\0\0\x08')), # token for 4
'This is a deeply nested argument',
)
class DetokenizeBase64(unittest.TestCase):
"""Tests detokenizing Base64 messages."""
JELLO = b'$' + base64.b64encode(JELLO_WORLD_TOKEN)
RECURSION_STRING = f'The secret message is "{JELLO.decode()}"'
RECURSION = b'$' + base64.b64encode(
struct.pack('<I', tokens.c_hash(RECURSION_STRING))
)
RECURSION_STRING_2 = f"'{RECURSION.decode()}', said the spy."
RECURSION_2 = b'$' + base64.b64encode(
struct.pack('<I', tokens.c_hash(RECURSION_STRING_2))
)
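# (input, expected output) pairs exercised by each Base64 detokenization test
# below.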
TEST_CASES = (
(b'', b''),
(b'nothing here', b'nothing here'),
(JELLO, b'Jello, world!'),
(JELLO + b'a', b'Jello, world!a'),
(JELLO + b'abc', b'Jello, world!abc'),
(JELLO + b'abc=', b'Jello, world!abc='),
(b'$a' + JELLO + b'a', b'$aJello, world!a'),
(b'Hello ' + JELLO + b'?', b'Hello Jello, world!?'),
(b'$' + JELLO, b'$Jello, world!'),
(JELLO + JELLO, b'Jello, world!Jello, world!'),
(JELLO + b'$' + JELLO, b'Jello, world!$Jello, world!'),
(JELLO + b'$a' + JELLO + b'bcd', b'Jello, world!$aJello, world!bcd'),
(b'$3141', b'$3141'),
(JELLO + b'$3141', b'Jello, world!$3141'),
(
JELLO + b'$a' + JELLO + b'b' + JELLO + b'c',
b'Jello, world!$aJello, world!bJello, world!c',
),
(RECURSION, b'The secret message is "Jello, world!"'),
(
RECURSION_2,
b'\'The secret message is "Jello, world!"\', said the spy.',
),
)
def setUp(self):
super().setUp()
db = database.load_token_database(
io.BytesIO(ELF_WITH_TOKENIZER_SECTIONS)
)
db.add(
tokens.TokenizedStringEntry(tokens.c_hash(s), s)
for s in [self.RECURSION_STRING, self.RECURSION_STRING_2]
)
self.detok = detokenize.Detokenizer(db)
def test_detokenize_base64_live(self):
for data, expected in self.TEST_CASES:
output = io.BytesIO()
self.detok.detokenize_base64_live(io.BytesIO(data), output, '$')
self.assertEqual(expected, output.getvalue(), f'Input: {data!r}')
def test_detokenize_base64_to_file(self):
for data, expected in self.TEST_CASES:
output = io.BytesIO()
self.detok.detokenize_base64_to_file(data, output, '$')
self.assertEqual(expected, output.getvalue())
def test_detokenize_base64(self):
for data, expected in self.TEST_CASES:
self.assertEqual(expected, self.detok.detokenize_base64(data, b'$'))
def test_detokenize_base64_str(self):
for data, expected in self.TEST_CASES:
self.assertEqual(
expected.decode(), self.detok.detokenize_base64(data.decode())
)
class DetokenizeInfiniteRecursion(unittest.TestCase):
"""Tests that infinite Base64 token recursion resolves."""
def setUp(self):
super().setUp()
self.detok = detokenize.Detokenizer(
tokens.Database(
[
tokens.TokenizedStringEntry(0, '$AAAAAA=='), # token for 0
tokens.TokenizedStringEntry(1, '$AgAAAA=='), # token for 2
tokens.TokenizedStringEntry(2, '$#00000003'), # token for 3
tokens.TokenizedStringEntry(3, '$AgAAAA=='), # token for 2
]
)
)
def test_detokenize_self_recursion(self):
for depth in range(5):
self.assertEqual(
self.detok.detokenize_text(
b'This one is deep: $AAAAAA==', recursion=depth
),
b'This one is deep: $AAAAAA==',
)
def test_detokenize_self_recursion_default(self):
self.assertEqual(
self.detok.detokenize_text(
b'This one is deep: $AAAAAA==',
),
b'This one is deep: $AAAAAA==',
)
def test_detokenize_cyclic_recursion_even(self):
self.assertEqual(
self.detok.detokenize_text(b'I said "$AQAAAA=="', recursion=6),
b'I said "$AgAAAA=="',
)
def test_detokenize_cyclic_recursion_odd(self):
self.assertEqual(
self.detok.detokenize_text(b'I said "$AQAAAA=="', recursion=7),
b'I said "$#00000003"',
)
class DetokenizeBase64InfiniteRecursion(unittest.TestCase):
"""Tests that infinite Bas64 token recursion resolves."""
def setUp(self):
super().setUp()
self.detok = detokenize.Detokenizer(
tokens.Database(
[
tokens.TokenizedStringEntry(0, '$AAAAAA=='), # token for 0
tokens.TokenizedStringEntry(1, '$AgAAAA=='), # token for 2
tokens.TokenizedStringEntry(2, '$AwAAAA=='), # token for 3
tokens.TokenizedStringEntry(3, '$AgAAAA=='), # token for 2
]
)
)
def test_detokenize_self_recursion(self):
for depth in range(5):
self.assertEqual(
self.detok.detokenize_base64(
b'This one is deep: $AAAAAA==', recursion=depth
),
b'This one is deep: $AAAAAA==',
)
def test_detokenize_self_recursion_default(self):
self.assertEqual(
self.detok.detokenize_base64(b'This one is deep: $64#AAAAAA=='),
b'This one is deep: $AAAAAA==',
)
def test_detokenize_cyclic_recursion_even(self):
self.assertEqual(
self.detok.detokenize_base64(b'I said "$AQAAAA=="', recursion=2),
b'I said "$AgAAAA=="',
)
def test_detokenize_cyclic_recursion_odd(self):
self.assertEqual(
self.detok.detokenize_base64(b'I said "$AQAAAA=="', recursion=3),
b'I said "$AwAAAA=="',
)
if __name__ == '__main__':
unittest.main()