[Darwin] Add optional concurrent execution to MTRAsyncWorkQueue (#33154)

* [Darwin] Add optional concurrent execution to MTRAsyncWorkQueue

* Update src/darwin/Framework/CHIPTests/MTRAsyncWorkQueueTests.m

Co-authored-by: Boris Zbarsky <bzbarsky@apple.com>

* Address review comments

* More locking to make thread sanitizer happy

---------

Co-authored-by: Boris Zbarsky <bzbarsky@apple.com>
diff --git a/src/darwin/Framework/CHIP/MTRAsyncWorkQueue.h b/src/darwin/Framework/CHIP/MTRAsyncWorkQueue.h
index b584e30..598bbf0 100644
--- a/src/darwin/Framework/CHIP/MTRAsyncWorkQueue.h
+++ b/src/darwin/Framework/CHIP/MTRAsyncWorkQueue.h
@@ -192,6 +192,17 @@
 /// is lost.
 - (instancetype)initWithContext:(ContextType)context;
 
+/// Creates a work queue with the given context object and a queue width.
+///
+/// The queue will call readyHandler on up to "width" number of work items
+/// concurrently. Once "width" number of work items have started, no other
+/// work items will get a readyHandler call until one of the running work items
+/// has called its completion block with MTRAsyncWorkComplete.
+///
+/// This allows the a MTRAsyncWorkQueue object to manage a pool of
+/// resources that can be use concurrently at any given time.
+- (instancetype)initWithContext:(ContextType)context width:(NSUInteger)width;
+
 /// Enqueues the specified work item, making it eligible for execution.
 ///
 /// Once a work item is enqueued, ownership of it passes to the queue and
diff --git a/src/darwin/Framework/CHIP/MTRAsyncWorkQueue.mm b/src/darwin/Framework/CHIP/MTRAsyncWorkQueue.mm
index eff85b1..53a7195 100644
--- a/src/darwin/Framework/CHIP/MTRAsyncWorkQueue.mm
+++ b/src/darwin/Framework/CHIP/MTRAsyncWorkQueue.mm
@@ -197,7 +197,8 @@
     os_unfair_lock _lock;
     __weak id _context;
     NSMutableArray<MTRAsyncWorkItem *> * _items;
-    NSInteger _runningWorkItemCount;
+    NSUInteger _runningWorkItemCount;
+    NSUInteger _width;
 }
 
 // A helper struct that facilitates access to _context while
@@ -217,10 +218,16 @@
 
 - (instancetype)initWithContext:(id)context
 {
+    return [self initWithContext:context width:1];
+}
+
+- (instancetype)initWithContext:(id)context width:(NSUInteger)width
+{
     NSParameterAssert(context);
     if (self = [super init]) {
         _context = context;
         _items = [NSMutableArray array];
+        _width = width;
     }
     return self;
 }
