Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Darwin] Add optional concurrent execution to MTRAsyncWorkQueue #33154

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/darwin/Framework/CHIP/MTRAsyncWorkQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,17 @@ MTR_TESTABLE
/// is lost.
- (instancetype)initWithContext:(ContextType)context;

/// Creates a work queue with the given context object and a queue width.
///
/// The queue will call readyHandler on up to "width" number of work items
/// concurrently. Once "width" number of work items have started, no other
/// work items will get a readyHandler call until one of the running work items
/// has called its completion block with MTRAsyncWorkComplete.
///
/// This allows an MTRAsyncWorkQueue object to manage a pool of
/// resources that can be used concurrently at any given time.
- (instancetype)initWithContext:(ContextType)context width:(NSUInteger)width;

/// Enqueues the specified work item, making it eligible for execution.
///
/// Once a work item is enqueued, ownership of it passes to the queue and
Expand Down
113 changes: 78 additions & 35 deletions src/darwin/Framework/CHIP/MTRAsyncWorkQueue.mm
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,8 @@ @implementation MTRAsyncWorkQueue {
os_unfair_lock _lock;
__weak id _context;
NSMutableArray<MTRAsyncWorkItem *> * _items;
NSInteger _runningWorkItemCount;
NSUInteger _runningWorkItemCount;
NSUInteger _width;
}

// A helper struct that facilitates access to _context while
Expand All @@ -216,11 +217,17 @@ @implementation MTRAsyncWorkQueue {
};

/// Convenience initializer: delegates to the designated initializer with a
/// width of 1, preserving the original serial (one-work-item-at-a-time)
/// queue behavior for existing callers.
- (instancetype)initWithContext:(id)context
{
return [self initWithContext:context width:1];
}

