[ICD] Client-side device communication notification plumbing (#24061)
diff --git a/src/app/BufferedReadCallback.h b/src/app/BufferedReadCallback.h
index 6b2d31d..0bee4d3 100644
--- a/src/app/BufferedReadCallback.h
+++ b/src/app/BufferedReadCallback.h
@@ -107,6 +107,12 @@
{
return mCallback.GetHighestReceivedEventNumber(aEventNumber);
}
+
+ void OnUnsolicitedMessageFromPublisher(ReadClient * apReadClient) override
+ {
+ return mCallback.OnUnsolicitedMessageFromPublisher(apReadClient);
+ }
+
/*
* Given a reader positioned at a list element, allocate a packet buffer, copy the list item where
* the reader is positioned into that buffer and add it to our buffered list for tracking.
diff --git a/src/app/ClusterStateCache.h b/src/app/ClusterStateCache.h
index 4b88b05..efc2cde 100644
--- a/src/app/ClusterStateCache.h
+++ b/src/app/ClusterStateCache.h
@@ -596,6 +596,11 @@
const Span<AttributePathParams> & aAttributePaths,
bool & aEncodedDataVersionList) override;
+ void OnUnsolicitedMessageFromPublisher(ReadClient * apReadClient) override
+ {
+ return mCallback.OnUnsolicitedMessageFromPublisher(apReadClient);
+ }
+
// Commit the pending cluster data version, if there is one.
void CommitPendingDataVersion();
diff --git a/src/app/InteractionModelEngine.cpp b/src/app/InteractionModelEngine.cpp
index 979831b..1f1c8aa 100644
--- a/src/app/InteractionModelEngine.cpp
+++ b/src/app/InteractionModelEngine.cpp
@@ -630,19 +630,37 @@
VerifyOrReturnError(report.GetSubscriptionId(&subscriptionId) == CHIP_NO_ERROR, Status::InvalidAction);
VerifyOrReturnError(report.ExitContainer() == CHIP_NO_ERROR, Status::InvalidAction);
+ ReadClient * foundSubscription = nullptr;
for (auto * readClient = mpActiveReadClientList; readClient != nullptr; readClient = readClient->GetNextClient())
{
+ auto peer = apExchangeContext->GetSessionHandle()->GetPeer();
+ if (readClient->GetFabricIndex() != peer.GetFabricIndex() || readClient->GetPeerNodeId() != peer.GetNodeId())
+ {
+ continue;
+ }
+
+ // Notify Subscriptions about incoming communication from node
+ readClient->OnUnsolicitedMessageFromPublisher();
+
if (!readClient->IsSubscriptionActive())
{
continue;
}
- auto peer = apExchangeContext->GetSessionHandle()->GetPeer();
- if (readClient->GetFabricIndex() != peer.GetFabricIndex() || readClient->GetPeerNodeId() != peer.GetNodeId() ||
- !readClient->IsMatchingSubscriptionId(subscriptionId))
+
+ if (!readClient->IsMatchingSubscriptionId(subscriptionId))
{
continue;
}
- readClient->OnUnsolicitedReportData(apExchangeContext, std::move(aPayload));
+
+ if (!foundSubscription)
+ {
+ foundSubscription = readClient;
+ }
+ }
+
+ if (foundSubscription)
+ {
+ foundSubscription->OnUnsolicitedReportData(apExchangeContext, std::move(aPayload));
return Status::Success;
}
diff --git a/src/app/ReadClient.cpp b/src/app/ReadClient.cpp
index db9f6ae..971df45 100644
--- a/src/app/ReadClient.cpp
+++ b/src/app/ReadClient.cpp
@@ -64,6 +64,7 @@
mMinIntervalFloorSeconds = 0;
mMaxInterval = 0;
mSubscriptionId = 0;
+ mIsResubscriptionScheduled = false;
MoveToState(ClientState::Idle);
}
@@ -155,6 +156,7 @@
ReturnErrorOnFailure(
InteractionModelEngine::GetInstance()->GetExchangeManager()->GetSessionManager()->SystemLayer()->StartTimer(
System::Clock::Milliseconds32(aTimeTillNextResubscriptionMs), OnResubscribeTimerCallback, this));
+ mIsResubscriptionScheduled = true;
return CHIP_NO_ERROR;
}
@@ -841,6 +843,7 @@
{
InteractionModelEngine::GetInstance()->GetExchangeManager()->GetSessionManager()->SystemLayer()->CancelTimer(
OnResubscribeTimerCallback, this);
+ mIsResubscriptionScheduled = false;
}
void ReadClient::OnLivenessTimeoutCallback(System::Layer * apSystemLayer, void * apAppState)
@@ -1095,6 +1098,8 @@
ReadClient * const _this = static_cast<ReadClient *>(apAppState);
VerifyOrDie(_this != nullptr);
+ _this->mIsResubscriptionScheduled = false;
+
CHIP_ERROR err;
ChipLogProgress(DataManagement, "OnResubscribeTimerCallback: ForceCASE = %d", _this->mForceCaseOnNextResub);
diff --git a/src/app/ReadClient.h b/src/app/ReadClient.h
index 59309da..4ea66c8 100644
--- a/src/app/ReadClient.h
+++ b/src/app/ReadClient.h
@@ -234,6 +234,21 @@
aEventNumber.ClearValue();
return CHIP_NO_ERROR;
}
+
+ /**
+ * OnUnsolicitedMessageFromPublisher will be called for a subscription
+ * ReadClient when any incoming message is received from a matching
+ * node on the fabric.
+ *
+ * This callback will be called:
+ * - When receiving any unsolicited communication from the node
+ * - Even for disconnected subscriptions.
+ *
+ * Callee MUST not synchronously destroy ReadClients in this callback.
+ *
+ * @param[in] apReadClient the ReadClient for the subscription.
+ */
+ virtual void OnUnsolicitedMessageFromPublisher(ReadClient * apReadClient) {}
};
enum class InteractionType : uint8_t
@@ -288,6 +303,20 @@
void OnUnsolicitedReportData(Messaging::ExchangeContext * apExchangeContext, System::PacketBufferHandle && aPayload);
+ void OnUnsolicitedMessageFromPublisher()
+ {
+ // accelerate resubscription if scheduled
+ if (mIsResubscriptionScheduled)
+ {
+ ChipLogDetail(DataManagement, "%s ReadClient[%p] resubscribe on unsolicited message", __func__, this);
+ CancelResubscribeTimer();
+ OnResubscribeTimerCallback(nullptr, this);
+ }
+
+ // Then notify callbacks
+ mpCallback.OnUnsolicitedMessageFromPublisher(this);
+ }
+
auto GetSubscriptionId() const
{
using returnType = Optional<decltype(mSubscriptionId)>;
@@ -501,7 +530,8 @@
InteractionType mInteractionType = InteractionType::Read;
Timestamp mEventTimestamp;
- bool mForceCaseOnNextResub = true;
+ bool mForceCaseOnNextResub = true;
+ bool mIsResubscriptionScheduled = false;
chip::Callback::Callback<OnDeviceConnected> mOnConnectedCallback;
chip::Callback::Callback<OnDeviceConnectionFailure> mOnConnectionFailureCallback;
diff --git a/src/darwin/Framework/CHIP/MTRBaseSubscriptionCallback.h b/src/darwin/Framework/CHIP/MTRBaseSubscriptionCallback.h
index 69db806..4716bbc 100644
--- a/src/darwin/Framework/CHIP/MTRBaseSubscriptionCallback.h
+++ b/src/darwin/Framework/CHIP/MTRBaseSubscriptionCallback.h
@@ -55,12 +55,14 @@
typedef void (^ErrorCallback)(NSError * error);
typedef void (^SubscriptionEstablishedHandler)(void);
typedef void (^OnDoneHandler)(void);
+typedef void (^UnsolicitedMessageFromPublisherHandler)(void);
class MTRBaseSubscriptionCallback : public chip::app::ClusterStateCache::Callback {
public:
MTRBaseSubscriptionCallback(DataReportCallback attributeReportCallback, DataReportCallback eventReportCallback,
ErrorCallback errorCallback, MTRDeviceResubscriptionScheduledHandler _Nullable resubscriptionCallback,
- SubscriptionEstablishedHandler _Nullable subscriptionEstablishedHandler, OnDoneHandler _Nullable onDoneHandler)
+ SubscriptionEstablishedHandler _Nullable subscriptionEstablishedHandler, OnDoneHandler _Nullable onDoneHandler,
+ UnsolicitedMessageFromPublisherHandler _Nullable unsolicitedMessageFromPublisherHandler = NULL)
: mAttributeReportCallback(attributeReportCallback)
, mEventReportCallback(eventReportCallback)
, mErrorCallback(errorCallback)
@@ -68,6 +70,7 @@
, mSubscriptionEstablishedHandler(subscriptionEstablishedHandler)
, mBufferedReadAdapter(*this)
, mOnDoneHandler(onDoneHandler)
+ , mUnsolicitedMessageFromPublisherHandler(unsolicitedMessageFromPublisherHandler)
{
}
@@ -117,6 +120,8 @@
CHIP_ERROR OnResubscriptionNeeded(chip::app::ReadClient * apReadClient, CHIP_ERROR aTerminationCause) override;
+ void OnUnsolicitedMessageFromPublisher(chip::app::ReadClient * apReadClient) override;
+
void ReportData();
protected:
@@ -131,6 +136,7 @@
ErrorCallback _Nullable mErrorCallback = nil;
MTRDeviceResubscriptionScheduledHandler _Nullable mResubscriptionCallback = nil;
SubscriptionEstablishedHandler _Nullable mSubscriptionEstablishedHandler = nil;
+ UnsolicitedMessageFromPublisherHandler _Nullable mUnsolicitedMessageFromPublisherHandler = nil;
chip::app::BufferedReadCallback mBufferedReadAdapter;
// Our lifetime management is a little complicated. On errors that don't
diff --git a/src/darwin/Framework/CHIP/MTRBaseSubscriptionCallback.mm b/src/darwin/Framework/CHIP/MTRBaseSubscriptionCallback.mm
index abbd8ed..7e07720 100644
--- a/src/darwin/Framework/CHIP/MTRBaseSubscriptionCallback.mm
+++ b/src/darwin/Framework/CHIP/MTRBaseSubscriptionCallback.mm
@@ -16,6 +16,7 @@
#import "MTRBaseSubscriptionCallback.h"
#import "MTRError_Internal.h"
+#import "MTRLogging_Internal.h"
#include <platform/PlatformManager.h>
@@ -111,6 +112,14 @@
return CHIP_NO_ERROR;
}
+void MTRBaseSubscriptionCallback::OnUnsolicitedMessageFromPublisher(ReadClient *)
+{
+ if (mUnsolicitedMessageFromPublisherHandler) {
+ auto unsolicitedMessageFromPublisherHandler = mUnsolicitedMessageFromPublisherHandler;
+ unsolicitedMessageFromPublisherHandler();
+ }
+}
+
void MTRBaseSubscriptionCallback::ReportError(CHIP_ERROR aError, bool aCancelSubscription)
{
auto * err = [MTRError errorForCHIPErrorCode:aError];
diff --git a/src/darwin/Framework/CHIP/MTRDevice.h b/src/darwin/Framework/CHIP/MTRDevice.h
index e76ec45..15df1e9 100644
--- a/src/darwin/Framework/CHIP/MTRDevice.h
+++ b/src/darwin/Framework/CHIP/MTRDevice.h
@@ -193,6 +193,14 @@
*/
- (void)device:(MTRDevice *)device receivedEventReport:(NSArray<NSDictionary<NSString *, id> *> *)eventReport;
+@optional
+/**
+ * deviceStartedCommunicating:
+ *
+ * Notifies delegate the device is currently communicating
+ */
+- (void)didReceiveCommunicationFromDevice:(MTRDevice *)device;
+
@end
@interface MTRDevice (Deprecated)
diff --git a/src/darwin/Framework/CHIP/MTRDevice.mm b/src/darwin/Framework/CHIP/MTRDevice.mm
index 389a4ce..f4455ce 100644
--- a/src/darwin/Framework/CHIP/MTRDevice.mm
+++ b/src/darwin/Framework/CHIP/MTRDevice.mm
@@ -113,9 +113,10 @@
public:
SubscriptionCallback(DataReportCallback attributeReportCallback, DataReportCallback eventReportCallback,
ErrorCallback errorCallback, MTRDeviceResubscriptionScheduledHandler resubscriptionCallback,
- SubscriptionEstablishedHandler subscriptionEstablishedHandler, OnDoneHandler onDoneHandler)
+ SubscriptionEstablishedHandler subscriptionEstablishedHandler, OnDoneHandler onDoneHandler,
+ UnsolicitedMessageFromPublisherHandler unsolicitedMessageFromPublisherHandler)
: MTRBaseSubscriptionCallback(attributeReportCallback, eventReportCallback, errorCallback, resubscriptionCallback,
- subscriptionEstablishedHandler, onDoneHandler)
+ subscriptionEstablishedHandler, onDoneHandler, unsolicitedMessageFromPublisherHandler)
{
}
@@ -211,6 +212,19 @@
os_unfair_lock_unlock(&self->_lock);
}
+// assume lock is held
+- (void)_changeState:(MTRDeviceState)state
+{
+ MTRDeviceState lastState = _state;
+ _state = state;
+ id<MTRDeviceDelegate> delegate = _weakDelegate.strongObject;
+ if (delegate && (lastState != state)) {
+ dispatch_async(_delegateQueue, ^{
+ [delegate device:self stateChanged:state];
+ });
+ }
+}
+
- (void)_handleSubscriptionEstablished
{
os_unfair_lock_lock(&self->_lock);
@@ -218,13 +232,7 @@
// reset subscription attempt wait time when subscription succeeds
_lastSubscriptionAttemptWait = 0;
- _state = MTRDeviceStateReachable;
- id<MTRDeviceDelegate> delegate = _weakDelegate.strongObject;
- if (delegate) {
- dispatch_async(_delegateQueue, ^{
- [delegate device:self stateChanged:MTRDeviceStateReachable];
- });
- }
+ [self _changeState:MTRDeviceStateReachable];
os_unfair_lock_unlock(&self->_lock);
}
@@ -236,12 +244,7 @@
_subscriptionActive = NO;
_unreportedEvents = nil;
- id<MTRDeviceDelegate> delegate = _weakDelegate.strongObject;
- if (delegate) {
- dispatch_async(_delegateQueue, ^{
- [delegate device:self stateChanged:MTRDeviceStateUnreachable];
- });
- }
+ [self _changeState:MTRDeviceStateUnreachable];
os_unfair_lock_unlock(&self->_lock);
}
@@ -250,14 +253,7 @@
{
os_unfair_lock_lock(&self->_lock);
- _state = MTRDeviceStateUnknown;
-
- id<MTRDeviceDelegate> delegate = _weakDelegate.strongObject;
- if (delegate) {
- dispatch_async(_delegateQueue, ^{
- [delegate device:self stateChanged:MTRDeviceStateUnknown];
- });
- }
+ [self _changeState:MTRDeviceStateUnknown];
os_unfair_lock_unlock(&self->_lock);
}
@@ -303,6 +299,26 @@
os_unfair_lock_unlock(&self->_lock);
}
+- (void)_handleUnsolicitedMessageFromPublisher
+{
+ os_unfair_lock_lock(&self->_lock);
+
+ [self _changeState:MTRDeviceStateReachable];
+
+ id<MTRDeviceDelegate> delegate = _weakDelegate.strongObject;
+ if (delegate && [delegate respondsToSelector:@selector(didReceiveCommunicationFromDevice:)]) {
+ dispatch_async(_delegateQueue, ^{
+ [delegate didReceiveCommunicationFromDevice:self];
+ });
+ }
+
+ // in case this is called dyring exponential back off of subscription
+ // reestablishment, this starts the attempt right away
+ [self _setupSubscription];
+
+ os_unfair_lock_unlock(&self->_lock);
+}
+
// assume lock is held
- (void)_reportAttributes:(NSArray<NSDictionary<NSString *, id> *> *)attributes
{
@@ -444,6 +460,13 @@
// OnDone
[self _handleSubscriptionReset];
});
+ },
+ ^(void) {
+ MTR_LOG_INFO("%@ got unsolicited message from publisher", self);
+ dispatch_async(self.queue, ^{
+ // OnUnsolicitedMessageFromPublisher
+ [self _handleUnsolicitedMessageFromPublisher];
+ });
});
// Set up a cluster state cache. We really just want this for the