summaryrefslogtreecommitdiffstats
path: root/common/messagequeue.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2015-08-11 10:30:10 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2015-08-11 10:30:10 +0200
commita9dc9ed667f06fa1828773d1bb8671ec2731dce5 (patch)
treef8a195992d79d1ff8122be44613e70e04cde0d95 /common/messagequeue.cpp
parent3144d1b4bbf523b80fa04ba61787d9366ccc0443 (diff)
downloadsink-a9dc9ed667f06fa1828773d1bb8671ec2731dce5.tar.gz
sink-a9dc9ed667f06fa1828773d1bb8671ec2731dce5.zip
Fixed messagequeue
Diffstat (limited to 'common/messagequeue.cpp')
-rw-r--r--common/messagequeue.cpp21
1 files changed, 15 insertions, 6 deletions
diff --git a/common/messagequeue.cpp b/common/messagequeue.cpp
index a92d6be..ecc4d1a 100644
--- a/common/messagequeue.cpp
+++ b/common/messagequeue.cpp
@@ -11,29 +11,38 @@ MessageQueue::MessageQueue(const QString &storageRoot, const QString &name)
11 11
12void MessageQueue::enqueue(void const *msg, size_t size) 12void MessageQueue::enqueue(void const *msg, size_t size)
13{ 13{
14 auto transaction = mStorage.createTransaction(Akonadi2::Storage::ReadWrite); 14 auto transaction = std::move(mStorage.createTransaction(Akonadi2::Storage::ReadWrite));
15 const qint64 revision = mStorage.maxRevision() + 1; 15 const qint64 revision = Akonadi2::Storage::maxRevision(transaction) + 1;
16 const QByteArray key = QString("%1").arg(revision).toUtf8(); 16 const QByteArray key = QString("%1").arg(revision).toUtf8();
17 transaction.write(key, QByteArray::fromRawData(static_cast<const char*>(msg), size)); 17 transaction.write(key, QByteArray::fromRawData(static_cast<const char*>(msg), size));
18 Akonadi2::Storage::setMaxRevision(transaction, revision); 18 Akonadi2::Storage::setMaxRevision(transaction, revision);
19 transaction.commit();
19 emit messageReady(); 20 emit messageReady();
20} 21}
21 22
23void MessageQueue::enqueue(const QByteArray &value)
24{
25 enqueue(value.data(), value.size());
26}
27
22void MessageQueue::dequeue(const std::function<void(void *ptr, int size, std::function<void(bool success)>)> &resultHandler, 28void MessageQueue::dequeue(const std::function<void(void *ptr, int size, std::function<void(bool success)>)> &resultHandler,
23 const std::function<void(const Error &error)> &errorHandler) 29 const std::function<void(const Error &error)> &errorHandler)
24{ 30{
25 bool readValue = false; 31 bool readValue = false;
26 mTransaction = std::move(mStorage.createTransaction(Akonadi2::Storage::ReadWrite)); 32 auto readTransaction = std::move(mStorage.createTransaction(Akonadi2::Storage::ReadOnly));
27 mTransaction.scan("", [this, resultHandler, errorHandler, &readValue](const QByteArray &key, const QByteArray &value) -> bool { 33 readTransaction.scan("", [this, resultHandler, errorHandler, &readValue, &readTransaction](const QByteArray &key, const QByteArray &value) -> bool {
28 if (Akonadi2::Storage::isInternalKey(key)) { 34 if (Akonadi2::Storage::isInternalKey(key)) {
29 return true; 35 return true;
30 } 36 }
31 readValue = true; 37 readValue = true;
32 //We need a copy of the key here, otherwise we can't store it in the lambda (the pointers will become invalid) 38 //We need a copy of the key here, otherwise we can't store it in the lambda (the pointers will become invalid)
33 const auto keyCopy = QByteArray(key.constData(), key.size()); 39 const auto keyCopy = QByteArray(key.constData(), key.size());
34 resultHandler(const_cast<void*>(static_cast<const void*>(value.data())), value.size(), [this, keyCopy, errorHandler](bool success) { 40 //TODO The value copy and the early transaction abort is necessary because we don't support parallel read-transactions yet (in case of a synchronous callback)
41 const auto valueCopy = QByteArray(value.constData(), value.size());
42 readTransaction.abort();
43 resultHandler(const_cast<void*>(static_cast<const void*>(valueCopy.data())), valueCopy.size(), [this, keyCopy, errorHandler](bool success) {
35 if (success) { 44 if (success) {
36 mTransaction.remove(keyCopy, [errorHandler, keyCopy](const Akonadi2::Storage::Error &error) { 45 mStorage.createTransaction(Akonadi2::Storage::ReadWrite).remove(keyCopy, [errorHandler, keyCopy](const Akonadi2::Storage::Error &error) {
37 ErrorMsg() << "Error while removing value" << error.message << keyCopy; 46 ErrorMsg() << "Error while removing value" << error.message << keyCopy;
38 //Don't call the errorhandler in here, we already called the result handler 47 //Don't call the errorhandler in here, we already called the result handler
39 }); 48 });