[ICD] Client-side device communication notification plumbing (#24061)

diff --git a/src/app/BufferedReadCallback.h b/src/app/BufferedReadCallback.h
index 6b2d31d..0bee4d3 100644
--- a/src/app/BufferedReadCallback.h
+++ b/src/app/BufferedReadCallback.h
@@ -107,6 +107,12 @@
     {
         return mCallback.GetHighestReceivedEventNumber(aEventNumber);
     }
+
+    void OnUnsolicitedMessageFromPublisher(ReadClient * apReadClient) override
+    {
+        return mCallback.OnUnsolicitedMessageFromPublisher(apReadClient);
+    }
+
     /*
      * Given a reader positioned at a list element, allocate a packet buffer, copy the list item where
      * the reader is positioned into that buffer and add it to our buffered list for tracking.
diff --git a/src/app/ClusterStateCache.h b/src/app/ClusterStateCache.h
index 4b88b05..efc2cde 100644
--- a/src/app/ClusterStateCache.h
+++ b/src/app/ClusterStateCache.h
@@ -596,6 +596,11 @@
                                                      const Span<AttributePathParams> & aAttributePaths,
                                                      bool & aEncodedDataVersionList) override;
 
+    void OnUnsolicitedMessageFromPublisher(ReadClient * apReadClient) override
+    {
+        return mCallback.OnUnsolicitedMessageFromPublisher(apReadClient);
+    }
+
     // Commit the pending cluster data version, if there is one.
     void CommitPendingDataVersion();
 
diff --git a/src/app/InteractionModelEngine.cpp b/src/app/InteractionModelEngine.cpp
index 979831b..1f1c8aa 100644
--- a/src/app/InteractionModelEngine.cpp
+++ b/src/app/InteractionModelEngine.cpp
@@ -630,19 +630,37 @@
     VerifyOrReturnError(report.GetSubscriptionId(&subscriptionId) == CHIP_NO_ERROR, Status::InvalidAction);
     VerifyOrReturnError(report.ExitContainer() == CHIP_NO_ERROR, Status::InvalidAction);
 
+    ReadClient * foundSubscription = nullptr;
     for (auto * readClient = mpActiveReadClientList; readClient != nullptr; readClient = readClient->GetNextClient())
     {
+        auto peer = apExchangeContext->GetSessionHandle()->GetPeer();
+        if (readClient->GetFabricIndex() != peer.GetFabricIndex() || readClient->GetPeerNodeId() != peer.GetNodeId())
+        {
+            continue;
+        }
+
+        // Notify Subscriptions about incoming communication from node
+        readClient->OnUnsolicitedMessageFromPublisher();
+
         if (!readClient->IsSubscriptionActive())
         {
             continue;
         }
-        auto peer = apExchangeContext->GetSessionHandle()->GetPeer();
-        if (readClient->GetFabricIndex() != peer.GetFabricIndex() || readClient->GetPeerNodeId() != peer.GetNodeId() ||
-            !readClient->IsMatchingSubscriptionId(subscriptionId))
+
+        if (!readClient->IsMatchingSubscriptionId(subscriptionId))
         {
             continue;
         }
-        readClient->OnUnsolicitedReportData(apExchangeContext, std::move(aPayload));
+
+        if (!foundSubscription)
+        {
+            foundSubscription = readClient;
+        }
+    }
+
+    if (foundSubscription)
+    {
+        foundSubscription->OnUnsolicitedReportData(apExchangeContext, std::move(aPayload));
         return Status::Success;
     }
 
diff --git a/src/app/ReadClient.cpp b/src/app/ReadClient.cpp
index db9f6ae..971df45 100644
--- a/src/app/ReadClient.cpp
+++ b/src/app/ReadClient.cpp
@@ -64,6 +64,7 @@
     mMinIntervalFloorSeconds      = 0;
     mMaxInterval                  = 0;
     mSubscriptionId               = 0;
+    mIsResubscriptionScheduled    = false;
     MoveToState(ClientState::Idle);
 }
 
@@ -155,6 +156,7 @@
     ReturnErrorOnFailure(
         InteractionModelEngine::GetInstance()->GetExchangeManager()->GetSessionManager()->SystemLayer()->StartTimer(
             System::Clock::Milliseconds32(aTimeTillNextResubscriptionMs), OnResubscribeTimerCallback, this));
+    mIsResubscriptionScheduled = true;
 
     return CHIP_NO_ERROR;
 }
@@ -841,6 +843,7 @@
 {
     InteractionModelEngine::GetInstance()->GetExchangeManager()->GetSessionManager()->SystemLayer()->CancelTimer(
         OnResubscribeTimerCallback, this);
+    mIsResubscriptionScheduled = false;
 }
 
 void ReadClient::OnLivenessTimeoutCallback(System::Layer * apSystemLayer, void * apAppState)
@@ -1095,6 +1098,8 @@
     ReadClient * const _this = static_cast<ReadClient *>(apAppState);
     VerifyOrDie(_this != nullptr);
 
+    _this->mIsResubscriptionScheduled = false;
+
     CHIP_ERROR err;
 
     ChipLogProgress(DataManagement, "OnResubscribeTimerCallback: ForceCASE = %d", _this->mForceCaseOnNextResub);
diff --git a/src/app/ReadClient.h b/src/app/ReadClient.h
index 59309da..4ea66c8 100644
--- a/src/app/ReadClient.h
+++ b/src/app/ReadClient.h
@@ -234,6 +234,21 @@
             aEventNumber.ClearValue();
             return CHIP_NO_ERROR;
         }
+
+        /**
+         * OnUnsolicitedMessageFromPublisher will be called for a subscription
+         * ReadClient when any incoming message is received from a matching
+         * node on the fabric.
+         *
+         * This callback will be called:
+         *   - When receiving any unsolicited communication from the node
+         *   - Even for disconnected subscriptions.
+         *
+         * Callee MUST not synchronously destroy ReadClients in this callback.
+         *
+         * @param[in] apReadClient the ReadClient for the subscription.
+         */
+        virtual void OnUnsolicitedMessageFromPublisher(ReadClient * apReadClient) {}
     };
 
     enum class InteractionType : uint8_t
