diff options
-rw-r--r-- | common/messagequeue.cpp | 3 | ||||
-rw-r--r-- | common/messagequeue.h | 1 | ||||
-rw-r--r-- | common/pipeline.cpp | 4 | ||||
-rw-r--r-- | dummyresource/resourcefactory.cpp | 45 | ||||
-rw-r--r-- | dummyresource/resourcefactory.h | 3 | ||||
-rw-r--r-- | tests/dummyresourcetest.cpp | 7 |
6 files changed, 57 insertions, 6 deletions
diff --git a/common/messagequeue.cpp b/common/messagequeue.cpp index 99a0112..3754b16 100644 --- a/common/messagequeue.cpp +++ b/common/messagequeue.cpp | |||
@@ -31,6 +31,9 @@ void MessageQueue::dequeue(const std::function<void(void *ptr, int size, std::fu | |||
31 | resultHandler(valuePtr, valueSize, [this, key](bool success) { | 31 | resultHandler(valuePtr, valueSize, [this, key](bool success) { |
32 | if (success) { | 32 | if (success) { |
33 | mStorage.remove(key.data(), key.size()); | 33 | mStorage.remove(key.data(), key.size()); |
34 | if (isEmpty()) { | ||
35 | emit this->drained(); | ||
36 | } | ||
34 | } else { | 37 | } else { |
35 | //TODO re-enqueue? | 38 | //TODO re-enqueue? |
36 | } | 39 | } |
diff --git a/common/messagequeue.h b/common/messagequeue.h index 0b791c6..52eeb1f 100644 --- a/common/messagequeue.h +++ b/common/messagequeue.h | |||
@@ -34,6 +34,7 @@ public: | |||
34 | bool isEmpty(); | 34 | bool isEmpty(); |
35 | signals: | 35 | signals: |
36 | void messageReady(); | 36 | void messageReady(); |
37 | void drained(); | ||
37 | 38 | ||
38 | private: | 39 | private: |
39 | Q_DISABLE_COPY(MessageQueue); | 40 | Q_DISABLE_COPY(MessageQueue); |
diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 8f15fef..13a3344 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp | |||
@@ -107,7 +107,7 @@ Async::Job<void> Pipeline::newEntity(void const *command, size_t size) | |||
107 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); | 107 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); |
108 | if (!Akonadi2::Commands::VerifyCreateEntityBuffer(verifyer)) { | 108 | if (!Akonadi2::Commands::VerifyCreateEntityBuffer(verifyer)) { |
109 | qWarning() << "invalid buffer, not a create entity buffer"; | 109 | qWarning() << "invalid buffer, not a create entity buffer"; |
110 | return Async::null<void>(); | 110 | return Async::error<void>(); |
111 | } | 111 | } |
112 | } | 112 | } |
113 | auto createEntity = Akonadi2::Commands::GetCreateEntity(command); | 113 | auto createEntity = Akonadi2::Commands::GetCreateEntity(command); |
@@ -118,7 +118,7 @@ Async::Job<void> Pipeline::newEntity(void const *command, size_t size) | |||
118 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(createEntity->delta()->Data()), createEntity->delta()->size()); | 118 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(createEntity->delta()->Data()), createEntity->delta()->size()); |
119 | if (!Akonadi2::VerifyEntityBuffer(verifyer)) { | 119 | if (!Akonadi2::VerifyEntityBuffer(verifyer)) { |
120 | qWarning() << "invalid buffer, not an entity buffer"; | 120 | qWarning() << "invalid buffer, not an entity buffer"; |
121 | return Async::null<void>(); | 121 | return Async::error<void>(); |
122 | } | 122 | } |
123 | } | 123 | } |
124 | auto entity = Akonadi2::GetEntity(createEntity->delta()->Data()); | 124 | auto entity = Akonadi2::GetEntity(createEntity->delta()->Data()); |
diff --git a/dummyresource/resourcefactory.cpp b/dummyresource/resourcefactory.cpp index 6fe10ec..60a9cf6 100644 --- a/dummyresource/resourcefactory.cpp +++ b/dummyresource/resourcefactory.cpp | |||
@@ -126,6 +126,9 @@ public: | |||
126 | } | 126 | } |
127 | } | 127 | } |
128 | 128 | ||
129 | signals: | ||
130 | void error(int errorCode, const QString &errorMessage); | ||
131 | |||
129 | private slots: | 132 | private slots: |
130 | static void asyncWhile(const std::function<void(std::function<void(bool)>)> &body, const std::function<void()> &completionHandler) { | 133 | static void asyncWhile(const std::function<void(std::function<void(bool)>)> &body, const std::function<void()> &completionHandler) { |
131 | body([body, completionHandler](bool complete) { | 134 | body([body, completionHandler](bool complete) { |
@@ -178,6 +181,12 @@ private slots: | |||
178 | messageQueueCallback(true); | 181 | messageQueueCallback(true); |
179 | whileCallback(false); | 182 | whileCallback(false); |
180 | future.setFinished(); | 183 | future.setFinished(); |
184 | }, | ||
185 | [this, messageQueueCallback, whileCallback](int errorCode, const QString &errorMessage) { | ||
186 | qDebug() << "Error while creating entity: " << errorCode << errorMessage; | ||
187 | emit error(errorCode, errorMessage); | ||
188 | messageQueueCallback(true); | ||
189 | whileCallback(false); | ||
181 | }).exec(); | 190 | }).exec(); |
182 | } | 191 | } |
183 | break; | 192 | break; |
@@ -236,7 +245,8 @@ private: | |||
236 | DummyResource::DummyResource() | 245 | DummyResource::DummyResource() |
237 | : Akonadi2::Resource(), | 246 | : Akonadi2::Resource(), |
238 | mUserQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", "org.kde.dummy.userqueue"), | 247 | mUserQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", "org.kde.dummy.userqueue"), |
239 | mSynchronizerQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", "org.kde.dummy.synchronizerqueue") | 248 | mSynchronizerQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", "org.kde.dummy.synchronizerqueue"), |
249 | mError(0) | ||
240 | { | 250 | { |
241 | } | 251 | } |
242 | 252 | ||
@@ -254,6 +264,18 @@ void DummyResource::configurePipeline(Akonadi2::Pipeline *pipeline) | |||
254 | //event is the entitytype and not the domain type | 264 | //event is the entitytype and not the domain type |
255 | pipeline->setPreprocessors("event", Akonadi2::Pipeline::NewPipeline, QVector<Akonadi2::Preprocessor*>() << eventIndexer); | 265 | pipeline->setPreprocessors("event", Akonadi2::Pipeline::NewPipeline, QVector<Akonadi2::Preprocessor*>() << eventIndexer); |
256 | mProcessor = new Processor(pipeline, QList<MessageQueue*>() << &mUserQueue << &mSynchronizerQueue); | 266 | mProcessor = new Processor(pipeline, QList<MessageQueue*>() << &mUserQueue << &mSynchronizerQueue); |
267 | QObject::connect(mProcessor, &Processor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); | ||
268 | } | ||
269 | |||
270 | void DummyResource::onProcessorError(int errorCode, const QString &errorMessage) | ||
271 | { | ||
272 | qDebug() << "Received error from Processor: " << errorCode << errorMessage; | ||
273 | mError = errorCode; | ||
274 | } | ||
275 | |||
276 | int DummyResource::error() const | ||
277 | { | ||
278 | return mError; | ||
257 | } | 279 | } |
258 | 280 | ||
259 | void findByRemoteId(QSharedPointer<Akonadi2::Storage> storage, const QString &rid, std::function<void(void *keyValue, int keySize, void *dataValue, int dataSize)> callback) | 281 | void findByRemoteId(QSharedPointer<Akonadi2::Storage> storage, const QString &rid, std::function<void(void *keyValue, int keySize, void *dataValue, int dataSize)> callback) |
@@ -327,13 +349,30 @@ Async::Job<void> DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeli | |||
327 | DummyCalendar::FinishDummyEventBuffer(m_fbb, buffer); | 349 | DummyCalendar::FinishDummyEventBuffer(m_fbb, buffer); |
328 | flatbuffers::FlatBufferBuilder entityFbb; | 350 | flatbuffers::FlatBufferBuilder entityFbb; |
329 | Akonadi2::EntityBuffer::assembleEntityBuffer(entityFbb, 0, 0, m_fbb.GetBufferPointer(), m_fbb.GetSize(), 0, 0); | 351 | Akonadi2::EntityBuffer::assembleEntityBuffer(entityFbb, 0, 0, m_fbb.GetBufferPointer(), m_fbb.GetSize(), 0, 0); |
330 | enqueueCommand(mSynchronizerQueue, Akonadi2::Commands::CreateEntityCommand, QByteArray::fromRawData(reinterpret_cast<char const *>(entityFbb.GetBufferPointer()), entityFbb.GetSize())); | 352 | |
353 | flatbuffers::FlatBufferBuilder fbb; | ||
354 | //This is the resource type and not the domain type | ||
355 | auto type = fbb.CreateString("event"); | ||
356 | auto delta = fbb.CreateVector<uint8_t>(entityFbb.GetBufferPointer(), entityFbb.GetSize()); | ||
357 | auto location = Akonadi2::Commands::CreateCreateEntity(fbb, type, delta); | ||
358 | Akonadi2::Commands::FinishCreateEntityBuffer(fbb, location); | ||
359 | |||
360 | enqueueCommand(mSynchronizerQueue, Akonadi2::Commands::CreateEntityCommand, QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize())); | ||
331 | } else { //modification | 361 | } else { //modification |
332 | //TODO diff and create modification if necessary | 362 | //TODO diff and create modification if necessary |
333 | } | 363 | } |
334 | } | 364 | } |
335 | //TODO find items to remove | 365 | //TODO find items to remove |
336 | f.setFinished(); | 366 | |
367 | //We have to wait for all items to be processed to ensure the synced items are available when a query gets executed. | ||
368 | //TODO: report errors while processing sync? | ||
369 | if (mSynchronizerQueue.isEmpty()) { | ||
370 | f.setFinished(); | ||
371 | } else { | ||
372 | QObject::connect(&mSynchronizerQueue, &MessageQueue::drained, [&f]() { | ||
373 | f.setFinished(); | ||
374 | }); | ||
375 | } | ||
337 | }); | 376 | }); |
338 | } | 377 | } |
339 | 378 | ||
diff --git a/dummyresource/resourcefactory.h b/dummyresource/resourcefactory.h index 6043fb6..767eba1 100644 --- a/dummyresource/resourcefactory.h +++ b/dummyresource/resourcefactory.h | |||
@@ -37,13 +37,16 @@ public: | |||
37 | Async::Job<void> synchronizeWithSource(Akonadi2::Pipeline *pipeline); | 37 | Async::Job<void> synchronizeWithSource(Akonadi2::Pipeline *pipeline); |
38 | void processCommand(int commandId, const QByteArray &data, uint size, Akonadi2::Pipeline *pipeline); | 38 | void processCommand(int commandId, const QByteArray &data, uint size, Akonadi2::Pipeline *pipeline); |
39 | void configurePipeline(Akonadi2::Pipeline *pipeline); | 39 | void configurePipeline(Akonadi2::Pipeline *pipeline); |
40 | int error() const; | ||
40 | 41 | ||
41 | private: | 42 | private: |
43 | void onProcessorError(int errorCode, const QString &errorMessage); | ||
42 | void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data); | 44 | void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data); |
43 | flatbuffers::FlatBufferBuilder m_fbb; | 45 | flatbuffers::FlatBufferBuilder m_fbb; |
44 | MessageQueue mUserQueue; | 46 | MessageQueue mUserQueue; |
45 | MessageQueue mSynchronizerQueue; | 47 | MessageQueue mSynchronizerQueue; |
46 | Processor *mProcessor; | 48 | Processor *mProcessor; |
49 | int mError; | ||
47 | }; | 50 | }; |
48 | 51 | ||
49 | class DummyResourceFactory : public Akonadi2::ResourceFactory | 52 | class DummyResourceFactory : public Akonadi2::ResourceFactory |
diff --git a/tests/dummyresourcetest.cpp b/tests/dummyresourcetest.cpp index a80d22f..56e5513 100644 --- a/tests/dummyresourcetest.cpp +++ b/tests/dummyresourcetest.cpp | |||
@@ -31,7 +31,7 @@ private Q_SLOTS: | |||
31 | removeFromDisk("org.kde.dummy.synchronizerqueue"); | 31 | removeFromDisk("org.kde.dummy.synchronizerqueue"); |
32 | } | 32 | } |
33 | 33 | ||
34 | void cleanupTestCase() | 34 | void cleanup() |
35 | { | 35 | { |
36 | removeFromDisk("org.kde.dummy"); | 36 | removeFromDisk("org.kde.dummy"); |
37 | removeFromDisk("org.kde.dummy.userqueue"); | 37 | removeFromDisk("org.kde.dummy.userqueue"); |
@@ -97,9 +97,14 @@ private Q_SLOTS: | |||
97 | { | 97 | { |
98 | Akonadi2::Pipeline pipeline("org.kde.dummy"); | 98 | Akonadi2::Pipeline pipeline("org.kde.dummy"); |
99 | DummyResource resource; | 99 | DummyResource resource; |
100 | resource.configurePipeline(&pipeline); | ||
100 | auto job = resource.synchronizeWithSource(&pipeline); | 101 | auto job = resource.synchronizeWithSource(&pipeline); |
102 | //TODO pass in optional timeout? | ||
101 | auto future = job.exec(); | 103 | auto future = job.exec(); |
104 | future.waitForFinished(); | ||
105 | QVERIFY(!future.errorCode()); | ||
102 | QTRY_VERIFY(future.isFinished()); | 106 | QTRY_VERIFY(future.isFinished()); |
107 | QVERIFY(!resource.error()); | ||
103 | } | 108 | } |
104 | 109 | ||
105 | void testSyncAndFacade() | 110 | void testSyncAndFacade() |