[Camera] Add push av server and tls endpoint provisioning support for PAVSTI TCs (#40749)

* Add push av server and tls endpoint provisioning support for push av test cases

Signed-off-by: s-gatti <s.gatti@samsung.com>
Signed-off-by: Charles Kim <chulspro.kim@samsung.com>

* Address review comments and update review comments

Signed-off-by: s-gatti <s.gatti@samsung.com>
Signed-off-by: Charles Kim <chulspro.kim@samsung.com>

* Update PAVSTI Utils

Signed-off-by: Charles Kim <chulspro.kim@samsung.com>

---------

Signed-off-by: s-gatti <s.gatti@samsung.com>
Signed-off-by: Charles Kim <chulspro.kim@samsung.com>
Co-authored-by: s-gatti <s.gatti@samsung.com>
Co-authored-by: marktrayer <m.trayer@samsung.com>
diff --git a/src/python_testing/TC_PAVSTI_1_1.py b/src/python_testing/TC_PAVSTI_1_1.py
index 7ec1f7d..9f9a4d5 100644
--- a/src/python_testing/TC_PAVSTI_1_1.py
+++ b/src/python_testing/TC_PAVSTI_1_1.py
@@ -24,6 +24,7 @@
 #     app-args: --discriminator 1234 --KVS kvs1 --trace-to json:${TRACE_APP}.json
 #     script-args: >
 #       --storage-path admin_storage.json
+#       --string-arg th_server_app_path:${PUSH_AV_SERVER}
 #       --commissioning-method on-network
 #       --discriminator 1234
 #       --passcode 20202021
@@ -38,6 +39,7 @@
 
 from mobly import asserts
 from TC_AVSMTestBase import AVSMTestBase
+from TC_PAVSTI_Utils import PAVSTIUtils, PushAvServerProcess
 
 import matter.clusters as Clusters
 from matter.clusters import Globals
@@ -46,13 +48,31 @@
 logger = logging.getLogger(__name__)
 
 
-class TC_PAVSTI_1_1(MatterBaseTest, AVSMTestBase):
+class TC_PAVSTI_1_1(MatterBaseTest, AVSMTestBase, PAVSTIUtils):
     def desc_TC_PAVSTI_1_1(self) -> str:
         return "[TC-PAVSTI-1.1] Verify transmission when trigger type is Manual."
 
     def pics_TC_PAVSTI_1_1(self):
         return ["PAVST.S"]
 
+    @async_test_body
+    async def setup_class(self):
+        th_server_app = self.user_params.get("th_server_app_path", None)
+        if th_server_app:
+            self.server = PushAvServerProcess(server_path=th_server_app)
+        else:
+            self.server = PushAvServerProcess()
+        self.server.start(
+            expected_output="Running on https://0.0.0.0:1234",
+            timeout=30,
+        )
+        super().setup_class()
+
+    def teardown_class(self):
+        if self.server is not None:
+            self.server.terminate()
+        super().teardown_class()
+
     def steps_TC_PAVSTI_1_1(self) -> list[TestStep]:
         return [
             TestStep(
@@ -122,6 +142,8 @@
         # Commission DUT - already done
         await self.precondition_one_allocated_video_stream(streamUsage=Globals.Enums.StreamUsageEnum.kRecording)
         await self.precondition_one_allocated_audio_stream(streamUsage=Globals.Enums.StreamUsageEnum.kRecording)
+        tlsEndpointId = await self.precondition_provision_tls_endpoint(endpoint=endpoint, server=self.server)
+        uploadStreamId = self.server.create_stream()
 
         self.step(1)
         currentConnections = await self.read_single_attribute_check_success(
@@ -194,14 +216,14 @@
                     "streamUsage": Globals.Enums.StreamUsageEnum.kRecording,
                     "videoStreamID": videoStreamId,
                     "audioStreamID": audioStreamId,
-                    "endpointID": 1,  # TODO: Revisit TLS arguments once TLSCM cluster is available.
-                    "url": "https://localhost:1234/streams/1",
+                    "endpointID": tlsEndpointId,
+                    "url": f"https://localhost:1234/streams/{uploadStreamId}",
                     "triggerOptions": {"triggerType": pushavCluster.Enums.TransportTriggerTypeEnum.kCommand, "maxPreRollLen": 10},
                     "ingestMethod": pushavCluster.Enums.IngestMethodsEnum.kCMAFIngest,
                     "containerFormat": pushavCluster.Enums.ContainerFormatEnum.kCmaf,
                     "containerOptions": {
                         "containerType": pushavCluster.Enums.ContainerFormatEnum.kCmaf,
-                        "CMAFContainerOptions": {"chunkDuration": 4},
+                        "CMAFContainerOptions": {"CMAFInterface": 0, "segmentDuration": 4000, "chunkDuration": 2000, "sessionGroup": 1, "trackName": "media"},
                     },
                 }
             ),
diff --git a/src/python_testing/TC_PAVSTI_1_2.py b/src/python_testing/TC_PAVSTI_1_2.py
index c3b7c10..d4bc7ca 100644
--- a/src/python_testing/TC_PAVSTI_1_2.py
+++ b/src/python_testing/TC_PAVSTI_1_2.py
@@ -24,6 +24,7 @@
 #     app-args: --discriminator 1234 --KVS kvs1 --trace-to json:${TRACE_APP}.json
 #     script-args: >
 #       --storage-path admin_storage.json
+#       --string-arg th_server_app_path:${PUSH_AV_SERVER}
 #       --commissioning-method on-network
 #       --discriminator 1234
 #       --passcode 20202021
@@ -38,6 +39,7 @@
 
 from mobly import asserts
 from TC_AVSMTestBase import AVSMTestBase
+from TC_PAVSTI_Utils import PAVSTIUtils, PushAvServerProcess
 
 import matter.clusters as Clusters
 from matter.clusters import Globals
@@ -47,13 +49,31 @@
 logger = logging.getLogger(__name__)
 
 
-class TC_PAVSTI_1_2(MatterBaseTest, AVSMTestBase):
+class TC_PAVSTI_1_2(MatterBaseTest, AVSMTestBase, PAVSTIUtils):
     def desc_TC_PAVSTI_1_2(self) -> str:
         return "[TC-PAVSTI-1.2] Verify transmission with trigger type as Continuous and ensure privacy settings are checked if supported."
 
     def pics_TC_PAVSTI_1_2(self):
         return ["PAVST.S"]
 
+    @async_test_body
+    async def setup_class(self):
+        th_server_app = self.user_params.get("th_server_app_path", None)
+        if th_server_app:
+            self.server = PushAvServerProcess(server_path=th_server_app)
+        else:
+            self.server = PushAvServerProcess()
+        self.server.start(
+            expected_output="Running on https://0.0.0.0:1234",
+            timeout=30,
+        )
+        super().setup_class()
+
+    def teardown_class(self):
+        if self.server is not None:
+            self.server.terminate()
+        super().teardown_class()
+
     def steps_TC_PAVSTI_1_2(self) -> list[TestStep]:
         return [
             TestStep(
@@ -143,6 +163,8 @@
         # Commission DUT - already done
         await self.precondition_one_allocated_video_stream(streamUsage=Globals.Enums.StreamUsageEnum.kRecording)
         await self.precondition_one_allocated_audio_stream(streamUsage=Globals.Enums.StreamUsageEnum.kRecording)
+        tlsEndpointId = await self.precondition_provision_tls_endpoint(endpoint=endpoint, server=self.server)
+        uploadStreamId = self.server.create_stream()
 
         self.step(1)
         currentConnections = await self.read_single_attribute_check_success(
@@ -244,14 +266,14 @@
                     "streamUsage": Globals.Enums.StreamUsageEnum.kRecording,
                     "videoStreamID": videoStreamId,
                     "audioStreamID": audioStreamId,
-                    "endpointID": 1,  # TODO: Revisit TLS arguments once TLSCM cluster is available.
-                    "url": "https://localhost:1234/streams/1",
+                    "endpointID": tlsEndpointId,
+                    "url": f"https://localhost:1234/streams/{uploadStreamId}",
                     "triggerOptions": {"triggerType": pushavCluster.Enums.TransportTriggerTypeEnum.kContinuous},
                     "ingestMethod": pushavCluster.Enums.IngestMethodsEnum.kCMAFIngest,
                     "containerFormat": pushavCluster.Enums.ContainerFormatEnum.kCmaf,
                     "containerOptions": {
                         "containerType": pushavCluster.Enums.ContainerFormatEnum.kCmaf,
-                        "CMAFContainerOptions": {"chunkDuration": 4},
+                        "CMAFContainerOptions": {"CMAFInterface": 0, "segmentDuration": 4000, "chunkDuration": 2000, "sessionGroup": 1, "trackName": "media"},
                     },
                 }
             ),
diff --git a/src/python_testing/TC_PAVSTI_Utils.py b/src/python_testing/TC_PAVSTI_Utils.py
new file mode 100644
index 0000000..7a661ac
--- /dev/null
+++ b/src/python_testing/TC_PAVSTI_Utils.py
@@ -0,0 +1,348 @@
+#
+#    Copyright (c) 2025 Project CHIP Authors
+#    All rights reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License");
+#    you may not use this file except in compliance with the License.
+#    You may obtain a copy of the License at
+#
+#        http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS,
+#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#    See the License for the specific language governing permissions and
+#    limitations under the License.
+#
+
+import random
+import tempfile
+from typing import Optional, Union
+
+import requests
+from cryptography import x509
+from cryptography.hazmat.primitives import hashes, serialization
+from cryptography.hazmat.primitives.asymmetric import ec, utils
+from ecdsa.curves import curve_by_name
+from mobly import asserts
+from pyasn1.codec.der.decoder import decode as der_decoder
+from pyasn1.error import PyAsn1Error
+from pyasn1_modules import rfc2986, rfc5480
+
+import matter.clusters as Clusters
+from matter import ChipDeviceCtrl
+from matter.clusters.Types import Nullable, NullValue
+from matter.interaction_model import InteractionModelError, Status
+from matter.testing.conversions import hex_from_bytes
+from matter.testing.matter_testing import type_matches
+from matter.testing.tasks import Subprocess
+from matter.tlv import TLVWriter, uint
+
+
+class PushAvServerProcess(Subprocess):
+    """Class for starting Push AV Server in a subprocess"""
+
+    # Prefix for log messages from push av server
+    PREFIX = b"[PUSH_AV_SERVER]"
+
+    # By default this points to the push_av_server in Test Harness
+    # TCs utilizing this should expect th_server_app_path otherwise
+    DEFAULT_SERVER_PATH = "/root/apps/push_av_server/server.py"
+
+    def __init__(
+        self,
+        server_path: str = DEFAULT_SERVER_PATH,
+        port: int = 1234,
+        host: str = "0.0.0.0",
+    ):
+        self._working_directory = tempfile.TemporaryDirectory(prefix="pavs-")
+        self.host = host
+        self.port = port
+        self.base_url = f"https://{self.host}:{self.port}"
+        # Build the command list
+        command = ["python3", server_path]
+        command.extend(
+            [
+                "--host",
+                str(self.host),
+                "--working-directory",
+                self._working_directory.name,
+            ]
+        )
+
+        # Start the server application
+        super().__init__(
+            *command,
+            output_cb=lambda line, is_stderr: self.PREFIX + line,
+        )
+
+    def __del__(self):
+        try:
+            self._working_directory.cleanup()
+        except Exception:
+            pass
+
+    def _get_json(self, endpoint: str) -> dict:
+        url = f"{self.base_url}{endpoint}"
+        response = requests.get(url, verify=False, timeout=5)
+        response.raise_for_status()
+        return response.json()
+
+    def _post_json(self, endpoint: str, data: Optional[dict] = None) -> dict:
+        url = f"{self.base_url}{endpoint}"
+        response = requests.post(url, json=data or {}, verify=False, timeout=5)
+        response.raise_for_status()
+        return response.json()
+
+    def get_root_cert(self) -> bytes:
+        """Retrieve the root certificate in DER format."""
+        response = self._get_json("/certs/server/root.pem")
+        root_cert_pem = response["cert"]["public_cert"].encode("utf-8")
+        root_cert = x509.load_pem_x509_certificate(root_cert_pem)
+        return root_cert.public_bytes(encoding=serialization.Encoding.DER)
+
+    def get_device_certificate(self, device_name: str = "DUT") -> bytes:
+        """Retrieve a device certificate in DER format."""
+        response = self._get_json(f"/certs/device/{device_name}.pem")
+        device_cert_pem = response["cert"]["public_cert"].encode("utf-8")
+        device_cert = x509.load_pem_x509_certificate(device_cert_pem)
+        return device_cert.public_bytes(encoding=serialization.Encoding.DER)
+
+    def sign_csr(self, csr_der: bytes, device_name: str = "DUT") -> dict:
+        """Submit a CSR for signing and return the server's response."""
+        csr = x509.load_der_x509_csr(csr_der)
+        csr_pem = csr.public_bytes(serialization.Encoding.PEM).decode("utf-8")
+        return self._post_json(f"/certs/{device_name}/sign", {"csr": csr_pem})
+
+    def create_stream(self) -> str:
+        """Request the server to create a new stream."""
+        response = self._post_json("/streams")
+        return response["stream_id"]
+
+
+class PAVSTIUtils:
+    """Utils for Push AV TC's TLS requirements."""
+
+    def assert_valid_caid(self, caid: int) -> None:
+        asserts.assert_greater_equal(caid, 0, "Invalid CAID returned")
+        asserts.assert_less_equal(caid, 65534, "Invalid CAID returned")
+
+    def assert_valid_ccdid(self, ccdid: int) -> None:
+        asserts.assert_greater_equal(ccdid, 0, "Invalid CCDID returned")
+        asserts.assert_less_equal(ccdid, 65534, "Invalid CCDID returned")
+
+    def assert_valid_csr(
+        self,
+        response: Clusters.TlsCertificateManagement.Commands.TLSClientCSRResponse,
+        nonce: bytes,
+    ) -> None:
+        try:
+            temp, _ = der_decoder(response.csr, asn1Spec=rfc2986.CertificationRequest())
+        except PyAsn1Error:
+            asserts.fail("Unable to decode CSR - improperly formatted DER")
+        layer1 = dict(temp)
+
+        # Verify public key is 256 bytes
+        csr = x509.load_der_x509_csr(response.csr)
+        csr_pubkey = csr.public_key()
+
+        # Ensure key size is 256 bits
+        asserts.assert_equal(csr_pubkey.key_size, 256, "Incorrect key size")
+
+        # Ensure signature algorithm is ecdsa-with-SHA256
+        signature_algorithm = dict(layer1["signatureAlgorithm"])["algorithm"]
+        asserts.assert_equal(
+            signature_algorithm,
+            rfc5480.ecdsa_with_SHA256,
+            "CSR specifies incorrect signature key algorithm",
+        )
+
+        # Validate signature
+        asserts.assert_true(csr.is_signature_valid, "CSR signature is invalid")
+
+        # Verify response.nonce is octet string of length 32
+        try:
+            # response.nonce is an octet string if it can be converted to an int
+            int(hex_from_bytes(response.nonce), 16)
+        except ValueError:
+            asserts.fail("Returned CSR nonce is not an octet string")
+
+        # Verify response.nonce is valid signature
+        nocsr_elements = dict(
+            [
+                (1, response.csr),
+                (2, nonce),
+            ]
+        )
+        writer = TLVWriter()
+        writer.put(None, nocsr_elements)
+
+        baselen = curve_by_name("NIST256p").baselen
+        signature_raw_r = int(hex_from_bytes(response.nonce[:baselen]), 16)
+        signature_raw_s = int(hex_from_bytes(response.nonce[baselen:]), 16)
+
+        nocsr_signature = utils.encode_dss_signature(signature_raw_r, signature_raw_s)
+        csr_pubkey.verify(
+            signature=nocsr_signature,
+            data=writer.encoding,
+            signature_algorithm=ec.ECDSA(hashes.SHA256()),
+        )
+
+    # ----------------------------------------------------------------------
+    # Command helpers
+    # ----------------------------------------------------------------------
+
+    async def send_provision_root_command(
+        self, endpoint: int, certificate: bytes, expected_status: Status = Status.Success
+    ) -> Union[
+        Clusters.TlsCertificateManagement.Commands.ProvisionRootCertificateResponse,
+        InteractionModelError,
+    ]:
+        try:
+            result = await self.send_single_cmd(
+                cmd=Clusters.TlsCertificateManagement.Commands.ProvisionRootCertificate(
+                    certificate=certificate
+                ),
+                endpoint=endpoint,
+                payloadCapability=ChipDeviceCtrl.TransportPayloadCapability.LARGE_PAYLOAD,
+            )
+
+            asserts.assert_true(
+                type_matches(
+                    result,
+                    Clusters.TlsCertificateManagement.Commands.ProvisionRootCertificateResponse,
+                ),
+                "Unexpected return type for ProvisionRootCertificate",
+            )
+            return result
+        except InteractionModelError as e:
+            asserts.assert_equal(e.status, expected_status, "Unexpected error returned")
+            return e
+
+    async def send_remove_root_command(
+        self, endpoint: int, caid: int, expected_status: Status = Status.Success
+    ) -> InteractionModelError:
+        try:
+            result = await self.send_single_cmd(
+                cmd=Clusters.TlsCertificateManagement.Commands.RemoveRootCertificate(
+                    caid=caid
+                ),
+                endpoint=endpoint,
+                payloadCapability=ChipDeviceCtrl.TransportPayloadCapability.LARGE_PAYLOAD,
+            )
+            return result
+        except InteractionModelError as e:
+            asserts.assert_equal(e.status, expected_status, "Unexpected error returned")
+            return e
+
+    async def send_csr_command(
+        self, endpoint: int, nonce: bytes, expected_status: Status = Status.Success
+    ) -> Union[
+        Clusters.TlsCertificateManagement.Commands.TLSClientCSRResponse,
+        InteractionModelError,
+    ]:
+        try:
+            result = await self.send_single_cmd(
+                cmd=Clusters.TlsCertificateManagement.Commands.TLSClientCSR(
+                    nonce=nonce
+                ),
+                endpoint=endpoint,
+                payloadCapability=ChipDeviceCtrl.TransportPayloadCapability.LARGE_PAYLOAD,
+            )
+
+            asserts.assert_true(
+                type_matches(
+                    result,
+                    Clusters.TlsCertificateManagement.Commands.TLSClientCSRResponse,
+                ),
+                "Unexpected return type for TLSClientCSR",
+            )
+            return result
+        except InteractionModelError as e:
+            asserts.assert_equal(e.status, expected_status, "Unexpected error returned")
+            return e
+
+    async def send_provision_client_command(
+        self, endpoint: int, certificate: bytes, ccdid: int, expected_status: Status = Status.Success
+    ) -> InteractionModelError:
+        try:
+            result = await self.send_single_cmd(
+                cmd=Clusters.TlsCertificateManagement.Commands.ProvisionClientCertificate(
+                    ccdid=ccdid,
+                    clientCertificateDetails=Clusters.TlsCertificateManagement.Structs.TLSClientCertificateDetailStruct(
+                        clientCertificate=certificate
+                    ),
+                ),
+                endpoint=endpoint,
+                payloadCapability=ChipDeviceCtrl.TransportPayloadCapability.LARGE_PAYLOAD,
+            )
+            return result
+        except InteractionModelError as e:
+            asserts.assert_equal(e.status, expected_status, "Unexpected error returned")
+            return e
+
+    async def send_provision_tls_endpoint_command(
+        self,
+        endpoint: int,
+        hostname: bytes,
+        port: uint,
+        caid: uint,
+        ccdid: Union[Nullable, uint] = NullValue,
+        expected_status: Status = Status.Success,
+    ) -> Union[
+        Clusters.TlsClientManagement.Commands.ProvisionEndpointResponse,
+        InteractionModelError,
+    ]:
+        try:
+            result = await self.send_single_cmd(
+                cmd=Clusters.TlsClientManagement.Commands.ProvisionEndpoint(
+                    hostname=hostname, port=port, caid=caid, ccdid=ccdid
+                ),
+                endpoint=endpoint,
+                payloadCapability=ChipDeviceCtrl.TransportPayloadCapability.LARGE_PAYLOAD,
+            )
+
+            asserts.assert_true(
+                type_matches(
+                    result,
+                    Clusters.TlsClientManagement.Commands.ProvisionEndpointResponse,
+                ),
+                "Unexpected return type for ProvisionEndpoint",
+            )
+            return result
+        except InteractionModelError as e:
+            asserts.assert_equal(e.status, expected_status, "Unexpected error returned")
+            return e
+
+    # ----------------------------------------------------------------------
+    # Precondition setup
+    # ----------------------------------------------------------------------
+
+    async def precondition_provision_tls_endpoint(
+        self, endpoint: int, server: PushAvServerProcess
+    ) -> int:
+        """Perform provisioning steps to set up TLS endpoint."""
+        root_cert_der = server.get_root_cert()
+        prc_result = await self.send_provision_root_command(endpoint=endpoint, certificate=root_cert_der)
+        self.assert_valid_caid(prc_result.caid)
+
+        csr_nonce = random.randbytes(32)
+        csr_result = await self.send_csr_command(endpoint=endpoint, nonce=csr_nonce)
+        self.assert_valid_ccdid(csr_result.ccdid)
+        self.assert_valid_csr(csr_result, csr_nonce)
+
+        server.sign_csr(csr_result.csr)
+        device_cert_der = server.get_device_certificate()
+
+        await self.send_provision_client_command(
+            endpoint=endpoint, certificate=device_cert_der, ccdid=csr_result.ccdid
+        )
+        result = await self.send_provision_tls_endpoint_command(
+            endpoint=endpoint,
+            hostname=b"localhost",
+            port=1234,
+            expected_status=Status.Success,
+            caid=prc_result.caid,
+            ccdid=csr_result.ccdid,
+        )
+        return result.endpointID
diff --git a/src/python_testing/requirements.txt b/src/python_testing/requirements.txt
index 6c3f9d4..715089b 100644
--- a/src/python_testing/requirements.txt
+++ b/src/python_testing/requirements.txt
@@ -2,4 +2,5 @@
 pyasn1
 pyasn1_modules
 zeroconf
-langcodes
\ No newline at end of file
+langcodes
+requests
\ No newline at end of file
diff --git a/src/python_testing/test_metadata.yaml b/src/python_testing/test_metadata.yaml
index e61ca66..0c0e24d 100644
--- a/src/python_testing/test_metadata.yaml
+++ b/src/python_testing/test_metadata.yaml
@@ -171,6 +171,8 @@
       reason:
           Depends on TLS Certificate Management Cluster. Will be enabled once
           local tests are fully verified.
+    - name: TC_PAVSTI_Utils.py
+      reason: Helper methods for Push AV Integration TCs
 
 # This is a list of slow tests (just arbitrarily picked around 20 seconds)
 # used in some script reporting for "be patient" messages as well as potentially