summaryrefslogtreecommitdiffstats
path: root/async/src/async.h
diff options
context:
space:
mode:
Diffstat (limited to 'async/src/async.h')
-rw-r--r--async/src/async.h170
1 files changed, 154 insertions, 16 deletions
diff --git a/async/src/async.h b/async/src/async.h
index d15373b..336bae2 100644
--- a/async/src/async.h
+++ b/async/src/async.h
@@ -33,8 +33,6 @@
33 33
34 34
35/* 35/*
36 * TODO: on .then and potentially others: support for ThenTask without future argument and return value which makes it implicitly a sync continuation.
37 * Useful for typical value consumer continuations.
38 * TODO: error continuation on .then and others. 36 * TODO: error continuation on .then and others.
39 * TODO: instead of passing the future objects callbacks could be provided for result reporting (we can still use the future object internally 37 * TODO: instead of passing the future objects callbacks could be provided for result reporting (we can still use the future object internally
40 */ 38 */
@@ -47,13 +45,19 @@ class JobBase;
47 45
48template<typename Out, typename ... In> 46template<typename Out, typename ... In>
49class Job; 47class Job;
50
51template<typename Out, typename ... In> 48template<typename Out, typename ... In>
52using ThenTask = typename detail::identity<std::function<void(In ..., Async::Future<Out>&)>>::type; 49using ThenTask = typename detail::identity<std::function<void(In ..., Async::Future<Out>&)>>::type;
50template<typename Out, typename ... In>
51using SyncThenTask = typename detail::identity<std::function<Out(In ...)>>::type;
53template<typename Out, typename In> 52template<typename Out, typename In>
54using EachTask = typename detail::identity<std::function<void(In, Async::Future<Out>&)>>::type; 53using EachTask = typename detail::identity<std::function<void(In, Async::Future<Out>&)>>::type;
55template<typename Out, typename In> 54template<typename Out, typename In>
55using SyncEachTask = typename detail::identity<std::function<Out(In)>>::type;
56template<typename Out, typename In>
56using ReduceTask = typename detail::identity<std::function<void(In, Async::Future<Out>&)>>::type; 57using ReduceTask = typename detail::identity<std::function<void(In, Async::Future<Out>&)>>::type;
58template<typename Out, typename In>
59using SyncReduceTask = typename detail::identity<std::function<Out(In)>>::type;
60
57using ErrorHandler = std::function<void(int, const QString &)>; 61using ErrorHandler = std::function<void(int, const QString &)>;
58 62
59namespace Private 63namespace Private
@@ -134,6 +138,33 @@ public:
134 ReduceExecutor(ReduceTask<Out, In> reduce, const ExecutorBasePtr &parent); 138 ReduceExecutor(ReduceTask<Out, In> reduce, const ExecutorBasePtr &parent);
135}; 139};
136 140
141template<typename Out, typename ... In>
142class SyncThenExecutor : public Executor<typename PreviousOut<In ...>::type, Out, In ...>
143{
144public:
145 SyncThenExecutor(SyncThenTask<Out, In ...> then, ErrorHandler errorHandler = ErrorHandler(), const ExecutorBasePtr &parent = ExecutorBasePtr());
146 void previousFutureReady();
147protected:
148 SyncThenTask<Out, In ...> mFunc;
149};
150
151template<typename Out, typename In>
152class SyncReduceExecutor : public SyncThenExecutor<Out, In>
153{
154public:
155 SyncReduceExecutor(SyncReduceTask<Out, In> reduce, const ExecutorBasePtr &parent = ExecutorBasePtr());
156};
157
158template<typename PrevOut, typename Out, typename In>
159class SyncEachExecutor : public Executor<PrevOut, Out, In>
160{
161public:
162 SyncEachExecutor(SyncEachTask<Out, In> each, const ExecutorBasePtr &parent = ExecutorBasePtr());
163 void previousFutureReady();
164protected:
165 SyncEachTask<Out, In> mFunc;
166};
167
137} // namespace Private 168} // namespace Private
138 169
139/** 170/**
@@ -238,6 +269,9 @@ class Job : public JobBase
238 template<typename OutOther> 269 template<typename OutOther>
239 friend Job<OutOther> start(Async::ThenTask<OutOther> func); 270 friend Job<OutOther> start(Async::ThenTask<OutOther> func);
240 271
272 template<typename OutOther>
273 friend Job<OutOther> start(Async::SyncThenTask<OutOther> func);
274
241public: 275public:
242 template<typename OutOther, typename ... InOther> 276 template<typename OutOther, typename ... InOther>
243 Job<OutOther, InOther ...> then(ThenTask<OutOther, InOther ...> func, ErrorHandler errorFunc = ErrorHandler()) 277 Job<OutOther, InOther ...> then(ThenTask<OutOther, InOther ...> func, ErrorHandler errorFunc = ErrorHandler())
@@ -246,28 +280,45 @@ public:
246 new Private::ThenExecutor<OutOther, InOther ...>(func, errorFunc, mExecutor))); 280 new Private::ThenExecutor<OutOther, InOther ...>(func, errorFunc, mExecutor)));
247 } 281 }
248 282
283 template<typename OutOther, typename ... InOther>
284 Job<OutOther, InOther ...> then(SyncThenTask<OutOther, InOther ...> func, ErrorHandler errorFunc = ErrorHandler())
285 {
286 return Job<OutOther, InOther ...>(Private::ExecutorBasePtr(
287 new Private::SyncThenExecutor<OutOther, InOther ...>(func, errorFunc, mExecutor)));
288 }
289
249 template<typename OutOther, typename InOther> 290 template<typename OutOther, typename InOther>
250 Job<OutOther, InOther> each(EachTask<OutOther, InOther> func) 291 Job<OutOther, InOther> each(EachTask<OutOther, InOther> func)
251 { 292 {
252 static_assert(detail::isIterable<Out>::value, 293 eachInvariants<OutOther>();
253 "The 'Each' task can only be connected to a job that returns a list or an array.");
254 static_assert(detail::isIterable<OutOther>::value,
255 "The result type of 'Each' task must be a list or an array.");
256 return Job<OutOther, InOther>(Private::ExecutorBasePtr( 294 return Job<OutOther, InOther>(Private::ExecutorBasePtr(
257 new Private::EachExecutor<Out, OutOther, InOther>(func, mExecutor))); 295 new Private::EachExecutor<Out, OutOther, InOther>(func, mExecutor)));
258 } 296 }
259 297
260 template<typename OutOther, typename InOther> 298 template<typename OutOther, typename InOther>
299 Job<OutOther, InOther> each(SyncEachTask<OutOther, InOther> func)
300 {
301 eachInvariants<OutOther>();
302 return Job<OutOther, InOther>(Private::ExecutorBasePtr(
303 new Private::SyncEachExecutor<Out, OutOther, InOther>(func, mExecutor)));
304 }
305
306 template<typename OutOther, typename InOther>
261 Job<OutOther, InOther> reduce(ReduceTask<OutOther, InOther> func) 307 Job<OutOther, InOther> reduce(ReduceTask<OutOther, InOther> func)
262 { 308 {
263 static_assert(Async::detail::isIterable<Out>::value, 309 reduceInvariants<InOther>();
264 "The 'Result' task can only be connected to a job that returns a list or an array");
265 static_assert(std::is_same<typename Out::value_type, typename InOther::value_type>::value,
266 "The return type of previous task must be compatible with input type of this task");
267 return Job<OutOther, InOther>(Private::ExecutorBasePtr( 310 return Job<OutOther, InOther>(Private::ExecutorBasePtr(
268 new Private::ReduceExecutor<OutOther, InOther>(func, mExecutor))); 311 new Private::ReduceExecutor<OutOther, InOther>(func, mExecutor)));
269 } 312 }
270 313
314 template<typename OutOther, typename InOther>
315 Job<OutOther, InOther> reduce(SyncReduceTask<OutOther, InOther> func)
316 {
317 reduceInvariants<InOther>();
318 return Job<OutOther, InOther>(Private::ExecutorBasePtr(
319 new Private::SyncReduceExecutor<OutOther, InOther>(func, mExecutor)));
320 }
321
271 Async::Future<Out> exec() 322 Async::Future<Out> exec()
272 { 323 {
273 mExecutor->exec(); 324 mExecutor->exec();
@@ -283,6 +334,24 @@ private:
283 Job(Private::ExecutorBasePtr executor) 334 Job(Private::ExecutorBasePtr executor)
284 : JobBase(executor) 335 : JobBase(executor)
285 {} 336 {}
337
338 template<typename OutOther>
339 void eachInvariants()
340 {
341 static_assert(detail::isIterable<Out>::value,
342 "The 'Each' task can only be connected to a job that returns a list or an array.");
343 static_assert(detail::isIterable<OutOther>::value,
344 "The result type of 'Each' task must be a list or an array.");
345 }
346
347 template<typename InOther>
348 void reduceInvariants()
349 {
350 static_assert(Async::detail::isIterable<Out>::value,
351 "The 'Result' task can only be connected to a job that returns a list or an array");
352 static_assert(std::is_same<typename Out::value_type, typename InOther::value_type>::value,
353 "The return type of previous task must be compatible with input type of this task");
354 }
286}; 355};
287 356
288} // namespace Async 357} // namespace Async
@@ -298,6 +367,12 @@ Job<Out> start(ThenTask<Out> func)
298 return Job<Out>(Private::ExecutorBasePtr(new Private::ThenExecutor<Out>(func))); 367 return Job<Out>(Private::ExecutorBasePtr(new Private::ThenExecutor<Out>(func)));
299} 368}
300 369
370template<typename Out>
371Job<Out> start(SyncThenTask<Out> func)
372{
373 return Job<Out>(Private::ExecutorBasePtr(new Private::SyncThenExecutor<Out>(func)));
374}
375
301namespace Private { 376namespace Private {
302 377
303template<typename PrevOut, typename Out, typename ... In> 378template<typename PrevOut, typename Out, typename ... In>
@@ -378,22 +453,23 @@ void EachExecutor<PrevOut, Out, In>::previousFutureReady()
378 } 453 }
379 454
380 for (auto arg : this->mPrevFuture->value()) { 455 for (auto arg : this->mPrevFuture->value()) {
381 Async::Future<Out> future; 456 auto future = new Async::Future<Out>;
382 this->mFunc(arg, future); 457 this->mFunc(arg, *future);
383 auto fw = new Async::FutureWatcher<Out>(); 458 auto fw = new Async::FutureWatcher<Out>();
384 mFutureWatchers.append(fw); 459 mFutureWatchers.append(fw);
385 QObject::connect(fw, &Async::FutureWatcher<Out>::futureReady, 460 QObject::connect(fw, &Async::FutureWatcher<Out>::futureReady,
386 [out, future, fw, this]() { 461 [out, future, fw, this]() {
387 assert(future.isFinished()); 462 assert(future->isFinished());
388 const int index = mFutureWatchers.indexOf(fw); 463 const int index = mFutureWatchers.indexOf(fw);
389 assert(index > -1); 464 assert(index > -1);
390 mFutureWatchers.removeAt(index); 465 mFutureWatchers.removeAt(index);
391 out->setValue(out->value() + future.value()); 466 out->setValue(out->value() + future->value());
467 delete future;
392 if (mFutureWatchers.isEmpty()) { 468 if (mFutureWatchers.isEmpty()) {
393 out->setFinished(); 469 out->setFinished();
394 } 470 }
395 }); 471 });
396 fw->setFuture(future); 472 fw->setFuture(*future);
397 } 473 }
398} 474}
399 475
@@ -403,6 +479,68 @@ ReduceExecutor<Out, In>::ReduceExecutor(ReduceTask<Out, In> reduce, const Execut
403{ 479{
404} 480}
405 481
482template<typename Out, typename ... In>
483SyncThenExecutor<Out, In ...>::SyncThenExecutor(SyncThenTask<Out, In ...> then, ErrorHandler errorHandler, const ExecutorBasePtr &parent)
484 : Executor<typename PreviousOut<In ...>::type, Out, In ...>(parent)
485{
486 this->mFunc = then;
487 this->mErrorFunc = errorHandler;
488}
489
490template<typename Out, typename ... In>
491void SyncThenExecutor<Out, In ...>::previousFutureReady()
492{
493 if (this->mPrevFuture) {
494 assert(this->mPrevFuture->isFinished());
495 }
496
497 if (this->mPrevFuture && this->mPrevFuture->errorCode()) {
498 if (this->mErrorFunc) {
499 this->mErrorFunc(this->mPrevFuture->errorCode(), this->mPrevFuture->errorMessage());
500 this->mResult->setFinished();
501 } else {
502 static_cast<Async::Future<Out>*>(this->mResult)->setError(this->mPrevFuture->errorCode(), this->mPrevFuture->errorMessage());
503 //propagate error if no error handler is available
504 Out result = this->mFunc(this->mPrevFuture ? this->mPrevFuture->value() : In() ...);
505 static_cast<Async::Future<Out>*>(this->mResult)->setValue(result);
506 this->mResult->setFinished();
507 }
508 } else {
509 Out result = this->mFunc(this->mPrevFuture ? this->mPrevFuture->value() : In() ...);
510 static_cast<Async::Future<Out>*>(this->mResult)->setValue(result);
511 this->mResult->setFinished();
512 }
513}
514
515template<typename PrevOut, typename Out, typename In>
516SyncEachExecutor<PrevOut, Out, In>::SyncEachExecutor(SyncEachTask<Out, In> each, const ExecutorBasePtr &parent)
517 : Executor<PrevOut, Out, In>(parent)
518{
519 this->mFunc = each;
520}
521
522template<typename PrevOut, typename Out, typename In>
523void SyncEachExecutor<PrevOut, Out, In>::previousFutureReady()
524{
525 assert(this->mPrevFuture->isFinished());
526 auto out = static_cast<Async::Future<Out>*>(this->mResult);
527 if (this->mPrevFuture->value().isEmpty()) {
528 out->setFinished();
529 return;
530 }
531
532 for (auto arg : this->mPrevFuture->value()) {
533 out->setValue(out->value() + this->mFunc(arg));
534 }
535 out->setFinished();
536}
537
538template<typename Out, typename In>
539SyncReduceExecutor<Out, In>::SyncReduceExecutor(SyncReduceTask<Out, In> reduce, const ExecutorBasePtr &parent)
540 : SyncThenExecutor<Out, In>(reduce, ErrorHandler(), parent)
541{
542}
543
406 544
407} // namespace Private 545} // namespace Private
408 546