From 9f6e0fbfd8cf23104eba5a78f89a69fab1a417f5 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Mon, 9 Nov 2015 16:04:46 +0100 Subject: Added a folder type --- common/CMakeLists.txt | 3 ++ common/domain/applicationdomaintype.h | 2 + common/domain/dummy.fbs | 7 ++++ common/domain/folder.cpp | 71 +++++++++++++++++++++++++++++++++++ common/domain/folder.fbs | 9 +++++ common/domain/folder.h | 56 +++++++++++++++++++++++++++ common/domainadaptor.h | 1 + 7 files changed, 149 insertions(+) create mode 100644 common/domain/dummy.fbs create mode 100644 common/domain/folder.cpp create mode 100644 common/domain/folder.fbs create mode 100644 common/domain/folder.h (limited to 'common') diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index b4a4703..f24ec46 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt @@ -36,6 +36,7 @@ set(command_SRCS domain/applicationdomaintype.cpp domain/event.cpp domain/mail.cpp + domain/folder.cpp ${storage_SRCS}) add_library(${PROJECT_NAME} SHARED ${command_SRCS}) @@ -55,6 +56,8 @@ generate_flatbuffers( commands/revisionreplayed domain/event domain/mail + domain/folder + domain/dummy entity metadata queuedcommand diff --git a/common/domain/applicationdomaintype.h b/common/domain/applicationdomaintype.h index 5514d26..b4cf8c4 100644 --- a/common/domain/applicationdomaintype.h +++ b/common/domain/applicationdomaintype.h @@ -160,3 +160,5 @@ Q_DECLARE_METATYPE(Akonadi2::ApplicationDomain::Event) Q_DECLARE_METATYPE(Akonadi2::ApplicationDomain::Event::Ptr) Q_DECLARE_METATYPE(Akonadi2::ApplicationDomain::Mail) Q_DECLARE_METATYPE(Akonadi2::ApplicationDomain::Mail::Ptr) +Q_DECLARE_METATYPE(Akonadi2::ApplicationDomain::Folder) +Q_DECLARE_METATYPE(Akonadi2::ApplicationDomain::Folder::Ptr) diff --git a/common/domain/dummy.fbs b/common/domain/dummy.fbs new file mode 100644 index 0000000..8816b09 --- /dev/null +++ b/common/domain/dummy.fbs @@ -0,0 +1,7 @@ +namespace Akonadi2.ApplicationDomain.Buffer; + +table Dummy { +} + +root_type Dummy; +file_identifier "AKFB"; diff --git a/common/domain/folder.cpp b/common/domain/folder.cpp new file mode 100644 index 0000000..50f73c2 --- /dev/null +++ b/common/domain/folder.cpp @@ -0,0 +1,71 @@ +/* + * Copyright (C) 2015 Christian Mollekopf + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the + * Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ +#include "folder.h" + +#include +#include +#include + +#include "../resultset.h" +#include "../index.h" +#include "../storage.h" +#include "../log.h" +#include "../propertymapper.h" +#include "../query.h" +#include "../definitions.h" + +#include "folder_generated.h" + +using namespace Akonadi2::ApplicationDomain; + +ResultSet TypeImplementation::queryIndexes(const Akonadi2::Query &query, const QByteArray &resourceInstanceIdentifier, QSet &appliedFilters, Akonadi2::Storage::Transaction &transaction) +{ + QVector keys; + return ResultSet(keys); +} + +void TypeImplementation::index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Akonadi2::Storage::Transaction &transaction) +{ +} + +void TypeImplementation::removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Akonadi2::Storage::Transaction &transaction) +{ +} + +QSharedPointer::Buffer> > TypeImplementation::initializeReadPropertyMapper() +{ + auto propertyMapper = QSharedPointer >::create(); + propertyMapper->addMapping("parent", &Buffer::parent); + propertyMapper->addMapping("name", &Buffer::name); + return propertyMapper; +} + +QSharedPointer::BufferBuilder> > TypeImplementation::initializeWritePropertyMapper() +{ + auto propertyMapper = QSharedPointer >::create(); + propertyMapper->addMapping("parent", [](const QVariant &value, flatbuffers::FlatBufferBuilder &fbb) -> std::function { + auto offset = variantToProperty(value, fbb); + return [offset](BufferBuilder &builder) { builder.add_parent(offset); }; + }); + propertyMapper->addMapping("name", [](const QVariant &value, flatbuffers::FlatBufferBuilder &fbb) -> std::function { + auto offset = variantToProperty(value, fbb); + return [offset](BufferBuilder &builder) { builder.add_name(offset); }; + }); + return propertyMapper; +} diff --git a/common/domain/folder.fbs b/common/domain/folder.fbs new file mode 100644 index 0000000..3476d58 --- /dev/null +++ b/common/domain/folder.fbs @@ -0,0 +1,9 @@ +namespace Akonadi2.ApplicationDomain.Buffer; + +table Folder { + name:string; + parent:string; +} + +root_type Folder; +file_identifier "AKFB"; diff --git a/common/domain/folder.h b/common/domain/folder.h new file mode 100644 index 0000000..545836f --- /dev/null +++ b/common/domain/folder.h @@ -0,0 +1,56 @@ +/* + * Copyright (C) 2015 Christian Mollekopf + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the + * Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ +#pragma once + +#include "applicationdomaintype.h" + +#include "storage.h" + +class ResultSet; +class QByteArray; + +template +class ReadPropertyMapper; +template +class WritePropertyMapper; + +namespace Akonadi2 { + class Query; + +namespace ApplicationDomain { + namespace Buffer { + struct Folder; + struct FolderBuilder; + } + +template<> +class TypeImplementation { +public: + typedef Akonadi2::ApplicationDomain::Buffer::Folder Buffer; + typedef Akonadi2::ApplicationDomain::Buffer::FolderBuilder BufferBuilder; + static QSet indexedProperties(); + static ResultSet queryIndexes(const Akonadi2::Query &query, const QByteArray &resourceInstanceIdentifier, QSet &appliedFilters, Akonadi2::Storage::Transaction &transaction); + static void index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Akonadi2::Storage::Transaction &transaction); + static void removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Akonadi2::Storage::Transaction &transaction); + static QSharedPointer > initializeReadPropertyMapper(); + static QSharedPointer > initializeWritePropertyMapper(); +}; + +} +} diff --git a/common/domainadaptor.h b/common/domainadaptor.h index b14fbcd..620a658 100644 --- a/common/domainadaptor.h +++ b/common/domainadaptor.h @@ -26,6 +26,7 @@ #include "domain/applicationdomaintype.h" #include "domain/event.h" #include "domain/mail.h" +#include "domain/folder.h" #include "bufferadaptor.h" #include "entity_generated.h" #include "metadata_generated.h" -- cgit v1.2.3 From fa1f58e8a83c6dc524ee0540f450065014e1a825 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Tue, 10 Nov 2015 11:48:00 +0100 Subject: Cleanups --- common/domain/applicationdomaintype.cpp | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) (limited to 'common') diff --git a/common/domain/applicationdomaintype.cpp b/common/domain/applicationdomaintype.cpp index 1b5d870..c9a8bba 100644 --- a/common/domain/applicationdomaintype.cpp +++ b/common/domain/applicationdomaintype.cpp @@ -60,10 +60,13 @@ ApplicationDomainType& ApplicationDomainType::operator=(const ApplicationDomainT return *this; } -ApplicationDomainType::~ApplicationDomainType() {} +ApplicationDomainType::~ApplicationDomainType() +{ +} QVariant ApplicationDomainType::getProperty(const QByteArray &key) const { + Q_ASSERT(mAdaptor); if (!mAdaptor->availableProperties().contains(key)) { Warning() << "No such property available " << key; } @@ -72,7 +75,9 @@ QVariant ApplicationDomainType::getProperty(const QByteArray &key) const void ApplicationDomainType::setProperty(const QByteArray &key, const QVariant &value) { - mChangeSet.insert(key, value); mAdaptor->setProperty(key, value); + Q_ASSERT(mAdaptor); + mChangeSet.insert(key, value); + mAdaptor->setProperty(key, value); } QByteArrayList ApplicationDomainType::changedProperties() const -- cgit v1.2.3 From 10d19014fe2c9c02f2bc3e19732cbe340e316076 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Tue, 10 Nov 2015 12:05:39 +0100 Subject: A result model The result model drives the data retrieval and provides the interace for consumers --- common/modelresult.h | 180 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 180 insertions(+) create mode 100644 common/modelresult.h (limited to 'common') diff --git a/common/modelresult.h b/common/modelresult.h new file mode 100644 index 0000000..c23c41e --- /dev/null +++ b/common/modelresult.h @@ -0,0 +1,180 @@ +/* + * Copyright (C) 2014 Christian Mollekopf + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) version 3, or any + * later version accepted by the membership of KDE e.V. (or its + * successor approved by the membership of KDE e.V.), which shall + * act as a proxy defined in Section 6 of version 3 of the license. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library. If not, see . + */ + +#pragma once + +#include +#include +#include +#include "query.h" +#include "clientapi.h" + +#include "resultprovider.h" + +template +class ModelResult : public QAbstractItemModel +{ +public: + + enum Roles { + DomainObjectRole = Qt::UserRole + 1 + }; + + ModelResult(const Akonadi2::Query &query, const QList &propertyColumns) + :QAbstractItemModel(), + mPropertyColumns(propertyColumns) + { + } + + static qint64 getIdentifier(const QModelIndex &idx) + { + if (!idx.isValid()) { + return 0; + } + return idx.internalId(); + } + + int rowCount(const QModelIndex &parent = QModelIndex()) const + { + return mTree[getIdentifier(parent)].size(); + } + + int columnCount(const QModelIndex &parent = QModelIndex()) const + { + return mPropertyColumns.size(); + } + + virtual QVariant data(const QModelIndex &index, int role = Qt::DisplayRole) const + { + if (role == DomainObjectRole) { + qWarning() << "trying to get entity " << index.internalId(); + Q_ASSERT(mEntities.contains(index.internalId())); + return QVariant::fromValue(mEntities.value(index.internalId())); + } + qDebug() << "Invalid role"; + return QVariant(); + } + + QModelIndex index(int row, int column, const QModelIndex & parent = QModelIndex()) const + { + auto id = getIdentifier(parent); + auto childId = mTree.value(id).at(row); + return createIndex(row, column, childId); + } + + QModelIndex parent(const QModelIndex &index) const + { + auto id = getIdentifier(index); + auto parentId = mParents.value(id); + auto grandParentId = mParents.value(parentId, 0); + auto row = mTree.value(grandParentId).indexOf(parentId); + return createIndex(row, 0, parentId); + } + + bool canFetchMore(const QModelIndex &parent) const + { + return mEntityChildrenFetched.value(parent.internalId()); + } + + void fetchMore(const QModelIndex &parent) + { + fetchEntities(parent); + } + + void fetchEntities(const QModelIndex &parent) + { + qDebug() << "Fetching entities"; + const auto id = getIdentifier(parent); + // beginResetModel(); + // mEntities.remove(id); + mEntityChildrenFetched[id] = true; + auto query = mQuery; + if (!parent.isValid()) { + qDebug() << "no parent"; + query.propertyFilter.insert("parent", QByteArray()); + } else { + qDebug() << "parent is valid"; + auto object = parent.data(DomainObjectRole).template value(); + Q_ASSERT(object); + query.propertyFilter.insert("parent", object->identifier()); + } + auto emitter = Akonadi2::Store::load(query); + emitter->onAdded([this, id, parent](const typename T::Ptr &value) { + auto childId = qHash(value->identifier()); + qDebug() << "Added entity " << childId; + const auto keys = mTree[id]; + int index = 0; + for (; index < keys.size(); index++) { + if (childId < keys.at(index)) { + break; + } + } + beginInsertRows(parent, index, index); + mEntities.insert(childId, value); + mTree[id].insert(index, childId); + mParents.insert(childId, id); + endInsertRows(); + }); + emitter->onModified([this, id, parent](const typename T::Ptr &value) { + auto childId = qHash(value->identifier()); + qDebug() << "Modified entity" << childId; + auto i = mTree[id].indexOf(childId); + mEntities.remove(childId); + mEntities.insert(childId, value); + //TODO check for change of parents + auto idx = index(i, 0, parent); + emit dataChanged(idx, idx); + }); + emitter->onRemoved([this, id, parent](const typename T::Ptr &value) { + auto childId = qHash(value->identifier()); + qDebug() << "Removed entity" << childId; + auto index = mTree[id].indexOf(qHash(value->identifier())); + beginRemoveRows(parent, index, index); + mEntities.remove(childId); + mTree[id].removeAll(childId); + mParents.remove(childId); + //TODO remove children + endRemoveRows(); + }); + emitter->onInitialResultSetComplete([this]() { + }); + emitter->onComplete([this, id]() { + mEmitter[id].clear(); + }); + emitter->onClear([this]() { + // beginResetModel(); + // mEntities.clear(); + // endResetModel(); + }); + mEmitter.insert(id, emitter); + // endResetModel(); + } + +private: + QMap >> mEmitter; + //TODO we should be able to directly use T as index, with an appropriate hash function, and thus have a QMap and QList + QMap mEntities; + QMap /* child entity id*/> mTree; + QMap mParents; + QMap mEntityChildrenFetched; + QList mPropertyColumns; + Akonadi2::Query mQuery; +}; + -- cgit v1.2.3 From 09aafbd1373b5d1152ac7a453a140a7f76c2e90e Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Fri, 13 Nov 2015 19:34:47 +0100 Subject: It's starting to work --- common/clientapi.h | 32 ++++- common/facade.h | 263 ++++++++++++++++++++++++++++------------ common/facadeinterface.h | 4 +- common/modelresult.h | 139 +++++++++++---------- common/resourcefacade.cpp | 2 +- common/resourcefacade.h | 2 +- common/resultprovider.h | 303 +++++++++++++++++++++++++++++++++++++++++++++- 7 files changed, 598 insertions(+), 147 deletions(-) (limited to 'common') diff --git a/common/clientapi.h b/common/clientapi.h index 9a32188..a424424 100644 --- a/common/clientapi.h +++ b/common/clientapi.h @@ -23,6 +23,7 @@ #include #include #include +#include #include #include @@ -101,11 +102,34 @@ public: /** * Asynchronusly load a dataset with tree structure information */ - // template - // static TreeSet loadTree(Query) - // { + template + static QSharedPointer loadModel(Query query) + { + auto model = QSharedPointer >::create(query, QList() << "summary" << "uid"); + auto resultProvider = QSharedPointer >::create(model); + + // Query all resources and aggregate results + KAsync::iterate(getResources(query.resources, ApplicationDomain::getTypeName())) + .template each([query, resultProvider](const QByteArray &resource, KAsync::Future &future) { + auto facade = FacadeFactory::instance().getFacade(resourceName(resource), resource); + if (facade) { + facade->load(query, resultProvider).template then([&future](){future.setFinished();}).exec(); + //Keep the facade alive for the lifetime of the resultSet. + resultProvider->setFacade(facade); + } else { + //Ignore the error and carry on + future.setFinished(); + } + }).template then([query, resultProvider]() { + resultProvider->initialResultSetComplete(); + if (!query.liveQuery) { + resultProvider->complete(); + } + }).exec(); + + return model; + } - // } template static std::shared_ptr > getFacade(const QByteArray &resourceInstanceIdentifier) { diff --git a/common/facade.h b/common/facade.h index 643ebec..eb55c98 100644 --- a/common/facade.h +++ b/common/facade.h @@ -135,68 +135,6 @@ public: if (!mResourceAccess) { mResourceAccess = QSharedPointer::create(resourceIdentifier); } - if (!mStorage) { - mStorage = QSharedPointer >::create(resourceIdentifier); - const auto bufferType = bufferTypeForDomainType(); - - mStorage->readEntity = [bufferType, this] (const Akonadi2::Storage::Transaction &transaction, const QByteArray &key, const std::function &resultCallback) - { - //This only works for a 1:1 mapping of resource to domain types. - //Not i.e. for tags that are stored as flags in each entity of an imap store. - //additional properties that don't have a 1:1 mapping (such as separately stored tags), - //could be added to the adaptor. - transaction.openDatabase(bufferType + ".main").findLatest(key, [=](const QByteArray &key, const QByteArray &value) -> bool { - Akonadi2::EntityBuffer buffer(value.data(), value.size()); - const Akonadi2::Entity &entity = buffer.entity(); - const auto metadataBuffer = Akonadi2::EntityBuffer::readBuffer(entity.metadata()); - Q_ASSERT(metadataBuffer); - const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; - resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Akonadi2::Storage::uidFromKey(key), revision, mDomainTypeAdaptorFactory->createAdaptor(entity)), metadataBuffer->operation()); - return false; - }, - [](const Akonadi2::Storage::Error &error) { - qWarning() << "Error during query: " << error.message; - }); - }; - - mStorage->loadInitialResultSet = [bufferType, this] (const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) -> ResultSet - { - QSet appliedFilters; - auto resultSet = Akonadi2::ApplicationDomain::TypeImplementation::queryIndexes(query, mResourceInstanceIdentifier, appliedFilters, transaction); - remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters; - - //We do a full scan if there were no indexes available to create the initial set. - if (appliedFilters.isEmpty()) { - //TODO this should be replaced by an index lookup as well - return fullScan(transaction, bufferType); - } - return resultSet; - }; - - mStorage->loadIncrementalResultSet = [bufferType, this] (qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) -> ResultSet - { - auto revisionCounter = QSharedPointer::create(baseRevision); - return ResultSet([bufferType, revisionCounter, &transaction, this]() -> QByteArray { - const qint64 topRevision = Akonadi2::Storage::maxRevision(transaction); - //Spit out the revision keys one by one. - while (*revisionCounter <= topRevision) { - const auto uid = Akonadi2::Storage::getUidFromRevision(transaction, *revisionCounter); - const auto type = Akonadi2::Storage::getTypeFromRevision(transaction, *revisionCounter); - Trace() << "Revision" << *revisionCounter << type << uid; - if (type != bufferType) { - //Skip revision - *revisionCounter += 1; - continue; - } - const auto key = Akonadi2::Storage::assembleKey(uid, *revisionCounter); - *revisionCounter += 1; - return key; - } - //We're done - return QByteArray(); - }); - }; - } } ~GenericFacade() @@ -237,13 +175,56 @@ public: } //TODO JOBAPI return job from sync continuation to execute it as subjob? - KAsync::Job load(const Akonadi2::Query &query, const QSharedPointer > &resultProvider) Q_DECL_OVERRIDE + KAsync::Job load(const Akonadi2::Query &query, const QSharedPointer > &resultProvider) Q_DECL_OVERRIDE { + { + QSet remainingFilters; + auto filter = [remainingFilters, query](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool { + for (const auto &filterProperty : remainingFilters) { + //TODO implement other comparison operators than equality + if (domainObject->getProperty(filterProperty) != query.propertyFilter.value(filterProperty)) { + return false; + } + } + return true; + }; + + auto fetchEntities = [this, query, resultProvider, filter](const QByteArray &parent) { + Trace() << "Running fetchEntities" << parent; + Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier); + storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) { + Warning() << "Error during query: " << error.store << error.message; + }); + + auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly); + + auto modifiedQuery = query; + modifiedQuery.propertyFilter.insert("parent", parent); + //TODO + QSet appliedFilters; + auto resultSet = Akonadi2::ApplicationDomain::TypeImplementation::queryIndexes(modifiedQuery, mResourceInstanceIdentifier, appliedFilters, transaction); + QSet remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters; + + //We do a full scan if there were no indexes available to create the initial set. + if (appliedFilters.isEmpty()) { + //TODO this should be replaced by an index lookup as well + resultSet = fullScan(transaction, bufferTypeForDomainType()); + } + auto filteredSet = filterSet(resultSet, filter, transaction, true); + replaySet(filteredSet, resultProvider); + resultProvider->setRevision(Akonadi2::Storage::maxRevision(transaction)); + qint64 newRevision = Akonadi2::Storage::maxRevision(transaction); + //TODO send newRevision to resource + // mResourceAccess->sendRevisionReplayedCommand(newRevision); + }; + resultProvider->setFetcher(fetchEntities); + } + auto runner = QSharedPointer::create(query); - QWeakPointer > weakResultProvider = resultProvider; - runner->setQuery([this, weakResultProvider, query] (qint64 oldRevision) -> KAsync::Job { - return KAsync::start([this, weakResultProvider, query, oldRevision](KAsync::Future &future) { - Trace() << "Executing query " << oldRevision; + QWeakPointer > weakResultProvider = resultProvider; + runner->setQuery([this, weakResultProvider, query] () -> KAsync::Job { + return KAsync::start([this, weakResultProvider, query](KAsync::Future &future) { + Trace() << "Executing query "; auto resultProvider = weakResultProvider.toStrongRef(); if (!resultProvider) { Warning() << "Tried executing query after result provider is already gone"; @@ -251,11 +232,10 @@ public: future.setFinished(); return; } - load(query, resultProvider, oldRevision).template then([&future, this](qint64 queriedRevision) { + executeQuery(query, resultProvider).template then([&future, this](qint64 queriedRevision) { //TODO set revision in result provider? //TODO update all existing results with new revision mResourceAccess->sendRevisionReplayedCommand(queriedRevision); - future.setValue(queriedRevision); future.setFinished(); }).exec(); }); @@ -272,9 +252,7 @@ public: //We have to capture the runner to keep it alive return synchronizeResource(query).template then([runner](KAsync::Future &future) { - runner->run().then([&future]() { - future.setFinished(); - }).exec(); + future.setFinished(); }, [](int error, const QString &errorString) { Warning() << "Error during sync " << error << errorString; @@ -293,17 +271,152 @@ private: return KAsync::null(); } - virtual KAsync::Job load(const Akonadi2::Query &query, const QSharedPointer > &resultProvider, qint64 oldRevision) + //TODO move into result provider? + void replaySet(ResultSet &resultSet, const QSharedPointer > &resultProvider) + { + while (resultSet.next([this, resultProvider](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value, Akonadi2::Operation operation) -> bool { + switch (operation) { + case Akonadi2::Operation_Creation: + Trace() << "Got creation"; + //TODO Only copy in result provider + resultProvider->add(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); + // modelResult->add(); + break; + case Akonadi2::Operation_Modification: + Trace() << "Got modification"; + resultProvider->modify(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); + // modelResult->modify(); + break; + case Akonadi2::Operation_Removal: + Trace() << "Got removal"; + resultProvider->remove(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); + // modelResult->remove(); + break; + } + return true; + })){}; + } + + void readEntity(const Akonadi2::Storage::Transaction &transaction, const QByteArray &key, const std::function &resultCallback) + { + const auto bufferType = bufferTypeForDomainType(); + //This only works for a 1:1 mapping of resource to domain types. + //Not i.e. for tags that are stored as flags in each entity of an imap store. + //additional properties that don't have a 1:1 mapping (such as separately stored tags), + //could be added to the adaptor. + // + // Akonadi2::Storage::getLatest(transaction, bufferTye, key); + transaction.openDatabase(bufferType + ".main").findLatest(key, [=](const QByteArray &key, const QByteArray &value) -> bool { + Akonadi2::EntityBuffer buffer(value.data(), value.size()); + const Akonadi2::Entity &entity = buffer.entity(); + const auto metadataBuffer = Akonadi2::EntityBuffer::readBuffer(entity.metadata()); + Q_ASSERT(metadataBuffer); + const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; + resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Akonadi2::Storage::uidFromKey(key), revision, mDomainTypeAdaptorFactory->createAdaptor(entity)), metadataBuffer->operation()); + return false; + }, + [](const Akonadi2::Storage::Error &error) { + qWarning() << "Error during query: " << error.message; + }); + } + + ResultSet loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) + { + + const auto bufferType = bufferTypeForDomainType(); + auto revisionCounter = QSharedPointer::create(baseRevision); + //TODO apply filter from index + return ResultSet([bufferType, revisionCounter, &transaction, this]() -> QByteArray { + const qint64 topRevision = Akonadi2::Storage::maxRevision(transaction); + //Spit out the revision keys one by one. + while (*revisionCounter <= topRevision) { + const auto uid = Akonadi2::Storage::getUidFromRevision(transaction, *revisionCounter); + const auto type = Akonadi2::Storage::getTypeFromRevision(transaction, *revisionCounter); + Trace() << "Revision" << *revisionCounter << type << uid; + if (type != bufferType) { + //Skip revision + *revisionCounter += 1; + continue; + } + const auto key = Akonadi2::Storage::assembleKey(uid, *revisionCounter); + *revisionCounter += 1; + return key; + } + //We're done + return QByteArray(); + }); + } + + ResultSet filterSet(const ResultSet &resultSet, const std::function &filter, const Akonadi2::Storage::Transaction &transaction, bool initialQuery) { + auto resultSetPtr = QSharedPointer::create(resultSet); + + //Read through the source values and return whatever matches the filter + std::function)> generator = [this, resultSetPtr, &transaction, filter, initialQuery](std::function callback) -> bool { + while (resultSetPtr->next()) { + //TODO only necessary if we actually want to filter or neew the operation type (but not a big deal if we do it always I guess) + readEntity(transaction, resultSetPtr->id(), [this, filter, callback, initialQuery](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Akonadi2::Operation operation) { + //Always remove removals, they probably don't match due to non-available properties + if (filter(domainObject) || operation == Akonadi2::Operation_Removal) { + if (initialQuery) { + //We're not interested in removals during the initial query + if (operation != Akonadi2::Operation_Removal) { + callback(domainObject, Akonadi2::Operation_Creation); + } + } else { + callback(domainObject, operation); + } + } + }); + } + return false; + }; + return ResultSet(generator); + } + + virtual KAsync::Job executeQuery(const Akonadi2::Query &query, const QSharedPointer > &resultProvider) + { + /* + * This method gets called initially, and after every revision change. + * * We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load. + * * Incremental updates are loaded directly, leaving it up to the model to discard the changes if they are not interesting + */ + const qint64 baseRevision = resultProvider->revision() + 1; + Trace() << "Running query " << baseRevision; + QSet remainingFilters; + auto filter = [remainingFilters, query, baseRevision](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool { + for (const auto &filterProperty : remainingFilters) { + //TODO implement other comparison operators than equality + if (domainObject->getProperty(filterProperty) != query.propertyFilter.value(filterProperty)) { + return false; + } + } + return true; + }; + qint64 newRevision = 0; + + Trace() << "Fetching updates"; + Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier); + storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) { + Warning() << "Error during query: " << error.store << error.message; + }); + + auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly); + + auto resultSet = loadIncrementalResultSet(baseRevision, query, transaction, remainingFilters); + auto filteredSet = filterSet(resultSet, filter, transaction, false); + replaySet(filteredSet, resultProvider); + resultProvider->setRevision(Akonadi2::Storage::maxRevision(transaction)); + newRevision = Akonadi2::Storage::maxRevision(transaction); + return KAsync::start([=]() -> qint64 { - return mStorage->read(query, oldRevision, resultProvider); + return newRevision; }); } protected: //TODO use one resource access instance per application & per resource QSharedPointer mResourceAccess; - QSharedPointer > mStorage; DomainTypeAdaptorFactoryInterface::Ptr mDomainTypeAdaptorFactory; QByteArray mResourceInstanceIdentifier; }; diff --git a/common/facadeinterface.h b/common/facadeinterface.h index 3a38db8..571a1e8 100644 --- a/common/facadeinterface.h +++ b/common/facadeinterface.h @@ -45,7 +45,7 @@ public: virtual KAsync::Job create(const DomainType &domainObject) = 0; virtual KAsync::Job modify(const DomainType &domainObject) = 0; virtual KAsync::Job remove(const DomainType &domainObject) = 0; - virtual KAsync::Job load(const Query &query, const QSharedPointer > &resultProvider) = 0; + virtual KAsync::Job load(const Query &query, const QSharedPointer > &resultProvider) = 0; }; template @@ -67,7 +67,7 @@ public: return KAsync::error(-1, "Failed to create a facade"); } - KAsync::Job load(const Query &query, const QSharedPointer > &resultProvider) + KAsync::Job load(const Query &query, const QSharedPointer > &resultProvider) { return KAsync::error(-1, "Failed to create a facade"); } diff --git a/common/modelresult.h b/common/modelresult.h index c23c41e..756f4d6 100644 --- a/common/modelresult.h +++ b/common/modelresult.h @@ -24,11 +24,10 @@ #include #include #include "query.h" -#include "clientapi.h" #include "resultprovider.h" -template +template class ModelResult : public QAbstractItemModel { public: @@ -79,13 +78,18 @@ public: return createIndex(row, column, childId); } + QModelIndex createIndexFromId(const qint64 &id) const + { + auto grandParentId = mParents.value(id, 0); + auto row = mTree.value(grandParentId).indexOf(id); + return createIndex(row, 0, id); + } + QModelIndex parent(const QModelIndex &index) const { auto id = getIdentifier(index); auto parentId = mParents.value(id); - auto grandParentId = mParents.value(parentId, 0); - auto row = mTree.value(grandParentId).indexOf(parentId); - return createIndex(row, 0, parentId); + return createIndexFromId(parentId); } bool canFetchMore(const QModelIndex &parent) const @@ -98,83 +102,92 @@ public: fetchEntities(parent); } + qint64 parentId(const Ptr &value) + { + return qHash(value->getProperty("parent").toByteArray()); + } + + void add(const Ptr &value) + { + auto childId = qHash(value->identifier()); + auto id = parentId(value); + auto parent = createIndexFromId(id); + qDebug() << "Added entity " << childId; + const auto keys = mTree[id]; + int index = 0; + for (; index < keys.size(); index++) { + if (childId < keys.at(index)) { + break; + } + } + beginInsertRows(parent, index, index); + mEntities.insert(childId, value); + mTree[id].insert(index, childId); + mParents.insert(childId, id); + endInsertRows(); + } + + void modify(const Ptr &value) + { + auto childId = qHash(value->identifier()); + auto id = parentId(value); + auto parent = createIndexFromId(id); + qDebug() << "Modified entity" << childId; + auto i = mTree[id].indexOf(childId); + mEntities.remove(childId); + mEntities.insert(childId, value); + //TODO check for change of parents + auto idx = index(i, 0, parent); + emit dataChanged(idx, idx); + } + + void remove(const Ptr &value) + { + auto childId = qHash(value->identifier()); + auto id = parentId(value); + auto parent = createIndexFromId(id); + qDebug() << "Removed entity" << childId; + auto index = mTree[id].indexOf(qHash(value->identifier())); + beginRemoveRows(parent, index, index); + mEntities.remove(childId); + mTree[id].removeAll(childId); + mParents.remove(childId); + //TODO remove children + endRemoveRows(); + } + void fetchEntities(const QModelIndex &parent) { qDebug() << "Fetching entities"; const auto id = getIdentifier(parent); - // beginResetModel(); - // mEntities.remove(id); mEntityChildrenFetched[id] = true; - auto query = mQuery; + QByteArray parentIdentifier; if (!parent.isValid()) { qDebug() << "no parent"; - query.propertyFilter.insert("parent", QByteArray()); } else { qDebug() << "parent is valid"; - auto object = parent.data(DomainObjectRole).template value(); + auto object = parent.data(DomainObjectRole).template value(); Q_ASSERT(object); - query.propertyFilter.insert("parent", object->identifier()); + parentIdentifier = object->identifier(); } - auto emitter = Akonadi2::Store::load(query); - emitter->onAdded([this, id, parent](const typename T::Ptr &value) { - auto childId = qHash(value->identifier()); - qDebug() << "Added entity " << childId; - const auto keys = mTree[id]; - int index = 0; - for (; index < keys.size(); index++) { - if (childId < keys.at(index)) { - break; - } - } - beginInsertRows(parent, index, index); - mEntities.insert(childId, value); - mTree[id].insert(index, childId); - mParents.insert(childId, id); - endInsertRows(); - }); - emitter->onModified([this, id, parent](const typename T::Ptr &value) { - auto childId = qHash(value->identifier()); - qDebug() << "Modified entity" << childId; - auto i = mTree[id].indexOf(childId); - mEntities.remove(childId); - mEntities.insert(childId, value); - //TODO check for change of parents - auto idx = index(i, 0, parent); - emit dataChanged(idx, idx); - }); - emitter->onRemoved([this, id, parent](const typename T::Ptr &value) { - auto childId = qHash(value->identifier()); - qDebug() << "Removed entity" << childId; - auto index = mTree[id].indexOf(qHash(value->identifier())); - beginRemoveRows(parent, index, index); - mEntities.remove(childId); - mTree[id].removeAll(childId); - mParents.remove(childId); - //TODO remove children - endRemoveRows(); - }); - emitter->onInitialResultSetComplete([this]() { - }); - emitter->onComplete([this, id]() { - mEmitter[id].clear(); - }); - emitter->onClear([this]() { - // beginResetModel(); - // mEntities.clear(); - // endResetModel(); - }); - mEmitter.insert(id, emitter); - // endResetModel(); + Trace() << "Loading entities"; + loadEntities(parentIdentifier); + } + + void setFetcher(const std::function &fetcher) + { + Trace() << "Setting fetcher"; + loadEntities = fetcher; } private: - QMap >> mEmitter; //TODO we should be able to directly use T as index, with an appropriate hash function, and thus have a QMap and QList - QMap mEntities; + QMap mEntities; QMap /* child entity id*/> mTree; QMap mParents; QMap mEntityChildrenFetched; QList mPropertyColumns; Akonadi2::Query mQuery; + std::function loadEntities; }; diff --git a/common/resourcefacade.cpp b/common/resourcefacade.cpp index 54185f8..0b7c5a3 100644 --- a/common/resourcefacade.cpp +++ b/common/resourcefacade.cpp @@ -54,7 +54,7 @@ KAsync::Job ResourceFacade::remove(const Akonadi2::ApplicationDomain::Akon }); } -KAsync::Job ResourceFacade::load(const Akonadi2::Query &query, const QSharedPointer > &resultProvider) +KAsync::Job ResourceFacade::load(const Akonadi2::Query &query, const QSharedPointer > &resultProvider) { return KAsync::start([query, resultProvider]() { const auto configuredResources = ResourceConfig::getResources(); diff --git a/common/resourcefacade.h b/common/resourcefacade.h index 437ff75..850d380 100644 --- a/common/resourcefacade.h +++ b/common/resourcefacade.h @@ -37,5 +37,5 @@ public: KAsync::Job create(const Akonadi2::ApplicationDomain::AkonadiResource &resource) Q_DECL_OVERRIDE; KAsync::Job modify(const Akonadi2::ApplicationDomain::AkonadiResource &resource) Q_DECL_OVERRIDE; KAsync::Job remove(const Akonadi2::ApplicationDomain::AkonadiResource &resource) Q_DECL_OVERRIDE; - KAsync::Job load(const Akonadi2::Query &query, const QSharedPointer > &resultProvider) Q_DECL_OVERRIDE; + KAsync::Job load(const Akonadi2::Query &query, const QSharedPointer > &resultProvider) Q_DECL_OVERRIDE; }; diff --git a/common/resultprovider.h b/common/resultprovider.h index bc03152..43d21a4 100644 --- a/common/resultprovider.h +++ b/common/resultprovider.h @@ -34,11 +34,312 @@ namespace Akonadi2 { template class ResultEmitter; +template +class ResultProviderInterface +{ +public: + ResultProviderInterface() + : mRevision(0) + { + + } + + virtual void add(const T &value) = 0; + virtual void modify(const T &value) = 0; + virtual void remove(const T &value) = 0; + virtual void initialResultSetComplete() = 0; + virtual void complete() = 0; + virtual void clear() = 0; + virtual void setFetcher(const std::function &fetcher) + { + } + + virtual void setFacade(const std::shared_ptr &facade) = 0; + virtual void setQueryRunner(const QSharedPointer &runner) = 0; + + void setRevision(qint64 revision) + { + mRevision = revision; + } + + qint64 revision() const + { + return mRevision; + } + +private: + qint64 mRevision; +}; + +template +class ModelResultProvider : public ResultProviderInterface { +public: + ModelResultProvider(QWeakPointer > model) + : ResultProviderInterface(), + mModel(model) + { + + } + + void add(const Ptr &value) + { + if (auto model = mModel.toStrongRef()) { + model->add(value); + } + } + + void modify(const Ptr &value) + { + if (auto model = mModel.toStrongRef()) { + model->modify(value); + } + } + + void remove(const Ptr &value) + { + if (auto model = mModel.toStrongRef()) { + model->remove(value); + } + } + + void initialResultSetComplete() + { + // mResultEmitter->initialResultSetComplete(); + } + + void complete() + { + // mResultEmitter->complete(); + } + + void clear() + { + // mResultEmitter->clear(); + } + + // QSharedPointer > emitter() + // { + // if (!mResultEmitter) { + // //We have to go over a separate var and return that, otherwise we'd delete the emitter immediately again + // auto sharedPtr = QSharedPointer >(new ResultEmitter, [this](ResultEmitter *emitter){ done(); delete emitter; }); + // mResultEmitter = sharedPtr; + // return sharedPtr; + // } + // + // return mResultEmitter.toStrongRef(); + // } + + /** + * For lifetimemanagement only. + * We keep the runner alive as long as the result provider exists. + */ + void setFacade(const std::shared_ptr &facade) + { + mFacade = facade; + } + + void onDone(const std::function &callback) + { + mOnDoneCallback = callback; + } + + bool isDone() const + { + //The existance of the emitter currently defines wether we're done or not. + // return mResultEmitter.toStrongRef().isNull(); + return true; + } + + void setFetcher(const std::function &fetcher) + { + if (auto model = mModel.toStrongRef()) { + model->setFetcher(fetcher); + } + } + + void setQueryRunner(const QSharedPointer &runner) + { + mQueryRunner = runner; + } + + // qint64 fetch(const ResultSet &resultSet) + // { + // //Fetch a bunch + // // + // // Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier); + // // storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) { + // // Warning() << "Error during query: " << error.store << error.message; + // // }); + // // + // // auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly); + // + // // Log() << "Querying" << baseRevision << Akonadi2::Storage::maxRevision(transaction); + // // auto resultSet = getResultSet(query, transaction, baseRevision); + // while (resultSet.next([this](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value, Akonadi2::Operation operation) -> bool { + // switch (operation) { + // case Akonadi2::Operation_Creation: + // Trace() << "Got creation"; + // //TODO Only copy in result provider + // add(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); + // break; + // case Akonadi2::Operation_Modification: + // Trace() << "Got modification"; + // modify(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); + // break; + // case Akonadi2::Operation_Removal: + // Trace() << "Got removal"; + // remove(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); + // break; + // } + // return true; + // })){}; + // // return Akonadi2::Storage::maxRevision(transaction); + // } + +private: + void done() + { + qWarning() << "done"; + if (mOnDoneCallback) { + mOnDoneCallback(); + mOnDoneCallback = std::function(); + } + } + + QWeakPointer > mModel; + QSharedPointer mQueryRunner; + std::shared_ptr mFacade; + std::function mOnDoneCallback; +}; + + + + + + +template +class SyncResultProvider : public ResultProviderInterface { +public: + void add(const T &value) + { + mResultEmitter->addHandler(value); + } + + void modify(const T &value) + { + mResultEmitter->modifyHandler(value); + } + + void remove(const T &value) + { + mResultEmitter->removeHandler(value); + } + + void initialResultSetComplete() + { + mResultEmitter->initialResultSetComplete(); + } + + void complete() + { + mResultEmitter->complete(); + } + + void clear() + { + mResultEmitter->clear(); + } + + QSharedPointer > emitter() + { + if (!mResultEmitter) { + //We have to go over a separate var and return that, otherwise we'd delete the emitter immediately again + auto sharedPtr = QSharedPointer >(new ResultEmitter, [this](ResultEmitter *emitter){ done(); delete emitter; }); + mResultEmitter = sharedPtr; + return sharedPtr; + } + + return mResultEmitter.toStrongRef(); + } + + /** + * For lifetimemanagement only. + * We keep the runner alive as long as the result provider exists. + */ + void setFacade(const std::shared_ptr &facade) + { + mFacade = facade; + } + + void onDone(const std::function &callback) + { + mOnDoneCallback = callback; + } + + bool isDone() const + { + //The existance of the emitter currently defines wether we're done or not. + return mResultEmitter.toStrongRef().isNull(); + } + + // qint64 fetch(const ResultSet &resultSet) + // { + // //Fetch a bunch + // // + // // Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier); + // // storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) { + // // Warning() << "Error during query: " << error.store << error.message; + // // }); + // // + // // auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly); + // + // // Log() << "Querying" << baseRevision << Akonadi2::Storage::maxRevision(transaction); + // // auto resultSet = getResultSet(query, transaction, baseRevision); + // while (resultSet.next([this](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value, Akonadi2::Operation operation) -> bool { + // switch (operation) { + // case Akonadi2::Operation_Creation: + // Trace() << "Got creation"; + // //TODO Only copy in result provider + // add(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); + // break; + // case Akonadi2::Operation_Modification: + // Trace() << "Got modification"; + // modify(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); + // break; + // case Akonadi2::Operation_Removal: + // Trace() << "Got removal"; + // remove(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); + // break; + // } + // return true; + // })){}; + // // return Akonadi2::Storage::maxRevision(transaction); + // } + +private: + void done() + { + qWarning() << "done"; + if (mOnDoneCallback) { + mOnDoneCallback(); + mOnDoneCallback = std::function(); + } + } + + QWeakPointer > mResultEmitter; + std::shared_ptr mFacade; + std::function mOnDoneCallback; + QSharedPointer mThreadBoundary; +}; + + + + /* * The promise side for the result emitter */ template -class ResultProvider { +class ResultProvider : public ResultProviderInterface { private: void callInMainThreadOnEmitter(void (ResultEmitter::*f)()) { -- cgit v1.2.3 From 75c231f0758603120ec562af772b48b5f6ac0e24 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Fri, 13 Nov 2015 23:31:41 +0100 Subject: DummyResourceTest and QueryTest are passing sync has been removed from the query code and is now a separate step --- common/clientapi.cpp | 2 +- common/facade.h | 147 +++++++++++++++---------------------- common/modelresult.h | 8 ++ common/resultprovider.h | 189 ++++-------------------------------------------- 4 files changed, 83 insertions(+), 263 deletions(-) (limited to 'common') diff --git a/common/clientapi.cpp b/common/clientapi.cpp index f99ebb8..839e77b 100644 --- a/common/clientapi.cpp +++ b/common/clientapi.cpp @@ -95,7 +95,7 @@ KAsync::Job Store::synchronize(const Akonadi2::Query &query) .template each([query](const QByteArray &resource, KAsync::Future &future) { auto resourceAccess = QSharedPointer::create(resource); resourceAccess->open(); - resourceAccess->synchronizeResource(true, false).then([&future]() { + resourceAccess->synchronizeResource(query.syncOnDemand, query.processAll).then([&future, resourceAccess]() { future.setFinished(); }).exec(); }) diff --git a/common/facade.h b/common/facade.h index eb55c98..5be1c73 100644 --- a/common/facade.h +++ b/common/facade.h @@ -44,19 +44,15 @@ class QueryRunner : public QObject { Q_OBJECT public: - typedef std::function(qint64 oldRevision)> QueryFunction; + typedef std::function()> QueryFunction; - QueryRunner(const Akonadi2::Query &query) : mLatestRevision(0) {}; + QueryRunner(const Akonadi2::Query &query) {}; /** * Starts query */ KAsync::Job run(qint64 newRevision = 0) { - //TODO: JOBAPI: that last empty .then should not be necessary - //TODO: remove newRevision - return queryFunction(mLatestRevision + 1).then([this](qint64 revision) { - mLatestRevision = revision; - }).then([](){}); + return queryFunction(); } /** @@ -74,12 +70,11 @@ public slots: void revisionChanged(qint64 newRevision) { Trace() << "New revision: " << newRevision; - run(newRevision).exec(); + run().exec(); } private: QueryFunction queryFunction; - qint64 mLatestRevision; }; static inline ResultSet fullScan(const Akonadi2::Storage::Transaction &transaction, const QByteArray &bufferType) @@ -125,10 +120,9 @@ public: * @param resourceIdentifier is the identifier of the resource instance * @param adaptorFactory is the adaptor factory used to generate the mappings from domain to resource types and vice versa */ - GenericFacade(const QByteArray &resourceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &adaptorFactory = DomainTypeAdaptorFactoryInterface::Ptr(), const QSharedPointer > storage = QSharedPointer >(), const QSharedPointer resourceAccess = QSharedPointer()) + GenericFacade(const QByteArray &resourceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &adaptorFactory = DomainTypeAdaptorFactoryInterface::Ptr(), const QSharedPointer resourceAccess = QSharedPointer()) : Akonadi2::StoreFacade(), mResourceAccess(resourceAccess), - mStorage(storage), mDomainTypeAdaptorFactory(adaptorFactory), mResourceInstanceIdentifier(resourceIdentifier) { @@ -177,48 +171,28 @@ public: //TODO JOBAPI return job from sync continuation to execute it as subjob? KAsync::Job load(const Akonadi2::Query &query, const QSharedPointer > &resultProvider) Q_DECL_OVERRIDE { - { - QSet remainingFilters; - auto filter = [remainingFilters, query](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool { - for (const auto &filterProperty : remainingFilters) { - //TODO implement other comparison operators than equality - if (domainObject->getProperty(filterProperty) != query.propertyFilter.value(filterProperty)) { - return false; - } - } - return true; - }; - - auto fetchEntities = [this, query, resultProvider, filter](const QByteArray &parent) { - Trace() << "Running fetchEntities" << parent; - Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier); - storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) { - Warning() << "Error during query: " << error.store << error.message; - }); + auto fetchEntities = [this, query, resultProvider](const QByteArray &parent) { + Trace() << "Fetching initial set for parent:" << parent; - auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly); + Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier); + storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) { + Warning() << "Error during query: " << error.store << error.message; + }); - auto modifiedQuery = query; - modifiedQuery.propertyFilter.insert("parent", parent); - //TODO - QSet appliedFilters; - auto resultSet = Akonadi2::ApplicationDomain::TypeImplementation::queryIndexes(modifiedQuery, mResourceInstanceIdentifier, appliedFilters, transaction); - QSet remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters; + auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly); - //We do a full scan if there were no indexes available to create the initial set. - if (appliedFilters.isEmpty()) { - //TODO this should be replaced by an index lookup as well - resultSet = fullScan(transaction, bufferTypeForDomainType()); - } - auto filteredSet = filterSet(resultSet, filter, transaction, true); - replaySet(filteredSet, resultProvider); - resultProvider->setRevision(Akonadi2::Storage::maxRevision(transaction)); - qint64 newRevision = Akonadi2::Storage::maxRevision(transaction); - //TODO send newRevision to resource - // mResourceAccess->sendRevisionReplayedCommand(newRevision); - }; - resultProvider->setFetcher(fetchEntities); - } + auto modifiedQuery = query; + modifiedQuery.propertyFilter.insert("parent", parent); + + QSet remainingFilters; + auto resultSet = loadInitialResultSet(parent, modifiedQuery, transaction, remainingFilters); + auto filteredSet = filterSet(resultSet, getFilter(remainingFilters, query), transaction, true); + replaySet(filteredSet, resultProvider); + const qint64 newRevision = Akonadi2::Storage::maxRevision(transaction); + resultProvider->setRevision(newRevision); + mResourceAccess->sendRevisionReplayedCommand(newRevision); + }; + resultProvider->setFetcher(fetchEntities); auto runner = QSharedPointer::create(query); QWeakPointer > weakResultProvider = resultProvider; @@ -233,8 +207,6 @@ public: return; } executeQuery(query, resultProvider).template then([&future, this](qint64 queriedRevision) { - //TODO set revision in result provider? - //TODO update all existing results with new revision mResourceAccess->sendRevisionReplayedCommand(queriedRevision); future.setFinished(); }).exec(); @@ -249,27 +221,12 @@ public: mResourceAccess->open(); QObject::connect(mResourceAccess.data(), &Akonadi2::ResourceAccess::revisionChanged, runner.data(), &QueryRunner::revisionChanged); } + return KAsync::null(); //We have to capture the runner to keep it alive - return synchronizeResource(query).template then([runner](KAsync::Future &future) { - future.setFinished(); - }, - [](int error, const QString &errorString) { - Warning() << "Error during sync " << error << errorString; - }); } private: - KAsync::Job synchronizeResource(const Akonadi2::Query &query) - { - //TODO check if a sync is necessary - //TODO Only sync what was requested - //TODO timeout - if (query.syncOnDemand || query.processAll) { - return mResourceAccess->synchronizeResource(query.syncOnDemand, query.processAll); - } - return KAsync::null(); - } //TODO move into result provider? void replaySet(ResultSet &resultSet, const QSharedPointer > &resultProvider) @@ -280,17 +237,14 @@ private: Trace() << "Got creation"; //TODO Only copy in result provider resultProvider->add(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); - // modelResult->add(); break; case Akonadi2::Operation_Modification: Trace() << "Got modification"; resultProvider->modify(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); - // modelResult->modify(); break; case Akonadi2::Operation_Removal: Trace() << "Got removal"; resultProvider->remove(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); - // modelResult->remove(); break; } return true; @@ -320,13 +274,28 @@ private: }); } - ResultSet loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) + ResultSet loadInitialResultSet(const QByteArray &parent, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) { + Trace() << "Fetching initial set for parent:" << parent; + //TODO + QSet appliedFilters; + auto resultSet = Akonadi2::ApplicationDomain::TypeImplementation::queryIndexes(query, mResourceInstanceIdentifier, appliedFilters, transaction); + remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters; + + //We do a full scan if there were no indexes available to create the initial set. + if (appliedFilters.isEmpty()) { + //TODO this should be replaced by an index lookup as well + resultSet = fullScan(transaction, bufferTypeForDomainType()); + } + return resultSet; + } + ResultSet loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) + { + Trace() << "Loading incremental result set starting from revision: " << baseRevision; const auto bufferType = bufferTypeForDomainType(); auto revisionCounter = QSharedPointer::create(baseRevision); - //TODO apply filter from index - return ResultSet([bufferType, revisionCounter, &transaction, this]() -> QByteArray { + return ResultSet([bufferType, revisionCounter, &transaction]() -> QByteArray { const qint64 topRevision = Akonadi2::Storage::maxRevision(transaction); //Spit out the revision keys one by one. while (*revisionCounter <= topRevision) { @@ -354,7 +323,7 @@ private: //Read through the source values and return whatever matches the filter std::function)> generator = [this, resultSetPtr, &transaction, filter, initialQuery](std::function callback) -> bool { while (resultSetPtr->next()) { - //TODO only necessary if we actually want to filter or neew the operation type (but not a big deal if we do it always I guess) + //readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) readEntity(transaction, resultSetPtr->id(), [this, filter, callback, initialQuery](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Akonadi2::Operation operation) { //Always remove removals, they probably don't match due to non-available properties if (filter(domainObject) || operation == Akonadi2::Operation_Removal) { @@ -374,6 +343,20 @@ private: return ResultSet(generator); } + + std::function getFilter(const QSet remainingFilters, const Akonadi2::Query &query) + { + return [remainingFilters, query](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool { + for (const auto &filterProperty : remainingFilters) { + //TODO implement other comparison operators than equality + if (domainObject->getProperty(filterProperty) != query.propertyFilter.value(filterProperty)) { + return false; + } + } + return true; + }; + } + virtual KAsync::Job executeQuery(const Akonadi2::Query &query, const QSharedPointer > &resultProvider) { /* @@ -384,16 +367,6 @@ private: const qint64 baseRevision = resultProvider->revision() + 1; Trace() << "Running query " << baseRevision; QSet remainingFilters; - auto filter = [remainingFilters, query, baseRevision](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool { - for (const auto &filterProperty : remainingFilters) { - //TODO implement other comparison operators than equality - if (domainObject->getProperty(filterProperty) != query.propertyFilter.value(filterProperty)) { - return false; - } - } - return true; - }; - qint64 newRevision = 0; Trace() << "Fetching updates"; Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier); @@ -404,10 +377,10 @@ private: auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly); auto resultSet = loadIncrementalResultSet(baseRevision, query, transaction, remainingFilters); - auto filteredSet = filterSet(resultSet, filter, transaction, false); + auto filteredSet = filterSet(resultSet, getFilter(remainingFilters, query), transaction, false); replaySet(filteredSet, resultProvider); resultProvider->setRevision(Akonadi2::Storage::maxRevision(transaction)); - newRevision = Akonadi2::Storage::maxRevision(transaction); + qint64 newRevision = Akonadi2::Storage::maxRevision(transaction); return KAsync::start([=]() -> qint64 { return newRevision; diff --git a/common/modelresult.h b/common/modelresult.h index 756f4d6..eabb868 100644 --- a/common/modelresult.h +++ b/common/modelresult.h @@ -111,6 +111,10 @@ public: { auto childId = qHash(value->identifier()); auto id = parentId(value); + //Ignore updates we get before the initial fetch is done + if (!mEntityChildrenFetched[id]) { + return; + } auto parent = createIndexFromId(id); qDebug() << "Added entity " << childId; const auto keys = mTree[id]; @@ -131,6 +135,10 @@ public: { auto childId = qHash(value->identifier()); auto id = parentId(value); + //Ignore updates we get before the initial fetch is done + if (!mEntityChildrenFetched[id]) { + return; + } auto parent = createIndexFromId(id); qDebug() << "Modified entity" << childId; auto i = mTree[id].indexOf(childId); diff --git a/common/resultprovider.h b/common/resultprovider.h index 43d21a4..0d23127 100644 --- a/common/resultprovider.h +++ b/common/resultprovider.h @@ -23,6 +23,9 @@ #include #include #include "threadboundary.h" +#include "resultset.h" +#include "log.h" +#include "modelresult.h" using namespace async; @@ -117,18 +120,6 @@ public: // mResultEmitter->clear(); } - // QSharedPointer > emitter() - // { - // if (!mResultEmitter) { - // //We have to go over a separate var and return that, otherwise we'd delete the emitter immediately again - // auto sharedPtr = QSharedPointer >(new ResultEmitter, [this](ResultEmitter *emitter){ done(); delete emitter; }); - // mResultEmitter = sharedPtr; - // return sharedPtr; - // } - // - // return mResultEmitter.toStrongRef(); - // } - /** * For lifetimemanagement only. * We keep the runner alive as long as the result provider exists. @@ -162,40 +153,6 @@ public: mQueryRunner = runner; } - // qint64 fetch(const ResultSet &resultSet) - // { - // //Fetch a bunch - // // - // // Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier); - // // storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) { - // // Warning() << "Error during query: " << error.store << error.message; - // // }); - // // - // // auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly); - // - // // Log() << "Querying" << baseRevision << Akonadi2::Storage::maxRevision(transaction); - // // auto resultSet = getResultSet(query, transaction, baseRevision); - // while (resultSet.next([this](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value, Akonadi2::Operation operation) -> bool { - // switch (operation) { - // case Akonadi2::Operation_Creation: - // Trace() << "Got creation"; - // //TODO Only copy in result provider - // add(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); - // break; - // case Akonadi2::Operation_Modification: - // Trace() << "Got modification"; - // modify(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); - // break; - // case Akonadi2::Operation_Removal: - // Trace() << "Got removal"; - // remove(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); - // break; - // } - // return true; - // })){}; - // // return Akonadi2::Storage::maxRevision(transaction); - // } - private: void done() { @@ -212,129 +169,6 @@ private: std::function mOnDoneCallback; }; - - - - - -template -class SyncResultProvider : public ResultProviderInterface { -public: - void add(const T &value) - { - mResultEmitter->addHandler(value); - } - - void modify(const T &value) - { - mResultEmitter->modifyHandler(value); - } - - void remove(const T &value) - { - mResultEmitter->removeHandler(value); - } - - void initialResultSetComplete() - { - mResultEmitter->initialResultSetComplete(); - } - - void complete() - { - mResultEmitter->complete(); - } - - void clear() - { - mResultEmitter->clear(); - } - - QSharedPointer > emitter() - { - if (!mResultEmitter) { - //We have to go over a separate var and return that, otherwise we'd delete the emitter immediately again - auto sharedPtr = QSharedPointer >(new ResultEmitter, [this](ResultEmitter *emitter){ done(); delete emitter; }); - mResultEmitter = sharedPtr; - return sharedPtr; - } - - return mResultEmitter.toStrongRef(); - } - - /** - * For lifetimemanagement only. - * We keep the runner alive as long as the result provider exists. - */ - void setFacade(const std::shared_ptr &facade) - { - mFacade = facade; - } - - void onDone(const std::function &callback) - { - mOnDoneCallback = callback; - } - - bool isDone() const - { - //The existance of the emitter currently defines wether we're done or not. - return mResultEmitter.toStrongRef().isNull(); - } - - // qint64 fetch(const ResultSet &resultSet) - // { - // //Fetch a bunch - // // - // // Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier); - // // storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) { - // // Warning() << "Error during query: " << error.store << error.message; - // // }); - // // - // // auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly); - // - // // Log() << "Querying" << baseRevision << Akonadi2::Storage::maxRevision(transaction); - // // auto resultSet = getResultSet(query, transaction, baseRevision); - // while (resultSet.next([this](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value, Akonadi2::Operation operation) -> bool { - // switch (operation) { - // case Akonadi2::Operation_Creation: - // Trace() << "Got creation"; - // //TODO Only copy in result provider - // add(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); - // break; - // case Akonadi2::Operation_Modification: - // Trace() << "Got modification"; - // modify(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); - // break; - // case Akonadi2::Operation_Removal: - // Trace() << "Got removal"; - // remove(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); - // break; - // } - // return true; - // })){}; - // // return Akonadi2::Storage::maxRevision(transaction); - // } - -private: - void done() - { - qWarning() << "done"; - if (mOnDoneCallback) { - mOnDoneCallback(); - mOnDoneCallback = std::function(); - } - } - - QWeakPointer > mResultEmitter; - std::shared_ptr mFacade; - std::function mOnDoneCallback; - QSharedPointer mThreadBoundary; -}; - - - - /* * The promise side for the result emitter */ @@ -434,18 +268,18 @@ public: } /** - * For lifetimemanagement only. - * We keep the runner alive as long as the result provider exists. - */ + * For lifetimemanagement only. + * We keep the runner alive as long as the result provider exists. + */ void setQueryRunner(const QSharedPointer &runner) { mQueryRunner = runner; } /** - * For lifetimemanagement only. - * We keep the runner alive as long as the result provider exists. - */ + * For lifetimemanagement only. + * We keep the runner alive as long as the result provider exists. + */ void setFacade(const std::shared_ptr &facade) { mFacade = facade; @@ -463,6 +297,11 @@ public: return mResultEmitter.toStrongRef().isNull(); } + void setFetcher(const std::function &fetcher) + { + fetcher(QByteArray()); + } + private: void done() { -- cgit v1.2.3 From d4b10a3de396eebc6c815093e9e1725ece270e9e Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Sun, 15 Nov 2015 11:09:31 +0100 Subject: Working folder tree query --- common/domain/folder.cpp | 28 ++++++++++++++++++++++++++++ common/modelresult.h | 5 ++--- 2 files changed, 30 insertions(+), 3 deletions(-) (limited to 'common') diff --git a/common/domain/folder.cpp b/common/domain/folder.cpp index 50f73c2..82f6c1f 100644 --- a/common/domain/folder.cpp +++ b/common/domain/folder.cpp @@ -37,15 +37,43 @@ using namespace Akonadi2::ApplicationDomain; ResultSet TypeImplementation::queryIndexes(const Akonadi2::Query &query, const QByteArray &resourceInstanceIdentifier, QSet &appliedFilters, Akonadi2::Storage::Transaction &transaction) { QVector keys; + if (query.propertyFilter.contains("parent")) { + Index index("folder.index.parent", transaction); + auto lookupKey = query.propertyFilter.value("parent").toByteArray(); + if (lookupKey.isEmpty()) { + lookupKey = "toplevel"; + } + index.lookup(lookupKey, [&](const QByteArray &value) { + keys << value; + }, + [](const Index::Error &error) { + Warning() << "Error in uid index: " << error.message; + }); + appliedFilters << "parent"; + } + Trace() << "Index lookup found " << keys.size() << " keys."; return ResultSet(keys); } void TypeImplementation::index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Akonadi2::Storage::Transaction &transaction) { + const auto parent = bufferAdaptor.getProperty("parent"); + Trace() << "indexing " << identifier << " with parent " << parent.toByteArray(); + if (parent.isValid()) { + Index("folder.index.parent", transaction).add(parent.toByteArray(), identifier); + } else { + Index("folder.index.parent", transaction).add("toplevel", identifier); + } } void TypeImplementation::removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Akonadi2::Storage::Transaction &transaction) { + const auto parent = bufferAdaptor.getProperty("parent"); + if (parent.isValid()) { + Index("folder.index.parent", transaction).remove(parent.toByteArray(), identifier); + } else { + Index("folder.index.parent", transaction).remove("toplevel", identifier); + } } QSharedPointer::Buffer> > TypeImplementation::initializeReadPropertyMapper() diff --git a/common/modelresult.h b/common/modelresult.h index eabb868..8ca6daa 100644 --- a/common/modelresult.h +++ b/common/modelresult.h @@ -116,7 +116,7 @@ public: return; } auto parent = createIndexFromId(id); - qDebug() << "Added entity " << childId; + qDebug() << "Added entity " << childId << value->identifier(); const auto keys = mTree[id]; int index = 0; for (; index < keys.size(); index++) { @@ -166,7 +166,6 @@ public: void fetchEntities(const QModelIndex &parent) { - qDebug() << "Fetching entities"; const auto id = getIdentifier(parent); mEntityChildrenFetched[id] = true; QByteArray parentIdentifier; @@ -178,7 +177,7 @@ public: Q_ASSERT(object); parentIdentifier = object->identifier(); } - Trace() << "Loading entities"; + Trace() << "Loading child entities of: " << parentIdentifier; loadEntities(parentIdentifier); } -- cgit v1.2.3 From 972f3a4e96876e4c36162a11062e40863d88a2a1 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Sun, 15 Nov 2015 12:46:26 +0100 Subject: Cleanup --- common/CMakeLists.txt | 1 - common/entitystorage.cpp | 74 --------------------------- common/entitystorage.h | 126 ---------------------------------------------- common/facade.h | 3 +- common/resourceaccess.cpp | 2 +- 5 files changed, 3 insertions(+), 203 deletions(-) delete mode 100644 common/entitystorage.cpp delete mode 100644 common/entitystorage.h (limited to 'common') diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index f24ec46..bdb9eac 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt @@ -15,7 +15,6 @@ set(command_SRCS definitions.cpp log.cpp entitybuffer.cpp - entitystorage.cpp clientapi.cpp facadefactory.cpp commands.cpp diff --git a/common/entitystorage.cpp b/common/entitystorage.cpp deleted file mode 100644 index 5d4df9f..0000000 --- a/common/entitystorage.cpp +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Copyright (C) 2014 Christian Mollekopf - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 2 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the - * Free Software Foundation, Inc., - * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - */ - -#include "entitystorage.h" - -ResultSet EntityStorageBase::filteredSet(const ResultSet &resultSet, const std::function &filter, const Akonadi2::Storage::Transaction &transaction, bool initialQuery) -{ - auto resultSetPtr = QSharedPointer::create(resultSet); - - //Read through the source values and return whatever matches the filter - std::function)> generator = [this, resultSetPtr, &transaction, filter, initialQuery](std::function callback) -> bool { - while (resultSetPtr->next()) { - readEntity(transaction, resultSetPtr->id(), [this, filter, callback, initialQuery](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Akonadi2::Operation operation) { - //Always remove removals, they probably don't match due to non-available properties - if (filter(domainObject) || operation == Akonadi2::Operation_Removal) { - if (initialQuery) { - //We're not interested in removals during the initial query - if (operation != Akonadi2::Operation_Removal) { - callback(domainObject, Akonadi2::Operation_Creation); - } - } else { - callback(domainObject, operation); - } - } - }); - } - return false; - }; - return ResultSet(generator); -} - - -ResultSet EntityStorageBase::getResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, qint64 baseRevision) -{ - QSet remainingFilters = query.propertyFilter.keys().toSet(); - ResultSet resultSet; - const bool initialQuery = (baseRevision == 1); - if (initialQuery) { - Trace() << "Initial result set update"; - resultSet = loadInitialResultSet(query, transaction, remainingFilters); - } else { - //TODO fallback in case the old revision is no longer available to clear + redo complete initial scan - Trace() << "Incremental result set update" << baseRevision; - resultSet = loadIncrementalResultSet(baseRevision, query, transaction, remainingFilters); - } - - auto filter = [remainingFilters, query, baseRevision](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool { - for (const auto &filterProperty : remainingFilters) { - //TODO implement other comparison operators than equality - if (domainObject->getProperty(filterProperty) != query.propertyFilter.value(filterProperty)) { - return false; - } - } - return true; - }; - - return filteredSet(resultSet, filter, transaction, initialQuery); -} diff --git a/common/entitystorage.h b/common/entitystorage.h deleted file mode 100644 index 8e73083..0000000 --- a/common/entitystorage.h +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Copyright (C) 2014 Christian Mollekopf - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 2 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the - * Free Software Foundation, Inc., - * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - */ -#pragma once - -#include - -#include "query.h" -#include "domainadaptor.h" -#include "entitybuffer.h" -#include "log.h" -#include "storage.h" -#include "resultset.h" -#include "resultprovider.h" -#include "definitions.h" - -/** - * Wraps storage, entity adaptor factory and indexes into one. - * - */ -class EntityStorageBase -{ -public: - typedef std::function &remainingFilters)> InitialResultLoader; - typedef std::function &remainingFilters)> IncrementalResultLoader; - typedef std::function &resultCallback)> EntityReader; - - /** - * Returns the initial result set that still needs to be filtered. - * - * To make this efficient indexes should be chosen that are as selective as possible. - */ - InitialResultLoader loadInitialResultSet; - /** - * Returns the incremental result set that still needs to be filtered. - */ - IncrementalResultLoader loadIncrementalResultSet; - - /** - * Loads a single entity by uid from storage. - */ - EntityReader readEntity; - -protected: - EntityStorageBase(const QByteArray &instanceIdentifier) - : mResourceInstanceIdentifier(instanceIdentifier) - { - - } - - virtual Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr copy(const Akonadi2::ApplicationDomain::ApplicationDomainType &) = 0; - - ResultSet getResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, qint64 baseRevision); - - QByteArray mResourceInstanceIdentifier; - -private: - ResultSet filteredSet(const ResultSet &resultSet, const std::function &filter, const Akonadi2::Storage::Transaction &transaction, bool isInitialQuery); -}; - -template -class EntityStorage : public EntityStorageBase -{ - -public: - - EntityStorage(const QByteArray &instanceIdentifier) - : EntityStorageBase(instanceIdentifier) - { - } - -protected: - Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr copy(const Akonadi2::ApplicationDomain::ApplicationDomainType &object) Q_DECL_OVERRIDE - { - return Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(object); - } - -public: - - virtual qint64 read(const Akonadi2::Query &query, qint64 baseRevision, const QSharedPointer > &resultProvider) - { - Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier); - storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) { - Warning() << "Error during query: " << error.store << error.message; - }); - - auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly); - - Log() << "Querying" << baseRevision << Akonadi2::Storage::maxRevision(transaction); - auto resultSet = getResultSet(query, transaction, baseRevision); - while(resultSet.next([this, resultProvider](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value, Akonadi2::Operation operation) -> bool { - switch (operation) { - case Akonadi2::Operation_Creation: - Trace() << "Got creation"; - resultProvider->add(copy(*value).template staticCast()); - break; - case Akonadi2::Operation_Modification: - Trace() << "Got modification"; - resultProvider->modify(copy(*value).template staticCast()); - break; - case Akonadi2::Operation_Removal: - Trace() << "Got removal"; - resultProvider->remove(copy(*value).template staticCast()); - break; - } - return true; - })){}; - return Akonadi2::Storage::maxRevision(transaction); - } - -}; diff --git a/common/facade.h b/common/facade.h index 5be1c73..6e45e08 100644 --- a/common/facade.h +++ b/common/facade.h @@ -29,7 +29,8 @@ #include "domainadaptor.h" #include "log.h" #include "resultset.h" -#include "entitystorage.h" +#include "storage.h" +#include "definitions.h" /** * A QueryRunner runs a query and updates the corresponding result set. diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp index bd9e2c9..88f785f 100644 --- a/common/resourceaccess.cpp +++ b/common/resourceaccess.cpp @@ -340,7 +340,7 @@ KAsync::Job ResourceAccess::sendRevisionReplayedCommand(qint64 revision) void ResourceAccess::open() { if (d->socket && d->socket->isValid()) { - log("Socket valid, so not opening again"); + Trace() << "Socket valid, so not opening again"; return; } if (d->openingSocket) { -- cgit v1.2.3 From b68a67fdbe0eb73aaef648ceb686824c7fbc1552 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Tue, 17 Nov 2015 09:43:36 +0100 Subject: Facade cleanup --- common/facade.h | 91 ++++++++++++++++++++++++++------------------------------- 1 file changed, 41 insertions(+), 50 deletions(-) (limited to 'common') diff --git a/common/facade.h b/common/facade.h index 6e45e08..dcbe589 100644 --- a/common/facade.h +++ b/common/facade.h @@ -172,45 +172,31 @@ public: //TODO JOBAPI return job from sync continuation to execute it as subjob? KAsync::Job load(const Akonadi2::Query &query, const QSharedPointer > &resultProvider) Q_DECL_OVERRIDE { - auto fetchEntities = [this, query, resultProvider](const QByteArray &parent) { - Trace() << "Fetching initial set for parent:" << parent; - - Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier); - storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) { - Warning() << "Error during query: " << error.store << error.message; - }); - - auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly); - - auto modifiedQuery = query; - modifiedQuery.propertyFilter.insert("parent", parent); + QWeakPointer > weakResultProvider = resultProvider; - QSet remainingFilters; - auto resultSet = loadInitialResultSet(parent, modifiedQuery, transaction, remainingFilters); - auto filteredSet = filterSet(resultSet, getFilter(remainingFilters, query), transaction, true); - replaySet(filteredSet, resultProvider); - const qint64 newRevision = Akonadi2::Storage::maxRevision(transaction); - resultProvider->setRevision(newRevision); - mResourceAccess->sendRevisionReplayedCommand(newRevision); - }; - resultProvider->setFetcher(fetchEntities); + //We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load. + resultProvider->setFetcher([this, query, weakResultProvider](const QByteArray &parent) { + if (auto resultProvider = weakResultProvider.toStrongRef()) { + const qint64 newRevision = executeInitialQuery(query, parent, resultProvider); + mResourceAccess->sendRevisionReplayedCommand(newRevision); + } else { + Warning() << "Tried executing query after result provider is already gone"; + } + }); auto runner = QSharedPointer::create(query); - QWeakPointer > weakResultProvider = resultProvider; + //Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting runner->setQuery([this, weakResultProvider, query] () -> KAsync::Job { return KAsync::start([this, weakResultProvider, query](KAsync::Future &future) { Trace() << "Executing query "; - auto resultProvider = weakResultProvider.toStrongRef(); - if (!resultProvider) { + if (auto resultProvider = weakResultProvider.toStrongRef()) { + const qint64 newRevision = executeIncrementalQuery(query, resultProvider); + mResourceAccess->sendRevisionReplayedCommand(newRevision); + } else { Warning() << "Tried executing query after result provider is already gone"; future.setError(0, QString()); - future.setFinished(); - return; } - executeQuery(query, resultProvider).template then([&future, this](qint64 queriedRevision) { - mResourceAccess->sendRevisionReplayedCommand(queriedRevision); - future.setFinished(); - }).exec(); + future.setFinished(); }); }); @@ -230,13 +216,12 @@ public: private: //TODO move into result provider? - void replaySet(ResultSet &resultSet, const QSharedPointer > &resultProvider) + static void replaySet(ResultSet &resultSet, const QSharedPointer > &resultProvider) { - while (resultSet.next([this, resultProvider](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value, Akonadi2::Operation operation) -> bool { + while (resultSet.next([resultProvider](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value, Akonadi2::Operation operation) -> bool { switch (operation) { case Akonadi2::Operation_Creation: Trace() << "Got creation"; - //TODO Only copy in result provider resultProvider->add(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); break; case Akonadi2::Operation_Modification: @@ -358,34 +343,40 @@ private: }; } - virtual KAsync::Job executeQuery(const Akonadi2::Query &query, const QSharedPointer > &resultProvider) + qint64 load(const Akonadi2::Query &query, const std::function &)> &baseSetRetriever, const QSharedPointer > &resultProvider) { - /* - * This method gets called initially, and after every revision change. - * * We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load. - * * Incremental updates are loaded directly, leaving it up to the model to discard the changes if they are not interesting - */ - const qint64 baseRevision = resultProvider->revision() + 1; - Trace() << "Running query " << baseRevision; - QSet remainingFilters; - - Trace() << "Fetching updates"; Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier); storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) { Warning() << "Error during query: " << error.store << error.message; }); - auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly); - auto resultSet = loadIncrementalResultSet(baseRevision, query, transaction, remainingFilters); + QSet remainingFilters; + auto resultSet = baseSetRetriever(transaction, remainingFilters); auto filteredSet = filterSet(resultSet, getFilter(remainingFilters, query), transaction, false); replaySet(filteredSet, resultProvider); resultProvider->setRevision(Akonadi2::Storage::maxRevision(transaction)); - qint64 newRevision = Akonadi2::Storage::maxRevision(transaction); + return Akonadi2::Storage::maxRevision(transaction); + } - return KAsync::start([=]() -> qint64 { - return newRevision; - }); + + qint64 executeIncrementalQuery(const Akonadi2::Query &query, const QSharedPointer > &resultProvider) + { + const qint64 baseRevision = resultProvider->revision() + 1; + Trace() << "Running incremental query " << baseRevision; + return load(query, [&](Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) -> ResultSet { + return loadIncrementalResultSet(baseRevision, query, transaction, remainingFilters); + }, resultProvider); + } + + qint64 executeInitialQuery(const Akonadi2::Query &query, const QByteArray &parent, const QSharedPointer > &resultProvider) + { + Trace() << "Running initial query for parent:" << parent; + auto modifiedQuery = query; + modifiedQuery.propertyFilter.insert("parent", parent); + return load(modifiedQuery, [&](Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) -> ResultSet { + return loadInitialResultSet(parent, modifiedQuery, transaction, remainingFilters); + }, resultProvider); } protected: -- cgit v1.2.3 From 0f24357d01bd8a278f03793db863d3f71ac37ef2 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Wed, 18 Nov 2015 00:51:55 +0100 Subject: Don't use a smart pointer for the result provider We're not doing any lifetime management anyways. --- common/clientapi.h | 14 ++++++--- common/facade.h | 77 +++++++++++++++++++++-------------------------- common/facadeinterface.h | 4 +-- common/resourcefacade.cpp | 10 +++--- common/resourcefacade.h | 2 +- 5 files changed, 52 insertions(+), 55 deletions(-) (limited to 'common') diff --git a/common/clientapi.h b/common/clientapi.h index a424424..707e81d 100644 --- a/common/clientapi.h +++ b/common/clientapi.h @@ -35,6 +35,8 @@ #include "facadefactory.h" #include "log.h" +Q_DECLARE_METATYPE(std::shared_ptr); + namespace async { //This should abstract if we execute from eventloop or in thread. //It supposed to allow the caller to finish the current method before executing the runner. @@ -75,9 +77,8 @@ public: // Query all resources and aggregate results KAsync::iterate(getResources(query.resources, ApplicationDomain::getTypeName())) .template each([query, resultSet](const QByteArray &resource, KAsync::Future &future) { - auto facade = FacadeFactory::instance().getFacade(resourceName(resource), resource); - if (facade) { - facade->load(query, resultSet).template then([&future](){future.setFinished();}).exec(); + if (auto facade = FacadeFactory::instance().getFacade(resourceName(resource), resource)) { + facade->load(query, *resultSet).template then([&future](){future.setFinished();}).exec(); //Keep the facade alive for the lifetime of the resultSet. resultSet->setFacade(facade); } else { @@ -106,15 +107,18 @@ public: static QSharedPointer loadModel(Query query) { auto model = QSharedPointer >::create(query, QList() << "summary" << "uid"); - auto resultProvider = QSharedPointer >::create(model); + auto resultProvider = std::make_shared >(model); + //Keep the resultprovider alive for as long as the model lives + model->setProperty("resultProvider", QVariant::fromValue(std::shared_ptr(resultProvider))); // Query all resources and aggregate results KAsync::iterate(getResources(query.resources, ApplicationDomain::getTypeName())) .template each([query, resultProvider](const QByteArray &resource, KAsync::Future &future) { auto facade = FacadeFactory::instance().getFacade(resourceName(resource), resource); if (facade) { - facade->load(query, resultProvider).template then([&future](){future.setFinished();}).exec(); + facade->load(query, *resultProvider).template then([&future](){future.setFinished();}).exec(); //Keep the facade alive for the lifetime of the resultSet. + //FIXME this would have to become a list resultProvider->setFacade(facade); } else { //Ignore the error and carry on diff --git a/common/facade.h b/common/facade.h index dcbe589..82fd5ff 100644 --- a/common/facade.h +++ b/common/facade.h @@ -169,68 +169,54 @@ public: return mResourceAccess->sendDeleteCommand(domainObject.identifier(), domainObject.revision(), bufferTypeForDomainType()); } - //TODO JOBAPI return job from sync continuation to execute it as subjob? - KAsync::Job load(const Akonadi2::Query &query, const QSharedPointer > &resultProvider) Q_DECL_OVERRIDE + KAsync::Job load(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface &resultProvider) Q_DECL_OVERRIDE { - QWeakPointer > weakResultProvider = resultProvider; - //We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load. - resultProvider->setFetcher([this, query, weakResultProvider](const QByteArray &parent) { - if (auto resultProvider = weakResultProvider.toStrongRef()) { - const qint64 newRevision = executeInitialQuery(query, parent, resultProvider); - mResourceAccess->sendRevisionReplayedCommand(newRevision); - } else { - Warning() << "Tried executing query after result provider is already gone"; - } + resultProvider.setFetcher([this, query, &resultProvider](const QByteArray &parent) { + const qint64 newRevision = executeInitialQuery(query, parent, resultProvider); + mResourceAccess->sendRevisionReplayedCommand(newRevision); }); - auto runner = QSharedPointer::create(query); - //Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting - runner->setQuery([this, weakResultProvider, query] () -> KAsync::Job { - return KAsync::start([this, weakResultProvider, query](KAsync::Future &future) { - Trace() << "Executing query "; - if (auto resultProvider = weakResultProvider.toStrongRef()) { - const qint64 newRevision = executeIncrementalQuery(query, resultProvider); - mResourceAccess->sendRevisionReplayedCommand(newRevision); - } else { - Warning() << "Tried executing query after result provider is already gone"; - future.setError(0, QString()); - } - future.setFinished(); - }); - }); //In case of a live query we keep the runner for as long alive as the result provider exists if (query.liveQuery) { - resultProvider->setQueryRunner(runner); + auto runner = QSharedPointer::create(query); + //Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting + runner->setQuery([this, query, &resultProvider] () -> KAsync::Job { + return KAsync::start([this, query, &resultProvider](KAsync::Future &future) { + Trace() << "Executing query "; + const qint64 newRevision = executeIncrementalQuery(query, resultProvider); + mResourceAccess->sendRevisionReplayedCommand(newRevision); + future.setFinished(); + }); + }); + resultProvider.setQueryRunner(runner); //Ensure the connection is open, if it wasn't already opened //TODO If we are not connected already, we have to check for the latest revision once connected, otherwise we could miss some updates mResourceAccess->open(); QObject::connect(mResourceAccess.data(), &Akonadi2::ResourceAccess::revisionChanged, runner.data(), &QueryRunner::revisionChanged); } return KAsync::null(); - - //We have to capture the runner to keep it alive } private: //TODO move into result provider? - static void replaySet(ResultSet &resultSet, const QSharedPointer > &resultProvider) + static void replaySet(ResultSet &resultSet, Akonadi2::ResultProviderInterface &resultProvider) { - while (resultSet.next([resultProvider](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value, Akonadi2::Operation operation) -> bool { + while (resultSet.next([&resultProvider](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value, Akonadi2::Operation operation) -> bool { switch (operation) { case Akonadi2::Operation_Creation: Trace() << "Got creation"; - resultProvider->add(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); + resultProvider.add(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); break; case Akonadi2::Operation_Modification: Trace() << "Got modification"; - resultProvider->modify(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); + resultProvider.modify(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); break; case Akonadi2::Operation_Removal: Trace() << "Got removal"; - resultProvider->remove(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); + resultProvider.remove(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); break; } return true; @@ -281,6 +267,7 @@ private: Trace() << "Loading incremental result set starting from revision: " << baseRevision; const auto bufferType = bufferTypeForDomainType(); auto revisionCounter = QSharedPointer::create(baseRevision); + remainingFilters = query.propertyFilter.keys().toSet(); return ResultSet([bufferType, revisionCounter, &transaction]() -> QByteArray { const qint64 topRevision = Akonadi2::Storage::maxRevision(transaction); //Spit out the revision keys one by one. @@ -334,16 +321,22 @@ private: { return [remainingFilters, query](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool { for (const auto &filterProperty : remainingFilters) { - //TODO implement other comparison operators than equality - if (domainObject->getProperty(filterProperty) != query.propertyFilter.value(filterProperty)) { - return false; + const auto property = domainObject->getProperty(filterProperty); + if (property.isValid()) { + //TODO implement other comparison operators than equality + if (property != query.propertyFilter.value(filterProperty)) { + Trace() << "Filtering entity due to property mismatch: " << domainObject->getProperty(filterProperty); + return false; + } + } else { + Warning() << "Ignored property filter because value is invalid: " << filterProperty; } } return true; }; } - qint64 load(const Akonadi2::Query &query, const std::function &)> &baseSetRetriever, const QSharedPointer > &resultProvider) + qint64 load(const Akonadi2::Query &query, const std::function &)> &baseSetRetriever, Akonadi2::ResultProviderInterface &resultProvider) { Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier); storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) { @@ -355,21 +348,21 @@ private: auto resultSet = baseSetRetriever(transaction, remainingFilters); auto filteredSet = filterSet(resultSet, getFilter(remainingFilters, query), transaction, false); replaySet(filteredSet, resultProvider); - resultProvider->setRevision(Akonadi2::Storage::maxRevision(transaction)); + resultProvider.setRevision(Akonadi2::Storage::maxRevision(transaction)); return Akonadi2::Storage::maxRevision(transaction); } - qint64 executeIncrementalQuery(const Akonadi2::Query &query, const QSharedPointer > &resultProvider) + qint64 executeIncrementalQuery(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface &resultProvider) { - const qint64 baseRevision = resultProvider->revision() + 1; + const qint64 baseRevision = resultProvider.revision() + 1; Trace() << "Running incremental query " << baseRevision; return load(query, [&](Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) -> ResultSet { return loadIncrementalResultSet(baseRevision, query, transaction, remainingFilters); }, resultProvider); } - qint64 executeInitialQuery(const Akonadi2::Query &query, const QByteArray &parent, const QSharedPointer > &resultProvider) + qint64 executeInitialQuery(const Akonadi2::Query &query, const QByteArray &parent, Akonadi2::ResultProviderInterface &resultProvider) { Trace() << "Running initial query for parent:" << parent; auto modifiedQuery = query; diff --git a/common/facadeinterface.h b/common/facadeinterface.h index 571a1e8..7ec21bc 100644 --- a/common/facadeinterface.h +++ b/common/facadeinterface.h @@ -45,7 +45,7 @@ public: virtual KAsync::Job create(const DomainType &domainObject) = 0; virtual KAsync::Job modify(const DomainType &domainObject) = 0; virtual KAsync::Job remove(const DomainType &domainObject) = 0; - virtual KAsync::Job load(const Query &query, const QSharedPointer > &resultProvider) = 0; + virtual KAsync::Job load(const Query &query, Akonadi2::ResultProviderInterface &resultProvider) = 0; }; template @@ -67,7 +67,7 @@ public: return KAsync::error(-1, "Failed to create a facade"); } - KAsync::Job load(const Query &query, const QSharedPointer > &resultProvider) + KAsync::Job load(const Query &query, Akonadi2::ResultProviderInterface &resultProvider) { return KAsync::error(-1, "Failed to create a facade"); } diff --git a/common/resourcefacade.cpp b/common/resourcefacade.cpp index 0b7c5a3..1796271 100644 --- a/common/resourcefacade.cpp +++ b/common/resourcefacade.cpp @@ -54,9 +54,9 @@ KAsync::Job ResourceFacade::remove(const Akonadi2::ApplicationDomain::Akon }); } -KAsync::Job ResourceFacade::load(const Akonadi2::Query &query, const QSharedPointer > &resultProvider) +KAsync::Job ResourceFacade::load(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface &resultProvider) { - return KAsync::start([query, resultProvider]() { + return KAsync::start([query, &resultProvider]() { const auto configuredResources = ResourceConfig::getResources(); for (const auto &res : configuredResources.keys()) { const auto type = configuredResources.value(res); @@ -64,12 +64,12 @@ KAsync::Job ResourceFacade::load(const Akonadi2::Query &query, const QShar auto resource = Akonadi2::ApplicationDomain::AkonadiResource::Ptr::create(); resource->setProperty("identifier", res); resource->setProperty("type", type); - resultProvider->add(resource); + resultProvider.add(resource); } } //TODO initialResultSetComplete should be implicit - resultProvider->initialResultSetComplete(); - resultProvider->complete(); + resultProvider.initialResultSetComplete(); + resultProvider.complete(); }); } diff --git a/common/resourcefacade.h b/common/resourcefacade.h index 850d380..123b481 100644 --- a/common/resourcefacade.h +++ b/common/resourcefacade.h @@ -37,5 +37,5 @@ public: KAsync::Job create(const Akonadi2::ApplicationDomain::AkonadiResource &resource) Q_DECL_OVERRIDE; KAsync::Job modify(const Akonadi2::ApplicationDomain::AkonadiResource &resource) Q_DECL_OVERRIDE; KAsync::Job remove(const Akonadi2::ApplicationDomain::AkonadiResource &resource) Q_DECL_OVERRIDE; - KAsync::Job load(const Akonadi2::Query &query, const QSharedPointer > &resultProvider) Q_DECL_OVERRIDE; + KAsync::Job load(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface &resultProvider) Q_DECL_OVERRIDE; }; -- cgit v1.2.3 From b42047ad90470ecab329375fdacff03564c80074 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Wed, 18 Nov 2015 23:15:25 +0100 Subject: fixup --- common/facade.h | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) (limited to 'common') diff --git a/common/facade.h b/common/facade.h index 82fd5ff..f5c05f9 100644 --- a/common/facade.h +++ b/common/facade.h @@ -246,10 +246,8 @@ private: }); } - ResultSet loadInitialResultSet(const QByteArray &parent, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) + ResultSet loadInitialResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) { - Trace() << "Fetching initial set for parent:" << parent; - //TODO QSet appliedFilters; auto resultSet = Akonadi2::ApplicationDomain::TypeImplementation::queryIndexes(query, mResourceInstanceIdentifier, appliedFilters, transaction); remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters; @@ -264,7 +262,6 @@ private: ResultSet loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) { - Trace() << "Loading incremental result set starting from revision: " << baseRevision; const auto bufferType = bufferTypeForDomainType(); auto revisionCounter = QSharedPointer::create(baseRevision); remainingFilters = query.propertyFilter.keys().toSet(); @@ -368,7 +365,7 @@ private: auto modifiedQuery = query; modifiedQuery.propertyFilter.insert("parent", parent); return load(modifiedQuery, [&](Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) -> ResultSet { - return loadInitialResultSet(parent, modifiedQuery, transaction, remainingFilters); + return loadInitialResultSet(modifiedQuery, transaction, remainingFilters); }, resultProvider); } -- cgit v1.2.3 From ef205affdb73bfdbef5830996e6336e583660fbc Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Thu, 19 Nov 2015 09:37:42 +0100 Subject: Use the new modelresult in the dummyclient --- common/clientapi.h | 4 ++++ common/modelresult.h | 11 +++++++++-- 2 files changed, 13 insertions(+), 2 deletions(-) (limited to 'common') diff --git a/common/clientapi.h b/common/clientapi.h index 707e81d..179bb5c 100644 --- a/common/clientapi.h +++ b/common/clientapi.h @@ -100,6 +100,10 @@ public: return resultSet->emitter(); } + enum Roles { + DomainObjectRole = Qt::UserRole + 1 //Must be the same as in ModelResult + }; + /** * Asynchronusly load a dataset with tree structure information */ diff --git a/common/modelresult.h b/common/modelresult.h index 8ca6daa..3b45955 100644 --- a/common/modelresult.h +++ b/common/modelresult.h @@ -63,11 +63,18 @@ public: virtual QVariant data(const QModelIndex &index, int role = Qt::DisplayRole) const { if (role == DomainObjectRole) { - qWarning() << "trying to get entity " << index.internalId(); Q_ASSERT(mEntities.contains(index.internalId())); return QVariant::fromValue(mEntities.value(index.internalId())); } - qDebug() << "Invalid role"; + if (role == Qt::DisplayRole) { + if (index.column() < mPropertyColumns.size()) { + Q_ASSERT(mEntities.contains(index.internalId())); + auto entity = mEntities.value(index.internalId()); + return entity->getProperty(mPropertyColumns.at(index.column())).toString(); + } else { + return "No data available"; + } + } return QVariant(); } -- cgit v1.2.3 From 8d5684292ef92f32487ba32df716a00c4a0841b5 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Thu, 19 Nov 2015 17:37:39 +0100 Subject: Loading data with the new model for the test client --- common/clientapi.h | 2 +- common/domain/folder.cpp | 1 + common/facade.h | 1 - common/modelresult.h | 6 +++++- 4 files changed, 7 insertions(+), 3 deletions(-) (limited to 'common') diff --git a/common/clientapi.h b/common/clientapi.h index 179bb5c..6b11ad5 100644 --- a/common/clientapi.h +++ b/common/clientapi.h @@ -110,7 +110,7 @@ public: template static QSharedPointer loadModel(Query query) { - auto model = QSharedPointer >::create(query, QList() << "summary" << "uid"); + auto model = QSharedPointer >::create(query, query.requestedProperties.toList()); auto resultProvider = std::make_shared >(model); //Keep the resultprovider alive for as long as the model lives model->setProperty("resultProvider", QVariant::fromValue(std::shared_ptr(resultProvider))); diff --git a/common/domain/folder.cpp b/common/domain/folder.cpp index 82f6c1f..989d2c4 100644 --- a/common/domain/folder.cpp +++ b/common/domain/folder.cpp @@ -60,6 +60,7 @@ void TypeImplementation::index(const QByteArray &identifier, const Buffe const auto parent = bufferAdaptor.getProperty("parent"); Trace() << "indexing " << identifier << " with parent " << parent.toByteArray(); if (parent.isValid()) { + Q_ASSERT(!parent.toByteArray().isEmpty()); Index("folder.index.parent", transaction).add(parent.toByteArray(), identifier); } else { Index("folder.index.parent", transaction).add("toplevel", identifier); diff --git a/common/facade.h b/common/facade.h index f5c05f9..d150d60 100644 --- a/common/facade.h +++ b/common/facade.h @@ -184,7 +184,6 @@ public: //Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting runner->setQuery([this, query, &resultProvider] () -> KAsync::Job { return KAsync::start([this, query, &resultProvider](KAsync::Future &future) { - Trace() << "Executing query "; const qint64 newRevision = executeIncrementalQuery(query, resultProvider); mResourceAccess->sendRevisionReplayedCommand(newRevision); future.setFinished(); diff --git a/common/modelresult.h b/common/modelresult.h index 3b45955..1675e60 100644 --- a/common/modelresult.h +++ b/common/modelresult.h @@ -131,7 +131,11 @@ public: break; } } - beginInsertRows(parent, index, index); + if (mEntities.contains(childId)) { + qWarning() << "Entity already in model " << value->identifier(); + return; + } + beginInsertRows(QModelIndex(), index, index); mEntities.insert(childId, value); mTree[id].insert(index, childId); mParents.insert(childId, id); -- cgit v1.2.3 From c4a6746e4420b580fe35cc89783de4dbc3205ac6 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Thu, 19 Nov 2015 18:14:09 +0100 Subject: The parent is always an object, so we might as well make that explicit --- common/facade.h | 13 +++++++++---- common/modelresult.h | 17 ++++------------- common/resultprovider.h | 8 ++++---- 3 files changed, 17 insertions(+), 21 deletions(-) (limited to 'common') diff --git a/common/facade.h b/common/facade.h index d150d60..8b8a2a8 100644 --- a/common/facade.h +++ b/common/facade.h @@ -172,7 +172,7 @@ public: KAsync::Job load(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface &resultProvider) Q_DECL_OVERRIDE { //We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load. - resultProvider.setFetcher([this, query, &resultProvider](const QByteArray &parent) { + resultProvider.setFetcher([this, query, &resultProvider](const typename DomainType::Ptr &parent) { const qint64 newRevision = executeInitialQuery(query, parent, resultProvider); mResourceAccess->sendRevisionReplayedCommand(newRevision); }); @@ -358,11 +358,16 @@ private: }, resultProvider); } - qint64 executeInitialQuery(const Akonadi2::Query &query, const QByteArray &parent, Akonadi2::ResultProviderInterface &resultProvider) + qint64 executeInitialQuery(const Akonadi2::Query &query, const typename DomainType::Ptr &parent, Akonadi2::ResultProviderInterface &resultProvider) { - Trace() << "Running initial query for parent:" << parent; auto modifiedQuery = query; - modifiedQuery.propertyFilter.insert("parent", parent); + if (parent) { + Trace() << "Running initial query for parent:" << parent->identifier(); + modifiedQuery.propertyFilter.insert("parent", parent->identifier()); + } else { + Trace() << "Running initial query for toplevel"; + modifiedQuery.propertyFilter.insert("parent", QVariant()); + } return load(modifiedQuery, [&](Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) -> ResultSet { return loadInitialResultSet(modifiedQuery, transaction, remainingFilters); }, resultProvider); diff --git a/common/modelresult.h b/common/modelresult.h index 1675e60..26f96d8 100644 --- a/common/modelresult.h +++ b/common/modelresult.h @@ -179,20 +179,11 @@ public: { const auto id = getIdentifier(parent); mEntityChildrenFetched[id] = true; - QByteArray parentIdentifier; - if (!parent.isValid()) { - qDebug() << "no parent"; - } else { - qDebug() << "parent is valid"; - auto object = parent.data(DomainObjectRole).template value(); - Q_ASSERT(object); - parentIdentifier = object->identifier(); - } - Trace() << "Loading child entities of: " << parentIdentifier; - loadEntities(parentIdentifier); + Trace() << "Loading child entities"; + loadEntities(parent.data(DomainObjectRole).template value()); } - void setFetcher(const std::function &fetcher) + void setFetcher(const std::function &fetcher) { Trace() << "Setting fetcher"; loadEntities = fetcher; @@ -206,6 +197,6 @@ private: QMap mEntityChildrenFetched; QList mPropertyColumns; Akonadi2::Query mQuery; - std::function loadEntities; + std::function loadEntities; }; diff --git a/common/resultprovider.h b/common/resultprovider.h index 0d23127..921cd6b 100644 --- a/common/resultprovider.h +++ b/common/resultprovider.h @@ -53,7 +53,7 @@ public: virtual void initialResultSetComplete() = 0; virtual void complete() = 0; virtual void clear() = 0; - virtual void setFetcher(const std::function &fetcher) + virtual void setFetcher(const std::function &fetcher) { } @@ -141,7 +141,7 @@ public: return true; } - void setFetcher(const std::function &fetcher) + void setFetcher(const std::function &fetcher) { if (auto model = mModel.toStrongRef()) { model->setFetcher(fetcher); @@ -297,9 +297,9 @@ public: return mResultEmitter.toStrongRef().isNull(); } - void setFetcher(const std::function &fetcher) + void setFetcher(const std::function &fetcher) { - fetcher(QByteArray()); + fetcher(T()); } private: -- cgit v1.2.3 From 94a2cd6ec21bf0466a9a50d6e4a0a956ed47bc82 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Thu, 19 Nov 2015 23:23:56 +0100 Subject: Move implementations to the cpp file. I finally figured out how to do that with cpp files. It requires instantiating the code with all expected classes, but that's not a big problem since we know all types. This will hopefully greatly reduce the compiletimes... --- common/clientapi.cpp | 151 ++++++++++++++++++++++++- common/clientapi.h | 109 ++---------------- common/domain/applicationdomaintype.h | 2 + common/modelresult.cpp | 203 ++++++++++++++++++++++++++++++++++ common/modelresult.h | 167 +++------------------------- 5 files changed, 378 insertions(+), 254 deletions(-) create mode 100644 common/modelresult.cpp (limited to 'common') diff --git a/common/clientapi.cpp b/common/clientapi.cpp index 839e77b..02f8ce6 100644 --- a/common/clientapi.cpp +++ b/common/clientapi.cpp @@ -1,13 +1,40 @@ +/* + * Copyright (C) 2014 Christian Mollekopf + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) version 3, or any + * later version accepted by the membership of KDE e.V. (or its + * successor approved by the membership of KDE e.V.), which shall + * act as a proxy defined in Section 6 of version 3 of the license. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library. If not, see . + */ #include "clientapi.h" + +#include +#include +#include +#include +#include +#include + #include "resourceaccess.h" #include "commands.h" #include "resourcefacade.h" #include "log.h" #include "definitions.h" #include "resourceconfig.h" -#include -#include +#include "facadefactory.h" +#include "log.h" #define ASYNCINTHREAD @@ -69,6 +96,114 @@ QList Store::getResources(const QList &resourceFilter, c return resources; } +template +QSharedPointer > Store::load(Query query) +{ + auto resultSet = QSharedPointer >::create(); + + //Execute the search in a thread. + //We must guarantee that the emitter is returned before the first result is emitted. + //The result provider must be threadsafe. + async::run([query, resultSet](){ + QEventLoop eventLoop; + resultSet->onDone([&eventLoop](){ + eventLoop.quit(); + }); + // Query all resources and aggregate results + KAsync::iterate(getResources(query.resources, ApplicationDomain::getTypeName())) + .template each([query, resultSet](const QByteArray &resource, KAsync::Future &future) { + if (auto facade = FacadeFactory::instance().getFacade(resourceName(resource), resource)) { + facade->load(query, *resultSet).template then([&future](){future.setFinished();}).exec(); + //Keep the facade alive for the lifetime of the resultSet. + resultSet->setFacade(facade); + } else { + //Ignore the error and carry on + future.setFinished(); + } + }).template then([query, resultSet]() { + resultSet->initialResultSetComplete(); + if (!query.liveQuery) { + resultSet->complete(); + } + }).exec(); + + //Keep the thread alive until the result is ready + if (!resultSet->isDone()) { + eventLoop.exec(); + } + }); + return resultSet->emitter(); +} + +template +QSharedPointer Store::loadModel(Query query) +{ + auto model = QSharedPointer >::create(query, query.requestedProperties.toList()); + auto resultProvider = std::make_shared >(model); + //Keep the resultprovider alive for as long as the model lives + model->setProperty("resultProvider", QVariant::fromValue(std::shared_ptr(resultProvider))); + + // Query all resources and aggregate results + KAsync::iterate(getResources(query.resources, ApplicationDomain::getTypeName())) + .template each([query, resultProvider](const QByteArray &resource, KAsync::Future &future) { + auto facade = FacadeFactory::instance().getFacade(resourceName(resource), resource); + if (facade) { + facade->load(query, *resultProvider).template then([&future](){future.setFinished();}).exec(); + //Keep the facade alive for the lifetime of the resultSet. + //FIXME this would have to become a list + resultProvider->setFacade(facade); + } else { + //Ignore the error and carry on + future.setFinished(); + } + }).template then([query, resultProvider]() { + resultProvider->initialResultSetComplete(); + if (!query.liveQuery) { + resultProvider->complete(); + } + }).exec(); + + return model; +} + +template +static std::shared_ptr > getFacade(const QByteArray &resourceInstanceIdentifier) +{ + if (auto facade = FacadeFactory::instance().getFacade(resourceName(resourceInstanceIdentifier), resourceInstanceIdentifier)) { + return facade; + } + return std::make_shared >(); +} + +template +KAsync::Job Store::create(const DomainType &domainObject) { + //Potentially move to separate thread as well + auto facade = getFacade(domainObject.resourceInstanceIdentifier()); + return facade->create(domainObject).template then([facade](){}, [](int errorCode, const QString &error) { + Warning() << "Failed to create"; + }); +} + +template +KAsync::Job Store::modify(const DomainType &domainObject) +{ + //Potentially move to separate thread as well + auto facade = getFacade(domainObject.resourceInstanceIdentifier()); + return facade->modify(domainObject).template then([facade](){}, [](int errorCode, const QString &error) { + Warning() << "Failed to modify"; + }); +} + +template +KAsync::Job Store::remove(const DomainType &domainObject) +{ + //Potentially move to separate thread as well + auto facade = getFacade(domainObject.resourceInstanceIdentifier()); + return facade->remove(domainObject).template then([facade](){}, [](int errorCode, const QString &error) { + Warning() << "Failed to remove"; + }); +} + KAsync::Job Store::shutdown(const QByteArray &identifier) { Trace() << "shutdown"; @@ -103,4 +238,16 @@ KAsync::Job Store::synchronize(const Akonadi2::Query &query) .template then([](){}); } +#define REGISTER_TYPE(T) template KAsync::Job Store::remove(const T &domainObject); \ + template KAsync::Job Store::create(const T &domainObject); \ + template KAsync::Job Store::modify(const T &domainObject); \ + template QSharedPointer > Store::load(Query query); \ + template QSharedPointer Store::loadModel(Query query); \ + +REGISTER_TYPE(ApplicationDomain::Event); +REGISTER_TYPE(ApplicationDomain::Mail); +REGISTER_TYPE(ApplicationDomain::Folder); +REGISTER_TYPE(ApplicationDomain::AkonadiResource); + } // namespace Akonadi2 + diff --git a/common/clientapi.h b/common/clientapi.h index 6b11ad5..c48c6e9 100644 --- a/common/clientapi.h +++ b/common/clientapi.h @@ -22,21 +22,17 @@ #include #include -#include -#include -#include -#include #include #include "query.h" #include "resultprovider.h" #include "applicationdomaintype.h" -#include "facadefactory.h" -#include "log.h" Q_DECLARE_METATYPE(std::shared_ptr); +class QAbstractItemModel; + namespace async { //This should abstract if we execute from eventloop or in thread. //It supposed to allow the caller to finish the current method before executing the runner. @@ -62,43 +58,7 @@ public: * Asynchronusly load a dataset */ template - static QSharedPointer > load(Query query) - { - auto resultSet = QSharedPointer >::create(); - - //Execute the search in a thread. - //We must guarantee that the emitter is returned before the first result is emitted. - //The result provider must be threadsafe. - async::run([query, resultSet](){ - QEventLoop eventLoop; - resultSet->onDone([&eventLoop](){ - eventLoop.quit(); - }); - // Query all resources and aggregate results - KAsync::iterate(getResources(query.resources, ApplicationDomain::getTypeName())) - .template each([query, resultSet](const QByteArray &resource, KAsync::Future &future) { - if (auto facade = FacadeFactory::instance().getFacade(resourceName(resource), resource)) { - facade->load(query, *resultSet).template then([&future](){future.setFinished();}).exec(); - //Keep the facade alive for the lifetime of the resultSet. - resultSet->setFacade(facade); - } else { - //Ignore the error and carry on - future.setFinished(); - } - }).template then([query, resultSet]() { - resultSet->initialResultSetComplete(); - if (!query.liveQuery) { - resultSet->complete(); - } - }).exec(); - - //Keep the thread alive until the result is ready - if (!resultSet->isDone()) { - eventLoop.exec(); - } - }); - return resultSet->emitter(); - } + static QSharedPointer > load(Query query); enum Roles { DomainObjectRole = Qt::UserRole + 1 //Must be the same as in ModelResult @@ -108,56 +68,13 @@ public: * Asynchronusly load a dataset with tree structure information */ template - static QSharedPointer loadModel(Query query) - { - auto model = QSharedPointer >::create(query, query.requestedProperties.toList()); - auto resultProvider = std::make_shared >(model); - //Keep the resultprovider alive for as long as the model lives - model->setProperty("resultProvider", QVariant::fromValue(std::shared_ptr(resultProvider))); - - // Query all resources and aggregate results - KAsync::iterate(getResources(query.resources, ApplicationDomain::getTypeName())) - .template each([query, resultProvider](const QByteArray &resource, KAsync::Future &future) { - auto facade = FacadeFactory::instance().getFacade(resourceName(resource), resource); - if (facade) { - facade->load(query, *resultProvider).template then([&future](){future.setFinished();}).exec(); - //Keep the facade alive for the lifetime of the resultSet. - //FIXME this would have to become a list - resultProvider->setFacade(facade); - } else { - //Ignore the error and carry on - future.setFinished(); - } - }).template then([query, resultProvider]() { - resultProvider->initialResultSetComplete(); - if (!query.liveQuery) { - resultProvider->complete(); - } - }).exec(); - - return model; - } - - template - static std::shared_ptr > getFacade(const QByteArray &resourceInstanceIdentifier) - { - if (auto facade = FacadeFactory::instance().getFacade(resourceName(resourceInstanceIdentifier), resourceInstanceIdentifier)) { - return facade; - } - return std::make_shared >(); - } + static QSharedPointer loadModel(Query query); /** * Create a new entity. */ template - static KAsync::Job create(const DomainType &domainObject) { - //Potentially move to separate thread as well - auto facade = getFacade(domainObject.resourceInstanceIdentifier()); - return facade->create(domainObject).template then([facade](){}, [](int errorCode, const QString &error) { - Warning() << "Failed to create"; - }); - } + static KAsync::Job create(const DomainType &domainObject); /** * Modify an entity. @@ -165,25 +82,13 @@ public: * This includes moving etc. since these are also simple settings on a property. */ template - static KAsync::Job modify(const DomainType &domainObject) { - //Potentially move to separate thread as well - auto facade = getFacade(domainObject.resourceInstanceIdentifier()); - return facade->modify(domainObject).template then([facade](){}, [](int errorCode, const QString &error) { - Warning() << "Failed to modify"; - }); - } + static KAsync::Job modify(const DomainType &domainObject); /** * Remove an entity. */ template - static KAsync::Job remove(const DomainType &domainObject) { - //Potentially move to separate thread as well - auto facade = getFacade(domainObject.resourceInstanceIdentifier()); - return facade->remove(domainObject).template then([facade](){}, [](int errorCode, const QString &error) { - Warning() << "Failed to remove"; - }); - } + static KAsync::Job remove(const DomainType &domainObject); /** * Shutdown resource. diff --git a/common/domain/applicationdomaintype.h b/common/domain/applicationdomaintype.h index b4cf8c4..227ab4d 100644 --- a/common/domain/applicationdomaintype.h +++ b/common/domain/applicationdomaintype.h @@ -162,3 +162,5 @@ Q_DECLARE_METATYPE(Akonadi2::ApplicationDomain::Mail) Q_DECLARE_METATYPE(Akonadi2::ApplicationDomain::Mail::Ptr) Q_DECLARE_METATYPE(Akonadi2::ApplicationDomain::Folder) Q_DECLARE_METATYPE(Akonadi2::ApplicationDomain::Folder::Ptr) +Q_DECLARE_METATYPE(Akonadi2::ApplicationDomain::AkonadiResource) +Q_DECLARE_METATYPE(Akonadi2::ApplicationDomain::AkonadiResource::Ptr) diff --git a/common/modelresult.cpp b/common/modelresult.cpp new file mode 100644 index 0000000..1abcc62 --- /dev/null +++ b/common/modelresult.cpp @@ -0,0 +1,203 @@ +/* + * Copyright (C) 2014 Christian Mollekopf + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) version 3, or any + * later version accepted by the membership of KDE e.V. (or its + * successor approved by the membership of KDE e.V.), which shall + * act as a proxy defined in Section 6 of version 3 of the license. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library. If not, see . + */ +#include "modelresult.h" + +#include + +#include "domain/folder.h" +#include "log.h" + +template +ModelResult::ModelResult(const Akonadi2::Query &query, const QList &propertyColumns) + :QAbstractItemModel(), + mPropertyColumns(propertyColumns) +{ +} + +static qint64 getIdentifier(const QModelIndex &idx) +{ + if (!idx.isValid()) { + return 0; + } + return idx.internalId(); +} + +template +qint64 ModelResult::parentId(const Ptr &value) +{ + return qHash(value->getProperty("parent").toByteArray()); +} + +template +int ModelResult::rowCount(const QModelIndex &parent) const +{ + return mTree[getIdentifier(parent)].size(); +} + +template +int ModelResult::columnCount(const QModelIndex &parent) const +{ + return mPropertyColumns.size(); +} + +template +QVariant ModelResult::data(const QModelIndex &index, int role) const +{ + if (role == DomainObjectRole) { + Q_ASSERT(mEntities.contains(index.internalId())); + return QVariant::fromValue(mEntities.value(index.internalId())); + } + if (role == Qt::DisplayRole) { + if (index.column() < mPropertyColumns.size()) { + Q_ASSERT(mEntities.contains(index.internalId())); + auto entity = mEntities.value(index.internalId()); + return entity->getProperty(mPropertyColumns.at(index.column())).toString(); + } else { + return "No data available"; + } + } + return QVariant(); +} + +template +QModelIndex ModelResult::index(int row, int column, const QModelIndex &parent) const +{ + auto id = getIdentifier(parent); + auto childId = mTree.value(id).at(row); + return createIndex(row, column, childId); +} + +template +QModelIndex ModelResult::createIndexFromId(const qint64 &id) const +{ + auto grandParentId = mParents.value(id, 0); + auto row = mTree.value(grandParentId).indexOf(id); + return createIndex(row, 0, id); +} + +template +QModelIndex ModelResult::parent(const QModelIndex &index) const +{ + auto id = getIdentifier(index); + auto parentId = mParents.value(id); + return createIndexFromId(parentId); +} + +template +bool ModelResult::canFetchMore(const QModelIndex &parent) const +{ + qDebug() << "Can fetch more: " << parent << mEntityChildrenFetched.value(parent.internalId()); + return mEntityChildrenFetched.value(parent.internalId()); +} + +template +void ModelResult::fetchMore(const QModelIndex &parent) +{ + qDebug() << "Fetch more: " << parent; + fetchEntities(parent); +} + +template +void ModelResult::add(const Ptr &value) +{ + auto childId = qHash(value->identifier()); + auto id = parentId(value); + //Ignore updates we get before the initial fetch is done + if (!mEntityChildrenFetched[id]) { + return; + } + auto parent = createIndexFromId(id); + qDebug() << "Added entity " << childId << value->identifier() << id; + const auto keys = mTree[id]; + int index = 0; + for (; index < keys.size(); index++) { + if (childId < keys.at(index)) { + break; + } + } + if (mEntities.contains(childId)) { + qWarning() << "Entity already in model " << value->identifier(); + return; + } + qDebug() << "Inserting rows " << index << parent; + beginInsertRows(QModelIndex(), index, index); + mEntities.insert(childId, value); + mTree[id].insert(index, childId); + mParents.insert(childId, id); + endInsertRows(); + qDebug() << "Inserted rows " << mTree[id].size(); +} + + +template +void ModelResult::remove(const Ptr &value) +{ + auto childId = qHash(value->identifier()); + auto id = parentId(value); + auto parent = createIndexFromId(id); + qDebug() << "Removed entity" << childId; + auto index = mTree[id].indexOf(qHash(value->identifier())); + beginRemoveRows(parent, index, index); + mEntities.remove(childId); + mTree[id].removeAll(childId); + mParents.remove(childId); + //TODO remove children + endRemoveRows(); +} + +template +void ModelResult::fetchEntities(const QModelIndex &parent) +{ + const auto id = getIdentifier(parent); + mEntityChildrenFetched[id] = true; + Trace() << "Loading child entities"; + loadEntities(parent.data(DomainObjectRole).template value()); +} + +template +void ModelResult::setFetcher(const std::function &fetcher) +{ + Trace() << "Setting fetcher"; + loadEntities = fetcher; +} + +template +void ModelResult::modify(const Ptr &value) +{ + auto childId = qHash(value->identifier()); + auto id = parentId(value); + //Ignore updates we get before the initial fetch is done + if (!mEntityChildrenFetched[id]) { + return; + } + auto parent = createIndexFromId(id); + qDebug() << "Modified entity" << childId; + auto i = mTree[id].indexOf(childId); + mEntities.remove(childId); + mEntities.insert(childId, value); + //TODO check for change of parents + auto idx = index(i, 0, parent); + emit dataChanged(idx, idx); +} + +template class ModelResult; +template class ModelResult; +template class ModelResult; +template class ModelResult; diff --git a/common/modelresult.h b/common/modelresult.h index 26f96d8..40a9d9d 100644 --- a/common/modelresult.h +++ b/common/modelresult.h @@ -23,10 +23,9 @@ #include #include #include +#include #include "query.h" -#include "resultprovider.h" - template class ModelResult : public QAbstractItemModel { @@ -36,160 +35,28 @@ public: DomainObjectRole = Qt::UserRole + 1 }; - ModelResult(const Akonadi2::Query &query, const QList &propertyColumns) - :QAbstractItemModel(), - mPropertyColumns(propertyColumns) - { - } - - static qint64 getIdentifier(const QModelIndex &idx) - { - if (!idx.isValid()) { - return 0; - } - return idx.internalId(); - } - - int rowCount(const QModelIndex &parent = QModelIndex()) const - { - return mTree[getIdentifier(parent)].size(); - } - - int columnCount(const QModelIndex &parent = QModelIndex()) const - { - return mPropertyColumns.size(); - } - - virtual QVariant data(const QModelIndex &index, int role = Qt::DisplayRole) const - { - if (role == DomainObjectRole) { - Q_ASSERT(mEntities.contains(index.internalId())); - return QVariant::fromValue(mEntities.value(index.internalId())); - } - if (role == Qt::DisplayRole) { - if (index.column() < mPropertyColumns.size()) { - Q_ASSERT(mEntities.contains(index.internalId())); - auto entity = mEntities.value(index.internalId()); - return entity->getProperty(mPropertyColumns.at(index.column())).toString(); - } else { - return "No data available"; - } - } - return QVariant(); - } - - QModelIndex index(int row, int column, const QModelIndex & parent = QModelIndex()) const - { - auto id = getIdentifier(parent); - auto childId = mTree.value(id).at(row); - return createIndex(row, column, childId); - } - - QModelIndex createIndexFromId(const qint64 &id) const - { - auto grandParentId = mParents.value(id, 0); - auto row = mTree.value(grandParentId).indexOf(id); - return createIndex(row, 0, id); - } + ModelResult(const Akonadi2::Query &query, const QList &propertyColumns); - QModelIndex parent(const QModelIndex &index) const - { - auto id = getIdentifier(index); - auto parentId = mParents.value(id); - return createIndexFromId(parentId); - } + int rowCount(const QModelIndex &parent = QModelIndex()) const; + int columnCount(const QModelIndex &parent = QModelIndex()) const; + QVariant data(const QModelIndex &index, int role = Qt::DisplayRole) const; + QModelIndex index(int row, int column, const QModelIndex & parent = QModelIndex()) const; + QModelIndex parent(const QModelIndex &index) const; - bool canFetchMore(const QModelIndex &parent) const - { - return mEntityChildrenFetched.value(parent.internalId()); - } + bool canFetchMore(const QModelIndex &parent) const; + void fetchMore(const QModelIndex &parent); - void fetchMore(const QModelIndex &parent) - { - fetchEntities(parent); - } + void add(const Ptr &value); + void modify(const Ptr &value); + void remove(const Ptr &value); - qint64 parentId(const Ptr &value) - { - return qHash(value->getProperty("parent").toByteArray()); - } - - void add(const Ptr &value) - { - auto childId = qHash(value->identifier()); - auto id = parentId(value); - //Ignore updates we get before the initial fetch is done - if (!mEntityChildrenFetched[id]) { - return; - } - auto parent = createIndexFromId(id); - qDebug() << "Added entity " << childId << value->identifier(); - const auto keys = mTree[id]; - int index = 0; - for (; index < keys.size(); index++) { - if (childId < keys.at(index)) { - break; - } - } - if (mEntities.contains(childId)) { - qWarning() << "Entity already in model " << value->identifier(); - return; - } - beginInsertRows(QModelIndex(), index, index); - mEntities.insert(childId, value); - mTree[id].insert(index, childId); - mParents.insert(childId, id); - endInsertRows(); - } - - void modify(const Ptr &value) - { - auto childId = qHash(value->identifier()); - auto id = parentId(value); - //Ignore updates we get before the initial fetch is done - if (!mEntityChildrenFetched[id]) { - return; - } - auto parent = createIndexFromId(id); - qDebug() << "Modified entity" << childId; - auto i = mTree[id].indexOf(childId); - mEntities.remove(childId); - mEntities.insert(childId, value); - //TODO check for change of parents - auto idx = index(i, 0, parent); - emit dataChanged(idx, idx); - } - - void remove(const Ptr &value) - { - auto childId = qHash(value->identifier()); - auto id = parentId(value); - auto parent = createIndexFromId(id); - qDebug() << "Removed entity" << childId; - auto index = mTree[id].indexOf(qHash(value->identifier())); - beginRemoveRows(parent, index, index); - mEntities.remove(childId); - mTree[id].removeAll(childId); - mParents.remove(childId); - //TODO remove children - endRemoveRows(); - } - - void fetchEntities(const QModelIndex &parent) - { - const auto id = getIdentifier(parent); - mEntityChildrenFetched[id] = true; - Trace() << "Loading child entities"; - loadEntities(parent.data(DomainObjectRole).template value()); - } - - void setFetcher(const std::function &fetcher) - { - Trace() << "Setting fetcher"; - loadEntities = fetcher; - } + void setFetcher(const std::function &fetcher); private: + static qint64 parentId(const Ptr &value); + QModelIndex createIndexFromId(const qint64 &id) const; + void fetchEntities(const QModelIndex &parent); + //TODO we should be able to directly use T as index, with an appropriate hash function, and thus have a QMap and QList QMap mEntities; QMap /* child entity id*/> mTree; -- cgit v1.2.3 From ddb28417ccbcd22e771b7610c1727eac63471609 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Thu, 19 Nov 2015 23:47:34 +0100 Subject: Moved facade implementation to cpp file --- common/CMakeLists.txt | 1 + common/facade.cpp | 352 +++++++++++++++++++++++++++++++++++++++++++++++++- common/facade.h | 335 +++-------------------------------------------- 3 files changed, 370 insertions(+), 318 deletions(-) (limited to 'common') diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index bdb9eac..01056d0 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt @@ -12,6 +12,7 @@ else (STORAGE_unqlite) endif (STORAGE_unqlite) set(command_SRCS + modelresult.cpp definitions.cpp log.cpp entitybuffer.cpp diff --git a/common/facade.cpp b/common/facade.cpp index e51b32a..b4931cf 100644 --- a/common/facade.cpp +++ b/common/facade.cpp @@ -1,5 +1,5 @@ /* - * Copyright (C) 2015 Christian Mollekopf + * Copyright (C) 2015 Christian Mollekopf * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -18,3 +18,353 @@ */ #include "facade.h" + +#include "commands.h" +#include "domainadaptor.h" +#include "log.h" +#include "storage.h" +#include "definitions.h" + +using namespace Akonadi2; + +/** + * A QueryRunner runs a query and updates the corresponding result set. + * + * The lifetime of the QueryRunner is defined by the resut set (otherwise it's doing useless work), + * and by how long a result set must be updated. If the query is one off the runner dies after the execution, + * otherwise it lives on the react to changes and updates the corresponding result set. + * + * QueryRunner has to keep ResourceAccess alive in order to keep getting updates. + */ +class QueryRunner : public QObject +{ + Q_OBJECT +public: + typedef std::function()> QueryFunction; + + QueryRunner(const Akonadi2::Query &query) {}; + /** + * Starts query + */ + KAsync::Job run(qint64 newRevision = 0) + { + return queryFunction(); + } + + /** + * Set the query to run + */ + void setQuery(const QueryFunction &query) + { + queryFunction = query; + } + +public slots: + /** + * Rerun query with new revision + */ + void revisionChanged(qint64 newRevision) + { + Trace() << "New revision: " << newRevision; + run().exec(); + } + +private: + QueryFunction queryFunction; +}; + +static inline ResultSet fullScan(const Akonadi2::Storage::Transaction &transaction, const QByteArray &bufferType) +{ + //TODO use a result set with an iterator, to read values on demand + QVector keys; + transaction.openDatabase(bufferType + ".main").scan(QByteArray(), [&](const QByteArray &key, const QByteArray &value) -> bool { + //Skip internals + if (Akonadi2::Storage::isInternalKey(key)) { + return true; + } + keys << Akonadi2::Storage::uidFromKey(key); + return true; + }, + [](const Akonadi2::Storage::Error &error) { + qWarning() << "Error during query: " << error.message; + }); + + Trace() << "Full scan found " << keys.size() << " results"; + return ResultSet(keys); +} + + + +template +GenericFacade::GenericFacade(const QByteArray &resourceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &adaptorFactory , const QSharedPointer resourceAccess) + : Akonadi2::StoreFacade(), + mResourceAccess(resourceAccess), + mDomainTypeAdaptorFactory(adaptorFactory), + mResourceInstanceIdentifier(resourceIdentifier) +{ + if (!mResourceAccess) { + mResourceAccess = QSharedPointer::create(resourceIdentifier); + } +} + +template +GenericFacade::~GenericFacade() +{ +} + +template +QByteArray GenericFacade::bufferTypeForDomainType() +{ + //We happen to have a one to one mapping + return Akonadi2::ApplicationDomain::getTypeName(); +} + +template +KAsync::Job GenericFacade::create(const DomainType &domainObject) +{ + if (!mDomainTypeAdaptorFactory) { + Warning() << "No domain type adaptor factory available"; + return KAsync::error(); + } + flatbuffers::FlatBufferBuilder entityFbb; + mDomainTypeAdaptorFactory->createBuffer(domainObject, entityFbb); + return mResourceAccess->sendCreateCommand(bufferTypeForDomainType(), QByteArray::fromRawData(reinterpret_cast(entityFbb.GetBufferPointer()), entityFbb.GetSize())); +} + +template +KAsync::Job GenericFacade::modify(const DomainType &domainObject) +{ + if (!mDomainTypeAdaptorFactory) { + Warning() << "No domain type adaptor factory available"; + return KAsync::error(); + } + flatbuffers::FlatBufferBuilder entityFbb; + mDomainTypeAdaptorFactory->createBuffer(domainObject, entityFbb); + return mResourceAccess->sendModifyCommand(domainObject.identifier(), domainObject.revision(), bufferTypeForDomainType(), QByteArrayList(), QByteArray::fromRawData(reinterpret_cast(entityFbb.GetBufferPointer()), entityFbb.GetSize())); +} + +template +KAsync::Job GenericFacade::remove(const DomainType &domainObject) +{ + return mResourceAccess->sendDeleteCommand(domainObject.identifier(), domainObject.revision(), bufferTypeForDomainType()); +} + +template +KAsync::Job GenericFacade::load(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface &resultProvider) +{ + //We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load. + resultProvider.setFetcher([this, query, &resultProvider](const typename DomainType::Ptr &parent) { + const qint64 newRevision = executeInitialQuery(query, parent, resultProvider); + mResourceAccess->sendRevisionReplayedCommand(newRevision); + }); + + + //In case of a live query we keep the runner for as long alive as the result provider exists + if (query.liveQuery) { + auto runner = QSharedPointer::create(query); + //Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting + runner->setQuery([this, query, &resultProvider] () -> KAsync::Job { + return KAsync::start([this, query, &resultProvider](KAsync::Future &future) { + const qint64 newRevision = executeIncrementalQuery(query, resultProvider); + mResourceAccess->sendRevisionReplayedCommand(newRevision); + future.setFinished(); + }); + }); + resultProvider.setQueryRunner(runner); + //Ensure the connection is open, if it wasn't already opened + //TODO If we are not connected already, we have to check for the latest revision once connected, otherwise we could miss some updates + mResourceAccess->open(); + QObject::connect(mResourceAccess.data(), &Akonadi2::ResourceAccess::revisionChanged, runner.data(), &QueryRunner::revisionChanged); + } + return KAsync::null(); +} + + //TODO move into result provider? +template +void GenericFacade::replaySet(ResultSet &resultSet, Akonadi2::ResultProviderInterface &resultProvider) +{ + while (resultSet.next([&resultProvider](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value, Akonadi2::Operation operation) -> bool { + switch (operation) { + case Akonadi2::Operation_Creation: + Trace() << "Got creation"; + resultProvider.add(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); + break; + case Akonadi2::Operation_Modification: + Trace() << "Got modification"; + resultProvider.modify(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); + break; + case Akonadi2::Operation_Removal: + Trace() << "Got removal"; + resultProvider.remove(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); + break; + } + return true; + })){}; +} + +template +void GenericFacade::readEntity(const Akonadi2::Storage::Transaction &transaction, const QByteArray &key, const std::function &resultCallback) +{ + const auto bufferType = bufferTypeForDomainType(); + //This only works for a 1:1 mapping of resource to domain types. + //Not i.e. for tags that are stored as flags in each entity of an imap store. + //additional properties that don't have a 1:1 mapping (such as separately stored tags), + //could be added to the adaptor. + // + // Akonadi2::Storage::getLatest(transaction, bufferTye, key); + transaction.openDatabase(bufferType + ".main").findLatest(key, [=](const QByteArray &key, const QByteArray &value) -> bool { + Akonadi2::EntityBuffer buffer(value.data(), value.size()); + const Akonadi2::Entity &entity = buffer.entity(); + const auto metadataBuffer = Akonadi2::EntityBuffer::readBuffer(entity.metadata()); + Q_ASSERT(metadataBuffer); + const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; + resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Akonadi2::Storage::uidFromKey(key), revision, mDomainTypeAdaptorFactory->createAdaptor(entity)), metadataBuffer->operation()); + return false; + }, + [](const Akonadi2::Storage::Error &error) { + qWarning() << "Error during query: " << error.message; + }); +} + +template +ResultSet GenericFacade::loadInitialResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) +{ + QSet appliedFilters; + auto resultSet = Akonadi2::ApplicationDomain::TypeImplementation::queryIndexes(query, mResourceInstanceIdentifier, appliedFilters, transaction); + remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters; + + //We do a full scan if there were no indexes available to create the initial set. + if (appliedFilters.isEmpty()) { + //TODO this should be replaced by an index lookup as well + resultSet = fullScan(transaction, bufferTypeForDomainType()); + } + return resultSet; +} + +template +ResultSet GenericFacade::loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) +{ + const auto bufferType = bufferTypeForDomainType(); + auto revisionCounter = QSharedPointer::create(baseRevision); + remainingFilters = query.propertyFilter.keys().toSet(); + return ResultSet([bufferType, revisionCounter, &transaction]() -> QByteArray { + const qint64 topRevision = Akonadi2::Storage::maxRevision(transaction); + //Spit out the revision keys one by one. + while (*revisionCounter <= topRevision) { + const auto uid = Akonadi2::Storage::getUidFromRevision(transaction, *revisionCounter); + const auto type = Akonadi2::Storage::getTypeFromRevision(transaction, *revisionCounter); + Trace() << "Revision" << *revisionCounter << type << uid; + if (type != bufferType) { + //Skip revision + *revisionCounter += 1; + continue; + } + const auto key = Akonadi2::Storage::assembleKey(uid, *revisionCounter); + *revisionCounter += 1; + return key; + } + Trace() << "Finished reading incremental result set:" << *revisionCounter; + //We're done + return QByteArray(); + }); +} + +template +ResultSet GenericFacade::filterSet(const ResultSet &resultSet, const std::function &filter, const Akonadi2::Storage::Transaction &transaction, bool initialQuery) +{ + auto resultSetPtr = QSharedPointer::create(resultSet); + + //Read through the source values and return whatever matches the filter + std::function)> generator = [this, resultSetPtr, &transaction, filter, initialQuery](std::function callback) -> bool { + while (resultSetPtr->next()) { + //readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) + readEntity(transaction, resultSetPtr->id(), [this, filter, callback, initialQuery](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Akonadi2::Operation operation) { + //Always remove removals, they probably don't match due to non-available properties + if (filter(domainObject) || operation == Akonadi2::Operation_Removal) { + if (initialQuery) { + //We're not interested in removals during the initial query + if (operation != Akonadi2::Operation_Removal) { + callback(domainObject, Akonadi2::Operation_Creation); + } + } else { + callback(domainObject, operation); + } + } + }); + } + return false; + }; + return ResultSet(generator); +} + + +template +std::function GenericFacade::getFilter(const QSet remainingFilters, const Akonadi2::Query &query) +{ + return [remainingFilters, query](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool { + for (const auto &filterProperty : remainingFilters) { + const auto property = domainObject->getProperty(filterProperty); + if (property.isValid()) { + //TODO implement other comparison operators than equality + if (property != query.propertyFilter.value(filterProperty)) { + Trace() << "Filtering entity due to property mismatch: " << domainObject->getProperty(filterProperty); + return false; + } + } else { + Warning() << "Ignored property filter because value is invalid: " << filterProperty; + } + } + return true; + }; +} + +template +qint64 GenericFacade::load(const Akonadi2::Query &query, const std::function &)> &baseSetRetriever, Akonadi2::ResultProviderInterface &resultProvider) +{ + Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier); + storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) { + Warning() << "Error during query: " << error.store << error.message; + }); + auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly); + + QSet remainingFilters; + auto resultSet = baseSetRetriever(transaction, remainingFilters); + auto filteredSet = filterSet(resultSet, getFilter(remainingFilters, query), transaction, false); + replaySet(filteredSet, resultProvider); + resultProvider.setRevision(Akonadi2::Storage::maxRevision(transaction)); + return Akonadi2::Storage::maxRevision(transaction); +} + + +template +qint64 GenericFacade::executeIncrementalQuery(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface &resultProvider) +{ + const qint64 baseRevision = resultProvider.revision() + 1; + Trace() << "Running incremental query " << baseRevision; + return load(query, [&](Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) -> ResultSet { + return loadIncrementalResultSet(baseRevision, query, transaction, remainingFilters); + }, resultProvider); +} + +template +qint64 GenericFacade::executeInitialQuery(const Akonadi2::Query &query, const typename DomainType::Ptr &parent, Akonadi2::ResultProviderInterface &resultProvider) +{ + auto modifiedQuery = query; + if (parent) { + Trace() << "Running initial query for parent:" << parent->identifier(); + modifiedQuery.propertyFilter.insert("parent", parent->identifier()); + } else { + Trace() << "Running initial query for toplevel"; + modifiedQuery.propertyFilter.insert("parent", QVariant()); + } + return load(modifiedQuery, [&](Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) -> ResultSet { + return loadInitialResultSet(modifiedQuery, transaction, remainingFilters); + }, resultProvider); +} + +template class Akonadi2::GenericFacade; +template class Akonadi2::GenericFacade; +template class Akonadi2::GenericFacade; +// template class Akonadi2::GenericFacade; + +#include "facade.moc" diff --git a/common/facade.h b/common/facade.h index 8b8a2a8..aa50941 100644 --- a/common/facade.h +++ b/common/facade.h @@ -1,5 +1,5 @@ /* - * Copyright (C) 2014 Christian Mollekopf + * Copyright (C) 2014 Christian Mollekopf * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -25,79 +25,8 @@ #include #include "resourceaccess.h" -#include "commands.h" -#include "domainadaptor.h" -#include "log.h" #include "resultset.h" -#include "storage.h" -#include "definitions.h" - -/** - * A QueryRunner runs a query and updates the corresponding result set. - * - * The lifetime of the QueryRunner is defined by the resut set (otherwise it's doing useless work), - * and by how long a result set must be updated. If the query is one off the runner dies after the execution, - * otherwise it lives on the react to changes and updates the corresponding result set. - * - * QueryRunner has to keep ResourceAccess alive in order to keep getting updates. - */ -class QueryRunner : public QObject -{ - Q_OBJECT -public: - typedef std::function()> QueryFunction; - - QueryRunner(const Akonadi2::Query &query) {}; - /** - * Starts query - */ - KAsync::Job run(qint64 newRevision = 0) - { - return queryFunction(); - } - - /** - * Set the query to run - */ - void setQuery(const QueryFunction &query) - { - queryFunction = query; - } - -public slots: - /** - * Rerun query with new revision - */ - void revisionChanged(qint64 newRevision) - { - Trace() << "New revision: " << newRevision; - run().exec(); - } - -private: - QueryFunction queryFunction; -}; - -static inline ResultSet fullScan(const Akonadi2::Storage::Transaction &transaction, const QByteArray &bufferType) -{ - //TODO use a result set with an iterator, to read values on demand - QVector keys; - transaction.openDatabase(bufferType + ".main").scan(QByteArray(), [&](const QByteArray &key, const QByteArray &value) -> bool { - //Skip internals - if (Akonadi2::Storage::isInternalKey(key)) { - return true; - } - keys << Akonadi2::Storage::uidFromKey(key); - return true; - }, - [](const Akonadi2::Storage::Error &error) { - qWarning() << "Error during query: " << error.message; - }); - - Trace() << "Full scan found " << keys.size() << " results"; - return ResultSet(keys); -} - +#include "domainadaptor.h" namespace Akonadi2 { /** @@ -121,257 +50,29 @@ public: * @param resourceIdentifier is the identifier of the resource instance * @param adaptorFactory is the adaptor factory used to generate the mappings from domain to resource types and vice versa */ - GenericFacade(const QByteArray &resourceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &adaptorFactory = DomainTypeAdaptorFactoryInterface::Ptr(), const QSharedPointer resourceAccess = QSharedPointer()) - : Akonadi2::StoreFacade(), - mResourceAccess(resourceAccess), - mDomainTypeAdaptorFactory(adaptorFactory), - mResourceInstanceIdentifier(resourceIdentifier) - { - if (!mResourceAccess) { - mResourceAccess = QSharedPointer::create(resourceIdentifier); - } - } - - ~GenericFacade() - { - } - - static QByteArray bufferTypeForDomainType() - { - //We happen to have a one to one mapping - return Akonadi2::ApplicationDomain::getTypeName(); - } - - KAsync::Job create(const DomainType &domainObject) Q_DECL_OVERRIDE - { - if (!mDomainTypeAdaptorFactory) { - Warning() << "No domain type adaptor factory available"; - return KAsync::error(); - } - flatbuffers::FlatBufferBuilder entityFbb; - mDomainTypeAdaptorFactory->createBuffer(domainObject, entityFbb); - return mResourceAccess->sendCreateCommand(bufferTypeForDomainType(), QByteArray::fromRawData(reinterpret_cast(entityFbb.GetBufferPointer()), entityFbb.GetSize())); - } - - KAsync::Job modify(const DomainType &domainObject) Q_DECL_OVERRIDE - { - if (!mDomainTypeAdaptorFactory) { - Warning() << "No domain type adaptor factory available"; - return KAsync::error(); - } - flatbuffers::FlatBufferBuilder entityFbb; - mDomainTypeAdaptorFactory->createBuffer(domainObject, entityFbb); - return mResourceAccess->sendModifyCommand(domainObject.identifier(), domainObject.revision(), bufferTypeForDomainType(), QByteArrayList(), QByteArray::fromRawData(reinterpret_cast(entityFbb.GetBufferPointer()), entityFbb.GetSize())); - } - - KAsync::Job remove(const DomainType &domainObject) Q_DECL_OVERRIDE - { - return mResourceAccess->sendDeleteCommand(domainObject.identifier(), domainObject.revision(), bufferTypeForDomainType()); - } - - KAsync::Job load(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface &resultProvider) Q_DECL_OVERRIDE - { - //We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load. - resultProvider.setFetcher([this, query, &resultProvider](const typename DomainType::Ptr &parent) { - const qint64 newRevision = executeInitialQuery(query, parent, resultProvider); - mResourceAccess->sendRevisionReplayedCommand(newRevision); - }); - + GenericFacade(const QByteArray &resourceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &adaptorFactory = DomainTypeAdaptorFactoryInterface::Ptr(), const QSharedPointer resourceAccess = QSharedPointer()); + ~GenericFacade(); - //In case of a live query we keep the runner for as long alive as the result provider exists - if (query.liveQuery) { - auto runner = QSharedPointer::create(query); - //Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting - runner->setQuery([this, query, &resultProvider] () -> KAsync::Job { - return KAsync::start([this, query, &resultProvider](KAsync::Future &future) { - const qint64 newRevision = executeIncrementalQuery(query, resultProvider); - mResourceAccess->sendRevisionReplayedCommand(newRevision); - future.setFinished(); - }); - }); - resultProvider.setQueryRunner(runner); - //Ensure the connection is open, if it wasn't already opened - //TODO If we are not connected already, we have to check for the latest revision once connected, otherwise we could miss some updates - mResourceAccess->open(); - QObject::connect(mResourceAccess.data(), &Akonadi2::ResourceAccess::revisionChanged, runner.data(), &QueryRunner::revisionChanged); - } - return KAsync::null(); - } + static QByteArray bufferTypeForDomainType(); + KAsync::Job create(const DomainType &domainObject) Q_DECL_OVERRIDE; + KAsync::Job modify(const DomainType &domainObject) Q_DECL_OVERRIDE; + KAsync::Job remove(const DomainType &domainObject) Q_DECL_OVERRIDE; + KAsync::Job load(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface &resultProvider) Q_DECL_OVERRIDE; private: - //TODO move into result provider? - static void replaySet(ResultSet &resultSet, Akonadi2::ResultProviderInterface &resultProvider) - { - while (resultSet.next([&resultProvider](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value, Akonadi2::Operation operation) -> bool { - switch (operation) { - case Akonadi2::Operation_Creation: - Trace() << "Got creation"; - resultProvider.add(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); - break; - case Akonadi2::Operation_Modification: - Trace() << "Got modification"; - resultProvider.modify(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); - break; - case Akonadi2::Operation_Removal: - Trace() << "Got removal"; - resultProvider.remove(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); - break; - } - return true; - })){}; - } - - void readEntity(const Akonadi2::Storage::Transaction &transaction, const QByteArray &key, const std::function &resultCallback) - { - const auto bufferType = bufferTypeForDomainType(); - //This only works for a 1:1 mapping of resource to domain types. - //Not i.e. for tags that are stored as flags in each entity of an imap store. - //additional properties that don't have a 1:1 mapping (such as separately stored tags), - //could be added to the adaptor. - // - // Akonadi2::Storage::getLatest(transaction, bufferTye, key); - transaction.openDatabase(bufferType + ".main").findLatest(key, [=](const QByteArray &key, const QByteArray &value) -> bool { - Akonadi2::EntityBuffer buffer(value.data(), value.size()); - const Akonadi2::Entity &entity = buffer.entity(); - const auto metadataBuffer = Akonadi2::EntityBuffer::readBuffer(entity.metadata()); - Q_ASSERT(metadataBuffer); - const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; - resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Akonadi2::Storage::uidFromKey(key), revision, mDomainTypeAdaptorFactory->createAdaptor(entity)), metadataBuffer->operation()); - return false; - }, - [](const Akonadi2::Storage::Error &error) { - qWarning() << "Error during query: " << error.message; - }); - } - - ResultSet loadInitialResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) - { - QSet appliedFilters; - auto resultSet = Akonadi2::ApplicationDomain::TypeImplementation::queryIndexes(query, mResourceInstanceIdentifier, appliedFilters, transaction); - remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters; - - //We do a full scan if there were no indexes available to create the initial set. - if (appliedFilters.isEmpty()) { - //TODO this should be replaced by an index lookup as well - resultSet = fullScan(transaction, bufferTypeForDomainType()); - } - return resultSet; - } - - ResultSet loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) - { - const auto bufferType = bufferTypeForDomainType(); - auto revisionCounter = QSharedPointer::create(baseRevision); - remainingFilters = query.propertyFilter.keys().toSet(); - return ResultSet([bufferType, revisionCounter, &transaction]() -> QByteArray { - const qint64 topRevision = Akonadi2::Storage::maxRevision(transaction); - //Spit out the revision keys one by one. - while (*revisionCounter <= topRevision) { - const auto uid = Akonadi2::Storage::getUidFromRevision(transaction, *revisionCounter); - const auto type = Akonadi2::Storage::getTypeFromRevision(transaction, *revisionCounter); - Trace() << "Revision" << *revisionCounter << type << uid; - if (type != bufferType) { - //Skip revision - *revisionCounter += 1; - continue; - } - const auto key = Akonadi2::Storage::assembleKey(uid, *revisionCounter); - *revisionCounter += 1; - return key; - } - //We're done - return QByteArray(); - }); - } - - ResultSet filterSet(const ResultSet &resultSet, const std::function &filter, const Akonadi2::Storage::Transaction &transaction, bool initialQuery) - { - auto resultSetPtr = QSharedPointer::create(resultSet); - - //Read through the source values and return whatever matches the filter - std::function)> generator = [this, resultSetPtr, &transaction, filter, initialQuery](std::function callback) -> bool { - while (resultSetPtr->next()) { - //readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) - readEntity(transaction, resultSetPtr->id(), [this, filter, callback, initialQuery](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Akonadi2::Operation operation) { - //Always remove removals, they probably don't match due to non-available properties - if (filter(domainObject) || operation == Akonadi2::Operation_Removal) { - if (initialQuery) { - //We're not interested in removals during the initial query - if (operation != Akonadi2::Operation_Removal) { - callback(domainObject, Akonadi2::Operation_Creation); - } - } else { - callback(domainObject, operation); - } - } - }); - } - return false; - }; - return ResultSet(generator); - } - - - std::function getFilter(const QSet remainingFilters, const Akonadi2::Query &query) - { - return [remainingFilters, query](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool { - for (const auto &filterProperty : remainingFilters) { - const auto property = domainObject->getProperty(filterProperty); - if (property.isValid()) { - //TODO implement other comparison operators than equality - if (property != query.propertyFilter.value(filterProperty)) { - Trace() << "Filtering entity due to property mismatch: " << domainObject->getProperty(filterProperty); - return false; - } - } else { - Warning() << "Ignored property filter because value is invalid: " << filterProperty; - } - } - return true; - }; - } - - qint64 load(const Akonadi2::Query &query, const std::function &)> &baseSetRetriever, Akonadi2::ResultProviderInterface &resultProvider) - { - Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier); - storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) { - Warning() << "Error during query: " << error.store << error.message; - }); - auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly); - - QSet remainingFilters; - auto resultSet = baseSetRetriever(transaction, remainingFilters); - auto filteredSet = filterSet(resultSet, getFilter(remainingFilters, query), transaction, false); - replaySet(filteredSet, resultProvider); - resultProvider.setRevision(Akonadi2::Storage::maxRevision(transaction)); - return Akonadi2::Storage::maxRevision(transaction); - } + static void replaySet(ResultSet &resultSet, Akonadi2::ResultProviderInterface &resultProvider); + void readEntity(const Akonadi2::Storage::Transaction &transaction, const QByteArray &key, const std::function &resultCallback); - qint64 executeIncrementalQuery(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface &resultProvider) - { - const qint64 baseRevision = resultProvider.revision() + 1; - Trace() << "Running incremental query " << baseRevision; - return load(query, [&](Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) -> ResultSet { - return loadIncrementalResultSet(baseRevision, query, transaction, remainingFilters); - }, resultProvider); - } + ResultSet loadInitialResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters); + ResultSet loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters); - qint64 executeInitialQuery(const Akonadi2::Query &query, const typename DomainType::Ptr &parent, Akonadi2::ResultProviderInterface &resultProvider) - { - auto modifiedQuery = query; - if (parent) { - Trace() << "Running initial query for parent:" << parent->identifier(); - modifiedQuery.propertyFilter.insert("parent", parent->identifier()); - } else { - Trace() << "Running initial query for toplevel"; - modifiedQuery.propertyFilter.insert("parent", QVariant()); - } - return load(modifiedQuery, [&](Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) -> ResultSet { - return loadInitialResultSet(modifiedQuery, transaction, remainingFilters); - }, resultProvider); - } + ResultSet filterSet(const ResultSet &resultSet, const std::function &filter, const Akonadi2::Storage::Transaction &transaction, bool initialQuery); + std::function getFilter(const QSet remainingFilters, const Akonadi2::Query &query); + qint64 load(const Akonadi2::Query &query, const std::function &)> &baseSetRetriever, Akonadi2::ResultProviderInterface &resultProvider); + qint64 executeIncrementalQuery(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface &resultProvider); + qint64 executeInitialQuery(const Akonadi2::Query &query, const typename DomainType::Ptr &parent, Akonadi2::ResultProviderInterface &resultProvider); protected: //TODO use one resource access instance per application & per resource -- cgit v1.2.3 From 0b967e06a1a50c1f540b941d381680cdf3ac4706 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Sat, 21 Nov 2015 02:29:35 +0100 Subject: Fixed build --- common/facade.cpp | 1 - 1 file changed, 1 deletion(-) (limited to 'common') diff --git a/common/facade.cpp b/common/facade.cpp index b4931cf..f534319 100644 --- a/common/facade.cpp +++ b/common/facade.cpp @@ -20,7 +20,6 @@ #include "facade.h" #include "commands.h" -#include "domainadaptor.h" #include "log.h" #include "storage.h" #include "definitions.h" -- cgit v1.2.3 From ec92f856854a35bd888b883802a1ef618cc9f69c Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Sat, 21 Nov 2015 02:29:57 +0100 Subject: Don't try to fetch more once the parent is fetched. We're not doing partial fetches yet --- common/modelresult.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'common') diff --git a/common/modelresult.cpp b/common/modelresult.cpp index 1abcc62..5b9e24f 100644 --- a/common/modelresult.cpp +++ b/common/modelresult.cpp @@ -104,7 +104,7 @@ template bool ModelResult::canFetchMore(const QModelIndex &parent) const { qDebug() << "Can fetch more: " << parent << mEntityChildrenFetched.value(parent.internalId()); - return mEntityChildrenFetched.value(parent.internalId()); + return !mEntityChildrenFetched.value(parent.internalId(), false); } template @@ -133,7 +133,7 @@ void ModelResult::add(const Ptr &value) } } if (mEntities.contains(childId)) { - qWarning() << "Entity already in model " << value->identifier(); + Warning() << "Entity already in model " << value->identifier(); return; } qDebug() << "Inserting rows " << index << parent; -- cgit v1.2.3 From 110817a23463c71eacbc986af3ae509462758a3c Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Sat, 21 Nov 2015 11:07:47 +0100 Subject: Separated DomainTypeAdaptorFactoryInterface --- common/domainadaptor.h | 12 ++------- common/domaintypeadaptorfactoryinterface.h | 42 ++++++++++++++++++++++++++++++ common/facade.cpp | 1 + common/facade.h | 4 ++- 4 files changed, 48 insertions(+), 11 deletions(-) create mode 100644 common/domaintypeadaptorfactoryinterface.h (limited to 'common') diff --git a/common/domainadaptor.h b/common/domainadaptor.h index 620a658..b541e23 100644 --- a/common/domainadaptor.h +++ b/common/domainadaptor.h @@ -1,5 +1,5 @@ /* - * Copyright (C) 2014 Christian Mollekopf + * Copyright (C) 2014 Christian Mollekopf * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -23,6 +23,7 @@ #include #include +#include "domaintypeadaptorfactoryinterface.h" #include "domain/applicationdomaintype.h" #include "domain/event.h" #include "domain/mail.h" @@ -124,15 +125,6 @@ public: QSharedPointer > mResourceMapper; }; -class DomainTypeAdaptorFactoryInterface -{ -public: - typedef QSharedPointer Ptr; - virtual ~DomainTypeAdaptorFactoryInterface() {}; - virtual QSharedPointer createAdaptor(const Akonadi2::Entity &entity) = 0; - virtual void createBuffer(const Akonadi2::ApplicationDomain::ApplicationDomainType &domainType, flatbuffers::FlatBufferBuilder &fbb, void const *metadataData = 0, size_t metadataSize = 0) = 0; -}; - /** * The factory should define how to go from an entitybuffer (local + resource buffer), to a domain type adapter. * It defines how values are split accross local and resource buffer. diff --git a/common/domaintypeadaptorfactoryinterface.h b/common/domaintypeadaptorfactoryinterface.h new file mode 100644 index 0000000..8c99aa1 --- /dev/null +++ b/common/domaintypeadaptorfactoryinterface.h @@ -0,0 +1,42 @@ +/* + * Copyright (C) 2014 Christian Mollekopf + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the + * Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ +#pragma once + +#include + +namespace Akonadi2 { + namespace ApplicationDomain { + class BufferAdaptor; + class ApplicationDomainType; + } + struct Entity; +} + +namespace flatbuffers { + class FlatBufferBuilder; +} + +class DomainTypeAdaptorFactoryInterface +{ +public: + typedef QSharedPointer Ptr; + virtual ~DomainTypeAdaptorFactoryInterface() {}; + virtual QSharedPointer createAdaptor(const Akonadi2::Entity &entity) = 0; + virtual void createBuffer(const Akonadi2::ApplicationDomain::ApplicationDomainType &domainType, flatbuffers::FlatBufferBuilder &fbb, void const *metadataData = 0, size_t metadataSize = 0) = 0; +}; diff --git a/common/facade.cpp b/common/facade.cpp index f534319..08f7500 100644 --- a/common/facade.cpp +++ b/common/facade.cpp @@ -23,6 +23,7 @@ #include "log.h" #include "storage.h" #include "definitions.h" +#include "domainadaptor.h" using namespace Akonadi2; diff --git a/common/facade.h b/common/facade.h index aa50941..794e35e 100644 --- a/common/facade.h +++ b/common/facade.h @@ -26,9 +26,11 @@ #include "resourceaccess.h" #include "resultset.h" -#include "domainadaptor.h" +#include "domaintypeadaptorfactoryinterface.h" +#include "storage.h" namespace Akonadi2 { + /** * Default facade implementation for resources that are implemented in a separate process using the ResourceAccess class. * -- cgit v1.2.3 From 9ad96df6cd1526de32bff2b4f98491dd8318f760 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Tue, 24 Nov 2015 23:00:45 +0100 Subject: Use Query::parentProperty to express tree queries That way we don't have to hardcode the parent property, and we can use the property to express non-tree queries as well. --- common/facade.cpp | 7 +++---- common/query.h | 1 + 2 files changed, 4 insertions(+), 4 deletions(-) (limited to 'common') diff --git a/common/facade.cpp b/common/facade.cpp index 08f7500..59972bf 100644 --- a/common/facade.cpp +++ b/common/facade.cpp @@ -350,12 +350,12 @@ template qint64 GenericFacade::executeInitialQuery(const Akonadi2::Query &query, const typename DomainType::Ptr &parent, Akonadi2::ResultProviderInterface &resultProvider) { auto modifiedQuery = query; - if (parent) { + if (parent && !query.parentProperty.isEmpty()) { Trace() << "Running initial query for parent:" << parent->identifier(); - modifiedQuery.propertyFilter.insert("parent", parent->identifier()); + modifiedQuery.propertyFilter.insert(query.parentProperty, parent->identifier()); } else { Trace() << "Running initial query for toplevel"; - modifiedQuery.propertyFilter.insert("parent", QVariant()); + modifiedQuery.propertyFilter.insert(query.parentProperty, QVariant()); } return load(modifiedQuery, [&](Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) -> ResultSet { return loadInitialResultSet(modifiedQuery, transaction, remainingFilters); @@ -365,6 +365,5 @@ qint64 GenericFacade::executeInitialQuery(const Akonadi2::Query &que template class Akonadi2::GenericFacade; template class Akonadi2::GenericFacade; template class Akonadi2::GenericFacade; -// template class Akonadi2::GenericFacade; #include "facade.moc" diff --git a/common/query.h b/common/query.h index 0cad9fb..5313fa9 100644 --- a/common/query.h +++ b/common/query.h @@ -53,6 +53,7 @@ public: QHash propertyFilter; //Properties to retrieve QSet requestedProperties; + QByteArray parentProperty; bool syncOnDemand; bool processAll; //If live query is false, this query will not continuously be updated -- cgit v1.2.3 From e4a4d72fd206fc2d5c1095b39b2839e53cd114bb Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Wed, 25 Nov 2015 00:37:42 +0100 Subject: Optimize findLast This just gave a 700% boost to query performance from ~2k to 14k reads per second... --- common/storage_lmdb.cpp | 71 ++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 59 insertions(+), 12 deletions(-) (limited to 'common') diff --git a/common/storage_lmdb.cpp b/common/storage_lmdb.cpp index 1516e69..a247e38 100644 --- a/common/storage_lmdb.cpp +++ b/common/storage_lmdb.cpp @@ -253,22 +253,69 @@ int Storage::NamedDatabase::scan(const QByteArray &k, return numberOfRetrievedValues; } -void Storage::NamedDatabase::findLatest(const QByteArray &uid, + +void Storage::NamedDatabase::findLatest(const QByteArray &k, const std::function &resultHandler, const std::function &errorHandler) const { - QByteArray latestKey; - scan(uid, [&](const QByteArray &key, const QByteArray &value) -> bool { - latestKey = key; - return true; - }, - errorHandler, true); + if (!d || !d->transaction) { + //Not an error. We rely on this to read nothing from non-existing databases. + return; + } - scan(latestKey, [=](const QByteArray &key, const QByteArray &value) -> bool { - resultHandler(key, value); - return false; - }, - errorHandler); + int rc; + MDB_val key; + MDB_val data; + MDB_cursor *cursor; + + key.mv_data = (void*)k.constData(); + key.mv_size = k.size(); + + rc = mdb_cursor_open(d->transaction, d->dbi, &cursor); + if (rc) { + Error error(d->name.toLatin1(), getErrorCode(rc), QByteArray("Error during mdb_cursor open: ") + QByteArray(mdb_strerror(rc))); + errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); + return; + } + + MDB_cursor_op op = MDB_SET_RANGE; + if ((rc = mdb_cursor_get(cursor, &key, &data, op)) == 0) { + //The first lookup will find a key that is equal or greather than our key + if (QByteArray::fromRawData((char*)key.mv_data, key.mv_size).startsWith(k)) { + bool advanced = false; + while (QByteArray::fromRawData((char*)key.mv_data, key.mv_size).startsWith(k)) { + advanced = true; + MDB_cursor_op nextOp = MDB_NEXT; + rc = mdb_cursor_get(cursor, &key, &data, nextOp); + if (rc) { + break; + } + } + if (advanced) { + MDB_cursor_op prefOp = MDB_PREV; + //We read past the end above, just take the last value + if (rc == MDB_NOTFOUND) { + prefOp = MDB_LAST; + } + rc = mdb_cursor_get(cursor, &key, &data, prefOp); + resultHandler(QByteArray::fromRawData((char*)key.mv_data, key.mv_size), QByteArray::fromRawData((char*)data.mv_data, data.mv_size)); + } + } + } + + //We never find the last value + if (rc == MDB_NOTFOUND) { + rc = 0; + } + + mdb_cursor_close(cursor); + + if (rc) { + Error error(d->name.toLatin1(), getErrorCode(rc), QByteArray("Key: ") + k + " : " + QByteArray(mdb_strerror(rc))); + errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); + } + + return; } -- cgit v1.2.3 From 00e6b843e9f2881faccb312594a0e91c42df0096 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Wed, 25 Nov 2015 09:22:03 +0100 Subject: Less noise --- common/facade.cpp | 6 +++--- common/modelresult.cpp | 6 +++--- common/resourceaccess.cpp | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) (limited to 'common') diff --git a/common/facade.cpp b/common/facade.cpp index 59972bf..850d28b 100644 --- a/common/facade.cpp +++ b/common/facade.cpp @@ -186,15 +186,15 @@ void GenericFacade::replaySet(ResultSet &resultSet, Akonadi2::Result while (resultSet.next([&resultProvider](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value, Akonadi2::Operation operation) -> bool { switch (operation) { case Akonadi2::Operation_Creation: - Trace() << "Got creation"; + // Trace() << "Got creation"; resultProvider.add(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); break; case Akonadi2::Operation_Modification: - Trace() << "Got modification"; + // Trace() << "Got modification"; resultProvider.modify(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); break; case Akonadi2::Operation_Removal: - Trace() << "Got removal"; + // Trace() << "Got removal"; resultProvider.remove(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); break; } diff --git a/common/modelresult.cpp b/common/modelresult.cpp index 5b9e24f..4102cda 100644 --- a/common/modelresult.cpp +++ b/common/modelresult.cpp @@ -124,7 +124,7 @@ void ModelResult::add(const Ptr &value) return; } auto parent = createIndexFromId(id); - qDebug() << "Added entity " << childId << value->identifier() << id; + // qDebug() << "Added entity " << childId << value->identifier() << id; const auto keys = mTree[id]; int index = 0; for (; index < keys.size(); index++) { @@ -136,13 +136,13 @@ void ModelResult::add(const Ptr &value) Warning() << "Entity already in model " << value->identifier(); return; } - qDebug() << "Inserting rows " << index << parent; + // qDebug() << "Inserting rows " << index << parent; beginInsertRows(QModelIndex(), index, index); mEntities.insert(childId, value); mTree[id].insert(index, childId); mParents.insert(childId, id); endInsertRows(); - qDebug() << "Inserted rows " << mTree[id].size(); + // qDebug() << "Inserted rows " << mTree[id].size(); } diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp index 88f785f..1b46b82 100644 --- a/common/resourceaccess.cpp +++ b/common/resourceaccess.cpp @@ -340,7 +340,7 @@ KAsync::Job ResourceAccess::sendRevisionReplayedCommand(qint64 revision) void ResourceAccess::open() { if (d->socket && d->socket->isValid()) { - Trace() << "Socket valid, so not opening again"; + // Trace() << "Socket valid, so not opening again"; return; } if (d->openingSocket) { -- cgit v1.2.3 From a4acb7e251cba5ba6d66bf6235736202255c4eac Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Wed, 25 Nov 2015 09:37:59 +0100 Subject: Only use the parent index when it's available --- common/facade.cpp | 14 ++++++++------ common/modelresult.cpp | 8 ++++++-- common/modelresult.h | 2 +- 3 files changed, 15 insertions(+), 9 deletions(-) (limited to 'common') diff --git a/common/facade.cpp b/common/facade.cpp index 850d28b..68770b5 100644 --- a/common/facade.cpp +++ b/common/facade.cpp @@ -350,12 +350,14 @@ template qint64 GenericFacade::executeInitialQuery(const Akonadi2::Query &query, const typename DomainType::Ptr &parent, Akonadi2::ResultProviderInterface &resultProvider) { auto modifiedQuery = query; - if (parent && !query.parentProperty.isEmpty()) { - Trace() << "Running initial query for parent:" << parent->identifier(); - modifiedQuery.propertyFilter.insert(query.parentProperty, parent->identifier()); - } else { - Trace() << "Running initial query for toplevel"; - modifiedQuery.propertyFilter.insert(query.parentProperty, QVariant()); + if (!query.parentProperty.isEmpty()) { + if (parent) { + Trace() << "Running initial query for parent:" << parent->identifier(); + modifiedQuery.propertyFilter.insert(query.parentProperty, parent->identifier()); + } else { + Trace() << "Running initial query for toplevel"; + modifiedQuery.propertyFilter.insert(query.parentProperty, QVariant()); + } } return load(modifiedQuery, [&](Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) -> ResultSet { return loadInitialResultSet(modifiedQuery, transaction, remainingFilters); diff --git a/common/modelresult.cpp b/common/modelresult.cpp index 4102cda..935e2e8 100644 --- a/common/modelresult.cpp +++ b/common/modelresult.cpp @@ -27,7 +27,8 @@ template ModelResult::ModelResult(const Akonadi2::Query &query, const QList &propertyColumns) :QAbstractItemModel(), - mPropertyColumns(propertyColumns) + mPropertyColumns(propertyColumns), + mQuery(query) { } @@ -42,7 +43,10 @@ static qint64 getIdentifier(const QModelIndex &idx) template qint64 ModelResult::parentId(const Ptr &value) { - return qHash(value->getProperty("parent").toByteArray()); + if (!mQuery.parentProperty.isEmpty()) { + return qHash(value->getProperty(mQuery.parentProperty).toByteArray()); + } + return qHash(QByteArray()); } template diff --git a/common/modelresult.h b/common/modelresult.h index 40a9d9d..66dfce5 100644 --- a/common/modelresult.h +++ b/common/modelresult.h @@ -53,7 +53,7 @@ public: void setFetcher(const std::function &fetcher); private: - static qint64 parentId(const Ptr &value); + qint64 parentId(const Ptr &value); QModelIndex createIndexFromId(const qint64 &id) const; void fetchEntities(const QModelIndex &parent); -- cgit v1.2.3 From 89aa339dd91765d67b4606938e60358f41d33884 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Wed, 25 Nov 2015 15:40:41 +0100 Subject: Fixed modifications. Without this modifications are ignored also in incremental queries. --- common/facade.cpp | 6 +++--- common/facade.h | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) (limited to 'common') diff --git a/common/facade.cpp b/common/facade.cpp index 68770b5..2806f4d 100644 --- a/common/facade.cpp +++ b/common/facade.cpp @@ -329,7 +329,7 @@ qint64 GenericFacade::load(const Akonadi2::Query &query, const std:: QSet remainingFilters; auto resultSet = baseSetRetriever(transaction, remainingFilters); - auto filteredSet = filterSet(resultSet, getFilter(remainingFilters, query), transaction, false); + auto filteredSet = filterSet(resultSet, getFilter(remainingFilters, query), transaction, initialQuery); replaySet(filteredSet, resultProvider); resultProvider.setRevision(Akonadi2::Storage::maxRevision(transaction)); return Akonadi2::Storage::maxRevision(transaction); @@ -343,7 +343,7 @@ qint64 GenericFacade::executeIncrementalQuery(const Akonadi2::Query Trace() << "Running incremental query " << baseRevision; return load(query, [&](Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) -> ResultSet { return loadIncrementalResultSet(baseRevision, query, transaction, remainingFilters); - }, resultProvider); + }, resultProvider, false); } template @@ -361,7 +361,7 @@ qint64 GenericFacade::executeInitialQuery(const Akonadi2::Query &que } return load(modifiedQuery, [&](Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) -> ResultSet { return loadInitialResultSet(modifiedQuery, transaction, remainingFilters); - }, resultProvider); + }, resultProvider, true); } template class Akonadi2::GenericFacade; diff --git a/common/facade.h b/common/facade.h index 794e35e..df09d73 100644 --- a/common/facade.h +++ b/common/facade.h @@ -72,7 +72,7 @@ private: ResultSet filterSet(const ResultSet &resultSet, const std::function &filter, const Akonadi2::Storage::Transaction &transaction, bool initialQuery); std::function getFilter(const QSet remainingFilters, const Akonadi2::Query &query); - qint64 load(const Akonadi2::Query &query, const std::function &)> &baseSetRetriever, Akonadi2::ResultProviderInterface &resultProvider); + qint64 load(const Akonadi2::Query &query, const std::function &)> &baseSetRetriever, Akonadi2::ResultProviderInterface &resultProvider, bool initialQuery); qint64 executeIncrementalQuery(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface &resultProvider); qint64 executeInitialQuery(const Akonadi2::Query &query, const typename DomainType::Ptr &parent, Akonadi2::ResultProviderInterface &resultProvider); -- cgit v1.2.3 From 27164870a7a664daaca4ab6d3e3893a91d4eab5a Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Thu, 26 Nov 2015 14:28:34 +0100 Subject: Avoid repeatedly opening the name db. Although, the benchmarks say it doesn't really have an impact on performance. --- common/facade.cpp | 16 ++++++++-------- common/facade.h | 4 ++-- 2 files changed, 10 insertions(+), 10 deletions(-) (limited to 'common') diff --git a/common/facade.cpp b/common/facade.cpp index 2806f4d..92124fc 100644 --- a/common/facade.cpp +++ b/common/facade.cpp @@ -203,16 +203,15 @@ void GenericFacade::replaySet(ResultSet &resultSet, Akonadi2::Result } template -void GenericFacade::readEntity(const Akonadi2::Storage::Transaction &transaction, const QByteArray &key, const std::function &resultCallback) +void GenericFacade::readEntity(const Akonadi2::Storage::NamedDatabase &db, const QByteArray &key, const std::function &resultCallback) { - const auto bufferType = bufferTypeForDomainType(); //This only works for a 1:1 mapping of resource to domain types. //Not i.e. for tags that are stored as flags in each entity of an imap store. //additional properties that don't have a 1:1 mapping (such as separately stored tags), //could be added to the adaptor. // // Akonadi2::Storage::getLatest(transaction, bufferTye, key); - transaction.openDatabase(bufferType + ".main").findLatest(key, [=](const QByteArray &key, const QByteArray &value) -> bool { + db.findLatest(key, [=](const QByteArray &key, const QByteArray &value) -> bool { Akonadi2::EntityBuffer buffer(value.data(), value.size()); const Akonadi2::Entity &entity = buffer.entity(); const auto metadataBuffer = Akonadi2::EntityBuffer::readBuffer(entity.metadata()); @@ -270,15 +269,15 @@ ResultSet GenericFacade::loadIncrementalResultSet(qint64 baseRevisio } template -ResultSet GenericFacade::filterSet(const ResultSet &resultSet, const std::function &filter, const Akonadi2::Storage::Transaction &transaction, bool initialQuery) +ResultSet GenericFacade::filterSet(const ResultSet &resultSet, const std::function &filter, const Akonadi2::Storage::NamedDatabase &db, bool initialQuery) { auto resultSetPtr = QSharedPointer::create(resultSet); //Read through the source values and return whatever matches the filter - std::function)> generator = [this, resultSetPtr, &transaction, filter, initialQuery](std::function callback) -> bool { + std::function)> generator = [this, resultSetPtr, &db, filter, initialQuery](std::function callback) -> bool { while (resultSetPtr->next()) { //readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) - readEntity(transaction, resultSetPtr->id(), [this, filter, callback, initialQuery](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Akonadi2::Operation operation) { + readEntity(db, resultSetPtr->id(), [this, filter, callback, initialQuery](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Akonadi2::Operation operation) { //Always remove removals, they probably don't match due to non-available properties if (filter(domainObject) || operation == Akonadi2::Operation_Removal) { if (initialQuery) { @@ -319,17 +318,18 @@ std::function -qint64 GenericFacade::load(const Akonadi2::Query &query, const std::function &)> &baseSetRetriever, Akonadi2::ResultProviderInterface &resultProvider) +qint64 GenericFacade::load(const Akonadi2::Query &query, const std::function &)> &baseSetRetriever, Akonadi2::ResultProviderInterface &resultProvider, bool initialQuery) { Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier); storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) { Warning() << "Error during query: " << error.store << error.message; }); auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly); + auto db = transaction.openDatabase(bufferTypeForDomainType() + ".main"); QSet remainingFilters; auto resultSet = baseSetRetriever(transaction, remainingFilters); - auto filteredSet = filterSet(resultSet, getFilter(remainingFilters, query), transaction, initialQuery); + auto filteredSet = filterSet(resultSet, getFilter(remainingFilters, query), db, initialQuery); replaySet(filteredSet, resultProvider); resultProvider.setRevision(Akonadi2::Storage::maxRevision(transaction)); return Akonadi2::Storage::maxRevision(transaction); diff --git a/common/facade.h b/common/facade.h index df09d73..d8b878b 100644 --- a/common/facade.h +++ b/common/facade.h @@ -65,12 +65,12 @@ private: //TODO move into result provider? static void replaySet(ResultSet &resultSet, Akonadi2::ResultProviderInterface &resultProvider); - void readEntity(const Akonadi2::Storage::Transaction &transaction, const QByteArray &key, const std::function &resultCallback); + void readEntity(const Akonadi2::Storage::NamedDatabase &db, const QByteArray &key, const std::function &resultCallback); ResultSet loadInitialResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters); ResultSet loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters); - ResultSet filterSet(const ResultSet &resultSet, const std::function &filter, const Akonadi2::Storage::Transaction &transaction, bool initialQuery); + ResultSet filterSet(const ResultSet &resultSet, const std::function &filter, const Akonadi2::Storage::NamedDatabase &db, bool initialQuery); std::function getFilter(const QSet remainingFilters, const Akonadi2::Query &query); qint64 load(const Akonadi2::Query &query, const std::function &)> &baseSetRetriever, Akonadi2::ResultProviderInterface &resultProvider, bool initialQuery); qint64 executeIncrementalQuery(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface &resultProvider); -- cgit v1.2.3 From 5b41b26a349967acf2197f9f9228526193fd826e Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Fri, 27 Nov 2015 17:30:04 +0100 Subject: Introduced a QueryRunner object The QueryRunner object lives for the duration of the query (so just for the initial query for non-live queries, and for the lifetime of the result model for live queries). It's supposed to handle all the threading internally and decouple the lifetime of the facade. --- common/CMakeLists.txt | 1 + common/clientapi.cpp | 62 +++------- common/facade.cpp | 282 +------------------------------------------- common/facade.h | 17 +-- common/facadeinterface.h | 29 ++++- common/modelresult.cpp | 22 ++++ common/modelresult.h | 6 +- common/queryrunner.cpp | 292 ++++++++++++++++++++++++++++++++++++++++++++++ common/queryrunner.h | 107 +++++++++++++++++ common/resourceaccess.h | 2 + common/resourcefacade.cpp | 17 ++- common/resourcefacade.h | 3 +- common/resultprovider.h | 150 +++++------------------- common/threadboundary.cpp | 5 +- 14 files changed, 522 insertions(+), 473 deletions(-) create mode 100644 common/queryrunner.cpp create mode 100644 common/queryrunner.h (limited to 'common') diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index 01056d0..be312b9 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt @@ -26,6 +26,7 @@ set(command_SRCS resource.cpp genericresource.cpp resourceaccess.cpp + queryrunner.cpp listener.cpp storage_common.cpp threadboundary.cpp diff --git a/common/clientapi.cpp b/common/clientapi.cpp index 02f8ce6..b24dfa8 100644 --- a/common/clientapi.cpp +++ b/common/clientapi.cpp @@ -34,6 +34,7 @@ #include "definitions.h" #include "resourceconfig.h" #include "facadefactory.h" +#include "modelresult.h" #include "log.h" #define ASYNCINTHREAD @@ -100,38 +101,8 @@ template QSharedPointer > Store::load(Query query) { auto resultSet = QSharedPointer >::create(); - - //Execute the search in a thread. - //We must guarantee that the emitter is returned before the first result is emitted. - //The result provider must be threadsafe. - async::run([query, resultSet](){ - QEventLoop eventLoop; - resultSet->onDone([&eventLoop](){ - eventLoop.quit(); - }); - // Query all resources and aggregate results - KAsync::iterate(getResources(query.resources, ApplicationDomain::getTypeName())) - .template each([query, resultSet](const QByteArray &resource, KAsync::Future &future) { - if (auto facade = FacadeFactory::instance().getFacade(resourceName(resource), resource)) { - facade->load(query, *resultSet).template then([&future](){future.setFinished();}).exec(); - //Keep the facade alive for the lifetime of the resultSet. - resultSet->setFacade(facade); - } else { - //Ignore the error and carry on - future.setFinished(); - } - }).template then([query, resultSet]() { - resultSet->initialResultSetComplete(); - if (!query.liveQuery) { - resultSet->complete(); - } - }).exec(); - - //Keep the thread alive until the result is ready - if (!resultSet->isDone()) { - eventLoop.exec(); - } - }); + qWarning() << "Main thread " << QThread::currentThreadId(); + //FIXME remove return resultSet->emitter(); } @@ -139,28 +110,29 @@ template QSharedPointer Store::loadModel(Query query) { auto model = QSharedPointer >::create(query, query.requestedProperties.toList()); - auto resultProvider = std::make_shared >(model); - //Keep the resultprovider alive for as long as the model lives - model->setProperty("resultProvider", QVariant::fromValue(std::shared_ptr(resultProvider))); + + //* Client defines lifetime of model + //* The model lifetime defines the duration of live-queries + //* The facade needs to life for the duration of any calls being made (assuming we get rid of any internal callbacks + //* The emitter needs to live or the duration of query (respectively, the model) + //* The result provider needs to live for as long as results are provided (until the last thread exits). // Query all resources and aggregate results KAsync::iterate(getResources(query.resources, ApplicationDomain::getTypeName())) - .template each([query, resultProvider](const QByteArray &resource, KAsync::Future &future) { + .template each([query, model](const QByteArray &resource, KAsync::Future &future) { auto facade = FacadeFactory::instance().getFacade(resourceName(resource), resource); if (facade) { - facade->load(query, *resultProvider).template then([&future](){future.setFinished();}).exec(); - //Keep the facade alive for the lifetime of the resultSet. - //FIXME this would have to become a list - resultProvider->setFacade(facade); + Trace() << "Trying to fetch from resource"; + auto result = facade->load(query); + auto emitter = result.second; + //TODO use aggregating emitter instead + model->setEmitter(emitter); + model->fetchMore(QModelIndex()); + result.first.template then([&future](){future.setFinished();}).exec(); } else { //Ignore the error and carry on future.setFinished(); } - }).template then([query, resultProvider]() { - resultProvider->initialResultSetComplete(); - if (!query.liveQuery) { - resultProvider->complete(); - } }).exec(); return model; diff --git a/common/facade.cpp b/common/facade.cpp index 92124fc..1d6b9a7 100644 --- a/common/facade.cpp +++ b/common/facade.cpp @@ -24,76 +24,10 @@ #include "storage.h" #include "definitions.h" #include "domainadaptor.h" +#include "queryrunner.h" using namespace Akonadi2; -/** - * A QueryRunner runs a query and updates the corresponding result set. - * - * The lifetime of the QueryRunner is defined by the resut set (otherwise it's doing useless work), - * and by how long a result set must be updated. If the query is one off the runner dies after the execution, - * otherwise it lives on the react to changes and updates the corresponding result set. - * - * QueryRunner has to keep ResourceAccess alive in order to keep getting updates. - */ -class QueryRunner : public QObject -{ - Q_OBJECT -public: - typedef std::function()> QueryFunction; - - QueryRunner(const Akonadi2::Query &query) {}; - /** - * Starts query - */ - KAsync::Job run(qint64 newRevision = 0) - { - return queryFunction(); - } - - /** - * Set the query to run - */ - void setQuery(const QueryFunction &query) - { - queryFunction = query; - } - -public slots: - /** - * Rerun query with new revision - */ - void revisionChanged(qint64 newRevision) - { - Trace() << "New revision: " << newRevision; - run().exec(); - } - -private: - QueryFunction queryFunction; -}; - -static inline ResultSet fullScan(const Akonadi2::Storage::Transaction &transaction, const QByteArray &bufferType) -{ - //TODO use a result set with an iterator, to read values on demand - QVector keys; - transaction.openDatabase(bufferType + ".main").scan(QByteArray(), [&](const QByteArray &key, const QByteArray &value) -> bool { - //Skip internals - if (Akonadi2::Storage::isInternalKey(key)) { - return true; - } - keys << Akonadi2::Storage::uidFromKey(key); - return true; - }, - [](const Akonadi2::Storage::Error &error) { - qWarning() << "Error during query: " << error.message; - }); - - Trace() << "Full scan found " << keys.size() << " results"; - return ResultSet(keys); -} - - template GenericFacade::GenericFacade(const QByteArray &resourceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &adaptorFactory , const QSharedPointer resourceAccess) @@ -150,220 +84,14 @@ KAsync::Job GenericFacade::remove(const DomainType &domainObje } template -KAsync::Job GenericFacade::load(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface &resultProvider) -{ - //We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load. - resultProvider.setFetcher([this, query, &resultProvider](const typename DomainType::Ptr &parent) { - const qint64 newRevision = executeInitialQuery(query, parent, resultProvider); - mResourceAccess->sendRevisionReplayedCommand(newRevision); - }); - - - //In case of a live query we keep the runner for as long alive as the result provider exists - if (query.liveQuery) { - auto runner = QSharedPointer::create(query); - //Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting - runner->setQuery([this, query, &resultProvider] () -> KAsync::Job { - return KAsync::start([this, query, &resultProvider](KAsync::Future &future) { - const qint64 newRevision = executeIncrementalQuery(query, resultProvider); - mResourceAccess->sendRevisionReplayedCommand(newRevision); - future.setFinished(); - }); - }); - resultProvider.setQueryRunner(runner); - //Ensure the connection is open, if it wasn't already opened - //TODO If we are not connected already, we have to check for the latest revision once connected, otherwise we could miss some updates - mResourceAccess->open(); - QObject::connect(mResourceAccess.data(), &Akonadi2::ResourceAccess::revisionChanged, runner.data(), &QueryRunner::revisionChanged); - } - return KAsync::null(); -} - - //TODO move into result provider? -template -void GenericFacade::replaySet(ResultSet &resultSet, Akonadi2::ResultProviderInterface &resultProvider) -{ - while (resultSet.next([&resultProvider](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value, Akonadi2::Operation operation) -> bool { - switch (operation) { - case Akonadi2::Operation_Creation: - // Trace() << "Got creation"; - resultProvider.add(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); - break; - case Akonadi2::Operation_Modification: - // Trace() << "Got modification"; - resultProvider.modify(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); - break; - case Akonadi2::Operation_Removal: - // Trace() << "Got removal"; - resultProvider.remove(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); - break; - } - return true; - })){}; -} - -template -void GenericFacade::readEntity(const Akonadi2::Storage::NamedDatabase &db, const QByteArray &key, const std::function &resultCallback) -{ - //This only works for a 1:1 mapping of resource to domain types. - //Not i.e. for tags that are stored as flags in each entity of an imap store. - //additional properties that don't have a 1:1 mapping (such as separately stored tags), - //could be added to the adaptor. - // - // Akonadi2::Storage::getLatest(transaction, bufferTye, key); - db.findLatest(key, [=](const QByteArray &key, const QByteArray &value) -> bool { - Akonadi2::EntityBuffer buffer(value.data(), value.size()); - const Akonadi2::Entity &entity = buffer.entity(); - const auto metadataBuffer = Akonadi2::EntityBuffer::readBuffer(entity.metadata()); - Q_ASSERT(metadataBuffer); - const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; - resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Akonadi2::Storage::uidFromKey(key), revision, mDomainTypeAdaptorFactory->createAdaptor(entity)), metadataBuffer->operation()); - return false; - }, - [](const Akonadi2::Storage::Error &error) { - qWarning() << "Error during query: " << error.message; - }); -} - -template -ResultSet GenericFacade::loadInitialResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) +QPair, typename ResultEmitter::Ptr> GenericFacade::load(const Akonadi2::Query &query) { - QSet appliedFilters; - auto resultSet = Akonadi2::ApplicationDomain::TypeImplementation::queryIndexes(query, mResourceInstanceIdentifier, appliedFilters, transaction); - remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters; - - //We do a full scan if there were no indexes available to create the initial set. - if (appliedFilters.isEmpty()) { - //TODO this should be replaced by an index lookup as well - resultSet = fullScan(transaction, bufferTypeForDomainType()); - } - return resultSet; -} - -template -ResultSet GenericFacade::loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) -{ - const auto bufferType = bufferTypeForDomainType(); - auto revisionCounter = QSharedPointer::create(baseRevision); - remainingFilters = query.propertyFilter.keys().toSet(); - return ResultSet([bufferType, revisionCounter, &transaction]() -> QByteArray { - const qint64 topRevision = Akonadi2::Storage::maxRevision(transaction); - //Spit out the revision keys one by one. - while (*revisionCounter <= topRevision) { - const auto uid = Akonadi2::Storage::getUidFromRevision(transaction, *revisionCounter); - const auto type = Akonadi2::Storage::getTypeFromRevision(transaction, *revisionCounter); - Trace() << "Revision" << *revisionCounter << type << uid; - if (type != bufferType) { - //Skip revision - *revisionCounter += 1; - continue; - } - const auto key = Akonadi2::Storage::assembleKey(uid, *revisionCounter); - *revisionCounter += 1; - return key; - } - Trace() << "Finished reading incremental result set:" << *revisionCounter; - //We're done - return QByteArray(); - }); -} - -template -ResultSet GenericFacade::filterSet(const ResultSet &resultSet, const std::function &filter, const Akonadi2::Storage::NamedDatabase &db, bool initialQuery) -{ - auto resultSetPtr = QSharedPointer::create(resultSet); - - //Read through the source values and return whatever matches the filter - std::function)> generator = [this, resultSetPtr, &db, filter, initialQuery](std::function callback) -> bool { - while (resultSetPtr->next()) { - //readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) - readEntity(db, resultSetPtr->id(), [this, filter, callback, initialQuery](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Akonadi2::Operation operation) { - //Always remove removals, they probably don't match due to non-available properties - if (filter(domainObject) || operation == Akonadi2::Operation_Removal) { - if (initialQuery) { - //We're not interested in removals during the initial query - if (operation != Akonadi2::Operation_Removal) { - callback(domainObject, Akonadi2::Operation_Creation); - } - } else { - callback(domainObject, operation); - } - } - }); - } - return false; - }; - return ResultSet(generator); -} - - -template -std::function GenericFacade::getFilter(const QSet remainingFilters, const Akonadi2::Query &query) -{ - return [remainingFilters, query](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool { - for (const auto &filterProperty : remainingFilters) { - const auto property = domainObject->getProperty(filterProperty); - if (property.isValid()) { - //TODO implement other comparison operators than equality - if (property != query.propertyFilter.value(filterProperty)) { - Trace() << "Filtering entity due to property mismatch: " << domainObject->getProperty(filterProperty); - return false; - } - } else { - Warning() << "Ignored property filter because value is invalid: " << filterProperty; - } - } - return true; - }; -} - -template -qint64 GenericFacade::load(const Akonadi2::Query &query, const std::function &)> &baseSetRetriever, Akonadi2::ResultProviderInterface &resultProvider, bool initialQuery) -{ - Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier); - storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) { - Warning() << "Error during query: " << error.store << error.message; - }); - auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly); - auto db = transaction.openDatabase(bufferTypeForDomainType() + ".main"); - - QSet remainingFilters; - auto resultSet = baseSetRetriever(transaction, remainingFilters); - auto filteredSet = filterSet(resultSet, getFilter(remainingFilters, query), db, initialQuery); - replaySet(filteredSet, resultProvider); - resultProvider.setRevision(Akonadi2::Storage::maxRevision(transaction)); - return Akonadi2::Storage::maxRevision(transaction); + //The runner lives for the lifetime of the query + auto runner = new QueryRunner(query, mResourceAccess, mResourceInstanceIdentifier, mDomainTypeAdaptorFactory, bufferTypeForDomainType()); + return qMakePair(KAsync::null(), runner->emitter()); } -template -qint64 GenericFacade::executeIncrementalQuery(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface &resultProvider) -{ - const qint64 baseRevision = resultProvider.revision() + 1; - Trace() << "Running incremental query " << baseRevision; - return load(query, [&](Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) -> ResultSet { - return loadIncrementalResultSet(baseRevision, query, transaction, remainingFilters); - }, resultProvider, false); -} - -template -qint64 GenericFacade::executeInitialQuery(const Akonadi2::Query &query, const typename DomainType::Ptr &parent, Akonadi2::ResultProviderInterface &resultProvider) -{ - auto modifiedQuery = query; - if (!query.parentProperty.isEmpty()) { - if (parent) { - Trace() << "Running initial query for parent:" << parent->identifier(); - modifiedQuery.propertyFilter.insert(query.parentProperty, parent->identifier()); - } else { - Trace() << "Running initial query for toplevel"; - modifiedQuery.propertyFilter.insert(query.parentProperty, QVariant()); - } - } - return load(modifiedQuery, [&](Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) -> ResultSet { - return loadInitialResultSet(modifiedQuery, transaction, remainingFilters); - }, resultProvider, true); -} - template class Akonadi2::GenericFacade; template class Akonadi2::GenericFacade; template class Akonadi2::GenericFacade; diff --git a/common/facade.h b/common/facade.h index d8b878b..de67e05 100644 --- a/common/facade.h +++ b/common/facade.h @@ -59,22 +59,7 @@ public: KAsync::Job create(const DomainType &domainObject) Q_DECL_OVERRIDE; KAsync::Job modify(const DomainType &domainObject) Q_DECL_OVERRIDE; KAsync::Job remove(const DomainType &domainObject) Q_DECL_OVERRIDE; - KAsync::Job load(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface &resultProvider) Q_DECL_OVERRIDE; - -private: - //TODO move into result provider? - static void replaySet(ResultSet &resultSet, Akonadi2::ResultProviderInterface &resultProvider); - - void readEntity(const Akonadi2::Storage::NamedDatabase &db, const QByteArray &key, const std::function &resultCallback); - - ResultSet loadInitialResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters); - ResultSet loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters); - - ResultSet filterSet(const ResultSet &resultSet, const std::function &filter, const Akonadi2::Storage::NamedDatabase &db, bool initialQuery); - std::function getFilter(const QSet remainingFilters, const Akonadi2::Query &query); - qint64 load(const Akonadi2::Query &query, const std::function &)> &baseSetRetriever, Akonadi2::ResultProviderInterface &resultProvider, bool initialQuery); - qint64 executeIncrementalQuery(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface &resultProvider); - qint64 executeInitialQuery(const Akonadi2::Query &query, const typename DomainType::Ptr &parent, Akonadi2::ResultProviderInterface &resultProvider); + QPair, typename ResultEmitter::Ptr> load(const Akonadi2::Query &query) Q_DECL_OVERRIDE; protected: //TODO use one resource access instance per application & per resource diff --git a/common/facadeinterface.h b/common/facadeinterface.h index 7ec21bc..318abf3 100644 --- a/common/facadeinterface.h +++ b/common/facadeinterface.h @@ -23,6 +23,7 @@ #include #include #include +#include #include "applicationdomaintype.h" #include "resultprovider.h" @@ -42,10 +43,32 @@ class StoreFacade { public: virtual ~StoreFacade(){}; QByteArray type() const { return ApplicationDomain::getTypeName(); } + + /** + * Create an entity in the store. + * + * The job returns succefully once the task has been successfully placed in the queue + */ virtual KAsync::Job create(const DomainType &domainObject) = 0; + + /** + * Modify an entity in the store. + * + * The job returns succefully once the task has been successfully placed in the queue + */ virtual KAsync::Job modify(const DomainType &domainObject) = 0; + + /** + * Remove an entity from the store. + * + * The job returns succefully once the task has been successfully placed in the queue + */ virtual KAsync::Job remove(const DomainType &domainObject) = 0; - virtual KAsync::Job load(const Query &query, Akonadi2::ResultProviderInterface &resultProvider) = 0; + + /** + * Load entities from the store. + */ + virtual QPair, typename Akonadi2::ResultEmitter::Ptr > load(const Query &query) = 0; }; template @@ -67,9 +90,9 @@ public: return KAsync::error(-1, "Failed to create a facade"); } - KAsync::Job load(const Query &query, Akonadi2::ResultProviderInterface &resultProvider) + QPair, typename Akonadi2::ResultEmitter::Ptr > load(const Query &query) { - return KAsync::error(-1, "Failed to create a facade"); + return qMakePair(KAsync::null(), typename Akonadi2::ResultEmitter::Ptr()); } }; diff --git a/common/modelresult.cpp b/common/modelresult.cpp index 935e2e8..65eaba9 100644 --- a/common/modelresult.cpp +++ b/common/modelresult.cpp @@ -182,6 +182,28 @@ void ModelResult::setFetcher(const std::function +void ModelResult::setEmitter(const typename Akonadi2::ResultEmitter::Ptr &emitter) +{ + setFetcher(emitter->mFetcher); + emitter->onAdded([this](const Ptr &value) { + this->add(value); + }); + emitter->onModified([this](const Ptr &value) { + this->modify(value); + }); + emitter->onRemoved([this](const Ptr &value) { + this->remove(value); + }); + emitter->onInitialResultSetComplete([this]() { + }); + emitter->onComplete([this]() { + }); + emitter->onClear([this]() { + }); + mEmitter = emitter; +} + template void ModelResult::modify(const Ptr &value) { diff --git a/common/modelresult.h b/common/modelresult.h index 66dfce5..eb6c86b 100644 --- a/common/modelresult.h +++ b/common/modelresult.h @@ -23,20 +23,23 @@ #include #include #include +#include #include #include "query.h" +#include "resultprovider.h" template class ModelResult : public QAbstractItemModel { public: - enum Roles { DomainObjectRole = Qt::UserRole + 1 }; ModelResult(const Akonadi2::Query &query, const QList &propertyColumns); + void setEmitter(const typename Akonadi2::ResultEmitter::Ptr &); + int rowCount(const QModelIndex &parent = QModelIndex()) const; int columnCount(const QModelIndex &parent = QModelIndex()) const; QVariant data(const QModelIndex &index, int role = Qt::DisplayRole) const; @@ -65,5 +68,6 @@ private: QList mPropertyColumns; Akonadi2::Query mQuery; std::function loadEntities; + typename Akonadi2::ResultEmitter::Ptr mEmitter; }; diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp new file mode 100644 index 0000000..4159112 --- /dev/null +++ b/common/queryrunner.cpp @@ -0,0 +1,292 @@ +/* + Copyright (c) 2015 Christian Mollekopf + + This library is free software; you can redistribute it and/or modify it + under the terms of the GNU Library General Public License as published by + the Free Software Foundation; either version 2 of the License, or (at your + option) any later version. + + This library is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public + License for more details. + + You should have received a copy of the GNU Library General Public License + along with this library; see the file COPYING.LIB. If not, write to the + Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + 02110-1301, USA. +*/ +#include "queryrunner.h" + +#include +#include +#include +#include "commands.h" +#include "log.h" +#include "storage.h" +#include "definitions.h" +#include "domainadaptor.h" + +using namespace Akonadi2; + +static inline ResultSet fullScan(const Akonadi2::Storage::Transaction &transaction, const QByteArray &bufferType) +{ + //TODO use a result set with an iterator, to read values on demand + QVector keys; + transaction.openDatabase(bufferType + ".main").scan(QByteArray(), [&](const QByteArray &key, const QByteArray &value) -> bool { + //Skip internals + if (Akonadi2::Storage::isInternalKey(key)) { + return true; + } + keys << Akonadi2::Storage::uidFromKey(key); + return true; + }, + [](const Akonadi2::Storage::Error &error) { + qWarning() << "Error during query: " << error.message; + }); + + Trace() << "Full scan found " << keys.size() << " results"; + return ResultSet(keys); +} + +template +QueryRunner::QueryRunner(const Akonadi2::Query &query, const Akonadi2::ResourceAccessInterface::Ptr &resourceAccess, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &factory, const QByteArray &bufferType) + : QueryRunnerBase(), + mResourceAccess(resourceAccess), + mResultProvider(new ResultProvider), + mDomainTypeAdaptorFactory(factory), + mQuery(query), + mResourceInstanceIdentifier(instanceIdentifier), + mBufferType(bufferType) +{ + Trace() << "Starting query"; + //We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load. + mResultProvider->setFetcher([this, query](const typename DomainType::Ptr &parent) { + Trace() << "Running fetcher"; + + // auto watcher = new QFutureWatcher; + // QObject::connect(watcher, &QFutureWatcher::finished, [](qint64 newRevision) { + // mResourceAccess->sendRevisionReplayedCommand(newRevision); + // }); + // auto future = QtConcurrent::run([&resultProvider]() -> qint64 { + // const qint64 newRevision = executeInitialQuery(query, parent, resultProvider); + // return newRevision; + // }); + // watcher->setFuture(future); + const qint64 newRevision = executeInitialQuery(query, parent, *mResultProvider); + mResourceAccess->sendRevisionReplayedCommand(newRevision); + }); + + + //In case of a live query we keep the runner for as long alive as the result provider exists + if (query.liveQuery) { + //Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting + setQuery([this, query] () -> KAsync::Job { + return KAsync::start([this, query](KAsync::Future &future) { + //TODO execute in thread + const qint64 newRevision = executeIncrementalQuery(query, *mResultProvider); + mResourceAccess->sendRevisionReplayedCommand(newRevision); + future.setFinished(); + }); + }); + //Ensure the connection is open, if it wasn't already opened + //TODO If we are not connected already, we have to check for the latest revision once connected, otherwise we could miss some updates + mResourceAccess->open(); + QObject::connect(mResourceAccess.data(), &Akonadi2::ResourceAccess::revisionChanged, this, &QueryRunner::revisionChanged); + } +} + +template +typename Akonadi2::ResultEmitter::Ptr QueryRunner::emitter() +{ + return mResultProvider->emitter(); +} + +//TODO move into result provider? +template +void QueryRunner::replaySet(ResultSet &resultSet, Akonadi2::ResultProviderInterface &resultProvider) +{ + // Trace() << "Replay set"; + while (resultSet.next([&resultProvider](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value, Akonadi2::Operation operation) -> bool { + switch (operation) { + case Akonadi2::Operation_Creation: + // Trace() << "Got creation"; + resultProvider.add(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); + break; + case Akonadi2::Operation_Modification: + // Trace() << "Got modification"; + resultProvider.modify(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); + break; + case Akonadi2::Operation_Removal: + // Trace() << "Got removal"; + resultProvider.remove(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value).template staticCast()); + break; + } + return true; + })){}; +} + +template +void QueryRunner::readEntity(const Akonadi2::Storage::NamedDatabase &db, const QByteArray &key, const std::function &resultCallback) +{ + //This only works for a 1:1 mapping of resource to domain types. + //Not i.e. for tags that are stored as flags in each entity of an imap store. + //additional properties that don't have a 1:1 mapping (such as separately stored tags), + //could be added to the adaptor. + db.findLatest(key, [=](const QByteArray &key, const QByteArray &value) -> bool { + Akonadi2::EntityBuffer buffer(value.data(), value.size()); + const Akonadi2::Entity &entity = buffer.entity(); + const auto metadataBuffer = Akonadi2::EntityBuffer::readBuffer(entity.metadata()); + Q_ASSERT(metadataBuffer); + const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; + resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Akonadi2::Storage::uidFromKey(key), revision, mDomainTypeAdaptorFactory->createAdaptor(entity)), metadataBuffer->operation()); + return false; + }, + [](const Akonadi2::Storage::Error &error) { + qWarning() << "Error during query: " << error.message; + }); +} + +template +ResultSet QueryRunner::loadInitialResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) +{ + QSet appliedFilters; + auto resultSet = Akonadi2::ApplicationDomain::TypeImplementation::queryIndexes(query, mResourceInstanceIdentifier, appliedFilters, transaction); + remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters; + + //We do a full scan if there were no indexes available to create the initial set. + if (appliedFilters.isEmpty()) { + //TODO this should be replaced by an index lookup as well + resultSet = fullScan(transaction, mBufferType); + } + return resultSet; +} + +template +ResultSet QueryRunner::loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) +{ + const auto bufferType = mBufferType; + auto revisionCounter = QSharedPointer::create(baseRevision); + remainingFilters = query.propertyFilter.keys().toSet(); + return ResultSet([bufferType, revisionCounter, &transaction]() -> QByteArray { + const qint64 topRevision = Akonadi2::Storage::maxRevision(transaction); + //Spit out the revision keys one by one. + while (*revisionCounter <= topRevision) { + const auto uid = Akonadi2::Storage::getUidFromRevision(transaction, *revisionCounter); + const auto type = Akonadi2::Storage::getTypeFromRevision(transaction, *revisionCounter); + // Trace() << "Revision" << *revisionCounter << type << uid; + if (type != bufferType) { + //Skip revision + *revisionCounter += 1; + continue; + } + const auto key = Akonadi2::Storage::assembleKey(uid, *revisionCounter); + *revisionCounter += 1; + return key; + } + Trace() << "Finished reading incremental result set:" << *revisionCounter; + //We're done + return QByteArray(); + }); +} + +template +ResultSet QueryRunner::filterSet(const ResultSet &resultSet, const std::function &filter, const Akonadi2::Storage::NamedDatabase &db, bool initialQuery) +{ + auto resultSetPtr = QSharedPointer::create(resultSet); + + //Read through the source values and return whatever matches the filter + std::function)> generator = [this, resultSetPtr, &db, filter, initialQuery](std::function callback) -> bool { + while (resultSetPtr->next()) { + //readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) + readEntity(db, resultSetPtr->id(), [this, filter, callback, initialQuery](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Akonadi2::Operation operation) { + //Always remove removals, they probably don't match due to non-available properties + if (filter(domainObject) || operation == Akonadi2::Operation_Removal) { + Trace() << "entity is not filtered" << initialQuery; + if (initialQuery) { + //We're not interested in removals during the initial query + if (operation != Akonadi2::Operation_Removal) { + callback(domainObject, Akonadi2::Operation_Creation); + } + } else { + callback(domainObject, operation); + } + } + }); + } + return false; + }; + return ResultSet(generator); +} + + +template +std::function QueryRunner::getFilter(const QSet remainingFilters, const Akonadi2::Query &query) +{ + return [remainingFilters, query](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool { + for (const auto &filterProperty : remainingFilters) { + const auto property = domainObject->getProperty(filterProperty); + if (property.isValid()) { + //TODO implement other comparison operators than equality + if (property != query.propertyFilter.value(filterProperty)) { + Trace() << "Filtering entity due to property mismatch: " << domainObject->getProperty(filterProperty); + return false; + } + } else { + Warning() << "Ignored property filter because value is invalid: " << filterProperty; + } + } + return true; + }; +} + +template +qint64 QueryRunner::load(const Akonadi2::Query &query, const std::function &)> &baseSetRetriever, Akonadi2::ResultProviderInterface &resultProvider, bool initialQuery) +{ + Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier); + storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) { + Warning() << "Error during query: " << error.store << error.message; + }); + auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly); + auto db = transaction.openDatabase(mBufferType + ".main"); + + QSet remainingFilters; + auto resultSet = baseSetRetriever(transaction, remainingFilters); + auto filteredSet = filterSet(resultSet, getFilter(remainingFilters, query), db, initialQuery); + replaySet(filteredSet, resultProvider); + resultProvider.setRevision(Akonadi2::Storage::maxRevision(transaction)); + return Akonadi2::Storage::maxRevision(transaction); +} + + +template +qint64 QueryRunner::executeIncrementalQuery(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface &resultProvider) +{ + const qint64 baseRevision = resultProvider.revision() + 1; + Trace() << "Running incremental query " << baseRevision; + return load(query, [&](Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) -> ResultSet { + return loadIncrementalResultSet(baseRevision, query, transaction, remainingFilters); + }, resultProvider, false); +} + +template +qint64 QueryRunner::executeInitialQuery(const Akonadi2::Query &query, const typename DomainType::Ptr &parent, Akonadi2::ResultProviderInterface &resultProvider) +{ + auto modifiedQuery = query; + if (!query.parentProperty.isEmpty()) { + if (parent) { + Trace() << "Running initial query for parent:" << parent->identifier(); + modifiedQuery.propertyFilter.insert(query.parentProperty, parent->identifier()); + } else { + Trace() << "Running initial query for toplevel"; + modifiedQuery.propertyFilter.insert(query.parentProperty, QVariant()); + } + } + return load(modifiedQuery, [&](Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) -> ResultSet { + return loadInitialResultSet(modifiedQuery, transaction, remainingFilters); + }, resultProvider, true); +} + +template class QueryRunner; +template class QueryRunner; +template class QueryRunner; diff --git a/common/queryrunner.h b/common/queryrunner.h new file mode 100644 index 0000000..e2af9de --- /dev/null +++ b/common/queryrunner.h @@ -0,0 +1,107 @@ +/* + Copyright (c) 2015 Christian Mollekopf + + This library is free software; you can redistribute it and/or modify it + under the terms of the GNU Library General Public License as published by + the Free Software Foundation; either version 2 of the License, or (at your + option) any later version. + + This library is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public + License for more details. + + You should have received a copy of the GNU Library General Public License + along with this library; see the file COPYING.LIB. If not, write to the + Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + 02110-1301, USA. +*/ + +#pragma once + +#include +#include "facadeinterface.h" +#include "resourceaccess.h" +#include "resultprovider.h" +#include "domaintypeadaptorfactoryinterface.h" +#include "storage.h" +#include "query.h" + +/** + * A QueryRunner runs a query and updates the corresponding result set. + * + * The lifetime of the QueryRunner is defined by the resut set (otherwise it's doing useless work), + * and by how long a result set must be updated. If the query is one off the runner dies after the execution, + * otherwise it lives on the react to changes and updates the corresponding result set. + * + * QueryRunner has to keep ResourceAccess alive in order to keep getting updates. + */ + +class QueryRunnerBase : public QObject +{ + Q_OBJECT +protected: + typedef std::function()> QueryFunction; + + /** + * Set the query to run + */ + void setQuery(const QueryFunction &query) + { + queryFunction = query; + } + + +protected slots: + /** + * Rerun query with new revision + */ + void revisionChanged(qint64 newRevision) + { + Trace() << "New revision: " << newRevision; + run().exec(); + } + +private: + /** + * Starts query + */ + KAsync::Job run(qint64 newRevision = 0) + { + return queryFunction(); + } + + QueryFunction queryFunction; +}; + +template +class QueryRunner : public QueryRunnerBase +{ +public: + QueryRunner(const Akonadi2::Query &query, const Akonadi2::ResourceAccessInterface::Ptr &, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &, const QByteArray &bufferType); + + typename Akonadi2::ResultEmitter::Ptr emitter(); + +private: + static void replaySet(ResultSet &resultSet, Akonadi2::ResultProviderInterface &resultProvider); + + void readEntity(const Akonadi2::Storage::NamedDatabase &db, const QByteArray &key, const std::function &resultCallback); + + ResultSet loadInitialResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters); + ResultSet loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters); + + ResultSet filterSet(const ResultSet &resultSet, const std::function &filter, const Akonadi2::Storage::NamedDatabase &db, bool initialQuery); + std::function getFilter(const QSet remainingFilters, const Akonadi2::Query &query); + qint64 load(const Akonadi2::Query &query, const std::function &)> &baseSetRetriever, Akonadi2::ResultProviderInterface &resultProvider, bool initialQuery); + qint64 executeIncrementalQuery(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface &resultProvider); + qint64 executeInitialQuery(const Akonadi2::Query &query, const typename DomainType::Ptr &parent, Akonadi2::ResultProviderInterface &resultProvider); + +private: + QSharedPointer > mResultProvider; + QSharedPointer mResourceAccess; + DomainTypeAdaptorFactoryInterface::Ptr mDomainTypeAdaptorFactory; + QByteArray mResourceInstanceIdentifier; + QByteArray mBufferType; + Akonadi2::Query mQuery; +}; + diff --git a/common/resourceaccess.h b/common/resourceaccess.h index 8e27054..e87a1f7 100644 --- a/common/resourceaccess.h +++ b/common/resourceaccess.h @@ -37,6 +37,8 @@ class ResourceAccessInterface : public QObject { Q_OBJECT public: + typedef QSharedPointer Ptr; + ResourceAccessInterface() {} virtual ~ResourceAccessInterface() {} virtual KAsync::Job sendCommand(int commandId) = 0; diff --git a/common/resourcefacade.cpp b/common/resourcefacade.cpp index 1796271..3d207e4 100644 --- a/common/resourcefacade.cpp +++ b/common/resourcefacade.cpp @@ -54,9 +54,15 @@ KAsync::Job ResourceFacade::remove(const Akonadi2::ApplicationDomain::Akon }); } -KAsync::Job ResourceFacade::load(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface &resultProvider) +QPair, typename Akonadi2::ResultEmitter::Ptr > ResourceFacade::load(const Akonadi2::Query &query) { - return KAsync::start([query, &resultProvider]() { + auto resultProvider = new Akonadi2::ResultProvider(); + auto emitter = resultProvider->emitter(); + resultProvider->setFetcher([](const QSharedPointer &) {}); + resultProvider->onDone([resultProvider]() { + delete resultProvider; + }); + auto job = KAsync::start([query, resultProvider]() { const auto configuredResources = ResourceConfig::getResources(); for (const auto &res : configuredResources.keys()) { const auto type = configuredResources.value(res); @@ -64,12 +70,13 @@ KAsync::Job ResourceFacade::load(const Akonadi2::Query &query, Akonadi2::R auto resource = Akonadi2::ApplicationDomain::AkonadiResource::Ptr::create(); resource->setProperty("identifier", res); resource->setProperty("type", type); - resultProvider.add(resource); + resultProvider->add(resource); } } //TODO initialResultSetComplete should be implicit - resultProvider.initialResultSetComplete(); - resultProvider.complete(); + resultProvider->initialResultSetComplete(); + resultProvider->complete(); }); + return qMakePair(job, emitter); } diff --git a/common/resourcefacade.h b/common/resourcefacade.h index 123b481..38e0c0e 100644 --- a/common/resourcefacade.h +++ b/common/resourcefacade.h @@ -37,5 +37,6 @@ public: KAsync::Job create(const Akonadi2::ApplicationDomain::AkonadiResource &resource) Q_DECL_OVERRIDE; KAsync::Job modify(const Akonadi2::ApplicationDomain::AkonadiResource &resource) Q_DECL_OVERRIDE; KAsync::Job remove(const Akonadi2::ApplicationDomain::AkonadiResource &resource) Q_DECL_OVERRIDE; - KAsync::Job load(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface &resultProvider) Q_DECL_OVERRIDE; + QPair, typename Akonadi2::ResultEmitter::Ptr > load(const Akonadi2::Query &query) Q_DECL_OVERRIDE; }; + diff --git a/common/resultprovider.h b/common/resultprovider.h index 921cd6b..86382ef 100644 --- a/common/resultprovider.h +++ b/common/resultprovider.h @@ -20,12 +20,12 @@ #pragma once +#include #include #include #include "threadboundary.h" #include "resultset.h" #include "log.h" -#include "modelresult.h" using namespace async; @@ -53,12 +53,7 @@ public: virtual void initialResultSetComplete() = 0; virtual void complete() = 0; virtual void clear() = 0; - virtual void setFetcher(const std::function &fetcher) - { - } - - virtual void setFacade(const std::shared_ptr &facade) = 0; - virtual void setQueryRunner(const QSharedPointer &runner) = 0; + virtual void setFetcher(const std::function &fetcher) = 0; void setRevision(qint64 revision) { @@ -74,101 +69,6 @@ private: qint64 mRevision; }; -template -class ModelResultProvider : public ResultProviderInterface { -public: - ModelResultProvider(QWeakPointer > model) - : ResultProviderInterface(), - mModel(model) - { - - } - - void add(const Ptr &value) - { - if (auto model = mModel.toStrongRef()) { - model->add(value); - } - } - - void modify(const Ptr &value) - { - if (auto model = mModel.toStrongRef()) { - model->modify(value); - } - } - - void remove(const Ptr &value) - { - if (auto model = mModel.toStrongRef()) { - model->remove(value); - } - } - - void initialResultSetComplete() - { - // mResultEmitter->initialResultSetComplete(); - } - - void complete() - { - // mResultEmitter->complete(); - } - - void clear() - { - // mResultEmitter->clear(); - } - - /** - * For lifetimemanagement only. - * We keep the runner alive as long as the result provider exists. - */ - void setFacade(const std::shared_ptr &facade) - { - mFacade = facade; - } - - void onDone(const std::function &callback) - { - mOnDoneCallback = callback; - } - - bool isDone() const - { - //The existance of the emitter currently defines wether we're done or not. - // return mResultEmitter.toStrongRef().isNull(); - return true; - } - - void setFetcher(const std::function &fetcher) - { - if (auto model = mModel.toStrongRef()) { - model->setFetcher(fetcher); - } - } - - void setQueryRunner(const QSharedPointer &runner) - { - mQueryRunner = runner; - } - -private: - void done() - { - qWarning() << "done"; - if (mOnDoneCallback) { - mOnDoneCallback(); - mOnDoneCallback = std::function(); - } - } - - QWeakPointer > mModel; - QSharedPointer mQueryRunner; - std::shared_ptr mFacade; - std::function mOnDoneCallback; -}; - /* * The promise side for the result emitter */ @@ -204,6 +104,12 @@ private: } public: + typedef QSharedPointer > Ptr; + + ~ResultProvider() + { + } + //Called from worker thread void add(const T &value) { @@ -261,30 +167,16 @@ public: //We have to go over a separate var and return that, otherwise we'd delete the emitter immediately again auto sharedPtr = QSharedPointer >(new ResultEmitter, [this](ResultEmitter *emitter){ mThreadBoundary->callInMainThread([this]() {done();}); delete emitter; }); mResultEmitter = sharedPtr; + sharedPtr->setFetcher([this](const T &parent) { + Q_ASSERT(mFetcher); + mFetcher(parent); + }); return sharedPtr; } return mResultEmitter.toStrongRef(); } - /** - * For lifetimemanagement only. - * We keep the runner alive as long as the result provider exists. - */ - void setQueryRunner(const QSharedPointer &runner) - { - mQueryRunner = runner; - } - - /** - * For lifetimemanagement only. - * We keep the runner alive as long as the result provider exists. - */ - void setFacade(const std::shared_ptr &facade) - { - mFacade = facade; - } - void onDone(const std::function &callback) { mThreadBoundary = QSharedPointer::create(); @@ -299,7 +191,7 @@ public: void setFetcher(const std::function &fetcher) { - fetcher(T()); + mFetcher = fetcher; } private: @@ -307,16 +199,17 @@ private: { qWarning() << "done"; if (mOnDoneCallback) { - mOnDoneCallback(); + auto callback = mOnDoneCallback; mOnDoneCallback = std::function(); + //This may delete this object + callback(); } } QWeakPointer > mResultEmitter; - QSharedPointer mQueryRunner; - std::shared_ptr mFacade; std::function mOnDoneCallback; QSharedPointer mThreadBoundary; + std::function mFetcher; }; /* @@ -334,6 +227,8 @@ private: template class ResultEmitter { public: + typedef QSharedPointer > Ptr; + void onAdded(const std::function &handler) { addHandler = handler; @@ -394,6 +289,13 @@ public: clearHandler(); } + void setFetcher(const std::function &fetcher) + { + mFetcher = fetcher; + } + + std::function mFetcher; + private: friend class ResultProvider; diff --git a/common/threadboundary.cpp b/common/threadboundary.cpp index 47ec508..48fd11a 100644 --- a/common/threadboundary.cpp +++ b/common/threadboundary.cpp @@ -24,6 +24,9 @@ Q_DECLARE_METATYPE(std::function); namespace async { ThreadBoundary::ThreadBoundary(): QObject() { qRegisterMetaType >("std::function"); } -ThreadBoundary:: ~ThreadBoundary() {} +ThreadBoundary:: ~ThreadBoundary() +{ +} + } -- cgit v1.2.3 From 4926e7f613ea3e03a2865eec66c6a8c1ec0b6516 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Sat, 28 Nov 2015 16:07:15 +0100 Subject: Cleanup --- common/clientapi.cpp | 12 +----------- common/clientapi.h | 12 ------------ common/modelresult.cpp | 2 +- common/queryrunner.cpp | 24 ++++++++++++++++++++---- common/queryrunner.h | 1 + common/resultprovider.h | 2 +- 6 files changed, 24 insertions(+), 29 deletions(-) (limited to 'common') diff --git a/common/clientapi.cpp b/common/clientapi.cpp index b24dfa8..29b7e68 100644 --- a/common/clientapi.cpp +++ b/common/clientapi.cpp @@ -41,7 +41,7 @@ namespace async { - void run(const std::function &runner) { + static void run(const std::function &runner) { auto timer = new QTimer(); timer->setSingleShot(true); QObject::connect(timer, &QTimer::timeout, [runner, timer]() { @@ -97,15 +97,6 @@ QList Store::getResources(const QList &resourceFilter, c return resources; } -template -QSharedPointer > Store::load(Query query) -{ - auto resultSet = QSharedPointer >::create(); - qWarning() << "Main thread " << QThread::currentThreadId(); - //FIXME remove - return resultSet->emitter(); -} - template QSharedPointer Store::loadModel(Query query) { @@ -213,7 +204,6 @@ KAsync::Job Store::synchronize(const Akonadi2::Query &query) #define REGISTER_TYPE(T) template KAsync::Job Store::remove(const T &domainObject); \ template KAsync::Job Store::create(const T &domainObject); \ template KAsync::Job Store::modify(const T &domainObject); \ - template QSharedPointer > Store::load(Query query); \ template QSharedPointer Store::loadModel(Query query); \ REGISTER_TYPE(ApplicationDomain::Event); diff --git a/common/clientapi.h b/common/clientapi.h index c48c6e9..7fee6ae 100644 --- a/common/clientapi.h +++ b/common/clientapi.h @@ -33,12 +33,6 @@ Q_DECLARE_METATYPE(std::shared_ptr); class QAbstractItemModel; -namespace async { - //This should abstract if we execute from eventloop or in thread. - //It supposed to allow the caller to finish the current method before executing the runner. - void run(const std::function &runner); -} - namespace Akonadi2 { using namespace async; @@ -54,12 +48,6 @@ public: static QString storageLocation(); static QByteArray resourceName(const QByteArray &instanceIdentifier); - /** - * Asynchronusly load a dataset - */ - template - static QSharedPointer > load(Query query); - enum Roles { DomainObjectRole = Qt::UserRole + 1 //Must be the same as in ModelResult }; diff --git a/common/modelresult.cpp b/common/modelresult.cpp index 65eaba9..930048f 100644 --- a/common/modelresult.cpp +++ b/common/modelresult.cpp @@ -141,7 +141,7 @@ void ModelResult::add(const Ptr &value) return; } // qDebug() << "Inserting rows " << index << parent; - beginInsertRows(QModelIndex(), index, index); + beginInsertRows(parent, index, index); mEntities.insert(childId, value); mTree[id].insert(index, childId); mParents.insert(childId, id); diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp index 4159112..3f62f6a 100644 --- a/common/queryrunner.cpp +++ b/common/queryrunner.cpp @@ -21,6 +21,7 @@ #include #include #include +#include #include "commands.h" #include "log.h" #include "storage.h" @@ -45,7 +46,7 @@ static inline ResultSet fullScan(const Akonadi2::Storage::Transaction &transacti qWarning() << "Error during query: " << error.message; }); - Trace() << "Full scan found " << keys.size() << " results"; + Trace() << "Full scan on " << bufferType << " found " << keys.size() << " results"; return ResultSet(keys); } @@ -96,6 +97,12 @@ QueryRunner::QueryRunner(const Akonadi2::Query &query, const Akonadi } } +template +QueryRunner::~QueryRunner() +{ + Trace() << "Stopped query"; +} + template typename Akonadi2::ResultEmitter::Ptr QueryRunner::emitter() { @@ -202,7 +209,6 @@ ResultSet QueryRunner::filterSet(const ResultSet &resultSet, const s readEntity(db, resultSetPtr->id(), [this, filter, callback, initialQuery](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Akonadi2::Operation operation) { //Always remove removals, they probably don't match due to non-available properties if (filter(domainObject) || operation == Akonadi2::Operation_Removal) { - Trace() << "entity is not filtered" << initialQuery; if (initialQuery) { //We're not interested in removals during the initial query if (operation != Akonadi2::Operation_Removal) { @@ -262,16 +268,24 @@ qint64 QueryRunner::load(const Akonadi2::Query &query, const std::fu template qint64 QueryRunner::executeIncrementalQuery(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface &resultProvider) { + QTime time; + time.start(); + const qint64 baseRevision = resultProvider.revision() + 1; Trace() << "Running incremental query " << baseRevision; - return load(query, [&](Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) -> ResultSet { + auto revision = load(query, [&](Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) -> ResultSet { return loadIncrementalResultSet(baseRevision, query, transaction, remainingFilters); }, resultProvider, false); + Trace() << "Incremental query took: " << time.elapsed() << " ms"; + return revision; } template qint64 QueryRunner::executeInitialQuery(const Akonadi2::Query &query, const typename DomainType::Ptr &parent, Akonadi2::ResultProviderInterface &resultProvider) { + QTime time; + time.start(); + auto modifiedQuery = query; if (!query.parentProperty.isEmpty()) { if (parent) { @@ -282,9 +296,11 @@ qint64 QueryRunner::executeInitialQuery(const Akonadi2::Query &query modifiedQuery.propertyFilter.insert(query.parentProperty, QVariant()); } } - return load(modifiedQuery, [&](Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) -> ResultSet { + auto revision = load(modifiedQuery, [&](Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) -> ResultSet { return loadInitialResultSet(modifiedQuery, transaction, remainingFilters); }, resultProvider, true); + Trace() << "Initial query took: " << time.elapsed() << " ms"; + return revision; } template class QueryRunner; diff --git a/common/queryrunner.h b/common/queryrunner.h index e2af9de..c918dcb 100644 --- a/common/queryrunner.h +++ b/common/queryrunner.h @@ -79,6 +79,7 @@ class QueryRunner : public QueryRunnerBase { public: QueryRunner(const Akonadi2::Query &query, const Akonadi2::ResourceAccessInterface::Ptr &, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &, const QByteArray &bufferType); + virtual ~QueryRunner(); typename Akonadi2::ResultEmitter::Ptr emitter(); diff --git a/common/resultprovider.h b/common/resultprovider.h index 86382ef..6d7867a 100644 --- a/common/resultprovider.h +++ b/common/resultprovider.h @@ -106,7 +106,7 @@ private: public: typedef QSharedPointer > Ptr; - ~ResultProvider() + virtual ~ResultProvider() { } -- cgit v1.2.3 From 887abffb3f712acaa23eae174d5890f337fe43cb Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Sat, 28 Nov 2015 16:20:38 +0100 Subject: Cleanup --- common/clientapi.h | 5 ----- 1 file changed, 5 deletions(-) (limited to 'common') diff --git a/common/clientapi.h b/common/clientapi.h index 7fee6ae..04f1305 100644 --- a/common/clientapi.h +++ b/common/clientapi.h @@ -26,17 +26,12 @@ #include #include "query.h" -#include "resultprovider.h" #include "applicationdomaintype.h" -Q_DECLARE_METATYPE(std::shared_ptr); - class QAbstractItemModel; namespace Akonadi2 { -using namespace async; - /** * Store interface used in the client API. */ -- cgit v1.2.3 From 67d573d98da247d2cd16ce65fdd37457c5ee74ec Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Mon, 30 Nov 2015 10:30:31 +0100 Subject: ModelResult hasChildren, cleanup --- common/modelresult.cpp | 27 +++++++++++++++++++++------ common/modelresult.h | 1 + 2 files changed, 22 insertions(+), 6 deletions(-) (limited to 'common') diff --git a/common/modelresult.cpp b/common/modelresult.cpp index 930048f..582f8ff 100644 --- a/common/modelresult.cpp +++ b/common/modelresult.cpp @@ -44,26 +44,32 @@ template qint64 ModelResult::parentId(const Ptr &value) { if (!mQuery.parentProperty.isEmpty()) { - return qHash(value->getProperty(mQuery.parentProperty).toByteArray()); + const auto property = value->getProperty(mQuery.parentProperty).toByteArray(); + if (!property.isEmpty()) { + return qHash(property); + } } - return qHash(QByteArray()); + return 0; } template int ModelResult::rowCount(const QModelIndex &parent) const { + qDebug() << "row count " << mTree[getIdentifier(parent)].size(); return mTree[getIdentifier(parent)].size(); } template int ModelResult::columnCount(const QModelIndex &parent) const { + qDebug() << "porperty count " << mPropertyColumns.size(); return mPropertyColumns.size(); } template QVariant ModelResult::data(const QModelIndex &index, int role) const { + qDebug() << index; if (role == DomainObjectRole) { Q_ASSERT(mEntities.contains(index.internalId())); return QVariant::fromValue(mEntities.value(index.internalId())); @@ -83,8 +89,8 @@ QVariant ModelResult::data(const QModelIndex &index, int role) const template QModelIndex ModelResult::index(int row, int column, const QModelIndex &parent) const { - auto id = getIdentifier(parent); - auto childId = mTree.value(id).at(row); + const auto id = getIdentifier(parent); + const auto childId = mTree.value(id).at(row); return createIndex(row, column, childId); } @@ -104,6 +110,15 @@ QModelIndex ModelResult::parent(const QModelIndex &index) const return createIndexFromId(parentId); } +template +bool ModelResult::hasChildren(const QModelIndex &parent) const +{ + if (mQuery.parentProperty.isEmpty() && parent.isValid()) { + return false; + } + return QAbstractItemModel::hasChildren(parent); +} + template bool ModelResult::canFetchMore(const QModelIndex &parent) const { @@ -121,8 +136,8 @@ void ModelResult::fetchMore(const QModelIndex &parent) template void ModelResult::add(const Ptr &value) { - auto childId = qHash(value->identifier()); - auto id = parentId(value); + const auto childId = qHash(value->identifier()); + const auto id = parentId(value); //Ignore updates we get before the initial fetch is done if (!mEntityChildrenFetched[id]) { return; diff --git a/common/modelresult.h b/common/modelresult.h index eb6c86b..3ccf629 100644 --- a/common/modelresult.h +++ b/common/modelresult.h @@ -45,6 +45,7 @@ public: QVariant data(const QModelIndex &index, int role = Qt::DisplayRole) const; QModelIndex index(int row, int column, const QModelIndex & parent = QModelIndex()) const; QModelIndex parent(const QModelIndex &index) const; + bool hasChildren(const QModelIndex &parent = QModelIndex()) const; bool canFetchMore(const QModelIndex &parent) const; void fetchMore(const QModelIndex &parent); -- cgit v1.2.3 From 0413c13c6a9d2e3f5f5dc018f635f3043f09514b Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Mon, 30 Nov 2015 10:31:00 +0100 Subject: Only install the headers we need. We go rid of large parts of the header entanglements. --- common/CMakeLists.txt | 6 ------ 1 file changed, 6 deletions(-) (limited to 'common') diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index be312b9..e56ece9 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt @@ -74,12 +74,6 @@ install(FILES clientapi.h domain/applicationdomaintype.h query.h - threadboundary.h - resultprovider.h - facadefactory.h - log.h - listmodelresult.h bufferadaptor.h - facadeinterface.h DESTINATION ${INCLUDE_INSTALL_DIR}/${PROJECT_NAME} COMPONENT Devel ) -- cgit v1.2.3 From bf839f1a38518fd9302f4742ddeac16e891ac408 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Mon, 30 Nov 2015 10:32:14 +0100 Subject: Debug output --- common/domain/event.cpp | 1 + common/queryrunner.cpp | 5 ++++- common/resourceaccess.cpp | 1 + 3 files changed, 6 insertions(+), 1 deletion(-) (limited to 'common') diff --git a/common/domain/event.cpp b/common/domain/event.cpp index 87e13bc..42c13e2 100644 --- a/common/domain/event.cpp +++ b/common/domain/event.cpp @@ -47,6 +47,7 @@ ResultSet TypeImplementation::queryIndexes(const Akonadi2::Query &query, }); appliedFilters << "uid"; } + Trace() << "Index lookup found " << keys.size() << " keys."; return ResultSet(keys); } diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp index 3f62f6a..bb1127c 100644 --- a/common/queryrunner.cpp +++ b/common/queryrunner.cpp @@ -114,7 +114,9 @@ template void QueryRunner::replaySet(ResultSet &resultSet, Akonadi2::ResultProviderInterface &resultProvider) { // Trace() << "Replay set"; - while (resultSet.next([&resultProvider](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value, Akonadi2::Operation operation) -> bool { + int counter = 0; + while (resultSet.next([&resultProvider, &counter](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value, Akonadi2::Operation operation) -> bool { + counter++; switch (operation) { case Akonadi2::Operation_Creation: // Trace() << "Got creation"; @@ -131,6 +133,7 @@ void QueryRunner::replaySet(ResultSet &resultSet, Akonadi2::ResultPr } return true; })){}; + Trace() << "Replayed " << counter << " results"; } template diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp index 1b46b82..be25533 100644 --- a/common/resourceaccess.cpp +++ b/common/resourceaccess.cpp @@ -282,6 +282,7 @@ KAsync::Job ResourceAccess::sendCommand(int commandId, flatbuffers::FlatB KAsync::Job ResourceAccess::synchronizeResource(bool sourceSync, bool localSync) { + Trace() << "Sending synchronize command: " << sourceSync << localSync; flatbuffers::FlatBufferBuilder fbb; auto command = Akonadi2::CreateSynchronize(fbb, sourceSync, localSync); Akonadi2::FinishSynchronizeBuffer(fbb, command); -- cgit v1.2.3 From b5648af02ea7246b41d24e242c5f94e43e43980e Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Mon, 30 Nov 2015 11:09:31 +0100 Subject: Provide status information about children fetch state The fetch state is per parent. --- common/clientapi.h | 3 ++- common/modelresult.cpp | 30 ++++++++++++++++++++---------- common/modelresult.h | 8 ++++++-- common/queryrunner.cpp | 1 + common/resourcefacade.cpp | 2 +- common/resultprovider.h | 30 +++++++++++++++++++++--------- 6 files changed, 51 insertions(+), 23 deletions(-) (limited to 'common') diff --git a/common/clientapi.h b/common/clientapi.h index 04f1305..8f87562 100644 --- a/common/clientapi.h +++ b/common/clientapi.h @@ -44,7 +44,8 @@ public: static QByteArray resourceName(const QByteArray &instanceIdentifier); enum Roles { - DomainObjectRole = Qt::UserRole + 1 //Must be the same as in ModelResult + DomainObjectRole = Qt::UserRole + 1, //Must be the same as in ModelResult + ChildrenFetchedRole }; /** diff --git a/common/modelresult.cpp b/common/modelresult.cpp index 582f8ff..4def20f 100644 --- a/common/modelresult.cpp +++ b/common/modelresult.cpp @@ -74,6 +74,9 @@ QVariant ModelResult::data(const QModelIndex &index, int role) const Q_ASSERT(mEntities.contains(index.internalId())); return QVariant::fromValue(mEntities.value(index.internalId())); } + if (role == ChildrenFetchedRole) { + return childrenFetched(index); + } if (role == Qt::DisplayRole) { if (index.column() < mPropertyColumns.size()) { Q_ASSERT(mEntities.contains(index.internalId())); @@ -122,8 +125,8 @@ bool ModelResult::hasChildren(const QModelIndex &parent) const template bool ModelResult::canFetchMore(const QModelIndex &parent) const { - qDebug() << "Can fetch more: " << parent << mEntityChildrenFetched.value(parent.internalId()); - return !mEntityChildrenFetched.value(parent.internalId(), false); + qDebug() << "Can fetch more: " << parent << mEntityChildrenFetched.contains(parent.internalId()); + return !mEntityChildrenFetched.contains(parent.internalId()); } template @@ -139,7 +142,8 @@ void ModelResult::add(const Ptr &value) const auto childId = qHash(value->identifier()); const auto id = parentId(value); //Ignore updates we get before the initial fetch is done - if (!mEntityChildrenFetched[id]) { + if (!mEntityChildrenFetched.contains(id)) { + qDebug() << "Children not yet fetched"; return; } auto parent = createIndexFromId(id); @@ -185,7 +189,7 @@ template void ModelResult::fetchEntities(const QModelIndex &parent) { const auto id = getIdentifier(parent); - mEntityChildrenFetched[id] = true; + mEntityChildrenFetched.insert(id); Trace() << "Loading child entities"; loadEntities(parent.data(DomainObjectRole).template value()); } @@ -210,22 +214,28 @@ void ModelResult::setEmitter(const typename Akonadi2::ResultEmitter emitter->onRemoved([this](const Ptr &value) { this->remove(value); }); - emitter->onInitialResultSetComplete([this]() { - }); - emitter->onComplete([this]() { - }); - emitter->onClear([this]() { + emitter->onInitialResultSetComplete([this](const Ptr &parent) { + qint64 parentId = parent ? qHash(parent->identifier()) : 0; + const auto parentIndex = createIndexFromId(parentId); + mEntityChildrenFetchComplete.insert(parentId); + emit dataChanged(parentIndex, parentIndex, QVector() << ChildrenFetchedRole); }); mEmitter = emitter; } +template +bool ModelResult::childrenFetched(const QModelIndex &index) const +{ + return mEntityChildrenFetchComplete.contains(getIdentifier(index)); +} + template void ModelResult::modify(const Ptr &value) { auto childId = qHash(value->identifier()); auto id = parentId(value); //Ignore updates we get before the initial fetch is done - if (!mEntityChildrenFetched[id]) { + if (!mEntityChildrenFetched.contains(id)) { return; } auto parent = createIndexFromId(id); diff --git a/common/modelresult.h b/common/modelresult.h index 3ccf629..700064b 100644 --- a/common/modelresult.h +++ b/common/modelresult.h @@ -33,7 +33,8 @@ class ModelResult : public QAbstractItemModel { public: enum Roles { - DomainObjectRole = Qt::UserRole + 1 + DomainObjectRole = Qt::UserRole + 1, + ChildrenFetchedRole }; ModelResult(const Akonadi2::Query &query, const QList &propertyColumns); @@ -56,6 +57,8 @@ public: void setFetcher(const std::function &fetcher); + bool childrenFetched(const QModelIndex &) const; + private: qint64 parentId(const Ptr &value); QModelIndex createIndexFromId(const qint64 &id) const; @@ -65,7 +68,8 @@ private: QMap mEntities; QMap /* child entity id*/> mTree; QMap mParents; - QMap mEntityChildrenFetched; + QSet mEntityChildrenFetched; + QSet mEntityChildrenFetchComplete; QList mPropertyColumns; Akonadi2::Query mQuery; std::function loadEntities; diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp index bb1127c..e365cfc 100644 --- a/common/queryrunner.cpp +++ b/common/queryrunner.cpp @@ -303,6 +303,7 @@ qint64 QueryRunner::executeInitialQuery(const Akonadi2::Query &query return loadInitialResultSet(modifiedQuery, transaction, remainingFilters); }, resultProvider, true); Trace() << "Initial query took: " << time.elapsed() << " ms"; + resultProvider.initialResultSetComplete(parent); return revision; } diff --git a/common/resourcefacade.cpp b/common/resourcefacade.cpp index 3d207e4..6510c90 100644 --- a/common/resourcefacade.cpp +++ b/common/resourcefacade.cpp @@ -74,7 +74,7 @@ QPair, typename Akonadi2::ResultEmitterinitialResultSetComplete(); + resultProvider->initialResultSetComplete(Akonadi2::ApplicationDomain::AkonadiResource::Ptr()); resultProvider->complete(); }); return qMakePair(job, emitter); diff --git a/common/resultprovider.h b/common/resultprovider.h index 6d7867a..d50f3f6 100644 --- a/common/resultprovider.h +++ b/common/resultprovider.h @@ -50,7 +50,7 @@ public: virtual void add(const T &value) = 0; virtual void modify(const T &value) = 0; virtual void remove(const T &value) = 0; - virtual void initialResultSetComplete() = 0; + virtual void initialResultSetComplete(const T &parent) = 0; virtual void complete() = 0; virtual void clear() = 0; virtual void setFetcher(const std::function &fetcher) = 0; @@ -144,9 +144,15 @@ public: }); } - void initialResultSetComplete() + void initialResultSetComplete(const T &parent) { - callInMainThreadOnEmitter(&ResultEmitter::initialResultSetComplete); + //Because I don't know how to use bind + auto weakEmitter = mResultEmitter; + callInMainThreadOnEmitter([weakEmitter, parent](){ + if (auto strongRef = weakEmitter.toStrongRef()) { + strongRef->initialResultSetComplete(parent); + } + }); } //Called from worker thread @@ -244,7 +250,7 @@ public: removeHandler = handler; } - void onInitialResultSetComplete(const std::function &handler) + void onInitialResultSetComplete(const std::function &handler) { initialResultSetCompleteHandler = handler; } @@ -274,19 +280,25 @@ public: removeHandler(value); } - void initialResultSetComplete() + void initialResultSetComplete(const DomainType &parent) { - initialResultSetCompleteHandler(); + if (initialResultSetCompleteHandler) { + initialResultSetCompleteHandler(parent); + } } void complete() { - completeHandler(); + if (completeHandler) { + completeHandler(); + } } void clear() { - clearHandler(); + if (clearHandler) { + clearHandler(); + } } void setFetcher(const std::function &fetcher) @@ -302,7 +314,7 @@ private: std::function addHandler; std::function modifyHandler; std::function removeHandler; - std::function initialResultSetCompleteHandler; + std::function initialResultSetCompleteHandler; std::function completeHandler; std::function clearHandler; ThreadBoundary mThreadBoundary; -- cgit v1.2.3 From bf28c2e3f43038165dc83c10267d103e779b245e Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Mon, 30 Nov 2015 13:45:51 +0100 Subject: ModelResult: return an invalid QModelIndex for the toplevel parent --- common/modelresult.cpp | 3 +++ 1 file changed, 3 insertions(+) (limited to 'common') diff --git a/common/modelresult.cpp b/common/modelresult.cpp index 4def20f..e2a05f8 100644 --- a/common/modelresult.cpp +++ b/common/modelresult.cpp @@ -100,6 +100,9 @@ QModelIndex ModelResult::index(int row, int column, const QModelIndex &p template QModelIndex ModelResult::createIndexFromId(const qint64 &id) const { + if (id == 0) { + return QModelIndex(); + } auto grandParentId = mParents.value(id, 0); auto row = mTree.value(grandParentId).indexOf(id); return createIndex(row, 0, id); -- cgit v1.2.3 From 0b8850f85f420fcb08643463afe01b026e58dde5 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Mon, 30 Nov 2015 15:58:23 +0100 Subject: Less debug output --- common/modelresult.cpp | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) (limited to 'common') diff --git a/common/modelresult.cpp b/common/modelresult.cpp index e2a05f8..c7fcd49 100644 --- a/common/modelresult.cpp +++ b/common/modelresult.cpp @@ -55,21 +55,18 @@ qint64 ModelResult::parentId(const Ptr &value) template int ModelResult::rowCount(const QModelIndex &parent) const { - qDebug() << "row count " << mTree[getIdentifier(parent)].size(); return mTree[getIdentifier(parent)].size(); } template int ModelResult::columnCount(const QModelIndex &parent) const { - qDebug() << "porperty count " << mPropertyColumns.size(); return mPropertyColumns.size(); } template QVariant ModelResult::data(const QModelIndex &index, int role) const { - qDebug() << index; if (role == DomainObjectRole) { Q_ASSERT(mEntities.contains(index.internalId())); return QVariant::fromValue(mEntities.value(index.internalId())); @@ -128,14 +125,12 @@ bool ModelResult::hasChildren(const QModelIndex &parent) const template bool ModelResult::canFetchMore(const QModelIndex &parent) const { - qDebug() << "Can fetch more: " << parent << mEntityChildrenFetched.contains(parent.internalId()); return !mEntityChildrenFetched.contains(parent.internalId()); } template void ModelResult::fetchMore(const QModelIndex &parent) { - qDebug() << "Fetch more: " << parent; fetchEntities(parent); } @@ -146,7 +141,6 @@ void ModelResult::add(const Ptr &value) const auto id = parentId(value); //Ignore updates we get before the initial fetch is done if (!mEntityChildrenFetched.contains(id)) { - qDebug() << "Children not yet fetched"; return; } auto parent = createIndexFromId(id); @@ -178,7 +172,7 @@ void ModelResult::remove(const Ptr &value) auto childId = qHash(value->identifier()); auto id = parentId(value); auto parent = createIndexFromId(id); - qDebug() << "Removed entity" << childId; + // qDebug() << "Removed entity" << childId; auto index = mTree[id].indexOf(qHash(value->identifier())); beginRemoveRows(parent, index, index); mEntities.remove(childId); @@ -218,7 +212,7 @@ void ModelResult::setEmitter(const typename Akonadi2::ResultEmitter this->remove(value); }); emitter->onInitialResultSetComplete([this](const Ptr &parent) { - qint64 parentId = parent ? qHash(parent->identifier()) : 0; + const qint64 parentId = parent ? qHash(parent->identifier()) : 0; const auto parentIndex = createIndexFromId(parentId); mEntityChildrenFetchComplete.insert(parentId); emit dataChanged(parentIndex, parentIndex, QVector() << ChildrenFetchedRole); @@ -242,7 +236,7 @@ void ModelResult::modify(const Ptr &value) return; } auto parent = createIndexFromId(id); - qDebug() << "Modified entity" << childId; + // qDebug() << "Modified entity" << childId; auto i = mTree[id].indexOf(childId); mEntities.remove(childId); mEntities.insert(childId, value); -- cgit v1.2.3