Add DeviceSubscriptionManager to manage subscription of fabric-admin (#35305)
---------
Co-authored-by: Restyled.io <commits@restyled.io>
Co-authored-by: Andrei Litvin <andy314@gmail.com>
Co-authored-by: saurabhst <s.kumar9@samsung.com>
diff --git a/examples/fabric-admin/BUILD.gn b/examples/fabric-admin/BUILD.gn
index e408cdd..805fda1 100644
--- a/examples/fabric-admin/BUILD.gn
+++ b/examples/fabric-admin/BUILD.gn
@@ -84,6 +84,8 @@
"device_manager/DeviceManager.h",
"device_manager/DeviceSubscription.cpp",
"device_manager/DeviceSubscription.h",
+ "device_manager/DeviceSubscriptionManager.cpp",
+ "device_manager/DeviceSubscriptionManager.h",
"device_manager/DeviceSynchronization.cpp",
"device_manager/DeviceSynchronization.h",
]
diff --git a/examples/fabric-admin/commands/pairing/PairingCommand.cpp b/examples/fabric-admin/commands/pairing/PairingCommand.cpp
index 89887ec..c9b58cd 100644
--- a/examples/fabric-admin/commands/pairing/PairingCommand.cpp
+++ b/examples/fabric-admin/commands/pairing/PairingCommand.cpp
@@ -423,7 +423,9 @@
{
// print to console
fprintf(stderr, "New device with Node ID: 0x%lx has been successfully added.\n", nodeId);
- DeviceSynchronizer::Instance().StartDeviceSynchronization(CurrentCommissioner(), mNodeId, mDeviceIsICD);
+ // CurrentCommissioner() has a lifetime that is the entire life of the application itself
+ // so it is safe to provide to StartDeviceSynchronization.
+ DeviceSynchronizer::Instance().StartDeviceSynchronization(&CurrentCommissioner(), mNodeId, mDeviceIsICD);
}
else
{
@@ -564,6 +566,8 @@
fprintf(stderr, "Device with Node ID: 0x%lx has been successfully removed.\n", nodeId);
#if defined(PW_RPC_ENABLED)
+ chip::app::InteractionModelEngine::GetInstance()->ShutdownSubscriptions(command->CurrentCommissioner().GetFabricIndex(),
+ nodeId);
RemoveSynchronizedDevice(nodeId);
#endif
}
diff --git a/examples/fabric-admin/device_manager/DeviceSubscription.cpp b/examples/fabric-admin/device_manager/DeviceSubscription.cpp
index 73f9098..eabbdda 100644
--- a/examples/fabric-admin/device_manager/DeviceSubscription.cpp
+++ b/examples/fabric-admin/device_manager/DeviceSubscription.cpp
@@ -100,7 +100,7 @@
#if defined(PW_RPC_ENABLED)
AdminCommissioningAttributeChanged(mCurrentAdministratorCommissioningAttributes);
#else
- ChipLogError(NotSpecified, "Cannot synchronize device with fabric bridge: RPC not enabled");
+ ChipLogError(NotSpecified, "Cannot forward Administrator Commissioning Attribute to fabric bridge: RPC not enabled");
#endif
mChangeDetected = false;
}
@@ -108,7 +108,10 @@
void DeviceSubscription::OnDone(ReadClient * apReadClient)
{
- // TODO(#35077) In follow up PR we will indicate to a manager DeviceSubscription is terminal.
+ // After calling mOnDoneCallback we are indicating that `this` is deleted and we shouldn't do anything else with
+ // DeviceSubscription.
+ MoveToState(State::AwaitingDestruction);
+ mOnDoneCallback(mCurrentAdministratorCommissioningAttributes.node_id);
}
void DeviceSubscription::OnError(CHIP_ERROR error)
@@ -118,6 +121,15 @@
void DeviceSubscription::OnDeviceConnected(Messaging::ExchangeManager & exchangeMgr, const SessionHandle & sessionHandle)
{
+ if (mState == State::Stopping)
+ {
+ // After calling mOnDoneCallback we are indicating that `this` is deleted and we shouldn't do anything else with
+ // DeviceSubscription.
+ MoveToState(State::AwaitingDestruction);
+ mOnDoneCallback(mCurrentAdministratorCommissioningAttributes.node_id);
+ return;
+ }
+ VerifyOrDie(mState == State::Connecting);
mClient = std::make_unique<ReadClient>(app::InteractionModelEngine::GetInstance(), &exchangeMgr /* echangeMgr */,
*this /* callback */, ReadClient::InteractionType::Subscribe);
VerifyOrDie(mClient);
@@ -136,25 +148,95 @@
if (err != CHIP_NO_ERROR)
{
ChipLogError(NotSpecified, "Failed to issue subscription to AdministratorCommissioning data");
- // TODO(#35077) In follow up PR we will indicate to a manager DeviceSubscription is terminal.
+ // After calling mOnDoneCallback we are indicating that `this` is deleted and we shouldn't do anything else with
+ // DeviceSubscription.
+ MoveToState(State::AwaitingDestruction);
+ mOnDoneCallback(mCurrentAdministratorCommissioningAttributes.node_id);
+ return;
}
+ MoveToState(State::SubscriptionStarted);
+}
+
+void DeviceSubscription::MoveToState(const State aTargetState)
+{
+ mState = aTargetState;
+ ChipLogDetail(NotSpecified, "DeviceSubscription moving to [%10.10s]", GetStateStr());
+}
+
+const char * DeviceSubscription::GetStateStr() const
+{
+ switch (mState)
+ {
+ case State::Idle:
+ return "Idle";
+
+ case State::Connecting:
+ return "Connecting";
+
+ case State::Stopping:
+ return "Stopping";
+
+ case State::SubscriptionStarted:
+ return "SubscriptionStarted";
+
+ case State::AwaitingDestruction:
+ return "AwaitingDestruction";
+ }
+ return "N/A";
}
void DeviceSubscription::OnDeviceConnectionFailure(const ScopedNodeId & peerId, CHIP_ERROR error)
{
- ChipLogError(NotSpecified, "Device Sync failed to connect to " ChipLogFormatX64, ChipLogValueX64(peerId.GetNodeId()));
- // TODO(#35077) In follow up PR we will indicate to a manager DeviceSubscription is terminal.
+ VerifyOrDie(mState == State::Connecting || mState == State::Stopping);
+ ChipLogError(NotSpecified, "DeviceSubscription failed to connect to " ChipLogFormatX64, ChipLogValueX64(peerId.GetNodeId()));
+ // TODO(#35333) Figure out how we should recover if we fail to connect and mState == State::Connecting.
+
+ // After calling mOnDoneCallback we are indicating that `this` is deleted and we shouldn't do anything else with
+ // DeviceSubscription.
+ MoveToState(State::AwaitingDestruction);
+ mOnDoneCallback(mCurrentAdministratorCommissioningAttributes.node_id);
}
-void DeviceSubscription::StartSubscription(Controller::DeviceController & controller, NodeId nodeId)
+CHIP_ERROR DeviceSubscription::StartSubscription(OnDoneCallback onDoneCallback, Controller::DeviceController & controller,
+ NodeId nodeId)
{
- VerifyOrDie(!mSubscriptionStarted);
+ assertChipStackLockedByCurrentThread();
+ VerifyOrDie(mState == State::Idle);
mCurrentAdministratorCommissioningAttributes = chip_rpc_AdministratorCommissioningChanged_init_default;
mCurrentAdministratorCommissioningAttributes.node_id = nodeId;
mCurrentAdministratorCommissioningAttributes.window_status =
static_cast<uint32_t>(Clusters::AdministratorCommissioning::CommissioningWindowStatusEnum::kWindowNotOpen);
- mSubscriptionStarted = true;
+ mState = State::Connecting;
+ mOnDoneCallback = onDoneCallback;
- controller.GetConnectedDevice(nodeId, &mOnDeviceConnectedCallback, &mOnDeviceConnectionFailureCallback);
+ return controller.GetConnectedDevice(nodeId, &mOnDeviceConnectedCallback, &mOnDeviceConnectionFailureCallback);
+}
+
+void DeviceSubscription::StopSubscription()
+{
+ assertChipStackLockedByCurrentThread();
+ VerifyOrDie(mState != State::Idle);
+ // Something is seriously wrong if we die on the line below
+ VerifyOrDie(mState != State::AwaitingDestruction);
+
+ if (mState == State::Stopping)
+ {
+ // Stop is called again while we are still waiting on connected callbacks
+ return;
+ }
+
+ if (mState == State::Connecting)
+ {
+ MoveToState(State::Stopping);
+ return;
+ }
+
+ // By calling reset on our ReadClient we terminate the subscription.
+ VerifyOrDie(mClient);
+ mClient.reset();
+ // After calling mOnDoneCallback we are indicating that `this` is deleted and we shouldn't do anything else with
+ // DeviceSubscription.
+ MoveToState(State::AwaitingDestruction);
+ mOnDoneCallback(mCurrentAdministratorCommissioningAttributes.node_id);
}
diff --git a/examples/fabric-admin/device_manager/DeviceSubscription.h b/examples/fabric-admin/device_manager/DeviceSubscription.h
index 7a9e504..353eb30 100644
--- a/examples/fabric-admin/device_manager/DeviceSubscription.h
+++ b/examples/fabric-admin/device_manager/DeviceSubscription.h
@@ -26,6 +26,8 @@
#include "fabric_bridge_service/fabric_bridge_service.pb.h"
#include "fabric_bridge_service/fabric_bridge_service.rpc.pb.h"
+class DeviceSubscriptionManager;
+
/// Attribute subscription to attributes that are important to keep track and send to fabric-bridge
/// via RPC when change has been identified.
///
@@ -35,11 +37,18 @@
class DeviceSubscription : public chip::app::ReadClient::Callback
{
public:
+ using OnDoneCallback = std::function<void(chip::NodeId)>;
+
DeviceSubscription();
- /// Usually called after we have added a synchronized device to fabric-bridge to monitor
- /// for any changes that need to be propgated to fabric-bridge.
- void StartSubscription(chip::Controller::DeviceController & controller, chip::NodeId nodeId);
+ CHIP_ERROR StartSubscription(OnDoneCallback onDoneCallback, chip::Controller::DeviceController & controller,
+ chip::NodeId nodeId);
+
+ /// This will trigger stopping the subscription. Once subscription is stopped the OnDoneCallback
+ /// provided in StartSubscription will be called to indicate that subscription have been terminated.
+ ///
+ /// Must only be called after StartSubscription was successfully called.
+ void StopSubscription();
///////////////////////////////////////////////////////////////
// ReadClient::Callback implementation
@@ -57,6 +66,19 @@
void OnDeviceConnectionFailure(const chip::ScopedNodeId & peerId, CHIP_ERROR error);
private:
+ enum class State : uint8_t
+ {
+ Idle, ///< Default state that the object starts out in, where no work has commenced
+ Connecting, ///< We are waiting for OnDeviceConnected or OnDeviceConnectionFailure callbacks to be called
+ Stopping, ///< We are waiting for OnDeviceConnected or OnDeviceConnectionFailure callbacks so we can terminate
+ SubscriptionStarted, ///< We have started a subscription.
+ AwaitingDestruction, ///< The object has completed its work and is awaiting destruction.
+ };
+
+ void MoveToState(const State aTargetState);
+ const char * GetStateStr() const;
+
+ OnDoneCallback mOnDoneCallback;
std::unique_ptr<chip::app::ReadClient> mClient;
chip::Callback::Callback<chip::OnDeviceConnected> mOnDeviceConnectedCallback;
@@ -64,7 +86,5 @@
chip_rpc_AdministratorCommissioningChanged mCurrentAdministratorCommissioningAttributes;
bool mChangeDetected = false;
- // Ensures that DeviceSubscription starts a subscription only once. If instance of
- // DeviceSubscription can be reused, the class documentation should be updated accordingly.
- bool mSubscriptionStarted = false;
+ State mState = State::Idle;
};
diff --git a/examples/fabric-admin/device_manager/DeviceSubscriptionManager.cpp b/examples/fabric-admin/device_manager/DeviceSubscriptionManager.cpp
new file mode 100644
index 0000000..c59fd98
--- /dev/null
+++ b/examples/fabric-admin/device_manager/DeviceSubscriptionManager.cpp
@@ -0,0 +1,77 @@
+/*
+ * Copyright (c) 2024 Project CHIP Authors
+ * All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "DeviceSubscriptionManager.h"
+#include "rpc/RpcClient.h"
+
+#include <app/InteractionModelEngine.h>
+#include <app/server/Server.h>
+
+#include <app-common/zap-generated/ids/Attributes.h>
+#include <app-common/zap-generated/ids/Clusters.h>
+#include <device_manager/DeviceManager.h>
+
+using namespace ::chip;
+using namespace ::chip::app;
+
+DeviceSubscriptionManager & DeviceSubscriptionManager::Instance()
+{
+ static DeviceSubscriptionManager instance;
+ return instance;
+}
+
+CHIP_ERROR DeviceSubscriptionManager::StartSubscription(Controller::DeviceController & controller, NodeId nodeId)
+{
+ assertChipStackLockedByCurrentThread();
+ auto it = mDeviceSubscriptionMap.find(nodeId);
+ VerifyOrReturnError((it == mDeviceSubscriptionMap.end()), CHIP_ERROR_INCORRECT_STATE);
+
+ auto deviceSubscription = std::make_unique<DeviceSubscription>();
+ VerifyOrReturnError(deviceSubscription, CHIP_ERROR_NO_MEMORY);
+ ReturnErrorOnFailure(deviceSubscription->StartSubscription(
+ [this](NodeId aNodeId) { this->DeviceSubscriptionTerminated(aNodeId); }, controller, nodeId));
+
+ mDeviceSubscriptionMap[nodeId] = std::move(deviceSubscription);
+ return CHIP_NO_ERROR;
+}
+
+CHIP_ERROR DeviceSubscriptionManager::RemoveSubscription(chip::NodeId nodeId)
+{
+ assertChipStackLockedByCurrentThread();
+ auto it = mDeviceSubscriptionMap.find(nodeId);
+ VerifyOrReturnError((it != mDeviceSubscriptionMap.end()), CHIP_ERROR_NOT_FOUND);
+ // We cannot safely erase the DeviceSubscription from mDeviceSubscriptionMap.
+ // After calling StopSubscription we expect DeviceSubscription to eventually
+ // call the OnDoneCallback we provided in StartSubscription which will call
+ // DeviceSubscriptionTerminated where it will be erased from the
+ // mDeviceSubscriptionMap.
+ it->second->StopSubscription();
+ return CHIP_NO_ERROR;
+}
+
+void DeviceSubscriptionManager::DeviceSubscriptionTerminated(NodeId nodeId)
+{
+ assertChipStackLockedByCurrentThread();
+ auto it = mDeviceSubscriptionMap.find(nodeId);
+ // DeviceSubscriptionTerminated is a private method that is expected to only
+ // be called by DeviceSubscription when it is terminal and is ready to be
+ // cleaned up and removed. If it is not mapped that means something has gone
+ // really wrong and there is likely a memory leak somewhere.
+ VerifyOrDie(it != mDeviceSubscriptionMap.end());
+ mDeviceSubscriptionMap.erase(nodeId);
+}
diff --git a/examples/fabric-admin/device_manager/DeviceSubscriptionManager.h b/examples/fabric-admin/device_manager/DeviceSubscriptionManager.h
new file mode 100644
index 0000000..5f4e115
--- /dev/null
+++ b/examples/fabric-admin/device_manager/DeviceSubscriptionManager.h
@@ -0,0 +1,43 @@
+/*
+ * Copyright (c) 2024 Project CHIP Authors
+ * All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#pragma once
+
+#include "DeviceSubscription.h"
+
+#include <app/ReadClient.h>
+#include <controller/CHIPDeviceController.h>
+#include <lib/core/DataModelTypes.h>
+
+#include <memory>
+
+class DeviceSubscriptionManager
+{
+public:
+ static DeviceSubscriptionManager & Instance();
+
+ /// Usually called after we have added a synchronized device to fabric-bridge to monitor
+ /// for any changes that need to be propagated to fabric-bridge.
+ CHIP_ERROR StartSubscription(chip::Controller::DeviceController & controller, chip::NodeId nodeId);
+
+ CHIP_ERROR RemoveSubscription(chip::NodeId nodeId);
+
+private:
+ void DeviceSubscriptionTerminated(chip::NodeId nodeId);
+
+ std::unordered_map<chip::NodeId, std::unique_ptr<DeviceSubscription>> mDeviceSubscriptionMap;
+};
diff --git a/examples/fabric-admin/device_manager/DeviceSynchronization.cpp b/examples/fabric-admin/device_manager/DeviceSynchronization.cpp
index 3239f89..adbe0c0 100644
--- a/examples/fabric-admin/device_manager/DeviceSynchronization.cpp
+++ b/examples/fabric-admin/device_manager/DeviceSynchronization.cpp
@@ -17,6 +17,8 @@
*/
#include "DeviceSynchronization.h"
+
+#include "DeviceSubscriptionManager.h"
#include "rpc/RpcClient.h"
#include <app/InteractionModelEngine.h>
@@ -72,13 +74,6 @@
VerifyOrDie(path.mEndpointId == kRootEndpointId);
VerifyOrDie(path.mClusterId == Clusters::BasicInformation::Id);
- CHIP_ERROR error = status.ToChipError();
- if (CHIP_NO_ERROR != error)
- {
- ChipLogError(NotSpecified, "Response Failure: %" CHIP_ERROR_FORMAT, error.Format());
- return;
- }
-
switch (path.mAttributeId)
{
case Clusters::BasicInformation::Attributes::UniqueID::Id:
@@ -131,6 +126,17 @@
if (!DeviceMgr().IsCurrentBridgeDevice(mCurrentDeviceData.node_id))
{
AddSynchronizedDevice(mCurrentDeviceData);
+ // TODO(#35077) Figure out how we should reflect CADMIN values of ICD.
+ if (!mCurrentDeviceData.is_icd)
+ {
+ VerifyOrDie(mController);
+ // TODO(#35333) Figure out how we should recover in this circumstance.
+ CHIP_ERROR err = DeviceSubscriptionManager::Instance().StartSubscription(*mController, mCurrentDeviceData.node_id);
+ if (err != CHIP_NO_ERROR)
+ {
+ ChipLogError(NotSpecified, "Failed start subscription to ");
+ }
+ }
}
#else
ChipLogError(NotSpecified, "Cannot synchronize device with fabric bridge: RPC not enabled");
@@ -178,9 +184,10 @@
mDeviceSyncInProcess = false;
}
-void DeviceSynchronizer::StartDeviceSynchronization(chip::Controller::DeviceController & controller, chip::NodeId nodeId,
+void DeviceSynchronizer::StartDeviceSynchronization(chip::Controller::DeviceController * controller, chip::NodeId nodeId,
bool deviceIsIcd)
{
+ VerifyOrDie(controller);
if (mDeviceSyncInProcess)
{
ChipLogError(NotSpecified, "Device Sync NOT POSSIBLE: another sync is in progress");
@@ -194,5 +201,6 @@
mDeviceSyncInProcess = true;
- controller.GetConnectedDevice(nodeId, &mOnDeviceConnectedCallback, &mOnDeviceConnectionFailureCallback);
+ mController = controller;
+ controller->GetConnectedDevice(nodeId, &mOnDeviceConnectedCallback, &mOnDeviceConnectionFailureCallback);
}
diff --git a/examples/fabric-admin/device_manager/DeviceSynchronization.h b/examples/fabric-admin/device_manager/DeviceSynchronization.h
index 6aca23f..c5a4237 100644
--- a/examples/fabric-admin/device_manager/DeviceSynchronization.h
+++ b/examples/fabric-admin/device_manager/DeviceSynchronization.h
@@ -40,7 +40,12 @@
/// Usually called after commissioning is complete, initiates a
/// read of required data from the remote node ID and then will synchronize
/// the device towards the fabric bridge
- void StartDeviceSynchronization(chip::Controller::DeviceController & controller, chip::NodeId nodeId, bool deviceIsIcd);
+ ///
+ /// @param controller Must be a non-null pointer. The DeviceController instance
+ /// pointed to must out live the entire device synchronization process.
+ /// @param nodeId Node ID of the device we need to syncronize data from.
+ /// @param deviceIsIcd If the device is an ICD device.
+ void StartDeviceSynchronization(chip::Controller::DeviceController * controller, chip::NodeId nodeId, bool deviceIsIcd);
///////////////////////////////////////////////////////////////
// ReadClient::Callback implementation
@@ -65,6 +70,9 @@
chip::Callback::Callback<chip::OnDeviceConnected> mOnDeviceConnectedCallback;
chip::Callback::Callback<chip::OnDeviceConnectionFailure> mOnDeviceConnectionFailureCallback;
- bool mDeviceSyncInProcess = false;
- chip_rpc_SynchronizedDevice mCurrentDeviceData = chip_rpc_SynchronizedDevice_init_default;
+ // mController is expected to remain valid throughout the entire device synchronization process (i.e. when
+ // mDeviceSyncInProcess is true).
+ chip::Controller::DeviceController * mController = nullptr;
+ bool mDeviceSyncInProcess = false;
+ chip_rpc_SynchronizedDevice mCurrentDeviceData = chip_rpc_SynchronizedDevice_init_default;
};