MEL
Microthread & Execution library
Loop.h
1 #pragma once
2 /*
3  * SPDX-FileCopyrightText: 2022 Daniel Barrientos <danivillamanin@gmail.com>
4  *
5  * SPDX-License-Identifier: MIT
6  */
7 #include <execution/Executor.h>
8 #include <preprocessor/utils.h>
9 #include <tuple>
10 namespace mel
11 {
12  namespace execution
13  {
14  namespace flow
15  {
16 
29  template <class TArg, class ExecutorAgent, class Flow, class I>
30  ExFuture<ExecutorAgent, void>
31  loop( ExFuture<ExecutorAgent, TArg> source, I getIteratorsFunc,
32  Flow flow, int increment = 1 )
33  {
34  ExFuture<ExecutorAgent, void> result( source.agent );
35  // std::exception_ptr except; //@todo, uhmm, no es muy
36  // importante, porque se refiere a error en la funcion que lanza
37  // el flow.
38  int length;
39  //@todo pending to pass input result to this function. The
40  // problem I see is about error management: is there is an
41  // error, this function can't be called, so the flows won't be
42  // able to
43  // catch the error
44  auto iterators = getIteratorsFunc();
45  using IteratorType = decltype( iterators[0] );
46  constexpr bool isArithIterator = mel::mpl::TypeTraits<
47  typename std::decay<IteratorType>::type>::isArith;
48  if constexpr ( isArithIterator )
49  length = ( iterators[1] - iterators[0] );
50  else
51  length = std::distance( iterators[0], iterators[1] );
52  int count = 0;
53  int nElements =
54  ( length + increment - 1 ) /
55  increment; //"manual" ceil, because ceil function fails
56  // sometimes in fast floating mode
57  ::mel::parallelism::Barrier barrier( nElements );
58  IteratorType i{ iterators[0] };
59  while ( count++ < nElements )
60  {
61  auto f = flow( i, source );
62  f.subscribeCallback( [barrier]( const auto& ) mutable
63  { barrier.set(); } );
64  i += increment;
65  }
66  // for(auto i = begin;i != end ;i+=increment)
67  // {
68  // auto f = flow(i,source);
69  // f.subscribeCallback( [barrier](const auto&) mutable
70  // {
71  // barrier.set();
72  // }
73  // );
74 
75  // }
76  barrier.subscribeCallback(
77  std::function<::mel::core::ECallbackResult(
78  const ::mel::parallelism::BarrierData& )>(
79  [result](
80  const ::mel::parallelism::BarrierData& ) mutable
81  {
82  result.setValue();
83  return ::mel::core::ECallbackResult::UNSUBSCRIBE;
84  } ) );
85  return result;
86  }
87 
88  namespace _private
89  {
90  template <class I, class F> struct ApplyLoop
91  {
92  template <class U, class T>
93  ApplyLoop( U&& its, T&& f, int inc )
94  : mGetIts( std::forward<U>( its ) ),
95  mFlow( std::forward<T>( f ) ), increment( inc )
96  {
97  }
98  I mGetIts;
99  F mFlow;
100  int increment;
101  template <class TArg, class ExecutorAgent>
102  auto operator()( ExFuture<ExecutorAgent, TArg> inputFut )
103  {
104  return flow::loop( inputFut, std::forward<I>( mGetIts ),
105  std::forward<F>( mFlow ),
106  increment );
107  }
108  };
109  } // namespace _private
110 
112  template <class I, class F>
113  flow::_private::ApplyLoop<I, F> loop( I&& getItFunc, F&& flow,
114  int increment = 1 )
115  {
116  return _private::ApplyLoop<I, F>( std::forward<I>( getItFunc ),
117  std::forward<F>( flow ),
118  increment );
119  }
120  } // namespace flow
121  } // namespace execution
122 } // namespace mel
Executor< ExecutorAgent > agent
execution agent associated with this instance
Definition: ExFuture.h:54
Definition: TypeTraits.h:296
Multithread barrier.
Definition: Barrier.h:43
void set()
Definition: Barrier.h:59
ECallbackResult
Type returned by callbacks subscribed to CallbackSubscriptors.
Definition: CallbackSubscriptor.h:32
ExFuture< ExecutorAgent, void > loop(ExFuture< ExecutorAgent, TArg > source, I getIteratorsFunc, Flow flow, int increment=1)
Loop with independent iterations.
Definition: Loop.h:31
Definition: Callback_Impl.h:11