From dd6196206f87086a636677da55cf5c300a8e932a Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Mon, 19 Jan 2015 19:26:14 +0100 Subject: Fixed sync, detect errors during sync, wait until sync items are processed until signalling completion. --- common/messagequeue.cpp | 3 +++ common/messagequeue.h | 1 + common/pipeline.cpp | 4 ++-- dummyresource/resourcefactory.cpp | 45 ++++++++++++++++++++++++++++++++++++--- dummyresource/resourcefactory.h | 3 +++ tests/dummyresourcetest.cpp | 7 +++++- 6 files changed, 57 insertions(+), 6 deletions(-) diff --git a/common/messagequeue.cpp b/common/messagequeue.cpp index 99a0112..3754b16 100644 --- a/common/messagequeue.cpp +++ b/common/messagequeue.cpp @@ -31,6 +31,9 @@ void MessageQueue::dequeue(const std::functiondrained(); + } } else { //TODO re-enqueue? } diff --git a/common/messagequeue.h b/common/messagequeue.h index 0b791c6..52eeb1f 100644 --- a/common/messagequeue.h +++ b/common/messagequeue.h @@ -34,6 +34,7 @@ public: bool isEmpty(); signals: void messageReady(); + void drained(); private: Q_DISABLE_COPY(MessageQueue); diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 8f15fef..13a3344 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp @@ -107,7 +107,7 @@ Async::Job Pipeline::newEntity(void const *command, size_t size) flatbuffers::Verifier verifyer(reinterpret_cast(command), size); if (!Akonadi2::Commands::VerifyCreateEntityBuffer(verifyer)) { qWarning() << "invalid buffer, not a create entity buffer"; - return Async::null(); + return Async::error(); } } auto createEntity = Akonadi2::Commands::GetCreateEntity(command); @@ -118,7 +118,7 @@ Async::Job Pipeline::newEntity(void const *command, size_t size) flatbuffers::Verifier verifyer(reinterpret_cast(createEntity->delta()->Data()), createEntity->delta()->size()); if (!Akonadi2::VerifyEntityBuffer(verifyer)) { qWarning() << "invalid buffer, not an entity buffer"; - return Async::null(); + return Async::error(); } } auto entity = Akonadi2::GetEntity(createEntity->delta()->Data()); diff --git a/dummyresource/resourcefactory.cpp b/dummyresource/resourcefactory.cpp index 6fe10ec..60a9cf6 100644 --- a/dummyresource/resourcefactory.cpp +++ b/dummyresource/resourcefactory.cpp @@ -126,6 +126,9 @@ public: } } +signals: + void error(int errorCode, const QString &errorMessage); + private slots: static void asyncWhile(const std::function)> &body, const std::function &completionHandler) { body([body, completionHandler](bool complete) { @@ -178,6 +181,12 @@ private slots: messageQueueCallback(true); whileCallback(false); future.setFinished(); + }, + [this, messageQueueCallback, whileCallback](int errorCode, const QString &errorMessage) { + qDebug() << "Error while creating entity: " << errorCode << errorMessage; + emit error(errorCode, errorMessage); + messageQueueCallback(true); + whileCallback(false); }).exec(); } break; @@ -236,7 +245,8 @@ private: DummyResource::DummyResource() : Akonadi2::Resource(), mUserQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", "org.kde.dummy.userqueue"), - mSynchronizerQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", "org.kde.dummy.synchronizerqueue") + mSynchronizerQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", "org.kde.dummy.synchronizerqueue"), + mError(0) { } @@ -254,6 +264,18 @@ void DummyResource::configurePipeline(Akonadi2::Pipeline *pipeline) //event is the entitytype and not the domain type pipeline->setPreprocessors("event", Akonadi2::Pipeline::NewPipeline, QVector() << eventIndexer); mProcessor = new Processor(pipeline, QList() << &mUserQueue << &mSynchronizerQueue); + QObject::connect(mProcessor, &Processor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); +} + +void DummyResource::onProcessorError(int errorCode, const QString &errorMessage) +{ + qDebug() << "Received error from Processor: " << errorCode << errorMessage; + mError = errorCode; +} + +int DummyResource::error() const +{ + return mError; } void findByRemoteId(QSharedPointer storage, const QString &rid, std::function callback) @@ -327,13 +349,30 @@ Async::Job DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeli DummyCalendar::FinishDummyEventBuffer(m_fbb, buffer); flatbuffers::FlatBufferBuilder entityFbb; Akonadi2::EntityBuffer::assembleEntityBuffer(entityFbb, 0, 0, m_fbb.GetBufferPointer(), m_fbb.GetSize(), 0, 0); - enqueueCommand(mSynchronizerQueue, Akonadi2::Commands::CreateEntityCommand, QByteArray::fromRawData(reinterpret_cast(entityFbb.GetBufferPointer()), entityFbb.GetSize())); + + flatbuffers::FlatBufferBuilder fbb; + //This is the resource type and not the domain type + auto type = fbb.CreateString("event"); + auto delta = fbb.CreateVector(entityFbb.GetBufferPointer(), entityFbb.GetSize()); + auto location = Akonadi2::Commands::CreateCreateEntity(fbb, type, delta); + Akonadi2::Commands::FinishCreateEntityBuffer(fbb, location); + + enqueueCommand(mSynchronizerQueue, Akonadi2::Commands::CreateEntityCommand, QByteArray::fromRawData(reinterpret_cast(fbb.GetBufferPointer()), fbb.GetSize())); } else { //modification //TODO diff and create modification if necessary } } //TODO find items to remove - f.setFinished(); + + //We have to wait for all items to be processed to ensure the synced items are available when a query gets executed. + //TODO: report errors while processing sync? + if (mSynchronizerQueue.isEmpty()) { + f.setFinished(); + } else { + QObject::connect(&mSynchronizerQueue, &MessageQueue::drained, [&f]() { + f.setFinished(); + }); + } }); } diff --git a/dummyresource/resourcefactory.h b/dummyresource/resourcefactory.h index 6043fb6..767eba1 100644 --- a/dummyresource/resourcefactory.h +++ b/dummyresource/resourcefactory.h @@ -37,13 +37,16 @@ public: Async::Job synchronizeWithSource(Akonadi2::Pipeline *pipeline); void processCommand(int commandId, const QByteArray &data, uint size, Akonadi2::Pipeline *pipeline); void configurePipeline(Akonadi2::Pipeline *pipeline); + int error() const; private: + void onProcessorError(int errorCode, const QString &errorMessage); void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data); flatbuffers::FlatBufferBuilder m_fbb; MessageQueue mUserQueue; MessageQueue mSynchronizerQueue; Processor *mProcessor; + int mError; }; class DummyResourceFactory : public Akonadi2::ResourceFactory diff --git a/tests/dummyresourcetest.cpp b/tests/dummyresourcetest.cpp index a80d22f..56e5513 100644 --- a/tests/dummyresourcetest.cpp +++ b/tests/dummyresourcetest.cpp @@ -31,7 +31,7 @@ private Q_SLOTS: removeFromDisk("org.kde.dummy.synchronizerqueue"); } - void cleanupTestCase() + void cleanup() { removeFromDisk("org.kde.dummy"); removeFromDisk("org.kde.dummy.userqueue"); @@ -97,9 +97,14 @@ private Q_SLOTS: { Akonadi2::Pipeline pipeline("org.kde.dummy"); DummyResource resource; + resource.configurePipeline(&pipeline); auto job = resource.synchronizeWithSource(&pipeline); + //TODO pass in optional timeout? auto future = job.exec(); + future.waitForFinished(); + QVERIFY(!future.errorCode()); QTRY_VERIFY(future.isFinished()); + QVERIFY(!resource.error()); } void testSyncAndFacade() -- cgit v1.2.3