From c55054e899660f2d667af2c2e573a1267d47358e Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Mon, 13 Apr 2015 20:15:14 +0200 Subject: Use a queryrunner to execute queries. The queryrunner is responsible for running queries and keeping them up to date. This is required for self-updating queries. To get this to work properly the ResultProvider/emitter had to be fixed. The emitter now only lives as long as the client holds a reference to it, allowing the provider to detect when it is no longer necessary to keep the query alive (because noone is listening). In the process various lifetime issues have been fixed, that we're caused by lambdas capturing smartpointers, that then extended the lifetime of the associated objects unpredictably. --- common/CMakeLists.txt | 1 + common/clientapi.h | 181 +++++++++++++++++++++++++++--------------- common/facade.cpp | 20 +++++ common/facade.h | 51 +++++++++++- common/synclistresult.h | 54 +++++++++++++ common/test/clientapitest.cpp | 106 ++++++++++++++++++++++++- 6 files changed, 345 insertions(+), 68 deletions(-) create mode 100644 common/facade.cpp create mode 100644 common/synclistresult.h (limited to 'common') diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index b06718f..a97c7f9 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt @@ -29,6 +29,7 @@ set(command_SRCS clientapi.cpp commands.cpp console.cpp + facade.cpp pipeline.cpp domainadaptor.cpp resource.cpp diff --git a/common/clientapi.h b/common/clientapi.h index 22448b3..c2b9493 100644 --- a/common/clientapi.h +++ b/common/clientapi.h @@ -49,17 +49,44 @@ namespace async { */ template class ResultProvider { - public: - //Called from worker thread - void add(const T &value) + private: + void callInMainThreadOnEmitter(void (ResultEmitter::*f)()) { //We use the eventloop to call the addHandler directly from the main eventloop. //That way the result emitter implementation doesn't have to care about threadsafety at all. //The alternative would be to make all handlers of the emitter threadsafe. - auto emitter = mResultEmitter; - mResultEmitter->mThreadBoundary.callInMainThread([emitter, value]() { - if (emitter) { - emitter->addHandler(value); + if (auto emitter = mResultEmitter.toStrongRef()) { + auto weakEmitter = mResultEmitter; + //We don't want to keep the emitter alive here, so we only capture a weak reference + emitter->mThreadBoundary.callInMainThread([weakEmitter, f]() { + if (auto strongRef = weakEmitter.toStrongRef()) { + (strongRef.data()->*f)(); + } + }); + } + } + + void callInMainThreadOnEmitter(const std::function &f) + { + //We use the eventloop to call the addHandler directly from the main eventloop. + //That way the result emitter implementation doesn't have to care about threadsafety at all. + //The alternative would be to make all handlers of the emitter threadsafe. + if (auto emitter = mResultEmitter.toStrongRef()) { + emitter->mThreadBoundary.callInMainThread([f]() { + f(); + }); + } + } + + public: + //Called from worker thread + void add(const T &value) + { + //Because I don't know how to use bind + auto weakEmitter = mResultEmitter; + callInMainThreadOnEmitter([weakEmitter, value](){ + if (auto strongRef = weakEmitter.toStrongRef()) { + strongRef->addHandler(value); } }); } @@ -67,25 +94,39 @@ namespace async { //Called from worker thread void complete() { - auto emitter = mResultEmitter; - mResultEmitter->mThreadBoundary.callInMainThread([emitter]() { - if (emitter) { - emitter->completeHandler(); - } - }); + callInMainThreadOnEmitter(&ResultEmitter::complete); } + void clear() + { + callInMainThreadOnEmitter(&ResultEmitter::clear); + } + + QSharedPointer > emitter() { if (!mResultEmitter) { - mResultEmitter = QSharedPointer >(new ResultEmitter()); + //We have to go over a separate var and return that, otherwise we'd delete the emitter immediately again + auto sharedPtr = QSharedPointer >::create(); + mResultEmitter = sharedPtr; + return sharedPtr; } - return mResultEmitter; + return mResultEmitter.toStrongRef(); + } + + /** + * For lifetimemanagement only. + * We keep the runner alive as long as the result provider exists. + */ + void setQueryRunner(const QSharedPointer &runner) + { + mQueryRunner = runner; } private: - QSharedPointer > mResultEmitter; + QWeakPointer > mResultEmitter; + QSharedPointer mQueryRunner; }; /* @@ -99,7 +140,6 @@ namespace async { * * build async interfaces with signals * * build sync interfaces that block when accessing the value * - * TODO: This should probably be merged with daniels futurebase used in Async */ template class ResultEmitter { @@ -113,51 +153,36 @@ namespace async { { completeHandler = handler; } + void onClear(const std::function &handler) + { + clearHandler = handler; + } - private: - friend class ResultProvider; - std::function addHandler; - // std::function removeHandler; - std::function completeHandler; - ThreadBoundary mThreadBoundary; - }; - + void add(const DomainType &value) + { + addHandler(value); + } - /* - * A result set specialization that provides a syncronous list - */ - template - class SyncListResult : public QList { - public: - SyncListResult(const QSharedPointer > &emitter) - :QList(), - mComplete(false), - mEmitter(emitter) + void complete() { - emitter->onAdded([this](const T &value) { - this->append(value); - }); - emitter->onComplete([this]() { - mComplete = true; - auto loop = mWaitLoop.toStrongRef(); - if (loop) { - loop->quit(); - } - }); + completeHandler(); } - void exec() + void clear() { - auto loop = QSharedPointer::create(); - mWaitLoop = loop; - loop->exec(QEventLoop::ExcludeUserInputEvents); + clearHandler(); } private: - bool mComplete; - QWeakPointer mWaitLoop; - QSharedPointer > mEmitter; + friend class ResultProvider; + + std::function addHandler; + // std::function removeHandler; + std::function completeHandler; + std::function clearHandler; + ThreadBoundary mThreadBoundary; }; + } namespace Akonadi2 { @@ -307,7 +332,7 @@ using namespace async; class Query { public: - Query() : syncOnDemand(true), processAll(false) {} + Query() : syncOnDemand(true), processAll(false), liveQuery(false) {} //Could also be a propertyFilter QByteArrayList resources; //Could also be a propertyFilter @@ -318,6 +343,8 @@ public: QSet requestedProperties; bool syncOnDemand; bool processAll; + //If live query is false, this query will not continuously be updated + bool liveQuery; }; @@ -337,7 +364,9 @@ public: virtual Async::Job create(const DomainType &domainObject) = 0; virtual Async::Job modify(const DomainType &domainObject) = 0; virtual Async::Job remove(const DomainType &domainObject) = 0; - virtual Async::Job load(const Query &query, const std::function &resultCallback) = 0; + //TODO remove from public API + virtual Async::Job load(const Query &query, const std::function &resultCallback) = 0; + virtual Async::Job load(const Query &query, const QSharedPointer > &resultProvider) { return Async::null(); }; }; @@ -349,6 +378,8 @@ public: class FacadeFactory { public: + typedef std::function FactoryFunction; + //FIXME: proper singleton implementation static FacadeFactory &instance() { @@ -365,7 +396,7 @@ public: void registerFacade(const QByteArray &resource) { const QByteArray typeName = ApplicationDomain::getTypeName(); - mFacadeRegistry.insert(key(resource, typeName), [](){ return new Facade; }); + mFacadeRegistry.insert(key(resource, typeName), [](bool &externallyManaged){ return new Facade; }); } /* @@ -375,29 +406,51 @@ public: * The facade factory takes ovnership of the pointer and typically deletes the instance via shared pointer. * Supplied factory functions should therefore always return a new pointer (i.e. via clone()) * - * FIXME the factory function should really be returning QSharedPointer, which doesn't work (std::shared_pointer would though). That way i.e. a test could keep the object alive until it's done. + * FIXME the factory function should really be returning QSharedPointer, which doesn't work (std::shared_pointer would though). That way i.e. a test could keep the object alive until it's done. As a workaround the factory function can define wether it manages the lifetime of the facade itself. */ template - void registerFacade(const QByteArray &resource, const std::function &customFactoryFunction) + void registerFacade(const QByteArray &resource, const FactoryFunction &customFactoryFunction) { const QByteArray typeName = ApplicationDomain::getTypeName(); mFacadeRegistry.insert(key(resource, typeName), customFactoryFunction); } + /* + * Can be used to clear the factory. + * + * Primarily for testing. + */ + void resetFactory() + { + mFacadeRegistry.clear(); + } + + static void doNothingDeleter(void *) + { + qWarning() << "Do nothing"; + } + template QSharedPointer > getFacade(const QByteArray &resource) { const QByteArray typeName = ApplicationDomain::getTypeName(); auto factoryFunction = mFacadeRegistry.value(key(resource, typeName)); if (factoryFunction) { - return QSharedPointer >(static_cast* >(factoryFunction())); + bool externallyManaged = false; + auto ptr = static_cast* >(factoryFunction(externallyManaged)); + if (externallyManaged) { + //Allows tests to manage the lifetime of injected facades themselves + return QSharedPointer >(ptr, doNothingDeleter); + } else { + return QSharedPointer >(ptr); + } } qWarning() << "Failed to find facade for resource: " << resource << " and type: " << typeName; return QSharedPointer >(); } private: - QHash > mFacadeRegistry; + QHash mFacadeRegistry; }; /** @@ -416,7 +469,7 @@ public: template static QSharedPointer > load(Query query) { - QSharedPointer > resultSet(new ResultProvider); + auto resultSet = QSharedPointer >::create(); //Execute the search in a thread. //We must guarantee that the emitter is returned before the first result is emitted. @@ -428,15 +481,15 @@ public: Async::Job job = Async::null(); for(const QByteArray &resource : query.resources) { auto facade = FacadeFactory::instance().getFacade(resource); - //We have to bind an instance to the function callback. Since we use a shared pointer this keeps the result provider instance (and thus also the emitter) alive. - std::function addCallback = std::bind(&ResultProvider::add, resultSet, std::placeholders::_1); // TODO The following is a necessary hack to keep the facade alive. // Otherwise this would reduce to: // job = job.then(facade->load(query, addCallback)); // We somehow have to guarantee that the facade remains valid for the duration of the job - job = job.then([facade, query, addCallback](Async::Future &future) { - Async::Job j = facade->load(query, addCallback); + // TODO: Use one result set per facade, and merge the results separately + // resultSet->addSubset(facade->query(query)); + job = job.then([facade, query, resultSet](Async::Future &future) { + Async::Job j = facade->load(query, resultSet); j.then([&future, facade](Async::Future &f) { future.setFinished(); f.setFinished(); diff --git a/common/facade.cpp b/common/facade.cpp new file mode 100644 index 0000000..e51b32a --- /dev/null +++ b/common/facade.cpp @@ -0,0 +1,20 @@ +/* + * Copyright (C) 2015 Christian Mollekopf + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the + * Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#include "facade.h" diff --git a/common/facade.h b/common/facade.h index 98bcb38..f9b5a83 100644 --- a/common/facade.h +++ b/common/facade.h @@ -30,6 +30,55 @@ #include "domainadaptor.h" #include "entitybuffer.h" +/** + * A QueryRunner runs a query and updates the corresponding result set. + * + * The lifetime of the QueryRunner is defined by the resut set (otherwise it's doing useless work), + * and by how long a result set must be updated. If the query is one off the runner dies after the execution, + * otherwise it lives on the react to changes and updates the corresponding result set. + * + * QueryRunner has to keep ResourceAccess alive in order to keep getting updates. + */ +class QueryRunner : public QObject +{ + Q_OBJECT +public: + typedef std::function(qint64 oldRevision, qint64 newRevision)> QueryFunction; + + QueryRunner(const Akonadi2::Query &query) : mLatestRevision(0) {}; + /** + * Starts query + */ + Async::Job run(qint64 newRevision = 0) + { + //TODO: JOBAPI: that last empty .then should not be necessary + return queryFunction(mLatestRevision, newRevision).then([this](qint64 revision) { + mLatestRevision = revision; + }).then([](){}); + } + + /** + * + */ + void setQuery(const QueryFunction &query) + { + queryFunction = query; + } + +public slots: + /** + * Rerun query with new revision + */ + void revisionChanged(qint64 newRevision) + { + run(newRevision).exec(); + } + +private: + QueryFunction queryFunction; + qint64 mLatestRevision; +}; + namespace Akonadi2 { class ResourceAccess; /** @@ -80,7 +129,7 @@ protected: return Async::null(); } -private: +protected: //TODO use one resource access instance per application => make static QSharedPointer mResourceAccess; }; diff --git a/common/synclistresult.h b/common/synclistresult.h new file mode 100644 index 0000000..5fa0efd --- /dev/null +++ b/common/synclistresult.h @@ -0,0 +1,54 @@ +#pragma once + +#include +#include +#include +#include + +namespace async { + +/* +* A result set specialization that provides a syncronous list. +* +* Only for testing purposes. +* +* WARNING: The nested eventloop can cause all sorts of trouble. Use only in testing code. +*/ +template +class SyncListResult : public QList { +public: + SyncListResult(const QSharedPointer > &emitter) + :QList(), + mComplete(false), + mEmitter(emitter) + { + emitter->onAdded([this](const T &value) { + this->append(value); + }); + emitter->onComplete([this]() { + mComplete = true; + if (eventLoopAborter) { + eventLoopAborter(); + //Be safe in case of a second invocation of the complete handler + eventLoopAborter = std::function(); + } + }); + emitter->onClear([this]() { + this->clear(); + }); + } + + void exec() + { + QEventLoop eventLoop; + eventLoopAborter = [&eventLoop]() { eventLoop.quit(); }; + eventLoop.exec(); + } + +private: + bool mComplete; + QSharedPointer > mEmitter; + std::function eventLoopAborter; +}; + +} diff --git a/common/test/clientapitest.cpp b/common/test/clientapitest.cpp index 24b3fb9..789c656 100644 --- a/common/test/clientapitest.cpp +++ b/common/test/clientapitest.cpp @@ -3,6 +3,22 @@ #include #include "../clientapi.h" +#include "../facade.h" +#include "../synclistresult.h" + +class RevisionNotifier : public QObject +{ + Q_OBJECT +public: + RevisionNotifier() : QObject() {}; + void notify(qint64 revision) + { + emit revisionChanged(revision); + } + +Q_SIGNALS: + void revisionChanged(qint64); +}; class DummyResourceFacade : public Akonadi2::StoreFacade { @@ -11,18 +27,63 @@ public: virtual Async::Job create(const Akonadi2::ApplicationDomain::Event &domainObject){ return Async::null(); }; virtual Async::Job modify(const Akonadi2::ApplicationDomain::Event &domainObject){ return Async::null(); }; virtual Async::Job remove(const Akonadi2::ApplicationDomain::Event &domainObject){ return Async::null(); }; - virtual Async::Job load(const Akonadi2::Query &query, const std::function &resultCallback) + virtual Async::Job load(const Akonadi2::Query &query, const std::function &resultCallback) { - return Async::start([this, resultCallback](Async::Future &future) { + return Async::start([this, resultCallback](Async::Future &future) { qDebug() << "load called"; for(const auto &result : results) { resultCallback(result); } + future.setValue(0); future.setFinished(); }); } + Async::Job load(const Akonadi2::Query &query, const QSharedPointer > &resultProvider) + { + auto runner = QSharedPointer::create(query); + //The runner only lives as long as the resultProvider + resultProvider->setQueryRunner(runner); + runner->setQuery([this, resultProvider, query](qint64 oldRevision, qint64 newRevision) -> Async::Job { + qDebug() << "Creating query for revisions: " << oldRevision << newRevision; + return Async::start([this, resultProvider, query](Async::Future &future) { + //TODO only emit changes and don't replace everything + resultProvider->clear(); + //rerun query + std::function addCallback = std::bind(&Akonadi2::ResultProvider::add, resultProvider, std::placeholders::_1); + load(query, addCallback).then([resultProvider, &future](qint64 queriedRevision) { + //TODO set revision in result provider? + //TODO update all existing results with new revision + resultProvider->complete(); + future.setValue(queriedRevision); + future.setFinished(); + }).exec(); + }); + }); + + //Ensure the notification is emitted in the right thread + //Otherwise we get crashes as we call revisionChanged from the test. + if (!notifier) { + notifier.reset(new RevisionNotifier); + } + + //TODO somehow disconnect as resultNotifier is destroyed. Otherwise we keep the runner alive forever. + if (query.liveQuery) { + QObject::connect(notifier.data(), &RevisionNotifier::revisionChanged, [runner](qint64 newRevision) { + runner->revisionChanged(newRevision); + }); + } + + return Async::start([runner](Async::Future &future) { + runner->run().then([&future]() { + //TODO if not live query, destroy runner. + future.setFinished(); + }).exec(); + }); + } + QList results; + QSharedPointer notifier; }; class ClientAPITest : public QObject @@ -30,21 +91,60 @@ class ClientAPITest : public QObject Q_OBJECT private Q_SLOTS: + void initTestCase() + { + Akonadi2::FacadeFactory::instance().resetFactory(); + } + void testLoad() { DummyResourceFacade facade; facade.results << QSharedPointer::create("resource", "id", 0, QSharedPointer()); - Akonadi2::FacadeFactory::instance().registerFacade("dummyresource", [facade](){ return new DummyResourceFacade(facade); }); + Akonadi2::FacadeFactory::instance().registerFacade("dummyresource", + [&facade](bool &externallyManaged) { + externallyManaged = true; + return &facade; + } + ); Akonadi2::Query query; query.resources << "dummyresource"; + query.liveQuery = false; async::SyncListResult result(Akonadi2::Store::load(query)); result.exec(); QCOMPARE(result.size(), 1); } + void testLiveQuery() + { + DummyResourceFacade facade; + facade.results << QSharedPointer::create("resource", "id", 0, QSharedPointer()); + + Akonadi2::FacadeFactory::instance().registerFacade("dummyresource", + [&facade](bool &externallManage){ + externallManage = true; + return &facade; + } + ); + + Akonadi2::Query query; + query.resources << "dummyresource"; + query.liveQuery = true; + + async::SyncListResult result(Akonadi2::Store::load(query)); + result.exec(); + QCOMPARE(result.size(), 1); + + //Enter a second result + facade.results << QSharedPointer::create("resource", "id2", 0, QSharedPointer()); + qWarning() << &facade; + QVERIFY(facade.notifier); + facade.notifier->revisionChanged(2); + QTRY_COMPARE(result.size(), 2); + } + }; QTEST_MAIN(ClientAPITest) -- cgit v1.2.3