From 1acf9f3c486813df807ff6931e56cc13eb26eeaf Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Sun, 23 Aug 2015 18:56:43 +0200 Subject: Store indexes as named databases in the same db. Because we also keep using the same transactions this finally makes the resource somewhat performant. On my system genericresourcebenchmark now processes ~4200 messages per second instead of ~280. --- common/domain/event.cpp | 8 ++++---- common/domain/event.h | 6 ++++-- common/entitystorage.cpp | 4 ++-- common/entitystorage.h | 8 ++++---- common/index.cpp | 17 ++++++++++------- common/index.h | 4 +++- common/pipeline.cpp | 14 ++------------ common/pipeline.h | 19 +++++++++++++------ 8 files changed, 42 insertions(+), 38 deletions(-) (limited to 'common') diff --git a/common/domain/event.cpp b/common/domain/event.cpp index 15f5d11..e107441 100644 --- a/common/domain/event.cpp +++ b/common/domain/event.cpp @@ -34,11 +34,11 @@ using namespace Akonadi2::ApplicationDomain; -ResultSet TypeImplementation::queryIndexes(const Akonadi2::Query &query, const QByteArray &resourceInstanceIdentifier, QSet &appliedFilters) +ResultSet TypeImplementation::queryIndexes(const Akonadi2::Query &query, const QByteArray &resourceInstanceIdentifier, QSet &appliedFilters, Akonadi2::Storage::Transaction &transaction) { QVector keys; if (query.propertyFilter.contains("uid")) { - Index uidIndex(Akonadi2::storageLocation(), resourceInstanceIdentifier + ".index.uid", Akonadi2::Storage::ReadOnly); + Index uidIndex("index.uid", transaction); uidIndex.lookup(query.propertyFilter.value("uid").toByteArray(), [&](const QByteArray &value) { keys << value; }, @@ -50,11 +50,11 @@ ResultSet TypeImplementation::queryIndexes(const Akonadi2::Query &query, return ResultSet(keys); } -void TypeImplementation::index(const Event &type) +void TypeImplementation::index(const Event &type, Akonadi2::Storage::Transaction &transaction) { - Index uidIndex(Akonadi2::storageLocation(), type.resourceInstanceIdentifier() + ".index.uid", Akonadi2::Storage::ReadWrite); const auto uid = type.getProperty("uid"); if (uid.isValid()) { + Index uidIndex("index.uid", transaction); uidIndex.add(uid.toByteArray(), type.identifier()); } } diff --git a/common/domain/event.h b/common/domain/event.h index 13cfc6e..f21cd34 100644 --- a/common/domain/event.h +++ b/common/domain/event.h @@ -20,6 +20,8 @@ #include "applicationdomaintype.h" +#include "storage.h" + class ResultSet; class QByteArray; @@ -53,8 +55,8 @@ public: * * An empty result set indicates that a full scan is required. */ - static ResultSet queryIndexes(const Akonadi2::Query &query, const QByteArray &resourceInstanceIdentifier, QSet &appliedFilters); - static void index(const Event &type); + static ResultSet queryIndexes(const Akonadi2::Query &query, const QByteArray &resourceInstanceIdentifier, QSet &appliedFilters, Akonadi2::Storage::Transaction &transaction); + static void index(const Event &type, Akonadi2::Storage::Transaction &transaction); static QSharedPointer > initializeReadPropertyMapper(); static QSharedPointer > initializeWritePropertyMapper(); }; diff --git a/common/entitystorage.cpp b/common/entitystorage.cpp index 420c3b0..22fd9e6 100644 --- a/common/entitystorage.cpp +++ b/common/entitystorage.cpp @@ -91,10 +91,10 @@ ResultSet EntityStorageBase::filteredSet(const ResultSet &resultSet, const std:: return ResultSet(generator); } -ResultSet EntityStorageBase::getResultSet(const Akonadi2::Query &query, const Akonadi2::Storage::Transaction &transaction, qint64 baseRevision, qint64 topRevision) +ResultSet EntityStorageBase::getResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, qint64 baseRevision, qint64 topRevision) { QSet appliedFilters; - ResultSet resultSet = queryIndexes(query, mResourceInstanceIdentifier, appliedFilters); + ResultSet resultSet = queryIndexes(query, mResourceInstanceIdentifier, appliedFilters, transaction); const auto remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters; //We do a full scan if there were no indexes available to create the initial set. diff --git a/common/entitystorage.h b/common/entitystorage.h index 2fce880..8256938 100644 --- a/common/entitystorage.h +++ b/common/entitystorage.h @@ -44,11 +44,11 @@ protected: virtual Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr create(const QByteArray &key, qint64 revision, const QSharedPointer &adaptor) = 0; virtual Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr copy(const Akonadi2::ApplicationDomain::ApplicationDomainType &) = 0; - virtual ResultSet queryIndexes(const Akonadi2::Query &query, const QByteArray &resourceInstanceIdentifier, QSet &appliedFilters) = 0; + virtual ResultSet queryIndexes(const Akonadi2::Query &query, const QByteArray &resourceInstanceIdentifier, QSet &appliedFilters, Akonadi2::Storage::Transaction &transaction) = 0; void readValue(const Akonadi2::Storage::Transaction &transaction, const QByteArray &key, const std::function &resultCallback); ResultSet filteredSet(const ResultSet &resultSet, const std::function &filter, const Akonadi2::Storage::Transaction &transaction, qint64 baseRevision, qint64 topRevision); - ResultSet getResultSet(const Akonadi2::Query &query, const Akonadi2::Storage::Transaction &transaction, qint64 baseRevision, qint64 topRevision); + ResultSet getResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, qint64 baseRevision, qint64 topRevision); protected: QByteArray mResourceInstanceIdentifier; @@ -77,9 +77,9 @@ protected: return Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(object); } - ResultSet queryIndexes(const Akonadi2::Query &query, const QByteArray &resourceInstanceIdentifier, QSet &appliedFilters) Q_DECL_OVERRIDE + ResultSet queryIndexes(const Akonadi2::Query &query, const QByteArray &resourceInstanceIdentifier, QSet &appliedFilters, Akonadi2::Storage::Transaction &transaction) Q_DECL_OVERRIDE { - return Akonadi2::ApplicationDomain::TypeImplementation::queryIndexes(query, resourceInstanceIdentifier, appliedFilters); + return Akonadi2::ApplicationDomain::TypeImplementation::queryIndexes(query, resourceInstanceIdentifier, appliedFilters, transaction); } public: diff --git a/common/index.cpp b/common/index.cpp index 7e3c09e..75ffe3f 100644 --- a/common/index.cpp +++ b/common/index.cpp @@ -2,24 +2,27 @@ #include Index::Index(const QString &storageRoot, const QString &name, Akonadi2::Storage::AccessMode mode) - : mStorage(storageRoot, name, mode, true) + : mTransaction(Akonadi2::Storage(storageRoot, name, mode, true).createTransaction(mode)), + mDb(mTransaction.openDatabase(name.toLatin1(), std::function(), true)) +{ + +} + +Index::Index(const QByteArray &name, Akonadi2::Storage::Transaction &transaction) + : mDb(transaction.openDatabase(name, std::function(), true)) { } void Index::add(const QByteArray &key, const QByteArray &value) { - mStorage.createTransaction(Akonadi2::Storage::ReadWrite).write(key, value); + mDb.write(key, value); } void Index::lookup(const QByteArray &key, const std::function &resultHandler, const std::function &errorHandler) { - if (!mStorage.exists()) { - errorHandler(Error("index", IndexNotAvailable, "Index not existing")); - return; - } - mStorage.createTransaction(Akonadi2::Storage::ReadOnly).scan(key, [this, resultHandler](const QByteArray &key, const QByteArray &value) -> bool { + mDb.scan(key, [this, resultHandler](const QByteArray &key, const QByteArray &value) -> bool { resultHandler(value); return true; }, diff --git a/common/index.h b/common/index.h index 08b499f..e1b7e3a 100644 --- a/common/index.h +++ b/common/index.h @@ -26,6 +26,7 @@ public: }; Index(const QString &storageRoot, const QString &name, Akonadi2::Storage::AccessMode mode = Akonadi2::Storage::ReadOnly); + Index(const QByteArray &name, Akonadi2::Storage::Transaction &); void add(const QByteArray &key, const QByteArray &value); // void remove(const QByteArray &key, const QByteArray &value); @@ -35,5 +36,6 @@ public: private: Q_DISABLE_COPY(Index); - Akonadi2::Storage mStorage; + Akonadi2::Storage::Transaction mTransaction; + Akonadi2::Storage::NamedDatabase mDb; }; diff --git a/common/pipeline.cpp b/common/pipeline.cpp index ce4ad41..c5e36ee 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp @@ -441,18 +441,8 @@ void PipelineState::step() d->idle = false; if (d->filterIt.hasNext()) { //TODO skip step if already processed - //FIXME error handling if no result is found auto preprocessor = d->filterIt.next(); - //FIXME this read should not be necessary - //Perhaps simply use entity that is initially stored and synchronously process all filters. (Making the first filter somewhat redundant) - d->pipeline->transaction().scan(d->key, [this, preprocessor](const QByteArray &key, const QByteArray &value) -> bool { - auto entity = Akonadi2::GetEntity(value); - preprocessor->process(*this, *entity); - return false; - }, [this](const Akonadi2::Storage::Error &error) { - ErrorMsg() << "Failed to find value in pipeline: " << error.message; - d->pipeline->pipelineCompleted(*this); - }); + preprocessor->process(*this, d->pipeline->transaction()); } else { //This object becomes invalid after this call d->pipeline->pipelineCompleted(*this); @@ -483,7 +473,7 @@ Preprocessor::~Preprocessor() { } -void Preprocessor::process(const PipelineState &state, const Akonadi2::Entity &) +void Preprocessor::process(const PipelineState &state, Akonadi2::Storage::Transaction &transaction) { processingCompleted(state); } diff --git a/common/pipeline.h b/common/pipeline.h index 496e037..fee6a5e 100644 --- a/common/pipeline.h +++ b/common/pipeline.h @@ -115,7 +115,7 @@ public: virtual ~Preprocessor(); //TODO pass actual command as well, for changerecording - virtual void process(const PipelineState &state, const Akonadi2::Entity &); + virtual void process(const PipelineState &state, Akonadi2::Storage::Transaction &transaction); //TODO to record progress virtual QString id() const; @@ -133,17 +133,24 @@ private: class AKONADI2COMMON_EXPORT SimpleProcessor : public Akonadi2::Preprocessor { public: - SimpleProcessor(const QString &id, const std::function &f) + SimpleProcessor(const QString &id, const std::function &f) : Akonadi2::Preprocessor(), mFunction(f), mId(id) { } - void process(const Akonadi2::PipelineState &state, const Akonadi2::Entity &e) Q_DECL_OVERRIDE + void process(const PipelineState &state, Akonadi2::Storage::Transaction &transaction) Q_DECL_OVERRIDE { - mFunction(state, e); - processingCompleted(state); + transaction.scan(state.key(), [this, &state, &transaction](const QByteArray &key, const QByteArray &value) -> bool { + auto entity = Akonadi2::GetEntity(value); + mFunction(state, *entity, transaction); + processingCompleted(state); + return false; + }, [this, state](const Akonadi2::Storage::Error &error) { + ErrorMsg() << "Failed to find value in pipeline: " << error.message; + processingCompleted(state); + }); } QString id() const @@ -152,7 +159,7 @@ public: } protected: - std::function mFunction; + std::function mFunction; QString mId; }; -- cgit v1.2.3