[Darwin] Add optional concurrent execution to MTRAsyncWorkQueue (#33154)
* [Darwin] Add optional concurrent execution to MTRAsyncWorkQueue
* Update src/darwin/Framework/CHIPTests/MTRAsyncWorkQueueTests.m
Co-authored-by: Boris Zbarsky <bzbarsky@apple.com>
* Address review comments
* More locking to make thread sanitizer happy
---------
Co-authored-by: Boris Zbarsky <bzbarsky@apple.com>
diff --git a/src/darwin/Framework/CHIP/MTRAsyncWorkQueue.h b/src/darwin/Framework/CHIP/MTRAsyncWorkQueue.h
index b584e30..598bbf0 100644
--- a/src/darwin/Framework/CHIP/MTRAsyncWorkQueue.h
+++ b/src/darwin/Framework/CHIP/MTRAsyncWorkQueue.h
@@ -192,6 +192,17 @@
/// is lost.
- (instancetype)initWithContext:(ContextType)context;
+/// Creates a work queue with the given context object and a queue width.
+///
+/// The queue will call readyHandler on up to "width" work items
+/// concurrently. Once "width" work items have started, no other
+/// work items will get a readyHandler call until one of the running work items
+/// has called its completion block with MTRAsyncWorkComplete.
+///
+/// This allows an MTRAsyncWorkQueue object to manage a pool of
+/// resources that can be used concurrently at any given time.
+- (instancetype)initWithContext:(ContextType)context width:(NSUInteger)width;
+
/// Enqueues the specified work item, making it eligible for execution.
///
/// Once a work item is enqueued, ownership of it passes to the queue and
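For context, a minimal caller-side sketch of the new width-based initializer, using the same API surface the tests below exercise. The NSNull context, the global dispatch queue, and the StartExampleWork wrapper are illustrative placeholders, not part of this change.

// Hypothetical caller; assumes the Matter framework headers are imported and
// that the caller keeps a strong reference to the queue while work is pending.
static void StartExampleWork(void)
{
    // A queue that runs the readyHandlers of up to 3 work items concurrently.
    MTRAsyncWorkQueue * queue = [[MTRAsyncWorkQueue alloc] initWithContext:NSNull.null width:3];

    MTRAsyncWorkItem * item = [[MTRAsyncWorkItem alloc] initWithQueue:dispatch_get_global_queue(QOS_CLASS_DEFAULT, 0)];
    item.readyHandler = ^(id context, NSInteger retryCount, MTRAsyncWorkCompletionBlock completion) {
        // Up to 3 of these handlers may run at the same time; a 4th enqueued
        // item waits until one of them calls completion(MTRAsyncWorkComplete).
        completion(MTRAsyncWorkComplete);
    };
    [queue enqueueWorkItem:item description:@"example work item"];
}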
diff --git a/src/darwin/Framework/CHIP/MTRAsyncWorkQueue.mm b/src/darwin/Framework/CHIP/MTRAsyncWorkQueue.mm
index eff85b1..53a7195 100644
--- a/src/darwin/Framework/CHIP/MTRAsyncWorkQueue.mm
+++ b/src/darwin/Framework/CHIP/MTRAsyncWorkQueue.mm
@@ -197,7 +197,8 @@
os_unfair_lock _lock;
__weak id _context;
NSMutableArray<MTRAsyncWorkItem *> * _items;
- NSInteger _runningWorkItemCount;
+ NSUInteger _runningWorkItemCount;
+ NSUInteger _width;
}
// A helper struct that facilitates access to _context while
@@ -217,10 +218,16 @@
- (instancetype)initWithContext:(id)context
{
+ return [self initWithContext:context width:1];
+}
+
+- (instancetype)initWithContext:(id)context width:(NSUInteger)width
+{
NSParameterAssert(context);
if (self = [super init]) {
_context = context;
_items = [NSMutableArray array];
+ _width = width;
}
return self;
}
@@ -286,35 +293,84 @@
{
os_unfair_lock_assert_owner(&_lock);
- MTRAsyncWorkItem * runningWorkItem = (_runningWorkItemCount) ? _items.firstObject : nil;
- if (workItem != runningWorkItem) {
+ BOOL foundWorkItem = NO;
+ NSUInteger indexOfWorkItem = 0;
+ for (NSUInteger i = 0; i < _width; i++) {
+ if (_items[i] == workItem) {
+ foundWorkItem = YES;
+ indexOfWorkItem = i;
+ break;
+ }
+ }
+ if (!foundWorkItem) {
NSAssert(NO, @"work item to post-process is not running");
return;
}
+ // already part of the running work items allowed by width - retry directly
if (retry) {
MTR_LOG_DEFAULT("MTRAsyncWorkQueue<%@> retry needed for work item [%llu]", context.description, workItem.uniqueID);
- } else {
- [workItem markComplete];
- [_items removeObjectAtIndex:0];
- MTR_LOG_DEFAULT("MTRAsyncWorkQueue<%@, items count: %tu> completed work item [%llu]", context.description, _items.count, workItem.uniqueID);
+ [self _callWorkItem:workItem withContext:context];
+ return;
}
- // when "concurrency width" is implemented this will be decremented instead
- _runningWorkItemCount = 0;
+ [workItem markComplete];
+ [_items removeObjectAtIndex:indexOfWorkItem];
+ MTR_LOG_DEFAULT("MTRAsyncWorkQueue<%@, items count: %tu> completed work item [%llu]", context.description, _items.count, workItem.uniqueID);
+
+ // sanity check running work item count is positive
+ if (_runningWorkItemCount == 0) {
+ NSAssert(NO, @"running work item count should be positive");
+ return;
+ }
+
+ _runningWorkItemCount--;
[self _callNextReadyWorkItemWithContext:context];
}
+- (void)_callWorkItem:(MTRAsyncWorkItem *)workItem withContext:(ContextSnapshot const &)context
+{
+ os_unfair_lock_assert_owner(&_lock);
+
+ mtr_weakify(self);
+ [workItem callReadyHandlerWithContext:context.reference completion:^(MTRAsyncWorkOutcome outcome) {
+ mtr_strongify(self);
+ BOOL handled = NO;
+ if (self) {
+ ContextSnapshot context(self); // re-acquire a new snapshot
+ std::lock_guard lock(self->_lock);
+ if (!workItem.isComplete) {
+ [self _postProcessWorkItem:workItem context:context retry:(outcome == MTRAsyncWorkNeedsRetry)];
+ handled = YES;
+ }
+ }
+ return handled;
+ }];
+}
+
- (void)_callNextReadyWorkItemWithContext:(ContextSnapshot const &)context
{
os_unfair_lock_assert_owner(&_lock);
- // when "concurrency width" is implemented this will be checked against the width
- if (_runningWorkItemCount) {
- return; // can't run next work item until the current one is done
+ // sanity check not running more than allowed
+ if (_runningWorkItemCount > _width) {
+ NSAssert(NO, @"running work item count larger than the maximum width");
+ return;
}
- if (!_items.count) {
+ // sanity check consistent counts
+ if (_items.count < _runningWorkItemCount) {
+ NSAssert(NO, @"work item count is less than running work item count");
+ return;
+ }
+
+ // can't run more work items if already running at max concurrent width
+ if (_runningWorkItemCount == _width) {
+ return;
+ }
+
+ // no more items to run
+ if (_items.count == _runningWorkItemCount) {
return; // nothing to run
}
@@ -324,16 +380,16 @@
return;
}
- // when "concurrency width" is implemented this will be incremented instead
- _runningWorkItemCount = 1;
+ NSUInteger nextWorkItemToRunIndex = _runningWorkItemCount;
+ MTRAsyncWorkItem * workItem = _items[nextWorkItemToRunIndex];
+ _runningWorkItemCount++;
- MTRAsyncWorkItem * workItem = _items.firstObject;
-
- // Check if batching is possible or needed. Only ask work item to batch once for simplicity
+ // Check if batching is possible or needed.
auto batchingHandler = workItem.batchingHandler;
- if (batchingHandler && workItem.retryCount == 0) {
- while (_items.count >= 2) {
- MTRAsyncWorkItem * nextWorkItem = _items[1];
+ if (batchingHandler) {
+ while (_items.count > _runningWorkItemCount) {
+ NSUInteger firstNonRunningItemIndex = _runningWorkItemCount;
+ MTRAsyncWorkItem * nextWorkItem = _items[firstNonRunningItemIndex];
if (!nextWorkItem.batchingHandler || nextWorkItem.batchingID != workItem.batchingID) {
goto done; // next item is not eligible to merge with this one
}
@@ -355,20 +411,7 @@
done:;
}
- mtr_weakify(self);
- [workItem callReadyHandlerWithContext:context.reference completion:^(MTRAsyncWorkOutcome outcome) {
- mtr_strongify(self);
- BOOL handled = NO;
- if (self) {
- ContextSnapshot context(self); // re-acquire a new snapshot
- std::lock_guard lock(self->_lock);
- if (!workItem.isComplete) {
- [self _postProcessWorkItem:workItem context:context retry:(outcome == MTRAsyncWorkNeedsRetry)];
- handled = YES;
- }
- }
- return handled;
- }];
+ [self _callWorkItem:workItem withContext:context];
}
- (BOOL)hasDuplicateForTypeID:(NSUInteger)opaqueDuplicateTypeID workItemData:(id)opaqueWorkItemData
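The bookkeeping above relies on a simple invariant: the first _runningWorkItemCount entries of _items are the items currently running, the next candidate to start is always _items[_runningWorkItemCount], and completing an item removes it from the array and decrements the count. Below is a simplified, standalone sketch of just that invariant; the class and method names are hypothetical, and it omits the os_unfair_lock the real queue holds around this state.

#import <Foundation/Foundation.h>

// Illustrative model of the width-bounded scheduling state; not framework code.
@interface WidthBoundedQueueModel : NSObject
- (instancetype)initWithWidth:(NSUInteger)width;
- (void)enqueue:(id)item;
- (nullable id)startNextIfPossible; // returns the item that just started, or nil
- (void)complete:(id)item;          // a running item finished, freeing a slot
@end

@implementation WidthBoundedQueueModel {
    NSMutableArray * _items; // the first _runningCount entries are the running items
    NSUInteger _width;
    NSUInteger _runningCount;
}

- (instancetype)initWithWidth:(NSUInteger)width
{
    if (self = [super init]) {
        _items = [NSMutableArray array];
        _width = width;
    }
    return self;
}

- (void)enqueue:(id)item
{
    [_items addObject:item];
}

- (nullable id)startNextIfPossible
{
    // Stop when "width" items are already running, or when every queued item is running.
    if (_runningCount == _width || _items.count == _runningCount) {
        return nil;
    }
    id item = _items[_runningCount]; // the first non-running item
    _runningCount++;
    return item;
}

- (void)complete:(id)item
{
    NSUInteger index = [_items indexOfObject:item];
    NSAssert(index != NSNotFound && index < _runningCount, @"completed item is not running");
    [_items removeObjectAtIndex:index];
    _runningCount--;
}
@end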
diff --git a/src/darwin/Framework/CHIPTests/MTRAsyncWorkQueueTests.m b/src/darwin/Framework/CHIPTests/MTRAsyncWorkQueueTests.m
index 031c453..3207a85 100644
--- a/src/darwin/Framework/CHIPTests/MTRAsyncWorkQueueTests.m
+++ b/src/darwin/Framework/CHIPTests/MTRAsyncWorkQueueTests.m
@@ -491,4 +491,70 @@
[self waitForExpectationsWithTimeout:1 handler:nil];
}
+- (void)testItemsConcurrently
+{
+ MTRAsyncWorkQueue * workQueue = [[MTRAsyncWorkQueue alloc] initWithContext:NSNull.null width:3];
+
+ XCTestExpectation * first3WorkItemsExecutedExpectation = [self expectationWithDescription:@"First 3 work items executed"];
+ XCTestExpectation * first3WorkItemsSleptExpectation = [self expectationWithDescription:@"First 3 work items slept"];
+ __block os_unfair_lock counterLock = OS_UNFAIR_LOCK_INIT;
+ __block int beforeSleepCounter = 0;
+ __block int afterSleepCounter = 0;
+ __auto_type sleep1ReadyHandler = ^(id context, NSInteger retryCount, MTRAsyncWorkCompletionBlock completion) {
+ os_unfair_lock_lock(&counterLock);
+ beforeSleepCounter++;
+ if (beforeSleepCounter == 3) {
+ [first3WorkItemsExecutedExpectation fulfill];
+ }
+ os_unfair_lock_unlock(&counterLock);
+ sleep(1);
+ os_unfair_lock_lock(&counterLock);
+ afterSleepCounter++;
+ if (afterSleepCounter == 3) {
+ [first3WorkItemsSleptExpectation fulfill];
+ }
+ os_unfair_lock_unlock(&counterLock);
+ completion(MTRAsyncWorkComplete);
+ };
+
+ MTRAsyncWorkItem * workItem1 = [[MTRAsyncWorkItem alloc] initWithQueue:dispatch_get_global_queue(QOS_CLASS_DEFAULT, 0)];
+ workItem1.readyHandler = sleep1ReadyHandler;
+ [workQueue enqueueWorkItem:workItem1 descriptionWithFormat:@"work item %d", 1];
+
+ MTRAsyncWorkItem * workItem2 = [[MTRAsyncWorkItem alloc] initWithQueue:dispatch_get_global_queue(QOS_CLASS_DEFAULT, 0)];
+ workItem2.readyHandler = sleep1ReadyHandler;
+ [workQueue enqueueWorkItem:workItem2 descriptionWithFormat:@"work item %d", 2];
+
+ MTRAsyncWorkItem * workItem3 = [[MTRAsyncWorkItem alloc] initWithQueue:dispatch_get_global_queue(QOS_CLASS_DEFAULT, 0)];
+ workItem3.readyHandler = sleep1ReadyHandler;
+ [workQueue enqueueWorkItem:workItem3 descriptionWithFormat:@"work item %d", 3];
+
+ // This is the item after the first 3, and should only execute after at least one of them has finished
+ XCTestExpectation * lastWorkItemWaitedExpectation = [self expectationWithDescription:@"Last work item waited properly"];
+ MTRAsyncWorkItem * workItemLast = [[MTRAsyncWorkItem alloc] initWithQueue:dispatch_get_global_queue(QOS_CLASS_DEFAULT, 0)];
+ workItemLast.readyHandler = ^(id context, NSInteger retryCount, MTRAsyncWorkCompletionBlock completion) {
+ // expect this to have waited until at least one of the above items finished after sleep() and incremented counter
+ os_unfair_lock_lock(&counterLock);
+ XCTAssert(afterSleepCounter > 0);
+ [lastWorkItemWaitedExpectation fulfill];
+ os_unfair_lock_unlock(&counterLock);
+ completion(MTRAsyncWorkComplete);
+ };
+ [workQueue enqueueWorkItem:workItemLast description:@"last work item"];
+
+ [self waitForExpectations:@[ first3WorkItemsExecutedExpectation ] timeout:2];
+ // all 3 items should have started concurrently (before-sleep counter reached 3) while none have finished sleeping yet.
+ os_unfair_lock_lock(&counterLock);
+ XCTAssertEqual(afterSleepCounter, 0);
+ os_unfair_lock_unlock(&counterLock);
+
+ [self waitForExpectations:@[ lastWorkItemWaitedExpectation, first3WorkItemsSleptExpectation ] timeout:2];
+
+ // see that all 3 first items ran and slept
+ os_unfair_lock_lock(&counterLock);
+ XCTAssertEqual(beforeSleepCounter, 3);
+ XCTAssertEqual(afterSleepCounter, 3);
+ os_unfair_lock_unlock(&counterLock);
+}
+
@end
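Since initWithContext: now forwards to initWithContext:width: with a width of 1, existing callers keep the previous strictly serial behavior. A hypothetical companion check in the style of the tests above might look like the sketch below; it is not part of this change, and the test method name is illustrative.

// Hypothetical sketch only: with the default initializer (width 1), a second
// item's readyHandler must not run until the first item has completed.
- (void)testDefaultWidthStaysSerial
{
    MTRAsyncWorkQueue * workQueue = [[MTRAsyncWorkQueue alloc] initWithContext:NSNull.null];

    XCTestExpectation * secondItemRanExpectation = [self expectationWithDescription:@"Second work item ran"];
    __block BOOL firstItemCompleted = NO;

    MTRAsyncWorkItem * workItem1 = [[MTRAsyncWorkItem alloc] initWithQueue:dispatch_get_global_queue(QOS_CLASS_DEFAULT, 0)];
    workItem1.readyHandler = ^(id context, NSInteger retryCount, MTRAsyncWorkCompletionBlock completion) {
        sleep(1);
        firstItemCompleted = YES;
        completion(MTRAsyncWorkComplete);
    };
    [workQueue enqueueWorkItem:workItem1 description:@"first work item"];

    MTRAsyncWorkItem * workItem2 = [[MTRAsyncWorkItem alloc] initWithQueue:dispatch_get_global_queue(QOS_CLASS_DEFAULT, 0)];
    workItem2.readyHandler = ^(id context, NSInteger retryCount, MTRAsyncWorkCompletionBlock completion) {
        // With width 1 the first item must already be done by the time this runs.
        XCTAssertTrue(firstItemCompleted);
        [secondItemRanExpectation fulfill];
        completion(MTRAsyncWorkComplete);
    };
    [workQueue enqueueWorkItem:workItem2 description:@"second work item"];

    [self waitForExpectationsWithTimeout:5 handler:nil];
}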