diff options
author | Dan Vrátil <dvratil@redhat.com> | 2015-04-01 21:02:34 +0200 |
---|---|---|
committer | Dan Vrátil <dvratil@redhat.com> | 2015-04-01 21:19:48 +0200 |
commit | b01fadba903056218f2a00c5e62e1dc8df062124 (patch) | |
tree | adca72675973ab5d8253ab14ff94949824d6523e /async/src | |
parent | 94729eef71570e0b792a9afb95eeab7fd1eec56b (diff) | |
download | sink-b01fadba903056218f2a00c5e62e1dc8df062124.tar.gz sink-b01fadba903056218f2a00c5e62e1dc8df062124.zip |
Async: support (re-)executing single Job multiple times
Storing Future and current Job progress directly in Executors means
that we cannot re-execute finished job, or even execute the same Job
multiple times in parallel. To do so, we need to make Executors stateless
and track the state elsewhere.
This change does that by moving the execution state from Executor to Execution
class. Executors now only describe the tasks to execute, while Execution holds
the current state of execution. New Execution is created every time Job::exec()
is called.
Execution holds reference to it's result (Future) and Executor which created
the Execution. This ensures that Executor is not deleted when Job (which owns
Executors) goes out of scope while the execution is still running. At the same
time Future holds reference to relevant Execution, so that the Execution is
deleted when all copies of Future referring result from the respective Execution
are deleted.
Diffstat (limited to 'async/src')
-rw-r--r-- | async/src/async.cpp | 4 | ||||
-rw-r--r-- | async/src/async.h | 242 | ||||
-rw-r--r-- | async/src/future.cpp | 22 | ||||
-rw-r--r-- | async/src/future.h | 71 |
4 files changed, 228 insertions, 111 deletions
diff --git a/async/src/async.cpp b/async/src/async.cpp index 20ba4e6..5e26bd8 100644 --- a/async/src/async.cpp +++ b/async/src/async.cpp | |||
@@ -26,15 +26,11 @@ using namespace Async; | |||
26 | 26 | ||
27 | Private::ExecutorBase::ExecutorBase(const ExecutorBasePtr &parent) | 27 | Private::ExecutorBase::ExecutorBase(const ExecutorBasePtr &parent) |
28 | : mPrev(parent) | 28 | : mPrev(parent) |
29 | , mResult(0) | ||
30 | , mIsRunning(false) | ||
31 | , mIsFinished(false) | ||
32 | { | 29 | { |
33 | } | 30 | } |
34 | 31 | ||
35 | Private::ExecutorBase::~ExecutorBase() | 32 | Private::ExecutorBase::~ExecutorBase() |
36 | { | 33 | { |
37 | delete mResult; | ||
38 | } | 34 | } |
39 | 35 | ||
40 | 36 | ||
diff --git a/async/src/async.h b/async/src/async.h index 2741341..c6ca9e7 100644 --- a/async/src/async.h +++ b/async/src/async.h | |||
@@ -93,6 +93,49 @@ namespace Private | |||
93 | class ExecutorBase; | 93 | class ExecutorBase; |
94 | typedef QSharedPointer<ExecutorBase> ExecutorBasePtr; | 94 | typedef QSharedPointer<ExecutorBase> ExecutorBasePtr; |
95 | 95 | ||
96 | struct Execution { | ||
97 | Execution(const ExecutorBasePtr &executor) | ||
98 | : executor(executor) | ||
99 | , resultBase(nullptr) | ||
100 | , isRunning(false) | ||
101 | , isFinished(false) | ||
102 | {} | ||
103 | |||
104 | ~Execution() | ||
105 | { | ||
106 | if (resultBase) { | ||
107 | resultBase->releaseExecution(); | ||
108 | delete resultBase; | ||
109 | } | ||
110 | prevExecution.reset(); | ||
111 | } | ||
112 | |||
113 | void setFinished() | ||
114 | { | ||
115 | isFinished = true; | ||
116 | executor.clear(); | ||
117 | } | ||
118 | |||
119 | template<typename T> | ||
120 | Async::Future<T>* result() | ||
121 | { | ||
122 | return static_cast<Async::Future<T>*>(resultBase); | ||
123 | } | ||
124 | |||
125 | void releaseFuture() | ||
126 | { | ||
127 | resultBase = 0; | ||
128 | } | ||
129 | |||
130 | ExecutorBasePtr executor; | ||
131 | FutureBase *resultBase; | ||
132 | bool isRunning; | ||
133 | bool isFinished; | ||
134 | |||
135 | ExecutionPtr prevExecution; | ||
136 | }; | ||
137 | |||
138 | typedef QSharedPointer<Execution> ExecutionPtr; | ||
96 | 139 | ||
97 | class ExecutorBase | 140 | class ExecutorBase |
98 | { | 141 | { |
@@ -104,21 +147,15 @@ class ExecutorBase | |||
104 | 147 | ||
105 | public: | 148 | public: |
106 | virtual ~ExecutorBase(); | 149 | virtual ~ExecutorBase(); |
107 | virtual void exec() = 0; | 150 | virtual ExecutionPtr exec(const ExecutorBasePtr &self) = 0; |
108 | |||
109 | inline FutureBase* result() const | ||
110 | { | ||
111 | return mResult; | ||
112 | } | ||
113 | 151 | ||
114 | protected: | 152 | protected: |
115 | ExecutorBase(const ExecutorBasePtr &parent); | 153 | ExecutorBase(const ExecutorBasePtr &parent); |
116 | 154 | ||
117 | ExecutorBasePtr mSelf; | 155 | template<typename T> |
156 | Async::Future<T>* createFuture(const ExecutionPtr &execution) const; | ||
157 | |||
118 | ExecutorBasePtr mPrev; | 158 | ExecutorBasePtr mPrev; |
119 | FutureBase *mResult; | ||
120 | bool mIsRunning; | ||
121 | bool mIsFinished; | ||
122 | }; | 159 | }; |
123 | 160 | ||
124 | template<typename PrevOut, typename Out, typename ... In> | 161 | template<typename PrevOut, typename Out, typename ... In> |
@@ -128,17 +165,13 @@ protected: | |||
128 | Executor(ErrorHandler errorHandler, const Private::ExecutorBasePtr &parent) | 165 | Executor(ErrorHandler errorHandler, const Private::ExecutorBasePtr &parent) |
129 | : ExecutorBase(parent) | 166 | : ExecutorBase(parent) |
130 | , mErrorFunc(errorHandler) | 167 | , mErrorFunc(errorHandler) |
131 | , mPrevFuture(0) | ||
132 | {} | 168 | {} |
133 | virtual ~Executor() {} | 169 | virtual ~Executor() {} |
134 | inline Async::Future<PrevOut>* chainup(); | 170 | virtual void run(const ExecutionPtr &execution) = 0; |
135 | virtual void previousFutureReady() = 0; | ||
136 | 171 | ||
137 | void exec(); | 172 | ExecutionPtr exec(const ExecutorBasePtr &self); |
138 | 173 | ||
139 | //std::function<void(const In& ..., Async::Future<Out> &)> mFunc; | ||
140 | std::function<void(int, const QString &)> mErrorFunc; | 174 | std::function<void(int, const QString &)> mErrorFunc; |
141 | Async::Future<PrevOut> *mPrevFuture; | ||
142 | }; | 175 | }; |
143 | 176 | ||
144 | template<typename Out, typename ... In> | 177 | template<typename Out, typename ... In> |
@@ -146,7 +179,7 @@ class ThenExecutor: public Executor<typename detail::prevOut<In ...>::type, Out, | |||
146 | { | 179 | { |
147 | public: | 180 | public: |
148 | ThenExecutor(ThenTask<Out, In ...> then, ErrorHandler errorHandler, const ExecutorBasePtr &parent); | 181 | ThenExecutor(ThenTask<Out, In ...> then, ErrorHandler errorHandler, const ExecutorBasePtr &parent); |
149 | void previousFutureReady(); | 182 | void run(const ExecutionPtr &execution); |
150 | private: | 183 | private: |
151 | ThenTask<Out, In ...> mFunc; | 184 | ThenTask<Out, In ...> mFunc; |
152 | }; | 185 | }; |
@@ -156,7 +189,7 @@ class EachExecutor : public Executor<PrevOut, Out, In> | |||
156 | { | 189 | { |
157 | public: | 190 | public: |
158 | EachExecutor(EachTask<Out, In> each, ErrorHandler errorHandler, const ExecutorBasePtr &parent); | 191 | EachExecutor(EachTask<Out, In> each, ErrorHandler errorHandler, const ExecutorBasePtr &parent); |
159 | void previousFutureReady(); | 192 | void run(const ExecutionPtr &execution); |
160 | private: | 193 | private: |
161 | EachTask<Out, In> mFunc; | 194 | EachTask<Out, In> mFunc; |
162 | QVector<Async::FutureWatcher<PrevOut>*> mFutureWatchers; | 195 | QVector<Async::FutureWatcher<PrevOut>*> mFutureWatchers; |
@@ -176,11 +209,11 @@ class SyncThenExecutor : public Executor<typename detail::prevOut<In ...>::type, | |||
176 | { | 209 | { |
177 | public: | 210 | public: |
178 | SyncThenExecutor(SyncThenTask<Out, In ...> then, ErrorHandler errorHandler, const ExecutorBasePtr &parent); | 211 | SyncThenExecutor(SyncThenTask<Out, In ...> then, ErrorHandler errorHandler, const ExecutorBasePtr &parent); |
179 | void previousFutureReady(); | 212 | void run(const ExecutionPtr &execution); |
180 | 213 | ||
181 | private: | 214 | private: |
182 | void run(std::false_type); // !std::is_void<Out> | 215 | void run(const ExecutionPtr &execution, std::false_type); // !std::is_void<Out> |
183 | void run(std::true_type); // std::is_void<Out> | 216 | void run(const ExecutionPtr &execution, std::true_type); // std::is_void<Out> |
184 | SyncThenTask<Out, In ...> mFunc; | 217 | SyncThenTask<Out, In ...> mFunc; |
185 | }; | 218 | }; |
186 | 219 | ||
@@ -198,7 +231,7 @@ class SyncEachExecutor : public Executor<PrevOut, Out, In> | |||
198 | { | 231 | { |
199 | public: | 232 | public: |
200 | SyncEachExecutor(SyncEachTask<Out, In> each, ErrorHandler errorHandler, const ExecutorBasePtr &parent); | 233 | SyncEachExecutor(SyncEachTask<Out, In> each, ErrorHandler errorHandler, const ExecutorBasePtr &parent); |
201 | void previousFutureReady(); | 234 | void run(const ExecutionPtr &execution); |
202 | private: | 235 | private: |
203 | void run(Async::Future<Out> *future, const typename PrevOut::value_type &arg, std::false_type); // !std::is_void<Out> | 236 | void run(Async::Future<Out> *future, const typename PrevOut::value_type &arg, std::false_type); // !std::is_void<Out> |
204 | void run(Async::Future<Out> *future, const typename PrevOut::value_type &arg, std::true_type); // std::is_void<Out> | 237 | void run(Async::Future<Out> *future, const typename PrevOut::value_type &arg, std::true_type); // std::is_void<Out> |
@@ -430,18 +463,10 @@ public: | |||
430 | 463 | ||
431 | Async::Future<Out> exec() | 464 | Async::Future<Out> exec() |
432 | { | 465 | { |
433 | // Have the top executor hold reference to itself during the execution. | 466 | Private::ExecutionPtr execution = mExecutor->exec(mExecutor); |
434 | // This ensures that even if the Job goes out of scope, the full Executor | 467 | Async::Future<Out> result = *execution->result<Out>(); |
435 | // chain will not be destroyed. | ||
436 | // The executor will remove the self-reference once it's Future is finished. | ||
437 | mExecutor->mSelf = mExecutor; | ||
438 | mExecutor->exec(); | ||
439 | return result(); | ||
440 | } | ||
441 | 468 | ||
442 | Async::Future<Out> result() const | 469 | return result; |
443 | { | ||
444 | return *static_cast<Async::Future<Out>*>(mExecutor->result()); | ||
445 | } | 470 | } |
446 | 471 | ||
447 | private: | 472 | private: |
@@ -549,74 +574,75 @@ Job<Out> error(int errorCode, const QString &errorMessage) | |||
549 | 574 | ||
550 | namespace Private { | 575 | namespace Private { |
551 | 576 | ||
552 | template<typename PrevOut, typename Out, typename ... In> | 577 | template<typename T> |
553 | Future<PrevOut>* Executor<PrevOut, Out, In ...>::chainup() | 578 | Async::Future<T>* ExecutorBase::createFuture(const ExecutionPtr &execution) const |
554 | { | 579 | { |
555 | if (mPrev) { | 580 | return new Async::Future<T>(execution); |
556 | mPrev->exec(); | ||
557 | return static_cast<Async::Future<PrevOut>*>(mPrev->result()); | ||
558 | } else { | ||
559 | return nullptr; | ||
560 | } | ||
561 | } | 581 | } |
562 | 582 | ||
563 | template<typename PrevOut, typename Out, typename ... In> | 583 | template<typename PrevOut, typename Out, typename ... In> |
564 | void Executor<PrevOut, Out, In ...>::exec() | 584 | ExecutionPtr Executor<PrevOut, Out, In ...>::exec(const ExecutorBasePtr &self) |
565 | { | 585 | { |
566 | // Don't chain up to job that already is running (or is finished) | 586 | // Passing 'self' to execution ensures that the Executor chain remains |
567 | if (mPrev && !mPrev->mIsRunning & !mPrev->mIsFinished) { | 587 | // valid until the entire execution is finished |
568 | mPrevFuture = chainup(); | 588 | ExecutionPtr execution = ExecutionPtr::create(self); |
589 | |||
590 | // chainup | ||
591 | execution->prevExecution = mPrev ? mPrev->exec(mPrev) : ExecutionPtr(); | ||
592 | /* | ||
569 | } else if (mPrev && !mPrevFuture) { | 593 | } else if (mPrev && !mPrevFuture) { |
570 | // If previous job is running or finished, just get it's future | 594 | // If previous job is running or finished, just get it's future |
571 | mPrevFuture = static_cast<Async::Future<PrevOut>*>(mPrev->result()); | 595 | mPrevFuture = static_cast<Async::Future<PrevOut>*>(mPrev->result()); |
572 | } | 596 | } |
597 | */ | ||
573 | 598 | ||
574 | // Initialize our future | 599 | execution->resultBase = this->createFuture<Out>(execution); |
575 | mResult = new Async::Future<Out>(); | ||
576 | auto fw = new Async::FutureWatcher<Out>(); | 600 | auto fw = new Async::FutureWatcher<Out>(); |
577 | QObject::connect(fw, &Async::FutureWatcher<Out>::futureReady, | 601 | QObject::connect(fw, &Async::FutureWatcher<Out>::futureReady, |
578 | [fw, this]() { | 602 | [fw, execution, this]() { |
579 | mIsFinished = true; | 603 | execution->setFinished(); |
580 | mSelf.clear(); | ||
581 | delete fw; | 604 | delete fw; |
582 | }); | 605 | }); |
583 | fw->setFuture(*static_cast<Async::Future<Out>*>(mResult)); | 606 | fw->setFuture(*execution->result<Out>()); |
584 | 607 | ||
585 | if (!mPrevFuture || mPrevFuture->isFinished()) { | 608 | Async::Future<PrevOut> *prevFuture = execution->prevExecution ? execution->prevExecution->result<PrevOut>() : nullptr; |
586 | if (mPrevFuture && mPrevFuture->errorCode() != 0) { | 609 | if (!prevFuture || prevFuture->isFinished()) { |
610 | if (prevFuture && prevFuture->errorCode() != 0) { | ||
587 | if (mErrorFunc) { | 611 | if (mErrorFunc) { |
588 | mErrorFunc(mPrevFuture->errorCode(), mPrevFuture->errorMessage()); | 612 | mErrorFunc(prevFuture->errorCode(), prevFuture->errorMessage()); |
589 | mResult->setFinished(); | 613 | execution->resultBase->setFinished(); |
590 | mIsFinished = true; | 614 | execution->setFinished(); |
591 | return; | 615 | return execution; |
592 | } else { | 616 | } else { |
593 | // Propagate the error to next caller | 617 | // Propagate the error to next caller |
594 | } | 618 | } |
595 | } | 619 | } |
596 | mIsRunning = true; | 620 | execution->isRunning = true; |
597 | previousFutureReady(); | 621 | run(execution); |
598 | } else { | 622 | } else { |
599 | auto futureWatcher = new Async::FutureWatcher<PrevOut>(); | 623 | auto futureWatcher = new Async::FutureWatcher<PrevOut>(); |
600 | QObject::connect(futureWatcher, &Async::FutureWatcher<PrevOut>::futureReady, | 624 | QObject::connect(futureWatcher, &Async::FutureWatcher<PrevOut>::futureReady, |
601 | [futureWatcher, this]() { | 625 | [futureWatcher, execution, this]() { |
602 | auto prevFuture = futureWatcher->future(); | 626 | auto prevFuture = futureWatcher->future(); |
603 | assert(prevFuture.isFinished()); | 627 | assert(prevFuture.isFinished()); |
604 | delete futureWatcher; | 628 | delete futureWatcher; |
605 | if (prevFuture.errorCode() != 0) { | 629 | if (prevFuture.errorCode() != 0) { |
606 | if (mErrorFunc) { | 630 | if (mErrorFunc) { |
607 | mErrorFunc(prevFuture.errorCode(), prevFuture.errorMessage()); | 631 | mErrorFunc(prevFuture.errorCode(), prevFuture.errorMessage()); |
608 | mResult->setFinished(); | 632 | execution->resultBase->setFinished(); |
609 | return; | 633 | return; |
610 | } else { | 634 | } else { |
611 | // Propagate the error to next caller | 635 | // Propagate the error to next caller |
612 | } | 636 | } |
613 | } | 637 | } |
614 | mIsRunning = true; | 638 | execution->isRunning = true; |
615 | previousFutureReady(); | 639 | run(execution); |
616 | }); | 640 | }); |
617 | 641 | ||
618 | futureWatcher->setFuture(*mPrevFuture); | 642 | futureWatcher->setFuture(*static_cast<Async::Future<PrevOut>*>(prevFuture)); |
619 | } | 643 | } |
644 | |||
645 | return execution; | ||
620 | } | 646 | } |
621 | 647 | ||
622 | 648 | ||
@@ -628,14 +654,15 @@ ThenExecutor<Out, In ...>::ThenExecutor(ThenTask<Out, In ...> then, ErrorHandler | |||
628 | } | 654 | } |
629 | 655 | ||
630 | template<typename Out, typename ... In> | 656 | template<typename Out, typename ... In> |
631 | void ThenExecutor<Out, In ...>::previousFutureReady() | 657 | void ThenExecutor<Out, In ...>::run(const ExecutionPtr &execution) |
632 | { | 658 | { |
633 | if (this->mPrevFuture) { | 659 | Async::Future<typename detail::prevOut<In ...>::type> *prevFuture = nullptr; |
634 | assert(this->mPrevFuture->isFinished()); | 660 | if (execution->prevExecution) { |
661 | prevFuture = execution->prevExecution->result<typename detail::prevOut<In ...>::type>(); | ||
662 | assert(prevFuture->isFinished()); | ||
635 | } | 663 | } |
636 | 664 | ||
637 | this->mFunc(this->mPrevFuture ? this->mPrevFuture->value() : In() ..., | 665 | this->mFunc(prevFuture ? prevFuture->value() : In() ..., *execution->result<Out>()); |
638 | *static_cast<Async::Future<Out>*>(this->mResult)); | ||
639 | } | 666 | } |
640 | 667 | ||
641 | template<typename PrevOut, typename Out, typename In> | 668 | template<typename PrevOut, typename Out, typename In> |
@@ -646,33 +673,37 @@ EachExecutor<PrevOut, Out, In>::EachExecutor(EachTask<Out, In> each, ErrorHandle | |||
646 | } | 673 | } |
647 | 674 | ||
648 | template<typename PrevOut, typename Out, typename In> | 675 | template<typename PrevOut, typename Out, typename In> |
649 | void EachExecutor<PrevOut, Out, In>::previousFutureReady() | 676 | void EachExecutor<PrevOut, Out, In>::run(const ExecutionPtr &execution) |
650 | { | 677 | { |
651 | assert(this->mPrevFuture->isFinished()); | 678 | assert(execution->prevExecution); |
652 | auto out = static_cast<Async::Future<Out>*>(this->mResult); | 679 | auto prevFuture = execution->prevExecution->result<PrevOut>(); |
653 | if (this->mPrevFuture->value().isEmpty()) { | 680 | assert(prevFuture->isFinished()); |
681 | |||
682 | auto out = execution->result<Out>(); | ||
683 | if (prevFuture->value().isEmpty()) { | ||
654 | out->setFinished(); | 684 | out->setFinished(); |
655 | return; | 685 | return; |
656 | } | 686 | } |
657 | 687 | ||
658 | for (auto arg : this->mPrevFuture->value()) { | 688 | for (auto arg : prevFuture->value()) { |
659 | auto future = new Async::Future<Out>; | 689 | Async::Future<Out> future; |
660 | this->mFunc(arg, *future); | 690 | this->mFunc(arg, future); |
661 | auto fw = new Async::FutureWatcher<Out>(); | 691 | auto fw = new Async::FutureWatcher<Out>(); |
662 | mFutureWatchers.append(fw); | 692 | mFutureWatchers.append(fw); |
663 | QObject::connect(fw, &Async::FutureWatcher<Out>::futureReady, | 693 | QObject::connect(fw, &Async::FutureWatcher<Out>::futureReady, |
664 | [out, future, fw, this]() { | 694 | [out, fw, this]() { |
665 | assert(future->isFinished()); | 695 | auto future = fw->future(); |
696 | assert(future.isFinished()); | ||
666 | const int index = mFutureWatchers.indexOf(fw); | 697 | const int index = mFutureWatchers.indexOf(fw); |
667 | assert(index > -1); | 698 | assert(index > -1); |
668 | mFutureWatchers.removeAt(index); | 699 | mFutureWatchers.removeAt(index); |
669 | out->setValue(out->value() + future->value()); | 700 | out->setValue(out->value() + future.value()); |
670 | delete future; | ||
671 | if (mFutureWatchers.isEmpty()) { | 701 | if (mFutureWatchers.isEmpty()) { |
672 | out->setFinished(); | 702 | out->setFinished(); |
673 | } | 703 | } |
704 | delete fw; | ||
674 | }); | 705 | }); |
675 | fw->setFuture(*future); | 706 | fw->setFuture(future); |
676 | } | 707 | } |
677 | } | 708 | } |
678 | 709 | ||
@@ -690,27 +721,37 @@ SyncThenExecutor<Out, In ...>::SyncThenExecutor(SyncThenTask<Out, In ...> then, | |||
690 | } | 721 | } |
691 | 722 | ||
692 | template<typename Out, typename ... In> | 723 | template<typename Out, typename ... In> |
693 | void SyncThenExecutor<Out, In ...>::previousFutureReady() | 724 | void SyncThenExecutor<Out, In ...>::run(const ExecutionPtr &execution) |
694 | { | 725 | { |
695 | if (this->mPrevFuture) { | 726 | if (execution->prevExecution) { |
696 | assert(this->mPrevFuture->isFinished()); | 727 | assert(execution->prevExecution->resultBase->isFinished()); |
697 | } | 728 | } |
698 | 729 | ||
699 | run(std::is_void<Out>()); | 730 | run(execution, std::is_void<Out>()); |
700 | this->mResult->setFinished(); | 731 | execution->resultBase->setFinished(); |
701 | } | 732 | } |
702 | 733 | ||
703 | template<typename Out, typename ... In> | 734 | template<typename Out, typename ... In> |
704 | void SyncThenExecutor<Out, In ...>::run(std::false_type) | 735 | void SyncThenExecutor<Out, In ...>::run(const ExecutionPtr &execution, std::false_type) |
705 | { | 736 | { |
706 | Out result = this->mFunc(this->mPrevFuture ? this->mPrevFuture->value() : In() ...); | 737 | Async::Future<typename detail::prevOut<In ...>::type> *prevFuture = |
707 | static_cast<Async::Future<Out>*>(this->mResult)->setValue(result); | 738 | execution->prevExecution |
739 | ? execution->prevExecution->result<typename detail::prevOut<In ...>::type>() | ||
740 | : nullptr; | ||
741 | (void) prevFuture; // silence 'set but not used' warning | ||
742 | Async::Future<Out> *future = execution->result<Out>(); | ||
743 | future->setValue(this->mFunc(prevFuture ? prevFuture->value() : In() ...)); | ||
708 | } | 744 | } |
709 | 745 | ||
710 | template<typename Out, typename ... In> | 746 | template<typename Out, typename ... In> |
711 | void SyncThenExecutor<Out, In ...>::run(std::true_type) | 747 | void SyncThenExecutor<Out, In ...>::run(const ExecutionPtr &execution, std::true_type) |
712 | { | 748 | { |
713 | this->mFunc(this->mPrevFuture ? this->mPrevFuture->value() : In() ...); | 749 | Async::Future<typename detail::prevOut<In ...>::type> *prevFuture = |
750 | execution->prevExecution | ||
751 | ? execution->prevExecution->result<typename detail::prevOut<In ...>::type>() | ||
752 | : nullptr; | ||
753 | (void) prevFuture; // silence 'set but not used' warning | ||
754 | this->mFunc(prevFuture ? prevFuture->value() : In() ...); | ||
714 | } | 755 | } |
715 | 756 | ||
716 | template<typename PrevOut, typename Out, typename In> | 757 | template<typename PrevOut, typename Out, typename In> |
@@ -721,16 +762,19 @@ SyncEachExecutor<PrevOut, Out, In>::SyncEachExecutor(SyncEachTask<Out, In> each, | |||
721 | } | 762 | } |
722 | 763 | ||
723 | template<typename PrevOut, typename Out, typename In> | 764 | template<typename PrevOut, typename Out, typename In> |
724 | void SyncEachExecutor<PrevOut, Out, In>::previousFutureReady() | 765 | void SyncEachExecutor<PrevOut, Out, In>::run(const ExecutionPtr &execution) |
725 | { | 766 | { |
726 | assert(this->mPrevFuture->isFinished()); | 767 | assert(execution->prevExecution); |
727 | auto out = static_cast<Async::Future<Out>*>(this->mResult); | 768 | auto *prevFuture = execution->prevExecution->result<PrevOut>(); |
728 | if (this->mPrevFuture->value().isEmpty()) { | 769 | assert(prevFuture->isFinished()); |
770 | |||
771 | auto out = execution->result<Out>(); | ||
772 | if (prevFuture->value().isEmpty()) { | ||
729 | out->setFinished(); | 773 | out->setFinished(); |
730 | return; | 774 | return; |
731 | } | 775 | } |
732 | 776 | ||
733 | for (auto arg : this->mPrevFuture->value()) { | 777 | for (auto arg : prevFuture->value()) { |
734 | run(out, arg, std::is_void<Out>()); | 778 | run(out, arg, std::is_void<Out>()); |
735 | } | 779 | } |
736 | out->setFinished(); | 780 | out->setFinished(); |
@@ -743,7 +787,7 @@ void SyncEachExecutor<PrevOut, Out, In>::run(Async::Future<Out> *out, const type | |||
743 | } | 787 | } |
744 | 788 | ||
745 | template<typename PrevOut, typename Out, typename In> | 789 | template<typename PrevOut, typename Out, typename In> |
746 | void SyncEachExecutor<PrevOut, Out, In>::run(Async::Future<Out> * /* unushed */, const typename PrevOut::value_type &arg, std::true_type) | 790 | void SyncEachExecutor<PrevOut, Out, In>::run(Async::Future<Out> * /* unused */, const typename PrevOut::value_type &arg, std::true_type) |
747 | { | 791 | { |
748 | this->mFunc(arg); | 792 | this->mFunc(arg); |
749 | } | 793 | } |
diff --git a/async/src/future.cpp b/async/src/future.cpp index ab02baf..50a326a 100644 --- a/async/src/future.cpp +++ b/async/src/future.cpp | |||
@@ -16,6 +16,7 @@ | |||
16 | */ | 16 | */ |
17 | 17 | ||
18 | #include "future.h" | 18 | #include "future.h" |
19 | #include "async.h" | ||
19 | 20 | ||
20 | using namespace Async; | 21 | using namespace Async; |
21 | 22 | ||
@@ -31,6 +32,27 @@ FutureBase::~FutureBase() | |||
31 | { | 32 | { |
32 | } | 33 | } |
33 | 34 | ||
35 | FutureBase::PrivateBase::PrivateBase(const Private::ExecutionPtr &execution) | ||
36 | : mExecution(execution) | ||
37 | { | ||
38 | } | ||
39 | |||
40 | FutureBase::PrivateBase::~PrivateBase() | ||
41 | { | ||
42 | Private::ExecutionPtr executionPtr = mExecution.toStrongRef(); | ||
43 | if (executionPtr) { | ||
44 | executionPtr->releaseFuture(); | ||
45 | releaseExecution(); | ||
46 | } | ||
47 | } | ||
48 | |||
49 | void FutureBase::PrivateBase::releaseExecution() | ||
50 | { | ||
51 | mExecution.clear(); | ||
52 | } | ||
53 | |||
54 | |||
55 | |||
34 | FutureWatcherBase::FutureWatcherBase(QObject *parent) | 56 | FutureWatcherBase::FutureWatcherBase(QObject *parent) |
35 | : QObject(parent) | 57 | : QObject(parent) |
36 | { | 58 | { |
diff --git a/async/src/future.h b/async/src/future.h index b580b5a..cadd96d 100644 --- a/async/src/future.h +++ b/async/src/future.h | |||
@@ -29,8 +29,18 @@ class QEventLoop; | |||
29 | 29 | ||
30 | namespace Async { | 30 | namespace Async { |
31 | 31 | ||
32 | namespace Private { | ||
33 | class Execution; | ||
34 | class ExecutorBase; | ||
35 | |||
36 | typedef QSharedPointer<Execution> ExecutionPtr; | ||
37 | |||
38 | } | ||
39 | |||
32 | class FutureBase | 40 | class FutureBase |
33 | { | 41 | { |
42 | friend class Async::Private::Execution; | ||
43 | |||
34 | public: | 44 | public: |
35 | virtual ~FutureBase(); | 45 | virtual ~FutureBase(); |
36 | 46 | ||
@@ -39,6 +49,20 @@ public: | |||
39 | virtual void setError(int code = 1, const QString &message = QString()) = 0; | 49 | virtual void setError(int code = 1, const QString &message = QString()) = 0; |
40 | 50 | ||
41 | protected: | 51 | protected: |
52 | virtual void releaseExecution() = 0; | ||
53 | |||
54 | class PrivateBase : public QSharedData | ||
55 | { | ||
56 | public: | ||
57 | PrivateBase(const Async::Private::ExecutionPtr &execution); | ||
58 | virtual ~PrivateBase(); | ||
59 | |||
60 | void releaseExecution(); | ||
61 | |||
62 | private: | ||
63 | QWeakPointer<Async::Private::Execution> mExecution; | ||
64 | }; | ||
65 | |||
42 | FutureBase(); | 66 | FutureBase(); |
43 | FutureBase(const FutureBase &other); | 67 | FutureBase(const FutureBase &other); |
44 | }; | 68 | }; |
@@ -104,9 +128,9 @@ public: | |||
104 | } | 128 | } |
105 | 129 | ||
106 | protected: | 130 | protected: |
107 | FutureGeneric() | 131 | FutureGeneric(const Async::Private::ExecutionPtr &execution) |
108 | : FutureBase() | 132 | : FutureBase() |
109 | , d(new Private) | 133 | , d(new Private(execution)) |
110 | {} | 134 | {} |
111 | 135 | ||
112 | FutureGeneric(const FutureGeneric<T> &other) | 136 | FutureGeneric(const FutureGeneric<T> &other) |
@@ -114,10 +138,15 @@ protected: | |||
114 | , d(other.d) | 138 | , d(other.d) |
115 | {} | 139 | {} |
116 | 140 | ||
117 | class Private : public QSharedData | 141 | class Private : public FutureBase::PrivateBase |
118 | { | 142 | { |
119 | public: | 143 | public: |
120 | Private() : QSharedData(), finished(false), errorCode(0) {} | 144 | Private(const Async::Private::ExecutionPtr &execution) |
145 | : FutureBase::PrivateBase(execution) | ||
146 | , finished(false) | ||
147 | , errorCode(0) | ||
148 | {} | ||
149 | |||
121 | typename std::conditional<std::is_void<T>::value, int /* dummy */, T>::type | 150 | typename std::conditional<std::is_void<T>::value, int /* dummy */, T>::type |
122 | value; | 151 | value; |
123 | 152 | ||
@@ -129,6 +158,11 @@ protected: | |||
129 | 158 | ||
130 | QExplicitlySharedDataPointer<Private> d; | 159 | QExplicitlySharedDataPointer<Private> d; |
131 | 160 | ||
161 | void releaseExecution() | ||
162 | { | ||
163 | d->releaseExecution(); | ||
164 | } | ||
165 | |||
132 | void addWatcher(FutureWatcher<T> *watcher) | 166 | void addWatcher(FutureWatcher<T> *watcher) |
133 | { | 167 | { |
134 | d->watchers.append(QPointer<FutureWatcher<T>>(watcher)); | 168 | d->watchers.append(QPointer<FutureWatcher<T>>(watcher)); |
@@ -138,9 +172,14 @@ protected: | |||
138 | template<typename T> | 172 | template<typename T> |
139 | class Future : public FutureGeneric<T> | 173 | class Future : public FutureGeneric<T> |
140 | { | 174 | { |
175 | friend class Async::Private::ExecutorBase; | ||
176 | |||
177 | template<typename T_> | ||
178 | friend class Async::FutureWatcher; | ||
179 | |||
141 | public: | 180 | public: |
142 | Future() | 181 | Future() |
143 | : FutureGeneric<T>() | 182 | : FutureGeneric<T>(Async::Private::ExecutionPtr()) |
144 | {} | 183 | {} |
145 | 184 | ||
146 | Future(const Future<T> &other) | 185 | Future(const Future<T> &other) |
@@ -156,19 +195,35 @@ public: | |||
156 | { | 195 | { |
157 | return this->d->value; | 196 | return this->d->value; |
158 | } | 197 | } |
198 | |||
199 | protected: | ||
200 | Future(const Async::Private::ExecutionPtr &execution) | ||
201 | : FutureGeneric<T>(execution) | ||
202 | {} | ||
203 | |||
159 | }; | 204 | }; |
160 | 205 | ||
161 | template<> | 206 | template<> |
162 | class Future<void> : public FutureGeneric<void> | 207 | class Future<void> : public FutureGeneric<void> |
163 | { | 208 | { |
209 | friend class Async::Private::ExecutorBase; | ||
210 | |||
211 | template<typename T_> | ||
212 | friend class Async::FutureWatcher; | ||
213 | |||
164 | public: | 214 | public: |
165 | Future() | 215 | Future() |
166 | : FutureGeneric<void>() | 216 | : FutureGeneric<void>(Async::Private::ExecutionPtr()) |
167 | {} | 217 | {} |
168 | 218 | ||
169 | Future(const Future<void> &other) | 219 | Future(const Future<void> &other) |
170 | : FutureGeneric<void>(other) | 220 | : FutureGeneric<void>(other) |
171 | {} | 221 | {} |
222 | |||
223 | protected: | ||
224 | Future(const Async::Private::ExecutionPtr &execution) | ||
225 | : FutureGeneric<void>(execution) | ||
226 | {} | ||
172 | }; | 227 | }; |
173 | 228 | ||
174 | 229 | ||
@@ -177,7 +232,7 @@ class FutureWatcherBase : public QObject | |||
177 | Q_OBJECT | 232 | Q_OBJECT |
178 | 233 | ||
179 | protected: | 234 | protected: |
180 | FutureWatcherBase(QObject *parent = 0); | 235 | FutureWatcherBase(QObject *parent = nullptr); |
181 | virtual ~FutureWatcherBase(); | 236 | virtual ~FutureWatcherBase(); |
182 | 237 | ||
183 | Q_SIGNALS: | 238 | Q_SIGNALS: |
@@ -190,7 +245,7 @@ class FutureWatcher : public FutureWatcherBase | |||
190 | friend class Async::FutureGeneric<T>; | 245 | friend class Async::FutureGeneric<T>; |
191 | 246 | ||
192 | public: | 247 | public: |
193 | FutureWatcher(QObject *parent = 0) | 248 | FutureWatcher(QObject *parent = nullptr) |
194 | : FutureWatcherBase(parent) | 249 | : FutureWatcherBase(parent) |
195 | {} | 250 | {} |
196 | 251 | ||