summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDan Vrátil <dvratil@redhat.com>2015-02-08 12:02:04 +0100
committerDan Vrátil <dvratil@redhat.com>2015-02-09 14:33:45 +0100
commitadc6a443776820b5ae36c982baf92b1d29bbaa4b (patch)
treeed278ffbcd8fc8c3759fbcc4afd4240fc1a72fc3
parentcbb192ffe865ffb3eed4c940177ffecaecfa570f (diff)
downloadsink-adc6a443776820b5ae36c982baf92b1d29bbaa4b.tar.gz
sink-adc6a443776820b5ae36c982baf92b1d29bbaa4b.zip
Async: introduce sync executors
Sync executors don't pass Async::Future into the user-provided tasks, but instead work with return values of the task methods, wrapping them into the Async::Future internally. Sync tasks are of course possible since forever, but not the API for those tasks is much cleaner, for users don't have to deal with "future" in synchronous tasks, for instance when synchronously processing results of an async task before passing the data to another async task.
-rw-r--r--async/autotests/asynctest.cpp138
-rw-r--r--async/src/async.h170
2 files changed, 281 insertions, 27 deletions
diff --git a/async/autotests/asynctest.cpp b/async/autotests/asynctest.cpp
index 9240d28..4ebe65e 100644
--- a/async/autotests/asynctest.cpp
+++ b/async/autotests/asynctest.cpp
@@ -42,9 +42,14 @@ private Q_SLOTS:
42 void testAsyncPromises(); 42 void testAsyncPromises();
43 void testAsyncPromises2(); 43 void testAsyncPromises2();
44 void testNestedAsync(); 44 void testNestedAsync();
45 void testAsyncThen();
46 void testSyncThen();
47 void testAsyncEach();
45 void testSyncEach(); 48 void testSyncEach();
49 void testAsyncReduce();
46 void testSyncReduce(); 50 void testSyncReduce();
47 void testErrorHandler(); 51 void testErrorHandler();
52
48}; 53};
49 54
50void AsyncTest::testSyncPromises() 55void AsyncTest::testSyncPromises()
@@ -155,17 +160,92 @@ void AsyncTest::testNestedAsync()
155 QTRY_VERIFY(done); 160 QTRY_VERIFY(done);
156} 161}
157 162
158void AsyncTest::testSyncEach() 163void AsyncTest::testAsyncThen()
164{
165 auto job = Async::start<int>(
166 [](Async::Future<int> &future) {
167 QTimer *timer = new QTimer;
168 QObject::connect(timer, &QTimer::timeout,
169 [&]() {
170 future.setValue(42);
171 future.setFinished();
172 });
173 QObject::connect(timer, &QTimer::timeout,
174 timer, &QObject::deleteLater);
175 timer->setSingleShot(true);
176 timer->start(0);
177 });
178
179 auto future = job.exec();
180 future.waitForFinished();
181
182 QVERIFY(future.isFinished());
183 QCOMPARE(future.value(), 42);
184}
185
186
187void AsyncTest::testSyncThen()
188{
189 auto job = Async::start<int>(
190 []() -> int {
191 return 42;
192 }).then<int, int>(
193 [](int in) -> int {
194 return in * 2;
195 });
196
197 auto future = job.exec();
198 QVERIFY(future.isFinished());
199 QCOMPARE(future.value(), 84);
200}
201
202void AsyncTest::testAsyncEach()
159{ 203{
160 auto job = Async::start<QList<int>>( 204 auto job = Async::start<QList<int>>(
161 [](Async::Future<QList<int>> &future) { 205 [](Async::Future<QList<int>> &future) {
162 future.setValue(QList<int>{ 1, 2, 3, 4 }); 206 QTimer *timer = new QTimer;
163 future.setFinished(); 207 QObject::connect(timer, &QTimer::timeout,
208 [&future]() {
209 future.setValue({ 1, 2, 3, 4 });
210 future.setFinished();
211 });
212 QObject::connect(timer, &QTimer::timeout,
213 timer, &QObject::deleteLater);
214 timer->setSingleShot(true);
215 timer->start(0);
164 }) 216 })
165 .each<QList<int>, int>( 217 .each<QList<int>, int>(
166 [](const int &v, Async::Future<QList<int>> &future) { 218 [](const int &v, Async::Future<QList<int>> &future) {
167 future.setValue(QList<int>{ v + 1 }); 219 QTimer *timer = new QTimer;
168 future.setFinished(); 220 QObject::connect(timer, &QTimer::timeout,
221 [v, &future]() {
222 future.setValue({ v + 1 });
223 future.setFinished();
224 });
225 QObject::connect(timer, &QTimer::timeout,
226 timer, &QObject::deleteLater);
227 timer->setSingleShot(true);
228 timer->start(0);
229 });
230
231 auto future = job.exec();
232 future.waitForFinished();
233
234 const QList<int> expected({ 2, 3, 4, 5 });
235 QVERIFY(future.isFinished());
236 QCOMPARE(future.value(), expected);
237}
238
239
240void AsyncTest::testSyncEach()
241{
242 auto job = Async::start<QList<int>>(
243 []() -> QList<int> {
244 return { 1, 2, 3, 4 };
245 })
246 .each<QList<int>, int>(
247 [](const int &v) -> QList<int> {
248 return { v + 1 };
169 }); 249 });
170 250
171 Async::Future<QList<int>> future = job.exec(); 251 Async::Future<QList<int>> future = job.exec();
@@ -175,19 +255,55 @@ void AsyncTest::testSyncEach()
175 QCOMPARE(future.value(), expected); 255 QCOMPARE(future.value(), expected);
176} 256}
177 257
178void AsyncTest::testSyncReduce() 258void AsyncTest::testAsyncReduce()
179{ 259{
180 auto job = Async::start<QList<int>>( 260 auto job = Async::start<QList<int>>(
181 [](Async::Future<QList<int>> &future) { 261 [](Async::Future<QList<int>> &future) {
182 future.setValue(QList<int>{ 1, 2, 3, 4 }); 262 QTimer *timer = new QTimer();
183 future.setFinished(); 263 QObject::connect(timer, &QTimer::timeout,
264 [&future]() {
265 future.setValue({ 1, 2, 3, 4 });
266 future.setFinished();
267 });
268 QObject::connect(timer, &QTimer::timeout,
269 timer, &QObject::deleteLater);
270 timer->setSingleShot(true);
271 timer->start(0);
184 }) 272 })
185 .reduce<int, QList<int>>( 273 .reduce<int, QList<int>>(
186 [](const QList<int> &list, Async::Future<int> &future) { 274 [](const QList<int> &list, Async::Future<int> &future) {
275 QTimer *timer = new QTimer();
276 QObject::connect(timer, &QTimer::timeout,
277 [list, &future]() {
278 int sum = 0;
279 for (int i : list) sum += i;
280 future.setValue(sum);
281 future.setFinished();
282 });
283 QObject::connect(timer, &QTimer::timeout,
284 timer, &QObject::deleteLater);
285 timer->setSingleShot(true);
286 timer->start(0);
287 });
288
289 Async::Future<int> future = job.exec();
290 future.waitForFinished();
291
292 QVERIFY(future.isFinished());
293 QCOMPARE(future.value(), 10);
294}
295
296void AsyncTest::testSyncReduce()
297{
298 auto job = Async::start<QList<int>>(
299 []() -> QList<int> {
300 return { 1, 2, 3, 4 };
301 })
302 .reduce<int, QList<int>>(
303 [](const QList<int> &list) -> int {
187 int sum = 0; 304 int sum = 0;
188 for (int i : list) sum += i; 305 for (int i : list) sum += i;
189 future.setValue(sum); 306 return sum;
190 future.setFinished();
191 }); 307 });
192 308
193 Async::Future<int> future = job.exec(); 309 Async::Future<int> future = job.exec();
@@ -213,7 +329,7 @@ void AsyncTest::testErrorHandler()
213 ); 329 );
214 auto future = job.exec(); 330 auto future = job.exec();
215 future.waitForFinished(); 331 future.waitForFinished();
216 QVERIFY(error == 1); 332 QCOMPARE(error, 1);
217 QVERIFY(future.isFinished()); 333 QVERIFY(future.isFinished());
218} 334}
219 335
diff --git a/async/src/async.h b/async/src/async.h
index d15373b..336bae2 100644
--- a/async/src/async.h
+++ b/async/src/async.h
@@ -33,8 +33,6 @@
33 33
34 34
35/* 35/*
36 * TODO: on .then and potentially others: support for ThenTask without future argument and return value which makes it implicitly a sync continuation.
37 * Useful for typical value consumer continuations.
38 * TODO: error continuation on .then and others. 36 * TODO: error continuation on .then and others.
39 * TODO: instead of passing the future objects callbacks could be provided for result reporting (we can still use the future object internally 37 * TODO: instead of passing the future objects callbacks could be provided for result reporting (we can still use the future object internally
40 */ 38 */
@@ -47,13 +45,19 @@ class JobBase;
47 45
48template<typename Out, typename ... In> 46template<typename Out, typename ... In>
49class Job; 47class Job;
50
51template<typename Out, typename ... In> 48template<typename Out, typename ... In>
52using ThenTask = typename detail::identity<std::function<void(In ..., Async::Future<Out>&)>>::type; 49using ThenTask = typename detail::identity<std::function<void(In ..., Async::Future<Out>&)>>::type;
50template<typename Out, typename ... In>
51using SyncThenTask = typename detail::identity<std::function<Out(In ...)>>::type;
53template<typename Out, typename In> 52template<typename Out, typename In>
54using EachTask = typename detail::identity<std::function<void(In, Async::Future<Out>&)>>::type; 53using EachTask = typename detail::identity<std::function<void(In, Async::Future<Out>&)>>::type;
55template<typename Out, typename In> 54template<typename Out, typename In>
55using SyncEachTask = typename detail::identity<std::function<Out(In)>>::type;
56template<typename Out, typename In>
56using ReduceTask = typename detail::identity<std::function<void(In, Async::Future<Out>&)>>::type; 57using ReduceTask = typename detail::identity<std::function<void(In, Async::Future<Out>&)>>::type;
58template<typename Out, typename In>
59using SyncReduceTask = typename detail::identity<std::function<Out(In)>>::type;
60
57using ErrorHandler = std::function<void(int, const QString &)>; 61using ErrorHandler = std::function<void(int, const QString &)>;
58 62
59namespace Private 63namespace Private
@@ -134,6 +138,33 @@ public:
134 ReduceExecutor(ReduceTask<Out, In> reduce, const ExecutorBasePtr &parent); 138 ReduceExecutor(ReduceTask<Out, In> reduce, const ExecutorBasePtr &parent);
135}; 139};
136 140
141template<typename Out, typename ... In>
142class SyncThenExecutor : public Executor<typename PreviousOut<In ...>::type, Out, In ...>
143{
144public:
145 SyncThenExecutor(SyncThenTask<Out, In ...> then, ErrorHandler errorHandler = ErrorHandler(), const ExecutorBasePtr &parent = ExecutorBasePtr());
146 void previousFutureReady();
147protected:
148 SyncThenTask<Out, In ...> mFunc;
149};
150
151template<typename Out, typename In>
152class SyncReduceExecutor : public SyncThenExecutor<Out, In>
153{
154public:
155 SyncReduceExecutor(SyncReduceTask<Out, In> reduce, const ExecutorBasePtr &parent = ExecutorBasePtr());
156};
157
158template<typename PrevOut, typename Out, typename In>
159class SyncEachExecutor : public Executor<PrevOut, Out, In>
160{
161public:
162 SyncEachExecutor(SyncEachTask<Out, In> each, const ExecutorBasePtr &parent = ExecutorBasePtr());
163 void previousFutureReady();
164protected:
165 SyncEachTask<Out, In> mFunc;
166};
167
137} // namespace Private 168} // namespace Private
138 169
139/** 170/**
@@ -238,6 +269,9 @@ class Job : public JobBase
238 template<typename OutOther> 269 template<typename OutOther>
239 friend Job<OutOther> start(Async::ThenTask<OutOther> func); 270 friend Job<OutOther> start(Async::ThenTask<OutOther> func);
240 271
272 template<typename OutOther>
273 friend Job<OutOther> start(Async::SyncThenTask<OutOther> func);
274
241public: 275public:
242 template<typename OutOther, typename ... InOther> 276 template<typename OutOther, typename ... InOther>
243 Job<OutOther, InOther ...> then(ThenTask<OutOther, InOther ...> func, ErrorHandler errorFunc = ErrorHandler()) 277 Job<OutOther, InOther ...> then(ThenTask<OutOther, InOther ...> func, ErrorHandler errorFunc = ErrorHandler())
@@ -246,28 +280,45 @@ public:
246 new Private::ThenExecutor<OutOther, InOther ...>(func, errorFunc, mExecutor))); 280 new Private::ThenExecutor<OutOther, InOther ...>(func, errorFunc, mExecutor)));
247 } 281 }
248 282
283 template<typename OutOther, typename ... InOther>
284 Job<OutOther, InOther ...> then(SyncThenTask<OutOther, InOther ...> func, ErrorHandler errorFunc = ErrorHandler())
285 {
286 return Job<OutOther, InOther ...>(Private::ExecutorBasePtr(
287 new Private::SyncThenExecutor<OutOther, InOther ...>(func, errorFunc, mExecutor)));
288 }
289
249 template<typename OutOther, typename InOther> 290 template<typename OutOther, typename InOther>
250 Job<OutOther, InOther> each(EachTask<OutOther, InOther> func) 291 Job<OutOther, InOther> each(EachTask<OutOther, InOther> func)
251 { 292 {
252 static_assert(detail::isIterable<Out>::value, 293 eachInvariants<OutOther>();
253 "The 'Each' task can only be connected to a job that returns a list or an array.");
254 static_assert(detail::isIterable<OutOther>::value,
255 "The result type of 'Each' task must be a list or an array.");
256 return Job<OutOther, InOther>(Private::ExecutorBasePtr( 294 return Job<OutOther, InOther>(Private::ExecutorBasePtr(
257 new Private::EachExecutor<Out, OutOther, InOther>(func, mExecutor))); 295 new Private::EachExecutor<Out, OutOther, InOther>(func, mExecutor)));
258 } 296 }
259 297
260 template<typename OutOther, typename InOther> 298 template<typename OutOther, typename InOther>
299 Job<OutOther, InOther> each(SyncEachTask<OutOther, InOther> func)
300 {
301 eachInvariants<OutOther>();
302 return Job<OutOther, InOther>(Private::ExecutorBasePtr(
303 new Private::SyncEachExecutor<Out, OutOther, InOther>(func, mExecutor)));
304 }
305
306 template<typename OutOther, typename InOther>
261 Job<OutOther, InOther> reduce(ReduceTask<OutOther, InOther> func) 307 Job<OutOther, InOther> reduce(ReduceTask<OutOther, InOther> func)
262 { 308 {
263 static_assert(Async::detail::isIterable<Out>::value, 309 reduceInvariants<InOther>();
264 "The 'Result' task can only be connected to a job that returns a list or an array");
265 static_assert(std::is_same<typename Out::value_type, typename InOther::value_type>::value,
266 "The return type of previous task must be compatible with input type of this task");
267 return Job<OutOther, InOther>(Private::ExecutorBasePtr( 310 return Job<OutOther, InOther>(Private::ExecutorBasePtr(
268 new Private::ReduceExecutor<OutOther, InOther>(func, mExecutor))); 311 new Private::ReduceExecutor<OutOther, InOther>(func, mExecutor)));
269 } 312 }
270 313
314 template<typename OutOther, typename InOther>
315 Job<OutOther, InOther> reduce(SyncReduceTask<OutOther, InOther> func)
316 {
317 reduceInvariants<InOther>();
318 return Job<OutOther, InOther>(Private::ExecutorBasePtr(
319 new Private::SyncReduceExecutor<OutOther, InOther>(func, mExecutor)));
320 }
321
271 Async::Future<Out> exec() 322 Async::Future<Out> exec()
272 { 323 {
273 mExecutor->exec(); 324 mExecutor->exec();
@@ -283,6 +334,24 @@ private:
283 Job(Private::ExecutorBasePtr executor) 334 Job(Private::ExecutorBasePtr executor)
284 : JobBase(executor) 335 : JobBase(executor)
285 {} 336 {}
337
338 template<typename OutOther>
339 void eachInvariants()
340 {
341 static_assert(detail::isIterable<Out>::value,
342 "The 'Each' task can only be connected to a job that returns a list or an array.");
343 static_assert(detail::isIterable<OutOther>::value,
344 "The result type of 'Each' task must be a list or an array.");
345 }
346
347 template<typename InOther>
348 void reduceInvariants()
349 {
350 static_assert(Async::detail::isIterable<Out>::value,
351 "The 'Result' task can only be connected to a job that returns a list or an array");
352 static_assert(std::is_same<typename Out::value_type, typename InOther::value_type>::value,
353 "The return type of previous task must be compatible with input type of this task");
354 }
286}; 355};
287 356
288} // namespace Async 357} // namespace Async
@@ -298,6 +367,12 @@ Job<Out> start(ThenTask<Out> func)
298 return Job<Out>(Private::ExecutorBasePtr(new Private::ThenExecutor<Out>(func))); 367 return Job<Out>(Private::ExecutorBasePtr(new Private::ThenExecutor<Out>(func)));
299} 368}
300 369
370template<typename Out>
371Job<Out> start(SyncThenTask<Out> func)
372{
373 return Job<Out>(Private::ExecutorBasePtr(new Private::SyncThenExecutor<Out>(func)));
374}
375
301namespace Private { 376namespace Private {
302 377
303template<typename PrevOut, typename Out, typename ... In> 378template<typename PrevOut, typename Out, typename ... In>
@@ -378,22 +453,23 @@ void EachExecutor<PrevOut, Out, In>::previousFutureReady()
378 } 453 }
379 454
380 for (auto arg : this->mPrevFuture->value()) { 455 for (auto arg : this->mPrevFuture->value()) {
381 Async::Future<Out> future; 456 auto future = new Async::Future<Out>;
382 this->mFunc(arg, future); 457 this->mFunc(arg, *future);
383 auto fw = new Async::FutureWatcher<Out>(); 458 auto fw = new Async::FutureWatcher<Out>();
384 mFutureWatchers.append(fw); 459 mFutureWatchers.append(fw);
385 QObject::connect(fw, &Async::FutureWatcher<Out>::futureReady, 460 QObject::connect(fw, &Async::FutureWatcher<Out>::futureReady,
386 [out, future, fw, this]() { 461 [out, future, fw, this]() {
387 assert(future.isFinished()); 462 assert(future->isFinished());
388 const int index = mFutureWatchers.indexOf(fw); 463 const int index = mFutureWatchers.indexOf(fw);
389 assert(index > -1); 464 assert(index > -1);
390 mFutureWatchers.removeAt(index); 465 mFutureWatchers.removeAt(index);
391 out->setValue(out->value() + future.value()); 466 out->setValue(out->value() + future->value());
467 delete future;
392 if (mFutureWatchers.isEmpty()) { 468 if (mFutureWatchers.isEmpty()) {
393 out->setFinished(); 469 out->setFinished();
394 } 470 }
395 }); 471 });
396 fw->setFuture(future); 472 fw->setFuture(*future);
397 } 473 }
398} 474}
399 475
@@ -403,6 +479,68 @@ ReduceExecutor<Out, In>::ReduceExecutor(ReduceTask<Out, In> reduce, const Execut
403{ 479{
404} 480}
405 481
482template<typename Out, typename ... In>
483SyncThenExecutor<Out, In ...>::SyncThenExecutor(SyncThenTask<Out, In ...> then, ErrorHandler errorHandler, const ExecutorBasePtr &parent)
484 : Executor<typename PreviousOut<In ...>::type, Out, In ...>(parent)
485{
486 this->mFunc = then;
487 this->mErrorFunc = errorHandler;
488}
489
490template<typename Out, typename ... In>
491void SyncThenExecutor<Out, In ...>::previousFutureReady()
492{
493 if (this->mPrevFuture) {
494 assert(this->mPrevFuture->isFinished());
495 }
496
497 if (this->mPrevFuture && this->mPrevFuture->errorCode()) {
498 if (this->mErrorFunc) {
499 this->mErrorFunc(this->mPrevFuture->errorCode(), this->mPrevFuture->errorMessage());
500 this->mResult->setFinished();
501 } else {
502 static_cast<Async::Future<Out>*>(this->mResult)->setError(this->mPrevFuture->errorCode(), this->mPrevFuture->errorMessage());
503 //propagate error if no error handler is available
504 Out result = this->mFunc(this->mPrevFuture ? this->mPrevFuture->value() : In() ...);
505 static_cast<Async::Future<Out>*>(this->mResult)->setValue(result);
506 this->mResult->setFinished();
507 }
508 } else {
509 Out result = this->mFunc(this->mPrevFuture ? this->mPrevFuture->value() : In() ...);
510 static_cast<Async::Future<Out>*>(this->mResult)->setValue(result);
511 this->mResult->setFinished();
512 }
513}
514
515template<typename PrevOut, typename Out, typename In>
516SyncEachExecutor<PrevOut, Out, In>::SyncEachExecutor(SyncEachTask<Out, In> each, const ExecutorBasePtr &parent)
517 : Executor<PrevOut, Out, In>(parent)
518{
519 this->mFunc = each;
520}
521
522template<typename PrevOut, typename Out, typename In>
523void SyncEachExecutor<PrevOut, Out, In>::previousFutureReady()
524{
525 assert(this->mPrevFuture->isFinished());
526 auto out = static_cast<Async::Future<Out>*>(this->mResult);
527 if (this->mPrevFuture->value().isEmpty()) {
528 out->setFinished();
529 return;
530 }
531
532 for (auto arg : this->mPrevFuture->value()) {
533 out->setValue(out->value() + this->mFunc(arg));
534 }
535 out->setFinished();
536}
537
538template<typename Out, typename In>
539SyncReduceExecutor<Out, In>::SyncReduceExecutor(SyncReduceTask<Out, In> reduce, const ExecutorBasePtr &parent)
540 : SyncThenExecutor<Out, In>(reduce, ErrorHandler(), parent)
541{
542}
543
406 544
407} // namespace Private 545} // namespace Private
408 546