blob: 8eab53b8ec0d0373eef274f94d89d0f1868b5bd2 [file] [log] [blame]
#
# Copyright (c) 2023 Project CHIP Authors
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import asyncio
import os
from typing import TYPE_CHECKING
from utils.artifact import create_standard_log_name, log, safe_mkdir
from utils.host_platform import is_mac
from utils.shell import Bash
from ... import config
from ..base import AndroidStream
if TYPE_CHECKING:
from capture.platform.android import Android
logger = log.get_logger(__file__)
class AndroidPcap(AndroidStream):
def __init__(self, platform: "Android"):
self.logger = logger
self.platform = platform
self.target_dir = "/sdcard/Download"
self.pcap_artifact = create_standard_log_name("android_tcpdump", "pcap", parent=self.platform.artifact_dir)
self.pcap_phone_out_path = f"{self.target_dir}/{os.path.basename(self.pcap_artifact)}"
self.pcap_phone_bin_location = "tcpdump" if platform.capabilities.c_has_tcpdump \
else f"{self.target_dir}/tcpdump"
self.pcap_command = f"shell {self.pcap_phone_bin_location} -w {self.pcap_phone_out_path}"
self.pcap_proc = platform.get_adb_background_command(self.pcap_command)
self.pcap_pull = False
self.pcap_pull_command = f"pull {self.pcap_phone_out_path} {self.pcap_artifact}"
self.build_dir = os.path.join(os.path.dirname(__file__), "BUILD")
async def pull_packet_capture(self) -> None:
if self.pcap_pull:
self.logger.info("Attempting to pull android pcap")
await asyncio.sleep(3)
self.platform.run_adb_command(self.pcap_pull_command)
self.pcap_pull = False
async def start(self):
if not self.platform.capabilities.c_has_root:
self.logger.warning("Phone is not rooted, cannot take pcap!")
return
if self.platform.capabilities.c_has_tcpdump:
self.logger.info("tcpdump already available; using!")
self.pcap_proc.start_command()
self.pcap_pull = True
return
if not config.enable_build_push_tcpdump:
self.logger.critical("Android TCP Dump build and push disabled in configs!")
return
if not os.path.exists(os.path.join(self.build_dir, "tcpdump")):
self.logger.warning("tcpdump bin not found, attempting to build, please wait a few moments!")
safe_mkdir(self.build_dir)
if is_mac():
build_script = os.path.join(os.path.dirname(__file__), "mac_build_tcpdump_64.sh")
Bash(f"{build_script} 2>&1 >> BUILD_LOG.txt", sync=True, cwd=self.build_dir).start_command()
else:
build_script = os.path.join(os.path.dirname(__file__), "linux_build_tcpdump_64.sh")
Bash(f"{build_script} 2>&1 >> BUILD_LOG.txt", sync=True, cwd=self.build_dir).start_command()
else:
self.logger.warning("Reusing existing tcpdump build")
if not self.platform.run_adb_command(f"shell ls {self.target_dir}/tcpdump").finished_success():
self.logger.warning("Pushing tcpdump to device")
self.platform.run_adb_command(f"push {os.path.join(self.build_dir, 'tcpdump')} f{self.target_dir}")
self.platform.run_adb_command(f"chmod +x {self.target_dir}/tcpdump")
else:
self.logger.info("tcpdump already in the expected location, not pushing!")
self.logger.info("Starting Android pcap command")
self.pcap_proc.start_command()
self.pcap_pull = True
async def run_observer(self) -> None:
while True:
# TODO: Implement, need to restart w/ new out file (no append) and keep pull manifest, much like `screen`
await asyncio.sleep(120)
async def stop(self):
self.logger.info("Stopping android pcap proc")
self.pcap_proc.stop_command()
await self.pull_packet_capture()