Tweaks to evse test plans based on review(issue #31460) (#31901)
* Added support for test event triggers and handling of reading events into matter_testing_support.
* Made TC_EEVSE_Utils.py use the matter_testing_support instead of its own local copy.
* Restyled by isort
* Added TC_EEVSE_2_2, 2_4, 2_5 to tests.yaml. Fixed compile warning treated as error due to sign conversion.
* Updated default min_charge and max_charge in TC_EEVSE_Utils send_enable_charge_command to have sensible default values if not specified.
* Fixed test app name
* Moved test runs later in test yaml
* Fixed discriminator used in script to match that used in the app.
* Added --endpoint 1 so it tests the correct endpoint
* Code review comment fixes.
* Fixed trailing whitepace
* Merged TC_EEVSE tests back in
---------
Co-authored-by: Restyled.io <commits@restyled.io>
diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml
index 6f09b85..fbaf631 100644
--- a/.github/workflows/tests.yaml
+++ b/.github/workflows/tests.yaml
@@ -452,6 +452,7 @@
--target linux-x64-all-clusters-ipv6only-no-ble-no-wifi-tsan-clang-test \
--target linux-x64-lock-ipv6only-no-ble-no-wifi-tsan-clang-test \
--target linux-x64-lit-icd-ipv6only-no-ble-no-wifi-tsan-clang-test \
+ --target linux-x64-energy-management-ipv6only-no-ble-no-wifi-tsan-clang-test \
--target linux-x64-python-bindings \
build \
--copy-artifacts-to objdir-clone \
@@ -475,6 +476,9 @@
scripts/run_in_python_env.sh out/venv './scripts/tests/run_python_test.py --app out/linux-x64-lock-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-lock-app --factoryreset --app-args "--discriminator 1234 --KVS kvs1 --trace-to json:out/trace_data/app-{SCRIPT_BASE_NAME}.json" --script "src/python_testing/TC_DRLK_2_3.py" --script-args "--storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --PICS src/app/tests/suites/certification/ci-pics-values --trace-to json:out/trace_data/test-{SCRIPT_BASE_NAME}.json --trace-to perfetto:out/trace_data/test-{SCRIPT_BASE_NAME}.perfetto"'
scripts/run_in_python_env.sh out/venv './scripts/tests/run_python_test.py --app out/linux-x64-all-clusters-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-all-clusters-app --factoryreset --app-args "--discriminator 1234 --KVS kvs1 --trace-to json:out/trace_data/app-{SCRIPT_BASE_NAME}.json" --script "src/python_testing/TC_DeviceBasicComposition.py" --script-args "--storage-path admin_storage.json --manual-code 10054912339 --PICS src/app/tests/suites/certification/ci-pics-values --trace-to json:out/trace_data/test-{SCRIPT_BASE_NAME}.json --trace-to perfetto:out/trace_data/test-{SCRIPT_BASE_NAME}.perfetto"'
scripts/run_in_python_env.sh out/venv './scripts/tests/run_python_test.py --app out/linux-x64-lock-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-lock-app --factoryreset --app-args "--discriminator 1234 --KVS kvs1 --trace-to json:out/trace_data/app-{SCRIPT_BASE_NAME}.json" --script "src/python_testing/TC_DeviceConformance.py" --script-args "--storage-path admin_storage.json --manual-code 10054912339 --bool-arg ignore_in_progress:True allow_provisional:True --PICS src/app/tests/suites/certification/ci-pics-values --trace-to json:out/trace_data/test-{SCRIPT_BASE_NAME}.json --trace-to perfetto:out/trace_data/test-{SCRIPT_BASE_NAME}.perfetto --tests test_TC_IDM_10_2"'
+ scripts/run_in_python_env.sh out/venv './scripts/tests/run_python_test.py --app out/linux-x64-energy-management-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-energy-management-app --factoryreset --app-args "--discriminator 1234 --KVS kvs1 --trace-to json:out/trace_data/app-{SCRIPT_BASE_NAME}.json --enable-key 000102030405060708090a0b0c0d0e0f" --script "src/python_testing/TC_EEVSE_2_2.py" --script-args "--storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --hex-arg enableKey:000102030405060708090a0b0c0d0e0f --endpoint 1 --trace-to json:out/trace_data/test-{SCRIPT_BASE_NAME}.json --trace-to perfetto:out/trace_data/test-{SCRIPT_BASE_NAME}.perfetto"'
+ scripts/run_in_python_env.sh out/venv './scripts/tests/run_python_test.py --app out/linux-x64-energy-management-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-energy-management-app --factoryreset --app-args "--discriminator 1234 --KVS kvs1 --trace-to json:out/trace_data/app-{SCRIPT_BASE_NAME}.json --enable-key 000102030405060708090a0b0c0d0e0f" --script "src/python_testing/TC_EEVSE_2_4.py" --script-args "--storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --hex-arg enableKey:000102030405060708090a0b0c0d0e0f --endpoint 1 --trace-to json:out/trace_data/test-{SCRIPT_BASE_NAME}.json --trace-to perfetto:out/trace_data/test-{SCRIPT_BASE_NAME}.perfetto"'
+ scripts/run_in_python_env.sh out/venv './scripts/tests/run_python_test.py --app out/linux-x64-energy-management-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-energy-management-app --factoryreset --app-args "--discriminator 1234 --KVS kvs1 --trace-to json:out/trace_data/app-{SCRIPT_BASE_NAME}.json --enable-key 000102030405060708090a0b0c0d0e0f" --script "src/python_testing/TC_EEVSE_2_5.py" --script-args "--storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --hex-arg enableKey:000102030405060708090a0b0c0d0e0f --endpoint 1 --trace-to json:out/trace_data/test-{SCRIPT_BASE_NAME}.json --trace-to perfetto:out/trace_data/test-{SCRIPT_BASE_NAME}.perfetto"'
scripts/run_in_python_env.sh out/venv './scripts/tests/run_python_test.py --app out/linux-x64-all-clusters-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-all-clusters-app --factoryreset --app-args "--discriminator 1234 --KVS kvs1 --trace-to json:out/trace_data/app-{SCRIPT_BASE_NAME}.json" --script "src/python_testing/TC_FAN_3_1.py" --script-args "--storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --trace-to json:out/trace_data/test-{SCRIPT_BASE_NAME}.json --trace-to perfetto:out/trace_data/test-{SCRIPT_BASE_NAME}.perfetto"'
scripts/run_in_python_env.sh out/venv './scripts/tests/run_python_test.py --app out/linux-x64-all-clusters-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-all-clusters-app --factoryreset --app-args "--discriminator 1234 --KVS kvs1 --trace-to json:out/trace_data/app-{SCRIPT_BASE_NAME}.json" --script "src/python_testing/TC_FAN_3_2.py" --script-args "--storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --trace-to json:out/trace_data/test-{SCRIPT_BASE_NAME}.json --trace-to perfetto:out/trace_data/test-{SCRIPT_BASE_NAME}.perfetto"'
scripts/run_in_python_env.sh out/venv './scripts/tests/run_python_test.py --app out/linux-x64-all-clusters-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-all-clusters-app --factoryreset --app-args "--discriminator 1234 --KVS kvs1 --trace-to json:out/trace_data/app-{SCRIPT_BASE_NAME}.json" --script "src/python_testing/TC_FAN_3_3.py" --script-args "--storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --trace-to json:out/trace_data/test-{SCRIPT_BASE_NAME}.json --trace-to perfetto:out/trace_data/test-{SCRIPT_BASE_NAME}.perfetto"'
diff --git a/examples/energy-management-app/energy-management-common/src/EVSEManufacturerImpl.cpp b/examples/energy-management-app/energy-management-common/src/EVSEManufacturerImpl.cpp
index 0f20467..16d3a35 100644
--- a/examples/energy-management-app/energy-management-common/src/EVSEManufacturerImpl.cpp
+++ b/examples/energy-management-app/energy-management-common/src/EVSEManufacturerImpl.cpp
@@ -232,8 +232,9 @@
// Update meter values
// Avoid using floats - so we will do a basic rand() call which will generate a integer value between 0 and RAND_MAX
- // first compute power as a mean + some random value in range 0 to mPowerRandomness_mW
- int64_t power = (rand() % gFakeReadingsData.mPowerRandomness_mW);
+ // first compute power as a mean + some random value in range +/- mPowerRandomness_mW
+ int64_t power =
+ (static_cast<int64_t>(rand()) % (2 * gFakeReadingsData.mPowerRandomness_mW)) - gFakeReadingsData.mPowerRandomness_mW;
power += gFakeReadingsData.mPower_mW; // add in the base power
// TODO call the EPM cluster to send a power reading
diff --git a/src/python_testing/TC_EEVSE_2_2.py b/src/python_testing/TC_EEVSE_2_2.py
index 1651f4d..0fcf656 100644
--- a/src/python_testing/TC_EEVSE_2_2.py
+++ b/src/python_testing/TC_EEVSE_2_2.py
@@ -21,9 +21,9 @@
import chip.clusters as Clusters
from chip.clusters.Types import NullValue
-from matter_testing_support import MatterBaseTest, TestStep, async_test_body, default_matter_test_main
+from matter_testing_support import EventChangeCallback, MatterBaseTest, TestStep, async_test_body, default_matter_test_main
from mobly import asserts
-from TC_EEVSE_Utils import EEVSEBaseTestHelper, EventChangeCallback
+from TC_EEVSE_Utils import EEVSEBaseTestHelper
logger = logging.getLogger(__name__)
@@ -123,7 +123,7 @@
self.step("4")
await self.send_test_event_trigger_pluggedin()
- event_data = events_callback.WaitForEventReport(Clusters.EnergyEvse.Events.EVConnected)
+ event_data = events_callback.wait_for_event_report(Clusters.EnergyEvse.Events.EVConnected)
self.step("4a")
await self.check_evse_attribute("State", Clusters.EnergyEvse.Enums.StateEnum.kPluggedInNoDemand)
@@ -147,7 +147,7 @@
self.step("6")
await self.send_test_event_trigger_charge_demand()
- event_data = events_callback.WaitForEventReport(Clusters.EnergyEvse.Events.EnergyTransferStarted)
+ event_data = events_callback.wait_for_event_report(Clusters.EnergyEvse.Events.EnergyTransferStarted)
self.step("6a")
await self.check_evse_attribute("State", expected_state)
@@ -172,7 +172,7 @@
# Sleep for the charging duration plus a couple of seconds to check it has stopped
time.sleep(charging_duration + 2)
# check EnergyTransferredStoped (EvseStopped)
- event_data = events_callback.WaitForEventReport(Clusters.EnergyEvse.Events.EnergyTransferStopped)
+ event_data = events_callback.wait_for_event_report(Clusters.EnergyEvse.Events.EnergyTransferStopped)
expected_reason = Clusters.EnergyEvse.Enums.EnergyTransferStoppedReasonEnum.kEVSEStopped
self.validate_energy_transfer_stopped_event(event_data, session_id, expected_state, expected_reason)
@@ -188,7 +188,7 @@
max_charge_current = 12000
await self.send_enable_charge_command(charge_until=charge_until, min_charge=min_charge_current, max_charge=max_charge_current)
- event_data = events_callback.WaitForEventReport(Clusters.EnergyEvse.Events.EnergyTransferStarted)
+ event_data = events_callback.wait_for_event_report(Clusters.EnergyEvse.Events.EnergyTransferStarted)
self.step("8a")
await self.check_evse_attribute("State", Clusters.EnergyEvse.Enums.StateEnum.kPluggedInCharging)
@@ -228,7 +228,7 @@
self.step("10")
await self.send_test_event_trigger_charge_demand_clear()
- event_data = events_callback.WaitForEventReport(Clusters.EnergyEvse.Events.EnergyTransferStopped)
+ event_data = events_callback.wait_for_event_report(Clusters.EnergyEvse.Events.EnergyTransferStopped)
expected_reason = Clusters.EnergyEvse.Enums.EnergyTransferStoppedReasonEnum.kEVStopped
self.validate_energy_transfer_stopped_event(event_data, session_id, expected_state, expected_reason)
@@ -239,7 +239,7 @@
await self.send_test_event_trigger_charge_demand()
# Check we get EnergyTransferStarted again
await self.send_enable_charge_command(charge_until=charge_until, min_charge=min_charge_current, max_charge=max_charge_current)
- event_data = events_callback.WaitForEventReport(Clusters.EnergyEvse.Events.EnergyTransferStarted)
+ event_data = events_callback.wait_for_event_report(Clusters.EnergyEvse.Events.EnergyTransferStarted)
self.validate_energy_transfer_started_event(event_data, session_id, expected_state, expected_max_charge)
self.step("11a")
@@ -247,7 +247,7 @@
self.step("12")
await self.send_test_event_trigger_charge_demand_clear()
- event_data = events_callback.WaitForEventReport(Clusters.EnergyEvse.Events.EnergyTransferStopped)
+ event_data = events_callback.wait_for_event_report(Clusters.EnergyEvse.Events.EnergyTransferStopped)
expected_reason = Clusters.EnergyEvse.Enums.EnergyTransferStoppedReasonEnum.kEVStopped
self.validate_energy_transfer_stopped_event(event_data, session_id, expected_state, expected_reason)
@@ -256,7 +256,7 @@
self.step("13")
await self.send_test_event_trigger_pluggedin_clear()
- event_data = events_callback.WaitForEventReport(Clusters.EnergyEvse.Events.EVNotDetected)
+ event_data = events_callback.wait_for_event_report(Clusters.EnergyEvse.Events.EVNotDetected)
expected_state = Clusters.EnergyEvse.Enums.StateEnum.kPluggedInNoDemand
self.validate_ev_not_detected_event(event_data, session_id, expected_state, expected_duration=0, expected_charged=0)
@@ -280,13 +280,13 @@
session_id = session_id + 1
# Check we get a new EVConnected event with updated session ID
- event_data = events_callback.WaitForEventReport(Clusters.EnergyEvse.Events.EVConnected)
+ event_data = events_callback.wait_for_event_report(Clusters.EnergyEvse.Events.EVConnected)
self.validate_ev_connected_event(event_data, session_id)
self.step("14a")
await self.send_test_event_trigger_charge_demand()
expected_state = Clusters.EnergyEvse.Enums.StateEnum.kPluggedInCharging # This is the value at the event time
- event_data = events_callback.WaitForEventReport(Clusters.EnergyEvse.Events.EnergyTransferStarted)
+ event_data = events_callback.wait_for_event_report(Clusters.EnergyEvse.Events.EnergyTransferStarted)
self.validate_energy_transfer_started_event(event_data, session_id, expected_state, expected_max_charge)
self.step("14b")
@@ -295,7 +295,7 @@
self.step("15")
await self.send_disable_command()
expected_state = Clusters.EnergyEvse.Enums.StateEnum.kPluggedInCharging # This is the value prior to stopping
- event_data = events_callback.WaitForEventReport(Clusters.EnergyEvse.Events.EnergyTransferStopped)
+ event_data = events_callback.wait_for_event_report(Clusters.EnergyEvse.Events.EnergyTransferStopped)
expected_reason = Clusters.EnergyEvse.Enums.EnergyTransferStoppedReasonEnum.kEVSEStopped
self.validate_energy_transfer_stopped_event(event_data, session_id, expected_state, expected_reason)
@@ -307,7 +307,7 @@
self.step("17")
await self.send_test_event_trigger_pluggedin_clear()
- event_data = events_callback.WaitForEventReport(Clusters.EnergyEvse.Events.EVNotDetected)
+ event_data = events_callback.wait_for_event_report(Clusters.EnergyEvse.Events.EVNotDetected)
expected_state = Clusters.EnergyEvse.Enums.StateEnum.kPluggedInNoDemand
self.validate_ev_not_detected_event(event_data, session_id, expected_state, expected_duration=0, expected_charged=0)
diff --git a/src/python_testing/TC_EEVSE_2_4.py b/src/python_testing/TC_EEVSE_2_4.py
index 63adbe5..9ccad2f 100644
--- a/src/python_testing/TC_EEVSE_2_4.py
+++ b/src/python_testing/TC_EEVSE_2_4.py
@@ -20,8 +20,8 @@
import chip.clusters as Clusters
from chip.clusters.Types import NullValue
-from matter_testing_support import MatterBaseTest, TestStep, async_test_body, default_matter_test_main
-from TC_EEVSE_Utils import EEVSEBaseTestHelper, EventChangeCallback
+from matter_testing_support import EventChangeCallback, MatterBaseTest, TestStep, async_test_body, default_matter_test_main
+from TC_EEVSE_Utils import EEVSEBaseTestHelper
logger = logging.getLogger(__name__)
@@ -99,7 +99,7 @@
self.step("4")
await self.send_test_event_trigger_pluggedin()
- event_data = events_callback.WaitForEventReport(Clusters.EnergyEvse.Events.EVConnected)
+ event_data = events_callback.wait_for_event_report(Clusters.EnergyEvse.Events.EVConnected)
self.step("4a")
await self.check_evse_attribute("State", Clusters.EnergyEvse.Enums.StateEnum.kPluggedInNoDemand)
@@ -117,7 +117,7 @@
self.step("6")
await self.send_test_event_trigger_charge_demand()
- event_data = events_callback.WaitForEventReport(Clusters.EnergyEvse.Events.EnergyTransferStarted)
+ event_data = events_callback.wait_for_event_report(Clusters.EnergyEvse.Events.EnergyTransferStarted)
self.step("6a")
await self.check_evse_attribute("State", Clusters.EnergyEvse.Enums.StateEnum.kPluggedInCharging)
@@ -127,7 +127,7 @@
self.step("7")
await self.send_test_event_trigger_evse_ground_fault()
- event_data = events_callback.WaitForEventReport(Clusters.EnergyEvse.Events.Fault)
+ event_data = events_callback.wait_for_event_report(Clusters.EnergyEvse.Events.Fault)
expected_state = Clusters.EnergyEvse.Enums.StateEnum.kPluggedInCharging
previous_fault = Clusters.EnergyEvse.Enums.FaultStateEnum.kNoError
current_fault = Clusters.EnergyEvse.Enums.FaultStateEnum.kGroundFault
@@ -141,7 +141,7 @@
self.step("8")
await self.send_test_event_trigger_evse_over_temperature_fault()
- event_data = events_callback.WaitForEventReport(Clusters.EnergyEvse.Events.Fault)
+ event_data = events_callback.wait_for_event_report(Clusters.EnergyEvse.Events.Fault)
expected_state = Clusters.EnergyEvse.Enums.StateEnum.kFault
previous_fault = Clusters.EnergyEvse.Enums.FaultStateEnum.kGroundFault
current_fault = Clusters.EnergyEvse.Enums.FaultStateEnum.kOverTemperature
@@ -155,7 +155,7 @@
self.step("9")
await self.send_test_event_trigger_evse_fault_clear()
- event_data = events_callback.WaitForEventReport(Clusters.EnergyEvse.Events.Fault)
+ event_data = events_callback.wait_for_event_report(Clusters.EnergyEvse.Events.Fault)
expected_state = Clusters.EnergyEvse.Enums.StateEnum.kFault
previous_fault = Clusters.EnergyEvse.Enums.FaultStateEnum.kOverTemperature
current_fault = Clusters.EnergyEvse.Enums.FaultStateEnum.kNoError
@@ -169,11 +169,11 @@
self.step("10")
await self.send_test_event_trigger_charge_demand_clear()
- event_data = events_callback.WaitForEventReport(Clusters.EnergyEvse.Events.EnergyTransferStopped)
+ event_data = events_callback.wait_for_event_report(Clusters.EnergyEvse.Events.EnergyTransferStopped)
self.step("11")
await self.send_test_event_trigger_pluggedin_clear()
- event_data = events_callback.WaitForEventReport(Clusters.EnergyEvse.Events.EVNotDetected)
+ event_data = events_callback.wait_for_event_report(Clusters.EnergyEvse.Events.EVNotDetected)
expected_state = Clusters.EnergyEvse.Enums.StateEnum.kPluggedInNoDemand
self.validate_ev_not_detected_event(event_data, session_id, expected_state, expected_duration=0, expected_charged=0)
diff --git a/src/python_testing/TC_EEVSE_2_5.py b/src/python_testing/TC_EEVSE_2_5.py
index 4a32b5a..00150f2 100644
--- a/src/python_testing/TC_EEVSE_2_5.py
+++ b/src/python_testing/TC_EEVSE_2_5.py
@@ -20,8 +20,8 @@
import chip.clusters as Clusters
from chip.clusters.Types import NullValue
from chip.interaction_model import Status
-from matter_testing_support import MatterBaseTest, TestStep, async_test_body, default_matter_test_main
-from TC_EEVSE_Utils import EEVSEBaseTestHelper, EventChangeCallback
+from matter_testing_support import EventChangeCallback, MatterBaseTest, TestStep, async_test_body, default_matter_test_main
+from TC_EEVSE_Utils import EEVSEBaseTestHelper
logger = logging.getLogger(__name__)
diff --git a/src/python_testing/TC_EEVSE_Utils.py b/src/python_testing/TC_EEVSE_Utils.py
index fb6bfa1..6b6b139 100644
--- a/src/python_testing/TC_EEVSE_Utils.py
+++ b/src/python_testing/TC_EEVSE_Utils.py
@@ -16,45 +16,14 @@
import logging
-import queue
import chip.clusters as Clusters
-from chip.clusters import ClusterObjects as ClusterObjects
-from chip.clusters.Attribute import EventReadResult, SubscriptionTransaction
from chip.interaction_model import InteractionModelError, Status
from mobly import asserts
logger = logging.getLogger(__name__)
-class EventChangeCallback:
- def __init__(self, expected_cluster: ClusterObjects):
- self._q = queue.Queue()
- self._expected_cluster = expected_cluster
-
- async def start(self, dev_ctrl, node_id: int, endpoint: int):
- self._subscription = await dev_ctrl.ReadEvent(node_id,
- events=[(endpoint, self._expected_cluster, True)], reportInterval=(1, 5),
- fabricFiltered=False, keepSubscriptions=True, autoResubscribe=False)
- self._subscription.SetEventUpdateCallback(self.__call__)
-
- def __call__(self, res: EventReadResult, transaction: SubscriptionTransaction):
- if res.Status == Status.Success and res.Header.ClusterId == self._expected_cluster.id:
- logging.info(
- f'Got subscription report for event on cluster {self._expected_cluster}: {res.Data}')
- self._q.put(res)
-
- def WaitForEventReport(self, expected_event: ClusterObjects.ClusterEvent):
- try:
- res = self._q.get(block=True, timeout=10)
- except queue.Empty:
- asserts.fail("Failed to receive a report for the event {}".format(expected_event))
-
- asserts.assert_equal(res.Header.ClusterId, expected_event.cluster_id, "Expected cluster ID not found in event report")
- asserts.assert_equal(res.Header.EventId, expected_event.event_id, "Expected event ID not found in event report")
- return res.Data
-
-
class EEVSEBaseTestHelper:
async def read_evse_attribute_expect_success(self, endpoint: int = None, attribute: str = ""):
@@ -79,7 +48,7 @@
asserts.assert_equal(result[0].Status, Status.Success, "UserMaximumChargeCurrent write failed")
async def send_enable_charge_command(self, endpoint: int = None, charge_until: int = None, timedRequestTimeoutMs: int = 3000,
- min_charge: int = None, max_charge: int = None, expected_status: Status = Status.Success):
+ min_charge: int = 6000, max_charge: int = 32000, expected_status: Status = Status.Success):
try:
await self.send_single_cmd(cmd=Clusters.EnergyEvse.Commands.EnableCharging(
chargingEnabledUntil=charge_until,
@@ -110,34 +79,6 @@
except InteractionModelError as e:
asserts.assert_equal(e.status, expected_status, "Unexpected error returned")
- async def send_test_event_triggers(self, enableKey: bytes = None, eventTrigger=0x0099000000000000):
- # get the test event enable key or assume the default
- # This can be passed in on command line using
- # --hex-arg enableKey:000102030405060708090a0b0c0d0e0f
- if enableKey is None:
- if 'enableKey' not in self.matter_test_config.global_test_params:
- enableKey = bytes([b for b in range(16)])
- else:
- enableKey = self.matter_test_config.global_test_params['enableKey']
-
- try:
- # GeneralDiagnosics cluster is meant to be on Endpoint 0 (Root)
- await self.send_single_cmd(endpoint=0,
- cmd=Clusters.GeneralDiagnostics.Commands.TestEventTrigger(
- enableKey,
- eventTrigger)
- )
-
- except InteractionModelError as e:
- asserts.fail(f"Unexpected error returned - {e.status}")
-
- async def check_test_event_triggers_enabled(self):
- full_attr = Clusters.GeneralDiagnostics.Attributes.TestEventTriggersEnabled
- cluster = Clusters.Objects.GeneralDiagnostics
- # GeneralDiagnosics cluster is meant to be on Endpoint 0 (Root)
- test_event_enabled = await self.read_single_attribute_check_success(endpoint=0, cluster=cluster, attribute=full_attr)
- asserts.assert_equal(test_event_enabled, True, "TestEventTriggersEnabled is False")
-
async def send_test_event_trigger_basic(self):
await self.send_test_event_triggers(eventTrigger=0x0099000000000000)
diff --git a/src/python_testing/matter_testing_support.py b/src/python_testing/matter_testing_support.py
index 4ddb4ce..2f3ca96 100644
--- a/src/python_testing/matter_testing_support.py
+++ b/src/python_testing/matter_testing_support.py
@@ -53,6 +53,7 @@
import chip.native
from chip import discovery
from chip.ChipStack import ChipStack
+from chip.clusters import ClusterObjects as ClusterObjects
from chip.clusters.Attribute import EventReadResult, SubscriptionTransaction
from chip.exceptions import ChipStackError
from chip.interaction_model import InteractionModelError, Status
@@ -273,6 +274,42 @@
return self._name
+class EventChangeCallback:
+ def __init__(self, expected_cluster: ClusterObjects):
+ """This class creates a queue to store received event callbacks, that can be checked by the test script
+ expected_cluster: is the cluster from which the events are expected
+ """
+ self._q = queue.Queue()
+ self._expected_cluster = expected_cluster
+
+ async def start(self, dev_ctrl, node_id: int, endpoint: int):
+ """This starts a subscription for events on the specified node_id and endpoint. The cluster is specified when the class instance is created."""
+ self._subscription = await dev_ctrl.ReadEvent(node_id,
+ events=[(endpoint, self._expected_cluster, True)], reportInterval=(1, 5),
+ fabricFiltered=False, keepSubscriptions=True, autoResubscribe=False)
+ self._subscription.SetEventUpdateCallback(self.__call__)
+
+ def __call__(self, res: EventReadResult, transaction: SubscriptionTransaction):
+ """This is the subscription callback when an event is received.
+ It checks the event is from the expected_cluster and then posts it into the queue for later processing."""
+ if res.Status == Status.Success and res.Header.ClusterId == self._expected_cluster.id:
+ logging.info(
+ f'Got subscription report for event on cluster {self._expected_cluster}: {res.Data}')
+ self._q.put(res)
+
+ def wait_for_event_report(self, expected_event: ClusterObjects.ClusterEvent, timeout: int = 10):
+ """This function allows a test script to block waiting for the specific event to arrive with a timeout.
+ It returns the event data so that the values can be checked."""
+ try:
+ res = self._q.get(block=True, timeout=timeout)
+ except queue.Empty:
+ asserts.fail("Failed to receive a report for the event {}".format(expected_event))
+
+ asserts.assert_equal(res.Header.ClusterId, expected_event.cluster_id, "Expected cluster ID not found in event report")
+ asserts.assert_equal(res.Header.EventId, expected_event.event_id, "Expected event ID not found in event report")
+ return res.Data
+
+
class InternalTestRunnerHooks(TestRunnerHooks):
def start(self, count: int):
@@ -847,6 +884,43 @@
result = await dev_ctrl.SendCommand(nodeid=node_id, endpoint=endpoint, payload=cmd, timedRequestTimeoutMs=timedRequestTimeoutMs)
return result
+ async def send_test_event_triggers(self, eventTrigger: int, enableKey: bytes = None):
+ """This helper function sends a test event trigger to the General Diagnostics cluster on endpoint 0
+
+ The enableKey can be passed into the function, or omitted which will then
+ use the one provided to the script via --hex-arg enableKey:<HEX VALUE>
+ if not it defaults to 0x000102030405060708090a0b0c0d0e0f
+ """
+ # get the test event enable key or assume the default
+ # This can be passed in on command line using
+ # --hex-arg enableKey:000102030405060708090a0b0c0d0e0f
+ if enableKey is None:
+ if 'enableKey' not in self.matter_test_config.global_test_params:
+ enableKey = bytes([b for b in range(16)])
+ else:
+ enableKey = self.matter_test_config.global_test_params['enableKey']
+
+ try:
+ # GeneralDiagnostics cluster is meant to be on Endpoint 0 (Root)
+ await self.send_single_cmd(endpoint=0,
+ cmd=Clusters.GeneralDiagnostics.Commands.TestEventTrigger(
+ enableKey,
+ eventTrigger)
+ )
+
+ except InteractionModelError as e:
+ asserts.fail(
+ f"Sending TestEventTrigger resulted in Unexpected error. Are they enabled in DUT? Command returned - {e.status}")
+
+ async def check_test_event_triggers_enabled(self):
+ """This cluster checks that the General Diagnostics cluster TestEventTriggersEnabled attribute is True.
+ It will assert and fail the test if not True."""
+ full_attr = Clusters.GeneralDiagnostics.Attributes.TestEventTriggersEnabled
+ cluster = Clusters.Objects.GeneralDiagnostics
+ # GeneralDiagnostics cluster is meant to be on Endpoint 0 (Root)
+ test_event_enabled = await self.read_single_attribute_check_success(endpoint=0, cluster=cluster, attribute=full_attr)
+ asserts.assert_equal(test_event_enabled, True, "TestEventTriggersEnabled is False")
+
def print_step(self, stepnum: typing.Union[int, str], title: str) -> None:
logging.info(f'***** Test Step {stepnum} : {title}')