blob: 3afe183cd39d2dcc45248d387e586aac85fd7bf7 [file] [log] [blame]
#
# Copyright (c) 2026 Project CHIP Authors
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
from actions_service import actions_service_pb2
from mobly import asserts
from pw_hdlc import rpc
from pw_system.device_connection import create_device_serial_or_socket_connection
import matter.clusters as Clusters
from matter.testing.decorators import async_test_body
from matter.testing.event_attribute_reporting import EventSubscriptionHandler
from matter.testing.matter_testing import MatterBaseTest, TestStep
from matter.testing.runner import default_matter_test_main
logger = logging.getLogger(__name__)
class TC_GENERICSWITCH(MatterBaseTest):
"""Tests for chef rootnode_genericswitch_9866e35d0b device."""
# Switch endpoint that supports triple press
_SWITCH_TRIPLE_PRESS_ENDPOINT = 1
_SWITCH_TRIPLE_PRESS_FEATURE_MAP = 30
_SWITCH_TRIPLE_PRESS_TAG_LIST = [
Clusters.Objects.Globals.Structs.SemanticTagStruct(
mfgCode=Clusters.Types.NullValue, namespaceID=8, tag=2, label=None),
Clusters.Objects.Globals.Structs.SemanticTagStruct(
mfgCode=Clusters.Types.NullValue, namespaceID=7, tag=1, label=None)
]
_SWITCH_TRIPLE_PRESS_NUMBER_OF_POSITIONS = 2
# Switch endpoint that supports single press only
_SWITCH_SINGLE_PRESS_ENDPOINT = 2
_SWITCH_SINGLE_PRESS_FEATURE_MAP = 2
_SWITCH_SINGLE_PRESS_TAG_LIST = [
Clusters.Objects.Globals.Structs.SemanticTagStruct(
mfgCode=Clusters.Types.NullValue, namespaceID=8, tag=3, label=None),
Clusters.Objects.Globals.Structs.SemanticTagStruct(
mfgCode=Clusters.Types.NullValue, namespaceID=7, tag=2, label=None)
]
_SWITCH_SINGLE_PRESS_NUMBER_OF_POSITIONS = 2
_PW_RPC_SOCKET_ADDR = "0.0.0.0:33000"
_PW_RPC_BAUD_RATE = 115200
async def _read_switch_feature_map(self, endpoint):
return await self.read_single_attribute_check_success(
endpoint=endpoint,
cluster=Clusters.Objects.Switch,
attribute=Clusters.Objects.Switch.Attributes.FeatureMap
)
async def _read_switch_number_of_positions(self, endpoint):
return await self.read_single_attribute_check_success(
endpoint=endpoint,
cluster=Clusters.Objects.Switch,
attribute=Clusters.Objects.Switch.Attributes.NumberOfPositions
)
async def _read_descriptor_semantic_tags(self, endpoint):
return await self.read_single_attribute_check_success(
endpoint=endpoint,
cluster=Clusters.Objects.Descriptor,
attribute=Clusters.Objects.Descriptor.Attributes.TagList
)
def _inject_switch_events(self, device, endpoint, actions: list):
result = device.rpcs.chip.rpc.Actions.Set(
endpoint_id=endpoint,
cluster_id=Clusters.Objects.Switch.id,
actions=actions,
)
asserts.assert_true(result.status.ok(), msg="PwRPC status not ok.")
def desc_TC_GENERICSWITCH(self) -> str:
return "[TC_GENERICSWITCH] chef genericswitch functionality test."
def steps_TC_GENERICSWITCH(self):
return [
TestStep(
1, "[TC_GENERICSWITCH] Commissioning already done.", is_commissioning=True),
TestStep(
2, "[TC_GENERICSWITCH] Triple press endpoint feature map."),
TestStep(3, "[TC_GENERICSWITCH] Triple press endpoint tag list."),
TestStep(
4, "[TC_GENERICSWITCH] Triple press endpoint number of positions."),
TestStep(
5, "[TC_GENERICSWITCH] Single press endpoint feature map."),
TestStep(6, "[TC_GENERICSWITCH] Single press endpoint tag list."),
TestStep(
7, "[TC_GENERICSWITCH] Single press endpoint number of positions."),
TestStep(
8, "[TC_GENERICSWITCH] Test switch press events via RPC injection.")
]
@async_test_body
async def test_TC_GENERICSWITCH(self):
"""Run all steps."""
self.step(1)
# Commissioning already done.
self.step(2)
feature_map = await self._read_switch_feature_map(self._SWITCH_TRIPLE_PRESS_ENDPOINT)
asserts.assert_equal(
feature_map, self._SWITCH_TRIPLE_PRESS_FEATURE_MAP)
self.step(3)
tag_list = await self._read_descriptor_semantic_tags(self._SWITCH_TRIPLE_PRESS_ENDPOINT)
asserts.assert_equal(tag_list, self._SWITCH_TRIPLE_PRESS_TAG_LIST)
self.step(4)
number_of_positions = await self._read_switch_number_of_positions(self._SWITCH_TRIPLE_PRESS_ENDPOINT)
asserts.assert_equal(
number_of_positions,
self._SWITCH_TRIPLE_PRESS_NUMBER_OF_POSITIONS
)
self.step(5)
feature_map = await self._read_switch_feature_map(self._SWITCH_SINGLE_PRESS_ENDPOINT)
asserts.assert_equal(
feature_map, self._SWITCH_SINGLE_PRESS_FEATURE_MAP)
self.step(6)
tag_list = await self._read_descriptor_semantic_tags(self._SWITCH_SINGLE_PRESS_ENDPOINT)
asserts.assert_equal(tag_list, self._SWITCH_SINGLE_PRESS_TAG_LIST)
self.step(7)
number_of_positions = await self._read_switch_number_of_positions(self._SWITCH_SINGLE_PRESS_ENDPOINT)
asserts.assert_equal(
number_of_positions,
self._SWITCH_SINGLE_PRESS_NUMBER_OF_POSITIONS
)
self.step(8)
device_connection = create_device_serial_or_socket_connection(
device="",
baudrate=self._PW_RPC_BAUD_RATE,
token_databases=[],
socket_addr=self._PW_RPC_SOCKET_ADDR,
compiled_protos=[actions_service_pb2],
rpc_logging=True,
channel_id=rpc.DEFAULT_CHANNEL_ID,
hdlc_encoding=True,
device_tracing=False,
)
with device_connection as device:
logger.info("Started Pw socket connection.")
logger.info("Testing events on Endpoint %d",
self._SWITCH_TRIPLE_PRESS_ENDPOINT)
events_callback_1 = EventSubscriptionHandler(
expected_cluster=Clusters.Objects.Switch)
await events_callback_1.start(
dev_ctrl=self.default_controller,
node_id=self.dut_node_id,
endpoint=self._SWITCH_TRIPLE_PRESS_ENDPOINT,
)
self._inject_switch_events( # A sequence of events simulating double press.
device,
self._SWITCH_TRIPLE_PRESS_ENDPOINT,
actions=[
actions_service_pb2.Action(
type=actions_service_pb2.ActionType.EMIT_EVENT,
delayMs=0,
actionId=Clusters.Objects.Switch.Events.InitialPress.event_id,
arg1=1, # New Position = 1
),
actions_service_pb2.Action(
type=actions_service_pb2.ActionType.EMIT_EVENT,
delayMs=10,
actionId=Clusters.Objects.Switch.Events.ShortRelease.event_id,
arg1=1, # Previous Position = 1
),
actions_service_pb2.Action(
type=actions_service_pb2.ActionType.EMIT_EVENT,
delayMs=10,
actionId=Clusters.Objects.Switch.Events.MultiPressOngoing.event_id,
arg1=1, # New Position = 1
arg2=2, # number of presses
),
actions_service_pb2.Action(
type=actions_service_pb2.ActionType.EMIT_EVENT,
delayMs=10,
actionId=Clusters.Objects.Switch.Events.ShortRelease.event_id,
arg1=1, # Previous Position = 1
),
actions_service_pb2.Action(
type=actions_service_pb2.ActionType.EMIT_EVENT,
delayMs=100,
actionId=Clusters.Objects.Switch.Events.MultiPressComplete.event_id,
arg1=1, # Previous Position = 1
arg2=2, # number of presses
),
]
)
logger.info("Injected multi press events.")
events_callback_1.wait_for_event_report(
Clusters.Objects.Switch.Events.InitialPress)
events_callback_1.wait_for_event_report(
Clusters.Objects.Switch.Events.ShortRelease)
events_callback_1.wait_for_event_report(
Clusters.Objects.Switch.Events.MultiPressOngoing)
events_callback_1.wait_for_event_report(
Clusters.Objects.Switch.Events.ShortRelease)
events_callback_1.wait_for_event_report(
Clusters.Objects.Switch.Events.MultiPressComplete)
logger.info("Multi press events verified.")
events_callback_1.reset()
events_callback_1.cancel()
logger.info("Testing events on Endpoint %d",
self._SWITCH_SINGLE_PRESS_ENDPOINT)
events_callback_2 = EventSubscriptionHandler(
expected_cluster=Clusters.Objects.Switch)
await events_callback_2.start(
dev_ctrl=self.default_controller,
node_id=self.dut_node_id,
endpoint=self._SWITCH_SINGLE_PRESS_ENDPOINT,
)
self._inject_switch_events( # An initial press.
device,
self._SWITCH_SINGLE_PRESS_ENDPOINT,
actions=[
actions_service_pb2.Action(
type=actions_service_pb2.ActionType.EMIT_EVENT,
delayMs=0,
actionId=Clusters.Objects.Switch.Events.InitialPress.event_id,
arg1=1, # New Position = 1
),
]
)
logger.info("Injected initial press.")
events_callback_2.wait_for_event_report(
Clusters.Objects.Switch.Events.InitialPress)
logger.info("Initial press events verified.")
events_callback_2.reset()
events_callback_2.cancel()
if __name__ == "__main__":
default_matter_test_main()