summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--common/messagequeue.cpp123
-rw-r--r--common/messagequeue.h12
-rw-r--r--tests/messagequeuetest.cpp48
3 files changed, 128 insertions, 55 deletions
diff --git a/common/messagequeue.cpp b/common/messagequeue.cpp
index 2a046d1..ab4b1cf 100644
--- a/common/messagequeue.cpp
+++ b/common/messagequeue.cpp
@@ -39,7 +39,10 @@ static KAsync::Job<void> waitForCompletion(QList<KAsync::Future<void> > &futures
39MessageQueue::MessageQueue(const QString &storageRoot, const QString &name) 39MessageQueue::MessageQueue(const QString &storageRoot, const QString &name)
40 : mStorage(storageRoot, name, Akonadi2::Storage::ReadWrite) 40 : mStorage(storageRoot, name, Akonadi2::Storage::ReadWrite)
41{ 41{
42}
42 43
44MessageQueue::~MessageQueue()
45{
43} 46}
44 47
45void MessageQueue::enqueue(void const *msg, size_t size) 48void MessageQueue::enqueue(void const *msg, size_t size)
@@ -47,71 +50,88 @@ void MessageQueue::enqueue(void const *msg, size_t size)
47 enqueue(QByteArray::fromRawData(static_cast<const char*>(msg), size)); 50 enqueue(QByteArray::fromRawData(static_cast<const char*>(msg), size));
48} 51}
49 52
53void MessageQueue::startTransaction()
54{
55 if (mWriteTransaction) {
56 return;
57 }
58 processRemovals();
59 mWriteTransaction = std::move(mStorage.createTransaction(Akonadi2::Storage::ReadWrite));
60}
61
62void MessageQueue::commit()
63{
64 mWriteTransaction.commit();
65 mWriteTransaction = Akonadi2::Storage::Transaction();
66 processRemovals();
67 emit messageReady();
68}
69
50void MessageQueue::enqueue(const QByteArray &value) 70void MessageQueue::enqueue(const QByteArray &value)
51{ 71{
52 auto transaction = std::move(mStorage.createTransaction(Akonadi2::Storage::ReadWrite)); 72 bool implicitTransaction = false;
53 const qint64 revision = Akonadi2::Storage::maxRevision(transaction) + 1; 73 if (!mWriteTransaction) {
74 implicitTransaction = true;
75 startTransaction();
76 }
77 const qint64 revision = Akonadi2::Storage::maxRevision(mWriteTransaction) + 1;
54 const QByteArray key = QString("%1").arg(revision).toUtf8(); 78 const QByteArray key = QString("%1").arg(revision).toUtf8();
55 transaction.write(key, value); 79 mWriteTransaction.write(key, value);
56 Akonadi2::Storage::setMaxRevision(transaction, revision); 80 Akonadi2::Storage::setMaxRevision(mWriteTransaction, revision);
81 if (implicitTransaction) {
82 commit();
83 }
84}
85
86void MessageQueue::processRemovals()
87{
88 if (mWriteTransaction) {
89 return;
90 }
91 auto transaction = std::move(mStorage.createTransaction(Akonadi2::Storage::ReadWrite));
92 for (const auto &key : mPendingRemoval) {
93 transaction.remove(key);
94 }
57 transaction.commit(); 95 transaction.commit();
58 emit messageReady(); 96 mPendingRemoval.clear();
59} 97}
60 98
61void MessageQueue::dequeue(const std::function<void(void *ptr, int size, std::function<void(bool success)>)> &resultHandler, 99void MessageQueue::dequeue(const std::function<void(void *ptr, int size, std::function<void(bool success)>)> &resultHandler,
62 const std::function<void(const Error &error)> &errorHandler) 100 const std::function<void(const Error &error)> &errorHandler)
63{ 101{
64 bool readValue = false; 102 dequeueBatch(1, [resultHandler](const QByteArray &value) {
65 mStorage.createTransaction(Akonadi2::Storage::ReadOnly).scan("", [this, resultHandler, errorHandler, &readValue](const QByteArray &key, const QByteArray &value) -> bool { 103 return KAsync::start<void>([&value,resultHandler](KAsync::Future<void> &future) {
66 if (Akonadi2::Storage::isInternalKey(key)) { 104 resultHandler(const_cast<void*>(static_cast<const void*>(value.data())), value.size(), [&future](bool success){
67 return true; 105 future.setFinished();
68 } 106 });
69 readValue = true;
70 //We need a copy of the key here, otherwise we can't store it in the lambda (the pointers will become invalid)
71 const auto keyCopy = QByteArray(key.constData(), key.size());
72 resultHandler(const_cast<void*>(static_cast<const void*>(value.data())), value.size(), [this, keyCopy, errorHandler](bool success) {
73 if (success) {
74 mStorage.createTransaction(Akonadi2::Storage::ReadWrite).remove(keyCopy, [errorHandler, keyCopy](const Akonadi2::Storage::Error &error) {
75 ErrorMsg() << "Error while removing value" << error.message << keyCopy;
76 //Don't call the errorhandler in here, we already called the result handler
77 });
78 if (isEmpty()) {
79 emit this->drained();
80 }
81 } else {
82 //TODO re-enqueue?
83 }
84 }); 107 });
85 return false; 108 }).then<void>([](){},
86 }, 109 [errorHandler](int error, const QString &errorString) {
87 [errorHandler](const Akonadi2::Storage::Error &error) { 110 errorHandler(Error("messagequeue", error, errorString.toLatin1()));
88 ErrorMsg() << "Error while retrieving value" << error.message; 111 }).exec();
89 errorHandler(Error(error.store, error.code, error.message));
90 }
91 );
92 if (!readValue) {
93 errorHandler(Error("messagequeue", -1, "No message found"));
94 }
95} 112}
96 113
97KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::function<KAsync::Job<void>(const QByteArray &)> &resultHandler) 114KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::function<KAsync::Job<void>(const QByteArray &)> &resultHandler)
98{ 115{
116 Trace() << "Dequeue batch";
99 auto resultCount = QSharedPointer<int>::create(0); 117 auto resultCount = QSharedPointer<int>::create(0);
100 auto keyList = QSharedPointer<QByteArrayList>::create(); 118 return KAsync::start<void>([this, maxBatchSize, resultHandler, resultCount](KAsync::Future<void> &future) {
101 return KAsync::start<void>([this, maxBatchSize, resultHandler, resultCount, keyList](KAsync::Future<void> &future) {
102 int count = 0; 119 int count = 0;
103 QList<KAsync::Future<void> > waitCondition; 120 QList<KAsync::Future<void> > waitCondition;
104 mStorage.createTransaction(Akonadi2::Storage::ReadOnly).scan("", [this, resultHandler, resultCount, keyList, &count, maxBatchSize, &waitCondition](const QByteArray &key, const QByteArray &value) -> bool { 121 mStorage.createTransaction(Akonadi2::Storage::ReadOnly).scan("", [this, resultHandler, resultCount, &count, maxBatchSize, &waitCondition](const QByteArray &key, const QByteArray &value) -> bool {
105 if (Akonadi2::Storage::isInternalKey(key)) { 122 if (Akonadi2::Storage::isInternalKey(key) || mPendingRemoval.contains(key)) {
106 return true; 123 return true;
107 } 124 }
125 *resultCount += 1;
126 Trace() << "Dequeue value";
108 //We need a copy of the key here, otherwise we can't store it in the lambda (the pointers will become invalid) 127 //We need a copy of the key here, otherwise we can't store it in the lambda (the pointers will become invalid)
109 keyList->append(QByteArray(key.constData(), key.size())); 128 mPendingRemoval << QByteArray(key.constData(), key.size());
110 129
111 waitCondition << resultHandler(value).exec(); 130 waitCondition << resultHandler(value).exec();
112 131
113 count++; 132 count++;
114 if (count <= maxBatchSize) { 133 Trace() << count << maxBatchSize;
134 if (count < maxBatchSize) {
115 return true; 135 return true;
116 } 136 }
117 return false; 137 return false;
@@ -121,17 +141,10 @@ KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::functi
121 // errorHandler(Error(error.store, error.code, error.message)); 141 // errorHandler(Error(error.store, error.code, error.message));
122 }); 142 });
123 143
124 ::waitForCompletion(waitCondition).then<void>([this, keyList, &future]() { 144 // Trace() << "Waiting on " << waitCondition.size() << " results";
125 Trace() << "Dequeue complete, removing values " << *keyList; 145 ::waitForCompletion(waitCondition).then<void>([this, resultCount, &future]() {
126 auto transaction = std::move(mStorage.createTransaction(Akonadi2::Storage::ReadWrite)); 146 processRemovals();
127 for (const auto &key : *keyList) { 147 if (*resultCount == 0) {
128 transaction.remove(key, [key](const Akonadi2::Storage::Error &error) {
129 ErrorMsg() << "Error while removing value" << error.message << key;
130 //Don't call the errorhandler in here, we already called the result handler
131 });
132 }
133 transaction.commit();
134 if (keyList->isEmpty()) {
135 future.setError(-1, "No message found"); 148 future.setError(-1, "No message found");
136 future.setFinished(); 149 future.setFinished();
137 } else { 150 } else {
@@ -147,15 +160,15 @@ KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::functi
147bool MessageQueue::isEmpty() 160bool MessageQueue::isEmpty()
148{ 161{
149 int count = 0; 162 int count = 0;
150 mStorage.createTransaction(Akonadi2::Storage::ReadOnly).scan("", [&count](const QByteArray &key, const QByteArray &value) -> bool { 163 mStorage.createTransaction(Akonadi2::Storage::ReadOnly).scan("", [&count, this](const QByteArray &key, const QByteArray &value) -> bool {
151 if (!Akonadi2::Storage::isInternalKey(key)) { 164 if (!Akonadi2::Storage::isInternalKey(key) && !mPendingRemoval.contains(key)) {
152 count++; 165 count++;
153 return false; 166 return false;
154 } 167 }
155 return true; 168 return true;
156 }, 169 },
157 [](const Akonadi2::Storage::Error &error) { 170 [](const Akonadi2::Storage::Error &error) {
158 qDebug() << "Error while checking if empty" << error.message; 171 ErrorMsg() << "Error while checking if empty" << error.message;
159 }); 172 });
160 return count == 0; 173 return count == 0;
161} 174}
diff --git a/common/messagequeue.h b/common/messagequeue.h
index 8ea8d8b..b6c2614 100644
--- a/common/messagequeue.h
+++ b/common/messagequeue.h
@@ -1,6 +1,7 @@
1#pragma once 1#pragma once
2 2
3#include <QObject> 3#include <QObject>
4#include <QByteArrayList>
4#include <string> 5#include <string>
5#include <functional> 6#include <functional>
6#include <QString> 7#include <QString>
@@ -25,7 +26,9 @@ public:
25 }; 26 };
26 27
27 MessageQueue(const QString &storageRoot, const QString &name); 28 MessageQueue(const QString &storageRoot, const QString &name);
29 ~MessageQueue();
28 30
31 void startTransaction();
29 void enqueue(void const *msg, size_t size); 32 void enqueue(void const *msg, size_t size);
30 void enqueue(const QByteArray &value); 33 void enqueue(const QByteArray &value);
31 //Dequeue a message. This will return a new message everytime called. 34 //Dequeue a message. This will return a new message everytime called.
@@ -35,11 +38,20 @@ public:
35 const std::function<void(const Error &error)> &errorHandler); 38 const std::function<void(const Error &error)> &errorHandler);
36 KAsync::Job<void> dequeueBatch(int maxBatchSize, const std::function<KAsync::Job<void>(const QByteArray &)> &resultHandler); 39 KAsync::Job<void> dequeueBatch(int maxBatchSize, const std::function<KAsync::Job<void>(const QByteArray &)> &resultHandler);
37 bool isEmpty(); 40 bool isEmpty();
41
42public slots:
43 void commit();
44
38signals: 45signals:
39 void messageReady(); 46 void messageReady();
40 void drained(); 47 void drained();
41 48
49private slots:
50 void processRemovals();
51
42private: 52private:
43 Q_DISABLE_COPY(MessageQueue); 53 Q_DISABLE_COPY(MessageQueue);
44 Akonadi2::Storage mStorage; 54 Akonadi2::Storage mStorage;
55 Akonadi2::Storage::Transaction mWriteTransaction;
56 QByteArrayList mPendingRemoval;
45}; 57};
diff --git a/tests/messagequeuetest.cpp b/tests/messagequeuetest.cpp
index d5c47f5..9c2aa16 100644
--- a/tests/messagequeuetest.cpp
+++ b/tests/messagequeuetest.cpp
@@ -6,6 +6,7 @@
6#include "clientapi.h" 6#include "clientapi.h"
7#include "storage.h" 7#include "storage.h"
8#include "messagequeue.h" 8#include "messagequeue.h"
9#include "log.h"
9 10
10class MessageQueueTest : public QObject 11class MessageQueueTest : public QObject
11{ 12{
@@ -13,6 +14,7 @@ class MessageQueueTest : public QObject
13private Q_SLOTS: 14private Q_SLOTS:
14 void initTestCase() 15 void initTestCase()
15 { 16 {
17 Akonadi2::Log::setDebugOutputLevel(Akonadi2::Log::Trace);
16 Akonadi2::Storage store(Akonadi2::Store::storageLocation(), "org.kde.dummy.testqueue", Akonadi2::Storage::ReadWrite); 18 Akonadi2::Storage store(Akonadi2::Store::storageLocation(), "org.kde.dummy.testqueue", Akonadi2::Storage::ReadWrite);
17 store.removeFromDisk(); 19 store.removeFromDisk();
18 } 20 }
@@ -50,6 +52,14 @@ private Q_SLOTS:
50 QVERIFY(gotError); 52 QVERIFY(gotError);
51 } 53 }
52 54
55 void testEnqueue()
56 {
57 MessageQueue queue(Akonadi2::Store::storageLocation(), "org.kde.dummy.testqueue");
58 QSignalSpy spy(&queue, SIGNAL(messageReady()));
59 queue.enqueue("value1");
60 QCOMPARE(spy.size(), 1);
61 }
62
53 void testDrained() 63 void testDrained()
54 { 64 {
55 MessageQueue queue(Akonadi2::Store::storageLocation(), "org.kde.dummy.testqueue"); 65 MessageQueue queue(Akonadi2::Store::storageLocation(), "org.kde.dummy.testqueue");
@@ -152,6 +162,44 @@ private Q_SLOTS:
152 QVERIFY(!gotError); 162 QVERIFY(!gotError);
153 } 163 }
154 164
165 void testBatchDequeue()
166 {
167 MessageQueue queue(Akonadi2::Store::storageLocation(), "org.kde.dummy.testqueue");
168 queue.enqueue("value1");
169 queue.enqueue("value2");
170 queue.enqueue("value3");
171
172 int count = 0;
173 queue.dequeueBatch(2, [&count](const QByteArray &data) {
174 count++;
175 return KAsync::null<void>();
176 }).exec().waitForFinished();
177 QCOMPARE(count, 2);
178
179 queue.dequeueBatch(1, [&count](const QByteArray &data) {
180 count++;
181 return KAsync::null<void>();
182 }).exec().waitForFinished();
183 QCOMPARE(count, 3);
184 }
185
186 void testBatchEnqueue()
187 {
188 MessageQueue queue(Akonadi2::Store::storageLocation(), "org.kde.dummy.testqueue");
189 QSignalSpy spy(&queue, SIGNAL(messageReady()));
190 queue.startTransaction();
191 queue.enqueue("value1");
192 queue.enqueue("value2");
193 queue.enqueue("value3");
194
195 QVERIFY(queue.isEmpty());
196 QCOMPARE(spy.count(), 0);
197
198 queue.commit();
199
200 QVERIFY(!queue.isEmpty());
201 QCOMPARE(spy.count(), 1);
202 }
155 203
156}; 204};
157 205