summaryrefslogtreecommitdiffstats
path: root/dummyresource/resourcefactory.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'dummyresource/resourcefactory.cpp')
-rw-r--r--dummyresource/resourcefactory.cpp45
1 files changed, 42 insertions, 3 deletions
diff --git a/dummyresource/resourcefactory.cpp b/dummyresource/resourcefactory.cpp
index 6fe10ec..60a9cf6 100644
--- a/dummyresource/resourcefactory.cpp
+++ b/dummyresource/resourcefactory.cpp
@@ -126,6 +126,9 @@ public:
126 } 126 }
127 } 127 }
128 128
129signals:
130 void error(int errorCode, const QString &errorMessage);
131
129private slots: 132private slots:
130 static void asyncWhile(const std::function<void(std::function<void(bool)>)> &body, const std::function<void()> &completionHandler) { 133 static void asyncWhile(const std::function<void(std::function<void(bool)>)> &body, const std::function<void()> &completionHandler) {
131 body([body, completionHandler](bool complete) { 134 body([body, completionHandler](bool complete) {
@@ -178,6 +181,12 @@ private slots:
178 messageQueueCallback(true); 181 messageQueueCallback(true);
179 whileCallback(false); 182 whileCallback(false);
180 future.setFinished(); 183 future.setFinished();
184 },
185 [this, messageQueueCallback, whileCallback](int errorCode, const QString &errorMessage) {
186 qDebug() << "Error while creating entity: " << errorCode << errorMessage;
187 emit error(errorCode, errorMessage);
188 messageQueueCallback(true);
189 whileCallback(false);
181 }).exec(); 190 }).exec();
182 } 191 }
183 break; 192 break;
@@ -236,7 +245,8 @@ private:
236DummyResource::DummyResource() 245DummyResource::DummyResource()
237 : Akonadi2::Resource(), 246 : Akonadi2::Resource(),
238 mUserQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", "org.kde.dummy.userqueue"), 247 mUserQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", "org.kde.dummy.userqueue"),
239 mSynchronizerQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", "org.kde.dummy.synchronizerqueue") 248 mSynchronizerQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", "org.kde.dummy.synchronizerqueue"),
249 mError(0)
240{ 250{
241} 251}
242 252
@@ -254,6 +264,18 @@ void DummyResource::configurePipeline(Akonadi2::Pipeline *pipeline)
254 //event is the entitytype and not the domain type 264 //event is the entitytype and not the domain type
255 pipeline->setPreprocessors("event", Akonadi2::Pipeline::NewPipeline, QVector<Akonadi2::Preprocessor*>() << eventIndexer); 265 pipeline->setPreprocessors("event", Akonadi2::Pipeline::NewPipeline, QVector<Akonadi2::Preprocessor*>() << eventIndexer);
256 mProcessor = new Processor(pipeline, QList<MessageQueue*>() << &mUserQueue << &mSynchronizerQueue); 266 mProcessor = new Processor(pipeline, QList<MessageQueue*>() << &mUserQueue << &mSynchronizerQueue);
267 QObject::connect(mProcessor, &Processor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); });
268}
269
270void DummyResource::onProcessorError(int errorCode, const QString &errorMessage)
271{
272 qDebug() << "Received error from Processor: " << errorCode << errorMessage;
273 mError = errorCode;
274}
275
276int DummyResource::error() const
277{
278 return mError;
257} 279}
258 280
259void findByRemoteId(QSharedPointer<Akonadi2::Storage> storage, const QString &rid, std::function<void(void *keyValue, int keySize, void *dataValue, int dataSize)> callback) 281void findByRemoteId(QSharedPointer<Akonadi2::Storage> storage, const QString &rid, std::function<void(void *keyValue, int keySize, void *dataValue, int dataSize)> callback)
@@ -327,13 +349,30 @@ Async::Job<void> DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeli
327 DummyCalendar::FinishDummyEventBuffer(m_fbb, buffer); 349 DummyCalendar::FinishDummyEventBuffer(m_fbb, buffer);
328 flatbuffers::FlatBufferBuilder entityFbb; 350 flatbuffers::FlatBufferBuilder entityFbb;
329 Akonadi2::EntityBuffer::assembleEntityBuffer(entityFbb, 0, 0, m_fbb.GetBufferPointer(), m_fbb.GetSize(), 0, 0); 351 Akonadi2::EntityBuffer::assembleEntityBuffer(entityFbb, 0, 0, m_fbb.GetBufferPointer(), m_fbb.GetSize(), 0, 0);
330 enqueueCommand(mSynchronizerQueue, Akonadi2::Commands::CreateEntityCommand, QByteArray::fromRawData(reinterpret_cast<char const *>(entityFbb.GetBufferPointer()), entityFbb.GetSize())); 352
353 flatbuffers::FlatBufferBuilder fbb;
354 //This is the resource type and not the domain type
355 auto type = fbb.CreateString("event");
356 auto delta = fbb.CreateVector<uint8_t>(entityFbb.GetBufferPointer(), entityFbb.GetSize());
357 auto location = Akonadi2::Commands::CreateCreateEntity(fbb, type, delta);
358 Akonadi2::Commands::FinishCreateEntityBuffer(fbb, location);
359
360 enqueueCommand(mSynchronizerQueue, Akonadi2::Commands::CreateEntityCommand, QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize()));
331 } else { //modification 361 } else { //modification
332 //TODO diff and create modification if necessary 362 //TODO diff and create modification if necessary
333 } 363 }
334 } 364 }
335 //TODO find items to remove 365 //TODO find items to remove
336 f.setFinished(); 366
367 //We have to wait for all items to be processed to ensure the synced items are available when a query gets executed.
368 //TODO: report errors while processing sync?
369 if (mSynchronizerQueue.isEmpty()) {
370 f.setFinished();
371 } else {
372 QObject::connect(&mSynchronizerQueue, &MessageQueue::drained, [&f]() {
373 f.setFinished();
374 });
375 }
337 }); 376 });
338} 377}
339 378