summaryrefslogtreecommitdiffstats
path: root/async/src
diff options
context:
space:
mode:
Diffstat (limited to 'async/src')
-rw-r--r--async/src/async.h123
1 files changed, 60 insertions, 63 deletions
diff --git a/async/src/async.h b/async/src/async.h
index 336bae2..8523df3 100644
--- a/async/src/async.h
+++ b/async/src/async.h
@@ -33,7 +33,6 @@
33 33
34 34
35/* 35/*
36 * TODO: error continuation on .then and others.
37 * TODO: instead of passing the future objects callbacks could be provided for result reporting (we can still use the future object internally 36 * TODO: instead of passing the future objects callbacks could be provided for result reporting (we can still use the future object internally
38 */ 37 */
39namespace Async { 38namespace Async {
@@ -97,8 +96,9 @@ template<typename PrevOut, typename Out, typename ... In>
97class Executor : public ExecutorBase 96class Executor : public ExecutorBase
98{ 97{
99protected: 98protected:
100 Executor(const Private::ExecutorBasePtr &parent) 99 Executor(ErrorHandler errorHandler, const Private::ExecutorBasePtr &parent)
101 : ExecutorBase(parent) 100 : ExecutorBase(parent)
101 , mErrorFunc(errorHandler)
102 , mPrevFuture(0) 102 , mPrevFuture(0)
103 {} 103 {}
104 virtual ~Executor() {} 104 virtual ~Executor() {}
@@ -116,7 +116,7 @@ template<typename Out, typename ... In>
116class ThenExecutor: public Executor<typename PreviousOut<In ...>::type, Out, In ...> 116class ThenExecutor: public Executor<typename PreviousOut<In ...>::type, Out, In ...>
117{ 117{
118public: 118public:
119 ThenExecutor(ThenTask<Out, In ...> then, ErrorHandler errorHandler = ErrorHandler(), const ExecutorBasePtr &parent = ExecutorBasePtr()); 119 ThenExecutor(ThenTask<Out, In ...> then, ErrorHandler errorHandler, const ExecutorBasePtr &parent);
120 void previousFutureReady(); 120 void previousFutureReady();
121}; 121};
122 122
@@ -124,7 +124,7 @@ template<typename PrevOut, typename Out, typename In>
124class EachExecutor : public Executor<PrevOut, Out, In> 124class EachExecutor : public Executor<PrevOut, Out, In>
125{ 125{
126public: 126public:
127 EachExecutor(EachTask<Out, In> each, const ExecutorBasePtr &parent); 127 EachExecutor(EachTask<Out, In> each, ErrorHandler errorHandler, const ExecutorBasePtr &parent);
128 void previousFutureReady(); 128 void previousFutureReady();
129 129
130private: 130private:
@@ -135,14 +135,14 @@ template<typename Out, typename In>
135class ReduceExecutor : public ThenExecutor<Out, In> 135class ReduceExecutor : public ThenExecutor<Out, In>
136{ 136{
137public: 137public:
138 ReduceExecutor(ReduceTask<Out, In> reduce, const ExecutorBasePtr &parent); 138 ReduceExecutor(ReduceTask<Out, In> reduce, ErrorHandler errorHandler, const ExecutorBasePtr &parent);
139}; 139};
140 140
141template<typename Out, typename ... In> 141template<typename Out, typename ... In>
142class SyncThenExecutor : public Executor<typename PreviousOut<In ...>::type, Out, In ...> 142class SyncThenExecutor : public Executor<typename PreviousOut<In ...>::type, Out, In ...>
143{ 143{
144public: 144public:
145 SyncThenExecutor(SyncThenTask<Out, In ...> then, ErrorHandler errorHandler = ErrorHandler(), const ExecutorBasePtr &parent = ExecutorBasePtr()); 145 SyncThenExecutor(SyncThenTask<Out, In ...> then, ErrorHandler errorHandler, const ExecutorBasePtr &parent);
146 void previousFutureReady(); 146 void previousFutureReady();
147protected: 147protected:
148 SyncThenTask<Out, In ...> mFunc; 148 SyncThenTask<Out, In ...> mFunc;
@@ -152,14 +152,14 @@ template<typename Out, typename In>
152class SyncReduceExecutor : public SyncThenExecutor<Out, In> 152class SyncReduceExecutor : public SyncThenExecutor<Out, In>
153{ 153{
154public: 154public:
155 SyncReduceExecutor(SyncReduceTask<Out, In> reduce, const ExecutorBasePtr &parent = ExecutorBasePtr()); 155 SyncReduceExecutor(SyncReduceTask<Out, In> reduce, ErrorHandler errorHandler, const ExecutorBasePtr &parent);
156}; 156};
157 157
158template<typename PrevOut, typename Out, typename In> 158template<typename PrevOut, typename Out, typename In>
159class SyncEachExecutor : public Executor<PrevOut, Out, In> 159class SyncEachExecutor : public Executor<PrevOut, Out, In>
160{ 160{
161public: 161public:
162 SyncEachExecutor(SyncEachTask<Out, In> each, const ExecutorBasePtr &parent = ExecutorBasePtr()); 162 SyncEachExecutor(SyncEachTask<Out, In> each, ErrorHandler errorHandler, const ExecutorBasePtr &parent);
163 void previousFutureReady(); 163 void previousFutureReady();
164protected: 164protected:
165 SyncEachTask<Out, In> mFunc; 165 SyncEachTask<Out, In> mFunc;
@@ -190,12 +190,14 @@ Job<Out> start(ThenTask<Out> func);
190template<typename Out> 190template<typename Out>
191Job<Out> null() 191Job<Out> null()
192{ 192{
193 return Async::start<Out>([](Async::Future<Out> &future) {future.setFinished();}); 193 return Async::start<Out>([](Async::Future<Out> &future) {
194 future.setFinished();
195 });
194} 196}
195 197
196/** 198/**
197 * An error job. 199 * An error job.
198 * 200 *
199 * An async error. 201 * An async error.
200 * 202 *
201 */ 203 */
@@ -288,35 +290,35 @@ public:
288 } 290 }
289 291
290 template<typename OutOther, typename InOther> 292 template<typename OutOther, typename InOther>
291 Job<OutOther, InOther> each(EachTask<OutOther, InOther> func) 293 Job<OutOther, InOther> each(EachTask<OutOther, InOther> func, ErrorHandler errorFunc = ErrorHandler())
292 { 294 {
293 eachInvariants<OutOther>(); 295 eachInvariants<OutOther>();
294 return Job<OutOther, InOther>(Private::ExecutorBasePtr( 296 return Job<OutOther, InOther>(Private::ExecutorBasePtr(
295 new Private::EachExecutor<Out, OutOther, InOther>(func, mExecutor))); 297 new Private::EachExecutor<Out, OutOther, InOther>(func, errorFunc, mExecutor)));
296 } 298 }
297 299
298 template<typename OutOther, typename InOther> 300 template<typename OutOther, typename InOther>
299 Job<OutOther, InOther> each(SyncEachTask<OutOther, InOther> func) 301 Job<OutOther, InOther> each(SyncEachTask<OutOther, InOther> func, ErrorHandler errorFunc = ErrorHandler())
300 { 302 {
301 eachInvariants<OutOther>(); 303 eachInvariants<OutOther>();
302 return Job<OutOther, InOther>(Private::ExecutorBasePtr( 304 return Job<OutOther, InOther>(Private::ExecutorBasePtr(
303 new Private::SyncEachExecutor<Out, OutOther, InOther>(func, mExecutor))); 305 new Private::SyncEachExecutor<Out, OutOther, InOther>(func, errorFunc, mExecutor)));
304 } 306 }
305 307
306 template<typename OutOther, typename InOther> 308 template<typename OutOther, typename InOther>
307 Job<OutOther, InOther> reduce(ReduceTask<OutOther, InOther> func) 309 Job<OutOther, InOther> reduce(ReduceTask<OutOther, InOther> func, ErrorHandler errorFunc = ErrorHandler())
308 { 310 {
309 reduceInvariants<InOther>(); 311 reduceInvariants<InOther>();
310 return Job<OutOther, InOther>(Private::ExecutorBasePtr( 312 return Job<OutOther, InOther>(Private::ExecutorBasePtr(
311 new Private::ReduceExecutor<OutOther, InOther>(func, mExecutor))); 313 new Private::ReduceExecutor<OutOther, InOther>(func, errorFunc, mExecutor)));
312 } 314 }
313 315
314 template<typename OutOther, typename InOther> 316 template<typename OutOther, typename InOther>
315 Job<OutOther, InOther> reduce(SyncReduceTask<OutOther, InOther> func) 317 Job<OutOther, InOther> reduce(SyncReduceTask<OutOther, InOther> func, ErrorHandler errorFunc = ErrorHandler())
316 { 318 {
317 reduceInvariants<InOther>(); 319 reduceInvariants<InOther>();
318 return Job<OutOther, InOther>(Private::ExecutorBasePtr( 320 return Job<OutOther, InOther>(Private::ExecutorBasePtr(
319 new Private::SyncReduceExecutor<OutOther, InOther>(func, mExecutor))); 321 new Private::SyncReduceExecutor<OutOther, InOther>(func, errorFunc, mExecutor)));
320 } 322 }
321 323
322 Async::Future<Out> exec() 324 Async::Future<Out> exec()
@@ -364,13 +366,13 @@ namespace Async {
364template<typename Out> 366template<typename Out>
365Job<Out> start(ThenTask<Out> func) 367Job<Out> start(ThenTask<Out> func)
366{ 368{
367 return Job<Out>(Private::ExecutorBasePtr(new Private::ThenExecutor<Out>(func))); 369 return Job<Out>(Private::ExecutorBasePtr(new Private::ThenExecutor<Out>(func, ErrorHandler(), Private::ExecutorBasePtr())));
368} 370}
369 371
370template<typename Out> 372template<typename Out>
371Job<Out> start(SyncThenTask<Out> func) 373Job<Out> start(SyncThenTask<Out> func)
372{ 374{
373 return Job<Out>(Private::ExecutorBasePtr(new Private::SyncThenExecutor<Out>(func))); 375 return Job<Out>(Private::ExecutorBasePtr(new Private::SyncThenExecutor<Out>(func, ErrorHandler(), Private::ExecutorBasePtr())));
374} 376}
375 377
376namespace Private { 378namespace Private {
@@ -390,27 +392,47 @@ template<typename PrevOut, typename Out, typename ... In>
390void Executor<PrevOut, Out, In ...>::exec() 392void Executor<PrevOut, Out, In ...>::exec()
391{ 393{
392 mPrevFuture = chainup(); 394 mPrevFuture = chainup();
395 // Initialize our future
393 mResult = new Async::Future<Out>(); 396 mResult = new Async::Future<Out>();
394 if (!mPrevFuture || mPrevFuture->isFinished()) { 397 if (!mPrevFuture || mPrevFuture->isFinished()) {
398 if (mPrevFuture && mPrevFuture->errorCode() != 0) {
399 if (mErrorFunc) {
400 mErrorFunc(mPrevFuture->errorCode(), mPrevFuture->errorMessage());
401 mResult->setFinished();
402 return;
403 } else {
404 // Propagate the error to next caller
405 }
406 }
395 previousFutureReady(); 407 previousFutureReady();
396 } else { 408 } else {
397 auto futureWatcher = new Async::FutureWatcher<PrevOut>(); 409 auto futureWatcher = new Async::FutureWatcher<PrevOut>();
398 QObject::connect(futureWatcher, &Async::FutureWatcher<PrevOut>::futureReady, 410 QObject::connect(futureWatcher, &Async::FutureWatcher<PrevOut>::futureReady,
399 [futureWatcher, this]() { 411 [futureWatcher, this]() {
400 assert(futureWatcher->future().isFinished()); 412 auto prevFuture = futureWatcher->future();
413 assert(prevFuture.isFinished());
401 futureWatcher->deleteLater(); 414 futureWatcher->deleteLater();
415 if (prevFuture.errorCode() != 0) {
416 if (mErrorFunc) {
417 mErrorFunc(prevFuture.errorCode(), prevFuture.errorMessage());
418 mResult->setFinished();
419 return;
420 } else {
421 // Propagate the error to next caller
422 }
423 }
402 previousFutureReady(); 424 previousFutureReady();
403 }); 425 });
426
404 futureWatcher->setFuture(*mPrevFuture); 427 futureWatcher->setFuture(*mPrevFuture);
405 } 428 }
406} 429}
407 430
408template<typename Out, typename ... In> 431template<typename Out, typename ... In>
409ThenExecutor<Out, In ...>::ThenExecutor(ThenTask<Out, In ...> then, ErrorHandler error, const ExecutorBasePtr &parent) 432ThenExecutor<Out, In ...>::ThenExecutor(ThenTask<Out, In ...> then, ErrorHandler error, const ExecutorBasePtr &parent)
410 : Executor<typename PreviousOut<In ...>::type, Out, In ...>(parent) 433 : Executor<typename PreviousOut<In ...>::type, Out, In ...>(error, parent)
411{ 434{
412 this->mFunc = then; 435 this->mFunc = then;
413 this->mErrorFunc = error;
414} 436}
415 437
416template<typename Out, typename ... In> 438template<typename Out, typename ... In>
@@ -419,25 +441,14 @@ void ThenExecutor<Out, In ...>::previousFutureReady()
419 if (this->mPrevFuture) { 441 if (this->mPrevFuture) {
420 assert(this->mPrevFuture->isFinished()); 442 assert(this->mPrevFuture->isFinished());
421 } 443 }
422 if (this->mPrevFuture && this->mPrevFuture->errorCode()) { 444
423 if (this->mErrorFunc) { 445 this->mFunc(this->mPrevFuture ? this->mPrevFuture->value() : In() ...,
424 this->mErrorFunc(this->mPrevFuture->errorCode(), this->mPrevFuture->errorMessage()); 446 *static_cast<Async::Future<Out>*>(this->mResult));
425 this->mResult->setFinished();
426 } else {
427 static_cast<Async::Future<Out>*>(this->mResult)->setError(this->mPrevFuture->errorCode(), this->mPrevFuture->errorMessage());
428 //propagate error if no error handler is available
429 this->mFunc(this->mPrevFuture ? this->mPrevFuture->value() : In() ...,
430 *static_cast<Async::Future<Out>*>(this->mResult));
431 }
432 } else {
433 this->mFunc(this->mPrevFuture ? this->mPrevFuture->value() : In() ...,
434 *static_cast<Async::Future<Out>*>(this->mResult));
435 }
436} 447}
437 448
438template<typename PrevOut, typename Out, typename In> 449template<typename PrevOut, typename Out, typename In>
439EachExecutor<PrevOut, Out, In>::EachExecutor(EachTask<Out, In> each, const ExecutorBasePtr &parent) 450EachExecutor<PrevOut, Out, In>::EachExecutor(EachTask<Out, In> each, ErrorHandler error, const ExecutorBasePtr &parent)
440 : Executor<PrevOut, Out, In>(parent) 451 : Executor<PrevOut, Out, In>(error, parent)
441{ 452{
442 this->mFunc = each; 453 this->mFunc = each;
443} 454}
@@ -474,17 +485,16 @@ void EachExecutor<PrevOut, Out, In>::previousFutureReady()
474} 485}
475 486
476template<typename Out, typename In> 487template<typename Out, typename In>
477ReduceExecutor<Out, In>::ReduceExecutor(ReduceTask<Out, In> reduce, const ExecutorBasePtr &parent) 488ReduceExecutor<Out, In>::ReduceExecutor(ReduceTask<Out, In> reduce, ErrorHandler error, const ExecutorBasePtr &parent)
478 : ThenExecutor<Out, In>(reduce, ErrorHandler(), parent) 489 : ThenExecutor<Out, In>(reduce, error, parent)
479{ 490{
480} 491}
481 492
482template<typename Out, typename ... In> 493template<typename Out, typename ... In>
483SyncThenExecutor<Out, In ...>::SyncThenExecutor(SyncThenTask<Out, In ...> then, ErrorHandler errorHandler, const ExecutorBasePtr &parent) 494SyncThenExecutor<Out, In ...>::SyncThenExecutor(SyncThenTask<Out, In ...> then, ErrorHandler errorHandler, const ExecutorBasePtr &parent)
484 : Executor<typename PreviousOut<In ...>::type, Out, In ...>(parent) 495 : Executor<typename PreviousOut<In ...>::type, Out, In ...>(errorHandler, parent)
485{ 496{
486 this->mFunc = then; 497 this->mFunc = then;
487 this->mErrorFunc = errorHandler;
488} 498}
489 499
490template<typename Out, typename ... In> 500template<typename Out, typename ... In>
@@ -494,27 +504,14 @@ void SyncThenExecutor<Out, In ...>::previousFutureReady()
494 assert(this->mPrevFuture->isFinished()); 504 assert(this->mPrevFuture->isFinished());
495 } 505 }
496 506
497 if (this->mPrevFuture && this->mPrevFuture->errorCode()) { 507 Out result = this->mFunc(this->mPrevFuture ? this->mPrevFuture->value() : In() ...);
498 if (this->mErrorFunc) { 508 static_cast<Async::Future<Out>*>(this->mResult)->setValue(result);
499 this->mErrorFunc(this->mPrevFuture->errorCode(), this->mPrevFuture->errorMessage()); 509 this->mResult->setFinished();
500 this->mResult->setFinished();
501 } else {
502 static_cast<Async::Future<Out>*>(this->mResult)->setError(this->mPrevFuture->errorCode(), this->mPrevFuture->errorMessage());
503 //propagate error if no error handler is available
504 Out result = this->mFunc(this->mPrevFuture ? this->mPrevFuture->value() : In() ...);
505 static_cast<Async::Future<Out>*>(this->mResult)->setValue(result);
506 this->mResult->setFinished();
507 }
508 } else {
509 Out result = this->mFunc(this->mPrevFuture ? this->mPrevFuture->value() : In() ...);
510 static_cast<Async::Future<Out>*>(this->mResult)->setValue(result);
511 this->mResult->setFinished();
512 }
513} 510}
514 511
515template<typename PrevOut, typename Out, typename In> 512template<typename PrevOut, typename Out, typename In>
516SyncEachExecutor<PrevOut, Out, In>::SyncEachExecutor(SyncEachTask<Out, In> each, const ExecutorBasePtr &parent) 513SyncEachExecutor<PrevOut, Out, In>::SyncEachExecutor(SyncEachTask<Out, In> each, ErrorHandler errorHandler, const ExecutorBasePtr &parent)
517 : Executor<PrevOut, Out, In>(parent) 514 : Executor<PrevOut, Out, In>(errorHandler, parent)
518{ 515{
519 this->mFunc = each; 516 this->mFunc = each;
520} 517}
@@ -536,8 +533,8 @@ void SyncEachExecutor<PrevOut, Out, In>::previousFutureReady()
536} 533}
537 534
538template<typename Out, typename In> 535template<typename Out, typename In>
539SyncReduceExecutor<Out, In>::SyncReduceExecutor(SyncReduceTask<Out, In> reduce, const ExecutorBasePtr &parent) 536SyncReduceExecutor<Out, In>::SyncReduceExecutor(SyncReduceTask<Out, In> reduce, ErrorHandler errorHandler, const ExecutorBasePtr &parent)
540 : SyncThenExecutor<Out, In>(reduce, ErrorHandler(), parent) 537 : SyncThenExecutor<Out, In>(reduce, errorHandler, parent)
541{ 538{
542} 539}
543 540