summaryrefslogtreecommitdiffstats
path: root/common
diff options
context:
space:
mode:
Diffstat (limited to 'common')
-rw-r--r--common/CMakeLists.txt1
-rw-r--r--common/messagequeue.cpp47
-rw-r--r--common/messagequeue.h34
3 files changed, 82 insertions, 0 deletions
diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt
index 1a9a812..671d1cd 100644
--- a/common/CMakeLists.txt
+++ b/common/CMakeLists.txt
@@ -30,6 +30,7 @@ set(command_SRCS
30 resourceaccess.cpp 30 resourceaccess.cpp
31 storage_common.cpp 31 storage_common.cpp
32 threadboundary.cpp 32 threadboundary.cpp
33 messagequeue.cpp
33 ${storage_SRCS}) 34 ${storage_SRCS})
34 35
35add_library(${PROJECT_NAME} SHARED ${command_SRCS}) 36add_library(${PROJECT_NAME} SHARED ${command_SRCS})
diff --git a/common/messagequeue.cpp b/common/messagequeue.cpp
new file mode 100644
index 0000000..cf63e57
--- /dev/null
+++ b/common/messagequeue.cpp
@@ -0,0 +1,47 @@
1#include "messagequeue.h"
2#include <QDebug>
3
4MessageQueue::MessageQueue(const QString &storageRoot, const QString &name)
5 : mStorage(storageRoot, name, Akonadi2::Storage::ReadWrite)
6{
7
8}
9
10void MessageQueue::enqueue(void const *msg, size_t size)
11{
12 mStorage.startTransaction(Akonadi2::Storage::ReadWrite);
13 const qint64 revision = mStorage.maxRevision() + 1;
14 const QByteArray key = QString("%1").arg(revision).toUtf8();
15 mStorage.write(key.data(), key.size(), msg, size);
16 mStorage.setMaxRevision(revision);
17 mStorage.commitTransaction();
18}
19
20void MessageQueue::dequeue(const std::function<void(void *ptr, int size, std::function<void(bool success)>)> &resultHandler,
21 const std::function<void(const Error &error)> &errorHandler)
22{
23 mStorage.scan("", [this, resultHandler](void *keyPtr, int keySize, void *valuePtr, int valueSize) -> bool {
24 const std::string key(static_cast<char*>(keyPtr), keySize);
25 resultHandler(valuePtr, valueSize, [this, key](bool success) {
26 if (success) {
27 mStorage.remove(key.data(), key.size());
28 } else {
29 //TODO re-enqueue?
30 }
31 });
32 return false;
33 });
34}
35
36bool MessageQueue::isEmpty()
37{
38 int count = 0;
39 mStorage.scan("", [&count](void *keyPtr, int keySize, void *valuePtr, int valueSize) -> bool {
40 const QByteArray key(static_cast<char*>(keyPtr), keySize);
41 if (!key.startsWith("__internal")) {
42 count++;
43 }
44 });
45 return count == 0;
46}
47
diff --git a/common/messagequeue.h b/common/messagequeue.h
new file mode 100644
index 0000000..b56b3cd
--- /dev/null
+++ b/common/messagequeue.h
@@ -0,0 +1,34 @@
1#pragma once
2
3#include <string>
4#include <functional>
5#include <QString>
6#include "storage.h"
7
8/**
9 * A persistent FIFO message queue.
10 */
11class MessageQueue {
12public:
13 class Error
14 {
15 public:
16 Error(const std::string &s, int c, const std::string &m)
17 : store(s), message(m), code(c) {}
18 std::string store;
19 std::string message;
20 int code;
21 };
22
23 MessageQueue(const QString &storageRoot, const QString &name);
24
25 void enqueue(void const *msg, size_t size);
26 //Dequeue a message. This will return a new message everytime called.
27 //Call the result handler with a success response to remove the message from the store.
28 //TODO track processing progress to avoid processing the same message with the same preprocessor twice?
29 void dequeue(const std::function<void(void *ptr, int size, std::function<void(bool success)>)> & resultHandler,
30 const std::function<void(const Error &error)> &errorHandler);
31 bool isEmpty();
32private:
33 Akonadi2::Storage mStorage;
34};