diff options
-rw-r--r-- | common/messagequeue.cpp | 24 | ||||
-rw-r--r-- | common/messagequeue.h | 1 | ||||
-rw-r--r-- | common/storage.h | 2 | ||||
-rw-r--r-- | common/storage_common.cpp | 21 |
4 files changed, 32 insertions, 16 deletions
diff --git a/common/messagequeue.cpp b/common/messagequeue.cpp index 0704dd3..a92d6be 100644 --- a/common/messagequeue.cpp +++ b/common/messagequeue.cpp | |||
@@ -11,12 +11,11 @@ MessageQueue::MessageQueue(const QString &storageRoot, const QString &name) | |||
11 | 11 | ||
12 | void MessageQueue::enqueue(void const *msg, size_t size) | 12 | void MessageQueue::enqueue(void const *msg, size_t size) |
13 | { | 13 | { |
14 | mStorage.startTransaction(Akonadi2::Storage::ReadWrite); | 14 | auto transaction = mStorage.createTransaction(Akonadi2::Storage::ReadWrite); |
15 | const qint64 revision = mStorage.maxRevision() + 1; | 15 | const qint64 revision = mStorage.maxRevision() + 1; |
16 | const QByteArray key = QString("%1").arg(revision).toUtf8(); | 16 | const QByteArray key = QString("%1").arg(revision).toUtf8(); |
17 | mStorage.write(key.data(), key.size(), msg, size); | 17 | transaction.write(key, QByteArray::fromRawData(static_cast<const char*>(msg), size)); |
18 | mStorage.setMaxRevision(revision); | 18 | Akonadi2::Storage::setMaxRevision(transaction, revision); |
19 | mStorage.commitTransaction(); | ||
20 | emit messageReady(); | 19 | emit messageReady(); |
21 | } | 20 | } |
22 | 21 | ||
@@ -24,19 +23,21 @@ void MessageQueue::dequeue(const std::function<void(void *ptr, int size, std::fu | |||
24 | const std::function<void(const Error &error)> &errorHandler) | 23 | const std::function<void(const Error &error)> &errorHandler) |
25 | { | 24 | { |
26 | bool readValue = false; | 25 | bool readValue = false; |
27 | mStorage.scan("", [this, resultHandler, errorHandler, &readValue](void *keyPtr, int keySize, void *valuePtr, int valueSize) -> bool { | 26 | mTransaction = std::move(mStorage.createTransaction(Akonadi2::Storage::ReadWrite)); |
28 | //We need a copy of the key here, otherwise we can't store it in the lambda (the pointers will become invalid) | 27 | mTransaction.scan("", [this, resultHandler, errorHandler, &readValue](const QByteArray &key, const QByteArray &value) -> bool { |
29 | const auto key = QByteArray(static_cast<char*>(keyPtr), keySize); | ||
30 | if (Akonadi2::Storage::isInternalKey(key)) { | 28 | if (Akonadi2::Storage::isInternalKey(key)) { |
31 | return true; | 29 | return true; |
32 | } | 30 | } |
33 | readValue = true; | 31 | readValue = true; |
34 | resultHandler(valuePtr, valueSize, [this, key, errorHandler](bool success) { | 32 | //We need a copy of the key here, otherwise we can't store it in the lambda (the pointers will become invalid) |
33 | const auto keyCopy = QByteArray(key.constData(), key.size()); | ||
34 | resultHandler(const_cast<void*>(static_cast<const void*>(value.data())), value.size(), [this, keyCopy, errorHandler](bool success) { | ||
35 | if (success) { | 35 | if (success) { |
36 | mStorage.remove(key.data(), key.size(), [errorHandler, key](const Akonadi2::Storage::Error &error) { | 36 | mTransaction.remove(keyCopy, [errorHandler, keyCopy](const Akonadi2::Storage::Error &error) { |
37 | ErrorMsg() << "Error while removing value" << error.message << key; | 37 | ErrorMsg() << "Error while removing value" << error.message << keyCopy; |
38 | //Don't call the errorhandler in here, we already called the result handler | 38 | //Don't call the errorhandler in here, we already called the result handler |
39 | }); | 39 | }); |
40 | mTransaction.commit(); | ||
40 | if (isEmpty()) { | 41 | if (isEmpty()) { |
41 | emit this->drained(); | 42 | emit this->drained(); |
42 | } | 43 | } |
@@ -59,8 +60,7 @@ void MessageQueue::dequeue(const std::function<void(void *ptr, int size, std::fu | |||
59 | bool MessageQueue::isEmpty() | 60 | bool MessageQueue::isEmpty() |
60 | { | 61 | { |
61 | int count = 0; | 62 | int count = 0; |
62 | mStorage.scan("", [&count](void *keyPtr, int keySize, void *valuePtr, int valueSize) -> bool { | 63 | mStorage.createTransaction(Akonadi2::Storage::ReadOnly).scan("", [&count](const QByteArray &key, const QByteArray &value) -> bool { |
63 | const auto key = QByteArray::fromRawData(static_cast<char*>(keyPtr), keySize); | ||
64 | if (!Akonadi2::Storage::isInternalKey(key)) { | 64 | if (!Akonadi2::Storage::isInternalKey(key)) { |
65 | count++; | 65 | count++; |
66 | return false; | 66 | return false; |
diff --git a/common/messagequeue.h b/common/messagequeue.h index b7e3daa..ffc1ff2 100644 --- a/common/messagequeue.h +++ b/common/messagequeue.h | |||
@@ -39,4 +39,5 @@ signals: | |||
39 | private: | 39 | private: |
40 | Q_DISABLE_COPY(MessageQueue); | 40 | Q_DISABLE_COPY(MessageQueue); |
41 | Akonadi2::Storage mStorage; | 41 | Akonadi2::Storage mStorage; |
42 | Akonadi2::Storage::Transaction mTransaction; | ||
42 | }; | 43 | }; |
diff --git a/common/storage.h b/common/storage.h index fd35274..f9a207f 100644 --- a/common/storage.h +++ b/common/storage.h | |||
@@ -168,6 +168,8 @@ public: | |||
168 | 168 | ||
169 | qint64 maxRevision(); | 169 | qint64 maxRevision(); |
170 | void setMaxRevision(qint64 revision); | 170 | void setMaxRevision(qint64 revision); |
171 | static qint64 maxRevision(const Akonadi2::Storage::Transaction &); | ||
172 | static void setMaxRevision(Akonadi2::Storage::Transaction &, qint64 revision); | ||
171 | 173 | ||
172 | bool exists() const; | 174 | bool exists() const; |
173 | 175 | ||
diff --git a/common/storage_common.cpp b/common/storage_common.cpp index 65f6e57..8006a8e 100644 --- a/common/storage_common.cpp +++ b/common/storage_common.cpp | |||
@@ -69,18 +69,31 @@ bool Storage::write(const QByteArray &sKey, const QByteArray &sValue, const std: | |||
69 | 69 | ||
70 | void Storage::setMaxRevision(qint64 revision) | 70 | void Storage::setMaxRevision(qint64 revision) |
71 | { | 71 | { |
72 | write("__internal_maxRevision", QByteArray::number(revision)); | 72 | auto transaction = createTransaction(Akonadi2::Storage::ReadWrite); |
73 | setMaxRevision(transaction, revision); | ||
74 | } | ||
75 | |||
76 | void Storage::setMaxRevision(Akonadi2::Storage::Transaction &transaction, qint64 revision) | ||
77 | { | ||
78 | transaction.write("__internal_maxRevision", QByteArray::number(revision)); | ||
73 | } | 79 | } |
74 | 80 | ||
75 | qint64 Storage::maxRevision() | 81 | qint64 Storage::maxRevision() |
76 | { | 82 | { |
83 | auto transaction = createTransaction(Akonadi2::Storage::ReadOnly); | ||
84 | return maxRevision(transaction); | ||
85 | } | ||
86 | |||
87 | qint64 Storage::maxRevision(const Akonadi2::Storage::Transaction &transaction) | ||
88 | { | ||
77 | qint64 r = 0; | 89 | qint64 r = 0; |
78 | scan("__internal_maxRevision", [&](const QByteArray &revision) -> bool { | 90 | transaction.scan("__internal_maxRevision", [&](const QByteArray &, const QByteArray &revision) -> bool { |
79 | r = revision.toLongLong(); | 91 | r = revision.toLongLong(); |
80 | return false; | 92 | return false; |
81 | }, [this](const Error &error){ | 93 | }, [](const Error &error){ |
82 | if (error.code != ErrorCodes::NotFound) { | 94 | if (error.code != ErrorCodes::NotFound) { |
83 | defaultErrorHandler()(error); | 95 | //FIXME |
96 | // defaultErrorHandler()(error); | ||
84 | } | 97 | } |
85 | }); | 98 | }); |
86 | return r; | 99 | return r; |