MEL
Microthread & Execution library
ThreadPoolExecutor.h
1 #pragma once
2 /*
3  * SPDX-FileCopyrightText: 2022 Daniel Barrientos <danivillamanin@gmail.com>
4  *
5  * SPDX-License-Identifier: MIT
6  */
7 #include <execution/Executor.h>
8 #include <parallelism/For.h>
9 #include <parallelism/ThreadPool.h>
10 namespace mel
11 {
12  namespace execution
13  {
14  using ::mel::parallelism::ThreadPool;
15 
// Options block for the ThreadPool executor. It is stored as
// Executor<ThreadPool>::mOpts and accessed through setOpts()/getOpts().
// NOTE(review): the line declaring the struct itself is absent from this
// listing (the embedded numbering jumps from 15 to 20) - confirm against the
// original header.
20  {
21  bool independentTasks =
22  true; //!< if true, try to make each iteration independent
23  // temporary option; hopefully it can be removed in the future
24  bool autoKill =
25  true; //!< when true, launch() hands Runnable::killTrue to the selected thread
28  };
32  template <> class Executor<ThreadPool>
33  {
34  public:
35  Executor() = default;
36  Executor( std::shared_ptr<ThreadPool> pool ) : mPool( pool ){};
37  Executor( Executor&& ex )
38  : mPool( std::move( ex.mPool ) ), mOpts( ex.mOpts )
39  {
40  }
41  Executor( const Executor& ex )
42  : mPool( ex.mPool ), mOpts( ex.mOpts )
43  {
44  }
45  Executor& operator=( const Executor& ex )
46  {
47  mPool = ex.mPool;
48  mOpts = ex.mOpts;
49  return *this;
50  }
51  Executor& operator=( Executor&& ex )
52  {
53  mPool = std::move( ex.mPool );
54  mOpts = ex.mOpts;
55  return *this;
56  }
57  void setOpts( const ThreadPoolExecutorOpts& opts ) { mOpts = opts; }
58  const ThreadPoolExecutorOpts& getOpts() { return mOpts; }
59  std::weak_ptr<ThreadPool>& getPool() { return mPool; }
60  const std::weak_ptr<ThreadPool>& getPool() const { return mPool; }
63  template <class TRet, class TArg, class F>
64  void launch( F&& f, TArg&& arg,
65  ExFuture<ThreadPool, TRet> output ) const noexcept
66  {
67  if ( !mPool.expired() )
68  {
69  ThreadPool::ExecutionOpts opts;
70  opts.schedPolicy = ThreadPool::SchedulingPolicy::SP_BESTFIT;
71  auto th = mPool.lock()->selectThread( opts );
72  // th->execute<TRet>(std::bind(std::forward<F>(f),std::forward<TArg>(arg)),static_cast<Future<TRet>>(output),mOpts.autoKill?Runnable::killTrue:Runnable::killFalse);
73  th->execute<TRet>(
74  [f = std::forward<F>( f ),
75  arg = std::forward<TArg>(
76  arg )]() mutable noexcept( std::
77  is_nothrow_invocable<
78  F,
79  TArg>::value )
80  -> TRet { return f( std::forward<TArg>( arg ) ); },
81  static_cast<Future<TRet>>( output ),
82  mOpts.autoKill ? Runnable::killTrue
84  }
85  else
86  {
87  output.setError( new std::runtime_error(
88  "ThreadPoolExecutor::launch. Pool has expired!!!" ) );
89  }
90  }
91  template <class TRet, class F>
92  void launch( F&& f,
93  ExFuture<ThreadPool, TRet> output ) const noexcept
94  {
95  if ( !mPool.expired() )
96  {
97  ThreadPool::ExecutionOpts opts;
98  opts.schedPolicy = ThreadPool::SchedulingPolicy::SP_BESTFIT;
99  auto th = mPool.lock()->selectThread( opts );
100  th->execute<TRet>( std::forward<F>( f ),
101  static_cast<Future<TRet>>( output ),
102  mOpts.autoKill ? Runnable::killTrue
104  }
105  else
106  {
107  output.setError( new std::runtime_error(
108  "ThreadPoolExecutor::launch. Pool has expired!!!" ) );
109  }
110  }
111  template <class I, class F>
112  ::mel::parallelism::Barrier loop( I&& begin, I&& end, F&& functor,
113  int increment );
114  template <class TArg, class... FTypes>
116  parallel( ExFuture<ThreadPool, TArg> fut, std::exception_ptr& excpt,
117  FTypes&&... functions );
118  template <class ReturnTuple, class TArg, class... FTypes>
120  parallel_convert( ExFuture<ThreadPool, TArg> fut,
121  std::exception_ptr& except, ReturnTuple& result,
122  FTypes&&... functions );
124  private:
125  std::weak_ptr<ThreadPool> mPool;
126  ThreadPoolExecutorOpts mOpts;
127  };
// Out-of-class definition of Executor<ThreadPool>::loop.
// Checks at compile time that the functor is invocable with the iterator
// type, builds the pool execution options, and forwards begin/end, the
// functor and the increment to ::mel::parallelism::_for, returning its result.
// NOTE(review): the return-type line is absent from this listing (the
// embedded numbering jumps from 128 to 130); the in-class declaration
// returns ::mel::parallelism::Barrier.
128  template <class I, class F>
130  Executor<ThreadPool>::loop( I&& begin, I&& end, F&& functor,
131  int increment )
132  {
133  static_assert( std::is_invocable<F, I>::value,
134  "ThreadPoolExecutor::loop bad functor signature" );
135  ThreadPool::ExecutionOpts exopts;
136  exopts.useCallingThread = false;
// groupTasks is the inverse of the executor's independentTasks option
137  exopts.groupTasks = !getOpts().independentTasks;
// NOTE(review): lock() yields a null pointer when the pool has expired and
// there is no check here, so _for may receive nullptr - confirm it copes.
138  return ::mel::parallelism::_for(
139  getPool().lock().get(), exopts, std::forward<I>( begin ),
140  std::forward<I>( end ), std::forward<F>( functor ), increment );
141  }
// Implementation details for the ThreadPool executor.
143  namespace _private
144  {
145  // helper class to use ThreadPool::execute
// Wraps a future (FutType) and forwards validity, availability, value and
// error queries to the object returned by mFut.getValue().
146  template <class T> class ValueWrapper
147  {
// NOTE(review): the start of the FutType alias declaration is absent from
// this listing (the embedded numbering jumps from 147 to 149); callers
// construct this wrapper from an ExFuture<ThreadPool, T>.
149  FutType;
150 
151  public:
// Copy / move construction from a future or from another wrapper.
152  ValueWrapper( const FutType& fut ) : mFut( fut ) {}
153  ValueWrapper( FutType&& fut ) : mFut( std::move( fut ) ) {}
154  ValueWrapper( ValueWrapper&& vw ) : mFut( std::move( vw.mFut ) )
155  {
156  }
157  ValueWrapper( const ValueWrapper& vw ) : mFut( vw.mFut ) {}
// Forwarded to mFut.getValue().isValid().
158  bool isValid() const { return mFut.getValue().isValid(); }
// Forwarded to mFut.getValue().isAvailable().
159  bool isAvailable() const
160  {
161  return mFut.getValue().isAvailable();
162  }
163  // wrapper for std::get. Same rules as std::get, so
164  // bad_variant_access is thrown if not a valid value
165  typename FutType::ValueType::ReturnType value()
166  {
167  return mFut.getValue().value();
168  }
169  typename FutType::ValueType::CReturnType value() const
170  {
171  return mFut.getValue().value();
172  }
// Forwarded to mFut.getValue().error().
173  std::exception_ptr error() const
174  {
175  return mFut.getValue().error();
176  }
// Implicit conversions to the wrapped value.
// NOTE(review): these are noexcept but call value(); if value() throws as
// the comment above describes, the program would terminate - confirm that
// callers only convert when a valid value is present.
177  operator T&() noexcept { return mFut.getValue().value(); }
178  operator const T&() const noexcept
179  {
180  return mFut.getValue().value();
181  }
182 
183  private:
184  FutType mFut; // the wrapped future
185  };
186  } // namespace _private
// Out-of-class definition of Executor<ThreadPool>::parallel.
// Builds the pool execution options and forwards the functions to the pool's
// execute(). When TArg is not void, the input future is wrapped in a
// _private::ValueWrapper and passed as an extra argument.
// NOTE(review): the return-type line is absent from this listing (the
// embedded numbering jumps from 188 to 190) - confirm against the original
// header.
188  template <class TArg, class... FTypes>
190  Executor<ThreadPool>::parallel( ExFuture<ThreadPool, TArg> fut,
191  std::exception_ptr& except,
192  FTypes&&... functions )
193  {
194  ThreadPool::ExecutionOpts exopts;
195  exopts.useCallingThread = false;
// groupTasks is the inverse of the executor's independentTasks option
196  exopts.groupTasks = !getOpts().independentTasks;
// NOTE(review): lock() yields a null pointer when the pool has expired and
// it is dereferenced without a check in both branches below.
197  if constexpr ( std::is_same<TArg, void>::value )
198  return getPool().lock()->execute(
199  exopts, except, std::forward<FTypes>( functions )... );
200  else
201  return getPool().lock()->execute(
202  exopts, _private::ValueWrapper<TArg>( fut ), except,
203  std::forward<FTypes>( functions )... );
204  }
205  template <class ReturnTuple, class TArg, class... FTypes>
206  ::mel::parallelism::Barrier Executor<ThreadPool>::parallel_convert(
207  ExFuture<ThreadPool, TArg> fut, std::exception_ptr& except,
208  ReturnTuple& result, FTypes&&... functions )
209  {
210  ThreadPool::ExecutionOpts exopts;
211  exopts.useCallingThread = false;
212  exopts.groupTasks = !getOpts().independentTasks;
213  if constexpr ( std::is_same<TArg, void>::value )
214  return getPool().lock()->executeWithResult(
215  exopts, result, except,
216  std::forward<FTypes>( functions )... );
217  else
218  return getPool().lock()->executeWithResult(
219  exopts, result, _private::ValueWrapper<TArg>( fut ), except,
220  std::forward<FTypes>( functions )... );
221  }
225  template <>
226  struct ExecutorTraits<Executor<ThreadPool>> : ExecutorTraits<void>
227  {
228  enum
229  {
230  has_microthreading = true
231  }; // support microthreading?
232  enum
233  {
234  has_parallelism = true
235  };
236  };
239  } // namespace execution
240 } // namespace mel
Represents a value that may not be present at the current moment.
Definition: Future.h:750
Extension of mel::core::Future to apply to executors.
Definition: ExFuture.h:21
Executor specialization using a ThreadPool as execution agent.
Definition: ThreadPoolExecutor.h:33
void launch(F &&f, TArg &&arg, ExFuture< ThreadPool, TRet > output) const noexcept
Definition: ThreadPoolExecutor.h:64
Definition: Executor.h:26
Multithread barrier.
Definition: Barrier.h:43
static std::function< bool()> killFalse
helper function to reject kill when receiving kill signal
Definition: Runnable.h:276
static std::function< bool()> killTrue
helper function to automatically kill process when receiving kill signal
Definition: Runnable.h:274
ExFuture< ExecutorAgent, std::invoke_result_t< F > > launch(Executor< ExecutorAgent > ex, F &&f)
Launch given functor in given executor.
Definition: Executor.h:65
ExFuture< ExecutorAgent, TArg > parallel(ExFuture< ExecutorAgent, TArg > source, FTypes... functions)
Execute given functions in a (possibly, depending on the concrete executor) parallel way. If an exception ...
Definition: Executor.h:499
ExFuture< ExecutorAgent, TArg > loop(ExFuture< ExecutorAgent, TArg > source, I getIteratorsFunc, F functor, int increment=1)
parallel (possibly, depending on executor capabilities) loop
Definition: Executor.h:290
Executor< ThreadPool > ThreadPoolExecutor
alias for Executor<ThreadPool>
Definition: ThreadPoolExecutor.h:238
ExFuture< ExecutorAgent, typename ::mel::execution::_private::GetReturn< TArg, FTypes... >::type > parallel_convert(ExFuture< ExecutorAgent, TArg > source, FTypes... functions)
Same as parallel but returning a tuple with the values for each functor.
Definition: Executor.h:726
Definition: Callback_Impl.h:11
Default traits for any executor.
Definition: Executor.h:46
Concrete options for this type of executor.
Definition: ThreadPoolExecutor.h:20
bool autoKill
Definition: ThreadPoolExecutor.h:24