blob: 9ddb1ddd90c37fd11d73fb9101d324e183568390 [file] [log] [blame]
#
# Copyright (c) 2023 Project CHIP Authors
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import argparse
import asyncio
import os
import shutil
import sys
from pathlib import Path
import probe.runner as probe_runner
from capture import PacketCaptureRunner, controller
from discovery import MatterBleScanner, MatterDnssdListener
from utils.artifact import create_file_timestamp, safe_mkdir
from utils.host_platform import get_available_interfaces, verify_host_dependencies
from utils.log import border_print
import config
splash = '''\x1b[0m
\x1b[32;1m┌────────┐\x1b[33;20m▪\x1b[32;1m \x1b[34;1m┌──────┐ \x1b[33;20m• \x1b[35;1m┌──────────┐ \x1b[33;20m●
\x1b[32;1m│░░░░░░░░│ \x1b[34;1m│░░░░░░└┐ \x1b[33;20m゚\x1b[35;1m│░░░░░░░░░░│
\x1b[32;1m└──┐░░┌──┘\x1b[33;20m۰\x1b[32;1m \x1b[34;1m│░░┌┐░░░│ \x1b[35;1m└───┐░░┌───┘
\x1b[32;1m │░░│ \x1b[34;1m│░░│└┐░░│\x1b[33;20m▫ \x1b[35;1m \x1b[33;20m۰\x1b[35;1m │░░│ \x1b[33;20m。
\x1b[32;1m \x1b[33;20m•\x1b[32;1m │░░│ \x1b[33;20m● \x1b[34;1m│░░│┌┘░░│ \x1b[35;1m │░░│
\x1b[32;1m┌──┘░░└──┐ \x1b[34;1m│░░└┘░░░│ \x1b[35;1m │░░│ \x1b[33;20m•
\x1b[32;1m│░░░░░░░░│ \x1b[34;1m│░░░░░░┌┘\x1b[33;20m۰ \x1b[35;1m \x1b[33;20m▪\x1b[35;1m │░░│
\x1b[32;1m└────────┘\x1b[33;20m•\x1b[32;1m \x1b[34;1m└──────┘\x1b[33;20m。 \x1b[35;1m └──┘ \x1b[33;20m▫
\x1b[32;1m✰ Interop\x1b[34;1m ✰ Debugging\x1b[35;1m ✰ Tool
\x1b[0m'''
class InteropDebuggingTool:
def __init__(self) -> None:
if config.enable_color:
print(splash)
self.artifact_dir = None
create_artifact_dir = True
if len(sys.argv) == 1:
create_artifact_dir = False
elif sys.argv[1] != "capture" and sys.argv[1] != "discover":
create_artifact_dir = False
elif len(sys.argv) >= 3 and (sys.argv[2] == "-h" or sys.argv[2] == "--help"):
create_artifact_dir = False
verify_host_dependencies(["adb", "tcpdump"])
if not os.environ['IDT_OUTPUT_DIR']:
print('Missing required env vars! Use /scripts!!!')
sys.exit(1)
self.artifact_dir_parent = os.path.join(
Path(__file__).resolve().parent,
os.environ['IDT_OUTPUT_DIR'])
artifact_timestamp = create_file_timestamp()
self.artifact_dir = os.path.join(
self.artifact_dir_parent,
f'idt_{artifact_timestamp}')
if create_artifact_dir:
safe_mkdir(self.artifact_dir)
border_print(f"Using artifact dir {self.artifact_dir}")
self.available_platforms = controller.list_available_platforms()
self.available_platforms_default = 'Android' if 'Android' in self.available_platforms else None
self.platform_required = self.available_platforms_default is None
self.available_ecosystems = controller.list_available_ecosystems()
self.available_ecosystems_default = 'ALL'
self.available_ecosystems.append(self.available_ecosystems_default)
self.available_net_interfaces = get_available_interfaces()
self.available_net_interfaces_default = "any" if "any" in self.available_net_interfaces else None
self.pcap_artifact_dir = os.path.join(self.artifact_dir, "pcap")
self.net_interface_required = self.available_net_interfaces_default is None
self.ble_artifact_dir = os.path.join(self.artifact_dir, "ble")
self.dnssd_artifact_dir = os.path.join(self.artifact_dir, "dnssd")
self.prober_dir = os.path.join(self.artifact_dir, "probes")
self.process_args()
def process_args(self) -> None:
parser = argparse.ArgumentParser(
prog="idt",
description="Interop Debugging Tool for Matter")
subparsers = parser.add_subparsers(title="subcommands")
discover_parser = subparsers.add_parser(
"discover", help="Discover all Matter devices")
discover_parser.set_defaults(func=self.command_discover)
discover_parser.add_argument(
"--type",
"-t",
help="Specify the type of discovery to execute",
required=True,
choices=[
"ble",
"b",
"dnssd",
"d"])
capture_parser = subparsers.add_parser(
"capture",
help="Capture all information of interest while running a manual test")
platform_help = "Run capture for a particular platform"
if self.available_platforms_default:
platform_help += f" (default {self.available_platforms_default})"
capture_parser.add_argument("--platform",
"-p",
help=platform_help,
required=self.platform_required,
choices=self.available_platforms,
default=self.available_platforms_default)
capture_parser.add_argument(
"--ecosystem",
"-e",
help="Run capture for a particular ecosystem or ALL ecosystems (default ALL)",
required=False,
choices=self.available_ecosystems,
default=self.available_ecosystems_default)
capture_parser.add_argument("--pcap",
"-c",
help="Run packet capture (default t)",
required=False,
choices=['t', 'f'],
default='t')
interface_help = "Specify packet capture interface"
if self.available_net_interfaces_default:
interface_help += f" (default {self.available_net_interfaces_default})"
capture_parser.add_argument(
"--interface",
"-i",
help=interface_help,
required=self.net_interface_required,
choices=self.available_net_interfaces,
default=self.available_net_interfaces_default)
capture_parser.set_defaults(func=self.command_capture)
prober_parser = subparsers.add_parser("probe",
help="Probe the environment for Matter and general networking info")
prober_parser.set_defaults(func=self.command_probe)
args, unknown = parser.parse_known_args()
if not hasattr(args, 'func'):
parser.print_help()
else:
args.func(args)
def command_discover(self, args: argparse.Namespace) -> None:
if args.type[0] == "b":
safe_mkdir(self.ble_artifact_dir)
scanner = MatterBleScanner(self.ble_artifact_dir)
asyncio.run(scanner.browse_interactive())
self.zip_artifacts()
else:
safe_mkdir(self.dnssd_artifact_dir)
MatterDnssdListener(self.dnssd_artifact_dir).browse_interactive()
self.zip_artifacts()
def zip_artifacts(self) -> None:
zip_basename = os.path.basename(self.artifact_dir)
archive_file = shutil.make_archive(zip_basename,
'zip',
root_dir=self.artifact_dir)
output_zip = shutil.move(archive_file, self.artifact_dir_parent)
border_print(f'Output zip: {output_zip}')
def command_capture(self, args: argparse.Namespace) -> None:
pcap = args.pcap == 't'
pcap_runner = None if not pcap else PacketCaptureRunner(
self.pcap_artifact_dir, args.interface)
if pcap:
border_print("Starting pcap")
safe_mkdir(self.pcap_artifact_dir)
pcap_runner.start_pcap()
asyncio.run(controller.init_ecosystems(args.platform,
args.ecosystem,
self.artifact_dir))
asyncio.run(controller.start())
asyncio.run(controller.run_analyzers())
if pcap:
border_print("Stopping pcap")
pcap_runner.stop_pcap()
asyncio.run(controller.stop())
asyncio.run(controller.probe())
border_print("Checking error report")
controller.write_error_report(self.artifact_dir)
border_print("Compressing artifacts...")
self.zip_artifacts()
def command_probe(self, args: argparse.Namespace) -> None:
border_print("Starting generic Matter prober for local environment!")
safe_mkdir(self.dnssd_artifact_dir)
safe_mkdir(self.prober_dir)
probe_runner.run_probes(self.prober_dir, self.dnssd_artifact_dir)
self.zip_artifacts()