diff options
author | Dan Vrátil <dvratil@redhat.com> | 2014-12-14 12:59:22 +0100 |
---|---|---|
committer | Dan Vrátil <dvratil@redhat.com> | 2014-12-14 13:03:38 +0100 |
commit | 664396b0e550910cea50b7852066a04cc7fec3bd (patch) | |
tree | 544f9e8206bd498fd16047211a833d01d65ac864 /async/src/async.h | |
parent | 925d3bd3159820c0eae356fe4d3af54cb16ae1e3 (diff) | |
download | sink-664396b0e550910cea50b7852066a04cc7fec3bd.tar.gz sink-664396b0e550910cea50b7852066a04cc7fec3bd.zip |
Async: make the processing truly asynchronous
Now calling exec() starts the first job and returns a pending Future immediately. Caller
can then use Async::FutureWatcher to wait for the future to become finished, i.e. for all
jobs to finish execution.
Diffstat (limited to 'async/src/async.h')
-rw-r--r-- | async/src/async.h | 110 |
1 files changed, 76 insertions, 34 deletions
diff --git a/async/src/async.h b/async/src/async.h index 0e4f246..233ad56 100644 --- a/async/src/async.h +++ b/async/src/async.h | |||
@@ -25,14 +25,15 @@ | |||
25 | #include <functional> | 25 | #include <functional> |
26 | #include <list> | 26 | #include <list> |
27 | #include <type_traits> | 27 | #include <type_traits> |
28 | #include <iostream> | ||
29 | #include <cassert> | 28 | #include <cassert> |
30 | #include <iterator> | 29 | #include <iterator> |
31 | #include <boost/graph/graph_concepts.hpp> | ||
32 | 30 | ||
33 | #include "future.h" | 31 | #include "future.h" |
34 | #include "async_impl.h" | 32 | #include "async_impl.h" |
35 | 33 | ||
34 | #include <QVector> | ||
35 | #include <QObject> | ||
36 | |||
36 | 37 | ||
37 | namespace Async { | 38 | namespace Async { |
38 | 39 | ||
@@ -86,11 +87,18 @@ class Executor : public ExecutorBase | |||
86 | protected: | 87 | protected: |
87 | Executor(ExecutorBase *parent) | 88 | Executor(ExecutorBase *parent) |
88 | : ExecutorBase(parent) | 89 | : ExecutorBase(parent) |
90 | , mPrevFuture(0) | ||
91 | , mPrevFutureWatcher(0) | ||
89 | {} | 92 | {} |
90 | virtual ~Executor() {} | 93 | virtual ~Executor() {} |
91 | inline Async::Future<PrevOut>* chainup(); | 94 | inline Async::Future<PrevOut>* chainup(); |
95 | virtual void previousFutureReady() = 0; | ||
96 | |||
97 | void exec(); | ||
92 | 98 | ||
93 | std::function<void(const In& ..., Async::Future<Out> &)> mFunc; | 99 | std::function<void(const In& ..., Async::Future<Out> &)> mFunc; |
100 | Async::Future<PrevOut> *mPrevFuture; | ||
101 | Async::FutureWatcher<PrevOut> *mPrevFutureWatcher; | ||
94 | }; | 102 | }; |
95 | 103 | ||
96 | template<typename Out, typename ... In> | 104 | template<typename Out, typename ... In> |
@@ -98,7 +106,10 @@ class ThenExecutor: public Executor<typename PreviousOut<In ...>::type, Out, In | |||
98 | { | 106 | { |
99 | public: | 107 | public: |
100 | ThenExecutor(ThenTask<Out, In ...> then, ExecutorBase *parent = nullptr); | 108 | ThenExecutor(ThenTask<Out, In ...> then, ExecutorBase *parent = nullptr); |
101 | void exec(); | 109 | void previousFutureReady(); |
110 | |||
111 | private: | ||
112 | Async::FutureWatcher<typename PreviousOut<In ...>::type> *mFutureWatcher; | ||
102 | }; | 113 | }; |
103 | 114 | ||
104 | template<typename PrevOut, typename Out, typename In> | 115 | template<typename PrevOut, typename Out, typename In> |
@@ -106,7 +117,10 @@ class EachExecutor : public Executor<PrevOut, Out, In> | |||
106 | { | 117 | { |
107 | public: | 118 | public: |
108 | EachExecutor(EachTask<Out, In> each, ExecutorBase *parent); | 119 | EachExecutor(EachTask<Out, In> each, ExecutorBase *parent); |
109 | void exec(); | 120 | void previousFutureReady(); |
121 | |||
122 | private: | ||
123 | QVector<Async::FutureWatcher<PrevOut>*> mFutureWatchers; | ||
110 | }; | 124 | }; |
111 | 125 | ||
112 | template<typename Out, typename In> | 126 | template<typename Out, typename In> |
@@ -114,7 +128,7 @@ class ReduceExecutor : public Executor<In, Out, In> | |||
114 | { | 128 | { |
115 | public: | 129 | public: |
116 | ReduceExecutor(ReduceTask<Out, In> reduce, ExecutorBase *parent); | 130 | ReduceExecutor(ReduceTask<Out, In> reduce, ExecutorBase *parent); |
117 | void exec(); | 131 | void previousFutureReady(); |
118 | }; | 132 | }; |
119 | 133 | ||
120 | } // namespace Private | 134 | } // namespace Private |
@@ -141,8 +155,6 @@ public: | |||
141 | JobBase(Private::ExecutorBase *executor); | 155 | JobBase(Private::ExecutorBase *executor); |
142 | ~JobBase(); | 156 | ~JobBase(); |
143 | 157 | ||
144 | void exec(); | ||
145 | |||
146 | protected: | 158 | protected: |
147 | Private::ExecutorBase *mExecutor; | 159 | Private::ExecutorBase *mExecutor; |
148 | }; | 160 | }; |
@@ -225,6 +237,12 @@ public: | |||
225 | return Job<OutOther, InOther>(new Private::ReduceExecutor<OutOther, InOther>(func, mExecutor)); | 237 | return Job<OutOther, InOther>(new Private::ReduceExecutor<OutOther, InOther>(func, mExecutor)); |
226 | } | 238 | } |
227 | 239 | ||
240 | Async::Future<Out> exec() | ||
241 | { | ||
242 | mExecutor->exec(); | ||
243 | return result(); | ||
244 | } | ||
245 | |||
228 | Async::Future<Out> result() const | 246 | Async::Future<Out> result() const |
229 | { | 247 | { |
230 | return *static_cast<Async::Future<Out>*>(mExecutor->result()); | 248 | return *static_cast<Async::Future<Out>*>(mExecutor->result()); |
@@ -257,14 +275,31 @@ Future<PrevOut>* Executor<PrevOut, Out, In ...>::chainup() | |||
257 | { | 275 | { |
258 | if (mPrev) { | 276 | if (mPrev) { |
259 | mPrev->exec(); | 277 | mPrev->exec(); |
260 | auto future = static_cast<Async::Future<PrevOut>*>(mPrev->result()); | 278 | return static_cast<Async::Future<PrevOut>*>(mPrev->result()); |
261 | assert(future->isFinished()); | ||
262 | return future; | ||
263 | } else { | 279 | } else { |
264 | return 0; | 280 | return 0; |
265 | } | 281 | } |
266 | } | 282 | } |
267 | 283 | ||
284 | template<typename PrevOut, typename Out, typename ... In> | ||
285 | void Executor<PrevOut, Out, In ...>::exec() | ||
286 | { | ||
287 | mPrevFuture = chainup(); | ||
288 | mResult = new Async::Future<Out>(); | ||
289 | if (!mPrevFuture || mPrevFuture->isFinished()) { | ||
290 | previousFutureReady(); | ||
291 | } else { | ||
292 | auto futureWatcher = new Async::FutureWatcher<PrevOut>(); | ||
293 | QObject::connect(futureWatcher, &Async::FutureWatcher<PrevOut>::futureReady, | ||
294 | [futureWatcher, this]() { | ||
295 | assert(futureWatcher->future().isFinished()); | ||
296 | futureWatcher->deleteLater(); | ||
297 | previousFutureReady(); | ||
298 | }); | ||
299 | futureWatcher->setFuture(*mPrevFuture); | ||
300 | } | ||
301 | } | ||
302 | |||
268 | template<typename Out, typename ... In> | 303 | template<typename Out, typename ... In> |
269 | ThenExecutor<Out, In ...>::ThenExecutor(ThenTask<Out, In ...> then, ExecutorBase* parent) | 304 | ThenExecutor<Out, In ...>::ThenExecutor(ThenTask<Out, In ...> then, ExecutorBase* parent) |
270 | : Executor<typename PreviousOut<In ...>::type, Out, In ...>(parent) | 305 | : Executor<typename PreviousOut<In ...>::type, Out, In ...>(parent) |
@@ -273,15 +308,13 @@ ThenExecutor<Out, In ...>::ThenExecutor(ThenTask<Out, In ...> then, ExecutorBase | |||
273 | } | 308 | } |
274 | 309 | ||
275 | template<typename Out, typename ... In> | 310 | template<typename Out, typename ... In> |
276 | void ThenExecutor<Out, In ...>::exec() | 311 | void ThenExecutor<Out, In ...>::previousFutureReady() |
277 | { | 312 | { |
278 | auto in = this->chainup(); | 313 | if (this->mPrevFuture) { |
279 | (void)in; // supress 'unused variable' warning when In is void | 314 | assert(this->mPrevFuture->isFinished()); |
280 | 315 | } | |
281 | auto out = new Async::Future<Out>(); | 316 | this->mFunc(this->mPrevFuture ? this->mPrevFuture->value() : In() ..., |
282 | this->mFunc(in ? in->value() : In() ..., *out); | 317 | *static_cast<Async::Future<Out>*>(this->mResult)); |
283 | out->waitForFinished(); | ||
284 | this->mResult = out; | ||
285 | } | 318 | } |
286 | 319 | ||
287 | template<typename PrevOut, typename Out, typename In> | 320 | template<typename PrevOut, typename Out, typename In> |
@@ -292,20 +325,33 @@ EachExecutor<PrevOut, Out, In>::EachExecutor(EachTask<Out, In> each, ExecutorBas | |||
292 | } | 325 | } |
293 | 326 | ||
294 | template<typename PrevOut, typename Out, typename In> | 327 | template<typename PrevOut, typename Out, typename In> |
295 | void EachExecutor<PrevOut, Out, In>::exec() | 328 | void EachExecutor<PrevOut, Out, In>::previousFutureReady() |
296 | { | 329 | { |
297 | auto in = this->chainup(); | 330 | assert(this->mPrevFuture->isFinished()); |
331 | auto out = static_cast<Async::Future<Out>*>(this->mResult); | ||
332 | if (this->mPrevFuture->value().isEmpty()) { | ||
333 | out->setFinished(); | ||
334 | return; | ||
335 | } | ||
298 | 336 | ||
299 | auto *out = new Async::Future<Out>(); | 337 | for (auto arg : this->mPrevFuture->value()) { |
300 | for (auto arg : in->value()) { | ||
301 | Async::Future<Out> future; | 338 | Async::Future<Out> future; |
302 | this->mFunc(arg, future); | 339 | this->mFunc(arg, future); |
303 | future.waitForFinished(); | 340 | auto fw = new Async::FutureWatcher<Out>(); |
304 | out->setValue(out->value() + future.value()); | 341 | mFutureWatchers.append(fw); |
342 | QObject::connect(fw, &Async::FutureWatcher<Out>::futureReady, | ||
343 | [out, future, fw, this]() { | ||
344 | assert(future.isFinished()); | ||
345 | const int index = mFutureWatchers.indexOf(fw); | ||
346 | assert(index > -1); | ||
347 | mFutureWatchers.removeAt(index); | ||
348 | out->setValue(out->value() + future.value()); | ||
349 | if (mFutureWatchers.isEmpty()) { | ||
350 | out->setFinished(); | ||
351 | } | ||
352 | }); | ||
353 | fw->setFuture(future); | ||
305 | } | 354 | } |
306 | out->setFinished(); | ||
307 | |||
308 | this->mResult = out; | ||
309 | } | 355 | } |
310 | 356 | ||
311 | template<typename Out, typename In> | 357 | template<typename Out, typename In> |
@@ -316,14 +362,10 @@ ReduceExecutor<Out, In>::ReduceExecutor(ReduceTask<Out, In> reduce, ExecutorBase | |||
316 | } | 362 | } |
317 | 363 | ||
318 | template<typename Out, typename In> | 364 | template<typename Out, typename In> |
319 | void ReduceExecutor<Out, In>::exec() | 365 | void ReduceExecutor<Out, In>::previousFutureReady() |
320 | { | 366 | { |
321 | auto in = this->chainup(); | 367 | assert(this->mPrevFuture->isFinished()); |
322 | 368 | this->mFunc(this->mPrevFuture->value(), *static_cast<Async::Future<Out>*>(this->mResult)); | |
323 | auto out = new Async::Future<Out>(); | ||
324 | this->mFunc(in->value(), *out); | ||
325 | out->waitForFinished(); | ||
326 | this->mResult = out; | ||
327 | } | 369 | } |
328 | 370 | ||
329 | } // namespace Private | 371 | } // namespace Private |