18
18
#include " openvino/itt.hpp"
19
19
#include " openvino/runtime/system_conf.hpp"
20
20
#include " openvino/runtime/threading/cpu_streams_executor_internal.hpp"
21
+ #include " openvino/runtime/threading/cpu_streams_info.hpp"
21
22
#include " openvino/runtime/threading/executor_manager.hpp"
22
23
#include " openvino/runtime/threading/thread_local.hpp"
23
24
@@ -58,6 +59,11 @@ struct CPUStreamsExecutor::Impl {
58
59
_streamId = _impl->_streamIdQueue .front ();
59
60
_impl->_streamIdQueue .pop ();
60
61
}
62
+ if (!_impl->_subStreamIdQueue .empty () && _impl->_subStreamsNum < _impl->_config .get_sub_streams ()) {
63
+ _sub_stream_id = _impl->_subStreamIdQueue .front ();
64
+ _impl->_subStreamIdQueue .pop ();
65
+ _impl->_subStreamsNum ++;
66
+ }
61
67
}
62
68
_numaNodeId =
63
69
_impl->_config .get_streams ()
@@ -144,9 +150,8 @@ struct CPUStreamsExecutor::Impl {
144
150
.set_max_threads_per_core (max_threads_per_core)});
145
151
} else {
146
152
_taskArena.reset (new custom::task_arena{concurrency});
147
- _cpu_ids = static_cast <int >(stream_processors.size ()) == _impl->_config .get_streams ()
148
- ? stream_processors[stream_id]
149
- : _cpu_ids;
153
+ _cpu_ids =
154
+ stream_id < static_cast <int >(stream_processors.size ()) ? stream_processors[stream_id] : _cpu_ids;
150
155
if (_cpu_ids.size () > 0 ) {
151
156
CpuSet processMask;
152
157
int ncpus = 0 ;
@@ -166,7 +171,8 @@ struct CPUStreamsExecutor::Impl {
166
171
StreamCreateType stream_type;
167
172
const auto org_proc_type_table = get_org_proc_type_table ();
168
173
int streams_num = _impl->_config .get_streams ();
169
- const auto stream_id = streams_num == 0 ? 0 : _streamId % streams_num;
174
+ const auto stream_id =
175
+ streams_num == 0 ? 0 : (_sub_stream_id >= 0 ? streams_num + _sub_stream_id : _streamId % streams_num);
170
176
get_cur_stream_info (stream_id,
171
177
_impl->_config .get_cpu_reservation (),
172
178
org_proc_type_table,
@@ -193,6 +199,7 @@ struct CPUStreamsExecutor::Impl {
193
199
int _numaNodeId = 0 ;
194
200
int _socketId = 0 ;
195
201
bool _execute = false ;
202
+ int _sub_stream_id = -1 ;
196
203
std::queue<Task> _taskQueue;
197
204
#if OV_THREAD == OV_THREAD_TBB || OV_THREAD == OV_THREAD_TBB_AUTO
198
205
std::unique_ptr<custom::task_arena> _taskArena;
@@ -314,13 +321,17 @@ struct CPUStreamsExecutor::Impl {
314
321
_exectorMgr = executor_manager ();
315
322
auto numaNodes = get_available_numa_nodes ();
316
323
int streams_num = _config.get_streams ();
324
+ int sub_streams_num = _config.get_sub_streams ();
317
325
if (streams_num != 0 ) {
318
326
std::copy_n (std::begin (numaNodes),
319
327
std::min<std::size_t >(streams_num, numaNodes.size ()),
320
328
std::back_inserter (_usedNumaNodes));
321
329
} else {
322
330
_usedNumaNodes = numaNodes;
323
331
}
332
+ if (sub_streams_num > 0 ) {
333
+ _subTaskThread.assign (sub_streams_num, std::make_shared<SubQueue>());
334
+ }
324
335
for (auto streamId = 0 ; streamId < streams_num; ++streamId) {
325
336
_threads.emplace_back ([this , streamId] {
326
337
openvino::itt::threadName (_config.get_name () + " _" + std::to_string (streamId));
@@ -343,6 +354,31 @@ struct CPUStreamsExecutor::Impl {
343
354
});
344
355
}
345
356
_streams.set_thread_ids_map (_threads);
357
+
358
+ for (auto subId = 0 ; subId < sub_streams_num; ++subId) {
359
+ _subThreads.emplace_back ([this , subId, sub_streams_num] {
360
+ openvino::itt::threadName (_config.get_name () + " _subthreads" + " _" + std::to_string (subId));
361
+ for (bool stopped = false ; !stopped;) {
362
+ Task task;
363
+ { _subTaskThread[subId]->que_pop (task, stopped); }
364
+ if (task) {
365
+ {
366
+ std::lock_guard<std::mutex> lock{_streamIdMutex};
367
+ if (_subStreamsNum < sub_streams_num) {
368
+ _subStreamIdQueue.push (subId);
369
+ } else {
370
+ std::queue<int > empty;
371
+ std::swap (_subStreamIdQueue, empty);
372
+ }
373
+ }
374
+ Execute (task, *(_streams.local ()));
375
+ }
376
+ }
377
+ });
378
+ }
379
+ if (_subThreads.size () > 0 ) {
380
+ _streams.set_thread_ids_map (_subThreads);
381
+ }
346
382
}
347
383
348
384
void Enqueue (Task task) {
@@ -353,6 +389,10 @@ struct CPUStreamsExecutor::Impl {
353
389
_queueCondVar.notify_one ();
354
390
}
355
391
392
+ void Enqueue_sub (Task task, int id) {
393
+ _subTaskThread[id]->que_push (std::move (task));
394
+ }
395
+
356
396
void Execute (const Task& task, Stream& stream) {
357
397
#if OV_THREAD == OV_THREAD_TBB || OV_THREAD == OV_THREAD_TBB_AUTO
358
398
auto & arena = stream._taskArena ;
@@ -382,15 +422,49 @@ struct CPUStreamsExecutor::Impl {
382
422
}
383
423
}
384
424
425
+ struct SubQueue {
426
+ std::mutex _subMutex;
427
+ std::condition_variable _subQueueCondVar;
428
+ bool _isSubStopped = false ;
429
+ std::queue<Task> _subTaskQueue;
430
+
431
+ SubQueue () {}
432
+
433
+ void que_push (Task task) {
434
+ {
435
+ std::lock_guard<std::mutex> lock (_subMutex);
436
+ _subTaskQueue.emplace (std::move (task));
437
+ }
438
+ _subQueueCondVar.notify_one ();
439
+ }
440
+
441
+ void que_pop (Task& task, bool & stopped) {
442
+ std::unique_lock<std::mutex> lock (_subMutex);
443
+ _subQueueCondVar.wait (lock, [&] {
444
+ return !_subTaskQueue.empty () || (stopped = _isSubStopped);
445
+ });
446
+ if (!_subTaskQueue.empty ()) {
447
+ task = std::move (_subTaskQueue.front ());
448
+ _subTaskQueue.pop ();
449
+ }
450
+ }
451
+
452
+ ~SubQueue () {}
453
+ };
454
+
385
455
Config _config;
386
456
std::mutex _streamIdMutex;
387
457
int _streamId = 0 ;
388
458
std::queue<int > _streamIdQueue;
459
+ std::queue<int > _subStreamIdQueue;
460
+ int _subStreamsNum = 0 ;
389
461
std::vector<std::thread> _threads;
462
+ std::vector<std::thread> _subThreads;
390
463
std::mutex _mutex;
391
464
std::condition_variable _queueCondVar;
392
465
std::queue<Task> _taskQueue;
393
466
bool _isStopped = false ;
467
+ std::vector<std::shared_ptr<SubQueue>> _subTaskThread;
394
468
std::vector<int > _usedNumaNodes;
395
469
CustomThreadLocal _streams;
396
470
std::shared_ptr<ExecutorManager> _exectorMgr;
@@ -424,6 +498,18 @@ CPUStreamsExecutor::~CPUStreamsExecutor() {
424
498
thread.join ();
425
499
}
426
500
}
501
+ for (size_t i = 0 ; i < _impl->_subTaskThread .size (); i++) {
502
+ {
503
+ std::lock_guard<std::mutex> lock (_impl->_subTaskThread [i]->_subMutex );
504
+ _impl->_subTaskThread [i]->_isSubStopped = true ;
505
+ }
506
+ _impl->_subTaskThread [i]->_subQueueCondVar .notify_all ();
507
+ }
508
+ for (auto & thread : _impl->_subThreads ) {
509
+ if (thread.joinable ()) {
510
+ thread.join ();
511
+ }
512
+ }
427
513
}
428
514
429
515
void CPUStreamsExecutor::execute (Task task) {
@@ -438,5 +524,9 @@ void CPUStreamsExecutor::run(Task task) {
438
524
}
439
525
}
440
526
527
+ void CPUStreamsExecutor::run_sub_stream (Task task, int id) {
528
+ _impl->Enqueue_sub (std::move (task), id);
529
+ }
530
+
441
531
} // namespace threading
442
532
} // namespace ov
0 commit comments