diff options
author | Dan Vrátil <dvratil@redhat.com> | 2015-02-21 12:11:42 +0100 |
---|---|---|
committer | Dan Vrátil <dvratil@redhat.com> | 2015-02-21 12:11:44 +0100 |
commit | 3b8ebe6d4235f5ba12bc9c9854a6dd28cbff06b5 (patch) | |
tree | 4e2a987e6c62523994965789387cace578e4c8d8 | |
parent | 76ec0cfe075e3af758657f9aecab7d7ce7e8d387 (diff) | |
download | sink-3b8ebe6d4235f5ba12bc9c9854a6dd28cbff06b5.tar.gz sink-3b8ebe6d4235f5ba12bc9c9854a6dd28cbff06b5.zip |
Async: allow appending Jobs to already running or finished Jobs
When user gets a Job (from a method call for instance), which is already running
or might have even finished already, they can still append a new Job to the chain
and re-execute it. The Job will internally chain up to the last finished Job, use
it's result and continue from the next Job in the chain. If a Job in the chain is
still running, it will wait for it to finish and pass the result to the next Job
in the chain.
-rw-r--r-- | async/autotests/asynctest.cpp | 72 | ||||
-rw-r--r-- | async/src/async.cpp | 2 | ||||
-rw-r--r-- | async/src/async.h | 21 | ||||
-rw-r--r-- | common/resourceaccess.cpp | 57 |
4 files changed, 109 insertions, 43 deletions
diff --git a/async/autotests/asynctest.cpp b/async/autotests/asynctest.cpp index 73026bb..7437608 100644 --- a/async/autotests/asynctest.cpp +++ b/async/autotests/asynctest.cpp | |||
@@ -59,6 +59,10 @@ private Q_SLOTS: | |||
59 | 59 | ||
60 | void testErrorHandler(); | 60 | void testErrorHandler(); |
61 | 61 | ||
62 | |||
63 | void testChainingRunningJob(); | ||
64 | void testChainingFinishedJob(); | ||
65 | |||
62 | void benchmarkSyncThenExecutor(); | 66 | void benchmarkSyncThenExecutor(); |
63 | 67 | ||
64 | private: | 68 | private: |
@@ -399,6 +403,74 @@ void AsyncTest::testErrorHandler() | |||
399 | 403 | ||
400 | 404 | ||
401 | 405 | ||
406 | void AsyncTest::testChainingRunningJob() | ||
407 | { | ||
408 | int check = 0; | ||
409 | |||
410 | auto job = Async::start<int>( | ||
411 | [&check](Async::Future<int> &future) { | ||
412 | QTimer *timer = new QTimer(); | ||
413 | QObject::connect(timer, &QTimer::timeout, | ||
414 | [&future, &check]() { | ||
415 | ++check; | ||
416 | future.setValue(42); | ||
417 | future.setFinished(); | ||
418 | }); | ||
419 | QObject::connect(timer, &QTimer::timeout, | ||
420 | timer, &QObject::deleteLater); | ||
421 | timer->setSingleShot(true); | ||
422 | timer->start(500); | ||
423 | }); | ||
424 | |||
425 | auto future1 = job.exec(); | ||
426 | QTest::qWait(200); | ||
427 | |||
428 | auto job2 = job.then<int, int>( | ||
429 | [&check](int in) -> int { | ||
430 | ++check; | ||
431 | return in * 2; | ||
432 | }); | ||
433 | |||
434 | auto future2 = job2.exec(); | ||
435 | QVERIFY(!future1.isFinished()); | ||
436 | future2.waitForFinished(); | ||
437 | |||
438 | QCOMPARE(check, 2); | ||
439 | QVERIFY(future1.isFinished()); | ||
440 | QVERIFY(future2.isFinished()); | ||
441 | QCOMPARE(future1.value(), 42); | ||
442 | QCOMPARE(future2.value(), 84); | ||
443 | } | ||
444 | |||
445 | void AsyncTest::testChainingFinishedJob() | ||
446 | { | ||
447 | int check = 0; | ||
448 | |||
449 | auto job = Async::start<int>( | ||
450 | [&check]() -> int { | ||
451 | ++check; | ||
452 | return 42; | ||
453 | }); | ||
454 | |||
455 | auto future1 = job.exec(); | ||
456 | QVERIFY(future1.isFinished()); | ||
457 | |||
458 | auto job2 = job.then<int, int>( | ||
459 | [&check](int in) -> int { | ||
460 | ++check; | ||
461 | return in * 2; | ||
462 | }); | ||
463 | |||
464 | auto future2 = job2.exec(); | ||
465 | QVERIFY(future2.isFinished()); | ||
466 | |||
467 | QCOMPARE(check, 2); | ||
468 | QCOMPARE(future1.value(), 42); | ||
469 | QCOMPARE(future2.value(), 84); | ||
470 | } | ||
471 | |||
472 | |||
473 | |||
402 | 474 | ||
403 | 475 | ||
404 | void AsyncTest::benchmarkSyncThenExecutor() | 476 | void AsyncTest::benchmarkSyncThenExecutor() |
diff --git a/async/src/async.cpp b/async/src/async.cpp index c9fedc7..6eefd1b 100644 --- a/async/src/async.cpp +++ b/async/src/async.cpp | |||
@@ -27,6 +27,8 @@ using namespace Async; | |||
27 | Private::ExecutorBase::ExecutorBase(const ExecutorBasePtr &parent) | 27 | Private::ExecutorBase::ExecutorBase(const ExecutorBasePtr &parent) |
28 | : mPrev(parent) | 28 | : mPrev(parent) |
29 | , mResult(0) | 29 | , mResult(0) |
30 | , mIsRunning(false) | ||
31 | , mIsFinished(false) | ||
30 | { | 32 | { |
31 | } | 33 | } |
32 | 34 | ||
diff --git a/async/src/async.h b/async/src/async.h index d21caf8..8296fbc 100644 --- a/async/src/async.h +++ b/async/src/async.h | |||
@@ -90,6 +90,8 @@ protected: | |||
90 | 90 | ||
91 | ExecutorBasePtr mPrev; | 91 | ExecutorBasePtr mPrev; |
92 | FutureBase *mResult; | 92 | FutureBase *mResult; |
93 | bool mIsRunning; | ||
94 | bool mIsFinished; | ||
93 | }; | 95 | }; |
94 | 96 | ||
95 | template<typename PrevOut, typename Out, typename ... In> | 97 | template<typename PrevOut, typename Out, typename ... In> |
@@ -470,19 +472,35 @@ Future<PrevOut>* Executor<PrevOut, Out, In ...>::chainup() | |||
470 | template<typename PrevOut, typename Out, typename ... In> | 472 | template<typename PrevOut, typename Out, typename ... In> |
471 | void Executor<PrevOut, Out, In ...>::exec() | 473 | void Executor<PrevOut, Out, In ...>::exec() |
472 | { | 474 | { |
473 | mPrevFuture = chainup(); | 475 | // Don't chain up to job that already is running (or is finished) |
476 | if (mPrev && !mPrev->mIsRunning & !mPrev->mIsFinished) { | ||
477 | mPrevFuture = chainup(); | ||
478 | } else if (mPrev && !mPrevFuture) { | ||
479 | // If previous job is running or finished, just get it's future | ||
480 | mPrevFuture = static_cast<Async::Future<PrevOut>*>(mPrev->result()); | ||
481 | } | ||
482 | |||
474 | // Initialize our future | 483 | // Initialize our future |
475 | mResult = new Async::Future<Out>(); | 484 | mResult = new Async::Future<Out>(); |
485 | auto fw = new Async::FutureWatcher<Out>(); | ||
486 | QObject::connect(fw, &Async::FutureWatcher<Out>::futureReady, | ||
487 | [&]() { | ||
488 | mIsFinished = true; | ||
489 | fw->deleteLater(); | ||
490 | }); | ||
491 | |||
476 | if (!mPrevFuture || mPrevFuture->isFinished()) { | 492 | if (!mPrevFuture || mPrevFuture->isFinished()) { |
477 | if (mPrevFuture && mPrevFuture->errorCode() != 0) { | 493 | if (mPrevFuture && mPrevFuture->errorCode() != 0) { |
478 | if (mErrorFunc) { | 494 | if (mErrorFunc) { |
479 | mErrorFunc(mPrevFuture->errorCode(), mPrevFuture->errorMessage()); | 495 | mErrorFunc(mPrevFuture->errorCode(), mPrevFuture->errorMessage()); |
480 | mResult->setFinished(); | 496 | mResult->setFinished(); |
497 | mIsFinished = true; | ||
481 | return; | 498 | return; |
482 | } else { | 499 | } else { |
483 | // Propagate the error to next caller | 500 | // Propagate the error to next caller |
484 | } | 501 | } |
485 | } | 502 | } |
503 | mIsRunning = true; | ||
486 | previousFutureReady(); | 504 | previousFutureReady(); |
487 | } else { | 505 | } else { |
488 | auto futureWatcher = new Async::FutureWatcher<PrevOut>(); | 506 | auto futureWatcher = new Async::FutureWatcher<PrevOut>(); |
@@ -500,6 +518,7 @@ void Executor<PrevOut, Out, In ...>::exec() | |||
500 | // Propagate the error to next caller | 518 | // Propagate the error to next caller |
501 | } | 519 | } |
502 | } | 520 | } |
521 | mIsRunning = true; | ||
503 | previousFutureReady(); | 522 | previousFutureReady(); |
504 | }); | 523 | }); |
505 | 524 | ||
diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp index c806478..ffe716b 100644 --- a/common/resourceaccess.cpp +++ b/common/resourceaccess.cpp | |||
@@ -154,51 +154,24 @@ Async::Job<void> ResourceAccess::sendCommand(int commandId) | |||
154 | }); | 154 | }); |
155 | } | 155 | } |
156 | 156 | ||
157 | /* | ||
158 | * TODO JOBAPI: This is a workaround to be able to return a job below to | ||
159 | * may or may not already be finished when the job is started. The job API should provide a mechanism | ||
160 | * for this. Essentially we need a way to set a job finished externally (we use the finisher as handle for that). | ||
161 | * If the job is then started the continuation should immediately be executed if the job finished already, and otherwise | ||
162 | * just wait until the work is done, and then execute the continuation as usual. | ||
163 | */ | ||
164 | struct JobFinisher { | ||
165 | bool finished; | ||
166 | std::function<void(int error, const QString &errorMessage)> callback; | ||
167 | |||
168 | JobFinisher() : finished(false) {} | ||
169 | |||
170 | void setFinished(int error, const QString &errorMessage) { | ||
171 | finished = true; | ||
172 | if (callback) { | ||
173 | callback(error, errorMessage); | ||
174 | } | ||
175 | } | ||
176 | }; | ||
177 | |||
178 | Async::Job<void> ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb) | 157 | Async::Job<void> ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb) |
179 | { | 158 | { |
180 | auto finisher = QSharedPointer<JobFinisher>::create(); | 159 | return Async::start<void>([commandId, &fbb, this](Async::Future<void> &f) { |
181 | auto callback = [finisher] (int error, const QString &errorMessage) { | 160 | auto callback = [&f](int error, const QString &errorMessage) { |
182 | finisher->setFinished(error, errorMessage); | 161 | if (error) { |
183 | }; | 162 | f.setError(error, errorMessage); |
184 | if (isReady()) { | 163 | } else { |
185 | d->messageId++; | ||
186 | log(QString("Sending command %1 with messageId %2").arg(commandId).arg(d->messageId)); | ||
187 | registerCallback(d->messageId, callback); | ||
188 | Commands::write(d->socket, d->messageId, commandId, fbb); | ||
189 | } else { | ||
190 | d->commandQueue << new QueuedCommand(commandId, fbb, callback); | ||
191 | } | ||
192 | return Async::start<void>([this, finisher](Async::Future<void> &f) { | ||
193 | if (finisher->finished) { | ||
194 | f.setFinished(); | ||
195 | } else { | ||
196 | finisher->callback = [&f](int error, const QString &errorMessage) { | ||
197 | if (error) { | ||
198 | f.setError(error, errorMessage); | ||
199 | } | ||
200 | f.setFinished(); | 164 | f.setFinished(); |
201 | }; | 165 | } |
166 | }; | ||
167 | |||
168 | if (isReady()) { | ||
169 | d->messageId++; | ||
170 | log(QString("Sending command %1 with messageId %2").arg(commandId).arg(d->messageId)); | ||
171 | registerCallback(d->messageId, callback); | ||
172 | Commands::write(d->socket, d->messageId, commandId, fbb); | ||
173 | } else { | ||
174 | d->commandQueue << new QueuedCommand(commandId, fbb, callback); | ||
202 | } | 175 | } |
203 | }); | 176 | }); |
204 | } | 177 | } |