Skip to content

Commit

Permalink
Merge pull request #28 from renestein/iss-27_Default_Scheduler_Lifecycle
Browse files Browse the repository at this point in the history
iss #27: Default scheduler does not require call of the Stop method when app exits
  • Loading branch information
renestein authored Feb 19, 2021
2 parents e5a148d + 76c7f2f commit f3b5027
Show file tree
Hide file tree
Showing 7 changed files with 47 additions and 17 deletions.
2 changes: 1 addition & 1 deletion RStein.AsyncCpp.Test/RStein.AsyncCpp.Test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ class Environment : public ::testing::Environment {

void TearDown() override
{
RStein::AsyncCpp::Schedulers::Scheduler::StopDefaultScheduler();
//RStein::AsyncCpp::Schedulers::Scheduler::StopDefaultScheduler();
}
};

Expand Down
21 changes: 12 additions & 9 deletions RStein.AsyncCpp/Schedulers/Scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,26 +26,29 @@ namespace RStein::AsyncCpp::Schedulers

Scheduler::~Scheduler() = default;

thread_local Scheduler::SchedulerPtr Scheduler::_currentScheduler = SchedulerPtr{};
std::unique_ptr<SimpleThreadPool> Scheduler::_defaultSchedulerThreadPool{};
thread_local Scheduler::SchedulerPtr Scheduler::_currentScheduler{};
Scheduler::SchedulerPtr Scheduler::_defaultScheduler{};
once_flag Scheduler::_initSchedulerOnceFlag{};

Scheduler::SchedulerPtr Scheduler::initDefaultScheduler()
void Scheduler::initDefaultScheduler()
{
//TODO: Better ThreadPool
static unsigned int MIN_THREADS = 8;
static unsigned int HW_THREADS = std::thread::hardware_concurrency() * 2;
const unsigned int THREADS_COUNT = max(MIN_THREADS, HW_THREADS);

static SimpleThreadPool threadPool{THREADS_COUNT};
static SchedulerPtr defaultScheduler = std::make_shared<ThreadPoolScheduler>(threadPool);
defaultScheduler->Start();
return defaultScheduler;
_defaultSchedulerThreadPool = make_unique<SimpleThreadPool>(THREADS_COUNT);
_defaultSchedulerThreadPool->Start();
_defaultScheduler = std::make_shared<ThreadPoolScheduler>(*_defaultSchedulerThreadPool);
_defaultScheduler->Start();
}

//TODO: Change Create/Start/Stop of the default scheduler
Scheduler::SchedulerPtr Scheduler::DefaultScheduler()
{
static SchedulerPtr defaultScheduler = initDefaultScheduler();
return defaultScheduler;
call_once(_initSchedulerOnceFlag, &initDefaultScheduler);
assert(_defaultScheduler);
return _defaultScheduler;
}

void Scheduler::StopDefaultScheduler()
Expand Down
9 changes: 6 additions & 3 deletions RStein.AsyncCpp/Schedulers/Scheduler.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#pragma once

#include "../Utils/FinallyBlock.h"

#if defined(__clang__)
#include "../ClangWinSpecific/Coroutine.h"
#elif defined(__cpp_impl_coroutine)
Expand All @@ -11,10 +10,11 @@
#endif
#include <memory>
#include <functional>
#include <xcall_once.h>

namespace RStein::AsyncCpp::Schedulers
{

class SimpleThreadPool;

class Scheduler : public std::enable_shared_from_this<Scheduler>
{
Expand Down Expand Up @@ -53,8 +53,11 @@ class Scheduler : public std::enable_shared_from_this<Scheduler>
protected:
virtual void OnEnqueueItem(std::function<void()>&& originalFunction) = 0;
private:
static std::unique_ptr<SimpleThreadPool> _defaultSchedulerThreadPool;
static thread_local SchedulerPtr _currentScheduler;
static SchedulerPtr initDefaultScheduler();
static SchedulerPtr _defaultScheduler;
static std::once_flag _initSchedulerOnceFlag;
static void initDefaultScheduler();
};

template <typename TFunc>
Expand Down
10 changes: 9 additions & 1 deletion RStein.AsyncCpp/Schedulers/SimpleThreadPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,18 @@ namespace RStein::AsyncCpp::Schedulers

SimpleThreadPool::~SimpleThreadPool()
{
if (_threadPoolState != ThreadPoolState::Stopped)
if (_threadPoolState == ThreadPoolState::Started)
{
/*throwInvalidThreadPoolState();*/
//Log invalid life cycle
try
{
Stop();
}
catch(...)
{

}
}
}

Expand Down
19 changes: 17 additions & 2 deletions RStein.AsyncCpp/Schedulers/ThreadPoolScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,17 @@ namespace RStein::AsyncCpp::Schedulers
}


ThreadPoolScheduler::~ThreadPoolScheduler() = default;
ThreadPoolScheduler::~ThreadPoolScheduler()
{
try
{
stopThreadPool();
}
catch(...)
{

}
};

void ThreadPoolScheduler::Start()
{
Expand All @@ -18,14 +28,19 @@ namespace RStein::AsyncCpp::Schedulers
}
}

void ThreadPoolScheduler::Stop()
void ThreadPoolScheduler::stopThreadPool() const
{
if (_threadPool.GetThreadPoolState() != SimpleThreadPool::ThreadPoolState::Stopped)
{
_threadPool.Stop();
}
}

void ThreadPoolScheduler::Stop()
{
stopThreadPool();
}

void ThreadPoolScheduler::OnEnqueueItem(std::function<void()>&& originalFunction)
{
_threadPool.EnqueueItem(move(originalFunction));
Expand Down
1 change: 1 addition & 0 deletions RStein.AsyncCpp/Schedulers/ThreadPoolScheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ namespace RStein::AsyncCpp::Schedulers
ThreadPoolScheduler& operator=(ThreadPoolScheduler&& other) = delete;

void Start() override;
void stopThreadPool() const;
void Stop() override;

bool IsMethodInvocationSerialized() const override;
Expand Down
2 changes: 1 addition & 1 deletion Samples/Samples.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ int main()
{
Samples::MapReduceActors::MapReduceActorRunner mapReduceActorRunner;
mapReduceActorRunner.Run();
RStein::AsyncCpp::Schedulers::Scheduler::StopDefaultScheduler();
//RStein::AsyncCpp::Schedulers::Scheduler::StopDefaultScheduler();
}

// Run program: Ctrl + F5 or Debug > Start Without Debugging menu
Expand Down

0 comments on commit f3b5027

Please sign in to comment.