MEL
Microthread & Execution library
ProcessScheduler.h
1 
2 /*
3  * SPDX-FileCopyrightText: 2005,2022 Daniel Barrientos
4  * <danivillamanin@gmail.com>
5  *
6  * SPDX-License-Identifier: MIT
7  */
8 
9 #pragma once
10 
11 #include <tasking/Process.h>
12 
13 #include <forward_list>
15 using std::forward_list;
16 
17 #include <utility>
18 
19 /*
20  * SPDX-FileCopyrightText: 2005,2022 Daniel Barrientos
21  * <danivillamanin@gmail.com>
22  *
23  * SPDX-License-Identifier: MIT
24  */
25 
26 #include <core/Timer.h>
27 using mel::core::Timer;
28 #include <core/CallbackSubscriptor.h>
30 #include <mpl/Int2Type.h>
31 using ::mel::mpl::Int2Type;
32 #include <atomic>
33 #include <deque>
34 #include <memory>
35 #include <mutex>
36 #include <variant>
37 #include <vector>
38 namespace mel
39 {
40  namespace tasking
41  {
43  std::shared_ptr<Process>>
44  WakeSubscriptor;
46  std::shared_ptr<Process>>
47  SleepSubscriptor;
49  std::shared_ptr<Process>>
50  EvictSubscriptor;
58  class MEL_API ProcessScheduler
59  {
60  typedef unsigned int ThreadID;
61  friend class Process;
62  // container for new tasks when using lock_Free
63  class LockFreeTasksContainer
64  {
65  public:
66  struct ElementType
67  {
68  ElementType() : p( nullptr ), st( 0 ), valid( false ) {}
69  std::shared_ptr<Process> p;
70  unsigned int st;
71  std::atomic<bool> valid;
72  };
73 
74  private:
75  typedef std::vector<ElementType> PoolType;
76  std::atomic<size_t> mCurrIdx = 0;
77  std::atomic<size_t> mSize;
78  std::deque<PoolType> mPool;
79  size_t mChunkSize;
80  size_t mMaxSize;
81  volatile bool mInvalidate;
82  std::mutex mSC;
83 
84  public:
85  LockFreeTasksContainer( size_t chunkSize, size_t maxChunks );
86  PoolType::value_type& operator[]( size_t idx );
87  void add( std::shared_ptr<Process>& process,
88  unsigned int startTime );
89  inline size_t
90  getCurrIdx( std::memory_order mo = std::memory_order_relaxed )
91  {
92  return mCurrIdx.load( mo );
93  }
94  void clear();
95  size_t size( std::memory_order memOrder =
96  std::memory_order_acquire ) const
97  {
98  return mSize.load( memOrder );
99  } // pruebas memoryorder
100  // return previous value
101  size_t exchangeIdx( size_t v, std::memory_order order =
102  std::memory_order_seq_cst );
103  //@todo lock y unlock de pruebas
104  void lock();
105  void unlock();
106  bool isInvalidate() const { return mInvalidate; }
107  void setInvalidate( bool v ) { mInvalidate = v; }
108  size_t getMaxSize() const { return mMaxSize; }
109  };
110 
111  public:
112  typedef std::pair<std::shared_ptr<Process>, unsigned int>
113  ProcessElement;
114  typedef forward_list<ProcessElement>
115  TProcessList; // pairs of processes and starttime
116  typedef std::deque<
117  std::pair<std::shared_ptr<Process>, unsigned int>>
118  TNewProcesses; // pairs of processes and starttime in blocking
119  // scheduler
120 
126  {
127  size_t chunkSize = 512;
128  size_t maxChunks = 4;
130  };
133  {
134  // empty now
135  };
137  typedef std::variant<BlockingOptions, LockFreeOptions>
149  ~ProcessScheduler( void );
150 
160  void insertProcess( std::shared_ptr<Process> process,
161  unsigned int startTime = 0 );
171  void insertProcessNoLock( std::shared_ptr<Process> process,
172  unsigned int startTime = 0 );
173 
177  inline unsigned int getProcessCount() const;
178  /*
179  * get number of processes running (not sleeped)
180  */
181  inline unsigned int getActiveProcessCount() const;
197  template <class T> void pauseProcesses( T& predicate );
198 
214  void killProcesses( bool deferred );
215 
225  // inline bool checkFor(unsigned int taskId);
226 
227  inline TProcessList& getProcesses();
228  // inline TNewProcesses& getPendingProcesses();
234 
238  void setTimer( std::shared_ptr<Timer> timer );
242  inline const std::shared_ptr<Timer> getTimer() const;
243  inline std::shared_ptr<Timer> getTimer();
244 
248  static std::shared_ptr<Process> getCurrentProcess();
249 
253  template <class F> int susbcribeSleepEvent( F&& f )
254  {
255  return mSS.subscribeCallback( std::forward<F>( f ) );
256  }
257  template <class F> void unsusbcribeSleepEvent( F&& f )
258  {
259  mSS.unsubscribeCallback( std::forward<F>( f ) );
260  }
261  void unsusbcribeSleepEvent( int id )
262  {
263  mSS.unsubscribeCallback( id );
264  }
265  template <class F> int susbcribeWakeEvent( F&& f )
266  {
267  return mWS.subscribeCallback( std::forward<F>( f ) );
268  }
269  template <class F> void unsusbcribeWakeEvent( F&& f )
270  {
271  mWS.unsubscribeCallback( std::forward<F>( f ) );
272  }
273  template <class F> int subscribeProcessEvicted( F&& f )
274  {
275  return mES.subscribeCallback( std::forward<F>( f ) );
276  }
277  template <class F> void unsubscribeProcessEvicted( F&& f )
278  {
279  mES.unsubscribeCallback( std::forward<F>( f ) );
280  }
281  void unsubscribeProcessEvicted( int id )
282  {
283  mES.unsubscribeCallback( id );
284  }
285  struct ProcessInfo // for TLS
286  {
287  std::shared_ptr<Process> current;
288  };
289 
290  private:
291  SchedulerOptions mOpts;
292  bool mIsLockFree; // same as checking index in mOpts but for
293  // performance
294 
295  ProcessInfo* mProcessInfo;
296  TProcessList mProcessList;
297  std::unique_ptr<LockFreeTasksContainer> mLockFreeTasks;
298  // new processes to insert next time blocking variant
299  TNewProcesses mBlockingTasks;
300  size_t mLastIdx;
301  std::mutex mCS;
302  std::shared_ptr<Timer> mTimer;
303  std::atomic<unsigned int> mProcessCount;
304  std::atomic<int32_t> mInactiveProcessCount;
305  bool mKillingProcess; // flag to mark when ther is a kill task
306  // pending
307 #ifndef NDEBUG
308  void* _stack = nullptr;
309 #endif
310  WakeSubscriptor mWS;
311  SleepSubscriptor mSS;
312  EvictSubscriptor mES;
313 
317  void _executeProcesses( uint64_t time,
318  TProcessList& processes ) OPTIMIZE_FLAGS;
319 
320  void _killTasks();
321  void _triggerSleepEvents( std::shared_ptr<Process> p );
322  void _triggerWakeEvents( std::shared_ptr<Process> p );
323  static ProcessInfo* _getCurrentProcessInfo();
330  void processAwakened( std::shared_ptr<Process> );
337  void processAsleep( std::shared_ptr<Process> );
338  };
339 
340  template <class T> void ProcessScheduler::pauseProcesses( T& predicate )
341  {
342  for ( auto i = mProcessList.begin(); i != mProcessList.end(); ++i )
343  {
344  if ( predicate( *i ) )
345  {
346  ( *i ).first->pause();
347  }
348  }
349  }
350  ProcessScheduler::TProcessList& ProcessScheduler::getProcesses()
351  {
352  return mProcessList;
353  }
354  /*ProcessScheduler::TNewProcesses&
355  ProcessScheduler::getPendingProcesses()
356  {
357  return mNewProcesses;
358  }*/
359 
361  {
362 
363  return mProcessCount;
364  }
365  unsigned int ProcessScheduler::getActiveProcessCount() const
366  {
367  return getProcessCount() - ( (unsigned int)mInactiveProcessCount );
368  }
369 
370  const std::shared_ptr<Timer> ProcessScheduler::getTimer() const
371  {
372  return mTimer;
373  }
374  std::shared_ptr<Timer> ProcessScheduler::getTimer() { return mTimer; }
375  // static thread_local ProcessScheduler::ProcessInfo
376  // tlCurrentProcess{nullptr};
377  } // namespace tasking
378 } // namespace mel
Definition: CallbackSubscriptor_Impl.h:454
Definition: Timer.h:18
A periodic task, implementing a microthread.
Definition: Process.h:107
Definition: ProcessScheduler.h:59
const std::shared_ptr< Timer > getTimer() const
Definition: ProcessScheduler.h:370
void killProcesses(bool deferred)
void setTimer(std::shared_ptr< Timer > timer)
unsigned int getProcessCount() const
Definition: ProcessScheduler.h:360
void insertProcessNoLock(std::shared_ptr< Process > process, unsigned int startTime=0)
Insert new process in scheduler without locking processlist.
TProcessList & getProcesses()
Definition: ProcessScheduler.h:350
std::variant< BlockingOptions, LockFreeOptions > SchedulerOptions
Scheduler creation options. By default use BlockingOotions.
Definition: ProcessScheduler.h:138
int susbcribeSleepEvent(F &&f)
Definition: ProcessScheduler.h:253
void insertProcess(std::shared_ptr< Process > process, unsigned int startTime=0)
ProcessScheduler(SchedulerOptions opts)
Constructor.
static std::shared_ptr< Process > getCurrentProcess()
Definition: Callback_Impl.h:11
Definition: CallbackSubscriptor.h:37
Definition: CallbackSubscriptor.h:40
option for blocking scheduler
Definition: ProcessScheduler.h:133
options for lock_free scheduler
Definition: ProcessScheduler.h:126
Definition: ProcessScheduler.h:286