summaryrefslogtreecommitdiffstats
path: root/common/clientapi.h
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2015-04-27 00:38:36 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2015-04-27 00:38:36 +0200
commita044a7f0ea054502fb8b6aedcfa213b192a7b05a (patch)
treeb13f9f501352a1251a2ced52e8a09cffb6424d9b /common/clientapi.h
parent12539f35e385c9250cd67e387c67dbaff4de34f3 (diff)
downloadsink-a044a7f0ea054502fb8b6aedcfa213b192a7b05a.tar.gz
sink-a044a7f0ea054502fb8b6aedcfa213b192a7b05a.zip
Fixed lifetime management of resultSet.
The resultSet remains valid for the duration of the thread. We keep the thread running until the ResultEmitter is deleted.
Diffstat (limited to 'common/clientapi.h')
-rw-r--r--common/clientapi.h92
1 files changed, 59 insertions, 33 deletions
diff --git a/common/clientapi.h b/common/clientapi.h
index 4786398..6d66e40 100644
--- a/common/clientapi.h
+++ b/common/clientapi.h
@@ -29,6 +29,7 @@
29#include <QEventLoop> 29#include <QEventLoop>
30#include <QtConcurrent/QtConcurrentRun> 30#include <QtConcurrent/QtConcurrentRun>
31#include <functional> 31#include <functional>
32#include <memory>
32#include "threadboundary.h" 33#include "threadboundary.h"
33#include "async/src/async.h" 34#include "async/src/async.h"
34 35
@@ -91,6 +92,11 @@ namespace async {
91 }); 92 });
92 } 93 }
93 94
95 void initialResultSetComplete()
96 {
97 callInMainThreadOnEmitter(&ResultEmitter<T>::initialResultSetComplete);
98 }
99
94 //Called from worker thread 100 //Called from worker thread
95 void complete() 101 void complete()
96 { 102 {
@@ -124,6 +130,15 @@ namespace async {
124 mQueryRunner = runner; 130 mQueryRunner = runner;
125 } 131 }
126 132
133 /**
134 * For lifetimemanagement only.
135 * We keep the runner alive as long as the result provider exists.
136 */
137 void setFacade(const std::shared_ptr<void> &facade)
138 {
139 mFacade = facade;
140 }
141
127 void onDone(const std::function<void()> &callback) 142 void onDone(const std::function<void()> &callback)
128 { 143 {
129 mOnDoneCallback = callback; 144 mOnDoneCallback = callback;
@@ -141,12 +156,13 @@ namespace async {
141 qWarning() << "done"; 156 qWarning() << "done";
142 if (mOnDoneCallback) { 157 if (mOnDoneCallback) {
143 mOnDoneCallback(); 158 mOnDoneCallback();
159 mOnDoneCallback = std::function<void()>();
144 } 160 }
145 mOnDoneCallback = std::function<void()>();
146 } 161 }
147 162
148 QWeakPointer<ResultEmitter<T> > mResultEmitter; 163 QWeakPointer<ResultEmitter<T> > mResultEmitter;
149 QSharedPointer<QObject> mQueryRunner; 164 QSharedPointer<QObject> mQueryRunner;
165 std::shared_ptr<void> mFacade;
150 std::function<void()> mOnDoneCallback; 166 std::function<void()> mOnDoneCallback;
151 }; 167 };
152 168
@@ -170,6 +186,10 @@ namespace async {
170 addHandler = handler; 186 addHandler = handler;
171 } 187 }
172 // void onRemoved(const std::function<void(const T&)> &handler); 188 // void onRemoved(const std::function<void(const T&)> &handler);
189 void onInitialResultSetComplete(const std::function<void(void)> &handler)
190 {
191 initialResultSetCompleteHandler = handler;
192 }
173 void onComplete(const std::function<void(void)> &handler) 193 void onComplete(const std::function<void(void)> &handler)
174 { 194 {
175 completeHandler = handler; 195 completeHandler = handler;
@@ -184,6 +204,11 @@ namespace async {
184 addHandler(value); 204 addHandler(value);
185 } 205 }
186 206
207 void initialResultSetComplete()
208 {
209 initialResultSetCompleteHandler();
210 }
211
187 void complete() 212 void complete()
188 { 213 {
189 completeHandler(); 214 completeHandler();
@@ -199,6 +224,7 @@ namespace async {
199 224
200 std::function<void(const DomainType&)> addHandler; 225 std::function<void(const DomainType&)> addHandler;
201 // std::function<void(const T&)> removeHandler; 226 // std::function<void(const T&)> removeHandler;
227 std::function<void(void)> initialResultSetCompleteHandler;
202 std::function<void(void)> completeHandler; 228 std::function<void(void)> completeHandler;
203 std::function<void(void)> clearHandler; 229 std::function<void(void)> clearHandler;
204 ThreadBoundary mThreadBoundary; 230 ThreadBoundary mThreadBoundary;
@@ -486,6 +512,14 @@ private:
486 QHash<QByteArray, FactoryFunction> mFacadeRegistry; 512 QHash<QByteArray, FactoryFunction> mFacadeRegistry;
487}; 513};
488 514
515template <class DomainType>
516struct LifeExtender {
517 LifeExtender(const QSharedPointer<StoreFacade<DomainType> > &f) : facade(f) {}
518private:
519 QSharedPointer<StoreFacade<DomainType> > facade;
520};
521
522
489/** 523/**
490 * Store interface used in the client API. 524 * Store interface used in the client API.
491 */ 525 */
@@ -503,45 +537,37 @@ public:
503 static QSharedPointer<ResultEmitter<typename DomainType::Ptr> > load(Query query) 537 static QSharedPointer<ResultEmitter<typename DomainType::Ptr> > load(Query query)
504 { 538 {
505 auto resultSet = QSharedPointer<ResultProvider<typename DomainType::Ptr> >::create(); 539 auto resultSet = QSharedPointer<ResultProvider<typename DomainType::Ptr> >::create();
506 // FIXME This is ridiculous but otherwise I can't release the shared pointer before the thread quits
507 auto resultSetPtr = QSharedPointer<QSharedPointer<ResultProvider<typename DomainType::Ptr> > >::create(resultSet);
508 540
509 //Execute the search in a thread. 541 //Execute the search in a thread.
510 //We must guarantee that the emitter is returned before the first result is emitted. 542 //We must guarantee that the emitter is returned before the first result is emitted.
511 //The result provider must be threadsafe. 543 //The result provider must be threadsafe.
512 async::run([resultSetPtr, query](){ 544 async::run([query, resultSet](){
513 // Query all resources and aggregate results 545 // Query all resources and aggregate results
514 const QList<QByteArray> resources = query.resources; 546 const QList<QByteArray> resources = query.resources;
515 Async::start<QList<QByteArray>>([resources](){return resources;}) 547 {
516 .template each<void, QByteArray>([query, resultSetPtr](const QByteArray &resource, Async::Future<void> &future) { 548 Async::start<QList<QByteArray>>([resources](){return resources;})
517 //TODO pass resource identifier to factory 549 .template each<void, QByteArray>([query, resultSet](const QByteArray &resource, Async::Future<void> &future) {
518 auto facade = FacadeFactory::instance().getFacade<DomainType>(resource); 550 //TODO pass resource identifier to factory
519 if (auto resultSet = *resultSetPtr) { 551 auto facade = FacadeFactory::instance().getFacade<DomainType>(resource);
520 facade->load(query, resultSet).template then<void>([&future](){future.setFinished();}).exec(); 552 facade->load(query, resultSet).template then<void>([&future](){future.setFinished();}).exec();
521 } else { 553 //Keep the facade alive for the duration for the lifetime of the resultSet.
522 qWarning() << "result set is already gone"; 554 //TODO If the factory returned a std::shared_ptr we wouldn't require LifeExtender
523 future.setFinished(); 555 resultSet->setFacade(std::make_shared<LifeExtender<DomainType> >(facade));
524 } 556 }).template then<void>([query, resultSet]() {
525 }).template then<void>([resultSetPtr]() { 557 resultSet->initialResultSetComplete();
526 qDebug() << "Query complete"; 558 if (!query.liveQuery) {
527 if (auto resultSet = *resultSetPtr) { 559 resultSet->complete();
528 resultSet->complete(); 560 }
529 } else { 561 }).exec();
530 qWarning() << "result set is already gone"; 562 }
531 } 563
532 }).exec(); 564 //Keep the thread alive until the result is ready
533 565 if (!resultSet->isDone()) {
534 if (auto resultSet = *resultSetPtr) { 566 QEventLoop eventLoop;
535 resultSetPtr->clear(); 567 resultSet->onDone([&eventLoop](){
536 if (!resultSet->isDone()) { 568 eventLoop.quit();
537 QEventLoop eventLoop; 569 });
538 resultSet->onDone([&eventLoop](){ 570 eventLoop.exec();
539 eventLoop.quit();
540 });
541 eventLoop.exec();
542 }
543 } else {
544 qWarning() << "result set is already gone";
545 } 571 }
546 }); 572 });
547 return resultSet->emitter(); 573 return resultSet->emitter();