MEL
Microthread & Execution library
Executor.h
1 #pragma once
2 /*
3  * SPDX-FileCopyrightText: 2022 Daniel Barrientos <danivillamanin@gmail.com>
4  *
5  * SPDX-License-Identifier: MIT
6  */
7 #include <execution/CommonDefs.h>
8 #include <execution/ExFuture.h>
9 #include <functional>
10 #include <optional>
11 #include <parallelism/Barrier.h>
12 #include <stdexcept>
13 #include <string>
14 #include <type_traits>
15 namespace mel
16 {
23  namespace execution
24  {
25  template <class ExecutorAgent> class Executor
26  {
27  // mandatory interface to imlement in specializations
28  // template <class TRet,class F> void launch( F&&
29  // f,ExFuture<ExecutorAgent,TRet> output) const; template <class
30  // TRet,class TArg,class F> void launch( F&& f,TArg&&
31  // arg,ExFuture<ExecutorAgent,TRet> output) const; template <class
32  // I, class F> ::mel::parallelism::Barrier loop(I&& begin, I&&
33  // end, F&& functor, int increment); template <class TArg,class
34  // ...FTypes> ::mel::parallelism::Barrier
35  // parallel(ExFuture<ExecutorAgent,TArg> fut, FTypes&&...
36  // functions); template <class ReturnTuple,class TArg,class
37  // ...FTypes> ::mel::parallelism::Barrier
38  // parallel_convert(ExFuture<ExecutorAgent,TArg> fut,ReturnTuple&
39  // result, FTypes&&... functions);
40  };
45  template <class ExecutorType> struct ExecutorTraits
46  {
48  enum
49  {
50  has_microthreading = false
51  };
53  enum
54  {
55  has_parallelism = false
56  };
57  };
58 
63  template <class F, class ExecutorAgent>
66  {
67  typedef std::invoke_result_t<F> TRet;
68  ExFuture<ExecutorAgent, TRet> result( ex );
69  ex.template launch<TRet>( std::forward<F>( f ), result );
70  return result;
71  }
77  template <class TArg, class F, class ExecutorAgent>
78  ExFuture<ExecutorAgent, std::invoke_result_t<F, TArg>>
79  launch( Executor<ExecutorAgent> ex, F&& f, TArg&& arg )
80  {
81  /*
82  @todo I need to mature this idea. It's not so transparent to add
83  reference check but same rules as for "inmediate" should be followed
84  static_assert( !std::is_lvalue_reference<TArg>::value ||
85  std::is_const< typename
86  std::remove_reference<TArg>::type>::value,"execution::launch. Use
87  std::ref() to pass argument as reference");
88  */
89  typedef std::invoke_result_t<F, TArg> TRet;
90 
91  ExFuture<ExecutorAgent, TRet> result( ex );
92  ex.template launch<TRet>( std::forward<F>( f ),
93  std::forward<TArg>( arg ), result );
94  return result;
95  }
103  template <class ExecutorAgent>
105  {
106  return launch( ex, [] {} );
107  }
113  template <class ExecutorAgent, class TArg, class TRet>
114  ExFuture<ExecutorAgent,
115  typename std::remove_cv<
116  typename std::remove_reference<TRet>::type>::type>
118  {
119  static_assert(
120  !std::is_lvalue_reference<TRet>::value ||
121  std::is_const<
122  typename std::remove_reference<TRet>::type>::value,
123  "execution::inmediate. Use std::ref() to pass argument as "
124  "reference" );
125  using NewType = typename std::remove_cv<
126  typename std::remove_reference<TRet>::type>::type;
127  typedef typename ExFuture<ExecutorAgent, TArg>::ValueType ValueType;
129  fut.subscribeCallback(
130  [fut, result,
131  arg = std::forward<TRet>( arg )]( ValueType& input ) mutable
132  {
133  // launch tasks as response for callback for two reasons:
134  // manage the case when Future is already available when
135  // checked, so callback is trigered on calling thread and to
136  // decouple tasks and no saturate execution resource and
137  // independence tasks
138  if ( input.isValid() )
139  {
140  launch(
141  fut.agent, [result, arg = std::forward<TRet>(
142  arg )]() mutable noexcept
143  { result.setValue( std::forward<TRet>( arg ) ); } );
144  }
145  else
146  {
147  // set error as task in executor
148  launch( fut.agent,
149  [result, err = std::move(
150  input.error() )]() mutable noexcept
151  { result.setError( std::move( err ) ); } );
152  }
153  } );
154  return result;
155  }
156 
163  template <class F, class TArg, class ExecutorAgent>
164  ExFuture<ExecutorAgent, std::invoke_result_t<F, TArg>>
165  // next(ExFuture<ExecutorAgent,TArg> source, F&& f)
167  {
168  static_assert( std::is_invocable<F, TArg>::value,
169  "execution::next bad functor signature" );
170  typedef typename ExFuture<ExecutorAgent, TArg>::ValueType ValueType;
171  typedef std::invoke_result_t<F, TArg> TRet;
172  ExFuture<ExecutorAgent, TRet> result( source.agent );
173  source.subscribeCallback(
174  // need to bind de source future to not get lost and input
175  // pointing to unknown place [source,f =
176  // std::forward<F>(f),result]( ValueType& input) mutable
177  [source, f = std::move( f ), result]( ValueType& input ) mutable
178  {
179  if ( input.isValid() )
180  {
181  // source.agent. template
182  // launch<TRet>([f=std::forward<F>(f)](ExFuture<ExecutorAgent,TArg>&
183  // arg) mutable
184  // noexcept(std::is_nothrow_invocable<F,TArg>::value)->TRet
185  source.agent.template launch<TRet>(
186  [f = std::move( f )](
187  ExFuture<ExecutorAgent, TArg>&
188  arg ) mutable noexcept( std::
189  is_nothrow_invocable<
190  F, TArg>::
191  value )
192  -> TRet { return f( arg.getValue().value() ); },
193  source, result );
194  }
195  else
196  {
197  // set error as task in executor
198  launch( source.agent,
199  [result, err = std::move(
200  input.error() )]() mutable noexcept
201  { result.setError( std::move( err ) ); } );
202  }
203  } );
204  return result;
205  }
207  // overload for void arg
208  template <class F, class ExecutorAgent>
209  ExFuture<ExecutorAgent, std::invoke_result_t<F>>
210  next( ExFuture<ExecutorAgent, void> source, F f )
211  {
212  typedef typename ExFuture<ExecutorAgent, void>::ValueType ValueType;
213  typedef std::invoke_result_t<F> TRet;
214  ExFuture<ExecutorAgent, TRet> result( source.agent );
215  source.subscribeCallback(
216  // need to bind de source future to not get lost and input
217  // pointing to unknown place
218  [source, f = std::forward<F>( f ),
219  result]( ValueType& input ) mutable
220  {
221  if ( input.isValid() )
222  {
223  if constexpr ( noexcept( f() ) )
224  {
225  // source.agent. template
226  // launch<TRet>([f=std::forward<F>(f)](ExFuture<ExecutorAgent,void>&
227  // arg) noexcept->TRet
228  source.agent.template launch<TRet>(
229  [f = std::move( f )](
230  ExFuture<ExecutorAgent, void>&
231  arg ) noexcept -> TRet { return f(); },
232  source, result );
233  }
234  else
235  {
236  // source.agent. template
237  // launch<TRet>([f=std::forward<F>(f)](ExFuture<ExecutorAgent,void>&
238  // arg)->TRet
239  source.agent.template launch<TRet>(
240  [f = std::move( f )](
241  ExFuture<ExecutorAgent, void>& arg ) -> TRet
242  { return f(); },
243  source, result );
244  }
245  }
246  else
247  {
248  // set error as task in executor
249  launch( source.agent,
250  [result, err = std::move(
251  input.error() )]() mutable noexcept
252  { result.setError( std::move( err ) ); } );
253  }
254  } );
255  return result;
256  }
258 
263  template <class NewExecutorAgent, class OldExecutorAgent, class TRet>
264  ExFuture<NewExecutorAgent, TRet>
266  Executor<NewExecutorAgent> newAgent )
267  {
268  ExFuture<NewExecutorAgent, TRet> result( newAgent );
269  typedef
270  typename ExFuture<OldExecutorAgent, TRet>::ValueType ValueType;
271  source.subscribeCallback(
272  [result, source]( ValueType& input ) mutable
273  {
274  // result.assign(std::move(input));
275  result.assign( source );
276  } );
277  return result;
278  }
288  template <class ExecutorAgent, class TArg, class I, class F>
289  ExFuture<ExecutorAgent, TArg>
290  loop( ExFuture<ExecutorAgent, TArg> source, I getIteratorsFunc,
291  F functor, int increment = 1 )
292  {
293  ExFuture<ExecutorAgent, TArg> result( source.agent );
294  typedef typename ExFuture<ExecutorAgent, TArg>::ValueType ValueType;
295  source.subscribeCallback(
296  [source, functor = std::move( functor ), result,
297  getIteratorsFunc = std::move( getIteratorsFunc ),
298  increment]( ValueType& input ) mutable
299  {
300  try
301  {
302  if ( input.isValid() )
303  {
304  std::exception_ptr* except =
305  new std::exception_ptr( nullptr );
306  ::mel::parallelism::Barrier barrier;
307  auto iterators = getIteratorsFunc( input.value() );
308  using IteratorType = decltype( iterators[0] );
309  static_assert(
310  std::is_invocable<F, IteratorType, TArg>::value,
311  "execution::loop bad functor signature" );
312  if constexpr ( std::is_nothrow_invocable<
313  F, IteratorType, TArg>::value )
314  {
315  barrier = source.agent.loop(
316  iterators[0], iterators[1],
317  [f = std::move( functor ),
318  source]( auto idx ) mutable noexcept
319  {
320  //@todo arreglar el loop para que reciba
321  // I&&
322  // f(std::forward<I>(idx),fut.getValue());
323 
324  f( idx, source.getValue().value() );
325  },
326  increment );
327  }
328  else
329  {
330  barrier = source.agent.loop(
331  iterators[0], iterators[1],
332  [f = std::move( functor ), source,
333  except]( auto idx ) mutable
334  {
335  try
336  {
337  f( idx, source.getValue().value() );
338  }
339  catch ( ... )
340  {
341  if ( !*except )
342  *except =
343  std::current_exception();
344  }
345  },
346  increment );
347  }
348  barrier.subscribeCallback(
349  std::function<::mel::core::ECallbackResult(
350  const ::mel::parallelism::BarrierData& )>(
351  [result, source,
352  except]( const ::mel::parallelism::
353  BarrierData& ) mutable
354  {
355  if ( *except ) // any exception?
356  result.setError( *except );
357  else
358  // result.assign(std::move(source.getValue()));
359  // //@todo it's not correct, but
360  // necesary to avoid a lot of
361  // copies. I left this way until
362  // solved in the root. Really is not
363  // very worrying
364  result.assign( source );
365  delete except;
366  return ::mel::core::ECallbackResult::
367  UNSUBSCRIBE;
368  } ) );
369  }
370  else
371  {
372  // set error as task in executor
373  launch(
374  source.agent,
375  [result, err = std::move(
376  input.error() )]() mutable noexcept
377  { result.setError( std::move( err ) ); } );
378  }
379  }
380  catch ( ... )
381  {
382  result.setError( std::current_exception() );
383  }
384  } );
385  return result;
386  }
387 
389 
390  // void argument overload
391  template <class ExecutorAgent, class I, class F>
392  ExFuture<ExecutorAgent, void>
393  loop( ExFuture<ExecutorAgent, void> source, I getIteratorsFunc,
394  F functor, int increment = 1 )
395  {
396  static_assert( std::is_invocable<F, int>::value,
397  "execution::loop bad functor signature" );
398  ExFuture<ExecutorAgent, void> result( source.agent );
399  typedef typename ExFuture<ExecutorAgent, void>::ValueType ValueType;
400  source.subscribeCallback(
401  [source, functor = std::move( functor ), result,
402  getIteratorsFunc = std::move( getIteratorsFunc ),
403  increment]( ValueType& input ) mutable
404  {
405  try
406  {
407  if ( input.isValid() )
408  {
409  auto iterators = getIteratorsFunc();
410  std::exception_ptr* except =
411  new std::exception_ptr( nullptr );
412  ::mel::parallelism::Barrier barrier;
413  if constexpr ( std::is_nothrow_invocable<
414  F, decltype( iterators[0] )>::
415  value )
416  {
417  barrier = source.agent.loop(
418  iterators[0], iterators[1],
419  [f = std::move( functor ),
420  source]( auto idx ) mutable noexcept
421  { f( idx ); },
422  increment );
423  }
424  else
425  {
426  barrier = source.agent.loop(
427  iterators[0], iterators[1],
428  [f = std::move( functor ), source,
429  except]( auto idx ) mutable
430  {
431  try
432  {
433  f( idx );
434  }
435  catch ( ... )
436  {
437  if ( !*except )
438  *except =
439  std::current_exception();
440  }
441  },
442  increment );
443  }
444 
445  barrier.subscribeCallback(
446  std::function<::mel::core::ECallbackResult(
447  const ::mel::parallelism::BarrierData& )>(
448  [result, source,
449  except]( const ::mel::parallelism::
450  BarrierData& ) mutable
451  {
452  if ( *except ) // any exception?
453  result.setError( *except );
454  else
455  // result.assign(std::move(source.getValue()));
456  // //@todo it's not correct, but
457  // necesary to avoid a lot of
458  // copies. I left this way until
459  // solved in the root. Really is not
460  // very worrying
461  result.assign( source );
462  delete except;
463  return ::mel::core::ECallbackResult::
464  UNSUBSCRIBE;
465  } ) );
466  }
467  else
468  {
469  // set error as task in executor
470  launch(
471  source.agent,
472  [result, err = std::move(
473  input.error() )]() mutable noexcept
474  { result.setError( std::move( err ) ); } );
475  }
476  }
477 
478  catch ( ... )
479  {
480  result.setError( std::current_exception() );
481  }
482  } );
483  return result;
484  }
486 
497  template <class ExecutorAgent, class TArg, class... FTypes>
498  ExFuture<ExecutorAgent, TArg>
499  parallel( ExFuture<ExecutorAgent, TArg> source, FTypes... functions )
500  {
501  ExFuture<ExecutorAgent, TArg> result( source.agent );
502  typedef typename ExFuture<ExecutorAgent, TArg>::ValueType ValueType;
503  source.subscribeCallback(
504  //@note in C++20 I could have done fs... =
505  // std::forward<FTypes>(functions)..., but not in C++17
506  //[source,result,fs =
507  // std::make_tuple(std::forward<FTypes>(functions)...
508  //)](ValueType& input) mutable
509  [source, result,
510  fs = std::make_tuple( std::move( functions )... )](
511  ValueType& input ) mutable
512  {
513  if ( input.isValid() )
514  {
515  std::exception_ptr* except =
516  new std::exception_ptr( nullptr );
517  // auto barrier =
518  // source.agent.parallel(source,*except,std::forward<FTypes>(std::get<FTypes>(fs))...);
519  auto barrier = source.agent.parallel(
520  source, *except,
521  std::move( std::get<FTypes>( fs ) )... );
522  barrier.subscribeCallback(
523  std::function<::mel::core::ECallbackResult(
524  const ::mel::parallelism::BarrierData& )>(
525  [source, result,
526  except]( const ::mel::parallelism::
527  BarrierData& ) mutable
528  {
529  if ( *except ) // any exception?
530  result.setError( *except );
531  else
532  result.assign( source );
533  delete except;
534  return ::mel::core::ECallbackResult::
535  UNSUBSCRIBE;
536  } ) );
537  }
538  else
539  {
540  // set error as task in executor
541  launch( source.agent,
542  [result, err = std::move(
543  input.error() )]() mutable noexcept
544  { result.setError( std::move( err ) ); } );
545  }
546  } );
547  return result;
548  }
549 
557  template <class F, class TArg, class ExecutorAgent>
558  ExFuture<ExecutorAgent, TArg>
560  {
561  typedef typename ExFuture<ExecutorAgent, TArg>::ValueType ValueType;
562  ExFuture<ExecutorAgent, TArg> result( source.agent );
563  source.subscribeCallback(
564  // need to bind de source future to not get lost and input
565  // pointing to unknown place
566  [source, f = std::forward<F>( f ),
567  result]( ValueType& input ) mutable
568  {
569  if ( !input.isValid() )
570  {
571  launch(
572  source.agent,
573  [result, source,
574  f = std::forward<F>( f )]() mutable noexcept
575  {
576  if constexpr ( std::is_nothrow_invocable<
577  F,
578  std::exception_ptr>::value )
579  result.setValue(
580  f( source.getValue().error() ) );
581  else
582  {
583  try
584  {
585  result.setValue(
586  f( source.getValue().error() ) );
587  }
588  catch ( ... )
589  {
590  result.setError(
591  std::current_exception() );
592  }
593  }
594  } );
595  }
596  else
597  result.assign( source );
598  } );
599  return result;
600  }
602  namespace _private
603  {
604  using ::mel::execution::VoidType;
605  template <class F, class TArg> struct inv_res
606  {
607  using _realtype = std::invoke_result_t<F, TArg>;
608  // using type = typename
609  // std::conditional<std::is_same<_realtype,void>::value,VoidType,_realtype>::type;
610  using type =
611  typename mel::execution::WrapperType<_realtype>::type;
612  };
613  template <class F> struct inv_res<F, void>
614  {
615  using _realtype = std::invoke_result_t<F>;
616  using type =
617  typename mel::execution::WrapperType<_realtype>::type;
618  };
619  // return deduction for parallel_convert
620  template <class TArg, class F1 = void, class F2 = void,
621  class F3 = void, class F4 = void, class F5 = void,
622  class F6 = void, class F7 = void, class F8 = void,
623  class F9 = void>
624  struct GetReturn
625  {
626  using type = std::tuple<typename inv_res<F1, TArg>::type,
627  typename inv_res<F2, TArg>::type,
628  typename inv_res<F3, TArg>::type,
629  typename inv_res<F4, TArg>::type,
630  typename inv_res<F5, TArg>::type,
631  typename inv_res<F6, TArg>::type,
632  typename inv_res<F7, TArg>::type,
633  typename inv_res<F8, TArg>::type,
634  typename inv_res<F9, TArg>::type>;
635  };
636  template <class TArg, class F1, class F2, class F3, class F4,
637  class F5, class F6, class F7, class F8>
638  struct GetReturn<TArg, F1, F2, F3, F4, F5, F6, F7, F8, void>
639  {
640  using type = std::tuple<typename inv_res<F1, TArg>::type,
641  typename inv_res<F2, TArg>::type,
642  typename inv_res<F3, TArg>::type,
643  typename inv_res<F4, TArg>::type,
644  typename inv_res<F5, TArg>::type,
645  typename inv_res<F6, TArg>::type,
646  typename inv_res<F7, TArg>::type,
647  typename inv_res<F8, TArg>::type>;
648  };
649  template <class TArg, class F1, class F2, class F3, class F4,
650  class F5, class F6, class F7>
651  struct GetReturn<TArg, F1, F2, F3, F4, F5, F6, F7, void, void>
652  {
653  using type = std::tuple<typename inv_res<F1, TArg>::type,
654  typename inv_res<F2, TArg>::type,
655  typename inv_res<F3, TArg>::type,
656  typename inv_res<F4, TArg>::type,
657  typename inv_res<F5, TArg>::type,
658  typename inv_res<F6, TArg>::type,
659  typename inv_res<F7, TArg>::type>;
660  };
661  template <class TArg, class F1, class F2, class F3, class F4,
662  class F5, class F6>
663  struct GetReturn<TArg, F1, F2, F3, F4, F5, F6, void, void, void>
664  {
665  using type = std::tuple<typename inv_res<F1, TArg>::type,
666  typename inv_res<F2, TArg>::type,
667  typename inv_res<F3, TArg>::type,
668  typename inv_res<F4, TArg>::type,
669  typename inv_res<F5, TArg>::type,
670  typename inv_res<F6, TArg>::type>;
671  };
672  template <class TArg, class F1, class F2, class F3, class F4,
673  class F5>
674  struct GetReturn<TArg, F1, F2, F3, F4, F5, void, void, void, void>
675  {
676  using type = std::tuple<typename inv_res<F1, TArg>::type,
677  typename inv_res<F2, TArg>::type,
678  typename inv_res<F3, TArg>::type,
679  typename inv_res<F4, TArg>::type,
680  typename inv_res<F5, TArg>::type>;
681  };
682  template <class TArg, class F1, class F2, class F3, class F4>
683  struct GetReturn<TArg, F1, F2, F3, F4, void, void, void, void, void>
684  {
685  using type = std::tuple<typename inv_res<F1, TArg>::type,
686  typename inv_res<F2, TArg>::type,
687  typename inv_res<F3, TArg>::type,
688  typename inv_res<F4, TArg>::type>;
689  };
690  template <class TArg, class F1, class F2, class F3>
691  struct GetReturn<TArg, F1, F2, F3, void, void, void, void, void,
692  void>
693  {
694  using type = std::tuple<typename inv_res<F1, TArg>::type,
695  typename inv_res<F2, TArg>::type,
696  typename inv_res<F3, TArg>::type>;
697  };
698  template <class TArg, class F1, class F2>
699  struct GetReturn<TArg, F1, F2, void, void, void, void, void, void,
700  void>
701  {
702  using type = std::tuple<typename inv_res<F1, TArg>::type,
703  typename inv_res<F2, TArg>::type>;
704  };
705  template <class TArg, class F1>
706  struct GetReturn<TArg, F1, void, void, void, void, void, void, void,
707  void>
708  {
709  using type = std::tuple<typename inv_res<F1, TArg>::type>;
710  };
711  } // namespace _private
713 
723  template <class TArg, class ExecutorAgent, class... FTypes>
724  ExFuture<ExecutorAgent, typename ::mel::execution::_private::GetReturn<
725  TArg, FTypes...>::type>
727  FTypes... functions )
728  {
729  typedef typename ::mel::execution::_private::GetReturn<
730  TArg, FTypes...>::type ResultTuple;
731  static_assert( std::is_default_constructible<ResultTuple>::value,
732  "All types returned by the input ExFutures must be "
733  "DefaultConstructible" );
734  typedef typename ExFuture<ExecutorAgent, TArg>::ValueType ValueType;
736  source.subscribeCallback(
737  [source, result,
738  fs = std::make_tuple( std::move( functions )... )](
739  ValueType& input ) mutable
740  {
741  if ( input.isValid() )
742  {
743  std::exception_ptr* except =
744  new std::exception_ptr( nullptr );
745  ResultTuple* output = new ResultTuple;
746  // auto barrier =
747  // source.agent.parallel_convert(source,*except,*output,std::forward<FTypes>(std::get<FTypes>(fs))...);
748  auto barrier = source.agent.parallel_convert(
749  source, *except, *output,
750  std::move( std::get<FTypes>( fs ) )... );
751  barrier.subscribeCallback(
752  std::function<::mel::core::ECallbackResult(
753  const ::mel::parallelism::BarrierData& )>(
754  [result, output,
755  except]( const ::mel::parallelism::
756  BarrierData& ) mutable
757  {
758  if ( *except ) // any exception?
759  result.setError( *except );
760  else
761  result.setValue( std::move( *output ) );
762  delete output;
763  delete except;
764  return ::mel::core::ECallbackResult::
765  UNSUBSCRIBE;
766  } ) );
767  }
768  else
769  {
770  // set error as task in executor
771  launch( source.agent,
772  [result, err = std::move(
773  input.error() )]() mutable noexcept
774  { result.setError( std::move( err ) ); } );
775  }
776  } );
777  return result;
778  }
787  template <class F, class TArg, class ExecutorAgent>
788  ExFuture<ExecutorAgent,
789  std::invoke_result_t<F, Executor<ExecutorAgent>, TArg>>
791  {
792  typedef typename ExFuture<ExecutorAgent, TArg>::ValueType ValueType;
793  typedef std::invoke_result_t<F, Executor<ExecutorAgent>, TArg> TRet;
794  ExFuture<ExecutorAgent, TRet> result( source.agent );
795  source.subscribeCallback(
796  // need to bind de source future to not get lost and input
797  // pointing to unknown place
798  [source, f = std::move( f ), result]( ValueType& input ) mutable
799  {
800  if ( input.isValid() )
801  {
802  // source.agent. template
803  // launch<TRet>([f=std::forward<F>(f)](ExFuture<ExecutorAgent,TArg>&
804  // arg) mutable
805  // noexcept(std::is_nothrow_invocable<F,TArg>::value)->TRet
806  source.agent.template launch<TRet>(
807  [f = std::move( f )](
808  ExFuture<ExecutorAgent, TArg>&
809  arg ) mutable noexcept( std::
810  is_nothrow_invocable<
811  F, TArg>::
812  value )
813  -> TRet
814  { return f( arg.agent, arg.getValue().value() ); },
815  source, result );
816  }
817  else
818  {
819  // set error as task in executor
820  launch( source.agent,
821  [result, err = std::move(
822  input.error() )]() mutable noexcept
823  { result.setError( std::move( err ) ); } );
824  }
825  } );
826  return result;
827  }
828 
830  // overload for void arg
831  template <class F, class ExecutorAgent>
832  ExFuture<ExecutorAgent,
833  std::invoke_result_t<F, Executor<ExecutorAgent>>>
834  getExecutor( ExFuture<ExecutorAgent, void> source, F f )
835  {
836  typedef typename ExFuture<ExecutorAgent, void>::ValueType ValueType;
837  typedef std::invoke_result_t<F, Executor<ExecutorAgent>> TRet;
838  ExFuture<ExecutorAgent, TRet> result( source.agent );
839  source.subscribeCallback(
840  // need to bind de source future to not get lost and input
841  // pointing to unknown place
842  [source, f = std::move( f ), result]( ValueType& input ) mutable
843  {
844  if ( input.isValid() )
845  {
846  // source.agent. template
847  // launch<TRet>([f=std::forward<F>(f)](ExFuture<ExecutorAgent,TArg>&
848  // arg) mutable
849  // noexcept(std::is_nothrow_invocable<F,TArg>::value)->TRet
850  source.agent.template launch<TRet>(
851  [f = std::move( f )](
852  ExFuture<ExecutorAgent, void>&
853  arg ) mutable noexcept( std::
854  is_nothrow_invocable<
855  F, void>::
856  value )
857  -> TRet { return f( arg.agent ); },
858  source, result );
859  }
860  else
861  {
862  // set error as task in executor
863  launch( source.agent,
864  [result, err = std::move(
865  input.error() )]() mutable noexcept
866  { result.setError( std::move( err ) ); } );
867  }
868  } );
869  return result;
870  }
872 
874  namespace _private
875  {
876  template <class TRet> struct ApplyInmediate
877  {
878  template <class T>
879  ApplyInmediate( T&& a ) : arg( std::forward<T>( a ) )
880  {
881  }
882  TRet arg;
883  template <class TArg, class ExecutorAgent>
884  auto operator()( ExFuture<ExecutorAgent, TArg> fut )
885  {
886  return inmediate( fut, std::forward<TRet>( arg ) );
887  }
888  };
889  template <class NewExecutionAgent> struct ApplyTransfer
890  {
891  ApplyTransfer( Executor<NewExecutionAgent>&& a )
892  : newAgent( std::move( a ) )
893  {
894  }
895  ApplyTransfer( const Executor<NewExecutionAgent>& a )
896  : newAgent( a )
897  {
898  }
899  Executor<NewExecutionAgent> newAgent;
900  template <class TRet, class OldExecutionAgent>
901  ExFuture<NewExecutionAgent, TRet>
902  operator()( ExFuture<OldExecutionAgent, TRet> fut )
903  {
904  return transfer( fut, newAgent );
905  }
906  };
907  template <class F> struct ApplyNext
908  {
909  template <class T>
910  ApplyNext( T&& f ) : mFunc( std::forward<T>( f ) )
911  {
912  }
913  F mFunc;
914  template <class TArg, class ExecutorAgent>
915  auto operator()( const ExFuture<ExecutorAgent, TArg>& inputFut )
916  {
917  return next( inputFut, std::forward<F>( mFunc ) );
918  }
919  template <class TArg, class ExecutorAgent>
920  auto operator()( ExFuture<ExecutorAgent, TArg>&& inputFut )
921  {
922  return next( std::move( inputFut ),
923  std::forward<F>( mFunc ) );
924  }
925  };
926  template <class I, class F> struct ApplyLoop
927  {
928  template <class U, class T>
929  ApplyLoop( U&& its, T&& f, int inc )
930  : mGetIts( std::forward<U>( its ) ),
931  mFunc( std::forward<T>( f ) ), increment( inc )
932  {
933  }
934  I mGetIts;
935  F mFunc;
936  int increment;
937  template <class TArg, class ExecutorAgent>
938  auto operator()( ExFuture<ExecutorAgent, TArg> inputFut )
939  {
940  return loop( inputFut, std::forward<I>( mGetIts ),
941  std::forward<F>( mFunc ), increment );
942  }
943  };
944 
945  template <class... FTypes> struct ApplyBulk
946  {
947  template <class... Fs>
948  ApplyBulk( Fs&&... fs )
949  : mFuncs( std::forward<FTypes>( fs )... )
950  {
951  }
952  std::tuple<FTypes...> mFuncs;
953  template <class TArg, class ExecutorAgent>
954  auto operator()( ExFuture<ExecutorAgent, TArg> inputFut )
955  {
956  return parallel(
957  inputFut,
958  std::forward<FTypes>( std::get<FTypes>( mFuncs ) )... );
959  }
960  };
961  template <class F> struct ApplyError
962  {
963  template <class T>
964  ApplyError( T&& f ) : mFunc( std::forward<T>( f ) )
965  {
966  }
967  F mFunc;
968  template <class TArg, class ExecutorAgent>
969  auto operator()( ExFuture<ExecutorAgent, TArg> inputFut )
970  {
971  return catchError( inputFut, std::forward<F>( mFunc ) );
972  }
973  };
974 
975  template <class F> struct ApplyGetExecutor
976  {
977  template <class T>
978  ApplyGetExecutor( T&& f ) : mFunc( std::forward<T>( f ) )
979  {
980  }
981  F mFunc;
982  template <class TArg, class ExecutorAgent>
983  auto operator()( ExFuture<ExecutorAgent, TArg> inputFut )
984  {
985  return getExecutor( inputFut, std::forward<F>( mFunc ) );
986  }
987  };
988  template <int n, class TupleType, class FType>
989  void _on_all( TupleType* tup, ::mel::parallelism::Barrier& barrier,
990  FType fut )
991  {
992  fut.subscribeCallback(
993  [tup, barrier]( typename FType::ValueType& input ) mutable
994  {
995  std::get<n>( *tup ) = std::move( input );
996  barrier.set();
997  } );
998  }
999 
1000  template <int n, class TupleType, class FType, class... FTypes>
1001  void _on_all( TupleType* tup, ::mel::parallelism::Barrier& barrier,
1002  FType fut, FTypes... rest )
1003  {
1004  _on_all<n>( tup, barrier, fut );
1005  _on_all<n + 1>( tup, barrier, rest... );
1006  }
1007  template <int n, class SourceTuple, class TargetTuple>
1008  std::optional<std::pair<int, std::exception_ptr>>
1009  _moveValue( SourceTuple& st, TargetTuple& tt )
1010  {
1011  if constexpr ( n != std::tuple_size<SourceTuple>::value )
1012  {
1013  auto&& val = std::get<n>( st );
1014  if ( val.isValid() )
1015  {
1016  using ValType = std::remove_reference_t<
1017  decltype( val )>; // type of the underlying
1018  // FutureValue
1019  if constexpr ( !std::is_same<typename ValType::Type,
1020  void>::value )
1021  std::get<n>( tt ) = std::move( val.value() );
1022  return _moveValue<n + 1>( st, tt );
1023  }
1024  else
1025  return std::make_pair( n, std::move( val.error() ) );
1026  }
1027  return std::nullopt;
1028  }
1029  template <class... FTypes> struct ApplyParallelConvert
1030  {
1031  template <class... Fs>
1032  ApplyParallelConvert( Fs&&... fs )
1033  : mFuncs( std::forward<FTypes>( fs )... )
1034  {
1035  }
1036  std::tuple<FTypes...> mFuncs;
1037  template <class ExecutorAgent, class TArg>
1038  auto operator()( ExFuture<ExecutorAgent, TArg> inputFut )
1039  {
1040  return parallel_convert(
1041  inputFut,
1042  std::forward<FTypes>( std::get<FTypes>( mFuncs ) )... );
1043  }
1044  };
1045  } // namespace _private
1047 
1048  //@brief version for use with operator |
1049  template <class TRet>
1050  _private::ApplyInmediate<TRet> inmediate( TRet&& arg )
1051  {
1052  return _private::ApplyInmediate<TRet>( std::forward<TRet>( arg ) );
1053  }
1057  template <class NewExecutionAgent>
1058  _private::ApplyTransfer<NewExecutionAgent>
1060  {
1061  return _private::ApplyTransfer<NewExecutionAgent>( newAgent );
1062  }
1063 
1065  template <class F> _private::ApplyNext<F> next( F&& f )
1066  {
1067  return _private::ApplyNext<F>( std::forward<F>( f ) );
1068  }
1070  template <class I, class F>
1071  _private::ApplyLoop<I, F> loop( I&& getItFunc, F&& functor,
1072  int increment = 1 )
1073  {
1074  return _private::ApplyLoop<I, F>( std::forward<I>( getItFunc ),
1075  std::forward<F>( functor ),
1076  increment );
1077  }
1079  template <class... FTypes>
1080  _private::ApplyBulk<FTypes...> parallel( FTypes&&... functions )
1081  {
1082  return _private::ApplyBulk<FTypes...>(
1083  std::forward<FTypes>( functions )... );
1084  }
1086  template <class F> _private::ApplyError<F> catchError( F&& f )
1087  {
1088  return _private::ApplyError<F>( std::forward<F>( f ) );
1089  }
1090 
1092  template <class... FTypes>
1093  _private::ApplyParallelConvert<FTypes...>
1094  parallel_convert( FTypes&&... functions )
1095  {
1096  return _private::ApplyParallelConvert<FTypes...>(
1097  std::forward<FTypes>( functions )... );
1098  }
1099 
1101  template <class F> _private::ApplyGetExecutor<F> getExecutor( F&& f )
1102  {
1103  return _private::ApplyGetExecutor<F>( std::forward<F>( f ) );
1104  }
1106 
1110  template <class ExecutorAgent, class TRet1, class U>
1111  auto operator|( const ExFuture<ExecutorAgent, TRet1>& input, U&& u )
1112  {
1113  return u( input );
1114  }
1115  template <class ExecutorAgent, class TRet1, class U>
1116  auto operator|( ExFuture<ExecutorAgent, TRet1>&& input, U&& u )
1117  {
1118  return u( std::move( input ) );
1119  }
1120 
1128  class OnAllException : public std::runtime_error
1129  {
1130  public:
1131  OnAllException( int idx, std::exception_ptr source,
1132  const std::string& msg )
1133  : mElementIdx( idx ),
1134  mSource( source ), std::runtime_error( msg )
1135  {
1136  }
1137  OnAllException( int idx, std::exception_ptr source,
1138  std::string&& msg )
1139  : mElementIdx( idx ),
1140  mSource( source ), std::runtime_error( std::move( msg ) )
1141  {
1142  }
1143  inline int getWrongElement() const noexcept { return mElementIdx; }
1144  std::exception_ptr getCause() noexcept { return mSource; }
1145 
1146  private:
1147  int mElementIdx;
1148  std::exception_ptr mSource;
1149  };
1160  template <class ExecutorAgent, class... FTypes>
1161  auto on_all( Executor<ExecutorAgent> ex, FTypes... futs )
1162  {
1163  typedef std::tuple<typename mel::execution::WrapperType<
1164  typename FTypes::ValueType::Type>::type...>
1165  ReturnType;
1166  static_assert( std::is_default_constructible<ReturnType>::value,
1167  "All types returned by the input ExFutures must be "
1168  "DefaultConstructible" );
1170  ::mel::parallelism::Barrier barrier( sizeof...( futs ) );
1171  typedef std::tuple<typename FTypes::ValueType...> _ttype;
1172  _ttype* tupleRes = new _ttype;
1173 
1174  barrier.subscribeCallback(
1175  std::function<::mel::core::ECallbackResult(
1176  const ::mel::parallelism::BarrierData& )>(
1177  [result,
1178  tupleRes]( const ::mel::parallelism::BarrierData& ) mutable
1179  {
1180  ReturnType resultVal;
1181  auto r = ::mel::execution::_private::_moveValue<0>(
1182  *tupleRes, resultVal );
1183  if ( r == std::nullopt )
1184  {
1185  result.setValue( std::move( resultVal ) );
1186  }
1187  else
1188  {
1189  result.setError( std::make_exception_ptr(
1190  OnAllException( r.value().first,
1191  r.value().second,
1192  "OnAllException: error in "
1193  "tuple element" ) ) );
1194  }
1195  delete tupleRes;
1196  return ::mel::core::ECallbackResult::UNSUBSCRIBE;
1197  } ) );
1198  ::mel::execution::_private::_on_all<0, _ttype>(
1199  tupleRes, barrier,
1200  futs... ); // la idea es pasar una barrera o lo que sea y
1201  // devolver resultado al activarse
1202  return result;
1203  }
1204  } // namespace execution
1205 } // namespace mel
int subscribeCallback(F &&f) const
Subscribe callback to be executed when future is ready (valid or error)
Definition: Future.h:681
Extension of mel::core::Future to apply to executors.
Definition: ExFuture.h:21
Executor< ExecutorAgent > agent
execution agent associated with this instance
Definition: ExFuture.h:54
Definition: Executor.h:26
Excepcion thrown by on_all when some of the futures raise error.
Definition: Executor.h:1129
Multithread barrier.
Definition: Barrier.h:43
void set()
Definition: Barrier.h:59
ECallbackResult
Type resturned by callbacks subscribed to CallbackSubscriptors.
Definition: CallbackSubscriptor.h:32
ExFuture< ExecutorAgent, typename std::remove_cv< typename std::remove_reference< TRet >::type >::type > inmediate(ExFuture< ExecutorAgent, TArg > fut, TRet &&arg)
Produces an inmediate value in the context of the given ExFuture executor as a response to input fut ...
Definition: Executor.h:117
ExFuture< ExecutorAgent, std::invoke_result_t< F > > launch(Executor< ExecutorAgent > ex, F &&f)
Launch given functor in given executor.
Definition: Executor.h:65
ExFuture< NewExecutorAgent, TRet > transfer(ExFuture< OldExecutorAgent, TRet > source, Executor< NewExecutorAgent > newAgent)
Transfer given ExFuture to a different executor This way, continuations can be chained but executed i...
Definition: Executor.h:265
ExFuture< ExecutorAgent, TArg > catchError(ExFuture< ExecutorAgent, TArg > source, F &&f)
Capture previous error, if any, and execute the function.
Definition: Executor.h:559
ExFuture< ExecutorAgent, TArg > parallel(ExFuture< ExecutorAgent, TArg > source, FTypes... functions)
Execute given functions in a (possibly, depending on concrete executor) parallel way If an exception ...
Definition: Executor.h:499
ExFuture< ExecutorAgent, TArg > loop(ExFuture< ExecutorAgent, TArg > source, I getIteratorsFunc, F functor, int increment=1)
parallel (possibly, depending on executor capabilities) loop
Definition: Executor.h:290
ExFuture< ExecutorAgent, std::invoke_result_t< F, Executor< ExecutorAgent >, TArg > > getExecutor(ExFuture< ExecutorAgent, TArg > source, F f)
Get the Executor used in the chain of execution.
Definition: Executor.h:790
auto on_all(Executor< ExecutorAgent > ex, FTypes... futs)
return ExFuture which will be executed, in the context of the given executor ex, when all the given E...
Definition: Executor.h:1161
ExFuture< ExecutorAgent, typename ::mel::execution::_private::GetReturn< TArg, FTypes... >::type > parallel_convert(ExFuture< ExecutorAgent, TArg > source, FTypes... functions)
Same as parallel but returning a tuple with the values for each functor.
Definition: Executor.h:726
ExFuture< ExecutorAgent, std::invoke_result_t< F, TArg > > next(ExFuture< ExecutorAgent, TArg > source, F f)
Attach a functor to execute when input fut is complete Given functor will be executed inf the input E...
Definition: Executor.h:166
ExFuture< ExecutorAgent, void > start(Executor< ExecutorAgent > ex)
Start a chain of execution in given executor.
Definition: Executor.h:104
auto operator|(const ExFuture< ExecutorAgent, TRet1 > &input, U &&u)
overload operator | for chaining
Definition: Executor.h:1111
Definition: Callback_Impl.h:11
Default traits for any executor.
Definition: Executor.h:46
Definition: CommonDefs.h:21