| # |
| # Copyright (c) 2025 Project CHIP Authors |
| # All rights reserved. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| import ipaddress |
| import logging |
| import os |
| import random |
| import shutil |
| import tempfile |
| from typing import Optional |
| |
| import psutil |
| import requests |
| from cryptography import x509 |
| from cryptography.hazmat.primitives import serialization |
| from TC_TLS_Utils import TLSUtils |
| |
| from matter.interaction_model import Status |
| from matter.testing.tasks import Subprocess |
| |
| |
class PushAvServerProcess(Subprocess):
    """Run the Push AV Server in a subprocess and wrap its HTTPS REST API.

    The constructor recreates a scratch working directory under the system
    temp directory, builds the ``python3 <server_path>`` command line and
    passes it to ``Subprocess``. The remaining methods are thin wrappers
    around the server's certificate, CSR-signing and stream endpoints.
    """

    # Prefix for log messages from push av server
    PREFIX = b"[PUSH_AV_SERVER]"

    # By default this points to the push_av_server in Test Harness
    # TCs utilizing this should expect th_server_app_path otherwise
    DEFAULT_SERVER_PATH = "/root/apps/push_av_server/server.py"

    # Seconds to wait for each HTTP request sent to the server.
    _REQUEST_TIMEOUT_SECONDS = 5

    def __init__(
        self,
        server_path: str | None,
        port: int = 1234,
        host: str = "0.0.0.0",
    ):
        """Prepare the working directory and the server command line.

        Args:
            server_path: Path to the server script. ``None`` selects
                ``DEFAULT_SERVER_PATH``.
            port: Port used to build ``base_url``.
            host: Value passed to the server's ``--host`` option and used to
                build ``base_url``.
        """
        if server_path is None:
            # Falling back to the Test Harness location is an expected path,
            # not a failure, so it is reported as a warning.
            logging.warning(
                "No path provided for Push AV Server, using the default path for TH: %s",
                self.DEFAULT_SERVER_PATH,
            )
            server_path = self.DEFAULT_SERVER_PATH

        # Always start from an empty working directory so that files left by
        # an earlier run cannot leak into this one.
        self._working_directory = os.path.join(tempfile.gettempdir(), "pavstest")
        if os.path.exists(self._working_directory):
            shutil.rmtree(self._working_directory)
        os.makedirs(self._working_directory)

        self.host = host
        self.port = port
        self.base_url = f"https://{self.host}:{self.port}"

        # NOTE(review): ``port`` only feeds ``base_url``; it is not forwarded
        # on the command line below, so a non-default value presumably leaves
        # the server listening on its own default port. Confirm against the
        # server's CLI before relying on a custom port.
        command = [
            "python3",
            server_path,
            "--host",
            str(self.host),
            "--working-directory",
            self._working_directory,
        ]

        # Start the server application; every output line gets PREFIX prepended.
        super().__init__(
            *command,
            output_cb=lambda line, is_stderr: self.PREFIX + line,
        )

    def __del__(self):
        """Best-effort removal of the working directory on garbage collection."""
        # A finalizer can run on a partially initialised object or while the
        # interpreter is shutting down, so every failure is deliberately
        # ignored here rather than propagated.
        # NOTE(review): the directory path is shared by all instances, so a
        # late finalizer of an old instance can remove the directory of a
        # newer one.
        try:
            if os.path.exists(self._working_directory):
                shutil.rmtree(self._working_directory)
        except Exception:
            pass

    @staticmethod
    def _pem_cert_to_der(cert_pem: str) -> bytes:
        """Convert a PEM-encoded X.509 certificate string to DER bytes."""
        certificate = x509.load_pem_x509_certificate(cert_pem.encode("utf-8"))
        return certificate.public_bytes(encoding=serialization.Encoding.DER)

    def _get_json(self, endpoint: str) -> dict:
        """GET ``endpoint`` (relative to ``base_url``) and return the JSON body.

        Raises:
            requests.HTTPError: If the server replies with an error status.
        """
        # Certificate verification is disabled for requests to this server.
        response = requests.get(
            f"{self.base_url}{endpoint}",
            verify=False,
            timeout=self._REQUEST_TIMEOUT_SECONDS,
        )
        response.raise_for_status()
        return response.json()

    def _post_json(self, endpoint: str, data: Optional[dict] = None) -> dict:
        """POST ``data`` as JSON to ``endpoint`` and return the JSON body.

        An empty JSON object is sent when ``data`` is ``None`` or empty.

        Raises:
            requests.HTTPError: If the server replies with an error status.
        """
        # Certificate verification is disabled for requests to this server.
        response = requests.post(
            f"{self.base_url}{endpoint}",
            json=data or {},
            verify=False,
            timeout=self._REQUEST_TIMEOUT_SECONDS,
        )
        response.raise_for_status()
        return response.json()

    def get_root_cert(self) -> bytes:
        """Retrieve the root certificate in DER format."""
        response = self._get_json("/certs/server/root.pem")
        return self._pem_cert_to_der(response["cert"]["public_cert"])

    def get_device_certificate(self, device_name: str = "DUT") -> bytes:
        """Retrieve a device certificate in DER format."""
        response = self._get_json(f"/certs/device/{device_name}.pem")
        return self._pem_cert_to_der(response["cert"]["public_cert"])

    def sign_csr(self, csr_der: bytes, device_name: str = "DUT") -> dict:
        """Submit a CSR for signing and return the server's response.

        Args:
            csr_der: The certificate signing request in DER encoding; it is
                re-encoded as PEM before being posted.
            device_name: Name used in the signing endpoint path.
        """
        csr = x509.load_der_x509_csr(csr_der)
        csr_pem = csr.public_bytes(serialization.Encoding.PEM).decode("utf-8")
        return self._post_json(f"/certs/{device_name}/sign", {"csr": csr_pem})

    def create_stream(self) -> str:
        """Request the server to create a new stream and return its ``stream_id``."""
        response = self._post_json("/streams")
        return response["stream_id"]

    def create_key_pair(self) -> None:
        """
        This method is a workaround to create keys for camera-app,
        as it currently tries to access them from /tmp/pavstest/
        """
        self._post_json("/certs/dev/keypair")
