Split TC_MCORE_FS_1_3 to align with the Test Spec (#35274)

* [TC_MCORE_FS_1_3] Fix test script according to test plan update

* Separate storage for all used components

* Open commissioning window on TH_FSA_BRIDGE

* Python wrapper for running fabric-admin and fabric-bridge together

* Customize fabric-admin and fabric-bridge RPC ports

* Create storage directory

* Use fabric-sync-app in the TC-MCORE-FS-1.3 script

* Use CommissionerControlCluster to commission TH_SERVER onto DUT

* Auto-link bridge with admin

* Test automation setup

* Terminate apps on SIGTERM and SIGINT

* Open commissioning window on fabric-bridge after adding to FSA

* Commission TH_FSA_BRIDGE onto the DUT_FSA fabric

* Synchronize server from TH to DUT

* Start another instance of app server

* Test that the UniqueID was synced

* Allow customization of fabric-sync-app components

* Final cleanup

* Split test case into two test cases

* Simplify TC_MCORE_FS_1_3 script

* Simplify TC_MCORE_FS_1_4 steps

* Use volatile storage for fabric-sync-app by default

* Add TC_MCORE_FS_1_4 to exceptions

* Get rid of defaults

* Document the options used when opening the commissioning window

* Speed up the pipe read busy loop

* Refactor local output processing

* Improve wait for output

* Add FS-sync tests to CI

* Improve Python code style

* Fix wait for fabric-sync-app start

* Fix asyncio forwarder

* Address review comments
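
For reference, the new pieces can be exercised locally along these lines (a rough
sketch only: paths and values are illustrative, and the authoritative flags are the
argparse help in fabric-sync-app.py and the CI test arguments embedded in each test
script):

    # Start fabric-admin and fabric-bridge together; admin commands are read
    # from the named pipe, which the wrapper creates if it does not exist.
    ./examples/fabric-admin/scripts/fabric-sync-app.py \
        --app-admin=out/linux-x64-fabric-admin-rpc/fabric-admin \
        --app-bridge=out/linux-x64-fabric-bridge-rpc/fabric-bridge-app \
        --stdin-pipe=dut-fsa-stdin \
        --discriminator=1234

    # In a second terminal, run the split test case against it
    # (LIGHTING_APP_NO_UNIQUE_ID as exported in the CI workflow, or any
    # lighting app built without a UniqueID).
    python3 src/python_testing/TC_MCORE_FS_1_3.py \
        --commissioning-method on-network \
        --discriminator 1234 --passcode 20202021 \
        --storage-path admin_storage.json \
        --string-arg th_server_no_uid_app_path:${LIGHTING_APP_NO_UNIQUE_ID}
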
diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml
index b37ed3c..5265f82 100644
--- a/.github/workflows/tests.yaml
+++ b/.github/workflows/tests.yaml
@@ -486,6 +486,9 @@
                       --target linux-x64-microwave-oven-ipv6only-no-ble-no-wifi-tsan-clang-test \
                       --target linux-x64-rvc-ipv6only-no-ble-no-wifi-tsan-clang-test \
                       --target linux-x64-network-manager-ipv6only-no-ble-no-wifi-tsan-clang-test \
+                      --target linux-x64-fabric-admin-rpc-ipv6only-clang \
+                      --target linux-x64-fabric-bridge-rpc-ipv6only-no-ble-no-wifi-clang \
+                      --target linux-x64-light-data-model-no-unique-id-ipv6only-no-ble-no-wifi-clang \
                       --target linux-x64-python-bindings \
                       build \
                       --copy-artifacts-to objdir-clone \
@@ -500,6 +503,9 @@
                   echo "CHIP_MICROWAVE_OVEN_APP: out/linux-x64-microwave-oven-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-microwave-oven-app" >> /tmp/test_env.yaml
                   echo "CHIP_RVC_APP: out/linux-x64-rvc-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-rvc-app" >> /tmp/test_env.yaml
                   echo "NETWORK_MANAGEMENT_APP: out/linux-x64-network-manager-ipv6only-no-ble-no-wifi-tsan-clang-test/matter-network-manager-app" >> /tmp/test_env.yaml
+                  echo "FABRIC_ADMIN_APP: out/linux-x64-fabric-admin-rpc-ipv6only-clang/fabric-admin" >> /tmp/test_env.yaml
+                  echo "FABRIC_BRIDGE_APP: out/linux-x64-fabric-bridge-rpc-ipv6only-no-ble-no-wifi-clang/fabric-bridge-app" >> /tmp/test_env.yaml
+                  echo "LIGHTING_APP_NO_UNIQUE_ID: out/linux-x64-light-data-model-no-unique-id-ipv6only-no-ble-no-wifi-clang/chip-lighting-app" >> /tmp/test_env.yaml
                   echo "TRACE_APP: out/trace_data/app-{SCRIPT_BASE_NAME}" >> /tmp/test_env.yaml
                   echo "TRACE_TEST_JSON: out/trace_data/test-{SCRIPT_BASE_NAME}" >> /tmp/test_env.yaml
                   echo "TRACE_TEST_PERFETTO: out/trace_data/test-{SCRIPT_BASE_NAME}" >> /tmp/test_env.yaml
diff --git a/examples/fabric-admin/scripts/fabric-sync-app.py b/examples/fabric-admin/scripts/fabric-sync-app.py
new file mode 100755
index 0000000..c6faed8
--- /dev/null
+++ b/examples/fabric-admin/scripts/fabric-sync-app.py
@@ -0,0 +1,318 @@
+#!/usr/bin/env python3
+
+# Copyright (c) 2024 Project CHIP Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import asyncio
+import contextlib
+import os
+import signal
+import sys
+from argparse import ArgumentParser
+from tempfile import TemporaryDirectory
+
+
+async def asyncio_stdin() -> asyncio.StreamReader:
+    """Wrap sys.stdin in an asyncio StreamReader."""
+    loop = asyncio.get_event_loop()
+    reader = asyncio.StreamReader()
+    protocol = asyncio.StreamReaderProtocol(reader)
+    await loop.connect_read_pipe(lambda: protocol, sys.stdin)
+    return reader
+
+
+async def asyncio_stdout(file=sys.stdout) -> asyncio.StreamWriter:
+    """Wrap an IO stream in an asyncio StreamWriter."""
+    loop = asyncio.get_event_loop()
+    transport, protocol = await loop.connect_write_pipe(
+        lambda: asyncio.streams.FlowControlMixin(loop=loop),
+        os.fdopen(file.fileno(), 'wb'))
+    return asyncio.streams.StreamWriter(transport, protocol, None, loop)
+
+
+async def forward_f(prefix: bytes, f_in: asyncio.StreamReader,
+                    f_out: asyncio.StreamWriter, cb=None):
+    """Forward f_in to f_out with a prefix attached.
+
+    This function can optionally feed received lines to a callback function.
+    """
+    while True:
+        line = await f_in.readline()
+        if not line:
+            break
+        if cb is not None:
+            cb(line)
+        f_out.write(prefix)
+        f_out.write(line)
+        await f_out.drain()
+
+
+async def forward_pipe(pipe_path: str, f_out: asyncio.StreamWriter):
+    """Forward named pipe to f_out.
+
+    Unfortunately, Python does not support async file I/O on named pipes. This
+    function performs busy waiting with a short asyncio-friendly sleep to read
+    from the pipe.
+    """
+    fd = os.open(pipe_path, os.O_RDONLY | os.O_NONBLOCK)
+    while True:
+        try:
+            data = os.read(fd, 1024)
+            if data:
+                f_out.write(data)
+            if not data:
+                await asyncio.sleep(0.1)
+        except BlockingIOError:
+            await asyncio.sleep(0.1)
+
+
+async def forward_stdin(f_out: asyncio.StreamWriter):
+    """Forward stdin to f_out."""
+    reader = await asyncio_stdin()
+    while True:
+        line = await reader.readline()
+        if not line:
+            # Exit on Ctrl-D (EOF).
+            sys.exit(0)
+        f_out.write(line)
+
+
+class Subprocess:
+
+    def __init__(self, tag: str, program: str, *args, stdout_cb=None):
+        self.event = asyncio.Event()
+        self.tag = tag.encode()
+        self.program = program
+        self.args = args
+        self.stdout_cb = stdout_cb
+        self.expected_output = None
+
+    def _check_output(self, line: bytes):
+        if self.expected_output is not None and self.expected_output in line:
+            self.event.set()
+
+    async def run(self):
+        self.p = await asyncio.create_subprocess_exec(self.program, *self.args,
+                                                      stdin=asyncio.subprocess.PIPE,
+                                                      stdout=asyncio.subprocess.PIPE,
+                                                      stderr=asyncio.subprocess.PIPE)
+        # Add the stdout and stderr processing to the event loop.
+        asyncio.create_task(forward_f(
+            self.tag,
+            self.p.stderr,
+            await asyncio_stdout(sys.stderr)))
+        asyncio.create_task(forward_f(
+            self.tag,
+            self.p.stdout,
+            await asyncio_stdout(sys.stdout),
+            cb=self._check_output))
+
+    async def send(self, message: str, expected_output: str = None, timeout: float = None):
+        """Send a message to a process and optionally wait for a response."""
+
+        if expected_output is not None:
+            self.expected_output = expected_output.encode()
+            self.event.clear()
+
+        self.p.stdin.write((message + "\n").encode())
+        await self.p.stdin.drain()
+
+        if expected_output is not None:
+            await asyncio.wait_for(self.event.wait(), timeout=timeout)
+            self.expected_output = None
+
+    async def wait(self):
+        await self.p.wait()
+
+    def terminate(self):
+        self.p.terminate()
+
+
+async def run_admin(program, stdout_cb=None, storage_dir=None,
+                    rpc_admin_port=None, rpc_bridge_port=None,
+                    paa_trust_store_path=None, commissioner_name=None,
+                    commissioner_node_id=None, commissioner_vendor_id=None):
+    args = []
+    if storage_dir is not None:
+        args.extend(["--storage-directory", storage_dir])
+    if rpc_admin_port is not None:
+        args.extend(["--local-server-port", str(rpc_admin_port)])
+    if rpc_bridge_port is not None:
+        args.extend(["--fabric-bridge-server-port", str(rpc_bridge_port)])
+    if paa_trust_store_path is not None:
+        args.extend(["--paa-trust-store-path", paa_trust_store_path])
+    if commissioner_name is not None:
+        args.extend(["--commissioner-name", commissioner_name])
+    if commissioner_node_id is not None:
+        args.extend(["--commissioner-nodeid", str(commissioner_node_id)])
+    if commissioner_vendor_id is not None:
+        args.extend(["--commissioner-vendor-id", str(commissioner_vendor_id)])
+    p = Subprocess("[FS-ADMIN]", program, "interactive", "start", *args,
+                   stdout_cb=stdout_cb)
+    await p.run()
+    return p
+
+
+async def run_bridge(program, storage_dir=None, rpc_admin_port=None,
+                     rpc_bridge_port=None, discriminator=None, passcode=None,
+                     secured_device_port=None):
+    args = []
+    if storage_dir is not None:
+        args.extend(["--KVS",
+                     os.path.join(storage_dir, "chip_fabric_bridge_kvs")])
+    if rpc_admin_port is not None:
+        args.extend(["--fabric-admin-server-port", str(rpc_admin_port)])
+    if rpc_bridge_port is not None:
+        args.extend(["--local-server-port", str(rpc_bridge_port)])
+    if discriminator is not None:
+        args.extend(["--discriminator", str(discriminator)])
+    if passcode is not None:
+        args.extend(["--passcode", str(passcode)])
+    if secured_device_port is not None:
+        args.extend(["--secured-device-port", str(secured_device_port)])
+    p = Subprocess("[FS-BRIDGE]", program, *args)
+    await p.run()
+    return p
+
+
+async def main(args):
+
+    # Node ID of the bridge on the fabric.
+    bridge_node_id = 1
+
+    if args.commissioner_node_id == bridge_node_id:
+        raise ValueError(f"NodeID={bridge_node_id} is reserved for the local fabric-bridge")
+
+    storage_dir = args.storage_dir
+    if storage_dir is not None:
+        os.makedirs(storage_dir, exist_ok=True)
+    else:
+        storage = TemporaryDirectory(prefix="fabric-sync-app")
+        storage_dir = storage.name
+
+    pipe = args.stdin_pipe
+    if pipe and not os.path.exists(pipe):
+        os.mkfifo(pipe)
+
+    def terminate(signum, frame):
+        admin.terminate()
+        bridge.terminate()
+        sys.exit(0)
+
+    signal.signal(signal.SIGINT, terminate)
+    signal.signal(signal.SIGTERM, terminate)
+
+    admin, bridge = await asyncio.gather(
+        run_admin(
+            args.app_admin,
+            storage_dir=storage_dir,
+            rpc_admin_port=args.app_admin_rpc_port,
+            rpc_bridge_port=args.app_bridge_rpc_port,
+            paa_trust_store_path=args.paa_trust_store_path,
+            commissioner_name=args.commissioner_name,
+            commissioner_node_id=args.commissioner_node_id,
+            commissioner_vendor_id=args.commissioner_vendor_id,
+        ),
+        run_bridge(
+            args.app_bridge,
+            storage_dir=storage_dir,
+            rpc_admin_port=args.app_admin_rpc_port,
+            rpc_bridge_port=args.app_bridge_rpc_port,
+            secured_device_port=args.secured_device_port,
+            discriminator=args.discriminator,
+            passcode=args.passcode,
+        ))
+
+    # Wait a bit for apps to start.
+    await asyncio.sleep(1)
+
+    try:
+        # Check whether the bridge is already commissioned. If it is,
+        # we will get a response; otherwise we will hit the timeout.
+        await admin.send(
+            f"descriptor read device-type-list {bridge_node_id} 1 --timeout 1",
+            # Log message which should appear in the fabric-admin output if
+            # the bridge is already commissioned.
+            expected_output="Reading attribute: Cluster=0x0000_001D Endpoint=0x1 AttributeId=0x0000_0000",
+            timeout=1.5)
+    except asyncio.TimeoutError:
+        # Commission the bridge to the admin.
+        cmd = f"fabricsync add-local-bridge {bridge_node_id}"
+        if args.passcode is not None:
+            cmd += f" --setup-pin-code {args.passcode}"
+        if args.secured_device_port is not None:
+            cmd += f" --local-port {args.secured_device_port}"
+        await admin.send(
+            cmd,
+            # Wait for the log message indicating that the bridge has been
+            # added to the fabric.
+            f"Commissioning complete for node ID {bridge_node_id:#018x}: success")
+
+    # Open commissioning window with original setup code for the bridge.
+    cw_endpoint_id = 0
+    cw_option = 0  # 0: Original setup code, 1: New setup code
+    cw_timeout = 600
+    cw_iteration = 1000
+    cw_discriminator = 0
+    await admin.send(f"pairing open-commissioning-window {bridge_node_id} {cw_endpoint_id}"
+                     f" {cw_option} {cw_timeout} {cw_iteration} {cw_discriminator}")
+
+    try:
+        await asyncio.gather(
+            forward_pipe(pipe, admin.p.stdin) if pipe else forward_stdin(admin.p.stdin),
+            admin.wait(),
+            bridge.wait(),
+        )
+    except SystemExit:
+        admin.terminate()
+        bridge.terminate()
+    except Exception:
+        admin.terminate()
+        bridge.terminate()
+        raise
+
+
+if __name__ == "__main__":
+    parser = ArgumentParser(description="Fabric-Sync Example Application")
+    parser.add_argument("--app-admin", metavar="PATH",
+                        default="out/linux-x64-fabric-admin-rpc/fabric-admin",
+                        help="path to the fabric-admin executable; default=%(default)s")
+    parser.add_argument("--app-bridge", metavar="PATH",
+                        default="out/linux-x64-fabric-bridge-rpc/fabric-bridge-app",
+                        help="path to the fabric-bridge executable; default=%(default)s")
+    parser.add_argument("--app-admin-rpc-port", metavar="PORT", type=int,
+                        help="fabric-admin RPC server port")
+    parser.add_argument("--app-bridge-rpc-port", metavar="PORT", type=int,
+                        help="fabric-bridge RPC server port")
+    parser.add_argument("--stdin-pipe", metavar="PATH",
+                        help="read input from a named pipe instead of stdin")
+    parser.add_argument("--storage-dir", metavar="PATH",
+                        help=("directory to place storage files in; by default "
+                              "volatile storage is used"))
+    parser.add_argument("--paa-trust-store-path", metavar="PATH",
+                        help="path to directory holding PAA certificates")
+    parser.add_argument("--commissioner-name", metavar="NAME",
+                        help="commissioner name to use for the admin")
+    parser.add_argument("--commissioner-node-id", metavar="NUM", type=int,
+                        help="commissioner node ID to use for the admin")
+    parser.add_argument("--commissioner-vendor-id", metavar="NUM", type=int,
+                        help="commissioner vendor ID to use for the admin")
+    parser.add_argument("--secured-device-port", metavar="NUM", type=int,
+                        help="secure messages listen port to use for the bridge")
+    parser.add_argument("--discriminator", metavar="NUM", type=int,
+                        help="discriminator to use for the bridge")
+    parser.add_argument("--passcode", metavar="NUM", type=int,
+                        help="passcode to use for the bridge")
+    with contextlib.suppress(KeyboardInterrupt):
+        asyncio.run(main(parser.parse_args()))
diff --git a/src/python_testing/TC_MCORE_FS_1_3.py b/src/python_testing/TC_MCORE_FS_1_3.py
index 4245bc1..1a18896 100644
--- a/src/python_testing/TC_MCORE_FS_1_3.py
+++ b/src/python_testing/TC_MCORE_FS_1_3.py
@@ -15,15 +15,31 @@
 #    limitations under the License.
 #
 
-# This test requires a TH_SERVER application that returns UnsupportedAttribute when reading UniqueID from BasicInformation Cluster. Please specify with --string-arg th_server_no_uid_app_path:<path_to_app>
+# This test requires a TH_SERVER_NO_UID application that returns UnsupportedAttribute
+# when reading UniqueID from BasicInformation Cluster. Please specify the app
+# location with --string-arg th_server_no_uid_app_path:<path_to_app>
 
+# See https://github.com/project-chip/connectedhomeip/blob/master/docs/testing/python.md#defining-the-ci-test-arguments
+# for details about the block below.
+#
+# === BEGIN CI TEST ARGUMENTS ===
+# test-runner-runs: run1
+# test-runner-run/run1/app: examples/fabric-admin/scripts/fabric-sync-app.py
+# test-runner-run/run1/app-args: --app-admin=${FABRIC_ADMIN_APP} --app-bridge=${FABRIC_BRIDGE_APP} --stdin-pipe=dut-fsa-stdin --discriminator=1234
+# test-runner-run/run1/factoryreset: True
+# test-runner-run/run1/script-args: --PICS src/app/tests/suites/certification/ci-pics-values --storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --string-arg th_server_no_uid_app_path:${LIGHTING_APP_NO_UNIQUE_ID}
+# test-runner-run/run1/script-start-delay: 5
+# test-runner-run/run1/quiet: false
+# === END CI TEST ARGUMENTS ===
+
+import asyncio
 import logging
 import os
 import random
-import signal
 import subprocess
-import time
-import uuid
+import sys
+import tempfile
+import threading
 
 import chip.clusters as Clusters
 from chip import ChipDeviceCtrl
@@ -32,109 +48,224 @@
 from mobly import asserts
 
 
+# TODO: Make this class more generic. Issue #35348
+class Subprocess(threading.Thread):
+
+    def __init__(self, args: list = [], tag="", **kw):
+        super().__init__(**kw)
+        self.tag = f"[{tag}] " if tag else ""
+        self.args = args
+
+    def forward_f(self, f_in, f_out):
+        while True:
+            line = f_in.readline()
+            if not line:
+                break
+            f_out.write(f"{self.tag}{line}")
+            f_out.flush()
+
+    def run(self):
+        logging.info("RUN: %s", " ".join(self.args))
+        self.p = subprocess.Popen(self.args, errors="ignore", stdin=subprocess.PIPE,
+                                  stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+        # Forward stdout and stderr with a tag attached.
+        t1 = threading.Thread(target=self.forward_f, args=[self.p.stdout, sys.stdout])
+        t1.start()
+        t2 = threading.Thread(target=self.forward_f, args=[self.p.stderr, sys.stderr])
+        t2.start()
+        # Wait for the process to finish.
+        self.p.wait()
+        t1.join()
+        t2.join()
+
+    def stop(self):
+        self.p.terminate()
+        self.join()
+
+
+class AppServer:
+
+    def __init__(self, app, storage_dir, port=None, discriminator=None, passcode=None):
+
+        args = [app]
+        args.extend(["--KVS", tempfile.mkstemp(dir=storage_dir, prefix="kvs-app-")[1]])
+        args.extend(['--secured-device-port', str(port)])
+        args.extend(["--discriminator", str(discriminator)])
+        args.extend(["--passcode", str(passcode)])
+        self.app = Subprocess(args, tag="SERVER")
+        self.app.start()
+
+    def stop(self):
+        self.app.stop()
+
+
 class TC_MCORE_FS_1_3(MatterBaseTest):
-    @async_test_body
-    async def setup_class(self):
+
+    @property
+    def default_timeout(self) -> int:
+        # This test has some manual steps, so we need a longer timeout.
+        return 200
+
+    def setup_class(self):
         super().setup_class()
 
-        self.th_server_nodeid = 1111
-        self.th_server_kvs = None
-        self.th_server_port = 5543
-        self.app_process_for_dut_eco = None
+        self.th_server = None
+        self.storage = None
 
-        # Create a second controller on a new fabric to communicate to the server
-        new_certificate_authority = self.certificate_authority_manager.NewCertificateAuthority()
-        new_fabric_admin = new_certificate_authority.NewFabricAdmin(vendorId=0xFFF1, fabricId=2)
-        paa_path = str(self.matter_test_config.paa_trust_store_path)
-        self.TH_server_controller = new_fabric_admin.NewController(nodeId=112233, paaTrustStorePath=paa_path)
+        # Get the path to the TH_SERVER_NO_UID app from the user params.
+        th_server_app = self.user_params.get("th_server_no_uid_app_path", None)
+        if not th_server_app:
+            asserts.fail("This test requires a TH_SERVER_NO_UID app. Specify app path with --string-arg th_server_no_uid_app_path:<path_to_app>")
+        if not os.path.exists(th_server_app):
+            asserts.fail(f"The path {th_server_app} does not exist")
+
+        # Create a temporary storage directory for keeping KVS files.
+        self.storage = tempfile.TemporaryDirectory(prefix=self.__class__.__name__)
+        logging.info("Temporary storage directory: %s", self.storage.name)
+
+        self.th_server_port = 5544
+        self.th_server_discriminator = random.randint(0, 4095)
+        self.th_server_passcode = 20202021
+
+        # Start the TH_SERVER_NO_UID app.
+        self.th_server = AppServer(
+            th_server_app,
+            storage_dir=self.storage.name,
+            port=self.th_server_port,
+            discriminator=self.th_server_discriminator,
+            passcode=self.th_server_passcode)
 
     def teardown_class(self):
-        if self.app_process_for_dut_eco is not None:
-            logging.warning("Stopping app with SIGTERM")
-            self.app_process_for_dut_eco.send_signal(signal.SIGTERM.value)
-            self.app_process_for_dut_eco.wait()
-
-        os.remove(self.th_server_kvs)
+        if self.th_server is not None:
+            self.th_server.stop()
+        if self.storage is not None:
+            self.storage.cleanup()
         super().teardown_class()
 
-    async def create_device_and_commission_to_th_fabric(self, kvs, port, node_id_for_th, device_info):
-        app = self.user_params.get("th_server_no_uid_app_path", None)
-        if not app:
-            asserts.fail('This test requires a TH_SERVER app. Specify app path with --string-arg th_server_no_uid_app_path:<path_to_app>')
-
-        if not os.path.exists(app):
-            asserts.fail(f'The path {app} does not exist')
-
-        discriminator = random.randint(0, 4095)
-        passcode = 20202021
-
-        cmd = [app]
-        cmd.extend(['--secured-device-port', str(port)])
-        cmd.extend(['--discriminator', str(discriminator)])
-        cmd.extend(['--passcode', str(passcode)])
-        cmd.extend(['--KVS', kvs])
-
-        # TODO: Determine if we want these logs cooked or pushed to somewhere else
-        logging.info(f"Starting TH device for {device_info}")
-        self.app_process_for_dut_eco = subprocess.Popen(cmd)
-        logging.info(f"Started TH device for {device_info}")
-        time.sleep(3)
-
-        logging.info("Commissioning from separate fabric")
-        await self.TH_server_controller.CommissionOnNetwork(nodeId=node_id_for_th, setupPinCode=passcode, filterType=ChipDeviceCtrl.DiscoveryFilterType.LONG_DISCRIMINATOR, filter=discriminator)
-        logging.info("Commissioning device for DUT ecosystem onto TH for managing")
-
     def steps_TC_MCORE_FS_1_3(self) -> list[TestStep]:
-        steps = [TestStep(1, "TH commissions TH_SERVER to TH’s fabric.", is_commissioning=True),
-                 TestStep(2, "DUT_FSA commissions TH_SERVER to DUT_FSA’s fabric and generates a UniqueID.")]
-        return steps
+        return [
+            TestStep(0, "Commission DUT if not done", is_commissioning=True),
+            TestStep(1, "TH commissions TH_SERVER_NO_UID to TH's fabric"),
+            TestStep(2, "DUT_FSA commissions TH_SERVER_NO_UID to DUT_FSA's fabric and generates a UniqueID.",
+                     "TH verifies a value is visible for the UniqueID from the DUT_FSA's Bridged Device Basic Information Cluster."),
+        ]
+
+    async def commission_via_commissioner_control(self, controller_node_id: int, device_node_id: int):
+        """Commission device_node_id to controller_node_id using CommissionerControl cluster."""
+
+        request_id = random.randint(0, 0xFFFFFFFFFFFFFFFF)
+
+        vendor_id = await self.read_single_attribute_check_success(
+            node_id=device_node_id,
+            cluster=Clusters.BasicInformation,
+            attribute=Clusters.BasicInformation.Attributes.VendorID,
+        )
+
+        product_id = await self.read_single_attribute_check_success(
+            node_id=device_node_id,
+            cluster=Clusters.BasicInformation,
+            attribute=Clusters.BasicInformation.Attributes.ProductID,
+        )
+
+        await self.send_single_cmd(
+            node_id=controller_node_id,
+            cmd=Clusters.CommissionerControl.Commands.RequestCommissioningApproval(
+                requestId=request_id,
+                vendorId=vendor_id,
+                productId=product_id,
+            ),
+        )
+
+        if not self.is_ci:
+            self.wait_for_user_input("Approve Commissioning Approval Request on DUT using manufacturer specified mechanism")
+
+        resp = await self.send_single_cmd(
+            node_id=controller_node_id,
+            cmd=Clusters.CommissionerControl.Commands.CommissionNode(
+                requestId=request_id,
+                responseTimeoutSeconds=30,
+            ),
+        )
+
+        asserts.assert_equal(type(resp), Clusters.CommissionerControl.Commands.ReverseOpenCommissioningWindow,
+                             "Incorrect response type")
+
+        await self.send_single_cmd(
+            node_id=device_node_id,
+            cmd=Clusters.AdministratorCommissioning.Commands.OpenCommissioningWindow(
+                commissioningTimeout=3*60,
+                PAKEPasscodeVerifier=resp.PAKEPasscodeVerifier,
+                discriminator=resp.discriminator,
+                iterations=resp.iterations,
+                salt=resp.salt,
+            ),
+            timedRequestTimeoutMs=5000,
+        )
 
     @async_test_body
     async def test_TC_MCORE_FS_1_3(self):
         self.is_ci = self.check_pics('PICS_SDK_CI_ONLY')
-        self.print_step(0, "Commissioning DUT to TH, already done")
+
+        # Commissioning - done
+        self.step(0)
 
         self.step(1)
-        root_node_endpoint = 0
-        root_part_list = await self.read_single_attribute_check_success(cluster=Clusters.Descriptor, attribute=Clusters.Descriptor.Attributes.PartsList, endpoint=root_node_endpoint)
-        set_of_endpoints_before_adding_device = set(root_part_list)
-        logging.info(f"Set of endpoints before adding the device: {set_of_endpoints_before_adding_device}")
 
-        kvs = f'kvs_{str(uuid.uuid4())}'
-        device_info = "for TH ecosystem"
-        await self.create_device_and_commission_to_th_fabric(kvs, self.th_server_port, self.th_server_nodeid, device_info)
+        th_server_th_node_id = 1
+        await self.default_controller.CommissionOnNetwork(
+            nodeId=th_server_th_node_id,
+            setupPinCode=self.th_server_passcode,
+            filterType=ChipDeviceCtrl.DiscoveryFilterType.LONG_DISCRIMINATOR,
+            filter=self.th_server_discriminator,
+        )
 
-        self.th_server_kvs = kvs
-        read_result = await self.TH_server_controller.ReadAttribute(self.th_server_nodeid, [(root_node_endpoint, Clusters.BasicInformation.Attributes.UniqueID)])
-        result = read_result[root_node_endpoint][Clusters.BasicInformation][Clusters.BasicInformation.Attributes.UniqueID]
-        asserts.assert_true(type_matches(result, Clusters.Attribute.ValueDecodeFailure), "We were expecting a value decode failure")
-        asserts.assert_equal(result.Reason.status, Status.UnsupportedAttribute, "Incorrect error returned from reading UniqueID")
+        await self.read_single_attribute_expect_error(
+            cluster=Clusters.BasicInformation,
+            attribute=Clusters.BasicInformation.Attributes.UniqueID,
+            node_id=th_server_th_node_id,
+            error=Status.UnsupportedAttribute,
+        )
 
         self.step(2)
-        params = await self.openCommissioningWindow(dev_ctrl=self.TH_server_controller, node_id=self.th_server_nodeid)
 
-        self.wait_for_user_input(
-            prompt_msg=f"Using the DUT vendor's provided interface, commission the device using the following parameters:\n"
-            f"- discriminator: {params.randomDiscriminator}\n"
-            f"- setupPinCode: {params.commissioningParameters.setupPinCode}\n"
-            f"- setupQRCode: {params.commissioningParameters.setupQRCode}\n"
-            f"- setupManualcode: {params.commissioningParameters.setupManualCode}\n"
-            f"If using FabricSync Admin, you may type:\n"
-            f">>> pairing onnetwork <desired_node_id> {params.commissioningParameters.setupPinCode}")
+        # Get the list of endpoints on the DUT_FSA_BRIDGE before adding the TH_SERVER_NO_UID.
+        dut_fsa_bridge_endpoints = set(await self.read_single_attribute_check_success(
+            cluster=Clusters.Descriptor,
+            attribute=Clusters.Descriptor.Attributes.PartsList,
+            node_id=self.dut_node_id,
+            endpoint=0,
+        ))
 
-        root_part_list = await self.read_single_attribute_check_success(cluster=Clusters.Descriptor, attribute=Clusters.Descriptor.Attributes.PartsList, endpoint=root_node_endpoint)
-        set_of_endpoints_after_adding_device = set(root_part_list)
-        logging.info(f"Set of endpoints after adding the device: {set_of_endpoints_after_adding_device}")
+        await self.commission_via_commissioner_control(
+            controller_node_id=self.dut_node_id,
+            device_node_id=th_server_th_node_id)
 
-        asserts.assert_true(set_of_endpoints_after_adding_device.issuperset(
-            set_of_endpoints_before_adding_device), "Expected only new endpoints to be added")
-        unique_endpoints_set = set_of_endpoints_after_adding_device - set_of_endpoints_before_adding_device
-        asserts.assert_equal(len(unique_endpoints_set), 1, "Expected only one new endpoint")
-        newly_added_endpoint = list(unique_endpoints_set)[0]
+        # Wait for the device to appear on the DUT_FSA_BRIDGE.
+        await asyncio.sleep(2)
 
-        th_sed_dut_unique_id = await self.read_single_attribute_check_success(cluster=Clusters.BridgedDeviceBasicInformation, attribute=Clusters.BridgedDeviceBasicInformation.Attributes.UniqueID, endpoint=newly_added_endpoint)
-        asserts.assert_true(type_matches(th_sed_dut_unique_id, str), "UniqueID should be a string")
-        asserts.assert_true(th_sed_dut_unique_id, "UniqueID should not be an empty string")
+        # Get the list of endpoints on the DUT_FSA_BRIDGE after adding the TH_SERVER_NO_UID.
+        dut_fsa_bridge_endpoints_new = set(await self.read_single_attribute_check_success(
+            cluster=Clusters.Descriptor,
+            attribute=Clusters.Descriptor.Attributes.PartsList,
+            node_id=self.dut_node_id,
+            endpoint=0,
+        ))
+
+        # Get the endpoint number of the newly added TH_SERVER_NO_UID.
+        logging.info("Endpoints on DUT_FSA_BRIDGE: old=%s, new=%s", dut_fsa_bridge_endpoints, dut_fsa_bridge_endpoints_new)
+        asserts.assert_true(dut_fsa_bridge_endpoints_new.issuperset(dut_fsa_bridge_endpoints),
+                            "Expected only new endpoints to be added")
+        unique_endpoints_set = dut_fsa_bridge_endpoints_new - dut_fsa_bridge_endpoints
+        asserts.assert_equal(len(unique_endpoints_set), 1, "Expected only one new endpoint on DUT_FSA")
+        dut_fsa_bridge_th_server_endpoint = list(unique_endpoints_set)[0]
+
+        dut_fsa_bridge_th_server_unique_id = await self.read_single_attribute_check_success(
+            cluster=Clusters.BridgedDeviceBasicInformation,
+            attribute=Clusters.BridgedDeviceBasicInformation.Attributes.UniqueID,
+            endpoint=dut_fsa_bridge_th_server_endpoint)
+        asserts.assert_true(type_matches(dut_fsa_bridge_th_server_unique_id, str), "UniqueID should be a string")
+        asserts.assert_true(dut_fsa_bridge_th_server_unique_id, "UniqueID should not be an empty string")
+        logging.info("UniqueID generated for TH_SERVER_NO_UID: %s", dut_fsa_bridge_th_server_unique_id)
 
 
 if __name__ == "__main__":
diff --git a/src/python_testing/TC_MCORE_FS_1_4.py b/src/python_testing/TC_MCORE_FS_1_4.py
new file mode 100644
index 0000000..8e05c2d
--- /dev/null
+++ b/src/python_testing/TC_MCORE_FS_1_4.py
@@ -0,0 +1,442 @@
+#
+#    Copyright (c) 2024 Project CHIP Authors
+#    All rights reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License");
+#    you may not use this file except in compliance with the License.
+#    You may obtain a copy of the License at
+#
+#        http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS,
+#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#    See the License for the specific language governing permissions and
+#    limitations under the License.
+#
+
+# This test requires a TH_SERVER_NO_UID application that returns UnsupportedAttribute
+# when reading UniqueID from BasicInformation Cluster. Please specify the app
+# location with --string-arg th_server_no_uid_app_path:<path_to_app>
+
+# See https://github.com/project-chip/connectedhomeip/blob/master/docs/testing/python.md#defining-the-ci-test-arguments
+# for details about the block below.
+#
+# === BEGIN CI TEST ARGUMENTS ===
+# test-runner-runs: run1
+# test-runner-run/run1/app: examples/fabric-admin/scripts/fabric-sync-app.py
+# test-runner-run/run1/app-args: --app-admin=${FABRIC_ADMIN_APP} --app-bridge=${FABRIC_BRIDGE_APP} --stdin-pipe=dut-fsa-stdin --discriminator=1234
+# test-runner-run/run1/factoryreset: True
+# test-runner-run/run1/script-args: --PICS src/app/tests/suites/certification/ci-pics-values --storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --string-arg th_fsa_app_path:examples/fabric-admin/scripts/fabric-sync-app.py th_fsa_admin_path:${FABRIC_ADMIN_APP} th_fsa_bridge_path:${FABRIC_BRIDGE_APP} th_server_no_uid_app_path:${LIGHTING_APP_NO_UNIQUE_ID} dut_fsa_stdin_pipe:dut-fsa-stdin
+# test-runner-run/run1/script-start-delay: 5
+# test-runner-run/run1/quiet: false
+# === END CI TEST ARGUMENTS ===
+
+import asyncio
+import logging
+import os
+import random
+import subprocess
+import sys
+import tempfile
+import threading
+
+import chip.clusters as Clusters
+from chip import ChipDeviceCtrl
+from chip.interaction_model import Status
+from matter_testing_support import MatterBaseTest, TestStep, async_test_body, default_matter_test_main, type_matches
+from mobly import asserts
+
+
+# TODO: Make this class more generic. Issue #35348
+class Subprocess(threading.Thread):
+
+    def __init__(self, args: list = [], stdout_cb=None, tag="", **kw):
+        super().__init__(**kw)
+        self.tag = f"[{tag}] " if tag else ""
+        self.stdout_cb = stdout_cb
+        self.args = args
+
+    def forward_f(self, f_in, f_out):
+        while True:
+            line = f_in.readline()
+            if not line:
+                break
+            f_out.write(f"{self.tag}{line}")
+            f_out.flush()
+            if self.stdout_cb is not None:
+                self.stdout_cb(line)
+
+    def run(self):
+        logging.info("RUN: %s", " ".join(self.args))
+        self.p = subprocess.Popen(self.args, errors="ignore", stdin=subprocess.PIPE,
+                                  stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+        # Forward stdout and stderr with a tag attached.
+        forwarding_stdout_thread = threading.Thread(target=self.forward_f, args=[self.p.stdout, sys.stdout])
+        forwarding_stdout_thread.start()
+        forwarding_stderr_thread = threading.Thread(target=self.forward_f, args=[self.p.stderr, sys.stderr])
+        forwarding_stderr_thread.start()
+        # Wait for the process to finish.
+        self.p.wait()
+        forwarding_stdout_thread.join()
+        forwarding_stderr_thread.join()
+
+    def stop(self):
+        self.p.terminate()
+        self.join()
+
+
+class FabricSyncApp:
+
+    def _process_admin_output(self, line):
+        if self.wait_for_text_text is not None and self.wait_for_text_text in line:
+            self.wait_for_text_event.set()
+
+    def wait_for_text(self, timeout=30):
+        if not self.wait_for_text_event.wait(timeout=timeout):
+            raise Exception(f"Timeout waiting for text: {self.wait_for_text_text}")
+        self.wait_for_text_event.clear()
+        self.wait_for_text_text = None
+
+    def __init__(self, fabric_sync_app_path, fabric_admin_app_path, fabric_bridge_app_path,
+                 storage_dir, fabric_name=None, node_id=None, vendor_id=None,
+                 paa_trust_store_path=None, bridge_port=None, bridge_discriminator=None,
+                 bridge_passcode=None):
+
+        self.wait_for_text_event = threading.Event()
+        self.wait_for_text_text = None
+
+        args = [fabric_sync_app_path]
+        args.append(f"--app-admin={fabric_admin_app_path}")
+        args.append(f"--app-bridge={fabric_bridge_app_path}")
+        # Override default ports, so it will be possible to run
+        # our TH_FSA alongside the DUT_FSA during CI testing.
+        args.append("--app-admin-rpc-port=44000")
+        args.append("--app-bridge-rpc-port=44001")
+        # Keep the storage directory in a temporary location.
+        args.append(f"--storage-dir={storage_dir}")
+        if paa_trust_store_path is not None:
+            args.append(f"--paa-trust-store-path={paa_trust_store_path}")
+        if fabric_name is not None:
+            args.append(f"--commissioner-name={fabric_name}")
+        if node_id is not None:
+            args.append(f"--commissioner-node-id={node_id}")
+        args.append(f"--commissioner-vendor-id={vendor_id}")
+        args.append(f"--secured-device-port={bridge_port}")
+        args.append(f"--discriminator={bridge_discriminator}")
+        args.append(f"--passcode={bridge_passcode}")
+
+        self.fabric_sync_app = Subprocess(args, stdout_cb=self._process_admin_output)
+        self.wait_for_text_text = "Successfully opened pairing window on the device"
+        self.fabric_sync_app.start()
+
+        # Wait for the fabric-sync-app to be ready.
+        self.wait_for_text()
+
+    def commission_on_network(self, node_id, setup_pin_code=None, filter_type=None, filter=None):
+        self.wait_for_text_text = f"Commissioning complete for node ID {node_id:#018x}: success"
+        # Send the commissioning command to the admin.
+        self.fabric_sync_app.p.stdin.write(f"pairing onnetwork {node_id} {setup_pin_code}\n")
+        self.fabric_sync_app.p.stdin.flush()
+        # Wait for success message.
+        self.wait_for_text()
+
+    def stop(self):
+        self.fabric_sync_app.stop()
+
+
+class AppServer:
+
+    def __init__(self, app, storage_dir, port=None, discriminator=None, passcode=None):
+
+        args = [app]
+        args.extend(["--KVS", tempfile.mkstemp(dir=storage_dir, prefix="kvs-app-")[1]])
+        args.extend(['--secured-device-port', str(port)])
+        args.extend(["--discriminator", str(discriminator)])
+        args.extend(["--passcode", str(passcode)])
+        self.app = Subprocess(args, tag="SERVER")
+        self.app.start()
+
+    def stop(self):
+        self.app.stop()
+
+
+class TC_MCORE_FS_1_4(MatterBaseTest):
+
+    @property
+    def default_timeout(self) -> int:
+        # This test has some manual steps, so we need a longer timeout.
+        return 200
+
+    def setup_class(self):
+        super().setup_class()
+
+        self.th_fsa_controller = None
+        self.th_server = None
+        self.storage = None
+
+        # Get the path to the TH_FSA (fabric-admin and fabric-bridge) app from the user params.
+        th_fsa_app_path = self.user_params.get("th_fsa_app_path")
+        if not th_fsa_app_path:
+            asserts.fail("This test requires a TH_FSA app. Specify app path with --string-arg th_fsa_app_path:<path_to_app>")
+        if not os.path.exists(th_fsa_app_path):
+            asserts.fail(f"The path {th_fsa_app_path} does not exist")
+        th_fsa_admin_path = self.user_params.get("th_fsa_admin_path")
+        if not th_fsa_admin_path:
+            asserts.fail("This test requires a TH_FSA_ADMIN app. Specify app path with --string-arg th_fsa_admin_path:<path_to_app>")
+        if not os.path.exists(th_fsa_admin_path):
+            asserts.fail(f"The path {th_fsa_admin_path} does not exist")
+        th_fsa_bridge_path = self.user_params.get("th_fsa_bridge_path")
+        if not th_fsa_bridge_path:
+            asserts.fail("This test requires a TH_FSA_BRIDGE app. Specify app path with --string-arg th_fsa_bridge_path:<path_to_app>")
+        if not os.path.exists(th_fsa_bridge_path):
+            asserts.fail(f"The path {th_fsa_bridge_path} does not exist")
+
+        # Get the path to the TH_SERVER_NO_UID app from the user params.
+        th_server_app = self.user_params.get("th_server_no_uid_app_path", None)
+        if not th_server_app:
+            asserts.fail("This test requires a TH_SERVER_NO_UID app. Specify app path with --string-arg th_server_no_uid_app_path:<path_to_app>")
+        if not os.path.exists(th_server_app):
+            asserts.fail(f"The path {th_server_app} does not exist")
+
+        # Create a temporary storage directory for keeping KVS files.
+        self.storage = tempfile.TemporaryDirectory(prefix=self.__class__.__name__)
+        logging.info("Temporary storage directory: %s", self.storage.name)
+
+        self.th_fsa_bridge_address = "::1"
+        self.th_fsa_bridge_port = 5543
+        # Random discriminator between 0 and MAX - 1. The range is reduced by
+        # one to leave room for the TH_SERVER_NO_UID discriminator.
+        self.th_fsa_bridge_discriminator = random.randint(0, 4094)
+        self.th_fsa_bridge_passcode = 20202021
+
+        self.th_fsa_controller = FabricSyncApp(
+            th_fsa_app_path,
+            th_fsa_admin_path,
+            th_fsa_bridge_path,
+            storage_dir=self.storage.name,
+            paa_trust_store_path=self.matter_test_config.paa_trust_store_path,
+            bridge_port=self.th_fsa_bridge_port,
+            bridge_discriminator=self.th_fsa_bridge_discriminator,
+            bridge_passcode=self.th_fsa_bridge_passcode,
+            vendor_id=0xFFF1)
+
+        # Get the named pipe path for the DUT_FSA app input from the user params.
+        dut_fsa_stdin_pipe = self.user_params.get("dut_fsa_stdin_pipe", None)
+        if dut_fsa_stdin_pipe is not None:
+            self.dut_fsa_stdin = open(dut_fsa_stdin_pipe, "w")
+
+        self.th_server_port = 5544
+        self.th_server_discriminator = self.th_fsa_bridge_discriminator + 1
+        self.th_server_passcode = 20202021
+
+        # Start the TH_SERVER_NO_UID app.
+        self.th_server = AppServer(
+            th_server_app,
+            storage_dir=self.storage.name,
+            port=self.th_server_port,
+            discriminator=self.th_server_discriminator,
+            passcode=self.th_server_passcode)
+
+    def teardown_class(self):
+        if self.th_fsa_controller is not None:
+            self.th_fsa_controller.stop()
+        if self.th_server is not None:
+            self.th_server.stop()
+        if self.storage is not None:
+            self.storage.cleanup()
+        super().teardown_class()
+
+    def steps_TC_MCORE_FS_1_4(self) -> list[TestStep]:
+        return [
+            TestStep(0, "Commission DUT if not done", is_commissioning=True),
+            TestStep(1, "TH commissions TH_SERVER_NO_UID to TH's fabric.",
+                     "TH verifies that the TH_SERVER_NO_UID does not provide a UniqueID."),
+            TestStep(2, "TH instructs TH_FSA to commission TH_SERVER_NO_UID to TH_FSA's fabric."),
+            TestStep(3, "TH instructs TH_FSA to open up a commissioning window on its aggregator."),
+            TestStep(4, "Follow manufacturer provided instructions to have DUT_FSA commission TH_FSA's aggregator."),
+            TestStep(5, "Follow manufacturer provided instructions to enable DUT_FSA to synchronize TH_SERVER_NO_UID"
+                     " from TH_FSA onto DUT_FSA's fabric. TH to provide endpoint saved from step 2 in user prompt."),
+            TestStep(6, "DUT_FSA synchronizes TH_SERVER_NO_UID onto DUT_FSA's fabric and copies the UniqueID presented"
+                     " by TH_FSA's Bridged Device Basic Information Cluster."),
+        ]
+
+    @async_test_body
+    async def test_TC_MCORE_FS_1_4(self):
+        self.is_ci = self.check_pics('PICS_SDK_CI_ONLY')
+
+        # Commissioning - done
+        self.step(0)
+
+        self.step(1)
+
+        th_server_th_node_id = 1
+        await self.default_controller.CommissionOnNetwork(
+            nodeId=th_server_th_node_id,
+            setupPinCode=self.th_server_passcode,
+            filterType=ChipDeviceCtrl.DiscoveryFilterType.LONG_DISCRIMINATOR,
+            filter=self.th_server_discriminator,
+        )
+
+        await self.read_single_attribute_expect_error(
+            cluster=Clusters.BasicInformation,
+            attribute=Clusters.BasicInformation.Attributes.UniqueID,
+            node_id=th_server_th_node_id,
+            error=Status.UnsupportedAttribute,
+        )
+
+        self.step(2)
+
+        th_fsa_bridge_th_node_id = 2
+        # Commissioning TH_FSA_BRIDGE to TH fabric.
+        await self.default_controller.CommissionOnNetwork(
+            nodeId=th_fsa_bridge_th_node_id,
+            setupPinCode=self.th_fsa_bridge_passcode,
+            filterType=ChipDeviceCtrl.DiscoveryFilterType.LONG_DISCRIMINATOR,
+            filter=self.th_fsa_bridge_discriminator,
+        )
+
+        # Get the list of endpoints on the TH_FSA_BRIDGE before adding the TH_SERVER_NO_UID.
+        th_fsa_bridge_endpoints = set(await self.read_single_attribute_check_success(
+            cluster=Clusters.Descriptor,
+            attribute=Clusters.Descriptor.Attributes.PartsList,
+            node_id=th_fsa_bridge_th_node_id,
+            endpoint=0,
+        ))
+
+        discriminator = random.randint(0, 4095)
+        # Open commissioning window on TH_SERVER_NO_UID.
+        params = await self.default_controller.OpenCommissioningWindow(
+            nodeid=th_server_th_node_id,
+            option=self.default_controller.CommissioningWindowPasscode.kTokenWithRandomPin,
+            discriminator=discriminator,
+            iteration=10000,
+            timeout=600)
+
+        th_server_th_fsa_node_id = 3
+        # Commissioning TH_SERVER_NO_UID to TH_FSA.
+        self.th_fsa_controller.commission_on_network(
+            node_id=th_server_th_fsa_node_id,
+            setup_pin_code=params.setupPinCode,
+            filter_type=ChipDeviceCtrl.DiscoveryFilterType.LONG_DISCRIMINATOR,
+            filter=discriminator,
+        )
+
+        # Wait a moment so that the dynamic endpoint appears on the TH_FSA_BRIDGE.
+        await asyncio.sleep(5)
+
+        # Get the list of endpoints on the TH_FSA_BRIDGE after adding the TH_SERVER_NO_UID.
+        th_fsa_bridge_endpoints_new = set(await self.read_single_attribute_check_success(
+            cluster=Clusters.Descriptor,
+            attribute=Clusters.Descriptor.Attributes.PartsList,
+            node_id=th_fsa_bridge_th_node_id,
+            endpoint=0,
+        ))
+
+        # Get the endpoint number of the newly added TH_SERVER_NO_UID.
+        logging.info("Endpoints on TH_FSA_BRIDGE: old=%s, new=%s", th_fsa_bridge_endpoints, th_fsa_bridge_endpoints_new)
+        asserts.assert_true(th_fsa_bridge_endpoints_new.issuperset(th_fsa_bridge_endpoints),
+                            "Expected only new endpoints to be added")
+        unique_endpoints_set = th_fsa_bridge_endpoints_new - th_fsa_bridge_endpoints
+        asserts.assert_equal(len(unique_endpoints_set), 1, "Expected only one new endpoint")
+        th_fsa_bridge_th_server_endpoint = list(unique_endpoints_set)[0]
+
+        # Verify that TH_FSA created a UniqueID for TH_SERVER_NO_UID.
+        th_fsa_bridge_th_server_unique_id = await self.read_single_attribute_check_success(
+            cluster=Clusters.BridgedDeviceBasicInformation,
+            attribute=Clusters.BridgedDeviceBasicInformation.Attributes.UniqueID,
+            node_id=th_fsa_bridge_th_node_id,
+            endpoint=th_fsa_bridge_th_server_endpoint)
+        asserts.assert_true(type_matches(th_fsa_bridge_th_server_unique_id, str), "UniqueID should be a string")
+        asserts.assert_true(th_fsa_bridge_th_server_unique_id, "UniqueID should not be an empty string")
+        logging.info("UniqueID generated for TH_SERVER_NO_UID: %s", th_fsa_bridge_th_server_unique_id)
+
+        self.step(3)
+
+        discriminator = random.randint(0, 4095)
+        # Open commissioning window on TH_FSA_BRIDGE.
+        params = await self.default_controller.OpenCommissioningWindow(
+            nodeid=th_fsa_bridge_th_node_id,
+            option=self.default_controller.CommissioningWindowPasscode.kTokenWithRandomPin,
+            discriminator=discriminator,
+            iteration=10000,
+            timeout=600)
+
+        self.step(4)
+
+        # Commissioning TH_FSA_BRIDGE to DUT_FSA fabric.
+        if not self.is_ci:
+            self.wait_for_user_input(
+                f"Commission TH_FSA's aggregator on DUT using manufacturer specified mechanism.\n"
+                f"Use the following parameters:\n"
+                f"- discriminator: {discriminator}\n"
+                f"- setupPinCode: {params.setupPinCode}\n"
+                f"- setupQRCode: {params.setupQRCode}\n"
+                f"- setupManualCode: {params.setupManualCode}\n"
+                f"If using FabricSync Admin, you may type:\n"
+                f">>> fabricsync add-bridge <desired_node_id> {params.setupPinCode} <th_host_ip> {self.th_fsa_bridge_port}")
+        else:
+            self.dut_fsa_stdin.write(
+                f"fabricsync add-bridge 10 {params.setupPinCode} {self.th_fsa_bridge_address} {self.th_fsa_bridge_port}\n")
+            self.dut_fsa_stdin.flush()
+            # Wait for the commissioning to complete.
+            await asyncio.sleep(5)
+
+        self.step(5)
+
+        # Get the list of endpoints on the DUT_FSA_BRIDGE before synchronization.
+        dut_fsa_bridge_endpoints = set(await self.read_single_attribute_check_success(
+            cluster=Clusters.Descriptor,
+            attribute=Clusters.Descriptor.Attributes.PartsList,
+            node_id=self.dut_node_id,
+            endpoint=0,
+        ))
+
+        # Synchronize TH_SERVER_NO_UID from TH_FSA to DUT_FSA fabric.
+        if not self.is_ci:
+            self.wait_for_user_input(
+                f"Synchronize endpoint from TH_FSA's aggregator to DUT using manufacturer specified mechanism.\n"
+                f"Use the following parameters:\n"
+                f"- endpointID: {th_fsa_bridge_th_server_endpoint}\n"
+                f"If using FabricSync Admin, you may type:\n"
+                f">>> fabricsync sync-device {th_fsa_bridge_th_server_endpoint}")
+        else:
+            self.dut_fsa_stdin.write(f"fabricsync sync-device {th_fsa_bridge_th_server_endpoint}\n")
+            self.dut_fsa_stdin.flush()
+            # Wait for the synchronization to complete.
+            await asyncio.sleep(5)
+
+        self.step(6)
+
+        # Get the list of endpoints on the DUT_FSA_BRIDGE after synchronization.
+        dut_fsa_bridge_endpoints_new = set(await self.read_single_attribute_check_success(
+            cluster=Clusters.Descriptor,
+            attribute=Clusters.Descriptor.Attributes.PartsList,
+            node_id=self.dut_node_id,
+            endpoint=0,
+        ))
+
+        # Get the endpoint number of the newly synced TH_SERVER_NO_UID.
+        logging.info("Endpoints on DUT_FSA_BRIDGE: old=%s, new=%s", dut_fsa_bridge_endpoints, dut_fsa_bridge_endpoints_new)
+        asserts.assert_true(dut_fsa_bridge_endpoints_new.issuperset(dut_fsa_bridge_endpoints),
+                            "Expected only new endpoints to be added")
+        unique_endpoints_set = dut_fsa_bridge_endpoints_new - dut_fsa_bridge_endpoints
+        asserts.assert_equal(len(unique_endpoints_set), 1, "Expected only one new endpoint on DUT_FSA")
+        dut_fsa_bridge_th_server_endpoint = list(unique_endpoints_set)[0]
+
+        # Verify that DUT_FSA copied the TH_SERVER_NO_UID UniqueID from TH_FSA.
+        dut_fsa_bridge_th_server_unique_id = await self.read_single_attribute_check_success(
+            cluster=Clusters.BridgedDeviceBasicInformation,
+            attribute=Clusters.BridgedDeviceBasicInformation.Attributes.UniqueID,
+            endpoint=dut_fsa_bridge_th_server_endpoint)
+        asserts.assert_true(type_matches(dut_fsa_bridge_th_server_unique_id, str), "UniqueID should be a string")
+        asserts.assert_true(dut_fsa_bridge_th_server_unique_id, "UniqueID should not be an empty string")
+        logging.info("UniqueID for TH_SERVER_NO_UID on DUT_FSA: %s", th_fsa_bridge_th_server_unique_id)
+
+        # Make sure that the UniqueID on the DUT_FSA_BRIDGE is the same as the one on the TH_FSA_BRIDGE.
+        asserts.assert_equal(dut_fsa_bridge_th_server_unique_id, th_fsa_bridge_th_server_unique_id,
+                             "UniqueID on DUT_FSA and TH_FSA should be the same")
+
+
+if __name__ == "__main__":
+    default_matter_test_main()
diff --git a/src/python_testing/execute_python_tests.py b/src/python_testing/execute_python_tests.py
index 8249be9..1f6afa9 100644
--- a/src/python_testing/execute_python_tests.py
+++ b/src/python_testing/execute_python_tests.py
@@ -75,6 +75,7 @@
         "TC_MCORE_FS_1_1.py",
         "TC_MCORE_FS_1_2.py",
         "TC_MCORE_FS_1_3.py",
+        "TC_MCORE_FS_1_4.py",
         "TC_MCORE_FS_1_5.py",
         "TC_OCC_3_1.py",
         "TC_OCC_3_2.py",