From fdeb92ca128e6eb51bbcace0b47519b12a08ce93 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Sun, 8 May 2016 10:32:01 +0200 Subject: Run preprocessors before persising the value. And allow preprocessors to modify the result. --- common/domainadaptor.h | 9 +++++ common/domaintypeadaptorfactoryinterface.h | 1 + common/indexupdater.h | 8 ++--- common/pipeline.cpp | 58 ++++++++++-------------------- common/pipeline.h | 4 +-- 5 files changed, 34 insertions(+), 46 deletions(-) (limited to 'common') diff --git a/common/domainadaptor.h b/common/domainadaptor.h index 99afb60..c620f91 100644 --- a/common/domainadaptor.h +++ b/common/domainadaptor.h @@ -178,6 +178,15 @@ public: Sink::EntityBuffer::assembleEntityBuffer(fbb, metadataData, metadataSize, resFbb.GetBufferPointer(), resFbb.GetSize(), localFbb.GetBufferPointer(), localFbb.GetSize()); } + virtual void createBuffer(const QSharedPointer &bufferAdaptor, flatbuffers::FlatBufferBuilder &fbb, void const *metadataData = 0, size_t metadataSize = 0) Q_DECL_OVERRIDE + { + //TODO rewrite the unterlying functions so we don't have to wrap the bufferAdaptor + auto newObject = Sink::ApplicationDomain::ApplicationDomainType("", "", 0, bufferAdaptor); + //Serialize all properties + newObject.setChangedProperties(bufferAdaptor->availableProperties().toSet()); + createBuffer(newObject, fbb, metadataData, metadataSize); + } + protected: QSharedPointer> mLocalMapper; diff --git a/common/domaintypeadaptorfactoryinterface.h b/common/domaintypeadaptorfactoryinterface.h index 72aa9b9..b498796 100644 --- a/common/domaintypeadaptorfactoryinterface.h +++ b/common/domaintypeadaptorfactoryinterface.h @@ -46,4 +46,5 @@ public: */ virtual void createBuffer(const Sink::ApplicationDomain::ApplicationDomainType &domainType, flatbuffers::FlatBufferBuilder &fbb, void const *metadataData = 0, size_t metadataSize = 0) = 0; + virtual void createBuffer(const QSharedPointer &bufferAdaptor, flatbuffers::FlatBufferBuilder &fbb, void const *metadataData = 0, size_t metadataSize = 0) = 0; }; diff --git a/common/indexupdater.h b/common/indexupdater.h index deaaa16..936d03a 100644 --- a/common/indexupdater.h +++ b/common/indexupdater.h @@ -28,12 +28,12 @@ public: { } - void newEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::Transaction &transaction) Q_DECL_OVERRIDE + void newEntity(const QByteArray &uid, qint64 revision, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::Transaction &transaction) Q_DECL_OVERRIDE { add(newEntity.getProperty(mProperty), uid, transaction); } - void modifiedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, const Sink::ApplicationDomain::BufferAdaptor &newEntity, + void modifiedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::Transaction &transaction) Q_DECL_OVERRIDE { remove(oldEntity.getProperty(mProperty), uid, transaction); @@ -68,12 +68,12 @@ template class DefaultIndexUpdater : public Sink::Preprocessor { public: - void newEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::Transaction &transaction) Q_DECL_OVERRIDE + void newEntity(const QByteArray &uid, qint64 revision, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::Transaction &transaction) Q_DECL_OVERRIDE { Sink::ApplicationDomain::TypeImplementation::index(uid, newEntity, transaction); } - void modifiedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, const Sink::ApplicationDomain::BufferAdaptor &newEntity, + void modifiedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::Transaction &transaction) Q_DECL_OVERRIDE { Sink::ApplicationDomain::TypeImplementation::removeIndex(uid, oldEntity, transaction); diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 65a2f5b..637a1b8 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp @@ -189,31 +189,23 @@ KAsync::Job Pipeline::newEntity(void const *command, size_t size) auto metadataBuffer = metadataBuilder.Finish(); FinishMetadataBuffer(metadataFbb, metadataBuffer); - flatbuffers::FlatBufferBuilder fbb; - EntityBuffer::assembleEntityBuffer( - fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size()); - - d->storeNewRevision(newRevision, fbb, bufferType, key); - auto adaptorFactory = d->adaptorFactory.value(bufferType); if (!adaptorFactory) { Warning() << "no adaptor factory for type " << bufferType; return KAsync::error(0); } + auto adaptor = adaptorFactory->createAdaptor(*entity); + auto memoryAdaptor = QSharedPointer::create(*(adaptor), adaptor->availableProperties()); + for (auto processor : d->processors[bufferType]) { + processor->newEntity(key, newRevision, *memoryAdaptor, d->transaction); + } + flatbuffers::FlatBufferBuilder fbb; + adaptorFactory->createBuffer(memoryAdaptor, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); + + d->storeNewRevision(newRevision, fbb, bufferType, key); + Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType; - Storage::mainDatabase(d->transaction, bufferType) - .scan(Storage::assembleKey(key, newRevision), - [this, bufferType, newRevision, adaptorFactory, key](const QByteArray &, const QByteArray &value) -> bool { - auto entity = GetEntity(value); - Q_ASSERT(entity->resource() || entity->local()); - auto adaptor = adaptorFactory->createAdaptor(*entity); - for (auto processor : d->processors[bufferType]) { - processor->newEntity(key, newRevision, *adaptor, d->transaction); - } - return false; - }, - [this](const Storage::Error &error) { ErrorMsg() << "Failed to find value in pipeline: " << error.message; }); return KAsync::start([newRevision]() { return newRevision; }); } @@ -281,9 +273,7 @@ KAsync::Job Pipeline::modifiedEntity(void const *command, size_t size) return KAsync::error(0); } - // resource and uid don't matter at this point - const ApplicationDomain::ApplicationDomainType existingObject("", "", newRevision, current); - auto newObject = ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(existingObject); + auto newAdaptor = QSharedPointer::create(*(current), current->availableProperties()); // Apply diff // FIXME only apply the properties that are available in the buffer @@ -293,19 +283,21 @@ KAsync::Job Pipeline::modifiedEntity(void const *command, size_t size) changeset << property; const auto value = diff->getProperty(property); if (value.isValid()) { - newObject->setProperty(property, value); + newAdaptor->setProperty(property, value); } } - // Altough we only set some properties, we want all to be serialized - newObject->setChangedProperties(changeset); // Remove deletions if (modifyEntity->deletions()) { for (const auto &property : *modifyEntity->deletions()) { - newObject->setProperty(BufferUtils::extractBuffer(property), QVariant()); + newAdaptor->setProperty(BufferUtils::extractBuffer(property), QVariant()); } } + for (auto processor : d->processors[bufferType]) { + processor->modifiedEntity(key, newRevision, *current, *newAdaptor, d->transaction); + } + // Add metadata buffer flatbuffers::FlatBufferBuilder metadataFbb; auto metadataBuilder = MetadataBuilder(metadataFbb); @@ -316,24 +308,10 @@ KAsync::Job Pipeline::modifiedEntity(void const *command, size_t size) FinishMetadataBuffer(metadataFbb, metadataBuffer); flatbuffers::FlatBufferBuilder fbb; - adaptorFactory->createBuffer(*newObject, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); + adaptorFactory->createBuffer(newAdaptor, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); d->storeNewRevision(newRevision, fbb, bufferType, key); Log() << "Pipeline: modified entity: " << key << newRevision << bufferType; - Storage::mainDatabase(d->transaction, bufferType) - .scan(Storage::assembleKey(key, newRevision), - [this, bufferType, newRevision, adaptorFactory, current, key](const QByteArray &k, const QByteArray &value) -> bool { - if (value.isEmpty()) { - ErrorMsg() << "Read buffer is empty."; - } - auto entity = GetEntity(value.data()); - auto newEntity = adaptorFactory->createAdaptor(*entity); - for (auto processor : d->processors[bufferType]) { - processor->modifiedEntity(key, newRevision, *current, *newEntity, d->transaction); - } - return false; - }, - [this](const Storage::Error &error) { ErrorMsg() << "Failed to find value in pipeline: " << error.message; }); return KAsync::start([newRevision]() { return newRevision; }); } diff --git a/common/pipeline.h b/common/pipeline.h index dc2cc4d..c65cbfd 100644 --- a/common/pipeline.h +++ b/common/pipeline.h @@ -83,9 +83,9 @@ public: virtual ~Preprocessor(); virtual void startBatch(); - virtual void newEntity(const QByteArray &key, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::Transaction &transaction) = 0; + virtual void newEntity(const QByteArray &key, qint64 revision, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::Transaction &transaction) = 0; virtual void modifiedEntity(const QByteArray &key, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, - const Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::Transaction &transaction) = 0; + Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::Transaction &transaction) = 0; virtual void deletedEntity(const QByteArray &key, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::Storage::Transaction &transaction) = 0; virtual void finalize(); -- cgit v1.2.3