diff options
-rw-r--r-- | dummyresource/resourcefactory.cpp | 61 |
1 files changed, 31 insertions, 30 deletions
diff --git a/dummyresource/resourcefactory.cpp b/dummyresource/resourcefactory.cpp index c0395b7..4e79f4c 100644 --- a/dummyresource/resourcefactory.cpp +++ b/dummyresource/resourcefactory.cpp | |||
@@ -29,6 +29,7 @@ | |||
29 | #include "commands.h" | 29 | #include "commands.h" |
30 | #include "clientapi.h" | 30 | #include "clientapi.h" |
31 | #include "index.h" | 31 | #include "index.h" |
32 | #include "log.h" | ||
32 | #include <QUuid> | 33 | #include <QUuid> |
33 | #include <assert.h> | 34 | #include <assert.h> |
34 | 35 | ||
@@ -143,39 +144,22 @@ private slots: | |||
143 | }).exec(); | 144 | }).exec(); |
144 | } | 145 | } |
145 | 146 | ||
146 | void processCommand(const Akonadi2::QueuedCommand *queuedCommand, std::function<void(bool success)> callback) | 147 | Async::Job<void> processQueuedCommand(const Akonadi2::QueuedCommand *queuedCommand) |
147 | { | 148 | { |
148 | qDebug() << "Dequeued: " << queuedCommand->commandId(); | ||
149 | //Throw command into appropriate pipeline | 149 | //Throw command into appropriate pipeline |
150 | switch (queuedCommand->commandId()) { | 150 | switch (queuedCommand->commandId()) { |
151 | case Akonadi2::Commands::DeleteEntityCommand: | 151 | case Akonadi2::Commands::DeleteEntityCommand: |
152 | //mPipeline->removedEntity | 152 | //mPipeline->removedEntity |
153 | break; | 153 | return Async::null<void>(); |
154 | case Akonadi2::Commands::ModifyEntityCommand: | 154 | case Akonadi2::Commands::ModifyEntityCommand: |
155 | //mPipeline->modifiedEntity | 155 | //mPipeline->modifiedEntity |
156 | break; | 156 | return Async::null<void>(); |
157 | case Akonadi2::Commands::CreateEntityCommand: { | 157 | case Akonadi2::Commands::CreateEntityCommand: |
158 | //TODO JOBAPI: job lifetime management | 158 | return mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()); |
159 | //Right now we're just leaking jobs. In this case we'd like jobs that are heap allocated and delete | ||
160 | //themselves once done. In other cases we'd like jobs that only live as long as their handle though. | ||
161 | //FIXME this job is stack allocated and thus simply dies.... | ||
162 | //FIXME get rid of waitForFinished, it's a workaround for the missing lifetime management | ||
163 | mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()).then<void>([callback]() { | ||
164 | callback(true); | ||
165 | }, | ||
166 | [this, callback](int errorCode, const QString &errorMessage) { | ||
167 | qWarning() << "Error while creating entity: " << errorCode << errorMessage; | ||
168 | emit error(errorCode, errorMessage); | ||
169 | callback(false); | ||
170 | }).exec().waitForFinished(); | ||
171 | } | ||
172 | break; | ||
173 | default: | 159 | default: |
174 | //Unhandled command | 160 | return Async::error<void>(-1, "Unhandled command"); |
175 | qWarning() << "Unhandled command"; | ||
176 | callback(false); | ||
177 | break; | ||
178 | } | 161 | } |
162 | return Async::null<void>(); | ||
179 | } | 163 | } |
180 | 164 | ||
181 | //Process all messages of this queue | 165 | //Process all messages of this queue |
@@ -183,19 +167,36 @@ private slots: | |||
183 | { | 167 | { |
184 | auto job = Async::start<void>([this, queue](Async::Future<void> &future) { | 168 | auto job = Async::start<void>([this, queue](Async::Future<void> &future) { |
185 | asyncWhile([&, queue](std::function<void(bool)> whileCallback) { | 169 | asyncWhile([&, queue](std::function<void(bool)> whileCallback) { |
170 | // auto job = Async::start<Akonadi2::QueuedCommand*>(void *ptr, int size) | ||
171 | //TODO use something like: | ||
172 | //Async::foreach("pass iterator here").each("process value here").join(); | ||
173 | //Async::foreach("pass iterator here").parallel("process value here").join(); | ||
186 | queue->dequeue([this, whileCallback](void *ptr, int size, std::function<void(bool success)> messageQueueCallback) { | 174 | queue->dequeue([this, whileCallback](void *ptr, int size, std::function<void(bool success)> messageQueueCallback) { |
175 | auto callback = [messageQueueCallback, whileCallback](bool success) { | ||
176 | messageQueueCallback(success); | ||
177 | whileCallback(!success); | ||
178 | }; | ||
179 | |||
187 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(ptr), size); | 180 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(ptr), size); |
188 | if (!Akonadi2::VerifyQueuedCommandBuffer(verifyer)) { | 181 | if (!Akonadi2::VerifyQueuedCommandBuffer(verifyer)) { |
189 | qWarning() << "invalid buffer"; | 182 | qWarning() << "invalid buffer"; |
190 | messageQueueCallback(false); | 183 | callback(false); |
191 | whileCallback(true); | ||
192 | return; | 184 | return; |
193 | } | 185 | } |
194 | auto queuedCommand = Akonadi2::GetQueuedCommand(ptr); | 186 | auto queuedCommand = Akonadi2::GetQueuedCommand(ptr); |
195 | processCommand(queuedCommand, [whileCallback, messageQueueCallback](bool success) { | 187 | qDebug() << "Dequeued: " << queuedCommand->commandId(); |
196 | messageQueueCallback(success); | 188 | //TODO JOBAPI: job lifetime management |
197 | whileCallback(!success); | 189 | //Right now we're just leaking jobs. In this case we'd like jobs that are heap allocated and delete |
198 | }); | 190 | //themselves once done. In other cases we'd like jobs that only live as long as their handle though. |
191 | //FIXME this job is stack allocated and thus simply dies.... | ||
192 | //FIXME get rid of waitForFinished, it's a workaround for the missing lifetime management | ||
193 | processQueuedCommand(queuedCommand).then<void>([callback]() { | ||
194 | callback(true); | ||
195 | }, | ||
196 | [callback](int errorCode, QString errorMessage) { | ||
197 | Warning() << errorMessage; | ||
198 | callback(false); | ||
199 | }).exec().waitForFinished(); | ||
199 | }, | 200 | }, |
200 | [whileCallback](const MessageQueue::Error &error) { | 201 | [whileCallback](const MessageQueue::Error &error) { |
201 | whileCallback(true); | 202 | whileCallback(true); |