| /** |
| * |
| * Copyright (c) 2022 Project CHIP Authors |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #import "MTRAsyncWorkQueue.h" |
| #import "MTRDefines_Internal.h" |
| #import "MTRLogging_Internal.h" |
| #import "MTRUnfairLock.h" |
| |
| #import <atomic> |
| #import <os/lock.h> |
| |
| NS_ASSUME_NONNULL_BEGIN |
| |
| typedef NS_ENUM(NSInteger, MTRAsyncWorkItemState) { |
| MTRAsyncWorkItemMutable, |
| MTRAsyncWorkItemComplete, |
| MTRAsyncWorkItemEnqueued, |
| MTRAsyncWorkItemRunning, |
| MTRAsyncWorkItemRetryCountBase = MTRAsyncWorkItemRunning, // values >= MTRAsyncWorkItemRunning encode retryCount |
| }; |
| |
| MTR_DIRECT_MEMBERS |
| @implementation MTRAsyncWorkItem { |
| dispatch_queue_t _queue; |
| MTRAsyncWorkItemState _state; // protected by queue lock once enqueued |
| } |
| |
| #pragma mark Configuration by the client |
| |
| - (instancetype)initWithQueue:(dispatch_queue_t)queue |
| { |
| NSParameterAssert(queue); |
| if (self = [super init]) { |
| static std::atomic<NSUInteger> nextUniqueID(1); |
| _uniqueID = nextUniqueID++; |
| _queue = queue; |
| _state = MTRAsyncWorkItemMutable; |
| } |
| return self; |
| } |
| |
| - (void)setReadyHandler:(nullable void (^)(id context, NSInteger retryCount, MTRAsyncWorkCompletionBlock completion))readyHandler |
| { |
| [self assertMutable]; |
| _readyHandler = readyHandler; |
| } |
| |
| - (void)setCancelHandler:(nullable void (^)(void))cancelHandler |
| { |
| [self assertMutable]; |
| _cancelHandler = cancelHandler; |
| } |
| |
| - (void)setBatchingID:(NSUInteger)opaqueBatchingID data:(id)opaqueBatchableData handler:(MTRAsyncWorkBatchingHandler)batchingHandler |
| { |
| NSParameterAssert(batchingHandler); |
| [self assertMutable]; |
| _batchingID = opaqueBatchingID; |
| _batchableData = opaqueBatchableData; |
| _batchingHandler = batchingHandler; |
| } |
| |
| - (void)setDuplicateTypeID:(NSUInteger)opaqueDuplicateTypeID handler:(MTRAsyncWorkDuplicateCheckHandler)duplicateCheckHandler |
| { |
| NSParameterAssert(duplicateCheckHandler); |
| [self assertMutable]; |
| _duplicateTypeID = opaqueDuplicateTypeID; |
| _duplicateCheckHandler = duplicateCheckHandler; |
| } |
| |
| - (void)assertMutable |
| { |
| NSAssert(_state == MTRAsyncWorkItemMutable, @"work item is not mutable (%ld)", (long) _state); |
| } |
| |
| #pragma mark Management by the work queue (queue lock held) |
| |
| - (void)markEnqueued |
| { |
| [self assertMutable]; |
| _state = MTRAsyncWorkItemEnqueued; |
| } |
| |
| - (NSInteger)retryCount |
| { |
| switch (_state) { |
| case MTRAsyncWorkItemMutable: |
| case MTRAsyncWorkItemComplete: |
| case MTRAsyncWorkItemEnqueued: |
| return 0; |
| default: |
| return ((NSInteger) _state) - MTRAsyncWorkItemRetryCountBase; |
| } |
| } |
| |
| - (void)callReadyHandlerWithContext:(id)context completion:(MTRAsyncWorkCompletionBlock)completion |
| { |
| NSAssert(_state >= MTRAsyncWorkItemEnqueued, @"work item is not enqueued (%ld)", (long) _state); |
| NSInteger retryCount = 0; |
| if (_state == MTRAsyncWorkItemEnqueued) { |
| _state = MTRAsyncWorkItemRunning; |
| } else if (_state >= MTRAsyncWorkItemRunning) { |
| retryCount = (_state - MTRAsyncWorkItemRetryCountBase) + 1; // increment retryCount |
| _state = (MTRAsyncWorkItemState) (MTRAsyncWorkItemRetryCountBase + retryCount); |
| } else { |
| return; // asserted above |
| } |
| |
| // Always dispatch even if there is no readyHandler as this avoids synchronously |
| // re-entering the MTRAsyncWorkQueueCode, simplifying the implementation. |
| auto uniqueID = _uniqueID; |
| auto readyHandler = _readyHandler; |
| dispatch_async(_queue, ^{ |
| if (!retryCount) { |
| MTR_LOG_DEFAULT("MTRAsyncWorkQueue<%@> executing work item [%llu]", context, uniqueID); |
| } else { |
| MTR_LOG_DEFAULT("MTRAsyncWorkQueue<%@> executing work item [%llu] (retry %zd)", context, uniqueID, retryCount); |
| } |
| if (readyHandler) { |
| readyHandler(context, retryCount, completion); |
| } else { |
| completion(MTRAsyncWorkComplete); |
| } |
| }); |
| } |
| |
| - (void)cancel |
| { |
| if (_state != MTRAsyncWorkItemComplete) { |
| auto cancelHandler = _cancelHandler; |
| [self markComplete]; |
| if (cancelHandler) { |
| // Note that if the work item was running it may call the work |
| // completion handler before the cancel handler actually runs, |
| // however in this case the work completion handler will return |
| // NO, giving the work code the ability to deal with this race if |
| // necessary. |
| dispatch_async(_queue, cancelHandler); |
| } |
| } |
| } |
| |
| - (BOOL)isComplete |
| { |
| return _state == MTRAsyncWorkItemComplete; |
| } |
| |
| - (void)markComplete |
| { |
| NSAssert(_state >= MTRAsyncWorkItemEnqueued, @"work item was not enqueued (%ld)", (long) _state); |
| _state = MTRAsyncWorkItemComplete; |
| |
| // Clear all handlers in case any of them captured this object. |
| _readyHandler = nil; |
| _cancelHandler = nil; |
| _batchingHandler = nil; |
| _duplicateCheckHandler = nil; |
| } |
| |
| - (NSString *)description |
| { |
| NSString * state; |
| switch (_state) { |
| case MTRAsyncWorkItemMutable: |
| state = @"mutable"; |
| break; |
| case MTRAsyncWorkItemComplete: |
| state = @"complete"; |
| break; |
| case MTRAsyncWorkItemEnqueued: |
| state = @"enqueued"; |
| break; |
| default: |
| return [NSString stringWithFormat:@"<%@ %llu running retry: %tu>", self.class, _uniqueID, self.retryCount]; |
| } |
| return [NSString stringWithFormat:@"<%@ %llu %@>", self.class, _uniqueID, state]; |
| } |
| |
| @end |
| |
| MTR_DIRECT_MEMBERS |
| @implementation MTRAsyncWorkQueue { |
| os_unfair_lock _lock; |
| __weak id _context; |
| NSMutableArray<MTRAsyncWorkItem *> * _items; |
| NSUInteger _runningWorkItemCount; |
| NSUInteger _width; |
| } |
| |
| // A helper struct that facilitates access to _context while |
| // - only reading the _context weak reference once and retaining a strong |
| // reference for the duration of a particular queue method call |
| // - avoiding calls to `[context description]` under our lock |
| struct ContextSnapshot { |
| id _Nullable reference; |
| NSString * _Nullable description; |
| ContextSnapshot(MTRAsyncWorkQueue * queue) |
| { |
| os_unfair_lock_assert_not_owner(&queue->_lock); |
| reference = queue->_context; |
| description = [reference description]; |
| } |
| }; |
| |
| - (instancetype)initWithContext:(id)context |
| { |
| return [self initWithContext:context width:1]; |
| } |
| |
| - (instancetype)initWithContext:(id)context width:(NSUInteger)width |
| { |
| NSParameterAssert(context); |
| if (self = [super init]) { |
| _context = context; |
| _items = [NSMutableArray array]; |
| _width = width; |
| } |
| return self; |
| } |
| |
| - (NSString *)description |
| { |
| ContextSnapshot context(self); |
| std::lock_guard lock(_lock); |
| return [NSString stringWithFormat:@"<%@ context: %@, items count: %tu>", self.class, context.description, _items.count]; |
| } |
| |
| - (void)enqueueWorkItem:(MTRAsyncWorkItem *)item |
| { |
| [self enqueueWorkItem:item description:nil]; |
| } |
| |
| - (void)enqueueWorkItem:(MTRAsyncWorkItem *)item descriptionWithFormat:(NSString *)format, ... |
| { |
| va_list args; |
| va_start(args, format); |
| NSString * description = [[NSString alloc] initWithFormat:format arguments:args]; |
| va_end(args); |
| [self enqueueWorkItem:item description:description]; |
| } |
| |
| - (void)enqueueWorkItem:(MTRAsyncWorkItem *)item |
| description:(nullable NSString *)description |
| { |
| NSParameterAssert(item); |
| ContextSnapshot context(self); // outside of lock |
| NSAssert(context.reference, @"context has been lost"); |
| |
| std::lock_guard lock(_lock); |
| [item markEnqueued]; |
| [_items addObject:item]; |
| |
| if (description) { |
| // Logging the description once is enough because other log messages |
| // related to the work item (execution, completion etc) can easily be |
| // correlated using the unique id. |
| MTR_LOG_DEFAULT("MTRAsyncWorkQueue<%@, items count: %tu> enqueued work item [%llu]: %@", context.description, _items.count, item.uniqueID, description); |
| } else { |
| MTR_LOG_DEFAULT("MTRAsyncWorkQueue<%@, items count: %tu> enqueued work item [%llu]", context.description, _items.count, item.uniqueID); |
| } |
| |
| [self _callNextReadyWorkItemWithContext:context]; |
| } |
| |
| - (void)invalidate |
| { |
| ContextSnapshot context(self); // outside of lock |
| std::lock_guard lock(_lock); |
| MTR_LOG_INFO("MTRAsyncWorkQueue<%@> invalidate %tu items", context.description, _items.count); |
| for (MTRAsyncWorkItem * item in _items) { |
| [item cancel]; |
| } |
| [_items removeAllObjects]; |
| } |
| |
| - (void)_postProcessWorkItem:(MTRAsyncWorkItem *)workItem |
| context:(ContextSnapshot const &)context |
| retry:(BOOL)retry |
| { |
| os_unfair_lock_assert_owner(&_lock); |
| |
| BOOL foundWorkItem = NO; |
| NSUInteger indexOfWorkItem = 0; |
| for (NSUInteger i = 0; i < _width; i++) { |
| if (_items[i] == workItem) { |
| foundWorkItem = YES; |
| indexOfWorkItem = i; |
| break; |
| } |
| } |
| if (!foundWorkItem) { |
| NSAssert(NO, @"work item to post-process is not running"); |
| return; |
| } |
| |
| // already part of the running work items allowed by width - retry directly |
| if (retry) { |
| MTR_LOG_DEFAULT("MTRAsyncWorkQueue<%@> retry needed for work item [%llu]", context.description, workItem.uniqueID); |
| [self _callWorkItem:workItem withContext:context]; |
| return; |
| } |
| |
| [workItem markComplete]; |
| [_items removeObjectAtIndex:indexOfWorkItem]; |
| MTR_LOG_DEFAULT("MTRAsyncWorkQueue<%@, items count: %tu> completed work item [%llu]", context.description, _items.count, workItem.uniqueID); |
| |
| // sanity check running work item count is positive |
| if (_runningWorkItemCount == 0) { |
| NSAssert(NO, @"running work item count should be positive"); |
| return; |
| } |
| |
| _runningWorkItemCount--; |
| [self _callNextReadyWorkItemWithContext:context]; |
| } |
| |
| - (void)_callWorkItem:(MTRAsyncWorkItem *)workItem withContext:(ContextSnapshot const &)context |
| { |
| os_unfair_lock_assert_owner(&_lock); |
| |
| mtr_weakify(self); |
| [workItem callReadyHandlerWithContext:context.reference completion:^(MTRAsyncWorkOutcome outcome) { |
| mtr_strongify(self); |
| BOOL handled = NO; |
| if (self) { |
| ContextSnapshot context(self); // re-acquire a new snapshot |
| std::lock_guard lock(self->_lock); |
| if (!workItem.isComplete) { |
| [self _postProcessWorkItem:workItem context:context retry:(outcome == MTRAsyncWorkNeedsRetry)]; |
| handled = YES; |
| } |
| } |
| return handled; |
| }]; |
| } |
| |
| - (void)_callNextReadyWorkItemWithContext:(ContextSnapshot const &)context |
| { |
| os_unfair_lock_assert_owner(&_lock); |
| |
| // sanity check not running more than allowed |
| if (_runningWorkItemCount > _width) { |
| NSAssert(NO, @"running work item count larger than the maximum width"); |
| return; |
| } |
| |
| // sanity check consistent counts |
| if (_items.count < _runningWorkItemCount) { |
| NSAssert(NO, @"work item count is less than running work item count"); |
| return; |
| } |
| |
| // can't run more work items if already running at max concurrent width |
| if (_runningWorkItemCount == _width) { |
| return; |
| } |
| |
| // no more items to run |
| if (_items.count == _runningWorkItemCount) { |
| return; // nothing to run |
| } |
| |
| if (!context.reference) { |
| MTR_LOG_ERROR("MTRAsyncWorkQueue<%@> context has been lost, dropping queued work items", (id) nil); |
| [_items removeAllObjects]; |
| return; |
| } |
| |
| NSUInteger nextWorkItemToRunIndex = _runningWorkItemCount; |
| MTRAsyncWorkItem * workItem = _items[nextWorkItemToRunIndex]; |
| _runningWorkItemCount++; |
| |
| // Check if batching is possible or needed. |
| auto batchingHandler = workItem.batchingHandler; |
| if (batchingHandler) { |
| while (_items.count > _runningWorkItemCount) { |
| NSUInteger firstNonRunningItemIndex = _runningWorkItemCount; |
| MTRAsyncWorkItem * nextWorkItem = _items[firstNonRunningItemIndex]; |
| if (!nextWorkItem.batchingHandler || nextWorkItem.batchingID != workItem.batchingID) { |
| goto done; // next item is not eligible to merge with this one |
| } |
| |
| switch (batchingHandler(workItem.batchableData, nextWorkItem.batchableData)) { |
| case MTRNotBatched: |
| goto done; // can't merge anything else |
| case MTRBatchedPartially: |
| MTR_LOG_DEFAULT("MTRAsyncWorkQueue<%@> partially merged work item [%llu] into %llu", |
| context.description, nextWorkItem.uniqueID, workItem.uniqueID); |
| goto done; // can't merge anything else |
| case MTRBatchedFully: |
| MTR_LOG_DEFAULT("MTRAsyncWorkQueue<%@> fully merged work item [%llu] into %llu", |
| context.description, nextWorkItem.uniqueID, workItem.uniqueID); |
| [_items removeObjectAtIndex:1]; |
| continue; // try to batch the next item (if any) |
| } |
| } |
| done:; |
| } |
| |
| [self _callWorkItem:workItem withContext:context]; |
| } |
| |
| - (BOOL)hasDuplicateForTypeID:(NSUInteger)opaqueDuplicateTypeID workItemData:(id)opaqueWorkItemData |
| { |
| std::lock_guard lock(_lock); |
| // Start from the last item |
| for (MTRAsyncWorkItem * item in [_items reverseObjectEnumerator]) { |
| auto duplicateCheckHandler = item.duplicateCheckHandler; |
| if (duplicateCheckHandler && item.duplicateTypeID == opaqueDuplicateTypeID) { |
| BOOL stop = NO; |
| BOOL isDuplicate = NO; |
| duplicateCheckHandler(opaqueWorkItemData, &isDuplicate, &stop); |
| if (stop) { |
| return isDuplicate; |
| } |
| } |
| } |
| |
| return NO; |
| } |
| |
| @end |
| |
| NS_ASSUME_NONNULL_END |