diff options
Diffstat (limited to 'common/clientapi.h')
-rw-r--r-- | common/clientapi.h | 181 |
1 files changed, 117 insertions, 64 deletions
diff --git a/common/clientapi.h b/common/clientapi.h index 22448b3..c2b9493 100644 --- a/common/clientapi.h +++ b/common/clientapi.h | |||
@@ -49,17 +49,44 @@ namespace async { | |||
49 | */ | 49 | */ |
50 | template<class T> | 50 | template<class T> |
51 | class ResultProvider { | 51 | class ResultProvider { |
52 | public: | 52 | private: |
53 | //Called from worker thread | 53 | void callInMainThreadOnEmitter(void (ResultEmitter<T>::*f)()) |
54 | void add(const T &value) | ||
55 | { | 54 | { |
56 | //We use the eventloop to call the addHandler directly from the main eventloop. | 55 | //We use the eventloop to call the addHandler directly from the main eventloop. |
57 | //That way the result emitter implementation doesn't have to care about threadsafety at all. | 56 | //That way the result emitter implementation doesn't have to care about threadsafety at all. |
58 | //The alternative would be to make all handlers of the emitter threadsafe. | 57 | //The alternative would be to make all handlers of the emitter threadsafe. |
59 | auto emitter = mResultEmitter; | 58 | if (auto emitter = mResultEmitter.toStrongRef()) { |
60 | mResultEmitter->mThreadBoundary.callInMainThread([emitter, value]() { | 59 | auto weakEmitter = mResultEmitter; |
61 | if (emitter) { | 60 | //We don't want to keep the emitter alive here, so we only capture a weak reference |
62 | emitter->addHandler(value); | 61 | emitter->mThreadBoundary.callInMainThread([weakEmitter, f]() { |
62 | if (auto strongRef = weakEmitter.toStrongRef()) { | ||
63 | (strongRef.data()->*f)(); | ||
64 | } | ||
65 | }); | ||
66 | } | ||
67 | } | ||
68 | |||
69 | void callInMainThreadOnEmitter(const std::function<void()> &f) | ||
70 | { | ||
71 | //We use the eventloop to call the addHandler directly from the main eventloop. | ||
72 | //That way the result emitter implementation doesn't have to care about threadsafety at all. | ||
73 | //The alternative would be to make all handlers of the emitter threadsafe. | ||
74 | if (auto emitter = mResultEmitter.toStrongRef()) { | ||
75 | emitter->mThreadBoundary.callInMainThread([f]() { | ||
76 | f(); | ||
77 | }); | ||
78 | } | ||
79 | } | ||
80 | |||
81 | public: | ||
82 | //Called from worker thread | ||
83 | void add(const T &value) | ||
84 | { | ||
85 | //Because I don't know how to use bind | ||
86 | auto weakEmitter = mResultEmitter; | ||
87 | callInMainThreadOnEmitter([weakEmitter, value](){ | ||
88 | if (auto strongRef = weakEmitter.toStrongRef()) { | ||
89 | strongRef->addHandler(value); | ||
63 | } | 90 | } |
64 | }); | 91 | }); |
65 | } | 92 | } |
@@ -67,25 +94,39 @@ namespace async { | |||
67 | //Called from worker thread | 94 | //Called from worker thread |
68 | void complete() | 95 | void complete() |
69 | { | 96 | { |
70 | auto emitter = mResultEmitter; | 97 | callInMainThreadOnEmitter(&ResultEmitter<T>::complete); |
71 | mResultEmitter->mThreadBoundary.callInMainThread([emitter]() { | ||
72 | if (emitter) { | ||
73 | emitter->completeHandler(); | ||
74 | } | ||
75 | }); | ||
76 | } | 98 | } |
77 | 99 | ||
100 | void clear() | ||
101 | { | ||
102 | callInMainThreadOnEmitter(&ResultEmitter<T>::clear); | ||
103 | } | ||
104 | |||
105 | |||
78 | QSharedPointer<ResultEmitter<T> > emitter() | 106 | QSharedPointer<ResultEmitter<T> > emitter() |
79 | { | 107 | { |
80 | if (!mResultEmitter) { | 108 | if (!mResultEmitter) { |
81 | mResultEmitter = QSharedPointer<ResultEmitter<T> >(new ResultEmitter<T>()); | 109 | //We have to go over a separate var and return that, otherwise we'd delete the emitter immediately again |
110 | auto sharedPtr = QSharedPointer<ResultEmitter<T> >::create(); | ||
111 | mResultEmitter = sharedPtr; | ||
112 | return sharedPtr; | ||
82 | } | 113 | } |
83 | 114 | ||
84 | return mResultEmitter; | 115 | return mResultEmitter.toStrongRef(); |
116 | } | ||
117 | |||
118 | /** | ||
119 | * For lifetimemanagement only. | ||
120 | * We keep the runner alive as long as the result provider exists. | ||
121 | */ | ||
122 | void setQueryRunner(const QSharedPointer<QObject> &runner) | ||
123 | { | ||
124 | mQueryRunner = runner; | ||
85 | } | 125 | } |
86 | 126 | ||
87 | private: | 127 | private: |
88 | QSharedPointer<ResultEmitter<T> > mResultEmitter; | 128 | QWeakPointer<ResultEmitter<T> > mResultEmitter; |
129 | QSharedPointer<QObject> mQueryRunner; | ||
89 | }; | 130 | }; |
90 | 131 | ||
91 | /* | 132 | /* |
@@ -99,7 +140,6 @@ namespace async { | |||
99 | * * build async interfaces with signals | 140 | * * build async interfaces with signals |
100 | * * build sync interfaces that block when accessing the value | 141 | * * build sync interfaces that block when accessing the value |
101 | * | 142 | * |
102 | * TODO: This should probably be merged with daniels futurebase used in Async | ||
103 | */ | 143 | */ |
104 | template<class DomainType> | 144 | template<class DomainType> |
105 | class ResultEmitter { | 145 | class ResultEmitter { |
@@ -113,51 +153,36 @@ namespace async { | |||
113 | { | 153 | { |
114 | completeHandler = handler; | 154 | completeHandler = handler; |
115 | } | 155 | } |
156 | void onClear(const std::function<void(void)> &handler) | ||
157 | { | ||
158 | clearHandler = handler; | ||
159 | } | ||
116 | 160 | ||
117 | private: | 161 | void add(const DomainType &value) |
118 | friend class ResultProvider<DomainType>; | 162 | { |
119 | std::function<void(const DomainType&)> addHandler; | 163 | addHandler(value); |
120 | // std::function<void(const T&)> removeHandler; | 164 | } |
121 | std::function<void(void)> completeHandler; | ||
122 | ThreadBoundary mThreadBoundary; | ||
123 | }; | ||
124 | |||
125 | 165 | ||
126 | /* | 166 | void complete() |
127 | * A result set specialization that provides a syncronous list | ||
128 | */ | ||
129 | template<class T> | ||
130 | class SyncListResult : public QList<T> { | ||
131 | public: | ||
132 | SyncListResult(const QSharedPointer<ResultEmitter<T> > &emitter) | ||
133 | :QList<T>(), | ||
134 | mComplete(false), | ||
135 | mEmitter(emitter) | ||
136 | { | 167 | { |
137 | emitter->onAdded([this](const T &value) { | 168 | completeHandler(); |
138 | this->append(value); | ||
139 | }); | ||
140 | emitter->onComplete([this]() { | ||
141 | mComplete = true; | ||
142 | auto loop = mWaitLoop.toStrongRef(); | ||
143 | if (loop) { | ||
144 | loop->quit(); | ||
145 | } | ||
146 | }); | ||
147 | } | 169 | } |
148 | 170 | ||
149 | void exec() | 171 | void clear() |
150 | { | 172 | { |
151 | auto loop = QSharedPointer<QEventLoop>::create(); | 173 | clearHandler(); |
152 | mWaitLoop = loop; | ||
153 | loop->exec(QEventLoop::ExcludeUserInputEvents); | ||
154 | } | 174 | } |
155 | 175 | ||
156 | private: | 176 | private: |
157 | bool mComplete; | 177 | friend class ResultProvider<DomainType>; |
158 | QWeakPointer<QEventLoop> mWaitLoop; | 178 | |
159 | QSharedPointer<ResultEmitter<T> > mEmitter; | 179 | std::function<void(const DomainType&)> addHandler; |
180 | // std::function<void(const T&)> removeHandler; | ||
181 | std::function<void(void)> completeHandler; | ||
182 | std::function<void(void)> clearHandler; | ||
183 | ThreadBoundary mThreadBoundary; | ||
160 | }; | 184 | }; |
185 | |||
161 | } | 186 | } |
162 | 187 | ||
163 | namespace Akonadi2 { | 188 | namespace Akonadi2 { |
@@ -307,7 +332,7 @@ using namespace async; | |||
307 | class Query | 332 | class Query |
308 | { | 333 | { |
309 | public: | 334 | public: |
310 | Query() : syncOnDemand(true), processAll(false) {} | 335 | Query() : syncOnDemand(true), processAll(false), liveQuery(false) {} |
311 | //Could also be a propertyFilter | 336 | //Could also be a propertyFilter |
312 | QByteArrayList resources; | 337 | QByteArrayList resources; |
313 | //Could also be a propertyFilter | 338 | //Could also be a propertyFilter |
@@ -318,6 +343,8 @@ public: | |||
318 | QSet<QByteArray> requestedProperties; | 343 | QSet<QByteArray> requestedProperties; |
319 | bool syncOnDemand; | 344 | bool syncOnDemand; |
320 | bool processAll; | 345 | bool processAll; |
346 | //If live query is false, this query will not continuously be updated | ||
347 | bool liveQuery; | ||
321 | }; | 348 | }; |
322 | 349 | ||
323 | 350 | ||
@@ -337,7 +364,9 @@ public: | |||
337 | virtual Async::Job<void> create(const DomainType &domainObject) = 0; | 364 | virtual Async::Job<void> create(const DomainType &domainObject) = 0; |
338 | virtual Async::Job<void> modify(const DomainType &domainObject) = 0; | 365 | virtual Async::Job<void> modify(const DomainType &domainObject) = 0; |
339 | virtual Async::Job<void> remove(const DomainType &domainObject) = 0; | 366 | virtual Async::Job<void> remove(const DomainType &domainObject) = 0; |
340 | virtual Async::Job<void> load(const Query &query, const std::function<void(const typename DomainType::Ptr &)> &resultCallback) = 0; | 367 | //TODO remove from public API |
368 | virtual Async::Job<qint64> load(const Query &query, const std::function<void(const typename DomainType::Ptr &)> &resultCallback) = 0; | ||
369 | virtual Async::Job<void> load(const Query &query, const QSharedPointer<ResultProvider<typename DomainType::Ptr> > &resultProvider) { return Async::null<void>(); }; | ||
341 | }; | 370 | }; |
342 | 371 | ||
343 | 372 | ||
@@ -349,6 +378,8 @@ public: | |||
349 | 378 | ||
350 | class FacadeFactory { | 379 | class FacadeFactory { |
351 | public: | 380 | public: |
381 | typedef std::function<void*(bool &externallyManaged)> FactoryFunction; | ||
382 | |||
352 | //FIXME: proper singleton implementation | 383 | //FIXME: proper singleton implementation |
353 | static FacadeFactory &instance() | 384 | static FacadeFactory &instance() |
354 | { | 385 | { |
@@ -365,7 +396,7 @@ public: | |||
365 | void registerFacade(const QByteArray &resource) | 396 | void registerFacade(const QByteArray &resource) |
366 | { | 397 | { |
367 | const QByteArray typeName = ApplicationDomain::getTypeName<DomainType>(); | 398 | const QByteArray typeName = ApplicationDomain::getTypeName<DomainType>(); |
368 | mFacadeRegistry.insert(key(resource, typeName), [](){ return new Facade; }); | 399 | mFacadeRegistry.insert(key(resource, typeName), [](bool &externallyManaged){ return new Facade; }); |
369 | } | 400 | } |
370 | 401 | ||
371 | /* | 402 | /* |
@@ -375,29 +406,51 @@ public: | |||
375 | * The facade factory takes ovnership of the pointer and typically deletes the instance via shared pointer. | 406 | * The facade factory takes ovnership of the pointer and typically deletes the instance via shared pointer. |
376 | * Supplied factory functions should therefore always return a new pointer (i.e. via clone()) | 407 | * Supplied factory functions should therefore always return a new pointer (i.e. via clone()) |
377 | * | 408 | * |
378 | * FIXME the factory function should really be returning QSharedPointer<void>, which doesn't work (std::shared_pointer<void> would though). That way i.e. a test could keep the object alive until it's done. | 409 | * FIXME the factory function should really be returning QSharedPointer<void>, which doesn't work (std::shared_pointer<void> would though). That way i.e. a test could keep the object alive until it's done. As a workaround the factory function can define wether it manages the lifetime of the facade itself. |
379 | */ | 410 | */ |
380 | template<class DomainType, class Facade> | 411 | template<class DomainType, class Facade> |
381 | void registerFacade(const QByteArray &resource, const std::function<void*(void)> &customFactoryFunction) | 412 | void registerFacade(const QByteArray &resource, const FactoryFunction &customFactoryFunction) |
382 | { | 413 | { |
383 | const QByteArray typeName = ApplicationDomain::getTypeName<DomainType>(); | 414 | const QByteArray typeName = ApplicationDomain::getTypeName<DomainType>(); |
384 | mFacadeRegistry.insert(key(resource, typeName), customFactoryFunction); | 415 | mFacadeRegistry.insert(key(resource, typeName), customFactoryFunction); |
385 | } | 416 | } |
386 | 417 | ||
418 | /* | ||
419 | * Can be used to clear the factory. | ||
420 | * | ||
421 | * Primarily for testing. | ||
422 | */ | ||
423 | void resetFactory() | ||
424 | { | ||
425 | mFacadeRegistry.clear(); | ||
426 | } | ||
427 | |||
428 | static void doNothingDeleter(void *) | ||
429 | { | ||
430 | qWarning() << "Do nothing"; | ||
431 | } | ||
432 | |||
387 | template<class DomainType> | 433 | template<class DomainType> |
388 | QSharedPointer<StoreFacade<DomainType> > getFacade(const QByteArray &resource) | 434 | QSharedPointer<StoreFacade<DomainType> > getFacade(const QByteArray &resource) |
389 | { | 435 | { |
390 | const QByteArray typeName = ApplicationDomain::getTypeName<DomainType>(); | 436 | const QByteArray typeName = ApplicationDomain::getTypeName<DomainType>(); |
391 | auto factoryFunction = mFacadeRegistry.value(key(resource, typeName)); | 437 | auto factoryFunction = mFacadeRegistry.value(key(resource, typeName)); |
392 | if (factoryFunction) { | 438 | if (factoryFunction) { |
393 | return QSharedPointer<StoreFacade<DomainType> >(static_cast<StoreFacade<DomainType>* >(factoryFunction())); | 439 | bool externallyManaged = false; |
440 | auto ptr = static_cast<StoreFacade<DomainType>* >(factoryFunction(externallyManaged)); | ||
441 | if (externallyManaged) { | ||
442 | //Allows tests to manage the lifetime of injected facades themselves | ||
443 | return QSharedPointer<StoreFacade<DomainType> >(ptr, doNothingDeleter); | ||
444 | } else { | ||
445 | return QSharedPointer<StoreFacade<DomainType> >(ptr); | ||
446 | } | ||
394 | } | 447 | } |
395 | qWarning() << "Failed to find facade for resource: " << resource << " and type: " << typeName; | 448 | qWarning() << "Failed to find facade for resource: " << resource << " and type: " << typeName; |
396 | return QSharedPointer<StoreFacade<DomainType> >(); | 449 | return QSharedPointer<StoreFacade<DomainType> >(); |
397 | } | 450 | } |
398 | 451 | ||
399 | private: | 452 | private: |
400 | QHash<QByteArray, std::function<void*(void)> > mFacadeRegistry; | 453 | QHash<QByteArray, FactoryFunction> mFacadeRegistry; |
401 | }; | 454 | }; |
402 | 455 | ||
403 | /** | 456 | /** |
@@ -416,7 +469,7 @@ public: | |||
416 | template <class DomainType> | 469 | template <class DomainType> |
417 | static QSharedPointer<ResultEmitter<typename DomainType::Ptr> > load(Query query) | 470 | static QSharedPointer<ResultEmitter<typename DomainType::Ptr> > load(Query query) |
418 | { | 471 | { |
419 | QSharedPointer<ResultProvider<typename DomainType::Ptr> > resultSet(new ResultProvider<typename DomainType::Ptr>); | 472 | auto resultSet = QSharedPointer<ResultProvider<typename DomainType::Ptr> >::create(); |
420 | 473 | ||
421 | //Execute the search in a thread. | 474 | //Execute the search in a thread. |
422 | //We must guarantee that the emitter is returned before the first result is emitted. | 475 | //We must guarantee that the emitter is returned before the first result is emitted. |
@@ -428,15 +481,15 @@ public: | |||
428 | Async::Job<void> job = Async::null<void>(); | 481 | Async::Job<void> job = Async::null<void>(); |
429 | for(const QByteArray &resource : query.resources) { | 482 | for(const QByteArray &resource : query.resources) { |
430 | auto facade = FacadeFactory::instance().getFacade<DomainType>(resource); | 483 | auto facade = FacadeFactory::instance().getFacade<DomainType>(resource); |
431 | //We have to bind an instance to the function callback. Since we use a shared pointer this keeps the result provider instance (and thus also the emitter) alive. | ||
432 | std::function<void(const typename DomainType::Ptr &)> addCallback = std::bind(&ResultProvider<typename DomainType::Ptr>::add, resultSet, std::placeholders::_1); | ||
433 | 484 | ||
434 | // TODO The following is a necessary hack to keep the facade alive. | 485 | // TODO The following is a necessary hack to keep the facade alive. |
435 | // Otherwise this would reduce to: | 486 | // Otherwise this would reduce to: |
436 | // job = job.then(facade->load(query, addCallback)); | 487 | // job = job.then(facade->load(query, addCallback)); |
437 | // We somehow have to guarantee that the facade remains valid for the duration of the job | 488 | // We somehow have to guarantee that the facade remains valid for the duration of the job |
438 | job = job.then<void>([facade, query, addCallback](Async::Future<void> &future) { | 489 | // TODO: Use one result set per facade, and merge the results separately |
439 | Async::Job<void> j = facade->load(query, addCallback); | 490 | // resultSet->addSubset(facade->query(query)); |
491 | job = job.then<void>([facade, query, resultSet](Async::Future<void> &future) { | ||
492 | Async::Job<void> j = facade->load(query, resultSet); | ||
440 | j.then<void>([&future, facade](Async::Future<void> &f) { | 493 | j.then<void>([&future, facade](Async::Future<void> &f) { |
441 | future.setFinished(); | 494 | future.setFinished(); |
442 | f.setFinished(); | 495 | f.setFinished(); |