summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2015-01-16 16:49:15 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2015-01-16 16:49:15 +0100
commitc831909be94abfc24c64f482ce22ce508aed2147 (patch)
tree7be7cc86d9a2e39a0fba6871aa46122f431a4cb8
parent20bf85c5cfa51ee5da0b890cfd72346373ff7c8c (diff)
downloadsink-c831909be94abfc24c64f482ce22ce508aed2147.tar.gz
sink-c831909be94abfc24c64f482ce22ce508aed2147.zip
Asynchronously go through messagequeues
-rw-r--r--dummyresource/resourcefactory.cpp35
1 files changed, 29 insertions, 6 deletions
diff --git a/dummyresource/resourcefactory.cpp b/dummyresource/resourcefactory.cpp
index b43e4a3..01db782 100644
--- a/dummyresource/resourcefactory.cpp
+++ b/dummyresource/resourcefactory.cpp
@@ -138,12 +138,11 @@ private slots:
138 }).exec(); 138 }).exec();
139 } 139 }
140 140
141 Async::Job<void> processPipeline() 141 //Process all messages of this queue
142 Async::Job<void> processQueue(MessageQueue *queue)
142 { 143 {
143 auto job = Async::start<void>([this](Async::Future<void> &future) { 144 auto job = Async::start<void>([this, queue](Async::Future<void> &future) {
144 //TODO process all queues in async for 145 asyncWhile([&, queue](std::function<void(bool)> whileCallback) {
145 auto queue = mCommandQueues.first();
146 asyncWhile([&](std::function<void(bool)> whileCallback) {
147 queue->dequeue([this, whileCallback](void *ptr, int size, std::function<void(bool success)> messageQueueCallback) { 146 queue->dequeue([this, whileCallback](void *ptr, int size, std::function<void(bool success)> messageQueueCallback) {
148 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(ptr), size); 147 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(ptr), size);
149 if (!Akonadi2::VerifyQueuedCommandBuffer(verifyer)) { 148 if (!Akonadi2::VerifyQueuedCommandBuffer(verifyer)) {
@@ -186,10 +185,34 @@ private slots:
186 }, 185 },
187 [&future]() { //while complete 186 [&future]() { //while complete
188 future.setFinished(); 187 future.setFinished();
189 //Call async-for completion handler
190 }); 188 });
191 }); 189 });
192 return job; 190 return job;
191
192 }
193
194 Async::Job<void> processPipeline()
195 {
196 auto job = Async::start<void>([this](Async::Future<void> &future) {
197 //An async for loop. Go through all message queues
198 auto it = QSharedPointer<QListIterator<MessageQueue*> >::create(mCommandQueues);
199 asyncWhile([&, it](std::function<void(bool)> forCallback) {
200 if (it->hasNext()) {
201 auto queue = it->next();
202 processQueue(queue).then<void>([forCallback](Async::Future<void> &future) {
203 forCallback(false);
204 future.setFinished();
205 }).exec();
206 } else {
207 forCallback(true);
208 }
209 },
210 [&future]() { //while complete
211 future.setFinished();
212 });
213
214 });
215 return job;
193 } 216 }
194 217
195private: 218private: