From 9f3a6ff5d27e4983ee626231e43210d2bbb95dd6 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Fri, 14 Aug 2015 17:11:40 +0200 Subject: Almost working batch dequeues --- common/genericresource.cpp | 75 ++++++++++++++++++++++++---------------------- common/messagequeue.cpp | 50 +++++++++++++++++++++++++++++++ common/messagequeue.h | 2 ++ 3 files changed, 92 insertions(+), 35 deletions(-) diff --git a/common/genericresource.cpp b/common/genericresource.cpp index a86b518..b3df389 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp @@ -77,6 +77,27 @@ private slots: return KAsync::null(); } + KAsync::Job processQueuedCommand(const QByteArray &data) + { + flatbuffers::Verifier verifyer(reinterpret_cast(data.constData()), data.size()); + if (!Akonadi2::VerifyQueuedCommandBuffer(verifyer)) { + Warning() << "invalid buffer"; + return KAsync::error(1, "Invalid Buffer"); + } + auto queuedCommand = Akonadi2::GetQueuedCommand(data.constData()); + const auto commandId = queuedCommand->commandId(); + Trace() << "Dequeued Command: " << Akonadi2::Commands::name(commandId); + return processQueuedCommand(queuedCommand).then( + [commandId]() { + Trace() << "Command pipeline processed: " << Akonadi2::Commands::name(commandId); + }, + [](int errorCode, QString errorMessage) { + //FIXME propagate error, we didn't handle it + Warning() << "Error while processing queue command: " << errorMessage; + } + ); + } + //Process all messages of this queue KAsync::Job processQueue(MessageQueue *queue) { @@ -85,45 +106,29 @@ private slots: //KAsync::foreach("pass iterator here").parallel("process value here").join(); return KAsync::dowhile( [this, queue](KAsync::Future &future) { - if (queue->isEmpty()) { - future.setValue(false); - future.setFinished(); - return; - } - queue->dequeue( - [this, &future](void *ptr, int size, std::function messageQueueCallback) { - auto callback = [messageQueueCallback, &future](bool success) { - messageQueueCallback(true); - future.setValue(!success); - future.setFinished(); - }; - - flatbuffers::Verifier verifyer(reinterpret_cast(ptr), size); - if (!Akonadi2::VerifyQueuedCommandBuffer(verifyer)) { - Warning() << "invalid buffer"; - callback(false); - return; - } - auto queuedCommand = Akonadi2::GetQueuedCommand(ptr); - const auto commandId = queuedCommand->commandId(); - Trace() << "Dequeued Command: " << Akonadi2::Commands::name(commandId); - processQueuedCommand(queuedCommand).then( - [callback, commandId]() { - Trace() << "Command pipeline processed: " << Akonadi2::Commands::name(commandId); - callback(true); + queue->dequeueBatch(100, [this](void *ptr, int size, std::function messageQueueCallback) { + Trace() << "Got value"; + processQueuedCommand(QByteArray::fromRawData(static_cast(ptr), size)).then( + [&messageQueueCallback]() { + Trace() << "done"; + messageQueueCallback(true); }, - [callback](int errorCode, QString errorMessage) { - Warning() << "Error while processing queue command: " << errorMessage; - callback(false); + [&messageQueueCallback](int errorCode, QString errorMessage) { + //Use false? + //For now we use true to make sure we don't get stuck on messages we fail to process + messageQueueCallback(true); } ).exec(); - }, - [&future](const MessageQueue::Error &error) { - Warning() << "Error while getting message from messagequeue: " << error.message; - future.setValue(false); - future.setFinished(); } - ); + ).then([&future](){ + future.setValue(true); + future.setFinished(); + }, + [&future](int i, QString error) { + Warning() << "Error while getting message from messagequeue: " << error; + future.setValue(false); + future.setFinished(); + }).exec(); } ); } diff --git a/common/messagequeue.cpp b/common/messagequeue.cpp index 84385ca..598c63a 100644 --- a/common/messagequeue.cpp +++ b/common/messagequeue.cpp @@ -61,6 +61,56 @@ void MessageQueue::dequeue(const std::function MessageQueue::dequeueBatch(int maxBatchSize, const std::function)> &resultHandler) +{ + auto resultCount = QSharedPointer::create(0); + auto keyList = QSharedPointer::create(); + return KAsync::start([this, maxBatchSize, resultHandler, resultCount, keyList](KAsync::Future &future) { + bool readValue = false; + int count = 0; + mStorage.createTransaction(Akonadi2::Storage::ReadOnly).scan("", [this, resultHandler, &readValue, resultCount, keyList, &count, maxBatchSize](const QByteArray &key, const QByteArray &value) -> bool { + if (Akonadi2::Storage::isInternalKey(key)) { + return true; + } + readValue = true; + //We need a copy of the key here, otherwise we can't store it in the lambda (the pointers will become invalid) + keyList->append(QByteArray(key.constData(), key.size())); + resultHandler(const_cast(static_cast(value.data())), value.size(), [this, resultCount, keyList, &count](bool success) { + *resultCount += 1; + //We're done + //FIXME the check below should only be done once we finished reading + if (*resultCount >= count) { + //FIXME do this from the caller thread + auto transaction = std::move(mStorage.createTransaction(Akonadi2::Storage::ReadWrite)); + for (const auto &key : *keyList) { + transaction.remove(key, [key](const Akonadi2::Storage::Error &error) { + ErrorMsg() << "Error while removing value" << error.message << key; + //Don't call the errorhandler in here, we already called the result handler + }); + } + if (isEmpty()) { + emit this->drained(); + } + } + }); + count++; + if (count <= maxBatchSize) { + return true; + } + return false; + }, + [](const Akonadi2::Storage::Error &error) { + ErrorMsg() << "Error while retrieving value" << error.message; + // errorHandler(Error(error.store, error.code, error.message)); + } + ); + if (!readValue) { + future.setError(-1, "No message found"); + future.setFinished(); + } + }); +} + bool MessageQueue::isEmpty() { int count = 0; diff --git a/common/messagequeue.h b/common/messagequeue.h index 3393394..c5e32db 100644 --- a/common/messagequeue.h +++ b/common/messagequeue.h @@ -4,6 +4,7 @@ #include #include #include +#include #include "storage.h" /** @@ -32,6 +33,7 @@ public: //TODO track processing progress to avoid processing the same message with the same preprocessor twice? void dequeue(const std::function)> & resultHandler, const std::function &errorHandler); + KAsync::Job dequeueBatch(int maxBatchSize, const std::function)> & resultHandler); bool isEmpty(); signals: void messageReady(); -- cgit v1.2.3