diff options
Diffstat (limited to 'async/src/async.h')
-rw-r--r-- | async/src/async.h | 874 |
1 files changed, 0 insertions, 874 deletions
diff --git a/async/src/async.h b/async/src/async.h deleted file mode 100644 index 152f98e..0000000 --- a/async/src/async.h +++ /dev/null | |||
@@ -1,874 +0,0 @@ | |||
1 | /* | ||
2 | * Copyright 2014 - 2015 Daniel Vrátil <dvratil@redhat.com> | ||
3 | * | ||
4 | * This library is free software; you can redistribute it and/or | ||
5 | * modify it under the terms of the GNU Library General Public License as | ||
6 | * published by the Free Software Foundation; either version 2 of | ||
7 | * the License, or (at your option) any later version. | ||
8 | * | ||
9 | * This library is distributed in the hope that it will be useful, | ||
10 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
11 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
12 | * GNU Library General Public License for more details. | ||
13 | * | ||
14 | * You should have received a copy of the GNU Library General Public License | ||
15 | * along with this library. If not, see <http://www.gnu.org/licenses/>. | ||
16 | */ | ||
17 | |||
18 | #ifndef KASYNC_H | ||
19 | #define KASYNC_H | ||
20 | |||
21 | #include "kasync_export.h" | ||
22 | |||
23 | #include <functional> | ||
24 | #include <list> | ||
25 | #include <type_traits> | ||
26 | #include <cassert> | ||
27 | #include <iterator> | ||
28 | |||
29 | #include "future.h" | ||
30 | #include "debug.h" | ||
31 | #include "async_impl.h" | ||
32 | |||
33 | #include <QVector> | ||
34 | #include <QObject> | ||
35 | #include <QSharedPointer> | ||
36 | |||
37 | #include <QDebug> | ||
38 | |||
39 | #ifdef WITH_KJOB | ||
40 | #include <KJob> | ||
41 | #endif | ||
42 | |||
43 | |||
44 | /* | ||
45 | * API to help write async code. | ||
46 | * | ||
47 | * This API is based around jobs that take lambdas to execute asynchronous tasks. Each async operation can take a continuation, | ||
48 | * that can then be used to execute further async operations. That way it is possible to build async chains of operations, | ||
49 | * that can be stored and executed later on. Jobs can be composed, similarly to functions. | ||
50 | * | ||
51 | * Relations between the components: | ||
52 | * * Job: API wrapper around Executors chain. Can be destroyed while still running, | ||
53 | * because the actual execution happens in the background | ||
54 | * * Executor: Describes task to execute. Executors form a linked list matching the | ||
55 | * order in which they will be executed. The Executor chain is destroyed when | ||
56 | * the parent Job is destroyed. However if the Job is still running it is | ||
57 | * guaranteed that the Executor chain will not be destroyed until the execution | ||
58 | * is finished. | ||
59 | * * Execution: The running execution of the task stored in Executor. Each call to Job::exec() | ||
60 | * instantiates new Execution chain, which makes it possible for the Job to be | ||
61 | * executed multiple times (even in parallel). | ||
62 | * * Future: Representation of the result that is being calculated | ||
63 | * | ||
64 | * | ||
65 | * TODO: Composed progress reporting | ||
66 | * TODO: Possibility to abort a job through future (perhaps optional?) | ||
67 | * TODO: Support for timeout, specified during exec call, after which the error handler gets called with a defined errorCode. | ||
68 | */ | ||
69 | namespace KAsync { | ||
70 | |||
71 | template<typename PrevOut, typename Out, typename ... In> | ||
72 | class Executor; | ||
73 | |||
74 | class JobBase; | ||
75 | |||
76 | template<typename Out, typename ... In> | ||
77 | class Job; | ||
78 | |||
79 | template<typename Out, typename ... In> | ||
80 | using ThenTask = typename detail::identity<std::function<void(In ..., KAsync::Future<Out>&)>>::type; | ||
81 | template<typename Out, typename ... In> | ||
82 | using SyncThenTask = typename detail::identity<std::function<Out(In ...)>>::type; | ||
83 | template<typename Out, typename In> | ||
84 | using EachTask = typename detail::identity<std::function<void(In, KAsync::Future<Out>&)>>::type; | ||
85 | template<typename Out, typename In> | ||
86 | using SyncEachTask = typename detail::identity<std::function<Out(In)>>::type; | ||
87 | template<typename Out, typename In> | ||
88 | using ReduceTask = typename detail::identity<std::function<void(In, KAsync::Future<Out>&)>>::type; | ||
89 | template<typename Out, typename In> | ||
90 | using SyncReduceTask = typename detail::identity<std::function<Out(In)>>::type; | ||
91 | |||
92 | using ErrorHandler = std::function<void(int, const QString &)>; | ||
93 | using Condition = std::function<bool()>; | ||
94 | |||
95 | namespace Private | ||
96 | { | ||
97 | |||
98 | class ExecutorBase; | ||
99 | typedef QSharedPointer<ExecutorBase> ExecutorBasePtr; | ||
100 | |||
101 | struct KASYNC_EXPORT Execution { | ||
102 | Execution(const ExecutorBasePtr &executor); | ||
103 | ~Execution(); | ||
104 | void setFinished(); | ||
105 | |||
106 | template<typename T> | ||
107 | KAsync::Future<T>* result() const | ||
108 | { | ||
109 | return static_cast<KAsync::Future<T>*>(resultBase); | ||
110 | } | ||
111 | |||
112 | void releaseFuture(); | ||
113 | bool errorWasHandled() const; | ||
114 | |||
115 | ExecutorBasePtr executor; | ||
116 | FutureBase *resultBase; | ||
117 | bool isRunning; | ||
118 | bool isFinished; | ||
119 | |||
120 | ExecutionPtr prevExecution; | ||
121 | |||
122 | #ifndef QT_NO_DEBUG | ||
123 | Tracer *tracer; | ||
124 | #endif | ||
125 | }; | ||
126 | |||
127 | |||
128 | typedef QSharedPointer<Execution> ExecutionPtr; | ||
129 | |||
130 | class KASYNC_EXPORT ExecutorBase | ||
131 | { | ||
132 | template<typename PrevOut, typename Out, typename ... In> | ||
133 | friend class Executor; | ||
134 | |||
135 | template<typename Out, typename ... In> | ||
136 | friend class KAsync::Job; | ||
137 | |||
138 | friend class Execution; | ||
139 | friend class KAsync::Tracer; | ||
140 | |||
141 | public: | ||
142 | virtual ~ExecutorBase(); | ||
143 | virtual ExecutionPtr exec(const ExecutorBasePtr &self) = 0; | ||
144 | |||
145 | protected: | ||
146 | ExecutorBase(const ExecutorBasePtr &parent); | ||
147 | |||
148 | template<typename T> | ||
149 | KAsync::Future<T>* createFuture(const ExecutionPtr &execution) const; | ||
150 | |||
151 | virtual bool hasErrorFunc() const = 0; | ||
152 | virtual bool handleError(const ExecutionPtr &execution) = 0; | ||
153 | |||
154 | ExecutorBasePtr mPrev; | ||
155 | |||
156 | #ifndef QT_NO_DEBUG | ||
157 | QString mExecutorName; | ||
158 | #endif | ||
159 | }; | ||
160 | |||
161 | template<typename PrevOut, typename Out, typename ... In> | ||
162 | class Executor : public ExecutorBase | ||
163 | { | ||
164 | protected: | ||
165 | Executor(ErrorHandler errorFunc, const Private::ExecutorBasePtr &parent) | ||
166 | : ExecutorBase(parent) | ||
167 | , mErrorFunc(errorFunc) | ||
168 | {} | ||
169 | |||
170 | virtual ~Executor() {} | ||
171 | virtual void run(const ExecutionPtr &execution) = 0; | ||
172 | |||
173 | ExecutionPtr exec(const ExecutorBasePtr &self); | ||
174 | bool hasErrorFunc() const { return (bool) mErrorFunc; } | ||
175 | bool handleError(const ExecutionPtr &execution); | ||
176 | |||
177 | std::function<void(int, const QString &)> mErrorFunc; | ||
178 | }; | ||
179 | |||
180 | template<typename Out, typename ... In> | ||
181 | class ThenExecutor: public Executor<typename detail::prevOut<In ...>::type, Out, In ...> | ||
182 | { | ||
183 | public: | ||
184 | ThenExecutor(ThenTask<Out, In ...> then, ErrorHandler errorFunc, const ExecutorBasePtr &parent); | ||
185 | void run(const ExecutionPtr &execution); | ||
186 | private: | ||
187 | ThenTask<Out, In ...> mFunc; | ||
188 | }; | ||
189 | |||
190 | template<typename PrevOut, typename Out, typename In> | ||
191 | class EachExecutor : public Executor<PrevOut, Out, In> | ||
192 | { | ||
193 | public: | ||
194 | EachExecutor(EachTask<Out, In> each, ErrorHandler errorFunc, const ExecutorBasePtr &parent); | ||
195 | void run(const ExecutionPtr &execution); | ||
196 | private: | ||
197 | EachTask<Out, In> mFunc; | ||
198 | QVector<KAsync::FutureWatcher<Out>*> mFutureWatchers; | ||
199 | }; | ||
200 | |||
201 | template<typename Out, typename In> | ||
202 | class ReduceExecutor : public ThenExecutor<Out, In> | ||
203 | { | ||
204 | public: | ||
205 | ReduceExecutor(ReduceTask<Out, In> reduce, ErrorHandler errorFunc, const ExecutorBasePtr &parent); | ||
206 | private: | ||
207 | ReduceTask<Out, In> mFunc; | ||
208 | }; | ||
209 | |||
210 | template<typename Out, typename ... In> | ||
211 | class SyncThenExecutor : public Executor<typename detail::prevOut<In ...>::type, Out, In ...> | ||
212 | { | ||
213 | public: | ||
214 | SyncThenExecutor(SyncThenTask<Out, In ...> then, ErrorHandler errorFunc, const ExecutorBasePtr &parent); | ||
215 | void run(const ExecutionPtr &execution); | ||
216 | |||
217 | private: | ||
218 | void run(const ExecutionPtr &execution, std::false_type); // !std::is_void<Out> | ||
219 | void run(const ExecutionPtr &execution, std::true_type); // std::is_void<Out> | ||
220 | SyncThenTask<Out, In ...> mFunc; | ||
221 | }; | ||
222 | |||
223 | template<typename Out, typename In> | ||
224 | class SyncReduceExecutor : public SyncThenExecutor<Out, In> | ||
225 | { | ||
226 | public: | ||
227 | SyncReduceExecutor(SyncReduceTask<Out, In> reduce, ErrorHandler errorFunc, const ExecutorBasePtr &parent); | ||
228 | private: | ||
229 | SyncReduceTask<Out, In> mFunc; | ||
230 | }; | ||
231 | |||
232 | template<typename PrevOut, typename Out, typename In> | ||
233 | class SyncEachExecutor : public Executor<PrevOut, Out, In> | ||
234 | { | ||
235 | public: | ||
236 | SyncEachExecutor(SyncEachTask<Out, In> each, ErrorHandler errorFunc, const ExecutorBasePtr &parent); | ||
237 | void run(const ExecutionPtr &execution); | ||
238 | private: | ||
239 | void run(KAsync::Future<Out> *future, const typename PrevOut::value_type &arg, std::false_type); // !std::is_void<Out> | ||
240 | void run(KAsync::Future<Out> *future, const typename PrevOut::value_type &arg, std::true_type); // std::is_void<Out> | ||
241 | SyncEachTask<Out, In> mFunc; | ||
242 | }; | ||
243 | |||
244 | } // namespace Private | ||
245 | |||
246 | /** | ||
247 | * Start an asynchronous job sequence. | ||
248 | * | ||
249 | * KAsync::start() is your starting point to build a chain of jobs to be executed | ||
250 | * asynchronously. | ||
251 | * | ||
252 | * @param func An asynchronous function to be executed. The function must have | ||
253 | * void return type, and accept exactly one argument of type @p KAsync::Future<In>, | ||
254 | * where @p In is type of the result. | ||
255 | */ | ||
256 | template<typename Out, typename ... In> | ||
257 | Job<Out, In ...> start(ThenTask<Out, In ...> func, ErrorHandler errorFunc = ErrorHandler()); | ||
258 | |||
259 | template<typename Out, typename ... In> | ||
260 | Job<Out, In ...> start(SyncThenTask<Out, In ...> func, ErrorHandler errorFunc = ErrorHandler()); | ||
261 | |||
262 | #ifdef WITH_KJOB | ||
263 | template<typename ReturnType, typename KJobType, ReturnType (KJobType::*KJobResultMethod)(), typename ... Args> | ||
264 | Job<ReturnType, Args ...> start(); | ||
265 | #endif | ||
266 | |||
267 | /** | ||
268 | * Async while loop. | ||
269 | * | ||
270 | * The loop continues while @param condition returns true. | ||
271 | */ | ||
272 | KASYNC_EXPORT Job<void> dowhile(Condition condition, ThenTask<void> func); | ||
273 | |||
274 | /** | ||
275 | * Async while loop. | ||
276 | * | ||
277 | * Loop continues while body returns true. | ||
278 | */ | ||
279 | KASYNC_EXPORT Job<void> dowhile(ThenTask<bool> body); | ||
280 | |||
281 | /** | ||
282 | * Iterate over a container. | ||
283 | * | ||
284 | * Use in conjunction with .each | ||
285 | */ | ||
286 | template<typename Out> | ||
287 | Job<Out> iterate(const Out &container); | ||
288 | |||
289 | /** | ||
290 | * Async delay. | ||
291 | */ | ||
292 | KASYNC_EXPORT Job<void> wait(int delay); | ||
293 | |||
294 | /** | ||
295 | * A null job. | ||
296 | * | ||
297 | * An async noop. | ||
298 | * | ||
299 | */ | ||
300 | template<typename Out> | ||
301 | Job<Out> null(); | ||
302 | |||
303 | /** | ||
304 | * An error job. | ||
305 | * | ||
306 | * An async error. | ||
307 | * | ||
308 | */ | ||
309 | template<typename Out> | ||
310 | Job<Out> error(int errorCode = 1, const QString &errorMessage = QString()); | ||
311 | |||
312 | class KASYNC_EXPORT JobBase | ||
313 | { | ||
314 | template<typename Out, typename ... In> | ||
315 | friend class Job; | ||
316 | |||
317 | public: | ||
318 | JobBase(const Private::ExecutorBasePtr &executor); | ||
319 | ~JobBase(); | ||
320 | |||
321 | protected: | ||
322 | Private::ExecutorBasePtr mExecutor; | ||
323 | }; | ||
324 | |||
325 | /** | ||
326 | * An Asynchronous job | ||
327 | * | ||
328 | * A single instance of Job represents a single method that will be executed | ||
329 | * asynchrously. The Job is started by @p Job::exec(), which returns @p KAsync::Future | ||
330 | * immediatelly. The Future will be set to finished state once the asynchronous | ||
331 | * task has finished. You can use @p KAsync::Future::waitForFinished() to wait for | ||
332 | * for the Future in blocking manner. | ||
333 | * | ||
334 | * It is possible to chain multiple Jobs one after another in different fashion | ||
335 | * (sequential, parallel, etc.). Calling Job::exec() will then return a pending | ||
336 | * @p KAsync::Future, and will execute the entire chain of jobs. | ||
337 | * | ||
338 | * @code | ||
339 | * auto job = Job::start<QList<int>>( | ||
340 | * [](KAsync::Future<QList<int>> &future) { | ||
341 | * MyREST::PendingUsers *pu = MyREST::requestListOfUsers(); | ||
342 | * QObject::connect(pu, &PendingOperation::finished, | ||
343 | * [&](PendingOperation *pu) { | ||
344 | * future->setValue(dynamic_cast<MyREST::PendingUsers*>(pu)->userIds()); | ||
345 | * future->setFinished(); | ||
346 | * }); | ||
347 | * }) | ||
348 | * .each<QList<MyREST::User>, int>( | ||
349 | * [](const int &userId, KAsync::Future<QList<MyREST::User>> &future) { | ||
350 | * MyREST::PendingUser *pu = MyREST::requestUserDetails(userId); | ||
351 | * QObject::connect(pu, &PendingOperation::finished, | ||
352 | * [&](PendingOperation *pu) { | ||
353 | * future->setValue(Qlist<MyREST::User>() << dynamic_cast<MyREST::PendingUser*>(pu)->user()); | ||
354 | * future->setFinished(); | ||
355 | * }); | ||
356 | * }); | ||
357 | * | ||
358 | * KAsync::Future<QList<MyREST::User>> usersFuture = job.exec(); | ||
359 | * usersFuture.waitForFinished(); | ||
360 | * QList<MyRest::User> users = usersFuture.value(); | ||
361 | * @endcode | ||
362 | * | ||
363 | * In the example above, calling @p job.exec() will first invoke the first job, | ||
364 | * which will retrieve a list of IDs, and then will invoke the second function | ||
365 | * for each single entry in the list returned by the first function. | ||
366 | */ | ||
367 | template<typename Out, typename ... In> | ||
368 | class Job : public JobBase | ||
369 | { | ||
370 | template<typename OutOther, typename ... InOther> | ||
371 | friend class Job; | ||
372 | |||
373 | template<typename OutOther, typename ... InOther> | ||
374 | friend Job<OutOther, InOther ...> start(KAsync::ThenTask<OutOther, InOther ...> func, ErrorHandler errorFunc); | ||
375 | |||
376 | template<typename OutOther, typename ... InOther> | ||
377 | friend Job<OutOther, InOther ...> start(KAsync::SyncThenTask<OutOther, InOther ...> func, ErrorHandler errorFunc); | ||
378 | |||
379 | #ifdef WITH_KJOB | ||
380 | template<typename ReturnType, typename KJobType, ReturnType (KJobType::*KJobResultMethod)(), typename ... Args> | ||
381 | friend Job<ReturnType, Args ...> start(); | ||
382 | #endif | ||
383 | |||
384 | public: | ||
385 | template<typename OutOther, typename ... InOther> | ||
386 | Job<OutOther, InOther ...> then(ThenTask<OutOther, InOther ...> func, ErrorHandler errorFunc = ErrorHandler()) | ||
387 | { | ||
388 | return Job<OutOther, InOther ...>(Private::ExecutorBasePtr( | ||
389 | new Private::ThenExecutor<OutOther, InOther ...>(func, errorFunc, mExecutor))); | ||
390 | } | ||
391 | |||
392 | template<typename OutOther, typename ... InOther> | ||
393 | Job<OutOther, InOther ...> then(SyncThenTask<OutOther, InOther ...> func, ErrorHandler errorFunc = ErrorHandler()) | ||
394 | { | ||
395 | return Job<OutOther, InOther ...>(Private::ExecutorBasePtr( | ||
396 | new Private::SyncThenExecutor<OutOther, InOther ...>(func, errorFunc, mExecutor))); | ||
397 | } | ||
398 | |||
399 | template<typename OutOther, typename ... InOther> | ||
400 | Job<OutOther, InOther ...> then(Job<OutOther, InOther ...> otherJob, ErrorHandler errorFunc = ErrorHandler()) | ||
401 | { | ||
402 | return then<OutOther, InOther ...>(nestedJobWrapper<OutOther, InOther ...>(otherJob), errorFunc); | ||
403 | } | ||
404 | |||
405 | #ifdef WITH_KJOB | ||
406 | template<typename ReturnType, typename KJobType, ReturnType (KJobType::*KJobResultMethod)(), typename ... Args> | ||
407 | Job<ReturnType, Args ...> then() | ||
408 | { | ||
409 | return start<ReturnType, KJobType, KJobResultMethod, Args ...>(); | ||
410 | } | ||
411 | #endif | ||
412 | |||
413 | template<typename OutOther, typename InOther> | ||
414 | Job<OutOther, InOther> each(EachTask<OutOther, InOther> func, ErrorHandler errorFunc = ErrorHandler()) | ||
415 | { | ||
416 | eachInvariants<OutOther>(); | ||
417 | return Job<OutOther, InOther>(Private::ExecutorBasePtr( | ||
418 | new Private::EachExecutor<Out, OutOther, InOther>(func, errorFunc, mExecutor))); | ||
419 | } | ||
420 | |||
421 | template<typename OutOther, typename InOther> | ||
422 | Job<OutOther, InOther> each(SyncEachTask<OutOther, InOther> func, ErrorHandler errorFunc = ErrorHandler()) | ||
423 | { | ||
424 | eachInvariants<OutOther>(); | ||
425 | return Job<OutOther, InOther>(Private::ExecutorBasePtr( | ||
426 | new Private::SyncEachExecutor<Out, OutOther, InOther>(func, errorFunc, mExecutor))); | ||
427 | } | ||
428 | |||
429 | template<typename OutOther, typename InOther> | ||
430 | Job<OutOther, InOther> each(Job<OutOther, InOther> otherJob, ErrorHandler errorFunc = ErrorHandler()) | ||
431 | { | ||
432 | eachInvariants<OutOther>(); | ||
433 | return each<OutOther, InOther>(nestedJobWrapper<OutOther, InOther>(otherJob), errorFunc); | ||
434 | } | ||
435 | |||
436 | template<typename OutOther, typename InOther> | ||
437 | Job<OutOther, InOther> reduce(ReduceTask<OutOther, InOther> func, ErrorHandler errorFunc = ErrorHandler()) | ||
438 | { | ||
439 | reduceInvariants<InOther>(); | ||
440 | return Job<OutOther, InOther>(Private::ExecutorBasePtr( | ||
441 | new Private::ReduceExecutor<OutOther, InOther>(func, errorFunc, mExecutor))); | ||
442 | } | ||
443 | |||
444 | template<typename OutOther, typename InOther> | ||
445 | Job<OutOther, InOther> reduce(SyncReduceTask<OutOther, InOther> func, ErrorHandler errorFunc = ErrorHandler()) | ||
446 | { | ||
447 | reduceInvariants<InOther>(); | ||
448 | return Job<OutOther, InOther>(Private::ExecutorBasePtr( | ||
449 | new Private::SyncReduceExecutor<OutOther, InOther>(func, errorFunc, mExecutor))); | ||
450 | } | ||
451 | |||
452 | template<typename OutOther, typename InOther> | ||
453 | Job<OutOther, InOther> reduce(Job<OutOther, InOther> otherJob, ErrorHandler errorFunc = ErrorHandler()) | ||
454 | { | ||
455 | return reduce<OutOther, InOther>(nestedJobWrapper<OutOther, InOther>(otherJob), errorFunc); | ||
456 | } | ||
457 | |||
458 | template<typename FirstIn> | ||
459 | KAsync::Future<Out> exec(FirstIn in) | ||
460 | { | ||
461 | // Inject a fake sync executor that will return the initial value | ||
462 | Private::ExecutorBasePtr first = mExecutor; | ||
463 | while (first->mPrev) { | ||
464 | first = first->mPrev; | ||
465 | } | ||
466 | auto init = new Private::SyncThenExecutor<FirstIn>( | ||
467 | [in]() -> FirstIn { | ||
468 | return in; | ||
469 | }, | ||
470 | ErrorHandler(), Private::ExecutorBasePtr()); | ||
471 | first->mPrev = Private::ExecutorBasePtr(init); | ||
472 | |||
473 | auto result = exec(); | ||
474 | // Remove the injected executor | ||
475 | first->mPrev.reset(); | ||
476 | return result; | ||
477 | } | ||
478 | |||
479 | KAsync::Future<Out> exec() | ||
480 | { | ||
481 | Private::ExecutionPtr execution = mExecutor->exec(mExecutor); | ||
482 | KAsync::Future<Out> result = *execution->result<Out>(); | ||
483 | |||
484 | return result; | ||
485 | } | ||
486 | |||
487 | private: | ||
488 | Job(Private::ExecutorBasePtr executor) | ||
489 | : JobBase(executor) | ||
490 | {} | ||
491 | |||
492 | template<typename OutOther> | ||
493 | void eachInvariants() | ||
494 | { | ||
495 | static_assert(detail::isIterable<Out>::value, | ||
496 | "The 'Each' task can only be connected to a job that returns a list or an array."); | ||
497 | static_assert(std::is_void<OutOther>::value || detail::isIterable<OutOther>::value, | ||
498 | "The result type of 'Each' task must be void, a list or an array."); | ||
499 | } | ||
500 | |||
501 | template<typename InOther> | ||
502 | void reduceInvariants() | ||
503 | { | ||
504 | static_assert(KAsync::detail::isIterable<Out>::value, | ||
505 | "The 'Result' task can only be connected to a job that returns a list or an array"); | ||
506 | static_assert(std::is_same<typename Out::value_type, typename InOther::value_type>::value, | ||
507 | "The return type of previous task must be compatible with input type of this task"); | ||
508 | } | ||
509 | |||
510 | template<typename OutOther, typename ... InOther> | ||
511 | inline std::function<void(InOther ..., KAsync::Future<OutOther>&)> nestedJobWrapper(Job<OutOther, InOther ...> otherJob) { | ||
512 | return [otherJob](InOther ... in, KAsync::Future<OutOther> &future) { | ||
513 | // copy by value is const | ||
514 | auto job = otherJob; | ||
515 | FutureWatcher<OutOther> *watcher = new FutureWatcher<OutOther>(); | ||
516 | QObject::connect(watcher, &FutureWatcherBase::futureReady, | ||
517 | [watcher, future]() { | ||
518 | // FIXME: We pass future by value, because using reference causes the | ||
519 | // future to get deleted before this lambda is invoked, leading to crash | ||
520 | // in copyFutureValue() | ||
521 | // copy by value is const | ||
522 | auto outFuture = future; | ||
523 | KAsync::detail::copyFutureValue(watcher->future(), outFuture); | ||
524 | if (watcher->future().errorCode()) { | ||
525 | outFuture.setError(watcher->future().errorCode(), watcher->future().errorMessage()); | ||
526 | } else { | ||
527 | outFuture.setFinished(); | ||
528 | } | ||
529 | delete watcher; | ||
530 | }); | ||
531 | watcher->setFuture(job.exec(in ...)); | ||
532 | }; | ||
533 | } | ||
534 | }; | ||
535 | |||
536 | } // namespace KAsync | ||
537 | |||
538 | |||
539 | // ********** Out of line definitions **************** | ||
540 | |||
541 | namespace KAsync { | ||
542 | |||
543 | template<typename Out, typename ... In> | ||
544 | Job<Out, In ...> start(ThenTask<Out, In ...> func, ErrorHandler error) | ||
545 | { | ||
546 | return Job<Out, In...>(Private::ExecutorBasePtr( | ||
547 | new Private::ThenExecutor<Out, In ...>(func, error, Private::ExecutorBasePtr()))); | ||
548 | } | ||
549 | |||
550 | template<typename Out, typename ... In> | ||
551 | Job<Out, In ...> start(SyncThenTask<Out, In ...> func, ErrorHandler error) | ||
552 | { | ||
553 | return Job<Out, In...>(Private::ExecutorBasePtr( | ||
554 | new Private::SyncThenExecutor<Out, In ...>(func, error, Private::ExecutorBasePtr()))); | ||
555 | } | ||
556 | |||
557 | #ifdef WITH_KJOB | ||
558 | template<typename ReturnType, typename KJobType, ReturnType (KJobType::*KJobResultMethod)(), typename ... Args> | ||
559 | Job<ReturnType, Args ...> start() | ||
560 | { | ||
561 | return Job<ReturnType, Args ...>(Private::ExecutorBasePtr( | ||
562 | new Private::ThenExecutor<ReturnType, Args ...>([](const Args & ... args, KAsync::Future<ReturnType> &future) | ||
563 | { | ||
564 | KJobType *job = new KJobType(args ...); | ||
565 | job->connect(job, &KJob::finished, | ||
566 | [&future](KJob *job) { | ||
567 | if (job->error()) { | ||
568 | future.setError(job->error(), job->errorString()); | ||
569 | } else { | ||
570 | future.setValue((static_cast<KJobType*>(job)->*KJobResultMethod)()); | ||
571 | future.setFinished(); | ||
572 | } | ||
573 | }); | ||
574 | job->start(); | ||
575 | }, ErrorHandler(), Private::ExecutorBasePtr()))); | ||
576 | } | ||
577 | #endif | ||
578 | |||
579 | |||
580 | template<typename Out> | ||
581 | Job<Out> null() | ||
582 | { | ||
583 | return KAsync::start<Out>( | ||
584 | [](KAsync::Future<Out> &future) { | ||
585 | future.setFinished(); | ||
586 | }); | ||
587 | } | ||
588 | |||
589 | template<typename Out> | ||
590 | Job<Out> error(int errorCode, const QString &errorMessage) | ||
591 | { | ||
592 | return KAsync::start<Out>( | ||
593 | [errorCode, errorMessage](KAsync::Future<Out> &future) { | ||
594 | future.setError(errorCode, errorMessage); | ||
595 | }); | ||
596 | } | ||
597 | |||
598 | template<typename Out> | ||
599 | Job<Out> iterate(const Out &container) | ||
600 | { | ||
601 | return KAsync::start<Out>( | ||
602 | [container]() { | ||
603 | return container; | ||
604 | }); | ||
605 | } | ||
606 | |||
607 | |||
608 | namespace Private { | ||
609 | |||
610 | template<typename T> | ||
611 | KAsync::Future<T>* ExecutorBase::createFuture(const ExecutionPtr &execution) const | ||
612 | { | ||
613 | return new KAsync::Future<T>(execution); | ||
614 | } | ||
615 | |||
616 | template<typename PrevOut, typename Out, typename ... In> | ||
617 | ExecutionPtr Executor<PrevOut, Out, In ...>::exec(const ExecutorBasePtr &self) | ||
618 | { | ||
619 | // Passing 'self' to execution ensures that the Executor chain remains | ||
620 | // valid until the entire execution is finished | ||
621 | ExecutionPtr execution = ExecutionPtr::create(self); | ||
622 | #ifndef QT_NO_DEBUG | ||
623 | execution->tracer = new Tracer(execution.data()); // owned by execution | ||
624 | #endif | ||
625 | |||
626 | // chainup | ||
627 | execution->prevExecution = mPrev ? mPrev->exec(mPrev) : ExecutionPtr(); | ||
628 | |||
629 | execution->resultBase = ExecutorBase::createFuture<Out>(execution); | ||
630 | auto fw = new KAsync::FutureWatcher<Out>(); | ||
631 | QObject::connect(fw, &KAsync::FutureWatcher<Out>::futureReady, | ||
632 | [fw, execution, this]() { | ||
633 | handleError(execution); | ||
634 | execution->setFinished(); | ||
635 | delete fw; | ||
636 | }); | ||
637 | fw->setFuture(*execution->result<Out>()); | ||
638 | |||
639 | KAsync::Future<PrevOut> *prevFuture = execution->prevExecution ? execution->prevExecution->result<PrevOut>() : nullptr; | ||
640 | if (!prevFuture || prevFuture->isFinished()) { | ||
641 | if (prevFuture) { // prevFuture implies execution->prevExecution | ||
642 | if (prevFuture->errorCode()) { | ||
643 | // Propagate the errorCode and message to the outer Future | ||
644 | execution->resultBase->setError(prevFuture->errorCode(), prevFuture->errorMessage()); | ||
645 | if (!execution->errorWasHandled()) { | ||
646 | if (handleError(execution)) { | ||
647 | return execution; | ||
648 | } | ||
649 | } else { | ||
650 | return execution; | ||
651 | } | ||
652 | } else { | ||
653 | // Propagate error (if any) | ||
654 | } | ||
655 | } | ||
656 | |||
657 | execution->isRunning = true; | ||
658 | run(execution); | ||
659 | } else { | ||
660 | auto prevFutureWatcher = new KAsync::FutureWatcher<PrevOut>(); | ||
661 | QObject::connect(prevFutureWatcher, &KAsync::FutureWatcher<PrevOut>::futureReady, | ||
662 | [prevFutureWatcher, execution, this]() { | ||
663 | auto prevFuture = prevFutureWatcher->future(); | ||
664 | assert(prevFuture.isFinished()); | ||
665 | delete prevFutureWatcher; | ||
666 | auto prevExecutor = execution->executor->mPrev; | ||
667 | if (prevFuture.errorCode()) { | ||
668 | execution->resultBase->setError(prevFuture.errorCode(), prevFuture.errorMessage()); | ||
669 | if (!execution->errorWasHandled()) { | ||
670 | if (handleError(execution)) { | ||
671 | return; | ||
672 | } | ||
673 | } else { | ||
674 | return; | ||
675 | } | ||
676 | } | ||
677 | |||
678 | |||
679 | // propagate error (if any) | ||
680 | execution->isRunning = true; | ||
681 | run(execution); | ||
682 | }); | ||
683 | |||
684 | prevFutureWatcher->setFuture(*static_cast<KAsync::Future<PrevOut>*>(prevFuture)); | ||
685 | } | ||
686 | |||
687 | return execution; | ||
688 | } | ||
689 | |||
690 | template<typename PrevOut, typename Out, typename ... In> | ||
691 | bool Executor<PrevOut, Out, In ...>::handleError(const ExecutionPtr &execution) | ||
692 | { | ||
693 | assert(execution->resultBase->isFinished()); | ||
694 | if (execution->resultBase->errorCode()) { | ||
695 | if (mErrorFunc) { | ||
696 | mErrorFunc(execution->resultBase->errorCode(), | ||
697 | execution->resultBase->errorMessage()); | ||
698 | return true; | ||
699 | } | ||
700 | } | ||
701 | |||
702 | return false; | ||
703 | } | ||
704 | |||
705 | |||
706 | template<typename Out, typename ... In> | ||
707 | ThenExecutor<Out, In ...>::ThenExecutor(ThenTask<Out, In ...> then, ErrorHandler error, const ExecutorBasePtr &parent) | ||
708 | : Executor<typename detail::prevOut<In ...>::type, Out, In ...>(error, parent) | ||
709 | , mFunc(then) | ||
710 | { | ||
711 | STORE_EXECUTOR_NAME("ThenExecutor", Out, In ...); | ||
712 | } | ||
713 | |||
714 | template<typename Out, typename ... In> | ||
715 | void ThenExecutor<Out, In ...>::run(const ExecutionPtr &execution) | ||
716 | { | ||
717 | KAsync::Future<typename detail::prevOut<In ...>::type> *prevFuture = nullptr; | ||
718 | if (execution->prevExecution) { | ||
719 | prevFuture = execution->prevExecution->result<typename detail::prevOut<In ...>::type>(); | ||
720 | assert(prevFuture->isFinished()); | ||
721 | } | ||
722 | |||
723 | ThenExecutor<Out, In ...>::mFunc(prevFuture ? prevFuture->value() : In() ..., *execution->result<Out>()); | ||
724 | } | ||
725 | |||
726 | template<typename PrevOut, typename Out, typename In> | ||
727 | EachExecutor<PrevOut, Out, In>::EachExecutor(EachTask<Out, In> each, ErrorHandler error, const ExecutorBasePtr &parent) | ||
728 | : Executor<PrevOut, Out, In>(error, parent) | ||
729 | , mFunc(each) | ||
730 | { | ||
731 | STORE_EXECUTOR_NAME("EachExecutor", PrevOut, Out, In); | ||
732 | } | ||
733 | |||
734 | template<typename PrevOut, typename Out, typename In> | ||
735 | void EachExecutor<PrevOut, Out, In>::run(const ExecutionPtr &execution) | ||
736 | { | ||
737 | assert(execution->prevExecution); | ||
738 | auto prevFuture = execution->prevExecution->result<PrevOut>(); | ||
739 | assert(prevFuture->isFinished()); | ||
740 | |||
741 | auto out = execution->result<Out>(); | ||
742 | if (prevFuture->value().isEmpty()) { | ||
743 | out->setFinished(); | ||
744 | return; | ||
745 | } | ||
746 | |||
747 | for (auto arg : prevFuture->value()) { | ||
748 | //We have to manually manage the lifetime of these temporary futures | ||
749 | KAsync::Future<Out> *future = new KAsync::Future<Out>(); | ||
750 | EachExecutor<PrevOut, Out, In>::mFunc(arg, *future); | ||
751 | auto fw = new KAsync::FutureWatcher<Out>(); | ||
752 | mFutureWatchers.append(fw); | ||
753 | QObject::connect(fw, &KAsync::FutureWatcher<Out>::futureReady, | ||
754 | [out, fw, this, future]() { | ||
755 | assert(fw->future().isFinished()); | ||
756 | const int index = mFutureWatchers.indexOf(fw); | ||
757 | assert(index > -1); | ||
758 | mFutureWatchers.removeAt(index); | ||
759 | KAsync::detail::aggregateFutureValue<Out>(fw->future(), *out); | ||
760 | if (mFutureWatchers.isEmpty()) { | ||
761 | out->setFinished(); | ||
762 | } | ||
763 | delete fw; | ||
764 | delete future; | ||
765 | }); | ||
766 | fw->setFuture(*future); | ||
767 | } | ||
768 | } | ||
769 | |||
770 | template<typename Out, typename In> | ||
771 | ReduceExecutor<Out, In>::ReduceExecutor(ReduceTask<Out, In> reduce, ErrorHandler errorFunc, const ExecutorBasePtr &parent) | ||
772 | : ThenExecutor<Out, In>(reduce, errorFunc, parent) | ||
773 | { | ||
774 | STORE_EXECUTOR_NAME("ReduceExecutor", Out, In); | ||
775 | } | ||
776 | |||
777 | template<typename Out, typename ... In> | ||
778 | SyncThenExecutor<Out, In ...>::SyncThenExecutor(SyncThenTask<Out, In ...> then, ErrorHandler errorFunc, const ExecutorBasePtr &parent) | ||
779 | : Executor<typename detail::prevOut<In ...>::type, Out, In ...>(errorFunc, parent) | ||
780 | , mFunc(then) | ||
781 | { | ||
782 | STORE_EXECUTOR_NAME("SyncThenExecutor", Out, In ...); | ||
783 | } | ||
784 | |||
785 | template<typename Out, typename ... In> | ||
786 | void SyncThenExecutor<Out, In ...>::run(const ExecutionPtr &execution) | ||
787 | { | ||
788 | if (execution->prevExecution) { | ||
789 | assert(execution->prevExecution->resultBase->isFinished()); | ||
790 | } | ||
791 | |||
792 | run(execution, std::is_void<Out>()); | ||
793 | execution->resultBase->setFinished(); | ||
794 | } | ||
795 | |||
796 | template<typename Out, typename ... In> | ||
797 | void SyncThenExecutor<Out, In ...>::run(const ExecutionPtr &execution, std::false_type) | ||
798 | { | ||
799 | KAsync::Future<typename detail::prevOut<In ...>::type> *prevFuture = | ||
800 | execution->prevExecution | ||
801 | ? execution->prevExecution->result<typename detail::prevOut<In ...>::type>() | ||
802 | : nullptr; | ||
803 | (void) prevFuture; // silence 'set but not used' warning | ||
804 | KAsync::Future<Out> *future = execution->result<Out>(); | ||
805 | future->setValue(SyncThenExecutor<Out, In...>::mFunc(prevFuture ? prevFuture->value() : In() ...)); | ||
806 | } | ||
807 | |||
808 | template<typename Out, typename ... In> | ||
809 | void SyncThenExecutor<Out, In ...>::run(const ExecutionPtr &execution, std::true_type) | ||
810 | { | ||
811 | KAsync::Future<typename detail::prevOut<In ...>::type> *prevFuture = | ||
812 | execution->prevExecution | ||
813 | ? execution->prevExecution->result<typename detail::prevOut<In ...>::type>() | ||
814 | : nullptr; | ||
815 | (void) prevFuture; // silence 'set but not used' warning | ||
816 | SyncThenExecutor<Out, In ...>::mFunc(prevFuture ? prevFuture->value() : In() ...); | ||
817 | } | ||
818 | |||
819 | template<typename PrevOut, typename Out, typename In> | ||
820 | SyncEachExecutor<PrevOut, Out, In>::SyncEachExecutor(SyncEachTask<Out, In> each, ErrorHandler errorFunc, const ExecutorBasePtr &parent) | ||
821 | : Executor<PrevOut, Out, In>(errorFunc, parent) | ||
822 | , mFunc(each) | ||
823 | { | ||
824 | STORE_EXECUTOR_NAME("SyncEachExecutor", PrevOut, Out, In); | ||
825 | } | ||
826 | |||
827 | template<typename PrevOut, typename Out, typename In> | ||
828 | void SyncEachExecutor<PrevOut, Out, In>::run(const ExecutionPtr &execution) | ||
829 | { | ||
830 | assert(execution->prevExecution); | ||
831 | auto *prevFuture = execution->prevExecution->result<PrevOut>(); | ||
832 | assert(prevFuture->isFinished()); | ||
833 | |||
834 | auto out = execution->result<Out>(); | ||
835 | if (prevFuture->value().isEmpty()) { | ||
836 | out->setFinished(); | ||
837 | return; | ||
838 | } | ||
839 | |||
840 | for (auto arg : prevFuture->value()) { | ||
841 | run(out, arg, std::is_void<Out>()); | ||
842 | } | ||
843 | out->setFinished(); | ||
844 | } | ||
845 | |||
846 | template<typename PrevOut, typename Out, typename In> | ||
847 | void SyncEachExecutor<PrevOut, Out, In>::run(KAsync::Future<Out> *out, const typename PrevOut::value_type &arg, std::false_type) | ||
848 | { | ||
849 | out->setValue(out->value() + SyncEachExecutor<PrevOut, Out, In>::mFunc(arg)); | ||
850 | } | ||
851 | |||
852 | template<typename PrevOut, typename Out, typename In> | ||
853 | void SyncEachExecutor<PrevOut, Out, In>::run(KAsync::Future<Out> * /* unused */, const typename PrevOut::value_type &arg, std::true_type) | ||
854 | { | ||
855 | SyncEachExecutor<PrevOut, Out, In>::mFunc(arg); | ||
856 | } | ||
857 | |||
858 | template<typename Out, typename In> | ||
859 | SyncReduceExecutor<Out, In>::SyncReduceExecutor(SyncReduceTask<Out, In> reduce, ErrorHandler errorFunc, const ExecutorBasePtr &parent) | ||
860 | : SyncThenExecutor<Out, In>(reduce, errorFunc, parent) | ||
861 | { | ||
862 | STORE_EXECUTOR_NAME("SyncReduceExecutor", Out, In); | ||
863 | } | ||
864 | |||
865 | |||
866 | } // namespace Private | ||
867 | |||
868 | } // namespace KAsync | ||
869 | |||
870 | |||
871 | |||
872 | #endif // KASYNC_H | ||
873 | |||
874 | |||