From 09aafbd1373b5d1152ac7a453a140a7f76c2e90e Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Fri, 13 Nov 2015 19:34:47 +0100 Subject: It's starting to work --- common/clientapi.h | 32 ++++- common/facade.h | 263 ++++++++++++++++++++++++++++------------ common/facadeinterface.h | 4 +- common/modelresult.h | 139 +++++++++++---------- common/resourcefacade.cpp | 2 +- common/resourcefacade.h | 2 +- common/resultprovider.h | 303 +++++++++++++++++++++++++++++++++++++++++++++- 7 files changed, 598 insertions(+), 147 deletions(-) (limited to 'common') diff --git a/common/clientapi.h b/common/clientapi.h index 9a32188..a424424 100644 --- a/common/clientapi.h +++ b/common/clientapi.h @@ -23,6 +23,7 @@ #include #include #include +#include #include #include @@ -101,11 +102,34 @@ public: /** * Asynchronusly load a dataset with tree structure information */ - // template - // static TreeSet loadTree(Query) - // { + template + static QSharedPointer loadModel(Query query) + { + auto model = QSharedPointer >::create(query, QList() << "summary" << "uid"); + auto resultProvider = QSharedPointer >::create(model); + + // Query all resources and aggregate results + KAsync::iterate(getResources(query.resources, ApplicationDomain::getTypeName())) + .template each([query, resultProvider](const QByteArray &resource, KAsync::Future &future) { + auto facade = FacadeFactory::instance().getFacade(resourceName(resource), resource); + if (facade) { + facade->load(query, resultProvider).template then([&future](){future.setFinished();}).exec(); + //Keep the facade alive for the lifetime of the resultSet. + resultProvider->setFacade(facade); + } else { + //Ignore the error and carry on + future.setFinished(); + } + }).template then([query, resultProvider]() { + resultProvider->initialResultSetComplete(); + if (!query.liveQuery) { + resultProvider->complete(); + } + }).exec(); + + return model; + } - // } template static std::shared_ptr > getFacade(const QByteArray &resourceInstanceIdentifier) { diff --git a/common/facade.h b/common/facade.h index 643ebec..eb55c98 100644 --- a/common/facade.h +++ b/common/facade.h @@ -135,68 +135,6 @@ public: if (!mResourceAccess) { mResourceAccess = QSharedPointer::create(resourceIdentifier); } - if (!mStorage) { - mStorage = QSharedPointer >::create(resourceIdentifier); - const auto bufferType = bufferTypeForDomainType(); - - mStorage->readEntity = [bufferType, this] (const Akonadi2::Storage::Transaction &transaction, const QByteArray &key, const std::function &resultCallback) - { - //This only works for a 1:1 mapping of resource to domain types. - //Not i.e. for tags that are stored as flags in each entity of an imap store. - //additional properties that don't have a 1:1 mapping (such as separately stored tags), - //could be added to the adaptor. - transaction.openDatabase(bufferType + ".main").findLatest(key, [=](const QByteArray &key, const QByteArray &value) -> bool { - Akonadi2::EntityBuffer buffer(value.data(), value.size()); - const Akonadi2::Entity &entity = buffer.entity(); - const auto metadataBuffer = Akonadi2::EntityBuffer::readBuffer(entity.metadata()); - Q_ASSERT(metadataBuffer); - const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; - resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Akonadi2::Storage::uidFromKey(key), revision, mDomainTypeAdaptorFactory->createAdaptor(entity)), metadataBuffer->operation()); - return false; - }, - [](const Akonadi2::Storage::Error &error) { - qWarning() << "Error during query: " << error.message; - }); - }; - - mStorage->loadInitialResultSet = [bufferType, this] (const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) -> ResultSet - { - QSet appliedFilters; - auto resultSet = Akonadi2::ApplicationDomain::TypeImplementation::queryIndexes(query, mResourceInstanceIdentifier, appliedFilters, transaction); - remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters; - - //We do a full scan if there were no indexes available to create the initial set. - if (appliedFilters.isEmpty()) { - //TODO this should be replaced by an index lookup as well - return fullScan(transaction, bufferType); - } - return resultSet; - }; - - mStorage->loadIncrementalResultSet = [bufferType, this] (qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) -> ResultSet - { - auto revisionCounter = QSharedPointer::create(baseRevision); - return ResultSet([bufferType, revisionCounter, &transaction, this]() -> QByteArray { - const qint64 topRevision = Akonadi2::Storage::maxRevision(transaction); - //Spit out the revision keys one by one. - while (*revisionCounter <= topRevision) { - const auto uid = Akonadi2::Storage::getUidFromRevision(transaction, *revisionCounter); - const auto type = Akonadi2::Storage::getTypeFromRevision(transaction, *revisionCounter); - Trace() << "Revision" << *revisionCounter << type << uid; - if (type != bufferType) { - //Skip revision - *revisionCounter += 1; - continue; - } - const auto key = Akonadi2::Storage::assembleKey(uid, *revisionCounter); - *revisionCounter += 1; - return key; - } - //We're done - return QByteArray(); - }); - }; - } } ~GenericFacade() @@ -237,13 +175,56 @@ public: } //TODO JOBAPI return job from sync continuation to execute it as subjob? - KAsync::Job load(const Akonadi2::Query &query, const QSharedPointer > &resultProvider) Q_DECL_OVERRIDE + KAsync::Job load(const Akonadi2::Query &query, const QSharedPointer > &resultProvider) Q_DECL_OVERRIDE { + { + QSet remainingFilters; + auto filter = [remainingFilters, query](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool { + for (const auto &filterProperty : remainingFilters) { + //TODO implement other comparison operators than equality + if (domainObject->getProperty(filterProperty) != query.propertyFilter.value(filterProperty)) { + return false; + } + } + return true; + }; + + auto fetchEntities = [this, query, resultProvider, filter](const QByteArray &parent) { + Trace() << "Running fetchEntities" << parent; + Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier); + storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) { + Warning() << "Error during query: " << error.store << error.message; + }); + + auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly); + + auto modifiedQuery = query; + modifiedQuery.propertyFilter.insert("parent", parent); + //TODO + QSet appliedFilters; + auto resultSet = Akonadi2::ApplicationDomain::TypeImplementation::queryIndexes(modifiedQuery, mResourceInstanceIdentifier, appliedFilters, transaction); + QSet remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters; + + //We do a full scan if there were no indexes available to create the initial set. + if (appliedFilters.isEmpty()) { + //TODO this should be replaced by an index lookup as well + resultSet = fullScan(transaction, bufferTypeForDomainType()); + } + auto filteredSet = filterSet(resultSet, filter, transaction, true); + replaySet(filteredSet, resultProvider); + resultProvider->setRevision(Akonadi2::Storage::maxRevision(transaction)); + qint64 newRevision = Akonadi2::Storage::maxRevision(transaction); + //TODO send newRevision to resource + // mResourceAccess->sendRevisionReplayedCommand(newRevision); + }; + resultProvider->setFetcher(fetchEntities); + } + auto runner = QSharedPointer::create(query); - QWeakPointer > weakResultProvider = resultProvider; - runner->setQuery([this, weakResultProvider, query] (qint64 oldRevision) -> KAsync::Job { - return KAsync::start([this, weakResultProvider, query, oldRevision](KAsync::Future &future) { - Trace() << "Executing query " << oldRevision; + QWeakPointer > weakResultProvider = resultProvider; + runner->setQuery([this, weakResultProvider, query] () -> KAsync::Job { + return KAsync::start([this, weakResultProvider, query](KAsync::Future &future) { + Trace() << "Executing query "; auto resultProvider = weakResultProvider.toStrongRef(); if (!resultProvider) { Warning() << "Tried executing query after result provider is already gone"; @@ -251,11 +232,10 @@ public: future.setFinished(); return; } - load(query, resultProvider, oldRevision).template then([&future, this](qint64 queriedRevision) { + executeQuery(query, resultProvider).template then([&future, this](qint64 queriedRevision) { //TODO set revision in result provider? //TODO update all existing results with new revision mResourceAccess->sendRevisionReplayedCommand(queriedRevision); - future.setValue(queriedRevision); future.setFinished(); }).exec(); }); @@ -272,9 +252,7 @@ public: //We have to capture the runner to keep it alive return synchronizeResource(query).template then([runner](KAsync::Future &future) { - runner->run().then([&future]() { - future.setFinished(); - }).exec(); + future.setFinished(); }, [](int error, const QString &errorString) { Warning() << "Error during sync " << error << errorString; @@ -293,17 +271,152 @@ private: return KAsync::null(); } - virtual KAsync::Job load(const Akonadi2::Query &query, const QSharedPointer > &resultProvider, qint64 oldRevision) + //TODO move into result provider? + void replaySet(ResultSet &resultSet, const QSharedPointer > &resultProvider) + { + while (resultSet.next([this, resultProvider](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value, Akonadi2::Operation operation) -> bool { + switch (operation) { + case Akonadi2::Operation_Creation: + Trace() << "Got creation"; + //TODO Only copy in result provider + resultProvider->add(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); + // modelResult->add(); + break; + case Akonadi2::Operation_Modification: + Trace() << "Got modification"; + resultProvider->modify(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); + // modelResult->modify(); + break; + case Akonadi2::Operation_Removal: + Trace() << "Got removal"; + resultProvider->remove(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); + // modelResult->remove(); + break; + } + return true; + })){}; + } + + void readEntity(const Akonadi2::Storage::Transaction &transaction, const QByteArray &key, const std::function &resultCallback) + { + const auto bufferType = bufferTypeForDomainType(); + //This only works for a 1:1 mapping of resource to domain types. + //Not i.e. for tags that are stored as flags in each entity of an imap store. + //additional properties that don't have a 1:1 mapping (such as separately stored tags), + //could be added to the adaptor. + // + // Akonadi2::Storage::getLatest(transaction, bufferTye, key); + transaction.openDatabase(bufferType + ".main").findLatest(key, [=](const QByteArray &key, const QByteArray &value) -> bool { + Akonadi2::EntityBuffer buffer(value.data(), value.size()); + const Akonadi2::Entity &entity = buffer.entity(); + const auto metadataBuffer = Akonadi2::EntityBuffer::readBuffer(entity.metadata()); + Q_ASSERT(metadataBuffer); + const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; + resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Akonadi2::Storage::uidFromKey(key), revision, mDomainTypeAdaptorFactory->createAdaptor(entity)), metadataBuffer->operation()); + return false; + }, + [](const Akonadi2::Storage::Error &error) { + qWarning() << "Error during query: " << error.message; + }); + } + + ResultSet loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) + { + + const auto bufferType = bufferTypeForDomainType(); + auto revisionCounter = QSharedPointer::create(baseRevision); + //TODO apply filter from index + return ResultSet([bufferType, revisionCounter, &transaction, this]() -> QByteArray { + const qint64 topRevision = Akonadi2::Storage::maxRevision(transaction); + //Spit out the revision keys one by one. + while (*revisionCounter <= topRevision) { + const auto uid = Akonadi2::Storage::getUidFromRevision(transaction, *revisionCounter); + const auto type = Akonadi2::Storage::getTypeFromRevision(transaction, *revisionCounter); + Trace() << "Revision" << *revisionCounter << type << uid; + if (type != bufferType) { + //Skip revision + *revisionCounter += 1; + continue; + } + const auto key = Akonadi2::Storage::assembleKey(uid, *revisionCounter); + *revisionCounter += 1; + return key; + } + //We're done + return QByteArray(); + }); + } + + ResultSet filterSet(const ResultSet &resultSet, const std::function &filter, const Akonadi2::Storage::Transaction &transaction, bool initialQuery) { + auto resultSetPtr = QSharedPointer::create(resultSet); + + //Read through the source values and return whatever matches the filter + std::function)> generator = [this, resultSetPtr, &transaction, filter, initialQuery](std::function callback) -> bool { + while (resultSetPtr->next()) { + //TODO only necessary if we actually want to filter or neew the operation type (but not a big deal if we do it always I guess) + readEntity(transaction, resultSetPtr->id(), [this, filter, callback, initialQuery](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Akonadi2::Operation operation) { + //Always remove removals, they probably don't match due to non-available properties + if (filter(domainObject) || operation == Akonadi2::Operation_Removal) { + if (initialQuery) { + //We're not interested in removals during the initial query + if (operation != Akonadi2::Operation_Removal) { + callback(domainObject, Akonadi2::Operation_Creation); + } + } else { + callback(domainObject, operation); + } + } + }); + } + return false; + }; + return ResultSet(generator); + } + + virtual KAsync::Job executeQuery(const Akonadi2::Query &query, const QSharedPointer > &resultProvider) + { + /* + * This method gets called initially, and after every revision change. + * * We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load. + * * Incremental updates are loaded directly, leaving it up to the model to discard the changes if they are not interesting + */ + const qint64 baseRevision = resultProvider->revision() + 1; + Trace() << "Running query " << baseRevision; + QSet remainingFilters; + auto filter = [remainingFilters, query, baseRevision](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool { + for (const auto &filterProperty : remainingFilters) { + //TODO implement other comparison operators than equality + if (domainObject->getProperty(filterProperty) != query.propertyFilter.value(filterProperty)) { + return false; + } + } + return true; + }; + qint64 newRevision = 0; + + Trace() << "Fetching updates"; + Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier); + storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) { + Warning() << "Error during query: " << error.store << error.message; + }); + + auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly); + + auto resultSet = loadIncrementalResultSet(baseRevision, query, transaction, remainingFilters); + auto filteredSet = filterSet(resultSet, filter, transaction, false); + replaySet(filteredSet, resultProvider); + resultProvider->setRevision(Akonadi2::Storage::maxRevision(transaction)); + newRevision = Akonadi2::Storage::maxRevision(transaction); + return KAsync::start([=]() -> qint64 { - return mStorage->read(query, oldRevision, resultProvider); + return newRevision; }); } protected: //TODO use one resource access instance per application & per resource QSharedPointer mResourceAccess; - QSharedPointer > mStorage; DomainTypeAdaptorFactoryInterface::Ptr mDomainTypeAdaptorFactory; QByteArray mResourceInstanceIdentifier; }; diff --git a/common/facadeinterface.h b/common/facadeinterface.h index 3a38db8..571a1e8 100644 --- a/common/facadeinterface.h +++ b/common/facadeinterface.h @@ -45,7 +45,7 @@ public: virtual KAsync::Job create(const DomainType &domainObject) = 0; virtual KAsync::Job modify(const DomainType &domainObject) = 0; virtual KAsync::Job remove(const DomainType &domainObject) = 0; - virtual KAsync::Job load(const Query &query, const QSharedPointer > &resultProvider) = 0; + virtual KAsync::Job load(const Query &query, const QSharedPointer > &resultProvider) = 0; }; template @@ -67,7 +67,7 @@ public: return KAsync::error(-1, "Failed to create a facade"); } - KAsync::Job load(const Query &query, const QSharedPointer > &resultProvider) + KAsync::Job load(const Query &query, const QSharedPointer > &resultProvider) { return KAsync::error(-1, "Failed to create a facade"); } diff --git a/common/modelresult.h b/common/modelresult.h index c23c41e..756f4d6 100644 --- a/common/modelresult.h +++ b/common/modelresult.h @@ -24,11 +24,10 @@ #include #include #include "query.h" -#include "clientapi.h" #include "resultprovider.h" -template +template class ModelResult : public QAbstractItemModel { public: @@ -79,13 +78,18 @@ public: return createIndex(row, column, childId); } + QModelIndex createIndexFromId(const qint64 &id) const + { + auto grandParentId = mParents.value(id, 0); + auto row = mTree.value(grandParentId).indexOf(id); + return createIndex(row, 0, id); + } + QModelIndex parent(const QModelIndex &index) const { auto id = getIdentifier(index); auto parentId = mParents.value(id); - auto grandParentId = mParents.value(parentId, 0); - auto row = mTree.value(grandParentId).indexOf(parentId); - return createIndex(row, 0, parentId); + return createIndexFromId(parentId); } bool canFetchMore(const QModelIndex &parent) const @@ -98,83 +102,92 @@ public: fetchEntities(parent); } + qint64 parentId(const Ptr &value) + { + return qHash(value->getProperty("parent").toByteArray()); + } + + void add(const Ptr &value) + { + auto childId = qHash(value->identifier()); + auto id = parentId(value); + auto parent = createIndexFromId(id); + qDebug() << "Added entity " << childId; + const auto keys = mTree[id]; + int index = 0; + for (; index < keys.size(); index++) { + if (childId < keys.at(index)) { + break; + } + } + beginInsertRows(parent, index, index); + mEntities.insert(childId, value); + mTree[id].insert(index, childId); + mParents.insert(childId, id); + endInsertRows(); + } + + void modify(const Ptr &value) + { + auto childId = qHash(value->identifier()); + auto id = parentId(value); + auto parent = createIndexFromId(id); + qDebug() << "Modified entity" << childId; + auto i = mTree[id].indexOf(childId); + mEntities.remove(childId); + mEntities.insert(childId, value); + //TODO check for change of parents + auto idx = index(i, 0, parent); + emit dataChanged(idx, idx); + } + + void remove(const Ptr &value) + { + auto childId = qHash(value->identifier()); + auto id = parentId(value); + auto parent = createIndexFromId(id); + qDebug() << "Removed entity" << childId; + auto index = mTree[id].indexOf(qHash(value->identifier())); + beginRemoveRows(parent, index, index); + mEntities.remove(childId); + mTree[id].removeAll(childId); + mParents.remove(childId); + //TODO remove children + endRemoveRows(); + } + void fetchEntities(const QModelIndex &parent) { qDebug() << "Fetching entities"; const auto id = getIdentifier(parent); - // beginResetModel(); - // mEntities.remove(id); mEntityChildrenFetched[id] = true; - auto query = mQuery; + QByteArray parentIdentifier; if (!parent.isValid()) { qDebug() << "no parent"; - query.propertyFilter.insert("parent", QByteArray()); } else { qDebug() << "parent is valid"; - auto object = parent.data(DomainObjectRole).template value(); + auto object = parent.data(DomainObjectRole).template value(); Q_ASSERT(object); - query.propertyFilter.insert("parent", object->identifier()); + parentIdentifier = object->identifier(); } - auto emitter = Akonadi2::Store::load(query); - emitter->onAdded([this, id, parent](const typename T::Ptr &value) { - auto childId = qHash(value->identifier()); - qDebug() << "Added entity " << childId; - const auto keys = mTree[id]; - int index = 0; - for (; index < keys.size(); index++) { - if (childId < keys.at(index)) { - break; - } - } - beginInsertRows(parent, index, index); - mEntities.insert(childId, value); - mTree[id].insert(index, childId); - mParents.insert(childId, id); - endInsertRows(); - }); - emitter->onModified([this, id, parent](const typename T::Ptr &value) { - auto childId = qHash(value->identifier()); - qDebug() << "Modified entity" << childId; - auto i = mTree[id].indexOf(childId); - mEntities.remove(childId); - mEntities.insert(childId, value); - //TODO check for change of parents - auto idx = index(i, 0, parent); - emit dataChanged(idx, idx); - }); - emitter->onRemoved([this, id, parent](const typename T::Ptr &value) { - auto childId = qHash(value->identifier()); - qDebug() << "Removed entity" << childId; - auto index = mTree[id].indexOf(qHash(value->identifier())); - beginRemoveRows(parent, index, index); - mEntities.remove(childId); - mTree[id].removeAll(childId); - mParents.remove(childId); - //TODO remove children - endRemoveRows(); - }); - emitter->onInitialResultSetComplete([this]() { - }); - emitter->onComplete([this, id]() { - mEmitter[id].clear(); - }); - emitter->onClear([this]() { - // beginResetModel(); - // mEntities.clear(); - // endResetModel(); - }); - mEmitter.insert(id, emitter); - // endResetModel(); + Trace() << "Loading entities"; + loadEntities(parentIdentifier); + } + + void setFetcher(const std::function &fetcher) + { + Trace() << "Setting fetcher"; + loadEntities = fetcher; } private: - QMap >> mEmitter; //TODO we should be able to directly use T as index, with an appropriate hash function, and thus have a QMap and QList - QMap mEntities; + QMap mEntities; QMap /* child entity id*/> mTree; QMap mParents; QMap mEntityChildrenFetched; QList mPropertyColumns; Akonadi2::Query mQuery; + std::function loadEntities; }; diff --git a/common/resourcefacade.cpp b/common/resourcefacade.cpp index 54185f8..0b7c5a3 100644 --- a/common/resourcefacade.cpp +++ b/common/resourcefacade.cpp @@ -54,7 +54,7 @@ KAsync::Job ResourceFacade::remove(const Akonadi2::ApplicationDomain::Akon }); } -KAsync::Job ResourceFacade::load(const Akonadi2::Query &query, const QSharedPointer > &resultProvider) +KAsync::Job ResourceFacade::load(const Akonadi2::Query &query, const QSharedPointer > &resultProvider) { return KAsync::start([query, resultProvider]() { const auto configuredResources = ResourceConfig::getResources(); diff --git a/common/resourcefacade.h b/common/resourcefacade.h index 437ff75..850d380 100644 --- a/common/resourcefacade.h +++ b/common/resourcefacade.h @@ -37,5 +37,5 @@ public: KAsync::Job create(const Akonadi2::ApplicationDomain::AkonadiResource &resource) Q_DECL_OVERRIDE; KAsync::Job modify(const Akonadi2::ApplicationDomain::AkonadiResource &resource) Q_DECL_OVERRIDE; KAsync::Job remove(const Akonadi2::ApplicationDomain::AkonadiResource &resource) Q_DECL_OVERRIDE; - KAsync::Job load(const Akonadi2::Query &query, const QSharedPointer > &resultProvider) Q_DECL_OVERRIDE; + KAsync::Job load(const Akonadi2::Query &query, const QSharedPointer > &resultProvider) Q_DECL_OVERRIDE; }; diff --git a/common/resultprovider.h b/common/resultprovider.h index bc03152..43d21a4 100644 --- a/common/resultprovider.h +++ b/common/resultprovider.h @@ -34,11 +34,312 @@ namespace Akonadi2 { template class ResultEmitter; +template +class ResultProviderInterface +{ +public: + ResultProviderInterface() + : mRevision(0) + { + + } + + virtual void add(const T &value) = 0; + virtual void modify(const T &value) = 0; + virtual void remove(const T &value) = 0; + virtual void initialResultSetComplete() = 0; + virtual void complete() = 0; + virtual void clear() = 0; + virtual void setFetcher(const std::function &fetcher) + { + } + + virtual void setFacade(const std::shared_ptr &facade) = 0; + virtual void setQueryRunner(const QSharedPointer &runner) = 0; + + void setRevision(qint64 revision) + { + mRevision = revision; + } + + qint64 revision() const + { + return mRevision; + } + +private: + qint64 mRevision; +}; + +template +class ModelResultProvider : public ResultProviderInterface { +public: + ModelResultProvider(QWeakPointer > model) + : ResultProviderInterface(), + mModel(model) + { + + } + + void add(const Ptr &value) + { + if (auto model = mModel.toStrongRef()) { + model->add(value); + } + } + + void modify(const Ptr &value) + { + if (auto model = mModel.toStrongRef()) { + model->modify(value); + } + } + + void remove(const Ptr &value) + { + if (auto model = mModel.toStrongRef()) { + model->remove(value); + } + } + + void initialResultSetComplete() + { + // mResultEmitter->initialResultSetComplete(); + } + + void complete() + { + // mResultEmitter->complete(); + } + + void clear() + { + // mResultEmitter->clear(); + } + + // QSharedPointer > emitter() + // { + // if (!mResultEmitter) { + // //We have to go over a separate var and return that, otherwise we'd delete the emitter immediately again + // auto sharedPtr = QSharedPointer >(new ResultEmitter, [this](ResultEmitter *emitter){ done(); delete emitter; }); + // mResultEmitter = sharedPtr; + // return sharedPtr; + // } + // + // return mResultEmitter.toStrongRef(); + // } + + /** + * For lifetimemanagement only. + * We keep the runner alive as long as the result provider exists. + */ + void setFacade(const std::shared_ptr &facade) + { + mFacade = facade; + } + + void onDone(const std::function &callback) + { + mOnDoneCallback = callback; + } + + bool isDone() const + { + //The existance of the emitter currently defines wether we're done or not. + // return mResultEmitter.toStrongRef().isNull(); + return true; + } + + void setFetcher(const std::function &fetcher) + { + if (auto model = mModel.toStrongRef()) { + model->setFetcher(fetcher); + } + } + + void setQueryRunner(const QSharedPointer &runner) + { + mQueryRunner = runner; + } + + // qint64 fetch(const ResultSet &resultSet) + // { + // //Fetch a bunch + // // + // // Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier); + // // storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) { + // // Warning() << "Error during query: " << error.store << error.message; + // // }); + // // + // // auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly); + // + // // Log() << "Querying" << baseRevision << Akonadi2::Storage::maxRevision(transaction); + // // auto resultSet = getResultSet(query, transaction, baseRevision); + // while (resultSet.next([this](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value, Akonadi2::Operation operation) -> bool { + // switch (operation) { + // case Akonadi2::Operation_Creation: + // Trace() << "Got creation"; + // //TODO Only copy in result provider + // add(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); + // break; + // case Akonadi2::Operation_Modification: + // Trace() << "Got modification"; + // modify(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); + // break; + // case Akonadi2::Operation_Removal: + // Trace() << "Got removal"; + // remove(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); + // break; + // } + // return true; + // })){}; + // // return Akonadi2::Storage::maxRevision(transaction); + // } + +private: + void done() + { + qWarning() << "done"; + if (mOnDoneCallback) { + mOnDoneCallback(); + mOnDoneCallback = std::function(); + } + } + + QWeakPointer > mModel; + QSharedPointer mQueryRunner; + std::shared_ptr mFacade; + std::function mOnDoneCallback; +}; + + + + + + +template +class SyncResultProvider : public ResultProviderInterface { +public: + void add(const T &value) + { + mResultEmitter->addHandler(value); + } + + void modify(const T &value) + { + mResultEmitter->modifyHandler(value); + } + + void remove(const T &value) + { + mResultEmitter->removeHandler(value); + } + + void initialResultSetComplete() + { + mResultEmitter->initialResultSetComplete(); + } + + void complete() + { + mResultEmitter->complete(); + } + + void clear() + { + mResultEmitter->clear(); + } + + QSharedPointer > emitter() + { + if (!mResultEmitter) { + //We have to go over a separate var and return that, otherwise we'd delete the emitter immediately again + auto sharedPtr = QSharedPointer >(new ResultEmitter, [this](ResultEmitter *emitter){ done(); delete emitter; }); + mResultEmitter = sharedPtr; + return sharedPtr; + } + + return mResultEmitter.toStrongRef(); + } + + /** + * For lifetimemanagement only. + * We keep the runner alive as long as the result provider exists. + */ + void setFacade(const std::shared_ptr &facade) + { + mFacade = facade; + } + + void onDone(const std::function &callback) + { + mOnDoneCallback = callback; + } + + bool isDone() const + { + //The existance of the emitter currently defines wether we're done or not. + return mResultEmitter.toStrongRef().isNull(); + } + + // qint64 fetch(const ResultSet &resultSet) + // { + // //Fetch a bunch + // // + // // Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier); + // // storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) { + // // Warning() << "Error during query: " << error.store << error.message; + // // }); + // // + // // auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly); + // + // // Log() << "Querying" << baseRevision << Akonadi2::Storage::maxRevision(transaction); + // // auto resultSet = getResultSet(query, transaction, baseRevision); + // while (resultSet.next([this](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value, Akonadi2::Operation operation) -> bool { + // switch (operation) { + // case Akonadi2::Operation_Creation: + // Trace() << "Got creation"; + // //TODO Only copy in result provider + // add(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); + // break; + // case Akonadi2::Operation_Modification: + // Trace() << "Got modification"; + // modify(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); + // break; + // case Akonadi2::Operation_Removal: + // Trace() << "Got removal"; + // remove(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); + // break; + // } + // return true; + // })){}; + // // return Akonadi2::Storage::maxRevision(transaction); + // } + +private: + void done() + { + qWarning() << "done"; + if (mOnDoneCallback) { + mOnDoneCallback(); + mOnDoneCallback = std::function(); + } + } + + QWeakPointer > mResultEmitter; + std::shared_ptr mFacade; + std::function mOnDoneCallback; + QSharedPointer mThreadBoundary; +}; + + + + /* * The promise side for the result emitter */ template -class ResultProvider { +class ResultProvider : public ResultProviderInterface { private: void callInMainThreadOnEmitter(void (ResultEmitter::*f)()) { -- cgit v1.2.3