Add DeviceSubscriptionManager to manage subscription of fabric-admin (#35305)

---------

Co-authored-by: Restyled.io <commits@restyled.io>
Co-authored-by: Andrei Litvin <andy314@gmail.com>
Co-authored-by: saurabhst <s.kumar9@samsung.com>
diff --git a/examples/fabric-admin/BUILD.gn b/examples/fabric-admin/BUILD.gn
index e408cdd..805fda1 100644
--- a/examples/fabric-admin/BUILD.gn
+++ b/examples/fabric-admin/BUILD.gn
@@ -84,6 +84,8 @@
     "device_manager/DeviceManager.h",
     "device_manager/DeviceSubscription.cpp",
     "device_manager/DeviceSubscription.h",
+    "device_manager/DeviceSubscriptionManager.cpp",
+    "device_manager/DeviceSubscriptionManager.h",
     "device_manager/DeviceSynchronization.cpp",
     "device_manager/DeviceSynchronization.h",
   ]
diff --git a/examples/fabric-admin/commands/pairing/PairingCommand.cpp b/examples/fabric-admin/commands/pairing/PairingCommand.cpp
index 89887ec..c9b58cd 100644
--- a/examples/fabric-admin/commands/pairing/PairingCommand.cpp
+++ b/examples/fabric-admin/commands/pairing/PairingCommand.cpp
@@ -423,7 +423,9 @@
     {
         // print to console
         fprintf(stderr, "New device with Node ID: 0x%lx has been successfully added.\n", nodeId);
-        DeviceSynchronizer::Instance().StartDeviceSynchronization(CurrentCommissioner(), mNodeId, mDeviceIsICD);
+        // CurrentCommissioner() has a lifetime that is the entire life of the application itself
+        // so it is safe to provide to StartDeviceSynchronization.
+        DeviceSynchronizer::Instance().StartDeviceSynchronization(&CurrentCommissioner(), mNodeId, mDeviceIsICD);
     }
     else
     {
@@ -564,6 +566,8 @@
         fprintf(stderr, "Device with Node ID: 0x%lx has been successfully removed.\n", nodeId);
 
 #if defined(PW_RPC_ENABLED)
+        chip::app::InteractionModelEngine::GetInstance()->ShutdownSubscriptions(command->CurrentCommissioner().GetFabricIndex(),
+                                                                                nodeId);
         RemoveSynchronizedDevice(nodeId);
 #endif
     }
diff --git a/examples/fabric-admin/device_manager/DeviceSubscription.cpp b/examples/fabric-admin/device_manager/DeviceSubscription.cpp
index 73f9098..eabbdda 100644
--- a/examples/fabric-admin/device_manager/DeviceSubscription.cpp
+++ b/examples/fabric-admin/device_manager/DeviceSubscription.cpp
@@ -100,7 +100,7 @@
 #if defined(PW_RPC_ENABLED)
         AdminCommissioningAttributeChanged(mCurrentAdministratorCommissioningAttributes);
 #else
-        ChipLogError(NotSpecified, "Cannot synchronize device with fabric bridge: RPC not enabled");
+        ChipLogError(NotSpecified, "Cannot forward Administrator Commissioning Attribute to fabric bridge: RPC not enabled");
 #endif
         mChangeDetected = false;
     }
@@ -108,7 +108,10 @@
 
 void DeviceSubscription::OnDone(ReadClient * apReadClient)
 {
-    // TODO(#35077) In follow up PR we will indicate to a manager DeviceSubscription is terminal.
+    // After calling mOnDoneCallback we are indicating that `this` is deleted and we shouldn't do anything else with
+    // DeviceSubscription.
+    MoveToState(State::AwaitingDestruction);
+    mOnDoneCallback(mCurrentAdministratorCommissioningAttributes.node_id);
 }
 
 void DeviceSubscription::OnError(CHIP_ERROR error)
