MEL
Microthread & Execution library
ThreadPool.h
1 #pragma once
2 #include <tasking/ThreadRunnable.h>
4 #include <parallelism/Barrier.h>
6 #include <mpl/Tuple.h>
7 using mel::mpl::Tuple;
8 #include <mpl/_If.h>
9 using::mel::mpl::_if;
10 #include <mpl/IsSame.h>
11 using::mel::mpl::isSame;
12 //#include <mpl/LinkArgs.h>
13 #include <mpl/ParamAdder.h>
14 using mel::mpl::addParam;
15 #include <functional>
16 #undef max
17 #include <limits>
18 #include <tasking/GenericProcessDefs.h>
23 namespace mel
24 {
25  namespace parallelism
26  {
37  class MEL_API ThreadPool
38  {
39  public:
40  enum class SchedulingPolicy {
41  SP_ROUNDROBIN,
42  SP_BESTFIT,
43  SP_EXPLICIT
44  };
45  constexpr static int THREADS_USE_ALL_CORES = ::std::numeric_limits<int>::max();
46  constexpr static uint64_t THREAD_AFFINITY_ALL = -1;
48  {
49  int nThreads = THREADS_USE_ALL_CORES; //number of threads to create or THREADS_USE_ALL_CORES. if negative number, means all available cores minus nThreads (p.e, if 8 available cores and set -2, use 6 cores)
50  uint64_t affinity = THREAD_AFFINITY_ALL; //by default, all cores allowed
51  bool forceAffinitty = false; //force each thread to be in a fixed core
52  Runnable::RunnableCreationOptions threadOpts;
53  };
54  //typedef Callback<void, void> TaskType;
55 
56  ThreadPool( const ThreadPoolOpts& opts );
57  ~ThreadPool();
58  inline size_t getNumThreads() const{return mNThreads;}
59  struct ExecutionOpts {
60  bool useCallingThread = false;
61  bool groupTasks = true; //group task when number of tasks is greater than threads. This way, grouped tasks, are execute one after each
62  SchedulingPolicy schedPolicy = SchedulingPolicy::SP_ROUNDROBIN;
63  size_t threadIndex = 0; //set thread index to use when schedPolicy is SP_EXPLICIT
64  };
65  template <class TArg,class ... FTypes> void execute(const ExecutionOpts& opts, Barrier& barrier,TArg&& arg,std::exception_ptr& except,FTypes ... functions)
66  {
67  constexpr int nTasks = sizeof...(functions);
68  _execute(opts,except,barrier,std::forward<TArg>(arg),std::forward<FTypes>(functions)...);
69  }
70  //void argument overload
71  template <class ... FTypes> void execute(const ExecutionOpts& opts, Barrier& barrier,std::exception_ptr& except,FTypes ... functions)
72  {
73  constexpr int nTasks = sizeof...(functions);
74  _execute_void(opts,except,barrier,std::forward<FTypes>(functions)...);
75  }
76  template <class TArg,class ... FTypes> Barrier execute(const ExecutionOpts& opts,TArg&& arg,std::exception_ptr& except, FTypes ... functions)
77  {
78  constexpr int nTasks = sizeof...(functions);
79  Barrier result(nTasks);
80  _execute(opts,except,result,std::forward<TArg>(arg),std::forward<FTypes>(functions)...);
81  return result;
82  }
83  //void argument overload
84  template <class ... FTypes> Barrier execute(const ExecutionOpts& opts,std::exception_ptr& except,FTypes ... functions)
85  {
86  constexpr int nTasks = sizeof...(functions);
87  Barrier result(nTasks);
88  _execute_void(opts,except,result,std::forward<FTypes>(functions)...);
89  return result;
90  }
94  template <class ReturnTuple,class TArg,class ... FTypes> void executeWithResult(const ExecutionOpts& opts,Barrier& barrier,ReturnTuple& output,TArg&& arg,std::exception_ptr& except, FTypes ... functions)
95  {
96  constexpr int nTasks = sizeof...(functions);
97  _executeWithResult<0,ReturnTuple>(opts, except,barrier, output,std::forward<TArg>(arg),std::forward<FTypes>(functions)...);
98  }
99  template <class ReturnTuple, class TArg,class ... FTypes> Barrier executeWithResult(const ExecutionOpts& opts,ReturnTuple& output, TArg&& arg,std::exception_ptr& except,FTypes ... functions)
100  {
101  constexpr int nTasks = sizeof...(functions);
102  Barrier result(nTasks);
103  _executeWithResult<0,ReturnTuple>(opts,except,result,output,std::forward<TArg>(arg),std::forward<FTypes>(functions)...);
104  return result;
105  }
106  //void argument overload
107  template <class ReturnTuple, class ... FTypes> Barrier executeWithResult(const ExecutionOpts& opts,ReturnTuple& output,std::exception_ptr& except, FTypes ... functions)
108  {
109  constexpr int nTasks = sizeof...(functions);
110  Barrier result(nTasks);
111  _executeWithResult_void<0,ReturnTuple>(opts,except,result,output,std::forward<FTypes>(functions)...);
112  return result;
113  }
118  std::shared_ptr<ThreadRunnable> selectThread(const ExecutionOpts& opts);
119  private:
120  ThreadPoolOpts mOpts;
121  std::shared_ptr<ThreadRunnable>* mPool;
122  unsigned int mNThreads;
123  volatile int mLastIndex; //last thread used
124  std::mutex mExceptionLock; //one lock to protect all exception set.
131  template <class F,class TArg,class ... FTypes> void _execute(const ExecutionOpts& opts,std::exception_ptr& except, Barrier& output,TArg&& arg, F&& func,FTypes&&... functions)
132  {
133  /*
134  //@todo tengo que resolver aqui el tema de no bindear el arg..
135  if (mNThreads != 0)
136  {
137  selectThread(opts)->post(
138  std::function<tasking::EGenericProcessResult (uint64_t, Process*)>([func, output,arg](uint64_t, Process*) mutable
139  {
140  func(arg);
141  output.set();
142  return mel::tasking::EGenericProcessResult::KILL;
143  })
144  );
145  }
146  else // no threads in pool, use calling thread
147  {
148  func(std::forward<TArg>(arg));
149  output.set();
150  }*/
151  _execute(opts,except, output,std::forward<TArg>(arg), std::forward<F>(func));
152  _execute(opts,except, output,std::forward<TArg>(arg), std::forward<FTypes>(functions)...);
153  }
154  //base case
155  template <class F,class TArg> void _execute(const ExecutionOpts& opts,std::exception_ptr& except, Barrier& output,TArg&& arg, F&& func)
156  {
157  static_assert( std::is_invocable<F,TArg>::value, "ThreadPool::_execute bad functor signature");
158  if ( opts.useCallingThread || mNThreads == 0 )
159  {
160  if constexpr (std::is_nothrow_invocable<F,TArg>::value)
161  {
162  func(std::forward<TArg>(arg));
163  }else
164  {
165  try
166  {
167  func(std::forward<TArg>(arg));
168  }catch(...)
169  {
170  std::scoped_lock<std::mutex> lck(mExceptionLock);
171  if ( !except )
172  except = std::current_exception();
173  }
174  }
175  output.set();
176  }
177  else
178  {
179  mLastIndex = _chooseIndex(opts);
180  //@todo tengo que resolver aqui el tema de no bindear el arg..
181  //if constexpr (std::is_nothrow_invocable<F,typename std::remove_reference<TArg>::type>::value)
182  if constexpr (std::is_nothrow_invocable<F,TArg>::value)
183  {
184  mPool[mLastIndex]->post(
185  std::function<tasking::EGenericProcessResult (uint64_t,Process*)>([func = std::forward<F>(func),output,arg](uint64_t, Process*) mutable
186  {
187  func(std::forward<TArg>(arg));
188  output.set();
190  })
191  );
192  }else
193  {
194  mPool[mLastIndex]->post(
195  std::function<tasking::EGenericProcessResult (uint64_t,Process*)>([&except,this,func = std::forward<F>(func),output,arg](uint64_t, Process*) mutable
196  {
197  try
198  {
199  func(std::forward<TArg>(arg));
200  }catch(...)
201  {
202  std::scoped_lock<std::mutex> lck(mExceptionLock);
203  if ( !except )
204  except = std::current_exception();
205  }
206  output.set();
208  })
209  );
210  }
211  }
212  }
213  //void overload
214  template <class F,class ... FTypes> void _execute_void(const ExecutionOpts& opts,std::exception_ptr& except, Barrier& output, F&& func,FTypes&&... functions)
215  {
216  /*
217  //@todo tengo que resolver aqui el tema de no bindear el arg..
218  if (mNThreads != 0)
219  {
220  selectThread(opts)->post(
221  std::function<tasking::EGenericProcessResult (uint64_t, Process*)>([func, output,arg](uint64_t, Process*) mutable
222  {
223  func(arg);
224  output.set();
225  return mel::tasking::EGenericProcessResult::KILL;
226  })
227  );
228  }
229  else // no threads in pool, use calling thread
230  {
231  func(std::forward<TArg>(arg));
232  output.set();
233  }*/
234  _execute_void(opts,except, output, std::forward<F>(func));
235  _execute_void(opts,except, output, std::forward<FTypes>(functions)...);
236  }
237  //base case for void overload
238  template <class F> void _execute_void(const ExecutionOpts& opts,std::exception_ptr& except, Barrier& output, F&& func)
239  {
240  if ( opts.useCallingThread || mNThreads == 0 )
241  {
242  if constexpr (std::is_nothrow_invocable<F>::value)
243  {
244  func();
245  }else
246  {
247  try
248  {
249  func();
250  }catch(...)
251  {
252  std::scoped_lock<std::mutex> lck(mExceptionLock);
253  if ( !except )
254  except = std::current_exception();
255  }
256  }
257  output.set();
258  }
259  else
260  {
261  mLastIndex = _chooseIndex(opts);
262  //@todo tengo que resolver aqui el tema de no bindear el arg..
263  //if constexpr (std::is_nothrow_invocable<F,typename std::remove_reference<TArg>::type>::value)
264  if constexpr (std::is_nothrow_invocable<F>::value)
265  {
266  mPool[mLastIndex]->post(
267  std::function<tasking::EGenericProcessResult (uint64_t,Process*)>([func = std::forward<F>(func),output](uint64_t, Process*) mutable
268  {
269  func();
270  output.set();
272  })
273  );
274  }else
275  {
276  mPool[mLastIndex]->post(
277  std::function<tasking::EGenericProcessResult (uint64_t,Process*)>([&except,this,func = std::forward<F>(func),output](uint64_t, Process*) mutable
278  {
279  try
280  {
281  func();
282  }catch(...)
283  {
284  std::scoped_lock<std::mutex> lck(mExceptionLock);
285  if ( !except )
286  except = std::current_exception();
287  }
288  output.set();
290  })
291  );
292  }
293  }
294  }
295  template <int n,class ReturnTuple,class F,class TArg,class ... FTypes> void _executeWithResult(const ExecutionOpts& opts,std::exception_ptr& except, Barrier& output,ReturnTuple& result, TArg&& arg,F&& func,FTypes&&... functions)
296  {
297  _executeWithResult<n,ReturnTuple>(opts,except, output,result, std::forward<TArg>(arg),std::forward<F>(func));
298  _executeWithResult<n+1,ReturnTuple>(opts,except, output,result, std::forward<TArg>(arg),std::forward<FTypes>(functions)...);
299  }
300  //base case
301  template <int n,class ReturnTuple,class F,class TArg> void _executeWithResult(const ExecutionOpts& opts,std::exception_ptr& except, Barrier& output,ReturnTuple& result, TArg&& arg,F&& func)
302  {
303  static_assert( std::is_invocable<F,TArg>::value, "ThreadPool::_executeWithResult bad fnuctor signature");
304  if ( opts.useCallingThread || mNThreads == 0 )
305  {
306  if constexpr (std::is_nothrow_invocable<F,TArg>::value)
307  {
308  if constexpr (std::is_same< std::invoke_result_t<F,TArg>,void >::value)
309  func(std::forward<TArg>(arg));
310  else
311  std::get<n>(result) = func(std::forward<TArg>(arg));
312  }else
313  {
314  try
315  {
316  if constexpr (std::is_same< std::invoke_result_t<F,TArg>,void >::value)
317  func(std::forward<TArg>(arg));
318  else
319  std::get<n>(result) = func(std::forward<TArg>(arg));
320  }catch(...)
321  {
322  std::scoped_lock<std::mutex> lck(mExceptionLock);
323  if ( !except )
324  except = std::current_exception();
325  }
326  }
327  output.set();
328  }
329  else
330  {
331  mLastIndex = _chooseIndex(opts);
332  if constexpr (std::is_nothrow_invocable<F,TArg>::value)
333  {
334  mPool[mLastIndex]->post(
335  std::function<tasking::EGenericProcessResult (uint64_t,Process*)>([func = std::forward<F>(func),output,arg,&result](uint64_t, Process*) mutable
336  {
337  if constexpr (std::is_same< std::invoke_result_t<F,TArg>,void >::value)
338  func(std::forward<TArg>(arg));
339  else
340  std::get<n>(result) = func(std::forward<TArg>(arg));
341  output.set();
343  })
344  );
345  }else
346  {
347  mPool[mLastIndex]->post(
348  std::function<tasking::EGenericProcessResult (uint64_t,Process*)>([&except,this,func = std::forward<F>(func),output,arg,&result](uint64_t, Process*) mutable
349  {
350  try
351  {
352  if constexpr (std::is_same< std::invoke_result_t<F,TArg>,void >::value)
353  func(std::forward<TArg>(arg));
354  else
355  std::get<n>(result) = func(std::forward<TArg>(arg));
356  }catch(...)
357  {
358  std::scoped_lock<std::mutex> lck(mExceptionLock);
359  if ( !except )
360  except = std::current_exception();
361  }
362  output.set();
364  })
365  );
366  }
367 
368  }
369  }
370  //void overload
371  template <int n,class ReturnTuple,class F,class ... FTypes> void _executeWithResult_void(const ExecutionOpts& opts,std::exception_ptr& except, Barrier& output,ReturnTuple& result, F&& func,FTypes&&... functions)
372  {
373  _executeWithResult_void<n,ReturnTuple>(opts,except, output,result, std::forward<F>(func));
374  _executeWithResult_void<n+1,ReturnTuple>(opts,except, output,result, std::forward<FTypes>(functions)...);
375  }
376  //base case
377  template <int n,class ReturnTuple,class F> void _executeWithResult_void(const ExecutionOpts& opts,std::exception_ptr& except, Barrier& output,ReturnTuple& result, F&& func)
378  {
379  if ( opts.useCallingThread || mNThreads == 0 )
380  {
381  if constexpr (std::is_nothrow_invocable<F>::value)
382  {
383  if constexpr (std::is_same< std::invoke_result_t<F>,void >::value)
384  func();
385  else
386  std::get<n>(result) = func();
387  }else
388  {
389  try
390  {
391  if constexpr (std::is_same< std::invoke_result_t<F>,void >::value)
392  func();
393  else
394  std::get<n>(result) = func();
395  }catch(...)
396  {
397  std::scoped_lock<std::mutex> lck(mExceptionLock);
398  if ( !except )
399  except = std::current_exception();
400  }
401  }
402  output.set();
403  }
404  else
405  {
406  mLastIndex = _chooseIndex(opts);
407  if constexpr (std::is_nothrow_invocable<F>::value)
408  {
409  mPool[mLastIndex]->post(
410  std::function<tasking::EGenericProcessResult (uint64_t,Process*)>([func = std::forward<F>(func),output,&result](uint64_t, Process*) mutable
411  {
412  if constexpr (std::is_same< std::invoke_result_t<F>,void >::value)
413  func();
414  else
415  std::get<n>(result) = func();
416  output.set();
418  })
419  );
420  }else
421  {
422  mPool[mLastIndex]->post(
423  std::function<tasking::EGenericProcessResult (uint64_t,Process*)>([&except,this,func = std::forward<F>(func),output,&result](uint64_t, Process*) mutable
424  {
425  try
426  {
427  std::get<n>(result) = func();
428  }catch(...)
429  {
430  std::scoped_lock<std::mutex> lck(mExceptionLock);
431  if ( !except )
432  except = std::current_exception();
433  }
434  output.set();
436  })
437  );
438  }
439 
440  }
441  }
442  size_t _chooseIndex(const ExecutionOpts& sp);
443  };
444 
445  }
446 }
447 
Definition: Tuple.h:252
Multithread barrier.
Definition: Barrier.h:43
void set()
Definition: Barrier.h:59
Pool of threads allowing parallel execution.
Definition: ThreadPool.h:38
std::shared_ptr< ThreadRunnable > selectThread(const ExecutionOpts &opts)
select thread for execution based on given opts
void executeWithResult(const ExecutionOpts &opts, Barrier &barrier, ReturnTuple &output, TArg &&arg, std::exception_ptr &except, FTypes ... functions)
execute given functions and return result in the tuple
Definition: ThreadPool.h:94
A periodic task, implementing a microthread.
Definition: Process.h:107
Thread with Runnable behaviour.
Definition: ThreadRunnable.h:40
EGenericProcessResult
Result from functor used in a GenericProcess.
Definition: GenericProcessDefs.h:14
Definition: Callback_Impl.h:11