From 3b8ebe6d4235f5ba12bc9c9854a6dd28cbff06b5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dan=20Vr=C3=A1til?= Date: Sat, 21 Feb 2015 12:11:42 +0100 Subject: Async: allow appending Jobs to already running or finished Jobs When user gets a Job (from a method call for instance), which is already running or might have even finished already, they can still append a new Job to the chain and re-execute it. The Job will internally chain up to the last finished Job, use it's result and continue from the next Job in the chain. If a Job in the chain is still running, it will wait for it to finish and pass the result to the next Job in the chain. --- async/autotests/asynctest.cpp | 72 +++++++++++++++++++++++++++++++++++++++++++ async/src/async.cpp | 2 ++ async/src/async.h | 21 ++++++++++++- common/resourceaccess.cpp | 57 +++++++++------------------------- 4 files changed, 109 insertions(+), 43 deletions(-) diff --git a/async/autotests/asynctest.cpp b/async/autotests/asynctest.cpp index 73026bb..7437608 100644 --- a/async/autotests/asynctest.cpp +++ b/async/autotests/asynctest.cpp @@ -59,6 +59,10 @@ private Q_SLOTS: void testErrorHandler(); + + void testChainingRunningJob(); + void testChainingFinishedJob(); + void benchmarkSyncThenExecutor(); private: @@ -399,6 +403,74 @@ void AsyncTest::testErrorHandler() +void AsyncTest::testChainingRunningJob() +{ + int check = 0; + + auto job = Async::start( + [&check](Async::Future &future) { + QTimer *timer = new QTimer(); + QObject::connect(timer, &QTimer::timeout, + [&future, &check]() { + ++check; + future.setValue(42); + future.setFinished(); + }); + QObject::connect(timer, &QTimer::timeout, + timer, &QObject::deleteLater); + timer->setSingleShot(true); + timer->start(500); + }); + + auto future1 = job.exec(); + QTest::qWait(200); + + auto job2 = job.then( + [&check](int in) -> int { + ++check; + return in * 2; + }); + + auto future2 = job2.exec(); + QVERIFY(!future1.isFinished()); + future2.waitForFinished(); + + QCOMPARE(check, 2); + QVERIFY(future1.isFinished()); + QVERIFY(future2.isFinished()); + QCOMPARE(future1.value(), 42); + QCOMPARE(future2.value(), 84); +} + +void AsyncTest::testChainingFinishedJob() +{ + int check = 0; + + auto job = Async::start( + [&check]() -> int { + ++check; + return 42; + }); + + auto future1 = job.exec(); + QVERIFY(future1.isFinished()); + + auto job2 = job.then( + [&check](int in) -> int { + ++check; + return in * 2; + }); + + auto future2 = job2.exec(); + QVERIFY(future2.isFinished()); + + QCOMPARE(check, 2); + QCOMPARE(future1.value(), 42); + QCOMPARE(future2.value(), 84); +} + + + void AsyncTest::benchmarkSyncThenExecutor() diff --git a/async/src/async.cpp b/async/src/async.cpp index c9fedc7..6eefd1b 100644 --- a/async/src/async.cpp +++ b/async/src/async.cpp @@ -27,6 +27,8 @@ using namespace Async; Private::ExecutorBase::ExecutorBase(const ExecutorBasePtr &parent) : mPrev(parent) , mResult(0) + , mIsRunning(false) + , mIsFinished(false) { } diff --git a/async/src/async.h b/async/src/async.h index d21caf8..8296fbc 100644 --- a/async/src/async.h +++ b/async/src/async.h @@ -90,6 +90,8 @@ protected: ExecutorBasePtr mPrev; FutureBase *mResult; + bool mIsRunning; + bool mIsFinished; }; template @@ -470,19 +472,35 @@ Future* Executor::chainup() template void Executor::exec() { - mPrevFuture = chainup(); + // Don't chain up to job that already is running (or is finished) + if (mPrev && !mPrev->mIsRunning & !mPrev->mIsFinished) { + mPrevFuture = chainup(); + } else if (mPrev && !mPrevFuture) { + // If previous job is running or finished, just get it's future + mPrevFuture = static_cast*>(mPrev->result()); + } + // Initialize our future mResult = new Async::Future(); + auto fw = new Async::FutureWatcher(); + QObject::connect(fw, &Async::FutureWatcher::futureReady, + [&]() { + mIsFinished = true; + fw->deleteLater(); + }); + if (!mPrevFuture || mPrevFuture->isFinished()) { if (mPrevFuture && mPrevFuture->errorCode() != 0) { if (mErrorFunc) { mErrorFunc(mPrevFuture->errorCode(), mPrevFuture->errorMessage()); mResult->setFinished(); + mIsFinished = true; return; } else { // Propagate the error to next caller } } + mIsRunning = true; previousFutureReady(); } else { auto futureWatcher = new Async::FutureWatcher(); @@ -500,6 +518,7 @@ void Executor::exec() // Propagate the error to next caller } } + mIsRunning = true; previousFutureReady(); }); diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp index c806478..ffe716b 100644 --- a/common/resourceaccess.cpp +++ b/common/resourceaccess.cpp @@ -154,51 +154,24 @@ Async::Job ResourceAccess::sendCommand(int commandId) }); } -/* - * TODO JOBAPI: This is a workaround to be able to return a job below to - * may or may not already be finished when the job is started. The job API should provide a mechanism - * for this. Essentially we need a way to set a job finished externally (we use the finisher as handle for that). - * If the job is then started the continuation should immediately be executed if the job finished already, and otherwise - * just wait until the work is done, and then execute the continuation as usual. - */ -struct JobFinisher { - bool finished; - std::function callback; - - JobFinisher() : finished(false) {} - - void setFinished(int error, const QString &errorMessage) { - finished = true; - if (callback) { - callback(error, errorMessage); - } - } -}; - Async::Job ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb) { - auto finisher = QSharedPointer::create(); - auto callback = [finisher] (int error, const QString &errorMessage) { - finisher->setFinished(error, errorMessage); - }; - if (isReady()) { - d->messageId++; - log(QString("Sending command %1 with messageId %2").arg(commandId).arg(d->messageId)); - registerCallback(d->messageId, callback); - Commands::write(d->socket, d->messageId, commandId, fbb); - } else { - d->commandQueue << new QueuedCommand(commandId, fbb, callback); - } - return Async::start([this, finisher](Async::Future &f) { - if (finisher->finished) { - f.setFinished(); - } else { - finisher->callback = [&f](int error, const QString &errorMessage) { - if (error) { - f.setError(error, errorMessage); - } + return Async::start([commandId, &fbb, this](Async::Future &f) { + auto callback = [&f](int error, const QString &errorMessage) { + if (error) { + f.setError(error, errorMessage); + } else { f.setFinished(); - }; + } + }; + + if (isReady()) { + d->messageId++; + log(QString("Sending command %1 with messageId %2").arg(commandId).arg(d->messageId)); + registerCallback(d->messageId, callback); + Commands::write(d->socket, d->messageId, commandId, fbb); + } else { + d->commandQueue << new QueuedCommand(commandId, fbb, callback); } }); } -- cgit v1.2.3