From e0ba51543037a026e2a8d483dfdc0e196e9dc59e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dan=20Vr=C3=A1til?= Date: Thu, 11 Dec 2014 16:09:28 +0100 Subject: Async: add Reduce task --- async/src/async.h | 44 ++++++++++++++++++++++++++++++++++---------- 1 file changed, 34 insertions(+), 10 deletions(-) (limited to 'async/src') diff --git a/async/src/async.h b/async/src/async.h index 0f027f5..a976fa2 100644 --- a/async/src/async.h +++ b/async/src/async.h @@ -80,10 +80,6 @@ template class ThenExecutor: public Executor { - typedef Out OutType; - typedef typename std::tuple_element<0, std::tuple>::type InType; - - public: ThenExecutor(ThenTask then, Executor *parent = nullptr) : Executor(parent) @@ -93,10 +89,12 @@ public: void exec() { - Async::Future *in = 0; + typedef typename std::tuple_element<0, std::tuple>::type PrevOut; + + Async::Future *in = 0; if (mPrev) { mPrev->exec(); - in = static_cast*>(mPrev->result()); + in = static_cast*>(mPrev->result()); assert(in->isFinished()); } @@ -133,6 +131,7 @@ public: future.waitForFinished(); out->setValue(out->value() + future.value()); } + out->setFinished(); mResult = out; } @@ -141,6 +140,32 @@ private: std::function&)> mFunc; }; +template +class ReduceExecutor : public Executor +{ +public: + ReduceExecutor(ReduceTask reduce, Executor *parent = nullptr) + : Executor(parent) + , mFunc(reduce) + { + } + + void exec() + { + assert(mPrev); + mPrev->exec(); + Async::Future *in = static_cast*>(mPrev->result()); + + auto out = new Async::Future(); + mFunc(in->value(), *out); + out->waitForFinished(); + mResult = out; + } + +private: + std::function &)> mFunc; +}; + class JobBase { template @@ -187,8 +212,8 @@ public: Job reduce(ReduceTask func) { static_assert(Async::detail::isIterable::value, - "The result type of 'Reduce' task must be a list or an array."); - //return Job::create(func, new ReduceEx, this); + "The 'Result' task can only be connected to a job that returns a list or array"); + return Job(new ReduceExecutor(func, mExecutor)); } Async::Future result() const @@ -212,8 +237,7 @@ private: template Async::Job Async::start(ThenTask func) { - Executor *exec = new ThenExecutor(func); - return Job(exec); + return Job(new ThenExecutor(func)); } #endif // ASYNC_H -- cgit v1.2.3