diff options
Diffstat (limited to 'async/src/async.h')
-rw-r--r-- | async/src/async.h | 337 |
1 files changed, 337 insertions, 0 deletions
diff --git a/async/src/async.h b/async/src/async.h new file mode 100644 index 0000000..0e4f246 --- /dev/null +++ b/async/src/async.h | |||
@@ -0,0 +1,337 @@ | |||
1 | /* | ||
2 | * Copyright 2014 Daniel Vrátil <dvratil@redhat.com> | ||
3 | * | ||
4 | * This program is free software; you can redistribute it and/or | ||
5 | * modify it under the terms of the GNU General Public License as | ||
6 | * published by the Free Software Foundation; either version 2 of | ||
7 | * the License or (at your option) version 3 or any later version | ||
8 | * accepted by the membership of KDE e.V. (or its successor approved | ||
9 | * by the membership of KDE e.V.), which shall act as a proxy | ||
10 | * defined in Section 14 of version 3 of the license. | ||
11 | * | ||
12 | * This program is distributed in the hope that it will be useful, | ||
13 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
14 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
15 | * GNU General Public License for more details. | ||
16 | * | ||
17 | * You should have received a copy of the GNU General Public License | ||
18 | * along with this program. If not, see <http://www.gnu.org/licenses/>. | ||
19 | * | ||
20 | */ | ||
21 | |||
22 | #ifndef ASYNC_H | ||
23 | #define ASYNC_H | ||
24 | |||
25 | #include <functional> | ||
26 | #include <list> | ||
27 | #include <type_traits> | ||
28 | #include <iostream> | ||
29 | #include <cassert> | ||
30 | #include <iterator> | ||
31 | #include <boost/graph/graph_concepts.hpp> | ||
32 | |||
33 | #include "future.h" | ||
34 | #include "async_impl.h" | ||
35 | |||
36 | |||
37 | namespace Async { | ||
38 | |||
39 | template<typename PrevOut, typename Out, typename ... In> | ||
40 | class Executor; | ||
41 | |||
42 | class JobBase; | ||
43 | |||
44 | template<typename Out, typename ... In> | ||
45 | class Job; | ||
46 | |||
47 | template<typename Out, typename ... In> | ||
48 | using ThenTask = typename detail::identity<std::function<void(In ..., Async::Future<Out>&)>>::type; | ||
49 | template<typename Out, typename In> | ||
50 | using EachTask = typename detail::identity<std::function<void(In, Async::Future<Out>&)>>::type; | ||
51 | template<typename Out, typename In> | ||
52 | using ReduceTask = typename detail::identity<std::function<void(In, Async::Future<Out>&)>>::type; | ||
53 | |||
54 | namespace Private | ||
55 | { | ||
56 | |||
57 | template<typename ... T> | ||
58 | struct PreviousOut { | ||
59 | using type = typename std::tuple_element<0, std::tuple<T ..., void>>::type; | ||
60 | }; | ||
61 | |||
62 | class ExecutorBase | ||
63 | { | ||
64 | template<typename PrevOut, typename Out, typename ... In> | ||
65 | friend class Executor; | ||
66 | |||
67 | public: | ||
68 | virtual ~ExecutorBase(); | ||
69 | virtual void exec() = 0; | ||
70 | |||
71 | inline FutureBase* result() const | ||
72 | { | ||
73 | return mResult; | ||
74 | } | ||
75 | |||
76 | protected: | ||
77 | ExecutorBase(ExecutorBase *parent); | ||
78 | |||
79 | ExecutorBase *mPrev; | ||
80 | FutureBase *mResult; | ||
81 | }; | ||
82 | |||
83 | template<typename PrevOut, typename Out, typename ... In> | ||
84 | class Executor : public ExecutorBase | ||
85 | { | ||
86 | protected: | ||
87 | Executor(ExecutorBase *parent) | ||
88 | : ExecutorBase(parent) | ||
89 | {} | ||
90 | virtual ~Executor() {} | ||
91 | inline Async::Future<PrevOut>* chainup(); | ||
92 | |||
93 | std::function<void(const In& ..., Async::Future<Out> &)> mFunc; | ||
94 | }; | ||
95 | |||
96 | template<typename Out, typename ... In> | ||
97 | class ThenExecutor: public Executor<typename PreviousOut<In ...>::type, Out, In ...> | ||
98 | { | ||
99 | public: | ||
100 | ThenExecutor(ThenTask<Out, In ...> then, ExecutorBase *parent = nullptr); | ||
101 | void exec(); | ||
102 | }; | ||
103 | |||
104 | template<typename PrevOut, typename Out, typename In> | ||
105 | class EachExecutor : public Executor<PrevOut, Out, In> | ||
106 | { | ||
107 | public: | ||
108 | EachExecutor(EachTask<Out, In> each, ExecutorBase *parent); | ||
109 | void exec(); | ||
110 | }; | ||
111 | |||
112 | template<typename Out, typename In> | ||
113 | class ReduceExecutor : public Executor<In, Out, In> | ||
114 | { | ||
115 | public: | ||
116 | ReduceExecutor(ReduceTask<Out, In> reduce, ExecutorBase *parent); | ||
117 | void exec(); | ||
118 | }; | ||
119 | |||
120 | } // namespace Private | ||
121 | |||
122 | /** | ||
123 | * Start an asynchronous job sequence. | ||
124 | * | ||
125 | * Async::start() is your starting point to build a chain of jobs to be executed | ||
126 | * asynchronously. | ||
127 | * | ||
128 | * @param func An asynchronous function to be executed. The function must have | ||
129 | * void return type, and accept exactly one argument of type @p Async::Future<In>, | ||
130 | * where @p In is type of the result. | ||
131 | */ | ||
132 | template<typename Out> | ||
133 | Job<Out> start(ThenTask<Out> func); | ||
134 | |||
135 | class JobBase | ||
136 | { | ||
137 | template<typename Out, typename ... In> | ||
138 | friend class Job; | ||
139 | |||
140 | public: | ||
141 | JobBase(Private::ExecutorBase *executor); | ||
142 | ~JobBase(); | ||
143 | |||
144 | void exec(); | ||
145 | |||
146 | protected: | ||
147 | Private::ExecutorBase *mExecutor; | ||
148 | }; | ||
149 | |||
150 | /** | ||
151 | * An Asynchronous job | ||
152 | * | ||
153 | * A single instance of Job represents a single method that will be executed | ||
154 | * asynchrously. The Job is started by @p Job::exec(), which returns @p Async::Future | ||
155 | * immediatelly. The Future will be set to finished state once the asynchronous | ||
156 | * task has finished. You can use @p Async::Future::waitForFinished() to wait for | ||
157 | * for the Future in blocking manner. | ||
158 | * | ||
159 | * It is possible to chain multiple Jobs one after another in different fashion | ||
160 | * (sequential, parallel, etc.). Calling Job::exec() will then return a pending | ||
161 | * @p Async::Future, and will execute the entire chain of jobs. | ||
162 | * | ||
163 | * @code | ||
164 | * auto job = Job::start<QList<int>>( | ||
165 | * [](Async::Future<QList<int>> &future) { | ||
166 | * MyREST::PendingUsers *pu = MyREST::requestListOfUsers(); | ||
167 | * QObject::connect(pu, &PendingOperation::finished, | ||
168 | * [&](PendingOperation *pu) { | ||
169 | * future->setValue(dynamic_cast<MyREST::PendingUsers*>(pu)->userIds()); | ||
170 | * future->setFinished(); | ||
171 | * }); | ||
172 | * }) | ||
173 | * .each<QList<MyREST::User>, int>( | ||
174 | * [](const int &userId, Async::Future<QList<MyREST::User>> &future) { | ||
175 | * MyREST::PendingUser *pu = MyREST::requestUserDetails(userId); | ||
176 | * QObject::connect(pu, &PendingOperation::finished, | ||
177 | * [&](PendingOperation *pu) { | ||
178 | * future->setValue(Qlist<MyREST::User>() << dynamic_cast<MyREST::PendingUser*>(pu)->user()); | ||
179 | * future->setFinished(); | ||
180 | * }); | ||
181 | * }); | ||
182 | * | ||
183 | * Async::Future<QList<MyREST::User>> usersFuture = job.exec(); | ||
184 | * usersFuture.waitForFinished(); | ||
185 | * QList<MyRest::User> users = usersFuture.value(); | ||
186 | * @endcode | ||
187 | * | ||
188 | * In the example above, calling @p job.exec() will first invoke the first job, | ||
189 | * which will retrieve a list of IDs, and then will invoke the second function | ||
190 | * for each single entry in the list returned by the first function. | ||
191 | */ | ||
192 | template<typename Out, typename ... In> | ||
193 | class Job : public JobBase | ||
194 | { | ||
195 | template<typename OutOther, typename ... InOther> | ||
196 | friend class Job; | ||
197 | |||
198 | template<typename OutOther> | ||
199 | friend Job<OutOther> start(Async::ThenTask<OutOther> func); | ||
200 | |||
201 | public: | ||
202 | template<typename OutOther, typename ... InOther> | ||
203 | Job<OutOther, InOther ...> then(ThenTask<OutOther, InOther ...> func) | ||
204 | { | ||
205 | return Job<OutOther, InOther ...>(new Private::ThenExecutor<OutOther, InOther ...>(func, mExecutor)); | ||
206 | } | ||
207 | |||
208 | template<typename OutOther, typename InOther> | ||
209 | Job<OutOther, InOther> each(EachTask<OutOther, InOther> func) | ||
210 | { | ||
211 | static_assert(detail::isIterable<Out>::value, | ||
212 | "The 'Each' task can only be connected to a job that returns a list or an array."); | ||
213 | static_assert(detail::isIterable<OutOther>::value, | ||
214 | "The result type of 'Each' task must be a list or an array."); | ||
215 | return Job<OutOther, InOther>(new Private::EachExecutor<Out, OutOther, InOther>(func, mExecutor)); | ||
216 | } | ||
217 | |||
218 | template<typename OutOther, typename InOther> | ||
219 | Job<OutOther, InOther> reduce(ReduceTask<OutOther, InOther> func) | ||
220 | { | ||
221 | static_assert(Async::detail::isIterable<Out>::value, | ||
222 | "The 'Result' task can only be connected to a job that returns a list or an array"); | ||
223 | static_assert(std::is_same<typename Out::value_type, typename InOther::value_type>::value, | ||
224 | "The return type of previous task must be compatible with input type of this task"); | ||
225 | return Job<OutOther, InOther>(new Private::ReduceExecutor<OutOther, InOther>(func, mExecutor)); | ||
226 | } | ||
227 | |||
228 | Async::Future<Out> result() const | ||
229 | { | ||
230 | return *static_cast<Async::Future<Out>*>(mExecutor->result()); | ||
231 | } | ||
232 | |||
233 | private: | ||
234 | Job(Private::ExecutorBase *executor) | ||
235 | : JobBase(executor) | ||
236 | { | ||
237 | } | ||
238 | }; | ||
239 | |||
240 | } // namespace Async | ||
241 | |||
242 | |||
243 | // ********** Out of line definitions **************** | ||
244 | |||
245 | namespace Async { | ||
246 | |||
247 | template<typename Out> | ||
248 | Job<Out> start(ThenTask<Out> func) | ||
249 | { | ||
250 | return Job<Out>(new Private::ThenExecutor<Out>(func)); | ||
251 | } | ||
252 | |||
253 | namespace Private { | ||
254 | |||
255 | template<typename PrevOut, typename Out, typename ... In> | ||
256 | Future<PrevOut>* Executor<PrevOut, Out, In ...>::chainup() | ||
257 | { | ||
258 | if (mPrev) { | ||
259 | mPrev->exec(); | ||
260 | auto future = static_cast<Async::Future<PrevOut>*>(mPrev->result()); | ||
261 | assert(future->isFinished()); | ||
262 | return future; | ||
263 | } else { | ||
264 | return 0; | ||
265 | } | ||
266 | } | ||
267 | |||
268 | template<typename Out, typename ... In> | ||
269 | ThenExecutor<Out, In ...>::ThenExecutor(ThenTask<Out, In ...> then, ExecutorBase* parent) | ||
270 | : Executor<typename PreviousOut<In ...>::type, Out, In ...>(parent) | ||
271 | { | ||
272 | this->mFunc = then; | ||
273 | } | ||
274 | |||
275 | template<typename Out, typename ... In> | ||
276 | void ThenExecutor<Out, In ...>::exec() | ||
277 | { | ||
278 | auto in = this->chainup(); | ||
279 | (void)in; // supress 'unused variable' warning when In is void | ||
280 | |||
281 | auto out = new Async::Future<Out>(); | ||
282 | this->mFunc(in ? in->value() : In() ..., *out); | ||
283 | out->waitForFinished(); | ||
284 | this->mResult = out; | ||
285 | } | ||
286 | |||
287 | template<typename PrevOut, typename Out, typename In> | ||
288 | EachExecutor<PrevOut, Out, In>::EachExecutor(EachTask<Out, In> each, ExecutorBase* parent) | ||
289 | : Executor<PrevOut, Out, In>(parent) | ||
290 | { | ||
291 | this->mFunc = each; | ||
292 | } | ||
293 | |||
294 | template<typename PrevOut, typename Out, typename In> | ||
295 | void EachExecutor<PrevOut, Out, In>::exec() | ||
296 | { | ||
297 | auto in = this->chainup(); | ||
298 | |||
299 | auto *out = new Async::Future<Out>(); | ||
300 | for (auto arg : in->value()) { | ||
301 | Async::Future<Out> future; | ||
302 | this->mFunc(arg, future); | ||
303 | future.waitForFinished(); | ||
304 | out->setValue(out->value() + future.value()); | ||
305 | } | ||
306 | out->setFinished(); | ||
307 | |||
308 | this->mResult = out; | ||
309 | } | ||
310 | |||
311 | template<typename Out, typename In> | ||
312 | ReduceExecutor<Out, In>::ReduceExecutor(ReduceTask<Out, In> reduce, ExecutorBase* parent) | ||
313 | : Executor<In, Out, In>(parent) | ||
314 | { | ||
315 | this->mFunc = reduce; | ||
316 | } | ||
317 | |||
318 | template<typename Out, typename In> | ||
319 | void ReduceExecutor<Out, In>::exec() | ||
320 | { | ||
321 | auto in = this->chainup(); | ||
322 | |||
323 | auto out = new Async::Future<Out>(); | ||
324 | this->mFunc(in->value(), *out); | ||
325 | out->waitForFinished(); | ||
326 | this->mResult = out; | ||
327 | } | ||
328 | |||
329 | } // namespace Private | ||
330 | |||
331 | } // namespace Async | ||
332 | |||
333 | |||
334 | |||
335 | #endif // ASYNC_H | ||
336 | |||
337 | |||