From 75c231f0758603120ec562af772b48b5f6ac0e24 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Fri, 13 Nov 2015 23:31:41 +0100 Subject: DummyResourceTest and QueryTest are passing sync has been removed from the query code and is now a separate step --- common/clientapi.cpp | 2 +- common/facade.h | 147 +++++++++++++++---------------------- common/modelresult.h | 8 ++ common/resultprovider.h | 189 ++++-------------------------------------------- 4 files changed, 83 insertions(+), 263 deletions(-) (limited to 'common') diff --git a/common/clientapi.cpp b/common/clientapi.cpp index f99ebb8..839e77b 100644 --- a/common/clientapi.cpp +++ b/common/clientapi.cpp @@ -95,7 +95,7 @@ KAsync::Job Store::synchronize(const Akonadi2::Query &query) .template each([query](const QByteArray &resource, KAsync::Future &future) { auto resourceAccess = QSharedPointer::create(resource); resourceAccess->open(); - resourceAccess->synchronizeResource(true, false).then([&future]() { + resourceAccess->synchronizeResource(query.syncOnDemand, query.processAll).then([&future, resourceAccess]() { future.setFinished(); }).exec(); }) diff --git a/common/facade.h b/common/facade.h index eb55c98..5be1c73 100644 --- a/common/facade.h +++ b/common/facade.h @@ -44,19 +44,15 @@ class QueryRunner : public QObject { Q_OBJECT public: - typedef std::function(qint64 oldRevision)> QueryFunction; + typedef std::function()> QueryFunction; - QueryRunner(const Akonadi2::Query &query) : mLatestRevision(0) {}; + QueryRunner(const Akonadi2::Query &query) {}; /** * Starts query */ KAsync::Job run(qint64 newRevision = 0) { - //TODO: JOBAPI: that last empty .then should not be necessary - //TODO: remove newRevision - return queryFunction(mLatestRevision + 1).then([this](qint64 revision) { - mLatestRevision = revision; - }).then([](){}); + return queryFunction(); } /** @@ -74,12 +70,11 @@ public slots: void revisionChanged(qint64 newRevision) { Trace() << "New revision: " << newRevision; - run(newRevision).exec(); + run().exec(); } private: QueryFunction queryFunction; - qint64 mLatestRevision; }; static inline ResultSet fullScan(const Akonadi2::Storage::Transaction &transaction, const QByteArray &bufferType) @@ -125,10 +120,9 @@ public: * @param resourceIdentifier is the identifier of the resource instance * @param adaptorFactory is the adaptor factory used to generate the mappings from domain to resource types and vice versa */ - GenericFacade(const QByteArray &resourceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &adaptorFactory = DomainTypeAdaptorFactoryInterface::Ptr(), const QSharedPointer > storage = QSharedPointer >(), const QSharedPointer resourceAccess = QSharedPointer()) + GenericFacade(const QByteArray &resourceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &adaptorFactory = DomainTypeAdaptorFactoryInterface::Ptr(), const QSharedPointer resourceAccess = QSharedPointer()) : Akonadi2::StoreFacade(), mResourceAccess(resourceAccess), - mStorage(storage), mDomainTypeAdaptorFactory(adaptorFactory), mResourceInstanceIdentifier(resourceIdentifier) { @@ -177,48 +171,28 @@ public: //TODO JOBAPI return job from sync continuation to execute it as subjob? KAsync::Job load(const Akonadi2::Query &query, const QSharedPointer > &resultProvider) Q_DECL_OVERRIDE { - { - QSet remainingFilters; - auto filter = [remainingFilters, query](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool { - for (const auto &filterProperty : remainingFilters) { - //TODO implement other comparison operators than equality - if (domainObject->getProperty(filterProperty) != query.propertyFilter.value(filterProperty)) { - return false; - } - } - return true; - }; - - auto fetchEntities = [this, query, resultProvider, filter](const QByteArray &parent) { - Trace() << "Running fetchEntities" << parent; - Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier); - storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) { - Warning() << "Error during query: " << error.store << error.message; - }); + auto fetchEntities = [this, query, resultProvider](const QByteArray &parent) { + Trace() << "Fetching initial set for parent:" << parent; - auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly); + Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier); + storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) { + Warning() << "Error during query: " << error.store << error.message; + }); - auto modifiedQuery = query; - modifiedQuery.propertyFilter.insert("parent", parent); - //TODO - QSet appliedFilters; - auto resultSet = Akonadi2::ApplicationDomain::TypeImplementation::queryIndexes(modifiedQuery, mResourceInstanceIdentifier, appliedFilters, transaction); - QSet remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters; + auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly); - //We do a full scan if there were no indexes available to create the initial set. - if (appliedFilters.isEmpty()) { - //TODO this should be replaced by an index lookup as well - resultSet = fullScan(transaction, bufferTypeForDomainType()); - } - auto filteredSet = filterSet(resultSet, filter, transaction, true); - replaySet(filteredSet, resultProvider); - resultProvider->setRevision(Akonadi2::Storage::maxRevision(transaction)); - qint64 newRevision = Akonadi2::Storage::maxRevision(transaction); - //TODO send newRevision to resource - // mResourceAccess->sendRevisionReplayedCommand(newRevision); - }; - resultProvider->setFetcher(fetchEntities); - } + auto modifiedQuery = query; + modifiedQuery.propertyFilter.insert("parent", parent); + + QSet remainingFilters; + auto resultSet = loadInitialResultSet(parent, modifiedQuery, transaction, remainingFilters); + auto filteredSet = filterSet(resultSet, getFilter(remainingFilters, query), transaction, true); + replaySet(filteredSet, resultProvider); + const qint64 newRevision = Akonadi2::Storage::maxRevision(transaction); + resultProvider->setRevision(newRevision); + mResourceAccess->sendRevisionReplayedCommand(newRevision); + }; + resultProvider->setFetcher(fetchEntities); auto runner = QSharedPointer::create(query); QWeakPointer > weakResultProvider = resultProvider; @@ -233,8 +207,6 @@ public: return; } executeQuery(query, resultProvider).template then([&future, this](qint64 queriedRevision) { - //TODO set revision in result provider? - //TODO update all existing results with new revision mResourceAccess->sendRevisionReplayedCommand(queriedRevision); future.setFinished(); }).exec(); @@ -249,27 +221,12 @@ public: mResourceAccess->open(); QObject::connect(mResourceAccess.data(), &Akonadi2::ResourceAccess::revisionChanged, runner.data(), &QueryRunner::revisionChanged); } + return KAsync::null(); //We have to capture the runner to keep it alive - return synchronizeResource(query).template then([runner](KAsync::Future &future) { - future.setFinished(); - }, - [](int error, const QString &errorString) { - Warning() << "Error during sync " << error << errorString; - }); } private: - KAsync::Job synchronizeResource(const Akonadi2::Query &query) - { - //TODO check if a sync is necessary - //TODO Only sync what was requested - //TODO timeout - if (query.syncOnDemand || query.processAll) { - return mResourceAccess->synchronizeResource(query.syncOnDemand, query.processAll); - } - return KAsync::null(); - } //TODO move into result provider? void replaySet(ResultSet &resultSet, const QSharedPointer > &resultProvider) @@ -280,17 +237,14 @@ private: Trace() << "Got creation"; //TODO Only copy in result provider resultProvider->add(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); - // modelResult->add(); break; case Akonadi2::Operation_Modification: Trace() << "Got modification"; resultProvider->modify(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); - // modelResult->modify(); break; case Akonadi2::Operation_Removal: Trace() << "Got removal"; resultProvider->remove(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); - // modelResult->remove(); break; } return true; @@ -320,13 +274,28 @@ private: }); } - ResultSet loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) + ResultSet loadInitialResultSet(const QByteArray &parent, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) { + Trace() << "Fetching initial set for parent:" << parent; + //TODO + QSet appliedFilters; + auto resultSet = Akonadi2::ApplicationDomain::TypeImplementation::queryIndexes(query, mResourceInstanceIdentifier, appliedFilters, transaction); + remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters; + + //We do a full scan if there were no indexes available to create the initial set. + if (appliedFilters.isEmpty()) { + //TODO this should be replaced by an index lookup as well + resultSet = fullScan(transaction, bufferTypeForDomainType()); + } + return resultSet; + } + ResultSet loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) + { + Trace() << "Loading incremental result set starting from revision: " << baseRevision; const auto bufferType = bufferTypeForDomainType(); auto revisionCounter = QSharedPointer::create(baseRevision); - //TODO apply filter from index - return ResultSet([bufferType, revisionCounter, &transaction, this]() -> QByteArray { + return ResultSet([bufferType, revisionCounter, &transaction]() -> QByteArray { const qint64 topRevision = Akonadi2::Storage::maxRevision(transaction); //Spit out the revision keys one by one. while (*revisionCounter <= topRevision) { @@ -354,7 +323,7 @@ private: //Read through the source values and return whatever matches the filter std::function)> generator = [this, resultSetPtr, &transaction, filter, initialQuery](std::function callback) -> bool { while (resultSetPtr->next()) { - //TODO only necessary if we actually want to filter or neew the operation type (but not a big deal if we do it always I guess) + //readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) readEntity(transaction, resultSetPtr->id(), [this, filter, callback, initialQuery](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Akonadi2::Operation operation) { //Always remove removals, they probably don't match due to non-available properties if (filter(domainObject) || operation == Akonadi2::Operation_Removal) { @@ -374,6 +343,20 @@ private: return ResultSet(generator); } + + std::function getFilter(const QSet remainingFilters, const Akonadi2::Query &query) + { + return [remainingFilters, query](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool { + for (const auto &filterProperty : remainingFilters) { + //TODO implement other comparison operators than equality + if (domainObject->getProperty(filterProperty) != query.propertyFilter.value(filterProperty)) { + return false; + } + } + return true; + }; + } + virtual KAsync::Job executeQuery(const Akonadi2::Query &query, const QSharedPointer > &resultProvider) { /* @@ -384,16 +367,6 @@ private: const qint64 baseRevision = resultProvider->revision() + 1; Trace() << "Running query " << baseRevision; QSet remainingFilters; - auto filter = [remainingFilters, query, baseRevision](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool { - for (const auto &filterProperty : remainingFilters) { - //TODO implement other comparison operators than equality - if (domainObject->getProperty(filterProperty) != query.propertyFilter.value(filterProperty)) { - return false; - } - } - return true; - }; - qint64 newRevision = 0; Trace() << "Fetching updates"; Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier); @@ -404,10 +377,10 @@ private: auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly); auto resultSet = loadIncrementalResultSet(baseRevision, query, transaction, remainingFilters); - auto filteredSet = filterSet(resultSet, filter, transaction, false); + auto filteredSet = filterSet(resultSet, getFilter(remainingFilters, query), transaction, false); replaySet(filteredSet, resultProvider); resultProvider->setRevision(Akonadi2::Storage::maxRevision(transaction)); - newRevision = Akonadi2::Storage::maxRevision(transaction); + qint64 newRevision = Akonadi2::Storage::maxRevision(transaction); return KAsync::start([=]() -> qint64 { return newRevision; diff --git a/common/modelresult.h b/common/modelresult.h index 756f4d6..eabb868 100644 --- a/common/modelresult.h +++ b/common/modelresult.h @@ -111,6 +111,10 @@ public: { auto childId = qHash(value->identifier()); auto id = parentId(value); + //Ignore updates we get before the initial fetch is done + if (!mEntityChildrenFetched[id]) { + return; + } auto parent = createIndexFromId(id); qDebug() << "Added entity " << childId; const auto keys = mTree[id]; @@ -131,6 +135,10 @@ public: { auto childId = qHash(value->identifier()); auto id = parentId(value); + //Ignore updates we get before the initial fetch is done + if (!mEntityChildrenFetched[id]) { + return; + } auto parent = createIndexFromId(id); qDebug() << "Modified entity" << childId; auto i = mTree[id].indexOf(childId); diff --git a/common/resultprovider.h b/common/resultprovider.h index 43d21a4..0d23127 100644 --- a/common/resultprovider.h +++ b/common/resultprovider.h @@ -23,6 +23,9 @@ #include #include #include "threadboundary.h" +#include "resultset.h" +#include "log.h" +#include "modelresult.h" using namespace async; @@ -117,18 +120,6 @@ public: // mResultEmitter->clear(); } - // QSharedPointer > emitter() - // { - // if (!mResultEmitter) { - // //We have to go over a separate var and return that, otherwise we'd delete the emitter immediately again - // auto sharedPtr = QSharedPointer >(new ResultEmitter, [this](ResultEmitter *emitter){ done(); delete emitter; }); - // mResultEmitter = sharedPtr; - // return sharedPtr; - // } - // - // return mResultEmitter.toStrongRef(); - // } - /** * For lifetimemanagement only. * We keep the runner alive as long as the result provider exists. @@ -162,40 +153,6 @@ public: mQueryRunner = runner; } - // qint64 fetch(const ResultSet &resultSet) - // { - // //Fetch a bunch - // // - // // Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier); - // // storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) { - // // Warning() << "Error during query: " << error.store << error.message; - // // }); - // // - // // auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly); - // - // // Log() << "Querying" << baseRevision << Akonadi2::Storage::maxRevision(transaction); - // // auto resultSet = getResultSet(query, transaction, baseRevision); - // while (resultSet.next([this](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value, Akonadi2::Operation operation) -> bool { - // switch (operation) { - // case Akonadi2::Operation_Creation: - // Trace() << "Got creation"; - // //TODO Only copy in result provider - // add(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); - // break; - // case Akonadi2::Operation_Modification: - // Trace() << "Got modification"; - // modify(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); - // break; - // case Akonadi2::Operation_Removal: - // Trace() << "Got removal"; - // remove(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); - // break; - // } - // return true; - // })){}; - // // return Akonadi2::Storage::maxRevision(transaction); - // } - private: void done() { @@ -212,129 +169,6 @@ private: std::function mOnDoneCallback; }; - - - - - -template -class SyncResultProvider : public ResultProviderInterface { -public: - void add(const T &value) - { - mResultEmitter->addHandler(value); - } - - void modify(const T &value) - { - mResultEmitter->modifyHandler(value); - } - - void remove(const T &value) - { - mResultEmitter->removeHandler(value); - } - - void initialResultSetComplete() - { - mResultEmitter->initialResultSetComplete(); - } - - void complete() - { - mResultEmitter->complete(); - } - - void clear() - { - mResultEmitter->clear(); - } - - QSharedPointer > emitter() - { - if (!mResultEmitter) { - //We have to go over a separate var and return that, otherwise we'd delete the emitter immediately again - auto sharedPtr = QSharedPointer >(new ResultEmitter, [this](ResultEmitter *emitter){ done(); delete emitter; }); - mResultEmitter = sharedPtr; - return sharedPtr; - } - - return mResultEmitter.toStrongRef(); - } - - /** - * For lifetimemanagement only. - * We keep the runner alive as long as the result provider exists. - */ - void setFacade(const std::shared_ptr &facade) - { - mFacade = facade; - } - - void onDone(const std::function &callback) - { - mOnDoneCallback = callback; - } - - bool isDone() const - { - //The existance of the emitter currently defines wether we're done or not. - return mResultEmitter.toStrongRef().isNull(); - } - - // qint64 fetch(const ResultSet &resultSet) - // { - // //Fetch a bunch - // // - // // Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier); - // // storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) { - // // Warning() << "Error during query: " << error.store << error.message; - // // }); - // // - // // auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly); - // - // // Log() << "Querying" << baseRevision << Akonadi2::Storage::maxRevision(transaction); - // // auto resultSet = getResultSet(query, transaction, baseRevision); - // while (resultSet.next([this](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value, Akonadi2::Operation operation) -> bool { - // switch (operation) { - // case Akonadi2::Operation_Creation: - // Trace() << "Got creation"; - // //TODO Only copy in result provider - // add(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); - // break; - // case Akonadi2::Operation_Modification: - // Trace() << "Got modification"; - // modify(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); - // break; - // case Akonadi2::Operation_Removal: - // Trace() << "Got removal"; - // remove(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); - // break; - // } - // return true; - // })){}; - // // return Akonadi2::Storage::maxRevision(transaction); - // } - -private: - void done() - { - qWarning() << "done"; - if (mOnDoneCallback) { - mOnDoneCallback(); - mOnDoneCallback = std::function(); - } - } - - QWeakPointer > mResultEmitter; - std::shared_ptr mFacade; - std::function mOnDoneCallback; - QSharedPointer mThreadBoundary; -}; - - - - /* * The promise side for the result emitter */ @@ -434,18 +268,18 @@ public: } /** - * For lifetimemanagement only. - * We keep the runner alive as long as the result provider exists. - */ + * For lifetimemanagement only. + * We keep the runner alive as long as the result provider exists. + */ void setQueryRunner(const QSharedPointer &runner) { mQueryRunner = runner; } /** - * For lifetimemanagement only. - * We keep the runner alive as long as the result provider exists. - */ + * For lifetimemanagement only. + * We keep the runner alive as long as the result provider exists. + */ void setFacade(const std::shared_ptr &facade) { mFacade = facade; @@ -463,6 +297,11 @@ public: return mResultEmitter.toStrongRef().isNull(); } + void setFetcher(const std::function &fetcher) + { + fetcher(QByteArray()); + } + private: void done() { -- cgit v1.2.3