Fix KeepSubscription flag handling (#22805)
* Fix KeepSubscription flag handling
This fixes the KeepSubscription flag handling to be done right at the
onset of processing a SubscribeRequest message. This ensures that
matching existing subscriptions are evicted before we attempt to
continue further processing and allocation of ReadHandlers.
Testing:
In addition to the new test in TestRead, validated setting up a sub in
chip-tool and then cancelling it by sending another sub to an invalid
endpoint:
basic subscribe location 0 5 2 0
any subscribe-by-id 0xFFFFFFFF 0xFFFFFFFF 0 5 2 100
Also tested by sending an empty subscribe request with the REPL.
* Fixup printf specifier
diff --git a/src/app/InteractionModelEngine.cpp b/src/app/InteractionModelEngine.cpp
index 61af4c1..979831b 100644
--- a/src/app/InteractionModelEngine.cpp
+++ b/src/app/InteractionModelEngine.cpp
@@ -426,6 +426,29 @@
SubscribeRequestMessage::Parser subscribeRequestParser;
VerifyOrReturnError(subscribeRequestParser.Init(reader) == CHIP_NO_ERROR, Status::InvalidAction);
+
+ VerifyOrReturnError(subscribeRequestParser.GetKeepSubscriptions(&keepExistingSubscriptions) == CHIP_NO_ERROR,
+ Status::InvalidAction);
+ if (!keepExistingSubscriptions)
+ {
+ //
+ // Walk through all existing subscriptions and shut down those whose subscriber matches
+ // that which just came in.
+ //
+ mReadHandlers.ForEachActiveObject([this, apExchangeContext](ReadHandler * handler) {
+ if (handler->IsFromSubscriber(*apExchangeContext))
+ {
+ ChipLogProgress(InteractionModel,
+ "Deleting previous subscription from NodeId: " ChipLogFormatX64 ", FabricIndex: %u",
+ ChipLogValueX64(apExchangeContext->GetSessionHandle()->AsSecureSession()->GetPeerNodeId()),
+ apExchangeContext->GetSessionHandle()->GetFabricIndex());
+ mReadHandlers.ReleaseObject(handler);
+ }
+
+ return Loop::Continue;
+ });
+ }
+
{
size_t requestedAttributePathCount = 0;
size_t requestedEventPathCount = 0;
@@ -492,28 +515,6 @@
return Status::PathsExhausted;
}
}
-
- VerifyOrReturnError(subscribeRequestParser.GetKeepSubscriptions(&keepExistingSubscriptions) == CHIP_NO_ERROR,
- Status::InvalidAction);
- if (!keepExistingSubscriptions)
- {
- //
- // Walk through all existing subscriptions and shut down those whose subscriber matches
- // that which just came in.
- //
- mReadHandlers.ForEachActiveObject([this, apExchangeContext](ReadHandler * handler) {
- if (handler->IsFromSubscriber(*apExchangeContext))
- {
- ChipLogProgress(InteractionModel,
- "Deleting previous subscription from NodeId: " ChipLogFormatX64 ", FabricIndex: %u",
- ChipLogValueX64(apExchangeContext->GetSessionHandle()->AsSecureSession()->GetPeerNodeId()),
- apExchangeContext->GetSessionHandle()->GetFabricIndex());
- mReadHandlers.ReleaseObject(handler);
- }
-
- return Loop::Continue;
- });
- }
}
else
{
@@ -790,6 +791,10 @@
eventPathsSubscribedByCurrentFabric > perFabricPathCapacity ||
subscriptionsEstablishedByCurrentFabric > perFabricSubscriptionCapacity))
{
+ SubscriptionId subId;
+ candidate->GetSubscriptionId(subId);
+ ChipLogProgress(DataManagement, "Evicting Subscription ID %u:0x%" PRIx32, candidate->GetSubjectDescriptor().fabricIndex,
+ subId);
candidate->Close();
return true;
}
diff --git a/src/app/ReadClient.cpp b/src/app/ReadClient.cpp
index f981cb1..94a29cd 100644
--- a/src/app/ReadClient.cpp
+++ b/src/app/ReadClient.cpp
@@ -952,9 +952,6 @@
Span<DataVersionFilter> dataVersionFilters(aReadPrepareParams.mpDataVersionFilterList,
aReadPrepareParams.mDataVersionFilterListSize);
- VerifyOrReturnError(aReadPrepareParams.mAttributePathParamsListSize != 0 || aReadPrepareParams.mEventPathParamsListSize != 0,
- CHIP_ERROR_INVALID_ARGUMENT);
-
System::PacketBufferHandle msgBuf;
System::PacketBufferTLVWriter writer;
SubscribeRequestMessage::Builder request;
diff --git a/src/controller/python/chip/clusters/Attribute.py b/src/controller/python/chip/clusters/Attribute.py
index 36d6e34..0fa3530 100644
--- a/src/controller/python/chip/clusters/Attribute.py
+++ b/src/controller/python/chip/clusters/Attribute.py
@@ -955,10 +955,7 @@
if (not attributes) and dataVersionFilters:
raise ValueError(
"Must provide valid attribute list when data version filters is not null")
- if (not attributes) and (not events):
- raise ValueError(
- "Must read some something"
- )
+
handle = chip.native.GetLibraryHandle()
transaction = AsyncReadTransaction(
future, eventLoop, devCtrl, returnClusterObject)
diff --git a/src/controller/tests/data_model/TestRead.cpp b/src/controller/tests/data_model/TestRead.cpp
index ff43596..3da6e03 100644
--- a/src/controller/tests/data_model/TestRead.cpp
+++ b/src/controller/tests/data_model/TestRead.cpp
@@ -282,6 +282,7 @@
static void TestReadAttribute_ManyDataValuesWrongPath(nlTestSuite * apSuite, void * apContext);
static void TestReadAttribute_ManyErrors(nlTestSuite * apSuite, void * apContext);
static void TestSubscribeAttributeDeniedNotExistPath(nlTestSuite * apSuite, void * apContext);
+ static void TestReadHandler_KeepSubscriptionTest(nlTestSuite * apSuite, void * apContext);
private:
static uint16_t mMaxInterval;
@@ -1675,6 +1676,7 @@
app::InteractionModelEngine::GetInstance()->ShutdownActiveReads();
NL_TEST_ASSERT(apSuite, ctx.GetExchangeManager().GetNumActiveExchanges() == 0);
+ ctx.GetLoopback().mNumMessagesToDrop = 0;
}
void TestReadInteraction::TestReadHandler_MultipleSubscriptions(nlTestSuite * apSuite, void * apContext)
@@ -4451,6 +4453,51 @@
NL_TEST_ASSERT(apSuite, ctx.GetExchangeManager().GetNumActiveExchanges() == 0);
}
+//
+// This validates the KeepSubscriptions flag by first setting up a valid subscription, then sending
+// a subsequent SubcribeRequest with empty attribute AND event paths with KeepSubscriptions = false.
+//
+// This should evict the previous subscription before sending back an error.
+//
+void TestReadInteraction::TestReadHandler_KeepSubscriptionTest(nlTestSuite * apSuite, void * apContext)
+{
+ using namespace SubscriptionPathQuotaHelpers;
+
+ TestContext & ctx = *static_cast<TestContext *>(apContext);
+ TestReadCallback readCallback;
+ app::AttributePathParams pathParams(kTestEndpointId, TestCluster::Id, TestCluster::Attributes::Int16u::Id);
+
+ app::ReadPrepareParams readParam(ctx.GetSessionAliceToBob());
+ readParam.mpAttributePathParamsList = &pathParams;
+ readParam.mAttributePathParamsListSize = 1;
+ readParam.mMaxIntervalCeilingSeconds = 1;
+ readParam.mKeepSubscriptions = false;
+
+ std::unique_ptr<app::ReadClient> readClient = std::make_unique<app::ReadClient>(
+ app::InteractionModelEngine::GetInstance(), app::InteractionModelEngine::GetInstance()->GetExchangeManager(), readCallback,
+ app::ReadClient::InteractionType::Subscribe);
+ NL_TEST_ASSERT(apSuite, readClient->SendRequest(readParam) == CHIP_NO_ERROR);
+
+ ctx.DrainAndServiceIO();
+
+ NL_TEST_ASSERT(apSuite, app::InteractionModelEngine::GetInstance()->GetNumActiveReadHandlers() == 1);
+
+ ChipLogProgress(DataManagement, "Issue another subscription that will evict the first sub...");
+
+ readParam.mAttributePathParamsListSize = 0;
+ readClient = std::make_unique<app::ReadClient>(app::InteractionModelEngine::GetInstance(),
+ app::InteractionModelEngine::GetInstance()->GetExchangeManager(), readCallback,
+ app::ReadClient::InteractionType::Subscribe);
+ NL_TEST_ASSERT(apSuite, readClient->SendRequest(readParam) == CHIP_NO_ERROR);
+
+ ctx.DrainAndServiceIO();
+
+ NL_TEST_ASSERT(apSuite, app::InteractionModelEngine::GetInstance()->GetNumActiveReadHandlers() == 0);
+ NL_TEST_ASSERT(apSuite, readCallback.mOnError != 0);
+ app::InteractionModelEngine::GetInstance()->ShutdownActiveReads();
+ ctx.DrainAndServiceIO();
+}
+
// clang-format off
const nlTest sTests[] =
{
@@ -4488,6 +4535,7 @@
NL_TEST_DEF("TestSubscribeAttributeDeniedNotExistPath", TestReadInteraction::TestSubscribeAttributeDeniedNotExistPath),
NL_TEST_DEF("TestResubscribeAttributeTimeout", TestReadInteraction::TestResubscribeAttributeTimeout),
NL_TEST_DEF("TestSubscribeAttributeTimeout", TestReadInteraction::TestSubscribeAttributeTimeout),
+ NL_TEST_DEF("TestReadHandler_KeepSubscriptionTest", TestReadInteraction::TestReadHandler_KeepSubscriptionTest),
NL_TEST_SENTINEL()
};
// clang-format on