7 #include <execution/Executor.h> 
    8 #include <parallelism/For.h> 
    9 #include <parallelism/ThreadPool.h> 
   14         using ::mel::parallelism::ThreadPool;
 
   21             bool independentTasks =
 
   36             Executor( std::shared_ptr<ThreadPool> pool ) : mPool( pool ){};
 
   38                 : mPool( std::move( ex.mPool ) ), mOpts( ex.mOpts )
 
   42                 : mPool( ex.mPool ), mOpts( ex.mOpts )
 
   53                 mPool = std::move( ex.mPool );
 
   59             std::weak_ptr<ThreadPool>& getPool() { 
return mPool; }
 
   60             const std::weak_ptr<ThreadPool>& getPool()
 const { 
return mPool; }
 
   63             template <
class TRet, 
class TArg, 
class F>
 
   67                 if ( !mPool.expired() )
 
   69                     ThreadPool::ExecutionOpts opts;
 
   70                     opts.schedPolicy = ThreadPool::SchedulingPolicy::SP_BESTFIT;
 
   71                     auto th = mPool.lock()->selectThread( opts );
 
   74                         [f = std::forward<F>( f ),
 
   75                          arg = std::forward<TArg>(
 
   76                              arg )]() 
mutable noexcept( std::
 
   80                             -> TRet { 
return f( std::forward<TArg>( arg ) ); },
 
   87                     output.setError( 
new std::runtime_error(
 
   88                         "ThreadPoolExecutor::launch. Pool has expired!!!" ) );
 
   91             template <
class TRet, 
class F>
 
   95                 if ( !mPool.expired() )
 
   97                     ThreadPool::ExecutionOpts opts;
 
   98                     opts.schedPolicy = ThreadPool::SchedulingPolicy::SP_BESTFIT;
 
   99                     auto th = mPool.lock()->selectThread( opts );
 
  100                     th->execute<TRet>( std::forward<F>( f ),
 
  107                     output.setError( 
new std::runtime_error(
 
  108                         "ThreadPoolExecutor::launch. Pool has expired!!!" ) );
 
  111             template <
class I, 
class F>
 
  114             template <
class TArg, 
class... FTypes>
 
  116             parallel( ExFuture<ThreadPool, TArg> fut, std::exception_ptr& excpt,
 
  117                       FTypes&&... functions );
 
  118             template <
class ReturnTuple, 
class TArg, 
class... FTypes>
 
  121                               std::exception_ptr& except, ReturnTuple& result,
 
  122                               FTypes&&... functions );
 
  125             std::weak_ptr<ThreadPool> mPool;
 
  126             ThreadPoolExecutorOpts mOpts;
 
  128         template <
class I, 
class F>
 
  130         Executor<ThreadPool>::loop( I&& begin, I&& end, F&& functor,
 
  133             static_assert( std::is_invocable<F, I>::value,
 
  134                            "ThreadPoolExecutor::loop bad functor signature" );
 
  135             ThreadPool::ExecutionOpts exopts;
 
  136             exopts.useCallingThread = 
false;
 
  137             exopts.groupTasks = !getOpts().independentTasks;
 
  138             return ::mel::parallelism::_for(
 
  139                 getPool().lock().get(), exopts, std::forward<I>( begin ),
 
  140                 std::forward<I>( end ), std::forward<F>( functor ), increment );
 
  146             template <
class T> 
class ValueWrapper
 
  152                 ValueWrapper( 
const FutType& fut ) : mFut( fut ) {}
 
  153                 ValueWrapper( FutType&& fut ) : mFut( std::move( fut ) ) {}
 
  154                 ValueWrapper( ValueWrapper&& vw ) : mFut( std::move( vw.mFut ) )
 
  157                 ValueWrapper( 
const ValueWrapper& vw ) : mFut( vw.mFut ) {}
 
  158                 bool isValid()
 const { 
return mFut.getValue().isValid(); }
 
  159                 bool isAvailable()
 const 
  161                     return mFut.getValue().isAvailable();
 
  165                 typename FutType::ValueType::ReturnType value()
 
  167                     return mFut.getValue().value();
 
  169                 typename FutType::ValueType::CReturnType value()
 const 
  171                     return mFut.getValue().value();
 
  173                 std::exception_ptr error()
 const 
  175                     return mFut.getValue().error();
 
  177                 operator T&() noexcept { 
return mFut.getValue().value(); }
 
  178                 operator const T&() 
const noexcept
 
  180                     return mFut.getValue().value();
 
  188         template <
class TArg, 
class... FTypes>
 
  190         Executor<ThreadPool>::parallel( ExFuture<ThreadPool, TArg> fut,
 
  191                                         std::exception_ptr& except,
 
  192                                         FTypes&&... functions )
 
  194             ThreadPool::ExecutionOpts exopts;
 
  195             exopts.useCallingThread = 
false;
 
  196             exopts.groupTasks = !getOpts().independentTasks;
 
  197             if constexpr ( std::is_same<TArg, void>::value )
 
  198                 return getPool().lock()->execute(
 
  199                     exopts, except, std::forward<FTypes>( functions )... );
 
  201                 return getPool().lock()->execute(
 
  202                     exopts, _private::ValueWrapper<TArg>( fut ), except,
 
  203                     std::forward<FTypes>( functions )... );
 
  205         template <
class ReturnTuple, 
class TArg, 
class... FTypes>
 
  207             ExFuture<ThreadPool, TArg> fut, std::exception_ptr& except,
 
  208             ReturnTuple& result, FTypes&&... functions )
 
  210             ThreadPool::ExecutionOpts exopts;
 
  211             exopts.useCallingThread = 
false;
 
  212             exopts.groupTasks = !getOpts().independentTasks;
 
  213             if constexpr ( std::is_same<TArg, void>::value )
 
  214                 return getPool().lock()->executeWithResult(
 
  215                     exopts, result, except,
 
  216                     std::forward<FTypes>( functions )... );
 
  218                 return getPool().lock()->executeWithResult(
 
  219                     exopts, result, _private::ValueWrapper<TArg>( fut ), except,
 
  220                     std::forward<FTypes>( functions )... );
 
  230                 has_microthreading = 
true 
  234                 has_parallelism = 
true 
Represents a value that may not be present at the current moment.
Definition: Future.h:750
 
Extension of mel::core::Future to apply to executors.
Definition: ExFuture.h:21
 
Executor specialization using a ThreadPool as execution agent.
Definition: ThreadPoolExecutor.h:33
 
void launch(F &&f, TArg &&arg, ExFuture< ThreadPool, TRet > output) const noexcept
Definition: ThreadPoolExecutor.h:64
 
Definition: Executor.h:26
 
Multithread barrier.
Definition: Barrier.h:43
 
static std::function< bool()> killFalse
helper function to reject kill when receiving kill signal
Definition: Runnable.h:276
 
static std::function< bool()> killTrue
helper function to automatically kill process when receiving kill signal
Definition: Runnable.h:274
 
ExFuture< ExecutorAgent, std::invoke_result_t< F > > launch(Executor< ExecutorAgent > ex, F &&f)
Launch given functor in given executor.
Definition: Executor.h:65
 
ExFuture< ExecutorAgent, TArg > parallel(ExFuture< ExecutorAgent, TArg > source, FTypes... functions)
Execute given functions in a (possibly, depending on concrete executor) parallel way. If an exception ...
Definition: Executor.h:499
 
ExFuture< ExecutorAgent, TArg > loop(ExFuture< ExecutorAgent, TArg > source, I getIteratorsFunc, F functor, int increment=1)
parallel (possibly, depending on executor capabilities) loop
Definition: Executor.h:290
 
Executor< ThreadPool > ThreadPoolExecutor
alias for Executor<ThreadPool>
Definition: ThreadPoolExecutor.h:238
 
ExFuture< ExecutorAgent, typename ::mel::execution::_private::GetReturn< TArg, FTypes... >::type > parallel_convert(ExFuture< ExecutorAgent, TArg > source, FTypes... functions)
Same as parallel but returning a tuple with the values for each functor.
Definition: Executor.h:726
 
Definition: Callback_Impl.h:11
 
Default traits for any executor.
Definition: Executor.h:46
 
Concrete options for this type of executor.
Definition: ThreadPoolExecutor.h:20
 
bool autoKill
Definition: ThreadPoolExecutor.h:24