From 129333371d28c06d85f75ca579ce17798e615e84 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Wed, 28 Oct 2015 16:39:16 +0100 Subject: Made pipeline preprocessing synchronous. Instead of having the asynchronous preprocessor concept with different pipelines for new/modify/delete we have a single pipeline with synchronous preprocessors that act upon new/modify/delete. This keeps the code simpler due to lack of asynchronity and keeps the new/modify/delete operations together (which at least for the indexing makes a lot of sense). Not supporting asynchronity is ok because the tasks done in preprocessing are not cpu intensive (if they were we had a problem since they are directly involved in the round-trip time), and the main cost comes from i/o, meaning we don't gain much by doing multithreading. Costly tasks (such as full-text indexing) should rather be implemented as post-processing, since that doesn't increase the round-trip time directly, and eventually consistent is typically good enough for that. --- common/domain/event.cpp | 6 +- common/domain/event.h | 2 +- common/domain/mail.cpp | 7 +- common/domain/mail.h | 2 +- common/genericresource.cpp | 10 +- common/genericresource.h | 1 + common/pipeline.cpp | 286 +++++++---------------------- common/pipeline.h | 90 +-------- examples/dummyresource/resourcefactory.cpp | 89 ++++++--- tests/genericresourcebenchmark.cpp | 35 ++-- tests/genericresourcetest.cpp | 4 +- tests/pipelinetest.cpp | 75 ++++++++ 12 files changed, 251 insertions(+), 356 deletions(-) diff --git a/common/domain/event.cpp b/common/domain/event.cpp index 9759fc3..83a6906 100644 --- a/common/domain/event.cpp +++ b/common/domain/event.cpp @@ -50,11 +50,11 @@ ResultSet TypeImplementation::queryIndexes(const Akonadi2::Query &query, return ResultSet(keys); } -void TypeImplementation::index(const Event &type, Akonadi2::Storage::Transaction &transaction) +void TypeImplementation::index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Akonadi2::Storage::Transaction &transaction) { - const auto uid = type.getProperty("uid"); + const auto uid = bufferAdaptor.getProperty("uid"); if (uid.isValid()) { - Index("event.index.uid", transaction).add(uid.toByteArray(), type.identifier()); + Index("event.index.uid", transaction).add(uid.toByteArray(), identifier); } } diff --git a/common/domain/event.h b/common/domain/event.h index f21cd34..e9ba52a 100644 --- a/common/domain/event.h +++ b/common/domain/event.h @@ -56,7 +56,7 @@ public: * An empty result set indicates that a full scan is required. */ static ResultSet queryIndexes(const Akonadi2::Query &query, const QByteArray &resourceInstanceIdentifier, QSet &appliedFilters, Akonadi2::Storage::Transaction &transaction); - static void index(const Event &type, Akonadi2::Storage::Transaction &transaction); + static void index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Akonadi2::Storage::Transaction &transaction); static QSharedPointer > initializeReadPropertyMapper(); static QSharedPointer > initializeWritePropertyMapper(); }; diff --git a/common/domain/mail.cpp b/common/domain/mail.cpp index d40dde9..ffe322e 100644 --- a/common/domain/mail.cpp +++ b/common/domain/mail.cpp @@ -50,12 +50,11 @@ ResultSet TypeImplementation::queryIndexes(const Akonadi2::Query &query, c return ResultSet(keys); } -void TypeImplementation::index(const Mail &type, Akonadi2::Storage::Transaction &transaction) +void TypeImplementation::index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Akonadi2::Storage::Transaction &transaction) { - const auto uid = type.getProperty("uid"); + const auto uid = bufferAdaptor.getProperty("uid"); if (uid.isValid()) { - Index uidIndex("mail.index.uid", transaction); - uidIndex.add(uid.toByteArray(), type.identifier()); + Index("mail.index.uid", transaction).add(uid.toByteArray(), identifier); } } diff --git a/common/domain/mail.h b/common/domain/mail.h index b58ce44..38f1d03 100644 --- a/common/domain/mail.h +++ b/common/domain/mail.h @@ -51,7 +51,7 @@ public: * An empty result set indicates that a full scan is required. */ static ResultSet queryIndexes(const Akonadi2::Query &query, const QByteArray &resourceInstanceIdentifier, QSet &appliedFilters, Akonadi2::Storage::Transaction &transaction); - static void index(const Mail &type, Akonadi2::Storage::Transaction &transaction); + static void index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Akonadi2::Storage::Transaction &transaction); static QSharedPointer > initializeReadPropertyMapper(); static QSharedPointer > initializeWritePropertyMapper(); }; diff --git a/common/genericresource.cpp b/common/genericresource.cpp index dcae43d..ec68f33 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp @@ -285,10 +285,12 @@ GenericResource::~GenericResource() delete mSourceChangeReplay; } -// void GenericResource::revisionChanged() -// { -// //TODO replay revision -// } +void GenericResource::removeFromDisk(const QByteArray &instanceIdentifier) +{ + Akonadi2::Storage(Akonadi2::storageLocation(), instanceIdentifier, Akonadi2::Storage::ReadWrite).removeFromDisk(); + Akonadi2::Storage(Akonadi2::storageLocation(), instanceIdentifier + ".userqueue", Akonadi2::Storage::ReadWrite).removeFromDisk(); + Akonadi2::Storage(Akonadi2::storageLocation(), instanceIdentifier + ".synchronizerqueue", Akonadi2::Storage::ReadWrite).removeFromDisk(); +} void GenericResource::onProcessorError(int errorCode, const QString &errorMessage) { diff --git a/common/genericresource.h b/common/genericresource.h index cfc6653..33de0e7 100644 --- a/common/genericresource.h +++ b/common/genericresource.h @@ -48,6 +48,7 @@ public: int error() const; + static void removeFromDisk(const QByteArray &instanceIdentifier); private Q_SLOTS: void updateLowerBoundRevision(); diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 15d2401..de63288 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp @@ -1,5 +1,6 @@ /* * Copyright (C) 2014 Aaron Seigo + * Copyright (C) 2015 Christian Mollekopf * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public @@ -41,19 +42,13 @@ class Pipeline::Private { public: Private(const QString &resourceName) - : storage(Akonadi2::storageLocation(), resourceName, Storage::ReadWrite), - stepScheduled(false) + : storage(Akonadi2::storageLocation(), resourceName, Storage::ReadWrite) { } Storage storage; Storage::Transaction transaction; - QHash > nullPipeline; - QHash > newPipeline; - QHash > modifiedPipeline; - QHash > deletedPipeline; - QVector activePipelines; - bool stepScheduled; + QHash > processors; QHash adaptorFactory; }; @@ -68,21 +63,9 @@ Pipeline::~Pipeline() delete d; } -void Pipeline::setPreprocessors(const QString &entityType, Type pipelineType, const QVector &preprocessors) +void Pipeline::setPreprocessors(const QString &entityType, const QVector &processors) { - switch (pipelineType) { - case NewPipeline: - d->newPipeline[entityType] = preprocessors; - break; - case ModifiedPipeline: - d->modifiedPipeline[entityType] = preprocessors; - break; - case DeletedPipeline: - d->deletedPipeline[entityType] = preprocessors; - break; - default: - break; - }; + d->processors[entityType] = processors; } void Pipeline::setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFactoryInterface::Ptr factory) @@ -92,6 +75,11 @@ void Pipeline::setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFac void Pipeline::startTransaction() { + //TODO call for all types + //But avoid doing it during cleanup + // for (auto processor : d->processors[bufferType]) { + // processor->startBatch(); + // } if (d->transaction) { return; } @@ -100,7 +88,13 @@ void Pipeline::startTransaction() void Pipeline::commit() { + //TODO call for all types + //But avoid doing it during cleanup + // for (auto processor : d->processors[bufferType]) { + // processor->finalize(); + // } const auto revision = Akonadi2::Storage::maxRevision(d->transaction); + Trace() << "Committing " << revision; if (d->transaction) { d->transaction.commit(); } @@ -118,14 +112,6 @@ Storage &Pipeline::storage() const return d->storage; } -void Pipeline::null() -{ - //TODO: is there really any need for the null pipeline? if so, it should be doing something ;) - // PipelineState state(this, NullPipeline, QByteArray(), d->nullPipeline); - // d->activePipelines << state; - // state.step(); -} - void Pipeline::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid) { d->transaction.openDatabase(bufferType + ".main").write(Akonadi2::Storage::assembleKey(uid, newRevision), QByteArray::fromRawData(reinterpret_cast(fbb.GetBufferPointer()), fbb.GetSize()), @@ -181,15 +167,25 @@ KAsync::Job Pipeline::newEntity(void const *command, size_t size) storeNewRevision(newRevision, fbb, bufferType, key); - Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType; + auto adaptorFactory = d->adaptorFactory.value(bufferType); + if (!adaptorFactory) { + Warning() << "no adaptor factory for type " << bufferType; + return KAsync::error(0); + } - return KAsync::start([this, key, bufferType, newRevision](KAsync::Future &future) { - PipelineState state(this, NewPipeline, Akonadi2::Storage::assembleKey(key, newRevision), d->newPipeline[bufferType], newRevision, [&future, newRevision]() { - future.setValue(newRevision); - future.setFinished(); - }, bufferType); - d->activePipelines << state; - state.step(); + Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType; + d->transaction.openDatabase(bufferType + ".main").scan(Akonadi2::Storage::assembleKey(key, newRevision), [this, bufferType, newRevision, adaptorFactory, key](const QByteArray &, const QByteArray &value) -> bool { + auto entity = Akonadi2::GetEntity(value); + auto adaptor = adaptorFactory->createAdaptor(*entity); + for (auto processor : d->processors[bufferType]) { + processor->newEntity(key, newRevision, *adaptor, d->transaction); + } + return false; + }, [this](const Akonadi2::Storage::Error &error) { + ErrorMsg() << "Failed to find value in pipeline: " << error.message; + }); + return KAsync::start([newRevision](){ + return newRevision; }); } @@ -287,14 +283,18 @@ KAsync::Job Pipeline::modifiedEntity(void const *command, size_t size) storeNewRevision(newRevision, fbb, bufferType, key); Log() << "Pipeline: modified entity: " << key << newRevision << bufferType; - - return KAsync::start([this, key, bufferType, newRevision](KAsync::Future &future) { - PipelineState state(this, ModifiedPipeline, Akonadi2::Storage::assembleKey(key, newRevision), d->modifiedPipeline[bufferType], newRevision, [&future, newRevision]() { - future.setValue(newRevision); - future.setFinished(); - }, bufferType); - d->activePipelines << state; - state.step(); + d->transaction.openDatabase(bufferType + ".main").scan(Akonadi2::Storage::assembleKey(key, newRevision), [this, bufferType, newRevision, adaptorFactory, current, key](const QByteArray &, const QByteArray &value) -> bool { + auto entity = Akonadi2::GetEntity(value); + auto newEntity = adaptorFactory->createAdaptor(*entity); + for (auto processor : d->processors[bufferType]) { + processor->modifiedEntity(key, newRevision, *current, *newEntity, d->transaction); + } + return false; + }, [this](const Akonadi2::Storage::Error &error) { + ErrorMsg() << "Failed to find value in pipeline: " << error.message; + }); + return KAsync::start([newRevision](){ + return newRevision; }); } @@ -331,13 +331,24 @@ KAsync::Job Pipeline::deletedEntity(void const *command, size_t size) storeNewRevision(newRevision, fbb, bufferType, key); Log() << "Pipeline: deleted entity: "<< newRevision; - return KAsync::start([this, key, bufferType, newRevision](KAsync::Future &future) { - PipelineState state(this, DeletedPipeline, key, d->deletedPipeline[bufferType], newRevision, [&future, newRevision](){ - future.setValue(newRevision); - future.setFinished(); - }, bufferType); - d->activePipelines << state; - state.step(); + auto adaptorFactory = d->adaptorFactory.value(bufferType); + if (!adaptorFactory) { + Warning() << "no adaptor factory for type " << bufferType; + return KAsync::error(0); + } + + // d->transaction.openDatabase(bufferType + ".main").scan(key, [this, bufferType, newRevision, adaptorFactory](const QByteArray &, const QByteArray &value) -> bool { + // auto entity = Akonadi2::GetEntity(value); + // auto newEntity = adaptorFactory->createAdaptor(*entity); + for (auto processor : d->processors[bufferType]) { + processor->deletedEntity(key, newRevision, Akonadi2::ApplicationDomain::BufferAdaptor(), d->transaction); + } + // return false; + // }, [this](const Akonadi2::Storage::Error &error) { + // ErrorMsg() << "Failed to find value in pipeline: " << error.message; + // }); + return KAsync::start([newRevision](){ + return newRevision; }); } @@ -372,164 +383,6 @@ qint64 Pipeline::cleanedUpRevision() return Akonadi2::Storage::cleanedUpRevision(d->transaction); } -void Pipeline::pipelineStepped(const PipelineState &state) -{ - scheduleStep(); -} - -void Pipeline::scheduleStep() -{ - if (!d->stepScheduled) { - d->stepScheduled = true; - QMetaObject::invokeMethod(this, "stepPipelines", Qt::QueuedConnection); - } -} - -void Pipeline::stepPipelines() -{ - d->stepScheduled = false; - for (PipelineState &state: d->activePipelines) { - if (state.isIdle()) { - state.step(); - } - } -} - -void Pipeline::pipelineCompleted(PipelineState state) -{ - //TODO finalize the datastore, inform clients of the new rev - const int index = d->activePipelines.indexOf(state); - if (index > -1) { - d->activePipelines.remove(index); - } - state.callback(); - - scheduleStep(); - if (d->activePipelines.isEmpty()) { - emit pipelinesDrained(); - } -} - - -class PipelineState::Private : public QSharedData -{ -public: - Private(Pipeline *p, Pipeline::Type t, const QByteArray &k, QVector filters, const std::function &c, qint64 r, const QByteArray &b) - : pipeline(p), - type(t), - key(k), - filterIt(filters), - idle(true), - callback(c), - revision(r), - bufferType(b) - {} - - Private() - : pipeline(0), - filterIt(QVector()), - idle(true), - revision(-1) - {} - - Pipeline *pipeline; - Pipeline::Type type; - QByteArray key; - QVectorIterator filterIt; - bool idle; - std::function callback; - qint64 revision; - QByteArray bufferType; -}; - -PipelineState::PipelineState() - : d(new Private()) -{ - -} - -PipelineState::PipelineState(Pipeline *pipeline, Pipeline::Type type, const QByteArray &key, const QVector &filters, qint64 revision, const std::function &callback, const QByteArray &bufferType) - : d(new Private(pipeline, type, key, filters, callback, revision, bufferType)) -{ -} - -PipelineState::PipelineState(const PipelineState &other) - : d(other.d) -{ -} - -PipelineState::~PipelineState() -{ -} - -PipelineState &PipelineState::operator=(const PipelineState &rhs) -{ - d = rhs.d; - return *this; -} - -bool PipelineState::operator==(const PipelineState &rhs) -{ - return d == rhs.d; -} - -bool PipelineState::isIdle() const -{ - return d->idle; -} - -QByteArray PipelineState::key() const -{ - return d->key; -} - -Pipeline::Type PipelineState::type() const -{ - return d->type; -} - -qint64 PipelineState::revision() const -{ - return d->revision; -} - -QByteArray PipelineState::bufferType() const -{ - return d->bufferType; -} - -void PipelineState::step() -{ - if (!d->pipeline) { - Q_ASSERT(false); - return; - } - - d->idle = false; - if (d->filterIt.hasNext()) { - //TODO skip step if already processed - auto preprocessor = d->filterIt.next(); - preprocessor->process(*this, d->pipeline->transaction()); - } else { - //This object becomes invalid after this call - d->pipeline->pipelineCompleted(*this); - } -} - -void PipelineState::processingCompleted(Preprocessor *filter) -{ - if (d->pipeline && filter == d->filterIt.peekPrevious()) { - d->idle = true; - d->pipeline->pipelineStepped(*this); - } -} - -void PipelineState::callback() -{ - d->callback(); -} - - Preprocessor::Preprocessor() : d(0) { @@ -539,19 +392,12 @@ Preprocessor::~Preprocessor() { } -void Preprocessor::process(const PipelineState &state, Akonadi2::Storage::Transaction &transaction) -{ - processingCompleted(state); -} - -void Preprocessor::processingCompleted(PipelineState state) +void Preprocessor::startBatch() { - state.processingCompleted(this); } -QString Preprocessor::id() const +void Preprocessor::finalize() { - return QLatin1String("unknown processor"); } } // namespace Akonadi2 diff --git a/common/pipeline.h b/common/pipeline.h index c8d9ddc..f11d880 100644 --- a/common/pipeline.h +++ b/common/pipeline.h @@ -35,7 +35,6 @@ namespace Akonadi2 { -class PipelineState; class Preprocessor; class AKONADI2COMMON_EXPORT Pipeline : public QObject @@ -43,19 +42,16 @@ class AKONADI2COMMON_EXPORT Pipeline : public QObject Q_OBJECT public: - enum Type { NullPipeline, NewPipeline, ModifiedPipeline, DeletedPipeline }; - Pipeline(const QString &storagePath, QObject *parent = 0); ~Pipeline(); Storage &storage() const; - void setPreprocessors(const QString &entityType, Type pipelineType, const QVector &preprocessors); + void setPreprocessors(const QString &entityType, const QVector &preprocessors); void startTransaction(); void commit(); Storage::Transaction &transaction(); - void null(); void setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFactoryInterface::Ptr factory); KAsync::Job newEntity(void const *command, size_t size); @@ -75,104 +71,30 @@ public: Q_SIGNALS: void revisionUpdated(qint64); - void pipelinesDrained(); - -private Q_SLOTS: - void stepPipelines(); private: - void pipelineStepped(const PipelineState &state); - //Don't use a reference here (it would invalidate itself) - void pipelineCompleted(PipelineState state); - void scheduleStep(); void storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid); - friend class PipelineState; - class Private; Private * const d; }; -class AKONADI2COMMON_EXPORT PipelineState -{ -public: - PipelineState(); - PipelineState(Pipeline *pipeline, Pipeline::Type type, const QByteArray &key, const QVector &filters, qint64 revision, const std::function &callback, const QByteArray &bufferType); - PipelineState(const PipelineState &other); - ~PipelineState(); - - PipelineState &operator=(const PipelineState &rhs); - bool operator==(const PipelineState &rhs); - - bool isIdle() const; - QByteArray key() const; - Pipeline::Type type() const; - qint64 revision() const; - QByteArray bufferType() const; - - void step(); - void processingCompleted(Preprocessor *filter); - - void callback(); - -private: - class Private; - QExplicitlySharedDataPointer d; -}; - class AKONADI2COMMON_EXPORT Preprocessor { public: Preprocessor(); virtual ~Preprocessor(); - virtual void process(const PipelineState &state, Akonadi2::Storage::Transaction &transaction); - //TODO to record progress - virtual QString id() const; - -protected: - void processingCompleted(PipelineState state); + virtual void startBatch(); + virtual void newEntity(const QByteArray &key, qint64 revision, const Akonadi2::ApplicationDomain::BufferAdaptor &newEntity, Akonadi2::Storage::Transaction &transaction) = 0; + virtual void modifiedEntity(const QByteArray &key, qint64 revision, const Akonadi2::ApplicationDomain::BufferAdaptor &oldEntity, const Akonadi2::ApplicationDomain::BufferAdaptor &newEntity, Akonadi2::Storage::Transaction &transaction) = 0; + virtual void deletedEntity(const QByteArray &key, qint64 revision, const Akonadi2::ApplicationDomain::BufferAdaptor &oldEntity, Akonadi2::Storage::Transaction &transaction) = 0; + virtual void finalize(); private: class Private; Private * const d; }; -/** - * A simple processor that takes a single function - */ -class AKONADI2COMMON_EXPORT SimpleProcessor : public Akonadi2::Preprocessor -{ -public: - SimpleProcessor(const QString &id, const std::function &f) - : Akonadi2::Preprocessor(), - mFunction(f), - mId(id) - { - } - - void process(const PipelineState &state, Akonadi2::Storage::Transaction &transaction) Q_DECL_OVERRIDE - { - transaction.openDatabase(state.bufferType() + ".main").scan(state.key(), [this, &state, &transaction](const QByteArray &key, const QByteArray &value) -> bool { - auto entity = Akonadi2::GetEntity(value); - mFunction(state, *entity, transaction); - processingCompleted(state); - return false; - }, [this, state](const Akonadi2::Storage::Error &error) { - ErrorMsg() << "Failed to find value in pipeline: " << error.message; - processingCompleted(state); - }); - } - - QString id() const Q_DECL_OVERRIDE - { - return mId; - } - -protected: - std::function mFunction; - QString mId; -}; - } // namespace Akonadi2 diff --git a/examples/dummyresource/resourcefactory.cpp b/examples/dummyresource/resourcefactory.cpp index 8e6bd42..0a2e90b 100644 --- a/examples/dummyresource/resourcefactory.cpp +++ b/examples/dummyresource/resourcefactory.cpp @@ -48,31 +48,76 @@ static void index(const QByteArray &index, const QVariant &value, const QByteArr } } -DummyResource::DummyResource(const QByteArray &instanceIdentifier, const QSharedPointer &pipeline) - : Akonadi2::GenericResource(instanceIdentifier, pipeline) -{ - auto eventFactory = QSharedPointer::create(); - const auto resourceIdentifier = mResourceInstanceIdentifier; +/** + * Index types: + * * uid - property + * + * * Property can be: + * * fixed value like uid + * * fixed value where we want to do smaller/greater-than comparisons. (like start date) + * * range indexes like what date range an event affects. + * * group indexes like tree hierarchies as nested sets + */ +template +class IndexUpdater : public Akonadi2::Preprocessor { +public: + IndexUpdater(const QByteArray &index, const QByteArray &type) + :mIndexIdentifier(index), + mBufferType(type) + { - auto eventIndexer = new Akonadi2::SimpleProcessor("eventIndexer", [eventFactory, resourceIdentifier](const Akonadi2::PipelineState &state, const Akonadi2::Entity &entity, Akonadi2::Storage::Transaction &transaction) { - Akonadi2::ApplicationDomain::Event event(resourceIdentifier, Akonadi2::Storage::uidFromKey(state.key()), -1, eventFactory->createAdaptor(entity)); - Akonadi2::ApplicationDomain::TypeImplementation::index(event, transaction); - index("event.index.rid", event.getProperty("remoteId"), event.identifier(), transaction); - }); + } + + void newEntity(const QByteArray &uid, qint64 revision, const Akonadi2::ApplicationDomain::BufferAdaptor &newEntity, Akonadi2::Storage::Transaction &transaction) Q_DECL_OVERRIDE + { + Akonadi2::ApplicationDomain::TypeImplementation::index(uid, newEntity, transaction); + add(newEntity.getProperty("remoteId"), uid, transaction); + } - mPipeline->setPreprocessors(ENTITY_TYPE_EVENT, Akonadi2::Pipeline::NewPipeline, QVector() << eventIndexer); - mPipeline->setAdaptorFactory(ENTITY_TYPE_EVENT, eventFactory); - //TODO cleanup indexes during removal + void modifiedEntity(const QByteArray &key, qint64 revision, const Akonadi2::ApplicationDomain::BufferAdaptor &oldEntity, const Akonadi2::ApplicationDomain::BufferAdaptor &newEntity, Akonadi2::Storage::Transaction &transaction) Q_DECL_OVERRIDE + { + } + void deletedEntity(const QByteArray &key, qint64 revision, const Akonadi2::ApplicationDomain::BufferAdaptor &oldEntity, Akonadi2::Storage::Transaction &transaction) Q_DECL_OVERRIDE { - auto mailFactory = QSharedPointer::create(); - auto mailIndexer = new Akonadi2::SimpleProcessor("mailIndexer", [mailFactory, resourceIdentifier](const Akonadi2::PipelineState &state, const Akonadi2::Entity &entity, Akonadi2::Storage::Transaction &transaction) { - Akonadi2::ApplicationDomain::Mail mail(resourceIdentifier, Akonadi2::Storage::uidFromKey(state.key()), -1, mailFactory->createAdaptor(entity)); - Akonadi2::ApplicationDomain::TypeImplementation::index(mail, transaction); - index("mail.index.rid", mail.getProperty("remoteId"), mail.identifier(), transaction); - }); + } +private: + void add(const QVariant &value, const QByteArray &uid, Akonadi2::Storage::Transaction &transaction) + { + if (value.isValid()) { + Index(mIndexIdentifier, transaction).add(value.toByteArray(), uid); + } + } + + void remove(const QByteArray &uid, Akonadi2::Storage::Transaction &transaction) + { + //Knowning the indexed value would probably help removing the uid efficiently. Otherwise we have to execute a full scan. + // Index(mIndexIdentifier, transaction).remove(uid); + } + + void modify(Akonadi2::Storage::Transaction &transaction) + { + //Knowning the indexed value would probably help removing the uid efficiently. Otherwise we have to execute a full scan. + // Index(mIndexIdentifier, transaction).remove(uid); + } - mPipeline->setPreprocessors(ENTITY_TYPE_MAIL, Akonadi2::Pipeline::NewPipeline, QVector() << mailIndexer); + QByteArray mIndexIdentifier; + QByteArray mBufferType; +}; + +DummyResource::DummyResource(const QByteArray &instanceIdentifier, const QSharedPointer &pipeline) + : Akonadi2::GenericResource(instanceIdentifier, pipeline) +{ + { + auto eventFactory = QSharedPointer::create(); + auto eventIndexer = new IndexUpdater("event.index.rid", ENTITY_TYPE_EVENT); + mPipeline->setPreprocessors(ENTITY_TYPE_EVENT, QVector() << eventIndexer); + mPipeline->setAdaptorFactory(ENTITY_TYPE_EVENT, eventFactory); + } + { + auto mailFactory = QSharedPointer::create(); + auto mailIndexer = new IndexUpdater("mail.index.rid", ENTITY_TYPE_MAIL); + mPipeline->setPreprocessors(ENTITY_TYPE_MAIL, QVector() << mailIndexer); mPipeline->setAdaptorFactory(ENTITY_TYPE_MAIL, mailFactory); } } @@ -171,9 +216,7 @@ KAsync::Job DummyResource::synchronizeWithSource() void DummyResource::removeFromDisk(const QByteArray &instanceIdentifier) { - Akonadi2::Storage(Akonadi2::storageLocation(), instanceIdentifier, Akonadi2::Storage::ReadWrite).removeFromDisk(); - Akonadi2::Storage(Akonadi2::storageLocation(), instanceIdentifier + ".userqueue", Akonadi2::Storage::ReadWrite).removeFromDisk(); - Akonadi2::Storage(Akonadi2::storageLocation(), instanceIdentifier + ".synchronizerqueue", Akonadi2::Storage::ReadWrite).removeFromDisk(); + GenericResource::removeFromDisk(instanceIdentifier); Akonadi2::Storage(Akonadi2::storageLocation(), instanceIdentifier + ".event.index.uid", Akonadi2::Storage::ReadWrite).removeFromDisk(); } diff --git a/tests/genericresourcebenchmark.cpp b/tests/genericresourcebenchmark.cpp index b8635d7..fbe0d12 100644 --- a/tests/genericresourcebenchmark.cpp +++ b/tests/genericresourcebenchmark.cpp @@ -60,6 +60,25 @@ static QByteArray createEntityBuffer() return QByteArray(reinterpret_cast(fbb.GetBufferPointer()), fbb.GetSize()); } +class IndexUpdater : public Akonadi2::Preprocessor { +public: + void newEntity(const QByteArray &uid, qint64 revision, const Akonadi2::ApplicationDomain::BufferAdaptor &newEntity, Akonadi2::Storage::Transaction &transaction) Q_DECL_OVERRIDE + { + for (int i = 0; i < 10; i++) { + Index ridIndex(QString("index.index%1").arg(i).toLatin1(), transaction); + ridIndex.add("foo", uid); + } + } + + void modifiedEntity(const QByteArray &key, qint64 revision, const Akonadi2::ApplicationDomain::BufferAdaptor &oldEntity, const Akonadi2::ApplicationDomain::BufferAdaptor &newEntity, Akonadi2::Storage::Transaction &transaction) Q_DECL_OVERRIDE + { + } + + void deletedEntity(const QByteArray &key, qint64 revision, const Akonadi2::ApplicationDomain::BufferAdaptor &oldEntity, Akonadi2::Storage::Transaction &transaction) Q_DECL_OVERRIDE + { + } +}; + /** * Benchmark write performance of generic resource implementation including queues and pipeline. */ @@ -124,19 +143,9 @@ private Q_SLOTS: auto eventFactory = QSharedPointer::create(); const QByteArray resourceIdentifier = "org.kde.test.instance1"; - auto eventIndexer = new Akonadi2::SimpleProcessor("eventIndexer", [eventFactory, resourceIdentifier](const Akonadi2::PipelineState &state, const Akonadi2::Entity &entity, Akonadi2::Storage::Transaction &transaction) { - auto adaptor = eventFactory->createAdaptor(entity); - Akonadi2::ApplicationDomain::Event event(resourceIdentifier, state.key(), -1, adaptor); - Akonadi2::ApplicationDomain::TypeImplementation::index(event, transaction); - - //Create a bunch of indexes - for (int i = 0; i < 10; i++) { - Index ridIndex(QString("index.index%1").arg(i).toLatin1(), transaction); - ridIndex.add("foo", event.identifier()); - } - }); - - pipeline->setPreprocessors("event", Akonadi2::Pipeline::NewPipeline, QVector() << eventIndexer); + auto indexer = QSharedPointer::create(); + + pipeline->setPreprocessors("event", QVector() << indexer.data()); pipeline->setAdaptorFactory("event", eventFactory); TestResource resource("org.kde.test.instance1", pipeline); diff --git a/tests/genericresourcetest.cpp b/tests/genericresourcetest.cpp index 6dd4108..141a5f8 100644 --- a/tests/genericresourcetest.cpp +++ b/tests/genericresourcetest.cpp @@ -32,9 +32,7 @@ private Q_SLOTS: void init() { - removeFromDisk("org.kde.test.instance1"); - removeFromDisk("org.kde.test.instance1.userqueue"); - removeFromDisk("org.kde.test.instance1.synchronizerqueue"); + Akonadi2::GenericResource::removeFromDisk("org.kde.test.instance1"); Akonadi2::Log::setDebugOutputLevel(Akonadi2::Log::Trace); } diff --git a/tests/pipelinetest.cpp b/tests/pipelinetest.cpp index 7efba13..0b4c13e 100644 --- a/tests/pipelinetest.cpp +++ b/tests/pipelinetest.cpp @@ -139,6 +139,34 @@ QByteArray deleteEntityCommand(const QByteArray &uid, qint64 revision) return command; } +class TestProcessor : public Akonadi2::Preprocessor { +public: + void newEntity(const QByteArray &uid, qint64 revision, const Akonadi2::ApplicationDomain::BufferAdaptor &newEntity, Akonadi2::Storage::Transaction &transaction) Q_DECL_OVERRIDE + { + newUids << uid; + newRevisions << revision; + } + + void modifiedEntity(const QByteArray &uid, qint64 revision, const Akonadi2::ApplicationDomain::BufferAdaptor &oldEntity, const Akonadi2::ApplicationDomain::BufferAdaptor &newEntity, Akonadi2::Storage::Transaction &transaction) Q_DECL_OVERRIDE + { + modifiedUids << uid; + modifiedRevisions << revision; + } + + void deletedEntity(const QByteArray &uid, qint64 revision, const Akonadi2::ApplicationDomain::BufferAdaptor &oldEntity, Akonadi2::Storage::Transaction &transaction) Q_DECL_OVERRIDE + { + deletedUids << uid; + deletedRevisions << revision; + } + + QList newUids; + QList newRevisions; + QList modifiedUids; + QList modifiedRevisions; + QList deletedUids; + QList deletedRevisions; +}; + /** * Test of the pipeline implementation to ensure new revisions are created correctly in the database. */ @@ -251,6 +279,53 @@ private Q_SLOTS: //And all revisions are gone QCOMPARE(getKeys("org.kde.pipelinetest.instance1", "event.main").size(), 0); } + + void testPreprocessor() + { + flatbuffers::FlatBufferBuilder entityFbb; + + TestProcessor testProcessor; + + Akonadi2::Pipeline pipeline("org.kde.pipelinetest.instance1"); + pipeline.setPreprocessors("event", QVector() << &testProcessor); + pipeline.startTransaction(); + pipeline.setAdaptorFactory("event", QSharedPointer::create()); + + //Actual test + { + auto command = createEntityCommand(createEvent(entityFbb)); + pipeline.newEntity(command.constData(), command.size()); + QCOMPARE(testProcessor.newUids.size(), 1); + QCOMPARE(testProcessor.newRevisions.size(), 1); + //Key doesn't contain revision and is just the uid + QCOMPARE(testProcessor.newUids.at(0), Akonadi2::Storage::uidFromKey(testProcessor.newUids.at(0))); + } + pipeline.commit(); + entityFbb.Clear(); + pipeline.startTransaction(); + auto keys = getKeys("org.kde.pipelinetest.instance1", "event.main"); + QCOMPARE(keys.size(), 1); + const auto uid = Akonadi2::Storage::uidFromKey(keys.first()); + { + auto modifyCommand = modifyEntityCommand(createEvent(entityFbb, "summary2"), uid, 1); + pipeline.modifiedEntity(modifyCommand.constData(), modifyCommand.size()); + QCOMPARE(testProcessor.modifiedUids.size(), 1); + QCOMPARE(testProcessor.modifiedRevisions.size(), 1); + //Key doesn't contain revision and is just the uid + QCOMPARE(testProcessor.modifiedUids.at(0), Akonadi2::Storage::uidFromKey(testProcessor.modifiedUids.at(0))); + } + pipeline.commit(); + entityFbb.Clear(); + pipeline.startTransaction(); + { + auto deleteCommand = deleteEntityCommand(uid, 1); + pipeline.deletedEntity(deleteCommand.constData(), deleteCommand.size()); + QCOMPARE(testProcessor.deletedUids.size(), 1); + QCOMPARE(testProcessor.deletedUids.size(), 1); + //Key doesn't contain revision and is just the uid + QCOMPARE(testProcessor.deletedUids.at(0), Akonadi2::Storage::uidFromKey(testProcessor.deletedUids.at(0))); + } + } }; QTEST_MAIN(PipelineTest) -- cgit v1.2.3 From 81859328bf30c2aeecdf3ee48e5939e0496552fd Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Wed, 28 Oct 2015 16:46:10 +0100 Subject: Reuse the existing transaction --- common/pipeline.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/pipeline.cpp b/common/pipeline.cpp index de63288..ae4cc3d 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp @@ -233,7 +233,7 @@ KAsync::Job Pipeline::modifiedEntity(void const *command, size_t size) auto diff = adaptorFactory->createAdaptor(*diffEntity); QSharedPointer current; - storage().createTransaction(Akonadi2::Storage::ReadOnly).openDatabase(bufferType + ".main").scan(Akonadi2::Storage::assembleKey(key, baseRevision), [¤t, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { + d->transaction.openDatabase(bufferType + ".main").scan(Akonadi2::Storage::assembleKey(key, baseRevision), [¤t, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { Akonadi2::EntityBuffer buffer(const_cast(data.data()), data.size()); if (!buffer.isValid()) { Warning() << "Read invalid buffer from disk"; -- cgit v1.2.3 From 043cd5c9e1c90ba04659b67000b974cf8c35f7ba Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Wed, 28 Oct 2015 17:21:47 +0100 Subject: Correctly execute modifications and removals ... also if there are intermediate revisions. --- common/pipeline.cpp | 33 ++++++++++++++++++------------- tests/pipelinetest.cpp | 53 ++++++++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 71 insertions(+), 15 deletions(-) diff --git a/common/pipeline.cpp b/common/pipeline.cpp index ae4cc3d..0ce478b 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp @@ -233,7 +233,7 @@ KAsync::Job Pipeline::modifiedEntity(void const *command, size_t size) auto diff = adaptorFactory->createAdaptor(*diffEntity); QSharedPointer current; - d->transaction.openDatabase(bufferType + ".main").scan(Akonadi2::Storage::assembleKey(key, baseRevision), [¤t, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { + d->transaction.openDatabase(bufferType + ".main").findLatest(key, [¤t, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { Akonadi2::EntityBuffer buffer(const_cast(data.data()), data.size()); if (!buffer.isValid()) { Warning() << "Read invalid buffer from disk"; @@ -328,25 +328,32 @@ KAsync::Job Pipeline::deletedEntity(void const *command, size_t size) flatbuffers::FlatBufferBuilder fbb; EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), 0, 0, 0, 0); - storeNewRevision(newRevision, fbb, bufferType, key); - Log() << "Pipeline: deleted entity: "<< newRevision; - auto adaptorFactory = d->adaptorFactory.value(bufferType); if (!adaptorFactory) { Warning() << "no adaptor factory for type " << bufferType; return KAsync::error(0); } - // d->transaction.openDatabase(bufferType + ".main").scan(key, [this, bufferType, newRevision, adaptorFactory](const QByteArray &, const QByteArray &value) -> bool { - // auto entity = Akonadi2::GetEntity(value); - // auto newEntity = adaptorFactory->createAdaptor(*entity); - for (auto processor : d->processors[bufferType]) { - processor->deletedEntity(key, newRevision, Akonadi2::ApplicationDomain::BufferAdaptor(), d->transaction); + QSharedPointer current; + d->transaction.openDatabase(bufferType + ".main").findLatest(key, [this, bufferType, newRevision, adaptorFactory, key, ¤t](const QByteArray &, const QByteArray &data) -> bool { + Akonadi2::EntityBuffer buffer(const_cast(data.data()), data.size()); + if (!buffer.isValid()) { + Warning() << "Read invalid buffer from disk"; + } else { + current = adaptorFactory->createAdaptor(buffer.entity()); } - // return false; - // }, [this](const Akonadi2::Storage::Error &error) { - // ErrorMsg() << "Failed to find value in pipeline: " << error.message; - // }); + return false; + }, [this](const Akonadi2::Storage::Error &error) { + ErrorMsg() << "Failed to find value in pipeline: " << error.message; + }); + + storeNewRevision(newRevision, fbb, bufferType, key); + Log() << "Pipeline: deleted entity: "<< newRevision; + + for (auto processor : d->processors[bufferType]) { + processor->deletedEntity(key, newRevision, *current, d->transaction); + } + return KAsync::start([newRevision](){ return newRevision; }); diff --git a/tests/pipelinetest.cpp b/tests/pipelinetest.cpp index 0b4c13e..47090a8 100644 --- a/tests/pipelinetest.cpp +++ b/tests/pipelinetest.cpp @@ -157,6 +157,7 @@ public: { deletedUids << uid; deletedRevisions << revision; + deletedSummaries << oldEntity.getProperty("summary").toByteArray(); } QList newUids; @@ -165,6 +166,7 @@ public: QList modifiedRevisions; QList deletedUids; QList deletedRevisions; + QList deletedSummaries; }; /** @@ -245,18 +247,63 @@ private Q_SLOTS: QCOMPARE(getKeys("org.kde.pipelinetest.instance1", "event.main").size(), 1); } - void testDelete() + void testModifyWithUnrelatedOperationInbetween() { flatbuffers::FlatBufferBuilder entityFbb; auto command = createEntityCommand(createEvent(entityFbb)); + Akonadi2::Pipeline pipeline("org.kde.pipelinetest.instance1"); + + auto adaptorFactory = QSharedPointer::create(); + pipeline.setAdaptorFactory("event", adaptorFactory); + //Create the initial revision + pipeline.startTransaction(); + pipeline.newEntity(command.constData(), command.size()); + pipeline.commit(); + + //Get uid of written entity + auto keys = getKeys("org.kde.pipelinetest.instance1", "event.main"); + QCOMPARE(keys.size(), 1); + const auto uid = Akonadi2::Storage::uidFromKey(keys.first()); + + + //Create another operation inbetween + { + entityFbb.Clear(); + auto command = createEntityCommand(createEvent(entityFbb)); + pipeline.startTransaction(); + pipeline.newEntity(command.constData(), command.size()); + pipeline.commit(); + } + + //Execute the modification on revision 2 + entityFbb.Clear(); + auto modifyCommand = modifyEntityCommand(createEvent(entityFbb, "summary2"), uid, 2); + pipeline.startTransaction(); + pipeline.modifiedEntity(modifyCommand.constData(), modifyCommand.size()); + pipeline.commit(); + + //Ensure we've got the new revision with the modification + auto buffer = getEntity("org.kde.pipelinetest.instance1", "event.main", Akonadi2::Storage::assembleKey(uid, 3)); + QVERIFY(!buffer.isEmpty()); + Akonadi2::EntityBuffer entityBuffer(buffer.data(), buffer.size()); + auto adaptor = adaptorFactory->createAdaptor(entityBuffer.entity()); + QCOMPARE(adaptor->getProperty("summary").toString(), QString("summary2")); + } + + void testDelete() + { + flatbuffers::FlatBufferBuilder entityFbb; + auto command = createEntityCommand(createEvent(entityFbb)); Akonadi2::Pipeline pipeline("org.kde.pipelinetest.instance1"); + pipeline.setAdaptorFactory("event", QSharedPointer::create()); + + //Create the initial revision pipeline.startTransaction(); pipeline.newEntity(command.constData(), command.size()); pipeline.commit(); - // const auto uid = Akonadi2::Storage::uidFromKey(key); auto result = getKeys("org.kde.pipelinetest.instance1", "event.main"); QCOMPARE(result.size(), 1); @@ -322,8 +369,10 @@ private Q_SLOTS: pipeline.deletedEntity(deleteCommand.constData(), deleteCommand.size()); QCOMPARE(testProcessor.deletedUids.size(), 1); QCOMPARE(testProcessor.deletedUids.size(), 1); + QCOMPARE(testProcessor.deletedSummaries.size(), 1); //Key doesn't contain revision and is just the uid QCOMPARE(testProcessor.deletedUids.at(0), Akonadi2::Storage::uidFromKey(testProcessor.deletedUids.at(0))); + QCOMPARE(testProcessor.deletedSummaries.at(0), QByteArray("summary2")); } } }; -- cgit v1.2.3 From bb1b238d6982abe1e640fbf424234b2c5389642b Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Wed, 28 Oct 2015 18:28:23 +0100 Subject: Cleanup changereplay, and replay revision 1 as the first --- common/genericresource.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/common/genericresource.cpp b/common/genericresource.cpp index ec68f33..652154d 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp @@ -54,7 +54,7 @@ public Q_SLOTS: { auto mainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly); auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadWrite); - qint64 lastReplayedRevision = 0; + qint64 lastReplayedRevision = 1; replayStoreTransaction.openDatabase().scan("lastReplayedRevision", [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { lastReplayedRevision = value.toLongLong(); return false; @@ -290,6 +290,7 @@ void GenericResource::removeFromDisk(const QByteArray &instanceIdentifier) Akonadi2::Storage(Akonadi2::storageLocation(), instanceIdentifier, Akonadi2::Storage::ReadWrite).removeFromDisk(); Akonadi2::Storage(Akonadi2::storageLocation(), instanceIdentifier + ".userqueue", Akonadi2::Storage::ReadWrite).removeFromDisk(); Akonadi2::Storage(Akonadi2::storageLocation(), instanceIdentifier + ".synchronizerqueue", Akonadi2::Storage::ReadWrite).removeFromDisk(); + Akonadi2::Storage(Akonadi2::storageLocation(), instanceIdentifier + ".changereplay", Akonadi2::Storage::ReadWrite).removeFromDisk(); } void GenericResource::onProcessorError(int errorCode, const QString &errorMessage) -- cgit v1.2.3