From e9c75177590d8546ebd9425f16c4269a9c92f517 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Sat, 28 May 2016 00:24:53 +0200 Subject: Refactored the generic resource to use separate classes for changereplay and synchronization. This cleans up the API and avoids the excessive passing around of transactions. It also provides more flexibility in eventually using different synchronization strategies for different resources. --- common/CMakeLists.txt | 2 + common/adaptorfactoryregistry.cpp | 68 +++++ common/adaptorfactoryregistry.h | 64 +++++ common/changereplay.cpp | 102 +++++++ common/changereplay.h | 68 +++++ common/genericresource.cpp | 560 +++++++++++++++++++++----------------- common/genericresource.h | 151 +++++++--- common/pipeline.cpp | 17 +- common/pipeline.h | 3 +- common/resource.cpp | 2 + common/resource.h | 2 + 11 files changed, 742 insertions(+), 297 deletions(-) create mode 100644 common/adaptorfactoryregistry.cpp create mode 100644 common/adaptorfactoryregistry.h create mode 100644 common/changereplay.cpp create mode 100644 common/changereplay.h (limited to 'common') diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index c269a85..79b627a 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt @@ -66,6 +66,8 @@ set(command_SRCS domain/folder.cpp test.cpp query.cpp + changereplay.cpp + adaptorfactoryregistry.cpp ${storage_SRCS}) add_library(${PROJECT_NAME} SHARED ${command_SRCS}) diff --git a/common/adaptorfactoryregistry.cpp b/common/adaptorfactoryregistry.cpp new file mode 100644 index 0000000..323a02d --- /dev/null +++ b/common/adaptorfactoryregistry.cpp @@ -0,0 +1,68 @@ +/* + * Copyright (C) 2016 Christian Mollekopf + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) version 3, or any + * later version accepted by the membership of KDE e.V. (or its + * successor approved by the membership of KDE e.V.), which shall + * act as a proxy defined in Section 6 of version 3 of the license. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library. If not, see . + */ +#include "adaptorfactoryregistry.h" + +#include +#include +#include +#include +#include + +#include "domaintypeadaptorfactoryinterface.h" +#include "applicationdomaintype.h" +#include "log.h" + +using namespace Sink; + +AdaptorFactoryRegistry &AdaptorFactoryRegistry::instance() +{ + // QMutexLocker locker(&sMutex); + static AdaptorFactoryRegistry *instance = 0; + if (!instance) { + instance = new AdaptorFactoryRegistry; + } + return *instance; +} + +static QByteArray key(const QByteArray &resource, const QByteArray &type) +{ + return resource + type; +} + +AdaptorFactoryRegistry::AdaptorFactoryRegistry() +{ + +} + +std::shared_ptr AdaptorFactoryRegistry::getFactory(const QByteArray &resource, const QByteArray &typeName) +{ + const auto ptr = mRegistry.value(key(resource, typeName)); + //We have to check the pointer before the cast, otherwise a check would return true also for invalid instances. + if (!ptr) { + return std::shared_ptr(); + } + return std::static_pointer_cast(ptr); +} + +void AdaptorFactoryRegistry::registerFactory(const QByteArray &resource, const std::shared_ptr &instance, const QByteArray typeName) +{ + mRegistry.insert(key(resource, typeName), instance); +} + diff --git a/common/adaptorfactoryregistry.h b/common/adaptorfactoryregistry.h new file mode 100644 index 0000000..f06120a --- /dev/null +++ b/common/adaptorfactoryregistry.h @@ -0,0 +1,64 @@ +/* + * Copyright (C) 2016 Christian Mollekopf + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) version 3, or any + * later version accepted by the membership of KDE e.V. (or its + * successor approved by the membership of KDE e.V.), which shall + * act as a proxy defined in Section 6 of version 3 of the license. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library. If not, see . + */ + +#pragma once + +#include "sink_export.h" +#include +#include +#include +#include +#include + +#include "domaintypeadaptorfactoryinterface.h" +#include "applicationdomaintype.h" +#include "log.h" + +namespace Sink { + +/** + */ +class SINK_EXPORT AdaptorFactoryRegistry +{ +public: + static AdaptorFactoryRegistry &instance(); + + template + void registerFactory(const QByteArray &resource) + { + registerFactory(resource, std::make_shared(), ApplicationDomain::getTypeName()); + } + + template + std::shared_ptr getFactory(const QByteArray &resource) + { + return getFactory(resource, ApplicationDomain::getTypeName()); + } + + std::shared_ptr getFactory(const QByteArray &resource, const QByteArray &typeName); + +private: + AdaptorFactoryRegistry(); + void registerFactory(const QByteArray &resource, const std::shared_ptr &instance, const QByteArray typeName); + + QHash> mRegistry; + static QMutex sMutex; +}; +} diff --git a/common/changereplay.cpp b/common/changereplay.cpp new file mode 100644 index 0000000..2447b6e --- /dev/null +++ b/common/changereplay.cpp @@ -0,0 +1,102 @@ +/* + * Copyright (C) 2016 Christian Mollekopf + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) version 3, or any + * later version accepted by the membership of KDE e.V. (or its + * successor approved by the membership of KDE e.V.), which shall + * act as a proxy defined in Section 6 of version 3 of the license. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library. If not, see . + */ +#include "changereplay.h" + +#include "entitybuffer.h" +#include "log.h" +#include "definitions.h" +#include "bufferutils.h" + +using namespace Sink; + +#undef DEBUG_AREA +#define DEBUG_AREA "resource.changereplay" + +ChangeReplay::ChangeReplay(const QByteArray &resourceName) + : mStorage(storageLocation(), resourceName, Storage::ReadOnly), mChangeReplayStore(storageLocation(), resourceName + ".changereplay", Storage::ReadWrite) +{ + Trace() << "Created change replay: " << resourceName; +} + +qint64 ChangeReplay::getLastReplayedRevision() +{ + qint64 lastReplayedRevision = 0; + auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadOnly); + replayStoreTransaction.openDatabase().scan("lastReplayedRevision", + [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { + lastReplayedRevision = value.toLongLong(); + return false; + }, + [](const Storage::Error &) {}); + return lastReplayedRevision; +} + +bool ChangeReplay::allChangesReplayed() +{ + const qint64 topRevision = Storage::maxRevision(mStorage.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { + Warning() << error.message; + })); + const qint64 lastReplayedRevision = getLastReplayedRevision(); + Trace() << "All changes replayed " << topRevision << lastReplayedRevision; + return (lastReplayedRevision >= topRevision); +} + +void ChangeReplay::revisionChanged() +{ + auto mainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { + Warning() << error.message; + }); + auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) { + Warning() << error.message; + }); + qint64 lastReplayedRevision = 1; + replayStoreTransaction.openDatabase().scan("lastReplayedRevision", + [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { + lastReplayedRevision = value.toLongLong(); + return false; + }, + [](const Storage::Error &) {}); + const qint64 topRevision = Storage::maxRevision(mainStoreTransaction); + + Trace() << "Changereplay from " << lastReplayedRevision << " to " << topRevision; + if (lastReplayedRevision <= topRevision) { + qint64 revision = lastReplayedRevision; + for (; revision <= topRevision; revision++) { + const auto uid = Storage::getUidFromRevision(mainStoreTransaction, revision); + const auto type = Storage::getTypeFromRevision(mainStoreTransaction, revision); + const auto key = Storage::assembleKey(uid, revision); + Storage::mainDatabase(mainStoreTransaction, type) + .scan(key, + [&lastReplayedRevision, type, this](const QByteArray &key, const QByteArray &value) -> bool { + replay(type, key, value).exec(); + // TODO make for loop async, and pass to async replay function together with type + Trace() << "Replaying " << key; + return false; + }, + [key](const Storage::Error &) { ErrorMsg() << "Failed to replay change " << key; }); + } + revision--; + replayStoreTransaction.openDatabase().write("lastReplayedRevision", QByteArray::number(revision)); + replayStoreTransaction.commit(); + Trace() << "Replayed until " << revision; + } + emit changesReplayed(); +} + diff --git a/common/changereplay.h b/common/changereplay.h new file mode 100644 index 0000000..a568060 --- /dev/null +++ b/common/changereplay.h @@ -0,0 +1,68 @@ +/* + * Copyright (C) 2016 Christian Mollekopf + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) version 3, or any + * later version accepted by the membership of KDE e.V. (or its + * successor approved by the membership of KDE e.V.), which shall + * act as a proxy defined in Section 6 of version 3 of the license. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library. If not, see . + */ +#pragma once + +#include "sink_export.h" +#include +#include + +#include "storage.h" + +namespace Sink { + +/** + * Replays changes from the storage one by one. + * + * Uses a local database to: + * * Remember what changes have been replayed already. + * * store a mapping of remote to local buffers + */ +class SINK_EXPORT ChangeReplay : public QObject +{ + Q_OBJECT +public: + ChangeReplay(const QByteArray &resourceName); + + qint64 getLastReplayedRevision(); + bool allChangesReplayed(); + +signals: + void changesReplayed(); + +public slots: + void revisionChanged(); + +protected: + virtual KAsync::Job replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) = 0; + Sink::Storage mStorage; + +private: + Sink::Storage mChangeReplayStore; +}; + +class NullChangeReplay : public ChangeReplay +{ + public: + NullChangeReplay() : ChangeReplay("null") {} + KAsync::Job replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE { return KAsync::null(); } +}; + +} + diff --git a/common/genericresource.cpp b/common/genericresource.cpp index eae6ead..cb2ef21 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp @@ -1,3 +1,22 @@ +/* + * Copyright (C) 2016 Christian Mollekopf + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) version 3, or any + * later version accepted by the membership of KDE e.V. (or its + * successor approved by the membership of KDE e.V.), which shall + * act as a proxy defined in Section 6 of version 3 of the license. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library. If not, see . + */ #include "genericresource.h" #include "entitybuffer.h" @@ -14,6 +33,7 @@ #include "log.h" #include "definitions.h" #include "bufferutils.h" +#include "adaptorfactoryregistry.h" #include #include @@ -29,96 +49,6 @@ static int sCommitInterval = 10; using namespace Sink; -#undef DEBUG_AREA -#define DEBUG_AREA "resource.changereplay" - -/** - * Replays changes from the storage one by one. - * - * Uses a local database to: - * * Remember what changes have been replayed already. - * * store a mapping of remote to local buffers - */ -class ChangeReplay : public QObject -{ - Q_OBJECT -public: - typedef std::function(const QByteArray &type, const QByteArray &key, const QByteArray &value)> ReplayFunction; - - ChangeReplay(const QString &resourceName, const ReplayFunction &replayFunction) - : mStorage(storageLocation(), resourceName, Storage::ReadOnly), mChangeReplayStore(storageLocation(), resourceName + ".changereplay", Storage::ReadWrite), mReplayFunction(replayFunction) - { - } - - qint64 getLastReplayedRevision() - { - qint64 lastReplayedRevision = 0; - auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadOnly); - replayStoreTransaction.openDatabase().scan("lastReplayedRevision", - [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { - lastReplayedRevision = value.toLongLong(); - return false; - }, - [](const Storage::Error &) {}); - return lastReplayedRevision; - } - - bool allChangesReplayed() - { - const qint64 topRevision = Storage::maxRevision(mStorage.createTransaction(Storage::ReadOnly)); - const qint64 lastReplayedRevision = getLastReplayedRevision(); - Trace() << "All changes replayed " << topRevision << lastReplayedRevision; - return (lastReplayedRevision >= topRevision); - } - -signals: - void changesReplayed(); - -public slots: - void revisionChanged() - { - auto mainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly); - auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadWrite); - qint64 lastReplayedRevision = 1; - replayStoreTransaction.openDatabase().scan("lastReplayedRevision", - [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { - lastReplayedRevision = value.toLongLong(); - return false; - }, - [](const Storage::Error &) {}); - const qint64 topRevision = Storage::maxRevision(mainStoreTransaction); - - Trace() << "Changereplay from " << lastReplayedRevision << " to " << topRevision; - if (lastReplayedRevision <= topRevision) { - qint64 revision = lastReplayedRevision; - for (; revision <= topRevision; revision++) { - const auto uid = Storage::getUidFromRevision(mainStoreTransaction, revision); - const auto type = Storage::getTypeFromRevision(mainStoreTransaction, revision); - const auto key = Storage::assembleKey(uid, revision); - Storage::mainDatabase(mainStoreTransaction, type) - .scan(key, - [&lastReplayedRevision, type, this](const QByteArray &key, const QByteArray &value) -> bool { - mReplayFunction(type, key, value).exec(); - // TODO make for loop async, and pass to async replay function together with type - Trace() << "Replaying " << key; - return false; - }, - [key](const Storage::Error &) { ErrorMsg() << "Failed to replay change " << key; }); - } - revision--; - replayStoreTransaction.openDatabase().write("lastReplayedRevision", QByteArray::number(revision)); - replayStoreTransaction.commit(); - Trace() << "Replayed until " << revision; - } - emit changesReplayed(); - } - -private: - Sink::Storage mStorage; - Sink::Storage mChangeReplayStore; - ReplayFunction mReplayFunction; -}; - #undef DEBUG_AREA #define DEBUG_AREA "resource.commandprocessor" @@ -133,10 +63,9 @@ class CommandProcessor : public QObject public: CommandProcessor(Sink::Pipeline *pipeline, QList commandQueues) : QObject(), mPipeline(pipeline), mCommandQueues(commandQueues), mProcessingLock(false) { - mPipeline->startTransaction(); - // FIXME Should be initialized to the current value of the change replay queue - mLowerBoundRevision = Storage::maxRevision(mPipeline->transaction()); - mPipeline->commit(); + mLowerBoundRevision = Storage::maxRevision(mPipeline->storage().createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { + Warning() << error.message; + })); for (auto queue : mCommandQueues) { const bool ret = connect(queue, &MessageQueue::messageReady, this, &CommandProcessor::process); @@ -303,15 +232,22 @@ private: #undef DEBUG_AREA #define DEBUG_AREA "resource" -GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, const QSharedPointer &pipeline) +GenericResource::GenericResource(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, const QSharedPointer &pipeline, const QSharedPointer &changeReplay, const QSharedPointer &synchronizer) : Sink::Resource(), mUserQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".userqueue"), mSynchronizerQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronizerqueue"), + mResourceType(resourceType), mResourceInstanceIdentifier(resourceInstanceIdentifier), mPipeline(pipeline ? pipeline : QSharedPointer::create(resourceInstanceIdentifier)), + mChangeReplay(changeReplay), + mSynchronizer(synchronizer), mError(0), mClientLowerBoundRevision(std::numeric_limits::max()) { + mPipeline->setResourceType(mResourceType); + mSynchronizer->setup([this](int commandId, const QByteArray &data) { + enqueueCommand(mSynchronizerQueue, commandId, data); + }); mProcessor = new CommandProcessor(mPipeline.data(), QList() << &mUserQueue << &mSynchronizerQueue); mProcessor->setInspectionCommand([this](void const *command, size_t size) { flatbuffers::Verifier verifier((const uint8_t *)command, size); @@ -353,14 +289,9 @@ GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, c }); QObject::connect(mProcessor, &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); - mSourceChangeReplay = new ChangeReplay(resourceInstanceIdentifier, [this](const QByteArray &type, const QByteArray &key, const QByteArray &value) { - // This results in a deadlock when a sync is in progress and we try to create a second writing transaction (which is why we turn changereplay off during the sync) - auto synchronizationStore = QSharedPointer::create(Sink::storageLocation(), mResourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite); - return this->replay(*synchronizationStore, type, key, value).then([synchronizationStore]() {}); - }); enableChangeReplay(true); mClientLowerBoundRevision = mPipeline->cleanedUpRevision(); - mProcessor->setOldestUsedRevision(mSourceChangeReplay->getLastReplayedRevision()); + mProcessor->setOldestUsedRevision(mChangeReplay->getLastReplayedRevision()); mCommitQueueTimer.setInterval(sCommitInterval); mCommitQueueTimer.setSingleShot(true); @@ -370,7 +301,6 @@ GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, c GenericResource::~GenericResource() { delete mProcessor; - delete mSourceChangeReplay; } KAsync::Job GenericResource::inspect( @@ -383,86 +313,20 @@ KAsync::Job GenericResource::inspect( void GenericResource::enableChangeReplay(bool enable) { if (enable) { - QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mSourceChangeReplay, &ChangeReplay::revisionChanged, Qt::QueuedConnection); - QObject::connect(mSourceChangeReplay, &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); - mSourceChangeReplay->revisionChanged(); + QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mChangeReplay.data(), &ChangeReplay::revisionChanged, Qt::QueuedConnection); + QObject::connect(mChangeReplay.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); + mChangeReplay->revisionChanged(); } else { - QObject::disconnect(mPipeline.data(), &Pipeline::revisionUpdated, mSourceChangeReplay, &ChangeReplay::revisionChanged); - QObject::disconnect(mSourceChangeReplay, &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); + QObject::disconnect(mPipeline.data(), &Pipeline::revisionUpdated, mChangeReplay.data(), &ChangeReplay::revisionChanged); + QObject::disconnect(mChangeReplay.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); } } void GenericResource::addType(const QByteArray &type, DomainTypeAdaptorFactoryInterface::Ptr factory, const QVector &preprocessors) { mPipeline->setPreprocessors(type, preprocessors); - mPipeline->setAdaptorFactory(type, factory); - mAdaptorFactories.insert(type, factory); -} - -KAsync::Job GenericResource::replay(Sink::Storage &synchronizationStore, const QByteArray &type, const QByteArray &key, const QByteArray &value) -{ - Sink::EntityBuffer buffer(value); - const Sink::Entity &entity = buffer.entity(); - const auto metadataBuffer = Sink::EntityBuffer::readBuffer(entity.metadata()); - Q_ASSERT(metadataBuffer); - if (!metadataBuffer->replayToSource()) { - Trace() << "Change is coming from the source"; - return KAsync::null(); - } - const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; - const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation; - const auto uid = Sink::Storage::uidFromKey(key); - QByteArray oldRemoteId; - - if (operation != Sink::Operation_Creation) { - auto synchronizationTransaction = synchronizationStore.createTransaction(Sink::Storage::ReadOnly); - oldRemoteId = resolveLocalId(type, uid, synchronizationTransaction); - } - Trace() << "Replaying " << key << type; - - KAsync::Job job = KAsync::null(); - if (type == ENTITY_TYPE_FOLDER) { - const Sink::ApplicationDomain::Folder folder(mResourceInstanceIdentifier, uid, revision, mAdaptorFactories.value(type)->createAdaptor(entity)); - job = replay(folder, operation, oldRemoteId); - } else if (type == ENTITY_TYPE_MAIL) { - const Sink::ApplicationDomain::Mail mail(mResourceInstanceIdentifier, uid, revision, mAdaptorFactories.value(type)->createAdaptor(entity)); - job = replay(mail, operation, oldRemoteId); - } - - return job.then([=, &synchronizationStore](const QByteArray &remoteId) { - auto synchronizationTransaction = synchronizationStore.createTransaction(Sink::Storage::ReadWrite); - Trace() << "Replayed change with remote id: " << remoteId; - if (operation == Sink::Operation_Creation) { - if (remoteId.isEmpty()) { - Warning() << "Returned an empty remoteId from the creation"; - } else { - recordRemoteId(type, uid, remoteId, synchronizationTransaction); - } - } else if (operation == Sink::Operation_Modification) { - if (remoteId.isEmpty()) { - Warning() << "Returned an empty remoteId from the creation"; - } else { - updateRemoteId(type, uid, remoteId, synchronizationTransaction); - } - } else if (operation == Sink::Operation_Removal) { - removeRemoteId(type, uid, remoteId, synchronizationTransaction); - } else { - Warning() << "Unkown operation" << operation; - } - }, [](int errorCode, const QString &errorMessage) { - Warning() << "Failed to replay change: " << errorMessage; - }); } -KAsync::Job GenericResource::replay(const ApplicationDomain::Mail &, Sink::Operation, const QByteArray &) -{ - return KAsync::null(); -} - -KAsync::Job GenericResource::replay(const ApplicationDomain::Folder &, Sink::Operation, const QByteArray &) -{ - return KAsync::null(); -} void GenericResource::removeDataFromDisk() { @@ -528,10 +392,8 @@ KAsync::Job GenericResource::synchronizeWithSource() Log() << " Synchronizing"; // Changereplay would deadlock otherwise when trying to open the synchronization store enableChangeReplay(false); - auto mainStore = QSharedPointer::create(Sink::storageLocation(), mResourceInstanceIdentifier, Sink::Storage::ReadOnly); - auto syncStore = QSharedPointer::create(Sink::storageLocation(), mResourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite); - synchronizeWithSource(*mainStore, *syncStore) - .then([this, mainStore, syncStore, &future]() { + mSynchronizer->synchronize() + .then([this, &future]() { Log() << "Done Synchronizing"; enableChangeReplay(true); future.setFinished(); @@ -576,11 +438,11 @@ KAsync::Job GenericResource::processAllMessages() .then([this](KAsync::Future &f) { waitForDrained(f, mSynchronizerQueue); }) .then([this](KAsync::Future &f) { waitForDrained(f, mUserQueue); }) .then([this](KAsync::Future &f) { - if (mSourceChangeReplay->allChangesReplayed()) { + if (mChangeReplay->allChangesReplayed()) { f.setFinished(); } else { auto context = new QObject; - QObject::connect(mSourceChangeReplay, &ChangeReplay::changesReplayed, context, [&f, context]() { + QObject::connect(mChangeReplay.data(), &ChangeReplay::changesReplayed, context, [&f, context]() { delete context; f.setFinished(); }); @@ -590,7 +452,7 @@ KAsync::Job GenericResource::processAllMessages() void GenericResource::updateLowerBoundRevision() { - mProcessor->setOldestUsedRevision(qMin(mClientLowerBoundRevision, mSourceChangeReplay->getLastReplayedRevision())); + mProcessor->setOldestUsedRevision(qMin(mClientLowerBoundRevision, mChangeReplay->getLastReplayedRevision())); } void GenericResource::setLowerBoundRevision(qint64 revision) @@ -599,7 +461,139 @@ void GenericResource::setLowerBoundRevision(qint64 revision) updateLowerBoundRevision(); } -void GenericResource::createEntity(const QByteArray &sinkId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, + + + +EntityStore::EntityStore(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, Sink::Storage::Transaction &transaction) + : mResourceType(resourceType), mResourceInstanceIdentifier(resourceInstanceIdentifier), + mTransaction(transaction) +{ + +} + +template +T EntityStore::read(const QByteArray &identifier) const +{ + auto typeName = ApplicationDomain::getTypeName(); + auto mainDatabase = Storage::mainDatabase(mTransaction, typeName); + auto bufferAdaptor = getLatest(mainDatabase, identifier, *Sink::AdaptorFactoryRegistry::instance().getFactory(mResourceType)); + Q_ASSERT(bufferAdaptor); + return T(mResourceInstanceIdentifier, identifier, 0, bufferAdaptor); +} + +QSharedPointer EntityStore::getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory) +{ + QSharedPointer current; + db.findLatest(uid, + [¤t, &adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { + Sink::EntityBuffer buffer(const_cast(data.data()), data.size()); + if (!buffer.isValid()) { + Warning() << "Read invalid buffer from disk"; + } else { + Trace() << "Found value " << key; + current = adaptorFactory.createAdaptor(buffer.entity()); + } + return false; + }, + [](const Sink::Storage::Error &error) { Warning() << "Failed to read current value from storage: " << error.message; }); + return current; +} + + + +SyncStore::SyncStore(Sink::Storage::Transaction &transaction) + : mTransaction(transaction) +{ + +} + +void SyncStore::recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId) +{ + Index("rid.mapping." + bufferType, mTransaction).add(remoteId, localId); + Index("localid.mapping." + bufferType, mTransaction).add(localId, remoteId); +} + +void SyncStore::removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId) +{ + Index("rid.mapping." + bufferType, mTransaction).remove(remoteId, localId); + Index("localid.mapping." + bufferType, mTransaction).remove(localId, remoteId); +} + +void SyncStore::updateRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId) +{ + const auto oldRemoteId = Index("localid.mapping." + bufferType, mTransaction).lookup(localId); + removeRemoteId(bufferType, localId, oldRemoteId); + recordRemoteId(bufferType, localId, remoteId); +} + +QByteArray SyncStore::resolveRemoteId(const QByteArray &bufferType, const QByteArray &remoteId) +{ + // Lookup local id for remote id, or insert a new pair otherwise + Index index("rid.mapping." + bufferType, mTransaction); + QByteArray sinkId = index.lookup(remoteId); + if (sinkId.isEmpty()) { + sinkId = QUuid::createUuid().toString().toUtf8(); + index.add(remoteId, sinkId); + Index("localid.mapping." + bufferType, mTransaction).add(sinkId, remoteId); + } + return sinkId; +} + +QByteArray SyncStore::resolveLocalId(const QByteArray &bufferType, const QByteArray &localId) +{ + QByteArray remoteId = Index("localid.mapping." + bufferType, mTransaction).lookup(localId); + if (remoteId.isEmpty()) { + Warning() << "Couldn't find the remote id for " << localId; + return QByteArray(); + } + return remoteId; +} + + + + + + + + +Synchronizer::Synchronizer(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier) + : mStorage(Sink::storageLocation(), resourceInstanceIdentifier, Sink::Storage::ReadOnly), + mSyncStorage(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite), + mResourceType(resourceType), + mResourceInstanceIdentifier(resourceInstanceIdentifier) +{ + Trace() << "Starting synchronizer: " << resourceType << resourceInstanceIdentifier; + +} + +void Synchronizer::setup(const std::function &enqueueCommandCallback) +{ + mEnqueue = enqueueCommandCallback; +} + +void Synchronizer::enqueueCommand(int commandId, const QByteArray &data) +{ + Q_ASSERT(mEnqueue); + mEnqueue(commandId, data); +} + +EntityStore &Synchronizer::store() +{ + if (!mEntityStore) { + mEntityStore = QSharedPointer::create(mResourceType, mResourceInstanceIdentifier, mTransaction); + } + return *mEntityStore; +} + +SyncStore &Synchronizer::syncStore() +{ + if (!mSyncStore) { + mSyncStore = QSharedPointer::create(mSyncTransaction); + } + return *mSyncStore; +} + +void Synchronizer::createEntity(const QByteArray &sinkId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function callback) { // These changes are coming from the source @@ -616,7 +610,7 @@ void GenericResource::createEntity(const QByteArray &sinkId, const QByteArray &b callback(BufferUtils::extractBuffer(fbb)); } -void GenericResource::modifyEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, +void Synchronizer::modifyEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function callback) { // These changes are coming from the source @@ -634,7 +628,7 @@ void GenericResource::modifyEntity(const QByteArray &sinkId, qint64 revision, co callback(BufferUtils::extractBuffer(fbb)); } -void GenericResource::deleteEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, std::function callback) +void Synchronizer::deleteEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, std::function callback) { // These changes are coming from the source const auto replayToSource = false; @@ -647,96 +641,36 @@ void GenericResource::deleteEntity(const QByteArray &sinkId, qint64 revision, co callback(BufferUtils::extractBuffer(fbb)); } -void GenericResource::recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction) -{ - Index("rid.mapping." + bufferType, transaction).add(remoteId, localId); - ; - Index("localid.mapping." + bufferType, transaction).add(localId, remoteId); -} - -void GenericResource::removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction) -{ - Index("rid.mapping." + bufferType, transaction).remove(remoteId, localId); - Index("localid.mapping." + bufferType, transaction).remove(localId, remoteId); -} - -void GenericResource::updateRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction) -{ - const auto oldRemoteId = Index("localid.mapping." + bufferType, transaction).lookup(localId); - removeRemoteId(bufferType, localId, oldRemoteId, transaction); - recordRemoteId(bufferType, localId, remoteId, transaction); -} - -QByteArray GenericResource::resolveRemoteId(const QByteArray &bufferType, const QByteArray &remoteId, Sink::Storage::Transaction &transaction) -{ - // Lookup local id for remote id, or insert a new pair otherwise - Index index("rid.mapping." + bufferType, transaction); - QByteArray sinkId = index.lookup(remoteId); - if (sinkId.isEmpty()) { - sinkId = QUuid::createUuid().toString().toUtf8(); - index.add(remoteId, sinkId); - Index("localid.mapping." + bufferType, transaction).add(sinkId, remoteId); - } - return sinkId; -} - -QByteArray GenericResource::resolveLocalId(const QByteArray &bufferType, const QByteArray &localId, Sink::Storage::Transaction &transaction) -{ - QByteArray remoteId = Index("localid.mapping." + bufferType, transaction).lookup(localId); - if (remoteId.isEmpty()) { - Warning() << "Couldn't find the remote id for " << localId; - return QByteArray(); - } - return remoteId; -} - -void GenericResource::scanForRemovals(Sink::Storage::Transaction &transaction, Sink::Storage::Transaction &synchronizationTransaction, const QByteArray &bufferType, - const std::function &callback)> &entryGenerator, std::function exists) +void Synchronizer::scanForRemovals(const QByteArray &bufferType, const std::function &callback)> &entryGenerator, std::function exists) { - entryGenerator([this, &transaction, bufferType, &synchronizationTransaction, &exists](const QByteArray &key) { + entryGenerator([this, bufferType, &exists](const QByteArray &key) { auto sinkId = Sink::Storage::uidFromKey(key); Trace() << "Checking for removal " << key; - const auto remoteId = resolveLocalId(bufferType, sinkId, synchronizationTransaction); + const auto remoteId = syncStore().resolveLocalId(bufferType, sinkId); // If we have no remoteId, the entity hasn't been replayed to the source yet if (!remoteId.isEmpty()) { if (!exists(remoteId)) { Trace() << "Found a removed entity: " << sinkId; - deleteEntity(sinkId, Sink::Storage::maxRevision(transaction), bufferType, - [this](const QByteArray &buffer) { enqueueCommand(mSynchronizerQueue, Sink::Commands::DeleteEntityCommand, buffer); }); + deleteEntity(sinkId, Sink::Storage::maxRevision(mTransaction), bufferType, + [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::DeleteEntityCommand, buffer); }); } } }); } -QSharedPointer GenericResource::getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory) -{ - QSharedPointer current; - db.findLatest(uid, - [¤t, &adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { - Sink::EntityBuffer buffer(const_cast(data.data()), data.size()); - if (!buffer.isValid()) { - Warning() << "Read invalid buffer from disk"; - } else { - current = adaptorFactory.createAdaptor(buffer.entity()); - } - return false; - }, - [](const Sink::Storage::Error &error) { Warning() << "Failed to read current value from storage: " << error.message; }); - return current; -} - -void GenericResource::createOrModify(Sink::Storage::Transaction &transaction, Sink::Storage::Transaction &synchronizationTransaction, - DomainTypeAdaptorFactoryInterface &adaptorFactory, const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity) +void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity) { - auto mainDatabase = Storage::mainDatabase(transaction, bufferType); - const auto sinkId = resolveRemoteId(bufferType, remoteId, synchronizationTransaction); + Trace() << "Create or modify" << bufferType << remoteId; + auto mainDatabase = Storage::mainDatabase(mTransaction, bufferType); + const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId); const auto found = mainDatabase.contains(sinkId); + auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(mResourceType, bufferType); if (!found) { Trace() << "Found a new entity: " << remoteId; createEntity( - sinkId, bufferType, entity, adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(mSynchronizerQueue, Sink::Commands::CreateEntityCommand, buffer); }); + sinkId, bufferType, entity, *adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::CreateEntityCommand, buffer); }); } else { // modification - if (auto current = getLatest(mainDatabase, sinkId, adaptorFactory)) { + if (auto current = store().getLatest(mainDatabase, sinkId, *adaptorFactory)) { bool changed = false; for (const auto &property : entity.changedProperties()) { if (entity.getProperty(property) != current->getProperty(property)) { @@ -746,8 +680,8 @@ void GenericResource::createOrModify(Sink::Storage::Transaction &transaction, Si } if (changed) { Trace() << "Found a modified entity: " << remoteId; - modifyEntity(sinkId, Sink::Storage::maxRevision(transaction), bufferType, entity, adaptorFactory, - [this](const QByteArray &buffer) { enqueueCommand(mSynchronizerQueue, Sink::Commands::ModifyEntityCommand, buffer); }); + modifyEntity(sinkId, Sink::Storage::maxRevision(mTransaction), bufferType, entity, *adaptorFactory, + [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::ModifyEntityCommand, buffer); }); } } else { Warning() << "Failed to get current entity"; @@ -755,6 +689,118 @@ void GenericResource::createOrModify(Sink::Storage::Transaction &transaction, Si } } +KAsync::Job Synchronizer::synchronize() +{ + mTransaction = mStorage.createTransaction(Sink::Storage::ReadOnly); + mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::ReadWrite); + return synchronizeWithSource().then([this]() { + mTransaction.abort(); + mSyncTransaction.commit(); + mSyncStore.clear(); + mEntityStore.clear(); + }); +} + + + +SourceWriteBack::SourceWriteBack(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier) + : ChangeReplay(resourceInstanceIdentifier), + mSyncStorage(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite), + mResourceType(resourceType), + mResourceInstanceIdentifier(resourceInstanceIdentifier) +{ + +} + +EntityStore &SourceWriteBack::store() +{ + if (!mEntityStore) { + mEntityStore = QSharedPointer::create(mResourceType, mResourceInstanceIdentifier, mTransaction); + } + return *mEntityStore; +} + +SyncStore &SourceWriteBack::syncStore() +{ + if (!mSyncStore) { + mSyncStore = QSharedPointer::create(mSyncTransaction); + } + return *mSyncStore; +} + +KAsync::Job SourceWriteBack::replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) +{ + mTransaction = mStorage.createTransaction(Sink::Storage::ReadOnly); + mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::ReadWrite); + + Sink::EntityBuffer buffer(value); + const Sink::Entity &entity = buffer.entity(); + const auto metadataBuffer = Sink::EntityBuffer::readBuffer(entity.metadata()); + Q_ASSERT(metadataBuffer); + if (!metadataBuffer->replayToSource()) { + Trace() << "Change is coming from the source"; + return KAsync::null(); + } + const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; + const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation; + const auto uid = Sink::Storage::uidFromKey(key); + QByteArray oldRemoteId; + + if (operation != Sink::Operation_Creation) { + oldRemoteId = syncStore().resolveLocalId(type, uid); + } + Trace() << "Replaying " << key << type; + + KAsync::Job job = KAsync::null(); + if (type == ENTITY_TYPE_FOLDER) { + auto folder = store().read(uid); + // const Sink::ApplicationDomain::Folder folder(mResourceInstanceIdentifier, uid, revision, mAdaptorFactories.value(type)->createAdaptor(entity)); + job = replay(folder, operation, oldRemoteId); + } else if (type == ENTITY_TYPE_MAIL) { + auto mail = store().read(uid); + // const Sink::ApplicationDomain::Mail mail(mResourceInstanceIdentifier, uid, revision, mAdaptorFactories.value(type)->createAdaptor(entity)); + job = replay(mail, operation, oldRemoteId); + } + + return job.then([this, operation, type, uid](const QByteArray &remoteId) { + Trace() << "Replayed change with remote id: " << remoteId; + if (operation == Sink::Operation_Creation) { + if (remoteId.isEmpty()) { + Warning() << "Returned an empty remoteId from the creation"; + } else { + syncStore().recordRemoteId(type, uid, remoteId); + } + } else if (operation == Sink::Operation_Modification) { + if (remoteId.isEmpty()) { + Warning() << "Returned an empty remoteId from the creation"; + } else { + syncStore().updateRemoteId(type, uid, remoteId); + } + } else if (operation == Sink::Operation_Removal) { + syncStore().removeRemoteId(type, uid, remoteId); + } else { + Warning() << "Unkown operation" << operation; + } + + mTransaction.abort(); + mSyncTransaction.commit(); + mSyncStore.clear(); + mEntityStore.clear(); + }, [](int errorCode, const QString &errorMessage) { + Warning() << "Failed to replay change: " << errorMessage; + }); +} + +KAsync::Job SourceWriteBack::replay(const ApplicationDomain::Mail &, Sink::Operation, const QByteArray &) +{ + return KAsync::null(); +} + +KAsync::Job SourceWriteBack::replay(const ApplicationDomain::Folder &, Sink::Operation, const QByteArray &) +{ + return KAsync::null(); +} + #pragma clang diagnostic push #pragma clang diagnostic ignored "-Wundefined-reinterpret-cast" diff --git a/common/genericresource.h b/common/genericresource.h index c551e29..45d5d3a 100644 --- a/common/genericresource.h +++ b/common/genericresource.h @@ -24,14 +24,16 @@ #include #include #include +#include "changereplay.h" + #include class CommandProcessor; -class ChangeReplay; namespace Sink { class Pipeline; class Preprocessor; +class Synchronizer; /** * Generic Resource implementation. @@ -39,7 +41,7 @@ class Preprocessor; class SINK_EXPORT GenericResource : public Resource { public: - GenericResource(const QByteArray &resourceInstanceIdentifier, const QSharedPointer &pipeline = QSharedPointer()); + GenericResource(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, const QSharedPointer &pipeline, const QSharedPointer &changeReplay, const QSharedPointer &synchronizer); virtual ~GenericResource(); virtual void processCommand(int commandId, const QByteArray &data) Q_DECL_OVERRIDE; @@ -64,41 +66,90 @@ protected: void addType(const QByteArray &type, DomainTypeAdaptorFactoryInterface::Ptr factory, const QVector &preprocessors); - ///Base implementation call the replay$Type calls - virtual KAsync::Job replay(Sink::Storage &synchronizationStore, const QByteArray &type, const QByteArray &key, const QByteArray &value); - ///Implement to write back changes to the server - virtual KAsync::Job replay(const Sink::ApplicationDomain::Mail &, Sink::Operation, const QByteArray &oldRemoteId); - virtual KAsync::Job replay(const Sink::ApplicationDomain::Folder &, Sink::Operation, const QByteArray &oldRemoteId); - void onProcessorError(int errorCode, const QString &errorMessage); void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data); - static void createEntity(const QByteArray &localId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, - DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function callback); - static void modifyEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, - DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function callback); - static void deleteEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType, std::function callback); + MessageQueue mUserQueue; + MessageQueue mSynchronizerQueue; + QByteArray mResourceType; + QByteArray mResourceInstanceIdentifier; + QSharedPointer mPipeline; + +private: + CommandProcessor *mProcessor; + QSharedPointer mChangeReplay; + QSharedPointer mSynchronizer; + int mError; + QTimer mCommitQueueTimer; + qint64 mClientLowerBoundRevision; + QHash mAdaptorFactories; +}; + +class SINK_EXPORT SyncStore +{ +public: + SyncStore(Sink::Storage::Transaction &); /** * Records a localId to remoteId mapping */ - void recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction); - void removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction); - void updateRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction); + void recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId); + void removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId); + void updateRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId); /** * Tries to find a local id for the remote id, and creates a new local id otherwise. * * The new local id is recorded in the local to remote id mapping. */ - QByteArray resolveRemoteId(const QByteArray &type, const QByteArray &remoteId, Sink::Storage::Transaction &transaction); + QByteArray resolveRemoteId(const QByteArray &type, const QByteArray &remoteId); /** * Tries to find a remote id for a local id. * * This can fail if the entity hasn't been written back to the server yet. */ - QByteArray resolveLocalId(const QByteArray &bufferType, const QByteArray &localId, Sink::Storage::Transaction &transaction); + QByteArray resolveLocalId(const QByteArray &bufferType, const QByteArray &localId); + +private: + Sink::Storage::Transaction &mTransaction; +}; + +class SINK_EXPORT EntityStore +{ +public: + EntityStore(const QByteArray &resourceType, const QByteArray &mResourceInstanceIdentifier, Sink::Storage::Transaction &transaction); + + template + T read(const QByteArray &identifier) const; + + static QSharedPointer getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory); +private: + QByteArray mResourceType; + QByteArray mResourceInstanceIdentifier; + Sink::Storage::Transaction &mTransaction; +}; + +/** + * Synchronize and add what we don't already have to local queue + */ +class SINK_EXPORT Synchronizer +{ +public: + Synchronizer(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier); + + void setup(const std::function &enqueueCommandCallback); + KAsync::Job synchronize(); + +protected: + ///Calls the callback to enqueue the command + void enqueueCommand(int commandId, const QByteArray &data); + + static void createEntity(const QByteArray &localId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, + DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function callback); + static void modifyEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, + DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function callback); + static void deleteEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType, std::function callback); /** * A synchronous algorithm to remove entities that are no longer existing. @@ -110,7 +161,7 @@ protected: * * All functions are called synchronously, and both @param entryGenerator and @param exists need to be synchronous. */ - void scanForRemovals(Sink::Storage::Transaction &transaction, Sink::Storage::Transaction &synchronizationTransaction, const QByteArray &bufferType, + void scanForRemovals(const QByteArray &bufferType, const std::function &callback)> &entryGenerator, std::function exists); /** @@ -118,22 +169,60 @@ protected: * * Depending on whether the entity is locally available, or has changed. */ - void createOrModify(Sink::Storage::Transaction &transaction, Sink::Storage::Transaction &synchronizationTransaction, DomainTypeAdaptorFactoryInterface &adaptorFactory, - const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity); + void createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity); - static QSharedPointer getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory); + //Read only access to main storage + EntityStore &store(); - MessageQueue mUserQueue; - MessageQueue mSynchronizerQueue; + //Read/Write access to sync storage + SyncStore &syncStore(); + + virtual KAsync::Job synchronizeWithSource() = 0; + +private: + QSharedPointer mSyncStore; + QSharedPointer mEntityStore; + Sink::Storage mStorage; + Sink::Storage mSyncStorage; + QByteArray mResourceType; QByteArray mResourceInstanceIdentifier; - QSharedPointer mPipeline; + Sink::Storage::Transaction mTransaction; + Sink::Storage::Transaction mSyncTransaction; + std::function mEnqueue; +}; + +/** + * Replay changes to the source + */ +class SINK_EXPORT SourceWriteBack : public ChangeReplay +{ +public: + SourceWriteBack(const QByteArray &resourceType,const QByteArray &resourceInstanceIdentifier); + +protected: + ///Base implementation calls the replay$Type calls + virtual KAsync::Job replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE; + +protected: + ///Implement to write back changes to the server + virtual KAsync::Job replay(const Sink::ApplicationDomain::Mail &, Sink::Operation, const QByteArray &oldRemoteId); + virtual KAsync::Job replay(const Sink::ApplicationDomain::Folder &, Sink::Operation, const QByteArray &oldRemoteId); + + //Read only access to main storage + EntityStore &store(); + + //Read/Write access to sync storage + SyncStore &syncStore(); private: - CommandProcessor *mProcessor; - ChangeReplay *mSourceChangeReplay; - int mError; - QTimer mCommitQueueTimer; - qint64 mClientLowerBoundRevision; - QHash mAdaptorFactories; + Sink::Storage mSyncStorage; + QSharedPointer mSyncStore; + QSharedPointer mEntityStore; + Sink::Storage::Transaction mTransaction; + Sink::Storage::Transaction mSyncTransaction; + QByteArray mResourceType; + QByteArray mResourceInstanceIdentifier; }; + + } diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 637a1b8..7863f67 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp @@ -34,6 +34,7 @@ #include "entitybuffer.h" #include "log.h" #include "domain/applicationdomaintype.h" +#include "adaptorfactoryregistry.h" #include "definitions.h" #include "bufferutils.h" @@ -52,11 +53,11 @@ public: Storage storage; Storage::Transaction transaction; QHash> processors; - QHash adaptorFactory; bool revisionChanged; void storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid); QTime transactionTime; int transactionItemCount; + QByteArray resourceType; }; void Pipeline::Private::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid) @@ -84,9 +85,9 @@ void Pipeline::setPreprocessors(const QString &entityType, const QVectorprocessors[entityType] = processors; } -void Pipeline::setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFactoryInterface::Ptr factory) +void Pipeline::setResourceType(const QByteArray &resourceType) { - d->adaptorFactory.insert(entityType, factory); + d->resourceType = resourceType; } void Pipeline::startTransaction() @@ -102,7 +103,9 @@ void Pipeline::startTransaction() Trace() << "Starting transaction."; d->transactionTime.start(); d->transactionItemCount = 0; - d->transaction = std::move(storage().createTransaction(Storage::ReadWrite)); + d->transaction = std::move(storage().createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) { + Warning() << error.message; + })); } void Pipeline::commit() @@ -189,7 +192,7 @@ KAsync::Job Pipeline::newEntity(void const *command, size_t size) auto metadataBuffer = metadataBuilder.Finish(); FinishMetadataBuffer(metadataFbb, metadataBuffer); - auto adaptorFactory = d->adaptorFactory.value(bufferType); + auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType); if (!adaptorFactory) { Warning() << "no adaptor factory for type " << bufferType; return KAsync::error(0); @@ -244,7 +247,7 @@ KAsync::Job Pipeline::modifiedEntity(void const *command, size_t size) } // TODO use only readPropertyMapper and writePropertyMapper - auto adaptorFactory = d->adaptorFactory.value(bufferType); + auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType); if (!adaptorFactory) { Warning() << "no adaptor factory for type " << bufferType; return KAsync::error(0); @@ -373,7 +376,7 @@ KAsync::Job Pipeline::deletedEntity(void const *command, size_t size) flatbuffers::FlatBufferBuilder fbb; EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), 0, 0, 0, 0); - auto adaptorFactory = d->adaptorFactory.value(bufferType); + auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType); if (!adaptorFactory) { Warning() << "no adaptor factory for type " << bufferType; return KAsync::error(0); diff --git a/common/pipeline.h b/common/pipeline.h index c65cbfd..2ca87a4 100644 --- a/common/pipeline.h +++ b/common/pipeline.h @@ -46,13 +46,12 @@ public: Storage &storage() const; + void setResourceType(const QByteArray &resourceType); void setPreprocessors(const QString &entityType, const QVector &preprocessors); void startTransaction(); void commit(); Storage::Transaction &transaction(); - void setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFactoryInterface::Ptr factory); - KAsync::Job newEntity(void const *command, size_t size); KAsync::Job modifiedEntity(void const *command, size_t size); KAsync::Job deletedEntity(void const *command, size_t size); diff --git a/common/resource.cpp b/common/resource.cpp index 6713686..82c9fc8 100644 --- a/common/resource.cpp +++ b/common/resource.cpp @@ -26,6 +26,7 @@ #include #include "facadefactory.h" +#include "adaptorfactoryregistry.h" namespace Sink { @@ -110,6 +111,7 @@ ResourceFactory *ResourceFactory::load(const QString &resourceName) if (factory) { Private::s_loadedFactories.insert(resourceName, factory); factory->registerFacades(FacadeFactory::instance()); + factory->registerAdaptorFactories(AdaptorFactoryRegistry::instance()); // TODO: if we need more data on it const QJsonObject json = loader.metaData()[QStringLiteral("MetaData")].toObject(); return factory; } else { diff --git a/common/resource.h b/common/resource.h index 0e7cd11..d6c3c5f 100644 --- a/common/resource.h +++ b/common/resource.h @@ -26,6 +26,7 @@ namespace Sink { class FacadeFactory; +class AdaptorFactoryRegistry; /** * Resource interface @@ -81,6 +82,7 @@ public: virtual Resource *createResource(const QByteArray &instanceIdentifier) = 0; virtual void registerFacades(FacadeFactory &factory) = 0; + virtual void registerAdaptorFactories(AdaptorFactoryRegistry ®istry) {}; private: class Private; -- cgit v1.2.3