MEL
Microthread & Execution library
RunnableExecutor.h
1 #pragma once
2 /*
3  * SPDX-FileCopyrightText: 2022 Daniel Barrientos <danivillamanin@gmail.com>
4  *
5  * SPDX-License-Identifier: MIT
6  */
7 #include <execution/Executor.h>
8 #include <tasking/Runnable.h>
9 
10 #include <mpl/TypeTraits.h>
11 #include <parallelism/Barrier.h>
12 #include <type_traits>
13 namespace mel
14 {
15  namespace execution
16  {
22  {
// Options applied by the Runnable-based executor to the tasks it posts.
// NOTE(review): the line that declares this type is not part of this listing; the
// rest of the file refers to it as RunnableExecutorOpts.
23  bool independentTasks =
24  true; //!< if true, loop() posts one task per iteration step; otherwise a single task walks the whole range
// Selects the kill policy handed over with every posted task:
// Runnable::killTrue when true, Runnable::killFalse when false.
25  bool autoKill =
26  true;
29  };
33  template <> class Executor<Runnable>
34  {
35  public:
36  Executor() = default;
37  Executor( Executor&& ex )
38  : mRunnable( std::move( ex.mRunnable ) ), mOpts( ex.mOpts )
39  {
40  }
41  Executor( const Executor& ex )
42  : mRunnable( ex.mRunnable ), mOpts( ex.mOpts )
43  {
44  }
45  Executor( std::shared_ptr<Runnable> runnable )
46  : mRunnable( runnable ){};
47  Executor& operator=( const Executor& ex )
48  {
49  mRunnable = ex.mRunnable;
50  mOpts = ex.mOpts;
51  return *this;
52  }
53  Executor& operator=( Executor&& ex )
54  {
55  mRunnable = std::move( ex.mRunnable );
56  mOpts = ex.mOpts;
57  return *this;
58  }
59  void setOpts( const RunnableExecutorOpts& opts ) { mOpts = opts; }
60  const RunnableExecutorOpts& getOpts() { return mOpts; }
61  inline const std::weak_ptr<Runnable>& getRunnable() const
62  {
63  return mRunnable;
64  }
65  inline std::weak_ptr<Runnable>& getRunnable() { return mRunnable; }
68  template <class TRet, class TArg, class F>
69  void launch( F&& f, TArg&& arg,
70  ExFuture<Runnable, TRet> output ) const noexcept
71  {
72  if ( !mRunnable.expired() )
73  {
74  // it seems that noexcept specifier is not preserved in
75  // bind, so need to use a lambda
76  // mRunnable.lock()->execute<TRet>(std::bind(std::forward<F>(f),std::forward<TArg>(arg)),static_cast<Future<TRet>>(output),mOpts.autoKill?Runnable::killTrue:Runnable::killFalse);
77  // if constexpr (noexcept(f(arg)))
78  mRunnable.lock()->execute<TRet>(
79  [f = std::forward<F>( f ),
80  arg = std::forward<TArg>(
81  arg )]() mutable noexcept( std::
82  is_nothrow_invocable<
83  F,
84  TArg>::value )
85  -> TRet { return f( std::forward<TArg>( arg ) ); },
86  static_cast<Future<TRet>>( output ),
87  mOpts.autoKill ? Runnable::killTrue
89  }
90  else
91  {
92  output.setError( new std::runtime_error(
93  "RunnableExecutor::launch. Runnable has expired!!!" ) );
94  }
95  }
96  template <class TRet, class F>
97  void launch( F&& f, ExFuture<Runnable, TRet> output ) const noexcept
98  {
99  if ( !mRunnable.expired() )
100  {
101  mRunnable.lock()->execute<TRet>(
102  std::forward<F>( f ),
103  static_cast<Future<TRet>>( output ),
104  mOpts.autoKill ? Runnable::killTrue
106  }
107  else
108  {
109  output.setError( new std::runtime_error(
110  "RunnableExecutor::launch. Runnable has expired!!!" ) );
111  }
112  }
113  template <class I, class F>
114  ::mel::parallelism::Barrier loop( I&& begin, I&& end, F&& functor,
115  int increment );
116  template <class TArg, class... FTypes>
117  ::mel::parallelism::Barrier parallel( ExFuture<Runnable, TArg> fut,
118  std::exception_ptr& excpt,
119  FTypes&&... functions );
120  template <class ReturnTuple, class TArg, class... FTypes>
122  parallel_convert( ExFuture<Runnable, TArg> fut,
123  std::exception_ptr& excpt, ReturnTuple& result,
124  FTypes&&... functions );
126  private:
127  std::weak_ptr<Runnable> mRunnable;
128  RunnableExecutorOpts mOpts;
129  };
130  namespace _private
131  {
// Wraps f in a lambda that calls it with the value stored in the future it receives
// and then signals the barrier through b.set(). The lambda is handed over together
// with fut.agent and fut.
//   F      - callable; must be invocable with TArg (enforced by the static_assert)
//   fut    - input future; its value is what f is called with
//   except - written with the exception thrown by f, but only while it is still empty
//   f      - callable to run
// NOTE(review): this listing skips some original lines: the one between `fut` and
// `except` in the parameter list (the body uses a barrier named `b`, presumably
// declared there) and the one naming the call that receives `fut.agent, <lambda>, fut`.
132  template <class F, class TArg>
133  void _invoke( ExFuture<Runnable, TArg> fut,
135  std::exception_ptr& except, F&& f )
136  {
137  static_assert( std::is_invocable<F, TArg>::value,
138  "_invoke bad signature" );
139  if constexpr ( std::is_nothrow_invocable<F, TArg>::value )
140  {
141  // the exception pointer is of no use in this branch because
142  // the callable is declared noexcept
144  fut.agent,
145  [f = std::forward<F>( f ),
146  b]( ExFuture<Runnable, TArg>& fut ) mutable noexcept
147  {
148  f( fut.getValue().value() );
149  b.set();
150  },
151  fut );
152  }
153  else
154  {
156  fut.agent,
// `except` is captured by reference, so the referenced object must still exist when
// the lambda runs.
157  [f = std::forward<F>( f ), b,
158  &except]( ExFuture<Runnable, TArg> fut ) mutable
159  {
160  try
161  {
162  f( fut.getValue().value() );
163  }
164  catch ( ... )
165  {
// keep an exception that was already recorded; only fill an empty slot
166  if ( !except )
167  except = std::current_exception();
168  }
// the barrier is signalled whether or not f threw
169  b.set();
170  },
171  fut );
172  }
173  }
174  // Overload for futures carrying no value: f is called without arguments.
// Wraps f in a lambda that calls f() and then signals the barrier through b.set().
// The lambda is handed over together with fut.agent and fut.
//   except - written with the exception thrown by f, but only while it is still empty;
//            not used at all when F is nothrow-invocable
// NOTE(review): this listing skips the parameter-list line between `fut` and `except`
// (the body uses a barrier named `b`, presumably declared there) and the lines naming
// the call that receives `fut.agent, <lambda>, fut`.
175  template <class F>
176  void _invoke( ExFuture<Runnable, void> fut,
178  std::exception_ptr& except, F&& f )
179  {
180  if constexpr ( std::is_nothrow_invocable<F>::value )
181  {
183  fut.agent,
184  [f = std::forward<F>( f ),
185  b]( ExFuture<Runnable, void>& fut ) mutable noexcept
186  {
187  f();
188  b.set();
189  },
190  fut );
191  }
192  else
193  {
195  fut.agent,
// `except` is captured by reference, so the referenced object must still exist when
// the lambda runs.
196  [f = std::forward<F>( f ), b,
197  &except]( ExFuture<Runnable, void>& fut ) mutable
198  {
199  try
200  {
201  f();
202  }
203  catch ( ... )
204  {
// keep an exception that was already recorded; only fill an empty slot
205  if ( !except )
206  except = std::current_exception();
207  }
// the barrier is signalled whether or not f threw
208  b.set();
209  },
210  fut );
211  }
212  }
// Variadic overload: processes the first callable through the single-callable
// _invoke and then calls _invoke again with the remaining ones, passing the same
// future, barrier and exception slot every time.
// NOTE(review): this listing skips the parameter-list line between `fut` and `except`;
// the body forwards a barrier named `b`, presumably declared there.
213  template <class TArg, class F, class... FTypes>
214  void _invoke( ExFuture<Runnable, TArg> fut,
216  std::exception_ptr& except, F&& f, FTypes&&... fs )
217  {
218  _invoke( fut, b, except, std::forward<F>( f ) );
219  _invoke( fut, b, except, std::forward<FTypes>( fs )... );
220  }
// Same scheme as _invoke, but the value returned by f is stored in element n of
// `output` (through std::get<n>). When f returns void nothing is stored.
//   n      - index of the element of `output` that receives the result
//   fut    - input future; its value is what f is called with
//   except - written with the exception thrown by f, but only while it is still empty
//   output - destination of the result; captured by reference by the lambda
//   f      - callable; must be invocable with TArg (enforced by the static_assert)
// NOTE(review): this listing skips the parameter-list line between `fut` and `except`
// (the body uses a barrier named `b`, presumably declared there) and the lines naming
// the call that receives `fut.agent, <lambda>, fut`.
221  template <int n, class ResultTuple, class F, class TArg>
222  void _invoke_with_result( ExFuture<Runnable, TArg> fut,
224  std::exception_ptr& except,
225  ResultTuple& output, F&& f )
226  {
227  static_assert( std::is_invocable<F, TArg>::value,
228  "_invoke_with_result bad signature" );
229  if constexpr ( std::is_nothrow_invocable<F, TArg>::value )
230  {
232  fut.agent,
// `output` is captured by reference, so it must still exist when the lambda runs.
233  [f = std::forward<F>( f ), b, &output](
234  ExFuture<Runnable, TArg>& fut ) mutable noexcept
235  {
236  if constexpr ( std::is_same<
237  std::invoke_result_t<F, TArg>,
238  void>::value )
239  f( fut.getValue().value() );
240  else
241  std::get<n>( output ) =
242  f( fut.getValue().value() );
243  b.set();
244  },
245  fut );
246  }
247  else
248  {
250  fut.agent,
// `output` and `except` are captured by reference, so both must still exist when the
// lambda runs.
251  [f = std::forward<F>( f ), b, &output,
252  &except]( ExFuture<Runnable, TArg>& fut ) mutable
253  {
254  try
255  {
256  if constexpr ( std::is_same<
257  std::invoke_result_t<F,
258  TArg>,
259  void>::value )
260  f( fut.getValue().value() );
261  else
262  std::get<n>( output ) =
263  f( fut.getValue().value() );
264  }
265  catch ( ... )
266  {
// keep an exception that was already recorded; only fill an empty slot
267  if ( !except )
268  except = std::current_exception();
269  }
// the barrier is signalled whether or not f threw
270  b.set();
271  },
272  fut );
273  }
274  }
275  // Overload for futures carrying no value: f is called without arguments.
// The value returned by f() is stored in element n of `output` (through std::get<n>);
// when f returns void nothing is stored. The barrier is signalled through b.set()
// after f has run.
// Unlike the other overloads, the future is taken by non-const reference here.
// NOTE(review): this listing skips the parameter-list line between `fut` and `except`
// (the body uses a barrier named `b`, presumably declared there) and the lines naming
// the call that receives `fut.agent, <lambda>, fut`.
276  template <int n, class ResultTuple, class F>
277  void _invoke_with_result( ExFuture<Runnable, void>& fut,
279  std::exception_ptr& except,
280  ResultTuple& output, F&& f )
281  {
282  if constexpr ( std::is_nothrow_invocable<F>::value )
283  {
285  fut.agent,
// `output` is captured by reference, so it must still exist when the lambda runs.
286  [f = std::forward<F>( f ), b, &output](
287  ExFuture<Runnable, void>& fut ) mutable noexcept
288  {
289  if constexpr ( std::is_same<std::invoke_result_t<F>,
290  void>::value )
291  f();
292  else
293  std::get<n>( output ) = f();
294  b.set();
295  },
296  fut );
297  }
298  else
299  {
301  fut.agent,
// `output` and `except` are captured by reference, so both must still exist when the
// lambda runs.
302  [f = std::forward<F>( f ), b, &output,
303  &except]( ExFuture<Runnable, void>& fut ) mutable
304  {
305  try
306  {
307  if constexpr ( std::is_same<
308  std::invoke_result_t<F>,
309  void>::value )
310  f();
311  else
312  std::get<n>( output ) = f();
313  }
314  catch ( ... )
315  {
// keep an exception that was already recorded; only fill an empty slot
316  if ( !except )
317  except = std::current_exception();
318  }
319 
// the barrier is signalled whether or not f threw
320  b.set();
321  },
322  fut );
323  }
324  }
325 
// Variadic overload: the first callable writes into element n of `output`, then the
// remaining callables are processed starting at element n + 1. Future, barrier and
// exception slot are passed along unchanged.
// NOTE(review): this listing skips the parameter-list line between `fut` and `except`;
// the body forwards a barrier named `b`, presumably declared there.
326  template <int n, class ResultTuple, class TArg, class F,
327  class... FTypes>
328  void _invoke_with_result( ExFuture<Runnable, TArg> fut,
330  std::exception_ptr& except,
331  ResultTuple& output, F&& f,
332  FTypes&&... fs )
333  {
334  _invoke_with_result<n>( fut, b, except, output,
335  std::forward<F>( f ) );
336  _invoke_with_result<n + 1>( fut, b, except, output,
337  std::forward<FTypes>( fs )... );
338  }
339  } // namespace _private
// Calls functor(i) for i = begin, begin + increment, ... while i < end, posting the
// work to the Runnable with fireAndForget, and returns the barrier that the posted
// tasks signal.
//   begin, end - range limits; advanced with `+=` and compared with `<`
//   functor    - callable receiving the current position; copied into each task
//   increment  - step between two consecutive positions
// Throws std::runtime_error when the Runnable has already expired.
// With the independentTasks option one task is posted per position and the barrier is
// created with the rounded-up number of positions; otherwise a single task walks the
// whole range and the barrier is created with a count of 1.
// NOTE(review): this listing skips the line carrying the return type and the line
// holding the initializer of isArithIterator.
352  template <class I, class F>
354  Executor<Runnable>::loop( I&& begin, I&& end, F&& functor,
355  int increment )
356  {
357  typedef typename std::decay<I>::type DecayedIt;
358  constexpr bool isArithIterator =
360  if ( getRunnable().expired() )
361  {
362  throw std::runtime_error( "Runnable has been destroyed" );
363  }
364  bool autoKill = getOpts().autoKill;
365  bool independentTasks = getOpts().independentTasks;
// number of positions between begin and end: plain subtraction for arithmetic
// "iterators", std::distance for everything else
366  int length;
367  if constexpr ( isArithIterator )
368  length = ( end - begin );
369  else
370  length = std::distance( begin, end );
371  int nElements = independentTasks
372  ? ( length + increment - 1 ) / increment
373  : 1; // round-up
// NOTE(review): the weak reference was only tested with expired() above and is locked
// here; if the Runnable can be destroyed in between, ptr would be null when it is
// dereferenced below - confirm.
374  auto ptr = getRunnable().lock();
375 
376  ::mel::parallelism::Barrier barrier( nElements );
377  //@todo iterator handling is poor when the iterators are not arithmetic
378  if ( independentTasks )
379  {
380  for ( auto i = begin; i < end; i += increment )
381  {
// each task owns copies of the functor, the barrier and its own position
382  ptr->fireAndForget(
383  [functor, barrier, i]() mutable noexcept(
384  ( std::is_nothrow_invocable<F, I>::value ) )
385  {
386  functor( i );
387  barrier.set();
388  },
389  0,
390  autoKill ? Runnable::killTrue : Runnable::killFalse );
391  }
392  }
393  else
394  {
// one single task iterates over the whole range and signals the barrier once
395  ptr->fireAndForget(
396  [functor, barrier, begin, end,
397  increment]() mutable noexcept( std::
398  is_nothrow_invocable<
399  F, I>::value )
400  {
401  for ( auto i = begin; i < end; i += increment )
402  {
403  functor( i );
404  }
405  barrier.set();
406  },
407  0, autoKill ? Runnable::killTrue : Runnable::killFalse );
408  }
409  return barrier;
410  }
// Creates a barrier constructed with the number of callables in `functions`, hands
// every callable to _private::_invoke together with `fut`, that barrier and `except`,
// and returns the barrier.
//   except    - exception slot passed on to _private::_invoke
//   functions - callables to run
// NOTE(review): this listing skips the lines carrying the return type, the function
// name and the first parameter; the body reads that parameter as `fut`.
411  template <class TArg, class... FTypes>
414  std::exception_ptr& except,
415  FTypes&&... functions )
416  {
417  ::mel::parallelism::Barrier barrier( sizeof...( functions ) );
418  _private::_invoke( fut, barrier, except,
419  std::forward<FTypes>( functions )... );
420  return barrier;
421  }
422  template <class ReturnTuple, class TArg, class... FTypes>
423  ::mel::parallelism::Barrier Executor<Runnable>::parallel_convert(
424  ExFuture<Runnable, TArg> fut, std::exception_ptr& except,
425  ReturnTuple& result, FTypes&&... functions )
426  {
427  ::mel::parallelism::Barrier barrier( sizeof...( functions ) );
428  _private::_invoke_with_result<0>(
429  fut, barrier, except, result,
430  std::forward<FTypes>( functions )... );
431  return barrier;
432  }
// Explicit specialization exposing compile-time capability flags.
// NOTE(review): the line naming the specialized template is not part of this listing;
// presumably it is the executor traits type specialized for Executor<Runnable> - confirm.
436  template <>
438  {
439  enum
440  {
441  has_microthreading = true
442  }; // support microthreading?
443  };
446  } // namespace execution
447 } // namespace mel
Typical Traits for types.
Represents a value that may not be present at the current moment.
Definition: Future.h:750
Extension of mel::core::Future to apply to executors.
Definition: ExFuture.h:21
Executor specialization using Runnable as execution agent.
Definition: RunnableExecutor.h:34
void launch(F &&f, TArg &&arg, ExFuture< Runnable, TRet > output) const noexcept
Definition: RunnableExecutor.h:69
Definition: Executor.h:26
Definition: TypeTraits.h:296
Multithread barrier.
Definition: Barrier.h:43
void set()
Definition: Barrier.h:59
A class representing a "running" task, with added functionality to post events requesting execution o...
Definition: Runnable.h:183
static std::function< bool()> killFalse
helper function to reject kill when receiving kill signal
Definition: Runnable.h:276
static std::function< bool()> killTrue
helper function to automatically kill process when receiving kill signal
Definition: Runnable.h:274
ExFuture< ExecutorAgent, std::invoke_result_t< F > > launch(Executor< ExecutorAgent > ex, F &&f)
Launch given functor in given executor.
Definition: Executor.h:65
ExFuture< ExecutorAgent, TArg > parallel(ExFuture< ExecutorAgent, TArg > source, FTypes... functions)
Executes the given functions in a (possibly, depending on the concrete executor) parallel way. If an exception ...
Definition: Executor.h:499
ExFuture< ExecutorAgent, TArg > loop(ExFuture< ExecutorAgent, TArg > source, I getIteratorsFunc, F functor, int increment=1)
parallel (possibly, depending on executor capabilities) loop
Definition: Executor.h:290
ExFuture< ExecutorAgent, typename ::mel::execution::_private::GetReturn< TArg, FTypes... >::type > parallel_convert(ExFuture< ExecutorAgent, TArg > source, FTypes... functions)
Same as parallel but returning a tuple with the values for each functor.
Definition: Executor.h:726
Definition: Callback_Impl.h:11
Default traits for any executor.
Definition: Executor.h:46
Concrete options for this type of executor.
Definition: RunnableExecutor.h:22
bool autoKill
Definition: RunnableExecutor.h:25