From ddb28417ccbcd22e771b7610c1727eac63471609 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Thu, 19 Nov 2015 23:47:34 +0100 Subject: Moved facade implementation to cpp file --- common/CMakeLists.txt | 1 + common/facade.cpp | 352 +++++++++++++++++++++++++++++++++++++++++++++++++- common/facade.h | 335 +++-------------------------------------------- 3 files changed, 370 insertions(+), 318 deletions(-) (limited to 'common') diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index bdb9eac..01056d0 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt @@ -12,6 +12,7 @@ else (STORAGE_unqlite) endif (STORAGE_unqlite) set(command_SRCS + modelresult.cpp definitions.cpp log.cpp entitybuffer.cpp diff --git a/common/facade.cpp b/common/facade.cpp index e51b32a..b4931cf 100644 --- a/common/facade.cpp +++ b/common/facade.cpp @@ -1,5 +1,5 @@ /* - * Copyright (C) 2015 Christian Mollekopf + * Copyright (C) 2015 Christian Mollekopf * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -18,3 +18,353 @@ */ #include "facade.h" + +#include "commands.h" +#include "domainadaptor.h" +#include "log.h" +#include "storage.h" +#include "definitions.h" + +using namespace Akonadi2; + +/** + * A QueryRunner runs a query and updates the corresponding result set. + * + * The lifetime of the QueryRunner is defined by the resut set (otherwise it's doing useless work), + * and by how long a result set must be updated. If the query is one off the runner dies after the execution, + * otherwise it lives on the react to changes and updates the corresponding result set. + * + * QueryRunner has to keep ResourceAccess alive in order to keep getting updates. + */ +class QueryRunner : public QObject +{ + Q_OBJECT +public: + typedef std::function()> QueryFunction; + + QueryRunner(const Akonadi2::Query &query) {}; + /** + * Starts query + */ + KAsync::Job run(qint64 newRevision = 0) + { + return queryFunction(); + } + + /** + * Set the query to run + */ + void setQuery(const QueryFunction &query) + { + queryFunction = query; + } + +public slots: + /** + * Rerun query with new revision + */ + void revisionChanged(qint64 newRevision) + { + Trace() << "New revision: " << newRevision; + run().exec(); + } + +private: + QueryFunction queryFunction; +}; + +static inline ResultSet fullScan(const Akonadi2::Storage::Transaction &transaction, const QByteArray &bufferType) +{ + //TODO use a result set with an iterator, to read values on demand + QVector keys; + transaction.openDatabase(bufferType + ".main").scan(QByteArray(), [&](const QByteArray &key, const QByteArray &value) -> bool { + //Skip internals + if (Akonadi2::Storage::isInternalKey(key)) { + return true; + } + keys << Akonadi2::Storage::uidFromKey(key); + return true; + }, + [](const Akonadi2::Storage::Error &error) { + qWarning() << "Error during query: " << error.message; + }); + + Trace() << "Full scan found " << keys.size() << " results"; + return ResultSet(keys); +} + + + +template +GenericFacade::GenericFacade(const QByteArray &resourceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &adaptorFactory , const QSharedPointer resourceAccess) + : Akonadi2::StoreFacade(), + mResourceAccess(resourceAccess), + mDomainTypeAdaptorFactory(adaptorFactory), + mResourceInstanceIdentifier(resourceIdentifier) +{ + if (!mResourceAccess) { + mResourceAccess = QSharedPointer::create(resourceIdentifier); + } +} + +template +GenericFacade::~GenericFacade() +{ +} + +template +QByteArray GenericFacade::bufferTypeForDomainType() +{ + //We happen to have a one to one mapping + return Akonadi2::ApplicationDomain::getTypeName(); +} + +template +KAsync::Job GenericFacade::create(const DomainType &domainObject) +{ + if (!mDomainTypeAdaptorFactory) { + Warning() << "No domain type adaptor factory available"; + return KAsync::error(); + } + flatbuffers::FlatBufferBuilder entityFbb; + mDomainTypeAdaptorFactory->createBuffer(domainObject, entityFbb); + return mResourceAccess->sendCreateCommand(bufferTypeForDomainType(), QByteArray::fromRawData(reinterpret_cast(entityFbb.GetBufferPointer()), entityFbb.GetSize())); +} + +template +KAsync::Job GenericFacade::modify(const DomainType &domainObject) +{ + if (!mDomainTypeAdaptorFactory) { + Warning() << "No domain type adaptor factory available"; + return KAsync::error(); + } + flatbuffers::FlatBufferBuilder entityFbb; + mDomainTypeAdaptorFactory->createBuffer(domainObject, entityFbb); + return mResourceAccess->sendModifyCommand(domainObject.identifier(), domainObject.revision(), bufferTypeForDomainType(), QByteArrayList(), QByteArray::fromRawData(reinterpret_cast(entityFbb.GetBufferPointer()), entityFbb.GetSize())); +} + +template +KAsync::Job GenericFacade::remove(const DomainType &domainObject) +{ + return mResourceAccess->sendDeleteCommand(domainObject.identifier(), domainObject.revision(), bufferTypeForDomainType()); +} + +template +KAsync::Job GenericFacade::load(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface &resultProvider) +{ + //We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load. + resultProvider.setFetcher([this, query, &resultProvider](const typename DomainType::Ptr &parent) { + const qint64 newRevision = executeInitialQuery(query, parent, resultProvider); + mResourceAccess->sendRevisionReplayedCommand(newRevision); + }); + + + //In case of a live query we keep the runner for as long alive as the result provider exists + if (query.liveQuery) { + auto runner = QSharedPointer::create(query); + //Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting + runner->setQuery([this, query, &resultProvider] () -> KAsync::Job { + return KAsync::start([this, query, &resultProvider](KAsync::Future &future) { + const qint64 newRevision = executeIncrementalQuery(query, resultProvider); + mResourceAccess->sendRevisionReplayedCommand(newRevision); + future.setFinished(); + }); + }); + resultProvider.setQueryRunner(runner); + //Ensure the connection is open, if it wasn't already opened + //TODO If we are not connected already, we have to check for the latest revision once connected, otherwise we could miss some updates + mResourceAccess->open(); + QObject::connect(mResourceAccess.data(), &Akonadi2::ResourceAccess::revisionChanged, runner.data(), &QueryRunner::revisionChanged); + } + return KAsync::null(); +} + + //TODO move into result provider? +template +void GenericFacade::replaySet(ResultSet &resultSet, Akonadi2::ResultProviderInterface &resultProvider) +{ + while (resultSet.next([&resultProvider](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value, Akonadi2::Operation operation) -> bool { + switch (operation) { + case Akonadi2::Operation_Creation: + Trace() << "Got creation"; + resultProvider.add(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); + break; + case Akonadi2::Operation_Modification: + Trace() << "Got modification"; + resultProvider.modify(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); + break; + case Akonadi2::Operation_Removal: + Trace() << "Got removal"; + resultProvider.remove(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); + break; + } + return true; + })){}; +} + +template +void GenericFacade::readEntity(const Akonadi2::Storage::Transaction &transaction, const QByteArray &key, const std::function &resultCallback) +{ + const auto bufferType = bufferTypeForDomainType(); + //This only works for a 1:1 mapping of resource to domain types. + //Not i.e. for tags that are stored as flags in each entity of an imap store. + //additional properties that don't have a 1:1 mapping (such as separately stored tags), + //could be added to the adaptor. + // + // Akonadi2::Storage::getLatest(transaction, bufferTye, key); + transaction.openDatabase(bufferType + ".main").findLatest(key, [=](const QByteArray &key, const QByteArray &value) -> bool { + Akonadi2::EntityBuffer buffer(value.data(), value.size()); + const Akonadi2::Entity &entity = buffer.entity(); + const auto metadataBuffer = Akonadi2::EntityBuffer::readBuffer(entity.metadata()); + Q_ASSERT(metadataBuffer); + const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; + resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Akonadi2::Storage::uidFromKey(key), revision, mDomainTypeAdaptorFactory->createAdaptor(entity)), metadataBuffer->operation()); + return false; + }, + [](const Akonadi2::Storage::Error &error) { + qWarning() << "Error during query: " << error.message; + }); +} + +template +ResultSet GenericFacade::loadInitialResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) +{ + QSet appliedFilters; + auto resultSet = Akonadi2::ApplicationDomain::TypeImplementation::queryIndexes(query, mResourceInstanceIdentifier, appliedFilters, transaction); + remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters; + + //We do a full scan if there were no indexes available to create the initial set. + if (appliedFilters.isEmpty()) { + //TODO this should be replaced by an index lookup as well + resultSet = fullScan(transaction, bufferTypeForDomainType()); + } + return resultSet; +} + +template +ResultSet GenericFacade::loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) +{ + const auto bufferType = bufferTypeForDomainType(); + auto revisionCounter = QSharedPointer::create(baseRevision); + remainingFilters = query.propertyFilter.keys().toSet(); + return ResultSet([bufferType, revisionCounter, &transaction]() -> QByteArray { + const qint64 topRevision = Akonadi2::Storage::maxRevision(transaction); + //Spit out the revision keys one by one. + while (*revisionCounter <= topRevision) { + const auto uid = Akonadi2::Storage::getUidFromRevision(transaction, *revisionCounter); + const auto type = Akonadi2::Storage::getTypeFromRevision(transaction, *revisionCounter); + Trace() << "Revision" << *revisionCounter << type << uid; + if (type != bufferType) { + //Skip revision + *revisionCounter += 1; + continue; + } + const auto key = Akonadi2::Storage::assembleKey(uid, *revisionCounter); + *revisionCounter += 1; + return key; + } + Trace() << "Finished reading incremental result set:" << *revisionCounter; + //We're done + return QByteArray(); + }); +} + +template +ResultSet GenericFacade::filterSet(const ResultSet &resultSet, const std::function &filter, const Akonadi2::Storage::Transaction &transaction, bool initialQuery) +{ + auto resultSetPtr = QSharedPointer::create(resultSet); + + //Read through the source values and return whatever matches the filter + std::function)> generator = [this, resultSetPtr, &transaction, filter, initialQuery](std::function callback) -> bool { + while (resultSetPtr->next()) { + //readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) + readEntity(transaction, resultSetPtr->id(), [this, filter, callback, initialQuery](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Akonadi2::Operation operation) { + //Always remove removals, they probably don't match due to non-available properties + if (filter(domainObject) || operation == Akonadi2::Operation_Removal) { + if (initialQuery) { + //We're not interested in removals during the initial query + if (operation != Akonadi2::Operation_Removal) { + callback(domainObject, Akonadi2::Operation_Creation); + } + } else { + callback(domainObject, operation); + } + } + }); + } + return false; + }; + return ResultSet(generator); +} + + +template +std::function GenericFacade::getFilter(const QSet remainingFilters, const Akonadi2::Query &query) +{ + return [remainingFilters, query](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool { + for (const auto &filterProperty : remainingFilters) { + const auto property = domainObject->getProperty(filterProperty); + if (property.isValid()) { + //TODO implement other comparison operators than equality + if (property != query.propertyFilter.value(filterProperty)) { + Trace() << "Filtering entity due to property mismatch: " << domainObject->getProperty(filterProperty); + return false; + } + } else { + Warning() << "Ignored property filter because value is invalid: " << filterProperty; + } + } + return true; + }; +} + +template +qint64 GenericFacade::load(const Akonadi2::Query &query, const std::function &)> &baseSetRetriever, Akonadi2::ResultProviderInterface &resultProvider) +{ + Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier); + storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) { + Warning() << "Error during query: " << error.store << error.message; + }); + auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly); + + QSet remainingFilters; + auto resultSet = baseSetRetriever(transaction, remainingFilters); + auto filteredSet = filterSet(resultSet, getFilter(remainingFilters, query), transaction, false); + replaySet(filteredSet, resultProvider); + resultProvider.setRevision(Akonadi2::Storage::maxRevision(transaction)); + return Akonadi2::Storage::maxRevision(transaction); +} + + +template +qint64 GenericFacade::executeIncrementalQuery(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface &resultProvider) +{ + const qint64 baseRevision = resultProvider.revision() + 1; + Trace() << "Running incremental query " << baseRevision; + return load(query, [&](Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) -> ResultSet { + return loadIncrementalResultSet(baseRevision, query, transaction, remainingFilters); + }, resultProvider); +} + +template +qint64 GenericFacade::executeInitialQuery(const Akonadi2::Query &query, const typename DomainType::Ptr &parent, Akonadi2::ResultProviderInterface &resultProvider) +{ + auto modifiedQuery = query; + if (parent) { + Trace() << "Running initial query for parent:" << parent->identifier(); + modifiedQuery.propertyFilter.insert("parent", parent->identifier()); + } else { + Trace() << "Running initial query for toplevel"; + modifiedQuery.propertyFilter.insert("parent", QVariant()); + } + return load(modifiedQuery, [&](Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) -> ResultSet { + return loadInitialResultSet(modifiedQuery, transaction, remainingFilters); + }, resultProvider); +} + +template class Akonadi2::GenericFacade; +template class Akonadi2::GenericFacade; +template class Akonadi2::GenericFacade; +// template class Akonadi2::GenericFacade; + +#include "facade.moc" diff --git a/common/facade.h b/common/facade.h index 8b8a2a8..aa50941 100644 --- a/common/facade.h +++ b/common/facade.h @@ -1,5 +1,5 @@ /* - * Copyright (C) 2014 Christian Mollekopf + * Copyright (C) 2014 Christian Mollekopf * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -25,79 +25,8 @@ #include #include "resourceaccess.h" -#include "commands.h" -#include "domainadaptor.h" -#include "log.h" #include "resultset.h" -#include "storage.h" -#include "definitions.h" - -/** - * A QueryRunner runs a query and updates the corresponding result set. - * - * The lifetime of the QueryRunner is defined by the resut set (otherwise it's doing useless work), - * and by how long a result set must be updated. If the query is one off the runner dies after the execution, - * otherwise it lives on the react to changes and updates the corresponding result set. - * - * QueryRunner has to keep ResourceAccess alive in order to keep getting updates. - */ -class QueryRunner : public QObject -{ - Q_OBJECT -public: - typedef std::function()> QueryFunction; - - QueryRunner(const Akonadi2::Query &query) {}; - /** - * Starts query - */ - KAsync::Job run(qint64 newRevision = 0) - { - return queryFunction(); - } - - /** - * Set the query to run - */ - void setQuery(const QueryFunction &query) - { - queryFunction = query; - } - -public slots: - /** - * Rerun query with new revision - */ - void revisionChanged(qint64 newRevision) - { - Trace() << "New revision: " << newRevision; - run().exec(); - } - -private: - QueryFunction queryFunction; -}; - -static inline ResultSet fullScan(const Akonadi2::Storage::Transaction &transaction, const QByteArray &bufferType) -{ - //TODO use a result set with an iterator, to read values on demand - QVector keys; - transaction.openDatabase(bufferType + ".main").scan(QByteArray(), [&](const QByteArray &key, const QByteArray &value) -> bool { - //Skip internals - if (Akonadi2::Storage::isInternalKey(key)) { - return true; - } - keys << Akonadi2::Storage::uidFromKey(key); - return true; - }, - [](const Akonadi2::Storage::Error &error) { - qWarning() << "Error during query: " << error.message; - }); - - Trace() << "Full scan found " << keys.size() << " results"; - return ResultSet(keys); -} - +#include "domainadaptor.h" namespace Akonadi2 { /** @@ -121,257 +50,29 @@ public: * @param resourceIdentifier is the identifier of the resource instance * @param adaptorFactory is the adaptor factory used to generate the mappings from domain to resource types and vice versa */ - GenericFacade(const QByteArray &resourceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &adaptorFactory = DomainTypeAdaptorFactoryInterface::Ptr(), const QSharedPointer resourceAccess = QSharedPointer()) - : Akonadi2::StoreFacade(), - mResourceAccess(resourceAccess), - mDomainTypeAdaptorFactory(adaptorFactory), - mResourceInstanceIdentifier(resourceIdentifier) - { - if (!mResourceAccess) { - mResourceAccess = QSharedPointer::create(resourceIdentifier); - } - } - - ~GenericFacade() - { - } - - static QByteArray bufferTypeForDomainType() - { - //We happen to have a one to one mapping - return Akonadi2::ApplicationDomain::getTypeName(); - } - - KAsync::Job create(const DomainType &domainObject) Q_DECL_OVERRIDE - { - if (!mDomainTypeAdaptorFactory) { - Warning() << "No domain type adaptor factory available"; - return KAsync::error(); - } - flatbuffers::FlatBufferBuilder entityFbb; - mDomainTypeAdaptorFactory->createBuffer(domainObject, entityFbb); - return mResourceAccess->sendCreateCommand(bufferTypeForDomainType(), QByteArray::fromRawData(reinterpret_cast(entityFbb.GetBufferPointer()), entityFbb.GetSize())); - } - - KAsync::Job modify(const DomainType &domainObject) Q_DECL_OVERRIDE - { - if (!mDomainTypeAdaptorFactory) { - Warning() << "No domain type adaptor factory available"; - return KAsync::error(); - } - flatbuffers::FlatBufferBuilder entityFbb; - mDomainTypeAdaptorFactory->createBuffer(domainObject, entityFbb); - return mResourceAccess->sendModifyCommand(domainObject.identifier(), domainObject.revision(), bufferTypeForDomainType(), QByteArrayList(), QByteArray::fromRawData(reinterpret_cast(entityFbb.GetBufferPointer()), entityFbb.GetSize())); - } - - KAsync::Job remove(const DomainType &domainObject) Q_DECL_OVERRIDE - { - return mResourceAccess->sendDeleteCommand(domainObject.identifier(), domainObject.revision(), bufferTypeForDomainType()); - } - - KAsync::Job load(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface &resultProvider) Q_DECL_OVERRIDE - { - //We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load. - resultProvider.setFetcher([this, query, &resultProvider](const typename DomainType::Ptr &parent) { - const qint64 newRevision = executeInitialQuery(query, parent, resultProvider); - mResourceAccess->sendRevisionReplayedCommand(newRevision); - }); - + GenericFacade(const QByteArray &resourceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &adaptorFactory = DomainTypeAdaptorFactoryInterface::Ptr(), const QSharedPointer resourceAccess = QSharedPointer()); + ~GenericFacade(); - //In case of a live query we keep the runner for as long alive as the result provider exists - if (query.liveQuery) { - auto runner = QSharedPointer::create(query); - //Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting - runner->setQuery([this, query, &resultProvider] () -> KAsync::Job { - return KAsync::start([this, query, &resultProvider](KAsync::Future &future) { - const qint64 newRevision = executeIncrementalQuery(query, resultProvider); - mResourceAccess->sendRevisionReplayedCommand(newRevision); - future.setFinished(); - }); - }); - resultProvider.setQueryRunner(runner); - //Ensure the connection is open, if it wasn't already opened - //TODO If we are not connected already, we have to check for the latest revision once connected, otherwise we could miss some updates - mResourceAccess->open(); - QObject::connect(mResourceAccess.data(), &Akonadi2::ResourceAccess::revisionChanged, runner.data(), &QueryRunner::revisionChanged); - } - return KAsync::null(); - } + static QByteArray bufferTypeForDomainType(); + KAsync::Job create(const DomainType &domainObject) Q_DECL_OVERRIDE; + KAsync::Job modify(const DomainType &domainObject) Q_DECL_OVERRIDE; + KAsync::Job remove(const DomainType &domainObject) Q_DECL_OVERRIDE; + KAsync::Job load(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface &resultProvider) Q_DECL_OVERRIDE; private: - //TODO move into result provider? - static void replaySet(ResultSet &resultSet, Akonadi2::ResultProviderInterface &resultProvider) - { - while (resultSet.next([&resultProvider](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value, Akonadi2::Operation operation) -> bool { - switch (operation) { - case Akonadi2::Operation_Creation: - Trace() << "Got creation"; - resultProvider.add(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); - break; - case Akonadi2::Operation_Modification: - Trace() << "Got modification"; - resultProvider.modify(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); - break; - case Akonadi2::Operation_Removal: - Trace() << "Got removal"; - resultProvider.remove(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); - break; - } - return true; - })){}; - } - - void readEntity(const Akonadi2::Storage::Transaction &transaction, const QByteArray &key, const std::function &resultCallback) - { - const auto bufferType = bufferTypeForDomainType(); - //This only works for a 1:1 mapping of resource to domain types. - //Not i.e. for tags that are stored as flags in each entity of an imap store. - //additional properties that don't have a 1:1 mapping (such as separately stored tags), - //could be added to the adaptor. - // - // Akonadi2::Storage::getLatest(transaction, bufferTye, key); - transaction.openDatabase(bufferType + ".main").findLatest(key, [=](const QByteArray &key, const QByteArray &value) -> bool { - Akonadi2::EntityBuffer buffer(value.data(), value.size()); - const Akonadi2::Entity &entity = buffer.entity(); - const auto metadataBuffer = Akonadi2::EntityBuffer::readBuffer(entity.metadata()); - Q_ASSERT(metadataBuffer); - const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; - resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Akonadi2::Storage::uidFromKey(key), revision, mDomainTypeAdaptorFactory->createAdaptor(entity)), metadataBuffer->operation()); - return false; - }, - [](const Akonadi2::Storage::Error &error) { - qWarning() << "Error during query: " << error.message; - }); - } - - ResultSet loadInitialResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) - { - QSet appliedFilters; - auto resultSet = Akonadi2::ApplicationDomain::TypeImplementation::queryIndexes(query, mResourceInstanceIdentifier, appliedFilters, transaction); - remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters; - - //We do a full scan if there were no indexes available to create the initial set. - if (appliedFilters.isEmpty()) { - //TODO this should be replaced by an index lookup as well - resultSet = fullScan(transaction, bufferTypeForDomainType()); - } - return resultSet; - } - - ResultSet loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) - { - const auto bufferType = bufferTypeForDomainType(); - auto revisionCounter = QSharedPointer::create(baseRevision); - remainingFilters = query.propertyFilter.keys().toSet(); - return ResultSet([bufferType, revisionCounter, &transaction]() -> QByteArray { - const qint64 topRevision = Akonadi2::Storage::maxRevision(transaction); - //Spit out the revision keys one by one. - while (*revisionCounter <= topRevision) { - const auto uid = Akonadi2::Storage::getUidFromRevision(transaction, *revisionCounter); - const auto type = Akonadi2::Storage::getTypeFromRevision(transaction, *revisionCounter); - Trace() << "Revision" << *revisionCounter << type << uid; - if (type != bufferType) { - //Skip revision - *revisionCounter += 1; - continue; - } - const auto key = Akonadi2::Storage::assembleKey(uid, *revisionCounter); - *revisionCounter += 1; - return key; - } - //We're done - return QByteArray(); - }); - } - - ResultSet filterSet(const ResultSet &resultSet, const std::function &filter, const Akonadi2::Storage::Transaction &transaction, bool initialQuery) - { - auto resultSetPtr = QSharedPointer::create(resultSet); - - //Read through the source values and return whatever matches the filter - std::function)> generator = [this, resultSetPtr, &transaction, filter, initialQuery](std::function callback) -> bool { - while (resultSetPtr->next()) { - //readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) - readEntity(transaction, resultSetPtr->id(), [this, filter, callback, initialQuery](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Akonadi2::Operation operation) { - //Always remove removals, they probably don't match due to non-available properties - if (filter(domainObject) || operation == Akonadi2::Operation_Removal) { - if (initialQuery) { - //We're not interested in removals during the initial query - if (operation != Akonadi2::Operation_Removal) { - callback(domainObject, Akonadi2::Operation_Creation); - } - } else { - callback(domainObject, operation); - } - } - }); - } - return false; - }; - return ResultSet(generator); - } - - - std::function getFilter(const QSet remainingFilters, const Akonadi2::Query &query) - { - return [remainingFilters, query](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool { - for (const auto &filterProperty : remainingFilters) { - const auto property = domainObject->getProperty(filterProperty); - if (property.isValid()) { - //TODO implement other comparison operators than equality - if (property != query.propertyFilter.value(filterProperty)) { - Trace() << "Filtering entity due to property mismatch: " << domainObject->getProperty(filterProperty); - return false; - } - } else { - Warning() << "Ignored property filter because value is invalid: " << filterProperty; - } - } - return true; - }; - } - - qint64 load(const Akonadi2::Query &query, const std::function &)> &baseSetRetriever, Akonadi2::ResultProviderInterface &resultProvider) - { - Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier); - storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) { - Warning() << "Error during query: " << error.store << error.message; - }); - auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly); - - QSet remainingFilters; - auto resultSet = baseSetRetriever(transaction, remainingFilters); - auto filteredSet = filterSet(resultSet, getFilter(remainingFilters, query), transaction, false); - replaySet(filteredSet, resultProvider); - resultProvider.setRevision(Akonadi2::Storage::maxRevision(transaction)); - return Akonadi2::Storage::maxRevision(transaction); - } + static void replaySet(ResultSet &resultSet, Akonadi2::ResultProviderInterface &resultProvider); + void readEntity(const Akonadi2::Storage::Transaction &transaction, const QByteArray &key, const std::function &resultCallback); - qint64 executeIncrementalQuery(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface &resultProvider) - { - const qint64 baseRevision = resultProvider.revision() + 1; - Trace() << "Running incremental query " << baseRevision; - return load(query, [&](Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) -> ResultSet { - return loadIncrementalResultSet(baseRevision, query, transaction, remainingFilters); - }, resultProvider); - } + ResultSet loadInitialResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters); + ResultSet loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters); - qint64 executeInitialQuery(const Akonadi2::Query &query, const typename DomainType::Ptr &parent, Akonadi2::ResultProviderInterface &resultProvider) - { - auto modifiedQuery = query; - if (parent) { - Trace() << "Running initial query for parent:" << parent->identifier(); - modifiedQuery.propertyFilter.insert("parent", parent->identifier()); - } else { - Trace() << "Running initial query for toplevel"; - modifiedQuery.propertyFilter.insert("parent", QVariant()); - } - return load(modifiedQuery, [&](Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) -> ResultSet { - return loadInitialResultSet(modifiedQuery, transaction, remainingFilters); - }, resultProvider); - } + ResultSet filterSet(const ResultSet &resultSet, const std::function &filter, const Akonadi2::Storage::Transaction &transaction, bool initialQuery); + std::function getFilter(const QSet remainingFilters, const Akonadi2::Query &query); + qint64 load(const Akonadi2::Query &query, const std::function &)> &baseSetRetriever, Akonadi2::ResultProviderInterface &resultProvider); + qint64 executeIncrementalQuery(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface &resultProvider); + qint64 executeInitialQuery(const Akonadi2::Query &query, const typename DomainType::Ptr &parent, Akonadi2::ResultProviderInterface &resultProvider); protected: //TODO use one resource access instance per application & per resource -- cgit v1.2.3