summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--dummyresource/resourcefactory.cpp80
1 files changed, 42 insertions, 38 deletions
diff --git a/dummyresource/resourcefactory.cpp b/dummyresource/resourcefactory.cpp
index d1beb1d..d2b0c14 100644
--- a/dummyresource/resourcefactory.cpp
+++ b/dummyresource/resourcefactory.cpp
@@ -138,12 +138,46 @@ private slots:
138 return; 138 return;
139 } 139 }
140 mProcessingLock = true; 140 mProcessingLock = true;
141 auto job = processPipeline().then<void>([this](Async::Future<void> &future) { 141 auto job = processPipeline().then<void>([this]() {
142 mProcessingLock = false; 142 mProcessingLock = false;
143 future.setFinished();
144 }).exec(); 143 }).exec();
145 } 144 }
146 145
146 void processCommand(const Akonadi2::QueuedCommand *queuedCommand, std::function<void(bool success)> callback)
147 {
148 qDebug() << "Dequeued: " << queuedCommand->commandId();
149 //Throw command into appropriate pipeline
150 switch (queuedCommand->commandId()) {
151 case Akonadi2::Commands::DeleteEntityCommand:
152 //mPipeline->removedEntity
153 break;
154 case Akonadi2::Commands::ModifyEntityCommand:
155 //mPipeline->modifiedEntity
156 break;
157 case Akonadi2::Commands::CreateEntityCommand: {
158 //TODO JOBAPI: job lifetime management
159 //Right now we're just leaking jobs. In this case we'd like jobs that are heap allocated and delete
160 //themselves once done. In other cases we'd like jobs that only live as long as their handle though.
161 //FIXME this job is stack allocated and thus simply dies....
162 //FIXME get rid of waitForFinished, it's a workaround for the missing lifetime management
163 mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()).then<void>([callback]() {
164 callback(true);
165 },
166 [this, callback](int errorCode, const QString &errorMessage) {
167 qWarning() << "Error while creating entity: " << errorCode << errorMessage;
168 emit error(errorCode, errorMessage);
169 callback(false);
170 }).exec().waitForFinished();
171 }
172 break;
173 default:
174 //Unhandled command
175 qWarning() << "Unhandled command";
176 callback(false);
177 break;
178 }
179 }
180
147 //Process all messages of this queue 181 //Process all messages of this queue
148 Async::Job<void> processQueue(MessageQueue *queue) 182 Async::Job<void> processQueue(MessageQueue *queue)
149 { 183 {
@@ -158,39 +192,10 @@ private slots:
158 return; 192 return;
159 } 193 }
160 auto queuedCommand = Akonadi2::GetQueuedCommand(ptr); 194 auto queuedCommand = Akonadi2::GetQueuedCommand(ptr);
161 qDebug() << "Dequeued: " << queuedCommand->commandId(); 195 processCommand(queuedCommand, [whileCallback, messageQueueCallback](bool success) {
162 //Throw command into appropriate pipeline 196 messageQueueCallback(success);
163 switch (queuedCommand->commandId()) { 197 whileCallback(!success);
164 case Akonadi2::Commands::DeleteEntityCommand: 198 });
165 //mPipeline->removedEntity
166 break;
167 case Akonadi2::Commands::ModifyEntityCommand:
168 //mPipeline->modifiedEntity
169 break;
170 case Akonadi2::Commands::CreateEntityCommand: {
171 //TODO JOBAPI: job lifetime management
172 //Right now we're just leaking jobs. In this case we'd like jobs that are heap allocated and delete
173 //themselves once done. In other cases we'd like jobs that only live as long as their handle though.
174 mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()).then<void>([messageQueueCallback, whileCallback](Async::Future<void> &future) {
175 messageQueueCallback(true);
176 whileCallback(false);
177 future.setFinished();
178 },
179 [this, messageQueueCallback, whileCallback](int errorCode, const QString &errorMessage) {
180 qWarning() << "Error while creating entity: " << errorCode << errorMessage;
181 emit error(errorCode, errorMessage);
182 messageQueueCallback(true);
183 whileCallback(false);
184 }).exec();
185 }
186 break;
187 default:
188 //Unhandled command
189 qWarning() << "Unhandled command";
190 messageQueueCallback(true);
191 whileCallback(false);
192 break;
193 }
194 }, 199 },
195 [whileCallback](const MessageQueue::Error &error) { 200 [whileCallback](const MessageQueue::Error &error) {
196 whileCallback(true); 201 whileCallback(true);
@@ -201,20 +206,19 @@ private slots:
201 }); 206 });
202 }); 207 });
203 return job; 208 return job;
204
205 } 209 }
206 210
207 Async::Job<void> processPipeline() 211 Async::Job<void> processPipeline()
208 { 212 {
209 auto job = Async::start<void>([this](Async::Future<void> &future) { 213 auto job = Async::start<void>([this](Async::Future<void> &future) {
210 //An async for loop. Go through all message queues 214 //An async for loop. Go through all message queues
215 //TODO: replace by Async::foreach
211 auto it = QSharedPointer<QListIterator<MessageQueue*> >::create(mCommandQueues); 216 auto it = QSharedPointer<QListIterator<MessageQueue*> >::create(mCommandQueues);
212 asyncWhile([&, it](std::function<void(bool)> forCallback) { 217 asyncWhile([&, it](std::function<void(bool)> forCallback) {
213 if (it->hasNext()) { 218 if (it->hasNext()) {
214 auto queue = it->next(); 219 auto queue = it->next();
215 processQueue(queue).then<void>([forCallback](Async::Future<void> &future) { 220 processQueue(queue).then<void>([forCallback]() {
216 forCallback(false); 221 forCallback(false);
217 future.setFinished();
218 }).exec(); 222 }).exec();
219 } else { 223 } else {
220 forCallback(true); 224 forCallback(true);