From c831909be94abfc24c64f482ce22ce508aed2147 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Fri, 16 Jan 2015 16:49:15 +0100 Subject: Asynchronously go through messagequeues --- dummyresource/resourcefactory.cpp | 35 +++++++++++++++++++++++++++++------ 1 file changed, 29 insertions(+), 6 deletions(-) diff --git a/dummyresource/resourcefactory.cpp b/dummyresource/resourcefactory.cpp index b43e4a3..01db782 100644 --- a/dummyresource/resourcefactory.cpp +++ b/dummyresource/resourcefactory.cpp @@ -138,12 +138,11 @@ private slots: }).exec(); } - Async::Job processPipeline() + //Process all messages of this queue + Async::Job processQueue(MessageQueue *queue) { - auto job = Async::start([this](Async::Future &future) { - //TODO process all queues in async for - auto queue = mCommandQueues.first(); - asyncWhile([&](std::function whileCallback) { + auto job = Async::start([this, queue](Async::Future &future) { + asyncWhile([&, queue](std::function whileCallback) { queue->dequeue([this, whileCallback](void *ptr, int size, std::function messageQueueCallback) { flatbuffers::Verifier verifyer(reinterpret_cast(ptr), size); if (!Akonadi2::VerifyQueuedCommandBuffer(verifyer)) { @@ -186,10 +185,34 @@ private slots: }, [&future]() { //while complete future.setFinished(); - //Call async-for completion handler }); }); return job; + + } + + Async::Job processPipeline() + { + auto job = Async::start([this](Async::Future &future) { + //An async for loop. Go through all message queues + auto it = QSharedPointer >::create(mCommandQueues); + asyncWhile([&, it](std::function forCallback) { + if (it->hasNext()) { + auto queue = it->next(); + processQueue(queue).then([forCallback](Async::Future &future) { + forCallback(false); + future.setFinished(); + }).exec(); + } else { + forCallback(true); + } + }, + [&future]() { //while complete + future.setFinished(); + }); + + }); + return job; } private: -- cgit v1.2.3