summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--dummyresource/resourcefactory.cpp95
1 files changed, 43 insertions, 52 deletions
diff --git a/dummyresource/resourcefactory.cpp b/dummyresource/resourcefactory.cpp
index 4e79f4c..10c8eaf 100644
--- a/dummyresource/resourcefactory.cpp
+++ b/dummyresource/resourcefactory.cpp
@@ -123,16 +123,6 @@ signals:
123 void error(int errorCode, const QString &errorMessage); 123 void error(int errorCode, const QString &errorMessage);
124 124
125private slots: 125private slots:
126 static void asyncWhile(const std::function<void(std::function<void(bool)>)> &body, const std::function<void()> &completionHandler) {
127 body([body, completionHandler](bool complete) {
128 if (complete) {
129 completionHandler();
130 } else {
131 asyncWhile(body, completionHandler);
132 }
133 });
134 }
135
136 void process() 126 void process()
137 { 127 {
138 if (mProcessingLock) { 128 if (mProcessingLock) {
@@ -165,55 +155,56 @@ private slots:
165 //Process all messages of this queue 155 //Process all messages of this queue
166 Async::Job<void> processQueue(MessageQueue *queue) 156 Async::Job<void> processQueue(MessageQueue *queue)
167 { 157 {
168 auto job = Async::start<void>([this, queue](Async::Future<void> &future) { 158 //TODO use something like:
169 asyncWhile([&, queue](std::function<void(bool)> whileCallback) { 159 //Async::foreach("pass iterator here").each("process value here").join();
170 // auto job = Async::start<Akonadi2::QueuedCommand*>(void *ptr, int size) 160 //Async::foreach("pass iterator here").parallel("process value here").join();
171 //TODO use something like: 161 return Async::dowhile(
172 //Async::foreach("pass iterator here").each("process value here").join(); 162 [this, queue](Async::Future<bool> &future) {
173 //Async::foreach("pass iterator here").parallel("process value here").join(); 163 queue->dequeue(
174 queue->dequeue([this, whileCallback](void *ptr, int size, std::function<void(bool success)> messageQueueCallback) { 164 [this, &future](void *ptr, int size, std::function<void(bool success)> messageQueueCallback) {
175 auto callback = [messageQueueCallback, whileCallback](bool success) { 165 auto callback = [messageQueueCallback, &future](bool success) {
176 messageQueueCallback(success); 166 messageQueueCallback(success);
177 whileCallback(!success); 167 future.setValue(!success);
178 }; 168 future.setFinished();
179 169 };
180 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(ptr), size); 170
181 if (!Akonadi2::VerifyQueuedCommandBuffer(verifyer)) { 171 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(ptr), size);
182 qWarning() << "invalid buffer"; 172 if (!Akonadi2::VerifyQueuedCommandBuffer(verifyer)) {
183 callback(false); 173 qWarning() << "invalid buffer";
184 return; 174 callback(false);
185 } 175 return;
186 auto queuedCommand = Akonadi2::GetQueuedCommand(ptr); 176 }
187 qDebug() << "Dequeued: " << queuedCommand->commandId(); 177 auto queuedCommand = Akonadi2::GetQueuedCommand(ptr);
188 //TODO JOBAPI: job lifetime management 178 qDebug() << "Dequeued: " << queuedCommand->commandId();
189 //Right now we're just leaking jobs. In this case we'd like jobs that are heap allocated and delete 179 //TODO JOBAPI: job lifetime management
190 //themselves once done. In other cases we'd like jobs that only live as long as their handle though. 180 //Right now we're just leaking jobs. In this case we'd like jobs that are heap allocated and delete
191 //FIXME this job is stack allocated and thus simply dies.... 181 //themselves once done. In other cases we'd like jobs that only live as long as their handle though.
192 //FIXME get rid of waitForFinished, it's a workaround for the missing lifetime management 182 //FIXME this job is stack allocated and thus simply dies....
193 processQueuedCommand(queuedCommand).then<void>([callback]() { 183 processQueuedCommand(queuedCommand).then<void>(
194 callback(true); 184 [callback]() {
185 callback(true);
186 },
187 [callback](int errorCode, QString errorMessage) {
188 Warning() << errorMessage;
189 callback(false);
190 }
191 ).exec();
195 }, 192 },
196 [callback](int errorCode, QString errorMessage) { 193 [&future](const MessageQueue::Error &error) {
197 Warning() << errorMessage; 194 Warning() << error.message;
198 callback(false); 195 future.setValue(false);
199 }).exec().waitForFinished(); 196 future.setFinished();
200 }, 197 }
201 [whileCallback](const MessageQueue::Error &error) { 198 );
202 whileCallback(true); 199 }
203 }); 200 );
204 },
205 [&future]() { //while complete
206 future.setFinished();
207 });
208 });
209 return job;
210 } 201 }
211 202
212 Async::Job<void> processPipeline() 203 Async::Job<void> processPipeline()
213 { 204 {
214 //Go through all message queues 205 //Go through all message queues
215 auto it = QSharedPointer<QListIterator<MessageQueue*> >::create(mCommandQueues); 206 auto it = QSharedPointer<QListIterator<MessageQueue*> >::create(mCommandQueues);
216 return Async::dowhile<void>( 207 return Async::dowhile(
217 [it]() { return it->hasNext(); }, 208 [it]() { return it->hasNext(); },
218 [it, this](Async::Future<void> &future) { 209 [it, this](Async::Future<void> &future) {
219 auto queue = it->next(); 210 auto queue = it->next();