MEL
Microthread & Execution library
Higher level constructs for the tasking system

Contents of this section:

Introduction

Although the tasking system shown if this section by itself is a powerful tool, we can provide higher constructs to offer more flexibility.

Futures

The very first abstraction is the Future class. Actually, this is not an abstraction related directly with the tasking system, but it's natural to be used by it. Because this type represent a value that maybe is not present yet, it's the perfect element to use as a way to communicate tasks between them.

The first function to stop in is mel::tasking::Runnable::execute. This function allows to execute a calllable with no input arguments, and returns a future.

auto th1 = ThreadRunnable::create();
Future<string> result;
th1->fireAndForget( [result]() mutable
{
auto th = ThreadRunnable::getCurrentThreadRunnable();
auto fut = th->execute<string>(
[]() noexcept
{
Process::wait(100);
return "Hello";
}
);
try
{
auto res = waitForFutureMThread(fut,20); //will wait only for 20 msecs
result.setValue(res.value());
mel::text::info("Result = {}",res.value());
{
result.setValue(std::current_exception());;
mel::text::error("Error waiting. Code {}, Reason = {}",(int)e.getCode(),e.what());
}
catch(...)
{
result.setValue(std::current_exception());;
mel::text::error("Error waiting. Unknown Reason");
}
}
,0,Runnable::killFalse
);
Exception class generated by WaitResult when wait for future gets an error.
Definition: Future.h:882
::mel::core::WaitResult< T > waitForFutureMThread(const mel::core::Future< T > &f, unsigned int msecs=EVENTMT_WAIT_INFINITE) noexcept(std::is_same< ErrorType, ::mel::core::WaitErrorNoException >::value)
Waits for future completion, returning a wapper around the internal vale.
Definition: utilities.h:102

Explanation: a ThreadRunnable is posted a task, which execute another task that returns an int in some Runnable, in this case the same ThreadRunnable. The first task will wait for the result for 20 milliseconds Because the result is generated 100 milliseconds later (due to the Process::wait(100) line), a timeout will occur and the code will enter in that first catch.

A future holds its value as a shared value such that Futures can be assigned between them, so pointing to the same internal data. This alows using value semantics for Futures. So, assigning or copy-construct Futures, makes them to hold the same value. For example:

Future<int> v;
Future<int> v2(v); //points to the same internal variable as v
v.setValue( 6 );//set value. So v2 gets same value.

With this in mind, we can do things as in the next example:

void _sample2()
{
auto th1 = ThreadRunnable::create();
Future<string> result;
th1->fireAndForget( [result]() mutable
{
auto th = ThreadRunnable::getCurrentThreadRunnable();
auto fut = th->execute<string>(
[]() noexcept
{
Process::wait(100);
return "Hello";
}
);
try
{
auto res = waitForFutureMThread(fut,20); //will wait only for 20 msecs
result.setValue(res.value());
mel::text::info("Result = {}",res.value());
{
result.setValue(std::current_exception());;
mel::text::error("Error waiting. Code {}, Reason = {}",(int)e.getCode(),e.what());
}
catch(...)
{
result.setValue(std::current_exception());;
mel::text::error("Error waiting. Unknown Reason");
}
}
,0,Runnable::killFalse
);
//pause current thread waiting for future ready
try
{
auto res = mel::core::waitForFutureThread(result);
mel::text::info("Result after waiting in currrent thread= {}",res.value());
{
mel::text::error("Error waiting in currrent thread. Reason = {}",e.what());
}
catch(...)
{
mel::text::error("Error waiting in currrent thread. Unknown Reason");
}
::mel::core::WaitResult< T > waitForFutureThread(const mel::core::Future< T > &f, unsigned int msecs=::mel::core::Event::EVENT_WAIT_INFINITE) noexcept(std::is_same< ErrorType, ::mel::core::WaitErrorNoException >::value)
Wait for a Future from a Thread.
Definition: Thread.h:236

This example is a modification of the first one. Here, a Future is created as local to the function and passed to the task in order for this to return a result there. In turn, the task execute another function in the same ThreadRunnable (of course, any other agent could be used). It's important to note, in the line doing auto fut = th->execute<string>... we don't assign directly to result. The reason is that,the internal data where result is pointing, will be changed and the wait that is being done in the function (the line auto res = mel::core::waitForFutureThread(result)) is waiting for a different data.

Synchronization mechanisms

In the same way that inter-thread communication has its syncronization mechanism, as mutex or events, the microthread system has equivalent ones: mel::tasking::Event_mthread and mel::tasking::CriticalSection_mthread. Now an example with I think is self-explained

void _sample3()
{
Event_mthread event;
CriticalSection_mthread cs;
auto th1 = ThreadRunnable::create(); //order is important here to respect reverse order of destruction
th1->fireAndForget( [&cs,&event]() mutable
{
::mel::text::info("Task1 Waiting before entering critical section");
Lock_mthread lck(cs);
::mel::text::info("Task1 Entering critical section and set event");
event.set();
}
,0,Runnable::killFalse
);
th1->fireAndForget([&cs]() mutable
{
Lock_mthread lck(cs);
::mel::text::info("Task2 Waiting inside critical section");
::mel::text::info("Task2 is going to release critical section");
}
,0,Runnable::killFalse
);
th1->fireAndForget( [&event]() mutable
{
mel::text::info("Task3 Waiting for event..");
event.wait();
mel::text::info("Task3 Event was signaled!");
}
,0,Runnable::killFalse
);
}
static ESwitchResult wait(unsigned int msegs) OPTIMIZE_FLAGS

Synchronized functions

In many situations, some functions or member functions need to be called in a concrete Runnable. For example, is typical in an UI system or graphics engine to be mandatory to execute their functions in a concrete thread. In those cases, the user should know this limitation and take care of that when calling thatthose functions.

MEL provide the mechanisms to force these functions execution in their mandatory Runnable. For this purpose there are a couple of macros in synchronization_macros.h. As always, let's see an example:

std::shared_ptr<Runnable> sRunnable; // created somewhere...
SYNCHRONIZED_STATIC( f1,int,(int),,sRunnable ) ;
static int f1_sync(int v)
{
throw std::runtime_error("ERROR in f1!!!!");
return v + 5;
}
class MyClass
{
public:
SYNCHRONIZED_METHOD( f2,string,(int,float),noexcept,sRunnable) ;
};
string MyClass::f2_sync(int v1,float v2) noexcept
{
return "f2_sync "+std::to_string(v1)+" "+std::to_string(v2);
}
void _sampleSyncMacros()
{
sRunnable = ThreadRunnable::create(true);
{
mel::text::info("Calling static function f1 syncronized with a Runnable");
auto r = f1(6);
try
{
mel::text::info("Result = {}",res.value());
}
catch(std::exception& e)
{
mel::text::error("Exception!!. {}",e.what());
}
}
{
MyClass obj;
mel::text::info("Calling static function f2 syncronized with a Runnable");
auto r2 = obj.f2(6,9.1f);
try
{
mel::text::info("Result = {}",res.value());
}
catch(std::exception& e)
{
mel::text::error("Exception!!. {}",e.what());
}
}
}
Output:
[info] Calling static function f1 syncronized with a Runnable
[error] Exception!!. ERROR in f1!!!!
[info] Calling static function f2 syncronized with a Runnable
[info] Result = f2_sync 6 9.100000
#define SYNCHRONIZED_STATIC(function_name, TRet, args, qualifiers, runnable)
Declare a static function synchronized with a given runnable.
Definition: synchronization_macros.h:113
#define SYNCHRONIZED_METHOD(function_name, TRet, args, qualifiers, runnable)
Declare a method synchronized with a given runnable.
Definition: synchronization_macros.h:143

In this example we define two functions synchronized with a Runnable called sRunnable (where, when and how is created depends on the concrete application, and it's not shown to not obfuscate the code). This means that, executing those functions will really execute them in the given Runnable. This implies that result should be a Future because it will be resolved when sRunnable is able to do it, so user can wait for this value in any of the wait methods