/* * Copyright 2014 Daniel Vrátil * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public License as * published by the Free Software Foundation; either version 2 of * the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Library General Public License for more details. * * You should have received a copy of the GNU Library General Public License * along with this library. If not, see . */ #ifndef ASYNC_H #define ASYNC_H #include #include #include #include #include #include "future.h" #include "async_impl.h" #include #include #include #include #ifdef WITH_KJOB #include #endif /* * API to help write async code. * * This API is based around jobs that take lambdas to execute asynchronous tasks. Each async operation can take a continuation, * that can then be used to execute further async operations. That way it is possible to build async chains of operations, * that can be stored and executed later on. Jobs can be composed, similarly to functions. * * Relations between the components: * * Job: API wrapper around Executors chain. Can be destroyed while still running, * because the actual execution happens in the background * * Executor: Describes task to execute. Executors form a linked list matching the * order in which they will be executed. The Executor chain is destroyed when * the parent Job is destroyed. However if the Job is still running it is * guaranteed that the Executor chain will not be destroyed until the execution * is finished. * * Execution: The running execution of the task stored in Executor. Each call to Job::exec() * instantiates new Execution chain, which makes it possible for the Job to be * executed multiple times (even in parallel). 
* * Future: Representation of the result that is being calculated * * * TODO: Composed progress reporting * TODO: Possibility to abort a job through future (perhaps optional?) * TODO: Support for timeout, specified during exec call, after which the error handler gets called with a defined errorCode. */ namespace Async { template class Executor; class JobBase; template class Job; template using ThenTask = typename detail::identity&)>>::type; template using SyncThenTask = typename detail::identity>::type; template using EachTask = typename detail::identity&)>>::type; template using SyncEachTask = typename detail::identity>::type; template using ReduceTask = typename detail::identity&)>>::type; template using SyncReduceTask = typename detail::identity>::type; using ErrorHandler = std::function; using Condition = std::function; namespace Private { class ExecutorBase; typedef QSharedPointer ExecutorBasePtr; struct Execution { Execution(const ExecutorBasePtr &executor) : executor(executor) , resultBase(nullptr) , isRunning(false) , isFinished(false) {} ~Execution() { if (resultBase) { resultBase->releaseExecution(); delete resultBase; } prevExecution.reset(); } void setFinished() { isFinished = true; executor.clear(); } template Async::Future* result() { return static_cast*>(resultBase); } void releaseFuture() { resultBase = 0; } ExecutorBasePtr executor; FutureBase *resultBase; bool isRunning; bool isFinished; ExecutionPtr prevExecution; }; typedef QSharedPointer ExecutionPtr; class ExecutorBase { template friend class Executor; template friend class Async::Job; public: virtual ~ExecutorBase(); virtual ExecutionPtr exec(const ExecutorBasePtr &self) = 0; protected: ExecutorBase(const ExecutorBasePtr &parent); template Async::Future* createFuture(const ExecutionPtr &execution) const; ExecutorBasePtr mPrev; }; template class Executor : public ExecutorBase { protected: Executor(ErrorHandler errorHandler, const Private::ExecutorBasePtr &parent) : ExecutorBase(parent) , 
mErrorFunc(errorHandler) {} virtual ~Executor() {} virtual void run(const ExecutionPtr &execution) = 0; ExecutionPtr exec(const ExecutorBasePtr &self); std::function mErrorFunc; }; template class ThenExecutor: public Executor::type, Out, In ...> { public: ThenExecutor(ThenTask then, ErrorHandler errorHandler, const ExecutorBasePtr &parent); void run(const ExecutionPtr &execution); private: ThenTask mFunc; }; template class EachExecutor : public Executor { public: EachExecutor(EachTask each, ErrorHandler errorHandler, const ExecutorBasePtr &parent); void run(const ExecutionPtr &execution); private: EachTask mFunc; QVector*> mFutureWatchers; }; template class ReduceExecutor : public ThenExecutor { public: ReduceExecutor(ReduceTask reduce, ErrorHandler errorHandler, const ExecutorBasePtr &parent); private: ReduceTask mFunc; }; template class SyncThenExecutor : public Executor::type, Out, In ...> { public: SyncThenExecutor(SyncThenTask then, ErrorHandler errorHandler, const ExecutorBasePtr &parent); void run(const ExecutionPtr &execution); private: void run(const ExecutionPtr &execution, std::false_type); // !std::is_void void run(const ExecutionPtr &execution, std::true_type); // std::is_void SyncThenTask mFunc; }; template class SyncReduceExecutor : public SyncThenExecutor { public: SyncReduceExecutor(SyncReduceTask reduce, ErrorHandler errorHandler, const ExecutorBasePtr &parent); private: SyncReduceTask mFunc; }; template class SyncEachExecutor : public Executor { public: SyncEachExecutor(SyncEachTask each, ErrorHandler errorHandler, const ExecutorBasePtr &parent); void run(const ExecutionPtr &execution); private: void run(Async::Future *future, const typename PrevOut::value_type &arg, std::false_type); // !std::is_void void run(Async::Future *future, const typename PrevOut::value_type &arg, std::true_type); // std::is_void SyncEachTask mFunc; }; } // namespace Private /** * Start an asynchronous job sequence. 
*
 * Async::start() is your starting point to build a chain of jobs to be executed
 * asynchronously.
 *
 * @param func An asynchronous function to be executed. The function must have
 * void return type, and accept exactly one argument of type @p Async::Future,
 * where @p In is the type of the result.
 */
// NOTE(review): template parameter lists appear to have been lost from these
// declarations ("template Job start(...)"); confirm against the upstream header.
template Job start(ThenTask func);

/**
 * Start a job sequence with a synchronous function.
 *
 * @param func A function whose return value becomes the result of the job.
 */
template Job start(SyncThenTask func);

#ifdef WITH_KJOB
/**
 * Start a job sequence that runs a KJob (only available when WITH_KJOB is defined).
 */
template Job start();
#endif

/**
 * Async while loop.
 *
 * The loop continues while @p condition returns true.
 */
Job dowhile(Condition condition, ThenTask func);

/**
 * Async while loop.
 *
 * Loop continues while body returns true.
 */
Job dowhile(ThenTask body);

/**
 * A null job.
 *
 * An async noop.
 *
 */
template Job null();

/**
 * An error job.
 *
 * An async error.
 *
 * @param errorCode Error code set on the resulting future (defaults to 1).
 * @param errorMessage Error message set on the resulting future (defaults to an empty string).
 */
template Job error(int errorCode = 1, const QString &errorMessage = QString());

/**
 * Non-template base of Job; holds the last executor of the job's chain.
 */
class JobBase
{
    template friend class Job;

public:
    JobBase(const Private::ExecutorBasePtr &executor);
    ~JobBase();

protected:
    Private::ExecutorBasePtr mExecutor;     ///< executor run by Job::exec()
};

/**
 * An Asynchronous job
 *
 * A single instance of Job represents a single method that will be executed
 * asynchronously. The Job is started by @p Job::exec(), which returns @p Async::Future
 * immediately. The Future will be set to finished state once the asynchronous
 * task has finished. You can use @p Async::Future::waitForFinished() to wait
 * for the Future in a blocking manner.
 *
 * It is possible to chain multiple Jobs one after another in different fashion
 * (sequential, parallel, etc.). Calling Job::exec() will then return a pending
 * @p Async::Future, and will execute the entire chain of jobs.
* * @code * auto job = Job::start>( * [](Async::Future> &future) { * MyREST::PendingUsers *pu = MyREST::requestListOfUsers(); * QObject::connect(pu, &PendingOperation::finished, * [&](PendingOperation *pu) { * future->setValue(dynamic_cast(pu)->userIds()); * future->setFinished(); * }); * }) * .each, int>( * [](const int &userId, Async::Future> &future) { * MyREST::PendingUser *pu = MyREST::requestUserDetails(userId); * QObject::connect(pu, &PendingOperation::finished, * [&](PendingOperation *pu) { * future->setValue(Qlist() << dynamic_cast(pu)->user()); * future->setFinished(); * }); * }); * * Async::Future> usersFuture = job.exec(); * usersFuture.waitForFinished(); * QList users = usersFuture.value(); * @endcode * * In the example above, calling @p job.exec() will first invoke the first job, * which will retrieve a list of IDs, and then will invoke the second function * for each single entry in the list returned by the first function. */ template class Job : public JobBase { template friend class Job; template friend Job start(Async::ThenTask func); template friend Job start(Async::SyncThenTask func); #ifdef WITH_KJOB template friend Job start(); #endif public: template Job then(ThenTask func, ErrorHandler errorFunc = ErrorHandler()) { return Job(Private::ExecutorBasePtr( new Private::ThenExecutor(func, errorFunc, mExecutor))); } template Job then(SyncThenTask func, ErrorHandler errorFunc = ErrorHandler()) { return Job(Private::ExecutorBasePtr( new Private::SyncThenExecutor(func, errorFunc, mExecutor))); } template Job then(Job otherJob, ErrorHandler errorFunc = ErrorHandler()) { return then(nestedJobWrapper(otherJob), errorFunc); } #ifdef WITH_KJOB template Job then() { return start(); } #endif template Job each(EachTask func, ErrorHandler errorFunc = ErrorHandler()) { eachInvariants(); return Job(Private::ExecutorBasePtr( new Private::EachExecutor(func, errorFunc, mExecutor))); } template Job each(SyncEachTask func, ErrorHandler errorFunc = ErrorHandler()) { 
eachInvariants(); return Job(Private::ExecutorBasePtr( new Private::SyncEachExecutor(func, errorFunc, mExecutor))); } template Job each(Job otherJob, ErrorHandler errorFunc = ErrorHandler()) { eachInvariants(); return each(nestedJobWrapper(otherJob), errorFunc); } template Job reduce(ReduceTask func, ErrorHandler errorFunc = ErrorHandler()) { reduceInvariants(); return Job(Private::ExecutorBasePtr( new Private::ReduceExecutor(func, errorFunc, mExecutor))); } template Job reduce(SyncReduceTask func, ErrorHandler errorFunc = ErrorHandler()) { reduceInvariants(); return Job(Private::ExecutorBasePtr( new Private::SyncReduceExecutor(func, errorFunc, mExecutor))); } template Job reduce(Job otherJob, ErrorHandler errorFunc = ErrorHandler()) { return reduce(nestedJobWrapper(otherJob), errorFunc); } template Async::Future exec(FirstIn in) { // Inject a fake sync executor that will return the initial value Private::ExecutorBasePtr first = mExecutor; while (first->mPrev) { first = first->mPrev; } auto init = new Private::SyncThenExecutor( [in]() -> FirstIn { return in; }, ErrorHandler(), Private::ExecutorBasePtr()); first->mPrev = Private::ExecutorBasePtr(init); auto result = exec(); // Remove the injected executor first->mPrev.reset(); return result; } Async::Future exec() { Private::ExecutionPtr execution = mExecutor->exec(mExecutor); Async::Future result = *execution->result(); return result; } private: Job(Private::ExecutorBasePtr executor) : JobBase(executor) {} template void eachInvariants() { static_assert(detail::isIterable::value, "The 'Each' task can only be connected to a job that returns a list or an array."); static_assert(std::is_void::value || detail::isIterable::value, "The result type of 'Each' task must be void, a list or an array."); } template void reduceInvariants() { static_assert(Async::detail::isIterable::value, "The 'Result' task can only be connected to a job that returns a list or an array"); static_assert(std::is_same::value, "The return type of 
previous task must be compatible with input type of this task"); } template inline std::function&)> nestedJobWrapper(Job otherJob) { return [otherJob](InOther ... in, Async::Future &future) { // copy by value is const auto job = otherJob; FutureWatcher *watcher = new FutureWatcher(); QObject::connect(watcher, &FutureWatcherBase::futureReady, [watcher, future]() { // FIXME: We pass future by value, because using reference causes the // future to get deleted before this lambda is invoked, leading to crash // in copyFutureValue() // copy by value is const auto outFuture = future; Async::detail::copyFutureValue(watcher->future(), outFuture); outFuture.setFinished(); delete watcher; }); watcher->setFuture(job.exec(in ...)); }; } }; } // namespace Async // ********** Out of line definitions **************** namespace Async { template Job start(ThenTask func) { return Job(Private::ExecutorBasePtr( new Private::ThenExecutor(func, ErrorHandler(), Private::ExecutorBasePtr()))); } template Job start(SyncThenTask func) { return Job(Private::ExecutorBasePtr( new Private::SyncThenExecutor(func, ErrorHandler(), Private::ExecutorBasePtr()))); } #ifdef WITH_KJOB template Job start() { return Job(Private::ExecutorBasePtr( new Private::ThenExecutor([](const Args & ... 
args, Async::Future &future) { KJobType *job = new KJobType(args ...); job->connect(job, &KJob::finished, [&future](KJob *job) { if (job->error()) { future.setError(job->error(), job->errorString()); } else { future.setValue((static_cast(job)->*KJobResultMethod)()); future.setFinished(); } }); job->start(); }, ErrorHandler(), Private::ExecutorBasePtr()))); } #endif template Job null() { return Async::start( [](Async::Future &future) { future.setFinished(); }); } template Job error(int errorCode, const QString &errorMessage) { return Async::start( [errorCode, errorMessage](Async::Future &future) { future.setError(errorCode, errorMessage); }); } namespace Private { template Async::Future* ExecutorBase::createFuture(const ExecutionPtr &execution) const { return new Async::Future(execution); } template ExecutionPtr Executor::exec(const ExecutorBasePtr &self) { // Passing 'self' to execution ensures that the Executor chain remains // valid until the entire execution is finished ExecutionPtr execution = ExecutionPtr::create(self); // chainup execution->prevExecution = mPrev ? mPrev->exec(mPrev) : ExecutionPtr(); /* } else if (mPrev && !mPrevFuture) { // If previous job is running or finished, just get it's future mPrevFuture = static_cast*>(mPrev->result()); } */ execution->resultBase = this->createFuture(execution); auto fw = new Async::FutureWatcher(); QObject::connect(fw, &Async::FutureWatcher::futureReady, [fw, execution, this]() { execution->setFinished(); delete fw; }); fw->setFuture(*execution->result()); Async::Future *prevFuture = execution->prevExecution ? 
execution->prevExecution->result() : nullptr; if (!prevFuture || prevFuture->isFinished()) { if (prevFuture && prevFuture->errorCode() != 0) { if (mErrorFunc) { mErrorFunc(prevFuture->errorCode(), prevFuture->errorMessage()); execution->resultBase->setFinished(); execution->setFinished(); return execution; } else { // Propagate the error to next caller } } execution->isRunning = true; run(execution); } else { auto futureWatcher = new Async::FutureWatcher(); QObject::connect(futureWatcher, &Async::FutureWatcher::futureReady, [futureWatcher, execution, this]() { auto prevFuture = futureWatcher->future(); assert(prevFuture.isFinished()); delete futureWatcher; if (prevFuture.errorCode() != 0) { if (mErrorFunc) { mErrorFunc(prevFuture.errorCode(), prevFuture.errorMessage()); execution->resultBase->setFinished(); return; } else { // Propagate the error to next caller } } execution->isRunning = true; run(execution); }); futureWatcher->setFuture(*static_cast*>(prevFuture)); } return execution; } template ThenExecutor::ThenExecutor(ThenTask then, ErrorHandler error, const ExecutorBasePtr &parent) : Executor::type, Out, In ...>(error, parent) , mFunc(then) { } template void ThenExecutor::run(const ExecutionPtr &execution) { Async::Future::type> *prevFuture = nullptr; if (execution->prevExecution) { prevFuture = execution->prevExecution->result::type>(); assert(prevFuture->isFinished()); } this->mFunc(prevFuture ? 
prevFuture->value() : In() ..., *execution->result()); } template EachExecutor::EachExecutor(EachTask each, ErrorHandler error, const ExecutorBasePtr &parent) : Executor(error, parent) , mFunc(each) { } template void EachExecutor::run(const ExecutionPtr &execution) { assert(execution->prevExecution); auto prevFuture = execution->prevExecution->result(); assert(prevFuture->isFinished()); auto out = execution->result(); if (prevFuture->value().isEmpty()) { out->setFinished(); return; } for (auto arg : prevFuture->value()) { Async::Future future; this->mFunc(arg, future); auto fw = new Async::FutureWatcher(); mFutureWatchers.append(fw); QObject::connect(fw, &Async::FutureWatcher::futureReady, [out, fw, this]() { auto future = fw->future(); assert(future.isFinished()); const int index = mFutureWatchers.indexOf(fw); assert(index > -1); mFutureWatchers.removeAt(index); out->setValue(out->value() + future.value()); if (mFutureWatchers.isEmpty()) { out->setFinished(); } delete fw; }); fw->setFuture(future); } } template ReduceExecutor::ReduceExecutor(ReduceTask reduce, ErrorHandler error, const ExecutorBasePtr &parent) : ThenExecutor(reduce, error, parent) { } template SyncThenExecutor::SyncThenExecutor(SyncThenTask then, ErrorHandler errorHandler, const ExecutorBasePtr &parent) : Executor::type, Out, In ...>(errorHandler, parent) , mFunc(then) { } template void SyncThenExecutor::run(const ExecutionPtr &execution) { if (execution->prevExecution) { assert(execution->prevExecution->resultBase->isFinished()); } run(execution, std::is_void()); execution->resultBase->setFinished(); } template void SyncThenExecutor::run(const ExecutionPtr &execution, std::false_type) { Async::Future::type> *prevFuture = execution->prevExecution ? execution->prevExecution->result::type>() : nullptr; (void) prevFuture; // silence 'set but not used' warning Async::Future *future = execution->result(); future->setValue(this->mFunc(prevFuture ? 
prevFuture->value() : In() ...)); } template void SyncThenExecutor::run(const ExecutionPtr &execution, std::true_type) { Async::Future::type> *prevFuture = execution->prevExecution ? execution->prevExecution->result::type>() : nullptr; (void) prevFuture; // silence 'set but not used' warning this->mFunc(prevFuture ? prevFuture->value() : In() ...); } template SyncEachExecutor::SyncEachExecutor(SyncEachTask each, ErrorHandler errorHandler, const ExecutorBasePtr &parent) : Executor(errorHandler, parent) , mFunc(each) { } template void SyncEachExecutor::run(const ExecutionPtr &execution) { assert(execution->prevExecution); auto *prevFuture = execution->prevExecution->result(); assert(prevFuture->isFinished()); auto out = execution->result(); if (prevFuture->value().isEmpty()) { out->setFinished(); return; } for (auto arg : prevFuture->value()) { run(out, arg, std::is_void()); } out->setFinished(); } template void SyncEachExecutor::run(Async::Future *out, const typename PrevOut::value_type &arg, std::false_type) { out->setValue(out->value() + this->mFunc(arg)); } template void SyncEachExecutor::run(Async::Future * /* unused */, const typename PrevOut::value_type &arg, std::true_type) { this->mFunc(arg); } template SyncReduceExecutor::SyncReduceExecutor(SyncReduceTask reduce, ErrorHandler errorHandler, const ExecutorBasePtr &parent) : SyncThenExecutor(reduce, errorHandler, parent) { } } // namespace Private } // namespace Async #endif // ASYNC_H