Implementations for Push AV Cluster Python Test Scripts with multiple TH (#40159)
* Implement Push AV TCs with multiple Test Harness.
- Add TCs for TC-PAVST-2.4, TC-PAVST-2.5, TC-PAVST-2.6, TC-PAVST-2.7, TC-PAVST-2.8
Signed-off-by: Raveendra Karu <r.karu@samsung.com>
* Restyled by autopep8
* Create new controller and clean up.
Signed-off-by: Raveendra Karu <r.karu@samsung.com>
* Restyled by isort
* Resolve style and LINT issues.
Signed-off-by: Raveendra Karu <r.karu@samsung.com>
* Fix commands as per latest code.
Signed-off-by: Raveendra Karu <r.karu@samsung.com>
* Restyled by isort
* Exclude test scripts from CI till push av code is merged.
Signed-off-by: Raveendra Karu <r.karu@samsung.com>
* Fix LINT errors.
Signed-off-by: Raveendra Karu <r.karu@samsung.com>
* Fix LINT errors.
Signed-off-by: Raveendra Karu <r.karu@samsung.com>
* Restyled by autopep8
Signed-off-by: Raveendra Karu <r.karu@samsung.com>
* Added TestBase file containing common api.
Co-Author: Sambhavi <sambhavi.1@samsung.com>
Signed-off-by: Raveendra Karu <r.karu@samsung.com>
* Restyled by autopep8
* Fix LINT error
Co-author: Sambhavi <sambhavi.1@samsung.com>
Signed-off-by: Raveendra Karu <r.karu@samsung.com>
* Remove hardcoded value and use @run_if_endpoint_matches at start
Co-author: Sambhavi <sambhavi.1@samsung.com>
Signed-off-by: Raveendra Karu <r.karu@samsung.com>
* Added test step with TH2 controller
Co-author: Sambhavi <sambhavi.1@samsung.com>
Signed-off-by: Raveendra Karu <r.karu@samsung.com>
* Restyled by autopep8
* Restyled by isort
* Fix Lint Error
Co-author: Sambhavi <sambhavi.1@samsung.com>
Signed-off-by: Raveendra Karu <r.karu@samsung.com>
* Restyled by autopep8
* Exclude TC_PAVSTTestBase.py from CI.
Signed-off-by: Raveendra Karu <r.karu@samsung.com>
* Restyled by prettier-yaml
* Fix Build fail
Signed-off-by: Raveendra Karu <r.karu@samsung.com>
* Minor step change in TC7
Signed-off-by: Raveendra Karu <r.karu@samsung.com>
* Path change for clip record to /tmp
Co-author: Sambhavi <sambhavi.1@samsung.com>
Signed-off-by: Raveendra Karu <r.karu@samsung.com>
* Added uninitialized variable bug fix and Disable CI for 2_4 TC
* disable TCs for CI till push av server server logic issue is fixed.
Signed-off-by: Raveendra Karu <r.karu@samsung.com>
---------
Signed-off-by: Raveendra Karu <r.karu@samsung.com>
Co-authored-by: Restyled.io <commits@restyled.io>
diff --git a/examples/camera-app/linux/src/clusters/push-av-stream-transport/push-av-stream-manager.cpp b/examples/camera-app/linux/src/clusters/push-av-stream-transport/push-av-stream-manager.cpp
index 2eeada9..6ed73ae 100644
--- a/examples/camera-app/linux/src/clusters/push-av-stream-transport/push-av-stream-manager.cpp
+++ b/examples/camera-app/linux/src/clusters/push-av-stream-transport/push-av-stream-manager.cpp
@@ -152,7 +152,6 @@
ChipLogError(Camera, "PushAvStreamTransportManager, failed to find Connection :[%u]", connectionID);
return Status::NotFound;
}
- mTransportMap[connectionID].reset();
mMediaController->UnregisterTransport(mTransportMap[connectionID].get());
mTransportMap.erase(connectionID);
mTransportOptionsMap.erase(connectionID);
diff --git a/examples/camera-app/linux/src/pushav-clip-recorder.cpp b/examples/camera-app/linux/src/pushav-clip-recorder.cpp
index 342e143..afd70d8 100644
--- a/examples/camera-app/linux/src/pushav-clip-recorder.cpp
+++ b/examples/camera-app/linux/src/pushav-clip-recorder.cpp
@@ -44,7 +44,11 @@
mClipInfo(aClipInfo),
mAudioInfo(aAudioInfo), mVideoInfo(aVideoInfo), mUploader(aUploader)
{
-
+ mFormatContext = nullptr;
+ mInputFormatContext = nullptr;
+ mVideoStream = nullptr;
+ mAudioStream = nullptr;
+ mAudioEncoderContext = nullptr;
mVideoInfo.mVideoPts = 0;
mVideoInfo.mVideoDts = 0;
mAudioInfo.mAudioPts = 0;
@@ -52,6 +56,13 @@
int streamIndex = 0;
mMetadataSet = false;
mDeinitializeRecorder = false;
+ mUploadedInitSegment = false;
+ mUploadMPD = false;
+ mAudioFragment = 1;
+ mVideoFragment = 1;
+ mCurrentClipStartPts = AV_NOPTS_VALUE;
+ mFoundFirstIFramePts = -1;
+ currentPts = AV_NOPTS_VALUE;
if (mClipInfo.mHasVideo)
{
mVideoInfo.mVideoStreamIndex = streamIndex++;
@@ -73,9 +84,9 @@
PushAVClipRecorder::~PushAVClipRecorder()
{
+ Stop();
if (mWorkerThread.joinable())
{
- Stop();
mWorkerThread.join();
}
}
@@ -229,6 +240,7 @@
if (GetRecorderStatus())
{
SetRecorderStatus(false);
+ mCondition.notify_one();
while (!mVideoQueue.empty())
{
av_packet_free(&mVideoQueue.front());
@@ -348,7 +360,13 @@
while (GetRecorderStatus())
{
std::unique_lock<std::mutex> lock(mQueueMutex);
- mCondition.wait(lock, [this] { return !mVideoQueue.empty() || !mAudioQueue.empty(); });
+ mCondition.wait(
+ lock, [this] { return !mVideoQueue.empty() || !mAudioQueue.empty() || !GetRecorderStatus() || mDeinitializeRecorder; });
+ if (!GetRecorderStatus() || mDeinitializeRecorder)
+ {
+ ChipLogProgress(Camera, "Recorder thread received stop signal for ID: %s", mClipInfo.mRecorderId.c_str());
+ break; // Exit loop
+ }
ProcessBuffersAndWrite();
}
diff --git a/examples/camera-app/linux/src/pushav-transport/pushav-transport.cpp b/examples/camera-app/linux/src/pushav-transport/pushav-transport.cpp
index 3af4c55..fefed37 100644
--- a/examples/camera-app/linux/src/pushav-transport/pushav-transport.cpp
+++ b/examples/camera-app/linux/src/pushav-transport/pushav-transport.cpp
@@ -132,7 +132,7 @@
mTransportTriggerType = transportOptions.triggerOptions.triggerType;
clipInfo.mClipId = 0;
- clipInfo.mOutputPath = "./clips/";
+ clipInfo.mOutputPath = "/tmp";
clipInfo.mInputTimeBase = { 1, 1000000 };
uint8_t audioCodec = static_cast<uint8_t>(mAudioStreamParams.audioCodec);
diff --git a/src/python_testing/TC_PAVSTTestBase.py b/src/python_testing/TC_PAVSTTestBase.py
new file mode 100644
index 0000000..688f8fc
--- /dev/null
+++ b/src/python_testing/TC_PAVSTTestBase.py
@@ -0,0 +1,387 @@
+#
+# Copyright (c) 2025 Project CHIP Authors
+# All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import logging
+import random
+
+from mobly import asserts
+
+import matter.clusters as Clusters
+from matter import ChipDeviceCtrl
+from matter.interaction_model import InteractionModelError, Status
+
+logger = logging.getLogger(__name__)
+
+
+class PAVSTTestBase:
+ async def read_pavst_attribute_expect_success(self, endpoint, attribute):
+ cluster = Clusters.Objects.PushAvStreamTransport
+ return await self.read_single_attribute_check_success(endpoint=endpoint, cluster=cluster, attribute=attribute)
+
+ async def allocate_one_audio_stream(self):
+ endpoint = self.get_endpoint(default=1)
+ cluster = Clusters.CameraAvStreamManagement
+ attr = Clusters.CameraAvStreamManagement.Attributes
+ commands = Clusters.CameraAvStreamManagement.Commands
+
+ # First verify that ADO is supported
+ aFeatureMap = await self.read_single_attribute_check_success(endpoint=endpoint, cluster=cluster, attribute=attr.FeatureMap)
+ logger.info(f"Rx'd FeatureMap: {aFeatureMap}")
+ adoSupport = aFeatureMap & cluster.Bitmaps.Feature.kAudio
+ asserts.assert_equal(adoSupport, cluster.Bitmaps.Feature.kAudio, "Audio Feature is not supported.")
+
+ # Check if audio stream has already been allocated
+ aAllocatedAudioStreams = await self.read_single_attribute_check_success(
+ endpoint=endpoint, cluster=cluster, attribute=attr.AllocatedAudioStreams
+ )
+ logger.info(f"Rx'd AllocatedAudioStreams: {aAllocatedAudioStreams}")
+ if len(aAllocatedAudioStreams) > 0:
+ return aAllocatedAudioStreams[0].audioStreamID
+
+ # Allocate one for the test steps based on SnapshotCapabilities
+ aMicrophoneCapabilities = await self.read_single_attribute_check_success(
+ endpoint=endpoint, cluster=cluster, attribute=attr.MicrophoneCapabilities
+ )
+ logger.info(f"Rx'd MicrophoneCapabilities: {aMicrophoneCapabilities}")
+ aStreamUsagePriorities = await self.read_single_attribute_check_success(
+ endpoint=endpoint, cluster=cluster, attribute=attr.StreamUsagePriorities
+ )
+ logger.info(f"Rx'd StreamUsagePriorities : {aStreamUsagePriorities}")
+ asserts.assert_greater(len(aStreamUsagePriorities), 0, "StreamUsagePriorities is empty")
+
+ try:
+ adoStreamAllocateCmd = commands.AudioStreamAllocate(
+ streamUsage=aStreamUsagePriorities[0],
+ audioCodec=aMicrophoneCapabilities.supportedCodecs[0],
+ channelCount=aMicrophoneCapabilities.maxNumberOfChannels,
+ sampleRate=aMicrophoneCapabilities.supportedSampleRates[0],
+ bitRate=1024,
+ bitDepth=aMicrophoneCapabilities.supportedBitDepths[0],
+ )
+ audioStreamAllocateResponse = await self.send_single_cmd(endpoint=endpoint, cmd=adoStreamAllocateCmd)
+ logger.info(f"Rx'd AudioStreamAllocateResponse: {audioStreamAllocateResponse}")
+ asserts.assert_is_not_none(
+ audioStreamAllocateResponse.audioStreamID, "AudioStreamAllocateResponse does not contain StreamID"
+ )
+
+ return [audioStreamAllocateResponse.audioStreamID]
+ except InteractionModelError as e:
+ asserts.assert_equal(e.status, Status.Success, "Unexpected error returned")
+ pass
+
+ async def allocate_one_video_stream(self):
+ endpoint = self.get_endpoint(default=1)
+ cluster = Clusters.CameraAvStreamManagement
+ attr = Clusters.CameraAvStreamManagement.Attributes
+ commands = Clusters.CameraAvStreamManagement.Commands
+
+ # First verify that VDO is supported
+ aFeatureMap = await self.read_single_attribute_check_success(endpoint=endpoint, cluster=cluster, attribute=attr.FeatureMap)
+ logger.info(f"Rx'd FeatureMap: {aFeatureMap}")
+ vdoSupport = aFeatureMap & cluster.Bitmaps.Feature.kVideo
+ asserts.assert_equal(vdoSupport, cluster.Bitmaps.Feature.kVideo, "Video Feature is not supported.")
+
+ # Check if video stream has already been allocated
+ aAllocatedVideoStreams = await self.read_single_attribute_check_success(
+ endpoint=endpoint, cluster=cluster, attribute=attr.AllocatedVideoStreams
+ )
+ logger.info(f"Rx'd AllocatedVideoStreams: {aAllocatedVideoStreams}")
+ if len(aAllocatedVideoStreams) > 0:
+ return aAllocatedVideoStreams[0].videoStreamID
+
+ # Allocate one for the test steps
+ aStreamUsagePriorities = await self.read_single_attribute_check_success(
+ endpoint=endpoint, cluster=cluster, attribute=attr.StreamUsagePriorities
+ )
+ logger.info(f"Rx'd StreamUsagePriorities: {aStreamUsagePriorities}")
+ aRateDistortionTradeOffPoints = await self.read_single_attribute_check_success(
+ endpoint=endpoint, cluster=cluster, attribute=attr.RateDistortionTradeOffPoints
+ )
+ logger.info(f"Rx'd RateDistortionTradeOffPoints: {aRateDistortionTradeOffPoints}")
+ aMinViewport = await self.read_single_attribute_check_success(
+ endpoint=endpoint, cluster=cluster, attribute=attr.MinViewportResolution
+ )
+ logger.info(f"Rx'd MinViewport: {aMinViewport}")
+ aVideoSensorParams = await self.read_single_attribute_check_success(
+ endpoint=endpoint, cluster=cluster, attribute=attr.VideoSensorParams
+ )
+ logger.info(f"Rx'd VideoSensorParams: {aVideoSensorParams}")
+ aMaxEncodedPixelRate = await self.read_single_attribute_check_success(
+ endpoint=endpoint, cluster=cluster, attribute=attr.MaxEncodedPixelRate
+ )
+ logger.info(f"Rx'd MaxEncodedPixelRate: {aMaxEncodedPixelRate}")
+
+ # Check for Watermark and OSD features
+ watermark = True if (aFeatureMap & cluster.Bitmaps.Feature.kWatermark) != 0 else None
+ osd = True if (aFeatureMap & cluster.Bitmaps.Feature.kOnScreenDisplay) != 0 else None
+
+ try:
+ asserts.assert_greater(len(aStreamUsagePriorities), 0, "StreamUsagePriorities is empty")
+ asserts.assert_greater(len(aRateDistortionTradeOffPoints), 0, "RateDistortionTradeOffPoints is empty")
+ videoStreamAllocateCmd = commands.VideoStreamAllocate(
+ streamUsage=aStreamUsagePriorities[0],
+ videoCodec=aRateDistortionTradeOffPoints[0].codec,
+ minFrameRate=30, # An acceptable value for min frame rate
+ maxFrameRate=aVideoSensorParams.maxFPS,
+ minResolution=aMinViewport,
+ maxResolution=cluster.Structs.VideoResolutionStruct(
+ width=aVideoSensorParams.sensorWidth, height=aVideoSensorParams.sensorHeight
+ ),
+ minBitRate=aRateDistortionTradeOffPoints[0].minBitRate,
+ maxBitRate=aRateDistortionTradeOffPoints[0].minBitRate,
+ keyFrameInterval=4000,
+ watermarkEnabled=watermark,
+ OSDEnabled=osd
+ )
+ videoStreamAllocateResponse = await self.send_single_cmd(endpoint=endpoint, cmd=videoStreamAllocateCmd)
+ logger.info(f"Rx'd VideoStreamAllocateResponse: {videoStreamAllocateResponse}")
+ asserts.assert_is_not_none(
+ videoStreamAllocateResponse.videoStreamID, "VideoStreamAllocateResponse does not contain StreamID"
+ )
+
+ return [videoStreamAllocateResponse.videoStreamID]
+ except InteractionModelError as e:
+ asserts.assert_equal(e.status, Status.Success, "Unexpected error returned")
+ pass
+
+ async def validate_allocated_video_stream(self, videoStreamID):
+ endpoint = self.get_endpoint(default=1)
+ cluster = Clusters.CameraAvStreamManagement
+ attr = Clusters.CameraAvStreamManagement.Attributes
+
+ # Make sure the DUT allocated sterams as requested
+ aAllocatedVideoStreams = await self.read_single_attribute_check_success(
+ endpoint=endpoint, cluster=cluster, attribute=attr.AllocatedVideoStreams
+ )
+
+ if not any(stream.videoStreamID == videoStreamID for stream in aAllocatedVideoStreams):
+ asserts.fail(f"Video Stream with ID {videoStreamID} not found as expected")
+
+ async def validate_allocated_audio_stream(self, audioStreamID):
+ endpoint = self.get_endpoint(default=1)
+ cluster = Clusters.CameraAvStreamManagement
+ attr = Clusters.CameraAvStreamManagement.Attributes
+
+ # Make sure the DUT allocated sterams as requested
+ aAllocatedAudioStreams = await self.read_single_attribute_check_success(
+ endpoint=endpoint, cluster=cluster, attribute=attr.AllocatedAudioStreams
+ )
+
+ if not any(stream.audioStreamID == audioStreamID for stream in aAllocatedAudioStreams):
+ asserts.fail(f"Audio Stream with ID {audioStreamID} not found as expected")
+
+ async def allocate_one_pushav_transport(self, endpoint, triggerType=Clusters.PushAvStreamTransport.Enums.TransportTriggerTypeEnum.kContinuous,
+ trigger_Options=None):
+ endpoint = self.get_endpoint(default=1)
+ cluster = Clusters.PushAvStreamTransport
+
+ # First verify that ADO is supported
+ aFeatureMap = await self.read_single_attribute_check_success(endpoint=endpoint, cluster=Clusters.CameraAvStreamManagement, attribute=Clusters.CameraAvStreamManagement.Attributes.FeatureMap)
+ logger.info(f"Rx'd FeatureMap: {aFeatureMap}")
+ adoSupport = aFeatureMap & Clusters.CameraAvStreamManagement.Bitmaps.Feature.kAudio
+ asserts.assert_equal(adoSupport, Clusters.CameraAvStreamManagement.Bitmaps.Feature.kAudio,
+ "Audio Feature is not supported.")
+
+ # Check if audio stream has already been allocated
+ aAllocatedAudioStream = await self.allocate_one_audio_stream()
+ logger.info(f"Rx'd AllocatedAudioStream: {aAllocatedAudioStream}")
+
+ # Check if video stream has already been allocated
+ aAllocatedVideoStream = await self.allocate_one_video_stream()
+ logger.info(f"Rx'd AllocatedVideoStream: {aAllocatedVideoStream}")
+
+ aStreamUsagePriorities = await self.read_single_attribute_check_success(
+ endpoint=endpoint, cluster=Clusters.CameraAvStreamManagement, attribute=Clusters.CameraAvStreamManagement.Attributes.StreamUsagePriorities
+ )
+ asserts.assert_greater(len(aStreamUsagePriorities), 0, "StreamUsagePriorities is empty")
+
+ triggerOptions = {"triggerType": triggerType}
+ if (trigger_Options is not None):
+ triggerOptions = trigger_Options
+
+ try:
+ await self.send_single_cmd(
+ cmd=cluster.Commands.AllocatePushTransport(
+ {
+ "streamUsage": aStreamUsagePriorities[0],
+ "videoStreamID": aAllocatedVideoStream,
+ "audioStreamID": aAllocatedAudioStream,
+ "endpointID": endpoint,
+ "url": "https://localhost:1234/streams/1",
+ "triggerOptions": triggerOptions,
+ "ingestMethod": cluster.Enums.IngestMethodsEnum.kCMAFIngest,
+ "containerOptions": {
+ "containerType": cluster.Enums.ContainerFormatEnum.kCmaf,
+ "CMAFContainerOptions": {"CMAFInterface": cluster.Enums.CMAFInterfaceEnum.kInterface1, "chunkDuration": 4, "segmentDuration": 3,
+ "sessionGroup": 3, "trackName": ""},
+ },
+ "expiryTime": 5,
+ }
+ ),
+ endpoint=endpoint,
+ )
+ return Status.Success
+ except InteractionModelError as e:
+ asserts.assert_not_equal(e.status, Status.Success, "Unexpected error returned")
+ pass
+
+ async def check_and_delete_all_push_av_transports(self, endpoint, attribute):
+ pvcluster = Clusters.PushAvStreamTransport
+
+ transportConfigs = await self.read_pavst_attribute_expect_success(
+ endpoint,
+ attribute.CurrentConnections,
+ )
+
+ for config in transportConfigs:
+ if config.connectionID != 0:
+ try:
+ await self.send_single_cmd(
+ cmd=pvcluster.Commands.DeallocatePushTransport(
+ connectionID=config.connectionID
+ ),
+ endpoint=endpoint,
+ )
+ except InteractionModelError as e:
+ asserts.assert_true(
+ e.status == Status.Success, "Unexpected error returned"
+ )
+ pass
+
+ return Status.Success
+
+ async def psvt_modify_push_transport(self, cmd, devCtrl=None):
+ endpoint = self.get_endpoint(default=1)
+ dev_ctrl = self.default_controller
+ if (devCtrl is not None):
+ dev_ctrl = devCtrl
+ try:
+ await self.send_single_cmd(cmd=cmd, endpoint=endpoint, dev_ctrl=dev_ctrl)
+ return Status.Success
+ except InteractionModelError as e:
+ asserts.assert_true(
+ e.status == Status.NotFound, "Unexpected error returned"
+ )
+ return e.status
+ pass
+
+ async def psvt_deallocate_push_transport(self, cmd, devCtrl=None):
+ endpoint = self.get_endpoint(default=1)
+ dev_ctrl = self.default_controller
+ if (devCtrl is not None):
+ dev_ctrl = devCtrl
+ try:
+ await self.send_single_cmd(cmd=cmd, endpoint=endpoint, dev_ctrl=dev_ctrl)
+ return Status.Success
+ except InteractionModelError as e:
+ asserts.assert_true(
+ e.status == Status.NotFound, "Unexpected error returned"
+ )
+ return e.status
+ pass
+
+ async def psvt_set_transport_status(self, cmd, devCtrl=None):
+ endpoint = self.get_endpoint(default=1)
+ dev_ctrl = self.default_controller
+ if (devCtrl is not None):
+ dev_ctrl = devCtrl
+ try:
+ await self.send_single_cmd(cmd=cmd, endpoint=endpoint, dev_ctrl=dev_ctrl)
+ return Status.Success
+ except InteractionModelError as e:
+ asserts.assert_true(
+ e.status == Status.NotFound, "Unexpected error returned"
+ )
+ return e.status
+ pass
+
+ async def psvt_find_transport(self, cmd, expected_connectionID=None, devCtrl=None):
+ endpoint = self.get_endpoint(default=1)
+ dev_ctrl = self.default_controller
+ if (devCtrl is not None):
+ dev_ctrl = devCtrl
+ try:
+ status = await self.send_single_cmd(cmd=cmd, endpoint=endpoint, dev_ctrl=dev_ctrl)
+ asserts.assert_equal(
+ status.transportConfigurations[0].connectionID, expected_connectionID, "Unexpected connection ID returned"
+ )
+ return Status.Success
+ except InteractionModelError as e:
+ asserts.assert_true(
+ e.status == Status.NotFound, "Unexpected error returned"
+ )
+ return e.status
+ pass
+
+ async def psvt_manually_trigger_transport(self, cmd, expected_cluster_status=None, devCtrl=None):
+ endpoint = self.get_endpoint(default=1)
+ dev_ctrl = self.default_controller
+ if (devCtrl is not None):
+ dev_ctrl = devCtrl
+ try:
+ await self.send_single_cmd(cmd=cmd, endpoint=endpoint, dev_ctrl=dev_ctrl)
+ return Status.Success
+ except InteractionModelError as e:
+ if (expected_cluster_status is not None):
+ asserts.assert_true(
+ e.clusterStatus == expected_cluster_status, "Unexpected error returned"
+ )
+ return e.clusterStatus
+ else:
+ asserts.assert_true(
+ e.status == Status.NotFound, "Unexpected error returned"
+ )
+ return e.status
+ pass
+
+ async def psvt_create_test_harness_controller(self):
+ self.th1 = self.default_controller
+ self.discriminator = random.randint(0, 4095)
+ params = await self.th1.OpenCommissioningWindow(
+ nodeid=self.dut_node_id, timeout=900, iteration=10000, discriminator=self.discriminator, option=1)
+
+ th2_certificate_authority = (
+ self.certificate_authority_manager.NewCertificateAuthority()
+ )
+ th2_fabric_admin = th2_certificate_authority.NewFabricAdmin(
+ vendorId=0xFFF1, fabricId=self.th1.fabricId + 1
+ )
+
+ self.th2 = th2_fabric_admin.NewController(
+ nodeId=2, useTestCommissioner=True)
+
+ setupPinCode = params.setupPinCode
+
+ await self.th2.CommissionOnNetwork(
+ nodeId=self.dut_node_id, setupPinCode=setupPinCode,
+ filterType=ChipDeviceCtrl.DiscoveryFilterType.LONG_DISCRIMINATOR, filter=self.discriminator)
+
+ return self.th2
+
+ async def read_currentfabricindex(self, th: ChipDeviceCtrl) -> int:
+ cluster = Clusters.Objects.OperationalCredentials
+ attribute = Clusters.OperationalCredentials.Attributes.CurrentFabricIndex
+ current_fabric_index = await self.read_single_attribute_check_success(dev_ctrl=th, endpoint=0, cluster=cluster, attribute=attribute)
+ return current_fabric_index
+
+ async def psvt_remove_current_fabric(self, devCtrl):
+ fabric_idx_cr2_2 = await self.read_currentfabricindex(th=devCtrl)
+ removeFabricCmd2 = Clusters.OperationalCredentials.Commands.RemoveFabric(fabric_idx_cr2_2)
+ resp = await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=removeFabricCmd2)
+ return resp
+ asserts.assert_equal(
+ resp.statusCode, Clusters.OperationalCredentials.Enums.NodeOperationalCertStatusEnum.kOk, "Expected removal of TH2's fabric to succeed")
diff --git a/src/python_testing/TC_PAVST_2_4.py b/src/python_testing/TC_PAVST_2_4.py
new file mode 100644
index 0000000..e3eec6a
--- /dev/null
+++ b/src/python_testing/TC_PAVST_2_4.py
@@ -0,0 +1,202 @@
+#
+# Copyright (c) 2025 Project CHIP Authors
+# All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# See https://github.com/project-chip/connectedhomeip/blob/master/docs/testing/python.md#defining-the-ci-test-arguments
+# for details about the block below.
+#
+# === BEGIN CI TEST ARGUMENTS ===
+# test-runner-runs:
+# run1:
+# app: ${CAMERA_APP}
+# app-args: --discriminator 1234 --KVS kvs1 --trace-to json:${TRACE_APP}.json
+# script-args: >
+# --storage-path admin_storage.json
+# --commissioning-method on-network
+# --discriminator 1234
+# --passcode 20202021
+# --PICS src/app/tests/suites/certification/ci-pics-values
+# --trace-to json:${TRACE_TEST_JSON}.json
+# --trace-to perfetto:${TRACE_TEST_PERFETTO}.perfetto
+# --endpoint 1
+# factory-reset: true
+# quiet: true
+# === END CI TEST ARGUMENTS ===
+
+import logging
+
+from mobly import asserts
+from TC_PAVSTTestBase import PAVSTTestBase
+
+import matter.clusters as Clusters
+from matter.interaction_model import Status
+from matter.testing.matter_testing import MatterBaseTest, TestStep, default_matter_test_main, has_cluster, run_if_endpoint_matches
+
+logger = logging.getLogger(__name__)
+
+
+class TC_PAVST_2_4(MatterBaseTest, PAVSTTestBase):
+ def desc_TC_PAVST_2_4(self) -> str:
+ return "[TC-PAVST-2.4] Attributes with Server as DUT"
+
+ def pics_TC_PAVST_2_4(self):
+ return ["PAVST.S"]
+
+ def steps_TC_PAVST_2_4(self) -> list[TestStep]:
+ return [
+ TestStep(
+ 1,
+ "TH1 executes step 1-5 of TC-PAVST-2.3 to allocate a PushAV transport.",
+ "Verify successful completion of all steps.",
+ ),
+ TestStep(
+ 2,
+ "TH1 Reads CurrentConnections attribute from PushAV Stream Transport Cluster on DUT over a large-payload session.",
+ "Verify the number of PushAV Connections in the list is 1. Store the TransportOptions and ConnectionID in the corresponding TransportConfiguration as aTransportOptions and aConnectionID.",
+ ),
+ TestStep(
+ 3,
+ "TH1 sends the ModifyPushTransport command with ConnectionID != aConnectionID.",
+ "DUT responds with NOT_FOUND status code.",
+ ),
+ TestStep(
+ 4,
+ "TH2 sends the ModifyPushTransport command with ConnectionID = aConnectionID.",
+ "DUT responds with NOT_FOUND status code.",
+ ),
+ TestStep(
+ 5,
+ "TH1 sends the ModifyPushTransport command with ConnectionID != aConnectionID.",
+ "DUT responds with NOT_FOUND status code.",
+ ),
+ TestStep(
+ 6,
+ "TH1 Reads CurrentConnections attribute from PushAV Stream Transport Cluster on DUT over a large-payload session.",
+ "Verify the number of PushAV Connections in the list is 1. Store the TransportOptions and ConnectionID in the corresponding TransportConfiguration as aTransportOptions and aConnectionID.",
+ )
+ ]
+
+ @run_if_endpoint_matches(has_cluster(Clusters.PushAvStreamTransport))
+ async def test_TC_PAVST_2_4(self):
+ endpoint = self.get_endpoint(default=1)
+ pvcluster = Clusters.PushAvStreamTransport
+ pvattr = Clusters.PushAvStreamTransport.Attributes
+ aAllocatedVideoStreams = []
+ aAllocatedAudioStreams = []
+
+ aTransportOptions = ""
+ aConnectionID = ""
+
+ self.step(1)
+ # Commission DUT - already done
+ status = await self.check_and_delete_all_push_av_transports(endpoint, pvattr)
+ asserts.assert_equal(
+ status, Status.Success, "Status must be SUCCESS!"
+ )
+
+ aAllocatedVideoStreams = await self.allocate_one_video_stream()
+ asserts.assert_greater_equal(
+ len(aAllocatedVideoStreams),
+ 1,
+ "AllocatedVideoStreams must not be empty",
+ )
+
+ aAllocatedAudioStreams = await self.allocate_one_audio_stream()
+ asserts.assert_greater_equal(
+ len(aAllocatedAudioStreams),
+ 1,
+ "AllocatedAudioStreams must not be empty",
+ )
+
+ status = await self.allocate_one_pushav_transport(endpoint)
+ asserts.assert_equal(
+ status, Status.Success, "Push AV Transport should be allocated successfully"
+ )
+
+ self.step(2)
+ transportConfigs = await self.read_pavst_attribute_expect_success(endpoint,
+ pvattr.CurrentConnections,
+ )
+ asserts.assert_greater_equal(
+ len(transportConfigs), 1, "TransportConfigurations must not be empty!"
+ )
+ aTransportOptions = transportConfigs[0].transportOptions
+ aConnectionID = transportConfigs[0].connectionID
+
+ # TH1 sends command
+ self.step(3)
+ all_connectionID = [tc.connectionID for tc in transportConfigs]
+ max_connectionID = max(all_connectionID)
+ cmd = pvcluster.Commands.ModifyPushTransport(
+ connectionID=max_connectionID + 1,
+ transportOptions=aTransportOptions
+ )
+ status = await self.psvt_modify_push_transport(cmd)
+ asserts.assert_true(
+ status == Status.NotFound,
+ "DUT responds with NOT_FOUND status code.",
+ )
+
+ # TH2 sends command
+ self.step(4)
+ th2 = await self.psvt_create_test_harness_controller()
+
+ cmd = pvcluster.Commands.ModifyPushTransport(
+ connectionID=aConnectionID,
+ transportOptions=aTransportOptions
+ )
+ status = await self.psvt_modify_push_transport(cmd, devCtrl=th2)
+ asserts.assert_true(
+ status == Status.NotFound,
+ "DUT responds with NOT_FOUND status code.",
+ )
+
+ resp = await self.psvt_remove_current_fabric(th2)
+ asserts.assert_equal(
+ resp.statusCode, Clusters.OperationalCredentials.Enums.NodeOperationalCertStatusEnum.kOk, "Expected removal of TH2's fabric to succeed")
+
+ self.step(5)
+ aModifiedTransportOptions = aTransportOptions.expiryTime
+ aModifiedTransportOptions = aModifiedTransportOptions + 120
+ aTransportOptions.expiryTime = aModifiedTransportOptions
+ cmd = pvcluster.Commands.ModifyPushTransport(
+ connectionID=aConnectionID,
+ transportOptions=aTransportOptions,
+ )
+ status = await self.psvt_modify_push_transport(cmd)
+ asserts.assert_true(
+ status == Status.Success,
+ "DUT responds with SUCCESS status code.")
+
+ self.step(6)
+ transportConfigs = await self.read_pavst_attribute_expect_success(
+ endpoint, pvattr.CurrentConnections
+ )
+ asserts.assert_greater_equal(
+ len(transportConfigs), 1, "TransportConfigurations must not be empty!"
+ )
+ result = (
+ transportConfigs[0].transportOptions.expiryTime
+ == aModifiedTransportOptions
+ and transportConfigs[0].connectionID == aConnectionID
+ )
+ asserts.assert_true(
+ result,
+ "ConnectionID or ExpiryTime should match as per the modified transport options"
+ )
+
+
+if __name__ == "__main__":
+ default_matter_test_main()
diff --git a/src/python_testing/TC_PAVST_2_5.py b/src/python_testing/TC_PAVST_2_5.py
new file mode 100644
index 0000000..60e6c93
--- /dev/null
+++ b/src/python_testing/TC_PAVST_2_5.py
@@ -0,0 +1,185 @@
+#
+# Copyright (c) 2025 Project CHIP Authors
+# All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# See https://github.com/project-chip/connectedhomeip/blob/master/docs/testing/python.md#defining-the-ci-test-arguments
+# for details about the block below.
+#
+# === BEGIN CI TEST ARGUMENTS ===
+# test-runner-runs:
+# run1:
+# app: ${CAMERA_APP}
+# app-args: --discriminator 1234 --KVS kvs1 --trace-to json:${TRACE_APP}.json
+# script-args: >
+# --storage-path admin_storage.json
+# --commissioning-method on-network
+# --discriminator 1234
+# --passcode 20202021
+# --PICS src/app/tests/suites/certification/ci-pics-values
+# --trace-to json:${TRACE_TEST_JSON}.json
+# --trace-to perfetto:${TRACE_TEST_PERFETTO}.perfetto
+# --endpoint 1
+# factory-reset: true
+# quiet: true
+# === END CI TEST ARGUMENTS ===
+
+import logging
+
+from mobly import asserts
+from TC_PAVSTTestBase import PAVSTTestBase
+
+import matter.clusters as Clusters
+from matter.interaction_model import Status
+from matter.testing.matter_testing import MatterBaseTest, TestStep, default_matter_test_main, has_cluster, run_if_endpoint_matches
+
+logger = logging.getLogger(__name__)
+
+
+class TC_PAVST_2_5(MatterBaseTest, PAVSTTestBase):
+ def desc_TC_PAVST_2_5(self) -> str:
+ return "[TC-PAVST-2.5] Attributes with Server as DUT"
+
+ def pics_TC_PAVST_2_5(self):
+ return ["PAVST.S"]
+
+ def steps_TC_PAVST_2_5(self) -> list[TestStep]:
+ return [
+ TestStep(
+ 1,
+ "TH1 executes step 1-5 of TC-PAVST-2.3 to allocate a PushAV transport.",
+ "Verify successful completion of all steps.",
+ ),
+ TestStep(
+ 2,
+ "TH1 Reads CurrentConnections attribute from PushAV Stream Transport Cluster on DUT over a large-payload session",
+ "Verify the number of PushAV Connections in the list is 1. Store the TransportStatus and ConnectionID in the corresponding TransportConfiguration as aTransportStatus and aConnectionID.",
+ ),
+ TestStep(
+ 3,
+ "TH1 sends the DeallocatePushTransport command with ConnectionID != aConnectionID.",
+ "DUT responds with NOT_FOUND status code.",
+ ),
+ TestStep(
+ 4,
+ "TH2 sends the DeallocatePushTransport command with ConnectionID = aConnectionID.",
+ "DUT responds with NOT_FOUND status code.",
+ ),
+ TestStep(
+ 5,
+ "TH1 sends the DeallocatePushTransport command with ConnectionID = aConnectionID.",
+ "DUT responds with SUCCESS status code.",
+ ),
+ TestStep(
+ 6,
+ "TH1 Reads CurrentConnections attribute from PushAV Stream Transport Cluster on DUT.",
+ "Verify the number of PushAV Connections is 0.",
+ ),
+ ]
+
+ @run_if_endpoint_matches(has_cluster(Clusters.PushAvStreamTransport))
+ async def test_TC_PAVST_2_5(self):
+ endpoint = self.get_endpoint(default=1)
+ pvcluster = Clusters.PushAvStreamTransport
+ pvattr = Clusters.PushAvStreamTransport.Attributes
+ aAllocatedVideoStreams = []
+ aAllocatedAudioStreams = []
+
+ aConnectionID = ""
+
+ self.step(1)
+ # Commission DUT - already done
+ status = await self.check_and_delete_all_push_av_transports(endpoint, pvattr)
+ asserts.assert_equal(
+ status, Status.Success, "Status must be SUCCESS!"
+ )
+
+ aAllocatedVideoStreams = await self.allocate_one_video_stream()
+ asserts.assert_greater_equal(
+ len(aAllocatedVideoStreams),
+ 1,
+ "AllocatedVideoStreams must not be empty",
+ )
+
+ aAllocatedAudioStreams = await self.allocate_one_audio_stream()
+ asserts.assert_greater_equal(
+ len(aAllocatedAudioStreams),
+ 1,
+ "AllocatedAudioStreams must not be empty",
+ )
+
+ status = await self.allocate_one_pushav_transport(endpoint)
+ asserts.assert_equal(
+ status, Status.Success, "Push AV Transport should be allocated successfully"
+ )
+
+ self.step(2)
+ transportConfigs = await self.read_pavst_attribute_expect_success(endpoint,
+ pvattr.CurrentConnections,
+ )
+ asserts.assert_greater_equal(
+ len(transportConfigs), 1, "TransportConfigurations must not be empty!"
+ )
+ aConnectionID = transportConfigs[0].connectionID
+
+ # TH1 sends command
+ self.step(3)
+ all_connectionID = [tc.connectionID for tc in transportConfigs]
+ max_connectionID = max(all_connectionID)
+ cmd = pvcluster.Commands.DeallocatePushTransport(
+ connectionID=max_connectionID + 1
+ )
+ status = await self.psvt_deallocate_push_transport(cmd)
+ asserts.assert_true(
+ status == Status.NotFound,
+ "DUT responds with NOT_FOUND status code.",
+ )
+
+ # TH2 sends command
+ self.step(4)
+ # Establishing TH2 controller
+ th2 = await self.psvt_create_test_harness_controller()
+ cmd = pvcluster.Commands.DeallocatePushTransport(
+ connectionID=aConnectionID,
+ )
+ status = await self.psvt_deallocate_push_transport(cmd, devCtrl=th2)
+ asserts.assert_true(
+ status == Status.NotFound,
+ "DUT responds with NOT_FOUND status code.",
+ )
+
+ resp = await self.psvt_remove_current_fabric(th2)
+ asserts.assert_equal(
+ resp.statusCode, Clusters.OperationalCredentials.Enums.NodeOperationalCertStatusEnum.kOk, "Expected removal of TH2's fabric to succeed")
+
+ self.step(5)
+ cmd = pvcluster.Commands.DeallocatePushTransport(
+ connectionID=aConnectionID
+ )
+ status = await self.psvt_deallocate_push_transport(cmd)
+ asserts.assert_true(
+ status == Status.Success,
+ "DUT responds with SUCCESS status code.")
+
+ self.step(6)
+ transportConfigs = await self.read_pavst_attribute_expect_success(
+ endpoint, pvattr.CurrentConnections
+ )
+ asserts.assert_equal(
+ len(transportConfigs), 0, "TransportConfigurations must not be empty!"
+ )
+
+
+if __name__ == "__main__":
+ default_matter_test_main()
diff --git a/src/python_testing/TC_PAVST_2_6.py b/src/python_testing/TC_PAVST_2_6.py
new file mode 100644
index 0000000..05a187f
--- /dev/null
+++ b/src/python_testing/TC_PAVST_2_6.py
@@ -0,0 +1,195 @@
+#
+# Copyright (c) 2025 Project CHIP Authors
+# All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# See https://github.com/project-chip/connectedhomeip/blob/master/docs/testing/python.md#defining-the-ci-test-arguments
+# for details about the block below.
+#
+# === BEGIN CI TEST ARGUMENTS ===
+# test-runner-runs:
+# run1:
+# app: ${CAMERA_APP}
+# app-args: --discriminator 1234 --KVS kvs1 --trace-to json:${TRACE_APP}.json
+# script-args: >
+# --storage-path admin_storage.json
+# --commissioning-method on-network
+# --discriminator 1234
+# --passcode 20202021
+# --PICS src/app/tests/suites/certification/ci-pics-values
+# --trace-to json:${TRACE_TEST_JSON}.json
+# --trace-to perfetto:${TRACE_TEST_PERFETTO}.perfetto
+# --endpoint 1
+# factory-reset: true
+# quiet: true
+# === END CI TEST ARGUMENTS ===
+
+import logging
+
+from mobly import asserts
+from TC_PAVSTTestBase import PAVSTTestBase
+
+import matter.clusters as Clusters
+from matter.interaction_model import Status
+from matter.testing.matter_testing import MatterBaseTest, TestStep, default_matter_test_main, has_cluster, run_if_endpoint_matches
+
+logger = logging.getLogger(__name__)
+
+
+class TC_PAVST_2_6(MatterBaseTest, PAVSTTestBase):
+ def desc_TC_PAVST_2_6(self) -> str:
+ return "[TC-PAVST-2.5] Attributes with Server as DUT"
+
+ def pics_TC_PAVST_2_6(self):
+ return ["PAVST.S"]
+
+ def steps_TC_PAVST_2_6(self) -> list[TestStep]:
+ return [
+ TestStep(
+ 1,
+ "TH1 executes step 1-5 of TC-PAVST-2.3 to allocate a PushAV transport.",
+ "Verify successful completion of all steps.",
+ ),
+ TestStep(
+ 2,
+ "TH1 Reads CurrentConnections attribute from PushAV Stream Transport Cluster on DUT over a large-payload session",
+ "Verify the number of PushAV Connections in the list is 1. Store the TransportStatus and ConnectionID in the corresponding TransportConfiguration as aTransportStatus and aConnectionID.",
+ ),
+ TestStep(
+ 3,
+ "TH1 sends the DeallocatePushTransport command with ConnectionID != aConnectionID.",
+ "DUT responds with NOT_FOUND status code.",
+ ),
+ TestStep(
+ 4,
+ "TH2 sends the DeallocatePushTransport command with ConnectionID = aConnectionID.",
+ "DUT responds with NOT_FOUND status code.",
+ ),
+ TestStep(
+ 5,
+ "TH1 sends the DeallocatePushTransport command with ConnectionID = aConnectionID.",
+ "DUT responds with SUCCESS status code.",
+ ),
+ TestStep(
+ 6,
+ "TH1 Reads CurrentConnections attribute from PushAV Stream Transport Cluster on DUT.",
+ "Verify the number of PushAV Connections is 0.",
+ )
+ ]
+
+ @run_if_endpoint_matches(has_cluster(Clusters.PushAvStreamTransport))
+ async def test_TC_PAVST_2_6(self):
+ endpoint = self.get_endpoint(default=1)
+ pvcluster = Clusters.PushAvStreamTransport
+ pvattr = Clusters.PushAvStreamTransport.Attributes
+ aAllocatedVideoStreams = []
+ aAllocatedAudioStreams = []
+
+ aConnectionID = ""
+ aTransportStatus = ""
+
+ self.step(1)
+ # Commission DUT - already done
+ status = await self.check_and_delete_all_push_av_transports(endpoint, pvattr)
+ asserts.assert_equal(
+ status, Status.Success, "Status must be SUCCESS!"
+ )
+
+ aAllocatedVideoStreams = await self.allocate_one_video_stream()
+ asserts.assert_greater_equal(
+ len(aAllocatedVideoStreams),
+ 1,
+ "AllocatedVideoStreams must not be empty",
+ )
+
+ aAllocatedAudioStreams = await self.allocate_one_audio_stream()
+ asserts.assert_greater_equal(
+ len(aAllocatedAudioStreams),
+ 1,
+ "AllocatedAudioStreams must not be empty",
+ )
+
+ status = await self.allocate_one_pushav_transport(endpoint)
+ asserts.assert_equal(
+ status, Status.Success, "Push AV Transport should be allocated successfully"
+ )
+
+ self.step(2)
+ transportConfigs = await self.read_pavst_attribute_expect_success(endpoint,
+ pvattr.CurrentConnections,
+ )
+ asserts.assert_greater_equal(
+ len(transportConfigs), 1, "TransportConfigurations must not be empty!"
+ )
+ aConnectionID = transportConfigs[0].connectionID
+ aTransportStatus = transportConfigs[0].transportStatus
+
+ # TH1 sends command
+ self.step(3)
+ all_connectionID = [tc.connectionID for tc in transportConfigs]
+ max_connectionID = max(all_connectionID)
+ cmd = pvcluster.Commands.SetTransportStatus(
+ connectionID=max_connectionID + 1,
+ transportStatus=aTransportStatus
+ )
+ status = await self.psvt_set_transport_status(cmd)
+ asserts.assert_true(
+ status == Status.NotFound,
+ "DUT responds with NOT_FOUND status code.",
+ )
+
+ # TH2 sends command
+ self.step(4)
+ # Establishing TH2 controller
+ th2 = await self.psvt_create_test_harness_controller()
+ cmd = pvcluster.Commands.SetTransportStatus(
+ connectionID=aConnectionID,
+ transportStatus=aTransportStatus
+ )
+ status = await self.psvt_set_transport_status(cmd, devCtrl=th2)
+ asserts.assert_true(
+ status == Status.NotFound,
+ "DUT responds with NOT_FOUND status code.",
+ )
+
+ resp = await self.psvt_remove_current_fabric(th2)
+ asserts.assert_equal(
+ resp.statusCode, Clusters.OperationalCredentials.Enums.NodeOperationalCertStatusEnum.kOk, "Expected removal of TH2's fabric to succeed")
+
+ self.step(5)
+ cmd = pvcluster.Commands.SetTransportStatus(
+ connectionID=aConnectionID,
+ transportStatus=not aTransportStatus
+ )
+ status = await self.psvt_set_transport_status(cmd)
+ asserts.assert_true(
+ status == Status.Success,
+ "DUT responds with SUCCESS status code.")
+
+ self.step(6)
+ transportConfigs = await self.read_pavst_attribute_expect_success(
+ endpoint, pvattr.CurrentConnections
+ )
+ asserts.assert_greater_equal(
+ len(transportConfigs), 1, "TransportConfigurations must be 1!"
+ )
+ asserts.assert_true(
+ transportConfigs[0].transportStatus
+ == (not aTransportStatus),
+ "Transport Status must be same the modified one",
+ )
+
+
+if __name__ == "__main__":
+ default_matter_test_main()
diff --git a/src/python_testing/TC_PAVST_2_7.py b/src/python_testing/TC_PAVST_2_7.py
new file mode 100644
index 0000000..315e45f
--- /dev/null
+++ b/src/python_testing/TC_PAVST_2_7.py
@@ -0,0 +1,300 @@
+#
+# Copyright (c) 2025 Project CHIP Authors
+# All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# See https://github.com/project-chip/connectedhomeip/blob/master/docs/testing/python.md#defining-the-ci-test-arguments
+# for details about the block below.
+#
+# === BEGIN CI TEST ARGUMENTS ===
+# test-runner-runs:
+# run1:
+# app: ${CAMERA_APP}
+# app-args: --discriminator 1234 --KVS kvs1 --trace-to json:${TRACE_APP}.json
+# script-args: >
+# --storage-path admin_storage.json
+# --commissioning-method on-network
+# --discriminator 1234
+# --passcode 20202021
+# --PICS src/app/tests/suites/certification/ci-pics-values
+# --trace-to json:${TRACE_TEST_JSON}.json
+# --trace-to perfetto:${TRACE_TEST_PERFETTO}.perfetto
+# --endpoint 1
+# factory-reset: true
+# quiet: true
+# === END CI TEST ARGUMENTS ===
+
+import logging
+
+from mobly import asserts
+from TC_PAVSTTestBase import PAVSTTestBase
+
+import matter.clusters as Clusters
+from matter.interaction_model import Status
+from matter.testing.matter_testing import MatterBaseTest, TestStep, default_matter_test_main, has_cluster, run_if_endpoint_matches
+
+logger = logging.getLogger(__name__)
+
+
+class TC_PAVST_2_7(MatterBaseTest, PAVSTTestBase):
+ def TC_PAVST_2_7(self) -> str:
+ return "[TC-PAVST-2.5] Attributes with Server as DUT"
+
+ def pics_TC_PAVST_2_7(self):
+ return ["PAVST.S"]
+
+ def steps_TC_PAVST_2_7(self) -> list[TestStep]:
+ return [
+ TestStep(
+ 1,
+ "TH1 executes step 1-5 of TC-PAVST-2.3 to allocate a PushAV transport with TriggerType = Continuous.",
+ "Verify successful completion of all steps.",
+ ),
+ TestStep(
+ 2,
+ "TH1 Reads CurrentConnections attribute from PushAV Stream Transport Cluster on DUT over a large-payload session",
+ "Verify the number of PushAV Connections in the list is 1. Store the TransportStatus and ConnectionID in the corresponding TransportConfiguration as aTransportStatus and aConnectionID.",
+ ),
+ TestStep(
+ 3,
+ "TH1 sends the ManuallyTriggerTransport command with ConnectionID != aConnectionID.",
+ "DUT responds with NOT_FOUND status code.",
+ ),
+ TestStep(
+ 4,
+ "TH2 sends the ManuallyTriggerTransport command with ConnectionID = aConnectionID.",
+ "DUT responds with NOT_FOUND status code.",
+ ),
+ TestStep(
+ 5,
+ "TH1 sends the SetTransportStatus command with ConnectionID = aConnectionID and TransportStatus = Inactive.",
+ "DUT responds with SUCCESS status code.",
+ ),
+ TestStep(
+ 6,
+ "TH1 sends the ManuallyTriggerTransport command with ConnectionID = aConnectionID.",
+ "DUT responds with InvalidTransportStatus.",
+ ),
+ TestStep(
+ 7,
+ "TH1 sends the SetTransportStatus command with ConnectionID = aConnectionID and TransportStatus = Active.",
+ "DUT responds with SUCCESS status code.",
+ ),
+ TestStep(
+ 8,
+ "TH1 sends the ManuallyTriggerTransport command with ConnectionID = aConnectionID.",
+ "DUT responds with InvalidTriggerType.",
+ ),
+ TestStep(
+ 9,
+ "TH1 sends the DeallocatePushTransport command with ConnectionID = aConnectionID.",
+ "DUT responds with SUCCESS status code.",
+ ),
+ TestStep(
+ 10,
+ "TH1 executes step 1-5 of TC-PAVST-2.3 to allocate a PushAV transport with TriggerType = Command.",
+ "Verify successful completion of all steps.",
+ ),
+ TestStep(
+ 11,
+ "TH1 sends the SetTransportStatus command with ConnectionID = aConnectionID and TransportStatus = Active.",
+ "DUT responds with SUCCESS status code.",
+ ),
+ TestStep(
+ 12,
+ "TH1 sends the ManuallyTriggerTransport command with ConnectionID = aConnectionID.",
+ "DUT responds with SUCCESS status code.",
+ ),
+ ]
+
+ @run_if_endpoint_matches(has_cluster(Clusters.PushAvStreamTransport))
+ async def test_TC_PAVST_2_7(self):
+ endpoint = self.get_endpoint(default=1)
+ pvcluster = Clusters.PushAvStreamTransport
+ pvattr = Clusters.PushAvStreamTransport.Attributes
+ aAllocatedVideoStreams = []
+ aAllocatedAudioStreams = []
+
+ aConnectionID = ""
+
+ self.step(1)
+ # Commission DUT - already done
+ status = await self.check_and_delete_all_push_av_transports(endpoint, pvattr)
+ asserts.assert_equal(
+ status, Status.Success, "Status must be SUCCESS!"
+ )
+
+ aAllocatedVideoStreams = await self.allocate_one_video_stream()
+ asserts.assert_greater_equal(
+ len(aAllocatedVideoStreams),
+ 1,
+ "AllocatedVideoStreams must not be empty",
+ )
+
+ aAllocatedAudioStreams = await self.allocate_one_audio_stream()
+ asserts.assert_greater_equal(
+ len(aAllocatedAudioStreams),
+ 1,
+ "AllocatedAudioStreams must not be empty",
+ )
+
+ status = await self.allocate_one_pushav_transport(endpoint, triggerType=pvcluster.Enums.TransportTriggerTypeEnum.kContinuous)
+ asserts.assert_equal(
+ status, Status.Success, "Push AV Transport should be allocated successfully"
+ )
+
+ self.step(2)
+ transportConfigs = await self.read_pavst_attribute_expect_success(endpoint,
+ pvattr.CurrentConnections,
+ )
+ asserts.assert_greater_equal(
+ len(transportConfigs), 1, "TransportConfigurations must not be empty!"
+ )
+ aConnectionID = transportConfigs[0].connectionID
+
+ # TH1 sends command
+ self.step(3)
+ all_connectionID = [tc.connectionID for tc in transportConfigs]
+ max_connectionID = max(all_connectionID)
+ cmd = pvcluster.Commands.ManuallyTriggerTransport(
+ connectionID=max_connectionID + 1,
+ activationReason=pvcluster.Enums.TriggerActivationReasonEnum.kEmergency
+ )
+ status = await self.psvt_manually_trigger_transport(cmd)
+ asserts.assert_true(
+ status == Status.NotFound,
+ "DUT responds with NOT_FOUND status code.",
+ )
+
+ # TH2 sends command
+ self.step(4)
+ # Establishing TH2 controller
+ th2 = await self.psvt_create_test_harness_controller()
+ cmd = pvcluster.Commands.ManuallyTriggerTransport(
+ connectionID=aConnectionID,
+ activationReason=pvcluster.Enums.TriggerActivationReasonEnum.kEmergency
+ )
+ status = await self.psvt_manually_trigger_transport(cmd, devCtrl=th2)
+ asserts.assert_true(
+ status == Status.NotFound,
+ "DUT responds with NOT_FOUND status code.",
+ )
+
+ resp = await self.psvt_remove_current_fabric(th2)
+ asserts.assert_equal(
+ resp.statusCode, Clusters.OperationalCredentials.Enums.NodeOperationalCertStatusEnum.kOk, "Expected removal of TH2's fabric to succeed")
+
+ self.step(5)
+ cmd = pvcluster.Commands.SetTransportStatus(
+ connectionID=aConnectionID,
+ transportStatus=pvcluster.Enums.TransportStatusEnum.kInactive
+ )
+ status = await self.psvt_set_transport_status(cmd)
+ asserts.assert_true(
+ status == Status.Success,
+ "DUT responds with SUCCESS status code.")
+
+ self.step(6)
+ cmd = pvcluster.Commands.ManuallyTriggerTransport(
+ connectionID=aConnectionID,
+ activationReason=pvcluster.Enums.TriggerActivationReasonEnum.kEmergency
+ )
+ status = await self.psvt_manually_trigger_transport(cmd, expected_cluster_status=pvcluster.Enums.StatusCodeEnum.kInvalidTransportStatus)
+ asserts.assert_true(
+ status == pvcluster.Enums.StatusCodeEnum.kInvalidTransportStatus,
+ "DUT must respond with TransportStatus Inactive.",
+ )
+
+ self.step(7)
+ cmd = pvcluster.Commands.SetTransportStatus(
+ connectionID=aConnectionID,
+ transportStatus=pvcluster.Enums.TransportStatusEnum.kActive
+ )
+ status = await self.psvt_set_transport_status(cmd)
+ asserts.assert_true(
+ status == Status.Success,
+ "DUT responds with SUCCESS status code.")
+
+ self.step(8)
+ cmd = pvcluster.Commands.ManuallyTriggerTransport(
+ connectionID=aConnectionID,
+ activationReason=pvcluster.Enums.TriggerActivationReasonEnum.kEmergency
+ )
+ status = await self.psvt_manually_trigger_transport(cmd, expected_cluster_status=pvcluster.Enums.StatusCodeEnum.kInvalidTriggerType)
+ asserts.assert_true(
+ status == pvcluster.Enums.StatusCodeEnum.kInvalidTriggerType,
+ "DUT must respond with InvalidTriggerType status code.",
+ )
+
+ self.step(9)
+ cmd = pvcluster.Commands.DeallocatePushTransport(
+ connectionID=aConnectionID
+ )
+ status = await self.psvt_deallocate_push_transport(cmd)
+ asserts.assert_true(
+ status == Status.Success,
+ "DUT responds with SUCCESS status code.")
+
+ self.step(10)
+ status = await self.check_and_delete_all_push_av_transports(endpoint, pvattr)
+ asserts.assert_equal(
+ status, Status.Success, "Status must be SUCCESS!"
+ )
+
+ aAllocatedVideoStreams = await self.allocate_one_video_stream()
+
+ aAllocatedAudioStreams = await self.allocate_one_audio_stream()
+
+ triggerOptions = {"triggerType": pvcluster.Enums.TransportTriggerTypeEnum.kCommand,
+ "maxPreRollLen": 4000, }
+
+ status = await self.allocate_one_pushav_transport(endpoint, trigger_Options=triggerOptions)
+ asserts.assert_equal(
+ status, Status.Success, "Push AV Transport should be allocated successfully"
+ )
+
+ transportConfigs = await self.read_pavst_attribute_expect_success(endpoint,
+ pvattr.CurrentConnections,
+ )
+ asserts.assert_greater_equal(
+ len(transportConfigs), 1, "TransportConfigurations must not be empty!"
+ )
+ aConnectionID = transportConfigs[0].connectionID
+
+ self.step(11)
+ cmd = pvcluster.Commands.SetTransportStatus(
+ connectionID=aConnectionID,
+ transportStatus=pvcluster.Enums.TransportStatusEnum.kActive
+ )
+ status = await self.psvt_set_transport_status(cmd)
+ asserts.assert_true(
+ status == Status.Success,
+ "DUT responds with SUCCESS status code.")
+
+ self.step(12)
+ timeControl = {"initialDuration": 1, "augmentationDuration": 1, "maxDuration": 1, "blindDuration": 1}
+ cmd = pvcluster.Commands.ManuallyTriggerTransport(
+ connectionID=aConnectionID,
+ activationReason=pvcluster.Enums.TriggerActivationReasonEnum.kUserInitiated,
+ timeControl=timeControl
+ )
+ status = await self.psvt_manually_trigger_transport(cmd)
+ asserts.assert_true(
+ status == Status.Success,
+ "DUT responds with Success status code.",
+ )
+
+
+if __name__ == "__main__":
+ default_matter_test_main()
diff --git a/src/python_testing/TC_PAVST_2_8.py b/src/python_testing/TC_PAVST_2_8.py
new file mode 100644
index 0000000..022891f
--- /dev/null
+++ b/src/python_testing/TC_PAVST_2_8.py
@@ -0,0 +1,185 @@
+#
+# Copyright (c) 2025 Project CHIP Authors
+# All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# See https://github.com/project-chip/connectedhomeip/blob/master/docs/testing/python.md#defining-the-ci-test-arguments
+# for details about the block below.
+#
+# === BEGIN CI TEST ARGUMENTS ===
+# test-runner-runs:
+# run1:
+# app: ${CAMERA_APP}
+# app-args: --discriminator 1234 --KVS kvs1 --trace-to json:${TRACE_APP}.json
+# script-args: >
+# --storage-path admin_storage.json
+# --commissioning-method on-network
+# --discriminator 1234
+# --passcode 20202021
+# --PICS src/app/tests/suites/certification/ci-pics-values
+# --trace-to json:${TRACE_TEST_JSON}.json
+# --trace-to perfetto:${TRACE_TEST_PERFETTO}.perfetto
+# --endpoint 1
+# factory-reset: true
+# quiet: true
+# === END CI TEST ARGUMENTS ===
+
+import logging
+
+from mobly import asserts
+from TC_PAVSTTestBase import PAVSTTestBase
+
+import matter.clusters as Clusters
+from matter.clusters.Types import Nullable
+from matter.interaction_model import Status
+from matter.testing.matter_testing import MatterBaseTest, TestStep, default_matter_test_main, has_cluster, run_if_endpoint_matches
+
+logger = logging.getLogger(__name__)
+
+
+class TC_PAVST_2_8(MatterBaseTest, PAVSTTestBase):
+ def desc_TC_PAVST_2_8(self) -> str:
+ return "[TC-PAVST-2.8] Attributes with Server as DUT"
+
+ def pics_TC_PAVST_2_8(self):
+ return ["PAVST.S"]
+
+ def steps_TC_PAVST_2_8(self) -> list[TestStep]:
+ return [
+ TestStep(
+ 1,
+ "TH1 executes step 1-5 of TC-PAVST-2.3 to allocate a PushAV transport.",
+ "Verify successful completion of all steps.",
+ ),
+ TestStep(
+ 2,
+ "TH1 Reads CurrentConnections attribute from PushAV Stream Transport Cluster on DUT over a large-payload session.",
+ "Verify the number of PushAV Connections in the list is 1. Store the TransportStatus and ConnectionID in the corresponding TransportConfiguration as aTransportStatus and aConnectionID. Store TriggerType as aTriggerType.",
+ ),
+ TestStep(
+ 3,
+ "TH1 sends the FindTransport command with ConnectionID != aConnectionID.",
+ "DUT responds with NOT_FOUND status code.",
+ ),
+ TestStep(
+ 4,
+ "TH2 sends the FindTransport command with ConnectionID = aConnectionID.",
+ "DUT responds with NOT_FOUND status code.",
+ ),
+ TestStep(
+ 5,
+ "TH1 sends the FindTransport command with ConnectionID = aConnectionID.",
+ "DUT responds with FindTransportResponse with the TransportConfiguration corresponding to aConnectionID.",
+ ),
+ TestStep(
+ 6,
+ "TH1 sends the FindTransport command with ConnectionID = Null.",
+ "DUT responds with FindTransportResponse with the TransportConfiguration corresponding to aConnectionID.",
+ ),
+ ]
+
+ @run_if_endpoint_matches(has_cluster(Clusters.PushAvStreamTransport))
+ async def test_TC_PAVST_2_8(self):
+ endpoint = self.get_endpoint(default=1)
+ pvcluster = Clusters.PushAvStreamTransport
+ pvattr = Clusters.PushAvStreamTransport.Attributes
+ aAllocatedVideoStreams = []
+ aAllocatedAudioStreams = []
+
+ aConnectionID = ""
+
+ self.step(1)
+ # Commission DUT - already done
+ status = await self.check_and_delete_all_push_av_transports(endpoint, pvattr)
+ asserts.assert_equal(
+ status, Status.Success, "Status must be SUCCESS!"
+ )
+
+ aAllocatedVideoStreams = await self.allocate_one_video_stream()
+ asserts.assert_greater_equal(
+ len(aAllocatedVideoStreams),
+ 1,
+ "AllocatedVideoStreams must not be empty",
+ )
+
+ aAllocatedAudioStreams = await self.allocate_one_audio_stream()
+ asserts.assert_greater_equal(
+ len(aAllocatedAudioStreams),
+ 1,
+ "AllocatedAudioStreams must not be empty",
+ )
+
+ status = await self.allocate_one_pushav_transport(endpoint)
+ asserts.assert_equal(
+ status, Status.Success, "Push AV Transport should be allocated successfully"
+ )
+
+ self.step(2)
+ transportConfigs = await self.read_pavst_attribute_expect_success(endpoint,
+ pvattr.CurrentConnections,
+ )
+ asserts.assert_greater_equal(
+ len(transportConfigs), 1, "TransportConfigurations must not be empty!"
+ )
+ aConnectionID = transportConfigs[0].connectionID
+
+ # TH1 sends command
+ self.step(3)
+ all_connectionID = [tc.connectionID for tc in transportConfigs]
+ max_connectionID = max(all_connectionID)
+ cmd = pvcluster.Commands.FindTransport(
+ connectionID=max_connectionID + 1
+ )
+ status = await self.psvt_find_transport(cmd)
+ asserts.assert_true(
+ status == Status.NotFound,
+ "DUT responds with NOT_FOUND status code.",
+ )
+
+ # TH2 sends command
+ self.step(4)
+ th2 = await self.psvt_create_test_harness_controller()
+ cmd = pvcluster.Commands.FindTransport(
+ connectionID=aConnectionID,
+ )
+ status = await self.psvt_find_transport(cmd, expected_connectionID=aConnectionID, devCtrl=th2)
+ asserts.assert_true(
+ status == Status.NotFound,
+ "DUT responds with NOT_FOUND status code.",
+ )
+ resp = await self.psvt_remove_current_fabric(th2)
+ asserts.assert_equal(
+ resp.statusCode, Clusters.OperationalCredentials.Enums.NodeOperationalCertStatusEnum.kOk, "Expected removal of TH2's fabric to succeed")
+
+ self.step(5)
+ cmd = pvcluster.Commands.FindTransport(
+ connectionID=aConnectionID,
+ )
+ status = await self.psvt_find_transport(cmd, expected_connectionID=aConnectionID)
+ asserts.assert_true(
+ status == Status.Success,
+ "DUT responds with SUCCESS status code.")
+
+ self.step(6)
+ cmd = pvcluster.Commands.FindTransport(
+ connectionID=Nullable(),
+ )
+ status = await self.psvt_find_transport(cmd, expected_connectionID=aConnectionID)
+ asserts.assert_true(
+ status == Status.Success,
+ "DUT responds with SUCCESS status code.")
+
+
+if __name__ == "__main__":
+ default_matter_test_main()
diff --git a/src/python_testing/test_metadata.yaml b/src/python_testing/test_metadata.yaml
index c7742fc..eee1871 100644
--- a/src/python_testing/test_metadata.yaml
+++ b/src/python_testing/test_metadata.yaml
@@ -123,6 +123,10 @@
- name: TC_WEBRTCPTestBase.py
reason:
Shared code for the WebRTC Provider Cluster, not a standalone test.
+ - name: TC_PAVSTTestBase.py
+ reason:
+ Shared code for Push AV Cluster tests (TC_PAVST_*), not a standalone
+ test.
- name: TC_PAVST_2_1.py
reason:
Depends on PushAV cluster. Will be enabled once local tests are fully
@@ -135,6 +139,26 @@
reason:
Depends on PushAV cluster. Will be enabled once local tests are fully
verified.
+ - name: TC_PAVST_2_4.py
+ reason:
+ Depends on PushAV cluster. Will be enabled once local tests are fully
+ verified.
+ - name: TC_PAVST_2_5.py
+ reason:
+ Depends on PushAV cluster. Will be enabled once local tests are fully
+ verified.
+ - name: TC_PAVST_2_6.py
+ reason:
+ Depends on PushAV cluster. Will be enabled once local tests are fully
+ verified.
+ - name: TC_PAVST_2_7.py
+ reason:
+ Depends on PushAV cluster. Will be enabled once local tests are fully
+ verified.
+ - name: TC_PAVST_2_8.py
+ reason:
+ Depends on PushAV cluster. Will be enabled once local tests are fully
+ verified.
- name: TC_PAVST_2_9.py
reason:
Depends on PushAV cluster. Will be enabled once local tests are fully