@@ -118,6 +121,15 @@
 
 void DeviceSubscription::OnDeviceConnected(Messaging::ExchangeManager & exchangeMgr, const SessionHandle & sessionHandle)
 {
+    if (mState == State::Stopping)
+    {
+        // After calling mOnDoneCallback we are indicating that `this` is deleted and we shouldn't do anything else with
+        // DeviceSubscription.
+        MoveToState(State::AwaitingDestruction);
+        mOnDoneCallback(mCurrentAdministratorCommissioningAttributes.node_id);
+        return;
+    }
+    VerifyOrDie(mState == State::Connecting);
     mClient = std::make_unique<ReadClient>(app::InteractionModelEngine::GetInstance(), &exchangeMgr /* echangeMgr */,
                                            *this /* callback */, ReadClient::InteractionType::Subscribe);
     VerifyOrDie(mClient);
@@ -136,25 +148,95 @@
     if (err != CHIP_NO_ERROR)
     {
         ChipLogError(NotSpecified, "Failed to issue subscription to AdministratorCommissioning data");
-        // TODO(#35077) In follow up PR we will indicate to a manager DeviceSubscription is terminal.
+        // After calling mOnDoneCallback we are indicating that `this` is deleted and we shouldn't do anything else with
+        // DeviceSubscription.
+        MoveToState(State::AwaitingDestruction);
+        mOnDoneCallback(mCurrentAdministratorCommissioningAttributes.node_id);
+        return;
     }
+    MoveToState(State::SubscriptionStarted);
+}
+
+void DeviceSubscription::MoveToState(const State aTargetState)
+{
+    mState = aTargetState;
+    ChipLogDetail(NotSpecified, "DeviceSubscription moving to [%10.10s]", GetStateStr());
+}
+
+const char * DeviceSubscription::GetStateStr() const
+{
+    switch (mState)
+    {
+    case State::Idle:
+        return "Idle";
+
+    case State::Connecting:
+        return "Connecting";
+
+    case State::Stopping:
+        return "Stopping";
+
+    case State::SubscriptionStarted:
+        return "SubscriptionStarted";
+
+    case State::AwaitingDestruction:
+        return "AwaitingDestruction";
+    }
+    return "N/A";
 }
 
 void DeviceSubscription::OnDeviceConnectionFailure(const ScopedNodeId & peerId, CHIP_ERROR error)
 {
-    ChipLogError(NotSpecified, "Device Sync failed to connect to " ChipLogFormatX64, ChipLogValueX64(peerId.GetNodeId()));
-    // TODO(#35077) In follow up PR we will indicate to a manager DeviceSubscription is terminal.
+    VerifyOrDie(mState == State::Connecting || mState == State::Stopping);
+    ChipLogError(NotSpecified, "DeviceSubscription failed to connect to " ChipLogFormatX64, ChipLogValueX64(peerId.GetNodeId()));
+    // TODO(#35333) Figure out how we should recover if we fail to connect and mState == State::Connecting.
+
+    // After calling mOnDoneCallback we are indicating that `this` is deleted and we shouldn't do anything else with
+    // DeviceSubscription.
+    MoveToState(State::AwaitingDestruction);
+    mOnDoneCallback(mCurrentAdministratorCommissioningAttributes.node_id);
 }
 
-void DeviceSubscription::StartSubscription(Controller::DeviceController & controller, NodeId nodeId)
+CHIP_ERROR DeviceSubscription::StartSubscription(OnDoneCallback onDoneCallback, Controller::DeviceController & controller,
+                                                 NodeId nodeId)
 {
-    VerifyOrDie(!mSubscriptionStarted);
+    assertChipStackLockedByCurrentThread();
+    VerifyOrDie(mState == State::Idle);
 
     mCurrentAdministratorCommissioningAttributes         = chip_rpc_AdministratorCommissioningChanged_init_default;
     mCurrentAdministratorCommissioningAttributes.node_id = nodeId;
     mCurrentAdministratorCommissioningAttributes.window_status =
         static_cast<uint32_t>(Clusters::AdministratorCommissioning::CommissioningWindowStatusEnum::kWindowNotOpen);
-    mSubscriptionStarted = true;
+    mState          = State::Connecting;
+    mOnDoneCallback = onDoneCallback;
 
-    controller.GetConnectedDevice(nodeId, &mOnDeviceConnectedCallback, &mOnDeviceConnectionFailureCallback);
+    return controller.GetConnectedDevice(nodeId, &mOnDeviceConnectedCallback, &mOnDeviceConnectionFailureCallback);
+}
+
+void DeviceSubscription::StopSubscription()
+{
+    assertChipStackLockedByCurrentThread();
+    VerifyOrDie(mState != State::Idle);
+    // Something is seriously wrong if we die on the line below
+    VerifyOrDie(mState != State::AwaitingDestruction);
+
+    if (mState == State::Stopping)
+    {
+        // Stop is called again while we are still waiting on connected callbacks
+        return;
+    }
+
+    if (mState == State::Connecting)
+    {
+        MoveToState(State::Stopping);
+        return;
+    }
+
+    // By calling reset on our ReadClient we terminate the subscription.
+    VerifyOrDie(mClient);
+    mClient.reset();
+    // After calling mOnDoneCallback we are indicating that `this` is deleted and we shouldn't do anything else with
+    // DeviceSubscription.
+    MoveToState(State::AwaitingDestruction);
+    mOnDoneCallback(mCurrentAdministratorCommissioningAttributes.node_id);
 }
