Fix liveness timer not firing for subscriptions (#21693)

diff --git a/src/app/ReadClient.cpp b/src/app/ReadClient.cpp
index 19e1ced..7ba7e9b 100644
--- a/src/app/ReadClient.cpp
+++ b/src/app/ReadClient.cpp
@@ -458,6 +458,16 @@
     Status status = Status::Success;
     mExchange.Grab(apExchangeContext);
 
+    //
+    // Let's update the session we're tracking in our SessionHolder to that associated with the message that was just received.
+    // This CAN be different from the one we were tracking before, since the server is permitted to send exchanges on any valid
+    // session to us, of which there could be multiple.
+    //
+    // Since receipt of a message is proof of a working session on the peer, it's always best to update to that if possible
+    // to maximize our chances of success later.
+    //
+    mReadPrepareParams.mSessionHolder.Grab(mExchange->GetSessionHandle());
+
     CHIP_ERROR err = ProcessReportData(std::move(aPayload));
     if (err != CHIP_NO_ERROR)
     {
@@ -576,14 +586,14 @@
         {
             MoveToState(ClientState::AwaitingSubscribeResponse);
         }
-        else if (IsSubscriptionActive())
+        else if (IsSubscriptionActive() && err == CHIP_NO_ERROR)
         {
             //
             // Only refresh the liveness check timer if we've successfully established
             // a subscription and have a valid value for mMaxInterval which the function
             // relies on.
             //
-            RefreshLivenessCheckTimer();
+            err = RefreshLivenessCheckTimer();
         }
     }
 
@@ -753,7 +763,11 @@
 void ReadClient::OverrideLivenessTimeout(System::Clock::Timeout aLivenessTimeout)
 {
     mLivenessTimeoutOverride = aLivenessTimeout;
-    RefreshLivenessCheckTimer();
+    auto err                 = RefreshLivenessCheckTimer();
+    if (err != CHIP_NO_ERROR)
+    {
+        Close(err);
+    }
 }
 
 CHIP_ERROR ReadClient::RefreshLivenessCheckTimer()
@@ -784,11 +798,6 @@
     err = InteractionModelEngine::GetInstance()->GetExchangeManager()->GetSessionManager()->SystemLayer()->StartTimer(
         timeout, OnLivenessTimeoutCallback, this);
 
-    if (err != CHIP_NO_ERROR)
-    {
-        Close(err);
-    }
-
     return err;
 }
 
@@ -854,7 +863,7 @@
 
     mNumRetries = 0;
 
-    RefreshLivenessCheckTimer();
+    ReturnErrorOnFailure(RefreshLivenessCheckTimer());
 
     return CHIP_NO_ERROR;
 }
@@ -874,6 +883,7 @@
 {
     VerifyOrReturnError(aReadPrepareParams.mMinIntervalFloorSeconds <= aReadPrepareParams.mMaxIntervalCeilingSeconds,
                         CHIP_ERROR_INVALID_ARGUMENT);
+
     return SendSubscribeRequestImpl(aReadPrepareParams);
 }
 
