From a044a7f0ea054502fb8b6aedcfa213b192a7b05a Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Mon, 27 Apr 2015 00:38:36 +0200 Subject: Fixed lifetime management of resultSet. The resultSet remains valid for the duration of the thread. We keep the thread running until the ResultEmitter is deleted. --- common/clientapi.h | 92 +++++++++++++++++++++++++++++++------------------ common/facade.h | 1 - common/synclistresult.h | 8 ++--- tests/clientapitest.cpp | 5 +-- 4 files changed, 64 insertions(+), 42 deletions(-) diff --git a/common/clientapi.h b/common/clientapi.h index 4786398..6d66e40 100644 --- a/common/clientapi.h +++ b/common/clientapi.h @@ -29,6 +29,7 @@ #include #include #include +#include #include "threadboundary.h" #include "async/src/async.h" @@ -91,6 +92,11 @@ namespace async { }); } + void initialResultSetComplete() + { + callInMainThreadOnEmitter(&ResultEmitter::initialResultSetComplete); + } + //Called from worker thread void complete() { @@ -124,6 +130,15 @@ namespace async { mQueryRunner = runner; } + /** + * For lifetimemanagement only. + * We keep the runner alive as long as the result provider exists. + */ + void setFacade(const std::shared_ptr &facade) + { + mFacade = facade; + } + void onDone(const std::function &callback) { mOnDoneCallback = callback; @@ -141,12 +156,13 @@ namespace async { qWarning() << "done"; if (mOnDoneCallback) { mOnDoneCallback(); + mOnDoneCallback = std::function(); } - mOnDoneCallback = std::function(); } QWeakPointer > mResultEmitter; QSharedPointer mQueryRunner; + std::shared_ptr mFacade; std::function mOnDoneCallback; }; @@ -170,6 +186,10 @@ namespace async { addHandler = handler; } // void onRemoved(const std::function &handler); + void onInitialResultSetComplete(const std::function &handler) + { + initialResultSetCompleteHandler = handler; + } void onComplete(const std::function &handler) { completeHandler = handler; @@ -184,6 +204,11 @@ namespace async { addHandler(value); } + void initialResultSetComplete() + { + initialResultSetCompleteHandler(); + } + void complete() { completeHandler(); @@ -199,6 +224,7 @@ namespace async { std::function addHandler; // std::function removeHandler; + std::function initialResultSetCompleteHandler; std::function completeHandler; std::function clearHandler; ThreadBoundary mThreadBoundary; @@ -486,6 +512,14 @@ private: QHash mFacadeRegistry; }; +template +struct LifeExtender { + LifeExtender(const QSharedPointer > &f) : facade(f) {} +private: + QSharedPointer > facade; +}; + + /** * Store interface used in the client API. */ @@ -503,45 +537,37 @@ public: static QSharedPointer > load(Query query) { auto resultSet = QSharedPointer >::create(); - // FIXME This is ridiculous but otherwise I can't release the shared pointer before the thread quits - auto resultSetPtr = QSharedPointer > >::create(resultSet); //Execute the search in a thread. //We must guarantee that the emitter is returned before the first result is emitted. //The result provider must be threadsafe. - async::run([resultSetPtr, query](){ + async::run([query, resultSet](){ // Query all resources and aggregate results const QList resources = query.resources; - Async::start>([resources](){return resources;}) - .template each([query, resultSetPtr](const QByteArray &resource, Async::Future &future) { - //TODO pass resource identifier to factory - auto facade = FacadeFactory::instance().getFacade(resource); - if (auto resultSet = *resultSetPtr) { + { + Async::start>([resources](){return resources;}) + .template each([query, resultSet](const QByteArray &resource, Async::Future &future) { + //TODO pass resource identifier to factory + auto facade = FacadeFactory::instance().getFacade(resource); facade->load(query, resultSet).template then([&future](){future.setFinished();}).exec(); - } else { - qWarning() << "result set is already gone"; - future.setFinished(); - } - }).template then([resultSetPtr]() { - qDebug() << "Query complete"; - if (auto resultSet = *resultSetPtr) { - resultSet->complete(); - } else { - qWarning() << "result set is already gone"; - } - }).exec(); - - if (auto resultSet = *resultSetPtr) { - resultSetPtr->clear(); - if (!resultSet->isDone()) { - QEventLoop eventLoop; - resultSet->onDone([&eventLoop](){ - eventLoop.quit(); - }); - eventLoop.exec(); - } - } else { - qWarning() << "result set is already gone"; + //Keep the facade alive for the duration for the lifetime of the resultSet. + //TODO If the factory returned a std::shared_ptr we wouldn't require LifeExtender + resultSet->setFacade(std::make_shared >(facade)); + }).template then([query, resultSet]() { + resultSet->initialResultSetComplete(); + if (!query.liveQuery) { + resultSet->complete(); + } + }).exec(); + } + + //Keep the thread alive until the result is ready + if (!resultSet->isDone()) { + QEventLoop eventLoop; + resultSet->onDone([&eventLoop](){ + eventLoop.quit(); + }); + eventLoop.exec(); } }); return resultSet->emitter(); diff --git a/common/facade.h b/common/facade.h index a5858d5..b6ae4a6 100644 --- a/common/facade.h +++ b/common/facade.h @@ -163,7 +163,6 @@ public: load(query, addCallback).template then([resultProvider, &future](qint64 queriedRevision) { //TODO set revision in result provider? //TODO update all existing results with new revision - resultProvider->complete(); future.setValue(queriedRevision); future.setFinished(); }).exec(); diff --git a/common/synclistresult.h b/common/synclistresult.h index 5fa0efd..0a86f8c 100644 --- a/common/synclistresult.h +++ b/common/synclistresult.h @@ -19,20 +19,21 @@ class SyncListResult : public QList { public: SyncListResult(const QSharedPointer > &emitter) :QList(), - mComplete(false), mEmitter(emitter) { emitter->onAdded([this](const T &value) { this->append(value); }); - emitter->onComplete([this]() { - mComplete = true; + emitter->onInitialResultSetComplete([this]() { if (eventLoopAborter) { eventLoopAborter(); //Be safe in case of a second invocation of the complete handler eventLoopAborter = std::function(); } }); + emitter->onComplete([this]() { + mEmitter.clear(); + }); emitter->onClear([this]() { this->clear(); }); @@ -46,7 +47,6 @@ public: } private: - bool mComplete; QSharedPointer > mEmitter; std::function eventLoopAborter; }; diff --git a/tests/clientapitest.cpp b/tests/clientapitest.cpp index 057495e..1a5d873 100644 --- a/tests/clientapitest.cpp +++ b/tests/clientapitest.cpp @@ -61,10 +61,7 @@ public: resultProvider->clear(); //rerun query std::function addCallback = std::bind(&Akonadi2::ResultProvider::add, resultProvider, std::placeholders::_1); - load(query, addCallback).then([resultProvider, &future](qint64 queriedRevision) { - //TODO set revision in result provider? - //TODO update all existing results with new revision - resultProvider->complete(); + load(query, addCallback).then([resultProvider, &future, query](qint64 queriedRevision) { future.setValue(queriedRevision); future.setFinished(); }).exec(); -- cgit v1.2.3