diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-04-13 20:15:14 +0200 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-04-15 09:30:32 +0200 |
commit | c55054e899660f2d667af2c2e573a1267d47358e (patch) | |
tree | 0f547effcad0c20521f0bc047a9eb1d4130b052b /common/clientapi.h | |
parent | 4652a39fc6869fc5af46367c35027b2b53478268 (diff) | |
download | sink-c55054e899660f2d667af2c2e573a1267d47358e.tar.gz sink-c55054e899660f2d667af2c2e573a1267d47358e.zip |
Use a queryrunner to execute queries.
The queryrunner is responsible for running queries and keeping them
up to date. This is required for self-updating queries.
To get this to work properly the ResultProvider/emitter had to be fixed.
The emitter now only lives as long as the client holds a reference to
it, allowing the provider to detect when it is no longer necessary to
keep the query alive (because noone is listening).
In the process various lifetime issues have been fixed, that we're
caused by lambdas capturing smartpointers, that then extended the
lifetime of the associated objects unpredictably.
Diffstat (limited to 'common/clientapi.h')
-rw-r--r-- | common/clientapi.h | 181 |
1 files changed, 117 insertions, 64 deletions
diff --git a/common/clientapi.h b/common/clientapi.h index 22448b3..c2b9493 100644 --- a/common/clientapi.h +++ b/common/clientapi.h | |||
@@ -49,17 +49,44 @@ namespace async { | |||
49 | */ | 49 | */ |
50 | template<class T> | 50 | template<class T> |
51 | class ResultProvider { | 51 | class ResultProvider { |
52 | public: | 52 | private: |
53 | //Called from worker thread | 53 | void callInMainThreadOnEmitter(void (ResultEmitter<T>::*f)()) |
54 | void add(const T &value) | ||
55 | { | 54 | { |
56 | //We use the eventloop to call the addHandler directly from the main eventloop. | 55 | //We use the eventloop to call the addHandler directly from the main eventloop. |
57 | //That way the result emitter implementation doesn't have to care about threadsafety at all. | 56 | //That way the result emitter implementation doesn't have to care about threadsafety at all. |
58 | //The alternative would be to make all handlers of the emitter threadsafe. | 57 | //The alternative would be to make all handlers of the emitter threadsafe. |
59 | auto emitter = mResultEmitter; | 58 | if (auto emitter = mResultEmitter.toStrongRef()) { |
60 | mResultEmitter->mThreadBoundary.callInMainThread([emitter, value]() { | 59 | auto weakEmitter = mResultEmitter; |
61 | if (emitter) { | 60 | //We don't want to keep the emitter alive here, so we only capture a weak reference |
62 | emitter->addHandler(value); | 61 | emitter->mThreadBoundary.callInMainThread([weakEmitter, f]() { |
62 | if (auto strongRef = weakEmitter.toStrongRef()) { | ||
63 | (strongRef.data()->*f)(); | ||
64 | } | ||
65 | }); | ||
66 | } | ||
67 | } | ||
68 | |||
69 | void callInMainThreadOnEmitter(const std::function<void()> &f) | ||
70 | { | ||
71 | //We use the eventloop to call the addHandler directly from the main eventloop. | ||
72 | //That way the result emitter implementation doesn't have to care about threadsafety at all. | ||
73 | //The alternative would be to make all handlers of the emitter threadsafe. | ||
74 | if (auto emitter = mResultEmitter.toStrongRef()) { | ||
75 | emitter->mThreadBoundary.callInMainThread([f]() { | ||
76 | f(); | ||
77 | }); | ||
78 | } | ||
79 | } | ||
80 | |||
81 | public: | ||
82 | //Called from worker thread | ||
83 | void add(const T &value) | ||
84 | { | ||
85 | //Because I don't know how to use bind | ||
86 | auto weakEmitter = mResultEmitter; | ||
87 | callInMainThreadOnEmitter([weakEmitter, value](){ | ||
88 | if (auto strongRef = weakEmitter.toStrongRef()) { | ||
89 | strongRef->addHandler(value); | ||
63 | } | 90 | } |
64 | }); | 91 | }); |
65 | } | 92 | } |
@@ -67,25 +94,39 @@ namespace async { | |||
67 | //Called from worker thread | 94 | //Called from worker thread |
68 | void complete() | 95 | void complete() |
69 | { | 96 | { |
70 | auto emitter = mResultEmitter; | 97 | callInMainThreadOnEmitter(&ResultEmitter<T>::complete); |
71 | mResultEmitter->mThreadBoundary.callInMainThread([emitter]() { | ||
72 | if (emitter) { | ||
73 | emitter->completeHandler(); | ||
74 | } | ||
75 | }); | ||
76 | } | 98 | } |
77 | 99 | ||
100 | void clear() | ||
101 | { | ||
102 | callInMainThreadOnEmitter(&ResultEmitter<T>::clear); | ||
103 | } | ||
104 | |||
105 | |||
78 | QSharedPointer<ResultEmitter<T> > emitter() | 106 | QSharedPointer<ResultEmitter<T> > emitter() |
79 | { | 107 | { |
80 | if (!mResultEmitter) { | 108 | if (!mResultEmitter) { |
81 | mResultEmitter = QSharedPointer<ResultEmitter<T> >(new ResultEmitter<T>()); | 109 | //We have to go over a separate var and return that, otherwise we'd delete the emitter immediately again |
110 | auto sharedPtr = QSharedPointer<ResultEmitter<T> >::create(); | ||
111 | mResultEmitter = sharedPtr; | ||
112 | return sharedPtr; | ||
82 | } | 113 | } |
83 | 114 | ||
84 | return mResultEmitter; | 115 | return mResultEmitter.toStrongRef(); |
116 | } | ||
117 | |||
118 | /** | ||
119 | * For lifetimemanagement only. | ||
120 | * We keep the runner alive as long as the result provider exists. | ||
121 | */ | ||
122 | void setQueryRunner(const QSharedPointer<QObject> &runner) | ||
123 | { | ||
124 | mQueryRunner = runner; | ||
85 | } | 125 | } |
86 | 126 | ||
87 | private: | 127 | private: |
88 | QSharedPointer<ResultEmitter<T> > mResultEmitter; | 128 | QWeakPointer<ResultEmitter<T> > mResultEmitter; |
129 | QSharedPointer<QObject> mQueryRunner; | ||
89 | }; | 130 | }; |
90 | 131 | ||
91 | /* | 132 | /* |
@@ -99,7 +140,6 @@ namespace async { | |||
99 | * * build async interfaces with signals | 140 | * * build async interfaces with signals |
100 | * * build sync interfaces that block when accessing the value | 141 | * * build sync interfaces that block when accessing the value |
101 | * | 142 | * |
102 | * TODO: This should probably be merged with daniels futurebase used in Async | ||
103 | */ | 143 | */ |
104 | template<class DomainType> | 144 | template<class DomainType> |
105 | class ResultEmitter { | 145 | class ResultEmitter { |
@@ -113,51 +153,36 @@ namespace async { | |||
113 | { | 153 | { |
114 | completeHandler = handler; | 154 | completeHandler = handler; |
115 | } | 155 | } |
156 | void onClear(const std::function<void(void)> &handler) | ||
157 | { | ||
158 | clearHandler = handler; | ||
159 | } | ||
116 | 160 | ||
117 | private: | 161 | void add(const DomainType &value) |
118 | friend class ResultProvider<DomainType>; | 162 | { |
119 | std::function<void(const DomainType&)> addHandler; | 163 | addHandler(value); |
120 | // std::function<void(const T&)> removeHandler; | 164 | } |
121 | std::function<void(void)> completeHandler; | ||
122 | ThreadBoundary mThreadBoundary; | ||
123 | }; | ||
124 | |||
125 | 165 | ||
126 | /* | 166 | void complete() |
127 | * A result set specialization that provides a syncronous list | ||
128 | */ | ||
129 | template<class T> | ||
130 | class SyncListResult : public QList<T> { | ||
131 | public: | ||
132 | SyncListResult(const QSharedPointer<ResultEmitter<T> > &emitter) | ||
133 | :QList<T>(), | ||
134 | mComplete(false), | ||
135 | mEmitter(emitter) | ||
136 | { | 167 | { |
137 | emitter->onAdded([this](const T &value) { | 168 | completeHandler(); |
138 | this->append(value); | ||
139 | }); | ||
140 | emitter->onComplete([this]() { | ||
141 | mComplete = true; | ||
142 | auto loop = mWaitLoop.toStrongRef(); | ||
143 | if (loop) { | ||
144 | loop->quit(); | ||
145 | } | ||
146 | }); | ||
147 | } | 169 | } |
148 | 170 | ||
149 | void exec() | 171 | void clear() |
150 | { | 172 | { |
151 | auto loop = QSharedPointer<QEventLoop>::create(); | 173 | clearHandler(); |
152 | mWaitLoop = loop; | ||
153 | loop->exec(QEventLoop::ExcludeUserInputEvents); | ||
154 | } | 174 | } |
155 | 175 | ||
156 | private: | 176 | private: |
157 | bool mComplete; | 177 | friend class ResultProvider<DomainType>; |
158 | QWeakPointer<QEventLoop> mWaitLoop; | 178 | |
159 | QSharedPointer<ResultEmitter<T> > mEmitter; | 179 | std::function<void(const DomainType&)> addHandler; |
180 | // std::function<void(const T&)> removeHandler; | ||
181 | std::function<void(void)> completeHandler; | ||
182 | std::function<void(void)> clearHandler; | ||
183 | ThreadBoundary mThreadBoundary; | ||
160 | }; | 184 | }; |
185 | |||
161 | } | 186 | } |
162 | 187 | ||
163 | namespace Akonadi2 { | 188 | namespace Akonadi2 { |
@@ -307,7 +332,7 @@ using namespace async; | |||
307 | class Query | 332 | class Query |
308 | { | 333 | { |
309 | public: | 334 | public: |
310 | Query() : syncOnDemand(true), processAll(false) {} | 335 | Query() : syncOnDemand(true), processAll(false), liveQuery(false) {} |
311 | //Could also be a propertyFilter | 336 | //Could also be a propertyFilter |
312 | QByteArrayList resources; | 337 | QByteArrayList resources; |
313 | //Could also be a propertyFilter | 338 | //Could also be a propertyFilter |
@@ -318,6 +343,8 @@ public: | |||
318 | QSet<QByteArray> requestedProperties; | 343 | QSet<QByteArray> requestedProperties; |
319 | bool syncOnDemand; | 344 | bool syncOnDemand; |
320 | bool processAll; | 345 | bool processAll; |
346 | //If live query is false, this query will not continuously be updated | ||
347 | bool liveQuery; | ||
321 | }; | 348 | }; |
322 | 349 | ||
323 | 350 | ||
@@ -337,7 +364,9 @@ public: | |||
337 | virtual Async::Job<void> create(const DomainType &domainObject) = 0; | 364 | virtual Async::Job<void> create(const DomainType &domainObject) = 0; |
338 | virtual Async::Job<void> modify(const DomainType &domainObject) = 0; | 365 | virtual Async::Job<void> modify(const DomainType &domainObject) = 0; |
339 | virtual Async::Job<void> remove(const DomainType &domainObject) = 0; | 366 | virtual Async::Job<void> remove(const DomainType &domainObject) = 0; |
340 | virtual Async::Job<void> load(const Query &query, const std::function<void(const typename DomainType::Ptr &)> &resultCallback) = 0; | 367 | //TODO remove from public API |
368 | virtual Async::Job<qint64> load(const Query &query, const std::function<void(const typename DomainType::Ptr &)> &resultCallback) = 0; | ||
369 | virtual Async::Job<void> load(const Query &query, const QSharedPointer<ResultProvider<typename DomainType::Ptr> > &resultProvider) { return Async::null<void>(); }; | ||
341 | }; | 370 | }; |
342 | 371 | ||
343 | 372 | ||
@@ -349,6 +378,8 @@ public: | |||
349 | 378 | ||
350 | class FacadeFactory { | 379 | class FacadeFactory { |
351 | public: | 380 | public: |
381 | typedef std::function<void*(bool &externallyManaged)> FactoryFunction; | ||
382 | |||
352 | //FIXME: proper singleton implementation | 383 | //FIXME: proper singleton implementation |
353 | static FacadeFactory &instance() | 384 | static FacadeFactory &instance() |
354 | { | 385 | { |
@@ -365,7 +396,7 @@ public: | |||
365 | void registerFacade(const QByteArray &resource) | 396 | void registerFacade(const QByteArray &resource) |
366 | { | 397 | { |
367 | const QByteArray typeName = ApplicationDomain::getTypeName<DomainType>(); | 398 | const QByteArray typeName = ApplicationDomain::getTypeName<DomainType>(); |
368 | mFacadeRegistry.insert(key(resource, typeName), [](){ return new Facade; }); | 399 | mFacadeRegistry.insert(key(resource, typeName), [](bool &externallyManaged){ return new Facade; }); |
369 | } | 400 | } |
370 | 401 | ||
371 | /* | 402 | /* |
@@ -375,29 +406,51 @@ public: | |||
375 | * The facade factory takes ovnership of the pointer and typically deletes the instance via shared pointer. | 406 | * The facade factory takes ovnership of the pointer and typically deletes the instance via shared pointer. |
376 | * Supplied factory functions should therefore always return a new pointer (i.e. via clone()) | 407 | * Supplied factory functions should therefore always return a new pointer (i.e. via clone()) |
377 | * | 408 | * |
378 | * FIXME the factory function should really be returning QSharedPointer<void>, which doesn't work (std::shared_pointer<void> would though). That way i.e. a test could keep the object alive until it's done. | 409 | * FIXME the factory function should really be returning QSharedPointer<void>, which doesn't work (std::shared_pointer<void> would though). That way i.e. a test could keep the object alive until it's done. As a workaround the factory function can define wether it manages the lifetime of the facade itself. |
379 | */ | 410 | */ |
380 | template<class DomainType, class Facade> | 411 | template<class DomainType, class Facade> |
381 | void registerFacade(const QByteArray &resource, const std::function<void*(void)> &customFactoryFunction) | 412 | void registerFacade(const QByteArray &resource, const FactoryFunction &customFactoryFunction) |
382 | { | 413 | { |
383 | const QByteArray typeName = ApplicationDomain::getTypeName<DomainType>(); | 414 | const QByteArray typeName = ApplicationDomain::getTypeName<DomainType>(); |
384 | mFacadeRegistry.insert(key(resource, typeName), customFactoryFunction); | 415 | mFacadeRegistry.insert(key(resource, typeName), customFactoryFunction); |
385 | } | 416 | } |
386 | 417 | ||
418 | /* | ||
419 | * Can be used to clear the factory. | ||
420 | * | ||
421 | * Primarily for testing. | ||
422 | */ | ||
423 | void resetFactory() | ||
424 | { | ||
425 | mFacadeRegistry.clear(); | ||
426 | } | ||
427 | |||
428 | static void doNothingDeleter(void *) | ||
429 | { | ||
430 | qWarning() << "Do nothing"; | ||
431 | } | ||
432 | |||
387 | template<class DomainType> | 433 | template<class DomainType> |
388 | QSharedPointer<StoreFacade<DomainType> > getFacade(const QByteArray &resource) | 434 | QSharedPointer<StoreFacade<DomainType> > getFacade(const QByteArray &resource) |
389 | { | 435 | { |
390 | const QByteArray typeName = ApplicationDomain::getTypeName<DomainType>(); | 436 | const QByteArray typeName = ApplicationDomain::getTypeName<DomainType>(); |
391 | auto factoryFunction = mFacadeRegistry.value(key(resource, typeName)); | 437 | auto factoryFunction = mFacadeRegistry.value(key(resource, typeName)); |
392 | if (factoryFunction) { | 438 | if (factoryFunction) { |
393 | return QSharedPointer<StoreFacade<DomainType> >(static_cast<StoreFacade<DomainType>* >(factoryFunction())); | 439 | bool externallyManaged = false; |
440 | auto ptr = static_cast<StoreFacade<DomainType>* >(factoryFunction(externallyManaged)); | ||
441 | if (externallyManaged) { | ||
442 | //Allows tests to manage the lifetime of injected facades themselves | ||
443 | return QSharedPointer<StoreFacade<DomainType> >(ptr, doNothingDeleter); | ||
444 | } else { | ||
445 | return QSharedPointer<StoreFacade<DomainType> >(ptr); | ||
446 | } | ||
394 | } | 447 | } |
395 | qWarning() << "Failed to find facade for resource: " << resource << " and type: " << typeName; | 448 | qWarning() << "Failed to find facade for resource: " << resource << " and type: " << typeName; |
396 | return QSharedPointer<StoreFacade<DomainType> >(); | 449 | return QSharedPointer<StoreFacade<DomainType> >(); |
397 | } | 450 | } |
398 | 451 | ||
399 | private: | 452 | private: |
400 | QHash<QByteArray, std::function<void*(void)> > mFacadeRegistry; | 453 | QHash<QByteArray, FactoryFunction> mFacadeRegistry; |
401 | }; | 454 | }; |
402 | 455 | ||
403 | /** | 456 | /** |
@@ -416,7 +469,7 @@ public: | |||
416 | template <class DomainType> | 469 | template <class DomainType> |
417 | static QSharedPointer<ResultEmitter<typename DomainType::Ptr> > load(Query query) | 470 | static QSharedPointer<ResultEmitter<typename DomainType::Ptr> > load(Query query) |
418 | { | 471 | { |
419 | QSharedPointer<ResultProvider<typename DomainType::Ptr> > resultSet(new ResultProvider<typename DomainType::Ptr>); | 472 | auto resultSet = QSharedPointer<ResultProvider<typename DomainType::Ptr> >::create(); |
420 | 473 | ||
421 | //Execute the search in a thread. | 474 | //Execute the search in a thread. |
422 | //We must guarantee that the emitter is returned before the first result is emitted. | 475 | //We must guarantee that the emitter is returned before the first result is emitted. |
@@ -428,15 +481,15 @@ public: | |||
428 | Async::Job<void> job = Async::null<void>(); | 481 | Async::Job<void> job = Async::null<void>(); |
429 | for(const QByteArray &resource : query.resources) { | 482 | for(const QByteArray &resource : query.resources) { |
430 | auto facade = FacadeFactory::instance().getFacade<DomainType>(resource); | 483 | auto facade = FacadeFactory::instance().getFacade<DomainType>(resource); |
431 | //We have to bind an instance to the function callback. Since we use a shared pointer this keeps the result provider instance (and thus also the emitter) alive. | ||
432 | std::function<void(const typename DomainType::Ptr &)> addCallback = std::bind(&ResultProvider<typename DomainType::Ptr>::add, resultSet, std::placeholders::_1); | ||
433 | 484 | ||
434 | // TODO The following is a necessary hack to keep the facade alive. | 485 | // TODO The following is a necessary hack to keep the facade alive. |
435 | // Otherwise this would reduce to: | 486 | // Otherwise this would reduce to: |
436 | // job = job.then(facade->load(query, addCallback)); | 487 | // job = job.then(facade->load(query, addCallback)); |
437 | // We somehow have to guarantee that the facade remains valid for the duration of the job | 488 | // We somehow have to guarantee that the facade remains valid for the duration of the job |
438 | job = job.then<void>([facade, query, addCallback](Async::Future<void> &future) { | 489 | // TODO: Use one result set per facade, and merge the results separately |
439 | Async::Job<void> j = facade->load(query, addCallback); | 490 | // resultSet->addSubset(facade->query(query)); |
491 | job = job.then<void>([facade, query, resultSet](Async::Future<void> &future) { | ||
492 | Async::Job<void> j = facade->load(query, resultSet); | ||
440 | j.then<void>([&future, facade](Async::Future<void> &f) { | 493 | j.then<void>([&future, facade](Async::Future<void> &f) { |
441 | future.setFinished(); | 494 | future.setFinished(); |
442 | f.setFinished(); | 495 | f.setFinished(); |