diff --git a/examples/fabric-admin/device_manager/DeviceSubscription.h b/examples/fabric-admin/device_manager/DeviceSubscription.h
index 7a9e504..353eb30 100644
--- a/examples/fabric-admin/device_manager/DeviceSubscription.h
+++ b/examples/fabric-admin/device_manager/DeviceSubscription.h
@@ -26,6 +26,8 @@
 #include "fabric_bridge_service/fabric_bridge_service.pb.h"
 #include "fabric_bridge_service/fabric_bridge_service.rpc.pb.h"
 
+class DeviceSubscriptionManager;
+
 /// Attribute subscription to attributes that are important to keep track and send to fabric-bridge
 /// via RPC when change has been identified.
 ///
@@ -35,11 +37,18 @@
 class DeviceSubscription : public chip::app::ReadClient::Callback
 {
 public:
+    using OnDoneCallback = std::function<void(chip::NodeId)>;
+
     DeviceSubscription();
 
-    /// Usually called after we have added a synchronized device to fabric-bridge to monitor
-    /// for any changes that need to be propgated to fabric-bridge.
-    void StartSubscription(chip::Controller::DeviceController & controller, chip::NodeId nodeId);
+    CHIP_ERROR StartSubscription(OnDoneCallback onDoneCallback, chip::Controller::DeviceController & controller,
+                                 chip::NodeId nodeId);
+
+    /// This will trigger stopping the subscription. Once subscription is stopped the OnDoneCallback
+    /// provided in StartSubscription will be called to indicate that subscription have been terminated.
+    ///
+    /// Must only be called after StartSubscription was successfully called.
+    void StopSubscription();
 
     ///////////////////////////////////////////////////////////////
     // ReadClient::Callback implementation
@@ -57,6 +66,19 @@
     void OnDeviceConnectionFailure(const chip::ScopedNodeId & peerId, CHIP_ERROR error);
 
 private:
+    enum class State : uint8_t
+    {
+        Idle,                ///< Default state that the object starts out in, where no work has commenced
+        Connecting,          ///< We are waiting for OnDeviceConnected or OnDeviceConnectionFailure callbacks to be called
+        Stopping,            ///< We are waiting for OnDeviceConnected or OnDeviceConnectionFailure callbacks so we can terminate
+        SubscriptionStarted, ///< We have started a subscription.
+        AwaitingDestruction, ///< The object has completed its work and is awaiting destruction.
+    };
+
+    void MoveToState(const State aTargetState);
+    const char * GetStateStr() const;
+
+    OnDoneCallback mOnDoneCallback;
     std::unique_ptr<chip::app::ReadClient> mClient;
 
     chip::Callback::Callback<chip::OnDeviceConnected> mOnDeviceConnectedCallback;
@@ -64,7 +86,5 @@
 
     chip_rpc_AdministratorCommissioningChanged mCurrentAdministratorCommissioningAttributes;
     bool mChangeDetected = false;
-    // Ensures that DeviceSubscription starts a subscription only once.  If instance of
-    // DeviceSubscription  can be reused, the class documentation should be updated accordingly.
-    bool mSubscriptionStarted = false;
+    State mState         = State::Idle;
 };
