# blob: f7898a0d661267f67e8dc1590216f7c7dabef402 [file] [log] [blame]
#
# Copyright (c) 2025 Project CHIP Authors
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import chip.clusters as Clusters
from chip.interaction_model import InteractionModelError, Status
from mobly import asserts
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
class AVSUMTestBase:
    """Shared helpers for Camera AV Settings User Level Management (AVSUM) tests.

    The methods here call ``self.read_single_attribute_check_success`` and
    ``self.send_single_cmd``, neither of which is defined in this class. They
    must be supplied by the class this one is combined with (presumably the
    Matter test base class - confirm against the concrete tests).
    """

    # Feature flags consulted by the validation helpers below. All default to
    # False; presumably the concrete test sets them from the DUT's FeatureMap
    # (that code is not in this class - confirm). has_feature_dptz and
    # has_feature_mpresets are declared here but not read by any method below.
    has_feature_dptz = False
    has_feature_mpan = False
    has_feature_mtilt = False
    has_feature_mzoom = False
    has_feature_mpresets = False

    def ptz_range_validation(self, mptz, tiltmin, tiltmax, panmin, panmax, zoommax):
        """Assert that each supported axis of ``mptz`` is present and in range.

        Only the axes whose ``has_feature_m*`` flag is set are checked.

        Args:
            mptz: Object exposing ``pan``, ``tilt`` and ``zoom`` attributes.
            tiltmin: Inclusive lower bound for tilt.
            tiltmax: Inclusive upper bound for tilt.
            panmin: Inclusive lower bound for pan.
            panmax: Inclusive upper bound for pan.
            zoommax: Inclusive upper bound for zoom.

        Raises:
            The mobly assertion failure when a checked value is missing or
            outside its bounds.
        """
        if self.has_feature_mpan:
            pan = mptz.pan
            # asserts.fail() always raises, so it acts as a guard clause.
            if pan is None:
                asserts.fail("Pan value missing when MPAN feature set")
            asserts.assert_greater_equal(pan, panmin, "Pan out of range of DUT defined values")
            asserts.assert_less_equal(pan, panmax, "Pan out of range of DUT defined values")

        if self.has_feature_mtilt:
            tilt = mptz.tilt
            if tilt is None:
                asserts.fail("Tilt value missing when MTILT feature set")
            asserts.assert_greater_equal(tilt, tiltmin, "Tilt out of range of DUT defined values")
            asserts.assert_less_equal(tilt, tiltmax, "Tilt out of range of DUT defined values")

        if self.has_feature_mzoom:
            zoom = mptz.zoom
            if zoom is None:
                asserts.fail("Zoom value missing when MZOOM feature set")
            # The lower zoom bound is the literal 1 rather than a caller-supplied value.
            asserts.assert_greater_equal(zoom, 1, "Zoom out of range of DUT defined values")
            asserts.assert_less_equal(zoom, zoommax, "Zoom out of range of DUT defined values")

    def verify_preset_matches(self, preset, position):
        """Assert that ``preset.settings`` equals ``position`` on every supported axis.

        Args:
            preset: Object whose ``settings`` attribute exposes ``pan``/``tilt``/``zoom``.
            position: Object exposing ``pan``/``tilt``/``zoom`` to compare against.
        """
        if self.has_feature_mpan:
            asserts.assert_equal(preset.settings.pan, position.pan)
        if self.has_feature_mtilt:
            asserts.assert_equal(preset.settings.tilt, position.tilt)
        if self.has_feature_mzoom:
            asserts.assert_equal(preset.settings.zoom, position.zoom)

    async def read_avsum_attribute_expect_success(self, endpoint, attribute):
        """Read ``attribute`` of the CameraAvSettingsUserLevelManagement cluster on ``endpoint``.

        Returns:
            Whatever ``self.read_single_attribute_check_success`` returns.
        """
        return await self.read_single_attribute_check_success(
            endpoint=endpoint,
            cluster=Clusters.CameraAvSettingsUserLevelManagement,
            attribute=attribute)

    async def read_avstr_attribute_expect_success(self, endpoint, attribute):
        """Read ``attribute`` of the CameraAvStreamManagement cluster on ``endpoint``.

        Returns:
            Whatever ``self.read_single_attribute_check_success`` returns.
        """
        return await self.read_single_attribute_check_success(
            endpoint=endpoint,
            cluster=Clusters.CameraAvStreamManagement,
            attribute=attribute)

    async def check_avsum_attribute(self, attribute, expected_value, endpoint):
        """Read an AVSUM attribute and assert that it equals ``expected_value``."""
        value = await self.read_avsum_attribute_expect_success(endpoint=endpoint, attribute=attribute)
        asserts.assert_equal(value, expected_value,
                             f"Unexpected '{attribute}' value - expected {expected_value}, was {value}")

    async def _send_command_expect_status(self, cmd, endpoint, expected_status: Status = Status.Success):
        """Send ``cmd`` to ``endpoint`` and assert that the outcome is ``expected_status``.

        If ``send_single_cmd`` raises InteractionModelError, the error's status
        must equal ``expected_status``. If it returns normally,
        ``expected_status`` must be ``Status.Success``.
        """
        try:
            await self.send_single_cmd(cmd=cmd, endpoint=endpoint)
        except InteractionModelError as e:
            asserts.assert_equal(e.status, expected_status, "Unexpected error returned")
            return
        asserts.assert_equal(expected_status, Status.Success,
                             "Command succeeded but an error status was expected")

    async def send_save_preset_command(self, endpoint, name: str, presetID: int = None, expected_status: Status = Status.Success):
        """Send MPTZSavePreset with ``name`` and ``presetID`` and check the resulting status."""
        await self._send_command_expect_status(
            cmd=Clusters.CameraAvSettingsUserLevelManagement.Commands.MPTZSavePreset(
                name=name,
                presetID=presetID),
            endpoint=endpoint,
            expected_status=expected_status)

    async def send_move_to_preset_command(self, endpoint, presetID, expected_status: Status = Status.Success):
        """Send MPTZMoveToPreset for ``presetID`` and check the resulting status."""
        await self._send_command_expect_status(
            cmd=Clusters.CameraAvSettingsUserLevelManagement.Commands.MPTZMoveToPreset(
                presetID=presetID),
            endpoint=endpoint,
            expected_status=expected_status)

    async def send_remove_preset_command(self, endpoint, presetID, expected_status: Status = Status.Success):
        """Send MPTZRemovePreset for ``presetID`` and check the resulting status."""
        await self._send_command_expect_status(
            cmd=Clusters.CameraAvSettingsUserLevelManagement.Commands.MPTZRemovePreset(
                presetID=presetID),
            endpoint=endpoint,
            expected_status=expected_status)

    async def send_dptz_set_viewport_command(self, endpoint, streamID, viewport, expected_status: Status = Status.Success):
        """Send DPTZSetViewport for video stream ``streamID`` and check the resulting status."""
        await self._send_command_expect_status(
            cmd=Clusters.CameraAvSettingsUserLevelManagement.Commands.DPTZSetViewport(
                videoStreamID=streamID,
                viewport=viewport),
            endpoint=endpoint,
            expected_status=expected_status)

    async def send_dptz_relative_move_command(self, endpoint, streamID, deltaX: int = None, deltaY: int = None,
                                              zoomDelta: int = None, expected_status: Status = Status.Success):
        """Send DPTZRelativeMove for video stream ``streamID`` and check the resulting status.

        ``deltaX``, ``deltaY`` and ``zoomDelta`` are passed through unchanged;
        each defaults to None.
        """
        await self._send_command_expect_status(
            cmd=Clusters.CameraAvSettingsUserLevelManagement.Commands.DPTZRelativeMove(
                videoStreamID=streamID,
                deltaX=deltaX,
                deltaY=deltaY,
                zoomDelta=zoomDelta),
            endpoint=endpoint,
            expected_status=expected_status)

    async def send_mptz_set_pan_position_command(self, endpoint, pan, expected_status: Status = Status.Success):
        """Send MPTZSetPosition with only ``pan`` set (tilt and zoom are None)."""
        await self.send_mptz_set_position_command(endpoint, pan, None, None, expected_status)

    async def send_mptz_set_tilt_position_command(self, endpoint, tilt, expected_status: Status = Status.Success):
        """Send MPTZSetPosition with only ``tilt`` set (pan and zoom are None)."""
        await self.send_mptz_set_position_command(endpoint, None, tilt, None, expected_status)

    async def send_mptz_set_zoom_position_command(self, endpoint, zoom, expected_status: Status = Status.Success):
        """Send MPTZSetPosition with only ``zoom`` set (pan and tilt are None)."""
        await self.send_mptz_set_position_command(endpoint, None, None, zoom, expected_status)

    async def send_null_mptz_set_position_command(self, endpoint, expected_status: Status = Status.Success):
        """Send MPTZSetPosition with pan, tilt and zoom all None."""
        await self.send_mptz_set_position_command(endpoint, None, None, None, expected_status)

    async def send_mptz_set_position_command(self, endpoint, pan, tilt, zoom, expected_status: Status = Status.Success):
        """Send MPTZSetPosition with the given ``pan``/``tilt``/``zoom`` and check the resulting status."""
        await self._send_command_expect_status(
            cmd=Clusters.CameraAvSettingsUserLevelManagement.Commands.MPTZSetPosition(
                pan=pan, tilt=tilt, zoom=zoom),
            endpoint=endpoint,
            expected_status=expected_status)

    async def send_mptz_relative_move_pan_command(self, endpoint, panDelta, expected_status: Status = Status.Success):
        """Send MPTZRelativeMove with only ``panDelta`` set (the other deltas are None)."""
        await self.send_mptz_relative_move_command(endpoint, panDelta, None, None, expected_status)

    async def send_mptz_relative_move_tilt_command(self, endpoint, tiltDelta, expected_status: Status = Status.Success):
        """Send MPTZRelativeMove with only ``tiltDelta`` set (the other deltas are None)."""
        await self.send_mptz_relative_move_command(endpoint, None, tiltDelta, None, expected_status)

    async def send_mptz_relative_move_zoom_command(self, endpoint, zoomDelta, expected_status: Status = Status.Success):
        """Send MPTZRelativeMove with only ``zoomDelta`` set (the other deltas are None)."""
        await self.send_mptz_relative_move_command(endpoint, None, None, zoomDelta, expected_status)

    async def send_null_mptz_relative_move_command(self, endpoint, expected_status: Status = Status.Success):
        """Send MPTZRelativeMove with panDelta, tiltDelta and zoomDelta all None."""
        await self.send_mptz_relative_move_command(endpoint, None, None, None, expected_status)

    async def send_mptz_relative_move_command(self, endpoint, panDelta, tiltDelta, zoomDelta, expected_status: Status = Status.Success):
        """Send MPTZRelativeMove with the given deltas and check the resulting status."""
        await self._send_command_expect_status(
            cmd=Clusters.CameraAvSettingsUserLevelManagement.Commands.MPTZRelativeMove(
                panDelta=panDelta, tiltDelta=tiltDelta, zoomDelta=zoomDelta),
            endpoint=endpoint,
            expected_status=expected_status)

    async def video_stream_allocate_command(self, endpoint, expected_status: Status = Status.Success):
        """Send VideoStreamAllocate built from values read from the DUT.

        Reads FeatureMap, StreamUsagePriorities, RateDistortionTradeOffPoints,
        MinViewport and VideoSensorParams from the CameraAvStreamManagement
        cluster, then uses the first entry of each list attribute.

        Returns:
            ``response.videoStreamID`` when the command succeeds, or None when
            the command failed with the expected status.

        Raises:
            The mobly assertion failure when a required list attribute is empty
            or the command outcome does not match ``expected_status``.
        """
        cluster = Clusters.CameraAvStreamManagement
        attrs = cluster.Attributes

        # Watermark / OSD are sent as True only when the feature bit is set,
        # and as None otherwise.
        feature_map = await self.read_avstr_attribute_expect_success(endpoint, attrs.FeatureMap)
        watermark = True if (feature_map & cluster.Bitmaps.Feature.kWatermark) != 0 else None
        osd = True if (feature_map & cluster.Bitmaps.Feature.kOnScreenDisplay) != 0 else None

        # Get the parameters from the device (those which are available).
        stream_usage_priorities = await self.read_avstr_attribute_expect_success(endpoint, attrs.StreamUsagePriorities)
        rate_distortion_points = await self.read_avstr_attribute_expect_success(
            endpoint, attrs.RateDistortionTradeOffPoints)
        min_viewport = await self.read_avstr_attribute_expect_success(endpoint, attrs.MinViewport)
        sensor_params = await self.read_avstr_attribute_expect_success(endpoint, attrs.VideoSensorParams)

        # The first entry of each list is used below; fail with a clear message
        # instead of an IndexError when the DUT reports an empty list.
        asserts.assert_true(len(stream_usage_priorities) > 0,
                            "StreamUsagePriorities is empty, cannot allocate a video stream")
        asserts.assert_true(len(rate_distortion_points) > 0,
                            "RateDistortionTradeOffPoints is empty, cannot allocate a video stream")
        trade_off_point = rate_distortion_points[0]

        try:
            response = await self.send_single_cmd(
                cmd=cluster.Commands.VideoStreamAllocate(
                    streamUsage=stream_usage_priorities[0],
                    videoCodec=trade_off_point.codec,
                    minFrameRate=30,
                    maxFrameRate=sensor_params.maxFPS,
                    minResolution=min_viewport,
                    maxResolution=cluster.Structs.VideoResolutionStruct(width=sensor_params.sensorWidth,
                                                                        height=sensor_params.sensorHeight),
                    minBitRate=trade_off_point.minBitRate,
                    # NOTE(review): maxBitRate is populated from minBitRate, as in
                    # the original code - confirm this is intended.
                    maxBitRate=trade_off_point.minBitRate,
                    minKeyFrameInterval=2000,
                    maxKeyFrameInterval=8000,
                    watermarkEnabled=watermark,
                    OSDEnabled=osd
                ),
                endpoint=endpoint)
        except InteractionModelError as e:
            asserts.assert_equal(e.status, expected_status, "Unexpected error returned")
            return None
        asserts.assert_equal(expected_status, Status.Success,
                             "Command succeeded but an error status was expected")
        return response.videoStreamID