@@ -286,35 +293,84 @@
 {
     os_unfair_lock_assert_owner(&_lock);
 
-    MTRAsyncWorkItem * runningWorkItem = (_runningWorkItemCount) ? _items.firstObject : nil;
-    if (workItem != runningWorkItem) {
+    BOOL foundWorkItem = NO;
+    NSUInteger indexOfWorkItem = 0;
+    for (NSUInteger i = 0; i < _width; i++) {
+        if (_items[i] == workItem) {
+            foundWorkItem = YES;
+            indexOfWorkItem = i;
+            break;
+        }
+    }
+    if (!foundWorkItem) {
         NSAssert(NO, @"work item to post-process is not running");
         return;
     }
 
+    // already part of the running work items allowed by width - retry directly
     if (retry) {
         MTR_LOG_DEFAULT("MTRAsyncWorkQueue<%@> retry needed for work item [%llu]", context.description, workItem.uniqueID);
-    } else {
-        [workItem markComplete];
-        [_items removeObjectAtIndex:0];
-        MTR_LOG_DEFAULT("MTRAsyncWorkQueue<%@, items count: %tu> completed work item [%llu]", context.description, _items.count, workItem.uniqueID);
+        [self _callWorkItem:workItem withContext:context];
+        return;
     }
 
-    // when "concurrency width" is implemented this will be decremented instead
-    _runningWorkItemCount = 0;
+    [workItem markComplete];
+    [_items removeObjectAtIndex:indexOfWorkItem];
+    MTR_LOG_DEFAULT("MTRAsyncWorkQueue<%@, items count: %tu> completed work item [%llu]", context.description, _items.count, workItem.uniqueID);
+
+    // sanity check running work item count is positive
+    if (_runningWorkItemCount == 0) {
+        NSAssert(NO, @"running work item count should be positive");
+        return;
+    }
+
+    _runningWorkItemCount--;
     [self _callNextReadyWorkItemWithContext:context];
 }
 
+- (void)_callWorkItem:(MTRAsyncWorkItem *)workItem withContext:(ContextSnapshot const &)context
+{
+    os_unfair_lock_assert_owner(&_lock);
+
+    mtr_weakify(self);
+    [workItem callReadyHandlerWithContext:context.reference completion:^(MTRAsyncWorkOutcome outcome) {
+        mtr_strongify(self);
+        BOOL handled = NO;
+        if (self) {
+            ContextSnapshot context(self); // re-acquire a new snapshot
+            std::lock_guard lock(self->_lock);
+            if (!workItem.isComplete) {
+                [self _postProcessWorkItem:workItem context:context retry:(outcome == MTRAsyncWorkNeedsRetry)];
+                handled = YES;
+            }
+        }
+        return handled;
+    }];
+}
+
 - (void)_callNextReadyWorkItemWithContext:(ContextSnapshot const &)context
 {
     os_unfair_lock_assert_owner(&_lock);
 
-    // when "concurrency width" is implemented this will be checked against the width
-    if (_runningWorkItemCount) {
-        return; // can't run next work item until the current one is done
+    // sanity check not running more than allowed
+    if (_runningWorkItemCount > _width) {
+        NSAssert(NO, @"running work item count larger than the maximum width");
+        return;
     }
 
-    if (!_items.count) {
+    // sanity check consistent counts
+    if (_items.count < _runningWorkItemCount) {
+        NSAssert(NO, @"work item count is less than running work item count");
+        return;
+    }
+
+    // can't run more work items if already running at max concurrent width
+    if (_runningWorkItemCount == _width) {
+        return;
+    }
+
+    // no more items to run
+    if (_items.count == _runningWorkItemCount) {
         return; // nothing to run
     }
 
@@ -324,16 +380,16 @@
         return;
     }
 
-    // when "concurrency width" is implemented this will be incremented instead
-    _runningWorkItemCount = 1;
+    NSUInteger nextWorkItemToRunIndex = _runningWorkItemCount;
+    MTRAsyncWorkItem * workItem = _items[nextWorkItemToRunIndex];
+    _runningWorkItemCount++;
 
-    MTRAsyncWorkItem * workItem = _items.firstObject;
-
-    // Check if batching is possible or needed. Only ask work item to batch once for simplicity
+    // Check if batching is possible or needed.
     auto batchingHandler = workItem.batchingHandler;
-    if (batchingHandler && workItem.retryCount == 0) {
-        while (_items.count >= 2) {
-            MTRAsyncWorkItem * nextWorkItem = _items[1];
+    if (batchingHandler) {
+        while (_items.count > _runningWorkItemCount) {
+            NSUInteger firstNonRunningItemIndex = _runningWorkItemCount;
+            MTRAsyncWorkItem * nextWorkItem = _items[firstNonRunningItemIndex];
             if (!nextWorkItem.batchingHandler || nextWorkItem.batchingID != workItem.batchingID) {
                 goto done; // next item is not eligible to merge with this one
             }
@@ -355,20 +411,7 @@
     done:;
     }
 
-    mtr_weakify(self);
-    [workItem callReadyHandlerWithContext:context.reference completion:^(MTRAsyncWorkOutcome outcome) {
-        mtr_strongify(self);
-        BOOL handled = NO;
-        if (self) {
-            ContextSnapshot context(self); // re-acquire a new snapshot
-            std::lock_guard lock(self->_lock);
-            if (!workItem.isComplete) {
-                [self _postProcessWorkItem:workItem context:context retry:(outcome == MTRAsyncWorkNeedsRetry)];
-                handled = YES;
-            }
-        }
-        return handled;
-    }];
+    [self _callWorkItem:workItem withContext:context];
 }
 
 - (BOOL)hasDuplicateForTypeID:(NSUInteger)opaqueDuplicateTypeID workItemData:(id)opaqueWorkItemData
diff --git a/src/darwin/Framework/CHIPTests/MTRAsyncWorkQueueTests.m b/src/darwin/Framework/CHIPTests/MTRAsyncWorkQueueTests.m
index 031c453..3207a85 100644
--- a/src/darwin/Framework/CHIPTests/MTRAsyncWorkQueueTests.m
+++ b/src/darwin/Framework/CHIPTests/MTRAsyncWorkQueueTests.m
@@ -491,4 +491,70 @@
     [self waitForExpectationsWithTimeout:1 handler:nil];
 }
 
+- (void)testItemsConcurrently
+{
+    MTRAsyncWorkQueue * workQueue = [[MTRAsyncWorkQueue alloc] initWithContext:NSNull.null width:3];
+
+    XCTestExpectation * first3WorkItemsExecutedExpectation = [self expectationWithDescription:@"First 3 work items executed"];
+    XCTestExpectation * first3WorkItemsSleptExpectation = [self expectationWithDescription:@"First 3 work items slept"];
+    __block os_unfair_lock counterLock = OS_UNFAIR_LOCK_INIT;
+    __block int beforeSleepCounter = 0;
+    __block int afterSleepCounter = 0;
+    __auto_type sleep1ReadyHandler = ^(id context, NSInteger retryCount, MTRAsyncWorkCompletionBlock completion) {
+        os_unfair_lock_lock(&counterLock);
+        beforeSleepCounter++;
+        if (beforeSleepCounter == 3) {
+            [first3WorkItemsExecutedExpectation fulfill];
+        }
+        os_unfair_lock_unlock(&counterLock);
+        sleep(1);
+        os_unfair_lock_lock(&counterLock);
+        afterSleepCounter++;
+        if (afterSleepCounter == 3) {
+            [first3WorkItemsSleptExpectation fulfill];
+        }
+        os_unfair_lock_unlock(&counterLock);
+        completion(MTRAsyncWorkComplete);
+    };
+
+    MTRAsyncWorkItem * workItem1 = [[MTRAsyncWorkItem alloc] initWithQueue:dispatch_get_global_queue(QOS_CLASS_DEFAULT, 0)];
+    workItem1.readyHandler = sleep1ReadyHandler;
+    [workQueue enqueueWorkItem:workItem1 descriptionWithFormat:@"work item %d", 1];
+
+    MTRAsyncWorkItem * workItem2 = [[MTRAsyncWorkItem alloc] initWithQueue:dispatch_get_global_queue(QOS_CLASS_DEFAULT, 0)];
+    workItem2.readyHandler = sleep1ReadyHandler;
+    [workQueue enqueueWorkItem:workItem2 descriptionWithFormat:@"work item %d", 2];
+
+    MTRAsyncWorkItem * workItem3 = [[MTRAsyncWorkItem alloc] initWithQueue:dispatch_get_global_queue(QOS_CLASS_DEFAULT, 0)];
+    workItem3.readyHandler = sleep1ReadyHandler;
+    [workQueue enqueueWorkItem:workItem3 descriptionWithFormat:@"work item %d", 3];
+
+    // This is the item after the first 3, and should only execute when one of them finished
+    XCTestExpectation * lastWorkItemWaitedExpectation = [self expectationWithDescription:@"Last work item waited properly"];
+    MTRAsyncWorkItem * workItemLast = [[MTRAsyncWorkItem alloc] initWithQueue:dispatch_get_global_queue(QOS_CLASS_DEFAULT, 0)];
+    workItemLast.readyHandler = ^(id context, NSInteger retryCount, MTRAsyncWorkCompletionBlock completion) {
+        // expect this to have waited until at least one of the above items finished after sleep() and incremented counter
+        os_unfair_lock_lock(&counterLock);
+        XCTAssert(afterSleepCounter > 0);
+        [lastWorkItemWaitedExpectation fulfill];
+        os_unfair_lock_unlock(&counterLock);
+        completion(MTRAsyncWorkComplete);
+    };
+    [workQueue enqueueWorkItem:workItemLast description:@"last work item"];
+
+    [self waitForExpectations:@[ first3WorkItemsExecutedExpectation ] timeout:2];
+    // the before-sleep counter should have reached 3 immediately as they all run concurrently.
+    os_unfair_lock_lock(&counterLock);
+    XCTAssertEqual(afterSleepCounter, 0);
+    os_unfair_lock_unlock(&counterLock);
+
+    [self waitForExpectations:@[ lastWorkItemWaitedExpectation, first3WorkItemsSleptExpectation ] timeout:2];
+
+    // see that all 3 first items ran and slept
+    os_unfair_lock_lock(&counterLock);
+    XCTAssertEqual(beforeSleepCounter, 3);
+    XCTAssertEqual(afterSleepCounter, 3);
+    os_unfair_lock_unlock(&counterLock);
+}
+
 @end