blob: fa84d0ebef1e72b4f57c60f8f27c13988ebc4c53 [file] [log] [blame]
#
# Copyright (c) 2025 Project CHIP Authors
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# See https://github.com/project-chip/connectedhomeip/blob/master/docs/testing/python.md#defining-the-ci-test-arguments
# for details about the block below.
#
# === BEGIN CI TEST ARGUMENTS ===
# test-runner-runs:
# run1:
# app: ${CAMERA_APP}
# app-args: --discriminator 1234 --KVS kvs1 --trace-to json:${TRACE_APP}.json
# script-args: >
# --storage-path admin_storage.json
# --commissioning-method on-network
# --discriminator 1234
# --passcode 20202021
# --PICS src/app/tests/suites/certification/ci-pics-values
# --trace-to json:${TRACE_TEST_JSON}.json
# --trace-to perfetto:${TRACE_TEST_PERFETTO}.perfetto
# --endpoint 1
# factory-reset: true
# quiet: true
# === END CI TEST ARGUMENTS ===
import logging
from mobly import asserts
import matter.clusters as Clusters
from matter.interaction_model import InteractionModelError, Status
from matter.testing.matter_testing import MatterBaseTest, TestStep, default_matter_test_main, has_feature, run_if_endpoint_matches
log = logging.getLogger(__name__)
class TC_AVSM_VideoStreamsPersistence(MatterBaseTest):
def desc_TC_AVSM_VideoStreamsPersistence(self) -> str:
return "[TC-AVSM-VideoStreamsPersistence] Validate Video Streams Persistence functionality with Server as DUT"
def pics_TC_AVSM_VideoStreamsPersistence(self):
return ["AVSM.S"]
def steps_TC_AVSM_VideoStreamsPersistence(self) -> list[TestStep]:
return [
TestStep("precondition", "Commissioning, already done", is_commissioning=True),
TestStep(1, "TH reads FeatureMap attribute from CameraAVStreamManagement Cluster on DUT", "Verify VDO is supported."),
TestStep(
2,
"TH reads AllocatedVideoStreams attribute from CameraAVStreamManagement Cluster on DUT",
"Verify the number of allocated video streams in the list is 0.",
),
TestStep(
3,
"TH reads StreamUsagePriorities attribute from CameraAVStreamManagement Cluster on DUT.",
"Store this value in aStreamUsagePriorities.",
),
TestStep(
4,
"TH reads RateDistortionTradeOffPoints attribute from CameraAVStreamManagement Cluster on DUT.",
"Store this value in aRateDistortionTradeOffPoints.",
),
TestStep(
5,
"TH reads MinViewportResolution attribute from CameraAVStreamManagement Cluster on DUT.",
"Store this value in aMinViewportRes.",
),
TestStep(
6,
"TH reads VideoSensorParams attribute from CameraAVStreamManagement Cluster on DUT.",
"Store this value in aVideoSensorParams.",
),
TestStep(
7,
"TH reads MaxEncodedPixelRate attribute from CameraAVStreamManagement Cluster on DUT.",
"Store this value in aMaxEncodedPixelRate.",
),
TestStep(
8,
"If the watermark feature is supported, set aWatermark to True, otherwise set this to Null.",
),
TestStep(
9,
"If the OSD feature is supported, set aOSD to True, otherwise set this to Null.",
),
TestStep(
10,
"TH sets StreamUsage from aStreamUsagePriorities. TH sets VideoCodec, MinResolution, MaxResolution, MinBitRate, MaxBitRate conforming with aRateDistortionTradeOffPoints. TH sets MinFrameRate, MaxFrameRate conforming with aVideoSensorParams. TH sets the KeyFrameInterval = 4000. TH sets WatermarkEnabled to aWatermark, TH also sets OSDEnabled to aOSD. TH sends the VideoStreamAllocate command with these arguments.",
"DUT responds with VideoStreamAllocateResponse command with a valid VideoStreamID.",
),
TestStep(
11,
"TH reads AllocatedVideoStreams attribute from CameraAVStreamManagement Cluster on DUT",
"Verify the number of allocated video streams in the list is 1.",
),
TestStep(
12,
"TH injects kFault_ClearInMemoryAllocatedVideoStreams on DUT.",
),
TestStep(
13,
"TH reads AllocatedVideoStreams attribute from CameraAVStreamManagement Cluster on DUT",
"Verify the number of allocated video streams in the list is 0.",
),
TestStep(
14,
"TH injects kFault_LoadPersistentCameraAVSMAttributes on DUT.",
),
TestStep(
15,
"TH reads AllocatedVideoStreams attribute from CameraAVStreamManagement Cluster on DUT",
"Verify the number of allocated video streams in the list is 1.",
"Verify the individual fields of allocated video stream is as was allocated in step 10.",
),
]
@run_if_endpoint_matches(
has_feature(Clusters.CameraAvStreamManagement, Clusters.CameraAvStreamManagement.Bitmaps.Feature.kVideo)
)
async def test_TC_AVSM_VideoStreamsPersistence(self):
endpoint = self.get_endpoint()
cluster = Clusters.CameraAvStreamManagement
attr = Clusters.CameraAvStreamManagement.Attributes
commands = Clusters.CameraAvStreamManagement.Commands
self.step("precondition")
# Commission DUT - already done
self.step(1)
aFeatureMap = await self.read_single_attribute_check_success(endpoint=endpoint, cluster=cluster, attribute=attr.FeatureMap)
log.info(f"Rx'd FeatureMap: {aFeatureMap}")
vdoSupport = aFeatureMap & cluster.Bitmaps.Feature.kVideo
asserts.assert_equal(vdoSupport, cluster.Bitmaps.Feature.kVideo, "Video Feature is not supported.")
self.step(2)
aAllocatedVideoStreams = await self.read_single_attribute_check_success(
endpoint=endpoint, cluster=cluster, attribute=attr.AllocatedVideoStreams
)
log.info(f"Rx'd AllocatedVideoStreams: {aAllocatedVideoStreams}")
asserts.assert_equal(len(aAllocatedVideoStreams), 0, "The number of allocated video streams in the list is not 0")
self.step(3)
aStreamUsagePriorities = await self.read_single_attribute_check_success(
endpoint=endpoint, cluster=cluster, attribute=attr.StreamUsagePriorities
)
log.info(f"Rx'd StreamUsagePriorities: {aStreamUsagePriorities}")
self.step(4)
aRateDistortionTradeOffPoints = await self.read_single_attribute_check_success(
endpoint=endpoint, cluster=cluster, attribute=attr.RateDistortionTradeOffPoints
)
log.info(f"Rx'd RateDistortionTradeOffPoints: {aRateDistortionTradeOffPoints}")
self.step(5)
aMinViewportRes = await self.read_single_attribute_check_success(
endpoint=endpoint, cluster=cluster, attribute=attr.MinViewportResolution
)
log.info(f"Rx'd MinViewportResolution: {aMinViewportRes}")
self.step(6)
aVideoSensorParams = await self.read_single_attribute_check_success(
endpoint=endpoint, cluster=cluster, attribute=attr.VideoSensorParams
)
log.info(f"Rx'd VideoSensorParams: {aVideoSensorParams}")
self.step(7)
aMaxEncodedPixelRate = await self.read_single_attribute_check_success(
endpoint=endpoint, cluster=cluster, attribute=attr.MaxEncodedPixelRate
)
log.info(f"Rx'd MaxEncodedPixelRate: {aMaxEncodedPixelRate}")
# Check for watermark and OSD features
self.step(8)
watermark = True if (aFeatureMap & cluster.Bitmaps.Feature.kWatermark) != 0 else None
self.step(9)
osd = True if (aFeatureMap & cluster.Bitmaps.Feature.kOnScreenDisplay) != 0 else None
self.step(10)
try:
asserts.assert_greater(len(aStreamUsagePriorities), 0, "StreamUsagePriorities is empty")
asserts.assert_greater(len(aRateDistortionTradeOffPoints), 0, "RateDistortionTradeOffPoints is empty")
videoStreamAllocateCmd = commands.VideoStreamAllocate(
streamUsage=aStreamUsagePriorities[0],
videoCodec=aRateDistortionTradeOffPoints[0].codec,
minFrameRate=30, # An acceptable value for min frame rate
maxFrameRate=aVideoSensorParams.maxFPS,
minResolution=aMinViewportRes,
maxResolution=cluster.Structs.VideoResolutionStruct(
width=aVideoSensorParams.sensorWidth, height=aVideoSensorParams.sensorHeight
),
minBitRate=aRateDistortionTradeOffPoints[0].minBitRate,
maxBitRate=aRateDistortionTradeOffPoints[0].minBitRate,
keyFrameInterval=4000,
watermarkEnabled=watermark,
OSDEnabled=osd,
)
videoStreamAllocateResponse = await self.send_single_cmd(endpoint=endpoint, cmd=videoStreamAllocateCmd)
log.info(f"Rx'd VideoStreamAllocateResponse: {videoStreamAllocateResponse}")
asserts.assert_is_not_none(
videoStreamAllocateResponse.videoStreamID, "VideoStreamAllocateResponse does not contain StreamID"
)
except InteractionModelError as e:
asserts.assert_equal(e.status, Status.Success, "Unexpected error returned")
self.step(11)
aAllocatedVideoStreams = await self.read_single_attribute_check_success(
endpoint=endpoint, cluster=cluster, attribute=attr.AllocatedVideoStreams
)
log.info(f"Rx'd AllocatedVideoStreams: {aAllocatedVideoStreams}")
asserts.assert_equal(len(aAllocatedVideoStreams), 1, "The number of allocated video streams in the list is not 1")
self.step(12)
log.info("Injecting kFault_ClearInMemoryAllocatedVideoStreams on DUT")
# --- Fault‑Injection cluster (mfg‑specific 0xFFF1_FC06) ---
# Use FailAtFault to activate the chip‑layer fault exactly once
#
# • faultType = kChipFault (0x03) – always used for CHIP faults
# • id = FaultInjection.Id.kFault_ClearInMemoryAllocatedVideoStreams
# • numCallsToSkip = 0 – trigger on the very next call
# • numCallsToFail = 1 – inject once, then auto‑clear
# • takeMutex = False – single‑threaded app, no lock needed
#
command = Clusters.FaultInjection.Commands.FailAtFault(
type=Clusters.FaultInjection.Enums.FaultType.kChipFault,
id=34, # kFault_ClearInMemoryAllocatedVideoStreams
numCallsToFail=1,
takeMutex=False,
)
await self.default_controller.SendCommand(
nodeId=self.dut_node_id,
endpoint=0, # Fault‑Injection cluster lives on EP0
payload=command,
)
self.step(13)
aAllocatedVideoStreams = await self.read_single_attribute_check_success(
endpoint=endpoint, cluster=cluster, attribute=attr.AllocatedVideoStreams
)
log.info(f"Rx'd AllocatedVideoStreams: {aAllocatedVideoStreams}")
asserts.assert_equal(len(aAllocatedVideoStreams), 0, "Allocated video streams is not empty")
self.step(14)
log.info("Injecting kFault_LoadPersistentCameraAVSMAttributes on DUT")
# --- Fault‑Injection cluster (mfg‑specific 0xFFF1_FC06) ---
# Use FailAtFault to activate the chip‑layer fault exactly once
#
# • faultType = kChipFault (0x03) – always used for CHIP faults
# • id = FaultInjection.Id.kFault_LoadPersistentCameraAVSMAttributes
# • numCallsToSkip = 0 – trigger on the very next call
# • numCallsToFail = 1 – inject once, then auto‑clear
# • takeMutex = False – single‑threaded app, no lock needed
#
command = Clusters.FaultInjection.Commands.FailAtFault(
type=Clusters.FaultInjection.Enums.FaultType.kChipFault,
id=37, # kFault_LoadPersistentCameraAVSMAttributes
numCallsToFail=1,
takeMutex=False,
)
await self.default_controller.SendCommand(
nodeId=self.dut_node_id,
endpoint=0, # Fault‑Injection cluster lives on EP0
payload=command,
)
self.step(15)
aAllocatedVideoStreams = await self.read_single_attribute_check_success(
endpoint=endpoint, cluster=cluster, attribute=attr.AllocatedVideoStreams
)
log.info(f"Rx'd AllocatedVideoStreams: {aAllocatedVideoStreams}")
asserts.assert_equal(len(aAllocatedVideoStreams), 1, "Allocated video streams is not empty")
##### Validate fields in Video Stream that was stored #####
asserts.assert_equal(aAllocatedVideoStreams[0].streamUsage, aStreamUsagePriorities[0], "Stream Usage does not match")
asserts.assert_equal(aAllocatedVideoStreams[0].videoCodec,
aRateDistortionTradeOffPoints[0].codec, "Video codec does not match")
asserts.assert_equal(aAllocatedVideoStreams[0].minFrameRate, 30, "MinFrameRate does not match")
asserts.assert_equal(aAllocatedVideoStreams[0].maxFrameRate, aVideoSensorParams.maxFPS, "MaxFrameRate does not match")
asserts.assert_equal(aAllocatedVideoStreams[0].minResolution, aMinViewportRes, "MinResolution does not match")
asserts.assert_equal(aAllocatedVideoStreams[0].maxResolution.width,
aVideoSensorParams.sensorWidth, "MaxResolution does not match")
asserts.assert_equal(aAllocatedVideoStreams[0].maxResolution.height,
aVideoSensorParams.sensorHeight, "MaxResolution does not match")
asserts.assert_equal(aAllocatedVideoStreams[0].minBitRate,
aRateDistortionTradeOffPoints[0].minBitRate, "MinBitRate does not match")
asserts.assert_equal(aAllocatedVideoStreams[0].maxBitRate,
aRateDistortionTradeOffPoints[0].minBitRate, "MaxBitRate does not match")
asserts.assert_equal(aAllocatedVideoStreams[0].keyFrameInterval, 4000, "KeyFrameInterval does not match")
# Clear all allocated streams
aAllocatedVideoStreams = await self.read_single_attribute_check_success(
endpoint=endpoint, cluster=cluster, attribute=attr.AllocatedVideoStreams
)
for stream in aAllocatedVideoStreams:
try:
await self.send_single_cmd(endpoint=endpoint, cmd=commands.VideoStreamDeallocate(videoStreamID=(stream.videoStreamID)))
except InteractionModelError as e:
asserts.assert_equal(e.status, Status.Success, "Unexpected error returned")
if __name__ == "__main__":
default_matter_test_main()