summaryrefslogtreecommitdiffstats
path: root/async
diff options
context:
space:
mode:
Diffstat (limited to 'async')
-rw-r--r--async/autotests/asynctest.cpp166
-rw-r--r--async/src/async.cpp46
-rw-r--r--async/src/async.h157
3 files changed, 280 insertions, 89 deletions
diff --git a/async/autotests/asynctest.cpp b/async/autotests/asynctest.cpp
index 65e604f..d709567 100644
--- a/async/autotests/asynctest.cpp
+++ b/async/autotests/asynctest.cpp
@@ -27,6 +27,8 @@
27#include <QtTest/QTest> 27#include <QtTest/QTest>
28#include <QDebug> 28#include <QDebug>
29 29
30#include <functional>
31
30class AsyncTest : public QObject 32class AsyncTest : public QObject
31{ 33{
32 Q_OBJECT 34 Q_OBJECT
@@ -62,6 +64,9 @@ private Q_SLOTS:
62 64
63 void testProgressReporting(); 65 void testProgressReporting();
64 void testErrorHandler(); 66 void testErrorHandler();
67 void testErrorPropagation();
68 void testErrorHandlerAsync();
69 void testErrorPropagationAsync();
65 70
66 void testChainingRunningJob(); 71 void testChainingRunningJob();
67 void testChainingFinishedJob(); 72 void testChainingFinishedJob();
@@ -92,8 +97,25 @@ private:
92 mTimer.start(200); 97 mTimer.start(200);
93 } 98 }
94 99
100 AsyncSimulator(Async::Future<T> &future, std::function<void(Async::Future<T>&)> callback)
101 : mFuture(future)
102 , mCallback(callback)
103 {
104 QObject::connect(&mTimer, &QTimer::timeout,
105 [this]() {
106 mCallback(mFuture);
107 });
108 QObject::connect(&mTimer, &QTimer::timeout,
109 [this]() {
110 delete this;
111 });
112 mTimer.setSingleShot(true);
113 mTimer.start(200);
114 }
115
95 private: 116 private:
96 Async::Future<T> mFuture; 117 Async::Future<T> mFuture;
118 std::function<void(Async::Future<T>&)> mCallback;
97 T mResult; 119 T mResult;
98 QTimer mTimer; 120 QTimer mTimer;
99 }; 121 };
@@ -390,18 +412,14 @@ void AsyncTest::testAsyncReduce()
390 }) 412 })
391 .reduce<int, QList<int>>( 413 .reduce<int, QList<int>>(
392 [](const QList<int> &list, Async::Future<int> &future) { 414 [](const QList<int> &list, Async::Future<int> &future) {
393 QTimer *timer = new QTimer(); 415 new AsyncSimulator<int>(future,
394 QObject::connect(timer, &QTimer::timeout, 416 [list](Async::Future<int> &future) {
395 [list, &future]() { 417 int sum = 0;
396 int sum = 0; 418 for (int i : list) sum += i;
397 for (int i : list) sum += i; 419 future.setValue(sum);
398 future.setValue(sum); 420 future.setFinished();
399 future.setFinished(); 421 }
400 }); 422 );
401 QObject::connect(timer, &QTimer::timeout,
402 timer, &QObject::deleteLater);
403 timer->setSingleShot(true);
404 timer->start(0);
405 }); 423 });
406 424
407 Async::Future<int> future = job.exec(); 425 Async::Future<int> future = job.exec();
@@ -509,27 +527,145 @@ void AsyncTest::testProgressReporting()
509 527
510void AsyncTest::testErrorHandler() 528void AsyncTest::testErrorHandler()
511{ 529{
530
531 {
532 auto job = Async::start<int>(
533 [](Async::Future<int> &f) {
534 f.setError(1, "error");
535 });
536
537 auto future = job.exec();
538 QVERIFY(future.isFinished());
539 QCOMPARE(future.errorCode(), 1);
540 QCOMPARE(future.errorMessage(), QString::fromLatin1("error"));
541 }
542
543 {
544 int error = 0;
545 auto job = Async::start<int>(
546 [](Async::Future<int> &f) {
547 f.setError(1, "error");
548 },
549 [&error](int errorCode, const QString &errorMessage) {
550 error += errorCode;
551 }
552 );
553
554 auto future = job.exec();
555 QVERIFY(future.isFinished());
556 QCOMPARE(error, 1);
557 QCOMPARE(future.errorCode(), 1);
558 QCOMPARE(future.errorMessage(), QString::fromLatin1("error"));
559 }
560}
561
562void AsyncTest::testErrorPropagation()
563{
512 int error = 0; 564 int error = 0;
565 bool called = false;
513 auto job = Async::start<int>( 566 auto job = Async::start<int>(
514 [](Async::Future<int> &f) { 567 [](Async::Future<int> &f) {
515 f.setError(1, "error"); 568 f.setError(1, "error");
516 }) 569 })
517 .then<int, int>( 570 .then<int, int>(
518 [](int v, Async::Future<int> &f) { 571 [&called](int v, Async::Future<int> &f) {
572 called = true;
519 f.setFinished(); 573 f.setFinished();
520 }, 574 },
521 [&error](int errorCode, const QString &errorMessage) { 575 [&error](int errorCode, const QString &errorMessage) {
522 error = errorCode; 576 error += errorCode;
523 } 577 }
524 ); 578 );
525 auto future = job.exec(); 579 auto future = job.exec();
526 future.waitForFinished(); 580 QVERIFY(future.isFinished());
581 QCOMPARE(future.errorCode(), 1);
582 QCOMPARE(future.errorMessage(), QString::fromLatin1("error"));
583 QCOMPARE(called, false);
527 QCOMPARE(error, 1); 584 QCOMPARE(error, 1);
585}
586
587void AsyncTest::testErrorHandlerAsync()
588{
589 {
590 auto job = Async::start<int>(
591 [](Async::Future<int> &f) {
592 new AsyncSimulator<int>(f,
593 [](Async::Future<int> &f) {
594 f.setError(1, "error");
595 }
596 );
597 }
598 );
599
600 auto future = job.exec();
601 future.waitForFinished();
602
603 QVERIFY(future.isFinished());
604 QCOMPARE(future.errorCode(), 1);
605 QCOMPARE(future.errorMessage(), QString::fromLatin1("error"));
606 }
607
608 {
609 int error = 0;
610 auto job = Async::start<int>(
611 [](Async::Future<int> &f) {
612 new AsyncSimulator<int>(f,
613 [](Async::Future<int> &f) {
614 f.setError(1, "error");
615 }
616 );
617 },
618 [&error](int errorCode, const QString &errorMessage) {
619 error += errorCode;
620 }
621 );
622
623 auto future = job.exec();
624 future.waitForFinished();
625
626 QVERIFY(future.isFinished());
627 QCOMPARE(error, 1);
628 QCOMPARE(future.errorCode(), 1);
629 QCOMPARE(future.errorMessage(), QString::fromLatin1("error"));
630 }
631}
632
633void AsyncTest::testErrorPropagationAsync()
634{
635 int error = 0;
636 bool called = false;
637 auto job = Async::start<int>(
638 [](Async::Future<int> &f) {
639 new AsyncSimulator<int>(f,
640 [](Async::Future<int> &f) {
641 f.setError(1, "error");
642 }
643 );
644 })
645 .then<int, int>(
646 [&called](int v, Async::Future<int> &f) {
647 called = true;
648 f.setFinished();
649 },
650 [&error](int errorCode, const QString &errorMessage) {
651 error += errorCode;
652 }
653 );
654
655 auto future = job.exec();
656 future.waitForFinished();
657
528 QVERIFY(future.isFinished()); 658 QVERIFY(future.isFinished());
659 QCOMPARE(future.errorCode(), 1);
660 QCOMPARE(future.errorMessage(), QString::fromLatin1("error"));
661 QCOMPARE(called, false);
662 QCOMPARE(error, 1);
529} 663}
530 664
531 665
532 666
667
668
533void AsyncTest::testChainingRunningJob() 669void AsyncTest::testChainingRunningJob()
534{ 670{
535 int check = 0; 671 int check = 0;
diff --git a/async/src/async.cpp b/async/src/async.cpp
index 5e26bd8..c780878 100644
--- a/async/src/async.cpp
+++ b/async/src/async.cpp
@@ -24,6 +24,50 @@
24 24
25using namespace Async; 25using namespace Async;
26 26
27Private::Execution::Execution(const Private::ExecutorBasePtr &executor)
28 : executor(executor)
29 , resultBase(nullptr)
30 , isRunning(false)
31 , isFinished(false)
32{
33}
34
35Private::Execution::~Execution()
36{
37 if (resultBase) {
38 resultBase->releaseExecution();
39 delete resultBase;
40 }
41 prevExecution.reset();
42}
43
44void Private::Execution::setFinished()
45{
46 isFinished = true;
47 //executor.clear();
48}
49
50void Private::Execution::releaseFuture()
51{
52 resultBase = 0;
53}
54
55bool Private::Execution::errorWasHandled() const
56{
57 Execution * const exec = this;
58 while (exec) {
59 if (exec->executor->hasErrorFunc()) {
60 return true;
61 }
62 exec = exec->prevExecution.data();
63 }
64 return false;
65}
66
67
68
69
70
27Private::ExecutorBase::ExecutorBase(const ExecutorBasePtr &parent) 71Private::ExecutorBase::ExecutorBase(const ExecutorBasePtr &parent)
28 : mPrev(parent) 72 : mPrev(parent)
29{ 73{
@@ -34,6 +78,8 @@ Private::ExecutorBase::~ExecutorBase()
34} 78}
35 79
36 80
81
82
37JobBase::JobBase(const Private::ExecutorBasePtr &executor) 83JobBase::JobBase(const Private::ExecutorBasePtr &executor)
38 : mExecutor(executor) 84 : mExecutor(executor)
39{ 85{
diff --git a/async/src/async.h b/async/src/async.h
index 89ca0d0..adc0b69 100644
--- a/async/src/async.h
+++ b/async/src/async.h
@@ -96,27 +96,9 @@ class ExecutorBase;
96typedef QSharedPointer<ExecutorBase> ExecutorBasePtr; 96typedef QSharedPointer<ExecutorBase> ExecutorBasePtr;
97 97
98struct Execution { 98struct Execution {
99 Execution(const ExecutorBasePtr &executor) 99 Execution(const ExecutorBasePtr &executor);
100 : executor(executor) 100 ~Execution();
101 , resultBase(nullptr) 101 void setFinished();
102 , isRunning(false)
103 , isFinished(false)
104 {}
105
106 ~Execution()
107 {
108 if (resultBase) {
109 resultBase->releaseExecution();
110 delete resultBase;
111 }
112 prevExecution.reset();
113 }
114
115 void setFinished()
116 {
117 isFinished = true;
118 executor.clear();
119 }
120 102
121 template<typename T> 103 template<typename T>
122 Async::Future<T>* result() 104 Async::Future<T>* result()
@@ -124,10 +106,8 @@ struct Execution {
124 return static_cast<Async::Future<T>*>(resultBase); 106 return static_cast<Async::Future<T>*>(resultBase);
125 } 107 }
126 108
127 void releaseFuture() 109 void releaseFuture();
128 { 110 bool errorWasHandled() const;
129 resultBase = 0;
130 }
131 111
132 ExecutorBasePtr executor; 112 ExecutorBasePtr executor;
133 FutureBase *resultBase; 113 FutureBase *resultBase;
@@ -147,6 +127,8 @@ class ExecutorBase
147 template<typename Out, typename ... In> 127 template<typename Out, typename ... In>
148 friend class Async::Job; 128 friend class Async::Job;
149 129
130 friend class Execution;
131
150public: 132public:
151 virtual ~ExecutorBase(); 133 virtual ~ExecutorBase();
152 virtual ExecutionPtr exec(const ExecutorBasePtr &self) = 0; 134 virtual ExecutionPtr exec(const ExecutorBasePtr &self) = 0;
@@ -157,6 +139,9 @@ protected:
157 template<typename T> 139 template<typename T>
158 Async::Future<T>* createFuture(const ExecutionPtr &execution) const; 140 Async::Future<T>* createFuture(const ExecutionPtr &execution) const;
159 141
142 virtual bool hasErrorFunc() const = 0;
143 virtual bool handleError(const ExecutionPtr &execution) = 0;
144
160 ExecutorBasePtr mPrev; 145 ExecutorBasePtr mPrev;
161}; 146};
162 147
@@ -164,14 +149,17 @@ template<typename PrevOut, typename Out, typename ... In>
164class Executor : public ExecutorBase 149class Executor : public ExecutorBase
165{ 150{
166protected: 151protected:
167 Executor(ErrorHandler errorHandler, const Private::ExecutorBasePtr &parent) 152 Executor(ErrorHandler errorFunc, const Private::ExecutorBasePtr &parent)
168 : ExecutorBase(parent) 153 : ExecutorBase(parent)
169 , mErrorFunc(errorHandler) 154 , mErrorFunc(errorFunc)
170 {} 155 {}
156
171 virtual ~Executor() {} 157 virtual ~Executor() {}
172 virtual void run(const ExecutionPtr &execution) = 0; 158 virtual void run(const ExecutionPtr &execution) = 0;
173 159
174 ExecutionPtr exec(const ExecutorBasePtr &self); 160 ExecutionPtr exec(const ExecutorBasePtr &self);
161 bool hasErrorFunc() const { return (bool) mErrorFunc; }
162 bool handleError(const ExecutionPtr &execution);
175 163
176 std::function<void(int, const QString &)> mErrorFunc; 164 std::function<void(int, const QString &)> mErrorFunc;
177}; 165};
@@ -180,7 +168,7 @@ template<typename Out, typename ... In>
180class ThenExecutor: public Executor<typename detail::prevOut<In ...>::type, Out, In ...> 168class ThenExecutor: public Executor<typename detail::prevOut<In ...>::type, Out, In ...>
181{ 169{
182public: 170public:
183 ThenExecutor(ThenTask<Out, In ...> then, ErrorHandler errorHandler, const ExecutorBasePtr &parent); 171 ThenExecutor(ThenTask<Out, In ...> then, ErrorHandler errorFunc, const ExecutorBasePtr &parent);
184 void run(const ExecutionPtr &execution); 172 void run(const ExecutionPtr &execution);
185private: 173private:
186 ThenTask<Out, In ...> mFunc; 174 ThenTask<Out, In ...> mFunc;
@@ -190,7 +178,7 @@ template<typename PrevOut, typename Out, typename In>
190class EachExecutor : public Executor<PrevOut, Out, In> 178class EachExecutor : public Executor<PrevOut, Out, In>
191{ 179{
192public: 180public:
193 EachExecutor(EachTask<Out, In> each, ErrorHandler errorHandler, const ExecutorBasePtr &parent); 181 EachExecutor(EachTask<Out, In> each, ErrorHandler errorFunc, const ExecutorBasePtr &parent);
194 void run(const ExecutionPtr &execution); 182 void run(const ExecutionPtr &execution);
195private: 183private:
196 EachTask<Out, In> mFunc; 184 EachTask<Out, In> mFunc;
@@ -201,7 +189,7 @@ template<typename Out, typename In>
201class ReduceExecutor : public ThenExecutor<Out, In> 189class ReduceExecutor : public ThenExecutor<Out, In>
202{ 190{
203public: 191public:
204 ReduceExecutor(ReduceTask<Out, In> reduce, ErrorHandler errorHandler, const ExecutorBasePtr &parent); 192 ReduceExecutor(ReduceTask<Out, In> reduce, ErrorHandler errorFunc, const ExecutorBasePtr &parent);
205private: 193private:
206 ReduceTask<Out, In> mFunc; 194 ReduceTask<Out, In> mFunc;
207}; 195};
@@ -210,7 +198,7 @@ template<typename Out, typename ... In>
210class SyncThenExecutor : public Executor<typename detail::prevOut<In ...>::type, Out, In ...> 198class SyncThenExecutor : public Executor<typename detail::prevOut<In ...>::type, Out, In ...>
211{ 199{
212public: 200public:
213 SyncThenExecutor(SyncThenTask<Out, In ...> then, ErrorHandler errorHandler, const ExecutorBasePtr &parent); 201 SyncThenExecutor(SyncThenTask<Out, In ...> then, ErrorHandler errorFunc, const ExecutorBasePtr &parent);
214 void run(const ExecutionPtr &execution); 202 void run(const ExecutionPtr &execution);
215 203
216private: 204private:
@@ -223,7 +211,7 @@ template<typename Out, typename In>
223class SyncReduceExecutor : public SyncThenExecutor<Out, In> 211class SyncReduceExecutor : public SyncThenExecutor<Out, In>
224{ 212{
225public: 213public:
226 SyncReduceExecutor(SyncReduceTask<Out, In> reduce, ErrorHandler errorHandler, const ExecutorBasePtr &parent); 214 SyncReduceExecutor(SyncReduceTask<Out, In> reduce, ErrorHandler errorFunc, const ExecutorBasePtr &parent);
227private: 215private:
228 SyncReduceTask<Out, In> mFunc; 216 SyncReduceTask<Out, In> mFunc;
229}; 217};
@@ -232,7 +220,7 @@ template<typename PrevOut, typename Out, typename In>
232class SyncEachExecutor : public Executor<PrevOut, Out, In> 220class SyncEachExecutor : public Executor<PrevOut, Out, In>
233{ 221{
234public: 222public:
235 SyncEachExecutor(SyncEachTask<Out, In> each, ErrorHandler errorHandler, const ExecutorBasePtr &parent); 223 SyncEachExecutor(SyncEachTask<Out, In> each, ErrorHandler errorFunc, const ExecutorBasePtr &parent);
236 void run(const ExecutionPtr &execution); 224 void run(const ExecutionPtr &execution);
237private: 225private:
238 void run(Async::Future<Out> *future, const typename PrevOut::value_type &arg, std::false_type); // !std::is_void<Out> 226 void run(Async::Future<Out> *future, const typename PrevOut::value_type &arg, std::false_type); // !std::is_void<Out>
@@ -253,10 +241,10 @@ private:
253 * where @p In is type of the result. 241 * where @p In is type of the result.
254 */ 242 */
255template<typename Out, typename ... In> 243template<typename Out, typename ... In>
256Job<Out, In ...> start(ThenTask<Out, In ...> func); 244Job<Out, In ...> start(ThenTask<Out, In ...> func, ErrorHandler errorFunc = ErrorHandler());
257 245
258template<typename Out, typename ... In> 246template<typename Out, typename ... In>
259Job<Out, In ...> start(SyncThenTask<Out, In ...> func); 247Job<Out, In ...> start(SyncThenTask<Out, In ...> func, ErrorHandler errorFunc = ErrorHandler());
260 248
261#ifdef WITH_KJOB 249#ifdef WITH_KJOB
262template<typename ReturnType, typename KJobType, ReturnType (KJobType::*KJobResultMethod)(), typename ... Args> 250template<typename ReturnType, typename KJobType, ReturnType (KJobType::*KJobResultMethod)(), typename ... Args>
@@ -358,10 +346,10 @@ class Job : public JobBase
358 friend class Job; 346 friend class Job;
359 347
360 template<typename OutOther, typename ... InOther> 348 template<typename OutOther, typename ... InOther>
361 friend Job<OutOther, InOther ...> start(Async::ThenTask<OutOther, InOther ...> func); 349 friend Job<OutOther, InOther ...> start(Async::ThenTask<OutOther, InOther ...> func, ErrorHandler errorFunc);
362 350
363 template<typename OutOther, typename ... InOther> 351 template<typename OutOther, typename ... InOther>
364 friend Job<OutOther, InOther ...> start(Async::SyncThenTask<OutOther, InOther ...> func); 352 friend Job<OutOther, InOther ...> start(Async::SyncThenTask<OutOther, InOther ...> func, ErrorHandler errorFunc);
365 353
366#ifdef WITH_KJOB 354#ifdef WITH_KJOB
367 template<typename ReturnType, typename KJobType, ReturnType (KJobType::*KJobResultMethod)(), typename ... Args> 355 template<typename ReturnType, typename KJobType, ReturnType (KJobType::*KJobResultMethod)(), typename ... Args>
@@ -524,17 +512,17 @@ private:
524namespace Async { 512namespace Async {
525 513
526template<typename Out, typename ... In> 514template<typename Out, typename ... In>
527Job<Out, In ...> start(ThenTask<Out, In ...> func) 515Job<Out, In ...> start(ThenTask<Out, In ...> func, ErrorHandler error)
528{ 516{
529 return Job<Out, In...>(Private::ExecutorBasePtr( 517 return Job<Out, In...>(Private::ExecutorBasePtr(
530 new Private::ThenExecutor<Out, In ...>(func, ErrorHandler(), Private::ExecutorBasePtr()))); 518 new Private::ThenExecutor<Out, In ...>(func, error, Private::ExecutorBasePtr())));
531} 519}
532 520
533template<typename Out, typename ... In> 521template<typename Out, typename ... In>
534Job<Out, In ...> start(SyncThenTask<Out, In ...> func) 522Job<Out, In ...> start(SyncThenTask<Out, In ...> func, ErrorHandler error)
535{ 523{
536 return Job<Out, In...>(Private::ExecutorBasePtr( 524 return Job<Out, In...>(Private::ExecutorBasePtr(
537 new Private::SyncThenExecutor<Out, In ...>(func, ErrorHandler(), Private::ExecutorBasePtr()))); 525 new Private::SyncThenExecutor<Out, In ...>(func, error, Private::ExecutorBasePtr())));
538} 526}
539 527
540#ifdef WITH_KJOB 528#ifdef WITH_KJOB
@@ -596,17 +584,12 @@ ExecutionPtr Executor<PrevOut, Out, In ...>::exec(const ExecutorBasePtr &self)
596 584
597 // chainup 585 // chainup
598 execution->prevExecution = mPrev ? mPrev->exec(mPrev) : ExecutionPtr(); 586 execution->prevExecution = mPrev ? mPrev->exec(mPrev) : ExecutionPtr();
599 /*
600 } else if (mPrev && !mPrevFuture) {
601 // If previous job is running or finished, just get it's future
602 mPrevFuture = static_cast<Async::Future<PrevOut>*>(mPrev->result());
603 }
604 */
605 587
606 execution->resultBase = this->createFuture<Out>(execution); 588 execution->resultBase = this->createFuture<Out>(execution);
607 auto fw = new Async::FutureWatcher<Out>(); 589 auto fw = new Async::FutureWatcher<Out>();
608 QObject::connect(fw, &Async::FutureWatcher<Out>::futureReady, 590 QObject::connect(fw, &Async::FutureWatcher<Out>::futureReady,
609 [fw, execution, this]() { 591 [fw, execution, this]() {
592 handleError(execution);
610 execution->setFinished(); 593 execution->setFinished();
611 delete fw; 594 delete fw;
612 }); 595 });
@@ -614,44 +597,70 @@ ExecutionPtr Executor<PrevOut, Out, In ...>::exec(const ExecutorBasePtr &self)
614 597
615 Async::Future<PrevOut> *prevFuture = execution->prevExecution ? execution->prevExecution->result<PrevOut>() : nullptr; 598 Async::Future<PrevOut> *prevFuture = execution->prevExecution ? execution->prevExecution->result<PrevOut>() : nullptr;
616 if (!prevFuture || prevFuture->isFinished()) { 599 if (!prevFuture || prevFuture->isFinished()) {
617 if (prevFuture && prevFuture->errorCode() != 0) { 600 if (prevFuture) { // prevFuture implies execution->prevExecution
618 if (mErrorFunc) { 601 if (prevFuture->errorCode()) {
619 mErrorFunc(prevFuture->errorCode(), prevFuture->errorMessage()); 602 // Propagate the errorCode and message to the outer Future
620 execution->resultBase->setFinished(); 603 execution->resultBase->setError(prevFuture->errorCode(), prevFuture->errorMessage());
621 execution->setFinished(); 604 if (!execution->errorWasHandled()) {
622 return execution; 605 if (handleError(execution)) {
606 return execution;
607 }
608 } else {
609 return execution;
610 }
623 } else { 611 } else {
624 // Propagate the error to next caller 612 // Propagate error (if any)
625 } 613 }
626 } 614 }
615
627 execution->isRunning = true; 616 execution->isRunning = true;
628 run(execution); 617 run(execution);
629 } else { 618 } else {
630 auto futureWatcher = new Async::FutureWatcher<PrevOut>(); 619 auto prevFutureWatcher = new Async::FutureWatcher<PrevOut>();
631 QObject::connect(futureWatcher, &Async::FutureWatcher<PrevOut>::futureReady, 620 QObject::connect(prevFutureWatcher, &Async::FutureWatcher<PrevOut>::futureReady,
632 [futureWatcher, execution, this]() { 621 [prevFutureWatcher, execution, this]() {
633 auto prevFuture = futureWatcher->future(); 622 auto prevFuture = prevFutureWatcher->future();
634 assert(prevFuture.isFinished()); 623 assert(prevFuture.isFinished());
635 delete futureWatcher; 624 delete prevFutureWatcher;
636 if (prevFuture.errorCode() != 0) { 625 auto prevExecutor = execution->executor->mPrev;
637 if (mErrorFunc) { 626 if (prevFuture.errorCode()) {
638 mErrorFunc(prevFuture.errorCode(), prevFuture.errorMessage()); 627 execution->resultBase->setError(prevFuture.errorCode(), prevFuture.errorMessage());
639 execution->resultBase->setFinished(); 628 if (!execution->errorWasHandled()) {
640 return; 629 if (handleError(execution)) {
630 return;
631 }
641 } else { 632 } else {
642 // Propagate the error to next caller 633 return;
643 } 634 }
644 } 635 }
636
637
638 // propagate error (if any)
645 execution->isRunning = true; 639 execution->isRunning = true;
646 run(execution); 640 run(execution);
647 }); 641 });
648 642
649 futureWatcher->setFuture(*static_cast<Async::Future<PrevOut>*>(prevFuture)); 643 prevFutureWatcher->setFuture(*static_cast<Async::Future<PrevOut>*>(prevFuture));
650 } 644 }
651 645
652 return execution; 646 return execution;
653} 647}
654 648
649template<typename PrevOut, typename Out, typename ... In>
650bool Executor<PrevOut, Out, In ...>::handleError(const ExecutionPtr &execution)
651{
652 assert(execution->resultBase->isFinished());
653 if (execution->resultBase->errorCode()) {
654 if (mErrorFunc) {
655 mErrorFunc(execution->resultBase->errorCode(),
656 execution->resultBase->errorMessage());
657 return true;
658 }
659 }
660
661 return false;
662}
663
655 664
656template<typename Out, typename ... In> 665template<typename Out, typename ... In>
657ThenExecutor<Out, In ...>::ThenExecutor(ThenTask<Out, In ...> then, ErrorHandler error, const ExecutorBasePtr &parent) 666ThenExecutor<Out, In ...>::ThenExecutor(ThenTask<Out, In ...> then, ErrorHandler error, const ExecutorBasePtr &parent)
@@ -715,14 +724,14 @@ void EachExecutor<PrevOut, Out, In>::run(const ExecutionPtr &execution)
715} 724}
716 725
717template<typename Out, typename In> 726template<typename Out, typename In>
718ReduceExecutor<Out, In>::ReduceExecutor(ReduceTask<Out, In> reduce, ErrorHandler error, const ExecutorBasePtr &parent) 727ReduceExecutor<Out, In>::ReduceExecutor(ReduceTask<Out, In> reduce, ErrorHandler errorFunc, const ExecutorBasePtr &parent)
719 : ThenExecutor<Out, In>(reduce, error, parent) 728 : ThenExecutor<Out, In>(reduce, errorFunc, parent)
720{ 729{
721} 730}
722 731
723template<typename Out, typename ... In> 732template<typename Out, typename ... In>
724SyncThenExecutor<Out, In ...>::SyncThenExecutor(SyncThenTask<Out, In ...> then, ErrorHandler errorHandler, const ExecutorBasePtr &parent) 733SyncThenExecutor<Out, In ...>::SyncThenExecutor(SyncThenTask<Out, In ...> then, ErrorHandler errorFunc, const ExecutorBasePtr &parent)
725 : Executor<typename detail::prevOut<In ...>::type, Out, In ...>(errorHandler, parent) 734 : Executor<typename detail::prevOut<In ...>::type, Out, In ...>(errorFunc, parent)
726 , mFunc(then) 735 , mFunc(then)
727{ 736{
728} 737}
@@ -762,8 +771,8 @@ void SyncThenExecutor<Out, In ...>::run(const ExecutionPtr &execution, std::true
762} 771}
763 772
764template<typename PrevOut, typename Out, typename In> 773template<typename PrevOut, typename Out, typename In>
765SyncEachExecutor<PrevOut, Out, In>::SyncEachExecutor(SyncEachTask<Out, In> each, ErrorHandler errorHandler, const ExecutorBasePtr &parent) 774SyncEachExecutor<PrevOut, Out, In>::SyncEachExecutor(SyncEachTask<Out, In> each, ErrorHandler errorFunc, const ExecutorBasePtr &parent)
766 : Executor<PrevOut, Out, In>(errorHandler, parent) 775 : Executor<PrevOut, Out, In>(errorFunc, parent)
767 , mFunc(each) 776 , mFunc(each)
768{ 777{
769} 778}
@@ -800,8 +809,8 @@ void SyncEachExecutor<PrevOut, Out, In>::run(Async::Future<Out> * /* unused */,
800} 809}
801 810
802template<typename Out, typename In> 811template<typename Out, typename In>
803SyncReduceExecutor<Out, In>::SyncReduceExecutor(SyncReduceTask<Out, In> reduce, ErrorHandler errorHandler, const ExecutorBasePtr &parent) 812SyncReduceExecutor<Out, In>::SyncReduceExecutor(SyncReduceTask<Out, In> reduce, ErrorHandler errorFunc, const ExecutorBasePtr &parent)
804 : SyncThenExecutor<Out, In>(reduce, errorHandler, parent) 813 : SyncThenExecutor<Out, In>(reduce, errorFunc, parent)
805{ 814{
806} 815}
807 816