From 664396b0e550910cea50b7852066a04cc7fec3bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dan=20Vr=C3=A1til?= Date: Sun, 14 Dec 2014 12:59:22 +0100 Subject: Async: make the processing truly asynchronous Now calling exec() starts the first job and returns a pending Future immediately. Caller can then use Async::FutureWatcher to wait for the future to become finished, i.e. for all jobs to finish execution. --- async/src/CMakeLists.txt | 3 ++ async/src/async.cpp | 48 ----------------- async/src/async.h | 110 +++++++++++++++++++++++++------------ async/src/future.cpp | 57 ++++++++++++++++++++ async/src/future.h | 137 +++++++++++++++++++++++++++++++++++++++-------- 5 files changed, 251 insertions(+), 104 deletions(-) create mode 100644 async/src/future.cpp (limited to 'async/src') diff --git a/async/src/CMakeLists.txt b/async/src/CMakeLists.txt index a98d8ce..a371da0 100644 --- a/async/src/CMakeLists.txt +++ b/async/src/CMakeLists.txt @@ -1,5 +1,8 @@ +include_directories(${CMAKE_CURRENT_BINARY_DIR}) + set(async_SRCS async.cpp + future.cpp ) add_library(akonadi2async SHARED ${async_SRCS}) diff --git a/async/src/async.cpp b/async/src/async.cpp index 0b8d7f3..e1d4806 100644 --- a/async/src/async.cpp +++ b/async/src/async.cpp @@ -48,51 +48,3 @@ JobBase::JobBase(Private::ExecutorBase *executor) JobBase::~JobBase() { } - -void JobBase::exec() -{ - mExecutor->exec(); -} - - -FutureBase::FutureBase() - : mFinished(false) - , mWaitLoop(nullptr) -{ -} - -FutureBase::FutureBase(const Async::FutureBase &other) - : mFinished(other.mFinished) - , mWaitLoop(other.mWaitLoop) -{ -} - -FutureBase::~FutureBase() -{ -} - -void FutureBase::setFinished() -{ - mFinished = true; - if (mWaitLoop && mWaitLoop->isRunning()) { - mWaitLoop->quit(); - } -} - -bool FutureBase::isFinished() const -{ - return mFinished; -} - -void FutureBase::waitForFinished() -{ - if (mFinished) { - return; - } - - mWaitLoop = new QEventLoop; - mWaitLoop->exec(QEventLoop::ExcludeUserInputEvents); - delete mWaitLoop; - mWaitLoop = 0; -} - diff --git a/async/src/async.h b/async/src/async.h index 0e4f246..233ad56 100644 --- a/async/src/async.h +++ b/async/src/async.h @@ -25,14 +25,15 @@ #include #include #include -#include #include #include -#include #include "future.h" #include "async_impl.h" +#include +#include + namespace Async { @@ -86,11 +87,18 @@ class Executor : public ExecutorBase protected: Executor(ExecutorBase *parent) : ExecutorBase(parent) + , mPrevFuture(0) + , mPrevFutureWatcher(0) {} virtual ~Executor() {} inline Async::Future* chainup(); + virtual void previousFutureReady() = 0; + + void exec(); std::function &)> mFunc; + Async::Future *mPrevFuture; + Async::FutureWatcher *mPrevFutureWatcher; }; template @@ -98,7 +106,10 @@ class ThenExecutor: public Executor::type, Out, In { public: ThenExecutor(ThenTask then, ExecutorBase *parent = nullptr); - void exec(); + void previousFutureReady(); + +private: + Async::FutureWatcher::type> *mFutureWatcher; }; template @@ -106,7 +117,10 @@ class EachExecutor : public Executor { public: EachExecutor(EachTask each, ExecutorBase *parent); - void exec(); + void previousFutureReady(); + +private: + QVector*> mFutureWatchers; }; template @@ -114,7 +128,7 @@ class ReduceExecutor : public Executor { public: ReduceExecutor(ReduceTask reduce, ExecutorBase *parent); - void exec(); + void previousFutureReady(); }; } // namespace Private @@ -141,8 +155,6 @@ public: JobBase(Private::ExecutorBase *executor); ~JobBase(); - void exec(); - protected: Private::ExecutorBase *mExecutor; }; @@ -225,6 +237,12 @@ public: return Job(new Private::ReduceExecutor(func, mExecutor)); } + Async::Future exec() + { + mExecutor->exec(); + return result(); + } + Async::Future result() const { return *static_cast*>(mExecutor->result()); @@ -257,14 +275,31 @@ Future* Executor::chainup() { if (mPrev) { mPrev->exec(); - auto future = static_cast*>(mPrev->result()); - assert(future->isFinished()); - return future; + return static_cast*>(mPrev->result()); } else { return 0; } } +template +void Executor::exec() +{ + mPrevFuture = chainup(); + mResult = new Async::Future(); + if (!mPrevFuture || mPrevFuture->isFinished()) { + previousFutureReady(); + } else { + auto futureWatcher = new Async::FutureWatcher(); + QObject::connect(futureWatcher, &Async::FutureWatcher::futureReady, + [futureWatcher, this]() { + assert(futureWatcher->future().isFinished()); + futureWatcher->deleteLater(); + previousFutureReady(); + }); + futureWatcher->setFuture(*mPrevFuture); + } +} + template ThenExecutor::ThenExecutor(ThenTask then, ExecutorBase* parent) : Executor::type, Out, In ...>(parent) @@ -273,15 +308,13 @@ ThenExecutor::ThenExecutor(ThenTask then, ExecutorBase } template -void ThenExecutor::exec() +void ThenExecutor::previousFutureReady() { - auto in = this->chainup(); - (void)in; // supress 'unused variable' warning when In is void - - auto out = new Async::Future(); - this->mFunc(in ? in->value() : In() ..., *out); - out->waitForFinished(); - this->mResult = out; + if (this->mPrevFuture) { + assert(this->mPrevFuture->isFinished()); + } + this->mFunc(this->mPrevFuture ? this->mPrevFuture->value() : In() ..., + *static_cast*>(this->mResult)); } template @@ -292,20 +325,33 @@ EachExecutor::EachExecutor(EachTask each, ExecutorBas } template -void EachExecutor::exec() +void EachExecutor::previousFutureReady() { - auto in = this->chainup(); + assert(this->mPrevFuture->isFinished()); + auto out = static_cast*>(this->mResult); + if (this->mPrevFuture->value().isEmpty()) { + out->setFinished(); + return; + } - auto *out = new Async::Future(); - for (auto arg : in->value()) { + for (auto arg : this->mPrevFuture->value()) { Async::Future future; this->mFunc(arg, future); - future.waitForFinished(); - out->setValue(out->value() + future.value()); + auto fw = new Async::FutureWatcher(); + mFutureWatchers.append(fw); + QObject::connect(fw, &Async::FutureWatcher::futureReady, + [out, future, fw, this]() { + assert(future.isFinished()); + const int index = mFutureWatchers.indexOf(fw); + assert(index > -1); + mFutureWatchers.removeAt(index); + out->setValue(out->value() + future.value()); + if (mFutureWatchers.isEmpty()) { + out->setFinished(); + } + }); + fw->setFuture(future); } - out->setFinished(); - - this->mResult = out; } template @@ -316,14 +362,10 @@ ReduceExecutor::ReduceExecutor(ReduceTask reduce, ExecutorBase } template -void ReduceExecutor::exec() +void ReduceExecutor::previousFutureReady() { - auto in = this->chainup(); - - auto out = new Async::Future(); - this->mFunc(in->value(), *out); - out->waitForFinished(); - this->mResult = out; + assert(this->mPrevFuture->isFinished()); + this->mFunc(this->mPrevFuture->value(), *static_cast*>(this->mResult)); } } // namespace Private diff --git a/async/src/future.cpp b/async/src/future.cpp new file mode 100644 index 0000000..48c7417 --- /dev/null +++ b/async/src/future.cpp @@ -0,0 +1,57 @@ +/* + * Copyright 2014 Daniel Vrátil + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License as + * published by the Free Software Foundation; either version 2 of + * the License or (at your option) version 3 or any later version + * accepted by the membership of KDE e.V. (or its successor approved + * by the membership of KDE e.V.), which shall act as a proxy + * defined in Section 14 of version 3 of the license. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + */ + +#include "future.h" + +using namespace Async; + +FutureBase::FutureBase() + : mFinished(false) + , mWaitLoop(nullptr) +{ +} + +FutureBase::FutureBase(const Async::FutureBase &other) + : mFinished(other.mFinished) + , mWaitLoop(other.mWaitLoop) +{ +} + +FutureBase::~FutureBase() +{ +} + +bool FutureBase::isFinished() const +{ + return mFinished; +} + +FutureWatcherBase::FutureWatcherBase(QObject *parent) + : QObject(parent) +{ +} + +FutureWatcherBase::~FutureWatcherBase() +{ +} + + +#include "future.moc" \ No newline at end of file diff --git a/async/src/future.h b/async/src/future.h index eb3de1e..39e3936 100644 --- a/async/src/future.h +++ b/async/src/future.h @@ -24,67 +24,160 @@ class QEventLoop; +#include + +#include +#include +#include + namespace Async { class FutureBase { public: - FutureBase(); - FutureBase(const FutureBase &other); virtual ~FutureBase(); - void setFinished(); + virtual void setFinished() = 0; bool isFinished() const; - void waitForFinished(); protected: + FutureBase(); + FutureBase(const FutureBase &other); + bool mFinished; QEventLoop *mWaitLoop; }; template -class Future : public FutureBase +class FutureWatcher; + +template +class FutureGeneric : public FutureBase { + friend class FutureWatcher; + public: - Future() - : FutureBase() + void setFinished() + { + mFinished = true; + for (auto watcher : d->watchers) { + if (watcher) { + watcher->futureReadyCallback(); + } + } + } + +protected: + FutureGeneric() + : FutureBase() + , d(new Private) {} - Future(const Future &other) - : FutureBase(other) - , mValue(other.mValue) + FutureGeneric(const FutureGeneric &other) + : FutureBase(other) + , d(other.d) + {} + + class Private : public QSharedData + { + public: + typename std::conditional::value, int /* dummy */, T>::type + value; + + QVector>> watchers; + }; + + QExplicitlySharedDataPointer d; + + void addWatcher(FutureWatcher *watcher) + { + d->watchers.append(QPointer>(watcher)); + } +}; + +template +class Future : public FutureGeneric +{ +public: + Future() + : FutureGeneric() {} - Future(const T &val) - : FutureBase() - , mValue(val) + Future(const Future &other) + : FutureGeneric(other) {} - void setValue(const T &val) + void setValue(const T &value) { - mValue = val; + this->d->value = value; } T value() const { - return mValue; + return this->d->value; } - -private: - T mValue; }; template<> -class Future : public FutureBase +class Future : public FutureGeneric { public: Future() - : FutureBase() + : FutureGeneric() {} Future(const Future &other) - : FutureBase(other) + : FutureGeneric(other) + {} +}; + + +class FutureWatcherBase : public QObject +{ + Q_OBJECT + +protected: + FutureWatcherBase(QObject *parent = 0); + virtual ~FutureWatcherBase(); + +Q_SIGNALS: + void futureReady(); +}; + +template +class FutureWatcher : public FutureWatcherBase +{ + friend class Async::FutureGeneric; + +public: + FutureWatcher(QObject *parent = 0) + : FutureWatcherBase(parent) + {} + + ~FutureWatcher() {} + + void setFuture(const Async::Future &future) + { + mFuture = future; + mFuture.addWatcher(this); + if (future.isFinished()) { + futureReadyCallback(); + } + } + + Async::Future future() const + { + return mFuture; + } + +private: + void futureReadyCallback() + { + Q_EMIT futureReady(); + } + + Async::Future mFuture; }; } // namespace Async -- cgit v1.2.3