summaryrefslogtreecommitdiffstats
path: root/common/messagequeue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/messagequeue.cpp')
-rw-r--r--common/messagequeue.cpp47
1 files changed, 47 insertions, 0 deletions
diff --git a/common/messagequeue.cpp b/common/messagequeue.cpp
new file mode 100644
index 0000000..cf63e57
--- /dev/null
+++ b/common/messagequeue.cpp
@@ -0,0 +1,47 @@
1#include "messagequeue.h"
2#include <QDebug>
3
4MessageQueue::MessageQueue(const QString &storageRoot, const QString &name)
5 : mStorage(storageRoot, name, Akonadi2::Storage::ReadWrite)
6{
7
8}
9
10void MessageQueue::enqueue(void const *msg, size_t size)
11{
12 mStorage.startTransaction(Akonadi2::Storage::ReadWrite);
13 const qint64 revision = mStorage.maxRevision() + 1;
14 const QByteArray key = QString("%1").arg(revision).toUtf8();
15 mStorage.write(key.data(), key.size(), msg, size);
16 mStorage.setMaxRevision(revision);
17 mStorage.commitTransaction();
18}
19
20void MessageQueue::dequeue(const std::function<void(void *ptr, int size, std::function<void(bool success)>)> &resultHandler,
21 const std::function<void(const Error &error)> &errorHandler)
22{
23 mStorage.scan("", [this, resultHandler](void *keyPtr, int keySize, void *valuePtr, int valueSize) -> bool {
24 const std::string key(static_cast<char*>(keyPtr), keySize);
25 resultHandler(valuePtr, valueSize, [this, key](bool success) {
26 if (success) {
27 mStorage.remove(key.data(), key.size());
28 } else {
29 //TODO re-enqueue?
30 }
31 });
32 return false;
33 });
34}
35
36bool MessageQueue::isEmpty()
37{
38 int count = 0;
39 mStorage.scan("", [&count](void *keyPtr, int keySize, void *valuePtr, int valueSize) -> bool {
40 const QByteArray key(static_cast<char*>(keyPtr), keySize);
41 if (!key.startsWith("__internal")) {
42 count++;
43 }
44 });
45 return count == 0;
46}
47