MEL
Microthread & Execution library
Runnable.h
1 #pragma once
2 /*
3  * SPDX-FileCopyrightText: 2005,2022 Daniel Barrientos <danivillamanin@gmail.com>
4  *
5  * SPDX-License-Identifier: MIT
6  */
7 #include <core/Callback.h>
8 #include <tasking/ProcessScheduler.h>
9 #include <tasking/GenericProcess.h>
10 #include <core/CallbackSubscriptor.h>
12 
13 #include <map>
14 using std::map;
15 
18 
19 #include <mpl/ParamAdder.h>
20 using mel::mpl::addParam;
21 #include <mpl/LinkArgs.h>
22 using mel::mpl::linkFunctor;
23 #include <mpl/ReturnAdaptor.h>
24 using mel::mpl::returnAdaptor;
25 #include <mpl/MemberEncapsulate.h>
26 using mel::mpl::makeMemberEncapsulate;
27 #include <mpl/Functor.h>
28 using mel::mpl::chain;
29 #include <mpl/AsPtr.h>
30 using mel::mpl::asPtr;
31 
32 #include <core/ThreadDefs.h>
33 
34 #include <core/Future.h>
35 using mel::core::Future;
36 #include <functional>
37 #include <cassert>
38 #include <forward_list>
39 #include <type_traits>
40 
41 #define RUNNABLE_TASK_ALIGNMENT 8
42 namespace mel
43 {
44  namespace tasking
45  {
46  //some macros to create task functors easily from bool f() functors
47  //note: be careful with the form of f: if it contains a "," you may
48  // need to use "coma" (defined somewhere in mpl) or add an extra pair of parentheses.
49  // For example, RUNNABLE_CREATETASK( link1st<false,void>( xxx ) ) doesn't compile,
50  //so you need to write RUNNABLE_CREATETASK( (link1st<false,void>( xxx )) ) or RUNNABLE_CREATETASK( link1st<false coma void>( xxx ) )
51  //to show the preprocessor that the ',' is not a macro argument separator
52  #define RUNNABLE_CREATETASK( f ) \
53  addParam< ::mel::tasking::EGenericProcessResult,Process*,uint64_t,void > \
54  (addParam< ::mel::tasking::EGenericProcessResult,uint64_t,void >( f ) )
55  //useful macro to declare task parameters
56  #define RUNNABLE_TASK_PARAMS uint64_t t,Process* p
57  class Runnable; //predeclaration
61  class MEL_API ProcessFactory
62  {
63  public:
65  inline GenericProcess* create(Runnable* owner)const {return onCreate(owner);};
66  protected:
73  virtual GenericProcess* onCreate(Runnable* owner) const;
74 
75  };
77  struct MEL_API DefaultAllocator
78  {
80  static GenericProcess* allocate(Runnable* _this);
81  };
83  namespace _private
84  {
85  class MEL_API RunnableTask final: public GenericProcess
86  {
87  public:
88  //throws bad_alloc oif no enoguh memory
89  static void* operator new( size_t s,Runnable* owner );
90  void operator delete(void* ptr, Runnable*) noexcept;
91  static void operator delete( void* ptr ) noexcept;
92  RunnableTask(){}
93  private:
94  };
95  /*
96  //default allocator for new tasks (through post) doing a simple new
97  template <class T>
98  struct Allocator
99  {
100  static T* allocate(Runnable* _this)
101  {
102  return new T();
103  }
104  };
105  //special allocator for RunnableTask using internal pool
106  template <>
107  struct Allocator<RunnableTask>
108  {
109  static RunnableTask* allocate(Runnable* _this)
110  {
111  return new (_this)RunnableTask();
112  }
113  };
114  */
115 
116  struct RTMemPool; //predeclaration
117  struct RTMemBlock
118  {
119  enum class EMemState:uint8_t { FREE = 0,USED = 1 } ;
120  EMemState memState = EMemState::FREE;
121  alignas(RUNNABLE_TASK_ALIGNMENT) char task[ sizeof( ::mel::tasking::_private::RunnableTask ) ] ;
122  //RunnableTask task;
123  RTMemPool* owner;
124  };
125  struct MemZoneList
126  {
127  typedef std::forward_list<RTMemPool> ListType;
128  public:
129  MemZoneList():mSize(0){}
130  size_t size(){ return mSize;};
131  void push_front(RTMemPool&& pool)
132  {
133  mList.push_front(std::move(pool));
134  ++mSize;
135  }
136  ListType& getList(){ return mList;}
137  void remove(RTMemPool* pool )
138  {
139  //@todo hacer más eficiente teniendo iterador
140  mList.remove_if([pool](const RTMemPool& p)
141  {
142  auto r = (pool == &p);
143  return r;
144  });
145  --mSize; //todo no guaratee was removed. Since C++20 remove returns number of elements removed
146  }
147  private:
148  size_t mSize;
149  ListType mList;
150  };
151  //typedef std::forward_list<RTMemPool> MemZoneList;
152  struct RTMemPool
153  {
154  RTMemPool():pool(0),count(0){}
155  RTMemBlock* pool; //array to memory blocks
156  Runnable* owner;
157  size_t count;
158  //MemZoneList::iterator iterator; quitar
159  };
160  }
162 
182  class MEL_API Runnable
183  {
184  private:
185 
186  public:
188  {
189  Runnable* current = nullptr;
190  };
191  static const unsigned int DEFAULT_POOL_SIZE = 512;
192  //static const unsigned int DEFAULT_MAX_NEW_TASKS = DEFAULT_POOL_SIZE*4;
193 
195  {
196  unsigned int maxPoolSize = DEFAULT_POOL_SIZE;
198  };
199  inline ::mel::core::ThreadId getOwnerThreadId() const { assert(mOwnerThread != 0); return mOwnerThread; }
206  inline void setOwnerThreadId(mel::core::ThreadId tid) {mOwnerThread=tid;}
207  static Runnable* getCurrentRunnable();
212  void setDefaultFactory(ProcessFactory* factory){mDefaultFactory.reset(factory);}
214  inline const ProcessFactory* getDefaultFactory() const{return mDefaultFactory.get();}
215  protected:
216  private:
217  //get info on currently executing Runnable in current thread
218  static RunnableInfo* _getCurrentRunnableInfo();
219  friend class ::mel::tasking::_private:: RunnableTask;
220  //static thread_local Runnable::RunnableInfo tlCurrentRunnable;
221  RunnableInfo* mCurrentInfo;
222  std::unique_ptr<ProcessFactory> mDefaultFactory; //factory to use for allocating tasks if no other given
223  ProcessScheduler mTasks;
224  RunnableCreationOptions mOpts;
225  mel::tasking::_private::MemZoneList mRTZone;
226  //std::atomic<State> mState;
227  std::mutex mMemPoolCS;
228  mel::core::ThreadId mOwnerThread;//thread executing Runnable
229 
231  mel::tasking::_private::RTMemPool* _addNewPool();
233  void _removePool( ::mel::tasking::_private::RTMemPool* );
234 
235  protected:
241  void processTasks();
242 
243  virtual void onPostTask(std::shared_ptr<Process> process){};
244 
252  //template <class F>
253  //void setTaskAddedEvent( F functor );
254  //inline TTaskAddedEvent* getTaskAddedEvent() const;
255  public:
263  void postTask(std::shared_ptr<Process> process,unsigned int startTime = 0);
264 
269  virtual ~Runnable();
270 
271 
272  //helper functions to use as the "killFunctor" parameter in post(),fireAndForget(), and execute()
274  static std::function<bool()> killTrue;
276  static std::function<bool()> killFalse;
290  template <bool ignoreNoThrow=false,class AllocatorType = ::mel::tasking::DefaultAllocator, class F,class KF = const std::function<bool()>&>
291  std::shared_ptr<Process> post(
292  F&& task_proc,
293  KF&& killFunction=killFalse,
294  unsigned int period = 0,unsigned int startTime = 0);
301  template <bool ignoreNoThrow=false,class AllocatorType = ::mel::tasking::DefaultAllocator, class F,class KF = const std::function<bool()>&>
302  std::shared_ptr<Process> fireAndForget(
303  F&& task_proc,
304  unsigned int startTime = 0,
305  KF&& killFunction=killTrue);
314  template <class TRet,class F,class KF = const std::function<bool()>&>
315  Future<TRet> execute( F&& function,KF&& killFunction=killFalse) noexcept;
316  template <class TRet,class F,class KF = const std::function<bool()>&>
317  Future<TRet> execute( F&& function,Future<TRet>,KF&& killFunction=killFalse) noexcept;
318 
319 
320 
326  inline const ProcessScheduler& getScheduler() const;
327  inline ProcessScheduler& getScheduler() ;
328 
333  void setTimer( std::shared_ptr<Timer> timer );
334  inline const std::shared_ptr<Timer> getTimer( ) const;
335  inline std::shared_ptr<Timer> getTimer( );
336  inline unsigned int getPendingTaskCount() const
337  {
338  return (unsigned int)getScheduler().getProcessCount();
339  }
340  /*
341  * get number of running processes (not sleeping or paused)
342  */
343  // TODO: try to make this more flexible so that waiting processes are taken into account
344  inline unsigned int getActiveTaskCount() const
345  {
346  return (unsigned int)getScheduler().getActiveProcessCount();
347  }
348  inline unsigned int getMaxPoolSize() const{ return mOpts.maxPoolSize;}
349  private:
350 
351  };
352 
353  template <bool ignoreNoThrow, class AllocatorType, class F,class KF>
354  std::shared_ptr<Process> Runnable::post(
355  F&& task_proc,
356  KF&& killFunction,
357  unsigned int period,unsigned int startTime )
358  {
359  static_assert( !ignoreNoThrow || std::is_nothrow_invocable<F,uint64_t,Process*>::value,"Runnable::post. Task must be noexcept");
360  ::std::shared_ptr<GenericProcess> p(AllocatorType::allocate(this));
361  p->setProcessCallback( ::std::forward<F>(task_proc) );
362  p->setPeriod( period );
363  p->setKillCallback( ::std::forward<KF>(killFunction) );
364  postTask(p,startTime);
365  return p;
366  }
367  template <bool ignoreNoThrow, class AllocatorType, class F,class KF>
368  std::shared_ptr<Process> Runnable::fireAndForget(
369  F&& task_proc,
370  unsigned int startTime,
371  KF&& killFunction)
372  {
373  return post<ignoreNoThrow,AllocatorType>(
374  [f=std::forward<F>(task_proc)](RUNNABLE_TASK_PARAMS) mutable noexcept( noexcept(task_proc()) )
375  {
376  f();
377  return ::mel::tasking::EGenericProcessResult::KILL;
378  }
379  ,std::forward<KF>(killFunction),0,startTime
380  );
381 
382  }
383 
385  {
386  return mTasks;
387  }
389  {
390  return mTasks;
391  }
392  const std::shared_ptr<Timer> Runnable::getTimer( ) const
393  {
394  return mTasks.getTimer();
395  }
396  std::shared_ptr<Timer> Runnable::getTimer( )
397  {
398  return mTasks.getTimer();
399  }
400 
401  template <class TRet, class F,class KF>
402  Future<TRet> Runnable::execute( F&& function,KF&& killFunction) noexcept
403  {
404  Future<TRet> future;
405  return execute(std::forward<F>(function),future,std::forward<KF>(killFunction));
406  }
413  template <class TRet, class F,class KF>
414  Future<TRet> Runnable::execute( F&& f,Future<TRet> output,KF&& killFunction) noexcept
415  {
416  //always post the task, despite being in same thread. This is the most consistent way of doing it
417 
418  //PRUEBAS
419  // try
420  // {
421  // if constexpr (std::is_same<void,TRet>::value)
422  // {
423  // f();
424  // output.setValue();
425  // }
426  // else
427  // output.setValue(f());
428  // }
429  // //check chances of Exception
430  // catch( std::exception& e )
431  // {
432  // output.setError( ErrorType(Runnable::ERRORCODE_EXCEPTION,e.what()) );
433  // }
434  // catch(...)
435  // {
436 
437  // output.setError( ErrorType(Runnable::ERRORCODE_UNKNOWN_EXCEPTION,"Unknown exception") );
438 
439  // }
440 
441  post(
442  [output,f = std::forward<F>(f)](RUNNABLE_TASK_PARAMS) mutable noexcept
443  {
444  if constexpr (noexcept(f()))
445  {
446  if constexpr (std::is_same<void,TRet>::value)
447  {
448  f();
449  output.setValue();
450  }
451  else
452  {
453  output.setValue(f());
454  }
455  }else
456  {
457  try
458  {
459  if constexpr (std::is_same<void,TRet>::value)
460  {
461  f();
462  output.setValue();
463  }
464  else
465  {
466  output.setValue(f());
467  }
468  }
469  catch(...)
470  {
471  //output.setError( ErrorType(Runnable::ERRORCODE_UNKNOWN_EXCEPTION,"Unknown exception") );
472  output.setError( std::current_exception() );
473  }
474  }
475  /*
476  //check chances of Exception
477  catch( std::exception& e )
478  {
479  output.setError( ErrorType(Runnable::ERRORCODE_EXCEPTION,e.what()) );
480  }*/
481 
482  return ::mel::tasking::EGenericProcessResult::KILL;
483  },std::forward<KF>(killFunction)
484  );
485 
486 
487  return output;
488  }
489 
490  }
491 }
Definition: CallbackSubscriptor_Impl.h:454
Represents a value that may not be available yet.
Definition: Future.h:750
A Process constructed from a functor with signature EGenericProcessResult(uint64_t,...
Definition: GenericProcess.h:24
Base factory class for tasks.
Definition: Runnable.h:62
virtual GenericProcess * onCreate(Runnable *owner) const
Reimplement in children to change default behaviour.
GenericProcess * create(Runnable *owner) const
Create new process for given Runnable.
Definition: Runnable.h:65
Definition: ProcessScheduler.h:59
const std::shared_ptr< Timer > getTimer() const
Definition: ProcessScheduler.h:370
std::variant< BlockingOptions, LockFreeOptions > SchedulerOptions
Scheduler creation options. BlockingOptions is used by default.
Definition: ProcessScheduler.h:138
A class representing a "running" task, with added functionality to post events requesting execution o...
Definition: Runnable.h:183
Future< TRet > execute(F &&function, KF &&killFunction=killFalse) noexcept
Executes a function in a context of the Runnable. If this Runnable is in the same thread than caller ...
Definition: Runnable.h:402
void setDefaultFactory(ProcessFactory *factory)
Change default factory used to create task through post and fireAndForget.
Definition: Runnable.h:212
std::shared_ptr< Process > fireAndForget(F &&task_proc, unsigned int startTime=0, KF &&killFunction=killTrue)
Convenience function to post a non-periodic task with signature void f()
Definition: Runnable.h:368
void setOwnerThreadId(mel::core::ThreadId tid)
Definition: Runnable.h:206
void processTasks()
Performs a controlled loop over the internal queue, executing pending tasks. This function must be ca...
void postTask(std::shared_ptr< Process > process, unsigned int startTime=0)
static std::function< bool()> killFalse
helper function to reject kill when receiving kill signal
Definition: Runnable.h:276
const ProcessScheduler & getScheduler() const
Definition: Runnable.h:384
static std::function< bool()> killTrue
helper function to automatically kill process when receiving kill signal
Definition: Runnable.h:274
const ProcessFactory * getDefaultFactory() const
Retrieves the current default factory for tasks.
Definition: Runnable.h:214
std::shared_ptr< Process > post(F &&task_proc, KF &&killFunction=killFalse, unsigned int period=0, unsigned int startTime=0)
Definition: Runnable.h:354
void setTimer(std::shared_ptr< Timer > timer)
Runnable(RunnableCreationOptions opts)
Constructor.
Definition: Callback_Impl.h:11
Default allocator for new tasks (through Runnable::post) using the runnable default factory Runnable:...
Definition: Runnable.h:78
static GenericProcess * allocate(Runnable *_this)
Allocation function.
ProcessScheduler::SchedulerOptions schedulerOpts
creation options for internal scheduler
Definition: Runnable.h:197
Definition: Runnable.h:188