diff --git a/RStein.AsyncCpp.Test/RStein.AsyncCpp.Test.cpp b/RStein.AsyncCpp.Test/RStein.AsyncCpp.Test.cpp index 2232e56..2d6512c 100644 --- a/RStein.AsyncCpp.Test/RStein.AsyncCpp.Test.cpp +++ b/RStein.AsyncCpp.Test/RStein.AsyncCpp.Test.cpp @@ -11,7 +11,7 @@ class Environment : public ::testing::Environment { void TearDown() override { - RStein::AsyncCpp::Schedulers::Scheduler::StopDefaultScheduler(); + //RStein::AsyncCpp::Schedulers::Scheduler::StopDefaultScheduler(); } }; diff --git a/RStein.AsyncCpp/Schedulers/Scheduler.cpp b/RStein.AsyncCpp/Schedulers/Scheduler.cpp index c9a64b3..2d814cd 100644 --- a/RStein.AsyncCpp/Schedulers/Scheduler.cpp +++ b/RStein.AsyncCpp/Schedulers/Scheduler.cpp @@ -26,26 +26,29 @@ namespace RStein::AsyncCpp::Schedulers Scheduler::~Scheduler() = default; - thread_local Scheduler::SchedulerPtr Scheduler::_currentScheduler = SchedulerPtr{}; + std::unique_ptr Scheduler::_defaultSchedulerThreadPool{}; + thread_local Scheduler::SchedulerPtr Scheduler::_currentScheduler{}; + Scheduler::SchedulerPtr Scheduler::_defaultScheduler{}; + once_flag Scheduler::_initSchedulerOnceFlag{}; - Scheduler::SchedulerPtr Scheduler::initDefaultScheduler() + void Scheduler::initDefaultScheduler() { //TODO: Better ThreadPool static unsigned int MIN_THREADS = 8; static unsigned int HW_THREADS = std::thread::hardware_concurrency() * 2; const unsigned int THREADS_COUNT = max(MIN_THREADS, HW_THREADS); - static SimpleThreadPool threadPool{THREADS_COUNT}; - static SchedulerPtr defaultScheduler = std::make_shared(threadPool); - defaultScheduler->Start(); - return defaultScheduler; + _defaultSchedulerThreadPool = make_unique(THREADS_COUNT); + _defaultSchedulerThreadPool->Start(); + _defaultScheduler = std::make_shared(*_defaultSchedulerThreadPool); + _defaultScheduler->Start(); } - //TODO: Change Create/Start/Stop of the default scheduler Scheduler::SchedulerPtr Scheduler::DefaultScheduler() { - static SchedulerPtr defaultScheduler = initDefaultScheduler(); - return defaultScheduler; + call_once(_initSchedulerOnceFlag, &initDefaultScheduler); + assert(_defaultScheduler); + return _defaultScheduler; } void Scheduler::StopDefaultScheduler() diff --git a/RStein.AsyncCpp/Schedulers/Scheduler.h b/RStein.AsyncCpp/Schedulers/Scheduler.h index 832103a..7758802 100644 --- a/RStein.AsyncCpp/Schedulers/Scheduler.h +++ b/RStein.AsyncCpp/Schedulers/Scheduler.h @@ -1,7 +1,6 @@ #pragma once #include "../Utils/FinallyBlock.h" - #if defined(__clang__) #include "../ClangWinSpecific/Coroutine.h" #elif defined(__cpp_impl_coroutine) @@ -11,10 +10,11 @@ #endif #include #include +#include namespace RStein::AsyncCpp::Schedulers { - + class SimpleThreadPool; class Scheduler : public std::enable_shared_from_this { @@ -53,8 +53,11 @@ class Scheduler : public std::enable_shared_from_this protected: virtual void OnEnqueueItem(std::function&& originalFunction) = 0; private: + static std::unique_ptr _defaultSchedulerThreadPool; static thread_local SchedulerPtr _currentScheduler; - static SchedulerPtr initDefaultScheduler(); + static SchedulerPtr _defaultScheduler; + static std::once_flag _initSchedulerOnceFlag; + static void initDefaultScheduler(); }; template diff --git a/RStein.AsyncCpp/Schedulers/SimpleThreadPool.cpp b/RStein.AsyncCpp/Schedulers/SimpleThreadPool.cpp index 167232b..35ff2de 100644 --- a/RStein.AsyncCpp/Schedulers/SimpleThreadPool.cpp +++ b/RStein.AsyncCpp/Schedulers/SimpleThreadPool.cpp @@ -27,10 +27,18 @@ namespace RStein::AsyncCpp::Schedulers SimpleThreadPool::~SimpleThreadPool() { - if (_threadPoolState != ThreadPoolState::Stopped) + if (_threadPoolState == ThreadPoolState::Started) { /*throwInvalidThreadPoolState();*/ //Log invalid life cycle + try + { + Stop(); + } + catch(...) + { + + } } } diff --git a/RStein.AsyncCpp/Schedulers/ThreadPoolScheduler.cpp b/RStein.AsyncCpp/Schedulers/ThreadPoolScheduler.cpp index 0a4d1bd..c569ab4 100644 --- a/RStein.AsyncCpp/Schedulers/ThreadPoolScheduler.cpp +++ b/RStein.AsyncCpp/Schedulers/ThreadPoolScheduler.cpp @@ -8,7 +8,17 @@ namespace RStein::AsyncCpp::Schedulers } - ThreadPoolScheduler::~ThreadPoolScheduler() = default; + ThreadPoolScheduler::~ThreadPoolScheduler() + { + try + { + stopThreadPool(); + } + catch(...) + { + + } + }; void ThreadPoolScheduler::Start() { @@ -18,7 +28,7 @@ namespace RStein::AsyncCpp::Schedulers } } - void ThreadPoolScheduler::Stop() + void ThreadPoolScheduler::stopThreadPool() const { if (_threadPool.GetThreadPoolState() != SimpleThreadPool::ThreadPoolState::Stopped) { @@ -26,6 +36,11 @@ namespace RStein::AsyncCpp::Schedulers } } + void ThreadPoolScheduler::Stop() + { + stopThreadPool(); + } + void ThreadPoolScheduler::OnEnqueueItem(std::function&& originalFunction) { _threadPool.EnqueueItem(move(originalFunction)); diff --git a/RStein.AsyncCpp/Schedulers/ThreadPoolScheduler.h b/RStein.AsyncCpp/Schedulers/ThreadPoolScheduler.h index 924d2a4..c214a08 100644 --- a/RStein.AsyncCpp/Schedulers/ThreadPoolScheduler.h +++ b/RStein.AsyncCpp/Schedulers/ThreadPoolScheduler.h @@ -20,6 +20,7 @@ namespace RStein::AsyncCpp::Schedulers ThreadPoolScheduler& operator=(ThreadPoolScheduler&& other) = delete; void Start() override; + void stopThreadPool() const; void Stop() override; bool IsMethodInvocationSerialized() const override; diff --git a/Samples/Samples.cpp b/Samples/Samples.cpp index 49000f1..8c98ea7 100644 --- a/Samples/Samples.cpp +++ b/Samples/Samples.cpp @@ -10,7 +10,7 @@ int main() { Samples::MapReduceActors::MapReduceActorRunner mapReduceActorRunner; mapReduceActorRunner.Run(); - RStein::AsyncCpp::Schedulers::Scheduler::StopDefaultScheduler(); + //RStein::AsyncCpp::Schedulers::Scheduler::StopDefaultScheduler(); } // Run program: Ctrl + F5 or Debug > Start Without Debugging menu