Although the code shown in this section is not really complicated, it is closer to real code and lets us look at performance aspects. The goal of this example is simply to calculate the mean of a very big vector. A template function abstracts the code from the concrete execution agent. This is the code:
template <class ExecutorType>
void _testMeanVector(mel::execution::ExFuture<ExecutorType,vector<double>> fut,tests::BaseTest* test)
{
    typedef vector<double> VectorType;
    try
    {
        Timer timer;
        uint64_t t0 = timer.getMilliseconds();
        auto res5 = ::mel::core::waitForFutureThread(
            fut
            | mel::execution::parallel_convert(
                [](const VectorType& v) noexcept
                {
                    double mean = 0.0;
                    size_t tam = v.size()/4;
                    size_t endIdx = tam;
                    for(size_t i = 0; i < endIdx;++i)
                        mean += v[i];
                    mean /= v.size();
                    return mean;
                },
                [](const VectorType& v) noexcept
                {
                    double mean = 0.0;
                    size_t tam = v.size()/4;
                    size_t startIdx = tam;
                    size_t endIdx = tam*2;
                    for(size_t i = startIdx; i < endIdx;++i)
                        mean += v[i];
                    mean /= v.size();
                    return mean;
                },
                [](const VectorType& v) noexcept
                {
                    double mean = 0.0;
                    size_t tam = v.size()/4;
                    size_t startIdx = tam*2;
                    size_t endIdx = tam*3;
                    for(size_t i = startIdx; i < endIdx;++i)
                        mean += v[i];
                    mean /= v.size();
                    return mean;
                },
                [](const VectorType& v) noexcept
                {
                    double mean = 0.0;
                    size_t tam = v.size()/4;
                    size_t startIdx = tam*3;
                    size_t endIdx = v.size();
                    for(size_t i = startIdx; i < endIdx;++i)
                        mean += v[i];
                    mean /= v.size();
                    return mean;
                })
            | mel::execution::next([](const std::tuple<double,double,double,double>& means) noexcept
                {
                    return std::get<0>(means)+std::get<1>(means)+std::get<2>(means)+std::get<3>(means);
                }));
        uint64_t t1 = timer.getMilliseconds();
        text::info("Mean = {}. Time spent = {} seconds",res5.value(),(float)((t1-t0)/1000.f));
    }catch(std::exception& e)
    {
        text::info("Error = {}",e.what());
    }
}
And from the main function:
Runnable::RunnableCreationOptions opts;
opts.schedulerOpts = ProcessScheduler::LockFreeOptions{};
auto th1 = ThreadRunnable::create(true,opts);
execution::Executor<Runnable> exr(th1);
typedef vector<double> VectorType;
auto initFut = execution::launch(exr,[]()
{
    constexpr size_t vecSize = 100'000'000;
    VectorType v(vecSize);
    for( size_t i = 0; i < vecSize;++i)
        v[i] = (std::rand()%20)/3.0;
    return v;
});
auto initRes = core::waitForFutureThread<mel::core::WaitErrorNoException>( initFut );
Timer timer;
uint64_t t0 = timer.getMilliseconds();
text::info("vector mean: plain way");
double mean = 0.0;
{
    auto& v = initRes.value();
    size_t endIdx = v.size();
    for(size_t i = 0; i < endIdx;++i)
        mean+=v[i];
    mean/=v.size();
}
uint64_t t1 = timer.getMilliseconds();
mel::text::info("Mean = {}. Time spent = {}",mean,(float)((t1-t0)/1000.f));
{
    parallelism::ThreadPool::ThreadPoolOpts opts;
    opts.threadOpts.schedulerOpts = ProcessScheduler::LockFreeOptions{};
    auto myPool = make_shared<parallelism::ThreadPool>(opts);
    execution::Executor<parallelism::ThreadPool> extp(myPool);
    extp.setOpts({true,true});
    text::info("vector mean: ThreadPoolExecutor");
    _testMeanVector( execution::transfer(initFut,extp),test);
}
{
    exr.setOpts({true,false});
    text::info("vector mean: RunnableExecutor");
    _testMeanVector( execution::transfer(initFut,exr),test);
}
{
    execution::InlineExecutor ex;
    text::info("vector mean: InlineExecutor");
    _testMeanVector( execution::transfer(initFut,ex),test);
}
In this code, a very big vector is created, and the mean of its values is calculated by:
- splitting the vector into 4 parts and calculating a partial mean on each. This is done using parallel_convert with 4 parallel tasks, each of which returns its own partial mean, so the full result of this job is a tuple.
- passing the previous tuple with next to a callable which sums the four partial results.
Also, a plain sequential calculation is done first to compare the results. On the machine used for these tests, the output is:
vector mean: plain way
Mean = 3.1669570700186305. Time spent = 0.092
vector mean: ThreadPoolExecutor
Mean = 3.166957070004096. Time spent = 0.044 seconds
vector mean: RunnableExecutor
Mean = 3.166957070004096. Time spent = 0.093 seconds
vector mean: InlineExecutor
Mean = 3.166957070004096. Time spent = 0.091 seconds
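Note that the plain-way mean differs from the parallel ones in the last digits. This is expected: floating-point addition is not associative, so summing the vector in four chunks rounds differently than a single left-to-right pass. A minimal illustration of the effect:

```cpp
// Floating-point addition is not associative: grouping the same three
// values differently can change the rounding of intermediate results.
double sumLeft(double a, double b, double c)  { return (a + b) + c; }
double sumRight(double a, double b, double c) { return a + (b + c); }
```

With a = 1e16, b = -1e16 and c = 1.0, sumLeft gives 1 while sumRight gives 0, because b + c rounds back to -1e16 before a is added.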
The results show two pieces of very good news:
- the ThreadPoolExecutor is more than twice as fast as a simple RunnableExecutor. By default a ThreadPool uses all available cores; on the machine used here there are 8 cores, so each task in parallel_convert runs fully independently. Although this might suggest the code should be 4 times faster, other factors limit performance, mainly memory and cache effects.
- the direct way and the InlineExecutor take almost the same time as the RunnableExecutor way, meaning that the internals of the execution and microthread system add very, very little overhead.
To improve on the previous code, it would be better to take advantage of the concrete parallelism available on the system: instead of dividing the input vector into 4 fixed parts, divide it into as many parts as the system can run in parallel. The code could be:
// as many chunks as logical cores when the executor provides real
// parallelism (reported by ExecutorTraits); otherwise one chunk is enough
int numParts = execution::ExecutorTraits<ExecutorType>::has_parallelism ?
    (int)core::getNumProcessors() : 1;
try
{
    Timer timer;
    uint64_t t0 = timer.getMilliseconds();
    vector<double> means(numParts);
    auto res5 = ::mel::core::waitForFutureThread(
        fut
        | mel::execution::loop( 0,(int)numParts,
            [&means,numParts](int idx,VectorType& v) noexcept
            {
                double mean = 0.0;
                size_t tam = v.size()/numParts;
                size_t startIdx = idx*tam;
                size_t endIdx;
                if ( idx == numParts-1)
                    endIdx = v.size();
                else
                    endIdx = startIdx+tam;
                for(size_t i = startIdx; i < endIdx;++i)
                    mean += v[i];
                mean /= v.size();
                means[idx] = mean;
            },1 )
        | mel::execution::next([&means](const VectorType&) noexcept
            {
                double mean = 0.0;
                for(size_t i = 0; i < means.size();++i)
                    mean+=means[i];
                return mean;
            })
    );
    uint64_t t1 = timer.getMilliseconds();
    text::info("Using loop. Mean = {}. Time spent = {} seconds",res5.value(),(float)((t1-t0)/1000.f));
}catch(std::exception& e)
{
    text::info("Error = {}",e.what());
}
In this new version, loop is used, performing as many iterations as there are available cores. Also, the parallelism capabilities of the executor in use are checked with ExecutorTraits, since without real parallelism, dividing the processing into chunks gives no advantage (though, as the previous example showed, the loss would be minimal).