Contents of this section:
- Introduction
- Usage samples
- Error management
- Perfect forwarding
- Work flows
Introduction
The goal of this module is to provide a very high-level API for task execution in a generic way, so that it is not necessary to know the underlying execution system. This problem is becoming very important because of the need to exploit modern hardware: since CPU speeds increase very slowly, improvements in software performance come from taking advantage of parallel capabilities, using the GPU as a computational resource, distributed systems, and so on. So it's necessary to provide programmers with new idioms so that this kind of task doesn't become a nightmare.
mel provides the classical mechanisms for concurrent/parallel programming (threads, critical sections, ...) and not-so-classical ones such as the microthread system, which is described in another section. These tools are enough for simple tasks, but there are two main problems:
- The way the user writes the code depends on the tool used: the code won't be the same when using a thread to launch a task as when using a microthread. Or, even worse, when using the GPU or other systems.
- Complex tasks can be hard to express, and the programmer will soon get lost in the code.
For example, let's assume we want to execute a task t1 in some thread; then, when it finishes, execute another task t2; and, when that one finishes, execute two parallel tasks t3 and t4 in some thread pool. With the basic tools in mel this is not that hard, but it is kind of a mess. The code could be:
auto th1 = ThreadRunnable::create();
//launch task t1 in the thread
auto fut = th1->execute<void>([]()
{
//...task t1...
});
try
{
//wait for t1 to finish before launching t2
::mel::core::waitForFutureThread(fut);
auto fut2 = th1->execute<void>([]()
{
//...task t2...
});
//wait for t2 to finish
::mel::core::waitForFutureThread(fut2);
//now launch t3 and t4 in a thread pool and wait on the returned barrier
ThreadPool::ThreadPoolOpts opts;
ThreadPool myPool(opts);
ThreadPool::ExecutionOpts exopts;
auto barrier = myPool.execute<void>(exopts,
[]()
{
//...task t3...
},
[]()
{
//...task t4...
}
);
::mel::core::waitForBarrierThread(barrier);
}catch(...)
{
//handle wait errors (timeout, process killed, ...)
}
As can be seen, mel already provides high-level tools to facilitate task execution. But, despite that, it can be hard to manage a complex execution sequence. Think, for example, about how to manage an error at any point of the chain. And, of course, think about how to solve the same problem with plain C++ threads only...
Instead, with the execution system, built on top of those facilities, we could write something like:
auto th1 = ThreadRunnable::create(true);
execution::Executor<Runnable> exr(th1);
parallelism::ThreadPool::ThreadPoolOpts opts;
auto myPool = make_shared<parallelism::ThreadPool>(opts);
parallelism::ThreadPool::ExecutionOpts exopts;
execution::Executor<parallelism::ThreadPool> extp(myPool);
try
{
auto res = mel::tasking::waitForFutureThread(
execution::launch(exr,[]()
{
//...task t1...
})
| execution::next([]()
{
//...task t2...
})
| execution::transfer(extp)
| execution::parallel(
[]()
{
//...task t3...
},
[]()
{
//...task t4...
}
)
);
}catch(...)
{
}
As can be seen in the example code, the expressiveness and flexibility of this mechanism are far superior. The executor used can be any executor type; the only thing to do (which, of course, can be difficult) is to implement the necessary parts of a new execution system when needed (for example, an execution system based on GPU processing). mel provides the three types of executors used in this example: a RunnableExecutor based on runnables, a ThreadPoolExecutor based on a thread pool, and an InlineExecutor which behaves exactly as if the code were written inline (so, no executor at all). There is another one, mel::execution::NaiveInlineExecutor, included as a demonstration: its implementation is straightforward but not as efficient as the InlineExecutor.
The RunnableExecutor and ThreadPoolExecutor have microthread capabilities, meaning that tasks executed in any of them can take advantage of mel's cooperative multitasking (see ::Tasking::Process). The next sections provide examples that try to describe all the functionalities with simple cases, showing the power of this system. As a note, the examples are shown using the pipe ('|') operator, but the classic form could be used, although it is less expressive and harder to read:
[...same as previous example..]
try
{
auto res = mel::tasking::waitForFutureThread(
execution::parallel(
execution::transfer(
execution::next(
execution::launch(exr,[]()
{
//...task t1...
}),
[]()
{
//...task t2...
}
),extp),
[]()
{
//...task t3...
},
[]()
{
//...task t4...
}
)
);
}catch(...)
{
}
Usage samples
The following samples are for educational purposes only. This means that the tasks accomplished may not make much sense or may not be done in the best possible way. The goal is to show the different functionalities and their power. All the samples are written without worrying about the underlying executor, and are implemented as template functions.
These functions could be used as:
auto th = ThreadRunnable::create(true);
execution::Executor<Runnable> exr(th);
exr.setOpts({true,false});
_sampleBasic(exr);
parallelism::ThreadPool::ThreadPoolOpts opts;
auto myPool = make_shared<parallelism::ThreadPool>(opts);
parallelism::ThreadPool::ExecutionOpts exopts;
execution::Executor<parallelism::ThreadPool> extp(myPool);
extp.setOpts({true,true});
_sampleBasic(extp);
execution::InlineExecutor exInline;
_sampleBasic(exInline);
Basic sample
A simple execution chain. This code consists of three execution steps:
- a first task generating a float from another float given as argument
- a second task converting this float to a string and passing it to the next step
- a third step launching three parallel tasks. This doesn't necessarily mean that those tasks will be executed in parallel; it's just a hint so that the executor will do it that way if it has the capacity. To check the kind of parallelism supported, a getExecutor step is inserted beforehand in order to inform the user. In this concrete example that step wouldn't be necessary, because we have the ExecutorType as a function template parameter, but in a real scenario the job might not know anything about the underlying executor.
template <class ExecutorType> void _sampleBasic(ExecutorType ex)
{
auto th = ThreadRunnable::create();
th->fireAndForget(
[ex]() mutable {
try {
auto res = ::mel::tasking::waitForFutureMThread(
execution::launch(
ex, [](float param) noexcept { return param + 6.f; },
10.5f) |
execution::next([](float param) noexcept {
return std::to_string(param);
}) |
execution::getExecutor([](auto ex, const string &str) noexcept {
if constexpr(execution::ExecutorTraits<decltype(ex)>::has_parallelism)
mel::text::info("Current executor supports true parallelism. Next job will be executed parallelized");
else
mel::text::info("Current executor doesn't support true parallelism. Next job will be executed sequentially");
return str;
}) |
execution::parallel(
[](const string &str) noexcept{
mel::text::info("Parallel 1. {}", str + " hello!");
},
[](const string &str) noexcept{
mel::text::info("Parallel 2. {}", str + " hi!");
},
[](const string &str) noexcept{
mel::text::info("Parallel 2. {}", str + " whats up!");
}));
if (res.isValid()) {
::mel::text::info("Result value = {}", res.value());
}
} catch (core::WaitException &e) {
::mel::text::error("Error while waiting: Reason={}", e.what());
} catch (...) {
::mel::text::error("Error while waiting: Unknown reason");
}
},
0, ::tasking::Runnable::killFalse);
}
Output will be:
Current executor supports true parallelism. Next job will be executed parallelized
Parallel 1. 16.500000 hello!
Parallel 2. 16.500000 hi!
Parallel 2. 16.500000 whats up!
Result value = 16.500000
Although the samples are shown using lambdas for clarity, any other callable can be used:
class MyClass
{
public:
float f1(float p1,float p2) noexcept
{
return p1+p2;
};
string f2(float p) noexcept
{
return std::to_string(p);
}
void operator()(const string& str) noexcept
{
text::info("Parallel operator() {}",str+" hi!");
}
};
template <class ExecutorType> void _sampleCallables(ExecutorType ex)
{
auto th = ThreadRunnable::create();
MyClass obj;
using namespace std::placeholders;
th->fireAndForget([ex,&obj] () mutable
{
auto res = ::mel::tasking::waitForFutureMThread(
execution::launch(ex,
std::bind(&MyClass::f1,&obj,6.7f,_1),10.5f)
| execution::next(std::bind(&MyClass::f2,&obj,_1))
| execution::parallel(
MyClass(),
[](const string& str)
{
text::info("Parallel 2. {}",str+" hi!");
},
[](const string& str)
{
text::info("Parallel 2. {}",str+" whats up!");
}
)
);
if (res.isValid())
{
::text::info("Result value = {}",res.value());
}
},0,::tasking::Runnable::killFalse);
}
And the output will be:
Parallel operator() 17.200001 hi!
Parallel 2. 17.200001 whats up!
Parallel 2. 17.200001 hi!
Result value = 17.200001
In the previous example we see some other kinds of callables: member functions (through std::bind) and a function object (via operator()).
- Warning
- std::bind doesn't preserve the noexcept specifier (https://stackoverflow.com/questions/71673262/noexcept-preservation-using-bind). This is an important aspect to take into account, as explained in this section; a lambda-based alternative is sketched below.
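As a minimal sketch (reusing the MyClass type defined above), a small forwarding lambda can replace std::bind when the noexcept specifier must be kept; the name boundF1 is only illustrative:
MyClass obj;
//a forwarding lambda can be marked noexcept explicitly, unlike std::bind(&MyClass::f1,&obj,6.7f,_1)
auto boundF1 = [&obj](float p) noexcept { return obj.f1(6.7f,p); };
//boundF1 can then be passed to execution::launch or execution::next like any other callable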
Using references
This example shows how to use references to variables. In order to pass a reference to launch or to inmediate we must use std::ref, because it's not possible to deduce what the user wants from the parameter type alone. Also, when a function wants to pass a reference on to the next function in the chain, this must be stated explicitly, without automatic deduction; so, when using a lambda as in the examples, you must specify the return type.
template <class ExecutorType> void _sampleReference(ExecutorType ex)
{
string str = "Hello";
auto th = ThreadRunnable::create();
th->fireAndForget([ex,&str] () mutable
{
auto res = ::mel::tasking::waitForFutureMThread(
execution::launch(ex,[](string& str) noexcept ->string&
{
str += " Dani.";
return str;
},std::ref(str))
| execution::next([](string& str) noexcept ->string&
{
str+= " How are you?";
return str;
})
| execution::next([](string& str) noexcept
{
str += "Bye!";
return str;
})
| execution::next([](string& str) noexcept
{
str += "See you!";
return str;
})
);
if (res.isValid())
{
::text::info("Result value = {}",res.value());
::text::info("Original str = {}",str);
}
},0,::tasking::Runnable::killFalse);
}
And the output will be:
Result value = Hello Dani. How are you?Bye!See you!
Original str = Hello Dani. How are you?Bye!
As can be seen, the original string is modified, but only up to the third job: because that job doesn't return a reference to the next one, the fourth job receives a copy. Also, just to aid understanding, the previous code is functionally equivalent to the following (redundant parts are removed):
auto res = ::mel::tasking::waitForFutureMThread(
execution::start(ex)
| execution::inmediate(std::ref(str))
| execution::next([](string& str) noexcept ->string&
{
str += " Dani.";
return str;
})
| execution::next([](string& str) noexcept ->string&
{
str+= " How are you?";
return str;
})
| execution::next([](string& str) noexcept
{
str += "Bye!";
return str;
})
| execution::next([](string& str) noexcept
{
str += "See you!";
return str;
})
);
if (res.isValid())
{
::text::info("Result value = {}",res.value());
::text::info("Original str = {}",str);
}
Instead of launching a callable initially, the chain of execution is started with a start, followed by an inmediate. Although in this example this doesn't improve anything, it can be useful in other situations.
Changing executor
In this example we will show how to transfer execution from one executor to another within the same flow. This is accomplished with the transfer function. It's best seen with an example:
template <class Ex1, class Ex2> void _sampleTransfer(Ex1 ex1,Ex2 ex2)
{
auto th = ThreadRunnable::create();
th->fireAndForget([ex1,ex2] () mutable
{
try
{
auto res = ::mel::tasking::waitForFutureMThread(
execution::start(ex1)
| execution::inmediate(string("Hello")) //initial value feeding the first job
| execution::next([](const string& str)
{
text::info("NEXT: {}",str);
return str + ". How are you?";
})
| execution::transfer(ex2) //from here on, jobs are executed in ex2
| execution::loop(
[](const string& str)
{
return std::array{0,10};
},
[](int idx, const string& str) noexcept
{
text::info("Iteration {}", str + std::to_string(idx));
})
| execution::next([](const string& str) noexcept
{
return "See you!";
})
);
::text::info("Result value = {}",res.value());
}
catch(core::WaitException& e)
{
::text::error("Some error occured!! Code= {}, Reason: {}",(int)e.getCode(),e.what());
}catch(std::exception& e)
{
::text::error("Some error occured!! Reason: {}",e.what());
}
},0,::tasking::Runnable::killFalse);
}
In this code the execution starts in the executor ex1 and just before the loop, execution is transferred to the other (ex2).
Converging several job flows
Here is an example of how to create several execution flows and merge their work:
template <class ExecutorType1,class ExecutorType2> void _sampleSeveralFlows(ExecutorType1 ex1,ExecutorType2 ex2)
{
auto th = ThreadRunnable::create();
th->fireAndForget([ex1,ex2] () mutable
{
//first flow: a single job
auto job1 = execution::launch(ex1,[]() noexcept
{
::mel::tasking::Process::wait(100); //microthread wait, only to alter the output order (educational)
mel::text::info("First job");
return "First Job";
});
//second flow: two (possibly) parallel tasks followed by a job returning an int
auto job2 = execution::start(ex2)
| execution::parallel(
[]() noexcept
{
mel::text::info("second job, t1");
},
[]() noexcept
{
mel::text::info("second job, t2");
}
)
| execution::next([]() noexcept
{
return 10;
});
//third flow: parallel_convert returns a tuple with the result of each functor
auto job3 = execution::start(ex2)
| execution::parallel_convert(
[]() noexcept
{
mel::text::info("third job, t1");
},
[]() noexcept
{
mel::text::info("third job, t2");
return 8.7f;
}
);
//merge the three flows and wait for all of them
auto res = ::mel::tasking::waitForFutureMThread(
execution::on_all(job1,job2,job3)
);
try
{
auto& val = res.value();
::mel::text::info("Result value = [{},{},(void,{})]",
std::get<0>(val),
std::get<1>(val),
std::get<1>(std::get<2>(val))
);
}
catch(core::WaitException& e)
{
::text::error("Some error occured!! Code= {}, Reason: {}",(int)e.getCode(),e.what());
}
catch(execution::OnAllException& e)
{
try
{
rethrow_exception( e.getCause() );
}catch(std::exception& e)
{
mel::text::error("Error {}",e.what());
}catch(...)
{
mel::text::error("OnAllException. unknown error");
}
}
},0,::tasking::Runnable::killFalse);
}
And the output is:
second job, t2
third job, t2
second job, t1
third job, t1
First job
Result value = [First Job,10,(void,8.7)]
Again, only for educational purposes, some microthread waits were inserted in some of the tasks. The final result is the merge of the three jobs, as a tuple. Also, the third job returns a tuple itself (this is how parallel_convert works), and it's shown between parentheses.
Advanced example
We are going to see a more realistic example, with measurements to compare performance in various cases. Follow this link to explore it.
Error management
This is a very important part and needs to be addressed carefully. When using any of the wait functions (mel::tasking::waitForFutureMThread or mel::core::waitForFutureThread) there are two possibilities for error detection, according to the template parameter ErrorType:
- throwing an exception if any error occurs while executing the sequence of jobs the program is waiting for. This method is selected by using WaitErrorAsException and is the default option when not specified.
- not throwing an exception. This is selected by using WaitErrorNoException. In this case the mel::execution::ExFuture returned from the execution chain must be checked for errors (see Future::getValue and mel::core::FutureValue). Although the first option is more natural and more comfortable, this method is useful when performance is critical or the error does not need to be checked (maybe because the user knows the code doesn't throw any exception). Both call forms are sketched below.
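For instance, assuming fut is the ExFuture returned by some execution chain, the two wait variants would look roughly like this (sketch only):
//default: errors are reported by throwing an exception (WaitErrorAsException)
auto res1 = ::mel::tasking::waitForFutureMThread(fut);
//no exceptions: the returned wrapper must be checked explicitly (WaitErrorNoException)
auto res2 = ::mel::tasking::waitForFutureMThread<::mel::core::WaitErrorNoException>(fut);
if (!res2.isValid())
{
//inspect res2.error() here instead of catching an exception
}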
Let's see an example:
template <class ExecutorType> void _sampleError1(ExecutorType ex)
{
string str = "Hello";
auto th = ThreadRunnable::create();
th->fireAndForget([ex,&str] () mutable
{
try
{
auto res = ::mel::tasking::waitForFutureMThread(
execution::launch(ex,[](string& str) noexcept ->string&
{
str += " Dani.";
return str;
},std::ref(str))
| execution::next([](string& str) ->string&
{
str+= " How are you?";
throw std::runtime_error("This is an error");
})
| execution::next([](string& str) noexcept
{
str += "Bye!";
return str;
})
| execution::next([](string& str) noexcept
{
str += "See you!";
return str;
})
);
::text::info("Result value = {}",res.value());
}
catch(core::WaitException& e)
{
::text::error("Some error occured!! Code= {}, Reason: {}",(int)e.getCode(),e.what());
}
catch(std::exception& e)
{
::text::error("Some error occured!! Reason: {}",e.what());
}
catch(...)
{
::text::error("Some error occured!! Unknown reason");
}
::text::info("Original str = {}",str);
},0,::tasking::Runnable::killFalse);
}
Output will be:
[error] Some error occured!! Reason: This is an error
[info] Original str = Hello Dani. How are you?
In the previous code an exception is thrown in the second job, so the third and fourth jobs won't be executed. The wait is wrapped in a try/catch and the necessary exceptions are checked. Things to take into account:
- If you are sure a job won't throw an exception, you should mark that function as noexcept. This way, the generated code will be more efficient (mainly in space).
- Of course, you can throw any type of object and follow the usual rules for exception management.
- The waitForFuture functions throw a WaitException if the wait itself was unsuccessful (process killed, timeout, ...).
But what if we want to capture the error at some point and continue the chain of jobs? See the next example:
template <class ExecutorType> void _sampleError2(ExecutorType ex)
{
string str = "Hello";
auto th = ThreadRunnable::create();
th->fireAndForget([ex,&str] () mutable
{
try
{
auto res = ::mel::tasking::waitForFutureMThread(
execution::launch(ex,[](string& str) noexcept ->string&
{
str += " Dani.";
return str;
},std::ref(str))
| execution::next([](string& str) ->string&
{
str+= " How are you?";
throw std::runtime_error("This is an error");
})
| execution::next([](string& str) noexcept
{
str += "Bye!";
return str;
})
| execution::catchError([](auto err) noexcept ->string
{
return "Error caught!! ";
})
| execution::next([](string& str) noexcept
{
str += "See you!";
return str;
})
);
::text::info("Result value = {}",res.value());
}
catch(core::WaitException& e)
{
::text::error("Some error occured!! Code= {}, Reason: {}",(int)e.getCode(),e.what());
}catch(std::exception& e)
{
::text::error("Some error occured!! Reason: {}",e.what());
}
::text::info("Original str = {}",str);
},0,::tasking::Runnable::killFalse);
}
And the output will be:
[info] Result value = Error caught!! See you!
[info] Original str = Hello Dani. How are you?
Because the error was captured within the job chain, no exception was thrown from the wait. The catchError function gets the exception as a parameter and must return the same type as the previous job in the chain (because this type is what the next job expects). If no exception was thrown before the catchError call, this function is simply skipped.
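As a minimal, hypothetical sketch of that rule (assuming ex is any of the executors described above; the error handler is written as a generic lambda because its exact parameter type is not shown here):
//the chain below would normally be passed to one of the wait functions
execution::launch(ex,[]() noexcept { return string("ok"); })
| execution::next([](string& s) ->string { throw std::runtime_error("boom"); })
//the handler must produce the same type the failed job should have produced (string),
//so the chain can continue; it is skipped when no previous error exists
| execution::catchError([](auto err) noexcept ->string { return "recovered"; })
| execution::next([](string& s) noexcept { return s + "!"; });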
And, just for comparison, here is the first example again but disabling exceptions in the wait:
template <class ExecutorType> void _sampleErrorNoException(ExecutorType ex)
{
string str = "Hello";
auto th = ThreadRunnable::create();
th->fireAndForget([ex,&str] () mutable
{
auto res = ::mel::tasking::waitForFutureMThread<::mel::core::WaitErrorNoException>(
execution::launch(ex,[](string& str) noexcept ->string&
{
str += " Dani.";
return str;
},std::ref(str))
| execution::next([](string& str)
{
str+= " How are you?";
throw std::runtime_error("This is an error");
return str;
})
| execution::next([](string& str) noexcept
{
str += "Bye!";
return str;
})
| execution::next([](string& str) noexcept
{
str += "See you!";
return str;
})
);
if (res.isValid())
::text::info("Result value = {}",res.value());
else
{
try
{
std::rethrow_exception(res.error());
}
catch(core::WaitException& e)
{
::text::error("Some error occured!! Code= {}, Reason: {}",(int)e.getCode(),e.what());
}catch(std::exception& e)
{
::text::error("Some error occured!! Reason: {}",e.what());
}
::text::info("Original str = {}",str);
}
},0,::tasking::Runnable::killFalse);
}
Perfect forwarding
Perfect forwarding of arguments between jobs is attempted wherever possible, avoiding copies as much as possible. The best way to see this is with an example. First, we define a class, SampleClass, which logs whenever a constructor or assignment is executed:
struct SampleClass
{
float val;
explicit SampleClass(float v = 0.0):val(v)
{
::mel::text::info("SampleClass constructor");
}
SampleClass(const SampleClass& ob)
{
val = ob.val;
::mel::text::info("SampleClass copy constructor");
}
SampleClass(SampleClass&& ob)
{
val = ob.val;
ob.val = -1;
::mel::text::info("SampleClass move constructor");
}
~SampleClass()
{
::mel::text::info("SampleClass destructor");
}
SampleClass& operator=(const SampleClass& ob)
{
val = ob.val;
::mel::text::info("SampleClass copy operator=");
return *this;
}
SampleClass& operator=(SampleClass&& ob)
{
val = ob.val;
ob.val = -1;
::mel::text::info("SampleClass move operator=");
return *this;
}
};
The code will be:
SampleClass cl(5);
auto th = ThreadRunnable::create();
th->fireAndForget(
[ex,&cl]() mutable
{
try
{
auto ref = ::mel::tasking::waitForFutureMThread(
execution::launch(ex, [](SampleClass& input) -> SampleClass&
{
input.val++;
return input;
},std::ref(cl))
| execution::next( [](SampleClass& input)
{
input.val++;
return input;
} )
| execution::next( [](SampleClass& input)
{
auto ret = input;
ret.val++;
return ret;
})
);
mel::text::info("Result value = {}",ref.value().val);
mel::text::info("Original value = {}",cl.val);
}catch(...)
{
mel::text::error("_samplePF unknown error!!!");
}
},0,::tasking::Runnable::killFalse
);
And now the output, with comments added to explain where each message comes from:
SampleClass constructor -> construction of object 'cl'
SampleClass copy constructor -> copy construction of the result in Job 2
SampleClass move constructor -> moved to the new Job 3
SampleClass destructor -> destroys the object returned by Job 2
SampleClass copy constructor -> creation of `ret` in Job 3
SampleClass move constructor -> return from Job 3
SampleClass destructor
SampleClass move constructor -> moved to the final result
SampleClass destructor
SampleClass destructor
Result value = 7
Original value = 7
SampleClass destructor
SampleClass destructor
As can be seen, perfect forwarding is applied whenever possible. One thing to keep in mind is that, because every job implies launching a task on the target executor, some move constructions are impossible to avoid, due to the asynchronous nature of the system.
Work flows
It's possible to create independent work flows so they can be reused or passed to functions as conditions. Follow the link for a more in-depth explanation.