From 129333371d28c06d85f75ca579ce17798e615e84 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Wed, 28 Oct 2015 16:39:16 +0100 Subject: Made pipeline preprocessing synchronous. Instead of having the asynchronous preprocessor concept with different pipelines for new/modify/delete we have a single pipeline with synchronous preprocessors that act upon new/modify/delete. This keeps the code simpler due to lack of asynchronity and keeps the new/modify/delete operations together (which at least for the indexing makes a lot of sense). Not supporting asynchronity is ok because the tasks done in preprocessing are not cpu intensive (if they were we had a problem since they are directly involved in the round-trip time), and the main cost comes from i/o, meaning we don't gain much by doing multithreading. Costly tasks (such as full-text indexing) should rather be implemented as post-processing, since that doesn't increase the round-trip time directly, and eventually consistent is typically good enough for that. --- examples/dummyresource/resourcefactory.cpp | 89 ++++++++++++++++++++++-------- 1 file changed, 66 insertions(+), 23 deletions(-) (limited to 'examples/dummyresource') diff --git a/examples/dummyresource/resourcefactory.cpp b/examples/dummyresource/resourcefactory.cpp index 8e6bd42..0a2e90b 100644 --- a/examples/dummyresource/resourcefactory.cpp +++ b/examples/dummyresource/resourcefactory.cpp @@ -48,31 +48,76 @@ static void index(const QByteArray &index, const QVariant &value, const QByteArr } } -DummyResource::DummyResource(const QByteArray &instanceIdentifier, const QSharedPointer &pipeline) - : Akonadi2::GenericResource(instanceIdentifier, pipeline) -{ - auto eventFactory = QSharedPointer::create(); - const auto resourceIdentifier = mResourceInstanceIdentifier; +/** + * Index types: + * * uid - property + * + * * Property can be: + * * fixed value like uid + * * fixed value where we want to do smaller/greater-than comparisons. (like start date) + * * range indexes like what date range an event affects. + * * group indexes like tree hierarchies as nested sets + */ +template +class IndexUpdater : public Akonadi2::Preprocessor { +public: + IndexUpdater(const QByteArray &index, const QByteArray &type) + :mIndexIdentifier(index), + mBufferType(type) + { - auto eventIndexer = new Akonadi2::SimpleProcessor("eventIndexer", [eventFactory, resourceIdentifier](const Akonadi2::PipelineState &state, const Akonadi2::Entity &entity, Akonadi2::Storage::Transaction &transaction) { - Akonadi2::ApplicationDomain::Event event(resourceIdentifier, Akonadi2::Storage::uidFromKey(state.key()), -1, eventFactory->createAdaptor(entity)); - Akonadi2::ApplicationDomain::TypeImplementation::index(event, transaction); - index("event.index.rid", event.getProperty("remoteId"), event.identifier(), transaction); - }); + } + + void newEntity(const QByteArray &uid, qint64 revision, const Akonadi2::ApplicationDomain::BufferAdaptor &newEntity, Akonadi2::Storage::Transaction &transaction) Q_DECL_OVERRIDE + { + Akonadi2::ApplicationDomain::TypeImplementation::index(uid, newEntity, transaction); + add(newEntity.getProperty("remoteId"), uid, transaction); + } - mPipeline->setPreprocessors(ENTITY_TYPE_EVENT, Akonadi2::Pipeline::NewPipeline, QVector() << eventIndexer); - mPipeline->setAdaptorFactory(ENTITY_TYPE_EVENT, eventFactory); - //TODO cleanup indexes during removal + void modifiedEntity(const QByteArray &key, qint64 revision, const Akonadi2::ApplicationDomain::BufferAdaptor &oldEntity, const Akonadi2::ApplicationDomain::BufferAdaptor &newEntity, Akonadi2::Storage::Transaction &transaction) Q_DECL_OVERRIDE + { + } + void deletedEntity(const QByteArray &key, qint64 revision, const Akonadi2::ApplicationDomain::BufferAdaptor &oldEntity, Akonadi2::Storage::Transaction &transaction) Q_DECL_OVERRIDE { - auto mailFactory = QSharedPointer::create(); - auto mailIndexer = new Akonadi2::SimpleProcessor("mailIndexer", [mailFactory, resourceIdentifier](const Akonadi2::PipelineState &state, const Akonadi2::Entity &entity, Akonadi2::Storage::Transaction &transaction) { - Akonadi2::ApplicationDomain::Mail mail(resourceIdentifier, Akonadi2::Storage::uidFromKey(state.key()), -1, mailFactory->createAdaptor(entity)); - Akonadi2::ApplicationDomain::TypeImplementation::index(mail, transaction); - index("mail.index.rid", mail.getProperty("remoteId"), mail.identifier(), transaction); - }); + } +private: + void add(const QVariant &value, const QByteArray &uid, Akonadi2::Storage::Transaction &transaction) + { + if (value.isValid()) { + Index(mIndexIdentifier, transaction).add(value.toByteArray(), uid); + } + } + + void remove(const QByteArray &uid, Akonadi2::Storage::Transaction &transaction) + { + //Knowning the indexed value would probably help removing the uid efficiently. Otherwise we have to execute a full scan. + // Index(mIndexIdentifier, transaction).remove(uid); + } + + void modify(Akonadi2::Storage::Transaction &transaction) + { + //Knowning the indexed value would probably help removing the uid efficiently. Otherwise we have to execute a full scan. + // Index(mIndexIdentifier, transaction).remove(uid); + } - mPipeline->setPreprocessors(ENTITY_TYPE_MAIL, Akonadi2::Pipeline::NewPipeline, QVector() << mailIndexer); + QByteArray mIndexIdentifier; + QByteArray mBufferType; +}; + +DummyResource::DummyResource(const QByteArray &instanceIdentifier, const QSharedPointer &pipeline) + : Akonadi2::GenericResource(instanceIdentifier, pipeline) +{ + { + auto eventFactory = QSharedPointer::create(); + auto eventIndexer = new IndexUpdater("event.index.rid", ENTITY_TYPE_EVENT); + mPipeline->setPreprocessors(ENTITY_TYPE_EVENT, QVector() << eventIndexer); + mPipeline->setAdaptorFactory(ENTITY_TYPE_EVENT, eventFactory); + } + { + auto mailFactory = QSharedPointer::create(); + auto mailIndexer = new IndexUpdater("mail.index.rid", ENTITY_TYPE_MAIL); + mPipeline->setPreprocessors(ENTITY_TYPE_MAIL, QVector() << mailIndexer); mPipeline->setAdaptorFactory(ENTITY_TYPE_MAIL, mailFactory); } } @@ -171,9 +216,7 @@ KAsync::Job DummyResource::synchronizeWithSource() void DummyResource::removeFromDisk(const QByteArray &instanceIdentifier) { - Akonadi2::Storage(Akonadi2::storageLocation(), instanceIdentifier, Akonadi2::Storage::ReadWrite).removeFromDisk(); - Akonadi2::Storage(Akonadi2::storageLocation(), instanceIdentifier + ".userqueue", Akonadi2::Storage::ReadWrite).removeFromDisk(); - Akonadi2::Storage(Akonadi2::storageLocation(), instanceIdentifier + ".synchronizerqueue", Akonadi2::Storage::ReadWrite).removeFromDisk(); + GenericResource::removeFromDisk(instanceIdentifier); Akonadi2::Storage(Akonadi2::storageLocation(), instanceIdentifier + ".event.index.uid", Akonadi2::Storage::ReadWrite).removeFromDisk(); } -- cgit v1.2.3