/*
 * Copyright (C) 2015 Christian Mollekopf
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) version 3, or any
 * later version accepted by the membership of KDE e.V. (or its
 * successor approved by the membership of KDE e.V.), which shall
 * act as a proxy defined in Section 6 of version 3 of the license.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library.  If not, see <http://www.gnu.org/licenses/>.
 */

#include "store.h"

// NOTE(review): the angle-bracket parts of this file (the system header names below and the
// template argument lists throughout) are written out as inferred from how each name is used.
// Confirm the header list and the StoreFacade/NullFacade type names against the project headers.
#include <QTime>
#include <QAbstractItemModel>
#include <QUuid>
#include <functional>
#include <memory>

#include "resourceaccess.h"
#include "commands.h"
#include "resourcefacade.h"
#include "definitions.h"
#include "resourceconfig.h"
#include "facadefactory.h"
#include "modelresult.h"
#include "storage.h"
#include "log.h"

SINK_DEBUG_AREA("store")

// Needed so the resource emitter can be stored on the model via QVariant (see Store::loadModel).
Q_DECLARE_METATYPE(QSharedPointer<Sink::ResultEmitter<Sink::ApplicationDomain::SinkResource::Ptr>>)
Q_DECLARE_METATYPE(QSharedPointer<Sink::ResourceAccessInterface>);
Q_DECLARE_METATYPE(std::shared_ptr<void>);

