2 #include <MelLibType.h>
3 #include <parallelism/ThreadPool.h>
4 #include <parallelism/Barrier.h>
10 #ifndef _ITERATOR_DEBUG_LEVEL
11 #define _ITERATOR_DEBUG_LEVEL 0
14 #include <core/Thread.h>
23 template <
class I>
static typename std::iterator_traits<I>::difference_type get(
const I& a,
const I& b)
25 return std::distance(a, b);
31 template <
class I>
static I get(I a, I b)
39 template <
class I>
static void get(I& it,
int increment)
41 std::advance(it, increment);
47 template <
class I>
static void get(I& it,
int increment)
56 template <
class I,
class F>
inline static void execute(
int& cont,
int nIterations,I i,I, I end,F&& functor,
int divisionSize,
int leftOver,
int increment,
int,
ThreadPool* tp,
const ThreadPool::ExecutionOpts& opts,
Barrier& barrier)
59 newOpts.useCallingThread =
false;
61 std::exception_ptr except;
62 bool finish = ((i == end) || (cont > nIterations));
65 tp->execute(newOpts,barrier,i,except,std::function<
void(I)>([ divisionSize, functor, increment, cont, nIterations, leftOver](I i)
mutable
68 int size = (cont == nIterations) ? divisionSize + leftOver : divisionSize;
69 for (
int n = 0; n < size;)
73 Advance<::mel::mpl::TypeTraits<I>::isArith>::get(j, increment);
78 if (++cont <= nIterations)
80 Advance<::mel::mpl::TypeTraits<I>::isArith>::get(i, divisionSize*increment);
92 template <
class I,
class F>
static void execute(
int& cont,
int nIterations, I i,I begin, I end, F&& functor,
int divisionSize,
int leftOver,
int increment,
int loopSize,
ThreadPool* tp,
const ThreadPool::ExecutionOpts& opts,
Barrier& barrier)
94 if (cont > nIterations)
97 newOpts.useCallingThread =
false;
100 typedef typename ::std::iterator_traits< typename std::decay<I>::type>::value_type ObjType;
101 typedef ::std::vector<ObjType*> VType;
102 ::std::shared_ptr<VType> vCopy(
new VType(loopSize));
105 for (
auto j = begin; j != end; ++j)
107 (*vCopy.get())[idx++] = &(*j);
109 std::exception_ptr except;
110 while (cont <= nIterations)
112 tp->execute(newOpts,barrier,cont,except,
113 std::function<
void()>([divisionSize, functor, increment, vCopy, nIterations, leftOver](
int cont)
mutable
115 int size = (cont == nIterations) ? divisionSize + leftOver : divisionSize;
116 for (
int n = 0; n < size; ++n)
118 int idx = (cont - 1)*(divisionSize*increment) + n * increment;
133 template <
class I,
class F>
Barrier _for(ThreadPool* tp,
const ThreadPool::ExecutionOpts& opts, I&& begin, I&& end, F&& functor,
int increment,
int loopSize)
136 typedef typename std::decay<I>::type DecayedIt;
138 int nElements = (loopSize + increment - 1) / increment;
139 size_t nThreads = tp->getNumThreads()+(opts.useCallingThread?1:0);
144 if (nElements <= (
int)nThreads || opts.groupTasks ==
false || nThreads == 0 )
148 if (opts.useCallingThread)
153 Advance<isArithIterator>::get(i, increment);
156 std::exception_ptr except;
157 ThreadPool::ExecutionOpts newOpts(opts);
158 newOpts.useCallingThread =
false;
159 bool finish = ((i == end) || (cont >= nElements));
163 tp->execute(newOpts, result,i,except,
typename std::decay<F>::type(functor));
164 if (++cont < nElements)
166 Advance<isArithIterator >::get(i, increment);
171 if (opts.useCallingThread && nElements > 0)
179 int divisionSize = nElements / nThreads;
180 int nIterations = nThreads;
181 int leftOver = nElements % nIterations;
184 if (opts.useCallingThread)
188 Advance<isArithIterator>::get(i, divisionSize*increment);
194 BulkExecute<false>::execute(cont, nIterations, i,std::forward<I>(begin), std::forward<I>(end), std::forward<F>(functor), divisionSize, leftOver, increment, loopSize, tp, opts, result);
196 if (opts.useCallingThread && nIterations > 0)
198 int size = divisionSize;
199 typename std::decay<I>::type j = begin;
200 for (
int n = 0; n < size;)
204 Advance<isArithIterator>::get(j, increment);
Typical Traits for types.
Definition: TypeTraits.h:296
Multithread barrier.
Definition: Barrier.h:43
void set()
Definition: Barrier.h:59
Pool of threads allowing parallel execution.
Definition: ThreadPool.h:38
Barrier _for(ThreadPool *tp, const ThreadPool::ExecutionOpts &opts, I begin, I end, F &&functor, int increment=1)
Parallel for.
Definition: For.h:216
Definition: Callback_Impl.h:11
static void execute(int &cont, int nIterations, I i, I begin, I end, F &&functor, int divisionSize, int leftOver, int increment, int loopSize, ThreadPool *tp, const ThreadPool::ExecutionOpts &opts, Barrier &barrier)
Definition: For.h:92
Definition: ThreadPool.h:59
Utilities on tasking, because lack of a better place...Some funcions are intended to be put in a cust...