From 237b9ae4113e7a9f489632296941becb71afdb45 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Sun, 16 Oct 2016 14:55:20 +0200 Subject: Refactor how the storage is used. This is the initial refactoring to improve how we deal with the storage. It does a couple of things: * Rename Sink::Storage to Sink::Storage::DataStore to free up the Sink::Storage namespace * Introduce a Sink::ResourceContext to have a single object that can be passed around containing everything that is necessary to operate on a resource. This is a lot better than the multiple separate parameters that we used to pass around all over the place, while still allowing for dependency injection for tests. * Tie storage access together using the new EntityStore that directly works with ApplicationDomainTypes. This gives us a central place where main storage, indexes and buffer adaptors are tied together, which will also give us a place to implement external indexes, such as a fulltextindex using xapian. * Use ApplicationDomainTypes as the default way to pass around entities. Instead of using various ways to pass around entities (buffers, buffer adaptors, ApplicationDomainTypes), only use a single way. The old approach was confusing, and was only done as: * optimization; really shouldn't be necessary and otherwise I'm sure we can find better ways to optimize ApplicationDomainType itself. * a way to account for entities that have multiple buffers, a concept that I no longer deem relevant. While this commit does the bulk of the work to get there, the following commits will refactor more stuff to get things back to normal. --- common/synchronizer.cpp | 148 +++++++++++++++++++----------------------------- 1 file changed, 58 insertions(+), 90 deletions(-) (limited to 'common/synchronizer.cpp') diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp index 53db82f..5ddd77c 100644 --- a/common/synchronizer.cpp +++ b/common/synchronizer.cpp @@ -24,7 +24,7 @@ #include "bufferutils.h" #include "entitystore.h" #include "remoteidmap.h" -#include "adaptorfactoryregistry.h" +#include "entityreader.h" #include "createentity_generated.h" #include "modifyentity_generated.h" #include "deleteentity_generated.h" @@ -33,13 +33,12 @@ SINK_DEBUG_AREA("synchronizer") using namespace Sink; -Synchronizer::Synchronizer(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier) - : mStorage(Sink::storageLocation(), resourceInstanceIdentifier, Sink::Storage::ReadOnly), - mSyncStorage(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite), - mResourceType(resourceType), - mResourceInstanceIdentifier(resourceInstanceIdentifier) +Synchronizer::Synchronizer(const Sink::ResourceContext &context) + : mResourceContext(context), + mEntityStore(Storage::EntityStore::Ptr::create(mResourceContext)), + mSyncStorage(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::DataStore::ReadWrite) { - SinkTrace() << "Starting synchronizer: " << resourceType << resourceInstanceIdentifier; + SinkTrace() << "Starting synchronizer: " << mResourceContext.resourceType << mResourceContext.instanceId(); } Synchronizer::~Synchronizer() @@ -59,11 +58,9 @@ void Synchronizer::enqueueCommand(int commandId, const QByteArray &data) mEnqueue(commandId, data); } -EntityStore &Synchronizer::store() +Storage::EntityStore &Synchronizer::store() { - if (!mEntityStore) { - mEntityStore = QSharedPointer::create(mResourceType, mResourceInstanceIdentifier, transaction()); - } + mEntityStore->startTransaction(Sink::Storage::DataStore::ReadOnly); return *mEntityStore; } @@ -75,13 +72,12 @@ RemoteIdMap &Synchronizer::syncStore() return *mSyncStore; } -void Synchronizer::createEntity(const QByteArray &sinkId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, - DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function callback) +void Synchronizer::createEntity(const QByteArray &sinkId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject) { // These changes are coming from the source const auto replayToSource = false; flatbuffers::FlatBufferBuilder entityFbb; - adaptorFactory.createBuffer(domainObject, entityFbb); + mResourceContext.adaptorFactory(bufferType).createBuffer(domainObject, entityFbb); flatbuffers::FlatBufferBuilder fbb; // This is the resource type and not the domain type auto entityId = fbb.CreateString(sinkId.toStdString()); @@ -89,18 +85,17 @@ void Synchronizer::createEntity(const QByteArray &sinkId, const QByteArray &buff auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize()); auto location = Sink::Commands::CreateCreateEntity(fbb, entityId, type, delta, replayToSource); Sink::Commands::FinishCreateEntityBuffer(fbb, location); - callback(BufferUtils::extractBuffer(fbb)); + enqueueCommand(Sink::Commands::CreateEntityCommand, BufferUtils::extractBuffer(fbb)); } -void Synchronizer::modifyEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, - DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function callback) +void Synchronizer::modifyEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject) { // FIXME removals QByteArrayList deletedProperties; // These changes are coming from the source const auto replayToSource = false; flatbuffers::FlatBufferBuilder entityFbb; - adaptorFactory.createBuffer(domainObject, entityFbb); + mResourceContext.adaptorFactory(bufferType).createBuffer(domainObject, entityFbb); flatbuffers::FlatBufferBuilder fbb; auto entityId = fbb.CreateString(sinkId.toStdString()); auto modifiedProperties = BufferUtils::toVector(fbb, domainObject.changedProperties()); @@ -110,10 +105,10 @@ void Synchronizer::modifyEntity(const QByteArray &sinkId, qint64 revision, const auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize()); auto location = Sink::Commands::CreateModifyEntity(fbb, revision, entityId, deletions, type, delta, replayToSource, modifiedProperties); Sink::Commands::FinishModifyEntityBuffer(fbb, location); - callback(BufferUtils::extractBuffer(fbb)); + enqueueCommand(Sink::Commands::ModifyEntityCommand, BufferUtils::extractBuffer(fbb)); } -void Synchronizer::deleteEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, std::function callback) +void Synchronizer::deleteEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType) { // These changes are coming from the source const auto replayToSource = false; @@ -123,63 +118,69 @@ void Synchronizer::deleteEntity(const QByteArray &sinkId, qint64 revision, const auto type = fbb.CreateString(bufferType.toStdString()); auto location = Sink::Commands::CreateDeleteEntity(fbb, revision, entityId, type, replayToSource); Sink::Commands::FinishDeleteEntityBuffer(fbb, location); - callback(BufferUtils::extractBuffer(fbb)); + enqueueCommand(Sink::Commands::DeleteEntityCommand, BufferUtils::extractBuffer(fbb)); } void Synchronizer::scanForRemovals(const QByteArray &bufferType, const std::function &callback)> &entryGenerator, std::function exists) { - entryGenerator([this, bufferType, &exists](const QByteArray &key) { - auto sinkId = Sink::Storage::uidFromKey(key); + entryGenerator([this, bufferType, &exists](const QByteArray &sinkId) { const auto remoteId = syncStore().resolveLocalId(bufferType, sinkId); - SinkTrace() << "Checking for removal " << key << remoteId; + SinkTrace() << "Checking for removal " << sinkId << remoteId; // If we have no remoteId, the entity hasn't been replayed to the source yet if (!remoteId.isEmpty()) { if (!exists(remoteId)) { SinkTrace() << "Found a removed entity: " << sinkId; - deleteEntity(sinkId, Sink::Storage::maxRevision(transaction()), bufferType, - [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::DeleteEntityCommand, buffer); }); + deleteEntity(sinkId, mEntityStore->maxRevision(), bufferType); } } }); } -void Synchronizer::modify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity) +void Synchronizer::scanForRemovals(const QByteArray &bufferType, std::function exists) { - auto mainDatabase = Storage::mainDatabase(transaction(), bufferType); - const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId); - auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(mResourceType, bufferType); - Q_ASSERT(adaptorFactory); - qint64 retrievedRevision = 0; - if (auto current = EntityReaderUtils::getLatest(mainDatabase, sinkId, *adaptorFactory, retrievedRevision)) { + scanForRemovals(bufferType, + [this, &bufferType](const std::function &callback) { + store().readAllUids(bufferType, [callback](const QByteArray &uid) { + callback(uid); + }); + }, + exists + ); +} + +void Synchronizer::modifyIfChanged(Storage::EntityStore &store, const QByteArray &bufferType, const QByteArray &sinkId, const Sink::ApplicationDomain::ApplicationDomainType &entity) +{ + store.readLatest(bufferType, sinkId, [&, this](const Sink::ApplicationDomain::ApplicationDomainType ¤t) { bool changed = false; for (const auto &property : entity.changedProperties()) { - if (entity.getProperty(property) != current->getProperty(property)) { + if (entity.getProperty(property) != current.getProperty(property)) { SinkTrace() << "Property changed " << sinkId << property; changed = true; } } if (changed) { - SinkTrace() << "Found a modified entity: " << remoteId; - modifyEntity(sinkId, Sink::Storage::maxRevision(transaction()), bufferType, entity, *adaptorFactory, - [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::ModifyEntityCommand, buffer); }); + SinkTrace() << "Found a modified entity: " << sinkId; + modifyEntity(sinkId, store.maxRevision(), bufferType, entity); } - } else { - SinkWarning() << "Failed to get current entity"; - } + }); +} + +void Synchronizer::modify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity) +{ + const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId); + Storage::EntityStore store(mResourceContext); + modifyIfChanged(store, bufferType, sinkId, entity); } void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity) { SinkTrace() << "Create or modify" << bufferType << remoteId; - auto mainDatabase = Storage::mainDatabase(transaction(), bufferType); + Storage::EntityStore store(mResourceContext); const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId); - const auto found = mainDatabase.contains(sinkId); + const auto found = store.contains(bufferType, sinkId); if (!found) { SinkTrace() << "Found a new entity: " << remoteId; - auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(mResourceType, bufferType); - Q_ASSERT(adaptorFactory); - createEntity( - sinkId, bufferType, entity, *adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::CreateEntityCommand, buffer); }); + createEntity(sinkId, bufferType, entity); } else { // modification modify(bufferType, remoteId, entity); } @@ -190,10 +191,9 @@ void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray { SinkTrace() << "Create or modify" << bufferType << remoteId; - auto mainDatabase = Storage::mainDatabase(transaction(), bufferType); const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId); - const auto found = mainDatabase.contains(sinkId); - auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(mResourceType, bufferType); + Storage::EntityStore store(mResourceContext); + const auto found = store.contains(bufferType, sinkId); if (!found) { if (!mergeCriteria.isEmpty()) { Sink::Query query; @@ -201,7 +201,8 @@ void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray query.filter(it.key(), it.value()); } bool merge = false; - Sink::EntityReader reader(mResourceType, mResourceInstanceIdentifier, transaction()); + Storage::EntityStore store(mResourceContext); + Sink::EntityReader reader(store); reader.query(query, [this, bufferType, remoteId, &merge](const DomainType &o) -> bool{ merge = true; @@ -211,43 +212,21 @@ void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray }); if (!merge) { SinkTrace() << "Found a new entity: " << remoteId; - createEntity( - sinkId, bufferType, entity, *adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::CreateEntityCommand, buffer); }); + createEntity(sinkId, bufferType, entity); } } else { SinkTrace() << "Found a new entity: " << remoteId; - createEntity( - sinkId, bufferType, entity, *adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::CreateEntityCommand, buffer); }); + createEntity(sinkId, bufferType, entity); } } else { // modification - qint64 retrievedRevision = 0; - if (auto current = EntityReaderUtils::getLatest(mainDatabase, sinkId, *adaptorFactory, retrievedRevision)) { - bool changed = false; - for (const auto &property : entity.changedProperties()) { - if (entity.getProperty(property) != current->getProperty(property)) { - SinkTrace() << "Property changed " << sinkId << property; - changed = true; - } - } - if (changed) { - SinkTrace() << "Found a modified entity: " << remoteId; - modifyEntity(sinkId, Sink::Storage::maxRevision(transaction()), bufferType, entity, *adaptorFactory, - [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::ModifyEntityCommand, buffer); }); - } - } else { - SinkWarning() << "Failed to get current entity"; - } + modifyIfChanged(store, bufferType, sinkId, entity); } } template void Synchronizer::modify(const DomainType &entity) { - const auto bufferType = ApplicationDomain::getTypeName(); - const auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(mResourceType, bufferType); - Q_ASSERT(adaptorFactory); - modifyEntity(entity.identifier(), entity.revision(), bufferType, entity, *adaptorFactory, - [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::ModifyEntityCommand, buffer); }); + modifyEntity(entity.identifier(), entity.revision(), ApplicationDomain::getTypeName(), entity); } KAsync::Job Synchronizer::synchronize() @@ -257,7 +236,6 @@ KAsync::Job Synchronizer::synchronize() mMessageQueue->startTransaction(); return synchronizeWithSource().syncThen([this]() { mSyncStore.clear(); - mEntityStore.clear(); mMessageQueue->commit(); mSyncInProgress = false; }); @@ -266,8 +244,7 @@ KAsync::Job Synchronizer::synchronize() void Synchronizer::commit() { mMessageQueue->commit(); - mTransaction.abort(); - mEntityStore.clear(); + mEntityStore->abortTransaction(); mSyncTransaction.commit(); mSyncStore.clear(); if (mSyncInProgress) { @@ -275,20 +252,11 @@ void Synchronizer::commit() } } -Sink::Storage::Transaction &Synchronizer::transaction() -{ - if (!mTransaction) { - SinkTrace() << "Starting transaction"; - mTransaction = mStorage.createTransaction(Sink::Storage::ReadOnly); - } - return mTransaction; -} - -Sink::Storage::Transaction &Synchronizer::syncTransaction() +Sink::Storage::DataStore::DataStore::Transaction &Synchronizer::syncTransaction() { if (!mSyncTransaction) { SinkTrace() << "Starting transaction"; - mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::ReadWrite); + mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::DataStore::DataStore::ReadWrite); } return mSyncTransaction; } -- cgit v1.2.3