summaryrefslogtreecommitdiffstats
path: root/async/src/async.h
diff options
context:
space:
mode:
authorDan Vrátil <dvratil@redhat.com>2015-05-18 15:20:35 +0200
committerDan Vrátil <dvratil@redhat.com>2015-05-18 15:20:35 +0200
commit5e580299e342bd77fc7479bbfd235f4446d7f05b (patch)
tree648aacd4de1f239d72be89ab9f2d4a97867d7920 /async/src/async.h
parentb43c0cf97615957e097daef29ff8febc1ec884c8 (diff)
downloadsink-5e580299e342bd77fc7479bbfd235f4446d7f05b.tar.gz
sink-5e580299e342bd77fc7479bbfd235f4446d7f05b.zip
KAsync has moved to it's own kasync.git repository
Diffstat (limited to 'async/src/async.h')
-rw-r--r--async/src/async.h874
1 files changed, 0 insertions, 874 deletions
diff --git a/async/src/async.h b/async/src/async.h
deleted file mode 100644
index 152f98e..0000000
--- a/async/src/async.h
+++ /dev/null
@@ -1,874 +0,0 @@
1/*
2 * Copyright 2014 - 2015 Daniel Vrátil <dvratil@redhat.com>
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Library General Public License as
6 * published by the Free Software Foundation; either version 2 of
7 * the License, or (at your option) any later version.
8 *
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU Library General Public License for more details.
13 *
14 * You should have received a copy of the GNU Library General Public License
15 * along with this library. If not, see <http://www.gnu.org/licenses/>.
16 */
17
18#ifndef KASYNC_H
19#define KASYNC_H
20
21#include "kasync_export.h"
22
23#include <functional>
24#include <list>
25#include <type_traits>
26#include <cassert>
27#include <iterator>
28
29#include "future.h"
30#include "debug.h"
31#include "async_impl.h"
32
33#include <QVector>
34#include <QObject>
35#include <QSharedPointer>
36
37#include <QDebug>
38
39#ifdef WITH_KJOB
40#include <KJob>
41#endif
42
43
44/*
45 * API to help write async code.
46 *
47 * This API is based around jobs that take lambdas to execute asynchronous tasks. Each async operation can take a continuation,
48 * that can then be used to execute further async operations. That way it is possible to build async chains of operations,
49 * that can be stored and executed later on. Jobs can be composed, similarly to functions.
50 *
51 * Relations between the components:
52 * * Job: API wrapper around Executors chain. Can be destroyed while still running,
53 * because the actual execution happens in the background
54 * * Executor: Describes task to execute. Executors form a linked list matching the
55 * order in which they will be executed. The Executor chain is destroyed when
56 * the parent Job is destroyed. However if the Job is still running it is
57 * guaranteed that the Executor chain will not be destroyed until the execution
58 * is finished.
59 * * Execution: The running execution of the task stored in Executor. Each call to Job::exec()
60 * instantiates new Execution chain, which makes it possible for the Job to be
61 * executed multiple times (even in parallel).
62 * * Future: Representation of the result that is being calculated
63 *
64 *
65 * TODO: Composed progress reporting
66 * TODO: Possibility to abort a job through future (perhaps optional?)
67 * TODO: Support for timeout, specified during exec call, after which the error handler gets called with a defined errorCode.
68 */
69namespace KAsync {
70
71template<typename PrevOut, typename Out, typename ... In>
72class Executor;
73
74class JobBase;
75
76template<typename Out, typename ... In>
77class Job;
78
79template<typename Out, typename ... In>
80using ThenTask = typename detail::identity<std::function<void(In ..., KAsync::Future<Out>&)>>::type;
81template<typename Out, typename ... In>
82using SyncThenTask = typename detail::identity<std::function<Out(In ...)>>::type;
83template<typename Out, typename In>
84using EachTask = typename detail::identity<std::function<void(In, KAsync::Future<Out>&)>>::type;
85template<typename Out, typename In>
86using SyncEachTask = typename detail::identity<std::function<Out(In)>>::type;
87template<typename Out, typename In>
88using ReduceTask = typename detail::identity<std::function<void(In, KAsync::Future<Out>&)>>::type;
89template<typename Out, typename In>
90using SyncReduceTask = typename detail::identity<std::function<Out(In)>>::type;
91
92using ErrorHandler = std::function<void(int, const QString &)>;
93using Condition = std::function<bool()>;
94
95namespace Private
96{
97
98class ExecutorBase;
99typedef QSharedPointer<ExecutorBase> ExecutorBasePtr;
100
101struct KASYNC_EXPORT Execution {
102 Execution(const ExecutorBasePtr &executor);
103 ~Execution();
104 void setFinished();
105
106 template<typename T>
107 KAsync::Future<T>* result() const
108 {
109 return static_cast<KAsync::Future<T>*>(resultBase);
110 }
111
112 void releaseFuture();
113 bool errorWasHandled() const;
114
115 ExecutorBasePtr executor;
116 FutureBase *resultBase;
117 bool isRunning;
118 bool isFinished;
119
120 ExecutionPtr prevExecution;
121
122#ifndef QT_NO_DEBUG
123 Tracer *tracer;
124#endif
125};
126
127
128typedef QSharedPointer<Execution> ExecutionPtr;
129
130class KASYNC_EXPORT ExecutorBase
131{
132 template<typename PrevOut, typename Out, typename ... In>
133 friend class Executor;
134
135 template<typename Out, typename ... In>
136 friend class KAsync::Job;
137
138 friend class Execution;
139 friend class KAsync::Tracer;
140
141public:
142 virtual ~ExecutorBase();
143 virtual ExecutionPtr exec(const ExecutorBasePtr &self) = 0;
144
145protected:
146 ExecutorBase(const ExecutorBasePtr &parent);
147
148 template<typename T>
149 KAsync::Future<T>* createFuture(const ExecutionPtr &execution) const;
150
151 virtual bool hasErrorFunc() const = 0;
152 virtual bool handleError(const ExecutionPtr &execution) = 0;
153
154 ExecutorBasePtr mPrev;
155
156#ifndef QT_NO_DEBUG
157 QString mExecutorName;
158#endif
159};
160
161template<typename PrevOut, typename Out, typename ... In>
162class Executor : public ExecutorBase
163{
164protected:
165 Executor(ErrorHandler errorFunc, const Private::ExecutorBasePtr &parent)
166 : ExecutorBase(parent)
167 , mErrorFunc(errorFunc)
168 {}
169
170 virtual ~Executor() {}
171 virtual void run(const ExecutionPtr &execution) = 0;
172
173 ExecutionPtr exec(const ExecutorBasePtr &self);
174 bool hasErrorFunc() const { return (bool) mErrorFunc; }
175 bool handleError(const ExecutionPtr &execution);
176
177 std::function<void(int, const QString &)> mErrorFunc;
178};
179
180template<typename Out, typename ... In>
181class ThenExecutor: public Executor<typename detail::prevOut<In ...>::type, Out, In ...>
182{
183public:
184 ThenExecutor(ThenTask<Out, In ...> then, ErrorHandler errorFunc, const ExecutorBasePtr &parent);
185 void run(const ExecutionPtr &execution);
186private:
187 ThenTask<Out, In ...> mFunc;
188};
189
190template<typename PrevOut, typename Out, typename In>
191class EachExecutor : public Executor<PrevOut, Out, In>
192{
193public:
194 EachExecutor(EachTask<Out, In> each, ErrorHandler errorFunc, const ExecutorBasePtr &parent);
195 void run(const ExecutionPtr &execution);
196private:
197 EachTask<Out, In> mFunc;
198 QVector<KAsync::FutureWatcher<Out>*> mFutureWatchers;
199};
200
201template<typename Out, typename In>
202class ReduceExecutor : public ThenExecutor<Out, In>
203{
204public:
205 ReduceExecutor(ReduceTask<Out, In> reduce, ErrorHandler errorFunc, const ExecutorBasePtr &parent);
206private:
207 ReduceTask<Out, In> mFunc;
208};
209
210template<typename Out, typename ... In>
211class SyncThenExecutor : public Executor<typename detail::prevOut<In ...>::type, Out, In ...>
212{
213public:
214 SyncThenExecutor(SyncThenTask<Out, In ...> then, ErrorHandler errorFunc, const ExecutorBasePtr &parent);
215 void run(const ExecutionPtr &execution);
216
217private:
218 void run(const ExecutionPtr &execution, std::false_type); // !std::is_void<Out>
219 void run(const ExecutionPtr &execution, std::true_type); // std::is_void<Out>
220 SyncThenTask<Out, In ...> mFunc;
221};
222
223template<typename Out, typename In>
224class SyncReduceExecutor : public SyncThenExecutor<Out, In>
225{
226public:
227 SyncReduceExecutor(SyncReduceTask<Out, In> reduce, ErrorHandler errorFunc, const ExecutorBasePtr &parent);
228private:
229 SyncReduceTask<Out, In> mFunc;
230};
231
232template<typename PrevOut, typename Out, typename In>
233class SyncEachExecutor : public Executor<PrevOut, Out, In>
234{
235public:
236 SyncEachExecutor(SyncEachTask<Out, In> each, ErrorHandler errorFunc, const ExecutorBasePtr &parent);
237 void run(const ExecutionPtr &execution);
238private:
239 void run(KAsync::Future<Out> *future, const typename PrevOut::value_type &arg, std::false_type); // !std::is_void<Out>
240 void run(KAsync::Future<Out> *future, const typename PrevOut::value_type &arg, std::true_type); // std::is_void<Out>
241 SyncEachTask<Out, In> mFunc;
242};
243
244} // namespace Private
245
246/**
247 * Start an asynchronous job sequence.
248 *
249 * KAsync::start() is your starting point to build a chain of jobs to be executed
250 * asynchronously.
251 *
252 * @param func An asynchronous function to be executed. The function must have
253 * void return type, and accept exactly one argument of type @p KAsync::Future<In>,
254 * where @p In is type of the result.
255 */
256template<typename Out, typename ... In>
257Job<Out, In ...> start(ThenTask<Out, In ...> func, ErrorHandler errorFunc = ErrorHandler());
258
259template<typename Out, typename ... In>
260Job<Out, In ...> start(SyncThenTask<Out, In ...> func, ErrorHandler errorFunc = ErrorHandler());
261
262#ifdef WITH_KJOB
263template<typename ReturnType, typename KJobType, ReturnType (KJobType::*KJobResultMethod)(), typename ... Args>
264Job<ReturnType, Args ...> start();
265#endif
266
267/**
268 * Async while loop.
269 *
270 * The loop continues while @param condition returns true.
271 */
272KASYNC_EXPORT Job<void> dowhile(Condition condition, ThenTask<void> func);
273
274/**
275 * Async while loop.
276 *
277 * Loop continues while body returns true.
278 */
279KASYNC_EXPORT Job<void> dowhile(ThenTask<bool> body);
280
281/**
282 * Iterate over a container.
283 *
284 * Use in conjunction with .each
285 */
286template<typename Out>
287Job<Out> iterate(const Out &container);
288
289/**
290 * Async delay.
291 */
292KASYNC_EXPORT Job<void> wait(int delay);
293
294/**
295 * A null job.
296 *
297 * An async noop.
298 *
299 */
300template<typename Out>
301Job<Out> null();
302
303/**
304 * An error job.
305 *
306 * An async error.
307 *
308 */
309template<typename Out>
310Job<Out> error(int errorCode = 1, const QString &errorMessage = QString());
311
312class KASYNC_EXPORT JobBase
313{
314 template<typename Out, typename ... In>
315 friend class Job;
316
317public:
318 JobBase(const Private::ExecutorBasePtr &executor);
319 ~JobBase();
320
321protected:
322 Private::ExecutorBasePtr mExecutor;
323};
324
325/**
326 * An Asynchronous job
327 *
328 * A single instance of Job represents a single method that will be executed
329 * asynchrously. The Job is started by @p Job::exec(), which returns @p KAsync::Future
330 * immediatelly. The Future will be set to finished state once the asynchronous
331 * task has finished. You can use @p KAsync::Future::waitForFinished() to wait for
332 * for the Future in blocking manner.
333 *
334 * It is possible to chain multiple Jobs one after another in different fashion
335 * (sequential, parallel, etc.). Calling Job::exec() will then return a pending
336 * @p KAsync::Future, and will execute the entire chain of jobs.
337 *
338 * @code
339 * auto job = Job::start<QList<int>>(
340 * [](KAsync::Future<QList<int>> &future) {
341 * MyREST::PendingUsers *pu = MyREST::requestListOfUsers();
342 * QObject::connect(pu, &PendingOperation::finished,
343 * [&](PendingOperation *pu) {
344 * future->setValue(dynamic_cast<MyREST::PendingUsers*>(pu)->userIds());
345 * future->setFinished();
346 * });
347 * })
348 * .each<QList<MyREST::User>, int>(
349 * [](const int &userId, KAsync::Future<QList<MyREST::User>> &future) {
350 * MyREST::PendingUser *pu = MyREST::requestUserDetails(userId);
351 * QObject::connect(pu, &PendingOperation::finished,
352 * [&](PendingOperation *pu) {
353 * future->setValue(Qlist<MyREST::User>() << dynamic_cast<MyREST::PendingUser*>(pu)->user());
354 * future->setFinished();
355 * });
356 * });
357 *
358 * KAsync::Future<QList<MyREST::User>> usersFuture = job.exec();
359 * usersFuture.waitForFinished();
360 * QList<MyRest::User> users = usersFuture.value();
361 * @endcode
362 *
363 * In the example above, calling @p job.exec() will first invoke the first job,
364 * which will retrieve a list of IDs, and then will invoke the second function
365 * for each single entry in the list returned by the first function.
366 */
367template<typename Out, typename ... In>
368class Job : public JobBase
369{
370 template<typename OutOther, typename ... InOther>
371 friend class Job;
372
373 template<typename OutOther, typename ... InOther>
374 friend Job<OutOther, InOther ...> start(KAsync::ThenTask<OutOther, InOther ...> func, ErrorHandler errorFunc);
375
376 template<typename OutOther, typename ... InOther>
377 friend Job<OutOther, InOther ...> start(KAsync::SyncThenTask<OutOther, InOther ...> func, ErrorHandler errorFunc);
378
379#ifdef WITH_KJOB
380 template<typename ReturnType, typename KJobType, ReturnType (KJobType::*KJobResultMethod)(), typename ... Args>
381 friend Job<ReturnType, Args ...> start();
382#endif
383
384public:
385 template<typename OutOther, typename ... InOther>
386 Job<OutOther, InOther ...> then(ThenTask<OutOther, InOther ...> func, ErrorHandler errorFunc = ErrorHandler())
387 {
388 return Job<OutOther, InOther ...>(Private::ExecutorBasePtr(
389 new Private::ThenExecutor<OutOther, InOther ...>(func, errorFunc, mExecutor)));
390 }
391
392 template<typename OutOther, typename ... InOther>
393 Job<OutOther, InOther ...> then(SyncThenTask<OutOther, InOther ...> func, ErrorHandler errorFunc = ErrorHandler())
394 {
395 return Job<OutOther, InOther ...>(Private::ExecutorBasePtr(
396 new Private::SyncThenExecutor<OutOther, InOther ...>(func, errorFunc, mExecutor)));
397 }
398
399 template<typename OutOther, typename ... InOther>
400 Job<OutOther, InOther ...> then(Job<OutOther, InOther ...> otherJob, ErrorHandler errorFunc = ErrorHandler())
401 {
402 return then<OutOther, InOther ...>(nestedJobWrapper<OutOther, InOther ...>(otherJob), errorFunc);
403 }
404
405#ifdef WITH_KJOB
406 template<typename ReturnType, typename KJobType, ReturnType (KJobType::*KJobResultMethod)(), typename ... Args>
407 Job<ReturnType, Args ...> then()
408 {
409 return start<ReturnType, KJobType, KJobResultMethod, Args ...>();
410 }
411#endif
412
413 template<typename OutOther, typename InOther>
414 Job<OutOther, InOther> each(EachTask<OutOther, InOther> func, ErrorHandler errorFunc = ErrorHandler())
415 {
416 eachInvariants<OutOther>();
417 return Job<OutOther, InOther>(Private::ExecutorBasePtr(
418 new Private::EachExecutor<Out, OutOther, InOther>(func, errorFunc, mExecutor)));
419 }
420
421 template<typename OutOther, typename InOther>
422 Job<OutOther, InOther> each(SyncEachTask<OutOther, InOther> func, ErrorHandler errorFunc = ErrorHandler())
423 {
424 eachInvariants<OutOther>();
425 return Job<OutOther, InOther>(Private::ExecutorBasePtr(
426 new Private::SyncEachExecutor<Out, OutOther, InOther>(func, errorFunc, mExecutor)));
427 }
428
429 template<typename OutOther, typename InOther>
430 Job<OutOther, InOther> each(Job<OutOther, InOther> otherJob, ErrorHandler errorFunc = ErrorHandler())
431 {
432 eachInvariants<OutOther>();
433 return each<OutOther, InOther>(nestedJobWrapper<OutOther, InOther>(otherJob), errorFunc);
434 }
435
436 template<typename OutOther, typename InOther>
437 Job<OutOther, InOther> reduce(ReduceTask<OutOther, InOther> func, ErrorHandler errorFunc = ErrorHandler())
438 {
439 reduceInvariants<InOther>();
440 return Job<OutOther, InOther>(Private::ExecutorBasePtr(
441 new Private::ReduceExecutor<OutOther, InOther>(func, errorFunc, mExecutor)));
442 }
443
444 template<typename OutOther, typename InOther>
445 Job<OutOther, InOther> reduce(SyncReduceTask<OutOther, InOther> func, ErrorHandler errorFunc = ErrorHandler())
446 {
447 reduceInvariants<InOther>();
448 return Job<OutOther, InOther>(Private::ExecutorBasePtr(
449 new Private::SyncReduceExecutor<OutOther, InOther>(func, errorFunc, mExecutor)));
450 }
451
452 template<typename OutOther, typename InOther>
453 Job<OutOther, InOther> reduce(Job<OutOther, InOther> otherJob, ErrorHandler errorFunc = ErrorHandler())
454 {
455 return reduce<OutOther, InOther>(nestedJobWrapper<OutOther, InOther>(otherJob), errorFunc);
456 }
457
458 template<typename FirstIn>
459 KAsync::Future<Out> exec(FirstIn in)
460 {
461 // Inject a fake sync executor that will return the initial value
462 Private::ExecutorBasePtr first = mExecutor;
463 while (first->mPrev) {
464 first = first->mPrev;
465 }
466 auto init = new Private::SyncThenExecutor<FirstIn>(
467 [in]() -> FirstIn {
468 return in;
469 },
470 ErrorHandler(), Private::ExecutorBasePtr());
471 first->mPrev = Private::ExecutorBasePtr(init);
472
473 auto result = exec();
474 // Remove the injected executor
475 first->mPrev.reset();
476 return result;
477 }
478
479 KAsync::Future<Out> exec()
480 {
481 Private::ExecutionPtr execution = mExecutor->exec(mExecutor);
482 KAsync::Future<Out> result = *execution->result<Out>();
483
484 return result;
485 }
486
487private:
488 Job(Private::ExecutorBasePtr executor)
489 : JobBase(executor)
490 {}
491
492 template<typename OutOther>
493 void eachInvariants()
494 {
495 static_assert(detail::isIterable<Out>::value,
496 "The 'Each' task can only be connected to a job that returns a list or an array.");
497 static_assert(std::is_void<OutOther>::value || detail::isIterable<OutOther>::value,
498 "The result type of 'Each' task must be void, a list or an array.");
499 }
500
501 template<typename InOther>
502 void reduceInvariants()
503 {
504 static_assert(KAsync::detail::isIterable<Out>::value,
505 "The 'Result' task can only be connected to a job that returns a list or an array");
506 static_assert(std::is_same<typename Out::value_type, typename InOther::value_type>::value,
507 "The return type of previous task must be compatible with input type of this task");
508 }
509
510 template<typename OutOther, typename ... InOther>
511 inline std::function<void(InOther ..., KAsync::Future<OutOther>&)> nestedJobWrapper(Job<OutOther, InOther ...> otherJob) {
512 return [otherJob](InOther ... in, KAsync::Future<OutOther> &future) {
513 // copy by value is const
514 auto job = otherJob;
515 FutureWatcher<OutOther> *watcher = new FutureWatcher<OutOther>();
516 QObject::connect(watcher, &FutureWatcherBase::futureReady,
517 [watcher, future]() {
518 // FIXME: We pass future by value, because using reference causes the
519 // future to get deleted before this lambda is invoked, leading to crash
520 // in copyFutureValue()
521 // copy by value is const
522 auto outFuture = future;
523 KAsync::detail::copyFutureValue(watcher->future(), outFuture);
524 if (watcher->future().errorCode()) {
525 outFuture.setError(watcher->future().errorCode(), watcher->future().errorMessage());
526 } else {
527 outFuture.setFinished();
528 }
529 delete watcher;
530 });
531 watcher->setFuture(job.exec(in ...));
532 };
533 }
534};
535
536} // namespace KAsync
537
538
539// ********** Out of line definitions ****************
540
541namespace KAsync {
542
543template<typename Out, typename ... In>
544Job<Out, In ...> start(ThenTask<Out, In ...> func, ErrorHandler error)
545{
546 return Job<Out, In...>(Private::ExecutorBasePtr(
547 new Private::ThenExecutor<Out, In ...>(func, error, Private::ExecutorBasePtr())));
548}
549
550template<typename Out, typename ... In>
551Job<Out, In ...> start(SyncThenTask<Out, In ...> func, ErrorHandler error)
552{
553 return Job<Out, In...>(Private::ExecutorBasePtr(
554 new Private::SyncThenExecutor<Out, In ...>(func, error, Private::ExecutorBasePtr())));
555}
556
557#ifdef WITH_KJOB
558template<typename ReturnType, typename KJobType, ReturnType (KJobType::*KJobResultMethod)(), typename ... Args>
559Job<ReturnType, Args ...> start()
560{
561 return Job<ReturnType, Args ...>(Private::ExecutorBasePtr(
562 new Private::ThenExecutor<ReturnType, Args ...>([](const Args & ... args, KAsync::Future<ReturnType> &future)
563 {
564 KJobType *job = new KJobType(args ...);
565 job->connect(job, &KJob::finished,
566 [&future](KJob *job) {
567 if (job->error()) {
568 future.setError(job->error(), job->errorString());
569 } else {
570 future.setValue((static_cast<KJobType*>(job)->*KJobResultMethod)());
571 future.setFinished();
572 }
573 });
574 job->start();
575 }, ErrorHandler(), Private::ExecutorBasePtr())));
576}
577#endif
578
579
580template<typename Out>
581Job<Out> null()
582{
583 return KAsync::start<Out>(
584 [](KAsync::Future<Out> &future) {
585 future.setFinished();
586 });
587}
588
589template<typename Out>
590Job<Out> error(int errorCode, const QString &errorMessage)
591{
592 return KAsync::start<Out>(
593 [errorCode, errorMessage](KAsync::Future<Out> &future) {
594 future.setError(errorCode, errorMessage);
595 });
596}
597
598template<typename Out>
599Job<Out> iterate(const Out &container)
600{
601 return KAsync::start<Out>(
602 [container]() {
603 return container;
604 });
605}
606
607
608namespace Private {
609
610template<typename T>
611KAsync::Future<T>* ExecutorBase::createFuture(const ExecutionPtr &execution) const
612{
613 return new KAsync::Future<T>(execution);
614}
615
616template<typename PrevOut, typename Out, typename ... In>
617ExecutionPtr Executor<PrevOut, Out, In ...>::exec(const ExecutorBasePtr &self)
618{
619 // Passing 'self' to execution ensures that the Executor chain remains
620 // valid until the entire execution is finished
621 ExecutionPtr execution = ExecutionPtr::create(self);
622#ifndef QT_NO_DEBUG
623 execution->tracer = new Tracer(execution.data()); // owned by execution
624#endif
625
626 // chainup
627 execution->prevExecution = mPrev ? mPrev->exec(mPrev) : ExecutionPtr();
628
629 execution->resultBase = ExecutorBase::createFuture<Out>(execution);
630 auto fw = new KAsync::FutureWatcher<Out>();
631 QObject::connect(fw, &KAsync::FutureWatcher<Out>::futureReady,
632 [fw, execution, this]() {
633 handleError(execution);
634 execution->setFinished();
635 delete fw;
636 });
637 fw->setFuture(*execution->result<Out>());
638
639 KAsync::Future<PrevOut> *prevFuture = execution->prevExecution ? execution->prevExecution->result<PrevOut>() : nullptr;
640 if (!prevFuture || prevFuture->isFinished()) {
641 if (prevFuture) { // prevFuture implies execution->prevExecution
642 if (prevFuture->errorCode()) {
643 // Propagate the errorCode and message to the outer Future
644 execution->resultBase->setError(prevFuture->errorCode(), prevFuture->errorMessage());
645 if (!execution->errorWasHandled()) {
646 if (handleError(execution)) {
647 return execution;
648 }
649 } else {
650 return execution;
651 }
652 } else {
653 // Propagate error (if any)
654 }
655 }
656
657 execution->isRunning = true;
658 run(execution);
659 } else {
660 auto prevFutureWatcher = new KAsync::FutureWatcher<PrevOut>();
661 QObject::connect(prevFutureWatcher, &KAsync::FutureWatcher<PrevOut>::futureReady,
662 [prevFutureWatcher, execution, this]() {
663 auto prevFuture = prevFutureWatcher->future();
664 assert(prevFuture.isFinished());
665 delete prevFutureWatcher;
666 auto prevExecutor = execution->executor->mPrev;
667 if (prevFuture.errorCode()) {
668 execution->resultBase->setError(prevFuture.errorCode(), prevFuture.errorMessage());
669 if (!execution->errorWasHandled()) {
670 if (handleError(execution)) {
671 return;
672 }
673 } else {
674 return;
675 }
676 }
677
678
679 // propagate error (if any)
680 execution->isRunning = true;
681 run(execution);
682 });
683
684 prevFutureWatcher->setFuture(*static_cast<KAsync::Future<PrevOut>*>(prevFuture));
685 }
686
687 return execution;
688}
689
690template<typename PrevOut, typename Out, typename ... In>
691bool Executor<PrevOut, Out, In ...>::handleError(const ExecutionPtr &execution)
692{
693 assert(execution->resultBase->isFinished());
694 if (execution->resultBase->errorCode()) {
695 if (mErrorFunc) {
696 mErrorFunc(execution->resultBase->errorCode(),
697 execution->resultBase->errorMessage());
698 return true;
699 }
700 }
701
702 return false;
703}
704
705
706template<typename Out, typename ... In>
707ThenExecutor<Out, In ...>::ThenExecutor(ThenTask<Out, In ...> then, ErrorHandler error, const ExecutorBasePtr &parent)
708 : Executor<typename detail::prevOut<In ...>::type, Out, In ...>(error, parent)
709 , mFunc(then)
710{
711 STORE_EXECUTOR_NAME("ThenExecutor", Out, In ...);
712}
713
714template<typename Out, typename ... In>
715void ThenExecutor<Out, In ...>::run(const ExecutionPtr &execution)
716{
717 KAsync::Future<typename detail::prevOut<In ...>::type> *prevFuture = nullptr;
718 if (execution->prevExecution) {
719 prevFuture = execution->prevExecution->result<typename detail::prevOut<In ...>::type>();
720 assert(prevFuture->isFinished());
721 }
722
723 ThenExecutor<Out, In ...>::mFunc(prevFuture ? prevFuture->value() : In() ..., *execution->result<Out>());
724}
725
726template<typename PrevOut, typename Out, typename In>
727EachExecutor<PrevOut, Out, In>::EachExecutor(EachTask<Out, In> each, ErrorHandler error, const ExecutorBasePtr &parent)
728 : Executor<PrevOut, Out, In>(error, parent)
729 , mFunc(each)
730{
731 STORE_EXECUTOR_NAME("EachExecutor", PrevOut, Out, In);
732}
733
734template<typename PrevOut, typename Out, typename In>
735void EachExecutor<PrevOut, Out, In>::run(const ExecutionPtr &execution)
736{
737 assert(execution->prevExecution);
738 auto prevFuture = execution->prevExecution->result<PrevOut>();
739 assert(prevFuture->isFinished());
740
741 auto out = execution->result<Out>();
742 if (prevFuture->value().isEmpty()) {
743 out->setFinished();
744 return;
745 }
746
747 for (auto arg : prevFuture->value()) {
748 //We have to manually manage the lifetime of these temporary futures
749 KAsync::Future<Out> *future = new KAsync::Future<Out>();
750 EachExecutor<PrevOut, Out, In>::mFunc(arg, *future);
751 auto fw = new KAsync::FutureWatcher<Out>();
752 mFutureWatchers.append(fw);
753 QObject::connect(fw, &KAsync::FutureWatcher<Out>::futureReady,
754 [out, fw, this, future]() {
755 assert(fw->future().isFinished());
756 const int index = mFutureWatchers.indexOf(fw);
757 assert(index > -1);
758 mFutureWatchers.removeAt(index);
759 KAsync::detail::aggregateFutureValue<Out>(fw->future(), *out);
760 if (mFutureWatchers.isEmpty()) {
761 out->setFinished();
762 }
763 delete fw;
764 delete future;
765 });
766 fw->setFuture(*future);
767 }
768}
769
770template<typename Out, typename In>
771ReduceExecutor<Out, In>::ReduceExecutor(ReduceTask<Out, In> reduce, ErrorHandler errorFunc, const ExecutorBasePtr &parent)
772 : ThenExecutor<Out, In>(reduce, errorFunc, parent)
773{
774 STORE_EXECUTOR_NAME("ReduceExecutor", Out, In);
775}
776
777template<typename Out, typename ... In>
778SyncThenExecutor<Out, In ...>::SyncThenExecutor(SyncThenTask<Out, In ...> then, ErrorHandler errorFunc, const ExecutorBasePtr &parent)
779 : Executor<typename detail::prevOut<In ...>::type, Out, In ...>(errorFunc, parent)
780 , mFunc(then)
781{
782 STORE_EXECUTOR_NAME("SyncThenExecutor", Out, In ...);
783}
784
785template<typename Out, typename ... In>
786void SyncThenExecutor<Out, In ...>::run(const ExecutionPtr &execution)
787{
788 if (execution->prevExecution) {
789 assert(execution->prevExecution->resultBase->isFinished());
790 }
791
792 run(execution, std::is_void<Out>());
793 execution->resultBase->setFinished();
794}
795
796template<typename Out, typename ... In>
797void SyncThenExecutor<Out, In ...>::run(const ExecutionPtr &execution, std::false_type)
798{
799 KAsync::Future<typename detail::prevOut<In ...>::type> *prevFuture =
800 execution->prevExecution
801 ? execution->prevExecution->result<typename detail::prevOut<In ...>::type>()
802 : nullptr;
803 (void) prevFuture; // silence 'set but not used' warning
804 KAsync::Future<Out> *future = execution->result<Out>();
805 future->setValue(SyncThenExecutor<Out, In...>::mFunc(prevFuture ? prevFuture->value() : In() ...));
806}
807
808template<typename Out, typename ... In>
809void SyncThenExecutor<Out, In ...>::run(const ExecutionPtr &execution, std::true_type)
810{
811 KAsync::Future<typename detail::prevOut<In ...>::type> *prevFuture =
812 execution->prevExecution
813 ? execution->prevExecution->result<typename detail::prevOut<In ...>::type>()
814 : nullptr;
815 (void) prevFuture; // silence 'set but not used' warning
816 SyncThenExecutor<Out, In ...>::mFunc(prevFuture ? prevFuture->value() : In() ...);
817}
818
819template<typename PrevOut, typename Out, typename In>
820SyncEachExecutor<PrevOut, Out, In>::SyncEachExecutor(SyncEachTask<Out, In> each, ErrorHandler errorFunc, const ExecutorBasePtr &parent)
821 : Executor<PrevOut, Out, In>(errorFunc, parent)
822 , mFunc(each)
823{
824 STORE_EXECUTOR_NAME("SyncEachExecutor", PrevOut, Out, In);
825}
826
827template<typename PrevOut, typename Out, typename In>
828void SyncEachExecutor<PrevOut, Out, In>::run(const ExecutionPtr &execution)
829{
830 assert(execution->prevExecution);
831 auto *prevFuture = execution->prevExecution->result<PrevOut>();
832 assert(prevFuture->isFinished());
833
834 auto out = execution->result<Out>();
835 if (prevFuture->value().isEmpty()) {
836 out->setFinished();
837 return;
838 }
839
840 for (auto arg : prevFuture->value()) {
841 run(out, arg, std::is_void<Out>());
842 }
843 out->setFinished();
844}
845
846template<typename PrevOut, typename Out, typename In>
847void SyncEachExecutor<PrevOut, Out, In>::run(KAsync::Future<Out> *out, const typename PrevOut::value_type &arg, std::false_type)
848{
849 out->setValue(out->value() + SyncEachExecutor<PrevOut, Out, In>::mFunc(arg));
850}
851
852template<typename PrevOut, typename Out, typename In>
853void SyncEachExecutor<PrevOut, Out, In>::run(KAsync::Future<Out> * /* unused */, const typename PrevOut::value_type &arg, std::true_type)
854{
855 SyncEachExecutor<PrevOut, Out, In>::mFunc(arg);
856}
857
858template<typename Out, typename In>
859SyncReduceExecutor<Out, In>::SyncReduceExecutor(SyncReduceTask<Out, In> reduce, ErrorHandler errorFunc, const ExecutorBasePtr &parent)
860 : SyncThenExecutor<Out, In>(reduce, errorFunc, parent)
861{
862 STORE_EXECUTOR_NAME("SyncReduceExecutor", Out, In);
863}
864
865
866} // namespace Private
867
868} // namespace KAsync
869
870
871
872#endif // KASYNC_H
873
874