blob: 12bb359a303bd30cca6e011c358ccd5f616b669a [file] [log] [blame]
#
# Copyright (c) 2025 Project CHIP Authors
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from matter.clusters import CameraAvStreamManagement
class WebRTCTestHelper:
async def read_avstr_attribute_expect_success(self, endpoint, attribute):
return await self.read_single_attribute_check_success(
endpoint=endpoint, cluster=CameraAvStreamManagement, attribute=attribute
)
async def allocate_audio_stream(self, endpoint):
"""Try to allocate an audio stream from the camera device. Returns the stream ID if successful, otherwise None."""
attrs = CameraAvStreamManagement.Attributes
try:
# Get the parms from the device (those which are available)
aStreamUsagePriorities = await self.read_avstr_attribute_expect_success(endpoint, attrs.StreamUsagePriorities)
aMicrophoneCapabilities = await self.read_avstr_attribute_expect_success(
endpoint=endpoint, attribute=attrs.MicrophoneCapabilities
)
aBitRate = 0
codec = aMicrophoneCapabilities.supportedCodecs[0]
match codec:
case CameraAvStreamManagement.Enums.AudioCodecEnum.kOpus:
aBitRate = 30000
case CameraAvStreamManagement.Enums.AudioCodecEnum.kAacLc:
aBitRate = 40000
case _:
aBitRate = 30000
logging.warning(f"Using default bitrate {aBitRate} for unhandled codec {codec}")
adoStreamAllocateCmd = CameraAvStreamManagement.Commands.AudioStreamAllocate(
streamUsage=aStreamUsagePriorities[0],
audioCodec=aMicrophoneCapabilities.supportedCodecs[0],
channelCount=aMicrophoneCapabilities.maxNumberOfChannels,
sampleRate=aMicrophoneCapabilities.supportedSampleRates[0],
bitRate=aBitRate,
bitDepth=aMicrophoneCapabilities.supportedBitDepths[0],
)
audioStreamAllocateResponse = await self.send_single_cmd(endpoint=endpoint, cmd=adoStreamAllocateCmd)
return audioStreamAllocateResponse.audioStreamID
except Exception as e:
logging.error(f"Failed to allocate audio stream. {e}")
return None
async def allocate_video_stream(self, endpoint):
"""Try to allocate a video stream from the camera device. Returns the stream ID if successful, otherwise None."""
attrs = CameraAvStreamManagement.Attributes
try:
# Check for watermark and OSD features
feature_map = await self.read_avstr_attribute_expect_success(endpoint, attrs.FeatureMap)
watermark = True if (feature_map & CameraAvStreamManagement.Bitmaps.Feature.kWatermark) != 0 else None
osd = True if (feature_map & CameraAvStreamManagement.Bitmaps.Feature.kOnScreenDisplay) != 0 else None
# Get the parms from the device (those which are available)
aStreamUsagePriorities = await self.read_avstr_attribute_expect_success(endpoint, attrs.StreamUsagePriorities)
aRateDistortionTradeOffPoints = await self.read_avstr_attribute_expect_success(
endpoint, attrs.RateDistortionTradeOffPoints
)
aMinViewportRes = await self.read_avstr_attribute_expect_success(endpoint, attrs.MinViewportResolution)
aVideoSensorParams = await self.read_avstr_attribute_expect_success(endpoint, attrs.VideoSensorParams)
response = await self.send_single_cmd(
cmd=CameraAvStreamManagement.Commands.VideoStreamAllocate(
streamUsage=aStreamUsagePriorities[0],
videoCodec=aRateDistortionTradeOffPoints[0].codec,
minFrameRate=30,
maxFrameRate=aVideoSensorParams.maxFPS,
minResolution=aMinViewportRes,
maxResolution=CameraAvStreamManagement.Structs.VideoResolutionStruct(
width=aVideoSensorParams.sensorWidth, height=aVideoSensorParams.sensorHeight
),
minBitRate=aRateDistortionTradeOffPoints[0].minBitRate,
maxBitRate=aRateDistortionTradeOffPoints[0].minBitRate,
keyFrameInterval=4000,
watermarkEnabled=watermark,
OSDEnabled=osd,
),
endpoint=endpoint,
)
return response.videoStreamID
except Exception as e:
logging.error(f"Failed to allocate video stream. {e}")
return None