diff options
Diffstat (limited to 'async/src')
-rw-r--r-- | async/src/async.h | 170 |
1 files changed, 154 insertions, 16 deletions
diff --git a/async/src/async.h b/async/src/async.h index d15373b..336bae2 100644 --- a/async/src/async.h +++ b/async/src/async.h | |||
@@ -33,8 +33,6 @@ | |||
33 | 33 | ||
34 | 34 | ||
35 | /* | 35 | /* |
36 | * TODO: on .then and potentially others: support for ThenTask without future argument and return value which makes it implicitly a sync continuation. | ||
37 | * Useful for typical value consumer continuations. | ||
38 | * TODO: error continuation on .then and others. | 36 | * TODO: error continuation on .then and others. |
39 | * TODO: instead of passing the future objects callbacks could be provided for result reporting (we can still use the future object internally | 37 | * TODO: instead of passing the future objects callbacks could be provided for result reporting (we can still use the future object internally |
40 | */ | 38 | */ |
@@ -47,13 +45,19 @@ class JobBase; | |||
47 | 45 | ||
48 | template<typename Out, typename ... In> | 46 | template<typename Out, typename ... In> |
49 | class Job; | 47 | class Job; |
50 | |||
51 | template<typename Out, typename ... In> | 48 | template<typename Out, typename ... In> |
52 | using ThenTask = typename detail::identity<std::function<void(In ..., Async::Future<Out>&)>>::type; | 49 | using ThenTask = typename detail::identity<std::function<void(In ..., Async::Future<Out>&)>>::type; |
50 | template<typename Out, typename ... In> | ||
51 | using SyncThenTask = typename detail::identity<std::function<Out(In ...)>>::type; | ||
53 | template<typename Out, typename In> | 52 | template<typename Out, typename In> |
54 | using EachTask = typename detail::identity<std::function<void(In, Async::Future<Out>&)>>::type; | 53 | using EachTask = typename detail::identity<std::function<void(In, Async::Future<Out>&)>>::type; |
55 | template<typename Out, typename In> | 54 | template<typename Out, typename In> |
55 | using SyncEachTask = typename detail::identity<std::function<Out(In)>>::type; | ||
56 | template<typename Out, typename In> | ||
56 | using ReduceTask = typename detail::identity<std::function<void(In, Async::Future<Out>&)>>::type; | 57 | using ReduceTask = typename detail::identity<std::function<void(In, Async::Future<Out>&)>>::type; |
58 | template<typename Out, typename In> | ||
59 | using SyncReduceTask = typename detail::identity<std::function<Out(In)>>::type; | ||
60 | |||
57 | using ErrorHandler = std::function<void(int, const QString &)>; | 61 | using ErrorHandler = std::function<void(int, const QString &)>; |
58 | 62 | ||
59 | namespace Private | 63 | namespace Private |
@@ -134,6 +138,33 @@ public: | |||
134 | ReduceExecutor(ReduceTask<Out, In> reduce, const ExecutorBasePtr &parent); | 138 | ReduceExecutor(ReduceTask<Out, In> reduce, const ExecutorBasePtr &parent); |
135 | }; | 139 | }; |
136 | 140 | ||
141 | template<typename Out, typename ... In> | ||
142 | class SyncThenExecutor : public Executor<typename PreviousOut<In ...>::type, Out, In ...> | ||
143 | { | ||
144 | public: | ||
145 | SyncThenExecutor(SyncThenTask<Out, In ...> then, ErrorHandler errorHandler = ErrorHandler(), const ExecutorBasePtr &parent = ExecutorBasePtr()); | ||
146 | void previousFutureReady(); | ||
147 | protected: | ||
148 | SyncThenTask<Out, In ...> mFunc; | ||
149 | }; | ||
150 | |||
151 | template<typename Out, typename In> | ||
152 | class SyncReduceExecutor : public SyncThenExecutor<Out, In> | ||
153 | { | ||
154 | public: | ||
155 | SyncReduceExecutor(SyncReduceTask<Out, In> reduce, const ExecutorBasePtr &parent = ExecutorBasePtr()); | ||
156 | }; | ||
157 | |||
158 | template<typename PrevOut, typename Out, typename In> | ||
159 | class SyncEachExecutor : public Executor<PrevOut, Out, In> | ||
160 | { | ||
161 | public: | ||
162 | SyncEachExecutor(SyncEachTask<Out, In> each, const ExecutorBasePtr &parent = ExecutorBasePtr()); | ||
163 | void previousFutureReady(); | ||
164 | protected: | ||
165 | SyncEachTask<Out, In> mFunc; | ||
166 | }; | ||
167 | |||
137 | } // namespace Private | 168 | } // namespace Private |
138 | 169 | ||
139 | /** | 170 | /** |
@@ -238,6 +269,9 @@ class Job : public JobBase | |||
238 | template<typename OutOther> | 269 | template<typename OutOther> |
239 | friend Job<OutOther> start(Async::ThenTask<OutOther> func); | 270 | friend Job<OutOther> start(Async::ThenTask<OutOther> func); |
240 | 271 | ||
272 | template<typename OutOther> | ||
273 | friend Job<OutOther> start(Async::SyncThenTask<OutOther> func); | ||
274 | |||
241 | public: | 275 | public: |
242 | template<typename OutOther, typename ... InOther> | 276 | template<typename OutOther, typename ... InOther> |
243 | Job<OutOther, InOther ...> then(ThenTask<OutOther, InOther ...> func, ErrorHandler errorFunc = ErrorHandler()) | 277 | Job<OutOther, InOther ...> then(ThenTask<OutOther, InOther ...> func, ErrorHandler errorFunc = ErrorHandler()) |
@@ -246,28 +280,45 @@ public: | |||
246 | new Private::ThenExecutor<OutOther, InOther ...>(func, errorFunc, mExecutor))); | 280 | new Private::ThenExecutor<OutOther, InOther ...>(func, errorFunc, mExecutor))); |
247 | } | 281 | } |
248 | 282 | ||
283 | template<typename OutOther, typename ... InOther> | ||
284 | Job<OutOther, InOther ...> then(SyncThenTask<OutOther, InOther ...> func, ErrorHandler errorFunc = ErrorHandler()) | ||
285 | { | ||
286 | return Job<OutOther, InOther ...>(Private::ExecutorBasePtr( | ||
287 | new Private::SyncThenExecutor<OutOther, InOther ...>(func, errorFunc, mExecutor))); | ||
288 | } | ||
289 | |||
249 | template<typename OutOther, typename InOther> | 290 | template<typename OutOther, typename InOther> |
250 | Job<OutOther, InOther> each(EachTask<OutOther, InOther> func) | 291 | Job<OutOther, InOther> each(EachTask<OutOther, InOther> func) |
251 | { | 292 | { |
252 | static_assert(detail::isIterable<Out>::value, | 293 | eachInvariants<OutOther>(); |
253 | "The 'Each' task can only be connected to a job that returns a list or an array."); | ||
254 | static_assert(detail::isIterable<OutOther>::value, | ||
255 | "The result type of 'Each' task must be a list or an array."); | ||
256 | return Job<OutOther, InOther>(Private::ExecutorBasePtr( | 294 | return Job<OutOther, InOther>(Private::ExecutorBasePtr( |
257 | new Private::EachExecutor<Out, OutOther, InOther>(func, mExecutor))); | 295 | new Private::EachExecutor<Out, OutOther, InOther>(func, mExecutor))); |
258 | } | 296 | } |
259 | 297 | ||
260 | template<typename OutOther, typename InOther> | 298 | template<typename OutOther, typename InOther> |
299 | Job<OutOther, InOther> each(SyncEachTask<OutOther, InOther> func) | ||
300 | { | ||
301 | eachInvariants<OutOther>(); | ||
302 | return Job<OutOther, InOther>(Private::ExecutorBasePtr( | ||
303 | new Private::SyncEachExecutor<Out, OutOther, InOther>(func, mExecutor))); | ||
304 | } | ||
305 | |||
306 | template<typename OutOther, typename InOther> | ||
261 | Job<OutOther, InOther> reduce(ReduceTask<OutOther, InOther> func) | 307 | Job<OutOther, InOther> reduce(ReduceTask<OutOther, InOther> func) |
262 | { | 308 | { |
263 | static_assert(Async::detail::isIterable<Out>::value, | 309 | reduceInvariants<InOther>(); |
264 | "The 'Result' task can only be connected to a job that returns a list or an array"); | ||
265 | static_assert(std::is_same<typename Out::value_type, typename InOther::value_type>::value, | ||
266 | "The return type of previous task must be compatible with input type of this task"); | ||
267 | return Job<OutOther, InOther>(Private::ExecutorBasePtr( | 310 | return Job<OutOther, InOther>(Private::ExecutorBasePtr( |
268 | new Private::ReduceExecutor<OutOther, InOther>(func, mExecutor))); | 311 | new Private::ReduceExecutor<OutOther, InOther>(func, mExecutor))); |
269 | } | 312 | } |
270 | 313 | ||
314 | template<typename OutOther, typename InOther> | ||
315 | Job<OutOther, InOther> reduce(SyncReduceTask<OutOther, InOther> func) | ||
316 | { | ||
317 | reduceInvariants<InOther>(); | ||
318 | return Job<OutOther, InOther>(Private::ExecutorBasePtr( | ||
319 | new Private::SyncReduceExecutor<OutOther, InOther>(func, mExecutor))); | ||
320 | } | ||
321 | |||
271 | Async::Future<Out> exec() | 322 | Async::Future<Out> exec() |
272 | { | 323 | { |
273 | mExecutor->exec(); | 324 | mExecutor->exec(); |
@@ -283,6 +334,24 @@ private: | |||
283 | Job(Private::ExecutorBasePtr executor) | 334 | Job(Private::ExecutorBasePtr executor) |
284 | : JobBase(executor) | 335 | : JobBase(executor) |
285 | {} | 336 | {} |
337 | |||
338 | template<typename OutOther> | ||
339 | void eachInvariants() | ||
340 | { | ||
341 | static_assert(detail::isIterable<Out>::value, | ||
342 | "The 'Each' task can only be connected to a job that returns a list or an array."); | ||
343 | static_assert(detail::isIterable<OutOther>::value, | ||
344 | "The result type of 'Each' task must be a list or an array."); | ||
345 | } | ||
346 | |||
347 | template<typename InOther> | ||
348 | void reduceInvariants() | ||
349 | { | ||
350 | static_assert(Async::detail::isIterable<Out>::value, | ||
351 | "The 'Result' task can only be connected to a job that returns a list or an array"); | ||
352 | static_assert(std::is_same<typename Out::value_type, typename InOther::value_type>::value, | ||
353 | "The return type of previous task must be compatible with input type of this task"); | ||
354 | } | ||
286 | }; | 355 | }; |
287 | 356 | ||
288 | } // namespace Async | 357 | } // namespace Async |
@@ -298,6 +367,12 @@ Job<Out> start(ThenTask<Out> func) | |||
298 | return Job<Out>(Private::ExecutorBasePtr(new Private::ThenExecutor<Out>(func))); | 367 | return Job<Out>(Private::ExecutorBasePtr(new Private::ThenExecutor<Out>(func))); |
299 | } | 368 | } |
300 | 369 | ||
370 | template<typename Out> | ||
371 | Job<Out> start(SyncThenTask<Out> func) | ||
372 | { | ||
373 | return Job<Out>(Private::ExecutorBasePtr(new Private::SyncThenExecutor<Out>(func))); | ||
374 | } | ||
375 | |||
301 | namespace Private { | 376 | namespace Private { |
302 | 377 | ||
303 | template<typename PrevOut, typename Out, typename ... In> | 378 | template<typename PrevOut, typename Out, typename ... In> |
@@ -378,22 +453,23 @@ void EachExecutor<PrevOut, Out, In>::previousFutureReady() | |||
378 | } | 453 | } |
379 | 454 | ||
380 | for (auto arg : this->mPrevFuture->value()) { | 455 | for (auto arg : this->mPrevFuture->value()) { |
381 | Async::Future<Out> future; | 456 | auto future = new Async::Future<Out>; |
382 | this->mFunc(arg, future); | 457 | this->mFunc(arg, *future); |
383 | auto fw = new Async::FutureWatcher<Out>(); | 458 | auto fw = new Async::FutureWatcher<Out>(); |
384 | mFutureWatchers.append(fw); | 459 | mFutureWatchers.append(fw); |
385 | QObject::connect(fw, &Async::FutureWatcher<Out>::futureReady, | 460 | QObject::connect(fw, &Async::FutureWatcher<Out>::futureReady, |
386 | [out, future, fw, this]() { | 461 | [out, future, fw, this]() { |
387 | assert(future.isFinished()); | 462 | assert(future->isFinished()); |
388 | const int index = mFutureWatchers.indexOf(fw); | 463 | const int index = mFutureWatchers.indexOf(fw); |
389 | assert(index > -1); | 464 | assert(index > -1); |
390 | mFutureWatchers.removeAt(index); | 465 | mFutureWatchers.removeAt(index); |
391 | out->setValue(out->value() + future.value()); | 466 | out->setValue(out->value() + future->value()); |
467 | delete future; | ||
392 | if (mFutureWatchers.isEmpty()) { | 468 | if (mFutureWatchers.isEmpty()) { |
393 | out->setFinished(); | 469 | out->setFinished(); |
394 | } | 470 | } |
395 | }); | 471 | }); |
396 | fw->setFuture(future); | 472 | fw->setFuture(*future); |
397 | } | 473 | } |
398 | } | 474 | } |
399 | 475 | ||
@@ -403,6 +479,68 @@ ReduceExecutor<Out, In>::ReduceExecutor(ReduceTask<Out, In> reduce, const Execut | |||
403 | { | 479 | { |
404 | } | 480 | } |
405 | 481 | ||
482 | template<typename Out, typename ... In> | ||
483 | SyncThenExecutor<Out, In ...>::SyncThenExecutor(SyncThenTask<Out, In ...> then, ErrorHandler errorHandler, const ExecutorBasePtr &parent) | ||
484 | : Executor<typename PreviousOut<In ...>::type, Out, In ...>(parent) | ||
485 | { | ||
486 | this->mFunc = then; | ||
487 | this->mErrorFunc = errorHandler; | ||
488 | } | ||
489 | |||
490 | template<typename Out, typename ... In> | ||
491 | void SyncThenExecutor<Out, In ...>::previousFutureReady() | ||
492 | { | ||
493 | if (this->mPrevFuture) { | ||
494 | assert(this->mPrevFuture->isFinished()); | ||
495 | } | ||
496 | |||
497 | if (this->mPrevFuture && this->mPrevFuture->errorCode()) { | ||
498 | if (this->mErrorFunc) { | ||
499 | this->mErrorFunc(this->mPrevFuture->errorCode(), this->mPrevFuture->errorMessage()); | ||
500 | this->mResult->setFinished(); | ||
501 | } else { | ||
502 | static_cast<Async::Future<Out>*>(this->mResult)->setError(this->mPrevFuture->errorCode(), this->mPrevFuture->errorMessage()); | ||
503 | //propagate error if no error handler is available | ||
504 | Out result = this->mFunc(this->mPrevFuture ? this->mPrevFuture->value() : In() ...); | ||
505 | static_cast<Async::Future<Out>*>(this->mResult)->setValue(result); | ||
506 | this->mResult->setFinished(); | ||
507 | } | ||
508 | } else { | ||
509 | Out result = this->mFunc(this->mPrevFuture ? this->mPrevFuture->value() : In() ...); | ||
510 | static_cast<Async::Future<Out>*>(this->mResult)->setValue(result); | ||
511 | this->mResult->setFinished(); | ||
512 | } | ||
513 | } | ||
514 | |||
515 | template<typename PrevOut, typename Out, typename In> | ||
516 | SyncEachExecutor<PrevOut, Out, In>::SyncEachExecutor(SyncEachTask<Out, In> each, const ExecutorBasePtr &parent) | ||
517 | : Executor<PrevOut, Out, In>(parent) | ||
518 | { | ||
519 | this->mFunc = each; | ||
520 | } | ||
521 | |||
522 | template<typename PrevOut, typename Out, typename In> | ||
523 | void SyncEachExecutor<PrevOut, Out, In>::previousFutureReady() | ||
524 | { | ||
525 | assert(this->mPrevFuture->isFinished()); | ||
526 | auto out = static_cast<Async::Future<Out>*>(this->mResult); | ||
527 | if (this->mPrevFuture->value().isEmpty()) { | ||
528 | out->setFinished(); | ||
529 | return; | ||
530 | } | ||
531 | |||
532 | for (auto arg : this->mPrevFuture->value()) { | ||
533 | out->setValue(out->value() + this->mFunc(arg)); | ||
534 | } | ||
535 | out->setFinished(); | ||
536 | } | ||
537 | |||
538 | template<typename Out, typename In> | ||
539 | SyncReduceExecutor<Out, In>::SyncReduceExecutor(SyncReduceTask<Out, In> reduce, const ExecutorBasePtr &parent) | ||
540 | : SyncThenExecutor<Out, In>(reduce, ErrorHandler(), parent) | ||
541 | { | ||
542 | } | ||
543 | |||
406 | 544 | ||
407 | } // namespace Private | 545 | } // namespace Private |
408 | 546 | ||