1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
|
#include "messagequeue.h"
#include "storage.h"
#include <QDebug>
#include <log.h>
// Builds the queue on top of a storage object constructed from the given
// root and name, requesting read-write access.
//
// storageRoot: forwarded unchanged as the first storage constructor argument.
// name:        forwarded unchanged as the second storage constructor argument.
MessageQueue::MessageQueue(const QString &storageRoot, const QString &name)
    : mStorage(storageRoot, name, Akonadi2::Storage::ReadWrite)
{
    // Nothing else to set up: all state lives in mStorage.
}
// Appends one message to the queue.
//
// The message is written under a key that is the decimal text of the next
// revision (current maximum revision + 1); the stored maximum revision is
// then raised to that value, the transaction is committed and messageReady()
// is emitted.
//
// msg:  pointer to the first byte of the message.
// size: number of bytes to store, starting at msg.
void MessageQueue::enqueue(void const *msg, size_t size)
{
    auto transaction = mStorage.createTransaction(Akonadi2::Storage::ReadWrite);

    const qint64 revision = Akonadi2::Storage::maxRevision(transaction) + 1;
    // Plain base-10 rendering of the revision number is used as the key.
    const QByteArray key = QByteArray::number(revision);

    // fromRawData() wraps the caller's buffer without copying it, so the
    // buffer only has to stay valid for the duration of this write call.
    // NOTE(review): assumes write() consumes the data synchronously - confirm.
    transaction.write(key, QByteArray::fromRawData(static_cast<const char *>(msg), size));
    Akonadi2::Storage::setMaxRevision(transaction, revision);
    transaction.commit();

    emit messageReady();
}
void MessageQueue::enqueue(const QByteArray &value)
{
enqueue(value.data(), value.size());
}
void MessageQueue::dequeue(const std::function<void(void *ptr, int size, std::function<void(bool success)>)> &resultHandler,
const std::function<void(const Error &error)> &errorHandler)
{
bool readValue = false;
mStorage.createTransaction(Akonadi2::Storage::ReadOnly).scan("", [this, resultHandler, errorHandler, &readValue](const QByteArray &key, const QByteArray &value) -> bool {
if (Akonadi2::Storage::isInternalKey(key)) {
return true;
}
readValue = true;
//We need a copy of the key here, otherwise we can't store it in the lambda (the pointers will become invalid)
const auto keyCopy = QByteArray(key.constData(), key.size());
resultHandler(const_cast<void*>(static_cast<const void*>(value.data())), value.size(), [this, keyCopy, errorHandler](bool success) {
if (success) {
mStorage.createTransaction(Akonadi2::Storage::ReadWrite).remove(keyCopy, [errorHandler, keyCopy](const Akonadi2::Storage::Error &error) {
ErrorMsg() << "Error while removing value" << error.message << keyCopy;
//Don't call the errorhandler in here, we already called the result handler
});
if (isEmpty()) {
emit this->drained();
}
} else {
//TODO re-enqueue?
}
});
return false;
},
[errorHandler](const Akonadi2::Storage::Error &error) {
ErrorMsg() << "Error while retrieving value" << error.message;
errorHandler(Error(error.store, error.code, error.message));
}
);
if (!readValue) {
errorHandler(Error("messagequeue", -1, "No message found"));
}
}
// Returns a job that hands up to maxBatchSize queued messages to
// resultHandler (at least one message is delivered if any is queued).
//
// Each non-internal entry found by the scan is passed to resultHandler
// together with a completion callback. Once the number of completion
// callbacks that have run reaches the number of messages read so far, all
// keys collected so far are removed in one transaction and drained() is
// emitted if the queue is then empty.
//
// maxBatchSize:  upper bound on the number of messages read per execution.
// resultHandler: receives (pointer to message bytes, byte count, completion
//                callback). The pointer refers to the scanned value and
//                should not be assumed valid after resultHandler returns.
//                The bool passed to the completion callback is currently
//                not evaluated.
//
// If no message was found the job's future is set to error -1
// ("No message found") and finished.
// NOTE(review): this function never finishes the future when messages were
// found - confirm whether that is handled elsewhere.
KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::function<void(void *ptr, int size, std::function<void(bool success)>)> &resultHandler)
{
    // Shared between the job and the completion callbacks, which may outlive
    // this function call.
    auto resultCount = QSharedPointer<int>::create(0);
    auto keyList = QSharedPointer<QByteArrayList>::create();
    return KAsync::start<void>([this, maxBatchSize, resultHandler, resultCount, keyList](KAsync::Future<void> &future) {
        bool readValue = false;
        // Number of messages read in this execution. Held in shared storage
        // because the completion callbacks keep using it and may run after
        // this lambda has returned.
        auto count = QSharedPointer<int>::create(0);
        mStorage.createTransaction(Akonadi2::Storage::ReadOnly).scan("",
            [this, resultHandler, &readValue, resultCount, keyList, count, maxBatchSize](const QByteArray &key, const QByteArray &value) -> bool {
                if (Akonadi2::Storage::isInternalKey(key)) {
                    // Internal entries are not messages; keep scanning.
                    return true;
                }
                readValue = true;
                //We need a copy of the key here, otherwise we can't store it in the lambda (the pointers will become invalid)
                keyList->append(QByteArray(key.constData(), key.size()));
                resultHandler(const_cast<void*>(static_cast<const void*>(value.data())), value.size(), [this, resultCount, keyList, count](bool success) {
                    *resultCount += 1;
                    //We're done
                    //FIXME the check below should only be done once we finished reading
                    if (*resultCount >= *count) {
                        //FIXME do this from the caller thread
                        auto transaction = mStorage.createTransaction(Akonadi2::Storage::ReadWrite);
                        for (const auto &storedKey : *keyList) {
                            transaction.remove(storedKey, [storedKey](const Akonadi2::Storage::Error &error) {
                                ErrorMsg() << "Error while removing value" << error.message << storedKey;
                                //Don't call the errorhandler in here, we already called the result handler
                            });
                        }
                        // NOTE(review): the removing transaction is still alive here, so
                        // isEmpty() may not observe the removals yet - confirm.
                        if (isEmpty()) {
                            emit this->drained();
                        }
                    }
                });
                *count += 1;
                // Continue only while fewer than maxBatchSize messages have been read.
                return *count < maxBatchSize;
            },
            [](const Akonadi2::Storage::Error &error) {
                ErrorMsg() << "Error while retrieving value" << error.message;
                // errorHandler(Error(error.store, error.code, error.message));
            }
        );
        if (!readValue) {
            future.setError(-1, "No message found");
            future.setFinished();
        }
    });
}
bool MessageQueue::isEmpty()
{
int count = 0;
mStorage.createTransaction(Akonadi2::Storage::ReadOnly).scan("", [&count](const QByteArray &key, const QByteArray &value) -> bool {
if (!Akonadi2::Storage::isInternalKey(key)) {
count++;
return false;
}
return true;
},
[](const Akonadi2::Storage::Error &error) {
qDebug() << "Error while checking if empty" << error.message;
});
return count == 0;
}
|