From adc6a443776820b5ae36c982baf92b1d29bbaa4b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dan=20Vr=C3=A1til?= Date: Sun, 8 Feb 2015 12:02:04 +0100 Subject: Async: introduce sync executors Sync executors don't pass Async::Future into the user-provided tasks, but instead work with return values of the task methods, wrapping them into the Async::Future internally. Sync tasks are of course possible since forever, but not the API for those tasks is much cleaner, for users don't have to deal with "future" in synchronous tasks, for instance when synchronously processing results of an async task before passing the data to another async task. --- async/autotests/asynctest.cpp | 138 +++++++++++++++++++++++++++++++--- async/src/async.h | 170 ++++++++++++++++++++++++++++++++++++++---- 2 files changed, 281 insertions(+), 27 deletions(-) diff --git a/async/autotests/asynctest.cpp b/async/autotests/asynctest.cpp index 9240d28..4ebe65e 100644 --- a/async/autotests/asynctest.cpp +++ b/async/autotests/asynctest.cpp @@ -42,9 +42,14 @@ private Q_SLOTS: void testAsyncPromises(); void testAsyncPromises2(); void testNestedAsync(); + void testAsyncThen(); + void testSyncThen(); + void testAsyncEach(); void testSyncEach(); + void testAsyncReduce(); void testSyncReduce(); void testErrorHandler(); + }; void AsyncTest::testSyncPromises() @@ -155,17 +160,92 @@ void AsyncTest::testNestedAsync() QTRY_VERIFY(done); } -void AsyncTest::testSyncEach() +void AsyncTest::testAsyncThen() +{ + auto job = Async::start( + [](Async::Future &future) { + QTimer *timer = new QTimer; + QObject::connect(timer, &QTimer::timeout, + [&]() { + future.setValue(42); + future.setFinished(); + }); + QObject::connect(timer, &QTimer::timeout, + timer, &QObject::deleteLater); + timer->setSingleShot(true); + timer->start(0); + }); + + auto future = job.exec(); + future.waitForFinished(); + + QVERIFY(future.isFinished()); + QCOMPARE(future.value(), 42); +} + + +void AsyncTest::testSyncThen() +{ + auto job = Async::start( + []() -> int { + return 42; + }).then( + [](int in) -> int { + return in * 2; + }); + + auto future = job.exec(); + QVERIFY(future.isFinished()); + QCOMPARE(future.value(), 84); +} + +void AsyncTest::testAsyncEach() { auto job = Async::start>( [](Async::Future> &future) { - future.setValue(QList{ 1, 2, 3, 4 }); - future.setFinished(); + QTimer *timer = new QTimer; + QObject::connect(timer, &QTimer::timeout, + [&future]() { + future.setValue({ 1, 2, 3, 4 }); + future.setFinished(); + }); + QObject::connect(timer, &QTimer::timeout, + timer, &QObject::deleteLater); + timer->setSingleShot(true); + timer->start(0); }) .each, int>( [](const int &v, Async::Future> &future) { - future.setValue(QList{ v + 1 }); - future.setFinished(); + QTimer *timer = new QTimer; + QObject::connect(timer, &QTimer::timeout, + [v, &future]() { + future.setValue({ v + 1 }); + future.setFinished(); + }); + QObject::connect(timer, &QTimer::timeout, + timer, &QObject::deleteLater); + timer->setSingleShot(true); + timer->start(0); + }); + + auto future = job.exec(); + future.waitForFinished(); + + const QList expected({ 2, 3, 4, 5 }); + QVERIFY(future.isFinished()); + QCOMPARE(future.value(), expected); +} + + +void AsyncTest::testSyncEach() +{ + auto job = Async::start>( + []() -> QList { + return { 1, 2, 3, 4 }; + }) + .each, int>( + [](const int &v) -> QList { + return { v + 1 }; }); Async::Future> future = job.exec(); @@ -175,19 +255,55 @@ void AsyncTest::testSyncEach() QCOMPARE(future.value(), expected); } -void AsyncTest::testSyncReduce() +void AsyncTest::testAsyncReduce() { auto job = Async::start>( [](Async::Future> &future) { - future.setValue(QList{ 1, 2, 3, 4 }); - future.setFinished(); + QTimer *timer = new QTimer(); + QObject::connect(timer, &QTimer::timeout, + [&future]() { + future.setValue({ 1, 2, 3, 4 }); + future.setFinished(); + }); + QObject::connect(timer, &QTimer::timeout, + timer, &QObject::deleteLater); + timer->setSingleShot(true); + timer->start(0); }) .reduce>( [](const QList &list, Async::Future &future) { + QTimer *timer = new QTimer(); + QObject::connect(timer, &QTimer::timeout, + [list, &future]() { + int sum = 0; + for (int i : list) sum += i; + future.setValue(sum); + future.setFinished(); + }); + QObject::connect(timer, &QTimer::timeout, + timer, &QObject::deleteLater); + timer->setSingleShot(true); + timer->start(0); + }); + + Async::Future future = job.exec(); + future.waitForFinished(); + + QVERIFY(future.isFinished()); + QCOMPARE(future.value(), 10); +} + +void AsyncTest::testSyncReduce() +{ + auto job = Async::start>( + []() -> QList { + return { 1, 2, 3, 4 }; + }) + .reduce>( + [](const QList &list) -> int { int sum = 0; for (int i : list) sum += i; - future.setValue(sum); - future.setFinished(); + return sum; }); Async::Future future = job.exec(); @@ -213,7 +329,7 @@ void AsyncTest::testErrorHandler() ); auto future = job.exec(); future.waitForFinished(); - QVERIFY(error == 1); + QCOMPARE(error, 1); QVERIFY(future.isFinished()); } diff --git a/async/src/async.h b/async/src/async.h index d15373b..336bae2 100644 --- a/async/src/async.h +++ b/async/src/async.h @@ -33,8 +33,6 @@ /* - * TODO: on .then and potentially others: support for ThenTask without future argument and return value which makes it implicitly a sync continuation. - * Useful for typical value consumer continuations. * TODO: error continuation on .then and others. * TODO: instead of passing the future objects callbacks could be provided for result reporting (we can still use the future object internally */ @@ -47,13 +45,19 @@ class JobBase; template class Job; - template using ThenTask = typename detail::identity&)>>::type; +template +using SyncThenTask = typename detail::identity>::type; template using EachTask = typename detail::identity&)>>::type; template +using SyncEachTask = typename detail::identity>::type; +template using ReduceTask = typename detail::identity&)>>::type; +template +using SyncReduceTask = typename detail::identity>::type; + using ErrorHandler = std::function; namespace Private @@ -134,6 +138,33 @@ public: ReduceExecutor(ReduceTask reduce, const ExecutorBasePtr &parent); }; +template +class SyncThenExecutor : public Executor::type, Out, In ...> +{ +public: + SyncThenExecutor(SyncThenTask then, ErrorHandler errorHandler = ErrorHandler(), const ExecutorBasePtr &parent = ExecutorBasePtr()); + void previousFutureReady(); +protected: + SyncThenTask mFunc; +}; + +template +class SyncReduceExecutor : public SyncThenExecutor +{ +public: + SyncReduceExecutor(SyncReduceTask reduce, const ExecutorBasePtr &parent = ExecutorBasePtr()); +}; + +template +class SyncEachExecutor : public Executor +{ +public: + SyncEachExecutor(SyncEachTask each, const ExecutorBasePtr &parent = ExecutorBasePtr()); + void previousFutureReady(); +protected: + SyncEachTask mFunc; +}; + } // namespace Private /** @@ -238,6 +269,9 @@ class Job : public JobBase template friend Job start(Async::ThenTask func); + template + friend Job start(Async::SyncThenTask func); + public: template Job then(ThenTask func, ErrorHandler errorFunc = ErrorHandler()) @@ -246,28 +280,45 @@ public: new Private::ThenExecutor(func, errorFunc, mExecutor))); } + template + Job then(SyncThenTask func, ErrorHandler errorFunc = ErrorHandler()) + { + return Job(Private::ExecutorBasePtr( + new Private::SyncThenExecutor(func, errorFunc, mExecutor))); + } + template Job each(EachTask func) { - static_assert(detail::isIterable::value, - "The 'Each' task can only be connected to a job that returns a list or an array."); - static_assert(detail::isIterable::value, - "The result type of 'Each' task must be a list or an array."); + eachInvariants(); return Job(Private::ExecutorBasePtr( new Private::EachExecutor(func, mExecutor))); } + template + Job each(SyncEachTask func) + { + eachInvariants(); + return Job(Private::ExecutorBasePtr( + new Private::SyncEachExecutor(func, mExecutor))); + } + template Job reduce(ReduceTask func) { - static_assert(Async::detail::isIterable::value, - "The 'Result' task can only be connected to a job that returns a list or an array"); - static_assert(std::is_same::value, - "The return type of previous task must be compatible with input type of this task"); + reduceInvariants(); return Job(Private::ExecutorBasePtr( new Private::ReduceExecutor(func, mExecutor))); } + template + Job reduce(SyncReduceTask func) + { + reduceInvariants(); + return Job(Private::ExecutorBasePtr( + new Private::SyncReduceExecutor(func, mExecutor))); + } + Async::Future exec() { mExecutor->exec(); @@ -283,6 +334,24 @@ private: Job(Private::ExecutorBasePtr executor) : JobBase(executor) {} + + template + void eachInvariants() + { + static_assert(detail::isIterable::value, + "The 'Each' task can only be connected to a job that returns a list or an array."); + static_assert(detail::isIterable::value, + "The result type of 'Each' task must be a list or an array."); + } + + template + void reduceInvariants() + { + static_assert(Async::detail::isIterable::value, + "The 'Result' task can only be connected to a job that returns a list or an array"); + static_assert(std::is_same::value, + "The return type of previous task must be compatible with input type of this task"); + } }; } // namespace Async @@ -298,6 +367,12 @@ Job start(ThenTask func) return Job(Private::ExecutorBasePtr(new Private::ThenExecutor(func))); } +template +Job start(SyncThenTask func) +{ + return Job(Private::ExecutorBasePtr(new Private::SyncThenExecutor(func))); +} + namespace Private { template @@ -378,22 +453,23 @@ void EachExecutor::previousFutureReady() } for (auto arg : this->mPrevFuture->value()) { - Async::Future future; - this->mFunc(arg, future); + auto future = new Async::Future; + this->mFunc(arg, *future); auto fw = new Async::FutureWatcher(); mFutureWatchers.append(fw); QObject::connect(fw, &Async::FutureWatcher::futureReady, [out, future, fw, this]() { - assert(future.isFinished()); + assert(future->isFinished()); const int index = mFutureWatchers.indexOf(fw); assert(index > -1); mFutureWatchers.removeAt(index); - out->setValue(out->value() + future.value()); + out->setValue(out->value() + future->value()); + delete future; if (mFutureWatchers.isEmpty()) { out->setFinished(); } }); - fw->setFuture(future); + fw->setFuture(*future); } } @@ -403,6 +479,68 @@ ReduceExecutor::ReduceExecutor(ReduceTask reduce, const Execut { } +template +SyncThenExecutor::SyncThenExecutor(SyncThenTask then, ErrorHandler errorHandler, const ExecutorBasePtr &parent) + : Executor::type, Out, In ...>(parent) +{ + this->mFunc = then; + this->mErrorFunc = errorHandler; +} + +template +void SyncThenExecutor::previousFutureReady() +{ + if (this->mPrevFuture) { + assert(this->mPrevFuture->isFinished()); + } + + if (this->mPrevFuture && this->mPrevFuture->errorCode()) { + if (this->mErrorFunc) { + this->mErrorFunc(this->mPrevFuture->errorCode(), this->mPrevFuture->errorMessage()); + this->mResult->setFinished(); + } else { + static_cast*>(this->mResult)->setError(this->mPrevFuture->errorCode(), this->mPrevFuture->errorMessage()); + //propagate error if no error handler is available + Out result = this->mFunc(this->mPrevFuture ? this->mPrevFuture->value() : In() ...); + static_cast*>(this->mResult)->setValue(result); + this->mResult->setFinished(); + } + } else { + Out result = this->mFunc(this->mPrevFuture ? this->mPrevFuture->value() : In() ...); + static_cast*>(this->mResult)->setValue(result); + this->mResult->setFinished(); + } +} + +template +SyncEachExecutor::SyncEachExecutor(SyncEachTask each, const ExecutorBasePtr &parent) + : Executor(parent) +{ + this->mFunc = each; +} + +template +void SyncEachExecutor::previousFutureReady() +{ + assert(this->mPrevFuture->isFinished()); + auto out = static_cast*>(this->mResult); + if (this->mPrevFuture->value().isEmpty()) { + out->setFinished(); + return; + } + + for (auto arg : this->mPrevFuture->value()) { + out->setValue(out->value() + this->mFunc(arg)); + } + out->setFinished(); +} + +template +SyncReduceExecutor::SyncReduceExecutor(SyncReduceTask reduce, const ExecutorBasePtr &parent) + : SyncThenExecutor(reduce, ErrorHandler(), parent) +{ +} + } // namespace Private -- cgit v1.2.3