From 12539f35e385c9250cd67e387c67dbaff4de34f3 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Fri, 24 Apr 2015 10:41:11 +0200 Subject: Keep thread alive until the end of the query, and cleanup the resultSet. --- common/clientapi.h | 68 +++++++++++++++++++++++++++++++++++++------------ tests/clientapitest.cpp | 54 ++++++++++++++++++++++++++++++++------- 2 files changed, 97 insertions(+), 25 deletions(-) diff --git a/common/clientapi.h b/common/clientapi.h index c6c43ee..4786398 100644 --- a/common/clientapi.h +++ b/common/clientapi.h @@ -107,7 +107,7 @@ namespace async { { if (!mResultEmitter) { //We have to go over a separate var and return that, otherwise we'd delete the emitter immediately again - auto sharedPtr = QSharedPointer >::create(); + auto sharedPtr = QSharedPointer >(new ResultEmitter, [this](ResultEmitter *emitter){ done(); delete emitter; }); mResultEmitter = sharedPtr; return sharedPtr; } @@ -124,9 +124,30 @@ namespace async { mQueryRunner = runner; } + void onDone(const std::function &callback) + { + mOnDoneCallback = callback; + } + + bool isDone() const + { + //The existance of the emitter currently defines wether we're done or not. + return mResultEmitter.toStrongRef().isNull(); + } + private: + void done() + { + qWarning() << "done"; + if (mOnDoneCallback) { + mOnDoneCallback(); + } + mOnDoneCallback = std::function(); + } + QWeakPointer > mResultEmitter; QSharedPointer mQueryRunner; + std::function mOnDoneCallback; }; /* @@ -482,31 +503,46 @@ public: static QSharedPointer > load(Query query) { auto resultSet = QSharedPointer >::create(); + // FIXME This is ridiculous but otherwise I can't release the shared pointer before the thread quits + auto resultSetPtr = QSharedPointer > >::create(resultSet); //Execute the search in a thread. //We must guarantee that the emitter is returned before the first result is emitted. //The result provider must be threadsafe. - async::run([resultSet, query](){ + async::run([resultSetPtr, query](){ // Query all resources and aggregate results const QList resources = query.resources; Async::start>([resources](){return resources;}) - .template each([query, resultSet](const QByteArray &resource, Async::Future &future) { + .template each([query, resultSetPtr](const QByteArray &resource, Async::Future &future) { + //TODO pass resource identifier to factory auto facade = FacadeFactory::instance().getFacade(resource); - // TODO The following is a necessary hack to keep the facade alive. - // Otherwise this would reduce to: - // facade->load(query, addCallback).exec(); - // We somehow have to guarantee that the facade remains valid for the duration of the job - // TODO: Use one result set per facade, and merge the results separately - // resultSet->addSubset(facade->query(query)); - facade->load(query, resultSet).template then([&future, facade]() { + if (auto resultSet = *resultSetPtr) { + facade->load(query, resultSet).template then([&future](){future.setFinished();}).exec(); + } else { + qWarning() << "result set is already gone"; future.setFinished(); - }).exec(); - }).template then([resultSet]() { + } + }).template then([resultSetPtr]() { qDebug() << "Query complete"; - resultSet->complete(); - }).exec().waitForFinished(); //We use the eventloop provided by waitForFinished to keep the thread alive until all is done - //FIXME for live query the thread dies after the initial query? - //TODO associate the thread with the query runner + if (auto resultSet = *resultSetPtr) { + resultSet->complete(); + } else { + qWarning() << "result set is already gone"; + } + }).exec(); + + if (auto resultSet = *resultSetPtr) { + resultSetPtr->clear(); + if (!resultSet->isDone()) { + QEventLoop eventLoop; + resultSet->onDone([&eventLoop](){ + eventLoop.quit(); + }); + eventLoop.exec(); + } + } else { + qWarning() << "result set is already gone"; + } }); return resultSet->emitter(); } diff --git a/tests/clientapitest.cpp b/tests/clientapitest.cpp index 7bcd7b0..057495e 100644 --- a/tests/clientapitest.cpp +++ b/tests/clientapitest.cpp @@ -24,10 +24,11 @@ class DummyResourceFacade : public Akonadi2::StoreFacade create(const Akonadi2::ApplicationDomain::Event &domainObject){ return Async::null(); }; - virtual Async::Job modify(const Akonadi2::ApplicationDomain::Event &domainObject){ return Async::null(); }; - virtual Async::Job remove(const Akonadi2::ApplicationDomain::Event &domainObject){ return Async::null(); }; - virtual Async::Job load(const Akonadi2::Query &query, const std::function &resultCallback) + Async::Job create(const Akonadi2::ApplicationDomain::Event &domainObject) Q_DECL_OVERRIDE { return Async::null(); }; + Async::Job modify(const Akonadi2::ApplicationDomain::Event &domainObject) Q_DECL_OVERRIDE { return Async::null(); }; + Async::Job remove(const Akonadi2::ApplicationDomain::Event &domainObject) Q_DECL_OVERRIDE { return Async::null(); }; + + Async::Job load(const Akonadi2::Query &query, const std::function &resultCallback) { return Async::start([this, resultCallback](Async::Future &future) { qDebug() << "load called"; @@ -39,14 +40,23 @@ public: }); } - Async::Job load(const Akonadi2::Query &query, const QSharedPointer > &resultProvider) + Async::Job load(const Akonadi2::Query &query, const QSharedPointer > &resultProvider) Q_DECL_OVERRIDE { auto runner = QSharedPointer::create(query); //The runner only lives as long as the resultProvider resultProvider->setQueryRunner(runner); - runner->setQuery([this, resultProvider, query](qint64 oldRevision, qint64 newRevision) -> Async::Job { + QWeakPointer > weakResultProvider = resultProvider; + capturedResultProvider = resultProvider; + runner->setQuery([this, weakResultProvider, query](qint64 oldRevision, qint64 newRevision) -> Async::Job { qDebug() << "Creating query for revisions: " << oldRevision << newRevision; - return Async::start([this, resultProvider, query](Async::Future &future) { + return Async::start([this, weakResultProvider, query](Async::Future &future) { + auto resultProvider = weakResultProvider.toStrongRef(); + if (!resultProvider) { + Warning() << "Tried executing query after result provider is already gone"; + future.setError(0, QString()); + future.setFinished(); + return; + } //TODO only emit changes and don't replace everything resultProvider->clear(); //rerun query @@ -84,6 +94,7 @@ public: QList results; QSharedPointer notifier; + QWeakPointer > capturedResultProvider; }; class ClientAPITest : public QObject @@ -123,8 +134,8 @@ private Q_SLOTS: facade.results << QSharedPointer::create("resource", "id", 0, QSharedPointer()); Akonadi2::FacadeFactory::instance().registerFacade("dummyresource", - [&facade](bool &externallManage){ - externallManage = true; + [&facade](bool &externallyManaged){ + externallyManaged = true; return &facade; } ); @@ -145,6 +156,31 @@ private Q_SLOTS: QTRY_COMPARE(result.size(), 2); } + void testQueryLifetime() + { + DummyResourceFacade facade; + facade.results << QSharedPointer::create("resource", "id", 0, QSharedPointer()); + + Akonadi2::FacadeFactory::instance().registerFacade("dummyresource", + [&facade](bool &externallyManaged){ + externallyManaged = true; + return &facade; + } + ); + + Akonadi2::Query query; + query.resources << "dummyresource"; + query.liveQuery = true; + + { + async::SyncListResult result(Akonadi2::Store::load(query)); + result.exec(); + QCOMPARE(result.size(), 1); + } + //It's running in a separate thread, so we have to wait for a moment. + QTRY_VERIFY(!facade.capturedResultProvider); + } + }; QTEST_MAIN(ClientAPITest) -- cgit v1.2.3