#!/usr/bin/env python3
# Copyright 2022 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Cross-language test of pw_transfer.
Usage:
bazel run pw_transfer/integration_test:cross_language_integration_test
Command-line arguments must be provided after a double-dash:
bazel run pw_transfer/integration_test:cross_language_integration_test -- \
--server-port 3304
Which tests to run can be specified as command-line arguments:
bazel run pw_transfer/integration_test:cross_language_integration_test -- \
PwTransferIntegrationTest.test_small_client_write_1_java
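
The binaries under test default to the ones produced by the Bazel build;
flags such as --cpp-client or --server (see --help for the full list)
override them, e.g.:

  bazel run pw_transfer/integration_test:cross_language_integration_test -- \
      --cpp-client /path/to/cpp_client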
"""
import argparse
import asyncio
import logging
import pathlib
import random
import sys
import tempfile
from typing import List, NamedTuple
import unittest

from google.protobuf import text_format
from parameterized import parameterized

from pigweed.pw_transfer.integration_test import config_pb2
from rules_python.python.runfiles import runfiles

_LOG = logging.getLogger('pw_transfer_integration_test')
_LOG.setLevel(logging.DEBUG)
_LOG.addHandler(logging.StreamHandler(sys.stdout))


class LogMonitor:
"""Monitors lines read from the reader, and logs them."""
    class Error(Exception):
        """Raised if wait_for_line reaches EOF before expected line."""
def __init__(self, prefix: str, reader: asyncio.StreamReader):
"""Initializer.
Args:
prefix: Prepended to read lines before they are logged.
reader: StreamReader to read lines from.
"""
self._prefix = prefix
self._reader = reader
# Queue of messages waiting to be monitored.
self._queue = asyncio.Queue()
# Relog any messages read from the reader, and enqueue them for
# monitoring.
self._relog_and_enqueue_task = asyncio.create_task(
self._relog_and_enqueue())
async def wait_for_line(self, msg: str):
"""Wait for a line containing msg to be read from the reader."""
while True:
line = await self._queue.get()
if not line:
raise LogMonitor.Error(
f"Reached EOF before getting line matching {msg}")
if msg in line.decode():
return
async def wait_for_eof(self):
"""Wait for the reader to reach EOF, relogging any lines read."""
# Drain the queue, since we're not monitoring it any more.
drain_queue = asyncio.create_task(self._drain_queue())
await asyncio.gather(drain_queue, self._relog_and_enqueue_task)
async def _relog_and_enqueue(self):
"""Reads lines from the reader, logs them, and puts them in queue."""
while True:
line = await self._reader.readline()
await self._queue.put(line)
if line:
_LOG.info(f"{self._prefix} {line.decode().rstrip()}")
else:
# EOF. Note, we still put the EOF in the queue, so that the
# queue reader can process it appropriately.
return
async def _drain_queue(self):
while True:
line = await self._queue.get()
if not line:
# EOF.
return
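

# A minimal usage sketch of LogMonitor (MonitoredSubprocess.create below is
# the real call site); `cmd` here is a placeholder command line:
#
#   process = await asyncio.create_subprocess_exec(
#       *cmd, stdout=asyncio.subprocess.PIPE)
#   monitor = LogMonitor("OUT:", process.stdout)
#   await monitor.wait_for_line("expected message")
#   await monitor.wait_for_eof()
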
class MonitoredSubprocess:
"""A subprocess with monitored asynchronous communication."""
@staticmethod
async def create(cmd: List[str], prefix: str, stdinput: bytes):
"""Starts the subprocess and writes stdinput to stdin.
This method returns once stdinput has been written to stdin. The
MonitoredSubprocess continues to log the process's stderr and stdout
(with the prefix) until it terminates.
Args:
cmd: Command line to execute.
prefix: Prepended to process logs.
stdinput: Written to stdin on process startup.
"""
self = MonitoredSubprocess()
self._process = await asyncio.create_subprocess_exec(
*cmd,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE)
self._stderr_monitor = LogMonitor(f"{prefix} ERR:",
self._process.stderr)
self._stdout_monitor = LogMonitor(f"{prefix} OUT:",
self._process.stdout)
self._process.stdin.write(stdinput)
await self._process.stdin.drain()
self._process.stdin.close()
await self._process.stdin.wait_closed()
return self
async def wait_for_line(self, stream: str, msg: str, timeout: float):
"""Wait for a line containing msg to be read on the stream."""
if stream == "stdout":
monitor = self._stdout_monitor
elif stream == "stderr":
monitor = self._stderr_monitor
else:
            raise ValueError(
                f"Stream must be 'stdout' or 'stderr', got {stream}")
await asyncio.wait_for(monitor.wait_for_line(msg), timeout)
def returncode(self):
return self._process.returncode
def terminate(self):
"""Terminate the process."""
self._process.terminate()
async def wait_for_termination(self, timeout: float):
"""Wait for the process to terminate."""
await asyncio.wait_for(
asyncio.gather(self._process.wait(),
self._stdout_monitor.wait_for_eof(),
self._stderr_monitor.wait_for_eof()), timeout)
async def terminate_and_wait(self, timeout: float):
"""Terminate the process and wait for it to exit."""
if self.returncode() is not None:
# Process already terminated
return
self.terminate()
await self.wait_for_termination(timeout)
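

# A minimal sketch of driving a MonitoredSubprocess (the _start_* helpers and
# _perform_write below are the real call sites); the binary path, port, and
# config bytes are placeholders:
#
#   server = await MonitoredSubprocess.create(
#       ["/path/to/server", "3300"], "SERVER", b"textproto config")
#   await server.wait_for_line("stderr", "Starting pw_rpc server on port", 5)
#   await server.terminate_and_wait(5)
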
class _TransferConfig(NamedTuple):
"""A simple tuple to collect configs for test binaries."""
server: config_pb2.ServerConfig
client: config_pb2.ClientConfig
proxy: config_pb2.ProxyConfig


# TODO(b/232805936): Extend tests to use different resource IDs and do multiple
# reads/writes.
class PwTransferIntegrationTest(unittest.TestCase):
    # Prefix for log messages coming from the harness (as opposed to the
    # server, client, or proxy processes). Padded so that the length matches
    # "SERVER OUT:".
    _PREFIX = "HARNESS:   "
SERVER_PORT = 3300
CLIENT_PORT = 3301
JAVA_CLIENT_BINARY = None
CPP_CLIENT_BINARY = None
PYTHON_CLIENT_BINARY = None
PROXY_BINARY = None
SERVER_BINARY = None
@classmethod
def setUpClass(cls):
# TODO(tpudlik): This is Bazel-only. Support gn, too.
r = runfiles.Create()
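        # runfiles.Create() gives access to the Bazel runfiles tree, and
        # Rlocation() maps a workspace-relative runfiles path to an absolute
        # filesystem path for each built binary.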
# For each binary used by this test, get the binaries produced by the
# build system if an override has not been specified.
if cls.JAVA_CLIENT_BINARY is None:
cls.JAVA_CLIENT_BINARY = r.Rlocation(
"pigweed/pw_transfer/integration_test/java_client")
if cls.CPP_CLIENT_BINARY is None:
cls.CPP_CLIENT_BINARY = r.Rlocation(
"pigweed/pw_transfer/integration_test/cpp_client")
if cls.PYTHON_CLIENT_BINARY is None:
cls.PYTHON_CLIENT_BINARY = r.Rlocation(
"pigweed/pw_transfer/integration_test/python_client")
if cls.PROXY_BINARY is None:
cls.PROXY_BINARY = r.Rlocation(
"pigweed/pw_transfer/integration_test/proxy")
if cls.SERVER_BINARY is None:
cls.SERVER_BINARY = r.Rlocation(
"pigweed/pw_transfer/integration_test/server")
cls._CLIENT_BINARY = {
"cpp": cls.CPP_CLIENT_BINARY,
"java": cls.JAVA_CLIENT_BINARY,
"python": cls.PYTHON_CLIENT_BINARY,
}
async def _start_client(self, client_type: str,
config: config_pb2.ClientConfig):
_LOG.info(f"{self._PREFIX} Starting client with config\n{config}")
self._client = await MonitoredSubprocess.create(
[self._CLIENT_BINARY[client_type],
str(self.CLIENT_PORT)], "CLIENT",
str(config).encode('ascii'))
async def _start_server(self, config: config_pb2.ServerConfig):
_LOG.info(f"{self._PREFIX} Starting server with config\n{config}")
self._server = await MonitoredSubprocess.create(
[self.SERVER_BINARY, str(self.SERVER_PORT)], "SERVER",
str(config).encode('ascii'))
async def _start_proxy(self, config: config_pb2.ProxyConfig):
_LOG.info(f"{self._PREFIX} Starting proxy with config\n{config}")
self._proxy = await MonitoredSubprocess.create(
[
self.PROXY_BINARY, "--server-port",
str(self.SERVER_PORT), "--client-port",
str(self.CLIENT_PORT)
],
# Extra space in "PROXY " so that it lines up with "SERVER".
"PROXY ",
str(config).encode('ascii'))
async def _perform_write(self, server_config: config_pb2.ServerConfig,
client_type: str,
client_config: config_pb2.ClientConfig,
proxy_config: config_pb2.ProxyConfig) -> None:
"""Performs a pw_transfer write.
Args:
server_config: Server configuration.
client_type: Either "cpp", "java", or "python".
client_config: Client configuration.
proxy_config: Proxy configuration.
"""
# Timeout for components (server, proxy) to come up or shut down after
# write is finished or a signal is sent. Approximately arbitrary. Should
# not be too long so that we catch bugs in the server that prevent it
# from shutting down.
TIMEOUT = 5 # seconds
try:
await self._start_proxy(proxy_config)
await self._proxy.wait_for_line("stderr",
"Listening for client connection",
TIMEOUT)
await self._start_server(server_config)
await self._server.wait_for_line("stderr",
"Starting pw_rpc server on port",
TIMEOUT)
await self._start_client(client_type, client_config)
# No timeout: the client will only exit once the transfer
# completes, and this can take a long time for large payloads.
await self._client.wait_for_termination(None)
self.assertEqual(self._client.returncode(), 0)
# Wait for the server to exit.
await self._server.wait_for_termination(TIMEOUT)
self.assertEqual(self._server.returncode(), 0)
        finally:
            # Use getattr: these attributes may not exist if startup failed
            # before the corresponding process was launched.
            # Stop the server, if still running. (Only expected if the
            # wait_for above timed out.)
            if getattr(self, '_server', None):
                await self._server.terminate_and_wait(TIMEOUT)
            # Stop the proxy. Unlike the server, we expect it to still be
            # running at this stage.
            if getattr(self, '_proxy', None):
                await self._proxy.terminate_and_wait(TIMEOUT)
@staticmethod
def _default_config() -> _TransferConfig:
"""Returns a new transfer config with default options."""
return _TransferConfig(
config_pb2.ServerConfig(
chunk_size_bytes=216,
pending_bytes=32 * 1024,
chunk_timeout_seconds=5,
transfer_service_retries=4,
extend_window_divisor=32,
),
config_pb2.ClientConfig(
max_retries=5,
initial_chunk_timeout_ms=4000,
chunk_timeout_ms=4000,
),
text_format.Parse(
"""
client_filter_stack: [
{ hdlc_packetizer: {} },
{ data_dropper: {rate: 0.01, seed: 1649963713563718435} }
]
server_filter_stack: [
{ hdlc_packetizer: {} },
{ data_dropper: {rate: 0.01, seed: 1649963713563718436} }
]""", config_pb2.ProxyConfig()))
def _do_single_write(self, client_type: str, config: _TransferConfig,
resource_id: int, data: bytes) -> None:
"""Performs a single client-to-server write of the provided data."""
with tempfile.NamedTemporaryFile(
) as f_payload, tempfile.NamedTemporaryFile() as f_server_output:
config.server.resources[resource_id].destination_paths.append(
f_server_output.name)
config.client.transfer_actions.append(
config_pb2.TransferAction(
resource_id=resource_id,
file_path=f_payload.name,
transfer_type=config_pb2.TransferAction.TransferType.
WRITE_TO_SERVER))
f_payload.write(data)
f_payload.flush() # Ensure contents are there to read!
asyncio.run(
self._perform_write(config.server, client_type, config.client,
config.proxy))
self.assertEqual(f_server_output.read(), data)
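
    # For reference, a single WRITE_TO_SERVER action appended by
    # _do_single_write renders roughly like this when the ClientConfig is
    # serialized to text for the client's stdin (file_path is a tempfile):
    #
    #   transfer_actions {
    #     resource_id: 5
    #     file_path: "/tmp/..."
    #     transfer_type: WRITE_TO_SERVER
    #   }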
@parameterized.expand([
("cpp"),
("java"),
("python"),
])
def test_null_byte_client_write(self, client_type):
payload = b"\0"
config = self._default_config()
resource_id = 5
self._do_single_write(client_type, config, resource_id, payload)
@parameterized.expand([
("cpp"),
("java"),
("python"),
])
def test_single_byte_client_write(self, client_type):
payload = b"?"
config = self._default_config()
resource_id = 5
self._do_single_write(client_type, config, resource_id, payload)
@parameterized.expand([
("cpp"),
("java"),
("python"),
])
def test_small_client_write(self, client_type):
payload = b"some data"
config = self._default_config()
resource_id = 5
self._do_single_write(client_type, config, resource_id, payload)
@parameterized.expand([
("cpp"),
("java"),
("python"),
])
def test_medium_client_write(self, client_type):
payload = random.Random(67336391945).randbytes(512)
config = self._default_config()
resource_id = 5
self._do_single_write(client_type, config, resource_id, payload)
@parameterized.expand([
("cpp"),
("java"),
("python"),
])
def test_large_hdlc_escape_client_write(self, client_type):
payload = b"~" * 98731
config = self._default_config()
resource_id = 5
self._do_single_write(client_type, config, resource_id, payload)
@parameterized.expand([
("cpp"),
("java"),
("python"),
])
def test_3mb_write_dropped_data(self, client_type):
server_config = config_pb2.ServerConfig(
chunk_size_bytes=216,
pending_bytes=32 * 1024,
chunk_timeout_seconds=5,
transfer_service_retries=4,
extend_window_divisor=32,
)
client_config = config_pb2.ClientConfig(
max_retries=5,
initial_chunk_timeout_ms=10000,
chunk_timeout_ms=4000,
)
proxy_config = text_format.Parse(
"""
client_filter_stack: [
{ rate_limiter: {rate: 50000} },
{ hdlc_packetizer: {} },
{ data_dropper: {rate: 0.01, seed: 1649963713563718435} }
]
server_filter_stack: [
{ rate_limiter: {rate: 50000} },
{ hdlc_packetizer: {} },
{ data_dropper: {rate: 0.01, seed: 1649963713563718436} }
]""", config_pb2.ProxyConfig())
payload = random.Random(1649963713563718437).randbytes(3 * 1024 * 1024)
resource_id = 12
config = _TransferConfig(server_config, client_config, proxy_config)
self._do_single_write(client_type, config, resource_id, payload)
@parameterized.expand([
("cpp"),
("java"),
("python"),
])
def test_3mb_write_reordered_data(self, client_type):
server_config = config_pb2.ServerConfig(
chunk_size_bytes=216,
pending_bytes=32 * 1024,
chunk_timeout_seconds=5,
transfer_service_retries=4,
extend_window_divisor=32,
)
client_config = config_pb2.ClientConfig(
max_retries=5,
initial_chunk_timeout_ms=10000,
chunk_timeout_ms=4000,
)
proxy_config = text_format.Parse(
"""
client_filter_stack: [
{ rate_limiter: {rate: 50000} },
{ hdlc_packetizer: {} },
{ data_transposer: {rate: 0.005, timeout: 0.5, seed: 1649963713563718435} }
]
server_filter_stack: [
{ rate_limiter: {rate: 50000} },
{ hdlc_packetizer: {} },
{ data_transposer: {rate: 0.005, timeout: 0.5, seed: 1649963713563718435} }
]""", config_pb2.ProxyConfig())
payload = random.Random(1649963713563718437).randbytes(3 * 1024 * 1024)
resource_id = 12
config = _TransferConfig(server_config, client_config, proxy_config)
self._do_single_write(client_type, config, resource_id, payload)


if __name__ == '__main__':
parser = argparse.ArgumentParser()
    parser.add_argument(
        '--server-port',
        type=int,
        help='Port of the integration test server. The proxy will forward '
        'connections to this port.',
    )
    parser.add_argument(
        '--client-port',
        type=int,
        help='Port on which to listen for connections from the integration '
        'test client.',
    )
parser.add_argument(
'--java-client',
type=pathlib.Path,
default=None,
help='Path to the Java transfer client to use in tests',
)
parser.add_argument(
'--cpp-client',
type=pathlib.Path,
default=None,
help='Path to the C++ transfer client to use in tests',
)
parser.add_argument(
'--python-client',
type=pathlib.Path,
default=None,
help='Path to the Python transfer client to use in tests',
)
parser.add_argument(
'--server',
type=pathlib.Path,
default=None,
help='Path to the transfer server to use in tests',
)
parser.add_argument(
'--proxy',
type=pathlib.Path,
default=None,
help=('Path to the proxy binary to use in tests to allow interception '
'of client/server data'),
)
(args, passthrough_args) = parser.parse_known_args()
if args.server_port:
PwTransferIntegrationTest.SERVER_PORT = args.server_port
if args.client_port:
PwTransferIntegrationTest.CLIENT_PORT = args.client_port
PwTransferIntegrationTest.JAVA_CLIENT_BINARY = args.java_client
PwTransferIntegrationTest.CPP_CLIENT_BINARY = args.cpp_client
PwTransferIntegrationTest.PYTHON_CLIENT_BINARY = args.python_client
PwTransferIntegrationTest.SERVER_BINARY = args.server
PwTransferIntegrationTest.PROXY_BINARY = args.proxy
unittest_args = [sys.argv[0]] + passthrough_args
unittest.main(argv=unittest_args)