diff options
Diffstat (limited to 'examples/dummyresource/resourcefactory.cpp')
-rw-r--r-- | examples/dummyresource/resourcefactory.cpp | 200 |
1 files changed, 3 insertions, 197 deletions
diff --git a/examples/dummyresource/resourcefactory.cpp b/examples/dummyresource/resourcefactory.cpp index d5765e2..a4cd68d 100644 --- a/examples/dummyresource/resourcefactory.cpp +++ b/examples/dummyresource/resourcefactory.cpp | |||
@@ -103,152 +103,10 @@ QMap<QString, QString> populate() | |||
103 | 103 | ||
104 | static QMap<QString, QString> s_dataSource = populate(); | 104 | static QMap<QString, QString> s_dataSource = populate(); |
105 | 105 | ||
106 | //Drives the pipeline using the output from all command queues | ||
107 | class Processor : public QObject | ||
108 | { | ||
109 | Q_OBJECT | ||
110 | public: | ||
111 | Processor(Akonadi2::Pipeline *pipeline, QList<MessageQueue*> commandQueues) | ||
112 | : QObject(), | ||
113 | mPipeline(pipeline), | ||
114 | mCommandQueues(commandQueues), | ||
115 | mProcessingLock(false) | ||
116 | { | ||
117 | for (auto queue : mCommandQueues) { | ||
118 | const bool ret = connect(queue, &MessageQueue::messageReady, this, &Processor::process); | ||
119 | Q_UNUSED(ret); | ||
120 | } | ||
121 | } | ||
122 | |||
123 | signals: | ||
124 | void error(int errorCode, const QString &errorMessage); | ||
125 | |||
126 | private: | ||
127 | bool messagesToProcessAvailable() | ||
128 | { | ||
129 | for (auto queue : mCommandQueues) { | ||
130 | if (!queue->isEmpty()) { | ||
131 | return true; | ||
132 | } | ||
133 | } | ||
134 | return false; | ||
135 | } | ||
136 | |||
137 | private slots: | ||
138 | void process() | ||
139 | { | ||
140 | if (mProcessingLock) { | ||
141 | return; | ||
142 | } | ||
143 | mProcessingLock = true; | ||
144 | auto job = processPipeline().then<void>([this]() { | ||
145 | mProcessingLock = false; | ||
146 | if (messagesToProcessAvailable()) { | ||
147 | process(); | ||
148 | } | ||
149 | }).exec(); | ||
150 | } | ||
151 | |||
152 | Async::Job<void> processQueuedCommand(const Akonadi2::QueuedCommand *queuedCommand) | ||
153 | { | ||
154 | Log() << "Processing command: " << Akonadi2::Commands::name(queuedCommand->commandId()); | ||
155 | //Throw command into appropriate pipeline | ||
156 | switch (queuedCommand->commandId()) { | ||
157 | case Akonadi2::Commands::DeleteEntityCommand: | ||
158 | //mPipeline->removedEntity | ||
159 | return Async::null<void>(); | ||
160 | case Akonadi2::Commands::ModifyEntityCommand: | ||
161 | //mPipeline->modifiedEntity | ||
162 | return Async::null<void>(); | ||
163 | case Akonadi2::Commands::CreateEntityCommand: | ||
164 | return mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()); | ||
165 | default: | ||
166 | return Async::error<void>(-1, "Unhandled command"); | ||
167 | } | ||
168 | return Async::null<void>(); | ||
169 | } | ||
170 | |||
171 | //Process all messages of this queue | ||
172 | Async::Job<void> processQueue(MessageQueue *queue) | ||
173 | { | ||
174 | //TODO use something like: | ||
175 | //Async::foreach("pass iterator here").each("process value here").join(); | ||
176 | //Async::foreach("pass iterator here").parallel("process value here").join(); | ||
177 | return Async::dowhile( | ||
178 | [this, queue](Async::Future<bool> &future) { | ||
179 | if (queue->isEmpty()) { | ||
180 | future.setValue(false); | ||
181 | future.setFinished(); | ||
182 | return; | ||
183 | } | ||
184 | queue->dequeue( | ||
185 | [this, &future](void *ptr, int size, std::function<void(bool success)> messageQueueCallback) { | ||
186 | auto callback = [messageQueueCallback, &future](bool success) { | ||
187 | messageQueueCallback(success); | ||
188 | future.setValue(!success); | ||
189 | future.setFinished(); | ||
190 | }; | ||
191 | |||
192 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(ptr), size); | ||
193 | if (!Akonadi2::VerifyQueuedCommandBuffer(verifyer)) { | ||
194 | Warning() << "invalid buffer"; | ||
195 | callback(false); | ||
196 | return; | ||
197 | } | ||
198 | auto queuedCommand = Akonadi2::GetQueuedCommand(ptr); | ||
199 | Trace() << "Dequeued Command: " << Akonadi2::Commands::name(queuedCommand->commandId()); | ||
200 | //TODO JOBAPI: job lifetime management | ||
201 | //Right now we're just leaking jobs. In this case we'd like jobs that are heap allocated and delete | ||
202 | //themselves once done. In other cases we'd like jobs that only live as long as their handle though. | ||
203 | //FIXME this job is stack allocated and thus simply dies.... | ||
204 | processQueuedCommand(queuedCommand).then<void>( | ||
205 | [callback]() { | ||
206 | callback(true); | ||
207 | }, | ||
208 | [callback](int errorCode, QString errorMessage) { | ||
209 | Warning() << "Error while processing queue command: " << errorMessage; | ||
210 | callback(false); | ||
211 | } | ||
212 | ).exec(); | ||
213 | }, | ||
214 | [&future](const MessageQueue::Error &error) { | ||
215 | Warning() << "Error while getting message from messagequeue: " << error.message; | ||
216 | future.setValue(false); | ||
217 | future.setFinished(); | ||
218 | } | ||
219 | ); | ||
220 | } | ||
221 | ); | ||
222 | } | ||
223 | |||
224 | Async::Job<void> processPipeline() | ||
225 | { | ||
226 | //Go through all message queues | ||
227 | auto it = QSharedPointer<QListIterator<MessageQueue*> >::create(mCommandQueues); | ||
228 | return Async::dowhile( | ||
229 | [it]() { return it->hasNext(); }, | ||
230 | [it, this](Async::Future<void> &future) { | ||
231 | auto queue = it->next(); | ||
232 | processQueue(queue).then<void>([&future]() { | ||
233 | Trace() << "Queue processed"; | ||
234 | future.setFinished(); | ||
235 | }).exec(); | ||
236 | } | ||
237 | ); | ||
238 | } | ||
239 | |||
240 | private: | ||
241 | Akonadi2::Pipeline *mPipeline; | ||
242 | //Ordered by priority | ||
243 | QList<MessageQueue*> mCommandQueues; | ||
244 | bool mProcessingLock; | ||
245 | }; | ||
246 | 106 | ||
107 | //FIXME We need to pass the resource-instance name to generic resource, not the plugin name | ||
247 | DummyResource::DummyResource() | 108 | DummyResource::DummyResource() |
248 | : Akonadi2::Resource(), | 109 | : Akonadi2::GenericResource(PLUGIN_NAME) |
249 | mUserQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", "org.kde.dummy.userqueue"), | ||
250 | mSynchronizerQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", "org.kde.dummy.synchronizerqueue"), | ||
251 | mError(0) | ||
252 | { | 110 | { |
253 | } | 111 | } |
254 | 112 | ||
@@ -277,19 +135,7 @@ void DummyResource::configurePipeline(Akonadi2::Pipeline *pipeline) | |||
277 | 135 | ||
278 | //event is the entitytype and not the domain type | 136 | //event is the entitytype and not the domain type |
279 | pipeline->setPreprocessors("event", Akonadi2::Pipeline::NewPipeline, QVector<Akonadi2::Preprocessor*>() << eventIndexer << uidIndexer); | 137 | pipeline->setPreprocessors("event", Akonadi2::Pipeline::NewPipeline, QVector<Akonadi2::Preprocessor*>() << eventIndexer << uidIndexer); |
280 | mProcessor = new Processor(pipeline, QList<MessageQueue*>() << &mUserQueue << &mSynchronizerQueue); | 138 | GenericResource::configurePipeline(pipeline); |
281 | QObject::connect(mProcessor, &Processor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); | ||
282 | } | ||
283 | |||
284 | void DummyResource::onProcessorError(int errorCode, const QString &errorMessage) | ||
285 | { | ||
286 | Warning() << "Received error from Processor: " << errorCode << errorMessage; | ||
287 | mError = errorCode; | ||
288 | } | ||
289 | |||
290 | int DummyResource::error() const | ||
291 | { | ||
292 | return mError; | ||
293 | } | 139 | } |
294 | 140 | ||
295 | void findByRemoteId(QSharedPointer<Akonadi2::Storage> storage, const QString &rid, std::function<void(void *keyValue, int keySize, void *dataValue, int dataSize)> callback) | 141 | void findByRemoteId(QSharedPointer<Akonadi2::Storage> storage, const QString &rid, std::function<void(void *keyValue, int keySize, void *dataValue, int dataSize)> callback) |
@@ -316,15 +162,6 @@ void findByRemoteId(QSharedPointer<Akonadi2::Storage> storage, const QString &ri | |||
316 | }); | 162 | }); |
317 | } | 163 | } |
318 | 164 | ||
319 | void DummyResource::enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data) | ||
320 | { | ||
321 | m_fbb.Clear(); | ||
322 | auto commandData = Akonadi2::EntityBuffer::appendAsVector(m_fbb, data.constData(), data.size()); | ||
323 | auto buffer = Akonadi2::CreateQueuedCommand(m_fbb, commandId, commandData); | ||
324 | Akonadi2::FinishQueuedCommandBuffer(m_fbb, buffer); | ||
325 | mq.enqueue(m_fbb.GetBufferPointer(), m_fbb.GetSize()); | ||
326 | } | ||
327 | |||
328 | Async::Job<void> DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeline) | 165 | Async::Job<void> DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeline) |
329 | { | 166 | { |
330 | return Async::start<void>([this, pipeline](Async::Future<void> &f) { | 167 | return Async::start<void>([this, pipeline](Async::Future<void> &f) { |
@@ -377,37 +214,6 @@ Async::Job<void> DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeli | |||
377 | }); | 214 | }); |
378 | } | 215 | } |
379 | 216 | ||
380 | Async::Job<void> DummyResource::processAllMessages() | ||
381 | { | ||
382 | //We have to wait for all items to be processed to ensure the synced items are available when a query gets executed. | ||
383 | //TODO: report errors while processing sync? | ||
384 | //TODO JOBAPI: A helper that waits for n events and then continues? | ||
385 | return Async::start<void>([this](Async::Future<void> &f) { | ||
386 | if (mSynchronizerQueue.isEmpty()) { | ||
387 | f.setFinished(); | ||
388 | } else { | ||
389 | QObject::connect(&mSynchronizerQueue, &MessageQueue::drained, [&f]() { | ||
390 | f.setFinished(); | ||
391 | }); | ||
392 | } | ||
393 | }).then<void>([this](Async::Future<void> &f) { | ||
394 | if (mUserQueue.isEmpty()) { | ||
395 | f.setFinished(); | ||
396 | } else { | ||
397 | QObject::connect(&mUserQueue, &MessageQueue::drained, [&f]() { | ||
398 | f.setFinished(); | ||
399 | }); | ||
400 | } | ||
401 | }); | ||
402 | } | ||
403 | |||
404 | void DummyResource::processCommand(int commandId, const QByteArray &data, uint size, Akonadi2::Pipeline *pipeline) | ||
405 | { | ||
406 | //TODO instead of copying the command including the full entity first into the command queue, we could directly | ||
407 | //create a new revision, only pushing a handle into the commandqueue with the relevant changeset (for changereplay). | ||
408 | //The problem is that we then require write access from multiple threads (or even processes to avoid sending the full entity over the wire). | ||
409 | enqueueCommand(mUserQueue, commandId, data); | ||
410 | } | ||
411 | 217 | ||
412 | DummyResourceFactory::DummyResourceFactory(QObject *parent) | 218 | DummyResourceFactory::DummyResourceFactory(QObject *parent) |
413 | : Akonadi2::ResourceFactory(parent) | 219 | : Akonadi2::ResourceFactory(parent) |