blob: 65b8424e69a9a06d9a05c17b2d53e77af686a6f1 [file] [log] [blame]
/**
*
* Copyright (c) 2022 Project CHIP Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#import <dispatch/dispatch.h>
#import <os/lock.h>
#import "MTRAsyncCallbackWorkQueue.h"
#import "MTRLogging.h"
#pragma mark - Class extensions
@interface MTRAsyncCallbackWorkQueue ()
// The lock protects the internal state of the work queue so that these may be called from any queue or thread:
// -enqueueWorkItem:
// -invalidate
// -endWork:
// -retryWork:
@property (nonatomic, readonly) os_unfair_lock lock;
@property (nonatomic, strong, readonly) id context;
@property (nonatomic, strong, readonly) dispatch_queue_t queue;
@property (nonatomic, strong, readonly) NSMutableArray<MTRAsyncCallbackQueueWorkItem *> * items;
@property (nonatomic, readwrite) NSUInteger runningWorkItemCount;
// For WorkItem's use only - the parameter is for sanity check
- (void)endWork:(MTRAsyncCallbackQueueWorkItem *)workItem;
- (void)retryWork:(MTRAsyncCallbackQueueWorkItem *)workItem;
@end
@interface MTRAsyncCallbackQueueWorkItem ()
@property (nonatomic, strong, readonly) dispatch_queue_t queue;
@property (nonatomic, readwrite) NSUInteger retryCount;
@property (nonatomic, strong) MTRAsyncCallbackWorkQueue * workQueue;
// Called by the queue
- (void)callReadyHandlerWithContext:(id)context;
- (void)cancel;
@end
#pragma mark - Class implementations
@implementation MTRAsyncCallbackWorkQueue
- (instancetype)initWithContext:(id)context queue:(dispatch_queue_t)queue
{
if (self = [super init]) {
_lock = OS_UNFAIR_LOCK_INIT;
_context = context;
_queue = queue;
_items = [NSMutableArray array];
MTR_LOG_INFO("MTRAsyncCallbackWorkQueue init for context %@", context);
}
return self;
}
- (NSString *)description
{
return [NSString
stringWithFormat:@"MTRAsyncCallbackWorkQueue context: %@ items count: %lu", self.context, (unsigned long) self.items.count];
}
- (void)enqueueWorkItem:(MTRAsyncCallbackQueueWorkItem *)item
{
os_unfair_lock_lock(&_lock);
item.workQueue = self;
[self.items addObject:item];
[self _callNextReadyWorkItem];
os_unfair_lock_unlock(&_lock);
}
- (void)invalidate
{
os_unfair_lock_lock(&_lock);
NSMutableArray * invalidateItems = _items;
_items = nil;
os_unfair_lock_unlock(&_lock);
MTR_LOG_INFO(
"MTRAsyncCallbackWorkQueue invalidate for context %@ items count: %lu", _context, (unsigned long) invalidateItems.count);
for (MTRAsyncCallbackQueueWorkItem * item in invalidateItems) {
[item cancel];
}
[invalidateItems removeAllObjects];
}
// called after executing a work item
- (void)_postProcessWorkItem:(MTRAsyncCallbackQueueWorkItem *)workItem retry:(BOOL)retry
{
os_unfair_lock_lock(&_lock);
// sanity check if running
if (!self.runningWorkItemCount) {
// something is wrong with state - nothing is currently running
os_unfair_lock_unlock(&_lock);
MTR_LOG_ERROR("MTRAsyncCallbackWorkQueue endWork: no work is running on work queue");
return;
}
// sanity check the same work item is running
// when "concurrency width" is implemented need to check first N items
MTRAsyncCallbackQueueWorkItem * firstWorkItem = self.items.firstObject;
if (firstWorkItem != workItem) {
// something is wrong with this work item - should not be currently running
os_unfair_lock_unlock(&_lock);
MTR_LOG_ERROR("MTRAsyncCallbackWorkQueue endWork: work item is not first on work queue");
return;
}
// if work item is done (no need to retry), remove from queue and call ready on the next item
if (!retry) {
[self.items removeObjectAtIndex:0];
}
// when "concurrency width" is implemented this will be decremented instead
self.runningWorkItemCount = 0;
[self _callNextReadyWorkItem];
os_unfair_lock_unlock(&_lock);
}
- (void)endWork:(MTRAsyncCallbackQueueWorkItem *)workItem
{
[self _postProcessWorkItem:workItem retry:NO];
}
- (void)retryWork:(MTRAsyncCallbackQueueWorkItem *)workItem
{
[self _postProcessWorkItem:workItem retry:YES];
}
// assume lock is held while calling this
- (void)_callNextReadyWorkItem
{
// when "concurrency width" is implemented this will be checked against the width
if (self.runningWorkItemCount) {
// can't run next work item until the current one is done
return;
}
// only proceed to mark queue as running if there are items to run
if (self.items.count) {
// when "concurrency width" is implemented this will be incremented instead
self.runningWorkItemCount = 1;
MTRAsyncCallbackQueueWorkItem * workItem = self.items.firstObject;
[workItem callReadyHandlerWithContext:self.context];
}
}
@end
@implementation MTRAsyncCallbackQueueWorkItem
- (instancetype)initWithQueue:(dispatch_queue_t)queue
{
if (self = [super init]) {
_queue = queue;
}
return self;
}
- (void)endWork
{
[self.workQueue endWork:self];
}
- (void)retryWork
{
[self.workQueue retryWork:self];
}
// Called by the work queue
- (void)callReadyHandlerWithContext:(id)context
{
dispatch_async(self.queue, ^{
self.readyHandler(context, self.retryCount);
self.retryCount++;
});
}
// Called by the work queue
- (void)cancel
{
dispatch_async(self.queue, ^{
self.cancelHandler();
});
}
@end