@@ -288,6 +303,20 @@
 
     void OnUnsolicitedReportData(Messaging::ExchangeContext * apExchangeContext, System::PacketBufferHandle && aPayload);
 
+    void OnUnsolicitedMessageFromPublisher()
+    {
+        // accelerate resubscription if scheduled
+        if (mIsResubscriptionScheduled)
+        {
+            ChipLogDetail(DataManagement, "%s ReadClient[%p] resubscribe on unsolicited message", __func__, this);
+            CancelResubscribeTimer();
+            OnResubscribeTimerCallback(nullptr, this);
+        }
+
+        // Then notify callbacks
+        mpCallback.OnUnsolicitedMessageFromPublisher(this);
+    }
+
     auto GetSubscriptionId() const
     {
         using returnType = Optional<decltype(mSubscriptionId)>;
@@ -501,7 +530,8 @@
     InteractionType mInteractionType = InteractionType::Read;
     Timestamp mEventTimestamp;
 
-    bool mForceCaseOnNextResub = true;
+    bool mForceCaseOnNextResub      = true;
+    bool mIsResubscriptionScheduled = false;
 
     chip::Callback::Callback<OnDeviceConnected> mOnConnectedCallback;
     chip::Callback::Callback<OnDeviceConnectionFailure> mOnConnectionFailureCallback;
diff --git a/src/darwin/Framework/CHIP/MTRBaseSubscriptionCallback.h b/src/darwin/Framework/CHIP/MTRBaseSubscriptionCallback.h
index 69db806..4716bbc 100644
--- a/src/darwin/Framework/CHIP/MTRBaseSubscriptionCallback.h
+++ b/src/darwin/Framework/CHIP/MTRBaseSubscriptionCallback.h
@@ -55,12 +55,14 @@
 typedef void (^ErrorCallback)(NSError * error);
 typedef void (^SubscriptionEstablishedHandler)(void);
 typedef void (^OnDoneHandler)(void);
+typedef void (^UnsolicitedMessageFromPublisherHandler)(void);
 
 class MTRBaseSubscriptionCallback : public chip::app::ClusterStateCache::Callback {
 public:
     MTRBaseSubscriptionCallback(DataReportCallback attributeReportCallback, DataReportCallback eventReportCallback,
         ErrorCallback errorCallback, MTRDeviceResubscriptionScheduledHandler _Nullable resubscriptionCallback,
-        SubscriptionEstablishedHandler _Nullable subscriptionEstablishedHandler, OnDoneHandler _Nullable onDoneHandler)
+        SubscriptionEstablishedHandler _Nullable subscriptionEstablishedHandler, OnDoneHandler _Nullable onDoneHandler,
+        UnsolicitedMessageFromPublisherHandler _Nullable unsolicitedMessageFromPublisherHandler = NULL)
         : mAttributeReportCallback(attributeReportCallback)
         , mEventReportCallback(eventReportCallback)
         , mErrorCallback(errorCallback)
@@ -68,6 +70,7 @@
         , mSubscriptionEstablishedHandler(subscriptionEstablishedHandler)
         , mBufferedReadAdapter(*this)
         , mOnDoneHandler(onDoneHandler)
+        , mUnsolicitedMessageFromPublisherHandler(unsolicitedMessageFromPublisherHandler)
     {
     }
 
@@ -117,6 +120,8 @@
 
     CHIP_ERROR OnResubscriptionNeeded(chip::app::ReadClient * apReadClient, CHIP_ERROR aTerminationCause) override;
 
+    void OnUnsolicitedMessageFromPublisher(chip::app::ReadClient * apReadClient) override;
+
     void ReportData();
 
 protected:
@@ -131,6 +136,7 @@
     ErrorCallback _Nullable mErrorCallback = nil;
     MTRDeviceResubscriptionScheduledHandler _Nullable mResubscriptionCallback = nil;
     SubscriptionEstablishedHandler _Nullable mSubscriptionEstablishedHandler = nil;
+    UnsolicitedMessageFromPublisherHandler _Nullable mUnsolicitedMessageFromPublisherHandler = nil;
     chip::app::BufferedReadCallback mBufferedReadAdapter;
 
     // Our lifetime management is a little complicated.  On errors that don't
diff --git a/src/darwin/Framework/CHIP/MTRBaseSubscriptionCallback.mm b/src/darwin/Framework/CHIP/MTRBaseSubscriptionCallback.mm
index abbd8ed..7e07720 100644
--- a/src/darwin/Framework/CHIP/MTRBaseSubscriptionCallback.mm
+++ b/src/darwin/Framework/CHIP/MTRBaseSubscriptionCallback.mm
@@ -16,6 +16,7 @@
 
 #import "MTRBaseSubscriptionCallback.h"
 #import "MTRError_Internal.h"
+#import "MTRLogging_Internal.h"
 
 #include <platform/PlatformManager.h>
 
@@ -111,6 +112,14 @@
     return CHIP_NO_ERROR;
 }
 