/// Designated-style initializer.
///
/// @param context The queue's context object; must not be nil. Held weakly,
///                so work stops being dispatched once the context is lost.
/// @param width   Maximum number of work items whose readyHandler may be
///                running concurrently. Must be at least 1.
- (instancetype)initWithContext:(id)context width:(NSUInteger)width
{
    NSParameterAssert(context);
    // A zero-width queue could never dispatch any work item (the ready-item
    // dispatch path stops once the running count reaches the width), so a
    // width of 0 would silently hang all enqueued work. Catch it in debug.
    NSParameterAssert(width >= 1);
    if (self = [super init]) {
        _context = context;
        _items = [NSMutableArray array];
        _width = width;
    }
    return self;
}
Expand Down Expand Up @@ -286,35 +293,84 @@ - (void)_postProcessWorkItem:(MTRAsyncWorkItem *)workItem
{
os_unfair_lock_assert_owner(&_lock);

MTRAsyncWorkItem * runningWorkItem = (_runningWorkItemCount) ? _items.firstObject : nil;
if (workItem != runningWorkItem) {
BOOL foundWorkItem = NO;
NSUInteger indexOfWorkItem = 0;
for (NSUInteger i = 0; i < _width; i++) {
if (_items[i] == workItem) {
foundWorkItem = YES;
indexOfWorkItem = i;
break;
}
}
if (!foundWorkItem) {
NSAssert(NO, @"work item to post-process is not running");
return;
}

// already part of the running work items allowed by width - retry directly
if (retry) {
MTR_LOG_DEFAULT("MTRAsyncWorkQueue<%@> retry needed for work item [%llu]", context.description, workItem.uniqueID);
} else {
[workItem markComplete];
[_items removeObjectAtIndex:0];
MTR_LOG_DEFAULT("MTRAsyncWorkQueue<%@, items count: %tu> completed work item [%llu]", context.description, _items.count, workItem.uniqueID);
[self _callWorkItem:workItem withContext:context];
return;
}

// when "concurrency width" is implemented this will be decremented instead
_runningWorkItemCount = 0;
[workItem markComplete];
[_items removeObjectAtIndex:indexOfWorkItem];
MTR_LOG_DEFAULT("MTRAsyncWorkQueue<%@, items count: %tu> completed work item [%llu]", context.description, _items.count, workItem.uniqueID);

// sanity check running work item count is positive
if (_runningWorkItemCount == 0) {
NSAssert(NO, @"running work item count should be positive");
return;
}

_runningWorkItemCount--;
[self _callNextReadyWorkItemWithContext:context];
}

/// Invokes the work item's readyHandler and arranges for post-processing
/// (completion or retry bookkeeping) when its completion block is called.
///
/// Must be called with _lock held (asserted below). The completion block runs
/// later, on an arbitrary queue, so it re-takes the lock and re-acquires a
/// fresh context snapshot rather than reusing the caller's.
- (void)_callWorkItem:(MTRAsyncWorkItem *)workItem withContext:(ContextSnapshot const &)context
{
os_unfair_lock_assert_owner(&_lock);

// Capture the queue weakly: it may be deallocated before the work item
// ever calls its completion block.
mtr_weakify(self);
[workItem callReadyHandlerWithContext:context.reference completion:^(MTRAsyncWorkOutcome outcome) {
mtr_strongify(self);
BOOL handled = NO;
if (self) {
ContextSnapshot context(self); // re-acquire a new snapshot
std::lock_guard lock(self->_lock);
// Guard against duplicate completion calls: only the first call for a
// not-yet-complete item is post-processed (and reported as handled).
if (!workItem.isComplete) {
[self _postProcessWorkItem:workItem context:context retry:(outcome == MTRAsyncWorkNeedsRetry)];
handled = YES;
}
}
return handled;
}];
}

- (void)_callNextReadyWorkItemWithContext:(ContextSnapshot const &)context
{
os_unfair_lock_assert_owner(&_lock);

// when "concurrency width" is implemented this will be checked against the width
if (_runningWorkItemCount) {
return; // can't run next work item until the current one is done
// sanity check not running more than allowed
if (_runningWorkItemCount > _width) {
NSAssert(NO, @"running work item count larger than the maximum width");
return;
}

if (!_items.count) {
// sanity check consistent counts
if (_items.count < _runningWorkItemCount) {
NSAssert(NO, @"work item count is less than running work item count");
return;
}

// can't run more work items if already running at max concurrent width
if (_runningWorkItemCount == _width) {
return;
}

// no more items to run
if (_items.count == _runningWorkItemCount) {
return; // nothing to run
}

Expand All @@ -324,16 +380,16 @@ - (void)_callNextReadyWorkItemWithContext:(ContextSnapshot const &)context
return;
}

// when "concurrency width" is implemented this will be incremented instead
_runningWorkItemCount = 1;

MTRAsyncWorkItem * workItem = _items.firstObject;
NSUInteger nextWorkItemToRunIndex = _runningWorkItemCount;
MTRAsyncWorkItem * workItem = _items[nextWorkItemToRunIndex];
_runningWorkItemCount++;

// Check if batching is possible or needed. Only ask work item to batch once for simplicity
// Check if batching is possible or needed.
auto batchingHandler = workItem.batchingHandler;
if (batchingHandler && workItem.retryCount == 0) {
while (_items.count >= 2) {
MTRAsyncWorkItem * nextWorkItem = _items[1];
if (batchingHandler) {
while (_items.count > _runningWorkItemCount) {
NSUInteger firstNonRunningItemIndex = _runningWorkItemCount;
MTRAsyncWorkItem * nextWorkItem = _items[firstNonRunningItemIndex];
if (!nextWorkItem.batchingHandler || nextWorkItem.batchingID != workItem.batchingID) {
goto done; // next item is not eligible to merge with this one
}
Expand All @@ -355,20 +411,7 @@ - (void)_callNextReadyWorkItemWithContext:(ContextSnapshot const &)context
done:;
}

mtr_weakify(self);
[workItem callReadyHandlerWithContext:context.reference completion:^(MTRAsyncWorkOutcome outcome) {
mtr_strongify(self);
BOOL handled = NO;
if (self) {
ContextSnapshot context(self); // re-acquire a new snapshot
std::lock_guard lock(self->_lock);
if (!workItem.isComplete) {
[self _postProcessWorkItem:workItem context:context retry:(outcome == MTRAsyncWorkNeedsRetry)];
handled = YES;
}
}
return handled;
}];
[self _callWorkItem:workItem withContext:context];
}

- (BOOL)hasDuplicateForTypeID:(NSUInteger)opaqueDuplicateTypeID workItemData:(id)opaqueWorkItemData
Expand Down
66 changes: 66 additions & 0 deletions src/darwin/Framework/CHIPTests/MTRAsyncWorkQueueTests.m
Original file line number Diff line number Diff line change
Expand Up @@ -491,4 +491,70 @@ - (void)testContextLoss
[self waitForExpectationsWithTimeout:1 handler:nil];
}

- (void)testItemsConcurrently
{
    // A queue of width 3 may run up to three readyHandlers at the same time.
    MTRAsyncWorkQueue * queue = [[MTRAsyncWorkQueue alloc] initWithContext:NSNull.null width:3];

    XCTestExpectation * allThreeStartedExpectation = [self expectationWithDescription:@"First 3 work items executed"];
    XCTestExpectation * allThreeSleptExpectation = [self expectationWithDescription:@"First 3 work items slept"];

    __block os_unfair_lock countersLock = OS_UNFAIR_LOCK_INIT;
    __block int startedCount = 0; // incremented by each item before it sleeps
    __block int sleptCount = 0; // incremented by each item after it sleeps

    // Shared handler for the first three items: count the start, sleep one
    // second, count the finish, then complete.
    __auto_type concurrentReadyHandler = ^(id context, NSInteger retryCount, MTRAsyncWorkCompletionBlock completion) {
        os_unfair_lock_lock(&countersLock);
        startedCount++;
        if (startedCount == 3) {
            [allThreeStartedExpectation fulfill];
        }
        os_unfair_lock_unlock(&countersLock);

        sleep(1);

        os_unfair_lock_lock(&countersLock);
        sleptCount++;
        if (sleptCount == 3) {
            [allThreeSleptExpectation fulfill];
        }
        os_unfair_lock_unlock(&countersLock);

        completion(MTRAsyncWorkComplete);
    };

    // Enqueue three identical sleeping items; with a width of 3 they should
    // all start without waiting on one another.
    for (int itemNumber = 1; itemNumber <= 3; itemNumber++) {
        MTRAsyncWorkItem * sleepingItem = [[MTRAsyncWorkItem alloc] initWithQueue:dispatch_get_global_queue(QOS_CLASS_DEFAULT, 0)];
        sleepingItem.readyHandler = concurrentReadyHandler;
        [queue enqueueWorkItem:sleepingItem descriptionWithFormat:@"work item %d", itemNumber];
    }

    // A fourth item must not start until one of the first three has finished.
    XCTestExpectation * lastWorkItemWaitedExpectation = [self expectationWithDescription:@"Last work item waited properly"];
    MTRAsyncWorkItem * fourthItem = [[MTRAsyncWorkItem alloc] initWithQueue:dispatch_get_global_queue(QOS_CLASS_DEFAULT, 0)];
    fourthItem.readyHandler = ^(id context, NSInteger retryCount, MTRAsyncWorkCompletionBlock completion) {
        // By the time this runs, at least one earlier item must have woken
        // from its sleep() and bumped the counter.
        os_unfair_lock_lock(&countersLock);
        XCTAssert(sleptCount > 0);
        [lastWorkItemWaitedExpectation fulfill];
        os_unfair_lock_unlock(&countersLock);
        completion(MTRAsyncWorkComplete);
    };
    [queue enqueueWorkItem:fourthItem description:@"last work item"];

    [self waitForExpectations:@[ allThreeStartedExpectation ] timeout:2];
    // All three items started concurrently, but none can have completed its
    // one-second sleep yet.
    os_unfair_lock_lock(&countersLock);
    XCTAssertEqual(sleptCount, 0);
    os_unfair_lock_unlock(&countersLock);

    [self waitForExpectations:@[ lastWorkItemWaitedExpectation, allThreeSleptExpectation ] timeout:2];

    // Every one of the first three items both started and finished sleeping.
    os_unfair_lock_lock(&countersLock);
    XCTAssertEqual(startedCount, 3);
    XCTAssertEqual(sleptCount, 3);
    os_unfair_lock_unlock(&countersLock);
}

@end
Loading