From b01fadba903056218f2a00c5e62e1dc8df062124 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dan=20Vr=C3=A1til?= Date: Wed, 1 Apr 2015 21:02:34 +0200 Subject: Async: support (re-)executing single Job multiple times Storing Future and current Job progress directly in Executors means that we cannot re-execute finished job, or even execute the same Job multiple times in parallel. To do so, we need to make Executors stateless and track the state elsewhere. This change does that by moving the execution state from Executor to Execution class. Executors now only describe the tasks to execute, while Execution holds the current state of execution. New Execution is created every time Job::exec() is called. Execution holds reference to it's result (Future) and Executor which created the Execution. This ensures that Executor is not deleted when Job (which owns Executors) goes out of scope while the execution is still running. At the same time Future holds reference to relevant Execution, so that the Execution is deleted when all copies of Future referring result from the respective Execution are deleted. --- async/src/async.cpp | 4 - async/src/async.h | 242 ++++++++++++++++++++++++++++++--------------------- async/src/future.cpp | 22 +++++ async/src/future.h | 71 +++++++++++++-- 4 files changed, 228 insertions(+), 111 deletions(-) (limited to 'async/src') diff --git a/async/src/async.cpp b/async/src/async.cpp index 20ba4e6..5e26bd8 100644 --- a/async/src/async.cpp +++ b/async/src/async.cpp @@ -26,15 +26,11 @@ using namespace Async; Private::ExecutorBase::ExecutorBase(const ExecutorBasePtr &parent) : mPrev(parent) - , mResult(0) - , mIsRunning(false) - , mIsFinished(false) { } Private::ExecutorBase::~ExecutorBase() { - delete mResult; } diff --git a/async/src/async.h b/async/src/async.h index 2741341..c6ca9e7 100644 --- a/async/src/async.h +++ b/async/src/async.h @@ -93,6 +93,49 @@ namespace Private class ExecutorBase; typedef QSharedPointer ExecutorBasePtr; +struct Execution { + Execution(const ExecutorBasePtr &executor) + : executor(executor) + , resultBase(nullptr) + , isRunning(false) + , isFinished(false) + {} + + ~Execution() + { + if (resultBase) { + resultBase->releaseExecution(); + delete resultBase; + } + prevExecution.reset(); + } + + void setFinished() + { + isFinished = true; + executor.clear(); + } + + template + Async::Future* result() + { + return static_cast*>(resultBase); + } + + void releaseFuture() + { + resultBase = 0; + } + + ExecutorBasePtr executor; + FutureBase *resultBase; + bool isRunning; + bool isFinished; + + ExecutionPtr prevExecution; +}; + +typedef QSharedPointer ExecutionPtr; class ExecutorBase { @@ -104,21 +147,15 @@ class ExecutorBase public: virtual ~ExecutorBase(); - virtual void exec() = 0; - - inline FutureBase* result() const - { - return mResult; - } + virtual ExecutionPtr exec(const ExecutorBasePtr &self) = 0; protected: ExecutorBase(const ExecutorBasePtr &parent); - ExecutorBasePtr mSelf; + template + Async::Future* createFuture(const ExecutionPtr &execution) const; + ExecutorBasePtr mPrev; - FutureBase *mResult; - bool mIsRunning; - bool mIsFinished; }; template @@ -128,17 +165,13 @@ protected: Executor(ErrorHandler errorHandler, const Private::ExecutorBasePtr &parent) : ExecutorBase(parent) , mErrorFunc(errorHandler) - , mPrevFuture(0) {} virtual ~Executor() {} - inline Async::Future* chainup(); - virtual void previousFutureReady() = 0; + virtual void run(const ExecutionPtr &execution) = 0; - void exec(); + ExecutionPtr exec(const ExecutorBasePtr &self); - //std::function &)> mFunc; std::function mErrorFunc; - Async::Future *mPrevFuture; }; template @@ -146,7 +179,7 @@ class ThenExecutor: public Executor::type, Out, { public: ThenExecutor(ThenTask then, ErrorHandler errorHandler, const ExecutorBasePtr &parent); - void previousFutureReady(); + void run(const ExecutionPtr &execution); private: ThenTask mFunc; }; @@ -156,7 +189,7 @@ class EachExecutor : public Executor { public: EachExecutor(EachTask each, ErrorHandler errorHandler, const ExecutorBasePtr &parent); - void previousFutureReady(); + void run(const ExecutionPtr &execution); private: EachTask mFunc; QVector*> mFutureWatchers; @@ -176,11 +209,11 @@ class SyncThenExecutor : public Executor::type, { public: SyncThenExecutor(SyncThenTask then, ErrorHandler errorHandler, const ExecutorBasePtr &parent); - void previousFutureReady(); + void run(const ExecutionPtr &execution); private: - void run(std::false_type); // !std::is_void - void run(std::true_type); // std::is_void + void run(const ExecutionPtr &execution, std::false_type); // !std::is_void + void run(const ExecutionPtr &execution, std::true_type); // std::is_void SyncThenTask mFunc; }; @@ -198,7 +231,7 @@ class SyncEachExecutor : public Executor { public: SyncEachExecutor(SyncEachTask each, ErrorHandler errorHandler, const ExecutorBasePtr &parent); - void previousFutureReady(); + void run(const ExecutionPtr &execution); private: void run(Async::Future *future, const typename PrevOut::value_type &arg, std::false_type); // !std::is_void void run(Async::Future *future, const typename PrevOut::value_type &arg, std::true_type); // std::is_void @@ -430,18 +463,10 @@ public: Async::Future exec() { - // Have the top executor hold reference to itself during the execution. - // This ensures that even if the Job goes out of scope, the full Executor - // chain will not be destroyed. - // The executor will remove the self-reference once it's Future is finished. - mExecutor->mSelf = mExecutor; - mExecutor->exec(); - return result(); - } + Private::ExecutionPtr execution = mExecutor->exec(mExecutor); + Async::Future result = *execution->result(); - Async::Future result() const - { - return *static_cast*>(mExecutor->result()); + return result; } private: @@ -549,74 +574,75 @@ Job error(int errorCode, const QString &errorMessage) namespace Private { -template -Future* Executor::chainup() +template +Async::Future* ExecutorBase::createFuture(const ExecutionPtr &execution) const { - if (mPrev) { - mPrev->exec(); - return static_cast*>(mPrev->result()); - } else { - return nullptr; - } + return new Async::Future(execution); } template -void Executor::exec() +ExecutionPtr Executor::exec(const ExecutorBasePtr &self) { - // Don't chain up to job that already is running (or is finished) - if (mPrev && !mPrev->mIsRunning & !mPrev->mIsFinished) { - mPrevFuture = chainup(); + // Passing 'self' to execution ensures that the Executor chain remains + // valid until the entire execution is finished + ExecutionPtr execution = ExecutionPtr::create(self); + + // chainup + execution->prevExecution = mPrev ? mPrev->exec(mPrev) : ExecutionPtr(); + /* } else if (mPrev && !mPrevFuture) { // If previous job is running or finished, just get it's future mPrevFuture = static_cast*>(mPrev->result()); } + */ - // Initialize our future - mResult = new Async::Future(); + execution->resultBase = this->createFuture(execution); auto fw = new Async::FutureWatcher(); QObject::connect(fw, &Async::FutureWatcher::futureReady, - [fw, this]() { - mIsFinished = true; - mSelf.clear(); + [fw, execution, this]() { + execution->setFinished(); delete fw; }); - fw->setFuture(*static_cast*>(mResult)); + fw->setFuture(*execution->result()); - if (!mPrevFuture || mPrevFuture->isFinished()) { - if (mPrevFuture && mPrevFuture->errorCode() != 0) { + Async::Future *prevFuture = execution->prevExecution ? execution->prevExecution->result() : nullptr; + if (!prevFuture || prevFuture->isFinished()) { + if (prevFuture && prevFuture->errorCode() != 0) { if (mErrorFunc) { - mErrorFunc(mPrevFuture->errorCode(), mPrevFuture->errorMessage()); - mResult->setFinished(); - mIsFinished = true; - return; + mErrorFunc(prevFuture->errorCode(), prevFuture->errorMessage()); + execution->resultBase->setFinished(); + execution->setFinished(); + return execution; } else { // Propagate the error to next caller } } - mIsRunning = true; - previousFutureReady(); + execution->isRunning = true; + run(execution); } else { auto futureWatcher = new Async::FutureWatcher(); QObject::connect(futureWatcher, &Async::FutureWatcher::futureReady, - [futureWatcher, this]() { + [futureWatcher, execution, this]() { auto prevFuture = futureWatcher->future(); assert(prevFuture.isFinished()); delete futureWatcher; if (prevFuture.errorCode() != 0) { if (mErrorFunc) { mErrorFunc(prevFuture.errorCode(), prevFuture.errorMessage()); - mResult->setFinished(); + execution->resultBase->setFinished(); return; } else { // Propagate the error to next caller } } - mIsRunning = true; - previousFutureReady(); + execution->isRunning = true; + run(execution); }); - futureWatcher->setFuture(*mPrevFuture); + futureWatcher->setFuture(*static_cast*>(prevFuture)); } + + return execution; } @@ -628,14 +654,15 @@ ThenExecutor::ThenExecutor(ThenTask then, ErrorHandler } template -void ThenExecutor::previousFutureReady() +void ThenExecutor::run(const ExecutionPtr &execution) { - if (this->mPrevFuture) { - assert(this->mPrevFuture->isFinished()); + Async::Future::type> *prevFuture = nullptr; + if (execution->prevExecution) { + prevFuture = execution->prevExecution->result::type>(); + assert(prevFuture->isFinished()); } - this->mFunc(this->mPrevFuture ? this->mPrevFuture->value() : In() ..., - *static_cast*>(this->mResult)); + this->mFunc(prevFuture ? prevFuture->value() : In() ..., *execution->result()); } template @@ -646,33 +673,37 @@ EachExecutor::EachExecutor(EachTask each, ErrorHandle } template -void EachExecutor::previousFutureReady() +void EachExecutor::run(const ExecutionPtr &execution) { - assert(this->mPrevFuture->isFinished()); - auto out = static_cast*>(this->mResult); - if (this->mPrevFuture->value().isEmpty()) { + assert(execution->prevExecution); + auto prevFuture = execution->prevExecution->result(); + assert(prevFuture->isFinished()); + + auto out = execution->result(); + if (prevFuture->value().isEmpty()) { out->setFinished(); return; } - for (auto arg : this->mPrevFuture->value()) { - auto future = new Async::Future; - this->mFunc(arg, *future); + for (auto arg : prevFuture->value()) { + Async::Future future; + this->mFunc(arg, future); auto fw = new Async::FutureWatcher(); mFutureWatchers.append(fw); QObject::connect(fw, &Async::FutureWatcher::futureReady, - [out, future, fw, this]() { - assert(future->isFinished()); + [out, fw, this]() { + auto future = fw->future(); + assert(future.isFinished()); const int index = mFutureWatchers.indexOf(fw); assert(index > -1); mFutureWatchers.removeAt(index); - out->setValue(out->value() + future->value()); - delete future; + out->setValue(out->value() + future.value()); if (mFutureWatchers.isEmpty()) { out->setFinished(); } + delete fw; }); - fw->setFuture(*future); + fw->setFuture(future); } } @@ -690,27 +721,37 @@ SyncThenExecutor::SyncThenExecutor(SyncThenTask then, } template -void SyncThenExecutor::previousFutureReady() +void SyncThenExecutor::run(const ExecutionPtr &execution) { - if (this->mPrevFuture) { - assert(this->mPrevFuture->isFinished()); + if (execution->prevExecution) { + assert(execution->prevExecution->resultBase->isFinished()); } - run(std::is_void()); - this->mResult->setFinished(); + run(execution, std::is_void()); + execution->resultBase->setFinished(); } template -void SyncThenExecutor::run(std::false_type) -{ - Out result = this->mFunc(this->mPrevFuture ? this->mPrevFuture->value() : In() ...); - static_cast*>(this->mResult)->setValue(result); +void SyncThenExecutor::run(const ExecutionPtr &execution, std::false_type) +{ + Async::Future::type> *prevFuture = + execution->prevExecution + ? execution->prevExecution->result::type>() + : nullptr; + (void) prevFuture; // silence 'set but not used' warning + Async::Future *future = execution->result(); + future->setValue(this->mFunc(prevFuture ? prevFuture->value() : In() ...)); } template -void SyncThenExecutor::run(std::true_type) -{ - this->mFunc(this->mPrevFuture ? this->mPrevFuture->value() : In() ...); +void SyncThenExecutor::run(const ExecutionPtr &execution, std::true_type) +{ + Async::Future::type> *prevFuture = + execution->prevExecution + ? execution->prevExecution->result::type>() + : nullptr; + (void) prevFuture; // silence 'set but not used' warning + this->mFunc(prevFuture ? prevFuture->value() : In() ...); } template @@ -721,16 +762,19 @@ SyncEachExecutor::SyncEachExecutor(SyncEachTask each, } template -void SyncEachExecutor::previousFutureReady() +void SyncEachExecutor::run(const ExecutionPtr &execution) { - assert(this->mPrevFuture->isFinished()); - auto out = static_cast*>(this->mResult); - if (this->mPrevFuture->value().isEmpty()) { + assert(execution->prevExecution); + auto *prevFuture = execution->prevExecution->result(); + assert(prevFuture->isFinished()); + + auto out = execution->result(); + if (prevFuture->value().isEmpty()) { out->setFinished(); return; } - for (auto arg : this->mPrevFuture->value()) { + for (auto arg : prevFuture->value()) { run(out, arg, std::is_void()); } out->setFinished(); @@ -743,7 +787,7 @@ void SyncEachExecutor::run(Async::Future *out, const type } template -void SyncEachExecutor::run(Async::Future * /* unushed */, const typename PrevOut::value_type &arg, std::true_type) +void SyncEachExecutor::run(Async::Future * /* unused */, const typename PrevOut::value_type &arg, std::true_type) { this->mFunc(arg); } diff --git a/async/src/future.cpp b/async/src/future.cpp index ab02baf..50a326a 100644 --- a/async/src/future.cpp +++ b/async/src/future.cpp @@ -16,6 +16,7 @@ */ #include "future.h" +#include "async.h" using namespace Async; @@ -31,6 +32,27 @@ FutureBase::~FutureBase() { } +FutureBase::PrivateBase::PrivateBase(const Private::ExecutionPtr &execution) + : mExecution(execution) +{ +} + +FutureBase::PrivateBase::~PrivateBase() +{ + Private::ExecutionPtr executionPtr = mExecution.toStrongRef(); + if (executionPtr) { + executionPtr->releaseFuture(); + releaseExecution(); + } +} + +void FutureBase::PrivateBase::releaseExecution() +{ + mExecution.clear(); +} + + + FutureWatcherBase::FutureWatcherBase(QObject *parent) : QObject(parent) { diff --git a/async/src/future.h b/async/src/future.h index b580b5a..cadd96d 100644 --- a/async/src/future.h +++ b/async/src/future.h @@ -29,8 +29,18 @@ class QEventLoop; namespace Async { +namespace Private { +class Execution; +class ExecutorBase; + +typedef QSharedPointer ExecutionPtr; + +} + class FutureBase { + friend class Async::Private::Execution; + public: virtual ~FutureBase(); @@ -39,6 +49,20 @@ public: virtual void setError(int code = 1, const QString &message = QString()) = 0; protected: + virtual void releaseExecution() = 0; + + class PrivateBase : public QSharedData + { + public: + PrivateBase(const Async::Private::ExecutionPtr &execution); + virtual ~PrivateBase(); + + void releaseExecution(); + + private: + QWeakPointer mExecution; + }; + FutureBase(); FutureBase(const FutureBase &other); }; @@ -104,9 +128,9 @@ public: } protected: - FutureGeneric() + FutureGeneric(const Async::Private::ExecutionPtr &execution) : FutureBase() - , d(new Private) + , d(new Private(execution)) {} FutureGeneric(const FutureGeneric &other) @@ -114,10 +138,15 @@ protected: , d(other.d) {} - class Private : public QSharedData + class Private : public FutureBase::PrivateBase { public: - Private() : QSharedData(), finished(false), errorCode(0) {} + Private(const Async::Private::ExecutionPtr &execution) + : FutureBase::PrivateBase(execution) + , finished(false) + , errorCode(0) + {} + typename std::conditional::value, int /* dummy */, T>::type value; @@ -129,6 +158,11 @@ protected: QExplicitlySharedDataPointer d; + void releaseExecution() + { + d->releaseExecution(); + } + void addWatcher(FutureWatcher *watcher) { d->watchers.append(QPointer>(watcher)); @@ -138,9 +172,14 @@ protected: template class Future : public FutureGeneric { + friend class Async::Private::ExecutorBase; + + template + friend class Async::FutureWatcher; + public: Future() - : FutureGeneric() + : FutureGeneric(Async::Private::ExecutionPtr()) {} Future(const Future &other) @@ -156,19 +195,35 @@ public: { return this->d->value; } + +protected: + Future(const Async::Private::ExecutionPtr &execution) + : FutureGeneric(execution) + {} + }; template<> class Future : public FutureGeneric { + friend class Async::Private::ExecutorBase; + + template + friend class Async::FutureWatcher; + public: Future() - : FutureGeneric() + : FutureGeneric(Async::Private::ExecutionPtr()) {} Future(const Future &other) : FutureGeneric(other) {} + +protected: + Future(const Async::Private::ExecutionPtr &execution) + : FutureGeneric(execution) + {} }; @@ -177,7 +232,7 @@ class FutureWatcherBase : public QObject Q_OBJECT protected: - FutureWatcherBase(QObject *parent = 0); + FutureWatcherBase(QObject *parent = nullptr); virtual ~FutureWatcherBase(); Q_SIGNALS: @@ -190,7 +245,7 @@ class FutureWatcher : public FutureWatcherBase friend class Async::FutureGeneric; public: - FutureWatcher(QObject *parent = 0) + FutureWatcher(QObject *parent = nullptr) : FutureWatcherBase(parent) {} -- cgit v1.2.3