blob: b401f88dd8b231871ca676ae747e3981791ee1e7 [file] [log] [blame]
#
# Copyright (c) 2023 Project CHIP Authors
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import argparse
import asyncio
import logging
import os
import shutil
import sys
from pathlib import Path
from capture import EcosystemController, EcosystemFactory, PacketCaptureRunner, PlatformFactory
from capture.file_utils import border_print, create_file_timestamp, safe_mkdir
from discovery import MatterBleScanner, MatterDnssdListener
logging.basicConfig(
format='%(asctime)s.%(msecs)03d %(levelname)s {%(module)s} [%(funcName)s]\n%(message)s \n',
level=logging.INFO)
class InteropDebuggingTool:
def __init__(self) -> None:
self.artifact_dir = None
create_artifact_dir = True
if len(sys.argv) == 1:
create_artifact_dir = False
elif sys.argv[1] != "capture" and sys.argv[1] != "discover":
create_artifact_dir = False
elif len(sys.argv) >= 3 and (sys.argv[2] == "-h" or sys.argv[2] == "--help"):
create_artifact_dir = False
if not os.environ['IDT_OUTPUT_DIR']:
print('Missing required env vars! Use /scripts!!!')
sys.exit(1)
self.artifact_dir_parent = os.path.join(
Path(__file__).resolve().parent,
os.environ['IDT_OUTPUT_DIR'])
artifact_timestamp = create_file_timestamp()
self.artifact_dir = os.path.join(
self.artifact_dir_parent,
f'idt_{artifact_timestamp}')
if create_artifact_dir:
safe_mkdir(self.artifact_dir)
border_print(f"Using artifact dir {self.artifact_dir}")
self.available_platforms = PlatformFactory.list_available_platforms()
self.available_platforms_default = 'Android' if 'Android' in self.available_platforms else None
self.platform_required = self.available_platforms_default is None
self.available_ecosystems = EcosystemFactory.list_available_ecosystems()
self.available_ecosystems_default = 'ALL'
self.available_ecosystems.append(self.available_ecosystems_default)
net_interface_path = "/sys/class/net/"
self.available_net_interfaces = os.listdir(net_interface_path) \
if os.path.exists(net_interface_path) \
else []
self.available_net_interfaces.append("any")
self.available_net_interfaces_default = "any"
self.pcap_artifact_dir = os.path.join(self.artifact_dir, "pcap")
self.net_interface_required = self.available_net_interfaces_default is None
self.ble_artifact_dir = os.path.join(self.artifact_dir, "ble")
self.dnssd_artifact_dir = os.path.join(self.artifact_dir, "dnssd")
self.process_args()
def process_args(self) -> None:
parser = argparse.ArgumentParser(
prog="idt",
description="Interop Debugging Tool for Matter")
subparsers = parser.add_subparsers(title="subcommands")
discover_parser = subparsers.add_parser(
"discover", help="Discover all Matter devices")
discover_parser.set_defaults(func=self.command_discover)
discover_parser.add_argument(
"--type",
"-t",
help="Specify the type of discovery to execute",
required=True,
choices=[
"ble",
"b",
"dnssd",
"d"])
capture_parser = subparsers.add_parser(
"capture",
help="Capture all information of interest while running a manual test")
platform_help = "Run capture for a particular platform"
if self.available_platforms_default:
platform_help += f" (default {self.available_platforms_default})"
capture_parser.add_argument("--platform",
"-p",
help=platform_help,
required=self.platform_required,
choices=self.available_platforms,
default=self.available_platforms_default)
capture_parser.add_argument(
"--ecosystem",
"-e",
help="Run capture for a particular ecosystem or ALL ecosystems (default ALL)",
required=False,
choices=self.available_ecosystems,
default=self.available_ecosystems_default)
capture_parser.add_argument("--pcap",
"-c",
help="Run packet capture (default t)",
required=False,
choices=['t', 'f'],
default='t')
interface_help = "Specify packet capture interface"
if self.available_net_interfaces_default:
interface_help += f" (default {self.available_net_interfaces_default})"
capture_parser.add_argument(
"--interface",
"-i",
help=interface_help,
required=self.net_interface_required,
choices=self.available_net_interfaces,
default=self.available_net_interfaces_default)
capture_parser.set_defaults(func=self.command_capture)
args, unknown = parser.parse_known_args()
if not hasattr(args, 'func'):
parser.print_help()
else:
args.func(args)
def command_discover(self, args: argparse.Namespace) -> None:
if args.type[0] == "b":
safe_mkdir(self.ble_artifact_dir)
MatterBleScanner(self.ble_artifact_dir).browse_interactive()
else:
safe_mkdir(self.dnssd_artifact_dir)
MatterDnssdListener(self.dnssd_artifact_dir).browse_interactive()
def zip_artifacts(self) -> None:
zip_basename = os.path.basename(self.artifact_dir)
archive_file = shutil.make_archive(zip_basename,
'zip',
root_dir=self.artifact_dir)
output_zip = shutil.move(archive_file, self.artifact_dir_parent)
print(f'Output zip: {output_zip}')
def command_capture(self, args: argparse.Namespace) -> None:
pcap = args.pcap == 't'
pcap_runner = None if not pcap else PacketCaptureRunner(
self.pcap_artifact_dir, args.interface)
if pcap:
border_print("Starting pcap")
safe_mkdir(self.pcap_artifact_dir)
pcap_runner.start_pcap()
asyncio.run(EcosystemFactory.init_ecosystems(args.platform,
args.ecosystem,
self.artifact_dir))
asyncio.run(EcosystemController.start())
border_print("Press enter twice to stop streaming", important=True)
input("")
if pcap:
border_print("Stopping pcap")
pcap_runner.stop_pcap()
asyncio.run(EcosystemController.stop())
asyncio.run(EcosystemController.analyze())
border_print("Compressing artifacts, this may take some time!")
self.zip_artifacts()