From 8e6f41f851ae058dea63fbc9b9f523ec9fd1a4fb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dan=20Vr=C3=A1til?= Date: Fri, 20 Feb 2015 13:58:03 +0100 Subject: Async: allow appending existing Job objects to the Job chain Now it's possible to do something like Job job = createSomeJob(); auto main = Async::start(....).then(job); Previously the 'job' would have to be wrapped in a ThenTask-like lambda (which is what we still do internally), but with this new syntax it's possible to append another job chain to existing chain easilly. This syntax is available for all task types. --- async/autotests/asynctest.cpp | 201 +++++++++++++++++++++++++++--------------- async/src/async.h | 35 ++++++++ async/src/async_impl.h | 15 ++++ common/clientapi.h | 9 +- 4 files changed, 181 insertions(+), 79 deletions(-) diff --git a/async/autotests/asynctest.cpp b/async/autotests/asynctest.cpp index ed550ca..73026bb 100644 --- a/async/autotests/asynctest.cpp +++ b/async/autotests/asynctest.cpp @@ -47,12 +47,46 @@ private Q_SLOTS: void testAsyncThen(); void testSyncThen(); + void testJoinedThen(); + void testAsyncEach(); void testSyncEach(); + void testJoinedEach(); + void testAsyncReduce(); void testSyncReduce(); + void testJoinedReduce(); + void testErrorHandler(); + void benchmarkSyncThenExecutor(); + +private: + template + class AsyncSimulator { + public: + AsyncSimulator(Async::Future &future, const T &result) + : mFuture(future) + , mResult(result) + { + QObject::connect(&mTimer, &QTimer::timeout, + [this]() { + mFuture.setValue(mResult); + mFuture.setFinished(); + }); + QObject::connect(&mTimer, &QTimer::timeout, + [this]() { + delete this; + }); + mTimer.setSingleShot(true); + mTimer.start(200); + } + + private: + Async::Future mFuture; + T mResult; + QTimer mTimer; + }; }; @@ -86,16 +120,7 @@ void AsyncTest::testAsyncPromises() { auto job = Async::start( [](Async::Future &future) { - QTimer *timer = new QTimer(); - QObject::connect(timer, &QTimer::timeout, - [&]() { - future.setValue(42); - future.setFinished(); - }); - QObject::connect(timer, &QTimer::timeout, - timer, &QObject::deleteLater); - timer->setSingleShot(true); - timer->start(200); + new AsyncSimulator(future, 42); }); Async::Future future = job.exec(); @@ -110,16 +135,7 @@ void AsyncTest::testAsyncPromises2() auto job = Async::start( [](Async::Future &future) { - QTimer *timer = new QTimer(); - QObject::connect(timer, &QTimer::timeout, - [&]() { - future.setValue(42); - future.setFinished(); - }); - QObject::connect(timer, &QTimer::timeout, - timer, &QObject::deleteLater); - timer->setSingleShot(true); - timer->start(200); + new AsyncSimulator(future, 42); } ).then([&done](int result, Async::Future &future) { done = true; @@ -139,16 +155,7 @@ void AsyncTest::testNestedAsync() auto job = Async::start( [](Async::Future &future) { auto innerJob = Async::start([](Async::Future &innerFuture) { - QTimer *timer = new QTimer(); - QObject::connect(timer, &QTimer::timeout, - [&]() { - innerFuture.setValue(42); - innerFuture.setFinished(); - }); - QObject::connect(timer, &QTimer::timeout, - timer, &QObject::deleteLater); - timer->setSingleShot(true); - timer->start(0); + new AsyncSimulator(innerFuture, 42); }).then([&future](Async::Future &innerThenFuture) { future.setFinished(); innerThenFuture.setFinished(); @@ -186,16 +193,7 @@ void AsyncTest::testAsyncThen() { auto job = Async::start( [](Async::Future &future) { - QTimer *timer = new QTimer; - QObject::connect(timer, &QTimer::timeout, - [&]() { - future.setValue(42); - future.setFinished(); - }); - QObject::connect(timer, &QTimer::timeout, - timer, &QObject::deleteLater); - timer->setSingleShot(true); - timer->start(0); + new AsyncSimulator(future, 42); }); auto future = job.exec(); @@ -221,33 +219,37 @@ void AsyncTest::testSyncThen() QCOMPARE(future.value(), 84); } +void AsyncTest::testJoinedThen() +{ + auto job1 = Async::start( + [](int in, Async::Future &future) { + new AsyncSimulator(future, in * 2); + }); + + auto job2 = Async::start( + [](Async::Future &future) { + new AsyncSimulator(future, 42); + }) + .then(job1); + + auto future = job2.exec(); + future.waitForFinished(); + + QVERIFY(future.isFinished()); + QCOMPARE(future.value(), 84); +} + + + void AsyncTest::testAsyncEach() { auto job = Async::start>( [](Async::Future> &future) { - QTimer *timer = new QTimer; - QObject::connect(timer, &QTimer::timeout, - [&future]() { - future.setValue({ 1, 2, 3, 4 }); - future.setFinished(); - }); - QObject::connect(timer, &QTimer::timeout, - timer, &QObject::deleteLater); - timer->setSingleShot(true); - timer->start(0); + new AsyncSimulator>(future, { 1, 2, 3, 4 }); }) .each, int>( [](const int &v, Async::Future> &future) { - QTimer *timer = new QTimer; - QObject::connect(timer, &QTimer::timeout, - [v, &future]() { - future.setValue({ v + 1 }); - future.setFinished(); - }); - QObject::connect(timer, &QTimer::timeout, - timer, &QObject::deleteLater); - timer->setSingleShot(true); - timer->start(0); + new AsyncSimulator>(future, { v + 1 }); }); auto future = job.exec(); @@ -258,7 +260,6 @@ void AsyncTest::testAsyncEach() QCOMPARE(future.value(), expected); } - void AsyncTest::testSyncEach() { auto job = Async::start>( @@ -277,20 +278,35 @@ void AsyncTest::testSyncEach() QCOMPARE(future.value(), expected); } +void AsyncTest::testJoinedEach() +{ + auto job1 = Async::start, int>( + [](int v, Async::Future> &future) { + new AsyncSimulator>(future, { v * 2 }); + }); + + auto job = Async::start>( + []() -> QList { + return { 1, 2, 3, 4 }; + }) + .each(job1); + + auto future = job.exec(); + future.waitForFinished(); + + const QList expected({ 2, 4, 6, 8 }); + QVERIFY(future.isFinished()); + QCOMPARE(future.value(), expected); +} + + + + void AsyncTest::testAsyncReduce() { auto job = Async::start>( [](Async::Future> &future) { - QTimer *timer = new QTimer(); - QObject::connect(timer, &QTimer::timeout, - [&future]() { - future.setValue({ 1, 2, 3, 4 }); - future.setFinished(); - }); - QObject::connect(timer, &QTimer::timeout, - timer, &QObject::deleteLater); - timer->setSingleShot(true); - timer->start(0); + new AsyncSimulator>(future, { 1, 2, 3, 4 }); }) .reduce>( [](const QList &list, Async::Future &future) { @@ -334,6 +350,32 @@ void AsyncTest::testSyncReduce() QCOMPARE(future.value(), 10); } + +void AsyncTest::testJoinedReduce() +{ + auto job1 = Async::start>( + [](const QList &list, Async::Future &future) { + int sum = 0; + for (int i : list) sum += i; + new AsyncSimulator(future, sum); + }); + + auto job = Async::start>( + []() -> QList { + return { 1, 2, 3, 4 }; + }) + .reduce(job1); + + auto future = job.exec(); + future.waitForFinished(); + + QVERIFY(future.isFinished()); + QCOMPARE(future.value(), 10); +} + + + + void AsyncTest::testErrorHandler() { int error = 0; @@ -356,6 +398,23 @@ void AsyncTest::testErrorHandler() } + + + +void AsyncTest::benchmarkSyncThenExecutor() +{ + auto job = Async::start( + []() -> int { + return 0; + }); + + QBENCHMARK { + job.exec(); + } +} + + + QTEST_MAIN(AsyncTest); #include "asynctest.moc" diff --git a/async/src/async.h b/async/src/async.h index 386722a..d21caf8 100644 --- a/async/src/async.h +++ b/async/src/async.h @@ -291,6 +291,12 @@ public: new Private::SyncThenExecutor(func, errorFunc, mExecutor))); } + template + Job then(Job otherJob, ErrorHandler errorFunc = ErrorHandler()) + { + return then(nestedJobWrapper(otherJob), errorFunc); + } + template Job each(EachTask func, ErrorHandler errorFunc = ErrorHandler()) { @@ -307,6 +313,13 @@ public: new Private::SyncEachExecutor(func, errorFunc, mExecutor))); } + template + Job each(Job otherJob, ErrorHandler errorFunc = ErrorHandler()) + { + eachInvariants(); + return each(nestedJobWrapper(otherJob), errorFunc); + } + template Job reduce(ReduceTask func, ErrorHandler errorFunc = ErrorHandler()) { @@ -323,6 +336,12 @@ public: new Private::SyncReduceExecutor(func, errorFunc, mExecutor))); } + template + Job reduce(Job otherJob, ErrorHandler errorFunc = ErrorHandler()) + { + return reduce(nestedJobWrapper(otherJob), errorFunc); + } + template Async::Future exec(FirstIn in) { @@ -377,6 +396,22 @@ private: static_assert(std::is_same::value, "The return type of previous task must be compatible with input type of this task"); } + + template + inline std::function&)> nestedJobWrapper(Job otherJob) { + return [otherJob](InOther ... in, Async::Future &future) { + // copy by value is const + auto job = otherJob; + FutureWatcher *watcher = new FutureWatcher(); + QObject::connect(watcher, &FutureWatcherBase::futureReady, + [watcher, &future]() { + Async::detail::copyFutureValue(watcher->future(), future); + future.setFinished(); + watcher->deleteLater(); + }); + watcher->setFuture(job.exec(in ...)); + }; + } }; } // namespace Async diff --git a/async/src/async_impl.h b/async/src/async_impl.h index 58f6ced..eccbc9b 100644 --- a/async/src/async_impl.h +++ b/async/src/async_impl.h @@ -19,6 +19,7 @@ #define ASYNC_IMPL_H #include "async.h" +#include namespace Async { @@ -45,6 +46,20 @@ struct prevOut { using type = typename std::tuple_element<0, std::tuple>::type; }; +template +inline typename std::enable_if::value, void>::type +copyFutureValue(const Async::Future &in, Async::Future &out) +{ + out.setValue(in.value()); +} + +template +inline typename std::enable_if::value, void>::type +copyFutureValue(const Async::Future &in, Async::Future &out) +{ + // noop +} + } // namespace Detail } // namespace Async diff --git a/common/clientapi.h b/common/clientapi.h index c1404da..aa47802 100644 --- a/common/clientapi.h +++ b/common/clientapi.h @@ -421,14 +421,7 @@ public: //We have to bind an instance to the function callback. Since we use a shared pointer this keeps the result provider instance (and thus also the emitter) alive. std::function addCallback = std::bind(&ResultProvider::add, resultSet, std::placeholders::_1); //We copy the facade pointer to keep it alive - //TODO JOBAPI: we should be able to just do, job = job.then(facade->load(..)) - job = job.then([facade, query, addCallback](Async::Future &future) { - Async::Job j = facade->load(query, addCallback); - j.then([&future, facade](Async::Future &f) { - future.setFinished(); - f.setFinished(); - }).exec(); - }); + job = job.then(facade->load(query, addCallback)); } job.then([/* eventloop, */resultSet](Async::Future &future) { qDebug() << "Query complete"; -- cgit v1.2.3