summaryrefslogtreecommitdiffstats
path: root/async/src/async.h
diff options
context:
space:
mode:
Diffstat (limited to 'async/src/async.h')
-rw-r--r--async/src/async.h110
1 files changed, 76 insertions, 34 deletions
diff --git a/async/src/async.h b/async/src/async.h
index 0e4f246..233ad56 100644
--- a/async/src/async.h
+++ b/async/src/async.h
@@ -25,14 +25,15 @@
25#include <functional> 25#include <functional>
26#include <list> 26#include <list>
27#include <type_traits> 27#include <type_traits>
28#include <iostream>
29#include <cassert> 28#include <cassert>
30#include <iterator> 29#include <iterator>
31#include <boost/graph/graph_concepts.hpp>
32 30
33#include "future.h" 31#include "future.h"
34#include "async_impl.h" 32#include "async_impl.h"
35 33
34#include <QVector>
35#include <QObject>
36
36 37
37namespace Async { 38namespace Async {
38 39
@@ -86,11 +87,18 @@ class Executor : public ExecutorBase
86protected: 87protected:
87 Executor(ExecutorBase *parent) 88 Executor(ExecutorBase *parent)
88 : ExecutorBase(parent) 89 : ExecutorBase(parent)
90 , mPrevFuture(0)
91 , mPrevFutureWatcher(0)
89 {} 92 {}
90 virtual ~Executor() {} 93 virtual ~Executor() {}
91 inline Async::Future<PrevOut>* chainup(); 94 inline Async::Future<PrevOut>* chainup();
95 virtual void previousFutureReady() = 0;
96
97 void exec();
92 98
93 std::function<void(const In& ..., Async::Future<Out> &)> mFunc; 99 std::function<void(const In& ..., Async::Future<Out> &)> mFunc;
100 Async::Future<PrevOut> *mPrevFuture;
101 Async::FutureWatcher<PrevOut> *mPrevFutureWatcher;
94}; 102};
95 103
96template<typename Out, typename ... In> 104template<typename Out, typename ... In>
@@ -98,7 +106,10 @@ class ThenExecutor: public Executor<typename PreviousOut<In ...>::type, Out, In
98{ 106{
99public: 107public:
100 ThenExecutor(ThenTask<Out, In ...> then, ExecutorBase *parent = nullptr); 108 ThenExecutor(ThenTask<Out, In ...> then, ExecutorBase *parent = nullptr);
101 void exec(); 109 void previousFutureReady();
110
111private:
112 Async::FutureWatcher<typename PreviousOut<In ...>::type> *mFutureWatcher;
102}; 113};
103 114
104template<typename PrevOut, typename Out, typename In> 115template<typename PrevOut, typename Out, typename In>
@@ -106,7 +117,10 @@ class EachExecutor : public Executor<PrevOut, Out, In>
106{ 117{
107public: 118public:
108 EachExecutor(EachTask<Out, In> each, ExecutorBase *parent); 119 EachExecutor(EachTask<Out, In> each, ExecutorBase *parent);
109 void exec(); 120 void previousFutureReady();
121
122private:
123 QVector<Async::FutureWatcher<PrevOut>*> mFutureWatchers;
110}; 124};
111 125
112template<typename Out, typename In> 126template<typename Out, typename In>
@@ -114,7 +128,7 @@ class ReduceExecutor : public Executor<In, Out, In>
114{ 128{
115public: 129public:
116 ReduceExecutor(ReduceTask<Out, In> reduce, ExecutorBase *parent); 130 ReduceExecutor(ReduceTask<Out, In> reduce, ExecutorBase *parent);
117 void exec(); 131 void previousFutureReady();
118}; 132};
119 133
120} // namespace Private 134} // namespace Private
@@ -141,8 +155,6 @@ public:
141 JobBase(Private::ExecutorBase *executor); 155 JobBase(Private::ExecutorBase *executor);
142 ~JobBase(); 156 ~JobBase();
143 157
144 void exec();
145
146protected: 158protected:
147 Private::ExecutorBase *mExecutor; 159 Private::ExecutorBase *mExecutor;
148}; 160};
@@ -225,6 +237,12 @@ public:
225 return Job<OutOther, InOther>(new Private::ReduceExecutor<OutOther, InOther>(func, mExecutor)); 237 return Job<OutOther, InOther>(new Private::ReduceExecutor<OutOther, InOther>(func, mExecutor));
226 } 238 }
227 239
240 Async::Future<Out> exec()
241 {
242 mExecutor->exec();
243 return result();
244 }
245
228 Async::Future<Out> result() const 246 Async::Future<Out> result() const
229 { 247 {
230 return *static_cast<Async::Future<Out>*>(mExecutor->result()); 248 return *static_cast<Async::Future<Out>*>(mExecutor->result());
@@ -257,14 +275,31 @@ Future<PrevOut>* Executor<PrevOut, Out, In ...>::chainup()
257{ 275{
258 if (mPrev) { 276 if (mPrev) {
259 mPrev->exec(); 277 mPrev->exec();
260 auto future = static_cast<Async::Future<PrevOut>*>(mPrev->result()); 278 return static_cast<Async::Future<PrevOut>*>(mPrev->result());
261 assert(future->isFinished());
262 return future;
263 } else { 279 } else {
264 return 0; 280 return 0;
265 } 281 }
266} 282}
267 283
284template<typename PrevOut, typename Out, typename ... In>
285void Executor<PrevOut, Out, In ...>::exec()
286{
287 mPrevFuture = chainup();
288 mResult = new Async::Future<Out>();
289 if (!mPrevFuture || mPrevFuture->isFinished()) {
290 previousFutureReady();
291 } else {
292 auto futureWatcher = new Async::FutureWatcher<PrevOut>();
293 QObject::connect(futureWatcher, &Async::FutureWatcher<PrevOut>::futureReady,
294 [futureWatcher, this]() {
295 assert(futureWatcher->future().isFinished());
296 futureWatcher->deleteLater();
297 previousFutureReady();
298 });
299 futureWatcher->setFuture(*mPrevFuture);
300 }
301}
302
268template<typename Out, typename ... In> 303template<typename Out, typename ... In>
269ThenExecutor<Out, In ...>::ThenExecutor(ThenTask<Out, In ...> then, ExecutorBase* parent) 304ThenExecutor<Out, In ...>::ThenExecutor(ThenTask<Out, In ...> then, ExecutorBase* parent)
270 : Executor<typename PreviousOut<In ...>::type, Out, In ...>(parent) 305 : Executor<typename PreviousOut<In ...>::type, Out, In ...>(parent)
@@ -273,15 +308,13 @@ ThenExecutor<Out, In ...>::ThenExecutor(ThenTask<Out, In ...> then, ExecutorBase
273} 308}
274 309
275template<typename Out, typename ... In> 310template<typename Out, typename ... In>
276void ThenExecutor<Out, In ...>::exec() 311void ThenExecutor<Out, In ...>::previousFutureReady()
277{ 312{
278 auto in = this->chainup(); 313 if (this->mPrevFuture) {
279 (void)in; // supress 'unused variable' warning when In is void 314 assert(this->mPrevFuture->isFinished());
280 315 }
281 auto out = new Async::Future<Out>(); 316 this->mFunc(this->mPrevFuture ? this->mPrevFuture->value() : In() ...,
282 this->mFunc(in ? in->value() : In() ..., *out); 317 *static_cast<Async::Future<Out>*>(this->mResult));
283 out->waitForFinished();
284 this->mResult = out;
285} 318}
286 319
287template<typename PrevOut, typename Out, typename In> 320template<typename PrevOut, typename Out, typename In>
@@ -292,20 +325,33 @@ EachExecutor<PrevOut, Out, In>::EachExecutor(EachTask<Out, In> each, ExecutorBas
292} 325}
293 326
294template<typename PrevOut, typename Out, typename In> 327template<typename PrevOut, typename Out, typename In>
295void EachExecutor<PrevOut, Out, In>::exec() 328void EachExecutor<PrevOut, Out, In>::previousFutureReady()
296{ 329{
297 auto in = this->chainup(); 330 assert(this->mPrevFuture->isFinished());
331 auto out = static_cast<Async::Future<Out>*>(this->mResult);
332 if (this->mPrevFuture->value().isEmpty()) {
333 out->setFinished();
334 return;
335 }
298 336
299 auto *out = new Async::Future<Out>(); 337 for (auto arg : this->mPrevFuture->value()) {
300 for (auto arg : in->value()) {
301 Async::Future<Out> future; 338 Async::Future<Out> future;
302 this->mFunc(arg, future); 339 this->mFunc(arg, future);
303 future.waitForFinished(); 340 auto fw = new Async::FutureWatcher<Out>();
304 out->setValue(out->value() + future.value()); 341 mFutureWatchers.append(fw);
342 QObject::connect(fw, &Async::FutureWatcher<Out>::futureReady,
343 [out, future, fw, this]() {
344 assert(future.isFinished());
345 const int index = mFutureWatchers.indexOf(fw);
346 assert(index > -1);
347 mFutureWatchers.removeAt(index);
348 out->setValue(out->value() + future.value());
349 if (mFutureWatchers.isEmpty()) {
350 out->setFinished();
351 }
352 });
353 fw->setFuture(future);
305 } 354 }
306 out->setFinished();
307
308 this->mResult = out;
309} 355}
310 356
311template<typename Out, typename In> 357template<typename Out, typename In>
@@ -316,14 +362,10 @@ ReduceExecutor<Out, In>::ReduceExecutor(ReduceTask<Out, In> reduce, ExecutorBase
316} 362}
317 363
318template<typename Out, typename In> 364template<typename Out, typename In>
319void ReduceExecutor<Out, In>::exec() 365void ReduceExecutor<Out, In>::previousFutureReady()
320{ 366{
321 auto in = this->chainup(); 367 assert(this->mPrevFuture->isFinished());
322 368 this->mFunc(this->mPrevFuture->value(), *static_cast<Async::Future<Out>*>(this->mResult));
323 auto out = new Async::Future<Out>();
324 this->mFunc(in->value(), *out);
325 out->waitForFinished();
326 this->mResult = out;
327} 369}
328 370
329} // namespace Private 371} // namespace Private