@@ -197,7 +197,8 @@ @implementation MTRAsyncWorkQueue {
197
197
os_unfair_lock _lock;
198
198
__weak id _context;
199
199
NSMutableArray <MTRAsyncWorkItem *> * _items;
200
- NSInteger _runningWorkItemCount;
200
+ NSUInteger _runningWorkItemCount;
201
+ NSUInteger _width;
201
202
}
202
203
203
204
// A helper struct that facilitates access to _context while
@@ -216,11 +217,17 @@ @implementation MTRAsyncWorkQueue {
216
217
};
217
218
218
219
- (instancetype )initWithContext : (id )context
220
+ {
221
+ return [self initWithContext: context width: 1 ];
222
+ }
223
+
224
+ - (instancetype )initWithContext : (id )context width : (NSUInteger )width
219
225
{
220
226
NSParameterAssert (context);
221
227
if (self = [super init ]) {
222
228
_context = context;
223
229
_items = [NSMutableArray array ];
230
+ _width = width;
224
231
}
225
232
return self;
226
233
}
@@ -286,35 +293,84 @@ - (void)_postProcessWorkItem:(MTRAsyncWorkItem *)workItem
286
293
{
287
294
os_unfair_lock_assert_owner (&_lock);
288
295
289
- MTRAsyncWorkItem * runningWorkItem = (_runningWorkItemCount) ? _items.firstObject : nil ;
290
- if (workItem != runningWorkItem) {
296
+ BOOL foundWorkItem = NO ;
297
+ NSUInteger indexOfWorkItem = 0 ;
298
+ for (NSUInteger i = 0 ; i < _width; i++) {
299
+ if (_items[i] == workItem) {
300
+ foundWorkItem = YES ;
301
+ indexOfWorkItem = i;
302
+ break ;
303
+ }
304
+ }
305
+ if (!foundWorkItem) {
291
306
NSAssert (NO , @" work item to post-process is not running" );
292
307
return ;
293
308
}
294
309
310
+ // already part of the running work items allowed by width - retry directly
295
311
if (retry) {
296
312
MTR_LOG_DEFAULT (" MTRAsyncWorkQueue<%@> retry needed for work item [%llu]" , context.description , workItem.uniqueID );
297
- } else {
298
- [workItem markComplete ];
299
- [_items removeObjectAtIndex: 0 ];
300
- MTR_LOG_DEFAULT (" MTRAsyncWorkQueue<%@, items count: %tu> completed work item [%llu]" , context.description , _items.count , workItem.uniqueID );
313
+ [self _callWorkItem: workItem withContext: context];
314
+ return ;
301
315
}
302
316
303
- // when "concurrency width" is implemented this will be decremented instead
304
- _runningWorkItemCount = 0 ;
317
+ [workItem markComplete ];
318
+ [_items removeObjectAtIndex: indexOfWorkItem];
319
+ MTR_LOG_DEFAULT (" MTRAsyncWorkQueue<%@, items count: %tu> completed work item [%llu]" , context.description , _items.count , workItem.uniqueID );
320
+
321
+ // sanity check running work item count is positive
322
+ if (_runningWorkItemCount == 0 ) {
323
+ NSAssert (NO , @" running work item count should be positive" );
324
+ return ;
325
+ }
326
+
327
+ _runningWorkItemCount--;
305
328
[self _callNextReadyWorkItemWithContext: context];
306
329
}
307
330
331
+ - (void )_callWorkItem : (MTRAsyncWorkItem *)workItem withContext : (ContextSnapshot const &)context
332
+ {
333
+ os_unfair_lock_assert_owner (&_lock);
334
+
335
+ mtr_weakify (self);
336
+ [workItem callReadyHandlerWithContext: context.reference completion: ^(MTRAsyncWorkOutcome outcome) {
337
+ mtr_strongify (self);
338
+ BOOL handled = NO ;
339
+ if (self) {
340
+ ContextSnapshot context (self); // re-acquire a new snapshot
341
+ std::lock_guard lock (self->_lock );
342
+ if (!workItem.isComplete ) {
343
+ [self _postProcessWorkItem: workItem context: context retry: (outcome == MTRAsyncWorkNeedsRetry)];
344
+ handled = YES ;
345
+ }
346
+ }
347
+ return handled;
348
+ }];
349
+ }
350
+
308
351
- (void )_callNextReadyWorkItemWithContext : (ContextSnapshot const &)context
309
352
{
310
353
os_unfair_lock_assert_owner (&_lock);
311
354
312
- // when "concurrency width" is implemented this will be checked against the width
313
- if (_runningWorkItemCount) {
314
- return ; // can't run next work item until the current one is done
355
+ // sanity check not running more than allowed
356
+ if (_runningWorkItemCount > _width) {
357
+ NSAssert (NO , @" running work item count larger than the maximum width" );
358
+ return ;
315
359
}
316
360
317
- if (!_items.count ) {
361
+ // sanity check consistent counts
362
+ if (_items.count < _runningWorkItemCount) {
363
+ NSAssert (NO , @" work item count is less than running work item count" );
364
+ return ;
365
+ }
366
+
367
+ // can't run more work items if already running at max concurrent width
368
+ if (_runningWorkItemCount == _width) {
369
+ return ;
370
+ }
371
+
372
+ // no more items to run
373
+ if (_items.count == _runningWorkItemCount) {
318
374
return ; // nothing to run
319
375
}
320
376
@@ -324,16 +380,16 @@ - (void)_callNextReadyWorkItemWithContext:(ContextSnapshot const &)context
324
380
return ;
325
381
}
326
382
327
- // when "concurrency width" is implemented this will be incremented instead
328
- _runningWorkItemCount = 1 ;
329
-
330
- MTRAsyncWorkItem * workItem = _items.firstObject ;
383
+ NSUInteger nextWorkItemToRunIndex = _runningWorkItemCount;
384
+ MTRAsyncWorkItem * workItem = _items[nextWorkItemToRunIndex];
385
+ _runningWorkItemCount++;
331
386
332
- // Check if batching is possible or needed. Only ask work item to batch once for simplicity
387
+ // Check if batching is possible or needed.
333
388
auto batchingHandler = workItem.batchingHandler ;
334
- if (batchingHandler && workItem.retryCount == 0 ) {
335
- while (_items.count >= 2 ) {
336
- MTRAsyncWorkItem * nextWorkItem = _items[1 ];
389
+ if (batchingHandler) {
390
+ while (_items.count > _runningWorkItemCount) {
391
+ NSUInteger firstNonRunningItemIndex = _runningWorkItemCount;
392
+ MTRAsyncWorkItem * nextWorkItem = _items[firstNonRunningItemIndex];
337
393
if (!nextWorkItem.batchingHandler || nextWorkItem.batchingID != workItem.batchingID ) {
338
394
goto done; // next item is not eligible to merge with this one
339
395
}
@@ -355,20 +411,7 @@ - (void)_callNextReadyWorkItemWithContext:(ContextSnapshot const &)context
355
411
done:;
356
412
}
357
413
358
- mtr_weakify (self);
359
- [workItem callReadyHandlerWithContext: context.reference completion: ^(MTRAsyncWorkOutcome outcome) {
360
- mtr_strongify (self);
361
- BOOL handled = NO ;
362
- if (self) {
363
- ContextSnapshot context (self); // re-acquire a new snapshot
364
- std::lock_guard lock (self->_lock );
365
- if (!workItem.isComplete ) {
366
- [self _postProcessWorkItem: workItem context: context retry: (outcome == MTRAsyncWorkNeedsRetry)];
367
- handled = YES ;
368
- }
369
- }
370
- return handled;
371
- }];
414
+ [self _callWorkItem: workItem withContext: context];
372
415
}
373
416
374
417
- (BOOL )hasDuplicateForTypeID : (NSUInteger )opaqueDuplicateTypeID workItemData : (id )opaqueWorkItemData
0 commit comments