diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-08-10 23:32:31 +0200 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-08-10 23:32:31 +0200 |
commit | 4f393198d022815c66b10871c7e5efe45a0bf462 (patch) | |
tree | fe5e87503d4298df78d9b1110388798de493f00b /common/messagequeue.cpp | |
parent | 13cd7cff06f1e4e51fb2a5d16b57662292c80b77 (diff) | |
download | sink-4f393198d022815c66b10871c7e5efe45a0bf462.tar.gz sink-4f393198d022815c66b10871c7e5efe45a0bf462.zip |
Ported messagequeue to new API
Diffstat (limited to 'common/messagequeue.cpp')
-rw-r--r-- | common/messagequeue.cpp | 24 |
1 files changed, 12 insertions, 12 deletions
diff --git a/common/messagequeue.cpp b/common/messagequeue.cpp index 0704dd3..a92d6be 100644 --- a/common/messagequeue.cpp +++ b/common/messagequeue.cpp | |||
@@ -11,12 +11,11 @@ MessageQueue::MessageQueue(const QString &storageRoot, const QString &name) | |||
11 | 11 | ||
12 | void MessageQueue::enqueue(void const *msg, size_t size) | 12 | void MessageQueue::enqueue(void const *msg, size_t size) |
13 | { | 13 | { |
14 | mStorage.startTransaction(Akonadi2::Storage::ReadWrite); | 14 | auto transaction = mStorage.createTransaction(Akonadi2::Storage::ReadWrite); |
15 | const qint64 revision = mStorage.maxRevision() + 1; | 15 | const qint64 revision = mStorage.maxRevision() + 1; |
16 | const QByteArray key = QString("%1").arg(revision).toUtf8(); | 16 | const QByteArray key = QString("%1").arg(revision).toUtf8(); |
17 | mStorage.write(key.data(), key.size(), msg, size); | 17 | transaction.write(key, QByteArray::fromRawData(static_cast<const char*>(msg), size)); |
18 | mStorage.setMaxRevision(revision); | 18 | Akonadi2::Storage::setMaxRevision(transaction, revision); |
19 | mStorage.commitTransaction(); | ||
20 | emit messageReady(); | 19 | emit messageReady(); |
21 | } | 20 | } |
22 | 21 | ||
@@ -24,19 +23,21 @@ void MessageQueue::dequeue(const std::function<void(void *ptr, int size, std::fu | |||
24 | const std::function<void(const Error &error)> &errorHandler) | 23 | const std::function<void(const Error &error)> &errorHandler) |
25 | { | 24 | { |
26 | bool readValue = false; | 25 | bool readValue = false; |
27 | mStorage.scan("", [this, resultHandler, errorHandler, &readValue](void *keyPtr, int keySize, void *valuePtr, int valueSize) -> bool { | 26 | mTransaction = std::move(mStorage.createTransaction(Akonadi2::Storage::ReadWrite)); |
28 | //We need a copy of the key here, otherwise we can't store it in the lambda (the pointers will become invalid) | 27 | mTransaction.scan("", [this, resultHandler, errorHandler, &readValue](const QByteArray &key, const QByteArray &value) -> bool { |
29 | const auto key = QByteArray(static_cast<char*>(keyPtr), keySize); | ||
30 | if (Akonadi2::Storage::isInternalKey(key)) { | 28 | if (Akonadi2::Storage::isInternalKey(key)) { |
31 | return true; | 29 | return true; |
32 | } | 30 | } |
33 | readValue = true; | 31 | readValue = true; |
34 | resultHandler(valuePtr, valueSize, [this, key, errorHandler](bool success) { | 32 | //We need a copy of the key here, otherwise we can't store it in the lambda (the pointers will become invalid) |
33 | const auto keyCopy = QByteArray(key.constData(), key.size()); | ||
34 | resultHandler(const_cast<void*>(static_cast<const void*>(value.data())), value.size(), [this, keyCopy, errorHandler](bool success) { | ||
35 | if (success) { | 35 | if (success) { |
36 | mStorage.remove(key.data(), key.size(), [errorHandler, key](const Akonadi2::Storage::Error &error) { | 36 | mTransaction.remove(keyCopy, [errorHandler, keyCopy](const Akonadi2::Storage::Error &error) { |
37 | ErrorMsg() << "Error while removing value" << error.message << key; | 37 | ErrorMsg() << "Error while removing value" << error.message << keyCopy; |
38 | //Don't call the errorhandler in here, we already called the result handler | 38 | //Don't call the errorhandler in here, we already called the result handler |
39 | }); | 39 | }); |
40 | mTransaction.commit(); | ||
40 | if (isEmpty()) { | 41 | if (isEmpty()) { |
41 | emit this->drained(); | 42 | emit this->drained(); |
42 | } | 43 | } |
@@ -59,8 +60,7 @@ void MessageQueue::dequeue(const std::function<void(void *ptr, int size, std::fu | |||
59 | bool MessageQueue::isEmpty() | 60 | bool MessageQueue::isEmpty() |
60 | { | 61 | { |
61 | int count = 0; | 62 | int count = 0; |
62 | mStorage.scan("", [&count](void *keyPtr, int keySize, void *valuePtr, int valueSize) -> bool { | 63 | mStorage.createTransaction(Akonadi2::Storage::ReadOnly).scan("", [&count](const QByteArray &key, const QByteArray &value) -> bool { |
63 | const auto key = QByteArray::fromRawData(static_cast<char*>(keyPtr), keySize); | ||
64 | if (!Akonadi2::Storage::isInternalKey(key)) { | 64 | if (!Akonadi2::Storage::isInternalKey(key)) { |
65 | count++; | 65 | count++; |
66 | return false; | 66 | return false; |