From b441386c4e138d19bbd79d578e0a2ff1b3f54a93 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Sat, 28 May 2016 02:09:58 +0200 Subject: Moved the classes to individual files --- common/synchronizer.cpp | 176 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 176 insertions(+) create mode 100644 common/synchronizer.cpp (limited to 'common/synchronizer.cpp') diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp new file mode 100644 index 0000000..fb0baaa --- /dev/null +++ b/common/synchronizer.cpp @@ -0,0 +1,176 @@ +/* + * Copyright (C) 2016 Christian Mollekopf + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) version 3, or any + * later version accepted by the membership of KDE e.V. (or its + * successor approved by the membership of KDE e.V.), which shall + * act as a proxy defined in Section 6 of version 3 of the license. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library. If not, see . + */ +#include "synchronizer.h" + +#include "definitions.h" +#include "commands.h" +#include "bufferutils.h" +#include "entitystore.h" +#include "remoteidmap.h" +#include "adaptorfactoryregistry.h" +#include "createentity_generated.h" +#include "modifyentity_generated.h" +#include "deleteentity_generated.h" + +using namespace Sink; + +Synchronizer::Synchronizer(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier) + : mStorage(Sink::storageLocation(), resourceInstanceIdentifier, Sink::Storage::ReadOnly), + mSyncStorage(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite), + mResourceType(resourceType), + mResourceInstanceIdentifier(resourceInstanceIdentifier) +{ + Trace() << "Starting synchronizer: " << resourceType << resourceInstanceIdentifier; +} + +void Synchronizer::setup(const std::function &enqueueCommandCallback) +{ + mEnqueue = enqueueCommandCallback; +} + +void Synchronizer::enqueueCommand(int commandId, const QByteArray &data) +{ + Q_ASSERT(mEnqueue); + mEnqueue(commandId, data); +} + +EntityStore &Synchronizer::store() +{ + if (!mEntityStore) { + mEntityStore = QSharedPointer::create(mResourceType, mResourceInstanceIdentifier, mTransaction); + } + return *mEntityStore; +} + +RemoteIdMap &Synchronizer::syncStore() +{ + if (!mSyncStore) { + mSyncStore = QSharedPointer::create(mSyncTransaction); + } + return *mSyncStore; +} + +void Synchronizer::createEntity(const QByteArray &sinkId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, + DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function callback) +{ + // These changes are coming from the source + const auto replayToSource = false; + flatbuffers::FlatBufferBuilder entityFbb; + adaptorFactory.createBuffer(domainObject, entityFbb); + flatbuffers::FlatBufferBuilder fbb; + // This is the resource type and not the domain type + auto entityId = fbb.CreateString(sinkId.toStdString()); + auto type = fbb.CreateString(bufferType.toStdString()); + auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize()); + auto location = Sink::Commands::CreateCreateEntity(fbb, entityId, type, delta, replayToSource); + Sink::Commands::FinishCreateEntityBuffer(fbb, location); + callback(BufferUtils::extractBuffer(fbb)); +} + +void Synchronizer::modifyEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, + DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function callback) +{ + // These changes are coming from the source + const auto replayToSource = false; + flatbuffers::FlatBufferBuilder entityFbb; + adaptorFactory.createBuffer(domainObject, entityFbb); + flatbuffers::FlatBufferBuilder fbb; + auto entityId = fbb.CreateString(sinkId.toStdString()); + // This is the resource type and not the domain type + auto type = fbb.CreateString(bufferType.toStdString()); + auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize()); + // FIXME removals + auto location = Sink::Commands::CreateModifyEntity(fbb, revision, entityId, 0, type, delta, replayToSource); + Sink::Commands::FinishModifyEntityBuffer(fbb, location); + callback(BufferUtils::extractBuffer(fbb)); +} + +void Synchronizer::deleteEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, std::function callback) +{ + // These changes are coming from the source + const auto replayToSource = false; + flatbuffers::FlatBufferBuilder fbb; + auto entityId = fbb.CreateString(sinkId.toStdString()); + // This is the resource type and not the domain type + auto type = fbb.CreateString(bufferType.toStdString()); + auto location = Sink::Commands::CreateDeleteEntity(fbb, revision, entityId, type, replayToSource); + Sink::Commands::FinishDeleteEntityBuffer(fbb, location); + callback(BufferUtils::extractBuffer(fbb)); +} + +void Synchronizer::scanForRemovals(const QByteArray &bufferType, const std::function &callback)> &entryGenerator, std::function exists) +{ + entryGenerator([this, bufferType, &exists](const QByteArray &key) { + auto sinkId = Sink::Storage::uidFromKey(key); + Trace() << "Checking for removal " << key; + const auto remoteId = syncStore().resolveLocalId(bufferType, sinkId); + // If we have no remoteId, the entity hasn't been replayed to the source yet + if (!remoteId.isEmpty()) { + if (!exists(remoteId)) { + Trace() << "Found a removed entity: " << sinkId; + deleteEntity(sinkId, Sink::Storage::maxRevision(mTransaction), bufferType, + [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::DeleteEntityCommand, buffer); }); + } + } + }); +} + +void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity) +{ + Trace() << "Create or modify" << bufferType << remoteId; + auto mainDatabase = Storage::mainDatabase(mTransaction, bufferType); + const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId); + const auto found = mainDatabase.contains(sinkId); + auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(mResourceType, bufferType); + if (!found) { + Trace() << "Found a new entity: " << remoteId; + createEntity( + sinkId, bufferType, entity, *adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::CreateEntityCommand, buffer); }); + } else { // modification + if (auto current = store().getLatest(mainDatabase, sinkId, *adaptorFactory)) { + bool changed = false; + for (const auto &property : entity.changedProperties()) { + if (entity.getProperty(property) != current->getProperty(property)) { + Trace() << "Property changed " << sinkId << property; + changed = true; + } + } + if (changed) { + Trace() << "Found a modified entity: " << remoteId; + modifyEntity(sinkId, Sink::Storage::maxRevision(mTransaction), bufferType, entity, *adaptorFactory, + [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::ModifyEntityCommand, buffer); }); + } + } else { + Warning() << "Failed to get current entity"; + } + } +} + +KAsync::Job Synchronizer::synchronize() +{ + mTransaction = mStorage.createTransaction(Sink::Storage::ReadOnly); + mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::ReadWrite); + return synchronizeWithSource().then([this]() { + mTransaction.abort(); + mSyncTransaction.commit(); + mSyncStore.clear(); + mEntityStore.clear(); + }); +} -- cgit v1.2.3