blob: b6ff460bedac95c3a81495aeca45c7b637353e5e [file] [log] [blame]
#
# Copyright (c) 2025 Project CHIP Authors
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import chip.clusters as Clusters
from chip.interaction_model import InteractionModelError, Status
from mobly import asserts
# Module-level logger used by the AVSMTestBase precondition helpers below.
logger = logging.getLogger(__name__)
class AVSMTestBase:
    """Shared precondition helpers for CameraAvStreamManagement cluster tests.

    The methods call ``self.get_endpoint``,
    ``self.read_single_attribute_check_success`` and ``self.send_single_cmd``,
    none of which are defined here, so this class is meant to be combined
    with a test class that provides them.
    """

    async def precondition_one_allocated_snapshot_stream(self) -> None:
        """Ensure at least one snapshot stream is allocated on the endpoint.

        Asserts that the Snapshot feature bit is set in FeatureMap. If
        AllocatedSnapshotStreams is already non-empty nothing else is done;
        otherwise a SnapshotStreamAllocate command is built from the first
        SnapshotCapabilities entry and sent, and the response is checked for
        a stream ID.
        """
        endpoint = self.get_endpoint(default=1)
        cluster = Clusters.CameraAvStreamManagement
        attr = Clusters.CameraAvStreamManagement.Attributes
        commands = Clusters.CameraAvStreamManagement.Commands

        # First verify that SNP is supported
        aFeatureMap = await self.read_single_attribute_check_success(
            endpoint=endpoint, cluster=cluster, attribute=attr.FeatureMap
        )
        logger.info(f"Rx'd FeatureMap: {aFeatureMap}")
        snpSupport = (aFeatureMap & cluster.Bitmaps.Feature.kSnapshot) > 0
        asserts.assert_true(snpSupport, "Snapshot Feature is not supported.")

        # Check if snapshot stream has already been allocated
        aAllocatedSnapshotStreams = await self.read_single_attribute_check_success(
            endpoint=endpoint, cluster=cluster, attribute=attr.AllocatedSnapshotStreams
        )
        logger.info(f"Rx'd AllocatedSnapshotStreams: {aAllocatedSnapshotStreams}")
        if len(aAllocatedSnapshotStreams) > 0:
            return

        # Allocate one for the test steps based on SnapshotCapabilities
        aSnapshotCapabilities = await self.read_single_attribute_check_success(
            endpoint=endpoint, cluster=cluster, attribute=attr.SnapshotCapabilities
        )
        logger.info(f"Rx'd SnapshotCapabilities: {aSnapshotCapabilities}")
        asserts.assert_greater(len(aSnapshotCapabilities), 0, "SnapshotCapabilities list is empty")

        # Watermark / OSD arguments are only supplied (as True) when the
        # matching feature bit is set; otherwise they are passed as None.
        watermark = True if (aFeatureMap & cluster.Bitmaps.Feature.kWatermark) != 0 else None
        osd = True if (aFeatureMap & cluster.Bitmaps.Feature.kOnScreenDisplay) != 0 else None

        capability = aSnapshotCapabilities[0]
        snpStreamAllocateCmd = commands.SnapshotStreamAllocate(
            imageCodec=capability.imageCodec,
            maxFrameRate=capability.maxFrameRate,
            # Min and max resolution are both pinned to the advertised one.
            minResolution=capability.resolution,
            maxResolution=capability.resolution,
            quality=90,
            watermarkEnabled=watermark,
            OSDEnabled=osd,
        )
        try:
            snpStreamAllocateResponse = await self.send_single_cmd(endpoint=endpoint, cmd=snpStreamAllocateCmd)
        except InteractionModelError as e:
            # Any error status from the command fails the precondition.
            asserts.assert_equal(e.status, Status.Success, "Unexpected error returned")
        else:
            logger.info(f"Rx'd SnapshotStreamAllocateResponse: {snpStreamAllocateResponse}")
            asserts.assert_is_not_none(
                snpStreamAllocateResponse.snapshotStreamID, "SnapshotStreamAllocateResponse does not contain StreamID"
            )

    async def precondition_one_allocated_audio_stream(self) -> None:
        """Ensure at least one audio stream is allocated on the endpoint.

        Asserts that the Audio feature bit is set in FeatureMap. If
        AllocatedAudioStreams is already non-empty nothing else is done;
        otherwise an AudioStreamAllocate command is built from
        MicrophoneCapabilities and the first StreamUsagePriorities entry and
        sent, and the response is checked for a stream ID.
        """
        endpoint = self.get_endpoint(default=1)
        cluster = Clusters.CameraAvStreamManagement
        attr = Clusters.CameraAvStreamManagement.Attributes
        commands = Clusters.CameraAvStreamManagement.Commands

        # First verify that ADO is supported
        aFeatureMap = await self.read_single_attribute_check_success(
            endpoint=endpoint, cluster=cluster, attribute=attr.FeatureMap
        )
        logger.info(f"Rx'd FeatureMap: {aFeatureMap}")
        adoSupport = aFeatureMap & cluster.Bitmaps.Feature.kAudio
        asserts.assert_equal(adoSupport, cluster.Bitmaps.Feature.kAudio, "Audio Feature is not supported.")

        # Check if audio stream has already been allocated
        aAllocatedAudioStreams = await self.read_single_attribute_check_success(
            endpoint=endpoint, cluster=cluster, attribute=attr.AllocatedAudioStreams
        )
        logger.info(f"Rx'd AllocatedAudioStreams: {aAllocatedAudioStreams}")
        if len(aAllocatedAudioStreams) > 0:
            return

        # Allocate one for the test steps based on MicrophoneCapabilities
        aMicrophoneCapabilities = await self.read_single_attribute_check_success(
            endpoint=endpoint, cluster=cluster, attribute=attr.MicrophoneCapabilities
        )
        logger.info(f"Rx'd MicrophoneCapabilities: {aMicrophoneCapabilities}")

        # Same attribute the video precondition reads, so both helpers agree.
        aStreamUsagePriorities = await self.read_single_attribute_check_success(
            endpoint=endpoint, cluster=cluster, attribute=attr.StreamUsagePriorities
        )
        logger.info(f"Rx'd StreamUsagePriorities : {aStreamUsagePriorities}")
        asserts.assert_greater(len(aStreamUsagePriorities), 0, "StreamUsagePriorities is empty")

        # The command below takes the first entry of each capability list;
        # fail with a clear message instead of an IndexError if one is empty.
        asserts.assert_greater(
            len(aMicrophoneCapabilities.supportedCodecs), 0, "MicrophoneCapabilities.supportedCodecs is empty"
        )
        asserts.assert_greater(
            len(aMicrophoneCapabilities.supportedSampleRates), 0, "MicrophoneCapabilities.supportedSampleRates is empty"
        )
        asserts.assert_greater(
            len(aMicrophoneCapabilities.supportedBitDepths), 0, "MicrophoneCapabilities.supportedBitDepths is empty"
        )

        adoStreamAllocateCmd = commands.AudioStreamAllocate(
            streamUsage=aStreamUsagePriorities[0],
            audioCodec=aMicrophoneCapabilities.supportedCodecs[0],
            channelCount=aMicrophoneCapabilities.maxNumberOfChannels,
            sampleRate=aMicrophoneCapabilities.supportedSampleRates[0],
            bitRate=1024,
            bitDepth=aMicrophoneCapabilities.supportedBitDepths[0],
        )
        try:
            audioStreamAllocateResponse = await self.send_single_cmd(endpoint=endpoint, cmd=adoStreamAllocateCmd)
        except InteractionModelError as e:
            # Any error status from the command fails the precondition.
            asserts.assert_equal(e.status, Status.Success, "Unexpected error returned")
        else:
            logger.info(f"Rx'd AudioStreamAllocateResponse: {audioStreamAllocateResponse}")
            asserts.assert_is_not_none(
                audioStreamAllocateResponse.audioStreamID, "AudioStreamAllocateResponse does not contain StreamID"
            )

    async def precondition_one_allocated_video_stream(self) -> None:
        """Ensure at least one video stream is allocated on the endpoint.

        Asserts that the Video feature bit is set in FeatureMap. If
        AllocatedVideoStreams is already non-empty nothing else is done;
        otherwise a VideoStreamAllocate command is built from
        StreamUsagePriorities, RateDistortionTradeOffPoints, MinViewport and
        VideoSensorParams and sent, and the response is checked for a
        stream ID.
        """
        endpoint = self.get_endpoint(default=1)
        cluster = Clusters.CameraAvStreamManagement
        attr = Clusters.CameraAvStreamManagement.Attributes
        commands = Clusters.CameraAvStreamManagement.Commands

        # First verify that VDO is supported
        aFeatureMap = await self.read_single_attribute_check_success(
            endpoint=endpoint, cluster=cluster, attribute=attr.FeatureMap
        )
        logger.info(f"Rx'd FeatureMap: {aFeatureMap}")
        vdoSupport = aFeatureMap & cluster.Bitmaps.Feature.kVideo
        asserts.assert_equal(vdoSupport, cluster.Bitmaps.Feature.kVideo, "Video Feature is not supported.")

        # Check if video stream has already been allocated
        aAllocatedVideoStreams = await self.read_single_attribute_check_success(
            endpoint=endpoint, cluster=cluster, attribute=attr.AllocatedVideoStreams
        )
        logger.info(f"Rx'd AllocatedVideoStreams: {aAllocatedVideoStreams}")
        if len(aAllocatedVideoStreams) > 0:
            return

        # Allocate one for the test steps
        aStreamUsagePriorities = await self.read_single_attribute_check_success(
            endpoint=endpoint, cluster=cluster, attribute=attr.StreamUsagePriorities
        )
        logger.info(f"Rx'd StreamUsagePriorities: {aStreamUsagePriorities}")
        aRateDistortionTradeOffPoints = await self.read_single_attribute_check_success(
            endpoint=endpoint, cluster=cluster, attribute=attr.RateDistortionTradeOffPoints
        )
        logger.info(f"Rx'd RateDistortionTradeOffPoints: {aRateDistortionTradeOffPoints}")
        aMinViewport = await self.read_single_attribute_check_success(
            endpoint=endpoint, cluster=cluster, attribute=attr.MinViewport
        )
        logger.info(f"Rx'd MinViewport: {aMinViewport}")
        aVideoSensorParams = await self.read_single_attribute_check_success(
            endpoint=endpoint, cluster=cluster, attribute=attr.VideoSensorParams
        )
        logger.info(f"Rx'd VideoSensorParams: {aVideoSensorParams}")
        # Read and logged only; the value is not used to build the command.
        aMaxEncodedPixelRate = await self.read_single_attribute_check_success(
            endpoint=endpoint, cluster=cluster, attribute=attr.MaxEncodedPixelRate
        )
        logger.info(f"Rx'd MaxEncodedPixelRate: {aMaxEncodedPixelRate}")

        # Watermark / OSD arguments are only supplied (as True) when the
        # matching feature bit is set; otherwise they are passed as None.
        watermark = True if (aFeatureMap & cluster.Bitmaps.Feature.kWatermark) != 0 else None
        osd = True if (aFeatureMap & cluster.Bitmaps.Feature.kOnScreenDisplay) != 0 else None

        asserts.assert_greater(len(aStreamUsagePriorities), 0, "StreamUsagePriorities is empty")
        asserts.assert_greater(len(aRateDistortionTradeOffPoints), 0, "RateDistortionTradeOffPoints is empty")

        tradeOffPoint = aRateDistortionTradeOffPoints[0]
        videoStreamAllocateCmd = commands.VideoStreamAllocate(
            streamUsage=aStreamUsagePriorities[0],
            videoCodec=tradeOffPoint.codec,
            minFrameRate=30,  # An acceptable value for min frame rate
            maxFrameRate=aVideoSensorParams.maxFPS,
            minResolution=aMinViewport,
            maxResolution=cluster.Structs.VideoResolutionStruct(
                width=aVideoSensorParams.sensorWidth, height=aVideoSensorParams.sensorHeight
            ),
            # Both bit-rate bounds are pinned to the trade-off point's
            # minimum bit rate.
            minBitRate=tradeOffPoint.minBitRate,
            maxBitRate=tradeOffPoint.minBitRate,
            minKeyFrameInterval=4000,
            maxKeyFrameInterval=4000,
            watermarkEnabled=watermark,
            OSDEnabled=osd,
        )
        try:
            videoStreamAllocateResponse = await self.send_single_cmd(endpoint=endpoint, cmd=videoStreamAllocateCmd)
        except InteractionModelError as e:
            # Any error status from the command fails the precondition.
            asserts.assert_equal(e.status, Status.Success, "Unexpected error returned")
        else:
            logger.info(f"Rx'd VideoStreamAllocateResponse: {videoStreamAllocateResponse}")
            asserts.assert_is_not_none(
                videoStreamAllocateResponse.videoStreamID, "VideoStreamAllocateResponse does not contain StreamID"
            )