From 726a49431909dbd8bdc6efdb8a36ddf4214a7328 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Sun, 16 Aug 2015 12:45:17 +0200 Subject: Batch dequeue in messagequeue. The messagequeue removes all dequeued values once all values have been processed in a single transaction. --- common/genericresource.cpp | 14 ++------ common/messagequeue.cpp | 89 +++++++++++++++++++++++++++++++--------------- common/messagequeue.h | 2 +- 3 files changed, 64 insertions(+), 41 deletions(-) (limited to 'common') diff --git a/common/genericresource.cpp b/common/genericresource.cpp index b3df389..bbd992b 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp @@ -106,19 +106,9 @@ private slots: //KAsync::foreach("pass iterator here").parallel("process value here").join(); return KAsync::dowhile( [this, queue](KAsync::Future &future) { - queue->dequeueBatch(100, [this](void *ptr, int size, std::function messageQueueCallback) { + queue->dequeueBatch(100, [this](const QByteArray &data) { Trace() << "Got value"; - processQueuedCommand(QByteArray::fromRawData(static_cast(ptr), size)).then( - [&messageQueueCallback]() { - Trace() << "done"; - messageQueueCallback(true); - }, - [&messageQueueCallback](int errorCode, QString errorMessage) { - //Use false? - //For now we use true to make sure we don't get stuck on messages we fail to process - messageQueueCallback(true); - } - ).exec(); + return processQueuedCommand(data); } ).then([&future](){ future.setValue(true); diff --git a/common/messagequeue.cpp b/common/messagequeue.cpp index 598c63a..3b5ca2b 100644 --- a/common/messagequeue.cpp +++ b/common/messagequeue.cpp @@ -3,6 +3,39 @@ #include #include +static KAsync::Job waitForCompletion(QList > &futures) +{ + auto context = new QObject; + return KAsync::start([futures, context](KAsync::Future &future) { + const auto total = futures.size(); + auto count = QSharedPointer::create(); + int i = 0; + for (KAsync::Future subFuture : futures) { + i++; + if (subFuture.isFinished()) { + *count += 1; + continue; + } + //FIXME bind lifetime all watcher to future (repectively the main job + auto watcher = QSharedPointer >::create(); + QObject::connect(watcher.data(), &KAsync::FutureWatcher::futureReady, + [count, total, &future](){ + *count += 1; + if (*count == total) { + future.setFinished(); + } + }); + watcher->setFuture(subFuture); + context->setProperty(QString("future%1").arg(i).toLatin1().data(), QVariant::fromValue(watcher)); + } + if (*count == total) { + future.setFinished(); + } + }).then([context]() { + delete context; + }); +} + MessageQueue::MessageQueue(const QString &storageRoot, const QString &name) : mStorage(storageRoot, name, Akonadi2::Storage::ReadWrite) { @@ -61,38 +94,22 @@ void MessageQueue::dequeue(const std::function MessageQueue::dequeueBatch(int maxBatchSize, const std::function)> &resultHandler) +KAsync::Job MessageQueue::dequeueBatch(int maxBatchSize, const std::function(const QByteArray &)> &resultHandler) { auto resultCount = QSharedPointer::create(0); auto keyList = QSharedPointer::create(); return KAsync::start([this, maxBatchSize, resultHandler, resultCount, keyList](KAsync::Future &future) { - bool readValue = false; int count = 0; - mStorage.createTransaction(Akonadi2::Storage::ReadOnly).scan("", [this, resultHandler, &readValue, resultCount, keyList, &count, maxBatchSize](const QByteArray &key, const QByteArray &value) -> bool { + QList > waitCondition; + mStorage.createTransaction(Akonadi2::Storage::ReadOnly).scan("", [this, resultHandler, resultCount, keyList, &count, maxBatchSize, &waitCondition](const QByteArray &key, const QByteArray &value) -> bool { if (Akonadi2::Storage::isInternalKey(key)) { return true; } - readValue = true; //We need a copy of the key here, otherwise we can't store it in the lambda (the pointers will become invalid) keyList->append(QByteArray(key.constData(), key.size())); - resultHandler(const_cast(static_cast(value.data())), value.size(), [this, resultCount, keyList, &count](bool success) { - *resultCount += 1; - //We're done - //FIXME the check below should only be done once we finished reading - if (*resultCount >= count) { - //FIXME do this from the caller thread - auto transaction = std::move(mStorage.createTransaction(Akonadi2::Storage::ReadWrite)); - for (const auto &key : *keyList) { - transaction.remove(key, [key](const Akonadi2::Storage::Error &error) { - ErrorMsg() << "Error while removing value" << error.message << key; - //Don't call the errorhandler in here, we already called the result handler - }); - } - if (isEmpty()) { - emit this->drained(); - } - } - }); + + waitCondition << resultHandler(value).exec(); + count++; if (count <= maxBatchSize) { return true; @@ -102,12 +119,28 @@ KAsync::Job MessageQueue::dequeueBatch(int maxBatchSize, const std::functi [](const Akonadi2::Storage::Error &error) { ErrorMsg() << "Error while retrieving value" << error.message; // errorHandler(Error(error.store, error.code, error.message)); - } - ); - if (!readValue) { - future.setError(-1, "No message found"); - future.setFinished(); - } + }); + + ::waitForCompletion(waitCondition).then([this, keyList, &future]() { + Trace() << "Dequeue complete, removing values " << *keyList; + auto transaction = std::move(mStorage.createTransaction(Akonadi2::Storage::ReadWrite)); + for (const auto &key : *keyList) { + transaction.remove(key, [key](const Akonadi2::Storage::Error &error) { + ErrorMsg() << "Error while removing value" << error.message << key; + //Don't call the errorhandler in here, we already called the result handler + }); + } + transaction.commit(); + if (keyList->isEmpty()) { + future.setError(-1, "No message found"); + future.setFinished(); + } else { + if (isEmpty()) { + emit this->drained(); + } + future.setFinished(); + } + }).exec(); }); } diff --git a/common/messagequeue.h b/common/messagequeue.h index c5e32db..8ea8d8b 100644 --- a/common/messagequeue.h +++ b/common/messagequeue.h @@ -33,7 +33,7 @@ public: //TODO track processing progress to avoid processing the same message with the same preprocessor twice? void dequeue(const std::function)> & resultHandler, const std::function &errorHandler); - KAsync::Job dequeueBatch(int maxBatchSize, const std::function)> & resultHandler); + KAsync::Job dequeueBatch(int maxBatchSize, const std::function(const QByteArray &)> &resultHandler); bool isEmpty(); signals: void messageReady(); -- cgit v1.2.3