summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2015-04-01 01:40:54 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2015-04-01 01:40:54 +0200
commit08e35aad0b53364fb82e830f1b2e2ceb60658ea5 (patch)
tree7ce6fb2c1579ec3bd1af2805b1d4caecbec9c845
parent65131b9470455ae52a86d811882ca1d97c4a899c (diff)
downloadsink-08e35aad0b53364fb82e830f1b2e2ceb60658ea5.tar.gz
sink-08e35aad0b53364fb82e830f1b2e2ceb60658ea5.zip
Refactored resourcefactory further.
-rw-r--r--dummyresource/resourcefactory.cpp61
1 files changed, 31 insertions, 30 deletions
diff --git a/dummyresource/resourcefactory.cpp b/dummyresource/resourcefactory.cpp
index c0395b7..4e79f4c 100644
--- a/dummyresource/resourcefactory.cpp
+++ b/dummyresource/resourcefactory.cpp
@@ -29,6 +29,7 @@
29#include "commands.h" 29#include "commands.h"
30#include "clientapi.h" 30#include "clientapi.h"
31#include "index.h" 31#include "index.h"
32#include "log.h"
32#include <QUuid> 33#include <QUuid>
33#include <assert.h> 34#include <assert.h>
34 35
@@ -143,39 +144,22 @@ private slots:
143 }).exec(); 144 }).exec();
144 } 145 }
145 146
146 void processCommand(const Akonadi2::QueuedCommand *queuedCommand, std::function<void(bool success)> callback) 147 Async::Job<void> processQueuedCommand(const Akonadi2::QueuedCommand *queuedCommand)
147 { 148 {
148 qDebug() << "Dequeued: " << queuedCommand->commandId();
149 //Throw command into appropriate pipeline 149 //Throw command into appropriate pipeline
150 switch (queuedCommand->commandId()) { 150 switch (queuedCommand->commandId()) {
151 case Akonadi2::Commands::DeleteEntityCommand: 151 case Akonadi2::Commands::DeleteEntityCommand:
152 //mPipeline->removedEntity 152 //mPipeline->removedEntity
153 break; 153 return Async::null<void>();
154 case Akonadi2::Commands::ModifyEntityCommand: 154 case Akonadi2::Commands::ModifyEntityCommand:
155 //mPipeline->modifiedEntity 155 //mPipeline->modifiedEntity
156 break; 156 return Async::null<void>();
157 case Akonadi2::Commands::CreateEntityCommand: { 157 case Akonadi2::Commands::CreateEntityCommand:
158 //TODO JOBAPI: job lifetime management 158 return mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size());
159 //Right now we're just leaking jobs. In this case we'd like jobs that are heap allocated and delete
160 //themselves once done. In other cases we'd like jobs that only live as long as their handle though.
161 //FIXME this job is stack allocated and thus simply dies....
162 //FIXME get rid of waitForFinished, it's a workaround for the missing lifetime management
163 mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()).then<void>([callback]() {
164 callback(true);
165 },
166 [this, callback](int errorCode, const QString &errorMessage) {
167 qWarning() << "Error while creating entity: " << errorCode << errorMessage;
168 emit error(errorCode, errorMessage);
169 callback(false);
170 }).exec().waitForFinished();
171 }
172 break;
173 default: 159 default:
174 //Unhandled command 160 return Async::error<void>(-1, "Unhandled command");
175 qWarning() << "Unhandled command";
176 callback(false);
177 break;
178 } 161 }
162 return Async::null<void>();
179 } 163 }
180 164
181 //Process all messages of this queue 165 //Process all messages of this queue
@@ -183,19 +167,36 @@ private slots:
183 { 167 {
184 auto job = Async::start<void>([this, queue](Async::Future<void> &future) { 168 auto job = Async::start<void>([this, queue](Async::Future<void> &future) {
185 asyncWhile([&, queue](std::function<void(bool)> whileCallback) { 169 asyncWhile([&, queue](std::function<void(bool)> whileCallback) {
170 // auto job = Async::start<Akonadi2::QueuedCommand*>(void *ptr, int size)
171 //TODO use something like:
172 //Async::foreach("pass iterator here").each("process value here").join();
173 //Async::foreach("pass iterator here").parallel("process value here").join();
186 queue->dequeue([this, whileCallback](void *ptr, int size, std::function<void(bool success)> messageQueueCallback) { 174 queue->dequeue([this, whileCallback](void *ptr, int size, std::function<void(bool success)> messageQueueCallback) {
175 auto callback = [messageQueueCallback, whileCallback](bool success) {
176 messageQueueCallback(success);
177 whileCallback(!success);
178 };
179
187 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(ptr), size); 180 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(ptr), size);
188 if (!Akonadi2::VerifyQueuedCommandBuffer(verifyer)) { 181 if (!Akonadi2::VerifyQueuedCommandBuffer(verifyer)) {
189 qWarning() << "invalid buffer"; 182 qWarning() << "invalid buffer";
190 messageQueueCallback(false); 183 callback(false);
191 whileCallback(true);
192 return; 184 return;
193 } 185 }
194 auto queuedCommand = Akonadi2::GetQueuedCommand(ptr); 186 auto queuedCommand = Akonadi2::GetQueuedCommand(ptr);
195 processCommand(queuedCommand, [whileCallback, messageQueueCallback](bool success) { 187 qDebug() << "Dequeued: " << queuedCommand->commandId();
196 messageQueueCallback(success); 188 //TODO JOBAPI: job lifetime management
197 whileCallback(!success); 189 //Right now we're just leaking jobs. In this case we'd like jobs that are heap allocated and delete
198 }); 190 //themselves once done. In other cases we'd like jobs that only live as long as their handle though.
191 //FIXME this job is stack allocated and thus simply dies....
192 //FIXME get rid of waitForFinished, it's a workaround for the missing lifetime management
193 processQueuedCommand(queuedCommand).then<void>([callback]() {
194 callback(true);
195 },
196 [callback](int errorCode, QString errorMessage) {
197 Warning() << errorMessage;
198 callback(false);
199 }).exec().waitForFinished();
199 }, 200 },
200 [whileCallback](const MessageQueue::Error &error) { 201 [whileCallback](const MessageQueue::Error &error) {
201 whileCallback(true); 202 whileCallback(true);