diff options
-rw-r--r-- | async/autotests/asynctest.cpp | 138 | ||||
-rw-r--r-- | async/src/async.h | 170 |
2 files changed, 281 insertions, 27 deletions
diff --git a/async/autotests/asynctest.cpp b/async/autotests/asynctest.cpp index 9240d28..4ebe65e 100644 --- a/async/autotests/asynctest.cpp +++ b/async/autotests/asynctest.cpp | |||
@@ -42,9 +42,14 @@ private Q_SLOTS: | |||
42 | void testAsyncPromises(); | 42 | void testAsyncPromises(); |
43 | void testAsyncPromises2(); | 43 | void testAsyncPromises2(); |
44 | void testNestedAsync(); | 44 | void testNestedAsync(); |
45 | void testAsyncThen(); | ||
46 | void testSyncThen(); | ||
47 | void testAsyncEach(); | ||
45 | void testSyncEach(); | 48 | void testSyncEach(); |
49 | void testAsyncReduce(); | ||
46 | void testSyncReduce(); | 50 | void testSyncReduce(); |
47 | void testErrorHandler(); | 51 | void testErrorHandler(); |
52 | |||
48 | }; | 53 | }; |
49 | 54 | ||
50 | void AsyncTest::testSyncPromises() | 55 | void AsyncTest::testSyncPromises() |
@@ -155,17 +160,92 @@ void AsyncTest::testNestedAsync() | |||
155 | QTRY_VERIFY(done); | 160 | QTRY_VERIFY(done); |
156 | } | 161 | } |
157 | 162 | ||
158 | void AsyncTest::testSyncEach() | 163 | void AsyncTest::testAsyncThen() |
164 | { | ||
165 | auto job = Async::start<int>( | ||
166 | [](Async::Future<int> &future) { | ||
167 | QTimer *timer = new QTimer; | ||
168 | QObject::connect(timer, &QTimer::timeout, | ||
169 | [&]() { | ||
170 | future.setValue(42); | ||
171 | future.setFinished(); | ||
172 | }); | ||
173 | QObject::connect(timer, &QTimer::timeout, | ||
174 | timer, &QObject::deleteLater); | ||
175 | timer->setSingleShot(true); | ||
176 | timer->start(0); | ||
177 | }); | ||
178 | |||
179 | auto future = job.exec(); | ||
180 | future.waitForFinished(); | ||
181 | |||
182 | QVERIFY(future.isFinished()); | ||
183 | QCOMPARE(future.value(), 42); | ||
184 | } | ||
185 | |||
186 | |||
187 | void AsyncTest::testSyncThen() | ||
188 | { | ||
189 | auto job = Async::start<int>( | ||
190 | []() -> int { | ||
191 | return 42; | ||
192 | }).then<int, int>( | ||
193 | [](int in) -> int { | ||
194 | return in * 2; | ||
195 | }); | ||
196 | |||
197 | auto future = job.exec(); | ||
198 | QVERIFY(future.isFinished()); | ||
199 | QCOMPARE(future.value(), 84); | ||
200 | } | ||
201 | |||
202 | void AsyncTest::testAsyncEach() | ||
159 | { | 203 | { |
160 | auto job = Async::start<QList<int>>( | 204 | auto job = Async::start<QList<int>>( |
161 | [](Async::Future<QList<int>> &future) { | 205 | [](Async::Future<QList<int>> &future) { |
162 | future.setValue(QList<int>{ 1, 2, 3, 4 }); | 206 | QTimer *timer = new QTimer; |
163 | future.setFinished(); | 207 | QObject::connect(timer, &QTimer::timeout, |
208 | [&future]() { | ||
209 | future.setValue({ 1, 2, 3, 4 }); | ||
210 | future.setFinished(); | ||
211 | }); | ||
212 | QObject::connect(timer, &QTimer::timeout, | ||
213 | timer, &QObject::deleteLater); | ||
214 | timer->setSingleShot(true); | ||
215 | timer->start(0); | ||
164 | }) | 216 | }) |
165 | .each<QList<int>, int>( | 217 | .each<QList<int>, int>( |
166 | [](const int &v, Async::Future<QList<int>> &future) { | 218 | [](const int &v, Async::Future<QList<int>> &future) { |
167 | future.setValue(QList<int>{ v + 1 }); | 219 | QTimer *timer = new QTimer; |
168 | future.setFinished(); | 220 | QObject::connect(timer, &QTimer::timeout, |
221 | [v, &future]() { | ||
222 | future.setValue({ v + 1 }); | ||
223 | future.setFinished(); | ||
224 | }); | ||
225 | QObject::connect(timer, &QTimer::timeout, | ||
226 | timer, &QObject::deleteLater); | ||
227 | timer->setSingleShot(true); | ||
228 | timer->start(0); | ||
229 | }); | ||
230 | |||
231 | auto future = job.exec(); | ||
232 | future.waitForFinished(); | ||
233 | |||
234 | const QList<int> expected({ 2, 3, 4, 5 }); | ||
235 | QVERIFY(future.isFinished()); | ||
236 | QCOMPARE(future.value(), expected); | ||
237 | } | ||
238 | |||
239 | |||
240 | void AsyncTest::testSyncEach() | ||
241 | { | ||
242 | auto job = Async::start<QList<int>>( | ||
243 | []() -> QList<int> { | ||
244 | return { 1, 2, 3, 4 }; | ||
245 | }) | ||
246 | .each<QList<int>, int>( | ||
247 | [](const int &v) -> QList<int> { | ||
248 | return { v + 1 }; | ||
169 | }); | 249 | }); |
170 | 250 | ||
171 | Async::Future<QList<int>> future = job.exec(); | 251 | Async::Future<QList<int>> future = job.exec(); |
@@ -175,19 +255,55 @@ void AsyncTest::testSyncEach() | |||
175 | QCOMPARE(future.value(), expected); | 255 | QCOMPARE(future.value(), expected); |
176 | } | 256 | } |
177 | 257 | ||
178 | void AsyncTest::testSyncReduce() | 258 | void AsyncTest::testAsyncReduce() |
179 | { | 259 | { |
180 | auto job = Async::start<QList<int>>( | 260 | auto job = Async::start<QList<int>>( |
181 | [](Async::Future<QList<int>> &future) { | 261 | [](Async::Future<QList<int>> &future) { |
182 | future.setValue(QList<int>{ 1, 2, 3, 4 }); | 262 | QTimer *timer = new QTimer(); |
183 | future.setFinished(); | 263 | QObject::connect(timer, &QTimer::timeout, |
264 | [&future]() { | ||
265 | future.setValue({ 1, 2, 3, 4 }); | ||
266 | future.setFinished(); | ||
267 | }); | ||
268 | QObject::connect(timer, &QTimer::timeout, | ||
269 | timer, &QObject::deleteLater); | ||
270 | timer->setSingleShot(true); | ||
271 | timer->start(0); | ||
184 | }) | 272 | }) |
185 | .reduce<int, QList<int>>( | 273 | .reduce<int, QList<int>>( |
186 | [](const QList<int> &list, Async::Future<int> &future) { | 274 | [](const QList<int> &list, Async::Future<int> &future) { |
275 | QTimer *timer = new QTimer(); | ||
276 | QObject::connect(timer, &QTimer::timeout, | ||
277 | [list, &future]() { | ||
278 | int sum = 0; | ||
279 | for (int i : list) sum += i; | ||
280 | future.setValue(sum); | ||
281 | future.setFinished(); | ||
282 | }); | ||
283 | QObject::connect(timer, &QTimer::timeout, | ||
284 | timer, &QObject::deleteLater); | ||
285 | timer->setSingleShot(true); | ||
286 | timer->start(0); | ||
287 | }); | ||
288 | |||
289 | Async::Future<int> future = job.exec(); | ||
290 | future.waitForFinished(); | ||
291 | |||
292 | QVERIFY(future.isFinished()); | ||
293 | QCOMPARE(future.value(), 10); | ||
294 | } | ||
295 | |||
296 | void AsyncTest::testSyncReduce() | ||
297 | { | ||
298 | auto job = Async::start<QList<int>>( | ||
299 | []() -> QList<int> { | ||
300 | return { 1, 2, 3, 4 }; | ||
301 | }) | ||
302 | .reduce<int, QList<int>>( | ||
303 | [](const QList<int> &list) -> int { | ||
187 | int sum = 0; | 304 | int sum = 0; |
188 | for (int i : list) sum += i; | 305 | for (int i : list) sum += i; |
189 | future.setValue(sum); | 306 | return sum; |
190 | future.setFinished(); | ||
191 | }); | 307 | }); |
192 | 308 | ||
193 | Async::Future<int> future = job.exec(); | 309 | Async::Future<int> future = job.exec(); |
@@ -213,7 +329,7 @@ void AsyncTest::testErrorHandler() | |||
213 | ); | 329 | ); |
214 | auto future = job.exec(); | 330 | auto future = job.exec(); |
215 | future.waitForFinished(); | 331 | future.waitForFinished(); |
216 | QVERIFY(error == 1); | 332 | QCOMPARE(error, 1); |
217 | QVERIFY(future.isFinished()); | 333 | QVERIFY(future.isFinished()); |
218 | } | 334 | } |
219 | 335 | ||
diff --git a/async/src/async.h b/async/src/async.h index d15373b..336bae2 100644 --- a/async/src/async.h +++ b/async/src/async.h | |||
@@ -33,8 +33,6 @@ | |||
33 | 33 | ||
34 | 34 | ||
35 | /* | 35 | /* |
36 | * TODO: on .then and potentially others: support for ThenTask without future argument and return value which makes it implicitly a sync continuation. | ||
37 | * Useful for typical value consumer continuations. | ||
38 | * TODO: error continuation on .then and others. | 36 | * TODO: error continuation on .then and others. |
39 | * TODO: instead of passing the future objects callbacks could be provided for result reporting (we can still use the future object internally | 37 | * TODO: instead of passing the future objects callbacks could be provided for result reporting (we can still use the future object internally |
40 | */ | 38 | */ |
@@ -47,13 +45,19 @@ class JobBase; | |||
47 | 45 | ||
48 | template<typename Out, typename ... In> | 46 | template<typename Out, typename ... In> |
49 | class Job; | 47 | class Job; |
50 | |||
51 | template<typename Out, typename ... In> | 48 | template<typename Out, typename ... In> |
52 | using ThenTask = typename detail::identity<std::function<void(In ..., Async::Future<Out>&)>>::type; | 49 | using ThenTask = typename detail::identity<std::function<void(In ..., Async::Future<Out>&)>>::type; |
50 | template<typename Out, typename ... In> | ||
51 | using SyncThenTask = typename detail::identity<std::function<Out(In ...)>>::type; | ||
53 | template<typename Out, typename In> | 52 | template<typename Out, typename In> |
54 | using EachTask = typename detail::identity<std::function<void(In, Async::Future<Out>&)>>::type; | 53 | using EachTask = typename detail::identity<std::function<void(In, Async::Future<Out>&)>>::type; |
55 | template<typename Out, typename In> | 54 | template<typename Out, typename In> |
55 | using SyncEachTask = typename detail::identity<std::function<Out(In)>>::type; | ||
56 | template<typename Out, typename In> | ||
56 | using ReduceTask = typename detail::identity<std::function<void(In, Async::Future<Out>&)>>::type; | 57 | using ReduceTask = typename detail::identity<std::function<void(In, Async::Future<Out>&)>>::type; |
58 | template<typename Out, typename In> | ||
59 | using SyncReduceTask = typename detail::identity<std::function<Out(In)>>::type; | ||
60 | |||
57 | using ErrorHandler = std::function<void(int, const QString &)>; | 61 | using ErrorHandler = std::function<void(int, const QString &)>; |
58 | 62 | ||
59 | namespace Private | 63 | namespace Private |
@@ -134,6 +138,33 @@ public: | |||
134 | ReduceExecutor(ReduceTask<Out, In> reduce, const ExecutorBasePtr &parent); | 138 | ReduceExecutor(ReduceTask<Out, In> reduce, const ExecutorBasePtr &parent); |
135 | }; | 139 | }; |
136 | 140 | ||
141 | template<typename Out, typename ... In> | ||
142 | class SyncThenExecutor : public Executor<typename PreviousOut<In ...>::type, Out, In ...> | ||
143 | { | ||
144 | public: | ||
145 | SyncThenExecutor(SyncThenTask<Out, In ...> then, ErrorHandler errorHandler = ErrorHandler(), const ExecutorBasePtr &parent = ExecutorBasePtr()); | ||
146 | void previousFutureReady(); | ||
147 | protected: | ||
148 | SyncThenTask<Out, In ...> mFunc; | ||
149 | }; | ||
150 | |||
151 | template<typename Out, typename In> | ||
152 | class SyncReduceExecutor : public SyncThenExecutor<Out, In> | ||
153 | { | ||
154 | public: | ||
155 | SyncReduceExecutor(SyncReduceTask<Out, In> reduce, const ExecutorBasePtr &parent = ExecutorBasePtr()); | ||
156 | }; | ||
157 | |||
158 | template<typename PrevOut, typename Out, typename In> | ||
159 | class SyncEachExecutor : public Executor<PrevOut, Out, In> | ||
160 | { | ||
161 | public: | ||
162 | SyncEachExecutor(SyncEachTask<Out, In> each, const ExecutorBasePtr &parent = ExecutorBasePtr()); | ||
163 | void previousFutureReady(); | ||
164 | protected: | ||
165 | SyncEachTask<Out, In> mFunc; | ||
166 | }; | ||
167 | |||
137 | } // namespace Private | 168 | } // namespace Private |
138 | 169 | ||
139 | /** | 170 | /** |
@@ -238,6 +269,9 @@ class Job : public JobBase | |||
238 | template<typename OutOther> | 269 | template<typename OutOther> |
239 | friend Job<OutOther> start(Async::ThenTask<OutOther> func); | 270 | friend Job<OutOther> start(Async::ThenTask<OutOther> func); |
240 | 271 | ||
272 | template<typename OutOther> | ||
273 | friend Job<OutOther> start(Async::SyncThenTask<OutOther> func); | ||
274 | |||
241 | public: | 275 | public: |
242 | template<typename OutOther, typename ... InOther> | 276 | template<typename OutOther, typename ... InOther> |
243 | Job<OutOther, InOther ...> then(ThenTask<OutOther, InOther ...> func, ErrorHandler errorFunc = ErrorHandler()) | 277 | Job<OutOther, InOther ...> then(ThenTask<OutOther, InOther ...> func, ErrorHandler errorFunc = ErrorHandler()) |
@@ -246,28 +280,45 @@ public: | |||
246 | new Private::ThenExecutor<OutOther, InOther ...>(func, errorFunc, mExecutor))); | 280 | new Private::ThenExecutor<OutOther, InOther ...>(func, errorFunc, mExecutor))); |
247 | } | 281 | } |
248 | 282 | ||
283 | template<typename OutOther, typename ... InOther> | ||
284 | Job<OutOther, InOther ...> then(SyncThenTask<OutOther, InOther ...> func, ErrorHandler errorFunc = ErrorHandler()) | ||
285 | { | ||
286 | return Job<OutOther, InOther ...>(Private::ExecutorBasePtr( | ||
287 | new Private::SyncThenExecutor<OutOther, InOther ...>(func, errorFunc, mExecutor))); | ||
288 | } | ||
289 | |||
249 | template<typename OutOther, typename InOther> | 290 | template<typename OutOther, typename InOther> |
250 | Job<OutOther, InOther> each(EachTask<OutOther, InOther> func) | 291 | Job<OutOther, InOther> each(EachTask<OutOther, InOther> func) |
251 | { | 292 | { |
252 | static_assert(detail::isIterable<Out>::value, | 293 | eachInvariants<OutOther>(); |
253 | "The 'Each' task can only be connected to a job that returns a list or an array."); | ||
254 | static_assert(detail::isIterable<OutOther>::value, | ||
255 | "The result type of 'Each' task must be a list or an array."); | ||
256 | return Job<OutOther, InOther>(Private::ExecutorBasePtr( | 294 | return Job<OutOther, InOther>(Private::ExecutorBasePtr( |
257 | new Private::EachExecutor<Out, OutOther, InOther>(func, mExecutor))); | 295 | new Private::EachExecutor<Out, OutOther, InOther>(func, mExecutor))); |
258 | } | 296 | } |
259 | 297 | ||
260 | template<typename OutOther, typename InOther> | 298 | template<typename OutOther, typename InOther> |
299 | Job<OutOther, InOther> each(SyncEachTask<OutOther, InOther> func) | ||
300 | { | ||
301 | eachInvariants<OutOther>(); | ||
302 | return Job<OutOther, InOther>(Private::ExecutorBasePtr( | ||
303 | new Private::SyncEachExecutor<Out, OutOther, InOther>(func, mExecutor))); | ||
304 | } | ||
305 | |||
306 | template<typename OutOther, typename InOther> | ||
261 | Job<OutOther, InOther> reduce(ReduceTask<OutOther, InOther> func) | 307 | Job<OutOther, InOther> reduce(ReduceTask<OutOther, InOther> func) |
262 | { | 308 | { |
263 | static_assert(Async::detail::isIterable<Out>::value, | 309 | reduceInvariants<InOther>(); |
264 | "The 'Result' task can only be connected to a job that returns a list or an array"); | ||
265 | static_assert(std::is_same<typename Out::value_type, typename InOther::value_type>::value, | ||
266 | "The return type of previous task must be compatible with input type of this task"); | ||
267 | return Job<OutOther, InOther>(Private::ExecutorBasePtr( | 310 | return Job<OutOther, InOther>(Private::ExecutorBasePtr( |
268 | new Private::ReduceExecutor<OutOther, InOther>(func, mExecutor))); | 311 | new Private::ReduceExecutor<OutOther, InOther>(func, mExecutor))); |
269 | } | 312 | } |
270 | 313 | ||
314 | template<typename OutOther, typename InOther> | ||
315 | Job<OutOther, InOther> reduce(SyncReduceTask<OutOther, InOther> func) | ||
316 | { | ||
317 | reduceInvariants<InOther>(); | ||
318 | return Job<OutOther, InOther>(Private::ExecutorBasePtr( | ||
319 | new Private::SyncReduceExecutor<OutOther, InOther>(func, mExecutor))); | ||
320 | } | ||
321 | |||
271 | Async::Future<Out> exec() | 322 | Async::Future<Out> exec() |
272 | { | 323 | { |
273 | mExecutor->exec(); | 324 | mExecutor->exec(); |
@@ -283,6 +334,24 @@ private: | |||
283 | Job(Private::ExecutorBasePtr executor) | 334 | Job(Private::ExecutorBasePtr executor) |
284 | : JobBase(executor) | 335 | : JobBase(executor) |
285 | {} | 336 | {} |
337 | |||
338 | template<typename OutOther> | ||
339 | void eachInvariants() | ||
340 | { | ||
341 | static_assert(detail::isIterable<Out>::value, | ||
342 | "The 'Each' task can only be connected to a job that returns a list or an array."); | ||
343 | static_assert(detail::isIterable<OutOther>::value, | ||
344 | "The result type of 'Each' task must be a list or an array."); | ||
345 | } | ||
346 | |||
347 | template<typename InOther> | ||
348 | void reduceInvariants() | ||
349 | { | ||
350 | static_assert(Async::detail::isIterable<Out>::value, | ||
351 | "The 'Result' task can only be connected to a job that returns a list or an array"); | ||
352 | static_assert(std::is_same<typename Out::value_type, typename InOther::value_type>::value, | ||
353 | "The return type of previous task must be compatible with input type of this task"); | ||
354 | } | ||
286 | }; | 355 | }; |
287 | 356 | ||
288 | } // namespace Async | 357 | } // namespace Async |
@@ -298,6 +367,12 @@ Job<Out> start(ThenTask<Out> func) | |||
298 | return Job<Out>(Private::ExecutorBasePtr(new Private::ThenExecutor<Out>(func))); | 367 | return Job<Out>(Private::ExecutorBasePtr(new Private::ThenExecutor<Out>(func))); |
299 | } | 368 | } |
300 | 369 | ||
370 | template<typename Out> | ||
371 | Job<Out> start(SyncThenTask<Out> func) | ||
372 | { | ||
373 | return Job<Out>(Private::ExecutorBasePtr(new Private::SyncThenExecutor<Out>(func))); | ||
374 | } | ||
375 | |||
301 | namespace Private { | 376 | namespace Private { |
302 | 377 | ||
303 | template<typename PrevOut, typename Out, typename ... In> | 378 | template<typename PrevOut, typename Out, typename ... In> |
@@ -378,22 +453,23 @@ void EachExecutor<PrevOut, Out, In>::previousFutureReady() | |||
378 | } | 453 | } |
379 | 454 | ||
380 | for (auto arg : this->mPrevFuture->value()) { | 455 | for (auto arg : this->mPrevFuture->value()) { |
381 | Async::Future<Out> future; | 456 | auto future = new Async::Future<Out>; |
382 | this->mFunc(arg, future); | 457 | this->mFunc(arg, *future); |
383 | auto fw = new Async::FutureWatcher<Out>(); | 458 | auto fw = new Async::FutureWatcher<Out>(); |
384 | mFutureWatchers.append(fw); | 459 | mFutureWatchers.append(fw); |
385 | QObject::connect(fw, &Async::FutureWatcher<Out>::futureReady, | 460 | QObject::connect(fw, &Async::FutureWatcher<Out>::futureReady, |
386 | [out, future, fw, this]() { | 461 | [out, future, fw, this]() { |
387 | assert(future.isFinished()); | 462 | assert(future->isFinished()); |
388 | const int index = mFutureWatchers.indexOf(fw); | 463 | const int index = mFutureWatchers.indexOf(fw); |
389 | assert(index > -1); | 464 | assert(index > -1); |
390 | mFutureWatchers.removeAt(index); | 465 | mFutureWatchers.removeAt(index); |
391 | out->setValue(out->value() + future.value()); | 466 | out->setValue(out->value() + future->value()); |
467 | delete future; | ||
392 | if (mFutureWatchers.isEmpty()) { | 468 | if (mFutureWatchers.isEmpty()) { |
393 | out->setFinished(); | 469 | out->setFinished(); |
394 | } | 470 | } |
395 | }); | 471 | }); |
396 | fw->setFuture(future); | 472 | fw->setFuture(*future); |
397 | } | 473 | } |
398 | } | 474 | } |
399 | 475 | ||
@@ -403,6 +479,68 @@ ReduceExecutor<Out, In>::ReduceExecutor(ReduceTask<Out, In> reduce, const Execut | |||
403 | { | 479 | { |
404 | } | 480 | } |
405 | 481 | ||
482 | template<typename Out, typename ... In> | ||
483 | SyncThenExecutor<Out, In ...>::SyncThenExecutor(SyncThenTask<Out, In ...> then, ErrorHandler errorHandler, const ExecutorBasePtr &parent) | ||
484 | : Executor<typename PreviousOut<In ...>::type, Out, In ...>(parent) | ||
485 | { | ||
486 | this->mFunc = then; | ||
487 | this->mErrorFunc = errorHandler; | ||
488 | } | ||
489 | |||
490 | template<typename Out, typename ... In> | ||
491 | void SyncThenExecutor<Out, In ...>::previousFutureReady() | ||
492 | { | ||
493 | if (this->mPrevFuture) { | ||
494 | assert(this->mPrevFuture->isFinished()); | ||
495 | } | ||
496 | |||
497 | if (this->mPrevFuture && this->mPrevFuture->errorCode()) { | ||
498 | if (this->mErrorFunc) { | ||
499 | this->mErrorFunc(this->mPrevFuture->errorCode(), this->mPrevFuture->errorMessage()); | ||
500 | this->mResult->setFinished(); | ||
501 | } else { | ||
502 | static_cast<Async::Future<Out>*>(this->mResult)->setError(this->mPrevFuture->errorCode(), this->mPrevFuture->errorMessage()); | ||
503 | //propagate error if no error handler is available | ||
504 | Out result = this->mFunc(this->mPrevFuture ? this->mPrevFuture->value() : In() ...); | ||
505 | static_cast<Async::Future<Out>*>(this->mResult)->setValue(result); | ||
506 | this->mResult->setFinished(); | ||
507 | } | ||
508 | } else { | ||
509 | Out result = this->mFunc(this->mPrevFuture ? this->mPrevFuture->value() : In() ...); | ||
510 | static_cast<Async::Future<Out>*>(this->mResult)->setValue(result); | ||
511 | this->mResult->setFinished(); | ||
512 | } | ||
513 | } | ||
514 | |||
515 | template<typename PrevOut, typename Out, typename In> | ||
516 | SyncEachExecutor<PrevOut, Out, In>::SyncEachExecutor(SyncEachTask<Out, In> each, const ExecutorBasePtr &parent) | ||
517 | : Executor<PrevOut, Out, In>(parent) | ||
518 | { | ||
519 | this->mFunc = each; | ||
520 | } | ||
521 | |||
522 | template<typename PrevOut, typename Out, typename In> | ||
523 | void SyncEachExecutor<PrevOut, Out, In>::previousFutureReady() | ||
524 | { | ||
525 | assert(this->mPrevFuture->isFinished()); | ||
526 | auto out = static_cast<Async::Future<Out>*>(this->mResult); | ||
527 | if (this->mPrevFuture->value().isEmpty()) { | ||
528 | out->setFinished(); | ||
529 | return; | ||
530 | } | ||
531 | |||
532 | for (auto arg : this->mPrevFuture->value()) { | ||
533 | out->setValue(out->value() + this->mFunc(arg)); | ||
534 | } | ||
535 | out->setFinished(); | ||
536 | } | ||
537 | |||
538 | template<typename Out, typename In> | ||
539 | SyncReduceExecutor<Out, In>::SyncReduceExecutor(SyncReduceTask<Out, In> reduce, const ExecutorBasePtr &parent) | ||
540 | : SyncThenExecutor<Out, In>(reduce, ErrorHandler(), parent) | ||
541 | { | ||
542 | } | ||
543 | |||
406 | 544 | ||
407 | } // namespace Private | 545 | } // namespace Private |
408 | 546 | ||