diff options
-rw-r--r-- | dummyresource/resourcefactory.cpp | 80 |
1 files changed, 42 insertions, 38 deletions
diff --git a/dummyresource/resourcefactory.cpp b/dummyresource/resourcefactory.cpp index d1beb1d..d2b0c14 100644 --- a/dummyresource/resourcefactory.cpp +++ b/dummyresource/resourcefactory.cpp | |||
@@ -138,12 +138,46 @@ private slots: | |||
138 | return; | 138 | return; |
139 | } | 139 | } |
140 | mProcessingLock = true; | 140 | mProcessingLock = true; |
141 | auto job = processPipeline().then<void>([this](Async::Future<void> &future) { | 141 | auto job = processPipeline().then<void>([this]() { |
142 | mProcessingLock = false; | 142 | mProcessingLock = false; |
143 | future.setFinished(); | ||
144 | }).exec(); | 143 | }).exec(); |
145 | } | 144 | } |
146 | 145 | ||
146 | void processCommand(const Akonadi2::QueuedCommand *queuedCommand, std::function<void(bool success)> callback) | ||
147 | { | ||
148 | qDebug() << "Dequeued: " << queuedCommand->commandId(); | ||
149 | //Throw command into appropriate pipeline | ||
150 | switch (queuedCommand->commandId()) { | ||
151 | case Akonadi2::Commands::DeleteEntityCommand: | ||
152 | //mPipeline->removedEntity | ||
153 | break; | ||
154 | case Akonadi2::Commands::ModifyEntityCommand: | ||
155 | //mPipeline->modifiedEntity | ||
156 | break; | ||
157 | case Akonadi2::Commands::CreateEntityCommand: { | ||
158 | //TODO JOBAPI: job lifetime management | ||
159 | //Right now we're just leaking jobs. In this case we'd like jobs that are heap allocated and delete | ||
160 | //themselves once done. In other cases we'd like jobs that only live as long as their handle though. | ||
161 | //FIXME this job is stack allocated and thus simply dies.... | ||
162 | //FIXME get rid of waitForFinished, it's a workaround for the missing lifetime management | ||
163 | mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()).then<void>([callback]() { | ||
164 | callback(true); | ||
165 | }, | ||
166 | [this, callback](int errorCode, const QString &errorMessage) { | ||
167 | qWarning() << "Error while creating entity: " << errorCode << errorMessage; | ||
168 | emit error(errorCode, errorMessage); | ||
169 | callback(false); | ||
170 | }).exec().waitForFinished(); | ||
171 | } | ||
172 | break; | ||
173 | default: | ||
174 | //Unhandled command | ||
175 | qWarning() << "Unhandled command"; | ||
176 | callback(false); | ||
177 | break; | ||
178 | } | ||
179 | } | ||
180 | |||
147 | //Process all messages of this queue | 181 | //Process all messages of this queue |
148 | Async::Job<void> processQueue(MessageQueue *queue) | 182 | Async::Job<void> processQueue(MessageQueue *queue) |
149 | { | 183 | { |
@@ -158,39 +192,10 @@ private slots: | |||
158 | return; | 192 | return; |
159 | } | 193 | } |
160 | auto queuedCommand = Akonadi2::GetQueuedCommand(ptr); | 194 | auto queuedCommand = Akonadi2::GetQueuedCommand(ptr); |
161 | qDebug() << "Dequeued: " << queuedCommand->commandId(); | 195 | processCommand(queuedCommand, [whileCallback, messageQueueCallback](bool success) { |
162 | //Throw command into appropriate pipeline | 196 | messageQueueCallback(success); |
163 | switch (queuedCommand->commandId()) { | 197 | whileCallback(!success); |
164 | case Akonadi2::Commands::DeleteEntityCommand: | 198 | }); |
165 | //mPipeline->removedEntity | ||
166 | break; | ||
167 | case Akonadi2::Commands::ModifyEntityCommand: | ||
168 | //mPipeline->modifiedEntity | ||
169 | break; | ||
170 | case Akonadi2::Commands::CreateEntityCommand: { | ||
171 | //TODO JOBAPI: job lifetime management | ||
172 | //Right now we're just leaking jobs. In this case we'd like jobs that are heap allocated and delete | ||
173 | //themselves once done. In other cases we'd like jobs that only live as long as their handle though. | ||
174 | mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()).then<void>([messageQueueCallback, whileCallback](Async::Future<void> &future) { | ||
175 | messageQueueCallback(true); | ||
176 | whileCallback(false); | ||
177 | future.setFinished(); | ||
178 | }, | ||
179 | [this, messageQueueCallback, whileCallback](int errorCode, const QString &errorMessage) { | ||
180 | qWarning() << "Error while creating entity: " << errorCode << errorMessage; | ||
181 | emit error(errorCode, errorMessage); | ||
182 | messageQueueCallback(true); | ||
183 | whileCallback(false); | ||
184 | }).exec(); | ||
185 | } | ||
186 | break; | ||
187 | default: | ||
188 | //Unhandled command | ||
189 | qWarning() << "Unhandled command"; | ||
190 | messageQueueCallback(true); | ||
191 | whileCallback(false); | ||
192 | break; | ||
193 | } | ||
194 | }, | 199 | }, |
195 | [whileCallback](const MessageQueue::Error &error) { | 200 | [whileCallback](const MessageQueue::Error &error) { |
196 | whileCallback(true); | 201 | whileCallback(true); |
@@ -201,20 +206,19 @@ private slots: | |||
201 | }); | 206 | }); |
202 | }); | 207 | }); |
203 | return job; | 208 | return job; |
204 | |||
205 | } | 209 | } |
206 | 210 | ||
207 | Async::Job<void> processPipeline() | 211 | Async::Job<void> processPipeline() |
208 | { | 212 | { |
209 | auto job = Async::start<void>([this](Async::Future<void> &future) { | 213 | auto job = Async::start<void>([this](Async::Future<void> &future) { |
210 | //An async for loop. Go through all message queues | 214 | //An async for loop. Go through all message queues |
215 | //TODO: replace by Async::foreach | ||
211 | auto it = QSharedPointer<QListIterator<MessageQueue*> >::create(mCommandQueues); | 216 | auto it = QSharedPointer<QListIterator<MessageQueue*> >::create(mCommandQueues); |
212 | asyncWhile([&, it](std::function<void(bool)> forCallback) { | 217 | asyncWhile([&, it](std::function<void(bool)> forCallback) { |
213 | if (it->hasNext()) { | 218 | if (it->hasNext()) { |
214 | auto queue = it->next(); | 219 | auto queue = it->next(); |
215 | processQueue(queue).then<void>([forCallback](Async::Future<void> &future) { | 220 | processQueue(queue).then<void>([forCallback]() { |
216 | forCallback(false); | 221 | forCallback(false); |
217 | future.setFinished(); | ||
218 | }).exec(); | 222 | }).exec(); |
219 | } else { | 223 | } else { |
220 | forCallback(true); | 224 | forCallback(true); |