summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2015-08-11 10:30:10 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2015-08-11 10:30:10 +0200
commita9dc9ed667f06fa1828773d1bb8671ec2731dce5 (patch)
treef8a195992d79d1ff8122be44613e70e04cde0d95
parent3144d1b4bbf523b80fa04ba61787d9366ccc0443 (diff)
downloadsink-a9dc9ed667f06fa1828773d1bb8671ec2731dce5.tar.gz
sink-a9dc9ed667f06fa1828773d1bb8671ec2731dce5.zip
Fixed messagequeue
-rw-r--r--common/messagequeue.cpp21
-rw-r--r--common/messagequeue.h2
-rw-r--r--tests/messagequeuetest.cpp90
3 files changed, 97 insertions, 16 deletions
diff --git a/common/messagequeue.cpp b/common/messagequeue.cpp
index a92d6be..ecc4d1a 100644
--- a/common/messagequeue.cpp
+++ b/common/messagequeue.cpp
@@ -11,29 +11,38 @@ MessageQueue::MessageQueue(const QString &storageRoot, const QString &name)
11 11
12void MessageQueue::enqueue(void const *msg, size_t size) 12void MessageQueue::enqueue(void const *msg, size_t size)
13{ 13{
14 auto transaction = mStorage.createTransaction(Akonadi2::Storage::ReadWrite); 14 auto transaction = std::move(mStorage.createTransaction(Akonadi2::Storage::ReadWrite));
15 const qint64 revision = mStorage.maxRevision() + 1; 15 const qint64 revision = Akonadi2::Storage::maxRevision(transaction) + 1;
16 const QByteArray key = QString("%1").arg(revision).toUtf8(); 16 const QByteArray key = QString("%1").arg(revision).toUtf8();
17 transaction.write(key, QByteArray::fromRawData(static_cast<const char*>(msg), size)); 17 transaction.write(key, QByteArray::fromRawData(static_cast<const char*>(msg), size));
18 Akonadi2::Storage::setMaxRevision(transaction, revision); 18 Akonadi2::Storage::setMaxRevision(transaction, revision);
19 transaction.commit();
19 emit messageReady(); 20 emit messageReady();
20} 21}
21 22
23void MessageQueue::enqueue(const QByteArray &value)
24{
25 enqueue(value.data(), value.size());
26}
27
22void MessageQueue::dequeue(const std::function<void(void *ptr, int size, std::function<void(bool success)>)> &resultHandler, 28void MessageQueue::dequeue(const std::function<void(void *ptr, int size, std::function<void(bool success)>)> &resultHandler,
23 const std::function<void(const Error &error)> &errorHandler) 29 const std::function<void(const Error &error)> &errorHandler)
24{ 30{
25 bool readValue = false; 31 bool readValue = false;
26 mTransaction = std::move(mStorage.createTransaction(Akonadi2::Storage::ReadWrite)); 32 auto readTransaction = std::move(mStorage.createTransaction(Akonadi2::Storage::ReadOnly));
27 mTransaction.scan("", [this, resultHandler, errorHandler, &readValue](const QByteArray &key, const QByteArray &value) -> bool { 33 readTransaction.scan("", [this, resultHandler, errorHandler, &readValue, &readTransaction](const QByteArray &key, const QByteArray &value) -> bool {
28 if (Akonadi2::Storage::isInternalKey(key)) { 34 if (Akonadi2::Storage::isInternalKey(key)) {
29 return true; 35 return true;
30 } 36 }
31 readValue = true; 37 readValue = true;
32 //We need a copy of the key here, otherwise we can't store it in the lambda (the pointers will become invalid) 38 //We need a copy of the key here, otherwise we can't store it in the lambda (the pointers will become invalid)
33 const auto keyCopy = QByteArray(key.constData(), key.size()); 39 const auto keyCopy = QByteArray(key.constData(), key.size());
34 resultHandler(const_cast<void*>(static_cast<const void*>(value.data())), value.size(), [this, keyCopy, errorHandler](bool success) { 40 //TODO The value copy and the early transaction abort is necessary because we don't support parallel read-transactions yet (in case of a synchronous callback)
41 const auto valueCopy = QByteArray(value.constData(), value.size());
42 readTransaction.abort();
43 resultHandler(const_cast<void*>(static_cast<const void*>(valueCopy.data())), valueCopy.size(), [this, keyCopy, errorHandler](bool success) {
35 if (success) { 44 if (success) {
36 mTransaction.remove(keyCopy, [errorHandler, keyCopy](const Akonadi2::Storage::Error &error) { 45 mStorage.createTransaction(Akonadi2::Storage::ReadWrite).remove(keyCopy, [errorHandler, keyCopy](const Akonadi2::Storage::Error &error) {
37 ErrorMsg() << "Error while removing value" << error.message << keyCopy; 46 ErrorMsg() << "Error while removing value" << error.message << keyCopy;
38 //Don't call the errorhandler in here, we already called the result handler 47 //Don't call the errorhandler in here, we already called the result handler
39 }); 48 });
diff --git a/common/messagequeue.h b/common/messagequeue.h
index ffc1ff2..3393394 100644
--- a/common/messagequeue.h
+++ b/common/messagequeue.h
@@ -26,6 +26,7 @@ public:
26 MessageQueue(const QString &storageRoot, const QString &name); 26 MessageQueue(const QString &storageRoot, const QString &name);
27 27
28 void enqueue(void const *msg, size_t size); 28 void enqueue(void const *msg, size_t size);
29 void enqueue(const QByteArray &value);
29 //Dequeue a message. This will return a new message everytime called. 30 //Dequeue a message. This will return a new message everytime called.
30 //Call the result handler with a success response to remove the message from the store. 31 //Call the result handler with a success response to remove the message from the store.
31 //TODO track processing progress to avoid processing the same message with the same preprocessor twice? 32 //TODO track processing progress to avoid processing the same message with the same preprocessor twice?
@@ -39,5 +40,4 @@ signals:
39private: 40private:
40 Q_DISABLE_COPY(MessageQueue); 41 Q_DISABLE_COPY(MessageQueue);
41 Akonadi2::Storage mStorage; 42 Akonadi2::Storage mStorage;
42 Akonadi2::Storage::Transaction mTransaction;
43}; 43};
diff --git a/tests/messagequeuetest.cpp b/tests/messagequeuetest.cpp
index c43b192..d5c47f5 100644
--- a/tests/messagequeuetest.cpp
+++ b/tests/messagequeuetest.cpp
@@ -31,12 +31,38 @@ private Q_SLOTS:
31 { 31 {
32 MessageQueue queue(Akonadi2::Store::storageLocation(), "org.kde.dummy.testqueue"); 32 MessageQueue queue(Akonadi2::Store::storageLocation(), "org.kde.dummy.testqueue");
33 QVERIFY(queue.isEmpty()); 33 QVERIFY(queue.isEmpty());
34 QByteArray value("value"); 34 queue.enqueue("value");
35 queue.enqueue(value.data(), value.size());
36 QVERIFY(!queue.isEmpty()); 35 QVERIFY(!queue.isEmpty());
37 } 36 }
38 37
39 void testQueue() 38 void testDequeueEmpty()
39 {
40 MessageQueue queue(Akonadi2::Store::storageLocation(), "org.kde.dummy.testqueue");
41 bool gotValue = false;
42 bool gotError = false;
43 queue.dequeue([&](void *ptr, int size, std::function<void(bool success)> callback) {
44 gotValue = true;
45 },
46 [&](const MessageQueue::Error &error) {
47 gotError = true;
48 });
49 QVERIFY(!gotValue);
50 QVERIFY(gotError);
51 }
52
53 void testDrained()
54 {
55 MessageQueue queue(Akonadi2::Store::storageLocation(), "org.kde.dummy.testqueue");
56 QSignalSpy spy(&queue, SIGNAL(drained()));
57 queue.enqueue("value1");
58
59 queue.dequeue([](void *ptr, int size, std::function<void(bool success)> callback) {
60 callback(true);
61 }, [](const MessageQueue::Error &error) {});
62 QCOMPARE(spy.size(), 1);
63 }
64
65 void testSyncDequeue()
40 { 66 {
41 QQueue<QByteArray> values; 67 QQueue<QByteArray> values;
42 values << "value1"; 68 values << "value1";
@@ -44,10 +70,11 @@ private Q_SLOTS:
44 70
45 MessageQueue queue(Akonadi2::Store::storageLocation(), "org.kde.dummy.testqueue"); 71 MessageQueue queue(Akonadi2::Store::storageLocation(), "org.kde.dummy.testqueue");
46 for (const QByteArray &value : values) { 72 for (const QByteArray &value : values) {
47 queue.enqueue(value.data(), value.size()); 73 queue.enqueue(value);
48 } 74 }
49 75
50 while (!queue.isEmpty()) { 76 while (!queue.isEmpty()) {
77 Log() << "start";
51 const auto expected = values.dequeue(); 78 const auto expected = values.dequeue();
52 bool gotValue = false; 79 bool gotValue = false;
53 bool gotError = false; 80 bool gotError = false;
@@ -66,21 +93,66 @@ private Q_SLOTS:
66 QVERIFY(values.isEmpty()); 93 QVERIFY(values.isEmpty());
67 } 94 }
68 95
69 void testDequeueEmpty() 96 void testAsyncDequeue()
70 { 97 {
98 QQueue<QByteArray> values;
99 values << "value1";
100 values << "value2";
101
71 MessageQueue queue(Akonadi2::Store::storageLocation(), "org.kde.dummy.testqueue"); 102 MessageQueue queue(Akonadi2::Store::storageLocation(), "org.kde.dummy.testqueue");
72 bool gotValue = false; 103 for (const QByteArray &value : values) {
104 queue.enqueue(value);
105 }
106
107 while (!queue.isEmpty()) {
108 QEventLoop eventLoop;
109 const auto expected = values.dequeue();
110 bool gotValue = false;
111 bool gotError = false;
112
113 queue.dequeue([&](void *ptr, int size, std::function<void(bool success)> callback) {
114 if (QByteArray(static_cast<char*>(ptr), size) == expected) {
115 gotValue = true;
116 }
117 auto timer = new QTimer();
118 timer->setSingleShot(true);
119 QObject::connect(timer, &QTimer::timeout, [timer, callback, &eventLoop]() {
120 delete timer;
121 callback(true);
122 eventLoop.exit();
123 });
124 timer->start(0);
125 },
126 [&](const MessageQueue::Error &error) {
127 gotError = true;
128 });
129 eventLoop.exec();
130 QVERIFY(gotValue);
131 QVERIFY(!gotError);
132 }
133 QVERIFY(values.isEmpty());
134 }
135
136 /*
137 * Dequeue's are async and we want to be able to enqueue new items in between.
138 */
139 void testNestedEnqueue()
140 {
141 MessageQueue queue(Akonadi2::Store::storageLocation(), "org.kde.dummy.testqueue");
142 queue.enqueue("value1");
143
73 bool gotError = false; 144 bool gotError = false;
74 queue.dequeue([&](void *ptr, int size, std::function<void(bool success)> callback) { 145 queue.dequeue([&](void *ptr, int size, std::function<void(bool success)> callback) {
75 gotValue = true; 146 queue.enqueue("value3");
147 callback(true);
76 }, 148 },
77 [&](const MessageQueue::Error &error) { 149 [&](const MessageQueue::Error &error) {
78 gotError = true; 150 gotError = true;
79 }); 151 });
80 QVERIFY(!gotValue); 152 QVERIFY(!gotError);
81 QVERIFY(gotError);
82 } 153 }
83 154
155
84}; 156};
85 157
86QTEST_MAIN(MessageQueueTest) 158QTEST_MAIN(MessageQueueTest)