diff --git a/examples/fabric-admin/device_manager/DeviceSubscriptionManager.cpp b/examples/fabric-admin/device_manager/DeviceSubscriptionManager.cpp
new file mode 100644
index 0000000..c59fd98
--- /dev/null
+++ b/examples/fabric-admin/device_manager/DeviceSubscriptionManager.cpp
@@ -0,0 +1,77 @@
+/*
+ *   Copyright (c) 2024 Project CHIP Authors
+ *   All rights reserved.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ */
+
+#include "DeviceSubscriptionManager.h"
+#include "rpc/RpcClient.h"
+
+#include <app/InteractionModelEngine.h>
+#include <app/server/Server.h>
+
+#include <app-common/zap-generated/ids/Attributes.h>
+#include <app-common/zap-generated/ids/Clusters.h>
+#include <device_manager/DeviceManager.h>
+
+using namespace ::chip;
+using namespace ::chip::app;
+
+DeviceSubscriptionManager & DeviceSubscriptionManager::Instance()
+{
+    static DeviceSubscriptionManager instance;
+    return instance;
+}
+
+CHIP_ERROR DeviceSubscriptionManager::StartSubscription(Controller::DeviceController & controller, NodeId nodeId)
+{
+    assertChipStackLockedByCurrentThread();
+    auto it = mDeviceSubscriptionMap.find(nodeId);
+    VerifyOrReturnError((it == mDeviceSubscriptionMap.end()), CHIP_ERROR_INCORRECT_STATE);
+
+    auto deviceSubscription = std::make_unique<DeviceSubscription>();
+    VerifyOrReturnError(deviceSubscription, CHIP_ERROR_NO_MEMORY);
+    ReturnErrorOnFailure(deviceSubscription->StartSubscription(
+        [this](NodeId aNodeId) { this->DeviceSubscriptionTerminated(aNodeId); }, controller, nodeId));
+
+    mDeviceSubscriptionMap[nodeId] = std::move(deviceSubscription);
+    return CHIP_NO_ERROR;
+}
+
+CHIP_ERROR DeviceSubscriptionManager::RemoveSubscription(chip::NodeId nodeId)
+{
+    assertChipStackLockedByCurrentThread();
+    auto it = mDeviceSubscriptionMap.find(nodeId);
+    VerifyOrReturnError((it != mDeviceSubscriptionMap.end()), CHIP_ERROR_NOT_FOUND);
+    // We cannot safely erase the DeviceSubscription from mDeviceSubscriptionMap.
+    // After calling StopSubscription we expect DeviceSubscription to eventually
+    // call the OnDoneCallback we provided in StartSubscription which will call
+    // DeviceSubscriptionTerminated where it will be erased from the
+    // mDeviceSubscriptionMap.
+    it->second->StopSubscription();
+    return CHIP_NO_ERROR;
+}
+
+void DeviceSubscriptionManager::DeviceSubscriptionTerminated(NodeId nodeId)
+{
+    assertChipStackLockedByCurrentThread();
+    auto it = mDeviceSubscriptionMap.find(nodeId);
+    // DeviceSubscriptionTerminated is a private method that is expected to only
+    // be called by DeviceSubscription when it is terminal and is ready to be
+    // cleaned up and removed. If it is not mapped that means something has gone
+    // really wrong and there is likely a memory leak somewhere.
+    VerifyOrDie(it != mDeviceSubscriptionMap.end());
+    mDeviceSubscriptionMap.erase(nodeId);
+}
diff --git a/examples/fabric-admin/device_manager/DeviceSubscriptionManager.h b/examples/fabric-admin/device_manager/DeviceSubscriptionManager.h
new file mode 100644
index 0000000..5f4e115
--- /dev/null
+++ b/examples/fabric-admin/device_manager/DeviceSubscriptionManager.h
@@ -0,0 +1,43 @@
+/*
+ *   Copyright (c) 2024 Project CHIP Authors
+ *   All rights reserved.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ */
+#pragma once
+
+#include "DeviceSubscription.h"
+
+#include <app/ReadClient.h>
+#include <controller/CHIPDeviceController.h>
+#include <lib/core/DataModelTypes.h>
+
+#include <memory>
+
+class DeviceSubscriptionManager
+{
+public:
+    static DeviceSubscriptionManager & Instance();
+
+    /// Usually called after we have added a synchronized device to fabric-bridge to monitor
+    /// for any changes that need to be propagated to fabric-bridge.
+    CHIP_ERROR StartSubscription(chip::Controller::DeviceController & controller, chip::NodeId nodeId);
+
+    CHIP_ERROR RemoveSubscription(chip::NodeId nodeId);
+
+private:
+    void DeviceSubscriptionTerminated(chip::NodeId nodeId);
+
+    std::unordered_map<chip::NodeId, std::unique_ptr<DeviceSubscription>> mDeviceSubscriptionMap;
+};
diff --git a/examples/fabric-admin/device_manager/DeviceSynchronization.cpp b/examples/fabric-admin/device_manager/DeviceSynchronization.cpp
index 3239f89..adbe0c0 100644
--- a/examples/fabric-admin/device_manager/DeviceSynchronization.cpp
+++ b/examples/fabric-admin/device_manager/DeviceSynchronization.cpp
@@ -17,6 +17,8 @@
  */
 
 #include "DeviceSynchronization.h"
+
+#include "DeviceSubscriptionManager.h"
 #include "rpc/RpcClient.h"
 
 #include <app/InteractionModelEngine.h>
