diff options
author | Dan Vrátil <dvratil@redhat.com> | 2015-02-20 13:58:03 +0100 |
---|---|---|
committer | Dan Vrátil <dvratil@redhat.com> | 2015-02-20 14:00:49 +0100 |
commit | 8e6f41f851ae058dea63fbc9b9f523ec9fd1a4fb (patch) | |
tree | cbe403f4aaa9498389ab0ddba6af611b99dac98c | |
parent | 1da08ebfe267313015c201fd1106f891af554e14 (diff) | |
download | sink-8e6f41f851ae058dea63fbc9b9f523ec9fd1a4fb.tar.gz sink-8e6f41f851ae058dea63fbc9b9f523ec9fd1a4fb.zip |
Async: allow appending existing Job objects to the Job chain
Now it's possible to do something like
Job<int, int> job = createSomeJob();
auto main = Async::start<int>(....).then(job);
Previously the 'job' would have to be wrapped in a ThenTask-like lambda (which
is what we still do internally), but with this new syntax it's possible to append
another job chain to existing chain easilly. This syntax is available for all
task types.
-rw-r--r-- | async/autotests/asynctest.cpp | 201 | ||||
-rw-r--r-- | async/src/async.h | 35 | ||||
-rw-r--r-- | async/src/async_impl.h | 15 | ||||
-rw-r--r-- | common/clientapi.h | 9 |
4 files changed, 181 insertions, 79 deletions
diff --git a/async/autotests/asynctest.cpp b/async/autotests/asynctest.cpp index ed550ca..73026bb 100644 --- a/async/autotests/asynctest.cpp +++ b/async/autotests/asynctest.cpp | |||
@@ -47,12 +47,46 @@ private Q_SLOTS: | |||
47 | 47 | ||
48 | void testAsyncThen(); | 48 | void testAsyncThen(); |
49 | void testSyncThen(); | 49 | void testSyncThen(); |
50 | void testJoinedThen(); | ||
51 | |||
50 | void testAsyncEach(); | 52 | void testAsyncEach(); |
51 | void testSyncEach(); | 53 | void testSyncEach(); |
54 | void testJoinedEach(); | ||
55 | |||
52 | void testAsyncReduce(); | 56 | void testAsyncReduce(); |
53 | void testSyncReduce(); | 57 | void testSyncReduce(); |
58 | void testJoinedReduce(); | ||
59 | |||
54 | void testErrorHandler(); | 60 | void testErrorHandler(); |
55 | 61 | ||
62 | void benchmarkSyncThenExecutor(); | ||
63 | |||
64 | private: | ||
65 | template<typename T> | ||
66 | class AsyncSimulator { | ||
67 | public: | ||
68 | AsyncSimulator(Async::Future<T> &future, const T &result) | ||
69 | : mFuture(future) | ||
70 | , mResult(result) | ||
71 | { | ||
72 | QObject::connect(&mTimer, &QTimer::timeout, | ||
73 | [this]() { | ||
74 | mFuture.setValue(mResult); | ||
75 | mFuture.setFinished(); | ||
76 | }); | ||
77 | QObject::connect(&mTimer, &QTimer::timeout, | ||
78 | [this]() { | ||
79 | delete this; | ||
80 | }); | ||
81 | mTimer.setSingleShot(true); | ||
82 | mTimer.start(200); | ||
83 | } | ||
84 | |||
85 | private: | ||
86 | Async::Future<T> mFuture; | ||
87 | T mResult; | ||
88 | QTimer mTimer; | ||
89 | }; | ||
56 | }; | 90 | }; |
57 | 91 | ||
58 | 92 | ||
@@ -86,16 +120,7 @@ void AsyncTest::testAsyncPromises() | |||
86 | { | 120 | { |
87 | auto job = Async::start<int>( | 121 | auto job = Async::start<int>( |
88 | [](Async::Future<int> &future) { | 122 | [](Async::Future<int> &future) { |
89 | QTimer *timer = new QTimer(); | 123 | new AsyncSimulator<int>(future, 42); |
90 | QObject::connect(timer, &QTimer::timeout, | ||
91 | [&]() { | ||
92 | future.setValue(42); | ||
93 | future.setFinished(); | ||
94 | }); | ||
95 | QObject::connect(timer, &QTimer::timeout, | ||
96 | timer, &QObject::deleteLater); | ||
97 | timer->setSingleShot(true); | ||
98 | timer->start(200); | ||
99 | }); | 124 | }); |
100 | 125 | ||
101 | Async::Future<int> future = job.exec(); | 126 | Async::Future<int> future = job.exec(); |
@@ -110,16 +135,7 @@ void AsyncTest::testAsyncPromises2() | |||
110 | 135 | ||
111 | auto job = Async::start<int>( | 136 | auto job = Async::start<int>( |
112 | [](Async::Future<int> &future) { | 137 | [](Async::Future<int> &future) { |
113 | QTimer *timer = new QTimer(); | 138 | new AsyncSimulator<int>(future, 42); |
114 | QObject::connect(timer, &QTimer::timeout, | ||
115 | [&]() { | ||
116 | future.setValue(42); | ||
117 | future.setFinished(); | ||
118 | }); | ||
119 | QObject::connect(timer, &QTimer::timeout, | ||
120 | timer, &QObject::deleteLater); | ||
121 | timer->setSingleShot(true); | ||
122 | timer->start(200); | ||
123 | } | 139 | } |
124 | ).then<int, int>([&done](int result, Async::Future<int> &future) { | 140 | ).then<int, int>([&done](int result, Async::Future<int> &future) { |
125 | done = true; | 141 | done = true; |
@@ -139,16 +155,7 @@ void AsyncTest::testNestedAsync() | |||
139 | auto job = Async::start<int>( | 155 | auto job = Async::start<int>( |
140 | [](Async::Future<int> &future) { | 156 | [](Async::Future<int> &future) { |
141 | auto innerJob = Async::start<int>([](Async::Future<int> &innerFuture) { | 157 | auto innerJob = Async::start<int>([](Async::Future<int> &innerFuture) { |
142 | QTimer *timer = new QTimer(); | 158 | new AsyncSimulator<int>(innerFuture, 42); |
143 | QObject::connect(timer, &QTimer::timeout, | ||
144 | [&]() { | ||
145 | innerFuture.setValue(42); | ||
146 | innerFuture.setFinished(); | ||
147 | }); | ||
148 | QObject::connect(timer, &QTimer::timeout, | ||
149 | timer, &QObject::deleteLater); | ||
150 | timer->setSingleShot(true); | ||
151 | timer->start(0); | ||
152 | }).then<void>([&future](Async::Future<void> &innerThenFuture) { | 159 | }).then<void>([&future](Async::Future<void> &innerThenFuture) { |
153 | future.setFinished(); | 160 | future.setFinished(); |
154 | innerThenFuture.setFinished(); | 161 | innerThenFuture.setFinished(); |
@@ -186,16 +193,7 @@ void AsyncTest::testAsyncThen() | |||
186 | { | 193 | { |
187 | auto job = Async::start<int>( | 194 | auto job = Async::start<int>( |
188 | [](Async::Future<int> &future) { | 195 | [](Async::Future<int> &future) { |
189 | QTimer *timer = new QTimer; | 196 | new AsyncSimulator<int>(future, 42); |
190 | QObject::connect(timer, &QTimer::timeout, | ||
191 | [&]() { | ||
192 | future.setValue(42); | ||
193 | future.setFinished(); | ||
194 | }); | ||
195 | QObject::connect(timer, &QTimer::timeout, | ||
196 | timer, &QObject::deleteLater); | ||
197 | timer->setSingleShot(true); | ||
198 | timer->start(0); | ||
199 | }); | 197 | }); |
200 | 198 | ||
201 | auto future = job.exec(); | 199 | auto future = job.exec(); |
@@ -221,33 +219,37 @@ void AsyncTest::testSyncThen() | |||
221 | QCOMPARE(future.value(), 84); | 219 | QCOMPARE(future.value(), 84); |
222 | } | 220 | } |
223 | 221 | ||
222 | void AsyncTest::testJoinedThen() | ||
223 | { | ||
224 | auto job1 = Async::start<int, int>( | ||
225 | [](int in, Async::Future<int> &future) { | ||
226 | new AsyncSimulator<int>(future, in * 2); | ||
227 | }); | ||
228 | |||
229 | auto job2 = Async::start<int>( | ||
230 | [](Async::Future<int> &future) { | ||
231 | new AsyncSimulator<int>(future, 42); | ||
232 | }) | ||
233 | .then<int>(job1); | ||
234 | |||
235 | auto future = job2.exec(); | ||
236 | future.waitForFinished(); | ||
237 | |||
238 | QVERIFY(future.isFinished()); | ||
239 | QCOMPARE(future.value(), 84); | ||
240 | } | ||
241 | |||
242 | |||
243 | |||
224 | void AsyncTest::testAsyncEach() | 244 | void AsyncTest::testAsyncEach() |
225 | { | 245 | { |
226 | auto job = Async::start<QList<int>>( | 246 | auto job = Async::start<QList<int>>( |
227 | [](Async::Future<QList<int>> &future) { | 247 | [](Async::Future<QList<int>> &future) { |
228 | QTimer *timer = new QTimer; | 248 | new AsyncSimulator<QList<int>>(future, { 1, 2, 3, 4 }); |
229 | QObject::connect(timer, &QTimer::timeout, | ||
230 | [&future]() { | ||
231 | future.setValue({ 1, 2, 3, 4 }); | ||
232 | future.setFinished(); | ||
233 | }); | ||
234 | QObject::connect(timer, &QTimer::timeout, | ||
235 | timer, &QObject::deleteLater); | ||
236 | timer->setSingleShot(true); | ||
237 | timer->start(0); | ||
238 | }) | 249 | }) |
239 | .each<QList<int>, int>( | 250 | .each<QList<int>, int>( |
240 | [](const int &v, Async::Future<QList<int>> &future) { | 251 | [](const int &v, Async::Future<QList<int>> &future) { |
241 | QTimer *timer = new QTimer; | 252 | new AsyncSimulator<QList<int>>(future, { v + 1 }); |
242 | QObject::connect(timer, &QTimer::timeout, | ||
243 | [v, &future]() { | ||
244 | future.setValue({ v + 1 }); | ||
245 | future.setFinished(); | ||
246 | }); | ||
247 | QObject::connect(timer, &QTimer::timeout, | ||
248 | timer, &QObject::deleteLater); | ||
249 | timer->setSingleShot(true); | ||
250 | timer->start(0); | ||
251 | }); | 253 | }); |
252 | 254 | ||
253 | auto future = job.exec(); | 255 | auto future = job.exec(); |
@@ -258,7 +260,6 @@ void AsyncTest::testAsyncEach() | |||
258 | QCOMPARE(future.value(), expected); | 260 | QCOMPARE(future.value(), expected); |
259 | } | 261 | } |
260 | 262 | ||
261 | |||
262 | void AsyncTest::testSyncEach() | 263 | void AsyncTest::testSyncEach() |
263 | { | 264 | { |
264 | auto job = Async::start<QList<int>>( | 265 | auto job = Async::start<QList<int>>( |
@@ -277,20 +278,35 @@ void AsyncTest::testSyncEach() | |||
277 | QCOMPARE(future.value(), expected); | 278 | QCOMPARE(future.value(), expected); |
278 | } | 279 | } |
279 | 280 | ||
281 | void AsyncTest::testJoinedEach() | ||
282 | { | ||
283 | auto job1 = Async::start<QList<int>, int>( | ||
284 | [](int v, Async::Future<QList<int>> &future) { | ||
285 | new AsyncSimulator<QList<int>>(future, { v * 2 }); | ||
286 | }); | ||
287 | |||
288 | auto job = Async::start<QList<int>>( | ||
289 | []() -> QList<int> { | ||
290 | return { 1, 2, 3, 4 }; | ||
291 | }) | ||
292 | .each(job1); | ||
293 | |||
294 | auto future = job.exec(); | ||
295 | future.waitForFinished(); | ||
296 | |||
297 | const QList<int> expected({ 2, 4, 6, 8 }); | ||
298 | QVERIFY(future.isFinished()); | ||
299 | QCOMPARE(future.value(), expected); | ||
300 | } | ||
301 | |||
302 | |||
303 | |||
304 | |||
280 | void AsyncTest::testAsyncReduce() | 305 | void AsyncTest::testAsyncReduce() |
281 | { | 306 | { |
282 | auto job = Async::start<QList<int>>( | 307 | auto job = Async::start<QList<int>>( |
283 | [](Async::Future<QList<int>> &future) { | 308 | [](Async::Future<QList<int>> &future) { |
284 | QTimer *timer = new QTimer(); | 309 | new AsyncSimulator<QList<int>>(future, { 1, 2, 3, 4 }); |
285 | QObject::connect(timer, &QTimer::timeout, | ||
286 | [&future]() { | ||
287 | future.setValue({ 1, 2, 3, 4 }); | ||
288 | future.setFinished(); | ||
289 | }); | ||
290 | QObject::connect(timer, &QTimer::timeout, | ||
291 | timer, &QObject::deleteLater); | ||
292 | timer->setSingleShot(true); | ||
293 | timer->start(0); | ||
294 | }) | 310 | }) |
295 | .reduce<int, QList<int>>( | 311 | .reduce<int, QList<int>>( |
296 | [](const QList<int> &list, Async::Future<int> &future) { | 312 | [](const QList<int> &list, Async::Future<int> &future) { |
@@ -334,6 +350,32 @@ void AsyncTest::testSyncReduce() | |||
334 | QCOMPARE(future.value(), 10); | 350 | QCOMPARE(future.value(), 10); |
335 | } | 351 | } |
336 | 352 | ||
353 | |||
354 | void AsyncTest::testJoinedReduce() | ||
355 | { | ||
356 | auto job1 = Async::start<int, QList<int>>( | ||
357 | [](const QList<int> &list, Async::Future<int> &future) { | ||
358 | int sum = 0; | ||
359 | for (int i : list) sum += i; | ||
360 | new AsyncSimulator<int>(future, sum); | ||
361 | }); | ||
362 | |||
363 | auto job = Async::start<QList<int>>( | ||
364 | []() -> QList<int> { | ||
365 | return { 1, 2, 3, 4 }; | ||
366 | }) | ||
367 | .reduce(job1); | ||
368 | |||
369 | auto future = job.exec(); | ||
370 | future.waitForFinished(); | ||
371 | |||
372 | QVERIFY(future.isFinished()); | ||
373 | QCOMPARE(future.value(), 10); | ||
374 | } | ||
375 | |||
376 | |||
377 | |||
378 | |||
337 | void AsyncTest::testErrorHandler() | 379 | void AsyncTest::testErrorHandler() |
338 | { | 380 | { |
339 | int error = 0; | 381 | int error = 0; |
@@ -356,6 +398,23 @@ void AsyncTest::testErrorHandler() | |||
356 | } | 398 | } |
357 | 399 | ||
358 | 400 | ||
401 | |||
402 | |||
403 | |||
404 | void AsyncTest::benchmarkSyncThenExecutor() | ||
405 | { | ||
406 | auto job = Async::start<int>( | ||
407 | []() -> int { | ||
408 | return 0; | ||
409 | }); | ||
410 | |||
411 | QBENCHMARK { | ||
412 | job.exec(); | ||
413 | } | ||
414 | } | ||
415 | |||
416 | |||
417 | |||
359 | QTEST_MAIN(AsyncTest); | 418 | QTEST_MAIN(AsyncTest); |
360 | 419 | ||
361 | #include "asynctest.moc" | 420 | #include "asynctest.moc" |
diff --git a/async/src/async.h b/async/src/async.h index 386722a..d21caf8 100644 --- a/async/src/async.h +++ b/async/src/async.h | |||
@@ -291,6 +291,12 @@ public: | |||
291 | new Private::SyncThenExecutor<OutOther, InOther ...>(func, errorFunc, mExecutor))); | 291 | new Private::SyncThenExecutor<OutOther, InOther ...>(func, errorFunc, mExecutor))); |
292 | } | 292 | } |
293 | 293 | ||
294 | template<typename OutOther, typename ... InOther> | ||
295 | Job<OutOther, InOther ...> then(Job<OutOther, InOther ...> otherJob, ErrorHandler errorFunc = ErrorHandler()) | ||
296 | { | ||
297 | return then<OutOther, InOther ...>(nestedJobWrapper<OutOther, InOther ...>(otherJob), errorFunc); | ||
298 | } | ||
299 | |||
294 | template<typename OutOther, typename InOther> | 300 | template<typename OutOther, typename InOther> |
295 | Job<OutOther, InOther> each(EachTask<OutOther, InOther> func, ErrorHandler errorFunc = ErrorHandler()) | 301 | Job<OutOther, InOther> each(EachTask<OutOther, InOther> func, ErrorHandler errorFunc = ErrorHandler()) |
296 | { | 302 | { |
@@ -308,6 +314,13 @@ public: | |||
308 | } | 314 | } |
309 | 315 | ||
310 | template<typename OutOther, typename InOther> | 316 | template<typename OutOther, typename InOther> |
317 | Job<OutOther, InOther> each(Job<OutOther, InOther> otherJob, ErrorHandler errorFunc = ErrorHandler()) | ||
318 | { | ||
319 | eachInvariants<OutOther>(); | ||
320 | return each<OutOther, InOther>(nestedJobWrapper<OutOther, InOther>(otherJob), errorFunc); | ||
321 | } | ||
322 | |||
323 | template<typename OutOther, typename InOther> | ||
311 | Job<OutOther, InOther> reduce(ReduceTask<OutOther, InOther> func, ErrorHandler errorFunc = ErrorHandler()) | 324 | Job<OutOther, InOther> reduce(ReduceTask<OutOther, InOther> func, ErrorHandler errorFunc = ErrorHandler()) |
312 | { | 325 | { |
313 | reduceInvariants<InOther>(); | 326 | reduceInvariants<InOther>(); |
@@ -323,6 +336,12 @@ public: | |||
323 | new Private::SyncReduceExecutor<OutOther, InOther>(func, errorFunc, mExecutor))); | 336 | new Private::SyncReduceExecutor<OutOther, InOther>(func, errorFunc, mExecutor))); |
324 | } | 337 | } |
325 | 338 | ||
339 | template<typename OutOther, typename InOther> | ||
340 | Job<OutOther, InOther> reduce(Job<OutOther, InOther> otherJob, ErrorHandler errorFunc = ErrorHandler()) | ||
341 | { | ||
342 | return reduce<OutOther, InOther>(nestedJobWrapper<OutOther, InOther>(otherJob), errorFunc); | ||
343 | } | ||
344 | |||
326 | template<typename FirstIn> | 345 | template<typename FirstIn> |
327 | Async::Future<Out> exec(FirstIn in) | 346 | Async::Future<Out> exec(FirstIn in) |
328 | { | 347 | { |
@@ -377,6 +396,22 @@ private: | |||
377 | static_assert(std::is_same<typename Out::value_type, typename InOther::value_type>::value, | 396 | static_assert(std::is_same<typename Out::value_type, typename InOther::value_type>::value, |
378 | "The return type of previous task must be compatible with input type of this task"); | 397 | "The return type of previous task must be compatible with input type of this task"); |
379 | } | 398 | } |
399 | |||
400 | template<typename OutOther, typename ... InOther> | ||
401 | inline std::function<void(InOther ..., Async::Future<OutOther>&)> nestedJobWrapper(Job<OutOther, InOther ...> otherJob) { | ||
402 | return [otherJob](InOther ... in, Async::Future<OutOther> &future) { | ||
403 | // copy by value is const | ||
404 | auto job = otherJob; | ||
405 | FutureWatcher<OutOther> *watcher = new FutureWatcher<OutOther>(); | ||
406 | QObject::connect(watcher, &FutureWatcherBase::futureReady, | ||
407 | [watcher, &future]() { | ||
408 | Async::detail::copyFutureValue(watcher->future(), future); | ||
409 | future.setFinished(); | ||
410 | watcher->deleteLater(); | ||
411 | }); | ||
412 | watcher->setFuture(job.exec(in ...)); | ||
413 | }; | ||
414 | } | ||
380 | }; | 415 | }; |
381 | 416 | ||
382 | } // namespace Async | 417 | } // namespace Async |
diff --git a/async/src/async_impl.h b/async/src/async_impl.h index 58f6ced..eccbc9b 100644 --- a/async/src/async_impl.h +++ b/async/src/async_impl.h | |||
@@ -19,6 +19,7 @@ | |||
19 | #define ASYNC_IMPL_H | 19 | #define ASYNC_IMPL_H |
20 | 20 | ||
21 | #include "async.h" | 21 | #include "async.h" |
22 | #include <type_traits> | ||
22 | 23 | ||
23 | namespace Async { | 24 | namespace Async { |
24 | 25 | ||
@@ -45,6 +46,20 @@ struct prevOut { | |||
45 | using type = typename std::tuple_element<0, std::tuple<T ..., void>>::type; | 46 | using type = typename std::tuple_element<0, std::tuple<T ..., void>>::type; |
46 | }; | 47 | }; |
47 | 48 | ||
49 | template<typename T> | ||
50 | inline typename std::enable_if<!std::is_void<T>::value, void>::type | ||
51 | copyFutureValue(const Async::Future<T> &in, Async::Future<T> &out) | ||
52 | { | ||
53 | out.setValue(in.value()); | ||
54 | } | ||
55 | |||
56 | template<typename T> | ||
57 | inline typename std::enable_if<std::is_void<T>::value, void>::type | ||
58 | copyFutureValue(const Async::Future<T> &in, Async::Future<T> &out) | ||
59 | { | ||
60 | // noop | ||
61 | } | ||
62 | |||
48 | } // namespace Detail | 63 | } // namespace Detail |
49 | 64 | ||
50 | } // namespace Async | 65 | } // namespace Async |
diff --git a/common/clientapi.h b/common/clientapi.h index c1404da..aa47802 100644 --- a/common/clientapi.h +++ b/common/clientapi.h | |||
@@ -421,14 +421,7 @@ public: | |||
421 | //We have to bind an instance to the function callback. Since we use a shared pointer this keeps the result provider instance (and thus also the emitter) alive. | 421 | //We have to bind an instance to the function callback. Since we use a shared pointer this keeps the result provider instance (and thus also the emitter) alive. |
422 | std::function<void(const typename DomainType::Ptr &)> addCallback = std::bind(&ResultProvider<typename DomainType::Ptr>::add, resultSet, std::placeholders::_1); | 422 | std::function<void(const typename DomainType::Ptr &)> addCallback = std::bind(&ResultProvider<typename DomainType::Ptr>::add, resultSet, std::placeholders::_1); |
423 | //We copy the facade pointer to keep it alive | 423 | //We copy the facade pointer to keep it alive |
424 | //TODO JOBAPI: we should be able to just do, job = job.then(facade->load(..)) | 424 | job = job.then(facade->load(query, addCallback)); |
425 | job = job.then<void>([facade, query, addCallback](Async::Future<void> &future) { | ||
426 | Async::Job<void> j = facade->load(query, addCallback); | ||
427 | j.then<void>([&future, facade](Async::Future<void> &f) { | ||
428 | future.setFinished(); | ||
429 | f.setFinished(); | ||
430 | }).exec(); | ||
431 | }); | ||
432 | } | 425 | } |
433 | job.then<void>([/* eventloop, */resultSet](Async::Future<void> &future) { | 426 | job.then<void>([/* eventloop, */resultSet](Async::Future<void> &future) { |
434 | qDebug() << "Query complete"; | 427 | qDebug() << "Query complete"; |