From 2cc09c118f73b2f71b48eb1fd57eea7a49a022dd Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Tue, 31 Mar 2015 16:55:10 +0200 Subject: async simplifications --- dummyresource/resourcefactory.cpp | 80 ++++++++++++++++++++------------------- 1 file changed, 42 insertions(+), 38 deletions(-) (limited to 'dummyresource/resourcefactory.cpp') diff --git a/dummyresource/resourcefactory.cpp b/dummyresource/resourcefactory.cpp index d1beb1d..d2b0c14 100644 --- a/dummyresource/resourcefactory.cpp +++ b/dummyresource/resourcefactory.cpp @@ -138,12 +138,46 @@ private slots: return; } mProcessingLock = true; - auto job = processPipeline().then([this](Async::Future &future) { + auto job = processPipeline().then([this]() { mProcessingLock = false; - future.setFinished(); }).exec(); } + void processCommand(const Akonadi2::QueuedCommand *queuedCommand, std::function callback) + { + qDebug() << "Dequeued: " << queuedCommand->commandId(); + //Throw command into appropriate pipeline + switch (queuedCommand->commandId()) { + case Akonadi2::Commands::DeleteEntityCommand: + //mPipeline->removedEntity + break; + case Akonadi2::Commands::ModifyEntityCommand: + //mPipeline->modifiedEntity + break; + case Akonadi2::Commands::CreateEntityCommand: { + //TODO JOBAPI: job lifetime management + //Right now we're just leaking jobs. In this case we'd like jobs that are heap allocated and delete + //themselves once done. In other cases we'd like jobs that only live as long as their handle though. + //FIXME this job is stack allocated and thus simply dies.... + //FIXME get rid of waitForFinished, it's a workaround for the missing lifetime management + mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()).then([callback]() { + callback(true); + }, + [this, callback](int errorCode, const QString &errorMessage) { + qWarning() << "Error while creating entity: " << errorCode << errorMessage; + emit error(errorCode, errorMessage); + callback(false); + }).exec().waitForFinished(); + } + break; + default: + //Unhandled command + qWarning() << "Unhandled command"; + callback(false); + break; + } + } + //Process all messages of this queue Async::Job processQueue(MessageQueue *queue) { @@ -158,39 +192,10 @@ private slots: return; } auto queuedCommand = Akonadi2::GetQueuedCommand(ptr); - qDebug() << "Dequeued: " << queuedCommand->commandId(); - //Throw command into appropriate pipeline - switch (queuedCommand->commandId()) { - case Akonadi2::Commands::DeleteEntityCommand: - //mPipeline->removedEntity - break; - case Akonadi2::Commands::ModifyEntityCommand: - //mPipeline->modifiedEntity - break; - case Akonadi2::Commands::CreateEntityCommand: { - //TODO JOBAPI: job lifetime management - //Right now we're just leaking jobs. In this case we'd like jobs that are heap allocated and delete - //themselves once done. In other cases we'd like jobs that only live as long as their handle though. - mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()).then([messageQueueCallback, whileCallback](Async::Future &future) { - messageQueueCallback(true); - whileCallback(false); - future.setFinished(); - }, - [this, messageQueueCallback, whileCallback](int errorCode, const QString &errorMessage) { - qWarning() << "Error while creating entity: " << errorCode << errorMessage; - emit error(errorCode, errorMessage); - messageQueueCallback(true); - whileCallback(false); - }).exec(); - } - break; - default: - //Unhandled command - qWarning() << "Unhandled command"; - messageQueueCallback(true); - whileCallback(false); - break; - } + processCommand(queuedCommand, [whileCallback, messageQueueCallback](bool success) { + messageQueueCallback(success); + whileCallback(!success); + }); }, [whileCallback](const MessageQueue::Error &error) { whileCallback(true); @@ -201,20 +206,19 @@ private slots: }); }); return job; - } Async::Job processPipeline() { auto job = Async::start([this](Async::Future &future) { //An async for loop. Go through all message queues + //TODO: replace by Async::foreach auto it = QSharedPointer >::create(mCommandQueues); asyncWhile([&, it](std::function forCallback) { if (it->hasNext()) { auto queue = it->next(); - processQueue(queue).then([forCallback](Async::Future &future) { + processQueue(queue).then([forCallback]() { forCallback(false); - future.setFinished(); }).exec(); } else { forCallback(true); -- cgit v1.2.3