From 9f3a6ff5d27e4983ee626231e43210d2bbb95dd6 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Fri, 14 Aug 2015 17:11:40 +0200 Subject: Almost working batch dequeues --- common/genericresource.cpp | 75 ++++++++++++++++++++++++---------------------- 1 file changed, 40 insertions(+), 35 deletions(-) (limited to 'common/genericresource.cpp') diff --git a/common/genericresource.cpp b/common/genericresource.cpp index a86b518..b3df389 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp @@ -77,6 +77,27 @@ private slots: return KAsync::null(); } + KAsync::Job processQueuedCommand(const QByteArray &data) + { + flatbuffers::Verifier verifyer(reinterpret_cast(data.constData()), data.size()); + if (!Akonadi2::VerifyQueuedCommandBuffer(verifyer)) { + Warning() << "invalid buffer"; + return KAsync::error(1, "Invalid Buffer"); + } + auto queuedCommand = Akonadi2::GetQueuedCommand(data.constData()); + const auto commandId = queuedCommand->commandId(); + Trace() << "Dequeued Command: " << Akonadi2::Commands::name(commandId); + return processQueuedCommand(queuedCommand).then( + [commandId]() { + Trace() << "Command pipeline processed: " << Akonadi2::Commands::name(commandId); + }, + [](int errorCode, QString errorMessage) { + //FIXME propagate error, we didn't handle it + Warning() << "Error while processing queue command: " << errorMessage; + } + ); + } + //Process all messages of this queue KAsync::Job processQueue(MessageQueue *queue) { @@ -85,45 +106,29 @@ private slots: //KAsync::foreach("pass iterator here").parallel("process value here").join(); return KAsync::dowhile( [this, queue](KAsync::Future &future) { - if (queue->isEmpty()) { - future.setValue(false); - future.setFinished(); - return; - } - queue->dequeue( - [this, &future](void *ptr, int size, std::function messageQueueCallback) { - auto callback = [messageQueueCallback, &future](bool success) { - messageQueueCallback(true); - future.setValue(!success); - future.setFinished(); - }; - - flatbuffers::Verifier verifyer(reinterpret_cast(ptr), size); - if (!Akonadi2::VerifyQueuedCommandBuffer(verifyer)) { - Warning() << "invalid buffer"; - callback(false); - return; - } - auto queuedCommand = Akonadi2::GetQueuedCommand(ptr); - const auto commandId = queuedCommand->commandId(); - Trace() << "Dequeued Command: " << Akonadi2::Commands::name(commandId); - processQueuedCommand(queuedCommand).then( - [callback, commandId]() { - Trace() << "Command pipeline processed: " << Akonadi2::Commands::name(commandId); - callback(true); + queue->dequeueBatch(100, [this](void *ptr, int size, std::function messageQueueCallback) { + Trace() << "Got value"; + processQueuedCommand(QByteArray::fromRawData(static_cast(ptr), size)).then( + [&messageQueueCallback]() { + Trace() << "done"; + messageQueueCallback(true); }, - [callback](int errorCode, QString errorMessage) { - Warning() << "Error while processing queue command: " << errorMessage; - callback(false); + [&messageQueueCallback](int errorCode, QString errorMessage) { + //Use false? + //For now we use true to make sure we don't get stuck on messages we fail to process + messageQueueCallback(true); } ).exec(); - }, - [&future](const MessageQueue::Error &error) { - Warning() << "Error while getting message from messagequeue: " << error.message; - future.setValue(false); - future.setFinished(); } - ); + ).then([&future](){ + future.setValue(true); + future.setFinished(); + }, + [&future](int i, QString error) { + Warning() << "Error while getting message from messagequeue: " << error; + future.setValue(false); + future.setFinished(); + }).exec(); } ); } -- cgit v1.2.3