summaryrefslogtreecommitdiffstats
path: root/async/src
diff options
context:
space:
mode:
authorDan Vrátil <dvratil@redhat.com>2014-12-14 12:59:22 +0100
committerDan Vrátil <dvratil@redhat.com>2014-12-14 13:03:38 +0100
commit664396b0e550910cea50b7852066a04cc7fec3bd (patch)
tree544f9e8206bd498fd16047211a833d01d65ac864 /async/src
parent925d3bd3159820c0eae356fe4d3af54cb16ae1e3 (diff)
downloadsink-664396b0e550910cea50b7852066a04cc7fec3bd.tar.gz
sink-664396b0e550910cea50b7852066a04cc7fec3bd.zip
Async: make the processing truly asynchronous
Now calling exec() starts the first job and returns a pending Future immediately. Caller can then use Async::FutureWatcher to wait for the future to become finished, i.e. for all jobs to finish execution.
Diffstat (limited to 'async/src')
-rw-r--r--async/src/CMakeLists.txt3
-rw-r--r--async/src/async.cpp48
-rw-r--r--async/src/async.h110
-rw-r--r--async/src/future.cpp57
-rw-r--r--async/src/future.h137
5 files changed, 251 insertions, 104 deletions
diff --git a/async/src/CMakeLists.txt b/async/src/CMakeLists.txt
index a98d8ce..a371da0 100644
--- a/async/src/CMakeLists.txt
+++ b/async/src/CMakeLists.txt
@@ -1,5 +1,8 @@
1include_directories(${CMAKE_CURRENT_BINARY_DIR})
2
1set(async_SRCS 3set(async_SRCS
2 async.cpp 4 async.cpp
5 future.cpp
3) 6)
4 7
5add_library(akonadi2async SHARED ${async_SRCS}) 8add_library(akonadi2async SHARED ${async_SRCS})
diff --git a/async/src/async.cpp b/async/src/async.cpp
index 0b8d7f3..e1d4806 100644
--- a/async/src/async.cpp
+++ b/async/src/async.cpp
@@ -48,51 +48,3 @@ JobBase::JobBase(Private::ExecutorBase *executor)
48JobBase::~JobBase() 48JobBase::~JobBase()
49{ 49{
50} 50}
51
52void JobBase::exec()
53{
54 mExecutor->exec();
55}
56
57
58FutureBase::FutureBase()
59 : mFinished(false)
60 , mWaitLoop(nullptr)
61{
62}
63
64FutureBase::FutureBase(const Async::FutureBase &other)
65 : mFinished(other.mFinished)
66 , mWaitLoop(other.mWaitLoop)
67{
68}
69
70FutureBase::~FutureBase()
71{
72}
73
74void FutureBase::setFinished()
75{
76 mFinished = true;
77 if (mWaitLoop && mWaitLoop->isRunning()) {
78 mWaitLoop->quit();
79 }
80}
81
82bool FutureBase::isFinished() const
83{
84 return mFinished;
85}
86
87void FutureBase::waitForFinished()
88{
89 if (mFinished) {
90 return;
91 }
92
93 mWaitLoop = new QEventLoop;
94 mWaitLoop->exec(QEventLoop::ExcludeUserInputEvents);
95 delete mWaitLoop;
96 mWaitLoop = 0;
97}
98
diff --git a/async/src/async.h b/async/src/async.h
index 0e4f246..233ad56 100644
--- a/async/src/async.h
+++ b/async/src/async.h
@@ -25,14 +25,15 @@
25#include <functional> 25#include <functional>
26#include <list> 26#include <list>
27#include <type_traits> 27#include <type_traits>
28#include <iostream>
29#include <cassert> 28#include <cassert>
30#include <iterator> 29#include <iterator>
31#include <boost/graph/graph_concepts.hpp>
32 30
33#include "future.h" 31#include "future.h"
34#include "async_impl.h" 32#include "async_impl.h"
35 33
34#include <QVector>
35#include <QObject>
36
36 37
37namespace Async { 38namespace Async {
38 39
@@ -86,11 +87,18 @@ class Executor : public ExecutorBase
86protected: 87protected:
87 Executor(ExecutorBase *parent) 88 Executor(ExecutorBase *parent)
88 : ExecutorBase(parent) 89 : ExecutorBase(parent)
90 , mPrevFuture(0)
91 , mPrevFutureWatcher(0)
89 {} 92 {}
90 virtual ~Executor() {} 93 virtual ~Executor() {}
91 inline Async::Future<PrevOut>* chainup(); 94 inline Async::Future<PrevOut>* chainup();
95 virtual void previousFutureReady() = 0;
96
97 void exec();
92 98
93 std::function<void(const In& ..., Async::Future<Out> &)> mFunc; 99 std::function<void(const In& ..., Async::Future<Out> &)> mFunc;
100 Async::Future<PrevOut> *mPrevFuture;
101 Async::FutureWatcher<PrevOut> *mPrevFutureWatcher;
94}; 102};
95 103
96template<typename Out, typename ... In> 104template<typename Out, typename ... In>
@@ -98,7 +106,10 @@ class ThenExecutor: public Executor<typename PreviousOut<In ...>::type, Out, In
98{ 106{
99public: 107public:
100 ThenExecutor(ThenTask<Out, In ...> then, ExecutorBase *parent = nullptr); 108 ThenExecutor(ThenTask<Out, In ...> then, ExecutorBase *parent = nullptr);
101 void exec(); 109 void previousFutureReady();
110
111private:
112 Async::FutureWatcher<typename PreviousOut<In ...>::type> *mFutureWatcher;
102}; 113};
103 114
104template<typename PrevOut, typename Out, typename In> 115template<typename PrevOut, typename Out, typename In>
@@ -106,7 +117,10 @@ class EachExecutor : public Executor<PrevOut, Out, In>
106{ 117{
107public: 118public:
108 EachExecutor(EachTask<Out, In> each, ExecutorBase *parent); 119 EachExecutor(EachTask<Out, In> each, ExecutorBase *parent);
109 void exec(); 120 void previousFutureReady();
121
122private:
123 QVector<Async::FutureWatcher<PrevOut>*> mFutureWatchers;
110}; 124};
111 125
112template<typename Out, typename In> 126template<typename Out, typename In>
@@ -114,7 +128,7 @@ class ReduceExecutor : public Executor<In, Out, In>
114{ 128{
115public: 129public:
116 ReduceExecutor(ReduceTask<Out, In> reduce, ExecutorBase *parent); 130 ReduceExecutor(ReduceTask<Out, In> reduce, ExecutorBase *parent);
117 void exec(); 131 void previousFutureReady();
118}; 132};
119 133
120} // namespace Private 134} // namespace Private
@@ -141,8 +155,6 @@ public:
141 JobBase(Private::ExecutorBase *executor); 155 JobBase(Private::ExecutorBase *executor);
142 ~JobBase(); 156 ~JobBase();
143 157
144 void exec();
145
146protected: 158protected:
147 Private::ExecutorBase *mExecutor; 159 Private::ExecutorBase *mExecutor;
148}; 160};
@@ -225,6 +237,12 @@ public:
225 return Job<OutOther, InOther>(new Private::ReduceExecutor<OutOther, InOther>(func, mExecutor)); 237 return Job<OutOther, InOther>(new Private::ReduceExecutor<OutOther, InOther>(func, mExecutor));
226 } 238 }
227 239
240 Async::Future<Out> exec()
241 {
242 mExecutor->exec();
243 return result();
244 }
245
228 Async::Future<Out> result() const 246 Async::Future<Out> result() const
229 { 247 {
230 return *static_cast<Async::Future<Out>*>(mExecutor->result()); 248 return *static_cast<Async::Future<Out>*>(mExecutor->result());
@@ -257,14 +275,31 @@ Future<PrevOut>* Executor<PrevOut, Out, In ...>::chainup()
257{ 275{
258 if (mPrev) { 276 if (mPrev) {
259 mPrev->exec(); 277 mPrev->exec();
260 auto future = static_cast<Async::Future<PrevOut>*>(mPrev->result()); 278 return static_cast<Async::Future<PrevOut>*>(mPrev->result());
261 assert(future->isFinished());
262 return future;
263 } else { 279 } else {
264 return 0; 280 return 0;
265 } 281 }
266} 282}
267 283
284template<typename PrevOut, typename Out, typename ... In>
285void Executor<PrevOut, Out, In ...>::exec()
286{
287 mPrevFuture = chainup();
288 mResult = new Async::Future<Out>();
289 if (!mPrevFuture || mPrevFuture->isFinished()) {
290 previousFutureReady();
291 } else {
292 auto futureWatcher = new Async::FutureWatcher<PrevOut>();
293 QObject::connect(futureWatcher, &Async::FutureWatcher<PrevOut>::futureReady,
294 [futureWatcher, this]() {
295 assert(futureWatcher->future().isFinished());
296 futureWatcher->deleteLater();
297 previousFutureReady();
298 });
299 futureWatcher->setFuture(*mPrevFuture);
300 }
301}
302
268template<typename Out, typename ... In> 303template<typename Out, typename ... In>
269ThenExecutor<Out, In ...>::ThenExecutor(ThenTask<Out, In ...> then, ExecutorBase* parent) 304ThenExecutor<Out, In ...>::ThenExecutor(ThenTask<Out, In ...> then, ExecutorBase* parent)
270 : Executor<typename PreviousOut<In ...>::type, Out, In ...>(parent) 305 : Executor<typename PreviousOut<In ...>::type, Out, In ...>(parent)
@@ -273,15 +308,13 @@ ThenExecutor<Out, In ...>::ThenExecutor(ThenTask<Out, In ...> then, ExecutorBase
273} 308}
274 309
275template<typename Out, typename ... In> 310template<typename Out, typename ... In>
276void ThenExecutor<Out, In ...>::exec() 311void ThenExecutor<Out, In ...>::previousFutureReady()
277{ 312{
278 auto in = this->chainup(); 313 if (this->mPrevFuture) {
279 (void)in; // supress 'unused variable' warning when In is void 314 assert(this->mPrevFuture->isFinished());
280 315 }
281 auto out = new Async::Future<Out>(); 316 this->mFunc(this->mPrevFuture ? this->mPrevFuture->value() : In() ...,
282 this->mFunc(in ? in->value() : In() ..., *out); 317 *static_cast<Async::Future<Out>*>(this->mResult));
283 out->waitForFinished();
284 this->mResult = out;
285} 318}
286 319
287template<typename PrevOut, typename Out, typename In> 320template<typename PrevOut, typename Out, typename In>
@@ -292,20 +325,33 @@ EachExecutor<PrevOut, Out, In>::EachExecutor(EachTask<Out, In> each, ExecutorBas
292} 325}
293 326
294template<typename PrevOut, typename Out, typename In> 327template<typename PrevOut, typename Out, typename In>
295void EachExecutor<PrevOut, Out, In>::exec() 328void EachExecutor<PrevOut, Out, In>::previousFutureReady()
296{ 329{
297 auto in = this->chainup(); 330 assert(this->mPrevFuture->isFinished());
331 auto out = static_cast<Async::Future<Out>*>(this->mResult);
332 if (this->mPrevFuture->value().isEmpty()) {
333 out->setFinished();
334 return;
335 }
298 336
299 auto *out = new Async::Future<Out>(); 337 for (auto arg : this->mPrevFuture->value()) {
300 for (auto arg : in->value()) {
301 Async::Future<Out> future; 338 Async::Future<Out> future;
302 this->mFunc(arg, future); 339 this->mFunc(arg, future);
303 future.waitForFinished(); 340 auto fw = new Async::FutureWatcher<Out>();
304 out->setValue(out->value() + future.value()); 341 mFutureWatchers.append(fw);
342 QObject::connect(fw, &Async::FutureWatcher<Out>::futureReady,
343 [out, future, fw, this]() {
344 assert(future.isFinished());
345 const int index = mFutureWatchers.indexOf(fw);
346 assert(index > -1);
347 mFutureWatchers.removeAt(index);
348 out->setValue(out->value() + future.value());
349 if (mFutureWatchers.isEmpty()) {
350 out->setFinished();
351 }
352 });
353 fw->setFuture(future);
305 } 354 }
306 out->setFinished();
307
308 this->mResult = out;
309} 355}
310 356
311template<typename Out, typename In> 357template<typename Out, typename In>
@@ -316,14 +362,10 @@ ReduceExecutor<Out, In>::ReduceExecutor(ReduceTask<Out, In> reduce, ExecutorBase
316} 362}
317 363
318template<typename Out, typename In> 364template<typename Out, typename In>
319void ReduceExecutor<Out, In>::exec() 365void ReduceExecutor<Out, In>::previousFutureReady()
320{ 366{
321 auto in = this->chainup(); 367 assert(this->mPrevFuture->isFinished());
322 368 this->mFunc(this->mPrevFuture->value(), *static_cast<Async::Future<Out>*>(this->mResult));
323 auto out = new Async::Future<Out>();
324 this->mFunc(in->value(), *out);
325 out->waitForFinished();
326 this->mResult = out;
327} 369}
328 370
329} // namespace Private 371} // namespace Private
diff --git a/async/src/future.cpp b/async/src/future.cpp
new file mode 100644
index 0000000..48c7417
--- /dev/null
+++ b/async/src/future.cpp
@@ -0,0 +1,57 @@
1/*
2 * Copyright 2014 Daniel Vrátil <dvratil@redhat.com>
3 *
4 * This program is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU General Public License as
6 * published by the Free Software Foundation; either version 2 of
7 * the License or (at your option) version 3 or any later version
8 * accepted by the membership of KDE e.V. (or its successor approved
9 * by the membership of KDE e.V.), which shall act as a proxy
10 * defined in Section 14 of version 3 of the license.
11 *
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with this program. If not, see <http://www.gnu.org/licenses/>.
19 *
20 */
21
22#include "future.h"
23
24using namespace Async;
25
26FutureBase::FutureBase()
27 : mFinished(false)
28 , mWaitLoop(nullptr)
29{
30}
31
32FutureBase::FutureBase(const Async::FutureBase &other)
33 : mFinished(other.mFinished)
34 , mWaitLoop(other.mWaitLoop)
35{
36}
37
38FutureBase::~FutureBase()
39{
40}
41
42bool FutureBase::isFinished() const
43{
44 return mFinished;
45}
46
47FutureWatcherBase::FutureWatcherBase(QObject *parent)
48 : QObject(parent)
49{
50}
51
52FutureWatcherBase::~FutureWatcherBase()
53{
54}
55
56
57#include "future.moc" \ No newline at end of file
diff --git a/async/src/future.h b/async/src/future.h
index eb3de1e..39e3936 100644
--- a/async/src/future.h
+++ b/async/src/future.h
@@ -24,67 +24,160 @@
24 24
25class QEventLoop; 25class QEventLoop;
26 26
27#include <type_traits>
28
29#include <QSharedDataPointer>
30#include <QPointer>
31#include <QVector>
32
27namespace Async { 33namespace Async {
28 34
29class FutureBase 35class FutureBase
30{ 36{
31public: 37public:
32 FutureBase();
33 FutureBase(const FutureBase &other);
34 virtual ~FutureBase(); 38 virtual ~FutureBase();
35 39
36 void setFinished(); 40 virtual void setFinished() = 0;
37 bool isFinished() const; 41 bool isFinished() const;
38 void waitForFinished();
39 42
40protected: 43protected:
44 FutureBase();
45 FutureBase(const FutureBase &other);
46
41 bool mFinished; 47 bool mFinished;
42 QEventLoop *mWaitLoop; 48 QEventLoop *mWaitLoop;
43}; 49};
44 50
45template<typename T> 51template<typename T>
46class Future : public FutureBase 52class FutureWatcher;
53
54template<typename T>
55class FutureGeneric : public FutureBase
47{ 56{
57 friend class FutureWatcher<T>;
58
48public: 59public:
49 Future() 60 void setFinished()
50 : FutureBase() 61 {
62 mFinished = true;
63 for (auto watcher : d->watchers) {
64 if (watcher) {
65 watcher->futureReadyCallback();
66 }
67 }
68 }
69
70protected:
71 FutureGeneric()
72 : FutureBase()
73 , d(new Private)
51 {} 74 {}
52 75
53 Future(const Future<T> &other) 76 FutureGeneric(const FutureGeneric<T> &other)
54 : FutureBase(other) 77 : FutureBase(other)
55 , mValue(other.mValue) 78 , d(other.d)
79 {}
80
81 class Private : public QSharedData
82 {
83 public:
84 typename std::conditional<std::is_void<T>::value, int /* dummy */, T>::type
85 value;
86
87 QVector<QPointer<FutureWatcher<T>>> watchers;
88 };
89
90 QExplicitlySharedDataPointer<Private> d;
91
92 void addWatcher(FutureWatcher<T> *watcher)
93 {
94 d->watchers.append(QPointer<FutureWatcher<T>>(watcher));
95 }
96};
97
98template<typename T>
99class Future : public FutureGeneric<T>
100{
101public:
102 Future()
103 : FutureGeneric<T>()
56 {} 104 {}
57 105
58 Future(const T &val) 106 Future(const Future<T> &other)
59 : FutureBase() 107 : FutureGeneric<T>(other)
60 , mValue(val)
61 {} 108 {}
62 109
63 void setValue(const T &val) 110 void setValue(const T &value)
64 { 111 {
65 mValue = val; 112 this->d->value = value;
66 } 113 }
67 114
68 T value() const 115 T value() const
69 { 116 {
70 return mValue; 117 return this->d->value;
71 } 118 }
72
73private:
74 T mValue;
75}; 119};
76 120
77template<> 121template<>
78class Future<void> : public FutureBase 122class Future<void> : public FutureGeneric<void>
79{ 123{
80public: 124public:
81 Future() 125 Future()
82 : FutureBase() 126 : FutureGeneric<void>()
83 {} 127 {}
84 128
85 Future(const Future<void> &other) 129 Future(const Future<void> &other)
86 : FutureBase(other) 130 : FutureGeneric<void>(other)
131 {}
132};
133
134
135class FutureWatcherBase : public QObject
136{
137 Q_OBJECT
138
139protected:
140 FutureWatcherBase(QObject *parent = 0);
141 virtual ~FutureWatcherBase();
142
143Q_SIGNALS:
144 void futureReady();
145};
146
147template<typename T>
148class FutureWatcher : public FutureWatcherBase
149{
150 friend class Async::FutureGeneric<T>;
151
152public:
153 FutureWatcher(QObject *parent = 0)
154 : FutureWatcherBase(parent)
155 {}
156
157 ~FutureWatcher()
87 {} 158 {}
159
160 void setFuture(const Async::Future<T> &future)
161 {
162 mFuture = future;
163 mFuture.addWatcher(this);
164 if (future.isFinished()) {
165 futureReadyCallback();
166 }
167 }
168
169 Async::Future<T> future() const
170 {
171 return mFuture;
172 }
173
174private:
175 void futureReadyCallback()
176 {
177 Q_EMIT futureReady();
178 }
179
180 Async::Future<T> mFuture;
88}; 181};
89 182
90} // namespace Async 183} // namespace Async