scripts: runner: add network port helper class
Some of the flashing scripts try to be clever about picking unused
ports. That's convenient for the user, so add a helper class to
runner.core to accomplish similar ends portably.
Signed-off-by: Marti Bolivar <marti.bolivar@linaro.org>
diff --git a/scripts/support/runner/core.py b/scripts/support/runner/core.py
index e451d76..ddf4d60 100644
--- a/scripts/support/runner/core.py
+++ b/scripts/support/runner/core.py
@@ -70,6 +70,73 @@
return fmt.format(*args)
+MAX_PORT = 49151
+
+
+class NetworkPortHelper:
+ '''Helper class for dealing with local IP network ports.'''
+
+ def get_unused_ports(self, starting_from):
+ '''Find unused network ports, starting at given values.
+
+ starting_from is an iterable of ports the caller would like to use.
+
+ The return value is an iterable of ports, in the same order, using
+ the given values if they were unused, or the next sequentially
+ available unused port otherwise.
+
+ Ports may be bound between this call's check and actual usage, so
+ callers still need to handle errors involving returned ports.'''
+ start = list(starting_from)
+ used = self._used_now()
+ ret = []
+
+ for desired in start:
+ port = desired
+ while port in used:
+ port += 1
+ if port > MAX_PORT:
+ msg = "ports above {} are in use"
+ raise ValueError(msg.format(desired))
+ used.add(port)
+ ret.append(port)
+
+ return ret
+
+ def _used_now(self):
+ handlers = {
+ 'Windows': self._used_now_windows,
+ 'Linux': self._used_now_linux,
+ 'Darwin': self._used_now_darwin,
+ }
+ handler = handlers[platform.system()]
+ return handler()
+
+ def _used_now_windows(self):
+ cmd = ['netstat', '-a', '-n', '-p', 'tcp']
+ return self._parser_windows(cmd)
+
+ def _used_now_linux(self):
+ cmd = ['netstat', '-a', '-n', '-t']
+ return self._parser_linux_darwin(cmd)
+
+ def _used_now_darwin(self):
+ cmd = ['netstat', '-a', '-n', '-p', 'tcp']
+ return self._parser_linux_darwin(cmd)
+
+ def _parser_windows(self, cmd):
+ out = subprocess.check_output(cmd).split(b'\r\n')
+ used_bytes = [x.split()[1].rsplit(b':', 1)[1] for x in out
+ if x.startswith(b' TCP')]
+ return {int(b) for b in used_bytes}
+
+ def _parser_linux_darwin(self, cmd):
+ out = subprocess.check_output(cmd).split(b'\n')
+ used_bytes = [x.split()[3].rsplit(b':', 1)[1] for x in out
+ if x.startswith(b'tcp')]
+ return {int(b) for b in used_bytes}
+
+
class ZephyrBinaryRunner(abc.ABC):
'''Abstract superclass for binary runners (flashers, debuggers).