| # Copyright 2016 The Brotli Authors. All rights reserved. |
| # |
| # Distributed under MIT license. |
| # See file LICENSE for detail or copy at https://opensource.org/licenses/MIT |
| |
| import functools |
| import os |
| import unittest |
| |
| from . import _test_utils |
| import brotli |
| |
| |
| def _get_original_name(test_data): |
| return test_data.split('.compressed')[0] |
| |
| |
| class TestDecompressor(_test_utils.TestCase): |
| |
| CHUNK_SIZE = 1 |
| |
| def setUp(self): |
| self.decompressor = brotli.Decompressor() |
| |
| def tearDown(self): |
| self.decompressor = None |
| |
| def _check_decompression(self, test_data): |
| # Verify decompression matches the original. |
| temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data) |
| original = _get_original_name(test_data) |
| self.assertFilesMatch(temp_uncompressed, original) |
| |
| def _decompress(self, test_data): |
| temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data) |
| with open(temp_uncompressed, 'wb') as out_file: |
| with open(test_data, 'rb') as in_file: |
| read_chunk = functools.partial(in_file.read, self.CHUNK_SIZE) |
| for data in iter(read_chunk, b''): |
| out_file.write(self.decompressor.process(data)) |
| self.assertTrue(self.decompressor.is_finished()) |
| |
| def _decompress_with_limit(self, test_data, max_output_length): |
| temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data) |
| with open(temp_uncompressed, 'wb') as out_file: |
| with open(test_data, 'rb') as in_file: |
| chunk_iter = iter(functools.partial(in_file.read, 10 * 1024), b'') |
| while not self.decompressor.is_finished(): |
| data = b'' |
| if self.decompressor.can_accept_more_data(): |
| data = next(chunk_iter, b'') |
| decompressed_data = self.decompressor.process(data, max_output_length=max_output_length) |
| self.assertTrue(len(decompressed_data) <= max_output_length) |
| out_file.write(decompressed_data) |
| self.assertTrue(next(chunk_iter, None) == None) |
| |
| def _test_decompress(self, test_data): |
| self._decompress(test_data) |
| self._check_decompression(test_data) |
| |
| def _test_decompress_with_limit(self, test_data): |
| self._decompress_with_limit(test_data, max_output_length=20) |
| self._check_decompression(test_data) |
| |
| def test_too_much_input(self): |
| with open(os.path.join(_test_utils.TESTDATA_DIR, "zerosukkanooa.compressed"), 'rb') as in_file: |
| compressed = in_file.read() |
| self.decompressor.process(compressed[:-1], max_output_length=1) |
| # the following assertion checks whether the test setup is correct |
| self.assertTrue(not self.decompressor.can_accept_more_data()) |
| with self.assertRaises(brotli.error): |
| self.decompressor.process(compressed[-1:]) |
| |
| def test_changing_limit(self): |
| test_data = os.path.join(_test_utils.TESTDATA_DIR, "zerosukkanooa.compressed") |
| temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data) |
| with open(temp_uncompressed, 'wb') as out_file: |
| with open(test_data, 'rb') as in_file: |
| compressed = in_file.read() |
| uncompressed = self.decompressor.process(compressed[:-1], max_output_length=1) |
| self.assertTrue(len(uncompressed) <= 1) |
| out_file.write(uncompressed) |
| while not self.decompressor.can_accept_more_data(): |
| out_file.write(self.decompressor.process(b'')) |
| out_file.write(self.decompressor.process(compressed[-1:])) |
| self._check_decompression(test_data) |
| |
| def test_garbage_appended(self): |
| with self.assertRaises(brotli.error): |
| self.decompressor.process(brotli.compress(b'a') + b'a') |
| |
| def test_already_finished(self): |
| self.decompressor.process(brotli.compress(b'a')) |
| with self.assertRaises(brotli.error): |
| self.decompressor.process(b'a') |
| |
| |
| _test_utils.generate_test_methods(TestDecompressor, for_decompression=True) |
| |
| if __name__ == '__main__': |
| unittest.main() |