summaryrefslogtreecommitdiffstats
path: root/async/src/async.h
diff options
context:
space:
mode:
Diffstat (limited to 'async/src/async.h')
-rw-r--r--async/src/async.h872
1 files changed, 0 insertions, 872 deletions
diff --git a/async/src/async.h b/async/src/async.h
deleted file mode 100644
index b1f1121..0000000
--- a/async/src/async.h
+++ /dev/null
@@ -1,872 +0,0 @@
1/*
2 * Copyright 2014 Daniel Vrátil <dvratil@redhat.com>
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Library General Public License as
6 * published by the Free Software Foundation; either version 2 of
7 * the License, or (at your option) any later version.
8 *
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU Library General Public License for more details.
13 *
14 * You should have received a copy of the GNU Library General Public License
15 * along with this library. If not, see <http://www.gnu.org/licenses/>.
16 */
17
18#ifndef ASYNC_H
19#define ASYNC_H
20
21#include <functional>
22#include <list>
23#include <type_traits>
24#include <cassert>
25#include <iterator>
26
27#include "future.h"
28#include "debug.h"
29#include "async_impl.h"
30
31#include <QVector>
32#include <QObject>
33#include <QSharedPointer>
34
35#include <QDebug>
36
37#ifdef WITH_KJOB
38#include <KJob>
39#endif
40
41
42/*
43 * API to help write async code.
44 *
45 * This API is based around jobs that take lambdas to execute asynchronous tasks. Each async operation can take a continuation,
46 * that can then be used to execute further async operations. That way it is possible to build async chains of operations,
47 * that can be stored and executed later on. Jobs can be composed, similarly to functions.
48 *
49 * Relations between the components:
50 * * Job: API wrapper around Executors chain. Can be destroyed while still running,
51 * because the actual execution happens in the background
52 * * Executor: Describes task to execute. Executors form a linked list matching the
53 * order in which they will be executed. The Executor chain is destroyed when
54 * the parent Job is destroyed. However if the Job is still running it is
55 * guaranteed that the Executor chain will not be destroyed until the execution
56 * is finished.
57 * * Execution: The running execution of the task stored in Executor. Each call to Job::exec()
58 * instantiates new Execution chain, which makes it possible for the Job to be
59 * executed multiple times (even in parallel).
60 * * Future: Representation of the result that is being calculated
61 *
62 *
63 * TODO: Composed progress reporting
64 * TODO: Possibility to abort a job through future (perhaps optional?)
65 * TODO: Support for timeout, specified during exec call, after which the error handler gets called with a defined errorCode.
66 */
67namespace Async {
68
69template<typename PrevOut, typename Out, typename ... In>
70class Executor;
71
72class JobBase;
73
74template<typename Out, typename ... In>
75class Job;
76
77template<typename Out, typename ... In>
78using ThenTask = typename detail::identity<std::function<void(In ..., Async::Future<Out>&)>>::type;
79template<typename Out, typename ... In>
80using SyncThenTask = typename detail::identity<std::function<Out(In ...)>>::type;
81template<typename Out, typename In>
82using EachTask = typename detail::identity<std::function<void(In, Async::Future<Out>&)>>::type;
83template<typename Out, typename In>
84using SyncEachTask = typename detail::identity<std::function<Out(In)>>::type;
85template<typename Out, typename In>
86using ReduceTask = typename detail::identity<std::function<void(In, Async::Future<Out>&)>>::type;
87template<typename Out, typename In>
88using SyncReduceTask = typename detail::identity<std::function<Out(In)>>::type;
89
90using ErrorHandler = std::function<void(int, const QString &)>;
91using Condition = std::function<bool()>;
92
93namespace Private
94{
95
96class ExecutorBase;
97typedef QSharedPointer<ExecutorBase> ExecutorBasePtr;
98
99struct Execution {
100 Execution(const ExecutorBasePtr &executor);
101 ~Execution();
102 void setFinished();
103
104 template<typename T>
105 Async::Future<T>* result() const
106 {
107 return static_cast<Async::Future<T>*>(resultBase);
108 }
109
110 void releaseFuture();
111 bool errorWasHandled() const;
112
113 ExecutorBasePtr executor;
114 FutureBase *resultBase;
115 bool isRunning;
116 bool isFinished;
117
118 ExecutionPtr prevExecution;
119
120#ifndef QT_NO_DEBUG
121 Tracer *tracer;
122#endif
123};
124
125
126typedef QSharedPointer<Execution> ExecutionPtr;
127
128class ExecutorBase
129{
130 template<typename PrevOut, typename Out, typename ... In>
131 friend class Executor;
132
133 template<typename Out, typename ... In>
134 friend class Async::Job;
135
136 friend class Execution;
137 friend class Async::Tracer;
138
139public:
140 virtual ~ExecutorBase();
141 virtual ExecutionPtr exec(const ExecutorBasePtr &self) = 0;
142
143protected:
144 ExecutorBase(const ExecutorBasePtr &parent);
145
146 template<typename T>
147 Async::Future<T>* createFuture(const ExecutionPtr &execution) const;
148
149 virtual bool hasErrorFunc() const = 0;
150 virtual bool handleError(const ExecutionPtr &execution) = 0;
151
152 ExecutorBasePtr mPrev;
153
154#ifndef QT_NO_DEBUG
155 QString mExecutorName;
156#endif
157};
158
159template<typename PrevOut, typename Out, typename ... In>
160class Executor : public ExecutorBase
161{
162protected:
163 Executor(ErrorHandler errorFunc, const Private::ExecutorBasePtr &parent)
164 : ExecutorBase(parent)
165 , mErrorFunc(errorFunc)
166 {}
167
168 virtual ~Executor() {}
169 virtual void run(const ExecutionPtr &execution) = 0;
170
171 ExecutionPtr exec(const ExecutorBasePtr &self);
172 bool hasErrorFunc() const { return (bool) mErrorFunc; }
173 bool handleError(const ExecutionPtr &execution);
174
175 std::function<void(int, const QString &)> mErrorFunc;
176};
177
178template<typename Out, typename ... In>
179class ThenExecutor: public Executor<typename detail::prevOut<In ...>::type, Out, In ...>
180{
181public:
182 ThenExecutor(ThenTask<Out, In ...> then, ErrorHandler errorFunc, const ExecutorBasePtr &parent);
183 void run(const ExecutionPtr &execution);
184private:
185 ThenTask<Out, In ...> mFunc;
186};
187
188template<typename PrevOut, typename Out, typename In>
189class EachExecutor : public Executor<PrevOut, Out, In>
190{
191public:
192 EachExecutor(EachTask<Out, In> each, ErrorHandler errorFunc, const ExecutorBasePtr &parent);
193 void run(const ExecutionPtr &execution);
194private:
195 EachTask<Out, In> mFunc;
196 QVector<Async::FutureWatcher<Out>*> mFutureWatchers;
197};
198
199template<typename Out, typename In>
200class ReduceExecutor : public ThenExecutor<Out, In>
201{
202public:
203 ReduceExecutor(ReduceTask<Out, In> reduce, ErrorHandler errorFunc, const ExecutorBasePtr &parent);
204private:
205 ReduceTask<Out, In> mFunc;
206};
207
208template<typename Out, typename ... In>
209class SyncThenExecutor : public Executor<typename detail::prevOut<In ...>::type, Out, In ...>
210{
211public:
212 SyncThenExecutor(SyncThenTask<Out, In ...> then, ErrorHandler errorFunc, const ExecutorBasePtr &parent);
213 void run(const ExecutionPtr &execution);
214
215private:
216 void run(const ExecutionPtr &execution, std::false_type); // !std::is_void<Out>
217 void run(const ExecutionPtr &execution, std::true_type); // std::is_void<Out>
218 SyncThenTask<Out, In ...> mFunc;
219};
220
221template<typename Out, typename In>
222class SyncReduceExecutor : public SyncThenExecutor<Out, In>
223{
224public:
225 SyncReduceExecutor(SyncReduceTask<Out, In> reduce, ErrorHandler errorFunc, const ExecutorBasePtr &parent);
226private:
227 SyncReduceTask<Out, In> mFunc;
228};
229
230template<typename PrevOut, typename Out, typename In>
231class SyncEachExecutor : public Executor<PrevOut, Out, In>
232{
233public:
234 SyncEachExecutor(SyncEachTask<Out, In> each, ErrorHandler errorFunc, const ExecutorBasePtr &parent);
235 void run(const ExecutionPtr &execution);
236private:
237 void run(Async::Future<Out> *future, const typename PrevOut::value_type &arg, std::false_type); // !std::is_void<Out>
238 void run(Async::Future<Out> *future, const typename PrevOut::value_type &arg, std::true_type); // std::is_void<Out>
239 SyncEachTask<Out, In> mFunc;
240};
241
242} // namespace Private
243
244/**
245 * Start an asynchronous job sequence.
246 *
247 * Async::start() is your starting point to build a chain of jobs to be executed
248 * asynchronously.
249 *
250 * @param func An asynchronous function to be executed. The function must have
251 * void return type, and accept exactly one argument of type @p Async::Future<In>,
252 * where @p In is type of the result.
253 */
254template<typename Out, typename ... In>
255Job<Out, In ...> start(ThenTask<Out, In ...> func, ErrorHandler errorFunc = ErrorHandler());
256
257template<typename Out, typename ... In>
258Job<Out, In ...> start(SyncThenTask<Out, In ...> func, ErrorHandler errorFunc = ErrorHandler());
259
260#ifdef WITH_KJOB
261template<typename ReturnType, typename KJobType, ReturnType (KJobType::*KJobResultMethod)(), typename ... Args>
262Job<ReturnType, Args ...> start();
263#endif
264
265/**
266 * Async while loop.
267 *
268 * The loop continues while @param condition returns true.
269 */
270Job<void> dowhile(Condition condition, ThenTask<void> func);
271
272/**
273 * Async while loop.
274 *
275 * Loop continues while body returns true.
276 */
277Job<void> dowhile(ThenTask<bool> body);
278
279/**
280 * Iterate over a container.
281 *
282 * Use in conjunction with .each
283 */
284template<typename Out>
285Job<Out> iterate(const Out &container);
286
287/**
288 * Async delay.
289 */
290Job<void> wait(int delay);
291
292/**
293 * A null job.
294 *
295 * An async noop.
296 *
297 */
298template<typename Out>
299Job<Out> null();
300
301/**
302 * An error job.
303 *
304 * An async error.
305 *
306 */
307template<typename Out>
308Job<Out> error(int errorCode = 1, const QString &errorMessage = QString());
309
310class JobBase
311{
312 template<typename Out, typename ... In>
313 friend class Job;
314
315public:
316 JobBase(const Private::ExecutorBasePtr &executor);
317 ~JobBase();
318
319protected:
320 Private::ExecutorBasePtr mExecutor;
321};
322
323/**
324 * An Asynchronous job
325 *
326 * A single instance of Job represents a single method that will be executed
327 * asynchrously. The Job is started by @p Job::exec(), which returns @p Async::Future
328 * immediatelly. The Future will be set to finished state once the asynchronous
329 * task has finished. You can use @p Async::Future::waitForFinished() to wait for
330 * for the Future in blocking manner.
331 *
332 * It is possible to chain multiple Jobs one after another in different fashion
333 * (sequential, parallel, etc.). Calling Job::exec() will then return a pending
334 * @p Async::Future, and will execute the entire chain of jobs.
335 *
336 * @code
337 * auto job = Job::start<QList<int>>(
338 * [](Async::Future<QList<int>> &future) {
339 * MyREST::PendingUsers *pu = MyREST::requestListOfUsers();
340 * QObject::connect(pu, &PendingOperation::finished,
341 * [&](PendingOperation *pu) {
342 * future->setValue(dynamic_cast<MyREST::PendingUsers*>(pu)->userIds());
343 * future->setFinished();
344 * });
345 * })
346 * .each<QList<MyREST::User>, int>(
347 * [](const int &userId, Async::Future<QList<MyREST::User>> &future) {
348 * MyREST::PendingUser *pu = MyREST::requestUserDetails(userId);
349 * QObject::connect(pu, &PendingOperation::finished,
350 * [&](PendingOperation *pu) {
351 * future->setValue(Qlist<MyREST::User>() << dynamic_cast<MyREST::PendingUser*>(pu)->user());
352 * future->setFinished();
353 * });
354 * });
355 *
356 * Async::Future<QList<MyREST::User>> usersFuture = job.exec();
357 * usersFuture.waitForFinished();
358 * QList<MyRest::User> users = usersFuture.value();
359 * @endcode
360 *
361 * In the example above, calling @p job.exec() will first invoke the first job,
362 * which will retrieve a list of IDs, and then will invoke the second function
363 * for each single entry in the list returned by the first function.
364 */
365template<typename Out, typename ... In>
366class Job : public JobBase
367{
368 template<typename OutOther, typename ... InOther>
369 friend class Job;
370
371 template<typename OutOther, typename ... InOther>
372 friend Job<OutOther, InOther ...> start(Async::ThenTask<OutOther, InOther ...> func, ErrorHandler errorFunc);
373
374 template<typename OutOther, typename ... InOther>
375 friend Job<OutOther, InOther ...> start(Async::SyncThenTask<OutOther, InOther ...> func, ErrorHandler errorFunc);
376
377#ifdef WITH_KJOB
378 template<typename ReturnType, typename KJobType, ReturnType (KJobType::*KJobResultMethod)(), typename ... Args>
379 friend Job<ReturnType, Args ...> start();
380#endif
381
382public:
383 template<typename OutOther, typename ... InOther>
384 Job<OutOther, InOther ...> then(ThenTask<OutOther, InOther ...> func, ErrorHandler errorFunc = ErrorHandler())
385 {
386 return Job<OutOther, InOther ...>(Private::ExecutorBasePtr(
387 new Private::ThenExecutor<OutOther, InOther ...>(func, errorFunc, mExecutor)));
388 }
389
390 template<typename OutOther, typename ... InOther>
391 Job<OutOther, InOther ...> then(SyncThenTask<OutOther, InOther ...> func, ErrorHandler errorFunc = ErrorHandler())
392 {
393 return Job<OutOther, InOther ...>(Private::ExecutorBasePtr(
394 new Private::SyncThenExecutor<OutOther, InOther ...>(func, errorFunc, mExecutor)));
395 }
396
397 template<typename OutOther, typename ... InOther>
398 Job<OutOther, InOther ...> then(Job<OutOther, InOther ...> otherJob, ErrorHandler errorFunc = ErrorHandler())
399 {
400 return then<OutOther, InOther ...>(nestedJobWrapper<OutOther, InOther ...>(otherJob), errorFunc);
401 }
402
403#ifdef WITH_KJOB
404 template<typename ReturnType, typename KJobType, ReturnType (KJobType::*KJobResultMethod)(), typename ... Args>
405 Job<ReturnType, Args ...> then()
406 {
407 return start<ReturnType, KJobType, KJobResultMethod, Args ...>();
408 }
409#endif
410
411 template<typename OutOther, typename InOther>
412 Job<OutOther, InOther> each(EachTask<OutOther, InOther> func, ErrorHandler errorFunc = ErrorHandler())
413 {
414 eachInvariants<OutOther>();
415 return Job<OutOther, InOther>(Private::ExecutorBasePtr(
416 new Private::EachExecutor<Out, OutOther, InOther>(func, errorFunc, mExecutor)));
417 }
418
419 template<typename OutOther, typename InOther>
420 Job<OutOther, InOther> each(SyncEachTask<OutOther, InOther> func, ErrorHandler errorFunc = ErrorHandler())
421 {
422 eachInvariants<OutOther>();
423 return Job<OutOther, InOther>(Private::ExecutorBasePtr(
424 new Private::SyncEachExecutor<Out, OutOther, InOther>(func, errorFunc, mExecutor)));
425 }
426
427 template<typename OutOther, typename InOther>
428 Job<OutOther, InOther> each(Job<OutOther, InOther> otherJob, ErrorHandler errorFunc = ErrorHandler())
429 {
430 eachInvariants<OutOther>();
431 return each<OutOther, InOther>(nestedJobWrapper<OutOther, InOther>(otherJob), errorFunc);
432 }
433
434 template<typename OutOther, typename InOther>
435 Job<OutOther, InOther> reduce(ReduceTask<OutOther, InOther> func, ErrorHandler errorFunc = ErrorHandler())
436 {
437 reduceInvariants<InOther>();
438 return Job<OutOther, InOther>(Private::ExecutorBasePtr(
439 new Private::ReduceExecutor<OutOther, InOther>(func, errorFunc, mExecutor)));
440 }
441
442 template<typename OutOther, typename InOther>
443 Job<OutOther, InOther> reduce(SyncReduceTask<OutOther, InOther> func, ErrorHandler errorFunc = ErrorHandler())
444 {
445 reduceInvariants<InOther>();
446 return Job<OutOther, InOther>(Private::ExecutorBasePtr(
447 new Private::SyncReduceExecutor<OutOther, InOther>(func, errorFunc, mExecutor)));
448 }
449
450 template<typename OutOther, typename InOther>
451 Job<OutOther, InOther> reduce(Job<OutOther, InOther> otherJob, ErrorHandler errorFunc = ErrorHandler())
452 {
453 return reduce<OutOther, InOther>(nestedJobWrapper<OutOther, InOther>(otherJob), errorFunc);
454 }
455
456 template<typename FirstIn>
457 Async::Future<Out> exec(FirstIn in)
458 {
459 // Inject a fake sync executor that will return the initial value
460 Private::ExecutorBasePtr first = mExecutor;
461 while (first->mPrev) {
462 first = first->mPrev;
463 }
464 auto init = new Private::SyncThenExecutor<FirstIn>(
465 [in]() -> FirstIn {
466 return in;
467 },
468 ErrorHandler(), Private::ExecutorBasePtr());
469 first->mPrev = Private::ExecutorBasePtr(init);
470
471 auto result = exec();
472 // Remove the injected executor
473 first->mPrev.reset();
474 return result;
475 }
476
477 Async::Future<Out> exec()
478 {
479 Private::ExecutionPtr execution = mExecutor->exec(mExecutor);
480 Async::Future<Out> result = *execution->result<Out>();
481
482 return result;
483 }
484
485private:
486 Job(Private::ExecutorBasePtr executor)
487 : JobBase(executor)
488 {}
489
490 template<typename OutOther>
491 void eachInvariants()
492 {
493 static_assert(detail::isIterable<Out>::value,
494 "The 'Each' task can only be connected to a job that returns a list or an array.");
495 static_assert(std::is_void<OutOther>::value || detail::isIterable<OutOther>::value,
496 "The result type of 'Each' task must be void, a list or an array.");
497 }
498
499 template<typename InOther>
500 void reduceInvariants()
501 {
502 static_assert(Async::detail::isIterable<Out>::value,
503 "The 'Result' task can only be connected to a job that returns a list or an array");
504 static_assert(std::is_same<typename Out::value_type, typename InOther::value_type>::value,
505 "The return type of previous task must be compatible with input type of this task");
506 }
507
508 template<typename OutOther, typename ... InOther>
509 inline std::function<void(InOther ..., Async::Future<OutOther>&)> nestedJobWrapper(Job<OutOther, InOther ...> otherJob) {
510 return [otherJob](InOther ... in, Async::Future<OutOther> &future) {
511 // copy by value is const
512 auto job = otherJob;
513 FutureWatcher<OutOther> *watcher = new FutureWatcher<OutOther>();
514 QObject::connect(watcher, &FutureWatcherBase::futureReady,
515 [watcher, future]() {
516 // FIXME: We pass future by value, because using reference causes the
517 // future to get deleted before this lambda is invoked, leading to crash
518 // in copyFutureValue()
519 // copy by value is const
520 auto outFuture = future;
521 Async::detail::copyFutureValue(watcher->future(), outFuture);
522 if (watcher->future().errorCode()) {
523 outFuture.setError(watcher->future().errorCode(), watcher->future().errorMessage());
524 } else {
525 outFuture.setFinished();
526 }
527 delete watcher;
528 });
529 watcher->setFuture(job.exec(in ...));
530 };
531 }
532};
533
534} // namespace Async
535
536
537// ********** Out of line definitions ****************
538
539namespace Async {
540
541template<typename Out, typename ... In>
542Job<Out, In ...> start(ThenTask<Out, In ...> func, ErrorHandler error)
543{
544 return Job<Out, In...>(Private::ExecutorBasePtr(
545 new Private::ThenExecutor<Out, In ...>(func, error, Private::ExecutorBasePtr())));
546}
547
548template<typename Out, typename ... In>
549Job<Out, In ...> start(SyncThenTask<Out, In ...> func, ErrorHandler error)
550{
551 return Job<Out, In...>(Private::ExecutorBasePtr(
552 new Private::SyncThenExecutor<Out, In ...>(func, error, Private::ExecutorBasePtr())));
553}
554
555#ifdef WITH_KJOB
556template<typename ReturnType, typename KJobType, ReturnType (KJobType::*KJobResultMethod)(), typename ... Args>
557Job<ReturnType, Args ...> start()
558{
559 return Job<ReturnType, Args ...>(Private::ExecutorBasePtr(
560 new Private::ThenExecutor<ReturnType, Args ...>([](const Args & ... args, Async::Future<ReturnType> &future)
561 {
562 KJobType *job = new KJobType(args ...);
563 job->connect(job, &KJob::finished,
564 [&future](KJob *job) {
565 if (job->error()) {
566 future.setError(job->error(), job->errorString());
567 } else {
568 future.setValue((static_cast<KJobType*>(job)->*KJobResultMethod)());
569 future.setFinished();
570 }
571 });
572 job->start();
573 }, ErrorHandler(), Private::ExecutorBasePtr())));
574}
575#endif
576
577
578template<typename Out>
579Job<Out> null()
580{
581 return Async::start<Out>(
582 [](Async::Future<Out> &future) {
583 future.setFinished();
584 });
585}
586
587template<typename Out>
588Job<Out> error(int errorCode, const QString &errorMessage)
589{
590 return Async::start<Out>(
591 [errorCode, errorMessage](Async::Future<Out> &future) {
592 future.setError(errorCode, errorMessage);
593 });
594}
595
596template<typename Out>
597Job<Out> iterate(const Out &container)
598{
599 return Async::start<Out>(
600 [container]() {
601 return container;
602 });
603}
604
605
606namespace Private {
607
608template<typename T>
609Async::Future<T>* ExecutorBase::createFuture(const ExecutionPtr &execution) const
610{
611 return new Async::Future<T>(execution);
612}
613
614template<typename PrevOut, typename Out, typename ... In>
615ExecutionPtr Executor<PrevOut, Out, In ...>::exec(const ExecutorBasePtr &self)
616{
617 // Passing 'self' to execution ensures that the Executor chain remains
618 // valid until the entire execution is finished
619 ExecutionPtr execution = ExecutionPtr::create(self);
620#ifndef QT_NO_DEBUG
621 execution->tracer = new Tracer(execution.data()); // owned by execution
622#endif
623
624 // chainup
625 execution->prevExecution = mPrev ? mPrev->exec(mPrev) : ExecutionPtr();
626
627 execution->resultBase = ExecutorBase::createFuture<Out>(execution);
628 auto fw = new Async::FutureWatcher<Out>();
629 QObject::connect(fw, &Async::FutureWatcher<Out>::futureReady,
630 [fw, execution, this]() {
631 handleError(execution);
632 execution->setFinished();
633 delete fw;
634 });
635 fw->setFuture(*execution->result<Out>());
636
637 Async::Future<PrevOut> *prevFuture = execution->prevExecution ? execution->prevExecution->result<PrevOut>() : nullptr;
638 if (!prevFuture || prevFuture->isFinished()) {
639 if (prevFuture) { // prevFuture implies execution->prevExecution
640 if (prevFuture->errorCode()) {
641 // Propagate the errorCode and message to the outer Future
642 execution->resultBase->setError(prevFuture->errorCode(), prevFuture->errorMessage());
643 if (!execution->errorWasHandled()) {
644 if (handleError(execution)) {
645 return execution;
646 }
647 } else {
648 return execution;
649 }
650 } else {
651 // Propagate error (if any)
652 }
653 }
654
655 execution->isRunning = true;
656 run(execution);
657 } else {
658 auto prevFutureWatcher = new Async::FutureWatcher<PrevOut>();
659 QObject::connect(prevFutureWatcher, &Async::FutureWatcher<PrevOut>::futureReady,
660 [prevFutureWatcher, execution, this]() {
661 auto prevFuture = prevFutureWatcher->future();
662 assert(prevFuture.isFinished());
663 delete prevFutureWatcher;
664 auto prevExecutor = execution->executor->mPrev;
665 if (prevFuture.errorCode()) {
666 execution->resultBase->setError(prevFuture.errorCode(), prevFuture.errorMessage());
667 if (!execution->errorWasHandled()) {
668 if (handleError(execution)) {
669 return;
670 }
671 } else {
672 return;
673 }
674 }
675
676
677 // propagate error (if any)
678 execution->isRunning = true;
679 run(execution);
680 });
681
682 prevFutureWatcher->setFuture(*static_cast<Async::Future<PrevOut>*>(prevFuture));
683 }
684
685 return execution;
686}
687
688template<typename PrevOut, typename Out, typename ... In>
689bool Executor<PrevOut, Out, In ...>::handleError(const ExecutionPtr &execution)
690{
691 assert(execution->resultBase->isFinished());
692 if (execution->resultBase->errorCode()) {
693 if (mErrorFunc) {
694 mErrorFunc(execution->resultBase->errorCode(),
695 execution->resultBase->errorMessage());
696 return true;
697 }
698 }
699
700 return false;
701}
702
703
704template<typename Out, typename ... In>
705ThenExecutor<Out, In ...>::ThenExecutor(ThenTask<Out, In ...> then, ErrorHandler error, const ExecutorBasePtr &parent)
706 : Executor<typename detail::prevOut<In ...>::type, Out, In ...>(error, parent)
707 , mFunc(then)
708{
709 STORE_EXECUTOR_NAME("ThenExecutor", Out, In ...);
710}
711
712template<typename Out, typename ... In>
713void ThenExecutor<Out, In ...>::run(const ExecutionPtr &execution)
714{
715 Async::Future<typename detail::prevOut<In ...>::type> *prevFuture = nullptr;
716 if (execution->prevExecution) {
717 prevFuture = execution->prevExecution->result<typename detail::prevOut<In ...>::type>();
718 assert(prevFuture->isFinished());
719 }
720
721 ThenExecutor<Out, In ...>::mFunc(prevFuture ? prevFuture->value() : In() ..., *execution->result<Out>());
722}
723
724template<typename PrevOut, typename Out, typename In>
725EachExecutor<PrevOut, Out, In>::EachExecutor(EachTask<Out, In> each, ErrorHandler error, const ExecutorBasePtr &parent)
726 : Executor<PrevOut, Out, In>(error, parent)
727 , mFunc(each)
728{
729 STORE_EXECUTOR_NAME("EachExecutor", PrevOut, Out, In);
730}
731
732template<typename PrevOut, typename Out, typename In>
733void EachExecutor<PrevOut, Out, In>::run(const ExecutionPtr &execution)
734{
735 assert(execution->prevExecution);
736 auto prevFuture = execution->prevExecution->result<PrevOut>();
737 assert(prevFuture->isFinished());
738
739 auto out = execution->result<Out>();
740 if (prevFuture->value().isEmpty()) {
741 out->setFinished();
742 return;
743 }
744
745 for (auto arg : prevFuture->value()) {
746 //We have to manually manage the lifetime of these temporary futures
747 Async::Future<Out> *future = new Async::Future<Out>();
748 EachExecutor<PrevOut, Out, In>::mFunc(arg, *future);
749 auto fw = new Async::FutureWatcher<Out>();
750 mFutureWatchers.append(fw);
751 QObject::connect(fw, &Async::FutureWatcher<Out>::futureReady,
752 [out, fw, this, future]() {
753 assert(fw->future().isFinished());
754 const int index = mFutureWatchers.indexOf(fw);
755 assert(index > -1);
756 mFutureWatchers.removeAt(index);
757 Async::detail::aggregateFutureValue<Out>(fw->future(), *out);
758 if (mFutureWatchers.isEmpty()) {
759 out->setFinished();
760 }
761 delete fw;
762 delete future;
763 });
764 fw->setFuture(*future);
765 }
766}
767
768template<typename Out, typename In>
769ReduceExecutor<Out, In>::ReduceExecutor(ReduceTask<Out, In> reduce, ErrorHandler errorFunc, const ExecutorBasePtr &parent)
770 : ThenExecutor<Out, In>(reduce, errorFunc, parent)
771{
772 STORE_EXECUTOR_NAME("ReduceExecutor", Out, In);
773}
774
775template<typename Out, typename ... In>
776SyncThenExecutor<Out, In ...>::SyncThenExecutor(SyncThenTask<Out, In ...> then, ErrorHandler errorFunc, const ExecutorBasePtr &parent)
777 : Executor<typename detail::prevOut<In ...>::type, Out, In ...>(errorFunc, parent)
778 , mFunc(then)
779{
780 STORE_EXECUTOR_NAME("SyncThenExecutor", Out, In ...);
781}
782
783template<typename Out, typename ... In>
784void SyncThenExecutor<Out, In ...>::run(const ExecutionPtr &execution)
785{
786 if (execution->prevExecution) {
787 assert(execution->prevExecution->resultBase->isFinished());
788 }
789
790 run(execution, std::is_void<Out>());
791 execution->resultBase->setFinished();
792}
793
794template<typename Out, typename ... In>
795void SyncThenExecutor<Out, In ...>::run(const ExecutionPtr &execution, std::false_type)
796{
797 Async::Future<typename detail::prevOut<In ...>::type> *prevFuture =
798 execution->prevExecution
799 ? execution->prevExecution->result<typename detail::prevOut<In ...>::type>()
800 : nullptr;
801 (void) prevFuture; // silence 'set but not used' warning
802 Async::Future<Out> *future = execution->result<Out>();
803 future->setValue(SyncThenExecutor<Out, In...>::mFunc(prevFuture ? prevFuture->value() : In() ...));
804}
805
806template<typename Out, typename ... In>
807void SyncThenExecutor<Out, In ...>::run(const ExecutionPtr &execution, std::true_type)
808{
809 Async::Future<typename detail::prevOut<In ...>::type> *prevFuture =
810 execution->prevExecution
811 ? execution->prevExecution->result<typename detail::prevOut<In ...>::type>()
812 : nullptr;
813 (void) prevFuture; // silence 'set but not used' warning
814 SyncThenExecutor<Out, In ...>::mFunc(prevFuture ? prevFuture->value() : In() ...);
815}
816
817template<typename PrevOut, typename Out, typename In>
818SyncEachExecutor<PrevOut, Out, In>::SyncEachExecutor(SyncEachTask<Out, In> each, ErrorHandler errorFunc, const ExecutorBasePtr &parent)
819 : Executor<PrevOut, Out, In>(errorFunc, parent)
820 , mFunc(each)
821{
822 STORE_EXECUTOR_NAME("SyncEachExecutor", PrevOut, Out, In);
823}
824
825template<typename PrevOut, typename Out, typename In>
826void SyncEachExecutor<PrevOut, Out, In>::run(const ExecutionPtr &execution)
827{
828 assert(execution->prevExecution);
829 auto *prevFuture = execution->prevExecution->result<PrevOut>();
830 assert(prevFuture->isFinished());
831
832 auto out = execution->result<Out>();
833 if (prevFuture->value().isEmpty()) {
834 out->setFinished();
835 return;
836 }
837
838 for (auto arg : prevFuture->value()) {
839 run(out, arg, std::is_void<Out>());
840 }
841 out->setFinished();
842}
843
844template<typename PrevOut, typename Out, typename In>
845void SyncEachExecutor<PrevOut, Out, In>::run(Async::Future<Out> *out, const typename PrevOut::value_type &arg, std::false_type)
846{
847 out->setValue(out->value() + SyncEachExecutor<PrevOut, Out, In>::mFunc(arg));
848}
849
850template<typename PrevOut, typename Out, typename In>
851void SyncEachExecutor<PrevOut, Out, In>::run(Async::Future<Out> * /* unused */, const typename PrevOut::value_type &arg, std::true_type)
852{
853 SyncEachExecutor<PrevOut, Out, In>::mFunc(arg);
854}
855
856template<typename Out, typename In>
857SyncReduceExecutor<Out, In>::SyncReduceExecutor(SyncReduceTask<Out, In> reduce, ErrorHandler errorFunc, const ExecutorBasePtr &parent)
858 : SyncThenExecutor<Out, In>(reduce, errorFunc, parent)
859{
860 STORE_EXECUTOR_NAME("SyncReduceExecutor", Out, In);
861}
862
863
864} // namespace Private
865
866} // namespace Async
867
868
869
870#endif // ASYNC_H
871
872