diff options
Diffstat (limited to 'common/messagequeue.cpp')
-rw-r--r-- | common/messagequeue.cpp | 157 |
1 files changed, 76 insertions, 81 deletions
diff --git a/common/messagequeue.cpp b/common/messagequeue.cpp index 73198a5..fd86635 100644 --- a/common/messagequeue.cpp +++ b/common/messagequeue.cpp | |||
@@ -3,41 +3,38 @@ | |||
3 | #include <QDebug> | 3 | #include <QDebug> |
4 | #include <log.h> | 4 | #include <log.h> |
5 | 5 | ||
6 | static KAsync::Job<void> waitForCompletion(QList<KAsync::Future<void> > &futures) | 6 | static KAsync::Job<void> waitForCompletion(QList<KAsync::Future<void>> &futures) |
7 | { | 7 | { |
8 | auto context = new QObject; | 8 | auto context = new QObject; |
9 | return KAsync::start<void>([futures, context](KAsync::Future<void> &future) { | 9 | return KAsync::start<void>([futures, context](KAsync::Future<void> &future) { |
10 | const auto total = futures.size(); | 10 | const auto total = futures.size(); |
11 | auto count = QSharedPointer<int>::create(); | 11 | auto count = QSharedPointer<int>::create(); |
12 | int i = 0; | 12 | int i = 0; |
13 | for (KAsync::Future<void> subFuture : futures) { | 13 | for (KAsync::Future<void> subFuture : futures) { |
14 | i++; | 14 | i++; |
15 | if (subFuture.isFinished()) { | 15 | if (subFuture.isFinished()) { |
16 | *count += 1; | 16 | *count += 1; |
17 | continue; | 17 | continue; |
18 | } | 18 | } |
19 | //FIXME bind lifetime all watcher to future (repectively the main job | 19 | // FIXME bind lifetime all watcher to future (repectively the main job |
20 | auto watcher = QSharedPointer<KAsync::FutureWatcher<void> >::create(); | 20 | auto watcher = QSharedPointer<KAsync::FutureWatcher<void>>::create(); |
21 | QObject::connect(watcher.data(), &KAsync::FutureWatcher<void>::futureReady, | 21 | QObject::connect(watcher.data(), &KAsync::FutureWatcher<void>::futureReady, [count, total, &future]() { |
22 | [count, total, &future](){ | 22 | *count += 1; |
23 | *count += 1; | 23 | if (*count == total) { |
24 | if (*count == total) { | 24 | future.setFinished(); |
25 | future.setFinished(); | 25 | } |
26 | } | 26 | }); |
27 | }); | 27 | watcher->setFuture(subFuture); |
28 | watcher->setFuture(subFuture); | 28 | context->setProperty(QString("future%1").arg(i).toLatin1().data(), QVariant::fromValue(watcher)); |
29 | context->setProperty(QString("future%1").arg(i).toLatin1().data(), QVariant::fromValue(watcher)); | 29 | } |
30 | } | 30 | if (*count == total) { |
31 | if (*count == total) { | 31 | future.setFinished(); |
32 | future.setFinished(); | 32 | } |
33 | } | 33 | }) |
34 | }).then<void>([context]() { | 34 | .then<void>([context]() { delete context; }); |
35 | delete context; | ||
36 | }); | ||
37 | } | 35 | } |
38 | 36 | ||
39 | MessageQueue::MessageQueue(const QString &storageRoot, const QString &name) | 37 | MessageQueue::MessageQueue(const QString &storageRoot, const QString &name) : mStorage(storageRoot, name, Sink::Storage::ReadWrite) |
40 | : mStorage(storageRoot, name, Sink::Storage::ReadWrite) | ||
41 | { | 38 | { |
42 | } | 39 | } |
43 | 40 | ||
@@ -47,7 +44,7 @@ MessageQueue::~MessageQueue() | |||
47 | 44 | ||
48 | void MessageQueue::enqueue(void const *msg, size_t size) | 45 | void MessageQueue::enqueue(void const *msg, size_t size) |
49 | { | 46 | { |
50 | enqueue(QByteArray::fromRawData(static_cast<const char*>(msg), size)); | 47 | enqueue(QByteArray::fromRawData(static_cast<const char *>(msg), size)); |
51 | } | 48 | } |
52 | 49 | ||
53 | void MessageQueue::startTransaction() | 50 | void MessageQueue::startTransaction() |
@@ -96,19 +93,13 @@ void MessageQueue::processRemovals() | |||
96 | mPendingRemoval.clear(); | 93 | mPendingRemoval.clear(); |
97 | } | 94 | } |
98 | 95 | ||
99 | void MessageQueue::dequeue(const std::function<void(void *ptr, int size, std::function<void(bool success)>)> &resultHandler, | 96 | void MessageQueue::dequeue(const std::function<void(void *ptr, int size, std::function<void(bool success)>)> &resultHandler, const std::function<void(const Error &error)> &errorHandler) |
100 | const std::function<void(const Error &error)> &errorHandler) | ||
101 | { | 97 | { |
102 | dequeueBatch(1, [resultHandler](const QByteArray &value) { | 98 | dequeueBatch(1, [resultHandler](const QByteArray &value) { |
103 | return KAsync::start<void>([&value,resultHandler](KAsync::Future<void> &future) { | 99 | return KAsync::start<void>([&value, resultHandler](KAsync::Future<void> &future) { |
104 | resultHandler(const_cast<void*>(static_cast<const void*>(value.data())), value.size(), [&future](bool success){ | 100 | resultHandler(const_cast<void *>(static_cast<const void *>(value.data())), value.size(), [&future](bool success) { future.setFinished(); }); |
105 | future.setFinished(); | ||
106 | }); | ||
107 | }); | 101 | }); |
108 | }).then<void>([](){}, | 102 | }).then<void>([]() {}, [errorHandler](int error, const QString &errorString) { errorHandler(Error("messagequeue", error, errorString.toLatin1())); }).exec(); |
109 | [errorHandler](int error, const QString &errorString) { | ||
110 | errorHandler(Error("messagequeue", error, errorString.toLatin1())); | ||
111 | }).exec(); | ||
112 | } | 103 | } |
113 | 104 | ||
114 | KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::function<KAsync::Job<void>(const QByteArray &)> &resultHandler) | 105 | KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::function<KAsync::Job<void>(const QByteArray &)> &resultHandler) |
@@ -116,41 +107,46 @@ KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::functi | |||
116 | auto resultCount = QSharedPointer<int>::create(0); | 107 | auto resultCount = QSharedPointer<int>::create(0); |
117 | return KAsync::start<void>([this, maxBatchSize, resultHandler, resultCount](KAsync::Future<void> &future) { | 108 | return KAsync::start<void>([this, maxBatchSize, resultHandler, resultCount](KAsync::Future<void> &future) { |
118 | int count = 0; | 109 | int count = 0; |
119 | QList<KAsync::Future<void> > waitCondition; | 110 | QList<KAsync::Future<void>> waitCondition; |
120 | mStorage.createTransaction(Sink::Storage::ReadOnly).openDatabase().scan("", [this, resultHandler, resultCount, &count, maxBatchSize, &waitCondition](const QByteArray &key, const QByteArray &value) -> bool { | 111 | mStorage.createTransaction(Sink::Storage::ReadOnly) |
121 | if (Sink::Storage::isInternalKey(key) || mPendingRemoval.contains(key)) { | 112 | .openDatabase() |
122 | return true; | 113 | .scan("", |
123 | } | 114 | [this, resultHandler, resultCount, &count, maxBatchSize, &waitCondition](const QByteArray &key, const QByteArray &value) -> bool { |
124 | *resultCount += 1; | 115 | if (Sink::Storage::isInternalKey(key) || mPendingRemoval.contains(key)) { |
125 | //We need a copy of the key here, otherwise we can't store it in the lambda (the pointers will become invalid) | 116 | return true; |
126 | mPendingRemoval << QByteArray(key.constData(), key.size()); | 117 | } |
118 | *resultCount += 1; | ||
119 | // We need a copy of the key here, otherwise we can't store it in the lambda (the pointers will become invalid) | ||
120 | mPendingRemoval << QByteArray(key.constData(), key.size()); | ||
127 | 121 | ||
128 | waitCondition << resultHandler(value).exec(); | 122 | waitCondition << resultHandler(value).exec(); |
129 | 123 | ||
130 | count++; | 124 | count++; |
131 | if (count < maxBatchSize) { | 125 | if (count < maxBatchSize) { |
132 | return true; | 126 | return true; |
133 | } | 127 | } |
134 | return false; | 128 | return false; |
135 | }, | 129 | }, |
136 | [](const Sink::Storage::Error &error) { | 130 | [](const Sink::Storage::Error &error) { |
137 | ErrorMsg() << "Error while retrieving value" << error.message; | 131 | ErrorMsg() << "Error while retrieving value" << error.message; |
138 | // errorHandler(Error(error.store, error.code, error.message)); | 132 | // errorHandler(Error(error.store, error.code, error.message)); |
139 | }); | 133 | }); |
140 | 134 | ||
141 | // Trace() << "Waiting on " << waitCondition.size() << " results"; | 135 | // Trace() << "Waiting on " << waitCondition.size() << " results"; |
142 | ::waitForCompletion(waitCondition).then<void>([this, resultCount, &future]() { | 136 | ::waitForCompletion(waitCondition) |
143 | processRemovals(); | 137 | .then<void>([this, resultCount, &future]() { |
144 | if (*resultCount == 0) { | 138 | processRemovals(); |
145 | future.setError(static_cast<int>(ErrorCodes::NoMessageFound), "No message found"); | 139 | if (*resultCount == 0) { |
146 | future.setFinished(); | 140 | future.setError(static_cast<int>(ErrorCodes::NoMessageFound), "No message found"); |
147 | } else { | 141 | future.setFinished(); |
148 | if (isEmpty()) { | 142 | } else { |
149 | emit this->drained(); | 143 | if (isEmpty()) { |
144 | emit this->drained(); | ||
145 | } | ||
146 | future.setFinished(); | ||
150 | } | 147 | } |
151 | future.setFinished(); | 148 | }) |
152 | } | 149 | .exec(); |
153 | }).exec(); | ||
154 | }); | 150 | }); |
155 | } | 151 | } |
156 | 152 | ||
@@ -160,16 +156,15 @@ bool MessageQueue::isEmpty() | |||
160 | auto t = mStorage.createTransaction(Sink::Storage::ReadOnly); | 156 | auto t = mStorage.createTransaction(Sink::Storage::ReadOnly); |
161 | auto db = t.openDatabase(); | 157 | auto db = t.openDatabase(); |
162 | if (db) { | 158 | if (db) { |
163 | db.scan("", [&count, this](const QByteArray &key, const QByteArray &value) -> bool { | 159 | db.scan("", |
164 | if (!Sink::Storage::isInternalKey(key) && !mPendingRemoval.contains(key)) { | 160 | [&count, this](const QByteArray &key, const QByteArray &value) -> bool { |
165 | count++; | 161 | if (!Sink::Storage::isInternalKey(key) && !mPendingRemoval.contains(key)) { |
166 | return false; | 162 | count++; |
167 | } | 163 | return false; |
168 | return true; | 164 | } |
169 | }, | 165 | return true; |
170 | [](const Sink::Storage::Error &error) { | 166 | }, |
171 | ErrorMsg() << "Error while checking if empty" << error.message; | 167 | [](const Sink::Storage::Error &error) { ErrorMsg() << "Error while checking if empty" << error.message; }); |
172 | }); | ||
173 | } | 168 | } |
174 | return count == 0; | 169 | return count == 0; |
175 | } | 170 | } |