summaryrefslogtreecommitdiffstats
path: root/common/clientapi.h
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2015-04-13 20:15:14 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2015-04-15 09:30:32 +0200
commitc55054e899660f2d667af2c2e573a1267d47358e (patch)
tree0f547effcad0c20521f0bc047a9eb1d4130b052b /common/clientapi.h
parent4652a39fc6869fc5af46367c35027b2b53478268 (diff)
downloadsink-c55054e899660f2d667af2c2e573a1267d47358e.tar.gz
sink-c55054e899660f2d667af2c2e573a1267d47358e.zip
Use a queryrunner to execute queries.
The queryrunner is responsible for running queries and keeping them up to date. This is required for self-updating queries. To get this to work properly the ResultProvider/emitter had to be fixed. The emitter now only lives as long as the client holds a reference to it, allowing the provider to detect when it is no longer necessary to keep the query alive (because noone is listening). In the process various lifetime issues have been fixed, that we're caused by lambdas capturing smartpointers, that then extended the lifetime of the associated objects unpredictably.
Diffstat (limited to 'common/clientapi.h')
-rw-r--r--common/clientapi.h181
1 files changed, 117 insertions, 64 deletions
diff --git a/common/clientapi.h b/common/clientapi.h
index 22448b3..c2b9493 100644
--- a/common/clientapi.h
+++ b/common/clientapi.h
@@ -49,17 +49,44 @@ namespace async {
49 */ 49 */
50 template<class T> 50 template<class T>
51 class ResultProvider { 51 class ResultProvider {
52 public: 52 private:
53 //Called from worker thread 53 void callInMainThreadOnEmitter(void (ResultEmitter<T>::*f)())
54 void add(const T &value)
55 { 54 {
56 //We use the eventloop to call the addHandler directly from the main eventloop. 55 //We use the eventloop to call the addHandler directly from the main eventloop.
57 //That way the result emitter implementation doesn't have to care about threadsafety at all. 56 //That way the result emitter implementation doesn't have to care about threadsafety at all.
58 //The alternative would be to make all handlers of the emitter threadsafe. 57 //The alternative would be to make all handlers of the emitter threadsafe.
59 auto emitter = mResultEmitter; 58 if (auto emitter = mResultEmitter.toStrongRef()) {
60 mResultEmitter->mThreadBoundary.callInMainThread([emitter, value]() { 59 auto weakEmitter = mResultEmitter;
61 if (emitter) { 60 //We don't want to keep the emitter alive here, so we only capture a weak reference
62 emitter->addHandler(value); 61 emitter->mThreadBoundary.callInMainThread([weakEmitter, f]() {
62 if (auto strongRef = weakEmitter.toStrongRef()) {
63 (strongRef.data()->*f)();
64 }
65 });
66 }
67 }
68
69 void callInMainThreadOnEmitter(const std::function<void()> &f)
70 {
71 //We use the eventloop to call the addHandler directly from the main eventloop.
72 //That way the result emitter implementation doesn't have to care about threadsafety at all.
73 //The alternative would be to make all handlers of the emitter threadsafe.
74 if (auto emitter = mResultEmitter.toStrongRef()) {
75 emitter->mThreadBoundary.callInMainThread([f]() {
76 f();
77 });
78 }
79 }
80
81 public:
82 //Called from worker thread
83 void add(const T &value)
84 {
85 //Because I don't know how to use bind
86 auto weakEmitter = mResultEmitter;
87 callInMainThreadOnEmitter([weakEmitter, value](){
88 if (auto strongRef = weakEmitter.toStrongRef()) {
89 strongRef->addHandler(value);
63 } 90 }
64 }); 91 });
65 } 92 }
@@ -67,25 +94,39 @@ namespace async {
67 //Called from worker thread 94 //Called from worker thread
68 void complete() 95 void complete()
69 { 96 {
70 auto emitter = mResultEmitter; 97 callInMainThreadOnEmitter(&ResultEmitter<T>::complete);
71 mResultEmitter->mThreadBoundary.callInMainThread([emitter]() {
72 if (emitter) {
73 emitter->completeHandler();
74 }
75 });
76 } 98 }
77 99
100 void clear()
101 {
102 callInMainThreadOnEmitter(&ResultEmitter<T>::clear);
103 }
104
105
78 QSharedPointer<ResultEmitter<T> > emitter() 106 QSharedPointer<ResultEmitter<T> > emitter()
79 { 107 {
80 if (!mResultEmitter) { 108 if (!mResultEmitter) {
81 mResultEmitter = QSharedPointer<ResultEmitter<T> >(new ResultEmitter<T>()); 109 //We have to go over a separate var and return that, otherwise we'd delete the emitter immediately again
110 auto sharedPtr = QSharedPointer<ResultEmitter<T> >::create();
111 mResultEmitter = sharedPtr;
112 return sharedPtr;
82 } 113 }
83 114
84 return mResultEmitter; 115 return mResultEmitter.toStrongRef();
116 }
117
118 /**
119 * For lifetimemanagement only.
120 * We keep the runner alive as long as the result provider exists.
121 */
122 void setQueryRunner(const QSharedPointer<QObject> &runner)
123 {
124 mQueryRunner = runner;
85 } 125 }
86 126
87 private: 127 private:
88 QSharedPointer<ResultEmitter<T> > mResultEmitter; 128 QWeakPointer<ResultEmitter<T> > mResultEmitter;
129 QSharedPointer<QObject> mQueryRunner;
89 }; 130 };
90 131
91 /* 132 /*
@@ -99,7 +140,6 @@ namespace async {
99 * * build async interfaces with signals 140 * * build async interfaces with signals
100 * * build sync interfaces that block when accessing the value 141 * * build sync interfaces that block when accessing the value
101 * 142 *
102 * TODO: This should probably be merged with daniels futurebase used in Async
103 */ 143 */
104 template<class DomainType> 144 template<class DomainType>
105 class ResultEmitter { 145 class ResultEmitter {
@@ -113,51 +153,36 @@ namespace async {
113 { 153 {
114 completeHandler = handler; 154 completeHandler = handler;
115 } 155 }
156 void onClear(const std::function<void(void)> &handler)
157 {
158 clearHandler = handler;
159 }
116 160
117 private: 161 void add(const DomainType &value)
118 friend class ResultProvider<DomainType>; 162 {
119 std::function<void(const DomainType&)> addHandler; 163 addHandler(value);
120 // std::function<void(const T&)> removeHandler; 164 }
121 std::function<void(void)> completeHandler;
122 ThreadBoundary mThreadBoundary;
123 };
124
125 165
126 /* 166 void complete()
127 * A result set specialization that provides a syncronous list
128 */
129 template<class T>
130 class SyncListResult : public QList<T> {
131 public:
132 SyncListResult(const QSharedPointer<ResultEmitter<T> > &emitter)
133 :QList<T>(),
134 mComplete(false),
135 mEmitter(emitter)
136 { 167 {
137 emitter->onAdded([this](const T &value) { 168 completeHandler();
138 this->append(value);
139 });
140 emitter->onComplete([this]() {
141 mComplete = true;
142 auto loop = mWaitLoop.toStrongRef();
143 if (loop) {
144 loop->quit();
145 }
146 });
147 } 169 }
148 170
149 void exec() 171 void clear()
150 { 172 {
151 auto loop = QSharedPointer<QEventLoop>::create(); 173 clearHandler();
152 mWaitLoop = loop;
153 loop->exec(QEventLoop::ExcludeUserInputEvents);
154 } 174 }
155 175
156 private: 176 private:
157 bool mComplete; 177 friend class ResultProvider<DomainType>;
158 QWeakPointer<QEventLoop> mWaitLoop; 178
159 QSharedPointer<ResultEmitter<T> > mEmitter; 179 std::function<void(const DomainType&)> addHandler;
180 // std::function<void(const T&)> removeHandler;
181 std::function<void(void)> completeHandler;
182 std::function<void(void)> clearHandler;
183 ThreadBoundary mThreadBoundary;
160 }; 184 };
185
161} 186}
162 187
163namespace Akonadi2 { 188namespace Akonadi2 {
@@ -307,7 +332,7 @@ using namespace async;
307class Query 332class Query
308{ 333{
309public: 334public:
310 Query() : syncOnDemand(true), processAll(false) {} 335 Query() : syncOnDemand(true), processAll(false), liveQuery(false) {}
311 //Could also be a propertyFilter 336 //Could also be a propertyFilter
312 QByteArrayList resources; 337 QByteArrayList resources;
313 //Could also be a propertyFilter 338 //Could also be a propertyFilter
@@ -318,6 +343,8 @@ public:
318 QSet<QByteArray> requestedProperties; 343 QSet<QByteArray> requestedProperties;
319 bool syncOnDemand; 344 bool syncOnDemand;
320 bool processAll; 345 bool processAll;
346 //If live query is false, this query will not continuously be updated
347 bool liveQuery;
321}; 348};
322 349
323 350
@@ -337,7 +364,9 @@ public:
337 virtual Async::Job<void> create(const DomainType &domainObject) = 0; 364 virtual Async::Job<void> create(const DomainType &domainObject) = 0;
338 virtual Async::Job<void> modify(const DomainType &domainObject) = 0; 365 virtual Async::Job<void> modify(const DomainType &domainObject) = 0;
339 virtual Async::Job<void> remove(const DomainType &domainObject) = 0; 366 virtual Async::Job<void> remove(const DomainType &domainObject) = 0;
340 virtual Async::Job<void> load(const Query &query, const std::function<void(const typename DomainType::Ptr &)> &resultCallback) = 0; 367 //TODO remove from public API
368 virtual Async::Job<qint64> load(const Query &query, const std::function<void(const typename DomainType::Ptr &)> &resultCallback) = 0;
369 virtual Async::Job<void> load(const Query &query, const QSharedPointer<ResultProvider<typename DomainType::Ptr> > &resultProvider) { return Async::null<void>(); };
341}; 370};
342 371
343 372
@@ -349,6 +378,8 @@ public:
349 378
350class FacadeFactory { 379class FacadeFactory {
351public: 380public:
381 typedef std::function<void*(bool &externallyManaged)> FactoryFunction;
382
352 //FIXME: proper singleton implementation 383 //FIXME: proper singleton implementation
353 static FacadeFactory &instance() 384 static FacadeFactory &instance()
354 { 385 {
@@ -365,7 +396,7 @@ public:
365 void registerFacade(const QByteArray &resource) 396 void registerFacade(const QByteArray &resource)
366 { 397 {
367 const QByteArray typeName = ApplicationDomain::getTypeName<DomainType>(); 398 const QByteArray typeName = ApplicationDomain::getTypeName<DomainType>();
368 mFacadeRegistry.insert(key(resource, typeName), [](){ return new Facade; }); 399 mFacadeRegistry.insert(key(resource, typeName), [](bool &externallyManaged){ return new Facade; });
369 } 400 }
370 401
371 /* 402 /*
@@ -375,29 +406,51 @@ public:
375 * The facade factory takes ovnership of the pointer and typically deletes the instance via shared pointer. 406 * The facade factory takes ovnership of the pointer and typically deletes the instance via shared pointer.
376 * Supplied factory functions should therefore always return a new pointer (i.e. via clone()) 407 * Supplied factory functions should therefore always return a new pointer (i.e. via clone())
377 * 408 *
378 * FIXME the factory function should really be returning QSharedPointer<void>, which doesn't work (std::shared_pointer<void> would though). That way i.e. a test could keep the object alive until it's done. 409 * FIXME the factory function should really be returning QSharedPointer<void>, which doesn't work (std::shared_pointer<void> would though). That way i.e. a test could keep the object alive until it's done. As a workaround the factory function can define wether it manages the lifetime of the facade itself.
379 */ 410 */
380 template<class DomainType, class Facade> 411 template<class DomainType, class Facade>
381 void registerFacade(const QByteArray &resource, const std::function<void*(void)> &customFactoryFunction) 412 void registerFacade(const QByteArray &resource, const FactoryFunction &customFactoryFunction)
382 { 413 {
383 const QByteArray typeName = ApplicationDomain::getTypeName<DomainType>(); 414 const QByteArray typeName = ApplicationDomain::getTypeName<DomainType>();
384 mFacadeRegistry.insert(key(resource, typeName), customFactoryFunction); 415 mFacadeRegistry.insert(key(resource, typeName), customFactoryFunction);
385 } 416 }
386 417
418 /*
419 * Can be used to clear the factory.
420 *
421 * Primarily for testing.
422 */
423 void resetFactory()
424 {
425 mFacadeRegistry.clear();
426 }
427
428 static void doNothingDeleter(void *)
429 {
430 qWarning() << "Do nothing";
431 }
432
387 template<class DomainType> 433 template<class DomainType>
388 QSharedPointer<StoreFacade<DomainType> > getFacade(const QByteArray &resource) 434 QSharedPointer<StoreFacade<DomainType> > getFacade(const QByteArray &resource)
389 { 435 {
390 const QByteArray typeName = ApplicationDomain::getTypeName<DomainType>(); 436 const QByteArray typeName = ApplicationDomain::getTypeName<DomainType>();
391 auto factoryFunction = mFacadeRegistry.value(key(resource, typeName)); 437 auto factoryFunction = mFacadeRegistry.value(key(resource, typeName));
392 if (factoryFunction) { 438 if (factoryFunction) {
393 return QSharedPointer<StoreFacade<DomainType> >(static_cast<StoreFacade<DomainType>* >(factoryFunction())); 439 bool externallyManaged = false;
440 auto ptr = static_cast<StoreFacade<DomainType>* >(factoryFunction(externallyManaged));
441 if (externallyManaged) {
442 //Allows tests to manage the lifetime of injected facades themselves
443 return QSharedPointer<StoreFacade<DomainType> >(ptr, doNothingDeleter);
444 } else {
445 return QSharedPointer<StoreFacade<DomainType> >(ptr);
446 }
394 } 447 }
395 qWarning() << "Failed to find facade for resource: " << resource << " and type: " << typeName; 448 qWarning() << "Failed to find facade for resource: " << resource << " and type: " << typeName;
396 return QSharedPointer<StoreFacade<DomainType> >(); 449 return QSharedPointer<StoreFacade<DomainType> >();
397 } 450 }
398 451
399private: 452private:
400 QHash<QByteArray, std::function<void*(void)> > mFacadeRegistry; 453 QHash<QByteArray, FactoryFunction> mFacadeRegistry;
401}; 454};
402 455
403/** 456/**
@@ -416,7 +469,7 @@ public:
416 template <class DomainType> 469 template <class DomainType>
417 static QSharedPointer<ResultEmitter<typename DomainType::Ptr> > load(Query query) 470 static QSharedPointer<ResultEmitter<typename DomainType::Ptr> > load(Query query)
418 { 471 {
419 QSharedPointer<ResultProvider<typename DomainType::Ptr> > resultSet(new ResultProvider<typename DomainType::Ptr>); 472 auto resultSet = QSharedPointer<ResultProvider<typename DomainType::Ptr> >::create();
420 473
421 //Execute the search in a thread. 474 //Execute the search in a thread.
422 //We must guarantee that the emitter is returned before the first result is emitted. 475 //We must guarantee that the emitter is returned before the first result is emitted.
@@ -428,15 +481,15 @@ public:
428 Async::Job<void> job = Async::null<void>(); 481 Async::Job<void> job = Async::null<void>();
429 for(const QByteArray &resource : query.resources) { 482 for(const QByteArray &resource : query.resources) {
430 auto facade = FacadeFactory::instance().getFacade<DomainType>(resource); 483 auto facade = FacadeFactory::instance().getFacade<DomainType>(resource);
431 //We have to bind an instance to the function callback. Since we use a shared pointer this keeps the result provider instance (and thus also the emitter) alive.
432 std::function<void(const typename DomainType::Ptr &)> addCallback = std::bind(&ResultProvider<typename DomainType::Ptr>::add, resultSet, std::placeholders::_1);
433 484
434 // TODO The following is a necessary hack to keep the facade alive. 485 // TODO The following is a necessary hack to keep the facade alive.
435 // Otherwise this would reduce to: 486 // Otherwise this would reduce to:
436 // job = job.then(facade->load(query, addCallback)); 487 // job = job.then(facade->load(query, addCallback));
437 // We somehow have to guarantee that the facade remains valid for the duration of the job 488 // We somehow have to guarantee that the facade remains valid for the duration of the job
438 job = job.then<void>([facade, query, addCallback](Async::Future<void> &future) { 489 // TODO: Use one result set per facade, and merge the results separately
439 Async::Job<void> j = facade->load(query, addCallback); 490 // resultSet->addSubset(facade->query(query));
491 job = job.then<void>([facade, query, resultSet](Async::Future<void> &future) {
492 Async::Job<void> j = facade->load(query, resultSet);
440 j.then<void>([&future, facade](Async::Future<void> &f) { 493 j.then<void>([&future, facade](Async::Future<void> &f) {
441 future.setFinished(); 494 future.setFinished();
442 f.setFinished(); 495 f.setFinished();