From 1d713d9e2dbaf27de9da087f9270d260dfc40c31 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Mon, 21 Nov 2016 23:13:38 +0100 Subject: Folded the SourceWriteback into the Synchronizer. By concentrating all communication to the source in one place we get rid of several oddities. * Quite a bit of duplication since both need access to the synchronizationStore and the source. * We currently have an akward locking in place because both classes access the ync store. This is not easier to resolve cleanly. * The live of resource implementers becomes easier. * An implementation could elect to not use changereplay and always do a full sync... (maybe?) --- common/CMakeLists.txt | 1 - common/genericresource.cpp | 29 ++++----- common/genericresource.h | 4 +- common/sourcewriteback.cpp | 146 --------------------------------------------- common/sourcewriteback.h | 71 ---------------------- common/synchronizer.cpp | 94 ++++++++++++++++++++++++++++- common/synchronizer.h | 15 ++++- 7 files changed, 120 insertions(+), 240 deletions(-) delete mode 100644 common/sourcewriteback.cpp delete mode 100644 common/sourcewriteback.h (limited to 'common') diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index a08be8a..5ba524b 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt @@ -70,7 +70,6 @@ set(command_SRCS adaptorfactoryregistry.cpp synchronizer.cpp remoteidmap.cpp - sourcewriteback.cpp mailpreprocessor.cpp specialpurposepreprocessor.cpp datastorequery.cpp diff --git a/common/genericresource.cpp b/common/genericresource.cpp index c4c8bc6..746fa33 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp @@ -292,14 +292,14 @@ KAsync::Job GenericResource::inspect( void GenericResource::enableChangeReplay(bool enable) { - Q_ASSERT(mChangeReplay); + Q_ASSERT(mSynchronizer); if (enable) { - QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mChangeReplay.data(), &ChangeReplay::revisionChanged, Qt::QueuedConnection); - QObject::connect(mChangeReplay.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); - QMetaObject::invokeMethod(mChangeReplay.data(), "revisionChanged", Qt::QueuedConnection); + QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mSynchronizer.data(), &ChangeReplay::revisionChanged, Qt::QueuedConnection); + QObject::connect(mSynchronizer.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); + QMetaObject::invokeMethod(mSynchronizer.data(), "revisionChanged", Qt::QueuedConnection); } else { - QObject::disconnect(mPipeline.data(), &Pipeline::revisionUpdated, mChangeReplay.data(), &ChangeReplay::revisionChanged); - QObject::disconnect(mChangeReplay.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); + QObject::disconnect(mPipeline.data(), &Pipeline::revisionUpdated, mSynchronizer.data(), &ChangeReplay::revisionChanged); + QObject::disconnect(mSynchronizer.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); } } @@ -314,13 +314,8 @@ void GenericResource::setupSynchronizer(const QSharedPointer &sync mSynchronizer->setup([this](int commandId, const QByteArray &data) { enqueueCommand(mSynchronizerQueue, commandId, data); }, mSynchronizerQueue); -} - -void GenericResource::setupChangereplay(const QSharedPointer &changeReplay) -{ - mChangeReplay = changeReplay; { - auto ret = QObject::connect(mChangeReplay.data(), &ChangeReplay::replayingChanges, [this]() { + auto ret = QObject::connect(mSynchronizer.data(), &Synchronizer::replayingChanges, [this]() { Sink::Notification n; n.id = "changereplay"; n.type = Sink::Notification::Status; @@ -331,7 +326,7 @@ void GenericResource::setupChangereplay(const QSharedPointer &chan Q_ASSERT(ret); } { - auto ret = QObject::connect(mChangeReplay.data(), &ChangeReplay::changesReplayed, [this]() { + auto ret = QObject::connect(mSynchronizer.data(), &Synchronizer::changesReplayed, [this]() { Sink::Notification n; n.id = "changereplay"; n.type = Sink::Notification::Status; @@ -342,7 +337,7 @@ void GenericResource::setupChangereplay(const QSharedPointer &chan Q_ASSERT(ret); } - mProcessor->setOldestUsedRevision(mChangeReplay->getLastReplayedRevision()); + mProcessor->setOldestUsedRevision(mSynchronizer->getLastReplayedRevision()); enableChangeReplay(true); } @@ -459,11 +454,11 @@ KAsync::Job GenericResource::processAllMessages() .then([this](KAsync::Future &f) { waitForDrained(f, mSynchronizerQueue); }) .then([this](KAsync::Future &f) { waitForDrained(f, mUserQueue); }) .then([this](KAsync::Future &f) { - if (mChangeReplay->allChangesReplayed()) { + if (mSynchronizer->allChangesReplayed()) { f.setFinished(); } else { auto context = new QObject; - QObject::connect(mChangeReplay.data(), &ChangeReplay::changesReplayed, context, [&f, context]() { + QObject::connect(mSynchronizer.data(), &ChangeReplay::changesReplayed, context, [&f, context]() { delete context; f.setFinished(); }); @@ -473,7 +468,7 @@ KAsync::Job GenericResource::processAllMessages() void GenericResource::updateLowerBoundRevision() { - mProcessor->setOldestUsedRevision(qMin(mClientLowerBoundRevision, mChangeReplay->getLastReplayedRevision())); + mProcessor->setOldestUsedRevision(qMin(mClientLowerBoundRevision, mSynchronizer->getLastReplayedRevision())); } void GenericResource::setLowerBoundRevision(qint64 revision) diff --git a/common/genericresource.h b/common/genericresource.h index 3736c8f..7e0f5ad 100644 --- a/common/genericresource.h +++ b/common/genericresource.h @@ -24,7 +24,7 @@ #include #include #include -#include "changereplay.h" +#include #include @@ -66,7 +66,6 @@ protected: void setupPreprocessors(const QByteArray &type, const QVector &preprocessors); void setupSynchronizer(const QSharedPointer &synchronizer); - void setupChangereplay(const QSharedPointer &changeReplay); void onProcessorError(int errorCode, const QString &errorMessage); void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data); @@ -78,7 +77,6 @@ protected: private: std::unique_ptr mProcessor; - QSharedPointer mChangeReplay; QSharedPointer mSynchronizer; int mError; QTimer mCommitQueueTimer; diff --git a/common/sourcewriteback.cpp b/common/sourcewriteback.cpp deleted file mode 100644 index e2994d1..0000000 --- a/common/sourcewriteback.cpp +++ /dev/null @@ -1,146 +0,0 @@ -/* - * Copyright (C) 2016 Christian Mollekopf - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License as published by the Free Software Foundation; either - * version 2.1 of the License, or (at your option) version 3, or any - * later version accepted by the membership of KDE e.V. (or its - * successor approved by the membership of KDE e.V.), which shall - * act as a proxy defined in Section 6 of version 3 of the license. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this library. If not, see . - */ -#include "sourcewriteback.h" - -#include "definitions.h" -#include "log.h" -#include "bufferutils.h" -#include "entitybuffer.h" -#include "entity_generated.h" - -#define ENTITY_TYPE_MAIL "mail" -#define ENTITY_TYPE_FOLDER "folder" - -SINK_DEBUG_AREA("sourcewriteback") - -using namespace Sink; - -SourceWriteBack::SourceWriteBack(const ResourceContext &context) - : ChangeReplay(context), - mResourceContext(context), - mSyncStorage(Sink::storageLocation(), context.instanceId() + ".synchronization", Sink::Storage::DataStore::ReadWrite), - mEntityStore(QSharedPointer::create(mResourceContext)) -{ - -} - -Storage::EntityStore &SourceWriteBack::store() -{ - return *mEntityStore; -} - -RemoteIdMap &SourceWriteBack::syncStore() -{ - if (!mSyncStore) { - mSyncStore = QSharedPointer::create(mSyncTransaction); - } - return *mSyncStore; -} - -bool SourceWriteBack::canReplay(const QByteArray &type, const QByteArray &key, const QByteArray &value) -{ - Sink::EntityBuffer buffer(value); - const Sink::Entity &entity = buffer.entity(); - const auto metadataBuffer = Sink::EntityBuffer::readBuffer(entity.metadata()); - Q_ASSERT(metadataBuffer); - if (!metadataBuffer->replayToSource()) { - SinkTrace() << "Change is coming from the source"; - } - return metadataBuffer->replayToSource(); -} - -KAsync::Job SourceWriteBack::replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) -{ - SinkTrace() << "Replaying" << type << key; - - Sink::EntityBuffer buffer(value); - const Sink::Entity &entity = buffer.entity(); - const auto metadataBuffer = Sink::EntityBuffer::readBuffer(entity.metadata()); - Q_ASSERT(metadataBuffer); - Q_ASSERT(!mSyncStore); - Q_ASSERT(!mSyncTransaction); - mEntityStore->startTransaction(Storage::DataStore::ReadOnly); - mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::DataStore::ReadWrite); - - // const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; - const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation; - const auto uid = Sink::Storage::DataStore::uidFromKey(key); - const auto modifiedProperties = metadataBuffer->modifiedProperties() ? BufferUtils::fromVector(*metadataBuffer->modifiedProperties()) : QByteArrayList(); - QByteArray oldRemoteId; - - if (operation != Sink::Operation_Creation) { - oldRemoteId = syncStore().resolveLocalId(type, uid); - if (oldRemoteId.isEmpty()) { - SinkWarning() << "Couldn't find the remote id for: " << type << uid; - return KAsync::error(1, "Couldn't find the remote id."); - } - } - SinkTrace() << "Replaying " << key << type << uid << oldRemoteId; - - KAsync::Job job = KAsync::null(); - if (type == ENTITY_TYPE_FOLDER) { - auto folder = store().readEntity(key); - job = replay(folder, operation, oldRemoteId, modifiedProperties); - } else if (type == ENTITY_TYPE_MAIL) { - auto mail = store().readEntity(key); - job = replay(mail, operation, oldRemoteId, modifiedProperties); - } - - return job.syncThen([this, operation, type, uid, oldRemoteId](const QByteArray &remoteId) { - if (operation == Sink::Operation_Creation) { - SinkTrace() << "Replayed creation with remote id: " << remoteId; - if (remoteId.isEmpty()) { - SinkWarning() << "Returned an empty remoteId from the creation"; - } else { - syncStore().recordRemoteId(type, uid, remoteId); - } - } else if (operation == Sink::Operation_Modification) { - SinkTrace() << "Replayed modification with remote id: " << remoteId; - if (remoteId.isEmpty()) { - SinkWarning() << "Returned an empty remoteId from the creation"; - } else { - syncStore().updateRemoteId(type, uid, remoteId); - } - } else if (operation == Sink::Operation_Removal) { - SinkTrace() << "Replayed removal with remote id: " << oldRemoteId; - syncStore().removeRemoteId(type, uid, oldRemoteId); - } else { - SinkError() << "Unkown operation" << operation; - } - }) - .syncThen([this](const KAsync::Error &error) { - if (error) { - SinkWarning() << "Failed to replay change: " << error.errorMessage; - } - mSyncStore.clear(); - mSyncTransaction.commit(); - mEntityStore->abortTransaction(); - }); -} - -KAsync::Job SourceWriteBack::replay(const ApplicationDomain::Mail &, Sink::Operation, const QByteArray &, const QList &) -{ - return KAsync::null(); -} - -KAsync::Job SourceWriteBack::replay(const ApplicationDomain::Folder &, Sink::Operation, const QByteArray &, const QList &) -{ - return KAsync::null(); -} diff --git a/common/sourcewriteback.h b/common/sourcewriteback.h deleted file mode 100644 index cf393e4..0000000 --- a/common/sourcewriteback.h +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Copyright (C) 2016 Christian Mollekopf - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License as published by the Free Software Foundation; either - * version 2.1 of the License, or (at your option) version 3, or any - * later version accepted by the membership of KDE e.V. (or its - * successor approved by the membership of KDE e.V.), which shall - * act as a proxy defined in Section 6 of version 3 of the license. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this library. If not, see . - */ -#pragma once - -#include "sink_export.h" - -#include "changereplay.h" -#include "storage.h" -#include "storage/entitystore.h" -#include "remoteidmap.h" -#include "metadata_generated.h" - -namespace Sink { - -/** - * Replay changes to the source - */ -class SINK_EXPORT SourceWriteBack : public ChangeReplay -{ -public: - SourceWriteBack(const ResourceContext &resourceContext); - -protected: - ///Base implementation calls the replay$Type calls - virtual KAsync::Job replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE; - virtual bool canReplay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE; - -protected: - ///Implement to write back changes to the server - virtual KAsync::Job replay(const Sink::ApplicationDomain::Mail &, Sink::Operation, const QByteArray &oldRemoteId, const QList &); - virtual KAsync::Job replay(const Sink::ApplicationDomain::Folder &, Sink::Operation, const QByteArray &oldRemoteId, const QList &); - - //Read/Write access to sync storage - RemoteIdMap &syncStore(); - - template - T getPrevious(const T &entity) - { - return store().readPrevious(entity.identifier(), entity.revision()); - } - -private: - //Read only access to main storage - Storage::EntityStore &store(); - ResourceContext mResourceContext; - Sink::Storage::DataStore mSyncStorage; - QSharedPointer mSyncStore; - QSharedPointer mEntityStore; - Sink::Storage::DataStore::Transaction mSyncTransaction; - QByteArray mResourceType; - QByteArray mResourceInstanceIdentifier; -}; - -} diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp index 713387e..10acefc 100644 --- a/common/synchronizer.cpp +++ b/common/synchronizer.cpp @@ -33,7 +33,8 @@ SINK_DEBUG_AREA("synchronizer") using namespace Sink; Synchronizer::Synchronizer(const Sink::ResourceContext &context) - : mResourceContext(context), + : ChangeReplay(context), + mResourceContext(context), mEntityStore(Storage::EntityStore::Ptr::create(mResourceContext)), mSyncStorage(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::DataStore::ReadWrite) { @@ -310,6 +311,97 @@ Sink::Storage::DataStore::DataStore::Transaction &Synchronizer::syncTransaction( return mSyncTransaction; } +bool Synchronizer::canReplay(const QByteArray &type, const QByteArray &key, const QByteArray &value) +{ + Sink::EntityBuffer buffer(value); + const Sink::Entity &entity = buffer.entity(); + const auto metadataBuffer = Sink::EntityBuffer::readBuffer(entity.metadata()); + Q_ASSERT(metadataBuffer); + if (!metadataBuffer->replayToSource()) { + SinkTrace() << "Change is coming from the source"; + } + return metadataBuffer->replayToSource(); +} + +KAsync::Job Synchronizer::replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) +{ + SinkTrace() << "Replaying" << type << key; + + Sink::EntityBuffer buffer(value); + const Sink::Entity &entity = buffer.entity(); + const auto metadataBuffer = Sink::EntityBuffer::readBuffer(entity.metadata()); + Q_ASSERT(metadataBuffer); + Q_ASSERT(!mSyncStore); + Q_ASSERT(!mSyncTransaction); + mEntityStore->startTransaction(Storage::DataStore::ReadOnly); + mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::DataStore::ReadWrite); + + const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation; + const auto uid = Sink::Storage::DataStore::uidFromKey(key); + const auto modifiedProperties = metadataBuffer->modifiedProperties() ? BufferUtils::fromVector(*metadataBuffer->modifiedProperties()) : QByteArrayList(); + QByteArray oldRemoteId; + + if (operation != Sink::Operation_Creation) { + oldRemoteId = syncStore().resolveLocalId(type, uid); + if (oldRemoteId.isEmpty()) { + SinkWarning() << "Couldn't find the remote id for: " << type << uid; + return KAsync::error(1, "Couldn't find the remote id."); + } + } + SinkTrace() << "Replaying " << key << type << uid << oldRemoteId; + + KAsync::Job job = KAsync::null(); + //TODO This requires supporting every domain type here as well. Can we solve this better so we can do the dispatch somewhere centrally? + if (type == ApplicationDomain::getTypeName()) { + auto folder = store().readEntity(key); + job = replay(folder, operation, oldRemoteId, modifiedProperties); + } else if (type == ApplicationDomain::getTypeName()) { + auto mail = store().readEntity(key); + job = replay(mail, operation, oldRemoteId, modifiedProperties); + } + + return job.syncThen([this, operation, type, uid, oldRemoteId](const QByteArray &remoteId) { + if (operation == Sink::Operation_Creation) { + SinkTrace() << "Replayed creation with remote id: " << remoteId; + if (remoteId.isEmpty()) { + SinkWarning() << "Returned an empty remoteId from the creation"; + } else { + syncStore().recordRemoteId(type, uid, remoteId); + } + } else if (operation == Sink::Operation_Modification) { + SinkTrace() << "Replayed modification with remote id: " << remoteId; + if (remoteId.isEmpty()) { + SinkWarning() << "Returned an empty remoteId from the creation"; + } else { + syncStore().updateRemoteId(type, uid, remoteId); + } + } else if (operation == Sink::Operation_Removal) { + SinkTrace() << "Replayed removal with remote id: " << oldRemoteId; + syncStore().removeRemoteId(type, uid, oldRemoteId); + } else { + SinkError() << "Unkown operation" << operation; + } + }) + .syncThen([this](const KAsync::Error &error) { + if (error) { + SinkWarning() << "Failed to replay change: " << error.errorMessage; + } + mSyncStore.clear(); + mSyncTransaction.commit(); + mEntityStore->abortTransaction(); + }); +} + +KAsync::Job Synchronizer::replay(const ApplicationDomain::Mail &, Sink::Operation, const QByteArray &, const QList &) +{ + return KAsync::null(); +} + +KAsync::Job Synchronizer::replay(const ApplicationDomain::Folder &, Sink::Operation, const QByteArray &, const QList &) +{ + return KAsync::null(); +} + #define REGISTER_TYPE(T) \ template void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const T &entity, const QHash &mergeCriteria); \ template void Synchronizer::modify(const T &entity); diff --git a/common/synchronizer.h b/common/synchronizer.h index 47518ee..0a51f54 100644 --- a/common/synchronizer.h +++ b/common/synchronizer.h @@ -27,6 +27,8 @@ #include #include #include +#include "changereplay.h" +#include "remoteidmap.h" namespace Sink { class RemoteIdMap; @@ -34,8 +36,9 @@ class RemoteIdMap; /** * Synchronize and add what we don't already have to local queue */ -class SINK_EXPORT Synchronizer +class SINK_EXPORT Synchronizer : public ChangeReplay { + Q_OBJECT public: Synchronizer(const Sink::ResourceContext &resourceContext); virtual ~Synchronizer(); @@ -52,6 +55,16 @@ public: void commit(); Sink::Storage::DataStore::Transaction &syncTransaction(); +protected: + ///Base implementation calls the replay$Type calls + virtual KAsync::Job replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE; + virtual bool canReplay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE; + +protected: + ///Implement to write back changes to the server + virtual KAsync::Job replay(const Sink::ApplicationDomain::Mail &, Sink::Operation, const QByteArray &oldRemoteId, const QList &); + virtual KAsync::Job replay(const Sink::ApplicationDomain::Folder &, Sink::Operation, const QByteArray &oldRemoteId, const QList &); + protected: ///Calls the callback to enqueue the command void enqueueCommand(int commandId, const QByteArray &data); -- cgit v1.2.3