diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-04-13 20:15:14 +0200 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-04-15 09:30:32 +0200 |
commit | c55054e899660f2d667af2c2e573a1267d47358e (patch) | |
tree | 0f547effcad0c20521f0bc047a9eb1d4130b052b | |
parent | 4652a39fc6869fc5af46367c35027b2b53478268 (diff) | |
download | sink-c55054e899660f2d667af2c2e573a1267d47358e.tar.gz sink-c55054e899660f2d667af2c2e573a1267d47358e.zip |
Use a queryrunner to execute queries.
The queryrunner is responsible for running queries and keeping them
up to date. This is required for self-updating queries.
To get this to work properly the ResultProvider/emitter had to be fixed.
The emitter now only lives as long as the client holds a reference to
it, allowing the provider to detect when it is no longer necessary to
keep the query alive (because noone is listening).
In the process various lifetime issues have been fixed, that we're
caused by lambdas capturing smartpointers, that then extended the
lifetime of the associated objects unpredictably.
-rw-r--r-- | common/CMakeLists.txt | 1 | ||||
-rw-r--r-- | common/clientapi.h | 181 | ||||
-rw-r--r-- | common/facade.cpp | 20 | ||||
-rw-r--r-- | common/facade.h | 51 | ||||
-rw-r--r-- | common/synclistresult.h | 54 | ||||
-rw-r--r-- | common/test/clientapitest.cpp | 106 | ||||
-rw-r--r-- | dummyresource/facade.cpp | 44 | ||||
-rw-r--r-- | dummyresource/facade.h | 9 | ||||
-rw-r--r-- | tests/dummyresourcebenchmark.cpp | 1 | ||||
-rw-r--r-- | tests/dummyresourcetest.cpp | 2 |
10 files changed, 393 insertions, 76 deletions
diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index b06718f..a97c7f9 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt | |||
@@ -29,6 +29,7 @@ set(command_SRCS | |||
29 | clientapi.cpp | 29 | clientapi.cpp |
30 | commands.cpp | 30 | commands.cpp |
31 | console.cpp | 31 | console.cpp |
32 | facade.cpp | ||
32 | pipeline.cpp | 33 | pipeline.cpp |
33 | domainadaptor.cpp | 34 | domainadaptor.cpp |
34 | resource.cpp | 35 | resource.cpp |
diff --git a/common/clientapi.h b/common/clientapi.h index 22448b3..c2b9493 100644 --- a/common/clientapi.h +++ b/common/clientapi.h | |||
@@ -49,17 +49,44 @@ namespace async { | |||
49 | */ | 49 | */ |
50 | template<class T> | 50 | template<class T> |
51 | class ResultProvider { | 51 | class ResultProvider { |
52 | public: | 52 | private: |
53 | //Called from worker thread | 53 | void callInMainThreadOnEmitter(void (ResultEmitter<T>::*f)()) |
54 | void add(const T &value) | ||
55 | { | 54 | { |
56 | //We use the eventloop to call the addHandler directly from the main eventloop. | 55 | //We use the eventloop to call the addHandler directly from the main eventloop. |
57 | //That way the result emitter implementation doesn't have to care about threadsafety at all. | 56 | //That way the result emitter implementation doesn't have to care about threadsafety at all. |
58 | //The alternative would be to make all handlers of the emitter threadsafe. | 57 | //The alternative would be to make all handlers of the emitter threadsafe. |
59 | auto emitter = mResultEmitter; | 58 | if (auto emitter = mResultEmitter.toStrongRef()) { |
60 | mResultEmitter->mThreadBoundary.callInMainThread([emitter, value]() { | 59 | auto weakEmitter = mResultEmitter; |
61 | if (emitter) { | 60 | //We don't want to keep the emitter alive here, so we only capture a weak reference |
62 | emitter->addHandler(value); | 61 | emitter->mThreadBoundary.callInMainThread([weakEmitter, f]() { |
62 | if (auto strongRef = weakEmitter.toStrongRef()) { | ||
63 | (strongRef.data()->*f)(); | ||
64 | } | ||
65 | }); | ||
66 | } | ||
67 | } | ||
68 | |||
69 | void callInMainThreadOnEmitter(const std::function<void()> &f) | ||
70 | { | ||
71 | //We use the eventloop to call the addHandler directly from the main eventloop. | ||
72 | //That way the result emitter implementation doesn't have to care about threadsafety at all. | ||
73 | //The alternative would be to make all handlers of the emitter threadsafe. | ||
74 | if (auto emitter = mResultEmitter.toStrongRef()) { | ||
75 | emitter->mThreadBoundary.callInMainThread([f]() { | ||
76 | f(); | ||
77 | }); | ||
78 | } | ||
79 | } | ||
80 | |||
81 | public: | ||
82 | //Called from worker thread | ||
83 | void add(const T &value) | ||
84 | { | ||
85 | //Because I don't know how to use bind | ||
86 | auto weakEmitter = mResultEmitter; | ||
87 | callInMainThreadOnEmitter([weakEmitter, value](){ | ||
88 | if (auto strongRef = weakEmitter.toStrongRef()) { | ||
89 | strongRef->addHandler(value); | ||
63 | } | 90 | } |
64 | }); | 91 | }); |
65 | } | 92 | } |
@@ -67,25 +94,39 @@ namespace async { | |||
67 | //Called from worker thread | 94 | //Called from worker thread |
68 | void complete() | 95 | void complete() |
69 | { | 96 | { |
70 | auto emitter = mResultEmitter; | 97 | callInMainThreadOnEmitter(&ResultEmitter<T>::complete); |
71 | mResultEmitter->mThreadBoundary.callInMainThread([emitter]() { | ||
72 | if (emitter) { | ||
73 | emitter->completeHandler(); | ||
74 | } | ||
75 | }); | ||
76 | } | 98 | } |
77 | 99 | ||
100 | void clear() | ||
101 | { | ||
102 | callInMainThreadOnEmitter(&ResultEmitter<T>::clear); | ||
103 | } | ||
104 | |||
105 | |||
78 | QSharedPointer<ResultEmitter<T> > emitter() | 106 | QSharedPointer<ResultEmitter<T> > emitter() |
79 | { | 107 | { |
80 | if (!mResultEmitter) { | 108 | if (!mResultEmitter) { |
81 | mResultEmitter = QSharedPointer<ResultEmitter<T> >(new ResultEmitter<T>()); | 109 | //We have to go over a separate var and return that, otherwise we'd delete the emitter immediately again |
110 | auto sharedPtr = QSharedPointer<ResultEmitter<T> >::create(); | ||
111 | mResultEmitter = sharedPtr; | ||
112 | return sharedPtr; | ||
82 | } | 113 | } |
83 | 114 | ||
84 | return mResultEmitter; | 115 | return mResultEmitter.toStrongRef(); |
116 | } | ||
117 | |||
118 | /** | ||
119 | * For lifetimemanagement only. | ||
120 | * We keep the runner alive as long as the result provider exists. | ||
121 | */ | ||
122 | void setQueryRunner(const QSharedPointer<QObject> &runner) | ||
123 | { | ||
124 | mQueryRunner = runner; | ||
85 | } | 125 | } |
86 | 126 | ||
87 | private: | 127 | private: |
88 | QSharedPointer<ResultEmitter<T> > mResultEmitter; | 128 | QWeakPointer<ResultEmitter<T> > mResultEmitter; |
129 | QSharedPointer<QObject> mQueryRunner; | ||
89 | }; | 130 | }; |
90 | 131 | ||
91 | /* | 132 | /* |
@@ -99,7 +140,6 @@ namespace async { | |||
99 | * * build async interfaces with signals | 140 | * * build async interfaces with signals |
100 | * * build sync interfaces that block when accessing the value | 141 | * * build sync interfaces that block when accessing the value |
101 | * | 142 | * |
102 | * TODO: This should probably be merged with daniels futurebase used in Async | ||
103 | */ | 143 | */ |
104 | template<class DomainType> | 144 | template<class DomainType> |
105 | class ResultEmitter { | 145 | class ResultEmitter { |
@@ -113,51 +153,36 @@ namespace async { | |||
113 | { | 153 | { |
114 | completeHandler = handler; | 154 | completeHandler = handler; |
115 | } | 155 | } |
156 | void onClear(const std::function<void(void)> &handler) | ||
157 | { | ||
158 | clearHandler = handler; | ||
159 | } | ||
116 | 160 | ||
117 | private: | 161 | void add(const DomainType &value) |
118 | friend class ResultProvider<DomainType>; | 162 | { |
119 | std::function<void(const DomainType&)> addHandler; | 163 | addHandler(value); |
120 | // std::function<void(const T&)> removeHandler; | 164 | } |
121 | std::function<void(void)> completeHandler; | ||
122 | ThreadBoundary mThreadBoundary; | ||
123 | }; | ||
124 | |||
125 | 165 | ||
126 | /* | 166 | void complete() |
127 | * A result set specialization that provides a syncronous list | ||
128 | */ | ||
129 | template<class T> | ||
130 | class SyncListResult : public QList<T> { | ||
131 | public: | ||
132 | SyncListResult(const QSharedPointer<ResultEmitter<T> > &emitter) | ||
133 | :QList<T>(), | ||
134 | mComplete(false), | ||
135 | mEmitter(emitter) | ||
136 | { | 167 | { |
137 | emitter->onAdded([this](const T &value) { | 168 | completeHandler(); |
138 | this->append(value); | ||
139 | }); | ||
140 | emitter->onComplete([this]() { | ||
141 | mComplete = true; | ||
142 | auto loop = mWaitLoop.toStrongRef(); | ||
143 | if (loop) { | ||
144 | loop->quit(); | ||
145 | } | ||
146 | }); | ||
147 | } | 169 | } |
148 | 170 | ||
149 | void exec() | 171 | void clear() |
150 | { | 172 | { |
151 | auto loop = QSharedPointer<QEventLoop>::create(); | 173 | clearHandler(); |
152 | mWaitLoop = loop; | ||
153 | loop->exec(QEventLoop::ExcludeUserInputEvents); | ||
154 | } | 174 | } |
155 | 175 | ||
156 | private: | 176 | private: |
157 | bool mComplete; | 177 | friend class ResultProvider<DomainType>; |
158 | QWeakPointer<QEventLoop> mWaitLoop; | 178 | |
159 | QSharedPointer<ResultEmitter<T> > mEmitter; | 179 | std::function<void(const DomainType&)> addHandler; |
180 | // std::function<void(const T&)> removeHandler; | ||
181 | std::function<void(void)> completeHandler; | ||
182 | std::function<void(void)> clearHandler; | ||
183 | ThreadBoundary mThreadBoundary; | ||
160 | }; | 184 | }; |
185 | |||
161 | } | 186 | } |
162 | 187 | ||
163 | namespace Akonadi2 { | 188 | namespace Akonadi2 { |
@@ -307,7 +332,7 @@ using namespace async; | |||
307 | class Query | 332 | class Query |
308 | { | 333 | { |
309 | public: | 334 | public: |
310 | Query() : syncOnDemand(true), processAll(false) {} | 335 | Query() : syncOnDemand(true), processAll(false), liveQuery(false) {} |
311 | //Could also be a propertyFilter | 336 | //Could also be a propertyFilter |
312 | QByteArrayList resources; | 337 | QByteArrayList resources; |
313 | //Could also be a propertyFilter | 338 | //Could also be a propertyFilter |
@@ -318,6 +343,8 @@ public: | |||
318 | QSet<QByteArray> requestedProperties; | 343 | QSet<QByteArray> requestedProperties; |
319 | bool syncOnDemand; | 344 | bool syncOnDemand; |
320 | bool processAll; | 345 | bool processAll; |
346 | //If live query is false, this query will not continuously be updated | ||
347 | bool liveQuery; | ||
321 | }; | 348 | }; |
322 | 349 | ||
323 | 350 | ||
@@ -337,7 +364,9 @@ public: | |||
337 | virtual Async::Job<void> create(const DomainType &domainObject) = 0; | 364 | virtual Async::Job<void> create(const DomainType &domainObject) = 0; |
338 | virtual Async::Job<void> modify(const DomainType &domainObject) = 0; | 365 | virtual Async::Job<void> modify(const DomainType &domainObject) = 0; |
339 | virtual Async::Job<void> remove(const DomainType &domainObject) = 0; | 366 | virtual Async::Job<void> remove(const DomainType &domainObject) = 0; |
340 | virtual Async::Job<void> load(const Query &query, const std::function<void(const typename DomainType::Ptr &)> &resultCallback) = 0; | 367 | //TODO remove from public API |
368 | virtual Async::Job<qint64> load(const Query &query, const std::function<void(const typename DomainType::Ptr &)> &resultCallback) = 0; | ||
369 | virtual Async::Job<void> load(const Query &query, const QSharedPointer<ResultProvider<typename DomainType::Ptr> > &resultProvider) { return Async::null<void>(); }; | ||
341 | }; | 370 | }; |
342 | 371 | ||
343 | 372 | ||
@@ -349,6 +378,8 @@ public: | |||
349 | 378 | ||
350 | class FacadeFactory { | 379 | class FacadeFactory { |
351 | public: | 380 | public: |
381 | typedef std::function<void*(bool &externallyManaged)> FactoryFunction; | ||
382 | |||
352 | //FIXME: proper singleton implementation | 383 | //FIXME: proper singleton implementation |
353 | static FacadeFactory &instance() | 384 | static FacadeFactory &instance() |
354 | { | 385 | { |
@@ -365,7 +396,7 @@ public: | |||
365 | void registerFacade(const QByteArray &resource) | 396 | void registerFacade(const QByteArray &resource) |
366 | { | 397 | { |
367 | const QByteArray typeName = ApplicationDomain::getTypeName<DomainType>(); | 398 | const QByteArray typeName = ApplicationDomain::getTypeName<DomainType>(); |
368 | mFacadeRegistry.insert(key(resource, typeName), [](){ return new Facade; }); | 399 | mFacadeRegistry.insert(key(resource, typeName), [](bool &externallyManaged){ return new Facade; }); |
369 | } | 400 | } |
370 | 401 | ||
371 | /* | 402 | /* |
@@ -375,29 +406,51 @@ public: | |||
375 | * The facade factory takes ovnership of the pointer and typically deletes the instance via shared pointer. | 406 | * The facade factory takes ovnership of the pointer and typically deletes the instance via shared pointer. |
376 | * Supplied factory functions should therefore always return a new pointer (i.e. via clone()) | 407 | * Supplied factory functions should therefore always return a new pointer (i.e. via clone()) |
377 | * | 408 | * |
378 | * FIXME the factory function should really be returning QSharedPointer<void>, which doesn't work (std::shared_pointer<void> would though). That way i.e. a test could keep the object alive until it's done. | 409 | * FIXME the factory function should really be returning QSharedPointer<void>, which doesn't work (std::shared_pointer<void> would though). That way i.e. a test could keep the object alive until it's done. As a workaround the factory function can define wether it manages the lifetime of the facade itself. |
379 | */ | 410 | */ |
380 | template<class DomainType, class Facade> | 411 | template<class DomainType, class Facade> |
381 | void registerFacade(const QByteArray &resource, const std::function<void*(void)> &customFactoryFunction) | 412 | void registerFacade(const QByteArray &resource, const FactoryFunction &customFactoryFunction) |
382 | { | 413 | { |
383 | const QByteArray typeName = ApplicationDomain::getTypeName<DomainType>(); | 414 | const QByteArray typeName = ApplicationDomain::getTypeName<DomainType>(); |
384 | mFacadeRegistry.insert(key(resource, typeName), customFactoryFunction); | 415 | mFacadeRegistry.insert(key(resource, typeName), customFactoryFunction); |
385 | } | 416 | } |
386 | 417 | ||
418 | /* | ||
419 | * Can be used to clear the factory. | ||
420 | * | ||
421 | * Primarily for testing. | ||
422 | */ | ||
423 | void resetFactory() | ||
424 | { | ||
425 | mFacadeRegistry.clear(); | ||
426 | } | ||
427 | |||
428 | static void doNothingDeleter(void *) | ||
429 | { | ||
430 | qWarning() << "Do nothing"; | ||
431 | } | ||
432 | |||
387 | template<class DomainType> | 433 | template<class DomainType> |
388 | QSharedPointer<StoreFacade<DomainType> > getFacade(const QByteArray &resource) | 434 | QSharedPointer<StoreFacade<DomainType> > getFacade(const QByteArray &resource) |
389 | { | 435 | { |
390 | const QByteArray typeName = ApplicationDomain::getTypeName<DomainType>(); | 436 | const QByteArray typeName = ApplicationDomain::getTypeName<DomainType>(); |
391 | auto factoryFunction = mFacadeRegistry.value(key(resource, typeName)); | 437 | auto factoryFunction = mFacadeRegistry.value(key(resource, typeName)); |
392 | if (factoryFunction) { | 438 | if (factoryFunction) { |
393 | return QSharedPointer<StoreFacade<DomainType> >(static_cast<StoreFacade<DomainType>* >(factoryFunction())); | 439 | bool externallyManaged = false; |
440 | auto ptr = static_cast<StoreFacade<DomainType>* >(factoryFunction(externallyManaged)); | ||
441 | if (externallyManaged) { | ||
442 | //Allows tests to manage the lifetime of injected facades themselves | ||
443 | return QSharedPointer<StoreFacade<DomainType> >(ptr, doNothingDeleter); | ||
444 | } else { | ||
445 | return QSharedPointer<StoreFacade<DomainType> >(ptr); | ||
446 | } | ||
394 | } | 447 | } |
395 | qWarning() << "Failed to find facade for resource: " << resource << " and type: " << typeName; | 448 | qWarning() << "Failed to find facade for resource: " << resource << " and type: " << typeName; |
396 | return QSharedPointer<StoreFacade<DomainType> >(); | 449 | return QSharedPointer<StoreFacade<DomainType> >(); |
397 | } | 450 | } |
398 | 451 | ||
399 | private: | 452 | private: |
400 | QHash<QByteArray, std::function<void*(void)> > mFacadeRegistry; | 453 | QHash<QByteArray, FactoryFunction> mFacadeRegistry; |
401 | }; | 454 | }; |
402 | 455 | ||
403 | /** | 456 | /** |
@@ -416,7 +469,7 @@ public: | |||
416 | template <class DomainType> | 469 | template <class DomainType> |
417 | static QSharedPointer<ResultEmitter<typename DomainType::Ptr> > load(Query query) | 470 | static QSharedPointer<ResultEmitter<typename DomainType::Ptr> > load(Query query) |
418 | { | 471 | { |
419 | QSharedPointer<ResultProvider<typename DomainType::Ptr> > resultSet(new ResultProvider<typename DomainType::Ptr>); | 472 | auto resultSet = QSharedPointer<ResultProvider<typename DomainType::Ptr> >::create(); |
420 | 473 | ||
421 | //Execute the search in a thread. | 474 | //Execute the search in a thread. |
422 | //We must guarantee that the emitter is returned before the first result is emitted. | 475 | //We must guarantee that the emitter is returned before the first result is emitted. |
@@ -428,15 +481,15 @@ public: | |||
428 | Async::Job<void> job = Async::null<void>(); | 481 | Async::Job<void> job = Async::null<void>(); |
429 | for(const QByteArray &resource : query.resources) { | 482 | for(const QByteArray &resource : query.resources) { |
430 | auto facade = FacadeFactory::instance().getFacade<DomainType>(resource); | 483 | auto facade = FacadeFactory::instance().getFacade<DomainType>(resource); |
431 | //We have to bind an instance to the function callback. Since we use a shared pointer this keeps the result provider instance (and thus also the emitter) alive. | ||
432 | std::function<void(const typename DomainType::Ptr &)> addCallback = std::bind(&ResultProvider<typename DomainType::Ptr>::add, resultSet, std::placeholders::_1); | ||
433 | 484 | ||
434 | // TODO The following is a necessary hack to keep the facade alive. | 485 | // TODO The following is a necessary hack to keep the facade alive. |
435 | // Otherwise this would reduce to: | 486 | // Otherwise this would reduce to: |
436 | // job = job.then(facade->load(query, addCallback)); | 487 | // job = job.then(facade->load(query, addCallback)); |
437 | // We somehow have to guarantee that the facade remains valid for the duration of the job | 488 | // We somehow have to guarantee that the facade remains valid for the duration of the job |
438 | job = job.then<void>([facade, query, addCallback](Async::Future<void> &future) { | 489 | // TODO: Use one result set per facade, and merge the results separately |
439 | Async::Job<void> j = facade->load(query, addCallback); | 490 | // resultSet->addSubset(facade->query(query)); |
491 | job = job.then<void>([facade, query, resultSet](Async::Future<void> &future) { | ||
492 | Async::Job<void> j = facade->load(query, resultSet); | ||
440 | j.then<void>([&future, facade](Async::Future<void> &f) { | 493 | j.then<void>([&future, facade](Async::Future<void> &f) { |
441 | future.setFinished(); | 494 | future.setFinished(); |
442 | f.setFinished(); | 495 | f.setFinished(); |
diff --git a/common/facade.cpp b/common/facade.cpp new file mode 100644 index 0000000..e51b32a --- /dev/null +++ b/common/facade.cpp | |||
@@ -0,0 +1,20 @@ | |||
1 | /* | ||
2 | * Copyright (C) 2015 Christian Mollekopf <chrigi_1@fastmail.fm> | ||
3 | * | ||
4 | * This program is free software; you can redistribute it and/or modify | ||
5 | * it under the terms of the GNU General Public License as published by | ||
6 | * the Free Software Foundation; either version 2 of the License, or | ||
7 | * (at your option) any later version. | ||
8 | * | ||
9 | * This program is distributed in the hope that it will be useful, | ||
10 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
11 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
12 | * GNU General Public License for more details. | ||
13 | * | ||
14 | * You should have received a copy of the GNU General Public License | ||
15 | * along with this program; if not, write to the | ||
16 | * Free Software Foundation, Inc., | ||
17 | * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. | ||
18 | */ | ||
19 | |||
20 | #include "facade.h" | ||
diff --git a/common/facade.h b/common/facade.h index 98bcb38..f9b5a83 100644 --- a/common/facade.h +++ b/common/facade.h | |||
@@ -30,6 +30,55 @@ | |||
30 | #include "domainadaptor.h" | 30 | #include "domainadaptor.h" |
31 | #include "entitybuffer.h" | 31 | #include "entitybuffer.h" |
32 | 32 | ||
33 | /** | ||
34 | * A QueryRunner runs a query and updates the corresponding result set. | ||
35 | * | ||
36 | * The lifetime of the QueryRunner is defined by the resut set (otherwise it's doing useless work), | ||
37 | * and by how long a result set must be updated. If the query is one off the runner dies after the execution, | ||
38 | * otherwise it lives on the react to changes and updates the corresponding result set. | ||
39 | * | ||
40 | * QueryRunner has to keep ResourceAccess alive in order to keep getting updates. | ||
41 | */ | ||
42 | class QueryRunner : public QObject | ||
43 | { | ||
44 | Q_OBJECT | ||
45 | public: | ||
46 | typedef std::function<Async::Job<qint64>(qint64 oldRevision, qint64 newRevision)> QueryFunction; | ||
47 | |||
48 | QueryRunner(const Akonadi2::Query &query) : mLatestRevision(0) {}; | ||
49 | /** | ||
50 | * Starts query | ||
51 | */ | ||
52 | Async::Job<void> run(qint64 newRevision = 0) | ||
53 | { | ||
54 | //TODO: JOBAPI: that last empty .then should not be necessary | ||
55 | return queryFunction(mLatestRevision, newRevision).then<void, qint64>([this](qint64 revision) { | ||
56 | mLatestRevision = revision; | ||
57 | }).then<void>([](){}); | ||
58 | } | ||
59 | |||
60 | /** | ||
61 | * | ||
62 | */ | ||
63 | void setQuery(const QueryFunction &query) | ||
64 | { | ||
65 | queryFunction = query; | ||
66 | } | ||
67 | |||
68 | public slots: | ||
69 | /** | ||
70 | * Rerun query with new revision | ||
71 | */ | ||
72 | void revisionChanged(qint64 newRevision) | ||
73 | { | ||
74 | run(newRevision).exec(); | ||
75 | } | ||
76 | |||
77 | private: | ||
78 | QueryFunction queryFunction; | ||
79 | qint64 mLatestRevision; | ||
80 | }; | ||
81 | |||
33 | namespace Akonadi2 { | 82 | namespace Akonadi2 { |
34 | class ResourceAccess; | 83 | class ResourceAccess; |
35 | /** | 84 | /** |
@@ -80,7 +129,7 @@ protected: | |||
80 | return Async::null<void>(); | 129 | return Async::null<void>(); |
81 | } | 130 | } |
82 | 131 | ||
83 | private: | 132 | protected: |
84 | //TODO use one resource access instance per application => make static | 133 | //TODO use one resource access instance per application => make static |
85 | QSharedPointer<Akonadi2::ResourceAccess> mResourceAccess; | 134 | QSharedPointer<Akonadi2::ResourceAccess> mResourceAccess; |
86 | }; | 135 | }; |
diff --git a/common/synclistresult.h b/common/synclistresult.h new file mode 100644 index 0000000..5fa0efd --- /dev/null +++ b/common/synclistresult.h | |||
@@ -0,0 +1,54 @@ | |||
1 | #pragma once | ||
2 | |||
3 | #include <QList> | ||
4 | #include <functional> | ||
5 | #include <QSharedPointer> | ||
6 | #include <clientapi.h> | ||
7 | |||
8 | namespace async { | ||
9 | |||
10 | /* | ||
11 | * A result set specialization that provides a syncronous list. | ||
12 | * | ||
13 | * Only for testing purposes. | ||
14 | * | ||
15 | * WARNING: The nested eventloop can cause all sorts of trouble. Use only in testing code. | ||
16 | */ | ||
17 | template<class T> | ||
18 | class SyncListResult : public QList<T> { | ||
19 | public: | ||
20 | SyncListResult(const QSharedPointer<ResultEmitter<T> > &emitter) | ||
21 | :QList<T>(), | ||
22 | mComplete(false), | ||
23 | mEmitter(emitter) | ||
24 | { | ||
25 | emitter->onAdded([this](const T &value) { | ||
26 | this->append(value); | ||
27 | }); | ||
28 | emitter->onComplete([this]() { | ||
29 | mComplete = true; | ||
30 | if (eventLoopAborter) { | ||
31 | eventLoopAborter(); | ||
32 | //Be safe in case of a second invocation of the complete handler | ||
33 | eventLoopAborter = std::function<void()>(); | ||
34 | } | ||
35 | }); | ||
36 | emitter->onClear([this]() { | ||
37 | this->clear(); | ||
38 | }); | ||
39 | } | ||
40 | |||
41 | void exec() | ||
42 | { | ||
43 | QEventLoop eventLoop; | ||
44 | eventLoopAborter = [&eventLoop]() { eventLoop.quit(); }; | ||
45 | eventLoop.exec(); | ||
46 | } | ||
47 | |||
48 | private: | ||
49 | bool mComplete; | ||
50 | QSharedPointer<ResultEmitter<T> > mEmitter; | ||
51 | std::function<void()> eventLoopAborter; | ||
52 | }; | ||
53 | |||
54 | } | ||
diff --git a/common/test/clientapitest.cpp b/common/test/clientapitest.cpp index 24b3fb9..789c656 100644 --- a/common/test/clientapitest.cpp +++ b/common/test/clientapitest.cpp | |||
@@ -3,6 +3,22 @@ | |||
3 | #include <functional> | 3 | #include <functional> |
4 | 4 | ||
5 | #include "../clientapi.h" | 5 | #include "../clientapi.h" |
6 | #include "../facade.h" | ||
7 | #include "../synclistresult.h" | ||
8 | |||
9 | class RevisionNotifier : public QObject | ||
10 | { | ||
11 | Q_OBJECT | ||
12 | public: | ||
13 | RevisionNotifier() : QObject() {}; | ||
14 | void notify(qint64 revision) | ||
15 | { | ||
16 | emit revisionChanged(revision); | ||
17 | } | ||
18 | |||
19 | Q_SIGNALS: | ||
20 | void revisionChanged(qint64); | ||
21 | }; | ||
6 | 22 | ||
7 | class DummyResourceFacade : public Akonadi2::StoreFacade<Akonadi2::ApplicationDomain::Event> | 23 | class DummyResourceFacade : public Akonadi2::StoreFacade<Akonadi2::ApplicationDomain::Event> |
8 | { | 24 | { |
@@ -11,18 +27,63 @@ public: | |||
11 | virtual Async::Job<void> create(const Akonadi2::ApplicationDomain::Event &domainObject){ return Async::null<void>(); }; | 27 | virtual Async::Job<void> create(const Akonadi2::ApplicationDomain::Event &domainObject){ return Async::null<void>(); }; |
12 | virtual Async::Job<void> modify(const Akonadi2::ApplicationDomain::Event &domainObject){ return Async::null<void>(); }; | 28 | virtual Async::Job<void> modify(const Akonadi2::ApplicationDomain::Event &domainObject){ return Async::null<void>(); }; |
13 | virtual Async::Job<void> remove(const Akonadi2::ApplicationDomain::Event &domainObject){ return Async::null<void>(); }; | 29 | virtual Async::Job<void> remove(const Akonadi2::ApplicationDomain::Event &domainObject){ return Async::null<void>(); }; |
14 | virtual Async::Job<void> load(const Akonadi2::Query &query, const std::function<void(const Akonadi2::ApplicationDomain::Event::Ptr &)> &resultCallback) | 30 | virtual Async::Job<qint64> load(const Akonadi2::Query &query, const std::function<void(const Akonadi2::ApplicationDomain::Event::Ptr &)> &resultCallback) |
15 | { | 31 | { |
16 | return Async::start<void>([this, resultCallback](Async::Future<void> &future) { | 32 | return Async::start<qint64>([this, resultCallback](Async::Future<qint64> &future) { |
17 | qDebug() << "load called"; | 33 | qDebug() << "load called"; |
18 | for(const auto &result : results) { | 34 | for(const auto &result : results) { |
19 | resultCallback(result); | 35 | resultCallback(result); |
20 | } | 36 | } |
37 | future.setValue(0); | ||
21 | future.setFinished(); | 38 | future.setFinished(); |
22 | }); | 39 | }); |
23 | } | 40 | } |
24 | 41 | ||
42 | Async::Job<void> load(const Akonadi2::Query &query, const QSharedPointer<Akonadi2::ResultProvider<Akonadi2::ApplicationDomain::Event::Ptr> > &resultProvider) | ||
43 | { | ||
44 | auto runner = QSharedPointer<QueryRunner>::create(query); | ||
45 | //The runner only lives as long as the resultProvider | ||
46 | resultProvider->setQueryRunner(runner); | ||
47 | runner->setQuery([this, resultProvider, query](qint64 oldRevision, qint64 newRevision) -> Async::Job<qint64> { | ||
48 | qDebug() << "Creating query for revisions: " << oldRevision << newRevision; | ||
49 | return Async::start<qint64>([this, resultProvider, query](Async::Future<qint64> &future) { | ||
50 | //TODO only emit changes and don't replace everything | ||
51 | resultProvider->clear(); | ||
52 | //rerun query | ||
53 | std::function<void(const Akonadi2::ApplicationDomain::Event::Ptr &)> addCallback = std::bind(&Akonadi2::ResultProvider<Akonadi2::ApplicationDomain::Event::Ptr>::add, resultProvider, std::placeholders::_1); | ||
54 | load(query, addCallback).then<void, qint64>([resultProvider, &future](qint64 queriedRevision) { | ||
55 | //TODO set revision in result provider? | ||
56 | //TODO update all existing results with new revision | ||
57 | resultProvider->complete(); | ||
58 | future.setValue(queriedRevision); | ||
59 | future.setFinished(); | ||
60 | }).exec(); | ||
61 | }); | ||
62 | }); | ||
63 | |||
64 | //Ensure the notification is emitted in the right thread | ||
65 | //Otherwise we get crashes as we call revisionChanged from the test. | ||
66 | if (!notifier) { | ||
67 | notifier.reset(new RevisionNotifier); | ||
68 | } | ||
69 | |||
70 | //TODO somehow disconnect as resultNotifier is destroyed. Otherwise we keep the runner alive forever. | ||
71 | if (query.liveQuery) { | ||
72 | QObject::connect(notifier.data(), &RevisionNotifier::revisionChanged, [runner](qint64 newRevision) { | ||
73 | runner->revisionChanged(newRevision); | ||
74 | }); | ||
75 | } | ||
76 | |||
77 | return Async::start<void>([runner](Async::Future<void> &future) { | ||
78 | runner->run().then<void>([&future]() { | ||
79 | //TODO if not live query, destroy runner. | ||
80 | future.setFinished(); | ||
81 | }).exec(); | ||
82 | }); | ||
83 | } | ||
84 | |||
25 | QList<Akonadi2::ApplicationDomain::Event::Ptr> results; | 85 | QList<Akonadi2::ApplicationDomain::Event::Ptr> results; |
86 | QSharedPointer<RevisionNotifier> notifier; | ||
26 | }; | 87 | }; |
27 | 88 | ||
28 | class ClientAPITest : public QObject | 89 | class ClientAPITest : public QObject |
@@ -30,21 +91,60 @@ class ClientAPITest : public QObject | |||
30 | Q_OBJECT | 91 | Q_OBJECT |
31 | private Q_SLOTS: | 92 | private Q_SLOTS: |
32 | 93 | ||
94 | void initTestCase() | ||
95 | { | ||
96 | Akonadi2::FacadeFactory::instance().resetFactory(); | ||
97 | } | ||
98 | |||
33 | void testLoad() | 99 | void testLoad() |
34 | { | 100 | { |
35 | DummyResourceFacade facade; | 101 | DummyResourceFacade facade; |
36 | facade.results << QSharedPointer<Akonadi2::ApplicationDomain::Event>::create("resource", "id", 0, QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor>()); | 102 | facade.results << QSharedPointer<Akonadi2::ApplicationDomain::Event>::create("resource", "id", 0, QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor>()); |
37 | 103 | ||
38 | Akonadi2::FacadeFactory::instance().registerFacade<Akonadi2::ApplicationDomain::Event, DummyResourceFacade>("dummyresource", [facade](){ return new DummyResourceFacade(facade); }); | 104 | Akonadi2::FacadeFactory::instance().registerFacade<Akonadi2::ApplicationDomain::Event, DummyResourceFacade>("dummyresource", |
105 | [&facade](bool &externallyManaged) { | ||
106 | externallyManaged = true; | ||
107 | return &facade; | ||
108 | } | ||
109 | ); | ||
39 | 110 | ||
40 | Akonadi2::Query query; | 111 | Akonadi2::Query query; |
41 | query.resources << "dummyresource"; | 112 | query.resources << "dummyresource"; |
113 | query.liveQuery = false; | ||
42 | 114 | ||
43 | async::SyncListResult<Akonadi2::ApplicationDomain::Event::Ptr> result(Akonadi2::Store::load<Akonadi2::ApplicationDomain::Event>(query)); | 115 | async::SyncListResult<Akonadi2::ApplicationDomain::Event::Ptr> result(Akonadi2::Store::load<Akonadi2::ApplicationDomain::Event>(query)); |
44 | result.exec(); | 116 | result.exec(); |
45 | QCOMPARE(result.size(), 1); | 117 | QCOMPARE(result.size(), 1); |
46 | } | 118 | } |
47 | 119 | ||
120 | void testLiveQuery() | ||
121 | { | ||
122 | DummyResourceFacade facade; | ||
123 | facade.results << QSharedPointer<Akonadi2::ApplicationDomain::Event>::create("resource", "id", 0, QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor>()); | ||
124 | |||
125 | Akonadi2::FacadeFactory::instance().registerFacade<Akonadi2::ApplicationDomain::Event, DummyResourceFacade>("dummyresource", | ||
126 | [&facade](bool &externallManage){ | ||
127 | externallManage = true; | ||
128 | return &facade; | ||
129 | } | ||
130 | ); | ||
131 | |||
132 | Akonadi2::Query query; | ||
133 | query.resources << "dummyresource"; | ||
134 | query.liveQuery = true; | ||
135 | |||
136 | async::SyncListResult<Akonadi2::ApplicationDomain::Event::Ptr> result(Akonadi2::Store::load<Akonadi2::ApplicationDomain::Event>(query)); | ||
137 | result.exec(); | ||
138 | QCOMPARE(result.size(), 1); | ||
139 | |||
140 | //Enter a second result | ||
141 | facade.results << QSharedPointer<Akonadi2::ApplicationDomain::Event>::create("resource", "id2", 0, QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor>()); | ||
142 | qWarning() << &facade; | ||
143 | QVERIFY(facade.notifier); | ||
144 | facade.notifier->revisionChanged(2); | ||
145 | QTRY_COMPARE(result.size(), 2); | ||
146 | } | ||
147 | |||
48 | }; | 148 | }; |
49 | 149 | ||
50 | QTEST_MAIN(ClientAPITest) | 150 | QTEST_MAIN(ClientAPITest) |
diff --git a/dummyresource/facade.cpp b/dummyresource/facade.cpp index 1477fcf..f603c56 100644 --- a/dummyresource/facade.cpp +++ b/dummyresource/facade.cpp | |||
@@ -147,15 +147,51 @@ void DummyResourceFacade::readValue(QSharedPointer<Akonadi2::Storage> storage, c | |||
147 | }); | 147 | }); |
148 | } | 148 | } |
149 | 149 | ||
150 | Async::Job<void> DummyResourceFacade::load(const Akonadi2::Query &query, const std::function<void(const Akonadi2::ApplicationDomain::Event::Ptr &)> &resultCallback) | 150 | Async::Job<void> DummyResourceFacade::load(const Akonadi2::Query &query, const QSharedPointer<Akonadi2::ResultProvider<Akonadi2::ApplicationDomain::Event::Ptr> > &resultProvider) |
151 | { | 151 | { |
152 | return synchronizeResource(query.syncOnDemand, query.processAll).then<void>([=](Async::Future<void> &future) { | 152 | auto runner = QSharedPointer<QueryRunner>::create(query); |
153 | //The runner only lives as long as the resultProvider | ||
154 | resultProvider->setQueryRunner(runner); | ||
155 | runner->setQuery([this, resultProvider, query](qint64 oldRevision, qint64 newRevision) -> Async::Job<qint64> { | ||
156 | return Async::start<qint64>([this, resultProvider, query](Async::Future<qint64> &future) { | ||
157 | //TODO only emit changes and don't replace everything | ||
158 | resultProvider->clear(); | ||
159 | //rerun query | ||
160 | std::function<void(const Akonadi2::ApplicationDomain::Event::Ptr &)> addCallback = std::bind(&Akonadi2::ResultProvider<Akonadi2::ApplicationDomain::Event::Ptr>::add, resultProvider, std::placeholders::_1); | ||
161 | load(query, addCallback).then<void, qint64>([resultProvider, &future](qint64 queriedRevision) { | ||
162 | //TODO set revision in result provider? | ||
163 | //TODO update all existing results with new revision | ||
164 | resultProvider->complete(); | ||
165 | future.setValue(queriedRevision); | ||
166 | future.setFinished(); | ||
167 | }).exec(); | ||
168 | }); | ||
169 | }); | ||
170 | |||
171 | auto resourceAccess = mResourceAccess; | ||
172 | //TODO we need to somehow disconnect this | ||
173 | QObject::connect(resourceAccess.data(), &Akonadi2::ResourceAccess::revisionChanged, [runner](qint64 newRevision) { | ||
174 | runner->revisionChanged(newRevision); | ||
175 | }); | ||
176 | |||
177 | return Async::start<void>([runner](Async::Future<void> &future) { | ||
178 | runner->run().then<void>([&future]() { | ||
179 | //TODO if not live query, destroy runner. | ||
180 | future.setFinished(); | ||
181 | }).exec(); | ||
182 | }); | ||
183 | } | ||
184 | |||
185 | Async::Job<qint64> DummyResourceFacade::load(const Akonadi2::Query &query, const std::function<void(const Akonadi2::ApplicationDomain::Event::Ptr &)> &resultCallback) | ||
186 | { | ||
187 | return synchronizeResource(query.syncOnDemand, query.processAll).then<qint64>([=](Async::Future<qint64> &future) { | ||
153 | //Now that the sync is complete we can execute the query | 188 | //Now that the sync is complete we can execute the query |
154 | const auto preparedQuery = prepareQuery(query); | 189 | const auto preparedQuery = prepareQuery(query); |
155 | 190 | ||
156 | auto storage = QSharedPointer<Akonadi2::Storage>::create(Akonadi2::Store::storageLocation(), "org.kde.dummy"); | 191 | auto storage = QSharedPointer<Akonadi2::Storage>::create(Akonadi2::Store::storageLocation(), "org.kde.dummy"); |
157 | 192 | ||
158 | //TODO use transaction over full query and record store revision. We'll need it to update the query. | 193 | storage->startTransaction(Akonadi2::Storage::ReadOnly); |
194 | const qint64 revision = storage->maxRevision(); | ||
159 | 195 | ||
160 | //Index lookups | 196 | //Index lookups |
161 | QVector<QByteArray> keys; | 197 | QVector<QByteArray> keys; |
@@ -177,6 +213,8 @@ Async::Job<void> DummyResourceFacade::load(const Akonadi2::Query &query, const s | |||
177 | readValue(storage, key, resultCallback, preparedQuery); | 213 | readValue(storage, key, resultCallback, preparedQuery); |
178 | } | 214 | } |
179 | } | 215 | } |
216 | storage->abortTransaction(); | ||
217 | future.setValue(revision); | ||
180 | future.setFinished(); | 218 | future.setFinished(); |
181 | }); | 219 | }); |
182 | } | 220 | } |
diff --git a/dummyresource/facade.h b/dummyresource/facade.h index 37ed81d..3ddfe15 100644 --- a/dummyresource/facade.h +++ b/dummyresource/facade.h | |||
@@ -34,10 +34,11 @@ class DummyResourceFacade : public Akonadi2::GenericFacade<Akonadi2::Application | |||
34 | public: | 34 | public: |
35 | DummyResourceFacade(); | 35 | DummyResourceFacade(); |
36 | virtual ~DummyResourceFacade(); | 36 | virtual ~DummyResourceFacade(); |
37 | virtual Async::Job<void> create(const Akonadi2::ApplicationDomain::Event &domainObject); | 37 | Async::Job<void> create(const Akonadi2::ApplicationDomain::Event &domainObject) Q_DECL_OVERRIDE; |
38 | virtual Async::Job<void> modify(const Akonadi2::ApplicationDomain::Event &domainObject); | 38 | Async::Job<void> modify(const Akonadi2::ApplicationDomain::Event &domainObject) Q_DECL_OVERRIDE; |
39 | virtual Async::Job<void> remove(const Akonadi2::ApplicationDomain::Event &domainObject); | 39 | Async::Job<void> remove(const Akonadi2::ApplicationDomain::Event &domainObject) Q_DECL_OVERRIDE; |
40 | virtual Async::Job<void> load(const Akonadi2::Query &query, const std::function<void(const Akonadi2::ApplicationDomain::Event::Ptr &)> &resultCallback); | 40 | Async::Job<qint64> load(const Akonadi2::Query &query, const std::function<void(const Akonadi2::ApplicationDomain::Event::Ptr &)> &resultCallback) Q_DECL_OVERRIDE; |
41 | Async::Job<void> load(const Akonadi2::Query &query, const QSharedPointer<Akonadi2::ResultProvider<Akonadi2::ApplicationDomain::Event::Ptr> > &resultProvider) Q_DECL_OVERRIDE; | ||
41 | 42 | ||
42 | private: | 43 | private: |
43 | void readValue(QSharedPointer<Akonadi2::Storage> storage, const QByteArray &key, const std::function<void(const Akonadi2::ApplicationDomain::Event::Ptr &)> &resultCallback, std::function<bool(const std::string &key, DummyCalendar::DummyEvent const *buffer, Akonadi2::ApplicationDomain::Buffer::Event const *local)>); | 44 | void readValue(QSharedPointer<Akonadi2::Storage> storage, const QByteArray &key, const std::function<void(const Akonadi2::ApplicationDomain::Event::Ptr &)> &resultCallback, std::function<bool(const std::string &key, DummyCalendar::DummyEvent const *buffer, Akonadi2::ApplicationDomain::Buffer::Event const *local)>); |
diff --git a/tests/dummyresourcebenchmark.cpp b/tests/dummyresourcebenchmark.cpp index d83cb70..e350747 100644 --- a/tests/dummyresourcebenchmark.cpp +++ b/tests/dummyresourcebenchmark.cpp | |||
@@ -7,6 +7,7 @@ | |||
7 | #include "clientapi.h" | 7 | #include "clientapi.h" |
8 | #include "commands.h" | 8 | #include "commands.h" |
9 | #include "entitybuffer.h" | 9 | #include "entitybuffer.h" |
10 | #include "synclistresult.h" | ||
10 | 11 | ||
11 | #include "event_generated.h" | 12 | #include "event_generated.h" |
12 | #include "entity_generated.h" | 13 | #include "entity_generated.h" |
diff --git a/tests/dummyresourcetest.cpp b/tests/dummyresourcetest.cpp index 5fed7cd..9523e5d 100644 --- a/tests/dummyresourcetest.cpp +++ b/tests/dummyresourcetest.cpp | |||
@@ -2,13 +2,13 @@ | |||
2 | 2 | ||
3 | #include <QString> | 3 | #include <QString> |
4 | 4 | ||
5 | // #include "dummycalendar_generated.h" | ||
6 | #include "event_generated.h" | 5 | #include "event_generated.h" |
7 | #include "entity_generated.h" | 6 | #include "entity_generated.h" |
8 | #include "metadata_generated.h" | 7 | #include "metadata_generated.h" |
9 | #include "createentity_generated.h" | 8 | #include "createentity_generated.h" |
10 | #include "dummyresource/resourcefactory.h" | 9 | #include "dummyresource/resourcefactory.h" |
11 | #include "clientapi.h" | 10 | #include "clientapi.h" |
11 | #include "synclistresult.h" | ||
12 | #include "commands.h" | 12 | #include "commands.h" |
13 | #include "entitybuffer.h" | 13 | #include "entitybuffer.h" |
14 | 14 | ||