From 5bf7ded65ef517fac6b088342d195392bc09be4c Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Tue, 11 Apr 2017 15:41:20 +0200 Subject: Moved all preprocessing back into the pipeline --- common/pipeline.cpp | 25 ++++++++-------- common/storage/entitystore.cpp | 66 ++++++++++++++++++++++-------------------- common/storage/entitystore.h | 12 ++++---- 3 files changed, 54 insertions(+), 49 deletions(-) (limited to 'common') diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 15ed5fc..91437d4 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp @@ -175,13 +175,14 @@ KAsync::Job Pipeline::newEntity(void const *command, size_t size) auto o = Sink::ApplicationDomain::ApplicationDomainType{d->resourceContext.instanceId(), key, revision, memoryAdaptor}; o.setChangedProperties(o.availableProperties().toSet()); - auto preprocess = [&, this](ApplicationDomain::ApplicationDomainType &newEntity) { - foreach (const auto &processor, d->processors[bufferType]) { - processor->newEntity(newEntity); - } - }; + auto newEntity = *ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(o, o.availableProperties()); + newEntity.setChangedProperties(newEntity.availableProperties().toSet()); + + foreach (const auto &processor, d->processors[bufferType]) { + processor->newEntity(newEntity); + } - if (!d->entityStore.add(bufferType, o, replayToSource, preprocess)) { + if (!d->entityStore.add(bufferType, o, replayToSource)) { return KAsync::error(0); } @@ -323,14 +324,14 @@ KAsync::Job Pipeline::deletedEntity(void const *command, size_t size) const QByteArray key = QByteArray(reinterpret_cast(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size()); SinkTraceCtx(d->logCtx) << "Deleted Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; - auto preprocess = [&, this](const ApplicationDomain::ApplicationDomainType &oldEntity) { - foreach (const auto &processor, d->processors[bufferType]) { - processor->deletedEntity(oldEntity); - } - }; + const auto current = d->entityStore.readLatest(bufferType, key); + + foreach (const auto &processor, d->processors[bufferType]) { + processor->deletedEntity(current); + } d->revisionChanged = true; - if (!d->entityStore.remove(bufferType, key, replayToSource, preprocess)) { + if (!d->entityStore.remove(bufferType, current, replayToSource)) { return KAsync::error(0); } diff --git a/common/storage/entitystore.cpp b/common/storage/entitystore.cpp index 3ef8784..b7309ab 100644 --- a/common/storage/entitystore.cpp +++ b/common/storage/entitystore.cpp @@ -167,19 +167,15 @@ void EntityStore::copyBlobs(ApplicationDomain::ApplicationDomainType &entity, qi } } -bool EntityStore::add(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &entity_, bool replayToSource, const PreprocessCreation &preprocess) +bool EntityStore::add(const QByteArray &type, ApplicationDomain::ApplicationDomainType entity, bool replayToSource) { - if (entity_.identifier().isEmpty()) { + if (entity.identifier().isEmpty()) { SinkWarningCtx(d->logCtx) << "Can't write entity with an empty identifier"; return false; } - auto entity = *ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(entity_, entity_.availableProperties()); - entity.setChangedProperties(entity.availableProperties().toSet()); - SinkTraceCtx(d->logCtx) << "New entity " << entity; - preprocess(entity); d->typeIndex(type).add(entity.identifier(), entity, d->transaction); //The maxRevision may have changed meanwhile if the entity created sub-entities @@ -285,36 +281,14 @@ bool EntityStore::modify(const QByteArray &type, const ApplicationDomain::Applic return true; } -bool EntityStore::remove(const QByteArray &type, const QByteArray &uid, bool replayToSource, const PreprocessRemoval &preprocess) +bool EntityStore::remove(const QByteArray &type, const Sink::ApplicationDomain::ApplicationDomainType ¤t, bool replayToSource) { - bool found = false; - bool alreadyRemoved = false; - DataStore::mainDatabase(d->transaction, type) - .findLatest(uid, - [&found, &alreadyRemoved](const QByteArray &key, const QByteArray &data) -> bool { - auto entity = GetEntity(data.data()); - if (entity && entity->metadata()) { - auto metadata = GetMetadata(entity->metadata()->Data()); - found = true; - if (metadata->operation() == Operation_Removal) { - alreadyRemoved = true; - } - } - return false; - }, - [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to read old revision from storage: " << error.message; }); - - if (!found) { - SinkWarningCtx(d->logCtx) << "Remove: Failed to find entity " << uid; - return false; - } - if (alreadyRemoved) { + const auto uid = current.identifier(); + if (!exists(type, uid)) { SinkWarningCtx(d->logCtx) << "Remove: Entity is already removed " << uid; return false; } - const auto current = readLatest(type, uid); - preprocess(current); d->typeIndex(type).remove(current.identifier(), current, d->transaction); SinkTraceCtx(d->logCtx) << "Removed entity " << current; @@ -601,6 +575,36 @@ bool EntityStore::contains(const QByteArray &type, const QByteArray &uid) return DataStore::mainDatabase(d->getTransaction(), type).contains(uid); } +bool EntityStore::exists(const QByteArray &type, const QByteArray &uid) +{ + bool found = false; + bool alreadyRemoved = false; + DataStore::mainDatabase(d->transaction, type) + .findLatest(uid, + [&found, &alreadyRemoved](const QByteArray &key, const QByteArray &data) -> bool { + auto entity = GetEntity(data.data()); + if (entity && entity->metadata()) { + auto metadata = GetMetadata(entity->metadata()->Data()); + found = true; + if (metadata->operation() == Operation_Removal) { + alreadyRemoved = true; + } + } + return false; + }, + [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to read old revision from storage: " << error.message; }); + if (!found) { + SinkTraceCtx(d->logCtx) << "Remove: Failed to find entity " << uid; + return false; + } + if (alreadyRemoved) { + SinkTraceCtx(d->logCtx) << "Remove: Entity is already removed " << uid; + return false; + } + return true; +} + + qint64 EntityStore::maxRevision() { if (!d->exists()) { diff --git a/common/storage/entitystore.h b/common/storage/entitystore.h index ddb4ef9..00241f2 100644 --- a/common/storage/entitystore.h +++ b/common/storage/entitystore.h @@ -38,15 +38,11 @@ public: typedef QSharedPointer Ptr; EntityStore(const ResourceContext &resourceContext, const Sink::Log::Context &); - typedef std::function PreprocessModification; - typedef std::function PreprocessCreation; - typedef std::function PreprocessRemoval; - //Only the pipeline may call the following functions outside of tests - bool add(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &, bool replayToSource, const PreprocessCreation &); + bool add(const QByteArray &type, ApplicationDomain::ApplicationDomainType newEntity, bool replayToSource); bool modify(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &diff, const QByteArrayList &deletions, bool replayToSource); bool modify(const QByteArray &type, const ApplicationDomain::ApplicationDomainType ¤t, ApplicationDomain::ApplicationDomainType newEntity, bool replayToSource); - bool remove(const QByteArray &type, const QByteArray &uid, bool replayToSource, const PreprocessRemoval &); + bool remove(const QByteArray &type, const ApplicationDomain::ApplicationDomainType ¤t, bool replayToSource); bool cleanupRevisions(qint64 revision); ApplicationDomain::ApplicationDomainType applyDiff(const QByteArray &type, const ApplicationDomain::ApplicationDomainType ¤t, const ApplicationDomain::ApplicationDomainType &diff, const QByteArrayList &deletions) const; @@ -107,8 +103,12 @@ public: void readRevisions(qint64 baseRevision, const QByteArray &type, const std::function &callback); + ///Db contains entity (but may already be marked as removed bool contains(const QByteArray &type, const QByteArray &uid); + ///Db contains entity and entity is not yet removed + bool exists(const QByteArray &type, const QByteArray &uid); + qint64 maxRevision(); Sink::Log::Context logContext() const; -- cgit v1.2.3