From dd6196206f87086a636677da55cf5c300a8e932a Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Mon, 19 Jan 2015 19:26:14 +0100 Subject: Fixed sync, detect errors during sync, wait until sync items are processed until signalling completion. --- dummyresource/resourcefactory.cpp | 45 ++++++++++++++++++++++++++++++++++++--- 1 file changed, 42 insertions(+), 3 deletions(-) (limited to 'dummyresource/resourcefactory.cpp') diff --git a/dummyresource/resourcefactory.cpp b/dummyresource/resourcefactory.cpp index 6fe10ec..60a9cf6 100644 --- a/dummyresource/resourcefactory.cpp +++ b/dummyresource/resourcefactory.cpp @@ -126,6 +126,9 @@ public: } } +signals: + void error(int errorCode, const QString &errorMessage); + private slots: static void asyncWhile(const std::function)> &body, const std::function &completionHandler) { body([body, completionHandler](bool complete) { @@ -178,6 +181,12 @@ private slots: messageQueueCallback(true); whileCallback(false); future.setFinished(); + }, + [this, messageQueueCallback, whileCallback](int errorCode, const QString &errorMessage) { + qDebug() << "Error while creating entity: " << errorCode << errorMessage; + emit error(errorCode, errorMessage); + messageQueueCallback(true); + whileCallback(false); }).exec(); } break; @@ -236,7 +245,8 @@ private: DummyResource::DummyResource() : Akonadi2::Resource(), mUserQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", "org.kde.dummy.userqueue"), - mSynchronizerQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", "org.kde.dummy.synchronizerqueue") + mSynchronizerQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", "org.kde.dummy.synchronizerqueue"), + mError(0) { } @@ -254,6 +264,18 @@ void DummyResource::configurePipeline(Akonadi2::Pipeline *pipeline) //event is the entitytype and not the domain type pipeline->setPreprocessors("event", Akonadi2::Pipeline::NewPipeline, QVector() << eventIndexer); mProcessor = new Processor(pipeline, QList() << &mUserQueue << &mSynchronizerQueue); + QObject::connect(mProcessor, &Processor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); +} + +void DummyResource::onProcessorError(int errorCode, const QString &errorMessage) +{ + qDebug() << "Received error from Processor: " << errorCode << errorMessage; + mError = errorCode; +} + +int DummyResource::error() const +{ + return mError; } void findByRemoteId(QSharedPointer storage, const QString &rid, std::function callback) @@ -327,13 +349,30 @@ Async::Job DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeli DummyCalendar::FinishDummyEventBuffer(m_fbb, buffer); flatbuffers::FlatBufferBuilder entityFbb; Akonadi2::EntityBuffer::assembleEntityBuffer(entityFbb, 0, 0, m_fbb.GetBufferPointer(), m_fbb.GetSize(), 0, 0); - enqueueCommand(mSynchronizerQueue, Akonadi2::Commands::CreateEntityCommand, QByteArray::fromRawData(reinterpret_cast(entityFbb.GetBufferPointer()), entityFbb.GetSize())); + + flatbuffers::FlatBufferBuilder fbb; + //This is the resource type and not the domain type + auto type = fbb.CreateString("event"); + auto delta = fbb.CreateVector(entityFbb.GetBufferPointer(), entityFbb.GetSize()); + auto location = Akonadi2::Commands::CreateCreateEntity(fbb, type, delta); + Akonadi2::Commands::FinishCreateEntityBuffer(fbb, location); + + enqueueCommand(mSynchronizerQueue, Akonadi2::Commands::CreateEntityCommand, QByteArray::fromRawData(reinterpret_cast(fbb.GetBufferPointer()), fbb.GetSize())); } else { //modification //TODO diff and create modification if necessary } } //TODO find items to remove - f.setFinished(); + + //We have to wait for all items to be processed to ensure the synced items are available when a query gets executed. + //TODO: report errors while processing sync? + if (mSynchronizerQueue.isEmpty()) { + f.setFinished(); + } else { + QObject::connect(&mSynchronizerQueue, &MessageQueue::drained, [&f]() { + f.setFinished(); + }); + } }); } -- cgit v1.2.3