summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2015-04-27 00:38:36 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2015-04-27 00:38:36 +0200
commita044a7f0ea054502fb8b6aedcfa213b192a7b05a (patch)
treeb13f9f501352a1251a2ced52e8a09cffb6424d9b
parent12539f35e385c9250cd67e387c67dbaff4de34f3 (diff)
downloadsink-a044a7f0ea054502fb8b6aedcfa213b192a7b05a.tar.gz
sink-a044a7f0ea054502fb8b6aedcfa213b192a7b05a.zip
Fixed lifetime management of resultSet.
The resultSet remains valid for the duration of the thread. We keep the thread running until the ResultEmitter is deleted.
-rw-r--r--common/clientapi.h92
-rw-r--r--common/facade.h1
-rw-r--r--common/synclistresult.h8
-rw-r--r--tests/clientapitest.cpp5
4 files changed, 64 insertions, 42 deletions
diff --git a/common/clientapi.h b/common/clientapi.h
index 4786398..6d66e40 100644
--- a/common/clientapi.h
+++ b/common/clientapi.h
@@ -29,6 +29,7 @@
29#include <QEventLoop> 29#include <QEventLoop>
30#include <QtConcurrent/QtConcurrentRun> 30#include <QtConcurrent/QtConcurrentRun>
31#include <functional> 31#include <functional>
32#include <memory>
32#include "threadboundary.h" 33#include "threadboundary.h"
33#include "async/src/async.h" 34#include "async/src/async.h"
34 35
@@ -91,6 +92,11 @@ namespace async {
91 }); 92 });
92 } 93 }
93 94
95 void initialResultSetComplete()
96 {
97 callInMainThreadOnEmitter(&ResultEmitter<T>::initialResultSetComplete);
98 }
99
94 //Called from worker thread 100 //Called from worker thread
95 void complete() 101 void complete()
96 { 102 {
@@ -124,6 +130,15 @@ namespace async {
124 mQueryRunner = runner; 130 mQueryRunner = runner;
125 } 131 }
126 132
133 /**
134 * For lifetimemanagement only.
135 * We keep the runner alive as long as the result provider exists.
136 */
137 void setFacade(const std::shared_ptr<void> &facade)
138 {
139 mFacade = facade;
140 }
141
127 void onDone(const std::function<void()> &callback) 142 void onDone(const std::function<void()> &callback)
128 { 143 {
129 mOnDoneCallback = callback; 144 mOnDoneCallback = callback;
@@ -141,12 +156,13 @@ namespace async {
141 qWarning() << "done"; 156 qWarning() << "done";
142 if (mOnDoneCallback) { 157 if (mOnDoneCallback) {
143 mOnDoneCallback(); 158 mOnDoneCallback();
159 mOnDoneCallback = std::function<void()>();
144 } 160 }
145 mOnDoneCallback = std::function<void()>();
146 } 161 }
147 162
148 QWeakPointer<ResultEmitter<T> > mResultEmitter; 163 QWeakPointer<ResultEmitter<T> > mResultEmitter;
149 QSharedPointer<QObject> mQueryRunner; 164 QSharedPointer<QObject> mQueryRunner;
165 std::shared_ptr<void> mFacade;
150 std::function<void()> mOnDoneCallback; 166 std::function<void()> mOnDoneCallback;
151 }; 167 };
152 168
@@ -170,6 +186,10 @@ namespace async {
170 addHandler = handler; 186 addHandler = handler;
171 } 187 }
172 // void onRemoved(const std::function<void(const T&)> &handler); 188 // void onRemoved(const std::function<void(const T&)> &handler);
189 void onInitialResultSetComplete(const std::function<void(void)> &handler)
190 {
191 initialResultSetCompleteHandler = handler;
192 }
173 void onComplete(const std::function<void(void)> &handler) 193 void onComplete(const std::function<void(void)> &handler)
174 { 194 {
175 completeHandler = handler; 195 completeHandler = handler;
@@ -184,6 +204,11 @@ namespace async {
184 addHandler(value); 204 addHandler(value);
185 } 205 }
186 206
207 void initialResultSetComplete()
208 {
209 initialResultSetCompleteHandler();
210 }
211
187 void complete() 212 void complete()
188 { 213 {
189 completeHandler(); 214 completeHandler();
@@ -199,6 +224,7 @@ namespace async {
199 224
200 std::function<void(const DomainType&)> addHandler; 225 std::function<void(const DomainType&)> addHandler;
201 // std::function<void(const T&)> removeHandler; 226 // std::function<void(const T&)> removeHandler;
227 std::function<void(void)> initialResultSetCompleteHandler;
202 std::function<void(void)> completeHandler; 228 std::function<void(void)> completeHandler;
203 std::function<void(void)> clearHandler; 229 std::function<void(void)> clearHandler;
204 ThreadBoundary mThreadBoundary; 230 ThreadBoundary mThreadBoundary;
@@ -486,6 +512,14 @@ private:
486 QHash<QByteArray, FactoryFunction> mFacadeRegistry; 512 QHash<QByteArray, FactoryFunction> mFacadeRegistry;
487}; 513};
488 514
515template <class DomainType>
516struct LifeExtender {
517 LifeExtender(const QSharedPointer<StoreFacade<DomainType> > &f) : facade(f) {}
518private:
519 QSharedPointer<StoreFacade<DomainType> > facade;
520};
521
522
489/** 523/**
490 * Store interface used in the client API. 524 * Store interface used in the client API.
491 */ 525 */
@@ -503,45 +537,37 @@ public:
503 static QSharedPointer<ResultEmitter<typename DomainType::Ptr> > load(Query query) 537 static QSharedPointer<ResultEmitter<typename DomainType::Ptr> > load(Query query)
504 { 538 {
505 auto resultSet = QSharedPointer<ResultProvider<typename DomainType::Ptr> >::create(); 539 auto resultSet = QSharedPointer<ResultProvider<typename DomainType::Ptr> >::create();
506 // FIXME This is ridiculous but otherwise I can't release the shared pointer before the thread quits
507 auto resultSetPtr = QSharedPointer<QSharedPointer<ResultProvider<typename DomainType::Ptr> > >::create(resultSet);
508 540
509 //Execute the search in a thread. 541 //Execute the search in a thread.
510 //We must guarantee that the emitter is returned before the first result is emitted. 542 //We must guarantee that the emitter is returned before the first result is emitted.
511 //The result provider must be threadsafe. 543 //The result provider must be threadsafe.
512 async::run([resultSetPtr, query](){ 544 async::run([query, resultSet](){
513 // Query all resources and aggregate results 545 // Query all resources and aggregate results
514 const QList<QByteArray> resources = query.resources; 546 const QList<QByteArray> resources = query.resources;
515 Async::start<QList<QByteArray>>([resources](){return resources;}) 547 {
516 .template each<void, QByteArray>([query, resultSetPtr](const QByteArray &resource, Async::Future<void> &future) { 548 Async::start<QList<QByteArray>>([resources](){return resources;})
517 //TODO pass resource identifier to factory 549 .template each<void, QByteArray>([query, resultSet](const QByteArray &resource, Async::Future<void> &future) {
518 auto facade = FacadeFactory::instance().getFacade<DomainType>(resource); 550 //TODO pass resource identifier to factory
519 if (auto resultSet = *resultSetPtr) { 551 auto facade = FacadeFactory::instance().getFacade<DomainType>(resource);
520 facade->load(query, resultSet).template then<void>([&future](){future.setFinished();}).exec(); 552 facade->load(query, resultSet).template then<void>([&future](){future.setFinished();}).exec();
521 } else { 553 //Keep the facade alive for the duration for the lifetime of the resultSet.
522 qWarning() << "result set is already gone"; 554 //TODO If the factory returned a std::shared_ptr we wouldn't require LifeExtender
523 future.setFinished(); 555 resultSet->setFacade(std::make_shared<LifeExtender<DomainType> >(facade));
524 } 556 }).template then<void>([query, resultSet]() {
525 }).template then<void>([resultSetPtr]() { 557 resultSet->initialResultSetComplete();
526 qDebug() << "Query complete"; 558 if (!query.liveQuery) {
527 if (auto resultSet = *resultSetPtr) { 559 resultSet->complete();
528 resultSet->complete(); 560 }
529 } else { 561 }).exec();
530 qWarning() << "result set is already gone"; 562 }
531 } 563
532 }).exec(); 564 //Keep the thread alive until the result is ready
533 565 if (!resultSet->isDone()) {
534 if (auto resultSet = *resultSetPtr) { 566 QEventLoop eventLoop;
535 resultSetPtr->clear(); 567 resultSet->onDone([&eventLoop](){
536 if (!resultSet->isDone()) { 568 eventLoop.quit();
537 QEventLoop eventLoop; 569 });
538 resultSet->onDone([&eventLoop](){ 570 eventLoop.exec();
539 eventLoop.quit();
540 });
541 eventLoop.exec();
542 }
543 } else {
544 qWarning() << "result set is already gone";
545 } 571 }
546 }); 572 });
547 return resultSet->emitter(); 573 return resultSet->emitter();
diff --git a/common/facade.h b/common/facade.h
index a5858d5..b6ae4a6 100644
--- a/common/facade.h
+++ b/common/facade.h
@@ -163,7 +163,6 @@ public:
163 load(query, addCallback).template then<void, qint64>([resultProvider, &future](qint64 queriedRevision) { 163 load(query, addCallback).template then<void, qint64>([resultProvider, &future](qint64 queriedRevision) {
164 //TODO set revision in result provider? 164 //TODO set revision in result provider?
165 //TODO update all existing results with new revision 165 //TODO update all existing results with new revision
166 resultProvider->complete();
167 future.setValue(queriedRevision); 166 future.setValue(queriedRevision);
168 future.setFinished(); 167 future.setFinished();
169 }).exec(); 168 }).exec();
diff --git a/common/synclistresult.h b/common/synclistresult.h
index 5fa0efd..0a86f8c 100644
--- a/common/synclistresult.h
+++ b/common/synclistresult.h
@@ -19,20 +19,21 @@ class SyncListResult : public QList<T> {
19public: 19public:
20 SyncListResult(const QSharedPointer<ResultEmitter<T> > &emitter) 20 SyncListResult(const QSharedPointer<ResultEmitter<T> > &emitter)
21 :QList<T>(), 21 :QList<T>(),
22 mComplete(false),
23 mEmitter(emitter) 22 mEmitter(emitter)
24 { 23 {
25 emitter->onAdded([this](const T &value) { 24 emitter->onAdded([this](const T &value) {
26 this->append(value); 25 this->append(value);
27 }); 26 });
28 emitter->onComplete([this]() { 27 emitter->onInitialResultSetComplete([this]() {
29 mComplete = true;
30 if (eventLoopAborter) { 28 if (eventLoopAborter) {
31 eventLoopAborter(); 29 eventLoopAborter();
32 //Be safe in case of a second invocation of the complete handler 30 //Be safe in case of a second invocation of the complete handler
33 eventLoopAborter = std::function<void()>(); 31 eventLoopAborter = std::function<void()>();
34 } 32 }
35 }); 33 });
34 emitter->onComplete([this]() {
35 mEmitter.clear();
36 });
36 emitter->onClear([this]() { 37 emitter->onClear([this]() {
37 this->clear(); 38 this->clear();
38 }); 39 });
@@ -46,7 +47,6 @@ public:
46 } 47 }
47 48
48private: 49private:
49 bool mComplete;
50 QSharedPointer<ResultEmitter<T> > mEmitter; 50 QSharedPointer<ResultEmitter<T> > mEmitter;
51 std::function<void()> eventLoopAborter; 51 std::function<void()> eventLoopAborter;
52}; 52};
diff --git a/tests/clientapitest.cpp b/tests/clientapitest.cpp
index 057495e..1a5d873 100644
--- a/tests/clientapitest.cpp
+++ b/tests/clientapitest.cpp
@@ -61,10 +61,7 @@ public:
61 resultProvider->clear(); 61 resultProvider->clear();
62 //rerun query 62 //rerun query
63 std::function<void(const Akonadi2::ApplicationDomain::Event::Ptr &)> addCallback = std::bind(&Akonadi2::ResultProvider<Akonadi2::ApplicationDomain::Event::Ptr>::add, resultProvider, std::placeholders::_1); 63 std::function<void(const Akonadi2::ApplicationDomain::Event::Ptr &)> addCallback = std::bind(&Akonadi2::ResultProvider<Akonadi2::ApplicationDomain::Event::Ptr>::add, resultProvider, std::placeholders::_1);
64 load(query, addCallback).then<void, qint64>([resultProvider, &future](qint64 queriedRevision) { 64 load(query, addCallback).then<void, qint64>([resultProvider, &future, query](qint64 queriedRevision) {
65 //TODO set revision in result provider?
66 //TODO update all existing results with new revision
67 resultProvider->complete();
68 future.setValue(queriedRevision); 65 future.setValue(queriedRevision);
69 future.setFinished(); 66 future.setFinished();
70 }).exec(); 67 }).exec();