From c30e9145049c52feb2de719307ebbfee0650f01b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dan=20Vr=C3=A1til?= Date: Thu, 11 Dec 2014 15:55:18 +0100 Subject: Async: move the actual task exection into Executor implementation As of now, Job is only front interface to a chain of Executor subclasses. Each Executor subclass specializes for given type of execution (then, each, reduce, ...), and the chain is then executed recursively, as we did with the original Job implementation. --- async/autotests/asynctest.cpp | 15 ++-- async/src/async.cpp | 19 +++- async/src/async.h | 200 ++++++++++++++++++++++++------------------ async/src/future.h | 2 + 4 files changed, 142 insertions(+), 94 deletions(-) diff --git a/async/autotests/asynctest.cpp b/async/autotests/asynctest.cpp index 19e40f7..403fb83 100644 --- a/async/autotests/asynctest.cpp +++ b/async/autotests/asynctest.cpp @@ -91,18 +91,21 @@ void AsyncTest::testAsyncPromises() void AsyncTest::testSyncEach() { - /* auto job = Async::start>( - []() -> Async::Future> { - Async::Future> future(QList{ 1, 2, 3, 4 }); + [](Async::Future> &future) { + future.setValue(QList{ 1, 2, 3, 4 }); future.setFinished(); - return future; }) .each, int>( [](const int &v, Async::Future> &future) { - setFinished(); + future.setValue(QList{ v + 1 }); + future.setFinished(); }); - */ + + job.exec(); + Async::Future> future = job.result(); + const QList expected({ 2, 3, 4, 5 }); + QCOMPARE(future.value(), expected); } diff --git a/async/src/async.cpp b/async/src/async.cpp index 7e81f24..c4a88fd 100644 --- a/async/src/async.cpp +++ b/async/src/async.cpp @@ -28,13 +28,20 @@ using namespace Async; -JobBase::JobBase(JobBase::JobType jobType, JobBase* prev) - : mPrev(prev) - , mResult(0) - , mJobType(jobType) +JobBase::JobBase(Executor *executor) + : mExecutor(executor) { } +JobBase::~JobBase() +{ +} + +void JobBase::exec() +{ + mExecutor->exec(); +} + FutureBase::FutureBase() : mFinished(false) @@ -48,6 +55,10 @@ FutureBase::FutureBase(const Async::FutureBase &other) { } +FutureBase::~FutureBase() +{ +} + void FutureBase::setFinished() { mFinished = true; diff --git a/async/src/async.h b/async/src/async.h index 182e57c..0f027f5 100644 --- a/async/src/async.h +++ b/async/src/async.h @@ -27,6 +27,7 @@ #include #include #include +#include #include "future.h" #include "async_impl.h" @@ -49,97 +50,159 @@ using ReduceTask = typename detail::identity Job start(ThenTask func); -namespace Private +class Executor { - template - void doExec(Job *job, Async::Future &out, const In & ... args); -} + +public: + Executor(Executor *parent) + : mPrev(parent) + , mResult(0) + { + } + + virtual ~Executor() + { + delete mResult; + } + + virtual void exec() = 0; + + FutureBase* result() const + { + return mResult; + } + + Executor *mPrev; + FutureBase *mResult; +}; + +template +class ThenExecutor: public Executor +{ + + typedef Out OutType; + typedef typename std::tuple_element<0, std::tuple>::type InType; + + +public: + ThenExecutor(ThenTask then, Executor *parent = nullptr) + : Executor(parent) + , mFunc(then) + { + } + + void exec() + { + Async::Future *in = 0; + if (mPrev) { + mPrev->exec(); + in = static_cast*>(mPrev->result()); + assert(in->isFinished()); + } + + auto out = new Async::Future(); + mFunc(in ? in->value() : In() ..., *out); + out->waitForFinished(); + mResult = out; + } + +private: + std::function&)> mFunc; +}; + +template +class EachExecutor : public Executor +{ +public: + EachExecutor(EachTask each, Executor *parent = nullptr) + : Executor(parent) + , mFunc(each) + { + } + + void exec() + { + assert(mPrev); + mPrev->exec(); + Async::Future *in = static_cast*>(mPrev->result()); + + auto *out = new Async::Future(); + for (auto arg : in->value()) { + Async::Future future; + mFunc(arg, future); + future.waitForFinished(); + out->setValue(out->value() + future.value()); + } + + mResult = out; + } + +private: + std::function&)> mFunc; +}; class JobBase { template friend class Job; -protected: - enum JobType { - Then, - Each, - Reduce - }; - public: - JobBase(JobType jobType, JobBase *prev = nullptr); - virtual void exec() = 0; + JobBase(Executor *executor); + ~JobBase(); + + void exec(); protected: - JobBase *mPrev; - void *mResult; - JobType mJobType; + Executor *mExecutor; }; template class Job : public JobBase { - template + template friend class Job; - template - friend Job start(Async::ThenTask func); - - typedef Out OutType; - typedef typename std::tuple_element<0, std::tuple>::type InType; + template + friend Job start(Async::ThenTask func); public: - ~Job() + template + Job then(ThenTask func) { - delete reinterpret_cast*>(mResult); + Executor *exec = new ThenExecutor(func, mExecutor); + return Job(exec); } - template - Job then(ThenTask func) + template + Job each(EachTask func) { - return Job::create(func, JobBase::Then, this); - } - - template - Job each(EachTask func) - { - static_assert(detail::isIterable::value, + static_assert(detail::isIterable::value, "The 'Each' task can only be connected to a job that returns a list or array."); - static_assert(detail::isIterable::value, + static_assert(detail::isIterable::value, "The result type of 'Each' task must be a list or an array."); - return Job::create(func, JobBase::Each, this); + return Job(new EachExecutor(func, mExecutor)); } - template - Job reduce(ReduceTask func) + template + Job reduce(ReduceTask func) { - static_assert(Async::detail::isIterable::value, + static_assert(Async::detail::isIterable::value, "The result type of 'Reduce' task must be a list or an array."); - return Job::create(func, JobBase::Reduce, this); + //return Job::create(func, new ReduceEx, this); } Async::Future result() const { - return *reinterpret_cast*>(mResult); + return *static_cast*>(mExecutor->result()); } - void exec(); - private: - Job(JobBase::JobType jobType, JobBase *parent = nullptr) - : JobBase(jobType, parent) + Job(Executor *executor) + : JobBase(executor) { } - - template - static Job create(F func, JobBase::JobType jobType, JobBase *parent = nullptr); - -public: - std::function&)> mFunc; }; - } // namespace Async @@ -149,41 +212,10 @@ public: template Async::Job Async::start(ThenTask func) { - return Job::create(func, JobBase::Then); -} - -template -void Async::Private::doExec(Job *job, Async::Future &out, const In & ... args) -{ - job->mFunc(args ..., out); -}; - -template -void Async::Job::exec() -{ - Async::Future *in = nullptr; - if (mPrev) { - mPrev->exec(); - in = reinterpret_cast*>(mPrev->mResult); - assert(in->isFinished()); - } - - auto out = new Async::Future(); - Private::doExec(this, *out, in ? in->value() : In() ...); - out->waitForFinished(); - mResult = reinterpret_cast(out); -} - -template -template -Async::Job Async::Job::create(F func, Async::JobBase::JobType jobType, Async::JobBase* parent) -{ - Job job(jobType, parent); - job.mFunc = func; - return job; + Executor *exec = new ThenExecutor(func); + return Job(exec); } - #endif // ASYNC_H diff --git a/async/src/future.h b/async/src/future.h index c53ef56..eb3de1e 100644 --- a/async/src/future.h +++ b/async/src/future.h @@ -31,6 +31,8 @@ class FutureBase public: FutureBase(); FutureBase(const FutureBase &other); + virtual ~FutureBase(); + void setFinished(); bool isFinished() const; void waitForFinished(); -- cgit v1.2.3