From ddb28417ccbcd22e771b7610c1727eac63471609 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Thu, 19 Nov 2015 23:47:34 +0100 Subject: Moved facade implementation to cpp file --- common/facade.cpp | 352 +++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 351 insertions(+), 1 deletion(-) (limited to 'common/facade.cpp') diff --git a/common/facade.cpp b/common/facade.cpp index e51b32a..b4931cf 100644 --- a/common/facade.cpp +++ b/common/facade.cpp @@ -1,5 +1,5 @@ /* - * Copyright (C) 2015 Christian Mollekopf + * Copyright (C) 2015 Christian Mollekopf * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -18,3 +18,353 @@ */ #include "facade.h" + +#include "commands.h" +#include "domainadaptor.h" +#include "log.h" +#include "storage.h" +#include "definitions.h" + +using namespace Akonadi2; + +/** + * A QueryRunner runs a query and updates the corresponding result set. + * + * The lifetime of the QueryRunner is defined by the resut set (otherwise it's doing useless work), + * and by how long a result set must be updated. If the query is one off the runner dies after the execution, + * otherwise it lives on the react to changes and updates the corresponding result set. + * + * QueryRunner has to keep ResourceAccess alive in order to keep getting updates. + */ +class QueryRunner : public QObject +{ + Q_OBJECT +public: + typedef std::function()> QueryFunction; + + QueryRunner(const Akonadi2::Query &query) {}; + /** + * Starts query + */ + KAsync::Job run(qint64 newRevision = 0) + { + return queryFunction(); + } + + /** + * Set the query to run + */ + void setQuery(const QueryFunction &query) + { + queryFunction = query; + } + +public slots: + /** + * Rerun query with new revision + */ + void revisionChanged(qint64 newRevision) + { + Trace() << "New revision: " << newRevision; + run().exec(); + } + +private: + QueryFunction queryFunction; +}; + +static inline ResultSet fullScan(const Akonadi2::Storage::Transaction &transaction, const QByteArray &bufferType) +{ + //TODO use a result set with an iterator, to read values on demand + QVector keys; + transaction.openDatabase(bufferType + ".main").scan(QByteArray(), [&](const QByteArray &key, const QByteArray &value) -> bool { + //Skip internals + if (Akonadi2::Storage::isInternalKey(key)) { + return true; + } + keys << Akonadi2::Storage::uidFromKey(key); + return true; + }, + [](const Akonadi2::Storage::Error &error) { + qWarning() << "Error during query: " << error.message; + }); + + Trace() << "Full scan found " << keys.size() << " results"; + return ResultSet(keys); +} + + + +template +GenericFacade::GenericFacade(const QByteArray &resourceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &adaptorFactory , const QSharedPointer resourceAccess) + : Akonadi2::StoreFacade(), + mResourceAccess(resourceAccess), + mDomainTypeAdaptorFactory(adaptorFactory), + mResourceInstanceIdentifier(resourceIdentifier) +{ + if (!mResourceAccess) { + mResourceAccess = QSharedPointer::create(resourceIdentifier); + } +} + +template +GenericFacade::~GenericFacade() +{ +} + +template +QByteArray GenericFacade::bufferTypeForDomainType() +{ + //We happen to have a one to one mapping + return Akonadi2::ApplicationDomain::getTypeName(); +} + +template +KAsync::Job GenericFacade::create(const DomainType &domainObject) +{ + if (!mDomainTypeAdaptorFactory) { + Warning() << "No domain type adaptor factory available"; + return KAsync::error(); + } + flatbuffers::FlatBufferBuilder entityFbb; + mDomainTypeAdaptorFactory->createBuffer(domainObject, entityFbb); + return mResourceAccess->sendCreateCommand(bufferTypeForDomainType(), QByteArray::fromRawData(reinterpret_cast(entityFbb.GetBufferPointer()), entityFbb.GetSize())); +} + +template +KAsync::Job GenericFacade::modify(const DomainType &domainObject) +{ + if (!mDomainTypeAdaptorFactory) { + Warning() << "No domain type adaptor factory available"; + return KAsync::error(); + } + flatbuffers::FlatBufferBuilder entityFbb; + mDomainTypeAdaptorFactory->createBuffer(domainObject, entityFbb); + return mResourceAccess->sendModifyCommand(domainObject.identifier(), domainObject.revision(), bufferTypeForDomainType(), QByteArrayList(), QByteArray::fromRawData(reinterpret_cast(entityFbb.GetBufferPointer()), entityFbb.GetSize())); +} + +template +KAsync::Job GenericFacade::remove(const DomainType &domainObject) +{ + return mResourceAccess->sendDeleteCommand(domainObject.identifier(), domainObject.revision(), bufferTypeForDomainType()); +} + +template +KAsync::Job GenericFacade::load(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface &resultProvider) +{ + //We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load. + resultProvider.setFetcher([this, query, &resultProvider](const typename DomainType::Ptr &parent) { + const qint64 newRevision = executeInitialQuery(query, parent, resultProvider); + mResourceAccess->sendRevisionReplayedCommand(newRevision); + }); + + + //In case of a live query we keep the runner for as long alive as the result provider exists + if (query.liveQuery) { + auto runner = QSharedPointer::create(query); + //Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting + runner->setQuery([this, query, &resultProvider] () -> KAsync::Job { + return KAsync::start([this, query, &resultProvider](KAsync::Future &future) { + const qint64 newRevision = executeIncrementalQuery(query, resultProvider); + mResourceAccess->sendRevisionReplayedCommand(newRevision); + future.setFinished(); + }); + }); + resultProvider.setQueryRunner(runner); + //Ensure the connection is open, if it wasn't already opened + //TODO If we are not connected already, we have to check for the latest revision once connected, otherwise we could miss some updates + mResourceAccess->open(); + QObject::connect(mResourceAccess.data(), &Akonadi2::ResourceAccess::revisionChanged, runner.data(), &QueryRunner::revisionChanged); + } + return KAsync::null(); +} + + //TODO move into result provider? +template +void GenericFacade::replaySet(ResultSet &resultSet, Akonadi2::ResultProviderInterface &resultProvider) +{ + while (resultSet.next([&resultProvider](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value, Akonadi2::Operation operation) -> bool { + switch (operation) { + case Akonadi2::Operation_Creation: + Trace() << "Got creation"; + resultProvider.add(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); + break; + case Akonadi2::Operation_Modification: + Trace() << "Got modification"; + resultProvider.modify(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); + break; + case Akonadi2::Operation_Removal: + Trace() << "Got removal"; + resultProvider.remove(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); + break; + } + return true; + })){}; +} + +template +void GenericFacade::readEntity(const Akonadi2::Storage::Transaction &transaction, const QByteArray &key, const std::function &resultCallback) +{ + const auto bufferType = bufferTypeForDomainType(); + //This only works for a 1:1 mapping of resource to domain types. + //Not i.e. for tags that are stored as flags in each entity of an imap store. + //additional properties that don't have a 1:1 mapping (such as separately stored tags), + //could be added to the adaptor. + // + // Akonadi2::Storage::getLatest(transaction, bufferTye, key); + transaction.openDatabase(bufferType + ".main").findLatest(key, [=](const QByteArray &key, const QByteArray &value) -> bool { + Akonadi2::EntityBuffer buffer(value.data(), value.size()); + const Akonadi2::Entity &entity = buffer.entity(); + const auto metadataBuffer = Akonadi2::EntityBuffer::readBuffer(entity.metadata()); + Q_ASSERT(metadataBuffer); + const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; + resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Akonadi2::Storage::uidFromKey(key), revision, mDomainTypeAdaptorFactory->createAdaptor(entity)), metadataBuffer->operation()); + return false; + }, + [](const Akonadi2::Storage::Error &error) { + qWarning() << "Error during query: " << error.message; + }); +} + +template +ResultSet GenericFacade::loadInitialResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) +{ + QSet appliedFilters; + auto resultSet = Akonadi2::ApplicationDomain::TypeImplementation::queryIndexes(query, mResourceInstanceIdentifier, appliedFilters, transaction); + remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters; + + //We do a full scan if there were no indexes available to create the initial set. + if (appliedFilters.isEmpty()) { + //TODO this should be replaced by an index lookup as well + resultSet = fullScan(transaction, bufferTypeForDomainType()); + } + return resultSet; +} + +template +ResultSet GenericFacade::loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) +{ + const auto bufferType = bufferTypeForDomainType(); + auto revisionCounter = QSharedPointer::create(baseRevision); + remainingFilters = query.propertyFilter.keys().toSet(); + return ResultSet([bufferType, revisionCounter, &transaction]() -> QByteArray { + const qint64 topRevision = Akonadi2::Storage::maxRevision(transaction); + //Spit out the revision keys one by one. + while (*revisionCounter <= topRevision) { + const auto uid = Akonadi2::Storage::getUidFromRevision(transaction, *revisionCounter); + const auto type = Akonadi2::Storage::getTypeFromRevision(transaction, *revisionCounter); + Trace() << "Revision" << *revisionCounter << type << uid; + if (type != bufferType) { + //Skip revision + *revisionCounter += 1; + continue; + } + const auto key = Akonadi2::Storage::assembleKey(uid, *revisionCounter); + *revisionCounter += 1; + return key; + } + Trace() << "Finished reading incremental result set:" << *revisionCounter; + //We're done + return QByteArray(); + }); +} + +template +ResultSet GenericFacade::filterSet(const ResultSet &resultSet, const std::function &filter, const Akonadi2::Storage::Transaction &transaction, bool initialQuery) +{ + auto resultSetPtr = QSharedPointer::create(resultSet); + + //Read through the source values and return whatever matches the filter + std::function)> generator = [this, resultSetPtr, &transaction, filter, initialQuery](std::function callback) -> bool { + while (resultSetPtr->next()) { + //readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) + readEntity(transaction, resultSetPtr->id(), [this, filter, callback, initialQuery](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Akonadi2::Operation operation) { + //Always remove removals, they probably don't match due to non-available properties + if (filter(domainObject) || operation == Akonadi2::Operation_Removal) { + if (initialQuery) { + //We're not interested in removals during the initial query + if (operation != Akonadi2::Operation_Removal) { + callback(domainObject, Akonadi2::Operation_Creation); + } + } else { + callback(domainObject, operation); + } + } + }); + } + return false; + }; + return ResultSet(generator); +} + + +template +std::function GenericFacade::getFilter(const QSet remainingFilters, const Akonadi2::Query &query) +{ + return [remainingFilters, query](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool { + for (const auto &filterProperty : remainingFilters) { + const auto property = domainObject->getProperty(filterProperty); + if (property.isValid()) { + //TODO implement other comparison operators than equality + if (property != query.propertyFilter.value(filterProperty)) { + Trace() << "Filtering entity due to property mismatch: " << domainObject->getProperty(filterProperty); + return false; + } + } else { + Warning() << "Ignored property filter because value is invalid: " << filterProperty; + } + } + return true; + }; +} + +template +qint64 GenericFacade::load(const Akonadi2::Query &query, const std::function &)> &baseSetRetriever, Akonadi2::ResultProviderInterface &resultProvider) +{ + Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier); + storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) { + Warning() << "Error during query: " << error.store << error.message; + }); + auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly); + + QSet remainingFilters; + auto resultSet = baseSetRetriever(transaction, remainingFilters); + auto filteredSet = filterSet(resultSet, getFilter(remainingFilters, query), transaction, false); + replaySet(filteredSet, resultProvider); + resultProvider.setRevision(Akonadi2::Storage::maxRevision(transaction)); + return Akonadi2::Storage::maxRevision(transaction); +} + + +template +qint64 GenericFacade::executeIncrementalQuery(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface &resultProvider) +{ + const qint64 baseRevision = resultProvider.revision() + 1; + Trace() << "Running incremental query " << baseRevision; + return load(query, [&](Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) -> ResultSet { + return loadIncrementalResultSet(baseRevision, query, transaction, remainingFilters); + }, resultProvider); +} + +template +qint64 GenericFacade::executeInitialQuery(const Akonadi2::Query &query, const typename DomainType::Ptr &parent, Akonadi2::ResultProviderInterface &resultProvider) +{ + auto modifiedQuery = query; + if (parent) { + Trace() << "Running initial query for parent:" << parent->identifier(); + modifiedQuery.propertyFilter.insert("parent", parent->identifier()); + } else { + Trace() << "Running initial query for toplevel"; + modifiedQuery.propertyFilter.insert("parent", QVariant()); + } + return load(modifiedQuery, [&](Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) -> ResultSet { + return loadInitialResultSet(modifiedQuery, transaction, remainingFilters); + }, resultProvider); +} + +template class Akonadi2::GenericFacade; +template class Akonadi2::GenericFacade; +template class Akonadi2::GenericFacade; +// template class Akonadi2::GenericFacade; + +#include "facade.moc" -- cgit v1.2.3 From 0b967e06a1a50c1f540b941d381680cdf3ac4706 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Sat, 21 Nov 2015 02:29:35 +0100 Subject: Fixed build --- common/facade.cpp | 1 - 1 file changed, 1 deletion(-) (limited to 'common/facade.cpp') diff --git a/common/facade.cpp b/common/facade.cpp index b4931cf..f534319 100644 --- a/common/facade.cpp +++ b/common/facade.cpp @@ -20,7 +20,6 @@ #include "facade.h" #include "commands.h" -#include "domainadaptor.h" #include "log.h" #include "storage.h" #include "definitions.h" -- cgit v1.2.3 From 110817a23463c71eacbc986af3ae509462758a3c Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Sat, 21 Nov 2015 11:07:47 +0100 Subject: Separated DomainTypeAdaptorFactoryInterface --- common/facade.cpp | 1 + 1 file changed, 1 insertion(+) (limited to 'common/facade.cpp') diff --git a/common/facade.cpp b/common/facade.cpp index f534319..08f7500 100644 --- a/common/facade.cpp +++ b/common/facade.cpp @@ -23,6 +23,7 @@ #include "log.h" #include "storage.h" #include "definitions.h" +#include "domainadaptor.h" using namespace Akonadi2; -- cgit v1.2.3 From 9ad96df6cd1526de32bff2b4f98491dd8318f760 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Tue, 24 Nov 2015 23:00:45 +0100 Subject: Use Query::parentProperty to express tree queries That way we don't have to hardcode the parent property, and we can use the property to express non-tree queries as well. --- common/facade.cpp | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) (limited to 'common/facade.cpp') diff --git a/common/facade.cpp b/common/facade.cpp index 08f7500..59972bf 100644 --- a/common/facade.cpp +++ b/common/facade.cpp @@ -350,12 +350,12 @@ template qint64 GenericFacade::executeInitialQuery(const Akonadi2::Query &query, const typename DomainType::Ptr &parent, Akonadi2::ResultProviderInterface &resultProvider) { auto modifiedQuery = query; - if (parent) { + if (parent && !query.parentProperty.isEmpty()) { Trace() << "Running initial query for parent:" << parent->identifier(); - modifiedQuery.propertyFilter.insert("parent", parent->identifier()); + modifiedQuery.propertyFilter.insert(query.parentProperty, parent->identifier()); } else { Trace() << "Running initial query for toplevel"; - modifiedQuery.propertyFilter.insert("parent", QVariant()); + modifiedQuery.propertyFilter.insert(query.parentProperty, QVariant()); } return load(modifiedQuery, [&](Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) -> ResultSet { return loadInitialResultSet(modifiedQuery, transaction, remainingFilters); @@ -365,6 +365,5 @@ qint64 GenericFacade::executeInitialQuery(const Akonadi2::Query &que template class Akonadi2::GenericFacade; template class Akonadi2::GenericFacade; template class Akonadi2::GenericFacade; -// template class Akonadi2::GenericFacade; #include "facade.moc" -- cgit v1.2.3 From 00e6b843e9f2881faccb312594a0e91c42df0096 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Wed, 25 Nov 2015 09:22:03 +0100 Subject: Less noise --- common/facade.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'common/facade.cpp') diff --git a/common/facade.cpp b/common/facade.cpp index 59972bf..850d28b 100644 --- a/common/facade.cpp +++ b/common/facade.cpp @@ -186,15 +186,15 @@ void GenericFacade::replaySet(ResultSet &resultSet, Akonadi2::Result while (resultSet.next([&resultProvider](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value, Akonadi2::Operation operation) -> bool { switch (operation) { case Akonadi2::Operation_Creation: - Trace() << "Got creation"; + // Trace() << "Got creation"; resultProvider.add(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); break; case Akonadi2::Operation_Modification: - Trace() << "Got modification"; + // Trace() << "Got modification"; resultProvider.modify(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); break; case Akonadi2::Operation_Removal: - Trace() << "Got removal"; + // Trace() << "Got removal"; resultProvider.remove(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); break; } -- cgit v1.2.3 From a4acb7e251cba5ba6d66bf6235736202255c4eac Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Wed, 25 Nov 2015 09:37:59 +0100 Subject: Only use the parent index when it's available --- common/facade.cpp | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) (limited to 'common/facade.cpp') diff --git a/common/facade.cpp b/common/facade.cpp index 850d28b..68770b5 100644 --- a/common/facade.cpp +++ b/common/facade.cpp @@ -350,12 +350,14 @@ template qint64 GenericFacade::executeInitialQuery(const Akonadi2::Query &query, const typename DomainType::Ptr &parent, Akonadi2::ResultProviderInterface &resultProvider) { auto modifiedQuery = query; - if (parent && !query.parentProperty.isEmpty()) { - Trace() << "Running initial query for parent:" << parent->identifier(); - modifiedQuery.propertyFilter.insert(query.parentProperty, parent->identifier()); - } else { - Trace() << "Running initial query for toplevel"; - modifiedQuery.propertyFilter.insert(query.parentProperty, QVariant()); + if (!query.parentProperty.isEmpty()) { + if (parent) { + Trace() << "Running initial query for parent:" << parent->identifier(); + modifiedQuery.propertyFilter.insert(query.parentProperty, parent->identifier()); + } else { + Trace() << "Running initial query for toplevel"; + modifiedQuery.propertyFilter.insert(query.parentProperty, QVariant()); + } } return load(modifiedQuery, [&](Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) -> ResultSet { return loadInitialResultSet(modifiedQuery, transaction, remainingFilters); -- cgit v1.2.3 From 89aa339dd91765d67b4606938e60358f41d33884 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Wed, 25 Nov 2015 15:40:41 +0100 Subject: Fixed modifications. Without this modifications are ignored also in incremental queries. --- common/facade.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'common/facade.cpp') diff --git a/common/facade.cpp b/common/facade.cpp index 68770b5..2806f4d 100644 --- a/common/facade.cpp +++ b/common/facade.cpp @@ -329,7 +329,7 @@ qint64 GenericFacade::load(const Akonadi2::Query &query, const std:: QSet remainingFilters; auto resultSet = baseSetRetriever(transaction, remainingFilters); - auto filteredSet = filterSet(resultSet, getFilter(remainingFilters, query), transaction, false); + auto filteredSet = filterSet(resultSet, getFilter(remainingFilters, query), transaction, initialQuery); replaySet(filteredSet, resultProvider); resultProvider.setRevision(Akonadi2::Storage::maxRevision(transaction)); return Akonadi2::Storage::maxRevision(transaction); @@ -343,7 +343,7 @@ qint64 GenericFacade::executeIncrementalQuery(const Akonadi2::Query Trace() << "Running incremental query " << baseRevision; return load(query, [&](Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) -> ResultSet { return loadIncrementalResultSet(baseRevision, query, transaction, remainingFilters); - }, resultProvider); + }, resultProvider, false); } template @@ -361,7 +361,7 @@ qint64 GenericFacade::executeInitialQuery(const Akonadi2::Query &que } return load(modifiedQuery, [&](Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) -> ResultSet { return loadInitialResultSet(modifiedQuery, transaction, remainingFilters); - }, resultProvider); + }, resultProvider, true); } template class Akonadi2::GenericFacade; -- cgit v1.2.3 From 27164870a7a664daaca4ab6d3e3893a91d4eab5a Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Thu, 26 Nov 2015 14:28:34 +0100 Subject: Avoid repeatedly opening the name db. Although, the benchmarks say it doesn't really have an impact on performance. --- common/facade.cpp | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) (limited to 'common/facade.cpp') diff --git a/common/facade.cpp b/common/facade.cpp index 2806f4d..92124fc 100644 --- a/common/facade.cpp +++ b/common/facade.cpp @@ -203,16 +203,15 @@ void GenericFacade::replaySet(ResultSet &resultSet, Akonadi2::Result } template -void GenericFacade::readEntity(const Akonadi2::Storage::Transaction &transaction, const QByteArray &key, const std::function &resultCallback) +void GenericFacade::readEntity(const Akonadi2::Storage::NamedDatabase &db, const QByteArray &key, const std::function &resultCallback) { - const auto bufferType = bufferTypeForDomainType(); //This only works for a 1:1 mapping of resource to domain types. //Not i.e. for tags that are stored as flags in each entity of an imap store. //additional properties that don't have a 1:1 mapping (such as separately stored tags), //could be added to the adaptor. // // Akonadi2::Storage::getLatest(transaction, bufferTye, key); - transaction.openDatabase(bufferType + ".main").findLatest(key, [=](const QByteArray &key, const QByteArray &value) -> bool { + db.findLatest(key, [=](const QByteArray &key, const QByteArray &value) -> bool { Akonadi2::EntityBuffer buffer(value.data(), value.size()); const Akonadi2::Entity &entity = buffer.entity(); const auto metadataBuffer = Akonadi2::EntityBuffer::readBuffer(entity.metadata()); @@ -270,15 +269,15 @@ ResultSet GenericFacade::loadIncrementalResultSet(qint64 baseRevisio } template -ResultSet GenericFacade::filterSet(const ResultSet &resultSet, const std::function &filter, const Akonadi2::Storage::Transaction &transaction, bool initialQuery) +ResultSet GenericFacade::filterSet(const ResultSet &resultSet, const std::function &filter, const Akonadi2::Storage::NamedDatabase &db, bool initialQuery) { auto resultSetPtr = QSharedPointer::create(resultSet); //Read through the source values and return whatever matches the filter - std::function)> generator = [this, resultSetPtr, &transaction, filter, initialQuery](std::function callback) -> bool { + std::function)> generator = [this, resultSetPtr, &db, filter, initialQuery](std::function callback) -> bool { while (resultSetPtr->next()) { //readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) - readEntity(transaction, resultSetPtr->id(), [this, filter, callback, initialQuery](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Akonadi2::Operation operation) { + readEntity(db, resultSetPtr->id(), [this, filter, callback, initialQuery](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Akonadi2::Operation operation) { //Always remove removals, they probably don't match due to non-available properties if (filter(domainObject) || operation == Akonadi2::Operation_Removal) { if (initialQuery) { @@ -319,17 +318,18 @@ std::function -qint64 GenericFacade::load(const Akonadi2::Query &query, const std::function &)> &baseSetRetriever, Akonadi2::ResultProviderInterface &resultProvider) +qint64 GenericFacade::load(const Akonadi2::Query &query, const std::function &)> &baseSetRetriever, Akonadi2::ResultProviderInterface &resultProvider, bool initialQuery) { Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier); storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) { Warning() << "Error during query: " << error.store << error.message; }); auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly); + auto db = transaction.openDatabase(bufferTypeForDomainType() + ".main"); QSet remainingFilters; auto resultSet = baseSetRetriever(transaction, remainingFilters); - auto filteredSet = filterSet(resultSet, getFilter(remainingFilters, query), transaction, initialQuery); + auto filteredSet = filterSet(resultSet, getFilter(remainingFilters, query), db, initialQuery); replaySet(filteredSet, resultProvider); resultProvider.setRevision(Akonadi2::Storage::maxRevision(transaction)); return Akonadi2::Storage::maxRevision(transaction); -- cgit v1.2.3 From 5b41b26a349967acf2197f9f9228526193fd826e Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Fri, 27 Nov 2015 17:30:04 +0100 Subject: Introduced a QueryRunner object The QueryRunner object lives for the duration of the query (so just for the initial query for non-live queries, and for the lifetime of the result model for live queries). It's supposed to handle all the threading internally and decouple the lifetime of the facade. --- common/facade.cpp | 282 +----------------------------------------------------- 1 file changed, 5 insertions(+), 277 deletions(-) (limited to 'common/facade.cpp') diff --git a/common/facade.cpp b/common/facade.cpp index 92124fc..1d6b9a7 100644 --- a/common/facade.cpp +++ b/common/facade.cpp @@ -24,76 +24,10 @@ #include "storage.h" #include "definitions.h" #include "domainadaptor.h" +#include "queryrunner.h" using namespace Akonadi2; -/** - * A QueryRunner runs a query and updates the corresponding result set. - * - * The lifetime of the QueryRunner is defined by the resut set (otherwise it's doing useless work), - * and by how long a result set must be updated. If the query is one off the runner dies after the execution, - * otherwise it lives on the react to changes and updates the corresponding result set. - * - * QueryRunner has to keep ResourceAccess alive in order to keep getting updates. - */ -class QueryRunner : public QObject -{ - Q_OBJECT -public: - typedef std::function()> QueryFunction; - - QueryRunner(const Akonadi2::Query &query) {}; - /** - * Starts query - */ - KAsync::Job run(qint64 newRevision = 0) - { - return queryFunction(); - } - - /** - * Set the query to run - */ - void setQuery(const QueryFunction &query) - { - queryFunction = query; - } - -public slots: - /** - * Rerun query with new revision - */ - void revisionChanged(qint64 newRevision) - { - Trace() << "New revision: " << newRevision; - run().exec(); - } - -private: - QueryFunction queryFunction; -}; - -static inline ResultSet fullScan(const Akonadi2::Storage::Transaction &transaction, const QByteArray &bufferType) -{ - //TODO use a result set with an iterator, to read values on demand - QVector keys; - transaction.openDatabase(bufferType + ".main").scan(QByteArray(), [&](const QByteArray &key, const QByteArray &value) -> bool { - //Skip internals - if (Akonadi2::Storage::isInternalKey(key)) { - return true; - } - keys << Akonadi2::Storage::uidFromKey(key); - return true; - }, - [](const Akonadi2::Storage::Error &error) { - qWarning() << "Error during query: " << error.message; - }); - - Trace() << "Full scan found " << keys.size() << " results"; - return ResultSet(keys); -} - - template GenericFacade::GenericFacade(const QByteArray &resourceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &adaptorFactory , const QSharedPointer resourceAccess) @@ -150,220 +84,14 @@ KAsync::Job GenericFacade::remove(const DomainType &domainObje } template -KAsync::Job GenericFacade::load(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface &resultProvider) -{ - //We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load. - resultProvider.setFetcher([this, query, &resultProvider](const typename DomainType::Ptr &parent) { - const qint64 newRevision = executeInitialQuery(query, parent, resultProvider); - mResourceAccess->sendRevisionReplayedCommand(newRevision); - }); - - - //In case of a live query we keep the runner for as long alive as the result provider exists - if (query.liveQuery) { - auto runner = QSharedPointer::create(query); - //Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting - runner->setQuery([this, query, &resultProvider] () -> KAsync::Job { - return KAsync::start([this, query, &resultProvider](KAsync::Future &future) { - const qint64 newRevision = executeIncrementalQuery(query, resultProvider); - mResourceAccess->sendRevisionReplayedCommand(newRevision); - future.setFinished(); - }); - }); - resultProvider.setQueryRunner(runner); - //Ensure the connection is open, if it wasn't already opened - //TODO If we are not connected already, we have to check for the latest revision once connected, otherwise we could miss some updates - mResourceAccess->open(); - QObject::connect(mResourceAccess.data(), &Akonadi2::ResourceAccess::revisionChanged, runner.data(), &QueryRunner::revisionChanged); - } - return KAsync::null(); -} - - //TODO move into result provider? -template -void GenericFacade::replaySet(ResultSet &resultSet, Akonadi2::ResultProviderInterface &resultProvider) -{ - while (resultSet.next([&resultProvider](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value, Akonadi2::Operation operation) -> bool { - switch (operation) { - case Akonadi2::Operation_Creation: - // Trace() << "Got creation"; - resultProvider.add(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); - break; - case Akonadi2::Operation_Modification: - // Trace() << "Got modification"; - resultProvider.modify(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); - break; - case Akonadi2::Operation_Removal: - // Trace() << "Got removal"; - resultProvider.remove(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); - break; - } - return true; - })){}; -} - -template -void GenericFacade::readEntity(const Akonadi2::Storage::NamedDatabase &db, const QByteArray &key, const std::function &resultCallback) -{ - //This only works for a 1:1 mapping of resource to domain types. - //Not i.e. for tags that are stored as flags in each entity of an imap store. - //additional properties that don't have a 1:1 mapping (such as separately stored tags), - //could be added to the adaptor. - // - // Akonadi2::Storage::getLatest(transaction, bufferTye, key); - db.findLatest(key, [=](const QByteArray &key, const QByteArray &value) -> bool { - Akonadi2::EntityBuffer buffer(value.data(), value.size()); - const Akonadi2::Entity &entity = buffer.entity(); - const auto metadataBuffer = Akonadi2::EntityBuffer::readBuffer(entity.metadata()); - Q_ASSERT(metadataBuffer); - const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; - resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Akonadi2::Storage::uidFromKey(key), revision, mDomainTypeAdaptorFactory->createAdaptor(entity)), metadataBuffer->operation()); - return false; - }, - [](const Akonadi2::Storage::Error &error) { - qWarning() << "Error during query: " << error.message; - }); -} - -template -ResultSet GenericFacade::loadInitialResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) +QPair, typename ResultEmitter::Ptr> GenericFacade::load(const Akonadi2::Query &query) { - QSet appliedFilters; - auto resultSet = Akonadi2::ApplicationDomain::TypeImplementation::queryIndexes(query, mResourceInstanceIdentifier, appliedFilters, transaction); - remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters; - - //We do a full scan if there were no indexes available to create the initial set. - if (appliedFilters.isEmpty()) { - //TODO this should be replaced by an index lookup as well - resultSet = fullScan(transaction, bufferTypeForDomainType()); - } - return resultSet; -} - -template -ResultSet GenericFacade::loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) -{ - const auto bufferType = bufferTypeForDomainType(); - auto revisionCounter = QSharedPointer::create(baseRevision); - remainingFilters = query.propertyFilter.keys().toSet(); - return ResultSet([bufferType, revisionCounter, &transaction]() -> QByteArray { - const qint64 topRevision = Akonadi2::Storage::maxRevision(transaction); - //Spit out the revision keys one by one. - while (*revisionCounter <= topRevision) { - const auto uid = Akonadi2::Storage::getUidFromRevision(transaction, *revisionCounter); - const auto type = Akonadi2::Storage::getTypeFromRevision(transaction, *revisionCounter); - Trace() << "Revision" << *revisionCounter << type << uid; - if (type != bufferType) { - //Skip revision - *revisionCounter += 1; - continue; - } - const auto key = Akonadi2::Storage::assembleKey(uid, *revisionCounter); - *revisionCounter += 1; - return key; - } - Trace() << "Finished reading incremental result set:" << *revisionCounter; - //We're done - return QByteArray(); - }); -} - -template -ResultSet GenericFacade::filterSet(const ResultSet &resultSet, const std::function &filter, const Akonadi2::Storage::NamedDatabase &db, bool initialQuery) -{ - auto resultSetPtr = QSharedPointer::create(resultSet); - - //Read through the source values and return whatever matches the filter - std::function)> generator = [this, resultSetPtr, &db, filter, initialQuery](std::function callback) -> bool { - while (resultSetPtr->next()) { - //readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) - readEntity(db, resultSetPtr->id(), [this, filter, callback, initialQuery](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Akonadi2::Operation operation) { - //Always remove removals, they probably don't match due to non-available properties - if (filter(domainObject) || operation == Akonadi2::Operation_Removal) { - if (initialQuery) { - //We're not interested in removals during the initial query - if (operation != Akonadi2::Operation_Removal) { - callback(domainObject, Akonadi2::Operation_Creation); - } - } else { - callback(domainObject, operation); - } - } - }); - } - return false; - }; - return ResultSet(generator); -} - - -template -std::function GenericFacade::getFilter(const QSet remainingFilters, const Akonadi2::Query &query) -{ - return [remainingFilters, query](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool { - for (const auto &filterProperty : remainingFilters) { - const auto property = domainObject->getProperty(filterProperty); - if (property.isValid()) { - //TODO implement other comparison operators than equality - if (property != query.propertyFilter.value(filterProperty)) { - Trace() << "Filtering entity due to property mismatch: " << domainObject->getProperty(filterProperty); - return false; - } - } else { - Warning() << "Ignored property filter because value is invalid: " << filterProperty; - } - } - return true; - }; -} - -template -qint64 GenericFacade::load(const Akonadi2::Query &query, const std::function &)> &baseSetRetriever, Akonadi2::ResultProviderInterface &resultProvider, bool initialQuery) -{ - Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier); - storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) { - Warning() << "Error during query: " << error.store << error.message; - }); - auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly); - auto db = transaction.openDatabase(bufferTypeForDomainType() + ".main"); - - QSet remainingFilters; - auto resultSet = baseSetRetriever(transaction, remainingFilters); - auto filteredSet = filterSet(resultSet, getFilter(remainingFilters, query), db, initialQuery); - replaySet(filteredSet, resultProvider); - resultProvider.setRevision(Akonadi2::Storage::maxRevision(transaction)); - return Akonadi2::Storage::maxRevision(transaction); + //The runner lives for the lifetime of the query + auto runner = new QueryRunner(query, mResourceAccess, mResourceInstanceIdentifier, mDomainTypeAdaptorFactory, bufferTypeForDomainType()); + return qMakePair(KAsync::null(), runner->emitter()); } -template -qint64 GenericFacade::executeIncrementalQuery(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface &resultProvider) -{ - const qint64 baseRevision = resultProvider.revision() + 1; - Trace() << "Running incremental query " << baseRevision; - return load(query, [&](Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) -> ResultSet { - return loadIncrementalResultSet(baseRevision, query, transaction, remainingFilters); - }, resultProvider, false); -} - -template -qint64 GenericFacade::executeInitialQuery(const Akonadi2::Query &query, const typename DomainType::Ptr &parent, Akonadi2::ResultProviderInterface &resultProvider) -{ - auto modifiedQuery = query; - if (!query.parentProperty.isEmpty()) { - if (parent) { - Trace() << "Running initial query for parent:" << parent->identifier(); - modifiedQuery.propertyFilter.insert(query.parentProperty, parent->identifier()); - } else { - Trace() << "Running initial query for toplevel"; - modifiedQuery.propertyFilter.insert(query.parentProperty, QVariant()); - } - } - return load(modifiedQuery, [&](Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) -> ResultSet { - return loadInitialResultSet(modifiedQuery, transaction, remainingFilters); - }, resultProvider, true); -} - template class Akonadi2::GenericFacade; template class Akonadi2::GenericFacade; template class Akonadi2::GenericFacade; -- cgit v1.2.3