diff options
author | Dan Vrátil <dvratil@redhat.com> | 2015-02-08 12:02:04 +0100 |
---|---|---|
committer | Dan Vrátil <dvratil@redhat.com> | 2015-02-09 14:33:45 +0100 |
commit | adc6a443776820b5ae36c982baf92b1d29bbaa4b (patch) | |
tree | ed278ffbcd8fc8c3759fbcc4afd4240fc1a72fc3 /async | |
parent | cbb192ffe865ffb3eed4c940177ffecaecfa570f (diff) | |
download | sink-adc6a443776820b5ae36c982baf92b1d29bbaa4b.tar.gz sink-adc6a443776820b5ae36c982baf92b1d29bbaa4b.zip |
Async: introduce sync executors
Sync executors don't pass Async::Future into the user-provided tasks, but
instead work with return values of the task methods, wrapping them into the
Async::Future internally. Sync tasks are of course possible since forever, but
not the API for those tasks is much cleaner, for users don't have to deal with
"future" in synchronous tasks, for instance when synchronously processing results
of an async task before passing the data to another async task.
Diffstat (limited to 'async')
-rw-r--r-- | async/autotests/asynctest.cpp | 138 | ||||
-rw-r--r-- | async/src/async.h | 170 |
2 files changed, 281 insertions, 27 deletions
diff --git a/async/autotests/asynctest.cpp b/async/autotests/asynctest.cpp index 9240d28..4ebe65e 100644 --- a/async/autotests/asynctest.cpp +++ b/async/autotests/asynctest.cpp | |||
@@ -42,9 +42,14 @@ private Q_SLOTS: | |||
42 | void testAsyncPromises(); | 42 | void testAsyncPromises(); |
43 | void testAsyncPromises2(); | 43 | void testAsyncPromises2(); |
44 | void testNestedAsync(); | 44 | void testNestedAsync(); |
45 | void testAsyncThen(); | ||
46 | void testSyncThen(); | ||
47 | void testAsyncEach(); | ||
45 | void testSyncEach(); | 48 | void testSyncEach(); |
49 | void testAsyncReduce(); | ||
46 | void testSyncReduce(); | 50 | void testSyncReduce(); |
47 | void testErrorHandler(); | 51 | void testErrorHandler(); |
52 | |||
48 | }; | 53 | }; |
49 | 54 | ||
50 | void AsyncTest::testSyncPromises() | 55 | void AsyncTest::testSyncPromises() |
@@ -155,17 +160,92 @@ void AsyncTest::testNestedAsync() | |||
155 | QTRY_VERIFY(done); | 160 | QTRY_VERIFY(done); |
156 | } | 161 | } |
157 | 162 | ||
158 | void AsyncTest::testSyncEach() | 163 | void AsyncTest::testAsyncThen() |
164 | { | ||
165 | auto job = Async::start<int>( | ||
166 | [](Async::Future<int> &future) { | ||
167 | QTimer *timer = new QTimer; | ||
168 | QObject::connect(timer, &QTimer::timeout, | ||
169 | [&]() { | ||
170 | future.setValue(42); | ||
171 | future.setFinished(); | ||
172 | }); | ||
173 | QObject::connect(timer, &QTimer::timeout, | ||
174 | timer, &QObject::deleteLater); | ||
175 | timer->setSingleShot(true); | ||
176 | timer->start(0); | ||
177 | }); | ||
178 | |||
179 | auto future = job.exec(); | ||
180 | future.waitForFinished(); | ||
181 | |||
182 | QVERIFY(future.isFinished()); | ||
183 | QCOMPARE(future.value(), 42); | ||
184 | } | ||
185 | |||
186 | |||
187 | void AsyncTest::testSyncThen() | ||
188 | { | ||
189 | auto job = Async::start<int>( | ||
190 | []() -> int { | ||
191 | return 42; | ||
192 | }).then<int, int>( | ||
193 | [](int in) -> int { | ||
194 | return in * 2; | ||
195 | }); | ||
196 | |||
197 | auto future = job.exec(); | ||
198 | QVERIFY(future.isFinished()); | ||
199 | QCOMPARE(future.value(), 84); | ||
200 | } | ||
201 | |||
202 | void AsyncTest::testAsyncEach() | ||
159 | { | 203 | { |
160 | auto job = Async::start<QList<int>>( | 204 | auto job = Async::start<QList<int>>( |
161 | [](Async::Future<QList<int>> &future) { | 205 | [](Async::Future<QList<int>> &future) { |
162 | future.setValue(QList<int>{ 1, 2, 3, 4 }); | 206 | QTimer *timer = new QTimer; |
163 | future.setFinished(); | 207 | QObject::connect(timer, &QTimer::timeout, |
208 | [&future]() { | ||
209 | future.setValue({ 1, 2, 3, 4 }); | ||
210 | future.setFinished(); | ||
211 | }); | ||
212 | QObject::connect(timer, &QTimer::timeout, | ||
213 | timer, &QObject::deleteLater); | ||
214 | timer->setSingleShot(true); | ||
215 | timer->start(0); | ||
164 | }) | 216 | }) |
165 | .each<QList<int>, int>( | 217 | .each<QList<int>, int>( |
166 | [](const int &v, Async::Future<QList<int>> &future) { | 218 | [](const int &v, Async::Future<QList<int>> &future) { |
167 | future.setValue(QList<int>{ v + 1 }); | 219 | QTimer *timer = new QTimer; |
168 | future.setFinished(); | 220 | QObject::connect(timer, &QTimer::timeout, |
221 | [v, &future]() { | ||
222 | future.setValue({ v + 1 }); | ||
223 | future.setFinished(); | ||
224 | }); | ||
225 | QObject::connect(timer, &QTimer::timeout, | ||
226 | timer, &QObject::deleteLater); | ||
227 | timer->setSingleShot(true); | ||
228 | timer->start(0); | ||
229 | }); | ||
230 | |||
231 | auto future = job.exec(); | ||
232 | future.waitForFinished(); | ||
233 | |||
234 | const QList<int> expected({ 2, 3, 4, 5 }); | ||
235 | QVERIFY(future.isFinished()); | ||
236 | QCOMPARE(future.value(), expected); | ||
237 | } | ||
238 | |||
239 | |||
240 | void AsyncTest::testSyncEach() | ||
241 | { | ||
242 | auto job = Async::start<QList<int>>( | ||
243 | []() -> QList<int> { | ||
244 | return { 1, 2, 3, 4 }; | ||
245 | }) | ||
246 | .each<QList<int>, int>( | ||
247 | [](const int &v) -> QList<int> { | ||
248 | return { v + 1 }; | ||
169 | }); | 249 | }); |
170 | 250 | ||
171 | Async::Future<QList<int>> future = job.exec(); | 251 | Async::Future<QList<int>> future = job.exec(); |
@@ -175,19 +255,55 @@ void AsyncTest::testSyncEach() | |||
175 | QCOMPARE(future.value(), expected); | 255 | QCOMPARE(future.value(), expected); |
176 | } | 256 | } |
177 | 257 | ||
178 | void AsyncTest::testSyncReduce() | 258 | void AsyncTest::testAsyncReduce() |
179 | { | 259 | { |
180 | auto job = Async::start<QList<int>>( | 260 | auto job = Async::start<QList<int>>( |
181 | [](Async::Future<QList<int>> &future) { | 261 | [](Async::Future<QList<int>> &future) { |
182 | future.setValue(QList<int>{ 1, 2, 3, 4 }); | 262 | QTimer *timer = new QTimer(); |
183 | future.setFinished(); | 263 | QObject::connect(timer, &QTimer::timeout, |
264 | [&future]() { | ||
265 | future.setValue({ 1, 2, 3, 4 }); | ||
266 | future.setFinished(); | ||
267 | }); | ||
268 | QObject::connect(timer, &QTimer::timeout, | ||
269 | timer, &QObject::deleteLater); | ||
270 | timer->setSingleShot(true); | ||
271 | timer->start(0); | ||
184 | }) | 272 | }) |
185 | .reduce<int, QList<int>>( | 273 | .reduce<int, QList<int>>( |
186 | [](const QList<int> &list, Async::Future<int> &future) { | 274 | [](const QList<int> &list, Async::Future<int> &future) { |
275 | QTimer *timer = new QTimer(); | ||
276 | QObject::connect(timer, &QTimer::timeout, | ||
277 | [list, &future]() { | ||
278 | int sum = 0; | ||
279 | for (int i : list) sum += i; | ||
280 | future.setValue(sum); | ||
281 | future.setFinished(); | ||
282 | }); | ||
283 | QObject::connect(timer, &QTimer::timeout, | ||
284 | timer, &QObject::deleteLater); | ||
285 | timer->setSingleShot(true); | ||
286 | timer->start(0); | ||
287 | }); | ||
288 | |||
289 | Async::Future<int> future = job.exec(); | ||
290 | future.waitForFinished(); | ||
291 | |||
292 | QVERIFY(future.isFinished()); | ||
293 | QCOMPARE(future.value(), 10); | ||
294 | } | ||
295 | |||
296 | void AsyncTest::testSyncReduce() | ||
297 | { | ||
298 | auto job = Async::start<QList<int>>( | ||
299 | []() -> QList<int> { | ||
300 | return { 1, 2, 3, 4 }; | ||
301 | }) | ||
302 | .reduce<int, QList<int>>( | ||
303 | [](const QList<int> &list) -> int { | ||
187 | int sum = 0; | 304 | int sum = 0; |
188 | for (int i : list) sum += i; | 305 | for (int i : list) sum += i; |
189 | future.setValue(sum); | 306 | return sum; |
190 | future.setFinished(); | ||
191 | }); | 307 | }); |
192 | 308 | ||
193 | Async::Future<int> future = job.exec(); | 309 | Async::Future<int> future = job.exec(); |
@@ -213,7 +329,7 @@ void AsyncTest::testErrorHandler() | |||
213 | ); | 329 | ); |
214 | auto future = job.exec(); | 330 | auto future = job.exec(); |
215 | future.waitForFinished(); | 331 | future.waitForFinished(); |
216 | QVERIFY(error == 1); | 332 | QCOMPARE(error, 1); |
217 | QVERIFY(future.isFinished()); | 333 | QVERIFY(future.isFinished()); |
218 | } | 334 | } |
219 | 335 | ||
diff --git a/async/src/async.h b/async/src/async.h index d15373b..336bae2 100644 --- a/async/src/async.h +++ b/async/src/async.h | |||
@@ -33,8 +33,6 @@ | |||
33 | 33 | ||
34 | 34 | ||
35 | /* | 35 | /* |
36 | * TODO: on .then and potentially others: support for ThenTask without future argument and return value which makes it implicitly a sync continuation. | ||
37 | * Useful for typical value consumer continuations. | ||
38 | * TODO: error continuation on .then and others. | 36 | * TODO: error continuation on .then and others. |
39 | * TODO: instead of passing the future objects callbacks could be provided for result reporting (we can still use the future object internally | 37 | * TODO: instead of passing the future objects callbacks could be provided for result reporting (we can still use the future object internally |
40 | */ | 38 | */ |
@@ -47,13 +45,19 @@ class JobBase; | |||
47 | 45 | ||
48 | template<typename Out, typename ... In> | 46 | template<typename Out, typename ... In> |
49 | class Job; | 47 | class Job; |
50 | |||
51 | template<typename Out, typename ... In> | 48 | template<typename Out, typename ... In> |
52 | using ThenTask = typename detail::identity<std::function<void(In ..., Async::Future<Out>&)>>::type; | 49 | using ThenTask = typename detail::identity<std::function<void(In ..., Async::Future<Out>&)>>::type; |
50 | template<typename Out, typename ... In> | ||
51 | using SyncThenTask = typename detail::identity<std::function<Out(In ...)>>::type; | ||
53 | template<typename Out, typename In> | 52 | template<typename Out, typename In> |
54 | using EachTask = typename detail::identity<std::function<void(In, Async::Future<Out>&)>>::type; | 53 | using EachTask = typename detail::identity<std::function<void(In, Async::Future<Out>&)>>::type; |
55 | template<typename Out, typename In> | 54 | template<typename Out, typename In> |
55 | using SyncEachTask = typename detail::identity<std::function<Out(In)>>::type; | ||
56 | template<typename Out, typename In> | ||
56 | using ReduceTask = typename detail::identity<std::function<void(In, Async::Future<Out>&)>>::type; | 57 | using ReduceTask = typename detail::identity<std::function<void(In, Async::Future<Out>&)>>::type; |
58 | template<typename Out, typename In> | ||
59 | using SyncReduceTask = typename detail::identity<std::function<Out(In)>>::type; | ||
60 | |||
57 | using ErrorHandler = std::function<void(int, const QString &)>; | 61 | using ErrorHandler = std::function<void(int, const QString &)>; |
58 | 62 | ||
59 | namespace Private | 63 | namespace Private |
@@ -134,6 +138,33 @@ public: | |||
134 | ReduceExecutor(ReduceTask<Out, In> reduce, const ExecutorBasePtr &parent); | 138 | ReduceExecutor(ReduceTask<Out, In> reduce, const ExecutorBasePtr &parent); |
135 | }; | 139 | }; |
136 | 140 | ||
141 | template<typename Out, typename ... In> | ||
142 | class SyncThenExecutor : public Executor<typename PreviousOut<In ...>::type, Out, In ...> | ||
143 | { | ||
144 | public: | ||
145 | SyncThenExecutor(SyncThenTask<Out, In ...> then, ErrorHandler errorHandler = ErrorHandler(), const ExecutorBasePtr &parent = ExecutorBasePtr()); | ||
146 | void previousFutureReady(); | ||
147 | protected: | ||
148 | SyncThenTask<Out, In ...> mFunc; | ||
149 | }; | ||
150 | |||
151 | template<typename Out, typename In> | ||
152 | class SyncReduceExecutor : public SyncThenExecutor<Out, In> | ||
153 | { | ||
154 | public: | ||
155 | SyncReduceExecutor(SyncReduceTask<Out, In> reduce, const ExecutorBasePtr &parent = ExecutorBasePtr()); | ||
156 | }; | ||
157 | |||
158 | template<typename PrevOut, typename Out, typename In> | ||
159 | class SyncEachExecutor : public Executor<PrevOut, Out, In> | ||
160 | { | ||
161 | public: | ||
162 | SyncEachExecutor(SyncEachTask<Out, In> each, const ExecutorBasePtr &parent = ExecutorBasePtr()); | ||
163 | void previousFutureReady(); | ||
164 | protected: | ||
165 | SyncEachTask<Out, In> mFunc; | ||
166 | }; | ||
167 | |||
137 | } // namespace Private | 168 | } // namespace Private |
138 | 169 | ||
139 | /** | 170 | /** |
@@ -238,6 +269,9 @@ class Job : public JobBase | |||
238 | template<typename OutOther> | 269 | template<typename OutOther> |
239 | friend Job<OutOther> start(Async::ThenTask<OutOther> func); | 270 | friend Job<OutOther> start(Async::ThenTask<OutOther> func); |
240 | 271 | ||
272 | template<typename OutOther> | ||
273 | friend Job<OutOther> start(Async::SyncThenTask<OutOther> func); | ||
274 | |||
241 | public: | 275 | public: |
242 | template<typename OutOther, typename ... InOther> | 276 | template<typename OutOther, typename ... InOther> |
243 | Job<OutOther, InOther ...> then(ThenTask<OutOther, InOther ...> func, ErrorHandler errorFunc = ErrorHandler()) | 277 | Job<OutOther, InOther ...> then(ThenTask<OutOther, InOther ...> func, ErrorHandler errorFunc = ErrorHandler()) |
@@ -246,28 +280,45 @@ public: | |||
246 | new Private::ThenExecutor<OutOther, InOther ...>(func, errorFunc, mExecutor))); | 280 | new Private::ThenExecutor<OutOther, InOther ...>(func, errorFunc, mExecutor))); |
247 | } | 281 | } |
248 | 282 | ||
283 | template<typename OutOther, typename ... InOther> | ||
284 | Job<OutOther, InOther ...> then(SyncThenTask<OutOther, InOther ...> func, ErrorHandler errorFunc = ErrorHandler()) | ||
285 | { | ||
286 | return Job<OutOther, InOther ...>(Private::ExecutorBasePtr( | ||
287 | new Private::SyncThenExecutor<OutOther, InOther ...>(func, errorFunc, mExecutor))); | ||
288 | } | ||
289 | |||
249 | template<typename OutOther, typename InOther> | 290 | template<typename OutOther, typename InOther> |
250 | Job<OutOther, InOther> each(EachTask<OutOther, InOther> func) | 291 | Job<OutOther, InOther> each(EachTask<OutOther, InOther> func) |
251 | { | 292 | { |
252 | static_assert(detail::isIterable<Out>::value, | 293 | eachInvariants<OutOther>(); |
253 | "The 'Each' task can only be connected to a job that returns a list or an array."); | ||
254 | static_assert(detail::isIterable<OutOther>::value, | ||
255 | "The result type of 'Each' task must be a list or an array."); | ||
256 | return Job<OutOther, InOther>(Private::ExecutorBasePtr( | 294 | return Job<OutOther, InOther>(Private::ExecutorBasePtr( |
257 | new Private::EachExecutor<Out, OutOther, InOther>(func, mExecutor))); | 295 | new Private::EachExecutor<Out, OutOther, InOther>(func, mExecutor))); |
258 | } | 296 | } |
259 | 297 | ||
260 | template<typename OutOther, typename InOther> | 298 | template<typename OutOther, typename InOther> |
299 | Job<OutOther, InOther> each(SyncEachTask<OutOther, InOther> func) | ||
300 | { | ||
301 | eachInvariants<OutOther>(); | ||
302 | return Job<OutOther, InOther>(Private::ExecutorBasePtr( | ||
303 | new Private::SyncEachExecutor<Out, OutOther, InOther>(func, mExecutor))); | ||
304 | } | ||
305 | |||
306 | template<typename OutOther, typename InOther> | ||
261 | Job<OutOther, InOther> reduce(ReduceTask<OutOther, InOther> func) | 307 | Job<OutOther, InOther> reduce(ReduceTask<OutOther, InOther> func) |
262 | { | 308 | { |
263 | static_assert(Async::detail::isIterable<Out>::value, | 309 | reduceInvariants<InOther>(); |
264 | "The 'Result' task can only be connected to a job that returns a list or an array"); | ||
265 | static_assert(std::is_same<typename Out::value_type, typename InOther::value_type>::value, | ||
266 | "The return type of previous task must be compatible with input type of this task"); | ||
267 | return Job<OutOther, InOther>(Private::ExecutorBasePtr( | 310 | return Job<OutOther, InOther>(Private::ExecutorBasePtr( |
268 | new Private::ReduceExecutor<OutOther, InOther>(func, mExecutor))); | 311 | new Private::ReduceExecutor<OutOther, InOther>(func, mExecutor))); |
269 | } | 312 | } |
270 | 313 | ||
314 | template<typename OutOther, typename InOther> | ||
315 | Job<OutOther, InOther> reduce(SyncReduceTask<OutOther, InOther> func) | ||
316 | { | ||
317 | reduceInvariants<InOther>(); | ||
318 | return Job<OutOther, InOther>(Private::ExecutorBasePtr( | ||
319 | new Private::SyncReduceExecutor<OutOther, InOther>(func, mExecutor))); | ||
320 | } | ||
321 | |||
271 | Async::Future<Out> exec() | 322 | Async::Future<Out> exec() |
272 | { | 323 | { |
273 | mExecutor->exec(); | 324 | mExecutor->exec(); |
@@ -283,6 +334,24 @@ private: | |||
283 | Job(Private::ExecutorBasePtr executor) | 334 | Job(Private::ExecutorBasePtr executor) |
284 | : JobBase(executor) | 335 | : JobBase(executor) |
285 | {} | 336 | {} |
337 | |||
338 | template<typename OutOther> | ||
339 | void eachInvariants() | ||
340 | { | ||
341 | static_assert(detail::isIterable<Out>::value, | ||
342 | "The 'Each' task can only be connected to a job that returns a list or an array."); | ||
343 | static_assert(detail::isIterable<OutOther>::value, | ||
344 | "The result type of 'Each' task must be a list or an array."); | ||
345 | } | ||
346 | |||
347 | template<typename InOther> | ||
348 | void reduceInvariants() | ||
349 | { | ||
350 | static_assert(Async::detail::isIterable<Out>::value, | ||
351 | "The 'Result' task can only be connected to a job that returns a list or an array"); | ||
352 | static_assert(std::is_same<typename Out::value_type, typename InOther::value_type>::value, | ||
353 | "The return type of previous task must be compatible with input type of this task"); | ||
354 | } | ||
286 | }; | 355 | }; |
287 | 356 | ||
288 | } // namespace Async | 357 | } // namespace Async |
@@ -298,6 +367,12 @@ Job<Out> start(ThenTask<Out> func) | |||
298 | return Job<Out>(Private::ExecutorBasePtr(new Private::ThenExecutor<Out>(func))); | 367 | return Job<Out>(Private::ExecutorBasePtr(new Private::ThenExecutor<Out>(func))); |
299 | } | 368 | } |
300 | 369 | ||
370 | template<typename Out> | ||
371 | Job<Out> start(SyncThenTask<Out> func) | ||
372 | { | ||
373 | return Job<Out>(Private::ExecutorBasePtr(new Private::SyncThenExecutor<Out>(func))); | ||
374 | } | ||
375 | |||
301 | namespace Private { | 376 | namespace Private { |
302 | 377 | ||
303 | template<typename PrevOut, typename Out, typename ... In> | 378 | template<typename PrevOut, typename Out, typename ... In> |
@@ -378,22 +453,23 @@ void EachExecutor<PrevOut, Out, In>::previousFutureReady() | |||
378 | } | 453 | } |
379 | 454 | ||
380 | for (auto arg : this->mPrevFuture->value()) { | 455 | for (auto arg : this->mPrevFuture->value()) { |
381 | Async::Future<Out> future; | 456 | auto future = new Async::Future<Out>; |
382 | this->mFunc(arg, future); | 457 | this->mFunc(arg, *future); |
383 | auto fw = new Async::FutureWatcher<Out>(); | 458 | auto fw = new Async::FutureWatcher<Out>(); |
384 | mFutureWatchers.append(fw); | 459 | mFutureWatchers.append(fw); |
385 | QObject::connect(fw, &Async::FutureWatcher<Out>::futureReady, | 460 | QObject::connect(fw, &Async::FutureWatcher<Out>::futureReady, |
386 | [out, future, fw, this]() { | 461 | [out, future, fw, this]() { |
387 | assert(future.isFinished()); | 462 | assert(future->isFinished()); |
388 | const int index = mFutureWatchers.indexOf(fw); | 463 | const int index = mFutureWatchers.indexOf(fw); |
389 | assert(index > -1); | 464 | assert(index > -1); |
390 | mFutureWatchers.removeAt(index); | 465 | mFutureWatchers.removeAt(index); |
391 | out->setValue(out->value() + future.value()); | 466 | out->setValue(out->value() + future->value()); |
467 | delete future; | ||
392 | if (mFutureWatchers.isEmpty()) { | 468 | if (mFutureWatchers.isEmpty()) { |
393 | out->setFinished(); | 469 | out->setFinished(); |
394 | } | 470 | } |
395 | }); | 471 | }); |
396 | fw->setFuture(future); | 472 | fw->setFuture(*future); |
397 | } | 473 | } |
398 | } | 474 | } |
399 | 475 | ||
@@ -403,6 +479,68 @@ ReduceExecutor<Out, In>::ReduceExecutor(ReduceTask<Out, In> reduce, const Execut | |||
403 | { | 479 | { |
404 | } | 480 | } |
405 | 481 | ||
482 | template<typename Out, typename ... In> | ||
483 | SyncThenExecutor<Out, In ...>::SyncThenExecutor(SyncThenTask<Out, In ...> then, ErrorHandler errorHandler, const ExecutorBasePtr &parent) | ||
484 | : Executor<typename PreviousOut<In ...>::type, Out, In ...>(parent) | ||
485 | { | ||
486 | this->mFunc = then; | ||
487 | this->mErrorFunc = errorHandler; | ||
488 | } | ||
489 | |||
490 | template<typename Out, typename ... In> | ||
491 | void SyncThenExecutor<Out, In ...>::previousFutureReady() | ||
492 | { | ||
493 | if (this->mPrevFuture) { | ||
494 | assert(this->mPrevFuture->isFinished()); | ||
495 | } | ||
496 | |||
497 | if (this->mPrevFuture && this->mPrevFuture->errorCode()) { | ||
498 | if (this->mErrorFunc) { | ||
499 | this->mErrorFunc(this->mPrevFuture->errorCode(), this->mPrevFuture->errorMessage()); | ||
500 | this->mResult->setFinished(); | ||
501 | } else { | ||
502 | static_cast<Async::Future<Out>*>(this->mResult)->setError(this->mPrevFuture->errorCode(), this->mPrevFuture->errorMessage()); | ||
503 | //propagate error if no error handler is available | ||
504 | Out result = this->mFunc(this->mPrevFuture ? this->mPrevFuture->value() : In() ...); | ||
505 | static_cast<Async::Future<Out>*>(this->mResult)->setValue(result); | ||
506 | this->mResult->setFinished(); | ||
507 | } | ||
508 | } else { | ||
509 | Out result = this->mFunc(this->mPrevFuture ? this->mPrevFuture->value() : In() ...); | ||
510 | static_cast<Async::Future<Out>*>(this->mResult)->setValue(result); | ||
511 | this->mResult->setFinished(); | ||
512 | } | ||
513 | } | ||
514 | |||
515 | template<typename PrevOut, typename Out, typename In> | ||
516 | SyncEachExecutor<PrevOut, Out, In>::SyncEachExecutor(SyncEachTask<Out, In> each, const ExecutorBasePtr &parent) | ||
517 | : Executor<PrevOut, Out, In>(parent) | ||
518 | { | ||
519 | this->mFunc = each; | ||
520 | } | ||
521 | |||
522 | template<typename PrevOut, typename Out, typename In> | ||
523 | void SyncEachExecutor<PrevOut, Out, In>::previousFutureReady() | ||
524 | { | ||
525 | assert(this->mPrevFuture->isFinished()); | ||
526 | auto out = static_cast<Async::Future<Out>*>(this->mResult); | ||
527 | if (this->mPrevFuture->value().isEmpty()) { | ||
528 | out->setFinished(); | ||
529 | return; | ||
530 | } | ||
531 | |||
532 | for (auto arg : this->mPrevFuture->value()) { | ||
533 | out->setValue(out->value() + this->mFunc(arg)); | ||
534 | } | ||
535 | out->setFinished(); | ||
536 | } | ||
537 | |||
538 | template<typename Out, typename In> | ||
539 | SyncReduceExecutor<Out, In>::SyncReduceExecutor(SyncReduceTask<Out, In> reduce, const ExecutorBasePtr &parent) | ||
540 | : SyncThenExecutor<Out, In>(reduce, ErrorHandler(), parent) | ||
541 | { | ||
542 | } | ||
543 | |||
406 | 544 | ||
407 | } // namespace Private | 545 | } // namespace Private |
408 | 546 | ||