/* * Copyright (C) 2014 Christian Mollekopf * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) version 3, or any * later version accepted by the membership of KDE e.V. (or its * successor approved by the membership of KDE e.V.), which shall * act as a proxy defined in Section 6 of version 3 of the license. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library. If not, see . */ #pragma once #include #include #include #include #include #include #include #include #include #include "threadboundary.h" #include "async/src/async.h" namespace async { //This should abstract if we execute from eventloop or in thread. //It supposed to allow the caller to finish the current method before executing the runner. void run(const std::function &runner); /** * Query result set */ template class ResultEmitter; /* * The promise side for the result emitter */ template class ResultProvider { public: //Called from worker thread void add(const T &value) { //We use the eventloop to call the addHandler directly from the main eventloop. //That way the result emitter implementation doesn't have to care about threadsafety at all. //The alternative would be to make all handlers of the emitter threadsafe. auto emitter = mResultEmitter; mResultEmitter->mThreadBoundary.callInMainThread([emitter, value]() { if (emitter) { emitter->addHandler(value); } }); } //Called from worker thread void complete() { auto emitter = mResultEmitter; mResultEmitter->mThreadBoundary.callInMainThread([emitter]() { if (emitter) { emitter->completeHandler(); } }); } QSharedPointer > emitter() { if (!mResultEmitter) { mResultEmitter = QSharedPointer >(new ResultEmitter()); } return mResultEmitter; } private: QSharedPointer > mResultEmitter; }; /* * The future side for the client. * * It does not directly hold the state. * * The advantage of this is that we can specialize it to: * * do inline transformations to the data * * directly store the state in a suitable datastructure: QList, QSet, std::list, QVector, ... * * build async interfaces with signals * * build sync interfaces that block when accessing the value * * TODO: This should probably be merged with daniels futurebase used in Async */ template class ResultEmitter { public: void onAdded(const std::function &handler) { addHandler = handler; } // void onRemoved(const std::function &handler); void onComplete(const std::function &handler) { completeHandler = handler; } private: friend class ResultProvider; std::function addHandler; // std::function removeHandler; std::function completeHandler; ThreadBoundary mThreadBoundary; }; /* * A result set specialization that provides a syncronous list */ template class SyncListResult : public QList { public: SyncListResult(const QSharedPointer > &emitter) :QList(), mComplete(false), mEmitter(emitter) { emitter->onAdded([this](const T &value) { this->append(value); }); emitter->onComplete([this]() { mComplete = true; auto loop = mWaitLoop.toStrongRef(); if (loop) { loop->quit(); } }); } void exec() { auto loop = QSharedPointer::create(); mWaitLoop = loop; loop->exec(QEventLoop::ExcludeUserInputEvents); } private: bool mComplete; QWeakPointer mWaitLoop; QSharedPointer > mEmitter; }; } namespace Akonadi2 { /** * Standardized Domain Types * * They don't adhere to any standard and can be freely extended * Their sole purpose is providing a standardized interface to access data. * * This is necessary to decouple resource-backends from application domain containers (otherwise each resource would have to provide a faceade for each application domain container). * * These types will be frequently modified (for every new feature that should be exposed to the any client) */ namespace Domain { /** * This class has to be implemented by resources and can be used as generic interface to access the buffer properties */ class BufferAdaptor { public: virtual QVariant getProperty(const QString &key) const { return QVariant(); } virtual void setProperty(const QString &key, const QVariant &value) {} virtual QStringList availableProperties() const { return QStringList(); } }; class MemoryBufferAdaptor : public BufferAdaptor { public: MemoryBufferAdaptor() : BufferAdaptor() { } MemoryBufferAdaptor(const BufferAdaptor &buffer) : BufferAdaptor() { for(const auto &property : buffer.availableProperties()) { mValues.insert(property, buffer.getProperty(property)); } } virtual QVariant getProperty(const QString &key) const { return mValues.value(key); } virtual void setProperty(const QString &key, const QVariant &value) { mValues.insert(key, value); } virtual QStringList availableProperties() const { return mValues.keys(); } private: QHash mValues; }; /** * The domain type interface has two purposes: * * provide a unified interface to read buffers (for zero-copy reading) * * record changes to generate changesets for modifications */ class AkonadiDomainType { public: AkonadiDomainType() :mAdaptor(new MemoryBufferAdaptor()) { } AkonadiDomainType(const QString &resourceName, const QString &identifier, qint64 revision, const QSharedPointer &adaptor) : mAdaptor(adaptor), mResourceName(resourceName), mIdentifier(identifier), mRevision(revision) { } virtual QVariant getProperty(const QString &key){ return mAdaptor->getProperty(key); } virtual void setProperty(const QString &key, const QVariant &value){ mChangeSet.insert(key, value); } private: QSharedPointer mAdaptor; QHash mChangeSet; /* * Each domain object needs to store the resource, identifier, revision triple so we can link back to the storage location. */ QString mResourceName; QString mIdentifier; qint64 mRevision; }; struct Event : public AkonadiDomainType { typedef QSharedPointer Ptr; using AkonadiDomainType::AkonadiDomainType; }; struct Todo : public AkonadiDomainType { typedef QSharedPointer Ptr; using AkonadiDomainType::AkonadiDomainType; }; struct Calendar : public AkonadiDomainType { typedef QSharedPointer Ptr; using AkonadiDomainType::AkonadiDomainType; }; class Mail : public AkonadiDomainType { }; class Folder : public AkonadiDomainType { }; /** * All types need to be registered here an MUST return a different name. * * Do not store these types to disk, they may change over time. */ template QString getTypeName(); template<> QString getTypeName(); template<> QString getTypeName(); } using namespace async; /** * A query that matches a set of objects * * The query will have to be updated regularly similary to the domain objects. * It probably also makes sense to have a domain specific part of the query, * such as what properties we're interested in (necessary information for on-demand * loading of data). * * The query defines: * * what resources to search * * filters on various properties (parent collection, startDate range, ....) * * properties we need (for on-demand querying) */ class Query { public: Query() : syncOnDemand(true) {} //Could also be a propertyFilter QStringList resources; //Could also be a propertyFilter QStringList ids; //Filters to apply QHash propertyFilter; //Properties to retrieve QSet requestedProperties; bool syncOnDemand; }; /** * Interface for the store facade. * * All methods are synchronous. * Facades are stateful (they hold connections to resources and database). * * TODO: would it make sense to split the write, read and notification parts? (we could potentially save some connections) */ template class StoreFacade { public: virtual ~StoreFacade(){}; QString type() const { return Domain::getTypeName(); } virtual Async::Job create(const DomainType &domainObject) = 0; virtual Async::Job modify(const DomainType &domainObject) = 0; virtual Async::Job remove(const DomainType &domainObject) = 0; virtual Async::Job load(const Query &query, const std::function &resultCallback) = 0; }; /** * Facade factory that returns a store facade implementation, by loading a plugin and providing the relevant implementation. * * If we were to provide default implementations for certain capabilities. Here would be the place to do so. * * TODO: pluginmechansims for resources to provide their implementations. * * We may want a way to recycle facades to avoid recreating socket connections all the time? */ class FacadeFactory { public: //FIXME: proper singleton implementation static FacadeFactory &instance() { static FacadeFactory factory; return factory; } static QString key(const QString &resource, const QString &type) { return resource + type; } template void registerFacade(const QString &resource) { const QString typeName = Domain::getTypeName(); mFacadeRegistry.insert(key(resource, typeName), [](){ return new Facade; }); } /* * Allows the registrar to register a specific instance. * * Primarily for testing. * The facade factory takes ovnership of the pointer and typically deletes the instance via shared pointer. * Supplied factory functions should therefore always return a new pointer (i.e. via clone()) * * FIXME the factory function should really be returning QSharedPointer, which doesn't work (std::shared_pointer would though). That way i.e. a test could keep the object alive until it's done. */ template void registerFacade(const QString &resource, const std::function &customFactoryFunction) { const QString typeName = Domain::getTypeName(); mFacadeRegistry.insert(key(resource, typeName), customFactoryFunction); } template QSharedPointer > getFacade(const QString &resource) { const QString typeName = Domain::getTypeName(); auto factoryFunction = mFacadeRegistry.value(key(resource, typeName)); if (factoryFunction) { return QSharedPointer >(static_cast* >(factoryFunction())); } qWarning() << "Failed to find facade for resource: " << resource << " and type: " << typeName; return QSharedPointer >(); } private: QHash > mFacadeRegistry; }; /** * Store interface used in the client API. * * TODO: For testing we need to be able to inject dummy StoreFacades. Should we work with a store instance, or a singleton factory? */ class Store { public: static QString storageLocation() { return QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage"; } /** * Asynchronusly load a dataset */ template static QSharedPointer > load(Query query) { QSharedPointer > resultSet(new ResultProvider); //Execute the search in a thread. //We must guarantee that the emitter is returned before the first result is emitted. //The result provider must be threadsafe. async::run([resultSet, query](){ // Query all resources and aggregate results // query tells us in which resources we're interested // TODO: queries to individual resources could be parallelized auto eventloop = QSharedPointer::create(); Async::Job job = Async::null(); for(const QString &resource : query.resources) { auto facade = FacadeFactory::instance().getFacade(resource); //We have to bind an instance to the function callback. Since we use a shared pointer this keeps the result provider instance (and thus also the emitter) alive. std::function addCallback = std::bind(&ResultProvider::add, resultSet, std::placeholders::_1); //We copy the facade pointer to keep it alive job = job.then([facade, query, addCallback](Async::Future &future) { Async::Job j = facade->load(query, addCallback); j.then([&future, facade](Async::Future &f) { future.setFinished(); f.setFinished(); }).exec(); }); } job.then([eventloop, resultSet](Async::Future &future) { resultSet->complete(); eventloop->quit(); future.setFinished(); }).exec(); //The thread contains no eventloop, so execute one here eventloop->exec(QEventLoop::ExcludeUserInputEvents); }); return resultSet->emitter(); } /** * Asynchronusly load a dataset with tree structure information */ // template // static TreeSet loadTree(Query) // { // } /** * Create a new entity. */ //TODO return job that tracks progress until resource has stored the message in it's queue? template static void create(const DomainType &domainObject, const QString &resourceIdentifier) { //Potentially move to separate thread as well auto facade = FacadeFactory::instance().getFacade(resourceIdentifier); auto job = facade->create(domainObject); auto future = job.exec(); future.waitForFinished(); //TODO return job? } /** * Modify an entity. * * This includes moving etc. since these are also simple settings on a property. */ template static void modify(const DomainType &domainObject, const QString &resourceIdentifier) { //Potentially move to separate thread as well auto facade = FacadeFactory::instance().getFacade(resourceIdentifier); facade.modify(domainObject); } /** * Remove an entity. */ template static void remove(const DomainType &domainObject, const QString &resourceIdentifier) { //Potentially move to separate thread as well auto facade = FacadeFactory::instance().getFacade(resourceIdentifier); facade.remove(domainObject); } }; }