From ba7c8b890c45d735216888204ec88882ef58c918 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Wed, 19 Oct 2016 15:28:42 +0200 Subject: Ported the pipeline to the entitystore --- common/storage/entitystore.cpp | 185 +++++++++++++++++++++++++++++++++++++++++ common/storage/entitystore.h | 11 ++- 2 files changed, 193 insertions(+), 3 deletions(-) (limited to 'common/storage') diff --git a/common/storage/entitystore.cpp b/common/storage/entitystore.cpp index fe63f0b..30c7a71 100644 --- a/common/storage/entitystore.cpp +++ b/common/storage/entitystore.cpp @@ -25,6 +25,8 @@ #include "definitions.h" #include "resourcecontext.h" #include "index.h" +#include "bufferutils.h" +#include "entity_generated.h" #include "mail.h" #include "folder.h" @@ -108,16 +110,199 @@ void EntityStore::startTransaction(Sink::Storage::DataStore::AccessMode accessMo void EntityStore::commitTransaction() { + SinkTrace() << "Committing transaction"; d->transaction.commit(); d->transaction = Storage::DataStore::Transaction(); } void EntityStore::abortTransaction() { + SinkTrace() << "Aborting transaction"; d->transaction.abort(); d->transaction = Storage::DataStore::Transaction(); } +bool EntityStore::add(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &entity_, bool replayToSource, const PreprocessCreation &preprocess) +{ + if (entity_.identifier().isEmpty()) { + SinkWarning() << "Can't write entity with an empty identifier"; + return false; + } + + auto entity = *ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(entity_, entity_.availableProperties()); + entity.setChangedProperties(entity.availableProperties().toSet()); + + preprocess(entity); + d->typeIndex(type).add(entity.identifier(), entity, d->transaction); + + //The maxRevision may have changed meanwhile if the entity created sub-entities + const qint64 newRevision = maxRevision() + 1; + + // Add metadata buffer + flatbuffers::FlatBufferBuilder metadataFbb; + auto metadataBuilder = MetadataBuilder(metadataFbb); + metadataBuilder.add_revision(newRevision); + metadataBuilder.add_operation(Operation_Creation); + metadataBuilder.add_replayToSource(replayToSource); + auto metadataBuffer = metadataBuilder.Finish(); + FinishMetadataBuffer(metadataFbb, metadataBuffer); + + flatbuffers::FlatBufferBuilder fbb; + d->resourceContext.adaptorFactory(type).createBuffer(entity, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); + + DataStore::mainDatabase(d->transaction, type) + .write(DataStore::assembleKey(entity.identifier(), newRevision), BufferUtils::extractBuffer(fbb), + [&](const DataStore::Error &error) { SinkWarning() << "Failed to write entity" << entity.identifier() << newRevision; }); + DataStore::setMaxRevision(d->transaction, newRevision); + DataStore::recordRevision(d->transaction, newRevision, entity.identifier(), type); + SinkTrace() << "Wrote entity: " << entity.identifier() << type << newRevision; + return true; +} + +bool EntityStore::modify(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &diff, const QByteArrayList &deletions, bool replayToSource, const PreprocessModification &preprocess) +{ + auto changeset = diff.changedProperties(); + //TODO handle errors + const auto current = readLatest(type, diff.identifier()); + if (current.identifier().isEmpty()) { + SinkWarning() << "Failed to read current version: " << diff.identifier(); + return false; + } + + auto newEntity = *ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(current, current.availableProperties()); + + // Apply diff + //SinkTrace() << "Applying changed properties: " << changeset; + for (const auto &property : changeset) { + const auto value = diff.getProperty(property); + if (value.isValid()) { + //SinkTrace() << "Setting property: " << property; + newEntity.setProperty(property, value); + } + } + + // Remove deletions + for (const auto property : deletions) { + //SinkTrace() << "Removing property: " << property; + newEntity.setProperty(property, QVariant()); + } + + preprocess(current, newEntity); + d->typeIndex(type).remove(current.identifier(), current, d->transaction); + d->typeIndex(type).add(newEntity.identifier(), newEntity, d->transaction); + + const qint64 newRevision = DataStore::maxRevision(d->transaction) + 1; + + // Add metadata buffer + flatbuffers::FlatBufferBuilder metadataFbb; + { + //We add availableProperties to account for the properties that have been changed by the preprocessors + auto modifiedProperties = BufferUtils::toVector(metadataFbb, changeset + newEntity.changedProperties()); + auto metadataBuilder = MetadataBuilder(metadataFbb); + metadataBuilder.add_revision(newRevision); + metadataBuilder.add_operation(Operation_Modification); + metadataBuilder.add_replayToSource(replayToSource); + metadataBuilder.add_modifiedProperties(modifiedProperties); + auto metadataBuffer = metadataBuilder.Finish(); + FinishMetadataBuffer(metadataFbb, metadataBuffer); + } + + newEntity.setChangedProperties(newEntity.availableProperties().toSet()); + SinkTrace() << "All properties: " << newEntity.availableProperties(); + + flatbuffers::FlatBufferBuilder fbb; + d->resourceContext.adaptorFactory(type).createBuffer(newEntity, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); + + DataStore::mainDatabase(d->transaction, type) + .write(DataStore::assembleKey(newEntity.identifier(), newRevision), BufferUtils::extractBuffer(fbb), + [&](const DataStore::Error &error) { SinkWarning() << "Failed to write entity" << newEntity.identifier() << newRevision; }); + DataStore::setMaxRevision(d->transaction, newRevision); + DataStore::recordRevision(d->transaction, newRevision, newEntity.identifier(), type); + SinkTrace() << "Wrote modified entity: " << newEntity.identifier() << type << newRevision; + return true; +} + +bool EntityStore::remove(const QByteArray &type, const QByteArray &uid, bool replayToSource, const PreprocessRemoval &preprocess) +{ + bool found = false; + bool alreadyRemoved = false; + DataStore::mainDatabase(d->transaction, type) + .findLatest(uid, + [&found, &alreadyRemoved](const QByteArray &key, const QByteArray &data) -> bool { + auto entity = GetEntity(data.data()); + if (entity && entity->metadata()) { + auto metadata = GetMetadata(entity->metadata()->Data()); + found = true; + if (metadata->operation() == Operation_Removal) { + alreadyRemoved = true; + } + } + return false; + }, + [](const DataStore::Error &error) { SinkWarning() << "Failed to read old revision from storage: " << error.message; }); + + if (!found) { + SinkWarning() << "Failed to find entity " << uid; + return false; + } + if (alreadyRemoved) { + SinkWarning() << "Entity is already removed " << uid; + return false; + } + + const auto current = readLatest(type, uid); + preprocess(current); + d->typeIndex(type).remove(current.identifier(), current, d->transaction); + + const qint64 newRevision = DataStore::maxRevision(d->transaction) + 1; + + // Add metadata buffer + flatbuffers::FlatBufferBuilder metadataFbb; + auto metadataBuilder = MetadataBuilder(metadataFbb); + metadataBuilder.add_revision(newRevision); + metadataBuilder.add_operation(Operation_Removal); + metadataBuilder.add_replayToSource(replayToSource); + auto metadataBuffer = metadataBuilder.Finish(); + FinishMetadataBuffer(metadataFbb, metadataBuffer); + + flatbuffers::FlatBufferBuilder fbb; + EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), 0, 0, 0, 0); + + DataStore::mainDatabase(d->transaction, type) + .write(DataStore::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb), + [&](const DataStore::Error &error) { SinkWarning() << "Failed to write entity" << uid << newRevision; }); + DataStore::setMaxRevision(d->transaction, newRevision); + DataStore::recordRevision(d->transaction, newRevision, uid, type); + return true; +} + +void EntityStore::cleanupRevision(qint64 revision) +{ + const auto uid = DataStore::getUidFromRevision(d->transaction, revision); + const auto bufferType = DataStore::getTypeFromRevision(d->transaction, revision); + SinkTrace() << "Cleaning up revision " << revision << uid << bufferType; + DataStore::mainDatabase(d->transaction, bufferType) + .scan(uid, + [&](const QByteArray &key, const QByteArray &data) -> bool { + EntityBuffer buffer(const_cast(data.data()), data.size()); + if (!buffer.isValid()) { + SinkWarning() << "Read invalid buffer from disk"; + } else { + const auto metadata = flatbuffers::GetRoot(buffer.metadataBuffer()); + const qint64 rev = metadata->revision(); + // Remove old revisions, and the current if the entity has already been removed + if (rev < revision || metadata->operation() == Operation_Removal) { + DataStore::removeRevision(d->transaction, rev); + DataStore::mainDatabase(d->transaction, bufferType).remove(key); + } + } + + return true; + }, + [](const DataStore::Error &error) { SinkWarning() << "Error while reading: " << error.message; }, true); + DataStore::setCleanedUpRevision(d->transaction, revision); +} + QVector EntityStore::fullScan(const QByteArray &type) { SinkTrace() << "Looking for : " << type; diff --git a/common/storage/entitystore.h b/common/storage/entitystore.h index 455e9c3..65bff50 100644 --- a/common/storage/entitystore.h +++ b/common/storage/entitystore.h @@ -38,9 +38,14 @@ public: typedef QSharedPointer Ptr; EntityStore(const ResourceContext &resourceContext); - void add(const ApplicationDomain::ApplicationDomainType &); - void modify(const ApplicationDomain::ApplicationDomainType &); - void remove(const ApplicationDomain::ApplicationDomainType &); + typedef std::function PreprocessModification; + typedef std::function PreprocessCreation; + typedef std::function PreprocessRemoval; + + bool add(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &, bool replayToSource, const PreprocessCreation &); + bool modify(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &, const QByteArrayList &deletions, bool replayToSource, const PreprocessModification &); + bool remove(const QByteArray &type, const QByteArray &uid, bool replayToSource, const PreprocessRemoval &); + void cleanupRevision(qint64 revision); void startTransaction(Sink::Storage::DataStore::AccessMode); void commitTransaction(); -- cgit v1.2.3