summaryrefslogtreecommitdiffstats
path: root/async/src
diff options
context:
space:
mode:
authorDan Vrátil <dvratil@redhat.com>2015-04-01 21:02:34 +0200
committerDan Vrátil <dvratil@redhat.com>2015-04-01 21:19:48 +0200
commitb01fadba903056218f2a00c5e62e1dc8df062124 (patch)
treeadca72675973ab5d8253ab14ff94949824d6523e /async/src
parent94729eef71570e0b792a9afb95eeab7fd1eec56b (diff)
downloadsink-b01fadba903056218f2a00c5e62e1dc8df062124.tar.gz
sink-b01fadba903056218f2a00c5e62e1dc8df062124.zip
Async: support (re-)executing single Job multiple times
Storing Future and current Job progress directly in Executors means that we cannot re-execute finished job, or even execute the same Job multiple times in parallel. To do so, we need to make Executors stateless and track the state elsewhere. This change does that by moving the execution state from Executor to Execution class. Executors now only describe the tasks to execute, while Execution holds the current state of execution. New Execution is created every time Job::exec() is called. Execution holds reference to it's result (Future) and Executor which created the Execution. This ensures that Executor is not deleted when Job (which owns Executors) goes out of scope while the execution is still running. At the same time Future holds reference to relevant Execution, so that the Execution is deleted when all copies of Future referring result from the respective Execution are deleted.
Diffstat (limited to 'async/src')
-rw-r--r--async/src/async.cpp4
-rw-r--r--async/src/async.h242
-rw-r--r--async/src/future.cpp22
-rw-r--r--async/src/future.h71
4 files changed, 228 insertions, 111 deletions
diff --git a/async/src/async.cpp b/async/src/async.cpp
index 20ba4e6..5e26bd8 100644
--- a/async/src/async.cpp
+++ b/async/src/async.cpp
@@ -26,15 +26,11 @@ using namespace Async;
26 26
27Private::ExecutorBase::ExecutorBase(const ExecutorBasePtr &parent) 27Private::ExecutorBase::ExecutorBase(const ExecutorBasePtr &parent)
28 : mPrev(parent) 28 : mPrev(parent)
29 , mResult(0)
30 , mIsRunning(false)
31 , mIsFinished(false)
32{ 29{
33} 30}
34 31
35Private::ExecutorBase::~ExecutorBase() 32Private::ExecutorBase::~ExecutorBase()
36{ 33{
37 delete mResult;
38} 34}
39 35
40 36
diff --git a/async/src/async.h b/async/src/async.h
index 2741341..c6ca9e7 100644
--- a/async/src/async.h
+++ b/async/src/async.h
@@ -93,6 +93,49 @@ namespace Private
93class ExecutorBase; 93class ExecutorBase;
94typedef QSharedPointer<ExecutorBase> ExecutorBasePtr; 94typedef QSharedPointer<ExecutorBase> ExecutorBasePtr;
95 95
96struct Execution {
97 Execution(const ExecutorBasePtr &executor)
98 : executor(executor)
99 , resultBase(nullptr)
100 , isRunning(false)
101 , isFinished(false)
102 {}
103
104 ~Execution()
105 {
106 if (resultBase) {
107 resultBase->releaseExecution();
108 delete resultBase;
109 }
110 prevExecution.reset();
111 }
112
113 void setFinished()
114 {
115 isFinished = true;
116 executor.clear();
117 }
118
119 template<typename T>
120 Async::Future<T>* result()
121 {
122 return static_cast<Async::Future<T>*>(resultBase);
123 }
124
125 void releaseFuture()
126 {
127 resultBase = 0;
128 }
129
130 ExecutorBasePtr executor;
131 FutureBase *resultBase;
132 bool isRunning;
133 bool isFinished;
134
135 ExecutionPtr prevExecution;
136};
137
138typedef QSharedPointer<Execution> ExecutionPtr;
96 139
97class ExecutorBase 140class ExecutorBase
98{ 141{
@@ -104,21 +147,15 @@ class ExecutorBase
104 147
105public: 148public:
106 virtual ~ExecutorBase(); 149 virtual ~ExecutorBase();
107 virtual void exec() = 0; 150 virtual ExecutionPtr exec(const ExecutorBasePtr &self) = 0;
108
109 inline FutureBase* result() const
110 {
111 return mResult;
112 }
113 151
114protected: 152protected:
115 ExecutorBase(const ExecutorBasePtr &parent); 153 ExecutorBase(const ExecutorBasePtr &parent);
116 154
117 ExecutorBasePtr mSelf; 155 template<typename T>
156 Async::Future<T>* createFuture(const ExecutionPtr &execution) const;
157
118 ExecutorBasePtr mPrev; 158 ExecutorBasePtr mPrev;
119 FutureBase *mResult;
120 bool mIsRunning;
121 bool mIsFinished;
122}; 159};
123 160
124template<typename PrevOut, typename Out, typename ... In> 161template<typename PrevOut, typename Out, typename ... In>
@@ -128,17 +165,13 @@ protected:
128 Executor(ErrorHandler errorHandler, const Private::ExecutorBasePtr &parent) 165 Executor(ErrorHandler errorHandler, const Private::ExecutorBasePtr &parent)
129 : ExecutorBase(parent) 166 : ExecutorBase(parent)
130 , mErrorFunc(errorHandler) 167 , mErrorFunc(errorHandler)
131 , mPrevFuture(0)
132 {} 168 {}
133 virtual ~Executor() {} 169 virtual ~Executor() {}
134 inline Async::Future<PrevOut>* chainup(); 170 virtual void run(const ExecutionPtr &execution) = 0;
135 virtual void previousFutureReady() = 0;
136 171
137 void exec(); 172 ExecutionPtr exec(const ExecutorBasePtr &self);
138 173
139 //std::function<void(const In& ..., Async::Future<Out> &)> mFunc;
140 std::function<void(int, const QString &)> mErrorFunc; 174 std::function<void(int, const QString &)> mErrorFunc;
141 Async::Future<PrevOut> *mPrevFuture;
142}; 175};
143 176
144template<typename Out, typename ... In> 177template<typename Out, typename ... In>
@@ -146,7 +179,7 @@ class ThenExecutor: public Executor<typename detail::prevOut<In ...>::type, Out,
146{ 179{
147public: 180public:
148 ThenExecutor(ThenTask<Out, In ...> then, ErrorHandler errorHandler, const ExecutorBasePtr &parent); 181 ThenExecutor(ThenTask<Out, In ...> then, ErrorHandler errorHandler, const ExecutorBasePtr &parent);
149 void previousFutureReady(); 182 void run(const ExecutionPtr &execution);
150private: 183private:
151 ThenTask<Out, In ...> mFunc; 184 ThenTask<Out, In ...> mFunc;
152}; 185};
@@ -156,7 +189,7 @@ class EachExecutor : public Executor<PrevOut, Out, In>
156{ 189{
157public: 190public:
158 EachExecutor(EachTask<Out, In> each, ErrorHandler errorHandler, const ExecutorBasePtr &parent); 191 EachExecutor(EachTask<Out, In> each, ErrorHandler errorHandler, const ExecutorBasePtr &parent);
159 void previousFutureReady(); 192 void run(const ExecutionPtr &execution);
160private: 193private:
161 EachTask<Out, In> mFunc; 194 EachTask<Out, In> mFunc;
162 QVector<Async::FutureWatcher<PrevOut>*> mFutureWatchers; 195 QVector<Async::FutureWatcher<PrevOut>*> mFutureWatchers;
@@ -176,11 +209,11 @@ class SyncThenExecutor : public Executor<typename detail::prevOut<In ...>::type,
176{ 209{
177public: 210public:
178 SyncThenExecutor(SyncThenTask<Out, In ...> then, ErrorHandler errorHandler, const ExecutorBasePtr &parent); 211 SyncThenExecutor(SyncThenTask<Out, In ...> then, ErrorHandler errorHandler, const ExecutorBasePtr &parent);
179 void previousFutureReady(); 212 void run(const ExecutionPtr &execution);
180 213
181private: 214private:
182 void run(std::false_type); // !std::is_void<Out> 215 void run(const ExecutionPtr &execution, std::false_type); // !std::is_void<Out>
183 void run(std::true_type); // std::is_void<Out> 216 void run(const ExecutionPtr &execution, std::true_type); // std::is_void<Out>
184 SyncThenTask<Out, In ...> mFunc; 217 SyncThenTask<Out, In ...> mFunc;
185}; 218};
186 219
@@ -198,7 +231,7 @@ class SyncEachExecutor : public Executor<PrevOut, Out, In>
198{ 231{
199public: 232public:
200 SyncEachExecutor(SyncEachTask<Out, In> each, ErrorHandler errorHandler, const ExecutorBasePtr &parent); 233 SyncEachExecutor(SyncEachTask<Out, In> each, ErrorHandler errorHandler, const ExecutorBasePtr &parent);
201 void previousFutureReady(); 234 void run(const ExecutionPtr &execution);
202private: 235private:
203 void run(Async::Future<Out> *future, const typename PrevOut::value_type &arg, std::false_type); // !std::is_void<Out> 236 void run(Async::Future<Out> *future, const typename PrevOut::value_type &arg, std::false_type); // !std::is_void<Out>
204 void run(Async::Future<Out> *future, const typename PrevOut::value_type &arg, std::true_type); // std::is_void<Out> 237 void run(Async::Future<Out> *future, const typename PrevOut::value_type &arg, std::true_type); // std::is_void<Out>
@@ -430,18 +463,10 @@ public:
430 463
431 Async::Future<Out> exec() 464 Async::Future<Out> exec()
432 { 465 {
433 // Have the top executor hold reference to itself during the execution. 466 Private::ExecutionPtr execution = mExecutor->exec(mExecutor);
434 // This ensures that even if the Job goes out of scope, the full Executor 467 Async::Future<Out> result = *execution->result<Out>();
435 // chain will not be destroyed.
436 // The executor will remove the self-reference once it's Future is finished.
437 mExecutor->mSelf = mExecutor;
438 mExecutor->exec();
439 return result();
440 }
441 468
442 Async::Future<Out> result() const 469 return result;
443 {
444 return *static_cast<Async::Future<Out>*>(mExecutor->result());
445 } 470 }
446 471
447private: 472private:
@@ -549,74 +574,75 @@ Job<Out> error(int errorCode, const QString &errorMessage)
549 574
550namespace Private { 575namespace Private {
551 576
552template<typename PrevOut, typename Out, typename ... In> 577template<typename T>
553Future<PrevOut>* Executor<PrevOut, Out, In ...>::chainup() 578Async::Future<T>* ExecutorBase::createFuture(const ExecutionPtr &execution) const
554{ 579{
555 if (mPrev) { 580 return new Async::Future<T>(execution);
556 mPrev->exec();
557 return static_cast<Async::Future<PrevOut>*>(mPrev->result());
558 } else {
559 return nullptr;
560 }
561} 581}
562 582
563template<typename PrevOut, typename Out, typename ... In> 583template<typename PrevOut, typename Out, typename ... In>
564void Executor<PrevOut, Out, In ...>::exec() 584ExecutionPtr Executor<PrevOut, Out, In ...>::exec(const ExecutorBasePtr &self)
565{ 585{
566 // Don't chain up to job that already is running (or is finished) 586 // Passing 'self' to execution ensures that the Executor chain remains
567 if (mPrev && !mPrev->mIsRunning & !mPrev->mIsFinished) { 587 // valid until the entire execution is finished
568 mPrevFuture = chainup(); 588 ExecutionPtr execution = ExecutionPtr::create(self);
589
590 // chainup
591 execution->prevExecution = mPrev ? mPrev->exec(mPrev) : ExecutionPtr();
592 /*
569 } else if (mPrev && !mPrevFuture) { 593 } else if (mPrev && !mPrevFuture) {
570 // If previous job is running or finished, just get it's future 594 // If previous job is running or finished, just get it's future
571 mPrevFuture = static_cast<Async::Future<PrevOut>*>(mPrev->result()); 595 mPrevFuture = static_cast<Async::Future<PrevOut>*>(mPrev->result());
572 } 596 }
597 */
573 598
574 // Initialize our future 599 execution->resultBase = this->createFuture<Out>(execution);
575 mResult = new Async::Future<Out>();
576 auto fw = new Async::FutureWatcher<Out>(); 600 auto fw = new Async::FutureWatcher<Out>();
577 QObject::connect(fw, &Async::FutureWatcher<Out>::futureReady, 601 QObject::connect(fw, &Async::FutureWatcher<Out>::futureReady,
578 [fw, this]() { 602 [fw, execution, this]() {
579 mIsFinished = true; 603 execution->setFinished();
580 mSelf.clear();
581 delete fw; 604 delete fw;
582 }); 605 });
583 fw->setFuture(*static_cast<Async::Future<Out>*>(mResult)); 606 fw->setFuture(*execution->result<Out>());
584 607
585 if (!mPrevFuture || mPrevFuture->isFinished()) { 608 Async::Future<PrevOut> *prevFuture = execution->prevExecution ? execution->prevExecution->result<PrevOut>() : nullptr;
586 if (mPrevFuture && mPrevFuture->errorCode() != 0) { 609 if (!prevFuture || prevFuture->isFinished()) {
610 if (prevFuture && prevFuture->errorCode() != 0) {
587 if (mErrorFunc) { 611 if (mErrorFunc) {
588 mErrorFunc(mPrevFuture->errorCode(), mPrevFuture->errorMessage()); 612 mErrorFunc(prevFuture->errorCode(), prevFuture->errorMessage());
589 mResult->setFinished(); 613 execution->resultBase->setFinished();
590 mIsFinished = true; 614 execution->setFinished();
591 return; 615 return execution;
592 } else { 616 } else {
593 // Propagate the error to next caller 617 // Propagate the error to next caller
594 } 618 }
595 } 619 }
596 mIsRunning = true; 620 execution->isRunning = true;
597 previousFutureReady(); 621 run(execution);
598 } else { 622 } else {
599 auto futureWatcher = new Async::FutureWatcher<PrevOut>(); 623 auto futureWatcher = new Async::FutureWatcher<PrevOut>();
600 QObject::connect(futureWatcher, &Async::FutureWatcher<PrevOut>::futureReady, 624 QObject::connect(futureWatcher, &Async::FutureWatcher<PrevOut>::futureReady,
601 [futureWatcher, this]() { 625 [futureWatcher, execution, this]() {
602 auto prevFuture = futureWatcher->future(); 626 auto prevFuture = futureWatcher->future();
603 assert(prevFuture.isFinished()); 627 assert(prevFuture.isFinished());
604 delete futureWatcher; 628 delete futureWatcher;
605 if (prevFuture.errorCode() != 0) { 629 if (prevFuture.errorCode() != 0) {
606 if (mErrorFunc) { 630 if (mErrorFunc) {
607 mErrorFunc(prevFuture.errorCode(), prevFuture.errorMessage()); 631 mErrorFunc(prevFuture.errorCode(), prevFuture.errorMessage());
608 mResult->setFinished(); 632 execution->resultBase->setFinished();
609 return; 633 return;
610 } else { 634 } else {
611 // Propagate the error to next caller 635 // Propagate the error to next caller
612 } 636 }
613 } 637 }
614 mIsRunning = true; 638 execution->isRunning = true;
615 previousFutureReady(); 639 run(execution);
616 }); 640 });
617 641
618 futureWatcher->setFuture(*mPrevFuture); 642 futureWatcher->setFuture(*static_cast<Async::Future<PrevOut>*>(prevFuture));
619 } 643 }
644
645 return execution;
620} 646}
621 647
622 648
@@ -628,14 +654,15 @@ ThenExecutor<Out, In ...>::ThenExecutor(ThenTask<Out, In ...> then, ErrorHandler
628} 654}
629 655
630template<typename Out, typename ... In> 656template<typename Out, typename ... In>
631void ThenExecutor<Out, In ...>::previousFutureReady() 657void ThenExecutor<Out, In ...>::run(const ExecutionPtr &execution)
632{ 658{
633 if (this->mPrevFuture) { 659 Async::Future<typename detail::prevOut<In ...>::type> *prevFuture = nullptr;
634 assert(this->mPrevFuture->isFinished()); 660 if (execution->prevExecution) {
661 prevFuture = execution->prevExecution->result<typename detail::prevOut<In ...>::type>();
662 assert(prevFuture->isFinished());
635 } 663 }
636 664
637 this->mFunc(this->mPrevFuture ? this->mPrevFuture->value() : In() ..., 665 this->mFunc(prevFuture ? prevFuture->value() : In() ..., *execution->result<Out>());
638 *static_cast<Async::Future<Out>*>(this->mResult));
639} 666}
640 667
641template<typename PrevOut, typename Out, typename In> 668template<typename PrevOut, typename Out, typename In>
@@ -646,33 +673,37 @@ EachExecutor<PrevOut, Out, In>::EachExecutor(EachTask<Out, In> each, ErrorHandle
646} 673}
647 674
648template<typename PrevOut, typename Out, typename In> 675template<typename PrevOut, typename Out, typename In>
649void EachExecutor<PrevOut, Out, In>::previousFutureReady() 676void EachExecutor<PrevOut, Out, In>::run(const ExecutionPtr &execution)
650{ 677{
651 assert(this->mPrevFuture->isFinished()); 678 assert(execution->prevExecution);
652 auto out = static_cast<Async::Future<Out>*>(this->mResult); 679 auto prevFuture = execution->prevExecution->result<PrevOut>();
653 if (this->mPrevFuture->value().isEmpty()) { 680 assert(prevFuture->isFinished());
681
682 auto out = execution->result<Out>();
683 if (prevFuture->value().isEmpty()) {
654 out->setFinished(); 684 out->setFinished();
655 return; 685 return;
656 } 686 }
657 687
658 for (auto arg : this->mPrevFuture->value()) { 688 for (auto arg : prevFuture->value()) {
659 auto future = new Async::Future<Out>; 689 Async::Future<Out> future;
660 this->mFunc(arg, *future); 690 this->mFunc(arg, future);
661 auto fw = new Async::FutureWatcher<Out>(); 691 auto fw = new Async::FutureWatcher<Out>();
662 mFutureWatchers.append(fw); 692 mFutureWatchers.append(fw);
663 QObject::connect(fw, &Async::FutureWatcher<Out>::futureReady, 693 QObject::connect(fw, &Async::FutureWatcher<Out>::futureReady,
664 [out, future, fw, this]() { 694 [out, fw, this]() {
665 assert(future->isFinished()); 695 auto future = fw->future();
696 assert(future.isFinished());
666 const int index = mFutureWatchers.indexOf(fw); 697 const int index = mFutureWatchers.indexOf(fw);
667 assert(index > -1); 698 assert(index > -1);
668 mFutureWatchers.removeAt(index); 699 mFutureWatchers.removeAt(index);
669 out->setValue(out->value() + future->value()); 700 out->setValue(out->value() + future.value());
670 delete future;
671 if (mFutureWatchers.isEmpty()) { 701 if (mFutureWatchers.isEmpty()) {
672 out->setFinished(); 702 out->setFinished();
673 } 703 }
704 delete fw;
674 }); 705 });
675 fw->setFuture(*future); 706 fw->setFuture(future);
676 } 707 }
677} 708}
678 709
@@ -690,27 +721,37 @@ SyncThenExecutor<Out, In ...>::SyncThenExecutor(SyncThenTask<Out, In ...> then,
690} 721}
691 722
692template<typename Out, typename ... In> 723template<typename Out, typename ... In>
693void SyncThenExecutor<Out, In ...>::previousFutureReady() 724void SyncThenExecutor<Out, In ...>::run(const ExecutionPtr &execution)
694{ 725{
695 if (this->mPrevFuture) { 726 if (execution->prevExecution) {
696 assert(this->mPrevFuture->isFinished()); 727 assert(execution->prevExecution->resultBase->isFinished());
697 } 728 }
698 729
699 run(std::is_void<Out>()); 730 run(execution, std::is_void<Out>());
700 this->mResult->setFinished(); 731 execution->resultBase->setFinished();
701} 732}
702 733
703template<typename Out, typename ... In> 734template<typename Out, typename ... In>
704void SyncThenExecutor<Out, In ...>::run(std::false_type) 735void SyncThenExecutor<Out, In ...>::run(const ExecutionPtr &execution, std::false_type)
705{ 736{
706 Out result = this->mFunc(this->mPrevFuture ? this->mPrevFuture->value() : In() ...); 737 Async::Future<typename detail::prevOut<In ...>::type> *prevFuture =
707 static_cast<Async::Future<Out>*>(this->mResult)->setValue(result); 738 execution->prevExecution
739 ? execution->prevExecution->result<typename detail::prevOut<In ...>::type>()
740 : nullptr;
741 (void) prevFuture; // silence 'set but not used' warning
742 Async::Future<Out> *future = execution->result<Out>();
743 future->setValue(this->mFunc(prevFuture ? prevFuture->value() : In() ...));
708} 744}
709 745
710template<typename Out, typename ... In> 746template<typename Out, typename ... In>
711void SyncThenExecutor<Out, In ...>::run(std::true_type) 747void SyncThenExecutor<Out, In ...>::run(const ExecutionPtr &execution, std::true_type)
712{ 748{
713 this->mFunc(this->mPrevFuture ? this->mPrevFuture->value() : In() ...); 749 Async::Future<typename detail::prevOut<In ...>::type> *prevFuture =
750 execution->prevExecution
751 ? execution->prevExecution->result<typename detail::prevOut<In ...>::type>()
752 : nullptr;
753 (void) prevFuture; // silence 'set but not used' warning
754 this->mFunc(prevFuture ? prevFuture->value() : In() ...);
714} 755}
715 756
716template<typename PrevOut, typename Out, typename In> 757template<typename PrevOut, typename Out, typename In>
@@ -721,16 +762,19 @@ SyncEachExecutor<PrevOut, Out, In>::SyncEachExecutor(SyncEachTask<Out, In> each,
721} 762}
722 763
723template<typename PrevOut, typename Out, typename In> 764template<typename PrevOut, typename Out, typename In>
724void SyncEachExecutor<PrevOut, Out, In>::previousFutureReady() 765void SyncEachExecutor<PrevOut, Out, In>::run(const ExecutionPtr &execution)
725{ 766{
726 assert(this->mPrevFuture->isFinished()); 767 assert(execution->prevExecution);
727 auto out = static_cast<Async::Future<Out>*>(this->mResult); 768 auto *prevFuture = execution->prevExecution->result<PrevOut>();
728 if (this->mPrevFuture->value().isEmpty()) { 769 assert(prevFuture->isFinished());
770
771 auto out = execution->result<Out>();
772 if (prevFuture->value().isEmpty()) {
729 out->setFinished(); 773 out->setFinished();
730 return; 774 return;
731 } 775 }
732 776
733 for (auto arg : this->mPrevFuture->value()) { 777 for (auto arg : prevFuture->value()) {
734 run(out, arg, std::is_void<Out>()); 778 run(out, arg, std::is_void<Out>());
735 } 779 }
736 out->setFinished(); 780 out->setFinished();
@@ -743,7 +787,7 @@ void SyncEachExecutor<PrevOut, Out, In>::run(Async::Future<Out> *out, const type
743} 787}
744 788
745template<typename PrevOut, typename Out, typename In> 789template<typename PrevOut, typename Out, typename In>
746void SyncEachExecutor<PrevOut, Out, In>::run(Async::Future<Out> * /* unushed */, const typename PrevOut::value_type &arg, std::true_type) 790void SyncEachExecutor<PrevOut, Out, In>::run(Async::Future<Out> * /* unused */, const typename PrevOut::value_type &arg, std::true_type)
747{ 791{
748 this->mFunc(arg); 792 this->mFunc(arg);
749} 793}
diff --git a/async/src/future.cpp b/async/src/future.cpp
index ab02baf..50a326a 100644
--- a/async/src/future.cpp
+++ b/async/src/future.cpp
@@ -16,6 +16,7 @@
16 */ 16 */
17 17
18#include "future.h" 18#include "future.h"
19#include "async.h"
19 20
20using namespace Async; 21using namespace Async;
21 22
@@ -31,6 +32,27 @@ FutureBase::~FutureBase()
31{ 32{
32} 33}
33 34
35FutureBase::PrivateBase::PrivateBase(const Private::ExecutionPtr &execution)
36 : mExecution(execution)
37{
38}
39
40FutureBase::PrivateBase::~PrivateBase()
41{
42 Private::ExecutionPtr executionPtr = mExecution.toStrongRef();
43 if (executionPtr) {
44 executionPtr->releaseFuture();
45 releaseExecution();
46 }
47}
48
49void FutureBase::PrivateBase::releaseExecution()
50{
51 mExecution.clear();
52}
53
54
55
34FutureWatcherBase::FutureWatcherBase(QObject *parent) 56FutureWatcherBase::FutureWatcherBase(QObject *parent)
35 : QObject(parent) 57 : QObject(parent)
36{ 58{
diff --git a/async/src/future.h b/async/src/future.h
index b580b5a..cadd96d 100644
--- a/async/src/future.h
+++ b/async/src/future.h
@@ -29,8 +29,18 @@ class QEventLoop;
29 29
30namespace Async { 30namespace Async {
31 31
32namespace Private {
33class Execution;
34class ExecutorBase;
35
36typedef QSharedPointer<Execution> ExecutionPtr;
37
38}
39
32class FutureBase 40class FutureBase
33{ 41{
42 friend class Async::Private::Execution;
43
34public: 44public:
35 virtual ~FutureBase(); 45 virtual ~FutureBase();
36 46
@@ -39,6 +49,20 @@ public:
39 virtual void setError(int code = 1, const QString &message = QString()) = 0; 49 virtual void setError(int code = 1, const QString &message = QString()) = 0;
40 50
41protected: 51protected:
52 virtual void releaseExecution() = 0;
53
54 class PrivateBase : public QSharedData
55 {
56 public:
57 PrivateBase(const Async::Private::ExecutionPtr &execution);
58 virtual ~PrivateBase();
59
60 void releaseExecution();
61
62 private:
63 QWeakPointer<Async::Private::Execution> mExecution;
64 };
65
42 FutureBase(); 66 FutureBase();
43 FutureBase(const FutureBase &other); 67 FutureBase(const FutureBase &other);
44}; 68};
@@ -104,9 +128,9 @@ public:
104 } 128 }
105 129
106protected: 130protected:
107 FutureGeneric() 131 FutureGeneric(const Async::Private::ExecutionPtr &execution)
108 : FutureBase() 132 : FutureBase()
109 , d(new Private) 133 , d(new Private(execution))
110 {} 134 {}
111 135
112 FutureGeneric(const FutureGeneric<T> &other) 136 FutureGeneric(const FutureGeneric<T> &other)
@@ -114,10 +138,15 @@ protected:
114 , d(other.d) 138 , d(other.d)
115 {} 139 {}
116 140
117 class Private : public QSharedData 141 class Private : public FutureBase::PrivateBase
118 { 142 {
119 public: 143 public:
120 Private() : QSharedData(), finished(false), errorCode(0) {} 144 Private(const Async::Private::ExecutionPtr &execution)
145 : FutureBase::PrivateBase(execution)
146 , finished(false)
147 , errorCode(0)
148 {}
149
121 typename std::conditional<std::is_void<T>::value, int /* dummy */, T>::type 150 typename std::conditional<std::is_void<T>::value, int /* dummy */, T>::type
122 value; 151 value;
123 152
@@ -129,6 +158,11 @@ protected:
129 158
130 QExplicitlySharedDataPointer<Private> d; 159 QExplicitlySharedDataPointer<Private> d;
131 160
161 void releaseExecution()
162 {
163 d->releaseExecution();
164 }
165
132 void addWatcher(FutureWatcher<T> *watcher) 166 void addWatcher(FutureWatcher<T> *watcher)
133 { 167 {
134 d->watchers.append(QPointer<FutureWatcher<T>>(watcher)); 168 d->watchers.append(QPointer<FutureWatcher<T>>(watcher));
@@ -138,9 +172,14 @@ protected:
138template<typename T> 172template<typename T>
139class Future : public FutureGeneric<T> 173class Future : public FutureGeneric<T>
140{ 174{
175 friend class Async::Private::ExecutorBase;
176
177 template<typename T_>
178 friend class Async::FutureWatcher;
179
141public: 180public:
142 Future() 181 Future()
143 : FutureGeneric<T>() 182 : FutureGeneric<T>(Async::Private::ExecutionPtr())
144 {} 183 {}
145 184
146 Future(const Future<T> &other) 185 Future(const Future<T> &other)
@@ -156,19 +195,35 @@ public:
156 { 195 {
157 return this->d->value; 196 return this->d->value;
158 } 197 }
198
199protected:
200 Future(const Async::Private::ExecutionPtr &execution)
201 : FutureGeneric<T>(execution)
202 {}
203
159}; 204};
160 205
161template<> 206template<>
162class Future<void> : public FutureGeneric<void> 207class Future<void> : public FutureGeneric<void>
163{ 208{
209 friend class Async::Private::ExecutorBase;
210
211 template<typename T_>
212 friend class Async::FutureWatcher;
213
164public: 214public:
165 Future() 215 Future()
166 : FutureGeneric<void>() 216 : FutureGeneric<void>(Async::Private::ExecutionPtr())
167 {} 217 {}
168 218
169 Future(const Future<void> &other) 219 Future(const Future<void> &other)
170 : FutureGeneric<void>(other) 220 : FutureGeneric<void>(other)
171 {} 221 {}
222
223protected:
224 Future(const Async::Private::ExecutionPtr &execution)
225 : FutureGeneric<void>(execution)
226 {}
172}; 227};
173 228
174 229
@@ -177,7 +232,7 @@ class FutureWatcherBase : public QObject
177 Q_OBJECT 232 Q_OBJECT
178 233
179protected: 234protected:
180 FutureWatcherBase(QObject *parent = 0); 235 FutureWatcherBase(QObject *parent = nullptr);
181 virtual ~FutureWatcherBase(); 236 virtual ~FutureWatcherBase();
182 237
183Q_SIGNALS: 238Q_SIGNALS:
@@ -190,7 +245,7 @@ class FutureWatcher : public FutureWatcherBase
190 friend class Async::FutureGeneric<T>; 245 friend class Async::FutureGeneric<T>;
191 246
192public: 247public:
193 FutureWatcher(QObject *parent = 0) 248 FutureWatcher(QObject *parent = nullptr)
194 : FutureWatcherBase(parent) 249 : FutureWatcherBase(parent)
195 {} 250 {}
196 251