Fix liveness timer not firing for subscriptions (#21693)
diff --git a/src/app/ReadClient.cpp b/src/app/ReadClient.cpp
index 19e1ced..7ba7e9b 100644
--- a/src/app/ReadClient.cpp
+++ b/src/app/ReadClient.cpp
@@ -458,6 +458,16 @@
Status status = Status::Success;
mExchange.Grab(apExchangeContext);
+ //
+ // Let's update the session we're tracking in our SessionHolder to that associated with the message that was just received.
+ // This CAN be different from the one we were tracking before, since the server is permitted to send exchanges on any valid
+ // session to us, of which there could be multiple.
+ //
+ // Since receipt of a message is proof of a working session on the peer, it's always best to update to that if possible
+ // to maximize our chances of success later.
+ //
+ mReadPrepareParams.mSessionHolder.Grab(mExchange->GetSessionHandle());
+
CHIP_ERROR err = ProcessReportData(std::move(aPayload));
if (err != CHIP_NO_ERROR)
{
@@ -576,14 +586,14 @@
{
MoveToState(ClientState::AwaitingSubscribeResponse);
}
- else if (IsSubscriptionActive())
+ else if (IsSubscriptionActive() && err == CHIP_NO_ERROR)
{
//
// Only refresh the liveness check timer if we've successfully established
// a subscription and have a valid value for mMaxInterval which the function
// relies on.
//
- RefreshLivenessCheckTimer();
+ err = RefreshLivenessCheckTimer();
}
}
@@ -753,7 +763,11 @@
void ReadClient::OverrideLivenessTimeout(System::Clock::Timeout aLivenessTimeout)
{
mLivenessTimeoutOverride = aLivenessTimeout;
- RefreshLivenessCheckTimer();
+ auto err = RefreshLivenessCheckTimer();
+ if (err != CHIP_NO_ERROR)
+ {
+ Close(err);
+ }
}
CHIP_ERROR ReadClient::RefreshLivenessCheckTimer()
@@ -784,11 +798,6 @@
err = InteractionModelEngine::GetInstance()->GetExchangeManager()->GetSessionManager()->SystemLayer()->StartTimer(
timeout, OnLivenessTimeoutCallback, this);
- if (err != CHIP_NO_ERROR)
- {
- Close(err);
- }
-
return err;
}
@@ -854,7 +863,7 @@
mNumRetries = 0;
- RefreshLivenessCheckTimer();
+ ReturnErrorOnFailure(RefreshLivenessCheckTimer());
return CHIP_NO_ERROR;
}
@@ -874,6 +883,7 @@
{
VerifyOrReturnError(aReadPrepareParams.mMinIntervalFloorSeconds <= aReadPrepareParams.mMaxIntervalCeilingSeconds,
CHIP_ERROR_INVALID_ARGUMENT);
+
return SendSubscribeRequestImpl(aReadPrepareParams);
}
@@ -881,6 +891,11 @@
{
VerifyOrReturnError(ClientState::Idle == mState, CHIP_ERROR_INCORRECT_STATE);
+ if (&aReadPrepareParams != &mReadPrepareParams)
+ {
+ mReadPrepareParams.mSessionHolder = aReadPrepareParams.mSessionHolder;
+ }
+
mMinIntervalFloorSeconds = aReadPrepareParams.mMinIntervalFloorSeconds;
// Todo: Remove the below, Update span in ReadPrepareParams
diff --git a/src/app/ReadClient.h b/src/app/ReadClient.h
index 2b161a7..0f5a72c 100644
--- a/src/app/ReadClient.h
+++ b/src/app/ReadClient.h
@@ -516,6 +516,12 @@
ReadClient * mpNext = nullptr;
InteractionModelEngine * mpImEngine = nullptr;
+
+ //
+ // This stores the params associated with the interaction in a specific set of cases:
+ // 1. Stores all parameters when used with subscriptions initiated using SendAutoResubscribeRequest.
+ // 2. Stores just the SessionHolder when used with any subscriptions.
+ //
ReadPrepareParams mReadPrepareParams;
uint32_t mNumRetries = 0;
diff --git a/src/app/tests/TestReadInteraction.cpp b/src/app/tests/TestReadInteraction.cpp
index 660cd8e..1123f5c 100644
--- a/src/app/tests/TestReadInteraction.cpp
+++ b/src/app/tests/TestReadInteraction.cpp
@@ -1752,7 +1752,7 @@
readPrepareParams.mAttributePathParamsListSize = 2;
readPrepareParams.mMinIntervalFloorSeconds = 0;
- readPrepareParams.mMaxIntervalCeilingSeconds = 0;
+ readPrepareParams.mMaxIntervalCeilingSeconds = 1;
printf("\nSend subscribe request message to Node: %" PRIu64 "\n", chip::kTestDeviceNodeId);
{
@@ -1857,7 +1857,7 @@
readPrepareParams.mAttributePathParamsListSize = 1;
readPrepareParams.mMinIntervalFloorSeconds = 0;
- readPrepareParams.mMaxIntervalCeilingSeconds = 0;
+ readPrepareParams.mMaxIntervalCeilingSeconds = 1;
printf("\nSend subscribe request message to Node: %" PRIu64 "\n", chip::kTestDeviceNodeId);
{
@@ -1933,7 +1933,7 @@
readPrepareParams.mAttributePathParamsListSize = 1;
readPrepareParams.mMinIntervalFloorSeconds = 0;
- readPrepareParams.mMaxIntervalCeilingSeconds = 0;
+ readPrepareParams.mMaxIntervalCeilingSeconds = 1;
printf("\nSend subscribe request message to Node: %" PRIu64 "\n", chip::kTestDeviceNodeId);
{
@@ -2059,7 +2059,7 @@
readPrepareParams.mSessionHolder.Grab(ctx.GetSessionBobToAlice());
readPrepareParams.mMinIntervalFloorSeconds = 0;
- readPrepareParams.mMaxIntervalCeilingSeconds = 0;
+ readPrepareParams.mMaxIntervalCeilingSeconds = 1;
printf("\nSend subscribe request message to Node: %" PRIu64 "\n", chip::kTestDeviceNodeId);
{
diff --git a/src/controller/tests/data_model/TestRead.cpp b/src/controller/tests/data_model/TestRead.cpp
index 0dda7e3..cc314e6 100644
--- a/src/controller/tests/data_model/TestRead.cpp
+++ b/src/controller/tests/data_model/TestRead.cpp
@@ -246,6 +246,7 @@
static void TestReadAttributeError(nlTestSuite * apSuite, void * apContext);
static void TestReadAttributeTimeout(nlTestSuite * apSuite, void * apContext);
static void TestSubscribeAttributeTimeout(nlTestSuite * apSuite, void * apContext);
+ static void TestResubscribeAttributeTimeout(nlTestSuite * apSuite, void * apContext);
static void TestReadEventResponse(nlTestSuite * apSuite, void * apContext);
static void TestReadFabricScopedWithoutFabricFilter(nlTestSuite * apSuite, void * apContext);
static void TestReadFabricScopedWithFabricFilter(nlTestSuite * apSuite, void * apContext);
@@ -1488,20 +1489,12 @@
mLastError = aError;
}
- void OnSubscriptionEstablished(SubscriptionId aSubscriptionId) override
- {
- mOnSubscriptionEstablishedCount++;
-
- //
- // Set the liveness timeout to a super small number that isn't 0 to
- // force the liveness timeout to fire.
- //
- mpReadClient->OverrideLivenessTimeout(System::Clock::Milliseconds32(10));
- }
+ void OnSubscriptionEstablished(SubscriptionId aSubscriptionId) override { mOnSubscriptionEstablishedCount++; }
CHIP_ERROR OnResubscriptionNeeded(app::ReadClient * apReadClient, CHIP_ERROR aTerminationCause) override
{
mOnResubscriptionsAttempted++;
+ mLastError = aTerminationCause;
return apReadClient->ScheduleResubscription(apReadClient->ComputeTimeTillNextSubscription(), NullOptional, false);
}
@@ -1532,11 +1525,13 @@
// TODO: This does not validate the CASE establishment pathways since we're limited by the PASE-centric TestContext.
//
//
-void TestReadInteraction::TestSubscribeAttributeTimeout(nlTestSuite * apSuite, void * apContext)
+void TestReadInteraction::TestResubscribeAttributeTimeout(nlTestSuite * apSuite, void * apContext)
{
TestContext & ctx = *static_cast<TestContext *>(apContext);
auto sessionHandle = ctx.GetSessionBobToAlice();
+ ctx.SetMRPMode(Test::MessagingContext::MRPMode::kResponsive);
+
{
TestResubscriptionCallback callback;
app::ReadClient readClient(app::InteractionModelEngine::GetInstance(), &ctx.GetExchangeManager(), callback,
@@ -1556,27 +1551,115 @@
readPrepareParams.mMaxIntervalCeilingSeconds = 1;
- readClient.SendAutoResubscribeRequest(std::move(readPrepareParams));
+ auto err = readClient.SendAutoResubscribeRequest(std::move(readPrepareParams));
+ NL_TEST_ASSERT(apSuite, err == CHIP_NO_ERROR);
//
- // Drive servicing IO till we have established a subscription at least 2 times.
+ // Drive servicing IO till we have established a subscription.
//
- ctx.GetIOContext().DriveIOUntil(System::Clock::Seconds16(2),
- [&]() { return callback.mOnSubscriptionEstablishedCount > 1; });
-
- NL_TEST_ASSERT(apSuite, callback.mOnDone == 0);
+ ctx.GetIOContext().DriveIOUntil(System::Clock::Milliseconds32(2000),
+ [&]() { return callback.mOnSubscriptionEstablishedCount >= 1; });
+ NL_TEST_ASSERT(apSuite, callback.mOnSubscriptionEstablishedCount == 1);
+ NL_TEST_ASSERT(apSuite, callback.mOnError == 0);
+ NL_TEST_ASSERT(apSuite, callback.mOnResubscriptionsAttempted == 0);
//
- // With re-sub enabled, we shouldn't encounter any errors.
+ // Disable packet transmission, and drive IO till we have reported a re-subscription attempt.
+ //
+ // 1.5s should cover the liveness timeout in the client of 1s max interval + 50ms ACK timeout.
+ //
+ ctx.GetLoopback().mNumMessagesToDrop = Test::LoopbackTransport::kUnlimitedMessageCount;
+ ctx.GetIOContext().DriveIOUntil(System::Clock::Milliseconds32(1500),
+ [&]() { return callback.mOnResubscriptionsAttempted > 0; });
+
+ NL_TEST_ASSERT(apSuite, callback.mOnResubscriptionsAttempted == 1);
+ NL_TEST_ASSERT(apSuite, callback.mLastError == CHIP_ERROR_TIMEOUT);
+
+ ctx.GetLoopback().mNumMessagesToDrop = 0;
+ callback.ClearCounters();
+
+ //
+ // Drive servicing IO till we have established a subscription.
+ //
+ ctx.GetIOContext().DriveIOUntil(System::Clock::Milliseconds32(2000),
+ [&]() { return callback.mOnSubscriptionEstablishedCount == 1; });
+ NL_TEST_ASSERT(apSuite, callback.mOnSubscriptionEstablishedCount == 1);
+
+ //
+ // With re-sub enabled, we shouldn't have encountered any errors
//
NL_TEST_ASSERT(apSuite, callback.mOnError == 0);
+ NL_TEST_ASSERT(apSuite, callback.mOnDone == 0);
+ }
+
+ ctx.SetMRPMode(Test::MessagingContext::MRPMode::kDefault);
+
+ app::InteractionModelEngine::GetInstance()->ShutdownActiveReads();
+ NL_TEST_ASSERT(apSuite, ctx.GetExchangeManager().GetNumActiveExchanges() == 0);
+}
+
+//
+// This validates a vanilla subscription with re-susbcription disabled timing out correctly on the client
+// side and triggering the OnError callback with the right error code.
+//
+void TestReadInteraction::TestSubscribeAttributeTimeout(nlTestSuite * apSuite, void * apContext)
+{
+ TestContext & ctx = *static_cast<TestContext *>(apContext);
+ auto sessionHandle = ctx.GetSessionBobToAlice();
+
+ ctx.SetMRPMode(Test::MessagingContext::MRPMode::kResponsive);
+
+ {
+ TestResubscriptionCallback callback;
+ app::ReadClient readClient(app::InteractionModelEngine::GetInstance(), &ctx.GetExchangeManager(), callback,
+ app::ReadClient::InteractionType::Subscribe);
+
+ callback.SetReadClient(&readClient);
+
+ app::ReadPrepareParams readPrepareParams(ctx.GetSessionBobToAlice());
+
+ app::AttributePathParams attributePathParams[1];
+ readPrepareParams.mpAttributePathParamsList = attributePathParams;
+ readPrepareParams.mAttributePathParamsListSize = ArraySize(attributePathParams);
+ attributePathParams[0].mClusterId = app::Clusters::TestCluster::Id;
+ attributePathParams[0].mAttributeId = app::Clusters::TestCluster::Attributes::Boolean::Id;
//
- // We should have attempted just one re-subscription.
+ // Request a max interval that's very small to reduce time to discovering a liveness failure.
//
- NL_TEST_ASSERT(apSuite, callback.mOnResubscriptionsAttempted == 1);
+ readPrepareParams.mMaxIntervalCeilingSeconds = 1;
+
+ auto err = readClient.SendRequest(readPrepareParams);
+ NL_TEST_ASSERT(apSuite, err == CHIP_NO_ERROR);
+
+ //
+ // Drive servicing IO till we have established a subscription.
+ //
+ ctx.GetIOContext().DriveIOUntil(System::Clock::Milliseconds32(2000),
+ [&]() { return callback.mOnSubscriptionEstablishedCount >= 1; });
+ NL_TEST_ASSERT(apSuite, callback.mOnSubscriptionEstablishedCount == 1);
+
+ //
+ // Request we drop all further messages.
+ //
+ ctx.GetLoopback().mNumMessagesToDrop = Test::LoopbackTransport::kUnlimitedMessageCount;
+
+ //
+ // Drive IO until we get an error on the subscription, which should be caused
+ // by the liveness timer firing within ~1s of the establishment of the subscription.
+ //
+ // 1.5s should cover the liveness timeout in the client of 1s max interval + 50ms ACK timeout.
+ //
+ ctx.GetIOContext().DriveIOUntil(System::Clock::Milliseconds32(1500), [&]() { return callback.mOnError >= 1; });
+
+ NL_TEST_ASSERT(apSuite, callback.mOnError == 1);
+ NL_TEST_ASSERT(apSuite, callback.mLastError == CHIP_ERROR_TIMEOUT);
+ NL_TEST_ASSERT(apSuite, callback.mOnDone == 1);
+ NL_TEST_ASSERT(apSuite, callback.mOnResubscriptionsAttempted == 0);
}
+ ctx.SetMRPMode(Test::MessagingContext::MRPMode::kDefault);
+
app::InteractionModelEngine::GetInstance()->ShutdownActiveReads();
NL_TEST_ASSERT(apSuite, ctx.GetExchangeManager().GetNumActiveExchanges() == 0);
}
@@ -1645,6 +1728,7 @@
NL_TEST_ASSERT(apSuite, gTestReadInteraction.mNumActiveSubscriptions == 0);
NL_TEST_ASSERT(apSuite, ctx.GetExchangeManager().GetNumActiveExchanges() == 0);
+ ctx.SetMRPMode(Test::MessagingContext::MRPMode::kDefault);
app::InteractionModelEngine::GetInstance()->UnregisterReadHandlerAppCallback();
}
@@ -4334,6 +4418,7 @@
NL_TEST_DEF("TestReadAttribute_ManyDataValues", TestReadInteraction::TestReadAttribute_ManyDataValues),
NL_TEST_DEF("TestReadAttribute_ManyDataValuesWrongPath", TestReadInteraction::TestReadAttribute_ManyDataValuesWrongPath),
NL_TEST_DEF("TestReadAttribute_ManyErrors", TestReadInteraction::TestReadAttribute_ManyErrors),
+ NL_TEST_DEF("TestResubscribeAttributeTimeout", TestReadInteraction::TestResubscribeAttributeTimeout),
NL_TEST_DEF("TestSubscribeAttributeTimeout", TestReadInteraction::TestSubscribeAttributeTimeout),
NL_TEST_SENTINEL()
};
diff --git a/src/messaging/tests/MessagingContext.cpp b/src/messaging/tests/MessagingContext.cpp
index c31e66b..00fd532 100644
--- a/src/messaging/tests/MessagingContext.cpp
+++ b/src/messaging/tests/MessagingContext.cpp
@@ -16,6 +16,7 @@
*/
#include "MessagingContext.h"
+#include "system/SystemClock.h"
#include <credentials/tests/CHIPCert_unit_test_vectors.h>
#include <lib/support/CodeUtils.h>
@@ -99,6 +100,28 @@
existing.mTransport->SetSessionManager(&existing.GetSecureSessionManager());
}
+void MessagingContext::SetMRPMode(MRPMode mode)
+{
+ if (mode == MRPMode::kDefault)
+ {
+ mSessionBobToAlice->AsSecureSession()->SetRemoteMRPConfig(GetDefaultMRPConfig());
+ mSessionAliceToBob->AsSecureSession()->SetRemoteMRPConfig(GetDefaultMRPConfig());
+ mSessionCharlieToDavid->AsSecureSession()->SetRemoteMRPConfig(GetDefaultMRPConfig());
+ mSessionDavidToCharlie->AsSecureSession()->SetRemoteMRPConfig(GetDefaultMRPConfig());
+ }
+ else
+ {
+ mSessionBobToAlice->AsSecureSession()->SetRemoteMRPConfig(
+ ReliableMessageProtocolConfig(System::Clock::Milliseconds32(10), System::Clock::Milliseconds32(10)));
+ mSessionAliceToBob->AsSecureSession()->SetRemoteMRPConfig(
+ ReliableMessageProtocolConfig(System::Clock::Milliseconds32(10), System::Clock::Milliseconds32(10)));
+ mSessionCharlieToDavid->AsSecureSession()->SetRemoteMRPConfig(
+ ReliableMessageProtocolConfig(System::Clock::Milliseconds32(10), System::Clock::Milliseconds32(10)));
+ mSessionDavidToCharlie->AsSecureSession()->SetRemoteMRPConfig(
+ ReliableMessageProtocolConfig(System::Clock::Milliseconds32(10), System::Clock::Milliseconds32(10)));
+ }
+}
+
CHIP_ERROR MessagingContext::CreateSessionBobToAlice()
{
return mSessionManager.InjectPaseSessionWithTestKey(mSessionBobToAlice, kBobKeyId, GetAliceFabric()->GetNodeId(), kAliceKeyId,
diff --git a/src/messaging/tests/MessagingContext.h b/src/messaging/tests/MessagingContext.h
index 559a046..f4108ff 100644
--- a/src/messaging/tests/MessagingContext.h
+++ b/src/messaging/tests/MessagingContext.h
@@ -73,6 +73,17 @@
class MessagingContext : public PlatformMemoryUser
{
public:
+ enum MRPMode
+ {
+ kDefault = 1, // This adopts the default MRP values for idle/active as per the spec.
+ // i.e IDLE = 4s, ACTIVE = 300ms
+
+ kResponsive = 2, // This adopts values that are better suited for loopback tests that
+ // don't actually go over a network interface, and are tuned much lower
+ // to permit more responsive tests.
+ // i.e IDLE = 10ms, ACTIVE = 10ms
+ };
+
MessagingContext() :
mInitialized(false), mAliceAddress(Transport::PeerAddress::UDP(GetAddress(), CHIP_PORT + 1)),
mBobAddress(Transport::PeerAddress::UDP(GetAddress(), CHIP_PORT))
@@ -129,6 +140,8 @@
void ExpireSessionAliceToBob();
void ExpireSessionBobToFriends();
+ void SetMRPMode(MRPMode mode);
+
SessionHandle GetSessionBobToAlice();
SessionHandle GetSessionAliceToBob();
SessionHandle GetSessionCharlieToDavid();
diff --git a/src/transport/raw/tests/NetworkTestHelpers.h b/src/transport/raw/tests/NetworkTestHelpers.h
index cde0746..de459cb 100644
--- a/src/transport/raw/tests/NetworkTestHelpers.h
+++ b/src/transport/raw/tests/NetworkTestHelpers.h
@@ -99,6 +99,8 @@
}
}
+ static constexpr uint32_t kUnlimitedMessageCount = std::numeric_limits<uint32_t>::max();
+
CHIP_ERROR SendMessage(const Transport::PeerAddress & address, System::PacketBufferHandle && msgBuf) override
{
ReturnErrorOnFailure(mMessageSendError);
@@ -116,6 +118,7 @@
if (dropMessage)
{
+ ChipLogProgress(Test, "Dropping message...");
mDroppedMessageCount++;
if (mDelegate != nullptr)
{