summaryrefslogtreecommitdiffstats
path: root/examples/dummyresource/resourcefactory.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2015-04-20 22:22:19 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2015-04-20 22:22:19 +0200
commit47f105febcd17d6db1f998a99c6c6c423851573a (patch)
tree1f02b0e09444c55bd509984233b918b5a1937357 /examples/dummyresource/resourcefactory.cpp
parentb4db894f76de9ac252081972143efcd3fcd66533 (diff)
downloadsink-47f105febcd17d6db1f998a99c6c6c423851573a.tar.gz
sink-47f105febcd17d6db1f998a99c6c6c423851573a.zip
Moved generic part of resource implementation to GenericResource
Diffstat (limited to 'examples/dummyresource/resourcefactory.cpp')
-rw-r--r--examples/dummyresource/resourcefactory.cpp200
1 files changed, 3 insertions, 197 deletions
diff --git a/examples/dummyresource/resourcefactory.cpp b/examples/dummyresource/resourcefactory.cpp
index d5765e2..a4cd68d 100644
--- a/examples/dummyresource/resourcefactory.cpp
+++ b/examples/dummyresource/resourcefactory.cpp
@@ -103,152 +103,10 @@ QMap<QString, QString> populate()
103 103
104static QMap<QString, QString> s_dataSource = populate(); 104static QMap<QString, QString> s_dataSource = populate();
105 105
106//Drives the pipeline using the output from all command queues
107class Processor : public QObject
108{
109 Q_OBJECT
110public:
111 Processor(Akonadi2::Pipeline *pipeline, QList<MessageQueue*> commandQueues)
112 : QObject(),
113 mPipeline(pipeline),
114 mCommandQueues(commandQueues),
115 mProcessingLock(false)
116 {
117 for (auto queue : mCommandQueues) {
118 const bool ret = connect(queue, &MessageQueue::messageReady, this, &Processor::process);
119 Q_UNUSED(ret);
120 }
121 }
122
123signals:
124 void error(int errorCode, const QString &errorMessage);
125
126private:
127 bool messagesToProcessAvailable()
128 {
129 for (auto queue : mCommandQueues) {
130 if (!queue->isEmpty()) {
131 return true;
132 }
133 }
134 return false;
135 }
136
137private slots:
138 void process()
139 {
140 if (mProcessingLock) {
141 return;
142 }
143 mProcessingLock = true;
144 auto job = processPipeline().then<void>([this]() {
145 mProcessingLock = false;
146 if (messagesToProcessAvailable()) {
147 process();
148 }
149 }).exec();
150 }
151
152 Async::Job<void> processQueuedCommand(const Akonadi2::QueuedCommand *queuedCommand)
153 {
154 Log() << "Processing command: " << Akonadi2::Commands::name(queuedCommand->commandId());
155 //Throw command into appropriate pipeline
156 switch (queuedCommand->commandId()) {
157 case Akonadi2::Commands::DeleteEntityCommand:
158 //mPipeline->removedEntity
159 return Async::null<void>();
160 case Akonadi2::Commands::ModifyEntityCommand:
161 //mPipeline->modifiedEntity
162 return Async::null<void>();
163 case Akonadi2::Commands::CreateEntityCommand:
164 return mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size());
165 default:
166 return Async::error<void>(-1, "Unhandled command");
167 }
168 return Async::null<void>();
169 }
170
171 //Process all messages of this queue
172 Async::Job<void> processQueue(MessageQueue *queue)
173 {
174 //TODO use something like:
175 //Async::foreach("pass iterator here").each("process value here").join();
176 //Async::foreach("pass iterator here").parallel("process value here").join();
177 return Async::dowhile(
178 [this, queue](Async::Future<bool> &future) {
179 if (queue->isEmpty()) {
180 future.setValue(false);
181 future.setFinished();
182 return;
183 }
184 queue->dequeue(
185 [this, &future](void *ptr, int size, std::function<void(bool success)> messageQueueCallback) {
186 auto callback = [messageQueueCallback, &future](bool success) {
187 messageQueueCallback(success);
188 future.setValue(!success);
189 future.setFinished();
190 };
191
192 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(ptr), size);
193 if (!Akonadi2::VerifyQueuedCommandBuffer(verifyer)) {
194 Warning() << "invalid buffer";
195 callback(false);
196 return;
197 }
198 auto queuedCommand = Akonadi2::GetQueuedCommand(ptr);
199 Trace() << "Dequeued Command: " << Akonadi2::Commands::name(queuedCommand->commandId());
200 //TODO JOBAPI: job lifetime management
201 //Right now we're just leaking jobs. In this case we'd like jobs that are heap allocated and delete
202 //themselves once done. In other cases we'd like jobs that only live as long as their handle though.
203 //FIXME this job is stack allocated and thus simply dies....
204 processQueuedCommand(queuedCommand).then<void>(
205 [callback]() {
206 callback(true);
207 },
208 [callback](int errorCode, QString errorMessage) {
209 Warning() << "Error while processing queue command: " << errorMessage;
210 callback(false);
211 }
212 ).exec();
213 },
214 [&future](const MessageQueue::Error &error) {
215 Warning() << "Error while getting message from messagequeue: " << error.message;
216 future.setValue(false);
217 future.setFinished();
218 }
219 );
220 }
221 );
222 }
223
224 Async::Job<void> processPipeline()
225 {
226 //Go through all message queues
227 auto it = QSharedPointer<QListIterator<MessageQueue*> >::create(mCommandQueues);
228 return Async::dowhile(
229 [it]() { return it->hasNext(); },
230 [it, this](Async::Future<void> &future) {
231 auto queue = it->next();
232 processQueue(queue).then<void>([&future]() {
233 Trace() << "Queue processed";
234 future.setFinished();
235 }).exec();
236 }
237 );
238 }
239
240private:
241 Akonadi2::Pipeline *mPipeline;
242 //Ordered by priority
243 QList<MessageQueue*> mCommandQueues;
244 bool mProcessingLock;
245};
246 106
107//FIXME We need to pass the resource-instance name to generic resource, not the plugin name
247DummyResource::DummyResource() 108DummyResource::DummyResource()
248 : Akonadi2::Resource(), 109 : Akonadi2::GenericResource(PLUGIN_NAME)
249 mUserQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", "org.kde.dummy.userqueue"),
250 mSynchronizerQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", "org.kde.dummy.synchronizerqueue"),
251 mError(0)
252{ 110{
253} 111}
254 112
@@ -277,19 +135,7 @@ void DummyResource::configurePipeline(Akonadi2::Pipeline *pipeline)
277 135
278 //event is the entitytype and not the domain type 136 //event is the entitytype and not the domain type
279 pipeline->setPreprocessors("event", Akonadi2::Pipeline::NewPipeline, QVector<Akonadi2::Preprocessor*>() << eventIndexer << uidIndexer); 137 pipeline->setPreprocessors("event", Akonadi2::Pipeline::NewPipeline, QVector<Akonadi2::Preprocessor*>() << eventIndexer << uidIndexer);
280 mProcessor = new Processor(pipeline, QList<MessageQueue*>() << &mUserQueue << &mSynchronizerQueue); 138 GenericResource::configurePipeline(pipeline);
281 QObject::connect(mProcessor, &Processor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); });
282}
283
284void DummyResource::onProcessorError(int errorCode, const QString &errorMessage)
285{
286 Warning() << "Received error from Processor: " << errorCode << errorMessage;
287 mError = errorCode;
288}
289
290int DummyResource::error() const
291{
292 return mError;
293} 139}
294 140
295void findByRemoteId(QSharedPointer<Akonadi2::Storage> storage, const QString &rid, std::function<void(void *keyValue, int keySize, void *dataValue, int dataSize)> callback) 141void findByRemoteId(QSharedPointer<Akonadi2::Storage> storage, const QString &rid, std::function<void(void *keyValue, int keySize, void *dataValue, int dataSize)> callback)
@@ -316,15 +162,6 @@ void findByRemoteId(QSharedPointer<Akonadi2::Storage> storage, const QString &ri
316 }); 162 });
317} 163}
318 164
319void DummyResource::enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data)
320{
321 m_fbb.Clear();
322 auto commandData = Akonadi2::EntityBuffer::appendAsVector(m_fbb, data.constData(), data.size());
323 auto buffer = Akonadi2::CreateQueuedCommand(m_fbb, commandId, commandData);
324 Akonadi2::FinishQueuedCommandBuffer(m_fbb, buffer);
325 mq.enqueue(m_fbb.GetBufferPointer(), m_fbb.GetSize());
326}
327
328Async::Job<void> DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeline) 165Async::Job<void> DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeline)
329{ 166{
330 return Async::start<void>([this, pipeline](Async::Future<void> &f) { 167 return Async::start<void>([this, pipeline](Async::Future<void> &f) {
@@ -377,37 +214,6 @@ Async::Job<void> DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeli
377 }); 214 });
378} 215}
379 216
380Async::Job<void> DummyResource::processAllMessages()
381{
382 //We have to wait for all items to be processed to ensure the synced items are available when a query gets executed.
383 //TODO: report errors while processing sync?
384 //TODO JOBAPI: A helper that waits for n events and then continues?
385 return Async::start<void>([this](Async::Future<void> &f) {
386 if (mSynchronizerQueue.isEmpty()) {
387 f.setFinished();
388 } else {
389 QObject::connect(&mSynchronizerQueue, &MessageQueue::drained, [&f]() {
390 f.setFinished();
391 });
392 }
393 }).then<void>([this](Async::Future<void> &f) {
394 if (mUserQueue.isEmpty()) {
395 f.setFinished();
396 } else {
397 QObject::connect(&mUserQueue, &MessageQueue::drained, [&f]() {
398 f.setFinished();
399 });
400 }
401 });
402}
403
404void DummyResource::processCommand(int commandId, const QByteArray &data, uint size, Akonadi2::Pipeline *pipeline)
405{
406 //TODO instead of copying the command including the full entity first into the command queue, we could directly
407 //create a new revision, only pushing a handle into the commandqueue with the relevant changeset (for changereplay).
408 //The problem is that we then require write access from multiple threads (or even processes to avoid sending the full entity over the wire).
409 enqueueCommand(mUserQueue, commandId, data);
410}
411 217
412DummyResourceFactory::DummyResourceFactory(QObject *parent) 218DummyResourceFactory::DummyResourceFactory(QObject *parent)
413 : Akonadi2::ResourceFactory(parent) 219 : Akonadi2::ResourceFactory(parent)