11 #include <tasking/Process.h>
13 #include <forward_list>
15 using std::forward_list;
26 #include <core/Timer.h>
28 #include <core/CallbackSubscriptor.h>
30 #include <mpl/Int2Type.h>
31 using ::mel::mpl::Int2Type;
43 std::shared_ptr<Process>>
46 std::shared_ptr<Process>>
49 std::shared_ptr<Process>>
60 typedef unsigned int ThreadID;
63 class LockFreeTasksContainer
68 ElementType() : p(
nullptr ), st( 0 ), valid(
false ) {}
69 std::shared_ptr<Process> p;
71 std::atomic<bool> valid;
75 typedef std::vector<ElementType> PoolType;
76 std::atomic<size_t> mCurrIdx = 0;
77 std::atomic<size_t> mSize;
78 std::deque<PoolType> mPool;
81 volatile bool mInvalidate;
85 LockFreeTasksContainer(
size_t chunkSize,
size_t maxChunks );
86 PoolType::value_type& operator[](
size_t idx );
87 void add( std::shared_ptr<Process>& process,
88 unsigned int startTime );
90 getCurrIdx( std::memory_order mo = std::memory_order_relaxed )
92 return mCurrIdx.load( mo );
95 size_t size( std::memory_order memOrder =
96 std::memory_order_acquire )
const
98 return mSize.load( memOrder );
101 size_t exchangeIdx(
size_t v, std::memory_order order =
102 std::memory_order_seq_cst );
106 bool isInvalidate()
const {
return mInvalidate; }
107 void setInvalidate(
bool v ) { mInvalidate = v; }
108 size_t getMaxSize()
const {
return mMaxSize; }
112 typedef std::pair<std::shared_ptr<Process>,
unsigned int>
114 typedef forward_list<ProcessElement>
117 std::pair<std::shared_ptr<Process>,
unsigned int>>
127 size_t chunkSize = 512;
128 size_t maxChunks = 4;
137 typedef std::variant<BlockingOptions, LockFreeOptions>
161 unsigned int startTime = 0 );
172 unsigned int startTime = 0 );
177 inline unsigned int getProcessCount()
const;
181 inline unsigned int getActiveProcessCount()
const;
197 template <
class T>
void pauseProcesses( T& predicate );
227 inline TProcessList& getProcesses();
242 inline const std::shared_ptr<Timer> getTimer()
const;
243 inline std::shared_ptr<Timer> getTimer();
255 return mSS.subscribeCallback( std::forward<F>( f ) );
257 template <
class F>
void unsusbcribeSleepEvent( F&& f )
259 mSS.unsubscribeCallback( std::forward<F>( f ) );
261 void unsusbcribeSleepEvent(
int id )
263 mSS.unsubscribeCallback(
id );
265 template <
class F>
int susbcribeWakeEvent( F&& f )
267 return mWS.subscribeCallback( std::forward<F>( f ) );
269 template <
class F>
void unsusbcribeWakeEvent( F&& f )
271 mWS.unsubscribeCallback( std::forward<F>( f ) );
273 template <
class F>
int subscribeProcessEvicted( F&& f )
275 return mES.subscribeCallback( std::forward<F>( f ) );
277 template <
class F>
void unsubscribeProcessEvicted( F&& f )
279 mES.unsubscribeCallback( std::forward<F>( f ) );
281 void unsubscribeProcessEvicted(
int id )
283 mES.unsubscribeCallback(
id );
287 std::shared_ptr<Process> current;
296 TProcessList mProcessList;
297 std::unique_ptr<LockFreeTasksContainer> mLockFreeTasks;
299 TNewProcesses mBlockingTasks;
302 std::shared_ptr<Timer> mTimer;
303 std::atomic<unsigned int> mProcessCount;
304 std::atomic<int32_t> mInactiveProcessCount;
305 bool mKillingProcess;
308 void* _stack =
nullptr;
317 void _executeProcesses( uint64_t time,
318 TProcessList& processes ) OPTIMIZE_FLAGS;
321 void _triggerSleepEvents( std::shared_ptr<Process> p );
322 void _triggerWakeEvents( std::shared_ptr<Process> p );
330 void processAwakened( std::shared_ptr<Process> );
337 void processAsleep( std::shared_ptr<Process> );
342 for (
auto i = mProcessList.begin(); i != mProcessList.end(); ++i )
344 if ( predicate( *i ) )
346 ( *i ).first->pause();
363 return mProcessCount;
365 unsigned int ProcessScheduler::getActiveProcessCount()
const
Definition: CallbackSubscriptor_Impl.h:454
A periodic task, implementing a microthread.
Definition: Process.h:107
Definition: ProcessScheduler.h:59
const std::shared_ptr< Timer > getTimer() const
Definition: ProcessScheduler.h:370
void killProcesses(bool deferred)
void setTimer(std::shared_ptr< Timer > timer)
unsigned int getProcessCount() const
Definition: ProcessScheduler.h:360
void insertProcessNoLock(std::shared_ptr< Process > process, unsigned int startTime=0)
Inserts a new process into the scheduler without locking the process list.
void destroyAllProcesses()
TProcessList & getProcesses()
Definition: ProcessScheduler.h:350
std::variant< BlockingOptions, LockFreeOptions > SchedulerOptions
Scheduler creation options. By default, BlockingOptions is used.
Definition: ProcessScheduler.h:138
int susbcribeSleepEvent(F &&f)
Definition: ProcessScheduler.h:253
void insertProcess(std::shared_ptr< Process > process, unsigned int startTime=0)
ProcessScheduler(SchedulerOptions opts)
Constructor.
static std::shared_ptr< Process > getCurrentProcess()
Definition: Callback_Impl.h:11
Definition: CallbackSubscriptor.h:37
Definition: CallbackSubscriptor.h:40
Options for the blocking scheduler.
Definition: ProcessScheduler.h:133
Options for the lock-free scheduler.
Definition: ProcessScheduler.h:126
Definition: ProcessScheduler.h:67
Definition: ProcessScheduler.h:286