From 35b28af1f449edb1bac0b0bda606c3c06b2fe102 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Thu, 15 Oct 2015 13:56:45 +0200 Subject: Moved more entitystorage functionality back to facade. To avoid unnecessary abstraction layers that don't solve a problem, and to allow facades to customize how entities are loaded. --- common/entitystorage.cpp | 118 +++------------------------------------ common/entitystorage.h | 63 ++++++++++----------- common/facade.h | 85 +++++++++++++++++++++++++++- tests/genericfacadebenchmark.cpp | 2 +- tests/genericfacadetest.cpp | 8 +-- 5 files changed, 126 insertions(+), 150 deletions(-) diff --git a/common/entitystorage.cpp b/common/entitystorage.cpp index e5346f4..c77e408 100644 --- a/common/entitystorage.cpp +++ b/common/entitystorage.cpp @@ -19,74 +19,6 @@ #include "entitystorage.h" -static void scan(const Akonadi2::Storage::Transaction &transaction, const QByteArray &key, std::function callback, const QByteArray &bufferType) -{ - - transaction.openDatabase(bufferType + ".main").findLatest(key, [=](const QByteArray &key, const QByteArray &value) -> bool { - //Extract buffers - Akonadi2::EntityBuffer buffer(value.data(), value.size()); - - //FIXME implement buffer.isValid() - // const auto resourceBuffer = Akonadi2::EntityBuffer::readBuffer(buffer.entity().resource()); - // const auto localBuffer = Akonadi2::EntityBuffer::readBuffer(buffer.entity().local()); - // const auto metadataBuffer = Akonadi2::EntityBuffer::readBuffer(buffer.entity().metadata()); - - // if ((!resourceBuffer && !localBuffer) || !metadataBuffer) { - // qWarning() << "invalid buffer " << QByteArray::fromRawData(static_cast(keyValue), keySize); - // return true; - // } - // - //We're cutting the revision off the key - return callback(Akonadi2::Storage::uidFromKey(key), buffer.entity()); - }, - [](const Akonadi2::Storage::Error &error) { - qWarning() << "Error during query: " << error.message; - }); -} - -void EntityStorageBase::readEntity(const Akonadi2::Storage::Transaction &transaction, const QByteArray &key, const std::function &resultCallback) -{ - //This only works for a 1:1 mapping of resource to domain types. - //Not i.e. for tags that are stored as flags in each entity of an imap store. - //additional properties that don't have a 1:1 mapping (such as separately stored tags), - //could be added to the adaptor. - //TODO: resource implementations should be able to customize the retrieval function for non 1:1 entity-buffer mapping cases - scan(transaction, key, [=](const QByteArray &key, const Akonadi2::Entity &entity) { - const auto metadataBuffer = Akonadi2::EntityBuffer::readBuffer(entity.metadata()); - Q_ASSERT(metadataBuffer); - qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; - auto operation = metadataBuffer->operation(); - - auto domainObject = create(key, revision, mDomainTypeAdaptorFactory->createAdaptor(entity)); - if (operation == Akonadi2::Operation_Removal) { - resultCallback(create(key, revision, mDomainTypeAdaptorFactory->createAdaptor(entity)), operation); - } else { - resultCallback(create(key, revision, mDomainTypeAdaptorFactory->createAdaptor(entity)), operation); - } - return false; - }, mBufferType); -} - -static ResultSet fullScan(const Akonadi2::Storage::Transaction &transaction, const QByteArray &bufferType) -{ - //TODO use a result set with an iterator, to read values on demand - QVector keys; - transaction.openDatabase(bufferType + ".main").scan(QByteArray(), [&](const QByteArray &key, const QByteArray &value) -> bool { - //Skip internals - if (Akonadi2::Storage::isInternalKey(key)) { - return true; - } - keys << Akonadi2::Storage::uidFromKey(key); - return true; - }, - [](const Akonadi2::Storage::Error &error) { - qWarning() << "Error during query: " << error.message; - }); - - Trace() << "Full scan found " << keys.size() << " results"; - return ResultSet(keys); -} - ResultSet EntityStorageBase::filteredSet(const ResultSet &resultSet, const std::function &filter, const Akonadi2::Storage::Transaction &transaction, bool initialQuery) { auto resultSetPtr = QSharedPointer::create(resultSet); @@ -112,23 +44,9 @@ ResultSet EntityStorageBase::filteredSet(const ResultSet &resultSet, const std:: return ResultSet(generator); } -ResultSet EntityStorageBase::loadInitialResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) -{ - QSet appliedFilters; - auto resultSet = queryIndexes(query, mResourceInstanceIdentifier, appliedFilters, transaction); - remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters; - - //We do a full scan if there were no indexes available to create the initial set. - if (appliedFilters.isEmpty()) { - //TODO this should be replaced by an index lookup as well - return fullScan(transaction, mBufferType); - } - return resultSet; -} ResultSet EntityStorageBase::getResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, qint64 baseRevision) { - const qint64 topRevision = Akonadi2::Storage::maxRevision(transaction); QSet remainingFilters = query.propertyFilter.keys().toSet(); ResultSet resultSet; const bool initialQuery = (baseRevision == 1); @@ -137,35 +55,17 @@ ResultSet EntityStorageBase::getResultSet(const Akonadi2::Query &query, Akonadi2 resultSet = loadInitialResultSet(query, transaction, remainingFilters); } else { //TODO fallback in case the old revision is no longer available to clear + redo complete initial scan - Trace() << "Incremental result set update" << baseRevision << topRevision; - auto revisionCounter = QSharedPointer::create(baseRevision); - resultSet = ResultSet([revisionCounter, topRevision, &transaction, this]() -> QByteArray { - //Spit out the revision keys one by one. - while (*revisionCounter <= topRevision) { - const auto uid = Akonadi2::Storage::getUidFromRevision(transaction, *revisionCounter); - const auto type = Akonadi2::Storage::getTypeFromRevision(transaction, *revisionCounter); - Trace() << "Revision" << *revisionCounter << type << uid; - if (type != mBufferType) { - //Skip revision - *revisionCounter += 1; - continue; - } - const auto key = Akonadi2::Storage::assembleKey(uid, *revisionCounter); - *revisionCounter += 1; - return key; - } - //We're done - return QByteArray(); - }); + Trace() << "Incremental result set update" << baseRevision; + resultSet = loadIncrementalResultSet(baseRevision, query, transaction, remainingFilters); } - auto filter = [remainingFilters, query, baseRevision, topRevision](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool { - if (topRevision > 0) { - Trace() << "filtering by revision " << domainObject->revision(); - if (domainObject->revision() < baseRevision || domainObject->revision() > topRevision) { - return false; - } - } + auto filter = [remainingFilters, query, baseRevision](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool { + // if (topRevision > 0) { + // Trace() << "filtering by revision " << domainObject->revision(); + // if (domainObject->revision() < baseRevision) { + // return false; + // } + // } for (const auto &filterProperty : remainingFilters) { //TODO implement other comparison operators than equality if (domainObject->getProperty(filterProperty) != query.propertyFilter.value(filterProperty)) { diff --git a/common/entitystorage.h b/common/entitystorage.h index 6b09cad..8e73083 100644 --- a/common/entitystorage.h +++ b/common/entitystorage.h @@ -32,41 +32,44 @@ /** * Wraps storage, entity adaptor factory and indexes into one. * - * TODO: customize with readEntity instead of adaptor factory */ class EntityStorageBase { +public: + typedef std::function &remainingFilters)> InitialResultLoader; + typedef std::function &remainingFilters)> IncrementalResultLoader; + typedef std::function &resultCallback)> EntityReader; + + /** + * Returns the initial result set that still needs to be filtered. + * + * To make this efficient indexes should be chosen that are as selective as possible. + */ + InitialResultLoader loadInitialResultSet; + /** + * Returns the incremental result set that still needs to be filtered. + */ + IncrementalResultLoader loadIncrementalResultSet; + + /** + * Loads a single entity by uid from storage. + */ + EntityReader readEntity; + protected: - EntityStorageBase(const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &adaptorFactory) - : mResourceInstanceIdentifier(instanceIdentifier), - mDomainTypeAdaptorFactory(adaptorFactory) + EntityStorageBase(const QByteArray &instanceIdentifier) + : mResourceInstanceIdentifier(instanceIdentifier) { } - virtual Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr create(const QByteArray &key, qint64 revision, const QSharedPointer &adaptor) = 0; virtual Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr copy(const Akonadi2::ApplicationDomain::ApplicationDomainType &) = 0; - virtual ResultSet queryIndexes(const Akonadi2::Query &query, const QByteArray &resourceInstanceIdentifier, QSet &appliedFilters, Akonadi2::Storage::Transaction &transaction) = 0; - /** - * Loads a single entity by uid from storage. - * - * TODO: Resources should be able to customize this for cases where an entity is not the same as a single buffer. - */ - void readEntity(const Akonadi2::Storage::Transaction &transaction, const QByteArray &key, const std::function &resultCallback); ResultSet getResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, qint64 baseRevision); -protected: QByteArray mResourceInstanceIdentifier; - QByteArray mBufferType; - DomainTypeAdaptorFactoryInterface::Ptr mDomainTypeAdaptorFactory; + private: - /** - * Returns the initial result set that still needs to be filtered. - * - * To make this efficient indexes should be chosen that are as selective as possible. - */ - ResultSet loadInitialResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters); ResultSet filteredSet(const ResultSet &resultSet, const std::function &filter, const Akonadi2::Storage::Transaction &transaction, bool isInitialQuery); }; @@ -75,28 +78,18 @@ class EntityStorage : public EntityStorageBase { public: - EntityStorage(const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &adaptorFactory, const QByteArray &bufferType) - : EntityStorageBase(instanceIdentifier, adaptorFactory) - { - mBufferType = bufferType; - } -protected: - Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr create(const QByteArray &key, qint64 revision, const QSharedPointer &adaptor) Q_DECL_OVERRIDE + EntityStorage(const QByteArray &instanceIdentifier) + : EntityStorageBase(instanceIdentifier) { - return DomainType::Ptr::create(mResourceInstanceIdentifier, key, revision, adaptor); } +protected: Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr copy(const Akonadi2::ApplicationDomain::ApplicationDomainType &object) Q_DECL_OVERRIDE { return Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(object); } - ResultSet queryIndexes(const Akonadi2::Query &query, const QByteArray &resourceInstanceIdentifier, QSet &appliedFilters, Akonadi2::Storage::Transaction &transaction) Q_DECL_OVERRIDE - { - return Akonadi2::ApplicationDomain::TypeImplementation::queryIndexes(query, resourceInstanceIdentifier, appliedFilters, transaction); - } - public: virtual qint64 read(const Akonadi2::Query &query, qint64 baseRevision, const QSharedPointer > &resultProvider) @@ -108,7 +101,7 @@ public: auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly); - Log() << "Querying" << baseRevision; + Log() << "Querying" << baseRevision << Akonadi2::Storage::maxRevision(transaction); auto resultSet = getResultSet(query, transaction, baseRevision); while(resultSet.next([this, resultProvider](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value, Akonadi2::Operation operation) -> bool { switch (operation) { diff --git a/common/facade.h b/common/facade.h index 38388c7..8b37579 100644 --- a/common/facade.h +++ b/common/facade.h @@ -82,6 +82,27 @@ private: qint64 mLatestRevision; }; +static ResultSet fullScan(const Akonadi2::Storage::Transaction &transaction, const QByteArray &bufferType) +{ + //TODO use a result set with an iterator, to read values on demand + QVector keys; + transaction.openDatabase(bufferType + ".main").scan(QByteArray(), [&](const QByteArray &key, const QByteArray &value) -> bool { + //Skip internals + if (Akonadi2::Storage::isInternalKey(key)) { + return true; + } + keys << Akonadi2::Storage::uidFromKey(key); + return true; + }, + [](const Akonadi2::Storage::Error &error) { + qWarning() << "Error during query: " << error.message; + }); + + Trace() << "Full scan found " << keys.size() << " results"; + return ResultSet(keys); +} + + namespace Akonadi2 { /** * Default facade implementation for resources that are implemented in a separate process using the ResourceAccess class. @@ -107,13 +128,75 @@ public: GenericFacade(const QByteArray &resourceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &adaptorFactory = DomainTypeAdaptorFactoryInterface::Ptr(), const QSharedPointer > storage = QSharedPointer >(), const QSharedPointer resourceAccess = QSharedPointer()) : Akonadi2::StoreFacade(), mResourceAccess(resourceAccess), - mStorage(storage ? storage : QSharedPointer >::create(resourceIdentifier, adaptorFactory, bufferTypeForDomainType())), + mStorage(storage), mDomainTypeAdaptorFactory(adaptorFactory), mResourceInstanceIdentifier(resourceIdentifier) { if (!mResourceAccess) { mResourceAccess = QSharedPointer::create(resourceIdentifier); } + if (!mStorage) { + mStorage = QSharedPointer >::create(resourceIdentifier); + const auto bufferType = bufferTypeForDomainType(); + + mStorage->readEntity = [bufferType, this] (const Akonadi2::Storage::Transaction &transaction, const QByteArray &key, const std::function &resultCallback) + { + //This only works for a 1:1 mapping of resource to domain types. + //Not i.e. for tags that are stored as flags in each entity of an imap store. + //additional properties that don't have a 1:1 mapping (such as separately stored tags), + //could be added to the adaptor. + transaction.openDatabase(bufferType + ".main").findLatest(key, [=](const QByteArray &key, const QByteArray &value) -> bool { + Akonadi2::EntityBuffer buffer(value.data(), value.size()); + const Akonadi2::Entity &entity = buffer.entity(); + const auto metadataBuffer = Akonadi2::EntityBuffer::readBuffer(entity.metadata()); + Q_ASSERT(metadataBuffer); + const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; + resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Akonadi2::Storage::uidFromKey(key), revision, mDomainTypeAdaptorFactory->createAdaptor(entity)), metadataBuffer->operation()); + return false; + }, + [](const Akonadi2::Storage::Error &error) { + qWarning() << "Error during query: " << error.message; + }); + }; + + mStorage->loadInitialResultSet = [bufferType, this] (const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) -> ResultSet + { + QSet appliedFilters; + auto resultSet = Akonadi2::ApplicationDomain::TypeImplementation::queryIndexes(query, mResourceInstanceIdentifier, appliedFilters, transaction); + remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters; + + //We do a full scan if there were no indexes available to create the initial set. + if (appliedFilters.isEmpty()) { + //TODO this should be replaced by an index lookup as well + return fullScan(transaction, bufferType); + } + return resultSet; + }; + + mStorage->loadIncrementalResultSet = [bufferType, this] (qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) -> ResultSet + { + auto revisionCounter = QSharedPointer::create(baseRevision); + return ResultSet([bufferType, revisionCounter, &transaction, this]() -> QByteArray { + const qint64 topRevision = Akonadi2::Storage::maxRevision(transaction); + //Spit out the revision keys one by one. + while (*revisionCounter <= topRevision) { + const auto uid = Akonadi2::Storage::getUidFromRevision(transaction, *revisionCounter); + const auto type = Akonadi2::Storage::getTypeFromRevision(transaction, *revisionCounter); + Trace() << "Revision" << *revisionCounter << type << uid; + if (type != bufferType) { + //Skip revision + *revisionCounter += 1; + continue; + } + const auto key = Akonadi2::Storage::assembleKey(uid, *revisionCounter); + *revisionCounter += 1; + return key; + } + //We're done + return QByteArray(); + }); + }; + } } ~GenericFacade() diff --git a/tests/genericfacadebenchmark.cpp b/tests/genericfacadebenchmark.cpp index 3e98fce..10aabd4 100644 --- a/tests/genericfacadebenchmark.cpp +++ b/tests/genericfacadebenchmark.cpp @@ -53,7 +53,7 @@ private Q_SLOTS: QBENCHMARK { auto resultSet = QSharedPointer >::create(); auto resourceAccess = QSharedPointer::create(); - auto storage = QSharedPointer >::create("identifier", domainTypeAdaptorFactory, "bufferType"); + auto storage = QSharedPointer >::create("identifier"); TestResourceFacade facade(identifier, storage, resourceAccess); async::SyncListResult result(resultSet->emitter()); diff --git a/tests/genericfacadetest.cpp b/tests/genericfacadetest.cpp index ae6a685..183a62a 100644 --- a/tests/genericfacadetest.cpp +++ b/tests/genericfacadetest.cpp @@ -29,7 +29,7 @@ private Q_SLOTS: query.liveQuery = false; auto resultSet = QSharedPointer >::create(); - auto storage = QSharedPointer::create("identifier", QSharedPointer::create(), "bufferType"); + auto storage = QSharedPointer::create("identifier"); auto resourceAccess = QSharedPointer::create(); storage->mResults << Akonadi2::ApplicationDomain::Event::Ptr::create(); TestResourceFacade facade("identifier", storage, resourceAccess); @@ -51,7 +51,7 @@ private Q_SLOTS: query.liveQuery = true; auto resultSet = QSharedPointer >::create(); - auto storage = QSharedPointer::create("identifier", QSharedPointer::create(), "bufferType"); + auto storage = QSharedPointer::create("identifier"); auto resourceAccess = QSharedPointer::create(); storage->mResults << Akonadi2::ApplicationDomain::Event::Ptr::create(); TestResourceFacade facade("identifier", storage, resourceAccess); @@ -83,7 +83,7 @@ private Q_SLOTS: query.liveQuery = true; auto resultSet = QSharedPointer >::create(); - auto storage = QSharedPointer::create("identifier", QSharedPointer::create(), "bufferType"); + auto storage = QSharedPointer::create("identifier"); auto resourceAccess = QSharedPointer::create(); auto entity = QSharedPointer::create("resource", "id2", 0, QSharedPointer::create()); entity->setProperty("test", "test1"); @@ -120,7 +120,7 @@ private Q_SLOTS: query.liveQuery = true; auto resultSet = QSharedPointer >::create(); - auto storage = QSharedPointer::create("identifier", QSharedPointer::create(), "bufferType"); + auto storage = QSharedPointer::create("identifier"); auto resourceAccess = QSharedPointer::create(); auto entity = QSharedPointer::create("resource", "id2", 0, QSharedPointer()); storage->mResults << entity; -- cgit v1.2.3