diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-08-11 10:30:10 +0200 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-08-11 10:30:10 +0200 |
commit | a9dc9ed667f06fa1828773d1bb8671ec2731dce5 (patch) | |
tree | f8a195992d79d1ff8122be44613e70e04cde0d95 /common/messagequeue.cpp | |
parent | 3144d1b4bbf523b80fa04ba61787d9366ccc0443 (diff) | |
download | sink-a9dc9ed667f06fa1828773d1bb8671ec2731dce5.tar.gz sink-a9dc9ed667f06fa1828773d1bb8671ec2731dce5.zip |
Fixed messagequeue
Diffstat (limited to 'common/messagequeue.cpp')
-rw-r--r-- | common/messagequeue.cpp | 21 |
1 files changed, 15 insertions, 6 deletions
diff --git a/common/messagequeue.cpp b/common/messagequeue.cpp index a92d6be..ecc4d1a 100644 --- a/common/messagequeue.cpp +++ b/common/messagequeue.cpp | |||
@@ -11,29 +11,38 @@ MessageQueue::MessageQueue(const QString &storageRoot, const QString &name) | |||
11 | 11 | ||
12 | void MessageQueue::enqueue(void const *msg, size_t size) | 12 | void MessageQueue::enqueue(void const *msg, size_t size) |
13 | { | 13 | { |
14 | auto transaction = mStorage.createTransaction(Akonadi2::Storage::ReadWrite); | 14 | auto transaction = std::move(mStorage.createTransaction(Akonadi2::Storage::ReadWrite)); |
15 | const qint64 revision = mStorage.maxRevision() + 1; | 15 | const qint64 revision = Akonadi2::Storage::maxRevision(transaction) + 1; |
16 | const QByteArray key = QString("%1").arg(revision).toUtf8(); | 16 | const QByteArray key = QString("%1").arg(revision).toUtf8(); |
17 | transaction.write(key, QByteArray::fromRawData(static_cast<const char*>(msg), size)); | 17 | transaction.write(key, QByteArray::fromRawData(static_cast<const char*>(msg), size)); |
18 | Akonadi2::Storage::setMaxRevision(transaction, revision); | 18 | Akonadi2::Storage::setMaxRevision(transaction, revision); |
19 | transaction.commit(); | ||
19 | emit messageReady(); | 20 | emit messageReady(); |
20 | } | 21 | } |
21 | 22 | ||
23 | void MessageQueue::enqueue(const QByteArray &value) | ||
24 | { | ||
25 | enqueue(value.data(), value.size()); | ||
26 | } | ||
27 | |||
22 | void MessageQueue::dequeue(const std::function<void(void *ptr, int size, std::function<void(bool success)>)> &resultHandler, | 28 | void MessageQueue::dequeue(const std::function<void(void *ptr, int size, std::function<void(bool success)>)> &resultHandler, |
23 | const std::function<void(const Error &error)> &errorHandler) | 29 | const std::function<void(const Error &error)> &errorHandler) |
24 | { | 30 | { |
25 | bool readValue = false; | 31 | bool readValue = false; |
26 | mTransaction = std::move(mStorage.createTransaction(Akonadi2::Storage::ReadWrite)); | 32 | auto readTransaction = std::move(mStorage.createTransaction(Akonadi2::Storage::ReadOnly)); |
27 | mTransaction.scan("", [this, resultHandler, errorHandler, &readValue](const QByteArray &key, const QByteArray &value) -> bool { | 33 | readTransaction.scan("", [this, resultHandler, errorHandler, &readValue, &readTransaction](const QByteArray &key, const QByteArray &value) -> bool { |
28 | if (Akonadi2::Storage::isInternalKey(key)) { | 34 | if (Akonadi2::Storage::isInternalKey(key)) { |
29 | return true; | 35 | return true; |
30 | } | 36 | } |
31 | readValue = true; | 37 | readValue = true; |
32 | //We need a copy of the key here, otherwise we can't store it in the lambda (the pointers will become invalid) | 38 | //We need a copy of the key here, otherwise we can't store it in the lambda (the pointers will become invalid) |
33 | const auto keyCopy = QByteArray(key.constData(), key.size()); | 39 | const auto keyCopy = QByteArray(key.constData(), key.size()); |
34 | resultHandler(const_cast<void*>(static_cast<const void*>(value.data())), value.size(), [this, keyCopy, errorHandler](bool success) { | 40 | //TODO The value copy and the early transaction abort is necessary because we don't support parallel read-transactions yet (in case of a synchronous callback) |
41 | const auto valueCopy = QByteArray(value.constData(), value.size()); | ||
42 | readTransaction.abort(); | ||
43 | resultHandler(const_cast<void*>(static_cast<const void*>(valueCopy.data())), valueCopy.size(), [this, keyCopy, errorHandler](bool success) { | ||
35 | if (success) { | 44 | if (success) { |
36 | mTransaction.remove(keyCopy, [errorHandler, keyCopy](const Akonadi2::Storage::Error &error) { | 45 | mStorage.createTransaction(Akonadi2::Storage::ReadWrite).remove(keyCopy, [errorHandler, keyCopy](const Akonadi2::Storage::Error &error) { |
37 | ErrorMsg() << "Error while removing value" << error.message << keyCopy; | 46 | ErrorMsg() << "Error while removing value" << error.message << keyCopy; |
38 | //Don't call the errorhandler in here, we already called the result handler | 47 | //Don't call the errorhandler in here, we already called the result handler |
39 | }); | 48 | }); |