MEL
Microthread & Execution library
Thread.h
1 #pragma once
2 
3 #include <core/Event.h>
4 #include <core/Future.h>
5 #include <core/ThreadDefs.h>
6 #include <memory>
7 #include <parallelism/Barrier.h>
8 #include <thread>
9 
10 namespace mel
11 {
15  namespace core
16  {
// Get the number of logical processors.
20  MEL_API unsigned int getNumProcessors();
// NOTE(review): presumably returns the affinity mask of the current process —
// confirm against the implementation.
21  MEL_API uint64_t getProcessAffinity();
// Set affinity for the current thread.
// NOTE(review): the argument is presumably a processor bit mask and the return
// value a success flag — confirm.
23  MEL_API bool setAffinity( uint64_t );
24  using std::string;
25 
// Wrapper around std::thread (held in mThread) exposing join, sleep/yield,
// priority and affinity helpers.
// NOTE(review): the embedded line numbers of this listing skip values, so some
// original lines (doc comments or further members) may be absent here —
// confirm against the actual Thread.h.
32  class MEL_API Thread
33  {
34 
35  public:
36  // Policy for the yield() function.
37  enum YieldPolicy
38  {
39  YP_ANY_THREAD_ANY_PROCESSOR = 0,
40  YP_ANY_THREAD_SAME_PROCESSOR
41  };
42 
    // Constructs the wrapper from a callable; the definition below forwards f
    // to the std::thread member and then calls _initialize().
47  template <class F> Thread( F&& f );
48  // Default constructor: not a thread.
49  Thread();
    // Thread is joined on destruction (as stated by the generated API notes).
53  virtual ~Thread();
54 
    // Returns the stored priority (mPriority).
65  inline EThreadPriority getPriority() const;
66 
    // Joins the thread. millis defaults to 0xFFFFFFFF.
    // NOTE(review): presumably a timeout in milliseconds where 0xFFFFFFFF
    // means "wait forever", and the result reports whether the join
    // succeeded — confirm.
74  bool
75  join( unsigned int millis = 0xFFFFFFFF /*TODO mac: INFINITE*/ );
76 
    // NOTE(review): presumably suspends the calling thread for millis
    // milliseconds — confirm.
88  static void sleep( const unsigned int millis );
    // Yields according to the given policy; the default is
    // YP_ANY_THREAD_SAME_PROCESSOR.
97  static void yield( YieldPolicy yp = YP_ANY_THREAD_SAME_PROCESSOR );
98 
    // Returns the id of the wrapped std::thread.
102  inline std::thread::id getThreadId() const;
107  // inline EThreadState getState() const;
111  // inline bool getTerminateRequest() const;
112 
    // Affinity accessors for this thread object.
    // NOTE(review): the value is presumably a processor bit mask — confirm.
113  uint64_t getAffinity() const;
117  bool setAffinity( uint64_t );
    // Returns a fixed constant (currently 10).
    // NOTE(review): unit is presumably milliseconds, matching sleep() — confirm.
125  constexpr static unsigned getMinimunSleepTime()
126  {
127  //@todo for now a fixed "typical" time is used so that this can be
128  // used; later we will try to make it automatic or at least more
129  // flexible
130  constexpr unsigned MINIMUM_SLEEP = 10;
131  return MINIMUM_SLEEP;
132  }
    // NOTE(review): presumably requests thread termination and records
    // exitCode (see mExitCode) — confirm against the implementation.
141  void terminate( unsigned int exitCode = 0 );
142 
143  private:
    // Private constructor taking a name; not usable by client code.
144  Thread( const char* name );
    // Outcome of the last join attempt, stored in mJoinResult.
145  enum class EJoinResult
146  {
147  JOINED_NONE,
148  JOINED_OK,
149  JOINED_ERROR
150  } mJoinResult;
    // Priority bounds exist only on Linux, macOS, Android and iOS builds.
151 #if defined( MEL_LINUX ) || defined( MEL_MACOSX ) || defined( MEL_ANDROID ) || \
152  defined( MEL_IOS )
153  int mPriorityMin;
154  int mPriorityMax;
155 #endif
156  uint64_t mAffinity = 0; // affinity to set on start; if 0, it is
157  // ignored
158  unsigned int mExitCode;
159  EThreadPriority mPriority = EThreadPriority::TP_NONE;
160 
    // Underlying standard thread object.
161  std::thread mThread;
    // Common set-up step invoked by the templated constructor.
162  void _initialize();
163  };
// Constructs the wrapper from any callable: f is perfectly forwarded to the
// std::thread member, then _initialize() is called.
164  template <class F>
165  Thread::Thread( F&& f ) : mThread( std::forward<F>( f ) )
166  {
167  _initialize();
168  }
// Returns the id reported by the wrapped std::thread (mThread.get_id()).
169  std::thread::id Thread::getThreadId() const { return mThread.get_id(); }
170 
// Returns the priority currently stored in mPriority.
171  EThreadPriority Thread::getPriority() const { return mPriority; }
172 
176  /*template<class T> ::mel::core::WaitResult<T> waitForFutureThread(
177  const mel::core::Future<T>& f,unsigned int msecs =
178  ::mel::core::Event::EVENT_WAIT_INFINITE)
179  {
180  using ::mel::core::Event;
181  struct _Receiver
182  {
183  _Receiver():mEvent(false,false){}
184  using futT = mel::core::Future<T>;
185  mel::core::EWaitError wait( const futT& f,unsigned int
186  msecs)
187  {
188  Event::EWaitCode eventresult;
189  int evId = f.subscribeCallback([this](typename
190  futT::ValueType& )
191  {
192  mEvent.set();
193  });
194  eventresult = mEvent.wait(msecs);
195  f.unsubscribeCallback(evId);
196  switch( eventresult )
197  {
198  case ::mel::core::Event::EVENT_WAIT_OK:
199  return
200  ::mel::core::EWaitError::FUTURE_WAIT_OK; break; case
201  ::mel::core::Event::EVENT_WAIT_TIMEOUT: return
202  ::mel::core::EWaitError::FUTURE_WAIT_TIMEOUT; break; case
203  ::mel::core::Event::EVENT_WAIT_ERROR: return
204  ::mel::core::EWaitError::FUTURE_UNKNOWN_ERROR; break; default: return
205  ::mel::core::EWaitError::FUTURE_WAIT_OK; //silent warnin
206  }
207  }
208  private:
209  mel::core::Event mEvent;
210  };
211  auto receiver = std::make_unique<_Receiver>();
212  mel::core::EWaitError waitRes = receiver->wait(f,msecs);
213  switch (waitRes)
214  {
215  case ::mel::core::EWaitError::FUTURE_WAIT_OK:
216  if ( f.getValue().isValid())
217  return ::mel::core::WaitResult(f);
218  else
219  std::rethrow_exception(f.getValue().error());
220  break;
221  case ::mel::core::EWaitError::FUTURE_RECEIVED_KILL_SIGNAL:
222  throw
223  mel::core::WaitException(mel::core::EWaitError::FUTURE_RECEIVED_KILL_SIGNAL,"Kill
224  signal received"); break; case
225  ::mel::core::EWaitError::FUTURE_WAIT_TIMEOUT: throw
226  mel::core::WaitException(mel::core::EWaitError::FUTURE_RECEIVED_KILL_SIGNAL,"Time
227  out exceeded"); break; case
228  ::mel::core::EWaitError::FUTURE_UNKNOWN_ERROR: throw
229  mel::core::WaitException(mel::core::EWaitError::FUTURE_UNKNOWN_ERROR,"Unknown
230  error"); break; default: throw
231  mel::core::WaitException(mel::core::EWaitError::FUTURE_UNKNOWN_ERROR,"Unknown
232  error"); break;
233  }
234  */
// Wait for a Future from a Thread.
//
// Subscribes a callback on the future that sets a local Event, waits on that
// Event for up to msecs, unsubscribes, and translates the Event wait code into
// an EWaitError. How errors are reported depends on ErrorType:
//  - no-exception policy: the error is stored in the returned WaitResult via
//    setError();
//  - exception policy: on success the WaitResult is returned if the future's
//    value is valid, otherwise the future's stored error is rethrown.
//
// NOTE(review): this listing omits several numbered lines (the return type and
// function name, the right-hand side of the std::is_same checks, and the
// arguments/throw statements of the error cases) — confirm those details
// against the actual Thread.h before relying on this description.
// NOTE(review): the static_assert message mentions "WaitForFutureMThread";
// possibly copied from the microthread variant — confirm intended wording.
235  template <class ErrorType = mel::core::WaitErrorAsException, class T>
237  const mel::core::Future<T>& f,
238  unsigned int msecs = ::mel::core::Event::
239  EVENT_WAIT_INFINITE ) noexcept( std::
240  is_same<
241  ErrorType,
242  ::mel::core::
244  value )
245  {
    // Exactly one of these two policy flags is expected to be true.
246  constexpr bool NotuseException =
247  std::is_same<ErrorType,
249  constexpr bool UseException =
250  std::is_same<ErrorType,
252  static_assert( NotuseException || UseException,
253  "WaitForFutureMThread must specify either "
254  "WaitErrorNoException or WaitErrorAsException" );
255  using ::mel::core::Event;
    // Helper owning the Event that the future's callback signals.
256  struct _Receiver
257  {
258  _Receiver() : mEvent( false, false ) {}
259  using futT = mel::core::Future<T>;
    // Blocks on mEvent until the future fires its callback or msecs elapse,
    // then maps the Event wait code to an EWaitError.
260  mel::core::EWaitError wait( const futT& f, unsigned int msecs )
261  {
262  Event::EWaitCode eventresult;
263  int evId = f.subscribeCallback(
264  [this]( typename futT::ValueType& ) { mEvent.set(); } );
265  eventresult = mEvent.wait( msecs );
    // Always unsubscribe so the callback cannot outlive this receiver.
266  f.unsubscribeCallback( evId );
267  switch ( eventresult )
268  {
269  case ::mel::core::Event::EVENT_WAIT_OK:
270  return ::mel::core::EWaitError::FUTURE_WAIT_OK;
271  break;
272  case ::mel::core::Event::EVENT_WAIT_TIMEOUT:
273  return ::mel::core::EWaitError::FUTURE_WAIT_TIMEOUT;
274  break;
275  case ::mel::core::Event::EVENT_WAIT_ERROR:
276  return ::mel::core::EWaitError::FUTURE_UNKNOWN_ERROR;
277  break;
278  default:
279  return ::mel::core::EWaitError::
280  FUTURE_WAIT_OK; // silence warning
281  }
282  }
283 
284  private:
285  mel::core::Event mEvent;
286  };
287  ::mel::core::WaitResult result( f );
288  auto receiver = std::make_unique<_Receiver>();
289  mel::core::EWaitError waitRes = receiver->wait( f, msecs );
290  if constexpr ( NotuseException )
291  {
    // Non-throwing version: failures are recorded inside result.
292  switch ( waitRes )
293  {
294  case ::mel::core::EWaitError::FUTURE_WAIT_OK:
295  break;
296  case ::mel::core::EWaitError::FUTURE_RECEIVED_KILL_SIGNAL:
297  result.setError(
298  std::make_exception_ptr( mel::core::WaitException(
300  "Kill signal received" ) ) );
301  break;
302  case ::mel::core::EWaitError::FUTURE_WAIT_TIMEOUT:
303  result.setError(
304  std::make_exception_ptr( mel::core::WaitException(
306  "Time out exceeded" ) ) );
307  break;
308  case ::mel::core::EWaitError::FUTURE_UNKNOWN_ERROR:
309  result.setError(
310  std::make_exception_ptr( mel::core::WaitException(
312  "Unknown error" ) ) );
313  break;
314  default:
315  result.setError(
316  std::make_exception_ptr( mel::core::WaitException(
318  "Unknown error" ) ) );
319  break;
320  }
321  }
322  else // version throwing exceptions (UseException == true)
323  {
324  switch ( waitRes )
325  {
326  case ::mel::core::EWaitError::FUTURE_WAIT_OK:
    // A completed future may still hold an error; propagate it as an exception.
327  if ( f.getValue().isValid() )
328  return ::mel::core::WaitResult( f );
329  else
330  std::rethrow_exception( f.getValue().error() );
331  break;
332  case ::mel::core::EWaitError::FUTURE_RECEIVED_KILL_SIGNAL:
335  "Kill signal received" );
336  break;
337  case ::mel::core::EWaitError::FUTURE_WAIT_TIMEOUT:
340  "Time out exceeded" );
341  break;
342  case ::mel::core::EWaitError::FUTURE_UNKNOWN_ERROR:
345  "Unknown error" );
346  break;
347  default:
350  "Unknown error" );
351  break;
352  }
353  }
354  return result;
355  }
356 
// Wait for a barrier to be activated in the context of a thread.
// msecs defaults to Event::EVENT_WAIT_INFINITE; the result is an
// Event::EWaitCode.
// NOTE(review): msecs is presumably a timeout in milliseconds — confirm.
361  MEL_API ::mel::core::Event::EWaitCode
362  waitForBarrierThread( const ::mel::parallelism::Barrier& b,
363  unsigned int msecs = Event::EVENT_WAIT_INFINITE );
364  } // namespace core
365 } // namespace mel
Definition: Event.h:25
EWaitCode
Wait result codes.
Definition: Event.h:31
int subscribeCallback(F &&f) const
Subscribe callback to be executed when future is ready (valid or error)
Definition: Future.h:681
Represents a value that may not be present at the current moment.
Definition: Future.h:750
Wrapper around std::thread to provide more abstractions.
Definition: Thread.h:33
bool join(unsigned int millis=0xFFFFFFFF)
std::thread::id getThreadId() const
Definition: Thread.h:169
uint64_t getAffinity() const
virtual ~Thread()
Thread is joined on destruction.
bool setAffinity(uint64_t)
EThreadPriority getPriority() const
Definition: Thread.h:171
void setPriority(EThreadPriority tp)
void terminate(unsigned int exitCode=0)
static void yield(YieldPolicy yp=YP_ANY_THREAD_SAME_PROCESSOR)
static void sleep(const unsigned int millis)
constexpr static unsigned getMinimunSleepTime()
Definition: Thread.h:125
Exception class generated by WaitResult when wait for future gets an error.
Definition: Future.h:882
Wrapper for future value after wait.
Definition: Future.h:902
base functionalities
Definition: Callback_Impl.h:13
MEL_API unsigned int getNumProcessors()
Get the number of logical processors.
::mel::core::WaitResult< T > waitForFutureThread(const mel::core::Future< T > &f, unsigned int msecs=::mel::core::Event::EVENT_WAIT_INFINITE) noexcept(std::is_same< ErrorType, ::mel::core::WaitErrorNoException >::value)
Wait for a Future from a Thread.
Definition: Thread.h:236
MEL_API bool setAffinity(uint64_t)
Set affinity for current thread.
MEL_API ::mel::core::Event::EWaitCode waitForBarrierThread(const ::mel::parallelism::Barrier &b, unsigned int msecs=Event::EVENT_WAIT_INFINITE)
Wait for a barrier to be activated in the context of a thread.
EThreadPriority
Definition: ThreadDefs.h:21
EWaitError
Generic result error codes for future waiting.
Definition: Future.h:29
@ FUTURE_WAIT_TIMEOUT
Time out expired while waiting for Future.
@ FUTURE_UNKNOWN_ERROR
Unknown error while waiting for Future.
Definition: Callback_Impl.h:11
Policy for treating error as exception in future wait functions.
Definition: Future.h:936
Policy for not treating error as exception in future wait functions.
Definition: Future.h:941