summaryrefslogtreecommitdiffstats
path: root/async/src
diff options
context:
space:
mode:
authorDan Vrátil <dvratil@redhat.com>2015-02-08 12:02:04 +0100
committerDan Vrátil <dvratil@redhat.com>2015-02-09 14:33:45 +0100
commitadc6a443776820b5ae36c982baf92b1d29bbaa4b (patch)
treeed278ffbcd8fc8c3759fbcc4afd4240fc1a72fc3 /async/src
parentcbb192ffe865ffb3eed4c940177ffecaecfa570f (diff)
downloadsink-adc6a443776820b5ae36c982baf92b1d29bbaa4b.tar.gz
sink-adc6a443776820b5ae36c982baf92b1d29bbaa4b.zip
Async: introduce sync executors
Sync executors don't pass Async::Future into the user-provided tasks, but instead work with return values of the task methods, wrapping them into the Async::Future internally. Sync tasks are of course possible since forever, but not the API for those tasks is much cleaner, for users don't have to deal with "future" in synchronous tasks, for instance when synchronously processing results of an async task before passing the data to another async task.
Diffstat (limited to 'async/src')
-rw-r--r--async/src/async.h170
1 files changed, 154 insertions, 16 deletions
diff --git a/async/src/async.h b/async/src/async.h
index d15373b..336bae2 100644
--- a/async/src/async.h
+++ b/async/src/async.h
@@ -33,8 +33,6 @@
33 33
34 34
35/* 35/*
36 * TODO: on .then and potentially others: support for ThenTask without future argument and return value which makes it implicitly a sync continuation.
37 * Useful for typical value consumer continuations.
38 * TODO: error continuation on .then and others. 36 * TODO: error continuation on .then and others.
39 * TODO: instead of passing the future objects callbacks could be provided for result reporting (we can still use the future object internally 37 * TODO: instead of passing the future objects callbacks could be provided for result reporting (we can still use the future object internally
40 */ 38 */
@@ -47,13 +45,19 @@ class JobBase;
47 45
48template<typename Out, typename ... In> 46template<typename Out, typename ... In>
49class Job; 47class Job;
50
51template<typename Out, typename ... In> 48template<typename Out, typename ... In>
52using ThenTask = typename detail::identity<std::function<void(In ..., Async::Future<Out>&)>>::type; 49using ThenTask = typename detail::identity<std::function<void(In ..., Async::Future<Out>&)>>::type;
50template<typename Out, typename ... In>
51using SyncThenTask = typename detail::identity<std::function<Out(In ...)>>::type;
53template<typename Out, typename In> 52template<typename Out, typename In>
54using EachTask = typename detail::identity<std::function<void(In, Async::Future<Out>&)>>::type; 53using EachTask = typename detail::identity<std::function<void(In, Async::Future<Out>&)>>::type;
55template<typename Out, typename In> 54template<typename Out, typename In>
55using SyncEachTask = typename detail::identity<std::function<Out(In)>>::type;
56template<typename Out, typename In>
56using ReduceTask = typename detail::identity<std::function<void(In, Async::Future<Out>&)>>::type; 57using ReduceTask = typename detail::identity<std::function<void(In, Async::Future<Out>&)>>::type;
58template<typename Out, typename In>
59using SyncReduceTask = typename detail::identity<std::function<Out(In)>>::type;
60
57using ErrorHandler = std::function<void(int, const QString &)>; 61using ErrorHandler = std::function<void(int, const QString &)>;
58 62
59namespace Private 63namespace Private
@@ -134,6 +138,33 @@ public:
134 ReduceExecutor(ReduceTask<Out, In> reduce, const ExecutorBasePtr &parent); 138 ReduceExecutor(ReduceTask<Out, In> reduce, const ExecutorBasePtr &parent);
135}; 139};
136 140
141template<typename Out, typename ... In>
142class SyncThenExecutor : public Executor<typename PreviousOut<In ...>::type, Out, In ...>
143{
144public:
145 SyncThenExecutor(SyncThenTask<Out, In ...> then, ErrorHandler errorHandler = ErrorHandler(), const ExecutorBasePtr &parent = ExecutorBasePtr());
146 void previousFutureReady();
147protected:
148 SyncThenTask<Out, In ...> mFunc;
149};
150
151template<typename Out, typename In>
152class SyncReduceExecutor : public SyncThenExecutor<Out, In>
153{
154public:
155 SyncReduceExecutor(SyncReduceTask<Out, In> reduce, const ExecutorBasePtr &parent = ExecutorBasePtr());
156};
157
158template<typename PrevOut, typename Out, typename In>
159class SyncEachExecutor : public Executor<PrevOut, Out, In>
160{
161public:
162 SyncEachExecutor(SyncEachTask<Out, In> each, const ExecutorBasePtr &parent = ExecutorBasePtr());
163 void previousFutureReady();
164protected:
165 SyncEachTask<Out, In> mFunc;
166};
167
137} // namespace Private 168} // namespace Private
138 169
139/** 170/**
@@ -238,6 +269,9 @@ class Job : public JobBase
238 template<typename OutOther> 269 template<typename OutOther>
239 friend Job<OutOther> start(Async::ThenTask<OutOther> func); 270 friend Job<OutOther> start(Async::ThenTask<OutOther> func);
240 271
272 template<typename OutOther>
273 friend Job<OutOther> start(Async::SyncThenTask<OutOther> func);
274
241public: 275public:
242 template<typename OutOther, typename ... InOther> 276 template<typename OutOther, typename ... InOther>
243 Job<OutOther, InOther ...> then(ThenTask<OutOther, InOther ...> func, ErrorHandler errorFunc = ErrorHandler()) 277 Job<OutOther, InOther ...> then(ThenTask<OutOther, InOther ...> func, ErrorHandler errorFunc = ErrorHandler())
@@ -246,28 +280,45 @@ public:
246 new Private::ThenExecutor<OutOther, InOther ...>(func, errorFunc, mExecutor))); 280 new Private::ThenExecutor<OutOther, InOther ...>(func, errorFunc, mExecutor)));
247 } 281 }
248 282
283 template<typename OutOther, typename ... InOther>
284 Job<OutOther, InOther ...> then(SyncThenTask<OutOther, InOther ...> func, ErrorHandler errorFunc = ErrorHandler())
285 {
286 return Job<OutOther, InOther ...>(Private::ExecutorBasePtr(
287 new Private::SyncThenExecutor<OutOther, InOther ...>(func, errorFunc, mExecutor)));
288 }
289
249 template<typename OutOther, typename InOther> 290 template<typename OutOther, typename InOther>
250 Job<OutOther, InOther> each(EachTask<OutOther, InOther> func) 291 Job<OutOther, InOther> each(EachTask<OutOther, InOther> func)
251 { 292 {
252 static_assert(detail::isIterable<Out>::value, 293 eachInvariants<OutOther>();
253 "The 'Each' task can only be connected to a job that returns a list or an array.");
254 static_assert(detail::isIterable<OutOther>::value,
255 "The result type of 'Each' task must be a list or an array.");
256 return Job<OutOther, InOther>(Private::ExecutorBasePtr( 294 return Job<OutOther, InOther>(Private::ExecutorBasePtr(
257 new Private::EachExecutor<Out, OutOther, InOther>(func, mExecutor))); 295 new Private::EachExecutor<Out, OutOther, InOther>(func, mExecutor)));
258 } 296 }
259 297
260 template<typename OutOther, typename InOther> 298 template<typename OutOther, typename InOther>
299 Job<OutOther, InOther> each(SyncEachTask<OutOther, InOther> func)
300 {
301 eachInvariants<OutOther>();
302 return Job<OutOther, InOther>(Private::ExecutorBasePtr(
303 new Private::SyncEachExecutor<Out, OutOther, InOther>(func, mExecutor)));
304 }
305
306 template<typename OutOther, typename InOther>
261 Job<OutOther, InOther> reduce(ReduceTask<OutOther, InOther> func) 307 Job<OutOther, InOther> reduce(ReduceTask<OutOther, InOther> func)
262 { 308 {
263 static_assert(Async::detail::isIterable<Out>::value, 309 reduceInvariants<InOther>();
264 "The 'Result' task can only be connected to a job that returns a list or an array");
265 static_assert(std::is_same<typename Out::value_type, typename InOther::value_type>::value,
266 "The return type of previous task must be compatible with input type of this task");
267 return Job<OutOther, InOther>(Private::ExecutorBasePtr( 310 return Job<OutOther, InOther>(Private::ExecutorBasePtr(
268 new Private::ReduceExecutor<OutOther, InOther>(func, mExecutor))); 311 new Private::ReduceExecutor<OutOther, InOther>(func, mExecutor)));
269 } 312 }
270 313
314 template<typename OutOther, typename InOther>
315 Job<OutOther, InOther> reduce(SyncReduceTask<OutOther, InOther> func)
316 {
317 reduceInvariants<InOther>();
318 return Job<OutOther, InOther>(Private::ExecutorBasePtr(
319 new Private::SyncReduceExecutor<OutOther, InOther>(func, mExecutor)));
320 }
321
271 Async::Future<Out> exec() 322 Async::Future<Out> exec()
272 { 323 {
273 mExecutor->exec(); 324 mExecutor->exec();
@@ -283,6 +334,24 @@ private:
283 Job(Private::ExecutorBasePtr executor) 334 Job(Private::ExecutorBasePtr executor)
284 : JobBase(executor) 335 : JobBase(executor)
285 {} 336 {}
337
338 template<typename OutOther>
339 void eachInvariants()
340 {
341 static_assert(detail::isIterable<Out>::value,
342 "The 'Each' task can only be connected to a job that returns a list or an array.");
343 static_assert(detail::isIterable<OutOther>::value,
344 "The result type of 'Each' task must be a list or an array.");
345 }
346
347 template<typename InOther>
348 void reduceInvariants()
349 {
350 static_assert(Async::detail::isIterable<Out>::value,
351 "The 'Result' task can only be connected to a job that returns a list or an array");
352 static_assert(std::is_same<typename Out::value_type, typename InOther::value_type>::value,
353 "The return type of previous task must be compatible with input type of this task");
354 }
286}; 355};
287 356
288} // namespace Async 357} // namespace Async
@@ -298,6 +367,12 @@ Job<Out> start(ThenTask<Out> func)
298 return Job<Out>(Private::ExecutorBasePtr(new Private::ThenExecutor<Out>(func))); 367 return Job<Out>(Private::ExecutorBasePtr(new Private::ThenExecutor<Out>(func)));
299} 368}
300 369
370template<typename Out>
371Job<Out> start(SyncThenTask<Out> func)
372{
373 return Job<Out>(Private::ExecutorBasePtr(new Private::SyncThenExecutor<Out>(func)));
374}
375
301namespace Private { 376namespace Private {
302 377
303template<typename PrevOut, typename Out, typename ... In> 378template<typename PrevOut, typename Out, typename ... In>
@@ -378,22 +453,23 @@ void EachExecutor<PrevOut, Out, In>::previousFutureReady()
378 } 453 }
379 454
380 for (auto arg : this->mPrevFuture->value()) { 455 for (auto arg : this->mPrevFuture->value()) {
381 Async::Future<Out> future; 456 auto future = new Async::Future<Out>;
382 this->mFunc(arg, future); 457 this->mFunc(arg, *future);
383 auto fw = new Async::FutureWatcher<Out>(); 458 auto fw = new Async::FutureWatcher<Out>();
384 mFutureWatchers.append(fw); 459 mFutureWatchers.append(fw);
385 QObject::connect(fw, &Async::FutureWatcher<Out>::futureReady, 460 QObject::connect(fw, &Async::FutureWatcher<Out>::futureReady,
386 [out, future, fw, this]() { 461 [out, future, fw, this]() {
387 assert(future.isFinished()); 462 assert(future->isFinished());
388 const int index = mFutureWatchers.indexOf(fw); 463 const int index = mFutureWatchers.indexOf(fw);
389 assert(index > -1); 464 assert(index > -1);
390 mFutureWatchers.removeAt(index); 465 mFutureWatchers.removeAt(index);
391 out->setValue(out->value() + future.value()); 466 out->setValue(out->value() + future->value());
467 delete future;
392 if (mFutureWatchers.isEmpty()) { 468 if (mFutureWatchers.isEmpty()) {
393 out->setFinished(); 469 out->setFinished();
394 } 470 }
395 }); 471 });
396 fw->setFuture(future); 472 fw->setFuture(*future);
397 } 473 }
398} 474}
399 475
@@ -403,6 +479,68 @@ ReduceExecutor<Out, In>::ReduceExecutor(ReduceTask<Out, In> reduce, const Execut
403{ 479{
404} 480}
405 481
482template<typename Out, typename ... In>
483SyncThenExecutor<Out, In ...>::SyncThenExecutor(SyncThenTask<Out, In ...> then, ErrorHandler errorHandler, const ExecutorBasePtr &parent)
484 : Executor<typename PreviousOut<In ...>::type, Out, In ...>(parent)
485{
486 this->mFunc = then;
487 this->mErrorFunc = errorHandler;
488}
489
490template<typename Out, typename ... In>
491void SyncThenExecutor<Out, In ...>::previousFutureReady()
492{
493 if (this->mPrevFuture) {
494 assert(this->mPrevFuture->isFinished());
495 }
496
497 if (this->mPrevFuture && this->mPrevFuture->errorCode()) {
498 if (this->mErrorFunc) {
499 this->mErrorFunc(this->mPrevFuture->errorCode(), this->mPrevFuture->errorMessage());
500 this->mResult->setFinished();
501 } else {
502 static_cast<Async::Future<Out>*>(this->mResult)->setError(this->mPrevFuture->errorCode(), this->mPrevFuture->errorMessage());
503 //propagate error if no error handler is available
504 Out result = this->mFunc(this->mPrevFuture ? this->mPrevFuture->value() : In() ...);
505 static_cast<Async::Future<Out>*>(this->mResult)->setValue(result);
506 this->mResult->setFinished();
507 }
508 } else {
509 Out result = this->mFunc(this->mPrevFuture ? this->mPrevFuture->value() : In() ...);
510 static_cast<Async::Future<Out>*>(this->mResult)->setValue(result);
511 this->mResult->setFinished();
512 }
513}
514
515template<typename PrevOut, typename Out, typename In>
516SyncEachExecutor<PrevOut, Out, In>::SyncEachExecutor(SyncEachTask<Out, In> each, const ExecutorBasePtr &parent)
517 : Executor<PrevOut, Out, In>(parent)
518{
519 this->mFunc = each;
520}
521
522template<typename PrevOut, typename Out, typename In>
523void SyncEachExecutor<PrevOut, Out, In>::previousFutureReady()
524{
525 assert(this->mPrevFuture->isFinished());
526 auto out = static_cast<Async::Future<Out>*>(this->mResult);
527 if (this->mPrevFuture->value().isEmpty()) {
528 out->setFinished();
529 return;
530 }
531
532 for (auto arg : this->mPrevFuture->value()) {
533 out->setValue(out->value() + this->mFunc(arg));
534 }
535 out->setFinished();
536}
537
538template<typename Out, typename In>
539SyncReduceExecutor<Out, In>::SyncReduceExecutor(SyncReduceTask<Out, In> reduce, const ExecutorBasePtr &parent)
540 : SyncThenExecutor<Out, In>(reduce, ErrorHandler(), parent)
541{
542}
543
406 544
407} // namespace Private 545} // namespace Private
408 546