From 5b41b26a349967acf2197f9f9228526193fd826e Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Fri, 27 Nov 2015 17:30:04 +0100 Subject: Introduced a QueryRunner object The QueryRunner object lives for the duration of the query (so just for the initial query for non-live queries, and for the lifetime of the result model for live queries). It's supposed to handle all the threading internally and decouple the lifetime of the facade. --- common/CMakeLists.txt | 1 + common/clientapi.cpp | 62 +++------- common/facade.cpp | 282 +------------------------------------------- common/facade.h | 17 +-- common/facadeinterface.h | 29 ++++- common/modelresult.cpp | 22 ++++ common/modelresult.h | 6 +- common/queryrunner.cpp | 292 ++++++++++++++++++++++++++++++++++++++++++++++ common/queryrunner.h | 107 +++++++++++++++++ common/resourceaccess.h | 2 + common/resourcefacade.cpp | 17 ++- common/resourcefacade.h | 3 +- common/resultprovider.h | 150 +++++------------------- common/threadboundary.cpp | 5 +- 14 files changed, 522 insertions(+), 473 deletions(-) create mode 100644 common/queryrunner.cpp create mode 100644 common/queryrunner.h (limited to 'common') diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index 01056d0..be312b9 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt @@ -26,6 +26,7 @@ set(command_SRCS resource.cpp genericresource.cpp resourceaccess.cpp + queryrunner.cpp listener.cpp storage_common.cpp threadboundary.cpp diff --git a/common/clientapi.cpp b/common/clientapi.cpp index 02f8ce6..b24dfa8 100644 --- a/common/clientapi.cpp +++ b/common/clientapi.cpp @@ -34,6 +34,7 @@ #include "definitions.h" #include "resourceconfig.h" #include "facadefactory.h" +#include "modelresult.h" #include "log.h" #define ASYNCINTHREAD @@ -100,38 +101,8 @@ template QSharedPointer > Store::load(Query query) { auto resultSet = QSharedPointer >::create(); - - //Execute the search in a thread. - //We must guarantee that the emitter is returned before the first result is emitted. - //The result provider must be threadsafe. - async::run([query, resultSet](){ - QEventLoop eventLoop; - resultSet->onDone([&eventLoop](){ - eventLoop.quit(); - }); - // Query all resources and aggregate results - KAsync::iterate(getResources(query.resources, ApplicationDomain::getTypeName())) - .template each([query, resultSet](const QByteArray &resource, KAsync::Future &future) { - if (auto facade = FacadeFactory::instance().getFacade(resourceName(resource), resource)) { - facade->load(query, *resultSet).template then([&future](){future.setFinished();}).exec(); - //Keep the facade alive for the lifetime of the resultSet. - resultSet->setFacade(facade); - } else { - //Ignore the error and carry on - future.setFinished(); - } - }).template then([query, resultSet]() { - resultSet->initialResultSetComplete(); - if (!query.liveQuery) { - resultSet->complete(); - } - }).exec(); - - //Keep the thread alive until the result is ready - if (!resultSet->isDone()) { - eventLoop.exec(); - } - }); + qWarning() << "Main thread " << QThread::currentThreadId(); + //FIXME remove return resultSet->emitter(); } @@ -139,28 +110,29 @@ template QSharedPointer Store::loadModel(Query query) { auto model = QSharedPointer >::create(query, query.requestedProperties.toList()); - auto resultProvider = std::make_shared >(model); - //Keep the resultprovider alive for as long as the model lives - model->setProperty("resultProvider", QVariant::fromValue(std::shared_ptr(resultProvider))); + + //* Client defines lifetime of model + //* The model lifetime defines the duration of live-queries + //* The facade needs to life for the duration of any calls being made (assuming we get rid of any internal callbacks + //* The emitter needs to live or the duration of query (respectively, the model) + //* The result provider needs to live for as long as results are provided (until the last thread exits). // Query all resources and aggregate results KAsync::iterate(getResources(query.resources, ApplicationDomain::getTypeName())) - .template each([query, resultProvider](const QByteArray &resource, KAsync::Future &future) { + .template each([query, model](const QByteArray &resource, KAsync::Future &future) { auto facade = FacadeFactory::instance().getFacade(resourceName(resource), resource); if (facade) { - facade->load(query, *resultProvider).template then([&future](){future.setFinished();}).exec(); - //Keep the facade alive for the lifetime of the resultSet. - //FIXME this would have to become a list - resultProvider->setFacade(facade); + Trace() << "Trying to fetch from resource"; + auto result = facade->load(query); + auto emitter = result.second; + //TODO use aggregating emitter instead + model->setEmitter(emitter); + model->fetchMore(QModelIndex()); + result.first.template then([&future](){future.setFinished();}).exec(); } else { //Ignore the error and carry on future.setFinished(); } - }).template then([query, resultProvider]() { - resultProvider->initialResultSetComplete(); - if (!query.liveQuery) { - resultProvider->complete(); - } }).exec(); return model; diff --git a/common/facade.cpp b/common/facade.cpp index 92124fc..1d6b9a7 100644 --- a/common/facade.cpp +++ b/common/facade.cpp @@ -24,76 +24,10 @@ #include "storage.h" #include "definitions.h" #include "domainadaptor.h" +#include "queryrunner.h" using namespace Akonadi2; -/** - * A QueryRunner runs a query and updates the corresponding result set. - * - * The lifetime of the QueryRunner is defined by the resut set (otherwise it's doing useless work), - * and by how long a result set must be updated. If the query is one off the runner dies after the execution, - * otherwise it lives on the react to changes and updates the corresponding result set. - * - * QueryRunner has to keep ResourceAccess alive in order to keep getting updates. - */ -class QueryRunner : public QObject -{ - Q_OBJECT -public: - typedef std::function()> QueryFunction; - - QueryRunner(const Akonadi2::Query &query) {}; - /** - * Starts query - */ - KAsync::Job run(qint64 newRevision = 0) - { - return queryFunction(); - } - - /** - * Set the query to run - */ - void setQuery(const QueryFunction &query) - { - queryFunction = query; - } - -public slots: - /** - * Rerun query with new revision - */ - void revisionChanged(qint64 newRevision) - { - Trace() << "New revision: " << newRevision; - run().exec(); - } - -private: - QueryFunction queryFunction; -}; - -static inline ResultSet fullScan(const Akonadi2::Storage::Transaction &transaction, const QByteArray &bufferType) -{ - //TODO use a result set with an iterator, to read values on demand - QVector keys; - transaction.openDatabase(bufferType + ".main").scan(QByteArray(), [&](const QByteArray &key, const QByteArray &value) -> bool { - //Skip internals - if (Akonadi2::Storage::isInternalKey(key)) { - return true; - } - keys << Akonadi2::Storage::uidFromKey(key); - return true; - }, - [](const Akonadi2::Storage::Error &error) { - qWarning() << "Error during query: " << error.message; - }); - - Trace() << "Full scan found " << keys.size() << " results"; - return ResultSet(keys); -} - - template GenericFacade::GenericFacade(const QByteArray &resourceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &adaptorFactory , const QSharedPointer resourceAccess) @@ -150,220 +84,14 @@ KAsync::Job GenericFacade::remove(const DomainType &domainObje } template -KAsync::Job GenericFacade::load(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface &resultProvider) -{ - //We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load. - resultProvider.setFetcher([this, query, &resultProvider](const typename DomainType::Ptr &parent) { - const qint64 newRevision = executeInitialQuery(query, parent, resultProvider); - mResourceAccess->sendRevisionReplayedCommand(newRevision); - }); - - - //In case of a live query we keep the runner for as long alive as the result provider exists - if (query.liveQuery) { - auto runner = QSharedPointer::create(query); - //Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting - runner->setQuery([this, query, &resultProvider] () -> KAsync::Job { - return KAsync::start([this, query, &resultProvider](KAsync::Future &future) { - const qint64 newRevision = executeIncrementalQuery(query, resultProvider); - mResourceAccess->sendRevisionReplayedCommand(newRevision); - future.setFinished(); - }); - }); - resultProvider.setQueryRunner(runner); - //Ensure the connection is open, if it wasn't already opened - //TODO If we are not connected already, we have to check for the latest revision once connected, otherwise we could miss some updates - mResourceAccess->open(); - QObject::connect(mResourceAccess.data(), &Akonadi2::ResourceAccess::revisionChanged, runner.data(), &QueryRunner::revisionChanged); - } - return KAsync::null(); -} - - //TODO move into result provider? -template -void GenericFacade::replaySet(ResultSet &resultSet, Akonadi2::ResultProviderInterface &resultProvider) -{ - while (resultSet.next([&resultProvider](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value, Akonadi2::Operation operation) -> bool { - switch (operation) { - case Akonadi2::Operation_Creation: - // Trace() << "Got creation"; - resultProvider.add(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); - break; - case Akonadi2::Operation_Modification: - // Trace() << "Got modification"; - resultProvider.modify(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); - break; - case Akonadi2::Operation_Removal: - // Trace() << "Got removal"; - resultProvider.remove(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); - break; - } - return true; - })){}; -} - -template -void GenericFacade::readEntity(const Akonadi2::Storage::NamedDatabase &db, const QByteArray &key, const std::function &resultCallback) -{ - //This only works for a 1:1 mapping of resource to domain types. - //Not i.e. for tags that are stored as flags in each entity of an imap store. - //additional properties that don't have a 1:1 mapping (such as separately stored tags), - //could be added to the adaptor. - // - // Akonadi2::Storage::getLatest(transaction, bufferTye, key); - db.findLatest(key, [=](const QByteArray &key, const QByteArray &value) -> bool { - Akonadi2::EntityBuffer buffer(value.data(), value.size()); - const Akonadi2::Entity &entity = buffer.entity(); - const auto metadataBuffer = Akonadi2::EntityBuffer::readBuffer(entity.metadata()); - Q_ASSERT(metadataBuffer); - const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; - resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Akonadi2::Storage::uidFromKey(key), revision, mDomainTypeAdaptorFactory->createAdaptor(entity)), metadataBuffer->operation()); - return false; - }, - [](const Akonadi2::Storage::Error &error) { - qWarning() << "Error during query: " << error.message; - }); -} - -template -ResultSet GenericFacade::loadInitialResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) +QPair, typename ResultEmitter::Ptr> GenericFacade::load(const Akonadi2::Query &query) { - QSet appliedFilters; - auto resultSet = Akonadi2::ApplicationDomain::TypeImplementation::queryIndexes(query, mResourceInstanceIdentifier, appliedFilters, transaction); - remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters; - - //We do a full scan if there were no indexes available to create the initial set. - if (appliedFilters.isEmpty()) { - //TODO this should be replaced by an index lookup as well - resultSet = fullScan(transaction, bufferTypeForDomainType()); - } - return resultSet; -} - -template -ResultSet GenericFacade::loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) -{ - const auto bufferType = bufferTypeForDomainType(); - auto revisionCounter = QSharedPointer::create(baseRevision); - remainingFilters = query.propertyFilter.keys().toSet(); - return ResultSet([bufferType, revisionCounter, &transaction]() -> QByteArray { - const qint64 topRevision = Akonadi2::Storage::maxRevision(transaction); - //Spit out the revision keys one by one. - while (*revisionCounter <= topRevision) { - const auto uid = Akonadi2::Storage::getUidFromRevision(transaction, *revisionCounter); - const auto type = Akonadi2::Storage::getTypeFromRevision(transaction, *revisionCounter); - Trace() << "Revision" << *revisionCounter << type << uid; - if (type != bufferType) { - //Skip revision - *revisionCounter += 1; - continue; - } - const auto key = Akonadi2::Storage::assembleKey(uid, *revisionCounter); - *revisionCounter += 1; - return key; - } - Trace() << "Finished reading incremental result set:" << *revisionCounter; - //We're done - return QByteArray(); - }); -} - -template -ResultSet GenericFacade::filterSet(const ResultSet &resultSet, const std::function &filter, const Akonadi2::Storage::NamedDatabase &db, bool initialQuery) -{ - auto resultSetPtr = QSharedPointer::create(resultSet); - - //Read through the source values and return whatever matches the filter - std::function)> generator = [this, resultSetPtr, &db, filter, initialQuery](std::function callback) -> bool { - while (resultSetPtr->next()) { - //readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) - readEntity(db, resultSetPtr->id(), [this, filter, callback, initialQuery](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Akonadi2::Operation operation) { - //Always remove removals, they probably don't match due to non-available properties - if (filter(domainObject) || operation == Akonadi2::Operation_Removal) { - if (initialQuery) { - //We're not interested in removals during the initial query - if (operation != Akonadi2::Operation_Removal) { - callback(domainObject, Akonadi2::Operation_Creation); - } - } else { - callback(domainObject, operation); - } - } - }); - } - return false; - }; - return ResultSet(generator); -} - - -template -std::function GenericFacade::getFilter(const QSet remainingFilters, const Akonadi2::Query &query) -{ - return [remainingFilters, query](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool { - for (const auto &filterProperty : remainingFilters) { - const auto property = domainObject->getProperty(filterProperty); - if (property.isValid()) { - //TODO implement other comparison operators than equality - if (property != query.propertyFilter.value(filterProperty)) { - Trace() << "Filtering entity due to property mismatch: " << domainObject->getProperty(filterProperty); - return false; - } - } else { - Warning() << "Ignored property filter because value is invalid: " << filterProperty; - } - } - return true; - }; -} - -template -qint64 GenericFacade::load(const Akonadi2::Query &query, const std::function &)> &baseSetRetriever, Akonadi2::ResultProviderInterface &resultProvider, bool initialQuery) -{ - Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier); - storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) { - Warning() << "Error during query: " << error.store << error.message; - }); - auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly); - auto db = transaction.openDatabase(bufferTypeForDomainType() + ".main"); - - QSet remainingFilters; - auto resultSet = baseSetRetriever(transaction, remainingFilters); - auto filteredSet = filterSet(resultSet, getFilter(remainingFilters, query), db, initialQuery); - replaySet(filteredSet, resultProvider); - resultProvider.setRevision(Akonadi2::Storage::maxRevision(transaction)); - return Akonadi2::Storage::maxRevision(transaction); + //The runner lives for the lifetime of the query + auto runner = new QueryRunner(query, mResourceAccess, mResourceInstanceIdentifier, mDomainTypeAdaptorFactory, bufferTypeForDomainType()); + return qMakePair(KAsync::null(), runner->emitter()); } -template -qint64 GenericFacade::executeIncrementalQuery(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface &resultProvider) -{ - const qint64 baseRevision = resultProvider.revision() + 1; - Trace() << "Running incremental query " << baseRevision; - return load(query, [&](Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) -> ResultSet { - return loadIncrementalResultSet(baseRevision, query, transaction, remainingFilters); - }, resultProvider, false); -} - -template -qint64 GenericFacade::executeInitialQuery(const Akonadi2::Query &query, const typename DomainType::Ptr &parent, Akonadi2::ResultProviderInterface &resultProvider) -{ - auto modifiedQuery = query; - if (!query.parentProperty.isEmpty()) { - if (parent) { - Trace() << "Running initial query for parent:" << parent->identifier(); - modifiedQuery.propertyFilter.insert(query.parentProperty, parent->identifier()); - } else { - Trace() << "Running initial query for toplevel"; - modifiedQuery.propertyFilter.insert(query.parentProperty, QVariant()); - } - } - return load(modifiedQuery, [&](Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) -> ResultSet { - return loadInitialResultSet(modifiedQuery, transaction, remainingFilters); - }, resultProvider, true); -} - template class Akonadi2::GenericFacade; template class Akonadi2::GenericFacade; template class Akonadi2::GenericFacade; diff --git a/common/facade.h b/common/facade.h index d8b878b..de67e05 100644 --- a/common/facade.h +++ b/common/facade.h @@ -59,22 +59,7 @@ public: KAsync::Job create(const DomainType &domainObject) Q_DECL_OVERRIDE; KAsync::Job modify(const DomainType &domainObject) Q_DECL_OVERRIDE; KAsync::Job remove(const DomainType &domainObject) Q_DECL_OVERRIDE; - KAsync::Job load(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface &resultProvider) Q_DECL_OVERRIDE; - -private: - //TODO move into result provider? - static void replaySet(ResultSet &resultSet, Akonadi2::ResultProviderInterface &resultProvider); - - void readEntity(const Akonadi2::Storage::NamedDatabase &db, const QByteArray &key, const std::function &resultCallback); - - ResultSet loadInitialResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters); - ResultSet loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters); - - ResultSet filterSet(const ResultSet &resultSet, const std::function &filter, const Akonadi2::Storage::NamedDatabase &db, bool initialQuery); - std::function getFilter(const QSet remainingFilters, const Akonadi2::Query &query); - qint64 load(const Akonadi2::Query &query, const std::function &)> &baseSetRetriever, Akonadi2::ResultProviderInterface &resultProvider, bool initialQuery); - qint64 executeIncrementalQuery(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface &resultProvider); - qint64 executeInitialQuery(const Akonadi2::Query &query, const typename DomainType::Ptr &parent, Akonadi2::ResultProviderInterface &resultProvider); + QPair, typename ResultEmitter::Ptr> load(const Akonadi2::Query &query) Q_DECL_OVERRIDE; protected: //TODO use one resource access instance per application & per resource diff --git a/common/facadeinterface.h b/common/facadeinterface.h index 7ec21bc..318abf3 100644 --- a/common/facadeinterface.h +++ b/common/facadeinterface.h @@ -23,6 +23,7 @@ #include #include #include +#include #include "applicationdomaintype.h" #include "resultprovider.h" @@ -42,10 +43,32 @@ class StoreFacade { public: virtual ~StoreFacade(){}; QByteArray type() const { return ApplicationDomain::getTypeName(); } + + /** + * Create an entity in the store. + * + * The job returns succefully once the task has been successfully placed in the queue + */ virtual KAsync::Job create(const DomainType &domainObject) = 0; + + /** + * Modify an entity in the store. + * + * The job returns succefully once the task has been successfully placed in the queue + */ virtual KAsync::Job modify(const DomainType &domainObject) = 0; + + /** + * Remove an entity from the store. + * + * The job returns succefully once the task has been successfully placed in the queue + */ virtual KAsync::Job remove(const DomainType &domainObject) = 0; - virtual KAsync::Job load(const Query &query, Akonadi2::ResultProviderInterface &resultProvider) = 0; + + /** + * Load entities from the store. + */ + virtual QPair, typename Akonadi2::ResultEmitter::Ptr > load(const Query &query) = 0; }; template @@ -67,9 +90,9 @@ public: return KAsync::error(-1, "Failed to create a facade"); } - KAsync::Job load(const Query &query, Akonadi2::ResultProviderInterface &resultProvider) + QPair, typename Akonadi2::ResultEmitter::Ptr > load(const Query &query) { - return KAsync::error(-1, "Failed to create a facade"); + return qMakePair(KAsync::null(), typename Akonadi2::ResultEmitter::Ptr()); } }; diff --git a/common/modelresult.cpp b/common/modelresult.cpp index 935e2e8..65eaba9 100644 --- a/common/modelresult.cpp +++ b/common/modelresult.cpp @@ -182,6 +182,28 @@ void ModelResult::setFetcher(const std::function +void ModelResult::setEmitter(const typename Akonadi2::ResultEmitter::Ptr &emitter) +{ + setFetcher(emitter->mFetcher); + emitter->onAdded([this](const Ptr &value) { + this->add(value); + }); + emitter->onModified([this](const Ptr &value) { + this->modify(value); + }); + emitter->onRemoved([this](const Ptr &value) { + this->remove(value); + }); + emitter->onInitialResultSetComplete([this]() { + }); + emitter->onComplete([this]() { + }); + emitter->onClear([this]() { + }); + mEmitter = emitter; +} + template void ModelResult::modify(const Ptr &value) { diff --git a/common/modelresult.h b/common/modelresult.h index 66dfce5..eb6c86b 100644 --- a/common/modelresult.h +++ b/common/modelresult.h @@ -23,20 +23,23 @@ #include #include #include +#include #include #include "query.h" +#include "resultprovider.h" template class ModelResult : public QAbstractItemModel { public: - enum Roles { DomainObjectRole = Qt::UserRole + 1 }; ModelResult(const Akonadi2::Query &query, const QList &propertyColumns); + void setEmitter(const typename Akonadi2::ResultEmitter::Ptr &); + int rowCount(const QModelIndex &parent = QModelIndex()) const; int columnCount(const QModelIndex &parent = QModelIndex()) const; QVariant data(const QModelIndex &index, int role = Qt::DisplayRole) const; @@ -65,5 +68,6 @@ private: QList mPropertyColumns; Akonadi2::Query mQuery; std::function loadEntities; + typename Akonadi2::ResultEmitter::Ptr mEmitter; }; diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp new file mode 100644 index 0000000..4159112 --- /dev/null +++ b/common/queryrunner.cpp @@ -0,0 +1,292 @@ +/* + Copyright (c) 2015 Christian Mollekopf + + This library is free software; you can redistribute it and/or modify it + under the terms of the GNU Library General Public License as published by + the Free Software Foundation; either version 2 of the License, or (at your + option) any later version. + + This library is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public + License for more details. + + You should have received a copy of the GNU Library General Public License + along with this library; see the file COPYING.LIB. If not, write to the + Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + 02110-1301, USA. +*/ +#include "queryrunner.h" + +#include +#include +#include +#include "commands.h" +#include "log.h" +#include "storage.h" +#include "definitions.h" +#include "domainadaptor.h" + +using namespace Akonadi2; + +static inline ResultSet fullScan(const Akonadi2::Storage::Transaction &transaction, const QByteArray &bufferType) +{ + //TODO use a result set with an iterator, to read values on demand + QVector keys; + transaction.openDatabase(bufferType + ".main").scan(QByteArray(), [&](const QByteArray &key, const QByteArray &value) -> bool { + //Skip internals + if (Akonadi2::Storage::isInternalKey(key)) { + return true; + } + keys << Akonadi2::Storage::uidFromKey(key); + return true; + }, + [](const Akonadi2::Storage::Error &error) { + qWarning() << "Error during query: " << error.message; + }); + + Trace() << "Full scan found " << keys.size() << " results"; + return ResultSet(keys); +} + +template +QueryRunner::QueryRunner(const Akonadi2::Query &query, const Akonadi2::ResourceAccessInterface::Ptr &resourceAccess, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &factory, const QByteArray &bufferType) + : QueryRunnerBase(), + mResourceAccess(resourceAccess), + mResultProvider(new ResultProvider), + mDomainTypeAdaptorFactory(factory), + mQuery(query), + mResourceInstanceIdentifier(instanceIdentifier), + mBufferType(bufferType) +{ + Trace() << "Starting query"; + //We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load. + mResultProvider->setFetcher([this, query](const typename DomainType::Ptr &parent) { + Trace() << "Running fetcher"; + + // auto watcher = new QFutureWatcher; + // QObject::connect(watcher, &QFutureWatcher::finished, [](qint64 newRevision) { + // mResourceAccess->sendRevisionReplayedCommand(newRevision); + // }); + // auto future = QtConcurrent::run([&resultProvider]() -> qint64 { + // const qint64 newRevision = executeInitialQuery(query, parent, resultProvider); + // return newRevision; + // }); + // watcher->setFuture(future); + const qint64 newRevision = executeInitialQuery(query, parent, *mResultProvider); + mResourceAccess->sendRevisionReplayedCommand(newRevision); + }); + + + //In case of a live query we keep the runner for as long alive as the result provider exists + if (query.liveQuery) { + //Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting + setQuery([this, query] () -> KAsync::Job { + return KAsync::start([this, query](KAsync::Future &future) { + //TODO execute in thread + const qint64 newRevision = executeIncrementalQuery(query, *mResultProvider); + mResourceAccess->sendRevisionReplayedCommand(newRevision); + future.setFinished(); + }); + }); + //Ensure the connection is open, if it wasn't already opened + //TODO If we are not connected already, we have to check for the latest revision once connected, otherwise we could miss some updates + mResourceAccess->open(); + QObject::connect(mResourceAccess.data(), &Akonadi2::ResourceAccess::revisionChanged, this, &QueryRunner::revisionChanged); + } +} + +template +typename Akonadi2::ResultEmitter::Ptr QueryRunner::emitter() +{ + return mResultProvider->emitter(); +} + +//TODO move into result provider? +template +void QueryRunner::replaySet(ResultSet &resultSet, Akonadi2::ResultProviderInterface &resultProvider) +{ + // Trace() << "Replay set"; + while (resultSet.next([&resultProvider](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value, Akonadi2::Operation operation) -> bool { + switch (operation) { + case Akonadi2::Operation_Creation: + // Trace() << "Got creation"; + resultProvider.add(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); + break; + case Akonadi2::Operation_Modification: + // Trace() << "Got modification"; + resultProvider.modify(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); + break; + case Akonadi2::Operation_Removal: + // Trace() << "Got removal"; + resultProvider.remove(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); + break; + } + return true; + })){}; +} + +template +void QueryRunner::readEntity(const Akonadi2::Storage::NamedDatabase &db, const QByteArray &key, const std::function &resultCallback) +{ + //This only works for a 1:1 mapping of resource to domain types. + //Not i.e. for tags that are stored as flags in each entity of an imap store. + //additional properties that don't have a 1:1 mapping (such as separately stored tags), + //could be added to the adaptor. + db.findLatest(key, [=](const QByteArray &key, const QByteArray &value) -> bool { + Akonadi2::EntityBuffer buffer(value.data(), value.size()); + const Akonadi2::Entity &entity = buffer.entity(); + const auto metadataBuffer = Akonadi2::EntityBuffer::readBuffer(entity.metadata()); + Q_ASSERT(metadataBuffer); + const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; + resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Akonadi2::Storage::uidFromKey(key), revision, mDomainTypeAdaptorFactory->createAdaptor(entity)), metadataBuffer->operation()); + return false; + }, + [](const Akonadi2::Storage::Error &error) { + qWarning() << "Error during query: " << error.message; + }); +} + +template +ResultSet QueryRunner::loadInitialResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) +{ + QSet appliedFilters; + auto resultSet = Akonadi2::ApplicationDomain::TypeImplementation::queryIndexes(query, mResourceInstanceIdentifier, appliedFilters, transaction); + remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters; + + //We do a full scan if there were no indexes available to create the initial set. + if (appliedFilters.isEmpty()) { + //TODO this should be replaced by an index lookup as well + resultSet = fullScan(transaction, mBufferType); + } + return resultSet; +} + +template +ResultSet QueryRunner::loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) +{ + const auto bufferType = mBufferType; + auto revisionCounter = QSharedPointer::create(baseRevision); + remainingFilters = query.propertyFilter.keys().toSet(); + return ResultSet([bufferType, revisionCounter, &transaction]() -> QByteArray { + const qint64 topRevision = Akonadi2::Storage::maxRevision(transaction); + //Spit out the revision keys one by one. + while (*revisionCounter <= topRevision) { + const auto uid = Akonadi2::Storage::getUidFromRevision(transaction, *revisionCounter); + const auto type = Akonadi2::Storage::getTypeFromRevision(transaction, *revisionCounter); + // Trace() << "Revision" << *revisionCounter << type << uid; + if (type != bufferType) { + //Skip revision + *revisionCounter += 1; + continue; + } + const auto key = Akonadi2::Storage::assembleKey(uid, *revisionCounter); + *revisionCounter += 1; + return key; + } + Trace() << "Finished reading incremental result set:" << *revisionCounter; + //We're done + return QByteArray(); + }); +} + +template +ResultSet QueryRunner::filterSet(const ResultSet &resultSet, const std::function &filter, const Akonadi2::Storage::NamedDatabase &db, bool initialQuery) +{ + auto resultSetPtr = QSharedPointer::create(resultSet); + + //Read through the source values and return whatever matches the filter + std::function)> generator = [this, resultSetPtr, &db, filter, initialQuery](std::function callback) -> bool { + while (resultSetPtr->next()) { + //readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) + readEntity(db, resultSetPtr->id(), [this, filter, callback, initialQuery](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Akonadi2::Operation operation) { + //Always remove removals, they probably don't match due to non-available properties + if (filter(domainObject) || operation == Akonadi2::Operation_Removal) { + Trace() << "entity is not filtered" << initialQuery; + if (initialQuery) { + //We're not interested in removals during the initial query + if (operation != Akonadi2::Operation_Removal) { + callback(domainObject, Akonadi2::Operation_Creation); + } + } else { + callback(domainObject, operation); + } + } + }); + } + return false; + }; + return ResultSet(generator); +} + + +template +std::function QueryRunner::getFilter(const QSet remainingFilters, const Akonadi2::Query &query) +{ + return [remainingFilters, query](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool { + for (const auto &filterProperty : remainingFilters) { + const auto property = domainObject->getProperty(filterProperty); + if (property.isValid()) { + //TODO implement other comparison operators than equality + if (property != query.propertyFilter.value(filterProperty)) { + Trace() << "Filtering entity due to property mismatch: " << domainObject->getProperty(filterProperty); + return false; + } + } else { + Warning() << "Ignored property filter because value is invalid: " << filterProperty; + } + } + return true; + }; +} + +template +qint64 QueryRunner::load(const Akonadi2::Query &query, const std::function &)> &baseSetRetriever, Akonadi2::ResultProviderInterface &resultProvider, bool initialQuery) +{ + Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier); + storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) { + Warning() << "Error during query: " << error.store << error.message; + }); + auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly); + auto db = transaction.openDatabase(mBufferType + ".main"); + + QSet remainingFilters; + auto resultSet = baseSetRetriever(transaction, remainingFilters); + auto filteredSet = filterSet(resultSet, getFilter(remainingFilters, query), db, initialQuery); + replaySet(filteredSet, resultProvider); + resultProvider.setRevision(Akonadi2::Storage::maxRevision(transaction)); + return Akonadi2::Storage::maxRevision(transaction); +} + + +template +qint64 QueryRunner::executeIncrementalQuery(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface &resultProvider) +{ + const qint64 baseRevision = resultProvider.revision() + 1; + Trace() << "Running incremental query " << baseRevision; + return load(query, [&](Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) -> ResultSet { + return loadIncrementalResultSet(baseRevision, query, transaction, remainingFilters); + }, resultProvider, false); +} + +template +qint64 QueryRunner::executeInitialQuery(const Akonadi2::Query &query, const typename DomainType::Ptr &parent, Akonadi2::ResultProviderInterface &resultProvider) +{ + auto modifiedQuery = query; + if (!query.parentProperty.isEmpty()) { + if (parent) { + Trace() << "Running initial query for parent:" << parent->identifier(); + modifiedQuery.propertyFilter.insert(query.parentProperty, parent->identifier()); + } else { + Trace() << "Running initial query for toplevel"; + modifiedQuery.propertyFilter.insert(query.parentProperty, QVariant()); + } + } + return load(modifiedQuery, [&](Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) -> ResultSet { + return loadInitialResultSet(modifiedQuery, transaction, remainingFilters); + }, resultProvider, true); +} + +template class QueryRunner; +template class QueryRunner; +template class QueryRunner; diff --git a/common/queryrunner.h b/common/queryrunner.h new file mode 100644 index 0000000..e2af9de --- /dev/null +++ b/common/queryrunner.h @@ -0,0 +1,107 @@ +/* + Copyright (c) 2015 Christian Mollekopf + + This library is free software; you can redistribute it and/or modify it + under the terms of the GNU Library General Public License as published by + the Free Software Foundation; either version 2 of the License, or (at your + option) any later version. + + This library is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public + License for more details. + + You should have received a copy of the GNU Library General Public License + along with this library; see the file COPYING.LIB. If not, write to the + Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + 02110-1301, USA. +*/ + +#pragma once + +#include +#include "facadeinterface.h" +#include "resourceaccess.h" +#include "resultprovider.h" +#include "domaintypeadaptorfactoryinterface.h" +#include "storage.h" +#include "query.h" + +/** + * A QueryRunner runs a query and updates the corresponding result set. + * + * The lifetime of the QueryRunner is defined by the resut set (otherwise it's doing useless work), + * and by how long a result set must be updated. If the query is one off the runner dies after the execution, + * otherwise it lives on the react to changes and updates the corresponding result set. + * + * QueryRunner has to keep ResourceAccess alive in order to keep getting updates. + */ + +class QueryRunnerBase : public QObject +{ + Q_OBJECT +protected: + typedef std::function()> QueryFunction; + + /** + * Set the query to run + */ + void setQuery(const QueryFunction &query) + { + queryFunction = query; + } + + +protected slots: + /** + * Rerun query with new revision + */ + void revisionChanged(qint64 newRevision) + { + Trace() << "New revision: " << newRevision; + run().exec(); + } + +private: + /** + * Starts query + */ + KAsync::Job run(qint64 newRevision = 0) + { + return queryFunction(); + } + + QueryFunction queryFunction; +}; + +template +class QueryRunner : public QueryRunnerBase +{ +public: + QueryRunner(const Akonadi2::Query &query, const Akonadi2::ResourceAccessInterface::Ptr &, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &, const QByteArray &bufferType); + + typename Akonadi2::ResultEmitter::Ptr emitter(); + +private: + static void replaySet(ResultSet &resultSet, Akonadi2::ResultProviderInterface &resultProvider); + + void readEntity(const Akonadi2::Storage::NamedDatabase &db, const QByteArray &key, const std::function &resultCallback); + + ResultSet loadInitialResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters); + ResultSet loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters); + + ResultSet filterSet(const ResultSet &resultSet, const std::function &filter, const Akonadi2::Storage::NamedDatabase &db, bool initialQuery); + std::function getFilter(const QSet remainingFilters, const Akonadi2::Query &query); + qint64 load(const Akonadi2::Query &query, const std::function &)> &baseSetRetriever, Akonadi2::ResultProviderInterface &resultProvider, bool initialQuery); + qint64 executeIncrementalQuery(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface &resultProvider); + qint64 executeInitialQuery(const Akonadi2::Query &query, const typename DomainType::Ptr &parent, Akonadi2::ResultProviderInterface &resultProvider); + +private: + QSharedPointer > mResultProvider; + QSharedPointer mResourceAccess; + DomainTypeAdaptorFactoryInterface::Ptr mDomainTypeAdaptorFactory; + QByteArray mResourceInstanceIdentifier; + QByteArray mBufferType; + Akonadi2::Query mQuery; +}; + diff --git a/common/resourceaccess.h b/common/resourceaccess.h index 8e27054..e87a1f7 100644 --- a/common/resourceaccess.h +++ b/common/resourceaccess.h @@ -37,6 +37,8 @@ class ResourceAccessInterface : public QObject { Q_OBJECT public: + typedef QSharedPointer Ptr; + ResourceAccessInterface() {} virtual ~ResourceAccessInterface() {} virtual KAsync::Job sendCommand(int commandId) = 0; diff --git a/common/resourcefacade.cpp b/common/resourcefacade.cpp index 1796271..3d207e4 100644 --- a/common/resourcefacade.cpp +++ b/common/resourcefacade.cpp @@ -54,9 +54,15 @@ KAsync::Job ResourceFacade::remove(const Akonadi2::ApplicationDomain::Akon }); } -KAsync::Job ResourceFacade::load(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface &resultProvider) +QPair, typename Akonadi2::ResultEmitter::Ptr > ResourceFacade::load(const Akonadi2::Query &query) { - return KAsync::start([query, &resultProvider]() { + auto resultProvider = new Akonadi2::ResultProvider(); + auto emitter = resultProvider->emitter(); + resultProvider->setFetcher([](const QSharedPointer &) {}); + resultProvider->onDone([resultProvider]() { + delete resultProvider; + }); + auto job = KAsync::start([query, resultProvider]() { const auto configuredResources = ResourceConfig::getResources(); for (const auto &res : configuredResources.keys()) { const auto type = configuredResources.value(res); @@ -64,12 +70,13 @@ KAsync::Job ResourceFacade::load(const Akonadi2::Query &query, Akonadi2::R auto resource = Akonadi2::ApplicationDomain::AkonadiResource::Ptr::create(); resource->setProperty("identifier", res); resource->setProperty("type", type); - resultProvider.add(resource); + resultProvider->add(resource); } } //TODO initialResultSetComplete should be implicit - resultProvider.initialResultSetComplete(); - resultProvider.complete(); + resultProvider->initialResultSetComplete(); + resultProvider->complete(); }); + return qMakePair(job, emitter); } diff --git a/common/resourcefacade.h b/common/resourcefacade.h index 123b481..38e0c0e 100644 --- a/common/resourcefacade.h +++ b/common/resourcefacade.h @@ -37,5 +37,6 @@ public: KAsync::Job create(const Akonadi2::ApplicationDomain::AkonadiResource &resource) Q_DECL_OVERRIDE; KAsync::Job modify(const Akonadi2::ApplicationDomain::AkonadiResource &resource) Q_DECL_OVERRIDE; KAsync::Job remove(const Akonadi2::ApplicationDomain::AkonadiResource &resource) Q_DECL_OVERRIDE; - KAsync::Job load(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface &resultProvider) Q_DECL_OVERRIDE; + QPair, typename Akonadi2::ResultEmitter::Ptr > load(const Akonadi2::Query &query) Q_DECL_OVERRIDE; }; + diff --git a/common/resultprovider.h b/common/resultprovider.h index 921cd6b..86382ef 100644 --- a/common/resultprovider.h +++ b/common/resultprovider.h @@ -20,12 +20,12 @@ #pragma once +#include #include #include #include "threadboundary.h" #include "resultset.h" #include "log.h" -#include "modelresult.h" using namespace async; @@ -53,12 +53,7 @@ public: virtual void initialResultSetComplete() = 0; virtual void complete() = 0; virtual void clear() = 0; - virtual void setFetcher(const std::function &fetcher) - { - } - - virtual void setFacade(const std::shared_ptr &facade) = 0; - virtual void setQueryRunner(const QSharedPointer &runner) = 0; + virtual void setFetcher(const std::function &fetcher) = 0; void setRevision(qint64 revision) { @@ -74,101 +69,6 @@ private: qint64 mRevision; }; -template -class ModelResultProvider : public ResultProviderInterface { -public: - ModelResultProvider(QWeakPointer > model) - : ResultProviderInterface(), - mModel(model) - { - - } - - void add(const Ptr &value) - { - if (auto model = mModel.toStrongRef()) { - model->add(value); - } - } - - void modify(const Ptr &value) - { - if (auto model = mModel.toStrongRef()) { - model->modify(value); - } - } - - void remove(const Ptr &value) - { - if (auto model = mModel.toStrongRef()) { - model->remove(value); - } - } - - void initialResultSetComplete() - { - // mResultEmitter->initialResultSetComplete(); - } - - void complete() - { - // mResultEmitter->complete(); - } - - void clear() - { - // mResultEmitter->clear(); - } - - /** - * For lifetimemanagement only. - * We keep the runner alive as long as the result provider exists. - */ - void setFacade(const std::shared_ptr &facade) - { - mFacade = facade; - } - - void onDone(const std::function &callback) - { - mOnDoneCallback = callback; - } - - bool isDone() const - { - //The existance of the emitter currently defines wether we're done or not. - // return mResultEmitter.toStrongRef().isNull(); - return true; - } - - void setFetcher(const std::function &fetcher) - { - if (auto model = mModel.toStrongRef()) { - model->setFetcher(fetcher); - } - } - - void setQueryRunner(const QSharedPointer &runner) - { - mQueryRunner = runner; - } - -private: - void done() - { - qWarning() << "done"; - if (mOnDoneCallback) { - mOnDoneCallback(); - mOnDoneCallback = std::function(); - } - } - - QWeakPointer > mModel; - QSharedPointer mQueryRunner; - std::shared_ptr mFacade; - std::function mOnDoneCallback; -}; - /* * The promise side for the result emitter */ @@ -204,6 +104,12 @@ private: } public: + typedef QSharedPointer > Ptr; + + ~ResultProvider() + { + } + //Called from worker thread void add(const T &value) { @@ -261,30 +167,16 @@ public: //We have to go over a separate var and return that, otherwise we'd delete the emitter immediately again auto sharedPtr = QSharedPointer >(new ResultEmitter, [this](ResultEmitter *emitter){ mThreadBoundary->callInMainThread([this]() {done();}); delete emitter; }); mResultEmitter = sharedPtr; + sharedPtr->setFetcher([this](const T &parent) { + Q_ASSERT(mFetcher); + mFetcher(parent); + }); return sharedPtr; } return mResultEmitter.toStrongRef(); } - /** - * For lifetimemanagement only. - * We keep the runner alive as long as the result provider exists. - */ - void setQueryRunner(const QSharedPointer &runner) - { - mQueryRunner = runner; - } - - /** - * For lifetimemanagement only. - * We keep the runner alive as long as the result provider exists. - */ - void setFacade(const std::shared_ptr &facade) - { - mFacade = facade; - } - void onDone(const std::function &callback) { mThreadBoundary = QSharedPointer::create(); @@ -299,7 +191,7 @@ public: void setFetcher(const std::function &fetcher) { - fetcher(T()); + mFetcher = fetcher; } private: @@ -307,16 +199,17 @@ private: { qWarning() << "done"; if (mOnDoneCallback) { - mOnDoneCallback(); + auto callback = mOnDoneCallback; mOnDoneCallback = std::function(); + //This may delete this object + callback(); } } QWeakPointer > mResultEmitter; - QSharedPointer mQueryRunner; - std::shared_ptr mFacade; std::function mOnDoneCallback; QSharedPointer mThreadBoundary; + std::function mFetcher; }; /* @@ -334,6 +227,8 @@ private: template class ResultEmitter { public: + typedef QSharedPointer > Ptr; + void onAdded(const std::function &handler) { addHandler = handler; @@ -394,6 +289,13 @@ public: clearHandler(); } + void setFetcher(const std::function &fetcher) + { + mFetcher = fetcher; + } + + std::function mFetcher; + private: friend class ResultProvider; diff --git a/common/threadboundary.cpp b/common/threadboundary.cpp index 47ec508..48fd11a 100644 --- a/common/threadboundary.cpp +++ b/common/threadboundary.cpp @@ -24,6 +24,9 @@ Q_DECLARE_METATYPE(std::function); namespace async { ThreadBoundary::ThreadBoundary(): QObject() { qRegisterMetaType >("std::function"); } -ThreadBoundary:: ~ThreadBoundary() {} +ThreadBoundary:: ~ThreadBoundary() +{ +} + } -- cgit v1.2.3