From b441386c4e138d19bbd79d578e0a2ff1b3f54a93 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Sat, 28 May 2016 02:09:58 +0200 Subject: Moved the classes to individual files --- common/genericresource.cpp | 342 +-------------------------------------------- 1 file changed, 1 insertion(+), 341 deletions(-) (limited to 'common/genericresource.cpp') diff --git a/common/genericresource.cpp b/common/genericresource.cpp index cb2ef21..568e066 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp @@ -34,6 +34,7 @@ #include "definitions.h" #include "bufferutils.h" #include "adaptorfactoryregistry.h" +#include "synchronizer.h" #include #include @@ -461,347 +462,6 @@ void GenericResource::setLowerBoundRevision(qint64 revision) updateLowerBoundRevision(); } - - - -EntityStore::EntityStore(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, Sink::Storage::Transaction &transaction) - : mResourceType(resourceType), mResourceInstanceIdentifier(resourceInstanceIdentifier), - mTransaction(transaction) -{ - -} - -template -T EntityStore::read(const QByteArray &identifier) const -{ - auto typeName = ApplicationDomain::getTypeName(); - auto mainDatabase = Storage::mainDatabase(mTransaction, typeName); - auto bufferAdaptor = getLatest(mainDatabase, identifier, *Sink::AdaptorFactoryRegistry::instance().getFactory(mResourceType)); - Q_ASSERT(bufferAdaptor); - return T(mResourceInstanceIdentifier, identifier, 0, bufferAdaptor); -} - -QSharedPointer EntityStore::getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory) -{ - QSharedPointer current; - db.findLatest(uid, - [¤t, &adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { - Sink::EntityBuffer buffer(const_cast(data.data()), data.size()); - if (!buffer.isValid()) { - Warning() << "Read invalid buffer from disk"; - } else { - Trace() << "Found value " << key; - current = adaptorFactory.createAdaptor(buffer.entity()); - } - return false; - }, - [](const Sink::Storage::Error &error) { Warning() << "Failed to read current value from storage: " << error.message; }); - return current; -} - - - -SyncStore::SyncStore(Sink::Storage::Transaction &transaction) - : mTransaction(transaction) -{ - -} - -void SyncStore::recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId) -{ - Index("rid.mapping." + bufferType, mTransaction).add(remoteId, localId); - Index("localid.mapping." + bufferType, mTransaction).add(localId, remoteId); -} - -void SyncStore::removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId) -{ - Index("rid.mapping." + bufferType, mTransaction).remove(remoteId, localId); - Index("localid.mapping." + bufferType, mTransaction).remove(localId, remoteId); -} - -void SyncStore::updateRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId) -{ - const auto oldRemoteId = Index("localid.mapping." + bufferType, mTransaction).lookup(localId); - removeRemoteId(bufferType, localId, oldRemoteId); - recordRemoteId(bufferType, localId, remoteId); -} - -QByteArray SyncStore::resolveRemoteId(const QByteArray &bufferType, const QByteArray &remoteId) -{ - // Lookup local id for remote id, or insert a new pair otherwise - Index index("rid.mapping." + bufferType, mTransaction); - QByteArray sinkId = index.lookup(remoteId); - if (sinkId.isEmpty()) { - sinkId = QUuid::createUuid().toString().toUtf8(); - index.add(remoteId, sinkId); - Index("localid.mapping." + bufferType, mTransaction).add(sinkId, remoteId); - } - return sinkId; -} - -QByteArray SyncStore::resolveLocalId(const QByteArray &bufferType, const QByteArray &localId) -{ - QByteArray remoteId = Index("localid.mapping." + bufferType, mTransaction).lookup(localId); - if (remoteId.isEmpty()) { - Warning() << "Couldn't find the remote id for " << localId; - return QByteArray(); - } - return remoteId; -} - - - - - - - - -Synchronizer::Synchronizer(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier) - : mStorage(Sink::storageLocation(), resourceInstanceIdentifier, Sink::Storage::ReadOnly), - mSyncStorage(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite), - mResourceType(resourceType), - mResourceInstanceIdentifier(resourceInstanceIdentifier) -{ - Trace() << "Starting synchronizer: " << resourceType << resourceInstanceIdentifier; - -} - -void Synchronizer::setup(const std::function &enqueueCommandCallback) -{ - mEnqueue = enqueueCommandCallback; -} - -void Synchronizer::enqueueCommand(int commandId, const QByteArray &data) -{ - Q_ASSERT(mEnqueue); - mEnqueue(commandId, data); -} - -EntityStore &Synchronizer::store() -{ - if (!mEntityStore) { - mEntityStore = QSharedPointer::create(mResourceType, mResourceInstanceIdentifier, mTransaction); - } - return *mEntityStore; -} - -SyncStore &Synchronizer::syncStore() -{ - if (!mSyncStore) { - mSyncStore = QSharedPointer::create(mSyncTransaction); - } - return *mSyncStore; -} - -void Synchronizer::createEntity(const QByteArray &sinkId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, - DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function callback) -{ - // These changes are coming from the source - const auto replayToSource = false; - flatbuffers::FlatBufferBuilder entityFbb; - adaptorFactory.createBuffer(domainObject, entityFbb); - flatbuffers::FlatBufferBuilder fbb; - // This is the resource type and not the domain type - auto entityId = fbb.CreateString(sinkId.toStdString()); - auto type = fbb.CreateString(bufferType.toStdString()); - auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize()); - auto location = Sink::Commands::CreateCreateEntity(fbb, entityId, type, delta, replayToSource); - Sink::Commands::FinishCreateEntityBuffer(fbb, location); - callback(BufferUtils::extractBuffer(fbb)); -} - -void Synchronizer::modifyEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, - DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function callback) -{ - // These changes are coming from the source - const auto replayToSource = false; - flatbuffers::FlatBufferBuilder entityFbb; - adaptorFactory.createBuffer(domainObject, entityFbb); - flatbuffers::FlatBufferBuilder fbb; - auto entityId = fbb.CreateString(sinkId.toStdString()); - // This is the resource type and not the domain type - auto type = fbb.CreateString(bufferType.toStdString()); - auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize()); - // TODO removals - auto location = Sink::Commands::CreateModifyEntity(fbb, revision, entityId, 0, type, delta, replayToSource); - Sink::Commands::FinishModifyEntityBuffer(fbb, location); - callback(BufferUtils::extractBuffer(fbb)); -} - -void Synchronizer::deleteEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, std::function callback) -{ - // These changes are coming from the source - const auto replayToSource = false; - flatbuffers::FlatBufferBuilder fbb; - auto entityId = fbb.CreateString(sinkId.toStdString()); - // This is the resource type and not the domain type - auto type = fbb.CreateString(bufferType.toStdString()); - auto location = Sink::Commands::CreateDeleteEntity(fbb, revision, entityId, type, replayToSource); - Sink::Commands::FinishDeleteEntityBuffer(fbb, location); - callback(BufferUtils::extractBuffer(fbb)); -} - -void Synchronizer::scanForRemovals(const QByteArray &bufferType, const std::function &callback)> &entryGenerator, std::function exists) -{ - entryGenerator([this, bufferType, &exists](const QByteArray &key) { - auto sinkId = Sink::Storage::uidFromKey(key); - Trace() << "Checking for removal " << key; - const auto remoteId = syncStore().resolveLocalId(bufferType, sinkId); - // If we have no remoteId, the entity hasn't been replayed to the source yet - if (!remoteId.isEmpty()) { - if (!exists(remoteId)) { - Trace() << "Found a removed entity: " << sinkId; - deleteEntity(sinkId, Sink::Storage::maxRevision(mTransaction), bufferType, - [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::DeleteEntityCommand, buffer); }); - } - } - }); -} - -void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity) -{ - Trace() << "Create or modify" << bufferType << remoteId; - auto mainDatabase = Storage::mainDatabase(mTransaction, bufferType); - const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId); - const auto found = mainDatabase.contains(sinkId); - auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(mResourceType, bufferType); - if (!found) { - Trace() << "Found a new entity: " << remoteId; - createEntity( - sinkId, bufferType, entity, *adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::CreateEntityCommand, buffer); }); - } else { // modification - if (auto current = store().getLatest(mainDatabase, sinkId, *adaptorFactory)) { - bool changed = false; - for (const auto &property : entity.changedProperties()) { - if (entity.getProperty(property) != current->getProperty(property)) { - Trace() << "Property changed " << sinkId << property; - changed = true; - } - } - if (changed) { - Trace() << "Found a modified entity: " << remoteId; - modifyEntity(sinkId, Sink::Storage::maxRevision(mTransaction), bufferType, entity, *adaptorFactory, - [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::ModifyEntityCommand, buffer); }); - } - } else { - Warning() << "Failed to get current entity"; - } - } -} - -KAsync::Job Synchronizer::synchronize() -{ - mTransaction = mStorage.createTransaction(Sink::Storage::ReadOnly); - mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::ReadWrite); - return synchronizeWithSource().then([this]() { - mTransaction.abort(); - mSyncTransaction.commit(); - mSyncStore.clear(); - mEntityStore.clear(); - }); -} - - - -SourceWriteBack::SourceWriteBack(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier) - : ChangeReplay(resourceInstanceIdentifier), - mSyncStorage(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite), - mResourceType(resourceType), - mResourceInstanceIdentifier(resourceInstanceIdentifier) -{ - -} - -EntityStore &SourceWriteBack::store() -{ - if (!mEntityStore) { - mEntityStore = QSharedPointer::create(mResourceType, mResourceInstanceIdentifier, mTransaction); - } - return *mEntityStore; -} - -SyncStore &SourceWriteBack::syncStore() -{ - if (!mSyncStore) { - mSyncStore = QSharedPointer::create(mSyncTransaction); - } - return *mSyncStore; -} - -KAsync::Job SourceWriteBack::replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) -{ - mTransaction = mStorage.createTransaction(Sink::Storage::ReadOnly); - mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::ReadWrite); - - Sink::EntityBuffer buffer(value); - const Sink::Entity &entity = buffer.entity(); - const auto metadataBuffer = Sink::EntityBuffer::readBuffer(entity.metadata()); - Q_ASSERT(metadataBuffer); - if (!metadataBuffer->replayToSource()) { - Trace() << "Change is coming from the source"; - return KAsync::null(); - } - const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; - const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation; - const auto uid = Sink::Storage::uidFromKey(key); - QByteArray oldRemoteId; - - if (operation != Sink::Operation_Creation) { - oldRemoteId = syncStore().resolveLocalId(type, uid); - } - Trace() << "Replaying " << key << type; - - KAsync::Job job = KAsync::null(); - if (type == ENTITY_TYPE_FOLDER) { - auto folder = store().read(uid); - // const Sink::ApplicationDomain::Folder folder(mResourceInstanceIdentifier, uid, revision, mAdaptorFactories.value(type)->createAdaptor(entity)); - job = replay(folder, operation, oldRemoteId); - } else if (type == ENTITY_TYPE_MAIL) { - auto mail = store().read(uid); - // const Sink::ApplicationDomain::Mail mail(mResourceInstanceIdentifier, uid, revision, mAdaptorFactories.value(type)->createAdaptor(entity)); - job = replay(mail, operation, oldRemoteId); - } - - return job.then([this, operation, type, uid](const QByteArray &remoteId) { - Trace() << "Replayed change with remote id: " << remoteId; - if (operation == Sink::Operation_Creation) { - if (remoteId.isEmpty()) { - Warning() << "Returned an empty remoteId from the creation"; - } else { - syncStore().recordRemoteId(type, uid, remoteId); - } - } else if (operation == Sink::Operation_Modification) { - if (remoteId.isEmpty()) { - Warning() << "Returned an empty remoteId from the creation"; - } else { - syncStore().updateRemoteId(type, uid, remoteId); - } - } else if (operation == Sink::Operation_Removal) { - syncStore().removeRemoteId(type, uid, remoteId); - } else { - Warning() << "Unkown operation" << operation; - } - - mTransaction.abort(); - mSyncTransaction.commit(); - mSyncStore.clear(); - mEntityStore.clear(); - }, [](int errorCode, const QString &errorMessage) { - Warning() << "Failed to replay change: " << errorMessage; - }); -} - -KAsync::Job SourceWriteBack::replay(const ApplicationDomain::Mail &, Sink::Operation, const QByteArray &) -{ - return KAsync::null(); -} - -KAsync::Job SourceWriteBack::replay(const ApplicationDomain::Folder &, Sink::Operation, const QByteArray &) -{ - return KAsync::null(); -} - - #pragma clang diagnostic push #pragma clang diagnostic ignored "-Wundefined-reinterpret-cast" #include "genericresource.moc" -- cgit v1.2.3