/*
 * Copyright (C) 2015 Christian Mollekopf
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) version 3, or any
 * later version accepted by the membership of KDE e.V. (or its
 * successor approved by the membership of KDE e.V.), which shall
 * act as a proxy defined in Section 6 of version 3 of the license.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library.  If not, see <http://www.gnu.org/licenses/>.
 */

#include "store.h"

// NOTE(review): the angle-bracket includes, the Q_DECLARE_METATYPE arguments and the template
// argument lists in this file were restored from how the names are used below; please confirm
// them against version control.
#include <QTime>
#include <QAbstractItemModel>
#include <QUuid>
#include <functional>
#include <memory>

#include "resourceaccess.h"
#include "commands.h"
#include "resourcefacade.h"
#include "definitions.h"
#include "resourceconfig.h"
#include "facadefactory.h"
#include "modelresult.h"
#include "storage.h"
#include "log.h"

SINK_DEBUG_AREA("store")

Q_DECLARE_METATYPE(QSharedPointer<Sink::ResultEmitter<Sink::ApplicationDomain::SinkResource::Ptr>>)
Q_DECLARE_METATYPE(QSharedPointer<Sink::ResourceAccessInterface>);
Q_DECLARE_METATYPE(std::shared_ptr<void>);

namespace Sink {

/*
 * Returns the storage location as reported by Sink::storageLocation().
 */
QString Store::storageLocation()
{
    return Sink::storageLocation();
}

/*
 * Returns a fresh path below Sink::temporaryFileLocation(), made unique with a newly created UUID.
 * Only the path string is built here; nothing is created on disk by this function.
 */
QString Store::getTemporaryFilePath()
{
    return Sink::temporaryFileLocation() + "/" + QUuid::createUuid().toString();
}

/*
 * Returns a map of resource instance identifiers and resource type
 *
 * query: resource filter; `ids` restricts the result to the listed resources (an empty list means
 *        "all configured resources"), `propertyFilter` is matched against each resource's configuration.
 * type:  entity type that is being queried. For global types a single entry with an empty
 *        identifier and an empty type is returned and the filter is ignored.
 *
 * Identifiers listed in `ids` that are not configured are skipped with a warning.
 */
static QMap<QByteArray, QByteArray> getResources(const Sink::Query::Filter &query, const QByteArray &type = QByteArray())
{
    const QList<QByteArray> resourceFilter = query.ids;

    // Returns true if the resource has to be skipped because its configuration fails at least one property filter.
    const auto isFilteredOut = [&query](const QByteArray &res) {
        const auto configuration = ResourceConfig::getConfiguration(res);
        for (auto it = query.propertyFilter.constBegin(); it != query.propertyFilter.constEnd(); ++it) {
            if (!it.value().matches(configuration.value(it.key()))) {
                return true;
            }
        }
        return false;
    };

    QMap<QByteArray, QByteArray> resources;
    // Return the global resource (signified by an empty name) for types that don't belong to a specific resource
    if (ApplicationDomain::isGlobalType(type)) {
        resources.insert("", "");
        return resources;
    }

    const auto configuredResources = ResourceConfig::getResources();
    if (resourceFilter.isEmpty()) {
        for (auto it = configuredResources.constBegin(); it != configuredResources.constEnd(); ++it) {
            if (isFilteredOut(it.key())) {
                continue;
            }
            // TODO filter by entity type
            resources.insert(it.key(), it.value());
        }
    } else {
        for (const auto &res : resourceFilter) {
            if (!configuredResources.contains(res)) {
                SinkWarning() << "Resource is not existing: " << res;
                continue;
            }
            if (isFilteredOut(res)) {
                continue;
            }
            resources.insert(res, configuredResources.value(res));
        }
    }
    SinkTrace() << "Found resources: " << resources;
    return resources;
}

/*
 * Starts loading `query` from a single resource and hooks the resulting emitter into `aggregatingEmitter`.
 *
 * Returns the job handed out by the facade's load(). If no facade is available for the resource
 * this is only traced and a null job is returned, so that the remaining resources can still be queried.
 */
template <class DomainType>
KAsync::Job<void> queryResource(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, const Query &query,
    typename AggregatingResultEmitter<typename DomainType::Ptr>::Ptr aggregatingEmitter, const Sink::Log::Context &ctx_)
{
    auto ctx = ctx_.subContext(resourceInstanceIdentifier);
    auto facade = FacadeFactory::instance().getFacade<DomainType>(resourceType, resourceInstanceIdentifier);
    if (facade) {
        SinkTraceCtx(ctx) << "Trying to fetch from resource " << resourceInstanceIdentifier;
        auto result = facade->load(query, ctx);
        if (result.second) {
            aggregatingEmitter->addEmitter(result.second);
        } else {
            SinkWarningCtx(ctx) << "Null emitter for resource " << resourceInstanceIdentifier;
        }
        return result.first;
    } else {
        SinkTraceCtx(ctx) << "Couldn't find a facade for " << resourceInstanceIdentifier;
        // Ignore the error and carry on
        return KAsync::null<void>();
    }
}

/*
 * Creates a model for `query` and starts populating it from all matching resources.
 *
 * The query type is set to the type name of DomainType. For live queries without an explicit
 * resource id filter (and a non-global type) a second, live query on the resources themselves is
 * started, so that resources that get reported later are queried as well.
 */
template <class DomainType>
QSharedPointer<QAbstractItemModel> Store::loadModel(Query query)
{
    Log::Context ctx{query.id()};
    query.setType(ApplicationDomain::getTypeName<DomainType>());
    SinkTraceCtx(ctx) << "Loading model: " << query;
    auto model = QSharedPointer<ModelResult<DomainType, typename DomainType::Ptr>>::create(query, query.requestedProperties);

    //* Client defines lifetime of model
    //* The model lifetime defines the duration of live-queries
    //* The facade needs to live for the duration of any calls being made (assuming we get rid of any internal callbacks)
    //* The emitter needs to live for the duration of the query (respectively, the model)
    //* The result provider needs to live for as long as results are provided (until the last thread exits).

    // Query all resources and aggregate results
    auto resources = getResources(query.getResourceFilter(), ApplicationDomain::getTypeName<DomainType>());
    auto aggregatingEmitter = AggregatingResultEmitter<typename DomainType::Ptr>::Ptr::create();
    model->setEmitter(aggregatingEmitter);

    if (query.liveQuery() && query.getResourceFilter().ids.isEmpty() && !ApplicationDomain::isGlobalType(ApplicationDomain::getTypeName<DomainType>())) {
        SinkTraceCtx(ctx) << "Listening for new resources";
        auto resourceCtx = ctx.subContext("resourceQuery");
        auto facade = FacadeFactory::instance().getFacade<ApplicationDomain::SinkResource>();
        Q_ASSERT(facade);
        Sink::Query resourceQuery;
        // The live flag belongs on the resource query: that is the query that has to keep reporting resources.
        // (It used to be set on `query`, which left the resource query non-live and replaced the flags of `query`.)
        resourceQuery.setFlags(Query::LiveQuery);
        auto result = facade->load(resourceQuery, resourceCtx);
        auto emitter = result.second;
        emitter->onAdded([query, aggregatingEmitter, resourceCtx](const ApplicationDomain::SinkResource::Ptr &resource) {
            SinkTraceCtx(resourceCtx) << "Found new resources: " << resource->identifier();
            const auto resourceType = ResourceConfig::getResourceType(resource->identifier());
            Q_ASSERT(!resourceType.isEmpty());
            queryResource<DomainType>(resourceType, resource->identifier(), query, aggregatingEmitter, resourceCtx).exec();
        });
        // Modifications and removals of resources are intentionally not acted upon here.
        emitter->onModified([](const ApplicationDomain::SinkResource::Ptr &) {
        });
        emitter->onRemoved([](const ApplicationDomain::SinkResource::Ptr &) {
        });
        emitter->onInitialResultSetComplete([](const ApplicationDomain::SinkResource::Ptr &, bool) {
        });
        emitter->onComplete([query, aggregatingEmitter, resourceCtx]() {
            SinkTraceCtx(resourceCtx) << "Resource query complete";
        });
        // Store the emitter as a property of the model (presumably to tie its lifetime to the model).
        model->setProperty("resourceEmitter", QVariant::fromValue(emitter));
        result.first.exec();
    }

    KAsync::value(resources.keys())
        .template each([query, aggregatingEmitter, resources, ctx](const QByteArray &resourceInstanceIdentifier) {
            const auto resourceType = resources.value(resourceInstanceIdentifier);
            return queryResource<DomainType>(resourceType, resourceInstanceIdentifier, query, aggregatingEmitter, ctx);
        })
        .exec();
    model->fetchMore(QModelIndex());

    return model;
}

/*
 * Returns the facade used to modify entities of DomainType in the given resource.
 *
 * Global types first try the facade that is registered without a resource. If no facade can be
 * found at all, a NullFacade is returned, so the result is never null.
 */
template <typename DomainType>
static std::shared_ptr<StoreFacade<DomainType>> getFacade(const QByteArray &resourceInstanceIdentifier)
{
    if (ApplicationDomain::isGlobalType(ApplicationDomain::getTypeName<DomainType>())) {
        if (auto facade = FacadeFactory::instance().getFacade<DomainType>()) {
            return facade;
        }
    }
    if (auto facade = FacadeFactory::instance().getFacade<DomainType>(ResourceConfig::getResourceType(resourceInstanceIdentifier), resourceInstanceIdentifier)) {
        return facade;
    }
    return std::make_shared<NullFacade<DomainType>>();
}

/*
 * Creates `domainObject` through the facade of the resource the object belongs to.
 * The facade is attached to the context of the returned job; a failure of the job is logged as a warning.
 */
template <class DomainType>
KAsync::Job<void> Store::create(const DomainType &domainObject)
{
    SinkTrace() << "Create: " << domainObject;
    // Potentially move to separate thread as well
    auto facade = getFacade<DomainType>(domainObject.resourceInstanceIdentifier());
    return facade->create(domainObject).addToContext(std::shared_ptr<void>(facade)).onError([](const KAsync::Error &) {
        SinkWarning() << "Failed to create";
    });
}

/*
 * Modifies `domainObject` through the facade of the resource the object belongs to.
 * The facade is attached to the context of the returned job; a failure of the job is logged as a warning.
 */
template <class DomainType>
KAsync::Job<void> Store::modify(const DomainType &domainObject)
{
    SinkTrace() << "Modify: " << domainObject;
    // Potentially move to separate thread as well
    auto facade = getFacade<DomainType>(domainObject.resourceInstanceIdentifier());
    return facade->modify(domainObject).addToContext(std::shared_ptr<void>(facade)).onError([](const KAsync::Error &) {
        SinkWarning() << "Failed to modify";
    });
}

/*
 * Moves `domainObject` to the resource `newResource`, using the facade of the resource the object currently belongs to.
 * The facade is attached to the context of the returned job; a failure of the job is logged as a warning.
 */
template <class DomainType>
KAsync::Job<void> Store::move(const DomainType &domainObject, const QByteArray &newResource)
{
    SinkTrace() << "Move: " << domainObject << newResource;
    // Potentially move to separate thread as well
    auto facade = getFacade<DomainType>(domainObject.resourceInstanceIdentifier());
    return facade->move(domainObject, newResource).addToContext(std::shared_ptr<void>(facade)).onError([](const KAsync::Error &) {
        SinkWarning() << "Failed to move";
    });
}

/*
 * Copies `domainObject` to the resource `newResource`, using the facade of the resource the object currently belongs to.
 * The facade is attached to the context of the returned job; a failure of the job is logged as a warning.
 */
template <class DomainType>
KAsync::Job<void> Store::copy(const DomainType &domainObject, const QByteArray &newResource)
{
    SinkTrace() << "Copy: " << domainObject << newResource;
    // Potentially copy to separate thread as well
    auto facade = getFacade<DomainType>(domainObject.resourceInstanceIdentifier());
    return facade->copy(domainObject, newResource).addToContext(std::shared_ptr<void>(facade)).onError([](const KAsync::Error &) {
        SinkWarning() << "Failed to copy";
    });
}

/*
 * Removes `domainObject` through the facade of the resource the object belongs to.
 * The facade is attached to the context of the returned job; a failure of the job is logged as a warning.
 */
template <class DomainType>
KAsync::Job<void> Store::remove(const DomainType &domainObject)
{
    SinkTrace() << "Remove: " << domainObject;
    // Potentially move to separate thread as well
    auto facade = getFacade<DomainType>(domainObject.resourceInstanceIdentifier());
    return facade->remove(domainObject).addToContext(std::shared_ptr<void>(facade)).onError([](const KAsync::Error &) {
        SinkWarning() << "Failed to remove";
    });
}

/*
 * Sends the RemoveFromDiskCommand to the resource `identifier`.
 *
 * The storage environments are cleared before the command is sent. If the resource access is
 * ready once the command has been sent, the returned job only finishes after the access has
 * signalled that it is no longer ready; otherwise it finishes right away.
 */
KAsync::Job<void> Store::removeDataFromDisk(const QByteArray &identifier)
{
    // All databases are going to become invalid, nuke the environments
    // TODO: all clients should react to a notification the resource
    Sink::Storage::DataStore::clearEnv();
    SinkTrace() << "Remove data from disk " << identifier;
    auto time = QSharedPointer<QTime>::create();
    time->start();
    auto resourceAccess = ResourceAccessFactory::instance().getAccess(identifier, ResourceConfig::getResourceType(identifier));
    resourceAccess->open();
    return resourceAccess->sendCommand(Sink::Commands::RemoveFromDiskCommand)
        .addToContext(resourceAccess)
        .then<void>([resourceAccess](KAsync::Future<void> &future) {
            if (resourceAccess->isReady()) {
                // Wait for the resource shutdown
                QObject::connect(resourceAccess.data(), &ResourceAccess::ready, [&future](bool ready) {
                    if (!ready) {
                        future.setFinished();
                    }
                });
            } else {
                future.setFinished();
            }
        })
        .syncThen<void>([time]() {
            SinkTrace() << "Remove from disk complete." << Log::TraceTime(time->elapsed());
        });
}

/*
 * Synchronizes a single resource with the given scope.
 * An error of the synchronization is logged and passed on to the returned job.
 */
static KAsync::Job<void> synchronize(const QByteArray &resource, const Sink::SyncScope &scope)
{
    SinkLog() << "Synchronizing " << resource;
    auto resourceAccess = ResourceAccessFactory::instance().getAccess(resource, ResourceConfig::getResourceType(resource));
    return resourceAccess->synchronizeResource(scope)
        .addToContext(resourceAccess)
        .then([](const KAsync::Error &error) {
            if (error) {
                SinkWarning() << "Error during sync.";
                return KAsync::error<void>(error);
            }
            SinkTrace() << "synced.";
            return KAsync::null<void>();
        });
}

/*
 * Convenience overload: wraps `query` into a SyncScope and synchronizes that scope.
 */
KAsync::Job<void> Store::synchronize(const Sink::Query &query)
{
    return synchronize(Sink::SyncScope{query});
}

/*
 * Synchronizes every resource that matches the resource filter of `scope`.
 */
KAsync::Job<void> Store::synchronize(const Sink::SyncScope &scope)
{
    auto resources = getResources(scope.getResourceFilter()).keys();
    SinkLog() << "Synchronize" << resources;
    return KAsync::value(resources)
        .template each([scope](const QByteArray &resource) {
            return synchronize(resource, scope);
        });
}

/*
 * Fetches the results of `query` and yields a copy of the first one.
 * At least one result is requested from fetch(), so the job fails if there is none.
 */
template <class DomainType>
KAsync::Job<DomainType> Store::fetchOne(const Sink::Query &query)
{
    return fetch<DomainType>(query, 1).template then<DomainType, QList<typename DomainType::Ptr>>([](const QList<typename DomainType::Ptr> &list) {
        return KAsync::value(*list.first());
    });
}

/*
 * Fetches all results of `query`.
 */
template <class DomainType>
KAsync::Job<QList<typename DomainType::Ptr>> Store::fetchAll(const Sink::Query &query)
{
    return fetch<DomainType>(query);
}

/*
 * Loads a model for `query` and collects its top-level rows into a list.
 *
 * The job yields the list once the model reports ChildrenFetchedRole for the root index. It fails
 * with error code 1 ("Not enough values.") if fewer than `minimumAmount` results are available at
 * that point.
 */
template <class DomainType>
KAsync::Job<QList<typename DomainType::Ptr>> Store::fetch(const Sink::Query &query, int minimumAmount)
{
    auto model = loadModel<DomainType>(query);
    auto list = QSharedPointer<QList<typename DomainType::Ptr>>::create();
    // Context object for the connections below.
    auto context = QSharedPointer<QObject>::create();
    return KAsync::start<QList<typename DomainType::Ptr>>([model, list, context, minimumAmount](KAsync::Future<QList<typename DomainType::Ptr>> &future) {
        if (model->rowCount() >= 1) {
            // Results are already available, take them as they are.
            for (int i = 0; i < model->rowCount(); i++) {
                list->append(model->index(i, 0, QModelIndex()).data(Sink::Store::DomainObjectRole).template value<typename DomainType::Ptr>());
            }
        } else {
            // Nothing there yet: collect rows as they arrive ...
            QObject::connect(model.data(), &QAbstractItemModel::rowsInserted, context.data(), [model, list](const QModelIndex &, int start, int end) {
                for (int i = start; i <= end; i++) {
                    list->append(model->index(i, 0, QModelIndex()).data(Sink::Store::DomainObjectRole).template value<typename DomainType::Ptr>());
                }
            });
            // ... and complete the future once the model signals that the children have been fetched.
            QObject::connect(model.data(), &QAbstractItemModel::dataChanged, context.data(),
                [model, &future, list, minimumAmount](const QModelIndex &, const QModelIndex &, const QVector<int> &roles) {
                    if (roles.contains(ModelResult<DomainType, typename DomainType::Ptr>::ChildrenFetchedRole)) {
                        if (list->size() < minimumAmount) {
                            future.setError(1, "Not enough values.");
                        } else {
                            future.setValue(*list);
                            future.setFinished();
                        }
                    }
                });
        }
        // The model may already be complete by the time this job runs.
        if (model->data(QModelIndex(), ModelResult<DomainType, typename DomainType::Ptr>::ChildrenFetchedRole).toBool()) {
            if (list->size() < minimumAmount) {
                future.setError(1, "Not enough values.");
            } else {
                future.setValue(*list);
            }
            future.setFinished();
        }
    });
}

/*
 * Reads the results of `query` via read() and returns the first one.
 * If there is no result a warning is logged and a default-constructed DomainType is returned.
 */
template <class DomainType>
DomainType Store::readOne(const Sink::Query &query)
{
    const auto list = read<DomainType>(query);
    if (!list.isEmpty()) {
        return list.first();
    }
    SinkWarning() << "Tried to read value but no values are available.";
    return DomainType();
}

/*
 * Runs a copy of `q` with the SynchronousQuery flag set against all matching resources and
 * returns copies of the values that the emitters reported by the time the fetch call returns.
 * Resources without a facade are skipped.
 */
template <class DomainType>
QList<DomainType> Store::read(const Sink::Query &q)
{
    Log::Context ctx{q.id()};
    auto query = q;
    query.setFlags(Query::SynchronousQuery);
    QList<DomainType> list;
    auto resources = getResources(query.getResourceFilter(), ApplicationDomain::getTypeName<DomainType>());
    auto aggregatingEmitter = AggregatingResultEmitter<typename DomainType::Ptr>::Ptr::create();
    // `list` is captured by reference; the emitter is local to this function and therefore does not outlive it.
    aggregatingEmitter->onAdded([&list, ctx](const typename DomainType::Ptr &value) {
        SinkTraceCtx(ctx) << "Found value: " << value->identifier();
        list << *value;
    });
    for (const auto &resourceInstanceIdentifier : resources.keys()) {
        const auto resourceType = resources.value(resourceInstanceIdentifier);
        SinkTraceCtx(ctx) << "Querying resource: " << resourceType << resourceInstanceIdentifier;
        auto facade = FacadeFactory::instance().getFacade<DomainType>(resourceType, resourceInstanceIdentifier);
        if (facade) {
            SinkTraceCtx(ctx) << "Trying to fetch from resource " << resourceInstanceIdentifier;
            auto result = facade->load(query, ctx);
            if (result.second) {
                aggregatingEmitter->addEmitter(result.second);
            } else {
                SinkWarningCtx(ctx) << "Null emitter for resource " << resourceInstanceIdentifier;
            }
            result.first.exec();
        } else {
            SinkTraceCtx(ctx) << "Couldn't find a facade for " << resourceInstanceIdentifier;
            // Ignore the error and carry on
        }
    }
    aggregatingEmitter->fetch(typename DomainType::Ptr());
    return list;
}

// Explicit instantiations of the Store templates; expanded for every type listed by SINK_REGISTER_TYPES().
#define REGISTER_TYPE(T)                                                                              \
    template KAsync::Job<void> Store::remove<T>(const T &domainObject);                               \
    template KAsync::Job<void> Store::create<T>(const T &domainObject);                               \
    template KAsync::Job<void> Store::modify<T>(const T &domainObject);                               \
    template KAsync::Job<void> Store::move<T>(const T &domainObject, const QByteArray &newResource);  \
    template KAsync::Job<void> Store::copy<T>(const T &domainObject, const QByteArray &newResource);  \
    template QSharedPointer<QAbstractItemModel> Store::loadModel<T>(Query query);                     \
    template KAsync::Job<T> Store::fetchOne<T>(const Query &);                                        \
    template KAsync::Job<QList<T::Ptr>> Store::fetchAll<T>(const Query &);                            \
    template KAsync::Job<QList<T::Ptr>> Store::fetch<T>(const Query &, int);                          \
    template T Store::readOne<T>(const Query &);                                                      \
    template QList<T> Store::read<T>(const Query &);

SINK_REGISTER_TYPES()

} // namespace Sink