From a9dc9ed667f06fa1828773d1bb8671ec2731dce5 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Tue, 11 Aug 2015 10:30:10 +0200 Subject: Fixed messagequeue --- common/messagequeue.cpp | 21 +++++++---- common/messagequeue.h | 2 +- tests/messagequeuetest.cpp | 90 +++++++++++++++++++++++++++++++++++++++++----- 3 files changed, 97 insertions(+), 16 deletions(-) diff --git a/common/messagequeue.cpp b/common/messagequeue.cpp index a92d6be..ecc4d1a 100644 --- a/common/messagequeue.cpp +++ b/common/messagequeue.cpp @@ -11,29 +11,38 @@ MessageQueue::MessageQueue(const QString &storageRoot, const QString &name) void MessageQueue::enqueue(void const *msg, size_t size) { - auto transaction = mStorage.createTransaction(Akonadi2::Storage::ReadWrite); - const qint64 revision = mStorage.maxRevision() + 1; + auto transaction = std::move(mStorage.createTransaction(Akonadi2::Storage::ReadWrite)); + const qint64 revision = Akonadi2::Storage::maxRevision(transaction) + 1; const QByteArray key = QString("%1").arg(revision).toUtf8(); transaction.write(key, QByteArray::fromRawData(static_cast(msg), size)); Akonadi2::Storage::setMaxRevision(transaction, revision); + transaction.commit(); emit messageReady(); } +void MessageQueue::enqueue(const QByteArray &value) +{ + enqueue(value.data(), value.size()); +} + void MessageQueue::dequeue(const std::function)> &resultHandler, const std::function &errorHandler) { bool readValue = false; - mTransaction = std::move(mStorage.createTransaction(Akonadi2::Storage::ReadWrite)); - mTransaction.scan("", [this, resultHandler, errorHandler, &readValue](const QByteArray &key, const QByteArray &value) -> bool { + auto readTransaction = std::move(mStorage.createTransaction(Akonadi2::Storage::ReadOnly)); + readTransaction.scan("", [this, resultHandler, errorHandler, &readValue, &readTransaction](const QByteArray &key, const QByteArray &value) -> bool { if (Akonadi2::Storage::isInternalKey(key)) { return true; } readValue = true; //We need a copy of the key here, otherwise we can't store it in the lambda (the pointers will become invalid) const auto keyCopy = QByteArray(key.constData(), key.size()); - resultHandler(const_cast(static_cast(value.data())), value.size(), [this, keyCopy, errorHandler](bool success) { + //TODO The value copy and the early transaction abort is necessary because we don't support parallel read-transactions yet (in case of a synchronous callback) + const auto valueCopy = QByteArray(value.constData(), value.size()); + readTransaction.abort(); + resultHandler(const_cast(static_cast(valueCopy.data())), valueCopy.size(), [this, keyCopy, errorHandler](bool success) { if (success) { - mTransaction.remove(keyCopy, [errorHandler, keyCopy](const Akonadi2::Storage::Error &error) { + mStorage.createTransaction(Akonadi2::Storage::ReadWrite).remove(keyCopy, [errorHandler, keyCopy](const Akonadi2::Storage::Error &error) { ErrorMsg() << "Error while removing value" << error.message << keyCopy; //Don't call the errorhandler in here, we already called the result handler }); diff --git a/common/messagequeue.h b/common/messagequeue.h index ffc1ff2..3393394 100644 --- a/common/messagequeue.h +++ b/common/messagequeue.h @@ -26,6 +26,7 @@ public: MessageQueue(const QString &storageRoot, const QString &name); void enqueue(void const *msg, size_t size); + void enqueue(const QByteArray &value); //Dequeue a message. This will return a new message everytime called. //Call the result handler with a success response to remove the message from the store. //TODO track processing progress to avoid processing the same message with the same preprocessor twice? @@ -39,5 +40,4 @@ signals: private: Q_DISABLE_COPY(MessageQueue); Akonadi2::Storage mStorage; - Akonadi2::Storage::Transaction mTransaction; }; diff --git a/tests/messagequeuetest.cpp b/tests/messagequeuetest.cpp index c43b192..d5c47f5 100644 --- a/tests/messagequeuetest.cpp +++ b/tests/messagequeuetest.cpp @@ -31,12 +31,38 @@ private Q_SLOTS: { MessageQueue queue(Akonadi2::Store::storageLocation(), "org.kde.dummy.testqueue"); QVERIFY(queue.isEmpty()); - QByteArray value("value"); - queue.enqueue(value.data(), value.size()); + queue.enqueue("value"); QVERIFY(!queue.isEmpty()); } - void testQueue() + void testDequeueEmpty() + { + MessageQueue queue(Akonadi2::Store::storageLocation(), "org.kde.dummy.testqueue"); + bool gotValue = false; + bool gotError = false; + queue.dequeue([&](void *ptr, int size, std::function callback) { + gotValue = true; + }, + [&](const MessageQueue::Error &error) { + gotError = true; + }); + QVERIFY(!gotValue); + QVERIFY(gotError); + } + + void testDrained() + { + MessageQueue queue(Akonadi2::Store::storageLocation(), "org.kde.dummy.testqueue"); + QSignalSpy spy(&queue, SIGNAL(drained())); + queue.enqueue("value1"); + + queue.dequeue([](void *ptr, int size, std::function callback) { + callback(true); + }, [](const MessageQueue::Error &error) {}); + QCOMPARE(spy.size(), 1); + } + + void testSyncDequeue() { QQueue values; values << "value1"; @@ -44,10 +70,11 @@ private Q_SLOTS: MessageQueue queue(Akonadi2::Store::storageLocation(), "org.kde.dummy.testqueue"); for (const QByteArray &value : values) { - queue.enqueue(value.data(), value.size()); + queue.enqueue(value); } while (!queue.isEmpty()) { + Log() << "start"; const auto expected = values.dequeue(); bool gotValue = false; bool gotError = false; @@ -66,21 +93,66 @@ private Q_SLOTS: QVERIFY(values.isEmpty()); } - void testDequeueEmpty() + void testAsyncDequeue() { + QQueue values; + values << "value1"; + values << "value2"; + MessageQueue queue(Akonadi2::Store::storageLocation(), "org.kde.dummy.testqueue"); - bool gotValue = false; + for (const QByteArray &value : values) { + queue.enqueue(value); + } + + while (!queue.isEmpty()) { + QEventLoop eventLoop; + const auto expected = values.dequeue(); + bool gotValue = false; + bool gotError = false; + + queue.dequeue([&](void *ptr, int size, std::function callback) { + if (QByteArray(static_cast(ptr), size) == expected) { + gotValue = true; + } + auto timer = new QTimer(); + timer->setSingleShot(true); + QObject::connect(timer, &QTimer::timeout, [timer, callback, &eventLoop]() { + delete timer; + callback(true); + eventLoop.exit(); + }); + timer->start(0); + }, + [&](const MessageQueue::Error &error) { + gotError = true; + }); + eventLoop.exec(); + QVERIFY(gotValue); + QVERIFY(!gotError); + } + QVERIFY(values.isEmpty()); + } + + /* + * Dequeue's are async and we want to be able to enqueue new items in between. + */ + void testNestedEnqueue() + { + MessageQueue queue(Akonadi2::Store::storageLocation(), "org.kde.dummy.testqueue"); + queue.enqueue("value1"); + bool gotError = false; queue.dequeue([&](void *ptr, int size, std::function callback) { - gotValue = true; + queue.enqueue("value3"); + callback(true); }, [&](const MessageQueue::Error &error) { gotError = true; }); - QVERIFY(!gotValue); - QVERIFY(gotError); + QVERIFY(!gotError); } + }; QTEST_MAIN(MessageQueueTest) -- cgit v1.2.3