Split TC_MCORE_FS_1_3 to align with the Test Spec (#35274)
* [TC_MCORE_FS_1_3] Fix test script according to test plan update
* Separate storage for all used components
* Open commissioning window on TH_FSA_BRIDGE
* Python wrapper for running fabric-admin and fabric-bridge together
* Customize fabric-admin and fabric-bridge RPC ports
* Create storage directory
* Use fabric-sync-app in the TC-MCORE-FS-1.3 script
* Use CommissionerControlCluster to commission TH_SERVER onto DUT
* Auto-link bridge with admin
* Test automation setup
* Terminate apps on SIGTERM and SIGINT
* Open commissioning window on fabric-bridge after adding to FSA
* Commissioning TH_FSA_BRIDGE to DUT_FSA fabric
* Synchronize server from TH to DUT
* Start another instance of app server
* Test if unique ID was synced
* Allow customization for fabric-sync app components
* Final cleanup
* Split test case into two test cases
* Simplify TC_MCORE_FS_1_3 script
* Simplify TC_MCORE_FS_1_4 steps
* Use volatile storage for fabric-sync-app by default
* Add TC_MCORE_FS_1_4 to exceptions
* Get rid of defaults
* Document used options in open commissioning window
* Speed up the pipe read busy loop
* Refactor local output processing
* Improve wait for output
* Add FS-sync tests to CI
* Improve Python code style
* Fix wait for fabric-sync-app start
* Fix asyncio forwarder
* Fixes for review comments
diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml
index b37ed3c..5265f82 100644
--- a/.github/workflows/tests.yaml
+++ b/.github/workflows/tests.yaml
@@ -486,6 +486,9 @@
--target linux-x64-microwave-oven-ipv6only-no-ble-no-wifi-tsan-clang-test \
--target linux-x64-rvc-ipv6only-no-ble-no-wifi-tsan-clang-test \
--target linux-x64-network-manager-ipv6only-no-ble-no-wifi-tsan-clang-test \
+ --target linux-x64-fabric-admin-rpc-ipv6only-clang \
+ --target linux-x64-fabric-bridge-rpc-ipv6only-no-ble-no-wifi-clang \
+ --target linux-x64-light-data-model-no-unique-id-ipv6only-no-ble-no-wifi-clang \
--target linux-x64-python-bindings \
build \
--copy-artifacts-to objdir-clone \
@@ -500,6 +503,9 @@
echo "CHIP_MICROWAVE_OVEN_APP: out/linux-x64-microwave-oven-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-microwave-oven-app" >> /tmp/test_env.yaml
echo "CHIP_RVC_APP: out/linux-x64-rvc-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-rvc-app" >> /tmp/test_env.yaml
echo "NETWORK_MANAGEMENT_APP: out/linux-x64-network-manager-ipv6only-no-ble-no-wifi-tsan-clang-test/matter-network-manager-app" >> /tmp/test_env.yaml
+ echo "FABRIC_ADMIN_APP: out/linux-x64-fabric-admin-rpc-ipv6only-clang/fabric-admin" >> /tmp/test_env.yaml
+ echo "FABRIC_BRIDGE_APP: out/linux-x64-fabric-bridge-rpc-ipv6only-no-ble-no-wifi-clang/fabric-bridge-app" >> /tmp/test_env.yaml
+ echo "LIGHTING_APP_NO_UNIQUE_ID: out/linux-x64-light-data-model-no-unique-id-ipv6only-no-ble-no-wifi-clang/chip-lighting-app" >> /tmp/test_env.yaml
echo "TRACE_APP: out/trace_data/app-{SCRIPT_BASE_NAME}" >> /tmp/test_env.yaml
echo "TRACE_TEST_JSON: out/trace_data/test-{SCRIPT_BASE_NAME}" >> /tmp/test_env.yaml
echo "TRACE_TEST_PERFETTO: out/trace_data/test-{SCRIPT_BASE_NAME}" >> /tmp/test_env.yaml
diff --git a/examples/fabric-admin/scripts/fabric-sync-app.py b/examples/fabric-admin/scripts/fabric-sync-app.py
new file mode 100755
index 0000000..c6faed8
--- /dev/null
+++ b/examples/fabric-admin/scripts/fabric-sync-app.py
@@ -0,0 +1,318 @@
+#!/usr/bin/env python3
+
+# Copyright (c) 2024 Project CHIP Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import asyncio
+import contextlib
+import os
+import signal
+import sys
+from argparse import ArgumentParser
+from tempfile import TemporaryDirectory
+
+
+async def asyncio_stdin() -> asyncio.StreamReader:
+ """Wrap sys.stdin in an asyncio StreamReader."""
+ loop = asyncio.get_event_loop()
+ reader = asyncio.StreamReader()
+ protocol = asyncio.StreamReaderProtocol(reader)
+ await loop.connect_read_pipe(lambda: protocol, sys.stdin)
+ return reader
+
+
+async def asyncio_stdout(file=sys.stdout) -> asyncio.StreamWriter:
+ """Wrap an IO stream in an asyncio StreamWriter."""
+ loop = asyncio.get_event_loop()
+ transport, protocol = await loop.connect_write_pipe(
+ lambda: asyncio.streams.FlowControlMixin(loop=loop),
+ os.fdopen(file.fileno(), 'wb'))
+ return asyncio.streams.StreamWriter(transport, protocol, None, loop)
+
+
+async def forward_f(prefix: bytes, f_in: asyncio.StreamReader,
+ f_out: asyncio.StreamWriter, cb=None):
+ """Forward f_in to f_out with a prefix attached.
+
+ This function can optionally feed received lines to a callback function.
+ """
+ while True:
+ line = await f_in.readline()
+ if not line:
+ break
+ if cb is not None:
+ cb(line)
+ f_out.write(prefix)
+ f_out.write(line)
+ await f_out.drain()
+
+
+async def forward_pipe(pipe_path: str, f_out: asyncio.StreamWriter):
+ """Forward named pipe to f_out.
+
+ Unfortunately, Python does not support async file I/O on named pipes. This
+ function performs busy waiting with a short asyncio-friendly sleep to read
+ from the pipe.
+ """
+ fd = os.open(pipe_path, os.O_RDONLY | os.O_NONBLOCK)
+ while True:
+ try:
+ data = os.read(fd, 1024)
+ if data:
+ f_out.write(data)
+ if not data:
+ await asyncio.sleep(0.1)
+ except BlockingIOError:
+ await asyncio.sleep(0.1)
+
+
+async def forward_stdin(f_out: asyncio.StreamWriter):
+ """Forward stdin to f_out."""
+ reader = await asyncio_stdin()
+ while True:
+ line = await reader.readline()
+ if not line:
+ # Exit on Ctrl-D (EOF).
+ sys.exit(0)
+ f_out.write(line)
+
+
+class Subprocess:
+
+ def __init__(self, tag: str, program: str, *args, stdout_cb=None):
+ self.event = asyncio.Event()
+ self.tag = tag.encode()
+ self.program = program
+ self.args = args
+ self.stdout_cb = stdout_cb
+ self.expected_output = None
+
+ def _check_output(self, line: bytes):
+ if self.expected_output is not None and self.expected_output in line:
+ self.event.set()
+
+ async def run(self):
+ self.p = await asyncio.create_subprocess_exec(self.program, *self.args,
+ stdin=asyncio.subprocess.PIPE,
+ stdout=asyncio.subprocess.PIPE,
+ stderr=asyncio.subprocess.PIPE)
+ # Add the stdout and stderr processing to the event loop.
+ asyncio.create_task(forward_f(
+ self.tag,
+ self.p.stderr,
+ await asyncio_stdout(sys.stderr)))
+ asyncio.create_task(forward_f(
+ self.tag,
+ self.p.stdout,
+ await asyncio_stdout(sys.stdout),
+ cb=self._check_output))
+
+ async def send(self, message: str, expected_output: str = None, timeout: float = None):
+ """Send a message to a process and optionally wait for a response."""
+
+ if expected_output is not None:
+ self.expected_output = expected_output.encode()
+ self.event.clear()
+
+ self.p.stdin.write((message + "\n").encode())
+ await self.p.stdin.drain()
+
+ if expected_output is not None:
+ await asyncio.wait_for(self.event.wait(), timeout=timeout)
+ self.expected_output = None
+
+ async def wait(self):
+ await self.p.wait()
+
+ def terminate(self):
+ self.p.terminate()
+
+
+async def run_admin(program, stdout_cb=None, storage_dir=None,
+ rpc_admin_port=None, rpc_bridge_port=None,
+ paa_trust_store_path=None, commissioner_name=None,
+ commissioner_node_id=None, commissioner_vendor_id=None):
+ args = []
+ if storage_dir is not None:
+ args.extend(["--storage-directory", storage_dir])
+ if rpc_admin_port is not None:
+ args.extend(["--local-server-port", str(rpc_admin_port)])
+ if rpc_bridge_port is not None:
+ args.extend(["--fabric-bridge-server-port", str(rpc_bridge_port)])
+ if paa_trust_store_path is not None:
+ args.extend(["--paa-trust-store-path", paa_trust_store_path])
+ if commissioner_name is not None:
+ args.extend(["--commissioner-name", commissioner_name])
+ if commissioner_node_id is not None:
+ args.extend(["--commissioner-nodeid", str(commissioner_node_id)])
+ if commissioner_vendor_id is not None:
+ args.extend(["--commissioner-vendor-id", str(commissioner_vendor_id)])
+ p = Subprocess("[FS-ADMIN]", program, "interactive", "start", *args,
+ stdout_cb=stdout_cb)
+ await p.run()
+ return p
+
+
+async def run_bridge(program, storage_dir=None, rpc_admin_port=None,
+ rpc_bridge_port=None, discriminator=None, passcode=None,
+ secured_device_port=None):
+ args = []
+ if storage_dir is not None:
+ args.extend(["--KVS",
+ os.path.join(storage_dir, "chip_fabric_bridge_kvs")])
+ if rpc_admin_port is not None:
+ args.extend(["--fabric-admin-server-port", str(rpc_admin_port)])
+ if rpc_bridge_port is not None:
+ args.extend(["--local-server-port", str(rpc_bridge_port)])
+ if discriminator is not None:
+ args.extend(["--discriminator", str(discriminator)])
+ if passcode is not None:
+ args.extend(["--passcode", str(passcode)])
+ if secured_device_port is not None:
+ args.extend(["--secured-device-port", str(secured_device_port)])
+ p = Subprocess("[FS-BRIDGE]", program, *args)
+ await p.run()
+ return p
+
+
+async def main(args):
+
+ # Node ID of the bridge on the fabric.
+ bridge_node_id = 1
+
+ if args.commissioner_node_id == bridge_node_id:
+ raise ValueError(f"NodeID={bridge_node_id} is reserved for the local fabric-bridge")
+
+ storage_dir = args.storage_dir
+ if storage_dir is not None:
+ os.makedirs(storage_dir, exist_ok=True)
+ else:
+ storage = TemporaryDirectory(prefix="fabric-sync-app")
+ storage_dir = storage.name
+
+ pipe = args.stdin_pipe
+ if pipe and not os.path.exists(pipe):
+ os.mkfifo(pipe)
+
+ def terminate(signum, frame):
+ admin.terminate()
+ bridge.terminate()
+ sys.exit(0)
+
+ signal.signal(signal.SIGINT, terminate)
+ signal.signal(signal.SIGTERM, terminate)
+
+ admin, bridge = await asyncio.gather(
+ run_admin(
+ args.app_admin,
+ storage_dir=storage_dir,
+ rpc_admin_port=args.app_admin_rpc_port,
+ rpc_bridge_port=args.app_bridge_rpc_port,
+ paa_trust_store_path=args.paa_trust_store_path,
+ commissioner_name=args.commissioner_name,
+ commissioner_node_id=args.commissioner_node_id,
+ commissioner_vendor_id=args.commissioner_vendor_id,
+ ),
+ run_bridge(
+ args.app_bridge,
+ storage_dir=storage_dir,
+ rpc_admin_port=args.app_admin_rpc_port,
+ rpc_bridge_port=args.app_bridge_rpc_port,
+ secured_device_port=args.secured_device_port,
+ discriminator=args.discriminator,
+ passcode=args.passcode,
+ ))
+
+ # Wait a bit for apps to start.
+ await asyncio.sleep(1)
+
+ try:
+ # Check whether the bridge is already commissioned. If it is,
+ # we will get the response, otherwise we will hit timeout.
+ await admin.send(
+ f"descriptor read device-type-list {bridge_node_id} 1 --timeout 1",
+ # Log message which should appear in the fabric-admin output if
+ # the bridge is already commissioned.
+ expected_output="Reading attribute: Cluster=0x0000_001D Endpoint=0x1 AttributeId=0x0000_0000",
+ timeout=1.5)
+ except asyncio.TimeoutError:
+ # Commission the bridge to the admin.
+ cmd = f"fabricsync add-local-bridge {bridge_node_id}"
+ if args.passcode is not None:
+ cmd += f" --setup-pin-code {args.passcode}"
+ if args.secured_device_port is not None:
+ cmd += f" --local-port {args.secured_device_port}"
+ await admin.send(
+ cmd,
+ # Wait for the log message indicating that the bridge has been
+ # added to the fabric.
+ f"Commissioning complete for node ID {bridge_node_id:#018x}: success")
+
+ # Open commissioning window with original setup code for the bridge.
+ cw_endpoint_id = 0
+ cw_option = 0 # 0: Original setup code, 1: New setup code
+ cw_timeout = 600
+ cw_iteration = 1000
+ cw_discriminator = 0
+ await admin.send(f"pairing open-commissioning-window {bridge_node_id} {cw_endpoint_id}"
+ f" {cw_option} {cw_timeout} {cw_iteration} {cw_discriminator}")
+
+ try:
+ await asyncio.gather(
+ forward_pipe(pipe, admin.p.stdin) if pipe else forward_stdin(admin.p.stdin),
+ admin.wait(),
+ bridge.wait(),
+ )
+ except SystemExit:
+ admin.terminate()
+ bridge.terminate()
+ except Exception:
+ admin.terminate()
+ bridge.terminate()
+ raise
+
+
+if __name__ == "__main__":
+ parser = ArgumentParser(description="Fabric-Sync Example Application")
+ parser.add_argument("--app-admin", metavar="PATH",
+ default="out/linux-x64-fabric-admin-rpc/fabric-admin",
+ help="path to the fabric-admin executable; default=%(default)s")
+ parser.add_argument("--app-bridge", metavar="PATH",
+ default="out/linux-x64-fabric-bridge-rpc/fabric-bridge-app",
+ help="path to the fabric-bridge executable; default=%(default)s")
+ parser.add_argument("--app-admin-rpc-port", metavar="PORT", type=int,
+ help="fabric-admin RPC server port")
+ parser.add_argument("--app-bridge-rpc-port", metavar="PORT", type=int,
+ help="fabric-bridge RPC server port")
+ parser.add_argument("--stdin-pipe", metavar="PATH",
+ help="read input from a named pipe instead of stdin")
+ parser.add_argument("--storage-dir", metavar="PATH",
+ help=("directory to place storage files in; by default "
+ "volatile storage is used"))
+ parser.add_argument("--paa-trust-store-path", metavar="PATH",
+ help="path to directory holding PAA certificates")
+ parser.add_argument("--commissioner-name", metavar="NAME",
+ help="commissioner name to use for the admin")
+ parser.add_argument("--commissioner-node-id", metavar="NUM", type=int,
+ help="commissioner node ID to use for the admin")
+ parser.add_argument("--commissioner-vendor-id", metavar="NUM", type=int,
+ help="commissioner vendor ID to use for the admin")
+ parser.add_argument("--secured-device-port", metavar="NUM", type=int,
+ help="secure messages listen port to use for the bridge")
+ parser.add_argument("--discriminator", metavar="NUM", type=int,
+ help="discriminator to use for the bridge")
+ parser.add_argument("--passcode", metavar="NUM", type=int,
+ help="passcode to use for the bridge")
+ with contextlib.suppress(KeyboardInterrupt):
+ asyncio.run(main(parser.parse_args()))
diff --git a/src/python_testing/TC_MCORE_FS_1_3.py b/src/python_testing/TC_MCORE_FS_1_3.py
index 4245bc1..1a18896 100644
--- a/src/python_testing/TC_MCORE_FS_1_3.py
+++ b/src/python_testing/TC_MCORE_FS_1_3.py
@@ -15,15 +15,31 @@
# limitations under the License.
#
-# This test requires a TH_SERVER application that returns UnsupportedAttribute when reading UniqueID from BasicInformation Cluster. Please specify with --string-arg th_server_no_uid_app_path:<path_to_app>
+# This test requires a TH_SERVER_NO_UID application that returns UnsupportedAttribute
+# when reading UniqueID from BasicInformation Cluster. Please specify the app
+# location with --string-arg th_server_no_uid_app_path:<path_to_app>
+# See https://github.com/project-chip/connectedhomeip/blob/master/docs/testing/python.md#defining-the-ci-test-arguments
+# for details about the block below.
+#
+# === BEGIN CI TEST ARGUMENTS ===
+# test-runner-runs: run1
+# test-runner-run/run1/app: examples/fabric-admin/scripts/fabric-sync-app.py
+# test-runner-run/run1/app-args: --app-admin=${FABRIC_ADMIN_APP} --app-bridge=${FABRIC_BRIDGE_APP} --stdin-pipe=dut-fsa-stdin --discriminator=1234
+# test-runner-run/run1/factoryreset: True
+# test-runner-run/run1/script-args: --PICS src/app/tests/suites/certification/ci-pics-values --storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --string-arg th_server_no_uid_app_path:${LIGHTING_APP_NO_UNIQUE_ID}
+# test-runner-run/run1/script-start-delay: 5
+# test-runner-run/run1/quiet: false
+# === END CI TEST ARGUMENTS ===
+
+import asyncio
import logging
import os
import random
-import signal
import subprocess
-import time
-import uuid
+import sys
+import tempfile
+import threading
import chip.clusters as Clusters
from chip import ChipDeviceCtrl
@@ -32,109 +48,224 @@
from mobly import asserts
+# TODO: Make this class more generic. Issue #35348
+class Subprocess(threading.Thread):
+
+ def __init__(self, args: list = [], tag="", **kw):
+ super().__init__(**kw)
+ self.tag = f"[{tag}] " if tag else ""
+ self.args = args
+
+ def forward_f(self, f_in, f_out):
+ while True:
+ line = f_in.readline()
+ if not line:
+ break
+ f_out.write(f"{self.tag}{line}")
+ f_out.flush()
+
+ def run(self):
+ logging.info("RUN: %s", " ".join(self.args))
+ self.p = subprocess.Popen(self.args, errors="ignore", stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ # Forward stdout and stderr with a tag attached.
+ t1 = threading.Thread(target=self.forward_f, args=[self.p.stdout, sys.stdout])
+ t1.start()
+ t2 = threading.Thread(target=self.forward_f, args=[self.p.stderr, sys.stderr])
+ t2.start()
+ # Wait for the process to finish.
+ self.p.wait()
+ t1.join()
+ t2.join()
+
+ def stop(self):
+ self.p.terminate()
+ self.join()
+
+
+class AppServer:
+
+ def __init__(self, app, storage_dir, port=None, discriminator=None, passcode=None):
+
+ args = [app]
+ args.extend(["--KVS", tempfile.mkstemp(dir=storage_dir, prefix="kvs-app-")[1]])
+ args.extend(['--secured-device-port', str(port)])
+ args.extend(["--discriminator", str(discriminator)])
+ args.extend(["--passcode", str(passcode)])
+ self.app = Subprocess(args, tag="SERVER")
+ self.app.start()
+
+ def stop(self):
+ self.app.stop()
+
+
class TC_MCORE_FS_1_3(MatterBaseTest):
- @async_test_body
- async def setup_class(self):
+
+ @property
+ def default_timeout(self) -> int:
+ # This test has some manual steps, so we need a longer timeout.
+ return 200
+
+ def setup_class(self):
super().setup_class()
- self.th_server_nodeid = 1111
- self.th_server_kvs = None
- self.th_server_port = 5543
- self.app_process_for_dut_eco = None
+ self.th_server = None
+ self.storage = None
- # Create a second controller on a new fabric to communicate to the server
- new_certificate_authority = self.certificate_authority_manager.NewCertificateAuthority()
- new_fabric_admin = new_certificate_authority.NewFabricAdmin(vendorId=0xFFF1, fabricId=2)
- paa_path = str(self.matter_test_config.paa_trust_store_path)
- self.TH_server_controller = new_fabric_admin.NewController(nodeId=112233, paaTrustStorePath=paa_path)
+ # Get the path to the TH_SERVER_NO_UID app from the user params.
+ th_server_app = self.user_params.get("th_server_no_uid_app_path", None)
+ if not th_server_app:
+ asserts.fail("This test requires a TH_SERVER_NO_UID app. Specify app path with --string-arg th_server_no_uid_app_path:<path_to_app>")
+ if not os.path.exists(th_server_app):
+ asserts.fail(f"The path {th_server_app} does not exist")
+
+ # Create a temporary storage directory for keeping KVS files.
+ self.storage = tempfile.TemporaryDirectory(prefix=self.__class__.__name__)
+ logging.info("Temporary storage directory: %s", self.storage.name)
+
+ self.th_server_port = 5544
+ self.th_server_discriminator = random.randint(0, 4095)
+ self.th_server_passcode = 20202021
+
+ # Start the TH_SERVER_NO_UID app.
+ self.th_server = AppServer(
+ th_server_app,
+ storage_dir=self.storage.name,
+ port=self.th_server_port,
+ discriminator=self.th_server_discriminator,
+ passcode=self.th_server_passcode)
def teardown_class(self):
- if self.app_process_for_dut_eco is not None:
- logging.warning("Stopping app with SIGTERM")
- self.app_process_for_dut_eco.send_signal(signal.SIGTERM.value)
- self.app_process_for_dut_eco.wait()
-
- os.remove(self.th_server_kvs)
+ if self.th_server is not None:
+ self.th_server.stop()
+ if self.storage is not None:
+ self.storage.cleanup()
super().teardown_class()
- async def create_device_and_commission_to_th_fabric(self, kvs, port, node_id_for_th, device_info):
- app = self.user_params.get("th_server_no_uid_app_path", None)
- if not app:
- asserts.fail('This test requires a TH_SERVER app. Specify app path with --string-arg th_server_no_uid_app_path:<path_to_app>')
-
- if not os.path.exists(app):
- asserts.fail(f'The path {app} does not exist')
-
- discriminator = random.randint(0, 4095)
- passcode = 20202021
-
- cmd = [app]
- cmd.extend(['--secured-device-port', str(port)])
- cmd.extend(['--discriminator', str(discriminator)])
- cmd.extend(['--passcode', str(passcode)])
- cmd.extend(['--KVS', kvs])
-
- # TODO: Determine if we want these logs cooked or pushed to somewhere else
- logging.info(f"Starting TH device for {device_info}")
- self.app_process_for_dut_eco = subprocess.Popen(cmd)
- logging.info(f"Started TH device for {device_info}")
- time.sleep(3)
-
- logging.info("Commissioning from separate fabric")
- await self.TH_server_controller.CommissionOnNetwork(nodeId=node_id_for_th, setupPinCode=passcode, filterType=ChipDeviceCtrl.DiscoveryFilterType.LONG_DISCRIMINATOR, filter=discriminator)
- logging.info("Commissioning device for DUT ecosystem onto TH for managing")
-
def steps_TC_MCORE_FS_1_3(self) -> list[TestStep]:
- steps = [TestStep(1, "TH commissions TH_SERVER to TH’s fabric.", is_commissioning=True),
- TestStep(2, "DUT_FSA commissions TH_SERVER to DUT_FSA’s fabric and generates a UniqueID.")]
- return steps
+ return [
+ TestStep(0, "Commission DUT if not done", is_commissioning=True),
+ TestStep(1, "TH commissions TH_SERVER_NO_UID to TH's fabric"),
+ TestStep(2, "DUT_FSA commissions TH_SERVER_NO_UID to DUT_FSA's fabric and generates a UniqueID.",
+ "TH verifies a value is visible for the UniqueID from the DUT_FSA's Bridged Device Basic Information Cluster."),
+ ]
+
+ async def commission_via_commissioner_control(self, controller_node_id: int, device_node_id: int):
+ """Commission device_node_id to controller_node_id using CommissionerControl cluster."""
+
+ request_id = random.randint(0, 0xFFFFFFFFFFFFFFFF)
+
+ vendor_id = await self.read_single_attribute_check_success(
+ node_id=device_node_id,
+ cluster=Clusters.BasicInformation,
+ attribute=Clusters.BasicInformation.Attributes.VendorID,
+ )
+
+ product_id = await self.read_single_attribute_check_success(
+ node_id=device_node_id,
+ cluster=Clusters.BasicInformation,
+ attribute=Clusters.BasicInformation.Attributes.ProductID,
+ )
+
+ await self.send_single_cmd(
+ node_id=controller_node_id,
+ cmd=Clusters.CommissionerControl.Commands.RequestCommissioningApproval(
+ requestId=request_id,
+ vendorId=vendor_id,
+ productId=product_id,
+ ),
+ )
+
+ if not self.is_ci:
+ self.wait_for_user_input("Approve Commissioning Approval Request on DUT using manufacturer specified mechanism")
+
+ resp = await self.send_single_cmd(
+ node_id=controller_node_id,
+ cmd=Clusters.CommissionerControl.Commands.CommissionNode(
+ requestId=request_id,
+ responseTimeoutSeconds=30,
+ ),
+ )
+
+ asserts.assert_equal(type(resp), Clusters.CommissionerControl.Commands.ReverseOpenCommissioningWindow,
+ "Incorrect response type")
+
+ await self.send_single_cmd(
+ node_id=device_node_id,
+ cmd=Clusters.AdministratorCommissioning.Commands.OpenCommissioningWindow(
+ commissioningTimeout=3*60,
+ PAKEPasscodeVerifier=resp.PAKEPasscodeVerifier,
+ discriminator=resp.discriminator,
+ iterations=resp.iterations,
+ salt=resp.salt,
+ ),
+ timedRequestTimeoutMs=5000,
+ )
@async_test_body
async def test_TC_MCORE_FS_1_3(self):
self.is_ci = self.check_pics('PICS_SDK_CI_ONLY')
- self.print_step(0, "Commissioning DUT to TH, already done")
+
+ # Commissioning - done
+ self.step(0)
self.step(1)
- root_node_endpoint = 0
- root_part_list = await self.read_single_attribute_check_success(cluster=Clusters.Descriptor, attribute=Clusters.Descriptor.Attributes.PartsList, endpoint=root_node_endpoint)
- set_of_endpoints_before_adding_device = set(root_part_list)
- logging.info(f"Set of endpoints before adding the device: {set_of_endpoints_before_adding_device}")
- kvs = f'kvs_{str(uuid.uuid4())}'
- device_info = "for TH ecosystem"
- await self.create_device_and_commission_to_th_fabric(kvs, self.th_server_port, self.th_server_nodeid, device_info)
+ th_server_th_node_id = 1
+ await self.default_controller.CommissionOnNetwork(
+ nodeId=th_server_th_node_id,
+ setupPinCode=self.th_server_passcode,
+ filterType=ChipDeviceCtrl.DiscoveryFilterType.LONG_DISCRIMINATOR,
+ filter=self.th_server_discriminator,
+ )
- self.th_server_kvs = kvs
- read_result = await self.TH_server_controller.ReadAttribute(self.th_server_nodeid, [(root_node_endpoint, Clusters.BasicInformation.Attributes.UniqueID)])
- result = read_result[root_node_endpoint][Clusters.BasicInformation][Clusters.BasicInformation.Attributes.UniqueID]
- asserts.assert_true(type_matches(result, Clusters.Attribute.ValueDecodeFailure), "We were expecting a value decode failure")
- asserts.assert_equal(result.Reason.status, Status.UnsupportedAttribute, "Incorrect error returned from reading UniqueID")
+ await self.read_single_attribute_expect_error(
+ cluster=Clusters.BasicInformation,
+ attribute=Clusters.BasicInformation.Attributes.UniqueID,
+ node_id=th_server_th_node_id,
+ error=Status.UnsupportedAttribute,
+ )
self.step(2)
- params = await self.openCommissioningWindow(dev_ctrl=self.TH_server_controller, node_id=self.th_server_nodeid)
- self.wait_for_user_input(
- prompt_msg=f"Using the DUT vendor's provided interface, commission the device using the following parameters:\n"
- f"- discriminator: {params.randomDiscriminator}\n"
- f"- setupPinCode: {params.commissioningParameters.setupPinCode}\n"
- f"- setupQRCode: {params.commissioningParameters.setupQRCode}\n"
- f"- setupManualcode: {params.commissioningParameters.setupManualCode}\n"
- f"If using FabricSync Admin, you may type:\n"
- f">>> pairing onnetwork <desired_node_id> {params.commissioningParameters.setupPinCode}")
+ # Get the list of endpoints on the DUT_FSA_BRIDGE before adding the TH_SERVER_NO_UID.
+ dut_fsa_bridge_endpoints = set(await self.read_single_attribute_check_success(
+ cluster=Clusters.Descriptor,
+ attribute=Clusters.Descriptor.Attributes.PartsList,
+ node_id=self.dut_node_id,
+ endpoint=0,
+ ))
- root_part_list = await self.read_single_attribute_check_success(cluster=Clusters.Descriptor, attribute=Clusters.Descriptor.Attributes.PartsList, endpoint=root_node_endpoint)
- set_of_endpoints_after_adding_device = set(root_part_list)
- logging.info(f"Set of endpoints after adding the device: {set_of_endpoints_after_adding_device}")
+ await self.commission_via_commissioner_control(
+ controller_node_id=self.dut_node_id,
+ device_node_id=th_server_th_node_id)
- asserts.assert_true(set_of_endpoints_after_adding_device.issuperset(
- set_of_endpoints_before_adding_device), "Expected only new endpoints to be added")
- unique_endpoints_set = set_of_endpoints_after_adding_device - set_of_endpoints_before_adding_device
- asserts.assert_equal(len(unique_endpoints_set), 1, "Expected only one new endpoint")
- newly_added_endpoint = list(unique_endpoints_set)[0]
+ # Wait for the device to appear on the DUT_FSA_BRIDGE.
+ await asyncio.sleep(2)
- th_sed_dut_unique_id = await self.read_single_attribute_check_success(cluster=Clusters.BridgedDeviceBasicInformation, attribute=Clusters.BridgedDeviceBasicInformation.Attributes.UniqueID, endpoint=newly_added_endpoint)
- asserts.assert_true(type_matches(th_sed_dut_unique_id, str), "UniqueID should be a string")
- asserts.assert_true(th_sed_dut_unique_id, "UniqueID should not be an empty string")
+ # Get the list of endpoints on the DUT_FSA_BRIDGE after adding the TH_SERVER_NO_UID.
+ dut_fsa_bridge_endpoints_new = set(await self.read_single_attribute_check_success(
+ cluster=Clusters.Descriptor,
+ attribute=Clusters.Descriptor.Attributes.PartsList,
+ node_id=self.dut_node_id,
+ endpoint=0,
+ ))
+
+ # Get the endpoint number for just added TH_SERVER_NO_UID.
+ logging.info("Endpoints on DUT_FSA_BRIDGE: old=%s, new=%s", dut_fsa_bridge_endpoints, dut_fsa_bridge_endpoints_new)
+ asserts.assert_true(dut_fsa_bridge_endpoints_new.issuperset(dut_fsa_bridge_endpoints),
+ "Expected only new endpoints to be added")
+ unique_endpoints_set = dut_fsa_bridge_endpoints_new - dut_fsa_bridge_endpoints
+ asserts.assert_equal(len(unique_endpoints_set), 1, "Expected only one new endpoint on DUT_FSA")
+ dut_fsa_bridge_th_server_endpoint = list(unique_endpoints_set)[0]
+
+ dut_fsa_bridge_th_server_unique_id = await self.read_single_attribute_check_success(
+ cluster=Clusters.BridgedDeviceBasicInformation,
+ attribute=Clusters.BridgedDeviceBasicInformation.Attributes.UniqueID,
+ endpoint=dut_fsa_bridge_th_server_endpoint)
+ asserts.assert_true(type_matches(dut_fsa_bridge_th_server_unique_id, str), "UniqueID should be a string")
+ asserts.assert_true(dut_fsa_bridge_th_server_unique_id, "UniqueID should not be an empty string")
+ logging.info("UniqueID generated for TH_SERVER_NO_UID: %s", dut_fsa_bridge_th_server_unique_id)
if __name__ == "__main__":
diff --git a/src/python_testing/TC_MCORE_FS_1_4.py b/src/python_testing/TC_MCORE_FS_1_4.py
new file mode 100644
index 0000000..8e05c2d
--- /dev/null
+++ b/src/python_testing/TC_MCORE_FS_1_4.py
@@ -0,0 +1,442 @@
+#
+# Copyright (c) 2024 Project CHIP Authors
+# All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This test requires a TH_SERVER_NO_UID application that returns UnsupportedAttribute
+# when reading UniqueID from BasicInformation Cluster. Please specify the app
+# location with --string-arg th_server_no_uid_app_path:<path_to_app>
+
+# See https://github.com/project-chip/connectedhomeip/blob/master/docs/testing/python.md#defining-the-ci-test-arguments
+# for details about the block below.
+#
+# === BEGIN CI TEST ARGUMENTS ===
+# test-runner-runs: run1
+# test-runner-run/run1/app: examples/fabric-admin/scripts/fabric-sync-app.py
+# test-runner-run/run1/app-args: --app-admin=${FABRIC_ADMIN_APP} --app-bridge=${FABRIC_BRIDGE_APP} --stdin-pipe=dut-fsa-stdin --discriminator=1234
+# test-runner-run/run1/factoryreset: True
+# test-runner-run/run1/script-args: --PICS src/app/tests/suites/certification/ci-pics-values --storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --string-arg th_fsa_app_path:examples/fabric-admin/scripts/fabric-sync-app.py th_fsa_admin_path:${FABRIC_ADMIN_APP} th_fsa_bridge_path:${FABRIC_BRIDGE_APP} th_server_no_uid_app_path:${LIGHTING_APP_NO_UNIQUE_ID} dut_fsa_stdin_pipe:dut-fsa-stdin
+# test-runner-run/run1/script-start-delay: 5
+# test-runner-run/run1/quiet: false
+# === END CI TEST ARGUMENTS ===
+
+import asyncio
+import logging
+import os
+import random
+import subprocess
+import sys
+import tempfile
+import threading
+
+import chip.clusters as Clusters
+from chip import ChipDeviceCtrl
+from chip.interaction_model import Status
+from matter_testing_support import MatterBaseTest, TestStep, async_test_body, default_matter_test_main, type_matches
+from mobly import asserts
+
+
+# TODO: Make this class more generic. Issue #35348
+class Subprocess(threading.Thread):
+
+ def __init__(self, args: list = [], stdout_cb=None, tag="", **kw):
+ super().__init__(**kw)
+ self.tag = f"[{tag}] " if tag else ""
+ self.stdout_cb = stdout_cb
+ self.args = args
+
+ def forward_f(self, f_in, f_out):
+ while True:
+ line = f_in.readline()
+ if not line:
+ break
+ f_out.write(f"{self.tag}{line}")
+ f_out.flush()
+ if self.stdout_cb is not None:
+ self.stdout_cb(line)
+
+ def run(self):
+ logging.info("RUN: %s", " ".join(self.args))
+ self.p = subprocess.Popen(self.args, errors="ignore", stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ # Forward stdout and stderr with a tag attached.
+ forwarding_stdout_thread = threading.Thread(target=self.forward_f, args=[self.p.stdout, sys.stdout])
+ forwarding_stdout_thread.start()
+ forwarding_stderr_thread = threading.Thread(target=self.forward_f, args=[self.p.stderr, sys.stderr])
+ forwarding_stderr_thread.start()
+ # Wait for the process to finish.
+ self.p.wait()
+ forwarding_stdout_thread.join()
+ forwarding_stderr_thread.join()
+
+ def stop(self):
+ self.p.terminate()
+ self.join()
+
+
+class FabricSyncApp:
+
+ def _process_admin_output(self, line):
+ if self.wait_for_text_text is not None and self.wait_for_text_text in line:
+ self.wait_for_text_event.set()
+
+ def wait_for_text(self, timeout=30):
+ if not self.wait_for_text_event.wait(timeout=timeout):
+ raise Exception(f"Timeout waiting for text: {self.wait_for_text_text}")
+ self.wait_for_text_event.clear()
+ self.wait_for_text_text = None
+
+ def __init__(self, fabric_sync_app_path, fabric_admin_app_path, fabric_bridge_app_path,
+ storage_dir, fabric_name=None, node_id=None, vendor_id=None,
+ paa_trust_store_path=None, bridge_port=None, bridge_discriminator=None,
+ bridge_passcode=None):
+
+ self.wait_for_text_event = threading.Event()
+ self.wait_for_text_text = None
+
+ args = [fabric_sync_app_path]
+ args.append(f"--app-admin={fabric_admin_app_path}")
+ args.append(f"--app-bridge={fabric_bridge_app_path}")
+ # Override default ports, so it will be possible to run
+ # our TH_FSA alongside the DUT_FSA during CI testing.
+ args.append("--app-admin-rpc-port=44000")
+ args.append("--app-bridge-rpc-port=44001")
+ # Keep the storage directory in a temporary location.
+ args.append(f"--storage-dir={storage_dir}")
+ if paa_trust_store_path is not None:
+ args.append(f"--paa-trust-store-path={paa_trust_store_path}")
+ if fabric_name is not None:
+ args.append(f"--commissioner-name={fabric_name}")
+ if node_id is not None:
+ args.append(f"--commissioner-node-id={node_id}")
+ args.append(f"--commissioner-vendor-id={vendor_id}")
+ args.append(f"--secured-device-port={bridge_port}")
+ args.append(f"--discriminator={bridge_discriminator}")
+ args.append(f"--passcode={bridge_passcode}")
+
+ self.fabric_sync_app = Subprocess(args, stdout_cb=self._process_admin_output)
+ self.wait_for_text_text = "Successfully opened pairing window on the device"
+ self.fabric_sync_app.start()
+
+ # Wait for the fabric-sync-app to be ready.
+ self.wait_for_text()
+
+ def commission_on_network(self, node_id, setup_pin_code=None, filter_type=None, filter=None):
+ self.wait_for_text_text = f"Commissioning complete for node ID {node_id:#018x}: success"
+ # Send the commissioning command to the admin.
+ self.fabric_sync_app.p.stdin.write(f"pairing onnetwork {node_id} {setup_pin_code}\n")
+ self.fabric_sync_app.p.stdin.flush()
+ # Wait for success message.
+ self.wait_for_text()
+
+ def stop(self):
+ self.fabric_sync_app.stop()
+
+
+class AppServer:
+
+ def __init__(self, app, storage_dir, port=None, discriminator=None, passcode=None):
+
+ args = [app]
+ args.extend(["--KVS", tempfile.mkstemp(dir=storage_dir, prefix="kvs-app-")[1]])
+ args.extend(['--secured-device-port', str(port)])
+ args.extend(["--discriminator", str(discriminator)])
+ args.extend(["--passcode", str(passcode)])
+ self.app = Subprocess(args, tag="SERVER")
+ self.app.start()
+
+ def stop(self):
+ self.app.stop()
+
+
+class TC_MCORE_FS_1_4(MatterBaseTest):
+
+ @property
+ def default_timeout(self) -> int:
+ # This test has some manual steps, so we need a longer timeout.
+ return 200
+
+ def setup_class(self):
+ super().setup_class()
+
+ self.th_fsa_controller = None
+ self.th_server = None
+ self.storage = None
+
+ # Get the path to the TH_FSA (fabric-admin and fabric-bridge) app from the user params.
+ th_fsa_app_path = self.user_params.get("th_fsa_app_path")
+ if not th_fsa_app_path:
+ asserts.fail("This test requires a TH_FSA app. Specify app path with --string-arg th_fsa_app_path:<path_to_app>")
+ if not os.path.exists(th_fsa_app_path):
+ asserts.fail(f"The path {th_fsa_app_path} does not exist")
+ th_fsa_admin_path = self.user_params.get("th_fsa_admin_path")
+ if not th_fsa_admin_path:
+ asserts.fail("This test requires a TH_FSA_ADMIN app. Specify app path with --string-arg th_fsa_admin_path:<path_to_app>")
+ if not os.path.exists(th_fsa_admin_path):
+ asserts.fail(f"The path {th_fsa_admin_path} does not exist")
+ th_fsa_bridge_path = self.user_params.get("th_fsa_bridge_path")
+ if not th_fsa_bridge_path:
+ asserts.fail("This test requires a TH_FSA_BRIDGE app. Specify app path with --string-arg th_fsa_bridge_path:<path_to_app>")
+ if not os.path.exists(th_fsa_bridge_path):
+ asserts.fail(f"The path {th_fsa_bridge_path} does not exist")
+
+ # Get the path to the TH_SERVER_NO_UID app from the user params.
+ th_server_app = self.user_params.get("th_server_no_uid_app_path", None)
+ if not th_server_app:
+ asserts.fail("This test requires a TH_SERVER_NO_UID app. Specify app path with --string-arg th_server_no_uid_app_path:<path_to_app>")
+ if not os.path.exists(th_server_app):
+ asserts.fail(f"The path {th_server_app} does not exist")
+
+ # Create a temporary storage directory for keeping KVS files.
+ self.storage = tempfile.TemporaryDirectory(prefix=self.__class__.__name__)
+ logging.info("Temporary storage directory: %s", self.storage.name)
+
+ self.th_fsa_bridge_address = "::1"
+ self.th_fsa_bridge_port = 5543
+ # Random discriminator between 0 and MAX - 1. The one-less is to save
+ # a room for the TH_SERVER_NO_UID discriminator.
+ self.th_fsa_bridge_discriminator = random.randint(0, 4094)
+ self.th_fsa_bridge_passcode = 20202021
+
+ self.th_fsa_controller = FabricSyncApp(
+ th_fsa_app_path,
+ th_fsa_admin_path,
+ th_fsa_bridge_path,
+ storage_dir=self.storage.name,
+ paa_trust_store_path=self.matter_test_config.paa_trust_store_path,
+ bridge_port=self.th_fsa_bridge_port,
+ bridge_discriminator=self.th_fsa_bridge_discriminator,
+ bridge_passcode=self.th_fsa_bridge_passcode,
+ vendor_id=0xFFF1)
+
+ # Get the named pipe path for the DUT_FSA app input from the user params.
+ dut_fsa_stdin_pipe = self.user_params.get("dut_fsa_stdin_pipe", None)
+ if dut_fsa_stdin_pipe is not None:
+ self.dut_fsa_stdin = open(dut_fsa_stdin_pipe, "w")
+
+ self.th_server_port = 5544
+ self.th_server_discriminator = self.th_fsa_bridge_discriminator + 1
+ self.th_server_passcode = 20202021
+
+ # Start the TH_SERVER_NO_UID app.
+ self.th_server = AppServer(
+ th_server_app,
+ storage_dir=self.storage.name,
+ port=self.th_server_port,
+ discriminator=self.th_server_discriminator,
+ passcode=self.th_server_passcode)
+
+ def teardown_class(self):
+ if self.th_fsa_controller is not None:
+ self.th_fsa_controller.stop()
+ if self.th_server is not None:
+ self.th_server.stop()
+ if self.storage is not None:
+ self.storage.cleanup()
+ super().teardown_class()
+
+ def steps_TC_MCORE_FS_1_4(self) -> list[TestStep]:
+ return [
+ TestStep(0, "Commission DUT if not done", is_commissioning=True),
+ TestStep(1, "TH commissions TH_SERVER_NO_UID to TH's fabric.",
+ "TH verifies that the TH_SERVER_NO_UID does not provide a UniqueID."),
+ TestStep(2, "TH instructs TH_FSA to commission TH_SERVER_NO_UID to TH_FSA's fabric."),
+ TestStep(3, "TH instructs TH_FSA to open up commissioning window on it's aggregator."),
+ TestStep(4, "Follow manufacturer provided instructions to have DUT_FSA commission TH_FSA's aggregator."),
+ TestStep(5, "Follow manufacturer provided instructions to enable DUT_FSA to synchronize TH_SERVER_NO_UID"
+ " from TH_FSA onto DUT_FSA's fabric. TH to provide endpoint saved from step 2 in user prompt."),
+ TestStep(6, "DUT_FSA synchronizes TH_SERVER_NO_UID onto DUT_FSA's fabric and copies the UniqueID presented"
+ " by TH_FSA's Bridged Device Basic Information Cluster."),
+ ]
+
+ @async_test_body
+ async def test_TC_MCORE_FS_1_4(self):
+ self.is_ci = self.check_pics('PICS_SDK_CI_ONLY')
+
+ # Commissioning - done
+ self.step(0)
+
+ self.step(1)
+
+ th_server_th_node_id = 1
+ await self.default_controller.CommissionOnNetwork(
+ nodeId=th_server_th_node_id,
+ setupPinCode=self.th_server_passcode,
+ filterType=ChipDeviceCtrl.DiscoveryFilterType.LONG_DISCRIMINATOR,
+ filter=self.th_server_discriminator,
+ )
+
+ await self.read_single_attribute_expect_error(
+ cluster=Clusters.BasicInformation,
+ attribute=Clusters.BasicInformation.Attributes.UniqueID,
+ node_id=th_server_th_node_id,
+ error=Status.UnsupportedAttribute,
+ )
+
+ self.step(2)
+
+ th_fsa_bridge_th_node_id = 2
+ # Commissioning TH_FSA_BRIDGE to TH fabric.
+ await self.default_controller.CommissionOnNetwork(
+ nodeId=th_fsa_bridge_th_node_id,
+ setupPinCode=self.th_fsa_bridge_passcode,
+ filterType=ChipDeviceCtrl.DiscoveryFilterType.LONG_DISCRIMINATOR,
+ filter=self.th_fsa_bridge_discriminator,
+ )
+
+ # Get the list of endpoints on the TH_FSA_BRIDGE before adding the TH_SERVER_NO_UID.
+ th_fsa_bridge_endpoints = set(await self.read_single_attribute_check_success(
+ cluster=Clusters.Descriptor,
+ attribute=Clusters.Descriptor.Attributes.PartsList,
+ node_id=th_fsa_bridge_th_node_id,
+ endpoint=0,
+ ))
+
+ discriminator = random.randint(0, 4095)
+ # Open commissioning window on TH_SERVER_NO_UID.
+ params = await self.default_controller.OpenCommissioningWindow(
+ nodeid=th_server_th_node_id,
+ option=self.default_controller.CommissioningWindowPasscode.kTokenWithRandomPin,
+ discriminator=discriminator,
+ iteration=10000,
+ timeout=600)
+
+ th_server_th_fsa_node_id = 3
+ # Commissioning TH_SERVER_NO_UID to TH_FSA.
+ self.th_fsa_controller.commission_on_network(
+ node_id=th_server_th_fsa_node_id,
+ setup_pin_code=params.setupPinCode,
+ filter_type=ChipDeviceCtrl.DiscoveryFilterType.LONG_DISCRIMINATOR,
+ filter=discriminator,
+ )
+
+ # Wait some time, so the dynamic endpoint will appear on the TH_FSA_BRIDGE.
+ await asyncio.sleep(5)
+
+ # Get the list of endpoints on the TH_FSA_BRIDGE after adding the TH_SERVER_NO_UID.
+ th_fsa_bridge_endpoints_new = set(await self.read_single_attribute_check_success(
+ cluster=Clusters.Descriptor,
+ attribute=Clusters.Descriptor.Attributes.PartsList,
+ node_id=th_fsa_bridge_th_node_id,
+ endpoint=0,
+ ))
+
+ # Get the endpoint number for just added TH_SERVER_NO_UID.
+ logging.info("Endpoints on TH_FSA_BRIDGE: old=%s, new=%s", th_fsa_bridge_endpoints, th_fsa_bridge_endpoints_new)
+ asserts.assert_true(th_fsa_bridge_endpoints_new.issuperset(th_fsa_bridge_endpoints),
+ "Expected only new endpoints to be added")
+ unique_endpoints_set = th_fsa_bridge_endpoints_new - th_fsa_bridge_endpoints
+ asserts.assert_equal(len(unique_endpoints_set), 1, "Expected only one new endpoint")
+ th_fsa_bridge_th_server_endpoint = list(unique_endpoints_set)[0]
+
+ # Verify that TH_FSA created a UniqueID for TH_SERVER_NO_UID.
+ th_fsa_bridge_th_server_unique_id = await self.read_single_attribute_check_success(
+ cluster=Clusters.BridgedDeviceBasicInformation,
+ attribute=Clusters.BridgedDeviceBasicInformation.Attributes.UniqueID,
+ node_id=th_fsa_bridge_th_node_id,
+ endpoint=th_fsa_bridge_th_server_endpoint)
+ asserts.assert_true(type_matches(th_fsa_bridge_th_server_unique_id, str), "UniqueID should be a string")
+ asserts.assert_true(th_fsa_bridge_th_server_unique_id, "UniqueID should not be an empty string")
+ logging.info("UniqueID generated for TH_SERVER_NO_UID: %s", th_fsa_bridge_th_server_unique_id)
+
+ self.step(3)
+
+ discriminator = random.randint(0, 4095)
+ # Open commissioning window on TH_FSA_BRIDGE.
+ params = await self.default_controller.OpenCommissioningWindow(
+ nodeid=th_fsa_bridge_th_node_id,
+ option=self.default_controller.CommissioningWindowPasscode.kTokenWithRandomPin,
+ discriminator=discriminator,
+ iteration=10000,
+ timeout=600)
+
+ self.step(4)
+
+ # Commissioning TH_FSA_BRIDGE to DUT_FSA fabric.
+ if not self.is_ci:
+ self.wait_for_user_input(
+ f"Commission TH_FSA's aggregator on DUT using manufacturer specified mechanism.\n"
+ f"Use the following parameters:\n"
+ f"- discriminator: {discriminator}\n"
+ f"- setupPinCode: {params.setupPinCode}\n"
+ f"- setupQRCode: {params.setupQRCode}\n"
+ f"- setupManualCode: {params.setupManualCode}\n"
+ f"If using FabricSync Admin, you may type:\n"
+ f">>> fabricsync add-bridge <desired_node_id> {params.setupPinCode} <th_host_ip> {self.th_fsa_bridge_port}")
+ else:
+ self.dut_fsa_stdin.write(
+ f"fabricsync add-bridge 10 {params.setupPinCode} {self.th_fsa_bridge_address} {self.th_fsa_bridge_port}\n")
+ self.dut_fsa_stdin.flush()
+ # Wait for the commissioning to complete.
+ await asyncio.sleep(5)
+
+ self.step(5)
+
+ # Get the list of endpoints on the DUT_FSA_BRIDGE before synchronization.
+ dut_fsa_bridge_endpoints = set(await self.read_single_attribute_check_success(
+ cluster=Clusters.Descriptor,
+ attribute=Clusters.Descriptor.Attributes.PartsList,
+ node_id=self.dut_node_id,
+ endpoint=0,
+ ))
+
+ # Synchronize TH_SERVER_NO_UID from TH_FSA to DUT_FSA fabric.
+ if not self.is_ci:
+ self.wait_for_user_input(
+ f"Synchronize endpoint from TH_FSA's aggregator to DUT using manufacturer specified mechanism.\n"
+ f"Use the following parameters:\n"
+ f"- endpointID: {th_fsa_bridge_th_server_endpoint}\n"
+ f"If using FabricSync Admin, you may type:\n"
+ f">>> fabricsync sync-device {th_fsa_bridge_th_server_endpoint}")
+ else:
+ self.dut_fsa_stdin.write(f"fabricsync sync-device {th_fsa_bridge_th_server_endpoint}\n")
+ self.dut_fsa_stdin.flush()
+ # Wait for the synchronization to complete.
+ await asyncio.sleep(5)
+
+ self.step(6)
+
+ # Get the list of endpoints on the DUT_FSA_BRIDGE after synchronization
+ dut_fsa_bridge_endpoints_new = set(await self.read_single_attribute_check_success(
+ cluster=Clusters.Descriptor,
+ attribute=Clusters.Descriptor.Attributes.PartsList,
+ node_id=self.dut_node_id,
+ endpoint=0,
+ ))
+
+ # Get the endpoint number for just synced TH_SERVER_NO_UID.
+ logging.info("Endpoints on DUT_FSA_BRIDGE: old=%s, new=%s", dut_fsa_bridge_endpoints, dut_fsa_bridge_endpoints_new)
+ asserts.assert_true(dut_fsa_bridge_endpoints_new.issuperset(dut_fsa_bridge_endpoints),
+ "Expected only new endpoints to be added")
+ unique_endpoints_set = dut_fsa_bridge_endpoints_new - dut_fsa_bridge_endpoints
+ asserts.assert_equal(len(unique_endpoints_set), 1, "Expected only one new endpoint on DUT_FSA")
+ dut_fsa_bridge_th_server_endpoint = list(unique_endpoints_set)[0]
+
+ # Verify that DUT_FSA copied the TH_SERVER_NO_UID UniqueID from TH_FSA.
+ dut_fsa_bridge_th_server_unique_id = await self.read_single_attribute_check_success(
+ cluster=Clusters.BridgedDeviceBasicInformation,
+ attribute=Clusters.BridgedDeviceBasicInformation.Attributes.UniqueID,
+ endpoint=dut_fsa_bridge_th_server_endpoint)
+ asserts.assert_true(type_matches(dut_fsa_bridge_th_server_unique_id, str), "UniqueID should be a string")
+ asserts.assert_true(dut_fsa_bridge_th_server_unique_id, "UniqueID should not be an empty string")
+ logging.info("UniqueID for TH_SERVER_NO_UID on DUT_FSA: %s", th_fsa_bridge_th_server_unique_id)
+
+ # Make sure that the UniqueID on the DUT_FSA_BRIDGE is the same as the one on the DUT_FSA_BRIDGE.
+ asserts.assert_equal(dut_fsa_bridge_th_server_unique_id, th_fsa_bridge_th_server_unique_id,
+ "UniqueID on DUT_FSA and TH_FSA should be the same")
+
+
+if __name__ == "__main__":
+ default_matter_test_main()
diff --git a/src/python_testing/execute_python_tests.py b/src/python_testing/execute_python_tests.py
index 8249be9..1f6afa9 100644
--- a/src/python_testing/execute_python_tests.py
+++ b/src/python_testing/execute_python_tests.py
@@ -75,6 +75,7 @@
"TC_MCORE_FS_1_1.py",
"TC_MCORE_FS_1_2.py",
"TC_MCORE_FS_1_3.py",
+ "TC_MCORE_FS_1_4.py",
"TC_MCORE_FS_1_5.py",
"TC_OCC_3_1.py",
"TC_OCC_3_2.py",