summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--common/messagequeue.cpp3
-rw-r--r--common/messagequeue.h1
-rw-r--r--common/pipeline.cpp4
-rw-r--r--dummyresource/resourcefactory.cpp45
-rw-r--r--dummyresource/resourcefactory.h3
-rw-r--r--tests/dummyresourcetest.cpp7
6 files changed, 57 insertions, 6 deletions
diff --git a/common/messagequeue.cpp b/common/messagequeue.cpp
index 99a0112..3754b16 100644
--- a/common/messagequeue.cpp
+++ b/common/messagequeue.cpp
@@ -31,6 +31,9 @@ void MessageQueue::dequeue(const std::function<void(void *ptr, int size, std::fu
31 resultHandler(valuePtr, valueSize, [this, key](bool success) { 31 resultHandler(valuePtr, valueSize, [this, key](bool success) {
32 if (success) { 32 if (success) {
33 mStorage.remove(key.data(), key.size()); 33 mStorage.remove(key.data(), key.size());
34 if (isEmpty()) {
35 emit this->drained();
36 }
34 } else { 37 } else {
35 //TODO re-enqueue? 38 //TODO re-enqueue?
36 } 39 }
diff --git a/common/messagequeue.h b/common/messagequeue.h
index 0b791c6..52eeb1f 100644
--- a/common/messagequeue.h
+++ b/common/messagequeue.h
@@ -34,6 +34,7 @@ public:
34 bool isEmpty(); 34 bool isEmpty();
35signals: 35signals:
36 void messageReady(); 36 void messageReady();
37 void drained();
37 38
38private: 39private:
39 Q_DISABLE_COPY(MessageQueue); 40 Q_DISABLE_COPY(MessageQueue);
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index 8f15fef..13a3344 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -107,7 +107,7 @@ Async::Job<void> Pipeline::newEntity(void const *command, size_t size)
107 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); 107 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size);
108 if (!Akonadi2::Commands::VerifyCreateEntityBuffer(verifyer)) { 108 if (!Akonadi2::Commands::VerifyCreateEntityBuffer(verifyer)) {
109 qWarning() << "invalid buffer, not a create entity buffer"; 109 qWarning() << "invalid buffer, not a create entity buffer";
110 return Async::null<void>(); 110 return Async::error<void>();
111 } 111 }
112 } 112 }
113 auto createEntity = Akonadi2::Commands::GetCreateEntity(command); 113 auto createEntity = Akonadi2::Commands::GetCreateEntity(command);
@@ -118,7 +118,7 @@ Async::Job<void> Pipeline::newEntity(void const *command, size_t size)
118 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(createEntity->delta()->Data()), createEntity->delta()->size()); 118 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(createEntity->delta()->Data()), createEntity->delta()->size());
119 if (!Akonadi2::VerifyEntityBuffer(verifyer)) { 119 if (!Akonadi2::VerifyEntityBuffer(verifyer)) {
120 qWarning() << "invalid buffer, not an entity buffer"; 120 qWarning() << "invalid buffer, not an entity buffer";
121 return Async::null<void>(); 121 return Async::error<void>();
122 } 122 }
123 } 123 }
124 auto entity = Akonadi2::GetEntity(createEntity->delta()->Data()); 124 auto entity = Akonadi2::GetEntity(createEntity->delta()->Data());
diff --git a/dummyresource/resourcefactory.cpp b/dummyresource/resourcefactory.cpp
index 6fe10ec..60a9cf6 100644
--- a/dummyresource/resourcefactory.cpp
+++ b/dummyresource/resourcefactory.cpp
@@ -126,6 +126,9 @@ public:
126 } 126 }
127 } 127 }
128 128
129signals:
130 void error(int errorCode, const QString &errorMessage);
131
129private slots: 132private slots:
130 static void asyncWhile(const std::function<void(std::function<void(bool)>)> &body, const std::function<void()> &completionHandler) { 133 static void asyncWhile(const std::function<void(std::function<void(bool)>)> &body, const std::function<void()> &completionHandler) {
131 body([body, completionHandler](bool complete) { 134 body([body, completionHandler](bool complete) {
@@ -178,6 +181,12 @@ private slots:
178 messageQueueCallback(true); 181 messageQueueCallback(true);
179 whileCallback(false); 182 whileCallback(false);
180 future.setFinished(); 183 future.setFinished();
184 },
185 [this, messageQueueCallback, whileCallback](int errorCode, const QString &errorMessage) {
186 qDebug() << "Error while creating entity: " << errorCode << errorMessage;
187 emit error(errorCode, errorMessage);
188 messageQueueCallback(true);
189 whileCallback(false);
181 }).exec(); 190 }).exec();
182 } 191 }
183 break; 192 break;
@@ -236,7 +245,8 @@ private:
236DummyResource::DummyResource() 245DummyResource::DummyResource()
237 : Akonadi2::Resource(), 246 : Akonadi2::Resource(),
238 mUserQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", "org.kde.dummy.userqueue"), 247 mUserQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", "org.kde.dummy.userqueue"),
239 mSynchronizerQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", "org.kde.dummy.synchronizerqueue") 248 mSynchronizerQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", "org.kde.dummy.synchronizerqueue"),
249 mError(0)
240{ 250{
241} 251}
242 252
@@ -254,6 +264,18 @@ void DummyResource::configurePipeline(Akonadi2::Pipeline *pipeline)
254 //event is the entitytype and not the domain type 264 //event is the entitytype and not the domain type
255 pipeline->setPreprocessors("event", Akonadi2::Pipeline::NewPipeline, QVector<Akonadi2::Preprocessor*>() << eventIndexer); 265 pipeline->setPreprocessors("event", Akonadi2::Pipeline::NewPipeline, QVector<Akonadi2::Preprocessor*>() << eventIndexer);
256 mProcessor = new Processor(pipeline, QList<MessageQueue*>() << &mUserQueue << &mSynchronizerQueue); 266 mProcessor = new Processor(pipeline, QList<MessageQueue*>() << &mUserQueue << &mSynchronizerQueue);
267 QObject::connect(mProcessor, &Processor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); });
268}
269
270void DummyResource::onProcessorError(int errorCode, const QString &errorMessage)
271{
272 qDebug() << "Received error from Processor: " << errorCode << errorMessage;
273 mError = errorCode;
274}
275
276int DummyResource::error() const
277{
278 return mError;
257} 279}
258 280
259void findByRemoteId(QSharedPointer<Akonadi2::Storage> storage, const QString &rid, std::function<void(void *keyValue, int keySize, void *dataValue, int dataSize)> callback) 281void findByRemoteId(QSharedPointer<Akonadi2::Storage> storage, const QString &rid, std::function<void(void *keyValue, int keySize, void *dataValue, int dataSize)> callback)
@@ -327,13 +349,30 @@ Async::Job<void> DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeli
327 DummyCalendar::FinishDummyEventBuffer(m_fbb, buffer); 349 DummyCalendar::FinishDummyEventBuffer(m_fbb, buffer);
328 flatbuffers::FlatBufferBuilder entityFbb; 350 flatbuffers::FlatBufferBuilder entityFbb;
329 Akonadi2::EntityBuffer::assembleEntityBuffer(entityFbb, 0, 0, m_fbb.GetBufferPointer(), m_fbb.GetSize(), 0, 0); 351 Akonadi2::EntityBuffer::assembleEntityBuffer(entityFbb, 0, 0, m_fbb.GetBufferPointer(), m_fbb.GetSize(), 0, 0);
330 enqueueCommand(mSynchronizerQueue, Akonadi2::Commands::CreateEntityCommand, QByteArray::fromRawData(reinterpret_cast<char const *>(entityFbb.GetBufferPointer()), entityFbb.GetSize())); 352
353 flatbuffers::FlatBufferBuilder fbb;
354 //This is the resource type and not the domain type
355 auto type = fbb.CreateString("event");
356 auto delta = fbb.CreateVector<uint8_t>(entityFbb.GetBufferPointer(), entityFbb.GetSize());
357 auto location = Akonadi2::Commands::CreateCreateEntity(fbb, type, delta);
358 Akonadi2::Commands::FinishCreateEntityBuffer(fbb, location);
359
360 enqueueCommand(mSynchronizerQueue, Akonadi2::Commands::CreateEntityCommand, QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize()));
331 } else { //modification 361 } else { //modification
332 //TODO diff and create modification if necessary 362 //TODO diff and create modification if necessary
333 } 363 }
334 } 364 }
335 //TODO find items to remove 365 //TODO find items to remove
336 f.setFinished(); 366
367 //We have to wait for all items to be processed to ensure the synced items are available when a query gets executed.
368 //TODO: report errors while processing sync?
369 if (mSynchronizerQueue.isEmpty()) {
370 f.setFinished();
371 } else {
372 QObject::connect(&mSynchronizerQueue, &MessageQueue::drained, [&f]() {
373 f.setFinished();
374 });
375 }
337 }); 376 });
338} 377}
339 378
diff --git a/dummyresource/resourcefactory.h b/dummyresource/resourcefactory.h
index 6043fb6..767eba1 100644
--- a/dummyresource/resourcefactory.h
+++ b/dummyresource/resourcefactory.h
@@ -37,13 +37,16 @@ public:
37 Async::Job<void> synchronizeWithSource(Akonadi2::Pipeline *pipeline); 37 Async::Job<void> synchronizeWithSource(Akonadi2::Pipeline *pipeline);
38 void processCommand(int commandId, const QByteArray &data, uint size, Akonadi2::Pipeline *pipeline); 38 void processCommand(int commandId, const QByteArray &data, uint size, Akonadi2::Pipeline *pipeline);
39 void configurePipeline(Akonadi2::Pipeline *pipeline); 39 void configurePipeline(Akonadi2::Pipeline *pipeline);
40 int error() const;
40 41
41private: 42private:
43 void onProcessorError(int errorCode, const QString &errorMessage);
42 void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data); 44 void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data);
43 flatbuffers::FlatBufferBuilder m_fbb; 45 flatbuffers::FlatBufferBuilder m_fbb;
44 MessageQueue mUserQueue; 46 MessageQueue mUserQueue;
45 MessageQueue mSynchronizerQueue; 47 MessageQueue mSynchronizerQueue;
46 Processor *mProcessor; 48 Processor *mProcessor;
49 int mError;
47}; 50};
48 51
49class DummyResourceFactory : public Akonadi2::ResourceFactory 52class DummyResourceFactory : public Akonadi2::ResourceFactory
diff --git a/tests/dummyresourcetest.cpp b/tests/dummyresourcetest.cpp
index a80d22f..56e5513 100644
--- a/tests/dummyresourcetest.cpp
+++ b/tests/dummyresourcetest.cpp
@@ -31,7 +31,7 @@ private Q_SLOTS:
31 removeFromDisk("org.kde.dummy.synchronizerqueue"); 31 removeFromDisk("org.kde.dummy.synchronizerqueue");
32 } 32 }
33 33
34 void cleanupTestCase() 34 void cleanup()
35 { 35 {
36 removeFromDisk("org.kde.dummy"); 36 removeFromDisk("org.kde.dummy");
37 removeFromDisk("org.kde.dummy.userqueue"); 37 removeFromDisk("org.kde.dummy.userqueue");
@@ -97,9 +97,14 @@ private Q_SLOTS:
97 { 97 {
98 Akonadi2::Pipeline pipeline("org.kde.dummy"); 98 Akonadi2::Pipeline pipeline("org.kde.dummy");
99 DummyResource resource; 99 DummyResource resource;
100 resource.configurePipeline(&pipeline);
100 auto job = resource.synchronizeWithSource(&pipeline); 101 auto job = resource.synchronizeWithSource(&pipeline);
102 //TODO pass in optional timeout?
101 auto future = job.exec(); 103 auto future = job.exec();
104 future.waitForFinished();
105 QVERIFY(!future.errorCode());
102 QTRY_VERIFY(future.isFinished()); 106 QTRY_VERIFY(future.isFinished());
107 QVERIFY(!resource.error());
103 } 108 }
104 109
105 void testSyncAndFacade() 110 void testSyncAndFacade()