From fb4c36e3daded17746a0bcd7c245b8cea9782c1a Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Wed, 1 Apr 2015 11:00:14 +0200 Subject: Use dowhile --- dummyresource/resourcefactory.cpp | 95 ++++++++++++++++++--------------------- 1 file changed, 43 insertions(+), 52 deletions(-) diff --git a/dummyresource/resourcefactory.cpp b/dummyresource/resourcefactory.cpp index 4e79f4c..10c8eaf 100644 --- a/dummyresource/resourcefactory.cpp +++ b/dummyresource/resourcefactory.cpp @@ -123,16 +123,6 @@ signals: void error(int errorCode, const QString &errorMessage); private slots: - static void asyncWhile(const std::function)> &body, const std::function &completionHandler) { - body([body, completionHandler](bool complete) { - if (complete) { - completionHandler(); - } else { - asyncWhile(body, completionHandler); - } - }); - } - void process() { if (mProcessingLock) { @@ -165,55 +155,56 @@ private slots: //Process all messages of this queue Async::Job processQueue(MessageQueue *queue) { - auto job = Async::start([this, queue](Async::Future &future) { - asyncWhile([&, queue](std::function whileCallback) { - // auto job = Async::start(void *ptr, int size) - //TODO use something like: - //Async::foreach("pass iterator here").each("process value here").join(); - //Async::foreach("pass iterator here").parallel("process value here").join(); - queue->dequeue([this, whileCallback](void *ptr, int size, std::function messageQueueCallback) { - auto callback = [messageQueueCallback, whileCallback](bool success) { - messageQueueCallback(success); - whileCallback(!success); - }; - - flatbuffers::Verifier verifyer(reinterpret_cast(ptr), size); - if (!Akonadi2::VerifyQueuedCommandBuffer(verifyer)) { - qWarning() << "invalid buffer"; - callback(false); - return; - } - auto queuedCommand = Akonadi2::GetQueuedCommand(ptr); - qDebug() << "Dequeued: " << queuedCommand->commandId(); - //TODO JOBAPI: job lifetime management - //Right now we're just leaking jobs. In this case we'd like jobs that are heap allocated and delete - //themselves once done. In other cases we'd like jobs that only live as long as their handle though. - //FIXME this job is stack allocated and thus simply dies.... - //FIXME get rid of waitForFinished, it's a workaround for the missing lifetime management - processQueuedCommand(queuedCommand).then([callback]() { - callback(true); + //TODO use something like: + //Async::foreach("pass iterator here").each("process value here").join(); + //Async::foreach("pass iterator here").parallel("process value here").join(); + return Async::dowhile( + [this, queue](Async::Future &future) { + queue->dequeue( + [this, &future](void *ptr, int size, std::function messageQueueCallback) { + auto callback = [messageQueueCallback, &future](bool success) { + messageQueueCallback(success); + future.setValue(!success); + future.setFinished(); + }; + + flatbuffers::Verifier verifyer(reinterpret_cast(ptr), size); + if (!Akonadi2::VerifyQueuedCommandBuffer(verifyer)) { + qWarning() << "invalid buffer"; + callback(false); + return; + } + auto queuedCommand = Akonadi2::GetQueuedCommand(ptr); + qDebug() << "Dequeued: " << queuedCommand->commandId(); + //TODO JOBAPI: job lifetime management + //Right now we're just leaking jobs. In this case we'd like jobs that are heap allocated and delete + //themselves once done. In other cases we'd like jobs that only live as long as their handle though. + //FIXME this job is stack allocated and thus simply dies.... + processQueuedCommand(queuedCommand).then( + [callback]() { + callback(true); + }, + [callback](int errorCode, QString errorMessage) { + Warning() << errorMessage; + callback(false); + } + ).exec(); }, - [callback](int errorCode, QString errorMessage) { - Warning() << errorMessage; - callback(false); - }).exec().waitForFinished(); - }, - [whileCallback](const MessageQueue::Error &error) { - whileCallback(true); - }); - }, - [&future]() { //while complete - future.setFinished(); - }); - }); - return job; + [&future](const MessageQueue::Error &error) { + Warning() << error.message; + future.setValue(false); + future.setFinished(); + } + ); + } + ); } Async::Job processPipeline() { //Go through all message queues auto it = QSharedPointer >::create(mCommandQueues); - return Async::dowhile( + return Async::dowhile( [it]() { return it->hasNext(); }, [it, this](Async::Future &future) { auto queue = it->next(); -- cgit v1.2.3