2 #include <tasking/ThreadRunnable.h>
4 #include <parallelism/Barrier.h>
10 #include <mpl/IsSame.h>
11 using::mel::mpl::isSame;
13 #include <mpl/ParamAdder.h>
14 using mel::mpl::addParam;
18 #include <tasking/GenericProcessDefs.h>
// Scheduling policy used when picking a pool thread for a task.
// The enumerators are not visible in this listing; SP_ROUNDROBIN is the one
// referenced further down as the default value of `schedPolicy`.
40 enum class SchedulingPolicy {
// Sentinel equal to the largest representable int; it is the default value of
// the `nThreads` option below.
// NOTE(review): presumably means "one thread per available core" - confirm in the .cpp.
45 constexpr
static int THREADS_USE_ALL_CORES = ::std::numeric_limits<int>::max();
// Affinity sentinel: -1 converted to uint64_t, i.e. a value with all 64 bits set.
// It is the default value of the `affinity` option below.
46 constexpr
static uint64_t THREAD_AFFINITY_ALL = -1;
// Pool creation options (the enclosing struct header is not visible in this listing).
// Number of threads requested; defaults to the THREADS_USE_ALL_CORES sentinel.
49 int nThreads = THREADS_USE_ALL_CORES;
// 64-bit affinity value; defaults to THREAD_AFFINITY_ALL (all bits set).
// NOTE(review): presumably a per-core bitmask - confirm against the implementation.
50 uint64_t affinity = THREAD_AFFINITY_ALL;
// Off by default. NOTE(review): the name is spelled "forceAffinitty" (double t);
// it is part of the public interface, so it cannot be renamed without touching callers.
51 bool forceAffinitty =
false;
// Creation options for the underlying runnables, default-constructed here.
52 Runnable::RunnableCreationOptions threadOpts;
// Returns the value of mNThreads (stored as unsigned int) widened to size_t.
58 inline size_t getNumThreads()
const{
return mNThreads;}
// Per-call execution options. The struct header is not visible in this listing;
// `useCallingThread` is read through `opts.` in the dispatch helpers below, so these
// are presumably the members of ExecutionOpts - TODO confirm.
// When true, the dispatch helpers invoke the functor directly instead of posting it
// to a pool thread.
60 bool useCallingThread =
false;
// NOTE(review): not read by any code visible here; presumably consumed by _chooseIndex.
61 bool groupTasks =
true;
// Policy for picking the target thread; defaults to round robin.
62 SchedulingPolicy schedPolicy = SchedulingPolicy::SP_ROUNDROBIN;
// NOTE(review): not read by any code visible here; presumably used by policies that
// target a fixed thread - confirm.
63 size_t threadIndex = 0;
// Runs every functor in `functions` with the single argument `arg`, using a
// caller-supplied Barrier. All the work is delegated to _execute.
//   opts      - execution options forwarded to _execute
//   barrier   - caller-owned Barrier forwarded to _execute
//   arg       - argument forwarded to every functor
//   except    - out-parameter forwarded to _execute, which stores a captured exception in it
//   functions - functors, taken by value
// (Braces of the body are not visible in this listing.)
65 template <
class TArg,
class ... FTypes>
void execute(
const ExecutionOpts& opts,
Barrier& barrier,TArg&& arg,std::exception_ptr& except,FTypes ... functions)
// NOTE(review): nTasks is not referenced in the visible lines of this overload.
67 constexpr
int nTasks =
sizeof...(functions);
// `functions` are by-value parameters, so FTypes are non-reference types and
// std::forward<FTypes> yields rvalues here: the functors are moved into _execute.
68 _execute(opts,except,barrier,std::forward<TArg>(arg),std::forward<FTypes>(functions)...);
// Runs every functor in `functions` (called without arguments), using a
// caller-supplied Barrier. All the work is delegated to _execute_void.
//   opts      - execution options forwarded to _execute_void
//   barrier   - caller-owned Barrier forwarded to _execute_void
//   except    - out-parameter forwarded to _execute_void, which stores a captured exception in it
//   functions - functors, taken by value
// (Braces of the body are not visible in this listing.)
71 template <
class ... FTypes>
void execute(
const ExecutionOpts& opts,
Barrier& barrier,std::exception_ptr& except,FTypes ... functions)
// NOTE(review): nTasks is not referenced in the visible lines of this overload.
73 constexpr
int nTasks =
sizeof...(functions);
// By-value parameters: std::forward<FTypes> moves the functors into the helper.
74 _execute_void(opts,except,barrier,std::forward<FTypes>(functions)...);
// Same as the barrier-taking overload with an argument, but declared to return a Barrier.
//   opts      - execution options forwarded to _execute
//   arg       - argument forwarded to every functor
//   except    - out-parameter forwarded to _execute, which stores a captured exception in it
//   functions - functors, taken by value
// The declaration of `result` and the return statement are not visible in this listing.
// NOTE(review): presumably `result` is a Barrier built from nTasks and returned - confirm.
76 template <
class TArg,
class ... FTypes>
Barrier execute(
const ExecutionOpts& opts,TArg&& arg,std::exception_ptr& except, FTypes ... functions)
// Number of functors in the pack, known at compile time.
78 constexpr
int nTasks =
sizeof...(functions);
// By-value parameters: std::forward<FTypes> moves the functors into the helper.
80 _execute(opts,except,result,std::forward<TArg>(arg),std::forward<FTypes>(functions)...);
// Same as the barrier-taking overload without argument, but declared to return a Barrier.
//   opts      - execution options forwarded to _execute_void
//   except    - out-parameter forwarded to _execute_void, which stores a captured exception in it
//   functions - functors, taken by value and called without arguments
// The declaration of `result` and the return statement are not visible in this listing.
// NOTE(review): presumably `result` is a Barrier built from nTasks and returned - confirm.
84 template <
class ... FTypes>
Barrier execute(
const ExecutionOpts& opts,std::exception_ptr& except,FTypes ... functions)
// Number of functors in the pack, known at compile time.
86 constexpr
int nTasks =
sizeof...(functions);
// By-value parameters: std::forward<FTypes> moves the functors into the helper.
88 _execute_void(opts,except,result,std::forward<FTypes>(functions)...);
// Runs every functor in `functions` with `arg` and stores each non-void result in
// `output`, using a caller-supplied Barrier. Delegates to _executeWithResult.
//   opts      - execution options forwarded to the helper
//   barrier   - caller-owned Barrier forwarded to the helper
//   output    - tuple-like object; the helper writes results with std::get<n>(output)
//   arg       - argument forwarded to every functor
//   except    - out-parameter in which the helper stores a captured exception
//   functions - functors, taken by value
// (Braces of the body are not visible in this listing.)
94 template <
class ReturnTuple,
class TArg,
class ... FTypes>
void executeWithResult(
const ExecutionOpts& opts,
Barrier& barrier,ReturnTuple& output,TArg&& arg,std::exception_ptr& except, FTypes ... functions)
// NOTE(review): nTasks is not referenced in the visible lines of this overload.
96 constexpr
int nTasks =
sizeof...(functions);
// Start the recursion at tuple slot 0; each following functor uses the next slot.
97 _executeWithResult<0,ReturnTuple>(opts, except,barrier, output,std::forward<TArg>(arg),std::forward<FTypes>(functions)...);
// Same as the barrier-taking executeWithResult, but declared to return a Barrier.
//   opts      - execution options forwarded to the helper
//   output    - tuple-like object; the helper writes results with std::get<n>(output)
//   arg       - argument forwarded to every functor
//   except    - out-parameter in which the helper stores a captured exception
//   functions - functors, taken by value
// The declaration of `result` and the return statement are not visible in this listing.
// NOTE(review): presumably `result` is a Barrier built from nTasks and returned - confirm.
99 template <
class ReturnTuple,
class TArg,
class ... FTypes>
Barrier executeWithResult(
const ExecutionOpts& opts,ReturnTuple& output, TArg&& arg,std::exception_ptr& except,FTypes ... functions)
// Number of functors in the pack, known at compile time.
101 constexpr
int nTasks =
sizeof...(functions);
// Start the recursion at tuple slot 0; each following functor uses the next slot.
103 _executeWithResult<0,ReturnTuple>(opts,except,result,output,std::forward<TArg>(arg),std::forward<FTypes>(functions)...);
// Argument-less variant of executeWithResult, declared to return a Barrier.
// The functors are called without arguments; delegates to _executeWithResult_void.
//   opts      - execution options forwarded to the helper
//   output    - tuple-like object; the helper writes results with std::get<n>(output)
//   except    - out-parameter in which the helper stores a captured exception
//   functions - functors, taken by value
// The declaration of `result` and the return statement are not visible in this listing.
// NOTE(review): presumably `result` is a Barrier built from nTasks and returned - confirm.
107 template <
class ReturnTuple,
class ... FTypes>
Barrier executeWithResult(
const ExecutionOpts& opts,ReturnTuple& output,std::exception_ptr& except, FTypes ... functions)
// Number of functors in the pack, known at compile time.
109 constexpr
int nTasks =
sizeof...(functions);
// Start the recursion at tuple slot 0; each following functor uses the next slot.
111 _executeWithResult_void<0,ReturnTuple>(opts,except,result,output,std::forward<FTypes>(functions)...);
// Pointer to the pool's ThreadRunnable handles; indexed as mPool[mLastIndex] by the
// dispatch helpers. Allocation and release are not visible in this listing.
121 std::shared_ptr<ThreadRunnable>* mPool;
// Thread count reported by getNumThreads(); when it is 0 the dispatch helpers run
// the functor on the calling thread.
122 unsigned int mNThreads;
// Index of the thread chosen for the most recent task (assigned from _chooseIndex).
// NOTE(review): `volatile` gives neither atomicity nor ordering in C++; if tasks can be
// submitted from several threads at once this would need std::atomic - confirm usage.
123 volatile int mLastIndex;
// Held (via std::scoped_lock) around the writes of std::current_exception() into the
// caller's std::exception_ptr.
124 std::mutex mExceptionLock;
// Variadic step of _execute: handles the first functor through the single-functor
// overload, then recurses on the remaining ones.
//   opts, except, output - passed through unchanged to both calls
//   arg                  - forwarded to both calls
//   func                 - first functor; functions - the remaining functors
// Listing lines between the signature and the two calls are not visible here.
131 template <
class F,
class TArg,
class ... FTypes>
void _execute(
const ExecutionOpts& opts,std::exception_ptr& except,
Barrier& output,TArg&& arg, F&& func,FTypes&&... functions)
// NOTE(review): `arg` is forwarded here and again on the next line; if TArg is an
// rvalue type the first call could move from it before the rest run - confirm intent.
151 _execute(opts,except, output,std::forward<TArg>(arg), std::forward<F>(func));
152 _execute(opts,except, output,std::forward<TArg>(arg), std::forward<FTypes>(functions)...);
// Single-functor step of _execute: runs func(arg) either on the calling thread or on a
// pool thread chosen by _chooseIndex.
//   opts   - `useCallingThread` selects inline execution
//   except - receives std::current_exception() on the non-noexcept paths
//   output - Barrier for this batch; its use is not visible in this listing
//   arg    - argument forwarded to the functor
//   func   - functor; must be invocable with TArg (checked at compile time)
// Braces, try/catch and the lambda headers passed to post() are not visible here.
155 template <
class F,
class TArg>
void _execute(
const ExecutionOpts& opts,std::exception_ptr& except,
Barrier& output,TArg&& arg, F&& func)
// Reject functors that cannot be called with TArg, with a readable diagnostic.
157 static_assert( std::is_invocable<F,TArg>::value,
"ThreadPool::_execute bad functor signature");
// Inline path: requested explicitly, or the pool has no threads.
158 if ( opts.useCallingThread || mNThreads == 0 )
// A noexcept functor is called directly; the other branch adds exception capture.
160 if constexpr (std::is_nothrow_invocable<F,TArg>::value)
162 func(std::forward<TArg>(arg));
167 func(std::forward<TArg>(arg));
// Writes to `except` are serialised through mExceptionLock.
170 std::scoped_lock<std::mutex> lck(mExceptionLock);
172 except = std::current_exception();
// Pooled path: pick the target thread, remember it, and post the call to it.
179 mLastIndex = _chooseIndex(opts);
182 if constexpr (std::is_nothrow_invocable<F,TArg>::value)
184 mPool[mLastIndex]->post(
187 func(std::forward<TArg>(arg));
194 mPool[mLastIndex]->post(
199 func(std::forward<TArg>(arg));
// Same exception capture as on the inline path.
202 std::scoped_lock<std::mutex> lck(mExceptionLock);
204 except = std::current_exception();
// Variadic step of _execute_void: handles the first functor through the single-functor
// overload, then recurses on the remaining ones.
//   opts, except, output - passed through unchanged to both calls
//   func                 - first functor; functions - the remaining functors
// Listing lines between the signature and the two calls are not visible here.
214 template <
class F,
class ... FTypes>
void _execute_void(
const ExecutionOpts& opts,std::exception_ptr& except,
Barrier& output, F&& func,FTypes&&... functions)
234 _execute_void(opts,except, output, std::forward<F>(func));
235 _execute_void(opts,except, output, std::forward<FTypes>(functions)...);
// Single-functor step of _execute_void, for functors called without arguments.
// Same structure as the single-functor _execute: inline path when
// opts.useCallingThread is set or the pool has no threads, otherwise a post() to the
// thread chosen by _chooseIndex.
//   except - receives std::current_exception() on the non-noexcept paths
//   output - Barrier for this batch; its use is not visible in this listing
// The functor calls, braces, try/catch and lambda headers are not visible here.
238 template <
class F>
void _execute_void(
const ExecutionOpts& opts,std::exception_ptr& except,
Barrier& output, F&& func)
// Inline path: requested explicitly, or the pool has no threads.
240 if ( opts.useCallingThread || mNThreads == 0 )
// Compile-time split between noexcept functors and ones needing exception capture.
242 if constexpr (std::is_nothrow_invocable<F>::value)
// Writes to `except` are serialised through mExceptionLock.
252 std::scoped_lock<std::mutex> lck(mExceptionLock);
254 except = std::current_exception();
// Pooled path: pick the target thread, remember it, and post the call to it.
261 mLastIndex = _chooseIndex(opts);
264 if constexpr (std::is_nothrow_invocable<F>::value)
266 mPool[mLastIndex]->post(
276 mPool[mLastIndex]->post(
// Same exception capture as on the inline path.
284 std::scoped_lock<std::mutex> lck(mExceptionLock);
286 except = std::current_exception();
// Variadic step of _executeWithResult: the first functor writes to tuple slot n through
// the single-functor overload, the remaining functors continue from slot n+1.
//   opts, except, output, result - passed through unchanged to both calls
//   arg                          - forwarded to both calls
//   func                         - first functor; functions - the remaining functors
295 template <
int n,
class ReturnTuple,
class F,
class TArg,
class ... FTypes>
void _executeWithResult(
const ExecutionOpts& opts,std::exception_ptr& except,
Barrier& output,ReturnTuple& result, TArg&& arg,F&& func,FTypes&&... functions)
// NOTE(review): `arg` is forwarded here and again on the next line; if TArg is an
// rvalue type the first call could move from it before the rest run - confirm intent.
297 _executeWithResult<n,ReturnTuple>(opts,except, output,result, std::forward<TArg>(arg),std::forward<F>(func));
298 _executeWithResult<n+1,ReturnTuple>(opts,except, output,result, std::forward<TArg>(arg),std::forward<FTypes>(functions)...);
// Single-functor step of _executeWithResult: runs func(arg) either on the calling thread
// or on a pool thread chosen by _chooseIndex, and stores a non-void result in slot n of
// `result`.
//   n      - index of the tuple slot written with std::get<n>(result)
//   opts   - `useCallingThread` selects inline execution
//   except - receives std::current_exception() on the non-noexcept paths
//   output - Barrier for this batch; its use is not visible in this listing
//   result - tuple-like object receiving the functor's return value
//   arg    - argument forwarded to the functor
//   func   - functor; must be invocable with TArg (checked at compile time)
// Braces, else/try/catch and the lambda headers passed to post() are not visible here.
301 template <
int n,
class ReturnTuple,
class F,
class TArg>
void _executeWithResult(
const ExecutionOpts& opts,std::exception_ptr& except,
Barrier& output,ReturnTuple& result, TArg&& arg,F&& func)
// Reject functors that cannot be called with TArg, with a readable diagnostic.
303 static_assert( std::is_invocable<F,TArg>::value,
"ThreadPool::_executeWithResult bad functor signature");
// Inline path: requested explicitly, or the pool has no threads.
304 if ( opts.useCallingThread || mNThreads == 0 )
// A noexcept functor is called directly; the other branch adds exception capture.
306 if constexpr (std::is_nothrow_invocable<F,TArg>::value)
// A void-returning functor has nothing to store, so slot n is left untouched.
308 if constexpr (std::is_same< std::invoke_result_t<F,TArg>,
void >::value)
309 func(std::forward<TArg>(arg));
311 std::get<n>(result) = func(std::forward<TArg>(arg));
316 if constexpr (std::is_same< std::invoke_result_t<F,TArg>,
void >::value)
317 func(std::forward<TArg>(arg));
319 std::get<n>(result) = func(std::forward<TArg>(arg));
// Writes to `except` are serialised through mExceptionLock.
322 std::scoped_lock<std::mutex> lck(mExceptionLock);
324 except = std::current_exception();
// Pooled path: pick the target thread, remember it, and post the call to it.
331 mLastIndex = _chooseIndex(opts);
332 if constexpr (std::is_nothrow_invocable<F,TArg>::value)
334 mPool[mLastIndex]->post(
337 if constexpr (std::is_same< std::invoke_result_t<F,TArg>,
void >::value)
338 func(std::forward<TArg>(arg));
340 std::get<n>(result) = func(std::forward<TArg>(arg));
347 mPool[mLastIndex]->post(
352 if constexpr (std::is_same< std::invoke_result_t<F,TArg>,
void >::value)
353 func(std::forward<TArg>(arg));
355 std::get<n>(result) = func(std::forward<TArg>(arg));
// Same exception capture as on the inline path.
358 std::scoped_lock<std::mutex> lck(mExceptionLock);
360 except = std::current_exception();
// Variadic step of _executeWithResult_void: the first functor writes to tuple slot n
// through the single-functor overload, the remaining functors continue from slot n+1.
//   opts, except, output, result - passed through unchanged to both calls
//   func                         - first functor; functions - the remaining functors
371 template <
int n,
class ReturnTuple,
class F,
class ... FTypes>
void _executeWithResult_void(
const ExecutionOpts& opts,std::exception_ptr& except,
Barrier& output,ReturnTuple& result, F&& func,FTypes&&... functions)
373 _executeWithResult_void<n,ReturnTuple>(opts,except, output,result, std::forward<F>(func));
374 _executeWithResult_void<n+1,ReturnTuple>(opts,except, output,result, std::forward<FTypes>(functions)...);
// Single-functor step of _executeWithResult_void: runs func() either on the calling
// thread or on a pool thread chosen by _chooseIndex, and stores a non-void result in
// slot n of `result`.
//   n      - index of the tuple slot written with std::get<n>(result)
//   opts   - `useCallingThread` selects inline execution
//   except - receives std::current_exception() on the non-noexcept paths
//   output - Barrier for this batch; its use is not visible in this listing
//   result - tuple-like object receiving the functor's return value
// Braces, the void-branch calls, else/try/catch and lambda headers are not visible here.
377 template <
int n,
class ReturnTuple,
class F>
void _executeWithResult_void(
const ExecutionOpts& opts,std::exception_ptr& except,
Barrier& output,ReturnTuple& result, F&& func)
// Inline path: requested explicitly, or the pool has no threads.
379 if ( opts.useCallingThread || mNThreads == 0 )
// Compile-time split between noexcept functors and ones needing exception capture.
381 if constexpr (std::is_nothrow_invocable<F>::value)
// Compile-time check for a void return type, which has no value to store.
383 if constexpr (std::is_same< std::invoke_result_t<F>,
void >::value)
386 std::get<n>(result) = func();
391 if constexpr (std::is_same< std::invoke_result_t<F>,
void >::value)
394 std::get<n>(result) = func();
// Writes to `except` are serialised through mExceptionLock.
397 std::scoped_lock<std::mutex> lck(mExceptionLock);
399 except = std::current_exception();
// Pooled path: pick the target thread, remember it, and post the call to it.
406 mLastIndex = _chooseIndex(opts);
407 if constexpr (std::is_nothrow_invocable<F>::value)
409 mPool[mLastIndex]->post(
412 if constexpr (std::is_same< std::invoke_result_t<F>,
void >::value)
415 std::get<n>(result) = func();
422 mPool[mLastIndex]->post(
427 std::get<n>(result) = func();
// Same exception capture as on the inline path.
430 std::scoped_lock<std::mutex> lck(mExceptionLock);
432 except = std::current_exception();
// Declaration only; the definition is not in this listing.
// Returns the index used to select an entry of mPool for the next task, given the
// execution options. The dispatch helpers assign the result to mLastIndex.
// NOTE(review): the size_t result is stored in a `volatile int`, a narrowing
// conversion - confirm the index range always fits.
442 size_t _chooseIndex(
const ExecutionOpts& sp);
Multithread barrier.
Definition: Barrier.h:43
void set()
Definition: Barrier.h:59
Pool of threads allowing parallel execution.
Definition: ThreadPool.h:38
std::shared_ptr< ThreadRunnable > selectThread(const ExecutionOpts &opts)
Selects the thread to execute on, based on the given opts.
void executeWithResult(const ExecutionOpts &opts, Barrier &barrier, ReturnTuple &output, TArg &&arg, std::exception_ptr &except, FTypes ... functions)
Executes the given functions and returns their results in the output tuple.
Definition: ThreadPool.h:94
A periodic task, implementing a microthread.
Definition: Process.h:107
Thread with Runnable behaviour.
Definition: ThreadRunnable.h:40
EGenericProcessResult
Result from functor used in a GenericProcess.
Definition: GenericProcessDefs.h:14
Definition: Callback_Impl.h:11
Definition: ThreadPool.h:59
Definition: ThreadPool.h:48