summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--common/CMakeLists.txt1
-rw-r--r--common/messagequeue.cpp47
-rw-r--r--common/messagequeue.h34
-rw-r--r--tests/CMakeLists.txt1
-rw-r--r--tests/messagequeuetest.cpp50
5 files changed, 133 insertions, 0 deletions
diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt
index 1a9a812..671d1cd 100644
--- a/common/CMakeLists.txt
+++ b/common/CMakeLists.txt
@@ -30,6 +30,7 @@ set(command_SRCS
30 resourceaccess.cpp 30 resourceaccess.cpp
31 storage_common.cpp 31 storage_common.cpp
32 threadboundary.cpp 32 threadboundary.cpp
33 messagequeue.cpp
33 ${storage_SRCS}) 34 ${storage_SRCS})
34 35
35add_library(${PROJECT_NAME} SHARED ${command_SRCS}) 36add_library(${PROJECT_NAME} SHARED ${command_SRCS})
diff --git a/common/messagequeue.cpp b/common/messagequeue.cpp
new file mode 100644
index 0000000..cf63e57
--- /dev/null
+++ b/common/messagequeue.cpp
@@ -0,0 +1,47 @@
1#include "messagequeue.h"
2#include <QDebug>
3
4MessageQueue::MessageQueue(const QString &storageRoot, const QString &name)
5 : mStorage(storageRoot, name, Akonadi2::Storage::ReadWrite)
6{
7
8}
9
10void MessageQueue::enqueue(void const *msg, size_t size)
11{
12 mStorage.startTransaction(Akonadi2::Storage::ReadWrite);
13 const qint64 revision = mStorage.maxRevision() + 1;
14 const QByteArray key = QString("%1").arg(revision).toUtf8();
15 mStorage.write(key.data(), key.size(), msg, size);
16 mStorage.setMaxRevision(revision);
17 mStorage.commitTransaction();
18}
19
20void MessageQueue::dequeue(const std::function<void(void *ptr, int size, std::function<void(bool success)>)> &resultHandler,
21 const std::function<void(const Error &error)> &errorHandler)
22{
23 mStorage.scan("", [this, resultHandler](void *keyPtr, int keySize, void *valuePtr, int valueSize) -> bool {
24 const std::string key(static_cast<char*>(keyPtr), keySize);
25 resultHandler(valuePtr, valueSize, [this, key](bool success) {
26 if (success) {
27 mStorage.remove(key.data(), key.size());
28 } else {
29 //TODO re-enqueue?
30 }
31 });
32 return false;
33 });
34}
35
36bool MessageQueue::isEmpty()
37{
38 int count = 0;
39 mStorage.scan("", [&count](void *keyPtr, int keySize, void *valuePtr, int valueSize) -> bool {
40 const QByteArray key(static_cast<char*>(keyPtr), keySize);
41 if (!key.startsWith("__internal")) {
42 count++;
43 }
44 });
45 return count == 0;
46}
47
diff --git a/common/messagequeue.h b/common/messagequeue.h
new file mode 100644
index 0000000..b56b3cd
--- /dev/null
+++ b/common/messagequeue.h
@@ -0,0 +1,34 @@
1#pragma once
2
3#include <string>
4#include <functional>
5#include <QString>
6#include "storage.h"
7
8/**
9 * A persistent FIFO message queue.
10 */
11class MessageQueue {
12public:
13 class Error
14 {
15 public:
16 Error(const std::string &s, int c, const std::string &m)
17 : store(s), message(m), code(c) {}
18 std::string store;
19 std::string message;
20 int code;
21 };
22
23 MessageQueue(const QString &storageRoot, const QString &name);
24
25 void enqueue(void const *msg, size_t size);
26 //Dequeue a message. This will return a new message everytime called.
27 //Call the result handler with a success response to remove the message from the store.
28 //TODO track processing progress to avoid processing the same message with the same preprocessor twice?
29 void dequeue(const std::function<void(void *ptr, int size, std::function<void(bool success)>)> & resultHandler,
30 const std::function<void(const Error &error)> &errorHandler);
31 bool isEmpty();
32private:
33 Akonadi2::Storage mStorage;
34};
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 12695ff..183b1bf 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -18,6 +18,7 @@ manual_tests (
18 storagetest 18 storagetest
19 dummyresourcetest 19 dummyresourcetest
20 domainadaptortest 20 domainadaptortest
21 messagequeuetest
21) 22)
22 23
23target_link_libraries(dummyresourcetest akonadi2_resource_dummy) 24target_link_libraries(dummyresourcetest akonadi2_resource_dummy)
diff --git a/tests/messagequeuetest.cpp b/tests/messagequeuetest.cpp
new file mode 100644
index 0000000..2ea9f0c
--- /dev/null
+++ b/tests/messagequeuetest.cpp
@@ -0,0 +1,50 @@
1#include <QtTest>
2
3#include <QString>
4#include <QQueue>
5
6#include "clientapi.h"
7#include "storage.h"
8#include "messagequeue.h"
9
10class MessageQueueTest : public QObject
11{
12 Q_OBJECT
13private Q_SLOTS:
14 void initTestCase()
15 {
16 Akonadi2::Storage store(Akonadi2::Store::storageLocation(), "org.kde.dummy.testqueue", Akonadi2::Storage::ReadWrite);
17 store.removeFromDisk();
18 }
19
20 void cleanupTestCase()
21 {
22 }
23
24 void testQueue()
25 {
26 QQueue<QByteArray> values;
27 values << "value1";
28 values << "value2";
29
30 MessageQueue queue(Akonadi2::Store::storageLocation(), "org.kde.dummy.testqueue");
31 for (const QByteArray &value : values) {
32 queue.enqueue(value.data(), value.size());
33 }
34
35 while (!queue.isEmpty()) {
36 const auto expected = values.dequeue();
37 queue.dequeue([&](void *ptr, int size, std::function<void(bool success)> callback) {
38 QCOMPARE(QByteArray(static_cast<char*>(ptr), size), expected);
39 callback(true);
40 }, [](const MessageQueue::Error &error) {
41
42 });
43 }
44 Q_ASSERT(values.isEmpty());
45 }
46
47};
48
49QTEST_MAIN(MessageQueueTest)
50#include "messagequeuetest.moc"