MEL
Microthread & Execution library
Work flows

Introduction

It's possible to create independent work flows so they can be reused or passed to functions such as conditions. A flow can be defined as a callable with signature ExFuture<...> flow( ExFuture<...> ), meaning that it can stand in for a chain of jobs. With the help of generic lambdas this callable will typically be written as [](auto input)->auto.
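
Since a flow is just a callable that takes an ExFuture and returns an ExFuture, it doesn't have to be a lambda. As a minimal sketch (AppendText is a hypothetical name, not part of the library), the same idea can be written as a function object with a generic call operator and passed around wherever a flow is expected:

struct AppendText
{
    //generic call operator: receives any ExFuture and continues the chain
    template <class TExFuture> auto operator()(TExFuture input) const
    {
        return input | execution::next([](const string& str)
        {
            return str + " world";
        });
    }
};

An instance such as AppendText{} can then be piped into a chain exactly as flow1 is in the example below.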

The lambda or callable used as a flow receives an ExFuture, the same kind of input that any other execution function takes. Let's start with a simple example, defining two flows:

template <class ExecutorType> void _sampleFlows1(ExecutorType ex)
{
auto th = ThreadRunnable::create();
//create a flow. input parameter is an ExFuture depending on given executor
auto flow1 = [](auto input)
{
return input | execution::next(
[](const string& str)
{
return str+" Dani";
}
);
};
th->fireAndForget([ex,flow1] () mutable
{
auto res = mel::tasking::waitForFutureMThread<::mel::core::WaitErrorNoException>(
execution::launch(ex,[]()
{
return "Hello"s;
}
)
| flow1
| execution::next([](const string& str)
{
return str + " Barrientos";
}
)
|
[](auto input)
{
return input | execution::next([](const string& str)
{
return str + ". Bye!!";
}
);
}
);
if ( res.isValid())
{
text::info("Result = {}",res.value());
}else
{
try
{
std::rethrow_exception(res.error());
}
catch(...)
{
::text::error("Some error occured!!");
}
}
},0,::tasking::Runnable::killFalse);
}

Output will be:

Result = Hello Dani Barrientos. Bye!!
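
Because flow1 is an ordinary callable, it can also be reused in a completely different chain. A minimal sketch, meant to run inside the same fireAndForget task (waitForFutureMThread has to be called from a microthread):

auto otherRes = mel::tasking::waitForFutureMThread<::mel::core::WaitErrorNoException>(
    execution::launch(ex,[]()
    {
        return "Goodbye"s;
    })
    | flow1 //same flow as above, now applied to a different input
);
//otherRes.value() would be "Goodbye Dani"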

Conditions

Besides creating flows for reusability, one important use is in conditions, which are the equivalent of a normal switch: a flow is selected based on the previous result in the chain. In the next example one of two flows is chosen according to a random number. Both flows are written as lambdas local to the function, but any other callable could be used, or the lambdas could be written inline; it's shown this way for clarity.

template <class ExecutorType> void _sampleFlowsCondition(ExecutorType ex)
{
auto th = ThreadRunnable::create();
//create a flow. input parameter is an ExFuture depending on given executor
auto flow0 = [](/*execution::ExFuture<ExecutorType,string> */auto input)
{
return input | execution::next(
[](int val)
{
return "Flow0";
}
);
};
auto flow1 = [](auto input)
{
return input | execution::next(
[](int val)
{
return "Flow1";
}
);
};
th->fireAndForget([ex,flow0,flow1] () mutable
{
srand(time(NULL));
auto res = mel::tasking::waitForFutureMThread<::mel::core::WaitErrorNoException>(
execution::launch(ex,[]()
{
return rand()%10;
}
)
| execution::condition(
[](int val)
{
int result = val<5?0:1;
text::info("Input value = {}. Selecting flow {}",val,result);
return result;
},
flow0,flow1
)
| execution::next( [](const string& str)
{
return str+" End!";
})
);
if ( res.isValid())
{
text::info("Result = {}",res.value());
}else
{
try
{
std::rethrow_exception(res.error());
}
catch(...)
{
::text::error("Some error occured!!");
}
}
},0,::tasking::Runnable::killFalse);
}

And the output, for one particular run, is:

Input value = 3. Selecting flow 0
Result = Flow0 End!

Launching flows

Flows can also be launched at the same time (of course, this notion of "same time" depends on the capabilities of the underlying executor) via flow::launch, which returns a tuple of ExFuture where each position holds the result of the corresponding flow. Below is an example launching three flows, one of which can randomly throw an exception. The function flow::on_all is used to gather the results of all the flows into a single tuple. As can be seen, an error thrown in any flow is propagated correctly to the final result.

template <class ExecutorType> void _sampleFlowLaunch(ExecutorType ex)
{
auto th = ThreadRunnable::create();
th->fireAndForget([ex] () mutable
{
try
{
auto finalRes = mel::tasking::waitForFutureMThread<::mel::core::WaitErrorAsException>(
execution::launch(ex,[]() noexcept{ return "Starting job!!"s;})
| execution::parallel_convert(
[](const string& str) noexcept
{
mel::text::info("parallel_convert task 1, returning void");
},
[](const string& str) noexcept
{
return str+" Second parallel task"s;
}
)
| execution::flow::launch(
[](auto input) noexcept
{
return input | execution::inmediate("hola"s)
| execution::next( [](const string& s)
{
if ( rand()%10 < 5 )
throw std::runtime_error("parallel:_convert second task. Throwing exception!!");
}
); //return void for testing purposes
},
[](auto input) noexcept
{
return input | execution::inmediate(7.8f);
}
,
[](auto input) noexcept
{
return input | execution::next( [](const std::tuple<mel::execution::VoidType,string>& v) noexcept
{
return std::get<1>(v).size();
}
);
}
)
| execution::flow::on_all(ex) //gather the results of all flows into a single future holding a tuple
);
auto& finalValue = finalRes.value();
mel::text::info("Final res = (void,{},{})",std::get<1>(finalValue),std::get<2>(finalValue));
{
try
{
rethrow_exception( e.getCause() );
}catch(std::exception& e)
{
mel::text::error("Error {}",e.what());
}catch(...)
{
mel::text::error("OnAllException. unknown error");
}
}
catch(std::exception& e)
{
mel::text::error(e.what());
}catch(...)
{
mel::text::error("Unknown error!!!");
}
},0,::tasking::Runnable::killFalse);
}

Loops

It's possible to implement iterations as part of an execution flow. There are two kinds of iterations in flows: sequential iterations, implemented with doWhile, and independent iterations, implemented with flow::loop.

Below is an example using doWhile in which a random number is generated and printed to the console four times, performing a microthread wait in each iteration (if the executor in use allows it):

template <class ExecutorType> void _sampleWhile(ExecutorType ex)
{
auto th = ThreadRunnable::create();
th->fireAndForget([ex] () mutable
{
srand(time(NULL));
int idx = 0;
auto res = mel::tasking::waitForFutureMThread<::mel::core::WaitErrorNoException>(
execution::start(ex)
| execution::doWhile(
[]( auto input ) noexcept
{
return input | execution::next( []() noexcept -> int
{
return rand()%10;
})
| execution::next( [](int v ) noexcept
{
mel::text::info(" new value = {}",v);
if constexpr(execution::ExecutorTraits<decltype(ex)>::has_microthreading)
{
//microthread wait (assumed to be Process::wait; the exact duration is irrelevant here)
::mel::tasking::Process::wait(100);
}
else
mel::text::info("Current executor doesn't support true parallelism, wait not done");
});
},
[idx]() mutable noexcept
{
if ( ++idx == 4 )
return false; //finish while
else
return true; //continue iterating
}
)
);
if ( res.isValid())
{
text::info("Finished");
}else
{
try
{
std::rethrow_exception(res.error());
}
catch(...)
{
::text::error("Some error occured!!");
}
}
},0,::tasking::Runnable::killFalse);
}

And now an example of flow::loop, where iterations are independent and so can be parallelized if the executor allows it:

template <class ExecutorType> void _sampleFlowLoop(ExecutorType ex)
{
auto th = ThreadRunnable::create();
th->fireAndForget([ex] () mutable
{
srand(time(NULL));
int idx = 0;
auto res = mel::tasking::waitForFutureMThread<::mel::core::WaitErrorNoException>(
execution::start(ex)
| execution::flow::loop( 0,4,
[]( int idx, auto input ) noexcept
{
return input | execution::next( []() noexcept -> int
{
return rand()%10;
})
| execution::next( [](int v ) noexcept
{
mel::text::info(" new value = {}. Now waiting",v);
if constexpr(execution::ExecutorTraits<decltype(ex)>::has_microthreading)
{
//microthread wait (assumed to be Process::wait; the exact duration is irrelevant here)
::mel::tasking::Process::wait(100);
}
else
mel::text::info("Current executor doesn't support true parallelism, wait not done");
mel::text::info(" new value = {}. After wait",v);
});
}
)
| execution::next( []{
mel::text::info(" Flow finished!!");
})
);
if ( res.isValid())
{
text::info("Finished");
}else
{
try
{
std::rethrow_exception(res.error());
}
catch(...)
{
::text::error("Some error occured!!");
}
}
},0,::tasking::Runnable::killFalse);
}

The fact that the iterations are independent can be seen in the output, where an iteration doesn't need the previous one to have finished:

new value = 9. Now waiting
new value = 1. Now waiting
new value = 3. Now waiting
new value = 3. Now waiting
new value = 9. After wait
new value = 3. After wait
new value = 1. After wait
new value = 3. After wait

Flow chart example

In order to show a more complex and practical example (although the tasks accomplished here have no particular interest), let's see a translation from a flowchart to code.

The goal is to implement the following flowchart:

A brief explanation:

  • an integer vector is processed in a parallel way (remember, this parallelism depends on the underlying executor; each iteration is considered independent)
  • a random integer in [0...9) is added to each element, and the result is passed to the next step
  • a flow among 3 different ones is selected depending on the previous value
  • all of this has to be repeated 5 times

And the code doing this work could be something like:

template <class ExecutorType> void _sampleFlowChart(ExecutorType ex)
{
using std::vector;
size_t vecSize = rand()%20+10;
mel::text::info("vecSize will be {}",vecSize);
auto inputVec = vector<int>(vecSize);
for( auto& v:inputVec)
v= rand()%5;
auto th = ThreadRunnable::create(); //must be the last declaration so it's destroyed first, ensuring the thread finishes before the vector is destroyed
th->fireAndForget([ex,&inputVec] () mutable
{
srand(time(NULL));
int iteration = 0;
auto res = mel::tasking::waitForFutureMThread<::mel::core::WaitErrorNoException>(
execution::start(ex)
| mel::execution::inmediate(std::ref(inputVec))
| execution::doWhile(
[size = inputVec.size()](auto input )
{
return input | execution::flow::loop(0,(int)size,
[]( int idx,auto input ) noexcept
{
return input | execution::next( [idx](vector<int>& v) noexcept -> int
{
auto value = v[idx] + rand()%9;
return value;
})
| execution::flow::condition(
[](int value ) noexcept
{
if ( value < 3)
return 0;
else if (value < 6)
return 1;
else
return 2;
},
[idx](auto input) //flow number 0
{
return input | execution::next([idx](int value) noexcept
{
::mel::text::info("T1 of element {}",idx);
})
| execution::next( [idx]() noexcept
{
::mel::text::info("T2 of element {}",idx);
});
},
[idx](auto input) //flow number 1
{
return input | execution::next([idx](int value) noexcept
{
::mel::text::info("T3 of element {}",idx);
});
},
[idx](auto input) //flow number 2
{
return input | execution::next([idx](int value) noexcept
{
::mel::text::info("T4 of element {}",idx);
})
| execution::next( [idx]() noexcept
{
::mel::text::info("T5 of element {}",idx);
});
}
);
});
},
[iteration]() mutable noexcept
{
::mel::text::info("Iteration {} finished", iteration);
if ( ++iteration == 5)
return false;
else
{
return true;
}
}
)
);
if ( res.isValid())
{
//text::info("Finished");
}else
{
try
{
std::rethrow_exception(res.error());
}
catch(...)
{
::text::error("Some error occured!!");
}
}
},0,::tasking::Runnable::killFalse);
}

Some points on this code:

  • the initial vector is generated (randomly) outside the flow. It could be done in an initial task as part of the flow, but this would imply a lot of copies because of the independent iterations.
  • this vector is created outside the task posted to th. It could be tempting to create it inside the same task that launches the whole flow, but remember from the microthread limitations that a local variable inside a microthread can't be captured by reference if a context switch is going to happen (see the sketch below).
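
To illustrate that limitation, here is a minimal hypothetical sketch (not taken from the samples) contrasting the unsafe by-reference capture with a safe alternative:

//UNSAFE: localVec lives on the microthread stack, and the launched task captures it by
//reference while the microthread does a context switch inside waitForFutureMThread
th->fireAndForget([ex]() mutable
{
    std::vector<int> localVec(16,1);
    auto res = mel::tasking::waitForFutureMThread<::mel::core::WaitErrorNoException>(
        execution::launch(ex,[&localVec](){ return localVec.size(); })
    );
},0,::tasking::Runnable::killFalse);

//SAFER: keep the data alive outside the task (as _sampleFlowChart does) or capture it
//by value, for example through a shared_ptr
auto sharedVec = std::make_shared<std::vector<int>>(16,1);
th->fireAndForget([ex,sharedVec]() mutable
{
    auto res = mel::tasking::waitForFutureMThread<::mel::core::WaitErrorNoException>(
        execution::launch(ex,[sharedVec](){ return sharedVec->size(); })
    );
},0,::tasking::Runnable::killFalse);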

Output is not shown because the parallel nature of each iteration makes it difficult to follow and doesn't contribute to the understanding.