+void MTRBaseSubscriptionCallback::OnUnsolicitedMessageFromPublisher(ReadClient *)
+{
+    if (mUnsolicitedMessageFromPublisherHandler) {
+        auto unsolicitedMessageFromPublisherHandler = mUnsolicitedMessageFromPublisherHandler;
+        unsolicitedMessageFromPublisherHandler();
+    }
+}
+
 void MTRBaseSubscriptionCallback::ReportError(CHIP_ERROR aError, bool aCancelSubscription)
 {
     auto * err = [MTRError errorForCHIPErrorCode:aError];
diff --git a/src/darwin/Framework/CHIP/MTRDevice.h b/src/darwin/Framework/CHIP/MTRDevice.h
index e76ec45..15df1e9 100644
--- a/src/darwin/Framework/CHIP/MTRDevice.h
+++ b/src/darwin/Framework/CHIP/MTRDevice.h
@@ -193,6 +193,14 @@
  */
 - (void)device:(MTRDevice *)device receivedEventReport:(NSArray<NSDictionary<NSString *, id> *> *)eventReport;
 
+@optional
+/**
+ * deviceStartedCommunicating:
+ *
+ * Notifies delegate the device is currently communicating
+ */
+- (void)didReceiveCommunicationFromDevice:(MTRDevice *)device;
+
 @end
 
 @interface MTRDevice (Deprecated)
diff --git a/src/darwin/Framework/CHIP/MTRDevice.mm b/src/darwin/Framework/CHIP/MTRDevice.mm
index 389a4ce..f4455ce 100644
--- a/src/darwin/Framework/CHIP/MTRDevice.mm
+++ b/src/darwin/Framework/CHIP/MTRDevice.mm
@@ -113,9 +113,10 @@
 public:
     SubscriptionCallback(DataReportCallback attributeReportCallback, DataReportCallback eventReportCallback,
         ErrorCallback errorCallback, MTRDeviceResubscriptionScheduledHandler resubscriptionCallback,
-        SubscriptionEstablishedHandler subscriptionEstablishedHandler, OnDoneHandler onDoneHandler)
+        SubscriptionEstablishedHandler subscriptionEstablishedHandler, OnDoneHandler onDoneHandler,
+        UnsolicitedMessageFromPublisherHandler unsolicitedMessageFromPublisherHandler)
         : MTRBaseSubscriptionCallback(attributeReportCallback, eventReportCallback, errorCallback, resubscriptionCallback,
-            subscriptionEstablishedHandler, onDoneHandler)
+            subscriptionEstablishedHandler, onDoneHandler, unsolicitedMessageFromPublisherHandler)
     {
     }
 