@@ -881,6 +891,11 @@
 {
     VerifyOrReturnError(ClientState::Idle == mState, CHIP_ERROR_INCORRECT_STATE);
 
+    if (&aReadPrepareParams != &mReadPrepareParams)
+    {
+        mReadPrepareParams.mSessionHolder = aReadPrepareParams.mSessionHolder;
+    }
+
     mMinIntervalFloorSeconds = aReadPrepareParams.mMinIntervalFloorSeconds;
 
     // Todo: Remove the below, Update span in ReadPrepareParams
diff --git a/src/app/ReadClient.h b/src/app/ReadClient.h
index 2b161a7..0f5a72c 100644
--- a/src/app/ReadClient.h
+++ b/src/app/ReadClient.h
@@ -516,6 +516,12 @@
 
     ReadClient * mpNext                 = nullptr;
     InteractionModelEngine * mpImEngine = nullptr;
+
+    //
+    // This stores the params associated with the interaction in a specific set of cases:
+    //      1. Stores all parameters when used with subscriptions initiated using SendAutoResubscribeRequest.
+    //      2. Stores just the SessionHolder when used with any subscriptions.
+    //
     ReadPrepareParams mReadPrepareParams;
     uint32_t mNumRetries = 0;
 
diff --git a/src/app/tests/TestReadInteraction.cpp b/src/app/tests/TestReadInteraction.cpp
index 660cd8e..1123f5c 100644
--- a/src/app/tests/TestReadInteraction.cpp
+++ b/src/app/tests/TestReadInteraction.cpp
@@ -1752,7 +1752,7 @@
     readPrepareParams.mAttributePathParamsListSize = 2;
 
     readPrepareParams.mMinIntervalFloorSeconds   = 0;
-    readPrepareParams.mMaxIntervalCeilingSeconds = 0;
+    readPrepareParams.mMaxIntervalCeilingSeconds = 1;
     printf("\nSend subscribe request message to Node: %" PRIu64 "\n", chip::kTestDeviceNodeId);
 
     {
@@ -1857,7 +1857,7 @@
     readPrepareParams.mAttributePathParamsListSize = 1;
 
     readPrepareParams.mMinIntervalFloorSeconds   = 0;
-    readPrepareParams.mMaxIntervalCeilingSeconds = 0;
+    readPrepareParams.mMaxIntervalCeilingSeconds = 1;
     printf("\nSend subscribe request message to Node: %" PRIu64 "\n", chip::kTestDeviceNodeId);
 
     {
@@ -1933,7 +1933,7 @@
     readPrepareParams.mAttributePathParamsListSize = 1;
 
     readPrepareParams.mMinIntervalFloorSeconds   = 0;
-    readPrepareParams.mMaxIntervalCeilingSeconds = 0;
+    readPrepareParams.mMaxIntervalCeilingSeconds = 1;
     printf("\nSend subscribe request message to Node: %" PRIu64 "\n", chip::kTestDeviceNodeId);
 
     {
@@ -2059,7 +2059,7 @@
 
     readPrepareParams.mSessionHolder.Grab(ctx.GetSessionBobToAlice());
     readPrepareParams.mMinIntervalFloorSeconds   = 0;
-    readPrepareParams.mMaxIntervalCeilingSeconds = 0;
+    readPrepareParams.mMaxIntervalCeilingSeconds = 1;
     printf("\nSend subscribe request message to Node: %" PRIu64 "\n", chip::kTestDeviceNodeId);
 
     {
diff --git a/src/controller/tests/data_model/TestRead.cpp b/src/controller/tests/data_model/TestRead.cpp
index 0dda7e3..cc314e6 100644
--- a/src/controller/tests/data_model/TestRead.cpp
+++ b/src/controller/tests/data_model/TestRead.cpp
@@ -246,6 +246,7 @@
     static void TestReadAttributeError(nlTestSuite * apSuite, void * apContext);
     static void TestReadAttributeTimeout(nlTestSuite * apSuite, void * apContext);
     static void TestSubscribeAttributeTimeout(nlTestSuite * apSuite, void * apContext);
+    static void TestResubscribeAttributeTimeout(nlTestSuite * apSuite, void * apContext);
     static void TestReadEventResponse(nlTestSuite * apSuite, void * apContext);
     static void TestReadFabricScopedWithoutFabricFilter(nlTestSuite * apSuite, void * apContext);
     static void TestReadFabricScopedWithFabricFilter(nlTestSuite * apSuite, void * apContext);
@@ -1488,20 +1489,12 @@
         mLastError = aError;
     }
 
-    void OnSubscriptionEstablished(SubscriptionId aSubscriptionId) override
-    {
-        mOnSubscriptionEstablishedCount++;
-
-        //
-        // Set the liveness timeout to a super small number that isn't 0 to
-        // force the liveness timeout to fire.
-        //
-        mpReadClient->OverrideLivenessTimeout(System::Clock::Milliseconds32(10));
-    }
+    void OnSubscriptionEstablished(SubscriptionId aSubscriptionId) override { mOnSubscriptionEstablishedCount++; }
 
     CHIP_ERROR OnResubscriptionNeeded(app::ReadClient * apReadClient, CHIP_ERROR aTerminationCause) override
     {
         mOnResubscriptionsAttempted++;
+        mLastError = aTerminationCause;
         return apReadClient->ScheduleResubscription(apReadClient->ComputeTimeTillNextSubscription(), NullOptional, false);
     }
 
@@ -1532,11 +1525,13 @@
 // TODO: This does not validate the CASE establishment pathways since we're limited by the PASE-centric TestContext.
 //
 //
-void TestReadInteraction::TestSubscribeAttributeTimeout(nlTestSuite * apSuite, void * apContext)
+void TestReadInteraction::TestResubscribeAttributeTimeout(nlTestSuite * apSuite, void * apContext)
 {
     TestContext & ctx  = *static_cast<TestContext *>(apContext);
     auto sessionHandle = ctx.GetSessionBobToAlice();
 
+    ctx.SetMRPMode(Test::MessagingContext::MRPMode::kResponsive);
+
     {
         TestResubscriptionCallback callback;
         app::ReadClient readClient(app::InteractionModelEngine::GetInstance(), &ctx.GetExchangeManager(), callback,
@@ -1556,27 +1551,115 @@
 
         readPrepareParams.mMaxIntervalCeilingSeconds = 1;
 
-        readClient.SendAutoResubscribeRequest(std::move(readPrepareParams));
+        auto err = readClient.SendAutoResubscribeRequest(std::move(readPrepareParams));
+        NL_TEST_ASSERT(apSuite, err == CHIP_NO_ERROR);
 
         //
-        // Drive servicing IO till we have established a subscription at least 2 times.
+        // Drive servicing IO till we have established a subscription.
         //
-        ctx.GetIOContext().DriveIOUntil(System::Clock::Seconds16(2),
-                                        [&]() { return callback.mOnSubscriptionEstablishedCount > 1; });
-
-        NL_TEST_ASSERT(apSuite, callback.mOnDone == 0);
+        ctx.GetIOContext().DriveIOUntil(System::Clock::Milliseconds32(2000),
+                                        [&]() { return callback.mOnSubscriptionEstablishedCount >= 1; });
+        NL_TEST_ASSERT(apSuite, callback.mOnSubscriptionEstablishedCount == 1);
+        NL_TEST_ASSERT(apSuite, callback.mOnError == 0);
+        NL_TEST_ASSERT(apSuite, callback.mOnResubscriptionsAttempted == 0);
 
         //
-        // With re-sub enabled, we shouldn't encounter any errors.
+        // Disable packet transmission, and drive IO till we have reported a re-subscription attempt.
+        //
+        // 1.5s should cover the liveness timeout in the client of 1s max interval + 50ms ACK timeout.
+        //
+        ctx.GetLoopback().mNumMessagesToDrop = Test::LoopbackTransport::kUnlimitedMessageCount;
+        ctx.GetIOContext().DriveIOUntil(System::Clock::Milliseconds32(1500),
+                                        [&]() { return callback.mOnResubscriptionsAttempted > 0; });
+
+        NL_TEST_ASSERT(apSuite, callback.mOnResubscriptionsAttempted == 1);
+        NL_TEST_ASSERT(apSuite, callback.mLastError == CHIP_ERROR_TIMEOUT);
+
+        ctx.GetLoopback().mNumMessagesToDrop = 0;
+        callback.ClearCounters();
+
+        //
+        // Drive servicing IO till we have established a subscription.
+        //
+        ctx.GetIOContext().DriveIOUntil(System::Clock::Milliseconds32(2000),
+                                        [&]() { return callback.mOnSubscriptionEstablishedCount == 1; });
+        NL_TEST_ASSERT(apSuite, callback.mOnSubscriptionEstablishedCount == 1);
+
+        //
+        // With re-sub enabled, we shouldn't have encountered any errors
         //
         NL_TEST_ASSERT(apSuite, callback.mOnError == 0);
+        NL_TEST_ASSERT(apSuite, callback.mOnDone == 0);
+    }
+
+    ctx.SetMRPMode(Test::MessagingContext::MRPMode::kDefault);
+
+    app::InteractionModelEngine::GetInstance()->ShutdownActiveReads();
+    NL_TEST_ASSERT(apSuite, ctx.GetExchangeManager().GetNumActiveExchanges() == 0);
+}
+
+//
+// This validates a vanilla subscription with re-susbcription disabled timing out correctly on the client
+// side and triggering the OnError callback with the right error code.
+//
+void TestReadInteraction::TestSubscribeAttributeTimeout(nlTestSuite * apSuite, void * apContext)
+{
+    TestContext & ctx  = *static_cast<TestContext *>(apContext);
+    auto sessionHandle = ctx.GetSessionBobToAlice();
+
+    ctx.SetMRPMode(Test::MessagingContext::MRPMode::kResponsive);
+
+    {
+        TestResubscriptionCallback callback;
+        app::ReadClient readClient(app::InteractionModelEngine::GetInstance(), &ctx.GetExchangeManager(), callback,
+                                   app::ReadClient::InteractionType::Subscribe);
+
+        callback.SetReadClient(&readClient);
+
+        app::ReadPrepareParams readPrepareParams(ctx.GetSessionBobToAlice());
+
+        app::AttributePathParams attributePathParams[1];
+        readPrepareParams.mpAttributePathParamsList    = attributePathParams;
+        readPrepareParams.mAttributePathParamsListSize = ArraySize(attributePathParams);
+        attributePathParams[0].mClusterId              = app::Clusters::TestCluster::Id;
+        attributePathParams[0].mAttributeId            = app::Clusters::TestCluster::Attributes::Boolean::Id;
 
         //
-        // We should have attempted just one re-subscription.
+        // Request a max interval that's very small to reduce time to discovering a liveness failure.
         //
-        NL_TEST_ASSERT(apSuite, callback.mOnResubscriptionsAttempted == 1);
+        readPrepareParams.mMaxIntervalCeilingSeconds = 1;
+
+        auto err = readClient.SendRequest(readPrepareParams);
+        NL_TEST_ASSERT(apSuite, err == CHIP_NO_ERROR);
+
+        //
+        // Drive servicing IO till we have established a subscription.
+        //
+        ctx.GetIOContext().DriveIOUntil(System::Clock::Milliseconds32(2000),
+                                        [&]() { return callback.mOnSubscriptionEstablishedCount >= 1; });
+        NL_TEST_ASSERT(apSuite, callback.mOnSubscriptionEstablishedCount == 1);
+
+        //
+        // Request we drop all further messages.
+        //
+        ctx.GetLoopback().mNumMessagesToDrop = Test::LoopbackTransport::kUnlimitedMessageCount;
+
+        //
+        // Drive IO until we get an error on the subscription, which should be caused
+        // by the liveness timer firing within ~1s of the establishment of the subscription.
+        //
+        // 1.5s should cover the liveness timeout in the client of 1s max interval + 50ms ACK timeout.
+        //
+        ctx.GetIOContext().DriveIOUntil(System::Clock::Milliseconds32(1500), [&]() { return callback.mOnError >= 1; });
+
+        NL_TEST_ASSERT(apSuite, callback.mOnError == 1);
+        NL_TEST_ASSERT(apSuite, callback.mLastError == CHIP_ERROR_TIMEOUT);
+        NL_TEST_ASSERT(apSuite, callback.mOnDone == 1);
+        NL_TEST_ASSERT(apSuite, callback.mOnResubscriptionsAttempted == 0);
     }
 
+    ctx.SetMRPMode(Test::MessagingContext::MRPMode::kDefault);
+
     app::InteractionModelEngine::GetInstance()->ShutdownActiveReads();
     NL_TEST_ASSERT(apSuite, ctx.GetExchangeManager().GetNumActiveExchanges() == 0);
 }
@@ -1645,6 +1728,7 @@
     NL_TEST_ASSERT(apSuite, gTestReadInteraction.mNumActiveSubscriptions == 0);
     NL_TEST_ASSERT(apSuite, ctx.GetExchangeManager().GetNumActiveExchanges() == 0);
 
+    ctx.SetMRPMode(Test::MessagingContext::MRPMode::kDefault);
     app::InteractionModelEngine::GetInstance()->UnregisterReadHandlerAppCallback();
 }
 
@@ -4334,6 +4418,7 @@
     NL_TEST_DEF("TestReadAttribute_ManyDataValues", TestReadInteraction::TestReadAttribute_ManyDataValues),
     NL_TEST_DEF("TestReadAttribute_ManyDataValuesWrongPath", TestReadInteraction::TestReadAttribute_ManyDataValuesWrongPath),
     NL_TEST_DEF("TestReadAttribute_ManyErrors", TestReadInteraction::TestReadAttribute_ManyErrors),
+    NL_TEST_DEF("TestResubscribeAttributeTimeout", TestReadInteraction::TestResubscribeAttributeTimeout),
     NL_TEST_DEF("TestSubscribeAttributeTimeout", TestReadInteraction::TestSubscribeAttributeTimeout),
     NL_TEST_SENTINEL()
 };
diff --git a/src/messaging/tests/MessagingContext.cpp b/src/messaging/tests/MessagingContext.cpp
index c31e66b..00fd532 100644
--- a/src/messaging/tests/MessagingContext.cpp
+++ b/src/messaging/tests/MessagingContext.cpp
@@ -16,6 +16,7 @@
  */
 
 #include "MessagingContext.h"
+#include "system/SystemClock.h"
 
 #include <credentials/tests/CHIPCert_unit_test_vectors.h>
 #include <lib/support/CodeUtils.h>
@@ -99,6 +100,28 @@
     existing.mTransport->SetSessionManager(&existing.GetSecureSessionManager());
 }
 
+void MessagingContext::SetMRPMode(MRPMode mode)
+{
+    if (mode == MRPMode::kDefault)
+    {
+        mSessionBobToAlice->AsSecureSession()->SetRemoteMRPConfig(GetDefaultMRPConfig());
+        mSessionAliceToBob->AsSecureSession()->SetRemoteMRPConfig(GetDefaultMRPConfig());
+        mSessionCharlieToDavid->AsSecureSession()->SetRemoteMRPConfig(GetDefaultMRPConfig());
+        mSessionDavidToCharlie->AsSecureSession()->SetRemoteMRPConfig(GetDefaultMRPConfig());
+    }
+    else
+    {
+        mSessionBobToAlice->AsSecureSession()->SetRemoteMRPConfig(
+            ReliableMessageProtocolConfig(System::Clock::Milliseconds32(10), System::Clock::Milliseconds32(10)));
+        mSessionAliceToBob->AsSecureSession()->SetRemoteMRPConfig(
+            ReliableMessageProtocolConfig(System::Clock::Milliseconds32(10), System::Clock::Milliseconds32(10)));
+        mSessionCharlieToDavid->AsSecureSession()->SetRemoteMRPConfig(
+            ReliableMessageProtocolConfig(System::Clock::Milliseconds32(10), System::Clock::Milliseconds32(10)));
+        mSessionDavidToCharlie->AsSecureSession()->SetRemoteMRPConfig(
+            ReliableMessageProtocolConfig(System::Clock::Milliseconds32(10), System::Clock::Milliseconds32(10)));
+    }
+}
+
 CHIP_ERROR MessagingContext::CreateSessionBobToAlice()
 {
     return mSessionManager.InjectPaseSessionWithTestKey(mSessionBobToAlice, kBobKeyId, GetAliceFabric()->GetNodeId(), kAliceKeyId,
diff --git a/src/messaging/tests/MessagingContext.h b/src/messaging/tests/MessagingContext.h
index 559a046..f4108ff 100644
--- a/src/messaging/tests/MessagingContext.h
+++ b/src/messaging/tests/MessagingContext.h
@@ -73,6 +73,17 @@
 class MessagingContext : public PlatformMemoryUser
 {
 public:
+    enum MRPMode
+    {
+        kDefault = 1, // This adopts the default MRP values for idle/active as per the spec.
+                      //      i.e IDLE = 4s, ACTIVE = 300ms
+
+        kResponsive = 2, // This adopts values that are better suited for loopback tests that
+                         // don't actually go over a network interface, and are tuned much lower
+                         // to permit more responsive tests.
+                         //      i.e IDLE = 10ms, ACTIVE = 10ms
+    };
+
     MessagingContext() :
         mInitialized(false), mAliceAddress(Transport::PeerAddress::UDP(GetAddress(), CHIP_PORT + 1)),
         mBobAddress(Transport::PeerAddress::UDP(GetAddress(), CHIP_PORT))
@@ -129,6 +140,8 @@
     void ExpireSessionAliceToBob();
     void ExpireSessionBobToFriends();
 
+    void SetMRPMode(MRPMode mode);
+
     SessionHandle GetSessionBobToAlice();
     SessionHandle GetSessionAliceToBob();
     SessionHandle GetSessionCharlieToDavid();
diff --git a/src/transport/raw/tests/NetworkTestHelpers.h b/src/transport/raw/tests/NetworkTestHelpers.h
index cde0746..de459cb 100644
--- a/src/transport/raw/tests/NetworkTestHelpers.h
+++ b/src/transport/raw/tests/NetworkTestHelpers.h
@@ -99,6 +99,8 @@
         }
     }
 
+    static constexpr uint32_t kUnlimitedMessageCount = std::numeric_limits<uint32_t>::max();
+
     CHIP_ERROR SendMessage(const Transport::PeerAddress & address, System::PacketBufferHandle && msgBuf) override
     {
         ReturnErrorOnFailure(mMessageSendError);
@@ -116,6 +118,7 @@
 
         if (dropMessage)
         {
+            ChipLogProgress(Test, "Dropping message...");
             mDroppedMessageCount++;
             if (mDelegate != nullptr)
             {