summaryrefslogtreecommitdiffstats
path: root/async
diff options
context:
space:
mode:
Diffstat (limited to 'async')
-rw-r--r--async/autotests/asynctest.cpp138
-rw-r--r--async/src/async.h170
2 files changed, 281 insertions, 27 deletions
diff --git a/async/autotests/asynctest.cpp b/async/autotests/asynctest.cpp
index 9240d28..4ebe65e 100644
--- a/async/autotests/asynctest.cpp
+++ b/async/autotests/asynctest.cpp
@@ -42,9 +42,14 @@ private Q_SLOTS:
42 void testAsyncPromises(); 42 void testAsyncPromises();
43 void testAsyncPromises2(); 43 void testAsyncPromises2();
44 void testNestedAsync(); 44 void testNestedAsync();
45 void testAsyncThen();
46 void testSyncThen();
47 void testAsyncEach();
45 void testSyncEach(); 48 void testSyncEach();
49 void testAsyncReduce();
46 void testSyncReduce(); 50 void testSyncReduce();
47 void testErrorHandler(); 51 void testErrorHandler();
52
48}; 53};
49 54
50void AsyncTest::testSyncPromises() 55void AsyncTest::testSyncPromises()
@@ -155,17 +160,92 @@ void AsyncTest::testNestedAsync()
155 QTRY_VERIFY(done); 160 QTRY_VERIFY(done);
156} 161}
157 162
158void AsyncTest::testSyncEach() 163void AsyncTest::testAsyncThen()
164{
165 auto job = Async::start<int>(
166 [](Async::Future<int> &future) {
167 QTimer *timer = new QTimer;
168 QObject::connect(timer, &QTimer::timeout,
169 [&]() {
170 future.setValue(42);
171 future.setFinished();
172 });
173 QObject::connect(timer, &QTimer::timeout,
174 timer, &QObject::deleteLater);
175 timer->setSingleShot(true);
176 timer->start(0);
177 });
178
179 auto future = job.exec();
180 future.waitForFinished();
181
182 QVERIFY(future.isFinished());
183 QCOMPARE(future.value(), 42);
184}
185
186
187void AsyncTest::testSyncThen()
188{
189 auto job = Async::start<int>(
190 []() -> int {
191 return 42;
192 }).then<int, int>(
193 [](int in) -> int {
194 return in * 2;
195 });
196
197 auto future = job.exec();
198 QVERIFY(future.isFinished());
199 QCOMPARE(future.value(), 84);
200}
201
202void AsyncTest::testAsyncEach()
159{ 203{
160 auto job = Async::start<QList<int>>( 204 auto job = Async::start<QList<int>>(
161 [](Async::Future<QList<int>> &future) { 205 [](Async::Future<QList<int>> &future) {
162 future.setValue(QList<int>{ 1, 2, 3, 4 }); 206 QTimer *timer = new QTimer;
163 future.setFinished(); 207 QObject::connect(timer, &QTimer::timeout,
208 [&future]() {
209 future.setValue({ 1, 2, 3, 4 });
210 future.setFinished();
211 });
212 QObject::connect(timer, &QTimer::timeout,
213 timer, &QObject::deleteLater);
214 timer->setSingleShot(true);
215 timer->start(0);
164 }) 216 })
165 .each<QList<int>, int>( 217 .each<QList<int>, int>(
166 [](const int &v, Async::Future<QList<int>> &future) { 218 [](const int &v, Async::Future<QList<int>> &future) {
167 future.setValue(QList<int>{ v + 1 }); 219 QTimer *timer = new QTimer;
168 future.setFinished(); 220 QObject::connect(timer, &QTimer::timeout,
221 [v, &future]() {
222 future.setValue({ v + 1 });
223 future.setFinished();
224 });
225 QObject::connect(timer, &QTimer::timeout,
226 timer, &QObject::deleteLater);
227 timer->setSingleShot(true);
228 timer->start(0);
229 });
230
231 auto future = job.exec();
232 future.waitForFinished();
233
234 const QList<int> expected({ 2, 3, 4, 5 });
235 QVERIFY(future.isFinished());
236 QCOMPARE(future.value(), expected);
237}
238
239
240void AsyncTest::testSyncEach()
241{
242 auto job = Async::start<QList<int>>(
243 []() -> QList<int> {
244 return { 1, 2, 3, 4 };
245 })
246 .each<QList<int>, int>(
247 [](const int &v) -> QList<int> {
248 return { v + 1 };
169 }); 249 });
170 250
171 Async::Future<QList<int>> future = job.exec(); 251 Async::Future<QList<int>> future = job.exec();
@@ -175,19 +255,55 @@ void AsyncTest::testSyncEach()
175 QCOMPARE(future.value(), expected); 255 QCOMPARE(future.value(), expected);
176} 256}
177 257
178void AsyncTest::testSyncReduce() 258void AsyncTest::testAsyncReduce()
179{ 259{
180 auto job = Async::start<QList<int>>( 260 auto job = Async::start<QList<int>>(
181 [](Async::Future<QList<int>> &future) { 261 [](Async::Future<QList<int>> &future) {
182 future.setValue(QList<int>{ 1, 2, 3, 4 }); 262 QTimer *timer = new QTimer();
183 future.setFinished(); 263 QObject::connect(timer, &QTimer::timeout,
264 [&future]() {
265 future.setValue({ 1, 2, 3, 4 });
266 future.setFinished();
267 });
268 QObject::connect(timer, &QTimer::timeout,
269 timer, &QObject::deleteLater);
270 timer->setSingleShot(true);
271 timer->start(0);
184 }) 272 })
185 .reduce<int, QList<int>>( 273 .reduce<int, QList<int>>(
186 [](const QList<int> &list, Async::Future<int> &future) { 274 [](const QList<int> &list, Async::Future<int> &future) {
275 QTimer *timer = new QTimer();
276 QObject::connect(timer, &QTimer::timeout,
277 [list, &future]() {
278 int sum = 0;
279 for (int i : list) sum += i;
280 future.setValue(sum);
281 future.setFinished();
282 });
283 QObject::connect(timer, &QTimer::timeout,
284 timer, &QObject::deleteLater);
285 timer->setSingleShot(true);
286 timer->start(0);
287 });
288
289 Async::Future<int> future = job.exec();
290 future.waitForFinished();
291
292 QVERIFY(future.isFinished());
293 QCOMPARE(future.value(), 10);
294}
295
296void AsyncTest::testSyncReduce()
297{
298 auto job = Async::start<QList<int>>(
299 []() -> QList<int> {
300 return { 1, 2, 3, 4 };
301 })
302 .reduce<int, QList<int>>(
303 [](const QList<int> &list) -> int {
187 int sum = 0; 304 int sum = 0;
188 for (int i : list) sum += i; 305 for (int i : list) sum += i;
189 future.setValue(sum); 306 return sum;
190 future.setFinished();
191 }); 307 });
192 308
193 Async::Future<int> future = job.exec(); 309 Async::Future<int> future = job.exec();
@@ -213,7 +329,7 @@ void AsyncTest::testErrorHandler()
213 ); 329 );
214 auto future = job.exec(); 330 auto future = job.exec();
215 future.waitForFinished(); 331 future.waitForFinished();
216 QVERIFY(error == 1); 332 QCOMPARE(error, 1);
217 QVERIFY(future.isFinished()); 333 QVERIFY(future.isFinished());
218} 334}
219 335
diff --git a/async/src/async.h b/async/src/async.h
index d15373b..336bae2 100644
--- a/async/src/async.h
+++ b/async/src/async.h
@@ -33,8 +33,6 @@
33 33
34 34
35/* 35/*
36 * TODO: on .then and potentially others: support for ThenTask without future argument and return value which makes it implicitly a sync continuation.
37 * Useful for typical value consumer continuations.
38 * TODO: error continuation on .then and others. 36 * TODO: error continuation on .then and others.
39 * TODO: instead of passing the future objects callbacks could be provided for result reporting (we can still use the future object internally 37 * TODO: instead of passing the future objects callbacks could be provided for result reporting (we can still use the future object internally
40 */ 38 */
@@ -47,13 +45,19 @@ class JobBase;
47 45
48template<typename Out, typename ... In> 46template<typename Out, typename ... In>
49class Job; 47class Job;
50
51template<typename Out, typename ... In> 48template<typename Out, typename ... In>
52using ThenTask = typename detail::identity<std::function<void(In ..., Async::Future<Out>&)>>::type; 49using ThenTask = typename detail::identity<std::function<void(In ..., Async::Future<Out>&)>>::type;
50template<typename Out, typename ... In>
51using SyncThenTask = typename detail::identity<std::function<Out(In ...)>>::type;
53template<typename Out, typename In> 52template<typename Out, typename In>
54using EachTask = typename detail::identity<std::function<void(In, Async::Future<Out>&)>>::type; 53using EachTask = typename detail::identity<std::function<void(In, Async::Future<Out>&)>>::type;
55template<typename Out, typename In> 54template<typename Out, typename In>
55using SyncEachTask = typename detail::identity<std::function<Out(In)>>::type;
56template<typename Out, typename In>
56using ReduceTask = typename detail::identity<std::function<void(In, Async::Future<Out>&)>>::type; 57using ReduceTask = typename detail::identity<std::function<void(In, Async::Future<Out>&)>>::type;
58template<typename Out, typename In>
59using SyncReduceTask = typename detail::identity<std::function<Out(In)>>::type;
60
57using ErrorHandler = std::function<void(int, const QString &)>; 61using ErrorHandler = std::function<void(int, const QString &)>;
58 62
59namespace Private 63namespace Private
@@ -134,6 +138,33 @@ public:
134 ReduceExecutor(ReduceTask<Out, In> reduce, const ExecutorBasePtr &parent); 138 ReduceExecutor(ReduceTask<Out, In> reduce, const ExecutorBasePtr &parent);
135}; 139};
136 140
141template<typename Out, typename ... In>
142class SyncThenExecutor : public Executor<typename PreviousOut<In ...>::type, Out, In ...>
143{
144public:
145 SyncThenExecutor(SyncThenTask<Out, In ...> then, ErrorHandler errorHandler = ErrorHandler(), const ExecutorBasePtr &parent = ExecutorBasePtr());
146 void previousFutureReady();
147protected:
148 SyncThenTask<Out, In ...> mFunc;
149};
150
151template<typename Out, typename In>
152class SyncReduceExecutor : public SyncThenExecutor<Out, In>
153{
154public:
155 SyncReduceExecutor(SyncReduceTask<Out, In> reduce, const ExecutorBasePtr &parent = ExecutorBasePtr());
156};
157
158template<typename PrevOut, typename Out, typename In>
159class SyncEachExecutor : public Executor<PrevOut, Out, In>
160{
161public:
162 SyncEachExecutor(SyncEachTask<Out, In> each, const ExecutorBasePtr &parent = ExecutorBasePtr());
163 void previousFutureReady();
164protected:
165 SyncEachTask<Out, In> mFunc;
166};
167
137} // namespace Private 168} // namespace Private
138 169
139/** 170/**
@@ -238,6 +269,9 @@ class Job : public JobBase
238 template<typename OutOther> 269 template<typename OutOther>
239 friend Job<OutOther> start(Async::ThenTask<OutOther> func); 270 friend Job<OutOther> start(Async::ThenTask<OutOther> func);
240 271
272 template<typename OutOther>
273 friend Job<OutOther> start(Async::SyncThenTask<OutOther> func);
274
241public: 275public:
242 template<typename OutOther, typename ... InOther> 276 template<typename OutOther, typename ... InOther>
243 Job<OutOther, InOther ...> then(ThenTask<OutOther, InOther ...> func, ErrorHandler errorFunc = ErrorHandler()) 277 Job<OutOther, InOther ...> then(ThenTask<OutOther, InOther ...> func, ErrorHandler errorFunc = ErrorHandler())
@@ -246,28 +280,45 @@ public:
246 new Private::ThenExecutor<OutOther, InOther ...>(func, errorFunc, mExecutor))); 280 new Private::ThenExecutor<OutOther, InOther ...>(func, errorFunc, mExecutor)));
247 } 281 }
248 282
283 template<typename OutOther, typename ... InOther>
284 Job<OutOther, InOther ...> then(SyncThenTask<OutOther, InOther ...> func, ErrorHandler errorFunc = ErrorHandler())
285 {
286 return Job<OutOther, InOther ...>(Private::ExecutorBasePtr(
287 new Private::SyncThenExecutor<OutOther, InOther ...>(func, errorFunc, mExecutor)));
288 }
289
249 template<typename OutOther, typename InOther> 290 template<typename OutOther, typename InOther>
250 Job<OutOther, InOther> each(EachTask<OutOther, InOther> func) 291 Job<OutOther, InOther> each(EachTask<OutOther, InOther> func)
251 { 292 {
252 static_assert(detail::isIterable<Out>::value, 293 eachInvariants<OutOther>();
253 "The 'Each' task can only be connected to a job that returns a list or an array.");
254 static_assert(detail::isIterable<OutOther>::value,
255 "The result type of 'Each' task must be a list or an array.");
256 return Job<OutOther, InOther>(Private::ExecutorBasePtr( 294 return Job<OutOther, InOther>(Private::ExecutorBasePtr(
257 new Private::EachExecutor<Out, OutOther, InOther>(func, mExecutor))); 295 new Private::EachExecutor<Out, OutOther, InOther>(func, mExecutor)));
258 } 296 }
259 297
260 template<typename OutOther, typename InOther> 298 template<typename OutOther, typename InOther>
299 Job<OutOther, InOther> each(SyncEachTask<OutOther, InOther> func)
300 {
301 eachInvariants<OutOther>();
302 return Job<OutOther, InOther>(Private::ExecutorBasePtr(
303 new Private::SyncEachExecutor<Out, OutOther, InOther>(func, mExecutor)));
304 }
305
306 template<typename OutOther, typename InOther>
261 Job<OutOther, InOther> reduce(ReduceTask<OutOther, InOther> func) 307 Job<OutOther, InOther> reduce(ReduceTask<OutOther, InOther> func)
262 { 308 {
263 static_assert(Async::detail::isIterable<Out>::value, 309 reduceInvariants<InOther>();
264 "The 'Result' task can only be connected to a job that returns a list or an array");
265 static_assert(std::is_same<typename Out::value_type, typename InOther::value_type>::value,
266 "The return type of previous task must be compatible with input type of this task");
267 return Job<OutOther, InOther>(Private::ExecutorBasePtr( 310 return Job<OutOther, InOther>(Private::ExecutorBasePtr(
268 new Private::ReduceExecutor<OutOther, InOther>(func, mExecutor))); 311 new Private::ReduceExecutor<OutOther, InOther>(func, mExecutor)));
269 } 312 }
270 313
314 template<typename OutOther, typename InOther>
315 Job<OutOther, InOther> reduce(SyncReduceTask<OutOther, InOther> func)
316 {
317 reduceInvariants<InOther>();
318 return Job<OutOther, InOther>(Private::ExecutorBasePtr(
319 new Private::SyncReduceExecutor<OutOther, InOther>(func, mExecutor)));
320 }
321
271 Async::Future<Out> exec() 322 Async::Future<Out> exec()
272 { 323 {
273 mExecutor->exec(); 324 mExecutor->exec();
@@ -283,6 +334,24 @@ private:
283 Job(Private::ExecutorBasePtr executor) 334 Job(Private::ExecutorBasePtr executor)
284 : JobBase(executor) 335 : JobBase(executor)
285 {} 336 {}
337
338 template<typename OutOther>
339 void eachInvariants()
340 {
341 static_assert(detail::isIterable<Out>::value,
342 "The 'Each' task can only be connected to a job that returns a list or an array.");
343 static_assert(detail::isIterable<OutOther>::value,
344 "The result type of 'Each' task must be a list or an array.");
345 }
346
347 template<typename InOther>
348 void reduceInvariants()
349 {
350 static_assert(Async::detail::isIterable<Out>::value,
351 "The 'Result' task can only be connected to a job that returns a list or an array");
352 static_assert(std::is_same<typename Out::value_type, typename InOther::value_type>::value,
353 "The return type of previous task must be compatible with input type of this task");
354 }
286}; 355};
287 356
288} // namespace Async 357} // namespace Async
@@ -298,6 +367,12 @@ Job<Out> start(ThenTask<Out> func)
298 return Job<Out>(Private::ExecutorBasePtr(new Private::ThenExecutor<Out>(func))); 367 return Job<Out>(Private::ExecutorBasePtr(new Private::ThenExecutor<Out>(func)));
299} 368}
300 369
370template<typename Out>
371Job<Out> start(SyncThenTask<Out> func)
372{
373 return Job<Out>(Private::ExecutorBasePtr(new Private::SyncThenExecutor<Out>(func)));
374}
375
301namespace Private { 376namespace Private {
302 377
303template<typename PrevOut, typename Out, typename ... In> 378template<typename PrevOut, typename Out, typename ... In>
@@ -378,22 +453,23 @@ void EachExecutor<PrevOut, Out, In>::previousFutureReady()
378 } 453 }
379 454
380 for (auto arg : this->mPrevFuture->value()) { 455 for (auto arg : this->mPrevFuture->value()) {
381 Async::Future<Out> future; 456 auto future = new Async::Future<Out>;
382 this->mFunc(arg, future); 457 this->mFunc(arg, *future);
383 auto fw = new Async::FutureWatcher<Out>(); 458 auto fw = new Async::FutureWatcher<Out>();
384 mFutureWatchers.append(fw); 459 mFutureWatchers.append(fw);
385 QObject::connect(fw, &Async::FutureWatcher<Out>::futureReady, 460 QObject::connect(fw, &Async::FutureWatcher<Out>::futureReady,
386 [out, future, fw, this]() { 461 [out, future, fw, this]() {
387 assert(future.isFinished()); 462 assert(future->isFinished());
388 const int index = mFutureWatchers.indexOf(fw); 463 const int index = mFutureWatchers.indexOf(fw);
389 assert(index > -1); 464 assert(index > -1);
390 mFutureWatchers.removeAt(index); 465 mFutureWatchers.removeAt(index);
391 out->setValue(out->value() + future.value()); 466 out->setValue(out->value() + future->value());
467 delete future;
392 if (mFutureWatchers.isEmpty()) { 468 if (mFutureWatchers.isEmpty()) {
393 out->setFinished(); 469 out->setFinished();
394 } 470 }
395 }); 471 });
396 fw->setFuture(future); 472 fw->setFuture(*future);
397 } 473 }
398} 474}
399 475
@@ -403,6 +479,68 @@ ReduceExecutor<Out, In>::ReduceExecutor(ReduceTask<Out, In> reduce, const Execut
403{ 479{
404} 480}
405 481
482template<typename Out, typename ... In>
483SyncThenExecutor<Out, In ...>::SyncThenExecutor(SyncThenTask<Out, In ...> then, ErrorHandler errorHandler, const ExecutorBasePtr &parent)
484 : Executor<typename PreviousOut<In ...>::type, Out, In ...>(parent)
485{
486 this->mFunc = then;
487 this->mErrorFunc = errorHandler;
488}
489
490template<typename Out, typename ... In>
491void SyncThenExecutor<Out, In ...>::previousFutureReady()
492{
493 if (this->mPrevFuture) {
494 assert(this->mPrevFuture->isFinished());
495 }
496
497 if (this->mPrevFuture && this->mPrevFuture->errorCode()) {
498 if (this->mErrorFunc) {
499 this->mErrorFunc(this->mPrevFuture->errorCode(), this->mPrevFuture->errorMessage());
500 this->mResult->setFinished();
501 } else {
502 static_cast<Async::Future<Out>*>(this->mResult)->setError(this->mPrevFuture->errorCode(), this->mPrevFuture->errorMessage());
503 //propagate error if no error handler is available
504 Out result = this->mFunc(this->mPrevFuture ? this->mPrevFuture->value() : In() ...);
505 static_cast<Async::Future<Out>*>(this->mResult)->setValue(result);
506 this->mResult->setFinished();
507 }
508 } else {
509 Out result = this->mFunc(this->mPrevFuture ? this->mPrevFuture->value() : In() ...);
510 static_cast<Async::Future<Out>*>(this->mResult)->setValue(result);
511 this->mResult->setFinished();
512 }
513}
514
515template<typename PrevOut, typename Out, typename In>
516SyncEachExecutor<PrevOut, Out, In>::SyncEachExecutor(SyncEachTask<Out, In> each, const ExecutorBasePtr &parent)
517 : Executor<PrevOut, Out, In>(parent)
518{
519 this->mFunc = each;
520}
521
522template<typename PrevOut, typename Out, typename In>
523void SyncEachExecutor<PrevOut, Out, In>::previousFutureReady()
524{
525 assert(this->mPrevFuture->isFinished());
526 auto out = static_cast<Async::Future<Out>*>(this->mResult);
527 if (this->mPrevFuture->value().isEmpty()) {
528 out->setFinished();
529 return;
530 }
531
532 for (auto arg : this->mPrevFuture->value()) {
533 out->setValue(out->value() + this->mFunc(arg));
534 }
535 out->setFinished();
536}
537
538template<typename Out, typename In>
539SyncReduceExecutor<Out, In>::SyncReduceExecutor(SyncReduceTask<Out, In> reduce, const ExecutorBasePtr &parent)
540 : SyncThenExecutor<Out, In>(reduce, ErrorHandler(), parent)
541{
542}
543
406 544
407} // namespace Private 545} // namespace Private
408 546