diff options
Diffstat (limited to 'async/src')
-rw-r--r-- | async/src/async.h | 123 |
1 files changed, 60 insertions, 63 deletions
diff --git a/async/src/async.h b/async/src/async.h index 336bae2..8523df3 100644 --- a/async/src/async.h +++ b/async/src/async.h | |||
@@ -33,7 +33,6 @@ | |||
33 | 33 | ||
34 | 34 | ||
35 | /* | 35 | /* |
36 | * TODO: error continuation on .then and others. | ||
37 | * TODO: instead of passing the future objects callbacks could be provided for result reporting (we can still use the future object internally | 36 | * TODO: instead of passing the future objects callbacks could be provided for result reporting (we can still use the future object internally |
38 | */ | 37 | */ |
39 | namespace Async { | 38 | namespace Async { |
@@ -97,8 +96,9 @@ template<typename PrevOut, typename Out, typename ... In> | |||
97 | class Executor : public ExecutorBase | 96 | class Executor : public ExecutorBase |
98 | { | 97 | { |
99 | protected: | 98 | protected: |
100 | Executor(const Private::ExecutorBasePtr &parent) | 99 | Executor(ErrorHandler errorHandler, const Private::ExecutorBasePtr &parent) |
101 | : ExecutorBase(parent) | 100 | : ExecutorBase(parent) |
101 | , mErrorFunc(errorHandler) | ||
102 | , mPrevFuture(0) | 102 | , mPrevFuture(0) |
103 | {} | 103 | {} |
104 | virtual ~Executor() {} | 104 | virtual ~Executor() {} |
@@ -116,7 +116,7 @@ template<typename Out, typename ... In> | |||
116 | class ThenExecutor: public Executor<typename PreviousOut<In ...>::type, Out, In ...> | 116 | class ThenExecutor: public Executor<typename PreviousOut<In ...>::type, Out, In ...> |
117 | { | 117 | { |
118 | public: | 118 | public: |
119 | ThenExecutor(ThenTask<Out, In ...> then, ErrorHandler errorHandler = ErrorHandler(), const ExecutorBasePtr &parent = ExecutorBasePtr()); | 119 | ThenExecutor(ThenTask<Out, In ...> then, ErrorHandler errorHandler, const ExecutorBasePtr &parent); |
120 | void previousFutureReady(); | 120 | void previousFutureReady(); |
121 | }; | 121 | }; |
122 | 122 | ||
@@ -124,7 +124,7 @@ template<typename PrevOut, typename Out, typename In> | |||
124 | class EachExecutor : public Executor<PrevOut, Out, In> | 124 | class EachExecutor : public Executor<PrevOut, Out, In> |
125 | { | 125 | { |
126 | public: | 126 | public: |
127 | EachExecutor(EachTask<Out, In> each, const ExecutorBasePtr &parent); | 127 | EachExecutor(EachTask<Out, In> each, ErrorHandler errorHandler, const ExecutorBasePtr &parent); |
128 | void previousFutureReady(); | 128 | void previousFutureReady(); |
129 | 129 | ||
130 | private: | 130 | private: |
@@ -135,14 +135,14 @@ template<typename Out, typename In> | |||
135 | class ReduceExecutor : public ThenExecutor<Out, In> | 135 | class ReduceExecutor : public ThenExecutor<Out, In> |
136 | { | 136 | { |
137 | public: | 137 | public: |
138 | ReduceExecutor(ReduceTask<Out, In> reduce, const ExecutorBasePtr &parent); | 138 | ReduceExecutor(ReduceTask<Out, In> reduce, ErrorHandler errorHandler, const ExecutorBasePtr &parent); |
139 | }; | 139 | }; |
140 | 140 | ||
141 | template<typename Out, typename ... In> | 141 | template<typename Out, typename ... In> |
142 | class SyncThenExecutor : public Executor<typename PreviousOut<In ...>::type, Out, In ...> | 142 | class SyncThenExecutor : public Executor<typename PreviousOut<In ...>::type, Out, In ...> |
143 | { | 143 | { |
144 | public: | 144 | public: |
145 | SyncThenExecutor(SyncThenTask<Out, In ...> then, ErrorHandler errorHandler = ErrorHandler(), const ExecutorBasePtr &parent = ExecutorBasePtr()); | 145 | SyncThenExecutor(SyncThenTask<Out, In ...> then, ErrorHandler errorHandler, const ExecutorBasePtr &parent); |
146 | void previousFutureReady(); | 146 | void previousFutureReady(); |
147 | protected: | 147 | protected: |
148 | SyncThenTask<Out, In ...> mFunc; | 148 | SyncThenTask<Out, In ...> mFunc; |
@@ -152,14 +152,14 @@ template<typename Out, typename In> | |||
152 | class SyncReduceExecutor : public SyncThenExecutor<Out, In> | 152 | class SyncReduceExecutor : public SyncThenExecutor<Out, In> |
153 | { | 153 | { |
154 | public: | 154 | public: |
155 | SyncReduceExecutor(SyncReduceTask<Out, In> reduce, const ExecutorBasePtr &parent = ExecutorBasePtr()); | 155 | SyncReduceExecutor(SyncReduceTask<Out, In> reduce, ErrorHandler errorHandler, const ExecutorBasePtr &parent); |
156 | }; | 156 | }; |
157 | 157 | ||
158 | template<typename PrevOut, typename Out, typename In> | 158 | template<typename PrevOut, typename Out, typename In> |
159 | class SyncEachExecutor : public Executor<PrevOut, Out, In> | 159 | class SyncEachExecutor : public Executor<PrevOut, Out, In> |
160 | { | 160 | { |
161 | public: | 161 | public: |
162 | SyncEachExecutor(SyncEachTask<Out, In> each, const ExecutorBasePtr &parent = ExecutorBasePtr()); | 162 | SyncEachExecutor(SyncEachTask<Out, In> each, ErrorHandler errorHandler, const ExecutorBasePtr &parent); |
163 | void previousFutureReady(); | 163 | void previousFutureReady(); |
164 | protected: | 164 | protected: |
165 | SyncEachTask<Out, In> mFunc; | 165 | SyncEachTask<Out, In> mFunc; |
@@ -190,12 +190,14 @@ Job<Out> start(ThenTask<Out> func); | |||
190 | template<typename Out> | 190 | template<typename Out> |
191 | Job<Out> null() | 191 | Job<Out> null() |
192 | { | 192 | { |
193 | return Async::start<Out>([](Async::Future<Out> &future) {future.setFinished();}); | 193 | return Async::start<Out>([](Async::Future<Out> &future) { |
194 | future.setFinished(); | ||
195 | }); | ||
194 | } | 196 | } |
195 | 197 | ||
196 | /** | 198 | /** |
197 | * An error job. | 199 | * An error job. |
198 | * | 200 | * |
199 | * An async error. | 201 | * An async error. |
200 | * | 202 | * |
201 | */ | 203 | */ |
@@ -288,35 +290,35 @@ public: | |||
288 | } | 290 | } |
289 | 291 | ||
290 | template<typename OutOther, typename InOther> | 292 | template<typename OutOther, typename InOther> |
291 | Job<OutOther, InOther> each(EachTask<OutOther, InOther> func) | 293 | Job<OutOther, InOther> each(EachTask<OutOther, InOther> func, ErrorHandler errorFunc = ErrorHandler()) |
292 | { | 294 | { |
293 | eachInvariants<OutOther>(); | 295 | eachInvariants<OutOther>(); |
294 | return Job<OutOther, InOther>(Private::ExecutorBasePtr( | 296 | return Job<OutOther, InOther>(Private::ExecutorBasePtr( |
295 | new Private::EachExecutor<Out, OutOther, InOther>(func, mExecutor))); | 297 | new Private::EachExecutor<Out, OutOther, InOther>(func, errorFunc, mExecutor))); |
296 | } | 298 | } |
297 | 299 | ||
298 | template<typename OutOther, typename InOther> | 300 | template<typename OutOther, typename InOther> |
299 | Job<OutOther, InOther> each(SyncEachTask<OutOther, InOther> func) | 301 | Job<OutOther, InOther> each(SyncEachTask<OutOther, InOther> func, ErrorHandler errorFunc = ErrorHandler()) |
300 | { | 302 | { |
301 | eachInvariants<OutOther>(); | 303 | eachInvariants<OutOther>(); |
302 | return Job<OutOther, InOther>(Private::ExecutorBasePtr( | 304 | return Job<OutOther, InOther>(Private::ExecutorBasePtr( |
303 | new Private::SyncEachExecutor<Out, OutOther, InOther>(func, mExecutor))); | 305 | new Private::SyncEachExecutor<Out, OutOther, InOther>(func, errorFunc, mExecutor))); |
304 | } | 306 | } |
305 | 307 | ||
306 | template<typename OutOther, typename InOther> | 308 | template<typename OutOther, typename InOther> |
307 | Job<OutOther, InOther> reduce(ReduceTask<OutOther, InOther> func) | 309 | Job<OutOther, InOther> reduce(ReduceTask<OutOther, InOther> func, ErrorHandler errorFunc = ErrorHandler()) |
308 | { | 310 | { |
309 | reduceInvariants<InOther>(); | 311 | reduceInvariants<InOther>(); |
310 | return Job<OutOther, InOther>(Private::ExecutorBasePtr( | 312 | return Job<OutOther, InOther>(Private::ExecutorBasePtr( |
311 | new Private::ReduceExecutor<OutOther, InOther>(func, mExecutor))); | 313 | new Private::ReduceExecutor<OutOther, InOther>(func, errorFunc, mExecutor))); |
312 | } | 314 | } |
313 | 315 | ||
314 | template<typename OutOther, typename InOther> | 316 | template<typename OutOther, typename InOther> |
315 | Job<OutOther, InOther> reduce(SyncReduceTask<OutOther, InOther> func) | 317 | Job<OutOther, InOther> reduce(SyncReduceTask<OutOther, InOther> func, ErrorHandler errorFunc = ErrorHandler()) |
316 | { | 318 | { |
317 | reduceInvariants<InOther>(); | 319 | reduceInvariants<InOther>(); |
318 | return Job<OutOther, InOther>(Private::ExecutorBasePtr( | 320 | return Job<OutOther, InOther>(Private::ExecutorBasePtr( |
319 | new Private::SyncReduceExecutor<OutOther, InOther>(func, mExecutor))); | 321 | new Private::SyncReduceExecutor<OutOther, InOther>(func, errorFunc, mExecutor))); |
320 | } | 322 | } |
321 | 323 | ||
322 | Async::Future<Out> exec() | 324 | Async::Future<Out> exec() |
@@ -364,13 +366,13 @@ namespace Async { | |||
364 | template<typename Out> | 366 | template<typename Out> |
365 | Job<Out> start(ThenTask<Out> func) | 367 | Job<Out> start(ThenTask<Out> func) |
366 | { | 368 | { |
367 | return Job<Out>(Private::ExecutorBasePtr(new Private::ThenExecutor<Out>(func))); | 369 | return Job<Out>(Private::ExecutorBasePtr(new Private::ThenExecutor<Out>(func, ErrorHandler(), Private::ExecutorBasePtr()))); |
368 | } | 370 | } |
369 | 371 | ||
370 | template<typename Out> | 372 | template<typename Out> |
371 | Job<Out> start(SyncThenTask<Out> func) | 373 | Job<Out> start(SyncThenTask<Out> func) |
372 | { | 374 | { |
373 | return Job<Out>(Private::ExecutorBasePtr(new Private::SyncThenExecutor<Out>(func))); | 375 | return Job<Out>(Private::ExecutorBasePtr(new Private::SyncThenExecutor<Out>(func, ErrorHandler(), Private::ExecutorBasePtr()))); |
374 | } | 376 | } |
375 | 377 | ||
376 | namespace Private { | 378 | namespace Private { |
@@ -390,27 +392,47 @@ template<typename PrevOut, typename Out, typename ... In> | |||
390 | void Executor<PrevOut, Out, In ...>::exec() | 392 | void Executor<PrevOut, Out, In ...>::exec() |
391 | { | 393 | { |
392 | mPrevFuture = chainup(); | 394 | mPrevFuture = chainup(); |
395 | // Initialize our future | ||
393 | mResult = new Async::Future<Out>(); | 396 | mResult = new Async::Future<Out>(); |
394 | if (!mPrevFuture || mPrevFuture->isFinished()) { | 397 | if (!mPrevFuture || mPrevFuture->isFinished()) { |
398 | if (mPrevFuture && mPrevFuture->errorCode() != 0) { | ||
399 | if (mErrorFunc) { | ||
400 | mErrorFunc(mPrevFuture->errorCode(), mPrevFuture->errorMessage()); | ||
401 | mResult->setFinished(); | ||
402 | return; | ||
403 | } else { | ||
404 | // Propagate the error to next caller | ||
405 | } | ||
406 | } | ||
395 | previousFutureReady(); | 407 | previousFutureReady(); |
396 | } else { | 408 | } else { |
397 | auto futureWatcher = new Async::FutureWatcher<PrevOut>(); | 409 | auto futureWatcher = new Async::FutureWatcher<PrevOut>(); |
398 | QObject::connect(futureWatcher, &Async::FutureWatcher<PrevOut>::futureReady, | 410 | QObject::connect(futureWatcher, &Async::FutureWatcher<PrevOut>::futureReady, |
399 | [futureWatcher, this]() { | 411 | [futureWatcher, this]() { |
400 | assert(futureWatcher->future().isFinished()); | 412 | auto prevFuture = futureWatcher->future(); |
413 | assert(prevFuture.isFinished()); | ||
401 | futureWatcher->deleteLater(); | 414 | futureWatcher->deleteLater(); |
415 | if (prevFuture.errorCode() != 0) { | ||
416 | if (mErrorFunc) { | ||
417 | mErrorFunc(prevFuture.errorCode(), prevFuture.errorMessage()); | ||
418 | mResult->setFinished(); | ||
419 | return; | ||
420 | } else { | ||
421 | // Propagate the error to next caller | ||
422 | } | ||
423 | } | ||
402 | previousFutureReady(); | 424 | previousFutureReady(); |
403 | }); | 425 | }); |
426 | |||
404 | futureWatcher->setFuture(*mPrevFuture); | 427 | futureWatcher->setFuture(*mPrevFuture); |
405 | } | 428 | } |
406 | } | 429 | } |
407 | 430 | ||
408 | template<typename Out, typename ... In> | 431 | template<typename Out, typename ... In> |
409 | ThenExecutor<Out, In ...>::ThenExecutor(ThenTask<Out, In ...> then, ErrorHandler error, const ExecutorBasePtr &parent) | 432 | ThenExecutor<Out, In ...>::ThenExecutor(ThenTask<Out, In ...> then, ErrorHandler error, const ExecutorBasePtr &parent) |
410 | : Executor<typename PreviousOut<In ...>::type, Out, In ...>(parent) | 433 | : Executor<typename PreviousOut<In ...>::type, Out, In ...>(error, parent) |
411 | { | 434 | { |
412 | this->mFunc = then; | 435 | this->mFunc = then; |
413 | this->mErrorFunc = error; | ||
414 | } | 436 | } |
415 | 437 | ||
416 | template<typename Out, typename ... In> | 438 | template<typename Out, typename ... In> |
@@ -419,25 +441,14 @@ void ThenExecutor<Out, In ...>::previousFutureReady() | |||
419 | if (this->mPrevFuture) { | 441 | if (this->mPrevFuture) { |
420 | assert(this->mPrevFuture->isFinished()); | 442 | assert(this->mPrevFuture->isFinished()); |
421 | } | 443 | } |
422 | if (this->mPrevFuture && this->mPrevFuture->errorCode()) { | 444 | |
423 | if (this->mErrorFunc) { | 445 | this->mFunc(this->mPrevFuture ? this->mPrevFuture->value() : In() ..., |
424 | this->mErrorFunc(this->mPrevFuture->errorCode(), this->mPrevFuture->errorMessage()); | 446 | *static_cast<Async::Future<Out>*>(this->mResult)); |
425 | this->mResult->setFinished(); | ||
426 | } else { | ||
427 | static_cast<Async::Future<Out>*>(this->mResult)->setError(this->mPrevFuture->errorCode(), this->mPrevFuture->errorMessage()); | ||
428 | //propagate error if no error handler is available | ||
429 | this->mFunc(this->mPrevFuture ? this->mPrevFuture->value() : In() ..., | ||
430 | *static_cast<Async::Future<Out>*>(this->mResult)); | ||
431 | } | ||
432 | } else { | ||
433 | this->mFunc(this->mPrevFuture ? this->mPrevFuture->value() : In() ..., | ||
434 | *static_cast<Async::Future<Out>*>(this->mResult)); | ||
435 | } | ||
436 | } | 447 | } |
437 | 448 | ||
438 | template<typename PrevOut, typename Out, typename In> | 449 | template<typename PrevOut, typename Out, typename In> |
439 | EachExecutor<PrevOut, Out, In>::EachExecutor(EachTask<Out, In> each, const ExecutorBasePtr &parent) | 450 | EachExecutor<PrevOut, Out, In>::EachExecutor(EachTask<Out, In> each, ErrorHandler error, const ExecutorBasePtr &parent) |
440 | : Executor<PrevOut, Out, In>(parent) | 451 | : Executor<PrevOut, Out, In>(error, parent) |
441 | { | 452 | { |
442 | this->mFunc = each; | 453 | this->mFunc = each; |
443 | } | 454 | } |
@@ -474,17 +485,16 @@ void EachExecutor<PrevOut, Out, In>::previousFutureReady() | |||
474 | } | 485 | } |
475 | 486 | ||
476 | template<typename Out, typename In> | 487 | template<typename Out, typename In> |
477 | ReduceExecutor<Out, In>::ReduceExecutor(ReduceTask<Out, In> reduce, const ExecutorBasePtr &parent) | 488 | ReduceExecutor<Out, In>::ReduceExecutor(ReduceTask<Out, In> reduce, ErrorHandler error, const ExecutorBasePtr &parent) |
478 | : ThenExecutor<Out, In>(reduce, ErrorHandler(), parent) | 489 | : ThenExecutor<Out, In>(reduce, error, parent) |
479 | { | 490 | { |
480 | } | 491 | } |
481 | 492 | ||
482 | template<typename Out, typename ... In> | 493 | template<typename Out, typename ... In> |
483 | SyncThenExecutor<Out, In ...>::SyncThenExecutor(SyncThenTask<Out, In ...> then, ErrorHandler errorHandler, const ExecutorBasePtr &parent) | 494 | SyncThenExecutor<Out, In ...>::SyncThenExecutor(SyncThenTask<Out, In ...> then, ErrorHandler errorHandler, const ExecutorBasePtr &parent) |
484 | : Executor<typename PreviousOut<In ...>::type, Out, In ...>(parent) | 495 | : Executor<typename PreviousOut<In ...>::type, Out, In ...>(errorHandler, parent) |
485 | { | 496 | { |
486 | this->mFunc = then; | 497 | this->mFunc = then; |
487 | this->mErrorFunc = errorHandler; | ||
488 | } | 498 | } |
489 | 499 | ||
490 | template<typename Out, typename ... In> | 500 | template<typename Out, typename ... In> |
@@ -494,27 +504,14 @@ void SyncThenExecutor<Out, In ...>::previousFutureReady() | |||
494 | assert(this->mPrevFuture->isFinished()); | 504 | assert(this->mPrevFuture->isFinished()); |
495 | } | 505 | } |
496 | 506 | ||
497 | if (this->mPrevFuture && this->mPrevFuture->errorCode()) { | 507 | Out result = this->mFunc(this->mPrevFuture ? this->mPrevFuture->value() : In() ...); |
498 | if (this->mErrorFunc) { | 508 | static_cast<Async::Future<Out>*>(this->mResult)->setValue(result); |
499 | this->mErrorFunc(this->mPrevFuture->errorCode(), this->mPrevFuture->errorMessage()); | 509 | this->mResult->setFinished(); |
500 | this->mResult->setFinished(); | ||
501 | } else { | ||
502 | static_cast<Async::Future<Out>*>(this->mResult)->setError(this->mPrevFuture->errorCode(), this->mPrevFuture->errorMessage()); | ||
503 | //propagate error if no error handler is available | ||
504 | Out result = this->mFunc(this->mPrevFuture ? this->mPrevFuture->value() : In() ...); | ||
505 | static_cast<Async::Future<Out>*>(this->mResult)->setValue(result); | ||
506 | this->mResult->setFinished(); | ||
507 | } | ||
508 | } else { | ||
509 | Out result = this->mFunc(this->mPrevFuture ? this->mPrevFuture->value() : In() ...); | ||
510 | static_cast<Async::Future<Out>*>(this->mResult)->setValue(result); | ||
511 | this->mResult->setFinished(); | ||
512 | } | ||
513 | } | 510 | } |
514 | 511 | ||
515 | template<typename PrevOut, typename Out, typename In> | 512 | template<typename PrevOut, typename Out, typename In> |
516 | SyncEachExecutor<PrevOut, Out, In>::SyncEachExecutor(SyncEachTask<Out, In> each, const ExecutorBasePtr &parent) | 513 | SyncEachExecutor<PrevOut, Out, In>::SyncEachExecutor(SyncEachTask<Out, In> each, ErrorHandler errorHandler, const ExecutorBasePtr &parent) |
517 | : Executor<PrevOut, Out, In>(parent) | 514 | : Executor<PrevOut, Out, In>(errorHandler, parent) |
518 | { | 515 | { |
519 | this->mFunc = each; | 516 | this->mFunc = each; |
520 | } | 517 | } |
@@ -536,8 +533,8 @@ void SyncEachExecutor<PrevOut, Out, In>::previousFutureReady() | |||
536 | } | 533 | } |
537 | 534 | ||
538 | template<typename Out, typename In> | 535 | template<typename Out, typename In> |
539 | SyncReduceExecutor<Out, In>::SyncReduceExecutor(SyncReduceTask<Out, In> reduce, const ExecutorBasePtr &parent) | 536 | SyncReduceExecutor<Out, In>::SyncReduceExecutor(SyncReduceTask<Out, In> reduce, ErrorHandler errorHandler, const ExecutorBasePtr &parent) |
540 | : SyncThenExecutor<Out, In>(reduce, ErrorHandler(), parent) | 537 | : SyncThenExecutor<Out, In>(reduce, errorHandler, parent) |
541 | { | 538 | { |
542 | } | 539 | } |
543 | 540 | ||