/* * Copyright 2014 Daniel Vrátil * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public License as * published by the Free Software Foundation; either version 2 of * the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Library General Public License for more details. * * You should have received a copy of the GNU Library General Public License * along with this library. If not, see . */ #ifndef ASYNC_H #define ASYNC_H #include #include #include #include #include #include "future.h" #include "async_impl.h" #include #include namespace Async { template class Executor; class JobBase; template class Job; template using ThenTask = typename detail::identity&)>>::type; template using EachTask = typename detail::identity&)>>::type; template using ReduceTask = typename detail::identity&)>>::type; namespace Private { template struct PreviousOut { using type = typename std::tuple_element<0, std::tuple>::type; }; class ExecutorBase { template friend class Executor; public: virtual ~ExecutorBase(); virtual void exec() = 0; inline FutureBase* result() const { return mResult; } protected: ExecutorBase(ExecutorBase *parent); ExecutorBase *mPrev; FutureBase *mResult; }; template class Executor : public ExecutorBase { protected: Executor(ExecutorBase *parent) : ExecutorBase(parent) , mPrevFuture(0) , mPrevFutureWatcher(0) {} virtual ~Executor() {} inline Async::Future* chainup(); virtual void previousFutureReady() = 0; void exec(); std::function &)> mFunc; Async::Future *mPrevFuture; Async::FutureWatcher *mPrevFutureWatcher; }; template class ThenExecutor: public Executor::type, Out, In ...> { public: ThenExecutor(ThenTask then, ExecutorBase *parent = nullptr); void previousFutureReady(); private: Async::FutureWatcher::type> *mFutureWatcher; }; template class EachExecutor : public Executor { public: EachExecutor(EachTask each, ExecutorBase *parent); void previousFutureReady(); private: QVector*> mFutureWatchers; }; template class ReduceExecutor : public Executor { public: ReduceExecutor(ReduceTask reduce, ExecutorBase *parent); void previousFutureReady(); }; } // namespace Private /** * Start an asynchronous job sequence. * * Async::start() is your starting point to build a chain of jobs to be executed * asynchronously. * * @param func An asynchronous function to be executed. The function must have * void return type, and accept exactly one argument of type @p Async::Future, * where @p In is type of the result. */ template Job start(ThenTask func); /** * A null job. * * An async noop. * */ template Job null() { return Async::start([](Async::Future &future) {future.setFinished();}); } class JobBase { template friend class Job; public: JobBase(Private::ExecutorBase *executor); ~JobBase(); protected: Private::ExecutorBase *mExecutor; }; /** * An Asynchronous job * * A single instance of Job represents a single method that will be executed * asynchrously. The Job is started by @p Job::exec(), which returns @p Async::Future * immediatelly. The Future will be set to finished state once the asynchronous * task has finished. You can use @p Async::Future::waitForFinished() to wait for * for the Future in blocking manner. * * It is possible to chain multiple Jobs one after another in different fashion * (sequential, parallel, etc.). Calling Job::exec() will then return a pending * @p Async::Future, and will execute the entire chain of jobs. * * @code * auto job = Job::start>( * [](Async::Future> &future) { * MyREST::PendingUsers *pu = MyREST::requestListOfUsers(); * QObject::connect(pu, &PendingOperation::finished, * [&](PendingOperation *pu) { * future->setValue(dynamic_cast(pu)->userIds()); * future->setFinished(); * }); * }) * .each, int>( * [](const int &userId, Async::Future> &future) { * MyREST::PendingUser *pu = MyREST::requestUserDetails(userId); * QObject::connect(pu, &PendingOperation::finished, * [&](PendingOperation *pu) { * future->setValue(Qlist() << dynamic_cast(pu)->user()); * future->setFinished(); * }); * }); * * Async::Future> usersFuture = job.exec(); * usersFuture.waitForFinished(); * QList users = usersFuture.value(); * @endcode * * In the example above, calling @p job.exec() will first invoke the first job, * which will retrieve a list of IDs, and then will invoke the second function * for each single entry in the list returned by the first function. */ template class Job : public JobBase { template friend class Job; template friend Job start(Async::ThenTask func); public: template Job then(ThenTask func) { return Job(new Private::ThenExecutor(func, mExecutor)); } template Job each(EachTask func) { static_assert(detail::isIterable::value, "The 'Each' task can only be connected to a job that returns a list or an array."); static_assert(detail::isIterable::value, "The result type of 'Each' task must be a list or an array."); return Job(new Private::EachExecutor(func, mExecutor)); } template Job reduce(ReduceTask func) { static_assert(Async::detail::isIterable::value, "The 'Result' task can only be connected to a job that returns a list or an array"); static_assert(std::is_same::value, "The return type of previous task must be compatible with input type of this task"); return Job(new Private::ReduceExecutor(func, mExecutor)); } Async::Future exec() { mExecutor->exec(); return result(); } Async::Future result() const { return *static_cast*>(mExecutor->result()); } private: Job(Private::ExecutorBase *executor) : JobBase(executor) {} }; } // namespace Async // ********** Out of line definitions **************** namespace Async { template Job start(ThenTask func) { return Job(new Private::ThenExecutor(func)); } namespace Private { template Future* Executor::chainup() { if (mPrev) { mPrev->exec(); return static_cast*>(mPrev->result()); } else { return 0; } } template void Executor::exec() { mPrevFuture = chainup(); mResult = new Async::Future(); if (!mPrevFuture || mPrevFuture->isFinished()) { previousFutureReady(); } else { auto futureWatcher = new Async::FutureWatcher(); QObject::connect(futureWatcher, &Async::FutureWatcher::futureReady, [futureWatcher, this]() { assert(futureWatcher->future().isFinished()); futureWatcher->deleteLater(); previousFutureReady(); }); futureWatcher->setFuture(*mPrevFuture); } } template ThenExecutor::ThenExecutor(ThenTask then, ExecutorBase* parent) : Executor::type, Out, In ...>(parent) { this->mFunc = then; } template void ThenExecutor::previousFutureReady() { if (this->mPrevFuture) { assert(this->mPrevFuture->isFinished()); } this->mFunc(this->mPrevFuture ? this->mPrevFuture->value() : In() ..., *static_cast*>(this->mResult)); } template EachExecutor::EachExecutor(EachTask each, ExecutorBase* parent) : Executor(parent) { this->mFunc = each; } template void EachExecutor::previousFutureReady() { assert(this->mPrevFuture->isFinished()); auto out = static_cast*>(this->mResult); if (this->mPrevFuture->value().isEmpty()) { out->setFinished(); return; } for (auto arg : this->mPrevFuture->value()) { Async::Future future; this->mFunc(arg, future); auto fw = new Async::FutureWatcher(); mFutureWatchers.append(fw); QObject::connect(fw, &Async::FutureWatcher::futureReady, [out, future, fw, this]() { assert(future.isFinished()); const int index = mFutureWatchers.indexOf(fw); assert(index > -1); mFutureWatchers.removeAt(index); out->setValue(out->value() + future.value()); if (mFutureWatchers.isEmpty()) { out->setFinished(); } }); fw->setFuture(future); } } template ReduceExecutor::ReduceExecutor(ReduceTask reduce, ExecutorBase* parent) : Executor(parent) { this->mFunc = reduce; } template void ReduceExecutor::previousFutureReady() { assert(this->mPrevFuture->isFinished()); this->mFunc(this->mPrevFuture->value(), *static_cast*>(this->mResult)); } } // namespace Private } // namespace Async #endif // ASYNC_H