diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-04-01 11:00:14 +0200 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-04-01 11:00:14 +0200 |
commit | fb4c36e3daded17746a0bcd7c245b8cea9782c1a (patch) | |
tree | d1df7f5d9a93b8d4d2358e96da52f368f241bec9 | |
parent | f407e12321dada7470e561cefd576031cd9e4168 (diff) | |
download | sink-fb4c36e3daded17746a0bcd7c245b8cea9782c1a.tar.gz sink-fb4c36e3daded17746a0bcd7c245b8cea9782c1a.zip |
Use dowhile
-rw-r--r-- | dummyresource/resourcefactory.cpp | 95 |
1 files changed, 43 insertions, 52 deletions
diff --git a/dummyresource/resourcefactory.cpp b/dummyresource/resourcefactory.cpp index 4e79f4c..10c8eaf 100644 --- a/dummyresource/resourcefactory.cpp +++ b/dummyresource/resourcefactory.cpp | |||
@@ -123,16 +123,6 @@ signals: | |||
123 | void error(int errorCode, const QString &errorMessage); | 123 | void error(int errorCode, const QString &errorMessage); |
124 | 124 | ||
125 | private slots: | 125 | private slots: |
126 | static void asyncWhile(const std::function<void(std::function<void(bool)>)> &body, const std::function<void()> &completionHandler) { | ||
127 | body([body, completionHandler](bool complete) { | ||
128 | if (complete) { | ||
129 | completionHandler(); | ||
130 | } else { | ||
131 | asyncWhile(body, completionHandler); | ||
132 | } | ||
133 | }); | ||
134 | } | ||
135 | |||
136 | void process() | 126 | void process() |
137 | { | 127 | { |
138 | if (mProcessingLock) { | 128 | if (mProcessingLock) { |
@@ -165,55 +155,56 @@ private slots: | |||
165 | //Process all messages of this queue | 155 | //Process all messages of this queue |
166 | Async::Job<void> processQueue(MessageQueue *queue) | 156 | Async::Job<void> processQueue(MessageQueue *queue) |
167 | { | 157 | { |
168 | auto job = Async::start<void>([this, queue](Async::Future<void> &future) { | 158 | //TODO use something like: |
169 | asyncWhile([&, queue](std::function<void(bool)> whileCallback) { | 159 | //Async::foreach("pass iterator here").each("process value here").join(); |
170 | // auto job = Async::start<Akonadi2::QueuedCommand*>(void *ptr, int size) | 160 | //Async::foreach("pass iterator here").parallel("process value here").join(); |
171 | //TODO use something like: | 161 | return Async::dowhile( |
172 | //Async::foreach("pass iterator here").each("process value here").join(); | 162 | [this, queue](Async::Future<bool> &future) { |
173 | //Async::foreach("pass iterator here").parallel("process value here").join(); | 163 | queue->dequeue( |
174 | queue->dequeue([this, whileCallback](void *ptr, int size, std::function<void(bool success)> messageQueueCallback) { | 164 | [this, &future](void *ptr, int size, std::function<void(bool success)> messageQueueCallback) { |
175 | auto callback = [messageQueueCallback, whileCallback](bool success) { | 165 | auto callback = [messageQueueCallback, &future](bool success) { |
176 | messageQueueCallback(success); | 166 | messageQueueCallback(success); |
177 | whileCallback(!success); | 167 | future.setValue(!success); |
178 | }; | 168 | future.setFinished(); |
179 | 169 | }; | |
180 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(ptr), size); | 170 | |
181 | if (!Akonadi2::VerifyQueuedCommandBuffer(verifyer)) { | 171 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(ptr), size); |
182 | qWarning() << "invalid buffer"; | 172 | if (!Akonadi2::VerifyQueuedCommandBuffer(verifyer)) { |
183 | callback(false); | 173 | qWarning() << "invalid buffer"; |
184 | return; | 174 | callback(false); |
185 | } | 175 | return; |
186 | auto queuedCommand = Akonadi2::GetQueuedCommand(ptr); | 176 | } |
187 | qDebug() << "Dequeued: " << queuedCommand->commandId(); | 177 | auto queuedCommand = Akonadi2::GetQueuedCommand(ptr); |
188 | //TODO JOBAPI: job lifetime management | 178 | qDebug() << "Dequeued: " << queuedCommand->commandId(); |
189 | //Right now we're just leaking jobs. In this case we'd like jobs that are heap allocated and delete | 179 | //TODO JOBAPI: job lifetime management |
190 | //themselves once done. In other cases we'd like jobs that only live as long as their handle though. | 180 | //Right now we're just leaking jobs. In this case we'd like jobs that are heap allocated and delete |
191 | //FIXME this job is stack allocated and thus simply dies.... | 181 | //themselves once done. In other cases we'd like jobs that only live as long as their handle though. |
192 | //FIXME get rid of waitForFinished, it's a workaround for the missing lifetime management | 182 | //FIXME this job is stack allocated and thus simply dies.... |
193 | processQueuedCommand(queuedCommand).then<void>([callback]() { | 183 | processQueuedCommand(queuedCommand).then<void>( |
194 | callback(true); | 184 | [callback]() { |
185 | callback(true); | ||
186 | }, | ||
187 | [callback](int errorCode, QString errorMessage) { | ||
188 | Warning() << errorMessage; | ||
189 | callback(false); | ||
190 | } | ||
191 | ).exec(); | ||
195 | }, | 192 | }, |
196 | [callback](int errorCode, QString errorMessage) { | 193 | [&future](const MessageQueue::Error &error) { |
197 | Warning() << errorMessage; | 194 | Warning() << error.message; |
198 | callback(false); | 195 | future.setValue(false); |
199 | }).exec().waitForFinished(); | 196 | future.setFinished(); |
200 | }, | 197 | } |
201 | [whileCallback](const MessageQueue::Error &error) { | 198 | ); |
202 | whileCallback(true); | 199 | } |
203 | }); | 200 | ); |
204 | }, | ||
205 | [&future]() { //while complete | ||
206 | future.setFinished(); | ||
207 | }); | ||
208 | }); | ||
209 | return job; | ||
210 | } | 201 | } |
211 | 202 | ||
212 | Async::Job<void> processPipeline() | 203 | Async::Job<void> processPipeline() |
213 | { | 204 | { |
214 | //Go through all message queues | 205 | //Go through all message queues |
215 | auto it = QSharedPointer<QListIterator<MessageQueue*> >::create(mCommandQueues); | 206 | auto it = QSharedPointer<QListIterator<MessageQueue*> >::create(mCommandQueues); |
216 | return Async::dowhile<void>( | 207 | return Async::dowhile( |
217 | [it]() { return it->hasNext(); }, | 208 | [it]() { return it->hasNext(); }, |
218 | [it, this](Async::Future<void> &future) { | 209 | [it, this](Async::Future<void> &future) { |
219 | auto queue = it->next(); | 210 | auto queue = it->next(); |