diff options
Diffstat (limited to 'async/src/async.h')
-rw-r--r-- | async/src/async.h | 872 |
1 files changed, 0 insertions, 872 deletions
diff --git a/async/src/async.h b/async/src/async.h deleted file mode 100644 index b1f1121..0000000 --- a/async/src/async.h +++ /dev/null | |||
@@ -1,872 +0,0 @@ | |||
1 | /* | ||
2 | * Copyright 2014 Daniel Vrátil <dvratil@redhat.com> | ||
3 | * | ||
4 | * This library is free software; you can redistribute it and/or | ||
5 | * modify it under the terms of the GNU Library General Public License as | ||
6 | * published by the Free Software Foundation; either version 2 of | ||
7 | * the License, or (at your option) any later version. | ||
8 | * | ||
9 | * This library is distributed in the hope that it will be useful, | ||
10 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
11 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
12 | * GNU Library General Public License for more details. | ||
13 | * | ||
14 | * You should have received a copy of the GNU Library General Public License | ||
15 | * along with this library. If not, see <http://www.gnu.org/licenses/>. | ||
16 | */ | ||
17 | |||
18 | #ifndef ASYNC_H | ||
19 | #define ASYNC_H | ||
20 | |||
21 | #include <functional> | ||
22 | #include <list> | ||
23 | #include <type_traits> | ||
24 | #include <cassert> | ||
25 | #include <iterator> | ||
26 | |||
27 | #include "future.h" | ||
28 | #include "debug.h" | ||
29 | #include "async_impl.h" | ||
30 | |||
31 | #include <QVector> | ||
32 | #include <QObject> | ||
33 | #include <QSharedPointer> | ||
34 | |||
35 | #include <QDebug> | ||
36 | |||
37 | #ifdef WITH_KJOB | ||
38 | #include <KJob> | ||
39 | #endif | ||
40 | |||
41 | |||
42 | /* | ||
43 | * API to help write async code. | ||
44 | * | ||
45 | * This API is based around jobs that take lambdas to execute asynchronous tasks. Each async operation can take a continuation, | ||
46 | * that can then be used to execute further async operations. That way it is possible to build async chains of operations, | ||
47 | * that can be stored and executed later on. Jobs can be composed, similarly to functions. | ||
48 | * | ||
49 | * Relations between the components: | ||
50 | * * Job: API wrapper around Executors chain. Can be destroyed while still running, | ||
51 | * because the actual execution happens in the background | ||
52 | * * Executor: Describes task to execute. Executors form a linked list matching the | ||
53 | * order in which they will be executed. The Executor chain is destroyed when | ||
54 | * the parent Job is destroyed. However if the Job is still running it is | ||
55 | * guaranteed that the Executor chain will not be destroyed until the execution | ||
56 | * is finished. | ||
57 | * * Execution: The running execution of the task stored in Executor. Each call to Job::exec() | ||
58 | * instantiates new Execution chain, which makes it possible for the Job to be | ||
59 | * executed multiple times (even in parallel). | ||
60 | * * Future: Representation of the result that is being calculated | ||
61 | * | ||
62 | * | ||
63 | * TODO: Composed progress reporting | ||
64 | * TODO: Possibility to abort a job through future (perhaps optional?) | ||
65 | * TODO: Support for timeout, specified during exec call, after which the error handler gets called with a defined errorCode. | ||
66 | */ | ||
67 | namespace Async { | ||
68 | |||
69 | template<typename PrevOut, typename Out, typename ... In> | ||
70 | class Executor; | ||
71 | |||
72 | class JobBase; | ||
73 | |||
74 | template<typename Out, typename ... In> | ||
75 | class Job; | ||
76 | |||
77 | template<typename Out, typename ... In> | ||
78 | using ThenTask = typename detail::identity<std::function<void(In ..., Async::Future<Out>&)>>::type; | ||
79 | template<typename Out, typename ... In> | ||
80 | using SyncThenTask = typename detail::identity<std::function<Out(In ...)>>::type; | ||
81 | template<typename Out, typename In> | ||
82 | using EachTask = typename detail::identity<std::function<void(In, Async::Future<Out>&)>>::type; | ||
83 | template<typename Out, typename In> | ||
84 | using SyncEachTask = typename detail::identity<std::function<Out(In)>>::type; | ||
85 | template<typename Out, typename In> | ||
86 | using ReduceTask = typename detail::identity<std::function<void(In, Async::Future<Out>&)>>::type; | ||
87 | template<typename Out, typename In> | ||
88 | using SyncReduceTask = typename detail::identity<std::function<Out(In)>>::type; | ||
89 | |||
90 | using ErrorHandler = std::function<void(int, const QString &)>; | ||
91 | using Condition = std::function<bool()>; | ||
92 | |||
93 | namespace Private | ||
94 | { | ||
95 | |||
96 | class ExecutorBase; | ||
97 | typedef QSharedPointer<ExecutorBase> ExecutorBasePtr; | ||
98 | |||
99 | struct Execution { | ||
100 | Execution(const ExecutorBasePtr &executor); | ||
101 | ~Execution(); | ||
102 | void setFinished(); | ||
103 | |||
104 | template<typename T> | ||
105 | Async::Future<T>* result() const | ||
106 | { | ||
107 | return static_cast<Async::Future<T>*>(resultBase); | ||
108 | } | ||
109 | |||
110 | void releaseFuture(); | ||
111 | bool errorWasHandled() const; | ||
112 | |||
113 | ExecutorBasePtr executor; | ||
114 | FutureBase *resultBase; | ||
115 | bool isRunning; | ||
116 | bool isFinished; | ||
117 | |||
118 | ExecutionPtr prevExecution; | ||
119 | |||
120 | #ifndef QT_NO_DEBUG | ||
121 | Tracer *tracer; | ||
122 | #endif | ||
123 | }; | ||
124 | |||
125 | |||
126 | typedef QSharedPointer<Execution> ExecutionPtr; | ||
127 | |||
128 | class ExecutorBase | ||
129 | { | ||
130 | template<typename PrevOut, typename Out, typename ... In> | ||
131 | friend class Executor; | ||
132 | |||
133 | template<typename Out, typename ... In> | ||
134 | friend class Async::Job; | ||
135 | |||
136 | friend class Execution; | ||
137 | friend class Async::Tracer; | ||
138 | |||
139 | public: | ||
140 | virtual ~ExecutorBase(); | ||
141 | virtual ExecutionPtr exec(const ExecutorBasePtr &self) = 0; | ||
142 | |||
143 | protected: | ||
144 | ExecutorBase(const ExecutorBasePtr &parent); | ||
145 | |||
146 | template<typename T> | ||
147 | Async::Future<T>* createFuture(const ExecutionPtr &execution) const; | ||
148 | |||
149 | virtual bool hasErrorFunc() const = 0; | ||
150 | virtual bool handleError(const ExecutionPtr &execution) = 0; | ||
151 | |||
152 | ExecutorBasePtr mPrev; | ||
153 | |||
154 | #ifndef QT_NO_DEBUG | ||
155 | QString mExecutorName; | ||
156 | #endif | ||
157 | }; | ||
158 | |||
159 | template<typename PrevOut, typename Out, typename ... In> | ||
160 | class Executor : public ExecutorBase | ||
161 | { | ||
162 | protected: | ||
163 | Executor(ErrorHandler errorFunc, const Private::ExecutorBasePtr &parent) | ||
164 | : ExecutorBase(parent) | ||
165 | , mErrorFunc(errorFunc) | ||
166 | {} | ||
167 | |||
168 | virtual ~Executor() {} | ||
169 | virtual void run(const ExecutionPtr &execution) = 0; | ||
170 | |||
171 | ExecutionPtr exec(const ExecutorBasePtr &self); | ||
172 | bool hasErrorFunc() const { return (bool) mErrorFunc; } | ||
173 | bool handleError(const ExecutionPtr &execution); | ||
174 | |||
175 | std::function<void(int, const QString &)> mErrorFunc; | ||
176 | }; | ||
177 | |||
178 | template<typename Out, typename ... In> | ||
179 | class ThenExecutor: public Executor<typename detail::prevOut<In ...>::type, Out, In ...> | ||
180 | { | ||
181 | public: | ||
182 | ThenExecutor(ThenTask<Out, In ...> then, ErrorHandler errorFunc, const ExecutorBasePtr &parent); | ||
183 | void run(const ExecutionPtr &execution); | ||
184 | private: | ||
185 | ThenTask<Out, In ...> mFunc; | ||
186 | }; | ||
187 | |||
188 | template<typename PrevOut, typename Out, typename In> | ||
189 | class EachExecutor : public Executor<PrevOut, Out, In> | ||
190 | { | ||
191 | public: | ||
192 | EachExecutor(EachTask<Out, In> each, ErrorHandler errorFunc, const ExecutorBasePtr &parent); | ||
193 | void run(const ExecutionPtr &execution); | ||
194 | private: | ||
195 | EachTask<Out, In> mFunc; | ||
196 | QVector<Async::FutureWatcher<Out>*> mFutureWatchers; | ||
197 | }; | ||
198 | |||
199 | template<typename Out, typename In> | ||
200 | class ReduceExecutor : public ThenExecutor<Out, In> | ||
201 | { | ||
202 | public: | ||
203 | ReduceExecutor(ReduceTask<Out, In> reduce, ErrorHandler errorFunc, const ExecutorBasePtr &parent); | ||
204 | private: | ||
205 | ReduceTask<Out, In> mFunc; | ||
206 | }; | ||
207 | |||
208 | template<typename Out, typename ... In> | ||
209 | class SyncThenExecutor : public Executor<typename detail::prevOut<In ...>::type, Out, In ...> | ||
210 | { | ||
211 | public: | ||
212 | SyncThenExecutor(SyncThenTask<Out, In ...> then, ErrorHandler errorFunc, const ExecutorBasePtr &parent); | ||
213 | void run(const ExecutionPtr &execution); | ||
214 | |||
215 | private: | ||
216 | void run(const ExecutionPtr &execution, std::false_type); // !std::is_void<Out> | ||
217 | void run(const ExecutionPtr &execution, std::true_type); // std::is_void<Out> | ||
218 | SyncThenTask<Out, In ...> mFunc; | ||
219 | }; | ||
220 | |||
221 | template<typename Out, typename In> | ||
222 | class SyncReduceExecutor : public SyncThenExecutor<Out, In> | ||
223 | { | ||
224 | public: | ||
225 | SyncReduceExecutor(SyncReduceTask<Out, In> reduce, ErrorHandler errorFunc, const ExecutorBasePtr &parent); | ||
226 | private: | ||
227 | SyncReduceTask<Out, In> mFunc; | ||
228 | }; | ||
229 | |||
230 | template<typename PrevOut, typename Out, typename In> | ||
231 | class SyncEachExecutor : public Executor<PrevOut, Out, In> | ||
232 | { | ||
233 | public: | ||
234 | SyncEachExecutor(SyncEachTask<Out, In> each, ErrorHandler errorFunc, const ExecutorBasePtr &parent); | ||
235 | void run(const ExecutionPtr &execution); | ||
236 | private: | ||
237 | void run(Async::Future<Out> *future, const typename PrevOut::value_type &arg, std::false_type); // !std::is_void<Out> | ||
238 | void run(Async::Future<Out> *future, const typename PrevOut::value_type &arg, std::true_type); // std::is_void<Out> | ||
239 | SyncEachTask<Out, In> mFunc; | ||
240 | }; | ||
241 | |||
242 | } // namespace Private | ||
243 | |||
244 | /** | ||
245 | * Start an asynchronous job sequence. | ||
246 | * | ||
247 | * Async::start() is your starting point to build a chain of jobs to be executed | ||
248 | * asynchronously. | ||
249 | * | ||
250 | * @param func An asynchronous function to be executed. The function must have | ||
251 | * void return type, and accept exactly one argument of type @p Async::Future<In>, | ||
252 | * where @p In is type of the result. | ||
253 | */ | ||
254 | template<typename Out, typename ... In> | ||
255 | Job<Out, In ...> start(ThenTask<Out, In ...> func, ErrorHandler errorFunc = ErrorHandler()); | ||
256 | |||
257 | template<typename Out, typename ... In> | ||
258 | Job<Out, In ...> start(SyncThenTask<Out, In ...> func, ErrorHandler errorFunc = ErrorHandler()); | ||
259 | |||
260 | #ifdef WITH_KJOB | ||
261 | template<typename ReturnType, typename KJobType, ReturnType (KJobType::*KJobResultMethod)(), typename ... Args> | ||
262 | Job<ReturnType, Args ...> start(); | ||
263 | #endif | ||
264 | |||
265 | /** | ||
266 | * Async while loop. | ||
267 | * | ||
268 | * The loop continues while @param condition returns true. | ||
269 | */ | ||
270 | Job<void> dowhile(Condition condition, ThenTask<void> func); | ||
271 | |||
272 | /** | ||
273 | * Async while loop. | ||
274 | * | ||
275 | * Loop continues while body returns true. | ||
276 | */ | ||
277 | Job<void> dowhile(ThenTask<bool> body); | ||
278 | |||
279 | /** | ||
280 | * Iterate over a container. | ||
281 | * | ||
282 | * Use in conjunction with .each | ||
283 | */ | ||
284 | template<typename Out> | ||
285 | Job<Out> iterate(const Out &container); | ||
286 | |||
287 | /** | ||
288 | * Async delay. | ||
289 | */ | ||
290 | Job<void> wait(int delay); | ||
291 | |||
292 | /** | ||
293 | * A null job. | ||
294 | * | ||
295 | * An async noop. | ||
296 | * | ||
297 | */ | ||
298 | template<typename Out> | ||
299 | Job<Out> null(); | ||
300 | |||
301 | /** | ||
302 | * An error job. | ||
303 | * | ||
304 | * An async error. | ||
305 | * | ||
306 | */ | ||
307 | template<typename Out> | ||
308 | Job<Out> error(int errorCode = 1, const QString &errorMessage = QString()); | ||
309 | |||
310 | class JobBase | ||
311 | { | ||
312 | template<typename Out, typename ... In> | ||
313 | friend class Job; | ||
314 | |||
315 | public: | ||
316 | JobBase(const Private::ExecutorBasePtr &executor); | ||
317 | ~JobBase(); | ||
318 | |||
319 | protected: | ||
320 | Private::ExecutorBasePtr mExecutor; | ||
321 | }; | ||
322 | |||
323 | /** | ||
324 | * An Asynchronous job | ||
325 | * | ||
326 | * A single instance of Job represents a single method that will be executed | ||
327 | * asynchrously. The Job is started by @p Job::exec(), which returns @p Async::Future | ||
328 | * immediatelly. The Future will be set to finished state once the asynchronous | ||
329 | * task has finished. You can use @p Async::Future::waitForFinished() to wait for | ||
330 | * for the Future in blocking manner. | ||
331 | * | ||
332 | * It is possible to chain multiple Jobs one after another in different fashion | ||
333 | * (sequential, parallel, etc.). Calling Job::exec() will then return a pending | ||
334 | * @p Async::Future, and will execute the entire chain of jobs. | ||
335 | * | ||
336 | * @code | ||
337 | * auto job = Job::start<QList<int>>( | ||
338 | * [](Async::Future<QList<int>> &future) { | ||
339 | * MyREST::PendingUsers *pu = MyREST::requestListOfUsers(); | ||
340 | * QObject::connect(pu, &PendingOperation::finished, | ||
341 | * [&](PendingOperation *pu) { | ||
342 | * future->setValue(dynamic_cast<MyREST::PendingUsers*>(pu)->userIds()); | ||
343 | * future->setFinished(); | ||
344 | * }); | ||
345 | * }) | ||
346 | * .each<QList<MyREST::User>, int>( | ||
347 | * [](const int &userId, Async::Future<QList<MyREST::User>> &future) { | ||
348 | * MyREST::PendingUser *pu = MyREST::requestUserDetails(userId); | ||
349 | * QObject::connect(pu, &PendingOperation::finished, | ||
350 | * [&](PendingOperation *pu) { | ||
351 | * future->setValue(Qlist<MyREST::User>() << dynamic_cast<MyREST::PendingUser*>(pu)->user()); | ||
352 | * future->setFinished(); | ||
353 | * }); | ||
354 | * }); | ||
355 | * | ||
356 | * Async::Future<QList<MyREST::User>> usersFuture = job.exec(); | ||
357 | * usersFuture.waitForFinished(); | ||
358 | * QList<MyRest::User> users = usersFuture.value(); | ||
359 | * @endcode | ||
360 | * | ||
361 | * In the example above, calling @p job.exec() will first invoke the first job, | ||
362 | * which will retrieve a list of IDs, and then will invoke the second function | ||
363 | * for each single entry in the list returned by the first function. | ||
364 | */ | ||
365 | template<typename Out, typename ... In> | ||
366 | class Job : public JobBase | ||
367 | { | ||
368 | template<typename OutOther, typename ... InOther> | ||
369 | friend class Job; | ||
370 | |||
371 | template<typename OutOther, typename ... InOther> | ||
372 | friend Job<OutOther, InOther ...> start(Async::ThenTask<OutOther, InOther ...> func, ErrorHandler errorFunc); | ||
373 | |||
374 | template<typename OutOther, typename ... InOther> | ||
375 | friend Job<OutOther, InOther ...> start(Async::SyncThenTask<OutOther, InOther ...> func, ErrorHandler errorFunc); | ||
376 | |||
377 | #ifdef WITH_KJOB | ||
378 | template<typename ReturnType, typename KJobType, ReturnType (KJobType::*KJobResultMethod)(), typename ... Args> | ||
379 | friend Job<ReturnType, Args ...> start(); | ||
380 | #endif | ||
381 | |||
382 | public: | ||
383 | template<typename OutOther, typename ... InOther> | ||
384 | Job<OutOther, InOther ...> then(ThenTask<OutOther, InOther ...> func, ErrorHandler errorFunc = ErrorHandler()) | ||
385 | { | ||
386 | return Job<OutOther, InOther ...>(Private::ExecutorBasePtr( | ||
387 | new Private::ThenExecutor<OutOther, InOther ...>(func, errorFunc, mExecutor))); | ||
388 | } | ||
389 | |||
390 | template<typename OutOther, typename ... InOther> | ||
391 | Job<OutOther, InOther ...> then(SyncThenTask<OutOther, InOther ...> func, ErrorHandler errorFunc = ErrorHandler()) | ||
392 | { | ||
393 | return Job<OutOther, InOther ...>(Private::ExecutorBasePtr( | ||
394 | new Private::SyncThenExecutor<OutOther, InOther ...>(func, errorFunc, mExecutor))); | ||
395 | } | ||
396 | |||
397 | template<typename OutOther, typename ... InOther> | ||
398 | Job<OutOther, InOther ...> then(Job<OutOther, InOther ...> otherJob, ErrorHandler errorFunc = ErrorHandler()) | ||
399 | { | ||
400 | return then<OutOther, InOther ...>(nestedJobWrapper<OutOther, InOther ...>(otherJob), errorFunc); | ||
401 | } | ||
402 | |||
403 | #ifdef WITH_KJOB | ||
404 | template<typename ReturnType, typename KJobType, ReturnType (KJobType::*KJobResultMethod)(), typename ... Args> | ||
405 | Job<ReturnType, Args ...> then() | ||
406 | { | ||
407 | return start<ReturnType, KJobType, KJobResultMethod, Args ...>(); | ||
408 | } | ||
409 | #endif | ||
410 | |||
411 | template<typename OutOther, typename InOther> | ||
412 | Job<OutOther, InOther> each(EachTask<OutOther, InOther> func, ErrorHandler errorFunc = ErrorHandler()) | ||
413 | { | ||
414 | eachInvariants<OutOther>(); | ||
415 | return Job<OutOther, InOther>(Private::ExecutorBasePtr( | ||
416 | new Private::EachExecutor<Out, OutOther, InOther>(func, errorFunc, mExecutor))); | ||
417 | } | ||
418 | |||
419 | template<typename OutOther, typename InOther> | ||
420 | Job<OutOther, InOther> each(SyncEachTask<OutOther, InOther> func, ErrorHandler errorFunc = ErrorHandler()) | ||
421 | { | ||
422 | eachInvariants<OutOther>(); | ||
423 | return Job<OutOther, InOther>(Private::ExecutorBasePtr( | ||
424 | new Private::SyncEachExecutor<Out, OutOther, InOther>(func, errorFunc, mExecutor))); | ||
425 | } | ||
426 | |||
427 | template<typename OutOther, typename InOther> | ||
428 | Job<OutOther, InOther> each(Job<OutOther, InOther> otherJob, ErrorHandler errorFunc = ErrorHandler()) | ||
429 | { | ||
430 | eachInvariants<OutOther>(); | ||
431 | return each<OutOther, InOther>(nestedJobWrapper<OutOther, InOther>(otherJob), errorFunc); | ||
432 | } | ||
433 | |||
434 | template<typename OutOther, typename InOther> | ||
435 | Job<OutOther, InOther> reduce(ReduceTask<OutOther, InOther> func, ErrorHandler errorFunc = ErrorHandler()) | ||
436 | { | ||
437 | reduceInvariants<InOther>(); | ||
438 | return Job<OutOther, InOther>(Private::ExecutorBasePtr( | ||
439 | new Private::ReduceExecutor<OutOther, InOther>(func, errorFunc, mExecutor))); | ||
440 | } | ||
441 | |||
442 | template<typename OutOther, typename InOther> | ||
443 | Job<OutOther, InOther> reduce(SyncReduceTask<OutOther, InOther> func, ErrorHandler errorFunc = ErrorHandler()) | ||
444 | { | ||
445 | reduceInvariants<InOther>(); | ||
446 | return Job<OutOther, InOther>(Private::ExecutorBasePtr( | ||
447 | new Private::SyncReduceExecutor<OutOther, InOther>(func, errorFunc, mExecutor))); | ||
448 | } | ||
449 | |||
450 | template<typename OutOther, typename InOther> | ||
451 | Job<OutOther, InOther> reduce(Job<OutOther, InOther> otherJob, ErrorHandler errorFunc = ErrorHandler()) | ||
452 | { | ||
453 | return reduce<OutOther, InOther>(nestedJobWrapper<OutOther, InOther>(otherJob), errorFunc); | ||
454 | } | ||
455 | |||
456 | template<typename FirstIn> | ||
457 | Async::Future<Out> exec(FirstIn in) | ||
458 | { | ||
459 | // Inject a fake sync executor that will return the initial value | ||
460 | Private::ExecutorBasePtr first = mExecutor; | ||
461 | while (first->mPrev) { | ||
462 | first = first->mPrev; | ||
463 | } | ||
464 | auto init = new Private::SyncThenExecutor<FirstIn>( | ||
465 | [in]() -> FirstIn { | ||
466 | return in; | ||
467 | }, | ||
468 | ErrorHandler(), Private::ExecutorBasePtr()); | ||
469 | first->mPrev = Private::ExecutorBasePtr(init); | ||
470 | |||
471 | auto result = exec(); | ||
472 | // Remove the injected executor | ||
473 | first->mPrev.reset(); | ||
474 | return result; | ||
475 | } | ||
476 | |||
477 | Async::Future<Out> exec() | ||
478 | { | ||
479 | Private::ExecutionPtr execution = mExecutor->exec(mExecutor); | ||
480 | Async::Future<Out> result = *execution->result<Out>(); | ||
481 | |||
482 | return result; | ||
483 | } | ||
484 | |||
485 | private: | ||
486 | Job(Private::ExecutorBasePtr executor) | ||
487 | : JobBase(executor) | ||
488 | {} | ||
489 | |||
490 | template<typename OutOther> | ||
491 | void eachInvariants() | ||
492 | { | ||
493 | static_assert(detail::isIterable<Out>::value, | ||
494 | "The 'Each' task can only be connected to a job that returns a list or an array."); | ||
495 | static_assert(std::is_void<OutOther>::value || detail::isIterable<OutOther>::value, | ||
496 | "The result type of 'Each' task must be void, a list or an array."); | ||
497 | } | ||
498 | |||
499 | template<typename InOther> | ||
500 | void reduceInvariants() | ||
501 | { | ||
502 | static_assert(Async::detail::isIterable<Out>::value, | ||
503 | "The 'Result' task can only be connected to a job that returns a list or an array"); | ||
504 | static_assert(std::is_same<typename Out::value_type, typename InOther::value_type>::value, | ||
505 | "The return type of previous task must be compatible with input type of this task"); | ||
506 | } | ||
507 | |||
508 | template<typename OutOther, typename ... InOther> | ||
509 | inline std::function<void(InOther ..., Async::Future<OutOther>&)> nestedJobWrapper(Job<OutOther, InOther ...> otherJob) { | ||
510 | return [otherJob](InOther ... in, Async::Future<OutOther> &future) { | ||
511 | // copy by value is const | ||
512 | auto job = otherJob; | ||
513 | FutureWatcher<OutOther> *watcher = new FutureWatcher<OutOther>(); | ||
514 | QObject::connect(watcher, &FutureWatcherBase::futureReady, | ||
515 | [watcher, future]() { | ||
516 | // FIXME: We pass future by value, because using reference causes the | ||
517 | // future to get deleted before this lambda is invoked, leading to crash | ||
518 | // in copyFutureValue() | ||
519 | // copy by value is const | ||
520 | auto outFuture = future; | ||
521 | Async::detail::copyFutureValue(watcher->future(), outFuture); | ||
522 | if (watcher->future().errorCode()) { | ||
523 | outFuture.setError(watcher->future().errorCode(), watcher->future().errorMessage()); | ||
524 | } else { | ||
525 | outFuture.setFinished(); | ||
526 | } | ||
527 | delete watcher; | ||
528 | }); | ||
529 | watcher->setFuture(job.exec(in ...)); | ||
530 | }; | ||
531 | } | ||
532 | }; | ||
533 | |||
534 | } // namespace Async | ||
535 | |||
536 | |||
537 | // ********** Out of line definitions **************** | ||
538 | |||
539 | namespace Async { | ||
540 | |||
541 | template<typename Out, typename ... In> | ||
542 | Job<Out, In ...> start(ThenTask<Out, In ...> func, ErrorHandler error) | ||
543 | { | ||
544 | return Job<Out, In...>(Private::ExecutorBasePtr( | ||
545 | new Private::ThenExecutor<Out, In ...>(func, error, Private::ExecutorBasePtr()))); | ||
546 | } | ||
547 | |||
548 | template<typename Out, typename ... In> | ||
549 | Job<Out, In ...> start(SyncThenTask<Out, In ...> func, ErrorHandler error) | ||
550 | { | ||
551 | return Job<Out, In...>(Private::ExecutorBasePtr( | ||
552 | new Private::SyncThenExecutor<Out, In ...>(func, error, Private::ExecutorBasePtr()))); | ||
553 | } | ||
554 | |||
555 | #ifdef WITH_KJOB | ||
556 | template<typename ReturnType, typename KJobType, ReturnType (KJobType::*KJobResultMethod)(), typename ... Args> | ||
557 | Job<ReturnType, Args ...> start() | ||
558 | { | ||
559 | return Job<ReturnType, Args ...>(Private::ExecutorBasePtr( | ||
560 | new Private::ThenExecutor<ReturnType, Args ...>([](const Args & ... args, Async::Future<ReturnType> &future) | ||
561 | { | ||
562 | KJobType *job = new KJobType(args ...); | ||
563 | job->connect(job, &KJob::finished, | ||
564 | [&future](KJob *job) { | ||
565 | if (job->error()) { | ||
566 | future.setError(job->error(), job->errorString()); | ||
567 | } else { | ||
568 | future.setValue((static_cast<KJobType*>(job)->*KJobResultMethod)()); | ||
569 | future.setFinished(); | ||
570 | } | ||
571 | }); | ||
572 | job->start(); | ||
573 | }, ErrorHandler(), Private::ExecutorBasePtr()))); | ||
574 | } | ||
575 | #endif | ||
576 | |||
577 | |||
578 | template<typename Out> | ||
579 | Job<Out> null() | ||
580 | { | ||
581 | return Async::start<Out>( | ||
582 | [](Async::Future<Out> &future) { | ||
583 | future.setFinished(); | ||
584 | }); | ||
585 | } | ||
586 | |||
587 | template<typename Out> | ||
588 | Job<Out> error(int errorCode, const QString &errorMessage) | ||
589 | { | ||
590 | return Async::start<Out>( | ||
591 | [errorCode, errorMessage](Async::Future<Out> &future) { | ||
592 | future.setError(errorCode, errorMessage); | ||
593 | }); | ||
594 | } | ||
595 | |||
596 | template<typename Out> | ||
597 | Job<Out> iterate(const Out &container) | ||
598 | { | ||
599 | return Async::start<Out>( | ||
600 | [container]() { | ||
601 | return container; | ||
602 | }); | ||
603 | } | ||
604 | |||
605 | |||
606 | namespace Private { | ||
607 | |||
608 | template<typename T> | ||
609 | Async::Future<T>* ExecutorBase::createFuture(const ExecutionPtr &execution) const | ||
610 | { | ||
611 | return new Async::Future<T>(execution); | ||
612 | } | ||
613 | |||
614 | template<typename PrevOut, typename Out, typename ... In> | ||
615 | ExecutionPtr Executor<PrevOut, Out, In ...>::exec(const ExecutorBasePtr &self) | ||
616 | { | ||
617 | // Passing 'self' to execution ensures that the Executor chain remains | ||
618 | // valid until the entire execution is finished | ||
619 | ExecutionPtr execution = ExecutionPtr::create(self); | ||
620 | #ifndef QT_NO_DEBUG | ||
621 | execution->tracer = new Tracer(execution.data()); // owned by execution | ||
622 | #endif | ||
623 | |||
624 | // chainup | ||
625 | execution->prevExecution = mPrev ? mPrev->exec(mPrev) : ExecutionPtr(); | ||
626 | |||
627 | execution->resultBase = ExecutorBase::createFuture<Out>(execution); | ||
628 | auto fw = new Async::FutureWatcher<Out>(); | ||
629 | QObject::connect(fw, &Async::FutureWatcher<Out>::futureReady, | ||
630 | [fw, execution, this]() { | ||
631 | handleError(execution); | ||
632 | execution->setFinished(); | ||
633 | delete fw; | ||
634 | }); | ||
635 | fw->setFuture(*execution->result<Out>()); | ||
636 | |||
637 | Async::Future<PrevOut> *prevFuture = execution->prevExecution ? execution->prevExecution->result<PrevOut>() : nullptr; | ||
638 | if (!prevFuture || prevFuture->isFinished()) { | ||
639 | if (prevFuture) { // prevFuture implies execution->prevExecution | ||
640 | if (prevFuture->errorCode()) { | ||
641 | // Propagate the errorCode and message to the outer Future | ||
642 | execution->resultBase->setError(prevFuture->errorCode(), prevFuture->errorMessage()); | ||
643 | if (!execution->errorWasHandled()) { | ||
644 | if (handleError(execution)) { | ||
645 | return execution; | ||
646 | } | ||
647 | } else { | ||
648 | return execution; | ||
649 | } | ||
650 | } else { | ||
651 | // Propagate error (if any) | ||
652 | } | ||
653 | } | ||
654 | |||
655 | execution->isRunning = true; | ||
656 | run(execution); | ||
657 | } else { | ||
658 | auto prevFutureWatcher = new Async::FutureWatcher<PrevOut>(); | ||
659 | QObject::connect(prevFutureWatcher, &Async::FutureWatcher<PrevOut>::futureReady, | ||
660 | [prevFutureWatcher, execution, this]() { | ||
661 | auto prevFuture = prevFutureWatcher->future(); | ||
662 | assert(prevFuture.isFinished()); | ||
663 | delete prevFutureWatcher; | ||
664 | auto prevExecutor = execution->executor->mPrev; | ||
665 | if (prevFuture.errorCode()) { | ||
666 | execution->resultBase->setError(prevFuture.errorCode(), prevFuture.errorMessage()); | ||
667 | if (!execution->errorWasHandled()) { | ||
668 | if (handleError(execution)) { | ||
669 | return; | ||
670 | } | ||
671 | } else { | ||
672 | return; | ||
673 | } | ||
674 | } | ||
675 | |||
676 | |||
677 | // propagate error (if any) | ||
678 | execution->isRunning = true; | ||
679 | run(execution); | ||
680 | }); | ||
681 | |||
682 | prevFutureWatcher->setFuture(*static_cast<Async::Future<PrevOut>*>(prevFuture)); | ||
683 | } | ||
684 | |||
685 | return execution; | ||
686 | } | ||
687 | |||
688 | template<typename PrevOut, typename Out, typename ... In> | ||
689 | bool Executor<PrevOut, Out, In ...>::handleError(const ExecutionPtr &execution) | ||
690 | { | ||
691 | assert(execution->resultBase->isFinished()); | ||
692 | if (execution->resultBase->errorCode()) { | ||
693 | if (mErrorFunc) { | ||
694 | mErrorFunc(execution->resultBase->errorCode(), | ||
695 | execution->resultBase->errorMessage()); | ||
696 | return true; | ||
697 | } | ||
698 | } | ||
699 | |||
700 | return false; | ||
701 | } | ||
702 | |||
703 | |||
704 | template<typename Out, typename ... In> | ||
705 | ThenExecutor<Out, In ...>::ThenExecutor(ThenTask<Out, In ...> then, ErrorHandler error, const ExecutorBasePtr &parent) | ||
706 | : Executor<typename detail::prevOut<In ...>::type, Out, In ...>(error, parent) | ||
707 | , mFunc(then) | ||
708 | { | ||
709 | STORE_EXECUTOR_NAME("ThenExecutor", Out, In ...); | ||
710 | } | ||
711 | |||
712 | template<typename Out, typename ... In> | ||
713 | void ThenExecutor<Out, In ...>::run(const ExecutionPtr &execution) | ||
714 | { | ||
715 | Async::Future<typename detail::prevOut<In ...>::type> *prevFuture = nullptr; | ||
716 | if (execution->prevExecution) { | ||
717 | prevFuture = execution->prevExecution->result<typename detail::prevOut<In ...>::type>(); | ||
718 | assert(prevFuture->isFinished()); | ||
719 | } | ||
720 | |||
721 | ThenExecutor<Out, In ...>::mFunc(prevFuture ? prevFuture->value() : In() ..., *execution->result<Out>()); | ||
722 | } | ||
723 | |||
724 | template<typename PrevOut, typename Out, typename In> | ||
725 | EachExecutor<PrevOut, Out, In>::EachExecutor(EachTask<Out, In> each, ErrorHandler error, const ExecutorBasePtr &parent) | ||
726 | : Executor<PrevOut, Out, In>(error, parent) | ||
727 | , mFunc(each) | ||
728 | { | ||
729 | STORE_EXECUTOR_NAME("EachExecutor", PrevOut, Out, In); | ||
730 | } | ||
731 | |||
732 | template<typename PrevOut, typename Out, typename In> | ||
733 | void EachExecutor<PrevOut, Out, In>::run(const ExecutionPtr &execution) | ||
734 | { | ||
735 | assert(execution->prevExecution); | ||
736 | auto prevFuture = execution->prevExecution->result<PrevOut>(); | ||
737 | assert(prevFuture->isFinished()); | ||
738 | |||
739 | auto out = execution->result<Out>(); | ||
740 | if (prevFuture->value().isEmpty()) { | ||
741 | out->setFinished(); | ||
742 | return; | ||
743 | } | ||
744 | |||
745 | for (auto arg : prevFuture->value()) { | ||
746 | //We have to manually manage the lifetime of these temporary futures | ||
747 | Async::Future<Out> *future = new Async::Future<Out>(); | ||
748 | EachExecutor<PrevOut, Out, In>::mFunc(arg, *future); | ||
749 | auto fw = new Async::FutureWatcher<Out>(); | ||
750 | mFutureWatchers.append(fw); | ||
751 | QObject::connect(fw, &Async::FutureWatcher<Out>::futureReady, | ||
752 | [out, fw, this, future]() { | ||
753 | assert(fw->future().isFinished()); | ||
754 | const int index = mFutureWatchers.indexOf(fw); | ||
755 | assert(index > -1); | ||
756 | mFutureWatchers.removeAt(index); | ||
757 | Async::detail::aggregateFutureValue<Out>(fw->future(), *out); | ||
758 | if (mFutureWatchers.isEmpty()) { | ||
759 | out->setFinished(); | ||
760 | } | ||
761 | delete fw; | ||
762 | delete future; | ||
763 | }); | ||
764 | fw->setFuture(*future); | ||
765 | } | ||
766 | } | ||
767 | |||
768 | template<typename Out, typename In> | ||
769 | ReduceExecutor<Out, In>::ReduceExecutor(ReduceTask<Out, In> reduce, ErrorHandler errorFunc, const ExecutorBasePtr &parent) | ||
770 | : ThenExecutor<Out, In>(reduce, errorFunc, parent) | ||
771 | { | ||
772 | STORE_EXECUTOR_NAME("ReduceExecutor", Out, In); | ||
773 | } | ||
774 | |||
775 | template<typename Out, typename ... In> | ||
776 | SyncThenExecutor<Out, In ...>::SyncThenExecutor(SyncThenTask<Out, In ...> then, ErrorHandler errorFunc, const ExecutorBasePtr &parent) | ||
777 | : Executor<typename detail::prevOut<In ...>::type, Out, In ...>(errorFunc, parent) | ||
778 | , mFunc(then) | ||
779 | { | ||
780 | STORE_EXECUTOR_NAME("SyncThenExecutor", Out, In ...); | ||
781 | } | ||
782 | |||
783 | template<typename Out, typename ... In> | ||
784 | void SyncThenExecutor<Out, In ...>::run(const ExecutionPtr &execution) | ||
785 | { | ||
786 | if (execution->prevExecution) { | ||
787 | assert(execution->prevExecution->resultBase->isFinished()); | ||
788 | } | ||
789 | |||
790 | run(execution, std::is_void<Out>()); | ||
791 | execution->resultBase->setFinished(); | ||
792 | } | ||
793 | |||
794 | template<typename Out, typename ... In> | ||
795 | void SyncThenExecutor<Out, In ...>::run(const ExecutionPtr &execution, std::false_type) | ||
796 | { | ||
797 | Async::Future<typename detail::prevOut<In ...>::type> *prevFuture = | ||
798 | execution->prevExecution | ||
799 | ? execution->prevExecution->result<typename detail::prevOut<In ...>::type>() | ||
800 | : nullptr; | ||
801 | (void) prevFuture; // silence 'set but not used' warning | ||
802 | Async::Future<Out> *future = execution->result<Out>(); | ||
803 | future->setValue(SyncThenExecutor<Out, In...>::mFunc(prevFuture ? prevFuture->value() : In() ...)); | ||
804 | } | ||
805 | |||
806 | template<typename Out, typename ... In> | ||
807 | void SyncThenExecutor<Out, In ...>::run(const ExecutionPtr &execution, std::true_type) | ||
808 | { | ||
809 | Async::Future<typename detail::prevOut<In ...>::type> *prevFuture = | ||
810 | execution->prevExecution | ||
811 | ? execution->prevExecution->result<typename detail::prevOut<In ...>::type>() | ||
812 | : nullptr; | ||
813 | (void) prevFuture; // silence 'set but not used' warning | ||
814 | SyncThenExecutor<Out, In ...>::mFunc(prevFuture ? prevFuture->value() : In() ...); | ||
815 | } | ||
816 | |||
817 | template<typename PrevOut, typename Out, typename In> | ||
818 | SyncEachExecutor<PrevOut, Out, In>::SyncEachExecutor(SyncEachTask<Out, In> each, ErrorHandler errorFunc, const ExecutorBasePtr &parent) | ||
819 | : Executor<PrevOut, Out, In>(errorFunc, parent) | ||
820 | , mFunc(each) | ||
821 | { | ||
822 | STORE_EXECUTOR_NAME("SyncEachExecutor", PrevOut, Out, In); | ||
823 | } | ||
824 | |||
825 | template<typename PrevOut, typename Out, typename In> | ||
826 | void SyncEachExecutor<PrevOut, Out, In>::run(const ExecutionPtr &execution) | ||
827 | { | ||
828 | assert(execution->prevExecution); | ||
829 | auto *prevFuture = execution->prevExecution->result<PrevOut>(); | ||
830 | assert(prevFuture->isFinished()); | ||
831 | |||
832 | auto out = execution->result<Out>(); | ||
833 | if (prevFuture->value().isEmpty()) { | ||
834 | out->setFinished(); | ||
835 | return; | ||
836 | } | ||
837 | |||
838 | for (auto arg : prevFuture->value()) { | ||
839 | run(out, arg, std::is_void<Out>()); | ||
840 | } | ||
841 | out->setFinished(); | ||
842 | } | ||
843 | |||
844 | template<typename PrevOut, typename Out, typename In> | ||
845 | void SyncEachExecutor<PrevOut, Out, In>::run(Async::Future<Out> *out, const typename PrevOut::value_type &arg, std::false_type) | ||
846 | { | ||
847 | out->setValue(out->value() + SyncEachExecutor<PrevOut, Out, In>::mFunc(arg)); | ||
848 | } | ||
849 | |||
850 | template<typename PrevOut, typename Out, typename In> | ||
851 | void SyncEachExecutor<PrevOut, Out, In>::run(Async::Future<Out> * /* unused */, const typename PrevOut::value_type &arg, std::true_type) | ||
852 | { | ||
853 | SyncEachExecutor<PrevOut, Out, In>::mFunc(arg); | ||
854 | } | ||
855 | |||
856 | template<typename Out, typename In> | ||
857 | SyncReduceExecutor<Out, In>::SyncReduceExecutor(SyncReduceTask<Out, In> reduce, ErrorHandler errorFunc, const ExecutorBasePtr &parent) | ||
858 | : SyncThenExecutor<Out, In>(reduce, errorFunc, parent) | ||
859 | { | ||
860 | STORE_EXECUTOR_NAME("SyncReduceExecutor", Out, In); | ||
861 | } | ||
862 | |||
863 | |||
864 | } // namespace Private | ||
865 | |||
866 | } // namespace Async | ||
867 | |||
868 | |||
869 | |||
870 | #endif // ASYNC_H | ||
871 | |||
872 | |||