diff options
author | Dan Vrátil <dvratil@redhat.com> | 2015-02-08 12:02:04 +0100 |
---|---|---|
committer | Dan Vrátil <dvratil@redhat.com> | 2015-02-09 14:33:45 +0100 |
commit | adc6a443776820b5ae36c982baf92b1d29bbaa4b (patch) | |
tree | ed278ffbcd8fc8c3759fbcc4afd4240fc1a72fc3 /async/src | |
parent | cbb192ffe865ffb3eed4c940177ffecaecfa570f (diff) | |
download | sink-adc6a443776820b5ae36c982baf92b1d29bbaa4b.tar.gz sink-adc6a443776820b5ae36c982baf92b1d29bbaa4b.zip |
Async: introduce sync executors
Sync executors don't pass Async::Future into the user-provided tasks, but
instead work with return values of the task methods, wrapping them into the
Async::Future internally. Sync tasks are of course possible since forever, but
not the API for those tasks is much cleaner, for users don't have to deal with
"future" in synchronous tasks, for instance when synchronously processing results
of an async task before passing the data to another async task.
Diffstat (limited to 'async/src')
-rw-r--r-- | async/src/async.h | 170 |
1 files changed, 154 insertions, 16 deletions
diff --git a/async/src/async.h b/async/src/async.h index d15373b..336bae2 100644 --- a/async/src/async.h +++ b/async/src/async.h | |||
@@ -33,8 +33,6 @@ | |||
33 | 33 | ||
34 | 34 | ||
35 | /* | 35 | /* |
36 | * TODO: on .then and potentially others: support for ThenTask without future argument and return value which makes it implicitly a sync continuation. | ||
37 | * Useful for typical value consumer continuations. | ||
38 | * TODO: error continuation on .then and others. | 36 | * TODO: error continuation on .then and others. |
39 | * TODO: instead of passing the future objects callbacks could be provided for result reporting (we can still use the future object internally | 37 | * TODO: instead of passing the future objects callbacks could be provided for result reporting (we can still use the future object internally |
40 | */ | 38 | */ |
@@ -47,13 +45,19 @@ class JobBase; | |||
47 | 45 | ||
48 | template<typename Out, typename ... In> | 46 | template<typename Out, typename ... In> |
49 | class Job; | 47 | class Job; |
50 | |||
51 | template<typename Out, typename ... In> | 48 | template<typename Out, typename ... In> |
52 | using ThenTask = typename detail::identity<std::function<void(In ..., Async::Future<Out>&)>>::type; | 49 | using ThenTask = typename detail::identity<std::function<void(In ..., Async::Future<Out>&)>>::type; |
50 | template<typename Out, typename ... In> | ||
51 | using SyncThenTask = typename detail::identity<std::function<Out(In ...)>>::type; | ||
53 | template<typename Out, typename In> | 52 | template<typename Out, typename In> |
54 | using EachTask = typename detail::identity<std::function<void(In, Async::Future<Out>&)>>::type; | 53 | using EachTask = typename detail::identity<std::function<void(In, Async::Future<Out>&)>>::type; |
55 | template<typename Out, typename In> | 54 | template<typename Out, typename In> |
55 | using SyncEachTask = typename detail::identity<std::function<Out(In)>>::type; | ||
56 | template<typename Out, typename In> | ||
56 | using ReduceTask = typename detail::identity<std::function<void(In, Async::Future<Out>&)>>::type; | 57 | using ReduceTask = typename detail::identity<std::function<void(In, Async::Future<Out>&)>>::type; |
58 | template<typename Out, typename In> | ||
59 | using SyncReduceTask = typename detail::identity<std::function<Out(In)>>::type; | ||
60 | |||
57 | using ErrorHandler = std::function<void(int, const QString &)>; | 61 | using ErrorHandler = std::function<void(int, const QString &)>; |
58 | 62 | ||
59 | namespace Private | 63 | namespace Private |
@@ -134,6 +138,33 @@ public: | |||
134 | ReduceExecutor(ReduceTask<Out, In> reduce, const ExecutorBasePtr &parent); | 138 | ReduceExecutor(ReduceTask<Out, In> reduce, const ExecutorBasePtr &parent); |
135 | }; | 139 | }; |
136 | 140 | ||
141 | template<typename Out, typename ... In> | ||
142 | class SyncThenExecutor : public Executor<typename PreviousOut<In ...>::type, Out, In ...> | ||
143 | { | ||
144 | public: | ||
145 | SyncThenExecutor(SyncThenTask<Out, In ...> then, ErrorHandler errorHandler = ErrorHandler(), const ExecutorBasePtr &parent = ExecutorBasePtr()); | ||
146 | void previousFutureReady(); | ||
147 | protected: | ||
148 | SyncThenTask<Out, In ...> mFunc; | ||
149 | }; | ||
150 | |||
151 | template<typename Out, typename In> | ||
152 | class SyncReduceExecutor : public SyncThenExecutor<Out, In> | ||
153 | { | ||
154 | public: | ||
155 | SyncReduceExecutor(SyncReduceTask<Out, In> reduce, const ExecutorBasePtr &parent = ExecutorBasePtr()); | ||
156 | }; | ||
157 | |||
158 | template<typename PrevOut, typename Out, typename In> | ||
159 | class SyncEachExecutor : public Executor<PrevOut, Out, In> | ||
160 | { | ||
161 | public: | ||
162 | SyncEachExecutor(SyncEachTask<Out, In> each, const ExecutorBasePtr &parent = ExecutorBasePtr()); | ||
163 | void previousFutureReady(); | ||
164 | protected: | ||
165 | SyncEachTask<Out, In> mFunc; | ||
166 | }; | ||
167 | |||
137 | } // namespace Private | 168 | } // namespace Private |
138 | 169 | ||
139 | /** | 170 | /** |
@@ -238,6 +269,9 @@ class Job : public JobBase | |||
238 | template<typename OutOther> | 269 | template<typename OutOther> |
239 | friend Job<OutOther> start(Async::ThenTask<OutOther> func); | 270 | friend Job<OutOther> start(Async::ThenTask<OutOther> func); |
240 | 271 | ||
272 | template<typename OutOther> | ||
273 | friend Job<OutOther> start(Async::SyncThenTask<OutOther> func); | ||
274 | |||
241 | public: | 275 | public: |
242 | template<typename OutOther, typename ... InOther> | 276 | template<typename OutOther, typename ... InOther> |
243 | Job<OutOther, InOther ...> then(ThenTask<OutOther, InOther ...> func, ErrorHandler errorFunc = ErrorHandler()) | 277 | Job<OutOther, InOther ...> then(ThenTask<OutOther, InOther ...> func, ErrorHandler errorFunc = ErrorHandler()) |
@@ -246,28 +280,45 @@ public: | |||
246 | new Private::ThenExecutor<OutOther, InOther ...>(func, errorFunc, mExecutor))); | 280 | new Private::ThenExecutor<OutOther, InOther ...>(func, errorFunc, mExecutor))); |
247 | } | 281 | } |
248 | 282 | ||
283 | template<typename OutOther, typename ... InOther> | ||
284 | Job<OutOther, InOther ...> then(SyncThenTask<OutOther, InOther ...> func, ErrorHandler errorFunc = ErrorHandler()) | ||
285 | { | ||
286 | return Job<OutOther, InOther ...>(Private::ExecutorBasePtr( | ||
287 | new Private::SyncThenExecutor<OutOther, InOther ...>(func, errorFunc, mExecutor))); | ||
288 | } | ||
289 | |||
249 | template<typename OutOther, typename InOther> | 290 | template<typename OutOther, typename InOther> |
250 | Job<OutOther, InOther> each(EachTask<OutOther, InOther> func) | 291 | Job<OutOther, InOther> each(EachTask<OutOther, InOther> func) |
251 | { | 292 | { |
252 | static_assert(detail::isIterable<Out>::value, | 293 | eachInvariants<OutOther>(); |
253 | "The 'Each' task can only be connected to a job that returns a list or an array."); | ||
254 | static_assert(detail::isIterable<OutOther>::value, | ||
255 | "The result type of 'Each' task must be a list or an array."); | ||
256 | return Job<OutOther, InOther>(Private::ExecutorBasePtr( | 294 | return Job<OutOther, InOther>(Private::ExecutorBasePtr( |
257 | new Private::EachExecutor<Out, OutOther, InOther>(func, mExecutor))); | 295 | new Private::EachExecutor<Out, OutOther, InOther>(func, mExecutor))); |
258 | } | 296 | } |
259 | 297 | ||
260 | template<typename OutOther, typename InOther> | 298 | template<typename OutOther, typename InOther> |
299 | Job<OutOther, InOther> each(SyncEachTask<OutOther, InOther> func) | ||
300 | { | ||
301 | eachInvariants<OutOther>(); | ||
302 | return Job<OutOther, InOther>(Private::ExecutorBasePtr( | ||
303 | new Private::SyncEachExecutor<Out, OutOther, InOther>(func, mExecutor))); | ||
304 | } | ||
305 | |||
306 | template<typename OutOther, typename InOther> | ||
261 | Job<OutOther, InOther> reduce(ReduceTask<OutOther, InOther> func) | 307 | Job<OutOther, InOther> reduce(ReduceTask<OutOther, InOther> func) |
262 | { | 308 | { |
263 | static_assert(Async::detail::isIterable<Out>::value, | 309 | reduceInvariants<InOther>(); |
264 | "The 'Result' task can only be connected to a job that returns a list or an array"); | ||
265 | static_assert(std::is_same<typename Out::value_type, typename InOther::value_type>::value, | ||
266 | "The return type of previous task must be compatible with input type of this task"); | ||
267 | return Job<OutOther, InOther>(Private::ExecutorBasePtr( | 310 | return Job<OutOther, InOther>(Private::ExecutorBasePtr( |
268 | new Private::ReduceExecutor<OutOther, InOther>(func, mExecutor))); | 311 | new Private::ReduceExecutor<OutOther, InOther>(func, mExecutor))); |
269 | } | 312 | } |
270 | 313 | ||
314 | template<typename OutOther, typename InOther> | ||
315 | Job<OutOther, InOther> reduce(SyncReduceTask<OutOther, InOther> func) | ||
316 | { | ||
317 | reduceInvariants<InOther>(); | ||
318 | return Job<OutOther, InOther>(Private::ExecutorBasePtr( | ||
319 | new Private::SyncReduceExecutor<OutOther, InOther>(func, mExecutor))); | ||
320 | } | ||
321 | |||
271 | Async::Future<Out> exec() | 322 | Async::Future<Out> exec() |
272 | { | 323 | { |
273 | mExecutor->exec(); | 324 | mExecutor->exec(); |
@@ -283,6 +334,24 @@ private: | |||
283 | Job(Private::ExecutorBasePtr executor) | 334 | Job(Private::ExecutorBasePtr executor) |
284 | : JobBase(executor) | 335 | : JobBase(executor) |
285 | {} | 336 | {} |
337 | |||
338 | template<typename OutOther> | ||
339 | void eachInvariants() | ||
340 | { | ||
341 | static_assert(detail::isIterable<Out>::value, | ||
342 | "The 'Each' task can only be connected to a job that returns a list or an array."); | ||
343 | static_assert(detail::isIterable<OutOther>::value, | ||
344 | "The result type of 'Each' task must be a list or an array."); | ||
345 | } | ||
346 | |||
347 | template<typename InOther> | ||
348 | void reduceInvariants() | ||
349 | { | ||
350 | static_assert(Async::detail::isIterable<Out>::value, | ||
351 | "The 'Result' task can only be connected to a job that returns a list or an array"); | ||
352 | static_assert(std::is_same<typename Out::value_type, typename InOther::value_type>::value, | ||
353 | "The return type of previous task must be compatible with input type of this task"); | ||
354 | } | ||
286 | }; | 355 | }; |
287 | 356 | ||
288 | } // namespace Async | 357 | } // namespace Async |
@@ -298,6 +367,12 @@ Job<Out> start(ThenTask<Out> func) | |||
298 | return Job<Out>(Private::ExecutorBasePtr(new Private::ThenExecutor<Out>(func))); | 367 | return Job<Out>(Private::ExecutorBasePtr(new Private::ThenExecutor<Out>(func))); |
299 | } | 368 | } |
300 | 369 | ||
370 | template<typename Out> | ||
371 | Job<Out> start(SyncThenTask<Out> func) | ||
372 | { | ||
373 | return Job<Out>(Private::ExecutorBasePtr(new Private::SyncThenExecutor<Out>(func))); | ||
374 | } | ||
375 | |||
301 | namespace Private { | 376 | namespace Private { |
302 | 377 | ||
303 | template<typename PrevOut, typename Out, typename ... In> | 378 | template<typename PrevOut, typename Out, typename ... In> |
@@ -378,22 +453,23 @@ void EachExecutor<PrevOut, Out, In>::previousFutureReady() | |||
378 | } | 453 | } |
379 | 454 | ||
380 | for (auto arg : this->mPrevFuture->value()) { | 455 | for (auto arg : this->mPrevFuture->value()) { |
381 | Async::Future<Out> future; | 456 | auto future = new Async::Future<Out>; |
382 | this->mFunc(arg, future); | 457 | this->mFunc(arg, *future); |
383 | auto fw = new Async::FutureWatcher<Out>(); | 458 | auto fw = new Async::FutureWatcher<Out>(); |
384 | mFutureWatchers.append(fw); | 459 | mFutureWatchers.append(fw); |
385 | QObject::connect(fw, &Async::FutureWatcher<Out>::futureReady, | 460 | QObject::connect(fw, &Async::FutureWatcher<Out>::futureReady, |
386 | [out, future, fw, this]() { | 461 | [out, future, fw, this]() { |
387 | assert(future.isFinished()); | 462 | assert(future->isFinished()); |
388 | const int index = mFutureWatchers.indexOf(fw); | 463 | const int index = mFutureWatchers.indexOf(fw); |
389 | assert(index > -1); | 464 | assert(index > -1); |
390 | mFutureWatchers.removeAt(index); | 465 | mFutureWatchers.removeAt(index); |
391 | out->setValue(out->value() + future.value()); | 466 | out->setValue(out->value() + future->value()); |
467 | delete future; | ||
392 | if (mFutureWatchers.isEmpty()) { | 468 | if (mFutureWatchers.isEmpty()) { |
393 | out->setFinished(); | 469 | out->setFinished(); |
394 | } | 470 | } |
395 | }); | 471 | }); |
396 | fw->setFuture(future); | 472 | fw->setFuture(*future); |
397 | } | 473 | } |
398 | } | 474 | } |
399 | 475 | ||
@@ -403,6 +479,68 @@ ReduceExecutor<Out, In>::ReduceExecutor(ReduceTask<Out, In> reduce, const Execut | |||
403 | { | 479 | { |
404 | } | 480 | } |
405 | 481 | ||
482 | template<typename Out, typename ... In> | ||
483 | SyncThenExecutor<Out, In ...>::SyncThenExecutor(SyncThenTask<Out, In ...> then, ErrorHandler errorHandler, const ExecutorBasePtr &parent) | ||
484 | : Executor<typename PreviousOut<In ...>::type, Out, In ...>(parent) | ||
485 | { | ||
486 | this->mFunc = then; | ||
487 | this->mErrorFunc = errorHandler; | ||
488 | } | ||
489 | |||
490 | template<typename Out, typename ... In> | ||
491 | void SyncThenExecutor<Out, In ...>::previousFutureReady() | ||
492 | { | ||
493 | if (this->mPrevFuture) { | ||
494 | assert(this->mPrevFuture->isFinished()); | ||
495 | } | ||
496 | |||
497 | if (this->mPrevFuture && this->mPrevFuture->errorCode()) { | ||
498 | if (this->mErrorFunc) { | ||
499 | this->mErrorFunc(this->mPrevFuture->errorCode(), this->mPrevFuture->errorMessage()); | ||
500 | this->mResult->setFinished(); | ||
501 | } else { | ||
502 | static_cast<Async::Future<Out>*>(this->mResult)->setError(this->mPrevFuture->errorCode(), this->mPrevFuture->errorMessage()); | ||
503 | //propagate error if no error handler is available | ||
504 | Out result = this->mFunc(this->mPrevFuture ? this->mPrevFuture->value() : In() ...); | ||
505 | static_cast<Async::Future<Out>*>(this->mResult)->setValue(result); | ||
506 | this->mResult->setFinished(); | ||
507 | } | ||
508 | } else { | ||
509 | Out result = this->mFunc(this->mPrevFuture ? this->mPrevFuture->value() : In() ...); | ||
510 | static_cast<Async::Future<Out>*>(this->mResult)->setValue(result); | ||
511 | this->mResult->setFinished(); | ||
512 | } | ||
513 | } | ||
514 | |||
515 | template<typename PrevOut, typename Out, typename In> | ||
516 | SyncEachExecutor<PrevOut, Out, In>::SyncEachExecutor(SyncEachTask<Out, In> each, const ExecutorBasePtr &parent) | ||
517 | : Executor<PrevOut, Out, In>(parent) | ||
518 | { | ||
519 | this->mFunc = each; | ||
520 | } | ||
521 | |||
522 | template<typename PrevOut, typename Out, typename In> | ||
523 | void SyncEachExecutor<PrevOut, Out, In>::previousFutureReady() | ||
524 | { | ||
525 | assert(this->mPrevFuture->isFinished()); | ||
526 | auto out = static_cast<Async::Future<Out>*>(this->mResult); | ||
527 | if (this->mPrevFuture->value().isEmpty()) { | ||
528 | out->setFinished(); | ||
529 | return; | ||
530 | } | ||
531 | |||
532 | for (auto arg : this->mPrevFuture->value()) { | ||
533 | out->setValue(out->value() + this->mFunc(arg)); | ||
534 | } | ||
535 | out->setFinished(); | ||
536 | } | ||
537 | |||
538 | template<typename Out, typename In> | ||
539 | SyncReduceExecutor<Out, In>::SyncReduceExecutor(SyncReduceTask<Out, In> reduce, const ExecutorBasePtr &parent) | ||
540 | : SyncThenExecutor<Out, In>(reduce, ErrorHandler(), parent) | ||
541 | { | ||
542 | } | ||
543 | |||
406 | 544 | ||
407 | } // namespace Private | 545 | } // namespace Private |
408 | 546 | ||