[Darwin] MTRDeviceController getSessionForNode: to use subscription pool as needed (#33856)
* [Darwin] MTRDeviceController getSessionForNode: to use subscription pool as needed
* Amended MTRPerControllerStorageTests/testSubscriptionPool to include BaseDevice testing
* Update src/darwin/Framework/CHIPTests/MTRPerControllerStorageTests.m
Co-authored-by: Justin Wood <woody@apple.com>
---------
Co-authored-by: Justin Wood <woody@apple.com>
diff --git a/src/darwin/Framework/CHIP/MTRDevice.mm b/src/darwin/Framework/CHIP/MTRDevice.mm
index 54fa8ec..9d1853c 100644
--- a/src/darwin/Framework/CHIP/MTRDevice.mm
+++ b/src/darwin/Framework/CHIP/MTRDevice.mm
@@ -1230,6 +1230,12 @@
[self _changeState:MTRDeviceStateUnreachable];
}
+- (BOOL)deviceUsesThread
+{
+ std::lock_guard lock(_lock);
+ return [self _deviceUsesThread];
+}
+
// This method is used for signaling whether to use the subscription pool. This functions as
// a heuristic for whether to throttle subscriptions to the device via a pool of subscriptions.
// If products appear that have both Thread and Wifi enabled but are primarily on wifi, this
@@ -2334,194 +2340,195 @@
});
}
+ // Call directlyGetSessionForNode because the subscription setup already goes through the subscription pool queue
[_deviceController
- getSessionForNode:_nodeID.unsignedLongLongValue
- completion:^(chip::Messaging::ExchangeManager * _Nullable exchangeManager,
- const chip::Optional<chip::SessionHandle> & session, NSError * _Nullable error,
- NSNumber * _Nullable retryDelay) {
- if (error != nil) {
- MTR_LOG_ERROR("%@ getSessionForNode error %@", self, error);
- dispatch_async(self.queue, ^{
- [self _handleSubscriptionError:error];
- [self _handleSubscriptionReset:retryDelay];
- });
- return;
- }
+ directlyGetSessionForNode:_nodeID.unsignedLongLongValue
+ completion:^(chip::Messaging::ExchangeManager * _Nullable exchangeManager,
+ const chip::Optional<chip::SessionHandle> & session, NSError * _Nullable error,
+ NSNumber * _Nullable retryDelay) {
+ if (error != nil) {
+ MTR_LOG_ERROR("%@ getSessionForNode error %@", self, error);
+ dispatch_async(self.queue, ^{
+ [self _handleSubscriptionError:error];
+ [self _handleSubscriptionReset:retryDelay];
+ });
+ return;
+ }
- auto callback = std::make_unique<SubscriptionCallback>(
- ^(NSArray * value) {
- MTR_LOG("%@ got attribute report %@", self, value);
- dispatch_async(self.queue, ^{
- // OnAttributeData
- [self _handleAttributeReport:value fromSubscription:YES];
+ auto callback = std::make_unique<SubscriptionCallback>(
+ ^(NSArray * value) {
+ MTR_LOG("%@ got attribute report %@", self, value);
+ dispatch_async(self.queue, ^{
+ // OnAttributeData
+ [self _handleAttributeReport:value fromSubscription:YES];
#ifdef DEBUG
- self->_unitTestAttributesReportedSinceLastCheck += value.count;
+ self->_unitTestAttributesReportedSinceLastCheck += value.count;
#endif
- });
- },
- ^(NSArray * value) {
- MTR_LOG("%@ got event report %@", self, value);
- dispatch_async(self.queue, ^{
- // OnEventReport
- [self _handleEventReport:value];
- });
- },
- ^(NSError * error) {
- MTR_LOG_ERROR("%@ got subscription error %@", self, error);
- dispatch_async(self.queue, ^{
- // OnError
- [self _handleSubscriptionError:error];
- });
- },
- ^(NSError * error, NSNumber * resubscriptionDelayMs) {
- MTR_LOG_ERROR("%@ got resubscription error %@ delay %@", self, error, resubscriptionDelayMs);
- dispatch_async(self.queue, ^{
- // OnResubscriptionNeeded
- [self _handleResubscriptionNeededWithDelay:resubscriptionDelayMs];
- });
- },
- ^(void) {
- MTR_LOG("%@ got subscription established", self);
- dispatch_async(self.queue, ^{
- // OnSubscriptionEstablished
- [self _handleSubscriptionEstablished];
- });
- },
- ^(void) {
- MTR_LOG("%@ got subscription done", self);
- // Drop our pointer to the ReadClient immediately, since
- // it's about to be destroyed and we don't want to be
- // holding a dangling pointer.
- std::lock_guard lock(self->_lock);
- self->_currentReadClient = nullptr;
- self->_currentSubscriptionCallback = nullptr;
+ });
+ },
+ ^(NSArray * value) {
+ MTR_LOG("%@ got event report %@", self, value);
+ dispatch_async(self.queue, ^{
+ // OnEventReport
+ [self _handleEventReport:value];
+ });
+ },
+ ^(NSError * error) {
+ MTR_LOG_ERROR("%@ got subscription error %@", self, error);
+ dispatch_async(self.queue, ^{
+ // OnError
+ [self _handleSubscriptionError:error];
+ });
+ },
+ ^(NSError * error, NSNumber * resubscriptionDelayMs) {
+ MTR_LOG_ERROR("%@ got resubscription error %@ delay %@", self, error, resubscriptionDelayMs);
+ dispatch_async(self.queue, ^{
+ // OnResubscriptionNeeded
+ [self _handleResubscriptionNeededWithDelay:resubscriptionDelayMs];
+ });
+ },
+ ^(void) {
+ MTR_LOG("%@ got subscription established", self);
+ dispatch_async(self.queue, ^{
+ // OnSubscriptionEstablished
+ [self _handleSubscriptionEstablished];
+ });
+ },
+ ^(void) {
+ MTR_LOG("%@ got subscription done", self);
+ // Drop our pointer to the ReadClient immediately, since
+ // it's about to be destroyed and we don't want to be
+ // holding a dangling pointer.
+ std::lock_guard lock(self->_lock);
+ self->_currentReadClient = nullptr;
+ self->_currentSubscriptionCallback = nullptr;
- dispatch_async(self.queue, ^{
- // OnDone
- [self _handleSubscriptionReset:nil];
- });
- },
- ^(void) {
- MTR_LOG("%@ got unsolicited message from publisher", self);
- dispatch_async(self.queue, ^{
- // OnUnsolicitedMessageFromPublisher
- [self _handleUnsolicitedMessageFromPublisher];
- });
- },
- ^(void) {
- MTR_LOG("%@ got report begin", self);
- dispatch_async(self.queue, ^{
- [self _handleReportBegin];
- });
- },
- ^(void) {
- MTR_LOG("%@ got report end", self);
- dispatch_async(self.queue, ^{
- [self _handleReportEnd];
- });
- });
+ dispatch_async(self.queue, ^{
+ // OnDone
+ [self _handleSubscriptionReset:nil];
+ });
+ },
+ ^(void) {
+ MTR_LOG("%@ got unsolicited message from publisher", self);
+ dispatch_async(self.queue, ^{
+ // OnUnsolicitedMessageFromPublisher
+ [self _handleUnsolicitedMessageFromPublisher];
+ });
+ },
+ ^(void) {
+ MTR_LOG("%@ got report begin", self);
+ dispatch_async(self.queue, ^{
+ [self _handleReportBegin];
+ });
+ },
+ ^(void) {
+ MTR_LOG("%@ got report end", self);
+ dispatch_async(self.queue, ^{
+ [self _handleReportEnd];
+ });
+ });
- // Set up a cluster state cache. We just want this for the logic it has for
- // tracking data versions and event numbers so we minimize the amount of data we
- // request on resubscribes, so tell it not to store data.
- auto clusterStateCache = std::make_unique<ClusterStateCache>(*callback.get(),
- /* highestReceivedEventNumber = */ NullOptional,
- /* cacheData = */ false);
- auto readClient = std::make_unique<ReadClient>(InteractionModelEngine::GetInstance(), exchangeManager,
- clusterStateCache->GetBufferedCallback(), ReadClient::InteractionType::Subscribe);
+ // Set up a cluster state cache. We just want this for the logic it has for
+ // tracking data versions and event numbers so we minimize the amount of data we
+ // request on resubscribes, so tell it not to store data.
+ auto clusterStateCache = std::make_unique<ClusterStateCache>(*callback.get(),
+ /* highestReceivedEventNumber = */ NullOptional,
+ /* cacheData = */ false);
+ auto readClient = std::make_unique<ReadClient>(InteractionModelEngine::GetInstance(), exchangeManager,
+ clusterStateCache->GetBufferedCallback(), ReadClient::InteractionType::Subscribe);
- // Subscribe with data version filter list and retry with smaller list if out of packet space
- CHIP_ERROR err;
- NSDictionary<MTRClusterPath *, NSNumber *> * dataVersions = [self _getCachedDataVersions];
- size_t dataVersionFilterListSizeReduction = 0;
- for (;;) {
- // Wildcard endpoint, cluster, attribute, event.
- auto attributePath = std::make_unique<AttributePathParams>();
- auto eventPath = std::make_unique<EventPathParams>();
- // We want to get event reports at the minInterval, not the maxInterval.
- eventPath->mIsUrgentEvent = true;
- ReadPrepareParams readParams(session.Value());
+ // Subscribe with data version filter list and retry with smaller list if out of packet space
+ CHIP_ERROR err;
+ NSDictionary<MTRClusterPath *, NSNumber *> * dataVersions = [self _getCachedDataVersions];
+ size_t dataVersionFilterListSizeReduction = 0;
+ for (;;) {
+ // Wildcard endpoint, cluster, attribute, event.
+ auto attributePath = std::make_unique<AttributePathParams>();
+ auto eventPath = std::make_unique<EventPathParams>();
+ // We want to get event reports at the minInterval, not the maxInterval.
+ eventPath->mIsUrgentEvent = true;
+ ReadPrepareParams readParams(session.Value());
- readParams.mMinIntervalFloorSeconds = 0;
- // Select a max interval based on the device's claimed idle sleep interval.
- auto idleSleepInterval = std::chrono::duration_cast<System::Clock::Seconds32>(
- session.Value()->GetRemoteMRPConfig().mIdleRetransTimeout);
+ readParams.mMinIntervalFloorSeconds = 0;
+ // Select a max interval based on the device's claimed idle sleep interval.
+ auto idleSleepInterval = std::chrono::duration_cast<System::Clock::Seconds32>(
+ session.Value()->GetRemoteMRPConfig().mIdleRetransTimeout);
- auto maxIntervalCeilingMin = System::Clock::Seconds32(MTR_DEVICE_SUBSCRIPTION_MAX_INTERVAL_MIN);
- if (idleSleepInterval < maxIntervalCeilingMin) {
- idleSleepInterval = maxIntervalCeilingMin;
- }
+ auto maxIntervalCeilingMin = System::Clock::Seconds32(MTR_DEVICE_SUBSCRIPTION_MAX_INTERVAL_MIN);
+ if (idleSleepInterval < maxIntervalCeilingMin) {
+ idleSleepInterval = maxIntervalCeilingMin;
+ }
- auto maxIntervalCeilingMax = System::Clock::Seconds32(MTR_DEVICE_SUBSCRIPTION_MAX_INTERVAL_MAX);
- if (idleSleepInterval > maxIntervalCeilingMax) {
- idleSleepInterval = maxIntervalCeilingMax;
- }
+ auto maxIntervalCeilingMax = System::Clock::Seconds32(MTR_DEVICE_SUBSCRIPTION_MAX_INTERVAL_MAX);
+ if (idleSleepInterval > maxIntervalCeilingMax) {
+ idleSleepInterval = maxIntervalCeilingMax;
+ }
#ifdef DEBUG
- if (maxIntervalOverride.HasValue()) {
- idleSleepInterval = maxIntervalOverride.Value();
- }
+ if (maxIntervalOverride.HasValue()) {
+ idleSleepInterval = maxIntervalOverride.Value();
+ }
#endif
- readParams.mMaxIntervalCeilingSeconds = static_cast<uint16_t>(idleSleepInterval.count());
+ readParams.mMaxIntervalCeilingSeconds = static_cast<uint16_t>(idleSleepInterval.count());
- readParams.mpAttributePathParamsList = attributePath.get();
- readParams.mAttributePathParamsListSize = 1;
- readParams.mpEventPathParamsList = eventPath.get();
- readParams.mEventPathParamsListSize = 1;
- readParams.mKeepSubscriptions = true;
- readParams.mIsFabricFiltered = false;
- size_t dataVersionFilterListSize = 0;
- DataVersionFilter * dataVersionFilterList;
- [self _createDataVersionFilterListFromDictionary:dataVersions dataVersionFilterList:&dataVersionFilterList count:&dataVersionFilterListSize sizeReduction:dataVersionFilterListSizeReduction];
- readParams.mDataVersionFilterListSize = dataVersionFilterListSize;
- readParams.mpDataVersionFilterList = dataVersionFilterList;
- attributePath.release();
- eventPath.release();
+ readParams.mpAttributePathParamsList = attributePath.get();
+ readParams.mAttributePathParamsListSize = 1;
+ readParams.mpEventPathParamsList = eventPath.get();
+ readParams.mEventPathParamsListSize = 1;
+ readParams.mKeepSubscriptions = true;
+ readParams.mIsFabricFiltered = false;
+ size_t dataVersionFilterListSize = 0;
+ DataVersionFilter * dataVersionFilterList;
+ [self _createDataVersionFilterListFromDictionary:dataVersions dataVersionFilterList:&dataVersionFilterList count:&dataVersionFilterListSize sizeReduction:dataVersionFilterListSizeReduction];
+ readParams.mDataVersionFilterListSize = dataVersionFilterListSize;
+ readParams.mpDataVersionFilterList = dataVersionFilterList;
+ attributePath.release();
+ eventPath.release();
- // TODO: Change from local filter list generation to rehydrating ClusterStateCache ot take advantage of existing filter list sorting algorithm
+ // TODO: Change from local filter list generation to rehydrating ClusterStateCache ot take advantage of existing filter list sorting algorithm
- // SendAutoResubscribeRequest cleans up the params, even on failure.
- err = readClient->SendAutoResubscribeRequest(std::move(readParams));
- if (err == CHIP_NO_ERROR) {
- break;
- }
+ // SendAutoResubscribeRequest cleans up the params, even on failure.
+ err = readClient->SendAutoResubscribeRequest(std::move(readParams));
+ if (err == CHIP_NO_ERROR) {
+ break;
+ }
- // If error is not a "no memory" issue, then break and go through regular resubscribe logic
- if (err != CHIP_ERROR_NO_MEMORY) {
- break;
- }
+ // If error is not a "no memory" issue, then break and go through regular resubscribe logic
+ if (err != CHIP_ERROR_NO_MEMORY) {
+ break;
+ }
- // If "no memory" error is not caused by data version filter list, break as well
- if (!dataVersionFilterListSize) {
- break;
- }
+ // If "no memory" error is not caused by data version filter list, break as well
+ if (!dataVersionFilterListSize) {
+ break;
+ }
- // Now "no memory" could mean subscribe request packet space ran out. Reduce size and try again immediately
- dataVersionFilterListSizeReduction++;
- }
+ // Now "no memory" could mean subscribe request packet space ran out. Reduce size and try again immediately
+ dataVersionFilterListSizeReduction++;
+ }
- if (err != CHIP_NO_ERROR) {
- NSError * error = [MTRError errorForCHIPErrorCode:err logContext:self];
- MTR_LOG_ERROR("%@ SendAutoResubscribeRequest error %@", self, error);
- dispatch_async(self.queue, ^{
- [self _handleSubscriptionError:error];
- [self _handleSubscriptionReset:nil];
- });
+ if (err != CHIP_NO_ERROR) {
+ NSError * error = [MTRError errorForCHIPErrorCode:err logContext:self];
+ MTR_LOG_ERROR("%@ SendAutoResubscribeRequest error %@", self, error);
+ dispatch_async(self.queue, ^{
+ [self _handleSubscriptionError:error];
+ [self _handleSubscriptionReset:nil];
+ });
- return;
- }
+ return;
+ }
- MTR_LOG("%@ Subscribe with data version list size %lu, reduced by %lu", self, static_cast<unsigned long>(dataVersions.count), static_cast<unsigned long>(dataVersionFilterListSizeReduction));
+ MTR_LOG("%@ Subscribe with data version list size %lu, reduced by %lu", self, static_cast<unsigned long>(dataVersions.count), static_cast<unsigned long>(dataVersionFilterListSizeReduction));
- // Callback and ClusterStateCache and ReadClient will be deleted
- // when OnDone is called.
- os_unfair_lock_lock(&self->_lock);
- self->_currentReadClient = readClient.get();
- self->_currentSubscriptionCallback = callback.get();
- os_unfair_lock_unlock(&self->_lock);
- callback->AdoptReadClient(std::move(readClient));
- callback->AdoptClusterStateCache(std::move(clusterStateCache));
- callback.release();
- }];
+ // Callback and ClusterStateCache and ReadClient will be deleted
+ // when OnDone is called.
+ os_unfair_lock_lock(&self->_lock);
+ self->_currentReadClient = readClient.get();
+ self->_currentSubscriptionCallback = callback.get();
+ os_unfair_lock_unlock(&self->_lock);
+ callback->AdoptReadClient(std::move(readClient));
+ callback->AdoptClusterStateCache(std::move(clusterStateCache));
+ callback.release();
+ }];
// Set up connectivity monitoring in case network becomes routable after any part of the subscription process goes into backoff retries.
[self _setupConnectivityMonitoring];
diff --git a/src/darwin/Framework/CHIP/MTRDeviceController.mm b/src/darwin/Framework/CHIP/MTRDeviceController.mm
index 303cc26..7dc7919 100644
--- a/src/darwin/Framework/CHIP/MTRDeviceController.mm
+++ b/src/darwin/Framework/CHIP/MTRDeviceController.mm
@@ -1257,6 +1257,33 @@
- (void)getSessionForNode:(chip::NodeId)nodeID completion:(MTRInternalDeviceConnectionCallback)completion
{
+ // First check if MTRDevice exists from having loaded from storage, or created by a client.
+ // Do not use deviceForNodeID here, because we don't want to create the device if it does not already exist.
+ os_unfair_lock_lock(&_deviceMapLock);
+ MTRDevice * device = _nodeIDToDeviceMap[@(nodeID)];
+ os_unfair_lock_unlock(&_deviceMapLock);
+
+ // In the case that this device is known to use thread, queue this with subscription attempts as well, to
+ // help with throttling Thread traffic.
+ if (device && [device deviceUsesThread]) {
+ MTRAsyncWorkItem * workItem = [[MTRAsyncWorkItem alloc] initWithQueue:dispatch_get_global_queue(QOS_CLASS_DEFAULT, 0)];
+ [workItem setReadyHandler:^(id _Nonnull context, NSInteger retryCount, MTRAsyncWorkCompletionBlock _Nonnull workItemCompletion) {
+ MTRInternalDeviceConnectionCallback completionWrapper = ^(chip::Messaging::ExchangeManager * _Nullable exchangeManager,
+ const chip::Optional<chip::SessionHandle> & session, NSError * _Nullable error, NSNumber * _Nullable retryDelay) {
+ completion(exchangeManager, session, error, retryDelay);
+ workItemCompletion(MTRAsyncWorkComplete);
+ };
+ [self directlyGetSessionForNode:nodeID completion:completionWrapper];
+ }];
+
+ [_concurrentSubscriptionPool enqueueWorkItem:workItem descriptionWithFormat:@"device controller getSessionForNode nodeID: 0x%016llX", nodeID];
+ } else {
+ [self directlyGetSessionForNode:nodeID completion:completion];
+ }
+}
+
+- (void)directlyGetSessionForNode:(chip::NodeId)nodeID completion:(MTRInternalDeviceConnectionCallback)completion
+{
[self
asyncGetCommissionerOnMatterQueue:^(chip::Controller::DeviceCommissioner * commissioner) {
auto connectionBridge = new MTRDeviceConnectionBridge(completion);
diff --git a/src/darwin/Framework/CHIP/MTRDeviceController_Internal.h b/src/darwin/Framework/CHIP/MTRDeviceController_Internal.h
index 8fb61fb..925bbb2 100644
--- a/src/darwin/Framework/CHIP/MTRDeviceController_Internal.h
+++ b/src/darwin/Framework/CHIP/MTRDeviceController_Internal.h
@@ -268,6 +268,13 @@
- (NSNumber * _Nullable)syncGetCompressedFabricID;
+/**
+ * Since getSessionForNode now enqueues by the subscription pool for Thread
+ * devices, MTRDevice needs a direct non-queued access because it already
+ * makes use of the subscription pool.
+ */
+- (void)directlyGetSessionForNode:(chip::NodeId)nodeID completion:(MTRInternalDeviceConnectionCallback)completion;
+
@end
NS_ASSUME_NONNULL_END
diff --git a/src/darwin/Framework/CHIP/MTRDevice_Internal.h b/src/darwin/Framework/CHIP/MTRDevice_Internal.h
index 2ad2f64..1416aa2 100644
--- a/src/darwin/Framework/CHIP/MTRDevice_Internal.h
+++ b/src/darwin/Framework/CHIP/MTRDevice_Internal.h
@@ -117,6 +117,9 @@
- (void)setStorageBehaviorConfiguration:(MTRDeviceStorageBehaviorConfiguration *)storageBehaviorConfiguration;
+// Returns whether this MTRDevice uses Thread for communication
+- (BOOL)deviceUsesThread;
+
@end
#pragma mark - Utility for clamping numbers
diff --git a/src/darwin/Framework/CHIPTests/MTRPerControllerStorageTests.m b/src/darwin/Framework/CHIPTests/MTRPerControllerStorageTests.m
index 8b7f50d..ced79aa 100644
--- a/src/darwin/Framework/CHIPTests/MTRPerControllerStorageTests.m
+++ b/src/darwin/Framework/CHIPTests/MTRPerControllerStorageTests.m
@@ -2307,6 +2307,24 @@
[self commissionWithController:controller newNodeID:deviceID onboardingPayload:deviceOnboardingPayloads[deviceID]];
}
+ // Shutdown and restart, to reset all existing sessions, so that the subscriptions and base device usage start after
+ [controller shutdown];
+ XCTAssertFalse([controller isRunning]);
+
+ controller = [self startControllerWithRootKeys:rootKeys
+ operationalKeys:operationalKeys
+ fabricID:fabricID
+ nodeID:nodeID
+ storage:storageDelegate
+ error:&error
+ certificateIssuer:&certificateIssuer
+ concurrentSubscriptionPoolSize:subscriptionPoolSize];
+ XCTAssertNil(error);
+ XCTAssertNotNil(controller);
+ XCTAssertTrue([controller isRunning]);
+
+ XCTAssertEqualObjects(controller.controllerNodeID, nodeID);
+
// Set up expectations and delegates
NSDictionary<NSNumber *, XCTestExpectation *> * subscriptionExpectations = @{
@@ -2329,6 +2347,7 @@
__block os_unfair_lock counterLock = OS_UNFAIR_LOCK_INIT;
__block NSUInteger subscriptionRunningCount = 0;
__block NSUInteger subscriptionDequeueCount = 0;
+ __block BOOL baseDeviceReadCompleted = NO;
for (NSNumber * deviceID in orderedDeviceIDs) {
MTRDeviceTestDelegate * delegate = deviceDelegates[deviceID];
@@ -2347,6 +2366,14 @@
// Stop counting subscribing right before calling work item completion
os_unfair_lock_lock(&counterLock);
subscriptionRunningCount--;
+
+ // Given the base device read is happening on the 5th device, at the completion
+ // time of the first [pool size] subscriptions, the BaseDevice's request to
+ // read can't have completed, as it should be gated on its call to the
+ // MTRDeviceController's getSessionForNode: call.
+ if (subscriptionDequeueCount <= (orderedDeviceIDs.count - subscriptionPoolSize)) {
+ XCTAssertFalse(baseDeviceReadCompleted);
+ }
os_unfair_lock_unlock(&counterLock);
};
__weak __auto_type weakDelegate = delegate;
@@ -2363,8 +2390,26 @@
[device setDelegate:deviceDelegates[deviceID] queue:queue];
}
+ // Create the base device to attempt to read from the 5th device
+ __auto_type * baseDeviceReadExpectation = [self expectationWithDescription:@"BaseDevice read"];
+ // Dispatch async to get around XCTest, so that this runs after the above devices queue their subscriptions
+ dispatch_async(queue, ^{
+ __auto_type * baseDevice = [MTRBaseDevice deviceWithNodeID:@(105) controller:controller];
+ __auto_type * onOffCluster = [[MTRBaseClusterOnOff alloc] initWithDevice:baseDevice endpointID:@(1) queue:queue];
+ [onOffCluster readAttributeOnOffWithCompletion:^(NSNumber * value, NSError * _Nullable error) {
+ XCTAssertNil(error);
+ // We expect the device to be off.
+ XCTAssertEqualObjects(value, @(0));
+ [baseDeviceReadExpectation fulfill];
+ os_unfair_lock_lock(&counterLock);
+ baseDeviceReadCompleted = YES;
+ os_unfair_lock_unlock(&counterLock);
+ }];
+ });
+
// Make the wait time depend on pool size and device count (can expand number of devices in the future)
- [self waitForExpectations:subscriptionExpectations.allValues timeout:(kSubscriptionPoolBaseTimeoutInSeconds * orderedDeviceIDs.count / subscriptionPoolSize)];
+ NSArray * expectationsToWait = [subscriptionExpectations.allValues arrayByAddingObject:baseDeviceReadExpectation];
+ [self waitForExpectations:expectationsToWait timeout:(kSubscriptionPoolBaseTimeoutInSeconds * orderedDeviceIDs.count / subscriptionPoolSize)];
XCTAssertEqual(subscriptionDequeueCount, orderedDeviceIDs.count);