summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--async/autotests/asynctest.cpp201
-rw-r--r--async/src/async.h35
-rw-r--r--async/src/async_impl.h15
-rw-r--r--common/clientapi.h9
4 files changed, 181 insertions, 79 deletions
diff --git a/async/autotests/asynctest.cpp b/async/autotests/asynctest.cpp
index ed550ca..73026bb 100644
--- a/async/autotests/asynctest.cpp
+++ b/async/autotests/asynctest.cpp
@@ -47,12 +47,46 @@ private Q_SLOTS:
47 47
48 void testAsyncThen(); 48 void testAsyncThen();
49 void testSyncThen(); 49 void testSyncThen();
50 void testJoinedThen();
51
50 void testAsyncEach(); 52 void testAsyncEach();
51 void testSyncEach(); 53 void testSyncEach();
54 void testJoinedEach();
55
52 void testAsyncReduce(); 56 void testAsyncReduce();
53 void testSyncReduce(); 57 void testSyncReduce();
58 void testJoinedReduce();
59
54 void testErrorHandler(); 60 void testErrorHandler();
55 61
62 void benchmarkSyncThenExecutor();
63
64private:
65 template<typename T>
66 class AsyncSimulator {
67 public:
68 AsyncSimulator(Async::Future<T> &future, const T &result)
69 : mFuture(future)
70 , mResult(result)
71 {
72 QObject::connect(&mTimer, &QTimer::timeout,
73 [this]() {
74 mFuture.setValue(mResult);
75 mFuture.setFinished();
76 });
77 QObject::connect(&mTimer, &QTimer::timeout,
78 [this]() {
79 delete this;
80 });
81 mTimer.setSingleShot(true);
82 mTimer.start(200);
83 }
84
85 private:
86 Async::Future<T> mFuture;
87 T mResult;
88 QTimer mTimer;
89 };
56}; 90};
57 91
58 92
@@ -86,16 +120,7 @@ void AsyncTest::testAsyncPromises()
86{ 120{
87 auto job = Async::start<int>( 121 auto job = Async::start<int>(
88 [](Async::Future<int> &future) { 122 [](Async::Future<int> &future) {
89 QTimer *timer = new QTimer(); 123 new AsyncSimulator<int>(future, 42);
90 QObject::connect(timer, &QTimer::timeout,
91 [&]() {
92 future.setValue(42);
93 future.setFinished();
94 });
95 QObject::connect(timer, &QTimer::timeout,
96 timer, &QObject::deleteLater);
97 timer->setSingleShot(true);
98 timer->start(200);
99 }); 124 });
100 125
101 Async::Future<int> future = job.exec(); 126 Async::Future<int> future = job.exec();
@@ -110,16 +135,7 @@ void AsyncTest::testAsyncPromises2()
110 135
111 auto job = Async::start<int>( 136 auto job = Async::start<int>(
112 [](Async::Future<int> &future) { 137 [](Async::Future<int> &future) {
113 QTimer *timer = new QTimer(); 138 new AsyncSimulator<int>(future, 42);
114 QObject::connect(timer, &QTimer::timeout,
115 [&]() {
116 future.setValue(42);
117 future.setFinished();
118 });
119 QObject::connect(timer, &QTimer::timeout,
120 timer, &QObject::deleteLater);
121 timer->setSingleShot(true);
122 timer->start(200);
123 } 139 }
124 ).then<int, int>([&done](int result, Async::Future<int> &future) { 140 ).then<int, int>([&done](int result, Async::Future<int> &future) {
125 done = true; 141 done = true;
@@ -139,16 +155,7 @@ void AsyncTest::testNestedAsync()
139 auto job = Async::start<int>( 155 auto job = Async::start<int>(
140 [](Async::Future<int> &future) { 156 [](Async::Future<int> &future) {
141 auto innerJob = Async::start<int>([](Async::Future<int> &innerFuture) { 157 auto innerJob = Async::start<int>([](Async::Future<int> &innerFuture) {
142 QTimer *timer = new QTimer(); 158 new AsyncSimulator<int>(innerFuture, 42);
143 QObject::connect(timer, &QTimer::timeout,
144 [&]() {
145 innerFuture.setValue(42);
146 innerFuture.setFinished();
147 });
148 QObject::connect(timer, &QTimer::timeout,
149 timer, &QObject::deleteLater);
150 timer->setSingleShot(true);
151 timer->start(0);
152 }).then<void>([&future](Async::Future<void> &innerThenFuture) { 159 }).then<void>([&future](Async::Future<void> &innerThenFuture) {
153 future.setFinished(); 160 future.setFinished();
154 innerThenFuture.setFinished(); 161 innerThenFuture.setFinished();
@@ -186,16 +193,7 @@ void AsyncTest::testAsyncThen()
186{ 193{
187 auto job = Async::start<int>( 194 auto job = Async::start<int>(
188 [](Async::Future<int> &future) { 195 [](Async::Future<int> &future) {
189 QTimer *timer = new QTimer; 196 new AsyncSimulator<int>(future, 42);
190 QObject::connect(timer, &QTimer::timeout,
191 [&]() {
192 future.setValue(42);
193 future.setFinished();
194 });
195 QObject::connect(timer, &QTimer::timeout,
196 timer, &QObject::deleteLater);
197 timer->setSingleShot(true);
198 timer->start(0);
199 }); 197 });
200 198
201 auto future = job.exec(); 199 auto future = job.exec();
@@ -221,33 +219,37 @@ void AsyncTest::testSyncThen()
221 QCOMPARE(future.value(), 84); 219 QCOMPARE(future.value(), 84);
222} 220}
223 221
222void AsyncTest::testJoinedThen()
223{
224 auto job1 = Async::start<int, int>(
225 [](int in, Async::Future<int> &future) {
226 new AsyncSimulator<int>(future, in * 2);
227 });
228
229 auto job2 = Async::start<int>(
230 [](Async::Future<int> &future) {
231 new AsyncSimulator<int>(future, 42);
232 })
233 .then<int>(job1);
234
235 auto future = job2.exec();
236 future.waitForFinished();
237
238 QVERIFY(future.isFinished());
239 QCOMPARE(future.value(), 84);
240}
241
242
243
224void AsyncTest::testAsyncEach() 244void AsyncTest::testAsyncEach()
225{ 245{
226 auto job = Async::start<QList<int>>( 246 auto job = Async::start<QList<int>>(
227 [](Async::Future<QList<int>> &future) { 247 [](Async::Future<QList<int>> &future) {
228 QTimer *timer = new QTimer; 248 new AsyncSimulator<QList<int>>(future, { 1, 2, 3, 4 });
229 QObject::connect(timer, &QTimer::timeout,
230 [&future]() {
231 future.setValue({ 1, 2, 3, 4 });
232 future.setFinished();
233 });
234 QObject::connect(timer, &QTimer::timeout,
235 timer, &QObject::deleteLater);
236 timer->setSingleShot(true);
237 timer->start(0);
238 }) 249 })
239 .each<QList<int>, int>( 250 .each<QList<int>, int>(
240 [](const int &v, Async::Future<QList<int>> &future) { 251 [](const int &v, Async::Future<QList<int>> &future) {
241 QTimer *timer = new QTimer; 252 new AsyncSimulator<QList<int>>(future, { v + 1 });
242 QObject::connect(timer, &QTimer::timeout,
243 [v, &future]() {
244 future.setValue({ v + 1 });
245 future.setFinished();
246 });
247 QObject::connect(timer, &QTimer::timeout,
248 timer, &QObject::deleteLater);
249 timer->setSingleShot(true);
250 timer->start(0);
251 }); 253 });
252 254
253 auto future = job.exec(); 255 auto future = job.exec();
@@ -258,7 +260,6 @@ void AsyncTest::testAsyncEach()
258 QCOMPARE(future.value(), expected); 260 QCOMPARE(future.value(), expected);
259} 261}
260 262
261
262void AsyncTest::testSyncEach() 263void AsyncTest::testSyncEach()
263{ 264{
264 auto job = Async::start<QList<int>>( 265 auto job = Async::start<QList<int>>(
@@ -277,20 +278,35 @@ void AsyncTest::testSyncEach()
277 QCOMPARE(future.value(), expected); 278 QCOMPARE(future.value(), expected);
278} 279}
279 280
281void AsyncTest::testJoinedEach()
282{
283 auto job1 = Async::start<QList<int>, int>(
284 [](int v, Async::Future<QList<int>> &future) {
285 new AsyncSimulator<QList<int>>(future, { v * 2 });
286 });
287
288 auto job = Async::start<QList<int>>(
289 []() -> QList<int> {
290 return { 1, 2, 3, 4 };
291 })
292 .each(job1);
293
294 auto future = job.exec();
295 future.waitForFinished();
296
297 const QList<int> expected({ 2, 4, 6, 8 });
298 QVERIFY(future.isFinished());
299 QCOMPARE(future.value(), expected);
300}
301
302
303
304
280void AsyncTest::testAsyncReduce() 305void AsyncTest::testAsyncReduce()
281{ 306{
282 auto job = Async::start<QList<int>>( 307 auto job = Async::start<QList<int>>(
283 [](Async::Future<QList<int>> &future) { 308 [](Async::Future<QList<int>> &future) {
284 QTimer *timer = new QTimer(); 309 new AsyncSimulator<QList<int>>(future, { 1, 2, 3, 4 });
285 QObject::connect(timer, &QTimer::timeout,
286 [&future]() {
287 future.setValue({ 1, 2, 3, 4 });
288 future.setFinished();
289 });
290 QObject::connect(timer, &QTimer::timeout,
291 timer, &QObject::deleteLater);
292 timer->setSingleShot(true);
293 timer->start(0);
294 }) 310 })
295 .reduce<int, QList<int>>( 311 .reduce<int, QList<int>>(
296 [](const QList<int> &list, Async::Future<int> &future) { 312 [](const QList<int> &list, Async::Future<int> &future) {
@@ -334,6 +350,32 @@ void AsyncTest::testSyncReduce()
334 QCOMPARE(future.value(), 10); 350 QCOMPARE(future.value(), 10);
335} 351}
336 352
353
354void AsyncTest::testJoinedReduce()
355{
356 auto job1 = Async::start<int, QList<int>>(
357 [](const QList<int> &list, Async::Future<int> &future) {
358 int sum = 0;
359 for (int i : list) sum += i;
360 new AsyncSimulator<int>(future, sum);
361 });
362
363 auto job = Async::start<QList<int>>(
364 []() -> QList<int> {
365 return { 1, 2, 3, 4 };
366 })
367 .reduce(job1);
368
369 auto future = job.exec();
370 future.waitForFinished();
371
372 QVERIFY(future.isFinished());
373 QCOMPARE(future.value(), 10);
374}
375
376
377
378
337void AsyncTest::testErrorHandler() 379void AsyncTest::testErrorHandler()
338{ 380{
339 int error = 0; 381 int error = 0;
@@ -356,6 +398,23 @@ void AsyncTest::testErrorHandler()
356} 398}
357 399
358 400
401
402
403
404void AsyncTest::benchmarkSyncThenExecutor()
405{
406 auto job = Async::start<int>(
407 []() -> int {
408 return 0;
409 });
410
411 QBENCHMARK {
412 job.exec();
413 }
414}
415
416
417
359QTEST_MAIN(AsyncTest); 418QTEST_MAIN(AsyncTest);
360 419
361#include "asynctest.moc" 420#include "asynctest.moc"
diff --git a/async/src/async.h b/async/src/async.h
index 386722a..d21caf8 100644
--- a/async/src/async.h
+++ b/async/src/async.h
@@ -291,6 +291,12 @@ public:
291 new Private::SyncThenExecutor<OutOther, InOther ...>(func, errorFunc, mExecutor))); 291 new Private::SyncThenExecutor<OutOther, InOther ...>(func, errorFunc, mExecutor)));
292 } 292 }
293 293
294 template<typename OutOther, typename ... InOther>
295 Job<OutOther, InOther ...> then(Job<OutOther, InOther ...> otherJob, ErrorHandler errorFunc = ErrorHandler())
296 {
297 return then<OutOther, InOther ...>(nestedJobWrapper<OutOther, InOther ...>(otherJob), errorFunc);
298 }
299
294 template<typename OutOther, typename InOther> 300 template<typename OutOther, typename InOther>
295 Job<OutOther, InOther> each(EachTask<OutOther, InOther> func, ErrorHandler errorFunc = ErrorHandler()) 301 Job<OutOther, InOther> each(EachTask<OutOther, InOther> func, ErrorHandler errorFunc = ErrorHandler())
296 { 302 {
@@ -308,6 +314,13 @@ public:
308 } 314 }
309 315
310 template<typename OutOther, typename InOther> 316 template<typename OutOther, typename InOther>
317 Job<OutOther, InOther> each(Job<OutOther, InOther> otherJob, ErrorHandler errorFunc = ErrorHandler())
318 {
319 eachInvariants<OutOther>();
320 return each<OutOther, InOther>(nestedJobWrapper<OutOther, InOther>(otherJob), errorFunc);
321 }
322
323 template<typename OutOther, typename InOther>
311 Job<OutOther, InOther> reduce(ReduceTask<OutOther, InOther> func, ErrorHandler errorFunc = ErrorHandler()) 324 Job<OutOther, InOther> reduce(ReduceTask<OutOther, InOther> func, ErrorHandler errorFunc = ErrorHandler())
312 { 325 {
313 reduceInvariants<InOther>(); 326 reduceInvariants<InOther>();
@@ -323,6 +336,12 @@ public:
323 new Private::SyncReduceExecutor<OutOther, InOther>(func, errorFunc, mExecutor))); 336 new Private::SyncReduceExecutor<OutOther, InOther>(func, errorFunc, mExecutor)));
324 } 337 }
325 338
339 template<typename OutOther, typename InOther>
340 Job<OutOther, InOther> reduce(Job<OutOther, InOther> otherJob, ErrorHandler errorFunc = ErrorHandler())
341 {
342 return reduce<OutOther, InOther>(nestedJobWrapper<OutOther, InOther>(otherJob), errorFunc);
343 }
344
326 template<typename FirstIn> 345 template<typename FirstIn>
327 Async::Future<Out> exec(FirstIn in) 346 Async::Future<Out> exec(FirstIn in)
328 { 347 {
@@ -377,6 +396,22 @@ private:
377 static_assert(std::is_same<typename Out::value_type, typename InOther::value_type>::value, 396 static_assert(std::is_same<typename Out::value_type, typename InOther::value_type>::value,
378 "The return type of previous task must be compatible with input type of this task"); 397 "The return type of previous task must be compatible with input type of this task");
379 } 398 }
399
400 template<typename OutOther, typename ... InOther>
401 inline std::function<void(InOther ..., Async::Future<OutOther>&)> nestedJobWrapper(Job<OutOther, InOther ...> otherJob) {
402 return [otherJob](InOther ... in, Async::Future<OutOther> &future) {
403 // copy by value is const
404 auto job = otherJob;
405 FutureWatcher<OutOther> *watcher = new FutureWatcher<OutOther>();
406 QObject::connect(watcher, &FutureWatcherBase::futureReady,
407 [watcher, &future]() {
408 Async::detail::copyFutureValue(watcher->future(), future);
409 future.setFinished();
410 watcher->deleteLater();
411 });
412 watcher->setFuture(job.exec(in ...));
413 };
414 }
380}; 415};
381 416
382} // namespace Async 417} // namespace Async
diff --git a/async/src/async_impl.h b/async/src/async_impl.h
index 58f6ced..eccbc9b 100644
--- a/async/src/async_impl.h
+++ b/async/src/async_impl.h
@@ -19,6 +19,7 @@
19#define ASYNC_IMPL_H 19#define ASYNC_IMPL_H
20 20
21#include "async.h" 21#include "async.h"
22#include <type_traits>
22 23
23namespace Async { 24namespace Async {
24 25
@@ -45,6 +46,20 @@ struct prevOut {
45 using type = typename std::tuple_element<0, std::tuple<T ..., void>>::type; 46 using type = typename std::tuple_element<0, std::tuple<T ..., void>>::type;
46}; 47};
47 48
49template<typename T>
50inline typename std::enable_if<!std::is_void<T>::value, void>::type
51copyFutureValue(const Async::Future<T> &in, Async::Future<T> &out)
52{
53 out.setValue(in.value());
54}
55
56template<typename T>
57inline typename std::enable_if<std::is_void<T>::value, void>::type
58copyFutureValue(const Async::Future<T> &in, Async::Future<T> &out)
59{
60 // noop
61}
62
48} // namespace Detail 63} // namespace Detail
49 64
50} // namespace Async 65} // namespace Async
diff --git a/common/clientapi.h b/common/clientapi.h
index c1404da..aa47802 100644
--- a/common/clientapi.h
+++ b/common/clientapi.h
@@ -421,14 +421,7 @@ public:
421 //We have to bind an instance to the function callback. Since we use a shared pointer this keeps the result provider instance (and thus also the emitter) alive. 421 //We have to bind an instance to the function callback. Since we use a shared pointer this keeps the result provider instance (and thus also the emitter) alive.
422 std::function<void(const typename DomainType::Ptr &)> addCallback = std::bind(&ResultProvider<typename DomainType::Ptr>::add, resultSet, std::placeholders::_1); 422 std::function<void(const typename DomainType::Ptr &)> addCallback = std::bind(&ResultProvider<typename DomainType::Ptr>::add, resultSet, std::placeholders::_1);
423 //We copy the facade pointer to keep it alive 423 //We copy the facade pointer to keep it alive
424 //TODO JOBAPI: we should be able to just do, job = job.then(facade->load(..)) 424 job = job.then(facade->load(query, addCallback));
425 job = job.then<void>([facade, query, addCallback](Async::Future<void> &future) {
426 Async::Job<void> j = facade->load(query, addCallback);
427 j.then<void>([&future, facade](Async::Future<void> &f) {
428 future.setFinished();
429 f.setFinished();
430 }).exec();
431 });
432 } 425 }
433 job.then<void>([/* eventloop, */resultSet](Async::Future<void> &future) { 426 job.then<void>([/* eventloop, */resultSet](Async::Future<void> &future) {
434 qDebug() << "Query complete"; 427 qDebug() << "Query complete";