@@ -72,13 +74,6 @@
     VerifyOrDie(path.mEndpointId == kRootEndpointId);
     VerifyOrDie(path.mClusterId == Clusters::BasicInformation::Id);
 
-    CHIP_ERROR error = status.ToChipError();
-    if (CHIP_NO_ERROR != error)
-    {
-        ChipLogError(NotSpecified, "Response Failure: %" CHIP_ERROR_FORMAT, error.Format());
-        return;
-    }
-
     switch (path.mAttributeId)
     {
     case Clusters::BasicInformation::Attributes::UniqueID::Id:
@@ -131,6 +126,17 @@
     if (!DeviceMgr().IsCurrentBridgeDevice(mCurrentDeviceData.node_id))
     {
         AddSynchronizedDevice(mCurrentDeviceData);
+        // TODO(#35077) Figure out how we should reflect CADMIN values of ICD.
+        if (!mCurrentDeviceData.is_icd)
+        {
+            VerifyOrDie(mController);
+            // TODO(#35333) Figure out how we should recover in this circumstance.
+            CHIP_ERROR err = DeviceSubscriptionManager::Instance().StartSubscription(*mController, mCurrentDeviceData.node_id);
+            if (err != CHIP_NO_ERROR)
+            {
+                ChipLogError(NotSpecified, "Failed start subscription to ");
+            }
+        }
     }
 #else
     ChipLogError(NotSpecified, "Cannot synchronize device with fabric bridge: RPC not enabled");
@@ -178,9 +184,10 @@
     mDeviceSyncInProcess = false;
 }
 
-void DeviceSynchronizer::StartDeviceSynchronization(chip::Controller::DeviceController & controller, chip::NodeId nodeId,
+void DeviceSynchronizer::StartDeviceSynchronization(chip::Controller::DeviceController * controller, chip::NodeId nodeId,
                                                     bool deviceIsIcd)
 {
+    VerifyOrDie(controller);
     if (mDeviceSyncInProcess)
     {
         ChipLogError(NotSpecified, "Device Sync NOT POSSIBLE: another sync is in progress");
@@ -194,5 +201,6 @@
 
     mDeviceSyncInProcess = true;
 
-    controller.GetConnectedDevice(nodeId, &mOnDeviceConnectedCallback, &mOnDeviceConnectionFailureCallback);
+    mController = controller;
+    controller->GetConnectedDevice(nodeId, &mOnDeviceConnectedCallback, &mOnDeviceConnectionFailureCallback);
 }
diff --git a/examples/fabric-admin/device_manager/DeviceSynchronization.h b/examples/fabric-admin/device_manager/DeviceSynchronization.h
index 6aca23f..c5a4237 100644
--- a/examples/fabric-admin/device_manager/DeviceSynchronization.h
+++ b/examples/fabric-admin/device_manager/DeviceSynchronization.h
@@ -40,7 +40,12 @@
     /// Usually called after commissioning is complete, initiates a
     /// read of required data from the remote node ID and then will synchronize
     /// the device towards the fabric bridge
-    void StartDeviceSynchronization(chip::Controller::DeviceController & controller, chip::NodeId nodeId, bool deviceIsIcd);
+    ///
+    /// @param controller Must be a non-null pointer. The DeviceController instance
+    ///        pointed to must out live the entire device synchronization process.
+    /// @param nodeId Node ID of the device we need to syncronize data from.
+    /// @param deviceIsIcd If the device is an ICD device.
+    void StartDeviceSynchronization(chip::Controller::DeviceController * controller, chip::NodeId nodeId, bool deviceIsIcd);
 
     ///////////////////////////////////////////////////////////////
     // ReadClient::Callback implementation
@@ -65,6 +70,9 @@
     chip::Callback::Callback<chip::OnDeviceConnected> mOnDeviceConnectedCallback;
     chip::Callback::Callback<chip::OnDeviceConnectionFailure> mOnDeviceConnectionFailureCallback;
 
-    bool mDeviceSyncInProcess                      = false;
-    chip_rpc_SynchronizedDevice mCurrentDeviceData = chip_rpc_SynchronizedDevice_init_default;
+    // mController is expected to remain valid throughout the entire device synchronization process (i.e. when
+    // mDeviceSyncInProcess is true).
+    chip::Controller::DeviceController * mController = nullptr;
+    bool mDeviceSyncInProcess                        = false;
+    chip_rpc_SynchronizedDevice mCurrentDeviceData   = chip_rpc_SynchronizedDevice_init_default;
 };