| |
| |
| class PAVSTIUtils: |
| """Utils for Push AV TC's TLS requirements.""" |
| |
| # ---------------------------------------------------------------------- |
| # Precondition setup |
| # ---------------------------------------------------------------------- |
| |
| def _get_private_ip(self): |
| candidates = {"192": [], "10": []} |
| |
| for iface, addrs in psutil.net_if_addrs().items(): |
| for addr in addrs: |
| if addr.family.name == 'AF_INET': |
| ip = addr.address |
| ip_obj = ipaddress.ip_address(ip) |
| if ip_obj.is_private: |
| if ip.startswith("192.168."): |
| candidates["192"].append(ip) |
| elif ip.startswith("10."): |
| candidates["10"].append(ip) |
| |
| if candidates["192"]: |
| return candidates["192"][0] |
| if candidates["10"]: |
| return candidates["10"][0] |
| |
| raise RuntimeError("No private IP found, specify using --string-arg host_ip <IPv4>") |
| |
| async def precondition_provision_tls_endpoint( |
| self, endpoint: int, server: PushAvServerProcess, host_ip: str | None = None |
| ) -> int: |
| """Perform provisioning steps to set up TLS endpoint.""" |
| if host_ip is None: |
| # If no host ip specified, try to get private ip |
| # this is mainly required when running TCs in Test Harness |
| logging.error("No host_ip provided in test arguments") |
| host_ip = self._get_private_ip() |
| logging.info(f"Using IP: {host_ip} as hostname to provision TLS Endpoint") |
| |
| tls_utils = TLSUtils(self, endpoint=endpoint) |
| # Create Kep Pair for camera as it currently tries to access it from /tmp/pavstest when uploading. |
| # TODO: Remove once camera-app supports TLS |
| server.create_key_pair() |
| root_cert_der = server.get_root_cert() |
| prc_result = await tls_utils.send_provision_root_command(certificate=root_cert_der) |
| tls_utils.assert_valid_caid(prc_result.caid) |
| |
| csr_nonce = random.randbytes(32) |
| csr_result = await tls_utils.send_csr_command(nonce=csr_nonce) |
| tls_utils.assert_valid_ccdid(csr_result.ccdid) |
| tls_utils.assert_valid_csr(csr_result, csr_nonce) |
| |
| server.sign_csr(csr_result.csr) |
| device_cert_der = server.get_device_certificate() |
| |
| await tls_utils.send_provision_client_command( |
| certificate=device_cert_der, ccdid=csr_result.ccdid |
| ) |
| result = await tls_utils.send_provision_tls_endpoint_command( |
| hostname=host_ip.encode('utf-8'), |
| port=1234, |
| expected_status=Status.Success, |
| caid=prc_result.caid, |
| ccdid=csr_result.ccdid, |
| ) |
| return result.endpointID, host_ip |