diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-08-17 00:19:50 +0200 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-08-18 18:54:07 +0200 |
commit | e19bad87f43caf602793d8297562804b17383f7d (patch) | |
tree | 401837d8ea4fe07d2ca174e06989c72f02de633f /common/messagequeue.cpp | |
parent | ca3ce44c7a565df701d9e1b1556cbf2b6819f37c (diff) | |
download | sink-e19bad87f43caf602793d8297562804b17383f7d.tar.gz sink-e19bad87f43caf602793d8297562804b17383f7d.zip |
Transactions for messagequeue
Diffstat (limited to 'common/messagequeue.cpp')
-rw-r--r-- | common/messagequeue.cpp | 123 |
1 files changed, 68 insertions, 55 deletions
diff --git a/common/messagequeue.cpp b/common/messagequeue.cpp index 2a046d1..ab4b1cf 100644 --- a/common/messagequeue.cpp +++ b/common/messagequeue.cpp | |||
@@ -39,7 +39,10 @@ static KAsync::Job<void> waitForCompletion(QList<KAsync::Future<void> > &futures | |||
39 | MessageQueue::MessageQueue(const QString &storageRoot, const QString &name) | 39 | MessageQueue::MessageQueue(const QString &storageRoot, const QString &name) |
40 | : mStorage(storageRoot, name, Akonadi2::Storage::ReadWrite) | 40 | : mStorage(storageRoot, name, Akonadi2::Storage::ReadWrite) |
41 | { | 41 | { |
42 | } | ||
42 | 43 | ||
44 | MessageQueue::~MessageQueue() | ||
45 | { | ||
43 | } | 46 | } |
44 | 47 | ||
45 | void MessageQueue::enqueue(void const *msg, size_t size) | 48 | void MessageQueue::enqueue(void const *msg, size_t size) |
@@ -47,71 +50,88 @@ void MessageQueue::enqueue(void const *msg, size_t size) | |||
47 | enqueue(QByteArray::fromRawData(static_cast<const char*>(msg), size)); | 50 | enqueue(QByteArray::fromRawData(static_cast<const char*>(msg), size)); |
48 | } | 51 | } |
49 | 52 | ||
53 | void MessageQueue::startTransaction() | ||
54 | { | ||
55 | if (mWriteTransaction) { | ||
56 | return; | ||
57 | } | ||
58 | processRemovals(); | ||
59 | mWriteTransaction = std::move(mStorage.createTransaction(Akonadi2::Storage::ReadWrite)); | ||
60 | } | ||
61 | |||
62 | void MessageQueue::commit() | ||
63 | { | ||
64 | mWriteTransaction.commit(); | ||
65 | mWriteTransaction = Akonadi2::Storage::Transaction(); | ||
66 | processRemovals(); | ||
67 | emit messageReady(); | ||
68 | } | ||
69 | |||
50 | void MessageQueue::enqueue(const QByteArray &value) | 70 | void MessageQueue::enqueue(const QByteArray &value) |
51 | { | 71 | { |
52 | auto transaction = std::move(mStorage.createTransaction(Akonadi2::Storage::ReadWrite)); | 72 | bool implicitTransaction = false; |
53 | const qint64 revision = Akonadi2::Storage::maxRevision(transaction) + 1; | 73 | if (!mWriteTransaction) { |
74 | implicitTransaction = true; | ||
75 | startTransaction(); | ||
76 | } | ||
77 | const qint64 revision = Akonadi2::Storage::maxRevision(mWriteTransaction) + 1; | ||
54 | const QByteArray key = QString("%1").arg(revision).toUtf8(); | 78 | const QByteArray key = QString("%1").arg(revision).toUtf8(); |
55 | transaction.write(key, value); | 79 | mWriteTransaction.write(key, value); |
56 | Akonadi2::Storage::setMaxRevision(transaction, revision); | 80 | Akonadi2::Storage::setMaxRevision(mWriteTransaction, revision); |
81 | if (implicitTransaction) { | ||
82 | commit(); | ||
83 | } | ||
84 | } | ||
85 | |||
86 | void MessageQueue::processRemovals() | ||
87 | { | ||
88 | if (mWriteTransaction) { | ||
89 | return; | ||
90 | } | ||
91 | auto transaction = std::move(mStorage.createTransaction(Akonadi2::Storage::ReadWrite)); | ||
92 | for (const auto &key : mPendingRemoval) { | ||
93 | transaction.remove(key); | ||
94 | } | ||
57 | transaction.commit(); | 95 | transaction.commit(); |
58 | emit messageReady(); | 96 | mPendingRemoval.clear(); |
59 | } | 97 | } |
60 | 98 | ||
61 | void MessageQueue::dequeue(const std::function<void(void *ptr, int size, std::function<void(bool success)>)> &resultHandler, | 99 | void MessageQueue::dequeue(const std::function<void(void *ptr, int size, std::function<void(bool success)>)> &resultHandler, |
62 | const std::function<void(const Error &error)> &errorHandler) | 100 | const std::function<void(const Error &error)> &errorHandler) |
63 | { | 101 | { |
64 | bool readValue = false; | 102 | dequeueBatch(1, [resultHandler](const QByteArray &value) { |
65 | mStorage.createTransaction(Akonadi2::Storage::ReadOnly).scan("", [this, resultHandler, errorHandler, &readValue](const QByteArray &key, const QByteArray &value) -> bool { | 103 | return KAsync::start<void>([&value,resultHandler](KAsync::Future<void> &future) { |
66 | if (Akonadi2::Storage::isInternalKey(key)) { | 104 | resultHandler(const_cast<void*>(static_cast<const void*>(value.data())), value.size(), [&future](bool success){ |
67 | return true; | 105 | future.setFinished(); |
68 | } | 106 | }); |
69 | readValue = true; | ||
70 | //We need a copy of the key here, otherwise we can't store it in the lambda (the pointers will become invalid) | ||
71 | const auto keyCopy = QByteArray(key.constData(), key.size()); | ||
72 | resultHandler(const_cast<void*>(static_cast<const void*>(value.data())), value.size(), [this, keyCopy, errorHandler](bool success) { | ||
73 | if (success) { | ||
74 | mStorage.createTransaction(Akonadi2::Storage::ReadWrite).remove(keyCopy, [errorHandler, keyCopy](const Akonadi2::Storage::Error &error) { | ||
75 | ErrorMsg() << "Error while removing value" << error.message << keyCopy; | ||
76 | //Don't call the errorhandler in here, we already called the result handler | ||
77 | }); | ||
78 | if (isEmpty()) { | ||
79 | emit this->drained(); | ||
80 | } | ||
81 | } else { | ||
82 | //TODO re-enqueue? | ||
83 | } | ||
84 | }); | 107 | }); |
85 | return false; | 108 | }).then<void>([](){}, |
86 | }, | 109 | [errorHandler](int error, const QString &errorString) { |
87 | [errorHandler](const Akonadi2::Storage::Error &error) { | 110 | errorHandler(Error("messagequeue", error, errorString.toLatin1())); |
88 | ErrorMsg() << "Error while retrieving value" << error.message; | 111 | }).exec(); |
89 | errorHandler(Error(error.store, error.code, error.message)); | ||
90 | } | ||
91 | ); | ||
92 | if (!readValue) { | ||
93 | errorHandler(Error("messagequeue", -1, "No message found")); | ||
94 | } | ||
95 | } | 112 | } |
96 | 113 | ||
97 | KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::function<KAsync::Job<void>(const QByteArray &)> &resultHandler) | 114 | KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::function<KAsync::Job<void>(const QByteArray &)> &resultHandler) |
98 | { | 115 | { |
116 | Trace() << "Dequeue batch"; | ||
99 | auto resultCount = QSharedPointer<int>::create(0); | 117 | auto resultCount = QSharedPointer<int>::create(0); |
100 | auto keyList = QSharedPointer<QByteArrayList>::create(); | 118 | return KAsync::start<void>([this, maxBatchSize, resultHandler, resultCount](KAsync::Future<void> &future) { |
101 | return KAsync::start<void>([this, maxBatchSize, resultHandler, resultCount, keyList](KAsync::Future<void> &future) { | ||
102 | int count = 0; | 119 | int count = 0; |
103 | QList<KAsync::Future<void> > waitCondition; | 120 | QList<KAsync::Future<void> > waitCondition; |
104 | mStorage.createTransaction(Akonadi2::Storage::ReadOnly).scan("", [this, resultHandler, resultCount, keyList, &count, maxBatchSize, &waitCondition](const QByteArray &key, const QByteArray &value) -> bool { | 121 | mStorage.createTransaction(Akonadi2::Storage::ReadOnly).scan("", [this, resultHandler, resultCount, &count, maxBatchSize, &waitCondition](const QByteArray &key, const QByteArray &value) -> bool { |
105 | if (Akonadi2::Storage::isInternalKey(key)) { | 122 | if (Akonadi2::Storage::isInternalKey(key) || mPendingRemoval.contains(key)) { |
106 | return true; | 123 | return true; |
107 | } | 124 | } |
125 | *resultCount += 1; | ||
126 | Trace() << "Dequeue value"; | ||
108 | //We need a copy of the key here, otherwise we can't store it in the lambda (the pointers will become invalid) | 127 | //We need a copy of the key here, otherwise we can't store it in the lambda (the pointers will become invalid) |
109 | keyList->append(QByteArray(key.constData(), key.size())); | 128 | mPendingRemoval << QByteArray(key.constData(), key.size()); |
110 | 129 | ||
111 | waitCondition << resultHandler(value).exec(); | 130 | waitCondition << resultHandler(value).exec(); |
112 | 131 | ||
113 | count++; | 132 | count++; |
114 | if (count <= maxBatchSize) { | 133 | Trace() << count << maxBatchSize; |
134 | if (count < maxBatchSize) { | ||
115 | return true; | 135 | return true; |
116 | } | 136 | } |
117 | return false; | 137 | return false; |
@@ -121,17 +141,10 @@ KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::functi | |||
121 | // errorHandler(Error(error.store, error.code, error.message)); | 141 | // errorHandler(Error(error.store, error.code, error.message)); |
122 | }); | 142 | }); |
123 | 143 | ||
124 | ::waitForCompletion(waitCondition).then<void>([this, keyList, &future]() { | 144 | // Trace() << "Waiting on " << waitCondition.size() << " results"; |
125 | Trace() << "Dequeue complete, removing values " << *keyList; | 145 | ::waitForCompletion(waitCondition).then<void>([this, resultCount, &future]() { |
126 | auto transaction = std::move(mStorage.createTransaction(Akonadi2::Storage::ReadWrite)); | 146 | processRemovals(); |
127 | for (const auto &key : *keyList) { | 147 | if (*resultCount == 0) { |
128 | transaction.remove(key, [key](const Akonadi2::Storage::Error &error) { | ||
129 | ErrorMsg() << "Error while removing value" << error.message << key; | ||
130 | //Don't call the errorhandler in here, we already called the result handler | ||
131 | }); | ||
132 | } | ||
133 | transaction.commit(); | ||
134 | if (keyList->isEmpty()) { | ||
135 | future.setError(-1, "No message found"); | 148 | future.setError(-1, "No message found"); |
136 | future.setFinished(); | 149 | future.setFinished(); |
137 | } else { | 150 | } else { |
@@ -147,15 +160,15 @@ KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::functi | |||
147 | bool MessageQueue::isEmpty() | 160 | bool MessageQueue::isEmpty() |
148 | { | 161 | { |
149 | int count = 0; | 162 | int count = 0; |
150 | mStorage.createTransaction(Akonadi2::Storage::ReadOnly).scan("", [&count](const QByteArray &key, const QByteArray &value) -> bool { | 163 | mStorage.createTransaction(Akonadi2::Storage::ReadOnly).scan("", [&count, this](const QByteArray &key, const QByteArray &value) -> bool { |
151 | if (!Akonadi2::Storage::isInternalKey(key)) { | 164 | if (!Akonadi2::Storage::isInternalKey(key) && !mPendingRemoval.contains(key)) { |
152 | count++; | 165 | count++; |
153 | return false; | 166 | return false; |
154 | } | 167 | } |
155 | return true; | 168 | return true; |
156 | }, | 169 | }, |
157 | [](const Akonadi2::Storage::Error &error) { | 170 | [](const Akonadi2::Storage::Error &error) { |
158 | qDebug() << "Error while checking if empty" << error.message; | 171 | ErrorMsg() << "Error while checking if empty" << error.message; |
159 | }); | 172 | }); |
160 | return count == 0; | 173 | return count == 0; |
161 | } | 174 | } |