blob: fa5873d30780fb6f3e782e427bf978102da2a049 [file] [log] [blame]
# Copyright 2023 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Tests for pw_cli.process."""
import unittest
import sys
import textwrap
from pw_cli import process
import psutil # type: ignore
# This timeout must be long enough to wait for the subprocess output, but
# fast enough that the test doesn't take terribly long in the success case.
FAST_TIMEOUT_SECONDS = 0.5
KILL_SIGNALS = set({-9, 137})
PYTHON = sys.executable
class RunTest(unittest.TestCase):
"""Tests for process.run."""
def test_returns_output(self) -> None:
echo_str = 'foobar'
print_str = f'print("{echo_str}")'
result = process.run(PYTHON, '-c', print_str)
self.assertEqual(result.output, b'foobar\n')
def test_timeout_kills_process(self) -> None:
sleep_100_seconds = 'import time; time.sleep(100)'
result = process.run(
PYTHON, '-c', sleep_100_seconds, timeout=FAST_TIMEOUT_SECONDS
)
self.assertIn(result.returncode, KILL_SIGNALS)
def test_timeout_kills_subprocess(self) -> None:
# Spawn a subprocess which prints its pid and then waits for 100
# seconds.
sleep_in_subprocess = textwrap.dedent(
f"""
import subprocess
import time
child = subprocess.Popen(
['{PYTHON}', '-c', 'import time; print("booh"); time.sleep(100)']
)
print(child.pid, flush=True)
time.sleep(100)
"""
)
result = process.run(
PYTHON, '-c', sleep_in_subprocess, timeout=FAST_TIMEOUT_SECONDS
)
self.assertIn(result.returncode, KILL_SIGNALS)
# The first line of the output is the PID of the child sleep process.
child_pid_str, sep, remainder = result.output.partition(b'\n')
del sep, remainder
child_pid = int(child_pid_str)
self.assertFalse(psutil.pid_exists(child_pid))
if __name__ == '__main__':
unittest.main()