@@ -211,6 +212,19 @@
     os_unfair_lock_unlock(&self->_lock);
 }
 
+// assume lock is held
+- (void)_changeState:(MTRDeviceState)state
+{
+    MTRDeviceState lastState = _state;
+    _state = state;
+    id<MTRDeviceDelegate> delegate = _weakDelegate.strongObject;
+    if (delegate && (lastState != state)) {
+        dispatch_async(_delegateQueue, ^{
+            [delegate device:self stateChanged:state];
+        });
+    }
+}
+
 - (void)_handleSubscriptionEstablished
 {
     os_unfair_lock_lock(&self->_lock);
@@ -218,13 +232,7 @@
     // reset subscription attempt wait time when subscription succeeds
     _lastSubscriptionAttemptWait = 0;
 
-    _state = MTRDeviceStateReachable;
-    id<MTRDeviceDelegate> delegate = _weakDelegate.strongObject;
-    if (delegate) {
-        dispatch_async(_delegateQueue, ^{
-            [delegate device:self stateChanged:MTRDeviceStateReachable];
-        });
-    }
+    [self _changeState:MTRDeviceStateReachable];
 
     os_unfair_lock_unlock(&self->_lock);
 }
@@ -236,12 +244,7 @@
     _subscriptionActive = NO;
     _unreportedEvents = nil;
 
-    id<MTRDeviceDelegate> delegate = _weakDelegate.strongObject;
-    if (delegate) {
-        dispatch_async(_delegateQueue, ^{
-            [delegate device:self stateChanged:MTRDeviceStateUnreachable];
-        });
-    }
+    [self _changeState:MTRDeviceStateUnreachable];
 
     os_unfair_lock_unlock(&self->_lock);
 }
@@ -250,14 +253,7 @@
 {
     os_unfair_lock_lock(&self->_lock);
 
-    _state = MTRDeviceStateUnknown;
-
-    id<MTRDeviceDelegate> delegate = _weakDelegate.strongObject;
-    if (delegate) {
-        dispatch_async(_delegateQueue, ^{
-            [delegate device:self stateChanged:MTRDeviceStateUnknown];
-        });
-    }
+    [self _changeState:MTRDeviceStateUnknown];
 
     os_unfair_lock_unlock(&self->_lock);
 }
@@ -303,6 +299,26 @@
     os_unfair_lock_unlock(&self->_lock);
 }
 
+- (void)_handleUnsolicitedMessageFromPublisher
+{
+    os_unfair_lock_lock(&self->_lock);
+
+    [self _changeState:MTRDeviceStateReachable];
+
+    id<MTRDeviceDelegate> delegate = _weakDelegate.strongObject;
+    if (delegate && [delegate respondsToSelector:@selector(didReceiveCommunicationFromDevice:)]) {
+        dispatch_async(_delegateQueue, ^{
+            [delegate didReceiveCommunicationFromDevice:self];
+        });
+    }
+
+    // in case this is called dyring exponential back off of subscription
+    // reestablishment, this starts the attempt right away
+    [self _setupSubscription];
+
+    os_unfair_lock_unlock(&self->_lock);
+}
+
 // assume lock is held
 - (void)_reportAttributes:(NSArray<NSDictionary<NSString *, id> *> *)attributes
 {
@@ -444,6 +460,13 @@
                                               // OnDone
                                               [self _handleSubscriptionReset];
                                           });
+                                      },
+                                      ^(void) {
+                                          MTR_LOG_INFO("%@ got unsolicited message from publisher", self);
+                                          dispatch_async(self.queue, ^{
+                                              // OnUnsolicitedMessageFromPublisher
+                                              [self _handleUnsolicitedMessageFromPublisher];
+                                          });
                                       });
 
                                   // Set up a cluster state cache.  We really just want this for the