From 1acf9f3c486813df807ff6931e56cc13eb26eeaf Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Sun, 23 Aug 2015 18:56:43 +0200 Subject: Store indexes as named databases in the same db. Because we also keep using the same transactions this finally makes the resource somewhat performant. On my system genericresourcebenchmark now processes ~4200 messages per second instead of ~280. --- common/domain/event.cpp | 8 ++--- common/domain/event.h | 6 ++-- common/entitystorage.cpp | 4 +-- common/entitystorage.h | 8 ++--- common/index.cpp | 17 +++++---- common/index.h | 4 ++- common/pipeline.cpp | 14 ++------ common/pipeline.h | 19 ++++++---- examples/dummyresource/resourcefactory.cpp | 10 +++--- tests/genericresourcebenchmark.cpp | 56 +++++++++++++++++++++++++++--- tests/storagetest.cpp | 4 +-- 11 files changed, 100 insertions(+), 50 deletions(-) diff --git a/common/domain/event.cpp b/common/domain/event.cpp index 15f5d11..e107441 100644 --- a/common/domain/event.cpp +++ b/common/domain/event.cpp @@ -34,11 +34,11 @@ using namespace Akonadi2::ApplicationDomain; -ResultSet TypeImplementation::queryIndexes(const Akonadi2::Query &query, const QByteArray &resourceInstanceIdentifier, QSet &appliedFilters) +ResultSet TypeImplementation::queryIndexes(const Akonadi2::Query &query, const QByteArray &resourceInstanceIdentifier, QSet &appliedFilters, Akonadi2::Storage::Transaction &transaction) { QVector keys; if (query.propertyFilter.contains("uid")) { - Index uidIndex(Akonadi2::storageLocation(), resourceInstanceIdentifier + ".index.uid", Akonadi2::Storage::ReadOnly); + Index uidIndex("index.uid", transaction); uidIndex.lookup(query.propertyFilter.value("uid").toByteArray(), [&](const QByteArray &value) { keys << value; }, @@ -50,11 +50,11 @@ ResultSet TypeImplementation::queryIndexes(const Akonadi2::Query &query, return ResultSet(keys); } -void TypeImplementation::index(const Event &type) +void TypeImplementation::index(const Event &type, Akonadi2::Storage::Transaction &transaction) { - Index uidIndex(Akonadi2::storageLocation(), type.resourceInstanceIdentifier() + ".index.uid", Akonadi2::Storage::ReadWrite); const auto uid = type.getProperty("uid"); if (uid.isValid()) { + Index uidIndex("index.uid", transaction); uidIndex.add(uid.toByteArray(), type.identifier()); } } diff --git a/common/domain/event.h b/common/domain/event.h index 13cfc6e..f21cd34 100644 --- a/common/domain/event.h +++ b/common/domain/event.h @@ -20,6 +20,8 @@ #include "applicationdomaintype.h" +#include "storage.h" + class ResultSet; class QByteArray; @@ -53,8 +55,8 @@ public: * * An empty result set indicates that a full scan is required. */ - static ResultSet queryIndexes(const Akonadi2::Query &query, const QByteArray &resourceInstanceIdentifier, QSet &appliedFilters); - static void index(const Event &type); + static ResultSet queryIndexes(const Akonadi2::Query &query, const QByteArray &resourceInstanceIdentifier, QSet &appliedFilters, Akonadi2::Storage::Transaction &transaction); + static void index(const Event &type, Akonadi2::Storage::Transaction &transaction); static QSharedPointer > initializeReadPropertyMapper(); static QSharedPointer > initializeWritePropertyMapper(); }; diff --git a/common/entitystorage.cpp b/common/entitystorage.cpp index 420c3b0..22fd9e6 100644 --- a/common/entitystorage.cpp +++ b/common/entitystorage.cpp @@ -91,10 +91,10 @@ ResultSet EntityStorageBase::filteredSet(const ResultSet &resultSet, const std:: return ResultSet(generator); } -ResultSet EntityStorageBase::getResultSet(const Akonadi2::Query &query, const Akonadi2::Storage::Transaction &transaction, qint64 baseRevision, qint64 topRevision) +ResultSet EntityStorageBase::getResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, qint64 baseRevision, qint64 topRevision) { QSet appliedFilters; - ResultSet resultSet = queryIndexes(query, mResourceInstanceIdentifier, appliedFilters); + ResultSet resultSet = queryIndexes(query, mResourceInstanceIdentifier, appliedFilters, transaction); const auto remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters; //We do a full scan if there were no indexes available to create the initial set. diff --git a/common/entitystorage.h b/common/entitystorage.h index 2fce880..8256938 100644 --- a/common/entitystorage.h +++ b/common/entitystorage.h @@ -44,11 +44,11 @@ protected: virtual Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr create(const QByteArray &key, qint64 revision, const QSharedPointer &adaptor) = 0; virtual Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr copy(const Akonadi2::ApplicationDomain::ApplicationDomainType &) = 0; - virtual ResultSet queryIndexes(const Akonadi2::Query &query, const QByteArray &resourceInstanceIdentifier, QSet &appliedFilters) = 0; + virtual ResultSet queryIndexes(const Akonadi2::Query &query, const QByteArray &resourceInstanceIdentifier, QSet &appliedFilters, Akonadi2::Storage::Transaction &transaction) = 0; void readValue(const Akonadi2::Storage::Transaction &transaction, const QByteArray &key, const std::function &resultCallback); ResultSet filteredSet(const ResultSet &resultSet, const std::function &filter, const Akonadi2::Storage::Transaction &transaction, qint64 baseRevision, qint64 topRevision); - ResultSet getResultSet(const Akonadi2::Query &query, const Akonadi2::Storage::Transaction &transaction, qint64 baseRevision, qint64 topRevision); + ResultSet getResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, qint64 baseRevision, qint64 topRevision); protected: QByteArray mResourceInstanceIdentifier; @@ -77,9 +77,9 @@ protected: return Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(object); } - ResultSet queryIndexes(const Akonadi2::Query &query, const QByteArray &resourceInstanceIdentifier, QSet &appliedFilters) Q_DECL_OVERRIDE + ResultSet queryIndexes(const Akonadi2::Query &query, const QByteArray &resourceInstanceIdentifier, QSet &appliedFilters, Akonadi2::Storage::Transaction &transaction) Q_DECL_OVERRIDE { - return Akonadi2::ApplicationDomain::TypeImplementation::queryIndexes(query, resourceInstanceIdentifier, appliedFilters); + return Akonadi2::ApplicationDomain::TypeImplementation::queryIndexes(query, resourceInstanceIdentifier, appliedFilters, transaction); } public: diff --git a/common/index.cpp b/common/index.cpp index 7e3c09e..75ffe3f 100644 --- a/common/index.cpp +++ b/common/index.cpp @@ -2,24 +2,27 @@ #include Index::Index(const QString &storageRoot, const QString &name, Akonadi2::Storage::AccessMode mode) - : mStorage(storageRoot, name, mode, true) + : mTransaction(Akonadi2::Storage(storageRoot, name, mode, true).createTransaction(mode)), + mDb(mTransaction.openDatabase(name.toLatin1(), std::function(), true)) +{ + +} + +Index::Index(const QByteArray &name, Akonadi2::Storage::Transaction &transaction) + : mDb(transaction.openDatabase(name, std::function(), true)) { } void Index::add(const QByteArray &key, const QByteArray &value) { - mStorage.createTransaction(Akonadi2::Storage::ReadWrite).write(key, value); + mDb.write(key, value); } void Index::lookup(const QByteArray &key, const std::function &resultHandler, const std::function &errorHandler) { - if (!mStorage.exists()) { - errorHandler(Error("index", IndexNotAvailable, "Index not existing")); - return; - } - mStorage.createTransaction(Akonadi2::Storage::ReadOnly).scan(key, [this, resultHandler](const QByteArray &key, const QByteArray &value) -> bool { + mDb.scan(key, [this, resultHandler](const QByteArray &key, const QByteArray &value) -> bool { resultHandler(value); return true; }, diff --git a/common/index.h b/common/index.h index 08b499f..e1b7e3a 100644 --- a/common/index.h +++ b/common/index.h @@ -26,6 +26,7 @@ public: }; Index(const QString &storageRoot, const QString &name, Akonadi2::Storage::AccessMode mode = Akonadi2::Storage::ReadOnly); + Index(const QByteArray &name, Akonadi2::Storage::Transaction &); void add(const QByteArray &key, const QByteArray &value); // void remove(const QByteArray &key, const QByteArray &value); @@ -35,5 +36,6 @@ public: private: Q_DISABLE_COPY(Index); - Akonadi2::Storage mStorage; + Akonadi2::Storage::Transaction mTransaction; + Akonadi2::Storage::NamedDatabase mDb; }; diff --git a/common/pipeline.cpp b/common/pipeline.cpp index ce4ad41..c5e36ee 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp @@ -441,18 +441,8 @@ void PipelineState::step() d->idle = false; if (d->filterIt.hasNext()) { //TODO skip step if already processed - //FIXME error handling if no result is found auto preprocessor = d->filterIt.next(); - //FIXME this read should not be necessary - //Perhaps simply use entity that is initially stored and synchronously process all filters. (Making the first filter somewhat redundant) - d->pipeline->transaction().scan(d->key, [this, preprocessor](const QByteArray &key, const QByteArray &value) -> bool { - auto entity = Akonadi2::GetEntity(value); - preprocessor->process(*this, *entity); - return false; - }, [this](const Akonadi2::Storage::Error &error) { - ErrorMsg() << "Failed to find value in pipeline: " << error.message; - d->pipeline->pipelineCompleted(*this); - }); + preprocessor->process(*this, d->pipeline->transaction()); } else { //This object becomes invalid after this call d->pipeline->pipelineCompleted(*this); @@ -483,7 +473,7 @@ Preprocessor::~Preprocessor() { } -void Preprocessor::process(const PipelineState &state, const Akonadi2::Entity &) +void Preprocessor::process(const PipelineState &state, Akonadi2::Storage::Transaction &transaction) { processingCompleted(state); } diff --git a/common/pipeline.h b/common/pipeline.h index 496e037..fee6a5e 100644 --- a/common/pipeline.h +++ b/common/pipeline.h @@ -115,7 +115,7 @@ public: virtual ~Preprocessor(); //TODO pass actual command as well, for changerecording - virtual void process(const PipelineState &state, const Akonadi2::Entity &); + virtual void process(const PipelineState &state, Akonadi2::Storage::Transaction &transaction); //TODO to record progress virtual QString id() const; @@ -133,17 +133,24 @@ private: class AKONADI2COMMON_EXPORT SimpleProcessor : public Akonadi2::Preprocessor { public: - SimpleProcessor(const QString &id, const std::function &f) + SimpleProcessor(const QString &id, const std::function &f) : Akonadi2::Preprocessor(), mFunction(f), mId(id) { } - void process(const Akonadi2::PipelineState &state, const Akonadi2::Entity &e) Q_DECL_OVERRIDE + void process(const PipelineState &state, Akonadi2::Storage::Transaction &transaction) Q_DECL_OVERRIDE { - mFunction(state, e); - processingCompleted(state); + transaction.scan(state.key(), [this, &state, &transaction](const QByteArray &key, const QByteArray &value) -> bool { + auto entity = Akonadi2::GetEntity(value); + mFunction(state, *entity, transaction); + processingCompleted(state); + return false; + }, [this, state](const Akonadi2::Storage::Error &error) { + ErrorMsg() << "Failed to find value in pipeline: " << error.message; + processingCompleted(state); + }); } QString id() const @@ -152,7 +159,7 @@ public: } protected: - std::function mFunction; + std::function mFunction; QString mId; }; diff --git a/examples/dummyresource/resourcefactory.cpp b/examples/dummyresource/resourcefactory.cpp index e029308..0e18282 100644 --- a/examples/dummyresource/resourcefactory.cpp +++ b/examples/dummyresource/resourcefactory.cpp @@ -42,13 +42,13 @@ DummyResource::DummyResource(const QByteArray &instanceIdentifier, const QShared { auto eventFactory = QSharedPointer::create(); const auto resourceIdentifier = mResourceInstanceIdentifier; - auto eventIndexer = new Akonadi2::SimpleProcessor("eventIndexer", [eventFactory, resourceIdentifier](const Akonadi2::PipelineState &state, const Akonadi2::Entity &entity) { + auto eventIndexer = new Akonadi2::SimpleProcessor("eventIndexer", [eventFactory, resourceIdentifier](const Akonadi2::PipelineState &state, const Akonadi2::Entity &entity, Akonadi2::Storage::Transaction &transaction) { auto adaptor = eventFactory->createAdaptor(entity); //FIXME set revision? Akonadi2::ApplicationDomain::Event event(resourceIdentifier, state.key(), -1, adaptor); - Akonadi2::ApplicationDomain::TypeImplementation::index(event); + Akonadi2::ApplicationDomain::TypeImplementation::index(event, transaction); - Index ridIndex(Akonadi2::storageLocation(), resourceIdentifier + ".index.rid", Akonadi2::Storage::ReadWrite); + Index ridIndex("index.rid", transaction); const auto rid = event.getProperty("remoteId"); if (rid.isValid()) { ridIndex.add(rid.toByteArray(), event.identifier()); @@ -63,8 +63,8 @@ DummyResource::DummyResource(const QByteArray &instanceIdentifier, const QShared KAsync::Job DummyResource::synchronizeWithSource() { return KAsync::start([this](KAsync::Future &f) { - //TODO start transaction on index - Index uidIndex(Akonadi2::storageLocation(), mResourceInstanceIdentifier + ".index.uid", Akonadi2::Storage::ReadOnly); + auto transaction = Akonadi2::Storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier + ".index.uid", Akonadi2::Storage::ReadOnly).createTransaction(Akonadi2::Storage::ReadOnly); + Index uidIndex("index.uid", transaction); const auto data = DummyStore::instance().data(); for (auto it = data.constBegin(); it != data.constEnd(); it++) { diff --git a/tests/genericresourcebenchmark.cpp b/tests/genericresourcebenchmark.cpp index 01dc95d..27678b0 100644 --- a/tests/genericresourcebenchmark.cpp +++ b/tests/genericresourcebenchmark.cpp @@ -12,6 +12,7 @@ #include "genericresource.h" #include "definitions.h" #include "domainadaptor.h" +#include "index.h" #include class TestResource : public Akonadi2::GenericResource @@ -88,20 +89,21 @@ class GenericResourceBenchmark : public QObject private Q_SLOTS: void init() + { + Akonadi2::Log::setDebugOutputLevel(Akonadi2::Log::Warning); + } + + void initTestCase() { removeFromDisk("org.kde.test.instance1"); removeFromDisk("org.kde.test.instance1.userqueue"); removeFromDisk("org.kde.test.instance1.synchronizerqueue"); - Akonadi2::Log::setDebugOutputLevel(Akonadi2::Log::Warning); - qDebug(); - qDebug() << "-----------------------------------------"; - qDebug(); } void testWriteInProcess() { - int num = 50000; + int num = 10000; auto pipeline = QSharedPointer::create("org.kde.test.instance1"); TestResource resource("org.kde.test.instance1", pipeline); @@ -125,6 +127,50 @@ private Q_SLOTS: std::cout << "All processed: " << allProcessedTime << " /sec " << num*1000/allProcessedTime << std::endl; } + void testWriteInProcessWithIndex() + { + int num = 10000; + + auto pipeline = QSharedPointer::create("org.kde.test.instance1"); + + auto eventFactory = QSharedPointer::create(); + const QByteArray resourceIdentifier = "org.kde.test.instance1"; + auto eventIndexer = new Akonadi2::SimpleProcessor("eventIndexer", [eventFactory, resourceIdentifier](const Akonadi2::PipelineState &state, const Akonadi2::Entity &entity, Akonadi2::Storage::Transaction &transaction) { + auto adaptor = eventFactory->createAdaptor(entity); + Akonadi2::ApplicationDomain::Event event(resourceIdentifier, state.key(), -1, adaptor); + Akonadi2::ApplicationDomain::TypeImplementation::index(event, transaction); + + //Create a bunch of indexes + for (int i = 0; i < 10; i++) { + Index ridIndex(QString("index.index%1").arg(i).toLatin1(), transaction); + ridIndex.add("foo", event.identifier()); + } + }); + + pipeline->setPreprocessors("event", Akonadi2::Pipeline::NewPipeline, QVector() << eventIndexer); + pipeline->setAdaptorFactory("event", eventFactory); + + TestResource resource("org.kde.test.instance1", pipeline); + + auto command = createEntityBuffer(); + + QTime time; + time.start(); + + for (int i = 0; i < num; i++) { + resource.processCommand(Akonadi2::Commands::CreateEntityCommand, command); + } + auto appendTime = time.elapsed(); + + //Wait until all messages have been processed + resource.processAllMessages().exec().waitForFinished(); + + auto allProcessedTime = time.elapsed(); + + std::cout << "Append to messagequeue " << appendTime << " /sec " << num*1000/appendTime << std::endl; + std::cout << "All processed: " << allProcessedTime << " /sec " << num*1000/allProcessedTime << std::endl; + } + void testCreateCommand() { Akonadi2::ApplicationDomain::Event event; diff --git a/tests/storagetest.cpp b/tests/storagetest.cpp index e872c44..6ba4bcd 100644 --- a/tests/storagetest.cpp +++ b/tests/storagetest.cpp @@ -223,7 +223,7 @@ private Q_SLOTS: bool gotError = false; Akonadi2::Storage store(testDataPath, dbName, Akonadi2::Storage::ReadWrite, false); auto transaction = store.createTransaction(Akonadi2::Storage::ReadWrite); - auto db = transaction.openDatabase(); + auto db = transaction.openDatabase("default", nullptr, false); db.write("key","value"); db.write("key","value"); @@ -246,7 +246,7 @@ private Q_SLOTS: bool gotError = false; Akonadi2::Storage store(testDataPath, dbName, Akonadi2::Storage::ReadWrite, true); auto transaction = store.createTransaction(Akonadi2::Storage::ReadWrite); - auto db = transaction.openDatabase(); + auto db = transaction.openDatabase("default", nullptr, true); db.write("key","value1"); db.write("key","value2"); int numValues = db.scan("key", [&](const QByteArray &key, const QByteArray &value) -> bool { -- cgit v1.2.3