From 9f3a6ff5d27e4983ee626231e43210d2bbb95dd6 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Fri, 14 Aug 2015 17:11:40 +0200 Subject: Almost working batch dequeues --- common/messagequeue.cpp | 50 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) (limited to 'common/messagequeue.cpp') diff --git a/common/messagequeue.cpp b/common/messagequeue.cpp index 84385ca..598c63a 100644 --- a/common/messagequeue.cpp +++ b/common/messagequeue.cpp @@ -61,6 +61,56 @@ void MessageQueue::dequeue(const std::function MessageQueue::dequeueBatch(int maxBatchSize, const std::function)> &resultHandler) +{ + auto resultCount = QSharedPointer::create(0); + auto keyList = QSharedPointer::create(); + return KAsync::start([this, maxBatchSize, resultHandler, resultCount, keyList](KAsync::Future &future) { + bool readValue = false; + int count = 0; + mStorage.createTransaction(Akonadi2::Storage::ReadOnly).scan("", [this, resultHandler, &readValue, resultCount, keyList, &count, maxBatchSize](const QByteArray &key, const QByteArray &value) -> bool { + if (Akonadi2::Storage::isInternalKey(key)) { + return true; + } + readValue = true; + //We need a copy of the key here, otherwise we can't store it in the lambda (the pointers will become invalid) + keyList->append(QByteArray(key.constData(), key.size())); + resultHandler(const_cast(static_cast(value.data())), value.size(), [this, resultCount, keyList, &count](bool success) { + *resultCount += 1; + //We're done + //FIXME the check below should only be done once we finished reading + if (*resultCount >= count) { + //FIXME do this from the caller thread + auto transaction = std::move(mStorage.createTransaction(Akonadi2::Storage::ReadWrite)); + for (const auto &key : *keyList) { + transaction.remove(key, [key](const Akonadi2::Storage::Error &error) { + ErrorMsg() << "Error while removing value" << error.message << key; + //Don't call the errorhandler in here, we already called the result handler + }); + } + if (isEmpty()) { + emit this->drained(); + } + } + }); + count++; + if (count <= maxBatchSize) { + return true; + } + return false; + }, + [](const Akonadi2::Storage::Error &error) { + ErrorMsg() << "Error while retrieving value" << error.message; + // errorHandler(Error(error.store, error.code, error.message)); + } + ); + if (!readValue) { + future.setError(-1, "No message found"); + future.setFinished(); + } + }); +} + bool MessageQueue::isEmpty() { int count = 0; -- cgit v1.2.3