summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2015-04-24 10:41:11 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2015-04-24 10:41:11 +0200
commit12539f35e385c9250cd67e387c67dbaff4de34f3 (patch)
tree5223ebcdc7e94827862f2c9c76a3f6123c8abd1e
parent2998d9d3d5dfc825904b53393e9ae12e7cd5b72b (diff)
downloadsink-12539f35e385c9250cd67e387c67dbaff4de34f3.tar.gz
sink-12539f35e385c9250cd67e387c67dbaff4de34f3.zip
Keep thread alive until the end of the query, and cleanup the resultSet.
-rw-r--r--common/clientapi.h68
-rw-r--r--tests/clientapitest.cpp54
2 files changed, 97 insertions, 25 deletions
diff --git a/common/clientapi.h b/common/clientapi.h
index c6c43ee..4786398 100644
--- a/common/clientapi.h
+++ b/common/clientapi.h
@@ -107,7 +107,7 @@ namespace async {
107 { 107 {
108 if (!mResultEmitter) { 108 if (!mResultEmitter) {
109 //We have to go over a separate var and return that, otherwise we'd delete the emitter immediately again 109 //We have to go over a separate var and return that, otherwise we'd delete the emitter immediately again
110 auto sharedPtr = QSharedPointer<ResultEmitter<T> >::create(); 110 auto sharedPtr = QSharedPointer<ResultEmitter<T> >(new ResultEmitter<T>, [this](ResultEmitter<T> *emitter){ done(); delete emitter; });
111 mResultEmitter = sharedPtr; 111 mResultEmitter = sharedPtr;
112 return sharedPtr; 112 return sharedPtr;
113 } 113 }
@@ -124,9 +124,30 @@ namespace async {
124 mQueryRunner = runner; 124 mQueryRunner = runner;
125 } 125 }
126 126
127 void onDone(const std::function<void()> &callback)
128 {
129 mOnDoneCallback = callback;
130 }
131
132 bool isDone() const
133 {
134 //The existance of the emitter currently defines wether we're done or not.
135 return mResultEmitter.toStrongRef().isNull();
136 }
137
127 private: 138 private:
139 void done()
140 {
141 qWarning() << "done";
142 if (mOnDoneCallback) {
143 mOnDoneCallback();
144 }
145 mOnDoneCallback = std::function<void()>();
146 }
147
128 QWeakPointer<ResultEmitter<T> > mResultEmitter; 148 QWeakPointer<ResultEmitter<T> > mResultEmitter;
129 QSharedPointer<QObject> mQueryRunner; 149 QSharedPointer<QObject> mQueryRunner;
150 std::function<void()> mOnDoneCallback;
130 }; 151 };
131 152
132 /* 153 /*
@@ -482,31 +503,46 @@ public:
482 static QSharedPointer<ResultEmitter<typename DomainType::Ptr> > load(Query query) 503 static QSharedPointer<ResultEmitter<typename DomainType::Ptr> > load(Query query)
483 { 504 {
484 auto resultSet = QSharedPointer<ResultProvider<typename DomainType::Ptr> >::create(); 505 auto resultSet = QSharedPointer<ResultProvider<typename DomainType::Ptr> >::create();
506 // FIXME This is ridiculous but otherwise I can't release the shared pointer before the thread quits
507 auto resultSetPtr = QSharedPointer<QSharedPointer<ResultProvider<typename DomainType::Ptr> > >::create(resultSet);
485 508
486 //Execute the search in a thread. 509 //Execute the search in a thread.
487 //We must guarantee that the emitter is returned before the first result is emitted. 510 //We must guarantee that the emitter is returned before the first result is emitted.
488 //The result provider must be threadsafe. 511 //The result provider must be threadsafe.
489 async::run([resultSet, query](){ 512 async::run([resultSetPtr, query](){
490 // Query all resources and aggregate results 513 // Query all resources and aggregate results
491 const QList<QByteArray> resources = query.resources; 514 const QList<QByteArray> resources = query.resources;
492 Async::start<QList<QByteArray>>([resources](){return resources;}) 515 Async::start<QList<QByteArray>>([resources](){return resources;})
493 .template each<void, QByteArray>([query, resultSet](const QByteArray &resource, Async::Future<void> &future) { 516 .template each<void, QByteArray>([query, resultSetPtr](const QByteArray &resource, Async::Future<void> &future) {
517 //TODO pass resource identifier to factory
494 auto facade = FacadeFactory::instance().getFacade<DomainType>(resource); 518 auto facade = FacadeFactory::instance().getFacade<DomainType>(resource);
495 // TODO The following is a necessary hack to keep the facade alive. 519 if (auto resultSet = *resultSetPtr) {
496 // Otherwise this would reduce to: 520 facade->load(query, resultSet).template then<void>([&future](){future.setFinished();}).exec();
497 // facade->load(query, addCallback).exec(); 521 } else {
498 // We somehow have to guarantee that the facade remains valid for the duration of the job 522 qWarning() << "result set is already gone";
499 // TODO: Use one result set per facade, and merge the results separately
500 // resultSet->addSubset(facade->query(query));
501 facade->load(query, resultSet).template then<void>([&future, facade]() {
502 future.setFinished(); 523 future.setFinished();
503 }).exec(); 524 }
504 }).template then<void>([resultSet]() { 525 }).template then<void>([resultSetPtr]() {
505 qDebug() << "Query complete"; 526 qDebug() << "Query complete";
506 resultSet->complete(); 527 if (auto resultSet = *resultSetPtr) {
507 }).exec().waitForFinished(); //We use the eventloop provided by waitForFinished to keep the thread alive until all is done 528 resultSet->complete();
508 //FIXME for live query the thread dies after the initial query? 529 } else {
509 //TODO associate the thread with the query runner 530 qWarning() << "result set is already gone";
531 }
532 }).exec();
533
534 if (auto resultSet = *resultSetPtr) {
535 resultSetPtr->clear();
536 if (!resultSet->isDone()) {
537 QEventLoop eventLoop;
538 resultSet->onDone([&eventLoop](){
539 eventLoop.quit();
540 });
541 eventLoop.exec();
542 }
543 } else {
544 qWarning() << "result set is already gone";
545 }
510 }); 546 });
511 return resultSet->emitter(); 547 return resultSet->emitter();
512 } 548 }
diff --git a/tests/clientapitest.cpp b/tests/clientapitest.cpp
index 7bcd7b0..057495e 100644
--- a/tests/clientapitest.cpp
+++ b/tests/clientapitest.cpp
@@ -24,10 +24,11 @@ class DummyResourceFacade : public Akonadi2::StoreFacade<Akonadi2::ApplicationDo
24{ 24{
25public: 25public:
26 ~DummyResourceFacade(){}; 26 ~DummyResourceFacade(){};
27 virtual Async::Job<void> create(const Akonadi2::ApplicationDomain::Event &domainObject){ return Async::null<void>(); }; 27 Async::Job<void> create(const Akonadi2::ApplicationDomain::Event &domainObject) Q_DECL_OVERRIDE { return Async::null<void>(); };
28 virtual Async::Job<void> modify(const Akonadi2::ApplicationDomain::Event &domainObject){ return Async::null<void>(); }; 28 Async::Job<void> modify(const Akonadi2::ApplicationDomain::Event &domainObject) Q_DECL_OVERRIDE { return Async::null<void>(); };
29 virtual Async::Job<void> remove(const Akonadi2::ApplicationDomain::Event &domainObject){ return Async::null<void>(); }; 29 Async::Job<void> remove(const Akonadi2::ApplicationDomain::Event &domainObject) Q_DECL_OVERRIDE { return Async::null<void>(); };
30 virtual Async::Job<qint64> load(const Akonadi2::Query &query, const std::function<void(const Akonadi2::ApplicationDomain::Event::Ptr &)> &resultCallback) 30
31 Async::Job<qint64> load(const Akonadi2::Query &query, const std::function<void(const Akonadi2::ApplicationDomain::Event::Ptr &)> &resultCallback)
31 { 32 {
32 return Async::start<qint64>([this, resultCallback](Async::Future<qint64> &future) { 33 return Async::start<qint64>([this, resultCallback](Async::Future<qint64> &future) {
33 qDebug() << "load called"; 34 qDebug() << "load called";
@@ -39,14 +40,23 @@ public:
39 }); 40 });
40 } 41 }
41 42
42 Async::Job<void> load(const Akonadi2::Query &query, const QSharedPointer<Akonadi2::ResultProvider<Akonadi2::ApplicationDomain::Event::Ptr> > &resultProvider) 43 Async::Job<void> load(const Akonadi2::Query &query, const QSharedPointer<Akonadi2::ResultProvider<Akonadi2::ApplicationDomain::Event::Ptr> > &resultProvider) Q_DECL_OVERRIDE
43 { 44 {
44 auto runner = QSharedPointer<QueryRunner>::create(query); 45 auto runner = QSharedPointer<QueryRunner>::create(query);
45 //The runner only lives as long as the resultProvider 46 //The runner only lives as long as the resultProvider
46 resultProvider->setQueryRunner(runner); 47 resultProvider->setQueryRunner(runner);
47 runner->setQuery([this, resultProvider, query](qint64 oldRevision, qint64 newRevision) -> Async::Job<qint64> { 48 QWeakPointer<Akonadi2::ResultProvider<Akonadi2::ApplicationDomain::Event::Ptr> > weakResultProvider = resultProvider;
49 capturedResultProvider = resultProvider;
50 runner->setQuery([this, weakResultProvider, query](qint64 oldRevision, qint64 newRevision) -> Async::Job<qint64> {
48 qDebug() << "Creating query for revisions: " << oldRevision << newRevision; 51 qDebug() << "Creating query for revisions: " << oldRevision << newRevision;
49 return Async::start<qint64>([this, resultProvider, query](Async::Future<qint64> &future) { 52 return Async::start<qint64>([this, weakResultProvider, query](Async::Future<qint64> &future) {
53 auto resultProvider = weakResultProvider.toStrongRef();
54 if (!resultProvider) {
55 Warning() << "Tried executing query after result provider is already gone";
56 future.setError(0, QString());
57 future.setFinished();
58 return;
59 }
50 //TODO only emit changes and don't replace everything 60 //TODO only emit changes and don't replace everything
51 resultProvider->clear(); 61 resultProvider->clear();
52 //rerun query 62 //rerun query
@@ -84,6 +94,7 @@ public:
84 94
85 QList<Akonadi2::ApplicationDomain::Event::Ptr> results; 95 QList<Akonadi2::ApplicationDomain::Event::Ptr> results;
86 QSharedPointer<RevisionNotifier> notifier; 96 QSharedPointer<RevisionNotifier> notifier;
97 QWeakPointer<Akonadi2::ResultProvider<Akonadi2::ApplicationDomain::Event::Ptr> > capturedResultProvider;
87}; 98};
88 99
89class ClientAPITest : public QObject 100class ClientAPITest : public QObject
@@ -123,8 +134,8 @@ private Q_SLOTS:
123 facade.results << QSharedPointer<Akonadi2::ApplicationDomain::Event>::create("resource", "id", 0, QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor>()); 134 facade.results << QSharedPointer<Akonadi2::ApplicationDomain::Event>::create("resource", "id", 0, QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor>());
124 135
125 Akonadi2::FacadeFactory::instance().registerFacade<Akonadi2::ApplicationDomain::Event, DummyResourceFacade>("dummyresource", 136 Akonadi2::FacadeFactory::instance().registerFacade<Akonadi2::ApplicationDomain::Event, DummyResourceFacade>("dummyresource",
126 [&facade](bool &externallManage){ 137 [&facade](bool &externallyManaged){
127 externallManage = true; 138 externallyManaged = true;
128 return &facade; 139 return &facade;
129 } 140 }
130 ); 141 );
@@ -145,6 +156,31 @@ private Q_SLOTS:
145 QTRY_COMPARE(result.size(), 2); 156 QTRY_COMPARE(result.size(), 2);
146 } 157 }
147 158
159 void testQueryLifetime()
160 {
161 DummyResourceFacade facade;
162 facade.results << QSharedPointer<Akonadi2::ApplicationDomain::Event>::create("resource", "id", 0, QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor>());
163
164 Akonadi2::FacadeFactory::instance().registerFacade<Akonadi2::ApplicationDomain::Event, DummyResourceFacade>("dummyresource",
165 [&facade](bool &externallyManaged){
166 externallyManaged = true;
167 return &facade;
168 }
169 );
170
171 Akonadi2::Query query;
172 query.resources << "dummyresource";
173 query.liveQuery = true;
174
175 {
176 async::SyncListResult<Akonadi2::ApplicationDomain::Event::Ptr> result(Akonadi2::Store::load<Akonadi2::ApplicationDomain::Event>(query));
177 result.exec();
178 QCOMPARE(result.size(), 1);
179 }
180 //It's running in a separate thread, so we have to wait for a moment.
181 QTRY_VERIFY(!facade.capturedResultProvider);
182 }
183
148}; 184};
149 185
150QTEST_MAIN(ClientAPITest) 186QTEST_MAIN(ClientAPITest)