From e19bad87f43caf602793d8297562804b17383f7d Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Mon, 17 Aug 2015 00:19:50 +0200 Subject: Transactions for messagequeue --- common/messagequeue.cpp | 123 +++++++++++++++++++++++++-------------------- common/messagequeue.h | 12 +++++ tests/messagequeuetest.cpp | 48 ++++++++++++++++++ 3 files changed, 128 insertions(+), 55 deletions(-) diff --git a/common/messagequeue.cpp b/common/messagequeue.cpp index 2a046d1..ab4b1cf 100644 --- a/common/messagequeue.cpp +++ b/common/messagequeue.cpp @@ -39,7 +39,10 @@ static KAsync::Job waitForCompletion(QList > &futures MessageQueue::MessageQueue(const QString &storageRoot, const QString &name) : mStorage(storageRoot, name, Akonadi2::Storage::ReadWrite) { +} +MessageQueue::~MessageQueue() +{ } void MessageQueue::enqueue(void const *msg, size_t size) @@ -47,71 +50,88 @@ void MessageQueue::enqueue(void const *msg, size_t size) enqueue(QByteArray::fromRawData(static_cast(msg), size)); } +void MessageQueue::startTransaction() +{ + if (mWriteTransaction) { + return; + } + processRemovals(); + mWriteTransaction = std::move(mStorage.createTransaction(Akonadi2::Storage::ReadWrite)); +} + +void MessageQueue::commit() +{ + mWriteTransaction.commit(); + mWriteTransaction = Akonadi2::Storage::Transaction(); + processRemovals(); + emit messageReady(); +} + void MessageQueue::enqueue(const QByteArray &value) { - auto transaction = std::move(mStorage.createTransaction(Akonadi2::Storage::ReadWrite)); - const qint64 revision = Akonadi2::Storage::maxRevision(transaction) + 1; + bool implicitTransaction = false; + if (!mWriteTransaction) { + implicitTransaction = true; + startTransaction(); + } + const qint64 revision = Akonadi2::Storage::maxRevision(mWriteTransaction) + 1; const QByteArray key = QString("%1").arg(revision).toUtf8(); - transaction.write(key, value); - Akonadi2::Storage::setMaxRevision(transaction, revision); + mWriteTransaction.write(key, value); + Akonadi2::Storage::setMaxRevision(mWriteTransaction, revision); + if (implicitTransaction) { + commit(); + } +} + +void MessageQueue::processRemovals() +{ + if (mWriteTransaction) { + return; + } + auto transaction = std::move(mStorage.createTransaction(Akonadi2::Storage::ReadWrite)); + for (const auto &key : mPendingRemoval) { + transaction.remove(key); + } transaction.commit(); - emit messageReady(); + mPendingRemoval.clear(); } void MessageQueue::dequeue(const std::function)> &resultHandler, const std::function &errorHandler) { - bool readValue = false; - mStorage.createTransaction(Akonadi2::Storage::ReadOnly).scan("", [this, resultHandler, errorHandler, &readValue](const QByteArray &key, const QByteArray &value) -> bool { - if (Akonadi2::Storage::isInternalKey(key)) { - return true; - } - readValue = true; - //We need a copy of the key here, otherwise we can't store it in the lambda (the pointers will become invalid) - const auto keyCopy = QByteArray(key.constData(), key.size()); - resultHandler(const_cast(static_cast(value.data())), value.size(), [this, keyCopy, errorHandler](bool success) { - if (success) { - mStorage.createTransaction(Akonadi2::Storage::ReadWrite).remove(keyCopy, [errorHandler, keyCopy](const Akonadi2::Storage::Error &error) { - ErrorMsg() << "Error while removing value" << error.message << keyCopy; - //Don't call the errorhandler in here, we already called the result handler - }); - if (isEmpty()) { - emit this->drained(); - } - } else { - //TODO re-enqueue? - } + dequeueBatch(1, [resultHandler](const QByteArray &value) { + return KAsync::start([&value,resultHandler](KAsync::Future &future) { + resultHandler(const_cast(static_cast(value.data())), value.size(), [&future](bool success){ + future.setFinished(); + }); }); - return false; - }, - [errorHandler](const Akonadi2::Storage::Error &error) { - ErrorMsg() << "Error while retrieving value" << error.message; - errorHandler(Error(error.store, error.code, error.message)); - } - ); - if (!readValue) { - errorHandler(Error("messagequeue", -1, "No message found")); - } + }).then([](){}, + [errorHandler](int error, const QString &errorString) { + errorHandler(Error("messagequeue", error, errorString.toLatin1())); + }).exec(); } KAsync::Job MessageQueue::dequeueBatch(int maxBatchSize, const std::function(const QByteArray &)> &resultHandler) { + Trace() << "Dequeue batch"; auto resultCount = QSharedPointer::create(0); - auto keyList = QSharedPointer::create(); - return KAsync::start([this, maxBatchSize, resultHandler, resultCount, keyList](KAsync::Future &future) { + return KAsync::start([this, maxBatchSize, resultHandler, resultCount](KAsync::Future &future) { int count = 0; QList > waitCondition; - mStorage.createTransaction(Akonadi2::Storage::ReadOnly).scan("", [this, resultHandler, resultCount, keyList, &count, maxBatchSize, &waitCondition](const QByteArray &key, const QByteArray &value) -> bool { - if (Akonadi2::Storage::isInternalKey(key)) { + mStorage.createTransaction(Akonadi2::Storage::ReadOnly).scan("", [this, resultHandler, resultCount, &count, maxBatchSize, &waitCondition](const QByteArray &key, const QByteArray &value) -> bool { + if (Akonadi2::Storage::isInternalKey(key) || mPendingRemoval.contains(key)) { return true; } + *resultCount += 1; + Trace() << "Dequeue value"; //We need a copy of the key here, otherwise we can't store it in the lambda (the pointers will become invalid) - keyList->append(QByteArray(key.constData(), key.size())); + mPendingRemoval << QByteArray(key.constData(), key.size()); waitCondition << resultHandler(value).exec(); count++; - if (count <= maxBatchSize) { + Trace() << count << maxBatchSize; + if (count < maxBatchSize) { return true; } return false; @@ -121,17 +141,10 @@ KAsync::Job MessageQueue::dequeueBatch(int maxBatchSize, const std::functi // errorHandler(Error(error.store, error.code, error.message)); }); - ::waitForCompletion(waitCondition).then([this, keyList, &future]() { - Trace() << "Dequeue complete, removing values " << *keyList; - auto transaction = std::move(mStorage.createTransaction(Akonadi2::Storage::ReadWrite)); - for (const auto &key : *keyList) { - transaction.remove(key, [key](const Akonadi2::Storage::Error &error) { - ErrorMsg() << "Error while removing value" << error.message << key; - //Don't call the errorhandler in here, we already called the result handler - }); - } - transaction.commit(); - if (keyList->isEmpty()) { + // Trace() << "Waiting on " << waitCondition.size() << " results"; + ::waitForCompletion(waitCondition).then([this, resultCount, &future]() { + processRemovals(); + if (*resultCount == 0) { future.setError(-1, "No message found"); future.setFinished(); } else { @@ -147,15 +160,15 @@ KAsync::Job MessageQueue::dequeueBatch(int maxBatchSize, const std::functi bool MessageQueue::isEmpty() { int count = 0; - mStorage.createTransaction(Akonadi2::Storage::ReadOnly).scan("", [&count](const QByteArray &key, const QByteArray &value) -> bool { - if (!Akonadi2::Storage::isInternalKey(key)) { + mStorage.createTransaction(Akonadi2::Storage::ReadOnly).scan("", [&count, this](const QByteArray &key, const QByteArray &value) -> bool { + if (!Akonadi2::Storage::isInternalKey(key) && !mPendingRemoval.contains(key)) { count++; return false; } return true; }, [](const Akonadi2::Storage::Error &error) { - qDebug() << "Error while checking if empty" << error.message; + ErrorMsg() << "Error while checking if empty" << error.message; }); return count == 0; } diff --git a/common/messagequeue.h b/common/messagequeue.h index 8ea8d8b..b6c2614 100644 --- a/common/messagequeue.h +++ b/common/messagequeue.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include #include @@ -25,7 +26,9 @@ public: }; MessageQueue(const QString &storageRoot, const QString &name); + ~MessageQueue(); + void startTransaction(); void enqueue(void const *msg, size_t size); void enqueue(const QByteArray &value); //Dequeue a message. This will return a new message everytime called. @@ -35,11 +38,20 @@ public: const std::function &errorHandler); KAsync::Job dequeueBatch(int maxBatchSize, const std::function(const QByteArray &)> &resultHandler); bool isEmpty(); + +public slots: + void commit(); + signals: void messageReady(); void drained(); +private slots: + void processRemovals(); + private: Q_DISABLE_COPY(MessageQueue); Akonadi2::Storage mStorage; + Akonadi2::Storage::Transaction mWriteTransaction; + QByteArrayList mPendingRemoval; }; diff --git a/tests/messagequeuetest.cpp b/tests/messagequeuetest.cpp index d5c47f5..9c2aa16 100644 --- a/tests/messagequeuetest.cpp +++ b/tests/messagequeuetest.cpp @@ -6,6 +6,7 @@ #include "clientapi.h" #include "storage.h" #include "messagequeue.h" +#include "log.h" class MessageQueueTest : public QObject { @@ -13,6 +14,7 @@ class MessageQueueTest : public QObject private Q_SLOTS: void initTestCase() { + Akonadi2::Log::setDebugOutputLevel(Akonadi2::Log::Trace); Akonadi2::Storage store(Akonadi2::Store::storageLocation(), "org.kde.dummy.testqueue", Akonadi2::Storage::ReadWrite); store.removeFromDisk(); } @@ -50,6 +52,14 @@ private Q_SLOTS: QVERIFY(gotError); } + void testEnqueue() + { + MessageQueue queue(Akonadi2::Store::storageLocation(), "org.kde.dummy.testqueue"); + QSignalSpy spy(&queue, SIGNAL(messageReady())); + queue.enqueue("value1"); + QCOMPARE(spy.size(), 1); + } + void testDrained() { MessageQueue queue(Akonadi2::Store::storageLocation(), "org.kde.dummy.testqueue"); @@ -152,6 +162,44 @@ private Q_SLOTS: QVERIFY(!gotError); } + void testBatchDequeue() + { + MessageQueue queue(Akonadi2::Store::storageLocation(), "org.kde.dummy.testqueue"); + queue.enqueue("value1"); + queue.enqueue("value2"); + queue.enqueue("value3"); + + int count = 0; + queue.dequeueBatch(2, [&count](const QByteArray &data) { + count++; + return KAsync::null(); + }).exec().waitForFinished(); + QCOMPARE(count, 2); + + queue.dequeueBatch(1, [&count](const QByteArray &data) { + count++; + return KAsync::null(); + }).exec().waitForFinished(); + QCOMPARE(count, 3); + } + + void testBatchEnqueue() + { + MessageQueue queue(Akonadi2::Store::storageLocation(), "org.kde.dummy.testqueue"); + QSignalSpy spy(&queue, SIGNAL(messageReady())); + queue.startTransaction(); + queue.enqueue("value1"); + queue.enqueue("value2"); + queue.enqueue("value3"); + + QVERIFY(queue.isEmpty()); + QCOMPARE(spy.count(), 0); + + queue.commit(); + + QVERIFY(!queue.isEmpty()); + QCOMPARE(spy.count(), 1); + } }; -- cgit v1.2.3