namespace Sink {

/*
 * Returns the storage location as reported by Sink::storageLocation().
 */
QString Store::storageLocation()
{
    return Sink::storageLocation();
}

/*
 * Returns a path below Sink::temporaryFileLocation() whose last component is a newly
 * generated UUID string. Only the path is composed here; nothing is created on disk.
 */
QString Store::getTemporaryFilePath()
{
    return Sink::temporaryFileLocation() + "/" + QUuid::createUuid().toString();
}

/*
 * Returns a map of resource instance identifiers and resource type
 *
 * query: resource filter; query.ids restricts the candidate instances (all configured
 *        resources are candidates when it is empty) and every entry of query.propertyFilter
 *        has to match the resource's configuration.
 * type:  entity type name. For a global type the result contains exactly one entry with an
 *        empty identifier and an empty resource type.
 *
 * Identifiers listed in query.ids that are not configured are skipped with a warning.
 */
static QMap<QByteArray, QByteArray> getResources(const Sink::Query::Filter &query, const QByteArray &type = QByteArray())
{
    const QList<QByteArray> resourceFilter = query.ids;

    // Returns true if the resource has to be left out, i.e. if at least one property filter
    // does not match the value found in the resource's configuration.
    const auto filterResource = [&](const QByteArray &res) {
        const auto configuration = ResourceConfig::getConfiguration(res);
        for (const auto &filterProperty : query.propertyFilter.keys()) {
            const auto filter = query.propertyFilter.value(filterProperty);
            if (!filter.matches(configuration.value(filterProperty))) {
                return true;
            }
        }
        return false;
    };

    QMap<QByteArray, QByteArray> resources;
    // Return the global resource (signified by an empty name) for types that don't belong to a specific resource
    if (ApplicationDomain::isGlobalType(type)) {
        resources.insert("", "");
        return resources;
    }

    const auto configuredResources = ResourceConfig::getResources();
    if (resourceFilter.isEmpty()) {
        for (const auto &res : configuredResources.keys()) {
            // Named differently from the `type` parameter (the entity type) to avoid shadowing it.
            const auto resourceType = configuredResources.value(res);
            if (filterResource(res)) {
                continue;
            }
            // TODO filter by entity type
            resources.insert(res, resourceType);
        }
    } else {
        for (const auto &res : resourceFilter) {
            if (configuredResources.contains(res)) {
                if (filterResource(res)) {
                    continue;
                }
                resources.insert(res, configuredResources.value(res));
            } else {
                SinkWarning() << "Resource is not existing: " << res;
            }
        }
    }
    SinkTrace() << "Found resources: " << resources;
    return resources;
}

/*
 * Loads `query` from a single resource and hooks the resulting emitter into `aggregatingEmitter`.
 *
 * Returns the job produced by the facade's load(). If no facade is available for the resource
 * this is only traced and a null job is returned, so that the remaining resources can still be
 * queried. A null emitter from the facade is logged as a warning; the job is returned regardless.
 */
template <class DomainType>
KAsync::Job<void> queryResource(const QByteArray resourceType, const QByteArray &resourceInstanceIdentifier, const Query &query,
    typename AggregatingResultEmitter<typename DomainType::Ptr>::Ptr aggregatingEmitter)
{
    auto facade = FacadeFactory::instance().getFacade<DomainType>(resourceType, resourceInstanceIdentifier);
    if (facade) {
        SinkTrace() << "Trying to fetch from resource " << resourceInstanceIdentifier;
        auto result = facade->load(query);
        if (result.second) {
            aggregatingEmitter->addEmitter(result.second);
        } else {
            SinkWarning() << "Null emitter for resource " << resourceInstanceIdentifier;
        }
        return result.first;
    } else {
        SinkTrace() << "Couldn' find a facade for " << resourceInstanceIdentifier;
        // Ignore the error and carry on
        return KAsync::null<void>();
    }
}

/*
 * Creates a model for `query` that aggregates the results of all matching resources.
 *
 * The query type is set to the type name of DomainType. For live queries that are neither
 * restricted to specific resource ids nor target a global type, the set of resources is
 * watched as well and every resource that shows up is queried and added to the same model.
 *
 * The query jobs are started before returning and fetchMore() is called on the root index.
 */
template <class DomainType>
QSharedPointer<QAbstractItemModel> Store::loadModel(Query query)
{
    query.setType(ApplicationDomain::getTypeName<DomainType>());
    SinkTrace() << "Loading model: " << query;
    auto model = QSharedPointer<ModelResult<DomainType, typename DomainType::Ptr>>::create(query, query.requestedProperties);

    //* Client defines lifetime of model
    //* The model lifetime defines the duration of live-queries
    //* The facade needs to live for the duration of any calls being made (assuming we get rid of any internal callbacks)
    //* The emitter needs to live for the duration of the query (respectively, the model)
    //* The result provider needs to live for as long as results are provided (until the last thread exits).

    // Query all resources and aggregate results
    auto resources = getResources(query.getResourceFilter(), ApplicationDomain::getTypeName<DomainType>());
    auto aggregatingEmitter = AggregatingResultEmitter<typename DomainType::Ptr>::Ptr::create();
    model->setEmitter(aggregatingEmitter);

    if (query.liveQuery() && query.getResourceFilter().ids.isEmpty() && !ApplicationDomain::isGlobalType(ApplicationDomain::getTypeName<DomainType>())) {
        SinkTrace() << "Listening for new resources";
        auto facade = FacadeFactory::instance().getFacade<ApplicationDomain::SinkResource>("", "");
        Q_ASSERT(facade);
        Sink::Query resourceQuery;
        // It is the query for resources that has to be live in order to hear about resources
        // added later on. The flags of the caller's query must not be touched here: setting
        // them would discard whatever else had been requested.
        resourceQuery.setFlags(Query::LiveQuery);
        auto result = facade->load(resourceQuery);
        auto emitter = result.second;
        emitter->onAdded([query, aggregatingEmitter](const ApplicationDomain::SinkResource::Ptr &resource) {
            SinkTrace() << "Found new resources: " << resource->identifier();
            const auto resourceType = ResourceConfig::getResourceType(resource->identifier());
            Q_ASSERT(!resourceType.isEmpty());
            queryResource<DomainType>(resourceType, resource->identifier(), query, aggregatingEmitter).exec();
        });
        // Only additions are of interest; the remaining callbacks are set to no-ops.
        emitter->onModified([](const ApplicationDomain::SinkResource::Ptr &) {
        });
        emitter->onRemoved([](const ApplicationDomain::SinkResource::Ptr &) {
        });
        emitter->onInitialResultSetComplete([](const ApplicationDomain::SinkResource::Ptr &) {
        });
        emitter->onComplete([query, aggregatingEmitter]() {
            SinkTrace() << "Resource query complete";
        });
        // Stored on the model so that the emitter lives as long as the model does.
        model->setProperty("resourceEmitter", QVariant::fromValue(emitter));
        result.first.exec();
    }

    KAsync::value(resources.keys())
        .template each([query, aggregatingEmitter, resources](const QByteArray &resourceInstanceIdentifier) {
            const auto resourceType = resources.value(resourceInstanceIdentifier);
            return queryResource<DomainType>(resourceType, resourceInstanceIdentifier, query, aggregatingEmitter);
        })
        .exec();
    model->fetchMore(QModelIndex());

    return model;
}

/*
 * Looks up the facade used to create/modify/remove entities of DomainType.
 *
 * For global types the facade registered for the empty resource type and identifier is tried
 * first. Otherwise, or if that lookup yields nothing, the facade registered for the resource
 * type of `resourceInstanceIdentifier` is used. If no facade is found at all a null-object
 * facade is returned, so the result is never a null pointer.
 */
template <typename DomainType>
static std::shared_ptr<StoreFacade<DomainType>> getFacade(const QByteArray &resourceInstanceIdentifier)
{
    if (ApplicationDomain::isGlobalType(ApplicationDomain::getTypeName<DomainType>())) {
        if (auto facade = FacadeFactory::instance().getFacade<DomainType>("", "")) {
            return facade;
        }
    }
    if (auto facade = FacadeFactory::instance().getFacade<DomainType>(ResourceConfig::getResourceType(resourceInstanceIdentifier), resourceInstanceIdentifier)) {
        return facade;
    }
    return std::make_shared<NullFacade<DomainType>>();
}

/*
 * Returns a job that creates `domainObject` through the facade of the object's resource
 * instance. A failure is logged as a warning.
 */
template <class DomainType>
KAsync::Job<void> Store::create(const DomainType &domainObject)
{
    SinkTrace() << "Create: " << domainObject;
    // Potentially move to separate thread as well
    auto facade = getFacade<DomainType>(domainObject.resourceInstanceIdentifier());
    // The facade is added to the job context, presumably to keep it alive while the job runs.
    return facade->create(domainObject).addToContext(std::shared_ptr<void>(facade)).onError([](const KAsync::Error &) {
        SinkWarning() << "Failed to create";
    });
}

/*
 * Returns a job that modifies `domainObject` through the facade of the object's resource
 * instance. A failure is logged as a warning.
 */
template <class DomainType>
KAsync::Job<void> Store::modify(const DomainType &domainObject)
{
    SinkTrace() << "Modify: " << domainObject;
    // Potentially move to separate thread as well
    auto facade = getFacade<DomainType>(domainObject.resourceInstanceIdentifier());
    // The facade is added to the job context, presumably to keep it alive while the job runs.
    return facade->modify(domainObject).addToContext(std::shared_ptr<void>(facade)).onError([](const KAsync::Error &) {
        SinkWarning() << "Failed to modify";
    });
}

/*
 * Returns a job that removes `domainObject` through the facade of the object's resource
 * instance. A failure is logged as a warning.
 */
template <class DomainType>
KAsync::Job<void> Store::remove(const DomainType &domainObject)
{
    SinkTrace() << "Remove: " << domainObject;
    // Potentially move to separate thread as well
    auto facade = getFacade<DomainType>(domainObject.resourceInstanceIdentifier());
    // The facade is added to the job context, presumably to keep it alive while the job runs.
    return facade->remove(domainObject).addToContext(std::shared_ptr<void>(facade)).onError([](const KAsync::Error &) {
        SinkWarning() << "Failed to remove";
    });
}

/*
 * Returns a job that sends the RemoveFromDiskCommand to the resource `identifier`.
 *
 * The storage environments are cleared right away, i.e. when this function is called and not
 * when the job runs. If the resource access is ready once the command has been sent, the job
 * only finishes when the access signals that it is no longer ready.
 */
KAsync::Job<void> Store::removeDataFromDisk(const QByteArray &identifier)
{
    // All databases are going to become invalid, nuke the environments
    // TODO: all clients should react to a notification from the resource
    Sink::Storage::DataStore::clearEnv();
    SinkTrace() << "Remove data from disk " << identifier;
    auto time = QSharedPointer<QTime>::create();
    time->start();
    auto resourceAccess = ResourceAccessFactory::instance().getAccess(identifier, ResourceConfig::getResourceType(identifier));
    resourceAccess->open();
    return resourceAccess->sendCommand(Sink::Commands::RemoveFromDiskCommand)
        .addToContext(resourceAccess)
        .then<void>([resourceAccess](KAsync::Future<void> &future) {
            if (resourceAccess->isReady()) {
                // Wait for the resource shutdown
                // NOTE(review): `future` is captured by reference and the connection has no
                // context object, so this relies on the future staying valid until the
                // signal arrives - confirm.
                QObject::connect(resourceAccess.data(), &ResourceAccess::ready, [&future](bool ready) {
                    if (!ready) {
                        future.setFinished();
                    }
                });
            } else {
                future.setFinished();
            }
        })
        .syncThen<void>([time]() {
            SinkTrace() << "Remove from disk complete." << Log::TraceTime(time->elapsed());
        });
}

/*
 * Returns a job that synchronizes every resource matching the resource filter of `query`.
 *
 * An error reported by a resource is logged and passed on as the error of that resource's job.
 */
KAsync::Job<void> Store::synchronize(const Sink::Query &query)
{
    auto resources = getResources(query.getResourceFilter()).keys();
    SinkTrace() << "synchronize" << resources;
    return KAsync::value(resources)
        .template each([query](const QByteArray &resource) {
            SinkTrace() << "Synchronizing " << resource;
            auto resourceAccess = ResourceAccessFactory::instance().getAccess(resource, ResourceConfig::getResourceType(resource));
            return resourceAccess->synchronizeResource(true, false)
                .addToContext(resourceAccess)
                .then<void>([](const KAsync::Error &error) {
                    if (error) {
                        SinkWarning() << "Error during sync.";
                        return KAsync::error<void>(error);
                    }
                    SinkTrace() << "synced.";
                    return KAsync::null<void>();
                });
        });
}

/*
 * Returns a job that synchronizes every resource matching the resource filter of `scope`,
 * handing the scope itself to each resource.
 *
 * An error reported by a resource is logged and passed on as the error of that resource's job.
 */
KAsync::Job<void> Store::synchronize(const Sink::SyncScope &scope)
{
    auto resources = getResources(scope.getResourceFilter()).keys();
    SinkTrace() << "synchronize" << resources;
    return KAsync::value(resources)
        .template each([scope](const QByteArray &resource) {
            SinkTrace() << "Synchronizing " << resource;
            auto resourceAccess = ResourceAccessFactory::instance().getAccess(resource, ResourceConfig::getResourceType(resource));
            return resourceAccess->synchronizeResource(scope)
                .addToContext(resourceAccess)
                .then<void>([](const KAsync::Error &error) {
                    if (error) {
                        SinkWarning() << "Error during sync.";
                        return KAsync::error<void>(error);
                    }
                    SinkTrace() << "synced.";
                    return KAsync::null<void>();
                });
        });
}

/*
 * Returns a job yielding a copy of the first entity matching `query`.
 *
 * fetch() is asked for at least one value and reports an error otherwise.
 * NOTE(review): dereferencing list.first() assumes that this continuation is skipped when
 * fetch() failed - confirm.
 */
template <class DomainType>
KAsync::Job<DomainType> Store::fetchOne(const Sink::Query &query)
{
    return fetch<DomainType>(query, 1).template then<DomainType, QList<typename DomainType::Ptr>>([](const QList<typename DomainType::Ptr> &list) {
        return KAsync::value(*list.first());
    });
}

/*
 * Returns a job yielding all entities matching `query`; same as fetch() with its default
 * minimum amount.
 */
template <class DomainType>
KAsync::Job<QList<typename DomainType::Ptr>> Store::fetchAll(const Sink::Query &query)
{
    return fetch<DomainType>(query);
}

/*
 * Returns a job yielding the entities matching `query`, collected from a model.
 *
 * The model is loaded when fetch() is called, not when the job is started. The job fails with
 * error code 1 ("Not enough values.") if fewer than `minimumAmount` entities were collected by
 * the time the children of the root index have been fetched.
 */
template <class DomainType>
KAsync::Job<QList<typename DomainType::Ptr>> Store::fetch(const Sink::Query &query, int minimumAmount)
{
    auto model = loadModel<DomainType>(query);
    auto list = QSharedPointer<QList<typename DomainType::Ptr>>::create();
    // Context object of the connections below; they are dropped together with the job.
    auto context = QSharedPointer<QObject>::create();
    return KAsync::start<QList<typename DomainType::Ptr>>([model, list, context, minimumAmount](KAsync::Future<QList<typename DomainType::Ptr>> &future) {
        if (model->rowCount() >= 1) {
            // Results are available already: take what is there.
            // NOTE(review): no signal is connected on this path, so if the children are not
            // reported as fetched further down, nothing in this function finishes the future.
            for (int i = 0; i < model->rowCount(); i++) {
                list->append(model->index(i, 0, QModelIndex()).data(Sink::Store::DomainObjectRole).template value<typename DomainType::Ptr>());
            }
        } else {
            // Nothing there yet: collect rows as they come in ...
            QObject::connect(model.data(), &QAbstractItemModel::rowsInserted, context.data(), [model, list](const QModelIndex &, int start, int end) {
                for (int i = start; i <= end; i++) {
                    list->append(model->index(i, 0, QModelIndex()).data(Sink::Store::DomainObjectRole).template value<typename DomainType::Ptr>());
                }
            });
            // ... and complete once the model signals that the children have been fetched.
            QObject::connect(model.data(), &QAbstractItemModel::dataChanged, context.data(),
                [model, &future, list, minimumAmount](const QModelIndex &, const QModelIndex &, const QVector<int> &roles) {
                    if (roles.contains(ModelResult<DomainType, typename DomainType::Ptr>::ChildrenFetchedRole)) {
                        if (list->size() < minimumAmount) {
                            future.setError(1, "Not enough values.");
                        } else {
                            future.setValue(*list);
                            future.setFinished();
                        }
                    }
                });
        }
        // The fetch may have completed before the connections above were in place.
        if (model->data(QModelIndex(), ModelResult<DomainType, typename DomainType::Ptr>::ChildrenFetchedRole).toBool()) {
            if (list->size() < minimumAmount) {
                future.setError(1, "Not enough values.");
            } else {
                future.setValue(*list);
            }
            future.setFinished();
        }
    });
}

/*
 * Returns the first entity that read() yields for `query`, or a default-constructed
 * DomainType if there is none.
 */
template <class DomainType>
DomainType Store::readOne(const Sink::Query &query)
{
    const auto list = read<DomainType>(query);
    if (!list.isEmpty()) {
        return list.first();
    }
    return DomainType();
}

/*
 * Queries all matching resources with the SynchronousQuery flag set and returns copies of the
 * entities that were reported.
 *
 * Resources without a facade are skipped.
 * NOTE(review): the list only contains what the emitters have delivered by the time
 * aggregatingEmitter->fetch() returns; this presumably relies on SynchronousQuery making the
 * facades deliver their results before that - confirm.
 */
template <class DomainType>
QList<DomainType> Store::read(const Sink::Query &q)
{
    auto query = q;
    query.setFlags(Query::SynchronousQuery);
    QList<DomainType> list;
    auto resources = getResources(query.getResourceFilter(), ApplicationDomain::getTypeName<DomainType>());
    auto aggregatingEmitter = AggregatingResultEmitter<typename DomainType::Ptr>::Ptr::create();
    // `list` is captured by reference; the emitter is local to this function as well.
    aggregatingEmitter->onAdded([&list](const typename DomainType::Ptr &value){
        SinkTrace() << "Found value: " << value->identifier();
        list << *value;
    });
    for (const auto &resourceInstanceIdentifier : resources.keys()) {
        const auto resourceType = resources.value(resourceInstanceIdentifier);
        SinkTrace() << "Querying resource: " << resourceType << resourceInstanceIdentifier;
        auto facade = FacadeFactory::instance().getFacade<DomainType>(resourceType, resourceInstanceIdentifier);
        if (facade) {
            SinkTrace() << "Trying to fetch from resource " << resourceInstanceIdentifier;
            auto result = facade->load(query);
            if (result.second) {
                aggregatingEmitter->addEmitter(result.second);
            } else {
                SinkWarning() << "Null emitter for resource " << resourceInstanceIdentifier;
            }
            result.first.exec();
        } else {
            SinkTrace() << "Couldn't find a facade for " << resourceInstanceIdentifier;
            // Ignore the error and carry on
        }
    }
    aggregatingEmitter->fetch(typename DomainType::Ptr());
    return list;
}

// Explicitly instantiates the Store member templates defined in this file for the type T.
#define REGISTER_TYPE(T)                                                          \
    template KAsync::Job<void> Store::remove<T>(const T &domainObject);           \
    template KAsync::Job<void> Store::create<T>(const T &domainObject);           \
    template KAsync::Job<void> Store::modify<T>(const T &domainObject);           \
    template QSharedPointer<QAbstractItemModel> Store::loadModel<T>(Query query); \
    template KAsync::Job<T> Store::fetchOne<T>(const Query &);                    \
    template KAsync::Job<QList<T::Ptr>> Store::fetchAll<T>(const Query &);        \
    template KAsync::Job<QList<T::Ptr>> Store::fetch<T>(const Query &, int);      \
    template T Store::readOne<T>(const Query &);                                  \
    template QList<T> Store::read<T>(const Query &);

REGISTER_TYPE(ApplicationDomain::Event);
REGISTER_TYPE(ApplicationDomain::Mail);
REGISTER_TYPE(ApplicationDomain::Folder);
REGISTER_TYPE(ApplicationDomain::SinkResource);
REGISTER_TYPE(ApplicationDomain::SinkAccount);
REGISTER_TYPE(ApplicationDomain::Identity);

} // namespace Sink