diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-04-24 10:41:11 +0200 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-04-24 10:41:11 +0200 |
commit | 12539f35e385c9250cd67e387c67dbaff4de34f3 (patch) | |
tree | 5223ebcdc7e94827862f2c9c76a3f6123c8abd1e /common/clientapi.h | |
parent | 2998d9d3d5dfc825904b53393e9ae12e7cd5b72b (diff) | |
download | sink-12539f35e385c9250cd67e387c67dbaff4de34f3.tar.gz sink-12539f35e385c9250cd67e387c67dbaff4de34f3.zip |
Keep thread alive until the end of the query, and cleanup the resultSet.
Diffstat (limited to 'common/clientapi.h')
-rw-r--r-- | common/clientapi.h | 68 |
1 files changed, 52 insertions, 16 deletions
diff --git a/common/clientapi.h b/common/clientapi.h index c6c43ee..4786398 100644 --- a/common/clientapi.h +++ b/common/clientapi.h | |||
@@ -107,7 +107,7 @@ namespace async { | |||
107 | { | 107 | { |
108 | if (!mResultEmitter) { | 108 | if (!mResultEmitter) { |
109 | //We have to go over a separate var and return that, otherwise we'd delete the emitter immediately again | 109 | //We have to go over a separate var and return that, otherwise we'd delete the emitter immediately again |
110 | auto sharedPtr = QSharedPointer<ResultEmitter<T> >::create(); | 110 | auto sharedPtr = QSharedPointer<ResultEmitter<T> >(new ResultEmitter<T>, [this](ResultEmitter<T> *emitter){ done(); delete emitter; }); |
111 | mResultEmitter = sharedPtr; | 111 | mResultEmitter = sharedPtr; |
112 | return sharedPtr; | 112 | return sharedPtr; |
113 | } | 113 | } |
@@ -124,9 +124,30 @@ namespace async { | |||
124 | mQueryRunner = runner; | 124 | mQueryRunner = runner; |
125 | } | 125 | } |
126 | 126 | ||
127 | void onDone(const std::function<void()> &callback) | ||
128 | { | ||
129 | mOnDoneCallback = callback; | ||
130 | } | ||
131 | |||
132 | bool isDone() const | ||
133 | { | ||
134 | //The existance of the emitter currently defines wether we're done or not. | ||
135 | return mResultEmitter.toStrongRef().isNull(); | ||
136 | } | ||
137 | |||
127 | private: | 138 | private: |
139 | void done() | ||
140 | { | ||
141 | qWarning() << "done"; | ||
142 | if (mOnDoneCallback) { | ||
143 | mOnDoneCallback(); | ||
144 | } | ||
145 | mOnDoneCallback = std::function<void()>(); | ||
146 | } | ||
147 | |||
128 | QWeakPointer<ResultEmitter<T> > mResultEmitter; | 148 | QWeakPointer<ResultEmitter<T> > mResultEmitter; |
129 | QSharedPointer<QObject> mQueryRunner; | 149 | QSharedPointer<QObject> mQueryRunner; |
150 | std::function<void()> mOnDoneCallback; | ||
130 | }; | 151 | }; |
131 | 152 | ||
132 | /* | 153 | /* |
@@ -482,31 +503,46 @@ public: | |||
482 | static QSharedPointer<ResultEmitter<typename DomainType::Ptr> > load(Query query) | 503 | static QSharedPointer<ResultEmitter<typename DomainType::Ptr> > load(Query query) |
483 | { | 504 | { |
484 | auto resultSet = QSharedPointer<ResultProvider<typename DomainType::Ptr> >::create(); | 505 | auto resultSet = QSharedPointer<ResultProvider<typename DomainType::Ptr> >::create(); |
506 | // FIXME This is ridiculous but otherwise I can't release the shared pointer before the thread quits | ||
507 | auto resultSetPtr = QSharedPointer<QSharedPointer<ResultProvider<typename DomainType::Ptr> > >::create(resultSet); | ||
485 | 508 | ||
486 | //Execute the search in a thread. | 509 | //Execute the search in a thread. |
487 | //We must guarantee that the emitter is returned before the first result is emitted. | 510 | //We must guarantee that the emitter is returned before the first result is emitted. |
488 | //The result provider must be threadsafe. | 511 | //The result provider must be threadsafe. |
489 | async::run([resultSet, query](){ | 512 | async::run([resultSetPtr, query](){ |
490 | // Query all resources and aggregate results | 513 | // Query all resources and aggregate results |
491 | const QList<QByteArray> resources = query.resources; | 514 | const QList<QByteArray> resources = query.resources; |
492 | Async::start<QList<QByteArray>>([resources](){return resources;}) | 515 | Async::start<QList<QByteArray>>([resources](){return resources;}) |
493 | .template each<void, QByteArray>([query, resultSet](const QByteArray &resource, Async::Future<void> &future) { | 516 | .template each<void, QByteArray>([query, resultSetPtr](const QByteArray &resource, Async::Future<void> &future) { |
517 | //TODO pass resource identifier to factory | ||
494 | auto facade = FacadeFactory::instance().getFacade<DomainType>(resource); | 518 | auto facade = FacadeFactory::instance().getFacade<DomainType>(resource); |
495 | // TODO The following is a necessary hack to keep the facade alive. | 519 | if (auto resultSet = *resultSetPtr) { |
496 | // Otherwise this would reduce to: | 520 | facade->load(query, resultSet).template then<void>([&future](){future.setFinished();}).exec(); |
497 | // facade->load(query, addCallback).exec(); | 521 | } else { |
498 | // We somehow have to guarantee that the facade remains valid for the duration of the job | 522 | qWarning() << "result set is already gone"; |
499 | // TODO: Use one result set per facade, and merge the results separately | ||
500 | // resultSet->addSubset(facade->query(query)); | ||
501 | facade->load(query, resultSet).template then<void>([&future, facade]() { | ||
502 | future.setFinished(); | 523 | future.setFinished(); |
503 | }).exec(); | 524 | } |
504 | }).template then<void>([resultSet]() { | 525 | }).template then<void>([resultSetPtr]() { |
505 | qDebug() << "Query complete"; | 526 | qDebug() << "Query complete"; |
506 | resultSet->complete(); | 527 | if (auto resultSet = *resultSetPtr) { |
507 | }).exec().waitForFinished(); //We use the eventloop provided by waitForFinished to keep the thread alive until all is done | 528 | resultSet->complete(); |
508 | //FIXME for live query the thread dies after the initial query? | 529 | } else { |
509 | //TODO associate the thread with the query runner | 530 | qWarning() << "result set is already gone"; |
531 | } | ||
532 | }).exec(); | ||
533 | |||
534 | if (auto resultSet = *resultSetPtr) { | ||
535 | resultSetPtr->clear(); | ||
536 | if (!resultSet->isDone()) { | ||
537 | QEventLoop eventLoop; | ||
538 | resultSet->onDone([&eventLoop](){ | ||
539 | eventLoop.quit(); | ||
540 | }); | ||
541 | eventLoop.exec(); | ||
542 | } | ||
543 | } else { | ||
544 | qWarning() << "result set is already gone"; | ||
545 | } | ||
510 | }); | 546 | }); |
511 | return resultSet->emitter(); | 547 | return resultSet->emitter(); |
512 | } | 548 | } |