MEL
Microthread & Execution library
For.h
1 #pragma once
2 #include <MelLibType.h>
3 #include <parallelism/ThreadPool.h>
4 #include <parallelism/Barrier.h>
5 #include <mpl/TypeTraits.h>
6 #include <iterator>
7 #include <math.h>
8 #include <functional>
9 
10 #ifndef _ITERATOR_DEBUG_LEVEL
11 #define _ITERATOR_DEBUG_LEVEL 0
12 #endif
13 #include <tasking/utilities.h>
14 #include <core/Thread.h>
15 namespace mel
16 {
17  namespace parallelism
18  {
/**
* @brief Distance between two positions, generic (non-arithmetic) version.
*
* Chosen by the public _for when the position type is not arithmetic; it simply
* delegates to std::distance.
*/
template <bool>
struct Distance
{
    /// @return number of increments needed to go from @p first to @p last
    template <class It>
    static typename std::iterator_traits<It>::difference_type get(const It& first, const It& last)
    {
        using Steps = typename std::iterator_traits<It>::difference_type;
        const Steps count = std::distance(first, last);
        return count;
    }
};
28  template <>
29  struct Distance<true>
30  {
31  template <class I> static I get(I a, I b)
32  {
33  return b - a;
34  }
35  };
/**
* @brief Moves a position a number of steps, generic (non-arithmetic) version.
*
* The position is replaced by std::next(position, steps), which is defined as
* std::advance applied to a copy.
*/
template <bool>
struct Advance
{
    template <class I> static void get(I& position, int steps)
    {
        position = std::next(position, steps);
    }
};
44  template <>
45  struct Advance<true>
46  {
47  template <class I> static void get(I& it, int increment)
48  {
49  it += increment;
50  }
51  };
53  template <bool>
54  struct BulkExecute
55  {
56  template <class I, class F> inline static void execute(int& cont,int nIterations,I i,I, I end,F&& functor,int divisionSize,int leftOver,int increment,int,ThreadPool* tp, const ThreadPool::ExecutionOpts& opts,Barrier& barrier)
57  {
58  ThreadPool::ExecutionOpts newOpts(opts);
59  newOpts.useCallingThread = false; //only one iteration in calling thread
60 
61  std::exception_ptr except; //@todo para que compile, pero hay que pasarselo
62  bool finish = ((i == end) || (cont > nIterations));
63  while (!finish)
64  {
65  tp->execute(newOpts,barrier,i,except,std::function<void(I)>([ divisionSize, functor, increment, cont, nIterations, leftOver](I i) mutable
66  {
67  I j = i;
68  int size = (cont == nIterations) ? divisionSize + leftOver : divisionSize;
69  for (int n = 0; n < size;)
70  {
71  functor(j);
72  if (++n < size)
73  Advance<::mel::mpl::TypeTraits<I>::isArith>::get(j, increment);
74  }
75  }
76  )
77  );
78  if (++cont <= nIterations) //because an iteration for each thread
79  {
80  Advance<::mel::mpl::TypeTraits<I>::isArith>::get(i, divisionSize*increment);
81  }
82  else
83  finish = true;
84  }
85  }
86  };
87  //specialization whitout using iterators and for not arithmethic iterators
88  //@todo ya no vale
89  template <>
90  struct BulkExecute<true>
91  {
92  template <class I, class F> static void execute(int& cont, int nIterations, I i,I begin, I end, F&& functor, int divisionSize, int leftOver, int increment, int loopSize, ThreadPool* tp, const ThreadPool::ExecutionOpts& opts, Barrier& barrier)
93  {
94  if (cont > nIterations)
95  return;
96  ThreadPool::ExecutionOpts newOpts(opts);
97  newOpts.useCallingThread = false; //only one iteration in calling thread
98 
100  typedef typename ::std::iterator_traits< typename std::decay<I>::type>::value_type ObjType;
101  typedef ::std::vector<ObjType*> VType;
102  ::std::shared_ptr<VType> vCopy(new VType(loopSize));
103  int idx = 0;
104 
105  for (auto j = begin; j != end; ++j)
106  {
107  (*vCopy.get())[idx++] = &(*j);
108  }
109  std::exception_ptr except; //@todo para que compile, pero hay que pasarselo
110  while (cont <= nIterations)
111  {
112  tp->execute(newOpts,barrier,cont,except,
113  std::function<void()>([divisionSize, functor, increment, vCopy, nIterations, leftOver](int cont) mutable
114  {
115  int size = (cont == nIterations) ? divisionSize + leftOver : divisionSize;
116  for (int n = 0; n < size; ++n)
117  {
118  int idx = (cont - 1)*(divisionSize*increment) + n * increment;
119  //@todo no vale porque necesito iterador
120  //ObjType* element = (*vCopy)[idx];
121  //functor(*element);
122  }
123  }
124  )
125  );
126  ++cont;
127  }
128  }
129  };
131  namespace _private
132  {
133  template <class I, class F> Barrier _for(ThreadPool* tp, const ThreadPool::ExecutionOpts& opts, I&& begin, I&& end, F&& functor, int increment, int loopSize)
134  {
135  Barrier result;
136  typedef typename std::decay<I>::type DecayedIt;
137  constexpr bool isArithIterator =::mel::mpl::TypeTraits<DecayedIt>::isArith;
138  int nElements = (loopSize + increment - 1) / increment; //"manual" ceil, because ceil function fails sometimes in fast floating mode
139  size_t nThreads = tp->getNumThreads()+(opts.useCallingThread?1:0);
140  if (begin == end)
141  return result;
142  DecayedIt i(begin);
143 
144  if (nElements <= (int)nThreads || opts.groupTasks == false || nThreads == 0 ) //more or equal threads than tasks
145  {
146  result = Barrier(nElements);
147  int cont = 0;
148  if (opts.useCallingThread)
149  {
150  ++cont;
151  if (nElements > 1)
152  {
153  Advance<isArithIterator>::get(i, increment);
154  }
155  }
156  std::exception_ptr except; //@todo para compilar, hay que pasarselo por param
157  ThreadPool::ExecutionOpts newOpts(opts);
158  newOpts.useCallingThread = false; //only one iteration in calling thread
159  bool finish = ((i == end) || (cont >= nElements));
160  while (!finish)
161  {
162  //tp->execute(newOpts, mBarrier, false, std::bind(typename std::decay<F>::type(functor), std::ref(GetElement<isArithIterator>::get(i)))); //@todo notengo claro que deba usar ref???
163  tp->execute(newOpts, result,i,except, typename std::decay<F>::type(functor));
164  if (++cont < nElements)
165  {
166  Advance<isArithIterator >::get(i, increment);
167  }
168  else
169  finish = true;
170  }
171  if (opts.useCallingThread && nElements > 0)
172  {
173  functor(begin);
174  result.set();
175  }
176  }
177  else //less threads than elements to process
178  {
179  int divisionSize = nElements / nThreads;
180  int nIterations = nThreads; //number of parallel iterations
181  int leftOver = nElements % nIterations;
182  result = Barrier(nIterations);
183  int cont = 1;
184  if (opts.useCallingThread)
185  {
186  if (nIterations > 1)
187  {
188  Advance<isArithIterator>::get(i, divisionSize*increment);
189  }
190  ++cont;
191  }
192  //BulkExecute<_ITERATOR_DEBUG_LEVEL != 0 && !isArithIterator >::execute(cont, nIterations, i,std::forward<I>(begin), std::forward<I>(end), std::forward<F>(functor), divisionSize, leftOver, increment, loopSize, tp, opts, mBarrier);
193  //@todo arreglar metodo sin iteradores
194  BulkExecute<false>::execute(cont, nIterations, i,std::forward<I>(begin), std::forward<I>(end), std::forward<F>(functor), divisionSize, leftOver, increment, loopSize, tp, opts, result);
195 
196  if (opts.useCallingThread && nIterations > 0)
197  {
198  int size = divisionSize;
199  typename std::decay<I>::type j = begin;
200  for (int n = 0; n < size/* && j != end*/;)
201  {
202  functor(j);
203  if (++n < size)
204  Advance<isArithIterator>::get(j, increment);
205  }
206  result.set();
207  }
208  }
209  return result;
210  }
211  }
213 
216  template <class I, class F> Barrier _for(ThreadPool* tp, const ThreadPool::ExecutionOpts& opts, I begin, I end, F&& functor, int increment = 1)
217  {
218  return _private::_for(tp,opts,begin,end,std::forward<F>(functor),increment,Distance<::mel::mpl::TypeTraits<I>::isArith>::get(begin, end));
219  }
220  };
221 
222 }
Typical Traits for types.
Definition: TypeTraits.h:296
Multithread barrier.
Definition: Barrier.h:43
void set()
Definition: Barrier.h:59
Pool of threads allowing parallel execution.
Definition: ThreadPool.h:38
Barrier _for(ThreadPool *tp, const ThreadPool::ExecutionOpts &opts, I begin, I end, F &&functor, int increment=1)
Parallel for.
Definition: For.h:216
Definition: Callback_Impl.h:11
static void execute(int &cont, int nIterations, I i, I begin, I end, F &&functor, int divisionSize, int leftOver, int increment, int loopSize, ThreadPool *tp, const ThreadPool::ExecutionOpts &opts, Barrier &barrier)
Definition: For.h:92
Definition: For.h:55
Utilities on tasking, placed here for lack of a better place...Some functions are intended to be put in a cust...