diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-04-27 00:38:36 +0200 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-04-27 00:38:36 +0200 |
commit | a044a7f0ea054502fb8b6aedcfa213b192a7b05a (patch) | |
tree | b13f9f501352a1251a2ced52e8a09cffb6424d9b | |
parent | 12539f35e385c9250cd67e387c67dbaff4de34f3 (diff) | |
download | sink-a044a7f0ea054502fb8b6aedcfa213b192a7b05a.tar.gz sink-a044a7f0ea054502fb8b6aedcfa213b192a7b05a.zip |
Fixed lifetime management of resultSet.
The resultSet remains valid for the duration of the thread.
We keep the thread running until the ResultEmitter is deleted.
-rw-r--r-- | common/clientapi.h | 92 | ||||
-rw-r--r-- | common/facade.h | 1 | ||||
-rw-r--r-- | common/synclistresult.h | 8 | ||||
-rw-r--r-- | tests/clientapitest.cpp | 5 |
4 files changed, 64 insertions, 42 deletions
diff --git a/common/clientapi.h b/common/clientapi.h index 4786398..6d66e40 100644 --- a/common/clientapi.h +++ b/common/clientapi.h | |||
@@ -29,6 +29,7 @@ | |||
29 | #include <QEventLoop> | 29 | #include <QEventLoop> |
30 | #include <QtConcurrent/QtConcurrentRun> | 30 | #include <QtConcurrent/QtConcurrentRun> |
31 | #include <functional> | 31 | #include <functional> |
32 | #include <memory> | ||
32 | #include "threadboundary.h" | 33 | #include "threadboundary.h" |
33 | #include "async/src/async.h" | 34 | #include "async/src/async.h" |
34 | 35 | ||
@@ -91,6 +92,11 @@ namespace async { | |||
91 | }); | 92 | }); |
92 | } | 93 | } |
93 | 94 | ||
95 | void initialResultSetComplete() | ||
96 | { | ||
97 | callInMainThreadOnEmitter(&ResultEmitter<T>::initialResultSetComplete); | ||
98 | } | ||
99 | |||
94 | //Called from worker thread | 100 | //Called from worker thread |
95 | void complete() | 101 | void complete() |
96 | { | 102 | { |
@@ -124,6 +130,15 @@ namespace async { | |||
124 | mQueryRunner = runner; | 130 | mQueryRunner = runner; |
125 | } | 131 | } |
126 | 132 | ||
133 | /** | ||
134 | * For lifetimemanagement only. | ||
135 | * We keep the runner alive as long as the result provider exists. | ||
136 | */ | ||
137 | void setFacade(const std::shared_ptr<void> &facade) | ||
138 | { | ||
139 | mFacade = facade; | ||
140 | } | ||
141 | |||
127 | void onDone(const std::function<void()> &callback) | 142 | void onDone(const std::function<void()> &callback) |
128 | { | 143 | { |
129 | mOnDoneCallback = callback; | 144 | mOnDoneCallback = callback; |
@@ -141,12 +156,13 @@ namespace async { | |||
141 | qWarning() << "done"; | 156 | qWarning() << "done"; |
142 | if (mOnDoneCallback) { | 157 | if (mOnDoneCallback) { |
143 | mOnDoneCallback(); | 158 | mOnDoneCallback(); |
159 | mOnDoneCallback = std::function<void()>(); | ||
144 | } | 160 | } |
145 | mOnDoneCallback = std::function<void()>(); | ||
146 | } | 161 | } |
147 | 162 | ||
148 | QWeakPointer<ResultEmitter<T> > mResultEmitter; | 163 | QWeakPointer<ResultEmitter<T> > mResultEmitter; |
149 | QSharedPointer<QObject> mQueryRunner; | 164 | QSharedPointer<QObject> mQueryRunner; |
165 | std::shared_ptr<void> mFacade; | ||
150 | std::function<void()> mOnDoneCallback; | 166 | std::function<void()> mOnDoneCallback; |
151 | }; | 167 | }; |
152 | 168 | ||
@@ -170,6 +186,10 @@ namespace async { | |||
170 | addHandler = handler; | 186 | addHandler = handler; |
171 | } | 187 | } |
172 | // void onRemoved(const std::function<void(const T&)> &handler); | 188 | // void onRemoved(const std::function<void(const T&)> &handler); |
189 | void onInitialResultSetComplete(const std::function<void(void)> &handler) | ||
190 | { | ||
191 | initialResultSetCompleteHandler = handler; | ||
192 | } | ||
173 | void onComplete(const std::function<void(void)> &handler) | 193 | void onComplete(const std::function<void(void)> &handler) |
174 | { | 194 | { |
175 | completeHandler = handler; | 195 | completeHandler = handler; |
@@ -184,6 +204,11 @@ namespace async { | |||
184 | addHandler(value); | 204 | addHandler(value); |
185 | } | 205 | } |
186 | 206 | ||
207 | void initialResultSetComplete() | ||
208 | { | ||
209 | initialResultSetCompleteHandler(); | ||
210 | } | ||
211 | |||
187 | void complete() | 212 | void complete() |
188 | { | 213 | { |
189 | completeHandler(); | 214 | completeHandler(); |
@@ -199,6 +224,7 @@ namespace async { | |||
199 | 224 | ||
200 | std::function<void(const DomainType&)> addHandler; | 225 | std::function<void(const DomainType&)> addHandler; |
201 | // std::function<void(const T&)> removeHandler; | 226 | // std::function<void(const T&)> removeHandler; |
227 | std::function<void(void)> initialResultSetCompleteHandler; | ||
202 | std::function<void(void)> completeHandler; | 228 | std::function<void(void)> completeHandler; |
203 | std::function<void(void)> clearHandler; | 229 | std::function<void(void)> clearHandler; |
204 | ThreadBoundary mThreadBoundary; | 230 | ThreadBoundary mThreadBoundary; |
@@ -486,6 +512,14 @@ private: | |||
486 | QHash<QByteArray, FactoryFunction> mFacadeRegistry; | 512 | QHash<QByteArray, FactoryFunction> mFacadeRegistry; |
487 | }; | 513 | }; |
488 | 514 | ||
515 | template <class DomainType> | ||
516 | struct LifeExtender { | ||
517 | LifeExtender(const QSharedPointer<StoreFacade<DomainType> > &f) : facade(f) {} | ||
518 | private: | ||
519 | QSharedPointer<StoreFacade<DomainType> > facade; | ||
520 | }; | ||
521 | |||
522 | |||
489 | /** | 523 | /** |
490 | * Store interface used in the client API. | 524 | * Store interface used in the client API. |
491 | */ | 525 | */ |
@@ -503,45 +537,37 @@ public: | |||
503 | static QSharedPointer<ResultEmitter<typename DomainType::Ptr> > load(Query query) | 537 | static QSharedPointer<ResultEmitter<typename DomainType::Ptr> > load(Query query) |
504 | { | 538 | { |
505 | auto resultSet = QSharedPointer<ResultProvider<typename DomainType::Ptr> >::create(); | 539 | auto resultSet = QSharedPointer<ResultProvider<typename DomainType::Ptr> >::create(); |
506 | // FIXME This is ridiculous but otherwise I can't release the shared pointer before the thread quits | ||
507 | auto resultSetPtr = QSharedPointer<QSharedPointer<ResultProvider<typename DomainType::Ptr> > >::create(resultSet); | ||
508 | 540 | ||
509 | //Execute the search in a thread. | 541 | //Execute the search in a thread. |
510 | //We must guarantee that the emitter is returned before the first result is emitted. | 542 | //We must guarantee that the emitter is returned before the first result is emitted. |
511 | //The result provider must be threadsafe. | 543 | //The result provider must be threadsafe. |
512 | async::run([resultSetPtr, query](){ | 544 | async::run([query, resultSet](){ |
513 | // Query all resources and aggregate results | 545 | // Query all resources and aggregate results |
514 | const QList<QByteArray> resources = query.resources; | 546 | const QList<QByteArray> resources = query.resources; |
515 | Async::start<QList<QByteArray>>([resources](){return resources;}) | 547 | { |
516 | .template each<void, QByteArray>([query, resultSetPtr](const QByteArray &resource, Async::Future<void> &future) { | 548 | Async::start<QList<QByteArray>>([resources](){return resources;}) |
517 | //TODO pass resource identifier to factory | 549 | .template each<void, QByteArray>([query, resultSet](const QByteArray &resource, Async::Future<void> &future) { |
518 | auto facade = FacadeFactory::instance().getFacade<DomainType>(resource); | 550 | //TODO pass resource identifier to factory |
519 | if (auto resultSet = *resultSetPtr) { | 551 | auto facade = FacadeFactory::instance().getFacade<DomainType>(resource); |
520 | facade->load(query, resultSet).template then<void>([&future](){future.setFinished();}).exec(); | 552 | facade->load(query, resultSet).template then<void>([&future](){future.setFinished();}).exec(); |
521 | } else { | 553 | //Keep the facade alive for the duration for the lifetime of the resultSet. |
522 | qWarning() << "result set is already gone"; | 554 | //TODO If the factory returned a std::shared_ptr we wouldn't require LifeExtender |
523 | future.setFinished(); | 555 | resultSet->setFacade(std::make_shared<LifeExtender<DomainType> >(facade)); |
524 | } | 556 | }).template then<void>([query, resultSet]() { |
525 | }).template then<void>([resultSetPtr]() { | 557 | resultSet->initialResultSetComplete(); |
526 | qDebug() << "Query complete"; | 558 | if (!query.liveQuery) { |
527 | if (auto resultSet = *resultSetPtr) { | 559 | resultSet->complete(); |
528 | resultSet->complete(); | 560 | } |
529 | } else { | 561 | }).exec(); |
530 | qWarning() << "result set is already gone"; | 562 | } |
531 | } | 563 | |
532 | }).exec(); | 564 | //Keep the thread alive until the result is ready |
533 | 565 | if (!resultSet->isDone()) { | |
534 | if (auto resultSet = *resultSetPtr) { | 566 | QEventLoop eventLoop; |
535 | resultSetPtr->clear(); | 567 | resultSet->onDone([&eventLoop](){ |
536 | if (!resultSet->isDone()) { | 568 | eventLoop.quit(); |
537 | QEventLoop eventLoop; | 569 | }); |
538 | resultSet->onDone([&eventLoop](){ | 570 | eventLoop.exec(); |
539 | eventLoop.quit(); | ||
540 | }); | ||
541 | eventLoop.exec(); | ||
542 | } | ||
543 | } else { | ||
544 | qWarning() << "result set is already gone"; | ||
545 | } | 571 | } |
546 | }); | 572 | }); |
547 | return resultSet->emitter(); | 573 | return resultSet->emitter(); |
diff --git a/common/facade.h b/common/facade.h index a5858d5..b6ae4a6 100644 --- a/common/facade.h +++ b/common/facade.h | |||
@@ -163,7 +163,6 @@ public: | |||
163 | load(query, addCallback).template then<void, qint64>([resultProvider, &future](qint64 queriedRevision) { | 163 | load(query, addCallback).template then<void, qint64>([resultProvider, &future](qint64 queriedRevision) { |
164 | //TODO set revision in result provider? | 164 | //TODO set revision in result provider? |
165 | //TODO update all existing results with new revision | 165 | //TODO update all existing results with new revision |
166 | resultProvider->complete(); | ||
167 | future.setValue(queriedRevision); | 166 | future.setValue(queriedRevision); |
168 | future.setFinished(); | 167 | future.setFinished(); |
169 | }).exec(); | 168 | }).exec(); |
diff --git a/common/synclistresult.h b/common/synclistresult.h index 5fa0efd..0a86f8c 100644 --- a/common/synclistresult.h +++ b/common/synclistresult.h | |||
@@ -19,20 +19,21 @@ class SyncListResult : public QList<T> { | |||
19 | public: | 19 | public: |
20 | SyncListResult(const QSharedPointer<ResultEmitter<T> > &emitter) | 20 | SyncListResult(const QSharedPointer<ResultEmitter<T> > &emitter) |
21 | :QList<T>(), | 21 | :QList<T>(), |
22 | mComplete(false), | ||
23 | mEmitter(emitter) | 22 | mEmitter(emitter) |
24 | { | 23 | { |
25 | emitter->onAdded([this](const T &value) { | 24 | emitter->onAdded([this](const T &value) { |
26 | this->append(value); | 25 | this->append(value); |
27 | }); | 26 | }); |
28 | emitter->onComplete([this]() { | 27 | emitter->onInitialResultSetComplete([this]() { |
29 | mComplete = true; | ||
30 | if (eventLoopAborter) { | 28 | if (eventLoopAborter) { |
31 | eventLoopAborter(); | 29 | eventLoopAborter(); |
32 | //Be safe in case of a second invocation of the complete handler | 30 | //Be safe in case of a second invocation of the complete handler |
33 | eventLoopAborter = std::function<void()>(); | 31 | eventLoopAborter = std::function<void()>(); |
34 | } | 32 | } |
35 | }); | 33 | }); |
34 | emitter->onComplete([this]() { | ||
35 | mEmitter.clear(); | ||
36 | }); | ||
36 | emitter->onClear([this]() { | 37 | emitter->onClear([this]() { |
37 | this->clear(); | 38 | this->clear(); |
38 | }); | 39 | }); |
@@ -46,7 +47,6 @@ public: | |||
46 | } | 47 | } |
47 | 48 | ||
48 | private: | 49 | private: |
49 | bool mComplete; | ||
50 | QSharedPointer<ResultEmitter<T> > mEmitter; | 50 | QSharedPointer<ResultEmitter<T> > mEmitter; |
51 | std::function<void()> eventLoopAborter; | 51 | std::function<void()> eventLoopAborter; |
52 | }; | 52 | }; |
diff --git a/tests/clientapitest.cpp b/tests/clientapitest.cpp index 057495e..1a5d873 100644 --- a/tests/clientapitest.cpp +++ b/tests/clientapitest.cpp | |||
@@ -61,10 +61,7 @@ public: | |||
61 | resultProvider->clear(); | 61 | resultProvider->clear(); |
62 | //rerun query | 62 | //rerun query |
63 | std::function<void(const Akonadi2::ApplicationDomain::Event::Ptr &)> addCallback = std::bind(&Akonadi2::ResultProvider<Akonadi2::ApplicationDomain::Event::Ptr>::add, resultProvider, std::placeholders::_1); | 63 | std::function<void(const Akonadi2::ApplicationDomain::Event::Ptr &)> addCallback = std::bind(&Akonadi2::ResultProvider<Akonadi2::ApplicationDomain::Event::Ptr>::add, resultProvider, std::placeholders::_1); |
64 | load(query, addCallback).then<void, qint64>([resultProvider, &future](qint64 queriedRevision) { | 64 | load(query, addCallback).then<void, qint64>([resultProvider, &future, query](qint64 queriedRevision) { |
65 | //TODO set revision in result provider? | ||
66 | //TODO update all existing results with new revision | ||
67 | resultProvider->complete(); | ||
68 | future.setValue(queriedRevision); | 65 | future.setValue(queriedRevision); |
69 | future.setFinished(); | 66 | future.setFinished(); |
70 | }).exec(); | 67 | }).exec(); |