summaryrefslogtreecommitdiffstats
path: root/common/messagequeue.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-03-03 09:01:05 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-03-03 09:01:05 +0100
commit4d9746c828558c9f872e0aed52442863affb25d5 (patch)
tree507d7c2ba67f47d3cbbcf01a722236ff1b48426b /common/messagequeue.cpp
parent9cea920b7dd51867a0be0fed2f461b6be73c103e (diff)
downloadsink-4d9746c828558c9f872e0aed52442863affb25d5.tar.gz
sink-4d9746c828558c9f872e0aed52442863affb25d5.zip
Fromatted the whole codebase with clang-format.
clang-format -i */**{.cpp,.h}
Diffstat (limited to 'common/messagequeue.cpp')
-rw-r--r--common/messagequeue.cpp157
1 files changed, 76 insertions, 81 deletions
diff --git a/common/messagequeue.cpp b/common/messagequeue.cpp
index 73198a5..fd86635 100644
--- a/common/messagequeue.cpp
+++ b/common/messagequeue.cpp
@@ -3,41 +3,38 @@
3#include <QDebug> 3#include <QDebug>
4#include <log.h> 4#include <log.h>
5 5
6static KAsync::Job<void> waitForCompletion(QList<KAsync::Future<void> > &futures) 6static KAsync::Job<void> waitForCompletion(QList<KAsync::Future<void>> &futures)
7{ 7{
8 auto context = new QObject; 8 auto context = new QObject;
9 return KAsync::start<void>([futures, context](KAsync::Future<void> &future) { 9 return KAsync::start<void>([futures, context](KAsync::Future<void> &future) {
10 const auto total = futures.size(); 10 const auto total = futures.size();
11 auto count = QSharedPointer<int>::create(); 11 auto count = QSharedPointer<int>::create();
12 int i = 0; 12 int i = 0;
13 for (KAsync::Future<void> subFuture : futures) { 13 for (KAsync::Future<void> subFuture : futures) {
14 i++; 14 i++;
15 if (subFuture.isFinished()) { 15 if (subFuture.isFinished()) {
16 *count += 1; 16 *count += 1;
17 continue; 17 continue;
18 } 18 }
19 //FIXME bind lifetime all watcher to future (repectively the main job 19 // FIXME bind lifetime all watcher to future (repectively the main job
20 auto watcher = QSharedPointer<KAsync::FutureWatcher<void> >::create(); 20 auto watcher = QSharedPointer<KAsync::FutureWatcher<void>>::create();
21 QObject::connect(watcher.data(), &KAsync::FutureWatcher<void>::futureReady, 21 QObject::connect(watcher.data(), &KAsync::FutureWatcher<void>::futureReady, [count, total, &future]() {
22 [count, total, &future](){ 22 *count += 1;
23 *count += 1; 23 if (*count == total) {
24 if (*count == total) { 24 future.setFinished();
25 future.setFinished(); 25 }
26 } 26 });
27 }); 27 watcher->setFuture(subFuture);
28 watcher->setFuture(subFuture); 28 context->setProperty(QString("future%1").arg(i).toLatin1().data(), QVariant::fromValue(watcher));
29 context->setProperty(QString("future%1").arg(i).toLatin1().data(), QVariant::fromValue(watcher)); 29 }
30 } 30 if (*count == total) {
31 if (*count == total) { 31 future.setFinished();
32 future.setFinished(); 32 }
33 } 33 })
34 }).then<void>([context]() { 34 .then<void>([context]() { delete context; });
35 delete context;
36 });
37} 35}
38 36
39MessageQueue::MessageQueue(const QString &storageRoot, const QString &name) 37MessageQueue::MessageQueue(const QString &storageRoot, const QString &name) : mStorage(storageRoot, name, Sink::Storage::ReadWrite)
40 : mStorage(storageRoot, name, Sink::Storage::ReadWrite)
41{ 38{
42} 39}
43 40
@@ -47,7 +44,7 @@ MessageQueue::~MessageQueue()
47 44
48void MessageQueue::enqueue(void const *msg, size_t size) 45void MessageQueue::enqueue(void const *msg, size_t size)
49{ 46{
50 enqueue(QByteArray::fromRawData(static_cast<const char*>(msg), size)); 47 enqueue(QByteArray::fromRawData(static_cast<const char *>(msg), size));
51} 48}
52 49
53void MessageQueue::startTransaction() 50void MessageQueue::startTransaction()
@@ -96,19 +93,13 @@ void MessageQueue::processRemovals()
96 mPendingRemoval.clear(); 93 mPendingRemoval.clear();
97} 94}
98 95
99void MessageQueue::dequeue(const std::function<void(void *ptr, int size, std::function<void(bool success)>)> &resultHandler, 96void MessageQueue::dequeue(const std::function<void(void *ptr, int size, std::function<void(bool success)>)> &resultHandler, const std::function<void(const Error &error)> &errorHandler)
100 const std::function<void(const Error &error)> &errorHandler)
101{ 97{
102 dequeueBatch(1, [resultHandler](const QByteArray &value) { 98 dequeueBatch(1, [resultHandler](const QByteArray &value) {
103 return KAsync::start<void>([&value,resultHandler](KAsync::Future<void> &future) { 99 return KAsync::start<void>([&value, resultHandler](KAsync::Future<void> &future) {
104 resultHandler(const_cast<void*>(static_cast<const void*>(value.data())), value.size(), [&future](bool success){ 100 resultHandler(const_cast<void *>(static_cast<const void *>(value.data())), value.size(), [&future](bool success) { future.setFinished(); });
105 future.setFinished();
106 });
107 }); 101 });
108 }).then<void>([](){}, 102 }).then<void>([]() {}, [errorHandler](int error, const QString &errorString) { errorHandler(Error("messagequeue", error, errorString.toLatin1())); }).exec();
109 [errorHandler](int error, const QString &errorString) {
110 errorHandler(Error("messagequeue", error, errorString.toLatin1()));
111 }).exec();
112} 103}
113 104
114KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::function<KAsync::Job<void>(const QByteArray &)> &resultHandler) 105KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::function<KAsync::Job<void>(const QByteArray &)> &resultHandler)
@@ -116,41 +107,46 @@ KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::functi
116 auto resultCount = QSharedPointer<int>::create(0); 107 auto resultCount = QSharedPointer<int>::create(0);
117 return KAsync::start<void>([this, maxBatchSize, resultHandler, resultCount](KAsync::Future<void> &future) { 108 return KAsync::start<void>([this, maxBatchSize, resultHandler, resultCount](KAsync::Future<void> &future) {
118 int count = 0; 109 int count = 0;
119 QList<KAsync::Future<void> > waitCondition; 110 QList<KAsync::Future<void>> waitCondition;
120 mStorage.createTransaction(Sink::Storage::ReadOnly).openDatabase().scan("", [this, resultHandler, resultCount, &count, maxBatchSize, &waitCondition](const QByteArray &key, const QByteArray &value) -> bool { 111 mStorage.createTransaction(Sink::Storage::ReadOnly)
121 if (Sink::Storage::isInternalKey(key) || mPendingRemoval.contains(key)) { 112 .openDatabase()
122 return true; 113 .scan("",
123 } 114 [this, resultHandler, resultCount, &count, maxBatchSize, &waitCondition](const QByteArray &key, const QByteArray &value) -> bool {
124 *resultCount += 1; 115 if (Sink::Storage::isInternalKey(key) || mPendingRemoval.contains(key)) {
125 //We need a copy of the key here, otherwise we can't store it in the lambda (the pointers will become invalid) 116 return true;
126 mPendingRemoval << QByteArray(key.constData(), key.size()); 117 }
118 *resultCount += 1;
119 // We need a copy of the key here, otherwise we can't store it in the lambda (the pointers will become invalid)
120 mPendingRemoval << QByteArray(key.constData(), key.size());
127 121
128 waitCondition << resultHandler(value).exec(); 122 waitCondition << resultHandler(value).exec();
129 123
130 count++; 124 count++;
131 if (count < maxBatchSize) { 125 if (count < maxBatchSize) {
132 return true; 126 return true;
133 } 127 }
134 return false; 128 return false;
135 }, 129 },
136 [](const Sink::Storage::Error &error) { 130 [](const Sink::Storage::Error &error) {
137 ErrorMsg() << "Error while retrieving value" << error.message; 131 ErrorMsg() << "Error while retrieving value" << error.message;
138 // errorHandler(Error(error.store, error.code, error.message)); 132 // errorHandler(Error(error.store, error.code, error.message));
139 }); 133 });
140 134
141 // Trace() << "Waiting on " << waitCondition.size() << " results"; 135 // Trace() << "Waiting on " << waitCondition.size() << " results";
142 ::waitForCompletion(waitCondition).then<void>([this, resultCount, &future]() { 136 ::waitForCompletion(waitCondition)
143 processRemovals(); 137 .then<void>([this, resultCount, &future]() {
144 if (*resultCount == 0) { 138 processRemovals();
145 future.setError(static_cast<int>(ErrorCodes::NoMessageFound), "No message found"); 139 if (*resultCount == 0) {
146 future.setFinished(); 140 future.setError(static_cast<int>(ErrorCodes::NoMessageFound), "No message found");
147 } else { 141 future.setFinished();
148 if (isEmpty()) { 142 } else {
149 emit this->drained(); 143 if (isEmpty()) {
144 emit this->drained();
145 }
146 future.setFinished();
150 } 147 }
151 future.setFinished(); 148 })
152 } 149 .exec();
153 }).exec();
154 }); 150 });
155} 151}
156 152
@@ -160,16 +156,15 @@ bool MessageQueue::isEmpty()
160 auto t = mStorage.createTransaction(Sink::Storage::ReadOnly); 156 auto t = mStorage.createTransaction(Sink::Storage::ReadOnly);
161 auto db = t.openDatabase(); 157 auto db = t.openDatabase();
162 if (db) { 158 if (db) {
163 db.scan("", [&count, this](const QByteArray &key, const QByteArray &value) -> bool { 159 db.scan("",
164 if (!Sink::Storage::isInternalKey(key) && !mPendingRemoval.contains(key)) { 160 [&count, this](const QByteArray &key, const QByteArray &value) -> bool {
165 count++; 161 if (!Sink::Storage::isInternalKey(key) && !mPendingRemoval.contains(key)) {
166 return false; 162 count++;
167 } 163 return false;
168 return true; 164 }
169 }, 165 return true;
170 [](const Sink::Storage::Error &error) { 166 },
171 ErrorMsg() << "Error while checking if empty" << error.message; 167 [](const Sink::Storage::Error &error) { ErrorMsg() << "Error while checking if empty" << error.message; });
172 });
173 } 168 }
174 return count == 0; 169 return count == 0;
175} 170}