From f6c3c144e60611d2da7ba7aa5b115affe92a57a4 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Tue, 11 Apr 2017 15:16:26 +0200 Subject: Move the preprocssing back out of entitystore into the pipeline. This is where this really belongs, only the indexing is part of storage. This is necessary so preprocessors can move entities as well. --- common/pipeline.cpp | 93 +++++++++++++++++++----------------------- common/storage/entitystore.cpp | 34 +++++++++------ common/storage/entitystore.h | 4 +- 3 files changed, 68 insertions(+), 63 deletions(-) diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 7f836c4..15ed5fc 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp @@ -195,6 +195,11 @@ struct CreateHelper { } }; +static KAsync::Job create(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &newEntity) +{ + return TypeHelper{type}.operator(), const ApplicationDomain::ApplicationDomainType&>(newEntity); +} + KAsync::Job Pipeline::modifiedEntity(void const *command, size_t size) { d->transactionItemCount++; @@ -248,64 +253,52 @@ KAsync::Job Pipeline::modifiedEntity(void const *command, size_t size) deletions = BufferUtils::fromVector(*modifyEntity->deletions()); } - if (modifyEntity->targetResource()) { - auto isMove = modifyEntity->removeEntity(); - auto targetResource = BufferUtils::extractBuffer(modifyEntity->targetResource()); - auto changeset = diff.changedProperties(); - const auto current = d->entityStore.readLatest(bufferType, diff.identifier()); - if (current.identifier().isEmpty()) { - SinkWarningCtx(d->logCtx) << "Failed to read current version: " << diff.identifier(); - return KAsync::error(0); - } + const auto current = d->entityStore.readLatest(bufferType, diff.identifier()); + if (current.identifier().isEmpty()) { + SinkWarningCtx(d->logCtx) << "Failed to read current version: " << diff.identifier(); + return KAsync::error(0); + } - auto newEntity = *ApplicationDomain::ApplicationDomainType::getInMemoryCopy(current, current.availableProperties()); + auto newEntity = d->entityStore.applyDiff(bufferType, current, diff, deletions); - // Apply diff - for (const auto &property : changeset) { - const auto value = diff.getProperty(property); - if (value.isValid()) { - newEntity.setProperty(property, value); - } - } + bool isMove = false; + if (modifyEntity->targetResource()) { + isMove = modifyEntity->removeEntity(); + newEntity.setResource(BufferUtils::extractBuffer(modifyEntity->targetResource())); + } - // Remove deletions - for (const auto &property : deletions) { - newEntity.setProperty(property, QVariant()); - } - newEntity.setResource(targetResource); - newEntity.setChangedProperties(newEntity.availableProperties().toSet()); + foreach (const auto &processor, d->processors[bufferType]) { + processor->modifiedEntity(current, newEntity); + } - SinkTraceCtx(d->logCtx) << "Moving entity to new resource " << newEntity.identifier() << newEntity.resourceInstanceIdentifier() << targetResource; - auto job = TypeHelper{bufferType}.operator(), ApplicationDomain::ApplicationDomainType&>(newEntity); - job = job.then([this, current, isMove, targetResource, bufferType](const KAsync::Error &error) { - if (!error) { - SinkTraceCtx(d->logCtx) << "Move of " << current.identifier() << "was successfull"; - if (isMove) { - flatbuffers::FlatBufferBuilder fbb; - auto entityId = fbb.CreateString(current.identifier()); - auto type = fbb.CreateString(bufferType); - auto location = Sink::Commands::CreateDeleteEntity(fbb, current.revision(), entityId, type, true); - Sink::Commands::FinishDeleteEntityBuffer(fbb, location); - const auto data = BufferUtils::extractBuffer(fbb); - deletedEntity(data, data.size()).exec(); + //The entity is either being copied or moved + if (newEntity.resourceInstanceIdentifier() != d->resourceContext.resourceInstanceIdentifier) { + SinkTraceCtx(d->logCtx) << "Moving entity to new resource " << newEntity.identifier() << newEntity.resourceInstanceIdentifier(); + newEntity.setChangedProperties(newEntity.availableProperties().toSet()); + return create(bufferType, newEntity) + .then([=](const KAsync::Error &error) { + if (!error) { + SinkTraceCtx(d->logCtx) << "Move of " << current.identifier() << "was successfull"; + if (isMove) { + flatbuffers::FlatBufferBuilder fbb; + auto entityId = fbb.CreateString(current.identifier()); + auto type = fbb.CreateString(bufferType); + auto location = Sink::Commands::CreateDeleteEntity(fbb, current.revision(), entityId, type, true); + Sink::Commands::FinishDeleteEntityBuffer(fbb, location); + const auto data = BufferUtils::extractBuffer(fbb); + deletedEntity(data, data.size()).exec(); + } + } else { + SinkErrorCtx(d->logCtx) << "Failed to move entity " << newEntity.identifier() << " to resource " << newEntity.resourceInstanceIdentifier(); } - } else { - SinkErrorCtx(d->logCtx) << "Failed to move entity " << targetResource << " to resource " << current.identifier(); - } - }); - return job.then([this] { - return d->entityStore.maxRevision(); - }); + }) + .then([this] { + return d->entityStore.maxRevision(); + }); } - auto preprocess = [&, this](const ApplicationDomain::ApplicationDomainType &oldEntity, ApplicationDomain::ApplicationDomainType &newEntity) { - foreach (const auto &processor, d->processors[bufferType]) { - processor->modifiedEntity(oldEntity, newEntity); - } - }; - d->revisionChanged = true; - if (!d->entityStore.modify(bufferType, diff, deletions, replayToSource, preprocess)) { + if (!d->entityStore.modify(bufferType, current, newEntity, replayToSource)) { return KAsync::error(0); } diff --git a/common/storage/entitystore.cpp b/common/storage/entitystore.cpp index 4afb407..3ef8784 100644 --- a/common/storage/entitystore.cpp +++ b/common/storage/entitystore.cpp @@ -209,22 +209,15 @@ bool EntityStore::add(const QByteArray &type, const ApplicationDomain::Applicati return true; } -bool EntityStore::modify(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &diff, const QByteArrayList &deletions, bool replayToSource, const PreprocessModification &preprocess) +ApplicationDomain::ApplicationDomainType EntityStore::applyDiff(const QByteArray &type, const ApplicationDomain::ApplicationDomainType ¤t, const ApplicationDomain::ApplicationDomainType &diff, const QByteArrayList &deletions) const { - auto changeset = diff.changedProperties(); - const auto current = readLatest(type, diff.identifier()); - if (current.identifier().isEmpty()) { - SinkWarningCtx(d->logCtx) << "Failed to read current version: " << diff.identifier(); - return false; - } - auto newEntity = *ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(current, current.availableProperties()); SinkTraceCtx(d->logCtx) << "Modified entity: " << newEntity; // Apply diff //SinkTrace() << "Applying changed properties: " << changeset; - for (const auto &property : changeset) { + for (const auto &property : diff.changedProperties()) { const auto value = diff.getProperty(property); if (value.isValid()) { //SinkTrace() << "Setting property: " << property; @@ -237,8 +230,25 @@ bool EntityStore::modify(const QByteArray &type, const ApplicationDomain::Applic //SinkTrace() << "Removing property: " << property; newEntity.setProperty(property, QVariant()); } + return newEntity; +} + +bool EntityStore::modify(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &diff, const QByteArrayList &deletions, bool replayToSource) +{ + const auto current = readLatest(type, diff.identifier()); + if (current.identifier().isEmpty()) { + SinkWarningCtx(d->logCtx) << "Failed to read current version: " << diff.identifier(); + return false; + } + + auto newEntity = applyDiff(type, current, diff, deletions); + return modify(type, current, newEntity, replayToSource); +} + +bool EntityStore::modify(const QByteArray &type, const ApplicationDomain::ApplicationDomainType ¤t, ApplicationDomain::ApplicationDomainType newEntity, bool replayToSource) +{ + SinkTraceCtx(d->logCtx) << "Modified entity: " << newEntity; - preprocess(current, newEntity); d->typeIndex(type).remove(current.identifier(), current, d->transaction); d->typeIndex(type).add(newEntity.identifier(), newEntity, d->transaction); @@ -250,7 +260,7 @@ bool EntityStore::modify(const QByteArray &type, const ApplicationDomain::Applic flatbuffers::FlatBufferBuilder metadataFbb; { //We add availableProperties to account for the properties that have been changed by the preprocessors - auto modifiedProperties = BufferUtils::toVector(metadataFbb, changeset + newEntity.changedProperties()); + auto modifiedProperties = BufferUtils::toVector(metadataFbb, newEntity.changedProperties()); auto metadataBuilder = MetadataBuilder(metadataFbb); metadataBuilder.add_revision(newRevision); metadataBuilder.add_operation(Operation_Modification); @@ -259,7 +269,7 @@ bool EntityStore::modify(const QByteArray &type, const ApplicationDomain::Applic auto metadataBuffer = metadataBuilder.Finish(); FinishMetadataBuffer(metadataFbb, metadataBuffer); } - SinkTraceCtx(d->logCtx) << "Changed properties: " << changeset + newEntity.changedProperties(); + SinkTraceCtx(d->logCtx) << "Changed properties: " << newEntity.changedProperties(); newEntity.setChangedProperties(newEntity.availableProperties().toSet()); diff --git a/common/storage/entitystore.h b/common/storage/entitystore.h index 46410cd..ddb4ef9 100644 --- a/common/storage/entitystore.h +++ b/common/storage/entitystore.h @@ -44,9 +44,11 @@ public: //Only the pipeline may call the following functions outside of tests bool add(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &, bool replayToSource, const PreprocessCreation &); - bool modify(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &, const QByteArrayList &deletions, bool replayToSource, const PreprocessModification &); + bool modify(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &diff, const QByteArrayList &deletions, bool replayToSource); + bool modify(const QByteArray &type, const ApplicationDomain::ApplicationDomainType ¤t, ApplicationDomain::ApplicationDomainType newEntity, bool replayToSource); bool remove(const QByteArray &type, const QByteArray &uid, bool replayToSource, const PreprocessRemoval &); bool cleanupRevisions(qint64 revision); + ApplicationDomain::ApplicationDomainType applyDiff(const QByteArray &type, const ApplicationDomain::ApplicationDomainType ¤t, const ApplicationDomain::ApplicationDomainType &diff, const QByteArrayList &deletions) const; void startTransaction(Sink::Storage::DataStore::AccessMode); void commitTransaction(); -- cgit v1.2.3