summaryrefslogtreecommitdiffstats
path: root/common
diff options
context:
space:
mode:
Diffstat (limited to 'common')
-rw-r--r--common/messagequeue.cpp24
-rw-r--r--common/messagequeue.h1
-rw-r--r--common/storage.h2
-rw-r--r--common/storage_common.cpp21
4 files changed, 32 insertions, 16 deletions
diff --git a/common/messagequeue.cpp b/common/messagequeue.cpp
index 0704dd3..a92d6be 100644
--- a/common/messagequeue.cpp
+++ b/common/messagequeue.cpp
@@ -11,12 +11,11 @@ MessageQueue::MessageQueue(const QString &storageRoot, const QString &name)
11 11
12void MessageQueue::enqueue(void const *msg, size_t size) 12void MessageQueue::enqueue(void const *msg, size_t size)
13{ 13{
14 mStorage.startTransaction(Akonadi2::Storage::ReadWrite); 14 auto transaction = mStorage.createTransaction(Akonadi2::Storage::ReadWrite);
15 const qint64 revision = mStorage.maxRevision() + 1; 15 const qint64 revision = mStorage.maxRevision() + 1;
16 const QByteArray key = QString("%1").arg(revision).toUtf8(); 16 const QByteArray key = QString("%1").arg(revision).toUtf8();
17 mStorage.write(key.data(), key.size(), msg, size); 17 transaction.write(key, QByteArray::fromRawData(static_cast<const char*>(msg), size));
18 mStorage.setMaxRevision(revision); 18 Akonadi2::Storage::setMaxRevision(transaction, revision);
19 mStorage.commitTransaction();
20 emit messageReady(); 19 emit messageReady();
21} 20}
22 21
@@ -24,19 +23,21 @@ void MessageQueue::dequeue(const std::function<void(void *ptr, int size, std::fu
24 const std::function<void(const Error &error)> &errorHandler) 23 const std::function<void(const Error &error)> &errorHandler)
25{ 24{
26 bool readValue = false; 25 bool readValue = false;
27 mStorage.scan("", [this, resultHandler, errorHandler, &readValue](void *keyPtr, int keySize, void *valuePtr, int valueSize) -> bool { 26 mTransaction = std::move(mStorage.createTransaction(Akonadi2::Storage::ReadWrite));
28 //We need a copy of the key here, otherwise we can't store it in the lambda (the pointers will become invalid) 27 mTransaction.scan("", [this, resultHandler, errorHandler, &readValue](const QByteArray &key, const QByteArray &value) -> bool {
29 const auto key = QByteArray(static_cast<char*>(keyPtr), keySize);
30 if (Akonadi2::Storage::isInternalKey(key)) { 28 if (Akonadi2::Storage::isInternalKey(key)) {
31 return true; 29 return true;
32 } 30 }
33 readValue = true; 31 readValue = true;
34 resultHandler(valuePtr, valueSize, [this, key, errorHandler](bool success) { 32 //We need a copy of the key here, otherwise we can't store it in the lambda (the pointers will become invalid)
33 const auto keyCopy = QByteArray(key.constData(), key.size());
34 resultHandler(const_cast<void*>(static_cast<const void*>(value.data())), value.size(), [this, keyCopy, errorHandler](bool success) {
35 if (success) { 35 if (success) {
36 mStorage.remove(key.data(), key.size(), [errorHandler, key](const Akonadi2::Storage::Error &error) { 36 mTransaction.remove(keyCopy, [errorHandler, keyCopy](const Akonadi2::Storage::Error &error) {
37 ErrorMsg() << "Error while removing value" << error.message << key; 37 ErrorMsg() << "Error while removing value" << error.message << keyCopy;
38 //Don't call the errorhandler in here, we already called the result handler 38 //Don't call the errorhandler in here, we already called the result handler
39 }); 39 });
40 mTransaction.commit();
40 if (isEmpty()) { 41 if (isEmpty()) {
41 emit this->drained(); 42 emit this->drained();
42 } 43 }
@@ -59,8 +60,7 @@ void MessageQueue::dequeue(const std::function<void(void *ptr, int size, std::fu
59bool MessageQueue::isEmpty() 60bool MessageQueue::isEmpty()
60{ 61{
61 int count = 0; 62 int count = 0;
62 mStorage.scan("", [&count](void *keyPtr, int keySize, void *valuePtr, int valueSize) -> bool { 63 mStorage.createTransaction(Akonadi2::Storage::ReadOnly).scan("", [&count](const QByteArray &key, const QByteArray &value) -> bool {
63 const auto key = QByteArray::fromRawData(static_cast<char*>(keyPtr), keySize);
64 if (!Akonadi2::Storage::isInternalKey(key)) { 64 if (!Akonadi2::Storage::isInternalKey(key)) {
65 count++; 65 count++;
66 return false; 66 return false;
diff --git a/common/messagequeue.h b/common/messagequeue.h
index b7e3daa..ffc1ff2 100644
--- a/common/messagequeue.h
+++ b/common/messagequeue.h
@@ -39,4 +39,5 @@ signals:
39private: 39private:
40 Q_DISABLE_COPY(MessageQueue); 40 Q_DISABLE_COPY(MessageQueue);
41 Akonadi2::Storage mStorage; 41 Akonadi2::Storage mStorage;
42 Akonadi2::Storage::Transaction mTransaction;
42}; 43};
diff --git a/common/storage.h b/common/storage.h
index fd35274..f9a207f 100644
--- a/common/storage.h
+++ b/common/storage.h
@@ -168,6 +168,8 @@ public:
168 168
169 qint64 maxRevision(); 169 qint64 maxRevision();
170 void setMaxRevision(qint64 revision); 170 void setMaxRevision(qint64 revision);
171 static qint64 maxRevision(const Akonadi2::Storage::Transaction &);
172 static void setMaxRevision(Akonadi2::Storage::Transaction &, qint64 revision);
171 173
172 bool exists() const; 174 bool exists() const;
173 175
diff --git a/common/storage_common.cpp b/common/storage_common.cpp
index 65f6e57..8006a8e 100644
--- a/common/storage_common.cpp
+++ b/common/storage_common.cpp
@@ -69,18 +69,31 @@ bool Storage::write(const QByteArray &sKey, const QByteArray &sValue, const std:
69 69
70void Storage::setMaxRevision(qint64 revision) 70void Storage::setMaxRevision(qint64 revision)
71{ 71{
72 write("__internal_maxRevision", QByteArray::number(revision)); 72 auto transaction = createTransaction(Akonadi2::Storage::ReadWrite);
73 setMaxRevision(transaction, revision);
74}
75
76void Storage::setMaxRevision(Akonadi2::Storage::Transaction &transaction, qint64 revision)
77{
78 transaction.write("__internal_maxRevision", QByteArray::number(revision));
73} 79}
74 80
75qint64 Storage::maxRevision() 81qint64 Storage::maxRevision()
76{ 82{
83 auto transaction = createTransaction(Akonadi2::Storage::ReadOnly);
84 return maxRevision(transaction);
85}
86
87qint64 Storage::maxRevision(const Akonadi2::Storage::Transaction &transaction)
88{
77 qint64 r = 0; 89 qint64 r = 0;
78 scan("__internal_maxRevision", [&](const QByteArray &revision) -> bool { 90 transaction.scan("__internal_maxRevision", [&](const QByteArray &, const QByteArray &revision) -> bool {
79 r = revision.toLongLong(); 91 r = revision.toLongLong();
80 return false; 92 return false;
81 }, [this](const Error &error){ 93 }, [](const Error &error){
82 if (error.code != ErrorCodes::NotFound) { 94 if (error.code != ErrorCodes::NotFound) {
83 defaultErrorHandler()(error); 95 //FIXME
96 // defaultErrorHandler()(error);
84 } 97 }
85 }); 98 });
86 return r; 99 return r;