| #!/usr/bin/env python3 |
| # Copyright 2021 The Pigweed Authors |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| # use this file except in compliance with the License. You may obtain a copy of |
| # the License at |
| # |
| # https://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations under |
| # the License. |
| """Tests using the callback client for pw_rpc.""" |
| |
| import contextlib |
| import unittest |
| |
| import pw_hdlc.rpc |
| from pw_rpc import benchmark_pb2, testing |
| from pw_status import Status |
| |
| ITERATIONS = 50 |
| |
| |
| class RpcIntegrationTest(unittest.TestCase): |
| """Calls RPCs on an RPC server through a socket.""" |
| |
| test_server_command: tuple[str, ...] = () |
| port: int |
| |
| def setUp(self) -> None: |
| self._context = pw_hdlc.rpc.HdlcRpcLocalServerAndClient( |
| self.test_server_command, self.port, [benchmark_pb2] |
| ) |
| self.rpcs = self._context.client.channel(1).rpcs |
| |
| def tearDown(self) -> None: |
| self._context.close() |
| |
| def test_unary(self) -> None: |
| for i in range(ITERATIONS): |
| payload = f'O_o #{i}'.encode() |
| status, reply = self.rpcs.pw.rpc.Benchmark.UnaryEcho( |
| payload=payload |
| ) |
| self.assertIs(status, Status.OK) |
| self.assertEqual(reply.payload, payload) |
| |
| def test_bidirectional(self) -> None: |
| with self.rpcs.pw.rpc.Benchmark.BidirectionalEcho.invoke() as call: |
| responses = call.get_responses() |
| |
| for i in range(ITERATIONS): |
| payload = f'O_o #{i}'.encode() |
| call.send(benchmark_pb2.Payload(payload=payload)) |
| |
| self.assertEqual(next(responses).payload, payload) |
| |
| def test_bidirectional_call_twice(self) -> None: |
| rpc = self.rpcs.pw.rpc.Benchmark.BidirectionalEcho |
| |
| for _ in range(ITERATIONS): |
| with contextlib.ExitStack() as stack: |
| first_call = stack.enter_context(rpc.invoke()) |
| first_call_responses = first_call.get_responses() |
| first_call.send(payload=b'abc') |
| self.assertEqual( |
| next(first_call_responses), rpc.response(payload=b'abc') |
| ) |
| self.assertFalse(first_call.completed()) |
| |
| second_call = stack.enter_context(rpc.invoke()) |
| second_call_responses = second_call.get_responses() |
| second_call.send(payload=b'123') |
| self.assertEqual( |
| next(second_call_responses), rpc.response(payload=b'123') |
| ) |
| self.assertFalse(second_call.completed()) |
| |
| # Check that issuing `second_call` did not cancel `first call`. |
| self.assertFalse(first_call.completed()) |
| self.assertIs(first_call.error, None) |
| |
| # Send to `first_call` again and check for a response. |
| first_call.send(payload=b'def') |
| self.assertEqual( |
| next(first_call_responses), rpc.response(payload=b'def') |
| ) |
| |
| |
| def _main( |
| test_server_command: list[str], port: int, unittest_args: list[str] |
| ) -> None: |
| RpcIntegrationTest.test_server_command = tuple(test_server_command) |
| RpcIntegrationTest.port = port |
| unittest.main(argv=unittest_args) |
| |
| |
| if __name__ == '__main__': |
| _main(**vars(testing.parse_test_server_args())) |