/* * Copyright (C) 2016 Christian Mollekopf * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) version 3, or any * later version accepted by the membership of KDE e.V. (or its * successor approved by the membership of KDE e.V.), which shall * act as a proxy defined in Section 6 of version 3 of the license. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library. If not, see . */ #include "synchronizer.h" #include "definitions.h" #include "commands.h" #include "bufferutils.h" #include "entitystore.h" #include "remoteidmap.h" #include "adaptorfactoryregistry.h" #include "createentity_generated.h" #include "modifyentity_generated.h" #include "deleteentity_generated.h" using namespace Sink; Synchronizer::Synchronizer(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier) : mStorage(Sink::storageLocation(), resourceInstanceIdentifier, Sink::Storage::ReadOnly), mSyncStorage(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite), mResourceType(resourceType), mResourceInstanceIdentifier(resourceInstanceIdentifier) { Trace() << "Starting synchronizer: " << resourceType << resourceInstanceIdentifier; } void Synchronizer::setup(const std::function &enqueueCommandCallback) { mEnqueue = enqueueCommandCallback; } void Synchronizer::enqueueCommand(int commandId, const QByteArray &data) { Q_ASSERT(mEnqueue); mEnqueue(commandId, data); } EntityStore &Synchronizer::store() { if (!mEntityStore) { mEntityStore = QSharedPointer::create(mResourceType, mResourceInstanceIdentifier, mTransaction); } return *mEntityStore; } RemoteIdMap &Synchronizer::syncStore() { if (!mSyncStore) { mSyncStore = QSharedPointer::create(mSyncTransaction); } return *mSyncStore; } void Synchronizer::createEntity(const QByteArray &sinkId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function callback) { // These changes are coming from the source const auto replayToSource = false; flatbuffers::FlatBufferBuilder entityFbb; adaptorFactory.createBuffer(domainObject, entityFbb); flatbuffers::FlatBufferBuilder fbb; // This is the resource type and not the domain type auto entityId = fbb.CreateString(sinkId.toStdString()); auto type = fbb.CreateString(bufferType.toStdString()); auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize()); auto location = Sink::Commands::CreateCreateEntity(fbb, entityId, type, delta, replayToSource); Sink::Commands::FinishCreateEntityBuffer(fbb, location); callback(BufferUtils::extractBuffer(fbb)); } void Synchronizer::modifyEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function callback) { // These changes are coming from the source const auto replayToSource = false; flatbuffers::FlatBufferBuilder entityFbb; adaptorFactory.createBuffer(domainObject, entityFbb); flatbuffers::FlatBufferBuilder fbb; auto entityId = fbb.CreateString(sinkId.toStdString()); // This is the resource type and not the domain type auto type = fbb.CreateString(bufferType.toStdString()); auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize()); // FIXME removals auto location = Sink::Commands::CreateModifyEntity(fbb, revision, entityId, 0, type, delta, replayToSource); Sink::Commands::FinishModifyEntityBuffer(fbb, location); callback(BufferUtils::extractBuffer(fbb)); } void Synchronizer::deleteEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, std::function callback) { // These changes are coming from the source const auto replayToSource = false; flatbuffers::FlatBufferBuilder fbb; auto entityId = fbb.CreateString(sinkId.toStdString()); // This is the resource type and not the domain type auto type = fbb.CreateString(bufferType.toStdString()); auto location = Sink::Commands::CreateDeleteEntity(fbb, revision, entityId, type, replayToSource); Sink::Commands::FinishDeleteEntityBuffer(fbb, location); callback(BufferUtils::extractBuffer(fbb)); } void Synchronizer::scanForRemovals(const QByteArray &bufferType, const std::function &callback)> &entryGenerator, std::function exists) { entryGenerator([this, bufferType, &exists](const QByteArray &key) { auto sinkId = Sink::Storage::uidFromKey(key); Trace() << "Checking for removal " << key; const auto remoteId = syncStore().resolveLocalId(bufferType, sinkId); // If we have no remoteId, the entity hasn't been replayed to the source yet if (!remoteId.isEmpty()) { if (!exists(remoteId)) { Trace() << "Found a removed entity: " << sinkId; deleteEntity(sinkId, Sink::Storage::maxRevision(mTransaction), bufferType, [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::DeleteEntityCommand, buffer); }); } } }); } void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity) { Trace() << "Create or modify" << bufferType << remoteId; auto mainDatabase = Storage::mainDatabase(mTransaction, bufferType); const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId); const auto found = mainDatabase.contains(sinkId); auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(mResourceType, bufferType); if (!found) { Trace() << "Found a new entity: " << remoteId; createEntity( sinkId, bufferType, entity, *adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::CreateEntityCommand, buffer); }); } else { // modification if (auto current = store().getLatest(mainDatabase, sinkId, *adaptorFactory)) { bool changed = false; for (const auto &property : entity.changedProperties()) { if (entity.getProperty(property) != current->getProperty(property)) { Trace() << "Property changed " << sinkId << property; changed = true; } } if (changed) { Trace() << "Found a modified entity: " << remoteId; modifyEntity(sinkId, Sink::Storage::maxRevision(mTransaction), bufferType, entity, *adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::ModifyEntityCommand, buffer); }); } } else { Warning() << "Failed to get current entity"; } } } KAsync::Job Synchronizer::synchronize() { mTransaction = mStorage.createTransaction(Sink::Storage::ReadOnly); mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::ReadWrite); return synchronizeWithSource().then([this]() { mTransaction.abort(); mSyncTransaction.commit(); mSyncStore.clear(); mEntityStore.clear(); }); }