From 09aafbd1373b5d1152ac7a453a140a7f76c2e90e Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Fri, 13 Nov 2015 19:34:47 +0100 Subject: It's starting to work --- common/facade.h | 263 ++++++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 188 insertions(+), 75 deletions(-) (limited to 'common/facade.h') diff --git a/common/facade.h b/common/facade.h index 643ebec..eb55c98 100644 --- a/common/facade.h +++ b/common/facade.h @@ -135,68 +135,6 @@ public: if (!mResourceAccess) { mResourceAccess = QSharedPointer::create(resourceIdentifier); } - if (!mStorage) { - mStorage = QSharedPointer >::create(resourceIdentifier); - const auto bufferType = bufferTypeForDomainType(); - - mStorage->readEntity = [bufferType, this] (const Akonadi2::Storage::Transaction &transaction, const QByteArray &key, const std::function &resultCallback) - { - //This only works for a 1:1 mapping of resource to domain types. - //Not i.e. for tags that are stored as flags in each entity of an imap store. - //additional properties that don't have a 1:1 mapping (such as separately stored tags), - //could be added to the adaptor. - transaction.openDatabase(bufferType + ".main").findLatest(key, [=](const QByteArray &key, const QByteArray &value) -> bool { - Akonadi2::EntityBuffer buffer(value.data(), value.size()); - const Akonadi2::Entity &entity = buffer.entity(); - const auto metadataBuffer = Akonadi2::EntityBuffer::readBuffer(entity.metadata()); - Q_ASSERT(metadataBuffer); - const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; - resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Akonadi2::Storage::uidFromKey(key), revision, mDomainTypeAdaptorFactory->createAdaptor(entity)), metadataBuffer->operation()); - return false; - }, - [](const Akonadi2::Storage::Error &error) { - qWarning() << "Error during query: " << error.message; - }); - }; - - mStorage->loadInitialResultSet = [bufferType, this] (const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) -> ResultSet - { - QSet appliedFilters; - auto resultSet = Akonadi2::ApplicationDomain::TypeImplementation::queryIndexes(query, mResourceInstanceIdentifier, appliedFilters, transaction); - remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters; - - //We do a full scan if there were no indexes available to create the initial set. - if (appliedFilters.isEmpty()) { - //TODO this should be replaced by an index lookup as well - return fullScan(transaction, bufferType); - } - return resultSet; - }; - - mStorage->loadIncrementalResultSet = [bufferType, this] (qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) -> ResultSet - { - auto revisionCounter = QSharedPointer::create(baseRevision); - return ResultSet([bufferType, revisionCounter, &transaction, this]() -> QByteArray { - const qint64 topRevision = Akonadi2::Storage::maxRevision(transaction); - //Spit out the revision keys one by one. - while (*revisionCounter <= topRevision) { - const auto uid = Akonadi2::Storage::getUidFromRevision(transaction, *revisionCounter); - const auto type = Akonadi2::Storage::getTypeFromRevision(transaction, *revisionCounter); - Trace() << "Revision" << *revisionCounter << type << uid; - if (type != bufferType) { - //Skip revision - *revisionCounter += 1; - continue; - } - const auto key = Akonadi2::Storage::assembleKey(uid, *revisionCounter); - *revisionCounter += 1; - return key; - } - //We're done - return QByteArray(); - }); - }; - } } ~GenericFacade() @@ -237,13 +175,56 @@ public: } //TODO JOBAPI return job from sync continuation to execute it as subjob? - KAsync::Job load(const Akonadi2::Query &query, const QSharedPointer > &resultProvider) Q_DECL_OVERRIDE + KAsync::Job load(const Akonadi2::Query &query, const QSharedPointer > &resultProvider) Q_DECL_OVERRIDE { + { + QSet remainingFilters; + auto filter = [remainingFilters, query](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool { + for (const auto &filterProperty : remainingFilters) { + //TODO implement other comparison operators than equality + if (domainObject->getProperty(filterProperty) != query.propertyFilter.value(filterProperty)) { + return false; + } + } + return true; + }; + + auto fetchEntities = [this, query, resultProvider, filter](const QByteArray &parent) { + Trace() << "Running fetchEntities" << parent; + Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier); + storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) { + Warning() << "Error during query: " << error.store << error.message; + }); + + auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly); + + auto modifiedQuery = query; + modifiedQuery.propertyFilter.insert("parent", parent); + //TODO + QSet appliedFilters; + auto resultSet = Akonadi2::ApplicationDomain::TypeImplementation::queryIndexes(modifiedQuery, mResourceInstanceIdentifier, appliedFilters, transaction); + QSet remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters; + + //We do a full scan if there were no indexes available to create the initial set. + if (appliedFilters.isEmpty()) { + //TODO this should be replaced by an index lookup as well + resultSet = fullScan(transaction, bufferTypeForDomainType()); + } + auto filteredSet = filterSet(resultSet, filter, transaction, true); + replaySet(filteredSet, resultProvider); + resultProvider->setRevision(Akonadi2::Storage::maxRevision(transaction)); + qint64 newRevision = Akonadi2::Storage::maxRevision(transaction); + //TODO send newRevision to resource + // mResourceAccess->sendRevisionReplayedCommand(newRevision); + }; + resultProvider->setFetcher(fetchEntities); + } + auto runner = QSharedPointer::create(query); - QWeakPointer > weakResultProvider = resultProvider; - runner->setQuery([this, weakResultProvider, query] (qint64 oldRevision) -> KAsync::Job { - return KAsync::start([this, weakResultProvider, query, oldRevision](KAsync::Future &future) { - Trace() << "Executing query " << oldRevision; + QWeakPointer > weakResultProvider = resultProvider; + runner->setQuery([this, weakResultProvider, query] () -> KAsync::Job { + return KAsync::start([this, weakResultProvider, query](KAsync::Future &future) { + Trace() << "Executing query "; auto resultProvider = weakResultProvider.toStrongRef(); if (!resultProvider) { Warning() << "Tried executing query after result provider is already gone"; @@ -251,11 +232,10 @@ public: future.setFinished(); return; } - load(query, resultProvider, oldRevision).template then([&future, this](qint64 queriedRevision) { + executeQuery(query, resultProvider).template then([&future, this](qint64 queriedRevision) { //TODO set revision in result provider? //TODO update all existing results with new revision mResourceAccess->sendRevisionReplayedCommand(queriedRevision); - future.setValue(queriedRevision); future.setFinished(); }).exec(); }); @@ -272,9 +252,7 @@ public: //We have to capture the runner to keep it alive return synchronizeResource(query).template then([runner](KAsync::Future &future) { - runner->run().then([&future]() { - future.setFinished(); - }).exec(); + future.setFinished(); }, [](int error, const QString &errorString) { Warning() << "Error during sync " << error << errorString; @@ -293,17 +271,152 @@ private: return KAsync::null(); } - virtual KAsync::Job load(const Akonadi2::Query &query, const QSharedPointer > &resultProvider, qint64 oldRevision) + //TODO move into result provider? + void replaySet(ResultSet &resultSet, const QSharedPointer > &resultProvider) + { + while (resultSet.next([this, resultProvider](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value, Akonadi2::Operation operation) -> bool { + switch (operation) { + case Akonadi2::Operation_Creation: + Trace() << "Got creation"; + //TODO Only copy in result provider + resultProvider->add(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); + // modelResult->add(); + break; + case Akonadi2::Operation_Modification: + Trace() << "Got modification"; + resultProvider->modify(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); + // modelResult->modify(); + break; + case Akonadi2::Operation_Removal: + Trace() << "Got removal"; + resultProvider->remove(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); + // modelResult->remove(); + break; + } + return true; + })){}; + } + + void readEntity(const Akonadi2::Storage::Transaction &transaction, const QByteArray &key, const std::function &resultCallback) + { + const auto bufferType = bufferTypeForDomainType(); + //This only works for a 1:1 mapping of resource to domain types. + //Not i.e. for tags that are stored as flags in each entity of an imap store. + //additional properties that don't have a 1:1 mapping (such as separately stored tags), + //could be added to the adaptor. + // + // Akonadi2::Storage::getLatest(transaction, bufferTye, key); + transaction.openDatabase(bufferType + ".main").findLatest(key, [=](const QByteArray &key, const QByteArray &value) -> bool { + Akonadi2::EntityBuffer buffer(value.data(), value.size()); + const Akonadi2::Entity &entity = buffer.entity(); + const auto metadataBuffer = Akonadi2::EntityBuffer::readBuffer(entity.metadata()); + Q_ASSERT(metadataBuffer); + const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; + resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Akonadi2::Storage::uidFromKey(key), revision, mDomainTypeAdaptorFactory->createAdaptor(entity)), metadataBuffer->operation()); + return false; + }, + [](const Akonadi2::Storage::Error &error) { + qWarning() << "Error during query: " << error.message; + }); + } + + ResultSet loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) + { + + const auto bufferType = bufferTypeForDomainType(); + auto revisionCounter = QSharedPointer::create(baseRevision); + //TODO apply filter from index + return ResultSet([bufferType, revisionCounter, &transaction, this]() -> QByteArray { + const qint64 topRevision = Akonadi2::Storage::maxRevision(transaction); + //Spit out the revision keys one by one. + while (*revisionCounter <= topRevision) { + const auto uid = Akonadi2::Storage::getUidFromRevision(transaction, *revisionCounter); + const auto type = Akonadi2::Storage::getTypeFromRevision(transaction, *revisionCounter); + Trace() << "Revision" << *revisionCounter << type << uid; + if (type != bufferType) { + //Skip revision + *revisionCounter += 1; + continue; + } + const auto key = Akonadi2::Storage::assembleKey(uid, *revisionCounter); + *revisionCounter += 1; + return key; + } + //We're done + return QByteArray(); + }); + } + + ResultSet filterSet(const ResultSet &resultSet, const std::function &filter, const Akonadi2::Storage::Transaction &transaction, bool initialQuery) { + auto resultSetPtr = QSharedPointer::create(resultSet); + + //Read through the source values and return whatever matches the filter + std::function)> generator = [this, resultSetPtr, &transaction, filter, initialQuery](std::function callback) -> bool { + while (resultSetPtr->next()) { + //TODO only necessary if we actually want to filter or neew the operation type (but not a big deal if we do it always I guess) + readEntity(transaction, resultSetPtr->id(), [this, filter, callback, initialQuery](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Akonadi2::Operation operation) { + //Always remove removals, they probably don't match due to non-available properties + if (filter(domainObject) || operation == Akonadi2::Operation_Removal) { + if (initialQuery) { + //We're not interested in removals during the initial query + if (operation != Akonadi2::Operation_Removal) { + callback(domainObject, Akonadi2::Operation_Creation); + } + } else { + callback(domainObject, operation); + } + } + }); + } + return false; + }; + return ResultSet(generator); + } + + virtual KAsync::Job executeQuery(const Akonadi2::Query &query, const QSharedPointer > &resultProvider) + { + /* + * This method gets called initially, and after every revision change. + * * We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load. + * * Incremental updates are loaded directly, leaving it up to the model to discard the changes if they are not interesting + */ + const qint64 baseRevision = resultProvider->revision() + 1; + Trace() << "Running query " << baseRevision; + QSet remainingFilters; + auto filter = [remainingFilters, query, baseRevision](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool { + for (const auto &filterProperty : remainingFilters) { + //TODO implement other comparison operators than equality + if (domainObject->getProperty(filterProperty) != query.propertyFilter.value(filterProperty)) { + return false; + } + } + return true; + }; + qint64 newRevision = 0; + + Trace() << "Fetching updates"; + Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier); + storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) { + Warning() << "Error during query: " << error.store << error.message; + }); + + auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly); + + auto resultSet = loadIncrementalResultSet(baseRevision, query, transaction, remainingFilters); + auto filteredSet = filterSet(resultSet, filter, transaction, false); + replaySet(filteredSet, resultProvider); + resultProvider->setRevision(Akonadi2::Storage::maxRevision(transaction)); + newRevision = Akonadi2::Storage::maxRevision(transaction); + return KAsync::start([=]() -> qint64 { - return mStorage->read(query, oldRevision, resultProvider); + return newRevision; }); } protected: //TODO use one resource access instance per application & per resource QSharedPointer mResourceAccess; - QSharedPointer > mStorage; DomainTypeAdaptorFactoryInterface::Ptr mDomainTypeAdaptorFactory; QByteArray mResourceInstanceIdentifier; }; -- cgit v1.2.3 From 75c231f0758603120ec562af772b48b5f6ac0e24 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Fri, 13 Nov 2015 23:31:41 +0100 Subject: DummyResourceTest and QueryTest are passing sync has been removed from the query code and is now a separate step --- common/facade.h | 147 +++++++++++++++++++++++--------------------------------- 1 file changed, 60 insertions(+), 87 deletions(-) (limited to 'common/facade.h') diff --git a/common/facade.h b/common/facade.h index eb55c98..5be1c73 100644 --- a/common/facade.h +++ b/common/facade.h @@ -44,19 +44,15 @@ class QueryRunner : public QObject { Q_OBJECT public: - typedef std::function(qint64 oldRevision)> QueryFunction; + typedef std::function()> QueryFunction; - QueryRunner(const Akonadi2::Query &query) : mLatestRevision(0) {}; + QueryRunner(const Akonadi2::Query &query) {}; /** * Starts query */ KAsync::Job run(qint64 newRevision = 0) { - //TODO: JOBAPI: that last empty .then should not be necessary - //TODO: remove newRevision - return queryFunction(mLatestRevision + 1).then([this](qint64 revision) { - mLatestRevision = revision; - }).then([](){}); + return queryFunction(); } /** @@ -74,12 +70,11 @@ public slots: void revisionChanged(qint64 newRevision) { Trace() << "New revision: " << newRevision; - run(newRevision).exec(); + run().exec(); } private: QueryFunction queryFunction; - qint64 mLatestRevision; }; static inline ResultSet fullScan(const Akonadi2::Storage::Transaction &transaction, const QByteArray &bufferType) @@ -125,10 +120,9 @@ public: * @param resourceIdentifier is the identifier of the resource instance * @param adaptorFactory is the adaptor factory used to generate the mappings from domain to resource types and vice versa */ - GenericFacade(const QByteArray &resourceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &adaptorFactory = DomainTypeAdaptorFactoryInterface::Ptr(), const QSharedPointer > storage = QSharedPointer >(), const QSharedPointer resourceAccess = QSharedPointer()) + GenericFacade(const QByteArray &resourceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &adaptorFactory = DomainTypeAdaptorFactoryInterface::Ptr(), const QSharedPointer resourceAccess = QSharedPointer()) : Akonadi2::StoreFacade(), mResourceAccess(resourceAccess), - mStorage(storage), mDomainTypeAdaptorFactory(adaptorFactory), mResourceInstanceIdentifier(resourceIdentifier) { @@ -177,48 +171,28 @@ public: //TODO JOBAPI return job from sync continuation to execute it as subjob? KAsync::Job load(const Akonadi2::Query &query, const QSharedPointer > &resultProvider) Q_DECL_OVERRIDE { - { - QSet remainingFilters; - auto filter = [remainingFilters, query](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool { - for (const auto &filterProperty : remainingFilters) { - //TODO implement other comparison operators than equality - if (domainObject->getProperty(filterProperty) != query.propertyFilter.value(filterProperty)) { - return false; - } - } - return true; - }; - - auto fetchEntities = [this, query, resultProvider, filter](const QByteArray &parent) { - Trace() << "Running fetchEntities" << parent; - Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier); - storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) { - Warning() << "Error during query: " << error.store << error.message; - }); + auto fetchEntities = [this, query, resultProvider](const QByteArray &parent) { + Trace() << "Fetching initial set for parent:" << parent; - auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly); + Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier); + storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) { + Warning() << "Error during query: " << error.store << error.message; + }); - auto modifiedQuery = query; - modifiedQuery.propertyFilter.insert("parent", parent); - //TODO - QSet appliedFilters; - auto resultSet = Akonadi2::ApplicationDomain::TypeImplementation::queryIndexes(modifiedQuery, mResourceInstanceIdentifier, appliedFilters, transaction); - QSet remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters; + auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly); - //We do a full scan if there were no indexes available to create the initial set. - if (appliedFilters.isEmpty()) { - //TODO this should be replaced by an index lookup as well - resultSet = fullScan(transaction, bufferTypeForDomainType()); - } - auto filteredSet = filterSet(resultSet, filter, transaction, true); - replaySet(filteredSet, resultProvider); - resultProvider->setRevision(Akonadi2::Storage::maxRevision(transaction)); - qint64 newRevision = Akonadi2::Storage::maxRevision(transaction); - //TODO send newRevision to resource - // mResourceAccess->sendRevisionReplayedCommand(newRevision); - }; - resultProvider->setFetcher(fetchEntities); - } + auto modifiedQuery = query; + modifiedQuery.propertyFilter.insert("parent", parent); + + QSet remainingFilters; + auto resultSet = loadInitialResultSet(parent, modifiedQuery, transaction, remainingFilters); + auto filteredSet = filterSet(resultSet, getFilter(remainingFilters, query), transaction, true); + replaySet(filteredSet, resultProvider); + const qint64 newRevision = Akonadi2::Storage::maxRevision(transaction); + resultProvider->setRevision(newRevision); + mResourceAccess->sendRevisionReplayedCommand(newRevision); + }; + resultProvider->setFetcher(fetchEntities); auto runner = QSharedPointer::create(query); QWeakPointer > weakResultProvider = resultProvider; @@ -233,8 +207,6 @@ public: return; } executeQuery(query, resultProvider).template then([&future, this](qint64 queriedRevision) { - //TODO set revision in result provider? - //TODO update all existing results with new revision mResourceAccess->sendRevisionReplayedCommand(queriedRevision); future.setFinished(); }).exec(); @@ -249,27 +221,12 @@ public: mResourceAccess->open(); QObject::connect(mResourceAccess.data(), &Akonadi2::ResourceAccess::revisionChanged, runner.data(), &QueryRunner::revisionChanged); } + return KAsync::null(); //We have to capture the runner to keep it alive - return synchronizeResource(query).template then([runner](KAsync::Future &future) { - future.setFinished(); - }, - [](int error, const QString &errorString) { - Warning() << "Error during sync " << error << errorString; - }); } private: - KAsync::Job synchronizeResource(const Akonadi2::Query &query) - { - //TODO check if a sync is necessary - //TODO Only sync what was requested - //TODO timeout - if (query.syncOnDemand || query.processAll) { - return mResourceAccess->synchronizeResource(query.syncOnDemand, query.processAll); - } - return KAsync::null(); - } //TODO move into result provider? void replaySet(ResultSet &resultSet, const QSharedPointer > &resultProvider) @@ -280,17 +237,14 @@ private: Trace() << "Got creation"; //TODO Only copy in result provider resultProvider->add(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); - // modelResult->add(); break; case Akonadi2::Operation_Modification: Trace() << "Got modification"; resultProvider->modify(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); - // modelResult->modify(); break; case Akonadi2::Operation_Removal: Trace() << "Got removal"; resultProvider->remove(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); - // modelResult->remove(); break; } return true; @@ -320,13 +274,28 @@ private: }); } - ResultSet loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) + ResultSet loadInitialResultSet(const QByteArray &parent, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) { + Trace() << "Fetching initial set for parent:" << parent; + //TODO + QSet appliedFilters; + auto resultSet = Akonadi2::ApplicationDomain::TypeImplementation::queryIndexes(query, mResourceInstanceIdentifier, appliedFilters, transaction); + remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters; + + //We do a full scan if there were no indexes available to create the initial set. + if (appliedFilters.isEmpty()) { + //TODO this should be replaced by an index lookup as well + resultSet = fullScan(transaction, bufferTypeForDomainType()); + } + return resultSet; + } + ResultSet loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) + { + Trace() << "Loading incremental result set starting from revision: " << baseRevision; const auto bufferType = bufferTypeForDomainType(); auto revisionCounter = QSharedPointer::create(baseRevision); - //TODO apply filter from index - return ResultSet([bufferType, revisionCounter, &transaction, this]() -> QByteArray { + return ResultSet([bufferType, revisionCounter, &transaction]() -> QByteArray { const qint64 topRevision = Akonadi2::Storage::maxRevision(transaction); //Spit out the revision keys one by one. while (*revisionCounter <= topRevision) { @@ -354,7 +323,7 @@ private: //Read through the source values and return whatever matches the filter std::function)> generator = [this, resultSetPtr, &transaction, filter, initialQuery](std::function callback) -> bool { while (resultSetPtr->next()) { - //TODO only necessary if we actually want to filter or neew the operation type (but not a big deal if we do it always I guess) + //readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) readEntity(transaction, resultSetPtr->id(), [this, filter, callback, initialQuery](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Akonadi2::Operation operation) { //Always remove removals, they probably don't match due to non-available properties if (filter(domainObject) || operation == Akonadi2::Operation_Removal) { @@ -374,6 +343,20 @@ private: return ResultSet(generator); } + + std::function getFilter(const QSet remainingFilters, const Akonadi2::Query &query) + { + return [remainingFilters, query](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool { + for (const auto &filterProperty : remainingFilters) { + //TODO implement other comparison operators than equality + if (domainObject->getProperty(filterProperty) != query.propertyFilter.value(filterProperty)) { + return false; + } + } + return true; + }; + } + virtual KAsync::Job executeQuery(const Akonadi2::Query &query, const QSharedPointer > &resultProvider) { /* @@ -384,16 +367,6 @@ private: const qint64 baseRevision = resultProvider->revision() + 1; Trace() << "Running query " << baseRevision; QSet remainingFilters; - auto filter = [remainingFilters, query, baseRevision](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool { - for (const auto &filterProperty : remainingFilters) { - //TODO implement other comparison operators than equality - if (domainObject->getProperty(filterProperty) != query.propertyFilter.value(filterProperty)) { - return false; - } - } - return true; - }; - qint64 newRevision = 0; Trace() << "Fetching updates"; Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier); @@ -404,10 +377,10 @@ private: auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly); auto resultSet = loadIncrementalResultSet(baseRevision, query, transaction, remainingFilters); - auto filteredSet = filterSet(resultSet, filter, transaction, false); + auto filteredSet = filterSet(resultSet, getFilter(remainingFilters, query), transaction, false); replaySet(filteredSet, resultProvider); resultProvider->setRevision(Akonadi2::Storage::maxRevision(transaction)); - newRevision = Akonadi2::Storage::maxRevision(transaction); + qint64 newRevision = Akonadi2::Storage::maxRevision(transaction); return KAsync::start([=]() -> qint64 { return newRevision; -- cgit v1.2.3 From 972f3a4e96876e4c36162a11062e40863d88a2a1 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Sun, 15 Nov 2015 12:46:26 +0100 Subject: Cleanup --- common/facade.h | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'common/facade.h') diff --git a/common/facade.h b/common/facade.h index 5be1c73..6e45e08 100644 --- a/common/facade.h +++ b/common/facade.h @@ -29,7 +29,8 @@ #include "domainadaptor.h" #include "log.h" #include "resultset.h" -#include "entitystorage.h" +#include "storage.h" +#include "definitions.h" /** * A QueryRunner runs a query and updates the corresponding result set. -- cgit v1.2.3 From b68a67fdbe0eb73aaef648ceb686824c7fbc1552 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Tue, 17 Nov 2015 09:43:36 +0100 Subject: Facade cleanup --- common/facade.h | 91 ++++++++++++++++++++++++++------------------------------- 1 file changed, 41 insertions(+), 50 deletions(-) (limited to 'common/facade.h') diff --git a/common/facade.h b/common/facade.h index 6e45e08..dcbe589 100644 --- a/common/facade.h +++ b/common/facade.h @@ -172,45 +172,31 @@ public: //TODO JOBAPI return job from sync continuation to execute it as subjob? KAsync::Job load(const Akonadi2::Query &query, const QSharedPointer > &resultProvider) Q_DECL_OVERRIDE { - auto fetchEntities = [this, query, resultProvider](const QByteArray &parent) { - Trace() << "Fetching initial set for parent:" << parent; - - Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier); - storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) { - Warning() << "Error during query: " << error.store << error.message; - }); - - auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly); - - auto modifiedQuery = query; - modifiedQuery.propertyFilter.insert("parent", parent); + QWeakPointer > weakResultProvider = resultProvider; - QSet remainingFilters; - auto resultSet = loadInitialResultSet(parent, modifiedQuery, transaction, remainingFilters); - auto filteredSet = filterSet(resultSet, getFilter(remainingFilters, query), transaction, true); - replaySet(filteredSet, resultProvider); - const qint64 newRevision = Akonadi2::Storage::maxRevision(transaction); - resultProvider->setRevision(newRevision); - mResourceAccess->sendRevisionReplayedCommand(newRevision); - }; - resultProvider->setFetcher(fetchEntities); + //We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load. + resultProvider->setFetcher([this, query, weakResultProvider](const QByteArray &parent) { + if (auto resultProvider = weakResultProvider.toStrongRef()) { + const qint64 newRevision = executeInitialQuery(query, parent, resultProvider); + mResourceAccess->sendRevisionReplayedCommand(newRevision); + } else { + Warning() << "Tried executing query after result provider is already gone"; + } + }); auto runner = QSharedPointer::create(query); - QWeakPointer > weakResultProvider = resultProvider; + //Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting runner->setQuery([this, weakResultProvider, query] () -> KAsync::Job { return KAsync::start([this, weakResultProvider, query](KAsync::Future &future) { Trace() << "Executing query "; - auto resultProvider = weakResultProvider.toStrongRef(); - if (!resultProvider) { + if (auto resultProvider = weakResultProvider.toStrongRef()) { + const qint64 newRevision = executeIncrementalQuery(query, resultProvider); + mResourceAccess->sendRevisionReplayedCommand(newRevision); + } else { Warning() << "Tried executing query after result provider is already gone"; future.setError(0, QString()); - future.setFinished(); - return; } - executeQuery(query, resultProvider).template then([&future, this](qint64 queriedRevision) { - mResourceAccess->sendRevisionReplayedCommand(queriedRevision); - future.setFinished(); - }).exec(); + future.setFinished(); }); }); @@ -230,13 +216,12 @@ public: private: //TODO move into result provider? - void replaySet(ResultSet &resultSet, const QSharedPointer > &resultProvider) + static void replaySet(ResultSet &resultSet, const QSharedPointer > &resultProvider) { - while (resultSet.next([this, resultProvider](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value, Akonadi2::Operation operation) -> bool { + while (resultSet.next([resultProvider](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value, Akonadi2::Operation operation) -> bool { switch (operation) { case Akonadi2::Operation_Creation: Trace() << "Got creation"; - //TODO Only copy in result provider resultProvider->add(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); break; case Akonadi2::Operation_Modification: @@ -358,34 +343,40 @@ private: }; } - virtual KAsync::Job executeQuery(const Akonadi2::Query &query, const QSharedPointer > &resultProvider) + qint64 load(const Akonadi2::Query &query, const std::function &)> &baseSetRetriever, const QSharedPointer > &resultProvider) { - /* - * This method gets called initially, and after every revision change. - * * We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load. - * * Incremental updates are loaded directly, leaving it up to the model to discard the changes if they are not interesting - */ - const qint64 baseRevision = resultProvider->revision() + 1; - Trace() << "Running query " << baseRevision; - QSet remainingFilters; - - Trace() << "Fetching updates"; Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier); storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) { Warning() << "Error during query: " << error.store << error.message; }); - auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly); - auto resultSet = loadIncrementalResultSet(baseRevision, query, transaction, remainingFilters); + QSet remainingFilters; + auto resultSet = baseSetRetriever(transaction, remainingFilters); auto filteredSet = filterSet(resultSet, getFilter(remainingFilters, query), transaction, false); replaySet(filteredSet, resultProvider); resultProvider->setRevision(Akonadi2::Storage::maxRevision(transaction)); - qint64 newRevision = Akonadi2::Storage::maxRevision(transaction); + return Akonadi2::Storage::maxRevision(transaction); + } - return KAsync::start([=]() -> qint64 { - return newRevision; - }); + + qint64 executeIncrementalQuery(const Akonadi2::Query &query, const QSharedPointer > &resultProvider) + { + const qint64 baseRevision = resultProvider->revision() + 1; + Trace() << "Running incremental query " << baseRevision; + return load(query, [&](Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) -> ResultSet { + return loadIncrementalResultSet(baseRevision, query, transaction, remainingFilters); + }, resultProvider); + } + + qint64 executeInitialQuery(const Akonadi2::Query &query, const QByteArray &parent, const QSharedPointer > &resultProvider) + { + Trace() << "Running initial query for parent:" << parent; + auto modifiedQuery = query; + modifiedQuery.propertyFilter.insert("parent", parent); + return load(modifiedQuery, [&](Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) -> ResultSet { + return loadInitialResultSet(parent, modifiedQuery, transaction, remainingFilters); + }, resultProvider); } protected: -- cgit v1.2.3 From 0f24357d01bd8a278f03793db863d3f71ac37ef2 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Wed, 18 Nov 2015 00:51:55 +0100 Subject: Don't use a smart pointer for the result provider We're not doing any lifetime management anyways. --- common/facade.h | 77 ++++++++++++++++++++++++++------------------------------- 1 file changed, 35 insertions(+), 42 deletions(-) (limited to 'common/facade.h') diff --git a/common/facade.h b/common/facade.h index dcbe589..82fd5ff 100644 --- a/common/facade.h +++ b/common/facade.h @@ -169,68 +169,54 @@ public: return mResourceAccess->sendDeleteCommand(domainObject.identifier(), domainObject.revision(), bufferTypeForDomainType()); } - //TODO JOBAPI return job from sync continuation to execute it as subjob? - KAsync::Job load(const Akonadi2::Query &query, const QSharedPointer > &resultProvider) Q_DECL_OVERRIDE + KAsync::Job load(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface &resultProvider) Q_DECL_OVERRIDE { - QWeakPointer > weakResultProvider = resultProvider; - //We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load. - resultProvider->setFetcher([this, query, weakResultProvider](const QByteArray &parent) { - if (auto resultProvider = weakResultProvider.toStrongRef()) { - const qint64 newRevision = executeInitialQuery(query, parent, resultProvider); - mResourceAccess->sendRevisionReplayedCommand(newRevision); - } else { - Warning() << "Tried executing query after result provider is already gone"; - } + resultProvider.setFetcher([this, query, &resultProvider](const QByteArray &parent) { + const qint64 newRevision = executeInitialQuery(query, parent, resultProvider); + mResourceAccess->sendRevisionReplayedCommand(newRevision); }); - auto runner = QSharedPointer::create(query); - //Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting - runner->setQuery([this, weakResultProvider, query] () -> KAsync::Job { - return KAsync::start([this, weakResultProvider, query](KAsync::Future &future) { - Trace() << "Executing query "; - if (auto resultProvider = weakResultProvider.toStrongRef()) { - const qint64 newRevision = executeIncrementalQuery(query, resultProvider); - mResourceAccess->sendRevisionReplayedCommand(newRevision); - } else { - Warning() << "Tried executing query after result provider is already gone"; - future.setError(0, QString()); - } - future.setFinished(); - }); - }); //In case of a live query we keep the runner for as long alive as the result provider exists if (query.liveQuery) { - resultProvider->setQueryRunner(runner); + auto runner = QSharedPointer::create(query); + //Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting + runner->setQuery([this, query, &resultProvider] () -> KAsync::Job { + return KAsync::start([this, query, &resultProvider](KAsync::Future &future) { + Trace() << "Executing query "; + const qint64 newRevision = executeIncrementalQuery(query, resultProvider); + mResourceAccess->sendRevisionReplayedCommand(newRevision); + future.setFinished(); + }); + }); + resultProvider.setQueryRunner(runner); //Ensure the connection is open, if it wasn't already opened //TODO If we are not connected already, we have to check for the latest revision once connected, otherwise we could miss some updates mResourceAccess->open(); QObject::connect(mResourceAccess.data(), &Akonadi2::ResourceAccess::revisionChanged, runner.data(), &QueryRunner::revisionChanged); } return KAsync::null(); - - //We have to capture the runner to keep it alive } private: //TODO move into result provider? - static void replaySet(ResultSet &resultSet, const QSharedPointer > &resultProvider) + static void replaySet(ResultSet &resultSet, Akonadi2::ResultProviderInterface &resultProvider) { - while (resultSet.next([resultProvider](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value, Akonadi2::Operation operation) -> bool { + while (resultSet.next([&resultProvider](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value, Akonadi2::Operation operation) -> bool { switch (operation) { case Akonadi2::Operation_Creation: Trace() << "Got creation"; - resultProvider->add(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); + resultProvider.add(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); break; case Akonadi2::Operation_Modification: Trace() << "Got modification"; - resultProvider->modify(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); + resultProvider.modify(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); break; case Akonadi2::Operation_Removal: Trace() << "Got removal"; - resultProvider->remove(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); + resultProvider.remove(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); break; } return true; @@ -281,6 +267,7 @@ private: Trace() << "Loading incremental result set starting from revision: " << baseRevision; const auto bufferType = bufferTypeForDomainType(); auto revisionCounter = QSharedPointer::create(baseRevision); + remainingFilters = query.propertyFilter.keys().toSet(); return ResultSet([bufferType, revisionCounter, &transaction]() -> QByteArray { const qint64 topRevision = Akonadi2::Storage::maxRevision(transaction); //Spit out the revision keys one by one. @@ -334,16 +321,22 @@ private: { return [remainingFilters, query](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool { for (const auto &filterProperty : remainingFilters) { - //TODO implement other comparison operators than equality - if (domainObject->getProperty(filterProperty) != query.propertyFilter.value(filterProperty)) { - return false; + const auto property = domainObject->getProperty(filterProperty); + if (property.isValid()) { + //TODO implement other comparison operators than equality + if (property != query.propertyFilter.value(filterProperty)) { + Trace() << "Filtering entity due to property mismatch: " << domainObject->getProperty(filterProperty); + return false; + } + } else { + Warning() << "Ignored property filter because value is invalid: " << filterProperty; } } return true; }; } - qint64 load(const Akonadi2::Query &query, const std::function &)> &baseSetRetriever, const QSharedPointer > &resultProvider) + qint64 load(const Akonadi2::Query &query, const std::function &)> &baseSetRetriever, Akonadi2::ResultProviderInterface &resultProvider) { Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier); storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) { @@ -355,21 +348,21 @@ private: auto resultSet = baseSetRetriever(transaction, remainingFilters); auto filteredSet = filterSet(resultSet, getFilter(remainingFilters, query), transaction, false); replaySet(filteredSet, resultProvider); - resultProvider->setRevision(Akonadi2::Storage::maxRevision(transaction)); + resultProvider.setRevision(Akonadi2::Storage::maxRevision(transaction)); return Akonadi2::Storage::maxRevision(transaction); } - qint64 executeIncrementalQuery(const Akonadi2::Query &query, const QSharedPointer > &resultProvider) + qint64 executeIncrementalQuery(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface &resultProvider) { - const qint64 baseRevision = resultProvider->revision() + 1; + const qint64 baseRevision = resultProvider.revision() + 1; Trace() << "Running incremental query " << baseRevision; return load(query, [&](Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) -> ResultSet { return loadIncrementalResultSet(baseRevision, query, transaction, remainingFilters); }, resultProvider); } - qint64 executeInitialQuery(const Akonadi2::Query &query, const QByteArray &parent, const QSharedPointer > &resultProvider) + qint64 executeInitialQuery(const Akonadi2::Query &query, const QByteArray &parent, Akonadi2::ResultProviderInterface &resultProvider) { Trace() << "Running initial query for parent:" << parent; auto modifiedQuery = query; -- cgit v1.2.3 From b42047ad90470ecab329375fdacff03564c80074 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Wed, 18 Nov 2015 23:15:25 +0100 Subject: fixup --- common/facade.h | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) (limited to 'common/facade.h') diff --git a/common/facade.h b/common/facade.h index 82fd5ff..f5c05f9 100644 --- a/common/facade.h +++ b/common/facade.h @@ -246,10 +246,8 @@ private: }); } - ResultSet loadInitialResultSet(const QByteArray &parent, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) + ResultSet loadInitialResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) { - Trace() << "Fetching initial set for parent:" << parent; - //TODO QSet appliedFilters; auto resultSet = Akonadi2::ApplicationDomain::TypeImplementation::queryIndexes(query, mResourceInstanceIdentifier, appliedFilters, transaction); remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters; @@ -264,7 +262,6 @@ private: ResultSet loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) { - Trace() << "Loading incremental result set starting from revision: " << baseRevision; const auto bufferType = bufferTypeForDomainType(); auto revisionCounter = QSharedPointer::create(baseRevision); remainingFilters = query.propertyFilter.keys().toSet(); @@ -368,7 +365,7 @@ private: auto modifiedQuery = query; modifiedQuery.propertyFilter.insert("parent", parent); return load(modifiedQuery, [&](Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) -> ResultSet { - return loadInitialResultSet(parent, modifiedQuery, transaction, remainingFilters); + return loadInitialResultSet(modifiedQuery, transaction, remainingFilters); }, resultProvider); } -- cgit v1.2.3 From 8d5684292ef92f32487ba32df716a00c4a0841b5 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Thu, 19 Nov 2015 17:37:39 +0100 Subject: Loading data with the new model for the test client --- common/facade.h | 1 - 1 file changed, 1 deletion(-) (limited to 'common/facade.h') diff --git a/common/facade.h b/common/facade.h index f5c05f9..d150d60 100644 --- a/common/facade.h +++ b/common/facade.h @@ -184,7 +184,6 @@ public: //Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting runner->setQuery([this, query, &resultProvider] () -> KAsync::Job { return KAsync::start([this, query, &resultProvider](KAsync::Future &future) { - Trace() << "Executing query "; const qint64 newRevision = executeIncrementalQuery(query, resultProvider); mResourceAccess->sendRevisionReplayedCommand(newRevision); future.setFinished(); -- cgit v1.2.3 From c4a6746e4420b580fe35cc89783de4dbc3205ac6 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Thu, 19 Nov 2015 18:14:09 +0100 Subject: The parent is always an object, so we might as well make that explicit --- common/facade.h | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) (limited to 'common/facade.h') diff --git a/common/facade.h b/common/facade.h index d150d60..8b8a2a8 100644 --- a/common/facade.h +++ b/common/facade.h @@ -172,7 +172,7 @@ public: KAsync::Job load(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface &resultProvider) Q_DECL_OVERRIDE { //We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load. - resultProvider.setFetcher([this, query, &resultProvider](const QByteArray &parent) { + resultProvider.setFetcher([this, query, &resultProvider](const typename DomainType::Ptr &parent) { const qint64 newRevision = executeInitialQuery(query, parent, resultProvider); mResourceAccess->sendRevisionReplayedCommand(newRevision); }); @@ -358,11 +358,16 @@ private: }, resultProvider); } - qint64 executeInitialQuery(const Akonadi2::Query &query, const QByteArray &parent, Akonadi2::ResultProviderInterface &resultProvider) + qint64 executeInitialQuery(const Akonadi2::Query &query, const typename DomainType::Ptr &parent, Akonadi2::ResultProviderInterface &resultProvider) { - Trace() << "Running initial query for parent:" << parent; auto modifiedQuery = query; - modifiedQuery.propertyFilter.insert("parent", parent); + if (parent) { + Trace() << "Running initial query for parent:" << parent->identifier(); + modifiedQuery.propertyFilter.insert("parent", parent->identifier()); + } else { + Trace() << "Running initial query for toplevel"; + modifiedQuery.propertyFilter.insert("parent", QVariant()); + } return load(modifiedQuery, [&](Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) -> ResultSet { return loadInitialResultSet(modifiedQuery, transaction, remainingFilters); }, resultProvider); -- cgit v1.2.3 From ddb28417ccbcd22e771b7610c1727eac63471609 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Thu, 19 Nov 2015 23:47:34 +0100 Subject: Moved facade implementation to cpp file --- common/facade.h | 335 +++----------------------------------------------------- 1 file changed, 18 insertions(+), 317 deletions(-) (limited to 'common/facade.h') diff --git a/common/facade.h b/common/facade.h index 8b8a2a8..aa50941 100644 --- a/common/facade.h +++ b/common/facade.h @@ -1,5 +1,5 @@ /* - * Copyright (C) 2014 Christian Mollekopf + * Copyright (C) 2014 Christian Mollekopf * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -25,79 +25,8 @@ #include #include "resourceaccess.h" -#include "commands.h" -#include "domainadaptor.h" -#include "log.h" #include "resultset.h" -#include "storage.h" -#include "definitions.h" - -/** - * A QueryRunner runs a query and updates the corresponding result set. - * - * The lifetime of the QueryRunner is defined by the resut set (otherwise it's doing useless work), - * and by how long a result set must be updated. If the query is one off the runner dies after the execution, - * otherwise it lives on the react to changes and updates the corresponding result set. - * - * QueryRunner has to keep ResourceAccess alive in order to keep getting updates. - */ -class QueryRunner : public QObject -{ - Q_OBJECT -public: - typedef std::function()> QueryFunction; - - QueryRunner(const Akonadi2::Query &query) {}; - /** - * Starts query - */ - KAsync::Job run(qint64 newRevision = 0) - { - return queryFunction(); - } - - /** - * Set the query to run - */ - void setQuery(const QueryFunction &query) - { - queryFunction = query; - } - -public slots: - /** - * Rerun query with new revision - */ - void revisionChanged(qint64 newRevision) - { - Trace() << "New revision: " << newRevision; - run().exec(); - } - -private: - QueryFunction queryFunction; -}; - -static inline ResultSet fullScan(const Akonadi2::Storage::Transaction &transaction, const QByteArray &bufferType) -{ - //TODO use a result set with an iterator, to read values on demand - QVector keys; - transaction.openDatabase(bufferType + ".main").scan(QByteArray(), [&](const QByteArray &key, const QByteArray &value) -> bool { - //Skip internals - if (Akonadi2::Storage::isInternalKey(key)) { - return true; - } - keys << Akonadi2::Storage::uidFromKey(key); - return true; - }, - [](const Akonadi2::Storage::Error &error) { - qWarning() << "Error during query: " << error.message; - }); - - Trace() << "Full scan found " << keys.size() << " results"; - return ResultSet(keys); -} - +#include "domainadaptor.h" namespace Akonadi2 { /** @@ -121,257 +50,29 @@ public: * @param resourceIdentifier is the identifier of the resource instance * @param adaptorFactory is the adaptor factory used to generate the mappings from domain to resource types and vice versa */ - GenericFacade(const QByteArray &resourceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &adaptorFactory = DomainTypeAdaptorFactoryInterface::Ptr(), const QSharedPointer resourceAccess = QSharedPointer()) - : Akonadi2::StoreFacade(), - mResourceAccess(resourceAccess), - mDomainTypeAdaptorFactory(adaptorFactory), - mResourceInstanceIdentifier(resourceIdentifier) - { - if (!mResourceAccess) { - mResourceAccess = QSharedPointer::create(resourceIdentifier); - } - } - - ~GenericFacade() - { - } - - static QByteArray bufferTypeForDomainType() - { - //We happen to have a one to one mapping - return Akonadi2::ApplicationDomain::getTypeName(); - } - - KAsync::Job create(const DomainType &domainObject) Q_DECL_OVERRIDE - { - if (!mDomainTypeAdaptorFactory) { - Warning() << "No domain type adaptor factory available"; - return KAsync::error(); - } - flatbuffers::FlatBufferBuilder entityFbb; - mDomainTypeAdaptorFactory->createBuffer(domainObject, entityFbb); - return mResourceAccess->sendCreateCommand(bufferTypeForDomainType(), QByteArray::fromRawData(reinterpret_cast(entityFbb.GetBufferPointer()), entityFbb.GetSize())); - } - - KAsync::Job modify(const DomainType &domainObject) Q_DECL_OVERRIDE - { - if (!mDomainTypeAdaptorFactory) { - Warning() << "No domain type adaptor factory available"; - return KAsync::error(); - } - flatbuffers::FlatBufferBuilder entityFbb; - mDomainTypeAdaptorFactory->createBuffer(domainObject, entityFbb); - return mResourceAccess->sendModifyCommand(domainObject.identifier(), domainObject.revision(), bufferTypeForDomainType(), QByteArrayList(), QByteArray::fromRawData(reinterpret_cast(entityFbb.GetBufferPointer()), entityFbb.GetSize())); - } - - KAsync::Job remove(const DomainType &domainObject) Q_DECL_OVERRIDE - { - return mResourceAccess->sendDeleteCommand(domainObject.identifier(), domainObject.revision(), bufferTypeForDomainType()); - } - - KAsync::Job load(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface &resultProvider) Q_DECL_OVERRIDE - { - //We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load. - resultProvider.setFetcher([this, query, &resultProvider](const typename DomainType::Ptr &parent) { - const qint64 newRevision = executeInitialQuery(query, parent, resultProvider); - mResourceAccess->sendRevisionReplayedCommand(newRevision); - }); - + GenericFacade(const QByteArray &resourceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &adaptorFactory = DomainTypeAdaptorFactoryInterface::Ptr(), const QSharedPointer resourceAccess = QSharedPointer()); + ~GenericFacade(); - //In case of a live query we keep the runner for as long alive as the result provider exists - if (query.liveQuery) { - auto runner = QSharedPointer::create(query); - //Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting - runner->setQuery([this, query, &resultProvider] () -> KAsync::Job { - return KAsync::start([this, query, &resultProvider](KAsync::Future &future) { - const qint64 newRevision = executeIncrementalQuery(query, resultProvider); - mResourceAccess->sendRevisionReplayedCommand(newRevision); - future.setFinished(); - }); - }); - resultProvider.setQueryRunner(runner); - //Ensure the connection is open, if it wasn't already opened - //TODO If we are not connected already, we have to check for the latest revision once connected, otherwise we could miss some updates - mResourceAccess->open(); - QObject::connect(mResourceAccess.data(), &Akonadi2::ResourceAccess::revisionChanged, runner.data(), &QueryRunner::revisionChanged); - } - return KAsync::null(); - } + static QByteArray bufferTypeForDomainType(); + KAsync::Job create(const DomainType &domainObject) Q_DECL_OVERRIDE; + KAsync::Job modify(const DomainType &domainObject) Q_DECL_OVERRIDE; + KAsync::Job remove(const DomainType &domainObject) Q_DECL_OVERRIDE; + KAsync::Job load(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface &resultProvider) Q_DECL_OVERRIDE; private: - //TODO move into result provider? - static void replaySet(ResultSet &resultSet, Akonadi2::ResultProviderInterface &resultProvider) - { - while (resultSet.next([&resultProvider](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value, Akonadi2::Operation operation) -> bool { - switch (operation) { - case Akonadi2::Operation_Creation: - Trace() << "Got creation"; - resultProvider.add(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); - break; - case Akonadi2::Operation_Modification: - Trace() << "Got modification"; - resultProvider.modify(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); - break; - case Akonadi2::Operation_Removal: - Trace() << "Got removal"; - resultProvider.remove(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); - break; - } - return true; - })){}; - } - - void readEntity(const Akonadi2::Storage::Transaction &transaction, const QByteArray &key, const std::function &resultCallback) - { - const auto bufferType = bufferTypeForDomainType(); - //This only works for a 1:1 mapping of resource to domain types. - //Not i.e. for tags that are stored as flags in each entity of an imap store. - //additional properties that don't have a 1:1 mapping (such as separately stored tags), - //could be added to the adaptor. - // - // Akonadi2::Storage::getLatest(transaction, bufferTye, key); - transaction.openDatabase(bufferType + ".main").findLatest(key, [=](const QByteArray &key, const QByteArray &value) -> bool { - Akonadi2::EntityBuffer buffer(value.data(), value.size()); - const Akonadi2::Entity &entity = buffer.entity(); - const auto metadataBuffer = Akonadi2::EntityBuffer::readBuffer(entity.metadata()); - Q_ASSERT(metadataBuffer); - const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; - resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Akonadi2::Storage::uidFromKey(key), revision, mDomainTypeAdaptorFactory->createAdaptor(entity)), metadataBuffer->operation()); - return false; - }, - [](const Akonadi2::Storage::Error &error) { - qWarning() << "Error during query: " << error.message; - }); - } - - ResultSet loadInitialResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) - { - QSet appliedFilters; - auto resultSet = Akonadi2::ApplicationDomain::TypeImplementation::queryIndexes(query, mResourceInstanceIdentifier, appliedFilters, transaction); - remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters; - - //We do a full scan if there were no indexes available to create the initial set. - if (appliedFilters.isEmpty()) { - //TODO this should be replaced by an index lookup as well - resultSet = fullScan(transaction, bufferTypeForDomainType()); - } - return resultSet; - } - - ResultSet loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) - { - const auto bufferType = bufferTypeForDomainType(); - auto revisionCounter = QSharedPointer::create(baseRevision); - remainingFilters = query.propertyFilter.keys().toSet(); - return ResultSet([bufferType, revisionCounter, &transaction]() -> QByteArray { - const qint64 topRevision = Akonadi2::Storage::maxRevision(transaction); - //Spit out the revision keys one by one. - while (*revisionCounter <= topRevision) { - const auto uid = Akonadi2::Storage::getUidFromRevision(transaction, *revisionCounter); - const auto type = Akonadi2::Storage::getTypeFromRevision(transaction, *revisionCounter); - Trace() << "Revision" << *revisionCounter << type << uid; - if (type != bufferType) { - //Skip revision - *revisionCounter += 1; - continue; - } - const auto key = Akonadi2::Storage::assembleKey(uid, *revisionCounter); - *revisionCounter += 1; - return key; - } - //We're done - return QByteArray(); - }); - } - - ResultSet filterSet(const ResultSet &resultSet, const std::function &filter, const Akonadi2::Storage::Transaction &transaction, bool initialQuery) - { - auto resultSetPtr = QSharedPointer::create(resultSet); - - //Read through the source values and return whatever matches the filter - std::function)> generator = [this, resultSetPtr, &transaction, filter, initialQuery](std::function callback) -> bool { - while (resultSetPtr->next()) { - //readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) - readEntity(transaction, resultSetPtr->id(), [this, filter, callback, initialQuery](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Akonadi2::Operation operation) { - //Always remove removals, they probably don't match due to non-available properties - if (filter(domainObject) || operation == Akonadi2::Operation_Removal) { - if (initialQuery) { - //We're not interested in removals during the initial query - if (operation != Akonadi2::Operation_Removal) { - callback(domainObject, Akonadi2::Operation_Creation); - } - } else { - callback(domainObject, operation); - } - } - }); - } - return false; - }; - return ResultSet(generator); - } - - - std::function getFilter(const QSet remainingFilters, const Akonadi2::Query &query) - { - return [remainingFilters, query](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool { - for (const auto &filterProperty : remainingFilters) { - const auto property = domainObject->getProperty(filterProperty); - if (property.isValid()) { - //TODO implement other comparison operators than equality - if (property != query.propertyFilter.value(filterProperty)) { - Trace() << "Filtering entity due to property mismatch: " << domainObject->getProperty(filterProperty); - return false; - } - } else { - Warning() << "Ignored property filter because value is invalid: " << filterProperty; - } - } - return true; - }; - } - - qint64 load(const Akonadi2::Query &query, const std::function &)> &baseSetRetriever, Akonadi2::ResultProviderInterface &resultProvider) - { - Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier); - storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) { - Warning() << "Error during query: " << error.store << error.message; - }); - auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly); - - QSet remainingFilters; - auto resultSet = baseSetRetriever(transaction, remainingFilters); - auto filteredSet = filterSet(resultSet, getFilter(remainingFilters, query), transaction, false); - replaySet(filteredSet, resultProvider); - resultProvider.setRevision(Akonadi2::Storage::maxRevision(transaction)); - return Akonadi2::Storage::maxRevision(transaction); - } + static void replaySet(ResultSet &resultSet, Akonadi2::ResultProviderInterface &resultProvider); + void readEntity(const Akonadi2::Storage::Transaction &transaction, const QByteArray &key, const std::function &resultCallback); - qint64 executeIncrementalQuery(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface &resultProvider) - { - const qint64 baseRevision = resultProvider.revision() + 1; - Trace() << "Running incremental query " << baseRevision; - return load(query, [&](Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) -> ResultSet { - return loadIncrementalResultSet(baseRevision, query, transaction, remainingFilters); - }, resultProvider); - } + ResultSet loadInitialResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters); + ResultSet loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters); - qint64 executeInitialQuery(const Akonadi2::Query &query, const typename DomainType::Ptr &parent, Akonadi2::ResultProviderInterface &resultProvider) - { - auto modifiedQuery = query; - if (parent) { - Trace() << "Running initial query for parent:" << parent->identifier(); - modifiedQuery.propertyFilter.insert("parent", parent->identifier()); - } else { - Trace() << "Running initial query for toplevel"; - modifiedQuery.propertyFilter.insert("parent", QVariant()); - } - return load(modifiedQuery, [&](Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) -> ResultSet { - return loadInitialResultSet(modifiedQuery, transaction, remainingFilters); - }, resultProvider); - } + ResultSet filterSet(const ResultSet &resultSet, const std::function &filter, const Akonadi2::Storage::Transaction &transaction, bool initialQuery); + std::function getFilter(const QSet remainingFilters, const Akonadi2::Query &query); + qint64 load(const Akonadi2::Query &query, const std::function &)> &baseSetRetriever, Akonadi2::ResultProviderInterface &resultProvider); + qint64 executeIncrementalQuery(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface &resultProvider); + qint64 executeInitialQuery(const Akonadi2::Query &query, const typename DomainType::Ptr &parent, Akonadi2::ResultProviderInterface &resultProvider); protected: //TODO use one resource access instance per application & per resource -- cgit v1.2.3 From 110817a23463c71eacbc986af3ae509462758a3c Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Sat, 21 Nov 2015 11:07:47 +0100 Subject: Separated DomainTypeAdaptorFactoryInterface --- common/facade.h | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'common/facade.h') diff --git a/common/facade.h b/common/facade.h index aa50941..794e35e 100644 --- a/common/facade.h +++ b/common/facade.h @@ -26,9 +26,11 @@ #include "resourceaccess.h" #include "resultset.h" -#include "domainadaptor.h" +#include "domaintypeadaptorfactoryinterface.h" +#include "storage.h" namespace Akonadi2 { + /** * Default facade implementation for resources that are implemented in a separate process using the ResourceAccess class. * -- cgit v1.2.3 From 89aa339dd91765d67b4606938e60358f41d33884 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Wed, 25 Nov 2015 15:40:41 +0100 Subject: Fixed modifications. Without this modifications are ignored also in incremental queries. --- common/facade.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'common/facade.h') diff --git a/common/facade.h b/common/facade.h index 794e35e..df09d73 100644 --- a/common/facade.h +++ b/common/facade.h @@ -72,7 +72,7 @@ private: ResultSet filterSet(const ResultSet &resultSet, const std::function &filter, const Akonadi2::Storage::Transaction &transaction, bool initialQuery); std::function getFilter(const QSet remainingFilters, const Akonadi2::Query &query); - qint64 load(const Akonadi2::Query &query, const std::function &)> &baseSetRetriever, Akonadi2::ResultProviderInterface &resultProvider); + qint64 load(const Akonadi2::Query &query, const std::function &)> &baseSetRetriever, Akonadi2::ResultProviderInterface &resultProvider, bool initialQuery); qint64 executeIncrementalQuery(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface &resultProvider); qint64 executeInitialQuery(const Akonadi2::Query &query, const typename DomainType::Ptr &parent, Akonadi2::ResultProviderInterface &resultProvider); -- cgit v1.2.3 From 27164870a7a664daaca4ab6d3e3893a91d4eab5a Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Thu, 26 Nov 2015 14:28:34 +0100 Subject: Avoid repeatedly opening the name db. Although, the benchmarks say it doesn't really have an impact on performance. --- common/facade.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'common/facade.h') diff --git a/common/facade.h b/common/facade.h index df09d73..d8b878b 100644 --- a/common/facade.h +++ b/common/facade.h @@ -65,12 +65,12 @@ private: //TODO move into result provider? static void replaySet(ResultSet &resultSet, Akonadi2::ResultProviderInterface &resultProvider); - void readEntity(const Akonadi2::Storage::Transaction &transaction, const QByteArray &key, const std::function &resultCallback); + void readEntity(const Akonadi2::Storage::NamedDatabase &db, const QByteArray &key, const std::function &resultCallback); ResultSet loadInitialResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters); ResultSet loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters); - ResultSet filterSet(const ResultSet &resultSet, const std::function &filter, const Akonadi2::Storage::Transaction &transaction, bool initialQuery); + ResultSet filterSet(const ResultSet &resultSet, const std::function &filter, const Akonadi2::Storage::NamedDatabase &db, bool initialQuery); std::function getFilter(const QSet remainingFilters, const Akonadi2::Query &query); qint64 load(const Akonadi2::Query &query, const std::function &)> &baseSetRetriever, Akonadi2::ResultProviderInterface &resultProvider, bool initialQuery); qint64 executeIncrementalQuery(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface &resultProvider); -- cgit v1.2.3 From 5b41b26a349967acf2197f9f9228526193fd826e Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Fri, 27 Nov 2015 17:30:04 +0100 Subject: Introduced a QueryRunner object The QueryRunner object lives for the duration of the query (so just for the initial query for non-live queries, and for the lifetime of the result model for live queries). It's supposed to handle all the threading internally and decouple the lifetime of the facade. --- common/facade.h | 17 +---------------- 1 file changed, 1 insertion(+), 16 deletions(-) (limited to 'common/facade.h') diff --git a/common/facade.h b/common/facade.h index d8b878b..de67e05 100644 --- a/common/facade.h +++ b/common/facade.h @@ -59,22 +59,7 @@ public: KAsync::Job create(const DomainType &domainObject) Q_DECL_OVERRIDE; KAsync::Job modify(const DomainType &domainObject) Q_DECL_OVERRIDE; KAsync::Job remove(const DomainType &domainObject) Q_DECL_OVERRIDE; - KAsync::Job load(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface &resultProvider) Q_DECL_OVERRIDE; - -private: - //TODO move into result provider? - static void replaySet(ResultSet &resultSet, Akonadi2::ResultProviderInterface &resultProvider); - - void readEntity(const Akonadi2::Storage::NamedDatabase &db, const QByteArray &key, const std::function &resultCallback); - - ResultSet loadInitialResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters); - ResultSet loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters); - - ResultSet filterSet(const ResultSet &resultSet, const std::function &filter, const Akonadi2::Storage::NamedDatabase &db, bool initialQuery); - std::function getFilter(const QSet remainingFilters, const Akonadi2::Query &query); - qint64 load(const Akonadi2::Query &query, const std::function &)> &baseSetRetriever, Akonadi2::ResultProviderInterface &resultProvider, bool initialQuery); - qint64 executeIncrementalQuery(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface &resultProvider); - qint64 executeInitialQuery(const Akonadi2::Query &query, const typename DomainType::Ptr &parent, Akonadi2::ResultProviderInterface &resultProvider); + QPair, typename ResultEmitter::Ptr> load(const Akonadi2::Query &query) Q_DECL_OVERRIDE; protected: //TODO use one resource access instance per application & per resource -- cgit v1.2.3