summaryrefslogtreecommitdiffstats
path: root/async/src/async.h
diff options
context:
space:
mode:
Diffstat (limited to 'async/src/async.h')
-rw-r--r--async/src/async.h157
1 files changed, 83 insertions, 74 deletions
diff --git a/async/src/async.h b/async/src/async.h
index 89ca0d0..adc0b69 100644
--- a/async/src/async.h
+++ b/async/src/async.h
@@ -96,27 +96,9 @@ class ExecutorBase;
96typedef QSharedPointer<ExecutorBase> ExecutorBasePtr; 96typedef QSharedPointer<ExecutorBase> ExecutorBasePtr;
97 97
98struct Execution { 98struct Execution {
99 Execution(const ExecutorBasePtr &executor) 99 Execution(const ExecutorBasePtr &executor);
100 : executor(executor) 100 ~Execution();
101 , resultBase(nullptr) 101 void setFinished();
102 , isRunning(false)
103 , isFinished(false)
104 {}
105
106 ~Execution()
107 {
108 if (resultBase) {
109 resultBase->releaseExecution();
110 delete resultBase;
111 }
112 prevExecution.reset();
113 }
114
115 void setFinished()
116 {
117 isFinished = true;
118 executor.clear();
119 }
120 102
121 template<typename T> 103 template<typename T>
122 Async::Future<T>* result() 104 Async::Future<T>* result()
@@ -124,10 +106,8 @@ struct Execution {
124 return static_cast<Async::Future<T>*>(resultBase); 106 return static_cast<Async::Future<T>*>(resultBase);
125 } 107 }
126 108
127 void releaseFuture() 109 void releaseFuture();
128 { 110 bool errorWasHandled() const;
129 resultBase = 0;
130 }
131 111
132 ExecutorBasePtr executor; 112 ExecutorBasePtr executor;
133 FutureBase *resultBase; 113 FutureBase *resultBase;
@@ -147,6 +127,8 @@ class ExecutorBase
147 template<typename Out, typename ... In> 127 template<typename Out, typename ... In>
148 friend class Async::Job; 128 friend class Async::Job;
149 129
130 friend class Execution;
131
150public: 132public:
151 virtual ~ExecutorBase(); 133 virtual ~ExecutorBase();
152 virtual ExecutionPtr exec(const ExecutorBasePtr &self) = 0; 134 virtual ExecutionPtr exec(const ExecutorBasePtr &self) = 0;
@@ -157,6 +139,9 @@ protected:
157 template<typename T> 139 template<typename T>
158 Async::Future<T>* createFuture(const ExecutionPtr &execution) const; 140 Async::Future<T>* createFuture(const ExecutionPtr &execution) const;
159 141
142 virtual bool hasErrorFunc() const = 0;
143 virtual bool handleError(const ExecutionPtr &execution) = 0;
144
160 ExecutorBasePtr mPrev; 145 ExecutorBasePtr mPrev;
161}; 146};
162 147
@@ -164,14 +149,17 @@ template<typename PrevOut, typename Out, typename ... In>
164class Executor : public ExecutorBase 149class Executor : public ExecutorBase
165{ 150{
166protected: 151protected:
167 Executor(ErrorHandler errorHandler, const Private::ExecutorBasePtr &parent) 152 Executor(ErrorHandler errorFunc, const Private::ExecutorBasePtr &parent)
168 : ExecutorBase(parent) 153 : ExecutorBase(parent)
169 , mErrorFunc(errorHandler) 154 , mErrorFunc(errorFunc)
170 {} 155 {}
156
171 virtual ~Executor() {} 157 virtual ~Executor() {}
172 virtual void run(const ExecutionPtr &execution) = 0; 158 virtual void run(const ExecutionPtr &execution) = 0;
173 159
174 ExecutionPtr exec(const ExecutorBasePtr &self); 160 ExecutionPtr exec(const ExecutorBasePtr &self);
161 bool hasErrorFunc() const { return (bool) mErrorFunc; }
162 bool handleError(const ExecutionPtr &execution);
175 163
176 std::function<void(int, const QString &)> mErrorFunc; 164 std::function<void(int, const QString &)> mErrorFunc;
177}; 165};
@@ -180,7 +168,7 @@ template<typename Out, typename ... In>
180class ThenExecutor: public Executor<typename detail::prevOut<In ...>::type, Out, In ...> 168class ThenExecutor: public Executor<typename detail::prevOut<In ...>::type, Out, In ...>
181{ 169{
182public: 170public:
183 ThenExecutor(ThenTask<Out, In ...> then, ErrorHandler errorHandler, const ExecutorBasePtr &parent); 171 ThenExecutor(ThenTask<Out, In ...> then, ErrorHandler errorFunc, const ExecutorBasePtr &parent);
184 void run(const ExecutionPtr &execution); 172 void run(const ExecutionPtr &execution);
185private: 173private:
186 ThenTask<Out, In ...> mFunc; 174 ThenTask<Out, In ...> mFunc;
@@ -190,7 +178,7 @@ template<typename PrevOut, typename Out, typename In>
190class EachExecutor : public Executor<PrevOut, Out, In> 178class EachExecutor : public Executor<PrevOut, Out, In>
191{ 179{
192public: 180public:
193 EachExecutor(EachTask<Out, In> each, ErrorHandler errorHandler, const ExecutorBasePtr &parent); 181 EachExecutor(EachTask<Out, In> each, ErrorHandler errorFunc, const ExecutorBasePtr &parent);
194 void run(const ExecutionPtr &execution); 182 void run(const ExecutionPtr &execution);
195private: 183private:
196 EachTask<Out, In> mFunc; 184 EachTask<Out, In> mFunc;
@@ -201,7 +189,7 @@ template<typename Out, typename In>
201class ReduceExecutor : public ThenExecutor<Out, In> 189class ReduceExecutor : public ThenExecutor<Out, In>
202{ 190{
203public: 191public:
204 ReduceExecutor(ReduceTask<Out, In> reduce, ErrorHandler errorHandler, const ExecutorBasePtr &parent); 192 ReduceExecutor(ReduceTask<Out, In> reduce, ErrorHandler errorFunc, const ExecutorBasePtr &parent);
205private: 193private:
206 ReduceTask<Out, In> mFunc; 194 ReduceTask<Out, In> mFunc;
207}; 195};
@@ -210,7 +198,7 @@ template<typename Out, typename ... In>
210class SyncThenExecutor : public Executor<typename detail::prevOut<In ...>::type, Out, In ...> 198class SyncThenExecutor : public Executor<typename detail::prevOut<In ...>::type, Out, In ...>
211{ 199{
212public: 200public:
213 SyncThenExecutor(SyncThenTask<Out, In ...> then, ErrorHandler errorHandler, const ExecutorBasePtr &parent); 201 SyncThenExecutor(SyncThenTask<Out, In ...> then, ErrorHandler errorFunc, const ExecutorBasePtr &parent);
214 void run(const ExecutionPtr &execution); 202 void run(const ExecutionPtr &execution);
215 203
216private: 204private:
@@ -223,7 +211,7 @@ template<typename Out, typename In>
223class SyncReduceExecutor : public SyncThenExecutor<Out, In> 211class SyncReduceExecutor : public SyncThenExecutor<Out, In>
224{ 212{
225public: 213public:
226 SyncReduceExecutor(SyncReduceTask<Out, In> reduce, ErrorHandler errorHandler, const ExecutorBasePtr &parent); 214 SyncReduceExecutor(SyncReduceTask<Out, In> reduce, ErrorHandler errorFunc, const ExecutorBasePtr &parent);
227private: 215private:
228 SyncReduceTask<Out, In> mFunc; 216 SyncReduceTask<Out, In> mFunc;
229}; 217};
@@ -232,7 +220,7 @@ template<typename PrevOut, typename Out, typename In>
232class SyncEachExecutor : public Executor<PrevOut, Out, In> 220class SyncEachExecutor : public Executor<PrevOut, Out, In>
233{ 221{
234public: 222public:
235 SyncEachExecutor(SyncEachTask<Out, In> each, ErrorHandler errorHandler, const ExecutorBasePtr &parent); 223 SyncEachExecutor(SyncEachTask<Out, In> each, ErrorHandler errorFunc, const ExecutorBasePtr &parent);
236 void run(const ExecutionPtr &execution); 224 void run(const ExecutionPtr &execution);
237private: 225private:
238 void run(Async::Future<Out> *future, const typename PrevOut::value_type &arg, std::false_type); // !std::is_void<Out> 226 void run(Async::Future<Out> *future, const typename PrevOut::value_type &arg, std::false_type); // !std::is_void<Out>
@@ -253,10 +241,10 @@ private:
253 * where @p In is type of the result. 241 * where @p In is type of the result.
254 */ 242 */
255template<typename Out, typename ... In> 243template<typename Out, typename ... In>
256Job<Out, In ...> start(ThenTask<Out, In ...> func); 244Job<Out, In ...> start(ThenTask<Out, In ...> func, ErrorHandler errorFunc = ErrorHandler());
257 245
258template<typename Out, typename ... In> 246template<typename Out, typename ... In>
259Job<Out, In ...> start(SyncThenTask<Out, In ...> func); 247Job<Out, In ...> start(SyncThenTask<Out, In ...> func, ErrorHandler errorFunc = ErrorHandler());
260 248
261#ifdef WITH_KJOB 249#ifdef WITH_KJOB
262template<typename ReturnType, typename KJobType, ReturnType (KJobType::*KJobResultMethod)(), typename ... Args> 250template<typename ReturnType, typename KJobType, ReturnType (KJobType::*KJobResultMethod)(), typename ... Args>
@@ -358,10 +346,10 @@ class Job : public JobBase
358 friend class Job; 346 friend class Job;
359 347
360 template<typename OutOther, typename ... InOther> 348 template<typename OutOther, typename ... InOther>
361 friend Job<OutOther, InOther ...> start(Async::ThenTask<OutOther, InOther ...> func); 349 friend Job<OutOther, InOther ...> start(Async::ThenTask<OutOther, InOther ...> func, ErrorHandler errorFunc);
362 350
363 template<typename OutOther, typename ... InOther> 351 template<typename OutOther, typename ... InOther>
364 friend Job<OutOther, InOther ...> start(Async::SyncThenTask<OutOther, InOther ...> func); 352 friend Job<OutOther, InOther ...> start(Async::SyncThenTask<OutOther, InOther ...> func, ErrorHandler errorFunc);
365 353
366#ifdef WITH_KJOB 354#ifdef WITH_KJOB
367 template<typename ReturnType, typename KJobType, ReturnType (KJobType::*KJobResultMethod)(), typename ... Args> 355 template<typename ReturnType, typename KJobType, ReturnType (KJobType::*KJobResultMethod)(), typename ... Args>
@@ -524,17 +512,17 @@ private:
524namespace Async { 512namespace Async {
525 513
526template<typename Out, typename ... In> 514template<typename Out, typename ... In>
527Job<Out, In ...> start(ThenTask<Out, In ...> func) 515Job<Out, In ...> start(ThenTask<Out, In ...> func, ErrorHandler error)
528{ 516{
529 return Job<Out, In...>(Private::ExecutorBasePtr( 517 return Job<Out, In...>(Private::ExecutorBasePtr(
530 new Private::ThenExecutor<Out, In ...>(func, ErrorHandler(), Private::ExecutorBasePtr()))); 518 new Private::ThenExecutor<Out, In ...>(func, error, Private::ExecutorBasePtr())));
531} 519}
532 520
533template<typename Out, typename ... In> 521template<typename Out, typename ... In>
534Job<Out, In ...> start(SyncThenTask<Out, In ...> func) 522Job<Out, In ...> start(SyncThenTask<Out, In ...> func, ErrorHandler error)
535{ 523{
536 return Job<Out, In...>(Private::ExecutorBasePtr( 524 return Job<Out, In...>(Private::ExecutorBasePtr(
537 new Private::SyncThenExecutor<Out, In ...>(func, ErrorHandler(), Private::ExecutorBasePtr()))); 525 new Private::SyncThenExecutor<Out, In ...>(func, error, Private::ExecutorBasePtr())));
538} 526}
539 527
540#ifdef WITH_KJOB 528#ifdef WITH_KJOB
@@ -596,17 +584,12 @@ ExecutionPtr Executor<PrevOut, Out, In ...>::exec(const ExecutorBasePtr &self)
596 584
597 // chainup 585 // chainup
598 execution->prevExecution = mPrev ? mPrev->exec(mPrev) : ExecutionPtr(); 586 execution->prevExecution = mPrev ? mPrev->exec(mPrev) : ExecutionPtr();
599 /*
600 } else if (mPrev && !mPrevFuture) {
601 // If previous job is running or finished, just get it's future
602 mPrevFuture = static_cast<Async::Future<PrevOut>*>(mPrev->result());
603 }
604 */
605 587
606 execution->resultBase = this->createFuture<Out>(execution); 588 execution->resultBase = this->createFuture<Out>(execution);
607 auto fw = new Async::FutureWatcher<Out>(); 589 auto fw = new Async::FutureWatcher<Out>();
608 QObject::connect(fw, &Async::FutureWatcher<Out>::futureReady, 590 QObject::connect(fw, &Async::FutureWatcher<Out>::futureReady,
609 [fw, execution, this]() { 591 [fw, execution, this]() {
592 handleError(execution);
610 execution->setFinished(); 593 execution->setFinished();
611 delete fw; 594 delete fw;
612 }); 595 });
@@ -614,44 +597,70 @@ ExecutionPtr Executor<PrevOut, Out, In ...>::exec(const ExecutorBasePtr &self)
614 597
615 Async::Future<PrevOut> *prevFuture = execution->prevExecution ? execution->prevExecution->result<PrevOut>() : nullptr; 598 Async::Future<PrevOut> *prevFuture = execution->prevExecution ? execution->prevExecution->result<PrevOut>() : nullptr;
616 if (!prevFuture || prevFuture->isFinished()) { 599 if (!prevFuture || prevFuture->isFinished()) {
617 if (prevFuture && prevFuture->errorCode() != 0) { 600 if (prevFuture) { // prevFuture implies execution->prevExecution
618 if (mErrorFunc) { 601 if (prevFuture->errorCode()) {
619 mErrorFunc(prevFuture->errorCode(), prevFuture->errorMessage()); 602 // Propagate the errorCode and message to the outer Future
620 execution->resultBase->setFinished(); 603 execution->resultBase->setError(prevFuture->errorCode(), prevFuture->errorMessage());
621 execution->setFinished(); 604 if (!execution->errorWasHandled()) {
622 return execution; 605 if (handleError(execution)) {
606 return execution;
607 }
608 } else {
609 return execution;
610 }
623 } else { 611 } else {
624 // Propagate the error to next caller 612 // Propagate error (if any)
625 } 613 }
626 } 614 }
615
627 execution->isRunning = true; 616 execution->isRunning = true;
628 run(execution); 617 run(execution);
629 } else { 618 } else {
630 auto futureWatcher = new Async::FutureWatcher<PrevOut>(); 619 auto prevFutureWatcher = new Async::FutureWatcher<PrevOut>();
631 QObject::connect(futureWatcher, &Async::FutureWatcher<PrevOut>::futureReady, 620 QObject::connect(prevFutureWatcher, &Async::FutureWatcher<PrevOut>::futureReady,
632 [futureWatcher, execution, this]() { 621 [prevFutureWatcher, execution, this]() {
633 auto prevFuture = futureWatcher->future(); 622 auto prevFuture = prevFutureWatcher->future();
634 assert(prevFuture.isFinished()); 623 assert(prevFuture.isFinished());
635 delete futureWatcher; 624 delete prevFutureWatcher;
636 if (prevFuture.errorCode() != 0) { 625 auto prevExecutor = execution->executor->mPrev;
637 if (mErrorFunc) { 626 if (prevFuture.errorCode()) {
638 mErrorFunc(prevFuture.errorCode(), prevFuture.errorMessage()); 627 execution->resultBase->setError(prevFuture.errorCode(), prevFuture.errorMessage());
639 execution->resultBase->setFinished(); 628 if (!execution->errorWasHandled()) {
640 return; 629 if (handleError(execution)) {
630 return;
631 }
641 } else { 632 } else {
642 // Propagate the error to next caller 633 return;
643 } 634 }
644 } 635 }
636
637
638 // propagate error (if any)
645 execution->isRunning = true; 639 execution->isRunning = true;
646 run(execution); 640 run(execution);
647 }); 641 });
648 642
649 futureWatcher->setFuture(*static_cast<Async::Future<PrevOut>*>(prevFuture)); 643 prevFutureWatcher->setFuture(*static_cast<Async::Future<PrevOut>*>(prevFuture));
650 } 644 }
651 645
652 return execution; 646 return execution;
653} 647}
654 648
649template<typename PrevOut, typename Out, typename ... In>
650bool Executor<PrevOut, Out, In ...>::handleError(const ExecutionPtr &execution)
651{
652 assert(execution->resultBase->isFinished());
653 if (execution->resultBase->errorCode()) {
654 if (mErrorFunc) {
655 mErrorFunc(execution->resultBase->errorCode(),
656 execution->resultBase->errorMessage());
657 return true;
658 }
659 }
660
661 return false;
662}
663
655 664
656template<typename Out, typename ... In> 665template<typename Out, typename ... In>
657ThenExecutor<Out, In ...>::ThenExecutor(ThenTask<Out, In ...> then, ErrorHandler error, const ExecutorBasePtr &parent) 666ThenExecutor<Out, In ...>::ThenExecutor(ThenTask<Out, In ...> then, ErrorHandler error, const ExecutorBasePtr &parent)
@@ -715,14 +724,14 @@ void EachExecutor<PrevOut, Out, In>::run(const ExecutionPtr &execution)
715} 724}
716 725
717template<typename Out, typename In> 726template<typename Out, typename In>
718ReduceExecutor<Out, In>::ReduceExecutor(ReduceTask<Out, In> reduce, ErrorHandler error, const ExecutorBasePtr &parent) 727ReduceExecutor<Out, In>::ReduceExecutor(ReduceTask<Out, In> reduce, ErrorHandler errorFunc, const ExecutorBasePtr &parent)
719 : ThenExecutor<Out, In>(reduce, error, parent) 728 : ThenExecutor<Out, In>(reduce, errorFunc, parent)
720{ 729{
721} 730}
722 731
723template<typename Out, typename ... In> 732template<typename Out, typename ... In>
724SyncThenExecutor<Out, In ...>::SyncThenExecutor(SyncThenTask<Out, In ...> then, ErrorHandler errorHandler, const ExecutorBasePtr &parent) 733SyncThenExecutor<Out, In ...>::SyncThenExecutor(SyncThenTask<Out, In ...> then, ErrorHandler errorFunc, const ExecutorBasePtr &parent)
725 : Executor<typename detail::prevOut<In ...>::type, Out, In ...>(errorHandler, parent) 734 : Executor<typename detail::prevOut<In ...>::type, Out, In ...>(errorFunc, parent)
726 , mFunc(then) 735 , mFunc(then)
727{ 736{
728} 737}
@@ -762,8 +771,8 @@ void SyncThenExecutor<Out, In ...>::run(const ExecutionPtr &execution, std::true
762} 771}
763 772
764template<typename PrevOut, typename Out, typename In> 773template<typename PrevOut, typename Out, typename In>
765SyncEachExecutor<PrevOut, Out, In>::SyncEachExecutor(SyncEachTask<Out, In> each, ErrorHandler errorHandler, const ExecutorBasePtr &parent) 774SyncEachExecutor<PrevOut, Out, In>::SyncEachExecutor(SyncEachTask<Out, In> each, ErrorHandler errorFunc, const ExecutorBasePtr &parent)
766 : Executor<PrevOut, Out, In>(errorHandler, parent) 775 : Executor<PrevOut, Out, In>(errorFunc, parent)
767 , mFunc(each) 776 , mFunc(each)
768{ 777{
769} 778}
@@ -800,8 +809,8 @@ void SyncEachExecutor<PrevOut, Out, In>::run(Async::Future<Out> * /* unused */,
800} 809}
801 810
802template<typename Out, typename In> 811template<typename Out, typename In>
803SyncReduceExecutor<Out, In>::SyncReduceExecutor(SyncReduceTask<Out, In> reduce, ErrorHandler errorHandler, const ExecutorBasePtr &parent) 812SyncReduceExecutor<Out, In>::SyncReduceExecutor(SyncReduceTask<Out, In> reduce, ErrorHandler errorFunc, const ExecutorBasePtr &parent)
804 : SyncThenExecutor<Out, In>(reduce, errorHandler, parent) 813 : SyncThenExecutor<Out, In>(reduce, errorFunc, parent)
805{ 814{
806} 815}
807 816