| #!/usr/bin/env python3 |
| # Copyright 2022 The Pigweed Authors |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| # use this file except in compliance with the License. You may obtain a copy of |
| # the License at |
| # |
| # https://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations under |
| # the License. |
| """Unit test for proxy.py""" |
| |
| import abc |
| import asyncio |
| from struct import pack |
| import time |
| from typing import List |
| import unittest |
| |
| from pigweed.pw_rpc.internal import packet_pb2 |
| from pigweed.pw_transfer import transfer_pb2 |
| from pw_hdlc import encode |
| from pw_transfer import ProtocolVersion |
| from pw_transfer.chunk import Chunk |
| |
| import proxy |
| |
| |
| class MockRng(abc.ABC): |
| def __init__(self, results: List[float]): |
| self._results = results |
| |
| def uniform(self, from_val: float, to_val: float) -> float: |
| val_range = to_val - from_val |
| val = self._results.pop() |
| val *= val_range |
| val += from_val |
| return val |
| |
| |
| class ProxyTest(unittest.IsolatedAsyncioTestCase): |
| async def test_transposer_simple(self): |
| sent_packets: List[bytes] = [] |
| |
| # Async helper so DataTransposer can await on it. |
| async def append(list: List[bytes], data: bytes): |
| list.append(data) |
| |
| transposer = proxy.DataTransposer( |
| lambda data: append(sent_packets, data), |
| name="test", |
| rate=0.5, |
| timeout=100, |
| seed=1234567890, |
| ) |
| transposer._rng = MockRng([0.6, 0.4]) |
| await transposer.process(b'aaaaaaaaaa') |
| await transposer.process(b'bbbbbbbbbb') |
| |
| # Give the transposer task time to process the data. |
| await asyncio.sleep(0.05) |
| |
| self.assertEqual(sent_packets, [b'bbbbbbbbbb', b'aaaaaaaaaa']) |
| |
| async def test_transposer_timeout(self): |
| sent_packets: List[bytes] = [] |
| |
| # Async helper so DataTransposer can await on it. |
| async def append(list: List[bytes], data: bytes): |
| list.append(data) |
| |
| transposer = proxy.DataTransposer( |
| lambda data: append(sent_packets, data), |
| name="test", |
| rate=0.5, |
| timeout=0.100, |
| seed=1234567890, |
| ) |
| transposer._rng = MockRng([0.4, 0.6]) |
| await transposer.process(b'aaaaaaaaaa') |
| |
| # Even though this should be transposed, there is no following data so |
| # the transposer should timout and send this in-order. |
| await transposer.process(b'bbbbbbbbbb') |
| |
| # Give the transposer time to timeout. |
| await asyncio.sleep(0.5) |
| |
| self.assertEqual(sent_packets, [b'aaaaaaaaaa', b'bbbbbbbbbb']) |
| |
| async def test_server_failure(self): |
| sent_packets: List[bytes] = [] |
| |
| # Async helper so DataTransposer can await on it. |
| async def append(list: List[bytes], data: bytes): |
| list.append(data) |
| |
| packets_before_failure = [1, 2, 3] |
| server_failure = proxy.ServerFailure( |
| lambda data: append(sent_packets, data), |
| name="test", |
| packets_before_failure_list=packets_before_failure.copy(), |
| ) |
| |
| # After passing the list to ServerFailure, add a test for no |
| # packets dropped |
| packets_before_failure.append(5) |
| |
| packets = [ |
| b'1', |
| b'2', |
| b'3', |
| b'4', |
| b'5', |
| ] |
| |
| for num_packets in packets_before_failure: |
| sent_packets.clear() |
| for packet in packets: |
| await server_failure.process(packet) |
| self.assertEqual(len(sent_packets), num_packets) |
| server_failure.handle_event(proxy.Event.TRANSFER_START) |
| |
| async def test_keep_drop_queue_loop(self): |
| sent_packets: List[bytes] = [] |
| |
| # Async helper so DataTransposer can await on it. |
| async def append(list: List[bytes], data: bytes): |
| list.append(data) |
| |
| keep_drop_queue = proxy.KeepDropQueue( |
| lambda data: append(sent_packets, data), |
| name="test", |
| keep_drop_queue=[2, 1, 3], |
| ) |
| |
| expected_sequence = [ |
| b'1', |
| b'2', |
| b'4', |
| b'5', |
| b'6', |
| b'9', |
| ] |
| input_packets = [ |
| b'1', |
| b'2', |
| b'3', |
| b'4', |
| b'5', |
| b'6', |
| b'7', |
| b'8', |
| b'9', |
| ] |
| |
| for packet in input_packets: |
| await keep_drop_queue.process(packet) |
| self.assertEqual(sent_packets, expected_sequence) |
| |
| async def test_keep_drop_queue(self): |
| sent_packets: List[bytes] = [] |
| |
| # Async helper so DataTransposer can await on it. |
| async def append(list: List[bytes], data: bytes): |
| list.append(data) |
| |
| keep_drop_queue = proxy.KeepDropQueue( |
| lambda data: append(sent_packets, data), |
| name="test", |
| keep_drop_queue=[2, 1, 1, -1], |
| ) |
| |
| expected_sequence = [ |
| b'1', |
| b'2', |
| b'4', |
| ] |
| input_packets = [ |
| b'1', |
| b'2', |
| b'3', |
| b'4', |
| b'5', |
| b'6', |
| b'7', |
| b'8', |
| b'9', |
| ] |
| |
| for packet in input_packets: |
| await keep_drop_queue.process(packet) |
| self.assertEqual(sent_packets, expected_sequence) |
| |
| async def test_window_packet_dropper(self): |
| sent_packets: List[bytes] = [] |
| |
| # Async helper so DataTransposer can await on it. |
| async def append(list: List[bytes], data: bytes): |
| list.append(data) |
| |
| window_packet_dropper = proxy.WindowPacketDropper( |
| lambda data: append(sent_packets, data), |
| name="test", |
| window_packet_to_drop=0, |
| ) |
| |
| packets = [ |
| _encode_rpc_frame( |
| Chunk(ProtocolVersion.VERSION_TWO, Chunk.Type.DATA, data=b'1') |
| ), |
| _encode_rpc_frame( |
| Chunk(ProtocolVersion.VERSION_TWO, Chunk.Type.DATA, data=b'2') |
| ), |
| _encode_rpc_frame( |
| Chunk(ProtocolVersion.VERSION_TWO, Chunk.Type.DATA, data=b'3') |
| ), |
| _encode_rpc_frame( |
| Chunk(ProtocolVersion.VERSION_TWO, Chunk.Type.DATA, data=b'4') |
| ), |
| _encode_rpc_frame( |
| Chunk(ProtocolVersion.VERSION_TWO, Chunk.Type.DATA, data=b'5') |
| ), |
| ] |
| |
| expected_packets = packets[1:] |
| |
| # Test each even twice to assure the filter does not have issues |
| # on new window bondaries. |
| events = [ |
| proxy.Event.PARAMETERS_RETRANSMIT, |
| proxy.Event.PARAMETERS_CONTINUE, |
| proxy.Event.PARAMETERS_RETRANSMIT, |
| proxy.Event.PARAMETERS_CONTINUE, |
| ] |
| |
| for event in events: |
| sent_packets.clear() |
| for packet in packets: |
| await window_packet_dropper.process(packet) |
| self.assertEqual(sent_packets, expected_packets) |
| window_packet_dropper.handle_event(event) |
| |
| |
| def _encode_rpc_frame(chunk: Chunk) -> bytes: |
| packet = packet_pb2.RpcPacket( |
| type=packet_pb2.PacketType.SERVER_STREAM, |
| channel_id=101, |
| service_id=1001, |
| method_id=100001, |
| payload=chunk.to_message().SerializeToString(), |
| ).SerializeToString() |
| return encode.ui_frame(73, packet) |
| |
| |
| if __name__ == '__main__': |
| unittest.main() |