From ba7c8b890c45d735216888204ec88882ef58c918 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Wed, 19 Oct 2016 15:28:42 +0200 Subject: Ported the pipeline to the entitystore --- common/domain/event.cpp | 21 -- common/domain/event.h | 2 - common/domain/folder.cpp | 22 -- common/domain/folder.h | 2 - common/domain/mail.cpp | 13 - common/domain/mail.h | 2 - common/domainadaptor.h | 16 +- common/genericresource.cpp | 6 +- common/indexupdater.h | 91 ------- common/mailpreprocessor.cpp | 10 +- common/mailpreprocessor.h | 10 +- common/pipeline.cpp | 275 +++++---------------- common/pipeline.h | 44 ++-- common/specialpurposepreprocessor.cpp | 18 +- common/specialpurposepreprocessor.h | 8 +- common/storage/entitystore.cpp | 185 ++++++++++++++ common/storage/entitystore.h | 11 +- common/typeindex.cpp | 16 +- common/typeindex.h | 5 +- examples/dummyresource/resourcefactory.cpp | 7 +- examples/imapresource/imapresource.cpp | 5 +- examples/maildirresource/maildirresource.cpp | 33 ++- .../mailtransportresource.cpp | 3 +- tests/mailquerybenchmark.cpp | 5 - tests/pipelinebenchmark.cpp | 30 +-- tests/pipelinetest.cpp | 71 ++++-- 26 files changed, 378 insertions(+), 533 deletions(-) delete mode 100644 common/indexupdater.h diff --git a/common/domain/event.cpp b/common/domain/event.cpp index d50652d..41ec625 100644 --- a/common/domain/event.cpp +++ b/common/domain/event.cpp @@ -47,27 +47,6 @@ void TypeImplementation::configureIndex(TypeIndex &index) index.addProperty(Event::Uid::name); } -static TypeIndex &getIndex() -{ - QMutexLocker locker(&sMutex); - static TypeIndex *index = 0; - if (!index) { - index = new TypeIndex("event"); - TypeImplementation::configureIndex(*index); - } - return *index; -} - -void TypeImplementation::index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction) -{ - return getIndex().add(identifier, bufferAdaptor, transaction); -} - -void TypeImplementation::removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction) -{ - return getIndex().remove(identifier, bufferAdaptor, transaction); -} - QSharedPointer::Buffer> > TypeImplementation::initializeReadPropertyMapper() { auto propertyMapper = QSharedPointer >::create(); diff --git a/common/domain/event.h b/common/domain/event.h index 18e0f20..fbaf4ed 100644 --- a/common/domain/event.h +++ b/common/domain/event.h @@ -56,8 +56,6 @@ public: typedef Sink::ApplicationDomain::Buffer::EventBuilder BufferBuilder; static void configureIndex(TypeIndex &index); static QSet indexedProperties(); - static void index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction); - static void removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction); static QSharedPointer > initializeReadPropertyMapper(); static QSharedPointer > initializeWritePropertyMapper(); }; diff --git a/common/domain/folder.cpp b/common/domain/folder.cpp index 94727a3..058035a 100644 --- a/common/domain/folder.cpp +++ b/common/domain/folder.cpp @@ -50,28 +50,6 @@ void TypeImplementation::configureIndex(TypeIndex &index) index.addProperty(Folder::Name::name); } -static TypeIndex &getIndex() -{ - QMutexLocker locker(&sMutex); - static TypeIndex *index = 0; - if (!index) { - index = new TypeIndex("folder"); - TypeImplementation::configureIndex(*index); - } - return *index; -} - -void TypeImplementation::index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction) -{ - SinkTrace() << "Indexing " << identifier; - getIndex().add(identifier, bufferAdaptor, transaction); -} - -void TypeImplementation::removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction) -{ - getIndex().remove(identifier, bufferAdaptor, transaction); -} - QSharedPointer::Buffer> > TypeImplementation::initializeReadPropertyMapper() { auto propertyMapper = QSharedPointer >::create(); diff --git a/common/domain/folder.h b/common/domain/folder.h index ea0d79a..47a544b 100644 --- a/common/domain/folder.h +++ b/common/domain/folder.h @@ -50,8 +50,6 @@ public: typedef Sink::ApplicationDomain::Buffer::FolderBuilder BufferBuilder; static void configureIndex(TypeIndex &index); static QSet indexedProperties(); - static void index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction); - static void removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction); static QSharedPointer > initializeReadPropertyMapper(); static QSharedPointer > initializeWritePropertyMapper(); }; diff --git a/common/domain/mail.cpp b/common/domain/mail.cpp index b0a3aae..9d58767 100644 --- a/common/domain/mail.cpp +++ b/common/domain/mail.cpp @@ -170,19 +170,6 @@ static void updateThreadingIndex(const QByteArray &identifier, const BufferAdapt } } -void TypeImplementation::index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction) -{ - SinkTrace() << "Indexing " << identifier; - getIndex().add(identifier, bufferAdaptor, transaction); - updateThreadingIndex(identifier, bufferAdaptor, transaction); -} - -void TypeImplementation::removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction) -{ - getIndex().remove(identifier, bufferAdaptor, transaction); - //TODO cleanup threading index -} - QSharedPointer::Buffer> > TypeImplementation::initializeReadPropertyMapper() { auto propertyMapper = QSharedPointer >::create(); diff --git a/common/domain/mail.h b/common/domain/mail.h index 81a0d1c..c0cfc55 100644 --- a/common/domain/mail.h +++ b/common/domain/mail.h @@ -50,8 +50,6 @@ public: typedef Sink::ApplicationDomain::Buffer::MailBuilder BufferBuilder; static void configureIndex(TypeIndex &index); static QSet indexedProperties(); - static void index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction); - static void removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction); static QSharedPointer > initializeReadPropertyMapper(); static QSharedPointer > initializeWritePropertyMapper(); }; diff --git a/common/domainadaptor.h b/common/domainadaptor.h index 6a9d755..195d5ec 100644 --- a/common/domainadaptor.h +++ b/common/domainadaptor.h @@ -87,11 +87,11 @@ static void createBufferPartBuffer(const Sink::ApplicationDomain::ApplicationDom * A generic adaptor implementation that uses a property mapper to read/write values. */ template -class GenericBufferAdaptor : public Sink::ApplicationDomain::BufferAdaptor +class DatastoreBufferAdaptor : public Sink::ApplicationDomain::BufferAdaptor { SINK_DEBUG_AREA("bufferadaptor") public: - GenericBufferAdaptor() : BufferAdaptor() + DatastoreBufferAdaptor() : BufferAdaptor() { } @@ -148,18 +148,14 @@ public: /** * Creates an adaptor for the given domain and resource types. * - * This returns by default a GenericBufferAdaptor initialized with the corresponding property mappers. + * This returns by default a DatastoreBufferAdaptor initialized with the corresponding property mappers. */ virtual QSharedPointer createAdaptor(const Sink::Entity &entity) Q_DECL_OVERRIDE { - const auto resourceBuffer = Sink::EntityBuffer::readBuffer(entity.resource()); - const auto localBuffer = Sink::EntityBuffer::readBuffer(entity.local()); - // const auto metadataBuffer = Sink::EntityBuffer::readBuffer(entity.metadata()); - - auto adaptor = QSharedPointer>::create(); - adaptor->mLocalBuffer = localBuffer; + auto adaptor = QSharedPointer>::create(); + adaptor->mLocalBuffer = Sink::EntityBuffer::readBuffer(entity.local()); adaptor->mLocalMapper = mLocalMapper; - adaptor->mResourceBuffer = resourceBuffer; + adaptor->mResourceBuffer = Sink::EntityBuffer::readBuffer(entity.resource()); adaptor->mResourceMapper = mResourceMapper; return adaptor; } diff --git a/common/genericresource.cpp b/common/genericresource.cpp index e0d395a..8f08f3d 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp @@ -59,9 +59,9 @@ class CommandProcessor : public QObject public: CommandProcessor(Sink::Pipeline *pipeline, QList commandQueues) : QObject(), mPipeline(pipeline), mCommandQueues(commandQueues), mProcessingLock(false) { - mLowerBoundRevision = DataStore::maxRevision(mPipeline->storage().createTransaction(DataStore::ReadOnly, [](const Sink::Storage::DataStore::Error &error) { - SinkWarning() << error.message; - })); + mPipeline->startTransaction(); + mLowerBoundRevision = mPipeline->revision(); + mPipeline->commit(); for (auto queue : mCommandQueues) { const bool ret = connect(queue, &MessageQueue::messageReady, this, &CommandProcessor::process); diff --git a/common/indexupdater.h b/common/indexupdater.h deleted file mode 100644 index 221a4ed..0000000 --- a/common/indexupdater.h +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Copyright (C) 2015 Christian Mollekopf - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 2 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the - * Free Software Foundation, Inc., - * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - */ -#pragma once - -#include -#include - -class IndexUpdater : public Sink::Preprocessor -{ -public: - IndexUpdater(const QByteArray &index, const QByteArray &type, const QByteArray &property) : mIndexIdentifier(index), mBufferType(type), mProperty(property) - { - } - - void newEntity(const QByteArray &uid, qint64 revision, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE - { - add(newEntity.getProperty(mProperty), uid, transaction); - } - - void modifiedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::ApplicationDomain::BufferAdaptor &newEntity, - Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE - { - remove(oldEntity.getProperty(mProperty), uid, transaction); - add(newEntity.getProperty(mProperty), uid, transaction); - } - - void deletedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE - { - remove(oldEntity.getProperty(mProperty), uid, transaction); - } - -private: - void add(const QVariant &value, const QByteArray &uid, Sink::Storage::DataStore::Transaction &transaction) - { - if (value.isValid()) { - Index(mIndexIdentifier, transaction).add(value.toByteArray(), uid); - } - } - - void remove(const QVariant &value, const QByteArray &uid, Sink::Storage::DataStore::Transaction &transaction) - { - if (value.isValid()) { - const auto data = value.toByteArray(); - if (!data.isEmpty()) { - Index(mIndexIdentifier, transaction).remove(data, uid); - } - } - } - - QByteArray mIndexIdentifier; - QByteArray mBufferType; - QByteArray mProperty; -}; - -template -class DefaultIndexUpdater : public Sink::Preprocessor -{ -public: - void newEntity(const QByteArray &uid, qint64 revision, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE - { - Sink::ApplicationDomain::TypeImplementation::index(uid, newEntity, transaction); - } - - void modifiedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::ApplicationDomain::BufferAdaptor &newEntity, - Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE - { - Sink::ApplicationDomain::TypeImplementation::removeIndex(uid, oldEntity, transaction); - Sink::ApplicationDomain::TypeImplementation::index(uid, newEntity, transaction); - } - - void deletedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE - { - Sink::ApplicationDomain::TypeImplementation::removeIndex(uid, oldEntity, transaction); - } -}; diff --git a/common/mailpreprocessor.cpp b/common/mailpreprocessor.cpp index b978323..d45afe6 100644 --- a/common/mailpreprocessor.cpp +++ b/common/mailpreprocessor.cpp @@ -116,7 +116,7 @@ static void updatedIndexedProperties(Sink::ApplicationDomain::Mail &mail, KMime: } } -void MailPropertyExtractor::newEntity(Sink::ApplicationDomain::Mail &mail, Sink::Storage::DataStore::Transaction &transaction) +void MailPropertyExtractor::newEntity(Sink::ApplicationDomain::Mail &mail) { MimeMessageReader mimeMessageReader(getFilePathFromMimeMessagePath(mail.getMimeMessagePath())); auto msg = mimeMessageReader.mimeMessage(); @@ -125,7 +125,7 @@ void MailPropertyExtractor::newEntity(Sink::ApplicationDomain::Mail &mail, Sink: } } -void MailPropertyExtractor::modifiedEntity(const Sink::ApplicationDomain::Mail &oldMail, Sink::ApplicationDomain::Mail &newMail,Sink::Storage::DataStore::Transaction &transaction) +void MailPropertyExtractor::modifiedEntity(const Sink::ApplicationDomain::Mail &oldMail, Sink::ApplicationDomain::Mail &newMail) { MimeMessageReader mimeMessageReader(getFilePathFromMimeMessagePath(newMail.getMimeMessagePath())); auto msg = mimeMessageReader.mimeMessage(); @@ -161,21 +161,21 @@ QString MimeMessageMover::moveMessage(const QString &oldPath, const Sink::Applic return oldPath; } -void MimeMessageMover::newEntity(Sink::ApplicationDomain::Mail &mail, Sink::Storage::DataStore::Transaction &transaction) +void MimeMessageMover::newEntity(Sink::ApplicationDomain::Mail &mail) { if (!mail.getMimeMessagePath().isEmpty()) { mail.setMimeMessagePath(moveMessage(mail.getMimeMessagePath(), mail)); } } -void MimeMessageMover::modifiedEntity(const Sink::ApplicationDomain::Mail &oldMail, Sink::ApplicationDomain::Mail &newMail, Sink::Storage::DataStore::Transaction &transaction) +void MimeMessageMover::modifiedEntity(const Sink::ApplicationDomain::Mail &oldMail, Sink::ApplicationDomain::Mail &newMail) { if (!newMail.getMimeMessagePath().isEmpty()) { newMail.setMimeMessagePath(moveMessage(newMail.getMimeMessagePath(), newMail)); } } -void MimeMessageMover::deletedEntity(const Sink::ApplicationDomain::Mail &mail, Sink::Storage::DataStore::Transaction &transaction) +void MimeMessageMover::deletedEntity(const Sink::ApplicationDomain::Mail &mail) { QFile::remove(mail.getMimeMessagePath()); } diff --git a/common/mailpreprocessor.h b/common/mailpreprocessor.h index c66517e..f979f22 100644 --- a/common/mailpreprocessor.h +++ b/common/mailpreprocessor.h @@ -24,8 +24,8 @@ class SINK_EXPORT MailPropertyExtractor : public Sink::EntityPreprocessor>> processors; bool revisionChanged; - void storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid); QTime transactionTime; int transactionItemCount; }; -void Pipeline::Private::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid) -{ - SinkTrace() << "Committing new revision: " << uid << newRevision; - DataStore::mainDatabase(transaction, bufferType) - .write(DataStore::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb), - [uid, newRevision](const DataStore::Error &error) { SinkWarning() << "Failed to write entity" << uid << newRevision; }); - revisionChanged = true; - DataStore::setMaxRevision(transaction, newRevision); - DataStore::recordRevision(transaction, newRevision, uid, bufferType); -} - Pipeline::Pipeline(const ResourceContext &context) : QObject(nullptr), d(new Private(context)) { @@ -78,7 +66,6 @@ Pipeline::Pipeline(const ResourceContext &context) : QObject(nullptr), d(new Pri Pipeline::~Pipeline() { - d->transaction = DataStore::Transaction(); } void Pipeline::setPreprocessors(const QString &entityType, const QVector &processors) @@ -98,27 +85,10 @@ void Pipeline::startTransaction() // for (auto processor : d->processors[bufferType]) { // processor->startBatch(); // } - if (d->transaction) { - return; - } SinkTrace() << "Starting transaction."; d->transactionTime.start(); d->transactionItemCount = 0; - d->transaction = storage().createTransaction(DataStore::ReadWrite, [](const DataStore::Error &error) { - SinkWarning() << error.message; - }); - - //FIXME this is a temporary measure to recover from a failure to open the named databases correctly. - //Once the actual problem is fixed it will be enough to simply crash if we open the wrong database (which we check in openDatabase already). - //It seems like the validateNamedDatabase calls actually stops the mdb_put failures during sync... - if (d->storage.exists()) { - while (!d->transaction.validateNamedDatabases()) { - SinkWarning() << "Opened an invalid transaction!!!!!!"; - d->transaction = storage().createTransaction(DataStore::ReadWrite, [](const DataStore::Error &error) { - SinkWarning() << error.message; - }); - } - } + d->entityStore.startTransaction(DataStore::ReadWrite); } void Pipeline::commit() @@ -129,34 +99,20 @@ void Pipeline::commit() // processor->finalize(); // } if (!d->revisionChanged) { - d->transaction.abort(); - d->transaction = DataStore::Transaction(); + d->entityStore.abortTransaction(); return; } - const auto revision = DataStore::maxRevision(d->transaction); + const auto revision = d->entityStore.maxRevision(); const auto elapsed = d->transactionTime.elapsed(); SinkLog() << "Committing revision: " << revision << ":" << d->transactionItemCount << " items in: " << Log::TraceTime(elapsed) << " " << (double)elapsed / (double)qMax(d->transactionItemCount, 1) << "[ms/item]"; - if (d->transaction) { - d->transaction.commit(); - } - d->transaction = DataStore::Transaction(); + d->entityStore.commitTransaction(); if (d->revisionChanged) { d->revisionChanged = false; emit revisionUpdated(revision); } } -DataStore::Transaction &Pipeline::transaction() -{ - return d->transaction; -} - -DataStore &Pipeline::storage() const -{ - return d->storage; -} - KAsync::Job Pipeline::newEntity(void const *command, size_t size) { d->transactionItemCount++; @@ -175,7 +131,7 @@ KAsync::Job Pipeline::newEntity(void const *command, size_t size) QByteArray key; if (createEntity->entityId()) { key = QByteArray(reinterpret_cast(createEntity->entityId()->Data()), createEntity->entityId()->size()); - if (DataStore::mainDatabase(d->transaction, bufferType).contains(key)) { + if (d->entityStore.contains(bufferType, key)) { SinkError() << "An entity with this id already exists: " << key; return KAsync::error(0); } @@ -208,29 +164,23 @@ KAsync::Job Pipeline::newEntity(void const *command, size_t size) auto adaptor = adaptorFactory->createAdaptor(*entity); auto memoryAdaptor = QSharedPointer::create(*(adaptor), adaptor->availableProperties()); - foreach (const auto &processor, d->processors[bufferType]) { - processor->newEntity(key, DataStore::maxRevision(d->transaction) + 1, *memoryAdaptor, d->transaction); - } - //The maxRevision may have changed meanwhile if the entity created sub-entities - const qint64 newRevision = DataStore::maxRevision(d->transaction) + 1; - - // Add metadata buffer - flatbuffers::FlatBufferBuilder metadataFbb; - auto metadataBuilder = MetadataBuilder(metadataFbb); - metadataBuilder.add_revision(newRevision); - metadataBuilder.add_operation(Operation_Creation); - metadataBuilder.add_replayToSource(replayToSource); - auto metadataBuffer = metadataBuilder.Finish(); - FinishMetadataBuffer(metadataFbb, metadataBuffer); - flatbuffers::FlatBufferBuilder fbb; - adaptorFactory->createBuffer(memoryAdaptor, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); + d->revisionChanged = true; + auto revision = d->entityStore.maxRevision(); + auto o = Sink::ApplicationDomain::ApplicationDomainType{d->resourceContext.instanceId(), key, revision, memoryAdaptor}; + o.setChangedProperties(o.availableProperties().toSet()); - d->storeNewRevision(newRevision, fbb, bufferType, key); + auto preprocess = [&, this](ApplicationDomain::ApplicationDomainType &newEntity) { + foreach (const auto &processor, d->processors[bufferType]) { + processor->newEntity(newEntity); + } + }; - //FIXME entityStore->create(bufferType, memoryAdaptor, replayToSource) + if (!d->entityStore.add(bufferType, o, replayToSource, preprocess)) { + return KAsync::error(0); + } - return KAsync::value(newRevision); + return KAsync::value(d->entityStore.maxRevision()); } KAsync::Job Pipeline::modifiedEntity(void const *command, size_t size) @@ -254,6 +204,7 @@ KAsync::Job Pipeline::modifiedEntity(void const *command, size_t size) } const qint64 baseRevision = modifyEntity->revision(); const bool replayToSource = modifyEntity->replayToSource(); + const QByteArray bufferType = QByteArray(reinterpret_cast(modifyEntity->domainType()->Data()), modifyEntity->domainType()->size()); const QByteArray key = QByteArray(reinterpret_cast(modifyEntity->entityId()->Data()), modifyEntity->entityId()->size()); SinkTrace() << "Modified Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; @@ -269,7 +220,6 @@ KAsync::Job Pipeline::modifiedEntity(void const *command, size_t size) } } - // TODO use only readPropertyMapper and writePropertyMapper auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceContext.resourceType, bufferType); if (!adaptorFactory) { SinkWarning() << "no adaptor factory for type " << bufferType; @@ -278,72 +228,26 @@ KAsync::Job Pipeline::modifiedEntity(void const *command, size_t size) auto diffEntity = GetEntity(modifyEntity->delta()->Data()); Q_ASSERT(diffEntity); - auto diff = adaptorFactory->createAdaptor(*diffEntity); - - QSharedPointer current; - DataStore::mainDatabase(d->transaction, bufferType) - .findLatest(key, - [¤t, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { - EntityBuffer buffer(const_cast(data.data()), data.size()); - if (!buffer.isValid()) { - SinkWarning() << "Read invalid buffer from disk"; - } else { - current = adaptorFactory->createAdaptor(buffer.entity()); - } - return false; - }, - [baseRevision](const DataStore::Error &error) { SinkWarning() << "Failed to read old revision from storage: " << error.message << "Revision: " << baseRevision; }); - - if (!current) { - SinkWarning() << "Failed to read local value " << key; - return KAsync::error(0); - } - - auto newAdaptor = QSharedPointer::create(*(current), current->availableProperties()); - - // Apply diff - // FIXME only apply the properties that are available in the buffer - SinkTrace() << "Applying changed properties: " << changeset; - for (const auto &property : changeset) { - const auto value = diff->getProperty(property); - if (value.isValid()) { - newAdaptor->setProperty(property, value); - } - } + Sink::ApplicationDomain::ApplicationDomainType diff{d->resourceContext.instanceId(), key, baseRevision, adaptorFactory->createAdaptor(*diffEntity)}; + diff.setChangedProperties(changeset.toSet()); - // Remove deletions + QByteArrayList deletions; if (modifyEntity->deletions()) { - for (const flatbuffers::String *property : *modifyEntity->deletions()) { - newAdaptor->setProperty(BufferUtils::extractBuffer(property), QVariant()); - } + deletions = BufferUtils::fromVector(*modifyEntity->deletions()); } - newAdaptor->resetChangedProperties(); - foreach (const auto &processor, d->processors[bufferType]) { - processor->modifiedEntity(key, DataStore::maxRevision(d->transaction) + 1, *current, *newAdaptor, d->transaction); - } - //The maxRevision may have changed meanwhile if the entity created sub-entities - const qint64 newRevision = DataStore::maxRevision(d->transaction) + 1; + auto preprocess = [&, this](const ApplicationDomain::ApplicationDomainType &oldEntity, ApplicationDomain::ApplicationDomainType &newEntity) { + foreach (const auto &processor, d->processors[bufferType]) { + processor->modifiedEntity(oldEntity, newEntity); + } + }; - // Add metadata buffer - flatbuffers::FlatBufferBuilder metadataFbb; - { - //We add availableProperties to account for the properties that have been changed by the preprocessors - auto modifiedProperties = BufferUtils::toVector(metadataFbb, changeset + newAdaptor->changedProperties()); - auto metadataBuilder = MetadataBuilder(metadataFbb); - metadataBuilder.add_revision(newRevision); - metadataBuilder.add_operation(Operation_Modification); - metadataBuilder.add_replayToSource(replayToSource); - metadataBuilder.add_modifiedProperties(modifiedProperties); - auto metadataBuffer = metadataBuilder.Finish(); - FinishMetadataBuffer(metadataFbb, metadataBuffer); + d->revisionChanged = true; + if (!d->entityStore.modify(bufferType, diff, deletions, replayToSource, preprocess)) { + return KAsync::error(0); } - flatbuffers::FlatBufferBuilder fbb; - adaptorFactory->createBuffer(newAdaptor, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); - - d->storeNewRevision(newRevision, fbb, bufferType, key); - return KAsync::value(newRevision); + return KAsync::value(d->entityStore.maxRevision()); } KAsync::Job Pipeline::deletedEntity(void const *command, size_t size) @@ -364,106 +268,38 @@ KAsync::Job Pipeline::deletedEntity(void const *command, size_t size) const QByteArray key = QByteArray(reinterpret_cast(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size()); SinkTrace() << "Deleted Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; - bool found = false; - bool alreadyRemoved = false; - DataStore::mainDatabase(d->transaction, bufferType) - .findLatest(key, - [&found, &alreadyRemoved](const QByteArray &key, const QByteArray &data) -> bool { - auto entity = GetEntity(data.data()); - if (entity && entity->metadata()) { - auto metadata = GetMetadata(entity->metadata()->Data()); - found = true; - if (metadata->operation() == Operation_Removal) { - alreadyRemoved = true; - } - } - return false; - }, - [](const DataStore::Error &error) { SinkWarning() << "Failed to read old revision from storage: " << error.message; }); - - if (!found) { - SinkWarning() << "Failed to find entity " << key; - return KAsync::error(0); - } - if (alreadyRemoved) { - SinkWarning() << "Entity is already removed " << key; - return KAsync::error(0); - } - - const qint64 newRevision = DataStore::maxRevision(d->transaction) + 1; - - // Add metadata buffer - flatbuffers::FlatBufferBuilder metadataFbb; - auto metadataBuilder = MetadataBuilder(metadataFbb); - metadataBuilder.add_revision(newRevision); - metadataBuilder.add_operation(Operation_Removal); - metadataBuilder.add_replayToSource(replayToSource); - auto metadataBuffer = metadataBuilder.Finish(); - FinishMetadataBuffer(metadataFbb, metadataBuffer); - - flatbuffers::FlatBufferBuilder fbb; - EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), 0, 0, 0, 0); + auto preprocess = [&, this](const ApplicationDomain::ApplicationDomainType &oldEntity) { + foreach (const auto &processor, d->processors[bufferType]) { + processor->deletedEntity(oldEntity); + } + }; - auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceContext.resourceType, bufferType); - if (!adaptorFactory) { - SinkWarning() << "no adaptor factory for type " << bufferType; + d->revisionChanged = true; + if (!d->entityStore.remove(bufferType, key, replayToSource, preprocess)) { return KAsync::error(0); } - QSharedPointer current; - DataStore::mainDatabase(d->transaction, bufferType) - .findLatest(key, - [this, bufferType, newRevision, adaptorFactory, key, ¤t](const QByteArray &, const QByteArray &data) -> bool { - EntityBuffer buffer(const_cast(data.data()), data.size()); - if (!buffer.isValid()) { - SinkWarning() << "Read invalid buffer from disk"; - } else { - current = adaptorFactory->createAdaptor(buffer.entity()); - } - return false; - }, - [this](const DataStore::Error &error) { SinkError() << "Failed to find value in pipeline: " << error.message; }); - - d->storeNewRevision(newRevision, fbb, bufferType, key); - - foreach (const auto &processor, d->processors[bufferType]) { - processor->deletedEntity(key, newRevision, *current, d->transaction); - } - - return KAsync::value(newRevision); + return KAsync::value(d->entityStore.maxRevision()); } void Pipeline::cleanupRevision(qint64 revision) { + d->entityStore.cleanupRevision(revision); d->revisionChanged = true; - const auto uid = DataStore::getUidFromRevision(d->transaction, revision); - const auto bufferType = DataStore::getTypeFromRevision(d->transaction, revision); - SinkTrace() << "Cleaning up revision " << revision << uid << bufferType; - DataStore::mainDatabase(d->transaction, bufferType) - .scan(uid, - [&](const QByteArray &key, const QByteArray &data) -> bool { - EntityBuffer buffer(const_cast(data.data()), data.size()); - if (!buffer.isValid()) { - SinkWarning() << "Read invalid buffer from disk"; - } else { - const auto metadata = flatbuffers::GetRoot(buffer.metadataBuffer()); - const qint64 rev = metadata->revision(); - // Remove old revisions, and the current if the entity has already been removed - if (rev < revision || metadata->operation() == Operation_Removal) { - DataStore::removeRevision(d->transaction, rev); - DataStore::mainDatabase(d->transaction, bufferType).remove(key); - } - } - - return true; - }, - [](const DataStore::Error &error) { SinkWarning() << "Error while reading: " << error.message; }, true); - DataStore::setCleanedUpRevision(d->transaction, revision); } qint64 Pipeline::cleanedUpRevision() { - return DataStore::cleanedUpRevision(d->transaction); + /* return d->entityStore.cleanedUpRevision(); */ + /* return DataStore::cleanedUpRevision(d->transaction); */ + //FIXME Just move the whole cleanup revision iteration into the entitystore + return 0; +} + +qint64 Pipeline::revision() +{ + //FIXME Just move the whole cleanup revision iteration into the entitystore + return 0; } class Preprocessor::Private { @@ -492,7 +328,7 @@ void Preprocessor::startBatch() { } -void Preprocessor::finalize() +void Preprocessor::finalizeBatch() { } @@ -510,7 +346,6 @@ void Preprocessor::createEntity(const Sink::ApplicationDomain::ApplicationDomain flatbuffers::FlatBufferBuilder fbb; auto entityId = fbb.CreateString(entity.identifier()); - // This is the resource buffer type and not the domain type auto type = fbb.CreateString(typeName); auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityBuffer.constData(), entityBuffer.size()); auto location = Sink::Commands::CreateCreateEntity(fbb, entityId, type, delta); diff --git a/common/pipeline.h b/common/pipeline.h index bf94017..0461507 100644 --- a/common/pipeline.h +++ b/common/pipeline.h @@ -31,7 +31,8 @@ #include -#include "domainadaptor.h" +#include +#include namespace Sink { @@ -45,16 +46,14 @@ public: Pipeline(const ResourceContext &context); ~Pipeline(); - Storage::DataStore &storage() const; - void setPreprocessors(const QString &entityType, const QVector &preprocessors); void startTransaction(); void commit(); - Storage::DataStore::Transaction &transaction(); KAsync::Job newEntity(void const *command, size_t size); KAsync::Job modifiedEntity(void const *command, size_t size); KAsync::Job deletedEntity(void const *command, size_t size); + /* * Cleans up a single revision. * @@ -66,6 +65,7 @@ public: * Returns the latest cleaned up revision. */ qint64 cleanedUpRevision(); + qint64 revision(); signals: void revisionUpdated(qint64); @@ -82,11 +82,10 @@ public: virtual ~Preprocessor(); virtual void startBatch(); - virtual void newEntity(const QByteArray &uid, qint64 revision, ApplicationDomain::BufferAdaptor &newEntity, Storage::DataStore::Transaction &transaction) {}; - virtual void modifiedEntity(const QByteArray &uid, qint64 revision, const ApplicationDomain::BufferAdaptor &oldEntity, - ApplicationDomain::BufferAdaptor &newEntity, Storage::DataStore::Transaction &transaction) {}; - virtual void deletedEntity(const QByteArray &uid, qint64 revision, const ApplicationDomain::BufferAdaptor &oldEntity, Storage::DataStore::Transaction &transaction) {}; - virtual void finalize(); + virtual void newEntity(ApplicationDomain::ApplicationDomainType &newEntity) {}; + virtual void modifiedEntity(const ApplicationDomain::ApplicationDomainType &oldEntity, ApplicationDomain::ApplicationDomainType &newEntity) {}; + virtual void deletedEntity(const ApplicationDomain::ApplicationDomainType &oldEntity) {}; + virtual void finalizeBatch(); void setup(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, Pipeline *); @@ -110,27 +109,28 @@ template class SINK_EXPORT EntityPreprocessor: public Preprocessor { public: - virtual void newEntity(DomainType &, Storage::DataStore::Transaction &transaction) {}; - virtual void modifiedEntity(const DomainType &oldEntity, DomainType &newEntity, Storage::DataStore::Transaction &transaction) {}; - virtual void deletedEntity(const DomainType &oldEntity, Storage::DataStore::Transaction &transaction) {}; + virtual void newEntity(DomainType &) {}; + virtual void modifiedEntity(const DomainType &oldEntity, DomainType &newEntity) {}; + virtual void deletedEntity(const DomainType &oldEntity) {}; private: - static void nullDeleter(ApplicationDomain::BufferAdaptor *) {} - virtual void newEntity(const QByteArray &uid, qint64 revision, ApplicationDomain::BufferAdaptor &bufferAdaptor, Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE + virtual void newEntity(ApplicationDomain::ApplicationDomainType &newEntity_) Q_DECL_OVERRIDE { - auto o = DomainType("", uid, revision, QSharedPointer(&bufferAdaptor, nullDeleter)); - newEntity(o, transaction); + //Modifications still work due to the underlying shared adaptor + auto newEntityCopy = DomainType(newEntity_); + newEntity(newEntityCopy); } - virtual void modifiedEntity(const QByteArray &uid, qint64 revision, const ApplicationDomain::BufferAdaptor &oldEntity, - ApplicationDomain::BufferAdaptor &bufferAdaptor, Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE + virtual void modifiedEntity(const ApplicationDomain::ApplicationDomainType &oldEntity, ApplicationDomain::ApplicationDomainType &newEntity_) Q_DECL_OVERRIDE { - auto o = DomainType("", uid, revision, QSharedPointer(&bufferAdaptor, nullDeleter)); - modifiedEntity(DomainType("", uid, 0, QSharedPointer(const_cast(&oldEntity), nullDeleter)), o, transaction); + //Modifications still work due to the underlying shared adaptor + auto newEntityCopy = DomainType(newEntity_); + modifiedEntity(DomainType(oldEntity), newEntityCopy); } - virtual void deletedEntity(const QByteArray &uid, qint64 revision, const ApplicationDomain::BufferAdaptor &bufferAdaptor, Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE + + virtual void deletedEntity(const ApplicationDomain::ApplicationDomainType &oldEntity) Q_DECL_OVERRIDE { - deletedEntity(DomainType("", uid, revision, QSharedPointer(const_cast(&bufferAdaptor), nullDeleter)), transaction); + deletedEntity(DomainType(oldEntity)); } }; diff --git a/common/specialpurposepreprocessor.cpp b/common/specialpurposepreprocessor.cpp index 0fd8e34..920f78a 100644 --- a/common/specialpurposepreprocessor.cpp +++ b/common/specialpurposepreprocessor.cpp @@ -46,11 +46,11 @@ QByteArray getSpecialPurposeType(const QString &name) SpecialPurposeProcessor::SpecialPurposeProcessor(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier) : mResourceType(resourceType), mResourceInstanceIdentifier(resourceInstanceIdentifier) {} -QByteArray SpecialPurposeProcessor::ensureFolder(Sink::Storage::DataStore::Transaction &transaction, const QByteArray &specialPurpose) +QByteArray SpecialPurposeProcessor::ensureFolder(const QByteArray &specialPurpose) { /* if (!mSpecialPurposeFolders.contains(specialPurpose)) { */ /* //Try to find an existing drafts folder */ - /* Sink::EntityReader reader(mResourceType, mResourceInstanceIdentifier, transaction); */ + /* Sink::EntityReader reader(mResourceType, mResourceInstanceIdentifier); */ /* reader.query(Sink::Query().filter(Query::Comparator(specialPurpose, Query::Comparator::Contains)), */ /* [this, specialPurpose](const ApplicationDomain::Folder &f) -> bool{ */ /* mSpecialPurposeFolders.insert(specialPurpose, f.identifier()); */ @@ -70,23 +70,23 @@ QByteArray SpecialPurposeProcessor::ensureFolder(Sink::Storage::DataStore::Trans return mSpecialPurposeFolders.value(specialPurpose); } -void SpecialPurposeProcessor::moveToFolder(Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::DataStore::Transaction &transaction) +void SpecialPurposeProcessor::moveToFolder(Sink::ApplicationDomain::ApplicationDomainType &newEntity) { if (newEntity.getProperty("trash").toBool()) { - newEntity.setProperty("folder", ensureFolder(transaction, "trash")); + newEntity.setProperty("folder", ensureFolder("trash")); return; } if (newEntity.getProperty("draft").toBool()) { - newEntity.setProperty("folder", ensureFolder(transaction, "drafts")); + newEntity.setProperty("folder", ensureFolder("drafts")); } } -void SpecialPurposeProcessor::newEntity(const QByteArray &uid, qint64 revision, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::DataStore::Transaction &transaction) +void SpecialPurposeProcessor::newEntity(Sink::ApplicationDomain::ApplicationDomainType &newEntity) { - moveToFolder(newEntity, transaction); + moveToFolder(newEntity); } -void SpecialPurposeProcessor::modifiedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::DataStore::Transaction &transaction) +void SpecialPurposeProcessor::modifiedEntity(const Sink::ApplicationDomain::ApplicationDomainType &oldEntity, Sink::ApplicationDomain::ApplicationDomainType &newEntity) { - moveToFolder(newEntity, transaction); + moveToFolder(newEntity); } diff --git a/common/specialpurposepreprocessor.h b/common/specialpurposepreprocessor.h index 8b2d9e9..f2aeb20 100644 --- a/common/specialpurposepreprocessor.h +++ b/common/specialpurposepreprocessor.h @@ -30,12 +30,12 @@ class SINK_EXPORT SpecialPurposeProcessor : public Sink::Preprocessor public: SpecialPurposeProcessor(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier); - QByteArray ensureFolder(Sink::Storage::DataStore::Transaction &transaction, const QByteArray &specialPurpose); + QByteArray ensureFolder(const QByteArray &specialPurpose); - void moveToFolder(Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::DataStore::Transaction &transaction); + void moveToFolder(Sink::ApplicationDomain::ApplicationDomainType &newEntity); - void newEntity(const QByteArray &uid, qint64 revision, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE; - void modifiedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE; + void newEntity(Sink::ApplicationDomain::ApplicationDomainType &newEntity) Q_DECL_OVERRIDE; + void modifiedEntity(const Sink::ApplicationDomain::ApplicationDomainType &oldEntity, Sink::ApplicationDomain::ApplicationDomainType &newEntity) Q_DECL_OVERRIDE; QHash mSpecialPurposeFolders; QByteArray mResourceType; diff --git a/common/storage/entitystore.cpp b/common/storage/entitystore.cpp index fe63f0b..30c7a71 100644 --- a/common/storage/entitystore.cpp +++ b/common/storage/entitystore.cpp @@ -25,6 +25,8 @@ #include "definitions.h" #include "resourcecontext.h" #include "index.h" +#include "bufferutils.h" +#include "entity_generated.h" #include "mail.h" #include "folder.h" @@ -108,16 +110,199 @@ void EntityStore::startTransaction(Sink::Storage::DataStore::AccessMode accessMo void EntityStore::commitTransaction() { + SinkTrace() << "Committing transaction"; d->transaction.commit(); d->transaction = Storage::DataStore::Transaction(); } void EntityStore::abortTransaction() { + SinkTrace() << "Aborting transaction"; d->transaction.abort(); d->transaction = Storage::DataStore::Transaction(); } +bool EntityStore::add(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &entity_, bool replayToSource, const PreprocessCreation &preprocess) +{ + if (entity_.identifier().isEmpty()) { + SinkWarning() << "Can't write entity with an empty identifier"; + return false; + } + + auto entity = *ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(entity_, entity_.availableProperties()); + entity.setChangedProperties(entity.availableProperties().toSet()); + + preprocess(entity); + d->typeIndex(type).add(entity.identifier(), entity, d->transaction); + + //The maxRevision may have changed meanwhile if the entity created sub-entities + const qint64 newRevision = maxRevision() + 1; + + // Add metadata buffer + flatbuffers::FlatBufferBuilder metadataFbb; + auto metadataBuilder = MetadataBuilder(metadataFbb); + metadataBuilder.add_revision(newRevision); + metadataBuilder.add_operation(Operation_Creation); + metadataBuilder.add_replayToSource(replayToSource); + auto metadataBuffer = metadataBuilder.Finish(); + FinishMetadataBuffer(metadataFbb, metadataBuffer); + + flatbuffers::FlatBufferBuilder fbb; + d->resourceContext.adaptorFactory(type).createBuffer(entity, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); + + DataStore::mainDatabase(d->transaction, type) + .write(DataStore::assembleKey(entity.identifier(), newRevision), BufferUtils::extractBuffer(fbb), + [&](const DataStore::Error &error) { SinkWarning() << "Failed to write entity" << entity.identifier() << newRevision; }); + DataStore::setMaxRevision(d->transaction, newRevision); + DataStore::recordRevision(d->transaction, newRevision, entity.identifier(), type); + SinkTrace() << "Wrote entity: " << entity.identifier() << type << newRevision; + return true; +} + +bool EntityStore::modify(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &diff, const QByteArrayList &deletions, bool replayToSource, const PreprocessModification &preprocess) +{ + auto changeset = diff.changedProperties(); + //TODO handle errors + const auto current = readLatest(type, diff.identifier()); + if (current.identifier().isEmpty()) { + SinkWarning() << "Failed to read current version: " << diff.identifier(); + return false; + } + + auto newEntity = *ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(current, current.availableProperties()); + + // Apply diff + //SinkTrace() << "Applying changed properties: " << changeset; + for (const auto &property : changeset) { + const auto value = diff.getProperty(property); + if (value.isValid()) { + //SinkTrace() << "Setting property: " << property; + newEntity.setProperty(property, value); + } + } + + // Remove deletions + for (const auto property : deletions) { + //SinkTrace() << "Removing property: " << property; + newEntity.setProperty(property, QVariant()); + } + + preprocess(current, newEntity); + d->typeIndex(type).remove(current.identifier(), current, d->transaction); + d->typeIndex(type).add(newEntity.identifier(), newEntity, d->transaction); + + const qint64 newRevision = DataStore::maxRevision(d->transaction) + 1; + + // Add metadata buffer + flatbuffers::FlatBufferBuilder metadataFbb; + { + //We add availableProperties to account for the properties that have been changed by the preprocessors + auto modifiedProperties = BufferUtils::toVector(metadataFbb, changeset + newEntity.changedProperties()); + auto metadataBuilder = MetadataBuilder(metadataFbb); + metadataBuilder.add_revision(newRevision); + metadataBuilder.add_operation(Operation_Modification); + metadataBuilder.add_replayToSource(replayToSource); + metadataBuilder.add_modifiedProperties(modifiedProperties); + auto metadataBuffer = metadataBuilder.Finish(); + FinishMetadataBuffer(metadataFbb, metadataBuffer); + } + + newEntity.setChangedProperties(newEntity.availableProperties().toSet()); + SinkTrace() << "All properties: " << newEntity.availableProperties(); + + flatbuffers::FlatBufferBuilder fbb; + d->resourceContext.adaptorFactory(type).createBuffer(newEntity, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); + + DataStore::mainDatabase(d->transaction, type) + .write(DataStore::assembleKey(newEntity.identifier(), newRevision), BufferUtils::extractBuffer(fbb), + [&](const DataStore::Error &error) { SinkWarning() << "Failed to write entity" << newEntity.identifier() << newRevision; }); + DataStore::setMaxRevision(d->transaction, newRevision); + DataStore::recordRevision(d->transaction, newRevision, newEntity.identifier(), type); + SinkTrace() << "Wrote modified entity: " << newEntity.identifier() << type << newRevision; + return true; +} + +bool EntityStore::remove(const QByteArray &type, const QByteArray &uid, bool replayToSource, const PreprocessRemoval &preprocess) +{ + bool found = false; + bool alreadyRemoved = false; + DataStore::mainDatabase(d->transaction, type) + .findLatest(uid, + [&found, &alreadyRemoved](const QByteArray &key, const QByteArray &data) -> bool { + auto entity = GetEntity(data.data()); + if (entity && entity->metadata()) { + auto metadata = GetMetadata(entity->metadata()->Data()); + found = true; + if (metadata->operation() == Operation_Removal) { + alreadyRemoved = true; + } + } + return false; + }, + [](const DataStore::Error &error) { SinkWarning() << "Failed to read old revision from storage: " << error.message; }); + + if (!found) { + SinkWarning() << "Failed to find entity " << uid; + return false; + } + if (alreadyRemoved) { + SinkWarning() << "Entity is already removed " << uid; + return false; + } + + const auto current = readLatest(type, uid); + preprocess(current); + d->typeIndex(type).remove(current.identifier(), current, d->transaction); + + const qint64 newRevision = DataStore::maxRevision(d->transaction) + 1; + + // Add metadata buffer + flatbuffers::FlatBufferBuilder metadataFbb; + auto metadataBuilder = MetadataBuilder(metadataFbb); + metadataBuilder.add_revision(newRevision); + metadataBuilder.add_operation(Operation_Removal); + metadataBuilder.add_replayToSource(replayToSource); + auto metadataBuffer = metadataBuilder.Finish(); + FinishMetadataBuffer(metadataFbb, metadataBuffer); + + flatbuffers::FlatBufferBuilder fbb; + EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), 0, 0, 0, 0); + + DataStore::mainDatabase(d->transaction, type) + .write(DataStore::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb), + [&](const DataStore::Error &error) { SinkWarning() << "Failed to write entity" << uid << newRevision; }); + DataStore::setMaxRevision(d->transaction, newRevision); + DataStore::recordRevision(d->transaction, newRevision, uid, type); + return true; +} + +void EntityStore::cleanupRevision(qint64 revision) +{ + const auto uid = DataStore::getUidFromRevision(d->transaction, revision); + const auto bufferType = DataStore::getTypeFromRevision(d->transaction, revision); + SinkTrace() << "Cleaning up revision " << revision << uid << bufferType; + DataStore::mainDatabase(d->transaction, bufferType) + .scan(uid, + [&](const QByteArray &key, const QByteArray &data) -> bool { + EntityBuffer buffer(const_cast(data.data()), data.size()); + if (!buffer.isValid()) { + SinkWarning() << "Read invalid buffer from disk"; + } else { + const auto metadata = flatbuffers::GetRoot(buffer.metadataBuffer()); + const qint64 rev = metadata->revision(); + // Remove old revisions, and the current if the entity has already been removed + if (rev < revision || metadata->operation() == Operation_Removal) { + DataStore::removeRevision(d->transaction, rev); + DataStore::mainDatabase(d->transaction, bufferType).remove(key); + } + } + + return true; + }, + [](const DataStore::Error &error) { SinkWarning() << "Error while reading: " << error.message; }, true); + DataStore::setCleanedUpRevision(d->transaction, revision); +} + QVector EntityStore::fullScan(const QByteArray &type) { SinkTrace() << "Looking for : " << type; diff --git a/common/storage/entitystore.h b/common/storage/entitystore.h index 455e9c3..65bff50 100644 --- a/common/storage/entitystore.h +++ b/common/storage/entitystore.h @@ -38,9 +38,14 @@ public: typedef QSharedPointer Ptr; EntityStore(const ResourceContext &resourceContext); - void add(const ApplicationDomain::ApplicationDomainType &); - void modify(const ApplicationDomain::ApplicationDomainType &); - void remove(const ApplicationDomain::ApplicationDomainType &); + typedef std::function PreprocessModification; + typedef std::function PreprocessCreation; + typedef std::function PreprocessRemoval; + + bool add(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &, bool replayToSource, const PreprocessCreation &); + bool modify(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &, const QByteArrayList &deletions, bool replayToSource, const PreprocessModification &); + bool remove(const QByteArray &type, const QByteArray &uid, bool replayToSource, const PreprocessRemoval &); + void cleanupRevision(qint64 revision); void startTransaction(Sink::Storage::DataStore::AccessMode); void commitTransaction(); diff --git a/common/typeindex.cpp b/common/typeindex.cpp index 64c2a01..7920efc 100644 --- a/common/typeindex.cpp +++ b/common/typeindex.cpp @@ -108,30 +108,30 @@ void TypeIndex::addPropertyWithSorting(const QByteArray & mSortedProperties.insert(property, sortProperty); } -void TypeIndex::add(const QByteArray &identifier, const Sink::ApplicationDomain::BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction) +void TypeIndex::add(const QByteArray &identifier, const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Storage::DataStore::Transaction &transaction) { for (const auto &property : mProperties) { - const auto value = bufferAdaptor.getProperty(property); + const auto value = entity.getProperty(property); auto indexer = mIndexer.value(property); indexer(identifier, value, transaction); } for (auto it = mSortedProperties.constBegin(); it != mSortedProperties.constEnd(); it++) { - const auto value = bufferAdaptor.getProperty(it.key()); - const auto sortValue = bufferAdaptor.getProperty(it.value()); + const auto value = entity.getProperty(it.key()); + const auto sortValue = entity.getProperty(it.value()); auto indexer = mSortIndexer.value(it.key() + it.value()); indexer(identifier, value, sortValue, transaction); } } -void TypeIndex::remove(const QByteArray &identifier, const Sink::ApplicationDomain::BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction) +void TypeIndex::remove(const QByteArray &identifier, const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Storage::DataStore::Transaction &transaction) { for (const auto &property : mProperties) { - const auto value = bufferAdaptor.getProperty(property); + const auto value = entity.getProperty(property); Index(indexName(property), transaction).remove(getByteArray(value), identifier); } for (auto it = mSortedProperties.constBegin(); it != mSortedProperties.constEnd(); it++) { - const auto propertyValue = bufferAdaptor.getProperty(it.key()); - const auto sortValue = bufferAdaptor.getProperty(it.value()); + const auto propertyValue = entity.getProperty(it.key()); + const auto sortValue = entity.getProperty(it.value()); if (sortValue.type() == QVariant::DateTime) { Index(indexName(it.key(), it.value()), transaction).remove(propertyValue.toByteArray() + toSortableByteArray(sortValue.toDateTime()), identifier); } else { diff --git a/common/typeindex.h b/common/typeindex.h index 2638577..e11e673 100644 --- a/common/typeindex.h +++ b/common/typeindex.h @@ -19,7 +19,6 @@ #pragma once #include "resultset.h" -#include "bufferadaptor.h" #include "storage.h" #include "query.h" #include "log.h" @@ -52,8 +51,8 @@ public: { mSecondaryProperties.insert(Left::name, Right::name); } - void add(const QByteArray &identifier, const Sink::ApplicationDomain::BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction); - void remove(const QByteArray &identifier, const Sink::ApplicationDomain::BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction); + void add(const QByteArray &identifier, const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Storage::DataStore::Transaction &transaction); + void remove(const QByteArray &identifier, const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Storage::DataStore::Transaction &transaction); QVector query(const Sink::Query &query, QSet &appliedFilters, QByteArray &appliedSorting, Sink::Storage::DataStore::Transaction &transaction); QVector lookup(const QByteArray &property, const QVariant &value, Sink::Storage::DataStore::Transaction &transaction); diff --git a/examples/dummyresource/resourcefactory.cpp b/examples/dummyresource/resourcefactory.cpp index e288be2..5513986 100644 --- a/examples/dummyresource/resourcefactory.cpp +++ b/examples/dummyresource/resourcefactory.cpp @@ -35,7 +35,6 @@ #include "dummystore.h" #include "definitions.h" #include "facadefactory.h" -#include "indexupdater.h" #include "adaptorfactoryregistry.h" #include "synchronizer.h" #include "mailpreprocessor.h" @@ -135,11 +134,11 @@ DummyResource::DummyResource(const Sink::ResourceContext &resourceContext, const setupSynchronizer(QSharedPointer::create(resourceContext)); setupChangereplay(QSharedPointer::create(resourceContext)); setupPreprocessors(ENTITY_TYPE_MAIL, - QVector() << new MailPropertyExtractor << new DefaultIndexUpdater); + QVector() << new MailPropertyExtractor); setupPreprocessors(ENTITY_TYPE_FOLDER, - QVector() << new DefaultIndexUpdater); + QVector()); setupPreprocessors(ENTITY_TYPE_EVENT, - QVector() << new DefaultIndexUpdater); + QVector()); } DummyResource::~DummyResource() diff --git a/examples/imapresource/imapresource.cpp b/examples/imapresource/imapresource.cpp index 90dae7a..9656a04 100644 --- a/examples/imapresource/imapresource.cpp +++ b/examples/imapresource/imapresource.cpp @@ -33,7 +33,6 @@ #include "domain/mail.h" #include "definitions.h" #include "facadefactory.h" -#include "indexupdater.h" #include "inspection.h" #include "synchronizer.h" #include "sourcewriteback.h" @@ -527,8 +526,8 @@ ImapResource::ImapResource(const ResourceContext &resourceContext, const QShared changereplay->mPassword = mPassword; setupChangereplay(changereplay); - setupPreprocessors(ENTITY_TYPE_MAIL, QVector() << new SpecialPurposeProcessor(resourceContext.resourceType, resourceContext.instanceId()) << new MimeMessageMover << new MailPropertyExtractor << new DefaultIndexUpdater); - setupPreprocessors(ENTITY_TYPE_FOLDER, QVector() << new DefaultIndexUpdater); + setupPreprocessors(ENTITY_TYPE_MAIL, QVector() << new SpecialPurposeProcessor(resourceContext.resourceType, resourceContext.instanceId()) << new MimeMessageMover << new MailPropertyExtractor); + setupPreprocessors(ENTITY_TYPE_FOLDER, QVector()); } void ImapResource::removeFromDisk(const QByteArray &instanceIdentifier) diff --git a/examples/maildirresource/maildirresource.cpp b/examples/maildirresource/maildirresource.cpp index b89d78c..920bd28 100644 --- a/examples/maildirresource/maildirresource.cpp +++ b/examples/maildirresource/maildirresource.cpp @@ -33,7 +33,6 @@ #include "domain/mail.h" #include "definitions.h" #include "facadefactory.h" -#include "indexupdater.h" #include "libmaildir/maildir.h" #include "inspection.h" #include "synchronizer.h" @@ -85,7 +84,7 @@ class MaildirMimeMessageMover : public Sink::Preprocessor public: MaildirMimeMessageMover(const QByteArray &resourceInstanceIdentifier, const QString &maildirPath) : mResourceInstanceIdentifier(resourceInstanceIdentifier), mMaildirPath(maildirPath) {} - QString getPath(const QByteArray &folderIdentifier, Sink::Storage::DataStore::Transaction &transaction) + QString getPath(const QByteArray &folderIdentifier) { if (folderIdentifier.isEmpty()) { return mMaildirPath; @@ -108,10 +107,10 @@ public: return folderPath; } - QString moveMessage(const QString &oldPath, const QByteArray &folder, Sink::Storage::DataStore::Transaction &transaction) + QString moveMessage(const QString &oldPath, const QByteArray &folder) { if (oldPath.startsWith(Sink::temporaryFileLocation())) { - const auto path = getPath(folder, transaction); + const auto path = getPath(folder); KPIM::Maildir maildir(path, false); if (!maildir.isValid(true)) { SinkWarning() << "Maildir is not existing: " << path; @@ -120,7 +119,7 @@ public: return path + "/" + identifier; } else { //Handle moves - const auto path = getPath(folder, transaction); + const auto path = getPath(folder); KPIM::Maildir maildir(path, false); if (!maildir.isValid(true)) { SinkWarning() << "Maildir is not existing: " << path; @@ -141,16 +140,15 @@ public: } } - void newEntity(const QByteArray &uid, qint64 revision, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE + void newEntity(Sink::ApplicationDomain::ApplicationDomainType &newEntity) Q_DECL_OVERRIDE { const auto mimeMessage = newEntity.getProperty("mimeMessage"); if (mimeMessage.isValid()) { - newEntity.setProperty("mimeMessage", moveMessage(mimeMessage.toString(), newEntity.getProperty("folder").toByteArray(), transaction)); + newEntity.setProperty("mimeMessage", moveMessage(mimeMessage.toString(), newEntity.getProperty("folder").toByteArray())); } } - void modifiedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::ApplicationDomain::BufferAdaptor &newEntity, - Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE + void modifiedEntity(const Sink::ApplicationDomain::ApplicationDomainType &oldEntity, Sink::ApplicationDomain::ApplicationDomainType &newEntity) Q_DECL_OVERRIDE { const auto mimeMessage = newEntity.getProperty("mimeMessage"); const auto newFolder = newEntity.getProperty("folder"); @@ -158,7 +156,7 @@ public: const bool folderChanged = newFolder.isValid() && newFolder.toString() != oldEntity.getProperty("mimeMessage").toString(); if (mimeMessageChanged || folderChanged) { SinkTrace() << "Moving mime message: " << mimeMessageChanged << folderChanged; - auto newPath = moveMessage(mimeMessage.toString(), newEntity.getProperty("folder").toByteArray(), transaction); + auto newPath = moveMessage(mimeMessage.toString(), newEntity.getProperty("folder").toByteArray()); if (newPath != oldEntity.getProperty("mimeMessage").toString()) { const auto oldPath = getFilePathFromMimeMessagePath(oldEntity.getProperty("mimeMessage").toString()); newEntity.setProperty("mimeMessage", newPath); @@ -168,7 +166,7 @@ public: } auto mimeMessagePath = newEntity.getProperty("mimeMessage").toString(); - const auto maildirPath = getPath(newEntity.getProperty("folder").toByteArray(), transaction); + const auto maildirPath = getPath(newEntity.getProperty("folder").toByteArray()); KPIM::Maildir maildir(maildirPath, false); const auto file = getFilePathFromMimeMessagePath(mimeMessagePath); QString identifier = KPIM::Maildir::getKeyFromFile(file); @@ -185,7 +183,7 @@ public: maildir.changeEntryFlags(identifier, flags); } - void deletedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE + void deletedEntity(const Sink::ApplicationDomain::ApplicationDomainType &oldEntity) Q_DECL_OVERRIDE { const auto filePath = getFilePathFromMimeMessagePath(oldEntity.getProperty("mimeMessage").toString()); QFile::remove(filePath); @@ -199,7 +197,7 @@ class FolderPreprocessor : public Sink::Preprocessor public: FolderPreprocessor(const QString maildirPath) : mMaildirPath(maildirPath) {} - void newEntity(const QByteArray &uid, qint64 revision, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE + void newEntity(Sink::ApplicationDomain::ApplicationDomainType &newEntity) Q_DECL_OVERRIDE { auto folderName = newEntity.getProperty("name").toString(); const auto path = mMaildirPath + "/" + folderName; @@ -207,12 +205,11 @@ public: maildir.create(); } - void modifiedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::ApplicationDomain::BufferAdaptor &newEntity, - Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE + void modifiedEntity(const Sink::ApplicationDomain::ApplicationDomainType &oldEntity, Sink::ApplicationDomain::ApplicationDomainType &newEntity) Q_DECL_OVERRIDE { } - void deletedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE + void deletedEntity(const Sink::ApplicationDomain::ApplicationDomainType &oldEntity) Q_DECL_OVERRIDE { } QString mMaildirPath; @@ -440,8 +437,8 @@ MaildirResource::MaildirResource(const Sink::ResourceContext &resourceContext, c changereplay->mMaildirPath = mMaildirPath; setupChangereplay(changereplay); - setupPreprocessors(ENTITY_TYPE_MAIL, QVector() << new SpecialPurposeProcessor(resourceContext.resourceType, resourceContext.instanceId()) << new MaildirMimeMessageMover(resourceContext.instanceId(), mMaildirPath) << new MaildirMailPropertyExtractor << new DefaultIndexUpdater); - setupPreprocessors(ENTITY_TYPE_FOLDER, QVector() << new FolderPreprocessor(mMaildirPath) << new DefaultIndexUpdater); + setupPreprocessors(ENTITY_TYPE_MAIL, QVector() << new SpecialPurposeProcessor(resourceContext.resourceType, resourceContext.instanceId()) << new MaildirMimeMessageMover(resourceContext.instanceId(), mMaildirPath) << new MaildirMailPropertyExtractor); + setupPreprocessors(ENTITY_TYPE_FOLDER, QVector() << new FolderPreprocessor(mMaildirPath)); KPIM::Maildir dir(mMaildirPath, true); SinkTrace() << "Started maildir resource for maildir: " << mMaildirPath; diff --git a/examples/mailtransportresource/mailtransportresource.cpp b/examples/mailtransportresource/mailtransportresource.cpp index 9a22c41..25231c8 100644 --- a/examples/mailtransportresource/mailtransportresource.cpp +++ b/examples/mailtransportresource/mailtransportresource.cpp @@ -40,7 +40,6 @@ #include #include #include -#include #include #define ENTITY_TYPE_MAIL "mail" @@ -162,7 +161,7 @@ MailtransportResource::MailtransportResource(const Sink::ResourceContext &resour changereplay->mSettings = mSettings; setupChangereplay(changereplay); - setupPreprocessors(ENTITY_TYPE_MAIL, QVector() << new MimeMessageMover << new MailPropertyExtractor << new DefaultIndexUpdater); + setupPreprocessors(ENTITY_TYPE_MAIL, QVector() << new MimeMessageMover << new MailPropertyExtractor); } void MailtransportResource::removeFromDisk(const QByteArray &instanceIdentifier) diff --git a/tests/mailquerybenchmark.cpp b/tests/mailquerybenchmark.cpp index c44b9f6..90cc4ba 100644 --- a/tests/mailquerybenchmark.cpp +++ b/tests/mailquerybenchmark.cpp @@ -32,7 +32,6 @@ #include #include #include -#include #include #include "hawd/dataset.h" @@ -64,10 +63,6 @@ class MailQueryBenchmark : public QObject auto pipeline = QSharedPointer::create(Sink::ResourceContext{resourceIdentifier, "test"}); - auto indexer = QSharedPointer>::create(); - - pipeline->setPreprocessors("mail", QVector() << indexer.data()); - auto domainTypeAdaptorFactory = QSharedPointer::create(); pipeline->startTransaction(); diff --git a/tests/pipelinebenchmark.cpp b/tests/pipelinebenchmark.cpp index 16806c7..2e614ef 100644 --- a/tests/pipelinebenchmark.cpp +++ b/tests/pipelinebenchmark.cpp @@ -32,7 +32,6 @@ #include #include #include -#include #include #include "hawd/dataset.h" @@ -45,27 +44,6 @@ #include "createentity_generated.h" #include "getrssusage.h" -// class IndexUpdater : public Sink::Preprocessor { -// public: -// void newEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE -// { -// for (int i = 0; i < 10; i++) { -// Index ridIndex(QString("index.index%1").arg(i).toLatin1(), transaction); -// ridIndex.add("foo", uid); -// } -// } -// -// void modifiedEntity(const QByteArray &key, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, const Sink::ApplicationDomain::BufferAdaptor &newEntity, -// Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE -// { -// } -// -// void deletedEntity(const QByteArray &key, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE -// { -// } -// }; -// - /** * Benchmark pipeline processing speed. * @@ -133,15 +111,9 @@ private slots: resourceIdentifier = "sink.test.instance1"; } - void testWithoutIndex() - { - populateDatabase(10000, QVector()); - } - void testWithIndex() { - auto indexer = QSharedPointer>::create(); - populateDatabase(10000, QVector() << indexer.data()); + populateDatabase(10000, QVector()); } }; diff --git a/tests/pipelinetest.cpp b/tests/pipelinetest.cpp index 112453e..4e04152 100644 --- a/tests/pipelinetest.cpp +++ b/tests/pipelinetest.cpp @@ -152,23 +152,22 @@ QByteArray deleteEntityCommand(const QByteArray &uid, qint64 revision) class TestProcessor : public Sink::Preprocessor { public: - void newEntity(const QByteArray &uid, qint64 revision, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE + void newEntity(Sink::ApplicationDomain::ApplicationDomainType &newEntity) Q_DECL_OVERRIDE { - newUids << uid; - newRevisions << revision; + newUids << newEntity.identifier(); + newRevisions << newEntity.revision(); } - void modifiedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::ApplicationDomain::BufferAdaptor &newEntity, - Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE + void modifiedEntity(const Sink::ApplicationDomain::ApplicationDomainType &oldEntity, Sink::ApplicationDomain::ApplicationDomainType &newEntity) Q_DECL_OVERRIDE { - modifiedUids << uid; - modifiedRevisions << revision; + modifiedUids << newEntity.identifier(); + modifiedRevisions << newEntity.revision(); } - void deletedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE + void deletedEntity(const Sink::ApplicationDomain::ApplicationDomainType &oldEntity) Q_DECL_OVERRIDE { - deletedUids << uid; - deletedRevisions << revision; + deletedUids << oldEntity.identifier(); + deletedRevisions << oldEntity.revision(); deletedSummaries << oldEntity.getProperty("summary").toByteArray(); } @@ -187,6 +186,17 @@ public: class PipelineTest : public QObject { Q_OBJECT + + QByteArray instanceIdentifier() + { + return "pipelinetest.instance1"; + } + + Sink::ResourceContext getContext() + { + return Sink::ResourceContext{instanceIdentifier(), "test", Sink::AdaptorFactoryRegistry::instance().getFactories("test")}; + } + private slots: void initTestCase() { @@ -195,7 +205,7 @@ private slots: void init() { - removeFromDisk("sink.pipelinetest.instance1"); + removeFromDisk(instanceIdentifier()); } void testCreate() @@ -203,15 +213,22 @@ private slots: flatbuffers::FlatBufferBuilder entityFbb; auto command = createEntityCommand(createEvent(entityFbb)); - Sink::Pipeline pipeline(Sink::ResourceContext{"sink.pipelinetest.instance1", "test"}); + Sink::Pipeline pipeline(getContext()); pipeline.startTransaction(); pipeline.newEntity(command.constData(), command.size()); pipeline.commit(); - auto result = getKeys("sink.pipelinetest.instance1", "event.main"); + auto result = getKeys(instanceIdentifier(), "event.main"); qDebug() << result; QCOMPARE(result.size(), 1); + + auto adaptorFactory = QSharedPointer::create(); + auto buffer = getEntity(instanceIdentifier(), "event.main", result.first()); + QVERIFY(!buffer.isEmpty()); + Sink::EntityBuffer entityBuffer(buffer.data(), buffer.size()); + auto adaptor = adaptorFactory->createAdaptor(entityBuffer.entity()); + QVERIFY2(adaptor->getProperty("summary").toString() == QString("summary"), "The modification isn't applied."); } void testModify() @@ -219,7 +236,7 @@ private slots: flatbuffers::FlatBufferBuilder entityFbb; auto command = createEntityCommand(createEvent(entityFbb, "summary", "description")); - Sink::Pipeline pipeline(Sink::ResourceContext{"sink.pipelinetest.instance1", "test"}); + Sink::Pipeline pipeline(getContext()); auto adaptorFactory = QSharedPointer::create(); @@ -229,7 +246,7 @@ private slots: pipeline.commit(); // Get uid of written entity - auto keys = getKeys("sink.pipelinetest.instance1", "event.main"); + auto keys = getKeys(instanceIdentifier(), "event.main"); QCOMPARE(keys.size(), 1); const auto key = keys.first(); const auto uid = Sink::Storage::DataStore::uidFromKey(key); @@ -242,7 +259,7 @@ private slots: pipeline.commit(); // Ensure we've got the new revision with the modification - auto buffer = getEntity("sink.pipelinetest.instance1", "event.main", Sink::Storage::DataStore::assembleKey(uid, 2)); + auto buffer = getEntity(instanceIdentifier(), "event.main", Sink::Storage::DataStore::assembleKey(uid, 2)); QVERIFY(!buffer.isEmpty()); Sink::EntityBuffer entityBuffer(buffer.data(), buffer.size()); auto adaptor = adaptorFactory->createAdaptor(entityBuffer.entity()); @@ -251,7 +268,7 @@ private slots: QVERIFY2(adaptor->getProperty("description").toString() == QString("description"), "The modification has sideeffects."); // Both revisions are in the store at this point - QCOMPARE(getKeys("sink.pipelinetest.instance1", "event.main").size(), 2); + QCOMPARE(getKeys(instanceIdentifier(), "event.main").size(), 2); // Cleanup old revisions pipeline.startTransaction(); @@ -259,7 +276,7 @@ private slots: pipeline.commit(); // And now only the latest revision is left - QCOMPARE(getKeys("sink.pipelinetest.instance1", "event.main").size(), 1); + QCOMPARE(getKeys(instanceIdentifier(), "event.main").size(), 1); } void testModifyWithUnrelatedOperationInbetween() @@ -267,7 +284,7 @@ private slots: flatbuffers::FlatBufferBuilder entityFbb; auto command = createEntityCommand(createEvent(entityFbb)); - Sink::Pipeline pipeline(Sink::ResourceContext{"sink.pipelinetest.instance1", "test"}); + Sink::Pipeline pipeline(getContext()); auto adaptorFactory = QSharedPointer::create(); @@ -277,7 +294,7 @@ private slots: pipeline.commit(); // Get uid of written entity - auto keys = getKeys("sink.pipelinetest.instance1", "event.main"); + auto keys = getKeys(instanceIdentifier(), "event.main"); QCOMPARE(keys.size(), 1); const auto uid = Sink::Storage::DataStore::uidFromKey(keys.first()); @@ -299,7 +316,7 @@ private slots: pipeline.commit(); // Ensure we've got the new revision with the modification - auto buffer = getEntity("sink.pipelinetest.instance1", "event.main", Sink::Storage::DataStore::assembleKey(uid, 3)); + auto buffer = getEntity(instanceIdentifier(), "event.main", Sink::Storage::DataStore::assembleKey(uid, 3)); QVERIFY(!buffer.isEmpty()); Sink::EntityBuffer entityBuffer(buffer.data(), buffer.size()); auto adaptor = adaptorFactory->createAdaptor(entityBuffer.entity()); @@ -310,14 +327,14 @@ private slots: { flatbuffers::FlatBufferBuilder entityFbb; auto command = createEntityCommand(createEvent(entityFbb)); - Sink::Pipeline pipeline(Sink::ResourceContext{"sink.pipelinetest.instance1", "test"}); + Sink::Pipeline pipeline(getContext()); // Create the initial revision pipeline.startTransaction(); pipeline.newEntity(command.constData(), command.size()); pipeline.commit(); - auto result = getKeys("sink.pipelinetest.instance1", "event.main"); + auto result = getKeys(instanceIdentifier(), "event.main"); QCOMPARE(result.size(), 1); const auto uid = Sink::Storage::DataStore::uidFromKey(result.first()); @@ -329,7 +346,7 @@ private slots: pipeline.commit(); // We have a new revision that indicates the deletion - QCOMPARE(getKeys("sink.pipelinetest.instance1", "event.main").size(), 2); + QCOMPARE(getKeys(instanceIdentifier(), "event.main").size(), 2); // Cleanup old revisions pipeline.startTransaction(); @@ -337,7 +354,7 @@ private slots: pipeline.commit(); // And all revisions are gone - QCOMPARE(getKeys("sink.pipelinetest.instance1", "event.main").size(), 0); + QCOMPARE(getKeys(instanceIdentifier(), "event.main").size(), 0); } void testPreprocessor() @@ -346,7 +363,7 @@ private slots: auto testProcessor = new TestProcessor; - Sink::Pipeline pipeline(Sink::ResourceContext{"sink.pipelinetest.instance1", "test"}); + Sink::Pipeline pipeline(getContext()); pipeline.setPreprocessors("event", QVector() << testProcessor); pipeline.startTransaction(); // pipeline.setAdaptorFactory("event", QSharedPointer::create()); @@ -363,7 +380,7 @@ private slots: pipeline.commit(); entityFbb.Clear(); pipeline.startTransaction(); - auto keys = getKeys("sink.pipelinetest.instance1", "event.main"); + auto keys = getKeys(instanceIdentifier(), "event.main"); QCOMPARE(keys.size(), 1); const auto uid = Sink::Storage::DataStore::uidFromKey(keys.first()); { -- cgit v1.2.3