Skip to content

Commit d776522

Browse files
[Darwin] Add optional concurrent execution to MTRAsyncWorkQueue (#33154)
* [Darwin] Add optional concurrent execution to MTRAsyncWorkQueue * Update src/darwin/Framework/CHIPTests/MTRAsyncWorkQueueTests.m Co-authored-by: Boris Zbarsky <bzbarsky@apple.com> * Address review comments * More locking to make thread sanitizer happy --------- Co-authored-by: Boris Zbarsky <bzbarsky@apple.com>
1 parent 98b14ed commit d776522

File tree

3 files changed

+155
-35
lines changed

3 files changed

+155
-35
lines changed

src/darwin/Framework/CHIP/MTRAsyncWorkQueue.h

+11
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,17 @@ MTR_TESTABLE
192192
/// is lost.
193193
- (instancetype)initWithContext:(ContextType)context;
194194

195+
/// Creates a work queue with the given context object and a queue width.
196+
///
197+
/// The queue will call readyHandler on up to "width" number of work items
198+
/// concurrently. Once "width" number of work items have started, no other
199+
/// work items will get a readyHandler call until one of the running work items
200+
/// has called its completion block with MTRAsyncWorkComplete.
201+
///
202+
/// This allows the a MTRAsyncWorkQueue object to manage a pool of
203+
/// resources that can be use concurrently at any given time.
204+
- (instancetype)initWithContext:(ContextType)context width:(NSUInteger)width;
205+
195206
/// Enqueues the specified work item, making it eligible for execution.
196207
///
197208
/// Once a work item is enqueued, ownership of it passes to the queue and

src/darwin/Framework/CHIP/MTRAsyncWorkQueue.mm

+78-35
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,8 @@ @implementation MTRAsyncWorkQueue {
197197
os_unfair_lock _lock;
198198
__weak id _context;
199199
NSMutableArray<MTRAsyncWorkItem *> * _items;
200-
NSInteger _runningWorkItemCount;
200+
NSUInteger _runningWorkItemCount;
201+
NSUInteger _width;
201202
}
202203

203204
// A helper struct that facilitates access to _context while
@@ -216,11 +217,17 @@ @implementation MTRAsyncWorkQueue {
216217
};
217218

218219
- (instancetype)initWithContext:(id)context
220+
{
221+
return [self initWithContext:context width:1];
222+
}
223+
224+
- (instancetype)initWithContext:(id)context width:(NSUInteger)width
219225
{
220226
NSParameterAssert(context);
221227
if (self = [super init]) {
222228
_context = context;
223229
_items = [NSMutableArray array];
230+
_width = width;
224231
}
225232
return self;
226233
}
@@ -286,35 +293,84 @@ - (void)_postProcessWorkItem:(MTRAsyncWorkItem *)workItem
286293
{
287294
os_unfair_lock_assert_owner(&_lock);
288295

289-
MTRAsyncWorkItem * runningWorkItem = (_runningWorkItemCount) ? _items.firstObject : nil;
290-
if (workItem != runningWorkItem) {
296+
BOOL foundWorkItem = NO;
297+
NSUInteger indexOfWorkItem = 0;
298+
for (NSUInteger i = 0; i < _width; i++) {
299+
if (_items[i] == workItem) {
300+
foundWorkItem = YES;
301+
indexOfWorkItem = i;
302+
break;
303+
}
304+
}
305+
if (!foundWorkItem) {
291306
NSAssert(NO, @"work item to post-process is not running");
292307
return;
293308
}
294309

310+
// already part of the running work items allowed by width - retry directly
295311
if (retry) {
296312
MTR_LOG_DEFAULT("MTRAsyncWorkQueue<%@> retry needed for work item [%llu]", context.description, workItem.uniqueID);
297-
} else {
298-
[workItem markComplete];
299-
[_items removeObjectAtIndex:0];
300-
MTR_LOG_DEFAULT("MTRAsyncWorkQueue<%@, items count: %tu> completed work item [%llu]", context.description, _items.count, workItem.uniqueID);
313+
[self _callWorkItem:workItem withContext:context];
314+
return;
301315
}
302316

303-
// when "concurrency width" is implemented this will be decremented instead
304-
_runningWorkItemCount = 0;
317+
[workItem markComplete];
318+
[_items removeObjectAtIndex:indexOfWorkItem];
319+
MTR_LOG_DEFAULT("MTRAsyncWorkQueue<%@, items count: %tu> completed work item [%llu]", context.description, _items.count, workItem.uniqueID);
320+
321+
// sanity check running work item count is positive
322+
if (_runningWorkItemCount == 0) {
323+
NSAssert(NO, @"running work item count should be positive");
324+
return;
325+
}
326+
327+
_runningWorkItemCount--;
305328
[self _callNextReadyWorkItemWithContext:context];
306329
}
307330

331+
- (void)_callWorkItem:(MTRAsyncWorkItem *)workItem withContext:(ContextSnapshot const &)context
332+
{
333+
os_unfair_lock_assert_owner(&_lock);
334+
335+
mtr_weakify(self);
336+
[workItem callReadyHandlerWithContext:context.reference completion:^(MTRAsyncWorkOutcome outcome) {
337+
mtr_strongify(self);
338+
BOOL handled = NO;
339+
if (self) {
340+
ContextSnapshot context(self); // re-acquire a new snapshot
341+
std::lock_guard lock(self->_lock);
342+
if (!workItem.isComplete) {
343+
[self _postProcessWorkItem:workItem context:context retry:(outcome == MTRAsyncWorkNeedsRetry)];
344+
handled = YES;
345+
}
346+
}
347+
return handled;
348+
}];
349+
}
350+
308351
- (void)_callNextReadyWorkItemWithContext:(ContextSnapshot const &)context
309352
{
310353
os_unfair_lock_assert_owner(&_lock);
311354

312-
// when "concurrency width" is implemented this will be checked against the width
313-
if (_runningWorkItemCount) {
314-
return; // can't run next work item until the current one is done
355+
// sanity check not running more than allowed
356+
if (_runningWorkItemCount > _width) {
357+
NSAssert(NO, @"running work item count larger than the maximum width");
358+
return;
315359
}
316360

317-
if (!_items.count) {
361+
// sanity check consistent counts
362+
if (_items.count < _runningWorkItemCount) {
363+
NSAssert(NO, @"work item count is less than running work item count");
364+
return;
365+
}
366+
367+
// can't run more work items if already running at max concurrent width
368+
if (_runningWorkItemCount == _width) {
369+
return;
370+
}
371+
372+
// no more items to run
373+
if (_items.count == _runningWorkItemCount) {
318374
return; // nothing to run
319375
}
320376

@@ -324,16 +380,16 @@ - (void)_callNextReadyWorkItemWithContext:(ContextSnapshot const &)context
324380
return;
325381
}
326382

327-
// when "concurrency width" is implemented this will be incremented instead
328-
_runningWorkItemCount = 1;
329-
330-
MTRAsyncWorkItem * workItem = _items.firstObject;
383+
NSUInteger nextWorkItemToRunIndex = _runningWorkItemCount;
384+
MTRAsyncWorkItem * workItem = _items[nextWorkItemToRunIndex];
385+
_runningWorkItemCount++;
331386

332-
// Check if batching is possible or needed. Only ask work item to batch once for simplicity
387+
// Check if batching is possible or needed.
333388
auto batchingHandler = workItem.batchingHandler;
334-
if (batchingHandler && workItem.retryCount == 0) {
335-
while (_items.count >= 2) {
336-
MTRAsyncWorkItem * nextWorkItem = _items[1];
389+
if (batchingHandler) {
390+
while (_items.count > _runningWorkItemCount) {
391+
NSUInteger firstNonRunningItemIndex = _runningWorkItemCount;
392+
MTRAsyncWorkItem * nextWorkItem = _items[firstNonRunningItemIndex];
337393
if (!nextWorkItem.batchingHandler || nextWorkItem.batchingID != workItem.batchingID) {
338394
goto done; // next item is not eligible to merge with this one
339395
}
@@ -355,20 +411,7 @@ - (void)_callNextReadyWorkItemWithContext:(ContextSnapshot const &)context
355411
done:;
356412
}
357413

358-
mtr_weakify(self);
359-
[workItem callReadyHandlerWithContext:context.reference completion:^(MTRAsyncWorkOutcome outcome) {
360-
mtr_strongify(self);
361-
BOOL handled = NO;
362-
if (self) {
363-
ContextSnapshot context(self); // re-acquire a new snapshot
364-
std::lock_guard lock(self->_lock);
365-
if (!workItem.isComplete) {
366-
[self _postProcessWorkItem:workItem context:context retry:(outcome == MTRAsyncWorkNeedsRetry)];
367-
handled = YES;
368-
}
369-
}
370-
return handled;
371-
}];
414+
[self _callWorkItem:workItem withContext:context];
372415
}
373416

374417
- (BOOL)hasDuplicateForTypeID:(NSUInteger)opaqueDuplicateTypeID workItemData:(id)opaqueWorkItemData

src/darwin/Framework/CHIPTests/MTRAsyncWorkQueueTests.m

+66
Original file line numberDiff line numberDiff line change
@@ -491,4 +491,70 @@ - (void)testContextLoss
491491
[self waitForExpectationsWithTimeout:1 handler:nil];
492492
}
493493

494+
- (void)testItemsConcurrently
495+
{
496+
MTRAsyncWorkQueue * workQueue = [[MTRAsyncWorkQueue alloc] initWithContext:NSNull.null width:3];
497+
498+
XCTestExpectation * first3WorkItemsExecutedExpectation = [self expectationWithDescription:@"First 3 work items executed"];
499+
XCTestExpectation * first3WorkItemsSleptExpectation = [self expectationWithDescription:@"First 3 work items slept"];
500+
__block os_unfair_lock counterLock = OS_UNFAIR_LOCK_INIT;
501+
__block int beforeSleepCounter = 0;
502+
__block int afterSleepCounter = 0;
503+
__auto_type sleep1ReadyHandler = ^(id context, NSInteger retryCount, MTRAsyncWorkCompletionBlock completion) {
504+
os_unfair_lock_lock(&counterLock);
505+
beforeSleepCounter++;
506+
if (beforeSleepCounter == 3) {
507+
[first3WorkItemsExecutedExpectation fulfill];
508+
}
509+
os_unfair_lock_unlock(&counterLock);
510+
sleep(1);
511+
os_unfair_lock_lock(&counterLock);
512+
afterSleepCounter++;
513+
if (afterSleepCounter == 3) {
514+
[first3WorkItemsSleptExpectation fulfill];
515+
}
516+
os_unfair_lock_unlock(&counterLock);
517+
completion(MTRAsyncWorkComplete);
518+
};
519+
520+
MTRAsyncWorkItem * workItem1 = [[MTRAsyncWorkItem alloc] initWithQueue:dispatch_get_global_queue(QOS_CLASS_DEFAULT, 0)];
521+
workItem1.readyHandler = sleep1ReadyHandler;
522+
[workQueue enqueueWorkItem:workItem1 descriptionWithFormat:@"work item %d", 1];
523+
524+
MTRAsyncWorkItem * workItem2 = [[MTRAsyncWorkItem alloc] initWithQueue:dispatch_get_global_queue(QOS_CLASS_DEFAULT, 0)];
525+
workItem2.readyHandler = sleep1ReadyHandler;
526+
[workQueue enqueueWorkItem:workItem2 descriptionWithFormat:@"work item %d", 2];
527+
528+
MTRAsyncWorkItem * workItem3 = [[MTRAsyncWorkItem alloc] initWithQueue:dispatch_get_global_queue(QOS_CLASS_DEFAULT, 0)];
529+
workItem3.readyHandler = sleep1ReadyHandler;
530+
[workQueue enqueueWorkItem:workItem3 descriptionWithFormat:@"work item %d", 3];
531+
532+
// This is the item after the first 3, and should only execute when one of them finished
533+
XCTestExpectation * lastWorkItemWaitedExpectation = [self expectationWithDescription:@"Last work item waited properly"];
534+
MTRAsyncWorkItem * workItemLast = [[MTRAsyncWorkItem alloc] initWithQueue:dispatch_get_global_queue(QOS_CLASS_DEFAULT, 0)];
535+
workItemLast.readyHandler = ^(id context, NSInteger retryCount, MTRAsyncWorkCompletionBlock completion) {
536+
// expect this to have waited until at least one of the above items finished after sleep() and incremented counter
537+
os_unfair_lock_lock(&counterLock);
538+
XCTAssert(afterSleepCounter > 0);
539+
[lastWorkItemWaitedExpectation fulfill];
540+
os_unfair_lock_unlock(&counterLock);
541+
completion(MTRAsyncWorkComplete);
542+
};
543+
[workQueue enqueueWorkItem:workItemLast description:@"last work item"];
544+
545+
[self waitForExpectations:@[ first3WorkItemsExecutedExpectation ] timeout:2];
546+
// the before-sleep counter should have reached 3 immediately as they all run concurrently.
547+
os_unfair_lock_lock(&counterLock);
548+
XCTAssertEqual(afterSleepCounter, 0);
549+
os_unfair_lock_unlock(&counterLock);
550+
551+
[self waitForExpectations:@[ lastWorkItemWaitedExpectation, first3WorkItemsSleptExpectation ] timeout:2];
552+
553+
// see that all 3 first items ran and slept
554+
os_unfair_lock_lock(&counterLock);
555+
XCTAssertEqual(beforeSleepCounter, 3);
556+
XCTAssertEqual(afterSleepCounter, 3);
557+
os_unfair_lock_unlock(&counterLock);
558+
}
559+
494560
@end

0 commit comments

Comments
 (0)