summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDan Vrátil <dvratil@redhat.com>2015-02-20 13:58:03 +0100
committerDan Vrátil <dvratil@redhat.com>2015-02-20 14:00:49 +0100
commit8e6f41f851ae058dea63fbc9b9f523ec9fd1a4fb (patch)
treecbe403f4aaa9498389ab0ddba6af611b99dac98c
parent1da08ebfe267313015c201fd1106f891af554e14 (diff)
downloadsink-8e6f41f851ae058dea63fbc9b9f523ec9fd1a4fb.tar.gz
sink-8e6f41f851ae058dea63fbc9b9f523ec9fd1a4fb.zip
Async: allow appending existing Job objects to the Job chain
Now it's possible to do something like Job<int, int> job = createSomeJob(); auto main = Async::start<int>(....).then(job); Previously the 'job' would have to be wrapped in a ThenTask-like lambda (which is what we still do internally), but with this new syntax it's possible to append another job chain to existing chain easilly. This syntax is available for all task types.
-rw-r--r--async/autotests/asynctest.cpp201
-rw-r--r--async/src/async.h35
-rw-r--r--async/src/async_impl.h15
-rw-r--r--common/clientapi.h9
4 files changed, 181 insertions, 79 deletions
diff --git a/async/autotests/asynctest.cpp b/async/autotests/asynctest.cpp
index ed550ca..73026bb 100644
--- a/async/autotests/asynctest.cpp
+++ b/async/autotests/asynctest.cpp
@@ -47,12 +47,46 @@ private Q_SLOTS:
47 47
48 void testAsyncThen(); 48 void testAsyncThen();
49 void testSyncThen(); 49 void testSyncThen();
50 void testJoinedThen();
51
50 void testAsyncEach(); 52 void testAsyncEach();
51 void testSyncEach(); 53 void testSyncEach();
54 void testJoinedEach();
55
52 void testAsyncReduce(); 56 void testAsyncReduce();
53 void testSyncReduce(); 57 void testSyncReduce();
58 void testJoinedReduce();
59
54 void testErrorHandler(); 60 void testErrorHandler();
55 61
62 void benchmarkSyncThenExecutor();
63
64private:
65 template<typename T>
66 class AsyncSimulator {
67 public:
68 AsyncSimulator(Async::Future<T> &future, const T &result)
69 : mFuture(future)
70 , mResult(result)
71 {
72 QObject::connect(&mTimer, &QTimer::timeout,
73 [this]() {
74 mFuture.setValue(mResult);
75 mFuture.setFinished();
76 });
77 QObject::connect(&mTimer, &QTimer::timeout,
78 [this]() {
79 delete this;
80 });
81 mTimer.setSingleShot(true);
82 mTimer.start(200);
83 }
84
85 private:
86 Async::Future<T> mFuture;
87 T mResult;
88 QTimer mTimer;
89 };
56}; 90};
57 91
58 92
@@ -86,16 +120,7 @@ void AsyncTest::testAsyncPromises()
86{ 120{
87 auto job = Async::start<int>( 121 auto job = Async::start<int>(
88 [](Async::Future<int> &future) { 122 [](Async::Future<int> &future) {
89 QTimer *timer = new QTimer(); 123 new AsyncSimulator<int>(future, 42);
90 QObject::connect(timer, &QTimer::timeout,
91 [&]() {
92 future.setValue(42);
93 future.setFinished();
94 });
95 QObject::connect(timer, &QTimer::timeout,
96 timer, &QObject::deleteLater);
97 timer->setSingleShot(true);
98 timer->start(200);
99 }); 124 });
100 125
101 Async::Future<int> future = job.exec(); 126 Async::Future<int> future = job.exec();
@@ -110,16 +135,7 @@ void AsyncTest::testAsyncPromises2()
110 135
111 auto job = Async::start<int>( 136 auto job = Async::start<int>(
112 [](Async::Future<int> &future) { 137 [](Async::Future<int> &future) {
113 QTimer *timer = new QTimer(); 138 new AsyncSimulator<int>(future, 42);
114 QObject::connect(timer, &QTimer::timeout,
115 [&]() {
116 future.setValue(42);
117 future.setFinished();
118 });
119 QObject::connect(timer, &QTimer::timeout,
120 timer, &QObject::deleteLater);
121 timer->setSingleShot(true);
122 timer->start(200);
123 } 139 }
124 ).then<int, int>([&done](int result, Async::Future<int> &future) { 140 ).then<int, int>([&done](int result, Async::Future<int> &future) {
125 done = true; 141 done = true;
@@ -139,16 +155,7 @@ void AsyncTest::testNestedAsync()
139 auto job = Async::start<int>( 155 auto job = Async::start<int>(
140 [](Async::Future<int> &future) { 156 [](Async::Future<int> &future) {
141 auto innerJob = Async::start<int>([](Async::Future<int> &innerFuture) { 157 auto innerJob = Async::start<int>([](Async::Future<int> &innerFuture) {
142 QTimer *timer = new QTimer(); 158 new AsyncSimulator<int>(innerFuture, 42);
143 QObject::connect(timer, &QTimer::timeout,
144 [&]() {
145 innerFuture.setValue(42);
146 innerFuture.setFinished();
147 });
148 QObject::connect(timer, &QTimer::timeout,
149 timer, &QObject::deleteLater);
150 timer->setSingleShot(true);
151 timer->start(0);
152 }).then<void>([&future](Async::Future<void> &innerThenFuture) { 159 }).then<void>([&future](Async::Future<void> &innerThenFuture) {
153 future.setFinished(); 160 future.setFinished();
154 innerThenFuture.setFinished(); 161 innerThenFuture.setFinished();
@@ -186,16 +193,7 @@ void AsyncTest::testAsyncThen()
186{ 193{
187 auto job = Async::start<int>( 194 auto job = Async::start<int>(
188 [](Async::Future<int> &future) { 195 [](Async::Future<int> &future) {
189 QTimer *timer = new QTimer; 196 new AsyncSimulator<int>(future, 42);
190 QObject::connect(timer, &QTimer::timeout,
191 [&]() {
192 future.setValue(42);
193 future.setFinished();
194 });
195 QObject::connect(timer, &QTimer::timeout,
196 timer, &QObject::deleteLater);
197 timer->setSingleShot(true);
198 timer->start(0);
199 }); 197 });
200 198
201 auto future = job.exec(); 199 auto future = job.exec();
@@ -221,33 +219,37 @@ void AsyncTest::testSyncThen()
221 QCOMPARE(future.value(), 84); 219 QCOMPARE(future.value(), 84);
222} 220}
223 221
222void AsyncTest::testJoinedThen()
223{
224 auto job1 = Async::start<int, int>(
225 [](int in, Async::Future<int> &future) {
226 new AsyncSimulator<int>(future, in * 2);
227 });
228
229 auto job2 = Async::start<int>(
230 [](Async::Future<int> &future) {
231 new AsyncSimulator<int>(future, 42);
232 })
233 .then<int>(job1);
234
235 auto future = job2.exec();
236 future.waitForFinished();
237
238 QVERIFY(future.isFinished());
239 QCOMPARE(future.value(), 84);
240}
241
242
243
224void AsyncTest::testAsyncEach() 244void AsyncTest::testAsyncEach()
225{ 245{
226 auto job = Async::start<QList<int>>( 246 auto job = Async::start<QList<int>>(
227 [](Async::Future<QList<int>> &future) { 247 [](Async::Future<QList<int>> &future) {
228 QTimer *timer = new QTimer; 248 new AsyncSimulator<QList<int>>(future, { 1, 2, 3, 4 });
229 QObject::connect(timer, &QTimer::timeout,
230 [&future]() {
231 future.setValue({ 1, 2, 3, 4 });
232 future.setFinished();
233 });
234 QObject::connect(timer, &QTimer::timeout,
235 timer, &QObject::deleteLater);
236 timer->setSingleShot(true);
237 timer->start(0);
238 }) 249 })
239 .each<QList<int>, int>( 250 .each<QList<int>, int>(
240 [](const int &v, Async::Future<QList<int>> &future) { 251 [](const int &v, Async::Future<QList<int>> &future) {
241 QTimer *timer = new QTimer; 252 new AsyncSimulator<QList<int>>(future, { v + 1 });
242 QObject::connect(timer, &QTimer::timeout,
243 [v, &future]() {
244 future.setValue({ v + 1 });
245 future.setFinished();
246 });
247 QObject::connect(timer, &QTimer::timeout,
248 timer, &QObject::deleteLater);
249 timer->setSingleShot(true);
250 timer->start(0);
251 }); 253 });
252 254
253 auto future = job.exec(); 255 auto future = job.exec();
@@ -258,7 +260,6 @@ void AsyncTest::testAsyncEach()
258 QCOMPARE(future.value(), expected); 260 QCOMPARE(future.value(), expected);
259} 261}
260 262
261
262void AsyncTest::testSyncEach() 263void AsyncTest::testSyncEach()
263{ 264{
264 auto job = Async::start<QList<int>>( 265 auto job = Async::start<QList<int>>(
@@ -277,20 +278,35 @@ void AsyncTest::testSyncEach()
277 QCOMPARE(future.value(), expected); 278 QCOMPARE(future.value(), expected);
278} 279}
279 280
281void AsyncTest::testJoinedEach()
282{
283 auto job1 = Async::start<QList<int>, int>(
284 [](int v, Async::Future<QList<int>> &future) {
285 new AsyncSimulator<QList<int>>(future, { v * 2 });
286 });
287
288 auto job = Async::start<QList<int>>(
289 []() -> QList<int> {
290 return { 1, 2, 3, 4 };
291 })
292 .each(job1);
293
294 auto future = job.exec();
295 future.waitForFinished();
296
297 const QList<int> expected({ 2, 4, 6, 8 });
298 QVERIFY(future.isFinished());
299 QCOMPARE(future.value(), expected);
300}
301
302
303
304
280void AsyncTest::testAsyncReduce() 305void AsyncTest::testAsyncReduce()
281{ 306{
282 auto job = Async::start<QList<int>>( 307 auto job = Async::start<QList<int>>(
283 [](Async::Future<QList<int>> &future) { 308 [](Async::Future<QList<int>> &future) {
284 QTimer *timer = new QTimer(); 309 new AsyncSimulator<QList<int>>(future, { 1, 2, 3, 4 });
285 QObject::connect(timer, &QTimer::timeout,
286 [&future]() {
287 future.setValue({ 1, 2, 3, 4 });
288 future.setFinished();
289 });
290 QObject::connect(timer, &QTimer::timeout,
291 timer, &QObject::deleteLater);
292 timer->setSingleShot(true);
293 timer->start(0);
294 }) 310 })
295 .reduce<int, QList<int>>( 311 .reduce<int, QList<int>>(
296 [](const QList<int> &list, Async::Future<int> &future) { 312 [](const QList<int> &list, Async::Future<int> &future) {
@@ -334,6 +350,32 @@ void AsyncTest::testSyncReduce()
334 QCOMPARE(future.value(), 10); 350 QCOMPARE(future.value(), 10);
335} 351}
336 352
353
354void AsyncTest::testJoinedReduce()
355{
356 auto job1 = Async::start<int, QList<int>>(
357 [](const QList<int> &list, Async::Future<int> &future) {
358 int sum = 0;
359 for (int i : list) sum += i;
360 new AsyncSimulator<int>(future, sum);
361 });
362
363 auto job = Async::start<QList<int>>(
364 []() -> QList<int> {
365 return { 1, 2, 3, 4 };
366 })
367 .reduce(job1);
368
369 auto future = job.exec();
370 future.waitForFinished();
371
372 QVERIFY(future.isFinished());
373 QCOMPARE(future.value(), 10);
374}
375
376
377
378
337void AsyncTest::testErrorHandler() 379void AsyncTest::testErrorHandler()
338{ 380{
339 int error = 0; 381 int error = 0;
@@ -356,6 +398,23 @@ void AsyncTest::testErrorHandler()
356} 398}
357 399
358 400
401
402
403
404void AsyncTest::benchmarkSyncThenExecutor()
405{
406 auto job = Async::start<int>(
407 []() -> int {
408 return 0;
409 });
410
411 QBENCHMARK {
412 job.exec();
413 }
414}
415
416
417
359QTEST_MAIN(AsyncTest); 418QTEST_MAIN(AsyncTest);
360 419
361#include "asynctest.moc" 420#include "asynctest.moc"
diff --git a/async/src/async.h b/async/src/async.h
index 386722a..d21caf8 100644
--- a/async/src/async.h
+++ b/async/src/async.h
@@ -291,6 +291,12 @@ public:
291 new Private::SyncThenExecutor<OutOther, InOther ...>(func, errorFunc, mExecutor))); 291 new Private::SyncThenExecutor<OutOther, InOther ...>(func, errorFunc, mExecutor)));
292 } 292 }
293 293
294 template<typename OutOther, typename ... InOther>
295 Job<OutOther, InOther ...> then(Job<OutOther, InOther ...> otherJob, ErrorHandler errorFunc = ErrorHandler())
296 {
297 return then<OutOther, InOther ...>(nestedJobWrapper<OutOther, InOther ...>(otherJob), errorFunc);
298 }
299
294 template<typename OutOther, typename InOther> 300 template<typename OutOther, typename InOther>
295 Job<OutOther, InOther> each(EachTask<OutOther, InOther> func, ErrorHandler errorFunc = ErrorHandler()) 301 Job<OutOther, InOther> each(EachTask<OutOther, InOther> func, ErrorHandler errorFunc = ErrorHandler())
296 { 302 {
@@ -308,6 +314,13 @@ public:
308 } 314 }
309 315
310 template<typename OutOther, typename InOther> 316 template<typename OutOther, typename InOther>
317 Job<OutOther, InOther> each(Job<OutOther, InOther> otherJob, ErrorHandler errorFunc = ErrorHandler())
318 {
319 eachInvariants<OutOther>();
320 return each<OutOther, InOther>(nestedJobWrapper<OutOther, InOther>(otherJob), errorFunc);
321 }
322
323 template<typename OutOther, typename InOther>
311 Job<OutOther, InOther> reduce(ReduceTask<OutOther, InOther> func, ErrorHandler errorFunc = ErrorHandler()) 324 Job<OutOther, InOther> reduce(ReduceTask<OutOther, InOther> func, ErrorHandler errorFunc = ErrorHandler())
312 { 325 {
313 reduceInvariants<InOther>(); 326 reduceInvariants<InOther>();
@@ -323,6 +336,12 @@ public:
323 new Private::SyncReduceExecutor<OutOther, InOther>(func, errorFunc, mExecutor))); 336 new Private::SyncReduceExecutor<OutOther, InOther>(func, errorFunc, mExecutor)));
324 } 337 }
325 338
339 template<typename OutOther, typename InOther>
340 Job<OutOther, InOther> reduce(Job<OutOther, InOther> otherJob, ErrorHandler errorFunc = ErrorHandler())
341 {
342 return reduce<OutOther, InOther>(nestedJobWrapper<OutOther, InOther>(otherJob), errorFunc);
343 }
344
326 template<typename FirstIn> 345 template<typename FirstIn>
327 Async::Future<Out> exec(FirstIn in) 346 Async::Future<Out> exec(FirstIn in)
328 { 347 {
@@ -377,6 +396,22 @@ private:
377 static_assert(std::is_same<typename Out::value_type, typename InOther::value_type>::value, 396 static_assert(std::is_same<typename Out::value_type, typename InOther::value_type>::value,
378 "The return type of previous task must be compatible with input type of this task"); 397 "The return type of previous task must be compatible with input type of this task");
379 } 398 }
399
400 template<typename OutOther, typename ... InOther>
401 inline std::function<void(InOther ..., Async::Future<OutOther>&)> nestedJobWrapper(Job<OutOther, InOther ...> otherJob) {
402 return [otherJob](InOther ... in, Async::Future<OutOther> &future) {
403 // copy by value is const
404 auto job = otherJob;
405 FutureWatcher<OutOther> *watcher = new FutureWatcher<OutOther>();
406 QObject::connect(watcher, &FutureWatcherBase::futureReady,
407 [watcher, &future]() {
408 Async::detail::copyFutureValue(watcher->future(), future);
409 future.setFinished();
410 watcher->deleteLater();
411 });
412 watcher->setFuture(job.exec(in ...));
413 };
414 }
380}; 415};
381 416
382} // namespace Async 417} // namespace Async
diff --git a/async/src/async_impl.h b/async/src/async_impl.h
index 58f6ced..eccbc9b 100644
--- a/async/src/async_impl.h
+++ b/async/src/async_impl.h
@@ -19,6 +19,7 @@
19#define ASYNC_IMPL_H 19#define ASYNC_IMPL_H
20 20
21#include "async.h" 21#include "async.h"
22#include <type_traits>
22 23
23namespace Async { 24namespace Async {
24 25
@@ -45,6 +46,20 @@ struct prevOut {
45 using type = typename std::tuple_element<0, std::tuple<T ..., void>>::type; 46 using type = typename std::tuple_element<0, std::tuple<T ..., void>>::type;
46}; 47};
47 48
49template<typename T>
50inline typename std::enable_if<!std::is_void<T>::value, void>::type
51copyFutureValue(const Async::Future<T> &in, Async::Future<T> &out)
52{
53 out.setValue(in.value());
54}
55
56template<typename T>
57inline typename std::enable_if<std::is_void<T>::value, void>::type
58copyFutureValue(const Async::Future<T> &in, Async::Future<T> &out)
59{
60 // noop
61}
62
48} // namespace Detail 63} // namespace Detail
49 64
50} // namespace Async 65} // namespace Async
diff --git a/common/clientapi.h b/common/clientapi.h
index c1404da..aa47802 100644
--- a/common/clientapi.h
+++ b/common/clientapi.h
@@ -421,14 +421,7 @@ public:
421 //We have to bind an instance to the function callback. Since we use a shared pointer this keeps the result provider instance (and thus also the emitter) alive. 421 //We have to bind an instance to the function callback. Since we use a shared pointer this keeps the result provider instance (and thus also the emitter) alive.
422 std::function<void(const typename DomainType::Ptr &)> addCallback = std::bind(&ResultProvider<typename DomainType::Ptr>::add, resultSet, std::placeholders::_1); 422 std::function<void(const typename DomainType::Ptr &)> addCallback = std::bind(&ResultProvider<typename DomainType::Ptr>::add, resultSet, std::placeholders::_1);
423 //We copy the facade pointer to keep it alive 423 //We copy the facade pointer to keep it alive
424 //TODO JOBAPI: we should be able to just do, job = job.then(facade->load(..)) 424 job = job.then(facade->load(query, addCallback));
425 job = job.then<void>([facade, query, addCallback](Async::Future<void> &future) {
426 Async::Job<void> j = facade->load(query, addCallback);
427 j.then<void>([&future, facade](Async::Future<void> &f) {
428 future.setFinished();
429 f.setFinished();
430 }).exec();
431 });
432 } 425 }
433 job.then<void>([/* eventloop, */resultSet](Async::Future<void> &future) { 426 job.then<void>([/* eventloop, */resultSet](Async::Future<void> &future) {
434 qDebug() << "Query complete"; 427 qDebug() << "Query complete";