From c4fc276f4af964ce586e009ea3d0c728bb660331 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dan=20Vr=C3=A1til?= Date: Sun, 5 Apr 2015 01:05:55 +0200 Subject: Async: improve error handling and error propagation The error is now propagated to the top-most (user-owned) Future. When an error occurs, no further tasks are executed. The first error handler function in the chain is also called, if there's any. --- async/autotests/asynctest.cpp | 166 ++++++++++++++++++++++++++++++++++++++---- async/src/async.cpp | 46 ++++++++++++ async/src/async.h | 157 ++++++++++++++++++++------------------- 3 files changed, 280 insertions(+), 89 deletions(-) diff --git a/async/autotests/asynctest.cpp b/async/autotests/asynctest.cpp index 65e604f..d709567 100644 --- a/async/autotests/asynctest.cpp +++ b/async/autotests/asynctest.cpp @@ -27,6 +27,8 @@ #include #include +#include + class AsyncTest : public QObject { Q_OBJECT @@ -62,6 +64,9 @@ private Q_SLOTS: void testProgressReporting(); void testErrorHandler(); + void testErrorPropagation(); + void testErrorHandlerAsync(); + void testErrorPropagationAsync(); void testChainingRunningJob(); void testChainingFinishedJob(); @@ -92,8 +97,25 @@ private: mTimer.start(200); } + AsyncSimulator(Async::Future &future, std::function&)> callback) + : mFuture(future) + , mCallback(callback) + { + QObject::connect(&mTimer, &QTimer::timeout, + [this]() { + mCallback(mFuture); + }); + QObject::connect(&mTimer, &QTimer::timeout, + [this]() { + delete this; + }); + mTimer.setSingleShot(true); + mTimer.start(200); + } + private: Async::Future mFuture; + std::function&)> mCallback; T mResult; QTimer mTimer; }; @@ -390,18 +412,14 @@ void AsyncTest::testAsyncReduce() }) .reduce>( [](const QList &list, Async::Future &future) { - QTimer *timer = new QTimer(); - QObject::connect(timer, &QTimer::timeout, - [list, &future]() { - int sum = 0; - for (int i : list) sum += i; - future.setValue(sum); - future.setFinished(); - }); - QObject::connect(timer, &QTimer::timeout, - timer, &QObject::deleteLater); - timer->setSingleShot(true); - timer->start(0); + new AsyncSimulator(future, + [list](Async::Future &future) { + int sum = 0; + for (int i : list) sum += i; + future.setValue(sum); + future.setFinished(); + } + ); }); Async::Future future = job.exec(); @@ -508,28 +526,146 @@ void AsyncTest::testProgressReporting() } void AsyncTest::testErrorHandler() +{ + + { + auto job = Async::start( + [](Async::Future &f) { + f.setError(1, "error"); + }); + + auto future = job.exec(); + QVERIFY(future.isFinished()); + QCOMPARE(future.errorCode(), 1); + QCOMPARE(future.errorMessage(), QString::fromLatin1("error")); + } + + { + int error = 0; + auto job = Async::start( + [](Async::Future &f) { + f.setError(1, "error"); + }, + [&error](int errorCode, const QString &errorMessage) { + error += errorCode; + } + ); + + auto future = job.exec(); + QVERIFY(future.isFinished()); + QCOMPARE(error, 1); + QCOMPARE(future.errorCode(), 1); + QCOMPARE(future.errorMessage(), QString::fromLatin1("error")); + } +} + +void AsyncTest::testErrorPropagation() { int error = 0; + bool called = false; auto job = Async::start( [](Async::Future &f) { f.setError(1, "error"); }) .then( - [](int v, Async::Future &f) { + [&called](int v, Async::Future &f) { + called = true; f.setFinished(); }, [&error](int errorCode, const QString &errorMessage) { - error = errorCode; + error += errorCode; } ); auto future = job.exec(); - future.waitForFinished(); + QVERIFY(future.isFinished()); + QCOMPARE(future.errorCode(), 1); + QCOMPARE(future.errorMessage(), QString::fromLatin1("error")); + QCOMPARE(called, false); QCOMPARE(error, 1); +} + +void AsyncTest::testErrorHandlerAsync() +{ + { + auto job = Async::start( + [](Async::Future &f) { + new AsyncSimulator(f, + [](Async::Future &f) { + f.setError(1, "error"); + } + ); + } + ); + + auto future = job.exec(); + future.waitForFinished(); + + QVERIFY(future.isFinished()); + QCOMPARE(future.errorCode(), 1); + QCOMPARE(future.errorMessage(), QString::fromLatin1("error")); + } + + { + int error = 0; + auto job = Async::start( + [](Async::Future &f) { + new AsyncSimulator(f, + [](Async::Future &f) { + f.setError(1, "error"); + } + ); + }, + [&error](int errorCode, const QString &errorMessage) { + error += errorCode; + } + ); + + auto future = job.exec(); + future.waitForFinished(); + + QVERIFY(future.isFinished()); + QCOMPARE(error, 1); + QCOMPARE(future.errorCode(), 1); + QCOMPARE(future.errorMessage(), QString::fromLatin1("error")); + } +} + +void AsyncTest::testErrorPropagationAsync() +{ + int error = 0; + bool called = false; + auto job = Async::start( + [](Async::Future &f) { + new AsyncSimulator(f, + [](Async::Future &f) { + f.setError(1, "error"); + } + ); + }) + .then( + [&called](int v, Async::Future &f) { + called = true; + f.setFinished(); + }, + [&error](int errorCode, const QString &errorMessage) { + error += errorCode; + } + ); + + auto future = job.exec(); + future.waitForFinished(); + QVERIFY(future.isFinished()); + QCOMPARE(future.errorCode(), 1); + QCOMPARE(future.errorMessage(), QString::fromLatin1("error")); + QCOMPARE(called, false); + QCOMPARE(error, 1); } + + void AsyncTest::testChainingRunningJob() { int check = 0; diff --git a/async/src/async.cpp b/async/src/async.cpp index 5e26bd8..c780878 100644 --- a/async/src/async.cpp +++ b/async/src/async.cpp @@ -24,6 +24,50 @@ using namespace Async; +Private::Execution::Execution(const Private::ExecutorBasePtr &executor) + : executor(executor) + , resultBase(nullptr) + , isRunning(false) + , isFinished(false) +{ +} + +Private::Execution::~Execution() +{ + if (resultBase) { + resultBase->releaseExecution(); + delete resultBase; + } + prevExecution.reset(); +} + +void Private::Execution::setFinished() +{ + isFinished = true; + //executor.clear(); +} + +void Private::Execution::releaseFuture() +{ + resultBase = 0; +} + +bool Private::Execution::errorWasHandled() const +{ + Execution * const exec = this; + while (exec) { + if (exec->executor->hasErrorFunc()) { + return true; + } + exec = exec->prevExecution.data(); + } + return false; +} + + + + + Private::ExecutorBase::ExecutorBase(const ExecutorBasePtr &parent) : mPrev(parent) { @@ -34,6 +78,8 @@ Private::ExecutorBase::~ExecutorBase() } + + JobBase::JobBase(const Private::ExecutorBasePtr &executor) : mExecutor(executor) { diff --git a/async/src/async.h b/async/src/async.h index 89ca0d0..adc0b69 100644 --- a/async/src/async.h +++ b/async/src/async.h @@ -96,27 +96,9 @@ class ExecutorBase; typedef QSharedPointer ExecutorBasePtr; struct Execution { - Execution(const ExecutorBasePtr &executor) - : executor(executor) - , resultBase(nullptr) - , isRunning(false) - , isFinished(false) - {} - - ~Execution() - { - if (resultBase) { - resultBase->releaseExecution(); - delete resultBase; - } - prevExecution.reset(); - } - - void setFinished() - { - isFinished = true; - executor.clear(); - } + Execution(const ExecutorBasePtr &executor); + ~Execution(); + void setFinished(); template Async::Future* result() @@ -124,10 +106,8 @@ struct Execution { return static_cast*>(resultBase); } - void releaseFuture() - { - resultBase = 0; - } + void releaseFuture(); + bool errorWasHandled() const; ExecutorBasePtr executor; FutureBase *resultBase; @@ -147,6 +127,8 @@ class ExecutorBase template friend class Async::Job; + friend class Execution; + public: virtual ~ExecutorBase(); virtual ExecutionPtr exec(const ExecutorBasePtr &self) = 0; @@ -157,6 +139,9 @@ protected: template Async::Future* createFuture(const ExecutionPtr &execution) const; + virtual bool hasErrorFunc() const = 0; + virtual bool handleError(const ExecutionPtr &execution) = 0; + ExecutorBasePtr mPrev; }; @@ -164,14 +149,17 @@ template class Executor : public ExecutorBase { protected: - Executor(ErrorHandler errorHandler, const Private::ExecutorBasePtr &parent) + Executor(ErrorHandler errorFunc, const Private::ExecutorBasePtr &parent) : ExecutorBase(parent) - , mErrorFunc(errorHandler) + , mErrorFunc(errorFunc) {} + virtual ~Executor() {} virtual void run(const ExecutionPtr &execution) = 0; ExecutionPtr exec(const ExecutorBasePtr &self); + bool hasErrorFunc() const { return (bool) mErrorFunc; } + bool handleError(const ExecutionPtr &execution); std::function mErrorFunc; }; @@ -180,7 +168,7 @@ template class ThenExecutor: public Executor::type, Out, In ...> { public: - ThenExecutor(ThenTask then, ErrorHandler errorHandler, const ExecutorBasePtr &parent); + ThenExecutor(ThenTask then, ErrorHandler errorFunc, const ExecutorBasePtr &parent); void run(const ExecutionPtr &execution); private: ThenTask mFunc; @@ -190,7 +178,7 @@ template class EachExecutor : public Executor { public: - EachExecutor(EachTask each, ErrorHandler errorHandler, const ExecutorBasePtr &parent); + EachExecutor(EachTask each, ErrorHandler errorFunc, const ExecutorBasePtr &parent); void run(const ExecutionPtr &execution); private: EachTask mFunc; @@ -201,7 +189,7 @@ template class ReduceExecutor : public ThenExecutor { public: - ReduceExecutor(ReduceTask reduce, ErrorHandler errorHandler, const ExecutorBasePtr &parent); + ReduceExecutor(ReduceTask reduce, ErrorHandler errorFunc, const ExecutorBasePtr &parent); private: ReduceTask mFunc; }; @@ -210,7 +198,7 @@ template class SyncThenExecutor : public Executor::type, Out, In ...> { public: - SyncThenExecutor(SyncThenTask then, ErrorHandler errorHandler, const ExecutorBasePtr &parent); + SyncThenExecutor(SyncThenTask then, ErrorHandler errorFunc, const ExecutorBasePtr &parent); void run(const ExecutionPtr &execution); private: @@ -223,7 +211,7 @@ template class SyncReduceExecutor : public SyncThenExecutor { public: - SyncReduceExecutor(SyncReduceTask reduce, ErrorHandler errorHandler, const ExecutorBasePtr &parent); + SyncReduceExecutor(SyncReduceTask reduce, ErrorHandler errorFunc, const ExecutorBasePtr &parent); private: SyncReduceTask mFunc; }; @@ -232,7 +220,7 @@ template class SyncEachExecutor : public Executor { public: - SyncEachExecutor(SyncEachTask each, ErrorHandler errorHandler, const ExecutorBasePtr &parent); + SyncEachExecutor(SyncEachTask each, ErrorHandler errorFunc, const ExecutorBasePtr &parent); void run(const ExecutionPtr &execution); private: void run(Async::Future *future, const typename PrevOut::value_type &arg, std::false_type); // !std::is_void @@ -253,10 +241,10 @@ private: * where @p In is type of the result. */ template -Job start(ThenTask func); +Job start(ThenTask func, ErrorHandler errorFunc = ErrorHandler()); template -Job start(SyncThenTask func); +Job start(SyncThenTask func, ErrorHandler errorFunc = ErrorHandler()); #ifdef WITH_KJOB template @@ -358,10 +346,10 @@ class Job : public JobBase friend class Job; template - friend Job start(Async::ThenTask func); + friend Job start(Async::ThenTask func, ErrorHandler errorFunc); template - friend Job start(Async::SyncThenTask func); + friend Job start(Async::SyncThenTask func, ErrorHandler errorFunc); #ifdef WITH_KJOB template @@ -524,17 +512,17 @@ private: namespace Async { template -Job start(ThenTask func) +Job start(ThenTask func, ErrorHandler error) { return Job(Private::ExecutorBasePtr( - new Private::ThenExecutor(func, ErrorHandler(), Private::ExecutorBasePtr()))); + new Private::ThenExecutor(func, error, Private::ExecutorBasePtr()))); } template -Job start(SyncThenTask func) +Job start(SyncThenTask func, ErrorHandler error) { return Job(Private::ExecutorBasePtr( - new Private::SyncThenExecutor(func, ErrorHandler(), Private::ExecutorBasePtr()))); + new Private::SyncThenExecutor(func, error, Private::ExecutorBasePtr()))); } #ifdef WITH_KJOB @@ -596,17 +584,12 @@ ExecutionPtr Executor::exec(const ExecutorBasePtr &self) // chainup execution->prevExecution = mPrev ? mPrev->exec(mPrev) : ExecutionPtr(); - /* - } else if (mPrev && !mPrevFuture) { - // If previous job is running or finished, just get it's future - mPrevFuture = static_cast*>(mPrev->result()); - } - */ execution->resultBase = this->createFuture(execution); auto fw = new Async::FutureWatcher(); QObject::connect(fw, &Async::FutureWatcher::futureReady, [fw, execution, this]() { + handleError(execution); execution->setFinished(); delete fw; }); @@ -614,44 +597,70 @@ ExecutionPtr Executor::exec(const ExecutorBasePtr &self) Async::Future *prevFuture = execution->prevExecution ? execution->prevExecution->result() : nullptr; if (!prevFuture || prevFuture->isFinished()) { - if (prevFuture && prevFuture->errorCode() != 0) { - if (mErrorFunc) { - mErrorFunc(prevFuture->errorCode(), prevFuture->errorMessage()); - execution->resultBase->setFinished(); - execution->setFinished(); - return execution; + if (prevFuture) { // prevFuture implies execution->prevExecution + if (prevFuture->errorCode()) { + // Propagate the errorCode and message to the outer Future + execution->resultBase->setError(prevFuture->errorCode(), prevFuture->errorMessage()); + if (!execution->errorWasHandled()) { + if (handleError(execution)) { + return execution; + } + } else { + return execution; + } } else { - // Propagate the error to next caller + // Propagate error (if any) } } + execution->isRunning = true; run(execution); } else { - auto futureWatcher = new Async::FutureWatcher(); - QObject::connect(futureWatcher, &Async::FutureWatcher::futureReady, - [futureWatcher, execution, this]() { - auto prevFuture = futureWatcher->future(); + auto prevFutureWatcher = new Async::FutureWatcher(); + QObject::connect(prevFutureWatcher, &Async::FutureWatcher::futureReady, + [prevFutureWatcher, execution, this]() { + auto prevFuture = prevFutureWatcher->future(); assert(prevFuture.isFinished()); - delete futureWatcher; - if (prevFuture.errorCode() != 0) { - if (mErrorFunc) { - mErrorFunc(prevFuture.errorCode(), prevFuture.errorMessage()); - execution->resultBase->setFinished(); - return; + delete prevFutureWatcher; + auto prevExecutor = execution->executor->mPrev; + if (prevFuture.errorCode()) { + execution->resultBase->setError(prevFuture.errorCode(), prevFuture.errorMessage()); + if (!execution->errorWasHandled()) { + if (handleError(execution)) { + return; + } } else { - // Propagate the error to next caller + return; } } + + + // propagate error (if any) execution->isRunning = true; run(execution); }); - futureWatcher->setFuture(*static_cast*>(prevFuture)); + prevFutureWatcher->setFuture(*static_cast*>(prevFuture)); } return execution; } +template +bool Executor::handleError(const ExecutionPtr &execution) +{ + assert(execution->resultBase->isFinished()); + if (execution->resultBase->errorCode()) { + if (mErrorFunc) { + mErrorFunc(execution->resultBase->errorCode(), + execution->resultBase->errorMessage()); + return true; + } + } + + return false; +} + template ThenExecutor::ThenExecutor(ThenTask then, ErrorHandler error, const ExecutorBasePtr &parent) @@ -715,14 +724,14 @@ void EachExecutor::run(const ExecutionPtr &execution) } template -ReduceExecutor::ReduceExecutor(ReduceTask reduce, ErrorHandler error, const ExecutorBasePtr &parent) - : ThenExecutor(reduce, error, parent) +ReduceExecutor::ReduceExecutor(ReduceTask reduce, ErrorHandler errorFunc, const ExecutorBasePtr &parent) + : ThenExecutor(reduce, errorFunc, parent) { } template -SyncThenExecutor::SyncThenExecutor(SyncThenTask then, ErrorHandler errorHandler, const ExecutorBasePtr &parent) - : Executor::type, Out, In ...>(errorHandler, parent) +SyncThenExecutor::SyncThenExecutor(SyncThenTask then, ErrorHandler errorFunc, const ExecutorBasePtr &parent) + : Executor::type, Out, In ...>(errorFunc, parent) , mFunc(then) { } @@ -762,8 +771,8 @@ void SyncThenExecutor::run(const ExecutionPtr &execution, std::true } template -SyncEachExecutor::SyncEachExecutor(SyncEachTask each, ErrorHandler errorHandler, const ExecutorBasePtr &parent) - : Executor(errorHandler, parent) +SyncEachExecutor::SyncEachExecutor(SyncEachTask each, ErrorHandler errorFunc, const ExecutorBasePtr &parent) + : Executor(errorFunc, parent) , mFunc(each) { } @@ -800,8 +809,8 @@ void SyncEachExecutor::run(Async::Future * /* unused */, } template -SyncReduceExecutor::SyncReduceExecutor(SyncReduceTask reduce, ErrorHandler errorHandler, const ExecutorBasePtr &parent) - : SyncThenExecutor(reduce, errorHandler, parent) +SyncReduceExecutor::SyncReduceExecutor(SyncReduceTask reduce, ErrorHandler errorFunc, const ExecutorBasePtr &parent) + : SyncThenExecutor(reduce, errorFunc, parent) { } -- cgit v1.2.3