summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDan Vrátil <dvratil@redhat.com>2015-02-21 12:11:42 +0100
committerDan Vrátil <dvratil@redhat.com>2015-02-21 12:11:44 +0100
commit3b8ebe6d4235f5ba12bc9c9854a6dd28cbff06b5 (patch)
tree4e2a987e6c62523994965789387cace578e4c8d8
parent76ec0cfe075e3af758657f9aecab7d7ce7e8d387 (diff)
downloadsink-3b8ebe6d4235f5ba12bc9c9854a6dd28cbff06b5.tar.gz
sink-3b8ebe6d4235f5ba12bc9c9854a6dd28cbff06b5.zip
Async: allow appending Jobs to already running or finished Jobs
When user gets a Job (from a method call for instance), which is already running or might have even finished already, they can still append a new Job to the chain and re-execute it. The Job will internally chain up to the last finished Job, use it's result and continue from the next Job in the chain. If a Job in the chain is still running, it will wait for it to finish and pass the result to the next Job in the chain.
-rw-r--r--async/autotests/asynctest.cpp72
-rw-r--r--async/src/async.cpp2
-rw-r--r--async/src/async.h21
-rw-r--r--common/resourceaccess.cpp57
4 files changed, 109 insertions, 43 deletions
diff --git a/async/autotests/asynctest.cpp b/async/autotests/asynctest.cpp
index 73026bb..7437608 100644
--- a/async/autotests/asynctest.cpp
+++ b/async/autotests/asynctest.cpp
@@ -59,6 +59,10 @@ private Q_SLOTS:
59 59
60 void testErrorHandler(); 60 void testErrorHandler();
61 61
62
63 void testChainingRunningJob();
64 void testChainingFinishedJob();
65
62 void benchmarkSyncThenExecutor(); 66 void benchmarkSyncThenExecutor();
63 67
64private: 68private:
@@ -399,6 +403,74 @@ void AsyncTest::testErrorHandler()
399 403
400 404
401 405
406void AsyncTest::testChainingRunningJob()
407{
408 int check = 0;
409
410 auto job = Async::start<int>(
411 [&check](Async::Future<int> &future) {
412 QTimer *timer = new QTimer();
413 QObject::connect(timer, &QTimer::timeout,
414 [&future, &check]() {
415 ++check;
416 future.setValue(42);
417 future.setFinished();
418 });
419 QObject::connect(timer, &QTimer::timeout,
420 timer, &QObject::deleteLater);
421 timer->setSingleShot(true);
422 timer->start(500);
423 });
424
425 auto future1 = job.exec();
426 QTest::qWait(200);
427
428 auto job2 = job.then<int, int>(
429 [&check](int in) -> int {
430 ++check;
431 return in * 2;
432 });
433
434 auto future2 = job2.exec();
435 QVERIFY(!future1.isFinished());
436 future2.waitForFinished();
437
438 QCOMPARE(check, 2);
439 QVERIFY(future1.isFinished());
440 QVERIFY(future2.isFinished());
441 QCOMPARE(future1.value(), 42);
442 QCOMPARE(future2.value(), 84);
443}
444
445void AsyncTest::testChainingFinishedJob()
446{
447 int check = 0;
448
449 auto job = Async::start<int>(
450 [&check]() -> int {
451 ++check;
452 return 42;
453 });
454
455 auto future1 = job.exec();
456 QVERIFY(future1.isFinished());
457
458 auto job2 = job.then<int, int>(
459 [&check](int in) -> int {
460 ++check;
461 return in * 2;
462 });
463
464 auto future2 = job2.exec();
465 QVERIFY(future2.isFinished());
466
467 QCOMPARE(check, 2);
468 QCOMPARE(future1.value(), 42);
469 QCOMPARE(future2.value(), 84);
470}
471
472
473
402 474
403 475
404void AsyncTest::benchmarkSyncThenExecutor() 476void AsyncTest::benchmarkSyncThenExecutor()
diff --git a/async/src/async.cpp b/async/src/async.cpp
index c9fedc7..6eefd1b 100644
--- a/async/src/async.cpp
+++ b/async/src/async.cpp
@@ -27,6 +27,8 @@ using namespace Async;
27Private::ExecutorBase::ExecutorBase(const ExecutorBasePtr &parent) 27Private::ExecutorBase::ExecutorBase(const ExecutorBasePtr &parent)
28 : mPrev(parent) 28 : mPrev(parent)
29 , mResult(0) 29 , mResult(0)
30 , mIsRunning(false)
31 , mIsFinished(false)
30{ 32{
31} 33}
32 34
diff --git a/async/src/async.h b/async/src/async.h
index d21caf8..8296fbc 100644
--- a/async/src/async.h
+++ b/async/src/async.h
@@ -90,6 +90,8 @@ protected:
90 90
91 ExecutorBasePtr mPrev; 91 ExecutorBasePtr mPrev;
92 FutureBase *mResult; 92 FutureBase *mResult;
93 bool mIsRunning;
94 bool mIsFinished;
93}; 95};
94 96
95template<typename PrevOut, typename Out, typename ... In> 97template<typename PrevOut, typename Out, typename ... In>
@@ -470,19 +472,35 @@ Future<PrevOut>* Executor<PrevOut, Out, In ...>::chainup()
470template<typename PrevOut, typename Out, typename ... In> 472template<typename PrevOut, typename Out, typename ... In>
471void Executor<PrevOut, Out, In ...>::exec() 473void Executor<PrevOut, Out, In ...>::exec()
472{ 474{
473 mPrevFuture = chainup(); 475 // Don't chain up to job that already is running (or is finished)
476 if (mPrev && !mPrev->mIsRunning & !mPrev->mIsFinished) {
477 mPrevFuture = chainup();
478 } else if (mPrev && !mPrevFuture) {
479 // If previous job is running or finished, just get it's future
480 mPrevFuture = static_cast<Async::Future<PrevOut>*>(mPrev->result());
481 }
482
474 // Initialize our future 483 // Initialize our future
475 mResult = new Async::Future<Out>(); 484 mResult = new Async::Future<Out>();
485 auto fw = new Async::FutureWatcher<Out>();
486 QObject::connect(fw, &Async::FutureWatcher<Out>::futureReady,
487 [&]() {
488 mIsFinished = true;
489 fw->deleteLater();
490 });
491
476 if (!mPrevFuture || mPrevFuture->isFinished()) { 492 if (!mPrevFuture || mPrevFuture->isFinished()) {
477 if (mPrevFuture && mPrevFuture->errorCode() != 0) { 493 if (mPrevFuture && mPrevFuture->errorCode() != 0) {
478 if (mErrorFunc) { 494 if (mErrorFunc) {
479 mErrorFunc(mPrevFuture->errorCode(), mPrevFuture->errorMessage()); 495 mErrorFunc(mPrevFuture->errorCode(), mPrevFuture->errorMessage());
480 mResult->setFinished(); 496 mResult->setFinished();
497 mIsFinished = true;
481 return; 498 return;
482 } else { 499 } else {
483 // Propagate the error to next caller 500 // Propagate the error to next caller
484 } 501 }
485 } 502 }
503 mIsRunning = true;
486 previousFutureReady(); 504 previousFutureReady();
487 } else { 505 } else {
488 auto futureWatcher = new Async::FutureWatcher<PrevOut>(); 506 auto futureWatcher = new Async::FutureWatcher<PrevOut>();
@@ -500,6 +518,7 @@ void Executor<PrevOut, Out, In ...>::exec()
500 // Propagate the error to next caller 518 // Propagate the error to next caller
501 } 519 }
502 } 520 }
521 mIsRunning = true;
503 previousFutureReady(); 522 previousFutureReady();
504 }); 523 });
505 524
diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp
index c806478..ffe716b 100644
--- a/common/resourceaccess.cpp
+++ b/common/resourceaccess.cpp
@@ -154,51 +154,24 @@ Async::Job<void> ResourceAccess::sendCommand(int commandId)
154 }); 154 });
155} 155}
156 156
157/*
158 * TODO JOBAPI: This is a workaround to be able to return a job below to
159 * may or may not already be finished when the job is started. The job API should provide a mechanism
160 * for this. Essentially we need a way to set a job finished externally (we use the finisher as handle for that).
161 * If the job is then started the continuation should immediately be executed if the job finished already, and otherwise
162 * just wait until the work is done, and then execute the continuation as usual.
163 */
164struct JobFinisher {
165 bool finished;
166 std::function<void(int error, const QString &errorMessage)> callback;
167
168 JobFinisher() : finished(false) {}
169
170 void setFinished(int error, const QString &errorMessage) {
171 finished = true;
172 if (callback) {
173 callback(error, errorMessage);
174 }
175 }
176};
177
178Async::Job<void> ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb) 157Async::Job<void> ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb)
179{ 158{
180 auto finisher = QSharedPointer<JobFinisher>::create(); 159 return Async::start<void>([commandId, &fbb, this](Async::Future<void> &f) {
181 auto callback = [finisher] (int error, const QString &errorMessage) { 160 auto callback = [&f](int error, const QString &errorMessage) {
182 finisher->setFinished(error, errorMessage); 161 if (error) {
183 }; 162 f.setError(error, errorMessage);
184 if (isReady()) { 163 } else {
185 d->messageId++;
186 log(QString("Sending command %1 with messageId %2").arg(commandId).arg(d->messageId));
187 registerCallback(d->messageId, callback);
188 Commands::write(d->socket, d->messageId, commandId, fbb);
189 } else {
190 d->commandQueue << new QueuedCommand(commandId, fbb, callback);
191 }
192 return Async::start<void>([this, finisher](Async::Future<void> &f) {
193 if (finisher->finished) {
194 f.setFinished();
195 } else {
196 finisher->callback = [&f](int error, const QString &errorMessage) {
197 if (error) {
198 f.setError(error, errorMessage);
199 }
200 f.setFinished(); 164 f.setFinished();
201 }; 165 }
166 };
167
168 if (isReady()) {
169 d->messageId++;
170 log(QString("Sending command %1 with messageId %2").arg(commandId).arg(d->messageId));
171 registerCallback(d->messageId, callback);
172 Commands::write(d->socket, d->messageId, commandId, fbb);
173 } else {
174 d->commandQueue << new QueuedCommand(commandId, fbb, callback);
202 } 175 }
203 }); 176 });
204} 177}