From 129333371d28c06d85f75ca579ce17798e615e84 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Wed, 28 Oct 2015 16:39:16 +0100 Subject: Made pipeline preprocessing synchronous. Instead of having the asynchronous preprocessor concept with different pipelines for new/modify/delete we have a single pipeline with synchronous preprocessors that act upon new/modify/delete. This keeps the code simpler due to lack of asynchronity and keeps the new/modify/delete operations together (which at least for the indexing makes a lot of sense). Not supporting asynchronity is ok because the tasks done in preprocessing are not cpu intensive (if they were we had a problem since they are directly involved in the round-trip time), and the main cost comes from i/o, meaning we don't gain much by doing multithreading. Costly tasks (such as full-text indexing) should rather be implemented as post-processing, since that doesn't increase the round-trip time directly, and eventually consistent is typically good enough for that. --- tests/genericresourcebenchmark.cpp | 35 +++++++++++------- tests/genericresourcetest.cpp | 4 +- tests/pipelinetest.cpp | 75 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 98 insertions(+), 16 deletions(-) (limited to 'tests') diff --git a/tests/genericresourcebenchmark.cpp b/tests/genericresourcebenchmark.cpp index b8635d7..fbe0d12 100644 --- a/tests/genericresourcebenchmark.cpp +++ b/tests/genericresourcebenchmark.cpp @@ -60,6 +60,25 @@ static QByteArray createEntityBuffer() return QByteArray(reinterpret_cast(fbb.GetBufferPointer()), fbb.GetSize()); } +class IndexUpdater : public Akonadi2::Preprocessor { +public: + void newEntity(const QByteArray &uid, qint64 revision, const Akonadi2::ApplicationDomain::BufferAdaptor &newEntity, Akonadi2::Storage::Transaction &transaction) Q_DECL_OVERRIDE + { + for (int i = 0; i < 10; i++) { + Index ridIndex(QString("index.index%1").arg(i).toLatin1(), transaction); + ridIndex.add("foo", uid); + } + } + + void modifiedEntity(const QByteArray &key, qint64 revision, const Akonadi2::ApplicationDomain::BufferAdaptor &oldEntity, const Akonadi2::ApplicationDomain::BufferAdaptor &newEntity, Akonadi2::Storage::Transaction &transaction) Q_DECL_OVERRIDE + { + } + + void deletedEntity(const QByteArray &key, qint64 revision, const Akonadi2::ApplicationDomain::BufferAdaptor &oldEntity, Akonadi2::Storage::Transaction &transaction) Q_DECL_OVERRIDE + { + } +}; + /** * Benchmark write performance of generic resource implementation including queues and pipeline. */ @@ -124,19 +143,9 @@ private Q_SLOTS: auto eventFactory = QSharedPointer::create(); const QByteArray resourceIdentifier = "org.kde.test.instance1"; - auto eventIndexer = new Akonadi2::SimpleProcessor("eventIndexer", [eventFactory, resourceIdentifier](const Akonadi2::PipelineState &state, const Akonadi2::Entity &entity, Akonadi2::Storage::Transaction &transaction) { - auto adaptor = eventFactory->createAdaptor(entity); - Akonadi2::ApplicationDomain::Event event(resourceIdentifier, state.key(), -1, adaptor); - Akonadi2::ApplicationDomain::TypeImplementation::index(event, transaction); - - //Create a bunch of indexes - for (int i = 0; i < 10; i++) { - Index ridIndex(QString("index.index%1").arg(i).toLatin1(), transaction); - ridIndex.add("foo", event.identifier()); - } - }); - - pipeline->setPreprocessors("event", Akonadi2::Pipeline::NewPipeline, QVector() << eventIndexer); + auto indexer = QSharedPointer::create(); + + pipeline->setPreprocessors("event", QVector() << indexer.data()); pipeline->setAdaptorFactory("event", eventFactory); TestResource resource("org.kde.test.instance1", pipeline); diff --git a/tests/genericresourcetest.cpp b/tests/genericresourcetest.cpp index 6dd4108..141a5f8 100644 --- a/tests/genericresourcetest.cpp +++ b/tests/genericresourcetest.cpp @@ -32,9 +32,7 @@ private Q_SLOTS: void init() { - removeFromDisk("org.kde.test.instance1"); - removeFromDisk("org.kde.test.instance1.userqueue"); - removeFromDisk("org.kde.test.instance1.synchronizerqueue"); + Akonadi2::GenericResource::removeFromDisk("org.kde.test.instance1"); Akonadi2::Log::setDebugOutputLevel(Akonadi2::Log::Trace); } diff --git a/tests/pipelinetest.cpp b/tests/pipelinetest.cpp index 7efba13..0b4c13e 100644 --- a/tests/pipelinetest.cpp +++ b/tests/pipelinetest.cpp @@ -139,6 +139,34 @@ QByteArray deleteEntityCommand(const QByteArray &uid, qint64 revision) return command; } +class TestProcessor : public Akonadi2::Preprocessor { +public: + void newEntity(const QByteArray &uid, qint64 revision, const Akonadi2::ApplicationDomain::BufferAdaptor &newEntity, Akonadi2::Storage::Transaction &transaction) Q_DECL_OVERRIDE + { + newUids << uid; + newRevisions << revision; + } + + void modifiedEntity(const QByteArray &uid, qint64 revision, const Akonadi2::ApplicationDomain::BufferAdaptor &oldEntity, const Akonadi2::ApplicationDomain::BufferAdaptor &newEntity, Akonadi2::Storage::Transaction &transaction) Q_DECL_OVERRIDE + { + modifiedUids << uid; + modifiedRevisions << revision; + } + + void deletedEntity(const QByteArray &uid, qint64 revision, const Akonadi2::ApplicationDomain::BufferAdaptor &oldEntity, Akonadi2::Storage::Transaction &transaction) Q_DECL_OVERRIDE + { + deletedUids << uid; + deletedRevisions << revision; + } + + QList newUids; + QList newRevisions; + QList modifiedUids; + QList modifiedRevisions; + QList deletedUids; + QList deletedRevisions; +}; + /** * Test of the pipeline implementation to ensure new revisions are created correctly in the database. */ @@ -251,6 +279,53 @@ private Q_SLOTS: //And all revisions are gone QCOMPARE(getKeys("org.kde.pipelinetest.instance1", "event.main").size(), 0); } + + void testPreprocessor() + { + flatbuffers::FlatBufferBuilder entityFbb; + + TestProcessor testProcessor; + + Akonadi2::Pipeline pipeline("org.kde.pipelinetest.instance1"); + pipeline.setPreprocessors("event", QVector() << &testProcessor); + pipeline.startTransaction(); + pipeline.setAdaptorFactory("event", QSharedPointer::create()); + + //Actual test + { + auto command = createEntityCommand(createEvent(entityFbb)); + pipeline.newEntity(command.constData(), command.size()); + QCOMPARE(testProcessor.newUids.size(), 1); + QCOMPARE(testProcessor.newRevisions.size(), 1); + //Key doesn't contain revision and is just the uid + QCOMPARE(testProcessor.newUids.at(0), Akonadi2::Storage::uidFromKey(testProcessor.newUids.at(0))); + } + pipeline.commit(); + entityFbb.Clear(); + pipeline.startTransaction(); + auto keys = getKeys("org.kde.pipelinetest.instance1", "event.main"); + QCOMPARE(keys.size(), 1); + const auto uid = Akonadi2::Storage::uidFromKey(keys.first()); + { + auto modifyCommand = modifyEntityCommand(createEvent(entityFbb, "summary2"), uid, 1); + pipeline.modifiedEntity(modifyCommand.constData(), modifyCommand.size()); + QCOMPARE(testProcessor.modifiedUids.size(), 1); + QCOMPARE(testProcessor.modifiedRevisions.size(), 1); + //Key doesn't contain revision and is just the uid + QCOMPARE(testProcessor.modifiedUids.at(0), Akonadi2::Storage::uidFromKey(testProcessor.modifiedUids.at(0))); + } + pipeline.commit(); + entityFbb.Clear(); + pipeline.startTransaction(); + { + auto deleteCommand = deleteEntityCommand(uid, 1); + pipeline.deletedEntity(deleteCommand.constData(), deleteCommand.size()); + QCOMPARE(testProcessor.deletedUids.size(), 1); + QCOMPARE(testProcessor.deletedUids.size(), 1); + //Key doesn't contain revision and is just the uid + QCOMPARE(testProcessor.deletedUids.at(0), Akonadi2::Storage::uidFromKey(testProcessor.deletedUids.at(0))); + } + } }; QTEST_MAIN(PipelineTest) -- cgit v1.2.3